PySpark - Convert a Python Array or List to a Spark DataFrame

Raymond, 2021-10-13


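In Spark, the SparkContext.parallelize function can be used to convert a Python list into an RDD, which can then be converted into a DataFrame. The sample code below is based on Spark 2.x.

In this article, I will show how to convert the following list into a DataFrame: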

data = [('Category A', 100, "This is category A"),
        ('Category B', 120, "This is category B"),
        ('Category C', 150, "This is category C")]

Import types

First, we import the types we need:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

Because each element of the sample list contains two strings and one integer, we import StringType and IntegerType to define the DataFrame's schema.
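As a quick sanity check before writing a schema, the element types of each row can be inspected with plain Python. This is only a sketch; the helper name `row_type_names` is hypothetical and not part of PySpark.

```python
# Sample list from this article
data = [('Category A', 100, "This is category A"),
        ('Category B', 120, "This is category B"),
        ('Category C', 150, "This is category C")]


def row_type_names(row):
    """Return the Python type name of each element in a row (hypothetical helper)."""
    return tuple(type(value).__name__ for value in row)


# Collect the distinct type signatures found across all rows
signatures = {row_type_names(row) for row in data}
print(signatures)
```

If the set contains exactly one signature, every row has the same shape, and a single StructType with matching field types should describe the whole list.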

Create SparkSession

Use the following code to create a SparkSession instance:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

appName = "PySpark Example - Python Array/List to Spark Data Frame"
master = "local"

# Create SparkSession
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

Define the DataFrame schema

We can define the DataFrame schema according to the data types in the Python list.

schema = StructType([
    StructField('Category', StringType(), True),
    StructField('Count', IntegerType(), True),
    StructField('Description', StringType(), True)
])

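Convert the list to a DataFrame

We can convert the list to an RDD with the parallelize function, and then create the DataFrame from that RDD and the schema: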

# Convert list to RDD
rdd = spark.sparkContext.parallelize(data)

# Create data frame
df = spark.createDataFrame(rdd,schema)
print(df.schema)
df.show()

Complete code

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

appName = "PySpark Example - Python Array/List to Spark Data Frame"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

# List
data = [('Category A', 100, "This is category A"),
        ('Category B', 120, "This is category B"),
        ('Category C', 150, "This is category C")]

# Create a schema for the dataframe
schema = StructType([
    StructField('Category', StringType(), True),
    StructField('Count', IntegerType(), True),
    StructField('Description', StringType(), True)
])

# Convert list to RDD
rdd = spark.sparkContext.parallelize(data)

# Create data frame
df = spark.createDataFrame(rdd,schema)
print(df.schema)
df.show()

Output

StructType(List(StructField(Category,StringType,true),StructField(Count,IntegerType,true),StructField(Description,StringType,true)))
+----------+-----+------------------+
|  Category|Count|       Description|
+----------+-----+------------------+
|Category A|  100|This is category A|
|Category B|  120|This is category B|
|Category C|  150|This is category C|
+----------+-----+------------------+

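Summary

For Python objects, we can first convert them to an RDD and then convert that RDD to a DataFrame with the SparkSession.createDataFrame function.

The following data types can be used to define the schema of a Spark DataFrame: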

  • NullType
  • StringType
  • BinaryType
  • BooleanType
  • DateType
  • TimestampType
  • DecimalType
  • DoubleType
  • FloatType
  • ByteType
  • IntegerType
  • LongType
  • ShortType
  • ArrayType
  • MapType

For more information, refer to the official documentation: pyspark.sql module
