diff --git a/u07-2/Concurrent.java b/u07-2/Concurrent.java new file mode 100644 index 0000000..5c35d05 --- /dev/null +++ b/u07-2/Concurrent.java @@ -0,0 +1,26 @@ +abstract class Concurrent { + private ArgType[] args; + + public Concurrent(ArgType[] args) { + this.args = args; + } + + void run() { + Thread[] threads = new Thread[args.length]; + + for (int i = 0; i < args.length; i++) { + final ArgType arg = args[i]; + threads[i] = new Thread(() -> perform(arg)); + threads[i].start(); + } + for (int i = 0; i < threads.length; i++) { + try { + threads[i].join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + abstract void perform(ArgType arg); +} \ No newline at end of file diff --git a/u07-2/SimpleExecutorCompletionService.java b/u07-2/SimpleExecutorCompletionService.java new file mode 100644 index 0000000..ffc0642 --- /dev/null +++ b/u07-2/SimpleExecutorCompletionService.java @@ -0,0 +1,51 @@ +import java.util.ArrayList; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +public class SimpleExecutorCompletionService { + private ExecutorService ex; + + private ArrayList> pendingFutures; + private ArrayList> doneFutures; + + public SimpleExecutorCompletionService(ExecutorService ex) { + this.ex = ex; + this.pendingFutures = new ArrayList<>(); + this.doneFutures = new ArrayList<>(); + } + + synchronized public Future submit(Callable task) { + Future f = ex.submit(task); + + this.pendingFutures.add(f); + + return f; + } + + synchronized public Future poll() { + this.updateFuturesLists(); + + if (!this.doneFutures.isEmpty()) { + return this.doneFutures.removeLast(); + } else { + return null; + } + } + + synchronized private void updateFuturesLists() { + + for (int i = 0; i < this.pendingFutures.size(); i++) { + Future f = this.pendingFutures.get(i); + + // for (Future f : this.pendingFutures) { + // causes concurrent access exception - why? + + if (f.isDone()) { + this.pendingFutures.remove(f); + this.doneFutures.add(f); + } + } + + } +} \ No newline at end of file diff --git a/u07-2/SimpleExecutorCompletionServiceTester.java b/u07-2/SimpleExecutorCompletionServiceTester.java new file mode 100644 index 0000000..1d00b2b --- /dev/null +++ b/u07-2/SimpleExecutorCompletionServiceTester.java @@ -0,0 +1,68 @@ +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + + +public class SimpleExecutorCompletionServiceTester extends Concurrent { + + private static final int N_TASKS_PER_TESTER = 10; + private static final ExecutorService ecs = Executors.newCachedThreadPool(); + private static final SimpleExecutorCompletionService secs = + new SimpleExecutorCompletionService<>(ecs); + + public SimpleExecutorCompletionServiceTester(Integer[] args) { + super(args); + } + + private static int sleepAndReturnTime(int sleepTimeMs) throws InterruptedException { + Thread.sleep(sleepTimeMs); + return sleepTimeMs; + } + + public static void main(String[] args) { + new SimpleExecutorCompletionServiceTester(new Integer[]{1, 2, 3, 4, 5}).run(); + List unstartedTasks = ecs.shutdownNow(); + if (!unstartedTasks.isEmpty()) + System.out.println("Still some unstarted tasks left after end of all tester threads!?"); + try { + ecs.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + @Override + protected void perform(Integer testerNr) { + + for (int i = 0; i < N_TASKS_PER_TESTER; ++i) { + secs.submit(() -> sleepAndReturnTime(10+10*(int)(Math.random()*10))); + // Note that submit's result is ignored here + } + + for (int i = 0; i < N_TASKS_PER_TESTER; ++i) { + Future fut = null; + do { + fut = secs.poll(); + if (fut == null) + try { + // No finished task right now => wait a bit (but less than the minimal task duration!) + // before polling again + Thread.sleep((long) (Math.random()*5)); + } catch (InterruptedException e) { + // Nothing to do here: Sleep time is still random even if interrupted + } + } while (fut == null); + try { + System.out.println("Tester " + testerNr + " received result " + fut.get()); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + return; + } + } + System.out.println("Tester " + testerNr + " ends."); + } + +}