개요
Spring Batch에서는 기본적으로 단일 스레드 방식으로 작업을 처리합니다.
하지만 Spring Batch에서는 퍼포먼스 향상을 위해 멀티 스레드를 통한 병렬 처리를 지원합니다.
이번 시간에는 Spring Batch에서 제공하는 다양한 병렬 처리의 방법의 개념을 짚어보고, 그 중 하나인
Step Partitioning에 대해 실습해보겠습니다Parallel Processing
Spring Batch는 여러가지 Scaling 방법들을 제공합니다.
상황에 맞는 전략들을 사용할 수 있는데, 각 상황은 프로세서, 스레드의 갯수로 나뉩니다.
실습에서 진행할 Scalling 방법은 가장 많이 사용되는 Partitioning과 Multi-threaded Step 입니다.
이번 시간에는 Partitioning을 알아보고, 다음 포스팅에서는 Multi-threaded Step을 알아보겠습니다.
더 다양한 방법들을 알고싶다면 여기를 참고하세요
Partitioning 개념

(출처 : Spring Batch Docs)
Partitioning이란?
하나의 Step 작업을 여러개의 작은 단위(파티션)로 나누어 병렬로 처리하는 방법
Spring Batch에서 청크 단위로 배치 작업을 할 때, 청크를 작은 단위로 또 나누어서 병렬로 처리합니다.
예를 들어서 id로 파티션하거나, 시간같은 컬럼으로 나누어서도 작업할 수도 있습니다.
다음 포스팅에서 실습할
Multi-threaded Step 방법으로 병렬을 처리하려면, thread-safe한 배치 프로그램을 작성하는 것이 필수적입니다. 따라서 기존에 작성했던 배치 시스템을 Scale up 하는 과정에서 코드 변경이 잦고, 동시성 문제가 발생하지 않는지 많은 리소스가 필요합니다. 하지만
Partitioning 은 병렬로 실행되지만 서로 겹치지 않는 분리된 데이터를 처리하기 때문에 동시성 이슈가 없다는 장점이 있습니다. 당연히 병렬로 처리하니 퍼포먼스 향상도 가져오면서도, 쉽게 확장 가능하기 때문에 많이 사용하는 방법중 하나입니다.
Partitioning vs 멀티스레드 Step 비교 정리
항목 | Partitioning 방식 | Multi-threaded Step 방식 |
병렬 처리 단위 | 파티션 (데이터 범위 단위) | 청크 단위 (Thread로 청크 처리) |
Reader/Writer 인스턴스 | 파티션마다 새로 생성 ( @StepScope) | 모든 스레드가 공유 |
스레드 안전 요구 | 거의 없음 (독립 처리) | 필수 (Reader/Writer 등 thread-safe 해야 함) |
설정 복잡도 | 높음 ( Partitioner, 파라미터 전달 등 필요) | 비교적 단순 ( taskExecutor만 추가) |
동시성 이슈 위험 | 낮음 | 높음 (동시 접근 시 충돌 가능) |
성능 예측 가능성 | 높음 (분할 제어 가능) | 낮음 (DB 락, 중복 처리 가능성) |
처리 대상 분할 방식 | 명시적으로 ID/날짜 등 기준으로 나눔 | 자동 (단일 Reader에서 청크 단위로 병렬 처리) |
Partitioning 구조
Partitioning은 크게 3가지 구성 요소로 이루어집니다:
구성 요소 | 역할 |
Master Step | Partitioning을 설정하고 파티션별 Step을 분배 |
Partitioner | 전체 데이터 처리 범위를 나눔 (예: ID 범위) |
Slave Step | 각 파티션에 대해 병렬로 실행되는 Step (실제 처리 로직) |
구조 간 동작 흐름:
Master Step이 실행되면Partitioner가 호출되어, 각 파티션에 대한 범위를 정의함
- 각 범위(startId~endId 등)는
ExecutionContext에 담겨 파티션별로 설정됨
- 설정된 파라미터를 기반으로 동일한
Slave Step이 여러 개 동시에 병렬 실행됨
- 각
Slave Step은 자신에게 할당된 범위만 처리하므로 스레드 간 충돌 없이 안정적으로 병렬 처리 가능
Partitioning 실습
파티셔닝을 통해 병렬 처리 실습을 해보겠습니다.
이번 실습은 이전에 작성했던 시나리오2번에 파티셔닝을 추가하는 방식으로 진행했습니다.
앞서 실습한 시나리오2번은 여기를 참고해주세요.
하위 구성요소부터 코드를 작성해보겠습니다.
Partitioner
먼저 파티션을 나눌
Partitioner 를 구성해보겠습니다.private static final int GRID_SIZE = 4;
@Bean
public Partitioner idRangePartitioner() {
return gridSize -> {
long minId = 1L;
long maxId = 100000L; // or dynamically fetch max ID from DB
long targetSize = (maxId - minId) / gridSize + 1;
Map<String, ExecutionContext> result = new HashMap<>();
long start = minId;
long end;
for (int i = 0; i < gridSize; i++) {
end = Math.min(start + targetSize - 1, maxId);
ExecutionContext context = new ExecutionContext();
context.putLong("startId", start);
context.putLong("endId", end);
result.put("partition" + i, context);
start += targetSize;
}
return result;
};
}주의할점
minId와 maxId는 실습 환경에서는 고정되어 있으므로 고정된 값을사용했으나, 실제 환경에서는 db에서 동적으로 가져와야 합니다!
위 코드는 id를 기준으로 4개의 파티션으로 나눕니다.
아래와 같이 파티션이 나뉠 것입니다.
1번 파티션 - 1 ~ 25000
2번 파티션 - 25001 ~ 50000
3번 파티션 - 50001 ~ 75000
4번 파티션 - 75001 ~ 100000
그리고 파티션을 기준으로, 청크로 나누어 작업합니다.
구조적으로 보면 아래와 같이 처리하게 되겠죠
[Job]
└── [Partition 1: 1~25000]
└── Chunk 1 (1~1000)
└── Chunk 2 (1001~2000)
└── ...
└── [Partition 2: 25001~50000]
└── Chunk 1 (25001~26000)
└── ...
...
Partitioner 에서 나눈 파티션을 후에 Slave Step 에서 청크 단위로 처리하게 됩니다.Master Step
이번엔
Master Step을 설정해봅시다.@Bean
public Step partitionedStep() {
return new StepBuilder("partitionedStep", jobRepository)
.partitioner("workerStep", idRangePartitioner()) // ← 파티셔너 설정
.step(workerStep()) // ← Slave Step 정의
.gridSize(GRID_SIZE) // ← 파티션 개수
.taskExecutor(taskExecutor()) // ← 병렬 실행용 Executor
.build();
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setThreadNamePrefix("batch-thread-");
executor.initialize();
return executor;
}Master Step 설정에서 특히 중요한 두 가지 요소인 gridSize와 taskExecutor는 다음과 같은 역할을 합니다:gridSize: 전체 작업을 몇 개의 파티션으로 나눌지 결정합니다.
taskExecutor: 파티션별로Slave Step을 병렬 실행할 스레드 풀을 제공합니다.- 위의 예제 코드에서는 초기 스레드 개수를 4, 최대 스레드 개수를 8로 설정하였습니다.
Slave Step
이번에는
Slave Step을 설정해봅시다.@Bean
public Step workerStep() {
return new StepBuilder("workerStep", jobRepository)
.<Orders, MonthlySalesSummary>chunk(CHUNK_SIZE, transactionManager)
.reader(ordersWithFetchJoinReader(null, null))
.processor(parallelOrderToSummaryProcessor())
.writer(summaryWriter())
.build();
}앞서 정의했던
Master Step 에서 파티션을 정의했습니다.Partitioner 에서 나뉜 파티션은 Slave Step을 독립적으로 병렬 실행하게 됩니다.Reader의 파라미터를 (null, null) 로 설정한 이유
Spring Batch의 코드를 보다 보면 파라미터를 null로 설정하는 것을 많이 보셨을겁니다.
@StepScope 로 정의된 Bean은 Step 실행 시점에 Lazy하게 생성되며, ExecutionContext로부터 동적으로 파라미터를 주입받게 됩니다.@Bean
@StepScope
public ItemReader<Orders> ordersWithFetchJoinReader(
@Value("#{stepExecutionContext[startId]}") Long startId,
@Value("#{stepExecutionContext[endId]}") Long endId)이런식으로 Spring el에 맞게 정의해주면, 파라미터를 동적으로 주입받게 되니, Bean 생성 시점에는 실제 값을 몰라도 됩니다.
따라서 null을 전달해도 Spring이 런타임 시점에 자동으로 주입해줍니다!
전체 코드
전체 코드는 다음과 같습니다.
@Slf4j
@Configuration
@RequiredArgsConstructor
public class ParallelMonthlySalesSummaryJobConfig {
private static final int CHUNK_SIZE = 1000;
private static final int GRID_SIZE = 4;
private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager;
private final EntityManagerFactory emf;
private final MonthlySalesSummaryRepository monthlySalesSummaryRepository;
@Bean
public Job parallelMonthlySalesSummaryJob() {
return new JobBuilder("parallelMonthlySalesSummaryJob", jobRepository)
.start(partitionedStep())
.build();
}
@Bean
public Step partitionedStep() {
return new StepBuilder("partitionedStep", jobRepository)
.partitioner("workerStep", idRangePartitioner())
.step(workerStep())
.gridSize(GRID_SIZE)
.taskExecutor(taskExecutor())
.build();
}
@Bean
public Partitioner idRangePartitioner() {
return gridSize -> {
long minId = 1L;
long maxId = 100000L; // or dynamically fetch max ID from DB
long targetSize = (maxId - minId) / gridSize + 1;
Map<String, ExecutionContext> result = new HashMap<>();
long start = minId;
long end;
for (int i = 0; i < gridSize; i++) {
end = Math.min(start + targetSize - 1, maxId);
ExecutionContext context = new ExecutionContext();
context.putLong("startId", start);
context.putLong("endId", end);
result.put("partition" + i, context);
start += targetSize;
}
return result;
};
}
@Bean
public Step workerStep() {
return new StepBuilder("workerStep", jobRepository)
.<Orders, MonthlySalesSummary>chunk(CHUNK_SIZE, transactionManager)
.reader(ordersWithFetchJoinReader(null, null))
.processor(parallelOrderToSummaryProcessor())
.writer(summaryWriter())
.build();
}
@Bean
@StepScope
public ItemReader<Orders> ordersWithFetchJoinReader(
@Value("#{stepExecutionContext[startId]}") Long startId,
@Value("#{stepExecutionContext[endId]}") Long endId) {
return new ItemReader<>() {
private final Queue<Orders> buffer = new LinkedList<>();
private int currentOffset = 0;
private final int pageSize = CHUNK_SIZE;
@Override
public Orders read() {
if (!buffer.isEmpty()) {
return buffer.poll();
}
EntityManager em = emf.createEntityManager();
try {
// 1단계: id 서브쿼리 (페이징 처리)
List<Long> ids = em.createQuery("""
SELECT o.id
FROM Orders o
WHERE o.id >= :startId AND o.id <= :endId
ORDER BY o.id
""", Long.class)
.setParameter("startId", startId)
.setParameter("endId", endId)
.setFirstResult(currentOffset)
.setMaxResults(pageSize)
.getResultList();
if (ids.isEmpty()) {
return null; // 끝
}
currentOffset += ids.size();
// 2단계: IN절 fetch join으로 연관관계 한꺼번에 로딩
List<Orders> orders = em.createQuery("""
SELECT DISTINCT o
FROM Orders o
LEFT JOIN FETCH o.user
LEFT JOIN FETCH o.orderItems
WHERE o.id IN :ids
ORDER BY o.id
""", Orders.class)
.setParameter("ids", ids)
.getResultList();
buffer.addAll(orders);
return buffer.poll();
} finally {
em.close();
}
}
};
}
@Bean
public ItemProcessor<Orders, MonthlySalesSummary> parallelOrderToSummaryProcessor() {
return order -> {
var user = order.getUser();
var month = YearMonth.from(order.getOrderAt());
long total = order.getOrderItems().stream()
.mapToLong(i -> (long) i.getQuantity() * i.getPrice())
.sum();
return MonthlySalesSummary.builder()
.user(user)
.yearMonth(month.toString())
.monthlySpendMoney(total)
.monthlyOrderCount(1)
.build();
};
}
@Bean
@StepScope
public ItemWriter<MonthlySalesSummary> summaryWriter() {
return items -> {
Map<String, MonthlySalesSummary> summaryMap = new ConcurrentHashMap<>();
for (MonthlySalesSummary item : items) {
String key = item.getUser().getId() + "_" + item.getYearMonth();
summaryMap.merge(key, item, (existing, incoming) -> {
existing.accumulateSpendMoney(incoming.getMonthlySpendMoney());
existing.plusOrderCount(incoming.getMonthlyOrderCount());
return existing;
});
}
monthlySalesSummaryRepository.saveAll(summaryMap.values());
};
}
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(8);
executor.setThreadNamePrefix("batch-thread-");
executor.initialize();
return executor;
}
}
결과를 보면

10만건의 order를 파티셔닝하여 배치했을 때 동시성 문제 없이 잘 집계되는 것을 알 수 있습니다!
마무리
이번 글에서는 Spring Batch의 파티셔닝(Partitioning) 개념과, Master Step과 Slave Step의 역할을 구분하여 설명했습니다.
실제 예제 코드를 통해 ID 범위 기준으로 데이터를 여러 파티션으로 나누고, 각 파티션을 병렬로 처리하는 구조를 살펴보았습니다.
특히
gridSize와 taskExecutor 설정이 파티셔닝 성능과 병렬 처리에 중요한 역할을 하며,@StepScope를 활용해 파티션별로 동적으로 파라미터를 주입받아 유연하게 처리할 수 있다는 점도 확인했습니다.다음 시간에는 Spring Batch의 또 다른 병렬 처리 방법인 Multi-threaded Step 실습 포스팅으로 돌아오겠습니다.
Reference
Share article