Spark - Read and Write Data with MongoDB

visibility 65 comment 0 access_time 2m languageEnglish
Spark - Read and Write Data with MongoDB

MongoDB is a NoSQL database that can be used for all kinds of workloads. MongoDB publishes connectors for Spark. We can use the connector to read data from MongoDB. This article uses Python as programming language but you can easily convert the code to Scala too.

Prerequisites

  • MongoDB instance - Refer to article Install MongoDB on WSL to learn how to install MongoDB in Linux or WSL. We will use the example collection in that article.
  • Spark - Search keyword Spark Installation on Kontext to find out tutorials to install Spark in Windows 10, WSL or macOS or Linux.

The sample collection is named users in database app with the following documents:

> db.users.find()

{ "_id" : "1", "firstName" : "Raymond", "lastName" : "Tang", "roles" : [ "user", "admin" ] }
{ "_id" : "2", "firstName" : "Kontext", "lastName" : "", "roles" : [ "admin" ] }

Download Spark connector

You can download Spark connector from MongoDB Connector for Apache Spark.

This step is optional as you can directly specify the dependency on MongoDB connector when submitting the job using spark-submit command:

$SPARK_HOME/bin/spark-shell --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.1
$SPARK_HOME/bin/spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.1 /path/to/your/script

You can also add it into your Spark cluster libraries so that it will be available for all your applications. For this article, I prefer to add it in submit as there is a dependency chain of packages. Refer to Add JARs to a Spark Job for more details how about to reference jars in Spark applications.

Read data from MongoDB

Now let's create a PySpark scripts to read data from MongoDB.

Code snippet

from pyspark.sql import SparkSession
appName = "PySpark MongoDB Examples"
master = "local"
# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/app.users") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/app.users") \
    .getOrCreate()
# Read data from MongoDB
df = spark.read.format('mongo').load()
df.printSchema()
df.show()

I specified default URIs for read and write data. The alternative way is to specify it as options when reading or writing. For all the configuration items for mongo format, refer to Configuration Options.

Run the script with the following command line:

spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.1 .\spark-mongo-examples.py

When it runs, Spark will install dependencies automatically:

  • org.mongodb#bson;4.0.5 from central in [default]
  • org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
  • org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
  • org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 from central in [default]

If you want to reference these package dynamically, for example, if no network access or Maven central access in your cluster, you need to download these packages manually and include them in your project.

Output - schema:

root
 |-- _id: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- roles: array (nullable = true)
 |    |-- element: string (containsNull = true)

Output - DataFrame:

+---+---------+--------+-------------+
|_id|firstName|lastName|        roles|
+---+---------+--------+-------------+
|  1|  Raymond|    Tang|[user, admin]|
|  2|  Kontext|        |      [admin]|
+---+---------+--------+-------------+

Spark has inferred the schema automatically.

Write data into MongoDB

Now let's add a new column with constant value and then write the results into a new collection named users2.

Code snippet

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
appName = "PySpark MongoDB Examples"
master = "local"
# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/app.users") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/app.users") \
    .getOrCreate()

# Read data from MongoDB
df = spark.read.format('mongo').load()
df.printSchema()
df.show()
# Write data into MongoDB
df = df.withColumn("newCol", lit("Test new column"))
df.write.format("mongo").option('uri', 'mongodb://127.0.0.1')\
    .option('database', 'app') \
    .option('collection', 'users2') \
    .mode("overwrite") \
    .save()

Run the script and then verify in the database with mongo CLI:

use app
db.users2.find()

The output looks like the following screenshot:


References

MongoDB Connector for Spark

Spark Connector Python Guide

copyright This page is subject to Site terms.

Please log in or register to comment.

account_circle Log in person_add Register

Log in with external accounts

Tags
More from Kontext