GEDOPLAN

Stream Gatherers schließen Lücken der klassischen Stream-Operatoren und ermöglichen  deutlich expressivere, aber trotzdem gut lesbare Stream-Verarbeitung.

Mit Stream Gatherern lassen sich eigene, zustandsbehaftete Zwischenoperationen definieren, die Eingabeelemente beliebig puffern, transformieren, aufteilen oder zusammenführen können, z.B. etwa für Windowing,  Batching oder laufende Aggregationen.

Java Streams: Recap

Ein Java Stream besteht aus einer Quelle (source), keiner, einer oder mehreren Zwischenoperationen (intermediate) und einer Abschlussoperation (terminal operation).

Während für Abschlussoperationen mit Collector bereits Erweiterungsmöglichkeiten existieren, waren Zwischenoperationen bisher auf die vordefinierten beschränkt: filter, map, flatMap, distinct, sorted, limit, skip, peek, parallel, sequential.

Fokus: Zwischenoperationen

Mit diesen vordefinierten Zwischenoperationen lassen sich bereits viele Fälle abdecken, aber viele fehlen auch, z.B. fold(), distinctBy(), window(), takeEvery(), partititionBy().

Statt das ohnehin schon umfangreiche API weiter aufzublähen, wurde die Möglichkeit hinzugefügt, eigene Zwischenoperationen mit beliebiger Funktionalität selbst zu definieren: Stream Gatherers.

Sie ermöglichen:

Ein Beispiel zur Verdeutlichung: Daten eines Streams sollen in Gruppen fester Größe zusammengefasst werden.

var result = Stream.iterate(0, i -> i + 1)
     // ... hier in Gruppen zu je vier Elementen zusammenfassen 
     .limit(3)
     .toList();

Die erwartete Ausgabe lautet:
 [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]]

Um das mit Bordmitteln zu erreichen, muss der Code bisher etwa so aussehen:

var result  =  Stream.iterate(0, i -> i + 1)
          .limit(12)
          .collect(Collectors.groupingBy(i -> i / 4))
          .values().stream()
          .sorted(Comparator.comparingInt(n -> n.getFirst()))
          .limit(3)
          .toList();

Mit dem Stream Gatherer windowFixed(int windowSize) lässt sich das Gleiche mit dem bereits oben angedeuteten Code so umsetzen:

var result = Stream.iterate(0, i -> i + 1)
     .gather(Gatherers.windowFixed(4))
     .limit(3)
     .toList();

Das ist eine deutliche Vereinfachung.

Vordefinierte Gatherers

Natürlich gibt es bereits eine Reihe von vorgefertigten Gatherers. In Java 25 stehen folgende zur Verfügung:

windowFixed(windowSize)

var result = Stream.of(1,2,3,4,5,6,7,8)
    .gather(Gatherers.windowFixed(3))
    .toList();

Ausgabe:[[1, 2, 3], [4, 5, 6], [7, 8]]

windowSliding(windowSize)

var result = Stream.of(1,2,3,4,5,6,7,8)
    .gather(Gatherers.windowSliding(2))
    .toList();

Ausgabe: [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8]]

scan(initial, scanner)

var result = Stream.of(1,2,3,4,5,6,7,8,9)
    .gather(Gatherers.scan(() -> 1L, (p, n) -> p * n))
    .toList();

Ausgabe: [1, 2, 6, 24, 120, 720, 5040, 40320, 362880]

fold(initial, folder)
 
Faltet Stream zu einem einzigen Endergebnis (wie reduce, aber zustandsbehaftet und als Zwischenoperation statt Abschlussoperation).
  Beispiel: 1,2,3 mit Multiplikation → Optional[6]​

var result = Stream.of(1,2,3,4,5,6,7,8,9)
    .gather(Gatherers.fold(() -> 1L, (p, n) -> p * n))
    .findFirst();

Ausgabe: Optional[362880]

mapConcurrent(maxThreads, mapper)
 
Wendet Mapper parallel mit Virtual Threads an; begrenzt auf maxThreads.
  Behält Reihenfolge bei, nutzt Concurrency für CPU-intensive Tasks.​

var result = Stream.of(1,2,3,4,5)
    .gather(Gatherers.mapConcurrent(2, n -> n * n))
    .toList();

Ausgabe: [1, 4, 9, 16, 25] (paralleles Mapping mit max. 2 Threads, Reihenfolge bleibt erhalten)

Maßgeschneidert

Wem dieses Angebot noch nicht genügt, der kann sich auch einen Gatherer selbst implementieren. Hier ein Gatherer distinctByName(), der Duplikate basierend auf einem Key-Extractor entfernt:

// Gatherer, der nur Personen mit eindeutigen Namen durchlässt
// (sequentiell, da Reihenfolge wichtig ist)

static Gatherer<Person, ?, Person> distinctByName = Gatherer.ofSequential(
    HashSet::new,  // Initialisiere ein Set zum Speichern bereits gesehener Namen
    (state, person, downstream) -> {
        // Füge den Namen zum Set hinzu
        // add() gibt true zurück, wenn der Name neu ist
        if (state.add(person.name())) {
            // Nur neue Namen werden weitergegeben
            return downstream.push(person);
        }
        // Duplikate werden übersprungen
        return true;
      }
  );

record Person(String name) {}​

Und so wird der Gatherer angewendet:

var result = Stream.of(
        new Person("Alice"),
        new Person("Bob"),
        new Person("Alice"),
        new Person("Charlie"))
    .gather(distinctByName)
    .toList();

Ausgabe: [Person[name=Alice], Person[name=Bob], Person[name=Charlie]]​

Auf diese Weise lässt sich mit wenigen Zeilen eine eigene Zwischenoperation implementieren.

Vorteile und Einsatzgebiete

Mit Stream Gatherers bekommen wir flexible, zustandsbehaftete Zwischenoperationen für Java Streams.

Natürlich gibt es auch Einschränkungen, die wir berücksichtigen sollten:

Fazit

Stream Gatherers sind seit Java 22 als Preview-Feature verfügbar und in Java 24 finalisiert. Sie schließen eine wichtige Lücke im Stream API, indem sie es ermöglichen, zustandsbehaftete Zwischenoperationen mit beliebiger Funktionalität zu erstellen.

Mitgelieferte Gatherers decken häufige Anwendungsfälle wie Windowing und Batching ab, während eigene Implementierungen für spezifische Anforderungen in wenigen Zeilen erstellt werden können.

Einsetzen sollte man Gatherers vor allem dann, wenn:

Der Performance-Overhead durch Zustandsverwaltung sollte bei kritischen Anwendungsfällen berücksichtigt werden.