Spark - Read from BigQuery Table
On Google Cloud, Dataproc can be used to spin up clusters with Spark and other Apache big data frameworks. It is a fully managed, scalable service for running many kinds of data processing and transformation workloads. Dataproc also provides connectors to various data stores on Google Cloud.
This article walks through reading data from BigQuery in Spark.
Prerequisites
To read data from BigQuery, please ensure you have set up a service account and the credential environment variables properly. Refer to the article Python: Read Data from BigQuery for more details.
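If you prefer to set the credential environment variable programmatically, a minimal sketch is shown below; the key file path is purely an assumption and should point to your own service account key:

import os

# Hypothetical path to a downloaded service account key file (adjust to your own).
# The spark-bigquery connector reads GOOGLE_APPLICATION_CREDENTIALS from the
# environment, so set it before the SparkSession (and its JVM) is created.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/home/user/sa-key.json'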
PySpark in Cloud Shell
For simplicity, I will directly use local PySpark in Cloud Shell. Follow these steps to set it up:
- Open Cloud Shell via Cloud Console.
- Run the following command to install the pyspark package:
pip3 install pyspark
- Run the following command to ensure PySpark is installed successfully:
pyspark
The PySpark interactive shell should start successfully and print its version banner in the terminal.
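Inside the shell, you can run a quick sanity check, for example printing the Spark version through the pre-created spark object:

# The PySpark shell creates a SparkSession named `spark` automatically.
print(spark.version)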
Read from BigQuery in Spark
About spark-bigquery package
To read from BigQuery, we need the spark-bigquery connector library (a Java/Scala package). It is available in a public GCS bucket:
gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar
As we will run the script in a standalone local Spark installation, we will include the package reference using the --packages argument of the spark-submit command:
spark-submit --packages com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.17.1
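Alternatively, if you launch the script directly with python3 instead of spark-submit, the connector coordinate can be declared in the session builder via the standard spark.jars.packages setting. The sketch below assumes the same connector version 0.17.1; pick a release that matches your Scala version:

from pyspark.sql import SparkSession

# Declare the connector as a Maven dependency on the session itself, so the
# script can be launched without the --packages argument.
spark = SparkSession.builder \
    .master('local') \
    .appName('spark-read-from-bigquery') \
    .config('spark.jars.packages',
            'com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.17.1') \
    .getOrCreate()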
Create the script file
Create a script file named pyspark-bq.py in your home folder of the Cloud Shell VM.
The file content looks like the following:
#!/usr/bin/python
"""PySpark example - Read from BigQuery"""
from pyspark.sql import SparkSession

# Use local master
spark = SparkSession \
    .builder \
    .master('local') \
    .appName('spark-read-from-bigquery') \
    .getOrCreate()

# Load data from BigQuery.
df = spark.read.format('bigquery') \
    .option('project', 'YOUR_PROJECT_ID') \
    .option('table', 'test.dim_customer') \
    .load()

print(df.schema)
df.show()
Remember to replace the project ID, dataset and table name accordingly.
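The connector also accepts a fully qualified table reference as the load path, so an equivalent read can be written as follows (the project ID is still a placeholder):

# Equivalent read using the fully qualified project.dataset.table path.
df = spark.read.format('bigquery') \
    .load('YOUR_PROJECT_ID.test.dim_customer')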
Run the script
Run the script using the following command:
spark-submit --packages com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.17.1 pyspark-bq.py
Output:
+--------------------+--------------+-------+---------+----------+----------+
| CustomerID|CustomerNumber| Name|IsCurrent| StartDate| EndDate|
+--------------------+--------------+-------+---------+----------+----------+
|18249e80-2692-472...| 001|Raymond| true|2021-01-01|9999-12-31|
|937bfddc-e381-420...| 002| Jason| true|2021-02-01|9999-12-31|
+--------------------+--------------+-------+---------+----------+----------+
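Once loaded, the DataFrame behaves like any other Spark DataFrame, so the usual transformations apply. For example, to keep only current customers (column names taken from the output above):

# Filter and project the loaded DataFrame like any other Spark DataFrame.
current_customers = df.filter(df.IsCurrent == True).select('CustomerNumber', 'Name')
current_customers.show()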
About BigQuery streaming storage API error
When running the sample script, you may encounter the following error:
com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.InvalidArgumentException: com.google.cloud.spark.bigquery.repackaged.io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid resource field value in the request.
...
    Suppressed: com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.AsyncTaskException: Asynchronous task failed
    at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.ApiExceptions.callAndTranslateApiException(ApiExceptions.java:57)
    at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.rpc.UnaryCallable.call(UnaryCallable.java:112)
    at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1.BigQueryReadClient.createReadSession(BigQueryReadClient.java:230)
    at com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation.buildScan(DirectBigQueryRelation.scala:142)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$4(DataSourceStrategy.scala:332)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:365)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:442)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:364)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:332)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:67)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:78)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:162)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:162)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:75)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:67)
    at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:391)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:104)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:143)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:143)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:104)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:97)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:117)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:143)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:143)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:117)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$simpleString$2(QueryExecution.scala:161)
    at org.apache.spark.sql.execution.ExplainUtils$.processPlan(ExplainUtils.scala:115)
    at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:161)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:206)
    at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:175)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
    at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    ... 1 more
Caused by: com.google.cloud.spark.bigquery.repackaged.io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Invalid resource field value in the request.
    at com.google.cloud.spark.bigquery.repackaged.io.grpc.Status.asRuntimeException(Status.java:533)
    ... 15 more
Run the following command to enable the BigQuery Storage API:
gcloud services enable bigquerystorage.googleapis.com --project YOUR_PROJECT_ID
The BigQuery Storage API must be enabled, as the connector uses it to read table data.
In my case, I encountered this error because my service account was not linked to a project for billing. To fix it, we can simply add one extra option to specify the parent project:
df = spark.read.format('bigquery') \
    .option('project', 'YOUR_PROJECT_ID') \
    .option('parentProject', 'YOUR_PROJECT_ID') \
    .option('table', 'test.dim_customer') \
    .load()
The details about these two project options are listed below:
Option | Description | Usage
project | The Google Cloud Project ID of the table. (Optional. Defaults to the project of the Service Account being used.) | Read/Write
parentProject | The Google Cloud Project ID of the table to bill for the export. (Optional. Defaults to the project of the Service Account being used.) | Read/Write
Refer to GitHub for more details: GoogleCloudDataproc/spark-bigquery-connector: BigQuery data source for Apache Spark: Read data from BigQuery into DataFrames, write DataFrames into BigQuery tables. (github.com).
About Java version issue
When running the code on Spark 3.1.1 with Java 11, I also encountered the following error:
java.lang.UnsupportedOperationException: sun.misc.Unsafe or java.nio.DirectByteBuffer.<init>(long, int) not available
I downgraded to JDK 1.8 and the issue was resolved:
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64/
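To confirm which JVM the Spark driver actually picked up, you can also query the Java version through the Py4J gateway from within PySpark; this relies on the internal _jvm handle and is shown only as a quick check:

# Quick check of the driver JVM's Java version via the Py4J gateway.
print(spark.sparkContext._jvm.java.lang.System.getProperty('java.version'))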
References
Refer to the following documentation to connect to BigQuery using Dataproc:
Use the BigQuery connector with Spark | Dataproc Documentation (google.com)