본문 바로가기
Spring/Batch

Spring Batch(스프링배치) Chunk방식의 Repeat과 Skip(3) - 실패한 작업 제어

by 딘딘은딘딘 2024. 9. 30.
반응형

Spring Batch(스프링배치) Chunk방식의 Repeat과 Skip(1) - 실패한 작업 제어

Spring Batch(스프링배치) Chunk방식의 Repeat과 Skip(2) - 실패한 작업 제어

 

Spring Batch의 Skip에 이어 Retry에 대해 작성한다.

 

 

Retry

 

  • 특정 예외가 발생했을때 재시도를 수행할 수 있으며, 이를 이용해 네트워크 지연, 타임아웃과 같은 일시적인 문제를 해결하여 불필요한 실패를 방지할 수 있다.
  • 스프링 배치의 Retry는 Spring Retry 모듈과 통합되어 있어, 복잡한 재시도 로직도 유연하게 구성할 수 있다.
  • Retry의 프로세스는 ItemProcessor에서는 Skip과 동일하고,  ItemWriter에서는 Skip의 경우에는 한건씩 처리가 되었었지만,
    Retry는 다건 처리 형태가 유지되어 진행된다.
  • Retry의 경우 ItemProcessor와 ItemWriter에서 적용이 이루어진다.
  • 청크 방식에서 재시도 로직이 적용되면 RetryTemplate.execute() 안에서 실행이 된다.
    (재시도 적용되지 않는 경우 일반적인 방식으로 배치프레임워크 내에서 실행)
  • 재시도 후에도 실패하면 결국 배치 작업이 실패로 끝날 수 있으며, 이때 recover에서 후속처리가 가능하다.

 

테스트는 크게 두 가지 케이스로 진행을 할 예정이다.

쉽고 간단하게 사용할 수 있는 내결함성을 활성화 한 Retry

 

복잡하지만, Recover과 재시도 정책에 대해 더 자세히 설정이 가능한

RetryTemplate를 사용한 Retry를 테스트 한다.

 

 

먼저 내결함성을 사용하여 ItemWriter에서 Retry가 적용되는 것을 테스트 해본다.

 

CustomBackOffPolicy.java

@NoArgsConstructor
public class CustomBackOffPolicy extends FixedBackOffPolicy {
    /**
     * FixedBackOffPolicy 의 period는 생성자로 설정이 불가능 하기 때문에 상속을 받아 생성한다.
     * 생성자로 객체를 생성시 period를 파라미터로 받을 수 있으며 설정하고자 하는 시간을 설정 할 수 있다.
     * Ex) 60000 은 60초 = 1분
     * default 는 1초로 설정된다.
    */
    public CustomBackOffPolicy(long period) {
        this.setBackOffPeriod(period);
    }

}

 

가장 기본적인 재시도 주기 설정을 위해 FixedBackOffPolicy를 상속받은 CustomBackOffPolicy를 생성했다.

 

- 내결함성을 적용 후 특정예외 발생 시 3번 재시도하는지 확인하는 코드

@Configuration
@RequiredArgsConstructor
@Slf4j
public class TestRetryJobConfiguration {

  private final JobRepository jobRepository;

  private final PlatformTransactionManager batchTransactionManager;

  private int itemReaderCnt = 0;

  @Bean
  public Job testRetryJob() {
    return new JobBuilder("testRetryJob", jobRepository)
        .start(testRetryStep())
        .build();
  }

  @Bean
  public Step testRetryStep() {
    return new StepBuilder("testRetryStep", jobRepository)
        .<Map<String, Integer>, Map<String, Integer>>chunk(5, batchTransactionManager)
        .reader(testRetryItemReader())
        .processor(testRetryItemProcessor())
        .writer(testRetryItemWriter())
        .faultTolerant()  // 내결함성 활성화
        .retry(TestCusotmException.class) // 커스텀 예외 발생시 Retry 수행
        .retryLimit(3) // 최대 3번까지 재시도
        .backOffPolicy(new CustomBackOffPolicy(3000L)) // 3초 주기로 실행
        .build();
  }

  @Bean
  public ItemReader<Map<String, Integer>> testRetryItemReader() {
    return () -> {
      itemReaderCnt++;
      if(itemReaderCnt == 11) return null;

      Map<String, Integer> testMap = new HashMap<>();
      testMap.put("test" + itemReaderCnt, itemReaderCnt);
      log.info("itemReaderMap : {}", testMap);
      return testMap;
    };
  }

  @Bean
  public ItemProcessor<Map<String, Integer>, Map<String, Integer>> testRetryItemProcessor() {
    return item -> {
      log.info("ItemProcessor item : {}", item);
      return item;
    };
  }

  @Bean
  public ItemWriter<Map<String, Integer>> testRetryItemWriter() {
    return chunk -> {
      log.info("ItemWriter items : {}", chunk);
      chunk.forEach(item -> {
        log.info("ItemWriter : item : {}", item.toString());
        item.forEach((key, value) -> {
          if(value == 3 ) {
            throw new TestCusotmException();
          }
        });
      });
    };
  }
}

 

 

 

실행결과

3초 간격으로 2번 더 추가 실행되어 총 3번 실행 된 것을 확인할 수 있다.

특이점으로 ItemReader은 한번만 실행이 되었는데, Skip과 동일하게

ItemReader에서 Processor로 넘어온 하나의 청크의 아이템들은 JVM 메모리에 일시적으로 캐싱 되었기 때문에 한번만 실행된 것을 확인 할 수 있다.

 


- 커스텀 RetryTemplate을 사용한 더 자세한 Retry 사용해보기

먼저 기본 설정이 적용된 Retry가 아닌 커스터마이징한 Retry를 사용하기 위해서는

아래 파일들을 만들어 주어야 한다.

ItemWriter를 구현한 커스텀 클래스를 생성해준다.

 

BatchRetryableItemWriter.Java

@Slf4j
public class BatchRetryableItemWriter<T> implements ItemWriter<T> {

  private final ItemWriter<T> itemWriter;
  private final RetryTemplate retryTemplate;
  private final List<Class<? extends Throwable>> retryExceptionList;

  public BatchRetryableItemWriter(ItemWriter<T> itemWriter, RetryTemplate retryTemplate, List<Class<? extends Throwable>> retryExceptionList) {
    this.itemWriter = itemWriter; //
    this.retryTemplate = retryTemplate;
    this.retryExceptionList = retryExceptionList; // 재시도 예외 목록
  }

  @Override
  public void write(@NonNull Chunk<? extends T> chunk) throws Exception {
    retryTemplate.execute(
        context -> {
          try {
            itemWriter.write(chunk);
          } catch (Exception e) {
            if(retryExceptionList.contains(e.getClass())) {
              log.error("[BatchRetryableItemWriter] 재시도 대상 예외 발생! 재시도 횟수 : {}", context.getRetryCount() + 1);
            }
            throw e;
          }

          return null; /* 성공한 경우 Null Retry 없음 */
        },
        failContext -> { // Recover 영역
          // 최대 재시도인 경우 로깅
          if(isMaxRetry(failContext)) {
            log.error("최대 재시도 횟수 초과. 재시도 횟수: {}", failContext.getRetryCount());
          }

          return null;
        });
  }

  /**
   * 현재 예외가 최대 재시도 횟수를 초과 했는지 확인한다.
   * */
  private boolean isMaxRetry(RetryContext context) {
    Object isExhausted = context.getAttribute(RetryContext.EXHAUSTED);

    return isRetryThrowable(context) && (isExhausted != null && (boolean) isExhausted);
  }

  /**
   * 마지막으로 발생한 예외가 재시도 대상 예외인지 확인한다.
   * */
  private boolean isRetryThrowable(RetryContext context) {
    return retryExceptionList.contains(context.getLastThrowable().getClass());
  }
}

 

RetryTemplateProvider.Java

@Component
@RequiredArgsConstructor
@Slf4j
public class RetryTemplateProvider {

    public RetryTemplate createRetryTemplate(int maxAttempts, Long backOffPeriod
        , List<Class<? extends Throwable>> retryableExceptionList) {
        RetryTemplate retryTemplate = new RetryTemplate();

        // 특정 예외에 대해서만 재시도하도록 설정
        Map<Class<? extends Throwable>, Boolean> retryableExceptions = retryableExceptionList.stream()
            .collect(Collectors.toMap(
                e -> e,
                b -> true
            ));

        // Retry 정책 생성
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(maxAttempts, retryableExceptions);

        retryTemplate.setRetryPolicy(retryPolicy);

        // 재시작 주기 설정
        if(backOffPeriod != null) {
            CustomBackOffPolicy backOffPolicy = new CustomBackOffPolicy(backOffPeriod);
            retryTemplate.setBackOffPolicy(backOffPolicy);
        }

        return retryTemplate;
    }
}

 

TestRetryJobConfiguration.Java

@Configuration
@RequiredArgsConstructor
@Slf4j
public class TestRetryJobConfiguration {

  private final JobRepository jobRepository;

  private final PlatformTransactionManager batchTransactionManager;
  private final RetryTemplateProvider retryTemplateProvider;

  private int itemReaderCnt = 0;

  @Bean
  public Job testRetryJob() {
    return new JobBuilder("testRetryJob", jobRepository)
        .start(testRetryStep())
        .build();
  }

  @Bean
  public Step testRetryStep() {
    // 재시도 대상 예외 목록 설정
    List<Class<? extends Throwable>> retryableExceptionList = List.of(TestCusotmException.class, TestCusotmException2.class);
    RetryTemplate retryTemplate = retryTemplateProvider.createRetryTemplate(2, 3000L, retryableExceptionList);

    return new StepBuilder("testRetryStep", jobRepository)
        .<Map<String, Integer>, Map<String, Integer>>chunk(5, batchTransactionManager)
        .reader(testRetryItemReader())
        .processor(testRetryItemProcessor())
        .writer(new BatchRetryableItemWriter<>(testRetryItemWriter(), retryTemplate, retryableExceptionList))
        .build();
  }

  @Bean
  public ItemReader<Map<String, Integer>> testRetryItemReader() {
    return () -> {
      itemReaderCnt++;
      if(itemReaderCnt == 11) return null;

      Map<String, Integer> testMap = new HashMap<>();
      testMap.put("test" + itemReaderCnt, itemReaderCnt);
      log.info("itemReaderMap : {}", testMap);
      return testMap;
    };
  }

  @Bean
  public ItemProcessor<Map<String, Integer>, Map<String, Integer>> testRetryItemProcessor() {
    return item -> {
      log.info("ItemProcessor item : {}", item);
      return item;
    };
  }

  @Bean
  public ItemWriter<Map<String, Integer>> testRetryItemWriter() {
    return chunk -> {
      log.info("ItemWriter items : {}", chunk);

      chunk.forEach(item -> {
        log.info("ItemWriter : item : {}", item.toString());
        item.forEach((key, value) -> {
          if(value == 3 ) {
            throw new TestCusotmException();
          }
        });
      });
    };
  }
}

 

 

실행결과

ItemWriter를 구현한 BatchRetryableItemWriter에서 retryTemplate.execute()에

첫번째 파라미터로 itemWriter를 실행시키고

두번째 파라미터로 recover 콜백을 구현헀다.

 

테스트를 진행 해보니 3번째 item에서 예외가 발생했고,

3초 주기로 2번 재시도 후 트랜잭션이 종료되는 것을 확인 할 수 있다.

 

내결함성을 적용하면 간편하게 retry를 사용할 수 있지만,

직접 커스텀해서 구현을 하면 재시도를 했음에도 불구하고 실패한 경우에 대한 예외 처리가 가능하다.

반응형