Raymond Raymond

Spark - 保存DataFrame为Hive数据表

event 2021-10-13 visibility 2,016 comment 0 insights toc
more_vert
insights Stats

从Spark 2.0开始,您可以很容易的从Hive数据仓库中读取数据或者将数据写入到现有的Hive数据表中。

本文将展示如何在Spark中操作Hive数据:

  • 从现有Hive表创建Spark DataFrame
  • 保存Spark DataFrame到一个新的Hive表
  • 将新的数据添加到现有Hive表中(通过 INSERT SQL以及append模式)

本文示例中的编程语言为Python;如果需要,您可以很容易的将其转换为Scala。

创建一个集成Hive的SparkSession

使用以下代码创建一个SparkSession:

from pyspark.sql import SparkSession
appName = "PySpark Hive Example"
master = "local"
# Create Spark session with Hive supported.
spark = SparkSession.builder \
.appName(appName) \
.master(master) \
.enableHiveSupport() \
.getOrCreate()

调用enableHiveSupport可以帮助我们使用Hive的自带函数(有些函数在Spark-SQL中不存在)。

从Hive读取数据

现在我们可以使用创建的SparkSession对象读取Hive数据库中的数据:

# Read data from Hive database test_db, table name: test_table.
df = spark.sql("select * from test_db.test_table")
df.show()

我使用文件型数据库Derby作为Hive的metastore(元数据比如表的定义的存储)。我已经创建了一个名为test_db的数据库;它包含一个名为test_table的表。 

输出的结果如下:

+---+-----+
| id|value|
+---+-----+
|  1|  ABC|
|  2|  DEF|
+---+-----+

添加一个新列

使用以下代码添加一个新的常数列:

# Let's add a new column
df = df.withColumn("NewColumn",lit('Test'))
df.show()

以下为输出结果:

+---+-----+---------+
| id|value|NewColumn|
+---+-----+---------+
|  1|  ABC|     Test|
|  2|  DEF|     Test|
+---+-----+---------+

保存DataFrame到新的Hive表

使用以下代码将DataFrame保存于一个名为test_table2新的表中

# Save df to a new table in Hive
df.write.mode("overwrite").saveAsTable("test_db.test_table2")
# Show the results using SELECT
spark.sql("select * from test_db.test_table2").show()

通过日志,我可以明确新表的存储文件类型为Parquet:

Initialized Parquet WriteSupport with Catalyst schema:
{
   "type" : "struct",
   "fields" : [ {
     "name" : "id",
     "type" : "long",
     "nullable" : true,
     "metadata" : { }
   }, {
     "name" : "value",
     "type" : "string",
     "nullable" : true,
     "metadata" : {
       "HIVE_TYPE_STRING" : "varchar(100)"
     }
   }, {
     "name" : "NewColumn",
     "type" : "string",
     "nullable" : false,
     "metadata" : { }
   } ] } and corresponding Parquet message type: message spark_schema {
   optional int64 id;
   optional binary value (UTF8);
   required binary NewColumn (UTF8); }

添加数据到现有的Hive表

您可以直接添加新的数据到现有表中 - 通过 ‘INSERT’ SQL语句或者‘append’写模式。

# Append data via SQL
spark.sql("insert into test_db.test_table2 values (3, 'GHI', 'SQL INSERT')")
spark.sql("select * from test_db.test_table2").show()
# Append data via code
df = spark.sql("select 4 as id, 'JKL' as value, 'Spark Write Append Mode' as NewColumn")
df.show()
df.write.mode("append").saveAsTable("test_db.test_table2")
spark.sql("select * from test_db.test_table2").show()

上面两条记录均被成功的插入到现有表中:

+---+-----+--------------------+
| id|value|           NewColumn|
+---+-----+--------------------+
|  4|  JKL|Spark Write Appen...|
|  1|  ABC|                Test|
|  2|  DEF|                Test|
|  3|  GHI|          SQL INSERT|
+---+-----+--------------------+

完整的代码- hive-example.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

appName = "PySpark Hive Example"
master = "local"

# Create Spark session with Hive supported.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()

# Read data from Hive database test_db, table name: test_table.
df = spark.sql("select * from test_db.test_table")
df.show()

# Let's add a new column
df = df.withColumn("NewColumn",lit('Test'))
df.show()

# Save df to a new table in Hive
df.write.mode("overwrite").saveAsTable("test_db.test_table2")
# Show the results using SELECT
spark.sql("select * from test_db.test_table2").show()

# Append data via SQL
spark.sql("insert into test_db.test_table2 values (3, 'GHI', 'SQL INSERT')")
spark.sql("select * from test_db.test_table2").show()

# Append data via code
df = spark.sql("select 4 as id, 'JKL' as value, 'Spark Write Append Mode' as NewColumn")
df.show()
df.write.mode("append").saveAsTable("test_db.test_table2")
spark.sql("select * from test_db.test_table2").show()

希望您享受您的Spark之旅!

本文英文版本:Spark - Save DataFrame to Hive Table

More from Kontext
comment Comments
No comments yet.

Please log in or register to comment.

account_circle Log in person_add Register

Log in with external accounts