PySpark: Convert Python Array/List to Spark Data Frame

Raymond, 2019-07-10

In Spark, the SparkContext.parallelize function can be used to convert a Python list to an RDD, and the RDD can then be converted to a DataFrame object. The following sample code is based on Spark 2.x.

This page shows how to convert the following list to a data frame:

data = [('Category A', 100, "This is category A"),
        ('Category B', 120, "This is category B"),
        ('Category C', 150, "This is category C")]

Import types

First, let’s import the data types we need for the data frame.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

We import StringType and IntegerType because each record in the sample data has three attributes: two strings and one integer.

Create Spark session

Create a Spark session using the following code:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

appName = "PySpark Example - Python Array/List to Spark Data Frame"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

Define the schema

Let’s now define a schema for the data frame based on the structure of the Python list.

# Create a schema for the dataframe
schema = StructType([
    StructField('Category', StringType(), True),
    StructField('Count', IntegerType(), True),
    StructField('Description', StringType(), True)
])

Convert the list to data frame

The list can be converted to an RDD through the parallelize function, and the RDD is then passed to createDataFrame together with the schema:

# Convert list to RDD
rdd = spark.sparkContext.parallelize(data)

# Create data frame
df = spark.createDataFrame(rdd, schema)
print(df.schema)
df.show()

Complete script

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType

appName = "PySpark Example - Python Array/List to Spark Data Frame"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

# List
data = [('Category A', 100, "This is category A"),
        ('Category B', 120, "This is category B"),
        ('Category C', 150, "This is category C")]

# Create a schema for the dataframe
schema = StructType([
    StructField('Category', StringType(), True),
    StructField('Count', IntegerType(), True),
    StructField('Description', StringType(), True)
])

# Convert list to RDD
rdd = spark.sparkContext.parallelize(data)

# Create data frame
df = spark.createDataFrame(rdd, schema)
print(df.schema)
df.show()

Sample output

StructType(List(StructField(Category,StringType,true),StructField(Count,IntegerType,true),StructField(Description,StringType,true)))
+----------+-----+------------------+
|  Category|Count|       Description|
+----------+-----+------------------+
|Category A|  100|This is category A|
|Category B|  120|This is category B|
|Category C|  150|This is category C|
+----------+-----+------------------+

Summary

For Python objects, we can convert them to an RDD first and then use the SparkSession.createDataFrame function to create a data frame from the RDD.

The following data types can be used as field types when defining the schema (a StructType can also be nested as a field type):

  • NullType
  • StringType
  • BinaryType
  • BooleanType
  • DateType
  • TimestampType
  • DecimalType
  • DoubleType
  • FloatType
  • ByteType
  • IntegerType
  • LongType
  • ShortType
  • ArrayType
  • MapType

For more information, please refer to the official API documentation for the pyspark.sql module.

Comments
Ravi Rajasekharuni (3 years ago)

Very nice code and explanation. Excellent feature in PySpark.


Raymond (3 years ago)

Thanks for the feedback, Ravi. Welcome to Kontext!

venu gopal (3 years ago)

Hi Raymond,


But it takes a lot of time because of df.collect().

Is there any way to speed up this process? I tried to use --num-executors 5 in spark-submit, but there was no change in performance. Also, if possible, please explain how we can leverage --num-executors in this case. Since it's a PySpark DataFrame, I also used df1 = df.toPandas(), but there was no change in performance.


Raymond (3 years ago)

Hi venu,

There are several things you need to know:

  • The collect function requests all data in the data frame to be sent to your driver application.
  • In later versions of Spark, you can use DataFrame APIs directly for transformations instead of converting to an RDD and looping through it.
  • Similarly, for saving as CSV, you can use DataFrame APIs directly.

Thus, to utilize parallelism and to improve performance, I would suggest the following changes:

  1. Repartition your DataFrame df using the repartition function if there are appropriate partition keys.
  2. Directly use df to do all kinds of transformations. You can find more information here: pyspark.sql.DataFrame — PySpark 3.2.0 documentation (apache.org). Remember to read the documentation of your Spark version. 
  3. Use df.write to save the data into HDFS. 

