PySpark - Convert Python Array/List to Spark DataFrame
In Spark, the function SparkContext.parallelize can be used to convert a Python list into an RDD, which can then be converted into a DataFrame. The sample code below is based on Spark 2.x.
In this article, I will show how to convert the following list into a DataFrame:
data = [('Category A', 100, "This is category A"), ('Category B', 120, "This is category B"), ('Category C', 150, "This is category C")]
Import the types
First, we need to import the types we are going to use:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
Since each element of the example list contains two strings and one integer, we import StringType and IntegerType to define the DataFrame's schema.
Create a SparkSession
Use the following code to create a SparkSession instance:
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType

appName = "PySpark Example - Python Array/List to Spark Data Frame"
master = "local"

# Create SparkSession
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()
Define the schema of the DataFrame
We can define the schema of the DataFrame according to the data types in the Python list.
schema = StructType([
    StructField('Category', StringType(), True),
    StructField('Count', IntegerType(), True),
    StructField('Description', StringType(), True)
])
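As a side note, newer Spark releases (2.3 and later) also accept the schema as a DDL-formatted string instead of a StructType; if you are on such a version, the following sketch is equivalent:

# Equivalent schema as a DDL-formatted string (assumes Spark 2.3 or later)
ddl_schema = "Category STRING, Count INT, Description STRING"

Either form can be passed as the second argument of createDataFrame.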
Convert the list to a DataFrame
We can convert the list into an RDD via the parallelize function, and then create the DataFrame from the RDD:
# Convert list to RDD
rdd = spark.sparkContext.parallelize(data)

# Create data frame
df = spark.createDataFrame(rdd, schema)
print(df.schema)
df.show()
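Alternatively, once a SparkSession is active, an RDD of tuples also exposes a toDF method. A minimal sketch that passes only column names and lets Spark infer the types:

# Alternative: toDF with column names only; column types are inferred
df2 = rdd.toDF(['Category', 'Count', 'Description'])
df2.printSchema()

Note that Python integers are inferred as LongType rather than IntegerType, so pass an explicit schema when the exact types matter.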
Complete code
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType

appName = "PySpark Example - Python Array/List to Spark Data Frame"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

# List
data = [('Category A', 100, "This is category A"),
        ('Category B', 120, "This is category B"),
        ('Category C', 150, "This is category C")]

# Create a schema for the dataframe
schema = StructType([
    StructField('Category', StringType(), True),
    StructField('Count', IntegerType(), True),
    StructField('Description', StringType(), True)
])

# Convert list to RDD
rdd = spark.sparkContext.parallelize(data)

# Create data frame
df = spark.createDataFrame(rdd, schema)
print(df.schema)
df.show()
Output
StructType(List(StructField(Category,StringType,true),StructField(Count,IntegerType,true),StructField(Description,StringType,true)))
+----------+-----+------------------+
|  Category|Count|       Description|
+----------+-----+------------------+
|Category A|  100|This is category A|
|Category B|  120|This is category B|
|Category C|  150|This is category C|
+----------+-----+------------------+
Summary
For Python objects, we can first convert them into an RDD and then use the SparkSession.createDataFrame function to convert them into a DataFrame.
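In fact, the explicit parallelize step is optional for a small local list like the one above, since createDataFrame also accepts a Python list directly:

# createDataFrame also accepts the Python list directly; no explicit RDD needed
df = spark.createDataFrame(data, schema)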
The following data types can be used to define the schema of a Spark DataFrame (a short sketch using the composite types follows the list):
NullType
StringType
BinaryType
BooleanType
DateType
TimestampType
DecimalType
DoubleType
FloatType
ByteType
IntegerType
LongType
ShortType
ArrayType
MapType
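For instance, ArrayType and MapType at the end of the list allow nested columns. A minimal sketch, using hypothetical column names and data purely for illustration:

from pyspark.sql.types import ArrayType, IntegerType, MapType, StringType, StructField, StructType

# Hypothetical schema with an array column and a map column
nested_schema = StructType([
    StructField('Name', StringType(), True),
    StructField('Scores', ArrayType(IntegerType()), True),
    StructField('Attributes', MapType(StringType(), StringType()), True)
])

nested_data = [('Item A', [1, 2, 3], {'color': 'red'})]
spark.createDataFrame(nested_data, nested_schema).show()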
For more information, please refer to the official documentation: pyspark.sql module.