Raymond Raymond

Spark Join Strategy Hints for SQL Queries

event 2022-08-21 visibility 3,984 comment 0 insights toc
more_vert
insights Stats
Spark Join Strategy Hints for SQL Queries

Spark query engine supports different join strategies for different queries. These strategies include BROADCAST, MERGE, SHUFFLE_HASH and SHUFFLE_REPLICATE_NL. Prior to Spark 3.0.0, only broadcast join hint are supported; from Spark 3.0.0, all these four typical join strategies hints are supported. These join hints can be used in Spark SQL directly or through Spark DataFrame APIs (hint). This article provides a detailed walkthrough of these join hints.

About join hints

BROADCAST join hint suggests Spark to use broadcast join regardless of configuration property autoBroadcastJoinThreshold. If both sides of the join have the broadcast hints, the one with the smaller size (based on stats) will be broadcast. The aliases for BROADCAST are BROADCASTJOIN and MAPJOIN.

MERGE join hint suggests Spark to use shuffle sort merge join. Its aliases are SHUFFLE_MERGE and MERGEJOIN.

SHUFFLE_HASH join hint suggests Spark to use shuffle hash join. If both sides have the shuffle hash hints, Spark chooses the smaller side (based on stats) as the build side.

SHUFFLE_REPLICATE_NL join hint suggests Spark to use shuffle-and-replicate nested loop join.

The performances of each join strategy can vary based on the stats of each relation table. 

Let's look into some examples. 

Walkthrough via example

Create DataFrames and join without hint

Create two DataFrames using the following code snippet.

from pyspark.sql import SparkSession
from pyspark import SparkConf

app_name = "PySpark - Join Hints Example"
master = "local[1]"

conf = SparkConf().setAppName(app_name)\
    .setMaster(master)


spark = SparkSession.builder.config(conf=conf) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

df1 = spark.createDataFrame([[1, 'A'], [2, 'B']], ['id', 'attr'])
df1.createOrReplaceTempView("t1")
df1.show()
df2 = spark.createDataFrame([[1, 100], [2, 200], [3, 200]],
                            ['id', 'int_attr'])
df2.createOrReplaceTempView("t2")
df2.show()

df = df1.join(df2, on='id', how='inner')
df.show()
df.explain()

This PySpark script is very simple - it creates two DataFrame objects in memory and then perform an inner join without any hints.

The output are:

+---+----+
| id|attr|
+---+----+
|  1|   A|
|  2|   B|
+---+----+

+---+--------+
| id|int_attr|
+---+--------+
|  1|     100|
|  2|     200|
|  3|     200|
+---+--------+

+---+----+--------+
| id|attr|int_attr|
+---+----+--------+
|  1|   A|     100|
|  2|   B|     200|
+---+----+--------+

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#0L, attr#1, int_attr#14L]
   +- SortMergeJoin [id#0L], [id#13L], Inner
      :- Sort [id#0L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#165]
      :     +- Filter isnotnull(id#0L)
      :        +- Scan ExistingRDD[id#0L,attr#1]
      +- Sort [id#13L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#13L, 200), ENSURE_REQUIREMENTS, [id=#166]
            +- Filter isnotnull(id#13L)
               +- Scan ExistingRDD[id#13L,int_attr#14L]

SortMergeJoin is used for this query. By looking into Spark History UI, we can also know the join strategy is SortMergeJoin.

2022082164646-image.png

Use Broadcast join hint

Now let's add a broadcast join hint to the query by modifying the join code.

df = df1.hint('broadcast').join(df2, on='id', how='inner')

The query plan is printed out as the following and BroadcastHashJoin is used as strategy:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#0L, attr#1, int_attr#14L]
   +- BroadcastHashJoin [id#0L], [id#13L], Inner, BuildLeft, false
      :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#117]
      :  +- Filter isnotnull(id#0L)
      :     +- Scan ExistingRDD[id#0L,attr#1]
      +- Filter isnotnull(id#13L)
         +- Scan ExistingRDD[id#13L,int_attr#14L]

This is also reflected in Spark UI:

2022082165104-image.png


Use Merge join hint

The following code utilizes merge join hint:

df = df1.hint('merge').join(df2, on='id', how='inner')

Physical plan is printed out as the following:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#0L, attr#1, int_attr#14L]
   +- SortMergeJoin [id#0L], [id#13L], Inner
      :- Sort [id#0L ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#165]
      :     +- Filter isnotnull(id#0L)
      :        +- Scan ExistingRDD[id#0L,attr#1]
      +- Sort [id#13L ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#13L, 200), ENSURE_REQUIREMENTS, [id=#166]
            +- Filter isnotnull(id#13L)
               +- Scan ExistingRDD[id#13L,int_attr#14L]

From Spark UI, we can also identify that SortMergeJoin is now used:

2022082164646-image.png

Use SHUFFLE_HASH join hint

Now we can also change the join hint to use shuffle hash join:

df = df1.hint('SHUFFLE_HASH').join(df2, on='id', how='inner')

Physical plan is printed out as the following text:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#0L, attr#1, int_attr#14L]
   +- ShuffledHashJoin [id#0L], [id#13L], Inner, BuildLeft
      :- Exchange hashpartitioning(id#0L, 200), ENSURE_REQUIREMENTS, [id=#144]
      :  +- Filter isnotnull(id#0L)
      :     +- Scan ExistingRDD[id#0L,attr#1]
      +- Exchange hashpartitioning(id#13L, 200), ENSURE_REQUIREMENTS, [id=#145]
         +- Filter isnotnull(id#13L)
            +- Scan ExistingRDD[id#13L,int_attr#14L]

As we can tell, ShuffledHashJoin is used. This is also reflected in Spark UI as the following screenshot shows:

2022082165722-image.png

Use SHUFFLE_REPLICATE_NL join hints

Finally, let's change join hint to SHUFFLE_REPLICATE_NL.

== Physical Plan ==
*(3) Project [id#0L, attr#1, int_attr#14L]
+- CartesianProduct (id#0L = id#13L)
   :- *(1) Filter isnotnull(id#0L)
   :  +- *(1) Scan ExistingRDD[id#0L,attr#1]
   +- *(2) Filter isnotnull(id#13L)
      +- *(2) Scan ExistingRDD[id#13L,int_attr#14L]

This ends up with a CartesianProduct join. The graphical execution plan looks like the following screenshot:

2022082170028-image.png

Join hints in Spark SQL directly

We can also directly add these join hints to Spark SQL queries directly.

df = spark.sql("SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.id = t2.id;")

This add broadcast join hint for t1. t1 was registered as temporary view/table from df1.

The result is exactly the same as previous broadcast join hint:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [id#0L], [id#13L], Inner, BuildLeft, false
   :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#113]
   :  +- Filter isnotnull(id#0L)
   :     +- Scan ExistingRDD[id#0L,attr#1]
   +- Filter isnotnull(id#13L)
      +- Scan ExistingRDD[id#13L,int_attr#14L]

The following SQL statements show more examples of using these join hints in Spark SQL directly.

-- Join Hints for broadcast join
SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.id = t2.id;
SELECT /*+ BROADCASTJOIN (t1) */ * FROM t1 left JOIN t2 ON t1.id = t2.id;
SELECT /*+ MAPJOIN(t2) */ * FROM t1 right JOIN t2 ON t1.id = t2.id;

-- Join Hints for shuffle sort merge join
SELECT /*+ SHUFFLE_MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.id = t2.id;
SELECT /*+ MERGEJOIN(t2) */ * FROM t1 INNER JOIN t2 ON t1.id = t2.id;
SELECT /*+ MERGE(t1) */ * FROM t1 INNER JOIN t2 ON t1.id = t2.id;

-- Join Hints for shuffle hash join
SELECT /*+ SHUFFLE_HASH(t1) */ * FROM t1 INNER JOIN t2 ON t1.id = t2.id;

-- Join Hints for shuffle-and-replicate nested loop join
SELECT /*+ SHUFFLE_REPLICATE_NL(t1) */ * FROM t1 INNER JOIN t2 ON t1.id = t2.id;

-- When different join strategy hints are specified on both sides of a join, Spark
-- prioritizes the BROADCAST hint over the MERGE hint over the SHUFFLE_HASH hint
-- over the SHUFFLE_REPLICATE_NL hint.
-- Spark will issue Warning in the following example
-- org.apache.spark.sql.catalyst.analysis.HintErrorLogger: Hint (strategy=merge)
-- is overridden by another hint and will not take effect.
SELECT /*+ BROADCAST(t1), MERGE(t1, t2) */ * FROM t1 INNER JOIN t2 ON t1.id = t2.id;

Priority of join hints

For the scenario that multiple different join hints are added for the same table, Spark follows the priority list below:

  1. BROADCAST
  2. MERGE
  3. SHUFFLE_HASH
  4. SHUFFLE_REPLACE_NL

Summary

I hope you now have a good understanding of join hints in Spark. We can directly apply join hints via DataFrame APIs or in SQL statements. Please be cautious when using these join hints as they can largely impact the join performance. If you are not sure, it's better to leave to Spark SQL engine to pick up the appropriate join strategy based on stats and execution information.

If you have any questions, feel free to leave a comment.

More from Kontext
comment Comments
No comments yet.

Please log in or register to comment.

account_circle Log in person_add Register

Log in with external accounts