Save DataFrame to SQL Databases via JDBC in PySpark
In my previous article, Connect to SQL Server in Spark (PySpark), I showed how to read data from SQL Server databases into a DataFrame using JDBC. We can also use JDBC to write data from a Spark DataFrame to database tables. In the following sections, I'm going to show you how to write a DataFrame into SQL Server. You can extend the same approach to other databases, for example MySQL, Oracle, Teradata, or DB2, as long as a JDBC driver is available.
Spark write with JDBC API
We can use df.write.format('jdbc') on any Spark DataFrame df to write into any JDBC-compatible database. This API accepts many options. For example, you can customize the schema or specify additional options for the CREATE TABLE statement that Spark generates. Refer to the References section on this page for more details.
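As a minimal sketch of this pattern, the helper below chains the jdbc writer with an optional createTableColumnTypes option. The function name write_table and its parameters are hypothetical and only serve as illustration; they are not part of Spark.

```python
def write_table(df, url, table, user, password, column_types=None):
    """Write a Spark DataFrame to a JDBC table (hypothetical helper).

    column_types is an optional string in CREATE TABLE column syntax,
    for example "EmployeeName VARCHAR(128)".
    """
    writer = (
        df.write.format("jdbc")
        .option("url", url)
        .option("dbtable", table)
        .option("user", user)
        .option("password", password)
    )
    if column_types:
        # Only takes effect when Spark creates the table itself.
        writer = writer.option("createTableColumnTypes", column_types)
    # No mode is set here, so the default (errorifexists) applies.
    writer.save()
```

A call such as write_table(jdbcDF, jdbcUrl, "dbo.Employees2", user, password, "EmployeeName VARCHAR(128)") would then create the table with the given column type, assuming the table does not exist yet.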
Write mode
Write mode can be used to control write behavior. It specifies the behavior of the save operation when data already exists.
- append: Append contents of this DataFrame to existing data. For this scenario, data will be appended to the existing database table.
- overwrite: Overwrite existing data. For this scenario, the existing table is dropped and a new table is created unless the truncate option is used.
- ignore: Silently ignore this operation if table already exists.
- error or errorifexists (default case): Throw an exception if the table already exists.
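The modes above can be passed either through .mode(...) or through the mode argument of DataFrameWriter.jdbc. The sketch below uses the latter and rejects unknown mode strings before touching the database. The helper name save_with_mode and the VALID_MODES set are assumptions made for this example.

```python
# The save modes described in the list above.
VALID_MODES = {"append", "overwrite", "ignore", "error", "errorifexists"}


def save_with_mode(df, url, table, mode="errorifexists", properties=None):
    """Write df to a JDBC table using an explicit save mode (hypothetical helper).

    properties is a dict of JDBC connection properties such as
    {"user": "...", "password": "...", "driver": "..."}.
    """
    if mode not in VALID_MODES:
        raise ValueError(f"Unsupported write mode: {mode!r}")
    # DataFrameWriter.jdbc takes the mode as a keyword argument.
    df.write.jdbc(url, table, mode=mode, properties=properties or {})
```

For instance, save_with_mode(jdbcDF, jdbcUrl, "dbo.Employees2", mode="append", properties={"user": user, "password": password}) would add rows to an existing table instead of failing.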
Sample code
The following is one complete example.
from pyspark import SparkContext, SparkConf, SQLContext

appName = "PySpark SQL Server Example - via JDBC"
master = "local"
conf = SparkConf() \
    .setAppName(appName) \
    .setMaster(master) \
    .set("spark.driver.extraClassPath", "sqljdbc_7.2/enu/mssql-jdbc-7.2.2.jre8.jar")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
spark = sqlContext.sparkSession

database = "test"
src_table = "dbo.Employees"
user = "zeppelin"
password = "zeppelin"

jdbcUrl = f"jdbc:sqlserver://localhost:1433;databaseName={database}"
jdbcDriver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

# Create a data frame by reading data from SQL Server via JDBC
jdbcDF = spark.read.format("jdbc") \
    .option("url", jdbcUrl) \
    .option("dbtable", src_table) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", jdbcDriver) \
    .load()

jdbcDF.show()

# We can also save the data frame to the database via JDBC.
jdbcDF.select("EmployeeID", "EmployeeName", "Position").write.format("jdbc") \
    .mode("overwrite") \
    .option("url", jdbcUrl) \
    .option("dbtable", "dbo.Employees2") \
    .option("user", user) \
    .option("password", password) \
    .option("driver", jdbcDriver) \
    .save()
It simply does the following work:
- Read data from the SQL Server table dbo.Employees as a Spark DataFrame using the JDBC driver.
- Select a few columns from the DataFrame and then save the result into a new table named dbo.Employees2.
Run the script
python script-name.py
Things to pay attention to
In the sample code, I used a driver located at 'sqljdbc_7.2/enu/mssql-jdbc-7.2.2.jre8.jar'. Make sure the JDBC driver you are using matches the Java version of your environment. For mine, the Java version is:
$ java -version
openjdk version "1.8.0_212"
OpenJDK Runtime Environment (build 1.8.0_212-8u212-b03-0ubuntu1.18.04.1-b03)
OpenJDK 64-Bit Server VM (build 25.212-b03, mixed mode)
You also need to ensure that the path to the driver JAR file is correct.
If you use the spark-submit command to run the script, pass the locations of your driver JAR files via the --jars option:
spark-submit --jars /path/to/sqljdbc_7.2/enu/mssql-jdbc-7.2.2.jre8.jar script-name.py
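A wrong JAR path typically only surfaces later, when Spark fails to load the driver class. As a small sketch, the hypothetical helper below checks that the JAR exists before assembling the same spark-submit command as above. The function name and the idea of launching from Python are assumptions for illustration.

```python
import os


def spark_submit_command(driver_jar, script):
    """Build the spark-submit argument list after checking the driver JAR path."""
    if not os.path.isfile(driver_jar):
        raise FileNotFoundError(f"JDBC driver JAR not found: {driver_jar}")
    # Same shape as: spark-submit --jars <driver_jar> <script>
    return ["spark-submit", "--jars", driver_jar, script]
```

The returned list can be passed to subprocess.run(..., check=True) if you want to launch the job from another Python script.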
One more thing to check is that your database user has permission to create tables or write data into tables in your database.
For my scenario, the zeppelin user is mapped to the db_datawriter and db_ddladmin roles in the test database so that it can create tables and insert data into them.
After I ran the script, a new table, dbo.Employees2, was created in the test database.
Create table options
As mentioned earlier, sometimes it's useful to have custom CREATE TABLE options. One good example is Teradata, where you need to specify a primary index to get a better data distribution among AMPs. If you don't, the first non-BLOB/CLOB column will be chosen and you may end up with data skew. To address this, you can use the createTableOptions property, as the following code snippet shows:
jdbcDF.select("EmployeeID", "EmployeeName", "Position").write.format("jdbc") \
    .mode("overwrite") \
    .option("url", jdbcUrl) \
    .option("dbtable", "TE.Employees2") \
    .option("user", user) \
    .option("password", password) \
    .option("createTableOptions", "PRIMARY INDEX (EmployeeID)") \
    .save()
Refer to this page about how to connect to Teradata in Spark: Load Data from Teradata in Spark (PySpark).
References
Refer to the official Spark documentation for all the options you can use with the JDBC write API.
The following is a list of commonly used ones:
- url - The JDBC URL to connect to. The source-specific connection properties may be specified in the URL. e.g., jdbc:postgresql://localhost/test?user=fred&password=secret
- dbtable - The JDBC table that should be read from or written into.
- driver - The class name of the JDBC driver to use to connect to this URL.
- partitionColumn, lowerBound, upperBound - These options must all be specified if any of them is specified. In addition, numPartitions must be specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric, date, or timestamp column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. This option applies only to reading.
- numPartitions - The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.
- isolationLevel - The transaction isolation level, which applies to current connection. It can be one of NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, or SERIALIZABLE, corresponding to standard transaction isolation levels defined by JDBC's Connection object, with default of READ_UNCOMMITTED. This option applies only to writing.
- truncate - This is a JDBC writer related option. When SaveMode.Overwrite is enabled, this option causes Spark to truncate an existing table instead of dropping and recreating it. This can be more efficient, and prevents the table metadata (e.g., indices) from being removed. However, it will not work in some cases, such as when the new data has a different schema. It defaults to false. This option applies only to writing.
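- cascadeTruncate - This is a JDBC writer related option. If enabled and supported by the JDBC database, this option allows execution of TRUNCATE TABLE t CASCADE. This affects other tables and should be used with care. This option applies only to writing.
- createTableOptions - This is a JDBC writer related option. If specified, this option allows setting database-specific table and partition options when creating a table (e.g., CREATE TABLE t (name string) ENGINE=InnoDB). This option applies only to writing.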
- createTableColumnTypes - The database column data types to use instead of the defaults, when creating the table. Data type information should be specified in the same format as CREATE TABLE columns syntax (e.g., "name CHAR(64), comments VARCHAR(1024)"). The specified types should be valid Spark SQL data types. This option applies only to writing.
- customSchema - The custom schema to use for reading data from JDBC connectors. For example, "id DECIMAL(38, 0), name STRING". You can also specify partial fields, and the others use the default type mapping. For example, "id DECIMAL(38, 0)". The column names should be identical to the corresponding column names of JDBC table. Users can specify the corresponding data types of Spark SQL instead of using the defaults. This option applies only to reading.
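To show how several of these options combine, here is a minimal sketch that overwrites a table while keeping its definition by setting truncate, with optional numPartitions and isolationLevel. The helper names overwrite_options and overwrite_keeping_table are hypothetical, and the option values are passed as strings, which is an assumption about style rather than a requirement.

```python
# Isolation levels named in the option list above.
ISOLATION_LEVELS = {
    "NONE", "READ_COMMITTED", "READ_UNCOMMITTED", "REPEATABLE_READ", "SERIALIZABLE",
}


def overwrite_options(url, table, user, password,
                      num_partitions=None, isolation_level=None):
    """Build JDBC writer options for an overwrite that truncates the table."""
    options = {
        "url": url,
        "dbtable": table,
        "user": user,
        "password": password,
        # Truncate instead of drop and recreate, so indexes and other metadata survive.
        "truncate": "true",
    }
    if num_partitions is not None:
        # Also caps the number of concurrent JDBC connections.
        options["numPartitions"] = str(num_partitions)
    if isolation_level is not None:
        if isolation_level not in ISOLATION_LEVELS:
            raise ValueError(f"Unknown isolation level: {isolation_level!r}")
        options["isolationLevel"] = isolation_level
    return options


def overwrite_keeping_table(df, options):
    """Overwrite the target table using the options built above."""
    df.write.format("jdbc").mode("overwrite").options(**options).save()
```

Keep in mind that truncate only helps when the DataFrame schema still matches the existing table, as noted in the truncate entry above.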
Related solutions
You can easily adapt the above code examples to other databases. You need to change the JDBC URL and driver class name to match your target database.
Save DataFrame to SQL Server in PySpark
Save DataFrame to Oracle in PySpark
Save DataFrame to Teradata in PySpark
Save DataFrame to MySQL in PySpark
Save DataFrame to PostgreSQL in PySpark
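As a rough illustration of swapping the JDBC URL and driver class, the sketch below keeps both in one lookup table. The jdbc_target helper and the JDBC_TARGETS table are hypothetical, and the URL templates and class names are commonly used values that you should confirm against the documentation of the driver version you install.

```python
# Commonly used URL templates and driver classes (verify for your driver version).
JDBC_TARGETS = {
    "sqlserver": (
        "jdbc:sqlserver://{host}:{port};databaseName={database}",
        "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    ),
    "mysql": (
        "jdbc:mysql://{host}:{port}/{database}",
        "com.mysql.cj.jdbc.Driver",  # class name used by MySQL Connector/J 8.x
    ),
    "postgresql": (
        "jdbc:postgresql://{host}:{port}/{database}",
        "org.postgresql.Driver",
    ),
}


def jdbc_target(kind, host, port, database):
    """Return (url, driver_class) for one of the database kinds listed above."""
    try:
        template, driver = JDBC_TARGETS[kind]
    except KeyError:
        raise ValueError(f"No JDBC template registered for {kind!r}") from None
    return template.format(host=host, port=port, database=database), driver
```

The returned pair can be used for the url and driver options in the earlier examples, for instance jdbcUrl, jdbcDriver = jdbc_target("sqlserver", "localhost", 1433, "test"). Other databases follow the same pattern with their own URL syntax and driver class.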