Differences between spark.sql.shuffle.partitions and spark.default.parallelism

Kontext · 8/20/2022

In Spark, there are two commonly used parallelism configurations: spark.sql.shuffle.partitions and spark.default.parallelism. This article summarizes the key differences between them.

About spark.default.parallelism

Configuration spark.default.parallelism specifies the default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by the user.

The default value for this configuration item itself depends on the cluster type and operations:

  • For shuffle operations like reduceByKey and join, the default value is the largest number of partitions in an upstream RDD.
  • For operations like parallelize with no parent RDDs, it depends on the cluster manager:
    • Local mode: number of cores on the local machine
    • Mesos fine grained mode: 8
    • Others: total number of cores on all executor nodes or 2, whichever is larger
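The resolution rules above can be sketched in plain Python. This is an illustrative model only, not part of the PySpark API; the helper name and parameters are hypothetical:

```python
# Illustrative sketch of how the default partition count is resolved
# (hypothetical helper, NOT a real PySpark function).

def resolve_default_parallelism(parent_partition_counts, cluster_mode,
                                total_cores, configured=None):
    """Model the default-parallelism rules described above.

    parent_partition_counts: partition counts of upstream RDDs ([] if none)
    cluster_mode: 'local', 'mesos-fine-grained', or 'other'
    total_cores: cores on the local machine or across all executors
    configured: an explicit spark.default.parallelism value, if any
    """
    if configured is not None:
        # A user-set spark.default.parallelism takes precedence.
        return configured
    if parent_partition_counts:
        # Shuffle ops like reduceByKey/join: largest upstream partition count.
        return max(parent_partition_counts)
    if cluster_mode == 'local':
        return total_cores
    if cluster_mode == 'mesos-fine-grained':
        return 8
    # Other cluster managers: total executor cores, but at least 2.
    return max(total_cores, 2)
```

For example, joining a 4-partition RDD with a 10-partition RDD defaults to 10 output partitions, while parallelize on a one-core cluster still gets 2.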

This configuration is also used as the minimum number of split file partitions when reading files, if configuration spark.sql.files.minPartitionNum is not specified.

About spark.sql.shuffle.partitions

This configuration specifies the default number of partitions to use when shuffling data for aggregations or joins in Spark SQL. The default value is 200.
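To see what those shuffle partitions mean, here is a minimal sketch of hash partitioning in plain Python. It assumes the common `hash(key) mod numPartitions` scheme for illustration; Spark's actual implementation uses Murmur3 hashing, not Python's `hash`:

```python
# Minimal sketch: each row's key is hashed, and the result modulo the
# partition count picks the target shuffle partition.

NUM_SHUFFLE_PARTITIONS = 200  # mirrors the spark.sql.shuffle.partitions default

def shuffle_partition_for(key, num_partitions=NUM_SHUFFLE_PARTITIONS):
    return hash(key) % num_partitions

rows = [('A', 56), ('B', 67), ('C', 70), ('A', 93)]

# Rows with the same key always land in the same partition, so a
# per-key aggregation can then run locally within each partition.
partition_ids = [shuffle_partition_for(k) for k, _ in rows]
```

Because rows sharing a key map to the same partition, each of the 200 tasks can compute its group sums independently after the shuffle.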

When calling the repartition function without specifying a partition number, this configuration value is used as the default.

Key differences

Configuration spark.default.parallelism is mainly used when working directly with RDDs (not DataFrames), while spark.sql.shuffle.partitions is used by the Spark SQL engine.

Configure these two items

Depending on how you run your code, there are different approaches to setting these two configuration items.

Via SparkSession.conf.set

We can set the configuration directly on an existing session. Note that spark.default.parallelism is read when the SparkContext is created, so setting it this way may not take effect for an already-started application:

spark.conf.set('spark.sql.shuffle.partitions','100')
spark.conf.set('spark.default.parallelism','100')

We can also specify them before creating the SparkSession object:

conf = SparkConf().setAppName(app_name)\
    .setMaster(master) \
    .set('spark.default.parallelism', '100') \
    .set('spark.sql.shuffle.partitions', '100')

spark = SparkSession.builder.config(conf=conf) \
    .getOrCreate()

Via spark-submit command

If you don't want to hard-code the configuration items, you can add them when submitting the Spark application:

spark-submit --conf spark.sql.shuffle.partitions=100 --conf spark.default.parallelism=100
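For a complete invocation, the flags go before the application entry point. The script name below is a placeholder for your own application file:

```shell
# my_app.py is a hypothetical entry point; replace it with your script.
spark-submit \
  --conf spark.sql.shuffle.partitions=100 \
  --conf spark.default.parallelism=100 \
  my_app.py
```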

Examples

Now let's look at some examples of these two configuration items in action.

The following example reads JSON data in my local system:

from pyspark.sql import SparkSession
from pyspark import SparkConf

app_name = "PySpark Example"
master = "local[2]"

conf = SparkConf().setAppName(app_name)\
    .setMaster(master) \
    .set('spark.default.parallelism', '100')

spark = SparkSession.builder.config(conf=conf) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

df = spark.read.format('json').load(
    'file:///home/kontext/pyspark-examples/data/json-example')
print(df.rdd.getNumPartitions())

The printed number of partitions is 1 because there is only one small partition file in my local system.

If we don't specify spark.default.parallelism, the default parallelism will be 2 (the number of cores specified by local[2]).

Now let's use the parallelize function:

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StructField, StructType
from pyspark import SparkConf

app_name = "PySpark Example"
master = "local[2]"

conf = SparkConf().setAppName(app_name)\
    .setMaster(master) \
    .set('spark.default.parallelism', '100')

spark = SparkSession.builder.config(conf=conf) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

schema = StructType([StructField('num', IntegerType(), False)])
df = spark.sparkContext.parallelize(range(1, 100, 1)).toDF(schema=schema)
print(df.rdd.getNumPartitions())

This time the number of partitions will be 100, as we specified it in the configuration when creating the SparkSession object.

Now let's look at an example of an aggregation, which takes spark.sql.shuffle.partitions into consideration.

from pyspark.sql import SparkSession
from pyspark import SparkConf

app_name = "PySpark Example"
master = "local[*]"

conf = SparkConf().setAppName(app_name)\
    .setMaster(master) \
    .set('spark.sql.shuffle.partitions', '100')

spark = SparkSession.builder.config(conf=conf) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

data = [
    [101, 56, 'A'],
    [101, 67, 'B'],
    [102, 70, 'C'],
    [103, 93, 'D'],
    [104, 70, 'E']
]

df = spark.createDataFrame(data, ['ID', 'Value', 'Category'])

df_agg = df.groupBy('Category').sum('Value')

df_agg.explain()

During the data shuffle stage, 100 partitions will be created. This is also reflected in the physical plan (Exchange hashpartitioning(Category#2, 100)):

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[Category#2], functions=[sum(Value#1L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 0
         +- Exchange hashpartitioning(Category#2, 100), ENSURE_REQUIREMENTS, [id=#67]
            +- *(1) HashAggregate(keys=[Category#2], functions=[partial_sum(Value#1L)])
               +- *(1) Project [Value#1L, Category#2]
                  +- *(1) Scan ExistingRDD[ID#0L,Value#1L,Category#2]
+- == Initial Plan ==
   HashAggregate(keys=[Category#2], functions=[sum(Value#1L)])
   +- Exchange hashpartitioning(Category#2, 100), ENSURE_REQUIREMENTS, [id=#57]
      +- HashAggregate(keys=[Category#2], functions=[partial_sum(Value#1L)])
         +- Project [Value#1L, Category#2]
            +- Scan ExistingRDD[ID#0L,Value#1L,Category#2]

Change partition numbers of DataFrame

We can use the repartition or coalesce functions to change the number of partitions of a DataFrame. Refer to this article for more details:

Spark repartition vs. coalesce

I hope you now have a good understanding of these two configurations. Let me know if you have any questions.
