Executor, memory and core setting for optimal performance on Spark
Spark is adopted by tech giants to bring intelligence to their applications. Predictive analysis and machine learning along with traditional data warehousing is using spark as the execution engine behind the scenes.
I have been exploring spark since incubation and I have found spark core as an effective replacement for map reduce. Optimizing jobs in spark is tricky area as there are no many ways to do it. I have tried to optimize performance by sequencing code differently. But as Spark use lazy evaluation and DAGs are pre created during execution there are no may ways to alter it.
This blog share some information
about optimizing spark jobs - programmatically i.e. writing better code and playing around with hardware.
Rule of thumb for better performance
- Use per key aggregation - use reduce by instead of group by.Blog talks about the difference aggregate function options and why is some better than others.
- If an RDD(Resilient distributed data set ) is used multiple times with slightly different action/transformation, it is better to persist or cache(persist in memory). This avoid recreating RDD multiple times. Spark use LRU (least recent used algorithm) to clear out data in memory. So if an RDD which you know , if it is going to get used later it is better to store it in disk or memory.
Play around with executor , core and memory
Before getting into details let us get familiarized how spark runs a job through YARN
- Our job / application launched on a node called (driver program)
- Cluster manager i.e any resource manager YARN or MESOS find different resources/nodes where to run the programs. They just create a container in nodes where eventually spark jobs will run.
- Once allocated resource is connected to driver , it sends your application code defined in JAR created by us to each worker node's executor (A process launched for an application on a worker node, that runs tasks and keeps data in memory or disk storage across them. Each application has its own executors.)
- Since driver is per application , each application gets its own executor. Executor is nothing but a JVM which stay up and connected through out the process, this means multiple threads can run on same JVM. Task(A unit of work that will be sent to one executor) in a executor is a thread pool executing a common job.
- During task execution communication is directly between worker node and driver by passing cluster manager completely.
Now let us have a look at configure able properties.
spark.executor.cores Number of cores to use for the executor process
spark.executor.memory Amount of memory to use per executor process
spark.executor.instancesNumber of executor process
For further explanation I am using a cluster with configuration
- 20 Node
- 16 core per Node
- 64GB RAM per Node
- Smallest executor possible - (i.e smallest JVM ) - use 1 Core so for all 20 Nodes that will be 20 Core together. Now RAM will be divided for 16 cores i.e 64 GB / 16 core will be 4 GB RAM per core. So all together 20 Node* 1 Core * 4 GB RAM. This configuration works, but this cannot run multiple task in single JVM or not using multi threading capability of JVM
- Now in the above configuration let us use all 64GB RAM for core -This is also not recommended as we need to leave some memory for Hadoop daemon process.
So before deciding on how much RAM we need to use per executor, let us look at YARN's memory needs which is again configured by
spark.yarn.executor.memoryOverhead The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).Default is executorMemory * 0.10, with minimum of 384
So YARN needs say 6-10% i.e is 4-6 MB RAM . YARN also need 1 core for processing that leaves to 15 core / Node with say 60 GB RAM per core.
But 15 core per executor will cause bad HDFS input output, optimal will be 5-6 core per executor considering HDFS IO needs.
So cluster had 20 Node * 15 cores (took out 1 core for YARN) = 300 Cores
Number of executor is 300 Core/ 6 Core per executor = 50 Executors with 6 Core each
So each node has 50/20 executor 2.5 ~ 3 executors
memory per executor will be 60 /3 =20 multiplied by (1-.06) for heap overhead i.e 19 GB RAM