Spark repartition Function Internals
The Spark repartition function can be used to repartition your DataFrame. In the article Spark repartition vs. coalesce, I explained the differences between two commonly used functions, repartition and coalesce. This article explores the internals of the repartition function, with examples of the physical partitioning schemes (operators) it can produce: HashPartitioning, RangePartitioning, RoundRobinPartitioning, and SinglePartition.
About PySpark repartition function
The source code of the PySpark repartition function shows that it directly calls the repartition function of the underlying Java DataFrame object.
PySpark:
def repartition(  # type: ignore[misc]
    self, numPartitions: Union[int, "ColumnOrName"], *cols: "ColumnOrName"
) -> "DataFrame":
Scala:
def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = {
  repartitionByExpression(Some(numPartitions), partitionExprs)
}

def repartition(numPartitions: Int): Dataset[T] = withTypedPlan {
  Repartition(numPartitions, shuffle = true, logicalPlan)
}

def repartition(partitionExprs: Column*): Dataset[T] = {
  repartitionByExpression(None, partitionExprs)
}
There are three variations:
- When only numPartitions is specified, it returns a new Dataset (DataFrame) with exactly numPartitions partitions.
- When only columns are specified, it returns a new Dataset (DataFrame) that is partitioned by the given partitioning expressions, using `spark.sql.shuffle.partitions` as the number of partitions. The default is 200.
- When both are specified, it returns a new Dataset (DataFrame) that is partitioned by the given partitioning expressions, using `numPartitions` as the number of partitions.
In the two cases that take partitioning expressions, the returned Dataset (DataFrame) is hash partitioned.
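The three variations can be restated as a small dispatch rule. The following is a minimal pure-Python sketch of that rule, not Spark code: the function name `resolve_repartition` and its `shuffle_partitions` parameter (a stand-in for `spark.sql.shuffle.partitions`) are hypothetical and exist only for illustration.

```python
from typing import Optional, Sequence, Tuple


def resolve_repartition(
    num_partitions: Optional[int] = None,
    cols: Sequence[str] = (),
    shuffle_partitions: int = 200,  # stand-in for spark.sql.shuffle.partitions
) -> Tuple[str, Optional[int]]:
    """Return (logical operator name, partition count) for a repartition call."""
    if num_partitions is None and not cols:
        # Simplification of this sketch: require at least one argument.
        raise ValueError("specify numPartitions, columns, or both")
    if num_partitions is not None and num_partitions <= 0:
        raise ValueError("number of partitions must be positive")
    if not cols:
        # repartition(numPartitions): exactly numPartitions partitions.
        return ("Repartition", num_partitions)
    if num_partitions is None:
        # repartition(cols...): fall back to the configured shuffle partitions.
        return ("RepartitionByExpression", shuffle_partitions)
    # repartition(numPartitions, cols...): explicit count plus expressions.
    return ("RepartitionByExpression", num_partitions)


print(resolve_repartition(num_partitions=10))
print(resolve_repartition(cols=["Col1", "Col2"]))
print(resolve_repartition(num_partitions=5, cols=["Col1"]))
```

The second call shows the fallback to the default of 200 partitions when only columns are given.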
Repartition and RepartitionByExpression operators
As shown in the code above, the repartition function produces one of two logical operators: RepartitionByExpression or Repartition.
Both operators are defined in the package org.apache.spark.sql.catalyst.plans.logical. The source code can be found here. The following snippet is extracted from it:
/**
 * A base interface for [[RepartitionByExpression]] and [[Repartition]]
 */
abstract class RepartitionOperation extends UnaryNode {
  def shuffle: Boolean
  def numPartitions: Int
  override final def maxRows: Option[Long] = child.maxRows
  override def output: Seq[Attribute] = child.output
  final override val nodePatterns: Seq[TreePattern] = Seq(REPARTITION_OPERATION)
  def partitioning: Partitioning
}

/**
 * Returns a new RDD that has exactly `numPartitions` partitions. Differs from
 * [[RepartitionByExpression]] as this method is called directly by DataFrame's, because the user
 * asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer
 * of the output requires some specific ordering or distribution of the data.
 */
case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
  extends RepartitionOperation {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

  override def partitioning: Partitioning = {
    require(shuffle, "Partitioning can only be used in shuffle.")
    numPartitions match {
      case 1 => SinglePartition
      case _ => RoundRobinPartitioning(numPartitions)
    }
  }

  override protected def withNewChildInternal(newChild: LogicalPlan): Repartition =
    copy(child = newChild)
}

/**
 * This method repartitions data using [[Expression]]s into `optNumPartitions`, and receives
 * information about the number of partitions during execution. Used when a specific ordering or
 * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like
 * `coalesce` and `repartition`. If no `optNumPartitions` is given, by default it partitions data
 * into `numShufflePartitions` defined in `SQLConf`, and could be coalesced by AQE.
 */
case class RepartitionByExpression(
    partitionExpressions: Seq[Expression],
    child: LogicalPlan,
    optNumPartitions: Option[Int]) extends RepartitionOperation {

  val numPartitions = optNumPartitions.getOrElse(conf.numShufflePartitions)
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

  override val partitioning: Partitioning = {
    val (sortOrder, nonSortOrder) = partitionExpressions.partition(_.isInstanceOf[SortOrder])

    require(sortOrder.isEmpty || nonSortOrder.isEmpty,
      s"${getClass.getSimpleName} expects that either all its `partitionExpressions` are of type " +
        "`SortOrder`, which means `RangePartitioning`, or none of them are `SortOrder`, which " +
        "means `HashPartitioning`. In this case we have:" +
      s"""
         |SortOrder: $sortOrder
         |NonSortOrder: $nonSortOrder
       """.stripMargin)

    if (numPartitions == 1) {
      SinglePartition
    } else if (sortOrder.nonEmpty) {
      RangePartitioning(sortOrder.map(_.asInstanceOf[SortOrder]), numPartitions)
    } else if (nonSortOrder.nonEmpty) {
      HashPartitioning(nonSortOrder, numPartitions)
    } else {
      RoundRobinPartitioning(numPartitions)
    }
  }

  override def shuffle: Boolean = true

  override protected def withNewChildInternal(newChild: LogicalPlan): RepartitionByExpression =
    copy(child = newChild)
}

object RepartitionByExpression {
  def apply(
      partitionExpressions: Seq[Expression],
      child: LogicalPlan,
      numPartitions: Int): RepartitionByExpression = {
    RepartitionByExpression(partitionExpressions, child, Some(numPartitions))
  }
}
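The branch order of the two `partitioning` members in the snippet above is easy to lose in the Scala source, so here is a minimal pure-Python restatement of it. This is only a sketch for reading along: `choose_partitioning`, `sort_exprs`, and `hash_exprs` are hypothetical names, and the function returns the scheme's name as a string rather than a real Spark object.

```python
from typing import Sequence


def choose_partitioning(
    num_partitions: int,
    sort_exprs: Sequence[str] = (),  # expressions that are SortOrder
    hash_exprs: Sequence[str] = (),  # expressions that are not SortOrder
) -> str:
    """Mirror the branch order of the partitioning logic in the Scala snippet."""
    if num_partitions <= 0:
        raise ValueError(f"Number of partitions ({num_partitions}) must be positive.")
    if sort_exprs and hash_exprs:
        # The Scala code requires all expressions to be SortOrder, or none.
        raise ValueError("expressions must be all SortOrder or none SortOrder")
    if num_partitions == 1:
        return "SinglePartition"  # checked first, so it wins over any expressions
    if sort_exprs:
        return "RangePartitioning"
    if hash_exprs:
        return "HashPartitioning"
    return "RoundRobinPartitioning"  # no expressions at all


print(choose_partitioning(1))
print(choose_partitioning(10))
print(choose_partitioning(200, hash_exprs=["Col1", "Col2"]))
print(choose_partitioning(100, sort_exprs=["Col1"]))
```

Note that a single partition takes precedence over everything else: with one target partition there is nothing to hash or range over.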
Now let's examine some examples of the physical partitioning schemes.
repartition(1)
We can tell from the source code that SinglePartition will be used.
case 1 => SinglePartition
case _ => RoundRobinPartitioning(numPartitions)
repartition(10)
RoundRobinPartitioning will be used.
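To give a feel for what round-robin distribution means, here is a toy pure-Python sketch. It is not Spark's implementation, and Spark's real operator differs in its details; the `round_robin` helper is a hypothetical name used only for illustration.

```python
from typing import Any, List, Sequence


def round_robin(rows: Sequence[Any], num_partitions: int) -> List[List[Any]]:
    """Deal rows to partitions in turn, like dealing cards."""
    partitions: List[List[Any]] = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        partitions[i % num_partitions].append(row)
    return partitions


parts = round_robin(list(range(25)), 10)
print([len(p) for p in parts])  # partition sizes differ by at most one row
```

The point of this scheme is even partition sizes: no column values are inspected, so skewed data does not produce skewed partitions.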
repartition('Col1','Col2')
HashPartitioning will be used.
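The idea behind hash partitioning can be shown with a toy pure-Python sketch: hash the column values, then take the result modulo the number of partitions. Spark itself uses a Murmur3-based hash; the CRC32 used here is only a stand-in so the example needs nothing beyond the standard library, and `hash_partition_id` is a hypothetical helper.

```python
import zlib
from typing import Tuple


def hash_partition_id(key: Tuple[str, ...], num_partitions: int) -> int:
    """Map a key to a partition id; CRC32 is an illustrative stand-in hash."""
    digest = zlib.crc32("\x00".join(key).encode("utf-8"))
    return digest % num_partitions


# Rows as (Col1, Col2) pairs; 200 matches the default shuffle partition count.
rows = [("a", "x"), ("b", "y"), ("a", "x"), ("c", "z")]
ids = [hash_partition_id(r, 200) for r in rows]
print(ids[0] == ids[2])  # True: equal keys always land in the same partition
```

This is the property that makes hash partitioning useful before joins and aggregations: all rows sharing the same key values end up together.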
repartitionByRange(100, "Col1")
RangePartitioning will be used.
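Range partitioning assigns each row to a partition according to where its sort key falls among a list of boundaries. Spark estimates those boundaries by sampling the data; in the toy sketch below they are simply hard-coded. The `range_partition_id` helper and the `bounds` values are hypothetical and only illustrate the lookup.

```python
from bisect import bisect_left
from typing import Sequence


def range_partition_id(value: int, upper_bounds: Sequence[int]) -> int:
    """Return the index of the first range whose upper bound is >= value."""
    return bisect_left(upper_bounds, value)


bounds = [10, 20, 30]  # hypothetical bounds: 3 boundaries give 4 partitions
print([range_partition_id(v, bounds) for v in (5, 10, 11, 25, 99)])
# → [0, 0, 1, 2, 3]
```

Unlike hash partitioning, neighbouring key values stay in the same or adjacent partitions, which is what makes the result useful for ordered output.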
References
Partitioning · The Internals of Spark SQL