import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; public class SimpleExecutorCompletionServiceTester extends Concurrent { private static final int N_TASKS_PER_TESTER = 10; private static final ExecutorService ecs = Executors.newCachedThreadPool(); private static final SimpleExecutorCompletionService secs = new SimpleExecutorCompletionService<>(ecs); public SimpleExecutorCompletionServiceTester(Integer[] args) { super(args); } private static int sleepAndReturnTime(int sleepTimeMs) throws InterruptedException { Thread.sleep(sleepTimeMs); return sleepTimeMs; } public static void main(String[] args) { new SimpleExecutorCompletionServiceTester(new Integer[]{1, 2, 3, 4, 5}).run(); List unstartedTasks = ecs.shutdownNow(); if (!unstartedTasks.isEmpty()) System.out.println("Still some unstarted tasks left after end of all tester threads!?"); try { ecs.awaitTermination(1, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } @Override protected void perform(Integer testerNr) { for (int i = 0; i < N_TASKS_PER_TESTER; ++i) { secs.submit(() -> sleepAndReturnTime(10+10*(int)(Math.random()*10))); // Note that submit's result is ignored here } for (int i = 0; i < N_TASKS_PER_TESTER; ++i) { Future fut = null; do { fut = secs.poll(); if (fut == null) try { // No finished task right now => wait a bit (but less than the minimal task duration!) // before polling again Thread.sleep((long) (Math.random()*5)); } catch (InterruptedException e) { // Nothing to do here: Sleep time is still random even if interrupted } } while (fut == null); try { System.out.println("Tester " + testerNr + " received result " + fut.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); return; } } System.out.println("Tester " + testerNr + " ends."); } }