Friday, November 17, 2006

Java 5 Executors: ThreadPool

The following is a sample of a daemon that accepts requests and processes them concurrently. The daemon accepts requests and creates one thread to handle one request.
while (true) {
request = acceptRequest();
Runnable requestHandler = new Runnable() {
public void run() {
handleRequest(request);
}
};
new Thread(requestHandler).start();
}
While this is a correct implementation, it has some performance drawbacks.
  • Thread lifecycle overhead: If the requests are frequent and lightweight, the thread creation and teardown may become an overhead.
  • Resource consumption:
    • Active threads consume system resources.
    • Idle threads may occupy a lot of memory.
    • Having too many threads competing for CPU time may add an overhead to processing time.
  • Stability: Unbounded thread creation may end in an OutOfMemoryError. This is because of the limits (imposed by the native platform, JVM invocation parameters etc.) on the number of threads that can be created.
This is where the Java 5 executor framework comes in handy. Executor is the primay abstraction for task execution in Java 5.
public interface Executor {
void execute(Runnable command);
}
The executor provides a standard means of decoupling task submission from task execution. The Executors also provide thread lifecycle support and hooks for adding statistics gathering, application management, and monitoring. Executor is based on the producer-consumer pattern, where activities that submit tasks are producers and the threads that execute tasks are consumers. The following sample code shows how to use a ThreadPool (an implementation of Executor).
int NTHREADS = 100;
Executor exec = Executors.newFixedThreadPool(NTHREADS);
while (true) {
request = acceptRequest();
Runnable requestHandler = new Runnable() {
public void run() {
handleRequest(request);
}
};
exec.execute(requestHandler);
}
In this case, the main thread is the producer and requestHandler is the consumer.
Execution Policies
The various Executor implementations provide different execution policies to be set while executing the tasks. For example, the ThreadPool supports the following policies:
  • newFixedThreadPool: Creates threads as tasks are submitted, up to the maximum pool size, and then attempts to keep the pool size constant.
  • newCachedThreadPool: Can add new threads when demand increases, no bounds on the size of the pool.
  • newSingleThreadExecutor: Single worker thread to process tasks, Guarantees order of execution based on the queue policy (FIFO, LIFO, priority order).
  • newScheduledThreadPool: Fixed-size, supports delayed and periodic task execution.
Executor Lifecycle
An application can be shut down either gracefully or abruptly, or somewhere in-between. Executors provide the ability to be shutdown as abruptly or gracefully. This is addressed by the ExecutorService, which implements Executor and adds a number of methods for lifecycle management (and some utility methods).
public interface ExecutorService extends Executor {

void shutdown();

List<Runnable> shutdownNow();

boolean isShutdown();

boolean isTerminated();

boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

<T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
throws InterruptedException;

<T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

<T> T invokeAny(Collection<Callable<T>> tasks)
throws InterruptedException, ExecutionException;

<T> T invokeAny(Collection<Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;

}
The first five methods are for lifecycle management. The following code sample shows how the above methods for lifecycle management may be used
doWork() {
ExecutorService exec = ...;
while (!exec.isShutdown()) {
try {
Request request = acceptRequest();
exec.execute(new Runnable() {
public void run() { handleRequest(conn); }
});
} catch (RejectedExecutionException e) {
e.printStackTrace();
}
}
}

public void stop() { exec.shutdown(); }

void handleRequest(Request request) {
if (isShutdownRequest(req))
stop();
else
handle(req);
}

This post presented a brief overview of the Executor framework. The next post will provide more details into the usage of the executor framework, introducing more classes from the java.util.concurrent package.

4 comments:

  1. Look what, this is Mohit.. I was stuck in a problem of threadpool management and reach your arcticle from google..

    Goog blogg..

    ReplyDelete
  2. Do you know whether the submit() method is reentrant? I cannot find any documentation on it. In the Sun's API there is nothing about it.

    DINF - oprogramowanie na zamówienie / enterprise software

    ReplyDelete
  3. submit in turn calls Executors execute() method.

    Avadhoot.

    ReplyDelete
  4. Thanks, this is generally helpful.
    Still, I followed step-by-step your method in this Java online training
    Java online classes

    ReplyDelete

Popular Posts