Introduction to PySpark ArrayType and MapType
In Spark SQL, ArrayType and MapType are two of the complex data types supported by Spark. We can use them to define an array of elements or a dictionary. The element or dictionary value type can itself be any Spark SQL supported data type, i.e. we can build really complex data types by nesting types. They can be used to define a DataFrame's schema or as the return data type of a UDF.
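For instance, the element type of an array can itself be a map. The following minimal sketch (the field names `id` and `scores` are made up for illustration) nests MapType inside ArrayType:

```python
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, ArrayType, MapType)

# Illustrative schema: 'scores' is an array whose elements are maps
# from string keys to integer values, i.e. MapType nested inside ArrayType.
nested_schema = StructType([
    StructField('id', StringType(), nullable=False),
    StructField('scores', ArrayType(MapType(StringType(), IntegerType())), nullable=True),
])
```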
About DataType in Spark
The following table lists all the supported data types in Spark.
| Data type | Value type in Scala | API to access or create a data type |
|---|---|---|
| ByteType | Byte | ByteType |
| ShortType | Short | ShortType |
| IntegerType | Int | IntegerType |
| LongType | Long | LongType |
| FloatType | Float | FloatType |
| DoubleType | Double | DoubleType |
| DecimalType | java.math.BigDecimal | DecimalType |
| StringType | String | StringType |
| BinaryType | Array[Byte] | BinaryType |
| BooleanType | Boolean | BooleanType |
| TimestampType | java.sql.Timestamp | TimestampType |
| DateType | java.sql.Date | DateType |
| YearMonthIntervalType | java.time.Period | YearMonthIntervalType |
| DayTimeIntervalType | java.time.Duration | DayTimeIntervalType |
| ArrayType | scala.collection.Seq | ArrayType(elementType, [containsNull]) Note: The default value of containsNull is true. |
| MapType | scala.collection.Map | MapType(keyType, valueType, [valueContainsNull]) Note: The default value of valueContainsNull is true. |
| StructType | org.apache.spark.sql.Row | StructType(fields) Note: fields is a Seq of StructFields. Also, two fields with the same name are not allowed. |
| StructField | The value type in Scala of the data type of this field (for example, Int for a StructField with the data type IntegerType) | StructField(name, dataType, [nullable]) Note: The default value of nullable is true. |
*Cited from Data Types - Spark 3.3.0 Documentation.
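In PySpark, these defaults can be checked directly on the type objects. The snippet below is just a quick illustration of the `containsNull` and `valueContainsNull` defaults mentioned in the table:

```python
from pyspark.sql.types import ArrayType, MapType, StringType, IntegerType

arr = ArrayType(IntegerType())
print(arr.containsNull)        # True by default

m = MapType(StringType(), IntegerType())
print(m.valueContainsNull)     # True by default

# Both can be turned off explicitly when elements/values must not be null
arr_nn = ArrayType(IntegerType(), containsNull=False)
print(arr_nn.containsNull)     # False
```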
Use ArrayType
The following code snippet uses ArrayType to define column array_col as an array of integers, since the input data is a list of integers.
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType

app_name = "PySpark ArrayType and MapType Example"
master = "local"

spark = SparkSession.builder \
    .appName(app_name) \
    .master(master) \
    .getOrCreate()
spark.sparkContext.setLogLevel("WARN")

data = [['Hello Kontext!', [1, 2, 3, 4, 5]],
        ['Hello Context!', [100, 1000]]]

# Define the schema for the input data
schema = StructType([StructField('str_col', StringType(), nullable=False),
                     StructField('array_col', ArrayType(IntegerType()), nullable=False)])

# Create a DataFrame with the schema provided
df = spark.createDataFrame(data=data, schema=schema)
print(df.schema)
df.show()
```
Run the above PySpark script and the output will look like the following:
```
StructType([StructField('str_col', StringType(), False), StructField('array_col', ArrayType(IntegerType(), True), False)])
+--------------+---------------+
|       str_col|      array_col|
+--------------+---------------+
|Hello Kontext!|[1, 2, 3, 4, 5]|
|Hello Context!|    [100, 1000]|
+--------------+---------------+
```
If you want to explode or flatten the array column, follow this article PySpark DataFrame - explode Array and Map Columns.
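As a quick preview, and assuming the `df` created above, `explode` from `pyspark.sql.functions` turns each array element into its own row (the alias `element` is arbitrary):

```python
from pyspark.sql.functions import explode

# One output row per array element
df.select('str_col', explode('array_col').alias('element')).show()
```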
Use MapType
In the following example, let's just use MapType to define a UDF which returns a Python dictionary. If you want to define a column as MapType, the approach is similar to the ArrayType example.
```python
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, MapType
from pyspark.sql.functions import udf
import json

app_name = "PySpark ArrayType and MapType Example"
master = "local"

spark = SparkSession.builder \
    .appName(app_name) \
    .master(master) \
    .getOrCreate()
spark.sparkContext.setLogLevel("WARN")

data = [['Hello Kontext!', '{"a":1,"b":100}'],
        ['Hello Context!', '{"a":2,"b":200}']]

# Define the schema for the input data
schema = StructType([StructField('str_col', StringType(), nullable=False),
                     StructField('dict_str_col', StringType(), nullable=False)])

# Create a DataFrame with the schema provided
df = spark.createDataFrame(data=data, schema=schema)
print(df.schema)
df.show()

# Define a UDF that returns a map with StringType keys and IntegerType values
@udf(MapType(StringType(), IntegerType()))
def to_map(json_str):
    json_obj = json.loads(json_str)
    return {'a': int(json_obj['a']), 'b': int(json_obj['b'])}

df = df.withColumn('dict', to_map(df['dict_str_col']))
print(df.schema)
df.show()
```
Output:
```
StructType([StructField('str_col', StringType(), False), StructField('dict_str_col', StringType(), False)])
+--------------+---------------+
|       str_col|   dict_str_col|
+--------------+---------------+
|Hello Kontext!|{"a":1,"b":100}|
|Hello Context!|{"a":2,"b":200}|
+--------------+---------------+

StructType([StructField('str_col', StringType(), False), StructField('dict_str_col', StringType(), False), StructField('dict', MapType(StringType(), IntegerType(), True), True)])
+--------------+---------------+------------------+
|       str_col|   dict_str_col|              dict|
+--------------+---------------+------------------+
|Hello Kontext!|{"a":1,"b":100}|{a -> 1, b -> 100}|
|Hello Context!|{"a":2,"b":200}|{a -> 2, b -> 200}|
+--------------+---------------+------------------+
```
As you can see from the output, the derived column 'dict' is a StructField with MapType as its data type.
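Values in a MapType column can then be looked up by key, e.g. with bracket syntax or `Column.getItem`; a missing key yields null. A minimal sketch against the DataFrame above:

```python
# Look up map values by key; absent keys produce null, not an error
df.select(df['dict']['a'].alias('a'), df['dict'].getItem('b').alias('b')).show()
```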
I hope you now have a good understanding of how to use these types in Spark.