The Spark repartition function can be used to repartition your DataFrame. In the article Spark repartition vs. coalesce, I explained the differences between two commonly used functions, repartition and coalesce. This article explores the internals of the repartition function, with examples of the physical partitioning schemes (operators) it can produce: HashPartitioning, RangePartitioning, RoundRobinPartitioning, and SinglePartition.
About PySpark repartition function
The source code of the PySpark repartition function shows that it directly calls the repartition function of the underlying Java DataFrame object.
PySpark:
def repartition(  # type: ignore[misc]
    self, numPartitions: Union[int, "ColumnOrName"], *cols: "ColumnOrName"
) -> "DataFrame":
Scala:
def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = {
  repartitionByExpression(Some(numPartitions), partitionExprs)
}

def repartition(numPartitions: Int): Dataset[T] = withTypedPlan {
  Repartition(numPartitions, shuffle = true, logicalPlan)
}

def repartition(partitionExprs: Column*): Dataset[T] = {
  repartitionByExpression(None, partitionExprs)
}
There are three variations:
- When only numPartitions is specified, it returns a new Dataset (DataFrame) with exactly numPartitions partitions.
- When only columns are specified, it returns a new Dataset (DataFrame) partitioned by the given partitioning expressions, using spark.sql.shuffle.partitions as the number of partitions (200 by default).
- When both are specified, it returns a new Dataset (DataFrame) partitioned by the given partitioning expressions, using numPartitions as the number of partitions.
In the two variations that take columns, the returned Dataset (DataFrame) is hash partitioned.
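The three variations come down to how the first argument is interpreted. The following is a minimal sketch in plain Python, not PySpark's actual code: the helper name resolve_repartition_call and the constant DEFAULT_SHUFFLE_PARTITIONS are hypothetical, and the sketch assumes column arguments are given as strings and that spark.sql.shuffle.partitions keeps its default value.

```python
from typing import Tuple, Union

# Assumption: spark.sql.shuffle.partitions is left at its default of 200.
DEFAULT_SHUFFLE_PARTITIONS = 200


def resolve_repartition_call(
    num_partitions: Union[int, str],
    *cols: str,
    shuffle_partitions: int = DEFAULT_SHUFFLE_PARTITIONS,
) -> Tuple[int, Tuple[str, ...]]:
    """Return (partition count, partitioning columns) for a repartition call.

    Hypothetical helper that only models argument handling, not Spark itself.
    """
    if isinstance(num_partitions, int):
        # repartition(n) or repartition(n, *cols): the count is explicit.
        return num_partitions, tuple(cols)
    if isinstance(num_partitions, str):
        # repartition(*cols): the first argument is itself a column, so the
        # count falls back to the shuffle partitions setting.
        return shuffle_partitions, (num_partitions,) + tuple(cols)
    raise TypeError("numPartitions should be an int or a column name")


print(resolve_repartition_call(10))
print(resolve_repartition_call("Col1", "Col2"))
print(resolve_repartition_call(10, "Col1"))
```

In PySpark the corresponding calls would be df.repartition(10), df.repartition("Col1", "Col2"), and df.repartition(10, "Col1").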
Repartition and RepartitionByExpression operators
As shown in the code above, the repartition function produces one of two logical operators: RepartitionByExpression or Repartition.
Both operators are defined in the package org.apache.spark.sql.catalyst.plans.logical. The following snippet is extracted from the Spark source code:
/**
 * A base interface for [[RepartitionByExpression]] and [[Repartition]]
 */
abstract class RepartitionOperation extends UnaryNode {
  def shuffle: Boolean
  def numPartitions: Int
  override final def maxRows: Option[Long] = child.maxRows
  override def output: Seq[Attribute] = child.output
  final override val nodePatterns: Seq[TreePattern] = Seq(REPARTITION_OPERATION)
  def partitioning: Partitioning
}
/**
 * Returns a new RDD that has exactly `numPartitions` partitions. Differs from
 * [[RepartitionByExpression]] as this method is called directly by DataFrame's, because the user
 * asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer
 * of the output requires some specific ordering or distribution of the data.
 */
case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
  extends RepartitionOperation {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

  override def partitioning: Partitioning = {
    require(shuffle, "Partitioning can only be used in shuffle.")
    numPartitions match {
      case 1 => SinglePartition
      case _ => RoundRobinPartitioning(numPartitions)
    }
  }

  override protected def withNewChildInternal(newChild: LogicalPlan): Repartition =
    copy(child = newChild)
}
/**
 * This method repartitions data using [[Expression]]s into `optNumPartitions`, and receives
 * information about the number of partitions during execution. Used when a specific ordering or
 * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like
 * `coalesce` and `repartition`. If no `optNumPartitions` is given, by default it partitions data
 * into `numShufflePartitions` defined in `SQLConf`, and could be coalesced by AQE.
 */
case class RepartitionByExpression(
    partitionExpressions: Seq[Expression],
    child: LogicalPlan,
    optNumPartitions: Option[Int]) extends RepartitionOperation {

  val numPartitions = optNumPartitions.getOrElse(conf.numShufflePartitions)
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

  override val partitioning: Partitioning = {
    val (sortOrder, nonSortOrder) = partitionExpressions.partition(_.isInstanceOf[SortOrder])

    require(sortOrder.isEmpty || nonSortOrder.isEmpty,
      s"${getClass.getSimpleName} expects that either all its `partitionExpressions` are of type " +
        "`SortOrder`, which means `RangePartitioning`, or none of them are `SortOrder`, which " +
        "means `HashPartitioning`. In this case we have:" +
      s"""
         |SortOrder: $sortOrder
         |NonSortOrder: $nonSortOrder
       """.stripMargin)

    if (numPartitions == 1) {
      SinglePartition
    } else if (sortOrder.nonEmpty) {
      RangePartitioning(sortOrder.map(_.asInstanceOf[SortOrder]), numPartitions)
    } else if (nonSortOrder.nonEmpty) {
      HashPartitioning(nonSortOrder, numPartitions)
    } else {
      RoundRobinPartitioning(numPartitions)
    }
  }

  override def shuffle: Boolean = true

  override protected def withNewChildInternal(newChild: LogicalPlan): RepartitionByExpression =
    copy(child = newChild)
}
object RepartitionByExpression {
  def apply(
      partitionExpressions: Seq[Expression],
      child: LogicalPlan,
      numPartitions: Int): RepartitionByExpression = {
    RepartitionByExpression(partitionExpressions, child, Some(numPartitions))
  }
}
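The selection logic in the two partitioning definitions above can be restated compactly. The following is a minimal sketch in plain Python that mirrors only the branching of the Scala code. The function names are hypothetical, the partitioning schemes are represented as plain strings, and the expression lists are stand-ins for Catalyst expressions.

```python
from typing import Sequence


def repartition_partitioning(num_partitions: int) -> str:
    """Mirror the branching of Repartition.partitioning (shuffle = true)."""
    if num_partitions <= 0:
        raise ValueError(f"Number of partitions ({num_partitions}) must be positive.")
    return "SinglePartition" if num_partitions == 1 else "RoundRobinPartitioning"


def repartition_by_expression_partitioning(
    num_partitions: int,
    sort_orders: Sequence[str],
    non_sort_orders: Sequence[str],
) -> str:
    """Mirror the branching of RepartitionByExpression.partitioning."""
    if num_partitions <= 0:
        raise ValueError(f"Number of partitions ({num_partitions}) must be positive.")
    if sort_orders and non_sort_orders:
        # The Scala code rejects a mix of SortOrder and other expressions.
        raise ValueError("expressions must be all SortOrder or none SortOrder")
    if num_partitions == 1:
        return "SinglePartition"
    if sort_orders:
        return "RangePartitioning"
    if non_sort_orders:
        return "HashPartitioning"
    return "RoundRobinPartitioning"


print(repartition_partitioning(10))
print(repartition_by_expression_partitioning(200, [], ["Col1", "Col2"]))
print(repartition_by_expression_partitioning(100, ["Col1 ASC"], []))
```

Note that the check for a single partition comes first in both operators, so one partition always yields SinglePartition regardless of the expressions supplied.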
Now let's examine some examples of the physical partitioning schemes.
repartition(1)
We can tell from the source code of Repartition that SinglePartition will be used:
case 1 => SinglePartition
case _ => RoundRobinPartitioning(numPartitions)
repartition(10)
RoundRobinPartitioning will be used, because numPartitions is greater than 1 and no partitioning expressions are given.
repartition('Col1','Col2')
HashPartitioning will be used, because the partitioning expressions are plain columns rather than SortOrder expressions.
repartitionByRange(100, "Col1")
RangePartitioning will be used, because repartitionByRange passes its columns as SortOrder expressions.
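To give an intuition for how the schemes differ, here is a toy sketch in plain Python of how each one might assign a row to a partition. It is a simplification under stated assumptions, not Spark's implementation: Spark hashes with Murmur3 rather than Python's hash, its round-robin assignment does not necessarily start at partition 0, and its range boundaries are derived by sampling the data. All function names and the example boundaries are hypothetical.

```python
from bisect import bisect_left
from typing import Sequence


def round_robin_partition(row_index: int, num_partitions: int) -> int:
    """Spread rows evenly by cycling through the partitions in order."""
    return row_index % num_partitions


def hash_partition(key: int, num_partitions: int) -> int:
    """Send equal keys to the same partition: hash, then non-negative modulo."""
    # Assumption: Python's built-in hash stands in for Spark's Murmur3 hash.
    return hash(key) % num_partitions


def range_partition(key: int, upper_bounds: Sequence[int]) -> int:
    """Pick the first range whose upper bound is >= key.

    upper_bounds must be sorted; len(upper_bounds) + 1 partitions result.
    """
    return bisect_left(upper_bounds, key)


# Hypothetical boundaries giving three ranges: <= 10, 11..20, > 20.
bounds = [10, 20]
print([round_robin_partition(i, 3) for i in range(5)])
print([hash_partition(k, 4) for k in (7, 7, 12)])
print([range_partition(k, bounds) for k in (5, 10, 15, 25)])
```

Round-robin balances partition sizes but ignores the data, hash partitioning co-locates equal keys, and range partitioning keeps keys in sorted, contiguous partitions.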