import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

/**
 * Minimal completion service: tasks are handed to an {@link ExecutorService} and their
 * futures can be retrieved once they have completed, either without blocking
 * ({@link #poll()}) or by waiting ({@link #take()}).
 *
 * <p>Each task is wrapped in a {@link FutureTask} whose {@code done()} hook records the
 * future as completed. The hook runs after the future has reached its final state, and it
 * runs for normal completion, for tasks that throw, and for cancelled tasks, so a waiting
 * {@link #take()} is always woken up with a future that is really done.
 *
 * <p>Thread-safety: all access to the list of completed futures is guarded by this
 * object's monitor; instances may be shared between threads.
 *
 * @param <ResultType> the result type produced by the submitted tasks
 */
public class SimpleExecutorCompletionService_with_take<ResultType> {

    /** Executor that actually runs the submitted tasks. */
    private final ExecutorService ex;

    /** Futures that have completed but were not handed out yet; guarded by {@code this}. */
    private final List<Future<ResultType>> doneFutures = new ArrayList<>();

    /**
     * Creates a completion service on top of the given executor.
     *
     * @param ex the executor used to run submitted tasks
     */
    public SimpleExecutorCompletionService_with_take(ExecutorService ex) {
        this.ex = ex;
    }

    /**
     * Submits a task for execution.
     *
     * <p>The method does not hold this object's monitor while calling into the executor;
     * only the completion hook takes the monitor, and only briefly.
     *
     * @param task the task to run
     * @return a future representing the pending result; the same instance is later
     *         returned by {@link #poll()} or {@link #take()} once it has completed
     * @throws NullPointerException if {@code task} is null
     */
    public Future<ResultType> submit(Callable<ResultType> task) {
        FutureTask<ResultType> future = new FutureTask<ResultType>(task) {
            @Override
            protected void done() {
                // Invoked once this future is complete (result, exception or cancellation),
                // so consumers never observe a future that is not done yet.
                onCompletion(this);
            }
        };
        ex.execute(future);
        return future;
    }

    /**
     * Retrieves and removes a completed future without blocking.
     * The most recently completed future is returned first.
     *
     * @return a completed future, or {@code null} if none is currently available
     */
    public synchronized Future<ResultType> poll() {
        return doneFutures.isEmpty() ? null : removeNewest();
    }

    /**
     * Retrieves and removes a completed future, waiting if none is available yet.
     * The most recently completed future is returned first.
     *
     * @return a completed future, never {@code null}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized Future<ResultType> take() throws InterruptedException {
        // Loop guards against spurious wakeups and against another consumer
        // having taken the future this thread was woken up for.
        while (doneFutures.isEmpty()) {
            wait();
        }
        return removeNewest();
    }

    /** Records a completed future and wakes up every thread blocked in {@link #take()}. */
    private synchronized void onCompletion(Future<ResultType> future) {
        doneFutures.add(future);
        notifyAll();
    }

    /** Removes and returns the last element of the done list; caller must hold the monitor. */
    private Future<ResultType> removeNewest() {
        return doneFutures.remove(doneFutures.size() - 1);
    }
}