Raymond

Connect to MySQL in Spark (PySpark)

2021-01-23

Spark is an analytics engine for big data processing. There are various ways to connect to a MySQL database in Spark. This page summarizes some of the common approaches to connecting to MySQL using Python as the programming language.

Connect to MySQL

Similar to Connect to SQL Server in Spark (PySpark), there are several typical ways to connect to MySQL in Spark:

  • Via MySQL JDBC (runs in systems that have a Java runtime); py4j is used to communicate between the Python and Java processes (see the sketch after this list). 
  • Via MySQL ODBC (runs in systems that support ODBC)
  • Via native Python packages.
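
As a quick illustration of the first (JDBC) approach, here is a minimal sketch using Spark's built-in JDBC data source. It assumes the MySQL Connector/J driver jar has been made available to Spark (for example via the spark.jars configuration) and reuses the connection details described in the next section:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local") \
    .appName("PySpark MySQL Example - via JDBC") \
    .getOrCreate()

# Read test_db.test_table through the JDBC data source
df = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:10101/test_db") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "test_table") \
    .option("user", "hive") \
    .option("password", "hive") \
    .load()
df.show()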


This article provides an example of using the native Python package mysql.connector.

MySQL environment

The following MySQL setup is used in the code examples:

  • Host: localhost or 127.0.0.1
  • Port: 10101 (different from the default port 3306)
  • User: hive
  • Password: hive
  • Database: test_db
  • Table: test_table
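
If the sample table does not exist yet, the following sketch creates and populates it using mysql.connector; the table definition here is an assumption inferred from the query and sample output shown later in this article:

import mysql.connector

# Hypothetical setup script; the schema below is an assumption
# inferred from the sample output later in this article.
conn = mysql.connector.connect(user='hive', password='hive',
                               host='localhost', port=10101)
cursor = conn.cursor()
cursor.execute("CREATE DATABASE IF NOT EXISTS test_db")
cursor.execute("""CREATE TABLE IF NOT EXISTS test_db.test_table (
    id BIGINT PRIMARY KEY,
    value VARCHAR(100))""")
# Insert five sample records: (0, 'Record 0') ... (4, 'Record 4')
cursor.executemany(
    "INSERT INTO test_db.test_table (id, value) VALUES (%s, %s)",
    [(i, f"Record {i}") for i in range(5)])
conn.commit()
conn.close()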

Python environment

The Python environment used for the code examples is Python 3.8.2.

These steps should also work with most other Python 3 versions.

Install mysql.connector

Run the following command to install Python MySQL connector:

pip install mysql-connector-python

This package depends on the Python protobuf package (>=3.0.0).

Example output:

PS F:\Projects\Python> pip install mysql-connector-python
Collecting mysql-connector-python
  Downloading mysql_connector_python-8.0.23-cp38-cp38-win_amd64.whl (854 kB)
     |████████████████████████████████| 854 kB 1.6 MB/s
Collecting protobuf>=3.0.0
  Downloading protobuf-3.14.0-py2.py3-none-any.whl (173 kB)
     |████████████████████████████████| 173 kB 6.4 MB/s
Requirement already satisfied: six>=1.9 in c:\users\fahao.000\appdata\roaming\python\python38\site-packages (from protobuf>=3.0.0->mysql-connector-python) (1.14.0)
Installing collected packages: protobuf, mysql-connector-python
Successfully installed mysql-connector-python-8.0.23 protobuf-3.14.0
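
To verify the installation, import the package and print its version; the version printed should match the one pip installed (8.0.23 in the log above):

import mysql.connector

# Should print the installed connector version, e.g. 8.0.23
print(mysql.connector.__version__)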

Code snippet

Now we can use the mysql.connector package to load data from the MySQL database into Spark. The data load part runs in the Spark driver application: the full query result is first read into a Pandas DataFrame, so this approach is only suitable for result sets that fit in driver memory. 

import mysql.connector
import pandas as pd
from pyspark.sql import SparkSession

appName = "PySpark MySQL Example - via mysql.connector"
master = "local"

spark = SparkSession.builder.master(master).appName(appName).getOrCreate()

# Establish a connection
conn = mysql.connector.connect(user='hive', database='test_db',
                               password='hive',
                               host="localhost",
                               port=10101)
query = "SELECT id, value FROM test_table"
# Read the query results into a Pandas DataFrame
pdf = pd.read_sql(query, con=conn)
conn.close()

# Convert the Pandas DataFrame to a Spark DataFrame
df = spark.createDataFrame(pdf)

print(df.schema)
df.show()

The above code snippet does the following:

  • Establishes a SparkSession with a local master.
  • Creates a MySQL connection using the mysql.connector Python package.
  • Uses Pandas to read the query results into a DataFrame via the read_sql API.
  • Finally converts the Pandas DataFrame to a Spark DataFrame, then prints its schema and contents.

The following is the sample output:

StructType(List(StructField(id,LongType,true),StructField(value,StringType,true)))
+---+--------+
| id|   value|
+---+--------+
|  0|Record 0|
|  1|Record 1|
|  2|Record 2|
|  3|Record 3|
|  4|Record 4|
+---+--------+
The Spark DataFrame's schema is automatically inferred from the Pandas DataFrame. You can also explicitly specify the schema when creating the DataFrame object. 
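
For example, a minimal sketch of passing an explicit schema that matches the columns used above:

from pyspark.sql.types import StructType, StructField, LongType, StringType

# Explicit schema matching the id and value columns of test_table
schema = StructType([
    StructField("id", LongType(), True),
    StructField("value", StringType(), True)
])
df = spark.createDataFrame(pdf, schema=schema)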
Comments
Raymond #1660 · 2 years ago (in reply to #1659 below)

You need to run some tests to find out.

One approach uses JDBC and the other uses the native Python driver. I don't think there will be major performance differences, since a Spark JDBC read does not run in parallel by default anyway. So the main performance difference will come down to the two drivers themselves.

If you do want to extract data via JDBC in parallel by utilizing partition columns, you can also consider Sqoop.
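
Spark itself can also partition a JDBC read when you supply partitioning options; a rough sketch, where the partition column and the bounds/partition count are assumptions:

# Rough sketch of a partitioned JDBC read; the partition column (id)
# and the bounds/partition count below are assumptions.
spark_df = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:10101/test_db") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "test_table") \
    .option("user", "hive") \
    .option("password", "hive") \
    .option("partitionColumn", "id") \
    .option("lowerBound", "0") \
    .option("upperBound", "1000") \
    .option("numPartitions", "4") \
    .load()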



Samuel Lawson #1659 · 2 years ago

Question for you: is it more or less performant to use the Spark-only technique of:

spark_df = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://<host_string>/<database>") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "(SELECT * FROM some_table) temp_spark_table") \
    .option("user", "theusername") \
    .option("password", "thepassword") \
    .load()

