Spark - 保存DataFrame为Hive数据表
insights Stats
warning Please login first to view stats information.
从Spark 2.0开始,您可以很容易的从Hive数据仓库中读取数据或者将数据写入到现有的Hive数据表中。
本文将展示如何在Spark中操作Hive数据:
- 从现有Hive表创建Spark DataFrame
- 保存Spark DataFrame到一个新的Hive表
- 将新的数据添加到现有Hive表中(通过 INSERT SQL以及append模式)
本文示例中的编程语言为Python;如果需要,您可以很容易的将其转换为Scala。
创建一个集成Hive的SparkSession
使用以下代码创建一个SparkSession:
from pyspark.sql import SparkSession
appName = "PySpark Hive Example"
master = "local"
# Create Spark session with Hive supported.
spark = SparkSession.builder \
.appName(appName) \
.master(master) \
.enableHiveSupport() \
.getOrCreate()
调用enableHiveSupport可以帮助我们使用Hive的自带函数(有些函数在Spark-SQL中不存在)。
从Hive读取数据
现在我们可以使用创建的SparkSession对象读取Hive数据库中的数据:
# Read data from Hive database test_db, table name: test_table.
df = spark.sql("select * from test_db.test_table")
df.show()
我使用文件型数据库Derby作为Hive的metastore(元数据比如表的定义的存储)。我已经创建了一个名为test_db的数据库;它包含一个名为test_table的表。
输出的结果如下:
+---+-----+ | id|value| +---+-----+ | 1| ABC| | 2| DEF| +---+-----+
添加一个新列
使用以下代码添加一个新的常数列:
# Let's add a new column
df = df.withColumn("NewColumn",lit('Test'))
df.show()
以下为输出结果:
+---+-----+---------+ | id|value|NewColumn| +---+-----+---------+ | 1| ABC| Test| | 2| DEF| Test| +---+-----+---------+
保存DataFrame到新的Hive表
使用以下代码将DataFrame保存于一个名为test_table2新的表中
# Save df to a new table in Hive
df.write.mode("overwrite").saveAsTable("test_db.test_table2")
# Show the results using SELECT
spark.sql("select * from test_db.test_table2").show()
通过日志,我可以明确新表的存储文件类型为Parquet:
Initialized Parquet WriteSupport with Catalyst schema: {
"type" : "struct",
"fields" : [ {
"name" : "id",
"type" : "long",
"nullable" : true,
"metadata" : { }
}, {
"name" : "value",
"type" : "string",
"nullable" : true,
"metadata" : {
"HIVE_TYPE_STRING" : "varchar(100)"
}
}, {
"name" : "NewColumn",
"type" : "string",
"nullable" : false,
"metadata" : { }
} ] } and corresponding Parquet message type: message spark_schema {
optional int64 id;
optional binary value (UTF8);
required binary NewColumn (UTF8); }
添加数据到现有的Hive表
您可以直接添加新的数据到现有表中 - 通过 ‘INSERT’ SQL语句或者‘append’写模式。
# Append data via SQL
spark.sql("insert into test_db.test_table2 values (3, 'GHI', 'SQL INSERT')")
spark.sql("select * from test_db.test_table2").show()
# Append data via code
df = spark.sql("select 4 as id, 'JKL' as value, 'Spark Write Append Mode' as NewColumn")
df.show()
df.write.mode("append").saveAsTable("test_db.test_table2")
spark.sql("select * from test_db.test_table2").show()
上面两条记录均被成功的插入到现有表中:
+---+-----+--------------------+ | id|value| NewColumn| +---+-----+--------------------+ | 4| JKL|Spark Write Appen...| | 1| ABC| Test| | 2| DEF| Test| | 3| GHI| SQL INSERT| +---+-----+--------------------+
完整的代码- hive-example.py
from pyspark.sql import SparkSession from pyspark.sql.functions import lit appName = "PySpark Hive Example" master = "local" # Create Spark session with Hive supported. spark = SparkSession.builder \ .appName(appName) \ .master(master) \ .enableHiveSupport() \ .getOrCreate() # Read data from Hive database test_db, table name: test_table. df = spark.sql("select * from test_db.test_table") df.show() # Let's add a new column df = df.withColumn("NewColumn",lit('Test')) df.show() # Save df to a new table in Hive df.write.mode("overwrite").saveAsTable("test_db.test_table2") # Show the results using SELECT spark.sql("select * from test_db.test_table2").show() # Append data via SQL spark.sql("insert into test_db.test_table2 values (3, 'GHI', 'SQL INSERT')") spark.sql("select * from test_db.test_table2").show() # Append data via code df = spark.sql("select 4 as id, 'JKL' as value, 'Spark Write Append Mode' as NewColumn") df.show() df.write.mode("append").saveAsTable("test_db.test_table2") spark.sql("select * from test_db.test_table2").show()
希望您享受您的Spark之旅!
本文英文版本:Spark - Save DataFrame to Hive Table
copyright
This page is subject to Site terms.
comment Comments
No comments yet.