PySpark - Read and Write Avro Files

Kontext · 2022-06-18

Apache Avro is a data serialization system like Thrift and Protocol Buffers. It provides a compact, fast, binary data format and can store persistent data in a container file. When data is stored as Avro, the schema is stored with it, so the files can easily be read later by any programming language. This article provides some examples of reading and writing data in Avro format with Spark.

Environment

The sample code snippets in this article run on Spark 3.2.1 in a WSL 2 Ubuntu distro. You can follow the page Install Spark 3.2.1 on Linux or WSL to set up a Spark environment.

Save DataFrame as Avro format

The following code snippet creates a DataFrame in memory and then saves it in Avro format. The data is stored in the local file system instead of HDFS.

# avro-example.py
from pyspark.sql import SparkSession

appName = "PySpark Example - Read and Write Avro"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

# List
data = [{
    'col1': 'Category A',
    'col2': 100
}, {
    'col1': 'Category B',
    'col2': 200
}, {
    'col1': 'Category C',
    'col2': 300
}]

df = spark.createDataFrame(data)
df.show()

# Save as Avro
df.write.format('avro').mode('overwrite').save(
    'file:///home/kontext/pyspark-examples/data/avro-test')

Run the script

We can then run the script using the spark-submit command. The Avro package is not bundled in Spark's jar lib directory, so we need to either download it and add it to the Spark class path (SPARK_DIST_CLASSPATH) or add it as a package dependency so that Spark downloads it before the application runs. The latter approach is easier; the following command line shows how to do that:

spark-submit --packages org.apache.spark:spark-avro_2.12:3.3.0 avro-example.py 
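Alternatively, the package dependency can be declared through Spark configuration when building the session instead of passing --packages on the command line. Below is a minimal sketch (a variation of the script above, reusing the same package coordinates), not part of the original example:

# Alternative: declare the spark-avro package via Spark configuration
# instead of the --packages flag on spark-submit.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .config('spark.jars.packages', 'org.apache.spark:spark-avro_2.12:3.3.0') \
    .getOrCreate()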

Once the script executes successfully, it creates the data files in the local file system, as the screenshot shows:

[Screenshot: Avro output files created under /home/kontext/pyspark-examples/data/avro-test]

About the *.avro.crc file

The *.avro.crc file is a checksum file that can be used to validate whether the data file has been modified after it was generated. It is a mechanism for protecting data integrity.

Load Avro files

Now we can read the data back using the Avro deserializer. This can be done by adding the following lines to the previous script:

# Read Avro
df2 = spark.read.format('avro').load(
    'file:///home/kontext/pyspark-examples/data/avro-test')
df2.show()

Run the script using the same command line:

spark-submit --packages org.apache.spark:spark-avro_2.12:3.3.0 avro-example.py 

Output:

+----------+----+
|      col1|col2|
+----------+----+
|Category A| 100|
|Category B| 200|
|Category C| 300|
+----------+----+

Avro to Spark SQL data type conversion

Spark supports reading all primitive types in Avro and also logical types.

Avro type - Spark SQL type:

  • boolean - BooleanType
  • int - IntegerType
  • long - LongType
  • float - FloatType
  • double - DoubleType
  • string - StringType
  • enum - StringType
  • fixed - BinaryType
  • bytes - BinaryType
  • record - StructType
  • array - ArrayType
  • map - MapType
  • union:
      • union(int, long) is mapped to LongType.
      • union(float, double) is mapped to DoubleType.
      • union(something, null), where something is any supported Avro type, is mapped to the same Spark SQL type as that of something, with nullable set to true.
      • All other union types are considered complex. They are mapped to StructType where field names are member0, member1, etc., in accordance with the members of the union. This is consistent with the behavior when converting between Avro and Parquet.
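
To see how these mappings surface in practice, the following sketch (not part of the original script) prints the schema of the df2 DataFrame loaded earlier; for the sample data above, the Avro string and long fields should come back as StringType and LongType:

# Inspect the Spark SQL schema inferred from the Avro files read earlier.
df2.printSchema()
# Expected output for the sample data:
# root
#  |-- col1: string (nullable = true)
#  |-- col2: long (nullable = true)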

Read and write options

When reading or writing Avro data in Spark via DataFrameReader or DataFrameWriter, there are a few options we can specify (a short example follows after the list):

  • avroSchema - Optional Avro schema provided by the user in JSON format.
  • recordName - Top record name in write result. Default value is topLevelRecord.
  • recordNamespace - Record namespace in write result. Default value is "".
  • ignoreExtension - Ignore file extensions and load all files in the path specified. Default value is true.
  • compression - Compression codec. Default is snappy. The possible values are uncompressed, snappy, deflate, bzip2, xz and zstandard. This can also be specified using Spark configuration spark.sql.avro.compression.codec.
  • positionalFieldMatching - Whether to use positional field matching between Avro schema and SQL schema.
  • mode - Only applies to the from_avro function. Default value is FAILFAST, which throws an error immediately when a corrupted record is encountered; the other option is PERMISSIVE, which processes corrupted records as null. Note that PERMISSIVE mode can produce a schema that differs from the provided one.
  • datetimeRebaseMode - Rebase mode to be used for date and timestamp values when converting from the Julian to the Proleptic Gregorian calendar. By default, the value is EXCEPTION. The other possible values are CORRECTED and LEGACY. Alternatively, it can also be specified using the Spark configurations spark.sql.avro.datetimeRebaseModeInRead and spark.sql.avro.datetimeRebaseModeInWrite.
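
As an illustration of a few of these options, the sketch below builds on the DataFrame created earlier; the deflate codec, record name, namespace and the avro-test-options path are arbitrary example values rather than part of the original script:

# Write with explicit Avro options: deflate compression plus a custom
# top-level record name and namespace.
df.write.format('avro') \
    .option('compression', 'deflate') \
    .option('recordName', 'CategoryRecord') \
    .option('recordNamespace', 'com.example.avro') \
    .mode('overwrite') \
    .save('file:///home/kontext/pyspark-examples/data/avro-test-options')

# Read it back; other options such as avroSchema or positionalFieldMatching
# can be supplied through .option(...) in the same way.
df3 = spark.read.format('avro').load(
    'file:///home/kontext/pyspark-examples/data/avro-test-options')
df3.show()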

I hope this gives you a good summary of reading and writing Avro files in Spark. If you have any questions, feel free to post a comment.

References

Apache Avro Data Source Guide
