본문 바로가기
Spring/Batch

Spring Batch(스프링배치) Chunk방식의 Repeat과 Skip(2) - 실패한 작업 제어

by 딘딘은딘딘 2024. 9. 26.
반응형

Spring Batch(스프링배치) Chunk방식의 Repeat과 Skip(1) - 실패한 작업 제어

 

Spring Batch(스프링배치) Chunk방식의 Repeat과 Skip(1) - 실패한 작업 제어

스프링 배치의 Step을 실행하는 도중특정 요인 ( Ex) 일시적 네트워크 지연, 처리 도중 상태 변경 등 ) 에 따라 작업이 실패하는 경우가 발생한다.스프링 배치는 이를 해결하기 위해 Spring의 Retry와 S

anianidindin.tistory.com

 

지난 글에서는 재시도 로직의 흐름을 이해하기 위해 Repeat과 FaultTolerant 그리고 Stpe의 흐름에 대한 내용을 작성 했었다.
이번 글은 스프링 배치 청크 방식에서 실패한 작업에 대한 Skip에 대한 글을 이어서 작성하고

Retry에 대한 글은 3번째 글에서 작성하도록 한다.

 

 

Skip

 

  • 배치 실행 중 특정 예외가 발생하면 해당 레코드(진행중인 1개의 청크 단위)를 실패로 간주하지 않고,
    기록을 남기고 다음 단계로 넘어가도록 하는 기능
  • Skip은 ItemReader, ItemProcessor, ItemWriter 각각의 프로세스에서 다르게 작동한다.
  • 데이터의 일관성을 유지하면서 사소한 예외로 인한 전체 처리를 멈추지 않기 위해 사용 된다.
  • 그러나 스킵 된 데이터에 대한 후처리(로깅, 재시도 등)가 없으면 데이터 신뢰성이 떨어질 수 있다

 

ItemReader
@Configuration
@RequiredArgsConstructor
@Slf4j
public class TestJobConfiguration {

  private final JobRepository jobRepository;

  private final PlatformTransactionManager batchTransactionManager;

  private int itemReaderCnt = 0;

  @Bean
  public Job testJob() {
    return new JobBuilder("testJob", jobRepository)
        .start(testStep())
        .build();
  }

  @Bean
  public Step testStep() {
    return new StepBuilder("testStep", jobRepository)
        .<Map<String, String>, Map<String, String>>chunk(5, batchTransactionManager)
        .reader(testItemReader())
        .writer(testItemWriter())
        .faultTolerant()  // 내결함성 활성화
        .skip(TestCusotmException.class) // 커스텀 예외 발생시 Skip 발생
        .skipLimit(3) // 최대 3번까지 스킵 허용
        .build();
  }

  @Bean
  public ItemReader<Map<String, String>> testItemReader() {
    return () -> {
      itemReaderCnt++;
      if(itemReaderCnt == 3) {
        throw new TestCusotmException();
      }
      if(itemReaderCnt == 11) return null;

      Map<String, String> testMap = new HashMap<>();
      testMap.put("test" + itemReaderCnt, "테스트" + itemReaderCnt);
      log.info("itemReaderMap : {}", testMap);
      return testMap;
    };
  }

  @Bean
  public ItemWriter<Map<String, String>> testItemWriter() {
    return chunk -> {
      log.info("Chunk size: {}", chunk.size());
      chunk.forEach(item -> log.info("Processing item: {}", item));
    };
  }
}

 

StepBuilder에서

위와 같이 내결함성을 활성화 및 skip 예외를 설정해주고

ItemReader에서 청크사이즈를 5로, count가 3인 경우 TestCustomException을 던지도록 설정을 했다

 

 

실행 결과 test3 이 스킵되고 나머지 데이터가 Processor로 넘어간 것을 확인할 수 있다.

 

* ItemReader에서 Skip 사용 예시)
Reader에서 외부 인터페이스 호출해 데이터 Read, 파일(Excel,txt,CSV 등) 을 읽은 후 대량 처리 할때 사용

 

 

ItemProcessor
@Bean
public Step testStep() {
  return new StepBuilder("testStep", jobRepository)
      .<Map<String, String>, Map<String, String>>chunk(5, batchTransactionManager)
      .reader(testItemReader())
      .processor(testItemProcessor())
      .writer(testItemWriter())
      .faultTolerant()  // 내결함성 활성화
      .skip(TestCusotmException.class) // 커스텀 예외 발생시 Skip 발생
      .skipLimit(3) // 최대 3번까지 스킵 허용
      .build();
}

@Bean
public ItemReader<Map<String, String>> testItemReader() {
  return () -> {
    itemReaderCnt++;
    if(itemReaderCnt == 11) return null;

    Map<String, String> testMap = new HashMap<>();
    testMap.put("test" + itemReaderCnt, "테스트" + itemReaderCnt);
    log.info("itemReaderMap : {}", testMap);
    return testMap;
  };
}

@Bean
public ItemProcessor<Map<String, String>, Map<String, String>> testItemProcessor() {
  return item -> {
    log.info("ItemProcessorMap : {}", item);

    String testStr = item.get("test3");

    if(testStr != null && testStr.equals("테스트3")) {
      throw new TestCusotmException();
    }

    return item;
  };
}

@Bean
public ItemWriter<Map<String, String>> testItemWriter() {
  return chunk -> {
    log.info("ItemWriter Start");
    chunk.forEach(item -> log.info("ItemWriter item: {}", item));
  };
}

 

 

실행결과

  • ItemReader에서 5건을 조회 후 Processor 처리 도중 3번째에서 실패가 발생하면
    트랜잭션을 롤백한 후 해당 청크의 처음으로 되돌아 간 후 첫번째 Item 부터 예외가 발생한 Item을 제외하고 처리한다.
  • ItemReader에서 Processor로 넘어온 하나의 청크의 아이템들은 JVM 메모리에 일시적으로 캐싱된다.
    즉, 청크가 완전히 처리될 때까지 아이템들이 메모리에서 유지되기 때문에 실패 후 처음으로 돌아가도 다시 실행이 가능하다.
    Writer에서 처리 후 청크가 종료되면 (트랜잭션이 커밋되는 시점) 해당 데이터도 삭제된다.

 

ItemWriter

 

ItemWriter에서는 현재 청크에서 특정 예외가 발생 했을때 해당 Item을 스킵하는지와

스킵된 Item만 롤백이 되고 나머지는 정상 Commit이 되는지, 그리고 ItemWriter에서 Skip이 발생하는 흐름을 테스트 했다.

 

엔티티와 데이터는 다음과 같이 준비되었다.

mem_id가 3일 때 save 메서드 호출 이후 예외가 발생하도록 설정할 예정이며

정상적인 흐름이라면 홍길동3의 경우만 status가 ACTIVE 변경되지 않고 나머지는 WITHDRAW 로 변경이 되어야 한다.

 

Member

@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
@AllArgsConstructor
@Builder
@Entity
@EntityListeners({AuditingEntityListener.class})
@Table(name = "member")
public class Member {
  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  @Column(name = "mem_id")
  private Long id;

  @Column(name = "mem_nm")
  private String memNm;

  @Column(name = "status")
  private String status;
  
  public void applyMemberStatus(String status) {
    this.status = status;
  }
}

 

Data

 

TestJobConfiguration

@Configuration
@RequiredArgsConstructor
@Slf4j
public class TestJobConfiguration {

  private final JobRepository jobRepository;

  private final PlatformTransactionManager batchTransactionManager;

  private final TestService testService;

  private int itemReaderCnt = 0;

  @Bean
  public Job testJob() {
    return new JobBuilder("testJob", jobRepository)
        .start(testStep())
        .build();
  }

  @Bean
  public Step testStep() {
    return new StepBuilder("testStep", jobRepository)
        .<Map<String, Integer>, Map<String, Integer>>chunk(5, batchTransactionManager)
        .reader(testItemReader())
        .processor(testItemProcessor())
        .writer(testItemWriter())
        .faultTolerant()  // 내결함성 활성화
        .skip(TestCusotmException.class) // 커스텀 예외 발생시 Skip 발생
        .skipLimit(3) // 최대 3번까지 스킵 허용
        .build();
  }

  @Bean
  public ItemReader<Map<String, Integer>> testItemReader() {
    return () -> {
      itemReaderCnt++;
      if(itemReaderCnt == 11) return null;

      Map<String, Integer> testMap = new HashMap<>();
      testMap.put("test" + itemReaderCnt, itemReaderCnt);
      log.info("itemReaderMap : {}", testMap);
      return testMap;
    };
  }

  @Bean
  public ItemProcessor<Map<String, Integer>, Map<String, Integer>> testItemProcessor() {
    return item -> {
      log.info("ItemProcessor item : {}", item);
      return item;
    };
  }

  @Bean
  public ItemWriter<Map<String, Integer>> testItemWriter() {
    return chunk -> {
      log.info("ItemWriter items : {}", chunk);
      chunk.getItems().forEach(testService::memberUpdate);
    };
  }
}

 

TestService

@Service
@Slf4j
@RequiredArgsConstructor
public class TestService {
  private final MemberJPARepository memberJPARepository;

  public void memberUpdate(Map<String, Integer> map) {
    try {
      map.forEach((key, value) -> {
        Long id = Long.parseLong(String.valueOf(value));

        Member member = memberJPARepository.findById(id)
            .orElseThrow(RuntimeException::new);

        String status = "WITHDRAW";

        member.applyMemberStatus(status);

        memberJPARepository.save(member);
        
        log.info("Member >>>> memId : {} status : {}", member.getId(), member.getStatus());
        if(id.equals(3L)) {
          throw new TestCusotmException();
        }
      });
    } catch (TestCusotmException e) {
      log.error("커스텀 에러 발생", e);
      throw e;
    } catch (Exception e) {
      log.error("기타에러", e);
      throw e;
    }
  }
}

 

 

실행결과

 

데이터는 정상적으로 변경이 되었고,

 

로그는 아래와 같이 기록 되었다.

 

 

로그가 발생한 순서대로 나열을 하고 확인을 해보니

 

  • 최초에는 트랜잭션이 Open 되고 item1~5까지 Processor에서 처리한 Item의 목록이 들어오게 된다.
    이때 3번째 item을 처리할떄 커스텀 예외가 발생하게 되면 현재 청크의 처음 Item으로 돌아가게 된다.
  • ItemWriter에서 Skip이 발생한 후 현재 청크의 처음 Item으로 돌아오게 되면 ItemProcessor에서 1건씩 Writer로 넘어오게 된다.
  • ItemProcessor에서는 현재 청크가 끝날때까지 1건씩 트랜잭션을 새로 Open 하고 Writer 한다.
  • ItemWriter에서 3번째 item을 다시 실행할 때 커스텀 예외가 발생하면 트랜잭션이 롤백 되고 다음 item을 수행한다.
  • 현재 청크가 종료되고 다음 청크가 수행되면 배치는 이전과 동일하게 작동
반응형