Connect to PostgreSQL in Spark (PySpark)

Spark is an analytics engine for big data processing. There are various ways to connect to a PostgreSQL database in Spark. This page summarizes some common approaches to connecting to PostgreSQL using Python as the programming language.

Connect to PostgreSQL

Similar to Connect to SQL Server in Spark (PySpark), there are several typical ways to connect to PostgreSQL in Spark:

  • Via the PostgreSQL JDBC driver (runs on systems with a Java runtime); py4j is used to communicate between the Python and Java processes. A minimal sketch of this approach follows this list.
  • Via PostgreSQL ODBC (runs on systems that support ODBC).
  • Via Python packages (pure Python or any supported platform).
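The JDBC approach looks roughly like the following minimal sketch. It assumes the PostgreSQL JDBC driver jar has been downloaded and its location is supplied through spark.jars; the jar path below is a placeholder, and the connection details match the sample environment used throughout this article.

from pyspark.sql import SparkSession

# Minimal JDBC read sketch; replace /path/to/postgresql.jar with the
# actual location of the PostgreSQL JDBC driver jar.
spark = SparkSession.builder \
    .master("local") \
    .appName("PySpark PostgreSQL Example - via JDBC") \
    .config("spark.jars", "/path/to/postgresql.jar") \
    .getOrCreate()

df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/testdb") \
    .option("dbtable", "test_table") \
    .option("user", "postgres") \
    .option("password", "password") \
    .option("driver", "org.postgresql.Driver") \
    .load()
df.show()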

The rest of this article provides an example using the Python packages psycopg2 and SQLAlchemy.

PostgreSQL environment

The following PostgreSQL setup will be used in code examples:

  • Host: localhost or 127.0.0.1
  • Port: 5432
  • User: postgres
  • Password: password
  • Database: testdb

Python environment

The Python environment used for running the code examples is Python 3.8.2. The steps should also work with most other Python 3 versions.

Install psycopg2

Run the following command to install the Python PostgreSQL connector:

pip install psycopg2

Example output:

Collecting psycopg2
  Downloading psycopg2-2.8.6-cp38-cp38-win_amd64.whl (1.1 MB)
     |████████████████████████████████| 1.1 MB 2.2 MB/s
Installing collected packages: psycopg2
Successfully installed psycopg2-2.8.6

We also need SQLAlchemy so that we can use the pandas read_sql and to_sql functions directly. Install it if it is not already available:

pip install sqlalchemy
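Optionally, you can verify that the environment is wired up correctly with a short connectivity test. The following is a minimal sketch assuming the sample PostgreSQL setup described above:

import psycopg2

# Simple connectivity check against the sample database.
conn = psycopg2.connect(host="localhost", port=5432, user="postgres",
                        password="password", dbname="testdb")
with conn.cursor() as cur:
    cur.execute("SELECT version()")
    print(cur.fetchone()[0])
conn.close()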

Save pandas DataFrame to PostgreSQL

Now we can use the psycopg2 package to save data to the PostgreSQL database in Python scripts.

info The connection string for SQLAlchemy with psycopg2 has the following format: postgresql+psycopg2://user:password@host:port/dbname[?key=value&key=value...]

The following code snippet constructs a pandas DataFrame in memory and then saves the result to the PostgreSQL database.

import psycopg2
import pandas as pd
from sqlalchemy import create_engine

# Construct the pandas dataframe
data = [{"id": 1, "value": 'ABC'},
        {"id": 2, "value": 'DEF'}]
pdf = pd.DataFrame(data)
print(pdf)
# Create SQLAlchemy engine
engine = create_engine("postgresql+psycopg2://postgres:password@localhost/testdb?client_encoding=utf8")
# Save the result to the database via the engine; if_exists='replace'
# drops and recreates the table if it already exists.
pdf.to_sql('test_table', engine, index=False, if_exists='replace')

After the above script is executed, a table named test_table is created:

testdb=# \dt
           List of relations
 Schema |    Name    | Type  |  Owner
--------+------------+-------+----------
 public | test_table | table | postgres
(1 row)

testdb=# select * from test_table;
 id | value
----+-------
  1 | ABC
  2 | DEF
(2 rows)

testdb=#

*The above results are from the psql CLI tool.

Load data from PostgreSQL in Spark

Now we can use the same packages to load data from the PostgreSQL database into Spark. Note that the data load runs in the Spark driver application, not in parallel on the executors.

import psycopg2
import pandas as pd
from pyspark.sql import SparkSession
from sqlalchemy import create_engine

appName = "PySpark PostgreSQL Example - via psycopg2"
master = "local"

spark = SparkSession.builder.master(master).appName(appName).getOrCreate()

engine = create_engine(
    "postgresql+psycopg2://postgres:password@localhost/testdb?client_encoding=utf8")
pdf = pd.read_sql('select * from test_table', engine)

# Convert Pandas dataframe to spark DataFrame
df = spark.createDataFrame(pdf)
print(df.schema)
df.show()

The above code snippet does the following:

  • Establishes a SparkSession with a local master.
  • Creates a PostgreSQL connection (engine) using the SQLAlchemy package.
  • Uses pandas to create a DataFrame object via the read_sql API.
  • Finally, converts the pandas DataFrame to a Spark DataFrame.

The following is the sample output:

StructType(List(StructField(id,LongType,true),StructField(value,StringType,true)))
+---+-----+
| id|value|
+---+-----+
|  1|  ABC|
|  2|  DEF|
+---+-----+
info The Spark DataFrame's schema is automatically inferred from the pandas DataFrame. You can also explicitly specify the schema when creating the DataFrame object, as shown in the sketch below.
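For example, here is a minimal sketch that reuses the spark session and the pdf DataFrame from the snippet above:

from pyspark.sql.types import StructType, StructField, LongType, StringType

# Define the schema explicitly to match the sample table instead of
# relying on schema inference.
schema = StructType([
    StructField("id", LongType(), True),
    StructField("value", StringType(), True)
])
df = spark.createDataFrame(pdf, schema=schema)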