Use Spark SQL Partitioning Hints
In Spark or PySpark, we can use the coalesce and repartition functions to change the number of partitions of a DataFrame. In the article Spark repartition vs. coalesce, I summarized the key differences between the two. If we are using Spark SQL directly, how do we repartition the data? The answer is partitioning hints.
About partitioning hints
Partitioning hints were first introduced in Spark 2.4, which added support for the COALESCE and REPARTITION hints. As of Spark 3.3.0, four hint types can be used in Spark SQL queries.
Partitioning hints | PySpark DataFrame API |
COALESCE | DataFrame.coalesce(numPartitions) |
REPARTITION | DataFrame.repartition(numPartitions, *cols) |
REPARTITION_BY_RANGE | DataFrame.repartitionByRange(numPartitions, *cols) |
REBALANCE | N/A - no corresponding DataFrame or Dataset API |
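All four hints share the same comment-style syntax, /*+ NAME(args) */, placed right after SELECT. As a rough illustration of that syntax, the sketch below builds such query strings in plain Python. The helper with_partition_hint is hypothetical and not part of PySpark; the string it returns would still have to be passed to spark.sql.

```python
# Hypothetical helper (not part of PySpark): builds a SELECT statement
# that carries one of the partitioning hints listed in the table above.
VALID_HINTS = {"COALESCE", "REPARTITION", "REPARTITION_BY_RANGE", "REBALANCE"}


def with_partition_hint(table, hint, *args):
    """Return 'SELECT /*+ HINT(args) */ * FROM table' as a string."""
    name = hint.upper()
    if name not in VALID_HINTS:
        raise ValueError(f"Unsupported partitioning hint: {hint}")
    # Hints without parameters (e.g. REBALANCE) are written without parentheses.
    params = f"({', '.join(str(a) for a in args)})" if args else ""
    return f"SELECT /*+ {name}{params} */ * FROM {table}"


print(with_partition_hint("t1", "COALESCE", 5))
print(with_partition_hint("t1", "REPARTITION", 5, "attr"))
print(with_partition_hint("t1", "REBALANCE"))
```

Restricting the helper to a fixed set of names is a design choice for this sketch: a misspelled hint then fails early in Python rather than reaching Spark.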
COALESCE
The COALESCE hint reduces the number of partitions to the specified number. It takes a partition number as its parameter. It is similar to the coalesce API of the PySpark DataFrame:
def coalesce(numPartitions)
Example
The following code snippet creates a DataFrame in memory and then uses a partitioning hint to create a new DataFrame df with the number of partitions reduced from 8 to 5.
from pyspark.sql import SparkSession
from pyspark import SparkConf

app_name = "PySpark - Partitioning Hints Example"
master = "local[8]"
conf = SparkConf().setAppName(app_name)\
    .setMaster(master)
spark = SparkSession.builder.config(conf=conf) \
    .getOrCreate()
spark.sparkContext.setLogLevel("WARN")

# Create a small in-memory DataFrame and expose it to Spark SQL as view t1
df1 = spark.createDataFrame([[1, 'A'], [2, 'B']], ['id', 'attr'])
df1.createOrReplaceTempView("t1")

# Apply the COALESCE hint and inspect the plan and the partition count
df = spark.sql("SELECT /*+ COALESCE(5) */ * FROM t1")
df.explain(mode='formatted')
print(df.rdd.getNumPartitions())
Output:
== Physical Plan ==
Coalesce (2)
+- * Scan ExistingRDD (1)
(1) Scan ExistingRDD [codegen id : 1]
Output [2]: [id#0L, attr#1]
Arguments: [id#0L, attr#1], MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)
(2) Coalesce
Input [2]: [id#0L, attr#1]
Arguments: 5
5
From the execution plan, we can tell that the COALESCE hint took effect: the plan contains a Coalesce step with argument 5, and the DataFrame reports 5 partitions.
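To make the idea behind coalesce more concrete, here is a minimal plain-Python sketch that merges neighbouring partitions without moving rows between the resulting groups. It is a simplified model under my own assumptions, not Spark's implementation (which also considers data locality); coalesce_partitions and the sample partition layout are made up for illustration.

```python
# Simplified model of coalesce: merge neighbouring partitions, no shuffle.
# This is an illustration only; Spark's real logic also looks at locality.
def coalesce_partitions(partitions, num_partitions):
    current = len(partitions)
    if num_partitions >= current:
        # coalesce never increases the number of partitions
        return [list(p) for p in partitions]
    merged = [[] for _ in range(num_partitions)]
    for index, rows in enumerate(partitions):
        # Map old partition `index` onto one of the new, fewer partitions.
        target = index * num_partitions // current
        merged[target].extend(rows)
    return merged


# Eight input partitions (illustrative layout); most of them are empty.
before = [[], [], [], [(1, 'A')], [], [], [], [(2, 'B')]]
after = coalesce_partitions(before, 5)
print(len(before), "->", len(after))
```

Because whole partitions are merged and no row is re-hashed, this kind of operation avoids a shuffle, which matches the absence of an Exchange step in the plan above.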
REPARTITION
The REPARTITION hint repartitions the data to the specified number of partitions using the specified partitioning expressions. It takes a partition number, column names, or both as parameters. For details about the repartition API, refer to Spark repartition vs. coalesce.
Example
Let's change the above code snippet slightly to use the REPARTITION hint.
df = spark.sql("SELECT /*+ REPARTITION(5, attr) */ * FROM t1")
The hint asks Spark to repartition the DataFrame into 5 partitions, using column 'attr' as the partition key.
The output now changes slightly:
== Physical Plan ==
AdaptiveSparkPlan (3)
+- Exchange (2)
   +- Scan ExistingRDD (1)
(1) Scan ExistingRDD
Output [2]: [id#0L, attr#1]
Arguments: [id#0L, attr#1], MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)
(2) Exchange
Input [2]: [id#0L, attr#1]
Arguments: hashpartitioning(attr#1, 5), REPARTITION_BY_NUM, [id=#9]
(3) AdaptiveSparkPlan
Output [2]: [id#0L, attr#1]
Arguments: isFinalPlan=false
5
In step (2), the Exchange uses the hashpartitioning partitioner on attr with 5 partitions, as specified by the hint.
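The idea behind hash partitioning is that each row goes to partition hash(key) mod n, so rows with the same key always land together. The sketch below models this in plain Python. It is an approximation under stated assumptions: CRC32 stands in for the Murmur3 hash that Spark uses, and hash_partition is a hypothetical helper, so the partition indexes will not match Spark's.

```python
import zlib


def hash_partition(rows, key_index, num_partitions):
    """Assign each row to partition hash(key) mod num_partitions."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        key = str(row[key_index]).encode("utf-8")
        # CRC32 is a stand-in for Spark's Murmur3 hash (assumption for this sketch).
        target = zlib.crc32(key) % num_partitions
        partitions[target].append(row)
    return partitions


rows = [(1, 'A'), (2, 'B'), (3, 'A')]
parts = hash_partition(rows, key_index=1, num_partitions=5)
# The partition count equals the requested number, even if some stay empty.
print(len(parts))
```

This also shows why the REPARTITION example reports 5 partitions for only two rows: the number of partitions is fixed by the hint, not by the data.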
REPARTITION_BY_RANGE
The REPARTITION_BY_RANGE hint repartitions the data to the specified number of partitions using the specified partitioning expressions. It takes column names and an optional partition number as parameters.
Example
Now let's try this partitioning hint in the above code by changing the SELECT statement slightly:
df = spark.sql("SELECT /*+ REPARTITION_BY_RANGE(5, attr) */ * FROM t1")
Output:
== Physical Plan ==
AdaptiveSparkPlan (3)
+- Exchange (2)
   +- Scan ExistingRDD (1)
(1) Scan ExistingRDD
Output [2]: [id#0L, attr#1]
Arguments: [id#0L, attr#1], MapPartitionsRDD[4] at applySchemaToPythonRDD at NativeMethodAccessorImpl.java:0, ExistingRDD, UnknownPartitioning(0)
(2) Exchange
Input [2]: [id#0L, attr#1]
Arguments: rangepartitioning(attr#1 ASC NULLS FIRST, 5), REPARTITION_BY_NUM, [id=#9]
(3) AdaptiveSparkPlan
Output [2]: [id#0L, attr#1]
Arguments: isFinalPlan=false
2
This time the rangepartitioning partitioner is used instead. As we have just two records, the final number of partitions is 2 instead of 5.
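A small plain-Python model helps explain why two records end up in two partitions even though five were requested. Range partitioning picks boundary values from the sorted keys and sends each row to the range its key falls into, so it cannot produce more ranges than it has keys to draw boundaries from. The sketch is a simplification: Spark samples the data and weights the samples, while range_bounds and range_partition here are hypothetical helpers working on the full key list.

```python
from bisect import bisect_left


def range_bounds(keys, num_partitions):
    """Pick up to num_partitions - 1 upper bounds from the sorted distinct keys."""
    distinct = sorted(set(keys))
    # There cannot be more ranges than distinct keys to separate them.
    effective = min(num_partitions, len(distinct))
    if effective <= 1:
        return []
    step = len(distinct) / effective
    return [distinct[int(step * (i + 1)) - 1] for i in range(effective - 1)]


def range_partition(rows, key_index, num_partitions):
    """Place each row in the range whose upper bound is the first one >= its key."""
    bounds = range_bounds([row[key_index] for row in rows], num_partitions)
    partitions = [[] for _ in range(len(bounds) + 1)]
    for row in rows:
        partitions[bisect_left(bounds, row[key_index])].append(row)
    return partitions


parts = range_partition([(1, 'A'), (2, 'B')], key_index=1, num_partitions=5)
print(len(parts))  # 2: one bound ('A') separates the two keys
```

With more distinct keys than requested partitions, the same functions return exactly the requested number of ranges.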
REBALANCE
Unlike the other three, REBALANCE can only be used as a hint, i.e. there is no corresponding Spark Dataset or PySpark DataFrame API. It rebalances the query result output partitions so that every partition is of a reasonable size (not too small and not too big). It can also take column names as parameters, in which case Spark tries its best to partition the query result by these columns. This is best effort: if there are skews, Spark splits the skewed partitions so that they are not too big. The hint is ignored if AQE (Adaptive Query Execution) is not enabled. AQE was introduced in Spark 3.0 and has been enabled by default since Spark 3.2.0.
This hint is very useful when you need to write the result of the query to a table and want to avoid files that are too small or too big.
Example
df = spark.sql("SELECT /*+ REBALANCE */ * FROM t1")
In this code snippet, we use the REBALANCE hint directly, without parameters.
The extended query execution plan looks like the following output:
== Parsed Logical Plan ==
'UnresolvedHint REBALANCE
+- 'Project [*]
   +- 'UnresolvedRelation [t1], [], false

== Analyzed Logical Plan ==
id: bigint, attr: string
RebalancePartitions
+- Project [id#0L, attr#1]
   +- SubqueryAlias t1
      +- View (`t1`, [id#0L,attr#1])
         +- LogicalRDD [id#0L, attr#1], false

== Optimized Logical Plan ==
RebalancePartitions
+- LogicalRDD [id#0L, attr#1], false

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(200), REBALANCE_PARTITIONS_BY_NONE, [id=#9]
   +- Scan ExistingRDD[id#0L,attr#1]

1
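As we can tell, the RoundRobinPartitioning partitioner is used with 200 partitions, the default value of spark.sql.shuffle.partitions. The plan shown is not final (isFinalPlan=false); at runtime the DataFrame ends up with a single partition, as the trailing 1 in the output shows.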
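The "not too small and not too big" goal can be pictured with a toy model that works only on partition sizes: oversized partitions are split into target-sized chunks, and small neighbours are merged while they still fit. This is a sketch under my own assumptions, not Spark's AQE logic; the rebalance function, the sample sizes and the 128 MB target are all made up for illustration.

```python
def rebalance(partition_sizes, target_size):
    """Split oversized partitions and merge undersized neighbours (toy model)."""
    # Step 1: split any partition larger than the target into target-sized chunks.
    chunks = []
    for size in partition_sizes:
        while size > target_size:
            chunks.append(target_size)
            size -= target_size
        if size > 0:
            chunks.append(size)
    # Step 2: greedily merge adjacent chunks while they still fit the target.
    balanced = []
    for size in chunks:
        if balanced and balanced[-1] + size <= target_size:
            balanced[-1] += size
        else:
            balanced.append(size)
    return balanced


# Hypothetical partition sizes in MB: one skewed partition and several tiny ones.
sizes_mb = [500, 3, 2, 1, 4, 130]
print(rebalance(sizes_mb, target_size=128))
```

The total amount of data is unchanged; only the way it is cut into partitions differs, which is what makes the hint useful before writing files.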
About Spark partitioning function
In the above examples, I mentioned several different physical partitioners. Refer to this diagram to learn more.