In my previous article, Connect to SQL Server in Spark (PySpark), I covered how to read data from SQL Server databases into a dataframe using JDBC. We can also use JDBC to write data from a Spark dataframe to database tables. In the following sections, I'm going to show you how to write a dataframe into SQL Server. However, you can easily extend the approach to other databases such as MySQL, Oracle, Teradata, DB2, etc., as long as a JDBC driver is available.

Spark write with JDBC API

We can easily use DataFrame.write.format('jdbc') to write into any JDBC-compatible database. There are many options you can specify with this API. For example, you can customize the schema or specify additional options for the generated CREATE TABLE statement. Refer to the References section on this page for more details.
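For example, the basic call shape looks like the following minimal sketch (df, the URL, table name and credentials are placeholders to replace with your own values):

df.write.format("jdbc") \
    .option("url", "jdbc:sqlserver://localhost:1433;databaseName=test") \
    .option("dbtable", "dbo.TargetTable") \
    .option("user", "username") \
    .option("password", "password") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .save()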

Write mode

Write mode controls the behavior of the save operation when data already exists in the target table.

  • append: Append contents of this DataFrame to existing data. For this scenario, data will be appended into existing database table. 
  • overwrite: Overwrite existing data. For this scenario, the existing table is dropped and recreated unless the truncate option is used (see the snippet after this list). 
  • ignore: Silently ignore this operation if table already exists.
  • error or errorifexists (default case): Throw an exception if the table already exists.
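For example, the following sketch overwrites dbo.Employees2 but keeps the existing table definition by truncating it first (df, jdbcUrl, user and password are assumed to be defined as in the sample code below):

# Overwrite the data but keep the table definition by truncating instead of dropping.
df.write.format("jdbc") \
    .mode("overwrite") \
    .option("truncate", "true") \
    .option("url", jdbcUrl) \
    .option("dbtable", "dbo.Employees2") \
    .option("user", user) \
    .option("password", password) \
    .save()

Replace "overwrite" with "append", "ignore" or "error" to get the other behaviors described above.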

Sample code

The following is one complete example.

from pyspark.sql import SparkSession

appName = "PySpark SQL Server Example - via JDBC"
master = "local"

# Build the Spark session and put the SQL Server JDBC driver JAR on the driver classpath.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .config("spark.driver.extraClassPath", "sqljdbc_7.2/enu/mssql-jdbc-7.2.2.jre8.jar") \
    .getOrCreate()

database = "test"
src_table = "dbo.Employees"
user = "zeppelin"
password  = "zeppelin"


jdbcUrl = f"jdbc:sqlserver://localhost:1433;databaseName={database}"
jdbcDriver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
# Create a data frame by reading data from SQL Server via JDBC
jdbcDF = spark.read.format("jdbc") \
    .option("url", jdbcUrl) \
    .option("dbtable", "Employees") \
    .option("user", user) \
    .option("password", password) \
    .option("driver", jdbcDriver) \
    .load()

jdbcDF.show()

# We can also save the data frame to the database via JDBC.

jdbcDF.select("EmployeeID","EmployeeName", "Position").write.format("jdbc") \
  .mode("overwrite") \
  .option("url", jdbcUrl) \
  .option("dbtable", "dbo.Employees2") \
  .option("user", user) \
  .option("password", password) \
  .save()

The script simply does the following:

  • Reads data from the SQL Server table dbo.Employees into a Spark dataframe using the JDBC driver.
  • Selects a few columns from that dataframe and saves the result into a new table named dbo.Employees2.

Run the script

python script-name.py

Things to pay attention to

In the sample code, I used a driver located at 'sqljdbc_7.2/enu/mssql-jdbc-7.2.2.jre8.jar'. Make sure the JDBC driver you are using matches your environment's Java version. In my environment, the Java version is:

$ java -version
openjdk version "1.8.0_212"
OpenJDK Runtime Environment (build 1.8.0_212-8u212-b03-0ubuntu1.18.04.1-b03)
OpenJDK 64-Bit Server VM (build 25.212-b03, mixed mode)

You also need to ensure the driver JAR file path is correct.

If you use the spark-submit command to run the script, pass in the location of your driver JAR file via the --jars option:

spark-submit --jars /path/to/sqljdbc_7.2/enu/mssql-jdbc-7.2.2.jre8.jar  script-name.py

One more thing to check is that you have permission to create tables and write data into tables in your database.

For my scenario, the zeppelin user is mapped to the db_datawriter and db_ddladmin roles in the test database so that it can create tables and insert data.

After I run the script, a new table dbo.Employees2 is created in the test database.

Create table options

As mentioned earlier, sometimes it's useful to have custom CREATE TABLE options. One good example is Teradata, where you need to specify a primary index to get better data distribution among AMPs. If you don't, the first non-BLOB/CLOB column will be chosen and you may end up with data skew. To address this, you can use the createTableOptions property as the following code snippet shows:

jdbcDF.select("EmployeeID","EmployeeName", "Position").write.format("jdbc") \
  .mode("overwrite") \
  .option("url", jdbcUrl) \
  .option("dbtable", "TE.Employees2") \
  .option("user", user) \
  .option("password", password) \
  .option("createTableOptions", "PRIMARY INDEX (EmployeeID)")\
  .save()

Refer to this page about how to connect to Teradata in Spark: Load Data from Teradata in Spark (PySpark).

References

Refer to the official page for all the options you can use with the JDBC write API:

JDBC to Other Databases

Warning: Not all options are available in every Spark version. Make sure you check the documentation version that matches your Spark environment.

The following is a list of commonly used options:

  • url - The JDBC URL to connect to. The source-specific connection properties may be specified in the URL. e.g., jdbc:postgresql://localhost/test?user=fred&password=secret
  • dbtable - The JDBC table that should be read from or written into. 
  • driver - The class name of the JDBC driver to use to connect to this URL.
  • partitionColumn, lowerBound, upperBound - These options must all be specified if any of them is specified. In addition, numPartitions must be specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric, date, or timestamp column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in the table, so all rows in the table will be partitioned and returned. These options apply only to reading; see the read example after this list.
  • numPartitions - The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.
  • isolationLevel - The transaction isolation level, which applies to current connection. It can be one of NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, or SERIALIZABLE, corresponding to standard transaction isolation levels defined by JDBC's Connection object, with default of READ_UNCOMMITTED. This option applies only to writing.
  • truncate - This is a JDBC writer related option. When SaveMode.Overwrite is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient, and prevents the table metadata (e.g., indices) from being removed. However, it will not work in some cases, such as when the new data has a different schema. It defaults to false. This option applies only to writing.
  • cascadeTruncate - This is a JDBC writer related option. If enabled and supported by the target database, it causes a cascading truncate (TRUNCATE TABLE ... CASCADE) to be issued when truncating.
  • createTableOptions - This is a JDBC writer related option. If specified, it allows setting database-specific table and partition options when creating the table, as the Teradata example above shows.
  • createTableColumnTypes - The database column data types to use instead of the defaults, when creating the table. Data type information should be specified in the same format as CREATE TABLE columns syntax (e.g: "name CHAR(64), comments VARCHAR(1024)"). The specified types should be valid spark sql data types. This option applies only to writing.
  • customSchema - The custom schema to use for reading data from JDBC connectors. For example, "id DECIMAL(38, 0), name STRING". You can also specify partial fields, and the others use the default type mapping. For example, "id DECIMAL(38, 0)". The column names should be identical to the corresponding column names of JDBC table. Users can specify the corresponding data types of Spark SQL instead of using the defaults. This option applies only to reading.
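For example, a parallel read of the sample Employees table might look like the following sketch (it assumes EmployeeID is a numeric column and reuses the spark, jdbcUrl, jdbcDriver, user and password variables from the sample code above; the bound values are illustrative only):

# Read dbo.Employees with 4 partitions, splitting on EmployeeID.
partitionedDF = spark.read.format("jdbc") \
    .option("url", jdbcUrl) \
    .option("dbtable", "dbo.Employees") \
    .option("user", user) \
    .option("password", password) \
    .option("driver", jdbcDriver) \
    .option("partitionColumn", "EmployeeID") \
    .option("lowerBound", "1") \
    .option("upperBound", "1000") \
    .option("numPartitions", "4") \
    .load()

Remember that lowerBound and upperBound only decide the partition stride; rows outside that range are still read into the first and last partitions.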

Related solutions

You can easily adapt the above code examples for other databases: you only need to change the JDBC URL and driver class name to match your target database.
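For instance, a PostgreSQL target only needs a different URL and driver class, as in the following sketch (the host, port, database name and target table are hypothetical, the jdbcDF, user and password variables are reused from the sample code above, and the PostgreSQL JDBC driver JAR must be on the classpath):

# Hypothetical PostgreSQL connection details.
pgUrl = "jdbc:postgresql://localhost:5432/test"

jdbcDF.write.format("jdbc") \
    .mode("overwrite") \
    .option("url", pgUrl) \
    .option("dbtable", "public.employees2") \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "org.postgresql.Driver") \
    .save()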

Save DataFrame to SQL Server in PySpark

Save DataFrame to Oracle in PySpark

Save DataFrame to Teradata in PySpark

Save DataFrame to MySQL in PySpark

Save DataFrame to PostgreSQL in PySpark


