81 lines
2.3 KiB
Java
81 lines
2.3 KiB
Java
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
/**
 * A minimal completion service: tasks are handed to an {@link ExecutorService}
 * and their futures can be retrieved again as soon as they have finished,
 * either without blocking ({@link #poll()}) or by waiting ({@link #take()}).
 *
 * <p>Each task is wrapped in a {@link FutureTask} whose {@code done()} hook puts
 * the future into an internal queue. {@code done()} runs after the future has
 * reached its final state (normal result, exception or cancellation), so every
 * future obtained from {@code poll()} or {@code take()} reports
 * {@code isDone() == true}, and a task that fails or is cancelled is delivered
 * just like a successful one.
 *
 * <p>All shared state lives in a {@link BlockingQueue}, so the methods of this
 * class may be called from several threads without external locking.
 *
 * @param <ResultType> type of the value produced by the submitted tasks
 */
public class SimpleExecutorCompletionService_with_take<ResultType> {

    /** Executor that actually runs the submitted tasks. */
    private final ExecutorService ex;

    /** Finished futures in completion order that have not been handed out yet. */
    private final BlockingQueue<Future<ResultType>> doneFutures = new LinkedBlockingQueue<>();

    /**
     * Creates a completion service on top of the given executor.
     *
     * @param ex executor used to run every task passed to {@link #submit(Callable)}
     */
    public SimpleExecutorCompletionService_with_take(ExecutorService ex) {
        this.ex = ex;
    }

    /**
     * Schedules {@code task} on the executor.
     *
     * @param task the work to run
     * @return the future of the task; the very same object is later returned by
     *         {@link #poll()} or {@link #take()} once the task has finished
     * @throws NullPointerException if {@code task} is {@code null}
     * @throws java.util.concurrent.RejectedExecutionException if the executor
     *         refuses the task
     */
    public Future<ResultType> submit(Callable<ResultType> task) {
        FutureTask<ResultType> future = new FutureTask<ResultType>(task) {
            @Override
            protected void done() {
                // Invoked once the future is in its final state, so consumers
                // never observe a queued future that is not done yet.
                doneFutures.add(this);
            }
        };

        // No lock is held here: the executor is free to block or to run the
        // task in the calling thread without stalling poll() or take().
        ex.execute(future);
        return future;
    }

    /**
     * Retrieves the next finished future without waiting.
     *
     * @return a finished future, or {@code null} if none is currently available
     */
    public Future<ResultType> poll() {
        return doneFutures.poll();
    }

    /**
     * Retrieves the next finished future, waiting until one becomes available.
     *
     * @return a finished future, never {@code null}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public Future<ResultType> take() throws InterruptedException {
        return doneFutures.take();
    }
}