Showing posts with label threads. Show all posts
Showing posts with label threads. Show all posts

06 February 2017

CompletableFuture.allOf that doens't return Void

The following is an implementation for CompletableFuture.allOf does return the result of executing all the CompletableFuture, instead of void.

I take it from here, actually many other places provide this implementation.

What I want to add is a test case and talking a little bit about how its working:
1
2
3
4
5
6
static CompletableFuture<List<?>> allOf(CompletableFuture<?>... cfs) {
    return CompletableFuture.allOf(cfs)
          .thenApply(ignore -> Stream.of(cfs)
                .map(cf -> cf.join())
                .collect(Collectors.toList()));
}
The idea is:

At line 2; we use the JDK method allOf to execute all CompletableFutures passed in. (clear enough, otherwise see the javadoc)
At line 3; we use thenApply, on the result CompletableFuture that is (from javadoc) :
"returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function"
Which means, when the allOf finish execution, the supplier parameter of thenApply will start execution.
Note, the parameter of the supplier is variable ignored, because we don't need it as it is of type Void.

At line 4, 5: what we need to do is to iterate over the completed CompletableFutures (that is granteed to be completed, as I said above) and get the result and aggregate them in a list, so the join here should not wait, as the result is already there.

Now we have a CompletableFuture of list of aggregated results that we can use. (see the test case below)

Test case:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static void main(String[] args) {

 // we have 3 (or any number) of CompletableFutures
 CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {sleep(1); return "HELLO";});
 CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {sleep(1); return 10;});
 CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(() -> {sleep(1); return 20d;});

 CompletableFuture<List<?>> allOf = allOf(cf1, cf2, cf3); //we call the method we just created above

 // we can get the -already there - result either using then
 // Get result using:
 allOf.thenAccept(l -> l.forEach(System.out::println));
 
 // or using CompletableFuture.join() (or CompletableFuture.get())
 // OR (non-typesafe)
 String s = (String) allOf.join().get(0);
 Integer i = (Integer) allOf.join().get(1);
 Double d = (Double) allOf.join().get(2);
 System.out.println(s + ", " + i + ", "+ d);
   
 sleep(2); // because default CompletableFuture Executor is a daemon-thread based executor
}

private static void sleep(int seconds) {
 try {
  TimeUnit.SECONDS.sleep(seconds);
 } catch (InterruptedException e) {
  e.printStackTrace();
 }
}

 
Thanks.

31 August 2016

callback on thread completion

The code illustrate how callback is being accomplished on Thread completion:


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Executor {

  public <T> void execute(Callable<T> task, BiConsumer<T, ? super Exception> callback) {
     CallableThread<T> thread = new CallableThread<>(task, callback);
     thread.start();
  }

  class CallableThread<T> extends Thread {

     Callable<T> task;
     BiConsumer<T, ? super Exception> callback;

     public CallableThread(Callable<T> task, BiConsumer<T, ? super Exception> callback) {
        this.task = task;
        this.callback = callback;
     }

     public void run() {
        System.out.println("running task on thread : " + Thread.currentThread().getName());
        try {
            T t = task.call();
            callback.accept(t, null);
        } catch (Exception ex) {
            callback.accept(null, ex);
        }
     }
   }
 }

Caller:
1
2
3
4
5
6
7
8
System.out.println("running task on thread : " + Thread.currentThread().getName());

new Executor().execute(() -> "HELLO WORKD", (result, ex) -> {
    System.out.println("result: " + result);
    System.out.println("exception: " + ex);
});

System.out.println("finished running task on thread : " + Thread.currentThread().getName());


Output:
1
2
3
4
5
running task on thread : main
finished running task on thread : main
running task on thread : Thread-0
result: HELLO WORKD
exception: null

It appears that, the Executor is submitting the Callable to execution and on complete it invokes the callback functional interface.


31 January 2012

Servlets and thread safty

From Servlet 2.4 specs:

SRV.2.3.3.3
Thread Safety
Implementations of the request and response objects are not guaranteed to be thread
safe. This means that they should only be used within the scope of the request han-
dling thread.
References to the request and response objects should not be given to objects
executing in other threads as the resulting behavior may be nondeterministic. If
the thread created by the application uses the container-managed objects, such as
the request or response object, those objects must be accessed only within the
servlet’s service life cycle and such thread itself should have a life cycle within
the life cycle of the servlet’s service method because accessing those objects
after the service method ends may cause undeterministic problems. Be aware
that the request and response objects are not thread safe. If those objects were
accessed in the multiple threads, the access should be synchronized or be done
through the wrapper to add the thread safety, for instance, synchronizing the call
of the methods to access the request attribute, or using a local output stream for
the response object within a thread.