From be6972b89841730ecde12294d5bc137aef0036b3 Mon Sep 17 00:00:00 2001 From: Luca Conte Date: Wed, 7 May 2025 22:29:00 +0200 Subject: [PATCH] u07-2 b --- ...utorCompletionServiceTester_with_take.java | 56 ++++++++++++ ...leExecutorCompletionService_with_take.java | 85 +++++++++++++++++++ 2 files changed, 141 insertions(+) create mode 100644 u07-2/SimpleExecutorCompletionServiceTester_with_take.java create mode 100644 u07-2/SimpleExecutorCompletionService_with_take.java diff --git a/u07-2/SimpleExecutorCompletionServiceTester_with_take.java b/u07-2/SimpleExecutorCompletionServiceTester_with_take.java new file mode 100644 index 0000000..315f220 --- /dev/null +++ b/u07-2/SimpleExecutorCompletionServiceTester_with_take.java @@ -0,0 +1,56 @@ +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +public class SimpleExecutorCompletionServiceTester_with_take extends Concurrent { + + private static final int N_TASKS_PER_TESTER = 3; + private static final ExecutorService ecs = Executors.newCachedThreadPool(); + private static final SimpleExecutorCompletionService_with_take secs = + new SimpleExecutorCompletionService_with_take<>(ecs); + + public SimpleExecutorCompletionServiceTester_with_take(Integer[] args) { + super(args); + } + + private static int sleepAndReturnTime(int sleepTimeMs) throws InterruptedException { + Thread.sleep(sleepTimeMs); + return sleepTimeMs; + } + + public static void main(String[] args) { + new SimpleExecutorCompletionServiceTester_with_take(new Integer[]{1, 2, 3, 4, 5}).run(); + List unstartedTasks = ecs.shutdownNow(); + if (!unstartedTasks.isEmpty()) + System.out.println("Still some unstarted tasks left after end of all tester threads!?"); + try { + ecs.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + protected void perform(Integer testerNr) { + + for (int i = 0; i < N_TASKS_PER_TESTER; ++i) { + secs.submit(() -> sleepAndReturnTime(10+10*(int)(Math.random()*10))); + // Note that submit's result is ignored here + } + + for (int i = 0; i < N_TASKS_PER_TESTER; ++i) { + try { + Future fut = secs.take(); + System.out.println("Tester " + testerNr + " received result " + fut.get()); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + return; + } + } + System.out.println("Tester " + testerNr + " ends."); + } + +} diff --git a/u07-2/SimpleExecutorCompletionService_with_take.java b/u07-2/SimpleExecutorCompletionService_with_take.java new file mode 100644 index 0000000..b355c13 --- /dev/null +++ b/u07-2/SimpleExecutorCompletionService_with_take.java @@ -0,0 +1,85 @@ +import java.util.ArrayList; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +public class SimpleExecutorCompletionService_with_take { + private ExecutorService ex; + + private ArrayList> pendingFutures; + private ArrayList> doneFutures; + + public SimpleExecutorCompletionService_with_take(ExecutorService ex) { + this.ex = ex; + this.pendingFutures = new ArrayList<>(); + this.doneFutures = new ArrayList<>(); + } + + synchronized public Future submit(Callable task) { + // wrap in another callable to run notify, as soon as task is completed + Callable c = () -> { + ResultType result = task.call(); + synchronized (this) { // creates possibly unwanted memory barrier + notify(); + // calling updateFuturesList will not do anything here + // because the future will only be marked done after this function returns + } + return result; + }; + + Future f = ex.submit(c); + + this.pendingFutures.add(f); + + return f; + + // alternative solution idea: + // create background thread to repeatedly check for done futures + } + + synchronized public Future poll() { + this.updateFuturesLists(); + + if (!this.doneFutures.isEmpty()) { + return this.doneFutures.removeLast(); + } else { + return null; + } + } + + synchronized public Future take() { + // update list first to check if tasks are already completed + this.updateFuturesLists(); + + // wait until a task finishes, if there aren't any currently done + while (this.doneFutures.isEmpty()) { + try { + wait(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + // update list again, after being notified, since callable wrapper can't update list + // this SHOULD always result in at least one future being added to the done list + this.updateFuturesLists(); + } + return this.doneFutures.removeLast(); + } + + synchronized private void updateFuturesLists() { + + for (int i = 0; i < this.pendingFutures.size(); i++) { + Future f = this.pendingFutures.get(i); + + // for (Future f : this.pendingFutures) { + // causes concurrent access exception - why? + + if (f.isDone()) { + this.pendingFutures.remove(f); + this.doneFutures.add(f); + notify(); + } + } + + } +} \ No newline at end of file