Kontext

Time Travel with Delta Table in PySpark

2022-09-03

Code description

Delta Lake provides time travel functionality to retrieve data as of a certain point in time or a certain version. This is done with either of the following two options when reading a Delta table as a DataFrame:

  • versionAsOf - an integer value that specifies a version.
  • timestampAsOf - a timestamp or date string.

This code snippet shows how to use them with the Spark DataFrameReader API. It includes three examples:

  • Query data as of version 1
  • Query data as of one month ago (using a value computed via Spark SQL)
  • Query data as of a specific timestamp ('2022-09-01 12:00:00.999999UTC+10:00')
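
The point-in-time value can also be computed in plain Python instead of Spark SQL, for example as an exact number of days before now. The following is a minimal sketch: the helper name days_ago_timestamp is hypothetical (it is not part of PySpark or Delta Lake), and it assumes a 'yyyy-MM-dd HH:mm:ss' string is what you want to pass as the timestampAsOf value.

```python
from datetime import datetime, timedelta
from typing import Optional


def days_ago_timestamp(days: int, now: Optional[datetime] = None) -> str:
    """Return a 'YYYY-MM-DD HH:MM:SS' string for `days` days before `now`.

    Hypothetical helper for illustration only.
    `now` defaults to the current local time when omitted.
    """
    if days < 0:
        raise ValueError("days must be non-negative")
    reference = now if now is not None else datetime.now()
    return (reference - timedelta(days=days)).strftime("%Y-%m-%d %H:%M:%S")
```

The returned string would then be supplied as .option('timestampAsOf', days_ago_timestamp(30)) on the reader. Passing now explicitly keeps the helper deterministic, which is convenient for testing.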

You may encounter an error if the timestamp is earlier than the earliest commit:

pyspark.sql.utils.AnalysisException: The provided timestamp (2022-08-03 00:00:00.0) is before the earliest version available to this
table (2022-08-27 10:53:18.213). Please use a timestamp after 2022-08-27 10:53:18.

Similarly, if the provided timestamp is later than the last commit, you may encounter another issue like the following:

pyspark.sql.utils.AnalysisException: The provided timestamp: 2022-09-07 00:00:00.0 is after the latest commit timestamp of
2022-08-27 11:30:47.185. If you wish to query this version of the table, please either provide
the version with "VERSION AS OF 1" or use the exact timestamp
of the last commit: "TIMESTAMP AS OF '2022-08-27 11:30:47'".
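
One way to avoid both errors is to compare the requested timestamp with the table's commit history before reading. The following is a sketch under stated assumptions, not a definitive implementation: clamp_to_commit_range is a hypothetical helper, and the commented usage assumes the delta-spark Python package, where DeltaTable.forPath(...).history() returns a DataFrame with a timestamp column per commit. It also assumes Python's local time zone matches the Spark session time zone.

```python
from datetime import datetime


def clamp_to_commit_range(requested: datetime, earliest: datetime, latest: datetime) -> datetime:
    """Clamp `requested` into [earliest, latest], the table's commit time range.

    Hypothetical helper for illustration only.
    """
    if earliest > latest:
        raise ValueError("earliest must not be after latest")
    return max(earliest, min(requested, latest))


# Usage sketch (assumes an active SparkSession `spark` configured for Delta):
#
#   from delta.tables import DeltaTable
#   from pyspark.sql import functions as F
#
#   history = DeltaTable.forPath(spark, "/data/delta/kafka-events").history()
#   earliest, latest = history.agg(F.min("timestamp"), F.max("timestamp")).collect()[0]
#   safe_ts = clamp_to_commit_range(datetime(2022, 8, 3), earliest, latest)
#   df = spark.read.format("delta") \
#       .option("timestampAsOf", safe_ts.strftime("%Y-%m-%d %H:%M:%S.%f")) \
#       .load("/data/delta/kafka-events")
```

Clamping silently changes which version is read, so logging the adjustment or raising an error instead may be preferable, depending on the use case.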

References

Delta Lake with PySpark Walkthrough

Code snippet

from pyspark.sql import SparkSession

if __name__ == "__main__":
    app_name = "PySpark Delta Lake Time Travel Example"
    master = "local"

    # Create Spark session with Delta extension

    builder = SparkSession.builder.appName(app_name) \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .master(master)

    spark = builder.getOrCreate()

    spark.sparkContext.setLogLevel("WARN")

    # Read from Delta Lake table (version 1)
    df = spark.read.format('delta')\
        .option('versionAsOf', '1') \
        .load("/data/delta/kafka-events")
    df.show()

    # Read the table as of one month ago; the date string is computed via Spark SQL
    last_month = spark.sql("SELECT CAST(add_months(current_date(), -1) AS STRING)").collect()[0][0]
    df = spark.read.format('delta')\
        .option('timestampAsOf', last_month) \
        .load("/data/delta/kafka-events")
    df.show()

    # Read the table as of a specific timestamp with an explicit UTC offset
    df = spark.read.format('delta')\
        .option('timestampAsOf', '2022-09-01 12:00:00.999999UTC+10:00') \
        .load("/data/delta/kafka-events")
    df.show()
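
The error messages shown earlier refer to the SQL forms VERSION AS OF and TIMESTAMP AS OF. The sketch below only builds such query strings for a path-based table. The helper time_travel_sql is hypothetical, and running the result through spark.sql assumes a Delta Lake and Spark combination whose SQL parser supports this time travel syntax, which older releases may not.

```python
from typing import Optional


def time_travel_sql(path: str, version: Optional[int] = None,
                    timestamp: Optional[str] = None) -> str:
    """Build a Delta time travel query for a path-based table.

    Hypothetical helper for illustration only.
    Exactly one of `version` or `timestamp` must be given.
    """
    if (version is None) == (timestamp is None):
        raise ValueError("provide exactly one of version or timestamp")
    if "`" in path:
        raise ValueError("path must not contain a backtick")
    table = f"delta.`{path}`"
    if version is not None:
        return f"SELECT * FROM {table} VERSION AS OF {int(version)}"
    if "'" in timestamp:
        raise ValueError("timestamp must not contain a single quote")
    return f"SELECT * FROM {table} TIMESTAMP AS OF '{timestamp}'"


# Usage sketch (assumes an active SparkSession `spark` configured for Delta):
#
#   spark.sql(time_travel_sql("/data/delta/kafka-events", version=1)).show()
```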