Spark provides flexible APIs to read data from various data sources, including Hive databases. The article Spark - Save DataFrame to Hive Table provides guidance on writing Spark DataFrames to Hive tables; this article provides examples of reading data from Hive using PySpark.
Prerequisites
Environment
- Spark - If you don't have a Spark environment yet, you can follow these articles to install Spark on your machine.
- Hive - Similarly, follow the Hive installation articles to install Hive.
Sample table
Create a sample Hive table using the following HQL (the first statement is only needed if the test_db database does not exist yet):
create database if not exists test_db;
create table test_db.test_table(id int, attr string);
insert into test_db.test_table(id, attr) values (1,'a'), (2,'b'), (3,'c');
The statements create a table with three records:
select * from test_db.test_table;
1 a
2 b
3 c
Read data from Hive
Now we can create a PySpark script (read-hive.py) to read from the Hive table.
from pyspark.sql import SparkSession
appName = "PySpark Example - Read Hive"
master = "local"
# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .enableHiveSupport() \
    .getOrCreate()
spark.sparkContext.setLogLevel("WARN")
# Create DF by reading from Hive
df = spark.sql("select * from test_db.test_table")
print(df.schema)
df.show()
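Note that Spark must be able to locate your Hive metastore. Typically this is done by placing hive-site.xml in Spark's conf directory; alternatively (a minimal sketch, not part of the original script), the metastore URI can be passed to the builder explicitly, using the thrift URI that appears in the connection logs below:
# Alternative session setup: point Spark at the Hive metastore explicitly.
# The hive.metastore.uris value is environment-specific; adjust it to your setup.
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .config("hive.metastore.uris", "thrift://127.0.0.1:9083") \
    .enableHiveSupport() \
    .getOrCreate()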
enableHiveSupport forces Spark to use the Hive metastore as its catalog instead of the default in-memory catalog. You will see logs of Spark connecting to the Hive metastore thrift service like the following:
2022-07-08T19:43:23,205 INFO [Thread-5] hive.metastore - Trying to connect to metastore with URI thrift://127.0.0.1:9083
2022-07-08T19:43:23,225 INFO [Thread-5] hive.metastore - Opened a connection to metastore, current connections: 1
2022-07-08T19:43:23,253 INFO [Thread-5] hive.metastore - Connected to metastore.
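To confirm which catalog implementation the session is actually using, you can add a quick check to the script; a minimal sketch:
# Prints 'hive' when Hive support is enabled, 'in-memory' otherwise
print(spark.conf.get("spark.sql.catalogImplementation"))
# List the databases visible through the Hive metastore
for db in spark.catalog.listDatabases():
    print(db.name)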
Run the script using the following command:
spark-submit read-hive.py
Output:
StructType([StructField('id', IntegerType(), True), StructField('attr', StringType(), True)])
+---+----+
| id|attr|
+---+----+
| 1| a|
| 2| b|
| 3| c|
+---+----+
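Besides spark.sql, the same table can also be loaded through the DataFrame API; a minimal sketch using the same test_db.test_table:
# Read the Hive table directly via the catalog and apply DataFrame transformations
df2 = spark.read.table("test_db.test_table")
df2.filter(df2.id > 1).show()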
With Hive support enabled, you can also use Hive built-in SQL functions that are not available as Spark SQL built-in functions.
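For example, Hive's histogram_numeric aggregate was only added to Spark SQL's own built-in functions in later Spark releases, but with Hive support it can be resolved through the Hive session catalog; a minimal sketch (the exact set of Hive-only functions depends on your Spark and Hive versions):
# histogram_numeric(col, nb) returns nb approximate (x, y) histogram bins for the column
spark.sql("select histogram_numeric(id, 2) as hist from test_db.test_table").show(truncate=False)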