Spring Batch Step 파헤쳐보기
2024-01-02 15:43:28
Step 개념
- Batch Job 을 구성하는 독립적인 단계로 하나의 Job은 하나 이상의 Step으로 구성됩니다.
- Spring Batch Job 구현체중 하나인
SimpleJob을 까보면 Job 은 내부 멤버변수로 Step List를 들고 있음을 확인할 수 있습니다.
public class SimpleJob extends AbstractJob {
private List<Step> steps = new ArrayList<>();
//...
}
StepExecution 이란?
- Step에 대한 한번의 시도를 의미하는 객체로서 Step 실행 중에 발생한 정보들을 저장하고 있는 객체입니다.
- DB의
BATCH_STEP_EXECUTION테이블과 1:1로 매핑됩니다. - Job이 실패해서 재수행하는 경우에는 실패한 Step에 대해서만 재시작하고, 성공한 Step은 생략합니다. ( 하지만
allowStartIfComplete설정값을 변경하면 성공한 Step도 재시작되게 변경할 수 있습니다 )
예를 들어 Step 1,2,3이 존재하는데 Step2번이 실행중에 실패한다면 Step 1은 성공 , Step 2는 실패처리 됩니다. 그리고 Step 3는 실행되지 않습니다. 이 상태에서 Job을 재시작하면 Step2부터 재시작하여 Step2,Step3 가 수행됩니다.
- Job을 구성하는 모든 Step의 실행 정보인
StepExecution이 완료 처리되어야만JobExecution이 완료처리됩니다.
즉 Spring Batch 도메인 용어를 정리하면 하나의 Job은 여러개의 Step으로 구성되고, Job이 JobParameter를 주입받아 실행되는 객체가 JobInstance객체입니다.
JobInstance를 실행한 정보가 JobExecution이고, Job이 실행되면서 Job을 구성하는 Step들의 실행정보가 StepExecution입니다.
Step간 데이터 공유하기 - ExecutionContext
ExecutionContext를 활용하면 Job내 Step간 데이터를 공유할 수 있습니다. 혹은 실패한 Step에서 Step 재시작시 실패 이전까지 작업했던 상태값들을 가져올 수 있습니다.ExecutionContext는 Spring Batch에서 관리하는 key-value (Map) 컬렉션입니다.StepExecution,JobExecution객체의 멤버변수로 선언되고 , 각각 DB의BATCH_JOB_EXECUTION_CONTEXT,BATCH_STEP_EXECUTION_CONTEXT테이블에 1:1 매핑됩니다.StepExecution내ExecutionContext는 Step안에서만 공유됩니다. 즉 특정 Step에서만 접근이 가능합니다. 실패한 Step이 재시작된 경우도 이전까지 작업한 내용을 불러들일수 있습니다.JobExeuction내ExecutionContext는 모든 Step안에서 공유됩니다.
예시 코드
아래와 같이 ExecutionContext를 StepContribution 또는 ChunkContext를 통해 접근하고, 값을 넣어줄 수 있습니다.
넣어준 값은 BATCH_JOB_EXECUTION_CONTEXT , BATCH_STEP_EXECUTION_CONTEXT 테이블에 각각 직렬화되어 저장됩니다.
public class ExecutionContextTasklet1 implements Tasklet {
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
ExecutionContext jobExecutionContext = contribution.getStepExecution().getJobExecution().getExecutionContext();
ExecutionContext stepExecutionContext = contribution.getStepExecution().getExecutionContext();
jobExecutionContext.put("jobName", "developer"); //1. BATCH_JOB_EXECUTION_CONTEXT 에 저장 (모든 STEP에서 공유)
stepExecutionContext.put("stepName", "software"); // 2. BATCH_STEP_EXECUTION_CONTEXT 에 저장 (특정 STEP에서 공유)
return RepeatStatus.FINISHED;
}
}
Spring Batch에서 제공하는 Step 구현체(5)
- Step 인터페이스를 AbstractStep이라는 추상 클래스에서 구현하고,AbstractStep 추상클래스를 구현하는 구조입니다.
- Batch에서 제공하는 Step의 구현체는 아래와 같은 5개의 구현체가 존재합니다.
- Tasklet Step
- Partition Step
- Job Step
- Flow Step
- Decision Step
TaskletStep
RepeatTemplate을 사용해서 Tasklet 코드 block을 트랜잭션 경계 내(성공시 커밋,실패시 롤백)에서 반복해서 실행합니다. 언제까지 반복해서 실행할것인가에 대한 판단은 Tasklet 객체에서 반환하는RepeatStatus값에 의해 결정됩니다.RepeatStatus.FINISHED와 같이 특정RepeatStatus를 반환할떄까지 계속해서 실행합니다.- TaskletStep은 아래와 같은 tasklet 인터페이스를 구현하는 tasklet 구현체를 멤버변수로 가지고 있습니다.
/**
* Strategy for processing in a step.
*/
public interface Tasklet {
/**
* @return an {@link RepeatStatus} indicating whether processing is
* continuable. Returning {@code null} is interpreted as {@link RepeatStatus#FINISHED}
*
* @throws Exception thrown if error occurs during execution.
*/
@Nullable
RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;
}
- TaskletStep의 코드를 확인해보면 Tasklet 호출횟수를 조정하기 위한 RepeatTemplate 와 실행되야할 작업 그 자체 (tasklet) 구현체를 멤버변수로 들고있는것을 확인할 수 있습니다.
public class TaskletStep extends AbstractStep {
private RepeatOperations stepOperations = new RepeatTemplate();
//...
private Tasklet tasklet;
//...
}
- 자주 사용되는 tasklet 구현체는 Spring Batch에서 이미 구현해놓았습니다. 이 중
ChunkOrientedTasklet구현체을 활용해 Chunk 단위로 배치 작업을 쪼개서 처리할 수 있습니다.
Chunk 기반과 Task 기반
- Spring Batch에서 Tasklet Step의 실행 단위는 크게 Chunk기반과 Task기반으로 2가지입니다.
Chunk 기반 Tasklet의 특징
- 하나의 큰 덩어리를 n개씩 쪼개서 실행한다는 의미로 데이터 대용량 처리용으로 설계되었습니다.
- ItemReader, ItemProcessor, ItemWriter를 사용합니다.
- Spring Batch에서는 Chunk기반 Tasklet 실행을 위해
ChunkOrientedTasklet구현체를 제공합니다.
public Step chunkStep() {
return stepBuilderFactory.get("chunkStep")
.<String, String>chunk(2)
.reader(new ListItemReader<>(List.of("a", "b", "c", "d", "e", "f")))
.processor((ItemProcessor<String, String>) String::toUpperCase)
.writer(items -> items.forEach(System.out::println))
.build();
}
Chunk 작업의 Buffering - StepContribution
-
Chunk Process의 변경 사항을 버퍼링 한 뒤,
StepExecution상태를 업데이트하는 도메인 객체입니다. -
ItemReader,ItemWriter의readCount,writeCount와 같은 Chunk내 작업 정보들을 임시적으로 들고 있다가 Chunk Commit 직전에 update해주는 역할입니다. -
실제로 Chunk 작업 결과를 Buffering했다가 update해주는 코드는
StepExecution.applymethod를 호출하면서 발생합니다. 아래 로직을 확인해보면StepContribution의 상태값들을StepExecution에 누적시켜주는것을 확인할 수 있습니다.
class StepExecution {
//...
/**
* On successful execution just before a chunk commit, this method should be
* called. Synchronizes access to the {@link StepExecution} so that changes
* are atomic.
*
* @param contribution {@link StepContribution} instance used to update the StepExecution state.
*/
public synchronized void apply(StepContribution contribution) {
readSkipCount += contribution.getReadSkipCount();
writeSkipCount += contribution.getWriteSkipCount();
processSkipCount += contribution.getProcessSkipCount();
filterCount += contribution.getFilterCount();
readCount += contribution.getReadCount();
writeCount += contribution.getWriteCount();
exitStatus = exitStatus.and(contribution.getExitStatus());
}
//...
}
Task 기반 Tasklet의 특징
- Chunk 기반으로 작업을 n개로 쪼개서 처리하기보다, 단일 작업 기반이 더 효율적인 경우 사용됩니다.
- 주로 Tasklet 인터페이스 구현체를 만들어서 사용합니다.
public Step taskStep(){
return stepBuilderFactory.get("taskStep")
.tasklet(new CustomTasklet())
.listener(new StepExecutionListener() {
@Override
public void beforeStep(StepExecution stepExecution) {}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {return null;}
}) // 1. Step 실행전과 후에 수행해야할 콜백 로직 설정
.build();
}
- Chunk Buffering
Step API - startLimit / allowStartIfComplete
- Step 재시작과 관련된 API로 startLimit , allowStartIfComplete API가 존재합니다.
startLimit API
- step 실패시 step이 재수행될 수 있는데, 이떄 재수행횟수를 제한하는 API입니다.
- 예를 들어서
startLimit값을 3으로 주게 되면 3번까지만 step 재시작이 허용됩니다. 이후에는org.springframework.batch.core.StartLimitExceededException예외가 터집니다. - default값은
Integer.MAX_VALUE로 사실상 제한이 없습니다.
설정 예시
- 아래 테스트 케이스는 startLimit을 3으로 주었을떄입니다.
public Step step(){
return stepBuilderFactory.get("step")
.tasklet((contribution, chunkContext) -> {
System.out.println("step execute");
throw new RuntimeException();
})
.startLimit(3)
.build();
}
- tasklet 로직에서 의도적으로 RuntimeException을 던지고 Job을 여러번까지 재시작하면 3번까지는
BATCH_JOB_STEP_EXECUTION테이블에 저장됩니다.
- 이후에 step은 재시작하더라도 아래와 같은 예외가 던져지게 됩니다.
org.springframework.batch.core.StartLimitExceededException: Maximum start limit exceeded for step: stepStartMax: 3
- 주의해야할 부분은 해당 API와 설정과 무관하게 Job Execution은 계속해서 일어나게 됩니다. 즉
BATCH_JOB_EXECUTION테이블의 행은 누적됩니다. 즉 Step 재시작이 안되는 것 뿐 입니다. 아래 예시를 확인해보면 설정된startLimit값 (3) 이후로는StartLimitException이EXIT_MESSAGE로 기록되고 있습니다.
allowStartIfComplete API
- step 실패시 step이 재시작될 수 있는데, 이떄 기존에 완료된 step은 재시작할지 말지를 결정하는 api입니다.
- 기본값은 false로 완료된 step은 재시작을 수행하지 않습니다. 만약 true로 주게 된다면 이미
COMPLETE된 step이더라도 항상 재시작합니다.
설정 예시
- 아래 테스트 케이스는
Job을 첫번쨰 step은allowStartIfComplete값을true로 준 step이고, 두번째 step은 항상 실패하는 step입니다. 즉 두번쨰 step에서 항상 실패함으로 step1은allowStartIfComplete값이true라면 같이 재시작됩니다.
@Bean
public Job batchJob() {
return this.jobBuilderFactory.get("Job")
.start(restartAllowableStep())
.next(alwaysFailStep())
.build();
}
@Bean
public Step restartAllowableStep(){
return stepBuilderFactory.get("restartAllowableStep")
.tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED)
.allowStartIfComplete(true)
.build();
}
@Bean
public Step alwaysFailStep(){
return stepBuilderFactory.get("alwaysFailStep")
.tasklet((contribution, chunkContext) -> {
throw new RuntimeException();
})
.startLimit(3)
.build();
}
- 5번 Job을 재시작해보면
allowStartIfComplete을true로 준 첫번쨰 step은 이미 step이 완료되었음에도 불구하고 항상 재시작 되는 것을 확인할 수 있습니다. - 항상 실패하는 두번쨰 step은
startLimit값을 초과하게 되면 재시작되지 않고, 첫번쨰 step만 계속해서 재시작됩니다.
언제 allowStartIfComplete API를 사용할 수 있을까?
- 시점에 따라 데이터 값이 변경될 수 있어 데이터 유효성 점검을 수행하는 step의 경우에는 이전에 성공했던 step이더라도 재실행이 필요할 것입니다.
JobStep
- Job을 포함하고 있는 Step입니다. 즉, Step안에 또 다른 Job이 있습니다.
- JobStep안에 있는 Job이 실패하면, 해당 JobStep을 포함하고 있는 Job 역시 당연히 실패합니다.
- Job 실행 관련 메타 데이터는 JobStep안에 Job도 별도로 DB에 저장됩니다.
- 커다란 Job 하나를 작은 Job으로 쪼개서 관리하고자 할 때 사용될 수 있습니다.
설정 예시
- 먼저 가장 상위 Job을 설정하고, 하위에 step을 JobStep 으로 구성합니다.
- JobStep에 들어갈 Job이 사용할 parameter를 정의하기 위해서
parametersExtractor를 정의합니다.
@Bean
public Job parentJob(Step jobStep) {
return this.jobBuilderFactory.get("parentJob") // 1. 가장 상위의 Job입니다.
.start(jobStep)
.next(parentStep())
.build();
}
@Bean
public Step jobStep(JobLauncher jobLauncher) {
return stepBuilderFactory.get("jobStep") // 2. JobStep 입니다.
.job(childJob()) // 3. JobStep이 포함할 Job을 명시합니다.
.launcher(jobLauncher) // 4. 해당 Job이 사용할 JobLauncher를 명시합니다.
.parametersExtractor(jobParmetersExtractor()) // 5. 해당 Job에 넘겨줄 파라미터를 어떻게 가져올것인지 정의합니다.
.listener(new StepExecutionListener() { // 6. step 실행 전후로 실행할 콜백메소드를 정의합니다.
@Override
public void beforeStep(StepExecution stepExecution) {
stepExecution.getExecutionContext().putString("name", "chansoo");
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return null;
}
})
.build();
}
- Spring Batch에서는
parametersExtractor인터페이스의 구현체인DefaultJobParametersExtractor를 제공합니다. DefaultJobParametersExtractor는 부모 Job의 parameter와 Step의 parameter를 가져올 수 있습니다. 이떄 가져올 parameter의 key값을 설정해줄 수 있습니다.- 위 예시 코드에서는
StepExecutionListener에서 JobStep이 실행되기전에nameparameter를 설정해서 넘겨주고 있습니다. 이럴때는 아래와 같이 key를name으로 주면 설정된 value값을 들고 올 수 있습니다.
// stepExecutionContext에 있는 key를 찾아서 job에게 값을 넘겨줄 수 있습니다.
private JobParametersExtractor jobParmetersExtractor() {
DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();
extractor.setKeys(new String[]{"name"}); // 1. name key를 가진 parameter가 존재하는 경우 Job에 넘겨준다.
return extractor;
}
배치 작업이 부모 Job,JobStep의 Job모두 성공한 경우가 아니라 JobStep의 Job만 성공한 case에는 어떻게 될까요? 아래와 같이 각각 3개의 케이스로 나눠볼 수 있을것입니다.
- 부모 Job, JobStep의 Job 모두 성공된 경우
- 부모 Job 실행 중 JobStep의 Job이 실패된 경우
- 부모 Job 실행 중 JobStep의 Job은 성공했으나, 부모 Job의 다른 Step 실패된 경우
부모 Job, JobStep의 Job 모두 성공된 케이스
- 우선 성공/실패 케이스와 별개로
BATCH_JOB_INSTANCE테이블에는 부모 Job와JobStep의 Job 모두 데이터가 들어갑니다.
- JobStep의 Job또한 별개의 Job Instance로 관리되기 때문에
BATCH_JOB_EXECUTION테이블의 레코드도 2개가 생성됩니다.
부모 Job 실행 중 JobStep의 Job이 실패된 케이스
- JobStep의 Job이 실패되면 부모의 Job도 실패처리가 됩니다. 즉 모든 Job이 실패된 것으로 간주됩니다.
아래와 같이 JobStep의 job의 step에서 무조건 RuntimeException을 던져서 실패하도록 설정하였습니다.
@Bean
public Step childStep() {
return stepBuilderFactory.get("childStep")
.tasklet((contribution, chunkContext) -> {
throw new RuntimeException("child job failed");
})
.build();
}
- job 실행 후
BATCH_JOB_EXECUTION테이블을 확인해보면 부모 Job까지 모두 실패처리된것을 확인할 수 있고 부모 Job은org.springframework.batch.core.UnexpectedJobExecutionException: Step failure: the delegate Job failed in JobStep.예외가 던져집니다.
BATCH_STEP_EXECUTION테이블을 확인해보면 부모 Job의 다음 step은 실행처리가 안되고, JobStep의 job의 step까지만 처리하다가 실패된것으로 기록됩니다.
부모 Job 실행 중 JobStep의 Job은 성공했으나, 부모 Job의 다른 Step 실패된 경우
- JobStep의 Job은 별개의
JobInstance로 취급되기 때문에COMPLETED상태로 기록되고, 부모 Job만FAILED상태로 기록됩니다. 즉 JobStep의 Job은 성공처리됩니다.BATCH_JOB_EXECUTION테이블을 확인해보면 JobStep의 job은 성공처리된것을 확인할 수 있습니다.
Reference
[1] https://docs.spring.io/spring-batch/docs/current/reference/html/domain.html [2] https://docs.spring.io/spring-batch/docs/current/reference/html/step.html#configureStep
댓글
이 게시글에 대한 의견을 공유해주세요!
