PySpark - Read and Write Avro Files
Apache Avro is a data serialization system like Thrift and Protocol Buffers. It provides a compact, fast, binary data format for storing persistent data in a container file. When data is stored as Avro, the schema is stored alongside it, so the file can later be read easily from any programming language. This article provides some examples of reading and writing data in Avro format with Spark.
Environment
The sample code snippets in this article run on Spark 3.2.1 in a WSL 2 Ubuntu distro. You can follow the page Install Spark 3.2.1 on Linux or WSL to set up a Spark environment.
Save DataFrame as Avro format
The following code snippet creates a DataFrame in memory and then saves it in Avro format. The data is stored in the local file system instead of HDFS.
#avro-example.py
from pyspark.sql import SparkSession

appName = "PySpark Example - Read and Write Avro"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

# List
data = [{
    'col1': 'Category A',
    'col2': 100
}, {
    'col1': 'Category B',
    'col2': 200
}, {
    'col1': 'Category C',
    'col2': 300
}]

df = spark.createDataFrame(data)
df.show()

# Save as Avro
df.write.format('avro').mode('overwrite').save(
    'file:///home/kontext/pyspark-examples/data/avro-test')
Run the script
We can then run the script using the spark-submit command. The Avro package is not included in Spark's jar lib directory, so we need to either download it and add it to the Spark classpath (SPARK_DIST_CLASSPATH), or, more easily, declare it as a package dependency so that Spark downloads it before the application runs. The following command line shows the latter approach (use the spark-avro version that matches your Spark version):
spark-submit --packages org.apache.spark:spark-avro_2.12:3.2.1 avro-example.py
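Alternatively, the dependency can be declared inside the script itself via the spark.jars.packages configuration, so that a plain spark-submit avro-example.py (without the --packages flag) also works. The snippet below is a minimal sketch of this approach:

# Declare the spark-avro dependency in the SparkSession builder itself
# (a sketch; adjust the artifact version to match your Spark version).
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySpark Example - Read and Write Avro") \
    .master("local") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-avro_2.12:3.2.1") \
    .getOrCreate()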
Once the script has executed successfully, it will have written the data to the specified path in the local file system.
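A quick way to check the result is to list the output directory (a sketch; the exact part file names will differ per run):

import os

# List the Avro output directory; each part-*.avro data file is accompanied
# by a hidden .part-*.avro.crc checksum file (exact names vary per run).
out_dir = '/home/kontext/pyspark-examples/data/avro-test'
for name in sorted(os.listdir(out_dir)):
    print(name)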
About *.avro.crc file
The *.avro.crc file is a checksum file that can be used to validate whether the data file has been modified after it was generated. It is a mechanism for protecting data integrity.
Load Avro files
Now we can read the data back using the Avro data source. This can be done by adding the following lines to the previous script:
# Read Avro
df2 = spark.read.format('avro').load(
    'file:///home/kontext/pyspark-examples/data/avro-test')
df2.show()
Run the script using the same command line:
spark-submit --packages org.apache.spark:spark-avro_2.12:3.3.0 avro-example.py
Output:
+----------+----+
|      col1|col2|
+----------+----+
|Category A| 100|
|Category B| 200|
|Category C| 300|
+----------+----+
Avro and Spark SQL data types conversion
Spark supports reading all Avro primitive types as well as logical types. The table below shows how Avro types are mapped to Spark SQL types:
Avro type | Spark SQL type |
boolean | BooleanType |
int | IntegerType |
long | LongType |
float | FloatType |
double | DoubleType |
string | StringType |
enum | StringType |
fixed | BinaryType |
bytes | BinaryType |
record | StructType |
array | ArrayType |
map | MapType |
union | (see note below) |
For union types: a union of a single type with null maps to the nullable version of that type's Spark SQL mapping; union(int, long) maps to LongType; union(float, double) maps to DoubleType; all other unions are treated as complex types and map to StructType.
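We can see the mapping in action by printing the schema of the DataFrame loaded earlier (a quick check reusing df2 from the read example above):

# Reusing df2 from the read example above: the Avro string and long fields
# are loaded as Spark SQL StringType and LongType respectively.
df2.printSchema()
# root
#  |-- col1: string (nullable = true)
#  |-- col2: long (nullable = true)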
Read and write options
When reading or writing Avro data in Spark via DataFrameReader or DataFrameWriter, there are a few options we can specify:
- avroSchema - Optional Avro schema provided by the user in JSON format.
- recordName - Top-level record name in the write result. Default value is topLevelRecord.
- recordNamespace - Record namespace in the write result. Default value is "".
- ignoreExtension - Ignore file extensions and load all files in the specified path. Default value is true.
- compression - Compression codec. Default is snappy. The possible values are uncompressed, snappy, deflate, bzip2, xz and zstandard. This can also be specified using the Spark configuration spark.sql.avro.compression.codec.
- positionalFieldMatching - Whether to use positional field matching between the Avro schema and the SQL schema instead of matching by field name.
- mode - Option for the from_avro function. Default value is FAILFAST, which throws an error immediately on corrupted records; the other option is PERMISSIVE, which processes corrupted records as NULL. The second option may change the schema compared with the one provided.
- datetimeRebaseMode - Rebase mode used for date and timestamp values when converting between the Julian and Proleptic Gregorian calendars. By default, the value is EXCEPTION. The other possible values are CORRECTED and LEGACY. Alternatively, it can also be specified using the Spark configurations spark.sql.avro.datetimeRebaseModeInRead and spark.sql.avro.datetimeRebaseModeInWrite.
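As a quick illustration of some of these options, the sketch below reuses df and spark from the earlier example; the output path and record name are just illustrative names:

# Write with a non-default compression codec and a custom top-level record name
# (a sketch; 'SampleRecord' and the path are example names).
df.write.format('avro') \
    .option('compression', 'deflate') \
    .option('recordName', 'SampleRecord') \
    .mode('overwrite') \
    .save('file:///home/kontext/pyspark-examples/data/avro-test-deflate')

# The codec can also be set globally via Spark configuration instead of per write.
spark.conf.set('spark.sql.avro.compression.codec', 'deflate')

# Read options are passed the same way on the DataFrameReader.
df3 = spark.read.format('avro') \
    .option('ignoreExtension', 'true') \
    .load('file:///home/kontext/pyspark-examples/data/avro-test-deflate')
df3.show()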
I hope this gives you a good summary of reading and writing Avro files in Spark. If you have any questions, feel free to post a comment.