16.9 Parallel Streams
The Stream API makes it possible to execute a sequential stream in parallel without rewriting the code. The primary reason for using parallel streams is to improve performance, while at the same time ensuring that the results obtained are the same, or at least compatible, regardless of the mode of execution. Although the API goes a long way toward achieving this aim, it is important to understand the pitfalls to avoid when executing stream pipelines in parallel.
Building Parallel Streams
The execution mode of an existing stream can be set to parallel by calling the parallel() method on the stream (p. 933). The parallelStream() method of the Collection interface can be used to create a parallel stream with a collection as the data source (p. 897). No other code is necessary for parallel execution, as the data partitioning and thread management for a parallel stream are handled by the API and the JVM. As with any stream, the stream is not executed until a terminal operation is invoked on it.
The isParallel() method of the stream interfaces can be used to determine whether the execution mode of a stream is parallel (p. 933).
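For example, a minimal sketch, assuming a small list of strings as the data source (the list contents and the class name are purely illustrative):

import java.util.List;

public class BuildingParallelStreams {
  public static void main(String[] args) {
    List<String> names = List.of("Ada", "Bo", "Cy", "Dee");         // Assumed sample data.

    // (1) A parallel stream created directly from the collection data source.
    long n1 = names.parallelStream().filter(s -> s.length() > 2).count();

    // (2) A sequential stream whose execution mode is set to parallel.
    long n2 = names.stream().parallel().filter(s -> s.length() > 2).count();

    // The execution mode can be queried before a terminal operation is invoked.
    System.out.println(names.stream().parallel().isParallel());     // true

    System.out.println(n1 + " " + n2);                               // 2 2
  }
}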
Parallel Stream Execution
The Stream API allows a stream to be executed either sequentially or in parallel, meaning that all stream operations can execute either sequentially or in parallel. A sequential stream is executed in a single thread running on one CPU core. The elements in the stream are processed sequentially in a single pass by the stream operations that are executed in the same thread (p. 891).
A parallel stream is executed by different threads, running on multiple CPU cores in a computer. The stream elements are split into substreams that are processed by multiple instances of the stream pipeline being executed in multiple threads. The partial results from processing of each substream are merged (or combined) into a final result (p. 891).
Parallel streams utilize the Fork/Join Framework (§23.3, p. 1447) under the hood for executing parallel tasks. This framework provides support for the thread management necessary to execute the substreams in parallel. The number of threads employed during parallel stream execution is dependent on the CPU cores in the computer.
Figure 16.12, p. 963, illustrates parallel functional reduction using the three-argument reduce(identity, accumulator, combiner) terminal operation (p. 962).
Figure 16.14, p. 967, illustrates parallel mutable reduction using the three-argument collect(supplier, accumulator, combiner) terminal operation (p. 966).
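The following sketch (not part of Example 16.14, with assumed sample data) shows where the combiner comes into play when such reductions are executed in parallel:

import java.util.ArrayList;
import java.util.List;

public class ParallelReduction {
  public static void main(String[] args) {
    List<String> words = List.of("parallel", "streams", "in", "java");   // Assumed data.

    // Parallel functional reduction: the accumulator computes partial sums for
    // the substreams; the combiner merges the partial sums.
    int totalLength = words.parallelStream()
        .reduce(0,                                       // identity
                (sum, word) -> sum + word.length(),      // accumulator
                Integer::sum);                           // combiner
    System.out.println(totalLength);                     // 21

    // Parallel mutable reduction: the supplier creates one container per
    // substream; the combiner merges the containers into the final result.
    List<Integer> lengths = words.parallelStream()
        .collect(ArrayList::new,                         // supplier
                 (list, word) -> list.add(word.length()),// accumulator
                 List::addAll);                          // combiner
    System.out.println(lengths);                         // [8, 7, 2, 4]
  }
}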
Factors Affecting Performance
There are no guarantees that executing a stream in parallel will improve the performance. In this subsection we look at some factors that can affect performance.
Benchmarking
In general, increasing the number of CPU cores, and thereby the number of threads that can execute in parallel, only scales performance up to a threshold for a given data size, as some threads might become idle if there is no data left for them to process. The number of CPU cores therefore boosts performance only to a certain extent, and it is not the only factor to consider when deciding whether to execute a stream in parallel.
Inherent in the total cost of parallel processing is the start-up cost of setting up the parallel execution. If, at the outset, this cost is already comparable to the cost of sequential execution, not much can be gained by resorting to parallel execution.
A combination of the following three factors can be crucial in deciding whether a stream should be executed in parallel:
Sufficiently large data size
The size of the stream must be large enough to warrant parallel processing; otherwise, sequential processing is preferable. The start-up cost can be prohibitive for parallel execution if the stream size is too small.
Computation-intensive stream operations
If the stream operations involve only small computations, the stream size should be proportionately large to warrant parallel execution. If the stream operations are computation-intensive, the stream size is less significant, and parallel execution is more likely to boost performance.
Easily splittable stream
If the cost of splitting the stream into substreams is higher than the cost of processing the substreams, employing parallel execution can be futile. Collections like ArrayLists and HashMaps, and simple arrays, are efficiently splittable, whereas LinkedLists and IO-based data sources are less efficient in this regard.
Benchmarking—that is, measuring performance—is strongly recommended to decide whether parallel execution will be beneficial. Example 16.14 illustrates a simple scheme where reading the system clock before and after a stream is executed can be used to get a sense of how well a stream performs.
The class StreamBenchmarks in Example 16.14 defines five methods, at (1) through (5), that compute the sum of the values from 1 to n in various ways. Each method is executed with four different values of n; that is, the stream size is the number of values to sum. The program prints the benchmarks for each method for the different values of n. These benchmarks can of course vary, as many factors influence the results, the most significant one being the number of CPU cores on the computer.
The methods seqSumRangeClosed() at (1) and paraSumRangeClosed() at (2) perform the computation on a sequential and a parallel stream, respectively, that are created with the rangeClosed() method.
return LongStream.rangeClosed(1L, n).sum();                       // Sequential stream
...
return LongStream.rangeClosed(1L, n).parallel().sum();            // Parallel stream
Benchmarks from Example 16.14:
   Size  Sequential   Parallel
   1000     0.05681    0.11031
  10000     0.06698    0.13979
 100000     0.71274    0.52627
1000000     7.02237    4.37249
The terminal operation sum() is not computation-intensive. The parallel stream starts to show better performance when the number of values approaches 100000, as the stream size is then sufficiently large for parallel execution to pay off. Note that the range of values defined by the arguments of the rangeClosed() method can be efficiently split into substreams, as its start and end values are provided.
The methods seqSumIterate() at (3) and paraSumIterate() at (4) perform the computation on a sequential and a parallel stream, respectively, that are created with the iterate() method.
return LongStream.iterate(1L, i -> i + 1).limit(n).sum();             // Sequential stream
...
return LongStream.iterate(1L, i -> i + 1).limit(n).parallel().sum();  // Parallel stream
Benchmarks from Example 16.14:
   Size  Sequential   Parallel
   1000     0.08645    0.34696
  10000     0.35687    1.27861
 100000     3.24083   11.38709
1000000    29.92285  117.87909
The method iterate() creates an infinite stream, and the limit() intermediate operation truncates the stream according to the value of n. The performance of both streams degrades quickly as the number of values increases. However, the parallel stream performs worse than the sequential stream in all cases. The values generated by the iterate() method are not known before the stream is executed, and the limit() operation is stateful, making it inefficient to split the values into substreams for parallel execution.
The method iterSumLoop() at (5) uses a for(;;) loop to compute the sum.
Benchmarks from Example 16.14:
   Size  Iterative
   1000    0.00586
  10000    0.02330
 100000    0.22352
1000000    2.49677
Using a for(;;) loop to calculate the sum performs best for all values of n compared to the streams, showing that significant overhead is involved in using streams for summing a sequence of numerical values.
In Example 16.14, the methods measurePerf() at (6) and xqtFunctions() at (13) create the benchmarks for functions passed as parameters. In the measurePerf() method, the system clock is read at (8) and the function parameter func is applied at (9). The system clock is read again at (10) after the function application at (9) has completed. The execution time calculated at (10) reflects the time for executing the function. Applying the function func evaluates the lambda expression or the method reference implementing the LongFunction interface. In Example 16.14, the function parameter func is implemented by method references that call methods, at (1) through (5), in the StreamBenchmarks class whose execution time we want to measure.
public static <R> double measurePerf(LongFunction<R> func, long n) {      // (6)
  // ...
  double start = System.nanoTime();                                       // (8)
  result = func.apply(n);                                                 // (9)
  double duration = (System.nanoTime() - start)/1_000_000;                // (10) ms.
  // ...
}
Example 16.14 Benchmarking
import java.util.function.LongFunction;
import java.util.stream.LongStream;
/*
 * Benchmark the execution time to sum the numbers from 1 to n using streams.
 */
public final class StreamBenchmarks {
  public static long seqSumRangeClosed(long n) {                          // (1)
    return LongStream.rangeClosed(1L, n).sum();
  }

  public static long paraSumRangeClosed(long n) {                         // (2)
    return LongStream.rangeClosed(1L, n).parallel().sum();
  }

  public static long seqSumIterate(long n) {                              // (3)
    return LongStream.iterate(1L, i -> i + 1).limit(n).sum();
  }

  public static long paraSumIterate(long n) {                             // (4)
    return LongStream.iterate(1L, i -> i + 1).limit(n).parallel().sum();
  }

  public static long iterSumLoop(long n) {                                // (5)
    long result = 0;
    for (long i = 1L; i <= n; i++) {
      result += i;
    }
    return result;
  }

  /*
   * Applies the function parameter func, passing n as parameter.
   * Returns the average time (ms.) to execute the function 100 times.
   */
  public static <R> double measurePerf(LongFunction<R> func, long n) {    // (6)
    int numOfExecutions = 100;
    double totTime = 0.0;
    R result = null;
    for (int i = 0; i < numOfExecutions; i++) {                           // (7)
      double start = System.nanoTime();                                   // (8)
      result = func.apply(n);                                             // (9)
      double duration = (System.nanoTime() - start)/1_000_000;            // (10) ms.
      totTime += duration;                                                // (11)
    }
    double avgTime = totTime/numOfExecutions;                             // (12)
    return avgTime;
  }

  /*
   * Executes the functions in the varargs parameter funcs
   * for different stream sizes.
   */
  public static <R> void xqtFunctions(LongFunction<R>... funcs) {         // (13)
    long[] sizes = {1_000L, 10_000L, 100_000L, 1_000_000L};               // (14)
    // For each stream size ...
    for (int i = 0; i < sizes.length; ++i) {                              // (15)
      System.out.printf("%7d", sizes[i]);
      // ... execute the functions passed in the varargs parameter funcs.
      for (int j = 0; j < funcs.length; ++j) {                            // (16)
        System.out.printf("%10.5f", measurePerf(funcs[j], sizes[i]));
      }
      System.out.println();
    }
  }

  public static void main(String[] args) {                                // (17)
    System.out.println("Streams created with the rangeClosed() method:"); // (18)
    System.out.println("   Size  Sequential   Parallel");
    xqtFunctions(StreamBenchmarks::seqSumRangeClosed,
                 StreamBenchmarks::paraSumRangeClosed);
    System.out.println("Streams created with the iterate() method:");     // (19)
    System.out.println("   Size  Sequential   Parallel");
    xqtFunctions(StreamBenchmarks::seqSumIterate,
                 StreamBenchmarks::paraSumIterate);
    System.out.println("Iterative solution with an explicit loop:");      // (20)
    System.out.println("   Size  Iterative");
    xqtFunctions(StreamBenchmarks::iterSumLoop);
  }
}
Possible output from the program:
Streams created with the rangeClosed() method:
   Size  Sequential   Parallel
   1000     0.05681    0.11031
  10000     0.06698    0.13979
 100000     0.71274    0.52627
1000000     7.02237    4.37249
Streams created with the iterate() method:
   Size  Sequential   Parallel
   1000     0.08645    0.34696
  10000     0.35687    1.27861
 100000     3.24083   11.38709
1000000    29.92285  117.87909
Iterative solution with an explicit loop:
   Size  Iterative
   1000    0.00586
  10000    0.02330
 100000    0.22352
1000000    2.49677
Side Effects
Efficient execution of parallel streams that produces the desired results requires the stream operations (and their behavioral parameters) to avoid certain side effects.
Non-interfering behaviors
The behavioral parameters of stream operations should be non-interfering (p. 909)—both for sequential and parallel streams. Unless the stream data source is concurrent, the stream operations should not modify it during the execution of the stream. See building streams from collections (p. 897).
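A minimal sketch of the kind of interference to avoid, assuming a small non-concurrent list as the data source (the offending statement is shown commented out):

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class NonInterference {
  public static void main(String[] args) {
    List<String> source = new ArrayList<>(List.of("a", "b", "c"));   // Non-concurrent source.

    // Interfering (do NOT do this): modifying the data source while the pipeline
    // executes typically fails with a ConcurrentModificationException.
    // source.stream().map(s -> { source.add(s); return s; }).count();

    // Non-interfering: the behavioral parameters only read the elements, and the
    // results go into a new collection.
    List<String> result = source.stream()
        .map(String::toUpperCase)
        .collect(Collectors.toList());
    System.out.println(result);                                      // [A, B, C]
  }
}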
Stateless behaviors
The behavioral parameters of stream operations should be stateless (p. 909)— both for sequential and parallel streams. A behavioral parameter implemented as a lambda expression should not depend on any state that might change during the execution of the stream pipeline. The results from a stateful behavioral parameter can be nondeterministic or even incorrect. For a stateless behavioral parameter, the results are always the same.
Shared state accessed by the behavioral parameters of stream operations in a pipeline should be avoided. Executing the pipeline in parallel can lead to race conditions when the shared state is accessed, and using synchronization code to provide thread safety may defeat the purpose of parallelization. Using the three-argument reduce() or collect() method, which encapsulates the state used during accumulation, can be a better solution, as illustrated below.
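The following sketch contrasts the two approaches; the shared-state variant is shown commented out, as it is exactly the kind of code to avoid (the stream of values is assumed for illustration):

import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

public class SharedStatePitfall {
  public static void main(String[] args) {
    // Shared state (do NOT do this): ArrayList is not thread-safe, so parallel
    // execution can drop elements or fail with an exception.
    // List<Integer> shared = new ArrayList<>();
    // IntStream.rangeClosed(1, 1000).parallel()
    //          .filter(i -> i % 2 == 0)
    //          .forEach(shared::add);              // Race condition on the shared list.

    // Encapsulated state: each thread accumulates into its own list created by
    // the supplier, and the combiner merges the partial lists.
    List<Integer> evens = IntStream.rangeClosed(1, 1000).parallel()
        .filter(i -> i % 2 == 0)
        .boxed()
        .collect(ArrayList::new, List::add, List::addAll);
    System.out.println(evens.size());               // Always 500.
  }
}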
The intermediate operations distinct(), skip(), limit(), and sorted() are stateful (p. 915, p. 915, p. 917, p. 929). See also Table 16.3, p. 938. They can carry extra performance overhead when executed in a parallel stream, as such an operation can entail multiple passes over the data and may require significant data buffering.
Ordering
An ordered stream (p. 891) processed by operations that preserve the encounter order will produce the same results, regardless of whether it is executed sequentially or in parallel. However, repeated execution of an unordered stream, whether sequential or parallel, can produce different results.
Preserving the encounter order of elements in an ordered parallel stream can incur a performance penalty. The performance of an ordered parallel stream can be improved if the ordering constraint is removed by calling the unordered() intermediate operation on the stream (p. 932).
The three stateful intermediate operations distinct(), skip(), and limit() can improve performance in a parallel stream that is unordered, as compared to one that is ordered (p. 915, p. 915, p. 917). The distinct() operation need only retain one occurrence of a duplicated value in the case of an unordered parallel stream, rather than the first occurrence. The skip() operation can skip any n elements in the case of an unordered parallel stream, not necessarily the first n elements. The limit() operation can truncate the stream after any n elements in the case of an unordered parallel stream, not necessarily after the first n elements.
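A sketch of relaxing the ordering constraint with limit() (the actual performance gain will vary with the data source and the number of cores):

import java.util.stream.LongStream;

public class UnorderedParallel {
  public static void main(String[] args) {
    // Ordered parallel stream: limit() must deliver the first 5 elements in
    // encounter order, which requires coordination between the substreams.
    long[] firstFive = LongStream.rangeClosed(1, 1_000_000).parallel()
        .limit(5)
        .toArray();                                 // Always [1, 2, 3, 4, 5].

    // Unordered parallel stream: limit() may deliver any 5 elements, which gives
    // the implementation more freedom and can improve performance.
    long[] anyFive = LongStream.rangeClosed(1, 1_000_000).parallel()
        .unordered()
        .limit(5)
        .toArray();                                 // Some 5 elements from the range.

    System.out.println(firstFive.length + " " + anyFive.length);     // 5 5
  }
}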
The terminal operation findAny() is intentionally nondeterministic and can return any element in the stream (p. 952). It is especially suited for parallel streams.
The forEach() terminal operation ignores the encounter order, but the forEachOrdered() terminal operation preserves it (p. 948). The sorted() stateful intermediate operation, on the other hand, enforces a specific encounter order, regardless of whether it is executed in a parallel pipeline (p. 929).
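For example, a sketch with an assumed list of letters:

import java.util.List;

public class OrderedTerminalOps {
  public static void main(String[] args) {
    List<String> letters = List.of("a", "b", "c", "d", "e");   // Assumed data.

    // Encounter order is ignored: the letters can print in any order.
    letters.parallelStream().forEach(s -> System.out.print(s + " "));
    System.out.println();

    // Encounter order is preserved: always prints a b c d e.
    letters.parallelStream().forEachOrdered(s -> System.out.print(s + " "));
    System.out.println();

    // findAny() can return any element of the parallel stream.
    letters.parallelStream().findAny().ifPresent(System.out::println);
  }
}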
Autoboxing and Unboxing of Numeric Values
As the Stream API provides both object and numeric streams, and supports conversion between them (p. 934), choosing a numeric stream when possible avoids the overhead of autoboxing and unboxing incurred by object streams.
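The following sketch (with an assumed stream size) contrasts an object stream, which boxes every value, with a numeric stream that keeps the values as primitives:

import java.util.stream.LongStream;

public class NumericStreams {
  public static void main(String[] args) {
    int n = 1_000_000;                              // Assumed stream size.

    // Object stream: each long value is boxed to a Long before it is summed.
    long boxedSum = LongStream.rangeClosed(1, n).parallel()
        .boxed()
        .reduce(0L, Long::sum);

    // Numeric stream: the values remain primitive longs throughout the pipeline.
    long primitiveSum = LongStream.rangeClosed(1, n).parallel().sum();

    System.out.println(boxedSum + " " + primitiveSum);   // Same result: 500000500000.
  }
}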
As we have seen, in order to take full advantage of parallel execution, composition of a stream pipeline must follow certain rules to facilitate parallelization. In summary, the benefits of using parallel streams are best achieved when:
The stream data source is of a sufficient size and the stream is easily splittable into substreams.
The stream operations have no adverse side effects and are computation-intensive enough to warrant parallelization.