Kontext Copilot - An AI-powered assistant for data analytics that runs on your local computer. Learn more
Get started
PySpark: 将DataFrame中的JSON字符列转换为数组
insights Stats
warning Please login first to view stats information.
这篇文章将展示如果在Spark DataFrame中将一个JSON数组的字符串列转换为新的列。以下的示例代码均使用Spark 2.2.1;其它版本诸如 Spark 1.6.0 也可运行。
必要条件
如果您没有Spark的集群可以使用,您需要先在您的Windows或者其它系统中安装Spark。
*如果您使用Linux或者UNIX的系统,代码可以依然正常运行。
需求
将以下的串列数组转换为Spark DataFrame:
source = [{"attr_1": 1, "attr_2": "[{\"a\":1,\"b\":1},{\"a\":2,\"b\":2}]"}, {
"attr_1": 2, "attr_2": "[{\"a\":3,\"b\":3},{\"a\":4,\"b\":4}]"}]
生成的DataFrame需要包含两列:
- attr_1: 类型为 IntegerType
- attr_2: 类型为 ArrayType (数组元素是StructType类型并且包含两个类型为StructField的字段)
DataFrame的架构应如下所示:
root
|-- attr_1: long (nullable = true)
|-- attr_2: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: integer (nullable = false)
| | |-- b: integer (nullable = false)
解决方案
将串联转为DataFrame
首先我们使用以下代码将串列转换为Spark DataFrame:
# Read the list into data frame
df = sqlContext.read.json(sc.parallelize(source))
df.show()
df.printSchema()
JSON数据被读取到DataFrame对象中。输入结果如下:
+------+--------------------+ |attr_1| attr_2| +------+--------------------+ | 1|[{"a":1,"b":1},{"...| | 2|[{"a":3,"b":3},{"...| +------+--------------------+
root
|-- attr_1: long (nullable = true)
|-- attr_2: string (nullable = true)
数据集列attr_2 是字符串类型而不是串列。
创建Python自定义函数用于解析JSON
对于attr_2列,让我们创建一个函数来解析其包含的字符串并将其从JSON转换为串列。我们需要用到json包。
# Function to convert JSON array string to a list
import json
def parse_json(array_str):
json_obj = json.loads(array_str)
for item in json_obj:
yield (item["a"], item["b"])
定于列attr_2的架构
# Define the schema
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField
json_schema = ArrayType(StructType([StructField('a', IntegerType(
), nullable=False), StructField('b', IntegerType(), nullable=False)]))
根据以上的JSON字符串,我们将架构定义为一个结构体数组。
创建PySpark UDF
现在我们可以开始创建PySpark的自定义函数。我们需要用到之前定义的Python自定义函数 parse_json 以及架构变量 json_schema。
# Define udf
from pyspark.sql.functions import udf
udf_parse_json = udf(lambda str: parse_json(str), json_schema)
创建新的DataFrame
最后,我们将使用自定义的PySpark UDF来创建一个新的DataFrame:
# Generate a new data frame with the expected schema
df_new = df.select(df.attr_1, udf_parse_json(df.attr_2).alias("attr_2"))
df_new.show()
df_new.printSchema()
输出结果如下:
+------+--------------+ |attr_1| attr_2| +------+--------------+ | 1|[[1,1], [2,2]]| | 2|[[3,3], [4,4]]| +------+--------------+
root
|-- attr_1: long (nullable = true)
|-- attr_2: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- a: integer (nullable = false)
| | |-- b: integer (nullable = false)
总结
完整的代码如下:
from pyspark import SparkContext, SparkConf, SQLContext
appName = "JSON Parse Example"
master = "local[2]"
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
source = [{"attr_1": 1, "attr_2": "[{\"a\":1,\"b\":1},{\"a\":2,\"b\":2}]"}, {
"attr_1": 2, "attr_2": "[{\"a\":3,\"b\":3},{\"a\":4,\"b\":4}]"}]
# Read the list into data frame
df = sqlContext.read.json(sc.parallelize(source))
df.show()
df.printSchema()
# Function to convert JSON array string to a list
import json
def parse_json(array_str):
json_obj = json.loads(array_str)
for item in json_obj:
yield (item["a"], item["b"])
# Define the schema
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField
json_schema = ArrayType(StructType([StructField('a', IntegerType(
), nullable=False), StructField('b', IntegerType(), nullable=False)]))
# Define udf
from pyspark.sql.functions import udf
udf_parse_json = udf(lambda str: parse_json(str), json_schema)
# Generate a new data frame with the expected schema
df_new = df.select(df.attr_1, udf_parse_json(df.attr_2).alias("attr_2"))
df_new.show()
df_new.printSchema()
将以上代码存在文件 parse_json.py; 然后您可以使用以下命令行提交到Spark中执行:
spark-submit parse_json.py
以下截图为我本机的输出结果(版本为:Spark 2.2.1, Python 3.6.4):
本文英文版本:PySpark: Convert JSON String Column to Array of Object (StructType) in Data Frame。
copyright
This page is subject to Site terms.
comment Comments
No comments yet.