Spark spark.sql.files.maxPartitionBytes Explained in Detail

Kontext Kontext event 2022-08-21 visibility 7,661
more_vert

Spark configuration property spark.sql.files.maxPartitionBytes is used to specify the maximum number of bytes to pack into a single partition when reading from file sources like Parquet, JSON, ORC, CSV, etc. 

Default value

The default value for this property is 134217728 (128MB). If the input file's blocks or single partition file are bigger than 128MB, Spark will read one part/block into multiple partitions.

Let's use one example to look into details.

Read single partition file

For demo purpose, I will utilize the public data from New York City taxi trip record data. The data is available in public S3 buckets. For example, https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com.

The file I am downloading is from Jan 2009.

<Contents>
<Key>2009/01/data.parquet</Key>
<LastModified>2020-01-17T16:52:57.000Z</LastModified>
<ETag>"880538d41446f7b8083573b44f0b8b37-27"</ETag>
<Size>461966527</Size>
<StorageClass>STANDARD</StorageClass>
</Contents>

The file size is 461966527 bytes (441MB). This is bigger than the default partition size.

Now let's read it using local PySpark.

Read parquet file in PySpark

Create a PySpark script with the following content:

from pyspark.sql import SparkSession
from pyspark import SparkConf

app_name = "PySpark Example"
master = "local"

conf = SparkConf().setAppName(app_name)\
    .setMaster(master)

spark = SparkSession.builder.config(conf=conf) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

file_path = 'file:///mnt/e/Downloads/data.parquet'

df = spark.read.format('parquet').load(file_path)

print(df.count())

print(df.rdd.getNumPartitions())

df.explain(mode='formatted')

Output:

14092413
4
== Physical Plan ==
* ColumnarToRow (2)
+- Scan parquet  (1)


(1) Scan parquet 
Output [18]: [vendor_id#0, pickup_at#1, dropoff_at#2, passenger_count#3, trip_distance#4, pickup_longitude#5, pickup_latitude#6, rate_code_id#7, store_and_fwd_flag#8, dropoff_longitude#9, dropoff_latitude#10, payment_type#11, fare_amount#12, extra#13, mta_tax#14, tip_amount#15, tolls_amount#16, total_amount#17]
Batched: true
Location: InMemoryFileIndex [file:/mnt/e/Downloads/data.parquet]
ReadSchema: struct<vendor_id:string,pickup_at:timestamp,dropoff_at:timestamp,passenger_count:tinyint,trip_distance:float,pickup_longitude:float,pickup_latitude:float,rate_code_id:int,store_and_fwd_flag:string,dropoff_longitude:float,dropoff_latitude:float,payment_type:string,fare_amount:float,extra:float,mta_tax:float,tip_amount:float,tolls_amount:float,total_amount:float>

(2) ColumnarToRow [codegen id : 1]
Input [18]: [vendor_id#0, pickup_at#1, dropoff_at#2, passenger_count#3, trip_distance#4, pickup_longitude#5, pickup_latitude#6, rate_code_id#7, store_and_fwd_flag#8, dropoff_longitude#9, dropoff_latitude#10, payment_type#11, fare_amount#12, extra#13, mta_tax#14, tip_amount#15, tolls_amount#16, total_amount#17]

From the output we can tell that there are 14,092,413 records in the DataFrame and it was read into 4 partitions.

When creating the SparkSession, the default parallelism is 1 (as I am using localhost as master). However 4 partitions are created because the file size is 441MB (3*128MB + 57MB).

Reduce partitions

To reduce the partition count, we can increase the value for property spark.sql.files.maxPartitionBytes.

The following code snippet increase is to 384MB.

conf = SparkConf().setAppName(app_name)\
    .setMaster(master) \
    .set('spark.sql.files.maxPartitionBytes','402653184')

spark = SparkSession.builder.config(conf=conf) \
    .getOrCreate()

Now running the same code, the output will be just 2 partitions (one reads 384MB data and the other reads the rest of the data).

Output:

14092413
2
== Physical Plan ==
* ColumnarToRow (2)
+- Scan parquet  (1)


(1) Scan parquet 
Output [18]: [vendor_id#0, pickup_at#1, dropoff_at#2, passenger_count#3, trip_distance#4, pickup_longitude#5, pickup_latitude#6, rate_code_id#7, store_and_fwd_flag#8, dropoff_longitude#9, dropoff_latitude#10, payment_type#11, fare_amount#12, extra#13, mta_tax#14, tip_amount#15, tolls_amount#16, total_amount#17]
Batched: true
Location: InMemoryFileIndex [file:/mnt/e/Downloads/data.parquet]
ReadSchema: struct<vendor_id:string,pickup_at:timestamp,dropoff_at:timestamp,passenger_count:tinyint,trip_distance:float,pickup_longitude:float,pickup_latitude:float,rate_code_id:int,store_and_fwd_flag:string,dropoff_longitude:float,dropoff_latitude:float,payment_type:string,fare_amount:float,extra:float,mta_tax:float,tip_amount:float,tolls_amount:float,total_amount:float>

(2) ColumnarToRow [codegen id : 1]
Input [18]: [vendor_id#0, pickup_at#1, dropoff_at#2, passenger_count#3, trip_distance#4, pickup_longitude#5, pickup_latitude#6, rate_code_id#7, store_and_fwd_flag#8, dropoff_longitude#9, dropoff_latitude#10, payment_type#11, fare_amount#12, extra#13, mta_tax#14, tip_amount#15, tolls_amount#16, total_amount#17]

I hope you learned something from this article today! If you have any questions, feel free to post a comment.

More from Kontext
comment Comments
No comments yet.

Please log in or register to comment.

account_circle Log in person_add Register

Log in with external accounts