Save DataFrame to SQL Databases via JDBC in PySpark


In my previous article, Connect to SQL Server in Spark (PySpark), I showed how to read data from SQL Server databases into a DataFrame using JDBC. JDBC can also be used to write data from a Spark DataFrame to database tables. The following sections show how to write a DataFrame into SQL Server. The same approach extends to other databases, for example MySQL, Oracle, Teradata and DB2, as long as a JDBC driver is available.

Spark write with JDBC API

We can use DataFrame.write.format('jdbc') to write into any JDBC-compatible database. This API accepts many options. For example, you can customize the schema or specify additional options for the generated CREATE TABLE statement. Refer to the References section on this page for more details.

Write mode

Write mode can be used to control write behavior. It specifies the behavior of the save operation when data already exists.

  • append: Append contents of this DataFrame to existing data. For this scenario, data will be appended to the existing database table.
  • overwrite: Overwrite existing data. For this scenario, the existing table is dropped and a new one is created unless the truncate option is used.
  • ignore: Silently ignore this operation if table already exists.
  • error or errorifexists (default case): Throw an exception if the table already exists.
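
As a rough sketch of how these modes fit together, the helper below validates the mode and only sets the truncate option when overwriting. The name save_via_jdbc and its signature are assumptions made for illustration and are not part of Spark; the helper relies only on the DataFrameWriter methods format, mode, options and save.

```python
VALID_MODES = {"append", "overwrite", "ignore", "error", "errorifexists"}


def save_via_jdbc(df, mode, jdbc_options, truncate=False):
    """Write df through the JDBC data source using the given save mode.

    Hypothetical helper: df is expected to be a Spark DataFrame and
    jdbc_options a dict holding at least 'url' and 'dbtable'.
    Returns the options that were passed to the writer.
    """
    if mode not in VALID_MODES:
        raise ValueError(f"Unsupported write mode: {mode!r}")

    options = dict(jdbc_options)
    # The truncate option only has an effect together with overwrite mode.
    if mode == "overwrite" and truncate:
        options["truncate"] = "true"

    df.write.format("jdbc").mode(mode).options(**options).save()
    return options
```

With a DataFrame and connection details at hand, a call could look like save_via_jdbc(jdbcDF, "overwrite", {"url": jdbcUrl, "dbtable": "dbo.Employees2", "user": user, "password": password}, truncate=True).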

Sample code

The following is one complete example.

from pyspark import SparkContext, SparkConf, SQLContext

appName = "PySpark SQL Server Example - via JDBC"
master = "local"
conf = SparkConf() \
    .setAppName(appName) \
    .setMaster(master) \
    .set("spark.driver.extraClassPath","sqljdbc_7.2/enu/mssql-jdbc-7.2.2.jre8.jar")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession

database = "test"
src_table = "dbo.Employees"
user = "zeppelin"
password  = "zeppelin"


jdbcUrl = f"jdbc:sqlserver://localhost:1433;databaseName={database}"
jdbcDriver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
# Create a data frame by reading data from SQL Server via JDBC
jdbcDF = spark.read.format("jdbc") \
    .option("url", jdbcUrl) \
    .option("dbtable", src_table) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", jdbcDriver) \
    .load()

jdbcDF.show()

# We can also save the data frame to the database via JDBC.

jdbcDF.select("EmployeeID","EmployeeName", "Position").write.format("jdbc") \
  .mode("overwrite") \
  .option("url", jdbcUrl) \
  .option("dbtable", "dbo.Employees2") \
  .option("user", user) \
  .option("password", password) \
  .option("driver", jdbcDriver) \
  .save()

It simply does the following work:

  • Read data from SQL Server table dbo.Employees as a Spark dataframe using JDBC driver.
  • Select a few columns from the table and then save this new dataframe into a new table named dbo.Employees2.

Run the script

python script-name.py

Things to pay attention to

In the sample code, I used a driver located at 'sqljdbc_7.2/enu/mssql-jdbc-7.2.2.jre8.jar'. Make sure the JDBC driver you are using matches the Java version of your environment. For mine, the Java version is:

$ java -version
openjdk version "1.8.0_212"
OpenJDK Runtime Environment (build 1.8.0_212-8u212-b03-0ubuntu1.18.04.1-b03)
OpenJDK 64-Bit Server VM (build 25.212-b03, mixed mode)

You also need to ensure that the path to the driver JAR file is correct.

If you use the spark-submit command to run the script, pass the locations of your driver JAR files via the --jars option:

spark-submit --jars /path/to/sqljdbc_7.2/enu/mssql-jdbc-7.2.2.jre8.jar  script-name.py
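
Since a wrong JAR path is an easy mistake to make, a small check before launching can help. The following is a minimal sketch, assuming you launch spark-submit from a Python wrapper; the function name spark_submit_command is hypothetical. It verifies that each JAR exists and builds the argument list with the comma-separated value that --jars expects.

```python
import os


def spark_submit_command(script, jar_paths):
    """Build a spark-submit argument list that ships the given JARs via --jars."""
    missing = [path for path in jar_paths if not os.path.isfile(path)]
    if missing:
        raise FileNotFoundError("JDBC driver JAR not found: " + ", ".join(missing))
    # --jars takes a single comma-separated list of JAR paths.
    return ["spark-submit", "--jars", ",".join(jar_paths), script]
```

The returned list can be passed to subprocess.run, or joined with spaces to inspect the command before running it.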

You also need permission to create tables or write data into tables in your database.

In my scenario, the zeppelin user is mapped to the db_datawriter and db_ddladmin roles for the test database so that it can create tables and insert data into them.

After I run the script, a new table named dbo.Employees2 is created in the test database.

Create table options

As mentioned earlier, it is sometimes useful to have custom CREATE TABLE options. One good example is Teradata, where you need to specify a primary index to get a better data distribution among AMPs. If you don't, the first non-BLOB/CLOB column will be chosen and you may end up with data skew. To address this, you can use the createTableOptions property as the following code snippet shows:

jdbcDF.select("EmployeeID","EmployeeName", "Position").write.format("jdbc") \
  .mode("overwrite") \
  .option("url", jdbcUrl) \
  .option("dbtable", "TE.Employees2") \
  .option("user", user) \
  .option("password", password) \
  .option("createTableOptions", "PRIMARY INDEX (EmployeeID)") \
  .save()

Refer to this page about how to connect to Teradata in Spark: Load Data from Teradata in Spark (PySpark).
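
Alongside createTableOptions, the column types in the generated CREATE TABLE statement can be controlled with the createTableColumnTypes option. The sketch below builds the value for that option from a dictionary; the helper name column_types_clause and the chosen VARCHAR lengths are assumptions for illustration only.

```python
def column_types_clause(column_types):
    """Render {'column': 'TYPE'} as the 'column TYPE, column TYPE' string
    that the createTableColumnTypes option expects."""
    return ", ".join(f"{name} {sql_type}" for name, sql_type in column_types.items())


# Example column types (assumed lengths, adjust to your data).
clause = column_types_clause({
    "EmployeeName": "VARCHAR(100)",
    "Position": "VARCHAR(50)",
})
print(clause)
```

The resulting string can then be passed to the writer with .option("createTableColumnTypes", clause). Columns that are not listed keep the default type mapping.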

References

Refer to the official page about all the options you can use when using JDBC write API:

JDBC to Other Databases

Warning: Not all options are available in all Spark versions. Make sure you check the documentation version that matches your Spark environment.

The following is a list of commonly used ones:

  • url - The JDBC URL to connect to. The source-specific connection properties may be specified in the URL. e.g., jdbc:postgresql://localhost/test?user=fred&password=secret
  • dbtable - The JDBC table that should be read from or written into. 
  • driver - The class name of the JDBC driver to use to connect to this URL.
  • partitionColumn, lowerBound, upperBound - These options must all be specified if any of them is specified. In addition, numPartitions must be specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric, date, or timestamp column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. This option applies only to reading.
  • numPartitions - The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.
  • isolationLevel - The transaction isolation level, which applies to current connection. It can be one of NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, or SERIALIZABLE, corresponding to standard transaction isolation levels defined by JDBC's Connection object, with default of READ_UNCOMMITTED. This option applies only to writing.
  • truncate - This is a JDBC writer related option. When SaveMode.Overwrite is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient, and prevents the table metadata (e.g., indices) from being removed. However, it will not work in some cases, such as when the new data has a different schema. It defaults to false. This option applies only to writing.
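  • cascadeTruncate - This is a JDBC writer related option. If enabled and supported by the JDBC database, this option allows execution of TRUNCATE TABLE t CASCADE. This affects other tables and should be used with care. This option applies only to writing.
  • createTableOptions - This is a JDBC writer related option. If specified, this option allows setting database-specific table and partition options when creating a table (e.g., CREATE TABLE t (name string) ENGINE=InnoDB). This option applies only to writing.
  • createTableColumnTypes - The database column data types to use instead of the defaults, when creating the table. Data type information should be specified in the same format as CREATE TABLE columns syntax (e.g., "name CHAR(64), comments VARCHAR(1024)"). The specified types should be valid Spark SQL data types. This option applies only to writing.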
  • customSchema - The custom schema to use for reading data from JDBC connectors. For example, "id DECIMAL(38, 0), name STRING". You can also specify partial fields, and the others use the default type mapping. For example, "id DECIMAL(38, 0)". The column names should be identical to the corresponding column names of JDBC table. Users can specify the corresponding data types of Spark SQL instead of using the defaults. This option applies only to reading.
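
To illustrate why lowerBound and upperBound only decide the partition stride and do not filter rows, the following sketch generates per-partition WHERE clauses in roughly the way Spark does. It is a simplified approximation that assumes non-negative integer bounds; the function name partition_predicates is hypothetical and the actual implementation differs between Spark versions.

```python
def partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Approximate the WHERE clauses used for a partitioned JDBC read.

    Simplified illustration only: assumes non-negative integer bounds
    and at least two partitions.
    """
    if num_partitions < 2:
        raise ValueError("num_partitions must be at least 2")

    stride = upper_bound // num_partitions - lower_bound // num_partitions
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        # The first partition has no lower limit, the last one no upper limit,
        # so rows outside [lower_bound, upper_bound) are still read.
        lower_clause = f"{column} >= {current}" if i != 0 else None
        current += stride
        upper_clause = f"{column} < {current}" if i != num_partitions - 1 else None

        if upper_clause is None:
            predicates.append(lower_clause)
        elif lower_clause is None:
            predicates.append(f"{upper_clause} OR {column} IS NULL")
        else:
            predicates.append(f"{lower_clause} AND {upper_clause}")
    return predicates


for predicate in partition_predicates("EmployeeID", 0, 100, 4):
    print(predicate)
```

Note that the first and last clauses are open-ended, which is why every row of the table ends up in some partition regardless of the bounds.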

Related solutions

You can easily adapt the above code examples for other databases by changing the JDBC URL and driver class name to match your target database.

Save DataFrame to SQL Server in PySpark

Save DataFrame to Oracle in PySpark

Save DataFrame to Teradata in PySpark

Save DataFrame to MySQL in PySpark

Save DataFrame to PostgreSQL in PySpark
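
As a minimal sketch of that adaptation, the lookup below pairs a JDBC URL template with the driver class name for three common databases. The helper name jdbc_settings and the default ports are assumptions for illustration; Oracle and Teradata follow the same pattern with their own URL syntax, so check your driver's documentation for the exact format.

```python
JDBC_TARGETS = {
    "sqlserver": {
        "url": "jdbc:sqlserver://{host}:{port};databaseName={database}",
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
        "port": 1433,
    },
    "mysql": {
        "url": "jdbc:mysql://{host}:{port}/{database}",
        "driver": "com.mysql.cj.jdbc.Driver",
        "port": 3306,
    },
    "postgresql": {
        "url": "jdbc:postgresql://{host}:{port}/{database}",
        "driver": "org.postgresql.Driver",
        "port": 5432,
    },
}


def jdbc_settings(kind, host, database, port=None):
    """Return (jdbc_url, driver_class) for the given database kind."""
    target = JDBC_TARGETS[kind]
    url = target["url"].format(
        host=host, port=port or target["port"], database=database
    )
    return url, target["driver"]


jdbcUrl, jdbcDriver = jdbc_settings("sqlserver", "localhost", "test")
print(jdbcUrl, jdbcDriver)
```

The two returned values can be used directly as the url and driver options of the reader and writer.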


