BoundedBuffer.java
/**
 * BoundedBuffer
 * <p>
 * Bounded buffer using condition queues: blocking is implemented with the
 * intrinsic monitor of the buffer object itself ({@code wait()} and
 * {@code notifyAll()}). The state operations {@code doPut}, {@code doTake},
 * {@code isFull} and {@code isEmpty} are not defined here; they come from the
 * superclass.
 * <p>
 * Two condition predicates share the single intrinsic condition queue:
 * <ul>
 *   <li>not-full ({@code !isFull()}) - awaited by producers</li>
 *   <li>not-empty ({@code !isEmpty()}) - awaited by consumers</li>
 * </ul>
 *
 * @param <V> type of the buffered elements
 */
public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {

    /** Size handed to the superclass by the no-argument constructor. */
    private static final int DEFAULT_SIZE = 100;

    /** Creates a buffer with the default size of 100. */
    public BoundedBuffer() {
        this(DEFAULT_SIZE);
    }

    /**
     * Creates a buffer of the given size.
     *
     * @param size value forwarded unchanged to the superclass constructor
     *             (presumably the capacity - confirm against BaseBoundedBuffer)
     */
    public BoundedBuffer(int size) {
        super(size);
    }

    /**
     * Inserts an element, waiting while the buffer is full, then wakes every
     * thread waiting on this buffer.
     * BLOCKS-UNTIL: not-full
     *
     * @param v element to insert
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public synchronized void put(V v) throws InterruptedException {
        awaitNotFull("p-");
        doPut(v);
        notifyAll();
    }

    /**
     * Removes an element, waiting while the buffer is empty, then wakes every
     * thread waiting on this buffer.
     * BLOCKS-UNTIL: not-empty
     *
     * @return the element obtained from {@code doTake()}
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public synchronized V take() throws InterruptedException {
        awaitNotEmpty("t-");
        final V taken = doTake();
        notifyAll();
        return taken;
    }

    /**
     * Alternate form of {@link #put(Object)} using conditional notification:
     * waiters are woken only when this insertion found the buffer empty.
     * Prints which of the two cases occurred.
     * BLOCKS-UNTIL: not-full
     *
     * @param v element to insert
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public synchronized void alternatePut(V v) throws InterruptedException {
        awaitNotFull("ap-");
        final boolean emptyBeforePut = isEmpty();
        doPut(v);
        if (!emptyBeforePut) {
            // Buffer already held elements: nobody is notified in this case.
            System.out.println("was not Empty");
            return;
        }
        notifyAll();
        System.out.println("wasEmpty");
    }

    /**
     * Waits on this object's monitor until the buffer is no longer full,
     * printing {@code trace} each time the thread returns from {@code wait()}.
     * Must be called with this object's monitor held (all callers are
     * synchronized methods).
     */
    private void awaitNotFull(String trace) throws InterruptedException {
        // Loop, not a single check: the predicate is re-tested after every wake-up.
        while (isFull()) {
            wait();
            System.out.print(trace);
        }
    }

    /**
     * Waits on this object's monitor until the buffer is no longer empty,
     * printing {@code trace} each time the thread returns from {@code wait()}.
     * Must be called with this object's monitor held (all callers are
     * synchronized methods).
     */
    private void awaitNotEmpty(String trace) throws InterruptedException {
        // Loop, not a single check: the predicate is re-tested after every wake-up.
        while (isEmpty()) {
            wait();
            System.out.print(trace);
        }
    }
}
/**
 * Small demonstration of {@code BoundedBuffer}: a background thread takes
 * three items via {@link #useBuffer()} while the main thread inserts three
 * messages, sleeping before the first and before the last insertion.
 */
class BoundedBufferExampleUsage {

    /** Pause, in milliseconds, used by the main thread between production steps. */
    private static final long PRODUCER_PAUSE_MILLIS = 5000;

    /** Number of items the consumer takes; matches the three messages produced in main. */
    private static final int ITEMS_TO_TAKE = 3;

    /** Buffer shared between the main (producer) thread and the consumer thread. */
    private final BoundedBuffer<String> buffer;

    // Not referenced anywhere in this class; kept unchanged for compatibility.
    int SLEEP_GRANULARITY = 50;

    /** Creates the example with a default-sized buffer. */
    public BoundedBufferExampleUsage() {
        this.buffer = new BoundedBuffer<>();
    }

    /**
     * Starts the consumer thread, then produces three messages: one with
     * {@code put} and two with {@code alternatePut}.
     *
     * @param args ignored
     */
    public static void main(String... args) {
        BoundedBufferExampleUsage beu = new BoundedBufferExampleUsage();
        new Thread(() -> {
            try {
                beu.useBuffer();
            } catch (InterruptedException e) {
                // Restore the interrupt status so it is not silently lost.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }).start();
        try {
            // Give the consumer time to block in take() on the empty buffer.
            Thread.sleep(PRODUCER_PAUSE_MILLIS);
            beu.buffer.put("A-Message-from-BoundedBuffer");
            beu.buffer.alternatePut("A-Message-from-BoundedBuffer-for-alternate");
            Thread.sleep(PRODUCER_PAUSE_MILLIS);
            beu.buffer.alternatePut("A-Message-from-BoundedBuffer-for-alternate-again");
        } catch (InterruptedException e) {
            // Restore the interrupt status so it is not silently lost.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Takes three items from the buffer, printing each one as it arrives.
     *
     * @throws InterruptedException if interrupted while waiting in {@code take()}
     */
    void useBuffer() throws InterruptedException {
        for (int taken = 0; taken < ITEMS_TO_TAKE; taken++) {
            String item = buffer.take();
            // use item
            System.out.println("took item " + item);
        }
    }
}
The `BoundedBuffer` class extends `BaseBoundedBuffer` and uses Java’s intrinsic monitor methods (`wait()` and `notifyAll()`) to implement a classic producer-consumer pattern with proper blocking behavior. When a producer calls `put()`, it waits if the buffer is full; when a consumer calls `take()`, it waits if the buffer is empty. Both operations unblock when their respective conditions change. An alternative `alternatePut()` method demonstrates the conditional notification optimization: it notifies waiting consumers only if the buffer was empty before the insertion. The accompanying example shows how messages are safely passed between producer and consumer threads using coordinated blocking, making this a correct and efficient implementation of a concurrent bounded buffer.
No comments:
Post a Comment