add third semaphore
This commit is contained in:
parent
5947fbc22c
commit
49ee1fa302
|
@ -4,22 +4,26 @@ import java.util.Queue;
|
||||||
import java.util.concurrent.Semaphore;
|
import java.util.concurrent.Semaphore;
|
||||||
|
|
||||||
class WorkQueue {
|
class WorkQueue {
|
||||||
private Queue<Task> tasks = new LinkedList<Task>();
|
private volatile Queue<Task> tasks = new LinkedList<Task>();
|
||||||
private Semaphore readSemaphore;
|
private Semaphore readSemaphore;
|
||||||
private Semaphore writeSemaphore;
|
private Semaphore writeSemaphore;
|
||||||
|
private Semaphore generalAccessSemaphore;
|
||||||
|
|
||||||
public WorkQueue(int maxSize) {
|
public WorkQueue(int maxSize) {
|
||||||
readSemaphore = new Semaphore(0);
|
readSemaphore = new Semaphore(0);
|
||||||
writeSemaphore = new Semaphore(maxSize);
|
writeSemaphore = new Semaphore(maxSize);
|
||||||
|
generalAccessSemaphore = new Semaphore(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void producer() throws InterruptedException {
|
public void producer() throws InterruptedException {
|
||||||
writeSemaphore.acquire();
|
writeSemaphore.acquire();
|
||||||
|
generalAccessSemaphore.acquire();
|
||||||
|
|
||||||
Task t = new Task();
|
Task t = new Task();
|
||||||
tasks.add(t);
|
tasks.add(t);
|
||||||
|
|
||||||
readSemaphore.release();
|
readSemaphore.release();
|
||||||
|
generalAccessSemaphore.release();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void consumer() throws InterruptedException {
|
public void consumer() throws InterruptedException {
|
||||||
|
@ -27,9 +31,10 @@ class WorkQueue {
|
||||||
Task t;
|
Task t;
|
||||||
|
|
||||||
readSemaphore.acquire();
|
readSemaphore.acquire();
|
||||||
|
generalAccessSemaphore.acquire();
|
||||||
|
|
||||||
// for some reason, consumer can get here without tasks in the list. how to fix?
|
// for some reason, consumer can get here without tasks in the list. how to fix?
|
||||||
System.out.println(tasks.size());
|
// -> adding general access semaphore fixed it :)
|
||||||
if (tasks.isEmpty()) {
|
if (tasks.isEmpty()) {
|
||||||
// SHOULD never be reached
|
// SHOULD never be reached
|
||||||
System.out.println("!!! No tasks in the queue");
|
System.out.println("!!! No tasks in the queue");
|
||||||
|
@ -43,6 +48,7 @@ class WorkQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
writeSemaphore.release();
|
writeSemaphore.release();
|
||||||
|
generalAccessSemaphore.release();
|
||||||
|
|
||||||
t.run();
|
t.run();
|
||||||
}
|
}
|
||||||
|
@ -54,17 +60,15 @@ class WorkQueue {
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
WorkQueue w = new WorkQueue(10);
|
WorkQueue w = new WorkQueue(10);
|
||||||
Thread[] producers = new Thread[5];
|
Thread[] producers = new Thread[50];
|
||||||
Thread[] consumers = new Thread[5];
|
Thread[] consumers = new Thread[50];
|
||||||
for (int i = 1; i <= 2; i++) {
|
for (int i = 0; i < 50; i++) {
|
||||||
final int i_final = i;
|
|
||||||
producers[i] = new Thread(() -> {
|
producers[i] = new Thread(() -> {
|
||||||
try {
|
try {
|
||||||
for (int j = 0; j < 10; j++) {
|
for (int j = 0; j < 100; j++) {
|
||||||
w.producer();
|
w.producer();
|
||||||
System.out.println("Producer " + i_final + " produced task " + j);
|
|
||||||
w.printTaksSize();
|
w.printTaksSize();
|
||||||
// Thread.sleep(100);
|
Thread.sleep(100);
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
|
@ -72,11 +76,10 @@ class WorkQueue {
|
||||||
});
|
});
|
||||||
consumers[i] = new Thread(() -> {
|
consumers[i] = new Thread(() -> {
|
||||||
try {
|
try {
|
||||||
for (int j = 0; j < 10; j++) {
|
for (int j = 0; j < 100; j++) {
|
||||||
w.consumer();
|
w.consumer();
|
||||||
w.printTaksSize();
|
w.printTaksSize();
|
||||||
System.out.println("Consumer " + i_final + " consumed task " + j);
|
Thread.sleep(100);
|
||||||
// Thread.sleep(100);
|
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
Thread.currentThread().interrupt();
|
Thread.currentThread().interrupt();
|
||||||
|
|
Loading…
Reference in New Issue