23 Java - Stream (Java8)

Stream

  • We can think of a stream as a pipeline through which the elements of a collection pass.

  • As elements pass through the pipeline, operations such as filtering, mapping, and sorting are applied to them.

  • Useful for bulk processing of data; a stream can also process its elements in parallel.

Architecture
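
A stream pipeline is built from three kinds of parts: a source (a collection, an array, and so on), zero or more intermediate operations that each return a new stream, and exactly one terminal operation that starts the processing and produces the result. A minimal sketch showing the three stages (PipelineStages and doubledEvens are illustrative names, not part of any API):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class PipelineStages {

    static List<Integer> doubledEvens(List<Integer> source) {
        return source.stream()                        // 1. source: create a stream from the collection
                .filter((Integer n) -> n % 2 == 0)    // 2. intermediate operation: keep even numbers
                .map((Integer n) -> n * 2)            // 2. intermediate operation: double each number
                .collect(Collectors.toList());        // 3. terminal operation: run the pipeline, build a List
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(doubledEvens(numbers)); // [4, 8, 12]
    }
}
```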

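Example: count the employees whose salary is greater than 3000.

Without streams (using a for loop):

import java.util.ArrayList;
import java.util.List;

public class Main {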
    public static void main(String[] args) {
        List<Integer> salaryList = new ArrayList<>();

        salaryList.add(3000);
        salaryList.add(4100);
        salaryList.add(9000);
        salaryList.add(1000);
        salaryList.add(3500);

        int count = 0;
        for (Integer salary : salaryList){
            if (salary > 3000){
                count++;
            }
        }

        System.out.println("Total Employee with salary > 3000: "+count);
    }
}
// Total Employee with salary > 3000: 3

With streams:

import java.util.ArrayList;
import java.util.List;

public class Main {
    public static void main(String[] args) {
        List<Integer> salaryList = new ArrayList<>();

        salaryList.add(3000);
        salaryList.add(4100);
        salaryList.add(9000);
        salaryList.add(1000);
        salaryList.add(3500);

        long output = salaryList.stream().filter((Integer sal) -> sal > 3000).count();
        System.out.println("Total Employee with salary > 3000: "+output);

    }
}
//Total Employee with salary > 3000: 3

Different ways to create a Stream

1. From Collection

List<Integer> salaryList = Arrays.asList(3000, 4100, 9000, 1000, 3500);
Stream<Integer> streamFromIntegerList = salaryList.stream();

2. From Array

Integer[] salaryArray = {3000, 4100, 9000, 1000, 3500};
Stream<Integer> streamFromIntegerArray = Arrays.stream(salaryArray);

3. From Static Method

Stream<Integer> streamFromStaticMethod = Stream.of(1000,3500,4000,9000);

4. From Stream Builder

Stream.Builder<Integer> streamBuilder = Stream.builder();
streamBuilder.add(1000).add(9000).add(3500);

Stream<Integer> streamFromStreamBuilder = streamBuilder.build();

5. From Stream Iterate

Stream<Integer> streamFromIterate = Stream.iterate(1000, (Integer n) -> n + 5000).limit(5);
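
Collecting each of these streams into a list shows what it actually contains. A sketch covering the builder, array and iterate sources (StreamCreationDemo and its method names are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamCreationDemo {

    // Stream.iterate: start at 1000 and add 5000 each step; limit(5) stops the otherwise infinite stream
    static List<Integer> fromIterate() {
        return Stream.iterate(1000, (Integer n) -> n + 5000)
                .limit(5)
                .collect(Collectors.toList());
    }

    // Stream.Builder: elements come out in the order they were added
    static List<Integer> fromBuilder() {
        Stream.Builder<Integer> builder = Stream.builder();
        builder.add(1000).add(9000).add(3500);
        return builder.build().collect(Collectors.toList());
    }

    // Arrays.stream: a stream over the array elements, in array order
    static List<Integer> fromArray() {
        Integer[] salaryArray = {3000, 4100, 9000, 1000, 3500};
        return Arrays.stream(salaryArray).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(fromIterate()); // [1000, 6000, 11000, 16000, 21000]
        System.out.println(fromBuilder()); // [1000, 9000, 3500]
        System.out.println(fromArray());   // [3000, 4100, 9000, 1000, 3500]
    }
}
```

Without limit(5), Stream.iterate would produce an infinite stream and collect would never finish.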

Different Intermediate Operations

Intermediate operations transform a stream into another stream. We can chain multiple intermediate operations together to perform more complex processing before applying a terminal operation to produce the result.

1. filter(Predicate<T> predicate)

Keeps only the elements that match the given predicate.

Stream<String> nameStream = Stream.of("HELLO", "EVERYBODY","HOW","ARE","YOU","DOING");
Stream<String> filteredStream = nameStream.filter((String name) -> name.length()<=3);

List<String> filteredNameList = filteredStream.collect(Collectors.toList());

// [HOW, ARE, YOU]

2. map(Function<T,R> mapper)

Transforms each element by applying the given function.

Stream<String> nameStream = Stream.of("HELLO", "EVERYBODY","HOW","ARE","YOU","DOING");
Stream<String> lowerCaseNames = nameStream.map((String name) -> name.toLowerCase());
//[hello, everybody, how, are, you, doing]
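
Because the mapper is a Function<T,R>, map can also change the element type. For example, mapping each name to its length turns a Stream<String> into a Stream<Integer>. A sketch (MapTypeChangeDemo is an illustrative name):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class MapTypeChangeDemo {

    // Each String is mapped to an Integer (its length), so the result is a List<Integer>
    static List<Integer> nameLengths() {
        return Stream.of("HELLO", "EVERYBODY", "HOW", "ARE", "YOU", "DOING")
                .map((String name) -> name.length())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(nameLengths()); // [5, 9, 3, 3, 3, 5]
    }
}
```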

3. flatMap(Function<T,Stream<R>> mapper)

Used when each element is itself a collection (or can be turned into a stream): flatMap maps each element to a stream and then flattens all of those streams into a single stream.

List<List<String>> sentenceList = Arrays.asList(
        Arrays.asList("I","LOVE","JAVA"),
        Arrays.asList("CONCEPTS","ARE","CLEAR"),
        Arrays.asList("ITS","VERY","EASY")
);

Stream<String> wordsStream1 = sentenceList.stream().flatMap((List<String> sentence) -> sentence.stream());
System.out.println(wordsStream1.collect(Collectors.toList()));

Stream<String> wordsStream2 = sentenceList.stream().flatMap((List<String> sentence) -> sentence.stream().map((String value)-> value.toLowerCase()));
System.out.println(wordsStream2.collect(Collectors.toList()));

/*
[I, LOVE, JAVA, CONCEPTS, ARE, CLEAR, ITS, VERY, EASY]
[i, love, java, concepts, are, clear, its, very, easy]
*/
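
flatMap also works when the inner streams come from something other than a List, for instance splitting each sentence string into words with String.split. A sketch, assuming the words are separated by single spaces (FlatMapSplitDemo is an illustrative name):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FlatMapSplitDemo {

    // Each sentence becomes a Stream<String> of words; flatMap merges them into one stream
    static List<String> words() {
        return Stream.of("I LOVE JAVA", "CONCEPTS ARE CLEAR", "ITS VERY EASY")
                .flatMap((String sentence) -> Arrays.stream(sentence.split(" ")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(words()); // [I, LOVE, JAVA, CONCEPTS, ARE, CLEAR, ITS, VERY, EASY]
    }
}
```

With map instead of flatMap, the result would be a Stream<String[]> (one array per sentence) rather than a flat stream of words.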

4. distinct()

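Removes duplicate elements from the stream (compared using equals()). For an ordered stream, the first occurrence of each element is kept.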

Integer[] arr = {1,5,2,7,4,4,2,0,9};

Stream<Integer> arrStream = Arrays.stream(arr).distinct();
System.out.println(arrStream.collect(Collectors.toList()));

//[1, 5, 2, 7, 4, 0, 9]

5. sorted()

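Sorts the elements in natural order. The overload sorted(Comparator<T> comparator) sorts them using the given comparator instead.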

Integer[] arr = {1,5,2,7,4,4,2,0,9};

Stream<Integer> arrStream = Arrays.stream(arr).sorted();
System.out.println(arrStream.collect(Collectors.toList()));

//[0, 1, 2, 2, 4, 4, 5, 7, 9]

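// val2-val1 gives descending order (fine for small values; Integer.compare(val2, val1) avoids overflow)
Stream<Integer> descendingStream = Arrays.stream(arr).sorted((Integer val1, Integer val2) -> val2-val1);
System.out.println(descendingStream.collect(Collectors.toList()));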

//[9, 7, 5, 4, 4, 2, 2, 1, 0]
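
Instead of a subtraction-based lambda, the comparator can be built with the helpers in java.util.Comparator, which also avoid integer overflow for large values. A sketch (SortedComparatorDemo and its method names are illustrative):

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SortedComparatorDemo {

    // Descending order without writing val2 - val1 by hand
    static List<Integer> descending() {
        Integer[] arr = {1, 5, 2, 7, 4, 4, 2, 0, 9};
        return Arrays.stream(arr)
                .sorted(Comparator.reverseOrder())
                .collect(Collectors.toList());
    }

    // Shorter names first; names of equal length are ordered alphabetically
    static List<String> byLengthThenAlphabetical() {
        return Stream.of("HELLO", "EVERYBODY", "HOW", "ARE", "YOU", "DOING")
                .sorted(Comparator.comparing(String::length).thenComparing(Comparator.naturalOrder()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(descending());               // [9, 7, 5, 4, 4, 2, 2, 1, 0]
        System.out.println(byLengthThenAlphabetical()); // [ARE, HOW, YOU, DOING, HELLO, EVERYBODY]
    }
}
```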

6. peek(Consumer<T> action)

Lets you look at each element as it passes through the pipeline, without changing the stream. Mainly intended for debugging.

List<Integer> numbers = Arrays.asList(2,1,3,4,6);
Stream<Integer> numberStream = numbers.stream()
        .filter((Integer val) -> val>2)
        .peek((Integer val) -> System.out.println(val)) //it will print 3,4,6
        .map((Integer val)->-1*val);

List<Integer> numberList = numberStream.collect(Collectors.toList());

7. limit(long maxSize)

Truncates the stream so that it contains no more than maxSize elements.

List<Integer> numbers = Arrays.asList(2,1,3,4,6);
Stream<Integer> numberStream = numbers.stream()
        .limit(3);

System.out.println(numberStream.collect(Collectors.toList()));
//[2, 1, 3]

8. skip(long n)

Skips the first n elements of the stream.

List<Integer> numbers = Arrays.asList(2,1,3,4,6);
Stream<Integer> numberStream = numbers.stream().skip(3);
System.out.println(numberStream.collect(Collectors.toList()));
//[4, 6]
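
skip and limit are often combined, for example to take one "page" of results. A sketch where page, pageNumber (0-based) and pageSize are illustrative names and parameters:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class PaginationDemo {

    // Skip all elements of the earlier pages, then keep at most pageSize elements
    static List<Integer> page(List<Integer> items, int pageNumber, int pageSize) {
        return items.stream()
                .skip((long) pageNumber * pageSize)
                .limit(pageSize)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(2, 1, 3, 4, 6);
        System.out.println(page(numbers, 0, 2)); // [2, 1]
        System.out.println(page(numbers, 1, 2)); // [3, 4]
        System.out.println(page(numbers, 2, 2)); // [6]
    }
}
```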

9. mapToInt(ToIntFunction<T> mapper)

Converts a stream of objects into an IntStream, a stream of primitive int values (no boxing into Integer).

Example 1:

List<String> numbers = Arrays.asList("2","1","4","7");
IntStream numberStream = numbers.stream().mapToInt((String val)->Integer.parseInt(val));

int[] numberArray = numberStream.toArray();
//Output: 2, 1, 4, 7

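Example 2: a common mistake. filter() returns a new stream, and the original stream is marked as used, so it cannot be operated on again.

int[] numbersArray = {2, 1, 4, 7};
IntStream numbersStream = Arrays.stream(numbersArray);
numbersStream.filter((int val)->val>2); // the returned stream is ignored, and numbersStream is now consumed
int[] filteredArray = numbersStream.toArray(); // throws IllegalStateException

Output:

Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed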
    at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:246)
    at java.base/java.util.stream.IntPipeline.toArray(IntPipeline.java:562)
    at learn.collection.Main.main(Main.java:22)

Example 3: the correct way is to chain the calls, so that toArray() runs on the stream returned by filter().

int[] numbersArray = {2, 1, 4, 7};
IntStream numbersStream = Arrays.stream(numbersArray);
int[] output = numbersStream.filter((int val)->val>2).toArray();
//Output: 4, 7

10. mapToLong(ToLongFunction<T> mapper)

Same as mapToInt, but produces a LongStream of primitive long values.

List<String> numbers = Arrays.asList("2","1","4","7");
LongStream numberStream = numbers.stream().mapToLong((String val)->Long.parseLong(val));

long[] numberArray = numberStream.toArray();
//Output: 2, 1, 4, 7

long[] numbersArray = {2, 1, 4, 7};
LongStream numbersStream = Arrays.stream(numbersArray);
long[] output = numbersStream.filter((long val)->val>2).toArray();
//Output: 4,7

11. mapToDouble(ToDoubleFunction<T> mapper)

Same as mapToInt, but produces a DoubleStream of primitive double values.

List<String> numbers = Arrays.asList("2","1","4","7");
DoubleStream numberStream = numbers.stream().mapToDouble((String val)->Double.parseDouble(val));

double[] numberArray = numberStream.toArray();
//Output: 2, 1, 4, 7

double[] numbersArray = {2, 1, 4, 7};
DoubleStream numbersStream = Arrays.stream(numbersArray);
double[] output = numbersStream.filter((double val)->val>2).toArray();
//Output: 4,7
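
A practical benefit of IntStream, LongStream and DoubleStream is that they offer numeric terminal operations such as sum() and average() directly. A sketch using the same string input (PrimitiveStreamDemo and its method names are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalDouble;

class PrimitiveStreamDemo {

    // Parse each string to an int and add them up; sum() returns 0 for an empty stream
    static int sum(List<String> numbers) {
        return numbers.stream()
                .mapToInt((String val) -> Integer.parseInt(val))
                .sum();
    }

    // average() returns an OptionalDouble, because an empty stream has no average
    static OptionalDouble average(List<String> numbers) {
        return numbers.stream()
                .mapToInt((String val) -> Integer.parseInt(val))
                .average();
    }

    public static void main(String[] args) {
        List<String> numbers = Arrays.asList("2", "1", "4", "7");
        System.out.println(sum(numbers));                   // 14
        System.out.println(average(numbers).getAsDouble()); // 3.5
    }
}
```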

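Why are intermediate operations called "lazy"?

Intermediate operations do not run when they are declared. They are only executed when a terminal operation is invoked on the stream.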

List<Integer> numbers = Arrays.asList(2,1,4,7,10);
Stream<Integer> numberStream = numbers.stream().filter((Integer val) -> val>=3).peek((Integer val) -> System.out.println(val));

Output: nothing is printed, because no terminal operation has been called yet.

List<Integer> numbers = Arrays.asList(2,1,4,7,10);
Stream<Integer> numberStream = numbers.stream().filter((Integer val) -> val>=3).peek((Integer val) -> System.out.println(val));
numberStream.count(); //count() is a terminal operation, so the pipeline runs now
/*
4
7
10
*/
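
Laziness also means that a short-circuiting terminal operation such as findFirst() only pulls as many elements as it needs. In this sketch, peek records every element that enters the pipeline (LazinessDemo and the visited list are illustrative helpers):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class LazinessDemo {

    // Elements that actually entered the pipeline (filled by peek)
    static final List<Integer> visited = new ArrayList<>();

    static Optional<Integer> firstAtLeastThree() {
        visited.clear();
        List<Integer> numbers = Arrays.asList(2, 1, 4, 7, 10);
        return numbers.stream()
                .peek((Integer val) -> visited.add(val)) // record each element as it arrives
                .filter((Integer val) -> val >= 3)
                .findFirst();                            // stops at the first match
    }

    public static void main(String[] args) {
        System.out.println(firstAtLeastThree().get()); // 4
        System.out.println(visited);                   // [2, 1, 4]: 7 and 10 are never processed
    }
}
```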

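Sequence of Stream Operations

In what order are the operations applied to the elements? Consider this pipeline:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Main {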
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(2,1,4,7,10);
        Stream<Integer> numberStream = numbers.stream()
                .filter((Integer val) -> val>=3)
                .peek((Integer val) -> System.out.println("after filter: "+val))
                .map((Integer val) -> (val*-1))
                .peek((Integer val) -> System.out.println("after negating:"+ val))
                .sorted()
                .peek((Integer val)-> System.out.println("after Sorted:"+val));

        List<Integer> result = numberStream.collect(Collectors.toList());
    }
}

Output you might expect (each operation finishing for all elements before the next one starts)

after filter: 4
after filter: 7
after filter: 10

after negating:-4
after negating:-7
after negating:-10

after Sorted:-10
after Sorted:-7
after Sorted:-4

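Actual Output

Elements flow through the pipeline one at a time: each element passes through filter and map before the next element is read. sorted() is different because it is a stateful operation: it must receive every element before it can emit the first one, which is why all the "after Sorted" lines come last.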

after filter: 4
after negating:-4
after filter: 7
after negating:-7
after filter: 10
after negating:-10
after Sorted:-10
after Sorted:-7
after Sorted:-4

Different Terminal Operations

Terminal operations are the ones that produce the result (a value, a collection, or a side effect such as printing). Calling a terminal operation triggers the processing of the whole stream pipeline.

1. forEach(Consumer<T> action)

Performs an action on each element of the stream. It does NOT return any value (its return type is void).

List<Integer> numbers = Arrays.asList(2,1,4,7,10);
numbers.stream().filter((Integer val) -> val>=3)
        .forEach((Integer val)-> System.out.println(val));
//Output 4, 7, 10

2. toArray()

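Collects the elements of the stream into an array. The no-argument version returns Object[]; to get a typed array, pass a generator function that creates an array of the requested size (for example Integer[]::new).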

List<Integer> numbers = Arrays.asList(2,1,4,7,10);

Object[] filteredNumberArrType1 = numbers.stream()
        .filter((Integer val)-> val>=3)
        .toArray();

Integer[] filteredNumberArrType2 = numbers.stream()
        .filter((Integer val)->val>=3)
        .toArray((int size) -> new Integer[size]);

3. reduce(BinaryOperator<T> accumulator)

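Reduces the elements of the stream to a single value by repeatedly applying an associative accumulator function. The result is an Optional, because the stream may be empty.

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class Main {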
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(2,1,4,7,10);

        Optional<Integer> reducedValue = numbers.stream()
                .reduce((Integer val1, Integer val2)-> val1+val2);

        System.out.println(reducedValue.get());
        //output: 24

    }
}
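
reduce also has an overload that takes an identity value, reduce(T identity, BinaryOperator<T> accumulator). Because the identity is returned for an empty stream, the result is a plain value instead of an Optional. A sketch (ReduceWithIdentityDemo and its method names are illustrative):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ReduceWithIdentityDemo {

    // 0 is the identity for addition, so it is the result when the list is empty
    static int sum(List<Integer> numbers) {
        return numbers.stream()
                .reduce(0, (Integer val1, Integer val2) -> val1 + val2);
    }

    // 1 is the identity for multiplication
    static int product(List<Integer> numbers) {
        return numbers.stream()
                .reduce(1, (Integer val1, Integer val2) -> val1 * val2);
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(2, 1, 4, 7, 10);
        System.out.println(sum(numbers));                          // 24
        System.out.println(product(numbers));                      // 560
        System.out.println(sum(Collections.<Integer>emptyList())); // 0
    }
}
```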

4. collect(Collector<T,A,R> collector)

Collects the elements of the stream into a container, for example a List using Collectors.toList().

List<Integer> numbers = Arrays.asList(2,1,4,7,10);


List<Integer> filteredNumber = numbers.stream()
            .filter((Integer val)->val>=3)
            .collect(Collectors.toList());
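
Collectors provides many ready-made collectors besides toList(). A sketch of three common ones, joining, groupingBy (combined with counting) and toSet, applied to the names from the earlier examples (CollectorsDemo and its method names are illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class CollectorsDemo {

    static final List<String> NAMES = Arrays.asList("HELLO", "EVERYBODY", "HOW", "ARE", "YOU", "DOING");

    // Joins the elements into one String with the given separator
    static String joined() {
        return NAMES.stream().collect(Collectors.joining(", "));
    }

    // Groups names by length and counts how many names have each length
    static Map<Integer, Long> countByLength() {
        return NAMES.stream()
                .collect(Collectors.groupingBy((String name) -> name.length(), Collectors.counting()));
    }

    // Collects into a Set, so duplicate lengths are dropped (iteration order is not guaranteed)
    static Set<Integer> distinctLengths() {
        return NAMES.stream()
                .map((String name) -> name.length())
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println(joined());          // HELLO, EVERYBODY, HOW, ARE, YOU, DOING
        System.out.println(countByLength());   // counts per length: 3 -> 3, 5 -> 2, 9 -> 1
        System.out.println(distinctLengths()); // the lengths 3, 5 and 9
    }
}
```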

5. min(Comparator<T> comparator)

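Finds the minimum element of the stream according to the provided comparator, wrapped in an Optional. The result depends entirely on the comparator: with a reversed comparator (val2-val1), min() returns the largest value.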

List<Integer> numbers = Arrays.asList(2,1,4,7,10);

Optional<Integer> minimumValueType1 = numbers.stream()
                            .filter((Integer val)->val>=3)
                            .min((Integer val1, Integer val2)->val1-val2);
System.out.println(minimumValueType1.get());
//4
Optional<Integer> minimumValueType2 = numbers.stream()
        .filter((Integer val)->val>=3)
        .min((Integer val1, Integer val2)-> val2-val1);
System.out.println(minimumValueType2.get());
//10

6. max(Comparator<T> comparator)

Finds the maximum element of the stream according to the provided comparator, wrapped in an Optional.

List<Integer> numbers = Arrays.asList(2,1,4,7,10);

Optional<Integer> maximumValueType1 = numbers.stream()
                            .filter((Integer val)->val>=3)
                            .max((Integer val1, Integer val2)->val1-val2);
System.out.println(maximumValueType1.get());
//10
Optional<Integer> maximumValueType2 = numbers.stream()
        .filter((Integer val)->val>=3)
        .max((Integer val1, Integer val2)-> val2-val1);
System.out.println(maximumValueType2.get());
//4
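
min and max return an Optional because the stream may be empty, and calling get() on an empty Optional throws NoSuchElementException. A sketch using Comparator.naturalOrder() and orElse to supply a fallback value (MinMaxDemo, its method names, and the fallback value -1 are arbitrary choices for illustration):

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class MinMaxDemo {

    // Smallest value >= threshold, or -1 when no element passes the filter
    static int minAtLeast(List<Integer> numbers, int threshold) {
        return numbers.stream()
                .filter((Integer val) -> val >= threshold)
                .min(Comparator.naturalOrder())
                .orElse(-1);
    }

    // Largest value >= threshold, or -1 when no element passes the filter
    static int maxAtLeast(List<Integer> numbers, int threshold) {
        return numbers.stream()
                .filter((Integer val) -> val >= threshold)
                .max(Comparator.naturalOrder())
                .orElse(-1);
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(2, 1, 4, 7, 10);
        System.out.println(minAtLeast(numbers, 3));  // 4
        System.out.println(maxAtLeast(numbers, 3));  // 10
        System.out.println(maxAtLeast(numbers, 50)); // -1 (empty stream, no exception)
    }
}
```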

7. count()

Returns the number of elements in the stream, as a long.

List<Integer> numbers = Arrays.asList(2,1,4,7,10);

long noOfValuesPresent = numbers.stream()
                            .filter((Integer val)->val>=3)
                            .count();
//3

8. anyMatch(Predicate<T> predicate)

Returns true if at least one element in the stream matches the given predicate. It stops as soon as a match is found.

List<Integer> numbers = Arrays.asList(2,1,4,7,10);

boolean hasValueGreaterThanThree = numbers.stream()
                            .anyMatch((Integer val)->val>3);
//true

9. allMatch(Predicate<T> predicate)

Returns true only if every element in the stream matches the given predicate. It stops as soon as one element does not match.

List<Integer> numbers = Arrays.asList(2,1,4,7,10);

boolean allValuesGreaterThanThree = numbers.stream()
                            .allMatch((Integer val)->val>3);
//false

10. noneMatch(Predicate<T> predicate)

Returns true only if no element in the stream matches the given predicate. It stops as soon as one element matches.

List<Integer> numbers = Arrays.asList(2,1,4,7,10);

boolean noValueGreaterThanThree = numbers.stream()
                            .noneMatch((Integer val)->val>3);
//false
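
A detail that is easy to get wrong: on an empty stream, allMatch and noneMatch return true (there is no element that violates the condition), while anyMatch returns false. A sketch (MatchOnEmptyStreamDemo and matchResults are illustrative names):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class MatchOnEmptyStreamDemo {

    // Returns {anyMatch, allMatch, noneMatch} for the predicate "val > 3"
    static boolean[] matchResults(List<Integer> numbers) {
        return new boolean[] {
                numbers.stream().anyMatch((Integer val) -> val > 3),
                numbers.stream().allMatch((Integer val) -> val > 3),
                numbers.stream().noneMatch((Integer val) -> val > 3)
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(matchResults(Arrays.asList(2, 1, 4, 7, 10))));   // [true, false, false]
        System.out.println(Arrays.toString(matchResults(Collections.<Integer>emptyList()))); // [false, true, true]
    }
}
```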

11. findFirst()

Returns the first element of the stream, wrapped in an Optional (empty if the stream has no elements).

List<Integer> numbers = Arrays.asList(2,1,4,7,10);

Optional<Integer> firstValue = numbers.stream()
                            .filter((Integer val)->val>=3)
                            .findFirst();
System.out.println(firstValue.get());
//4

12. findAny()

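Returns some element of the stream, wrapped in an Optional. It is not a random pick; which element is returned is simply not guaranteed. In a sequential stream it is usually the first element, while in a parallel stream it can be any of them, which lets parallel pipelines return whichever match is found first.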

List<Integer> numbers = Arrays.asList(2,1,4,7,10);

Optional<Integer> anyValue = numbers.stream()
                            .filter((Integer val)->val>=3)
                            .findAny();
System.out.println(anyValue.get());
//4 (typical for this sequential stream, but not guaranteed)

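How many times can we use a single stream?

Only once. After a terminal operation has been called on a stream, the stream is consumed (closed) and cannot be used for another terminal operation. To process the same data again, create a new stream from the source.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class Main {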
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(2,1,4,7,10);

        Stream<Integer> filteredNumbers = numbers.stream()
                .filter((Integer val)->val>=3);

        filteredNumbers.forEach((Integer val) -> System.out.println(val));//consumed the filtered Numbers stream

        //trying to use the closed stream again
        List<Integer> listFromStream = filteredNumbers.collect(Collectors.toList());
    }
}
/*
4
7
10
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
    at learn.collection.Main.main(Main.java:20)
*/
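
If the same pipeline has to be run more than once, one common workaround is to keep a java.util.function.Supplier that builds a fresh stream each time get() is called, rather than keeping the stream object itself. A sketch (StreamSupplierDemo and FILTERED are illustrative names):

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamSupplierDemo {

    static final List<Integer> NUMBERS = Arrays.asList(2, 1, 4, 7, 10);

    // Each get() call creates a new, unconsumed stream over the same source
    static final Supplier<Stream<Integer>> FILTERED =
            () -> NUMBERS.stream().filter((Integer val) -> val >= 3);

    public static void main(String[] args) {
        FILTERED.get().forEach((Integer val) -> System.out.println(val)); // 4, 7, 10
        List<Integer> listFromStream = FILTERED.get().collect(Collectors.toList());
        System.out.println(listFromStream);         // [4, 7, 10]
        System.out.println(FILTERED.get().count()); // 3
    }
}
```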

Parallel Stream

  • Performs the stream operations concurrently, taking advantage of multi-core CPUs.

  • The parallelStream() method is used instead of the regular stream() method.

  • Internally it does:

    • Task splitting: it uses a Spliterator to split the data into multiple chunks.

    • Task submission and parallel processing: the chunks are processed using the Fork/Join framework (by default on the common ForkJoinPool).

import java.util.Arrays;
import java.util.List;

public class Main {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(11,22,33,44,55,66,77,88,99,110);

        // Sequential Processing
        long sequentialProcessingStartTime = System.currentTimeMillis();
        numbers.stream()
                .map((Integer val)->val*val)
                .forEach((Integer val)-> System.out.println(val));

        System.out.println("Sequential processing time taken: "+ (System.currentTimeMillis()-sequentialProcessingStartTime)+" millisecond");

        // Parallel Processing
        long parallelProcessingStartTime = System.currentTimeMillis();
        numbers.parallelStream()
                .map((Integer val)->val*val)
                .forEach((Integer val)-> System.out.println(val));

        System.out.println("Parallel processing time taken: "+ (System.currentTimeMillis()-parallelProcessingStartTime)+" millisecond");

    }
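}

Note: in the run shown here, the parallel version was slower. For a list this small, the cost of splitting the work and coordinating threads outweighs any gain; parallel streams tend to pay off only for large data sets or expensive per-element work. Also, forEach on a parallel stream does not follow the order of the source list, so the parallel results are printed in an unpredictable order.

Output (timings and the parallel order will vary from run to run):
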
121
484
1089
1936
3025
4356
5929
7744
9801
12100
Sequential processing time taken: 2 millisecond
5929
4356
9801
12100
7744
484
1089
121
3025
1936
Parallel processing time taken: 4 millisecond
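
When a parallel stream is used but the order of the results matters, collecting into a List or using forEachOrdered keeps the encounter order of the source list; forEachOrdered in particular can give up much of the parallel speed-up. A sketch with the same numbers (ParallelOrderDemo and its method names are illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ParallelOrderDemo {

    static final List<Integer> NUMBERS = Arrays.asList(11, 22, 33, 44, 55, 66, 77, 88, 99, 110);

    // collect(toList()) keeps the source order even for a parallel stream
    static List<Integer> squaresCollected() {
        return NUMBERS.parallelStream()
                .map((Integer val) -> val * val)
                .collect(Collectors.toList());
    }

    static List<Integer> squaresForEachOrdered() {
        List<Integer> result = new ArrayList<>();
        NUMBERS.parallelStream()
                .map((Integer val) -> val * val)
                // forEachOrdered runs the action in encounter order, one element after another,
                // so adding to a plain ArrayList is safe here
                .forEachOrdered((Integer val) -> result.add(val));
        return result;
    }

    public static void main(String[] args) {
        System.out.println(squaresCollected());      // [121, 484, 1089, 1936, 3025, 4356, 5929, 7744, 9801, 12100]
        System.out.println(squaresForEachOrdered()); // same order as above
    }
}
```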