본문 바로가기
Spring/Batch

Spring Batch(스프링배치) 5.1.x 버전 시작하기

by 딘딘은딘딘 2024. 9. 23.
반응형

 

스프링 배치에 대한 개인적인 테스트 및 궁금했던 부분을 실험 하고자 스프링 배치 프로젝트를 생성하기로 했다.

 

Spring Batch 5.1의 주요 테마는 JDK21 LTS를 수용하는 부분이라고 공식 문서에 작성 되어 있기 때문에

추후 더 많은 테스트와 확장성을 고려해 JAVA 버전은 21 버전을 사용하기로 선택 했다.
(최소 JAVA 17 이상은 사용 해야 한다)

 

먼저 배치를 생성하기 전에 배치를 사용하기 위해 필요한 Job, Steb, 메타테이블에 대한 개념만 간단하게

정리 하고 프로젝트를 구성하도록 한다.

 

Spring Batch는 JVM에서 대규모 데이터를 효율적으로 처리하기 위한 배치 처리 표준.

주로 데이터를 청크 단위로 읽고, 처리하고, 쓰는 청크 기반 처리

분할 처리(partitioning) 같은 일반적인 배치 처리 패턴을 지원하여 성능 높은 배치 애플리케이션을 구축하는 데 사용된다.

* Job?

  • Job은 배치 작업의 단위이며, 배치 프로세스를 정의하는 가장 큰 단위.
  • 하나의 Job은 여러 개의 Step으로 구성된다.
  • 배치를 실행할 때 스프링 배치는 Job을 트리거한다.

 

* Step?

 

  • Step은 Job 안에서 수행되는 하나의 독립적인 처리 단위이다
  • 각 Step은 데이터 읽기(Reader), 처리(Processor), 쓰기(Writer)의 순서로 작업을 수행된다.
  • Step에는 트랜잭션 처리를 위한 경계가 있으며, 각각의 Step은 커밋 단위를 정의합니다.

 

 

* Spring Batch MetaTable (메타테이블)

  • 스프링 배치 메타테이블이란 배치 작업(Job)과 관련된 실행 상태, 이력, 실패, 재시도 등의 정보를 관리하기 위해 데이터베이스에 저장되는 테이블들을 의미한다.
  • 스프링 배치는 이 메타데이터를 통해 배치 작업이 정상적으로 완료되었는지, 중간에 실패했는지, 또는 중단된 작업을 재시작해야 하는지 등을 관리 한다.
  • 스프링 배치에서는 배치 작업의 실행과 관련된 중요한 정보를 관리하기 위해 기본적으로 아래에 설명할 6개의 메타 테이블을 생성한다. 이러한 테이블들은 배치 작업의 실행 상태를 기록하고 추적할 수 있도록 도와주며, 이를 통해 작업을 모니터링하고 오류 발생 시 재시작하는데 유용하게 사용할 수 있다.

 

메타 테이블은 아래 6개의 테이블을 스프링 배치를 실행할때 설정에 따라 자동으로 생성해주며

공식 홈페이지에는 ERD 까지 제공한다.

 

 

6개 테이블들에 대한 각각의 설명은 다음과 같다.

BATCH_JOB_EXECUTION 배치 작업 실행에 대한 정보를 저장 (각 배치 작업의 실행에 대한 기본 정보와 상태를 포함)
BATCH_JOB_EXECUTION_CONTEXT 배치 작업 실행에 필요한 컨텍스트 정보를 저장 (예로 JobExecutionContext의 속성들이 여기에 저장)
BATCH_JOB_EXECUTION_PARAMS 배치 작업 실행에 전달된 매개변수 정보를 저장
BATCH_JOB_INSTANCE 배치 작업의 인스턴스에 대한 정보를 저장 (여러 번 실행될 수 있는 동일한 작업의 각 인스턴스를 식별)
BATCH_STEP_EXECUTION 배치 스텝의 실행에 대한 정보를 저장합니다. 각 스텝 실행에 대한 상태와 통계를 추적합니다.
BATCH_STEP_EXECUTION_CONTEXT 배치 스텝 실행에 필요한 컨텍스트 정보를 저장합니다.

 

스프링 배치의 메타테이블을 자동생성 하기 위해선 yml 파일에 다음과 같이 설정한다.

 

spring:
  batch:
    jdbc:
      initialize-schema: always # always: 배치 메타테이블 항상 생성 / never: 생성하지 않음 / embedded: H2 DB 인 경우 생성

 

BatchProperties의 initializeSchema 필드는 DatabaseInitializationMode의 Enum 타입으로 설정되어 있는데

DatabaseInitializationMode 타입은 아래와 같으며 3가지 속성으로 설정된다.

always: 배치 메타테이블 항상 생성

never: 생성하지 않음

embedded: H2 DB 인 경우 생성

 

간혹 always로 설정을 해도 메타 테이블이 생성이 안될 수 있는데 이때는 수동으로 테이블을 생성 해주면 된다.

org.springframework.batch.core

각 DBMS 별로 DDL을 추출 할 수 있고 해당 sql 을 실행해 DDL 문을 실행 해주면 된다.

 


기본 구성 설정

* 스프링 배치 시작하기

 

- Spring Initializr 로 생성 (자바 21)

 

 

- bulid.gradle

plugins {
    id 'java'
    id 'org.springframework.boot' version '3.3.4'
    id 'io.spring.dependency-management' version '1.1.6'
}

java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(21)
    }
}

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    implementation 'org.springframework.boot:spring-boot-starter-data-jdbc'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'org.springframework.batch:spring-batch-core:5.1.2'

    // logger changed setting logback->log4j2
    implementation "org.bgee.log4jdbc-log4j2:log4jdbc-log4j2-jdbc4.1:1.16"

    // spring data jpa, querydsl
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    implementation "com.querydsl:querydsl-jpa:5.0.0:jakarta"
    annotationProcessor "com.querydsl:querydsl-apt:5.0.0:jakarta"
    annotationProcessor("jakarta.persistence:jakarta.persistence-api")
    annotationProcessor("jakarta.annotation:jakarta.annotation-api")

    implementation "org.apache.commons:commons-lang3:3.14.0"
    implementation "org.apache.commons:commons-text:1.11.0"

    // @ConfigurationProperties
    annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'

    //spring quartz - 배치 scheduling
    implementation 'org.springframework.boot:spring-boot-starter-quartz'

    compileOnly 'org.projectlombok:lombok'
    runtimeOnly 'com.mysql:mysql-connector-j'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.batch:spring-batch-test'
    testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}

 

bulid.gradle에는 필요한 정보만 기재 했다.

 

implementation 'org.springframework.boot:spring-boot-starter-batch'

implementation 'org.springframework.batch:spring-batch-core:5.1.2'

위 두 종속성의 경우 배치를 사용하기 위한 필수 Dependency이고

 

implementation 'org.springframework.boot:spring-boot-starter-web' 의 경우는

RestController로 테스트를 진행하기 위해 추가 했다.

 

Spring Batch 5.x 버전을 사용하기 위해서는 Spring Framework 6 이상을 사용해야 하기 때문에

SpringBoot를 3.3.4 버전으로 설정했다.

 

5.x 버전에서 JPA는 javax.*이 아닌 jakarata 로 설정이 필요하다.

(https://github.com/spring-projects/spring-batch/wiki/Spring-Batch-5.0-Migration-Guide)

 

- application.yml

spring:
  datasource:
    driver-class-name: net.sf.log4jdbc.sql.jdbcapi.DriverSpy
    url: jdbc:log4jdbc:mysql://127.0.0.1:43306/TESTDB?allowMultiQueries=true
    username: root
    password: 1234
    hikari:
      connection-timeout: 3000
      max-lifetime: 1740000
      maximum-pool-size: 20
  jpa:
    hibernate:
      ddl-auto: update
    open-in-view: false
    show-sql: false
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQLDialect
        storage-engine: innodb
        hbm2ddl-auto: none  # must be none
        show-sql: true
        format_sql: true          # 포매터 적용
        use_sql_comments: true    # jpql 출력

  batch:
    job:
      enabled: false
    jdbc:
      initialize-schema: always # always: 배치 메타테이블 항상 생성 / never: 생성하지 않음 / embedded: H2 DB 인 경우 생성

logging:
  level:
    root: INFO
    org:
      hibernate:
        SQL: DEBUG
        type:
          descriptor.sql.basicBinder: TRACE
          BasicTypeRegistry: WARN
        transaction.JDBCTransaction: DEBUG
        jdbc.connectionManager: DEBUG
    org.springframework.orm.jpa.JpaTransactionManager: DEBUG
    org.springframework.boot.autoconfigure: ERROR
    org.springframework.data.repository.config: ERROR
    io.swagger.models.parameters.AbstractSerializableParameter: ERROR
    com.zaxxer.hikari: DEBUG
    jdbc:
      audit: off
      connection: off
      sqlonly: debug
      resultset: off
      resultsettable: info
      sqltiming: debug
    com.ani.study: info

 

일단은 배치 서버부터 띄워야 하지만,

로깅과 JPA 설정을 함께 추가 했다.

 

- DataSourceConfiguration.java

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.batch.BatchDataSource;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy;

@Configuration
public class DataSourceConfiguration {
  @Value("${spring.datasource.driver-class-name}")
  private String driverClassName;

  @Value("${spring.datasource.url}")
  private String url;

  @Value("${spring.datasource.username}")
  private String username;

  @Value("${spring.datasource.password}")
  private String password;

  @Value("${spring.datasource.hikari.connection-timeout}")
  private String connectionTimeout;

  @Value("${spring.datasource.hikari.max-lifetime}")
  private String maxLifetime;

  @Value("${spring.datasource.hikari.maximum-pool-size}")
  private String maximumPoolSize;

  @Bean(name = "batchDataSource")
  @Primary
  @BatchDataSource
  public DataSource batchDataSource() {
    HikariConfig config = new HikariConfig();
    config.setDriverClassName(driverClassName);
    config.setJdbcUrl(url);
    config.setUsername(username);
    config.setPassword(password);
    config.setConnectionTimeout(Long.parseLong(connectionTimeout));
    config.setMaxLifetime(Long.parseLong(maxLifetime));
    config.setMaximumPoolSize(Integer.parseInt(maximumPoolSize));
    config.setConnectionTestQuery("SELECT 1");
    HikariDataSource paymentHikariDataSource = new HikariDataSource(config);
    return new LazyConnectionDataSourceProxy(paymentHikariDataSource);
  }
}

 

배치를 사용하기 위한 dataSource를 설정 해준다.

dataSource의 BeanName을 기본 값이 아닌 batchDataSource로 설정한 이유는

추후 다른 데이터 소스를 추가할 확장성을 고려해 따로 지정 하였다. (현재는 테스트 이기 때문에 한개만 지정)

 

스프링 배치의 DefaultBatchConfiguration 는 스프링 배치에서 제공하는 기본 설정 클래스로, 스프링 배치를 사용할 때 필요한 기본적인 설정을 제공하는 역할을 한다.

이 때문에 dataSource나 transactionManager의 빈 이름은 기본 값이 아닌 @Primary 어노테이션을 붙인 기본 빈을 명시적으로 지정 해줘야 빈 충돌이 발생하지 않는다.

 

따로 특별한 설정이 필요하지 않는다면 DefaultBatchConfiguration를 상속 받아 사용해도 문제가 없다 (오히려 더 간단해짐)

 

 

* DefaultBatchConfiguration의 주요 컴포넌트

  • JobRepository: 배치 작업의 실행 상태와 메타데이터를 관리하는 저장소
  • JobLauncher: 배치 작업을 시작하는 데 사용되는 인터페이스로, Job을 트리거하는 역할
  • JobRegistry: 등록된 배치 작업을 관리하는 레지스트리로, 이를 통해 Job을 검색할 수 있다.
  • PlatformTransactionManager: 배치 작업에서 사용할 트랜잭션을 관리.
  • JobExplorer: 실행 중이거나 완료된 배치 작업을 탐색하고 상태를 조회하는 데 사용.

 

 

- TransactionManagerConfig.java

import jakarta.persistence.EntityManagerFactory;
import java.util.HashMap;
import java.util.Map;
import javax.sql.DataSource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.JpaVendorAdapter;
import org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean;
import org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

@Configuration
@EnableTransactionManagement // 트랜잭션 관리 활성화. @Transactional 을 사용해 선언적 트랜잭션으로 관리가 가능
@EnableJpaRepositories(
    entityManagerFactoryRef = "batchEntityManagerFactory",
    transactionManagerRef = "batchTransactionManager",
    basePackages = "com.ani.study.domain"
)
public class TransactionManagerConfig {

    @Value("${spring.jpa.properties.hibernate.dialect}")
    private String dialect;

    @Value("${spring.jpa.properties.hibernate.storage-engine}")
    private String storageEngine;

    @Value("${spring.jpa.properties.hibernate.show-sql}")
    private String showSql;

    @Value("${spring.jpa.properties.hibernate.hbm2ddl-auto}")
    private String hbm2ddlAuto;

    /**
     * JPA 구현체에 대한 설정을 반환
     * */
    public JpaVendorAdapter jpaVendorAdapter() {
        HibernateJpaVendorAdapter jpaVendorAdapter = new HibernateJpaVendorAdapter();
        jpaVendorAdapter.setShowSql(true); // 로그 출력
        return jpaVendorAdapter;
    }

    /**
     * 엔티티 매니저 팩토리 빈 생성
     * */
    @Bean(name = "batchEntityManagerFactory")
    @Primary // 기본 EntityManagerFactory 설정
    public LocalContainerEntityManagerFactoryBean batchEntityManagerFactory(@Qualifier("batchDataSource") DataSource dataSource) {
        LocalContainerEntityManagerFactoryBean em = new LocalContainerEntityManagerFactoryBean();
        em.setDataSource(dataSource);
        em.setJpaVendorAdapter(jpaVendorAdapter());
        em.setJpaPropertyMap(hibernateJpaProperties());
        em.setPackagesToScan("com.ani.study.domain.entity");  // JPA 엔티티 위치 설정
        return em;
    }

    /**
     * 하이버네이트 관련 설정 반환
     * */
    private Map<String, ?> hibernateJpaProperties() {
        HashMap<String, String> properties = new HashMap<>();

        properties.put("hibernate.dialect", dialect);
        properties.put("hibernate.dialect.storage_engine", storageEngine);
        properties.put("hibernate.show_sql", showSql);
        properties.put("hibernate.format_sql", "true");
        properties.put("hibernate.hbm2ddl.auto", hbm2ddlAuto);
        return properties;
    }

    /**
     * 트랜잭션 매니저 설정
     * */
    @Bean(name = "batchTransactionManager")
    @Primary
    public PlatformTransactionManager batchTransactionManager(@Qualifier("batchEntityManagerFactory") EntityManagerFactory entityManagerFactory) {
        return new JpaTransactionManager(entityManagerFactory);
    }
}

 

추후에 트랜잭션 관련 테스트도 진행할 예정이기 때문에

트랜잭션 매니저도 설정해준다.

 


테스트를 위한 작업

이제 설정이 끝났으면 JobConfiguration을 작성한다.

- TestJobConfiguration.java

import java.util.HashMap;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
@RequiredArgsConstructor
@Slf4j
public class TestJobConfiguration {

  private final JobRepository jobRepository;

  private final PlatformTransactionManager batchTransactionManager;

  private int itemReaderCnt = 1;

  @Bean
  public Job testJob() {
    return new JobBuilder("testJob", jobRepository)
        .start(testStep())
        .build();
  }

  @Bean
  public Step testStep() {
    return new StepBuilder("testStep", jobRepository)
        .<Map<String, String>, Map<String, String>>chunk(2, batchTransactionManager)
        .reader(testItemReader())
        .writer(testItemWriter())
        .build();
  }

  @Bean
  public ItemReader<Map<String, String>> testItemReader() {
    return () -> {
      if(itemReaderCnt == 5) return null;

      Map<String, String> testMap = new HashMap<>();
      testMap.put("test" + itemReaderCnt, "테스트" + itemReaderCnt);
      itemReaderCnt++;
      return testMap;
    };
  }

  @Bean
  public ItemWriter<Map<String, String>> testItemWriter() {
    return chunk -> {
      log.info("Chunk size: {}", chunk.size());
      chunk.forEach(item -> log.info("Processing item: {}", item));
    };
  }
}

 

추후 Chunk 방식으로 테스트를 진행할 예정이라 청크 방식으로 진행을 했고 청크사이즈는 2로 지정했다.

테스트를 위해 itemRederCnt를 1로 설정을 헀고,

ItemReader에서 Map에 test + itemRederCnt 를 진행하고 5가 되었을때 멈추도록 진행 했다

(ItemReader의 경우 null을 반환할 때까지는 무한 루프를 돌기 때문에 Break를 5번째로 지정했다.)

 

 

- JobService.java

위에서 생성한 Job을 실행하기 위에서 Controller와 Service를 생성할 차례이다.

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@RequiredArgsConstructor
public class JobService {
  private final JobLauncher jobLauncher;
  private final ApplicationContext context;

  /**
   * Spring Batch Job 을 실행한다.
   * @param jobName batch job name
   *
   * */
  @Async
  public void invokeJob(String jobName) throws JobInstanceAlreadyCompleteException, JobExecutionAlreadyRunningException, JobParametersInvalidException, JobRestartException {
    String date = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));

    JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
    jobParametersBuilder.addString("Sync date", date);

    log.info("job parameters info > : {}", jobParametersBuilder.toJobParameters().getParameters());
    var jobToStart = context.getBean(jobName, Job.class); // Job 빈을 스프링 컨텍스트에서 가져옴
    jobLauncher.run(jobToStart, jobParametersBuilder.toJobParameters()); // Job을 실행하고, 해당 작업에 JobParameters를 전달
  }
}

 

@Async : 배치 작업을 비동기적으로 처리하기 위해 사용되었으며, 작업이 완료될 때까지 기다리지 않고 메인 스레드에서 비동기로 배치 작업을 실행하기 위해 설정

JobLauncher : Job과 JobParameters를 받아 배치 작업을 실행하는 역할을 한다.

ApplicationContext :  런타임 시점에 Job 빈을 동적으로 가져온다. jobName이라는 파라미터로 전달받은 이름을 기반으로, 해당 이름을 가진 Job을 스프링 컨테이너에서 가져온다.

 

JobParametersBuilder를 사용하여 현재 시간(date)을 파라미터로 추가하고, 이를 배치 작업 실행 시 함께 전달한다.

배치 작업이 실행될 때마다 고유한 실행 파라미터를 요구하기 때문에, 여기서 시간 정보를 파라미터로 추가해 중복된 실행을 방지한다.

 

- BatchJobController.java

import com.ani.study.domain.service.JobService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RequiredArgsConstructor
@Slf4j
@RestController
public class BatchJobController {

  private final JobService jobService;

  @GetMapping("/start")
  public String startJob(@RequestParam(name = "jobName") String jobName) throws Exception {
    log.debug("\n\n### JobController.startJob \n\n.");
    jobService.invokeJob(jobName);
    return "Job Started...";
  }
}

 


테스트

http://localhost:8080/start?jobName=testJob

 

Controller의 Method를 Get으로 설정 했기 때문에 Postman으로 테스트를 할 필요가 없다.

배치 서버를 띄운 뒤 브라우저에 위 경로를 입력하고 실행한다.

 

청크 사이즈를 2로 설정 하고 2번 반복되는 것과 테스트 1~4까지 로그가 찍히는 것을 확인 할 수 있다.

 

 

참고 :

https://docs.spring.io/spring-batch/reference/job/java-config.html

https://taes-k.github.io/2021/03/01/spring-batch-table/

https://docs.spring.io/spring-batch/reference/

https://jiwondev.tistory.com/295

https://docs.spring.io/spring-batch/reference/schema-appendix.html#recommendationsForIndexingMetaDataTables

 

 

 

반응형