Welcome to the course on Mastering Java Concurrency with Practice! In this course, we’ll explore how to apply concurrency utilities such as Semaphores, Phasers, CompletableFuture, and more to real-world problems. Each unit will introduce a problem scenario, walk through its solution using concurrency tools, and follow up with similar practice problems to reinforce your understanding.
In this unit, we focus on Asynchronous File Processing using CompletableFuture. This lesson will build on the concurrency concepts you’ve already learned, showing you how to handle multiple file operations asynchronously and keep your programs efficient and responsive.
In this unit, you’ll learn how to:
- Apply CompletableFuture to handle asynchronous tasks.
- Manage exceptions in an asynchronous environment.
- Combine multiple asynchronous operations.
- Aggregate results from concurrent file tasks efficiently.
By now, you should be comfortable with basic threading concepts, so we'll dive deeper into how asynchronous file processing works in practice using CompletableFuture.
In a previous lesson, you learned about CompletableFuture, a versatile class in Java's java.util.concurrent package that simplifies asynchronous programming. It allows you to run tasks without blocking the main thread, combine multiple asynchronous computations, handle exceptions gracefully, and chain operations together.
As a quick recap:
- supplyAsync() runs a task asynchronously and returns a CompletableFuture representing the future result of that task.
- thenApply() transforms the result of a completed task into a new value, while thenAccept() consumes the result without producing a new one.
- exceptionally() provides a way to handle exceptions in an asynchronous pipeline by supplying a fallback value.
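To make the recap concrete, here is a minimal, self-contained sketch that chains all four methods. The class name RecapDemo and the lengthOf helper are hypothetical illustrations, not part of the lesson's code; because no executor is passed to supplyAsync(), the task runs on the common ForkJoinPool.

```java
import java.util.concurrent.CompletableFuture;

public class RecapDemo {

    // Hypothetical helper: supplyAsync -> thenApply -> exceptionally
    static CompletableFuture<Integer> lengthOf(String text) {
        return CompletableFuture
                .supplyAsync(() -> {
                    if (text == null) {
                        throw new IllegalArgumentException("text must not be null");
                    }
                    return text;
                })                          // runs on the common ForkJoinPool
                .thenApply(String::length)  // transforms the String into its length
                .exceptionally(ex -> -1);   // on any failure, fall back to -1
    }

    public static void main(String[] args) {
        // thenAccept consumes the result and yields a CompletableFuture<Void>
        lengthOf("hello")
                .thenAccept(n -> System.out.println("Length: " + n)) // prints "Length: 5"
                .join(); // wait so main doesn't exit before the callback runs

        System.out.println(lengthOf(null).join()); // prints -1
    }
}
```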
In this lesson, we'll use CompletableFuture to process a series of text files asynchronously. Each file will be processed as a separate task on a thread pool, and once all files have been processed, we'll aggregate the results to get a total word count. Processing several files at the same time can shorten the total running time, especially when there are many files or the files are large.
Now, let’s see how to set up the AsynchronousFileProcessing class and its methods.
We'll start by creating a class that manages asynchronous file processing. It holds an ExecutorService that supplies the threads, along with methods that process files asynchronously.
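```java
import java.util.concurrent.ExecutorService;

public class AsynchronousFileProcessing {

    // Thread pool that runs the file-processing tasks; supplied by the caller
    private final ExecutorService executor;

    public AsynchronousFileProcessing(ExecutorService executor) {
        this.executor = executor;
    }
}
```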
This class will include methods for counting words in a file, processing files asynchronously, and aggregating the results.
Since we’ll be using Java Streams in this lesson to count words in files, let’s quickly recap the basics. Streams provide a functional way to process collections of data, offering a series of methods to transform, filter, and aggregate elements. In this case, we’ll use a stream to read file lines, split them into words, and count them. Streams make it easy to process large datasets in a concise and readable way, while leveraging the power of functional programming.
Here are some of the methods used and what they do:
- Files.lines(): Lazily reads the lines of a file and returns them as a Stream<String>. The stream keeps the file open, so it should be closed, for example with try-with-resources.
- Stream.flatMap(): Maps each element to a stream and flattens the results into a single stream; here, it turns a stream of lines into a stream of words.
- Stream.of(): Creates a stream from the array of words produced by splitting a line.
- Stream.filter(): Filters out empty strings, which split("\\s+") produces for blank lines and for lines that start with whitespace, so only real words are counted.
- Stream.count(): Counts the elements remaining in the stream (in this case, the total number of words).
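The sketch below runs the same pipeline on an in-memory list of lines, which makes it easy to see why the filter() step is needed. WordCountStreams and countWords are hypothetical names used only for this illustration.

```java
import java.util.List;
import java.util.stream.Stream;

public class WordCountStreams {

    // Same pipeline the lesson applies to Files.lines(), here on an in-memory list
    static long countWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Stream.of(line.split("\\s+"))) // one stream of words per line, flattened
                .filter(word -> !word.isEmpty())                 // drop "" from leading spaces or blank lines
                .count();
    }

    public static void main(String[] args) {
        // Leading whitespace makes split() return an empty first element
        String[] parts = "  hello world".split("\\s+");
        System.out.println(parts.length); // 3: "", "hello", "world"

        List<String> lines = List.of("  hello world", "", "one two  three");
        System.out.println(countWords(lines)); // 5
    }
}
```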
If you want to explore functional programming concepts further, see the "Functional Programming in Java" path.
Next, we need a method to count the words in a file. This method reads the file line by line, splits each line into words, filters out empty words, and counts the total. If an error occurs while reading the file, we throw a RuntimeException whose message names the file and which keeps the original exception as its cause.
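```java
private long countWordsInFile(Path file) {
    // try-with-resources closes the file handle held by the stream
    try (Stream<String> lines = Files.lines(file)) {
        return lines
                .flatMap(line -> Stream.of(line.split("\\s+"))) // split each line on runs of whitespace
                .filter(word -> !word.isEmpty())                 // drop empty tokens
                .count();
    } catch (Exception e) {
        // Covers IOException when opening and UncheckedIOException while reading
        throw new RuntimeException("Failed to read file: " + file.getFileName(), e);
    }
}
```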
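To see both the success and failure paths, the following sketch wraps a copy of countWordsInFile in a hypothetical CountWordsInFileDemo class, made static so it can run on its own. It assumes Java 11+ (for Files.writeString) and relies on Files.lines() decoding UTF-8 by default; its documentation states that a malformed byte sequence encountered while reading surfaces as an UncheckedIOException, which the broad catch (Exception e) also handles.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

public class CountWordsInFileDemo {

    // Standalone copy of the lesson's countWordsInFile, made static for this demo
    static long countWordsInFile(Path file) {
        try (Stream<String> lines = Files.lines(file)) {
            return lines
                    .flatMap(line -> Stream.of(line.split("\\s+")))
                    .filter(word -> !word.isEmpty())
                    .count();
        } catch (Exception e) {
            throw new RuntimeException("Failed to read file: " + file.getFileName(), e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path good = Files.createTempFile("words", ".txt");
        Files.writeString(good, "the quick brown\nfox jumps\n");
        System.out.println(countWordsInFile(good)); // 5

        // 0xFF is never valid in UTF-8, so decoding fails while the stream is consumed
        Path bad = Files.createTempFile("bad", ".txt");
        Files.write(bad, new byte[] {'h', 'i', ' ', (byte) 0xFF});
        try {
            countWordsInFile(bad);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage() + " (cause: "
                    + e.getCause().getClass().getSimpleName() + ")");
        }

        Files.deleteIfExists(good);
        Files.deleteIfExists(bad);
    }
}
```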
Now that we can process each file, we’ll define how to handle these tasks asynchronously.
We'll create the processFileAsync method, which counts the words in a file asynchronously. If processing fails, the method logs the error and returns 0 for that file instead of failing.
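```java
private CompletableFuture<Long> processFileAsync(Path file) {
    // Run the word count on our executor instead of the common ForkJoinPool
    return CompletableFuture.supplyAsync(() -> countWordsInFile(file), executor)
            .exceptionally(ex -> {
                // Log the failure and contribute 0 words so the other files still count
                System.err.println("Error processing " + file.getFileName() + ": " + ex.getMessage());
                return 0L;
            });
}
```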
This method runs countWordsInFile on our executor using supplyAsync(). If that task throws, exceptionally() logs the error and substitutes 0L, so one unreadable file doesn't cause the total word count to fail.
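One detail worth knowing: when a supplyAsync() task throws, the exception passed to exceptionally() is typically a CompletionException wrapping the original error, so ex.getMessage() includes the wrapped exception's class name. The hypothetical helper below mirrors processFileAsync's fallback but unwraps the cause before logging; it is a sketch, not the lesson's method.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class ExceptionallyDemo {

    // Hypothetical helper: log the root cause and fall back to 0L
    static CompletableFuture<Long> withFallback(CompletableFuture<Long> task) {
        return task.exceptionally(ex -> {
            // Failures from supplyAsync arrive wrapped in a CompletionException
            Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                    ? ex.getCause()
                    : ex;
            System.err.println("Recovered from: " + cause.getMessage());
            return 0L;
        });
    }

    public static void main(String[] args) {
        CompletableFuture<Long> failing = CompletableFuture.supplyAsync(() -> {
            throw new RuntimeException("Failed to read file: missing.txt");
        });
        System.out.println(withFallback(failing).join()); // 0
        System.out.println(withFallback(CompletableFuture.completedFuture(42L)).join()); // 42
    }
}
```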
Once all files are processed, we need to aggregate the results to calculate the total word count. We can combine the CompletableFuture instances from each file and aggregate their results.
```java
public CompletableFuture<Long> processFiles(List<Path> files) {
    // Start one asynchronous task per file
    List<CompletableFuture<Long>> futures = files.stream()
            .map(this::processFileAsync)
            .collect(Collectors.toList());

    // allOf completes when every task has completed; then collect and sum the results
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
            .thenApply(v -> futures.stream()
                    .map(CompletableFuture::join) // does not block: every future is already done
                    .reduce(0L, Long::sum));
}
```
This method doesn't block. CompletableFuture.allOf() returns a future that completes once all of the per-file futures have completed, and thenApply() then collects their results. Because every future is already complete at that point, each join() call returns immediately. The reduce(0L, Long::sum) operation starts from an initial value of 0L (zero) and adds each word count using Long::sum, computing the total word count across all files.
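The allOf/join/reduce pattern works the same way for any list of futures. This sketch, using the hypothetical class AllOfDemo, counts words in in-memory strings instead of files and uses a two-thread pool; only the aggregation logic mirrors processFiles.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class AllOfDemo {

    // Hypothetical helper: same aggregation pattern as processFiles, applied to strings
    static CompletableFuture<Long> totalWords(List<String> docs, ExecutorService executor) {
        List<CompletableFuture<Long>> futures = docs.stream()
                .map(doc -> CompletableFuture.supplyAsync(
                        () -> Stream.of(doc.split("\\s+")).filter(w -> !w.isEmpty()).count(),
                        executor))
                .collect(Collectors.toList());

        // allOf carries no value (Void); it only signals that every future is done
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .reduce(0L, Long::sum));
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            long total = totalWords(List.of("alpha beta gamma", "delta epsilon", "zeta"), executor).join();
            System.out.println("Total: " + total); // Total: 6
        } finally {
            executor.shutdown();
        }
    }
}
```

With an empty list, allOf() completes immediately and the reduction returns the identity value 0L.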
Next, we'll see how this class is used in the Main class to process the files.
Before we start processing the files, we'll need to set up the ExecutorService, which manages the threads that will process the files.
```java
// A pool of four worker threads shared by all file tasks
ExecutorService executor = Executors.newFixedThreadPool(4);
AsynchronousFileProcessing asyncProcessor = new AsynchronousFileProcessing(executor);
```
This code creates a pool of 4 threads, so at most four files are processed at the same time; any remaining tasks wait in the pool's queue.
Here's how we initiate the asynchronous file processing in the Main class:
```java
CompletableFuture<Long> totalWordCountFuture = asyncProcessor.processFiles(files);

// Register callbacks that run once the total is available; blocking is done separately
totalWordCountFuture
        .thenAccept(total -> System.out.println("Total word count: " + total))
        .exceptionally(ex -> {
            System.err.println("Error: " + ex.getMessage());
            return null;
        });
```
This code processes the files asynchronously and prints the total word count after all tasks are completed.
We also need to ensure proper cleanup by handling any exceptions and shutting down the executor service.
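```java
try {
    totalWordCountFuture.get(); // Block until all tasks are complete
} catch (InterruptedException e) {
    // Restore the interrupt flag only when we were actually interrupted
    Thread.currentThread().interrupt();
    System.err.println("Interrupted while waiting for file processing");
} catch (ExecutionException e) {
    // get() wraps the task's failure; report the underlying cause
    System.err.println("Error during file processing: " + e.getCause());
} finally {
    shutdownExecutor(executor);
}
```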
If an error occurs, it is caught and logged. Either way, the finally block shuts down the executor service.
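The catch clauses above depend on how get() reports failures. This sketch, using the hypothetical class GetVsJoinDemo, contrasts get(), which throws the checked ExecutionException, with join(), which throws the unchecked CompletionException; in both cases getCause() returns the exception the task originally threw.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class GetVsJoinDemo {

    // Describes what get() throws for the given future
    static String failureFromGet(CompletableFuture<?> f) {
        try {
            f.get();
            return "none";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "InterruptedException";
        } catch (ExecutionException e) {
            return "ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        }
    }

    // Describes what join() throws for the given future
    static String failureFromJoin(CompletableFuture<?> f) {
        try {
            f.join();
            return "none";
        } catch (CompletionException e) {
            return "CompletionException caused by " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Long> failed = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
        System.out.println(failureFromGet(failed));  // ExecutionException caused by IllegalStateException
        System.out.println(failureFromJoin(failed)); // CompletionException caused by IllegalStateException
    }
}
```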
Finally, we ensure the executor service shuts down properly to avoid resource leaks:
```java
private void shutdownExecutor(ExecutorService executor) {
    executor.shutdown(); // reject new tasks; let submitted tasks finish
    try {
        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
            executor.shutdownNow(); // interrupt running tasks and stop waiting ones
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                System.err.println("Executor did not terminate.");
            }
        }
    } catch (InterruptedException e) {
        executor.shutdownNow();
        Thread.currentThread().interrupt(); // preserve the interrupt status
    }
}
```
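This method first calls shutdown(), which lets already-submitted tasks finish but rejects new ones. If the pool hasn't terminated within 10 seconds, shutdownNow() interrupts the running tasks, and the method waits up to another 10 seconds before reporting that the executor did not terminate.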
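For reference, here is one way to assemble the pieces into a single runnable program. It is a sketch under a few assumptions: the Main logic is folded into AsynchronousFileProcessing's own main method, shutdownExecutor is made static so main can call it, and the input is two temporary files plus one path that doesn't exist (to exercise the fallback). It requires Java 11+ for Files.writeString.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class AsynchronousFileProcessing {

    private final ExecutorService executor;

    public AsynchronousFileProcessing(ExecutorService executor) {
        this.executor = executor;
    }

    private long countWordsInFile(Path file) {
        try (Stream<String> lines = Files.lines(file)) {
            return lines
                    .flatMap(line -> Stream.of(line.split("\\s+")))
                    .filter(word -> !word.isEmpty())
                    .count();
        } catch (Exception e) {
            throw new RuntimeException("Failed to read file: " + file.getFileName(), e);
        }
    }

    private CompletableFuture<Long> processFileAsync(Path file) {
        return CompletableFuture.supplyAsync(() -> countWordsInFile(file), executor)
                .exceptionally(ex -> {
                    System.err.println("Error processing " + file.getFileName() + ": " + ex.getMessage());
                    return 0L; // failed files contribute zero words
                });
    }

    public CompletableFuture<Long> processFiles(List<Path> files) {
        List<CompletableFuture<Long>> futures = files.stream()
                .map(this::processFileAsync)
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join)
                        .reduce(0L, Long::sum));
    }

    // Made static (an assumption) so the static main method can call it
    static void shutdownExecutor(ExecutorService executor) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow();
                if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.err.println("Executor did not terminate.");
                }
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical sample input: two temp files plus one path that does not exist
        Path a = Files.createTempFile("a", ".txt");
        Path b = Files.createTempFile("b", ".txt");
        Files.writeString(a, "one two three\n");
        Files.writeString(b, "four five\n");
        List<Path> files = List.of(a, b, a.resolveSibling("does-not-exist.txt"));

        ExecutorService executor = Executors.newFixedThreadPool(4);
        AsynchronousFileProcessing asyncProcessor = new AsynchronousFileProcessing(executor);
        try {
            long total = asyncProcessor.processFiles(files).get();
            System.out.println("Total word count: " + total); // 5; the missing file contributes 0
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            System.err.println("Error during file processing: " + e.getCause());
        } finally {
            shutdownExecutor(executor);
            Files.deleteIfExists(a);
            Files.deleteIfExists(b);
        }
    }
}
```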
Asynchronous file processing is a powerful technique for applications that deal with large files or datasets. By processing files concurrently, you can overlap I/O waits and keep multiple CPU cores busy, which often shortens total processing time. This is especially valuable in systems such as web servers, data analysis applications, or file-processing services, where responsiveness is critical.
With CompletableFuture, you can design systems that stay responsive under heavy workloads, because slow I/O runs on worker threads instead of blocking the caller.
Now that you understand how asynchronous file processing works, let's move to the practice section to see this code in action and apply similar concepts to real-world scenarios.