ConditionBoundedBuffer.java
import java.util.concurrent.locks.*;
/**
* ConditionBoundedBuffer
* <p/>
* Bounded buffer using explicit condition variables
*/
public class ConditionBoundedBuffer <T> {
protected final Lock lock = new ReentrantLock();
// CONDITION PREDICATE: notFull (count < items.length)
private final Condition notFull = lock.newCondition();
// CONDITION PREDICATE: notEmpty (count > 0)
private final Condition notEmpty = lock.newCondition();
private static final int BUFFER_SIZE = 100;
private final T[] items = (T[]) new Object[BUFFER_SIZE];
private int tail, head, count;
// BLOCKS-UNTIL: notFull
public void put(T x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await();
items[tail] = x;
if (++tail == items.length)
tail = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
// BLOCKS-UNTIL: notEmpty
public T take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
T x = items[head];
items[head] = null;
if (++head == items.length)
head = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
class ConditionBoundedBufferExampleUsage {
private ConditionBoundedBuffer<String> buffer;
int SLEEP_GRANULARITY = 50;
public ConditionBoundedBufferExampleUsage() {
this.buffer = new ConditionBoundedBuffer<>();
}
public static void main(String...args) {
ConditionBoundedBufferExampleUsage ceu = new ConditionBoundedBufferExampleUsage();
new Thread(()->{
try {
ceu.useBuffer();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
try {
Thread.sleep(5000);
ceu.buffer.put("A-Message-from-ConditionBoundedBuffer");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
void useBuffer() throws InterruptedException {
String item = buffer.take();
// use item
System.out.println("took item " + item);
}
}
The ConditionBoundedBuffer
class is a robust and efficient implementation of a thread-safe bounded buffer using explicit locks and condition variables, as provided by the java.util.concurrent.locks
package. It manages producer-consumer coordination with two separate Condition
objects: notFull
and notEmpty
, allowing fine-grained control over thread signaling. When the buffer is full, producers block on notFull
; when it’s empty, consumers block on notEmpty
. Upon state change, the corresponding condition is signaled to resume waiting threads. This design avoids the inefficiency of unconditional notifyAll()
and supports cleaner separation of concerns. The example demonstrates this in action by producing and consuming a message, showcasing an advanced and scalable alternative to intrinsic locking.
No comments:
Post a Comment