Stream Gatherers schließen Lücken der klassischen Stream-Operatoren und ermöglichen deutlich expressivere, aber trotzdem gut lesbare Stream-Verarbeitung.
Mit Stream Gatherern lassen sich eigene, zustandsbehaftete Zwischenoperationen definieren, die Eingabeelemente beliebig puffern, transformieren, aufteilen oder zusammenführen können, z.B. etwa für Windowing, Batching oder laufende Aggregationen.
Java Streams: Recap
Ein Java Stream besteht aus einer Quelle (source), keiner, einer oder mehreren Zwischenoperationen (intermediate) und einer Abschlussoperation (terminal operation).
Während für Abschlussoperationen mit Collector bereits Erweiterungsmöglichkeiten existieren, waren Zwischenoperationen bisher auf die vordefinierten beschränkt: filter, map, flatMap, distinct, sorted, limit, skip, peek, parallel, sequential.
Fokus: Zwischenoperationen
Als Zwischenoperationen sind diese im Standardangebot:
filter, map/mapToX, flatMap/flatMapToX, distinct, sorted, limit, skip, peek, parallel, sequential
Mit diesen vordefinierten Zwischenoperationen lassen sich bereits viele Fälle abdecken, aber viele fehlen auch, z.B. fold(), distinctBy(), window(), takeEvery(), partititionBy().
Statt das ohnehin schon umfangreiche API weiter aufzublähen, wurde die Möglichkeit hinzugefügt, eigene Zwischenoperationen mit beliebiger Funktionalität selbst zu definieren: Stream Gatherers.
Sie ermöglichen:
- Transformationen von Elementen
- Filterung
- Zustandsbehaftete Operationen
- Änderung der Stream-Kardinalitäten (1:n, n:1, n:m)
Ein Beispiel zur Verdeutlichung: Daten eines Streams sollen in Gruppen fester Größe zusammengefasst werden.
var result = Stream.iterate(0, i -> i + 1)
// ... hier in Gruppen zu je vier Elementen zusammenfassen
.limit(3)
.toList();
Die erwartete Ausgabe lautet:
[[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]]
Um das mit Bordmitteln zu erreichen, muss der Code bisher etwa so aussehen:
var result = Stream.iterate(0, i -> i + 1)
.limit(12)
.collect(Collectors.groupingBy(i -> i / 4))
.values().stream()
.sorted(Comparator.comparingInt(n -> n.getFirst()))
.limit(3)
.toList();
Mit dem Stream Gatherer windowFixed(int windowSize) lässt sich das Gleiche mit dem bereits oben angedeuteten Code so umsetzen:
var result = Stream.iterate(0, i -> i + 1)
.gather(Gatherers.windowFixed(4))
.limit(3)
.toList();
Das ist eine deutliche Vereinfachung.
Vordefinierte Gatherers
Natürlich gibt es bereits eine Reihe von vorgefertigten Gatherers. In Java 25 stehen folgende zur Verfügung:
windowFixed(windowSize)
- Gruppiert Elemente in feste Listen der Größe windowSize; letztes Fenster darf kleiner sein.
var result = Stream.of(1,2,3,4,5,6,7,8)
.gather(Gatherers.windowFixed(3))
.toList();
Ausgabe:[[1, 2, 3], [4, 5, 6], [7, 8]]
windowSliding(windowSize)
- Erzeugt gleitende Fenster fester Größe mit Schrittweite 1 (Überlappung).
var result = Stream.of(1,2,3,4,5,6,7,8)
.gather(Gatherers.windowSliding(2))
.toList();
Ausgabe: [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8]]
scan(initial, scanner)
- Liefert alle Zwischenergebnisse der Akkumulation.
Beispiel: 1,2,3 mit Multiplikation → [1, 2, 6]
var result = Stream.of(1,2,3,4,5,6,7,8,9)
.gather(Gatherers.scan(() -> 1L, (p, n) -> p * n))
.toList();
Ausgabe: [1, 2, 6, 24, 120, 720, 5040, 40320, 362880]
fold(initial, folder)
Faltet Stream zu einem einzigen Endergebnis (wie reduce, aber zustandsbehaftet und als Zwischenoperation statt Abschlussoperation).
Beispiel: 1,2,3 mit Multiplikation → Optional[6]
var result = Stream.of(1,2,3,4,5,6,7,8,9)
.gather(Gatherers.fold(() -> 1L, (p, n) -> p * n))
.findFirst();
Ausgabe: Optional[362880]
mapConcurrent(maxThreads, mapper)
Wendet Mapper parallel mit Virtual Threads an; begrenzt auf maxThreads.
Behält Reihenfolge bei, nutzt Concurrency für CPU-intensive Tasks.
var result = Stream.of(1,2,3,4,5)
.gather(Gatherers.mapConcurrent(2, n -> n * n))
.toList();
Ausgabe: [1, 4, 9, 16, 25] (paralleles Mapping mit max. 2 Threads, Reihenfolge bleibt erhalten)
Maßgeschneidert
Wem dieses Angebot noch nicht genügt, der kann sich auch einen Gatherer selbst implementieren. Hier ein Gatherer distinctByName(), der Duplikate basierend auf einem Key-Extractor entfernt:
// Gatherer, der nur Personen mit eindeutigen Namen durchlässt
// (sequentiell, da Reihenfolge wichtig ist)
static Gatherer<Person, ?, Person> distinctByName = Gatherer.ofSequential(
HashSet::new, // Initialisiere ein Set zum Speichern bereits gesehener Namen
(state, person, downstream) -> {
// Füge den Namen zum Set hinzu
// add() gibt true zurück, wenn der Name neu ist
if (state.add(person.name())) {
// Nur neue Namen werden weitergegeben
return downstream.push(person);
}
// Duplikate werden übersprungen
return true;
}
);
record Person(String name) {}
Und so wird der Gatherer angewendet:
var result = Stream.of(
new Person("Alice"),
new Person("Bob"),
new Person("Alice"),
new Person("Charlie"))
.gather(distinctByName)
.toList();
Ausgabe: [Person[name=Alice], Person[name=Bob], Person[name=Charlie]]
Auf diese Weise lässt sich mit wenigen Zeilen eine eigene Zwischenoperation implementieren.
Vorteile und Einsatzgebiete
Mit Stream Gatherers bekommen wir flexible, zustandsbehaftete Zwischenoperationen für Java Streams.
- sind deutlich ausdrucksstärker als einfaches filter/map/flatMap
- können Zustand vorhalten, puffern, Verarbeitung abkürzen (analog zu limit())
- sind wiederverwendbar, wir können z.B. komplexe filter-map-Pipelines in einen Gatherer kapseln
- können effizient parallel arbeiten
Natürlich gibt es auch Einschränkungen, die wir berücksichtigen sollten:
- Performance Overhead: Vorhalten und Verwalten des Zustands ist langsamer als zustandslose Operationen
- Keine Primitives: Keine Unterstützung für IntStream etc.
- Debugging: Je nach Komplexität des Gatherers können Lesbarkeit und Fehlersuche leiden.
Fazit
Stream Gatherers sind seit Java 22 als Preview-Feature verfügbar und in Java 24 finalisiert. Sie schließen eine wichtige Lücke im Stream API, indem sie es ermöglichen, zustandsbehaftete Zwischenoperationen mit beliebiger Funktionalität zu erstellen.
Mitgelieferte Gatherers decken häufige Anwendungsfälle wie Windowing und Batching ab, während eigene Implementierungen für spezifische Anforderungen in wenigen Zeilen erstellt werden können.
Einsetzen sollte man Gatherers vor allem dann, wenn:
- Zustandsbehaftete Transformationen nötig sind
- Stream-Pipelines unübersichtlich werden
- Standard-Operationen nicht ausreichen
Der Performance-Overhead durch Zustandsverwaltung sollte bei kritischen Anwendungsfällen berücksichtigt werden.






