Building on your understanding of concurrency, today we’ll implement a parallel merge sort algorithm using Java's Phaser class. In previous lessons, you explored the nuances of thread-safe data structures like an LRU cache. Now, we’ll take that knowledge further to tackle sorting algorithms in a concurrent environment, optimizing performance on multi-core systems.
By the end of this lesson, you will understand:
- The mechanics of parallel merge sort and how it divides and conquers large datasets.
- How the `Phaser` class helps synchronize multiple threads.
- How to implement an efficient parallel merge sort using thread pools and phased synchronization.
This lesson will equip you with the tools to handle large datasets efficiently using parallelism and phased synchronization in Java.
Merge sort is a divide-and-conquer algorithm. It breaks down a dataset into smaller subarrays, sorts them, and then merges them back together. The parallel merge sort enhances this process by assigning different subarrays to different threads, allowing multiple parts of the array to be sorted simultaneously.
To coordinate these threads, we use Phaser, a reusable synchronization barrier. Tasks register with it as parties, and a phase advances only once every registered party has arrived. This is essential in parallel merge sort, where all chunks must be sorted before any of them are merged.
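To make the waiting behavior concrete, here is a minimal sketch of a single phase. The class name `PhaserBarrierSketch` and the method `runAndCount` are hypothetical names chosen for illustration; only the `Phaser` and `ExecutorService` calls come from the standard library.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

class PhaserBarrierSketch {

    // Runs `workers` tasks (workers >= 1) and blocks until each one has arrived.
    // Returns how many tasks had finished when the caller was released.
    static int runAndCount(int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        Phaser phaser = new Phaser(1);           // the calling thread is the first party
        AtomicInteger finished = new AtomicInteger();

        phaser.bulkRegister(workers);            // one additional party per task
        for (int i = 0; i < workers; i++) {
            pool.submit(() -> {
                finished.incrementAndGet();      // stand-in for real work
                phaser.arriveAndDeregister();    // arrive, then leave the phaser
            });
        }

        phaser.arriveAndAwaitAdvance();          // blocks until every party has arrived
        int seen = finished.get();
        pool.shutdown();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("Tasks finished before the caller continued: " + runAndCount(4));
    }
}
```

Because each task increments the counter before it arrives, the caller should always observe the full count once `arriveAndAwaitAdvance()` returns.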
Here, we define the `ParallelMergeSort` class. This class manages the overall sorting process, coordinating multiple threads using an `ExecutorService` for parallel execution and a `Phaser` for synchronizing the threads.
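```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;

public class ParallelMergeSort {

    private final ExecutorService executor; // thread pool that runs the sort and merge tasks
    private final Phaser phaser;            // barrier shared by all tasks
    private final int numThreads;           // number of chunks to sort in parallel

    public ParallelMergeSort(ExecutorService executor, Phaser phaser, int numThreads) {
        this.executor = executor;
        this.phaser = phaser;
        this.numThreads = numThreads;
    }
}
```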
In this code, `ParallelMergeSort` takes three parameters: an `ExecutorService`, which manages the thread pool; a `Phaser` for synchronization; and the number of threads. The constructor initializes these fields, preparing the class for sorting. The `executor` allows tasks to run concurrently across multiple threads, while the `phaser` ensures that the threads wait for each other at specific points, such as after the sorting or merging phases.
The first step in parallel merge sort is dividing the array into smaller chunks and sorting each chunk in parallel. This is done using an executor service that distributes tasks among multiple threads. The `Phaser` class ensures that threads synchronize during different phases, such as sorting and merging.
```java
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;

public class ParallelMergeSort {

    // Fields and constructor as shown earlier

    public void sort(int[] array) throws InterruptedException {
        int chunkSize = (array.length + numThreads - 1) / numThreads; // ceiling division
        phaser.bulkRegister(numThreads); // Register one party per sort task

        for (int i = 0; i < numThreads; i++) {
            int left = i * chunkSize;
            int right = Math.min(left + chunkSize - 1, array.length - 1);

            if (left <= right) {
                executor.submit(new SortTask(array, left, right));
            } else {
                phaser.arriveAndDeregister(); // No work to do for this party
            }
        }

        phaser.arriveAndAwaitAdvance(); // Wait for sorting phase to complete
    }
}
```
In this part of the code, the array is divided into smaller chunks, and each chunk is assigned to a task for sorting. The `bulkRegister()` method registers one party per chunk with the `Phaser`, so the phase cannot advance until every one of them has arrived. Each chunk is sorted by submitting a `SortTask` to the `ExecutorService`, allowing the tasks to run in parallel. If a chunk would be empty, which happens when the array has fewer elements than there are slots, its party is deregistered immediately. The `arriveAndAwaitAdvance()` call blocks the calling thread until all sort tasks have arrived, so merging never starts on an unsorted chunk.
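The chunk arithmetic is easier to follow with concrete numbers. The sketch below repeats the same index calculation outside the sorter; `ChunkBoundsSketch` and `chunkBounds` are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

class ChunkBoundsSketch {

    // Returns the inclusive [left, right] index pair of every non-empty chunk.
    static List<int[]> chunkBounds(int length, int numThreads) {
        int chunkSize = (length + numThreads - 1) / numThreads; // ceiling division
        List<int[]> bounds = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
            int left = i * chunkSize;
            int right = Math.min(left + chunkSize - 1, length - 1);
            if (left <= right) {                 // skip slots that have no elements
                bounds.add(new int[] {left, right});
            }
        }
        return bounds;
    }

    public static void main(String[] args) {
        // 10 elements, 4 threads: chunkSize is 3
        for (int[] b : chunkBounds(10, 4)) {
            System.out.println("[" + b[0] + ", " + b[1] + "]");
        }
    }
}
```

For 10 elements and 4 threads this yields the chunks [0, 2], [3, 5], [6, 8], and [9, 9]. With 5 elements and 4 threads the chunk size is 2, so only three chunks are non-empty and the fourth slot takes the "no work to do" branch.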
Below is the `SortTask`, which handles the sorting of individual chunks.
```java
// Inner class of ParallelMergeSort, so it can use the outer phaser field
private class SortTask implements Runnable {
    private final int[] array;
    private final int left;
    private final int right;

    public SortTask(int[] array, int left, int right) {
        this.array = array;
        this.left = left;
        this.right = right;
    }

    @Override
    public void run() {
        Arrays.sort(array, left, right + 1); // Sort the assigned chunk (end index is exclusive)
        phaser.arriveAndDeregister();        // Signal that this task is complete
    }
}
```
Each `SortTask` is responsible for sorting a specific chunk of the array, as determined by the inclusive `left` and `right` indices. The task uses `Arrays.sort()` to sort only that portion of the array; because the method's end index is exclusive, the call passes `right + 1`. Once the sorting is complete, the task calls `arriveAndDeregister()` to signal the `Phaser` that its work for this phase is finished and to remove itself from later phases.
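As a quick illustration of the range form of `Arrays.sort()`, the sketch below sorts only the first three elements of an array and leaves the rest untouched. `RangeSortSketch` and `sortRange` are hypothetical names for this example.

```java
import java.util.Arrays;

class RangeSortSketch {

    // Sorts array[left..right] (both inclusive) in place and returns the same array.
    static int[] sortRange(int[] array, int left, int right) {
        Arrays.sort(array, left, right + 1); // toIndex is exclusive, hence + 1
        return array;
    }

    public static void main(String[] args) {
        int[] data = {9, 7, 8, 3, 1, 2};
        sortRange(data, 0, 2);
        System.out.println(Arrays.toString(data)); // prints [7, 8, 9, 3, 1, 2]
    }
}
```

Since every task touches a disjoint index range, the tasks can safely share the same array without additional locking during the sorting phase.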
Once all chunks are sorted, the next phase begins: merging the sorted chunks. This merging process occurs in several steps, starting with small chunks and progressively merging larger ones, until the entire array is sorted.
```java
public void sort(int[] array) throws InterruptedException {
    // Phase 1: Sort chunks in parallel as shown above...

    int currentChunkSize = chunkSize;
    while (currentChunkSize < array.length) {
        // Number of chunk pairs in this round (ceiling division)
        int merges = (array.length + 2 * currentChunkSize - 1) / (2 * currentChunkSize);
        phaser.bulkRegister(merges); // Register merge tasks

        for (int i = 0; i < merges; i++) {
            int left = i * 2 * currentChunkSize;
            int mid = left + currentChunkSize - 1;
            int right = Math.min(left + 2 * currentChunkSize - 1, array.length - 1);

            if (mid < right) {
                executor.submit(new MergeTask(array, left, mid, right));
            } else {
                phaser.arriveAndDeregister(); // No work to do
            }
        }

        phaser.arriveAndAwaitAdvance(); // Wait for merging phase to complete
        currentChunkSize *= 2;          // Double chunk size for next phase
    }

    executor.shutdown(); // Shut down the executor service
    System.out.println("Sorted array: " + Arrays.toString(array));
}
```
In this section, the sorted chunks are merged together in a phased manner. Each round merges adjacent pairs of sorted runs and then doubles the run size, so the process starts with the smallest chunks and proceeds to larger and larger ones. The `Phaser` ensures that the next round does not start until all merges in the current round are completed. The `executor.submit()` method runs each merge in parallel, and the process continues until the entire array is sorted.
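To see which index ranges each round merges, the following sketch reproduces the loop arithmetic without any threading and records the (left, mid, right) triples. `MergeScheduleSketch` and `mergeSchedule` are hypothetical names introduced for this trace.

```java
import java.util.ArrayList;
import java.util.List;

class MergeScheduleSketch {

    // Lists the (left, mid, right) triples that would be merged, round by round.
    static List<int[]> mergeSchedule(int length, int chunkSize) {
        List<int[]> schedule = new ArrayList<>();
        if (chunkSize <= 0) {
            return schedule;                     // nothing to merge
        }
        for (int size = chunkSize; size < length; size *= 2) {
            int merges = (length + 2 * size - 1) / (2 * size); // pairs in this round
            for (int i = 0; i < merges; i++) {
                int left = i * 2 * size;
                int mid = left + size - 1;
                int right = Math.min(left + 2 * size - 1, length - 1);
                if (mid < right) {               // a right-hand run exists
                    schedule.add(new int[] {left, mid, right});
                }
            }
        }
        return schedule;
    }

    public static void main(String[] args) {
        for (int[] m : mergeSchedule(16, 4)) {
            System.out.println("merge(" + m[0] + ", " + m[1] + ", " + m[2] + ")");
        }
    }
}
```

For 16 elements and a chunk size of 4, the first round merges (0, 3, 7) and (8, 11, 15), and the second round merges (0, 7, 15). The number of tasks shrinks every round, and the final merge always runs as a single task.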
Here is the `MergeTask` that handles merging two sorted subarrays.
```java
// Inner class of ParallelMergeSort, so it can use the outer phaser field and merge()
private class MergeTask implements Runnable {
    private final int[] array;
    private final int left;
    private final int mid;
    private final int right;

    public MergeTask(int[] array, int left, int mid, int right) {
        this.array = array;
        this.left = left;
        this.mid = mid;
        this.right = right;
    }

    @Override
    public void run() {
        merge(array, left, mid, right);
        phaser.arriveAndDeregister(); // Signal merge completion
    }
}
```
Each `MergeTask` merges two adjacent sorted subarrays: the run from `left` to `mid` and the run from `mid + 1` to `right`, with all indices inclusive. This task ensures that two sorted sections of the array are combined into one larger sorted section. After the merge is completed, the `phaser.arriveAndDeregister()` method signals the completion of this merging task.
The `merge()` method performs the merging operation.
```java
private void merge(int[] array, int left, int mid, int right) {
    // Copy both sorted runs so the original range can be overwritten
    int[] leftArr = Arrays.copyOfRange(array, left, mid + 1);
    int[] rightArr = Arrays.copyOfRange(array, mid + 1, right + 1);

    int i = 0, j = 0, k = left;

    // Take the smaller head element; <= keeps equal elements in their original order
    while (i < leftArr.length && j < rightArr.length) {
        array[k++] = (leftArr[i] <= rightArr[j]) ? leftArr[i++] : rightArr[j++];
    }
    // Copy whatever remains in either run
    while (i < leftArr.length) {
        array[k++] = leftArr[i++];
    }
    while (j < rightArr.length) {
        array[k++] = rightArr[j++];
    }
}
```
The `merge()` method combines two sorted subarrays into a single sorted range. It first copies the left and right subarrays into temporary arrays, then repeatedly compares their front elements and writes the smaller one back into the original array. After one of the subarrays is exhausted, the remaining elements from the other subarray are copied over.
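A single call can be traced by hand. The sketch below reuses the same merge logic as a static method so it can run on its own; `MergeDemoSketch` is a hypothetical wrapper class for this illustration.

```java
import java.util.Arrays;

class MergeDemoSketch {

    // Same logic as merge() above, made static so it runs without the sorter.
    static void merge(int[] array, int left, int mid, int right) {
        int[] leftArr = Arrays.copyOfRange(array, left, mid + 1);
        int[] rightArr = Arrays.copyOfRange(array, mid + 1, right + 1);

        int i = 0, j = 0, k = left;
        while (i < leftArr.length && j < rightArr.length) {
            array[k++] = (leftArr[i] <= rightArr[j]) ? leftArr[i++] : rightArr[j++];
        }
        while (i < leftArr.length) {
            array[k++] = leftArr[i++];
        }
        while (j < rightArr.length) {
            array[k++] = rightArr[j++];
        }
    }

    public static void main(String[] args) {
        int[] data = {1, 4, 7, 2, 3, 9};         // two sorted runs: [1, 4, 7] and [2, 3, 9]
        merge(data, 0, 2, 5);
        System.out.println(Arrays.toString(data)); // prints [1, 2, 3, 4, 7, 9]
    }
}
```

The comparison loop writes 1, 2, 3, 4, and 7, at which point the left run is exhausted, and the last loop copies the remaining 9.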
Here’s how the program is executed from the `Main` class:
```java
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;

public class Main {

    private static final int ARRAY_SIZE = 16;
    private static final int NUM_THREADS = 4;

    public static void main(String[] args) throws InterruptedException {
        new Main().startParallelMergeSort();
    }

    public void startParallelMergeSort() throws InterruptedException {
        int[] array = new int[ARRAY_SIZE];
        fillRandom(array);
        System.out.println("Unsorted array: " + Arrays.toString(array));

        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
        Phaser phaser = new Phaser(1); // Main thread is a registered party

        ParallelMergeSort sorter = new ParallelMergeSort(executor, phaser, NUM_THREADS);
        sorter.sort(array); // Sorts in place, prints the result, and shuts down the executor
    }

    private void fillRandom(int[] array) {
        for (int i = 0; i < array.length; i++) {
            array[i] = (int) (Math.random() * 100); // Random value in [0, 99]
        }
    }
}
```
In this `Main` class, we create a random array of integers and print it before sorting; the sorted result is printed at the end of `sort()`. The `ParallelMergeSort` class is instantiated with a fixed thread pool and a `Phaser` instance created with one party, which represents the main thread. The `startParallelMergeSort()` method initiates the parallel merge sort process.
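When experimenting with the sorter, one simple way to check a run is to compare its output against a copy sorted with `Arrays.sort()`. The helper below is a sketch under that assumption; `SortCheckSketch` and `matchesReference` are hypothetical names and are not part of the lesson's classes.

```java
import java.util.Arrays;

class SortCheckSketch {

    // Returns true if `result` contains the values of `original` in ascending order.
    static boolean matchesReference(int[] original, int[] result) {
        int[] expected = Arrays.copyOf(original, original.length);
        Arrays.sort(expected);                   // trusted single-threaded reference
        return Arrays.equals(expected, result);
    }

    public static void main(String[] args) {
        int[] before = {5, 2, 9, 1};
        int[] after = {1, 2, 5, 9};              // stand-in for the array after sorting
        System.out.println(matchesReference(before, after)); // prints true
    }
}
```

Because `sort()` works in place, a check like this needs a copy of the array taken before the sort is started, for example with `Arrays.copyOf()`.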
Understanding and implementing parallel merge sort with `Phaser` is essential because:
- Efficiency: Sorting chunks and running independent merges in parallel can shorten sorting time for large datasets on multi-core CPUs.
- Synchronization: Phaser coordinates the sort and merge phases without explicit locks or condition variables, which keeps the concurrency logic compact.
- Scalable Solutions: The number of chunks follows the number of threads, so the same code adapts to larger inputs and more cores, although each merge round has fewer parallel tasks than the one before and the final merge runs as a single task.
By mastering parallel merge sort, you’ll be able to write scalable, efficient sorting algorithms that perform well on large datasets in real-world applications. Now, you're ready to apply this knowledge in the upcoming practice section!