Create Spark Indexes via Hyperspace
What? Yes, you heard it right: you can also create indexes for your Spark DataFrame objects.
About Hyperspace
Hyperspace is a simple system that allows users to build, maintain, and leverage indexes automagically for query/workload acceleration. The project is led by Microsoft; its site is at https://microsoft.github.io/hyperspace/ and the source code is available on GitHub.
Currently the project supports Apache Spark 2.4.*, and support for Apache Spark 3.0 is on the way.
Architectural overview
The following diagram illustrates the overview of Hyperspace index architecture:
*Image from https://microsoft.github.io/hyperspace/docs/toh-overview/
Benefits
The obvious benefit is accelerating analytical workloads: indexes reduce resource consumption, just as they do in traditional relational databases. On large data sets (PBs), this also translates into cost savings.
This article uses PySpark to show some examples of creating indexes; Hyperspace also supports Scala and .NET for Spark.
Install PySpark
To save time, I will use my Azure Cloud Shell to install PySpark and then run a Spark application with the Hyperspace package.
Run the following command to install PySpark:
pip install pyspark==2.4.2
Wait until the package is installed successfully.
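Before moving on, you can quickly confirm the installation by printing the installed version from Python (a minimal sanity check; the version should match the one installed above):
# Quick sanity check of the PySpark installation
import pyspark
print(pyspark.__version__)  # expect 2.4.2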
Create a PySpark script
Create a PySpark script named pyspark-hyperspace.py with the following content:
from pyspark.sql import SparkSession
from hyperspace import Hyperspace, IndexConfig
appName = "PySpark Hyperspace Spark Indexes Example"
master = "local"
# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()
# Sample data
data = [{"Category": 'Category A', "ID": 1, "Value": 12.40},
        {"Category": 'Category B', "ID": 2, "Value": 30.10},
        {"Category": 'Category C', "ID": 3, "Value": 100.01}]
# Create dataframe and save it as Parquet file
spark.createDataFrame(data).write.mode("overwrite").parquet("table")
df = spark.read.parquet("table")
# Create Hyperspace object
hs = Hyperspace(spark)
# Create an index on column ID with Value column included
hs.createIndex(df, IndexConfig("index", ["ID"], ["Value"]))
# Show indexes
indexes = hs.indexes()
indexes.show()
# Refreshes the given index if the source data changes.
hs.refreshIndex("index")
# Soft-deletes the given index and does not physically remove it from filesystem.
hs.deleteIndex("index")
# Restores the soft-deleted index.
hs.restoreIndex("index")
# Use index to query data
query = df.filter("ID = 1").select("Value")
# Explain the query plan to see if the index will be used.
hs.explain(query, verbose = True)
# Enable Hyperspace and execute the query
Hyperspace.enable(spark)
query.show()
# Soft-delete the given index for vacuum.
hs.deleteIndex("index")
# Hard-delete the given index and physically remove it from filesystem.
hs.vacuumIndex("index")
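In this small example refreshIndex has nothing new to pick up because the source data never changes. To see it do something meaningful, you could append extra rows to the source table and then refresh; the snippet below is a hypothetical variation you could run right after creating the index (the extra row is made up for illustration):
# Hypothetical: append a new row to the source Parquet table...
new_data = [{"Category": 'Category D', "ID": 4, "Value": 7.77}]
spark.createDataFrame(new_data).write.mode("append").parquet("table")
# ...then rebuild the index so it reflects the appended data
hs.refreshIndex("index")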
The comments in the script already explain the details very well. There are a few items I want to call out:
- The import statements import the required packages for Hyperspace.
- When creating indexes, we use the IndexConfig class with three parameters: the index name, the indexed columns, and the included columns. This is similar to a covering index in SQL Server or other relational databases; see the sketch after this list.
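For instance, here is a minimal sketch of a "covering"-style configuration (the index name and column choice below are my own illustration, not part of the original script):
# Index on Category and ID, with Value included, so a query that filters
# on Category/ID and selects only Value can be served from the index data
hs.createIndex(df, IndexConfig("covering_index", ["Category", "ID"], ["Value"]))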
Run the script
Run the script using the following command:
spark-submit --packages com.microsoft.hyperspace:hyperspace-core_2.12:0.4.0 ./pyspark-hyperspace.py
Make sure the Scala version is correct; otherwise you may encounter the following error:
py4j.protocol.Py4JJavaError: An error occurred while calling None.com.microsoft.hyperspace.index.IndexConfig.
: java.lang.NoClassDefFoundError: scala/Product$class
at com.microsoft.hyperspace.index.IndexConfig.<init>(IndexConfig.scala:28)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassNotFoundException: scala.Product$class
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 12 more
You can reproduce this error by changing the Scala version to 2.11 in the submit command.
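If you are unsure which Scala version your Spark build uses, one quick way to check from PySpark is to ask the JVM directly (this goes through the internal py4j gateway, so treat it as a debugging trick rather than a public API):
# Print the Scala version of the running Spark JVM, e.g. "version 2.12.8"
print(spark.sparkContext._jvm.scala.util.Properties.versionString())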
The results
Show indexes result
The indexes are shown as the following text output:
+-----+--------------+---------------+----------+--------------------+--------------------+------+
| name|indexedColumns|includedColumns|numBuckets| schema| indexLocation| state|
+-----+--------------+---------------+----------+--------------------+--------------------+------+
|index| [ID]| [Value]| 200|{"type":"struct",...|file:/home/*****_...|ACTIVE|
+-----+--------------+---------------+----------+--------------------+--------------------+------+
As shown in the above table, one index named index has been created. The output also shows the index location. As I'm using local file storage (not HDFS, Azure Data Lake Storage, S3 or Google Cloud Storage), the index is also stored locally.
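Since hs.indexes() returns a regular DataFrame, you can also query this metadata directly, for example to list only active indexes and where their files live (a small sketch using the columns shown above):
# List active indexes with their storage location
hs.indexes().filter("state = 'ACTIVE'") \
    .select("name", "indexLocation") \
    .show(truncate=False)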
Execution plan
As the following screenshot shows, the execution plan output prints two plans (one with the index and one without):
It also highlights the differences in physical operator stats. You can clearly see that there is one fewer Parquet file scan compared with the plan without Hyperspace enabled.
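If you want to eyeball the difference yourself, you can also print the physical plan before and after enabling Hyperspace in a fresh session (a minimal sketch; with Hyperspace enabled, the file scan should point at the index location rather than the original table):
# Physical plan against the original Parquet table
query.explain()
# Enable Hyperspace so the optimizer can rewrite the plan to use the index
Hyperspace.enable(spark)
# Physical plan after the rewrite; the scan should now read the index files
query.explain()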
Summary
Hyperspace can improve your workload performance and save cost, though it does incur extra cost to create and store the index files. I would recommend applying it in your data lake storage to accelerate queries and save cost, especially if you are dealing with PB-scale data and commonly query only a few columns. Try it out and see whether it actually saves cost for your scenario. Have fun!