Connect to PostgreSQL in Spark (PySpark)

By Raymond, 2021-02-14 (Spark & PySpark)

Spark is an analytics engine for big data processing. There are various ways to connect to a PostgreSQL database from Spark. This page summarizes some common approaches for connecting to PostgreSQL using Python as the programming language.

Connect to PostgreSQL

As with Connect to SQL Server in Spark (PySpark), there are several typical ways to connect to PostgreSQL in Spark:

  • Via the PostgreSQL JDBC driver (runs on systems that have a Java runtime); Py4J handles communication between the Python and Java processes.
  • Via PostgreSQL ODBC (runs on systems that support ODBC).
  • Via Python packages (pure Python or any supported platform).
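
For comparison, the JDBC route can be sketched roughly as follows. This is a minimal sketch, not part of the original walkthrough: the helper names jdbc_options and load_via_jdbc are hypothetical, the connection values are the ones used in this article's examples, and it assumes the PostgreSQL JDBC driver jar has already been made available to Spark (for example through the spark.jars setting).

```python
def jdbc_options(host="localhost", port=5432, dbname="testdb",
                 user="postgres", password="password", table="test_table"):
    """Build the option map for Spark's JDBC data source (hypothetical helper)."""
    return {
        "url": f"jdbc:postgresql://{host}:{port}/{dbname}",
        "dbtable": table,
        "user": user,
        "password": password,
        # Driver class shipped in the PostgreSQL JDBC jar
        "driver": "org.postgresql.Driver",
    }


def load_via_jdbc(spark, options):
    """Read a table through JDBC; `spark` is an existing SparkSession."""
    return spark.read.format("jdbc").options(**options).load()
```

With a running SparkSession, load_via_jdbc(spark, jdbc_options()) would return a Spark DataFrame directly, without going through pandas.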

This article provides one example that uses the Python packages psycopg2 and SQLAlchemy.

PostgreSQL environment

The following PostgreSQL setup will be used in code examples:

  • Host: localhost or 127.0.0.1
  • Port: 5432
  • User: postgres
  • Password: password
  • Database: testdb

Python environment

The Python environment used to run the code examples is Python 3.8.2.

The same steps should also work with most other Python 3 versions.

Install psycopg2

Run the following command to install the Python PostgreSQL connector:

pip install psycopg2

Example output:

Collecting psycopg2
  Downloading psycopg2-2.8.6-cp38-cp38-win_amd64.whl (1.1 MB)
     |████████████████████████████████| 1.1 MB 2.2 MB/s
Installing collected packages: psycopg2
Successfully installed psycopg2-2.8.6

We also need SQLAlchemy so that we can use the read_sql and to_sql functions directly with pandas DataFrames. Install it if it is not already available:

pip install sqlalchemy

Save pandas DataFrame to PostgreSQL

Now we can use the psycopg2 package to save data to a PostgreSQL database from Python scripts.

Info: The connection string for SQLAlchemy with psycopg2 has this form: postgresql+psycopg2://user:password@host:port/dbname[?key=value&key=value...]
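
As an illustration of that format, the string can be assembled from its parts. The following is only a sketch: build_pg_url is a hypothetical helper, and percent-encoding the user name and password is an added precaution for credentials that contain characters such as @ or /.

```python
from urllib.parse import quote, urlencode


def build_pg_url(user, password, host="localhost", port=5432,
                 dbname="testdb", **params):
    """Assemble a SQLAlchemy URL for the psycopg2 driver (hypothetical helper)."""
    # Percent-encode credentials so reserved characters do not break the URL
    auth = f"{quote(user, safe='')}:{quote(password, safe='')}"
    url = f"postgresql+psycopg2://{auth}@{host}:{port}/{dbname}"
    if params:
        # Optional key=value pairs, for example client_encoding=utf8
        url += "?" + urlencode(params)
    return url


print(build_pg_url("postgres", "password", client_encoding="utf8"))
```

The resulting string can be passed to create_engine in the same way as the hard-coded connection strings in the examples on this page.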

The following code snippet constructs a pandas DataFrame in memory and then saves it to the PostgreSQL database.

import psycopg2
import pandas as pd
from sqlalchemy import create_engine

# Construct the pandas dataframe
data = [{"id": 1, "value": 'ABC'},
        {"id": 2, "value": 'DEF'}]
pdf = pd.DataFrame(data)
print(pdf)
# Create SQLAlchemy engine
engine = create_engine("postgresql+psycopg2://postgres:password@localhost/testdb?client_encoding=utf8")
# Save result to the database via engine
pdf.to_sql('test_table', engine, index=False, if_exists='replace')

After the script above is executed, a table named test_table is created:

testdb=# \dt
           List of relations
 Schema |    Name    | Type  |  Owner
--------+------------+-------+----------
 public | test_table | table | postgres
(1 row)

testdb=# select * from test_table;
 id | value
----+-------
  1 | ABC
  2 | DEF
(2 rows)

testdb=#

*The above results are from the psql CLI tool.

Load data from PostgreSQL in Spark

Now we can use the same packages to load data from the PostgreSQL database in Spark. The data load runs in the Spark driver application: pandas reads the query result into driver memory before it is converted to a Spark DataFrame.

import psycopg2
import pandas as pd
from pyspark.sql import SparkSession
from sqlalchemy import create_engine

appName = "PySpark PostgreSQL Example - via psycopg2"
master = "local"

spark = SparkSession.builder.master(master).appName(appName).getOrCreate()

engine = create_engine(
    "postgresql+psycopg2://postgres:password@localhost/testdb?client_encoding=utf8")
pdf = pd.read_sql('select * from test_table', engine)

# Convert Pandas dataframe to spark DataFrame
df = spark.createDataFrame(pdf)
print(df.schema)
df.show()

The above code snippet does the following:

  • Establishes a SparkSession with a local master.
  • Creates a PostgreSQL connection using the SQLAlchemy Python package.
  • Uses pandas to create a DataFrame object through the read_sql API.
  • Finally, converts the pandas DataFrame to a Spark DataFrame.

The following is the sample output:

StructType(List(StructField(id,LongType,true),StructField(value,StringType,true)))
+---+-----+
| id|value|
+---+-----+
|  1|  ABC|
|  2|  DEF|
+---+-----+

Info: The Spark DataFrame's schema is inferred automatically from the pandas DataFrame. You can also specify the schema explicitly when creating the DataFrame object.
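
One way to specify the schema explicitly is to pass a DDL-formatted string to createDataFrame. The sketch below assumes the two columns of test_table shown earlier; COLUMNS, schema_ddl and to_spark are hypothetical names chosen for illustration, and a StructType object could be passed instead of the string.

```python
# Column names and Spark SQL types matching test_table (assumed from the output above)
COLUMNS = [("id", "BIGINT"), ("value", "STRING")]


def schema_ddl(columns):
    """Render (name, type) pairs as a DDL schema string, quoting names with backticks."""
    return ", ".join(f"`{name}` {sql_type}" for name, sql_type in columns)


def to_spark(spark, pdf, columns=COLUMNS):
    """Convert a pandas DataFrame using an explicit schema; `spark` is a SparkSession."""
    return spark.createDataFrame(pdf, schema=schema_ddl(columns))


print(schema_ddl(COLUMNS))
```

Calling to_spark(spark, pdf) in place of spark.createDataFrame(pdf) keeps the column types fixed even if pandas infers a different dtype, for example when a column contains only nulls.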