Here we will discuss CompletableFuture in the Java programming language. Using the CompletableFuture API, we can complete tasks explicitly, in an ad hoc manner.

A Future represents the pending result of an asynchronous computation. It offers a method — get — that returns the result of the computation when it’s done.

The problem is that a call to get() blocks the calling thread until the computation is done. This is quite restrictive and can quickly make the asynchronous computation pointless.
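
To make the blocking behaviour concrete, here is a minimal sketch that uses a plain Future with an ExecutorService. The class name BlockingFutureDemo, the slowAnswer() task and the 200 ms delay are assumptions made up for illustration; they are not part of the example discussed in this tutorial.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BlockingFutureDemo {

    // Hypothetical slow task: sleeps briefly, then returns a value.
    static Integer slowAnswer() throws InterruptedException {
        Thread.sleep(200);
        return 42;
    }

    static int fetch() {
        ExecutorService es = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = BlockingFutureDemo::slowAnswer;
            Future<Integer> future = es.submit(task);
            // No callback can be attached to a plain Future; the calling
            // thread waits here until the task has finished.
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Result : " + fetch());
    }
}
```

While get() is waiting, the caller cannot do anything with the result, which is the limitation that callbacks are meant to remove.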

CompletableFuture<T> implements Future<T> and makes it completable in an ad hoc manner. This is a big deal, considering how limited Future objects were before Java 8, the release that introduced CompletableFuture.

CompletableFuture has two main benefits:

  • It can be completed explicitly by calling the complete() method, without waiting synchronously for the computation. This lets you make a default or intermediate result available through the future even if the computation has not finished.
  • It lets you build a data-processing pipeline as a series of actions. Common patterns include creating a CompletableFuture from a task and building a chain of CompletableFutures.
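
The first benefit can be sketched with a future that has no task attached at all. The class name ManualCompletionDemo and the string values are illustrative assumptions; complete(), getNow() and join() are standard CompletableFuture methods.

```java
import java.util.concurrent.CompletableFuture;

class ManualCompletionDemo {

    static String demo() {
        // A future with no task attached; it stays incomplete until we complete it.
        CompletableFuture<String> cf = new CompletableFuture<>();

        // getNow() never blocks: it returns the fallback while cf is incomplete.
        String before = cf.getNow("pending");

        // complete() returns true only for the call that actually completes cf.
        boolean first = cf.complete("default value");
        boolean second = cf.complete("ignored");

        return before + " | " + first + " | " + second + " | " + cf.join();
    }

    public static void main(String[] args) {
        // prints: pending | true | false | default value
        System.out.println(demo());
    }
}
```

Only the first call to complete() has any effect, so a late result cannot overwrite a default that was already supplied.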

CompletableFuture implements Future and CompletionStage interfaces.

A CompletionStage is a promise that the computation will eventually be done. The great thing about CompletionStage is that it offers a vast selection of methods for attaching callbacks that are executed on completion. This way we can build systems in a non-blocking fashion.

CompletableFuture<Double> cf = CompletableFuture.supplyAsync(this::createRandomNumber);

supplyAsync() takes a Supplier containing the code we want to execute asynchronously: the createRandomNumber() method here, and the createId() method in the chained example that follows.

If you have worked with Future, you may wonder where the Executor has gone. You can still pass an Executor as the second argument to supplyAsync(). If you do not pass one, the task is submitted to the default ForkJoinPool.commonPool().
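
As a small sketch of the two-argument form, the code below runs the supplier on a custom pool and returns the name of the thread that executed it. The class name CustomExecutorDemo, the pool size and the thread name "my-pool-thread" are assumptions chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CustomExecutorDemo {

    static String runOnPool() {
        // Hypothetical pool whose threads carry a recognisable name.
        ExecutorService es = Executors.newFixedThreadPool(2, r -> new Thread(r, "my-pool-thread"));
        try {
            // The second argument makes supplyAsync() use our pool
            // instead of ForkJoinPool.commonPool().
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), es)
                    .join();
        } finally {
            es.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Ran on : " + runOnPool());
    }
}
```

Remember to shut down an executor you create yourself; unlike the common pool, its threads are not daemon threads by default and can keep the JVM alive.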

CompletableFuture<?> cf = CompletableFuture.supplyAsync(this::createId).thenApply(this::convert)
                .thenAccept(this::store);

thenAccept() is one of many ways to add a callback. It takes a Consumer, in our case a method called store, which handles the result of the preceding computation when it is done but does not return any value.

If a callback should return a value that is passed on to the next callback, use thenApply(). It takes a Function, in our example a method called convert, which turns the UUID produced by createId() into a String.

In the chain above, the callbacks are not submitted as separate tasks. Each one normally runs on the thread that completed the preceding stage (or on the calling thread, if that stage was already complete when the callback was attached). As a result, the last callback waits for the earlier ones to complete.
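
The difference between the two callbacks can be seen in a short, self-contained sketch. The class name CallbackChainDemo and the string values are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;

class CallbackChainDemo {

    // thenApply() transforms the value and passes the result on to the next stage.
    static int greetingLength() {
        return CompletableFuture
                .supplyAsync(() -> "hello")
                .thenApply(s -> s + " world")
                .thenApply(String::length)
                .join();
    }

    // thenAccept() consumes the value; the resulting future carries no value.
    static Void consumeGreeting() {
        CompletableFuture<Void> cf = CompletableFuture
                .supplyAsync(() -> "hello")
                .thenAccept(s -> System.out.println("stored : " + s));
        return cf.join();
    }

    public static void main(String[] args) {
        System.out.println(greetingLength());  // 11
        System.out.println(consumeGreeting()); // null
    }
}
```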

Now look at the example below, where each message is submitted as a separate task to the ForkJoinPool.commonPool(). Both sendURL() callbacks are executed once the preceding calculation (findURL) is done.

CompletableFuture<String> cf = CompletableFuture.supplyAsync(this::findURL);
//execute the below task on separate thread on completion of previous task cf
CompletableFuture<String> resp1 = cf.thenApplyAsync(this::sendURL);
//execute the below task on separate thread on completion of previous task cf
CompletableFuture<String> resp2 = cf.thenApplyAsync(this::sendURL);
cf.thenAccept(System.out::println);
resp1.thenAccept(System.out::println);
resp2.thenAccept(System.out::println);

The key point is that the asynchronous version can be convenient when you have several callbacks that depend on the same computation.

Sometimes things go wrong, as you may already have experienced when working with Future. CompletableFuture handles such situations in a nice way, using exceptionally().
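
A compact sketch of this fan-out pattern, which waits for both dependent results so they can be inspected, might look as follows. The class name FanOutDemo and the "mail:" and "sms:" prefixes are made-up placeholders.

```java
import java.util.concurrent.CompletableFuture;

class FanOutDemo {

    static String fanOut() {
        CompletableFuture<String> url = CompletableFuture.supplyAsync(() -> "roytuts.com");

        // Both callbacks depend on the same future; the Async suffix submits
        // each one as its own task once url has completed.
        CompletableFuture<String> mail = url.thenApplyAsync(s -> "mail:" + s);
        CompletableFuture<String> sms = url.thenApplyAsync(s -> "sms:" + s);

        // join() waits for each dependent result.
        return mail.join() + ", " + sms.join();
    }

    public static void main(String[] args) {
        // prints: mail:roytuts.com, sms:roytuts.com
        System.out.println(fanOut());
    }
}
```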

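exceptionally() gives us a chance to recover by taking an alternative function that is executed if the preceding calculation fails with an exception. This way, succeeding callbacks can continue with the alternative result as input. Note that in the complete example further below, notify(Throwable) rethrows the exception instead of returning an alternative value, so no recovery takes place there and cf.get() throws an ExecutionException.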

CompletableFuture<?> cf = CompletableFuture.supplyAsync(this::failureMsg).exceptionally(ex -> notify(ex))
                .thenAccept(this::notify);
cf.get();
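
For comparison, here is a minimal sketch in which the handler really does recover by returning a fallback value. The class name RecoveryDemo, the exception message and the "fallback" value are assumptions chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;

class RecoveryDemo {

    static String recover() {
        return CompletableFuture
                .<String>supplyAsync(() -> {
                    // Simulated failure of the asynchronous computation.
                    throw new IllegalStateException("lookup failed");
                })
                // Runs only if the previous stage failed; its return value
                // becomes the input of the following callback.
                .exceptionally(ex -> "fallback")
                .thenApply(s -> "Result : " + s)
                .join();
    }

    public static void main(String[] args) {
        // prints: Result : fallback
        System.out.println(recover());
    }
}
```

Because the handler returns normally, the rest of the chain continues as if the computation had produced "fallback".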

Sometimes you need to create a callback that is dependent on the result of two computations. This is where thenCombine() comes into the picture.

CompletableFuture<Double> cfNum = CompletableFuture.supplyAsync(this::createRandomNumber);
CompletableFuture<String> cfUrl = CompletableFuture.supplyAsync(this::findURL);
CompletableFuture<String> resp = cfNum.thenCombine(cfUrl, this::notify);

First, we start two asynchronous jobs: creating a random number and finding a URL. Then we use thenCombine() to define, through our method notify, what to do with the results of these two computations.
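
The same idea with fixed values, so that the result is predictable, can be sketched like this. The class name CombineDemo and the constant 7 are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;

class CombineDemo {

    static String combine() {
        CompletableFuture<Integer> num = CompletableFuture.supplyAsync(() -> 7);
        CompletableFuture<String> url = CompletableFuture.supplyAsync(() -> "roytuts.com");

        // The BiFunction runs only after both futures have completed.
        return num.thenCombine(url, (n, u) -> u + "," + n).join();
    }

    public static void main(String[] args) {
        // prints: Combine : roytuts.com,7
        System.out.println("Combine : " + combine());
    }
}
```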

We have covered the scenario where further processing depends on two computations. But what if you only need the result of whichever one finishes first? That is what acceptEither() is for.

CompletableFuture<String> cfName = CompletableFuture.supplyAsync(this::findName);
CompletableFuture<String> cfUrl = CompletableFuture.supplyAsync(this::findURL);
cfName.acceptEither(cfUrl, this::sendMsg);
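
With acceptEither(), the callback receives the result of whichever future completes first. The sketch below makes the outcome deterministic by pairing a future that is never completed with one that is; the class name EitherDemo and the AtomicReference used to capture the value are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class EitherDemo {

    static String firstResult() {
        // Never completed in this sketch, so it can never be the first to finish.
        CompletableFuture<String> never = new CompletableFuture<>();
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> "roytuts.com");

        AtomicReference<String> seen = new AtomicReference<>();
        // The Consumer is called once, with the result of the first future to complete.
        never.acceptEither(fast, seen::set).join();
        return seen.get();
    }

    public static void main(String[] args) {
        // prints: The message : roytuts.com
        System.out.println("The message : " + firstResult());
    }
}
```

When both futures can complete, as in a real race, which result reaches the callback depends on timing.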

The complete example of CompletableFuture is given below:

package com.roytuts.java.completablefuture;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CompletableFutureApi {

	public static void main(String[] args) throws InterruptedException, ExecutionException {
		CompletableFutureApi example = new CompletableFutureApi();

		// chain example
		example.exampleOp();

		// explicitly complete
		example.completeOp();

		// return known value
		example.knownValueOp();

		// separate task using the async suffix
		example.separateTasksOp();

		// exceptionally
		// example.exceptionallyOp();
		// thenCombine
		example.combineOp();

		// acceptEither
		example.acceptEitherOp();
	}

	public void exampleOp() throws InterruptedException, ExecutionException {
		// CompletableFuture allows you to build a pipeline that is executed
		// asynchronously within the ForkJoinPool
		CompletableFuture<?> cf = CompletableFuture.supplyAsync(this::createId).thenApply(this::convert)
				.thenAccept(this::store);
		// block and wait for the result; thenAccept() yields no value, so
		// get() returns null here
		System.out.println("null : " + cf.get());
	}

	public void completeOp() throws InterruptedException, ExecutionException {
		CompletableFuture<Double> cf = CompletableFuture.supplyAsync(this::createRandomNumber);
		// if you want to control concurrency using ExecutorService, i.e., if
		// you do not want to submit the task to default
		// ForkJoinPool.commonPool(), then pass es as second argument to the
		// supplyAsync method
		/*
		 * ExecutorService es = Executors.newFixedThreadPool(2);
		 * CompletableFuture<Double> cf =
		 * CompletableFuture.supplyAsync(this::createRandomNumber, es);
		 */
		// explicitly complete and return default value if you do not want to
		// wait for the task to complete
		cf.complete(434345.8765);
		System.out.println("Random Number : " + cf.get());
	}

	public void knownValueOp() throws InterruptedException, ExecutionException {
		// create a completed CompletableFuture in advance that returns a known
		// value
		// this might come in handy in testing environment, in case you will
		// want to combine that known value with one that needs to be computed
		CompletableFuture<String> cf = CompletableFuture.completedFuture("I'm done");
		cf.isDone(); // returns true
		cf.join(); // returns "I'm done"
		System.out.println("Known value : " + cf.get());
	}

	public void separateTasksOp() throws InterruptedException, ExecutionException {
		CompletableFuture<String> cf = CompletableFuture.supplyAsync(this::findURL);
		// execute the below task on separate thread on completion of previous
		// task cf
		CompletableFuture<String> resp1 = cf.thenApplyAsync(this::sendURL);
		// execute the below task on separate thread on completion of previous
		// task cf
		CompletableFuture<String> resp2 = cf.thenApplyAsync(this::sendURL);
		cf.thenAccept(System.out::println);
		// cf.thenAcceptAsync(System.out::println);
		resp1.thenAccept(System.out::println);
		// resp1.thenAcceptAsync(System.out::println);
		resp2.thenAccept(System.out::println);
		// resp2.thenAcceptAsync(System.out::println);
	}

	public void exceptionallyOp() throws InterruptedException, ExecutionException {
		CompletableFuture<?> cf = CompletableFuture.supplyAsync(this::failureMsg).exceptionally(ex -> notify(ex))
				.thenAccept(this::notify);
		cf.get();
	}

	public void combineOp() throws InterruptedException, ExecutionException {
		// what we want to do with the result of these two computations
		CompletableFuture<Double> cfNum = CompletableFuture.supplyAsync(this::createRandomNumber);
		CompletableFuture<String> cfUrl = CompletableFuture.supplyAsync(this::findURL);
		CompletableFuture<String> resp = cfNum.thenCombine(cfUrl, this::notify);
		System.out.println("Combine : " + resp.get());
	}

	public void acceptEitherOp() throws InterruptedException, ExecutionException {
		// what we want to do with the result of any of the two computations
		CompletableFuture<String> cfName = CompletableFuture.supplyAsync(this::findName);
		CompletableFuture<String> cfUrl = CompletableFuture.supplyAsync(this::findURL);
		// wait for the callback, otherwise main() may return before it has run
		cfName.acceptEither(cfUrl, this::sendMsg).get();
	}

	private Double createRandomNumber() {
		return Math.random();
	}

	private UUID createId() {
		return UUID.randomUUID();
	}

	private String convert(UUID input) {
		return input.toString();
	}

	private String findURL() {
		return "roytuts.com";
	}

	private String findName() {
		return "Roy Tutorials";
	}

	private String sendURL(String url) {
		return "Sending " + url + " to destination";
	}

	private String failureMsg() {
		throw new RuntimeException("Failed due to exception");
	}

	private String notify(Throwable t) {
		throw new RuntimeException(t.getMessage());
	}

	private void notify(String msg) {
		System.out.println("The message : " + msg);
	}

	private void sendMsg(String msg) {
		System.out.println("The message : " + msg);
	}

	private String notify(Double num, String msg) {
		return msg + "," + num;
	}

	private void store(String message) {
		System.out.println("message : " + message);
	}

}

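A sample run of the above code produces the output below. The UUID and the random number differ on every run, the order of the lines printed by asynchronous callbacks may vary, and acceptEither() may report the URL instead of the name, depending on which future completes first.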

message : 2ac0786b-8768-4368-b9cb-a2c3e407036b
null : null
Random Number : 434345.8765
Known value : I'm done
roytuts.com
Sending roytuts.com to destination
Sending roytuts.com to destination
Combine : roytuts.com,0.8346270693364803
The message : Roy Tutorials

Thanks for reading.
