Aug 22, 2012

Article: Scenarios and solutions for better concurrency and thread safety part-2 CountDownLatch and CyclicBarrier

There are other real-life scenarios where the following classes from the java.util.concurrent package can be put to great use.
  • CountDownLatch
  • CyclicBarrier
Some real-life scenarios that make use of these classes are discussed below.

Scenario: A main thread creates 3 database connections and assigns each connection to one of 3 child threads that it spawns. The main thread must wait until all the child threads have completed and then close all the database connections. So, how will you accomplish this?

Solution: This is where the CountDownLatch comes in handy, as you already know that there is a finite number (i.e. 3) of threads. The main thread can use a CountDownLatch to wait on the child threads. The CountDownLatch is created with a count of 3.

CountDownLatch countDownLatch = new CountDownLatch(MAX_THREADS);

The main thread will spawn the new threads and wait for the count to reach 0 with the await( ) method.

countDownLatch.await();

As each child thread completes its processing within the run( ) method, it decrements the count with

countDownLatch.countDown(); 
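
Putting the three calls together, the database scenario can be sketched roughly as follows. This is a minimal sketch, not a definitive implementation: FakeConnection is a hypothetical stand-in for a real database connection, and the class and method names (CloseAfterWorkers, runScenario) are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class CloseAfterWorkers {

    private static final int MAX_THREADS = 3;

    // Hypothetical stand-in for a database connection (not a JDBC class).
    public static class FakeConnection implements AutoCloseable {
        private final int id;
        private volatile boolean used;
        private volatile boolean closed;

        FakeConnection(int id) {
            this.id = id;
        }

        void query() {
            if (closed) {
                throw new IllegalStateException("connection " + id + " is already closed");
            }
            used = true;
            System.out.println(Thread.currentThread().getName() + " used connection " + id);
        }

        public boolean wasUsed() { return used; }

        public boolean isClosed() { return closed; }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Runs the scenario and returns the connections so that a caller can inspect them.
    public static List<FakeConnection> runScenario() {
        CountDownLatch countDownLatch = new CountDownLatch(MAX_THREADS);
        List<FakeConnection> connections = new ArrayList<>();

        for (int i = 1; i <= MAX_THREADS; i++) {
            FakeConnection connection = new FakeConnection(i);
            connections.add(connection);
            new Thread(() -> {
                try {
                    connection.query();
                } finally {
                    countDownLatch.countDown(); // count down even if the work fails
                }
            }, "child-" + i).start();
        }

        try {
            countDownLatch.await(); // blocks until all 3 children have counted down
        } catch (InterruptedException ex) {
            // sketch only: a real program would decide whether to keep waiting
            Thread.currentThread().interrupt();
        }
        for (FakeConnection connection : connections) {
            connection.close(); // the main thread closes every connection
        }
        return connections;
    }

    public static void main(String[] args) {
        boolean allClosed = runScenario().stream().allMatch(FakeConnection::isClosed);
        System.out.println("All connections closed: " + allClosed);
    }
}
```

Calling countDown( ) in a finally block is a deliberate choice here: if a child thread fails, the main thread is still released instead of waiting forever.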





Here is a simplified example in which the worker threads wait on a startSignal until every worker thread has been started, and the main thread waits on a stopSignal for all the worker threads to complete, as illustrated in the above diagram. First, define a worker thread class.


import java.util.concurrent.CountDownLatch;

public class Worker implements Runnable {
 
    private CountDownLatch startLatch;
    private CountDownLatch stopLatch;

    public Worker(CountDownLatch startLatch, CountDownLatch stopLatch) {
       this.startLatch = startLatch;
       this.stopLatch = stopLatch;
    }

    @Override
    public void run() {
        try {
            startLatch.await(); // wait until the latch has counted down to zero
            System.out.println("Running: " + Thread.currentThread().getName());
        } catch (InterruptedException ex)  {
            ex.printStackTrace();
        }
        finally {
            // count down so that the main thread can continue once the count reaches 0 from MAX_THREADS (i.e. 3)
            stopLatch.countDown();
        }
    }

}


Finally, define the WaitForAllThreadsToStart class that creates the worker threads.

import java.util.concurrent.CountDownLatch;

public class WaitForAllThreadsToStart {
 
 private static final int MAX_THREADS = 3;
 
 public static void main(String[] args) throws Exception {
     CountDownLatch startSignal = new CountDownLatch(1);   //count down from 1 to 0
     CountDownLatch stopSignal = new CountDownLatch(MAX_THREADS); // count down from 3 to 0
        
     System.out.println("The main thread is going to spawn " + MAX_THREADS  + " worker threads.....");  
     for (int i = 1; i <= MAX_THREADS; i++) {
        Thread t = new Thread(new Worker(startSignal,stopSignal), "thread-" + i);
        Thread.sleep(300);
        t.start();
        System.out.println("Started: " + t.getName() + " but waits for other threads to start.");
     }
        
     //count down the start signal from 1 to 0, so that the waiting worker threads can start executing 
     startSignal.countDown();
     System.out.println("worker threads can now start executing as all worker threads have started.....");
     try{       
        stopSignal.await(); // wait for the worker threads to complete by counting down the stopLatch
     } catch (InterruptedException ex){
        ex.printStackTrace();
     }
     System.out.println("finished executing the worker threads and the main thread is continuing.");
     System.out.println("The main thread can execute any task here.");
        
 }
}


The output will look like the following (the order of the "Running" lines may vary from run to run):

The main thread is going to spawn 3 worker threads.....
Started: thread-1 but waits for other threads to start.
Started: thread-2 but waits for other threads to start.
Started: thread-3 but waits for other threads to start.
worker threads can now start executing as all worker threads have started.....
Running: thread-1
Running: thread-3
Running: thread-2
finished executing the worker threads and the main thread is continuing.
The main thread can execute any task here.


Scenario: What if the above scenario has a specific requirement where the 3 child threads need to wait for each other? For example, each of the 3 threads needs to perform 2 tasks, and before any thread can start task 2, all 3 child threads must finish task 1. Task 1 could be reading data from the database and task 2 could be performing some computation. Finally, the results of these computations need to be consolidated and written back by a single thread.

Solution: A CyclicBarrier can be used if the number of child threads is known upfront and the child threads need to wait for each other until all of them finish a task. This is useful where parallel threads need to perform a job whose steps must execute in sequence. It has methods like cyclicBarrier.await( ) and cyclicBarrier.reset( ).




Here is the simplified code to improve understanding. Firstly the WorkerTask thread.


import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class WorkerTask implements Runnable {
 
 private CyclicBarrier barrier;
  
 public WorkerTask(CyclicBarrier barrier) {
  this.barrier = barrier;
 }

 @Override
 public void run() {
   String threadName = Thread.currentThread().getName();
   try {
         System.out.println(threadName + " is now performing Task-A");
         barrier.await();      //barrier point for all threads to finish Task-A
           
         System.out.println(threadName + " is now performing Task-B");
         barrier.await();     //barrier point for all threads to finish Task-B 
           
   } catch (BrokenBarrierException ex)  {
         ex.printStackTrace();
   } catch (InterruptedException e) {
      e.printStackTrace();
   }
  }
}


Now, the test class that creates the worker threads to perform Task-A and Task-B, and the barrier action for the consolidation work.

import java.util.concurrent.CyclicBarrier;

public class WaitForBarrierPoint {
 
 private static final int MAX_THREADS = 3;
 private static final int NO_OF_TASKS = 2;    //Task-A and Task-B
 
 private static int taskCount = 0;
 
 //barrier action that runs each time all MAX_THREADS threads reach the barrier point,
 //and performs the consolidation task. It is executed by the last thread to arrive,
 //not by a separate thread. This is an anonymous inner class in action
 private static CyclicBarrier cb = new CyclicBarrier(MAX_THREADS, new Runnable() {
 
   @Override
   public void run() {
     System.out.println("All " + MAX_THREADS + " threads have reached the barrier point.");
     ++taskCount;
   
     //the consolidation job starts only after both tasks are completed.
     if(taskCount == NO_OF_TASKS) {
       System.out.println("The consolidation job can start now .... ");
     }
   }
 }); 
 
 
 public static void main(String[] args) {
  Thread t = null;
  //create 3 worker threads
  for (int i = 1; i <= MAX_THREADS; i++) {
   t = new Thread(new WorkerTask(cb), "Thread-" + i);
   t.start();
  }
  
  System.out.println("The main thread ends here.");
 }
 
}


Q. Why is it called a cyclic barrier?
A. Because it acts as a barrier point for a number of worker threads to wait for each other to complete their tasks. The barrier is called cyclic because it can be re-used after all the waiting worker threads are released for the next barrier point.

A barrier is constructed with the following arguments to a constructor.
  • the number of threads that will be participating in the parallel operation.
  • an optional amalgamation or consolidation routine to be run at the end of each step or iteration.
At each step (or iteration) of the operation:
  • each thread carries out its portion of the work to complete that step.
  • after doing its portion of the work, each thread calls the barrier's await( ) method.
  • the await( ) method returns only when 
          -- all 3 threads have called await( ).
          -- the amalgamation or consolidation  method has run (the barrier calls this on the last thread to call
              await( ) before releasing the awaiting threads).
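
The steps above can be observed with a small experiment. The following is a sketch under assumptions: the class name BarrierActionDemo, the log format and the two-step loop are made up for illustration. It relies on await( ) returning the arrival index of the calling thread, where 0 marks the last thread to arrive, and records which thread ran the barrier action at each step.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class BarrierActionDemo {

    private static final int MAX_THREADS = 3;
    private static final int STEPS = 2;

    // Returns a log of who ran the barrier action and who arrived last at each step.
    public static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());

        // The barrier action records the name of the thread that executes it.
        CyclicBarrier barrier = new CyclicBarrier(MAX_THREADS,
                () -> log.add("action:" + Thread.currentThread().getName()));

        Thread[] workers = new Thread[MAX_THREADS];
        for (int i = 0; i < MAX_THREADS; i++) {
            workers[i] = new Thread(() -> {
                try {
                    for (int step = 1; step <= STEPS; step++) {
                        // this thread's portion of the step would go here
                        int arrivalIndex = barrier.await();
                        if (arrivalIndex == 0) { // 0 marks the last thread to arrive
                            log.add("last:" + Thread.currentThread().getName());
                        }
                    }
                } catch (InterruptedException | BrokenBarrierException ex) {
                    log.add("failed:" + ex);
                }
            }, "worker-" + (i + 1));
            workers[i].start();
        }

        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Each "action:" entry in the log is followed by a "last:" entry naming the same thread, which matches the rule that the barrier runs the consolidation routine on the last thread to call await( ). Which worker that is can differ from run to run.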

If any of the 3 threads is interrupted or times out while waiting at the barrier, the barrier is "broken": that thread receives an InterruptedException or a TimeoutException, and all the other waiting threads receive a BrokenBarrierException. The failure therefore propagates to all the threads, so the remaining steps halt. It also means that the whole operation can be stopped externally by interrupting just one of the threads.
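
A broken barrier can be reproduced with a small sketch. The names used here (BrokenBarrierDemo, waiter-1, waiter-2, the outcome map) are assumptions for illustration. The barrier expects 3 parties but only 2 threads ever arrive, so it can never trip on its own, and one of the waiting threads is then interrupted.

```java
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;

public class BrokenBarrierDemo {

    // Returns, per thread name, what ended that thread's wait at the barrier.
    public static Map<String, String> run() {
        // Three parties, but only two threads ever arrive, so the barrier cannot trip.
        CyclicBarrier barrier = new CyclicBarrier(3);
        Map<String, String> outcome = new ConcurrentHashMap<>();

        Runnable waiter = () -> {
            String name = Thread.currentThread().getName();
            try {
                barrier.await();
                outcome.put(name, "released");
            } catch (InterruptedException ex) {
                outcome.put(name, "InterruptedException");
            } catch (BrokenBarrierException ex) {
                outcome.put(name, "BrokenBarrierException");
            }
        };

        Thread first = new Thread(waiter, "waiter-1");
        Thread second = new Thread(waiter, "waiter-2");
        first.start();
        second.start();

        first.interrupt(); // interrupting one party breaks the barrier for all of them

        try {
            first.join();
            second.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        outcome.put("barrier", barrier.isBroken() ? "broken" : "intact");
        return outcome;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The interrupted thread sees an InterruptedException, the other thread sees a BrokenBarrierException, and isBroken( ) reports true afterwards. A broken barrier stays broken until reset( ) is called on it.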

Q. So, when to use a CountDownLatch and when to use a CyclicBarrier?
A. A CountDownLatch is initialized with a counter. Threads can then either count down on the latch or wait for it to reach 0. When the latch reaches 0, all waiting threads can resume.

If you want a set of threads to repeatedly meet at a common point, you are better served by using a CyclicBarrier. For example, start a bunch of threads, meet, do some stuff like data consolidation or amalgamation, meet again, validate some assertions, and do this repeatedly.

A given CountDownLatch can only be used once, making it inconvenient for operations that occur in steps, with intermediate results from the different threads needing to be consolidated between steps. The CountDownLatch also doesn't explicitly allow one thread to tell the others to "stop waiting", which is sometimes useful, for example, if an error occurs in one of the threads.
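
The one-shot behaviour is easy to check. In the sketch below, the class and method names are made up for illustration. The first method shows that a latch which has reached 0 stays open and cannot be reset. The second shows one possible workaround for the missing "stop waiting" signal, namely waiting with a timeout through await(long, TimeUnit), which returns false if the count has not reached 0 in time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class OneShotLatchDemo {

    // A latch that has reached 0 stays open; CountDownLatch has no reset( ) method.
    public static long countAfterExtraCountDown() {
        CountDownLatch latch = new CountDownLatch(2);
        latch.countDown();
        latch.countDown(); // the count is now 0 and the latch is open for good
        latch.countDown(); // no effect: the count never drops below 0
        try {
            latch.await(); // returns immediately
            latch.await(); // and keeps doing so on every later call
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return latch.getCount(); // a second phase would need a new latch
    }

    // Nobody counts this latch down, so the timed wait gives up and returns false.
    public static boolean timedAwaitOnStuckLatch() {
        CountDownLatch stuck = new CountDownLatch(1);
        try {
            return stuck.await(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Count after an extra countDown: " + countAfterExtraCountDown());
        System.out.println("Timed await succeeded: " + timedAwaitOnStuckLatch());
    }
}
```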

The CyclicBarrier is generally more useful than CountDownLatch in scenarios where:
  • a multithreaded operation occurs in steps or iterations, and
  • a single-threaded operation is required between steps/iterations, for example, to combine the results of the previous multithreaded steps.
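
Both points can be combined in one small sketch. The workload here is made up: in each step, every worker writes one number into its own slot of a shared array, and the barrier action, which runs single-threaded between steps, adds the slots up. The names StepwiseSum, partial and totals are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class StepwiseSum {

    private static final int MAX_THREADS = 3;
    private static final int STEPS = 2;

    // Returns one consolidated total per step.
    public static List<Integer> run() {
        int[] partial = new int[MAX_THREADS];     // one slot per worker
        List<Integer> totals = new ArrayList<>(); // written only by the barrier action

        // Single-threaded consolidation between steps: sum the partial results.
        CyclicBarrier barrier = new CyclicBarrier(MAX_THREADS, () -> {
            int sum = 0;
            for (int value : partial) {
                sum += value;
            }
            totals.add(sum);
        });

        Thread[] workers = new Thread[MAX_THREADS];
        for (int i = 0; i < MAX_THREADS; i++) {
            final int id = i;
            workers[i] = new Thread(() -> {
                try {
                    for (int step = 1; step <= STEPS; step++) {
                        partial[id] = (id + 1) * step; // made-up workload
                        barrier.await();               // the same barrier is reused for every step
                    }
                } catch (InterruptedException | BrokenBarrierException ex) {
                    ex.printStackTrace();
                }
            }, "worker-" + (i + 1));
            workers[i].start();
        }

        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [6, 12]
    }
}
```

Step 1 yields 1 + 2 + 3 = 6 and step 2 yields 2 + 4 + 6 = 12. No extra locking is used around the array because the barrier orders each worker's write before the consolidation of that step, and the consolidation before any write of the next step. Doing the same with CountDownLatch would require a new latch for every step.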

Stay tuned for Semaphores and Atomic classes in the next part.

3 Comments:

Blogger Anshul Pathak said...

Thread knowledge booster...Thanks Sir

2:56 PM, August 23, 2012  
Anonymous Anonymous said...

This blog is genuinely impressive in all aspects.

10:26 PM, February 19, 2013  
Blogger Unknown said...

Excellent blog, I have got a deep understanding of java concurrency concept after reading your through your diagrams and code. Thanks you very much sir. I appreciate it very much.

9:59 AM, February 25, 2013  
