Problems with RejectedExecutionHandler and Futures

I’ve been working with ThreadPoolExecutor and RejectedExecutionHandler. The executor framework provides a convenient way to distribute concurrent tasks to multiple threads, and the different RejectedExecutionHandler policies, ThreadPoolExecutor.DiscardOldestPolicy, for instance, enable your program to shed load if there is more work than it can reliably handle.

Unfortunately, when I started using Futures, things became complicated. Consider the following example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ThreadPoolExecutor es = new ThreadPoolExecutor(10, 10, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(10),
  new ThreadPoolExecutor.DiscardOldestPolicy());

List<Future<Integer>> futures = Lists.newArrayList();
for(int i=0; i<30; ++i) {
  final int ii = i;
  futures.add(es.submit(new Callable<Integer>() {
    public Integer call() {
      System.out.println("Running task " + ii);
      try {
        Thread.sleep(1000);
      } catch(InterruptedException e) {
        // ignore
      }
      System.out.println("Task " + ii + " ends");
      return ii;
    }
  }));
}

for(Future<Integer> f: futures);
  System.out.println("Result: " + f.get());
}

It creates a thread pool with a maximum of 10 threads, and a work queue that can hold another 10 items. The policy to shed load is ThreadPoolExecutor.DiscardOldestPolicy, which means it will favor newer tasks over older tasks, if the queue has reached its capacity.

Then the program creates 30 tasks and submits them to the executor service. The tasks each sleep for a second to make sure we actually hit the capacity of the thread pool executor. As a result, only 20 can be submitted (10 running, but sleeping, and another 10 in the work queue), and 10 need to be rejected. The tasks that are rejected are actually the ones that were in the queue first (the older tasks), so we can expect tasks 0 to 9 as well as 20 to 29 to execute.

There is a problem, however, when we wait for the results of tasks 10 to 19 by calling their Future<Integer>.get() methods: Because the tasks have been rejected, they never start running, which means they will never finish. The program hangs here.

The only RejectedExecutionHandlers in the JDK that function correctly here are ThreadPoolExecutor.AbortPolicy and ThreadPoolExecutor.CallerRunsPolicy. ThreadPoolExecutor.AbortPolicy throws a RejectedExecutionException at the time the task is submitted to the executor service already, not later, and therefore does not return a Future to wait for. ThreadPoolExecutor.CallerRunsPolicy executes the task at the time it is executed at the time it is added to the executor service, and while it returns a Future, that Future is done immediately, and therefore, getting the Future‘s result later will not block.

But what’s the right thing to do here for ThreadPoolExecutor.DiscardPolicy and ThreadPoolExecutor.DiscardOldestPolicy? Submitting tasks with these policies does return a Future, but if the task is rejected, that Future will never be done.

I haven’t found a good solution with the built-in classes of the JDK yet. What I have done now is to cancel the Future in the RejectedExecutionHandler. Unfortunately, that can’t easily be done with the existing RejectedExecutionHandler policies, like ThreadPoolExecutor.DiscardOldestPolicy.

1
2
3
4
5
6
7
8
9
10
11
public class DiscardAndCancelOldestPolicy implements RejectedExecutionHandler {
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
      Runnable rejected = e.getQueue().poll();
      if (rejected instanceof Future) {
        ((Future<?>) rejected).cancel(false);
      }
      e.execute(r);
    }
  }
}
1
2
3
4
5
6
7
public class DiscardAndCancelPolicy implements RejectedExecutionHandler {
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (r instanceof Future) {
      ((Future<?>) r).cancel(false);
    }
  }
}

Now we can use those policies, and the program behaves as expected. For example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
ThreadPoolExecutor es = new ThreadPoolExecutor(10, 10, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(10),
  new DiscardAndCancelOldestPolicy());

List<Future<Integer>> futures = Lists.newArrayList();
for(int i=0; i<30; ++i) {
  final int ii = i;
  futures.add(es.submit(new Callable<Integer>() {
    public Integer call() {
      System.out.println("Running task " + ii);
      try {
        Thread.sleep(1000);
      } catch(InterruptedException e) {
        // ignore
      }
      System.out.println("Task " + ii + " ends");
      return ii;
    }
  }));
}

for(Future<Integer> f: futures);
  try {
    System.out.println("Result: " + f.get());
  } catch(CancellationException e) {
    System.out.println("Task was cancelled");
  }
}

Note that now you have to catch the CancellationException when getting the Future‘s result, because discarding the task will cancel the Future.

Guava’s Futures.successfulAsList method makes things a bit easier if what you want to do is wait for all Futures to finish, and then look at the results. It returns null for failed or cancelled tasks, including the ones our policies reject:

1
2
3
4
5
6
7
8
ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(10),
  new DiscardAndCancelOldestPolicy());
ListeningExecutorService es = MoreExecutors.listeningDecorator(tpe);

// ...

Future<List<Integer>> all = Futures.successfulAsList(futures);
List<Integer> results = all.get();

Got something better?

Share

About Mathias

Software development engineer. Principal developer of DrJava. Recent Ph.D. graduate from the Department of Computer Science at Rice University.
This entry was posted in Uncategorized. Bookmark the permalink.

Leave a Reply