Introduction

Concurrency occurs in a program when more than one task appears to be running at the same time. It lets a program achieve higher performance and throughput by making full use of the underlying operating system and hardware. Because it improves how efficiently systems execute, it is hard to avoid in modern development.

What you can avoid is implementing a lot of tedious or difficult functionality on your own, since the Java concurrent package (java.util.concurrent) contains many classes that provide the features needed to implement multi-threaded applications. Here, we will give an overview of some of the more interesting ones.

Concurrent collections

Even though the Collections class in the java.util package lets us create a synchronized wrapper around an otherwise unsynchronized collection, the concurrent package goes further and implements special algorithms designed for concurrent access. These concurrent collections are generally preferred over the synchronized wrappers because they perform and scale better.
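
To make the contrast concrete, here is a minimal sketch of a concurrent collection in use. The class and method names (ConcurrentMapSketch, countWords) are made up for illustration and do not come from the article. Several threads count words in a shared ConcurrentHashMap, relying on merge() to update each key atomically.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration class; not part of the original article.
class ConcurrentMapSketch {

    // Every thread walks the whole array once, so each word ends up counted
    // (occurrences in the array) x threadCount times in total.
    static Map<String, Integer> countWords(final String[] words, final int threadCount) {
        final Map<String, Integer> counts = new ConcurrentHashMap<>();
        final Thread[] threads = new Thread[threadCount];

        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                for (final String word : words) {
                    // merge() performs the read-modify-write atomically for the key.
                    counts.merge(word, 1, Integer::sum);
                }
            });
            threads[i].start();
        }

        for (final Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for workers", e);
            }
        }
        return counts;
    }

    public static void main(final String[] args) {
        final Map<String, Integer> counts = countWords(new String[] {"a", "b", "a"}, 4);
        System.out.println(counts.get("a") + " " + counts.get("b")); // prints "8 4"
    }
}
```

With a Collections.synchronizedMap wrapper, a hand-written get-then-put sequence would need an additional synchronized block around it to give the same result, because only the individual calls are synchronized.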

The BlockingQueue interface represents a queue that is thread-safe for both inserting and removing elements, i.e., multiple threads can insert and take elements concurrently without any concurrency issues arising. BlockingQueue is only an interface, and one of its more interesting implementations in this package is DelayQueue. It orders elements by their remaining delay, and an element can be taken from the queue only once its delay has expired.

The example below uses DelayQueue to assign different delays to different events and then process them one by one. Note that elements of a DelayQueue must implement the Delayed interface and provide its getDelay() and compareTo() methods.

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class Event implements Delayed {
    	private final long startTime;
    	private final String message;

    	public Event(final long startTime, final String message) {
        	this.startTime = startTime;
        	this.message = message;
    	}

    	public long getStartTime() {
        	return startTime;
    	}

    	public String getMessage() {
        	return message;
    	}

    	@Override
    	public long getDelay(final TimeUnit unit) {
        	final long difference = startTime - System.currentTimeMillis();
        	return unit.convert(difference, TimeUnit.MILLISECONDS);
    	}

    	@Override
    	public int compareTo(final Delayed o) {
        	// Long.compare avoids the overflow that casting a long difference to int can cause
        	return Long.compare(this.startTime, ((Event) o).startTime);
    	}
}

public void delayQueueExample() throws InterruptedException {
    	final DelayQueue<Event> queue = new DelayQueue<>();

    	queue.offer(new Event(System.currentTimeMillis() + 20000, "1"));
    	queue.offer(new Event(System.currentTimeMillis() + 10000, "2"));
    	LOGGER.info("Done");

    	// take() blocks until the delay of the head element has expired
    	final Event event1 = queue.take();
    	LOGGER.info(event1.getMessage());

    	final Event event2 = queue.take();
    	LOGGER.info(event2.getMessage());
}

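The output of the code above will look similar to the following. Event "2" is taken first because its delay expires first, even though it was added second: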

2022-04-04 15:07:37.433  INFO main com.abh.Example : Done
2022-04-04 15:07:47.428  INFO main com.abh.Example : 2
2022-04-04 15:07:57.427  INFO main com.abh.Example : 1
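
DelayQueue is a fairly specialized implementation. For the general BlockingQueue contract described earlier, the following sketch may help. It assumes a bounded ArrayBlockingQueue with one producer and one consumer, and the names BlockingQueueSketch and produceAndConsume are hypothetical. The producer blocks in put() when the queue is full, and the consumer blocks in take() when it is empty.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration class; not part of the original article.
class BlockingQueueSketch {

    // Produces the numbers 1..itemCount on one thread and sums them on the caller's thread.
    static int produceAndConsume(final int itemCount) {
        // Capacity of 2 is an arbitrary choice that forces the producer to wait at times.
        final BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2);

        final Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= itemCount; i++) {
                    queue.put(i); // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int sum = 0;
        try {
            for (int i = 0; i < itemCount; i++) {
                sum += queue.take(); // blocks while the queue is empty
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while consuming", e);
        }
        return sum;
    }

    public static void main(final String[] args) {
        System.out.println(produceAndConsume(5)); // prints 15
    }
}
```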

Atomic Variables

While designing concurrent algorithms, we also have to consider how the value of a shared variable is read and updated. One way to handle this is the compare-and-swap technique, which compares the value of a variable with an expected value and, only if the two are equal, swaps the value of the variable for a new one. The Java concurrent package offers a set of atomic classes that utilize the underlying compare-and-swap features of the CPU our application is running on. Access and updates to a single variable of the corresponding type can be handled with the AtomicBoolean, AtomicInteger, AtomicLong and AtomicReference classes.
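
The compare-and-swap idea can be sketched with AtomicInteger.compareAndSet(). The helper below is hypothetical (the names CompareAndSwapSketch and updateMax are not from the article); it raises a shared maximum and simply retries whenever another thread changed the value between the read and the swap.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not part of the original article.
class CompareAndSwapSketch {

    // Atomically raises `target` to `candidate` if the candidate is larger.
    // Returns the value held by `target` after the call decided what to do.
    static int updateMax(final AtomicInteger target, final int candidate) {
        while (true) {
            final int current = target.get();
            if (candidate <= current) {
                return current; // nothing to update
            }
            // Succeeds only if the value is still `current`; otherwise retry.
            if (target.compareAndSet(current, candidate)) {
                return candidate;
            }
        }
    }

    public static void main(final String[] args) {
        final AtomicInteger max = new AtomicInteger(5);
        updateMax(max, 3);
        updateMax(max, 9);
        System.out.println(max.get()); // prints 9
    }
}
```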

One of the most common scenarios in concurrent programming is updating numeric counters that are accessed by multiple threads, so the obvious example is a multi-threaded counter. We can use the AtomicLong class to implement the counter and make sure that no two threads ever get the same result.

private final AtomicLong counter = new AtomicLong();

// getAndIncrement() returns a long, so the method must return long as well
public long getNextUniqueIndex() {
    	return counter.getAndIncrement();
}

It is possible to achieve the same synchronization guarantees with appropriate synchronized declarations, but the beauty of AtomicLong and similar classes is that the thread-safety is built into the actual object itself. You don’t need to worry about the possible interleavings of every method that happens to access the value anymore.
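
As a rough illustration of that equivalence, the sketch below puts a synchronized counter next to an AtomicLong one and checks that neither ever hands out the same index twice. All names in it (CounterComparisonSketch, SynchronizedCounter, AtomicCounter, countDistinct) are hypothetical, and it is not a benchmark.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical illustration class; not part of the original article.
class CounterComparisonSketch {

    // Thread safety depends on every accessor being declared synchronized.
    static final class SynchronizedCounter {
        private long counter;

        synchronized long getNextUniqueIndex() {
            return counter++;
        }
    }

    // Thread safety is built into the AtomicLong itself.
    static final class AtomicCounter {
        private final AtomicLong counter = new AtomicLong();

        long getNextUniqueIndex() {
            return counter.getAndIncrement();
        }
    }

    // Calls `source` from several threads and returns how many distinct values were seen.
    static int countDistinct(final LongSupplier source, final int threadCount, final int callsPerThread) {
        final Set<Long> seen = ConcurrentHashMap.newKeySet();
        final Thread[] threads = new Thread[threadCount];

        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < callsPerThread; j++) {
                    seen.add(source.getAsLong());
                }
            });
            threads[i].start();
        }
        for (final Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for workers", e);
            }
        }
        return seen.size();
    }

    public static void main(final String[] args) {
        final SynchronizedCounter synchronizedCounter = new SynchronizedCounter();
        final AtomicCounter atomicCounter = new AtomicCounter();
        System.out.println(countDistinct(synchronizedCounter::getNextUniqueIndex, 4, 1000)); // prints 4000
        System.out.println(countDistinct(atomicCounter::getNextUniqueIndex, 4, 1000));       // prints 4000
    }
}
```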

Concurrent Accumulators

As previously mentioned, atomic variables can be a great solution for updating numeric counters accessed by multiple threads. However, they require a higher degree of familiarity to implement the required semantics correctly. The concurrent package offers another solution at the framework level: the concurrent accumulator classes. They provide a very efficient, lock-free, thread-safe algorithm that is usually preferable to atomic variables when many threads update a common value frequently.

LongAccumulator updates a value using a supplied function. The function is passed as the first argument of the constructor, and the second argument is the initial value of the accumulator. An important thing to note is that the supplied function must be free of side effects and must not depend on the order of accumulation for the accumulator to work correctly.

In the following example, we create a LongAccumulator that adds each new value to the value already held by the accumulator. The initial value is set to zero, and for every number passed to the accumulate() method, Long.sum() is invoked with the current value and that number.

final LongAccumulator accumulator = new LongAccumulator(Long::sum, 0);
Thread[] threads = new Thread[100];

for(int i = 0; i < 100; i++) {
    	final int value = i;
    	threads[i] = new Thread(() -> accumulator.accumulate(value));
}

for(int i = 0; i < 100; i++) {
    	threads[i].start();
}

for(int i = 0; i < 100; i++) {
    	threads[i].join();
}

LOGGER.info(String.valueOf(accumulator.get()));

The result of the code above will be:

2022-04-04 15:24:04.821  INFO main com.abh.Example : 4950
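
The supplied function does not have to be addition. As a further sketch, the hypothetical helper below (MaxAccumulatorSketch and maxOf are illustrative names) uses Long::max with Long.MIN_VALUE as the initial value. Like Long::sum, it has no side effects and does not depend on the order in which values arrive.

```java
import java.util.concurrent.atomic.LongAccumulator;

// Hypothetical illustration class; not part of the original article.
class MaxAccumulatorSketch {

    // Accumulates each value on its own thread and returns the largest one.
    static long maxOf(final long[] values) {
        final LongAccumulator max = new LongAccumulator(Long::max, Long.MIN_VALUE);
        final Thread[] threads = new Thread[values.length];

        for (int i = 0; i < values.length; i++) {
            final long value = values[i];
            threads[i] = new Thread(() -> max.accumulate(value));
            threads[i].start();
        }
        for (final Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for workers", e);
            }
        }
        return max.get();
    }

    public static void main(final String[] args) {
        System.out.println(maxOf(new long[] {3, 42, 7, -5})); // prints 42
    }
}
```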

Synchronization

The concurrent package provides several synchronization aids, such as CountDownLatch and CyclicBarrier, but we are going to focus on Phaser, a reusable and more flexible alternative. One prominent advantage is that the number of threads registered to synchronize on a Phaser can change over time. It can be used to synchronize a single phase, but, as the name suggests, it is better suited to synchronizing threads over multiple phases of activity.
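
For contrast, here is a minimal sketch of the simpler CountDownLatch. Its count is fixed at construction and cannot be reset, so it covers only a single "wait until everyone is done" step. The names LatchSketch and runWorkers are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not part of the original article.
class LatchSketch {

    // Starts `workerCount` threads and waits until all of them have counted down.
    static int runWorkers(final int workerCount) {
        final CountDownLatch done = new CountDownLatch(workerCount);
        final AtomicInteger finished = new AtomicInteger();

        for (int i = 0; i < workerCount; i++) {
            new Thread(() -> {
                finished.incrementAndGet(); // stands in for real work
                done.countDown();
            }).start();
        }

        try {
            done.await(); // blocks until the count reaches zero
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        }
        return finished.get();
    }

    public static void main(final String[] args) {
        System.out.println(runWorkers(3)); // prints 3
    }
}
```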

In the following example, four parties register with the Phaser: the main thread and three worker threads. Each party invokes the method arriveAndAwaitAdvance() on the Phaser instance, which blocks the calling thread until all registered parties have arrived, and only then does the next phase begin. This repeats over three phases. In the last one the worker threads call arriveAndDeregister() instead, and the Phaser terminates once the main thread deregisters as well. Note that the order in which the threads log within a phase may vary from execution to execution: sometimes Thread-0 logs first, sometimes Thread-1, and so on.

import java.util.concurrent.Phaser;

public class CustomThread implements Runnable {
    	final Phaser phaser;

    	public CustomThread(final Phaser phaser) {
        	this.phaser = phaser;
        	phaser.register();
        	new Thread(this).start();
    	}

    	@Override
    	public void run() {
        	LOGGER.info("Phase One");
        	phaser.arriveAndAwaitAdvance();

        	LOGGER.info("Phase Two");
        	phaser.arriveAndAwaitAdvance();

        	LOGGER.info("Phase Three");
        	phaser.arriveAndDeregister();
    	}
}

public void phaserExample() {
    	final Phaser phaser = new Phaser();
    	phaser.register();

    	new CustomThread(phaser);
    	new CustomThread(phaser);
    	new CustomThread(phaser);

    	// Wait for all threads to complete phase One
    	phaser.arriveAndAwaitAdvance();

    	// Wait for all threads to complete phase Two
    	phaser.arriveAndAwaitAdvance();

    	// Wait for all threads to complete phase Three
    	phaser.arriveAndAwaitAdvance();

    	// Deregister the main thread
    	phaser.arriveAndDeregister();
    	LOGGER.info("Phaser terminated");
}

The result of the code above will be:

2022-04-04 15:36:06.779  INFO Thread-2 com.abh.Example : Phase One
2022-04-04 15:36:06.779  INFO Thread-1 com.abh.Example : Phase One
2022-04-04 15:36:06.779  INFO Thread-0 com.abh.Example : Phase One
2022-04-04 15:36:06.812  INFO Thread-0 com.abh.Example : Phase Two
2022-04-04 15:36:06.812  INFO Thread-1 com.abh.Example : Phase Two
2022-04-04 15:36:06.812  INFO Thread-2 com.abh.Example : Phase Two
2022-04-04 15:36:06.813  INFO Thread-1 com.abh.Example : Phase Three
2022-04-04 15:36:06.813  INFO Thread-0 com.abh.Example : Phase Three
2022-04-04 15:36:06.813  INFO Thread-2 com.abh.Example : Phase Three
2022-04-04 15:36:06.814  INFO main com.abh.Example : Phaser terminated
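
The point that the number of registered parties can change over time can be sketched separately. In the hypothetical example below (DynamicPhaserSketch and run are illustrative names), a worker is registered after the Phaser is created, takes part in one phase, and then deregisters while the calling thread carries on.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

// Hypothetical illustration class; not part of the original article.
class DynamicPhaserSketch {

    // Returns {parties at start, parties with worker, parties after worker left, final phase}.
    static int[] run() {
        final Phaser phaser = new Phaser(1); // the calling thread is the only party
        final int partiesAtStart = phaser.getRegisteredParties();

        phaser.register(); // add a party for the worker
        final int partiesWithWorker = phaser.getRegisteredParties();

        final Thread worker = new Thread(() -> {
            phaser.arriveAndAwaitAdvance(); // take part in phase 0
            phaser.arriveAndDeregister();   // leave during phase 1
        });
        worker.start();

        phaser.arriveAndAwaitAdvance(); // phase 0 -> 1, both parties arrive
        phaser.arriveAndAwaitAdvance(); // phase 1 -> 2, the worker arrives by deregistering
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the worker", e);
        }

        final int partiesAfterWorkerLeft = phaser.getRegisteredParties();
        final int phase = phaser.getPhase();
        phaser.arriveAndDeregister(); // the last party leaves, so the Phaser terminates
        return new int[] {partiesAtStart, partiesWithWorker, partiesAfterWorkerLeft, phase};
    }

    public static void main(final String[] args) {
        System.out.println(Arrays.toString(run())); // prints "[1, 2, 1, 2]"
    }
}
```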

Conclusion

This was a brief introduction to the Java concurrent package, with short guidelines and descriptions of some of its interesting features. Each of them is worth delving into and learning more about, and the package also offers other lightweight and flexible tools that should help you avoid reinventing the wheel.

If you are interested in looking into it further, you can check out the official documentation, which lists all classes in this package.


“Java Concurrent package overview” Tech Bite was brought to you by Lejla Mehmedagić, Junior Software Engineer at Atlantbh.

Tech Bites are tips, tricks, snippets or explanations about various programming technologies and paradigms, which can help engineers with their everyday job.
