Tuning and Optimizing Spark

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark import StorageLevel
# Ignore warnings
import warnings
warnings.filterwarnings("ignore")
spark = SparkSession.builder.getOrCreate()
Managing Spark Configurations

  • We can check the Spark configuration, including environment variables, in the Environment tab of the Spark UI
  • Check whether a Spark configuration can be modified at runtime with spark.conf.isModifiable("<config name>")
  • We can change Spark configurations with command-line arguments to spark-submit, or programmatically in the SparkSession
  • Order of precedence, from lowest to highest: spark-defaults.conf, spark-submit flags, SparkSession/SparkConf settings in code; a later source overrides an earlier one
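The precedence rule can be pictured as a series of dictionary merges. The sketch below is a toy model in plain Python, not Spark's own code: the three dictionaries stand in for spark-defaults.conf, spark-submit --conf flags, and settings made in the application, and all of their keys and values are hypothetical examples.

```python
# Toy model of Spark's configuration precedence (not Spark's actual code).
# Sources are listed from lowest to highest precedence.
spark_defaults_conf = {  # hypothetical contents of spark-defaults.conf
    "spark.executor.memory": "4g",
    "spark.sql.shuffle.partitions": "200",
}
spark_submit_flags = {  # hypothetical --conf flags passed to spark-submit
    "spark.executor.memory": "8g",
}
session_settings = {  # hypothetical settings made in code via SparkConf/SparkSession
    "spark.sql.shuffle.partitions": "6",
}


def effective_config(*sources):
    """Merge config dicts in order; a key in a later source overrides earlier ones."""
    merged = {}
    for source in sources:
        merged.update(source)
    return merged


config = effective_config(spark_defaults_conf, spark_submit_flags, session_settings)
print(config)  # {'spark.executor.memory': '8g', 'spark.sql.shuffle.partitions': '6'}
```

The executor memory comes from spark-submit (overriding the defaults file), while the shuffle partition count comes from the application code (overriding both).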

Setting and Getting Spark Configurations

spark.conf.set("spark.sql.shuffle.partitions", 6)
spark.conf.get("spark.sql.shuffle.partitions")  # returns '6' (values come back as strings)

Scaling Spark for Large Workloads

Spark configurations affect three Spark components:
  • The Spark driver
  • The executors
  • The shuffle service running on the executors

Static Versus Dynamic Resource Allocation

  • Specifying resources as command-line arguments to spark-submit fixes them for the lifetime of the application: this is static allocation, and the application cannot use more than that cap
  • With dynamic resource allocation, the Spark driver can request more or fewer compute resources as the workload grows and shrinks
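# Some configurations can also be set in the Spark REPL (e.g. the pyspark shell)
# Setting the config programmatically
from pyspark import SparkConf
conf = (SparkConf()
        .set("spark.dynamicAllocation.enabled", "true"))
# Executor-level settings like this are read when the SparkContext starts;
# if a session already exists, getOrCreate() reuses it and ignores them
spark = SparkSession.builder.config(conf=conf).getOrCreate()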
Set the following for dynamic allocation:
  • spark.dynamicAllocation.enabled true
  • spark.dynamicAllocation.minExecutors 2
  • spark.dynamicAllocation.maxExecutors 20
  • spark.dynamicAllocation.schedulerBacklogTimeout 1m
  • spark.dynamicAllocation.executorIdleTimeout 2min

New executors will be requested each time the backlog timeout (spark.dynamicAllocation.schedulerBacklogTimeout) is exceeded. In this case, whenever there are pending tasks that have not been scheduled for over 1 minute, the driver will request that a new executor be launched to schedule backlogged tasks, up to a maximum of 20. By contrast, if an executor finishes a task and is idle for 2 minutes (spark.dynamicAllocation.executorIdleTimeout), the Spark driver will terminate it.
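The two timeouts can be read as a simple decision rule. Below is a minimal sketch in plain Python, a simplified model rather than Spark's implementation: the class and method names are hypothetical, and it adds one executor per decision as the paragraph above describes, whereas real Spark grows the number of executors requested in each round exponentially (1, 2, 4, 8, ...).

```python
from dataclasses import dataclass
from typing import Sequence


@dataclass
class DynamicAllocationPolicy:
    """Simplified model of the rules described above (not Spark's implementation)."""
    min_executors: int = 2          # spark.dynamicAllocation.minExecutors
    max_executors: int = 20         # spark.dynamicAllocation.maxExecutors
    backlog_timeout_s: float = 60   # spark.dynamicAllocation.schedulerBacklogTimeout (1m)
    idle_timeout_s: float = 120     # spark.dynamicAllocation.executorIdleTimeout (2min)

    def next_executor_count(self, current: int, backlog_s: float,
                            idle_times_s: Sequence[float]) -> int:
        """Return the executor count after one decision step.

        current      -- executors currently running
        backlog_s    -- how long tasks have been pending without being scheduled
        idle_times_s -- how long each running executor has been idle
        """
        if backlog_s > self.backlog_timeout_s:
            # Backlog timeout exceeded: request one more executor, capped at the max
            return min(current + 1, self.max_executors)
        # No backlog: release executors idle past the timeout, never below the min
        expired = sum(1 for t in idle_times_s if t > self.idle_timeout_s)
        return max(current - expired, self.min_executors)


policy = DynamicAllocationPolicy()
print(policy.next_executor_count(5, backlog_s=90, idle_times_s=[0] * 5))               # 6
print(policy.next_executor_count(20, backlog_s=90, idle_times_s=[0] * 20))             # 20 (capped)
print(policy.next_executor_count(5, backlog_s=0, idle_times_s=[150, 150, 30, 0, 0]))   # 3
print(policy.next_executor_count(3, backlog_s=0, idle_times_s=[150] * 3))              # 2 (min)
```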

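Configuring Spark Executor Memory

  • The amount of memory available to each executor is controlled by spark.executor.memory
  • Executor memory is divided into three sections:
    • Reserved Memory (300 MB), set aside for Spark's internal objects
    • Execution Memory and Storage Memory, which share a unified region sized by spark.memory.fraction (by default 60% of the heap remaining after the 300 MB reservation); within it, spark.memory.storageFraction (default 0.5) sets how much cached data is protected from eviction
    • User Memory, the remaining 40%, for user data structures and Spark's internal metadata
  • We can adjust these fractions through configuration
  • If storage memory is not being used, Spark can use it for execution memory, and vice versa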

[Figure: Spark memory layout]

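# Shrink the unified execution + storage region (default 0.6); set before the SparkContext starts
conf.set("spark.memory.fraction", "0.5")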
  • Execution memory is used for shuffles, joins, sorts, and aggregations
  • Storage memory is used primarily for caching user data structures and partitions derived from DataFrames
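To see what these fractions mean in megabytes, here is a back-of-the-envelope sketch in plain Python. It assumes the unified memory model from the Spark tuning guide (the 300 MB reservation, spark.memory.fraction defaulting to 0.6, and spark.memory.storageFraction defaulting to 0.5), and it treats the JVM heap as equal to spark.executor.memory, which is only approximately true. The 4096 MB heap is a hypothetical example.

```python
RESERVED_MB = 300  # fixed reservation for Spark's internal objects


def executor_memory_regions(heap_mb, memory_fraction=0.6, storage_fraction=0.5):
    """Approximate the memory regions of one executor, in MB.

    heap_mb          -- executor JVM heap (roughly spark.executor.memory)
    memory_fraction  -- spark.memory.fraction (default 0.6)
    storage_fraction -- spark.memory.storageFraction (default 0.5)
    """
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction       # shared by execution and storage
    storage = unified * storage_fraction     # cached data protected from eviction
    execution = unified - storage            # initial split; the boundary moves as they borrow
    user = usable - unified                  # user data structures and Spark metadata
    return {"reserved": RESERVED_MB, "unified": unified,
            "storage": storage, "execution": execution, "user": user}


for fraction in (0.6, 0.5):  # the default, and the 0.5 set above
    regions = executor_memory_regions(4096, memory_fraction=fraction)
    print(fraction, {name: round(mb, 1) for name, mb in regions.items()})
```

Lowering spark.memory.fraction from 0.6 to 0.5 moves memory from the shared execution/storage region into user memory.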

Maximizing Spark Parallelism

  • A Spark job has many stages, and each stage has many tasks
  • Spark schedules one task per core at a time
  • Each task processes one partition
  • Ideally, there should be at least as many partitions as there are cores on the executor, so that every core has work
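The link between partitions and cores can be quantified as "waves" of tasks. A minimal sketch in plain Python, assuming every task takes the same time (real tasks rarely do) and a hypothetical cluster of 4 executors with 4 cores each; the function name is illustrative only.

```python
import math


def task_waves(num_partitions, num_executors, cores_per_executor):
    """Return (waves, core_utilization) when each core runs one task at a time.

    Assumes every task takes the same time, which real workloads rarely do.
    """
    slots = num_executors * cores_per_executor              # tasks that can run at once
    waves = max(1, math.ceil(num_partitions / slots))       # rounds of tasks per stage
    utilization = num_partitions / (waves * slots)          # share of core-time doing work
    return waves, utilization


# Hypothetical cluster: 4 executors x 4 cores = 16 task slots
for partitions in (8, 16, 17, 64):
    waves, util = task_waves(partitions, num_executors=4, cores_per_executor=4)
    print(f"{partitions:>3} partitions -> {waves} wave(s), {util:.0%} core utilization")
```

With 8 partitions half the cores sit idle; with 17, the second wave runs a single task while 15 cores wait.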

[Figure: cores and partitions]

  • When reading files, the maximum size of a partition is set by spark.sql.files.maxPartitionBytes (default 128 MB)
  • Decreasing this size too much can cause the “small file problem”: many small partition files, which increase disk I/O and degrade performance
  • We can also control the number of partitions explicitly, for example with repartition()
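path_to_data = '/home/thulasiram/personal/data_engineering/data/car_data.csv'
df = (spark.read.format("csv")
      .option("inferSchema", "true")
      .option("header", "true")
      .load(path_to_data))
# Check how many partitions Spark created when reading the file
df.rdd.getNumPartitions()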
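How many read partitions to expect can be estimated from the file size. A rough sketch in plain Python, assuming splittable input such as plain CSV: since Spark packs at most spark.sql.files.maxPartitionBytes of file data into one read partition, the division below gives a lower bound, not an exact count (Spark may create more, for example when there are many cores to spread the data over). The 1 GiB input size is hypothetical; the actual count for a DataFrame can be checked with df.rdd.getNumPartitions() and changed with df.repartition(n) or df.coalesce(n).

```python
import math

MAX_PARTITION_BYTES = 128 * 1024 * 1024  # spark.sql.files.maxPartitionBytes default (128 MB)


def min_read_partitions(total_bytes, max_partition_bytes=MAX_PARTITION_BYTES):
    """Rough lower bound on read partitions for splittable files (e.g. plain CSV)."""
    return max(1, math.ceil(total_bytes / max_partition_bytes))


one_gib = 1024 ** 3  # hypothetical input size
print(min_read_partitions(one_gib))                                      # 8
print(min_read_partitions(one_gib, max_partition_bytes=64 * 1024 ** 2))  # 16
```

Halving maxPartitionBytes doubles the number of partitions, which is how lowering it too far leads to many small partitions.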
Shuffle Partitions

  • Shuffle partitions are created during the shuffle stage
  • By default, the number of shuffle partitions (spark.sql.shuffle.partitions) is 200
  • This default is too high for small workloads
  • Wide operations such as groupBy() or join() consume network and disk I/O: during a shuffle, Spark spills results to the executors' local disks, so fast local disks such as SSDs improve performance
  • The right number of shuffle partitions depends on the use case, the dataset, the number of cores, and the executor memory available
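One way to turn these factors into a number is a size-based rule of thumb. The sketch below is a heuristic of our own, not Spark's logic: the roughly 128 MB target per shuffle partition is an assumption, and the shuffle sizes and core counts are hypothetical. It sizes partitions by data volume, uses at least one partition per core, and rounds up to a whole number of task waves.

```python
import math


def suggest_shuffle_partitions(shuffle_bytes, total_cores,
                               target_partition_bytes=128 * 1024 ** 2):
    """Rule-of-thumb shuffle partition count (a heuristic, not Spark's own logic).

    Aim for roughly target_partition_bytes per partition, use at least one
    partition per core, and round up to a whole number of waves of tasks.
    """
    by_size = math.ceil(shuffle_bytes / target_partition_bytes)
    waves = max(1, math.ceil(by_size / total_cores))
    return waves * total_cores


mib, gib = 1024 ** 2, 1024 ** 3
print(suggest_shuffle_partitions(100 * mib, total_cores=8))    # 8 (small job: far below 200)
print(suggest_shuffle_partitions(10 * gib, total_cores=16))    # 80
print(suggest_shuffle_partitions(50 * gib, total_cores=48))    # 432
```

A value chosen this way could then be applied with spark.conf.set("spark.sql.shuffle.partitions", n).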

Caching and Persistence

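  • persist() provides more control over how and where the data is stored: in memory or on disk, serialized or deserialized, etc.

  • cache() stores as many of the partitions read as fit in memory across the executors (for DataFrames, cache() uses the MEMORY_AND_DISK storage level by default, so partitions that do not fit in memory are stored on disk)

  • A DataFrame may be fractionally cached

  • An individual partition cannot be fractionally cached: it is either cached entirely or not at all

  • When only some partitions are cached (for example with the memory-only level MEMORY_ONLY), the partitions that are not cached are recomputed when the data is accessed

  • A DataFrame is not cached until we invoke an action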
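The whole-partition rule explains how a DataFrame ends up partly cached. Below is a toy model in plain Python, assuming a memory-only storage level such as MEMORY_ONLY and a fixed memory budget; the function name, partition sizes, and budget are hypothetical, and Spark's real block manager is more involved.

```python
def cache_partitions(partition_sizes_mb, memory_budget_mb):
    """Toy model of memory-only caching: a partition is cached whole or not at all.

    Returns (cached_ids, recomputed_ids). Partitions that do not fit are skipped
    and would be recomputed from their lineage on the next access.
    """
    cached, recomputed = [], []
    remaining = memory_budget_mb
    for pid, size in enumerate(partition_sizes_mb):
        if size <= remaining:
            cached.append(pid)
            remaining -= size
        else:
            recomputed.append(pid)  # never stored as a fraction of a partition
    return cached, recomputed


# Hypothetical DataFrame: five 40 MB partitions, 100 MB of storage memory
cached, recomputed = cache_partitions([40, 40, 40, 40, 40], memory_budget_mb=100)
print(cached, recomputed)  # [0, 1] [2, 3, 4]
```

The DataFrame is 40% cached, and the leftover 20 MB stays unused because half of partition 2 cannot be stored.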

df = spark.range(1 * 10000).toDF("id").withColumn("square", col("id") * col("id"))
df.cache()  # mark df for caching; nothing is stored yet
# df will be cached when we call an action
df.count()  # It takes 0.5 seconds to compute the first time
# Second time will be accessed from the cache and hence faster
df.count()


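  • We can control where and how data is stored by passing a StorageLevel to persist()
  • MEMORY_ONLY - Data is stored directly as objects, only in memory
  • MEMORY_ONLY_SER - Data is serialized into a compact byte-array representation and stored only in memory; it must be deserialized before use
  • MEMORY_AND_DISK - Data is stored directly as objects in memory; if memory is insufficient, the rest is serialized and stored on disk
  • DISK_ONLY - Data is serialized and stored on disk
  • MEMORY_AND_DISK_SER - Like MEMORY_AND_DISK, but data is serialized when stored in memory (data is always serialized when stored on disk)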
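Each storage level is a combination of a few flags. The plain-Python table below is a reading aid, not an API: it mirrors the use-disk, use-memory, and deserialized flags of Spark's JVM-side StorageLevel constants and omits the off-heap and replication flags. PySpark defines its own StorageLevel constants, and some of their deserialized flags differ. The Level tuple and describe() helper are hypothetical names.

```python
from collections import namedtuple

# Where data lives and whether in-memory data is kept as deserialized objects.
# Mirrors the JVM StorageLevel flags; off-heap and replication are omitted.
Level = namedtuple("Level", ["use_disk", "use_memory", "deserialized"])

LEVELS = {
    "MEMORY_ONLY": Level(use_disk=False, use_memory=True, deserialized=True),
    "MEMORY_ONLY_SER": Level(use_disk=False, use_memory=True, deserialized=False),
    "MEMORY_AND_DISK": Level(use_disk=True, use_memory=True, deserialized=True),
    "MEMORY_AND_DISK_SER": Level(use_disk=True, use_memory=True, deserialized=False),
    "DISK_ONLY": Level(use_disk=True, use_memory=False, deserialized=False),
}


def describe(name):
    """Summarize a storage level; data on disk is always serialized."""
    level = LEVELS[name]
    places = []
    if level.use_memory:
        form = "deserialized objects" if level.deserialized else "serialized bytes"
        places.append(f"memory as {form}")
    if level.use_disk:
        places.append("disk as serialized bytes")
    return f"{name}: " + ", overflowing to ".join(places)


for name in LEVELS:
    print(describe(name))
```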
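df_2 = spark.range(1 * 10000).toDF("id").withColumn("square", col("id") * col("id"))
df_2.persist(StorageLevel.DISK_ONLY)  # serialize the data and store it on disk
df_2.count()  # an action materializes the persisted data
df_2.unpersist()  # to unpersist data
df_2.createOrReplaceTempView("df_table")  # we can cache tables and views as well
spark.sql("CACHE TABLE df_table")
spark.sql("SELECT count(*) from df_table").show()
+--------+
|count(1)|
+--------+
|   10000|
+--------+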

When to Cache and Persist

  • DataFrames that are used frequently
  • DataFrames used iteratively, for example during iterative machine learning training

When not to Cache and Persist

  • DataFrames that are too big to fit in memory
  • DataFrames produced by inexpensive transformations that are not used frequently