Here is the BoundedQueue code:
/**
 * A fixed-capacity, blocking FIFO queue backed by a circular array.
 *
 * <p>All mutable state ({@code front}, {@code rear}, {@code count} and the
 * backing array) is guarded by a single lock. Producers block in
 * {@link #put(Object)} while the queue is full; consumers block in
 * {@link #get()} while it is empty.
 *
 * @param <T> the type of elements held in the queue
 */
public class BoundedQueue<T> {
    private final T[] buffer;
    private final int capacity;
    /** Index of the next element to be removed. */
    private int front;
    /** Index of the next free slot to be written. */
    private int rear;
    /** Number of elements currently stored. */
    private int count;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    /**
     * Creates an empty queue.
     *
     * @param capacity is the fixed queue capacity; must be positive
     * @throws IllegalArgumentException if {@code capacity} is zero or negative
     */
    public BoundedQueue(int capacity) {
        // A zero-capacity queue would make every put() block forever and a
        // negative one fails with an unhelpful NegativeArraySizeException.
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.capacity = capacity;
        // Safe: the array never escapes this class and only T values are stored in it.
        @SuppressWarnings("unchecked")
        final T[] storage = (T[]) new Object[capacity];
        this.buffer = storage;
    }

    /**
     * Removes and returns the element at the front of the queue, waiting
     * while the queue is empty.
     *
     * @return the object in the front of the queue
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public T get() throws InterruptedException {
        lock.lock();
        try {
            // Loop rather than "if": await() may return spuriously.
            while (count == 0) {
                notEmpty.await();
            }
            final T result = buffer[front];
            // Clear the slot so the queue does not keep the removed element reachable.
            buffer[front] = null;
            front = (front + 1) % capacity;
            count--;
            notFull.signal();
            return result;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes the element at the front of the queue and returns it as the
     * requested subclass, waiting while the queue is empty.
     *
     * <p>The element is removed before the type check, so when the cast
     * fails the element is no longer in the queue.
     *
     * @param <R>  the expected subtype of the element
     * @param type the class object for {@code R}; must not be null
     * @return the front element cast to {@code R} (null if a null was queued)
     * @throws NullPointerException if {@code type} is null
     * @throws ClassCastException   if the front element is not an instance of {@code type}
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public <R extends T> R get(final Class<R> type) throws InterruptedException {
        // Reject a null type before dequeuing so no element is lost to the error.
        if (type == null) {
            throw new NullPointerException("type");
        }
        // Checked cast: a wrong type fails here instead of polluting the caller's heap.
        return type.cast(this.get());
    }

    /**
     * Appends the object at the rear of the queue, waiting while the queue
     * is full.
     *
     * @param data the element to add
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void put(final T data) throws InterruptedException {
        lock.lock();
        try {
            // Loop rather than "if": await() may return spuriously.
            while (count == capacity) {
                notFull.await();
            }
            buffer[rear] = data;
            rear = (rear + 1) % capacity;
            count++;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }
}
Here is the JUnit test code for BoundedQueue:
/**
 * Producer/consumer tests for {@code BoundedQueue}.
 *
 * <p>One producer puts the integers {@code 0..itemCount-1} into a shared queue
 * while one consumer takes them out and checks that they arrive in the same
 * order. The two tests differ only in which side sleeps longer per item.
 */
public class BoundedQueueTest {

    /** Takes {@code itemCount} items from the queue and verifies they arrive as 0, 1, 2, ... */
    static public class QueueConsumer implements Runnable, Callable<Long> {
        private final BoundedQueue<Integer> queue;
        private final long sleepTime;
        private final int itemCount;
        /** Every item received so far, in arrival order. */
        private final List<Integer> items = new ArrayList<Integer>();

        /**
         * @param que       the queue to consume from
         * @param itemCount how many items to take
         * @param sleepTime pause in milliseconds after each item
         */
        public QueueConsumer(BoundedQueue<Integer> que, int itemCount, long sleepTime) {
            this.queue = que;
            this.itemCount = itemCount;
            this.sleepTime = sleepTime;
        }

        /**
         * Consumes the items, asserting FIFO order.
         *
         * @return the number of items consumed; less than {@code itemCount}
         *         only if the thread was interrupted
         */
        @Override
        public Long call() throws Exception {
            long sum = 0;
            try {
                for (int i = 0; i < itemCount; i++) {
                    Integer obj = this.queue.get();
                    items.add(obj);
                    // JUnit order is (expected, actual).
                    Assert.assertEquals(Integer.valueOf(i), obj);
                    sum++;
                    Thread.sleep(sleepTime);
                }
            } catch (InterruptedException e) {
                // Stop early but keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
            }
            return sum;
        }

        /** Runs {@link #call()} and prints any exception, since {@code run} cannot throw one. */
        @Override
        public void run() {
            try {
                call();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /** Puts the integers {@code 0..itemCount-1} into the queue in order. */
    static public class QueueProducer implements Runnable, Callable<Long> {
        private final BoundedQueue<Integer> queue;
        private final int itemCount;
        private final long sleepTime;
        private final Integer[] items;

        /**
         * @param que       the queue to produce into
         * @param itemCount how many items to put
         * @param sleepTime pause in milliseconds after each item
         */
        public QueueProducer(BoundedQueue<Integer> que, int itemCount, long sleepTime) {
            this.queue = que;
            this.itemCount = itemCount;
            this.sleepTime = sleepTime;
            this.items = new Integer[itemCount];
            for (int i = 0; i < itemCount; i++) {
                this.items[i] = Integer.valueOf(i);
            }
        }

        /**
         * Produces the items.
         *
         * @return the number of items produced; less than {@code itemCount}
         *         only if the thread was interrupted
         */
        @Override
        public Long call() throws Exception {
            long sum = 0;
            try {
                for (int i = 0; i < itemCount; i++) {
                    this.queue.put(this.items[i]);
                    sum++;
                    Thread.sleep(sleepTime);
                }
            } catch (InterruptedException e) {
                // Stop early but keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
            }
            return sum;
        }

        /** Runs {@link #call()} and prints any exception, since {@code run} cannot throw one. */
        @Override
        public void run() {
            try {
                call();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /** Consumer sleeps less per item than the producer, so the queue stays nearly empty. */
    @Test
    public void doFastConsumerBoundedQueueTest() {
        runOrFail(2, 100, 1000, 10, 8);
    }

    /** Producer sleeps less per item than the consumer, so the queue fills up. */
    @Test
    public void doFastProducerBoundedQueueTest() {
        runOrFail(2, 100, 1000, 10, 15);
    }

    /**
     * Runs {@link #test} and converts its checked exceptions into test
     * failures instead of printing and ignoring them.
     */
    private void runOrFail(int threadCount, int queueSize, int itemCount, long produceSleepTime, long consumerSleepTime) {
        try {
            test(threadCount, queueSize, itemCount, produceSleepTime, consumerSleepTime);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            AssertionError failure = new AssertionError("test was interrupted");
            failure.initCause(e);
            throw failure;
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            // An AssertionError raised inside a task arrives wrapped; rethrow it as-is.
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            AssertionError failure = new AssertionError("task failed: " + cause);
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Runs one producer and one consumer against a fresh queue and checks
     * that each handled exactly {@code itemCount} items.
     *
     * @param threadCount       requested pool size; raised to 2 if smaller, because
     *                          both tasks must run concurrently
     * @param queueSize         capacity of the queue under test
     * @param itemCount         number of items to pass through the queue
     * @param produceSleepTime  producer pause per item, in milliseconds
     * @param consumerSleepTime consumer pause per item, in milliseconds
     */
    private void test(final int threadCount, int queueSize, int itemCount, long produceSleepTime, long consumerSleepTime) throws InterruptedException, ExecutionException {
        BoundedQueue<Integer> queue = new BoundedQueue<Integer>(queueSize);
        // Typed as Callable so submit() is not ambiguous with the Runnable overload.
        Callable<Long> producer = new QueueProducer(queue, itemCount, produceSleepTime);
        Callable<Long> consumer = new QueueConsumer(queue, itemCount, consumerSleepTime);
        ExecutorService executorService = Executors.newFixedThreadPool(Math.max(threadCount, 2));
        try {
            Future<Long> produced = executorService.submit(producer);
            Future<Long> consumed = executorService.submit(consumer);
            // Wait for the consumer first: if it fails early the producer may be
            // blocked on a full queue, and the finally block must get to interrupt it.
            Assert.assertEquals(itemCount, consumed.get().longValue());
            Assert.assertEquals(itemCount, produced.get().longValue());
        } finally {
            // shutdownNow() interrupts any task still blocked on the queue.
            executorService.shutdownNow();
        }
    }

    /** Runs the fast-producer scenario without a JUnit runner. */
    public static void main(String[] args) {
        new BoundedQueueTest().doFastProducerBoundedQueueTest();
    }
}
Enjoy the journey.
No comments:
Post a Comment