Spark 2.x to 3.x - Date, Timestamp and Int96 Rebase Modes


Context

When migrating from Spark 2.x to 3.x, users may encounter a common exception about the date time parser like the one shown below. It can occur when reading and writing Parquet and Avro files in open source Spark, CDH Spark, Azure HDInsights, GCP Dataproc, AWS EMR or Glue, Databricks, etc. It can also happen when you use built-in date time parsing functions.

You may get a different result due to the upgrading of Spark 3.0
Fail to parse *** in the new parser. You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string.

Or

Caused by: org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: 
writing dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z into Parquet
files can be dangerous, as the files may be read by Spark 2.x or legacy versions of Hive
later, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic
Gregorian calendar. See more details in SPARK-31404. You can set spark.sql.parquet.datetimeRebaseModeInWrite to 'LEGACY' to rebase the datetime values w.r.t. the calendar difference during writing, to get maximum
interoperability. Or set spark.sql.parquet.datetimeRebaseModeInWrite to 'CORRECTED' to write the datetime values as it is,
if you are 100% sure that the written files will only be read by Spark 3.0+ or other
systems that use Proleptic Gregorian calendar.

The issue happens because, from Spark 3.0.0, the Proleptic Gregorian calendar is used for parsing, formatting, and converting dates and timestamps, as well as for extracting sub-components like years, days and so on. This was done to be consistent with ISO standards. Before that, a hybrid Julian + Gregorian calendar was used, which produces different results for dates before October 15, 1582 (Gregorian).
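
For the first exception (the date time parser one), the message itself suggests the fix: set spark.sql.legacy.timeParserPolicy. Below is a minimal sketch of applying it when creating the Spark session; the application name is just a placeholder.

from pyspark.sql import SparkSession

# LEGACY restores the Spark 2.x (SimpleDateFormat based) parser behavior;
# CORRECTED uses the new parser and treats unparsable strings as invalid.
spark = SparkSession.builder \
    .appName('timeParserPolicy example') \
    .master('local') \
    .config('spark.sql.legacy.timeParserPolicy', 'LEGACY') \
    .getOrCreate()

# It is a runtime SQL configuration, so it can also be changed on an existing session:
# spark.conf.set('spark.sql.legacy.timeParserPolicy', 'LEGACY')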

About Int96 timestamp encoding (Impala Timestamp)

Before we dive deeper into the solutions for these issues, let's talk a bit more about INT96 timestamp encoding in Parquet. This type is now deprecated but some processes may still use it for legacy reasons. Timestamps saved as INT96 are made up of the nanoseconds in the day (first 8 bytes) and the Julian day (last 4 bytes). There is no time zone information attached to this value. To convert the timestamp into a Unix timestamp (nanoseconds since the epoch, 00:00:00.000000 UTC on 1 January 1970), the following formula can be used: (julian_day - 2440588) * (86400 * 1000 * 1000 * 1000) + nanoseconds. The number 2440588 is the Julian day for 1 January 1970.
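
As a quick illustration of the formula above, here is a small plain Python sketch that decodes a 12-byte INT96 value into nanoseconds since the Unix epoch. The sample value is made up for demonstration.

import struct

JULIAN_DAY_OF_EPOCH = 2440588               # Julian day for 1 January 1970
NANOS_PER_DAY = 86400 * 1000 * 1000 * 1000

def int96_to_unix_nanos(raw):
    # First 8 bytes: nanoseconds within the day; last 4 bytes: Julian day (little-endian).
    nanos_of_day, julian_day = struct.unpack('<qi', raw)
    return (julian_day - JULIAN_DAY_OF_EPOCH) * NANOS_PER_DAY + nanos_of_day

# Example: Julian day 2459750 (2022-06-19) at midnight.
sample = struct.pack('<qi', 0, 2459750)
print(int96_to_unix_nanos(sample))          # 1655596800000000000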

Reproduce the error

We can reproduce the error by creating a Parquet, Avro or ORC file using Spark 2.x and then reading it using Spark 3.2.1. For simplicity, we can just use PySpark to test this out in different Anaconda environments with different versions of PySpark installed.

$ conda env list
# conda environments:
#
base                     /home/kontext/miniconda3
pyspark2.1.2          *  /home/kontext/miniconda3/envs/pyspark2.1.2
pyspark3.2.1             /home/kontext/miniconda3/envs/pyspark3.2.1

Create the data files using Spark 2

For the pyspark2.1.2 environment, make sure the Python version is lower than 3.8 as some packages don't work in Python 3.8+. The following steps reproduce the error.

In the pyspark2.1.2 environment, run the following script:

# spark-datatime-rebase-v2.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date

appName = "PySpark Example - Spark 2.x DateTime Example"
master = "local"
# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

data = [{'id': 1, 'dt': '1200-01-01'}, {'id': 2, 'dt': '2022-06-19'}]

df = spark.createDataFrame(data)
df = df.withColumn('dt', to_date(df['dt']))
print(df.schema)
df.show()
# Write to a folder
df.write.format('parquet').mode('overwrite').save(
    'file:///home/kontext/pyspark-examples/data/datetime-sparkv2')

Make sure you run the script using the right version of Spark:

$ export PATH=/home/kontext/miniconda3/envs/pyspark2.1.2/bin:$PATH
$ which spark-submit
/home/kontext/miniconda3/envs/pyspark2.1.2/bin/spark-submit

Run the script using the command line:

spark-submit spark-datatime-rebase-v2.py

Now the data is saved to the local file system successfully:

$ ls ./data/datetime-sparkv2/ -alt
total 20
drwxr-xr-x 2 kontext kontext 4096 Jun 19 22:44 .
-rw-r--r-- 1 kontext kontext    8 Jun 19 22:44 ._SUCCESS.crc
-rw-r--r-- 1 kontext kontext    0 Jun 19 22:44 _SUCCESS
-rw-r--r-- 1 kontext kontext   16 Jun 19 22:44 .part-00000-e31c5e65-e4be-477c-828b-7d272a344c8e.snappy.parquet.crc
-rw-r--r-- 1 kontext kontext  543 Jun 19 22:44 part-00000-e31c5e65-e4be-477c-828b-7d272a344c8e.snappy.parquet
drwxr-xr-x 6 kontext kontext 4096 Jun 19 22:44 ..

Read the data using Spark 3

Now switch the environment to pyspark3.2.1.

conda activate pyspark3.2.1

Run the following script to read the data:

# spark-datatime-rebase-v3.py
from pyspark.sql import SparkSession

appName = "PySpark Example - Spark 3.x DateTime Example"
master = "local"
# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

df = spark.read.format('parquet').load(
    'file:///home/kontext/pyspark-examples/data/datetime-sparkv2')
df.show()

Run the script:

spark-submit spark-datatime-rebase-v3.py

The following exception is thrown as expected:

Caused by: org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: 
reading dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z from Parquet
files can be ambiguous, as the files may be written by Spark 2.x or legacy versions of
Hive, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic
Gregorian calendar. See more details in SPARK-31404. You can set the SQL config
'spark.sql.parquet.datetimeRebaseModeInRead' or the datasource option 'datetimeRebaseMode' to 'LEGACY' to rebase the datetime values
w.r.t. the calendar difference during reading. To read the datetime values as it is,
set the SQL config 'spark.sql.parquet.datetimeRebaseModeInRead' or the datasource option 'datetimeRebaseMode' to 'CORRECTED'.
       
        at org.apache.spark.sql.errors.QueryExecutionErrors$.sparkUpgradeInReadingDatesError(QueryExecutionErrors.scala:474)
        at org.apache.spark.sql.execution.datasources.DataSourceUtils$.newRebaseExceptionInRead(DataSourceUtils.scala:171)
        at org.apache.spark.sql.execution.datasources.DataSourceUtils.newRebaseExceptionInRead(DataSourceUtils.scala)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.rebaseDays(ParquetVectorUpdaterFactory.java:1052)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.access$1200(ParquetVectorUpdaterFactory.java:43)
        at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory$IntegerWithRebaseUpdater.readValue(ParquetVectorUpdaterFactory.java:388)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatchInternal(VectorizedRleValuesReader.java:238)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatch(VectorizedRleValuesReader.java:171)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:227)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:298)
        at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:196)
        at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:191)
        at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:104)
        at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:522)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        ... 1 more

Fix the issue

As the error message shows, we can configure Spark's date time rebase modes when reading and writing data.

Fix parquet date time rebase issue

For the Parquet format, there are several configurations that can be used to control the behavior of the date and timestamp rebase modes.

  • spark.sql.parquet.datetimeRebaseModeInRead - The rebasing mode for the values of the DATE, TIMESTAMP_MILLIS, TIMESTAMP_MICROS logical types from the Julian to Proleptic Gregorian calendar:
    EXCEPTION: Spark will fail the reading if it sees ancient dates/timestamps that are ambiguous between the two calendars.
    CORRECTED: Spark will not do rebase and read the dates/timestamps as it is.
    LEGACY: Spark will rebase dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar to Proleptic Gregorian calendar when reading Parquet files.
    The default value is EXCEPTION. 
  • spark.sql.parquet.datetimeRebaseModeInWrite - The rebasing mode for the values of the DATE, TIMESTAMP_MILLIS, TIMESTAMP_MICROS logical types from the Proleptic Gregorian to Julian calendar:
    EXCEPTION: Spark will fail the writing if it sees ancient dates/timestamps that are ambiguous between the two calendars.
    CORRECTED: Spark will not do rebase and write the dates/timestamps as it is.
    LEGACY: Spark will rebase dates/timestamps from Proleptic Gregorian calendar to the legacy hybrid (Julian + Gregorian) calendar when writing Parquet files.
    The default value is EXCEPTION.
  • spark.sql.parquet.int96RebaseModeInRead - Similar to spark.sql.parquet.datetimeRebaseModeInRead, but for the INT96 timestamp type.
  • spark.sql.parquet.int96RebaseModeInWrite - Similar to spark.sql.parquet.datetimeRebaseModeInWrite, but for the INT96 timestamp type (see the sketch after this list).
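
The INT96 configurations only take effect when Spark reads or writes timestamps stored as INT96; on the write side this is controlled by spark.sql.parquet.outputTimestampType. The following is a minimal sketch of writing an ancient timestamp as INT96 with the legacy rebase mode; the output path, column values and chosen mode are illustrative only.

from datetime import datetime
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName('INT96 rebase example') \
    .master('local') \
    .config('spark.sql.parquet.outputTimestampType', 'INT96') \
    .config('spark.sql.parquet.int96RebaseModeInWrite', 'LEGACY') \
    .getOrCreate()

# An ancient timestamp that is ambiguous between the two calendars.
df = spark.createDataFrame([(1, datetime(1200, 1, 1, 0, 0, 0))], ['id', 'ts'])
df.write.format('parquet').mode('overwrite').save(
    'file:///tmp/datetime-int96-legacy')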

For the issue in this example, we can set the read mode to LEGACY. Update the previous script file spark-datatime-rebase-v3.py as shown in the following code snippet to add an extra configuration:

# spark-datatime-rebase-v3.py
from pyspark.sql import SparkSession

appName = "PySpark Example - Spark 3.x DateTime Example"
master = "local"
# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .config('spark.sql.parquet.datetimeRebaseModeInRead', 'LEGACY') \
    .getOrCreate()

df = spark.read.format('parquet').load(
    'file:///home/kontext/pyspark-examples/data/datetime-sparkv2')
df.show()

Run the script again and the following output is printed:

+----------+---+
|        dt| id|
+----------+---+
|1200-01-01|  1|
|2022-06-19|  2|
+----------+---+
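
Alternatively, as the read error message above mentions, the rebase mode can also be passed as a datasource option so that it only applies to a single read instead of the whole session. A minimal sketch for the same data (Spark 3.2+):

# Per-read alternative to the session-wide configuration.
df = spark.read.format('parquet') \
    .option('datetimeRebaseMode', 'LEGACY') \
    .load('file:///home/kontext/pyspark-examples/data/datetime-sparkv2')
df.show()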

Fix Avro date time rebase issue

Similarly, we can use the following two configurations to control Spark's behavior when reading and writing legacy date time values from and to Avro files.

  • spark.sql.avro.datetimeRebaseModeInRead - Similar to spark.sql.parquet.datetimeRebaseModeInRead.
  • spark.sql.avro.datetimeRebaseModeInWrite - Similar to spark.sql.parquet.datetimeRebaseModeInWrite.

To use these configurations, we just need to add them when creating the Spark session:

spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .config('spark.sql.avro.datetimeRebaseModeInRead', 'LEGACY') \
    .config('spark.sql.avro.datetimeRebaseModeInWrite', 'CORRECTED') \
    .getOrCreate()

Depending on your situation, please select the right mode accordingly.
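
For completeness, the snippet below sketches writing a DataFrame with ancient dates to Avro using the legacy write rebase mode, so that the values remain readable by Spark 2.x and legacy Hive. It assumes the external spark-avro package is on the classpath, for example via spark-submit --packages org.apache.spark:spark-avro_2.12:3.2.1; the output path is illustrative.

spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .config('spark.sql.avro.datetimeRebaseModeInWrite', 'LEGACY') \
    .getOrCreate()

# df is assumed to contain a date column with values before 1582-10-15.
df.write.format('avro').mode('overwrite').save(
    'file:///home/kontext/pyspark-examples/data/datetime-avro')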

References

Migration Guide: SQL, Datasets and DataFrame - Spark 3.3.0 Documentation
