# Executor, ExecutorService and Thread pools
The Executor (opens new window) interface in Java provides a way of decoupling task submission from the mechanics of how each task will be run, including details of thread use, scheduling, etc. An Executor is normally used instead of explicitly creating threads. With Executors, developers won't have to significantly rewrite their code to be able to easily tune their program's task-execution policy.
# ThreadPoolExecutor
A common Executor used is the ThreadPoolExecutor
, which takes care of Thread handling. You can configure the minimal amount of Threads the executor always has to maintain when there's not much to do (it's called core size) and a maximal Thread size to which the Pool can grow, if there is more work to do. Once the workload declines, the Pool slowly reduces the Thread count again until it reaches min size.
ThreadPoolExecutor pool = new ThreadPoolExecutor(
1, // keep at least one thread ready,
// even if no Runnables are executed
5, // at most five Runnables/Threads
// executed in parallel
1, TimeUnit.MINUTES, // idle Threads terminated after one
// minute, when min Pool size exceeded
new ArrayBlockingQueue<Runnable>(10)); // outstanding Runnables are kept here
pool.execute(new Runnable() {
@Override public void run() {
//code to run
}
});
Note If you configure the ThreadPoolExecutor
with an unbounded queue, then the thread count will not exceed corePoolSize
since new threads are only created if the queue is full:
ThreadPoolExecutor with all parameters:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler)
from JavaDoc (opens new window)
If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full.
Advantages:
- In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime RejectedExecutionException upon rejection.
- In `ThreadPoolExecutor.CallerRunsPolicy`, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted.
- In `ThreadPoolExecutor.DiscardPolicy`, a task that cannot be executed is simply dropped.
- In `ThreadPoolExecutor.DiscardOldestPolicy`, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.)
- To set a more descriptive thread name
- To set thread daemon status
- To set thread priority
-
Wait indefinitely for future to finish with a result.
try { // Blocks current thread until future is completed Integer result = future.get(); catch (InterruptedException || ExecutionException e) { // handle appropriately }
-
Wait for future to finish, but no longer than specified time.
try { // Blocks current thread for a maximum of 500 milliseconds. // If the future finishes before that, result is returned, // otherwise TimeoutException is thrown. Integer result = future.get(500, TimeUnit.MILLISECONDS); catch (InterruptedException || ExecutionException || TimeoutException e) { // handle appropriately }
- Calling
cancel(false)
will just remove the task from the queue of tasks to be run. - Calling
cancel(true)
will also interrupt the task if it is currently running. - you try to submit tasks to a shutdown Executor or
- the queue is saturated (only possible with bounded ones) and maximum number of Threads has been reached,
- ThreadPoolExecutor.AbortPolicy (default, will throw REE)
- ThreadPoolExecutor.CallerRunsPolicy (executes task on caller's thread - blocking it)
- ThreadPoolExecutor.DiscardPolicy (silently discard task)
- ThreadPoolExecutor.DiscardOldestPolicy (silently discard oldest task in queue and retry execution of the new task)
- [ExecutorService](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html) `ExecutorService executor = Executors.newFixedThreadPool(50);` It is simple and easy to use. It hides low level details of `ThreadPoolExecutor`. I prefer this one when number of `Callable/Runnable` tasks are small in number and piling of tasks in unbounded queue does not increase memory & degrade the performance of the system. If you have `CPU/Memory` constraints, I prefer to use `ThreadPoolExecutor` with capacity constraints & `RejectedExecutionHandler` to handle rejection of tasks.
-
[CountDownLatch](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CountDownLatch.html)
`CountDownLatch` will be initialized with a given count. This count is decremented by calls to the `countDown()` method. Threads waiting for this count to reach zero can call one of the `await()` methods. Calling `await()` blocks the thread until the count reaches zero. **This class enables a java thread to wait until other set of threads completes their tasks.**
**Use cases:**
- Achieving Maximum Parallelism: Sometimes we want to start a number of threads at the same time to achieve maximum parallelism
- Wait N threads to completes before start execution
- Deadlock detection.
-
[ThreadPoolExecutor](http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html) : It provides more control. If application is constrained by number of pending Runnable/Callable tasks, you can use bounded queue by setting the max capacity. Once the queue reaches maximum capacity, you can define RejectionHandler. Java provides four types of `RejectedExecutionHandler` [policies](http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html).
- `ThreadPoolExecutor.AbortPolicy`, the handler throws a runtime RejectedExecutionException upon rejection.
- ThreadPoolExecutor.CallerRunsPolicy`, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted.
- In `ThreadPoolExecutor.DiscardPolicy`, a task that cannot be executed is simply dropped.
- `ThreadPoolExecutor.DiscardOldestPolicy`, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.)
-
One more mechanism you did not quote is [ForkJoinPool](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html)
The `ForkJoinPool` was added to Java in Java 7. The `ForkJoinPool` is similar to the Java `ExecutorService` but with one difference. The `ForkJoinPool` makes it easy for tasks to split their work up into smaller tasks which are then submitted to the `ForkJoinPool` too. Task stealing happens in `ForkJoinPool` when free worker threads steal tasks from busy worker thread queue.
Java 8 has introduced one more API in [ExecutorService](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html) to create work stealing pool. You don't have to create `RecursiveTask` and `RecursiveAction` but still can use `ForkJoinPool`.public static ExecutorService newWorkStealingPool()
Creates a work-stealing thread pool using all available processors as its target parallelism level.
By default, it will take number of CPU cores as parameter. - [ExecutorService](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html) `invokeAll()`
Executes the given tasks, returning a list of Futures holding their status and results when everything is completed.
-
[CountDownLatch](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CountDownLatch.html)
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
A **CountDownLatch** is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the `countDown()` method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon -- the count cannot be reset. If you need a version that resets the count, consider using a **CyclicBarrier**.
- [ForkJoinPool](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ForkJoinPool.html) or `newWorkStealingPool()` in [Executors](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html)
- Iterate through all `Future` objects created after submitting to `ExecutorService`
-
Recommended way of shutdown from oracle documentation page of [ExecutorService](http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html):
void shutdownAndAwaitTermination(ExecutorService pool) { pool.shutdown(); // Disable new tasks from being submitted try { // Wait a while for existing tasks to terminate if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { pool.shutdownNow(); // Cancel currently executing tasks // Wait a while for tasks to respond to being cancelled if (!pool.awaitTermination(60, TimeUnit.SECONDS)) System.err.println("Pool did not terminate"); } } catch (InterruptedException ie) { // (Re-)Cancel if current thread also interrupted pool.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); }
shutdown():
Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.shutdownNow():
Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution. In above example, if your tasks are taking more time to complete, you can change if condition to while condition Replaceif (!pool.awaitTermination(60, TimeUnit.SECONDS))
with
while(!pool.awaitTermination(60, TimeUnit.SECONDS)) { Thread.sleep(60000);
}
-
`public static ExecutorService newSingleThreadExecutor()`
Creates an Executor that uses a single worker thread operating off an unbounded queue
There is a difference between `newFixedThreadPool(1)` and `newSingleThreadExecutor()` as the java doc says for the latter:Unlike the otherwise equivalent newFixedThreadPool(1) the returned executor is guaranteed not to be reconfigurable to use additional threads.
Which means that a `newFixedThreadPool` can be reconfigured later in the program by: `((ThreadPoolExecutor) fixedThreadPool).setMaximumPoolSize(10)` This is not possible for `newSingleThreadExecutor`
Use cases:-
1. You want to execute the submitted tasks in a sequence.
1. You need only one Thread to handle all your request
- Unbounded queue is harmful
- Effective use of available cores. Configure
nThreads
asRuntime.getRuntime().availableProcessors()
- When you decide that number of thread should not exceed a number in the thread pool
- Unbounded queue is harmful.
- For short-lived asynchronous tasks
- Unbounded queue is harmful.
- Each new task will create a new thread if all existing threads are busy. If the task is taking long duration, more number of threads will be created,which will degrade the performance of the system. Alternative in this case:
newFixedThreadPool
-
`public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)`
Creates a thread pool that can schedule commands to run after a given delay, or to execute periodically.
Use cases:-
1. Handling recurring events with delays, which will happen in future at certain interval of times
- Unbounded queue is harmful.
- For divide and conquer type of problems.
- Effective use of idle threads. Idle threads steals tasks from busy threads.
- Unbounded queue size is harmful.
- Control Thread pool size dynamically
- Set the capacity for
BlockingQueue
- Define
RejectionExecutionHander
when queue is full CustomThreadFactory
to add some additional functionality during Thread creation(public Thread newThread(Runnable r)
- When you schedule a task for repeated execution, depending on the ScheduledExecutorService used, your task might be suspended from any further execution, if an execution of your task causes an exception which isn't handled. See Mother F**k the ScheduledExecutorService! (opens new window)
Cons:
5.
public static ExecutorService newWorkStealingPool()
Creates a work-stealing thread pool using all available processors as its target parallelism level
Use cases:
Cons:
You can see one common drawbacks in all these ExecutorService : unbounded queue. This will be addressed with ThreadPoolExecutor (opens new window)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
With
ThreadPoolExecutor
, you can# Scheduling tasks to run at a fixed time, after a delay or repeatedly
The
ScheduledExecutorService
class provides a methods for scheduling single or repeated tasks in a number of ways. The following code sample assume thatpool
has been declared and initialized as follows:ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
In addition to the normal
ExecutorService
methods, theScheduledExecutorService
API adds 4 methods that schedule tasks and returnScheduledFuture
objects. The latter can be used to retrieve results (in some cases) and cancel tasks.# Starting a task after a fixed delay
The following example schedules a task to start after ten minutes.
ScheduledFuture<Integer> future = pool.schedule(new Callable<>() { @Override public Integer call() { // do something return 42; } }, 10, TimeUnit.MINUTES);
# Starting tasks at a fixed rate
The following example schedules a task to start after ten minutes, and then repeatedly at a rate of once every one minute.
ScheduledFuture<?> future = pool.scheduleAtFixedRate(new Runnable() { @Override public void run() { // do something } }, 10, 1, TimeUnit.MINUTES);
Task execution will continue according to the schedule until the
pool
is shut down, thefuture
is canceled, or one of the tasks encounters an exception.It is guaranteed that the tasks scheduled by a given
scheduledAtFixedRate
call will not overlap in time. If a task takes longer than the prescribed period, then the next and subsequent task executions may start late.# Starting tasks with a fixed delay
The following example schedules a task to start after ten minutes, and then repeatedly with a delay of one minute between one task ending and the next one starting.
ScheduledFuture<?> future = pool.scheduleWithFixedDelay(new Runnable() { @Override public void run() { // do something } }, 10, 1, TimeUnit.MINUTES);
Task execution will continue according to the schedule until the
pool
is shut down, thefuture
is canceled, or one of the tasks encounters an exception.# Using Thread Pools
Thread Pools are used mostly calling methods in
ExecutorService
.The following methods can be used to submit work for execution:
Method Description submit
Executes a the submitted work and return a future which can be used to get the result execute
Execute the task sometime in the future without getting any return value invokeAll
Execute a list of tasks and return a list of Futures invokeAny
Executes all the but return only the result of one that has been successful (without exceptions) Once you are done with the Thread Pool you can call
shutdown()
to terminate the Thread Pool. This executes all pending tasks. To wait for all tasks to execute you can can loop aroundawaitTermination
orisShutdown()
.# Remarks
Pitfalls
Unlike the otherwise equivalent newFixedThreadPool(1) the returned executor is guaranteed not to be reconfigurable to use additional threads.
Cons:
public static ExecutorService newFixedThreadPool(int nThreads)
Creates a thread pool that reuses a fixed number of threads operating off a shared unbounded queue. At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available
Use cases:
Cons:
public static ExecutorService newCachedThreadPool()
Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available
Use cases:
Cons:
If you want to simulate
CountDownLatch
behaviour, you can useinvokeAll()
method.All these four mechanism are complimentary to each other. Depending on level of granularity you want to control, you have to chose right ones.
# Wait for completion of all tasks in ExecutorService
Let's have a look at various options to wait for completion of tasks submitted to Executor (opens new window)
Example:
import java.util.concurrent.*; import java.util.*; public class InvokeAllDemo{ public InvokeAllDemo(){ System.out.println("creating service"); ExecutorService service = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); List<MyCallable> futureList = new ArrayList<MyCallable>(); for (int i = 0; i < 10; i++){ MyCallable myCallable = new MyCallable((long)i); futureList.add(myCallable); } System.out.println("Start"); try{ List<Future<Long>> futures = service.invokeAll(futureList); } catch(Exception err){ err.printStackTrace(); } System.out.println("Completed"); service.shutdown(); } public static void main(String args[]){ InvokeAllDemo demo = new InvokeAllDemo(); } class MyCallable implements Callable<Long>{ Long id = 0L; public MyCallable(Long val){ this.id = val; } public Long call(){ // Add your business logic return id; } } }
A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the
countDown()
method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon -- the count cannot be reset. If you need a version that resets the count, consider using a CyclicBarrier.# Use cases for different types of ExecutorService
Executors (opens new window) returns different type of ThreadPools catering to specific need.
Custom ThreadFactory
can be configured, which is useful :
Here (opens new window) is a example of how to use ThreadPoolExecutor
# Retrieving value from computation - Callable
If your computation produces some return value which later is required, a simple Runnable task isn't sufficient. For such cases you can use ExecutorService.submit(
Callable
(opens new window)<T>)
which returns a value after execution completes.
The Service will return a Future
(opens new window) which you can use to retrieve the result of the task execution.
// Submit a callable for execution
ExecutorService pool = anExecutorService;
Future<Integer> future = pool.submit(new Callable<Integer>() {
@Override public Integer call() {
//do some computation
return new Random().nextInt();
}
});
// ... perform other tasks while future is executed in a different thread
When you need to get the result of the future, call future.get()
If the result of a scheduled or running task is no longer required, you can call Future.cancel(boolean)
to cancel it.
# submit() vs execute() exception handling differences
Generally execute() command is used for fire and forget calls (without need of analyzing the result) and submit() command is used for analyzing the result of Future object.
We should be aware of key difference of Exception Handling mechanisms between these two commands.
Exceptions from submit() are swallowed by framework if you did not catch them.
Code example to understand the difference:
Case 1: submit the Runnable with execute() command, which reports the Exception.
import java.util.concurrent.*;
import java.util.*;
public class ExecuteSubmitDemo {
public ExecuteSubmitDemo() {
System.out.println("creating service");
ExecutorService service = Executors.newFixedThreadPool(2);
//ExtendedExecutor service = new ExtendedExecutor();
for (int i = 0; i < 2; i++){
service.execute(new Runnable(){
public void run(){
int a = 4, b = 0;
System.out.println("a and b=" + a + ":" + b);
System.out.println("a/b:" + (a / b));
System.out.println("Thread Name in Runnable after divide by zero:"+Thread.currentThread().getName());
}
});
}
service.shutdown();
}
public static void main(String args[]){
ExecuteSubmitDemo demo = new ExecuteSubmitDemo();
}
}
class ExtendedExecutor extends ThreadPoolExecutor {
public ExtendedExecutor() {
super(1, 1, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(100));
}
// ...
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
if (t == null && r instanceof Future<?>) {
try {
Object result = ((Future<?>) r).get();
} catch (CancellationException ce) {
t = ce;
} catch (ExecutionException ee) {
t = ee.getCause();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
if (t != null)
System.out.println(t);
}
}
output:
creating service
a and b=4:0
a and b=4:0
Exception in thread "pool-1-thread-1" Exception in thread "pool-1-thread-2" java.lang.ArithmeticException: / by zero
at ExecuteSubmitDemo$1.run(ExecuteSubmitDemo.java:15)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
java.lang.ArithmeticException: / by zero
at ExecuteSubmitDemo$1.run(ExecuteSubmitDemo.java:15)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Case 2: Replace execute() with submit() : service.submit(new Runnable(){
In this case, Exceptions are swallowed by framework since run() method did not catch them explicitly.
output:
creating service
a and b=4:0
a and b=4:0
Case 3: Change the newFixedThreadPool to ExtendedExecutor
//ExecutorService service = Executors.newFixedThreadPool(2);
ExtendedExecutor service = new ExtendedExecutor();
output:
creating service
a and b=4:0
java.lang.ArithmeticException: / by zero
a and b=4:0
java.lang.ArithmeticException: / by zero
I have demonstrated this example to cover two topics : Use your custom ThreadPoolExecutor and handle Exectpion with custom ThreadPoolExecutor.
Other simple solution to above problem : When you are using normal ExecutorService & submit command, get the Future object from submit() command call get() API on Future. Catch the three exceptions, which have been quoted in afterExecute method implementation. Advantage of custom ThreadPoolExecutor over this approach : You have to handle Exception handling mechanism in only one place - Custom ThreadPoolExecutor.
# Fire and Forget - Runnable Tasks
Executors accept a java.lang.Runnable
which contains (potentially computationally or otherwise long-running or heavy) code to be run in another Thread.
Usage would be:
Executor exec = anExecutor;
exec.execute(new Runnable() {
@Override public void run() {
//offloaded work, no need to get result back
}
});
Note that with this executor, you have no means to get any computed value back.
With Java 8, one can utilize lambdas to shorten the code example.
Executor exec = anExecutor;
exec.execute(() -> {
//offloaded work, no need to get result back
});
# Handle Rejected Execution
If
RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor)
will be called.
The default behavior is that you'll get a RejectedExecutionException thrown at the caller. But there are more predefined behaviors available:
You can set them using one of the ThreadPool constructors (opens new window):
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) // <--
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) // <--
You can as well implement your own behavior by extending RejectedExecutionHandler (opens new window) interface:
void rejectedExecution(Runnable r, ThreadPoolExecutor executor)