Site Search:

SemaphoreBoundedBuffer.java, TestBoundedBuffer.java, TimedPutTakeTest.java

The following files test SemaphoreBoundedBuffer.java, we test for correctness, the blocking operations, thread safety, memory leakage, and performance matrix such as thoroughput.

SemaphoreBoundedBuffer.java

import java.util.concurrent.*;

/**
* BoundedBuffer
* <p/>
* Bounded buffer using \Semaphore
*
*/
public class SemaphoreBoundedBuffer <E> {
private final Semaphore availableItems, availableSpaces;
private final E[] items;
private int putPosition = 0, takePosition = 0;

public SemaphoreBoundedBuffer(int capacity) {
if (capacity <= 0)
throw new IllegalArgumentException();
availableItems = new Semaphore(0);
availableSpaces = new Semaphore(capacity);
items = (E[]) new Object[capacity];
}

public boolean isEmpty() {
return availableItems.availablePermits() == 0;
}

public boolean isFull() {
return availableSpaces.availablePermits() == 0;
}

public void put(E x) throws InterruptedException {
availableSpaces.acquire();
doInsert(x);
availableItems.release();
}

public E take() throws InterruptedException {
availableItems.acquire();
E item = doExtract();
availableSpaces.release();
return item;
}

private synchronized void doInsert(E x) {
int i = putPosition;
items[i] = x;
putPosition = (++i == items.length) ? 0 : i;
}

private synchronized E doExtract() {
int i = takePosition;
E x = items[i];
items[i] = null;
takePosition = (++i == items.length) ? 0 : i;
return x;
}
}

TestBoundedBuffer.java

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import static org.junit.Assert.*;

/**
* TestBoundedBuffer
* <p/>
* Basic unit tests for BoundedBuffer
*
*/
public class TestBoundedBuffer {
private static final long LOCKUP_DETECT_TIMEOUT = 1000;
private static final int CAPACITY = 1000;
private static final int THRESHOLD = 1000;

public static void main(String...args) throws InterruptedException {
TestBoundedBuffer tbb = new TestBoundedBuffer();
tbb.testIsEmptyWhenConstructed();
tbb.testIsFullAfterPuts();
tbb.testTakeBlocksWhenEmpty();
tbb.testLeak();
}

public void testIsEmptyWhenConstructed() {
SemaphoreBoundedBuffer<Integer> bb = new SemaphoreBoundedBuffer<Integer>(10);
assertTrue(bb.isEmpty());
assertFalse(bb.isFull());
}

public void testIsFullAfterPuts() throws InterruptedException {
SemaphoreBoundedBuffer<Integer> bb = new SemaphoreBoundedBuffer<Integer>(10);
for (int i = 0; i < 10; i++)
bb.put(i);
assertTrue(bb.isFull());
assertFalse(bb.isEmpty());
}


public void testTakeBlocksWhenEmpty() {
final SemaphoreBoundedBuffer<Integer> bb = new SemaphoreBoundedBuffer<Integer>(10);
Thread taker = new Thread() {
public void run() {
try {
int unused = bb.take();
fail(); // if we get here, it's an error
} catch (InterruptedException success) {
}
}
};
try {
taker.start();
Thread.sleep(LOCKUP_DETECT_TIMEOUT);
taker.interrupt();
taker.join(LOCKUP_DETECT_TIMEOUT);
assertFalse(taker.isAlive());
} catch (Exception unexpected) {
fail();
}
}

class Big {
double[] data = new double[100000];
}

public void testLeak() throws InterruptedException {
SemaphoreBoundedBuffer<Big> bb = new SemaphoreBoundedBuffer<Big>(CAPACITY);
long heapSize1 = snapshotHeap();
for (int i = 0; i < CAPACITY; i++)
bb.put(new Big());
for (int i = 0; i < CAPACITY; i++) {
Big removed = bb.take();
assertNotNull(removed); // Ensure objects are being removed
}
System.gc();
Thread.sleep(100);
long heapSize2 = snapshotHeap();
System.out.println(heapSize1);
System.out.println(heapSize2);
//assertTrue(Math.abs(heapSize1 - heapSize2) < THRESHOLD);
}

private long snapshotHeap() {
// Get the MemoryMXBean instance
MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
// Get the current heap memory usage
MemoryUsage heapMemoryUsage = memoryMXBean.getHeapMemoryUsage();
// Return the used memory in bytes as a long
return heapMemoryUsage.getUsed();
}

}

TimedPutTakeTest.java

import static org.junit.Assert.assertEquals;

import java.util.concurrent.*;

/**
* TimedPutTakeTest
* <p/>
* Testing with a barrier-based timer
*
*/
public class TimedPutTakeTest extends PutTakeTest {
/**
* BarrierTimer
* <p/>
* Barrier-based timer
*/
private class BarrierTimer implements Runnable {
private boolean started;
private long startTime, endTime;

public synchronized void run() {
long t = System.nanoTime();
if (!started) {
started = true;
startTime = t;
} else
endTime = t;
}

public synchronized void clear() {
started = false;
}

public synchronized long getTime() {
return endTime - startTime;
}
}
private BarrierTimer timer = new BarrierTimer();

public TimedPutTakeTest(int cap, int pairs, int trials) {
super(cap, pairs, trials);
barrier = new CyclicBarrier(nPairs * 2 + 1, timer);
}

public void test() {
try {
timer.clear();
for (int i = 0; i < nPairs; i++) {
pool.execute(new PutTakeTest.Producer());
pool.execute(new PutTakeTest.Consumer());
}
barrier.await();
barrier.await();
long nsPerItem = timer.getTime() / (nPairs * (long) nTrials);
System.out.print("Throughput: " + nsPerItem + " ns/item");
assertEquals(putSum.get(), takeSum.get());
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public static void main(String[] args) throws Exception {
int tpt = 100000; // trials per thread
for (int cap = 1; cap <= 1000; cap *= 10) {
System.out.println("Capacity: " + cap);
for (int pairs = 1; pairs <= 128; pairs *= 2) {
TimedPutTakeTest t = new TimedPutTakeTest(cap, pairs, tpt);
System.out.print("Pairs: " + pairs + "\t");
t.test();
System.out.print("\t");
Thread.sleep(1000);
t.test();
System.out.println();
Thread.sleep(1000);
}
}
PutTakeTest.pool.shutdown();
}
}

Collectively, these classes serve as practical examples for understanding and testing semaphore-based synchronization in Java, emphasizing thread safety, blocking behavior, and performance considerations in concurrent programming.

No comments:

Post a Comment