Spark - Save DataFrame to Hive Table
Since Spark 2.0, you can easily read data from a Hive data warehouse and also write or append new data to Hive tables.
This page shows how to work with Hive in Spark, including how to:
- Create a DataFrame from an existing Hive table
- Save a DataFrame to a new Hive table
- Append data to an existing Hive table via both an INSERT statement and the append write mode
Python is used as the programming language; the syntax for Scala is very similar.
Create a SparkSession with Hive support
Run the following code to create a Spark session with Hive support:
from pyspark.sql import SparkSession

appName = "PySpark Hive Example"
master = "local"

# Create a Spark session with Hive support.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()
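If your Hive warehouse lives in a non-default location, you can also point the session at it before calling getOrCreate(). The sketch below is optional; the directory /tmp/spark-warehouse is only a placeholder path, while spark.sql.warehouse.dir is the standard Spark setting for the warehouse location:
from pyspark.sql import SparkSession

# Optional: override the default warehouse directory.
# The path below is a placeholder; replace it with your own location.
spark = SparkSession.builder \
    .appName("PySpark Hive Example") \
    .master("local") \
    .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse") \
    .enableHiveSupport() \
    .getOrCreate()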
Read data from Hive
Now we can use the SparkSession object to read data from the Hive database:
# Read data from Hive database test_db, table name: test_table.
df = spark.sql("select * from test_db.test_table")
df.show()
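Equivalently, you can read the same table through the catalog API instead of a SQL string; spark.table is a standard SparkSession method:
# Read the table via the catalog API rather than a SELECT statement.
df = spark.table("test_db.test_table")
df.show()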
I use Derby as the Hive metastore, and I have already created a database named test_db with a table named test_table. The table contains two records.
The results look similar to the following:
+---+-----+
| id|value|
+---+-----+
|  1|  ABC|
|  2|  DEF|
+---+-----+
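If you want to verify that the database and table are registered in the metastore, a quick optional check via SQL works too:
# List the databases, and the tables inside test_db.
spark.sql("show databases").show()
spark.sql("show tables in test_db").show()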
Add a new column
from pyspark.sql.functions import lit

# Let's add a new literal column.
df = df.withColumn("NewColumn", lit('Test'))
df.show()
The following is the result:
+---+-----+---------+
| id|value|NewColumn|
+---+-----+---------+
|  1|  ABC|     Test|
|  2|  DEF|     Test|
+---+-----+---------+
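Because lit('Test') is a non-null literal, Spark marks the new column as not nullable. You can confirm this from the schema, and it also explains the "nullable" : false entry in the write logs shown further below:
# Inspect the schema; NewColumn is reported with nullable = false.
df.printSchema()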
Save DataFrame as a new Hive table
Use the following code to save the DataFrame to a new Hive table named test_table2:
# Save df to a new table in Hive
df.write.mode("overwrite").saveAsTable("test_db.test_table2")
# Show the results using SELECT
spark.sql("select * from test_db.test_table2").show()
In the logs, I can see the new table is saved as Parquet by default:
Initialized Parquet WriteSupport with Catalyst schema:
{
"type" : "struct",
"fields" : [ {
"name" : "id",
"type" : "long",
"nullable" : true,
"metadata" : { }
}, {
"name" : "value",
"type" : "string",
"nullable" : true,
"metadata" : {
"HIVE_TYPE_STRING" : "varchar(100)"
}
}, {
"name" : "NewColumn",
"type" : "string",
"nullable" : false,
"metadata" : { }
} ]
}
and corresponding Parquet message type:
message spark_schema {
optional int64 id;
optional binary value (UTF8);
required binary NewColumn (UTF8);
}
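saveAsTable uses Parquet as the default data source, but you can request a different built-in format explicitly. Here is a minimal sketch, assuming you want ORC instead; test_db.test_table3 is a hypothetical table name used only for illustration:
# Save with an explicit storage format instead of the Parquet default.
# test_table3 is a hypothetical target table.
df.write.mode("overwrite").format("orc").saveAsTable("test_db.test_table3")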
Append data to an existing Hive table
You can also append data to an existing Hive table, either via an INSERT SQL statement or the append write mode.
# Append data via SQL
spark.sql("insert into test_db.test_table2 values (3, 'GHI', 'SQL INSERT')")
spark.sql("select * from test_db.test_table2").show()
# Append data via code
df = spark.sql("select 4 as id, 'JKL' as value, 'Spark Write Append Mode' as NewColumn")
df.show()
df.write.mode("append").saveAsTable("test_db.test_table2")
spark.sql("select * from test_db.test_table2").show()
Both records are inserted into the table successfully, as the following output shows:
+---+-----+--------------------+
| id|value|           NewColumn|
+---+-----+--------------------+
|  4|  JKL|Spark Write Appen...|
|  1|  ABC|                Test|
|  2|  DEF|                Test|
|  3|  GHI|          SQL INSERT|
+---+-----+--------------------+
Complete code - hive-example.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

appName = "PySpark Hive Example"
master = "local"

# Create a Spark session with Hive support.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

# Read data from Hive database test_db, table name: test_table.
df = spark.sql("select * from test_db.test_table")
df.show()

# Let's add a new literal column.
df = df.withColumn("NewColumn", lit('Test'))
df.show()

# Save df to a new table in Hive.
df.write.mode("overwrite").saveAsTable("test_db.test_table2")
# Show the results using SELECT.
spark.sql("select * from test_db.test_table2").show()

# Append data via SQL.
spark.sql("insert into test_db.test_table2 values (3, 'GHI', 'SQL INSERT')")
spark.sql("select * from test_db.test_table2").show()

# Append data via the append write mode.
df = spark.sql("select 4 as id, 'JKL' as value, 'Spark Write Append Mode' as NewColumn")
df.show()
df.write.mode("append").saveAsTable("test_db.test_table2")
spark.sql("select * from test_db.test_table2").show()
Have fun with Spark!