Spring Batch ist seit Jahren ein weit verbreiteter Ansatz, um robuste und skalierbare Batchprozesse in unseren Projekten zu realisieren. Heute werfen wir einen kleinen Blick auf die Grundlagen.
Bevor wir ein konkretes Beispiel sehen ist es wichtig zu verstehen, aus welchen “Puzzel-Teilen” der Batch Prozess später zusammen gesetzt sein wird:
- Job
- generelle Konfiguration
- referenziert ein oder mehrere Steps
- Step
- eine Phase in einer Jobausführung
- kann aus einer einzelnen Aktion bestehen (Tasklet)
- oder aus Reader/Processor/Writer
- Konfiguration von Listenern, Chunks, Fehlerbehandlungen, Retry-Mechanismen
- JobExecution
- ein eindeutiger Lauf eines Jobs
- eindeutig identifiziert über evtl. Job-Parameter
- StepExecution
- eindeutig innerhalb JobExecution
- Step-/Job-ExcecutionContext
- Context (Map) zur Ablage von Execution-Daten (z.B. welche ID wurde zuletzt geladen, welche Datensätze erfolgreich gespeichert)
- persistent in entsprechender Context Tabelle
Hierbei ist erst einmal wichtig zu verstehen, dass im Standard sowohl ein Job (identifiziert über JobParameter), also auch ein Step innerhalb eines Jobs nach Erfolg nicht noch einmal ausgeführt werden. Im Fehlerfall wird der Job wieder aufgenommen (neue JobExecution) übernimmt dabei aber den Context des fehlgeschlagenen Laufes und es werden nur die Steps ausgeführt, die nicht erfolgreich waren. Dieses Verhalten lässt sich natürlich steuern (allowStartIfComplete(…)). Die Idee ist klar: damit lassen sich lang läufige und stabile Prozesse etablieren, die auch in einem Fehlerfall effizient wieder aufgenommen werden können.
Job – Konfiguration
Schauen wir uns einen einfachen Job an, der über eine Schnittstelle eine Synchronisation von Daten durchführt.
@Bean
public Job products(JobRepository jobRepository, PlatformTransactionManager ptm) {
return new JobBuilder("ImportProducts", jobRepository)
.start(cleanDbStep(jobRepository, ptm, null))
.next(importProductsStep(jobRepository, ptm))
.build();
}
Unter Verwendung des JobBuilder’s definieren wir einen neuen Job, mit einem eindeutigen Namen und deklarieren die einzelnen Schritte, die innerhalb dieses Jobs ablaufen sollen.
Tasklet
@Bean
@JobScope
public Step cleanDbStep(JobRepository jobRepository, PlatformTransactionManager ptm, @Value("#{jobExecution}") JobExecution jobExecution) {
return new StepBuilder("cleanDB", jobRepository)
.tasklet((contribution, chunkContext) -> {
JobExecution last = jobRepository.getLastJobExecution(jobExecution.getJobInstance().getJobName(), jobExecution.getJobParameters());
if(last!=null && !last.getExitStatus().equals(ExitStatus.COMPLETED)){
log.info("Last run was NOT successfully, we keep our data");
} else {
log.info("Last run was successfully, lets clean the database for a fresh copy");
productRepository.deleteAll();
}
return RepeatStatus.FINISHED;
}, ptm)
.allowStartIfComplete(true)
.build();
}
Ein Step der lediglich einen Tasklet (Einzel-Schritt) enthält. Dieser prüft über das jobRepository den Status des letzten Laufes. Sollte dieser erfolgreich durchlaufen sein, führt dieser Step eine Bereinigung der Daten durch. Sollte der vorangegangene Lauf durch einen Fehler beendet worden sein (und damit evtl. Teildatenmengen vorliegen) bleiben die Daten bestehen.
Import-Step
private Step importProductsStep(JobRepository jobRepository, PlatformTransactionManager ptm) {
return new StepBuilder("importProducts", jobRepository)
.<Product, Product>chunk(10, ptm)
.reader(dummyJsonItemReader)
.writer(writeProduct())
.faultTolerant()
.retry(IOException.class)
.retry(RestClientException.class)
.retryLimit(5)
.allowStartIfComplete(true)
.build();
}
Der zweite Schritt besteht aus einem Chunk-basierten Reader und Writer welche den Datenaustausch realisieren = immer 10 Items werden verarbeitet und geschrieben. Zusätzlichen definiert der Step eine faultTolerant, die dafür sorgt, dass bei bestimmten Exceptions ein weiterer Versuch unternommen wird, die Daten zu verarbeiten. Das retryLimit gibt dabei an, wie oft eine solche Wiederholung durchgeführt werden soll, bis der Schritt schließlich als Fehlerhaft markiert wird.
Reader
@Component
@StepScope
public static class DummyJsonItemReader extends AbstractPagingItemReader<Product>{
private final RestTemplate dummyJson;
public DummyJsonItemReader(RestTemplate dummyJson) {
this.dummyJson = dummyJson;
this.setPageSize(100);
}
protected void doReadPage() {
if (this.results == null) {
this.results = new ArrayList<>();
} else {
this.results.clear();
}
String requestURI = UriComponentsBuilder.fromUriString("/product")
.queryParam("limit", 100)
.queryParam("skip", getPage() * 100)
.build()
.toUriString();
logger.info("Process URL " + requestURI);
ProductWrapper products = dummyJson.getForObject(requestURI, ProductWrapper.class);
if (products != null && !products.getProducts().isEmpty()) {
this.results.addAll(products.getProducts());
}
}
}
Unser Reader als Komponente realisiert. Dieser ist für unsere Zwecke abgeleitet von AbstractPagingItemReader, da wir per Rest-API eine Ressource abfragen, die Seitenweise abgefragt werden kann. Der Vorteil: diese Basis-Klasse übernimmt das Wiederaufsetzen in einem Fehlerfall und rekonstruiert den Index des bereits verarbeiteten Items über den ExecutionContext des letzten Laufes. Sollte diese (oder eine der anderen Basisklassen) nicht passen müssen wir derlei Szenarien selber im Kopf behalten und entsprechenden behandeln, indem wir z.B. den StepExecutionContext nutzen:
@Bean
@StepScope
public ItemReader<Product> readProduct(@Value("#{stepExecution}") StepExecution stepExecution) {
return () -> {
BigDecimal lastId= Optional.ofNullable(stepExecution.getExecutionContext().get("lastId")).map(v -> (BigDecimal) v).orElse(BigDecimal.ZERO);
Product product = getNextProductFromWhereEver(lastId);
if(product==null){
return null;
} else {
stepExecution.getExecutionContext().put("lastId", product.getId());
return product;
}
};
}
Writer
private ItemWriter<Product> writeProduct() {
return (products) -> {
productRepository.saveAll(products);
productRepository.flush();
};
}
Ein einfaches Übertragen der Items in unsere Datenbank. Lediglich zu berücksichtigen: der Writer erhält nicht ein Item sondern mehrere Items (Chunk, mit der in der Konfiguration angegeben Größe)
Let’s Start
Um einen Job zu starten existieren nun diverse Methoden. Realisieren ließe sich so etwas automatisch beim Start des Containers, über die Kommandozeile, externe Calls oder per Scheduling. Ein einfaches Beispiel per Schedule sieht bei uns so aus:
@Configuration
@RequiredArgsConstructor
@EnableScheduling
public class BatchScheduler {
final JobLauncher jobLauncher;
final Job products;
@Scheduled(cron = "0 * * * * *")
public void productsJob() throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
jobLauncher.run(products, new JobParameters());
}
Über den JobLauncher starten wir einen Job und geben benötigte Parameter mit. Dabei erinnern wir uns: die Parameter identifizieren eine JobExecution eindeutig. Hier übergeben wir keine Parameter, obwohl der Job dank Scheduled natürlich mehrfach ausgeführt wird. Das erlaubt Spring Batch uns nur, weil unsere oben definieren Steps alle das erneute Ausführen nach erfolgreichem Ende erlauben (.allowStartIfComplete(true)).
Ein Blick zurück
Jeder Job, jede Execution und jeder Context wird von Spring Batch in einer Reihe von Datenbanktabellen abgelegt. So lässt sich gut nachvollziehen, welche Jobs wann mit welchem Ergebnis ausgeführt wurden:
BATCH_JOB_EXECUTION
BATCH_JOB_EXECUTION_CONTEXT
BATCH_JOB_EXECUTION_PARAMS
BATCH_JOB_INSTANCE
BATCH_STEP_EXECUTION
BATCH_STEP_EXECUTION_CONTEXT
Ein Blick nach vorn
Das war ein kurzer und knackiger Einblick in die Welt von Spring Batch. Viele Mechanismen, die wir gestreift haben, lassen sich natürlich tiefer gehen konfigurieren.