Spark - Save DataFrame to Hive Table

From Spark 2.0 onwards, you can easily read data from the Hive data warehouse and also write or append new data to Hive tables.

This page shows how to operate with Hive in Spark including:

  • Create DataFrame from existing Hive table
  • Save DataFrame to a new Hive table
  • Append data to an existing Hive table, via both an INSERT SQL statement and the append write mode.

Python is used as the programming language; the syntax for Scala is very similar.

Create a SparkSession with Hive support

Run the following code to create a Spark session with Hive support:

from pyspark.sql import SparkSession

appName = "PySpark Hive Example"
master = "local"

# Create Spark session with Hive support enabled.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()
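
If you want to confirm that Hive support is actually enabled, a quick sanity check (shown below as a minimal sketch; it assumes the metastore is reachable from your environment) is to look at the catalog implementation and list the databases visible to the session:

# Should print 'hive' when enableHiveSupport() has taken effect.
print(spark.conf.get("spark.sql.catalogImplementation"))
# List the databases registered in the Hive metastore.
spark.sql("show databases").show()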

Read data from Hive

Now we can use the SparkSession object to read data from the Hive database:

# Read data from Hive database test_db, table name: test_table.
df = spark.sql("select * from test_db.test_table")
df.show()

I use Derby as the Hive metastore, and I have already created a database named test_db with a table named test_table. The table contains two records.

The results look similar to the following:

+---+-----+
| id|value|
+---+-----+
|  1|  ABC|
|  2|  DEF|
+---+-----+
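
If you prefer the DataFrame API over a SQL string, the same table can also be loaded with spark.table; the following is an equivalent alternative to the SELECT statement above:

# Read the same Hive table via the DataFrame API
df = spark.table("test_db.test_table")
df.show()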

Add a new column

from pyspark.sql.functions import lit

# Let's add a new column
df = df.withColumn("NewColumn", lit('Test'))
df.show()

The following is the result:

+---+-----+---------+
| id|value|NewColumn|
+---+-----+---------+
|  1|  ABC|     Test|
|  2|  DEF|     Test|
+---+-----+---------+
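
You can inspect the DataFrame schema with printSchema; the literal column is added as a non-nullable string, which matches the Parquet schema shown in the logs below:

df.printSchema()

The output should look like this:

root
 |-- id: long (nullable = true)
 |-- value: string (nullable = true)
 |-- NewColumn: string (nullable = false)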

Save DataFrame as a new Hive table

Use the following code to save the DataFrame to a new Hive table named test_table2:

# Save df to a new table in Hive
df.write.mode("overwrite").saveAsTable("test_db.test_table2")
# Show the results using SELECT
spark.sql("select * from test_db.test_table2").show()

In the logs, I can see the new table is saved as Parquet by default:

Initialized Parquet WriteSupport with Catalyst schema:
{
   "type" : "struct",
   "fields" : [ {
     "name" : "id",
     "type" : "long",
     "nullable" : true,
     "metadata" : { }
   }, {
     "name" : "value",
     "type" : "string",
     "nullable" : true,
     "metadata" : {
       "HIVE_TYPE_STRING" : "varchar(100)"
     }
   }, {
     "name" : "NewColumn",
     "type" : "string",
     "nullable" : false,
     "metadata" : { }
   } ]
}
and corresponding Parquet message type:
message spark_schema {
   optional int64 id;
   optional binary value (UTF8);
   required binary NewColumn (UTF8);
}
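
The Parquet default comes from the spark.sql.sources.default setting. If you need a different storage format or a partitioned table, DataFrameWriter also accepts format and partitionBy; the snippet below is only an illustrative sketch (the ORC format, the partition column and the table name test_table2_orc are arbitrary choices, not part of the example above):

# Save as ORC instead of Parquet, partitioned by id (illustrative only)
df.write.mode("overwrite") \
    .format("orc") \
    .partitionBy("id") \
    .saveAsTable("test_db.test_table2_orc")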

Append data to an existing Hive table

You can also append data to an existing Hive table, either via an INSERT SQL statement or via the append write mode.

# Append data via SQL
spark.sql("insert into test_db.test_table2 values (3, 'GHI', 'SQL INSERT')")
spark.sql("select * from test_db.test_table2").show()
# Append data via code
df = spark.sql("select 4 as id, 'JKL' as value, 'Spark Write Append Mode' as NewColumn")
df.show()
df.write.mode("append").saveAsTable("test_db.test_table2")
spark.sql("select * from test_db.test_table2").show()

Both records are inserted into the table successfully, as the following output shows:

+---+-----+--------------------+
| id|value|           NewColumn|
+---+-----+--------------------+
|  4|  JKL|Spark Write Appen...|
|  1|  ABC|                Test|
|  2|  DEF|                Test|
|  3|  GHI|          SQL INSERT|
+---+-----+--------------------+
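
Besides the INSERT statement and the append write mode, DataFrameWriter.insertInto can also append to an existing table. Note that insertInto resolves columns by position rather than by name, so the DataFrame columns must be in the same order as the table columns. A minimal sketch (the values are just an illustration):

# Append by position: columns must follow the table's order (id, value, NewColumn)
df2 = spark.sql("select 5 as id, 'MNO' as value, 'insertInto' as NewColumn")
df2.write.insertInto("test_db.test_table2")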

Complete code - hive-example.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

appName = "PySpark Hive Example"
master = "local"

# Create Spark session with Hive support enabled.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

# Read data from Hive database test_db, table name: test_table.
df = spark.sql("select * from test_db.test_table")
df.show()

# Let's add a new column
df = df.withColumn("NewColumn", lit('Test'))
df.show()

# Save df to a new table in Hive
df.write.mode("overwrite").saveAsTable("test_db.test_table2")
# Show the results using SELECT
spark.sql("select * from test_db.test_table2").show()

# Append data via SQL
spark.sql("insert into test_db.test_table2 values (3, 'GHI', 'SQL INSERT')")
spark.sql("select * from test_db.test_table2").show()

# Append data via code
df = spark.sql("select 4 as id, 'JKL' as value, 'Spark Write Append Mode' as NewColumn")
df.show()
df.write.mode("append").saveAsTable("test_db.test_table2")
spark.sql("select * from test_db.test_table2").show()

Have fun with Spark!
