How do Java streams work internally

Michael Müller

The major innovations in Java 8 included lambda expressions and the Stream API. This Java version has been around for a while, but many companies are still in the process of migrating to it. It is good to know, then, how to collect data in parallel.

heise Developer has already published introductory articles on lambdas and streams. The concepts are therefore only briefly refreshed in this article, so that it can focus on special aspects such as parallel collectors after a brief look at Java concurrency.

Basics of parallelism in Java 8

A stream can be thought of as a continuous flow of data, similar to an InputStream. The data is fed into the flow from a source, for example a collection. However, a data item is not just a byte but can be any object. With a collection as the data source, each item in the stream corresponds to an element of the collection.

A parallel stream can be imagined as several streams of objects flowing side by side. The so-called spliterator (splitting iterator) is responsible for how the data is divided and how many (parallel) streams result: it decides whether the data is split into smaller units to be processed in parallel, and it iterates over the data of such a unit. Under the hood, Java's fork/join framework is used.
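The splitting described above can be tried out directly on a list's spliterator. The following is only a minimal sketch: the class name SpliteratorSketch, its helper methods and the sample values are assumptions for illustration. It splits the data once with trySplit() and sums each part separately, roughly what the framework does on a larger scale.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

class SpliteratorSketch {

    // Sums all elements that the given spliterator still covers.
    static long sumOf(Spliterator<Integer> part) {
        long[] sum = {0};
        part.forEachRemaining(n -> sum[0] += n);
        return sum[0];
    }

    // Splits the data once and returns the two partial sums.
    static long[] partialSums(List<Integer> data) {
        Spliterator<Integer> rest = data.spliterator();
        // trySplit() hands off part of the data; it may return null
        // if the spliterator decides not to split.
        Spliterator<Integer> prefix = rest.trySplit();
        long first = (prefix == null) ? 0 : sumOf(prefix);
        return new long[] {first, sumOf(rest)};
    }

    public static void main(String[] args) {
        List<Integer> data =
            new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
        long[] parts = partialSums(data);
        System.out.println("Partial sums: " + parts[0] + " + " + parts[1]
            + " = " + (parts[0] + parts[1]));
    }
}
```

How the data is divided between the two parts is up to the spliterator of the respective source; only the total is independent of that decision.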

At the end of the stream there is a terminal operation. It merges the parallel flows into one result. In addition to the terminal operation, any number of intermediate operations can be chained into the stream. From the developer's point of view, there is therefore only input and output data, but no explicit intermediate results.

Figure 3 shows an example of a parallel stream with a filter and a map operation and a list collector as the terminal operation. Such a stream behaves lazily: it can be set up with its intermediate operations as a processing chain without any operation actually being executed. The stream is only started when the terminal operation is called. This results in the following basic structure for collections:

anyCollection
    .stream()
    [.intermediateOperation1()
    [.intermediateOperation2()
    [...]]]
    .terminalOperation();

As the square brackets indicate, developers can specify no intermediate operations at all or any number of them. A terminal operation alone is sufficient for execution.

Streams are not only defined on collections but also on other interfaces and classes. Where they extend existing interfaces, they do so as default methods, and the methods can have different names, for example .intStream() or .lines().

In the following example, createNumbers() simply generates a list of numbers. From it, the occurrences of all values smaller than 10 are to be counted. For this purpose, a filter is used as an intermediate operation, which only lets numbers less than 10 pass. The method filter() works with the interface Predicate, in which a single abstract method test(Object o) is defined. One element from the stream is passed to it.
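The lazy behaviour can be made visible with a small sketch. The class name LazyStreamSketch, the sample values and the call counter are assumptions for illustration only. The counter records how often the filter was invoked before and after the terminal operation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyStreamSketch {

    // Returns {filter calls before the terminal operation, calls after it}.
    static int[] filterCalls() {
        AtomicInteger calls = new AtomicInteger();

        // Setting up the chain does not execute the filter yet.
        Stream<Integer> pipeline = Arrays.asList(5, 12, 7, 30).stream()
            .filter(n -> {
                calls.incrementAndGet();
                return n < 10;
            });
        int before = calls.get();

        // Only the terminal operation starts the stream.
        List<Integer> small = pipeline.collect(Collectors.toList());
        int after = calls.get();

        System.out.println("Result: " + small);
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] calls = filterCalls();
        System.out.println("Filter calls before: " + calls[0]
            + ", after: " + calls[1]);
    }
}
```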
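As a hedged illustration of streams that do not come from a collection, the following sketch uses three sources from the JDK: BufferedReader.lines(), the default method CharSequence.chars() and IntStream.range(). The class name StreamSourcesSketch and the sample data are assumptions; the selection of sources is not taken from the article.

```java
import java.io.BufferedReader;
import java.io.StringReader;
import java.util.stream.IntStream;

class StreamSourcesSketch {

    // BufferedReader.lines() delivers the lines of a text as Stream<String>.
    static long countLines(String text) {
        return new BufferedReader(new StringReader(text)).lines().count();
    }

    // CharSequence.chars() is a default method that delivers an IntStream.
    static long countChar(String text, char wanted) {
        return text.chars().filter(c -> c == wanted).count();
    }

    // IntStream.range() creates a stream of numbers without any collection.
    static int sumRange(int fromInclusive, int toExclusive) {
        return IntStream.range(fromInclusive, toExclusive).sum();
    }

    public static void main(String[] args) {
        System.out.println("Lines: " + countLines("a\nb\nc"));
        System.out.println("Count of 'l': " + countChar("parallel", 'l'));
        System.out.println("Sum 1..4: " + sumRange(1, 5));
    }
}
```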

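import java.util.List;
import java.util.function.Predicate;

public class SimpleStream {
    public static void main(String[] args) {
        // createNumbers() is the helper described above; its body is not shown here
        List<Integer> numbers = createNumbers();
        System.out.println("Count < 10: " +
            numbers.stream().filter(new LessThan10()).count());
    }

    // Predicate implementation that lets only values below 10 pass
    private static class LessThan10 implements Predicate<Object> {
        @Override
        public boolean test(Object n) {
            return (int) n < 10;
        }
    }
}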

But the whole thing seems very cumbersome. Although the stream hides the loop and thus linearizes the processing, the elaborate definition of the test class is a nuisance. This is where lambda expressions come into play. They can be used wherever a functional interface is expected. A functional interface is an interface with exactly one abstract method.

public class SimpleStream {
    public static void main(String[] args) {
        List<Integer> numbers = createNumbers();
        System.out.println("Count < 10: " +
            numbers.stream().filter(n -> n < 10).count());
    }
}

n is a freely chosen identifier that stands for an object occurring in the stream. This n is passed to the function as a parameter. The method to be overridden, test(Object n), returns a boolean value by evaluating n < 10. Since the compiler can determine the type of the object from the collection at this point, neither a type specification for the parameter n nor an explicit cast is required.

The lambda operator -> may be reminiscent of mathematics. There one writes x -> f(x): a parameter x is mapped to a function value f(x). Those familiar with JavaScript may know the above expression as function (n) { return n < 10; }.
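Since createNumbers() is not shown, the two variants can be compared with a self-contained sketch. The class name CountSmallNumbers and the sample list are assumptions; the anonymous class here is typed with Integer, so no cast is needed.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class CountSmallNumbers {

    // Variant with an explicit Predicate implementation.
    static long countWithClass(List<Integer> numbers) {
        return numbers.stream().filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer n) {
                return n < 10;
            }
        }).count();
    }

    // Variant with a lambda expression; the compiler infers the type of n.
    static long countWithLambda(List<Integer> numbers) {
        return numbers.stream().filter(n -> n < 10).count();
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(3, 14, 9, 27, 10, 1);
        System.out.println("Count < 10 (class):  " + countWithClass(numbers));
        System.out.println("Count < 10 (lambda): " + countWithLambda(numbers));
    }
}
```

Both methods count the same three values (3, 9 and 1); only the way the predicate is written differs.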

The following rules, among others, apply to parameters in lambda expressions:

  • Type and name are written in parentheses, as with method parameters: (int x, int y) -> x * y
  • Types that can be inferred unambiguously may be omitted: (x, y) -> x * y
  • An empty parameter list is also possible: () -> getVendorCount(persons)
  • A single parameter without a type can be written with or without parentheses: (x) -> x * x or x -> x * x
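The rules listed above can be checked with a short sketch. The class name LambdaParameterForms, the constant names and the value 42 are assumptions chosen for illustration; the functional interfaces come from java.util.function.

```java
import java.util.function.IntBinaryOperator;
import java.util.function.IntSupplier;
import java.util.function.IntUnaryOperator;

class LambdaParameterForms {

    // Type and name in parentheses, as with method parameters.
    static final IntBinaryOperator TYPED = (int x, int y) -> x * y;

    // The types are inferred from the functional interface.
    static final IntBinaryOperator INFERRED = (x, y) -> x * y;

    // Empty parameter list.
    static final IntSupplier NO_PARAMETERS = () -> 42;

    // One untyped parameter, with and without parentheses.
    static final IntUnaryOperator WITH_PARENTHESES = (x) -> x * x;
    static final IntUnaryOperator WITHOUT_PARENTHESES = x -> x * x;

    public static void main(String[] args) {
        System.out.println(TYPED.applyAsInt(3, 4));
        System.out.println(INFERRED.applyAsInt(3, 4));
        System.out.println(NO_PARAMETERS.getAsInt());
        System.out.println(WITH_PARENTHESES.applyAsInt(5));
        System.out.println(WITHOUT_PARENTHESES.applyAsInt(5));
    }
}
```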

So much for a brief refresher on the basics of lambdas and streams. In addition to the articles mentioned, the basics are described in detail in an e-book by the author.

Parallel data collectors

This article is about collecting data in parallel. To keep the example simple, numbers are merely summed up. The pitfalls and solutions shown can then be transferred to other tasks. The method collect is used as the terminal operation:

long sum = numbers.parallelStream().collect(...);

It is used to collect data in a container. The container persists while its content changes, so it is a mutable data structure. In contrast, reduce, which is not covered in this article, works with immutable data structures.

collect is an overloaded method that is defined with two different signatures. The variant considered first expects three parameters: supplier, accumulator and combiner. What these are about becomes clear after a brief excursion into the basics of Java concurrency.

Concurrency digression

Concurrency means that several strands of a program (threads) are executed alongside one another, but not necessarily at the same time. A classic example is moving computationally intensive tasks to the background so that the user interface does not block.

One speaks of parallelism when the concurrent program threads execute the same algorithms at the same time. Opinions differ as to whether parallelism is a special case of concurrency or another form of program execution. However, this does not play a role for the consideration here. In the context of parallel streams, the same algorithms are used simultaneously on different data in the stream. This can be seen in Figure 3 as a vertical section through the data streams, in which the same operation is always arranged in a line.

Java was designed for multithreading from the start. The class Thread, for example, has existed since JDK 1.0.

Differentiation between process, thread and program

  • A process has a complete runtime environment with its own memory, which is usually protected from other processes.
  • A thread is a strand of execution within a process.
  • A process contains at least one thread; with concurrency there are several.
  • A program is executed by one or more processes.

See also the Java SE documentation.

public static void main(String[] args) {
    Thread thread = new MyThread();
    thread.start(); // creates the new thread, which then calls run()
    System.out.println("Message from Main");
}

private static class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("Message from MyThread");
    }
}

A thread can easily be implemented as a class that inherits from Thread. Only the method run() needs to be overridden. A new instance of the class is created in the calling program and executed using start(). Behind the scenes, start() ensures that a new thread is created and then calls the method run() in it.

Using Thread seems intuitive at first glance. Having to inherit from Thread, however, is not helpful, since class hierarchies are usually meant to model domain-specific relationships. Thread is therefore of limited use as a superclass. There is another way: instead of inheriting from Thread, the class simply implements the interface Runnable. Internally, Thread does nothing different: it implements Runnable itself.

public static void main(String[] args) {
    Thread thread = new Thread(new MyRunnable());
    thread.start();
    System.out.println("Message from Main");
}

private static class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("Message from MyRunnable");
    }
}

To create a thread, an instance of the custom class is wrapped in a new Thread. Otherwise nothing changes.

Before returning to the solution with parallel streams, let us consider adding numbers using threads. The class SummingUnit is used for this.
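Because Runnable has exactly one abstract method, it is a functional interface in the sense described earlier, so the task can also be passed as a lambda expression. The following is a minimal sketch; the class name LambdaThreadSketch, the method runInThread and the message text are assumptions for illustration.

```java
class LambdaThreadSketch {

    // Runs a task given as a lambda in a new thread and returns its message.
    static String runInThread() {
        StringBuffer log = new StringBuffer();

        // The lambda takes the place of a class implementing Runnable.
        Thread thread = new Thread(() -> log.append("Message from lambda"));
        thread.start();
        try {
            thread.join(); // wait for the thread to complete
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(runInThread());
        System.out.println("Message from Main");
    }
}
```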

// static, so that it can be instantiated from the static main method
private static class SummingUnit {

    private long sum = 0;

    public SummingUnit() {
        System.out.println("ctor SummingUnit");
    }

    // adds a single value to the running sum
    public void sum(long val) {
        sum += val;
    }

    public long getSum() {
        return sum;
    }

    // merges the partial sum of another unit into this one
    public void combine(SummingUnit other) {
        sum += other.sum;
    }
}

The class internally contains a sum variable in which the values passed to sum() are added up, as well as a getter to read out the sum. The method combine() is not needed at first. Attentive readers may, however, recognize it as a hint at the third parameter of collect().

public static void main(String[] args) throws InterruptedException {
    List<Long> numbers = Utils.createNumbers();
    int size = numbers.size();
    SummingUnit summingUnit = new SummingUnit();

    // first half of the list: summed in a new thread
    Thread thread = new Thread(new SumTask(numbers, 0,
        size / 2, summingUnit));
    thread.start();

    // second half of the list: summed directly in the main thread
    SumTask sumTask = new SumTask(numbers, size / 2,
        size, summingUnit);
    sumTask.run();
    thread.join(); // wait for thread to complete

    System.out.println("Sum: " + summingUnit.getSum());
}

private static class SumTask implements Runnable {

    private final List<Long> _numbers;
    private final int _start;
    private final int _end;
    private final SummingUnit _summingUnit;

    public SumTask(List<Long> numbers,
                   int start,
                   int end,
                   SummingUnit summingUnit) {
        _numbers = numbers;
        _start = start;
        _end = end;
        _summingUnit = summingUnit;
    }

    // sums the elements from _start (inclusive) to _end (exclusive)
    @Override
    public void run() {
        for (int i = _start; i < _end; i++) {
            _summingUnit.sum(_numbers.get(i));
        }
    }
}

Race Conditions

Here SumTask processes part of the list of numbers. In the main routine, SumTask is invoked twice: once via a new thread (thread.start()) and the second time directly (sumTask.run()). Before the total is output, the program waits with thread.join() for the thread to terminate, to make sure that the computation has been completed. Despite this measure, the program outputs different results when executed several times. How can that be?

Consider the summation:

sum += val;

What looks like a simple operation is actually a whole series of instructions. The operator is syntactic sugar, because the instruction can also be written as follows:

sum = sum + val;

An ordinary computer cannot simply add a value to a variable that is somewhere in memory. Rather, the addition takes place in a processor register. This results in a sequence of instructions similar to the following pseudocode:

register.load(sum)
register.add(val)
register.write(sum)

The value of the variable sum is loaded into a register, the value of val is added, and the result is stored back in the variable sum. These three basic operations do not run as an atomic unit. Rather, the operations of different threads can interleave, with each thread using its own register. Consider a possible sequence for two threads, assuming an initial value of sum = 0. Thread 1 processes val = 5 and thread 2 processes val = 3.

Thread     Operation              Result
Thread 1   register1.load(sum)    register1 = 0
Thread 1   register1.add(5)       register1 = 5
Thread 1   register1.write(sum)   sum = 5
Thread 2   register2.load(sum)    register2 = 5
Thread 2   register2.add(3)       register2 = 8
Thread 2   register2.write(sum)   sum = 8

Since both threads work independently of and in parallel with one another, the sequence of operations can also be done differently, for example as follows:

Thread     Operation              Result
Thread 1   register1.load(sum)    register1 = 0
Thread 1   register1.add(5)       register1 = 5
Thread 2   register2.load(sum)    register2 = 0 (sum is still 0!)
Thread 1   register1.write(sum)   sum = 5
Thread 2   register2.add(3)       register2 = 3
Thread 2   register2.write(sum)   sum = 3
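The interleaving in this second table can be replayed deterministically in a single thread, with two local variables standing in for the registers. This is only an illustrative sketch: the class name LostUpdateReplay is an assumption, and real threads do not interleave in such a controlled way.

```java
class LostUpdateReplay {

    // Replays the operations of the second table in exactly that order.
    static long replay() {
        long sum = 0;

        long register1 = sum; // Thread 1: load,  register1 = 0
        register1 += 5;       // Thread 1: add,   register1 = 5
        long register2 = sum; // Thread 2: load,  register2 = 0 (sum is still 0)
        sum = register1;      // Thread 1: write, sum = 5
        register2 += 3;       // Thread 2: add,   register2 = 3
        sum = register2;      // Thread 2: write, sum = 3, the 5 is lost

        return sum;
    }

    public static void main(String[] args) {
        System.out.println("sum after both additions: " + replay());
    }
}
```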

In this example, register2 is loaded before the first thread has written back its result, so sum is still 0. As can easily be seen, sum contains the wrong value after both additions have been performed. The problem is a so-called race condition: the result depends on who is ahead in the race for the operation on the shared variable. The method sum() becomes a critical section, which must be protected from competing access. Java offers various mechanisms for this. For example, developers can acquire a lock before entering the critical section, or they can let Java do the work by marking the method with the modifier synchronized.

public synchronized void sum(long val)

This automatically locks the method for other threads as long as one thread is running through it. In practice, the calls are serialized, one after the other, at the expense of parallelism. That may not be the desired solution.

Java also provides the keyword volatile, which can be added to the declaration of the variable sum. With it, Java only guarantees that after a write to this variable, exactly the new value will be read. But that does not help in the scenario described. (Without volatile, however, even the first sequence could fail: although the 5 has been written to sum, the following operation could still read a 0 due to stale cache contents.)
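A hedged sketch of the synchronized variant follows. The class names SynchronizedSumSketch and SynchronizedUnit are assumptions, and instead of a list the numbers 1 to count are summed so that the expected total is known. Both threads share one unit, yet the result is the same on every run because the calls to sum() are serialized.

```java
class SynchronizedSumSketch {

    // Like SummingUnit, but with a synchronized critical section.
    static class SynchronizedUnit {
        private long sum = 0;

        public synchronized void sum(long val) {
            sum += val;
        }

        public synchronized long getSum() {
            return sum;
        }
    }

    // Sums the numbers 1..count, half in a new thread, half in the caller.
    static long sumInTwoThreads(int count) {
        SynchronizedUnit unit = new SynchronizedUnit();

        Thread thread = new Thread(() -> {
            for (int i = 1; i <= count / 2; i++) {
                unit.sum(i);
            }
        });
        thread.start();

        for (int i = count / 2 + 1; i <= count; i++) {
            unit.sum(i);
        }

        try {
            thread.join(); // wait for thread to complete
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return unit.getSum();
    }

    public static void main(String[] args) {
        System.out.println("Sum: " + sumInTwoThreads(100000));
    }
}
```

The price is that every single addition has to acquire the lock, which is the loss of parallelism mentioned above.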

Avoid critical sections

Rather, the desired solution must be to avoid critical sections wherever possible. In the example, this can be achieved by having each thread work with its own SummingUnit. This means that there is no longer any shared variable. Instead, the partial results of the individual SummingUnits have to be collected at the end, and this is where the combine() method comes in.

public static void main(String[] args) throws InterruptedException {
    List<Long> numbers = Utils.createNumbers();
    int size = numbers.size();
    SummingUnit summingUnit = new SummingUnit();
    SummingUnit summingUnit2 = new SummingUnit();

    // each half of the list is summed into its own SummingUnit
    Thread thread = new Thread(new SumTask(numbers, 0,
        size / 2, summingUnit));
    thread.start();

    SumTask sumTask = new SumTask(numbers, size / 2,
        size, summingUnit2);
    sumTask.run();
    thread.join(); // wait for thread to complete

    // merge the partial results only after both halves are done
    summingUnit.combine(summingUnit2);
    System.out.println("Sum: " + summingUnit.getSum());
}

Parallel stream processing

So each thread has its own SummingUnit, which adds up part of the values. If the partial results are merged only after the parallel processing, special mechanisms such as synchronization are not required.

Back to the collect() method of parallel stream processing. The variant mentioned is defined as follows:

<R> R collect(Supplier<R> supplier,
              BiConsumer<R, ? super T> accumulator,
              BiConsumer<R, R> combiner);

The first parameter, supplier, stands for a function that creates a result container of type R (Result). In the example, the function has the task of creating a new SummingUnit for each thread.

The second function is an accumulator. It receives values and keeps the intermediate result of the calculation, in order to carry out the next calculation with this (intermediate) result and the next value. That corresponds to the sum() method. It works on an object of type R (the SummingUnit) and accepts a value of type T, which is the type of the objects in the stream.

Last but not least comes the combine function, which readers know from the thread example. It works on an object of type R (again a SummingUnit) and accepts another object of the same type. Since the collect() method returns a SummingUnit in the example, getSum() must be called afterwards to obtain the total.

System.out.println("total: " +
    numbers.parallelStream().collect(
        () -> new SummingUnit(),       // supplier
        (summingUnit, value) ->
            summingUnit.sum(value),    // accumulator
        (summingUnit, other) ->
            summingUnit.combine(other) // combiner
    ).getSum()
);

The naming of the lambda expressions can be freely selected. Because of their small scope, the variables are often given short names.

.collect(() -> new SummingUnit(),
    (s, v) -> s.sum(v), (s, o) -> s.combine(o))

Alternatively, method references can be used at this point:

.collect(SummingUnit::new, SummingUnit::sum, SummingUnit::combine)
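To try out the three-parameter collect end to end, the fragments can be assembled into a self-contained sketch. The class name ThreeArgCollectSketch and the test data (the numbers 1 to 100) are assumptions; the nested SummingUnit is a copy of the class from the article without the console output in the constructor.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

class ThreeArgCollectSketch {

    // Result container, as in the thread example.
    static class SummingUnit {
        private long sum = 0;

        public void sum(long val) {
            sum += val;
        }

        public long getSum() {
            return sum;
        }

        public void combine(SummingUnit other) {
            sum += other.sum;
        }
    }

    // supplier, accumulator and combiner given as method references
    static long sumInParallel(List<Long> numbers) {
        return numbers.parallelStream()
            .collect(SummingUnit::new, SummingUnit::sum, SummingUnit::combine)
            .getSum();
    }

    public static void main(String[] args) {
        List<Long> numbers = LongStream.rangeClosed(1, 100)
            .boxed()
            .collect(Collectors.toList());
        System.out.println("total: " + sumInParallel(numbers)); // total: 5050
    }
}
```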

It is therefore quite easy to collect data in a way that suits one's own needs. In practice, developers will hardly come across such a simple application, but the principle can be transferred to a suitable collector in one's own business domain.

It is also possible to move the collecting functions into a separate object, together with the final step of retrieving the result (getSum in the example). That is exactly what the second, overloaded collect method offers developers.

<R, A> R collect(Collector<? super T, A, R> collector);

As is easy to see, collect here expects an object of type Collector. This is not an abstract class but an interface. The class SummingCollector, which implements this interface, follows below. Its use is then quite short:

System.out.println("total: " +
    numbers.parallelStream().collect(new SummingCollector()));

The collector has to define methods that provide a result container as well as an accumulator and a combine function. This is familiar from the first collect variant. In addition, a method is required that provides a function for the final result. Please note at this point: the methods do not deliver results directly, but functions that produce the results. The easiest way to implement such a function is a lambda expression.

A final method provides various characteristics that tell the stream implementation how the collector may be used. For the sake of simplicity, the SummingCollector presented here uses the existing SummingUnit, to which it delegates the individual tasks.

import java.util.EnumSet;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;

public class SummingCollector
        implements Collector<Long, SummingUnit, Long> {
    @Override
    public Supplier<SummingUnit> supplier() {
        return () -> new SummingUnit();
    }

    @Override
    public BiConsumer<SummingUnit, Long> accumulator() {
        return (s, v) -> s.sum(v);
    }

    @Override
    public BinaryOperator<SummingUnit> combiner() {
        return (left, right) -> { left.combine(right); return left; };
    }

    @Override
    public Function<SummingUnit, Long> finisher() {
        return s -> s.getSum();
    }

    @Override
    public Set<Characteristics> characteristics() {
        return EnumSet.of(Characteristics.UNORDERED);
        // do not add ", Characteristics.CONCURRENT"!
    }
}

The class implements the generic interface Collector. The three type parameters (T, A, R; see the method signature above) say: process objects of type Long (a number), use a result container of type SummingUnit and produce a final result of type Long (the sum).

supplier delivers a new SummingUnit. But that only works because the value Characteristics.CONCURRENT is not included in the characteristics. If it is specified, the stream implementation considers the collector itself to be thread-safe and creates only one SummingUnit. As explained, however, the method sum() is not designed for multithreading. The way it is implemented here, developers get the desired one SummingUnit per thread. The function accumulator takes a SummingUnit and a value. The value is then simply added.

combiner works on two SummingUnits, referred to here as left and right. In contrast to the first variant, the second is not merged into a "leading" result container; rather, both are equal partners. After the combine operation, a result container has to be returned. In the example this is the left one. Depending on the task, developers can also create a completely new container that holds the combined values.

New is the function finisher, which receives the one remaining result container and delivers the final result. Finally, there is the characteristic UNORDERED, which tells the stream implementation that the order in which the stream objects are processed is irrelevant. This enables effective parallelization.
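As an alternative to a separate class, the same four functions and the characteristic can be handed to the factory method Collector.of from the JDK. This is a swapped-in technique that the article itself does not use; the class name CollectorOfSketch, the method summing() and the test data are assumptions for illustration.

```java
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

class CollectorOfSketch {

    // Result container, as in the thread example.
    static class SummingUnit {
        private long sum = 0;

        public void sum(long val) {
            sum += val;
        }

        public long getSum() {
            return sum;
        }

        public void combine(SummingUnit other) {
            sum += other.sum;
        }
    }

    // Builds a collector from supplier, accumulator, combiner and finisher.
    static Collector<Long, SummingUnit, Long> summing() {
        return Collector.of(
            SummingUnit::new,                                       // supplier
            SummingUnit::sum,                                       // accumulator
            (left, right) -> { left.combine(right); return left; }, // combiner
            SummingUnit::getSum,                                    // finisher
            Collector.Characteristics.UNORDERED);                   // no CONCURRENT
    }

    static long sumInParallel(List<Long> numbers) {
        return numbers.parallelStream().collect(summing());
    }

    public static void main(String[] args) {
        List<Long> numbers = LongStream.rangeClosed(1, 100)
            .boxed()
            .collect(Collectors.toList());
        System.out.println("total: " + sumInParallel(numbers)); // total: 5050
    }
}
```

A dedicated class such as SummingCollector remains the better fit when the collector is reused in many places or needs state of its own.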

Conclusion

Lambdas and streams allow simple parallelization without the explicit use of threads, synchronization, locks and other things that are no fun at the very least and that even deter some developers from multithreaded programming. As the simple example has shown, knowing what goes on behind the scenes is very helpful for implementing parallel processing with streams in a meaningful way. With this knowledge, significantly more complex data collectors can be created. (ane)

Michael Müller
As Head of Software Development at InEK GmbH, he is responsible for projects in the web, Java and .NET environment. In addition, he works as a freelance author and writes specialist articles on various development topics as well as book reviews.
