Lesson 5
Image Processing Pipeline with Exchanger

Welcome back! In previous lessons, we covered Java concurrency utilities such as CompletableFuture, Phaser, and Semaphore. Today, we’ll revisit the Exchanger class and explore how to use it in an image processing pipeline. This lesson will help you better understand how to handle synchronization between threads and how to exchange data in a producer-consumer scenario effectively.

What You'll Learn

By the end of this lesson, you’ll learn how to:

  • Use Exchanger to synchronize data between threads.
  • Implement a two-stage producer-consumer pattern in a multi-threaded environment.
  • Handle synchronization challenges in concurrent applications.

In this lesson, we’ll simulate an image processing pipeline where one thread loads raw image data and another applies a filter. The Exchanger will ensure smooth data exchange between these two stages, allowing us to process images step by step in a synchronized way.

Revisiting Exchanger in Thread Synchronization

The Java Exchanger class provides a synchronization point where two threads can meet and swap data. This class is particularly useful when two tasks need to exchange information before continuing their execution. In many real-world applications, such as image processing pipelines, manufacturing assembly lines, or even communication systems, there are multiple stages where different workers (or threads) need to hand off data or results to the next stage.
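
Before building the pipeline, the swap itself can be sketched in isolation. The class name ExchangerSwapDemo and the helper swap() below are hypothetical names chosen for illustration; only Exchanger and its exchange() method come from the JDK. Each thread offers one string and gets the other thread's string back.

```java
import java.util.concurrent.Exchanger;

public class ExchangerSwapDemo {

    // Hypothetical helper: runs two threads that meet at one Exchanger.
    // Index 0 holds what thread A received, index 1 what thread B received.
    static String[] swap(String fromA, String fromB) throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();
        String[] received = new String[2];

        Thread a = new Thread(() -> {
            try {
                received[0] = exchanger.exchange(fromA); // blocks until B arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "A");

        Thread b = new Thread(() -> {
            try {
                received[1] = exchanger.exchange(fromB); // blocks until A arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "B");

        a.start();
        b.start();
        a.join(); // join() makes the workers' writes visible to this thread
        b.join();
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        String[] received = swap("from-A", "from-B");
        System.out.println("A received: " + received[0]); // A received: from-B
        System.out.println("B received: " + received[1]); // B received: from-A
    }
}
```

Whichever thread reaches exchange() first simply waits for the other, so the result does not depend on which thread starts first.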

In our pipeline, the loading stage runs in a thread called ImageLoader and the filtering stage runs in a thread called ImageFilter. The two threads share a single Exchanger, and neither moves past an exchange point until the other has arrived there.

Setting Up the ImageLoader

The ImageLoader is responsible for simulating the loading of raw image data. After it loads the image, it passes the data to the ImageFilter for further processing. Let’s take a look at the ImageLoader implementation:

Java
import java.util.concurrent.Exchanger;

public class ImageLoader implements Runnable {
    private final Exchanger<String> exchanger;

    public ImageLoader(Exchanger<String> exchanger) {
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        try {
            // Simulate loading raw image data
            String imageData = "RawImageData";
            System.out.println(Thread.currentThread().getName() + " loaded image.");

            // Pass raw data to the ImageFilter via Exchanger
            exchanger.exchange(imageData);

            // Receive the processed image data from the ImageFilter
            imageData = exchanger.exchange(null);
            System.out.println(Thread.currentThread().getName() + " received processed image: " + imageData);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println(Thread.currentThread().getName() + " was interrupted.");
        }
    }
}

In this class, the ImageLoader does the following:

  • Loads raw image data: We simulate this by initializing the imageData variable with a string representing raw image data.
  • Uses exchanger.exchange(): The first exchange() call hands the raw image data to the ImageFilter thread. The call blocks until the other thread (here, ImageFilter) reaches its own exchange() call, and then the two values are swapped. The value the ImageLoader gets back at this point is the ImageFilter's null placeholder, so it is ignored.
  • Receives processed image data: After the ImageFilter processes the image, the ImageLoader calls exchange(null) to collect the result. It passes null because it has nothing further to send.

This setup ensures that the ImageLoader waits until ImageFilter is ready to process the data before proceeding. If either thread is not ready, the other will wait at the exchange point, ensuring synchronization.
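
One consequence of this blocking behavior is that a thread waits indefinitely if its partner never arrives. Exchanger also offers a timed overload, exchange(value, timeout, unit), which throws TimeoutException when the wait runs out. The sketch below shows one way to use it; the class TimedExchangeDemo, the helper exchangeOrFallback(), and the "no-partner" fallback value are assumptions made for illustration, not part of the lesson's pipeline.

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedExchangeDemo {

    // Hypothetical helper: offers a value and waits at most timeoutMillis
    // for a partner thread. Returns the partner's value, or the fallback
    // if no partner shows up in time.
    static String exchangeOrFallback(Exchanger<String> exchanger, String offered,
                                     long timeoutMillis, String fallback)
            throws InterruptedException {
        try {
            return exchanger.exchange(offered, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return fallback;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();
        // No second thread ever calls exchange(), so this call times out.
        String result = exchangeOrFallback(exchanger, "RawImageData", 200, "no-partner");
        System.out.println(result); // no-partner
    }
}
```

Whether a timeout is appropriate depends on the application; in the two-thread pipeline of this lesson both sides always arrive, so the untimed exchange() is sufficient.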

Setting Up the ImageFilter

Now, let’s move on to the ImageFilter, which simulates the processing of raw image data. After processing, it sends the processed data back to the ImageLoader:

Java
import java.util.concurrent.Exchanger;

public class ImageFilter implements Runnable {
    private final Exchanger<String> exchanger;

    public ImageFilter(Exchanger<String> exchanger) {
        this.exchanger = exchanger;
    }

    @Override
    public void run() {
        try {
            // Receive the raw image data from the ImageLoader
            String imageData = exchanger.exchange(null);
            System.out.println(Thread.currentThread().getName() + " received image for processing.");

            // Process the image (simulated by adding "Processed" to the data)
            imageData = processImage(imageData);
            System.out.println(Thread.currentThread().getName() + " processed image.");

            // Pass the processed image back to the ImageLoader
            exchanger.exchange(imageData);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println(Thread.currentThread().getName() + " was interrupted.");
        }
    }

    private String processImage(String data) {
        // Simulate image processing by modifying the data
        return "Processed" + data;
    }
}

In the ImageFilter class:

  • Receives raw image data: The ImageFilter uses exchanger.exchange() to receive the raw image data from ImageLoader. This method blocks until the ImageLoader thread is ready to hand off the data.
  • Processes the image: The processImage() method simulates image processing by prepending the word "Processed" to the image data string, so "RawImageData" becomes "ProcessedRawImageData". In a real application, this could represent complex image transformations.
  • Sends processed data back: Once the processing is complete, the ImageFilter uses exchanger.exchange() again to send the processed data back to the ImageLoader.

The ImageFilter also synchronizes with the ImageLoader at both exchange points, so the ImageLoader cannot receive a result until the ImageFilter has finished processing the raw image.
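
Because every exchange() is a two-way swap, each side also receives something at the step where it is only "sending". The sketch below condenses the two stages into lambdas and records what each exchange() call returns. The class HandshakeTraceDemo and the helper trace() are hypothetical names for illustration; the exchange order mirrors the ImageLoader and ImageFilter described in this lesson.

```java
import java.util.Arrays;
import java.util.concurrent.Exchanger;

public class HandshakeTraceDemo {

    // Hypothetical helper. Returns {loaderFirst, loaderSecond, filterFirst,
    // filterSecond}: the value each side got back from its first and second
    // exchange() call.
    static String[] trace(String raw) throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();
        String[] got = new String[4];

        Thread loader = new Thread(() -> {
            try {
                got[0] = exchanger.exchange(raw);  // hand off raw data
                got[1] = exchanger.exchange(null); // collect processed data
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Loader");

        Thread filter = new Thread(() -> {
            try {
                got[2] = exchanger.exchange(null);                 // collect raw data
                got[3] = exchanger.exchange("Processed" + got[2]); // hand back result
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Filter");

        loader.start();
        filter.start();
        loader.join(); // join() makes the workers' writes visible here
        filter.join();
        return got;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(Arrays.toString(trace("RawImageData")));
        // [null, ProcessedRawImageData, RawImageData, null]
    }
}
```

The two null entries are the placeholders each side passes when it has nothing to send. With only two threads, the first calls always pair with each other and the second calls always pair with each other, so the result is deterministic.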

Bringing It Together

Now that we have both ImageLoader and ImageFilter set up, let’s bring them together in the main class:

Java
import java.util.concurrent.Exchanger;

public class Main {
    public static void main(String[] args) {
        // Create an Exchanger instance for exchanging image data between threads
        Exchanger<String> exchanger = new Exchanger<>();

        // Create and start the ImageLoader and ImageFilter threads
        Thread loader = new Thread(new ImageLoader(exchanger), "Loader");
        Thread filter = new Thread(new ImageFilter(exchanger), "Filter");

        loader.start();
        filter.start();

        // Ensure the main thread waits for both threads to complete
        try {
            loader.join();
            filter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.err.println("Main thread was interrupted.");
        }

        System.out.println("Image processing pipeline completed.");
    }
}

Here’s what happens in the Main class:

  • Creating the Exchanger: We create an instance of Exchanger<String> that will be used by both the ImageLoader and ImageFilter threads to exchange data.
  • Starting the threads: The ImageLoader and ImageFilter threads are started using loader.start() and filter.start(). The two threads run concurrently, each performing its own task and synchronizing with the other through the Exchanger.
  • Joining the threads: The loader.join() and filter.join() calls ensure that the main thread waits for both worker threads to finish before printing the final completion message.
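
The same wiring extends to more than one image if both stages repeat their pair of exchanges in a loop. The following is a minimal sketch under stated assumptions: the class MultiImagePipelineDemo, the helper process(), and the image names are hypothetical, and the filter stage is assumed to know in advance how many images to expect.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Exchanger;

public class MultiImagePipelineDemo {

    // Hypothetical helper: pushes each raw image through the two-stage
    // pipeline and returns the processed results in input order.
    static List<String> process(List<String> rawImages) throws InterruptedException {
        Exchanger<String> exchanger = new Exchanger<>();
        List<String> processed = new ArrayList<>(); // written only by the loader thread

        Thread loader = new Thread(() -> {
            try {
                for (String raw : rawImages) {
                    exchanger.exchange(raw);                 // hand off raw image
                    processed.add(exchanger.exchange(null)); // collect filtered image
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Loader");

        Thread filter = new Thread(() -> {
            try {
                // Assumption: the filter knows the number of images up front.
                for (int i = 0; i < rawImages.size(); i++) {
                    String raw = exchanger.exchange(null); // receive raw image
                    exchanger.exchange("Processed" + raw); // return filtered image
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Filter");

        loader.start();
        filter.start();
        loader.join(); // after join(), reading the list from this thread is safe
        filter.join();
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(process(Arrays.asList("Image1", "Image2", "Image3")));
        // [ProcessedImage1, ProcessedImage2, ProcessedImage3]
    }
}
```

The two threads proceed in lockstep: the loader cannot offer the next image until it has collected the previous result, so the images are processed one at a time and in order.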

Why It Matters

The Exchanger class is a compact way to synchronize exactly two threads that hand data back and forth in a multi-stage process. This image processing pipeline shows how an Exchanger lets one stage pass its output to the next and wait for the result without any explicit locks. The same hand-off pattern applies beyond image processing, wherever data moves between a pair of stages, such as assembly lines, data pipelines, or streaming services.

Now that you’ve seen how Exchanger works in this context, it’s time to reinforce your understanding with some hands-on practice. Let’s move on to the practice section, where you’ll get to apply what you’ve learned to similar challenges!
