Time Travel with Delta Table in PySpark
Code description
Delta Lake provides time travel functionality to retrieve data as of a certain point in time or a certain version. This is done with one of the following two options when reading a Delta table as a DataFrame:
- versionAsOf: an integer value that specifies a version.
- timestampAsOf: a timestamp or date string.
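Since a read targets either a version or a timestamp, it can be convenient to build the reader options in one place. The following is a minimal sketch, not part of Delta Lake: `time_travel_options` is a hypothetical helper name, and it assumes that supplying both options in the same read is not accepted. It returns a plain dict that can be passed to `DataFrameReader.options(**opts)`.

```python
from datetime import date, datetime
from typing import Dict, Optional, Union


def time_travel_options(
    version: Optional[int] = None,
    timestamp: Optional[Union[str, date, datetime]] = None,
) -> Dict[str, str]:
    """Hypothetical helper: build options for exactly one time travel mode."""
    # Exactly one of the two arguments must be supplied.
    if (version is None) == (timestamp is None):
        raise ValueError("Provide exactly one of 'version' or 'timestamp'")
    if version is not None:
        if version < 0:
            raise ValueError("version must be a non-negative integer")
        return {"versionAsOf": str(version)}
    # datetime is a subclass of date, so it has to be checked first.
    if isinstance(timestamp, datetime):
        return {"timestampAsOf": timestamp.strftime("%Y-%m-%d %H:%M:%S")}
    if isinstance(timestamp, date):
        return {"timestampAsOf": timestamp.isoformat()}
    # Strings are passed through unchanged.
    return {"timestampAsOf": timestamp}


# Possible usage with an existing SparkSession named `spark`:
#   opts = time_travel_options(version=1)
#   df = spark.read.format("delta").options(**opts).load("/data/delta/kafka-events")
```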
This code snippet shows how to use them with the Spark DataFrameReader API. It includes three examples:
- Query data as of version 1
- Query data as of one month ago (using a value computed via Spark SQL)
- Query data as of a specific timestamp ('2022-09-01 12:00:00.999999UTC+10:00')
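A relative point in time can also be computed on the driver in plain Python instead of through Spark SQL. The sketch below uses a hypothetical helper named `days_ago`; it assumes the driver's local clock and the Spark session time zone agree, because a timestamp string without a zone suffix is interpreted by Spark in the session time zone.

```python
from datetime import datetime, timedelta
from typing import Optional


def days_ago(days: int, now: Optional[datetime] = None) -> str:
    """Hypothetical helper: format the moment `days` days before `now`."""
    # Fall back to the driver's local time when no reference time is given.
    reference = now if now is not None else datetime.now()
    return (reference - timedelta(days=days)).strftime("%Y-%m-%d %H:%M:%S")


# Possible usage with an existing SparkSession named `spark`:
#   df = spark.read.format("delta") \
#       .option("timestampAsOf", days_ago(30)) \
#       .load("/data/delta/kafka-events")
```

Passing `now` explicitly keeps the helper deterministic, which makes it easier to check in isolation.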
You may encounter an error if the timestamp is earlier than the earliest commit:
pyspark.sql.utils.AnalysisException: The provided timestamp (2022-08-03 00:00:00.0) is before the earliest version available to this
table (2022-08-27 10:53:18.213). Please use a timestamp after 2022-08-27 10:53:18.
Similarly, if the provided timestamp is later than the last commit, you may encounter an error like the following:
pyspark.sql.utils.AnalysisException: The provided timestamp: 2022-09-07 00:00:00.0 is after the latest commit timestamp of
2022-08-27 11:30:47.185. If you wish to query this version of the table, please either provide
the version with "VERSION AS OF 1" or use the exact timestamp
of the last commit: "TIMESTAMP AS OF '2022-08-27 11:30:47'".
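One way to avoid both errors is to clamp the requested timestamp into the range of available commits before reading. The following is only a sketch under assumptions: `clamp_to_commit_range` is a hypothetical helper, and the earliest and latest commit timestamps are assumed to be already known as naive `datetime` values in the same time zone Spark uses to interpret the timestamp string.

```python
from datetime import datetime


def clamp_to_commit_range(
    requested: datetime, earliest: datetime, latest: datetime
) -> datetime:
    """Hypothetical helper: move `requested` into [earliest, latest]."""
    if earliest > latest:
        raise ValueError("earliest commit must not be after latest commit")
    # Too early -> earliest commit; too late -> latest commit.
    return max(earliest, min(requested, latest))


# Commit timestamps taken from the error messages above.
earliest_commit = datetime(2022, 8, 27, 10, 53, 18, 213000)
latest_commit = datetime(2022, 8, 27, 11, 30, 47, 185000)

safe = clamp_to_commit_range(datetime(2022, 9, 7), earliest_commit, latest_commit)
# Keep fractional seconds so the value matches the commit timestamp exactly.
timestamp_as_of = safe.strftime("%Y-%m-%d %H:%M:%S.%f")
```

The commit timestamps themselves could be looked up from the table history, for example with `DeltaTable.forPath(spark, path).history()`, which returns a DataFrame containing `version` and `timestamp` columns. Whether silently clamping is appropriate depends on the use case; failing loudly may be preferable when an exact point in time matters.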
References
Delta Lake with PySpark Walkthrough
Code snippet
from pyspark.sql import SparkSession

if __name__ == "__main__":
    app_name = "PySpark Delta Lake Time Travel Example"
    master = "local"

    # Create Spark session with Delta extension
    builder = SparkSession.builder.appName(app_name) \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .master(master)

    spark = builder.getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    # Read from Delta Lake table as of version 1
    df = spark.read.format('delta') \
        .option('versionAsOf', '1') \
        .load("/data/delta/kafka-events")
    df.show()

    # Read the table as of one month ago; the date string is computed via Spark SQL
    last_month = spark.sql("SELECT CAST(add_months(current_date(), -1) AS STRING)").collect()[0][0]
    df = spark.read.format('delta') \
        .option('timestampAsOf', last_month) \
        .load("/data/delta/kafka-events")
    df.show()

    # Read the table as of a specific timestamp with an explicit UTC offset
    df = spark.read.format('delta') \
        .option('timestampAsOf', '2022-09-01 12:00:00.999999UTC+10:00') \
        .load("/data/delta/kafka-events")
    df.show()