Convert PySpark Row List to Pandas Data Frame

In Spark, it’s easy to convert a Spark DataFrame to a Pandas data frame with one line of code:

df_pd = df.toPandas()
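
For large data frames, toPandas can be slow; Spark 2.3 and above can use Apache Arrow to speed up this conversion. A minimal sketch, assuming an existing SparkSession named spark (note the configuration key was renamed to spark.sql.execution.arrow.pyspark.enabled in Spark 3.x):

# Optional: enable Apache Arrow for faster toPandas() conversion (Spark 2.3+)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df_pd = df.toPandas()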

On this page, I am going to show you how to convert a list of PySpark Row objects to a Pandas data frame.

Prepare the data frame

The following code snippet creates a data frame with this schema:

root
  |-- Category: string (nullable = false)
  |-- ItemID: integer (nullable = false)
  |-- Amount: decimal(10,2) (nullable = true)

from pyspark.sql import SparkSession

from pyspark.sql.functions import collect_list,struct
from pyspark.sql.types import ArrayType, StructField, StructType, StringType, IntegerType, DecimalType
from decimal import Decimal
import pandas as pd

appName = "Python Example - PySpark Row List to Pandas Data Frame"
master = "local"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

# Source data: a list of tuples; Decimal is constructed from strings to
# avoid floating-point representation artifacts
data = [('Category A', 1, Decimal('12.40')),
        ('Category B', 2, Decimal('30.10')),
        ('Category C', 3, Decimal('100.01')),
        ('Category A', 4, Decimal('110.01')),
        ('Category B', 5, Decimal('70.85'))]

# Create a schema for the dataframe
schema = StructType([
    StructField('Category', StringType(), False),
    StructField('ItemID', IntegerType(), False),
    StructField('Amount', DecimalType(precision=10, scale=2), True)
])

# Convert list to RDD
rdd = spark.sparkContext.parallelize(data)

# Create data frame
df = spark.createDataFrame(rdd, schema)
df.printSchema()
df.show()
df_pd = df.toPandas()
df_pd.info()

The above code converts the list to a Spark data frame first and then converts it to a Pandas data frame.
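
Note that the intermediate RDD is not strictly necessary: spark.createDataFrame also accepts the Python list directly together with the schema. An equivalent one-liner:

# Equivalent: build the Spark data frame directly from the list
df = spark.createDataFrame(data, schema)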

The information of the Pandas data frame looks like the following:

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5 entries, 0 to 4
Data columns (total 3 columns):
Category    5 non-null object
ItemID      5 non-null int32
Amount      5 non-null object
dtypes: int32(1), object(2)
memory usage: 172.0+ bytes
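
Note that the Amount column has object dtype because Spark's decimal type maps to Python decimal.Decimal objects rather than a native Pandas numeric type. If you need a numeric dtype for further analysis, you can cast it explicitly:

# Cast the Decimal objects to float64 (floats cannot represent all decimal values exactly)
df_pd["Amount"] = df_pd["Amount"].astype(float)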

Aggregate the data frame

It’s very common to do aggregations in Spark. For example, the following code snippet groups the above Spark data frame by the Category attribute.

# Aggregate but still keep all the raw attributes
df_agg = df.groupby("Category").agg(collect_list(struct("*")).alias('Items'))
df_agg.printSchema()

The schema of the new Spark data frame has two attributes: Category and Items.

root
  |-- Category: string (nullable = false)
  |-- Items: array (nullable = true)
  |    |-- element: struct (containsNull = true)
  |    |    |-- Category: string (nullable = false)
  |    |    |-- ItemID: integer (nullable = false)
  |    |    |-- Amount: decimal(10,2) (nullable = true)

The Items attribute is an array (list) of pyspark.sql.Row objects.
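
You can confirm this by collecting the aggregated data frame to the driver and inspecting one element:

# Collect to the driver and check the type of one Items element
rows = df_agg.collect()
print(type(rows[0]["Items"][0]))
# <class 'pyspark.sql.types.Row'>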

Convert pyspark.sql.Row list to Pandas data frame

Now we can convert the Items attribute using the foreach function.

def to_pandas(row):
    # Convert each Row in Items to a dictionary, then build a Pandas data frame
    print('Create a pandas data frame for category: ' + row["Category"])
    items = [item.asDict() for item in row["Items"]]
    df_pd_items = pd.DataFrame(items)
    print(df_pd_items)

# Convert Items for each Category to a pandas dataframe
df_agg.foreach(to_pandas)

In the above code snippet, the Row list is converted to a list of dictionaries first, and then that list is converted to a Pandas data frame using the pd.DataFrame function. Since each list element is a dictionary with keys, we don’t need to specify the columns argument for pd.DataFrame.
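
Keep in mind that foreach runs on the executors, so the printed data frames come from the worker processes (with master local they happen to appear in the same console). If you want to keep the per-category Pandas data frames on the driver instead, a collect-based variant of the same asDict conversion would look like this:

# Build a dict mapping each category to its own Pandas data frame on the driver
pdfs = {row["Category"]: pd.DataFrame([item.asDict() for item in row["Items"]])
        for row in df_agg.collect()}
print(pdfs["Category A"])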
