Spark Basics - Application, Driver, Executor, Job, Stage and Task Walkthrough
The first concept to understand is the Spark application. A Spark application is a program built with Spark APIs that runs in a Spark-compatible cluster or environment. It can be a PySpark script, a Java application, a Scala application, a SparkSession started by the spark-sql command, an AWS EMR step, etc.
A Spark application consists of a driver and executors.
Example PySpark script
The following is a sample PySpark script named spark-basics.py.
from pyspark.sql import SparkSession

appName = "PySpark App - Spark Basics"
master = "local"

spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# Create two dataframes
df1 = spark.read.format('parquet').load(
    '/data/test-data')
df2 = spark.read.format('parquet').load(
    '/data/crud_table_stg')

# Joins
df = df1.join(df2, on='id', how='inner')
df.show()

# Print out the total record count
print(df.count())

# Save df to HDFS
df.write.format('parquet').mode('overwrite').save(
    '/data/spark-basics')
This script will perform these processes:
- Create a DataFrame named df1 by reading Parquet data from HDFS.
- Create another DataFrame named df2 from another HDFS path.
- Create a new DataFrame named df using an inner join between df1 and df2 on the id column, and then print it out.
- Print the total record count of df.
- Finally, save df into HDFS using Parquet format.
Run the application by submitting the script with spark-submit, for example:
spark-submit spark-basics.py
Once submitted, this script runs as a Spark application. If you change the master to yarn, the command line prints logs similar to the following output:
2022-08-23T17:28:37,961 INFO [Thread-5] org.apache.spark.deploy.yarn.Client - Application report for application_1661218991004_0001 (state: ACCEPTED)
2022-08-23T17:28:37,967 INFO [Thread-5] org.apache.spark.deploy.yarn.Client -
     client token: N/A
     diagnostics: [Tue Aug 23 17:28:37 +1000 2022] Scheduler has assigned a container for AM, waiting for AM container to be launched
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: default
     start time: 1661239716517
     final status: UNDEFINED
     tracking URL: http://localhost:8088/proxy/application_1661218991004_0001/
     user: kontext
'application_1661218991004_0001' is the application identifier in YARN. For simplicity, I am running the application in local mode (master = "local") for the rest of this article.
In the Spark History Server, we can find the runtime information of the application. In this case, the Spark application ID for this script is local-1661240132793.
We will look into more details later.
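Both identifiers above embed a timestamp. The following Python sketch decodes them, assuming the usual formats application_<ResourceManager start time in ms>_<sequence number> for YARN and local-<application start time in ms> for local mode. describe_app_id is a hypothetical helper written for this illustration; it is not part of Spark or YARN.

```python
from datetime import datetime, timezone


def describe_app_id(app_id: str) -> dict:
    """Break a Spark application ID into its parts (hypothetical helper)."""
    if app_id.startswith("application_"):
        # Assumed YARN format: application_<ResourceManager start time in ms>_<sequence number>
        _, cluster_ms, seq = app_id.split("_")
        return {
            "kind": "yarn",
            "cluster_started": datetime.fromtimestamp(int(cluster_ms) / 1000, tz=timezone.utc),
            "sequence": int(seq),
        }
    if app_id.startswith("local-"):
        # Assumed local-mode format: local-<application start time in ms>
        started_ms = int(app_id[len("local-"):])
        return {
            "kind": "local",
            "app_started": datetime.fromtimestamp(started_ms / 1000, tz=timezone.utc),
        }
    raise ValueError(f"Unrecognized application ID: {app_id}")


print(describe_app_id("application_1661218991004_0001"))
print(describe_app_id("local-1661240132793"))
```

Both timestamps decode to 23 August 2022 (UTC), consistent with the log above, and the trailing 0001 in the YARN ID is the sequence number of the application on that ResourceManager.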
Spark driver and executors
Driver and executors are important to understand before we deep dive into details.
A Spark driver is the process where the main() method of your Spark application runs. It creates the SparkSession and SparkContext objects and converts your code into transformation and action operations. It also creates the logical and physical plans, requests resources from the cluster manager, and schedules and coordinates tasks on the executors.
A Spark executor simply runs the tasks assigned to it on a worker node of the cluster.
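To make this division of labour concrete, here is a toy Python model, not Spark code: a hypothetical driver function splits a list into partitions, hands one task per partition to a thread pool that stands in for the executors, and combines the partial results. Real executors are separate JVM processes on worker nodes, and the real scheduler also considers data locality and available resources.

```python
from concurrent.futures import ThreadPoolExecutor


def run_task(partition):
    """What an 'executor' does for one task: process a single partition."""
    return sum(x * 2 for x in partition)


def driver(data, num_partitions=4, num_executors=2):
    """Toy 'driver': split data into partitions, schedule one task each, combine results."""
    # Round-robin split of the input into num_partitions partitions
    partitions = [data[i::num_partitions] for i in range(num_partitions)]
    # The thread pool stands in for the executors that run the tasks
    with ThreadPoolExecutor(max_workers=num_executors) as executors:
        partial_results = list(executors.map(run_task, partitions))
    # Combine per-task results on the driver, like an action returning a value
    return sum(partial_results)


print(driver(list(range(10))))  # 90
```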
The following diagram shows where the driver and executors are located in a cluster:
In this scenario, only the driver is involved (there are no separate executors) because I am running the code in local master mode. In a typical production cluster, you should be able to see all the executors and the driver in the Executors tab of your Spark application UI:
Transformations and actions
When the Spark driver converts code to operations, it creates two types of operations: transformations and actions.
Transformations are lazily evaluated and return a new DataFrame, Dataset or RDD. When we create a chain of transformations, no data is processed until an action is called. Because transformations are lazily evaluated, they are recomputed every time their result is reused by another action, unless the data is cached or persisted.
Actions trigger the computation on the dataset and either return values to the driver program or write data to external storage. The output of transformations is the input of actions.
For a complete list of transformations, refer to Transformations. For example, coalesce is a transformation.
On the other hand, save is an action.
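The lazy behaviour described above can be illustrated with a toy Python class. LazyFrame is hypothetical and only mimics the idea: transformations (filter, map) record work without running it, actions (count, collect) evaluate the whole chain, and without cache() the source is read again for every action. The stats dictionary just counts source reads and actions; it is not how Spark tracks jobs.

```python
class LazyFrame:
    """Toy model of lazy evaluation (hypothetical; not the Spark DataFrame API)."""

    def __init__(self, compute, stats):
        self._compute = compute      # function that produces the rows when run
        self._use_cache = False
        self._cached = None
        self.stats = stats           # shared counters: source reads and actions

    @classmethod
    def read(cls, rows, stats):
        def compute():
            stats["reads"] += 1      # every evaluation re-reads the source
            return list(rows)
        return cls(compute, stats)

    def _rows(self):
        if self._use_cache:
            if self._cached is None:  # materialize on first use, then reuse
                self._cached = self._compute()
            return self._cached
        return self._compute()

    # Transformations: build a new LazyFrame, run nothing
    def filter(self, pred):
        return LazyFrame(lambda: [r for r in self._rows() if pred(r)], self.stats)

    def map(self, fn):
        return LazyFrame(lambda: [fn(r) for r in self._rows()], self.stats)

    def cache(self):
        self._use_cache = True       # like Spark, caching itself is lazy
        return self

    # Actions: evaluate the whole chain
    def count(self):
        self.stats["actions"] += 1
        return len(self._rows())

    def collect(self):
        self.stats["actions"] += 1
        return list(self._rows())


stats = {"reads": 0, "actions": 0}
evens = LazyFrame.read(range(10), stats).filter(lambda x: x % 2 == 0)
print(stats)            # {'reads': 0, 'actions': 0}: nothing has run yet
print(evens.collect())  # [0, 2, 4, 6, 8]
print(evens.count())    # 5
print(stats)            # {'reads': 2, 'actions': 2}: the source was read twice
```

Calling .cache() on evens before the first action would reduce the number of source reads to one.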
In the above example, the following three lines call three actions, but they are not the only actions involved in the application.
df.show()

# Print out the total record count
print(df.count())

# Save df to HDFS
df.write.format('parquet').mode('overwrite').save(
    '/data/spark-basics')
Spark jobs and stages
A Spark job is a parallel computation made up of multiple tasks. Each action triggers one or more Spark jobs.
Each Spark job will be converted to a DAG which includes one or more stages.
A Spark stage is a set of tasks that run the same computation on different partitions of the data, and the stages within a job depend on each other. Stages are created for each job at shuffle boundaries, which determine which operations can be pipelined together in one stage and which must wait for data to be redistributed across partitions. Not all operations can be performed in a single stage without shuffling data, so a job may be divided into multiple stages. For example, an operation that involves a data shuffle leads to the creation of a new stage. If the job involves no data shuffling, usually only one stage is created.
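A simplified sketch of how shuffle boundaries cut a job into stages, assuming a linear chain of operations identified by name and a hypothetical SHUFFLE_OPS set of wide operations. It treats join as always shuffling, although Spark can avoid the shuffle (for example with a broadcast join), and it ignores jobs whose DAG has several inputs.

```python
# Hypothetical set of operations treated as wide (shuffle) dependencies
SHUFFLE_OPS = {"groupBy", "join", "repartition", "distinct", "orderBy"}


def split_into_stages(ops):
    """Split a linear chain of operation names into stages at shuffle boundaries."""
    stages = [[]]
    for op in ops:
        if op in SHUFFLE_OPS and stages[-1]:
            # Data must be redistributed here, so a new stage starts
            stages.append([])
        stages[-1].append(op)
    return stages


print(split_into_stages(["read", "filter", "select"]))
# [['read', 'filter', 'select']]
print(split_into_stages(["read", "filter", "groupBy", "agg", "orderBy", "write"]))
# [['read', 'filter'], ['groupBy', 'agg'], ['orderBy', 'write']]
```

Narrow operations such as filter and select are pipelined into the current stage, while each shuffle starts a new one.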
For this example application, 8 jobs were created as the following screenshot shows:
The following screenshot is the DAG from job 6:
A Spark task is a single unit of work that runs in a Spark executor, and it is the unit of parallelism in Spark. Each stage contains one or more tasks. Each task processes one partition of the dataset and, by default, uses one CPU core (controlled by spark.task.cpus).
In the above example, each stage has only one task because the sample input data is stored in a single small file in HDFS. If your input data has 1000 partitions, at least 1000 tasks will be created for the operations.
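The relationship between partitions, tasks and cores can be sketched with simple arithmetic. The helpers below are hypothetical: they assume every task needs task_cpus cores (mirroring spark.task.cpus, 1 by default) and that tasks run in full "waves". The real scheduler launches a new task as soon as a slot frees up, so this is only a rough estimate.

```python
import math


def task_slots(num_executors, cores_per_executor, task_cpus=1):
    """Number of tasks that can run at once; task_cpus plays the role of spark.task.cpus."""
    return num_executors * (cores_per_executor // task_cpus)


def stage_waves(num_partitions, slots):
    """One task per partition; tasks beyond the available slots run in later waves."""
    return math.ceil(num_partitions / slots)


slots = task_slots(num_executors=10, cores_per_executor=4)
print(slots)                     # 40
print(stage_waves(1000, slots))  # 25 waves for 1000 tasks
print(stage_waves(1, slots))     # 1: the single-file example
```

For example, with 10 executors of 4 cores each, a stage over 1000 partitions runs 1000 tasks in roughly 25 waves, while the single-partition stages in this article finish in one.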
In the Executors tab of the Spark UI, you can see the task run statistics for each executor.
Execution plan for the save() job
The following is the final execution plan (DAG) for the job that saves df to HDFS.
I hope you now have a good understanding of these basic concepts in Spark. If you have any questions, feel free to leave a comment.