Lesson 1
Asynchronous File Processing with CompletableFuture

Welcome to the course on Mastering Java Concurrency with Practice! In this course, we’ll explore how to apply concurrency utilities such as Semaphores, Phasers, CompletableFuture, and more to real-world problems. Each unit will introduce a problem scenario, walk through its solution using concurrency tools, and follow up with similar practice problems to reinforce your understanding.

In this unit, we focus on Asynchronous File Processing using CompletableFuture. This lesson will build on the concurrency concepts you’ve already learned, showing you how to handle multiple file operations asynchronously and keep your programs efficient and responsive.

What You’ll Learn

In this unit, you’ll learn how to:

  • Apply CompletableFuture to handle asynchronous tasks.
  • Manage exceptions in an asynchronous environment.
  • Combine multiple asynchronous operations.
  • Aggregate results from concurrent file tasks efficiently.

By now, you should be comfortable with basic threading concepts, so we’ll dive deeper into how asynchronous file processing works in practice using CompletableFuture.

Recap: CompletableFuture and Its Role in Asynchronous Programming

In a previous lesson, you learned about CompletableFuture, a versatile class in Java’s java.util.concurrent package that simplifies asynchronous programming. It allows you to handle tasks without blocking the main thread, combine multiple asynchronous computations, handle exceptions gracefully, and chain operations together.

As a quick recap:

  • supplyAsync() runs a task asynchronously and returns a CompletableFuture representing the future result of that task.
  • thenApply() transforms the result of a completed task into a new value, while thenAccept() consumes the result without returning one.
  • exceptionally() provides a way to handle exceptions in an asynchronous pipeline.
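To make the recap concrete, here is a minimal sketch that chains the four methods together. The class name CompletableFutureRecap and the helper parseAsync are hypothetical names chosen for illustration, and the task runs on the default common pool rather than on a custom executor.

```java
import java.util.concurrent.CompletableFuture;

public class CompletableFutureRecap {

    // Hypothetical helper: parses a number asynchronously, doubles it,
    // and falls back to -1 if parsing fails.
    public static CompletableFuture<Integer> parseAsync(String text) {
        return CompletableFuture
                .supplyAsync(() -> Integer.parseInt(text)) // runs on the common pool
                .thenApply(n -> n * 2)                      // transforms the result
                .exceptionally(ex -> -1);                   // replaces a failure with a fallback
    }

    public static void main(String[] args) {
        // thenAccept consumes the result; join() waits so the JVM does not exit early
        parseAsync("21").thenAccept(n -> System.out.println("Doubled: " + n)).join();   // Doubled: 42
        parseAsync("oops").thenAccept(n -> System.out.println("Fallback: " + n)).join(); // Fallback: -1
    }
}
```

When parsing fails, the thenApply() step is skipped and exceptionally() supplies the fallback value, so the rest of the chain continues normally.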

In this lesson, we’ll use CompletableFuture to process a series of text files asynchronously. Each file will be submitted as a separate task to a thread pool, and once all files have been processed, we’ll aggregate the results to get a total word count. This approach lets us process multiple files concurrently, which improves throughput and keeps the calling thread responsive, particularly when dealing with large files.

Now, let’s see how to set up the AsynchronousFileProcessing class and its methods.

Creating the AsynchronousFileProcessing Class

We’ll start by creating a class that manages asynchronous file processing. This class will have an ExecutorService to handle the threads and methods to process files asynchronously.

Java
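import java.util.concurrent.ExecutorService;

public class AsynchronousFileProcessing {

    // Thread pool that runs the file-processing tasks; supplied by the caller.
    private final ExecutorService executor;

    public AsynchronousFileProcessing(ExecutorService executor) {
        this.executor = executor;
    }
}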

This class will include methods for counting words in a file, processing files asynchronously, and aggregating the results.

Using Stream API

Since we’ll be using Java Streams in this lesson to count words in files, let’s quickly recap the basics. Streams provide a functional way to process collections of data, offering a series of methods to transform, filter, and aggregate elements. In this case, we’ll use a stream to read file lines, split them into words, and count them. Streams make it easy to process large datasets in a concise and readable way, while leveraging the power of functional programming.

Here are some of the methods used and what they do:

  • Files.lines(): Opens a file and returns its lines as a Stream<String>, reading them lazily as the stream is consumed.
  • Stream.flatMap(): Maps each element to a stream and flattens the results into a single stream; in this case, it turns each line into its words.
  • Stream.of(): Creates a stream from the array of words after splitting a line.
  • Stream.filter(): Filters out the empty strings that split() can produce (for example, from a blank line or leading whitespace), ensuring only real words are counted.
  • Stream.count(): Aggregates the results by counting the total number of elements in the stream (in this case, the total number of words).
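The stream methods listed above can be tried out on in-memory lines before any file is involved. The following is a small sketch; the class name StreamWordCount and the method countWords are hypothetical names used only for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class StreamWordCount {

    // Counts whitespace-separated words across the given lines.
    public static long countWords(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Stream.of(line.split("\\s+"))) // one stream of words per line
                .filter(word -> !word.isEmpty())                // drop empty strings from split()
                .count();
    }

    public static void main(String[] args) {
        // The blank line and the leading spaces both produce an empty string that filter() removes
        List<String> lines = Arrays.asList("hello async world", "", "  leading spaces");
        System.out.println(countWords(lines)); // 5
    }
}
```

Without the filter() step, the blank line and the line with leading whitespace would each add one spurious "word" to the count.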

If you want to explore functional programming concepts further, see the "Functional Programming in Java" path.

Counting Words in Each File

Next, we need a method to count the words in a file. This method reads the file line by line, splits each line into words, filters out empty words, and counts the total. If an error occurs while reading the file, we throw a runtime exception with a detailed error message.

Java
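// Requires java.nio.file.Files, java.nio.file.Path, and java.util.stream.Stream
private long countWordsInFile(Path file) {
    // try-with-resources closes the underlying file when the stream is done
    try (Stream<String> lines = Files.lines(file)) {
        return lines
                .flatMap(line -> Stream.of(line.split("\\s+"))) // split each line on whitespace
                .filter(word -> !word.isEmpty())                // drop empty strings left by split()
                .count();
    } catch (Exception e) {
        // Wrap any failure so it can propagate through the CompletableFuture pipeline
        throw new RuntimeException("Failed to read file: " + file.getFileName(), e);
    }
}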

Now that we can process each file, we’ll define how to handle these tasks asynchronously.

Processing Files Asynchronously

We’ll create the processFileAsync method, which counts the number of words in the file asynchronously. This method will handle errors that may occur during the processing.

Java
private CompletableFuture<Long> processFileAsync(Path file) {
    // Run the word count on our executor instead of the calling thread
    return CompletableFuture.supplyAsync(() -> countWordsInFile(file), executor)
            .exceptionally(ex -> {
                System.err.println("Error processing " + file.getFileName() + ": " + ex.getMessage());
                return 0L; // a failed file contributes zero words
            });
}

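This method runs countWordsInFile() on the executor using supplyAsync(). If the task fails, exceptionally() logs the error and substitutes 0L, so one unreadable file contributes nothing to the total instead of failing the whole pipeline. Note that the exception passed to exceptionally() here is typically a CompletionException that wraps the original exception, so ex.getCause() holds the underlying error.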
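If the logged message should show the original error rather than its wrapper, one option is to unwrap the exception inside the handler. The sketch below assumes a stand-in task, riskyCount, in place of countWordsInFile; the class name and the unwrap helper are also hypothetical and not part of the lesson's class.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class ExceptionallyUnwrapDemo {

    // Hypothetical stand-in for countWordsInFile: fails when the name is empty.
    public static long riskyCount(String name) {
        if (name.isEmpty()) {
            throw new IllegalArgumentException("no file name given");
        }
        return name.length();
    }

    // Returns the underlying cause when the throwable is a CompletionException wrapper.
    public static Throwable unwrap(Throwable ex) {
        return (ex instanceof CompletionException && ex.getCause() != null) ? ex.getCause() : ex;
    }

    public static CompletableFuture<Long> countAsync(String name) {
        return CompletableFuture.supplyAsync(() -> riskyCount(name))
                .exceptionally(ex -> {
                    // Log the original message instead of the wrapper's message
                    System.err.println("Error: " + unwrap(ex).getMessage());
                    return 0L; // fallback keeps the pipeline alive
                });
    }

    public static void main(String[] args) {
        System.out.println(countAsync("notes.txt").join()); // 9
        System.out.println(countAsync("").join());          // 0
    }
}
```

Whether to log the wrapper or the cause is a design choice; unwrapping tends to give shorter messages, while the wrapper keeps the full chain visible.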

Aggregating Results from Multiple Files

Once all files are processed, we need to aggregate the results to calculate the total word count. We can combine the CompletableFuture instances from each file and aggregate their results.

Java
public CompletableFuture<Long> processFiles(List<Path> files) {
    // Start one asynchronous task per file
    List<CompletableFuture<Long>> futures = files.stream()
            .map(this::processFileAsync)
            .collect(Collectors.toList());

    // allOf() completes when every per-file future has completed
    return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> futures.stream()
                    .map(CompletableFuture::join) // already complete, so join() does not block
                    .reduce(0L, Long::sum));
}

This method does not block. allOf() returns a CompletableFuture that completes once every per-file future has completed, and the thenApply() callback then collects each result with join(), which returns immediately because the futures are already done. The reduce(0L, Long::sum) operation starts from an initial value of 0L and adds each word count with Long::sum, producing the total word count across all files.
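The aggregation step can be exercised on its own with futures that simply return fixed numbers. This is a sketch under that assumption; AllOfSumDemo and sumAll are hypothetical names, and the values stand in for per-file word counts.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AllOfSumDemo {

    // Combines the futures into one future holding the sum of their results.
    public static CompletableFuture<Long> sumAll(List<CompletableFuture<Long>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join) // safe: every future is complete here
                        .reduce(0L, Long::sum));
    }

    public static void main(String[] args) {
        List<CompletableFuture<Long>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 120L),
                CompletableFuture.supplyAsync(() -> 45L),
                CompletableFuture.completedFuture(0L)); // e.g. a file that failed and fell back to 0
        System.out.println(sumAll(futures).join()); // 165
    }
}
```

An empty list is also handled: allOf() with no futures completes immediately, and the reduction returns the initial value 0L.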

Next, we’ll see how this class is used in the Main class to process the files.

Configuring the ExecutorService

Before we start processing the files, we’ll need to set up the ExecutorService, which manages the threads that will process the files.

Java
ExecutorService executor = Executors.newFixedThreadPool(4);
AsynchronousFileProcessing asyncProcessor = new AsynchronousFileProcessing(executor);

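This code creates a fixed pool of four threads, so at most four files are processed at once; any additional files wait in the executor’s queue until a thread becomes free.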

Processing Files in the Main Class

Here’s how we initiate the asynchronous file processing in the Main class:

Java
CompletableFuture<Long> totalWordCountFuture = asyncProcessor.processFiles(files);
totalWordCountFuture.thenAccept(total -> System.out.println("Total word count: " + total))
        .exceptionally(ex -> {
            System.err.println("Error: " + ex.getMessage());
            return null;
        })
        // get() throws checked exceptions, so the enclosing method must declare or catch them
        .get(); // Wait for all tasks to complete

This code processes the files asynchronously and prints the total word count after all tasks are completed.

Handling Exceptions and Shutting Down the Executor

We also need to ensure proper cleanup by handling any exceptions and shutting down the executor service.

Java
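try {
    totalWordCountFuture.get(); // Block until all tasks are complete
} catch (InterruptedException e) {
    System.err.println("Error during file processing: " + e.getMessage());
    // Restore the interrupt flag only when this thread was actually interrupted
    Thread.currentThread().interrupt();
} catch (ExecutionException e) {
    System.err.println("Error during file processing: " + e.getMessage());
} finally {
    shutdownExecutor(executor);
}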

If an error occurs, it is caught and logged. The finally block then shuts down the executor service whether or not processing succeeded.

Shutting Down the Executor Service

Finally, we ensure the executor service shuts down properly to avoid resource leaks:

Java
private void shutdownExecutor(ExecutorService executor) {
    executor.shutdown(); // stop accepting new tasks; let running ones finish
    try {
        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
            executor.shutdownNow(); // interrupt tasks that are still running
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                System.err.println("Executor did not terminate.");
            }
        }
    } catch (InterruptedException e) {
        executor.shutdownNow();
        Thread.currentThread().interrupt(); // preserve the interrupt status
    }
}

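This method first calls shutdown(), which stops the executor from accepting new tasks and lets running ones finish. If they have not finished within 10 seconds, shutdownNow() attempts to interrupt them, and the method waits up to another 10 seconds before reporting that the executor did not terminate.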
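To see the pieces working together, the following self-contained sketch condenses the steps from this lesson into one class and runs them against temporary files. The class name WordCountPipelineDemo, the method totalWords, and the file names are assumptions made for this illustration; it is not the lesson's Main class, and it uses a plain shutdown() because every task has already finished by the time the total is returned.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WordCountPipelineDemo {

    // Counts words in one file; rethrows I/O failures as unchecked exceptions.
    static long countWords(Path file) {
        try (Stream<String> lines = Files.lines(file)) {
            return lines.flatMap(line -> Stream.of(line.split("\\s+")))
                    .filter(word -> !word.isEmpty())
                    .count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Counts all files concurrently on the given executor; unreadable files count as 0.
    public static long totalWords(List<Path> files, ExecutorService executor) {
        List<CompletableFuture<Long>> futures = files.stream()
                .map(f -> CompletableFuture.supplyAsync(() -> countWords(f), executor)
                        .exceptionally(ex -> 0L))
                .collect(Collectors.toList());
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream().map(CompletableFuture::join).reduce(0L, Long::sum))
                .join(); // block here only because this demo needs the final number
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("wordcount-demo");
        Path a = Files.write(dir.resolve("a.txt"), Arrays.asList("one two three", "four"));
        Path b = Files.write(dir.resolve("b.txt"), Arrays.asList("  five six  "));
        Path missing = dir.resolve("missing.txt"); // never created, so it falls back to 0

        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            System.out.println("Total word count: " + totalWords(Arrays.asList(a, b, missing), executor));
            // prints "Total word count: 6"
        } finally {
            executor.shutdown();
        }
    }
}
```

The missing file shows the effect of the exceptionally() fallback: the total still reflects the two readable files instead of the whole run failing.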

Why It Matters

Asynchronous file processing is a powerful technique for modern applications that deal with large files or datasets. By processing files concurrently, you optimize resource usage and make your applications more efficient. This is especially valuable in systems such as web servers, data analysis applications, or file-processing services, where responsiveness is critical.

With CompletableFuture, you can design systems that remain responsive even under heavy workloads.

Now that you understand how asynchronous file processing works, let’s move to the practice section to see how this code performs in action and apply similar concepts in real-world scenarios.
