BoundedBuffer.java

/**
 * BoundedBuffer
 * <p/>
 * Bounded buffer using condition queues
 *
 */
public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {
    // CONDITION PREDICATE: not-full (!isFull())
    // CONDITION PREDICATE: not-empty (!isEmpty())
    public BoundedBuffer() {
        this(100);
    }

    public BoundedBuffer(int size) {
        super(size);
    }

    // BLOCKS-UNTIL: not-full
    public synchronized void put(V v) throws InterruptedException {
        while (isFull()) {
            wait();
            System.out.print("p-"); // trace: woke up from wait()
        }
        doPut(v);
        notifyAll();
    }

    // BLOCKS-UNTIL: not-empty
    public synchronized V take() throws InterruptedException {
        while (isEmpty()) {
            wait();
            System.out.print("t-"); // trace: woke up from wait()
        }
        V v = doTake();
        notifyAll();
        return v;
    }

    // BLOCKS-UNTIL: not-full
    // Alternate form of put() using conditional notification
    public synchronized void alternatePut(V v) throws InterruptedException {
        while (isFull()) {
            wait();
            System.out.print("ap-"); // trace: woke up from wait()
        }
        boolean wasEmpty = isEmpty();
        doPut(v);
        if (wasEmpty) {
            // Only the empty -> non-empty transition can release a waiting consumer
            notifyAll();
            System.out.println("wasEmpty");
        } else {
            System.out.println("was not Empty");
        }
    }
}

class BoundedBufferExampleUsage {
    private BoundedBuffer<String> buffer;
    int SLEEP_GRANULARITY = 50;

    public BoundedBufferExampleUsage() {
        this.buffer = new BoundedBuffer<>();
    }

    public static void main(String... args) {
        BoundedBufferExampleUsage beu = new BoundedBufferExampleUsage();
        // Consumer thread: blocks in take() until the main thread puts something
        new Thread(() -> {
            try {
                beu.useBuffer();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        // Producer (main thread): sleeps first so the consumer has to wait
        try {
            Thread.sleep(5000);
            beu.buffer.put("A-Message-from-BoundedBuffer");
            beu.buffer.alternatePut("A-Message-from-BoundedBuffer-for-alternate");
            Thread.sleep(5000);
            beu.buffer.alternatePut("A-Message-from-BoundedBuffer-for-alternate-again");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    void useBuffer() throws InterruptedException {
        String item = buffer.take();
        // use item
        System.out.println("took item " + item);
        item = buffer.take();
        // use item
        System.out.println("took item " + item);
        item = buffer.take();
        // use item
        System.out.println("took item " + item);
    }
}

The BoundedBuffer class extends BaseBoundedBuffer and uses Java’s intrinsic condition queue (wait() and notifyAll()) to implement a blocking producer-consumer buffer. put() waits while the buffer is full, and take() waits while it is empty. Each wait() sits inside a while loop that re-tests the condition predicate, because a thread can wake up spuriously or find that another thread has already changed the buffer's state by the time it reacquires the lock. Producers and consumers wait on the same monitor, so both methods call notifyAll() after changing the buffer instead of notify(), which could wake a thread waiting for the other condition. alternatePut() demonstrates conditional notification: it calls notifyAll() only when the buffer goes from empty to non-empty, since that is the only transition that can release a consumer blocked in take(). This avoids needless wake-ups but is easier to get wrong than unconditional notification. In the example, the consumer thread blocks in take() until the main thread, after sleeping, puts messages into the buffer.
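The listing relies on BaseBoundedBuffer, which this post does not show. As a rough, self-contained illustration of the same wait/notifyAll pattern, here is a minimal sketch that folds an assumed array-backed ring buffer into a single class. The class name SketchBoundedBuffer, the helpers enqueue() and dequeue(), and the size() method are hypothetical stand-ins for illustration, not the actual base class.

```java
// Hypothetical stand-in: BaseBoundedBuffer is not shown in the post, so this
// sketch assumes a simple array-backed ring buffer and inlines it here.
class SketchBoundedBuffer<V> {
    private final V[] items; // circular storage
    private int head;        // index of the next element to take
    private int tail;        // index of the next free slot
    private int count;       // number of stored elements

    @SuppressWarnings("unchecked")
    SketchBoundedBuffer(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        items = (V[]) new Object[capacity];
    }

    // Callers must hold the monitor (all callers below are synchronized).
    private boolean isFull()  { return count == items.length; }
    private boolean isEmpty() { return count == 0; }

    private void enqueue(V v) {
        items[tail] = v;
        tail = (tail + 1) % items.length;
        count++;
    }

    private V dequeue() {
        V v = items[head];
        items[head] = null; // drop the reference so it can be collected
        head = (head + 1) % items.length;
        count--;
        return v;
    }

    // BLOCKS-UNTIL: not-full
    public synchronized void put(V v) throws InterruptedException {
        while (isFull()) {
            wait(); // releases the monitor while waiting, reacquires on wake-up
        }
        enqueue(v);
        notifyAll();
    }

    // BLOCKS-UNTIL: not-empty
    public synchronized V take() throws InterruptedException {
        while (isEmpty()) {
            wait();
        }
        V v = dequeue();
        notifyAll();
        return v;
    }

    // BLOCKS-UNTIL: not-full
    // Conditional notification: wake waiters only on the empty -> non-empty transition.
    public synchronized void alternatePut(V v) throws InterruptedException {
        while (isFull()) {
            wait();
        }
        boolean wasEmpty = isEmpty();
        enqueue(v);
        if (wasEmpty) {
            notifyAll();
        }
    }

    public synchronized int size() { return count; }
}
```

With a capacity of 2, two put() calls followed by take() calls return the elements in insertion order, and a third put() without an intervening take() would block the calling thread until a consumer removes an element.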
