Tuesday, November 21, 2006

Java 5 Concurrency: Conditions

As mentioned in the previous post Java 5 Locks, the standard wait, notify and notifyall methods do not allow multiple wait sets per object. The Java 5 condition object by factoring out these methods into distinct objects to give the effect of having multiple wait sets per object. Conditions provide a means for one thread to suspend execution (to "wait") until notified by another thread that some state condition may now be true. The key property that waiting for a condition provides is that it atomically releases the associated lock and suspends the current thread, just like Object.wait. A Condition instance is intrinsically bound to a lock. To obtain a Condition instance for a particular Lock instance use its newCondition() method.
Skip to Sample Code
The Condition interface describes condition variables that may be associated with Locks. These are similar in usage to the implicit monitors accessed using Object.wait(), but offer extended capabilities. In particular, multiple Condition objects may be associated with a single Lock. The following example demonstrates the use of Conditions through an implementation of the Producer/Consumer problem (Bounded buffer problem). This example is similar to the one in the reader-writer locks example
public class Buffer {
ReentrantLock lock = new ReentrantLock();
private String[] names;
private int MAXLENGTH = 10;
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
private int putPosition, getPosition, count;

public Buffer() {
names = new String[MAXLENGTH];
}

public void put(String str) {
lock.lock();
try {
System.out.println("Writer : Array Size : " + count);
while (count == MAXLENGTH) {
System.out.println("Writer Waiting");
notFull.await();
}
count++;
names[putPosition++] = str;
if(putPosition == MAXLENGTH)
putPosition = 0;
Thread.sleep(100);
notEmpty.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}

public void get() {
lock.lock();
try {
System.out.println("Reader : Array Size : " +count);
while (count == 0) {
System.out.println("Reader Waiting");
notEmpty.await();
}
count--;
names[getPosition++] = null;
if(getPosition == MAXLENGTH)
getPosition = 0;
Thread.sleep(100);
notFull.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
Buffer.java

Note that in this case, we use the await() method instead of wait() and signal() instead of notify(). This is done to ensure that they are different from the methods of Object.
public class Producer implements Runnable {
Buffer myData;
public Producer(Buffer myData) {
super();
this.myData = myData;
}
public void run() {
for(int i = 0; i < 10; i++) {
myData.put(Thread.currentThread().getName() + " : " + i);
}
}
}
Producer.java
public class Consumer implements Runnable {
Buffer myData;
public void run() {
for(int i = 0; i < 10; i++) {
myData.get();
}
}

public Consumer(Buffer myData) {
super();
this.myData = myData;
}
}
Consumer.java
public class ProducerConsumer {
static int THREADS = 10;

public static void main(String[] args) {
Consumer[] consumers = new Consumer[THREADS];
Producer[] producers = new Producer[THREADS];
Buffer data = new Buffer();
Thread[] threads = new Thread[THREADS * 2];
for (int i = 0; i < THREADS; i++) {
consumers[i] = new Consumer(data);
producers[i] = new Producer(data);
threads[i] = new Thread(consumers[i], "" + i);
threads[i + THREADS] = new Thread(producers[i], "" + i);
}

for (int i = 0; i < THREADS * 2; i++) {
threads[i].start();
}

for (int i = 0; i < THREADS * 2; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
ProducerConsumer.java

No comments:

Post a Comment

Popular Posts