Connect to PostgreSQL in Spark (PySpark)
Spark is an analytics engine for big data processing. There are various ways to connect to a PostgreSQL database in Spark. This page summarizes some common approaches to connecting to PostgreSQL, using Python as the programming language.
Connect to PostgreSQL
Similar to Connect to SQL Server in Spark (PySpark), there are several typical ways to connect to PostgreSQL in Spark:
- Via PostgreSQL JDBC (runs on systems that have a Java runtime); py4j can be used to communicate between the Python and Java processes.
- Via PostgreSQL ODBC (runs on systems that support ODBC).
- Via Python packages (pure Python packages run on any platform; others run on their supported platforms).
This article provides an example that uses the Python packages psycopg2 and SQLAlchemy.
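The JDBC route listed above is not covered further in this article. For comparison, the following is a minimal sketch of it. It assumes the PostgreSQL JDBC driver jar is available on Spark's classpath, for example through the spark.jars.packages configuration. The helper names jdbc_read_options and read_table_via_jdbc are hypothetical. The sketch only uses the url, dbtable, user, password and driver options of Spark's built-in jdbc data source.

```python
# Minimal sketch of the JDBC route (hypothetical helpers, not used elsewhere).
# Assumption: the PostgreSQL JDBC driver jar is on Spark's classpath,
# e.g. supplied through the spark.jars or spark.jars.packages configuration.


def jdbc_read_options(host="localhost", port=5432, database="testdb",
                      user="postgres", password="password",
                      table="test_table"):
    """Build the option map for Spark's built-in "jdbc" data source."""
    return {
        "url": f"jdbc:postgresql://{host}:{port}/{database}",
        "dbtable": table,
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",
    }


def read_table_via_jdbc(spark, **kwargs):
    """Load a PostgreSQL table into a Spark DataFrame through JDBC.

    `spark` is an existing SparkSession; keyword arguments are forwarded
    to jdbc_read_options().
    """
    options = jdbc_read_options(**kwargs)
    return spark.read.format("jdbc").options(**options).load()
```

Given a SparkSession named spark, read_table_via_jdbc(spark) should return the contents of test_table as a Spark DataFrame. In this route, Spark's JDBC data source reads the rows, and they do not pass through pandas in the driver.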
PostgreSQL environment
The following PostgreSQL setup will be used in code examples:
- Host: localhost or 127.0.0.1
- Port: 5432
- User: postgres
- Password: password
- Database: testdb
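The code examples below hard-code these settings in a SQLAlchemy connection string. As a sketch, a small hypothetical helper such as postgres_url can assemble the same string from the settings above. It assumes the psycopg2 dialect. It escapes the user name and password with urllib.parse.quote_plus, which matters when a password contains characters such as @ or /.

```python
from urllib.parse import quote_plus


def postgres_url(host="localhost", port=5432, user="postgres",
                 password="password", database="testdb"):
    """Build a SQLAlchemy URL for the psycopg2 dialect (hypothetical helper).

    quote_plus() escapes characters such as '@' or '/' that would otherwise
    break URL parsing if they appear in the user name or password.
    """
    return (f"postgresql+psycopg2://{quote_plus(user)}:{quote_plus(password)}"
            f"@{host}:{port}/{database}?client_encoding=utf8")
```

With this helper, create_engine(postgres_url()) would connect to the same database as the hard-coded URLs in the examples. The explicit :5432 is the PostgreSQL default port, which those URLs leave implicit.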
Python environment
The Python environment for running the code examples is Python 3.8.2.
These steps should also work with most other Python 3 versions.
Install psycopg2
Run the following command to install Python PostgreSQL connector:
pip install psycopg2
Example output:
```
Collecting psycopg2
  Downloading psycopg2-2.8.6-cp38-cp38-win_amd64.whl (1.1 MB)
     |████████████████████████████████| 1.1 MB 2.2 MB/s
Installing collected packages: psycopg2
Successfully installed psycopg2-2.8.6
```
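We also need SQLAlchemy so that the pandas read_sql and to_sql functions can work directly with the PostgreSQL database. pandas' to_sql only accepts a SQLAlchemy engine or connection (or a sqlite3 connection). Install SQLAlchemy if it is not already available: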
pip install sqlalchemy
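Before running the examples, it can help to confirm which of the required modules are importable in the current Python environment. The sketch below uses importlib.util.find_spec from the standard library. missing_modules is a hypothetical helper name. Note that it checks module names, which can differ from pip package names: for example, the psycopg2-binary package also provides the psycopg2 module.

```python
import importlib.util


def missing_modules(names):
    """Return the module names from `names` that cannot be found.

    find_spec() locates a top-level module without importing it, so this
    check does not run the packages' import-time code.
    """
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    # Modules used by the examples on this page; prints [] if all are present.
    print(missing_modules(["psycopg2", "sqlalchemy", "pandas", "pyspark"]))
```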
Save pandas DataFrame to PostgreSQL
Now we can use the psycopg2 package (through SQLAlchemy) to save data to the PostgreSQL database from Python scripts.
The following code snippet constructs a pandas DataFrame in memory and then saves it to the PostgreSQL database.
```python
import psycopg2
import pandas as pd
from sqlalchemy import create_engine

# Construct the pandas dataframe
data = [{"id": 1, "value": 'ABC'}, {"id": 2, "value": 'DEF'}]
pdf = pd.DataFrame(data)
print(pdf)

# Create SQLAlchemy engine
engine = create_engine("postgresql+psycopg2://postgres:password@localhost/testdb?client_encoding=utf8")

# Save result to the database via engine
pdf.to_sql('test_table', engine, index=False, if_exists='replace')
```
After the above script is executed, a table named test_table is created:
```
testdb=# \dt
           List of relations
 Schema |    Name    | Type  |  Owner
--------+------------+-------+----------
 public | test_table | table | postgres
(1 row)

testdb=# select * from test_table;
 id | value
----+-------
  1 | ABC
  2 | DEF
(2 rows)

testdb=#
```
*The above results are from the psql CLI tool.
Load data from PostgreSQL in Spark
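Now we can use the same packages to load data from the PostgreSQL database into Spark. The data load runs in the Spark driver application. pandas reads the whole result set into the driver's memory before it is converted to a Spark DataFrame, so this approach suits data sets that fit comfortably in the driver.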
```python
import psycopg2  # DB-API driver behind the postgresql+psycopg2 URL below
import pandas as pd
from pyspark.sql import SparkSession
from sqlalchemy import create_engine

appName = "PySpark PostgreSQL Example - via psycopg2"
master = "local"

# Create (or reuse) a SparkSession that runs locally
spark = SparkSession.builder.master(master).appName(appName).getOrCreate()

# Create a SQLAlchemy engine and read the table into a pandas DataFrame
engine = create_engine(
    "postgresql+psycopg2://postgres:password@localhost/testdb?client_encoding=utf8")
pdf = pd.read_sql('select * from test_table', engine)

# Convert Pandas dataframe to spark DataFrame
df = spark.createDataFrame(pdf)
print(df.schema)
df.show()
```
The above code snippet does the following:
- Establishes a SparkSession with a local master.
- Creates a PostgreSQL connection engine using the SQLAlchemy Python package.
- Uses the pandas read_sql API to load the query result into a pandas DataFrame.
- Finally, converts the pandas DataFrame to a Spark DataFrame.
The following is the sample output:
```
StructType(List(StructField(id,LongType,true),StructField(value,StringType,true)))
+---+-----+
| id|value|
+---+-----+
|  1|  ABC|
|  2|  DEF|
+---+-----+
```
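pandas is a convenient bridge here, but it is not strictly required. The following sketch assumes that only psycopg2 and PySpark are used. It fetches rows through the standard DB-API 2.0 cursor interface, which psycopg2 implements. It then passes the rows to spark.createDataFrame with a list of column names, so Spark infers the column types. The helper names fetch_rows and rows_to_spark are hypothetical. Like the pandas version, this still pulls the full result set into the driver.

```python
def fetch_rows(conn, query, params=None):
    """Run `query` on a DB-API 2.0 connection and return (columns, rows).

    `conn` can be a psycopg2 connection or any other DB-API driver's
    connection. `rows` is a list of tuples; the column names are taken from
    the first field of each entry in cursor.description.
    """
    cur = conn.cursor()
    try:
        if params is None:
            cur.execute(query)
        else:
            cur.execute(query, params)
        columns = [desc[0] for desc in cur.description]
        rows = cur.fetchall()
    finally:
        cur.close()
    return columns, rows


def rows_to_spark(spark, columns, rows):
    """Create a Spark DataFrame from tuples; Spark infers the column types."""
    return spark.createDataFrame(rows, schema=columns)


# Example usage (requires a running PostgreSQL server and a SparkSession):
#   conn = psycopg2.connect(host="localhost", port=5432, dbname="testdb",
#                           user="postgres", password="password")
#   columns, rows = fetch_rows(conn, "select * from test_table")
#   rows_to_spark(spark, columns, rows).show()
```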