# Spark Streaming Dynamic Resource Allocation

Posted by BigData Blog on November 28, 2018

## Problem Statement

Dynamic Resource Allocation (DRA) has been available since Spark 1.2. However, the existing Spark DRA implementation on YARN does not take the specific properties of Spark Streaming into account.

Spark DRA removes executors that have been idle for removeExecutorInterval, and adds new executors when there is a backlog of pending tasks waiting to be scheduled. This mechanism works well for long-running stages. Spark Streaming, however, is micro-batch processing, so the batch duration is usually short. Removing executors at the end of one batch and adding them again at the beginning of the next causes churn.

In real-time data processing, the data volume usually rises or falls gradually, so adjacent batches have almost the same amount of data to process.

The ideal result for Spark Streaming is a processing time equal to the batch duration. So the key idea is to reduce or increase resources until the processing time is as close as possible to the batch duration.

## Goals

The goal is to bring the processing time as close as possible to the batch duration by reducing or increasing resources in Spark Streaming. We also want the system to reach a reasonably stable state after a short adjustment period, so that resource adjustments rarely happen afterwards.

## Design Summary

Like Spark DRA, Spark Streaming DRA is disabled by default, but it can be enabled with a new Spark conf property, spark.streaming.dynamicAllocation.enabled. All related Spark Streaming DRA properties are grouped under the spark.streaming.dynamicAllocation.* namespace.

Jobs in Spark Streaming are time-sensitive. Once a batch is delayed, an add-executors action is triggered. This action immediately requests spark.streaming.dynamicAllocation.maxExecutors executors from YARN in a greedy way, because we want to eliminate the delay as soon as possible. At the same time, we have to take into account that requesting resources over multiple rounds may run into resource scarcity in a shared cluster.
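
The greedy scale-up rule described above can be sketched roughly as follows. This is only an illustration, not the actual implementation: `ScaleUpSketch` and `targetExecutors` are hypothetical names, and a batch is assumed to count as delayed when its processing time exceeds the batch duration.

```scala
object ScaleUpSketch {
  /** Returns the total number of executors to ask the cluster manager for.
    * Assumption: a batch is "delayed" when its processing time exceeds
    * the batch duration.
    */
  def targetExecutors(processingMs: Long,
                      durationMs: Long,
                      currentExecutors: Int,
                      maxExecutors: Int): Int = {
    val delayed = processingMs > durationMs
    // Greedy: jump straight to the upper bound instead of growing step by step.
    if (delayed) math.max(currentExecutors, maxExecutors) else currentExecutors
  }
}
```

For example, with 8 executors, a 10 s batch duration and a 12 s processing time, the sketch asks for the full maxExecutors value in a single request.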

### Removing Executors

As mentioned before, Spark Streaming is time-sensitive, so removing executors must not consume too much of the batch duration. Executors marked for removal are therefore added to a pendingToRemove set so that no new tasks are launched on them, and then we ask YARN to kill them asynchronously.
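
A minimal sketch of this mark-then-kill pattern is shown below. It is not the code of the proposal: `KillClient`, `Remover` and `isSchedulable` are hypothetical names standing in for the cluster-manager client and the scheduler check, and a plain Scala `Future` is assumed for the asynchronous part.

```scala
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}

object RemoveSketch {
  /** Hypothetical stand-in for the cluster-manager client. */
  trait KillClient {
    def killExecutors(ids: Seq[String]): Boolean
  }

  class Remover(client: KillClient)(implicit ec: ExecutionContext) {
    // Executors that must not receive new tasks while their removal is in flight.
    private val pendingToRemove = mutable.Set.empty[String]

    /** A scheduler would consult this before launching a task on an executor. */
    def isSchedulable(id: String): Boolean = synchronized {
      !pendingToRemove.contains(id)
    }

    /** Marks the executors first, then requests the kill off the batch's critical path. */
    def remove(ids: Seq[String]): Future[Boolean] = {
      synchronized { pendingToRemove ++= ids }
      Future(client.killExecutors(ids))
    }
  }
}
```

The point of the design is that `remove` returns immediately: the batch only pays for updating the set, while the potentially slow kill request runs in the background.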

## Algorithm of DRA

The number of executors to remove in each round is calculated with the following formula:

    val totalRemoveExecutorNum = Math.round(
      currentExecutors * (
        (duration.toDouble - processDuration) / duration - reserveRate)
    )
    val actualShouldRemoveExecutorNumInThisRound = totalRemoveExecutorNum / releaseRounds


This formula assumes that processing time is strongly related to the number of executors, although the relationship is not linear. To compensate, we add the parameters reserveRate and releaseRounds so that the adjustment can follow this non-linear relationship.

This also acts as a heuristic strategy for reducing the number of executors. The formula is recalculated in every round, so the number is readjusted each time, and actualShouldRemoveExecutorNumInThisRound eventually converges to zero.
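
To make the arithmetic concrete, here is a small self-contained version of the formula. The object and method names are hypothetical, and the clamp to zero is an assumption added for this sketch: the formula as written yields a negative number when the idle fraction of the batch is smaller than reserveRate.

```scala
object RemoveFormulaSketch {
  /** Number of executors to release in the current round.
    * The clamp to zero is an assumption of this sketch, not part of the
    * formula in the text.
    */
  def executorsToRemoveThisRound(currentExecutors: Int,
                                 durationMs: Long,
                                 processDurationMs: Long,
                                 reserveRate: Double,
                                 releaseRounds: Int): Long = {
    // Fraction of the batch duration that was left unused.
    val idleFraction = (durationMs.toDouble - processDurationMs) / durationMs
    val total = Math.round(currentExecutors * (idleFraction - reserveRate))
    math.max(0L, total / releaseRounds)
  }
}
```

With 50 executors, a 10 s batch duration, a 4 s processing time, reserveRate = 0.2 and releaseRounds = 5, the idle fraction is 0.6, which leaves 0.4 after the reserve. That gives 20 executors to release in total and 4 in this round. With a 9 s processing time the idle fraction (0.1) is below the reserve, so the sketch releases nothing.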

### Implementation of DRA

New classes are added to the Spark Streaming module:

    package org.apache.spark.streaming

    private[spark] class StreamingExecutorAllocationManager(
        client: ExecutorAllocationClient,
        duration: Long,
        steamingListenerBus: StreamingListenerBus,
        listenerBus: LiveListenerBus,
        conf: SparkConf) extends Logging {
      allocationManager =>
      ......
    }

    private class StreamingExecutorAllocationListener extends SparkListener {
      ......
    }

    private class StreamingSchedulerListener extends StreamingListener {
      .....
    }



StreamingExecutorAllocationManager is initialized in StreamingContext.

Classes affected in the Spark core module:

    package org.apache.spark

    private[spark] trait ExecutorAllocationClient {
      // Blacklist executors immediately so that no new tasks are launched on them,
      // which makes it possible for YARN to remove them asynchronously.
      def addExecutorToPendingStatus(executorId: String): Unit = {}

      // Spark Streaming is initialized later than the Spark core module, so we
      // need to know how many executors already exist when Spark Streaming
      // is initialized.
      def executors(): List[String] = { List() }
    }


SparkContext and CoarseGrainedSchedulerBackend are both affected because they extend ExecutorAllocationClient.

JobScheduler should also be modified so that resources can be adjusted before a batch job is submitted.

    package org.apache.spark.streaming.scheduler

    private[streaming] class JobScheduler(val ssc: StreamingContext) extends Logging {

      def submitJobSet(jobSet: JobSet) {
        if (jobSet.jobs.isEmpty) {
        } else {
          listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
          jobSets.put(jobSet.time, jobSet)

          // Before the jobs are submitted, compute the resources actually required.
          ssc.executorAllocationManager match {
            case Some(eam) => eam.run()
            case None => // do nothing
          }
          .........
        }
      }
      ......
    }
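
The hook above boils down to "run the optional allocation step, then submit". A stripped-down sketch of that ordering, free of Spark classes, might look like this. `AllocationManager` is a hypothetical stand-in for StreamingExecutorAllocationManager, and `submitJobs` stands for the elided remainder of submitJobSet.

```scala
object SubmitHookSketch {
  /** Hypothetical stand-in for StreamingExecutorAllocationManager. */
  trait AllocationManager {
    def run(): Unit
  }

  /** Runs the optional allocation step first, then the submission itself. */
  def submit(manager: Option[AllocationManager])(submitJobs: () => Unit): Unit = {
    manager.foreach(_.run()) // no-op when DRA is disabled (None)
    submitJobs()
  }
}
```

Using `Option.foreach` here behaves the same as the `match` with an empty `None` branch; which form to use is a matter of taste.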


## DRA Available Properties

Enable DRA:

    spark.streaming.dynamicAllocation.enabled=true


Lower and upper bounds for the number of executors when dynamic allocation is enabled:

    spark.streaming.dynamicAllocation.minExecutors=0
    spark.streaming.dynamicAllocation.maxExecutors=50


Print more DRA messages to the log when set to true. Default is false.

    spark.streaming.dynamicAllocation.debug=true


The number of rounds (batches) over which the resources calculated in the current round are released. Default is 5.

    spark.streaming.dynamicAllocation.releaseRounds=5


The number of rounds (batches) whose processing times are remembered. Increase this number if the batch processing time is unstable. It affects processDuration in (duration.toDouble - processDuration). Default is 1.

    spark.streaming.dynamicAllocation.rememberBatchSize=1
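
One way to picture rememberBatchSize is as a sliding window over recent processing times. The sketch below is an illustration only: the post does not say how the remembered batches are combined into processDuration, so taking the mean is an assumption, and `ProcessDurationSketch` and `Window` are hypothetical names.

```scala
object ProcessDurationSketch {
  /** Keeps the processing times (in ms) of the last `rememberBatchSize` batches. */
  final case class Window(rememberBatchSize: Int, times: Vector[Long] = Vector.empty) {
    /** Adds a batch and drops the oldest entries once the window is full. */
    def record(processingMs: Long): Window =
      copy(times = (times :+ processingMs).takeRight(rememberBatchSize))

    /** Assumption: processDuration is the mean of the remembered batches. */
    def processDuration: Option[Double] =
      if (times.isEmpty) None else Some(times.sum.toDouble / times.size)
  }
}
```

With a window of 1 (the default) only the latest batch counts, so a single slow batch directly changes the result of the formula. A larger window smooths out such spikes at the cost of reacting more slowly.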


DRA starts working only after the specified number of rounds (batches) has been submitted. Default is 10.

    spark.streaming.dynamicAllocation.delay.rounds=10


Ensures that the resources kept amount to more than reserveRate * the current number of executors. This is more useful than a fixed minimum number of executors. Default is 0.2.

    spark.streaming.dynamicAllocation.reserveRate=0.2
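
For reference, the properties above could be set programmatically along these lines. This is a sketch that assumes a Spark build containing the changes described in this post. The property names and values are the ones listed above, while the object name and the application name are made up for the example.

```scala
import org.apache.spark.SparkConf

object DraConfExample {
  // Values mirror the examples in this section; tune them per workload.
  val conf: SparkConf = new SparkConf()
    .setAppName("streaming-dra-example") // hypothetical application name
    .set("spark.streaming.dynamicAllocation.enabled", "true")
    .set("spark.streaming.dynamicAllocation.minExecutors", "0")
    .set("spark.streaming.dynamicAllocation.maxExecutors", "50")
    .set("spark.streaming.dynamicAllocation.debug", "true")
    .set("spark.streaming.dynamicAllocation.releaseRounds", "5")
    .set("spark.streaming.dynamicAllocation.rememberBatchSize", "1")
    .set("spark.streaming.dynamicAllocation.delay.rounds", "10")
    .set("spark.streaming.dynamicAllocation.reserveRate", "0.2")
}
```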