Spark - Save DataFrame to Hive Table
Raymond (7 months ago)
Re: Spark - Save DataFrame to Hive Table
Hi Swapnil,
These frameworks are designed for batch upserts even though they support ACID. If you want the traditional UPDATE, DELETE and INSERT approach (OLTP), it would be better to use relational or NoSQL databases, or alternatively use tools like JDBC to connect to Hive.
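For that last option, a rough sketch using the PyHive package (a Python client for HiveServer2; PyHive itself, the connection details and the table name are my assumptions, not something from this thread):
----
# Sketch only: assumes the PyHive package is installed and a HiveServer2
# endpoint is reachable; host, port, database and table are hypothetical.
from pyhive import hive

conn = hive.connect(host="localhost", port=10000, database="test_db")
cursor = conn.cursor()

# Row-level statements like these only work against a transactional (ACID) Hive table.
cursor.execute("UPDATE test_acid SET cnt = 200 WHERE category = 'Category A'")
cursor.execute("DELETE FROM test_acid WHERE category = 'Category C'")

cursor.close()
conn.close()
----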
Swapnil (7 months ago)
Re: Spark - Save DataFrame to Hive Table
Thanks for the pointers
Raymond (8 months ago)
Re: Spark - Save DataFrame to Hive Table
Hive 3 does support ACID if the table data is stored in ORC format, and we can use INSERT, UPDATE and DELETE statements to manipulate the data as you would with other databases. You can use SparkSession.sql to invoke these statements if necessary.
Hudi and Delta Lake both support ACID too and can be used when working with a data lake directly, for example, data stored in Hudi or Delta format in HDFS, S3, GCS, Azure Blob Storage, etc.
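For the Delta Lake option mentioned above, here is a minimal sketch (assuming the delta-core/delta-spark package is on the classpath and the session is configured with the Delta extensions; the path and data are made up for illustration):
----
# Sketch only: requires the Delta Lake package (e.g. io.delta:delta-core) to be
# available to Spark; the storage path below is hypothetical.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Delta ACID sketch") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

df = spark.createDataFrame([("Category A", 100), ("Category B", 120)], ["Category", "Count"])
df.write.format("delta").mode("overwrite").save("/tmp/delta/test_table")

# Row-level UPDATE/DELETE run directly against the Delta files through spark.sql.
spark.sql("UPDATE delta.`/tmp/delta/test_table` SET Count = 200 WHERE Category = 'Category A'")
spark.sql("DELETE FROM delta.`/tmp/delta/test_table` WHERE Category = 'Category B'")
----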
Swapnil (8 months ago)
Re: Spark - Save DataFrame to Hive Table
How can we update a Hive table while saving a dataframe?
To run updates, is Apache Kudu a better option than Hive?
I've tested the following script and it works properly:
----
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType

appName = "PySpark Hive Example"
master = "local"

# Create Spark session with Hive supported.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

# List
data = [('Category A', 100, "This is category A"),
        ('Category B', 120, "This is category B"),
        ('Category C', 150, "This is category C")]

# Create a schema for the dataframe
schema = StructType([
    StructField('Category', StringType(), True),
    StructField('Count', IntegerType(), True),
    StructField('Description', StringType(), True)
])

# Convert list to RDD
rdd = spark.sparkContext.parallelize(data)

# Create data frame
df = spark.createDataFrame(rdd, schema)
print(df.schema)
df.show()

df.write.format('parquet').mode("overwrite").saveAsTable("test_db.test_parquet")
df.write.format('parquet').mode("append").saveAsTable("test_db.test_parquet")
----
It's hard to guess the root cause of your error.
Can you paste your full script for debugging? Or alternatively, try my script above to see if it works in your environment.
Abhishek (3 years ago)
Re: Spark - Save DataFrame to Hive Table
Hi,
I've written my df as a table this way:
----
mydf.write.format("parquet").saveAsTable("mydb.mytable")
----
But when I try to append the same data to the same table using:
----
mydf.write.mode("append").format("parquet").saveAsTable("mydb.mytable")
----
I get an error:
----
py4j.protocol.Py4JJavaError: An error occurred while calling o106.saveAsTable.
: java.util.NoSuchElementException: next on empty iterator
at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
at scala.collection.IterableLike$class.head(IterableLike.scala:107)
at scala.collection.mutable.ArrayBuffer.scala$collection$IndexedSeqOptimized$$super$head(ArrayBuffer.scala:48)
at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)
at scala.collection.mutable.ArrayBuffer.head(ArrayBuffer.scala:48)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$18.apply(DataSource.scala:466)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$18.apply(DataSource.scala:463)
at scala.Option.map(Option.scala:146)
at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:463)
at org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:516)
at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:216)
at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:166)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:458)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:437)
at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:393)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
----
No idea why I'm getting this. Please help.
Thanks
Hello, what would the approach be if I don't have a database created? Instead, I create the dataframes in the following way:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.functions import broadcast
from pyspark.sql.types import *
spark = SparkSession.builder.getOrCreate()
emp = [(1, "AAA", "dept1", 1000),
(2, "BBB", "dept1", 1100),
(3, "CCC", "dept1", 3000),
(4, "DDD", "dept1", 1500),
(5, "EEE", "dept2", 8000),
(6, "FFF", "dept2", 7200),
(7, "GGG", "dept3", 7100),
(None, None, None, 7500),
(9, "III", None, 4500),
(10, None, "dept5", 2500)]
dept = [("dept1", "Department - 1"),
("dept2", "Department - 2"),
("dept3", "Department - 3"),
("dept4", "Department - 4")
]
df = spark.createDataFrame(emp, ["id", "name", "dept", "salary"])
deptdf = spark.createDataFrame(dept, ["id", "name"])
From this, what is the way to save the dataframe as a Hive table?
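Would something like the following work, based on the tested script earlier in this thread? (It reuses the emp and dept lists above, and assumes the session is recreated with Hive support enabled; without a database created, the tables would land in the default database.)
----
# Sketch only: recreate the session with Hive support so saveAsTable writes
# to the Hive metastore instead of a session-local catalog.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Save dataframe to Hive") \
    .enableHiveSupport() \
    .getOrCreate()

df = spark.createDataFrame(emp, ["id", "name", "dept", "salary"])
deptdf = spark.createDataFrame(dept, ["id", "name"])

# Without a database prefix, the tables are created in the "default" database.
df.write.mode("overwrite").saveAsTable("emp")
deptdf.write.mode("overwrite").saveAsTable("dept")
----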