Lesson 5
Implementing a Parallel Merge Sort with Phaser
Welcome to Parallel Merge Sort with Phaser

Building on your understanding of concurrency, today we’ll implement a parallel merge sort algorithm using Java's Phaser class. In previous lessons, you explored the nuances of thread-safe data structures like an LRU cache. Now, we’ll take that knowledge further to tackle sorting algorithms in a concurrent environment, optimizing performance on multi-core systems.

What You'll Learn

By the end of this lesson, you will understand:

  • The mechanics of parallel merge sort and how it divides and conquers large datasets.
  • How the Phaser class helps synchronize multiple threads.
  • How to implement an efficient parallel merge sort using thread pools and phased synchronization.

This lesson will equip you with the tools to handle large datasets efficiently using parallelism and phased synchronization in Java.

Understanding Parallel Merge Sort with Phaser

Merge sort is a divide-and-conquer algorithm. It breaks down a dataset into smaller subarrays, sorts them, and then merges them back together. The parallel merge sort enhances this process by assigning different subarrays to different threads, allowing multiple parts of the array to be sorted simultaneously.

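To coordinate these threads, we use Phaser, a reusable synchronization barrier. Parties register with the Phaser and wait for one another at a barrier point; each time every registered party has arrived, the Phaser advances to the next numbered phase. This is essential in the parallel merge sort, where merging must not start until all chunks are sorted, and each merge pass must finish before the next one begins.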
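The barrier behaviour described above can be sketched in isolation before it is applied to sorting. The class and method names below (PhaserBarrierDemo, runWorkers) are hypothetical and exist only for this illustration; the Phaser calls are the same ones the lesson uses later.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the lesson's code.
class PhaserBarrierDemo {

    // Starts `workers` threads, waits for all of them at a Phaser barrier,
    // and returns how many had finished their work when the barrier opened.
    static int runWorkers(int workers) {
        Phaser phaser = new Phaser(1);            // the calling thread is one party
        AtomicInteger finished = new AtomicInteger();

        phaser.bulkRegister(workers);             // one extra party per worker
        for (int i = 0; i < workers; i++) {
            new Thread(() -> {
                finished.incrementAndGet();       // the "work" for this phase
                phaser.arriveAndDeregister();     // arrive, then leave the Phaser
            }).start();
        }

        phaser.arriveAndAwaitAdvance();           // blocks until every worker has arrived
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("Workers finished before the barrier opened: " + runWorkers(3));
    }
}
```

Because each worker does its work before arriving, the count read after the barrier always equals the number of workers.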

ParallelMergeSort Class Initialization

Here, we define the ParallelMergeSort class. This class manages the overall sorting process, coordinating multiple threads using an ExecutorService for parallel execution and a Phaser for synchronizing the threads.

Java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;

public class ParallelMergeSort {

    private final ExecutorService executor; // runs the sort and merge tasks
    private final Phaser phaser;            // barrier between phases
    private final int numThreads;           // number of chunks to sort in parallel

    public ParallelMergeSort(ExecutorService executor, Phaser phaser, int numThreads) {
        this.executor = executor;
        this.phaser = phaser;
        this.numThreads = numThreads;
    }
}

In this code, ParallelMergeSort takes three parameters: an ExecutorService, which manages the thread pool, a Phaser for synchronization, and the number of threads. The constructor initializes these fields, preparing the class for sorting. The executor allows tasks to run concurrently across multiple threads, while the phaser ensures that the threads wait for each other at specific points, such as after sorting or merging phases.

Sorting Chunks in Parallel

The first step in parallel merge sort is dividing the array into smaller chunks and sorting each chunk in parallel. This is done using an executor service that distributes tasks among multiple threads. The Phaser class ensures that threads synchronize during different phases, such as sorting and merging.

Java
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;

public class ParallelMergeSort {

    // Fields and constructor as shown above

    public void sort(int[] array) throws InterruptedException {
        int chunkSize = (array.length + numThreads - 1) / numThreads; // ceiling division
        phaser.bulkRegister(numThreads); // Register one party per chunk

        for (int i = 0; i < numThreads; i++) {
            int left = i * chunkSize;
            int right = Math.min(left + chunkSize - 1, array.length - 1);

            if (left <= right) {
                executor.submit(new SortTask(array, left, right));
            } else {
                phaser.arriveAndDeregister(); // Empty chunk: no work to do
            }
        }

        phaser.arriveAndAwaitAdvance(); // Wait for sorting phase to complete
    }
}

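In this part of the code, the array is divided into numThreads chunks using ceiling division, so every element lands in exactly one chunk and the last chunk may be shorter than the others. The bulkRegister() method registers one party per chunk with the Phaser. If a chunk turns out to be empty (for example, 9 elements split across 4 threads yields chunks of 3, 3, 3, and 0), the main thread calls arriveAndDeregister() on its behalf so the barrier is not left waiting for it. Each non-empty chunk is sorted by submitting a SortTask to the ExecutorService, allowing the tasks to run in parallel. Finally, arriveAndAwaitAdvance() blocks the calling thread until every sort task has arrived, so merging cannot start before all chunks are sorted.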
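To make the chunk arithmetic concrete, here is a small sketch that computes the same left and right bounds without starting any threads. ChunkBoundsDemo and chunkBounds are hypothetical names introduced for this example only.

```java
import java.util.Arrays;

// Hypothetical helper class for illustration; not part of the lesson's code.
class ChunkBoundsDemo {

    // Returns {left, right} (both inclusive) for every non-empty chunk,
    // using the same arithmetic as the sort() method in the lesson.
    static int[][] chunkBounds(int length, int numThreads) {
        int chunkSize = (length + numThreads - 1) / numThreads; // ceiling division
        int[][] bounds = new int[numThreads][];
        int count = 0;
        for (int i = 0; i < numThreads; i++) {
            int left = i * chunkSize;
            int right = Math.min(left + chunkSize - 1, length - 1);
            if (left <= right) {                 // skip empty chunks
                bounds[count++] = new int[] {left, right};
            }
        }
        return Arrays.copyOf(bounds, count);
    }

    public static void main(String[] args) {
        // 10 elements, 4 threads: chunk size 3, last chunk holds one element
        System.out.println(Arrays.deepToString(chunkBounds(10, 4)));
        // prints [[0, 2], [3, 5], [6, 8], [9, 9]]
    }
}
```

With 9 elements and 4 threads the same method returns only three chunks, which is the case the arriveAndDeregister() branch in sort() handles.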

SortTask Class

Below is the SortTask, which handles the sorting of individual chunks.

Java
private class SortTask implements Runnable {
    private final int[] array;
    private final int left;
    private final int right;

    public SortTask(int[] array, int left, int right) {
        this.array = array;
        this.left = left;
        this.right = right;
    }

    @Override
    public void run() {
        Arrays.sort(array, left, right + 1); // Sort the assigned chunk
        phaser.arriveAndDeregister();        // Signal that this task is complete
    }
}

Each SortTask is responsible for sorting a specific chunk of the array, as determined by the inclusive left and right indices. The task uses Arrays.sort(array, left, right + 1) to sort that portion in place; the upper bound of Arrays.sort() is exclusive, which is why the call passes right + 1. Once the sorting is complete, the task calls arriveAndDeregister() to signal the Phaser that the sorting phase for this chunk is finished.
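The range form of Arrays.sort() is easy to get off by one, so a tiny sketch may help. RangeSortDemo and sortRange are hypothetical names used only here; the method works on a copy so the effect on the chosen range is easy to compare.

```java
import java.util.Arrays;

// Hypothetical helper class for illustration; not part of the lesson's code.
class RangeSortDemo {

    // Sorts the inclusive range [left, right] of a copy of the input.
    static int[] sortRange(int[] array, int left, int right) {
        int[] copy = array.clone();
        Arrays.sort(copy, left, right + 1); // toIndex is exclusive, hence right + 1
        return copy;
    }

    public static void main(String[] args) {
        int[] data = {5, 3, 9, 1, 7, 2};
        System.out.println(Arrays.toString(sortRange(data, 0, 2)));
        // prints [3, 5, 9, 1, 7, 2]: only indices 0..2 were sorted
    }
}
```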

Merging Sorted Chunks

Once all chunks are sorted, the next phase begins: merging the sorted chunks. This merging process occurs in several steps, starting with small chunks and progressively merging larger ones, until the entire array is sorted.

Java
public void sort(int[] array) throws InterruptedException {
    // Phase 1: Sort chunks in parallel as shown above...

    int currentChunkSize = chunkSize;
    while (currentChunkSize < array.length) {
        int merges = (array.length + 2 * currentChunkSize - 1) / (2 * currentChunkSize);
        phaser.bulkRegister(merges); // Register merge tasks

        for (int i = 0; i < merges; i++) {
            int left = i * 2 * currentChunkSize;
            int mid = left + currentChunkSize - 1;
            int right = Math.min(left + 2 * currentChunkSize - 1, array.length - 1);

            if (mid < right) {
                executor.submit(new MergeTask(array, left, mid, right));
            } else {
                phaser.arriveAndDeregister(); // No work to do
            }
        }

        phaser.arriveAndAwaitAdvance(); // Wait for merging phase to complete
        currentChunkSize *= 2;          // Double chunk size for next phase
    }

    executor.shutdown(); // Shut down the executor service
    System.out.println("Sorted array: " + Arrays.toString(array));
}

In this section, the sorted chunks are merged together in a phased manner. The process starts with the smallest chunks and proceeds to merge larger and larger ones. The Phaser ensures that no thread moves to the next phase until all merges in the current phase are completed. The executor.submit() method runs each merge in parallel, and the process continues until the entire array is sorted.
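The merge schedule can be inspected on its own. The sketch below lists which index ranges each pass would merge, using the same left, mid, and right arithmetic as the loop above but without threads. MergePlanDemo and plan are hypothetical names, and the guard for a non-positive chunk size is an addition for this standalone helper.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper class for illustration; not part of the lesson's code.
class MergePlanDemo {

    // Returns one {chunkSize, left, mid, right} entry per merge, in pass order.
    // All indices are inclusive, matching the MergeTask arguments.
    static List<int[]> plan(int length, int chunkSize) {
        List<int[]> merges = new ArrayList<>();
        if (chunkSize <= 0) {
            return merges; // nothing to merge, and avoids a loop that never ends
        }
        int current = chunkSize;
        while (current < length) {
            int count = (length + 2 * current - 1) / (2 * current);
            for (int i = 0; i < count; i++) {
                int left = i * 2 * current;
                int mid = left + current - 1;
                int right = Math.min(left + 2 * current - 1, length - 1);
                if (mid < right) { // a right-hand run exists, so there is work to do
                    merges.add(new int[] {current, left, mid, right});
                }
            }
            current *= 2; // each pass merges runs twice as long
        }
        return merges;
    }

    public static void main(String[] args) {
        for (int[] m : plan(16, 4)) {
            System.out.printf("chunk size %d: merge [%d..%d] with [%d..%d]%n",
                    m[0], m[1], m[2], m[2] + 1, m[3]);
        }
    }
}
```

For 16 elements and an initial chunk size of 4, the first pass merges [0..3] with [4..7] and [8..11] with [12..15], and the second pass merges [0..7] with [8..15]. The final pass is always a single merge.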

MergeTask Class

Here is the MergeTask that handles merging two sorted subarrays.

Java
private class MergeTask implements Runnable {
    private final int[] array;
    private final int left;
    private final int mid;
    private final int right;

    public MergeTask(int[] array, int left, int mid, int right) {
        this.array = array;
        this.left = left;
        this.mid = mid;
        this.right = right;
    }

    @Override
    public void run() {
        merge(array, left, mid, right);
        phaser.arriveAndDeregister(); // Signal merge completion
    }
}

Each MergeTask merges two adjacent sorted subarrays: the range from left to mid and the range from mid + 1 to right, with all indices inclusive. The result is one larger sorted section spanning left to right. Because the merges in a pass cover disjoint ranges, they can run in parallel without interfering with each other. After the merge is completed, the phaser.arriveAndDeregister() method signals the completion of this merging task.

Merge Method

The merge() method performs the merging operation.

Java
private void merge(int[] array, int left, int mid, int right) {
    int[] leftArr = Arrays.copyOfRange(array, left, mid + 1);
    int[] rightArr = Arrays.copyOfRange(array, mid + 1, right + 1);

    int i = 0, j = 0, k = left;

    while (i < leftArr.length && j < rightArr.length) {
        array[k++] = (leftArr[i] <= rightArr[j]) ? leftArr[i++] : rightArr[j++];
    }
    while (i < leftArr.length) {
        array[k++] = leftArr[i++];
    }
    while (j < rightArr.length) {
        array[k++] = rightArr[j++];
    }
}

The merge() method first copies the two sorted subarrays into temporary arrays, then writes the result back into the original array starting at index left. At each step it compares the next unused element of each copy and writes the smaller one; on a tie it takes the element from the left copy, which keeps equal elements in their original order. After one of the copies is exhausted, the remaining elements from the other copy are appended.
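A worked example on concrete data may make the index handling clearer. The sketch below wraps the same merge logic in a hypothetical MergeDemo class (made static so it can run on its own) and merges two sorted halves of a six-element array.

```java
import java.util.Arrays;

// Hypothetical demo class; the merge logic mirrors the lesson's merge() method.
class MergeDemo {

    // Merges the sorted ranges [left, mid] and [mid + 1, right] in place.
    static void merge(int[] array, int left, int mid, int right) {
        int[] leftArr = Arrays.copyOfRange(array, left, mid + 1);
        int[] rightArr = Arrays.copyOfRange(array, mid + 1, right + 1);

        int i = 0, j = 0, k = left;
        while (i < leftArr.length && j < rightArr.length) {
            // On ties, take from the left run first
            array[k++] = (leftArr[i] <= rightArr[j]) ? leftArr[i++] : rightArr[j++];
        }
        while (i < leftArr.length) {
            array[k++] = leftArr[i++];
        }
        while (j < rightArr.length) {
            array[k++] = rightArr[j++];
        }
    }

    public static void main(String[] args) {
        int[] data = {1, 4, 7, 2, 3, 9};   // [1, 4, 7] and [2, 3, 9] are each sorted
        merge(data, 0, 2, 5);
        System.out.println(Arrays.toString(data)); // prints [1, 2, 3, 4, 7, 9]
    }
}
```

Elements outside the range from left to right are never touched, which is what allows several merges to run on the same array at once.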

Running the Program

Here’s how the program is executed from the Main class:

Java
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;

public class Main {

    private static final int ARRAY_SIZE = 16;
    private static final int NUM_THREADS = 4;

    public static void main(String[] args) throws InterruptedException {
        new Main().startParallelMergeSort();
    }

    public void startParallelMergeSort() throws InterruptedException {
        int[] array = new int[ARRAY_SIZE];
        fillRandom(array);
        System.out.println("Unsorted array: " + Arrays.toString(array));

        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
        Phaser phaser = new Phaser(1); // Main thread is a registered party

        ParallelMergeSort sorter = new ParallelMergeSort(executor, phaser, NUM_THREADS);
        sorter.sort(array);
    }

    private void fillRandom(int[] array) {
        for (int i = 0; i < array.length; i++) {
            array[i] = (int) (Math.random() * 100);
        }
    }
}

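In this Main class, we fill an array with random integers and print it before sorting; the sort() method itself prints the sorted array and shuts down the executor when it finishes. The Phaser is created with one registered party, the main thread, which is why sort() can wait at the barrier with arriveAndAwaitAdvance(). The ParallelMergeSort class is instantiated with a fixed thread pool and this Phaser instance, and the startParallelMergeSort() method initiates the parallel merge sort process.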
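One way to gain confidence in the result is to compare it against Arrays.sort() on the same input. The sketch below is a condensed, single-class variant of the lesson's approach written for that purpose, not the lesson's ParallelMergeSort itself: the names ParallelMergeSortCheck, parallelSort, and matchesArraysSort are hypothetical, it registers each task with register() instead of bulkRegister(), and it wraps the work in try/finally so a failing task still arrives at the barrier.

```java
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;

// Hypothetical, condensed variant of the lesson's sorter, used only for checking.
class ParallelMergeSortCheck {

    static void parallelSort(int[] array, int numThreads) {
        ExecutorService executor = Executors.newFixedThreadPool(numThreads);
        Phaser phaser = new Phaser(1); // the calling thread is a registered party
        try {
            int chunkSize = (array.length + numThreads - 1) / numThreads;

            // Phase 1: sort each chunk in its own task
            for (int left = 0; left < array.length; left += chunkSize) {
                int from = left;
                int to = Math.min(left + chunkSize, array.length); // exclusive
                phaser.register();
                executor.submit(() -> {
                    try { Arrays.sort(array, from, to); }
                    finally { phaser.arriveAndDeregister(); }
                });
            }
            phaser.arriveAndAwaitAdvance();

            // Later phases: merge neighbouring runs, doubling the run length each pass
            for (int size = chunkSize; size < array.length; size *= 2) {
                for (int left = 0; left + size < array.length; left += 2 * size) {
                    int lo = left;
                    int mid = left + size - 1;
                    int hi = Math.min(left + 2 * size - 1, array.length - 1);
                    phaser.register();
                    executor.submit(() -> {
                        try { merge(array, lo, mid, hi); }
                        finally { phaser.arriveAndDeregister(); }
                    });
                }
                phaser.arriveAndAwaitAdvance();
            }
        } finally {
            executor.shutdown();
        }
    }

    // Merges the sorted ranges [left, mid] and [mid + 1, right] in place.
    static void merge(int[] array, int left, int mid, int right) {
        int[] l = Arrays.copyOfRange(array, left, mid + 1);
        int[] r = Arrays.copyOfRange(array, mid + 1, right + 1);
        int i = 0, j = 0, k = left;
        while (i < l.length && j < r.length) {
            array[k++] = (l[i] <= r[j]) ? l[i++] : r[j++];
        }
        while (i < l.length) array[k++] = l[i++];
        while (j < r.length) array[k++] = r[j++];
    }

    // Sorts a seeded random array both ways and reports whether the results agree.
    static boolean matchesArraysSort(int length, int numThreads, long seed) {
        int[] data = new Random(seed).ints(length, 0, 100).toArray();
        int[] expected = data.clone();
        Arrays.sort(expected);
        parallelSort(data, numThreads);
        return Arrays.equals(expected, data);
    }

    public static void main(String[] args) {
        System.out.println("Matches Arrays.sort: " + matchesArraysSort(16, 4, 42L));
    }
}
```

Trying lengths that are not multiples of the thread count (for example 17 or 3) exercises the shorter last chunk and the uneven final merges.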

Why Parallel Merge Sort with Phaser Matters

Understanding and implementing parallel merge sort with Phaser is essential because:

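  • Efficiency: Chunks are sorted concurrently and the merges within each pass run in parallel, so large datasets can be sorted faster on multi-core CPUs. The final pass is a single merge on one thread, which limits the overall speedup.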
  • Synchronization: Phaser helps coordinate multiple threads without complex locking mechanisms, providing a cleaner approach to concurrency.
  • Scalable Solutions: The approach allows you to handle increased data loads efficiently by scaling the number of threads.

By mastering parallel merge sort, you’ll be able to write scalable, efficient sorting algorithms that perform well on large datasets in real-world applications. Now, you're ready to apply this knowledge in the upcoming practice section!
