Use Spark SQL Partitioning Hints

In Spark or PySpark, we can use the coalesce and repartition functions to change the number of partitions of a DataFrame. In the article Spark repartition vs. coalesce, I summarized the key differences between the two. But if we are using Spark SQL directly, how do we repartition the data? The answer is partitioning hints.

About partitioning hints

Partitioning hints were first introduced in Spark 2.4, which added support for the COALESCE and REPARTITION hints. As of Spark 3.3.0, there are four hint types that can be used in Spark SQL queries.

Partitioning hint        PySpark DataFrame API
COALESCE                 DataFrame.coalesce(numPartitions)
REPARTITION              DataFrame.repartition(numPartitions, *cols)
REPARTITION_BY_RANGE     DataFrame.repartitionByRange(numPartitions, *cols)
REBALANCE                N/A (no corresponding DataFrame or Dataset API)
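
All four hints share the same syntax: a /*+ ... */ comment placed right after the SELECT keyword. The following sketch shows each form (t1 and column c are placeholders):

# Hint syntax sketch: a /*+ ... */ comment right after SELECT
spark.sql("SELECT /*+ COALESCE(3) */ * FROM t1")
spark.sql("SELECT /*+ REPARTITION(3, c) */ * FROM t1")
spark.sql("SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t1")
spark.sql("SELECT /*+ REBALANCE(c) */ * FROM t1")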

COALESCE

The COALESCE hint reduces the number of partitions to the specified value. It takes a partition number as its only parameter and behaves like the PySpark DataFrame coalesce API:

def coalesce(numPartitions)

Example

The following code snippet creates a DataFrame in memory and then uses the COALESCE partitioning hint to create a new DataFrame df with the partition count reduced from 8 to 5.

from pyspark.sql import SparkSession
from pyspark import SparkConf

app_name = "PySpark - Partitioning Hints Example"
master = "local[8]"

conf = SparkConf().setAppName(app_name) \
    .setMaster(master)

spark = SparkSession.builder.config(conf=conf) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# Create a small DataFrame (8 partitions, matching local[8]) and
# register it as a temp view so it can be queried with Spark SQL.
df1 = spark.createDataFrame([[1, 'A'], [2, 'B']], ['id', 'attr'])
df1.createOrReplaceTempView("t1")

# COALESCE hint: reduce the partition count from 8 to 5.
df = spark.sql("SELECT /*+ COALESCE(5) */ * FROM t1")
df.explain(mode='formatted')
print(df.rdd.getNumPartitions())

Output:

== Physical Plan ==
Coalesce (2)
+- * Scan ExistingRDD (1)

(1) Scan ExistingRDD [codegen id : 1]
Output [2]: [id#0L, attr#1]
Arguments: [id#0L, attr#1], MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)
(2) Coalesce
Input [2]: [id#0L, attr#1]
Arguments: 5

5

From the execution plan, we can tell that the COALESCE hint took effect.
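
For comparison, the equivalent DataFrame API call is a one-liner on the df1 defined above:

# Same effect as the COALESCE(5) hint, expressed via the DataFrame API
df = df1.coalesce(5)
print(df.rdd.getNumPartitions())  # 5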

REPARTITION

The REPARTITION hint repartitions the data to the specified number of partitions using the specified partitioning expressions. It takes a partition number, column names, or both as parameters. For details about the repartition API, refer to Spark repartition vs. coalesce.

Example

Let's change the above code snippet slightly to use the REPARTITION hint.

df = spark.sql("SELECT /*+ REPARTITION(5, attr) */ * FROM t1")

This hints Spark to repartition the DataFrame into 5 partitions, with column 'attr' as the partition key.

The output is now changed slightly:

== Physical Plan ==
AdaptiveSparkPlan (3)
+- Exchange (2)
   +- Scan ExistingRDD (1)

(1) Scan ExistingRDD
Output [2]: [id#0L, attr#1]
Arguments: [id#0L, attr#1], MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)
(2) Exchange
Input [2]: [id#0L, attr#1]
Arguments: hashpartitioning(attr#1, 5), REPARTITION_BY_NUM, [id=#9]
(3) AdaptiveSparkPlan
Output [2]: [id#0L, attr#1]
Arguments: isFinalPlan=false

5

In step (2) of the plan, the hashpartitioning partitioner is used, as specified by the hint.
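
For comparison, the same repartitioning via the DataFrame API (a minimal sketch using df1 from above):

# Hash-partition into exactly 5 partitions by column 'attr'
df = df1.repartition(5, 'attr')
print(df.rdd.getNumPartitions())  # 5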

REPARTITION_BY_RANGE

The REPARTITION_BY_RANGE hint can be used to repartition to the specified number of partitions using the specified partitioning expressions. It takes column names and an optional partition number as parameters.

Example

Now let's try this partitioning hint by changing the SELECT statement slightly:

df = spark.sql("SELECT /*+ REPARTITION_BY_RANGE(5, attr) */ * FROM t1")

Output:

== Physical Plan ==
AdaptiveSparkPlan (3)
+- Exchange (2)
   +- Scan ExistingRDD (1)

(1) Scan ExistingRDD
Output [2]: [id#0L, attr#1]
Arguments: [id#0L, attr#1], MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)
(2) Exchange
Input [2]: [id#0L, attr#1]
Arguments: rangepartitioning(attr#1 ASC NULLS FIRST, 5), REPARTITION_BY_NUM, [id=#9]
(3) AdaptiveSparkPlan
Output [2]: [id#0L, attr#1]
Arguments: isFinalPlan=false

2

This time the rangepartitioning partitioner is used instead. Because the DataFrame only has a few records, the final number of partitions is 2 instead of 5.
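
The equivalent DataFrame API call is repartitionByRange (a sketch using df1 from above); it samples the data to compute range boundaries, so it can likewise return fewer partitions than requested:

# Range-partition by 'attr' into at most 5 partitions
df = df1.repartitionByRange(5, 'attr')
print(df.rdd.getNumPartitions())  # 2 for this tiny dataset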

REBALANCE

Unlike the other three, REBALANCE can only be used as a hint; there is no corresponding Spark Dataset or PySpark DataFrame API. It rebalances the query result output partitions so that every partition is of a reasonable size (not too small and not too big). It can also take column names as parameters, in which case Spark tries its best to partition the query result by those columns. This is best effort: if there are skews, Spark will split the skewed partitions to keep them from getting too big. The hint is ignored if AQE (Adaptive Query Execution) is not enabled; AQE has been enabled by default since Spark 3.2.0.
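
Since the hint depends on AQE, it is worth checking the setting first; a minimal sketch:

# REBALANCE is silently ignored when AQE is off, so verify the flag
print(spark.conf.get("spark.sql.adaptive.enabled"))
# Enable it explicitly if needed (default is true since Spark 3.2.0)
spark.conf.set("spark.sql.adaptive.enabled", "true")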

This hint is very useful when you need to write the result of a query to a table, as it helps avoid files that are too small or too large.
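
For example, writing a rebalanced result to a table might look like this (target_tbl is a hypothetical existing table):

# Rebalance before writing so output files are neither tiny nor huge
spark.sql("INSERT OVERWRITE TABLE target_tbl SELECT /*+ REBALANCE */ * FROM t1")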

Example

df = spark.sql("SELECT /*+ REBALANCE */ * FROM t1")

In this code snippet, we use the REBALANCE hint directly, without any parameters.

The extended query execution plan (produced with df.explain(mode='extended')) looks like the following:

== Parsed Logical Plan ==
'UnresolvedHint REBALANCE
+- 'Project [*]
   +- 'UnresolvedRelation [t1], [], false

== Analyzed Logical Plan ==
id: bigint, attr: string
RebalancePartitions
+- Project [id#0L, attr#1]
   +- SubqueryAlias t1
      +- View (`t1`, [id#0L,attr#1])
         +- LogicalRDD [id#0L, attr#1], false

== Optimized Logical Plan ==
RebalancePartitions
+- LogicalRDD [id#0L, attr#1], false

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(200), REBALANCE_PARTITIONS_BY_NONE, [id=#9]
   +- Scan ExistingRDD[id#0L,attr#1]

1

As we can tell, the RoundRobinPartitioning partitioner is used with the default number of shuffle partitions (200). At runtime, AQE coalesces these tiny partitions, which is why getNumPartitions() returns 1.
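
REBALANCE can also take columns, as mentioned earlier. A sketch reusing t1 from above:

# Best effort: partition the result by 'attr', splitting skewed partitions
df = spark.sql("SELECT /*+ REBALANCE(attr) */ * FROM t1")
df.explain(mode='formatted')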

About Spark partitioning functions

In the examples above, I mentioned several physical partitioners. Refer to the diagram below to learn more.

[Diagram: Spark partitioning functions]
