06 February 2017

CompletableFuture.allOf that doesn't return Void

The following is an implementation of CompletableFuture.allOf that returns the results of all the given CompletableFutures, instead of Void.

I took it from here; many other places provide the same implementation.

What I want to add is a test case and a short explanation of how it works:
static CompletableFuture<List<?>> allOf(CompletableFuture<?>... cfs) {
    return CompletableFuture.allOf(cfs)
          .thenApply(ignore -> Stream.of(cfs)
                .map(cf -> cf.join())
                .collect(Collectors.toList()));
}
The idea is:

At line 2 (the CompletableFuture.allOf call), we pass all the given CompletableFutures to the JDK method allOf. It does not run them itself; it returns a new CompletableFuture<Void> that completes when all of the given futures have completed (see the javadoc).
At line 3, we call thenApply on that resulting CompletableFuture. From the javadoc, thenApply:
"returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function"
This means that when the future returned by allOf completes normally, the function passed to thenApply runs.
Note that the function's parameter is named ignore because we don't need it: its type is Void, so its value is always null.

At lines 4 and 5 (the map and collect calls), we iterate over the CompletableFutures, which are guaranteed to be complete at this point (as explained above), get each result with join, and collect the results in a list. The join call does not block here, because each result is already available.
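
The claim that join does not block can be checked with a small sketch. The class name JoinAfterAllOfDemo and the method name allOfChecked are made up for this illustration. It is the same helper with an extra isDone check, and it uses manually completed futures (rather than supplyAsync) so that the order of events is deterministic.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class JoinAfterAllOfDemo {

    // Same helper as in the post, plus a check that every future is already
    // done by the time the function passed to thenApply runs.
    // (allOfChecked is a hypothetical name used only for this sketch.)
    static CompletableFuture<List<?>> allOfChecked(CompletableFuture<?>... cfs) {
        return CompletableFuture.allOf(cfs)
              .thenApply(ignore -> Stream.of(cfs)
                    .map(cf -> {
                        if (!cf.isDone()) {
                            throw new IllegalStateException("future not complete yet");
                        }
                        return cf.join(); // returns immediately: cf is already done
                    })
                    .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<Integer> b = new CompletableFuture<>();
        CompletableFuture<List<?>> all = allOfChecked(a, b);

        System.out.println(all.isDone()); // false: nothing has completed yet
        a.complete("HELLO");
        System.out.println(all.isDone()); // false: b is still pending
        b.complete(10);
        System.out.println(all.join());   // [HELLO, 10]
    }
}
```

The aggregated future only completes once the last input future has completed, and the results keep the order of the arguments.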

Now we have a CompletableFuture holding a list of the aggregated results that we can use (see the test case below).
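
Because the helper returns a List<?>, callers get the elements back as Object and have to cast them. When all the futures share one result type, a generic variant can keep that type. The following is only a sketch under that assumption; the names TypedAllOfDemo and allOfTyped are hypothetical and not part of the JDK or of the original implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class TypedAllOfDemo {

    // Hypothetical generic variant: every future must produce the same type T.
    static <T> CompletableFuture<List<T>> allOfTyped(List<CompletableFuture<T>> cfs) {
        return CompletableFuture.allOf(cfs.toArray(new CompletableFuture<?>[0]))
              .thenApply(ignore -> cfs.stream()
                    .map(CompletableFuture::join) // does not block: all are done
                    .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> c1 = CompletableFuture.supplyAsync(() -> 1);
        CompletableFuture<Integer> c2 = CompletableFuture.supplyAsync(() -> 2);
        CompletableFuture<Integer> c3 = CompletableFuture.supplyAsync(() -> 3);
        List<CompletableFuture<Integer>> cfs = Arrays.asList(c1, c2, c3);

        List<Integer> results = allOfTyped(cfs).join();
        int sum = 0;
        for (int n : results) {
            sum += n; // no cast needed
        }
        System.out.println(results + " sum=" + sum); // [1, 2, 3] sum=6
    }
}
```

The trade-off is that this variant cannot mix a String, an Integer and a Double in one call, which the List<?> version allows.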

Test case:

public static void main(String[] args) {

 // we have 3 (or any number) of CompletableFutures
 CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {sleep(1); return "HELLO";});
 CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {sleep(1); return 10;});
 CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(() -> {sleep(1); return 20d;});

 CompletableFuture<List<?>> allOf = allOf(cf1, cf2, cf3); //we call the method we just created above

 // we can consume the result with a callback that runs once it is available:
 allOf.thenAccept(l -> l.forEach(System.out::println));
 
 // or block for it with CompletableFuture.join() (or CompletableFuture.get());
 // the casts are needed because the list is a List<?> (not type-safe)
 String s = (String) allOf.join().get(0);
 Integer i = (Integer) allOf.join().get(1);
 Double d = (Double) allOf.join().get(2);
 System.out.println(s + ", " + i + ", "+ d);
   
 sleep(2); // give the thenAccept callback time to finish: the default executor uses daemon threads, which do not keep the JVM alive
}

private static void sleep(int seconds) {
 try {
  TimeUnit.SECONDS.sleep(seconds);
 } catch (InterruptedException e) {
  Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
 }
}
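
As an alternative to a fixed sleep at the end of main, one can wait on the future that thenAccept itself returns, since it completes only after the callback has run. A minimal sketch of that idea follows; the class WaitForCallbackDemo and its helper methods are invented for this example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

class WaitForCallbackDemo {

    // Runs a callback on an async result and waits for the callback itself,
    // instead of sleeping for a fixed amount of time.
    static List<String> collect() {
        List<String> seen = new CopyOnWriteArrayList<>();
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
            pause(200);
            return "HELLO";
        });
        // thenAccept returns a new future that completes after the callback has run
        CompletableFuture<Void> callbackDone = cf.thenAccept(seen::add);
        callbackDone.join(); // blocks until the callback has finished
        return seen;
    }

    private static void pause(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(collect()); // [HELLO]
    }
}
```

This keeps the program from exiting before the callback has run, without guessing how long the work takes.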

 
Thanks.
