I’ve been working with ThreadPoolExecutor and RejectedExecutionHandler. The executor framework provides a convenient way to distribute concurrent tasks to multiple threads, and the different RejectedExecutionHandler policies, ThreadPoolExecutor.DiscardOldestPolicy, for instance, enable your program to shed load if there is more work than it can reliably handle.

Unfortunately, when I started using Futures, things became complicated. Consider the following example:

    import java.util.List;
    import java.util.concurrent.*;
    import com.google.common.collect.Lists;

    // Snippet: assumes an enclosing method that declares "throws Exception",
    // because Future.get() throws checked exceptions.
    ThreadPoolExecutor es = new ThreadPoolExecutor(10, 10, 1, TimeUnit.MINUTES,
            new ArrayBlockingQueue<Runnable>(10),
            new ThreadPoolExecutor.DiscardOldestPolicy());
    List<Future<Integer>> futures = Lists.newArrayList();
    for (int i = 0; i < 30; ++i) {
        final int ii = i;
        futures.add(es.submit(new Callable<Integer>() {
            public Integer call() {
                System.out.println("Running task " + ii);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // ignore
                }
                System.out.println("Task " + ii + " ends");
                return ii;
            }
        }));
    }
    for (Future<Integer> f : futures) {
        // Blocks forever once it reaches the Future of a discarded task
        System.out.println("Result: " + f.get());
    }

It creates a thread pool with a maximum of 10 threads, and a work queue that can hold another 10 items. The policy to shed load is ThreadPoolExecutor.DiscardOldestPolicy, which means it will favor newer tasks over older tasks, if the queue has reached its capacity.
Then the program creates 30 tasks and submits them to the executor service. The tasks each sleep for a second to make sure we actually hit the capacity of the thread pool executor. As a result, only 20 tasks can be held at any one time (10 running, but sleeping, and another 10 in the work queue), and 10 need to be rejected. The tasks that are rejected are the ones that were in the queue first (the older tasks), so we can expect tasks 0 to 9 as well as 20 to 29 to execute.
There is a problem, however, when we wait for the results of tasks 10 to 19 by calling their Future<Integer>.get() methods: because the tasks have been rejected, they never start running, which means they will never finish. The program hangs here.
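To make the hang easier to see without freezing a program, here is a minimal sketch that shrinks the pool to one thread and a one-slot queue and waits with a timeout instead of a plain get(). The class and method names (DiscardedFutureDemo, discardedFutureStaysPending) and the 200 ms timeout are my own choices for illustration, not part of the original example.

```java
import java.util.concurrent.*;

class DiscardedFutureDemo {

    /** Returns true if the Future of a discarded task is still not done after a bounded wait. */
    static boolean discardedFutureStaysPending() {
        ThreadPoolExecutor es = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.DiscardPolicy());
        CountDownLatch release = new CountDownLatch(1);
        try {
            Callable<Integer> blocker = () -> { release.await(); return 0; };
            es.submit(blocker);                               // occupies the only worker thread
            es.submit(blocker);                               // fills the one-slot queue
            Future<Integer> discarded = es.submit(() -> 42);  // silently dropped by DiscardPolicy
            try {
                discarded.get(200, TimeUnit.MILLISECONDS);    // bounded wait instead of get()
                return false;                                 // would mean the task ran after all
            } catch (TimeoutException e) {
                return !discarded.isDone();                   // neither completed nor cancelled
            } catch (InterruptedException | ExecutionException e) {
                throw new IllegalStateException(e);
            }
        } finally {
            release.countDown();  // let the two blocked tasks finish
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Discarded future still pending: " + discardedFutureStaysPending());
    }
}
```

A timeout like this only detects the problem; the Future itself stays pending, so anything else that waits on it without a timeout would still block.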
The only RejectedExecutionHandlers in the JDK that function correctly here are ThreadPoolExecutor.AbortPolicy and ThreadPoolExecutor.CallerRunsPolicy. ThreadPoolExecutor.AbortPolicy throws a RejectedExecutionException as soon as the task is submitted to the executor service, not later, so no Future is ever returned to wait for. ThreadPoolExecutor.CallerRunsPolicy executes the task in the calling thread at the time it is submitted, and while it does return a Future, that Future is already done, so getting the Future’s result later will not block.
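The behavior of these two policies can be checked with a small sketch along the following lines. It assumes a deliberately tiny executor (one thread, one queue slot) that is saturated with two blocking tasks before the third one is submitted; BuiltInPolicyDemo and its method names are made up for this illustration.

```java
import java.util.concurrent.*;

class BuiltInPolicyDemo {

    /** Builds a one-thread, one-slot executor and fills both the thread and the queue. */
    static ThreadPoolExecutor saturated(RejectedExecutionHandler handler, CountDownLatch release) {
        ThreadPoolExecutor es = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES,
                new ArrayBlockingQueue<Runnable>(1), handler);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        es.execute(blocker);  // runs on the only worker thread
        es.execute(blocker);  // waits in the queue
        return es;
    }

    /** AbortPolicy: submit() itself throws, so there is no Future to wait on. */
    static boolean abortPolicyThrowsAtSubmit() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor es = saturated(new ThreadPoolExecutor.AbortPolicy(), release);
        try {
            es.submit(() -> 42);
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        } finally {
            release.countDown();
            es.shutdown();
        }
    }

    /** CallerRunsPolicy: the task runs inside submit(), so the Future is done on return. */
    static boolean callerRunsFutureIsDoneAtSubmit() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor es = saturated(new ThreadPoolExecutor.CallerRunsPolicy(), release);
        try {
            Future<Integer> f = es.submit(() -> 42);
            return f.isDone();
        } finally {
            release.countDown();
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("AbortPolicy throws at submit: " + abortPolicyThrowsAtSubmit());
        System.out.println("CallerRunsPolicy future done: " + callerRunsFutureIsDoneAtSubmit());
    }
}
```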
But what’s the right thing to do here for ThreadPoolExecutor.DiscardPolicy and ThreadPoolExecutor.DiscardOldestPolicy? Submitting tasks with these policies does return a Future, but if the task is rejected, that Future will never be done.
I haven’t found a good solution with the built-in classes of the JDK yet. What I have done now is to cancel the Future in the RejectedExecutionHandler. Unfortunately, that can’t easily be done with the existing RejectedExecutionHandler policies, like ThreadPoolExecutor.DiscardOldestPolicy, so I wrote my own:
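
    import java.util.concurrent.Future;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;

    // Like DiscardOldestPolicy, but also cancels the Future of the task it drops.
    public class DiscardAndCancelOldestPolicy implements RejectedExecutionHandler {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                // Remove the oldest queued task, if there is one
                Runnable rejected = e.getQueue().poll();
                // Tasks passed to submit() are wrapped in a FutureTask, which is a Future
                if (rejected instanceof Future) {
                    ((Future<?>) rejected).cancel(false);
                }
                e.execute(r);  // retry the new task
            }
        }
    }

    // Like DiscardPolicy, but also cancels the Future of the rejected task.
    public class DiscardAndCancelPolicy implements RejectedExecutionHandler {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (r instanceof Future) {
                ((Future<?>) r).cancel(false);
            }
        }
    }
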
Now we can use those policies, and the program behaves as expected. For example:

    // Snippet: assumes an enclosing method that declares "throws Exception".
    ThreadPoolExecutor es = new ThreadPoolExecutor(10, 10, 1, TimeUnit.MINUTES,
            new ArrayBlockingQueue<Runnable>(10),
            new DiscardAndCancelOldestPolicy());
    List<Future<Integer>> futures = Lists.newArrayList();
    for (int i = 0; i < 30; ++i) {
        final int ii = i;
        futures.add(es.submit(new Callable<Integer>() {
            public Integer call() {
                System.out.println("Running task " + ii);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // ignore
                }
                System.out.println("Task " + ii + " ends");
                return ii;
            }
        }));
    }
    for (Future<Integer> f : futures) {
        try {
            System.out.println("Result: " + f.get());
        } catch (CancellationException e) {
            // The policy cancelled this task's Future when it discarded the task
            System.out.println("Task was cancelled");
        }
    }

Note that now you have to catch the CancellationException when getting the Future’s result, because discarding the task will cancel the Future.
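If you would rather not rely on the exception, Future.isCancelled() can be checked before calling get(). The following is a minimal sketch of that idea on a one-thread, one-slot executor, so the outcome is easy to follow. CancelledFutureCheck, CancelOldest and outcomes are hypothetical names; CancelOldest simply repeats the DiscardAndCancelOldestPolicy logic so that the example is self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

class CancelledFutureCheck {

    /** Same logic as DiscardAndCancelOldestPolicy, repeated to keep the sketch self-contained. */
    static class CancelOldest implements RejectedExecutionHandler {
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                Runnable oldest = e.getQueue().poll();
                if (oldest instanceof Future) {
                    ((Future<?>) oldest).cancel(false);
                }
                e.execute(r);
            }
        }
    }

    /** Submits three tasks to a saturated executor and reports how each Future ended. */
    static List<String> outcomes() {
        ThreadPoolExecutor es = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES,
                new ArrayBlockingQueue<Runnable>(1), new CancelOldest());
        CountDownLatch release = new CountDownLatch(1);
        List<Future<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < 3; ++i) {
            final int id = i;
            // Task 0 runs (and blocks), task 1 is queued, task 2 pushes task 1 out
            futures.add(es.submit(() -> { release.await(); return id; }));
        }
        release.countDown();  // let the surviving tasks finish
        List<String> outcomes = new ArrayList<>();
        try {
            for (Future<Integer> f : futures) {
                // Check for cancellation first, so get() is only called on tasks that will finish
                outcomes.add(f.isCancelled() ? "cancelled" : String.valueOf(f.get()));
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            es.shutdown();
        }
        return outcomes;
    }

    public static void main(String[] args) {
        System.out.println(outcomes());
    }
}
```

This works here because the policy cancels the Future before submit() returns for the task that displaced it. If other threads can still submit tasks while you inspect the results, catching CancellationException remains the safer option.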
Guava’s Futures.successfulAsList method makes things a bit easier if what you want to do is wait for all Futures to finish, and then look at the results. It returns null for failed or cancelled tasks, including the ones our policies reject:

    ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 1, TimeUnit.MINUTES,
            new ArrayBlockingQueue<Runnable>(10),
            new DiscardAndCancelOldestPolicy());
    ListeningExecutorService es = MoreExecutors.listeningDecorator(tpe);
    // ...
    // futures must hold the ListenableFuture<Integer> objects returned by es.submit(...)
    Future<List<Integer>> all = Futures.successfulAsList(futures);
    List<Integer> results = all.get();

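For projects without Guava, a rough JDK-only approximation of the same idea might look like the sketch below. FutureResults and successfulOrNull are hypothetical names, and unlike Futures.successfulAsList this helper blocks the calling thread instead of returning a Future, so treat it as an illustration rather than a drop-in replacement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class FutureResults {

    /**
     * Waits for every Future in order and collects the results,
     * using null for tasks that failed or were cancelled.
     */
    static <T> List<T> successfulOrNull(List<? extends Future<? extends T>> futures) {
        List<T> results = new ArrayList<>();
        for (Future<? extends T> f : futures) {
            try {
                results.add(f.get());
            } catch (CancellationException | ExecutionException e) {
                results.add(null);  // cancelled by the policy, or the task threw
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // restore the interrupt flag
                throw new IllegalStateException("Interrupted while waiting for results", e);
            }
        }
        return results;
    }
}
```

It only terminates if every Future eventually completes or is cancelled, which is exactly what the cancelling policies are there to ensure.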
Got something better?