Spark - Save DataFrame to Hive Table

#378 Re: Spark - Save DataFrame to Hive Table

I've tested the following script and it works properly:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

appName = "PySpark Hive Example"
master = "local"

# Create a Spark session with Hive support enabled.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

# Sample data: a list of tuples, one per row.
data = [('Category A', 100, "This is category A"),
        ('Category B', 120, "This is category B"),
        ('Category C', 150, "This is category C")]

# Create a schema for the DataFrame
schema = StructType([
    StructField('Category', StringType(), True),
    StructField('Count', IntegerType(), True),
    StructField('Description', StringType(), True)
])

# Convert list to RDD
rdd = spark.sparkContext.parallelize(data)

# Create the DataFrame from the RDD using the schema
df = spark.createDataFrame(rdd, schema)
print(df.schema)
df.show()

df.write.format('parquet').mode("overwrite").saveAsTable("test_db.test_parquet")

df.write.format('parquet').mode("append").saveAsTable("test_db.test_parquet")


It's hard to guess the root cause of your error from the stack trace alone.

Can you paste your full script for debugging? Alternatively, try the script above to see if it works in your environment.
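
One check that might help narrow it down (a guess on my part, not a confirmed diagnosis): since the error is thrown while Spark plans the write against the existing table, it's worth inspecting what the metastore already holds for the table and comparing it with your DataFrame's schema:

# Inspect the existing table definition (provider, location, schema).
spark.sql("DESCRIBE FORMATTED mydb.mytable").show(100, truncate=False)
spark.sql("SHOW CREATE TABLE mydb.mytable").show(truncate=False)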


Quoted from Abhishek, 3 years ago:
Re: Spark - Save DataFrame to Hive Table

Hi

I've written my DataFrame out as a table this way:

----

mydf.write.format("parquet").saveAsTable("mydb.mytable")

----

But when I try to append the same data to the same table using:

----

mydf.write.mode("append").format("parquet").saveAsTable("mydb.mytable")

----

I get an error:

----

py4j.protocol.Py4JJavaError: An error occurred while calling o106.saveAsTable.
: java.util.NoSuchElementException: next on empty iterator
        at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
        at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
        at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
        at scala.collection.IterableLike$class.head(IterableLike.scala:107)
        at scala.collection.mutable.ArrayBuffer.scala$collection$IndexedSeqOptimized$$super$head(ArrayBuffer.scala:48)
        at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)
        at scala.collection.mutable.ArrayBuffer.head(ArrayBuffer.scala:48)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$18.apply(DataSource.scala:466)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$18.apply(DataSource.scala:463)
        at scala.Option.map(Option.scala:146)
        at org.apache.spark.sql.execution.datasources.DataSource.planForWritingFileFormat(DataSource.scala:463)
        at org.apache.spark.sql.execution.datasources.DataSource.writeAndRead(DataSource.scala:516)
        at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.saveDataIntoTable(createDataSourceTables.scala:216)
        at org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand.run(createDataSourceTables.scala:166)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
        at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
        at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:458)
        at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:437)
        at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:393)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

----

No idea why I'm getting this. Please help.

Thanks
