Here is the BoundedQueue code:
public class BoundedQueue <T> { private final T[] buffer; private final int capacity; private int front; private int rear; private int count; private final Lock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); /** * * @param capacity is the fixed queue capacity */ public BoundedQueue (int capacity) { super(); this.capacity = capacity; buffer = ((T[])new Object[capacity]); } /** * * @return the object in the front of the queue * * @throws InterruptedException */ public T get() throws InterruptedException { lock.lock(); try { while (count == 0) { notEmpty.await(); } T result = buffer[front]; front = (front + 1) % capacity; count--; notFull.signal(); return result; } finally { lock.unlock(); } } /** * return object of the subclass * * @param <R> * @param type * @return * @throws InterruptedException */ public <R extends T> R get(final Class<R> type) throws InterruptedException { return (R)this.get(); } /** * put the object in the rear of the queue * @param data * @throws InterruptedException */ public void put(final T data) throws InterruptedException { lock.lock(); try { while (count == capacity) { notFull.await(); } buffer[rear] = data; rear = (rear + 1) % capacity; count++; notEmpty.signal(); } finally { lock.unlock(); } } }
Here is the JUnit test code for BoundedQueue:
public class BoundedQueueTest { static public class QueueConsumer implements Runnable, Callable<Long> { private BoundedQueue<Integer> queue; private long sleepTime; private int itemCount; private List items = new LinkedList<Integer>(); public QueueConsumer (BoundedQueue<Integer> que, int itemCount, long sleepTime) { this.queue = que; this.itemCount = itemCount; this.sleepTime = sleepTime; } @Override public Long call() throws Exception { long sum = 0; try { for ( int i=0; i<itemCount; i++) { Integer obj = this.queue.get(); Integer compareToObj = new Integer(i); items.add(obj); Assert.assertEquals(obj, compareToObj); //System.out.println("item - " + obj); sum++; Thread.sleep(sleepTime); } } catch (InterruptedException e) { e.printStackTrace(); } return sum; } public void run () { try { long sum = this.call(); } catch (Exception e) { e.printStackTrace(); } } } static public class QueueProducer implements Runnable, Callable<Long> { private BoundedQueue<Integer> queue; private int itemCount; private long sleepTime; private Integer[] items; public QueueProducer (BoundedQueue<Integer> que, int itemCount, long sleepTime) { this.queue = que; this.itemCount = itemCount; this.sleepTime = sleepTime; this.items = new Integer[itemCount]; for ( int i=0; i<itemCount; i++) { this.items[i] = new Integer(i); } } @Override public Long call() throws Exception { long sum = 0; try { for ( int i=0; i<itemCount; i++) { this.queue.put(this.items[i]); sum++; Thread.sleep(sleepTime); } } catch (InterruptedException e) { e.printStackTrace(); } return sum; } public void run () { try { call(); } catch (Exception e) { e.printStackTrace(); } } } @Test public void doFastConsumerBoundedQueueTest() { try { test(2, 100, 1000, 10, 8); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } @Test public void doFastProducerBoundedQueueTest() { try { test(2, 100, 1000, 10, 15); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } private void test(final int threadCount, int queueSize, int itemCound, long produceSleepTime, long consumerSleepTime) throws InterruptedException, ExecutionException { BoundedQueue<Integer> queue = new BoundedQueue<Integer>(queueSize); Callable<Long> task1 = new QueueProducer(queue, itemCound, produceSleepTime); Callable<Long> task2 = new QueueConsumer(queue, itemCound, consumerSleepTime); List<Callable<Long>> tasks = new ArrayList<Callable<Long>>(); tasks.add(task1); tasks.add(task2); ExecutorService executorService = Executors.newFixedThreadPool(2); List<Future<Long>> futures = executorService.invokeAll(tasks); List<Long> resultList = new ArrayList<Long>(futures.size()); for (Future<Long> future : futures) { resultList.add(future.get()); } executorService.shutdown(); } public static void main(String args[]) { new BoundedQueueTest().doFastProducerBoundedQueueTest(); } }
Enjoy the journey.
No comments:
Post a Comment