Mit dem Release von Java 25 sind auch Stream-Gatherer endlich als finales Feature verwendbar. In den meisten Artikeln die einen Überblick der Neuerungen und Änderungen geben wird der mitgelieferte Stream-Gatherer fold gerne als Stream#reduce() aber als Intermediate Operation beschrieben. Das ist häufig auch gar nicht falsch, es gibt aber dennoch ein paar weitere kleine interessante Unterschiede, auf die wir hier einen kleinen Blick werfen möchten!
Stream-Operation reduce
Die Stream-Operation reduce gibt es in zwei Ausfertigungen, es kann entweder nur eine accumulator-Operation festgelegt werden, die auch als combiner-Operation verwendet wird oder es kann beides separat festgelegt werden. Bei Definition einer combiner-Operation kann die accumulator-Operation einen unterschiedlichen Ein- und Ausgabetyp erlauben. Beide Versionen benötigen einen Identity-Wert bezüglich der Operation.
T reduce(T identity, BinaryOperator<T> accumulator)
<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner)
Aber genug Theorie, schauen wir uns mal in einem Beispiel an, was genau da eigentlich passiert. Dazu nehmen wir uns ein kleines anschauliches Problem vor, bei dem einfach nur Character bzw. Strings zusammengefügt werden. Die einzelnen Schritte werden zudem mit einer Kommandozeilenausgabe protokolliert.
void simpleReduce(Stream<String> stringStream) {
String reduceResult = stringStream.reduce(
"",
(a, b) -> {
IO.println("Acc: %s + %s = %s (Thread %s)".formatted(a, b, a + b, Thread.currentThread().getName()));
return a + b;
});
IO.println(reduceResult);
}
void reduceWithCombiner(Stream<Character> stringStream) {
String reduceResult = stringStream.reduce(
"",
(a, b) -> {
IO.println("Acc: %s + %s = %s (Thread %s)".formatted(a, b, a + b, Thread.currentThread().getName()));
return a + b;
},
(a, b) -> {
IO.println("Comb: %s + %s = %s (Thread %s)".formatted(a, b, a + b, Thread.currentThread().getName()));
return a + b;
});
IO.println(reduceResult);
}
Die unterschiedlichen Ein- und Ausgabetypen bei Angabe einer Combiner-Funktion wird hier durch das Starten mit Character-Werten dargestellt. Was kommt aber nun heraus, wenn wir diese Funktionen mit einem Stream der Buchstaben A bis G aufrufen?
# SimpleReduce
Acc: + A = A (Thread main)
Acc: A + B = AB (Thread main)
Acc: AB + C = ABC (Thread main)
Acc: ABC + D = ABCD (Thread main)
Acc: ABCD + E = ABCDE (Thread main)
Acc: ABCDE + F = ABCDEF (Thread main)
Acc: ABCDEF + G = ABCDEFG (Thread main)
ABCDEFG
# ReduceWithCombiner
Acc: + A = A (Thread main)
Acc: A + B = AB (Thread main)
Acc: AB + C = ABC (Thread main)
Acc: ABC + D = ABCD (Thread main)
Acc: ABCD + E = ABCDE (Thread main)
Acc: ABCDE + F = ABCDEF (Thread main)
Acc: ABCDEF + G = ABCDEFG (Thread main)
ABCDEFG
Es wird wie zu erwarten mit dem leeren String als Startwert gestartet und dann nach und nach immer ein weiterer Buchstabe (im zweiten Fall als Character) angefügt. Tatsächlich wird im zweiten Fall die combiner-Operation kein einziges mal aufgerufen. Es tritt einfach kein Fall auf in dem zwei Zwischenergebnisse zusammengefügt werden müssen. Etwas spannender wird es, wenn wir uns einen parallelen Stream anschauen.
# SimpleReduce (parallel)
Acc: + E = E (Thread main)
Acc: + D = D (Thread main)
Acc: D + E = DE (Thread main)
Acc: + G = G (Thread main)
Acc: + A = A (Thread ForkJoinPool.commonPool-worker-2)
Acc: + F = F (Thread main)
Acc: F + G = FG (Thread main)
Acc: + B = B (Thread ForkJoinPool.commonPool-worker-1)
Acc: DE + FG = DEFG (Thread main)
Acc: + C = C (Thread ForkJoinPool.commonPool-worker-3)
Acc: B + C = BC (Thread ForkJoinPool.commonPool-worker-3)
Acc: A + BC = ABC (Thread ForkJoinPool.commonPool-worker-3)
Acc: ABC + DEFG = ABCDEFG (Thread ForkJoinPool.commonPool-worker-3)
ABCDEFG
# ReduceWithCombiner (parallel)
Acc: + E = E (Thread main)
Acc: + A = A (Thread ForkJoinPool.commonPool-worker-4)
Acc: + B = B (Thread ForkJoinPool.commonPool-worker-3)
Acc: + F = F (Thread ForkJoinPool.commonPool-worker-4)
Acc: + D = D (Thread main)
Acc: + C = C (Thread ForkJoinPool.commonPool-worker-1)
Comb: D + E = DE (Thread main)
Comb: B + C = BC (Thread ForkJoinPool.commonPool-worker-1)
Acc: + G = G (Thread ForkJoinPool.commonPool-worker-2)
Comb: A + BC = ABC (Thread ForkJoinPool.commonPool-worker-1)
Comb: F + G = FG (Thread ForkJoinPool.commonPool-worker-2)
Comb: DE + FG = DEFG (Thread ForkJoinPool.commonPool-worker-2)
Comb: ABC + DEFG = ABCDEFG (Thread ForkJoinPool.commonPool-worker-2)
ABCDEFG
Hier sehen wir, dass die Verarbeitung parallel stattfindet und dadurch auch Zwischenergebnisse auftreten, die zusammengefügt werden müssen. Da die String-Concatination eine assoziative Operation ist und der leere String bezüglich der Operation den Identitätswert darstellt, ist das Endergebnis trotzdem korrekt.
Stream-Gatherer fold
Der Stream-Gatherer fold garantiert eine feste Reihenfolge für die Verarbeitung der Elemente, da er als Sequential-Gatherer definiert ist. Auch hier können Ein- uns Ausgabetypen unterschiedlich sein. Beim Aufrufen definieren wir eine Operation um einen Initialwert festzulegen und eine fold-Operation.
<T, R> Gatherer<T, ?, R> fold(Supplier<R> initial, BiFunction<? super R, ? super T, ? extends R> folder)
Auch hier implementieren wir das oben genutzte Beispiel mit Log-Ausgaben.
void fold(Stream<Character> stringStream) {
Optional<String> foldResult = stringStream.gather(Gatherers.fold(() -> "",
(String a, Character b) -> {
IO.println("%s + %s = %s (Thread %s)".formatted(a, b, a + b, Thread.currentThread().getName()));
return a + b;
})
).findFirst();
if (foldResult.isPresent()) {
IO.println(foldResult.get());
} else {
IO.println("fold result not present"); }
}
Anschließend rufen wir die Funktion mit einem sequentiellen und einem parallelen Stream auf.
# Fold
+ A = A (Thread main)
A + B = AB (Thread main)
AB + C = ABC (Thread main)
ABC + D = ABCD (Thread main)
ABCD + E = ABCDE (Thread main)
ABCDE + F = ABCDEF (Thread main)
ABCDEF + G = ABCDEFG (Thread main)
ABCDEFG
# Fold (parallel)
+ A = A (Thread ForkJoinPool.commonPool-worker-1)
A + B = AB (Thread ForkJoinPool.commonPool-worker-2)
AB + C = ABC (Thread ForkJoinPool.commonPool-worker-2)
ABC + D = ABCD (Thread ForkJoinPool.commonPool-worker-2)
ABCD + E = ABCDE (Thread ForkJoinPool.commonPool-worker-2)
ABCDE + F = ABCDEF (Thread ForkJoinPool.commonPool-worker-2)
ABCDEF + G = ABCDEFG (Thread ForkJoinPool.commonPool-worker-2)
ABCDEFG
Hier werden die Elemente auch bei einem parallelen Stream sequentiell verarbeitet, es gibt keine Zwischenergebnisse, die zusammengefügt werden müssen. Das Ergebnis bleibt hier aber weiter dasselbe. Wieso braucht es denn dann eigentlich den Stream-Gatherer fold? Ist es wirklich nur, damit wir später mit einem Stream, der nur ein Element enthält, weiterarbeiten können?
Wozu brauchen wir den Stream-Gatherer fold?
Die Unterschiede und Gemeinsamkeiten haben wir ja gerade schon behandelt. Wer bei seinem Problem mit einem Stream besser weiterarbeiten kann, braucht bei fold keine Zwischenoperation um dahin zu kommen. Weitere Anwendungsfälle sind Probleme, die zwingend sequentiell verarbeitet werden müssen, auch wenn ein Eingabestream parallel definiert ist. Die Assoziativität, die für die accumulator- und combiner-Operationen von reduce vorrausgesetzt wird, ist für die fold-Operation nicht zwingend notwendig. Wenn eine combiner-Operation für ein Problem nicht implementiert werden kann, ist fold die bessere Wahl. Die fold-Operation startet zudem explizit mit einem Initialwert (nicht der Identität). Damit reduce in jedem Fall korrekt funktioniert muss der Wert dort der Identität bezüglich der angegebenen Operationen entsprechen!
Für unser Beispiel oben macht das allerdings alles keinen Unterschied. Deshalb schauen wir uns noch ein weiteres Problem an, bei dem wir einen Stream an Sensorwerten eines CO2-Sensors verarbeiten. Es soll ein gleitender gewichteter Durchschnittswert berechnet werden, bei dem der aktuellste Sensorwert das größte Gewicht erhält und vorherige Sensorwerte nach und nach exponentiell weniger Einfluss auf das Ergebnis haben. Die Rechenvorschrift für einen Integrationsschritt ist relativ simpel, wir gewichten den bisherigen Durchschnittswert mit 0.9 und den neuen Sensorwert mit 1 - 0.9 = 0.1.
avgi = (avgi-1 * 0.9) + (sensori * 0.1)
In Java sieht das dann wie folgt aus, wir nehmen einen initialen Wert von 1000 ppm an.
void weightedAverageReduce(Stream<Double> sensorValues) {
double initialValue = 1000.0;
double weightedAverageSensorValue = sensorValues.reduce(
initialValue,
(currentWeightedAverage, newValue) -> {
IO.println("(%f * 0.9) + (%f * 0.1) = %f".formatted(currentWeightedAverage, newValue, (currentWeightedAverage * 0.9) + (newValue * 0.1)));
return (currentWeightedAverage * 0.9) + (newValue * 0.1);
}
);
IO.println("Weighted average: " + weightedAverageSensorValue + " ppm");
}
void weightedAverageFold(Stream<Double> sensorValues) {
double initialValue = 1000.0;
double weightedAverageSensorValue = sensorValues
.gather(Gatherers.fold(
() -> initialValue,
(currentWeightedAverage, newValue) -> {
IO.println("(%f * 0.9) + (%f * 0.1) = %f".formatted(currentWeightedAverage, newValue, (currentWeightedAverage * 0.9) + (newValue * 0.1)));
return (currentWeightedAverage * 0.9) + (newValue * 0.1);
}
)).findFirst().orElseThrow();
IO.println("Weighted average: " + weightedAverageSensorValue + " ppm");
}
Auch hier können wir die Funktionen wieder mit sequentiellen und parallelen Streams aufrufen und die Ausgaben vergleichen.
# Reduce
(1000.000000 * 0.9) + (500.000000 * 0.1) = 950.000000
(950.000000 * 0.9) + (750.000000 * 0.1) = 930.000000
(930.000000 * 0.9) + (900.000000 * 0.1) = 927.000000
(927.000000 * 0.9) + (1200.000000 * 0.1) = 954.300000
(954.300000 * 0.9) + (1100.000000 * 0.1) = 968.870000
(968.870000 * 0.9) + (950.000000 * 0.1) = 966.983000
Weighted average: 966.9830000000002 ppm
# Reduce (parallel)
(1000.000000 * 0.9) + (1200.000000 * 0.1) = 1020.000000
(1000.000000 * 0.9) + (950.000000 * 0.1) = 995.000000
(1000.000000 * 0.9) + (900.000000 * 0.1) = 990.000000
(1000.000000 * 0.9) + (1100.000000 * 0.1) = 1010.000000
(1000.000000 * 0.9) + (750.000000 * 0.1) = 975.000000
(1000.000000 * 0.9) + (500.000000 * 0.1) = 950.000000
(1010.000000 * 0.9) + (995.000000 * 0.1) = 1008.500000
(975.000000 * 0.9) + (990.000000 * 0.1) = 976.500000
(1020.000000 * 0.9) + (1008.500000 * 0.1) = 1018.850000
(950.000000 * 0.9) + (976.500000 * 0.1) = 952.650000
(952.650000 * 0.9) + (1018.850000 * 0.1) = 959.270000
Weighted average: 959.27 ppm
# Fold
(1000.000000 * 0.9) + (500.000000 * 0.1) = 950.000000
(950.000000 * 0.9) + (750.000000 * 0.1) = 930.000000
(930.000000 * 0.9) + (900.000000 * 0.1) = 927.000000
(927.000000 * 0.9) + (1200.000000 * 0.1) = 954.300000
(954.300000 * 0.9) + (1100.000000 * 0.1) = 968.870000
(968.870000 * 0.9) + (950.000000 * 0.1) = 966.983000
Weighted average: 966.9830000000002 ppm
# Fold (parallel)
(1000.000000 * 0.9) + (500.000000 * 0.1) = 950.000000
(950.000000 * 0.9) + (750.000000 * 0.1) = 930.000000
(930.000000 * 0.9) + (900.000000 * 0.1) = 927.000000
(927.000000 * 0.9) + (1200.000000 * 0.1) = 954.300000
(954.300000 * 0.9) + (1100.000000 * 0.1) = 968.870000
(968.870000 * 0.9) + (950.000000 * 0.1) = 966.983000
Weighted average: 966.9830000000002 ppm
Das Ergebnis der reduce Implementation stimmt im sequentiellen Fall tatsächlich mit dem der fold-Implementation überein. Für einen parallelen Stream weicht das Ergebnis allerdings ab, wenn wir uns die durchgeführten Schritte ansehen wird auch schnell deutlich weshalb. Wir haben bei unserer Implementation zwei Annahmen missachtet die reduce verlangt. Der von uns angegebene Initialwert ist keineswegs die Identität bezüglich der definierten Operation und die Reihenfolge in der die Elemente verarbeitet und verarbeitet werden ist für das richtige Ergebnis entscheidend, dadurch ist auch die Assoziativität der Operation nicht gegeben.
Was heißt das aber jetzt für uns? Müssen wir unsere Implementationen alle auf fold anpassen? Nein, wie wir oben gesehen haben ist auch für reduce ein korrektes Ergebnis für viele Probleme garantiert. Wir sollten uns aber über die Einschränkungen die reduce macht um ein korrektes Ergebnis zu garantieren im Klaren sein und unsere Probleme daraufhin analysieren bevor wir eine Entscheidung treffen. Wenn wir mit dem Ergebnis nicht als Stream weiterarbeiten möchten und unsere Implementation die Annahmen von reduce erfüllt (Identität, Assoziativität, …) kann reduce verwendet werden, insbesondere wenn eine parallele Verarbeitung gewünscht ist. Fold gibt uns die Möglichkeit in Situationen, in denen eine geordnete Verarbeitung notwendig ist oder keine entsprechende Combiner-Funktion implementiert werden kann, eine korrekte Verarbeitung sicherzustellen.
Den hier gezeigten Beispielcode gibt es wie immer auf GitHub zum selbst ausprobieren.







