Using CyclicBarrier in Java

By Arvind Rai, January 16, 2023
On this page, we will learn how to use CyclicBarrier in a Java application.
1. The CyclicBarrier class synchronizes threads so that all parties (threads) wait for each other to reach a common barrier point. The barrier is called cyclic because it can be reused after the waiting threads are released.
2. A CyclicBarrier is created with the number of parties and an optional Runnable. Each thread of the group waits at the barrier until all the others have arrived. Once the last thread arrives, the specified Runnable runs once, before any of the waiting threads are released.
3. CyclicBarrier is used in programs where a fixed-size party of threads must wait for each other.

Understanding the Working

To understand how CyclicBarrier works, refer to the following diagram.
[Diagram: Using CyclicBarrier in Java]
In the above diagram,
1. A group of three threads is created and started.
2. Each thread reaches the common barrier point after completing its task. The order in which the threads reach the barrier does not matter.
3. Once every thread of the group has reached the common barrier point, the CyclicBarrier trips and the Runnable passed to the CyclicBarrier constructor is executed.

Now let us understand it with code.
CyclicBarrierDemo.java
package com.concretepage;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
class CyclicBarrierDemo {
  class Worker implements Runnable {
	String workName;
	CyclicBarrier barrier;
	Worker(String workName, CyclicBarrier barrier) {
	  this.workName = workName;
	  this.barrier = barrier;
	}
	public void run() {
	  doWork(workName);
	  try {
		barrier.await(); // Each thread waits here after completing its task
	  } catch (InterruptedException e) {
		e.printStackTrace();
	  } catch (BrokenBarrierException e) {
		e.printStackTrace();
	  }
	}
  }
  void doWork(String workName) {
	System.out.println(workName + " processed.");
  }
  public static void main(String[] args) {
	Runnable barrierAction = () -> System.out.println("Executing Runnable after completing all threads of the party.");
	int partySize = 3;
	CyclicBarrier barrier = new CyclicBarrier(partySize, barrierAction);

	CyclicBarrierDemo ob = new CyclicBarrierDemo();
	// Thread-1
	Thread thread1 = new Thread(ob.new Worker("work-1", barrier));
	// Thread-2
	Thread thread2 = new Thread(ob.new Worker("work-2", barrier));
	// Thread-3
	Thread thread3 = new Thread(ob.new Worker("work-3", barrier));

	thread1.start();
	thread2.start();
	thread3.start();
  }
} 
Output (the order of the first three lines may vary between runs)
work-2 processed.
work-3 processed.
work-1 processed.
Executing Runnable after completing all threads of the party. 
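
The ordering described above can be checked with a small sketch. It is not part of the original demo: the class name BarrierOrderSketch and the event labels are hypothetical, chosen only for illustration. Each worker logs "before", waits at the barrier, and then logs "after", while the barrier action logs "action". Because the action runs after the last party arrives and before any waiting thread is released, the log should always have the same shape.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

class BarrierOrderSketch {
  // Runs three workers once and returns the events in the order they were logged.
  static List<String> runOnce() {
    List<String> events = Collections.synchronizedList(new ArrayList<>());
    int partySize = 3;
    CyclicBarrier barrier = new CyclicBarrier(partySize, () -> events.add("action"));
    Thread[] threads = new Thread[partySize];
    for (int i = 0; i < partySize; i++) {
      threads[i] = new Thread(() -> {
        events.add("before"); // work done before reaching the barrier
        try {
          barrier.await();
          events.add("after"); // runs only after the barrier action has finished
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } catch (BrokenBarrierException e) {
          events.add("broken");
        }
      });
      threads[i].start();
    }
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return new ArrayList<>(events);
  }

  public static void main(String[] args) {
    // prints [before, before, before, action, after, after, after]
    System.out.println(runOnce());
  }
}
```

The list is wrapped with Collections.synchronizedList because several threads append to it at the same time.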

Constructors

The CyclicBarrier class has the following constructors.
1.
public CyclicBarrier(int parties) 
Creates a new CyclicBarrier that will trip when the given number of parties (threads) are waiting upon it. No action is performed when the barrier is tripped.
Parameters:
parties is the number of threads that must invoke await() before the barrier is tripped.
Throws:
IllegalArgumentException, if parties is less than 1.

2.
public CyclicBarrier(int parties, Runnable barrierAction) 
Creates a new CyclicBarrier that will trip when the given number of parties (threads) are waiting upon it. When the barrier is tripped, the given barrierAction is executed by the last thread entering the barrier.
Parameters:
parties is the number of threads that must invoke await() before the barrier is tripped.
barrierAction is the command to execute when the barrier is tripped, or null if there is no action.
Throws:
IllegalArgumentException, if parties is less than 1.
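
The following is a minimal sketch of both constructors. It is only an illustration: the class ConstructorSketch and its helper methods rejects, actionRuns and awaitQuietly are hypothetical names, not part of the CyclicBarrier API. It checks that a party count below 1 is rejected and that the barrier action runs once when a two-party barrier trips.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class ConstructorSketch {
  // Returns true if new CyclicBarrier(parties) throws IllegalArgumentException.
  static boolean rejects(int parties) {
    try {
      new CyclicBarrier(parties);
      return false;
    } catch (IllegalArgumentException e) {
      return true;
    }
  }

  // Trips a two-party barrier once and returns how many times the action ran.
  static int actionRuns() {
    AtomicInteger runs = new AtomicInteger();
    CyclicBarrier barrier = new CyclicBarrier(2, () -> runs.incrementAndGet());
    Thread helper = new Thread(() -> awaitQuietly(barrier));
    helper.start();
    awaitQuietly(barrier); // the calling thread is the second party
    try {
      helper.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return runs.get();
  }

  // Waits at the barrier and converts checked exceptions for brevity.
  static void awaitQuietly(CyclicBarrier barrier) {
    try {
      barrier.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } catch (BrokenBarrierException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(rejects(0));   // true: fewer than 1 party is not allowed
    System.out.println(rejects(1));   // false: a single party is valid
    System.out.println(actionRuns()); // 1: the action runs once per trip
  }
}
```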

Methods

1.
public int await() throws InterruptedException, BrokenBarrierException 
Waits until all parties have invoked await on this barrier.
Returns the arrival index of the current thread: getParties() - 1 for the first thread to arrive and 0 for the last.
2.
public int await(long timeout, TimeUnit unit) 
            throws InterruptedException, BrokenBarrierException, TimeoutException 
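Waits until all parties have invoked await on this barrier, or until the specified waiting time elapses. If the time elapses first, TimeoutException is thrown and the barrier is left in a broken state.
Returns the arrival index of the current thread: getParties() - 1 for the first thread to arrive and 0 for the last.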
3.
public int getNumberWaiting() 
Returns the number of parties currently waiting at the barrier.
4.
public int getParties() 
Returns the number of parties required to trip this barrier.
5.
public boolean isBroken() 
Queries if this barrier is in a broken state.
6.
public void reset() 
Resets the barrier to its initial state. Any parties currently waiting at the barrier return with a BrokenBarrierException.
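
The methods above can be tried out with a short sketch. The class BarrierMethodsSketch and its two helper methods are hypothetical and exist only for illustration. The first helper collects the arrival indexes returned by await() in three threads. The second lets a single thread time out on a two-party barrier and then inspects the barrier with isBroken() and reset().

```java
import java.util.Collections;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BarrierMethodsSketch {
  // Collects the value returned by await() in each of three threads.
  static Set<Integer> arrivalIndexes() {
    int parties = 3;
    CyclicBarrier barrier = new CyclicBarrier(parties);
    Set<Integer> indexes = Collections.synchronizedSet(new TreeSet<>());
    Thread[] threads = new Thread[parties];
    for (int i = 0; i < parties; i++) {
      threads[i] = new Thread(() -> {
        try {
          indexes.add(barrier.await());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } catch (BrokenBarrierException e) {
          // not expected here; the index is simply not recorded
        }
      });
      threads[i].start();
    }
    for (Thread t : threads) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return new TreeSet<>(indexes);
  }

  // A lone thread waits on a two-party barrier, so the timed wait must time out.
  static String timeoutThenReset() {
    CyclicBarrier barrier = new CyclicBarrier(2);
    StringBuilder log = new StringBuilder();
    log.append("parties=").append(barrier.getParties());
    log.append(" waiting=").append(barrier.getNumberWaiting());
    try {
      barrier.await(100, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      log.append(" timedOut");
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } catch (BrokenBarrierException e) {
      log.append(" brokenOnEntry");
    }
    log.append(" broken=").append(barrier.isBroken());
    barrier.reset(); // makes the barrier usable again
    log.append(" afterReset=").append(barrier.isBroken());
    return log.toString();
  }

  public static void main(String[] args) {
    System.out.println(arrivalIndexes());   // [0, 1, 2]
    System.out.println(timeoutThenReset()); // parties=2 waiting=0 timedOut broken=true afterReset=false
  }
}
```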

Examples

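The following example multiplies pairs of numbers in worker threads and then sums the products in the barrier action.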
AddMultiply.java
package com.concretepage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class AddMultiply {
  // Workers add to this list concurrently, so it must be thread-safe.
  static List<Integer> dataToAddList = Collections.synchronizedList(new ArrayList<>());
  class Worker implements Runnable {
	int num1;
	int num2;
	CyclicBarrier barrier;
	Worker(int num1, int num2, CyclicBarrier barrier) {
	  this.num1 = num1;
	  this.num2 = num2;
	  this.barrier = barrier;
	}
	public void run() {
	  int multiplyRes = multiply(num1, num2);
	  System.out.println(String.format("%d * %d = %d", num1, num2, multiplyRes));
	  dataToAddList.add(multiplyRes);
	  try {
		barrier.await();
	  } catch (InterruptedException e) {
		e.printStackTrace();
	  } catch (BrokenBarrierException e) {
		e.printStackTrace();
	  }
	}
  }
  // Returns the product of the two numbers.
  int multiply(int num1, int num2) {
	return num1 * num2;
  }
  public static void main(String[] args) {
	Runnable barrierAction = () -> {
	  int sum = 0;
	  for (int i = 0; i < dataToAddList.size(); i++) {
		sum += dataToAddList.get(i);
	  }
	  System.out.println("sum: " + sum);
	};
	int[][] arr = { { 2, 3 }, { 4, 5 }, { 6, 7 } };
	final int partySize = arr.length;
	CyclicBarrier barrier = new CyclicBarrier(partySize, barrierAction);
	AddMultiply ob = new AddMultiply();
	ExecutorService executorService = Executors.newFixedThreadPool(partySize);
	for (int i = 0; i < arr.length; i++) {
	  executorService.execute(ob.new Worker(arr[i][0], arr[i][1], barrier));
	}
	executorService.shutdown();
  }
} 
Output (the order of the first three lines may vary between runs)
4 * 5 = 20
2 * 3 = 6
6 * 7 = 42
sum: 68 

Reusing cyclic barrier
A CyclicBarrier is reusable: after it trips and releases the waiting threads, the same instance can be used again. In the next example, there are more tasks to process (10) than parties in the CyclicBarrier (2). Once 2 threads reach the common barrier point, the CyclicBarrier trips and the given Runnable executes. When two more threads reach the common barrier point, the CyclicBarrier trips again and the Runnable executes again, and so on. Note that the thread pool size must be at least the party size. With a smaller pool, the threads blocked in await() would wait forever, because the remaining tasks could never start.
Find the example.
ReusingCyclicBarrier.java
package com.concretepage;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
class ReusingCyclicBarrier {
  class Worker implements Runnable {
	int task;
	CyclicBarrier barrier;
	Worker(int task, CyclicBarrier barrier) {
	  this.task = task;
	  this.barrier = barrier;
	}
	public void run() {
	  System.out.println(String.format("Task %d is processed and reached to common barrier point.", task));
	  try {
		barrier.await();
	  } catch (InterruptedException e) {
		e.printStackTrace();
	  } catch (BrokenBarrierException e) {
		e.printStackTrace();
	  }
	}
  }
  public static void main(String[] args) {
	Runnable barrierAction = () -> {
	  System.out.println("Barrier is tripped");
	};
	final int partySize = 2;
	final int poolSize = 2;
	CyclicBarrier barrier = new CyclicBarrier(partySize, barrierAction);
	ReusingCyclicBarrier ob = new ReusingCyclicBarrier();
	ExecutorService executorService = Executors.newFixedThreadPool(poolSize);
	for (int i = 1; i <= 10; i++) {
	  executorService.execute(ob.new Worker(i, barrier));
	}
	executorService.shutdown();
  }
} 
Output
Task 2 is processed and reached to common barrier point.
Task 1 is processed and reached to common barrier point.
Barrier is tripped
Task 3 is processed and reached to common barrier point.
Task 4 is processed and reached to common barrier point.
Barrier is tripped
Task 5 is processed and reached to common barrier point.
Task 6 is processed and reached to common barrier point.
Barrier is tripped
Task 7 is processed and reached to common barrier point.
Task 8 is processed and reached to common barrier point.
Barrier is tripped
Task 9 is processed and reached to common barrier point.
Task 10 is processed and reached to common barrier point.
Barrier is tripped 
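
Another common way to reuse a barrier is to keep the same threads alive and let them call await() once per phase. The sketch below illustrates that pattern under stated assumptions: the class PhasedReuseSketch, the method runPhases and the event labels are hypothetical names, not taken from the example above. Two workers pass through the same barrier three times, and the barrier action logs the end of each phase.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class PhasedReuseSketch {
  // Two long-lived workers pass through the same barrier once per phase.
  static List<String> runPhases(int phases) {
    List<String> events = Collections.synchronizedList(new ArrayList<>());
    AtomicInteger trips = new AtomicInteger();
    int parties = 2;
    CyclicBarrier barrier = new CyclicBarrier(parties,
        () -> events.add("phase " + trips.incrementAndGet() + " done"));
    Thread[] workers = new Thread[parties];
    for (int w = 0; w < parties; w++) {
      workers[w] = new Thread(() -> {
        try {
          for (int p = 1; p <= phases; p++) {
            events.add("work " + p);
            barrier.await(); // the same barrier instance is reused in every phase
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } catch (BrokenBarrierException e) {
          events.add("broken");
        }
      });
      workers[w].start();
    }
    for (Thread t : workers) {
      try {
        t.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return new ArrayList<>(events);
  }

  public static void main(String[] args) {
    // prints [work 1, work 1, phase 1 done, work 2, work 2, phase 2 done, work 3, work 3, phase 3 done]
    System.out.println(runPhases(3));
  }
}
```

No worker can start phase 2 before both have finished phase 1, which is why the log is grouped by phase.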

Reference

Class CyclicBarrier