Spark - Read and Write Data with MongoDB
MongoDB is a NoSQL database that can be used for all kinds of workloads. MongoDB publishes a connector for Apache Spark, which we can use to read data from and write data to MongoDB. This article uses Python as the programming language, but you can easily convert the code to Scala too.
Prerequisites
- MongoDB instance - Refer to the article Install MongoDB on WSL to learn how to install MongoDB on Linux or WSL. We will use the example collection from that article.
- Spark - Search the keyword Spark Installation on Kontext to find tutorials on installing Spark on Windows 10, WSL, macOS or Linux.
The sample collection is named users in the app database and contains the following documents:
> db.users.find()
{ "_id" : "1", "firstName" : "Raymond", "lastName" : "Tang", "roles" : [ "user", "admin" ] }
{ "_id" : "2", "firstName" : "Kontext", "lastName" : "", "roles" : [ "admin" ] }
Download Spark connector
You can download the Spark connector from the MongoDB Connector for Apache Spark page.
This step is optional, as you can directly specify the dependency on the MongoDB connector when launching spark-shell or submitting the job with the spark-submit command:
$SPARK_HOME/bin/spark-shell --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.1
$SPARK_HOME/bin/spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.1 /path/to/your/script
You can also add it to your Spark cluster libraries so that it is available to all your applications. For this article, I prefer to add it at submit time since there is a dependency chain of packages. Refer to Add JARs to a Spark Job for more details about how to reference jars in Spark applications.
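If you do want the connector available cluster-wide, one possible approach (a minimal sketch, assuming you manage the cluster's Spark configuration) is to add the package coordinates to $SPARK_HOME/conf/spark-defaults.conf so every application resolves them automatically:
spark.jars.packages    org.mongodb.spark:mongo-spark-connector_2.12:3.0.1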
Read data from MongoDB
Now let's create a PySpark script to read data from MongoDB.
Code snippet
from pyspark.sql import SparkSession
appName = "PySpark MongoDB Examples"
master = "local"
# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/app.users") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/app.users") \
    .getOrCreate()
# Read data from MongoDB
df = spark.read.format('mongo').load()
df.printSchema()
df.show()
I specified default URIs for reading and writing data at the session level. The alternative is to specify them as options when reading or writing. For all the configuration items of the mongo format, refer to Configuration Options.
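For example, here is a minimal sketch that passes the connection details as read options instead (assuming the same local MongoDB instance; the option names mirror those used in the write snippet later in this article):
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("PySpark MongoDB Examples") \
    .master("local") \
    .getOrCreate()

# Pass connection details per read instead of via session-level defaults
df = spark.read.format("mongo") \
    .option("uri", "mongodb://127.0.0.1") \
    .option("database", "app") \
    .option("collection", "users") \
    .load()
df.show()
The rest of this article keeps the session-level configuration.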
Run the script with the following command line:
spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.1 .\spark-mongo-examples.py
When it runs, Spark will resolve and download the dependencies automatically:
- org.mongodb#bson;4.0.5 from central in [default]
- org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
- org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
- org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 from central in [default]
If you cannot resolve these packages dynamically, for example because your cluster has no network or Maven Central access, you need to download them manually and include them in your project.
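For instance, a sketch of submitting with manually downloaded jars via the --jars option (the local paths are placeholders; the jar names correspond to the packages listed above):
spark-submit --jars /path/to/mongo-spark-connector_2.12-3.0.1.jar,/path/to/mongodb-driver-sync-4.0.5.jar,/path/to/mongodb-driver-core-4.0.5.jar,/path/to/bson-4.0.5.jar .\spark-mongo-examples.py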
Output - schema:
root
 |-- _id: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- roles: array (nullable = true)
 |    |-- element: string (containsNull = true)
Output - DataFrame:
+---+---------+--------+-------------+
|_id|firstName|lastName|        roles|
+---+---------+--------+-------------+
|  1|  Raymond|    Tang|[user, admin]|
|  2|  Kontext|        |      [admin]|
+---+---------+--------+-------------+
Spark has inferred the schema automatically.
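If you prefer not to rely on schema inference, for example on larger collections, you can supply the schema explicitly. The following is a minimal sketch based on the fields shown above, reusing the Spark session created earlier:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# Explicit schema matching the users collection
schema = StructType([
    StructField("_id", StringType(), True),
    StructField("firstName", StringType(), True),
    StructField("lastName", StringType(), True),
    StructField("roles", ArrayType(StringType()), True)
])

# Providing a schema avoids relying on inference
df = spark.read.format("mongo").schema(schema).load()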
Write data into MongoDB
Now let's add a new column with a constant value and then write the results into a new collection named users2.
Code snippet
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

appName = "PySpark MongoDB Examples"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/app.users") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/app.users") \
    .getOrCreate()

# Read data from MongoDB
df = spark.read.format('mongo').load()
df.printSchema()
df.show()

# Write data into MongoDB
df = df.withColumn("newCol", lit("Test new column"))
df.write.format("mongo").option('uri', 'mongodb://127.0.0.1') \
    .option('database', 'app') \
    .option('collection', 'users2') \
    .mode("overwrite") \
    .save()
Run the script and then verify the results in the database with the mongo CLI:
use app
db.users2.find()
The returned documents should match those in the users collection, with the additional newCol field.
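Alternatively, you can verify the write from PySpark itself. A short sketch reading the new collection back (assuming the same Spark session as before):
# Read the newly written collection back for verification
df2 = spark.read.format("mongo") \
    .option("uri", "mongodb://127.0.0.1") \
    .option("database", "app") \
    .option("collection", "users2") \
    .load()
df2.show()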