
Since Spark 2.0, you can easily read data from a Hive data warehouse and also write or append new data to Hive tables.

This page shows how to work with Hive in Spark, including how to:

  • Create a DataFrame from an existing Hive table
  • Save a DataFrame to a new Hive table
  • Append data to an existing Hive table via both an INSERT statement and the append write mode

Python is used as the programming language; the syntax for Scala is very similar.

Create a SparkSession with Hive support

Run the following code to create a Spark session with Hive support:

from pyspark.sql import SparkSession
appName = "PySpark Hive Example"
master = "local"
# Create Spark session with Hive support enabled.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()
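If the default warehouse location does not suit your environment, the warehouse directory can be configured before the session is created. The following is a minimal sketch; the directory path is only an example assumption:

from pyspark.sql import SparkSession

# A sketch only: configure a custom warehouse directory (the path is hypothetical)
# before enabling Hive support.
spark = SparkSession.builder \
    .appName("PySpark Hive Example") \
    .master("local") \
    .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse") \
    .enableHiveSupport() \
    .getOrCreate()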

Read data from Hive

Now we can use the SparkSession object to read data from the Hive database:

# Read data from Hive database test_db, table name: test_table.
df = spark.sql("select * from test_db.test_table")
df.show()

I use Derby as the Hive metastore, and I have already created a database named test_db with a table named test_table. Inside the table, there are two records.

The results look similar to the following:

+---+-----+
| id|value|
+---+-----+
|  1|  ABC|
|  2|  DEF|
+---+-----+
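If you want to reproduce this setup, the database and the two sample records can be created from the same Spark session. The following is a minimal sketch under that assumption (the DDL mirrors the schema shown in the write logs later on):

# A sketch to recreate the sample database and table used in this article.
spark.sql("CREATE DATABASE IF NOT EXISTS test_db")
spark.sql("CREATE TABLE IF NOT EXISTS test_db.test_table (id BIGINT, value VARCHAR(100))")
spark.sql("INSERT INTO test_db.test_table VALUES (1, 'ABC'), (2, 'DEF')")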

Add a new column

from pyspark.sql.functions import lit

# Let's add a new column
df = df.withColumn("NewColumn", lit('Test'))
df.show()

The following is the result:

+---+-----+---------+
| id|value|NewColumn|
+---+-----+---------+
|  1|  ABC|     Test|
|  2|  DEF|     Test|
+---+-----+---------+
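withColumn is not limited to literal values; the new column can also be derived from existing columns. A small sketch (not part of the original walkthrough):

from pyspark.sql.functions import upper

# A sketch: derive a new column from an existing one instead of using a literal.
df.withColumn("ValueUpper", upper(df["value"])).show()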

Save DataFrame as a new Hive table

Use the following code to save the DataFrame to a new Hive table named test_table2:

# Save df to a new table in Hive
df.write.mode("overwrite").saveAsTable("test_db.test_table2")
# Show the results using SELECT
spark.sql("select * from test_db.test_table2").show()

In the logs, I can see that the new table is saved in Parquet format by default:

Initialized Parquet WriteSupport with Catalyst schema:
{
   "type" : "struct",
   "fields" : [ {
     "name" : "id",
     "type" : "long",
     "nullable" : true,
     "metadata" : { }
   }, {
     "name" : "value",
     "type" : "string",
     "nullable" : true,
     "metadata" : {
       "HIVE_TYPE_STRING" : "varchar(100)"
     }
   }, {
     "name" : "NewColumn",
     "type" : "string",
     "nullable" : false,
     "metadata" : { }
   } ]
}
and corresponding Parquet message type:
message spark_schema {
   optional int64 id;
   optional binary value (UTF8);
   required binary NewColumn (UTF8);
}
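Parquet is simply the default format for saveAsTable in this setup; if another format is preferred, it can be requested explicitly. A minimal sketch (the ORC choice and the table name test_table2_orc are just example assumptions):

# A sketch: choose the storage format explicitly instead of the Parquet default.
df.write.mode("overwrite") \
    .format("orc") \
    .saveAsTable("test_db.test_table2_orc")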

Append data to an existing Hive table

You can also append data to an existing Hive table, either via a SQL INSERT statement or via the append write mode.

# Append data via SQL
spark.sql("insert into test_db.test_table2 values (3, 'GHI', 'SQL INSERT')")
spark.sql("select * from test_db.test_table2").show()
# Append data via code
df = spark.sql("select 4 as id, 'JKL' as value, 'Spark Write Append Mode' as NewColumn")
df.show()
df.write.mode("append").saveAsTable("test_db.test_table2")
spark.sql("select * from test_db.test_table2").show()

Both records are inserted into the table successfully, as the following output shows:

+---+-----+--------------------+
| id|value|           NewColumn|
+---+-----+--------------------+
|  4|  JKL|Spark Write Appen...|
|  1|  ABC|                Test|
|  2|  DEF|                Test|
|  3|  GHI|          SQL INSERT|
+---+-----+--------------------+
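As an alternative to saveAsTable with append mode, DataFrameWriter.insertInto can also append to an existing table; it matches columns by position rather than by name, so the DataFrame's column order must line up with the table's. A minimal sketch (the sample row is hypothetical):

# A sketch: append by position with insertInto (column order must match the table).
df3 = spark.sql("select 5 as id, 'MNO' as value, 'insertInto example' as NewColumn")
df3.write.insertInto("test_db.test_table2")
spark.sql("select * from test_db.test_table2").show()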

Complete code - hive-example.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

appName = "PySpark Hive Example"
master = "local"

# Create Spark session with Hive supported.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

# Read data from Hive database test_db, table name: test_table.
df = spark.sql("select * from test_db.test_table")
df.show()

# Let's add a new column
df = df.withColumn("NewColumn", lit('Test'))
df.show()

# Save df to a new table in Hive
df.write.mode("overwrite").saveAsTable("test_db.test_table2")
# Show the results using SELECT
spark.sql("select * from test_db.test_table2").show()

# Append data via SQL
spark.sql("insert into test_db.test_table2 values (3, 'GHI', 'SQL INSERT')")
spark.sql("select * from test_db.test_table2").show()

# Append data via code
df = spark.sql("select 4 as id, 'JKL' as value, 'Spark Write Append Mode' as NewColumn")
df.show()
df.write.mode("append").saveAsTable("test_db.test_table2")
spark.sql("select * from test_db.test_table2").show()

Have fun with Spark!
