Problems with RejectedExecutionHandler and Futures

I’ve been working with [cc_java inline=”true”]ThreadPoolExecutor[/cc_java] and [cc_java inline=”true”]RejectedExecutionHandler[/cc_java]. The executor framework provides a convenient way to distribute concurrent tasks to multiple threads, and the different [cc_java inline=”true”]RejectedExecutionHandler[/cc_java] policies, [cc_java inline=”true”]ThreadPoolExecutor.DiscardOldestPolicy[/cc_java], for instance, enable your program to shed load if there is more work than it can reliably handle.

Unfortunately, when I started using [cc_java inline=”true”]Future[/cc_java]s, things became complicated. Consider the following example:

[cc_java]
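// Snippet; assumes an enclosing method that declares "throws Exception".
// Needs java.util.List, java.util.concurrent.* and Guava's com.google.common.collect.Lists.
ThreadPoolExecutor es = new ThreadPoolExecutor(10, 10, 1, TimeUnit.MINUTES,
    new ArrayBlockingQueue<Runnable>(10),
    new ThreadPoolExecutor.DiscardOldestPolicy());

List<Future<Integer>> futures = Lists.newArrayList();
for (int i = 0; i < 30; ++i) {
    final int ii = i;
    futures.add(es.submit(new Callable<Integer>() {
        public Integer call() {
            System.out.println("Running task " + ii);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // ignore
            }
            System.out.println("Task " + ii + " ends");
            return ii;
        }
    }));
}

for (Future<Integer> f : futures) {
    // Blocks forever on the first task that was discarded
    System.out.println("Result: " + f.get());
}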
[/cc_java]

It creates a thread pool with a maximum of 10 threads, and a work queue that can hold another 10 items. The policy to shed load is [cc_java inline=”true”]ThreadPoolExecutor.DiscardOldestPolicy[/cc_java], which means it will favor newer tasks over older tasks, if the queue has reached its capacity.

Then the program creates 30 tasks and submits them to the executor service. Each task sleeps for a second to make sure we actually hit the capacity of the thread pool executor. As a result, the executor can only hold 20 tasks at a time (10 running, but sleeping, and another 10 in the work queue), and 10 have to be discarded. The discarded tasks are the ones that were in the queue first (the older tasks), so we can expect tasks 0 to 9 as well as 20 to 29 to execute.

There is a problem, however, when we wait for the results of tasks 10 to 19 by calling their [cc_java inline=”true”]Future.get()[/cc_java] methods: Because the tasks have been rejected, they never start running, which means they will never finish. The program hangs here.
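The hang can be reproduced on a smaller scale without blocking forever. The following is a minimal sketch, not part of the original program: the class and method names are made up for illustration, the pool is shrunk to one thread and one queue slot, a latch replaces the sleep so the outcome does not depend on timing, and a bounded get with a timeout stands in for the blocking call.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class, scaled down from the example above.
class DiscardedFutureDemo {

    /** Returns true if the Future of the discarded task is still pending after all other work is done. */
    static boolean discardedFutureStaysPending() throws Exception {
        ThreadPoolExecutor es = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.DiscardOldestPolicy());
        CountDownLatch release = new CountDownLatch(1);

        // A occupies the only thread until the latch is released.
        Future<String> a = es.submit(() -> { release.await(); return "A"; });
        // B fills the only queue slot.
        Future<String> b = es.submit(() -> "B");
        // C does not fit, so DiscardOldestPolicy drops B and queues C instead.
        Future<String> c = es.submit(() -> "C");

        release.countDown();
        es.shutdown();
        es.awaitTermination(5, TimeUnit.SECONDS);

        try {
            // A plain b.get() would block forever; the timeout makes that visible.
            b.get(200, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException expected) {
            return a.isDone() && c.isDone() && !b.isDone();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Discarded future still pending: " + discardedFutureStaysPending());
    }
}
```

Even after the executor has terminated, the future for B is neither done nor cancelled, which is why an unbounded wait on it never returns.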

The only [cc_java inline=”true”]RejectedExecutionHandler[/cc_java]s in the JDK that function correctly here are [cc_java inline=”true”]ThreadPoolExecutor.AbortPolicy[/cc_java] and [cc_java inline=”true”]ThreadPoolExecutor.CallerRunsPolicy[/cc_java]. [cc_java inline=”true”]ThreadPoolExecutor.AbortPolicy[/cc_java] throws a [cc_java inline=”true”]RejectedExecutionException[/cc_java] as soon as the task is submitted to the executor service, so the caller never receives a [cc_java inline=”true”]Future[/cc_java] to wait for. [cc_java inline=”true”]ThreadPoolExecutor.CallerRunsPolicy[/cc_java] executes the task in the submitting thread at the time it is added to the executor service. It does return a [cc_java inline=”true”]Future[/cc_java], but that [cc_java inline=”true”]Future[/cc_java] is already done, so getting its result later will not block.
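To make the difference between the built-in policies concrete, here is a small sketch that saturates a tiny executor and then submits one more task. The class name, the method name and the returned status strings are invented for this illustration; only the JDK classes are real.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class comparing the JDK's rejection policies.
class BuiltInPolicyDemo {

    /** Fills a one-thread, one-slot executor, submits one more task and reports what happened to it. */
    static String submitToSaturatedPool(RejectedExecutionHandler policy) throws Exception {
        ThreadPoolExecutor es = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES,
                new ArrayBlockingQueue<Runnable>(1), policy);
        CountDownLatch release = new CountDownLatch(1);
        try {
            es.submit(() -> { release.await(); return "running"; }); // occupies the only thread
            es.submit(() -> "queued");                                 // fills the only queue slot
            Future<String> extra = es.submit(() -> "extra");         // triggers the policy
            return extra.isDone() ? "done:" + extra.get() : "pending";
        } catch (RejectedExecutionException e) {
            // AbortPolicy: submit() throws, so there is no Future at all
            return "rejected";
        } finally {
            release.countDown();
            es.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("AbortPolicy:      " + submitToSaturatedPool(new ThreadPoolExecutor.AbortPolicy()));
        System.out.println("CallerRunsPolicy: " + submitToSaturatedPool(new ThreadPoolExecutor.CallerRunsPolicy()));
        System.out.println("DiscardPolicy:    " + submitToSaturatedPool(new ThreadPoolExecutor.DiscardPolicy()));
    }
}
```

With the first two policies the caller either gets an exception or a finished [cc_java inline=”true”]Future[/cc_java]; with [cc_java inline=”true”]ThreadPoolExecutor.DiscardPolicy[/cc_java] the caller is left holding a [cc_java inline=”true”]Future[/cc_java] that stays pending.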

But what’s the right thing to do here for [cc_java inline=”true”]ThreadPoolExecutor.DiscardPolicy[/cc_java] and [cc_java inline=”true”]ThreadPoolExecutor.DiscardOldestPolicy[/cc_java]? Submitting tasks with these policies does return a [cc_java inline=”true”]Future[/cc_java], but if the task is rejected, that [cc_java inline=”true”]Future[/cc_java] will never be done.

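I haven’t found a good solution with the built-in classes of the JDK yet. What I have done now is to cancel the [cc_java inline=”true”]Future[/cc_java] in the [cc_java inline=”true”]RejectedExecutionHandler[/cc_java]. This works because [cc_java inline=”true”]submit[/cc_java] wraps each task in a [cc_java inline=”true”]FutureTask[/cc_java], which is both the [cc_java inline=”true”]Runnable[/cc_java] handed to the handler and the [cc_java inline=”true”]Future[/cc_java] returned to the caller. Unfortunately, that can’t easily be done with the existing [cc_java inline=”true”]RejectedExecutionHandler[/cc_java] policies, like [cc_java inline=”true”]ThreadPoolExecutor.DiscardOldestPolicy[/cc_java].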

[cc_java]
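import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class DiscardAndCancelOldestPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            // Drop the oldest queued task and cancel its Future so that get() does not hang
            Runnable rejected = e.getQueue().poll();
            if (rejected instanceof Future) {
                ((Future<?>) rejected).cancel(false);
            }
            // Retry the new task now that the queue has a free slot
            e.execute(r);
        } else if (r instanceof Future) {
            // After shutdown the new task itself is dropped, so cancel it as well
            ((Future<?>) r).cancel(false);
        }
    }
}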
[/cc_java]

[cc_java]
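import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

public class DiscardAndCancelPolicy implements RejectedExecutionHandler {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        // Drop the new task, but cancel its Future so that get() does not hang
        if (r instanceof Future) {
            ((Future<?>) r).cancel(false);
        }
    }
}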
[/cc_java]

Now we can use those policies, and the program behaves as expected. For example:

[cc_java]
// Snippet; assumes an enclosing method that declares "throws Exception".
ThreadPoolExecutor es = new ThreadPoolExecutor(10, 10, 1, TimeUnit.MINUTES,
    new ArrayBlockingQueue<Runnable>(10),
    new DiscardAndCancelOldestPolicy());

List<Future<Integer>> futures = Lists.newArrayList();
for (int i = 0; i < 30; ++i) {
    final int ii = i;
    futures.add(es.submit(new Callable<Integer>() {
        public Integer call() {
            System.out.println("Running task " + ii);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // ignore
            }
            System.out.println("Task " + ii + " ends");
            return ii;
        }
    }));
}

for (Future<Integer> f : futures) {
    try {
        System.out.println("Result: " + f.get());
    } catch (CancellationException e) {
        // Thrown for the tasks that the policy discarded and cancelled
        System.out.println("Task was cancelled");
    }
}
[/cc_java]

Note that now you have to catch the [cc_java inline=”true”]CancellationException[/cc_java] when getting the [cc_java inline=”true”]Future[/cc_java]’s result, because discarding the task will cancel the [cc_java inline=”true”]Future[/cc_java].

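Guava’s [cc_java inline=”true”]Futures.successfulAsList[/cc_java] method makes things a bit easier if what you want to do is wait for all [cc_java inline=”true”]Future[/cc_java]s to finish, and then look at the results. The list it produces contains [cc_java inline=”true”]null[/cc_java] in place of each failed or cancelled task, including the ones our policies reject. It expects [cc_java inline=”true”]ListenableFuture[/cc_java]s, which is why the executor is wrapped with [cc_java inline=”true”]MoreExecutors.listeningDecorator[/cc_java]: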

[cc_java]
ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 1, TimeUnit.MINUTES,
    new ArrayBlockingQueue<Runnable>(10),
    new DiscardAndCancelOldestPolicy());
ListeningExecutorService es = MoreExecutors.listeningDecorator(tpe);

// …

// futures must be a List<ListenableFuture<Integer>> here
ListenableFuture<List<Integer>> all = Futures.successfulAsList(futures);
List<Integer> results = all.get();
[/cc_java]
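If Guava is not an option, roughly the same "null for failed or cancelled tasks" result can be approximated with JDK classes alone. This is only a sketch under that assumption: the class [cc_java inline=”true”]FutureResults[/cc_java] and the helper [cc_java inline=”true”]collectSuccessful[/cc_java] are hypothetical names, and unlike [cc_java inline=”true”]Futures.successfulAsList[/cc_java] the helper blocks the calling thread instead of returning a future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper, not part of the JDK or Guava.
class FutureResults {

    /** Waits for every future in order; failed or cancelled futures contribute null. */
    static <T> List<T> collectSuccessful(List<? extends Future<? extends T>> futures)
            throws InterruptedException {
        List<T> results = new ArrayList<T>(futures.size());
        for (Future<? extends T> f : futures) {
            try {
                results.add(f.get());
            } catch (CancellationException | ExecutionException e) {
                // Cancelled (for example by a rejection policy) or failed: record null
                results.add(null);
            }
        }
        return results;
    }
}
```

It only helps in combination with policies that cancel discarded tasks, because a future that is never completed would still block inside [cc_java inline=”true”]get()[/cc_java].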

Got something better?

About Mathias

Software development engineer. Principal developer of DrJava. Recent Ph.D. graduate from the Department of Computer Science at Rice University.