Save Spark DataFrame to Teradata and Resolve Common Errors

Raymond Tang 3/8/2021

In the article Save DataFrame to SQL Databases via JDBC in PySpark, I showed examples of saving a Spark DataFrame to a relational database such as Teradata. Recently, several friends told me that they encountered errors when using the JDBC approach to save data to Teradata. This article summarizes one way to address the issue.

About the error

The detailed exception looks like the following:

py4j.protocol.Py4JJavaError: An error occurred while calling o56.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, raymond-pc.mshome.net, executor driver): java.sql.BatchUpdateException: [Teradata JDBC Driver] [TeraJDBC 16.20.00.13] [Error 1338] [SQLState HY000] A failure occurred while executing a PreparedStatement batch request. Details of the failure can be found in the exception chain that is accessible with getNextException.
        at com.teradata.jdbc.jdbc_4.util.ErrorFactory.makeBatchUpdateException(ErrorFactory.java:149)
        at com.teradata.jdbc.jdbc_4.util.ErrorFactory.makeBatchUpdateException(ErrorFactory.java:138)
        at com.teradata.jdbc.jdbc_4.TDPreparedStatement.executeBatchDMLArray(TDPreparedStatement.java:277)
        at com.teradata.jdbc.jdbc_4.TDPreparedStatement.executeBatch(TDPreparedStatement.java:2755)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:691)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:858)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:856)
        at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:994)
        at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:994)
        at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2133)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: [Teradata Database] [TeraJDBC 16.20.00.13] [Error -2802] [SQLState 23000] Duplicate row error in TestDb.spark_jdbc_test.
        at com.teradata.jdbc.jdbc_4.util.ErrorFactory.makeDatabaseSQLException(ErrorFactory.java:302)
        at com.teradata.jdbc.jdbc_4.statemachine.ReceiveEndSubState.action(ReceiveEndSubState.java:95)
        at com.teradata.jdbc.jdbc_4.statemachine.StatementReceiveState.subStateMachine(StatementReceiveState.java:311)
        at com.teradata.jdbc.jdbc_4.statemachine.StatementReceiveState.action(StatementReceiveState.java:200)
        at com.teradata.jdbc.jdbc_4.statemachine.StatementController.runBody(StatementController.java:137)
        at com.teradata.jdbc.jdbc_4.statemachine.PreparedBatchStatementController.run(PreparedBatchStatementController.java:58)
        at com.teradata.jdbc.jdbc_4.TDStatement.executeStatement(TDStatement.java:389)
        at com.teradata.jdbc.jdbc_4.TDPreparedStatement.executeBatchDMLArray(TDPreparedStatement.java:257)
        ... 15 more
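In a trace like this, the last "Caused by" entry usually carries the actual database error. The following is a minimal sketch for pulling the error codes out of the exception text. The helper name extract_teradata_errors is hypothetical, and the regular expression is based only on the message format shown above. In a real job, the text could come from str(e) of the caught exception.

```python
import re

# Pattern based on the message format shown in the trace above:
# "[Error 1338] [SQLState HY000] message text"
_ERROR_PATTERN = re.compile(r"\[Error (-?\d+)\] \[SQLState (\w+)\] ([^\n]*)")


def extract_teradata_errors(trace_text):
    """Return (error_code, sql_state, message) tuples found in a trace."""
    return [
        (int(code), state, message.strip())
        for code, state, message in _ERROR_PATTERN.findall(trace_text)
    ]


# Shortened copy of the two error lines from the trace above
sample_trace = (
    "java.sql.BatchUpdateException: [Teradata JDBC Driver] "
    "[TeraJDBC 16.20.00.13] [Error 1338] [SQLState HY000] "
    "A failure occurred while executing a PreparedStatement batch request.\n"
    "Caused by: java.sql.SQLException: [Teradata Database] "
    "[TeraJDBC 16.20.00.13] [Error -2802] [SQLState 23000] "
    "Duplicate row error in TestDb.spark_jdbc_test.\n"
)

errors = extract_teradata_errors(sample_trace)
# The last entry is the root cause reported by the database
root_cause = errors[-1]
print(root_cause)
```

Here the first entry is the generic batch failure (error 1338) and the last one is the duplicate row error (-2802) that the rest of this article deals with.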

Reproduce the error

The following PySpark script can reproduce the error.

from pyspark.sql import SparkSession
import pandas as pd

appName = "PySpark Teradata Example"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

database = "TestDb"
table = "spark_jdbc_test"
user = "dbc"
password = "dbc"
driver = 'com.teradata.jdbc.TeraDriver'

# Construct a pandas DataFrame
users = {'ID': [1, 1, 3], 'Value': ['A', 'A', 'C']}
pdf = pd.DataFrame(users, columns=['ID', 'Value'])

# Create Spark DataFrame from pandas DataFrame
df = spark.createDataFrame(pdf)

# Save the result to database
df.write \
    .format('jdbc') \
    .option('driver', driver) \
    .option('url', f'jdbc:teradata://192.168.119.128/Database={database}') \
    .option("dbtable", table) \
    .option('user', user) \
    .option('password', password) \
    .mode('overwrite')\
    .save()

spark.stop()

The script is simple: it creates a pandas DataFrame in memory and converts it to a Spark DataFrame, which is then saved to Teradata using the JDBC driver. Because the write mode is overwrite, the table is dropped and recreated if it already exists. Note that the sample data contains the row (1, 'A') twice.

Root cause

In fact, the error message itself already points out the issue:

java.sql.SQLException: [Teradata Database] [TeraJDBC 16.20.00.13] [Error -2802] [SQLState 23000] Duplicate row error in TestDb.spark_jdbc_test.

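The sample data contains the same row twice, and the target table rejects duplicate rows. This is typically because Teradata creates a SET table by default when the session runs in Teradata mode, and SET tables do not allow duplicate rows. Thus, to address this issue, we need to either remove the duplicate rows or make every row unique and specify the table's primary index.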
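Before choosing a fix, it can help to confirm which rows are duplicated. The sketch below is deliberately plain Python, so it runs without a Spark session. It uses the same sample data as the script above, and the commented line at the end is an assumed equivalent for the Spark DataFrame.

```python
from collections import Counter

# Same sample data as the script above
users = {'ID': [1, 1, 3], 'Value': ['A', 'A', 'C']}

# Turn the column-oriented dict into row tuples: (1, 'A'), (1, 'A'), (3, 'C')
rows = list(zip(users['ID'], users['Value']))

# Keep only the rows that occur more than once
duplicates = {row: n for row, n in Counter(rows).items() if n > 1}
print(duplicates)  # {(1, 'A'): 2}

# A comparable check on the Spark DataFrame (not run here) would be:
# df.groupBy(df.columns).count().filter('count > 1').show()
```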

Resolutions

To resolve this issue, we can remove duplicates from the DataFrame. In Spark, this can be done with the distinct or dropDuplicates function.

df.distinct().write \
    .format('jdbc') \
    .option('driver', driver) \
    .option('url', f'jdbc:teradata://192.168.119.128/Database={database}') \
    .option("dbtable", table) \
    .option('user', user) \
    .option('password', password) \
    .mode('overwrite')\
    .save()

In the database, only two records will be saved.

   ID  Value
1  3   C
2  1   A

If you need to keep the duplicates, one approach is to add a column that holds a unique number for each row.

We can either use the index of the pandas DataFrame, if one is available, or use a Spark function such as 'monotonically_increasing_id' to assign a unique value.

Note: this function returns unique, increasing values, but they are not guaranteed to be consecutive.
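To see why the values are not consecutive: the Spark documentation describes the current implementation as putting the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The sketch below only mimics that layout in plain Python. The function simulated_monotonic_id is hypothetical and is not part of Spark.

```python
def simulated_monotonic_id(partition_index, row_offset):
    """Mimic the documented ID layout; this is not Spark's own code."""
    # Partition index goes into the upper bits, row offset into the lower 33
    return (partition_index << 33) + row_offset


# Two partitions with three rows each
ids = [simulated_monotonic_id(p, r) for p in range(2) for r in range(3)]
print(ids)
# [0, 1, 2, 8589934592, 8589934593, 8589934594]
```

With a single partition, as in the small example in this article, the values happen to be 0, 1, 2. With more partitions, large gaps appear between them.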

df.withColumn('idx', monotonically_increasing_id()).write \
    .format('jdbc') \
    .option('driver', driver) \
    .option('url', f'jdbc:teradata://192.168.119.128/Database={database}') \
    .option("dbtable", table) \
    .option('user', user) \
    .option('password', password) \
    .option("createTableOptions", "PRIMARY INDEX (idx)")\
    .mode('overwrite')\
    .save()

Remember to import this function first:

from pyspark.sql.functions import monotonically_increasing_id

Note: option createTableOptions is used to explicitly specify the primary index for the Teradata table.
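Since the same writer options are repeated in every example, they can be collected in one place. The following is a minimal sketch, not part of the original scripts. The helper build_teradata_write_options and its parameters are hypothetical. The resulting dict is meant to be passed to the writer's options method, as shown in the trailing comment.

```python
def build_teradata_write_options(host, database, table, user, password,
                                 primary_index=None):
    """Collect the JDBC writer options used in this article into one dict."""
    options = {
        'driver': 'com.teradata.jdbc.TeraDriver',
        'url': f'jdbc:teradata://{host}/Database={database}',
        'dbtable': table,
        'user': user,
        'password': password,
    }
    # Only add createTableOptions when a primary index column is requested
    if primary_index is not None:
        options['createTableOptions'] = f'PRIMARY INDEX ({primary_index})'
    return options


options = build_teradata_write_options(
    '192.168.119.128', 'TestDb', 'spark_jdbc_test', 'dbc', 'dbc',
    primary_index='idx')
print(options['createTableOptions'])  # PRIMARY INDEX (idx)

# With a Spark DataFrame df available, the dict could be applied as:
# df.write.format('jdbc').options(**options).mode('overwrite').save()
```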

The saved table has three columns and three rows as expected:

   ID  Value  idx
1  1   A      1
2  1   A      0
3  3   C      2
