Spark spark.sql.files.maxPartitionBytes Explained in Detail
The Spark configuration property spark.sql.files.maxPartitionBytes specifies the maximum number of bytes to pack into a single partition when reading from file sources such as Parquet, JSON, ORC and CSV.
Default value
The default value for this property is 134217728 bytes (128MB). If an input file (or a splittable block of it) is bigger than 128MB, Spark splits it across multiple partitions when reading.
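As a quick check, the value in effect for a session can be read from the runtime configuration; a minimal sketch, assuming spark is an existing SparkSession (we will create one in the example below):

# Returns the configured value as a string; depending on the Spark version the
# default may be reported with a byte-unit suffix, e.g. '134217728b'.
print(spark.conf.get('spark.sql.files.maxPartitionBytes'))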
Let's use an example to look into the details.
Read single partition file
For demo purposes, I will use the public New York City taxi trip record data. The data is available in public S3 buckets, for example https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com.
The file I am downloading is from January 2009.
<Contents>
  <Key>2009/01/data.parquet</Key>
  <LastModified>2020-01-17T16:52:57.000Z</LastModified>
  <ETag>"880538d41446f7b8083573b44f0b8b37-27"</ETag>
  <Size>461966527</Size>
  <StorageClass>STANDARD</StorageClass>
</Contents>
The file size is 461966527 bytes (about 441MB), which is bigger than the default maximum partition size of 128MB.
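If you want to follow along, the file can also be downloaded directly over HTTPS. Below is a minimal sketch using Python's standard library; the URL is simply the bucket endpoint plus the object key from the listing above, and the local path is just an example (adjust it to your environment):

import urllib.request

# Bucket endpoint + object key from the S3 listing above.
url = 'https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com/2009/01/data.parquet'
local_path = '/mnt/e/Downloads/data.parquet'  # example path only

# Download the ~441MB Parquet file.
urllib.request.urlretrieve(url, local_path)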
Now let's read it using local PySpark.
Read parquet file in PySpark
Create a PySpark script with the following content:
from pyspark.sql import SparkSession
from pyspark import SparkConf

app_name = "PySpark Example"
master = "local"

conf = SparkConf().setAppName(app_name)\
    .setMaster(master)

spark = SparkSession.builder.config(conf=conf) \
    .getOrCreate()
spark.sparkContext.setLogLevel("WARN")

file_path = 'file:///mnt/e/Downloads/data.parquet'

df = spark.read.format('parquet').load(file_path)
print(df.count())
print(df.rdd.getNumPartitions())
df.explain(mode='formatted')
Output:
14092413
4
== Physical Plan ==
* ColumnarToRow (2)
+- Scan parquet  (1)

(1) Scan parquet
Output [18]: [vendor_id#0, pickup_at#1, dropoff_at#2, passenger_count#3, trip_distance#4, pickup_longitude#5, pickup_latitude#6, rate_code_id#7, store_and_fwd_flag#8, dropoff_longitude#9, dropoff_latitude#10, payment_type#11, fare_amount#12, extra#13, mta_tax#14, tip_amount#15, tolls_amount#16, total_amount#17]
Batched: true
Location: InMemoryFileIndex [file:/mnt/e/Downloads/data.parquet]
ReadSchema: struct<vendor_id:string,pickup_at:timestamp,dropoff_at:timestamp,passenger_count:tinyint,trip_distance:float,pickup_longitude:float,pickup_latitude:float,rate_code_id:int,store_and_fwd_flag:string,dropoff_longitude:float,dropoff_latitude:float,payment_type:string,fare_amount:float,extra:float,mta_tax:float,tip_amount:float,tolls_amount:float,total_amount:float>

(2) ColumnarToRow [codegen id : 1]
Input [18]: [vendor_id#0, pickup_at#1, dropoff_at#2, passenger_count#3, trip_distance#4, pickup_longitude#5, pickup_latitude#6, rate_code_id#7, store_and_fwd_flag#8, dropoff_longitude#9, dropoff_latitude#10, payment_type#11, fare_amount#12, extra#13, mta_tax#14, tip_amount#15, tolls_amount#16, total_amount#17]
From the output we can tell that there are 14,092,413 records in the DataFrame and that it was read into 4 partitions.
When creating the SparkSession, the default parallelism is 1 (as I am using local as the master). However, 4 partitions are created because the file size is about 441MB (3 * 128MB + 57MB) and each partition can hold at most 128MB by default.
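As a rough sanity check, the partition count for one large splittable file can be estimated by dividing the file size by spark.sql.files.maxPartitionBytes. Note that Spark's actual split-size calculation also takes spark.sql.files.openCostInBytes and the parallelism into account, so treat this as an approximation only:

import math

# Simplified estimate: file size divided by the maximum partition size.
file_size = 461966527            # bytes, from the S3 listing
max_partition_bytes = 134217728  # default 128MB

print(math.ceil(file_size / max_partition_bytes))  # 4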
Reduce partitions
To reduce the partition count, we can increase the value of property spark.sql.files.maxPartitionBytes. The following code snippet increases it to 384MB (402653184 bytes).
conf = SparkConf().setAppName(app_name)\
    .setMaster(master) \
    .set('spark.sql.files.maxPartitionBytes', '402653184')

spark = SparkSession.builder.config(conf=conf) \
    .getOrCreate()
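As a side note, since spark.sql.files.maxPartitionBytes is a runtime SQL configuration, it can also be changed on an existing session without rebuilding SparkConf; a minimal sketch, assuming spark is the session created earlier and the new value is set before the file is read:

# Set the limit to 384MB; the new value applies to reads planned after this call.
spark.conf.set('spark.sql.files.maxPartitionBytes', '402653184')
df = spark.read.format('parquet').load(file_path)
print(df.rdd.getNumPartitions())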
Either way, running the same code again now produces just 2 partitions (one reads 384MB of data and the other reads the rest).
Output:
14092413
2
== Physical Plan ==
* ColumnarToRow (2)
+- Scan parquet  (1)

(1) Scan parquet
Output [18]: [vendor_id#0, pickup_at#1, dropoff_at#2, passenger_count#3, trip_distance#4, pickup_longitude#5, pickup_latitude#6, rate_code_id#7, store_and_fwd_flag#8, dropoff_longitude#9, dropoff_latitude#10, payment_type#11, fare_amount#12, extra#13, mta_tax#14, tip_amount#15, tolls_amount#16, total_amount#17]
Batched: true
Location: InMemoryFileIndex [file:/mnt/e/Downloads/data.parquet]
ReadSchema: struct<vendor_id:string,pickup_at:timestamp,dropoff_at:timestamp,passenger_count:tinyint,trip_distance:float,pickup_longitude:float,pickup_latitude:float,rate_code_id:int,store_and_fwd_flag:string,dropoff_longitude:float,dropoff_latitude:float,payment_type:string,fare_amount:float,extra:float,mta_tax:float,tip_amount:float,tolls_amount:float,total_amount:float>

(2) ColumnarToRow [codegen id : 1]
Input [18]: [vendor_id#0, pickup_at#1, dropoff_at#2, passenger_count#3, trip_distance#4, pickup_longitude#5, pickup_latitude#6, rate_code_id#7, store_and_fwd_flag#8, dropoff_longitude#9, dropoff_latitude#10, payment_type#11, fare_amount#12, extra#13, mta_tax#14, tip_amount#15, tolls_amount#16, total_amount#17]
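To see how the records are spread across the two partitions, one option is to count records per partition. The sketch below is illustrative (the exact counts are not from the run above):

# Count the records in each partition of the DataFrame's underlying RDD.
partition_counts = df.rdd.mapPartitions(lambda rows: [sum(1 for _ in rows)]).collect()
print(partition_counts)  # two numbers that sum to 14092413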
I hope you learned something from this article today! If you have any questions, feel free to post a comment.