PySpark - Read Data from MariaDB Database

Kontext Kontext event 2022-06-18 visibility 3,126
more_vert
PySpark - Read Data from MariaDB Database
This article provides example code to load data from MariaDB database using jdbc connector in PySpark. The same approach can be applied to other relational databases like MySQL, PostgreSQL, SQL Server, etc.

Prerequisites

PySpark environment

You can install Spark on you Windows or Linux machine by following this article: Install Spark 3.2.1 on Linux or WSL. For macOS,  follow this one: Apache Spark 3.0.1 Installation on macOS.

For testing the sample script, you can also just use PySpark package directly without doing Spark configurations:

pip install pyspark

For Anaconda environment, you can also install PySpark using the following command:

conda install pyspark

MariaDB environment

If you don't have MariaDB environment, follow Install MariaDB Server on WSL to create one.

The user and test table is created using the following SQL statements:

MariaDB [(none)]> CREATE USER 'spark'@'localhost' IDENTIFIED BY 'kontext';
Query OK, 0 rows affected (0.001 sec)

MariaDB [(none)]> GRANT SELECT ON *.* TO 'spark'@'localhost';
Query OK, 0 rows affected (0.000 sec)

MariaDB [(none)]> create database test;
Query OK, 1 row affected (0.002 sec)

MariaDB [(none)]> use test;
Database changed
MariaDB [test]> create table customers (id int, name varchar(100));
Query OK, 0 rows affected (0.370 sec)

MariaDB [test]> insert into customers values (1,'Kontext'),(2,'Seb'),(3,'Mary');
Query OK, 3 rows affected (0.054 sec)
Records: 3  Duplicates: 0  Warnings: 0

MariaDB [test]> select * from customers;
+------+---------+
| id   | name    |
+------+---------+
|    1 | Kontext |
|    2 | Seb     |
|    3 | Mary    |
+------+---------+
3 rows in set (0.001 sec)

In the following sections, we will read data from this sample table test.customers on localhost server with user spark and password 'kontext'.

When login with this user, we should be able to query the data:

$ mariadb -u spark -pkontext
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MariaDB connection id is 54
Server version: 10.3.34-MariaDB-0ubuntu0.20.04.1 Ubuntu 20.04

Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

MariaDB [(none)]> select * from test.customers;
+------+---------+
| id   | name    |
+------+---------+
|    1 | Kontext |
|    2 | Seb     |
|    3 | Mary    |
+------+---------+
3 rows in set (0.000 sec)

MariaDB JDBC driver

As we will use JDBC to connect to MariaDB, let's download the driver from the official website MariaDB Products & Tools Downloads | MariaDB. Download the right version based on your JDK version accordingly. For my WSL environment, I am using 3.0.5-GA version for Java 8+.

20220618124624-image.png

Save the downloaded jar file (mariadb-java-client-3.0.5.jar) to your PySpark project folder.

Read from MariaDB database

Now we can create a PySpark script (mariadb-example.py) to load data from Oracle database as DataFrame.

# mariadb-example.py
from pyspark.sql import SparkSession

appName = "PySpark Example - MariaDB Example"
master = "local"
# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

sql = "select * from test.customers"
database = "test"
user = "spark"
password = "kontext"
server = "localhost"
port = 3306
jdbc_url = f"jdbc:mysql://{server}:{port}/{database}?permitMysqlScheme"
jdbc_driver = "org.mariadb.jdbc.Driver"

# Create a data frame by reading data from Oracle via JDBC
df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("query", sql) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", jdbc_driver) \
    .load()

df.show()
Run the script with the following command:
spark-submit --jars mariadb-java-client-3.0.5.jar mariadb-example.py 

Argument --jars will add the jar package to both driver and executor containers. 

Output:

+---+-------+
| id|   name|
+---+-------+
|  1|Kontext|
|  2|    Seb|
|  3|   Mary|
+---+-------+

About the JDBC URL

As Spark doesn't recognize 'jdbc:mariadb', we have to use JDBC URL like MySQL: f"jdbc:mysql://{server}:{port}/{database}?permitMysqlScheme". This will work with the MariaDB JDBC driver even the scheme is specified as jdbc:mysql.

This is important otherwise you may encounter conversion issues like the following:

java.sql.SQLDataException: value 'id' cannot be decoded as Integer
        at org.mariadb.jdbc.plugin.codec.IntCodec.decodeTextInt(IntCodec.java:127)
        at org.mariadb.jdbc.codec.TextRowDecoder.decodeInt(TextRowDecoder.java:42)
        at org.mariadb.jdbc.codec.RowDecoder.getIntValue(RowDecoder.java:145)
        at org.mariadb.jdbc.client.result.Result.getInt(Result.java:399)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$7(JdbcUtils.scala:431)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$7$adapted(JdbcUtils.scala:430)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:367)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:349)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

You detailed message can be different but the it should follow this format: value '{column name}' cannot be decoded as {data type}. This is because MariaDB server returns column names instead of column values without the setup. For instance, the query in this example returns the following data:

+---+-------+
| id|   name|
+---+-------+
| id|   name|
| id|   name|
| id|   name|
+---+-------+

Thus the data type conversion error throw out. For more details, you can refer to this StackOverflow topic: mysql - Spark MariaDB jdbc SQL query returns column names instead of column values - Stack Overflow.

References

Save DataFrame to SQL Databases via JDBC in PySpark

Connect to MySQL in Spark (PySpark)

More from Kontext
comment Comments
No comments yet.

Please log in or register to comment.

account_circle Log in person_add Register

Log in with external accounts