GEDOPLAN
Spring

Spring Batch

Spring
communication 3671504 640 jpg

Spring Batch ist seit Jahren ein weit verbreiteter Ansatz, um robuste und skalierbare Batchprozesse in unseren Projekten zu realisieren. Heute werfen wir einen kleinen Blick auf die Grundlagen.

Bevor wir ein konkretes Beispiel sehen ist es wichtig zu verstehen, aus welchen “Puzzel-Teilen” der Batch Prozess später zusammen gesetzt sein wird:

  • Job
    • generelle Konfiguration
    • referenziert ein oder mehrere Steps
  • Step
    • eine Phase in einer Jobausführung
    • kann aus einer einzelnen Aktion bestehen (Tasklet)
    • oder aus Reader/Processor/Writer
    • Konfiguration von Listenern, Chunks, Fehlerbehandlungen, Retry-Mechanismen
  • JobExecution
    • ein eindeutiger Lauf eines Jobs
    • eindeutig identifiziert über evtl. Job-Parameter
  • StepExecution
    • eindeutig innerhalb JobExecution
  • Step-/Job-ExcecutionContext
    • Context (Map) zur Ablage von Execution-Daten (z.B. welche ID wurde zuletzt geladen, welche Datensätze erfolgreich gespeichert)
    • persistent in entsprechender Context Tabelle

Hierbei ist erst einmal wichtig zu verstehen, dass im Standard sowohl ein Job (identifiziert über JobParameter), also auch ein Step innerhalb eines Jobs nach Erfolg nicht noch einmal ausgeführt werden. Im Fehlerfall wird der Job wieder aufgenommen (neue JobExecution) übernimmt dabei aber den Context des fehlgeschlagenen Laufes und es werden nur die Steps ausgeführt, die nicht erfolgreich waren. Dieses Verhalten lässt sich natürlich steuern (allowStartIfComplete(…)). Die Idee ist klar: damit lassen sich lang läufige und stabile Prozesse etablieren, die auch in einem Fehlerfall effizient wieder aufgenommen werden können.

Job – Konfiguration

Schauen wir uns einen einfachen Job an, der über eine Schnittstelle eine Synchronisation von Daten durchführt.

    @Bean
    public Job products(JobRepository jobRepository, PlatformTransactionManager ptm) {
        return new JobBuilder("ImportProducts", jobRepository)
                .start(cleanDbStep(jobRepository, ptm, null))
                .next(importProductsStep(jobRepository, ptm))
                .build();
    }

Unter Verwendung des JobBuilder’s definieren wir einen neuen Job, mit einem eindeutigen Namen und deklarieren die einzelnen Schritte, die innerhalb dieses Jobs ablaufen sollen.

Tasklet

    @Bean
    @JobScope
    public Step cleanDbStep(JobRepository jobRepository, PlatformTransactionManager ptm, @Value("#{jobExecution}") JobExecution jobExecution) {
        return new StepBuilder("cleanDB", jobRepository)
                .tasklet((contribution, chunkContext) -> {
                    JobExecution last = jobRepository.getLastJobExecution(jobExecution.getJobInstance().getJobName(), jobExecution.getJobParameters());
                    if(last!=null && !last.getExitStatus().equals(ExitStatus.COMPLETED)){
                        log.info("Last run was NOT successfully, we keep our data");
                    } else {
                        log.info("Last run was successfully, lets clean the database for a fresh copy");
                        productRepository.deleteAll();
                    }
                        return RepeatStatus.FINISHED;
                }, ptm)
                .allowStartIfComplete(true)
                .build();
    }

Ein Step der lediglich einen Tasklet (Einzel-Schritt) enthält. Dieser prüft über das jobRepository den Status des letzten Laufes. Sollte dieser erfolgreich durchlaufen sein, führt dieser Step eine Bereinigung der Daten durch. Sollte der vorangegangene Lauf durch einen Fehler beendet worden sein (und damit evtl. Teildatenmengen vorliegen) bleiben die Daten bestehen.

Import-Step

    private Step importProductsStep(JobRepository jobRepository, PlatformTransactionManager ptm) {
        return new StepBuilder("importProducts", jobRepository)
                .<Product, Product>chunk(10, ptm)
                .reader(dummyJsonItemReader)
                .writer(writeProduct())
                .faultTolerant()
                .retry(IOException.class)
                .retry(RestClientException.class)
                .retryLimit(5)
                .allowStartIfComplete(true)
                .build();
    }

Der zweite Schritt besteht aus einem Chunk-basierten Reader und Writer welche den Datenaustausch realisieren = immer 10 Items werden verarbeitet und geschrieben. Zusätzlichen definiert der Step eine faultTolerant, die dafür sorgt, dass bei bestimmten Exceptions ein weiterer Versuch unternommen wird, die Daten zu verarbeiten. Das retryLimit gibt dabei an, wie oft eine solche Wiederholung durchgeführt werden soll, bis der Schritt schließlich als Fehlerhaft markiert wird.

Reader

   @Component
    @StepScope
    public static class DummyJsonItemReader extends  AbstractPagingItemReader<Product>{

        private final RestTemplate dummyJson;

        public DummyJsonItemReader(RestTemplate dummyJson) {
            this.dummyJson = dummyJson;
            this.setPageSize(100);
        }

        protected void doReadPage() {
            if (this.results == null) {
                this.results = new ArrayList<>();
            } else {
                this.results.clear();
            }

            String requestURI = UriComponentsBuilder.fromUriString("/product")
                    .queryParam("limit", 100)
                    .queryParam("skip", getPage() * 100)
                    .build()
                    .toUriString();

            logger.info("Process URL " + requestURI);
            ProductWrapper products = dummyJson.getForObject(requestURI, ProductWrapper.class);

            if (products != null && !products.getProducts().isEmpty()) {
                this.results.addAll(products.getProducts());
            }
        }
    }

Unser Reader als Komponente realisiert. Dieser ist für unsere Zwecke abgeleitet von AbstractPagingItemReader, da wir per Rest-API eine Ressource abfragen, die Seitenweise abgefragt werden kann. Der Vorteil: diese Basis-Klasse übernimmt das Wiederaufsetzen in einem Fehlerfall und rekonstruiert den Index des bereits verarbeiteten Items über den ExecutionContext des letzten Laufes. Sollte diese (oder eine der anderen Basisklassen) nicht passen müssen wir derlei Szenarien selber im Kopf behalten und entsprechenden behandeln, indem wir z.B. den StepExecutionContext nutzen:

    @Bean
    @StepScope
    public ItemReader<Product> readProduct(@Value("#{stepExecution}") StepExecution stepExecution) {
        return () -> {
            BigDecimal lastId= Optional.ofNullable(stepExecution.getExecutionContext().get("lastId")).map(v -> (BigDecimal) v).orElse(BigDecimal.ZERO);
            Product product = getNextProductFromWhereEver(lastId);
            if(product==null){ 
                return null;
            } else {
              stepExecution.getExecutionContext().put("lastId", product.getId());
              return product;
            }
        };
    }

Writer

    private ItemWriter<Product> writeProduct() {
        return (products) -> {
            productRepository.saveAll(products);
            productRepository.flush();
        };
    }

Ein einfaches Übertragen der Items in unsere Datenbank. Lediglich zu berücksichtigen: der Writer erhält nicht ein Item sondern mehrere Items (Chunk, mit der in der Konfiguration angegeben Größe)

Let’s Start

Um einen Job zu starten existieren nun diverse Methoden. Realisieren ließe sich so etwas automatisch beim Start des Containers, über die Kommandozeile, externe Calls oder per Scheduling. Ein einfaches Beispiel per Schedule sieht bei uns so aus:

@Configuration
@RequiredArgsConstructor
@EnableScheduling
public class BatchScheduler {

    final JobLauncher jobLauncher;

    final Job products;
    
   @Scheduled(cron = "0 * * * * *")
    public void productsJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
        jobLauncher.run(products, new JobParameters());
    }

Über den JobLauncher starten wir einen Job und geben benötigte Parameter mit. Dabei erinnern wir uns: die Parameter identifizieren eine JobExecution eindeutig. Hier übergeben wir keine Parameter, obwohl der Job dank Scheduled natürlich mehrfach ausgeführt wird. Das erlaubt Spring Batch uns nur, weil unsere oben definieren Steps alle das erneute Ausführen nach erfolgreichem Ende erlauben (.allowStartIfComplete(true)).

Ein Blick zurück

Jeder Job, jede Execution und jeder Context wird von Spring Batch in einer Reihe von Datenbanktabellen abgelegt. So lässt sich gut nachvollziehen, welche Jobs wann mit welchem Ergebnis ausgeführt wurden:

BATCH_JOB_EXECUTION
BATCH_JOB_EXECUTION_CONTEXT
BATCH_JOB_EXECUTION_PARAMS
BATCH_JOB_INSTANCE
BATCH_STEP_EXECUTION
BATCH_STEP_EXECUTION_CONTEXT

Ein Blick nach vorn

Das war ein kurzer und knackiger Einblick in die Welt von Spring Batch. Viele Mechanismen, die wir gestreift haben, lassen sich natürlich tiefer gehen konfigurieren.

Github. Bunt. Und in Farbe.

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert

Bitte füllen Sie dieses Feld aus.
Bitte füllen Sie dieses Feld aus.
Bitte gib eine gültige E-Mail-Adresse ein.
Sie müssen den Bedingungen zustimmen, um fortzufahren.

Autor

Diesen Artikel teilen

LinkedIn
Xing

Gibt es noch Fragen?

Fragen beantworten wir sehr gerne! Schreibe uns einfach per Kontaktformular.

Kurse

weitere Blogbeiträge

2021 02 09 12 45 23 neues dokument 1 inkscape
Webprogrammierung

Angular E2E mit json-server

Ein E2E-Test dient dazu eine Anwendung “von Vorne bis Hinten” durchzutesten. Dabei sind viele Hürden zu meistern, angefangen von erreichbarer…
IT-Training - GEDOPLAN
Jakarta EE (Java EE)

Unenhanced Classes in OpenJPA

Persistence Provider müssen die persistenten Klassen für die Laufzeit um (providerspezifischen) Code erweitern, um bspw. Dirty Checks durchführen zu können…

Work Life Balance. Jobs bei Gedoplan

We are looking for you!

Lust bei GEDOPLAN mitzuarbeiten? Wir suchen immer Verstärkung – egal ob Entwickler, Dozent, Trainerberater oder für unser IT-Marketing! Schau doch einfach mal auf unsere Jobseiten! Wir freuen uns auf Dich!

Work Life Balance. Jobs bei Gedoplan

We are looking for you!

Lust bei GEDOPLAN mitzuarbeiten? Wir suchen immer Verstärkung – egal ob Entwickler, Dozent, Trainerberater oder für unser IT-Marketing! Schau doch einfach mal auf unsere Jobseiten! Wir freuen uns auf Dich!