Tuesday, April 10, 2012

Simple Thread-Safe Bounded Queue Implementation in Java

I was asked to implement a thread-safe bounded queue for the producer-consumer pattern. The implementation uses ReentrantLock and Condition; it's straightforward. Testing the queue requires a bit more work. For verification, I created a thread pool of size 2: one thread for the consumer and one for the producer.

Here is the BoundedQueue code:


/**
 * A fixed-capacity FIFO queue backed by a circular array.
 *
 * <p>All access to the array and its indices happens while holding a single
 * {@link ReentrantLock}. {@link #put(Object)} waits on {@code notFull} while
 * the queue is full and {@link #get()} waits on {@code notEmpty} while it is
 * empty, so the class can be shared between producer and consumer threads.
 *
 * @param <T> the type of the queued elements
 */
public class BoundedQueue <T> {

    /** Circular storage; slots outside the live range hold {@code null}. */
    private final T[] buffer;
    private final int capacity;

    /** Index of the next element to hand out. */
    private int front;
    /** Index of the next free slot. */
    private int rear;
    /** Number of elements currently stored. */
    private int count;

    private final Lock lock = new ReentrantLock();

    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    /**
     * Creates an empty queue.
     *
     * @param capacity is the fixed queue capacity; must be greater than zero
     * @throws IllegalArgumentException if {@code capacity} is zero or negative
     */
    @SuppressWarnings("unchecked") // the array never escapes this class and only ever stores T
    public BoundedQueue (int capacity) {
        super();
        // A capacity of zero would make every put() and every get() wait forever.
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.capacity = capacity;
        buffer = ((T[])new Object[capacity]);
    }

    /**
     * Removes and returns the element at the front of the queue, waiting
     * while the queue is empty.
     *
     * @return the object in the front of the queue
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public T get() throws InterruptedException {
        lock.lock();

        try {
            // Loop rather than "if": the condition must be re-checked after every wake-up.
            while (count == 0) {
                notEmpty.await();
            }

            T result = buffer[front];
            // Drop the array's reference so the element can be collected once the caller is done.
            buffer[front] = null;
            front = (front + 1) % capacity;
            count--;
            notFull.signal();
            return result;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes the element at the front of the queue and returns it as the
     * requested subclass.
     *
     * <p>The element is removed before the type check, so when the check
     * fails the element is no longer in the queue.
     *
     * @param <R> the expected subtype of the element
     * @param type the class object for {@code R}; must not be {@code null}
     * @return the front element, cast to {@code R}
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws ClassCastException if the front element is not an instance of {@code type}
     * @throws NullPointerException if {@code type} is {@code null}
     */
    public <R extends T> R get(final Class<R> type) throws InterruptedException {
        // Reject a missing type before removing anything from the queue.
        if (type == null) {
            throw new NullPointerException("type");
        }
        return type.cast(this.get());
    }

    /**
     * Appends the object at the rear of the queue, waiting while the queue
     * is full.
     *
     * @param data the element to add
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void put(final T data) throws InterruptedException {
        lock.lock();
        try {
            // Loop rather than "if": the condition must be re-checked after every wake-up.
            while (count == capacity) {
                notFull.await();
            }

            buffer[rear] = data;
            rear = (rear + 1) % capacity;
            count++;

            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

}

Here is the JUnit test code for BoundedQueue:
/**
 * JUnit tests that run one producer and one consumer against a shared
 * {@link BoundedQueue} and check that items arrive in order and in full.
 */
public class BoundedQueueTest {

    /**
     * Takes {@code itemCount} items from the queue and asserts that the i-th
     * item received equals {@code i}, pausing {@code sleepTime} milliseconds
     * after each one.
     */
    static public class QueueConsumer implements Runnable, Callable<Long> {
        private final BoundedQueue<Integer> queue;
        private final long sleepTime;
        private final int itemCount;
        /** Every item received so far, in arrival order. */
        private final List<Integer> items = new LinkedList<Integer>();

        public QueueConsumer (BoundedQueue<Integer> que, int itemCount, long sleepTime) {
            this.queue = que;
            this.itemCount = itemCount;
            this.sleepTime = sleepTime;
        }

        /**
         * Consumes the configured number of items.
         *
         * @return the number of items consumed
         * @throws Exception {@link InterruptedException} if interrupted while
         *         waiting on the queue or sleeping
         */
        @Override
        public Long call() throws Exception {
            long sum = 0;
            // No local catch: an interrupt must surface through the Future
            // instead of being reported as a short but "successful" count.
            for (int i = 0; i < itemCount; i++) {
                Integer obj = this.queue.get();
                items.add(obj);
                // JUnit expects (expected, actual).
                Assert.assertEquals(Integer.valueOf(i), obj);
                sum++;
                Thread.sleep(sleepTime);
            }
            return sum;
        }

        /** Runnable adapter around {@link #call()}. */
        @Override
        public void run () {
            try {
                call();
            } catch (InterruptedException e) {
                // Runnable cannot rethrow; keep the interrupt visible to the owner of this thread.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
    }

    /**
     * Puts the integers {@code 0 .. itemCount-1} into the queue in order,
     * pausing {@code sleepTime} milliseconds after each one.
     */
    static public class QueueProducer implements Runnable, Callable<Long> {
        private final BoundedQueue<Integer> queue;
        private final int itemCount;
        private final long sleepTime;
        private final Integer[] items;

        public QueueProducer (BoundedQueue<Integer> que, int itemCount, long sleepTime) {
            this.queue = que;
            this.itemCount = itemCount;
            this.sleepTime = sleepTime;
            this.items = new Integer[itemCount];
            for (int i = 0; i < itemCount; i++) {
                this.items[i] = Integer.valueOf(i);
            }
        }

        /**
         * Produces the configured number of items.
         *
         * @return the number of items produced
         * @throws Exception {@link InterruptedException} if interrupted while
         *         waiting on the queue or sleeping
         */
        @Override
        public Long call() throws Exception {
            long sum = 0;
            for (int i = 0; i < itemCount; i++) {
                this.queue.put(this.items[i]);
                sum++;
                Thread.sleep(sleepTime);
            }
            return sum;
        }

        /** Runnable adapter around {@link #call()}. */
        @Override
        public void run () {
            try {
                call();
            } catch (InterruptedException e) {
                // Runnable cannot rethrow; keep the interrupt visible to the owner of this thread.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        }
    }

    /** Producer sleeps 10 ms per item, consumer 8 ms, so the consumer is the faster side. */
    @Test
    public void doFastConsumerBoundedQueueTest() throws InterruptedException, ExecutionException {
        test(2, 100, 1000, 10, 8);
    }

    /** Producer sleeps 10 ms per item, consumer 15 ms, so the producer is the faster side. */
    @Test
    public void doFastProducerBoundedQueueTest() throws InterruptedException, ExecutionException {
        test(2, 100, 1000, 10, 15);
    }

    /**
     * Runs one producer and one consumer on a thread pool and checks that
     * each of them processed exactly {@code itemCount} items.
     *
     * @param threadCount size of the thread pool; the producer and consumer
     *        need to run concurrently, so callers pass 2
     * @param queueSize capacity of the queue under test
     * @param itemCount number of items to produce and consume
     * @param produceSleepTime producer pause per item, in milliseconds
     * @param consumerSleepTime consumer pause per item, in milliseconds
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if either task failed, including a failed
     *         assertion inside the consumer
     */
    private void test(final int threadCount, int queueSize, int itemCount, long produceSleepTime, long consumerSleepTime) throws InterruptedException, ExecutionException {
        BoundedQueue<Integer> queue = new BoundedQueue<Integer>(queueSize);
        Callable<Long> producer = new QueueProducer(queue, itemCount, produceSleepTime);
        Callable<Long> consumer = new QueueConsumer(queue, itemCount, consumerSleepTime);

        List<Callable<Long>> tasks = new ArrayList<Callable<Long>>();
        tasks.add(producer);
        tasks.add(consumer);

        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        try {
            List<Future<Long>> futures = executorService.invokeAll(tasks);
            for (Future<Long> future : futures) {
                // Explicit long on both sides avoids the ambiguous assertEquals(long, Long) overload.
                Assert.assertEquals((long) itemCount, future.get().longValue());
            }
        } finally {
            // Release the pool threads even when a task failed.
            executorService.shutdown();
        }
    }

    public static void main(String args[]) throws InterruptedException, ExecutionException {
        new BoundedQueueTest().doFastProducerBoundedQueueTest();
    }

}

Enjoy the journey.

No comments:

Post a Comment