GEDOPLAN
JavaJava SE

Stream Gatherers in Java 25

JavaJava SE
faucet 1684902 1280

Stream Gatherers schließen Lücken der klassischen Stream-Operatoren und ermöglichen  deutlich expressivere, aber trotzdem gut lesbare Stream-Verarbeitung.

Mit Stream Gatherern lassen sich eigene, zustandsbehaftete Zwischenoperationen definieren, die Eingabeelemente beliebig puffern, transformieren, aufteilen oder zusammenführen können, z.B. etwa für Windowing,  Batching oder laufende Aggregationen.

Java Streams: Recap

Ein Java Stream besteht aus einer Quelle (source), keiner, einer oder mehreren Zwischenoperationen (intermediate) und einer Abschlussoperation (terminal operation).

Während für Abschlussoperationen mit Collector bereits Erweiterungsmöglichkeiten existieren, waren Zwischenoperationen bisher auf die vordefinierten beschränkt: filter, map, flatMap, distinct, sorted, limit, skip, peek, parallel, sequential.

Fokus: Zwischenoperationen

Als Zwischenoperationen sind diese im Standardangebot:

filter, map/mapToX, flatMap/flatMapToX, distinct, sorted, limit, skip, peek, parallel, sequential

Mit diesen vordefinierten Zwischenoperationen lassen sich bereits viele Fälle abdecken, aber viele fehlen auch, z.B. fold(), distinctBy(), window(), takeEvery(), partititionBy().

Statt das ohnehin schon umfangreiche API weiter aufzublähen, wurde die Möglichkeit hinzugefügt, eigene Zwischenoperationen mit beliebiger Funktionalität selbst zu definieren: Stream Gatherers.

Sie ermöglichen:

  • Transformationen von Elementen
  • Filterung
  • Zustandsbehaftete Operationen
  • Änderung der Stream-Kardinalitäten (1:n, n:1, n:m)

Ein Beispiel zur Verdeutlichung: Daten eines Streams sollen in Gruppen fester Größe zusammengefasst werden.

var result = Stream.iterate(0, i -> i + 1)
     // ... hier in Gruppen zu je vier Elementen zusammenfassen 
     .limit(3)
     .toList();

Die erwartete Ausgabe lautet:
 [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]]

Um das mit Bordmitteln zu erreichen, muss der Code bisher etwa so aussehen:

var result  =  Stream.iterate(0, i -> i + 1)
          .limit(12)
          .collect(Collectors.groupingBy(i -> i / 4))
          .values().stream()
          .sorted(Comparator.comparingInt(n -> n.getFirst()))
          .limit(3)
          .toList();

Mit dem Stream Gatherer windowFixed(int windowSize) lässt sich das Gleiche mit dem bereits oben angedeuteten Code so umsetzen:

var result = Stream.iterate(0, i -> i + 1)
     .gather(Gatherers.windowFixed(4))
     .limit(3)
     .toList();

Das ist eine deutliche Vereinfachung.

Vordefinierte Gatherers

Natürlich gibt es bereits eine Reihe von vorgefertigten Gatherers. In Java 25 stehen folgende zur Verfügung:

windowFixed(windowSize)

  • Gruppiert Elemente in feste Listen der Größe windowSize; letztes Fenster darf kleiner sein.
var result = Stream.of(1,2,3,4,5,6,7,8)
    .gather(Gatherers.windowFixed(3))
    .toList();

Ausgabe:[[1, 2, 3], [4, 5, 6], [7, 8]]

windowSliding(windowSize)

  • Erzeugt gleitende Fenster fester Größe mit Schrittweite 1 (Überlappung).
var result = Stream.of(1,2,3,4,5,6,7,8)
    .gather(Gatherers.windowSliding(2))
    .toList();

Ausgabe: [[1, 2], [2, 3], [3, 4], [4, 5], [5, 6], [6, 7], [7, 8]]

scan(initial, scanner)

  • Liefert alle  Zwischenergebnisse der Akkumulation.
      Beispiel: 1,2,3 mit Multiplikation → [1, 2, 6]​
var result = Stream.of(1,2,3,4,5,6,7,8,9)
    .gather(Gatherers.scan(() -> 1L, (p, n) -> p * n))
    .toList();

Ausgabe: [1, 2, 6, 24, 120, 720, 5040, 40320, 362880]

fold(initial, folder)
 
Faltet Stream zu einem einzigen Endergebnis (wie reduce, aber zustandsbehaftet und als Zwischenoperation statt Abschlussoperation).
  Beispiel: 1,2,3 mit Multiplikation → Optional[6]​

var result = Stream.of(1,2,3,4,5,6,7,8,9)
    .gather(Gatherers.fold(() -> 1L, (p, n) -> p * n))
    .findFirst();

Ausgabe: Optional[362880]

mapConcurrent(maxThreads, mapper)
 
Wendet Mapper parallel mit Virtual Threads an; begrenzt auf maxThreads.
  Behält Reihenfolge bei, nutzt Concurrency für CPU-intensive Tasks.​

var result = Stream.of(1,2,3,4,5)
    .gather(Gatherers.mapConcurrent(2, n -> n * n))
    .toList();

Ausgabe: [1, 4, 9, 16, 25] (paralleles Mapping mit max. 2 Threads, Reihenfolge bleibt erhalten)

Maßgeschneidert

Wem dieses Angebot noch nicht genügt, der kann sich auch einen Gatherer selbst implementieren. Hier ein Gatherer distinctByName(), der Duplikate basierend auf einem Key-Extractor entfernt:

// Gatherer, der nur Personen mit eindeutigen Namen durchlässt
// (sequentiell, da Reihenfolge wichtig ist)

static Gatherer<Person, ?, Person> distinctByName = Gatherer.ofSequential(
    HashSet::new,  // Initialisiere ein Set zum Speichern bereits gesehener Namen
    (state, person, downstream) -> {
        // Füge den Namen zum Set hinzu
        // add() gibt true zurück, wenn der Name neu ist
        if (state.add(person.name())) {
            // Nur neue Namen werden weitergegeben
            return downstream.push(person);
        }
        // Duplikate werden übersprungen
        return true;
      }
  );

record Person(String name) {}​

Und so wird der Gatherer angewendet:

var result = Stream.of(
        new Person("Alice"),
        new Person("Bob"),
        new Person("Alice"),
        new Person("Charlie"))
    .gather(distinctByName)
    .toList();

Ausgabe: [Person[name=Alice], Person[name=Bob], Person[name=Charlie]]​

Auf diese Weise lässt sich mit wenigen Zeilen eine eigene Zwischenoperation implementieren.

Vorteile und Einsatzgebiete

Mit Stream Gatherers bekommen wir flexible, zustandsbehaftete Zwischenoperationen für Java Streams.

  • sind deutlich ausdrucksstärker als einfaches filter/map/flatMap
  • können Zustand vorhalten, puffern, Verarbeitung abkürzen (analog zu limit())
  • sind wiederverwendbar, wir können z.B. komplexe filter-map-Pipelines in einen Gatherer kapseln
  • können effizient parallel arbeiten

Natürlich gibt es auch Einschränkungen, die wir berücksichtigen sollten:

  • Performance Overhead: Vorhalten und Verwalten des Zustands ist langsamer als zustandslose Operationen
  • Keine Primitives: Keine Unterstützung für IntStream etc.
  • Debugging: Je nach Komplexität des Gatherers können Lesbarkeit und Fehlersuche leiden.

Fazit

Stream Gatherers sind seit Java 22 als Preview-Feature verfügbar und in Java 24 finalisiert. Sie schließen eine wichtige Lücke im Stream API, indem sie es ermöglichen, zustandsbehaftete Zwischenoperationen mit beliebiger Funktionalität zu erstellen.

Mitgelieferte Gatherers decken häufige Anwendungsfälle wie Windowing und Batching ab, während eigene Implementierungen für spezifische Anforderungen in wenigen Zeilen erstellt werden können.

Einsetzen sollte man Gatherers vor allem dann, wenn:

  • Zustandsbehaftete Transformationen nötig sind
  • Stream-Pipelines unübersichtlich werden
  • Standard-Operationen nicht ausreichen

Der Performance-Overhead durch Zustandsverwaltung sollte bei kritischen Anwendungsfällen berücksichtigt werden.

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert

Bitte füllen Sie dieses Feld aus.
Bitte füllen Sie dieses Feld aus.
Bitte gib eine gültige E-Mail-Adresse ein.
Sie müssen den Bedingungen zustimmen, um fortzufahren.

Autor

Diesen Artikel teilen

LinkedIn
Xing

Gibt es noch Fragen?

Fragen beantworten wir sehr gerne! Schreibe uns einfach per Kontaktformular.

Kurse

weitere Blogbeiträge

web 7048124 640 jpg
Spring

Spring Boot – Caching

Caching ist ein essenzielles Thema, wenn es um die Laufzeitoptimierung der eigenen Anwendung geht. Fachlich ist die größte Herausforderung sicherlich…
IT-Training - GEDOPLAN
Jakarta EE (Java EE)

Java Expertenkreis Bielefeld

Das nächste Treffen unseres Java Expertenkreises Bielefeld findet statt am: Termin:       Donnerstag, den 23.08.2012, 18:00-20:00 Uhr Ort:              GEDOPLAN, Stieghorster Str.…

Work Life Balance. Jobs bei Gedoplan

We are looking for you!

Lust bei GEDOPLAN mitzuarbeiten? Wir suchen immer Verstärkung – egal ob Entwickler, Dozent, Trainerberater oder für unser IT-Marketing! Schau doch einfach mal auf unsere Jobseiten! Wir freuen uns auf Dich!

Work Life Balance. Jobs bei Gedoplan

We are looking for you!

Lust bei GEDOPLAN mitzuarbeiten? Wir suchen immer Verstärkung – egal ob Entwickler, Dozent, Trainerberater oder für unser IT-Marketing! Schau doch einfach mal auf unsere Jobseiten! Wir freuen uns auf Dich!