Raymond, 5 years ago

Spark - Save DataFrame to Hive Table

Since Spark 2.0, you can easily read data from a Hive data warehouse and write or append new data to Hive tables. This page shows how to work with Hive in Spark, including: creating a DataFrame from an existing Hive table; saving a DataFrame to a new Hive table; appending data to an existing Hive table via ...
Last modified by Raymond 3 years ago
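
The operations listed in the summary can be sketched roughly as follows. This is a minimal sketch, not the article's own code: the helper names `read_hive_table`, `save_new_table` and `append_to_table` are made up for illustration, and `spark` is assumed to be a SparkSession that was built with `enableHiveSupport()`.

```python
def read_hive_table(spark, table_name):
    """Create a DataFrame from an existing Hive table."""
    return spark.table(table_name)


def save_new_table(df, table_name, fmt="parquet"):
    """Save a DataFrame as a new table; raises if the table already exists."""
    df.write.format(fmt).mode("errorifexists").saveAsTable(table_name)


def append_to_table(df, table_name, fmt="parquet"):
    """Append the DataFrame's rows to an existing table."""
    df.write.format(fmt).mode("append").saveAsTable(table_name)
```

For example, `append_to_table(read_hive_table(spark, "test_db.test_parquet"), "test_db.test_parquet_copy")` would copy rows between two tables; the second table name is only a placeholder.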

Comments
Royer, 4 months ago
#1776 Re: Spark - Save DataFrame to Hive Table

Hello, how would this work if I don't have a database created? Instead, I create the DataFrames in the following way:

import findspark
findspark.init()  # locate the local Spark installation

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Employee rows: (id, name, dept, salary); some fields are None
emp = [(1, "AAA", "dept1", 1000),
       (2, "BBB", "dept1", 1100),
       (3, "CCC", "dept1", 3000),
       (4, "DDD", "dept1", 1500),
       (5, "EEE", "dept2", 8000),
       (6, "FFF", "dept2", 7200),
       (7, "GGG", "dept3", 7100),
       (None, None, None, 7500),
       (9, "III", None, 4500),
       (10, None, "dept5", 2500)]

# Department rows: (id, name)
dept = [("dept1", "Department - 1"),
        ("dept2", "Department - 2"),
        ("dept3", "Department - 3"),
        ("dept4", "Department - 4")]

df = spark.createDataFrame(emp, ["id", "name", "dept", "salary"])
deptdf = spark.createDataFrame(dept, ["id", "name"])

Starting from this, how can I save the DataFrame as a Hive table?
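
One possible approach, sketched under stated assumptions: create the database first with `CREATE DATABASE IF NOT EXISTS`, then call `saveAsTable` with a qualified name. The helper names `save_to_hive` and `build_session`, the app name, and the database and table names in the usage note are hypothetical. Hive support generally has to be enabled when the session is first created, so `build_session` would replace the plain `SparkSession.builder.getOrCreate()` call rather than follow it.

```python
def save_to_hive(spark, df, database, table, mode="overwrite", fmt="parquet"):
    """Create the database if it is missing, then save df as database.table."""
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {database}")
    qualified_name = f"{database}.{table}"
    df.write.format(fmt).mode(mode).saveAsTable(qualified_name)
    return qualified_name


def build_session(app_name="hive-save-example"):
    """Build a SparkSession with Hive support (requires PySpark to be installed)."""
    # Imported lazily so save_to_hive stays usable without importing PySpark here.
    from pyspark.sql import SparkSession
    return (SparkSession.builder
            .appName(app_name)
            .enableHiveSupport()
            .getOrCreate())
```

With the DataFrames from the question, usage might look like `spark = build_session()`, then `save_to_hive(spark, df, "test_db", "emp")` and `save_to_hive(spark, deptdf, "test_db", "dept")`.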

Swapnil, 8 months ago
#1711 Re: Spark - Save DataFrame to Hive Table
Thanks Raymond, this makes sense.

Raymond, 7 months ago
#1708 Re: Spark - Save DataFrame to Hive Table

Hi Swapnil,

These frameworks are designed for batch upserts even though they support ACID. If you want the traditional UPDATE, DELETE and INSERT approach (OLTP), it would be better to use relational or NoSQL databases, or alternatively to connect to Hive through tools like JDBC.
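
As a rough illustration of the connection-based route, row-level statements can be submitted through a database connection instead of through spark.sql(). The sketch below is not from this thread: `run_statements` is a hypothetical helper written against Python's generic DB-API 2.0 interface, and the standard library's sqlite3 module stands in for a real Hive connection so that the snippet runs anywhere. With Hive, you would pass a connection obtained from a HiveServer2 client library instead, and the target would need to be an ACID table.

```python
import sqlite3


def run_statements(connection, statements):
    """Execute each SQL statement in order and commit once at the end."""
    cursor = connection.cursor()
    try:
        for statement in statements:
            cursor.execute(statement)
        connection.commit()
    finally:
        cursor.close()


# sqlite3 is only a stand-in for a real Hive connection (assumption).
conn = sqlite3.connect(":memory:")
run_statements(conn, [
    "CREATE TABLE emp (id INTEGER, name TEXT, salary INTEGER)",
    "INSERT INTO emp VALUES (1, 'AAA', 1000), (2, 'BBB', 1100)",
    "UPDATE emp SET salary = 1200 WHERE id = 2",
    "DELETE FROM emp WHERE id = 1",
])
rows = conn.execute("SELECT id, name, salary FROM emp").fetchall()
print(rows)  # → [(2, 'BBB', 1200)]
```

Keeping the helper tied to the DB-API interface rather than to one driver means the same function can be reused with whichever connection library is available in your environment.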

Swapnil, 7 months ago
#1707 Re: Spark - Save DataFrame to Hive Table
Hi Raymond, although Hive/Hudi supports ACID, spark.sql() doesn't support UPDATE, MERGE or DELETE statements. Is there any workaround for submitting DML?

Swapnil, 8 months ago
#1704 Re: Spark - Save DataFrame to Hive Table

Thanks for the pointers

Raymond, 8 months ago
#1703 Re: Spark - Save DataFrame to Hive Table

Hive 3 supports ACID when table data is stored in ORC format, so you can use INSERT, UPDATE and DELETE statements to manipulate the data as you would with other databases. You can use SparkSession.sql to invoke these statements if necessary.

Hudi and Delta Lake both support ACID too and can be used when working directly with a data lake, for example with data stored in Hudi or Delta format on HDFS, S3, GCS, Azure Blob Storage, etc.


Quoted from Swapnil, 8 months ago:

How can we update a Hive table while saving a DataFrame?

To run updates, is Apache Kudu a better option than Hive?

Raymond, 3 years ago
#378 Re: Spark - Save DataFrame to Hive Table

I've tested the following script and it works properly:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

appName = "PySpark Hive Example"
master = "local"

# Create a Spark session with Hive support enabled.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

# Sample data as a list of tuples
data = [('Category A', 100, "This is category A"),
        ('Category B', 120, "This is category B"),
        ('Category C', 150, "This is category C")]

# Create a schema for the DataFrame
schema = StructType([
    StructField('Category', StringType(), True),
    StructField('Count', IntegerType(), True),
    StructField('Description', StringType(), True)
])

# Convert the list to an RDD
rdd = spark.sparkContext.parallelize(data)

# Create the DataFrame
df = spark.createDataFrame(rdd, schema)
print(df.schema)
df.show()

# Create (or overwrite) the table; the database test_db must already exist
df.write.format('parquet').mode("overwrite").saveAsTable("test_db.test_parquet")

# Append the same rows to the existing table
df.write.format('parquet').mode("append").saveAsTable("test_db.test_parquet")


It's hard to guess the root cause of your error.

Can you paste your full script for debugging? Alternatively, try my script above to see if it works in your environment.



Abhishek, 3 years ago
#377 Re: Spark - Save DataFrame to Hive Table

Hi

I've written my df as a table this way:

----

mydf.write.format("parquet").saveAsTable("mydb.mytable")

----

But when I try to append the same data to the same table using:

----

mydf.write.mode("append").format("parquet").saveAsTable("mydb.mytable")

----

I get an error:

----

py4j.protocol.Py4JJavaError: An error occurred while calling o106.saveAsTable.
: java.util.NoSuchElementException: next on empty iterator
        at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
        at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
        at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
        at scala.collection.IterableLike$class.head(IterableLike.scala:107)
        at scala.collection.mutable.ArrayBuffer.scala$collection$IndexedSeqOptimized$$super$head(ArrayBuffer.scala:48)
        at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)
        at scala.collection.mutable.ArrayBuffer.head(ArrayBuffer.scala:48)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$18.apply(DataSource.scala:466)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$18.apply(DataSource.scala:463)
        at scala.Option.map(Option.scala:146)
        at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:463)
        at org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:516)
        at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:216)
        at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:166)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
        at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:458)
        at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:437)
        at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:393)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

----

No idea why I'm getting this. Please help

Thanks
