
Use parallel Streams including reduction, decomposition, merging processes, pipelines and performance




The fork/join framework, introduced in Java 7, is built around ForkJoinPool, which implements the ExecutorService (and therefore Executor) interface. It is the bridge between pre-Java 8 thread pools and Java 8 parallel streams. The fork/join framework lets you divide a problem into subproblems, solve each subproblem in a separate thread in parallel, and then combine the partial results. Java 8 parallel streams are a wrapper around the fork/join framework.
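A minimal sketch of the divide-and-conquer pattern described above, written directly against the fork/join API. RecursiveTask and ForkJoinPool are real JDK classes; the class name ForkJoinSum and the THRESHOLD value are illustrative choices, not tuned values. Each task halves its range until the range is small enough to sum sequentially:

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example class: sums a long[] by recursive splitting.
class ForkJoinSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // illustrative cut-off, not a tuned value
    private final long[] numbers;
    private final int from; // inclusive
    private final int to;   // exclusive

    ForkJoinSum(long[] numbers, int from, int to) {
        this.numbers = numbers;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {          // small enough: solve sequentially
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += numbers[i];
            }
            return sum;
        }
        int mid = (from + to) >>> 1;           // decompose into two subproblems
        ForkJoinSum left = new ForkJoinSum(numbers, from, mid);
        ForkJoinSum right = new ForkJoinSum(numbers, mid, to);
        left.fork();                           // schedule the left half on the pool
        long rightSum = right.compute();       // work on the right half in this thread
        return left.join() + rightSum;         // wait for the left half, then combine
    }

    static long sum(long[] numbers) {
        return ForkJoinPool.commonPool().invoke(new ForkJoinSum(numbers, 0, numbers.length));
    }

    public static void main(String[] args) {
        long[] numbers = new long[10_000];
        Arrays.setAll(numbers, i -> i + 1);                          // 1, 2, ..., 10000
        System.out.println(sum(numbers));                            // 50005000
        System.out.println(Arrays.stream(numbers).parallel().sum()); // same total via a parallel stream
    }
}
```

Forking one half and computing the other half in the current thread is a common idiom: the current thread keeps doing useful work instead of blocking immediately on join().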

In the fork/join framework the programmer specifies how a problem is divided. With parallel streams, the Java runtime performs the partitioning and the combining of results transparently: the Collection interface provides a default Spliterator<E> spliterator() method, which Collection implementations can override to split themselves efficiently. Parallel streams internally use the common ForkJoinPool (ForkJoinPool.commonPool()), whose default parallelism is Runtime.getRuntime().availableProcessors() - 1; the thread that starts the terminal operation also takes part in the work. As a Java 8 programmer, all you have to write is stream.parallel() or collection.parallelStream() to turn a serial stream into a parallel stream.
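A small sketch, assuming a standard JDK 8 or later runtime, that prints the processor count, the common pool's parallelism and the names of the threads that actually processed a parallel stream. Thread names such as ForkJoinPool.commonPool-worker-1 are JDK implementation details, and the set of threads used varies from run to run (the class name is illustrative):

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

class CommonPoolInfo {
    // Collects the names of the threads that executed elements of a parallel stream.
    static Set<String> workerNames() {
        Set<String> names = ConcurrentHashMap.newKeySet(); // thread-safe set
        IntStream.range(0, 10_000).parallel()
                 .forEach(i -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("availableProcessors    = " + Runtime.getRuntime().availableProcessors());
        // Defaults to availableProcessors() - 1 (at least 1); override with
        // -Djava.util.concurrent.ForkJoinPool.common.parallelism=N
        System.out.println("commonPool parallelism = " + ForkJoinPool.commonPool().getParallelism());
        System.out.println("threads used           = " + workerNames());
    }
}
```

Changing the system property resizes the common pool for the whole JVM, so it affects every parallel stream in the application, not just one pipeline.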

A parallel stream is a fork/join thread pool in disguise, not new magic. Calling parallel() or parallelStream() only sets a boolean flag inside the pipeline, telling the Java runtime that you want the pipeline to run in parallel on the fork/join pool. The flag applies to the whole pipeline rather than only to the operations that follow the call: if both parallel() and sequential() are called, the last call made before the terminal operation decides how every stage runs. However, the fork/join decompose, reduce and combine process may not give you what you want:

  • Some operations, such as findFirst, are naturally sequential. Running them in parallel often gains nothing but thread overhead.
  • Some collection implementations are not parallel friendly (e.g. a LinkedList is hard to decompose), so calling parallel() or parallelStream() on them can make performance worse.
  • Even when parallelism helps, a parallel stream is not necessarily the fastest option. A conventional thread pool whose size is tuned with the well-known formula Nthreads = Ncpu * Ucpu * (1 + W/C) can outperform a parallel stream, which is tied to the common pool's fixed default size. This matters most for tasks that spend much of their time waiting.
  • Used incorrectly, a parallel stream produces unpredictable results and can throw ConcurrentModificationException, just like any other thread pool.
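The pool-sizing formula from the list above, as a minimal sketch. Ncpu is the number of processors, Ucpu the target CPU utilization (between 0 and 1), and W/C the ratio of the time a task spends waiting to the time it spends computing. The W/C value of 4 and the sleeping tasks are hypothetical stand-ins for a real, measured I/O-bound workload, and the class name is illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PoolSizing {
    // Nthreads = Ncpu * Ucpu * (1 + W/C), rounded, and never less than one thread.
    static int poolSize(int nCpu, double targetUtilization, double waitToCompute) {
        return (int) Math.max(1, Math.round(nCpu * targetUtilization * (1 + waitToCompute)));
    }

    public static void main(String[] args) throws Exception {
        int nCpu = Runtime.getRuntime().availableProcessors();
        int size = poolSize(nCpu, 1.0, 4.0); // hypothetical: tasks wait 4x as long as they compute
        System.out.println("pool size = " + size);

        ExecutorService pool = Executors.newFixedThreadPool(size);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int i = 0; i < 20; i++) {
                final int id = i;
                // Thread.sleep stands in for blocking I/O in this sketch.
                results.add(pool.submit(() -> { Thread.sleep(50); return id; }));
            }
            int total = 0;
            for (Future<Integer> f : results) {
                total += f.get();
            }
            System.out.println("total = " + total); // 0 + 1 + ... + 19 = 190
        } finally {
            pool.shutdown();
        }
    }
}
```

For example, 8 processors at full utilization with W/C = 4 give 8 * 1.0 * 5 = 40 threads, far more than the common pool would use for a parallel stream.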



For the OCPJP exam, the following topics are in scope:

Creating parallel Streams

  • create a parallel stream from an existing stream.
Stream<Integer> stream = Arrays.asList(1, 2, 3, 4, 5).stream();
Stream<Integer> parallelStream = stream.parallel();
  • create a parallel stream from a java collection class.
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
Stream<Integer> parallelStream = list.parallelStream();
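Because parallel() and sequential() only set a mode flag on the pipeline, the last of those calls made before the terminal operation decides how the whole pipeline runs. A sketch that checks this with isParallel(), and shows that a pipeline switched back with sequential() runs entirely on the calling thread, including a stage that was added while the flag was parallel (class and method names are illustrative):

```java
import java.util.stream.IntStream;
import java.util.stream.Stream;

class ParallelFlag {
    // parallel() is the last mode call, so the whole pipeline is parallel.
    static boolean lastCallParallel() {
        return Stream.of(1, 2, 3).sequential().map(i -> i * 2).parallel().isParallel();
    }

    // sequential() is the last mode call, so the whole pipeline is sequential.
    static boolean lastCallSequential() {
        return Stream.of(1, 2, 3).parallel().map(i -> i * 2).sequential().isParallel();
    }

    // The mapToObj stage is added while the flag is parallel, yet every element
    // is processed by the thread that invokes the terminal operation.
    static boolean allOnCallingThread() {
        String caller = Thread.currentThread().getName();
        return IntStream.range(0, 1_000)
                .parallel()
                .mapToObj(i -> Thread.currentThread().getName())
                .sequential()
                .allMatch(caller::equals);
    }

    public static void main(String[] args) {
        System.out.println(lastCallParallel());   // true
        System.out.println(lastCallSequential()); // false
        System.out.println(allOnCallingThread()); // true
    }
}
```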

Ordering

A stream created from an ordered source, such as a List or an array, is ordered: it has a defined encounter order. A parallel stream still keeps track of that order, but operations such as findAny() and forEach() are free to ignore it, so they produce unpredictable results on a parallel stream. Operations that must respect encounter order, including findFirst(), limit(), skip() and forEachOrdered(), may actually run more slowly on a parallel stream than on a serial one.

OCPJP>cat ParallelOrder.java 
import java.util.stream.*;
import java.util.*;

class ParallelOrder {
  public static void main(String[] args) {
    List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
    System.out.println("findAny()");
    System.out.println(list.parallelStream().findAny().get());    
    System.out.println(list.parallelStream().findAny().get());    
    System.out.println(list.parallelStream().findAny().get());    
    System.out.println("forEach()");
    list.stream().parallel().forEach(System.out::println);
    System.out.println("forEach()");
    list.stream().parallel().forEach(System.out::println);
    System.out.println("forEachOrdered()");
    list.stream().parallel().forEachOrdered(System.out::println);
    System.out.println("forEachOrdered()");
    list.stream().parallel().forEachOrdered(System.out::println);
  }
}
OCPJP>javac ParallelOrder.java 
OCPJP>java ParallelOrder
findAny()
3
2
2
forEach()
1
5
3
2
4
forEach()
3
1
2
5
4
forEachOrdered()
1
2
3
4
5
forEachOrdered()
1
2
3
4
5
OCPJP>
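The transcript above only shows operations that may ignore order. The sketch below (class and method names are illustrative) shows the other side: on an ordered parallel stream, map() followed by collect() and findFirst() still honor encounter order, while calling unordered() allows limit() to return any elements instead of the first ones, which is what makes it cheaper in parallel:

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelOrderKept {
    // Elements may be mapped in any order, but collect() assembles the result in encounter order.
    static List<Integer> doubled() {
        return IntStream.rangeClosed(1, 10).boxed().parallel()
                .map(i -> i * 2)
                .collect(Collectors.toList());
    }

    // findFirst() respects encounter order even on a parallel stream.
    static Optional<Integer> firstAboveFive() {
        return IntStream.rangeClosed(1, 10).boxed().parallel()
                .filter(i -> i > 5)
                .findFirst();
    }

    // After unordered(), limit(3) may return any three elements of the stream.
    static List<Integer> anyThree() {
        return IntStream.rangeClosed(1, 1_000).boxed().parallel()
                .unordered()
                .limit(3)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(doubled());              // [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
        System.out.println(firstAboveFive().get()); // 6
        System.out.println(anyThree());             // three elements; which ones may vary
    }
}
```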

Performance

Parallel streams can improve performance because many stream operations can be executed independently. 
  • When stream operations need to coordinate, performance degrades. Many common operations such as map(), filter() and forEach() can process each element independently (although the order in which elements are processed is not guaranteed); operations such as limit() and findFirst(), which have to coordinate the threads of an ordered parallel stream, are expensive.
  • For a small amount of data, the benefit of processing in parallel is outweighed by the extra cost of splitting the work, scheduling threads and merging the results.
  • The stream source also affects the performance of a parallel stream. For example, an ArrayList is easier to split than a LinkedList, because the latter has to be traversed before it can be decomposed. The following table lists common stream sources and how well they decompose.
Source            Decomposability
ArrayList         Good
LinkedList        Bad
IntStream.range   Good
Stream.iterate    Bad
HashSet           OK
TreeSet           OK
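One way to see the difference in decomposability is to ask a source's Spliterator to split once. A minimal sketch (the class name is illustrative): an ArrayList's spliterator simply halves its index range, while a LinkedList's spliterator has to walk its nodes and copy a batch of them into an array, so for a short list the split can be badly unbalanced. The exact split sizes are JDK implementation details:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Spliterator;

class SplitDemo {
    // Splits the source's spliterator once and returns
    // {elements in the split-off part, elements left in the original}.
    static long[] splitOnce(Collection<Integer> source) {
        Spliterator<Integer> rest = source.spliterator();
        Spliterator<Integer> prefix = rest.trySplit(); // null means "could not split"
        long prefixSize = (prefix == null) ? 0 : prefix.estimateSize();
        return new long[] { prefixSize, rest.estimateSize() };
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println("ArrayList:  " + Arrays.toString(splitOnce(new ArrayList<>(data)))); // [4, 4]
        System.out.println("LinkedList: " + Arrays.toString(splitOnce(new LinkedList<>(data))));
    }
}
```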

Interference and Stateful Operation

There are two kinds of stream operations, stateless and stateful:
  1. Operations like map and filter take each element from the input stream and produce zero or one result in the output stream (map always produces exactly one, filter zero or one). They have no internal state, assuming the user-supplied lambda expression or method reference has no internal mutable state either. These are stateless operations.
  2. Operations like sorted and distinct need to know the history of the stream to do their job: sorted has to see every element before it can emit the first one, and distinct has to remember every element it has already seen. In other words, they may have to buffer all the elements of the stream in memory, which is a potentially huge internal state. Other operations, like reduce, sum and max, keep only a small internal state to accumulate the result, but that state changes with every element. Operations with internal state are called stateful operations.
A stateful lambda expression is one whose result depends on mutable state that might change during the execution of the pipeline. Passing such a lambda expression to a stateless operation such as map makes that operation effectively stateful: its results now depend on when, and on which thread, each element is processed.
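A sketch of the difference, using only JDK classes (class and method names are illustrative). The first method passes map() a lambda that reads and updates a shared counter. The counter is an AtomicInteger, so there is no data race, yet in parallel the labels still depend on the order in which threads happen to process elements. The second method computes each label from the element's index alone, so it gives the same answer serially and in parallel:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class StatefulLambda {
    // Stateful: the lambda's result depends on how many times it has been called before.
    static List<String> labelStateful(List<String> items, boolean parallel) {
        AtomicInteger counter = new AtomicInteger(); // thread-safe, but still mutable shared state
        Stream<String> s = parallel ? items.parallelStream() : items.stream();
        return s.map(x -> counter.getAndIncrement() + ":" + x)
                .collect(Collectors.toList());
    }

    // Stateless: each label depends only on the element's position.
    static List<String> labelStateless(List<String> items) {
        return IntStream.range(0, items.size()).parallel()
                .mapToObj(i -> i + ":" + items.get(i))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "b", "c", "d");
        System.out.println(labelStateful(items, false)); // [0:a, 1:b, 2:c, 3:d]
        System.out.println(labelStateful(items, true));  // numbering may vary from run to run
        System.out.println(labelStateless(items));       // [0:a, 1:b, 2:c, 3:d]
    }
}
```

Making shared state thread-safe removes the data race but not the dependence on scheduling; only removing the state does that.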

Such state can change during the execution of a pipeline through the side effects of some operation. An operation or lambda expression has a side effect if, in addition to returning a value, it modifies some state variable that other operations may depend on. Side effects are not forbidden: forEach and peek only make sense through their side effects, and collect works by mutating result containers. In a parallel stream, however, a stateful operation running on thread A can share state with operations (the same operation or a different one) running on thread B.

When we use operations such as forEach and peek on a parallel stream, the Java runtime may invoke the lambda expression passed to them concurrently from multiple threads. If that lambda expression is stateful and the shared state is not safely published and shared, the results suffer from the classic race conditions of concurrent code: they become unpredictable, or a ConcurrentModificationException is thrown. The traditional safeguards, such as immutable objects, concurrent collections and atomic variables, have to be used in these cases. This applies when a stateful lambda expression is unavoidable, for example when several results must be produced from a single pass over a stream that can only be consumed once, and a collector such as summarizingInt() cannot help.
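A minimal sketch of that last case, under the assumption that one pass has to produce two different results: the even numbers themselves and the sum of the odd numbers. Because the lambda passed to forEach() runs on several threads, both targets are thread-safe JDK classes (ConcurrentLinkedQueue and LongAdder); with an ArrayList and a plain long field the results would be unreliable. The class name is illustrative:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.IntStream;

class SafeSideEffects {
    final Queue<Integer> evens = new ConcurrentLinkedQueue<>(); // safe for concurrent add(); order is arbitrary
    final LongAdder oddSum = new LongAdder();                   // sum designed for many updating threads

    // One pass over the parallel stream fills both results.
    static SafeSideEffects scan(int n) {
        SafeSideEffects r = new SafeSideEffects();
        IntStream.range(0, n).parallel().forEach(i -> {
            if (i % 2 == 0) {
                r.evens.add(i);
            } else {
                r.oddSum.add(i);
            }
        });
        return r;
    }

    public static void main(String[] args) {
        SafeSideEffects r = scan(1_000);
        System.out.println("even count = " + r.evens.size()); // 500
        System.out.println("odd sum    = " + r.oddSum.sum()); // 250000
    }
}
```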

OCPJP>cat ParallelRaceCondition.java 
import java.util.*;
import java.util.stream.*;
import java.util.concurrent.atomic.*;
class ParallelRaceCondition {
  static private int sum = 0;
  static private AtomicInteger aSum = new AtomicInteger(); 
  public static void main(String... args) {
    IntStream.rangeClosed(0, 9).parallel().forEach(i -> sum += i);            // race: unsynchronized read-modify-write
    System.out.println("sum = " + sum);
    int s = IntStream.rangeClosed(0, 9).parallel().sum();                     // built-in reduction, no shared state
    System.out.println("sum() = " + s);
    IntStream.rangeClosed(0, 9).parallel().forEach(i -> aSum.addAndGet(i));   // atomic update, thread-safe
    System.out.println("aSum = " + aSum.get());
    
  }
}
OCPJP>javac ParallelRaceCondition.java 
OCPJP>for i in {1..50}; do java ParallelRaceCondition; done | grep 'sum =' |sort|uniq
sum = 36
sum = 40
sum = 43
sum = 44
sum = 45
OCPJP>for i in {1..50}; do java ParallelRaceCondition; done | grep 'sum() =' |sort|uniq
sum() = 45
OCPJP>for i in {1..50}; do java ParallelRaceCondition; done | grep 'aSum =' |sort|uniq
aSum = 45
OCPJP>


Stateless operations such as filter and map have no reason to carry state at all, so never pass them stateful lambda expressions. A lambda expression that modifies shared state, in particular one that modifies the stream's own source (interference), can even make the pipeline throw ConcurrentModificationException, as this (parallel) example does:

OCPJP>cat ParallelException.java 
import java.util.concurrent.*;
import java.util.*;
import java.util.stream.*;

class ParallelException {
  public static void main(String...args) {
    List<String> noSafe = new ArrayList<>();
    noSafe.add("a");
    noSafe.parallelStream().map(i -> {noSafe.add("c"); return "c";}).count();
  }
}
OCPJP>javac ParallelException.java 
OCPJP>java ParallelException
Exception in thread "main" java.util.ConcurrentModificationException
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1380)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:502)
	at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:747)
	at java.util.stream.ReduceOps$ReduceTask.doLeaf(ReduceOps.java:721)
	at java.util.stream.AbstractTask.compute(AbstractTask.java:316)
	at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:400)
	at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:728)
	at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:714)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
	at java.util.stream.LongPipeline.reduce(LongPipeline.java:438)
	at java.util.stream.LongPipeline.sum(LongPipeline.java:396)
	at java.util.stream.ReferencePipeline.count(ReferencePipeline.java:526)
	at ParallelException.main(ParallelException.java:9)
OCPJP>

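If we replace the ArrayList with the thread-safe CopyOnWriteArrayList, the ConcurrentModificationException above is avoided, because its stream traverses a snapshot of the list taken when the stream is created. Modifying the source of a stream from inside the pipeline is still a bad idea, though. Note also that the stack trace above comes from Java 8, where count() is implemented as a sum. Since Java 9, count() may compute its result directly from a sized source without evaluating map() at all, so on newer JDKs these two examples may not modify the list at a terminal operation such as count(); one that visits every element, such as collect(Collectors.toList()), still does.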

OCPJP>cat ParallelNoException.java 
import java.util.concurrent.*;
import java.util.*;
import java.util.stream.*;

class ParallelNoException {
  public static void main(String...args) {
    List<String> safe = new CopyOnWriteArrayList<>();
    safe.add("a");
    safe.parallelStream().map(i -> {safe.add("c"); return "c";}).count();
  }
}
OCPJP>javac ParallelNoException.java 
OCPJP>java ParallelNoException
OCPJP>
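Instead of modifying the source from inside the pipeline, the usual fix is to let the stream produce its results into a new collection and change the source only after the terminal operation has finished. A minimal sketch of that approach for the examples above (class and method names are illustrative):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class NonInterfering {
    // Collects the new elements into a separate list, then updates the source
    // only after the stream pipeline has completed.
    static List<String> appendMapped(List<String> source) {
        List<String> extra = source.parallelStream()
                .map(s -> "c")                  // stateless, never touches source
                .collect(Collectors.toList());
        source.addAll(extra);                   // safe: the pipeline has finished
        return source;
    }

    public static void main(String[] args) {
        List<String> list = new ArrayList<>(Arrays.asList("a"));
        System.out.println(appendMapped(list)); // [a, c]
    }
}
```

A plain ArrayList is enough here, since no thread touches the list while the stream is running.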

Parallel Reduction


Reduction operations on parallel streams are referred to as parallel reductions.

The reduce() method combines two values into a new one, so it performs an immutable reduction. reduce() has a three-argument version that takes an identity, an accumulator and a combiner.

In contrast, the collect() method mutates a container to accumulate the result it is supposed to produce (a mutable reduction). Like reduce(), collect() has a three-argument version; it takes a supplier, an accumulator and a combiner.
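The three-argument forms matter most when the result type differs from the element type, because each parallel subtask then needs a way to merge partial results. A minimal sketch (class and method names are illustrative): reduce() folds String elements into an Integer total and uses the combiner to add partial totals, while collect() gives each subtask its own ArrayList and uses the combiner to merge the lists in encounter order:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ThreeArgForms {
    // Immutable reduction: identity 0, accumulator (Integer, String) -> Integer,
    // combiner adds the partial totals computed by different subtasks.
    static int totalLength(List<String> words) {
        return words.parallelStream()
                .reduce(0, (total, w) -> total + w.length(), Integer::sum);
    }

    // Mutable reduction: each subtask gets a fresh ArrayList from the supplier,
    // the accumulator adds to it, and the combiner appends one list to another.
    static List<Integer> lengths(List<String> words) {
        return words.parallelStream()
                .map(String::length)
                .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("hello", "world", "!");
        System.out.println(totalLength(words)); // 11
        System.out.println(lengths(words));     // [5, 5, 1]
    }
}
```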

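Both reduce() and collect() can take advantage of a parallel stream, but better performance is not guaranteed. Because collect() mutates a container, getting good parallel performance out of it is even harder. By default, a parallel collect() gives each subtask its own container and merges the containers with the combiner. The Java runtime performs a concurrent reduction, in which all threads accumulate into a single shared container, only if all three of the following criteria are true: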
  1. The stream is parallel. You call .parallel() or .parallelStream() to satisfy this.
  2. The parameter of the collect operation, the collector, has the characteristic Collector.Characteristics.CONCURRENT. 
  3. Either the stream is unordered or the collector has the characteristic Collector.Characteristics.UNORDERED. 
Collectors.toConcurrentMap() and Collectors.groupingByConcurrent() satisfy both 2 and 3.

OCPJP>cat ParallelMap.java 
import java.util.concurrent.*;
import java.util.stream.*;
import java.util.*;
class ParallelMap {
  public static void main(String[] args) {
    System.out.println(Collectors.toConcurrentMap(k ->k, v -> v).characteristics());
    ConcurrentMap<String, Integer> m = Stream.of("hello", "world").parallel().collect(Collectors.toConcurrentMap(k -> k, String::length, (i, j) -> i + j));
    System.out.println(m);
    System.out.println(Collectors.groupingByConcurrent(String::length).characteristics());
    ConcurrentMap<Integer, List<String>> g = Stream.of("hello", "world", "!").collect(Collectors.groupingByConcurrent(String::length));
    System.out.println(g);
  }
}
OCPJP>
OCPJP>javac ParallelMap.java 
OCPJP>java ParallelMap
[CONCURRENT, UNORDERED, IDENTITY_FINISH]
{world=5, hello=5}
[CONCURRENT, UNORDERED, IDENTITY_FINISH]
{1=[!], 5=[hello, world]}
OCPJP>
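Criteria 2 and 3 can be checked at run time through Collector.characteristics(), as the transcript above does. A sketch (the helper name is hypothetical) contrasting a concurrent collector with toList() and toSet(): neither of the latter is CONCURRENT, so a parallel collect() with them falls back to building separate containers and merging them, which still produces a correct, ordered result:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ConcurrentCheck {
    // Hypothetical helper: does the collector itself satisfy criteria 2 and 3?
    static boolean concurrentAndUnordered(Collector<?, ?, ?> c) {
        Set<Collector.Characteristics> ch = c.characteristics();
        return ch.contains(Collector.Characteristics.CONCURRENT)
                && ch.contains(Collector.Characteristics.UNORDERED);
    }

    public static void main(String[] args) {
        Collector<String, ?, ?> byLength = Collectors.groupingByConcurrent(String::length);
        System.out.println(concurrentAndUnordered(Collectors.toList())); // false
        System.out.println(concurrentAndUnordered(Collectors.toSet()));  // false: UNORDERED only
        System.out.println(concurrentAndUnordered(byLength));            // true

        // Not a concurrent reduction, but still correct: per-subtask lists merged in order.
        List<Integer> squares = IntStream.rangeClosed(1, 5).boxed().parallel()
                .map(i -> i * i)
                .collect(Collectors.toList());
        System.out.println(squares);                                     // [1, 4, 9, 16, 25]
    }
}
```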


To get the same result from a parallel stream as from a serial stream, the arguments of reduce() have to meet the following extra requirements. Likewise, collect()'s accumulator and combiner have to follow rules 2 and 3, and its supplier must return a fresh container every time it is called.
  1. The identity must be an identity for the combiner: for every value u, combiner.apply(identity, u) is equal to u.
  2. The accumulator operator op must be associative, stateless and non-interfering, so that (a op b) op c is equal to a op (b op c).
  3. The combiner operator must also be associative, stateless and non-interfering, and it must be compatible with the accumulator: for all u and t, combiner.apply(u, accumulator.apply(identity, t)) is equal to accumulator.apply(u, t).
OCPJP>cat ParallelReduce.java 
import java.util.*;
import java.util.stream.*;
class ParallelReduce {
  public static void main(String[] args) {
    System.out.println(Stream.of("a", "b", "c", "d").parallel().reduce("", (a,b) -> a +b, (a, b) -> a + b));
    System.out.println(Stream.of("a", "b", "c", "d").parallel().collect(StringBuilder::new, StringBuilder::append, (s1, s2) -> s1.append(s2.toString())));
  }
}
OCPJP>
OCPJP>javac ParallelReduce.java 
OCPJP>java ParallelReduce
abcd
abcd
OCPJP>
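A sketch of what happens when the rules are broken (class and method names are illustrative). With a correct identity and an associative operator, the parallel result matches the serial one. Using 10 as the identity for + breaks rule 1, because every parallel subtask starts from 10, and subtraction breaks rule 2 because it is not associative. The serial results are fixed; the parallel ones depend on how the runtime happens to split the work, so no particular value is promised:

```java
import java.util.stream.IntStream;

class ReduceRules {
    static IntStream numbers(boolean parallel) {
        IntStream s = IntStream.rangeClosed(1, 100);
        return parallel ? s.parallel() : s;
    }

    // Follows the rules: 0 is the identity for +, and + is associative.
    static int sum(boolean parallel) {
        return numbers(parallel).reduce(0, Integer::sum);
    }

    // Breaks rule 1: 10 is not an identity for +.
    static int sumWithBadIdentity(boolean parallel) {
        return numbers(parallel).reduce(10, Integer::sum);
    }

    // Breaks rule 2: subtraction is not associative.
    static int difference(boolean parallel) {
        return numbers(parallel).reduce(0, (a, b) -> a - b);
    }

    public static void main(String[] args) {
        System.out.println(sum(false) + " " + sum(true));                               // 5050 5050
        System.out.println(sumWithBadIdentity(false) + " " + sumWithBadIdentity(true)); // 5060 serially; parallel value depends on splitting
        System.out.println(difference(false) + " " + difference(true));                 // -5050 serially; parallel value depends on splitting
    }
}
```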