Run a PySpark Application with Multiple Python Script Files in yarn-cluster Mode
When submitting Spark applications to a YARN cluster, two deploy modes can be used: client and cluster. In client mode (the default), the Spark driver runs on the machine from which the application was submitted, while in cluster mode the driver runs on a node inside the cluster chosen by YARN. On this page, I am going to show you how to submit a PySpark application with multiple Python script files in both modes.
PySpark application
The application is very simple and consists of two script files.
pyspark_example.py
from pyspark.sql import SparkSession
from pyspark_example_module import test_function

appName = "Python Example - PySpark Row List to Pandas Data Frame"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .getOrCreate()

# Call the function
test_function()
This script file references another script file named pyspark_example_module.py. It creates a Spark session and then calls the function defined in the other module.
pyspark_example_module.py
This script file is a simple Python script file with a simple function in it.
def test_function():
    """ Test function """
    print("This is a test function")
Run the application with local master
To run the application with a local master, we can simply call the spark-submit CLI from the script folder.
spark-submit pyspark_example.py
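If you prefer to be explicit about the master, an equivalent command (assuming you want to use all local cores) is:
spark-submit --master "local[*]" pyspark_example.py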
Run the application in YARN with deployment mode as client
The deploy mode is specified through the --deploy-mode argument; --py-files is used to specify the other Python script files this application depends on.
spark-submit --master yarn --deploy-mode client --py-files pyspark_example_module.py pyspark_example.py
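If the application depends on more than one module, --py-files accepts a comma-separated list. For example (another_module.py below is only a hypothetical placeholder):
spark-submit --master yarn --deploy-mode client --py-files pyspark_example_module.py,another_module.py pyspark_example.py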
Run the application in YARN with deployment mode as cluster
To run the application in cluster mode, simply change the argument --deploy-mode to cluster.
spark-submit --master yarn --deploy-mode cluster --py-files pyspark_example_module.py pyspark_example.py
The application completes successfully, as the following log shows:
2019-08-25 12:07:09,047 INFO yarn.Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: ***
ApplicationMaster RPC port: 3047
queue: default
start time: 1566698770726
final status: SUCCEEDED
tracking URL: http://localhost:8088/proxy/application_1566698727165_0001/
user: tangr
The application status and output are also visible in the YARN web UI through the tracking URL shown above.
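In cluster mode, the driver's print output goes to the driver container's log rather than your terminal. If log aggregation is enabled on the cluster, you can usually retrieve it with the YARN CLI using the application ID from the submission log above:
yarn logs -applicationId application_1566698727165_0001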
Submit scripts to HDFS so that they can be accessed by all the workers
When submitting the application through a Hue Oozie workflow, you can usually use HDFS file locations.
Use the following command to upload the script files to HDFS:
hadoop fs -copyFromLocal *.py /scripts
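If the /scripts folder does not exist in HDFS yet, you may need to create it first:
hadoop fs -mkdir -p /scripts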
Both scripts are uploaded to the /scripts folder in HDFS:
-rw-r--r-- 1 tangr supergroup 288 2019-08-25 12:11 /scripts/pyspark_example.py
-rw-r--r-- 1 tangr supergroup 91 2019-08-25 12:11 /scripts/pyspark_example_module.py
And then run the following command to use the HDFS scripts:
spark-submit --master yarn --deploy-mode cluster --py-files hdfs://localhost:19000/scripts/pyspark_example_module.py hdfs://localhost:19000/scripts/pyspark_example.py
The application should be able to complete successfully without errors.
If you use Hue, follow this page to set up your Spark action: How to Submit Spark jobs with Spark on YARN and Oozie.
Replace the file names accordingly:
- Jar/py names: pyspark_example.py
- Files: /scripts/pyspark_example_module.py
- Options list: --py-files pyspark_example_module.py. If you have multiple files, separate them with commas.
- In the settings of this action, change master and deploy mode accordingly.
venu (3 years ago)
Hi Raymond!
I can share the overall structure of my code.
1) Importing required libraries
2) creating spark session
spark = SparkSession.builder.appName('God').getOrCreate()
3) loading data from database to data frame
Using spark.read() --> successfully able to pull data
4) cleaning the data and doing transformation
It is also successful; you can take my word on this.
5) After transforming the data (internal use), I am trying to write it to a file and store the output at the desired location.
Failing here:
with open('path/to/hdfs/filename') as file:  # also tried with a local path
    file.write(g.serialize(format='turtle'))  # please ignore what is written inside file.write
It shows "No such file or directory". But in client mode, it is able to create the file at the local path provided. I feel open() is not able to look outside the current working directory.
On other pages everyone uses df.write.format, but I want my output written to a Turtle file, which is neither text, CSV, Parquet, etc.
Sorry, I can't share the code, but I have tried to explain as much as possible. If there is still any doubt/question, please reply here.
Raymond (3 years ago)
Hi Venu,
The code example you provided to me is a local file write, which has nothing to do with Spark:
with open("/user/user_name/myfile.ttl", mode='w+') as file:  # It's a turtle file.
    file.write("This is truth")
The above lines will run in the driver application container in the Spark cluster.
That is why I made the comments before.
For me to illustrate more, can you share the complete script if that is okay?
venu (3 years ago)
Hi Raymond
Thanks for the reply!
I have some doubts. The .txt is just an example; actually, I want to store a .ttl (Turtle) file holding RDF (Resource Description Framework) triples. I don't want to read the file; I have already read the data using spark.read and stored it in a data frame. After transforming the data, I just want to write the output of the program to a file in Spark cluster mode.
Note: I have tried your suggestion but it still gives the same error.
Can you please provide some more detailed explanation/solution?
Raymond (3 years ago)
Hi Venu,
When you run the job locally, your Python application can reference the local file path that your master can reach.
When you submit the job to run in a cluster and the master container is also in the cluster, you can only reference local file paths on the server where the master container is spun up.
So to fix your issue, you can upload your file into HDFS and use spark.read APIs to read the data; alternatively, you can pass the file when you submit the application as I did in this article:
--py-files file.txt
In your code, you can reference it with path file.txt.
I would recommend uploading the file into HDFS first.
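For example, once the file is uploaded, a minimal sketch of reading it back with the spark.read APIs could look like the following; the HDFS path and file name are assumptions, and for shipping non-Python data files with the job, spark-submit also offers a --files option:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Read file from HDFS").getOrCreate()

# Read the uploaded file as a DataFrame of lines (the path is a placeholder).
df = spark.read.text("hdfs://localhost:19000/scripts/input.ttl")
df.show(10, truncate=False)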
venu (3 years ago)
Hi,
I'm running this script in spark cluster mode on a server.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('God').getOrCreate()
with open("/user/user_name/myfile.ttl", mode='w+') as file:  # It's a turtle file.
    file.write("This is truth")
Trying to run this script in spark cluster mode
spark-submit --master yarn --deploy-mode cluster h1.py
Getting error: No such file or directory. I have provided the correct path and checked several times.
I have checked that the directory exists and also tried different HDFS paths.
The code works perfectly fine in client mode. It seems that the executor node is not able to find the mentioned path. Can we use open() to write files in cluster mode? If not, then how do we write files in cluster mode?
Kindly help me on this.
I understand your problem very well now. I think you are confused by a) an HDFS path vs. a local path, and b) accessing/writing data using PySpark vs. pure Python.
There are two problems with your code: open() writes to the local file system of the node where the code runs (not to HDFS), and in cluster mode that node can be randomly picked, so even if you manage to write the file successfully you won't be able to retrieve it easily.
Thus to resolve your problem:
1) Read from HDFS using spark.read, not open(). Or you may try an HDFS Python library (without Spark); however, I won't recommend it, as you may hit some problems (permission setup) and I have never used such a library. Another Python library you can potentially use is hdfs.
2) When you write the data, there are several possible ways:
A) Spark can only write in its supported formats, and the one you used is not supported. I am not sure about the format you mentioned, but if you can use CSV, JSON, etc. to save the file into HDFS using the DataFrameWriter (df.write) APIs, you can then use HDFS commands or a pure Python HDFS client library to copy the file to the local server.
B) Write the Spark DataFrame into a database using JDBC and then retrieve the data using Python. The retrieval script needs to run in a local Python environment instead of using PySpark.
C) It's possible to implement a custom Spark writer from Spark 3.0, but if you are not familiar with Spark APIs, I won't recommend this.
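As a rough illustration of option A, a minimal sketch might look like the following; the column names, output paths, and WebHDFS address are assumptions, and the Turtle serialization itself would still happen outside Spark:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Write transformed data").getOrCreate()

# Placeholder for the transformed DataFrame produced by the earlier steps.
df = spark.createDataFrame([("s1", "p1", "o1")], ["subject", "predicate", "object"])

# A) Write in a Spark-supported format (CSV here) to an HDFS location.
df.write.mode("overwrite").csv("hdfs://localhost:19000/output/triples_csv", header=True)

# Turtle is not a DataFrameWriter format, so the .ttl file would be produced
# with pure Python afterwards, e.g. with the hdfs package:
# from hdfs import InsecureClient
# client = InsecureClient("http://namenode-host:9870")  # WebHDFS address is an assumption
# with client.write("/output/triples.ttl", encoding="utf-8") as writer:
#     writer.write(turtle_string)  # turtle_string built from the collected rows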