PySpark: Read File in Google Cloud Storage

Raymond Raymond event 2021-03-21 visibility 13,196
more_vert
PySpark: Read File in Google Cloud Storage

In article Spark - Read from BigQuery Table, I provided details about how to read data from BigQuery in PySpark using Spark 3.1.1 with GCS connector 2.2.0. This article continues the journey about reading JSON file from Google Cloud Storage (GCS) directly. You can use similar APIs to read XML or other file format in GCS as data frame in Spark.

Prerequisites

Similar as the previous article, I will directly use PySpark standalone installation in Cloud Shell to test out the function. Refer to Spark - Read from BigQuery Table about installation guide and also service account permissions and credential file setup.

Cloud storage connector

Cloud storage connector is required to read from GCS in Spark. 

Refer to GitHub for more details. hadoop-connectors/gcs at master ยท GoogleCloudDataproc/hadoop-connectors (github.com).

For simplicity, we will directly pass in the package as a dependency when submitting the application:

spark-submit --packages com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.0

Without this connector, an error will be thrown out:

org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "gs"

Input JSON file

The input JSON file has the following content:

[{"ID":1,"ATTR1":"ABC"},
{"ID":2,"ATTR1":"DEF"},
{"ID":3,"ATTR1":"GHI"}]

File is uploaded to a GCS bucket. 

Create the script file

Create a python script named pyspark-gcs.py with the following content:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

appName = "PySpark Example - Read JSON file from GCS"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

# Setup hadoop fs configuration for schema gs://
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

# Create a schema for the dataframe
schema = StructType([
    StructField('ID', IntegerType(), True),
    StructField('ATTR1', StringType(), True)
])

# Create data frame
json_file_path = 'gs://YOUR_BUCKET/test.json'
df = spark.read.json(json_file_path, schema, multiLine=True)
print(df.schema)
df.show()

Remember to change the bucket address accordingly.

In the above code, we also dynamically added Hadoop file system configuration properties for Google Cloud Storage.  If you have a Hadoop and Spark cluster (not Dataproc), you can also directly update core-site.xml configuration file as specified by GCS connector installation documentation.

Run the script file

Use the following command to run the script:

spark-submit --packages com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.0 pyspark-gcs.py

We use the latest GCS connector 2.2.0 (when the article is written) for Hadoop 3 to read from GCS files.

The output looks like the following:

+---+-----+
| ID|ATTR1|
+---+-----+
|  1|  ABC|
|  2|  DEF|
|  3|  GHI|
+---+-----+

About guava library

For PySpark 3.1.1, the referenced guava library version is 14.0.1 (guava-14.0.1.jar ). However for GCS Hadoop 3 2.2.0 version, the referenced guava library version is 30.1-jre. This can cause issues like the following:

Caused by: java.lang.NoSuchMethodError: 'void com.google.common.base.Preconditions.checkArgument(boolean, java.lang.String, long, long)'

To address this issue, we need to ensure the versions are consistent and compatible.

Follow these steps to fix this issue:

  1. Download the version of guava used by GCS connector:
    wget https://repo1.maven.org/maven2/com/google/guava/guava/30.1-jre/guava-30.1-jre.jar
  2. Backup existing guava library in PySpark:
    mv /home/$USER/.local/lib/python3.7/site-packages/pyspark/jars/guava-14.0.1.jar /home/$USER/.local/lib/python3.7/site-packages/pyspark/jars/guava-14.0.1.jar.bk
  3. Copy the downloaded guava jar file to PySpark 3.1.1 jars folder:
    cp guava-30.1-jre.jar /home/$USER/.local/lib/python3.7/site-packages/pyspark/jars
infoMy PySpark is installed into /home/$USER/.local folder by default in Cloud Shell.  Please change the paths accordingly if that is not the case for your environment.

About authentication

Authentication is done via the JSON credential file which is specified by environment variable GOOGLE_APPLICATION_CREDENTIALS. You can alternatively specify it via Spark configurations.

References

Read JSON file as Spark DataFrame in Python / Spark

Use the Cloud Storage connector with Apache Spark (google.com)

More from Kontext
comment Comments
No comments yet.

Please log in or register to comment.

account_circle Log in person_add Register

Log in with external accounts