Differences between spark.sql.shuffle.partitions and spark.default.parallelism
In Spark, there are two commonly used parallelism configurations: spark.sql.shuffle.partitions and spark.default.parallelism. This article summarizes the key differences between them.
About spark.default.parallelism
The configuration spark.default.parallelism specifies the default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when the user does not set it explicitly.
The default value of this configuration depends on the cluster type and the operation:
- For shuffle operations like reduceByKey and join, the default is the largest number of partitions in any upstream RDD.
- For operations like parallelize with no parent RDDs, it depends on the cluster manager:
  - Local mode: the number of cores on the local machine
  - Mesos fine-grained mode: 8
  - Others: the total number of cores on all executor nodes, or 2, whichever is larger
This configuration is also used as the minimum number of split file partitions when reading files, if spark.sql.files.minPartitionNum is not specified.
About spark.sql.shuffle.partitions
This configuration specifies the default number of partitions used when shuffling data for aggregations or joins. The default value is 200.
When the repartition function is called without an explicit partition number, this configuration value is used as the default.
Key differences
The configuration spark.default.parallelism is mainly used when working directly with RDDs (not DataFrames), while spark.sql.shuffle.partitions is used by the Spark SQL engine.
Configure these two items
Depending on how you run your code, there are different approaches to setting these two configuration items.
Via SparkSession.conf.set
We can set the configuration directly:
spark.conf.set('spark.sql.shuffle.partitions', '100')
spark.conf.set('spark.default.parallelism', '100')
Note that spark.default.parallelism is read when the SparkContext is created, so setting it this way at runtime may have no effect; spark.sql.shuffle.partitions can be changed at runtime.
We can also specify it before creating the SparkSession object:
conf = SparkConf().setAppName(app_name) \
    .setMaster(master) \
    .set('spark.default.parallelism', '100') \
    .set('spark.sql.shuffle.partitions', '100')
spark = SparkSession.builder.config(conf=conf) \
    .getOrCreate()
Via spark-submit command
If you don't want to hard-code the configuration items, you can add them when submitting the Spark application:
spark-submit --conf spark.sql.shuffle.partitions=100 --conf spark.default.parallelism=100
Examples
Now let's look at some examples of these two configuration items.
The following example reads JSON data from my local file system:
from pyspark.sql import SparkSession
from pyspark import SparkConf
app_name = "PySpark Example"
master = "local[2]"
conf = SparkConf().setAppName(app_name)\
.setMaster(master) \
.set('spark.default.parallelism', '100')
spark = SparkSession.builder.config(conf=conf) \
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
df = spark.read.format('json').load(
'file:///home/kontext/pyspark-examples/data/json-example')
print(df.rdd.getNumPartitions())
The printed number of partitions is 1 because there is only one small partition file in my local file system.
If we don't specify spark.default.parallelism, the default parallelism will be 2 (the number of cores specified by master local[2]).
Let's use the parallelize function:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StructField, StructType
from pyspark import SparkConf
app_name = "PySpark Example"
master = "local[2]"
conf = SparkConf().setAppName(app_name) \
    .setMaster(master) \
    .set('spark.default.parallelism', '100')
spark = SparkSession.builder.config(conf=conf) \
    .getOrCreate()
spark.sparkContext.setLogLevel("WARN")
schema = StructType([StructField('num', IntegerType(), False)])
df = spark.sparkContext.parallelize(range(1, 100, 1)).toDF(schema=schema)
print(df.rdd.getNumPartitions())
This time the number of partitions is 100, as we specified it in the configuration when creating the SparkSession object.
Now let's look at an aggregation example that takes spark.sql.shuffle.partitions into account:
from pyspark.sql import SparkSession
from pyspark import SparkConf
app_name = "PySpark Example"
master = "local[*]"
conf = SparkConf().setAppName(app_name) \
    .setMaster(master) \
    .set('spark.sql.shuffle.partitions', '100')
spark = SparkSession.builder.config(conf=conf) \
    .getOrCreate()
spark.sparkContext.setLogLevel("WARN")
data = [
    [101, 56, 'A'],
    [101, 67, 'B'],
    [102, 70, 'C'],
    [103, 93, 'D'],
    [104, 70, 'E']
]
df = spark.createDataFrame(data, ['ID', 'Value', 'Category'])
df_agg = df.groupBy('Category').sum('Value')
df_agg.explain()
During the data shuffling stage, 100 partitions are created. This is also reflected in the execution plan (Exchange hashpartitioning(Category#2, 100)):
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
*(2) HashAggregate(keys=[Category#2], functions=[sum(Value#1L)])
+- AQEShuffleRead coalesced
+- ShuffleQueryStage 0
+- Exchange hashpartitioning(Category#2, 100), ENSURE_REQUIREMENTS, [id=#67]
+- *(1) HashAggregate(keys=[Category#2], functions=[partial_sum(Value#1L)])
+- *(1) Project [Value#1L, Category#2]
+- *(1) Scan ExistingRDD[ID#0L,Value#1L,Category#2]
+- == Initial Plan ==
HashAggregate(keys=[Category#2], functions=[sum(Value#1L)])
+- Exchange hashpartitioning(Category#2, 100), ENSURE_REQUIREMENTS, [id=#57]
+- HashAggregate(keys=[Category#2], functions=[partial_sum(Value#1L)])
+- Project [Value#1L, Category#2]
+- Scan ExistingRDD[ID#0L,Value#1L,Category#2]
Change partition numbers of DataFrame
We can use the repartition or coalesce functions to change the number of partitions of a DataFrame. Refer to this article for more details:
Spark repartition vs. coalesce
I hope you now have a good understanding of these two configurations. Let me know if you have any questions.