The Spark `repartition` function can be used to repartition a DataFrame. In the article Spark repartition vs. coalesce, I explained the differences between two commonly used functions, `repartition` and `coalesce`. In this article, I will explore the internals of the `repartition` function further, with examples of the physical partitioning schemes (operators) it can produce: `HashPartitioning`, `RangePartitioning`, `RoundRobinPartitioning`, and `SinglePartition`.
About the PySpark repartition function
The source code of the PySpark `repartition` function shows that it directly calls the `repartition` function of the underlying Java DataFrame object.
PySpark:
def repartition(  # type: ignore[misc]
    self, numPartitions: Union[int, "ColumnOrName"], *cols: "ColumnOrName"
) -> "DataFrame":
    ...  # body omitted; it forwards the arguments to self._jdf.repartition(...)
Scala:
def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = {
  repartitionByExpression(Some(numPartitions), partitionExprs)
}

def repartition(numPartitions: Int): Dataset[T] = withTypedPlan {
  Repartition(numPartitions, shuffle = true, logicalPlan)
}

def repartition(partitionExprs: Column*): Dataset[T] = {
  repartitionByExpression(None, partitionExprs)
}
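To make the dispatch concrete, here is a minimal pure-Python sketch (no Spark required) that models which Scala overload and logical operator a PySpark `repartition` call ends up with, and how many partitions it targets. It is an illustrative model, not PySpark's code: the function name `describe_repartition` and the keyword `shuffle_partitions` are hypothetical, plain strings stand in for `Column` objects or column names, and 200 is assumed as the value of `spark.sql.shuffle.partitions`.

```python
from typing import Tuple, Union

# Assumed value of spark.sql.shuffle.partitions (its default).
DEFAULT_SHUFFLE_PARTITIONS = 200


def describe_repartition(num_partitions: Union[int, str], *cols: str,
                         shuffle_partitions: int = DEFAULT_SHUFFLE_PARTITIONS
                         ) -> Tuple[str, str, int]:
    """Hypothetical model: (Scala overload, logical operator, number of partitions).

    Plain strings stand in for PySpark Column objects or column names.
    """
    if isinstance(num_partitions, int):
        if not cols:
            # repartition(numPartitions: Int) builds Repartition(numPartitions, shuffle = true)
            return "repartition(Int)", "Repartition", num_partitions
        # repartition(numPartitions: Int, partitionExprs: Column*)
        return "repartition(Int, Column*)", "RepartitionByExpression", num_partitions
    if isinstance(num_partitions, str):
        # The first argument is itself a column, so every argument is a partition
        # expression and the count falls back to spark.sql.shuffle.partitions.
        return "repartition(Column*)", "RepartitionByExpression", shuffle_partitions
    raise TypeError("num_partitions should be an int or a column")


print(describe_repartition("Col1", "Col2"))
# ('repartition(Column*)', 'RepartitionByExpression', 200)
```

The model keys off the type of the first argument because the PySpark signature accepts either an `int` or a column in that position.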
There are three variations:
- When only `numPartitions` is specified, it returns a new Dataset (DataFrame) with exactly `numPartitions` partitions.
- When only columns are specified, it returns a new Dataset (DataFrame) partitioned by the given partitioning expressions, using `spark.sql.shuffle.partitions` as the number of partitions (200 by default).
- When both are specified, it returns a new Dataset (DataFrame) partitioned by the given partitioning expressions, using `numPartitions` as the number of partitions.
When partitioning expressions are given, the returned Dataset (DataFrame) is hash partitioned; when only `numPartitions` is given, rows are distributed round-robin instead. In both cases, a single partition (`SinglePartition`) is used when the partition count is 1.
Repartition and RepartitionByExpression operators
As the code above shows, the `repartition` function results in one of two logical operators: `RepartitionByExpression` or `Repartition`.
Both operators are defined in the package `org.apache.spark.sql.catalyst.plans.logical`. The source code can be found here; the following snippet is extracted from it:
/**
 * A base interface for [[RepartitionByExpression]] and [[Repartition]]
 */
abstract class RepartitionOperation extends UnaryNode {
  def shuffle: Boolean
  def numPartitions: Int
  override final def maxRows: Option[Long] = child.maxRows
  override def output: Seq[Attribute] = child.output
  final override val nodePatterns: Seq[TreePattern] = Seq(REPARTITION_OPERATION)
  def partitioning: Partitioning
}

/**
 * Returns a new RDD that has exactly `numPartitions` partitions. Differs from
 * [[RepartitionByExpression]] as this method is called directly by DataFrame's, because the user
 * asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer
 * of the output requires some specific ordering or distribution of the data.
 */
case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
  extends RepartitionOperation {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

  override def partitioning: Partitioning = {
    require(shuffle, "Partitioning can only be used in shuffle.")
    numPartitions match {
      case 1 => SinglePartition
      case _ => RoundRobinPartitioning(numPartitions)
    }
  }

  override protected def withNewChildInternal(newChild: LogicalPlan): Repartition =
    copy(child = newChild)
}

/**
 * This method repartitions data using [[Expression]]s into `optNumPartitions`, and receives
 * information about the number of partitions during execution. Used when a specific ordering or
 * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like
 * `coalesce` and `repartition`. If no `optNumPartitions` is given, by default it partitions data
 * into `numShufflePartitions` defined in `SQLConf`, and could be coalesced by AQE.
 */
case class RepartitionByExpression(
    partitionExpressions: Seq[Expression],
    child: LogicalPlan,
    optNumPartitions: Option[Int]) extends RepartitionOperation {

  val numPartitions = optNumPartitions.getOrElse(conf.numShufflePartitions)
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

  override val partitioning: Partitioning = {
    val (sortOrder, nonSortOrder) = partitionExpressions.partition(_.isInstanceOf[SortOrder])

    require(sortOrder.isEmpty || nonSortOrder.isEmpty,
      s"${getClass.getSimpleName} expects that either all its `partitionExpressions` are of type " +
        "`SortOrder`, which means `RangePartitioning`, or none of them are `SortOrder`, which " +
        "means `HashPartitioning`. In this case we have:" +
      s"""
         |SortOrder: $sortOrder
         |NonSortOrder: $nonSortOrder
       """.stripMargin)

    if (numPartitions == 1) {
      SinglePartition
    } else if (sortOrder.nonEmpty) {
      RangePartitioning(sortOrder.map(_.asInstanceOf[SortOrder]), numPartitions)
    } else if (nonSortOrder.nonEmpty) {
      HashPartitioning(nonSortOrder, numPartitions)
    } else {
      RoundRobinPartitioning(numPartitions)
    }
  }

  override def shuffle: Boolean = true

  override protected def withNewChildInternal(newChild: LogicalPlan): RepartitionByExpression =
    copy(child = newChild)
}

object RepartitionByExpression {
  def apply(
      partitionExpressions: Seq[Expression],
      child: LogicalPlan,
      numPartitions: Int): RepartitionByExpression = {
    RepartitionByExpression(partitionExpressions, child, Some(numPartitions))
  }
}
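The decision logic in the two `partitioning` members above is short enough to port into plain Python and try with different inputs. This is a simplified sketch, not Spark code: the `SortOrder` dataclass is a hypothetical stand-in for Catalyst's `SortOrder` expression, plain strings stand in for other expressions, 200 is assumed for `conf.numShufflePartitions`, and the returned strings are only labels for the physical schemes.

```python
from dataclasses import dataclass
from typing import Optional, Sequence, Union


@dataclass(frozen=True)
class SortOrder:
    """Hypothetical stand-in for Catalyst's SortOrder expression."""
    column: str
    ascending: bool = True


# A plain string stands in for any non-SortOrder expression (e.g. a column reference).
Expression = Union[str, SortOrder]


def repartition_partitioning(num_partitions: int) -> str:
    """Mirror of Repartition.partitioning (with shuffle = true)."""
    if num_partitions <= 0:
        raise ValueError(f"Number of partitions ({num_partitions}) must be positive.")
    return "SinglePartition" if num_partitions == 1 else f"RoundRobinPartitioning({num_partitions})"


def repartition_by_expression_partitioning(exprs: Sequence[Expression],
                                           opt_num_partitions: Optional[int] = None,
                                           shuffle_partitions: int = 200) -> str:
    """Mirror of RepartitionByExpression.partitioning."""
    # optNumPartitions.getOrElse(conf.numShufflePartitions)
    num_partitions = opt_num_partitions if opt_num_partitions is not None else shuffle_partitions
    if num_partitions <= 0:
        raise ValueError(f"Number of partitions ({num_partitions}) must be positive.")
    sort_order = [e for e in exprs if isinstance(e, SortOrder)]
    non_sort_order = [e for e in exprs if not isinstance(e, SortOrder)]
    # Mixing SortOrder and non-SortOrder expressions is rejected, like the Scala require(...).
    if sort_order and non_sort_order:
        raise ValueError("expects either all SortOrder or no SortOrder expressions")
    if num_partitions == 1:
        return "SinglePartition"
    if sort_order:
        return f"RangePartitioning({num_partitions})"
    if non_sort_order:
        return f"HashPartitioning({num_partitions})"
    return f"RoundRobinPartitioning({num_partitions})"


print(repartition_by_expression_partitioning(["Col1", "Col2"]))  # HashPartitioning(200)
```

Note the order of the checks: a partition count of 1 wins over everything else, which is why both operators yield `SinglePartition` in that case.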
Now let's examine some examples of the physical partitioning schemes that result.
repartition(1)
We can tell from the source code of `Repartition` that `SinglePartition` will be used:
case 1 => SinglePartition
case _ => RoundRobinPartitioning(numPartitions)
repartition(10)
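`RoundRobinPartitioning` with 10 partitions will be used, because only a partition count is given (so the `Repartition` operator is created) and that count is not 1.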
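`RoundRobinPartitioning` deals rows out to the target partitions in turn, which spreads each input partition's rows evenly regardless of their values. The plain-Python sketch below illustrates the idea; `round_robin` and its `start` parameter are hypothetical. Spark begins each task at a pseudo-random partition rather than at a fixed one, which `start` only approximates.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def round_robin(rows: Sequence[T], num_partitions: int, start: int = 0) -> List[List[T]]:
    """Hypothetical sketch: deal rows into num_partitions buckets in turn, beginning at `start`."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    partitions: List[List[T]] = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        # The i-th row goes to the next bucket in the cycle.
        partitions[(start + i) % num_partitions].append(row)
    return partitions


print(round_robin(list(range(7)), 3))  # [[0, 3, 6], [1, 4], [2, 5]]
```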
repartition('Col1','Col2')
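`HashPartitioning` on `Col1` and `Col2` will be used. Since no partition count is passed, the number of partitions comes from `spark.sql.shuffle.partitions` (200 by default).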
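`HashPartitioning` places a row by hashing the values of the partitioning columns and taking the result modulo the number of partitions, so all rows with the same (`Col1`, `Col2`) values end up in the same partition. In the sketch below, `zlib.crc32` is an assumed stand-in for the Murmur3 hash Spark actually uses, so the partition ids will not match Spark's; `hash_partition_id` and `hash_partition` are hypothetical names.

```python
import zlib
from typing import Any, Dict, List, Sequence


def hash_partition_id(key: Sequence[Any], num_partitions: int) -> int:
    """Map a key to a partition id as hash(key) mod num_partitions."""
    # zlib.crc32 is a deterministic stand-in for Spark's Murmur3 hash (assumption).
    digest = zlib.crc32(repr(tuple(key)).encode("utf-8"))
    return digest % num_partitions  # crc32 is non-negative, so a plain modulo is enough


def hash_partition(rows: Sequence[Dict[str, Any]], cols: Sequence[str],
                   num_partitions: int) -> List[List[Dict[str, Any]]]:
    """Group rows into partitions by hashing the values of `cols`."""
    partitions: List[List[Dict[str, Any]]] = [[] for _ in range(num_partitions)]
    for row in rows:
        key = [row[c] for c in cols]
        partitions[hash_partition_id(key, num_partitions)].append(row)
    return partitions
```

Because the partition depends only on the key, hash partitioning co-locates equal keys, but it can produce uneven partitions when some keys are much more frequent than others.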
repartitionByRange(100, "Col1")
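`repartitionByRange` also creates a `RepartitionByExpression` operator, but it converts each column into a `SortOrder` expression (ascending unless an order is specified), so `RangePartitioning` with 100 partitions will be used.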
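With `RangePartitioning`, each partition covers a contiguous range of the sort key, so every key in partition *i* is no larger than any key in partition *i + 1*. Spark derives the range boundaries by sampling the data; the sketch below simply assumes the sorted upper bounds are given, and `range_partition_id` and `range_partition` are hypothetical names.

```python
from bisect import bisect_left
from typing import Any, List, Sequence


def range_partition_id(key: Any, bounds: Sequence[Any]) -> int:
    """Return the index of the first upper bound that is >= key."""
    # bounds must be sorted ascending; n bounds define n + 1 partitions.
    return bisect_left(bounds, key)


def range_partition(keys: Sequence[Any], bounds: Sequence[Any]) -> List[List[Any]]:
    """Hypothetical sketch: assign each key to the partition whose range contains it."""
    partitions: List[List[Any]] = [[] for _ in range(len(bounds) + 1)]
    for key in keys:
        partitions[range_partition_id(key, bounds)].append(key)
    return partitions


print(range_partition([5, 1, 9, 3, 7, 10], bounds=[3, 7]))  # [[1, 3], [5, 7], [9, 10]]
```

Since the partitions are ordered by key range, sorting within each partition is enough to produce a globally sorted result.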