Differences between spark.sql.shuffle.partitions and spark.default.parallelism

Date: 2022-08-20

In Spark, there are two commonly used parallelism configurations: spark.sql.shuffle.partitions and spark.default.parallelism. This article summarizes the key differences between them.

About spark.default.parallelism

The spark.default.parallelism configuration specifies the default number of partitions in RDDs returned by transformations such as join, reduceByKey, and parallelize when the user does not set a partition count explicitly.

The default value of this configuration item depends on the operation and the cluster manager:

  • For shuffle operations like reduceByKey and join, the default value is the largest number of partitions in a parent RDD.
  • For operations like parallelize with no parent RDDs, it depends on the cluster manager:
    • Local mode: number of cores on the local machine
    • Mesos fine grained mode: 8
    • Others: total number of cores on all executor nodes or 2, whichever is larger
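
The fallback rules for operations with no parent RDD can be summarized in a few lines of plain Python. This is only a model of the documented behavior, not Spark's own code; the function name and the mode labels are hypothetical and chosen for illustration.

```python
from typing import Optional


def default_parallelism(mode: str, total_cores: int,
                        configured: Optional[int] = None) -> int:
    """Model the default used by operations with no parent RDD (e.g. parallelize).

    `mode` uses illustrative labels, not real Spark master URLs:
    "local", "mesos-fine-grained", or anything else for other cluster managers.
    """
    if configured is not None:
        # An explicit spark.default.parallelism always wins.
        return configured
    if mode == "local":
        return total_cores
    if mode == "mesos-fine-grained":
        return 8
    return max(total_cores, 2)


print(default_parallelism("local", 2))       # 2
print(default_parallelism("other", 1))       # 2
print(default_parallelism("local", 2, 100))  # 100
```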

This configuration is also used as the minimum number of split file partitions when reading files, if spark.sql.files.minPartitionNum is not specified.

About spark.sql.shuffle.partitions

This configuration specifies the default number of partitions used when shuffling data for joins or aggregations in Spark SQL. The default value is 200.

When the DataFrame repartition function is called with column expressions only and no explicit partition count, this configuration value is used as the partition count.

Key differences

spark.default.parallelism mainly applies when working directly with RDDs (not DataFrames), while spark.sql.shuffle.partitions is used by the Spark SQL engine, that is, by DataFrame, Dataset, and SQL operations that shuffle data.

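Configure these two items

Depending on how you run your code, there are different ways to set these two configuration items.

Via SparkSession.conf.set

We can set spark.sql.shuffle.partitions directly on a running session:

      spark.conf.set('spark.sql.shuffle.partitions','100')

spark.default.parallelism is read when the SparkContext starts, so setting it through spark.conf.set on an existing session does not change the parallelism of the running context. Set it before the session is created instead.

We can specify both items before creating the SparkSession object: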

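      from pyspark import SparkConf
      from pyspark.sql import SparkSession

      app_name = "PySpark Example"
      master = "local[2]"

      conf = SparkConf().setAppName(app_name) \
          .setMaster(master) \
          .set('spark.default.parallelism', '100') \
          .set('spark.sql.shuffle.partitions', '100')

      spark = SparkSession.builder.config(conf=conf) \
          .getOrCreate()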

Via spark-submit command

If you don't want to hard-code the configuration items, you can pass them when submitting the Spark application:

      spark-submit --conf spark.sql.shuffle.partitions=100 --conf spark.default.parallelism=100
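
When the same settings are launched from a script, the command line can be assembled programmatically. The sketch below is plain Python; the helper name build_submit_command and the application file app.py are hypothetical and only serve as an illustration.

```python
import shlex
from typing import Dict, List


def build_submit_command(confs: Dict[str, object], app_file: str) -> List[str]:
    """Return a spark-submit argument list with one --conf pair per setting."""
    command = ["spark-submit"]
    for key, value in confs.items():
        command += ["--conf", f"{key}={value}"]
    command.append(app_file)
    return command


command = build_submit_command(
    {"spark.sql.shuffle.partitions": 100, "spark.default.parallelism": 100},
    "app.py",  # hypothetical application file
)
print(shlex.join(command))
```

The returned list can be passed to subprocess.run as is, which avoids quoting problems in a shell.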

Examples

Now let's look at some examples of these two configuration items.

The following example reads JSON data from my local file system:

      from pyspark.sql import SparkSession
      from pyspark import SparkConf
      
      app_name = "PySpark Example"
      master = "local[2]"
      
      conf = SparkConf().setAppName(app_name)\
          .setMaster(master) \
          .set('spark.default.parallelism', '100')
      
      spark = SparkSession.builder.config(conf=conf) \
          .getOrCreate()
      
      spark.sparkContext.setLogLevel("WARN")
      
      df = spark.read.format('json').load(
          'file:///home/kontext/pyspark-examples/data/json-example')
      print(df.rdd.getNumPartitions())
      

The printed number of partitions is 1 because there is only one small file in the source directory, even though spark.default.parallelism is set to 100.

If we don't specify spark.default.parallelism, the default parallelism will be 2, because the master is local[2].
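
Why does a tiny file end up in a single partition regardless of the parallelism setting? The sketch below models, in plain Python, how Spark 3.x is understood to choose the maximum split size for file sources. It assumes the default values of spark.sql.files.maxPartitionBytes (128 MB) and spark.sql.files.openCostInBytes (4 MB), and the partition count for a single file is only an approximation. The function names are hypothetical.

```python
import math

MAX_PARTITION_BYTES = 128 * 1024 * 1024  # assumed default of spark.sql.files.maxPartitionBytes
OPEN_COST_BYTES = 4 * 1024 * 1024        # assumed default of spark.sql.files.openCostInBytes


def max_split_bytes(file_sizes, min_partition_num):
    """Approximate the maximum split size for a set of input files."""
    total_bytes = sum(size + OPEN_COST_BYTES for size in file_sizes)
    bytes_per_core = total_bytes // min_partition_num
    return min(MAX_PARTITION_BYTES, max(OPEN_COST_BYTES, bytes_per_core))


def estimate_partitions(file_size, min_partition_num):
    """Rough partition count for one splittable file (an approximation)."""
    split = max_split_bytes([file_size], min_partition_num)
    return max(1, math.ceil(file_size / split))


# A 1 KB JSON file, with 100 as the minimum partition number
print(estimate_partitions(1024, 100))
# A 1 GiB file with the same setting
print(estimate_partitions(1024 ** 3, 100))
```

In this model the split size never drops below the open cost, so a file smaller than that cost always lands in one partition. The minimum partition number only starts to matter for larger inputs.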

Now let's use the parallelize function:

      from pyspark.sql import SparkSession
      from pyspark.sql.types import IntegerType, StructField, StructType
      from pyspark import SparkConf
      
      app_name = "PySpark Example"
      master = "local[2]"
      
      conf = SparkConf().setAppName(app_name)\
          .setMaster(master) \
          .set('spark.default.parallelism', '100')
      
      spark = SparkSession.builder.config(conf=conf) \
          .getOrCreate()
      
      spark.sparkContext.setLogLevel("WARN")
      
      schema = StructType([StructField('num', IntegerType(), False)])
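      # Wrap each integer in a one-element tuple so that it matches the single-column schema
      df = spark.sparkContext.parallelize(range(1, 100, 1)) \
          .map(lambda n: (n,)).toDF(schema=schema)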
      print(df.rdd.getNumPartitions())

This time the number of partitions is 100, as specified in the configuration when creating the SparkSession object.
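
Note that range(1, 100, 1) holds 99 elements, so with 100 partitions at least one partition must be empty. The sketch below illustrates this in plain Python, assuming the data is cut into contiguous slices of near-equal size. The helper slice_sizes is hypothetical and is a model rather than Spark's implementation.

```python
def slice_sizes(num_elements: int, num_slices: int) -> list:
    """Sizes of contiguous, near-equal slices of a sequence."""
    bounds = [i * num_elements // num_slices for i in range(num_slices + 1)]
    return [bounds[i + 1] - bounds[i] for i in range(num_slices)]


sizes = slice_sizes(len(range(1, 100, 1)), 100)
print(len(sizes))      # 100 slices
print(sum(sizes))      # 99 elements in total
print(sizes.count(0))  # number of empty slices
```

A high default parallelism on a small dataset therefore creates tasks that have little or nothing to process.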

Now let's look at an aggregation example, which takes spark.sql.shuffle.partitions into consideration:

      from pyspark.sql import SparkSession
      from pyspark import SparkConf
      
      app_name = "PySpark Example"
      master = "local[*]"
      
      conf = SparkConf().setAppName(app_name)\
          .setMaster(master) \
          .set('spark.sql.shuffle.partitions', '100')
      
      spark = SparkSession.builder.config(conf=conf) \
          .getOrCreate()
      
      spark.sparkContext.setLogLevel("WARN")
      
      data = [
          [101, 56, 'A'],
          [101, 67, 'B'],
          [102, 70, 'C'],
          [103, 93, 'D'],
          [104, 70, 'E']
      ]
      
      df = spark.createDataFrame(data, ['ID', 'Value', 'Category'])
      
      df_agg = df.groupBy('Category').sum('Value')
      
      df_agg.explain()
      
      

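During the data shuffling stage, the data is hash-partitioned into 100 partitions. This is reflected in the execution plan (Exchange hashpartitioning(Category#2, 100)). Because adaptive query execution is enabled, small shuffle partitions are then coalesced when they are read (AQEShuffleRead coalesced):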

      == Physical Plan ==
      AdaptiveSparkPlan isFinalPlan=true
      +- == Final Plan ==
         *(2) HashAggregate(keys=[Category#2], functions=[sum(Value#1L)])
         +- AQEShuffleRead coalesced
            +- ShuffleQueryStage 0
               +- Exchange hashpartitioning(Category#2, 100), ENSURE_REQUIREMENTS, [id=#67]
                  +- *(1) HashAggregate(keys=[Category#2], functions=[partial_sum(Value#1L)])
                     +- *(1) Project [Value#1L, Category#2]
                        +- *(1) Scan ExistingRDD[ID#0L,Value#1L,Category#2]
      +- == Initial Plan ==
         HashAggregate(keys=[Category#2], functions=[sum(Value#1L)])
         +- Exchange hashpartitioning(Category#2, 100), ENSURE_REQUIREMENTS, [id=#57]
            +- HashAggregate(keys=[Category#2], functions=[partial_sum(Value#1L)])
               +- Project [Value#1L, Category#2]
                  +- Scan ExistingRDD[ID#0L,Value#1L,Category#2]
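
The hashpartitioning(Category#2, 100) step assigns each row to a partition based on a hash of its grouping key. The following plain-Python sketch illustrates the idea. It uses zlib.crc32 as a stand-in hash (Spark uses its own hash function), so the partition indexes will not match Spark's, and the helper partition_for is hypothetical.

```python
import zlib
from collections import Counter

NUM_SHUFFLE_PARTITIONS = 100  # value of spark.sql.shuffle.partitions in the example


def partition_for(key: str, num_partitions: int) -> int:
    """Map a key to a partition index with a stand-in hash (not Spark's)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


categories = ["A", "B", "C", "D", "E"]
rows_per_partition = Counter(
    partition_for(category, NUM_SHUFFLE_PARTITIONS) for category in categories
)
non_empty = len(rows_per_partition)
print(f"{non_empty} of {NUM_SHUFFLE_PARTITIONS} partitions receive data")
```

With only five distinct categories, at most five of the 100 shuffle partitions can receive data. This is the kind of situation in which coalescing small partitions, or lowering spark.sql.shuffle.partitions, helps.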

Change the number of partitions of a DataFrame

We can use the repartition or coalesce functions to change the number of partitions of a DataFrame. Refer to this article for more details:

Spark repartition vs. coalesce

I hope you now have a good understanding of these two configurations. Let me know if you have any questions.
