PySpark DataFrame - rank() and dense_rank() Functions


Code description

In Spark SQL, the rank and dense_rank functions can be used to rank rows within a window partition via RANK (Spark SQL - RANK Window Function) and DENSE_RANK (Spark SQL - DENSE_RANK Window Function). This code snippet implements the same ranking directly with the PySpark DataFrame APIs instead of Spark SQL. It creates a window that partitions the data by the TXN_DT attribute and sorts the records within each partition by the AMT column in descending order. The frame boundary of the window is defined as unbounded preceding to current row, which is the frame rank and dense_rank require, so the result would be the same if the rowsBetween call were omitted.

Output:

+----+------+-------------------+----+----------+
|ACCT|   AMT|             TXN_DT|rank|dense_rank|
+----+------+-------------------+----+----------+
| 101|102.01|2021-01-01 00:00:00|   1|         1|
| 102|  93.0|2021-01-01 00:00:00|   2|         2|
| 101| 10.01|2021-01-01 00:00:00|   3|         3|
| 103| 913.1|2021-01-02 00:00:00|   1|         1|
| 101|900.56|2021-01-03 00:00:00|   1|         1|
| 102|900.56|2021-01-03 00:00:00|   1|         1|
| 103|  80.0|2021-01-03 00:00:00|   3|         2|
+----+------+-------------------+----+----------+

As the output shows, the difference between dense_rank and rank is that dense_rank leaves no gaps in the ranking sequence when multiple rows tie on the ranked value, whereas rank skips ranks after a tie (hence rank 3 follows the two rows tied at rank 1 on 2021-01-03).
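
For reference, the same ranking can also be expressed as a Spark SQL query. The sketch below assumes the DataFrame from the snippet that follows has been registered as a temporary view named txn (an assumed name, e.g. via df.createOrReplaceTempView('txn')):

# Assumption: df.createOrReplaceTempView('txn') has been called first.
spark.sql("""
    SELECT ACCT, AMT, TXN_DT,
           RANK() OVER (PARTITION BY TXN_DT ORDER BY AMT DESC) AS rank,
           DENSE_RANK() OVER (PARTITION BY TXN_DT ORDER BY AMT DESC) AS dense_rank
    FROM txn
""").show()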

Code snippet

from pyspark.sql import SparkSession, Window
from datetime import datetime
from pyspark.sql.functions import rank, dense_rank, desc

app_name = "PySpark rank and dense_rank Window Function"
master = "local"

# Create a local SparkSession
spark = SparkSession.builder \
    .appName(app_name) \
    .master(master) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# Sample transactions: [account, amount, transaction date]
data = [
    [101, 10.01, datetime.strptime('2021-01-01', '%Y-%m-%d')],
    [101, 102.01, datetime.strptime('2021-01-01', '%Y-%m-%d')],
    [102, 93.0, datetime.strptime('2021-01-01', '%Y-%m-%d')],
    [103, 913.1, datetime.strptime('2021-01-02', '%Y-%m-%d')],
    [101, 900.56, datetime.strptime('2021-01-03', '%Y-%m-%d')],
    [102, 900.56, datetime.strptime('2021-01-03', '%Y-%m-%d')],
    [103, 80.0, datetime.strptime('2021-01-03', '%Y-%m-%d')]
]

# Create the DataFrame with explicit column names and show the input data
df = spark.createDataFrame(data, ['ACCT', 'AMT', 'TXN_DT'])
df.show()

# Window partitioned by transaction date and ordered by amount descending;
# the frame (unbounded preceding to current row) is the one rank and
# dense_rank require.
window = Window.partitionBy('TXN_DT').orderBy(desc("AMT")).rowsBetween(
    Window.unboundedPreceding, Window.currentRow)

# Add rank and dense_rank columns computed over the window
df = df.withColumn('rank', rank().over(window)).withColumn(
    'dense_rank', dense_rank().over(window))

df.show()
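
A common follow-up is to keep only the top rows within each partition. As a sketch building on the df produced above, the snippet below keeps the highest-ranked transaction(s) per day:

from pyspark.sql.functions import col

# Rows that tie for the top amount on a given day all have rank 1
df.filter(col('rank') == 1).show()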