
In my previous post, Data Partitioning in Spark (PySpark) In-depth Walkthrough, I mentioned how to repartition data frames in Spark using the repartition or coalesce functions.

In this post, I am going to explain how Spark partitions data using partitioning functions.

Partitioner

The Partitioner class is used to partition data based on keys. Its constructor accepts two parameters, numPartitions and partitionFunc, as the following code shows:

def __init__(self, numPartitions, partitionFunc):

The first parameter defines the number of partitions while the second parameter defines the partition function.

def __call__(self, k):
    return self.partitionFunc(k) % self.numPartitions

Partitioner then uses this partition function to generate the partition number for each key.
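For example, with portable_hash as the partition function and 8 partitions, we can compute the partition number of a key directly. The following is a small sketch mirroring the __call__ logic above (not part of the original walkthrough; note that portable_hash requires the PYTHONHASHSEED environment variable, which Spark sets on executors, and the seed should be set before the Python process starts for stable values):

import os
from pyspark.rdd import portable_hash

# portable_hash refuses to run unless PYTHONHASHSEED is set; Spark sets it on
# executors, so this is only needed when experimenting on the driver.
os.environ.setdefault("PYTHONHASHSEED", "0")

numPartitions = 8
for key in ["CN", "AU", "US"]:
    # Mirror Partitioner.__call__: partition = partitionFunc(key) % numPartitions
    print(key, portable_hash(key) % numPartitions)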

partitionBy function

The partitionBy function is defined as the following:

def partitionBy(self, numPartitions, partitionFunc=portable_hash)

By default, the partition function is portable_hash.
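Here is a minimal usage sketch (not from the original post; it assumes the SparkSession named spark that is created in the walkthrough below):

# partitionBy only works on key/value (pair) RDDs; by default each record is
# routed to partition portable_hash(key) % numPartitions.
pairs = spark.sparkContext.parallelize([("CN", 1), ("AU", 2), ("US", 3)])
print(pairs.partitionBy(3).glom().collect())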

Walkthrough with data

Create a sample data frame

Let’s first create a data frame using the following code:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.rdd import portable_hash
from pyspark import Row

appName = "PySpark Partition Example"
master = "local[8]"

# Create Spark session.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()
print(spark.version)
spark.sparkContext.setLogLevel("ERROR")

# Populate sample data
countries = ("CN", "AU", "US")
data = []
for i in range(1, 13):
    data.append({"ID": i, "Country": countries[i % 3],  "Amount": 10+i})

def print_partitions(df):
    numPartitions = df.rdd.getNumPartitions()
    print("Total partitions: {}".format(numPartitions))
    print("Partitioner: {}".format(df.rdd.partitioner))
    df.explain()
    parts = df.rdd.glom().collect()
    i = 0
    j = 0
    for p in parts:
        print("Partition {}:".format(i))
        for r in p:
            print("Row {}:{}".format(j, r))
            j = j+1
        i = i+1


df = spark.createDataFrame(data)
df.show()
print_partitions(df)

In the above code, the print_partitions function prints out the details of the underlying RDD partitions, including the rows in each partition.

There are 12 records populated:

+------+-------+---+
|Amount|Country| ID|
+------+-------+---+
|    11|     AU|  1|
|    12|     US|  2|
|    13|     CN|  3|
|    14|     AU|  4|
|    15|     US|  5|
|    16|     CN|  6|
|    17|     AU|  7|
|    18|     US|  8|
|    19|     CN|  9|
|    20|     AU| 10|
|    21|     US| 11|
|    22|     CN| 12|
+------+-------+---+

The output from print_partitions function is shown below:

Total partitions: 8
Partitioner: None
== Physical Plan ==
Scan ExistingRDD[Amount#0L,Country#1,ID#2L]
Partition 0:
Row 0:Row(Amount=11, Country='AU', ID=1)
Partition 1:
Row 1:Row(Amount=12, Country='US', ID=2)
Row 2:Row(Amount=13, Country='CN', ID=3)
Partition 2:
Row 3:Row(Amount=14, Country='AU', ID=4)
Partition 3:
Row 4:Row(Amount=15, Country='US', ID=5)
Row 5:Row(Amount=16, Country='CN', ID=6)
Partition 4:
Row 6:Row(Amount=17, Country='AU', ID=7)
Partition 5:
Row 7:Row(Amount=18, Country='US', ID=8)
Row 8:Row(Amount=19, Country='CN', ID=9)
Partition 6:
Row 9:Row(Amount=20, Country='AU', ID=10)
Partition 7:
Row 10:Row(Amount=21, Country='US', ID=11)
Row 11:Row(Amount=22, Country='CN', ID=12)

The records are divided into 8 partitions because the master was set to local[8], i.e. 8 worker threads.
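The number of partitions produced by createDataFrame follows the Spark default parallelism, which for local[8] is 8. A quick check (not in the original post):

# With master = "local[8]", the default parallelism is 8, so the 12 rows are
# spread across 8 partitions.
print(spark.sparkContext.defaultParallelism)  # expected: 8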

Repartition data

Let’s repartition the data into only three partitions, by the Country column.

numPartitions = 3

df = df.repartition(numPartitions, "Country")

print_partitions(df)

The output looks like the following:

Total partitions: 3
Partitioner: None
== Physical Plan ==
Exchange hashpartitioning(Country#1, 3)
+- Scan ExistingRDD[Amount#0L,Country#1,ID#2L]
Partition 0:
Partition 1:
Row 0:Row(Amount=12, Country='US', ID=2)
Row 1:Row(Amount=13, Country='CN', ID=3)
Row 2:Row(Amount=15, Country='US', ID=5)
Row 3:Row(Amount=16, Country='CN', ID=6)
Row 4:Row(Amount=18, Country='US', ID=8)
Row 5:Row(Amount=19, Country='CN', ID=9)
Row 6:Row(Amount=21, Country='US', ID=11)
Row 7:Row(Amount=22, Country='CN', ID=12)
Partition 2:
Row 8:Row(Amount=11, Country='AU', ID=1)
Row 9:Row(Amount=14, Country='AU', ID=4)
Row 10:Row(Amount=17, Country='AU', ID=7)
Row 11:Row(Amount=20, Country='AU', ID=10)

You may expect each partition to contain the data for exactly one country, but that is not the case. Why? Because the repartition function uses hash partitioning by default: different country codes can be hashed to the same partition number.

We can verify this by using the following code to calculate each country’s hash and the derived partition number:

udf_portable_hash = udf(lambda str: portable_hash(str))

df = df.withColumn("Hash#", udf_portable_hash(df.Country))

df = df.withColumn("Partition#", df["Hash#"] % numPartitions)

df.show()

The output looks like the following:

[Table: df.show() output with the Hash# and Partition# columns added for each record]

This output is consistent with the previous one: records with ID 1, 4, 7 and 10 are allocated to one partition while the others are allocated to another. There is also one empty partition, as no records are allocated to it.

Allocate one partition for each key value

For the above example, if we want to allocate one partition for each Country (CN, US, AU), what should we do?

Well, the first thing we can try is to increase the number of partitions. That way, the chance of each distinct value being allocated to a different partition is higher.

So let’s increase the partition number to 5:

numPartitions = 5

df = df.repartition(numPartitions, "Country")

print_partitions(df)

udf_portable_hash = udf(lambda str: portable_hash(str))

df = df.withColumn("Hash#", udf_portable_hash(df.Country))

df = df.withColumn("Partition#", df["Hash#"] % numPartitions)

df.show()

The output shows that all records for each country are now located in the same partition:

Total partitions: 5
Partitioner: None
== Physical Plan ==
Exchange hashpartitioning(Country#1, 5)
+- Scan ExistingRDD[Amount#0L,Country#1,ID#2L]
Partition 0:
Partition 1:
Partition 2:
Row 0:Row(Amount=12, Country='US', ID=2)
Row 1:Row(Amount=15, Country='US', ID=5)
Row 2:Row(Amount=18, Country='US', ID=8)
Row 3:Row(Amount=21, Country='US', ID=11)
Partition 3:
Row 4:Row(Amount=13, Country='CN', ID=3)
Row 5:Row(Amount=16, Country='CN', ID=6)
Row 6:Row(Amount=19, Country='CN', ID=9)
Row 7:Row(Amount=22, Country='CN', ID=12)
Partition 4:
Row 8:Row(Amount=11, Country='AU', ID=1)
Row 9:Row(Amount=14, Country='AU', ID=4)
Row 10:Row(Amount=17, Country='AU', ID=7)
Row 11:Row(Amount=20, Country='AU', ID=10)
+------+-------+---+-----------+----------+
|Amount|Country| ID|      Hash#|Partition#|
+------+-------+---+-----------+----------+
|    12|     US|  2| 1071627821|       1.0|
|    15|     US|  5| 1071627821|       1.0|
|    18|     US|  8| 1071627821|       1.0|
|    21|     US| 11| 1071627821|       1.0|
|    13|     CN|  3|-1457862464|      -4.0|
|    16|     CN|  6|-1457862464|      -4.0|
|    19|     CN|  9|-1457862464|      -4.0|
|    22|     CN| 12|-1457862464|      -4.0|
|    11|     AU|  1| -425625517|      -2.0|
|    14|     AU|  4| -425625517|      -2.0|
|    17|     AU|  7| -425625517|      -2.0|
|    20|     AU| 10| -425625517|      -2.0|
+------+-------+---+-----------+----------+

However, what if the hashing algorithm generates the same hash code for two different country codes?

Use partitionBy function

To address the above issue, we can create a customised partitioning function.

At the moment in PySpark (my Spark version is 2.3.3), we cannot specify a partition function for the repartition function of a data frame, so we can only use a custom partition function through the RDD class.

As the partitionBy function requires the data to be in key/value format, we also need to transform our data into key/value pairs.

We can run the following code to use a custom partitioner:

# Custom partition function: map each country code to its index in the
# countries tuple (CN -> 0, AU -> 1, US -> 2).
def country_partitioning(k):
    return countries.index(k)

udf_country_hash = udf(lambda str: country_partitioning(str))

df = df.rdd \
    .map(lambda el: (el["Country"], el)) \
    .partitionBy(numPartitions, country_partitioning) \
    .toDF()
print_partitions(df)

df = df.withColumn("Hash#", udf_country_hash(df[0]))
df = df.withColumn("Partition#", df["Hash#"] % numPartitions)
df.show()

The output looks like the following:

Total partitions: 5
Partitioner: None
== Physical Plan ==
Scan ExistingRDD[_1#17,_2#18]
Partition 0:
Row 0:Row(_1='CN', _2=Row(Amount=13, Country='CN', ID=3))
Row 1:Row(_1='CN', _2=Row(Amount=16, Country='CN', ID=6))
Row 2:Row(_1='CN', _2=Row(Amount=19, Country='CN', ID=9))
Row 3:Row(_1='CN', _2=Row(Amount=22, Country='CN', ID=12))
Partition 1:
Row 4:Row(_1='AU', _2=Row(Amount=11, Country='AU', ID=1))
Row 5:Row(_1='AU', _2=Row(Amount=14, Country='AU', ID=4))
Row 6:Row(_1='AU', _2=Row(Amount=17, Country='AU', ID=7))
Row 7:Row(_1='AU', _2=Row(Amount=20, Country='AU', ID=10))
Partition 2:
Row 8:Row(_1='US', _2=Row(Amount=12, Country='US', ID=2))
Row 9:Row(_1='US', _2=Row(Amount=15, Country='US', ID=5))
Row 10:Row(_1='US', _2=Row(Amount=18, Country='US', ID=8))
Row 11:Row(_1='US', _2=Row(Amount=21, Country='US', ID=11))
Partition 3:
Partition 4:
+---+------------+-----+----------+
| _1|          _2|Hash#|Partition#|
+---+------------+-----+----------+
| CN| [13, CN, 3]|    0|       0.0|
| CN| [16, CN, 6]|    0|       0.0|
| CN| [19, CN, 9]|    0|       0.0|
| CN|[22, CN, 12]|    0|       0.0|
| AU| [11, AU, 1]|    1|       1.0|
| AU| [14, AU, 4]|    1|       1.0|
| AU| [17, AU, 7]|    1|       1.0|
| AU|[20, AU, 10]|    1|       1.0|
| US| [12, US, 2]|    2|       2.0|
| US| [15, US, 5]|    2|       2.0|
| US| [18, US, 8]|    2|       2.0|
| US|[21, US, 11]|    2|       2.0|
+---+------------+-----+----------+

Through this customised partitioning function, we guarantee each different country code gets a unique deterministic hash number.
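We can confirm the mapping with a quick check (not in the original post):

# The custom partition function maps each country code to a fixed index, so
# the assignment is deterministic.
for c in countries:
    print(c, country_partitioning(c))  # CN 0, AU 1, US 2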

Now if we change the number of partitions to 2, both US and CN records will be allocated to one partition (a sketch follows the list below), because:

  • CN (Hash# = 0): 0%2 = 0
  • US (Hash# = 2): 2%2 = 0
  • AU (Hash# = 1): 1%2 = 1
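To get the data frame into 2 partitions, we can re-run the partitioning step. The following is a minimal sketch (an assumption on my part: we re-key by the _1 column, which holds the country code after the previous transformation, and recompute Hash# and Partition# with the new partition count):

numPartitions = 2

# Re-key by country (column _1 after the earlier transformation) and
# repartition into 2 partitions with the custom partition function.
df = df.select("_1", "_2").rdd \
    .map(lambda el: (el["_1"], el["_2"])) \
    .partitionBy(numPartitions, country_partitioning) \
    .toDF()
print_partitions(df)

df = df.withColumn("Hash#", udf_country_hash(df[0]))
df = df.withColumn("Partition#", df["Hash#"] % numPartitions)
df.show()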

Let’s keep exploring a little bit more.

For the above partitioned data frame (2 partitions), if we then write the data frame to the file system, how many sharded files will be generated?

df = df.select(df["_1"].alias("Country"), df["Hash#"], df["Partition#"],
               df["_2"]["Amount"].alias("Amount"), df["_2"]["ID"].alias("ID"))

df.show()

print_partitions(df)

df.write.mode("overwrite").csv("data/example2.csv", header=True)

The answer is 2 as there are two partitions.

The first file includes data for countries CN and US, and the other one includes data for country AU.
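To verify the number of part files, we can list the output directory (a hypothetical check, not part of the original walkthrough; exact file names differ per run):

import glob

# One part file is produced per partition, so two files are expected here.
for path in sorted(glob.glob("data/example2.csv/part-*")):
    print(path)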

However, if we change the last line of code to the following:

df.write.mode("overwrite").partitionBy("Country").csv("data/example2.csv", header=True)

Then three folders will be created, with one file in each. The part file number in the CN and US folders will be the same since their data comes from the same partition.

[Screenshot: the output directory contains three subfolders, Country=AU, Country=CN and Country=US, each with a single CSV part file]

Scala / Java

This post focuses only on PySpark. If you primarily use Scala or Java, the concepts are similar; for example, in the Scala/Java APIs you can implement a customised Partitioner class to control your partitioning strategy.

If you have any questions, feel free to comment here.
