Welcome back! In previous lessons, we covered Java concurrency utilities such as `CompletableFuture`, `Phaser`, and `Semaphore`. Today, we’ll revisit the `Exchanger` class and explore how to use it in an image processing pipeline. This lesson will help you better understand how to synchronize threads and how to exchange data effectively in a producer-consumer scenario.
By the end of this lesson, you’ll learn how to:
- Use `Exchanger` to synchronize data between threads.
- Implement a two-stage producer-consumer pattern in a multi-threaded environment.
- Handle synchronization challenges in concurrent applications.
In this lesson, we’ll simulate an image processing pipeline where one thread loads raw image data and another applies a filter. The `Exchanger` will ensure smooth data exchange between these two stages, allowing us to process images step by step in a synchronized way.
The Java `Exchanger` class provides a synchronization point where two threads can meet and swap data. Each thread calls `exchange()` with the object it offers and receives the other thread’s object in return. This class is particularly useful when two tasks need to exchange information before continuing their execution. In many real-world applications, such as image processing pipelines, manufacturing assembly lines, or communication systems, there are multiple stages where different workers (or threads) need to hand off data or results to the next stage.
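To make the swap concrete before we build the pipeline, here is a minimal sketch of two threads meeting at one `Exchanger`. The class name `ExchangerSwapDemo` and the helper method `swap` are hypothetical names chosen for illustration; only `Exchanger.exchange()` comes from the Java standard library.

```java
import java.util.concurrent.Exchanger;

class ExchangerSwapDemo {

    // Hypothetical helper: runs two threads that meet at one Exchanger.
    // Index 0 holds what the first thread received, index 1 what the second received.
    static String[] swap(String fromFirst, String fromSecond) throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();
        String[] received = new String[2];

        Thread first = new Thread(() -> {
            try {
                // Blocks until the second thread also calls exchange()
                received[0] = exchanger.exchange(fromFirst);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread second = new Thread(() -> {
            try {
                received[1] = exchanger.exchange(fromSecond);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        first.start();
        second.start();
        // join() guarantees the writes to the array are visible to the caller
        first.join();
        second.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        String[] received = swap("from-first", "from-second");
        System.out.println("First thread received: " + received[0]);
        System.out.println("Second thread received: " + received[1]);
    }
}
```

Each thread ends up holding the value the other one offered, which is the behavior the pipeline in this lesson relies on.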
In this lesson, we’ll simulate an image processing pipeline where one thread (`ImageLoader`) loads raw image data, and another thread (`ImageFilter`) applies a filter to that data. These two threads will synchronize using the `Exchanger`, ensuring they don’t move forward until both are ready to exchange the data.
The `ImageLoader` is responsible for simulating the loading of raw image data. After it loads the image, it passes the data to the `ImageFilter` for further processing. Let’s take a look at the `ImageLoader` implementation:
```java
import java.util.concurrent.Exchanger;

public class ImageLoader implements Runnable {
    private final Exchanger<String> exchanger;

    public ImageLoader(Exchanger<String> exchanger) {
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        try {
            // Simulate loading raw image data
            String imageData = "RawImageData";
            System.out.println(Thread.currentThread().getName() + " loaded image.");

            // Pass raw data to the ImageFilter via Exchanger
            // (the value received in return is a null placeholder and is ignored)
            exchanger.exchange(imageData);

            // Receive the processed image data from the ImageFilter
            imageData = exchanger.exchange(null);
            System.out.println(Thread.currentThread().getName() + " received processed image: " + imageData);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println(Thread.currentThread().getName() + " was interrupted.");
        }
    }
}
```
In this class, the `ImageLoader` does the following:
- Loads raw image data: We simulate this by initializing the `imageData` variable with a string representing raw image data.
- Uses `exchanger.exchange()`: The `exchange()` method is where the `ImageLoader` passes the raw image data to the `ImageFilter` thread. This method blocks until the other thread (in this case, `ImageFilter`) is ready to receive and exchange the data.
- Receives processed image data: After `ImageFilter` processes the image, the `ImageLoader` retrieves the processed data using another `exchange()` call. It passes `null` here because it has nothing to send at this point.
This setup ensures that the `ImageLoader` waits until `ImageFilter` is ready to process the data before proceeding. If either thread is not ready, the other will wait at the exchange point, ensuring synchronization.
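Because `exchange()` waits for a partner, a thread whose partner never arrives would wait indefinitely. The standard library also offers a timed overload, `exchange(V, long, TimeUnit)`, which throws `TimeoutException` when the wait expires. The following is a minimal sketch of that overload; the class name `TimedExchangeDemo`, the helper `exchangeOrGiveUp`, and the `"NO_PARTNER"` marker are hypothetical and not part of this lesson’s pipeline.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedExchangeDemo {

    // Hypothetical helper: tries to hand off data, giving up if no partner arrives in time.
    // Returns the partner's value, or the illustrative marker "NO_PARTNER" on timeout.
    static String exchangeOrGiveUp(Exchanger<String> exchanger, String data, long timeoutMillis)
            throws InterruptedException {
        try {
            return exchanger.exchange(data, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "NO_PARTNER";
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();
        // No second thread ever calls exchange(), so this call times out
        String result = exchangeOrGiveUp(exchanger, "RawImageData", 200);
        System.out.println("Result: " + result);
    }
}
```

Whether a timeout is appropriate depends on the application; in the two-thread pipeline of this lesson, both threads always reach the exchange point, so the untimed call is sufficient.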
Now, let’s move on to the `ImageFilter`, which simulates the processing of raw image data. After processing, it sends the processed data back to the `ImageLoader`:
```java
import java.util.concurrent.Exchanger;

public class ImageFilter implements Runnable {
    private final Exchanger<String> exchanger;

    public ImageFilter(Exchanger<String> exchanger) {
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        try {
            // Receive the raw image data from the ImageLoader
            String imageData = exchanger.exchange(null);
            System.out.println(Thread.currentThread().getName() + " received image for processing.");

            // Process the image (simulated by prefixing the data with "Processed")
            imageData = processImage(imageData);
            System.out.println(Thread.currentThread().getName() + " processed image.");

            // Pass the processed image back to the ImageLoader
            exchanger.exchange(imageData);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println(Thread.currentThread().getName() + " was interrupted.");
        }
    }

    private String processImage(String data) {
        // Simulate image processing by modifying the data
        return "Processed" + data;
    }
}
```
In the `ImageFilter` class:
- Receives raw image data: The `ImageFilter` uses `exchanger.exchange()` to receive the raw image data from `ImageLoader`. This method blocks until the `ImageLoader` thread is ready to hand off the data.
- Processes the image: The `processImage()` method simulates image processing by simply prepending the word "Processed" to the image data string. In a real application, this could represent complex image transformations.
- Sends processed data back: Once the processing is complete, the `ImageFilter` uses `exchanger.exchange()` again to send the processed data back to the `ImageLoader`.
The `ImageFilter` also synchronizes with the `ImageLoader` at both points of data exchange, ensuring that the raw image is fully processed before the `ImageLoader` receives the result and continues.
Now that we have both `ImageLoader` and `ImageFilter` set up, let’s bring them together in the main class:
```java
import java.util.concurrent.Exchanger;

public class Main {
    public static void main(String[] args) {
        // Create an Exchanger instance for exchanging image data between threads
        Exchanger<String> exchanger = new Exchanger<>();

        // Create and start the ImageLoader and ImageFilter threads
        Thread loader = new Thread(new ImageLoader(exchanger), "Loader");
        Thread filter = new Thread(new ImageFilter(exchanger), "Filter");

        loader.start();
        filter.start();

        // Ensure the main thread waits for both threads to complete
        try {
            loader.join();
            filter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("Main thread was interrupted.");
        }

        System.out.println("Image processing pipeline completed.");
    }
}
```
Here’s what happens in the `Main` class:
- Creating the Exchanger: We create an instance of `Exchanger<String>` that will be used by both the `ImageLoader` and `ImageFilter` threads to exchange data.
- Starting the threads: The `ImageLoader` and `ImageFilter` threads are started using `loader.start()` and `filter.start()`. Each thread runs concurrently, performing its tasks and synchronizing through the `Exchanger`.
- Joining the threads: The `loader.join()` and `filter.join()` calls ensure that the main thread waits for both worker threads to finish before printing the final completion message.
The `Exchanger` class is a powerful tool for handling synchronization between two threads in a multi-stage process. This image processing pipeline demonstrates how `Exchanger` can be used to create seamless transitions between threads performing different tasks. This approach is useful not only for image processing but also for many real-world applications where data needs to be handed off between stages, such as assembly lines, data pipelines, or streaming services.
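As one possible extension of the same hand-off pattern, the sketch below runs several images through the two stages in a loop, reusing a single `Exchanger`. The class name `BatchPipelineDemo`, the method `runPipeline`, and the sample image names are hypothetical; the sketch also assumes both threads agree on the number of images in advance, which keeps their `exchange()` calls paired one to one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Exchanger;

class BatchPipelineDemo {

    // Hypothetical helper: sends each raw image through the loader/filter hand-off
    // and returns the processed results in order.
    static List<String> runPipeline(List<String> rawImages) throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();
        List<String> processed = new ArrayList<>();
        int count = rawImages.size();

        Thread loader = new Thread(() -> {
            try {
                for (String raw : rawImages) {
                    exchanger.exchange(raw);                  // hand off the raw data
                    processed.add(exchanger.exchange(null));  // collect the processed result
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Loader");

        Thread filter = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    String raw = exchanger.exchange(null);    // receive the raw data
                    exchanger.exchange("Processed" + raw);    // send the result back
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Filter");

        loader.start();
        filter.start();
        // Only the loader thread writes to the list; join() makes it safe to read here
        loader.join();
        filter.join();
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runPipeline(Arrays.asList("ImageA", "ImageB", "ImageC")));
    }
}
```

With exactly two threads, the k-th `exchange()` call of one thread always pairs with the k-th call of the other, so the results come back in the same order the images were loaded.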
Now that you’ve seen how `Exchanger` works in this context, it’s time to reinforce your understanding with some hands-on practice. Let’s move on to the practice section, where you’ll get to apply what you’ve learned to similar challenges!