Showing posts with label promise. Show all posts
Showing posts with label promise. Show all posts

06 February 2017

CompletableFuture.allOf that doens't return Void

The following is an implementation for CompletableFuture.allOf does return the result of executing all the CompletableFuture, instead of void.

I take it from here, actually many other places provide this implementation.

What I want to add is a test case and talking a little bit about how its working:
1
2
3
4
5
6
static CompletableFuture<List<?>> allOf(CompletableFuture<?>... cfs) {
    return CompletableFuture.allOf(cfs)
          .thenApply(ignore -> Stream.of(cfs)
                .map(cf -> cf.join())
                .collect(Collectors.toList()));
}
The idea is:

At line 2; we use the JDK method allOf to execute all CompletableFutures passed in. (clear enough, otherwise see the javadoc)
At line 3; we use thenApply, on the result CompletableFuture that is (from javadoc) :
"returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function"
Which means, when the allOf finish execution, the supplier parameter of thenApply will start execution.
Note, the parameter of the supplier is variable ignored, because we don't need it as it is of type Void.

At line 4, 5: what we need to do is to iterate over the completed CompletableFutures (that is granteed to be completed, as I said above) and get the result and aggregate them in a list, so the join here should not wait, as the result is already there.

Now we have a CompletableFuture of list of aggregated results that we can use. (see the test case below)

Test case:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static void main(String[] args) {

 // we have 3 (or any number) of CompletableFutures
 CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {sleep(1); return "HELLO";});
 CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {sleep(1); return 10;});
 CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(() -> {sleep(1); return 20d;});

 CompletableFuture<List<?>> allOf = allOf(cf1, cf2, cf3); //we call the method we just created above

 // we can get the -already there - result either using then
 // Get result using:
 allOf.thenAccept(l -> l.forEach(System.out::println));
 
 // or using CompletableFuture.join() (or CompletableFuture.get())
 // OR (non-typesafe)
 String s = (String) allOf.join().get(0);
 Integer i = (Integer) allOf.join().get(1);
 Double d = (Double) allOf.join().get(2);
 System.out.println(s + ", " + i + ", "+ d);
   
 sleep(2); // because default CompletableFuture Executor is a daemon-thread based executor
}

private static void sleep(int seconds) {
 try {
  TimeUnit.SECONDS.sleep(seconds);
 } catch (InterruptedException e) {
  e.printStackTrace();
 }
}

 
Thanks.

09 January 2017

CompletableFuture cheat sheet

I've divided CompletableFuture methods into groups for me easy to remember

Beside the static factory methods that creates a Completable Future instance, here are the following groups

methodparameter expressionsimilar to
// basic
run () -> {}
accept x -> {}
apply x -> y map
compose x -> f2<y>flatMap
// both
run x, f2<?>, () -> {}
accept x, f2<y>, x, y -> {}
combine x, f2<y>, x, y -> z
// either
run x, f2<?>, () -> {}
accept x, f2<x>, (x) -> {}
apply x, f2<x>, (x) -> y
// exceptions
exceptionally x, (ex) -> x
whenComplete x, (x, ex) -> {}
handle x, (x, ex) -> y

We have 4 groups here, basic, both, either and exceptions, and each method of the above can have 3 versions (the main function and one that run async and third one that run async with custom user-provided Executor).

Preface

In functional programming, we have 4 kinds of functional interfaces ( that represents mainly the functions)

void fn ()         => in java called Runnable (implementations of java.lang.Runnable)
void fu (T t)    => in java called Consumers (implementations of java.util.function.Consumer)
U fn (T t)        => in java called Functions (implementations of java.util.function.Function)
U fn()              => in java called Suppliers (implementations of java.util.function.Supplier)

The last type of functions (Suppliers) sometimes are represented by java.util.concurrent.Callable, but Callable returns checked Exception, so Suppliers are more suitable.

In completable future, user can stream over the result and apply different kind of operations on the result (typically as if you use java.util.Stream, and this I see the completable future should have some relation to the Stream interface, I write a little more about this here), and this operation takes a version of the first 3 versions of the functional interfaces above.

Basic Group

The first group, are basic methods, in which the parameter will run on the output object of the completable future instance, for example, apply method (thenApply) with the following signature:

<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)

This method applies the function in the parameter to the result of type T and return a new object of type U, (hence it is typical mapping function)

Note, the thenApply function has three version, the basic one that run on the same thread as the completable future that has just completes, and 2 version one Async and the other is Async with Executor .

Similar to thenApply are thenRun (3 versions as well) and thenAccept (3 version as well), but thenRun parameter is Runnable which means it expects no input (as result of the completable execution) and return no output.
thenAccept, expects the completable future result as input but returns no output

The last method in this group is thenCompose, which is more or less a flatMap function, which takes the result of the completable future as input and return a CompletableFuture of other type (3 versions as well of this method:

public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)

Note, CompletableFuture  implements CompletableFuture.

A good use case of thenCompose is when we have a mapping function that will return a CompletableFuture and if we use the regular map function (thenApply), the output will be CompletableFuture<CompletableFuture<U>>. (see the resources for an example).

In this group we could see 4 operations that utilize the first 3 types of functional interfaces, and other groups will keep the same way.

Both Group

The basic group contains the main idea of the whole methods of the CompletableFuture, if you understand it, it will be easy to understand other groups.

Both group is all about, execute the current completable future and then anther completable future (comes as a first parameter to the method), and then the result do with 1 one of 3 things:

Ignore the result and return no output (run)  {method runAfterBoth with 3 versions}
Take the result and return no output (accept) {method thenAcceptBoth with 3 versions}
Take the result and return a new output of different type (combine) (similar to apply in basic group) {method thenCombine with 3 versions}

for example here's the syntax of the thenCombine method:

<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)


the method combine the result of the current CompletableFuture with the result of the other CompletableFuture and send them to the function that takes T (type of current compeletable future), U (type of other completable future) and return new type V.

example usage for clarification:

CompletableFuture<String> current = .....
CompletableFuture<Integer> other = .....

current.thenCombine(other, (String s, Integer i) ->  0.99f);

Either Group

Either group is pretty much like the both group, but one of them once executed, the function will be called.

The thing to note here, the combine function name is apply, so why?

apply executed on one of them, combine takes the 2 result and return a new result of new type.

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)

So, current Completable Future and other completable future should be of same generic type. (T)

Exceptions Group

Exception group are operations that except the completable future will return an exception and will deal with it.

exceptionally: will register function what would happen if exception thrown (to return some value)
whenComplete: will register a consumer of result (might be null) or exception (might be null) (mutual exclusive)
handle: same as whenComplete but register a function instead of a consumer. (to return some value of new type)

simple example on handle:

CompletableFuture.supplyAsync(() -> 10).handle((x, ex)-> "hello" );

Note, although whenComplete takes a consumer function, it returns either the result or the exception thrown.


resources:

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html
http://www.nurkiewicz.com/2013/05/java-8-definitive-guide-to.html