
Spark Basics - Application, Driver, Executor, Job, Stage and Task Walkthrough

Spark is one of the most popular big data frameworks used by data engineers and analysts. It is straightforward to get started with in a developer's preferred programming language. This article walks through the basics of Spark, including concepts like driver, executor, operations (transformations and actions), Spark application, job, stage and task. A good understanding of these basics can help you write more performant Spark code and debug your applications more easily with the Spark UI.

Spark application

The first concept to understand is the Spark application. A Spark application is a program built with Spark APIs that runs in a Spark-compatible cluster or environment. It can be a PySpark script, a Java application, a Scala application, a SparkSession started by the spark-shell or spark-sql command, an AWS EMR step, etc.

A Spark application consists of a driver process and a set of executors.

Example PySpark script

The following is a sample PySpark script named spark-basics.py

from pyspark.sql import SparkSession

appName = "PySpark App - Spark Basics"
master = "local"

spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# Create two DataFrames by reading Parquet data from HDFS
df1 = spark.read.format('parquet').load(
    '/data/test-data')
df2 = spark.read.format('parquet').load(
    '/data/crud_table_stg')

# Joins
df = df1.join(df2, on='id', how='inner')
df.show()

# Print out the total record count
print(df.count())

# Save df to HDFS
df.write.format('parquet').mode('overwrite').save(
    '/data/spark-basics')

This script performs the following steps:

  • Create a DataFrame named df1 by reading Parquet data from HDFS.
  • Create another DataFrame named df2 from another HDFS path.
  • Create a new DataFrame named df using an inner join between df1 and df2 on the id column, and then print it out.
  • Print out the total record count of df.
  • Finally, save df to HDFS in Parquet format.

Run the application using the following command:

spark-submit spark-basics.py

This script runs as a Spark application once submitted. If you change the master to yarn, the command line will print out logs similar to the following output:

2022-08-23T17:28:37,961 INFO [Thread-5] org.apache.spark.deploy.yarn.Client - Application report for application_1661218991004_0001 (state: ACCEPTED)
2022-08-23T17:28:37,967 INFO [Thread-5] org.apache.spark.deploy.yarn.Client - 
         client token: N/A
         diagnostics: [Tue Aug 23 17:28:37 +1000 2022] Scheduler has assigned a container for AM, waiting for AM container to be launched
         ApplicationMaster host: N/A
         ApplicationMaster RPC port: -1
         queue: default
         start time: 1661239716517
         final status: UNDEFINED
         tracking URL: http://localhost:8088/proxy/application_1661218991004_0001/
         user: kontext

'application_1661218991004_0001' is the application identifier in YARN. For simplicity, I am just running the driver application locally (local master) for the rest of this walkthrough.

In Spark History Server, we can find the runtime information of the application. In this case, the Spark application ID for this script is local-1661240132793.

[Screenshot: application details in Spark History Server]

We will look into more details later. 

Spark driver and executors

Drivers and executors are important concepts to understand before we dive into the details.

A Spark driver is the process where the main() method of your Spark application runs. It creates the SparkSession and SparkContext objects and converts the code into transformation and action operations. It also creates the logical and physical plans, and schedules and coordinates tasks with the cluster manager.

A Spark executor simply runs the tasks assigned to it on the worker nodes of the cluster.

The following diagram shows how drivers and executors are located in a cluster:

[Diagram: driver and executor placement in a Spark cluster]

For this scenario, only the driver is involved (no separate executors) because I am running the code with the local master. In a typical production environment, you can see all the executors and the driver in the Executors tab of your Spark application UI:

[Screenshot: Executors tab of the Spark application UI]
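
The number and size of executors are requested when the application starts. The snippet below is a minimal sketch, not taken from the original script (the instance count, cores and memory values are illustrative assumptions), of how the same session could be built against a YARN cluster with explicit executor settings:

from pyspark.sql import SparkSession

appName = "PySpark App - Spark Basics"

# Illustrative values only: request 2 executors with 2 cores and 2 GB of
# memory each when running against a YARN cluster instead of the local master.
spark = SparkSession.builder \
    .appName(appName) \
    .master("yarn") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()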

Transformations and actions

When the Spark driver converts code into operations, it creates two types: transformations and actions.

Transformation operations are lazily evaluated and return a DataFrame, Dataset or RDD. When we create a chain of transformations, no computation is performed until an action is called. Because of this lazy evaluation, transformations will be recomputed every time they are reused unless the data is cached or persisted.

Actions execute the computation on the dataset and return the resulting values to the driver program. The output of transformations is the input of actions.
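
As a small sketch of this behaviour (reusing df1 from the script above and assuming its id column is numeric), the filter below does nothing by itself, and without caching it would be recomputed by every action:

# Transformations are lazy: this line alone reads and computes nothing
filtered = df1.filter(df1['id'] > 0)

# Without cache(), each action below would recompute the filter (and the
# underlying file scan); cache() keeps the result in memory once the first
# action materializes it
filtered.cache()

print(filtered.count())  # first action: runs the computation and fills the cache
filtered.show(5)         # second action: reuses the cached data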

Transformation operations

For a complete list of transformations, refer to Transformations. For example, map, filter, union, sample, join, repartition, and coalesce are all transformations. 

Action operations

On the other hand, reduce, collect, count, first, take, takeSample, takeOrdered, saveAsTextFile, saveAsSequenceFile, saveAsObjectFile, countByKey, foreach, and save are all actions.

In the above example, the following three statements each trigger an action, but they are not the only actions involved in the application.

df.show()

# Print out the total record count
print(df.count())

# Save df to HDFS
df.write.format('parquet').mode('overwrite').save(
    '/data/spark-basics')

Spark jobs and stages

A Spark job is a parallel computation consisting of one or more tasks. Each action operation creates one Spark job.

Each Spark job is converted into a DAG (directed acyclic graph) that includes one or more stages.

A Spark stage is a smaller set of tasks that depend on each other. Stages are created for each job based on shuffle boundaries, i.e. which operations can run together and which require data to be redistributed. Not all operations can happen in a single stage without data shuffling, so a job may be divided into multiple stages: an operation that involves data shuffling leads to the creation of a new stage, while a job with no data shuffling usually has just one stage.
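
As a rough sketch of shuffle boundaries (reusing df1 from the script above), a chain of narrow transformations stays within one stage, while a wide transformation such as groupBy introduces a shuffle and therefore an extra stage:

# Narrow transformations (select, filter): no shuffle, so they stay in one stage
narrow = df1.select('id').filter('id is not null')

# Wide transformation (groupBy): the shuffle boundary splits the job into at
# least two stages, which you can verify in the Stages tab of the Spark UI
wide = df1.groupBy('id').count()
wide.collect()  # the action triggers the job and its stages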

For this example application, 8 jobs were created as the following screenshot shows:

[Screenshot: Jobs tab showing the 8 jobs created by the application]

The following screenshot is the DAG from job 6:

[Screenshot: DAG visualization of job 6]

Spark tasks

A Spark task is a single unit of work or execution that runs in a Spark executor. It is the unit of parallelism in Spark. Each stage contains one or more tasks, and each task maps to a single core and processes a single partition of the dataset.

In the above example, each stage only has one task because the sample input data is stored in a single small file in HDFS. If your input data has 1000 partitions, then at least 1000 tasks will be created for the operations.
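
You can check the partition count, and hence the number of tasks a stage will run, directly from code. The snippet below is a sketch reusing df from the script above; the value 8 is just an illustrative assumption:

# The number of partitions determines how many tasks the next stage will run
print(df.rdd.getNumPartitions())

# After repartitioning to 8 partitions (an arbitrary example value), the stage
# that processes this DataFrame runs 8 tasks, one per partition
df8 = df.repartition(8)
print(df8.rdd.getNumPartitions())  # 8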

In the Executors tab of the Spark UI, you can see the task run statistics.

[Screenshot: Executors tab showing task statistics]

Execution plan for the save() job

The following is the final execution plan (DAG) for the job to save df to HDFS. 

[Screenshot: DAG of the save() job]
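
The same plan can also be inspected from code without opening the UI. A minimal sketch using df from the script above (explain's formatted mode requires Spark 3.0 or later):

# Print the physical plan behind this DAG; 'formatted' mode separates the
# plan overview from the per-operator details
df.explain(mode='formatted')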

I hope you now have a good understanding of these basic concepts in Spark. If you have any questions, feel free to leave a comment.

References

Web UI - Spark 3.3.0 Documentation

What is the difference between a transformation and an action in Apache Spark?

RDD Programming Guide - Spark 3.3.0 Documentation
