PySpark DataFrame - rank() and dense_rank() Functions
Code description
In Spark SQL, the RANK and DENSE_RANK window functions rank the rows within a window partition (see Spark SQL - RANK Window Function and Spark SQL - DENSE_RANK Window Function). This code snippet implements the same ranking directly with the PySpark DataFrame API instead of Spark SQL. It creates a window that partitions the data by the TXN_DT attribute and sorts the records in each partition by the AMT column in descending order. The frame boundary of the window is defined as unbounded preceding to current row.
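To make the window definition more concrete, here is a rough plain-Python sketch of the partitioning and ordering step, with no Spark involved. The helper name partition_and_sort and the tuple layout are illustrative assumptions, not part of the PySpark API, and the dates are shortened to strings.

```python
from collections import defaultdict

# Sample rows as (ACCT, AMT, TXN_DT) tuples; dates kept as strings for brevity.
rows = [
    (101, 10.01, "2021-01-01"),
    (101, 102.01, "2021-01-01"),
    (102, 93.0, "2021-01-01"),
    (103, 913.1, "2021-01-02"),
    (101, 900.56, "2021-01-03"),
    (102, 900.56, "2021-01-03"),
    (103, 80.0, "2021-01-03"),
]


def partition_and_sort(rows):
    """Group rows by TXN_DT, then sort each group by AMT in descending order.

    Hypothetical helper that mimics partitionBy('TXN_DT').orderBy(desc('AMT')).
    """
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[2]].append(row)
    return {
        txn_dt: sorted(group, key=lambda r: r[1], reverse=True)
        for txn_dt, group in sorted(partitions.items())
    }


windows = partition_and_sort(rows)
for txn_dt, group in windows.items():
    # Print the ordered AMT values that the ranking functions would see.
    print(txn_dt, [amt for _, amt, _ in group])
```

Each printed list is the sequence of AMT values that the ranking functions walk through for one TXN_DT value.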
Output:
+----+------+-------------------+----+----------+
|ACCT|   AMT|             TXN_DT|rank|dense_rank|
+----+------+-------------------+----+----------+
| 101|102.01|2021-01-01 00:00:00|   1|         1|
| 102|  93.0|2021-01-01 00:00:00|   2|         2|
| 101| 10.01|2021-01-01 00:00:00|   3|         3|
| 103| 913.1|2021-01-02 00:00:00|   1|         1|
| 101|900.56|2021-01-03 00:00:00|   1|         1|
| 102|900.56|2021-01-03 00:00:00|   1|         1|
| 103|  80.0|2021-01-03 00:00:00|   3|         2|
+----+------+-------------------+----+----------+
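As the output shows, rank and dense_rank differ only in how they handle ties. Rows with the same AMT within a partition receive the same value from both functions, but rank then skips ahead and leaves a gap, while dense_rank continues with the next consecutive number. In the 2021-01-03 partition, the two rows with AMT 900.56 both get rank 1, and the following row gets rank 3 but dense_rank 2.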
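The tie handling can be sketched in a few lines of plain Python. This is only an illustration of the numbering rule, not how Spark implements it; the function name rank_and_dense_rank is a hypothetical helper, and the input is assumed to be already sorted in ranking order.

```python
def rank_and_dense_rank(sorted_values):
    """Return (rank, dense_rank) pairs for values already sorted in ranking order."""
    results = []
    rank = 0
    dense = 0
    previous = object()  # sentinel that never equals a real value
    for position, value in enumerate(sorted_values, start=1):
        if value != previous:
            rank = position  # rank jumps to the row position, leaving gaps after ties
            dense += 1       # dense_rank moves up by one per distinct value
            previous = value
        results.append((rank, dense))
    return results


# AMT values of the 2021-01-03 partition, sorted in descending order.
amounts = [900.56, 900.56, 80.0]
ranks = rank_and_dense_rank(amounts)
print(ranks)  # [(1, 1), (1, 1), (3, 2)]
```

The result matches the rank and dense_rank columns of the 2021-01-03 rows in the output above.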
Code snippet
from pyspark.sql import SparkSession, Window
from datetime import datetime
from pyspark.sql.functions import rank, dense_rank, desc

app_name = "PySpark rank and dense_rank Window Function"
master = "local"

spark = SparkSession.builder \
    .appName(app_name) \
    .master(master) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# Sample transactions: account, amount, transaction date
data = [
    [101, 10.01, datetime.strptime('2021-01-01', '%Y-%m-%d')],
    [101, 102.01, datetime.strptime('2021-01-01', '%Y-%m-%d')],
    [102, 93.0, datetime.strptime('2021-01-01', '%Y-%m-%d')],
    [103, 913.1, datetime.strptime('2021-01-02', '%Y-%m-%d')],
    [101, 900.56, datetime.strptime('2021-01-03', '%Y-%m-%d')],
    [102, 900.56, datetime.strptime('2021-01-03', '%Y-%m-%d')],
    [103, 80.0, datetime.strptime('2021-01-03', '%Y-%m-%d')]
]

df = spark.createDataFrame(data, ['ACCT', 'AMT', 'TXN_DT'])
df.show()

# Partition by transaction date and order each partition by amount, largest first
window = Window.partitionBy('TXN_DT').orderBy(desc("AMT")).rowsBetween(
    Window.unboundedPreceding, Window.currentRow)

# Add both ranking columns over the same window
df = df.withColumn('rank', rank().over(window)).withColumn(
    'dense_rank', dense_rank().over(window))

df.show()