3 years ago · English

Spark - Save DataFrame to Hive Table

39,682 views · 2 comments
Since Spark 2.0, you can read data from a Hive data warehouse and write or append new data to Hive tables. This page shows how to work with Hive in Spark, including: creating a DataFrame from an existing Hive table; saving a DataFrame to a new Hive table; appending data to an existing Hive table via ...
Last modified by Raymond 6 months ago

Comments

5 months ago

I've tested the following script and it works properly:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

appName = "PySpark Hive Example"
master = "local"

# Create a Spark session with Hive support enabled.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

# Sample data as a list of tuples: (Category, Count, Description)
data = [('Category A', 100, "This is category A"),
        ('Category B', 120, "This is category B"),
        ('Category C', 150, "This is category C")]

# Define the schema for the DataFrame
schema = StructType([
    StructField('Category', StringType(), True),
    StructField('Count', IntegerType(), True),
    StructField('Description', StringType(), True)
])

# Convert the list to an RDD
rdd = spark.sparkContext.parallelize(data)

# Create the DataFrame
df = spark.createDataFrame(rdd, schema)
print(df.schema)
df.show()

# Make sure the target database exists before saving tables into it
spark.sql("CREATE DATABASE IF NOT EXISTS test_db")

# Create (or replace) the table
df.write.format('parquet').mode("overwrite").saveAsTable("test_db.test_parquet")

# Append the same rows to the existing table
df.write.format('parquet').mode("append").saveAsTable("test_db.test_parquet")


It's hard to guess the root cause of your error.

Can you paste your full script so I can debug it? Alternatively, try my script above to see whether it works in your environment.



Abhishek · 5 months ago
Re: Spark - Save DataFrame to Hive Table

Hi

I've written my df to a table this way:

----

mydf.write.format("parquet").saveAsTable("mydb.mytable")

----

But when I try to append the same data to the same table using:

----

mydf.write.mode("append").format("parquet").saveAsTable("mydb.mytable")

----

I get an error:

----

py4j.protocol.Py4JJavaError: An error occurred while calling o106.saveAsTable.
: java.util.NoSuchElementException: next on empty iterator
        at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
        at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
        at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
        at scala.collection.IterableLike$class.head(IterableLike.scala:107)
        at scala.collection.mutable.ArrayBuffer.scala$collection$IndexedSeqOptimized$$super$head(ArrayBuffer.scala:48)
        at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)
        at scala.collection.mutable.ArrayBuffer.head(ArrayBuffer.scala:48)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$18.apply(DataSource.scala:466)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$18.apply(DataSource.scala:463)
        at scala.Option.map(Option.scala:146)
        at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:463)
        at org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:516)
        at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:216)
        at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:166)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
        at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:458)
        at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:437)
        at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:393)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

----

I have no idea why I'm getting this. Please help.

Thanks
