Raymond

Run Multiple Python Scripts PySpark Application with yarn-cluster Mode

2019-08-25
When submitting Spark applications to a YARN cluster, two deploy modes can be used: client and cluster. In client mode (the default), the Spark driver runs on the machine from which the application was submitted, while in cluster mode the driver runs on an arbitrary node in the cluster allocated by YARN. On this page, I am going to show you how to submit a PySpark application with multiple Python script files in both modes.

PySpark application

The application is very simple and consists of two script files.

pyspark_example.py

from pyspark.sql import SparkSession
from pyspark_example_module import test_function

appName = "Python Example - PySpark Row List to Pandas Data Frame"

# Create Spark session
spark = SparkSession.builder \
    .appName(appName) \
    .getOrCreate()

# Call the function
test_function()

This script references another script file named pyspark_example_module.py. It creates a Spark session and then calls the function from the other module.

pyspark_example_module.py

This script file is a simple Python script file with a simple function in it.

def test_function():
    """
    Test function
    """
    print("This is a test function")

Run the application with local master

To run the application with the local master, we can simply call the spark-submit CLI from the script folder.

spark-submit pyspark_example.py

Run the application in YARN with deployment mode as client

The deploy mode is specified through the argument --deploy-mode, and --py-files is used to specify the other Python script files used by this application (it accepts a comma-separated list of .py, .zip, or .egg files).

spark-submit --master yarn --deploy-mode client --py-files pyspark_example_module.py  pyspark_example.py

Run the application in YARN with deployment mode as cluster

To run the application in cluster mode, simply change the argument --deploy-mode to cluster.

spark-submit --master yarn --deploy-mode cluster --py-files pyspark_example_module.py  pyspark_example.py

The application will complete successfully, as the following log shows:

2019-08-25 12:07:09,047 INFO yarn.Client:
          client token: N/A
          diagnostics: N/A
          ApplicationMaster host: ***
          ApplicationMaster RPC port: 3047
          queue: default
          start time: 1566698770726
          final status: SUCCEEDED
          tracking URL: http://localhost:8088/proxy/application_1566698727165_0001/
          user: tangr

[Screenshot: YARN web UI showing the application output]

In cluster mode, the driver output (including the message printed by test_function) also shows up in YARN, as the above screenshot shows; it can be viewed through the tracking URL or the YARN container logs.

Submit scripts to HDFS so that they can be accessed by all the workers

When submitting the application through a Hue Oozie workflow, you can usually use HDFS file locations.

Use the following command to upload the script files to HDFS:

hadoop fs -copyFromLocal *.py /scripts

Both scripts are uploaded to the /scripts folder in HDFS:

-rw-r--r--   1 tangr supergroup        288 2019-08-25 12:11 /scripts/pyspark_example.py
-rw-r--r--   1 tangr supergroup         91 2019-08-25 12:11 /scripts/pyspark_example_module.py

And then run the following command to use the HDFS scripts:

spark-submit --master yarn --deploy-mode cluster --py-files hdfs://localhost:19000/scripts/pyspark_example_module.py  hdfs://localhost:19000/scripts/pyspark_example.py

The application should be able to complete successfully without errors.

If you use Hue, follow this page to set up your Spark action: How to Submit Spark jobs with Spark on YARN and Oozie.

Replace the file names accordingly:

  • Jar/py names: pyspark_example.py
  • Files: /scripts/pyspark_example_module.py
  • Options list: --py-files pyspark_example_module.py. If you have multiple files, separate them with commas (see the example after this list).
  • In the settings of this action, change master and deploy mode accordingly.
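
For example, if the application depended on a second (hypothetical) module named another_module.py, the options list would be a comma-separated value like this:

--py-files pyspark_example_module.py,another_module.py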

*Image from gethue.com.

Comments
Raymond #1547 · 3 years ago

I understand your problem very well now. I think you are confusing: 1) an HDFS path vs. a local path; 2) accessing/writing data using PySpark vs. pure Python.

There are two problems with your code:

  1. As I explained before, open is a pure Python function; it can only read local files on the node and cannot read an HDFS path. That is why I suggested passing the file in the submit command: it then gets shipped to the driver and executor containers, where you can use open to read it. But then my question would be: what is the purpose of using Spark if all your input and output are done locally?
  2. When you write the file, you write it onto the node where your Spark driver (application master) container resides.

The node in the above two points can be picked arbitrarily by YARN, so even if you write the file successfully, you won't be able to retrieve it easily.

Thus to resolve your problem:

1) Read from HDFS using spark.read, not open. Alternatively, you may try an HDFS Python library (without Spark), though I won't recommend it as you may hit some problems (permission setup) and I have never used this library:

import pydoop.hdfs as hdfs

with hdfs.open('/user/myuser/filename') as f:
    for line in f:
        do_something(line)

Another Python library you could potentially use is hdfs.
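
As a rough, untested sketch with the hdfs (HdfsCLI) package, it could look like the following; WebHDFS must be enabled, and the namenode host, port, user, and paths below are placeholders:

from hdfs import InsecureClient

# Connect to the namenode's WebHDFS endpoint (host, port and user are placeholders)
client = InsecureClient('http://namenode-host:9870', user='tangr')

# Write text content into an HDFS file
with client.write('/user/tangr/output.ttl', encoding='utf-8', overwrite=True) as writer:
    writer.write('This is a test')

# Read it back
with client.read('/user/tangr/output.ttl', encoding='utf-8') as reader:
    print(reader.read())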

2) When you write the data, there are several possible options:

A) - Spark can only write DataFrames in its supported formats, and the one you used is not among them. I am not sure about the format you mentioned, but if you can save the data into HDFS as CSV, JSON, etc. using the DataFrameWriter (df.write) APIs, you can then use HDFS commands or pure Python HDFS client libraries to copy the file to a local server.
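
For example, a minimal sketch writing the DataFrame to HDFS as CSV (the namenode URL and output path are placeholders):

# Write the transformed DataFrame to HDFS in a supported format (CSV here);
# the namenode URL and output path are placeholders
df.write \
    .mode('overwrite') \
    .option('header', 'true') \
    .csv('hdfs://localhost:19000/output/transformed_data')

You can then copy the result to the local file system, for example with hadoop fs -getmerge or an HDFS client library.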

B) - Write the Spark DataFrame into a database using JDBC and then retrieve the data using Python. The retrieval script needs to run in a local Python environment instead of using PySpark.
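
A minimal sketch of the JDBC write (the connection URL, table name, and credentials are placeholders, and the matching JDBC driver jar must be available to Spark, for example via --jars):

# Write the DataFrame into a relational table over JDBC;
# url, dbtable, user and password below are placeholders
df.write \
    .format('jdbc') \
    .option('url', 'jdbc:postgresql://dbhost:5432/mydb') \
    .option('dbtable', 'transformed_data') \
    .option('user', 'username') \
    .option('password', 'password') \
    .mode('overwrite') \
    .save()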

C) - It's possible to implement a custom Spark writer (data source) from Spark 3.0, but if you are not familiar with the Spark APIs, I won't recommend this.

(In reply to venu gopal's comment #1546 below.)


venu gopal #1546 · 3 years ago

Hi Raymond!

I can share the overall structure of my code.

1) Importing required libraries

2) creating spark session

spark = SparkSession.builder.appName('God').getOrCreate()

3) loading data from database to data frame

Using spark.read() --> successfully able to pull data

4) cleaning the data and doing transformation

It is also successful; you can take my word on this.

5) After transforming the data (internal use), trying to write it to a file and store the output in the desired location.

Failing here

with open('path/to/hdfs/filename') as file:  # also tried with a local path
    file.write(g.serialize(format='turtle'))  # please ignore what is written inside file.write

It shows 'No such file or directory exists', but in client mode it is able to create the file at the local path provided. I feel open() is not able to look outside the current working directory.

On other pages everyone is using df.write.format, but I want my output written to a Turtle file, which is not text, CSV, Parquet, etc.

Sorry, I can't share the code, but I hope I have explained as much as possible. If there is still any doubt/question, please reply here.


(In reply to Raymond's comment #1545 below.)

Raymond #1545 · 3 years ago

Hi Venu,

The code example you provided to me is a local file write, which has nothing to do with Spark:

with open("/user/user_name/myfile.ttl",mode='w+') as file:# It's a turtle file.

    file.write("This is truth")

The above lines will run in the driver application container in the Spark cluster.

That is why I made the comments before.

For me to illustrate more, can you share the complete script if that is okay?

(In reply to venu gopal's comment #1544 below.)

venu gopal #1544 · 3 years ago

Hi Raymond

Thanks for the reply!

I have some doubts. (.txt) is just an example; actually, I want to store a .ttl file (Turtle file) containing RDF (Resource Description Framework) triples. I don't want to read the file; I have already read the data using spark.read and stored it in a data frame. After transforming the data, I just want to write the output of the program to a file in spark cluster mode.

Note: I have tried your suggestion, but it still gives the same error.

Can you please provide some more detailed explanation/solution?

(In reply to Raymond's comment #1543 below.)

Raymond #1543 · 3 years ago

Hi Venu,

When you run the job locally, your Python application can reference any local file path that the driver (master) machine can reach.

When you submit the job to run in a cluster and the driver (master) container also runs in the cluster, you can only reference local file paths on the server where that container is spun up.

So to fix your issue, you can upload your file into HDFS and use spark.read APIs to read the data; alternatively, you can pass the file when you submit the application as I did in this article:

--py-files file.txt

In your code, you can reference it with path file.txt.
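
For example, a rough sketch of reading a file that was shipped with the application (file.txt is a placeholder name; the file is expected to be localized into the container's working directory):

# Read a file distributed with the application; a relative path is used because
# the file is expected in the container's working directory (file.txt is a placeholder)
with open('file.txt') as f:
    content = f.read()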

I would recommend uploading the file into HDFS first. 
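
If you go with the HDFS approach, a minimal sketch could look like this (the namenode URL and HDFS path are placeholders):

# Read the uploaded file from HDFS with Spark instead of a local open();
# the namenode URL and HDFS path are placeholders
df = spark.read.text("hdfs://localhost:19000/user/user_name/myfile.ttl")
df.show(10, truncate=False)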

(In reply to venu gopal's comment #1542 below.)


venu gopal #1542 · 3 years ago

Hi,

I'm running this script in spark cluster mode on a server.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('God').getOrCreate()

with open("/user/user_name/myfile.ttl",mode='w+') as file:# It's a turtle file.

    file.write("This is truth")

Trying to run this script in spark cluster mode

spark-submit --master yarn --deploy-mode cluster h1.py

Getting error: 'No such file or directory exists'. I have provided the correct path and checked it several times.

I have checked that the directory exists and have also tried different HDFS paths.

The code works perfectly fine in client mode. It seems that the executor node is not able to find the mentioned path. Can we use with open to write files in cluster mode? If not, then how do we write files in cluster mode?

Kindly help me on this.


