Spark repartition Function Internals

2022-03-28
Raymond, Spark & PySpark

The Spark repartition function can be used to repartition a DataFrame. In the article Spark repartition vs. coalesce, I explained the differences between two commonly used functions, repartition and coalesce. In this article, I will explore the internals of the repartition function further, with examples of the physical partitioning schemes (operators): HashPartitioning, RangePartitioning, RoundRobinPartitioning, and SinglePartition.

About PySpark repartition function

The source code of the PySpark repartition function shows that it directly calls the repartition function of the underlying Java (JVM) DataFrame object.

PySpark:

def repartition(  # type: ignore[misc]
    self, numPartitions: Union[int, "ColumnOrName"], *cols: "ColumnOrName"
) -> "DataFrame":
    # Method of pyspark.sql.DataFrame; the omitted body forwards to self._jdf.repartition(...)
    ...
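To make the delegation concrete, here is a simplified pure-Python model of how the wrapper decides which JVM overload to call, based on the type of its first argument. It is a sketch, not PySpark's code: `FakeColumn` and `jvm_overload_for` are hypothetical names, and the returned strings only label the Scala overloads listed next.

```python
from typing import Tuple, Union


class FakeColumn:
    """Hypothetical stand-in for pyspark.sql.Column, used only in this sketch."""

    def __init__(self, name: str) -> None:
        self.name = name


ColumnOrName = Union[str, FakeColumn]


def jvm_overload_for(
    num_partitions: Union[int, ColumnOrName], *cols: ColumnOrName
) -> Tuple[str, tuple]:
    """Return the Scala repartition overload a call would be routed to, plus its arguments."""
    if isinstance(num_partitions, int):
        if not cols:
            # Only a partition count: repartition(numPartitions: Int)
            return "repartition(Int)", (num_partitions,)
        # A count followed by columns: repartition(numPartitions: Int, partitionExprs: Column*)
        return "repartition(Int, Column*)", (num_partitions, *cols)
    if isinstance(num_partitions, (str, FakeColumn)):
        # The first argument is really a column: repartition(partitionExprs: Column*)
        return "repartition(Column*)", (num_partitions, *cols)
    raise TypeError("numPartitions should be an int or Column")
```

For example, `jvm_overload_for("Col1", "Col2")` is routed to the column-only overload, while `jvm_overload_for(10)` is routed to the count-only one. The real wrapper then wraps the JVM result in a new Python DataFrame.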

Scala:

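// org.apache.spark.sql.Dataset (excerpt)

// Partition count and columns: RepartitionByExpression with Some(numPartitions)
def repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T] = {
  repartitionByExpression(Some(numPartitions), partitionExprs)
}

// Partition count only: Repartition with shuffle = true
def repartition(numPartitions: Int): Dataset[T] = withTypedPlan {
  Repartition(numPartitions, shuffle = true, logicalPlan)
}

// Columns only: RepartitionByExpression with None, so spark.sql.shuffle.partitions is used
def repartition(partitionExprs: Column*): Dataset[T] = {
  repartitionByExpression(None, partitionExprs)
}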

There are three variations:

  • When only numPartitions is specified, it returns a new Dataset (DataFrame) with exactly numPartitions partitions. 

  • When only columns are specified, it returns a new Dataset (DataFrame) partitioned by the given partitioning expressions, using `spark.sql.shuffle.partitions` as the number of partitions (200 by default). 

  • When both are specified, it returns a new Dataset (DataFrame) partitioned by the given partitioning expressions, using `numPartitions` as the number of partitions. 

When partitioning columns are given, the returned Dataset (DataFrame) is hash partitioned; when only numPartitions is given, rows are distributed round-robin. In both cases, a single partition (SinglePartition) is used when the partition count is 1.
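The three variations can be summarized by a small model that returns the logical operator and the partition count a call would request. This is a sketch under stated assumptions: `planned_repartition` is a hypothetical helper, `shuffle_partitions` stands in for the `spark.sql.shuffle.partitions` setting, and Adaptive Query Execution (which may later coalesce shuffle partitions) is ignored.

```python
from typing import Optional, Sequence, Tuple

DEFAULT_SHUFFLE_PARTITIONS = 200  # default value of spark.sql.shuffle.partitions


def planned_repartition(
    num_partitions: Optional[int] = None,
    cols: Sequence[str] = (),
    shuffle_partitions: int = DEFAULT_SHUFFLE_PARTITIONS,
) -> Tuple[str, int]:
    """Return (logical operator, requested partition count) for a repartition call."""
    if num_partitions is None and not cols:
        raise ValueError("repartition needs a partition count, columns, or both")
    if not cols:
        # repartition(numPartitions) -> Repartition(numPartitions, shuffle = true, ...)
        op_name, count = "Repartition", num_partitions
    elif num_partitions is None:
        # repartition(cols*) -> RepartitionByExpression(..., None): falls back to the setting
        op_name, count = "RepartitionByExpression", shuffle_partitions
    else:
        # repartition(numPartitions, cols*) -> RepartitionByExpression(..., Some(numPartitions))
        op_name, count = "RepartitionByExpression", num_partitions
    # Both operators reject non-positive counts (the require(...) checks in Catalyst).
    if count <= 0:
        raise ValueError(f"Number of partitions ({count}) must be positive.")
    return op_name, count
```

For instance, `planned_repartition(cols=["Col1", "Col2"])` requests 200 partitions with the default setting, while `planned_repartition(8, ["Col1"])` requests 8.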

Repartition and RepartitionByExpression operators

As the code above shows, the repartition function produces one of two logical operators: RepartitionByExpression or Repartition.

Both operators are defined in the package org.apache.spark.sql.catalyst.plans.logical, in the source file basicLogicalOperators.scala. The following snippet is extracted from it:

/**
 * A base interface for [[RepartitionByExpression]] and [[Repartition]]
 */
abstract class RepartitionOperation extends UnaryNode {
  def shuffle: Boolean
  def numPartitions: Int
  override final def maxRows: Option[Long] = child.maxRows
  override def output: Seq[Attribute] = child.output
  final override val nodePatterns: Seq[TreePattern] = Seq(REPARTITION_OPERATION)
  def partitioning: Partitioning
}

/**
 * Returns a new RDD that has exactly `numPartitions` partitions. Differs from
 * [[RepartitionByExpression]] as this method is called directly by DataFrame's, because the user
 * asked for `coalesce` or `repartition`. [[RepartitionByExpression]] is used when the consumer
 * of the output requires some specific ordering or distribution of the data.
 */
case class Repartition(numPartitions: Int, shuffle: Boolean, child: LogicalPlan)
  extends RepartitionOperation {
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

  override def partitioning: Partitioning = {
    require(shuffle, "Partitioning can only be used in shuffle.")
    numPartitions match {
      case 1 => SinglePartition
      case _ => RoundRobinPartitioning(numPartitions)
    }
  }
  override protected def withNewChildInternal(newChild: LogicalPlan): Repartition =
    copy(child = newChild)
}

/**
 * This method repartitions data using [[Expression]]s into `optNumPartitions`, and receives
 * information about the number of partitions during execution. Used when a specific ordering or
 * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like
 * `coalesce` and `repartition`. If no `optNumPartitions` is given, by default it partitions data
 * into `numShufflePartitions` defined in `SQLConf`, and could be coalesced by AQE.
 */
case class RepartitionByExpression(
    partitionExpressions: Seq[Expression],
    child: LogicalPlan,
    optNumPartitions: Option[Int]) extends RepartitionOperation {

  val numPartitions = optNumPartitions.getOrElse(conf.numShufflePartitions)
  require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")

  override val partitioning: Partitioning = {
    val (sortOrder, nonSortOrder) = partitionExpressions.partition(_.isInstanceOf[SortOrder])

    require(sortOrder.isEmpty || nonSortOrder.isEmpty,
      s"${getClass.getSimpleName} expects that either all its `partitionExpressions` are of type " +
        "`SortOrder`, which means `RangePartitioning`, or none of them are `SortOrder`, which " +
        "means `HashPartitioning`. In this case we have:" +
      s"""
         |SortOrder: $sortOrder
         |NonSortOrder: $nonSortOrder
       """.stripMargin)

    if (numPartitions == 1) {
      SinglePartition
    } else if (sortOrder.nonEmpty) {
      RangePartitioning(sortOrder.map(_.asInstanceOf[SortOrder]), numPartitions)
    } else if (nonSortOrder.nonEmpty) {
      HashPartitioning(nonSortOrder, numPartitions)
    } else {
      RoundRobinPartitioning(numPartitions)
    }
  }

  override def shuffle: Boolean = true

  override protected def withNewChildInternal(newChild: LogicalPlan): RepartitionByExpression =
    copy(child = newChild)
}

object RepartitionByExpression {
  def apply(
      partitionExpressions: Seq[Expression],
      child: LogicalPlan,
      numPartitions: Int): RepartitionByExpression = {
    RepartitionByExpression(partitionExpressions, child, Some(numPartitions))
  }
}
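The decision logic of the two `partitioning` methods can be mirrored in a few lines of Python, which makes the four outcomes easy to check. This is a simplified model, not Spark code: `SortOrder` here is a hypothetical dataclass that tags an expression as a sort key, plain strings stand for non-sort expressions, and results are returned as strings rather than Catalyst `Partitioning` objects.

```python
from dataclasses import dataclass
from typing import Sequence, Union


@dataclass(frozen=True)
class SortOrder:
    """Hypothetical marker for a sort expression such as an ascending Col1."""

    column: str
    ascending: bool = True


Expr = Union[str, SortOrder]


def repartition_partitioning(num_partitions: int, shuffle: bool = True) -> str:
    """Model of Repartition.partitioning."""
    if num_partitions <= 0:
        raise ValueError(f"Number of partitions ({num_partitions}) must be positive.")
    if not shuffle:
        raise ValueError("Partitioning can only be used in shuffle.")
    return "SinglePartition" if num_partitions == 1 else f"RoundRobinPartitioning({num_partitions})"


def by_expression_partitioning(exprs: Sequence[Expr], num_partitions: int) -> str:
    """Model of RepartitionByExpression.partitioning."""
    if num_partitions <= 0:
        raise ValueError(f"Number of partitions ({num_partitions}) must be positive.")
    sort_order = [e for e in exprs if isinstance(e, SortOrder)]
    non_sort_order = [e for e in exprs if not isinstance(e, SortOrder)]
    # Mixing sort and non-sort expressions is rejected before anything else is decided.
    if sort_order and non_sort_order:
        raise ValueError("expressions must be either all SortOrder or none SortOrder")
    if num_partitions == 1:
        return "SinglePartition"
    if sort_order:
        return f"RangePartitioning({num_partitions})"
    if non_sort_order:
        return f"HashPartitioning({num_partitions})"
    return f"RoundRobinPartitioning({num_partitions})"
```

Note that an empty expression list with more than one partition falls through to RoundRobinPartitioning, the same scheme Repartition uses.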

Now let's examine some examples of the physical partitioning schemes.

repartition(1)

The Repartition.partitioning source shows that SinglePartition is used when numPartitions is 1:

case 1 => SinglePartition
case _ => RoundRobinPartitioning(numPartitions)

repartition(10)

RoundRobinPartitioning(10) will be used, because numPartitions is greater than 1.
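RoundRobinPartitioning spreads rows evenly without looking at their values. The sketch below is a simplified model of that idea: each input task keeps a position counter, advances it by one per row, and maps it onto a partition id. In Spark the starting position is pseudo-random per input task (seeded by the task's partition id); the hypothetical `start` parameter stands in for it, and `round_robin_assign` is an illustrative name, not a Spark API.

```python
from collections import Counter
from typing import Iterable, List


def round_robin_assign(rows: Iterable[object], num_partitions: int, start: int = 0) -> List[int]:
    """Return the target partition id of each row, in input order (simplified model)."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    position = start  # stands in for the per-task pseudo-random starting position
    targets = []
    for _ in rows:
        # Advance the counter first, then map it onto a partition id.
        position += 1
        targets.append(position % num_partitions)
    return targets


# 25 rows into 10 partitions: every partition receives 2 or 3 rows.
targets = round_robin_assign(range(25), 10, start=3)
sizes = Counter(targets)
```

Because assignment ignores the row contents, partition sizes differ by at most one row per input task, regardless of data skew.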

repartition('Col1','Col2')

HashPartitioning will be used, with spark.sql.shuffle.partitions (200 by default) as the number of partitions.
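With HashPartitioning, the target partition is computed from the values of the partitioning columns, so rows with equal keys always land in the same partition. In Spark, the partition id is a Murmur3 hash of the expressions (seed 42), taken modulo numPartitions as a non-negative value (Pmod). The sketch below keeps that structure but swaps in `zlib.crc32` as the hash function, so its partition ids will not match Spark's; `hash_partition` is a hypothetical helper.

```python
import zlib
from typing import Sequence


def hash_partition(key: Sequence[object], num_partitions: int, seed: int = 42) -> int:
    """Partition id for a row whose partitioning-column values are `key` (simplified model)."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    h = seed
    for value in key:
        # Chain the columns: the previous hash is the starting value for the next column,
        # similar to how Murmur3Hash seeds each column's hash with the previous result.
        h = zlib.crc32(repr(value).encode("utf-8"), h)
    # Reinterpret the unsigned 32-bit result as signed, like a JVM Int hash.
    signed = h - (1 << 32) if h >= (1 << 31) else h
    # In Java, -7 % 3 evaluates to -1, which is why Spark wraps the hash in Pmod;
    # Python's % with a positive divisor is already non-negative.
    return signed % num_partitions
```

For example, every row with the values `("a", 1)` in (Col1, Col2) gets the same id from `hash_partition(("a", 1), 200)`.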

repartitionByRange(100, "Col1")

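RangePartitioning will be used. repartitionByRange wraps each column that is not already a sort expression in an ascending SortOrder, so RepartitionByExpression receives only SortOrder expressions and chooses RangePartitioning.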
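RangePartitioning assigns contiguous, ordered key ranges to partitions. Spark's RangePartitioner computes range boundaries from a sample of the data and then locates each row's key among those boundaries with a search. The sketch below follows the same two steps under simplifying assumptions: it picks evenly spaced boundaries from the sorted sample instead of Spark's weighted sampling, and `range_bounds` and `range_partition` are hypothetical helpers.

```python
from bisect import bisect_left
from typing import List, Sequence


def range_bounds(sample: Sequence[int], num_partitions: int) -> List[int]:
    """Pick up to num_partitions - 1 upper bounds from a sample (simplified, unweighted)."""
    ordered = sorted(sample)
    bounds: List[int] = []
    if not ordered or num_partitions <= 1:
        return bounds
    for i in range(1, num_partitions):
        candidate = ordered[i * len(ordered) // num_partitions]
        # Keep bounds strictly increasing so duplicated keys do not create empty ranges.
        if not bounds or candidate > bounds[-1]:
            bounds.append(candidate)
    return bounds


def range_partition(key: int, bounds: Sequence[int]) -> int:
    """Partition id = number of bounds strictly below key (a key equal to a bound stays left)."""
    return bisect_left(bounds, key)
```

With `bounds = range_bounds(list(range(100)), 4)`, the bounds are `[25, 50, 75]`, so keys 0 to 25 go to partition 0 and keys 76 to 99 go to partition 3. In this model, heavily duplicated keys yield fewer bounds, and therefore fewer partitions, than requested.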
