PySpark: 将DataFrame中的JSON字符列转换为数组

Raymond Tang Raymond Tang 0 1293 0.95 index 10/13/2021

这篇文章将展示如果在Spark DataFrame中将一个JSON数组的字符串列转换为新的列。以下的示例代码均使用Spark 2.2.1;其它版本诸如 Spark 1.6.0 也可运行。

必要条件

如果您没有Spark的集群可以使用,您需要先在您的Windows或者其它系统中安装Spark。

Install Spark 2.2.1 in Windows (英文版本)

*如果您使用Linux或者UNIX的系统,代码可以依然正常运行。

需求

将以下的串列数组转换为Spark DataFrame:

source = [{"attr_1": 1, "attr_2": "[{\"a\":1,\"b\":1},{\"a\":2,\"b\":2}]"}, {"attr_1": 2, "attr_2": "[{\"a\":3,\"b\":3},{\"a\":4,\"b\":4}]"}]

生成的DataFrame需要包含两列:

  • attr_1: 类型为 IntegerType
  • attr_2: 类型为 ArrayType (数组元素是StructType类型并且包含两个类型为StructField的字段)

DataFrame的架构应如下所示:

root  |-- attr_1: long (nullable = true)  |-- attr_2: array (nullable = true)  |    |-- element: struct (containsNull = true)  |    |    |-- a: integer (nullable = false)  |    |    |-- b: integer (nullable = false)

解决方案

将串联转为DataFrame

首先我们使用以下代码将串列转换为Spark DataFrame:

# Read the list into data framedf = sqlContext.read.json(sc.parallelize(source))df.show()df.printSchema()

JSON数据被读取到DataFrame对象中。输入结果如下:

+------+--------------------+
|attr_1|              attr_2|
+------+--------------------+
|     1|[{"a":1,"b":1},{"...|
|     2|[{"a":3,"b":3},{"...|
+------+--------------------+root  |-- attr_1: long (nullable = true)  |-- attr_2: string (nullable = true)

数据集列attr_2是字符串类型而不是串列。

创建Python自定义函数用于解析JSON

对于attr_2列,让我们创建一个函数来解析其包含的字符串并将其从JSON转换为串列。我们需要用到json包。

# Function to convert JSON array string to a listimport jsondef parse_json(array_str):json_obj = json.loads(array_str)for item in json_obj:   yield (item["a"], item["b"])

定于列attr\_2的架构

# Define the schemafrom pyspark.sql.types import ArrayType, IntegerType, StructType, StructFieldjson_schema = ArrayType(StructType([StructField('a', IntegerType(), nullable=False), StructField('b', IntegerType(), nullable=False)]))

根据以上的JSON字符串,我们将架构定义为一个结构体数组。

创建PySpark UDF

现在我们可以开始创建PySpark的自定义函数。我们需要用到之前定义的Python自定义函数 parse_json 以及架构变量json_schema。

# Define udffrom pyspark.sql.functions import udfudf_parse_json = udf(lambda str: parse_json(str), json_schema)

创建新的DataFrame

最后,我们将使用自定义的PySpark UDF来创建一个新的DataFrame:

# Generate a new data frame with the expected schemadf_new = df.select(df.attr_1, udf_parse_json(df.attr_2).alias("attr_2"))df_new.show()df_new.printSchema()

输出结果如下:

+------+--------------+
|attr_1|        attr_2|
+------+--------------+
|     1|[[1,1], [2,2]]|
|     2|[[3,3], [4,4]]|
+------+--------------+root  |-- attr_1: long (nullable = true)  |-- attr_2: array (nullable = true)  |    |-- element: struct (containsNull = true)  |    |    |-- a: integer (nullable = false)  |    |    |-- b: integer (nullable = false)

总结

完整的代码如下:

from pyspark import SparkContext, SparkConf, SQLContextappName = "JSON Parse Example"master = "local[2]"conf = SparkConf().setAppName(appName).setMaster(master)sc = SparkContext(conf=conf)sqlContext = SQLContext(sc)source = [{"attr_1": 1, "attr_2": "[{\"a\":1,\"b\":1},{\"a\":2,\"b\":2}]"}, {"attr_1": 2, "attr_2": "[{\"a\":3,\"b\":3},{\"a\":4,\"b\":4}]"}]# Read the list into data framedf = sqlContext.read.json(sc.parallelize(source))df.show()df.printSchema()# Function to convert JSON array string to a listimport jsondef parse_json(array_str):json_obj = json.loads(array_str)for item in json_obj:yield (item["a"], item["b"])# Define the schemafrom pyspark.sql.types import ArrayType, IntegerType, StructType, StructFieldjson_schema = ArrayType(StructType([StructField('a', IntegerType(), nullable=False), StructField('b', IntegerType(), nullable=False)]))# Define udffrom pyspark.sql.functions import udfudf_parse_json = udf(lambda str: parse_json(str), json_schema)# Generate a new data frame with the expected schemadf_new = df.select(df.attr_1, udf_parse_json(df.attr_2).alias("attr_2"))df_new.show()df_new.printSchema()

将以上代码存在文件 parse_json.py; 然后您可以使用以下命令行提交到Spark中执行:

spark-submit parse_json.py

以下截图为我本机的输出结果(版本为:Spark 2.2.1, Python 3.6.4):

https://api.kontext.tech/resource/1502b379-5b0d-5eb6-9f4c-93b878385a2b

本文英文版本:PySpark: Convert JSON String Column to Array of Object (StructType) in Data Frame

python spark

Join the Discussion

View or add your thoughts below

Comments