Spark cache() and persist() Differences
About data caching
In Spark, one feature is about data caching/persisting. It is done via API cache()
or persist()
. When either API is called against RDD or DataFrame/Dataset, each node in Spark cluster will store the partitions' data it computes in the storage based on storage level. This can usually improve performance especially if the cached data is used multiple times in different actions.
Both APIs exist with RDD, DataFrame (PySpark), Dataset (Scala/Java).
Differences between cache() and persist()
API cache()
is usually considered as a shorthand of persist()
with a default storage level.
The default storage level are different though for different objects:
- RDD: MEMORY_ONLY
- DataFrame/Dataset/Dataset<T>: MEMORY_AND_DISK
Hence, if default storage level is ok, we can use cache()
otherwise we need to use persist()
.
Example of using persist()
The following code snippet provides one example of persisting DataFrame in memory.
from pyspark.sql import SparkSession from pyspark import SparkConf, StorageLevel app_name = "PySpark - persist() Example" master = "local[8]" conf = SparkConf().setAppName(app_name)\ .setMaster(master) spark = SparkSession.builder.config(conf=conf) \ .getOrCreate() spark.sparkContext.setLogLevel("WARN") df = spark.createDataFrame([[1, 'A'], [2, 'B']], ['id', 'attr']) df.persist(storageLevel=StorageLevel.MEMORY_ONLY) print(df.count()) df.groupBy('id').count().show() df.explain(mode='extended')
Output:
2 +---+-----+ | id|count| +---+-----+ | 1| 1| | 2| 1| +---+-----+ == Parsed Logical Plan == LogicalRDD [id#0L, attr#1], false == Analyzed Logical Plan == id: bigint, attr: string LogicalRDD [id#0L, attr#1], false == Optimized Logical Plan == InMemoryRelation [id#0L, attr#1], StorageLevel(memory, 1 replicas) +- *(1) Scan ExistingRDD[id#0L,attr#1] == Physical Plan == InMemoryTableScan [id#0L, attr#1] +- InMemoryRelation [id#0L, attr#1], StorageLevel(memory, 1 replicas) +- *(1) Scan ExistingRDD[id#0L,attr#1]
From the output, we can tell that Storage level is set as memory with only one replica.
About storage level
Different storage levels provide different balance between memory and CPU. We need to choose accordingly.
Storage Level | PySpark | Scala | Meaning |
---|---|---|---|
MEMORY_ONLY | Yes | Yes | Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level. |
MEMORY_AND_DISK | Yes | Yes | Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed. |
MEMORY_ONLY_SER | No* | Yes | Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read. |
MEMORY_AND_DISK_SER | No* | Yes | Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed. |
DISK_ONLY | Yes | Yes | Store the RDD partitions only on disk. |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | Yes | Yes | Same as the levels above, but replicate each partition on two cluster nodes. |
OFF_HEAP (experimental) | No | Yes | Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled. |
*For PySpark, the stored objects will always be serialized with Pickle library; thus it doesn't matter whether a serialization level is chosen or not.
How to choose storage level
Storage levels are used to provide different trade-offs between CPU and memory. The official website recommend using the following approach to choose the right level for your Spark application.
- If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
- If not and if your application is Java or Scala, try using MEMORY_ONLY_SER and selecting a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access.
- Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.
- Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web application). All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition.
References
RDD Programming Guide - Spark 3.3.0 Documentation