Quick Refresh: Java: Executor Framework
Thread Pool and Executor Framework
A thread pool reuses previously created threads to execute current tasks and offers a solution to the problem of thread life-cycle overhead and resource thrashing. Since the thread already exists when the request arrives, the delay introduced by thread creation is eliminated, making the application more responsive. Java provides the Executor framework, which is centered around the Executor interface, its sub-interface ExecutorService, and the class ThreadPoolExecutor, which implements both of these interfaces.
By using an executor, you only have to implement the Runnable/Callable objects and hand them to the executor to run. Executors let you take advantage of threading while focusing on the tasks you want the threads to perform, instead of on thread mechanics.
To use a thread pool, we first create an ExecutorService object and pass a set of tasks to it. The ThreadPoolExecutor class allows you to set the core and maximum pool size. The runnables that are run by a particular thread are executed sequentially.
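As a minimal sketch of that workflow (the class name PoolBasics and the squaring task are illustrative, not part of any API), the code below creates a fixed pool of four threads, submits one Callable per input, and collects the results through the returned Future objects:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PoolBasics {
    // Submits one Callable per input to a fixed-size pool and collects the results in input order.
    static List<Integer> squares(List<Integer> inputs) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // four reusable worker threads
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int n : inputs) {
                futures.add(pool.submit(() -> n * n)); // each task is a Callable<Integer>
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures) {
                results.add(f.get()); // blocks until that particular task has finished
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown(); // accept no new tasks; worker threads exit once the queue is drained
        }
    }

    public static void main(String[] args) {
        System.out.println(squares(List.of(1, 2, 3, 4))); // prints [1, 4, 9, 16]
    }
}
```

Calling shutdown() in a finally block makes sure the pool's threads do not keep the JVM alive once the work is done.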
Important Points
- Executor
- ExecutorService
- ScheduledExecutorService
- Future
- CountDownLatch
- CyclicBarrier
- Semaphore
- ThreadFactory
- BlockingQueue
- DelayQueue
- Locks
- Phaser
Executors: in large-scale applications, it makes sense to separate thread management and creation from the rest of the application. Objects that encapsulate these functions are known as executors.
- Executor Interfaces define the three executor object types.
- Thread Pools are the most common kind of executor implementation.
- Fork/Join is a framework (new in JDK 7) for taking advantage of multiple processors.
- Executor Interfaces
(1) Executor Interface: a simple interface that supports launching new tasks.
- The Executor interface provides a single method, execute, designed to be a drop-in replacement for a common thread-creation idiom. If r is a Runnable object and e is an Executor object, you can replace (new Thread(r)).start(); with e.execute(r);
- The low-level idiom creates a new thread and launches it immediately. Depending on the Executor implementation, execute may do the same thing, but it is more likely to use an existing worker thread to run r, or to place r in a queue to wait for a worker thread to become available.
- Note that if the executor cannot accept the task for execution, it throws RejectedExecutionException.
void execute(Runnable command)
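The two Executor implementations below are modeled on the examples in the Executor Javadoc (the class names DirectExecutor, ThreadPerTaskExecutor and ExecutorDemo are illustrative). They show that execute() says nothing about where a task runs, and that a pool which has been shut down rejects new work with RejectedExecutionException:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Runs each task synchronously in the thread that calls execute().
class DirectExecutor implements Executor {
    @Override
    public void execute(Runnable r) {
        r.run();
    }
}

// Starts a new thread for every task, i.e. the behaviour of (new Thread(r)).start().
class ThreadPerTaskExecutor implements Executor {
    @Override
    public void execute(Runnable r) {
        new Thread(r).start();
    }
}

class ExecutorDemo {
    // Returns the name of the thread that ran a task handed to DirectExecutor.
    static String directRunnerName() {
        String[] ranOn = new String[1];
        Executor e = new DirectExecutor();
        e.execute(() -> ranOn[0] = Thread.currentThread().getName());
        return ranOn[0];
    }

    // A pool that has been shut down refuses new tasks with RejectedExecutionException.
    static boolean rejectsAfterShutdown() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.shutdown();
        try {
            pool.execute(() -> { });
            return false;
        } catch (RejectedExecutionException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        Runnable r = () -> System.out.println("running on " + Thread.currentThread().getName());
        new DirectExecutor().execute(r);        // runs on the calling (main) thread
        new ThreadPerTaskExecutor().execute(r); // runs on a newly created thread
        System.out.println("rejected after shutdown: " + rejectsAfterShutdown());
    }
}
```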
(2) ExecutorService Interface: a sub-interface of Executor.
- ExecutorService adds methods that help manage the life cycle of individual tasks and of the executor itself.
- Tasks can be assigned to the ExecutorService using several methods, including execute(), which is inherited from the Executor interface, and also submit(), invokeAny(), invokeAll().
- The execute() method returns void, so it gives no way to obtain the result of the task or to check the task's status (running or completed).
executorService.execute(runnableTask);
- The ExecutorService interface has a more versatile submit method. submit accepts Runnable or Callable objects and returns a Future object, which is used to retrieve the Callable's return value and to manage the status of both Callable and Runnable tasks: a Future can be used to cancel execution and/or wait for completion.
<T> Future<T> submit(Callable<T> task) : Submits a value-returning task for execution and returns a Future representing the pending results of the task.
Future<?> submit(Runnable task) : Submits a Runnable task for execution and returns a Future representing that task.
<T> Future<T> submit(Runnable task, T result) : Submits a Runnable task for execution and returns a Future representing that task; the Future's get() returns the given result upon successful completion.
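A short sketch exercising each submit overload, assuming a single-thread executor and trivial lambda tasks (SubmitVariants is a hypothetical name). The compiler picks the Callable or Runnable overload depending on whether the lambda produces a value:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SubmitVariants {
    // Submits one task with each overload and joins their results into a single string.
    static String demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Callable<String>: the lambda returns a value
            Future<String> fromCallable = executor.submit(() -> "callable result");
            // Runnable: the lambda returns nothing, so get() yields null
            Future<?> fromRunnable = executor.submit(() -> System.out.println("side effect only"));
            // Runnable plus a fixed result that get() returns on success
            Future<Integer> withResult = executor.submit(() -> { }, 42);
            return fromCallable.get() + "|" + fromRunnable.get() + "|" + withResult.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints callable result|null|42 (after "side effect only")
    }
}
```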
- ExecutorService also provides methods for submitting large collections of Callable objects. Methods invokeAny and invokeAll perform the most commonly useful forms of bulk execution, executing a collection of tasks and then waiting for at least one, or all, to complete. (Class ExecutorCompletionService can be used to write customized variants of these methods.)
- The submit() and invokeAll() methods return an object or a collection of objects of type Future, which allows us to get the result of a task's execution or to check the task's status (running or completed). The Future interface provides a blocking method, get(), which returns the actual result of a Callable task's execution, or null in the case of a Runnable task submitted with submit(Runnable). Calling get() while the task is still running blocks the caller until the task completes and the result is available.
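The bulk methods can be sketched as follows, assuming three trivial Callable tasks (the class and task names are illustrative). invokeAll returns the futures in task order once every task has finished, while invokeAny returns a single successful result and cancels the tasks that have not completed:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BulkExecution {
    static List<Callable<String>> tasks() {
        Callable<String> a = () -> "alpha";
        Callable<String> b = () -> "beta";
        Callable<String> c = () -> "gamma";
        return List.of(a, b, c);
    }

    // invokeAll waits for every task; the futures come back in the same order as the tasks.
    static List<String> all() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<String> results = new ArrayList<>();
            for (Future<String> f : pool.invokeAll(tasks())) {
                results.add(f.get()); // every future is already done, so get() does not block
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // invokeAny returns the result of one task that completed successfully.
    static String any() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            return pool.invokeAny(tasks());
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Which value invokeAny returns depends on scheduling, so code using it should treat any of the successful results as acceptable.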
- An ExecutorService can be shut down, which will cause it to reject new tasks. Two different methods are provided for shutting down an ExecutorService. The shutdown() method will allow previously submitted tasks to execute before terminating, while the shutdownNow() method prevents waiting tasks from starting and attempts to stop currently executing tasks. Upon termination, an executor has no tasks actively executing, no tasks awaiting execution, and no new tasks can be submitted. An unused ExecutorService should be shut down to allow reclamation of its resources.
void shutdown() : Initiates an orderly shutdown in which previously submitted tasks are executed, but no new tasks will be accepted.
List<Runnable> shutdownNow() : Attempts to stop all actively executing tasks, halts the processing of waiting tasks, and returns a list of the tasks that were awaiting execution.
One good way to shut down the ExecutorService (also recommended by Oracle) is to use both of these methods combined with the awaitTermination() method. With this approach, the ExecutorService first stops taking new tasks, then waits up to a specified period of time for all tasks to complete. If that time expires, shutdownNow() is called to cancel waiting tasks and interrupt running ones:
executorService.shutdown();
try {
    if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
        executorService.shutdownNow();
    }
} catch (InterruptedException e) {
    executorService.shutdownNow();
    // Preserve the interrupt status, as in the pattern shown in the ExecutorService Javadoc
    Thread.currentThread().interrupt();
}
The ExecutorService interface has three standard implementations:
ThreadPoolExecutor: executes tasks using a pool of threads. Once a thread has finished executing a task, it goes back into the pool. If all threads in the pool are busy, the task has to wait for its turn.
ScheduledThreadPoolExecutor: allows you to schedule task execution instead of running a task as soon as a thread is available. It can also schedule tasks at a fixed rate or with a fixed delay.
ForkJoinPool: a special ExecutorService for recursive algorithms. If you use a regular ThreadPoolExecutor for a recursive algorithm, you will quickly find all your threads busy waiting for the lower levels of recursion to finish. The ForkJoinPool implements the so-called work-stealing algorithm, which allows it to use available threads more efficiently.
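A minimal sketch of a ScheduledThreadPoolExecutor (obtained here through Executors.newScheduledThreadPool) running a delayed one-shot task and a fixed-rate periodic task. The delays, the CountDownLatch used to count executions, and the class name are assumptions for illustration:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class SchedulingDemo {
    // Runs a delayed one-shot task, then a fixed-rate task until it has fired three times.
    static String run() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        try {
            // One-shot: the Callable runs once, about 100 ms after being scheduled
            ScheduledFuture<String> delayed =
                scheduler.schedule(() -> "ran after delay", 100, TimeUnit.MILLISECONDS);
            String result = delayed.get();

            // Periodic: first run immediately, then every 50 ms until cancelled
            CountDownLatch ticks = new CountDownLatch(3);
            ScheduledFuture<?> periodic =
                scheduler.scheduleAtFixedRate(ticks::countDown, 0, 50, TimeUnit.MILLISECONDS);
            boolean fired = ticks.await(5, TimeUnit.SECONDS); // wait for three executions
            periodic.cancel(false); // periodic tasks run until cancelled or the scheduler shuts down
            return fired ? result : "periodic task did not fire";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            scheduler.shutdown();
        }
    }
}
```

scheduleWithFixedDelay takes the same arguments but measures the period from the end of one run to the start of the next, rather than between start times.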
Difference between the java.lang.Runnable and java.util.concurrent.Callable interfaces:
The Runnable interface has a single run method that returns no value and cannot throw checked exceptions. The Callable interface has a single call method and represents a task that produces a value, which is why call returns a value; it can also throw exceptions. Callable is generally used with ExecutorService instances to start an asynchronous task and then call the returned Future instance to get its value.
public interface Runnable {
    public void run();
}
public interface Callable<V> {
    V call() throws Exception;
}
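Because call() may throw checked exceptions, a failure inside a Callable does not disappear: Future.get() rethrows it wrapped in an ExecutionException. A minimal sketch, assuming a hypothetical IOException as the failure:

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CallableVsRunnable {
    // Returns the simple class name of the exception the Callable threw, as seen through get().
    static String causeOfFailure() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        // A Callable may throw a checked exception; Runnable.run() could not declare one
        Callable<String> failing = () -> {
            throw new IOException("disk not available"); // hypothetical failure
        };
        try {
            executor.submit(failing).get();
            return "no exception";
        } catch (ExecutionException e) {
            // The original exception is available as the cause
            return e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            executor.shutdown();
        }
    }
}
```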
Future (java.util.concurrent.Future):
public interface Future<V> {
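boolean cancel(boolean mayInterruptIfRunning);
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;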
boolean isCancelled();
boolean isDone();
}
- The get() method can be used to get the computed result:
Future<String> future = executorService.submit(callableTask);
String result = null;
try {
result = future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
If you call the get() method before the asynchronous task has completed, get() will block until the result is ready. Very long blocking in get() can degrade an application's performance. If the resulting data is not crucial, this problem can be avoided by using timeouts:
String result = future.get(200, TimeUnit.MILLISECONDS);
If the execution takes longer than specified (in this case 200 milliseconds), a TimeoutException will be thrown.
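A sketch of the timeout pattern, assuming a simulated slow task and a fallback value (both hypothetical). On timeout it also cancels the task, since its result will no longer be used:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class GetWithTimeout {
    // Waits at most timeoutMs for a task that takes about taskMs; returns a fallback on timeout.
    static String fetch(long taskMs, long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(() -> {
            Thread.sleep(taskMs); // simulated slow work
            return "fresh value";
        });
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // result no longer wanted: interrupt the sleeping task
            return "fallback value";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

For example, fetch(2000, 100) gives up after roughly 100 ms and returns the fallback, while fetch(0, 5000) returns the task's own value.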
- Check if Task is Done: The isDone() method can be used to check if the assigned task is already completed or not.
- Cancelling the Task: The Future interface also supports cancelling a task (Runnable or Callable) that was submitted to a Java ExecutorService, via the cancel(boolean mayInterruptIfRunning) method. If mayInterruptIfRunning is true, the thread executing the task is interrupted; the task actually stops only if it responds to interruption (for example, by being blocked in sleep() or wait(), or by checking Thread.interrupted()). Otherwise, in-progress tasks are allowed to complete. In both cases, a task that has not yet started will never run.
- Check if Task is Cancelled: The isCancelled() method can be used to check whether the task was cancelled.
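The sketch below cancels a task that is already running; the task sleeps, so it responds to the interrupt sent by cancel(true). The latch only makes sure the task has started before it is cancelled. The class name and timings are illustrative:

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CancelDemo {
    // Cancels a running task and reports what the Future says afterwards.
    static String cancelRunningTask() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        Future<String> future = executor.submit(() -> {
            started.countDown();
            Thread.sleep(60_000); // throws InterruptedException when interrupted
            return "finished";
        });
        try {
            started.await(); // the task is running before we cancel it
            boolean cancelled = future.cancel(true); // true: interrupt the worker thread
            String outcome = "cancelled=" + cancelled
                + " isCancelled=" + future.isCancelled()
                + " isDone=" + future.isDone();
            try {
                future.get();
            } catch (CancellationException e) {
                outcome += " get=CancellationException"; // get() after cancel always throws
            }
            return outcome;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }
}
```

Note that isDone() also returns true for a cancelled task, so isCancelled() is the method that distinguishes cancellation from normal completion.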
ThreadPoolExecutor has several constructors available. For instance:
ExecutorService threadPoolExecutor =
new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>( ) );
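corePoolSize: If fewer than corePoolSize threads are running when a task is delegated to the thread pool, a new thread is created to run it, even if idle threads exist in the pool.
maximumPoolSize: If the internal task queue is full, and at least corePoolSize but fewer than maximumPoolSize threads are running, a new thread is created to execute the task. With an unbounded queue such as the LinkedBlockingQueue above, the queue never becomes full, so maximumPoolSize has no effect.
keepAliveTime: When more than corePoolSize threads are running, an excess thread is terminated after it has been idle for longer than keepAliveTime.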
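To see these rules in action, the sketch below uses corePoolSize = 1, maximumPoolSize = 2 and a bounded ArrayBlockingQueue with capacity 1 (all values chosen for illustration). Tasks that block on a latch keep every thread busy, so the fourth task finds both the queue and the pool full and is rejected by the default AbortPolicy:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizing {
    static String fillPool() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 2, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder log = new StringBuilder();
        try {
            pool.execute(blocker); // 1st: fewer than corePoolSize threads, so a core thread starts
            pool.execute(blocker); // 2nd: the core thread is busy, so the task is queued
            pool.execute(blocker); // 3rd: queue full, below maximumPoolSize, so a 2nd thread starts
            log.append("poolSize=").append(pool.getPoolSize())
               .append(" queued=").append(pool.getQueue().size());
            try {
                pool.execute(blocker); // 4th: queue full and at maximumPoolSize
            } catch (RejectedExecutionException e) {
                log.append(" 4th=rejected");
            }
        } finally {
            release.countDown(); // let all blocked tasks finish
            pool.shutdown();
        }
        return log.toString();
    }
}
```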
The Executors class provides factory methods for the common configurations:
- an ExecutorService with a single thread to execute commands, with the method newSingleThreadExecutor:
// Creates a single thread ExecutorService
ExecutorService singleExecutorService = Executors.newSingleThreadExecutor();
- an ExecutorService that uses a fixed-size pool of threads to execute commands, with the method newFixedThreadPool.
- a ScheduledExecutorService with a single thread to execute commands, with the method newSingleThreadScheduledExecutor.
- a ScheduledExecutorService with a fixed-size pool of threads to execute scheduled commands, with the method newScheduledThreadPool.
- an ExecutorService with a pool of threads that creates new threads as needed but reuses previously created threads when they are available, with the method newCachedThreadPool.
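One property worth knowing about newSingleThreadExecutor is that tasks run one at a time, in submission order. A small sketch (FactoryMethods is an illustrative name) that records the order in which the tasks actually ran:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FactoryMethods {
    // Submits `count` tasks to a single-thread executor and returns the order they ran in.
    static List<Integer> runInOrder(int count) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        List<Integer> seen = new CopyOnWriteArrayList<>(); // thread-safe list for the worker
        for (int i = 0; i < count; i++) {
            int id = i;
            single.execute(() -> seen.add(id));
        }
        single.shutdown(); // queued tasks still run after shutdown()
        try {
            single.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```

A fixed or cached pool gives no such ordering guarantee, because several threads may pick up queued tasks concurrently.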
Common pitfalls while using ExecutorService:
- Keeping an unused ExecutorService alive: There is a detailed explanation above about how to shut down an ExecutorService;
- Wrong thread-pool capacity while using fixed length thread-pool: It is very important to determine how many threads the application will need to execute tasks efficiently. A thread-pool that is too large will cause unnecessary overhead just to create threads which mostly will be in the waiting mode. Too few can make an application seem unresponsive because of long waiting periods for tasks in the queue.
- Calling a Future's get() method after task cancellation: An attempt to get the result of an already cancelled task will trigger a CancellationException.
- Unexpectedly long blocking with Future's get() method: Timeouts should be used to avoid unexpected waits.
ExecutorService vs. Fork/Join:
After the release of Java 7, many developers decided that the ExecutorService framework should be replaced by the fork/join framework. This is not always the right decision, however. Despite the simplicity of usage and the frequent performance gains associated with fork/join, there is also a reduction in the amount of developer control over concurrent execution.
ExecutorService gives the developer the ability to control the number of generated threads and the granularity of tasks which should be executed by separate threads. The best use case for ExecutorService is the processing of independent tasks, such as transactions or requests according to the scheme “one thread for one task.”
Describe the purpose and use-cases of the fork/join framework:
The fork/join framework entry point is the ForkJoinPool class, which is an implementation of ExecutorService. It implements the work-stealing algorithm, in which idle threads try to "steal" work from busy threads. This spreads the computation across different threads and makes progress while using fewer threads than a usual thread pool would require.
BlockingQueue
The BlockingQueue interface in the java.util.concurrent package is typically used to have one thread produce objects, which another thread consumes. Here is a diagram that illustrates this principle:
[Diagram: A BlockingQueue with one thread putting into it, and another thread taking from it.]
The producing thread keeps producing new objects and inserting them into the queue until the queue reaches its upper bound, that is, the maximum number of elements it can hold. If the blocking queue reaches its upper limit, the producing thread is blocked while trying to insert the new object. It remains blocked until a consuming thread takes an object out of the queue.
The consuming thread keeps taking objects out of the blocking queue, and processes them. If the consuming thread tries to take an object out of an empty queue, the consuming thread is blocked until a producing thread puts an object into the queue.
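A minimal producer/consumer sketch using an ArrayBlockingQueue with capacity 2, so the producer really does block when the queue is full. The end-of-stream marker (POISON_PILL = -1) is a hypothetical convention of this example, not part of the BlockingQueue API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ProducerConsumer {
    private static final int POISON_PILL = -1; // hypothetical end-of-stream marker

    // The producer puts 0..count-1 into the queue; the consumer takes them until the marker.
    static List<Integer> transfer(int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2); // the upper bound
        List<Integer> consumed = new ArrayList<>(); // only touched by the consumer thread

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i); // blocks while the queue already holds 2 elements
                }
                queue.put(POISON_PILL);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    int item = queue.take(); // blocks while the queue is empty
                    if (item == POISON_PILL) {
                        break;
                    }
                    consumed.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // join() makes the consumer's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }
}
```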
It is not possible to insert null into a BlockingQueue. If you try to insert null, the BlockingQueue will throw a NullPointerException.
It is also possible to access all the elements inside a BlockingQueue, and not just the elements at the start and end. For instance, say you have queued an object for processing, but your application decides to cancel it. You can then call e.g. remove(o) to remove a specific object in the queue. However, this is not done very efficiently, so you should not use these Collection methods unless you really have to.
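Both rules can be checked with a LinkedBlockingQueue (the job names are illustrative): offering null throws NullPointerException, and remove(Object) takes an element out of the middle of the queue:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueRules {
    static String demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("job-1");
        queue.add("job-2");
        queue.add("job-3");

        String nullResult;
        try {
            queue.offer(null); // BlockingQueue implementations reject null elements
            nullResult = "accepted";
        } catch (NullPointerException e) {
            nullResult = "NullPointerException";
        }

        boolean removed = queue.remove("job-2"); // the cancelled job; found by a linear scan
        return nullResult + " " + removed + " " + queue;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints NullPointerException true [job-1, job-3]
    }
}
```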
The fork/join framework was introduced in Java 7. It provides tools to help speed up parallel processing by attempting to use all available processor cores, which it accomplishes through a divide-and-conquer approach.
In practice, this means that the framework first “forks”, recursively breaking the task into smaller independent subtasks until they are simple enough to be executed asynchronously.
After that, the “join” part begins, in which results of all subtasks are recursively joined into a single result, or in the case of a task which returns void, the program simply waits until every subtask is executed.
To provide effective parallel execution, the fork/join framework uses a pool of threads called the ForkJoinPool, which manages worker threads of type ForkJoinWorkerThread.
2. ForkJoinPool
The ForkJoinPool is the heart of the framework. It is an implementation of the ExecutorService that manages worker threads and provides us with tools to get information about the thread pool state and performance.
Worker threads can execute only one task at a time, but the ForkJoinPool doesn't create a separate thread for every single subtask. Instead, each thread in the pool has its own double-ended queue (or deque, pronounced deck) which stores tasks.
This architecture is vital for balancing the thread’s workload with the help of the work-stealing algorithm.
2.1. Work Stealing Algorithm
Simply put – free threads try to “steal” work from deques of busy threads.
By default, a worker thread gets tasks from the head of its own deque. When it is empty, the thread takes a task from the tail of the deque of another busy thread or from the global entry queue, since this is where the biggest pieces of work are likely to be located.
This approach minimizes the possibility that threads will compete for tasks. It also reduces the number of times the thread will have to go looking for work, as it works on the biggest available chunks of work first.
2.2. ForkJoinPool Instantiation
In Java 8, the most convenient way to get access to the instance of the ForkJoinPool is to use its static method commonPool(). As its name suggests, this will provide a reference to the common pool, which is a default thread pool for every ForkJoinTask.
According to Oracle’s documentation, using the predefined common pool reduces resource consumption, since this discourages the creation of a separate thread pool per task.
ForkJoinPool commonPool = ForkJoinPool.commonPool();
The same behavior can be achieved in Java 7 by creating a ForkJoinPool and assigning it to a public static field of a utility class:
public static ForkJoinPool forkJoinPool = new ForkJoinPool(2);
Now it can be easily accessed:
ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;
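With ForkJoinPool's constructors, it is possible to create a custom thread pool with a specific level of parallelism; the pool above, for example, has a parallelism level of 2.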
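A sketch of both ways of obtaining a pool, assuming the PoolUtil holder class from the text. getParallelism() reports the configured parallelism level; for the common pool it depends on the machine, so no fixed value is assumed:

```java
import java.util.concurrent.ForkJoinPool;

// Java 7 style holder for a shared pool with a parallelism level of 2, as in the text.
class PoolUtil {
    public static final ForkJoinPool forkJoinPool = new ForkJoinPool(2);
}

class PoolInfo {
    public static void main(String[] args) {
        ForkJoinPool commonPool = ForkJoinPool.commonPool();
        System.out.println("common pool parallelism: " + commonPool.getParallelism());
        System.out.println("custom pool parallelism: " + PoolUtil.forkJoinPool.getParallelism());
        // shutdown() has no effect on the common pool, but a pool you create yourself
        // should be shut down once it is no longer needed.
        PoolUtil.forkJoinPool.shutdown();
    }
}
```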
3. ForkJoinTask<V>
ForkJoinTask is the base type for tasks executed inside ForkJoinPool. In practice, one of its two subclasses should be extended: RecursiveAction for void tasks, and RecursiveTask<V> for tasks that return a value.
3.1. RecursiveAction – An Example
In the example below, the unit of work to be processed is represented by a String called workload. For demonstration purposes, the task is a nonsensical one: it simply uppercases its input and logs it.
To demonstrate the forking behavior of the framework, the example splits the task if workload.length() is larger than a specified threshold, using the createSubtasks() method.
The String is recursively divided into substrings, creating CustomRecursiveAction instances based on these substrings.
As a result, the createSubtasks() method returns a List<CustomRecursiveAction>.
The list is passed to the static ForkJoinTask.invokeAll() method, which forks the subtasks in the pool and waits for them to complete:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.logging.Logger;

public class CustomRecursiveAction extends RecursiveAction {

    private String workload = "";
    private static final int THRESHOLD = 4;

    // Logger choice is an assumption: the original line was truncated after "Logger."
    private static Logger logger = Logger.getAnonymousLogger();

    public CustomRecursiveAction(String workload) {
        this.workload = workload;
    }

    @Override
    protected void compute() {
        if (workload.length() > THRESHOLD) {
            // Split the work and run both halves, waiting for them to finish
            ForkJoinTask.invokeAll(createSubtasks());
        } else {
            processing(workload);
        }
    }

    private List<CustomRecursiveAction> createSubtasks() {
        List<CustomRecursiveAction> subtasks = new ArrayList<>();

        String partOne = workload.substring(0, workload.length() / 2);
        String partTwo = workload.substring(workload.length() / 2, workload.length());

        subtasks.add(new CustomRecursiveAction(partOne));
        subtasks.add(new CustomRecursiveAction(partTwo));

        return subtasks;
    }

    private void processing(String work) {
        String result = work.toUpperCase();
        logger.info("This result - (" + result + ") - was processed by "
          + Thread.currentThread().getName());
    }
}
This pattern can be used to develop your own RecursiveAction classes.
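As a variation on the same pattern, a RecursiveAction can also split index ranges of a shared array instead of copying the data, which avoids allocation when subtasks write their results in place. A sketch with an illustrative threshold of 4 and a hypothetical DoubleInPlace class:

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Doubles every element of an array in place; subtasks share the array and own disjoint ranges.
class DoubleInPlace extends RecursiveAction {
    private static final int THRESHOLD = 4; // ranges of up to 4 elements are processed directly
    private final long[] data;
    private final int from, to; // half-open range [from, to)

    DoubleInPlace(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            for (int i = from; i < to; i++) {
                data[i] *= 2;
            }
        } else {
            int mid = (from + to) >>> 1;
            // Fork both halves and wait for both to finish
            invokeAll(new DoubleInPlace(data, from, mid), new DoubleInPlace(data, mid, to));
        }
    }

    // Works on a copy so the caller's array is left unchanged.
    static long[] doubled(long[] input) {
        long[] copy = Arrays.copyOf(input, input.length);
        ForkJoinPool.commonPool().invoke(new DoubleInPlace(copy, 0, copy.length));
        return copy;
    }
}
```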
3.2. RecursiveTask<V>
For tasks that return a value, the logic is similar, except that the result of each subtask is combined into a single result:
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class CustomRecursiveTask extends RecursiveTask<Integer> {
    private int[] arr;

    private static final int THRESHOLD = 20;

    public CustomRecursiveTask(int[] arr) {
        this.arr = arr;
    }

    @Override
    protected Integer compute() {
        if (arr.length > THRESHOLD) {
            // Fork both halves, wait for them, then add up their results
            return ForkJoinTask.invokeAll(createSubtasks())
              .stream()
              .mapToInt(ForkJoinTask::join)
              .sum();
        } else {
            return processing(arr);
        }
    }

    private Collection<CustomRecursiveTask> createSubtasks() {
        List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
        dividedTasks.add(new CustomRecursiveTask(
          Arrays.copyOfRange(arr, 0, arr.length / 2)));
        dividedTasks.add(new CustomRecursiveTask(
          Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
        return dividedTasks;
    }

    private Integer processing(int[] arr) {
        return Arrays.stream(arr)
          .filter(a -> a > 10 && a < 27)
          .map(a -> a * 10)
          .sum();
    }
}
In this example, the work is represented by an array stored in the arr field of the CustomRecursiveTask class. The createSubtasks() method divides the array into two halves, and compute() keeps splitting recursively until each piece is no larger than the threshold. The ForkJoinTask.invokeAll() method forks the subtasks, waits for them to complete, and returns them as a collection (each ForkJoinTask is also a Future).
The join() method is then called on each subtask to retrieve its result.
In this example, this is accomplished using Java 8's Stream API; the sum() method represents combining the subresults into the final result.
4. Submitting Tasks to the ForkJoinPool
To submit tasks to the thread pool, a few approaches can be used.
The submit() or execute() method hands the task to the pool; join() is then called to obtain the result:
forkJoinPool.execute(customRecursiveTask);
int result = customRecursiveTask.join();
The invoke() method forks the task and waits for the result, and doesn’t need any manual joining:
int result = forkJoinPool.invoke(customRecursiveTask);
The ForkJoinTask.invokeAll() method is the most convenient way to submit a sequence of ForkJoinTasks to the ForkJoinPool. It takes tasks as parameters (two tasks, varargs, or a collection), forks them, and waits for all of them to complete; the collection variant returns the same tasks, in the same order, and each of them is a Future.
Alternatively, you can use separate fork() and join() methods:
customRecursiveTaskFirst.fork();
result = customRecursiveTaskLast.join();
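A common way to combine the two calls inside compute() is to fork one half, compute the other half directly in the current thread, and only then join the forked half, so the worker does useful work instead of just waiting. A sketch, assuming an array sum with an illustrative threshold of 1,000 elements (RangeSum is a hypothetical name):

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Sums a[from, to) with the fork / compute / join idiom.
class RangeSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // ranges this small are summed sequentially
    private final int[] a;
    private final int from, to; // half-open range [from, to)

    RangeSum(int[] a, int from, int to) {
        this.a = a;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += a[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        RangeSum left = new RangeSum(a, from, mid);
        RangeSum right = new RangeSum(a, mid, to);
        left.fork();                     // push the left half onto this worker's deque
        long rightSum = right.compute(); // compute the right half directly in this thread
        long leftSum = left.join();      // then wait for (or run) the forked left half
        return leftSum + rightSum;
    }

    static long sum(int[] a) {
        return ForkJoinPool.commonPool().invoke(new RangeSum(a, 0, a.length));
    }
}
```

Calling join() on the forked task before computing the other half would leave the current thread idle while it waits, which is one reason the order of fork() and join() calls matters.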
In our RecursiveTask<V> example we used the invokeAll() method to submit a sequence of subtasks to the pool. The same job can be done with fork() and join(), though this has consequences for the ordering of the results.
To avoid confusion, it is generally a good idea to use the invokeAll() method to submit more than one task to the ForkJoinPool.
5. Conclusions
Using the fork/join framework can speed up processing of large tasks, but to achieve this outcome, some guidelines should be followed:
- Use as few thread pools as possible – in most cases, the best decision is to use one thread pool per application or system
- Use the default common thread pool, if no specific tuning is needed
- Use a reasonable threshold for splitting ForkJoinTask into subtasks
- Avoid any blocking in your ForkJoinTasks
- AtomicBoolean : A boolean value that may be updated atomically.
- AtomicInteger : An int value that may be updated atomically.
- AtomicIntegerArray : An int array in which elements may be updated atomically.
- AtomicIntegerFieldUpdater<T> : A reflection-based utility that enables atomic updates to designated volatile int fields of designated classes.
- AtomicLong : A long value that may be updated atomically.
- AtomicLongArray : A long array in which elements may be updated atomically.
- AtomicLongFieldUpdater<T> : A reflection-based utility that enables atomic updates to designated volatile long fields of designated classes.
- AtomicMarkableReference<V>: An AtomicMarkableReference maintains an object reference along with a mark bit, that can be updated atomically.
- AtomicReference<V> : An object reference that may be updated atomically.
- AtomicReferenceArray<E> : An array of object references in which elements may be updated atomically.
- AtomicReferenceFieldUpdater<T,V> : A reflection-based utility that enables atomic updates to designated volatile reference fields of designated classes.
- AtomicStampedReference<V> : An AtomicStampedReference maintains an object reference along with an integer "stamp", that can be updated atomically.
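A minimal sketch of the two most common AtomicInteger operations: incrementAndGet() from several pool threads without losing updates, and compareAndSet(), which only succeeds when the current value equals the expected one. The thread and iteration counts are illustrative:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounter {
    // Increments a shared counter from several pool threads without locks.
    static int countConcurrently(int threads, int incrementsPerThread) {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.incrementAndGet(); // atomic read-modify-write: no lost updates
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.get();
    }

    // compareAndSet only succeeds when the current value matches the expected one.
    static String casDemo() {
        AtomicInteger value = new AtomicInteger(5);
        boolean first = value.compareAndSet(5, 6);  // 5 -> 6 succeeds
        boolean second = value.compareAndSet(5, 7); // value is 6 now, so this fails
        return first + " " + second + " " + value.get();
    }
}
```

With a plain int field and count++ in place of incrementAndGet(), concurrent increments could overwrite each other and the final count could come out lower.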