Spring Batch Step Configuration

SpringBatch Docs 보며 공부 - Step Configuration
정찬's avatar
Jun 03, 2025
Spring Batch Step Configuration
 

개요

 
Spring Batch 공식문서 공부 3번째 포스팅이다.
이번 포스팅에서는 Spring Batch에서 Step을 구성하는 방법을 알아보겠다.
 

Chunk-oriented Processing

 
Spring Batch의 Step은 Chunk 지향 처리 방식을 사용한다.
Chunk 지향 처리는 데이터를 한 번에 하나씩 읽고 트랜잭션 경계 내에서 기록되는 Chunk를 생성한다.

Cofiguring a Step

 
단계는 다음과 같은 방식으로 구성할 수 있다.
@Bean public Step sampleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder("sampleStep", jobRepository) .<String, String>chunk(10, transactionManager) .reader(itemReader()) .writer(itemWriter()) .build(); }
 
위와 같은 방식으로 StepBuilder를 사용해서 단계를 구성한다.
위처럼 단계에서 사용되는 ItemReader, ItemWriter는 필수 구성 요소이지만, ItemProcessor는 선택적으로 구성할 수 있다.
 
또한 Chunk를 구성할 때는 입력, 출력 단위와 한 Chunk의 갯수, transactionManager를 설정한다. 위 코드는 입력 타입 String, 출력 타입 String, 한 청크는 10개의 아이템으로 구성된다. 따라서 한 번의 트랜잭션 범위 안에서 처리할 아이템의 개수는 10개로 구성된다.
 
Step의 처리가 시작될 때 트랜잭션이 시작된다.
또한 read가 호출될때마다 카운터가 증가하고, 카운터가 아이템의 개수 (위의 예시에는 10)에 도달하면 집계된 항목 목록들이 전달되고 ItemWriter 트랜잭션이 커밋된다.
 

Configuring a Step for Restart

 
Restart는 Step에 많은 영향을 미치므로 특정 구성이 필요할 수 있다.
 

Setting a Start Limit

 
Step의 시작 횟수를 제어해야 하는 상황이 존재한다. StartLimit을 설정하는 방법은 다음과 같다.
 
@Bean public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder("step1", jobRepository) .<String, String>chunk(10, transactionManager) .reader(itemReader()) .writer(itemWriter()) .startLimit(1) .build(); }
 
예를 들어, 데이터 중복/오염 발생 위험 때문에 해당 Step이 한번만 실행되어야 하는 경우, 위의 설정처럼 startLimit를 1로 설정할 수 있다.
 
이 경우에 만약 실행 횟수가 1을 초과했을때 StartLimitExceededException 이 던져진다.
startLimit의 기본값은 int의 최댓값으로 설정된다.

Restarting a Completed Step

 
재시작 가능한 작업의 경우, 처음 성공 여부와 관계없이 항상 실행되어야 하는 단계가 있을 수 있다. 예를 들어, 유효성 검사 단계나 Step 처리 전 리소스 정리 단계가 있다. 이 경우에는 아래와 같이 구성할 수 있다.
 
@Bean public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder("step1", jobRepository) .<String, String>chunk(10, transactionManager) .reader(itemReader()) .writer(itemWriter()) .allowStartIfComplete(true) .build(); }
 

Configuring Skip Logic

 
처리 중 발생한 오류가 실패로 이어지지 않고 건너뛰어야 하는 경우가 많다. 예를 들어, 재무 데이터는 송금으로 이어지기 때문에 건너뛸 수 없으며, 송금은 정확하게 정확해야 한다. 반면 공급업체 목록을 로드하는 경우에는 건너뛸 수 있다. 이처럼 문제가 없는 경우에는 Step을 스킵할 수 있다. 방법은 아래와 같다.
 
@Bean public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder("step1", jobRepository) .<String, String>chunk(10, transactionManager) .reader(flatFileItemReader()) .writer(itemWriter()) .faultTolerant() .skipLimit(10) .skip(FlatFileParseException.class) .build(); }
 
위의 설정을 보면 FlatFileParseException 이 발생하면 해당 Step은 건너뛰어지고 총 건너뛰기 제한은 10으로 설정된다. (건너뛰기 제한의 기본 설정은 10이다. 위 설정은 명시적으로 설정한 것임)
 
만약 특정 예외가 발생했을 때 스킵을 제외하고 싶다면 다음과 같이 설정하면 된다.
 
@Bean public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder("step1", jobRepository) .<String, String>chunk(10, transactionManager) .reader(flatFileItemReader()) .writer(itemWriter()) .faultTolerant() .skipLimit(10) .skip(Exception.class) .noSkip(FileNotFoundException.class) .build(); }
 
위의 예시는 noSkip 속성으로 FileNotFoundException을 지정했다. Exception이 발생하면 skip하지만, FileNotFoundException이 발생했으면 skip하지 않는다. 이처럼 skip을 구체화 할 때 사용한다. (skip이 없이 noSkip 단독으로 사용되지 않는다)
 

Configuring Retry logic

 
대부분의 경우 예외가 건너뛰기 또는 Step실패를 유발하기를 원한다. 그러나 모든 예외가 결정적인 것은 아니다.
만약 동시성제어의 잠금에서 예외가 발생했을 경우, 기다렸다가 다시 시도하면 성공할 수 있다.
 
재시도 설정은 다음과 같이 구성하면 된다.
 
@Bean public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder("step1", jobRepository) .<String, String>chunk(2, transactionManager) .reader(itemReader()) .writer(itemWriter()) .faultTolerant() .retryLimit(3) .retry(DeadlockLoserDataAccessException.class) .build(); }
 
위 설정은 DeadlockLoserDataAccessException이 발생했을 때 재시도를 시도하며, 3번까지 재시도가 가능하다.
 

Controlling Rollback

 
롤백은 다음과 같이 제어할 수 있다.
 
@Bean public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder("step1", jobRepository) .<String, String>chunk(2, transactionManager) .reader(itemReader()) .writer(itemWriter()) .faultTolerant() .noRollback(ValidationException.class) .build(); }
 
 

Transactional Readers

 
Spring Batch에서 ItemReader가 트랜잭션 경계 안에서 실행되는지 여부를 제어하는 설정이다.
 
공식문서만 봐서는 이 설정이 왜 필요한지 충분한 설명이 없어서 추가하자면, 이 설정은 롤백 시 Reader의 작업도 무효화하여 재시도 안정성을 확보하는데 의의가 있다.
 
다음과 같이 설정할 수 있다.
 
@Bean public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder("step1", jobRepository) .<String, String>chunk(2, transactionManager) .reader(itemReader()) .writer(itemWriter()) .readerIsTransactionalQueue() .build(); }
 
readerIsTransactionalQueue 설정은 Step의 청크 처리에서 ItemReader를 트랜잭션 내에서 실행되도록 지정한다. 기본값은 false이며, 이 설정을 사용하면 청크 트랜잭션이 시작된 이후에 Reader가 실행되도록 보장한다.
 

Transaction Attributes

 
트랜잭션 속성을 설정하는 방법은 다음과 같다.
 
@Bean public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) { DefaultTransactionAttribute attribute = new DefaultTransactionAttribute(); attribute.setPropagationBehavior(Propagation.REQUIRED.value()); attribute.setIsolationLevel(Isolation.DEFAULT.value()); attribute.setTimeout(30); return new StepBuilder("step1", jobRepository) .<String, String>chunk(2, transactionManager) .reader(itemReader()) .writer(itemWriter()) .transactionAttribute(attribute) .build(); }
 

Registering ItemStream with a  Step

 
Spring Batch에서 ItemStream을 Step에 등록하는 것은 재시작 가능한 배치 처리를 위해 매우 중요하다. ItemStreamopen(), update(), close() 메서드를 통해 실행 상태를 관리하며, 실패 시 재시작에 필요한 상태 정보를 제공한다.
 

ItemStream

 
ItemStream 인터페이스는 배치 처리 중 실행 상태를 유지하고 복원하는 데 사용된다. ItemReader, ItemWriter, ItemProcessor 중 일부는 이 인터페이스를 구현하여 자체적으로 상태를 관리한다.
예를 들어, FlatFileItemReader는 현재 읽고 있는 라인 번호를 저장하여 실패 후 재시작 시 해당 위치부터 다시 읽을 수 있다.
 
  • 자동 등록: ItemReader, ItemWriter, ItemProcessor가 직접 ItemStream을 구현한 경우, Spring Batch는 이를 자동으로 인식하고 등록한다.
  • 수동 등록 필요: 그러나 CompositeItemWriter와 같이 내부에 여러 ItemWriter를 위임(delegation)하는 구조에서는, 위임된 구성 요소들이 ItemStream을 구현하더라도 자동으로 등록되지 않는다. 이 경우, 각 구성 요소를 명시적으로 Step에 등록해야 한다.
 
 
ItemStream 등록은 아래와 같이 설정하면 된다.
 
@Bean public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder("step1", jobRepository) .<String, String>chunk(2, transactionManager) .reader(itemReader()) .writer(compositeItemWriter()) .stream(fileItemWriter1()) // 수동 등록 .stream(fileItemWriter2()) // 수동 등록 .build(); } @Bean public CompositeItemWriter<String> compositeItemWriter() { CompositeItemWriter<String> writer = new CompositeItemWriter<>(); writer.setDelegates(Arrays.asList(fileItemWriter1(), fileItemWriter2())); return writer; }
 

Intercepting Step Execution

 
Spring Batch에서 Step 실행을 가로채는 기능은 배치 처리의 다양한 단계에서 사용자 정의 로직을 삽입할 수 있도록 도와준다. 이를 통해 로깅, 모니터링, 오류 처리, 상태 저장 등의 작업을 유연하게 수행할 수 있다.
 
리스너는 아래와 같이 등록할 수 있다.
 
@Bean public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder("step1", jobRepository) .<String, String>chunk(10, transactionManager) .reader(reader()) .writer(writer()) .listener(chunkListener()) .build(); }
 
다음은 주요 Listener 인터페이스와 기능을 알아보겠다.

StepExecutionListener

 
public interface StepExecutionListener extends StepListener { void beforeStep(StepExecution stepExecution); ExitStatus afterStep(StepExecution stepExecution); }
 
Step 시작 전과 종료 후에 로직을 실행한다. afterStepExitStatus를 반환하여 Step의 종료 상태를 조정할 수 있다.
 

ChunkListner

 
public interface ChunkListener extends StepListener { void beforeChunk(ChunkContext context); void afterChunk(ChunkContext context); void afterChunkError(ChunkContext context); }
 
트랜잭션 내에서 청크 처리의 시작과 종료 시점에 로직을 실행한다. 예를 들어, 청크 처리 전 초기화 작업이나 청크 처리 후 정리 작업을 수행할 수 있다.
 

ItemReadListener<T>

 
public interface ItemReadListener<T> extends StepListener { void beforeRead(); void afterRead(T item); void onReadError(Exception ex); }
 
아이템 읽기 전후 및 읽기 오류 발생 시 로직을 실행한다. 예를 들어, 읽기 오류 발생 시 로그를 남기거나 오류를 처리할 수 있다.
 

ItemProcessListener<T, S>

 
public interface ItemProcessListener<T, S> extends StepListener { void beforeProcess(T item); void afterProcess(T item, S result); void onProcessError(T item, Exception e); }
 
아이템 처리 전후 및 처리 오류 발생 시 로직을 실행한다. 예를 들어, 처리된 결과를 검증하거나 오류를 처리할 수 있다.
 

ItemWriteListener<S>

 
public interface ItemWriteListener<S> extends StepListener { void beforeWrite(List<? extends S> items); void afterWrite(List<? extends S> items); void onWriteError(Exception exception, List<? extends S> items); }
 
아이템 쓰기 전후 및 쓰기 오류 발생 시 로직을 실행한다. 예를 들어, 쓰기 전 데이터 검증이나 쓰기 오류 처리 등을 수행할 수 있다.
 

SkipListener<T, S>

 
public interface SkipListener<T,S> extends StepListener { void onSkipInRead(Throwable t); void onSkipInProcess(T item, Throwable t); void onSkipInWrite(S item, Throwable t); }
 
읽기, 처리, 쓰기 단계에서 스킵된 아이템에 대한 로직을 실행한다. 예를 들어, 스킵된 아이템을 로그에 기록하거나 별도로 저장할 수 있다.
 

TaskletStep

 
Step을 처리할 때 Chunk지향 처리 방식 외에도 Tasklet 기반으로 처리할 수 있다.
 
Takslet은 Chunk방식과 다르게 매우 단순한 형태로, 하나의 작업을 수행할 때 사용된다. Tasklet은 Chunk방식에 비해, 구현이 간단하고 직관적이다. 따라서 복잡한 Reader, Processor, Writer 구성을 필요로하지 않는다는 장점이 있다.
 
TaskletStep 설정 방법은 다음과 같다
 
@Bean public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new StepBuilder("step1", jobRepository) .tasklet(myTasklet(), transactionManager) .build(); }
 
StepBuilder의 tasklet() 메서드에 Tasklet 구현체를 전달해야 한다.
 

Tasklet

 
많은 배치 작업에는 주요 처리가 시작되기 전에 다양한 리소스를 설정하거나 처리가 완료된 후 해당 리소스를 정리하기 위해 수행해야 하는 단계가 포함된다.
 
Tasklet의 구성 예시는 다음과 같다.
 
public class FileDeletingTasklet implements Tasklet, InitializingBean { private Resource directory; public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { File dir = directory.getFile(); Assert.state(dir.isDirectory(), "The resource must be a directory"); File[] files = dir.listFiles(); for (int i = 0; i < files.length; i++) { boolean deleted = files[i].delete(); if (!deleted) { throw new UnexpectedJobExecutionException("Could not delete file " + files[i].getPath()); } } return RepeatStatus.FINISHED; } public void setDirectoryResource(Resource directory) { this.directory = directory; } public void afterPropertiesSet() throws Exception { Assert.state(directory != null, "Directory must be set"); } }
 
주어진 디렉토리 내의 모든 파일을 삭제하는 Tasklet의 예시이다.
각 execute의 호출은 트랜잭션으로 래핑된다.
또한 execute은 RepeatStatus를 리턴하는데, 이 상태값에 따라 계속 반복해서 작업을 수행할지, Step을 종료할지 판단한다.
RepeatStatus의 종류는 2가지이다
  • RepeatStatus.FINISHED
  • RepeatStatus.CONTINUABLE
 

TaskletAdapter

 
Tasklet 기반 Step을 구성할 때 반드시 Tasklet 구현체를 작성해야 하는건 아니다. TaskletAdapter는 기존에 존재하는 메서드를 Tasklet으로 래핑하여 사용할 수 있게 도와준다. 예를 들어, 기존 DAO 메서드를 Tasklet으로 사용하려는 경우 함수를 재사용할 수 있게 해준다.
 
TaksletAdapter를 사용하여 Takslet을 구성하는 방법은 다음과 같다.
 
@Bean public Tasklet taskletAdapter() { MethodInvokingTaskletAdapter adapter = new MethodInvokingTaskletAdapter(); adapter.setTargetObject(myService); adapter.setTargetMethod("performTask"); return adapter; }
 

Controlling Step Flow

 
Step을 구성할 때, 다양한 흐름이 있다.
각 흐름을 제어하기 위한 설정 방법을 알아보자.
 

Sequential Flow

 
가장 간단한 순차적인 흐름 시나리오이다.
notion image
 
순차적인 흐름 시나리오는 .next()를 사용하여 구성할 수 있다.
 
@Bean public Job job(JobRepository jobRepository, Step stepA, Step stepB, Step stepC) { return new JobBuilder("job", jobRepository) .start(stepA) .next(stepB) .next(stepC) .build(); }
 
위 설정은 stepA → stepB → stepC 순서로 실행되는 순차적 흐름 설정이다.
 

Conditional Flow

 
분기적인 흐름을 구성할 수도 있다.
 
notion image
 
위는 성공 여부에 따라 StepB, StepC로의 흐름을 나타내는 흐름도이다. 분기적인 흐름을 구성하는건 다음과 같이 하면 된다.
 
@Bean public Job job(JobRepository jobRepository, Step stepA, Step stepB, Step stepC) { return new JobBuilder("job", jobRepository) .start(stepA) .on("*").to(stepB) .from(stepA).on("FAILED").to(stepC) .end() .build(); }
 
on() 방식은 간단한 패턴 매칭 방식을 사용하여 Step의 실행 결과인 ExitStatus와 매치한다.
패턴에는 특수 문자 두 개만 허용한다.
  • * : 0개 이상의 문자와 일치
  • ? : 정확히 한 문자와 일치
 
예를 들어, c*t 는 cat과 count와 일치하고, c?t는 cat과 일치하지만, count와는 일치하지 않는다.
 
주의할 점
  • 하나의 Step에 여러 개의 분기를 지정 할 수 있으며, 그 수에는 제한이 없다.
  • 하지만 Step 실행 결과로 반환된 ExitStatus가 지정된 어떤 조건에도 일치하지 않는 경우, SpringBatch는 예외를 던지고 전체 Job이 실패하게 된다.
  • 분기 조건은 프레임워크에서 가장 구체적인 것부터, 덜 구체적인 순으로 정렬한다. 이 말은 즉슨, 분기 순서가 코드 상 먼저인지 여부와 상관없이, 더 정확히 일치하는 조건에 대해 결정된다.
  • 분기를 설정할 때는, BatchStatusExitStatus 의 차이점을 정확하게 이해하는 것이 중요하다. 이는 바로 뒤에서 설명하겠다.
 

BatchStatus vs ExitStatus

 
SpringBatch에서 조건 분기를 구현할 때 혼동을 피하고 의도한 흐름을 정확하게 구현하기 위해 위 두 개념을 반드시 이해하고 구분해야 한다.
 
구분
BatchStatus
ExitStatus
의미
Step/Job의 실행 상태 (프레임워크 내부에서 사용)
Step/Job이 종료될 때 반환되는 사용자 정의 가능한 값
타입
Enum (COMPLETED, FAILED, 등)
문자열 ("COMPLETED", "FAILED", "SKIPPED", 등)
용도
Spring Batch 내부 로직/상태 추적
조건 분기 및 흐름 제어에 사용
커스터마이징
거의 불가
가능 (stepExecution.setExitStatus(new ExitStatus("SKIPPED")))
조건 분기 on(), to()는 ExitStatus 기준이다. BatchStatus 기준이 아니다. 예를 들어, ExitStatus가 FAILED 여도, BatchStatus가 COMPLETED일 수 있다는 것을 유의하자!
 

Configuring for Stop

 
비즈니스 요구사항에 따라 Job의 흐름을 중단하거나 종료할 수 있다.
 
Job의 흐름을 중단하는 방법
  • end : Job을 성공적으로 종료한다. BatchStatus와 ExitStatus는 기본적으로 COMPLETED로 설정된다.
 
예시는 다음과 같다.
@Bean public Job job(JobRepository jobRepository, Step step1, Step step2, Step step3) { return new JobBuilder("job", jobRepository) .start(step1) .next(step2) .on("FAILED").end() .from(step2).on("*").to(step3) .end() .build(); }
 
위 설정은 Step2가 실패했을 시 종료하는 설정이다.
 
  • fail : Job을 실패로 종료한다. BatchStatus는 FAILED로 설정된다.
 
@Bean public Job job(JobRepository jobRepository, Step step1, Step step2, Step step3) { return new JobBuilder("job", jobRepository) .start(step1) .next(step2).on("FAILED").fail() .from(step2).on("*").to(step3) .end() .build(); }
 
  • stop : Job을 중단하고, 이후 재시작 시 특정 Step부터 실행하도록 설정한다. BatchStatus는 STOPPED로 설정된다.
 
@Bean public Job job(JobRepository jobRepository, Step step1, Step step2) { return new JobBuilder("job", jobRepository) .start(step1) .on("COMPLETED").stopAndRestart(step2) .end() .build(); }
 
각 전이 요소는 Job의 BatchStatus에 영향을 미치며, 이를 통해 Job의 재시작 가능 여부나 후속 처리 로직을 제어할 수 있다.
 

Programmatic Flow Decisions

 
프로그래밍을 사용해 동적으로 분기를 결정할 수 있다.
프로그래밍을 사용해 동적으로 분기를 설정하는 방법은 다음과 같다.
 
public class MyDecider implements JobExecutionDecider { public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) { String status; if (someCondition()) { status = "FAILED"; } else { status = "COMPLETED"; } return new FlowExecutionStatus(status); } }
 
JobExecutionDecider 를 구현하고, 이를 다음과 같이 설정할 수 있다.
 
@Bean public Job job(JobRepository jobRepository, MyDecider decider, Step step1, Step step2, Step step3) { return new JobBuilder("job", jobRepository) .start(step1) .next(decider).on("FAILED").to(step2) .from(decider).on("COMPLETED").to(step3) .end() .build(); }
 

Split Flows

 
SpringBatch는 작업을 병렬 흐름으로 구성하는 방법도 지원한다.
 
@Bean public Flow flow1(Step step1, Step step2) { return new FlowBuilder<SimpleFlow>("flow1") .start(step1) .next(step2) .build(); } @Bean public Flow flow2(Step step3) { return new FlowBuilder<SimpleFlow>("flow2") .start(step3) .build(); } @Bean public Job job(JobRepository jobRepository, Flow flow1, Flow flow2, Step step4) { return new JobBuilder("job", jobRepository) .start(flow1) .split(new SimpleAsyncTaskExecutor()) .add(flow2) .next(step4) .end() .build(); }
 
처음보면 좀 복잡하긴 한데, 흐름을 말로 설명해보면 다음과 같다
  1. flow1 실행
    1. step1step2 (순차 실행)
  1. 동시에 flow2도 실행
    1. step3
      ☑ 병렬 처리.split()SimpleAsyncTaskExecutor로 구현됨
      flow1flow2는 병렬로 동시에 실행됨
  1. 두 Flow가 모두 완료된 후, step4 실행
 
병렬 실행 최적화가 필요한 경우 유용할 것 같다.
 

마무리

 
좀 길었는데, Step을 구성하는 방법을 공부해봤다.
아직까진 어려운건 없는데, 내용이 매우 많아서 단순히 글만 보는걸로는 제대로 학습했는지 잘 모르겠다. 얼른 공식문서 다 보고, 실제로 배치 시스템을 구성하고 싶다.
다음 포스팅은 ItemReader, ItemWriter 설명으로 돌아오겠다.
 
 
 
Share article

lushlife99