u07-2 b
This commit is contained in:
parent
8f78ceb52e
commit
be6972b898
|
@ -0,0 +1,56 @@
|
|||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class SimpleExecutorCompletionServiceTester_with_take extends Concurrent<Integer> {
|
||||
|
||||
private static final int N_TASKS_PER_TESTER = 3;
|
||||
private static final ExecutorService ecs = Executors.newCachedThreadPool();
|
||||
private static final SimpleExecutorCompletionService_with_take<Integer> secs =
|
||||
new SimpleExecutorCompletionService_with_take<>(ecs);
|
||||
|
||||
public SimpleExecutorCompletionServiceTester_with_take(Integer[] args) {
|
||||
super(args);
|
||||
}
|
||||
|
||||
private static int sleepAndReturnTime(int sleepTimeMs) throws InterruptedException {
|
||||
Thread.sleep(sleepTimeMs);
|
||||
return sleepTimeMs;
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
new SimpleExecutorCompletionServiceTester_with_take(new Integer[]{1, 2, 3, 4, 5}).run();
|
||||
List<Runnable> unstartedTasks = ecs.shutdownNow();
|
||||
if (!unstartedTasks.isEmpty())
|
||||
System.out.println("Still some unstarted tasks left after end of all tester threads!?");
|
||||
try {
|
||||
ecs.awaitTermination(1, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void perform(Integer testerNr) {
|
||||
|
||||
for (int i = 0; i < N_TASKS_PER_TESTER; ++i) {
|
||||
secs.submit(() -> sleepAndReturnTime(10+10*(int)(Math.random()*10)));
|
||||
// Note that submit's result is ignored here
|
||||
}
|
||||
|
||||
for (int i = 0; i < N_TASKS_PER_TESTER; ++i) {
|
||||
try {
|
||||
Future<Integer> fut = secs.take();
|
||||
System.out.println("Tester " + testerNr + " received result " + fut.get());
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
e.printStackTrace();
|
||||
return;
|
||||
}
|
||||
}
|
||||
System.out.println("Tester " + testerNr + " ends.");
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,85 @@
|
|||
import java.util.ArrayList;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
public class SimpleExecutorCompletionService_with_take<ResultType> {
|
||||
private ExecutorService ex;
|
||||
|
||||
private ArrayList<Future<ResultType>> pendingFutures;
|
||||
private ArrayList<Future<ResultType>> doneFutures;
|
||||
|
||||
public SimpleExecutorCompletionService_with_take(ExecutorService ex) {
|
||||
this.ex = ex;
|
||||
this.pendingFutures = new ArrayList<>();
|
||||
this.doneFutures = new ArrayList<>();
|
||||
}
|
||||
|
||||
synchronized public Future<ResultType> submit(Callable<ResultType> task) {
|
||||
// wrap in another callable to run notify, as soon as task is completed
|
||||
Callable<ResultType> c = () -> {
|
||||
ResultType result = task.call();
|
||||
synchronized (this) { // creates possibly unwanted memory barrier
|
||||
notify();
|
||||
// calling updateFuturesList will not do anything here
|
||||
// because the future will only be marked done after this function returns
|
||||
}
|
||||
return result;
|
||||
};
|
||||
|
||||
Future<ResultType> f = ex.submit(c);
|
||||
|
||||
this.pendingFutures.add(f);
|
||||
|
||||
return f;
|
||||
|
||||
// alternative solution idea:
|
||||
// create background thread to repeatedly check for done futures
|
||||
}
|
||||
|
||||
synchronized public Future<ResultType> poll() {
|
||||
this.updateFuturesLists();
|
||||
|
||||
if (!this.doneFutures.isEmpty()) {
|
||||
return this.doneFutures.removeLast();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
synchronized public Future<ResultType> take() {
|
||||
// update list first to check if tasks are already completed
|
||||
this.updateFuturesLists();
|
||||
|
||||
// wait until a task finishes, if there aren't any currently done
|
||||
while (this.doneFutures.isEmpty()) {
|
||||
try {
|
||||
wait();
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
// update list again, after being notified, since callable wrapper can't update list
|
||||
// this SHOULD always result in at least one future being added to the done list
|
||||
this.updateFuturesLists();
|
||||
}
|
||||
return this.doneFutures.removeLast();
|
||||
}
|
||||
|
||||
synchronized private void updateFuturesLists() {
|
||||
|
||||
for (int i = 0; i < this.pendingFutures.size(); i++) {
|
||||
Future<ResultType> f = this.pendingFutures.get(i);
|
||||
|
||||
// for (Future<ResultType> f : this.pendingFutures) {
|
||||
// causes concurrent access exception - why?
|
||||
|
||||
if (f.isDone()) {
|
||||
this.pendingFutures.remove(f);
|
||||
this.doneFutures.add(f);
|
||||
notify();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue