PySpark - Read Data from MariaDB Database
Prerequisites
PySpark environment
You can install Spark on your Windows or Linux machine by following this article: Install Spark 3.2.1 on Linux or WSL. For macOS, follow this one: Apache Spark 3.0.1 Installation on macOS.
For testing the sample script, you can also just use the PySpark package directly without configuring Spark:
pip install pyspark
For an Anaconda environment, you can install PySpark using the following command:
conda install pyspark
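To confirm the package is available, you can print its version:

import pyspark

# A quick sanity check that the PySpark package can be imported.
print(pyspark.__version__)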
MariaDB environment
If you don't have a MariaDB environment, follow Install MariaDB Server on WSL to create one.
The user and test table were created using the following SQL statements:
MariaDB [(none)]> CREATE USER 'spark'@'localhost' IDENTIFIED BY 'kontext';
Query OK, 0 rows affected (0.001 sec)

MariaDB [(none)]> GRANT SELECT ON *.* TO 'spark'@'localhost';
Query OK, 0 rows affected (0.000 sec)

MariaDB [(none)]> create database test;
Query OK, 1 row affected (0.002 sec)

MariaDB [(none)]> use test;
Database changed

MariaDB [test]> create table customers (id int, name varchar(100));
Query OK, 0 rows affected (0.370 sec)

MariaDB [test]> insert into customers values (1,'Kontext'),(2,'Seb'),(3,'Mary');
Query OK, 3 rows affected (0.054 sec)
Records: 3  Duplicates: 0  Warnings: 0

MariaDB [test]> select * from customers;
+------+---------+
| id   | name    |
+------+---------+
|    1 | Kontext |
|    2 | Seb     |
|    3 | Mary    |
+------+---------+
3 rows in set (0.001 sec)
In the following sections, we will read data from this sample table test.customers on the localhost server with user 'spark' and password 'kontext'.
When logging in with this user, we should be able to query the data:
$ mariadb -u spark -pkontext
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MariaDB connection id is 54
Server version: 10.3.34-MariaDB-0ubuntu0.20.04.1 Ubuntu 20.04

Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

MariaDB [(none)]> select * from test.customers;
+------+---------+
| id   | name    |
+------+---------+
|    1 | Kontext |
|    2 | Seb     |
|    3 | Mary    |
+------+---------+
3 rows in set (0.000 sec)
MariaDB JDBC driver
As we will use JDBC to connect to MariaDB, let's download the driver from the official website: MariaDB Products & Tools Downloads | MariaDB. Download the right version for your JDK. For my WSL environment, I am using the 3.0.5 GA version for Java 8+.
Save the downloaded jar file (mariadb-java-client-3.0.5.jar) to your PySpark project folder.
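If you prefer the command line, the driver is also published to Maven Central; assuming the standard repository layout for version 3.0.5, it can be fetched like this:

wget https://repo1.maven.org/maven2/org/mariadb/jdbc/mariadb-java-client/3.0.5/mariadb-java-client-3.0.5.jar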
Read from MariaDB database
Now we can create a PySpark script (mariadb-example.py) to load data from the MariaDB database as a DataFrame.
# mariadb-example.py
from pyspark.sql import SparkSession

appName = "PySpark Example - MariaDB Example"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

sql = "select * from test.customers"
database = "test"
user = "spark"
password = "kontext"
server = "localhost"
port = 3306
jdbc_url = f"jdbc:mysql://{server}:{port}/{database}?permitMysqlScheme"
jdbc_driver = "org.mariadb.jdbc.Driver"

# Create a data frame by reading data from MariaDB via JDBC
df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("query", sql) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", jdbc_driver) \
    .load()

df.show()
Run the script using spark-submit:

spark-submit --jars mariadb-java-client-3.0.5.jar mariadb-example.py

The --jars argument adds the jar package to both driver and executor containers.
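Alternatively, if you want to launch the script with plain python instead of spark-submit, you can point the spark.jars configuration at the driver jar when building the session. A minimal sketch, assuming the jar sits in the current working directory:

# Register the MariaDB JDBC driver jar via configuration instead of --jars.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .config("spark.jars", "mariadb-java-client-3.0.5.jar") \
    .getOrCreate()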
Output:
+---+-------+
| id|   name|
+---+-------+
|  1|Kontext|
|  2|    Seb|
|  3|   Mary|
+---+-------+
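The script above passes a full SQL statement via the query option. If you simply want the whole table, Spark's JDBC reader also accepts a dbtable option; a minimal variant reusing the variables from the script:

# Read the whole table instead of a custom query.
df = spark.read.format("jdbc") \
    .option("url", jdbc_url) \
    .option("dbtable", "test.customers") \
    .option("user", user) \
    .option("password", password) \
    .option("driver", jdbc_driver) \
    .load()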
About the JDBC URL
As Spark doesn't recognize the 'jdbc:mariadb' scheme, we have to use a MySQL-style JDBC URL: f"jdbc:mysql://{server}:{port}/{database}?permitMysqlScheme". This works with the MariaDB JDBC driver even though the scheme is specified as jdbc:mysql.
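For clarity, the two URL variants look like this (server, port and database are the variables from the script above):

# Not recognized by Spark; reads can fail as described below:
# jdbc_url = f"jdbc:mariadb://{server}:{port}/{database}"

# Works: MySQL scheme plus permitMysqlScheme so the MariaDB driver accepts it.
jdbc_url = f"jdbc:mysql://{server}:{port}/{database}?permitMysqlScheme"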
This is important; otherwise you may encounter conversion issues like the following:
java.sql.SQLDataException: value 'id' cannot be decoded as Integer
at org.mariadb.jdbc.plugin.codec.IntCodec.decodeTextInt(IntCodec.java:127)
at org.mariadb.jdbc.codec.TextRowDecoder.decodeInt(TextRowDecoder.java:42)
at org.mariadb.jdbc.codec.RowDecoder.getIntValue(RowDecoder.java:145)
at org.mariadb.jdbc.client.result.Result.getInt(Result.java:399)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$7(JdbcUtils.scala:431)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$makeGetter$7$adapted(JdbcUtils.scala:430)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:367)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:349)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Your detailed message can be different, but it should follow this format: value '{column name}' cannot be decoded as {data type}. This is because, without this URL parameter, the MariaDB server returns column names instead of column values. For instance, the query in this example returns the following data:
+---+-------+
| id|   name|
+---+-------+
| id|   name|
| id|   name|
| id|   name|
+---+-------+
Thus the data type conversion error is thrown. For more details, refer to this StackOverflow topic: Spark MariaDB jdbc SQL query returns column names instead of column values - Stack Overflow.