Showing posts with label streams. Show all posts

06 February 2017

CompletableFuture.allOf that doesn't return Void

The following is an implementation of CompletableFuture.allOf that returns the results of all the given CompletableFutures, instead of Void.

I took it from here; many other places provide the same implementation.

What I want to add is a test case and a short explanation of how it works:
static CompletableFuture<List<?>> allOf(CompletableFuture<?>... cfs) {
    return CompletableFuture.allOf(cfs)
          .thenApply(ignore -> Stream.of(cfs)
                .map(cf -> cf.join())
                .collect(Collectors.toList()));
}
The idea is:

At line 2, we use the JDK method allOf, which returns a new CompletableFuture that completes when all the CompletableFutures passed in have completed. (Clear enough; otherwise see the javadoc.)
At line 3, we call thenApply on the resulting CompletableFuture, which (from the javadoc):
"returns a new CompletionStage that, when this stage completes normally, is executed with this stage's result as the argument to the supplied function"
This means that when the future returned by allOf completes, the function passed to thenApply is executed.
Note that the function's parameter is named ignore, because we don't need it: it is of type Void.

At lines 4 and 5, we iterate over the CompletableFutures (which are guaranteed to be complete, as said above), get each result, and aggregate the results in a list. The join here should not block, as the result is already there.

Now we have a CompletableFuture of the list of aggregated results that we can use. (See the test case below.)

Test case:

public static void main(String[] args) {

 // we have 3 (or any number) of CompletableFutures
 CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {sleep(1); return "HELLO";});
 CompletableFuture<Integer> cf2 = CompletableFuture.supplyAsync(() -> {sleep(1); return 10;});
 CompletableFuture<Double> cf3 = CompletableFuture.supplyAsync(() -> {sleep(1); return 20d;});

 CompletableFuture<List<?>> allOf = allOf(cf1, cf2, cf3); //we call the method we just created above

 // we can consume the result, once it is available, with a callback:
 allOf.thenAccept(l -> l.forEach(System.out::println));
 
 // or block with CompletableFuture.join() (or CompletableFuture.get())
 // and cast each element (not type-safe)
 String s = (String) allOf.join().get(0);
 Integer i = (Integer) allOf.join().get(1);
 Double d = (Double) allOf.join().get(2);
 System.out.println(s + ", " + i + ", "+ d);
   
 sleep(2); // the default CompletableFuture executor uses daemon threads, so give the callback time to print
}

private static void sleep(int seconds) {
 try {
  TimeUnit.SECONDS.sleep(seconds);
 } catch (InterruptedException e) {
  e.printStackTrace();
 }
}
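
The casts in the test above are needed because the list is a List<?>. When all the futures share one result type, a typed variant can avoid them. The following is only a sketch under that assumption; the class name TypedAllOf and the method name allOfTyped are made up for this example and are not part of the JDK:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class TypedAllOf {

    // Hypothetical helper: same idea as allOf above, but for futures of one common type T.
    static <T> CompletableFuture<List<T>> allOfTyped(List<CompletableFuture<T>> cfs) {
        return CompletableFuture.allOf(cfs.toArray(new CompletableFuture<?>[0]))
                // every future is complete at this point, so join() does not block
                .thenApply(ignore -> cfs.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 1),
                CompletableFuture.supplyAsync(() -> 2),
                CompletableFuture.supplyAsync(() -> 3));

        // no cast needed: the element type is known to be Integer
        List<Integer> results = allOfTyped(futures).join();
        int sum = results.stream().mapToInt(Integer::intValue).sum();
        System.out.println(results + " sum=" + sum);
    }
}
```

The trade-off is that this variant cannot mix String, Integer and Double futures the way the List<?> version does.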

 
Thanks.

09 January 2017

CompletableFuture cheat sheet

I've divided the CompletableFuture methods into groups to make them easy for me to remember.

Besides the static factory methods that create a CompletableFuture instance, here are the groups:

method          parameter expression        similar to
// basic
run             () -> {}
accept          x -> {}
apply           x -> y                      map
compose         x -> f2<y>                  flatMap
// both
run             x, f2<?>, () -> {}
accept          x, f2<y>, (x, y) -> {}
combine         x, f2<y>, (x, y) -> z
// either
run             x, f2<?>, () -> {}
accept          x, f2<x>, (x) -> {}
apply           x, f2<x>, (x) -> y
// exceptions
exceptionally   x, (ex) -> x
whenComplete    x, (x, ex) -> {}
handle          x, (x, ex) -> y

We have 4 groups here: basic, both, either and exceptions. Each of the methods above comes in 3 versions: the main method, one that runs asynchronously, and a third that runs asynchronously on a custom, user-provided Executor.
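
To make the 3 versions concrete, here is a small sketch using thenApply. The class name ThreeVersions and the helper threeWays are invented for this example; only thenApply, thenApplyAsync and thenApplyAsync with an Executor come from the JDK:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreeVersions {

    // Applies a function through each of the 3 versions of thenApply and collects the results.
    static List<String> threeWays(int input, Executor executor) {
        CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> input);

        // 1. main version: runs on the completing thread, or on the caller if cf is already done
        CompletableFuture<String> plain = cf.thenApply(x -> "plain " + x);
        // 2. async version: runs on the default asynchronous executor
        CompletableFuture<String> async = cf.thenApplyAsync(x -> "async " + x);
        // 3. async version with a user-provided Executor
        CompletableFuture<String> custom = cf.thenApplyAsync(x -> "custom " + x, executor);

        return Arrays.asList(plain.join(), async.join(), custom.join());
    }

    public static void main(String[] args) {
        // a pool created only for this example
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            threeWays(10, pool).forEach(System.out::println);
        } finally {
            pool.shutdown();
        }
    }
}
```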

Preface

In functional programming, we have 4 kinds of functions, which Java represents with functional interfaces:

void fn()      => in Java called Runnables (implementations of java.lang.Runnable)
void fn(T t)   => in Java called Consumers (implementations of java.util.function.Consumer)
U fn(T t)      => in Java called Functions (implementations of java.util.function.Function)
U fn()         => in Java called Suppliers (implementations of java.util.function.Supplier)

The last kind of function (Suppliers) is sometimes represented by java.util.concurrent.Callable, but Callable.call() is declared to throw a checked Exception, so Suppliers are more suitable.
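
As a quick illustration of the 4 kinds, each one can be written as a lambda or method reference. This is just a sketch; the class FunctionKinds and its constant names are made up for the example:

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class FunctionKinds {

    // void fn(): no input, no output
    static final Runnable RUNNABLE = () -> System.out.println("no input, no output");
    // void fn(T t): input, no output
    static final Consumer<String> CONSUMER = s -> System.out.println("consumed " + s);
    // U fn(T t): input and output
    static final Function<String, Integer> FUNCTION = String::length;
    // U fn(): no input, output
    static final Supplier<String> SUPPLIER = () -> "supplied";

    public static void main(String[] args) {
        RUNNABLE.run();
        CONSUMER.accept("hello");
        System.out.println(FUNCTION.apply("hello"));
        System.out.println(SUPPLIER.get());
    }
}
```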

With a CompletableFuture, the user can chain different kinds of operations on the result (much as with java.util.stream.Stream, which is why I think CompletableFuture should have some relation to the Stream interface; I write a little more about this here). Each operation takes one of the first 3 kinds of functional interfaces above.

Basic Group

The first group contains the basic methods, in which the parameter runs on the output of the CompletableFuture instance. For example, the apply method (thenApply) has the following signature:

<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)

This method applies the function in the parameter to the result of type T and returns a new object of type U (hence it is a typical mapping function).

Note that thenApply has three versions: the basic one, which runs on the thread that completes the CompletableFuture (or on the calling thread if it has already completed), and 2 async versions, one plain Async and the other Async with an Executor.

Similar to thenApply are thenRun (3 versions as well) and thenAccept (3 versions as well). The thenRun parameter is a Runnable, which means it expects no input (the result of the CompletableFuture is ignored) and returns no output.
thenAccept expects the CompletableFuture result as input but returns no output.

The last method in this group is thenCompose, which is more or less a flatMap function: it takes the result of the CompletableFuture as input and returns a CompletableFuture of another type (3 versions of this method as well):

public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)

Note, CompletableFuture implements CompletionStage.

A good use case for thenCompose is when the mapping function itself returns a CompletableFuture: with the regular map function (thenApply), the output would be CompletableFuture<CompletableFuture<U>>. (See the resources for an example.)
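
The difference can be seen in a small sketch. The method findName is a hypothetical asynchronous lookup invented for this example; it stands in for any mapping function that returns a CompletableFuture:

```java
import java.util.concurrent.CompletableFuture;

public class ComposeVsApply {

    // Hypothetical async lookup: returns a future instead of a plain value.
    static CompletableFuture<String> findName(int id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> id = CompletableFuture.supplyAsync(() -> 42);

        // thenApply with a future-returning function nests the result
        CompletableFuture<CompletableFuture<String>> nested = id.thenApply(ComposeVsApply::findName);
        // thenCompose flattens it, like flatMap
        CompletableFuture<String> flat = id.thenCompose(ComposeVsApply::findName);

        System.out.println(nested.join().join());
        System.out.println(flat.join());
    }
}
```

Both lines print the same name; only the types differ, and the nested one needs two joins.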

In this group we have seen 4 operations that use the first 3 kinds of functional interfaces; the other groups follow the same pattern.

Both Group

The basic group contains the main idea behind all the CompletableFuture methods; if you understand it, the other groups will be easy to understand.

The both group is all about waiting for the current CompletableFuture and another CompletableFuture (passed as the first parameter to the method) to complete, and then doing 1 of 3 things with the results:

Ignore the results and return no output (run) {method runAfterBoth with 3 versions}
Take the results and return no output (accept) {method thenAcceptBoth with 3 versions}
Take the results and return a new output of a different type (combine) (similar to apply in the basic group) {method thenCombine with 3 versions}

For example, here's the signature of the thenCombine method:

<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)


The method combines the result of the current CompletableFuture with the result of the other CompletableFuture and passes them to a function that takes T (the type of the current CompletableFuture) and U (the type of the other CompletableFuture) and returns a new type V.

Example usage for clarification:

CompletableFuture<String> current = .....
CompletableFuture<Integer> other = .....

current.thenCombine(other, (String s, Integer i) ->  0.99f);
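
A runnable sketch of the whole both group might look like the following. The class BothGroup, the helper label and the sample values are assumptions made only for illustration:

```java
import java.util.concurrent.CompletableFuture;

public class BothGroup {

    // combine: T = String, U = Integer, V = String
    static CompletableFuture<String> label(CompletableFuture<String> name, CompletableFuture<Integer> age) {
        return name.thenCombine(age, (n, a) -> n + " is " + a);
    }

    public static void main(String[] args) {
        CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "Ali");
        CompletableFuture<Integer> age = CompletableFuture.supplyAsync(() -> 20);

        // combine: use both results and produce a new value
        System.out.println(label(name, age).join());
        // accept: use both results, produce nothing
        name.thenAcceptBoth(age, (n, a) -> System.out.println(n + "/" + a)).join();
        // run: ignore both results, just react to completion
        name.runAfterBoth(age, () -> System.out.println("both done")).join();
    }
}
```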

Either Group

The either group is pretty much like the both group, except that the function is called as soon as either one of the two completes.

The thing to note here is that the counterpart of combine is named apply. Why?

apply is executed on the result of one of them, whereas combine takes the 2 results and returns a new result of a new type.

public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)

So the current CompletableFuture and the other CompletableFuture should produce the same generic type (T).
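
As a sketch of applyToEither, the following races a slow future against a fast one. The class EitherGroup and the helper firstUpper are invented for the example, and which future wins depends on timing (here the fast one is expected to):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class EitherGroup {

    // Applies the function to whichever of the two results arrives first.
    static CompletableFuture<String> firstUpper(CompletableFuture<String> a, CompletableFuture<String> b) {
        return a.applyToEither(b, String::toUpperCase);
    }

    public static void main(String[] args) {
        CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "slow";
        });
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> "fast");

        // both futures are CompletableFuture<String>, as the signature requires
        System.out.println(firstUpper(slow, fast).join());
    }
}
```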

Exceptions Group

The exceptions group contains operations that expect that the CompletableFuture may complete with an exception, and deal with it.

exceptionally: registers a function that is called if an exception is thrown (to return some fallback value)
whenComplete: registers a consumer of the result (might be null) or the exception (might be null); the two are mutually exclusive
handle: same as whenComplete, but registers a function instead of a consumer (to return some value of a new type)

A simple example of handle:

CompletableFuture.supplyAsync(() -> 10).handle((x, ex)-> "hello" );

Note, although whenComplete takes a consumer, the stage it returns completes with the same result or exception as the original.
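
The three methods can be compared side by side in a short sketch. The class ExceptionGroup and the helpers parse, parseOrDefault and describe are hypothetical names chosen for this example; parsing a string is used only because it fails easily:

```java
import java.util.concurrent.CompletableFuture;

public class ExceptionGroup {

    // Completes exceptionally (NumberFormatException) when s is not a number.
    static CompletableFuture<Integer> parse(String s) {
        return CompletableFuture.supplyAsync(() -> Integer.parseInt(s));
    }

    // exceptionally: same type as the original, with a fallback value on failure
    static CompletableFuture<Integer> parseOrDefault(String s, int fallback) {
        return parse(s).exceptionally(ex -> fallback);
    }

    // handle: sees both result and exception, and may return a new type
    static CompletableFuture<String> describe(String s) {
        return parse(s).handle((x, ex) -> ex == null ? "ok " + x : "failed");
    }

    public static void main(String[] args) {
        System.out.println(parseOrDefault("oops", -1).join());
        System.out.println(describe("12").join());
        System.out.println(describe("oops").join());

        // whenComplete: only observes; the failure still propagates to the next stage
        parse("oops")
                .whenComplete((x, ex) -> System.out.println("result=" + x + ", failed=" + (ex != null)))
                .exceptionally(ex -> null)
                .join();
    }
}
```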


resources:

https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletionStage.html
http://www.nurkiewicz.com/2013/05/java-8-definitive-guide-to.html




21 April 2014

Java8 Stream api throughout the JDK

After my first look at Java 8 lambdas and streams, I can tell that these new features of Java 8 will make our lives much easier and more fun.

One question is: in how many places in the JDK can I use the fun stream API with lambdas?

I grepped the code (using Eclipse) to find interesting places where we can start using the stream API.

Result of the grep:

java.io.BufferedReader
java.nio.file.Files
java.util.Arrays
java.util.Collections.AsLIFOQueue
java.util.Collections.CheckedCollection
java.util.Collection
java.util.Collections
java.util.Collections.CopiesList
java.util.Collections.SetFromMap
java.util.Collections.SynchronizedCollection
java.util.Collections.UnmodifiableCollection
java.util.Collections.UnmodifiableMap.UnmodifiableEntrySet
java.util.jar.JarFile
java.util.regex.Pattern
java.util.zip.ZipFile

I found many interesting places, like BufferedReader, Files and of course the Collection API.

I had some fun with the stream API using the Files and BufferedReader classes.
See the inline comments for an explanation.


package helloJava8;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;

public class LambdaExpr {

    public static void main(String[] args) throws IOException {

        String searchPath = "/bin/";
        String command = "which";

        Optional<Path> commandFound =
        // get the Stream of directory entries that I am going to walk through
        Files.list(Paths.get(searchPath))
        // map the path to the file name, e.g. from /bin/which to which
                .map(p -> p.getFileName())
                // keep only the entry whose name matches the command ("which")
                .filter(s -> command.equals(s.toString()))
                // find the first match and end the stream
                .findFirst();

        // check the Optional; BTW, Optional is a very interesting API too
        if (commandFound.isPresent()) {
            // regular Java 7 code: try-with-resources closes the reader even if reading fails
            try (BufferedReader reader = new BufferedReader(new FileReader(searchPath + command))) {
                // here is the interesting part: get a Stream from the BufferedReader
                // and here we go.
                reader.lines().forEach(l -> System.out.println(l));
            }
        }
    }
}
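
A few of the other classes from the grep result can be tried in the same way. This is only a sketch; the class StreamsAroundTheJdk and its helper names are made up, while Pattern.splitAsStream, Arrays.stream and Collections.nCopies are JDK methods:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class StreamsAroundTheJdk {

    // java.util.regex.Pattern: split a string straight into a Stream
    static List<String> words(String text) {
        return Pattern.compile("\\s+").splitAsStream(text).collect(Collectors.toList());
    }

    // java.util.Arrays: stream over an int[]
    static int sum(int... values) {
        return Arrays.stream(values).sum();
    }

    // java.util.Collections: nCopies returns a List, so it can be streamed like any Collection
    static String repeat(String s, int times) {
        return Collections.nCopies(times, s).stream().collect(Collectors.joining());
    }

    public static void main(String[] args) {
        System.out.println(words("streams are  everywhere"));
        System.out.println(sum(1, 2, 3));
        System.out.println(repeat("ab", 3));
    }
}
```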



Thanks.

19 April 2014

Java 8 features

Hello folks,

I am very glad that java 8 is now released...

It contains many features, among them lambda expressions and the "java.util.stream" and "java.time" packages.

See more here (Java 8 release notes).

Both lambda expressions and java.util.stream will provide a very good experience and make the Java language more modern (I believe that if this decision had not been taken, many people would migrate to more modern languages like Scala).

And "java.time" is a very straightforward API; see it here.

I tried Java 8 in Eclipse Kepler, but it needed a patch to support the Java 8 syntax.

The following example illustrates Lambda expressions along with java.util.stream..

package helloJava8;

import java.util.ArrayList;
import java.util.List;

public class Main {

    public static void main(String[] args) {

        List<Employee> list = new ArrayList<>();

        list.add(new Employee("Ali", 20, 2000));
        list.add(new Employee("Wael", 30, 3000));
        list.add(new Employee("Taher", 40, 1000));
        list.add(new Employee("Ibrahim", 10, 1500));

        System.out.println("Sorted:");
        list.stream().sorted((o1, o2) -> o1.getName().compareTo(o2.getName()))
                .forEach(o -> System.out.println(o));

        System.out.println("\nFiltered: (age >= 30) ");
        list.stream().filter(o -> o.getAge() >= 30).forEach(o -> System.out.println(o));

        System.out.println("\nMapping user to its salary then sort:");
        list.stream().map(o -> o.getSalary()).sorted().forEach(o -> System.out.println(o));
    }

    static class Employee {
        private String name;
        private int age;
        private int salary;

        public Employee(String name, int age, int salary) {
            super();
            this.name = name;
            this.age = age;
            this.salary = salary;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }

        public int getSalary() {
            return salary;
        }

        public void setSalary(int salary) {
            this.salary = salary;
        }

        @Override
        public String toString() {
            return "Employee [name=" + name + ", age=" + age + ", salary=" + salary + "]";
        }
    }
}
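
The comparator lambda in the "Sorted" part can also be written with the Comparator factory methods and method references. A sketch, using a cut-down hypothetical Person class instead of the Employee class above:

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class SortWithComparator {

    // Minimal stand-in for Employee, only for this example.
    static class Person {
        private final String name;
        private final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }

        String getName() {
            return name;
        }

        int getAge() {
            return age;
        }
    }

    // Sorts by age using Comparator.comparingInt, then maps each person to the name.
    static List<String> namesSortedByAge(List<Person> people) {
        return people.stream()
                .sorted(Comparator.comparingInt(Person::getAge))
                .map(Person::getName)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Person> people = Arrays.asList(
                new Person("Ali", 20), new Person("Wael", 30), new Person("Ibrahim", 10));
        System.out.println(namesSortedByAge(people));
    }
}
```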


You can read more about lambdas and streams here:
http://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html
http://docs.oracle.com/javase/8/docs/api/java/util/stream/package-summary.html


That's all.

16 February 2013

Make System.out to write to Null OutputStream

Sometimes an app has System.out.println calls in many places.

We can make System.out write to an empty (null) output stream.

At the entry point of the app, use the following code:


System.setOut(new PrintStream(new OutputStream() {
    @Override
    public void write(int b) throws IOException {
        // discard every byte
    }
}));
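
If the output should be silenced only for part of the program, the original stream can be saved and restored afterwards. A minimal sketch of that idea; the class SilentOut and the method runSilently are names invented for this example:

```java
import java.io.OutputStream;
import java.io.PrintStream;

public class SilentOut {

    // Runs the task with System.out discarded, then restores the original stream.
    static void runSilently(Runnable task) {
        PrintStream original = System.out;
        System.setOut(new PrintStream(new OutputStream() {
            @Override
            public void write(int b) {
                // discard every byte
            }
        }));
        try {
            task.run();
        } finally {
            System.setOut(original);
        }
    }

    public static void main(String[] args) {
        runSilently(() -> System.out.println("this line is swallowed"));
        System.out.println("this line is printed");
    }
}
```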

13 August 2011

Read n bytes from start of a large file

SA,

How are you all doing? I hope that you are doing well...

I was just playing.... ya playing...

I was trying to install Mac OS X on my new HP computer. I had an old Mac OS X iso file, and when I tried to use it from inside VMware, it kept saying "No Operating System found!", so I needed to know why.

I bought a new DVD disc and tried to burn that iso image, but the Win 7 burn program told me that the iso file is not in a valid format...

So, I needed to see what this iso file looks like...
I tried to open this huge (4+ GB) iso file in Notepad++, but this caused Notepad++ to hang, so I needed to read the first n bytes from the file and see what these bytes look like.

I am a Java programmer with little knowledge of C/C++, but I wanted to write this program in C++...

It reads 10000 bytes from the iso file, starting from a random byte, and writes them to some arbitrary file on the file system.

Here's the source code:


#include <iostream>
#include <fstream>
#include <cstdlib>
#include <ctime>
using namespace std;

int main(void)
{
    ifstream f;
    ofstream out;
    // open both files in binary mode so that no bytes are translated
    f.open("D:\\Programs\\mac\\Mac OS X\\Mac OS X 10.4.8 [JaS AMD-Intel-SSE2-SSE3 with PPF1].iso", fstream::in | fstream::binary);
    out.open("D:\\mac.out.txt", fstream::out | fstream::binary);
    if (!f || !out)
    {
        cerr << "could not open the input or output file" << endl;
        return 1;
    }

    srand(time(NULL));
    int start = rand();
    cout << "start reading from byte: " << start << endl;
    // move the read position to the chosen offset
    f.seekg(start);

    char c = 0;
    // copy up to 10000 raw bytes; get() does not skip whitespace the way >> does
    for (int i = 0; i < 10000 && f.get(c); i++)
    {
        out.put(c);
    }

    f.close();
    out.close();

    return 0;
}

Since I am not a C++ guru, I used cplusplus.com as an API reference.
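
For comparison, the same idea (read a chunk of bytes from a given offset without loading the whole file) could be sketched in Java with RandomAccessFile. The class ReadChunk and the method readChunk are hypothetical names, and a small temporary file stands in for the large iso file:

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

public class ReadChunk {

    // Reads up to length bytes starting at offset; returns fewer if the file ends first.
    static byte[] readChunk(String path, long offset, int length) throws IOException {
        try (RandomAccessFile file = new RandomAccessFile(path, "r")) {
            file.seek(offset);
            byte[] buffer = new byte[length];
            int total = 0;
            while (total < length) {
                int n = file.read(buffer, total, length - total);
                if (n < 0) {
                    break; // end of file
                }
                total += n;
            }
            return Arrays.copyOf(buffer, total);
        }
    }

    public static void main(String[] args) throws IOException {
        // a small temporary file stands in for the large iso file
        Path tmp = Files.createTempFile("chunk", ".bin");
        Files.write(tmp, "0123456789".getBytes(StandardCharsets.US_ASCII));

        byte[] chunk = readChunk(tmp.toString(), 3, 4);
        System.out.println(new String(chunk, StandardCharsets.US_ASCII));

        Files.delete(tmp);
    }
}
```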