Spark - Read from BigQuery Table


On Google Cloud, Dataproc can be used to spin up a cluster with Spark and other Apache big data frameworks. It is a fully managed, scalable service for different kinds of data processing and transformation. Dataproc also provides connectors to the different data stores on Google Cloud.

This article shows how to read data from a BigQuery table in Spark.

Prerequisites

To read data from BigQuery, please ensure you have set up a service account and the credential environment variables properly. Refer to the article Python: Read Data from BigQuery for more details.
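
As a quick sanity check before starting Spark, the credential setup can be verified from Python. This is only a sketch: it assumes the standard GOOGLE_APPLICATION_CREDENTIALS variable is the one being used, and check_credentials is a hypothetical helper name, not part of any library.

```python
import os
from pathlib import Path


def check_credentials(env=None):
    """Return the key file path if the credential variable points to a file.

    Assumption: credentials are supplied through GOOGLE_APPLICATION_CREDENTIALS.
    Raises RuntimeError with a readable message otherwise.
    """
    env = os.environ if env is None else env
    key_path = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not key_path:
        raise RuntimeError("GOOGLE_APPLICATION_CREDENTIALS is not set")
    if not Path(key_path).is_file():
        raise RuntimeError(f"Key file not found: {key_path}")
    return key_path
```

Calling check_credentials() without arguments inspects the current environment; passing a dictionary makes the check easy to try out without touching real settings.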

PySpark in Cloud Shell

For simplicity, I will use a local PySpark installation in Cloud Shell. Follow these steps to set it up:

  1. Open Cloud Shell via Cloud Console.
  2. Run the following command to install pyspark package:
    pip3 install pyspark   
  3. Run the following command to ensure PySpark is installed successfully:
    pyspark  
    You should see the Spark welcome banner followed by the Python prompt in the terminal.
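
If you prefer a non-interactive check, the following sketch tests whether the pyspark package can be found on the import path without actually starting Spark. The helper name is_installed is made up for this example.

```python
import importlib.util


def is_installed(module_name):
    """Return True if the named top-level module can be found on the import path."""
    return importlib.util.find_spec(module_name) is not None


# Reports whether the pip3 install step above succeeded for this interpreter.
print("pyspark installed:", is_installed("pyspark"))
```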

Read from BigQuery in Spark

About spark-bigquery package

To read from BigQuery, we need a Java library: the spark-bigquery connector. It is available in a public GCS bucket:

gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar

As we will run the script in a standalone Spark installation, we will reference the package through the --packages argument of the spark-submit command:

spark-submit --packages com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.17.1
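
As an alternative sketch, the same Maven coordinate can be passed through Spark's spark.jars.packages configuration property when the session is built, which should have the same effect as --packages as long as no Spark JVM is already running. The helper names connector_package and build_session are assumptions made for this example; the coordinate and version are the ones used in this article.

```python
def connector_package(scala_version="2.12", connector_version="0.17.1"):
    """Build the Maven coordinate of the spark-bigquery connector.

    The Scala suffix should match the Scala version your Spark build uses.
    """
    return (
        "com.google.cloud.spark:"
        f"spark-bigquery-with-dependencies_{scala_version}:{connector_version}"
    )


def build_session(package):
    """Create a local session that downloads the given package at startup."""
    # Imported here so the coordinate helper works even without pyspark installed.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .master("local")
        .appName("spark-read-from-bigquery")
        .config("spark.jars.packages", package)
        .getOrCreate()
    )


print(connector_package())
```

With this approach the script can be started with plain python3 instead of spark-submit, for example spark = build_session(connector_package()).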

Create the script file

Create a script file named pyspark-bq.py in the home folder of your Cloud Shell VM.

The file content looks like the following:

#!/usr/bin/python
"""PySpark example - Read from BigQuery"""
from pyspark.sql import SparkSession

# Use local master
spark = SparkSession \
    .builder \
    .master('local') \
    .appName('spark-read-from-bigquery') \
    .getOrCreate()

# Load data from BigQuery.
df = spark.read.format('bigquery') \
    .option('project','YOUR_PROJECT_ID') \
    .option('table','test.dim_customer') \
    .load()

print(df.schema)

df.show()

Remember to replace the project ID, dataset and table name accordingly.
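
Rather than editing the script for every table, the project ID and table name could be passed on the command line. The following is a minimal sketch using argparse; the argument names --project and --table and the helpers parse_args and read_table are choices made for this example, not part of the connector.

```python
import argparse


def parse_args(argv=None):
    """Parse the project ID and the dataset.table name from the command line."""
    parser = argparse.ArgumentParser(description="Read a BigQuery table with Spark")
    parser.add_argument("--project", required=True,
                        help="Google Cloud project ID of the table")
    parser.add_argument("--table", required=True,
                        help="Table name in dataset.table form")
    args = parser.parse_args(argv)
    # Reject values such as 'dim_customer' that lack the dataset part.
    if args.table.count(".") != 1:
        parser.error("--table must look like dataset.table")
    return args


def read_table(spark, project, table):
    """Load the table with the same reader options the script above uses."""
    return (
        spark.read.format("bigquery")
        .option("project", project)
        .option("table", table)
        .load()
    )
```

Inside the script, args = parse_args() followed by df = read_table(spark, args.project, args.table) would replace the hard-coded values, with the extra arguments appended after the script name in the spark-submit command.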

Run the script

Run the script using the following command:

spark-submit --packages com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.17.1 pyspark-bq.py 

Output:

+--------------------+--------------+-------+---------+----------+----------+
|          CustomerID|CustomerNumber|   Name|IsCurrent| StartDate|   EndDate|
+--------------------+--------------+-------+---------+----------+----------+
|18249e80-2692-472...|           001|Raymond|     true|2021-01-01|9999-12-31|
|937bfddc-e381-420...|           002|  Jason|     true|2021-02-01|9999-12-31|
+--------------------+--------------+-------+---------+----------+----------+

About BigQuery Storage API error

When running the sample script, you may encounter the following error:

com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.InvalidArgumentException: com.google.cloud.spark.bigquery.repackaged.io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid resource field value in the request.

...
Suppressed: com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
                at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
                at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
                at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.BigQueryReadClient.createReadSession(BigQueryReadClient.java:230)
                at com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation.buildScan(DirectBigQueryRelation.scala:142)
                at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$4(DataSourceStrategy.scala:332)
                at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:365)
                at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:442)
                at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:364)
                at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:332)
                at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
                at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
                at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
                at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
                at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
                at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:67)
                at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
                at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:162)
                at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:162)
                at scala.collection.Iterator.foreach(Iterator.scala:941)
                at scala.collection.Iterator.foreach$(Iterator.scala:941)
                at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
                at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
                at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
                at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
                at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
                at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
                at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
                at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
                at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:67)
                at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:391)
                at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:104)
                at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
                at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:143)
                at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
                at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:143)
                at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:104)
                at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:97)
                at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:117)
                at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
                at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:143)
                at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
                at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:143)
                at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:117)
                at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:110)
                at org.apache.spark.sql.execution.QueryExecution.$anonfun$simpleString$2(QueryExecution.scala:161)
                at org.apache.spark.sql.execution.ExplainUtils$.processPlan(ExplainUtils.scala:115)
                at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:161)
                at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:206)
                at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:175)
                at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:98)
                at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
                at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
                at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
                at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
                at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
                at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
                at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
                at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
                at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
                at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                at java.base/java.lang.reflect.Method.invoke(Method.java:566)
                at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
                at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
                at py4j.Gateway.invoke(Gateway.java:282)
                at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
                at py4j.commands.CallCommand.execute(CallCommand.java:79)
                at py4j.GatewayConnection.run(GatewayConnection.java:238)
                ... 1 more
Caused by: com.google.cloud.spark.bigquery.repackaged.io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid resource field value in the request.
        at com.google.cloud.spark.bigquery.repackaged.io.grpc.Status.asRuntimeException(Status.java:533)
        ... 15 more

The connector reads table data through the BigQuery Storage API. Run the following command to enable it:

gcloud services enable bigquerystorage.googleapis.com --project YOUR_PROJECT_ID

The API must be enabled before the connector can read data.

In my case, I encountered this error because my service account was not linked to a project for billing. To fix it, add one extra line that specifies the parent project:

df = spark.read.format('bigquery') \
    .option('project','YOUR_PROJECT_ID') \
    .option('parentProject','YOUR_PROJECT_ID') \
    .option('table','test.dim_customer') \
    .load()
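
To avoid forgetting the extra option, the reader options can be assembled in one place. This is a sketch only: bigquery_read_options is a hypothetical helper, and falling back to the table's project for billing is a choice made here to mirror the fix above, not the connector's own default.

```python
def bigquery_read_options(table, project=None, parent_project=None):
    """Build the option dictionary for spark.read.format('bigquery').

    Assumption: when no parent project is given, the table's project is billed.
    """
    options = {"table": table}
    if project:
        options["project"] = project
    billing_project = parent_project or project
    if billing_project:
        options["parentProject"] = billing_project
    return options


print(bigquery_read_options("test.dim_customer", project="YOUR_PROJECT_ID"))
```

The resulting dictionary can be passed to the reader in one call: spark.read.format('bigquery').options(**opts).load().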

The details of these two project options are listed below:

project
    The Google Cloud Project ID of the table.
    (Optional. Defaults to the project of the Service Account being used)
    Usage: Read/Write

parentProject
    The Google Cloud Project ID of the table to bill for the export.
    (Optional. Defaults to the project of the Service Account being used)
    Usage: Read/Write

Refer to the GoogleCloudDataproc/spark-bigquery-connector repository on GitHub for more details.

About Java version issue

When running the code with Spark 3.1.1 on Java 11, I also encountered the following error:

java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available

I downgraded to JDK 1.8, which resolved the issue:

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64/
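
To confirm which Java version Spark will pick up, the output of java -version can be inspected. The sketch below parses the major version from that output; the function names are made up for this example, and the parsing assumes the usual version "..." string that OpenJDK prints.

```python
import re
import subprocess


def parse_java_major(version_output):
    """Extract the major Java version from `java -version` output."""
    match = re.search(r'version "(\d+)(?:\.(\d+))?', version_output)
    if match is None:
        raise ValueError("no Java version string found")
    first, second = match.group(1), match.group(2)
    # Legacy scheme: 1.8.x means Java 8. Newer scheme: 11.0.x means Java 11.
    if first == "1" and second is not None:
        return int(second)
    return int(first)


def installed_java_major():
    """Run `java -version` and return the major version of the java on PATH."""
    # `java -version` writes its report to stderr, not stdout.
    result = subprocess.run(["java", "-version"],
                            capture_output=True, text=True, check=True)
    return parse_java_major(result.stderr)
```

Calling installed_java_major() after exporting JAVA_HOME only reflects the change if the java binary on PATH belongs to that installation.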

References

Refer to the following documentation to connect to BigQuery using Dataproc:

Use the BigQuery connector with Spark | Dataproc Documentation (google.com)

Last modified by Raymond 2 months ago.