Spark cache() and persist() Differences

Kontext · 2022-08-21

About data caching

Spark supports caching/persisting data via the cache() and persist() APIs. When either API is called on an RDD or DataFrame/Dataset, each node in the Spark cluster stores the partitions it computes according to the chosen storage level. This usually improves performance, especially when the cached data is reused across multiple actions.

Both APIs are available on RDD, DataFrame (PySpark) and Dataset (Scala/Java).
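As a quick illustration, here is a minimal sketch (the local SparkSession, DataFrame and column names are made up for this example): caching is lazy, so nothing is stored until the first action runs, and later actions can then reuse the cached partitions.

from pyspark.sql import SparkSession

# Minimal sketch: caching is lazy and only materialized by the first action.
spark = SparkSession.builder.master("local[2]").appName("cache-sketch").getOrCreate()

df = spark.range(1000000).selectExpr("id", "id % 10 AS bucket")
df.cache()                              # marks df for caching; nothing is stored yet
print(df.count())                       # first action computes df and fills the cache
print(df.where("bucket = 3").count())   # this action can read the cached partitions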

Differences between cache() and persist()

cache() is essentially a shorthand for persist() with the default storage level.

The default storage level differs between object types, though:

  • RDD: MEMORY_ONLY
  • DataFrame (Python) / Dataset[T] (Scala) / Dataset<T> (Java): MEMORY_AND_DISK

Hence, if the default storage level is acceptable, we can simply use cache(); otherwise we need to use persist() with an explicit storage level.
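A minimal sketch of this difference (assuming a running SparkSession named spark, as in the full example below): we can apply the default level and then inspect what was actually used.

from pyspark import StorageLevel

# RDD: cache() is persist(StorageLevel.MEMORY_ONLY).
rdd = spark.sparkContext.parallelize(range(10))
rdd.persist(StorageLevel.MEMORY_ONLY)   # same effect as rdd.cache()
print(rdd.getStorageLevel())

# DataFrame: cache() defaults to MEMORY_AND_DISK instead.
df = spark.range(10)
df.cache()
print(df.storageLevel)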

Example of using persist()

The following code snippet provides an example of persisting a DataFrame in memory.

from pyspark.sql import SparkSession
from pyspark import SparkConf, StorageLevel

app_name = "PySpark - persist() Example"
master = "local[8]"

conf = SparkConf().setAppName(app_name) \
    .setMaster(master)

spark = SparkSession.builder.config(conf=conf) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# Create a small DataFrame and persist it with MEMORY_ONLY.
df = spark.createDataFrame([[1, 'A'], [2, 'B']], ['id', 'attr'])
df.persist(storageLevel=StorageLevel.MEMORY_ONLY)
# The actions below reuse the cached data; explain() shows the InMemoryRelation in the plan.
print(df.count())
df.groupBy('id').count().show()
df.explain(mode='extended')

Output:

2
+---+-----+
| id|count|
+---+-----+
|  1|    1|
|  2|    1|
+---+-----+

== Parsed Logical Plan ==
LogicalRDD [id#0L, attr#1], false

== Analyzed Logical Plan ==
id: bigint, attr: string
LogicalRDD [id#0L, attr#1], false

== Optimized Logical Plan ==
InMemoryRelation [id#0L, attr#1], StorageLevel(memory, 1 replicas)
   +- *(1) Scan ExistingRDD[id#0L,attr#1]

== Physical Plan ==
InMemoryTableScan [id#0L, attr#1]
   +- InMemoryRelation [id#0L, attr#1], StorageLevel(memory, 1 replicas)
         +- *(1) Scan ExistingRDD[id#0L,attr#1]

From the output, we can tell that the storage level is set to memory only, with a single replica.
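The storage level can also be checked programmatically, and the cache released once it is no longer needed; a short sketch continuing from the example above:

# Continuing from the example above: inspect the level, then release the cache.
print(df.storageLevel)        # reflects the MEMORY_ONLY level passed to persist()
df.unpersist(blocking=True)   # drop the cached partitions when they are no longer needed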

About storage level

Different storage levels provide different trade-offs between memory usage and CPU cost, so we need to choose accordingly.

  • MEMORY_ONLY (PySpark: Yes; Scala: Yes): Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
  • MEMORY_AND_DISK (PySpark: Yes; Scala: Yes): Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
  • MEMORY_ONLY_SER (PySpark: No*; Scala: Yes): Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
  • MEMORY_AND_DISK_SER (PySpark: No*; Scala: Yes): Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
  • DISK_ONLY (PySpark: Yes; Scala: Yes): Store the RDD partitions only on disk.
  • MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. (PySpark: Yes; Scala: Yes): Same as the levels above, but replicate each partition on two cluster nodes.
  • OFF_HEAP (experimental) (PySpark: No; Scala: Yes): Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.

*For PySpark, cached objects are always serialized with the Pickle library, so it does not matter whether a serialized level is chosen or not.
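Consistent with that note, pyspark.StorageLevel only exposes the non-_SER level names. A hedged sketch of passing an explicit level, or building a custom one, assuming an existing SparkSession named spark:

from pyspark import StorageLevel

df2 = spark.range(100)
df2.persist(StorageLevel.DISK_ONLY)   # keep the partitions only on disk

# A custom level can also be built directly:
# StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)
custom_level = StorageLevel(True, True, False, False, 2)   # memory + disk, 2 replicas
df3 = spark.range(100).persist(custom_level)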

How to choose storage level

Storage levels provide different trade-offs between CPU and memory. The official Spark documentation recommends the following approach to choose the right level for your application; a short sketch after the list illustrates the last recommendation.

  • If your RDDs fit comfortably with the default storage level (MEMORY_ONLY), leave them that way. This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
  • If not and if your application is Java or Scala, try using MEMORY_ONLY_SER and selecting a fast serialization library to make the objects much more space-efficient, but still reasonably fast to access. 
  • Don’t spill to disk unless the functions that computed your datasets are expensive, or they filter a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from disk.
  • Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve requests from a web application). All the storage levels provide full fault tolerance by recomputing lost data, but the replicated ones let you continue running tasks on the RDD without waiting to recompute a lost partition.
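For instance, a DataFrame that backs interactive or serving queries could be persisted with a replicated level so a lost partition does not stall requests; a minimal sketch under that assumption (the session name and data are hypothetical):

from pyspark.sql import SparkSession
from pyspark import StorageLevel

spark = SparkSession.builder.master("local[2]").appName("storage-level-choice").getOrCreate()

# Hypothetical lookup data backing a serving workload: replicate for fast recovery.
lookup = spark.range(10000).selectExpr("id", "id * 2 AS value")
lookup.persist(StorageLevel.MEMORY_AND_DISK_2)   # each partition kept on two nodes
lookup.count()                                   # materialize the cache up front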

References

RDD Programming Guide - Spark 3.3.0 Documentation: https://spark.apache.org/docs/3.3.0/rdd-programming-guide.html
