Spark - Save DataFrame to Hive Table
From Spark 2.0, you can easily read data from Hive data warehouse and also write/append new data to Hive tables.
This page shows how to operate with Hive in Spark including:
- Create DataFrame from existing Hive table
- Save DataFrame to a new Hive table
- Append data to the existing Hive table via both INSERT statement and append write mode.
Python is used as programming language. The syntax for Scala will be very similar.
Create a SparkSession with Hive supported
Run the following code to create a Spark session with Hive support:
from pyspark.sql import SparkSession
appName = "PySpark Hive Example"
master = "local"
# Create Spark session with Hive supported.
spark = SparkSession.builder \
.appName(appName) \
.master(master) \
.enableHiveSupport() \
.getOrCreate()
Read data from Hive
And now we can use the SparkSession object to read data from Hive database:
# Read data from Hive database test_db, table name: test_table.
df = spark.sql("select * from test_db.test_table")
df.show()
I use Derby as Hive metastore and I already created on database named test_db with a table named test_table. Inside the table, there are two records.
The results look similar to the following:
+---+-----+ | id|value| +---+-----+ | 1| ABC| | 2| DEF| +---+-----+
Add a new column
# Let's add a new column
df = df.withColumn("NewColumn",lit('Test'))
df.show()
The following is the result:
+---+-----+---------+ | id|value|NewColumn| +---+-----+---------+ | 1| ABC| Test| | 2| DEF| Test| +---+-----+---------+
Save DataFrame as a new Hive table
Use the following code to save the data frame to a new hive table named test_table2:
# Save df to a new table in Hive
df.write.mode("overwrite").saveAsTable("test_db.test_table2")
# Show the results using SELECT
spark.sql("select * from test_db.test_table2").show()
In the logs, I can see the new table is saved as Parquet by default:
Initialized Parquet WriteSupport with Catalyst schema:
{
"type" : "struct",
"fields" : [ {
"name" : "id",
"type" : "long",
"nullable" : true,
"metadata" : { }
}, {
"name" : "value",
"type" : "string",
"nullable" : true,
"metadata" : {
"HIVE_TYPE_STRING" : "varchar(100)"
}
}, {
"name" : "NewColumn",
"type" : "string",
"nullable" : false,
"metadata" : { }
} ]
}
and corresponding Parquet message type:
message spark_schema {
optional int64 id;
optional binary value (UTF8);
required binary NewColumn (UTF8);
}
Append data to existing Hive table
You can also append data to existing Hive table either via ‘INSERT SQL statement’ or ‘append’ write mode.
# Append data via SQL
spark.sql("insert into test_db.test_table2 values (3, 'GHI', 'SQL INSERT')")
spark.sql("select * from test_db.test_table2").show()
# Append data via code
df = spark.sql("select 4 as id, 'JKL' as value, 'Spark Write Append Mode' as NewColumn")
df.show()
df.write.mode("append").saveAsTable("test_db.test_table2")
spark.sql("select * from test_db.test_table2").show()
Both records are inserted into the table successfully as the following output shows:
+---+-----+--------------------+ | id|value| NewColumn| +---+-----+--------------------+ | 4| JKL|Spark Write Appen...| | 1| ABC| Test| | 2| DEF| Test| | 3| GHI| SQL INSERT| +---+-----+--------------------+
Complete code - hive-example.py
from pyspark.sql import SparkSession from pyspark.sql.functions import lit appName = "PySpark Hive Example" master = "local" # Create Spark session with Hive supported. spark = SparkSession.builder \ .appName(appName) \ .master(master) \ .enableHiveSupport() \ .getOrCreate() # Read data from Hive database test_db, table name: test_table. df = spark.sql("select * from test_db.test_table") df.show() # Let's add a new column df = df.withColumn("NewColumn",lit('Test')) df.show() # Save df to a new table in Hive df.write.mode("overwrite").saveAsTable("test_db.test_table2") # Show the results using SELECT spark.sql("select * from test_db.test_table2").show() # Append data via SQL spark.sql("insert into test_db.test_table2 values (3, 'GHI', 'SQL INSERT')") spark.sql("select * from test_db.test_table2").show() # Append data via code df = spark.sql("select 4 as id, 'JKL' as value, 'Spark Write Append Mode' as NewColumn") df.show() df.write.mode("append").saveAsTable("test_db.test_table2") spark.sql("select * from test_db.test_table2").show()
Have fun with Spark!
How can we Update hive table while saving dataframe.
To run update is Apache kudu best option than hive.
Hive 3 does support ACID if table data is stored as ORC format and we can use INSERT, UPDATE and DELETE statements to manipulate the data as you can do with other databases. You can use SparkSession.sql
to invoke these statements if necessary.
Hudi and DeltaLake both support ACID too and can be used when working with data lake directly, for example, data stored as hudi and delta format in HDFS or S3, GCS, Azure Blob Storage, etc.
Hi Swapnil,
These frameworks are designed for batch upserts even they support ACID. If you want to use traditional UPDATE, DELETE and INSERT approach (OLTP), it would be better to use relational databases or nosql databases or alternatively use tools like JDBC to connect to Hive.
Thanks for the pointers
Hi
Ive written my df as a table this way:
----
mydf.write.format("parquet").saveAsTable("mydb.mytable")
----
But when I'm trying to append same data in the same table using:
----
mydf.write.mode("append").format("parquet").saveAsTable("mydb.mytable")
----
I get an error:
----
py4j.protocol.Py4JJavaError: An error occurred while calling o106.saveAsTable.
: java.util.NoSuchElementException: next on empty iterator
at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
at scala.collection.IterableLike$class.head(IterableLike.scala:107)
at scala.collection.mutable.ArrayBuffer.scala$collection$IndexedSeqOptimized$$super$head(ArrayBuffer.scala:48)
at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)
at scala.collection.mutable.ArrayBuffer.head(ArrayBuffer.scala:48)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$18.apply(DataSource.scala:466)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$18.apply(DataSource.scala:463)
at scala.Option.map(Option.scala:146)
at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:463)
at org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:516)
at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:216)
at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:166)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:458)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:437)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:393)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
----
No idea why I'm getting this. Please help
Thanks
I've tested the following script and it works properly:
from pyspark.sql import SparkSession from pyspark.sql.functions import lit from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType appName = "PySpark Hive Example" master = "local" # Create Spark session with Hive supported. spark = SparkSession.builder \ .appName(appName) \ .master(master) \ .enableHiveSupport() \ .getOrCreate() # List data = [('Category A', 100, "This is category A"), ('Category B', 120, "This is category B"), ('Category C', 150, "This is category C")] # Create a schema for the dataframe schema = StructType([ StructField('Category', StringType(), True), StructField('Count', IntegerType(), True), StructField('Description', StringType(), True) ]) # Convert list to RDD rdd = spark.sparkContext.parallelize(data) # Create data frame df = spark.createDataFrame(rdd,schema) print(df.schema) df.show() df.write.format('parquet').mode("overwrite").saveAsTable("test_db.test_parquet") df.write.format('parquet').mode("append").saveAsTable("test_db.test_parquet")
It's hard to guess the root cause for your error.
Can you paste your full script for debug? or alternatively, try my above script to see if it works in your environment.
Hello, how would it be if I don't have a database created? Instead, I create the dataframes in the following way:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import broadcast
from pyspark.sql.types import *
spark = SparkSession.builder.getOrCreate()
emp = [(1, "AAA", "dept1", 1000),
(2, "BBB", "dept1", 1100),
(3, "CCC", "dept1", 3000),
(4, "DDD", "dept1", 1500),
(5, "EEE", "dept2", 8000),
(6, "FFF", "dept2", 7200),
(7, "GGG", "dept3", 7100),
(None, None, None, 7500),
(9, "III", None, 4500),
(10, None, "dept5", 2500)]
dept = [("dept1", "Department - 1"),
("dept2", "Department - 2"),
("dept3", "Department - 3"),
("dept4", "Department - 4")
]
df = spark.createDataFrame(emp, ["id", "name", "dept", "salary"])
deptdf = spark.createDataFrame(dept, ["id", "name"])
From this what is the way to save the dataframe as a hive table.