Spark - Read from BigQuery Table

Raymond Raymond, 2021-03-21

On Google Cloud, Dataproc can be used to spin up a cluster with Spark and other Apache big data frameworks. It is a fully managed, scalable service for running different kinds of data processing and transformations. Dataproc also provides connectors to the different data stores on Google Cloud.

This article shows how to read data from a BigQuery table with Spark.

Prerequisites

To read data from BigQuery, make sure you have set up a service account and the credential environment variables properly. Refer to the article Python: Read Data from BigQuery for more details.
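
As a quick sanity check before starting Spark, the sketch below verifies that the credential variable points to an existing key file. It assumes the standard GOOGLE_APPLICATION_CREDENTIALS variable used by Google Cloud client libraries; the function name check_credentials is only illustrative and is not part of any library.

```python
import os


def check_credentials(env=None, var="GOOGLE_APPLICATION_CREDENTIALS"):
    """Return (ok, message) describing whether the key-file variable is usable.

    Assumption: the service account key path is exposed through the standard
    GOOGLE_APPLICATION_CREDENTIALS variable; change `var` if your setup differs.
    """
    env = os.environ if env is None else env
    path = env.get(var)
    if not path:
        return False, f"{var} is not set"
    if not os.path.isfile(path):
        return False, f"{var} points to a missing file: {path}"
    return True, f"{var} -> {path}"


# Check the current environment and report the result.
ok, message = check_credentials()
print("OK" if ok else "PROBLEM", "-", message)
```

This only confirms that the file exists; it does not validate the key or the permissions of the service account.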

PySpark in Cloud Shell

For simplicity, I will use local PySpark directly in Cloud Shell. Follow these steps to set it up:

  1. Open Cloud Shell via Cloud Console.
  2. Run the following command to install the pyspark package:
    pip3 install pyspark
  3. Run the following command to verify that PySpark is installed successfully:
    pyspark
    The PySpark shell should start and print its welcome banner in the terminal.
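
If you prefer a non-interactive check, a minimal sketch like the following reports whether the package is visible to the current Python interpreter. The helper name is_installed is made up for this example; it relies only on the standard library.

```python
import importlib.util


def is_installed(package_name):
    """Return True if the top-level package can be found by this interpreter."""
    return importlib.util.find_spec(package_name) is not None


# Report whether the pyspark package from step 2 is importable.
print("pyspark installed:", is_installed("pyspark"))
```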

Read from BigQuery in Spark

About spark-bigquery package

To read from BigQuery, we need to use one Java library: spark-bigquery. It is available in a public GCS bucket:

gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar

As we will run the script in a standalone Spark installation, we will include the package reference using the --packages argument of the spark-submit command:

spark-submit --packages com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.17.1
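
The value passed to --packages is a Maven coordinate of the form group:artifact:version, where the artifact name ends with the Scala version of the Spark build. The sketch below assembles it from its parts; the helper names and the script name your_script.py are placeholders for illustration only.

```python
def connector_coordinate(scala_version="2.12", connector_version="0.17.1"):
    """Build the Maven coordinate passed to --packages."""
    return (
        "com.google.cloud.spark:"
        f"spark-bigquery-with-dependencies_{scala_version}:{connector_version}"
    )


def spark_submit_command(script, scala_version="2.12", connector_version="0.17.1"):
    """Return the spark-submit invocation as an argument list."""
    return [
        "spark-submit",
        "--packages",
        connector_coordinate(scala_version, connector_version),
        script,
    ]


# your_script.py is a placeholder for the PySpark script to run.
print(" ".join(spark_submit_command("your_script.py")))
```

Keeping the Scala suffix as a parameter makes it easier to stay in step with the Spark build in use, since the suffix has to match the Scala version Spark was compiled with.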

Create the script file

Create a script file named pyspark-bq.py in the home folder of your Cloud Shell VM.

The file content looks like the following:

#!/usr/bin/python
"""PySpark example - Read from BigQuery"""
from pyspark.sql import SparkSession

# Use local master
spark = SparkSession \
    .builder \
    .master('local') \
    .appName('spark-read-from-bigquery') \
    .getOrCreate()

# Load data from BigQuery.
df = spark.read.format('bigquery') \
    .option('project','YOUR_PROJECT_ID') \
    .option('table','test.dim_customer') \
    .load()

print(df.schema)

df.show()

Remember to replace the project ID, dataset and table name accordingly.
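
Instead of editing the script for every table, the values could be passed on the command line. The following is a minimal sketch using argparse; the argument names --project, --dataset and --table are my own choice, and the defaults simply repeat the values used in this article.

```python
import argparse


def parse_args(argv=None):
    """Parse the project, dataset and table names from the command line."""
    parser = argparse.ArgumentParser(description="Read a BigQuery table with PySpark")
    parser.add_argument("--project", required=True, help="Google Cloud project ID")
    parser.add_argument("--dataset", default="test", help="BigQuery dataset name")
    parser.add_argument("--table", default="dim_customer", help="BigQuery table name")
    return parser.parse_args(argv)


def reader_options(args):
    """Map parsed arguments to the options used by the 'bigquery' data source."""
    return {"project": args.project, "table": f"{args.dataset}.{args.table}"}


# Demonstration with an explicit argument list; a real script would call parse_args().
options = reader_options(parse_args(["--project", "YOUR_PROJECT_ID"]))
print(options)
```

In the script, the resulting dictionary can be handed to the reader with spark.read.format('bigquery').options(**options).load(), and the arguments are placed after the script name on the spark-submit command line.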

Run the script

Run the script using the following command:

spark-submit --packages com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.17.1 pyspark-bq.py 

Output:

+--------------------+--------------+-------+---------+----------+----------+
|          CustomerID|CustomerNumber|   Name|IsCurrent| StartDate|   EndDate|
+--------------------+--------------+-------+---------+----------+----------+
|18249e80-2692-472...|           001|Raymond|     true|2021-01-01|9999-12-31|
|937bfddc-e381-420...|           002|  Jason|     true|2021-02-01|9999-12-31|
+--------------------+--------------+-------+---------+----------+----------+

About BigQuery Storage API error

When running the sample script, you may encounter the following error:

com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.InvalidArgumentException: com.google.cloud.spark.bigquery.repackaged.io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid resource field value in the request.

...
Suppressed: com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
                at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
                at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
                at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.BigQueryReadClient.createReadSession(BigQueryReadClient.java:230)
                at com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation.buildScan(DirectBigQueryRelation.scala:142)
                at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$4(DataSourceStrategy.scala:332)
                at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:365)
                at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:442)
                at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:364)
                at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:332)
                at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
                at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
                at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
                at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
                at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
                at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:67)
                at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
                at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:162)
                at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:162)
                at scala.collection.Iterator.foreach(Iterator.scala:941)
                at scala.collection.Iterator.foreach$(Iterator.scala:941)
                at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
                at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
                at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
                at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
                at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
                at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
                at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
                at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
                at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:67)
                at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:391)
                at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:104)
                at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
                at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:143)
                at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
                at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:143)
                at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:104)
                at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:97)
                at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:117)
                at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
                at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:143)
                at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
                at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:143)
                at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:117)
                at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:110)
                at org.apache.spark.sql.execution.QueryExecution.$anonfun$simpleString$2(QueryExecution.scala:161)
                at org.apache.spark.sql.execution.ExplainUtils$.processPlan(ExplainUtils.scala:115)
                at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:161)
                at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:206)
                at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:175)
                at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:98)
                at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
                at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
                at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
                at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
                at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
                at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
                at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
                at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
                at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
                at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                at java.base/java.lang.reflect.Method.invoke(Method.java:566)
                at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
                at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
                at py4j.Gateway.invoke(Gateway.java:282)
                at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
                at py4j.commands.CallCommand.execute(CallCommand.java:79)
                at py4j.GatewayConnection.run(GatewayConnection.java:238)
                ... 1 more
Caused by: com.google.cloud.spark.bigquery.repackaged.io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid resource field value in the request.
        at com.google.cloud.spark.bigquery.repackaged.io.grpc.Status.asRuntimeException(Status.java:533)
        ... 15 more

The connector reads data through the BigQuery Storage API, so that API needs to be enabled for the project. Run the following command to enable it:

gcloud services enable bigquerystorage.googleapis.com --project YOUR_PROJECT_ID

In my case, the error occurred because my service account was not linked to a project for billing. To fix it, add one extra line that specifies the parent project:

df = spark.read.format('bigquery') \
    .option('project','YOUR_PROJECT_ID') \
    .option('parentProject','YOUR_PROJECT_ID') \
    .option('table','test.dim_customer') \
    .load()

The details of these two project options are listed below:

project (Read/Write): The Google Cloud Project ID of the table. Optional. Defaults to the project of the Service Account being used.

parentProject (Read/Write): The Google Cloud Project ID of the table to bill for the export. Optional. Defaults to the project of the Service Account being used.
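
Because both options are optional, it can be handy to collect them in one place and leave out the ones that should fall back to their defaults. The helper below is a hypothetical sketch, not part of the connector; as a design choice it reuses project as parentProject when no separate billing project is given, mirroring the fix shown above.

```python
def bigquery_read_options(table, project=None, parent_project=None):
    """Collect reader options, omitting any that should use the defaults.

    Assumption for this sketch: when only `project` is given, it is also
    used as `parentProject` (the project billed for the read).
    """
    options = {"table": table}
    if project:
        options["project"] = project
    billing_project = parent_project or project
    if billing_project:
        options["parentProject"] = billing_project
    return options


# Same values as in the fixed read call above.
print(bigquery_read_options("test.dim_customer", project="YOUR_PROJECT_ID"))
```

The dictionary can then be passed to the reader with spark.read.format('bigquery').options(**opts).load().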

Refer to the GitHub repository GoogleCloudDataproc/spark-bigquery-connector for more details.

About Java version issue

When running the code with Spark 3.1.1 on Java 11, I also encountered the following error:

java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available

I downgraded to JDK 1.8, which resolved the issue:

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64/
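
To confirm which Java version is in use, the output of java -version can be inspected. The sketch below extracts the major version from such output; the function name and the sample strings are illustrative only, and the parsing covers just the two common version schemes.

```python
import re


def java_major_version(version_output):
    """Extract the major Java version from `java -version` output.

    Handles the legacy scheme ("1.8.0_282" -> 8) and the modern one
    ("11.0.10" -> 11). Returns None if no version string is found.
    """
    match = re.search(r'version "(\d+)(?:\.(\d+))?', version_output)
    if not match:
        return None
    major = int(match.group(1))
    if major == 1 and match.group(2) is not None:
        # Legacy "1.x" numbering: the second component is the major version.
        return int(match.group(2))
    return major


# Sample strings for illustration; real output comes from `java -version`.
print(java_major_version('openjdk version "1.8.0_282"'))
print(java_major_version('openjdk version "11.0.10" 2021-01-19'))
```

Note that java -version writes to standard error, so when calling it from Python with subprocess.run, read the stderr attribute of the result rather than stdout.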

References

Refer to the following documentation to connect to BigQuery using Dataproc:

Use the BigQuery connector with Spark | Dataproc Documentation (google.com)
