Spring Batch 실습 4 : 배치 시스템 병렬 처리 - Step Partitioning

Spring Batch - Step Partitioning
정찬's avatar
Jun 27, 2025
Spring Batch 실습 4 : 배치 시스템 병렬 처리 - Step Partitioning
 

개요

 
Spring Batch에서는 기본적으로 단일 스레드 방식으로 작업을 처리합니다.
하지만 Spring Batch에서는 퍼포먼스 향상을 위해 멀티 스레드를 통한 병렬 처리를 지원합니다.
 
이번 시간에는 Spring Batch에서 제공하는 다양한 병렬 처리의 방법의 개념을 짚어보고, 그 중 하나인 Step Partitioning에 대해 실습해보겠습니다
 

Parallel Processing

 
Spring Batch는 여러가지 Scaling 방법들을 제공합니다.
상황에 맞는 전략들을 사용할 수 있는데, 각 상황은 프로세서, 스레드의 갯수로 나뉩니다.
 
실습에서 진행할 Scalling 방법은 가장 많이 사용되는 Partitioning과 Multi-threaded Step 입니다.
이번 시간에는 Partitioning을 알아보고, 다음 포스팅에서는 Multi-threaded Step을 알아보겠습니다.
더 다양한 방법들을 알고싶다면 여기를 참고하세요
 

Partitioning 개념

 
notion image
 
 
Partitioning이란?
하나의 Step 작업을 여러개의 작은 단위(파티션)로 나누어 병렬로 처리하는 방법
 
Spring Batch에서 청크 단위로 배치 작업을 할 때, 청크를 작은 단위로 또 나누어서 병렬로 처리합니다.
 
예를 들어서 id로 파티션하거나, 시간같은 컬럼으로 나누어서도 작업할 수도 있습니다.
 
다음 포스팅에서 실습할 Multi-threaded Step 방법으로 병렬을 처리하려면, thread-safe한 배치 프로그램을 작성하는 것이 필수적입니다. 따라서 기존에 작성했던 배치 시스템을 Scale up 하는 과정에서 코드 변경이 잦고, 동시성 문제가 발생하지 않는지 많은 리소스가 필요합니다.
하지만 Partitioning 은 병렬로 실행되지만 서로 겹치지 않는 분리된 데이터를 처리하기 때문에 동시성 이슈가 없다는 장점이 있습니다.
당연히 병렬로 처리하니 퍼포먼스 향상도 가져오면서도, 쉽게 확장 가능하기 때문에 많이 사용하는 방법중 하나입니다.
 

Partitioning vs 멀티스레드 Step 비교 정리

항목
Partitioning 방식
Multi-threaded Step 방식
병렬 처리 단위
파티션 (데이터 범위 단위)
청크 단위 (Thread로 청크 처리)
Reader/Writer 인스턴스
파티션마다 새로 생성 (@StepScope)
모든 스레드가 공유
스레드 안전 요구
거의 없음 (독립 처리)
필수 (Reader/Writer 등 thread-safe 해야 함)
설정 복잡도
높음 (Partitioner, 파라미터 전달 등 필요)
비교적 단순 (taskExecutor만 추가)
동시성 이슈 위험
낮음
높음 (동시 접근 시 충돌 가능)
성능 예측 가능성
높음 (분할 제어 가능)
낮음 (DB 락, 중복 처리 가능성)
처리 대상 분할 방식
명시적으로 ID/날짜 등 기준으로 나눔
자동 (단일 Reader에서 청크 단위로 병렬 처리)

Partitioning 구조

 
Partitioning은 크게 3가지 구성 요소로 이루어집니다:
구성 요소
역할
Master Step
Partitioning을 설정하고 파티션별 Step을 분배
Partitioner
전체 데이터 처리 범위를 나눔 (예: ID 범위)
Slave Step
각 파티션에 대해 병렬로 실행되는 Step (실제 처리 로직)
구조 간 동작 흐름:
  • Master Step이 실행되면 Partitioner가 호출되어, 각 파티션에 대한 범위를 정의함
  • 각 범위(startId~endId 등)는 ExecutionContext에 담겨 파티션별로 설정됨
  • 설정된 파라미터를 기반으로 동일한 Slave Step여러 개 동시에 병렬 실행
  • Slave Step은 자신에게 할당된 범위만 처리하므로 스레드 간 충돌 없이 안정적으로 병렬 처리 가능
 

Partitioning 실습

 
파티셔닝을 통해 병렬 처리 실습을 해보겠습니다.
이번 실습은 이전에 작성했던 시나리오2번에 파티셔닝을 추가하는 방식으로 진행했습니다.
앞서 실습한 시나리오2번은 여기를 참고해주세요.
 

 
하위 구성요소부터 코드를 작성해보겠습니다.
 

Partitioner

 
먼저 파티션을 나눌 Partitioner 를 구성해보겠습니다.
 
private static final int GRID_SIZE = 4; @Bean public Partitioner idRangePartitioner() { return gridSize -> { long minId = 1L; long maxId = 100000L; // or dynamically fetch max ID from DB long targetSize = (maxId - minId) / gridSize + 1; Map<String, ExecutionContext> result = new HashMap<>(); long start = minId; long end; for (int i = 0; i < gridSize; i++) { end = Math.min(start + targetSize - 1, maxId); ExecutionContext context = new ExecutionContext(); context.putLong("startId", start); context.putLong("endId", end); result.put("partition" + i, context); start += targetSize; } return result; }; }
 
주의할점
minId와 maxId는 실습 환경에서는 고정되어 있으므로 고정된 값을사용했으나, 실제 환경에서는 db에서 동적으로 가져와야 합니다!
 
위 코드는 id를 기준으로 4개의 파티션으로 나눕니다.
아래와 같이 파티션이 나뉠 것입니다.
1번 파티션 - 1 ~ 25000
2번 파티션 - 25001 ~ 50000
3번 파티션 - 50001 ~ 75000
4번 파티션 - 75001 ~ 100000
 
그리고 파티션을 기준으로, 청크로 나누어 작업합니다.
구조적으로 보면 아래와 같이 처리하게 되겠죠
[Job] └── [Partition 1: 1~25000] └── Chunk 1 (1~1000) └── Chunk 2 (1001~2000) └── ... └── [Partition 2: 25001~50000] └── Chunk 1 (25001~26000) └── ... ...
 
Partitioner 에서 나눈 파티션을 후에 Slave Step 에서 청크 단위로 처리하게 됩니다.

Master Step

 
이번엔 Master Step을 설정해봅시다.
 
@Bean public Step partitionedStep() { return new StepBuilder("partitionedStep", jobRepository) .partitioner("workerStep", idRangePartitioner()) // ← 파티셔너 설정 .step(workerStep()) // ← Slave Step 정의 .gridSize(GRID_SIZE) // ← 파티션 개수 .taskExecutor(taskExecutor()) // ← 병렬 실행용 Executor .build(); } @Bean public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(4); executor.setMaxPoolSize(8); executor.setThreadNamePrefix("batch-thread-"); executor.initialize(); return executor; }
 
Master Step 설정에서 특히 중요한 두 가지 요소인 gridSizetaskExecutor는 다음과 같은 역할을 합니다:
  • gridSize: 전체 작업을 몇 개의 파티션으로 나눌지 결정합니다.
  • taskExecutor: 파티션별로 Slave Step을 병렬 실행할 스레드 풀을 제공합니다.
    • 위의 예제 코드에서는 초기 스레드 개수를 4, 최대 스레드 개수를 8로 설정하였습니다.
 

Slave Step

 
이번에는 Slave Step을 설정해봅시다.
 
@Bean public Step workerStep() { return new StepBuilder("workerStep", jobRepository) .<Orders, MonthlySalesSummary>chunk(CHUNK_SIZE, transactionManager) .reader(ordersWithFetchJoinReader(null, null)) .processor(parallelOrderToSummaryProcessor()) .writer(summaryWriter()) .build(); }
 
앞서 정의했던 Master Step 에서 파티션을 정의했습니다.
Partitioner 에서 나뉜 파티션은 Slave Step을 독립적으로 병렬 실행하게 됩니다.
 
Reader의 파라미터를 (null, null) 로 설정한 이유
 
Spring Batch의 코드를 보다 보면 파라미터를 null로 설정하는 것을 많이 보셨을겁니다.
 
@StepScope 로 정의된 Bean은 Step 실행 시점에 Lazy하게 생성되며, ExecutionContext로부터 동적으로 파라미터를 주입받게 됩니다.
 
@Bean @StepScope public ItemReader<Orders> ordersWithFetchJoinReader( @Value("#{stepExecutionContext[startId]}") Long startId, @Value("#{stepExecutionContext[endId]}") Long endId)
 
이런식으로 Spring el에 맞게 정의해주면, 파라미터를 동적으로 주입받게 되니, Bean 생성 시점에는 실제 값을 몰라도 됩니다.
따라서 null을 전달해도 Spring이 런타임 시점에 자동으로 주입해줍니다!
 

전체 코드

 
전체 코드는 다음과 같습니다.
@Slf4j @Configuration @RequiredArgsConstructor public class ParallelMonthlySalesSummaryJobConfig { private static final int CHUNK_SIZE = 1000; private static final int GRID_SIZE = 4; private final JobRepository jobRepository; private final PlatformTransactionManager transactionManager; private final EntityManagerFactory emf; private final MonthlySalesSummaryRepository monthlySalesSummaryRepository; @Bean public Job parallelMonthlySalesSummaryJob() { return new JobBuilder("parallelMonthlySalesSummaryJob", jobRepository) .start(partitionedStep()) .build(); } @Bean public Step partitionedStep() { return new StepBuilder("partitionedStep", jobRepository) .partitioner("workerStep", idRangePartitioner()) .step(workerStep()) .gridSize(GRID_SIZE) .taskExecutor(taskExecutor()) .build(); } @Bean public Partitioner idRangePartitioner() { return gridSize -> { long minId = 1L; long maxId = 100000L; // or dynamically fetch max ID from DB long targetSize = (maxId - minId) / gridSize + 1; Map<String, ExecutionContext> result = new HashMap<>(); long start = minId; long end; for (int i = 0; i < gridSize; i++) { end = Math.min(start + targetSize - 1, maxId); ExecutionContext context = new ExecutionContext(); context.putLong("startId", start); context.putLong("endId", end); result.put("partition" + i, context); start += targetSize; } return result; }; } @Bean public Step workerStep() { return new StepBuilder("workerStep", jobRepository) .<Orders, MonthlySalesSummary>chunk(CHUNK_SIZE, transactionManager) .reader(ordersWithFetchJoinReader(null, null)) .processor(parallelOrderToSummaryProcessor()) .writer(summaryWriter()) .build(); } @Bean @StepScope public ItemReader<Orders> ordersWithFetchJoinReader( @Value("#{stepExecutionContext[startId]}") Long startId, @Value("#{stepExecutionContext[endId]}") Long endId) { return new ItemReader<>() { private final Queue<Orders> buffer = new LinkedList<>(); private int currentOffset = 0; private final int pageSize = CHUNK_SIZE; @Override public Orders read() { if (!buffer.isEmpty()) { return buffer.poll(); } EntityManager em = emf.createEntityManager(); try { // 1단계: id 서브쿼리 (페이징 처리) List<Long> ids = em.createQuery(""" SELECT o.id FROM Orders o WHERE o.id >= :startId AND o.id <= :endId ORDER BY o.id """, Long.class) .setParameter("startId", startId) .setParameter("endId", endId) .setFirstResult(currentOffset) .setMaxResults(pageSize) .getResultList(); if (ids.isEmpty()) { return null; // 끝 } currentOffset += ids.size(); // 2단계: IN절 fetch join으로 연관관계 한꺼번에 로딩 List<Orders> orders = em.createQuery(""" SELECT DISTINCT o FROM Orders o LEFT JOIN FETCH o.user LEFT JOIN FETCH o.orderItems WHERE o.id IN :ids ORDER BY o.id """, Orders.class) .setParameter("ids", ids) .getResultList(); buffer.addAll(orders); return buffer.poll(); } finally { em.close(); } } }; } @Bean public ItemProcessor<Orders, MonthlySalesSummary> parallelOrderToSummaryProcessor() { return order -> { var user = order.getUser(); var month = YearMonth.from(order.getOrderAt()); long total = order.getOrderItems().stream() .mapToLong(i -> (long) i.getQuantity() * i.getPrice()) .sum(); return MonthlySalesSummary.builder() .user(user) .yearMonth(month.toString()) .monthlySpendMoney(total) .monthlyOrderCount(1) .build(); }; } @Bean @StepScope public ItemWriter<MonthlySalesSummary> summaryWriter() { return items -> { Map<String, MonthlySalesSummary> summaryMap = new ConcurrentHashMap<>(); for (MonthlySalesSummary item : items) { String key = item.getUser().getId() + "_" + item.getYearMonth(); summaryMap.merge(key, item, (existing, incoming) -> { existing.accumulateSpendMoney(incoming.getMonthlySpendMoney()); existing.plusOrderCount(incoming.getMonthlyOrderCount()); return existing; }); } monthlySalesSummaryRepository.saveAll(summaryMap.values()); }; } @Bean public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(4); executor.setMaxPoolSize(8); executor.setThreadNamePrefix("batch-thread-"); executor.initialize(); return executor; } }
 
결과를 보면
notion image
 
10만건의 order를 파티셔닝하여 배치했을 때 동시성 문제 없이 잘 집계되는 것을 알 수 있습니다!

마무리

 
이번 글에서는 Spring Batch의 파티셔닝(Partitioning) 개념과, Master Step과 Slave Step의 역할을 구분하여 설명했습니다.
실제 예제 코드를 통해 ID 범위 기준으로 데이터를 여러 파티션으로 나누고, 각 파티션을 병렬로 처리하는 구조를 살펴보았습니다.
특히 gridSizetaskExecutor 설정이 파티셔닝 성능과 병렬 처리에 중요한 역할을 하며,
@StepScope를 활용해 파티션별로 동적으로 파라미터를 주입받아 유연하게 처리할 수 있다는 점도 확인했습니다.
 
다음 시간에는 Spring Batch의 또 다른 병렬 처리 방법인 Multi-threaded Step 실습 포스팅으로 돌아오겠습니다.

Reference

 
 
 
Share article

lushlife99