u07-2 a
This commit is contained in:
parent
9ec2dae73f
commit
8f78ceb52e
|
@ -0,0 +1,26 @@
|
||||||
|
abstract class Concurrent<ArgType> {
|
||||||
|
private ArgType[] args;
|
||||||
|
|
||||||
|
public Concurrent(ArgType[] args) {
|
||||||
|
this.args = args;
|
||||||
|
}
|
||||||
|
|
||||||
|
void run() {
|
||||||
|
Thread[] threads = new Thread[args.length];
|
||||||
|
|
||||||
|
for (int i = 0; i < args.length; i++) {
|
||||||
|
final ArgType arg = args[i];
|
||||||
|
threads[i] = new Thread(() -> perform(arg));
|
||||||
|
threads[i].start();
|
||||||
|
}
|
||||||
|
for (int i = 0; i < threads.length; i++) {
|
||||||
|
try {
|
||||||
|
threads[i].join();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract void perform(ArgType arg);
|
||||||
|
}
|
|
@ -0,0 +1,51 @@
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
|
public class SimpleExecutorCompletionService<ResultType> {
|
||||||
|
private ExecutorService ex;
|
||||||
|
|
||||||
|
private ArrayList<Future<ResultType>> pendingFutures;
|
||||||
|
private ArrayList<Future<ResultType>> doneFutures;
|
||||||
|
|
||||||
|
public SimpleExecutorCompletionService(ExecutorService ex) {
|
||||||
|
this.ex = ex;
|
||||||
|
this.pendingFutures = new ArrayList<>();
|
||||||
|
this.doneFutures = new ArrayList<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized public Future<ResultType> submit(Callable<ResultType> task) {
|
||||||
|
Future<ResultType> f = ex.submit(task);
|
||||||
|
|
||||||
|
this.pendingFutures.add(f);
|
||||||
|
|
||||||
|
return f;
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized public Future<ResultType> poll() {
|
||||||
|
this.updateFuturesLists();
|
||||||
|
|
||||||
|
if (!this.doneFutures.isEmpty()) {
|
||||||
|
return this.doneFutures.removeLast();
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized private void updateFuturesLists() {
|
||||||
|
|
||||||
|
for (int i = 0; i < this.pendingFutures.size(); i++) {
|
||||||
|
Future<ResultType> f = this.pendingFutures.get(i);
|
||||||
|
|
||||||
|
// for (Future<ResultType> f : this.pendingFutures) {
|
||||||
|
// causes concurrent access exception - why?
|
||||||
|
|
||||||
|
if (f.isDone()) {
|
||||||
|
this.pendingFutures.remove(f);
|
||||||
|
this.doneFutures.add(f);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,68 @@
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
|
||||||
|
public class SimpleExecutorCompletionServiceTester extends Concurrent<Integer> {
|
||||||
|
|
||||||
|
private static final int N_TASKS_PER_TESTER = 10;
|
||||||
|
private static final ExecutorService ecs = Executors.newCachedThreadPool();
|
||||||
|
private static final SimpleExecutorCompletionService<Integer> secs =
|
||||||
|
new SimpleExecutorCompletionService<>(ecs);
|
||||||
|
|
||||||
|
public SimpleExecutorCompletionServiceTester(Integer[] args) {
|
||||||
|
super(args);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static int sleepAndReturnTime(int sleepTimeMs) throws InterruptedException {
|
||||||
|
Thread.sleep(sleepTimeMs);
|
||||||
|
return sleepTimeMs;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
new SimpleExecutorCompletionServiceTester(new Integer[]{1, 2, 3, 4, 5}).run();
|
||||||
|
List<Runnable> unstartedTasks = ecs.shutdownNow();
|
||||||
|
if (!unstartedTasks.isEmpty())
|
||||||
|
System.out.println("Still some unstarted tasks left after end of all tester threads!?");
|
||||||
|
try {
|
||||||
|
ecs.awaitTermination(1, TimeUnit.SECONDS);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void perform(Integer testerNr) {
|
||||||
|
|
||||||
|
for (int i = 0; i < N_TASKS_PER_TESTER; ++i) {
|
||||||
|
secs.submit(() -> sleepAndReturnTime(10+10*(int)(Math.random()*10)));
|
||||||
|
// Note that submit's result is ignored here
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < N_TASKS_PER_TESTER; ++i) {
|
||||||
|
Future<Integer> fut = null;
|
||||||
|
do {
|
||||||
|
fut = secs.poll();
|
||||||
|
if (fut == null)
|
||||||
|
try {
|
||||||
|
// No finished task right now => wait a bit (but less than the minimal task duration!)
|
||||||
|
// before polling again
|
||||||
|
Thread.sleep((long) (Math.random()*5));
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// Nothing to do here: Sleep time is still random even if interrupted
|
||||||
|
}
|
||||||
|
} while (fut == null);
|
||||||
|
try {
|
||||||
|
System.out.println("Tester " + testerNr + " received result " + fut.get());
|
||||||
|
} catch (InterruptedException | ExecutionException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
System.out.println("Tester " + testerNr + " ends.");
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue