Time Travel with Delta Tables in PySpark



Code description

Delta Lake provides time travel functionality to retrieve data at a certain point in time or at a specific version. This can be done easily using the following two options when reading from a Delta table as a DataFrame (a SQL equivalent is sketched after the list):

  • versionAsOf - an integer value that specifies a version.

  • timestampAsOf - a timestamp or date string.
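
The same two options also map onto Delta Lake's SQL time travel syntax (VERSION AS OF / TIMESTAMP AS OF), which the error messages further below reference. A minimal sketch, assuming Delta Lake 2.0 or later and the same table path used in the code snippet at the end of this page:

    # SQL time travel over a path-based Delta table (assumes Delta Lake 2.0+,
    # where VERSION AS OF / TIMESTAMP AS OF are supported in Spark SQL)
    df_v1 = spark.sql("SELECT * FROM delta.`/data/delta/kafka-events` VERSION AS OF 1")
    df_ts = spark.sql("SELECT * FROM delta.`/data/delta/kafka-events` TIMESTAMP AS OF '2022-08-27 11:30:47'")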

This code snippet shows you how to use them with the Spark DataFrameReader API. It includes three examples:

  • Query data as of version 1
  • Query data as of one month ago (using a value computed via Spark SQL)
  • Query data as of a certain timestamp ('2022-09-01 12:00:00.999999UTC+10:00')

You may encounter an error if the provided timestamp is earlier than the earliest commit:

    pyspark.sql.utils.AnalysisException: The provided timestamp (2022-08-03 00:00:00.0) is before the earliest version available to this table (2022-08-27 10:53:18.213). Please use a timestamp after 2022-08-27 10:53:18.
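
To find out which timestamps and versions are actually available before time traveling, you can inspect the table history. A short sketch using the DeltaTable API (the import is already part of the snippet below), against the same table path:

    # List available commits: history() returns a DataFrame with one row per
    # commit, including the version number and commit timestamp
    from delta.tables import DeltaTable

    dt = DeltaTable.forPath(spark, "/data/delta/kafka-events")
    dt.history().select("version", "timestamp").show(truncate=False)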

Similarly, if the provided timestamp is later than the last commit, you may encounter another issue like the following:

    pyspark.sql.utils.AnalysisException: The provided timestamp: 2022-09-07 00:00:00.0 is after the latest commit timestamp of 2022-08-27 11:30:47.185. If you wish to query this version of the table, please either provide the version with "VERSION AS OF 1" or use the exact timestamp of the last commit: "TIMESTAMP AS OF '2022-08-27 11:30:47'".
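
In either direction, one defensive pattern (not part of the original snippet) is to catch the AnalysisException shown above and fall back to the latest snapshot:

    # Fall back to the current version when the requested timestamp is
    # outside the table's commit range
    from pyspark.sql.utils import AnalysisException

    try:
        df = spark.read.format('delta') \
            .option('timestampAsOf', '2022-08-03 00:00:00') \
            .load("/data/delta/kafka-events")
    except AnalysisException:
        df = spark.read.format('delta').load("/data/delta/kafka-events")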

References

Delta Lake with PySpark Walkthrough - https://kontext.tech/article/1175/introduction-to-delta-lake-with-pyspark

Code snippet

    from pyspark.sql import SparkSession
    from delta.tables import *
    from pyspark.sql.functions import *
    
    if __name__ == "__main__":
        app_name = "PySpark Delta Lake Time Travel Example"
        master = "local"
    
        # Create Spark session with Delta extension
    
        builder = SparkSession.builder.appName(app_name) \
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
            .master(master)
    
        spark = builder.getOrCreate()
    
        spark.sparkContext.setLogLevel("WARN")
    
        # Read from Delta Lake table as of version 1
        df = spark.read.format('delta') \
            .option('versionAsOf', '1') \
            .load("/data/delta/kafka-events")
        df.show()
    
        # Compute a timestamp string for one month ago via Spark SQL
        last_month = spark.sql("SELECT CAST(add_months(current_date(), -1) AS STRING)").collect()[0][0]
        # Read the table as of that timestamp
        df = spark.read.format('delta') \
            .option('timestampAsOf', last_month) \
            .load("/data/delta/kafka-events")
        df.show()
    
        # Read the table as of an explicit timestamp with a time zone offset
        df = spark.read.format('delta') \
            .option('timestampAsOf', '2022-09-01 12:00:00.999999UTC+10:00') \
            .load("/data/delta/kafka-events")
        df.show()
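
Note that the snippet assumes a Delta table with at least two commits already exists at /data/delta/kafka-events. If you want to run it end to end, here is a hedged setup sketch that creates such a table (the schema is arbitrary):

    # Setup sketch (not part of the original snippet): write twice so that
    # both version 0 and version 1 exist
    spark.range(0, 5).write.format('delta').mode('overwrite').save("/data/delta/kafka-events")  # creates version 0
    spark.range(5, 10).write.format('delta').mode('append').save("/data/delta/kafka-events")    # creates version 1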
    