Raymond

Spark - Save DataFrame to Hive Table

2019-03-27

Since Spark 2.0, you can easily read data from the Hive data warehouse and also write or append new data to Hive tables.

This page shows how to operate with Hive in Spark including:

  • Create DataFrame from existing Hive table
  • Save DataFrame to a new Hive table
  • Append data to an existing Hive table via both an INSERT statement and the append write mode.

Python is used as the programming language; the syntax for Scala is very similar.

Create a SparkSession with Hive support

Run the following code to create a Spark session with Hive support:

from pyspark.sql import SparkSession
appName = "PySpark Hive Example"
master = "local"
# Create Spark session with Hive support enabled.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()
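
The snippet above uses the default embedded Derby metastore. If your environment has an external Hive metastore, the usual options are to place hive-site.xml on the classpath or to set the relevant properties on the builder. The following is only a sketch; the warehouse path and metastore URI are placeholders, not values from this article:

# Sketch only: connect to an external Hive metastore (placeholder values).
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .config("spark.sql.warehouse.dir", "/user/hive/warehouse") \
    .config("hive.metastore.uris", "thrift://metastore-host:9083") \
    .enableHiveSupport() \
    .getOrCreate()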

Read data from Hive

Now we can use the SparkSession object to read data from the Hive database:

# Read data from Hive database test_db, table name: test_table.
df = spark.sql("select * from test_db.test_table")
df.show()
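
Equivalently, the same table can be read through the DataFrame API instead of a SQL string:

# Same read expressed via the table API
df = spark.table("test_db.test_table")
df.show()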

I use Derby as the Hive metastore, and I have already created a database named test_db with a table named test_table. Inside the table, there are two records.
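
If you don't have this database and table yet, the following sketch creates them from Spark; the column types are assumptions based on the table schema shown in the logs later in this article:

# Sketch only: create the sample database and table with two records.
spark.sql("CREATE DATABASE IF NOT EXISTS test_db")
spark.sql("CREATE TABLE IF NOT EXISTS test_db.test_table (id BIGINT, value VARCHAR(100))")
spark.sql("INSERT INTO test_db.test_table VALUES (1, 'ABC'), (2, 'DEF')")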

The results look similar to the following:

+---+-----+
| id|value|
+---+-----+
|  1|  ABC|
|  2|  DEF|
+---+-----+

Add a new column

# Let's add a new column with a constant value
from pyspark.sql.functions import lit
df = df.withColumn("NewColumn", lit('Test'))
df.show()

The following is the result:

+---+-----+---------+
| id|value|NewColumn|
+---+-----+---------+
|  1|  ABC|     Test|
|  2|  DEF|     Test|
+---+-----+---------+
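
withColumn is not limited to literal values; you can also derive a new column from existing ones. A small sketch (the derived column name here is illustrative only):

# Sketch only: derive a column from an existing one instead of a literal.
from pyspark.sql.functions import col
df_derived = df.withColumn("id_plus_one", col("id") + 1)
df_derived.show()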

Save DataFrame as a new Hive table

Use the following code to save the DataFrame to a new Hive table named test_table2:

# Save df to a new table in Hive
df.write.mode("overwrite").saveAsTable("test_db.test_table2")
# Show the results using SELECT
spark.sql("select * from test_db.test_table2").show()

In the logs, I can see that the new table is saved as Parquet by default:

Initialized Parquet WriteSupport with Catalyst schema:
{
   "type" : "struct",
   "fields" : [ {
     "name" : "id",
     "type" : "long",
     "nullable" : true,
     "metadata" : { }
   }, {
     "name" : "value",
     "type" : "string",
     "nullable" : true,
     "metadata" : {
       "HIVE_TYPE_STRING" : "varchar(100)"
     }
   }, {
     "name" : "NewColumn",
     "type" : "string",
     "nullable" : false,
     "metadata" : { }
   } ]
}
and corresponding Parquet message type:
message spark_schema {
   optional int64 id;
   optional binary value (UTF8);
   required binary NewColumn (UTF8);
}
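
Parquet is only the default. If you prefer a different storage format, you can request it explicitly when saving; a sketch using ORC (the table name test_table_orc is illustrative):

# Sketch only: save the same DataFrame as an ORC-backed Hive table.
df.write.format("orc").mode("overwrite").saveAsTable("test_db.test_table_orc")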

Append data to existing Hive table

You can also append data to an existing Hive table, either via an INSERT SQL statement or via the 'append' write mode.

# Append data via SQL
spark.sql("insert into test_db.test_table2 values (3, 'GHI', 'SQL INSERT')")
spark.sql("select * from test_db.test_table2").show()
# Append data via code
df = spark.sql("select 4 as id, 'JKL' as value, 'Spark Write Append Mode' as NewColumn")
df.show()
df.write.mode("append").saveAsTable("test_db.test_table2")
spark.sql("select * from test_db.test_table2").show()

Both records are inserted into the table successfully as the following output shows:

+---+-----+--------------------+
| id|value|           NewColumn|
+---+-----+--------------------+
|  4|  JKL|Spark Write Appen...|
|  1|  ABC|                Test|
|  2|  DEF|                Test|
|  3|  GHI|          SQL INSERT|
+---+-----+--------------------+
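
Besides saveAsTable with the append mode, DataFrameWriter also provides insertInto, which appends into an existing table and resolves columns by position rather than by name. A small sketch, assuming the DataFrame columns are already in the table's column order:

# Sketch only: append by position into the existing table.
df.write.insertInto("test_db.test_table2")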

Complete code - hive-example.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

appName = "PySpark Hive Example"
master = "local"

# Create Spark session with Hive support enabled.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

# Read data from Hive database test_db, table name: test_table.
df = spark.sql("select * from test_db.test_table")
df.show()

# Let's add a new column
df = df.withColumn("NewColumn",lit('Test'))
df.show()

# Save df to a new table in Hive
df.write.mode("overwrite").saveAsTable("test_db.test_table2")
# Show the results using SELECT
spark.sql("select * from test_db.test_table2").show()

# Append data via SQL
spark.sql("insert into test_db.test_table2 values (3, 'GHI', 'SQL INSERT')")
spark.sql("select * from test_db.test_table2").show()

# Append data via code
df = spark.sql("select 4 as id, 'JKL' as value, 'Spark Write Append Mode' as NewColumn")
df.show()
df.write.mode("append").saveAsTable("test_db.test_table2")
spark.sql("select * from test_db.test_table2").show()

Have fun with Spark!

Comments
Royer Andino #1776 (2 years ago)

Hello, what would this look like if I don't have a database created? Instead, I create the DataFrames in the following way:

import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import broadcast
from pyspark.sql.types import *

spark = SparkSession.builder.getOrCreate()

emp = [(1, "AAA", "dept1", 1000),
    (2, "BBB", "dept1", 1100),
    (3, "CCC", "dept1", 3000),
    (4, "DDD", "dept1", 1500),
    (5, "EEE", "dept2", 8000),
    (6, "FFF", "dept2", 7200),
    (7, "GGG", "dept3", 7100),
    (None, None, None, 7500),
    (9, "III", None, 4500),
    (10, None, "dept5", 2500)]

dept = [("dept1", "Department - 1"),
        ("dept2", "Department - 2"),
        ("dept3", "Department - 3"),
        ("dept4", "Department - 4")
       ]

df = spark.createDataFrame(emp, ["id", "name", "dept", "salary"])
deptdf = spark.createDataFrame(dept, ["id", "name"])

From this, what is the way to save these DataFrames as Hive tables?

Swapnil Patil #1711 (2 years ago), in reply to Raymond #1708
Thanks Raymond, this makes sense.

Raymond #1708 (2 years ago), in reply to Swapnil Patil #1707

Hi Swapnil,

These frameworks are designed for batch upserts even though they support ACID. If you want a traditional UPDATE, DELETE and INSERT approach (OLTP), it would be better to use relational or NoSQL databases, or alternatively use tools like JDBC to connect to Hive.

Swapnil Patil #1707 (2 years ago), in reply to Raymond #1703
Hi Raymond, although Hive/Hudi support ACID, spark.sql() does not support UPDATE, MERGE or DELETE statements, so is there any workaround for submitting DML?


Swapnil Patil #1704 (2 years ago), in reply to Raymond #1703

Thanks for the pointers

Raymond #1703 (2 years ago), in reply to Swapnil Patil #1702

Hive 3 does support ACID if the table data is stored in ORC format, and we can use INSERT, UPDATE and DELETE statements to manipulate the data as with other databases. You can use SparkSession.sql to invoke these statements if necessary.

Hudi and Delta Lake both support ACID too and can be used when working with a data lake directly, for example data stored in Hudi or Delta format in HDFS, S3, GCS, Azure Blob Storage, etc.



Swapnil Patil #1702 (2 years ago)

How can we update a Hive table while saving a DataFrame?

To run updates, is Apache Kudu a better option than Hive?

Raymond #378 (4 years ago), in reply to Abhishek Choudhary #377

I've tested the following script and it works properly:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType

appName = "PySpark Hive Example"
master = "local"

# Create Spark session with Hive supported.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

# List
data = [('Category A', 100, "This is category A"),
        ('Category B', 120, "This is category B"),
        ('Category C', 150, "This is category C")]

# Create a schema for the dataframe
schema = StructType([
    StructField('Category', StringType(), True),
    StructField('Count', IntegerType(), True),
    StructField('Description', StringType(), True)
])

# Convert list to RDD
rdd = spark.sparkContext.parallelize(data)

# Create data frame
df = spark.createDataFrame(rdd,schema)
print(df.schema)
df.show()

df.write.format('parquet').mode("overwrite").saveAsTable("test_db.test_parquet")

df.write.format('parquet').mode("append").saveAsTable("test_db.test_parquet")


It's hard to guess the root cause of your error.

Can you paste your full script for debugging? Or, alternatively, try my script above to see if it works in your environment.



Abhishek Choudhary #377 (4 years ago)

Hi,

I've written my DataFrame as a table this way:

----

mydf.write.format("parquet").saveAsTable("mydb.mytable")

----

But when I try to append the same data to the same table using:

----

mydf.write.mode("append").format("parquet").saveAsTable("mydb.mytable")

----

I get an error:

----

py4j.protocol.Py4JJavaError: An error occurred while calling o106.saveAsTable.

: java.util.NoSuchElementException: next on empty iterator

        at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)

        at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)

        at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)

        at scala.collection.IterableLike$class.head(IterableLike.scala:107)

        at scala.collection.mutable.ArrayBuffer.scala$collection$IndexedSeqOptimized$$super$head(ArrayBuffer.scala:48)

        at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)

        at scala.collection.mutable.ArrayBuffer.head(ArrayBuffer.scala:48)

        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$18.apply(DataSource.scala:466)

        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$18.apply(DataSource.scala:463)

        at scala.Option.map(Option.scala:146)

        at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:463)

        at org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:516)

        at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:216)

        at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:166)

        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)

        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)

        at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)

        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)

        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)

        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)

        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)

        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)

        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)

        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)

        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)

        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)

        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)

        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)

        at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:458)

        at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:437)

        at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:393)

        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

        at java.lang.reflect.Method.invoke(Method.java:498)

        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)

        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)

        at py4j.Gateway.invoke(Gateway.java:282)

        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)

        at py4j.commands.CallCommand.execute(CallCommand.java:79)

        at py4j.GatewayConnection.run(GatewayConnection.java:238)

        at java.lang.Thread.run(Thread.java:748)

----

No idea why I'm getting this. Please help

Thanks
