Table of contents
- Stream
- Different ways to create a Stream
- Different Intermediate Operations
- 1. filter(Predicate<T> predicate)
- 2. map(Function<T,R> mapper)
- 3. flatMap(Function<T,Stream<R>> mapper)
- 4. distinct()
- 5. sorted()
- 6. peek(Consumer<T> action)
- 7. limit(long maxSize)
- 8. skip(long n)
- 9. mapToInt(ToIntFunction<T> mapper)
- 10. mapToLong(ToLongFunction<T> mapper)
- 11. mapToDouble(ToDoubleFunction<T> mapper)
- Why we call intermediate operation "Lazy"?
- Sequence of Stream Operations
- Different Terminal Operations
- 1. forEach(Consumer<T> action)
- 2. toArray()
- 3. reduce(BinaryOperator<T> accumulator)
- 4. collect(Collector<T,A,R> collector)
- 5. min(Comparator<T> comparator)
- 6. max(Comparator<T> comparator)
- 7. count()
- 8. anyMatch(Predicate<T> predicate)
- 9. allMatch(Predicate<T> predicate)
- 10. noneMatch(Predicate<T> predicate)
- 11. findFirst()
- 12. findAny()
- How many times we can use a single stream?
- Parallel Stream
Stream
A stream can be thought of as a pipeline through which the elements of a collection pass.
As the elements pass through the pipeline, various operations such as sorting and filtering are performed on them.
Streams are useful for bulk processing, and they can also process elements in parallel.
Architecture
Example (counting the salaries above 3000, first with a plain loop and then with a stream)
public class Main {
public static void main(String[] args) {
List<Integer> salaryList = new ArrayList<>();
salaryList.add(3000);
salaryList.add(4100);
salaryList.add(9000);
salaryList.add(1000);
salaryList.add(3500);
int count = 0;
for (Integer salary : salaryList){
if (salary > 3000){
count++;
}
}
System.out.println("Total Employee with salary > 3000: "+count);
}
}
// Total Employee with salary > 3000: 3
public class Main {
public static void main(String[] args) {
List<Integer> salaryList = new ArrayList<>();
salaryList.add(3000);
salaryList.add(4100);
salaryList.add(9000);
salaryList.add(1000);
salaryList.add(3500);
long output = salaryList.stream().filter((Integer sal) -> sal > 3000).count();
System.out.println("Total Employee with salary > 3000: "+output);
}
}
//Total Employee with salary > 3000: 3
Different ways to create a Stream
1. From Collection
List<Integer> salaryList = Arrays.asList(3000, 4100, 9000, 1000, 3500);
Stream<Integer> streamFromIntegerList = salaryList.stream();
2. From Array
Integer[] salaryArray = {3000, 4100, 9000, 1000, 3500};
Stream<Integer> streamFromIntegerArray = Arrays.stream(salaryArray);
3. From Static Method
Stream<Integer> streamFromStaticMethod = Stream.of(1000,3500,4000,9000);
4. From Stream Builder
Stream.Builder<Integer> streamBuilder = Stream.builder();
streamBuilder.add(1000).add(9000).add(3500);
Stream<Integer> streamFromStreamBuilder = streamBuilder.build();
5. From Stream Iterate
Stream<Integer> streamFromIterate = Stream.iterate(1000, (Integer n) -> n + 5000).limit(5);
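Stream.iterate produces an infinite stream, which is why the example above bounds it with limit(5). As a minimal sketch, the class below shows that count-based bound next to a condition-based bound. The second variant assumes Java 9 or later, where Stream.iterate has a three-argument overload that takes a hasNext predicate. The class and method names (IterateSketch, firstFive, upTo20000) are only illustrative.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class IterateSketch {
    // Bounded by element count: the first 5 values, starting at 1000 and stepping by 5000.
    static List<Integer> firstFive() {
        return Stream.iterate(1000, n -> n + 5000)
                .limit(5)
                .collect(Collectors.toList());
    }

    // Bounded by a condition (Java 9+): stops as soon as the next value exceeds 20000.
    static List<Integer> upTo20000() {
        return Stream.iterate(1000, n -> n <= 20000, n -> n + 5000)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstFive());  // [1000, 6000, 11000, 16000, 21000]
        System.out.println(upTo20000());  // [1000, 6000, 11000, 16000]
    }
}
```

Without limit() or a hasNext predicate, a terminal operation such as collect() on an iterate stream would never finish.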
Different Intermediate Operations
We can chain multiple intermediate operations together to perform more complex processing before applying a terminal operation to produce the result.
1. filter(Predicate<T> predicate)
Keeps only the elements that match the given predicate.
Stream<String> nameStream = Stream.of("HELLO", "EVERYBODY","HOW","ARE","YOU","DOING");
Stream<String> filteredStream = nameStream.filter((String name) -> name.length()<=3);
List<String> filteredNameList = filteredStream.collect(Collectors.toList());
// [HOW, ARE, YOU]
2. map(Function<T,R> mapper)
Transforms each element by applying the given function to it.
Stream<String> nameStream = Stream.of("HELLO", "EVERYBODY","HOW","ARE","YOU","DOING");
Stream<String> lowerCaseNames = nameStream.map((String name) -> name.toLowerCase());
//[hello, everybody, how, are, you, doing]
3. flatMap(Function<T,Stream<R>> mapper)
Maps each element of a nested collection to a stream and flattens the resulting streams into a single stream.
List<List<String>> sentenceList = Arrays.asList(
Arrays.asList("I","LOVE","JAVA"),
Arrays.asList("CONCEPTS","ARE","CLEAR"),
Arrays.asList("ITS","VERY","EASY")
);
Stream<String> wordsStream1 = sentenceList.stream().flatMap((List<String> sentence) -> sentence.stream());
System.out.println(wordsStream1.collect(Collectors.toList()));
Stream<String> wordsStream2 = sentenceList.stream().flatMap((List<String> sentence) -> sentence.stream().map((String value)-> value.toLowerCase()));
System.out.println(wordsStream2.collect(Collectors.toList()));
/*
[I, LOVE, JAVA, CONCEPTS, ARE, CLEAR, ITS, VERY, EASY]
[i, love, java, concepts, are, clear, its, very, easy]
*/
4. distinct()
Removes duplicate elements from the stream.
Integer[] arr = {1,5,2,7,4,4,2,0,9};
Stream<Integer> arrStream = Arrays.stream(arr).distinct();
System.out.println(arrStream.collect(Collectors.toList()));
//[1, 5, 2, 7, 4, 0, 9]
5. sorted()
Sorts the elements in their natural order, or according to the given comparator.
Integer[] arr = {1,5,2,7,4,4,2,0,9};
Stream<Integer> arrStream = Arrays.stream(arr).sorted();
System.out.println(arrStream.collect(Collectors.toList()));
//[0, 1, 2, 2, 4, 4, 5, 7, 9]
Stream<Integer> descendingStream = Arrays.stream(arr).sorted((Integer val1, Integer val2) -> val2-val1);
System.out.println(descendingStream.collect(Collectors.toList()));
//[9, 7, 5, 4, 4, 2, 2, 1, 0]
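The subtraction comparator above works for small values, but the subtraction can overflow when the values are far apart. The sketch below isolates that idiom to show the wrapped result, and uses Comparator.reverseOrder() from the standard library as one possible alternative for descending order. The class and method names (SortedDescendingSketch, subtractionCompare, sortDescending) are hypothetical.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class SortedDescendingSketch {
    // The subtraction idiom, isolated so that its result can be inspected.
    static int subtractionCompare(int val1, int val2) {
        return val2 - val1;
    }

    // Descending sort that never subtracts, so it cannot overflow.
    static List<Integer> sortDescending(List<Integer> values) {
        return values.stream()
                .sorted(Comparator.reverseOrder())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // 1 - Integer.MIN_VALUE overflows and wraps around to a negative number,
        // which wrongly tells a descending sort to place MIN_VALUE before 1.
        System.out.println(subtractionCompare(Integer.MIN_VALUE, 1) < 0); // true
        System.out.println(sortDescending(Arrays.asList(1, Integer.MIN_VALUE, 5)));
        // [5, 1, -2147483648]
    }
}
```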
6. peek(Consumer<T> action)
Lets you observe the elements as they flow past a given point in the pipeline, which is mainly useful for debugging.
List<Integer> numbers = Arrays.asList(2,1,3,4,6);
Stream<Integer> numberStream = numbers.stream()
.filter((Integer val) -> val>2)
.peek((Integer val) -> System.out.println(val)) //it will print 3,4,6
.map((Integer val)->-1*val);
List<Integer> numberList = numberStream.collect(Collectors.toList());
7. limit(long maxSize)
Truncates the stream so that it contains no more than maxSize elements.
List<Integer> numbers = Arrays.asList(2,1,3,4,6);
Stream<Integer> numberStream = numbers.stream()
.limit(3);
System.out.println(numberStream.collect(Collectors.toList()));
//[2, 1, 3]
8. skip(long n)
Skips the first n elements of the stream.
List<Integer> numbers = Arrays.asList(2,1,3,4,6);
Stream<Integer> numberStream = numbers.stream().skip(3);
System.out.println(numberStream.collect(Collectors.toList()));
//[4, 6]
9. mapToInt(ToIntFunction<T> mapper)
Converts the stream into an IntStream, which works with primitive "int" values directly and avoids boxing.
Example 1:
List<String> numbers = Arrays.asList("2","1","4","7");
IntStream numberStream = numbers.stream().mapToInt((String val)->Integer.parseInt(val));
int[] numberArray = numberStream.toArray();
//Output: 2, 1, 4, 7
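Example 2 (incorrect usage: the stream returned by filter() is discarded, and the original stream is then used a second time):
int[] numbersArray = {2, 1, 4, 7};
IntStream numbersStream = Arrays.stream(numbersArray);
numbersStream.filter((int val)->val>2); // returns a new stream that is never used; numbersStream is now linked and cannot be used again
int[] filteredArray = numbersStream.toArray(); // throws IllegalStateException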
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
at java.base/java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:246)
at java.base/java.util.stream.IntPipeline.toArray(IntPipeline.java:562)
at learn.collection.Main.main(Main.java:22)
Example 3 (correct usage: the operations are chained on the stream returned by filter()):
int[] numbersArray = {2, 1, 4, 7};
IntStream numbersStream = Arrays.stream(numbersArray);
int[] output = numbersStream.filter((int val)->val>2).toArray();
//4 7
10. mapToLong(ToLongFunction<T> mapper)
List<String> numbers = Arrays.asList("2","1","4","7");
LongStream numberStream = numbers.stream().mapToLong((String val)->Long.parseLong(val));
long[] numberArray = numberStream.toArray();
//Output: 2, 1, 4, 7
long[] numbersArray = {2, 1, 4, 7};
LongStream numbersStream = Arrays.stream(numbersArray);
long[] output = numbersStream.filter((long val)->val>2).toArray();
//Output: 4,7
11. mapToDouble(ToDoubleFunction<T> mapper)
List<String> numbers = Arrays.asList("2","1","4","7");
DoubleStream numberStream = numbers.stream().mapToDouble((String val)->Double.parseDouble(val));
double[] numberArray = numberStream.toArray();
//Output: 2.0, 1.0, 4.0, 7.0
double[] numbersArray = {2, 1, 4, 7};
DoubleStream numbersStream = Arrays.stream(numbersArray);
double[] output = numbersStream.filter((double val)->val>2).toArray();
//Output: 4.0, 7.0
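One practical reason to switch to a primitive stream is that IntStream, LongStream and DoubleStream offer numeric terminal operations such as sum(), average() and summaryStatistics(), which Stream<T> does not have. The sketch below reuses the salary values from the first example. The class and method names (PrimitiveStreamSketch, totalSalary, stats) are illustrative assumptions.

```java
import java.util.Arrays;
import java.util.IntSummaryStatistics;
import java.util.List;

class PrimitiveStreamSketch {
    // Sums the values without going through reduce() or an Optional.
    static int totalSalary(List<Integer> salaries) {
        return salaries.stream().mapToInt(Integer::intValue).sum();
    }

    // Collects count, min, max, sum and average in a single pass.
    static IntSummaryStatistics stats(List<Integer> salaries) {
        return salaries.stream().mapToInt(Integer::intValue).summaryStatistics();
    }

    public static void main(String[] args) {
        List<Integer> salaries = Arrays.asList(3000, 4100, 9000, 1000, 3500);
        System.out.println(totalSalary(salaries));          // 20600
        IntSummaryStatistics summary = stats(salaries);
        System.out.println(summary.getMin());               // 1000
        System.out.println(summary.getMax());               // 9000
        System.out.println(summary.getAverage());           // 4120.0
    }
}
```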
Why we call intermediate operation "Lazy"?
List<Integer> numbers = Arrays.asList(2,1,4,7,10);
Stream<Integer> numberStream = numbers.stream().filter((Integer val) -> val>=3).peek((Integer val) -> System.out.println(val));
Output: nothing is printed, because no terminal operation has been invoked and the pipeline has therefore not run.
List<Integer> numbers = Arrays.asList(2,1,4,7,10);
Stream<Integer> numberStream = numbers.stream().filter((Integer val) -> val>=3).peek((Integer val) -> System.out.println(val));
numberStream.count(); // count() is a terminal operation, so it triggers the pipeline
/*
4
7
10
*/
Sequence of Stream Operations
public class Main {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(2,1,4,7,10);
Stream<Integer> numberStream = numbers.stream()
.filter((Integer val) -> val>=3)
.peek((Integer val) -> System.out.println("after filter: "+val))
.map((Integer val) -> (val*-1))
.peek((Integer val) -> System.out.println("after negating:"+ val))
.sorted()
.peek((Integer val)-> System.out.println("after Sorted:"+val));
List<Integer> filteredNumberStream = numberStream.collect(Collectors.toList());
}
}
Expected Output (if every operation processed the whole stream before the next operation started)
after filter: 4
after filter: 7
after filter: 10
after negating:-4
after negating:-7
after negating:-10
after Sorted:-10
after Sorted:-7
after Sorted:-4
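Actual Output (each element passes through filter and map one at a time; sorted() is a stateful operation, so it waits for all elements before passing any of them on)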
after filter: 4
after negating:-4
after filter: 7
after negating:-7
after filter: 10
after negating:-10
after Sorted:-10
after Sorted:-7
after Sorted:-4
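The element-by-element traversal seen in the actual output also means that a short-circuiting terminal operation can stop the pipeline early. The sketch below records which elements are visited when findFirst() ends a sequential pipeline. It writes into a list from peek() purely for demonstration, and the names (ShortCircuitSketch, trace) are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class ShortCircuitSketch {
    // Returns a log of the elements pulled from the source, followed by the result.
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        Optional<Integer> first = Arrays.asList(2, 1, 4, 7, 10).stream()
                .peek(val -> log.add("visit " + val)) // side effect, for demonstration only
                .filter(val -> val >= 3)
                .map(val -> val * -1)
                .findFirst();                          // stops after the first match
        log.add("result " + first.get());
        return log;
    }

    public static void main(String[] args) {
        System.out.println(trace());
        // [visit 2, visit 1, visit 4, result -4]
    }
}
```

The elements 7 and 10 are never visited, because the first match (4) has already been found.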
Different Terminal Operations
Terminal operations are the ones that produce the result. A terminal operation triggers the processing of the stream.
1. forEach(Consumer<T> action)
Performs an action on each element of the stream. Does NOT return any value.
List<Integer> numbers = Arrays.asList(2,1,4,7,10);
numbers.stream().filter((Integer val) -> val>=3)
.forEach((Integer val)-> System.out.println(val));
//Output 4, 7, 10
2. toArray()
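Collects the elements of the stream into an array. Without an argument it returns Object[]; with an array generator it returns an array of the requested type.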
List<Integer> numbers = Arrays.asList(2,1,4,7,10);
Object[] filteredNumberArrType1 = numbers.stream()
.filter((Integer val)-> val>=3)
.toArray();
Integer[] filteredNumberArrType2 = numbers.stream()
.filter((Integer val)->val>=3)
.toArray((int size) -> new Integer[size]);
3. reduce(BinaryOperator<T> accumulator)
Reduces the elements of the stream to a single value by repeatedly applying an associative accumulator function. The result is an Optional, because the stream may be empty.
public class Main {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(2,1,4,7,10);
Optional<Integer> reducedValue = numbers.stream()
.reduce((Integer val1, Integer val2)-> val1+val2);
System.out.println(reducedValue.get());
//output: 24
}
}
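The single-argument reduce() above returns an Optional, and calling get() on it throws if the stream is empty. As a hedged sketch, the overload reduce(T identity, BinaryOperator<T> accumulator) is one way around that: it starts from an identity value and returns a plain value. The class and method names (ReduceSketch, sum, sumOptional) are illustrative.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

class ReduceSketch {
    // Identity form: 0 is the starting value and also the result for an empty stream.
    static int sum(List<Integer> values) {
        return values.stream().reduce(0, Integer::sum);
    }

    // Optional form: an empty stream yields Optional.empty().
    static Optional<Integer> sumOptional(List<Integer> values) {
        return values.stream().reduce(Integer::sum);
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(2, 1, 4, 7, 10);
        List<Integer> empty = Collections.emptyList();
        System.out.println(sum(numbers));                    // 24
        System.out.println(sum(empty));                      // 0
        System.out.println(sumOptional(empty).isPresent());  // false
    }
}
```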
4. collect(Collector<T,A,R> collector)
Can be used to collect the elements of the stream into a List.
List<Integer> numbers = Arrays.asList(2,1,4,7,10);
List<Integer> filteredNumber = numbers.stream()
.filter((Integer val)->val>=3)
.collect(Collectors.toList());
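collect() is not limited to lists; the Collectors class provides other ready-made collectors as well. The sketch below shows two of them, Collectors.partitioningBy and Collectors.joining, on the same numbers. The class and method names (CollectSketch, splitAtThree, joined) are assumptions made for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class CollectSketch {
    // Splits the values into two lists: those below 3 (false) and those >= 3 (true).
    static Map<Boolean, List<Integer>> splitAtThree(List<Integer> values) {
        return values.stream()
                .collect(Collectors.partitioningBy(val -> val >= 3));
    }

    // Joins the values into one comma-separated string.
    static String joined(List<Integer> values) {
        return values.stream()
                .map(String::valueOf)
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(2, 1, 4, 7, 10);
        System.out.println(splitAtThree(numbers)); // {false=[2, 1], true=[4, 7, 10]}
        System.out.println(joined(numbers));       // 2, 1, 4, 7, 10
    }
}
```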
5. min(Comparator<T> comparator)
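Finds the minimum element of the stream based on the comparator provided; max() works the same way and finds the maximum. Reversing the comparator reverses the result, as the second example shows.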
List<Integer> numbers = Arrays.asList(2,1,4,7,10);
Optional<Integer> minimumValueType1 = numbers.stream()
.filter((Integer val)->val>=3)
.min((Integer val1, Integer val2)->val1-val2);
System.out.println(minimumValueType1.get());
//4
Optional<Integer> minimumValueType2 = numbers.stream()
.filter((Integer val)->val>=3)
.min((Integer val1, Integer val2)-> val2-val1);
System.out.println(minimumValueType2.get());
//10
6. max(Comparator<T> comparator)
List<Integer> numbers = Arrays.asList(2,1,4,7,10);
Optional<Integer> maximumValueType1 = numbers.stream()
.filter((Integer val)->val>=3)
.max((Integer val1, Integer val2)->val1-val2);
System.out.println(maximumValueType1.get());
//10
Optional<Integer> maximumValueType2 = numbers.stream()
.filter((Integer val)->val>=3)
.max((Integer val1, Integer val2)-> val2-val1);
System.out.println(maximumValueType2.get());
//4
7. count()
Returns the number of elements present in the stream.
List<Integer> numbers = Arrays.asList(2,1,4,7,10);
long noOfValuesPresent = numbers.stream()
.filter((Integer val)->val>=3)
.count();
//3
8. anyMatch(Predicate<T> predicate)
Checks whether any value in the stream matches the given predicate and returns a boolean.
List<Integer> numbers = Arrays.asList(2,1,4,7,10);
boolean hasValueGreaterThanThree = numbers.stream()
.anyMatch((Integer val)->val>3);
//true
9. allMatch(Predicate<T> predicate)
Checks whether all values in the stream match the given predicate and returns a boolean.
List<Integer> numbers = Arrays.asList(2,1,4,7,10);
boolean allValuesGreaterThanThree = numbers.stream()
.allMatch((Integer val)->val>3);
//false
10. noneMatch(Predicate<T> predicate)
Checks whether no value in the stream matches the given predicate and returns a boolean.
List<Integer> numbers = Arrays.asList(2,1,4,7,10);
boolean noValueGreaterThanThree = numbers.stream()
.noneMatch((Integer val)->val>3);
//false
11. findFirst()
Finds the first element of the stream and returns it as an Optional.
List<Integer> numbers = Arrays.asList(2,1,4,7,10);
Optional<Integer> firstValue = numbers.stream()
.filter((Integer val)->val>=3)
.findFirst();
System.out.println(firstValue.get());
//4
12. findAny()
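Returns some element of the stream as an Optional. The element is not chosen at random, but there is no guarantee which one is returned, which matters mostly for parallel streams.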
List<Integer> numbers = Arrays.asList(2,1,4,7,10);
Optional<Integer> anyValue = numbers.stream()
.filter((Integer val)->val>=3)
.findAny();
System.out.println(anyValue.get());
//4 (typical for a sequential stream, but not guaranteed)
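The difference between findFirst() and findAny() shows up mainly on parallel streams. As a small sketch, the class below runs both on a parallel stream: findFirst() still respects the encounter order of the list, while findAny() may return any of the matching elements, so no fixed output is claimed for it. The class and method names (FindSketch, firstMatch, someMatch) are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

class FindSketch {
    // Always the first match in encounter order, even on a parallel stream.
    static int firstMatch(List<Integer> values) {
        return values.parallelStream().filter(val -> val >= 3).findFirst().get();
    }

    // Any match; which one is returned can differ between runs.
    static int someMatch(List<Integer> values) {
        return values.parallelStream().filter(val -> val >= 3).findAny().get();
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(2, 1, 4, 7, 10);
        System.out.println(firstMatch(numbers)); // 4
        System.out.println(someMatch(numbers));  // one of 4, 7 or 10
    }
}
```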
How many times we can use a single stream?
Once a terminal operation has been used on a stream, the stream is closed/consumed and cannot be used again for another terminal operation.
public class Main {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(2,1,4,7,10);
Stream<Integer> filteredNumbers = numbers.stream()
.filter((Integer val)->val>=3);
filteredNumbers.forEach((Integer val) -> System.out.println(val));//consumed the filtered Numbers stream
//trying to use the closed stream again
List<Integer> listFromStream = filteredNumbers.collect(Collectors.toList());
}
}
/*
4
7
10
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
at learn.collection.Main.main(Main.java:20)
*/
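When the same pipeline is needed more than once, one common workaround is to wrap its construction in a Supplier, so that every call builds a fresh stream from the source. This is a sketch of that idea, not the only option (collecting to a list once and reusing the list also works). The class and method names (StreamReuseSketch, filtered) are illustrative.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamReuseSketch {
    // Each get() on the returned supplier creates a new, unconsumed stream.
    static Supplier<Stream<Integer>> filtered(List<Integer> numbers) {
        return () -> numbers.stream().filter(val -> val >= 3);
    }

    public static void main(String[] args) {
        Supplier<Stream<Integer>> supplier = filtered(Arrays.asList(2, 1, 4, 7, 10));

        // First terminal operation, on the first stream instance.
        supplier.get().forEach(val -> System.out.println(val));

        // Second terminal operation, on a second stream instance: no exception.
        List<Integer> listFromStream = supplier.get().collect(Collectors.toList());
        System.out.println(listFromStream); // [4, 7, 10]
    }
}
```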
Parallel Stream
Helps to perform operations on a stream concurrently, taking advantage of multi-core CPUs.
The parallelStream() method is used instead of the regular stream() method.
Internally it does the following:
Task splitting: it uses the Spliterator of the source to split the data into multiple chunks.
Task submission and parallel processing: the chunks are processed with the Fork-Join framework.
public class Main {
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(11,22,33,44,55,66,77,88,99,110);
// Sequential Processing
long sequentialProcessingStartTime = System.currentTimeMillis();
numbers.stream()
.map((Integer val)->val*val)
.forEach((Integer val)-> System.out.println(val));
System.out.println("Sequential processing time taken: "+ (System.currentTimeMillis()-sequentialProcessingStartTime)+" millisecond");
// Parallel Processing
long parallelProcessingStartTime = System.currentTimeMillis();
numbers.parallelStream()
.map((Integer val)->val*val)
.forEach((Integer val)-> System.out.println(val));
System.out.println("Parallel processing time taken: "+ (System.currentTimeMillis()-parallelProcessingStartTime)+" millisecond");
}
}
121
484
1089
1936
3025
4356
5929
7744
9801
12100
Sequential processing time taken: 2 millisecond
5929
4356
9801
12100
7744
484
1089
121
3025
1936
Parallel processing time taken: 4 millisecond
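In the output above, the parallel run prints the squares in a scrambled order, because forEach() on a parallel stream does not respect encounter order. It is also slower in this run, which is plausible for such a small list, where the cost of splitting and coordinating the work outweighs the gain. When the order matters, forEachOrdered() or collect() can be used instead. The sketch below assumes the same input list; the names (ParallelOrderSketch, squaresInOrder) are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ParallelOrderSketch {
    // The mapping may run in parallel, but collect() assembles the
    // results in the encounter order of the source list.
    static List<Integer> squaresInOrder(List<Integer> numbers) {
        return numbers.parallelStream()
                .map(val -> val * val)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(11, 22, 33, 44, 55, 66, 77, 88, 99, 110);

        System.out.println(squaresInOrder(numbers));
        // [121, 484, 1089, 1936, 3025, 4356, 5929, 7744, 9801, 12100]

        // forEachOrdered() prints in encounter order, even on a parallel stream.
        numbers.parallelStream()
                .map(val -> val * val)
                .forEachOrdered(val -> System.out.println(val));
    }
}
```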