PySpark: Read File in Google Cloud Storage
In the article Spark - Read from BigQuery Table, I described how to read data from BigQuery in PySpark using Spark 3.1.1 with GCS connector 2.2.0. This article continues by reading a JSON file directly from Google Cloud Storage (GCS). You can use similar APIs to read XML or other file formats in GCS as Spark DataFrames.
Prerequisites
As in the previous article, I will use a standalone PySpark installation in Cloud Shell to test the functionality. Refer to Spark - Read from BigQuery Table for the installation guide, as well as the service account permissions and credential file setup.
Cloud Storage connector
The Cloud Storage connector is required to read from GCS in Spark.
Refer to GitHub for more details: hadoop-connectors/gcs at master · GoogleCloudDataproc/hadoop-connectors (github.com).
For simplicity, we will directly pass in the package as a dependency when submitting the application:
spark-submit --packages com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.0
Without this connector, Spark throws the following error:
org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "gs"
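If you script your job launches, the connector's Maven coordinate can be assembled in one place so the version is easy to bump. The following is a minimal sketch, assuming the coordinate format used in this article (`hadoop3-2.2.0`); the helper names are hypothetical and the code only builds the command line, it does not run Spark.

```python
import shlex
from typing import List

# Maven group and artifact IDs of the Cloud Storage connector used in this article.
GCS_CONNECTOR_GROUP = "com.google.cloud.bigdataoss"
GCS_CONNECTOR_ARTIFACT = "gcs-connector"


def gcs_connector_coordinate(version: str = "2.2.0", hadoop_major: int = 3) -> str:
    """Return a Maven coordinate such as ...:gcs-connector:hadoop3-2.2.0."""
    return f"{GCS_CONNECTOR_GROUP}:{GCS_CONNECTOR_ARTIFACT}:hadoop{hadoop_major}-{version}"


def spark_submit_command(script: str, version: str = "2.2.0") -> List[str]:
    """Build the spark-submit argument list that pulls in the connector via --packages."""
    return ["spark-submit", "--packages", gcs_connector_coordinate(version), script]


def as_shell_line(args: List[str]) -> str:
    """Quote each argument so the list can be pasted into a shell (works on Python 3.7)."""
    return " ".join(shlex.quote(arg) for arg in args)


if __name__ == "__main__":
    # Prints the same command that is used later to run pyspark-gcs.py.
    print(as_shell_line(spark_submit_command("pyspark-gcs.py")))
```

When a script is started with plain `python` rather than `spark-submit`, the same coordinate can usually be supplied through the `spark.jars.packages` property, for example `SparkSession.builder.config("spark.jars.packages", gcs_connector_coordinate())`, as long as it is set before the first session (and its JVM) is created.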
Input JSON file
The input JSON file has the following content:
[{"ID":1,"ATTR1":"ABC"}, {"ID":2,"ATTR1":"DEF"}, {"ID":3,"ATTR1":"GHI"}]
The file is uploaded to a GCS bucket.
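To reproduce the input, the file can be generated locally and uploaded from Python. This is a sketch under assumptions: the upload helper relies on the separate `google-cloud-storage` package (its `Client().bucket(...).blob(...).upload_from_filename(...)` calls) and on the same credentials as the rest of the article; `YOUR_BUCKET` is a placeholder and the function names are hypothetical.

```python
import json
from typing import Dict, List

# Sample records matching the input file shown above.
RECORDS: List[Dict[str, object]] = [
    {"ID": 1, "ATTR1": "ABC"},
    {"ID": 2, "ATTR1": "DEF"},
    {"ID": 3, "ATTR1": "GHI"},
]


def records_to_json(records: List[Dict[str, object]]) -> str:
    """Serialize the records as one JSON array, the layout used by test.json."""
    return json.dumps(records)


def write_test_json(path: str, records: List[Dict[str, object]] = RECORDS) -> None:
    """Write the JSON array to a local file."""
    with open(path, "w", encoding="utf-8") as f:
        f.write(records_to_json(records))


def upload_to_gcs(local_path: str, bucket_name: str, blob_name: str) -> None:
    """Upload a local file to gs://bucket_name/blob_name.

    Requires the google-cloud-storage package and valid credentials.
    """
    # Imported lazily so the helpers above work without the package installed.
    from google.cloud import storage

    client = storage.Client()
    client.bucket(bucket_name).blob(blob_name).upload_from_filename(local_path)
```

Usage: call `write_test_json("test.json")` and then `upload_to_gcs("test.json", "YOUR_BUCKET", "test.json")`. From Cloud Shell, `gsutil cp test.json gs://YOUR_BUCKET/` achieves the same upload without any Python.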
Create the script file
Create a Python script named pyspark-gcs.py with the following content:

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

appName = "PySpark Example - Read JSON file from GCS"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

# Set up Hadoop file system configuration for the gs:// scheme
conf = spark.sparkContext._jsc.hadoopConfiguration()
conf.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")

# Create a schema for the data frame
schema = StructType([
    StructField('ID', IntegerType(), True),
    StructField('ATTR1', StringType(), True)
])

# Create data frame; multiLine=True lets Spark parse records that span multiple lines
json_file_path = 'gs://YOUR_BUCKET/test.json'
df = spark.read.json(json_file_path, schema, multiLine=True)
print(df.schema)
df.show()
```
Remember to replace YOUR_BUCKET with the name of your own bucket.
In the above code, we also set the Hadoop file system configuration properties for Google Cloud Storage at runtime. If you run your own Hadoop and Spark cluster (not Dataproc), you can instead add these properties to the core-site.xml configuration file, as described in the GCS connector installation documentation.
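The same two properties can also be supplied without touching `_jsc`, or written into core-site.xml. The sketch below assumes nothing beyond the property names used in the script; it relies on Spark's documented behavior of copying `spark.hadoop.*` settings into the Hadoop configuration, and the helper names are hypothetical.

```python
import xml.etree.ElementTree as ET
from typing import Dict

# Hadoop properties the script sets at runtime for the gs:// scheme.
GCS_HADOOP_PROPERTIES: Dict[str, str] = {
    "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
    "fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
}


def to_spark_conf(props: Dict[str, str]) -> Dict[str, str]:
    """Prefix keys with spark.hadoop. so Spark copies them into the Hadoop configuration."""
    return {f"spark.hadoop.{key}": value for key, value in props.items()}


def to_core_site_xml(props: Dict[str, str]) -> str:
    """Render the properties as a core-site.xml <configuration> element."""
    root = ET.Element("configuration")
    for key, value in props.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = key
        ET.SubElement(prop, "value").text = value
    return ET.tostring(root, encoding="unicode")
```

With PySpark, each entry of `to_spark_conf(GCS_HADOOP_PROPERTIES)` can be passed to `SparkSession.builder.config(key, value)` before `getOrCreate()`. The XML output only contains the `<configuration>` element, so merge it into an existing core-site.xml rather than overwriting the file.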
Run the script file
Use the following command to run the script:
spark-submit --packages com.google.cloud.bigdataoss:gcs-connector:hadoop3-2.2.0 pyspark-gcs.py
This uses GCS connector 2.2.0 for Hadoop 3, the latest version at the time of writing, to read files from GCS.
The output looks like the following:
```
+---+-----+
| ID|ATTR1|
+---+-----+
|  1|  ABC|
|  2|  DEF|
|  3|  GHI|
+---+-----+
```
About the Guava library
PySpark 3.1.1 references Guava 14.0.1 (guava-14.0.1.jar). However, GCS connector 2.2.0 for Hadoop 3 references Guava 30.1-jre. This version mismatch can cause errors like the following:
Caused by: java.lang.NoSuchMethodError: 'void com.google.common.base.Preconditions.checkArgument(boolean, java.lang.String, long, long)'
To fix this, make PySpark use the Guava version that the GCS connector expects.
Follow these steps:
- Download the Guava version used by the GCS connector:
wget https://repo1.maven.org/maven2/com/google/guava/guava/30.1-jre/guava-30.1-jre.jar
- Back up the existing Guava library in PySpark:
mv /home/$USER/.local/lib/python3.7/site-packages/pyspark/jars/guava-14.0.1.jar /home/$USER/.local/lib/python3.7/site-packages/pyspark/jars/guava-14.0.1.jar.bk
- Copy the downloaded Guava jar file to the PySpark 3.1.1 jars folder:
cp guava-30.1-jre.jar /home/$USER/.local/lib/python3.7/site-packages/pyspark/jars
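The three steps above can be scripted so they do not depend on the hard-coded python3.7 path. This is a sketch under assumptions: PySpark is pip-installed (so its jars folder sits next to `pyspark/__init__.py`), the old jar is backed up with the same `.bk` suffix as the `mv` step, and all function names are hypothetical.

```python
import os
import shutil
import urllib.request
from typing import List

GUAVA_JAR = "guava-30.1-jre.jar"
GUAVA_URL = "https://repo1.maven.org/maven2/com/google/guava/guava/30.1-jre/" + GUAVA_JAR


def pyspark_jars_dir() -> str:
    """Locate the jars folder of a pip-installed PySpark instead of hard-coding it."""
    import pyspark  # imported lazily; only needed against a real installation

    return os.path.join(os.path.dirname(pyspark.__file__), "jars")


def jars_to_back_up(jar_names: List[str], new_jar_name: str) -> List[str]:
    """Return the Guava jar names that should be renamed to *.bk."""
    return [
        name for name in jar_names
        if name.startswith("guava-") and name.endswith(".jar") and name != new_jar_name
    ]


def swap_guava(jars_dir: str, new_jar_path: str) -> List[str]:
    """Back up old Guava jars in jars_dir and copy new_jar_path into it."""
    new_name = os.path.basename(new_jar_path)
    backups = []
    for name in jars_to_back_up(sorted(os.listdir(jars_dir)), new_name):
        old_path = os.path.join(jars_dir, name)
        os.rename(old_path, old_path + ".bk")  # same .bk suffix as the mv step
        backups.append(old_path + ".bk")
    target = os.path.join(jars_dir, new_name)
    if not os.path.exists(target):
        shutil.copy(new_jar_path, target)
    return backups


def download_guava(dest: str = GUAVA_JAR) -> str:
    """Download the Guava jar referenced by GCS connector 2.2.0."""
    urllib.request.urlretrieve(GUAVA_URL, dest)
    return dest
```

Usage: `swap_guava(pyspark_jars_dir(), download_guava())`. Renaming instead of deleting keeps the original jar around, so the change can be reverted by removing the new jar and dropping the `.bk` suffix.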
About authentication
Authentication is done via the JSON credential file specified by the GOOGLE_APPLICATION_CREDENTIALS environment variable. Alternatively, you can specify the credential file through Spark configuration properties.
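For the Spark configuration route, the key file path is passed as a Hadoop property with the `spark.hadoop.` prefix. The sketch below assumes the property names `google.cloud.auth.service.account.enable` and `google.cloud.auth.service.account.json.keyfile` from the GCS connector 2.x configuration documentation (newer connector releases use different authentication keys, so check the docs for your version); the helper names are hypothetical.

```python
import os
from typing import Dict, List, Optional

# Assumed GCS connector 2.x property names; verify against your connector version.
ENABLE_PROPERTY = "google.cloud.auth.service.account.enable"
KEYFILE_PROPERTY = "google.cloud.auth.service.account.json.keyfile"


def keyfile_spark_conf(keyfile: Optional[str] = None) -> Dict[str, str]:
    """Spark properties that point the connector at a service account JSON key file.

    Falls back to GOOGLE_APPLICATION_CREDENTIALS when no path is given.
    """
    keyfile = keyfile or os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not keyfile:
        raise ValueError("Pass a key file path or set GOOGLE_APPLICATION_CREDENTIALS")
    # The spark.hadoop. prefix makes Spark copy the keys into the Hadoop configuration.
    return {
        f"spark.hadoop.{ENABLE_PROPERTY}": "true",
        f"spark.hadoop.{KEYFILE_PROPERTY}": keyfile,
    }


def as_conf_args(conf: Dict[str, str]) -> List[str]:
    """Turn the properties into spark-submit --conf arguments."""
    args: List[str] = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args
```

The resulting arguments can be placed before the script name in the `spark-submit` command, or the dictionary entries can be passed to `SparkSession.builder.config()`.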
References
Read JSON file as Spark DataFrame in Python / Spark
Use the Cloud Storage connector with Apache Spark (google.com)