Save Spark DataFrame to Teradata and Resolve Common Errors

Raymond Raymond, 2021-03-08

In the article Save DataFrame to SQL Databases via JDBC in PySpark, I showed examples of saving a Spark DataFrame to a relational database such as Teradata. Recently several friends told me that they encountered errors when using the JDBC approach to save data to Teradata, so this article summarizes one way to address the issue.

About the error

The detailed exception looks like the following:

py4j.protocol.Py4JJavaError: An error occurred while calling o56.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, raymond-pc.mshome.net, executor driver): java.sql.BatchUpdateException: [Teradata JDBC Driver] [TeraJDBC 16.20.00.13] [Error 1338] [SQLState HY000] A failure occurred while executing a PreparedStatement batch request. Details of the failure can be found in the exception chain that is accessible with getNextException.
        at com.teradata.jdbc.jdbc_4.util.ErrorFactory.makeBatchUpdateException(ErrorFactory.java:149)
        at com.teradata.jdbc.jdbc_4.util.ErrorFactory.makeBatchUpdateException(ErrorFactory.java:138)
        at com.teradata.jdbc.jdbc_4.TDPreparedStatement.executeBatchDMLArray(TDPreparedStatement.java:277)
        at com.teradata.jdbc.jdbc_4.TDPreparedStatement.executeBatch(TDPreparedStatement.java:2755)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:691)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:858)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:856)
        at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:994)
        at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:994)
        at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2133)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.SQLException: [Teradata Database] [TeraJDBC 16.20.00.13] [Error -2802] [SQLState 23000] Duplicate row error in TestDb.spark_jdbc_test.
        at com.teradata.jdbc.jdbc_4.util.ErrorFactory.makeDatabaseSQLException(ErrorFactory.java:302)
        at com.teradata.jdbc.jdbc_4.statemachine.ReceiveEndSubState.action(ReceiveEndSubState.java:95)
        at com.teradata.jdbc.jdbc_4.statemachine.StatementReceiveState.subStateMachine(StatementReceiveState.java:311)
        at com.teradata.jdbc.jdbc_4.statemachine.StatementReceiveState.action(StatementReceiveState.java:200)
        at com.teradata.jdbc.jdbc_4.statemachine.StatementController.runBody(StatementController.java:137)
        at com.teradata.jdbc.jdbc_4.statemachine.PreparedBatchStatementController.run(PreparedBatchStatementController.java:58)
        at com.teradata.jdbc.jdbc_4.TDStatement.executeStatement(TDStatement.java:389)
        at com.teradata.jdbc.jdbc_4.TDPreparedStatement.executeBatchDMLArray(TDPreparedStatement.java:257)
        ... 15 more

Reproduce the error

The following PySpark script can reproduce the error.

from pyspark.sql import SparkSession
import pandas as pd

appName = "PySpark Teradata Example"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

database = "TestDb"
table = "spark_jdbc_test"
user = "dbc"
password = "dbc"
driver = 'com.teradata.jdbc.TeraDriver'

# Construct a pandas DataFrame
users = {'ID': [1, 1, 3], 'Value': ['A', 'A', 'C']}
pdf = pd.DataFrame(users, columns=['ID', 'Value'])

# Create Spark DataFrame from pandas DataFrame
df = spark.createDataFrame(pdf)

# Save the result to database
df.write \
    .format('jdbc') \
    .option('driver', driver) \
    .option('url', f'jdbc:teradata://192.168.119.128/Database={database}') \
    .option("dbtable", table) \
    .option('user', user) \
    .option('password', password) \
    .mode('overwrite')\
    .save()

spark.stop()

The script is simple. It creates a pandas DataFrame in memory and converts it to a Spark DataFrame, which is then saved to Teradata using the JDBC driver. Because the write mode is overwrite, the table is dropped and recreated if it already exists.

Root cause

In fact, the error message itself already points out the issue: 

java.sql.SQLException: [Teradata Database] [TeraJDBC 16.20.00.13] [Error -2802] [SQLState 23000] Duplicate row error in TestDb.spark_jdbc_test.

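The DataFrame contains two identical rows (1, A), and the table that Spark creates rejects duplicate rows. In Teradata session mode, tables are created as SET tables by default, and SET tables do not allow duplicate rows. To address this issue, we need to either remove the duplicates or make each row unique and specify the table's primary index.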
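Before writing, it can help to confirm which rows are duplicated. The following is a minimal sketch in plain Python that counts the rows used in the script above; find_duplicate_rows is a hypothetical helper name for illustration, not part of Spark or the Teradata driver.

```python
from collections import Counter


def find_duplicate_rows(rows):
    """Return a dict mapping each row that occurs more than once to its count."""
    counts = Counter(rows)
    return {row: n for row, n in counts.items() if n > 1}


# Same data as the pandas DataFrame in the script above, as (ID, Value) tuples
rows = list(zip([1, 1, 3], ['A', 'A', 'C']))
duplicates = find_duplicate_rows(rows)
print(duplicates)  # {(1, 'A'): 2}
```

On a Spark DataFrame, a similar check can likely be expressed as df.groupBy(df.columns).count().filter('count > 1'), which lists each repeated row together with its count.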

Resolutions

To resolve this issue, we can remove duplicates from the DataFrame. In Spark, this can be done with the distinct or dropDuplicates function.

df.distinct().write \
    .format('jdbc') \
    .option('driver', driver) \
    .option('url', f'jdbc:teradata://192.168.119.128/Database={database}') \
    .option("dbtable", table) \
    .option('user', user) \
    .option('password', password) \
    .mode('overwrite')\
    .save()

In the database, only two records will be saved.

     ID  Value
1    3   C
2    1   A

If you need to keep the duplicates, one approach is to add a column with a unique number so that every row is distinct.

You can use the index column of the pandas DataFrame if it is available, or a Spark function such as monotonically_increasing_id to assign a unique value.

Note: this function guarantees unique, increasing values, but they are not necessarily consecutive.

df.withColumn('idx', monotonically_increasing_id()).write \
    .format('jdbc') \
    .option('driver', driver) \
    .option('url', f'jdbc:teradata://192.168.119.128/Database={database}') \
    .option("dbtable", table) \
    .option('user', user) \
    .option('password', password) \
    .option("createTableOptions", "PRIMARY INDEX (idx)")\
    .mode('overwrite')\
    .save()
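To see why the values from monotonically_increasing_id are not necessarily consecutive: according to the Spark documentation, the current implementation puts the partition ID in the upper 31 bits of the 64-bit result and the record number within each partition in the lower 33 bits. The sketch below reproduces that layout in plain Python; simulated_ids is a hypothetical helper for illustration only and does not call Spark.

```python
def simulated_ids(partition_sizes):
    """Mimic the documented bit layout of monotonically_increasing_id.

    partition_sizes: number of records in each partition, in partition order.
    """
    ids = []
    for partition_id, size in enumerate(partition_sizes):
        for record_number in range(size):
            # Partition ID in the upper bits, record number in the lower 33 bits
            ids.append((partition_id << 33) + record_number)
    return ids


print(simulated_ids([3]))     # [0, 1, 2]
print(simulated_ids([3, 3]))  # [0, 1, 2, 8589934592, 8589934593, 8589934594]
```

With a single partition the values happen to be consecutive starting at 0; with several partitions they remain unique and increasing but show large gaps.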

Remember to import the function first:

from pyspark.sql.functions import monotonically_increasing_id

The createTableOptions option is used to explicitly specify the primary index for the Teradata table.

The saved table has three columns and three rows as expected:

     ID  Value  idx
1    1   A      1
2    1   A      0
3    3   C      2
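The createTableOptions value can also name several columns in the primary index, and the Teradata JDBC URL accepts a TMODE parameter that sets the session mode; both points come up in the comments below. The following sketch gathers these settings into one dictionary that can be passed to the DataFrame writer. The helper teradata_write_options and its parameters are hypothetical names for illustration, and the host, credentials, and column names are placeholders taken from the examples above.

```python
def teradata_write_options(host, database, table, user, password,
                           primary_index=None, tmode=None):
    """Build Spark JDBC writer options for a Teradata target table."""
    # Teradata JDBC URL parameters are comma-separated NAME=value pairs
    url_params = [f'Database={database}']
    if tmode:
        # Session mode, for example ANSI or TERA
        url_params.append(f'TMODE={tmode}')
    options = {
        'driver': 'com.teradata.jdbc.TeraDriver',
        'url': f"jdbc:teradata://{host}/{','.join(url_params)}",
        'dbtable': table,
        'user': user,
        'password': password,
    }
    if primary_index:
        # Spark appends this text to the CREATE TABLE statement it generates
        columns = ', '.join(primary_index)
        options['createTableOptions'] = f'PRIMARY INDEX ({columns})'
    return options


opts = teradata_write_options('192.168.119.128', 'TestDb', 'spark_jdbc_test',
                              'dbc', 'dbc',
                              primary_index=['ID', 'idx'], tmode='ANSI')
print(opts['url'])                 # jdbc:teradata://192.168.119.128/Database=TestDb,TMODE=ANSI
print(opts['createTableOptions'])  # PRIMARY INDEX (ID, idx)

# Usage with a DataFrame (needs a Spark session and the Teradata JDBC driver):
# df.write.format('jdbc').options(**opts).mode('overwrite').save()
```

Keeping the options in one dictionary makes it easier to reuse the same connection settings for both overwrite and append writes.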

Comments

vignesh jaisankar, 3 years ago

Hey Raymond, thanks for writing this insightful post. I appreciate your contribution. I found a few of your posts that are supporting my current project.

I have a question; please write back.

I'm trying to do a similar exercise to the one you explained above. The DataFrame has unique records, so I tried loading it into Teradata, and it threw the duplicate row error above.

So I created a new empty table in Teradata with a primary index on multiple key columns, but when the table is written from Databricks, the primary index gets changed to a single column. Does this cause the duplicate row error?

Would using .option("createTableOptions", "PRIMARY INDEX (idx)") with multiple columns solve my issue? Anyway, I'll try this method too.

Note: multiple columns are used as the primary key for this table.

Raymond Raymond, 3 years ago

Welcome to Kontext! 

If you write the table with the overwrite mode, the table is recreated, so the previously created primary index is lost.

I suggest doing the following:

  1. If you want to use overwrite mode, make sure you add the primary index option as mentioned in the article.
  2. If you don't want to recreate the table each time it is loaded, you can truncate the table (if that is allowed and reasonable for your scenario) and use append mode.

vignesh jaisankar, 3 years ago

Thanks Raymond for writing back. One issue I have seen is that when writing to Teradata, the tables are created as SET tables. Even though I had unique records, providing index columns gave me the same error.

Setting TMODE to "ANSI" in the connection properties creates a MULTISET table, and that solved my issue.

Raymond Raymond, 3 years ago

I'm glad you found the solution. As mentioned on the following page, the default table type is MULTISET if you create a column-partitioned table, regardless of the session mode:

SET and MULTISET, Teradata Database SQL Data Definition Language Detailed Topics (Teradata Online Documentation)

Alternatively, if you need a MULTISET table, you can use ANSI mode as you did, which creates MULTISET tables by default.

S Jayanth Satya Shankar, 3 years ago

Hi @Raymond, is there any way to explicitly specify the primary index column name of the DataFrame before writing it to Teradata?

Raymond Raymond, 3 years ago

If the table doesn’t exist, you can then use the option mentioned in the article:

option("createTableOptions", "PRIMARY INDEX (idx)")

If the table already exists with a primary index definition, there is no need to specify it.

Does that answer your question?

S Jayanth Satya Shankar, 3 years ago

Yes, I did try that option. It worked; thanks for the reply.

(1) My Spark is currently running in local mode (4 cores). Could you please give me some insights on reducing the df.write run time? My DataFrame has 1.4M records, and on average it takes 12-20 minutes to write the data to the Teradata database.

(2) Let's say my DataFrame has 8 partitions. I am forced to repartition it to 1 so that it won't throw the error "Caused by: java.sql.SQLException: [Teradata Database] [TeraJDBC 16.20.00.12] [Error 2631] [SQLState 40001] Transaction ABORTed due to deadlock".

Any insights would be greatly appreciated.

Thank you!

Raymond Raymond, 3 years ago

If you can partition your Teradata table accordingly, you can try to align the partition column in your Spark DataFrame with the Teradata table definition via the partition-related options of the JDBC connection.

.option("numPartitions", "4")

Technically, it will establish up to 4 connections to insert into the table (if your mode is append). However, I have not tried this in any of my code, so I am not sure whether it works or performs better.

Since you run it locally, you can also try the FastLoad Java API; sometimes one connection with FastLoad performs better:

Teradata JDBC Driver User Guide Chapter 2 (teradata-docs.s3.amazonaws.com)

