티스토리 뷰

728x90
반응형

Batch는 대량의 데이터를 한 번에 처리하는 작업 단위를 말합니다. Batch 처리 과정은 데이터를 입수(Fetch)하고, 가공(Transform)한 뒤 데이터를 저장(Save) 또는 외부로 전송(Output) 단계로 진행됩니다. Spring Batch는 Batch처리 작업을 위한 소스코드를 손쉽게 작성을 도와주는 라이브러리입니다. 단, Spring Batch는 스케쥴러가 아니므로, 주기적으로 Batch 작업을 실행시키고자 한다면, 스케쥴러에 해당하는 Quartz, Tivoli, Control-M과 함께 사용하셔야 합니다. 이번 포스팅에서는 Spring Boot에서 Batch처리 서비스를 만드는 방법에 대해 알아보겠습니다.

Springboot Batch 프로젝트 다운로드

https://start.spring.io/ 에 접속하셔서, Dependency에서 Spring Batch and HyperSQL Database를 선택하여 프로젝트를 다운로드합니다. 나머지 환경설정은 사용자의 개발환경에 맞게 설정해줍니다. 제가 설정한 프로젝트는 아래 사진을 참고 바랍니다.

SpringBatch 프로젝트 다운로드

프로젝트를 다운로드하고, IntelliJ로 오픈한 후 build.gradle 파일을 열어보시면, Dependency에 Spring Batch HyperSQL Database가 추가된 것을 확인하실 수 있습니다.

dependencies {
   implementation 'org.springframework.boot:spring-boot-starter-batch'
   runtimeOnly 'org.hsqldb:hsqldb'
   testImplementation 'org.springframework.boot:spring-boot-starter-test'
   testImplementation 'org.springframework.batch:spring-batch-test'
}

Batch 작업 처리 단계는 데이터 입수, 가공, 처리(저장, 외부 전송)입니다. Spring Batch에서 배치를 처리하는 방식을 살펴보겠습니다.

SpringBatch 아키텍처

Spring Batch코드를 작성하기에 앞서, Spring Batch에서 Batch를 처리하는 패턴, 즉 동작원리에 대해 알아보겠습니다.

SpringBatch 아키텍처

Spring Batch 공식 문서에 Batch Job 처리 관련 아키텍처 다이어그램을 확인할 수 있습니다. 여기서 나오는 용어에 대해서 간략한 설명은 아래와 같습니다. 

  • Job : 처리해야 할 전체 Batch 프로세스 
    • JobInstance : Job이 실행되었을 때, 논리적인 실행 단위
    • JobParameter : Job이 실행될 때, 참조하는 파라미터
  • JobRepository: JobLauncher, Job, Step의 CRUD 오퍼레이션을 지원하는 메커니즘
  • JobLauncher : JobParameter가 주어졌을 때, Job을 실행하는 역할
  • Step : Job을 실행하는 단계
  • ItemReader : 외부로부터 데이터를 읽어오는 개체
  • ItemProcessor : 읽어온 데이터를 가공 및 처리하는 개체
  • ItemWriter : 처리된 데이터를 외부로 출력하는 개체

Spring Batch에서 Batch가 실행되는 시나리오를 생각해보면, JobRepository에 실행할 Job이 존재한다면, Job Launcher는 Job과 JobParameter정보로 실행하여 JobInstance가 생성됩니다. 그리고 JobInstance에는 실행해야 할 Step단계가 있으며, Step별로 ItemReader, ItemProcessor, ItemWriter 유틸리티 클래스를 활용하여 작업을 처리합니다. 작업을 처리하며 처리된 결과 및 상태 값은 JobRepository에 저장됩니다. 다음으로 Spring Batch로 데이터 입수, 가공, 처리하는 프로그램을 소스코드로 작성해보겠습니다.

데이터 Fetch

먼저 데이터를 csv파일로부터 읽어, 클래스로 로딩하기 위한 시나리오를 작성해보겠습니다. 다운로드한 프로젝트 내에서 아래 데이터를 src/main/resources/sample-data.csv 경로에 파일로 저장합니다.

Jill,Doe
Joe,Doe
Justin,Doe
Jane,Doe
John,Doe

sample-data.csv에 저장된 콘텐츠를 읽어와 저장할 도메인 클래스를 아래와 같이 작성합니다. 도메인 클래스는 읽어 온 데이터를 표현하는 데이터라고 이해하시면 됩니다.

package com.example.demo;

public class Person {

    private String lastName;
    private String firstName;

    public Person() {
    }

    public Person(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getFirstName() {
        return firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @Override
    public String toString() {
        return "firstName: " + firstName + ", lastName: " + lastName;
    }
}

Spring Batch에 데이터 입수 처리를 지시할 코드를 BatchConfiguration 클래스에 작성해보겠습니다.

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
    @Bean
    public FlatFileItemReader<Person> reader() {
        return new FlatFileItemReaderBuilder<Person>()
                .name("personItemReader")
                .resource(new ClassPathResource("sample-data.csv"))
                .delimited()
                .names(new String[]{"firstName", "lastName"})
                .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                    setTargetType(Person.class);
                }})
                .build();
    }
}

소스코드에 대한 설명은 아래와 같습니다.

  • @EnableBatchProcessing : Spring Batch Job 처리에 필요한 Bean들을 사용한다는 어노테이션 (여기서는 memory-based Database를 사용하고자 선언)
  • reader() : SpringBatch에서 정의된 ItemReader 객체를 활용해, 데이터를 읽어오는 함수. 읽어온 데이터를 Person 객체로 변환

데이터 Transform

ItemReader로 읽어온 데이터는 ItemProcessor를 통해 데이터를 변환 및 가공할 수 있습니다. 아래와 같이 데이터 변환 및 가공 역할을 수행하는 PersonItemProcessor 클래스를 작성합니다. 읽어온 데이터를 대문자로 변환하는 역할을 수행합니다.

package com.example.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.batch.item.ItemProcessor;

public class PersonItemProcessor implements ItemProcessor<Person, Person> {
    private static final Logger log = LoggerFactory.getLogger(PersonItemProcessor.class);
    @Override
    public Person process(final Person person) throws Exception {
        final String firstName = person.getFirstName().toUpperCase();
        final String lastName = person.getLastName().toUpperCase();
        final Person transformedPerson = new Person(firstName, lastName);
        log.info("Converting (" + person + ") into (" + transformedPerson + ")");
        return transformedPerson;
    }
}

Spring Batch에 데이터 변환처리를 담당할 클래스를 알려주어야 합니다. BatchConfiguration 클래스에 processor() 함수를 작성해줍니다.

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
    @Bean
    public PersonItemProcessor processor() {
        return new PersonItemProcessor();
    }
}

데이터 Process

데이터 변환 및 가공 처리가 끝난 데이터를 최종적으로 파일로 저장할지, 데이터 베이스로 저장할지 아니면 외부로 전성할지 정의해야 합니다. 또한 데이터 포맷을 변경해야 할 수 도 있습니다. 여기서는 데이터베이스에 저장하므로, 데이터를 저장할 table 생성 스크립트를 src/main/resources/schema-all.sql에 작성해줍니다.

DROP TABLE people IF EXISTS;

CREATE TABLE people  (
    person_id BIGINT IDENTITY NOT NULL PRIMARY KEY,
    first_name VARCHAR(20),
    last_name VARCHAR(20)
);

다음으로, BatchConfiguration 클래스에 write() 함수를 등록하여, 데이터 처리 로직을 구현합니다. 여기서는 write() 함수가 데이터베이스에 저장하는 용도로 작성되었습니다.

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
    @Bean
    public JdbcBatchItemWriter<Person> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Person>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO people (first_name, last_name) VALUES (:firstName, :lastName)")
                .dataSource(dataSource)
                .build();
    }
}

Batch Job Configuration

데이터 입수, 가공, 처리 로직을 소스코드로 작성이 완료되었습니다. 이제 최종적으로 Job과 Step에 대한 정의를 Spring Batch에게 알려주어야 합니다. BatchConfiguration 클래스에 JobBuilderFactory클래스로 Job을 정의하고, StepBuilderFactory로 step1() 메서드로 Step을 정의합니다. 소스코드를 보시면 Job에는 Step이 정의되어야 하고, Step은 reader, writer, processor가 정의되어야 한다는 것을 알 수 있습니다. 

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    
    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener, Step step1) {
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(step1)
                .end()
                .build();
    }

    @Bean
    public Step step1(JdbcBatchItemWriter<Person> writer) {
        return stepBuilderFactory.get("step1")
                .<Person, Person> chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer)
                .build();
    }
}

Job이 종료되었을 때, 후속 처리할 수 있는 리스너를 등록할 수 있는데 여기서는 JobCompletionNotificationListener클래스를 정의하였습니다. 예제에는 Job이 정상적으로 종료되고 나면, Database에 저장이 되었는지, 확인하는 작업으로 후속 처리하는 로직이 작성되어 있습니다.

package com.example.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {
    private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener.class);
    private final JdbcTemplate jdbcTemplate;

@Autowired
    public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        if(jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("!!! JOB FINISHED! Time to verify the results");

            jdbcTemplate.query("SELECT first_name, last_name FROM people",
                    (rs, row) -> new Person(
                            rs.getString(1),
                            rs.getString(2))
            ).forEach(person -> log.info("Found <" + person + "> in the database."));
        }
    }
}

데이터 읽기, 가공, 처리, Job & Step 등록 코드가 작성된 최종 소스 코드는 아래 git repository의 complete폴더 경로를 참조 부탁드립니다. 

https://github.com/spring-guides/gs-batch-processing

Batch Job 실행하기

SpringBoot 애플리케이션을 실행하면, SpringBatch기능이 실행되고, 실행결과는 콘솔 창에 로그가 남습니다. 아래와 같이 로그가 기록되면, Spring Batch가 성공적으로 실행된 것입니다.

Executing step: [step1]
Converting (firstName: Jill, lastName: Doe) into (firstName: JILL, lastName: DOE)
Converting (firstName: Joe, lastName: Doe) into (firstName: JOE, lastName: DOE)
Converting (firstName: Justin, lastName: Doe) into (firstName: JUSTIN, lastName: DOE)
Converting (firstName: Jane, lastName: Doe) into (firstName: JANE, lastName: DOE)
Converting (firstName: John, lastName: Doe) into (firstName: JOHN, lastName: DOE)
Step: [step1] executed in 27ms
!!! JOB FINISHED! Time to verify the results
Found <firstName: JILL, lastName: DOE> in the database.
Found <firstName: JOE, lastName: DOE> in the database.
Found <firstName: JUSTIN, lastName: DOE> in the database.
Found <firstName: JANE, lastName: DOE> in the database.
Found <firstName: JOHN, lastName: DOE> in the database.
Job: [FlowJob: [name=importUserJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED] in 36ms

 

728x90
반응형
댓글
반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/11   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
글 보관함