Spring Batch Step 파헤쳐보기

2024-01-02 15:43:28

#spring#Batch

Step 개념

  • Batch Job 을 구성하는 독립적인 단계로 하나의 Job은 하나 이상의 Step으로 구성됩니다.
  • Spring Batch Job 구현체중 하나인 SimpleJob을 까보면 Job 은 내부 멤버변수로 Step List를 들고 있음을 확인할 수 있습니다.
public class SimpleJob extends AbstractJob {
    private List<Step> steps = new ArrayList<>();
    //...
}

StepExecution 이란?

  • Step에 대한 한번의 시도를 의미하는 객체로서 Step 실행 중에 발생한 정보들을 저장하고 있는 객체입니다.
  • DB의 BATCH_STEP_EXECUTION 테이블과 1:1로 매핑됩니다.
  • Job이 실패해서 재수행하는 경우에는 실패한 Step에 대해서만 재시작하고, 성공한 Step은 생략합니다. ( 하지만 allowStartIfComplete 설정값을 변경하면 성공한 Step도 재시작되게 변경할 수 있습니다 )

예를 들어 Step 1,2,3이 존재하는데 Step2번이 실행중에 실패한다면 Step 1은 성공 , Step 2는 실패처리 됩니다. 그리고 Step 3는 실행되지 않습니다. 이 상태에서 Job을 재시작하면 Step2부터 재시작하여 Step2,Step3 가 수행됩니다.

  • Job을 구성하는 모든 Step의 실행 정보인 StepExecution이 완료 처리되어야만 JobExecution이 완료처리됩니다.

즉 Spring Batch 도메인 용어를 정리하면 하나의 Job은 여러개의 Step으로 구성되고, Job이 JobParameter를 주입받아 실행되는 객체가 JobInstance객체입니다.

JobInstance를 실행한 정보가 JobExecution이고, Job이 실행되면서 Job을 구성하는 Step들의 실행정보가 StepExecution입니다.

Step간 데이터 공유하기 - ExecutionContext

  • ExecutionContext를 활용하면 Job내 Step간 데이터를 공유할 수 있습니다. 혹은 실패한 Step에서 Step 재시작시 실패 이전까지 작업했던 상태값들을 가져올 수 있습니다.
  • ExecutionContext는 Spring Batch에서 관리하는 key-value (Map) 컬렉션입니다.
  • StepExecution, JobExecution 객체의 멤버변수로 선언되고 , 각각 DB의 BATCH_JOB_EXECUTION_CONTEXT , BATCH_STEP_EXECUTION_CONTEXT 테이블에 1:1 매핑됩니다.
  • StepExecutionExecutionContext는 Step안에서만 공유됩니다. 즉 특정 Step에서만 접근이 가능합니다. 실패한 Step이 재시작된 경우도 이전까지 작업한 내용을 불러들일수 있습니다.
  • JobExeuctionExecutionContext는 모든 Step안에서 공유됩니다.

예시 코드

아래와 같이 ExecutionContextStepContribution 또는 ChunkContext를 통해 접근하고, 값을 넣어줄 수 있습니다.
넣어준 값은 BATCH_JOB_EXECUTION_CONTEXT , BATCH_STEP_EXECUTION_CONTEXT 테이블에 각각 직렬화되어 저장됩니다.

public class ExecutionContextTasklet1 implements Tasklet {

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {

        ExecutionContext jobExecutionContext = contribution.getStepExecution().getJobExecution().getExecutionContext(); 
        ExecutionContext stepExecutionContext = contribution.getStepExecution().getExecutionContext();

        jobExecutionContext.put("jobName", "developer"); //1. BATCH_JOB_EXECUTION_CONTEXT 에 저장 (모든 STEP에서 공유)
        stepExecutionContext.put("stepName", "software"); // 2. BATCH_STEP_EXECUTION_CONTEXT 에 저장 (특정 STEP에서 공유)

        return RepeatStatus.FINISHED;
    }
}

Spring Batch에서 제공하는 Step 구현체(5)

  • Step 인터페이스를 AbstractStep이라는 추상 클래스에서 구현하고,AbstractStep 추상클래스를 구현하는 구조입니다.
  • Batch에서 제공하는 Step의 구현체는 아래와 같은 5개의 구현체가 존재합니다.
  1. Tasklet Step
  2. Partition Step
  3. Job Step
  4. Flow Step
  5. Decision Step

TaskletStep

  • RepeatTemplate을 사용해서 Tasklet 코드 block을 트랜잭션 경계 내(성공시 커밋,실패시 롤백)에서 반복해서 실행합니다. 언제까지 반복해서 실행할것인가에 대한 판단은 Tasklet 객체에서 반환하는 RepeatStatus값에 의해 결정됩니다. RepeatStatus.FINISHED 와 같이 특정 RepeatStatus를 반환할떄까지 계속해서 실행합니다.
  • TaskletStep은 아래와 같은 tasklet 인터페이스를 구현하는 tasklet 구현체를 멤버변수로 가지고 있습니다.
/**
 * Strategy for processing in a step.
 */
public interface Tasklet {

	/**
	 * @return an {@link RepeatStatus} indicating whether processing is
	 * continuable. Returning {@code null} is interpreted as {@link RepeatStatus#FINISHED}
	 *
	 * @throws Exception thrown if error occurs during execution.
	 */
	@Nullable
	RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception;

}
  • TaskletStep의 코드를 확인해보면 Tasklet 호출횟수를 조정하기 위한 RepeatTemplate 와 실행되야할 작업 그 자체 (tasklet) 구현체를 멤버변수로 들고있는것을 확인할 수 있습니다.
public class TaskletStep extends AbstractStep {

    private RepeatOperations stepOperations = new RepeatTemplate();
    //...
    private Tasklet tasklet;
    //...
}
  • 자주 사용되는 tasklet 구현체는 Spring Batch에서 이미 구현해놓았습니다. 이 중 ChunkOrientedTasklet구현체을 활용해 Chunk 단위로 배치 작업을 쪼개서 처리할 수 있습니다.

Chunk 기반과 Task 기반

  • Spring Batch에서 Tasklet Step의 실행 단위는 크게 Chunk기반과 Task기반으로 2가지입니다.

Chunk 기반 Tasklet의 특징

  • 하나의 큰 덩어리를 n개씩 쪼개서 실행한다는 의미로 데이터 대용량 처리용으로 설계되었습니다.
  • ItemReader, ItemProcessor, ItemWriter를 사용합니다.
  • Spring Batch에서는 Chunk기반 Tasklet 실행을 위해 ChunkOrientedTasklet 구현체를 제공합니다.
public Step chunkStep() {
    return stepBuilderFactory.get("chunkStep")
                             .<String, String>chunk(2)
                             .reader(new ListItemReader<>(List.of("a", "b", "c", "d", "e", "f")))
                             .processor((ItemProcessor<String, String>) String::toUpperCase)
                             .writer(items -> items.forEach(System.out::println))
                             .build();
}

Chunk 작업의 Buffering - StepContribution

  • Chunk Process의 변경 사항을 버퍼링 한 뒤, StepExecution 상태를 업데이트하는 도메인 객체입니다.

  • ItemReader, ItemWriterreadCount , writeCount와 같은 Chunk내 작업 정보들을 임시적으로 들고 있다가 Chunk Commit 직전에 update해주는 역할입니다.

  • 실제로 Chunk 작업 결과를 Buffering했다가 update해주는 코드는 StepExecution.apply method를 호출하면서 발생합니다. 아래 로직을 확인해보면 StepContribution의 상태값들을 StepExecution 에 누적시켜주는것을 확인할 수 있습니다.

class StepExecution {
    //...
    /**
     * On successful execution just before a chunk commit, this method should be
     * called. Synchronizes access to the {@link StepExecution} so that changes
     * are atomic.
     *
     * @param contribution {@link StepContribution} instance used to update the StepExecution state.
     */
    public synchronized void apply(StepContribution contribution) {
        readSkipCount += contribution.getReadSkipCount();
        writeSkipCount += contribution.getWriteSkipCount();
        processSkipCount += contribution.getProcessSkipCount();
        filterCount += contribution.getFilterCount();
        readCount += contribution.getReadCount();
        writeCount += contribution.getWriteCount();
        exitStatus = exitStatus.and(contribution.getExitStatus());
    }
    //...
}

Task 기반 Tasklet의 특징

  • Chunk 기반으로 작업을 n개로 쪼개서 처리하기보다, 단일 작업 기반이 더 효율적인 경우 사용됩니다.
  • 주로 Tasklet 인터페이스 구현체를 만들어서 사용합니다.
public Step taskStep(){
    return stepBuilderFactory.get("taskStep")
            .tasklet(new CustomTasklet())
            .listener(new StepExecutionListener() {
                @Override
                public void beforeStep(StepExecution stepExecution) {}

                @Override
                public ExitStatus afterStep(StepExecution stepExecution) {return null;}
            })  // 1. Step 실행전과 후에 수행해야할 콜백 로직 설정
            .build();
}      
  • Chunk Buffering

Step API - startLimit / allowStartIfComplete

  • Step 재시작과 관련된 API로 startLimit , allowStartIfComplete API가 존재합니다.

startLimit API

  • step 실패시 step이 재수행될 수 있는데, 이떄 재수행횟수를 제한하는 API입니다.
  • 예를 들어서 startLimit 값을 3으로 주게 되면 3번까지만 step 재시작이 허용됩니다. 이후에는 org.springframework.batch.core.StartLimitExceededException 예외가 터집니다.
  • default값은 Integer.MAX_VALUE로 사실상 제한이 없습니다.

설정 예시

  • 아래 테스트 케이스는 startLimit을 3으로 주었을떄입니다.
public Step step(){
    return stepBuilderFactory.get("step")
            .tasklet((contribution, chunkContext) -> {
                System.out.println("step execute");
                throw new RuntimeException();
            })
            .startLimit(3)
            .build();
}
  • tasklet 로직에서 의도적으로 RuntimeException을 던지고 Job을 여러번까지 재시작하면 3번까지는 BATCH_JOB_STEP_EXECUTION테이블에 저장됩니다.
  • 이후에 step은 재시작하더라도 아래와 같은 예외가 던져지게 됩니다.
org.springframework.batch.core.StartLimitExceededException: Maximum start limit exceeded for step: stepStartMax: 3
  • 주의해야할 부분은 해당 API와 설정과 무관하게 Job Execution은 계속해서 일어나게 됩니다. 즉 BATCH_JOB_EXECUTION 테이블의 행은 누적됩니다. 즉 Step 재시작이 안되는 것 뿐 입니다. 아래 예시를 확인해보면 설정된 startLimit값 (3) 이후로는 StartLimitExceptionEXIT_MESSAGE로 기록되고 있습니다.

allowStartIfComplete API

  • step 실패시 step이 재시작될 수 있는데, 이떄 기존에 완료된 step은 재시작할지 말지를 결정하는 api입니다.
  • 기본값은 false로 완료된 step은 재시작을 수행하지 않습니다. 만약 true로 주게 된다면 이미 COMPLETE된 step이더라도 항상 재시작합니다.

설정 예시

  • 아래 테스트 케이스는 Job을 첫번쨰 step은 allowStartIfComplete값을 true로 준 step이고, 두번째 step은 항상 실패하는 step입니다. 즉 두번쨰 step에서 항상 실패함으로 step1은 allowStartIfComplete값이 true라면 같이 재시작됩니다.
@Bean
public Job batchJob() {
        return this.jobBuilderFactory.get("Job")
        .start(restartAllowableStep())
        .next(alwaysFailStep())
        .build();
}
@Bean
public Step restartAllowableStep(){
        return stepBuilderFactory.get("restartAllowableStep")
        .tasklet((contribution, chunkContext) -> RepeatStatus.FINISHED)
        .allowStartIfComplete(true)
        .build();
}
@Bean
public Step alwaysFailStep(){
        return stepBuilderFactory.get("alwaysFailStep")
        .tasklet((contribution, chunkContext) -> {
        throw new RuntimeException();
        })
        .startLimit(3)
        .build();
}
  • 5번 Job을 재시작해보면 allowStartIfCompletetrue로 준 첫번쨰 step은 이미 step이 완료되었음에도 불구하고 항상 재시작 되는 것을 확인할 수 있습니다.
  • 항상 실패하는 두번쨰 step은 startLimit 값을 초과하게 되면 재시작되지 않고, 첫번쨰 step만 계속해서 재시작됩니다.

언제 allowStartIfComplete API를 사용할 수 있을까?

  • 시점에 따라 데이터 값이 변경될 수 있어 데이터 유효성 점검을 수행하는 step의 경우에는 이전에 성공했던 step이더라도 재실행이 필요할 것입니다.

JobStep

  • Job을 포함하고 있는 Step입니다. 즉, Step안에 또 다른 Job이 있습니다.
  • JobStep안에 있는 Job이 실패하면, 해당 JobStep을 포함하고 있는 Job 역시 당연히 실패합니다.
  • Job 실행 관련 메타 데이터는 JobStep안에 Job도 별도로 DB에 저장됩니다.
  • 커다란 Job 하나를 작은 Job으로 쪼개서 관리하고자 할 때 사용될 수 있습니다.

설정 예시

  • 먼저 가장 상위 Job을 설정하고, 하위에 step을 JobStep 으로 구성합니다.
  • JobStep에 들어갈 Job이 사용할 parameter를 정의하기 위해서 parametersExtractor를 정의합니다.
@Bean
public Job parentJob(Step jobStep) {
    return this.jobBuilderFactory.get("parentJob") // 1. 가장 상위의 Job입니다.
                                 .start(jobStep)
                                 .next(parentStep())
                                 .build();
}

@Bean
public Step jobStep(JobLauncher jobLauncher) { 
        return stepBuilderFactory.get("jobStep") // 2. JobStep 입니다. 
        .job(childJob())   // 3. JobStep이 포함할 Job을 명시합니다.
        .launcher(jobLauncher) // 4. 해당 Job이 사용할 JobLauncher를 명시합니다.
        .parametersExtractor(jobParmetersExtractor()) // 5. 해당 Job에 넘겨줄 파라미터를 어떻게 가져올것인지 정의합니다.
        .listener(new StepExecutionListener() { // 6. step 실행 전후로 실행할 콜백메소드를 정의합니다.
            @Override
            public void beforeStep(StepExecution stepExecution) {
                    stepExecution.getExecutionContext().putString("name", "chansoo");
            }
            
            @Override
            public ExitStatus afterStep(StepExecution stepExecution) {
                    return null;
            }
        })
        .build();
}
  • Spring Batch에서는 parametersExtractor 인터페이스의 구현체인 DefaultJobParametersExtractor를 제공합니다.
  • DefaultJobParametersExtractor는 부모 Job의 parameter와 Step의 parameter를 가져올 수 있습니다. 이떄 가져올 parameter의 key값을 설정해줄 수 있습니다.
  • 위 예시 코드에서는 StepExecutionListener에서 JobStep이 실행되기전에 name parameter를 설정해서 넘겨주고 있습니다. 이럴때는 아래와 같이 key를 name으로 주면 설정된 value값을 들고 올 수 있습니다.
// stepExecutionContext에 있는 key를 찾아서 job에게 값을 넘겨줄 수 있습니다.
private JobParametersExtractor jobParmetersExtractor() {
        DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();
        extractor.setKeys(new String[]{"name"}); // 1. name key를 가진 parameter가 존재하는 경우 Job에 넘겨준다.
        return extractor;
}

배치 작업이 부모 Job,JobStep의 Job모두 성공한 경우가 아니라 JobStep의 Job만 성공한 case에는 어떻게 될까요? 아래와 같이 각각 3개의 케이스로 나눠볼 수 있을것입니다.

  1. 부모 Job, JobStep의 Job 모두 성공된 경우
  2. 부모 Job 실행 중 JobStep의 Job이 실패된 경우
  3. 부모 Job 실행 중 JobStep의 Job은 성공했으나, 부모 Job의 다른 Step 실패된 경우

부모 Job, JobStep의 Job 모두 성공된 케이스

  • 우선 성공/실패 케이스와 별개로 BATCH_JOB_INSTANCE 테이블에는 부모 Job와 JobStep의 Job 모두 데이터가 들어갑니다.
  • JobStep의 Job또한 별개의 Job Instance로 관리되기 때문에 BATCH_JOB_EXECUTION 테이블의 레코드도 2개가 생성됩니다.

부모 Job 실행 중 JobStep의 Job이 실패된 케이스

  • JobStep의 Job이 실패되면 부모의 Job도 실패처리가 됩니다. 즉 모든 Job이 실패된 것으로 간주됩니다.

아래와 같이 JobStep의 job의 step에서 무조건 RuntimeException을 던져서 실패하도록 설정하였습니다.

@Bean
public Step childStep() {
    return stepBuilderFactory.get("childStep")
                             .tasklet((contribution, chunkContext) -> {
                                 throw new RuntimeException("child job failed");
                             })
                             .build();
}
  • job 실행 후 BATCH_JOB_EXECUTION테이블을 확인해보면 부모 Job까지 모두 실패처리된것을 확인할 수 있고 부모 Job은 org.springframework.batch.core.UnexpectedJobExecutionException: Step failure: the delegate Job failed in JobStep. 예외가 던져집니다.
  • BATCH_STEP_EXECUTION테이블을 확인해보면 부모 Job의 다음 step은 실행처리가 안되고, JobStep의 job의 step까지만 처리하다가 실패된것으로 기록됩니다.

부모 Job 실행 중 JobStep의 Job은 성공했으나, 부모 Job의 다른 Step 실패된 경우

  • JobStep의 Job은 별개의 JobInstance로 취급되기 때문에 COMPLETED상태로 기록되고, 부모 Job만 FAILED상태로 기록됩니다. 즉 JobStep의 Job은 성공처리됩니다. BATCH_JOB_EXECUTION 테이블을 확인해보면 JobStep의 job은 성공처리된것을 확인할 수 있습니다.

Reference

[1] https://docs.spring.io/spring-batch/docs/current/reference/html/domain.html [2] https://docs.spring.io/spring-batch/docs/current/reference/html/step.html#configureStep

프로필 이미지
@chani
바둑 좋아하는 개발자의 의미있는 학습 기록을 위한 공간입니다.

댓글

이 게시글에 대한 의견을 공유해주세요!

댓글