从Spark 2.0开始,您可以很容易的从Hive数据仓库中读取数据或者将数据写入到现有的Hive数据表中。
本文将展示如何在Spark中操作Hive数据:
- 从现有Hive表创建Spark DataFrame
- 保存Spark DataFrame到一个新的Hive表
- 将新的数据添加到现有Hive表中(通过 INSERT SQL以及append模式)
本文示例中的编程语言为Python;如果需要,您可以很容易的将其转换为Scala。
创建一个集成Hive的SparkSession
使用以下代码创建一个SparkSession:
from pyspark.sql import SparkSessionappName = "PySpark Hive Example"master = "local"# Create Spark session with Hive supported.spark = SparkSession.builder \.appName(appName) \.master(master) \.enableHiveSupport() \.getOrCreate()
调用enableHiveSupport可以帮助我们使用Hive的自带函数(有些函数在Spark-SQL中不存在)。
从Hive读取数据
现在我们可以使用创建的SparkSession对象读取Hive数据库中的数据:
# Read data from Hive database test_db, table name: test_table.df = spark.sql("select * from test_db.test_table")df.show()
我使用文件型数据库Derby作为Hive的metastore(元数据比如表的定义的存储)。我已经创建了一个名为test_db的数据库;它包含一个名为test_table的表。
输出的结果如下:
+---+-----+ | id|value| +---+-----+ | 1| ABC| | 2| DEF| +---+-----+
添加一个新列
使用以下代码添加一个新的常数列:
# Let's add a new columndf = df.withColumn("NewColumn",lit('Test'))df.show()
以下为输出结果:
+---+-----+---------+ | id|value|NewColumn| +---+-----+---------+ | 1| ABC| Test| | 2| DEF| Test| +---+-----+---------+
保存DataFrame到新的Hive表
使用以下代码将DataFrame保存于一个名为test_table2新的表中
# Save df to a new table in Hivedf.write.mode("overwrite").saveAsTable("test_db.test_table2")# Show the results using SELECTspark.sql("select * from test_db.test_table2").show()
通过日志,我可以明确新表的存储文件类型为Parquet:
Initialized Parquet WriteSupport with Catalyst schema: { "type" : "struct", "fields" : [ { "name" : "id", "type" : "long", "nullable" : true, "metadata" : { } }, { "name" : "value", "type" : "string", "nullable" : true, "metadata" : { "HIVE_TYPE_STRING" : "varchar(100)" } }, { "name" : "NewColumn", "type" : "string", "nullable" : false, "metadata" : { } } ] } and corresponding Parquet message type: message spark_schema { optional int64 id; optional binary value (UTF8); required binary NewColumn (UTF8); }
添加数据到现有的Hive表
您可以直接添加新的数据到现有表中 - 通过 ‘INSERT’ SQL语句或者‘append’写模式。
# Append data via SQLspark.sql("insert into test_db.test_table2 values (3, 'GHI', 'SQL INSERT')")spark.sql("select * from test_db.test_table2").show()# Append data via codedf = spark.sql("select 4 as id, 'JKL' as value, 'Spark Write Append Mode' as NewColumn")df.show()df.write.mode("append").saveAsTable("test_db.test_table2")spark.sql("select * from test_db.test_table2").show()
上面两条记录均被成功的插入到现有表中:
+---+-----+--------------------+ | id|value| NewColumn| +---+-----+--------------------+ | 4| JKL|Spark Write Appen...| | 1| ABC| Test| | 2| DEF| Test| | 3| GHI| SQL INSERT| +---+-----+--------------------+
完整的代码- hive-example.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
appName = "PySpark Hive Example"
master = "local"
# Create Spark session with Hive supported.
spark = SparkSession.builder \
.appName(appName) \
.master(master) \
.enableHiveSupport() \
.getOrCreate()
# Read data from Hive database test_db, table name: test_table.
df = spark.sql("select * from test_db.test_table")
df.show()
# Let's add a new column
df = df.withColumn("NewColumn",lit('Test'))
df.show()
# Save df to a new table in Hive
df.write.mode("overwrite").saveAsTable("test_db.test_table2")
# Show the results using SELECT
spark.sql("select * from test_db.test_table2").show()
# Append data via SQL
spark.sql("insert into test_db.test_table2 values (3, 'GHI', 'SQL INSERT')")
spark.sql("select * from test_db.test_table2").show()
# Append data via code
df = spark.sql("select 4 as id, 'JKL' as value, 'Spark Write Append Mode' as NewColumn")
df.show()
df.write.mode("append").saveAsTable("test_db.test_table2")
spark.sql("select * from test_db.test_table2").show()
希望您享受您的Spark之旅!