What is a CompletableFuture?

What is a CompletableFuture?

Java 8 introduced the CompletableFuture class. Along with the Future interface, it also implemented the CompletionStage interface. This interface defines the contract for an asynchronous computation step that we can combine with other steps.

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html

CompletableFuture is used for asynchronous programming in Java. Asynchronous programming is a means of writing non-blocking code by running a task on a separate thread than the main application thread and notifying the main thread about its progress, completion or failure. This way, your main thread does not block/wait for the completion of the task and it can execute other tasks in parallel. Having this kind of parallelism greatly improves the performance of your programs.

CompletableFuture is at the same time a building block and a framework, with about 50 different methods for

  1. composing,
  2. combining, and executing asynchronous computation steps and
  3. handling errors.

Such a large API can be overwhelming, but these mostly fall in several clear and distinct use cases.

Since the CompletableFuture class implements the CompletionStage interface, we first need to understand the contract of that interface. It represents a stage of a certain computation which can be done either synchronously or asynchronously. You can think of it as just a single unit of a pipeline of computations that ultimately generates a final result of interest. This means that several CompletionStages can be chained together so that one stage’s completion triggers the execution of another stage, which in turn triggers another, and so on.

In addition to implementing the CompletionStage interface, CompletableFuture also implements Future, which represents a pending asynchronous event, with the ability to explicitly complete this Future, hence the name CompletableFuture.

CompletableFuture collects all the features of ListenableFuture in Guava (https://github.com/google/guava/wiki/listenablefutureexplained) with SettableFuture (https://github.com/google/guava/blob/master/guava/src/com/google/common/util/concurrent/SettableFuture.java).

Moreover built-in lambda support brings it closer to Scala/Akka futures. Akka toolkit

Reading material

  1. https://www.nurkiewicz.com/2013/05/java-8-definitive-guide-to.html?m=1
  2. https://www.baeldung.com/java-completablefuture

Implementation

https://github.com/explorer436/programming-playground/tree/main/java-playground/my-implementations/java-multithreading-and-concurrency-demo/04-java-concurrency-api/src/main/java/com/my/company

Using CompletableFuture with predefined values/results

The static utility method is completedFuture(value) that returns already completed Future object. Might be useful for testing or when writing some adapter layer.

The simplest case creates an already completed CompletableFuture with a predefined result. Usually, this may act as the starting stage in your computation.

If we already know the result of a computation, we can use the static completedFuture method with an argument that represents a result of this computation. Consequently, the get method of the Future will never block, immediately returning this result instead.

public CompletableFuture<String> returnCompletedFuture() throws InterruptedException {

    CompletableFuture<String> completableFuture = CompletableFuture.completedFuture("Already completed!");

    return completableFuture;

}

Using CompletableFuture as a simple Future

First of all, the CompletableFuture class implements the Future interface. So, we can use it as a Future implementation, but with additional completion logic.

For example, we can create an instance of CompletableFuture out of thin air with a no-arg constructor to represent some future result, and give it to our clients, and complete it at some time in the future using the complete method. The consumers may use the get method to block the current thread until this result is provided.

public CompletableFuture<String> ask() {

    final CompletableFuture<String> future = new CompletableFuture<>();
    //...
    return future;

}

Notice that this future is not associated with any Callable<String>, no thread pool, no asynchronous job. If now the client code calls ask().get() it will block forever. If it registers some completion callbacks, they will never fire.

How will this complete?

Without using a computation in another thread

future.complete("42")

…and at this very moment all clients blocked on Future.get() will get the result string. Also completion callbacks will fire immediately. This comes quite handy when you want to represent a task in the future, but not necessarily computational task running on some thread of execution.

By using a computation in another thread

We can spin off a computation in another thread and returns the Future immediately.

When the computation is done, the method completes the Future by providing the result to the complete method.

To spin off the computation, we use the Executor API. This method of creating and completing a CompletableFuture can be used together with any concurrency mechanism or API, including raw threads.

public Future<String> calculateHelloAsync() throws InterruptedException {

    CompletableFuture<String> completableFuture = new CompletableFuture<>();

    // CompletableFuture.complete() method is used to manually complete a Future.
    // The get() method blocks until the Future is complete. So, if the complete() is not implemented, the get call will be blocked forever.
    Executors.newCachedThreadPool().submit(() -> {
        Thread.sleep(1000 * 2);
        completableFuture.complete("Hello");
        return null;
    });

    return completableFuture;
}

get()

The CompletableFuture.get() method is blocking. It waits until the Future is completed and returns the result after its completion.

getNow()

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html#getNow-T-

The getNow(null) returns the result if completed (which is obviously the case), but otherwise returns null (the argument).

There is CompletableFuture.getNow(valueIfAbsent) method that doesn’t block but if the Future is not completed yet, returns default value. Useful when building robust systems where we don’t want to wait too much.

join()

The CompletableFuture.join() method is similar to the get method, but it throws an unchecked exception in case the Future does not complete normally.

This makes it possible to use it as a method reference in the Stream.map() method.

Cancelling a CompletableFuture

We can cancel a computation via the cancel(boolean mayInterruptIfRunning) method from the Future interface. For CompletableFuture, the boolean parameter is not used because the implementation does not employ interrupts to do the cancellation.

Instead, cancel() is equivalent to completeExceptionally(new CancellationException()).

Error handling

Throwing errors

To complete the future with an exception, use the completeExceptionally().

public CompletableFuture<Object> calculateNameAsync_throwException(String name) {
        CompletableFuture<Object> completableFuture = new CompletableFuture<>();

        if (StringUtils.isEmpty(name)) {
            completableFuture.completeExceptionally(new RuntimeException("Calculation failed!"));
        } else {
            completableFuture = CompletableFuture.supplyAsync(() -> {
                return "Hello, " + name + "!";
            });
        }

        return completableFuture;
}

Catching and Handling errors

For error handling in a chain of asynchronous computation steps, we have to adapt the throw/catch idiom in a similar fashion.

Instead of catching an exception in a syntactic block, the CompletableFuture class allows us to handle it in a special handle method. It is called whether or not an exception occurs. This method receives two parameters: a result of a computation (if it finished successfully), and the exception thrown (if some computation step did not complete normally). If an exception occurs, then the res argument will be null, otherwise, the ex argument will be null.

Use the handle method to provide a default value when the asynchronous computation does not finish because of an error.

public CompletableFuture<String> calculateNameAsync_handleExceptionWithADefaultValue(String name) {
        CompletableFuture<String> completableFuture
                = CompletableFuture.supplyAsync(() -> {
            if (StringUtils.isEmpty(name)) {
                throw new RuntimeException("Computation error!");
            }
            return "Hello, " + name + "!";
        }).handle((s, t) -> s != null ? s : "Hello, Stranger!");

        return completableFuture;
}

Wrapping existing tasks/functionality with CompletableFuture… supplyAsync and runAsync

This is the alternative to the manual creation of ComputableFutures we have seen above.

This is done using supplyAsync and runAsync methods.

What if we want to simply execute some code asynchronously?

Static methods runAsync and supplyAsync allow us to create a CompletableFuture instance out of Runnable and Supplier functional types correspondingly.

Both Runnable and Supplier are functional interfaces that allow passing their instances as lambda expressions.

The Runnable interface is the same old interface that is used in threads and it does not allow to return a value. It is useful for tasks that don’t return anything.

The Supplier interface is a generic functional interface with a single method that has no arguments and returns a value of a parameterized type. It is useful if you want to return some result from your background task

This allows us to provide an instance of the Supplier as a lambda expression that does the calculation and returns the result.

Difference between supplyAsync and runAsync

  1. Supplier
  2. https://www.baeldung.com/java-completablefuture-runasync-supplyasync

Attaching callbacks to CompletableFutures

The CompletableFuture.get() method is blocking. It waits until the Future is completed and returns the result after its completion. But, that is not what we want. For building asynchronous systems we should be able to attach a callback to the CompletableFuture which should automatically get called when the Future completes. That way, we won’t need to wait for the result, and we can write the logic that needs to be executed after the completion of the Future inside our callback function. You can attach a callback to the CompletableFuture using thenApply(), thenAccept() and thenRun() methods

Transforming and acting on one (or a sequence of) CompletableFuture… thenApply

CompletableFuture is superior to Future but you haven’t yet seen why? Simply put, it’s because CompletableFuture is a monad and a functor.

Not helping I guess? Both Scala and JavaScript allow registering asynchronous callbacks when future is completed. We don’t have to wait and block until it’s ready. We can simply say: run this function on a result, when it arrives. Moreover, we can stack such functions, combine multiple futures together, etc. For example, if we have a function from String to Integer we can turn CompletableFuture<String> to CompletableFuture<Integer without unwrapping it. This is achieved with thenApply() family of methods:

<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);

Note the behavioral keywords in thenApply()

  1. then means that the action of this stage happens when the current stage completes normally (without an exception).
  2. Apply means the returned stage will apply a Function on the result of the previous stage.

How it works:

  1. The thenApply() method accepts a CompletableFunction instance, uses it to process the result, and returns a Future that holds a value returned by a function:
  2. Use thenApply() method to process and transform the result of a CompletableFuture when it arrives.
  3. It takes a Function<T,R> as an argument.
  4. Function<T,R> is a simple functional interface representing a function that accepts an argument of type T and produces a result of type R.

We can also write a sequence of transformations on the CompletableFuture by attaching a series of thenApply() callback methods.

CompletableFuture<String> f1 = //...
CompletableFuture<Integer> f2 = f1.thenApply(Integer::parseInt);
CompletableFuture<Double> f3 = f2.thenApply(r -> r * r * Math.PI);

The result of one thenApply() method is passed to the next in the series.

We can do it in one statement:

CompletableFuture<Double> f3 = f1.thenApply(Integer::parseInt).thenApply(r -> r * r * Math.PI);

We can do a sequence of transformations using thenApply(). These transformations are neither executed immediately, nor blocking. They are simply remembered. When original f1 completes, the next Function is executed. If some of the transformations are time-consuming, you can supply your own Executor to run them asynchronously.

This operation is equivalent to monadic map in Scala.

Running code on completion… thenAccept/thenRun

  1. These methods are consumers.
  2. These methods are often used as the last callback in the callback chain.
    1. If you don’t want to return anything from your callback function and just want to run some piece of code after the completion of the Future, then you can use thenAccept() and thenRun() methods.
    2. If we don’t need to return a value down the Future chain, we can use an instance of the Consumer functional interface. Its single method takes a parameter and returns void.
  3. They allow us to consume a future value when it’s ready.
  4. While thenAccept() provides the final value, thenRun() executes a Runnable which doesn’t even have access to computed value.
  5. If we neither need the value of the computation, nor want to return some value at the end of the chain, then we can pass a Runnable lambda to the thenRun() method.
    1. While thenAccept() has access to the result of the CompletableFuture on which it is attached, thenRun() doesn’t even have access to the Future’s result.
    2. It takes a Runnable and returns CompletableFuture<Void>

Combining multiple CompletableFutures together

Asynchronous processing of one CompletableFuture is nice but it really shows its power when multiple such futures are combined together in various ways.

chaining two/more futures sequentially.. thenCompose()

Sometimes you want to run some function on future’s value (when it’s ready). But this function returns future as well. CompletableFuture is smart enough to understand that the result of our function should now be used as top-level future, as opposed to CompletableFuture<CompletableFuture<T>>.

We can combine CompletableFuture instances in a chain of computation steps. The result of this chaining is itself a CompletableFuture that allows further chaining and combining. This approach is ubiquitous in functional languages and is often referred to as a monadic design pattern.

The thenCompose method, together with thenApply, implement basic building blocks of the monadic pattern. They closely relate to the map and flatMap methods of Stream and Optional classes that are available in Java 8.

Both methods receive a function and apply it to the computation result, but the thenCompose (flatMap) method receives a function that returns another object of the same type.

This functional structure allows composing the instances of these classes as building blocks.

thenCompose() is an essential method that allows building robust, asynchronous pipelines, without blocking or waiting for intermediate steps.

We use the thenCompose() to chain two/more Futures sequentially. This method takes a function that returns a CompletableFuture instance. The argument of this function is the result of the previous computation step. This allows us to use this value inside the next CompletableFuture‘s lambda.

Transforming values of two futures that run in Parallel… thenCombine()

While thenCompose() is used to chain one future dependent on the other, thenCombine combines two independent futures when they are both done.

If we want to execute two independent Futures and do something with their results after both are complete, we can use the thenCombine method that accepts a Future and a Function with two arguments to process both results.

Imagine you have two CompletableFutures, one that loads Customer and other that loads nearest Shop. They are completely independent from each other, but when both of them are completed, you want to use their values to calculate Route. Here is a stripped example:

CompletableFuture<Customer> customerFuture = loadCustomerDetails(123);
CompletableFuture<Shop> shopFuture = closestShop();
CompletableFuture<Route> routeFuture = customerFuture.thenCombine(shopFuture, (cust, shop) -> findRoute(cust, shop));

private Route findRoute(Customer customer, Shop shop) {
  //...
}

Waiting for both CompletableFutures to complete… thenAcceptBoth()

If instead of producing new CompletableFuture combining both results we simply want to be notified when they finish, we can use thenAcceptBoth()/runAfterBoth() family of methods. They work similarly to thenAccept() and thenRun() but wait for two futures instead of one.

Imagine that in the example above, instead of producing new CompletableFuture<Route> you simply want send some event or refresh GUI immediately. This can be easily achieved with thenAcceptBoth():

customerFuture.thenAcceptBoth(shopFuture, (cust, shop) -> {
    final Route route = findRoute(cust, shop);
    //refresh GUI with route
});

Maybe, some of you are asking themselves a question: why can’t I simply block on these two futures? Like here:

Future<Customer> customerFuture = loadCustomerDetails(123);
Future<Shop> shopFuture = closestShop();
findRoute(customerFuture.get(), shopFuture.get());

Well, of course you can. But the whole point of CompletableFuture is to allow asynchronous, event driven programming model instead of blocking and eagerly waiting for result. So functionally two code snippets above are equivalent, but the latter unnecessarily occupies one thread of execution.

Waiting for first CompletableFuture to complete… acceptEither, runAfterEither

This can come handy when you have two tasks yielding result of the same type and you only care about response time, not which task resulted first.

As an example say you have two systems you integrate with. One has smaller average response times but high standard deviation. Other one is slower in general, but more predictable. In order to take best of both worlds (performance and predictability) you call both systems at the same time and wait for the first one to complete. Normally it will be the first one, but in case it became slow, second one finishes in an acceptable time:

CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
fast.acceptEither(predictable, s -> {
    System.out.println("Result: " + s);
});

s represents String reply either from fetchFast() or from fetchPredictably(). We neither know nor care.

Transforming first completed… applyToEither

While acceptEither() simply calls some piece of code when faster of two futures complete, applyToEither() will return a new future. This future will complete when first of the two underlying futures complete.

<U> CompletableFuture<U> applyToEither(CompletableFuture<? extends T> other, Function<? super T,U> fn)

The extra fn function is invoked on the result of first future that completed.

Doubt: What is the difference between this and

fast.applyToEither(predictable).thenApply(fn)
CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
CompletableFuture<String> firstDone = fast.applyToEither(predictable, Function.<String>identity());

firstDone future can then be passed around. Notice that from the client perspective the fact that two futures are actually behind firstDone is hidden. Client simply waits for future to complete and applyToEither() takes care of notifying the client when any of the two finish first.

Scale to arbitrary number of futures

static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

allOf() takes an array of futures and returns a future that completes when all of the underlying futures are completed (barrier waiting for all). anyOf() on the other hand will wait only for the fastest of the underlying futures.

Running Multiple Futures in Parallel

When we need to execute multiple Futures in parallel, we usually want to wait for all of them to execute and then process their combined results. The CompletableFuture.allOf static method allows to wait for completion of all of the Futures provided as a var-arg. It is used in scenarios when you have a List of independent futures that you want to run in parallel and do something after all of them are complete.

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "World");

CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3);

combinedFuture.get();

assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());

The limitation of this method is that it does not return the combined results of all Futures. Instead, we have to get results from Futures manually. Fortunately, CompletableFuture.join() method and Java 8 Streams API makes it simple:

String combined = Stream.of(future1, future2, future3).map(CompletableFuture::join).collect(Collectors.joining(" "));

assertEquals("Hello Beautiful World", combined);

anyOf()

  1. CompletableFuture.anyOf() as the name suggests, returns a new CompletableFuture which is completed when any of the given CompletableFutures complete, with the same result.
  2. CompletableFuture.anyOf() takes a var-args of Futures and returns CompletableFuture<Object>.
  3. The problem with CompletableFuture.anyOf() is that if you have CompletableFutures that return results of different types, then you won’t know the type of your final CompletableFuture.

Difference Between thenApply() and thenCompose()

Both of these help chain different CompletableFuture calls, but the usage is different.

So if the idea is to chain CompletableFuture methods, then it’s better to use thenCompose().

The difference between these two methods is analogous to the difference between map() and flatMap().

  1. https://www.baeldung.com/java-completablefuture#Combining-1
  2. https://stackoverflow.com/questions/43019126/completablefuture-thenapply-vs-thencompose

thenApply()

We can use this method to work with the result of the previous call. However, a key point to remember is that the return type will be combined of all calls.

So this method is useful when we want to transform the result of a CompletableFuture call.

import java.util.concurrent.*;
import java.util.function.Function;

public class AttachingACallbackToACompletableFutureUsingThenApply {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        AttachingACallbackToACompletableFutureUsingThenApply classUnderTest = new AttachingACallbackToACompletableFutureUsingThenApply();

        System.out.println(classUnderTest.getHelloCompletableFuture().thenApply(appendWithWorld()).get()); // Hello World
    }

    private CompletableFuture<String> getHelloCompletableFuture() {
        return CompletableFuture.supplyAsync(() -> "Hello");
    }

    private static Function<String, String> appendWithWorld() {
        return s -> s + " World";
    }
}

thenCompose()

The thenCompose() is similar to thenApply() in that both return a new CompletionStage. However, thenCompose() uses the previous stage as the argument. It will flatten and return a Future with the result directly, rather than a nested future as we observed in thenApply():

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;

public class UsingThenCompose {

    public static void main(String[] args) throws InterruptedException, ExecutionException {

        UsingThenCompose classUnderTest = new UsingThenCompose();

        System.out.println(classUnderTest.calculateHelloWorldAsync_Sequential(classUnderTest.getHelloCompletableFuture()).get());
    }

    private CompletableFuture<String> getHelloCompletableFuture() {
        return CompletableFuture.supplyAsync(() -> "Hello");
    }

    public CompletableFuture<String> calculateHelloWorldAsync_Sequential (CompletableFuture<String> helloCompletableFuture) {
        return helloCompletableFuture.thenCompose(getStringCompletionStageFunction());
    }

    private static Function<String, CompletionStage<String>> getStringCompletionStageFunction() {
        return s -> CompletableFuture.supplyAsync(() -> s + " World");
    }
}

CompletableFuture Async methods

Tags

  1. Java Future vs CompletableFuture