Save DataFrame as CSV File in Spark
Spark provides rich APIs to save data frames to many different formats of files such as CSV, Parquet, ORC, Avro, etc. CSV is still commonly used in data applications, though binary formats like Parquet are gaining momentum. In this article, I am going to show you how to save a Spark data frame as a CSV file in both the local file system and HDFS.
Spark CSV parameters
Refer to the following official documentation for all the parameters supported by the CSV API in PySpark.
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=savemode#pyspark.sql.DataFrameReader.csv
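As a quick illustration of a few of those documented options (header, sep, quote, nullValue), here is a minimal, self-contained sketch; the tiny DataFrame and the path /tmp/csv_options_demo are placeholders of mine, not part of the example later in this article.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local').appName('csv-options-demo').getOrCreate()

# A tiny placeholder DataFrame, just to demonstrate the writer options.
df_demo = spark.createDataFrame([('a', 1), ('b', None)], ['key', 'value'])

(df_demo.write
    .format('csv')
    .option('header', True)    # include column names as the first row
    .option('sep', ',')        # field delimiter between values
    .option('quote', '"')      # quote character for fields containing the delimiter
    .option('nullValue', '')   # string written in place of null values
    .mode('overwrite')         # replace any existing output at the path
    .save('/tmp/csv_options_demo'))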
Example code
In the following sample code, a data frame is created from a Python list. The data frame is then saved to both a local file path and HDFS. To save to a local path, prefix the path with 'file://'; otherwise the path is treated as an HDFS path by default. Several options are also used:
- header: to specify whether to include a header row in the file.
- sep: to specify the field delimiter.
- mode: to specify the behavior of the save operation when data already exists (each mode is illustrated in the sketch after this list):
  - append: append the contents of this DataFrame to the existing data.
  - overwrite: overwrite the existing data.
  - ignore: silently ignore this operation if data already exists.
  - error or errorifexists (default): throw an exception if data already exists.
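To make the four modes concrete, the sketch below writes to the same path repeatedly; it assumes a DataFrame df like the one built in the example that follows, and /tmp/modes_demo is a hypothetical path of mine.

# Sketch only: after the first write, /tmp/modes_demo already contains data.
df.write.format('csv').mode('overwrite').save('/tmp/modes_demo')  # replaces any existing data
df.write.format('csv').mode('append').save('/tmp/modes_demo')     # adds new part files alongside the old ones
df.write.format('csv').mode('ignore').save('/tmp/modes_demo')     # does nothing since data already exists
df.write.format('csv').mode('error').save('/tmp/modes_demo')      # raises an exception since data already exists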
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, DecimalType
from decimal import Decimal

appName = "Python Example - PySpark Save DataFrame as CSV"
master = 'local'

# Create Spark session
spark = SparkSession.builder \
    .master(master) \
    .appName(appName) \
    .getOrCreate()

# List
data = [('Category A', 1, Decimal(12.40)),
        ('Category B', 2, Decimal(30.10)),
        ('Category C', 3, Decimal(100.01)),
        ('Category A', 4, Decimal(110.01)),
        ('Category B', 5, Decimal(70.85))
        ]

# Create a schema for the dataframe
schema = StructType([
    StructField('Category', StringType(), False),
    StructField('ItemID', IntegerType(), False),
    StructField('Amount', DecimalType(scale=2), True)
])

# Convert list to data frame
df = spark.createDataFrame(data, schema)
df.show()

# Save file to a local folder; the delimiter by default is ,
df.write.format('csv').option('header', True).mode('overwrite').option('sep', ',') \
    .save('file:///home/tangr/output.csv')

# Save file to HDFS
df.write.format('csv').option('header', True).mode('overwrite').option('sep', '|') \
    .save('/output.csv')
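As an optional sanity check (my addition, not part of the original walkthrough), the saved folders can be read back with matching header and delimiter options:

# Read both outputs back to verify the round trip; reuses the `spark`
# session from the example above.
df_local = spark.read.csv('file:///home/tangr/output.csv', header=True, sep=',')
df_hdfs = spark.read.csv('/output.csv', header=True, sep='|')
df_hdfs.show()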
Check the results
You can then check the results in both HDFS and the local file system.
The following are examples from my WSL environment:
tangr@raymond-pc:~$ hadoop fs -ls /
Found 4 items
drwxr-xr-x   - tangr supergroup          0 2019-12-03 20:40 /output.csv
drwxr-xr-x   - tangr supergroup          0 2019-08-25 12:11 /scripts
drwxrwxr-x   - tangr supergroup          0 2019-05-18 15:52 /tmp
drwxr-xr-x   - tangr supergroup          0 2019-08-25 09:35 /user
tangr@raymond-pc:~$ hadoop fs -ls /output.csv
Found 2 items
-rw-r--r--   1 tangr supergroup          0 2019-12-03 20:40 /output.csv/_SUCCESS
-rw-r--r--   1 tangr supergroup        120 2019-12-03 20:40 /output.csv/part-00000-508be2a7-a564-4603-b77c-f4de7c07dbcd-c000.csv
tangr@raymond-pc:~$ hadoop fs -cat /output.csv/part-00000-508be2a7-a564-4603-b77c-f4de7c07dbcd-c000.csv
Category|ItemID|Amount
Category A|1|12.40
Category B|2|30.10
Category C|3|100.01
Category A|4|110.01
Category B|5|70.85
tangr@raymond-pc:~$ cd output.csv/
tangr@raymond-pc:~/output.csv$ ls
_SUCCESS  part-00000-bfbb44b0-1880-4400-a9c1-9c03180553a2-c000.csv
tangr@raymond-pc:~/output.csv$ cat part-00000-bfbb44b0-1880-4400-a9c1-9c03180553a2-c000.csv
Category,ItemID,Amount
Category A,1,12.40
Category B,2,30.10
Category C,3,100.01
Category A,4,110.01
Category B,5,70.85
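Notice that each output path is a folder containing a _SUCCESS marker and one or more part files, not a single CSV file. If you need exactly one file, a common approach is to collapse the data frame to a single partition before writing. This is my own hedged sketch (with the hypothetical path /output_single.csv); since it funnels all rows through one task, it is only advisable for small outputs.

# Collapse to one partition so the output folder contains a single part file.
# Only suitable for small data frames: a single task writes everything.
df.coalesce(1).write.format('csv') \
    .option('header', True) \
    .mode('overwrite') \
    .save('/output_single.csv')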