Raymond Raymond

PySpark: 将DataFrame中的JSON字符列转换为数组

event 2021-10-13 visibility 1,055 comment 0 insights toc
more_vert
insights Stats

这篇文章将展示如果在Spark DataFrame中将一个JSON数组的字符串列转换为新的列。以下的示例代码均使用Spark 2.2.1;其它版本诸如 Spark 1.6.0 也可运行。

必要条件

如果您没有Spark的集群可以使用,您需要先在您的Windows或者其它系统中安装Spark。

Install Spark 2.2.1 in Windows (英文版本)

*如果您使用Linux或者UNIX的系统,代码可以依然正常运行。

需求

将以下的串列数组转换为Spark DataFrame:

source = [{"attr_1": 1, "attr_2": "[{\"a\":1,\"b\":1},{\"a\":2,\"b\":2}]"}, {
"attr_1": 2, "attr_2": "[{\"a\":3,\"b\":3},{\"a\":4,\"b\":4}]"}]

生成的DataFrame需要包含两列:

  • attr_1: 类型为 IntegerType
  • attr_2: 类型为 ArrayType (数组元素是StructType类型并且包含两个类型为StructField的字段)

DataFrame的架构应如下所示:

root
  |-- attr_1: long (nullable = true)
  |-- attr_2: array (nullable = true)
  |    |-- element: struct (containsNull = true)
  |    |    |-- a: integer (nullable = false)
  |    |    |-- b: integer (nullable = false)

解决方案

将串联转为DataFrame

首先我们使用以下代码将串列转换为Spark DataFrame:

# Read the list into data frame
df = sqlContext.read.json(sc.parallelize(source))
df.show()
df.printSchema()

JSON数据被读取到DataFrame对象中。输入结果如下:

+------+--------------------+
|attr_1|              attr_2|
+------+--------------------+
|     1|[{"a":1,"b":1},{"...|
|     2|[{"a":3,"b":3},{"...|
+------+--------------------+
root
  |-- attr_1: long (nullable = true)
  |-- attr_2: string (nullable = true)

数据集列attr_2 是字符串类型而不是串列。

创建Python自定义函数用于解析JSON

对于attr_2列,让我们创建一个函数来解析其包含的字符串并将其从JSON转换为串列。我们需要用到json包。

# Function to convert JSON array string to a list
import json
def parse_json(array_str):
json_obj = json.loads(array_str)
for item in json_obj:
yield (item["a"], item["b"])

定于列attr_2的架构

# Define the schema
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField
json_schema = ArrayType(StructType([StructField('a', IntegerType(
), nullable=False), StructField('b', IntegerType(), nullable=False)]))

根据以上的JSON字符串,我们将架构定义为一个结构体数组。

创建PySpark UDF

现在我们可以开始创建PySpark的自定义函数。我们需要用到之前定义的Python自定义函数 parse_json 以及架构变量 json_schema。

# Define udf
from pyspark.sql.functions import udf
udf_parse_json = udf(lambda str: parse_json(str), json_schema)

创建新的DataFrame

最后,我们将使用自定义的PySpark UDF来创建一个新的DataFrame:

# Generate a new data frame with the expected schema
df_new = df.select(df.attr_1, udf_parse_json(df.attr_2).alias("attr_2"))
df_new.show()
df_new.printSchema()

输出结果如下:

+------+--------------+
|attr_1|        attr_2|
+------+--------------+
|     1|[[1,1], [2,2]]|
|     2|[[3,3], [4,4]]|
+------+--------------+
root
  |-- attr_1: long (nullable = true)
  |-- attr_2: array (nullable = true)
  |    |-- element: struct (containsNull = true)
  |    |    |-- a: integer (nullable = false)
  |    |    |-- b: integer (nullable = false)

总结

完整的代码如下:

from pyspark import SparkContext, SparkConf, SQLContext
appName = "JSON Parse Example"
master = "local[2]"
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
source = [{"attr_1": 1, "attr_2": "[{\"a\":1,\"b\":1},{\"a\":2,\"b\":2}]"}, {
"attr_1": 2, "attr_2": "[{\"a\":3,\"b\":3},{\"a\":4,\"b\":4}]"}]
# Read the list into data frame
df = sqlContext.read.json(sc.parallelize(source))
df.show()
df.printSchema()
# Function to convert JSON array string to a list
import json
def parse_json(array_str):
json_obj = json.loads(array_str)
for item in json_obj:
yield (item["a"], item["b"])
# Define the schema
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField
json_schema = ArrayType(StructType([StructField('a', IntegerType(
), nullable=False), StructField('b', IntegerType(), nullable=False)]))
# Define udf
from pyspark.sql.functions import udf
udf_parse_json = udf(lambda str: parse_json(str), json_schema)
# Generate a new data frame with the expected schema
df_new = df.select(df.attr_1, udf_parse_json(df.attr_2).alias("attr_2"))
df_new.show()
df_new.printSchema()

将以上代码存在文件 parse_json.py; 然后您可以使用以下命令行提交到Spark中执行:

spark-submit parse_json.py

以下截图为我本机的输出结果(版本为:Spark 2.2.1, Python 3.6.4):

image

本文英文版本:PySpark: Convert JSON String Column to Array of Object (StructType) in Data Frame

More from Kontext
comment Comments
No comments yet.

Please log in or register to comment.

account_circle Log in person_add Register

Log in with external accounts