Introduction to PySpark ArrayType and MapType

Kontext, 2022-08-18

In Spark SQL, ArrayType and MapType are two of the supported complex data types. They define an array of elements and a dictionary (map) of key-value pairs, respectively. The array element type and the map value type can themselves be any data type Spark SQL supports, so types can be nested to build quite complex structures. Both can be used in a DataFrame schema or as the return type of a UDF.

About DataType in Spark

The following table lists the data types supported in Spark, together with the corresponding Scala value type and the API used to access or create each type.

Data type | Value type in Scala | API to access or create a data type
ByteType | Byte | ByteType
ShortType | Short | ShortType
IntegerType | Int | IntegerType
LongType | Long | LongType
FloatType | Float | FloatType
DoubleType | Double | DoubleType
DecimalType | java.math.BigDecimal | DecimalType
StringType | String | StringType
BinaryType | Array[Byte] | BinaryType
BooleanType | Boolean | BooleanType
TimestampType | java.sql.Timestamp | TimestampType
DateType | java.sql.Date | DateType
YearMonthIntervalType | java.time.Period | YearMonthIntervalType
DayTimeIntervalType | java.time.Duration | DayTimeIntervalType
ArrayType | scala.collection.Seq | ArrayType(elementType, [containsNull]). Note: The default value of containsNull is true.
MapType | scala.collection.Map | MapType(keyType, valueType, [valueContainsNull]). Note: The default value of valueContainsNull is true.
StructType | org.apache.spark.sql.Row | StructType(fields). Note: fields is a Seq of StructFields. Also, two fields with the same name are not allowed.
StructField | The value type in Scala of the data type of this field (for example, Int for a StructField with the data type IntegerType) | StructField(name, dataType, [nullable]). Note: The default value of nullable is true.

*Cited from Data Types - Spark 3.3.0 Documentation.

Use ArrayType

The following code snippet uses ArrayType to define the column array_col as an array of integers, because each input value for that column is a list of integers.

Info - Spark can infer the schema from most data sources. An explicit schema definition can be used to ensure that the input data matches your target schema.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

app_name = "PySpark ArrayType and MapType Example"
master = "local"

spark = SparkSession.builder \
    .appName(app_name) \
    .master(master) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

data = [['Hello Kontext!', [1, 2, 3, 4, 5]],
        ['Hello Context!', [100, 1000]]]

# Define the schema for the input data
schema = StructType([StructField('str_col', StringType(), nullable=False),
                     StructField('array_col', ArrayType(IntegerType()), nullable=False)])

# Create a DataFrame with the schema provided
df = spark.createDataFrame(data=data, schema=schema)

print(df.schema)

df.show()

Running the above PySpark script produces output like the following:

StructType([StructField('str_col', StringType(), False), StructField('array_col', ArrayType(IntegerType(), True), False)])

+--------------+---------------+
|       str_col|      array_col|
+--------------+---------------+
|Hello Kontext!|[1, 2, 3, 4, 5]|
|Hello Context!|    [100, 1000]|
+--------------+---------------+

If you want to explode or flatten the array column, see the article PySpark DataFrame - explode Array and Map Columns.

Use MapType

In the following example, MapType is used to define a UDF that returns a Python dictionary. To define a column as MapType directly, the approach is similar to the ArrayType example.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType
from pyspark.sql.functions import udf
import json
app_name = "PySpark ArrayType and MapType Example"
master = "local"

spark = SparkSession.builder \
    .appName(app_name) \
    .master(master) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

data = [['Hello Kontext!', '{"a":1,"b":100}'],
        ['Hello Context!', '{"a":2,"b":200}']]

# Define the schema for the input data
schema = StructType([StructField('str_col', StringType(), nullable=False),
                     StructField('dict_str_col', StringType(), nullable=False)])

# Create a DataFrame with the schema provided
df = spark.createDataFrame(data=data, schema=schema)

print(df.schema)

df.show()

# Define a udf that returns key as StringType and value as IntegerType


@udf(MapType(StringType(), IntegerType()))
def to_map(json_str):
    json_obj = json.loads(json_str)
    # Use a name other than `map` to avoid shadowing the Python builtin
    result = {}
    result['a'] = int(json_obj["a"])
    result['b'] = int(json_obj["b"])
    return result


df = df.withColumn('dict', to_map(df['dict_str_col']))
print(df.schema)
df.show()

Output:

StructType([StructField('str_col', StringType(), False), StructField('dict_str_col', StringType(), False)])
+--------------+---------------+
|       str_col|   dict_str_col|
+--------------+---------------+
|Hello Kontext!|{"a":1,"b":100}|
|Hello Context!|{"a":2,"b":200}|
+--------------+---------------+

StructType([StructField('str_col', StringType(), False), StructField('dict_str_col', StringType(), False), StructField('dict', MapType(StringType(), IntegerType(), True), True)])
+--------------+---------------+------------------+
|       str_col|   dict_str_col|              dict|
+--------------+---------------+------------------+
|Hello Kontext!|{"a":1,"b":100}|{a -> 1, b -> 100}|
|Hello Context!|{"a":2,"b":200}|{a -> 2, b -> 200}|
+--------------+---------------+------------------+

As the output shows, the derived column 'dict' is a StructField whose type is MapType, with StringType keys and IntegerType values.

I hope you now have a good understanding of how to use these types in Spark.

References

Introduction to PySpark StructType and StructField
