Create Spark Indexes via Hyperspace

Raymond Raymond event 2021-12-22 visibility 674
more_vert
Create Spark Indexes via Hyperspace

What? yes you heard it right, you can also create indexes for your Spark DataFrame objects. 

About Hyperspace

Hyperspace is a simple system that allows users to build, maintain and leverage indexes automagically for query/workload acceleration. This project is lead by Microsoft. The project locates at https://microsoft.github.io/hyperspace/ and source code is available on GitHub.

Currently this project supports for 2.4.* and the support for Apache Spark 3.0 is on the way.

Architectural overview

The following diagram illustrates the overview of Hyperspace index architecture:

20211222225856-image.png

*Image from https://microsoft.github.io/hyperspace/docs/toh-overview/ 

Benefits

The obvious benefit is to accelerate analytical workloads by utilizing indexes to save resource consumption as traditional relational databases do. It will then save cost on large data sets too (PBs). 

This article uses PySpark to show you some examples of creating indexes. It also supports Scala and .NET for Spark.

Install PySpark

To save time, I will use my Azure Cloud Shell to install PySpark and then run Spark application with Hyperspace package.

Run the following command to install PySpark:

pip install pyspark==2.4.2

Wait until the package is installed successfully.

Create a PySpark script

Create a PySpark script named pyspark-hyperspace.py with the following content:

from pyspark.sql import SparkSession
from hyperspace import Hyperspace, IndexConfig
appName = "PySpark Hyperspace Spark Indexes Example"
master = "local"
# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()
# Sample data
data = [{"Category": 'Category A', "ID": 1, "Value": 12.40},
        {"Category": 'Category B', "ID": 2, "Value": 30.10},
        {"Category": 'Category C', "ID": 3, "Value": 100.01}]
# Create dataframe and save it as Parquet file
spark.createDataFrame(data).write.mode("overwrite").parquet("table")
df = spark.read.parquet("table")
# Create Hyperspace object
hs = Hyperspace(spark)
# Create an index on column ID with Value column included
hs.createIndex(df, IndexConfig("index", ["ID"], ["Value"]))
# Show indexes
indexes = hs.indexes()
indexes.show()
# Refreshes the given index if the source data changes.
hs.refreshIndex("index")
# Soft-deletes the given index and does not physically remove it from filesystem.
hs.deleteIndex("index")
# Restores the soft-deleted index.
hs.restoreIndex("index")
# Use index to query data
query = df.filter("""ID = 1""").select("""Value""")
# Explain the query plan to see if the index will be used.
hs.explain(query, verbose = True)
# Enable Hyperspace and execute the query
Hyperspace.enable(spark)
query.show()
# Soft-delete the given index for vacuum.
hs.deleteIndex("index")
# Hard-delete the given index and physically remove it from filesystem.
hs.vacuumIndex("index")

The comments in the script already explain the details very well. There are a few items I want to call out:

  • The import statements import the required packages for Hyperspace.
  • When creating indexes, we use IndexConfig class and there are three parameters used: index name, columns for the index and included columns. This is similar as a covering index in SQL Server or other relational databases. 

Run the script

Run the script using the following command:

spark-submit --packages com.microsoft.hyperspace:hyperspace-core_2.12:0.4.0 ./pyspark-hyperspace.py

Make sure the Scala version is correct otherwise you may encounter the following error:

py4j.protocol.Py4JJavaError: An error occurred while calling None.com.microsoft.hyperspace.index.IndexConfig.
: java.lang.NoClassDefFoundError: scala/Product$class
        at com.microsoft.hyperspace.index.IndexConfig.<init>(IndexConfig.scala:28)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:238)
        at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
        at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassNotFoundException: scala.Product$class
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
        ... 12 more

You can reproduce this error by changing Scala version to 2.11 in the submit command. 

The results

Show indexes result

The indexes are shown as the following text output:

+-----+--------------+---------------+----------+--------------------+--------------------+------+
| name|indexedColumns|includedColumns|numBuckets| schema| indexLocation| state|
+-----+--------------+---------------+----------+--------------------+--------------------+------+
|index| [ID]| [Value]| 200|{"type":"struct",...|file:/home/*****_...|ACTIVE|
+-----+--------------+---------------+----------+--------------------+--------------------+------+

As shown in the above table, there is one index named index created. It also shows the index locations. As I'm using local file storage (not HDFS, Azure Data Lake Storage, S3 or Google Cloud Storage), the index is also stored locally. 

Execution plan

As the following screenshot shows, the execution plan will print out two plans (one with index and another without):

20211222234814-image.png

It also highlights out the differences in physical operator stats. You can clearly see that there is one parquet file scan less compared with the plan without Hyperspace enabled.

Summary

Hyperspace and improve your workload performance and save cost though it will incur extra cost when creating the index files and also to store the index files. I would recommend applying it in your data lake storage to accelerate query speed and save cost especially if you are dealing with PB size of data and only query a few commonly queried columns. You can try it out and to see if it actually saves cost for your scenario. Have fun!

More from Kontext
comment Comments
No comments yet.

Please log in or register to comment.

account_circle Log in person_add Register

Log in with external accounts