17 KiB
KAMCO Dataset Generation Batch System
KAMCO 학습 데이터 생성 및 처리를 위한 Spring Batch 시스템입니다.
목차
시스템 개요
주요 기능
- 검수 완료된 라벨링 데이터를 GeoJSON 형식으로 변환
- Docker 컨테이너를 통한 학습 데이터 생성 파이프라인 실행
- 생성된 결과물을 ZIP 파일로 압축
- 각 처리 단계별 성공/실패 이력을 DB에 자동 기록
기술 스택
- Java 17+
- Spring Boot 3.x
- Spring Batch 5.x
- PostgreSQL
- Docker
배치 작업 구조
1. Parent Job: exportGeoJsonJob
Parent Job은 진행 중인 모든 분석 회차를 조회하여 각 회차별로 Child Job을 실행합니다.
exportGeoJsonJob (Parent Job)
└─ Step: launchChildJobsStep
└─ Tasklet: LaunchChildJobsTasklet
├─ AnalCntInfo 리스트 조회
└─ 각 AnalCntInfo마다 Child Job 실행
실행 조건:
tb_map_sheet_anal_inference.anal_state = 'ING'이고 검수 완료(COMPLETE) 건수가 1개 이상인 건- 또는
tb_map_sheet_anal_inference.anal_state = 'FINISH'이면서batch_step_history에서step_name = 'zipResponseStep',status = 'SUCCESS'인 마지막 ZIP 완료 시각보다 더 늦게 검수 완료(COMPLETE) 된 건이 있는 건
2. Child Job: processAnalCntInfoJob
각 AnalCntInfo(분석 회차)마다 독립적으로 실행되는 서브 작업입니다.
processAnalCntInfoJob (Child Job)
├─ Step 1: makeGeoJsonStep
│ └─ Tasklet: MakeGeoJsonTasklet
│ └─ 검수 완료된 라벨링 데이터를 GeoJSON 파일로 생성
│ → /dataset/request/{resultUid}/*.geojson
│
├─ Step 2: dockerRunStep
│ └─ Tasklet: DockerRunTasklet
│ └─ Docker 컨테이너 실행 (학습 데이터 생성 파이프라인)
│ → /dataset/response/{resultUid}/*
│
└─ Step 3: zipResponseStep
└─ Tasklet: ZipResponseTasklet
└─ 생성된 결과물을 ZIP으로 압축
→ /dataset/response/{resultUid}.zip
JobParameters:
analUid(Long): 분석 회차 UIDresultUid(String): 결과물 고유 ID (UUID)timestamp(Long): 고유성 보장을 위한 타임스탬프
데이터베이스 스키마
1. batch_history 테이블
전체 배치 작업(Parent Job) 실행 이력을 기록합니다.
CREATE TABLE public.batch_history (
uuid UUID PRIMARY KEY, -- 배치 실행 고유 ID
job VARCHAR(255) NOT NULL, -- 배치 작업 이름 (exportGeoJsonJob)
id VARCHAR(255) NOT NULL, -- 비즈니스 ID
created_dttm TIMESTAMP NOT NULL, -- 생성 일시
updated_dttm TIMESTAMP NOT NULL, -- 수정 일시
status VARCHAR(50) NOT NULL, -- 상태 (STARTED/COMPLETED/FAILED)
completed_dttm TIMESTAMP -- 완료 일시
);
인덱스:
idx_batch_history_job(job)idx_batch_history_status(status)idx_batch_history_created(created_dttm DESC)
2. batch_step_history 테이블
각 AnalCntInfo의 Step별 실행 이력을 기록합니다.
CREATE TABLE public.batch_step_history (
id BIGSERIAL PRIMARY KEY, -- Step 이력 고유 ID
anal_uid BIGINT NOT NULL, -- 분석 UID
result_uid VARCHAR(255) NOT NULL, -- 결과 UID
step_name VARCHAR(100) NOT NULL, -- Step 이름
status VARCHAR(50) NOT NULL, -- 상태 (STARTED/SUCCESS/FAILED)
error_message TEXT, -- 에러 메시지 (최대 1000자)
started_dttm TIMESTAMP NOT NULL, -- Step 시작 일시
completed_dttm TIMESTAMP, -- Step 완료 일시
created_dttm TIMESTAMP NOT NULL, -- 생성 일시
updated_dttm TIMESTAMP NOT NULL -- 수정 일시
);
Step 이름:
makeGeoJsonStep: GeoJSON 파일 생성dockerRunStep: Docker 컨테이너 실행zipResponseStep: 결과물 ZIP 압축
인덱스:
idx_batch_step_history_anal_uid(anal_uid)idx_batch_step_history_result_uid(result_uid)idx_batch_step_history_status(status)idx_batch_step_history_step_name(step_name)
실행 흐름
전체 프로세스
1. Parent Job 시작
↓
2. 진행 중인 AnalCntInfo 리스트 조회
↓
3. 각 AnalCntInfo마다 반복:
↓
├─ 3.1. Child Job 실행 (processAnalCntInfoJob)
│ ↓
│ ├─ Step 1: makeGeoJsonStep
│ │ - beforeStep: DB에 STARTED 기록
│ │ - Tasklet 실행: GeoJSON 파일 생성
│ │ - afterStep: DB에 SUCCESS/FAILED 기록
│ │
│ ├─ Step 2: dockerRunStep
│ │ - beforeStep: DB에 STARTED 기록
│ │ - Tasklet 실행: Docker 컨테이너 실행
│ │ - afterStep: DB에 SUCCESS/FAILED 기록
│ │
│ └─ Step 3: zipResponseStep
│ - beforeStep: DB에 STARTED 기록
│ - Tasklet 실행: 결과물 ZIP 압축
│ - afterStep: DB에 SUCCESS/FAILED 기록
│
└─ 3.2. 다음 AnalCntInfo 처리
↓
4. Parent Job 종료 (부분 성공 허용)
Step 1: makeGeoJsonStep
목적: 검수 완료된 라벨링 데이터를 GeoJSON 파일로 변환
처리 과정:
findCompletedAnalMapSheetList(): 검수 완료된 도엽 목록 조회- 각 도엽별로:
findCompletedYesterdayLabelingList(): 배치 실행일 0시 이전까지 검수 완료된 데이터 조회- GeoJSON Feature 생성
/dataset/request/{resultUid}/{filename}.geojson저장updateLearnDataGeomFileCreateYn(): DB에 파일 생성 완료 플래그 업데이트
출력 파일 형식:
{resultUid_8자}_{compareYyyy}_{targetYyyy}_{mapSheetNum}_D15.geojson
예시:
ED80D700_2022_2023_3724036_D15.geojson
Step 2: dockerRunStep
목적: Docker 컨테이너를 통해 학습 데이터 생성 파이프라인 실행
Docker 명령어:
docker run --rm \
--user {dockerUser} \
-v {datasetVolume} \
-v {imagesVolume} \
--entrypoint python \
{dockerImage} \
code/kamco_full_pipeline.py \
--labelling-folder request/{resultUid} \
--output-folder response/{resultUid} \
--input_root {inputRoot} \
--output_root {outputRoot} \
--patch_size {patchSize} \
--overlap_pct {overlapPct} \
--train_val_test_ratio {train} {val} {test} \
--keep_empty_ratio {keepEmptyRatio}
에러 처리:
- Docker 프로세스의
exitCode != 0시RuntimeException발생 - Step 실패로 처리되어 DB에
FAILED상태 기록 - 에러 메시지와 exit code가
error_message컬럼에 저장됨
Step 3: zipResponseStep
목적: 생성된 학습 데이터를 ZIP 파일로 압축
처리 과정:
/dataset/response/{resultUid}/디렉토리 검증- 디렉토리 내 모든 파일과 서브디렉토리를 재귀적으로 압축
/dataset/response/{resultUid}.zip파일 생성
압축 설정:
- Hidden 파일 제외
- 디렉토리 구조 유지
- 버퍼 크기: 1024 bytes
설정 방법
application.yml 설정
# 학습 데이터 디렉토리 경로
training-data:
geojson-dir: /kamco-nfs/dataset
# Docker 설정
docker:
user: "1000:1000"
image: "kamco/dataset-generator:latest"
dataset-volume: "/kamco-nfs/dataset:/dataset"
images-volume: "/kamco-nfs/images:/images"
input-root: "/dataset"
output-root: "/dataset"
patch-size: 512
overlap-pct: 0.2
train-val-test-ratio:
- "0.7"
- "0.2"
- "0.1"
keep-empty-ratio: 0.5
환경 변수
| 환경 변수 | 설명 | 기본값 |
|---|---|---|
TRAINING_DATA_GEOJSON_DIR |
GeoJSON 파일 저장 경로 | /kamco-nfs/dataset |
DOCKER_USER |
Docker 컨테이너 실행 유저 | 1000:1000 |
DOCKER_IMAGE |
Docker 이미지 이름 | kamco/dataset-generator:latest |
모니터링 및 로그
로그 레벨
logging:
level:
com.kamco.cd.geojsonscheduler: INFO
com.kamco.cd.geojsonscheduler.batch: DEBUG
org.springframework.batch: INFO
주요 로그 포인트
Parent Job 로그
[INFO] Parent Job 시작: AnalCntInfo 리스트 조회 및 Child Job 실행
[INFO] 진행중인 회차 목록 조회 중...
[INFO] 진행중인 회차 수: 3
[INFO] 회차 검토: AnalUid=100, ResultUid=ED80D700...
[INFO] Child Job 실행 중... (AnalUid=100, ResultUid=ED80D700...)
[INFO] Child Job 실행 완료 (AnalUid=100, ResultUid=ED80D700...)
[INFO] Parent Job 완료 - 성공: 2, 건너뜀: 1, 실패: 0
Step 로그
[INFO] ========================================
[INFO] GeoJSON 생성 시작 (AnalUid=100, ResultUid=ED80D700...)
[INFO] 검수 완료된 도엽 수: 5
[INFO] 도엽 처리 중: MapSheetNum=3724036
[INFO] 완료된 라벨링 데이터 수: 150
[INFO] GeoJSON 파일 저장 완료: /dataset/request/ED80D700.../ED80D700_2022_2023_3724036_D15.geojson
[INFO] GeoJSON 생성 완료 (ResultUid=ED80D700...) - 처리된 도엽 수: 5, 생성된 파일 수: 5
[INFO] ========================================
Docker 실행 로그
[INFO] ========================================
[INFO] Docker 컨테이너 실행 시작 (ResultUid=ED80D700...)
[INFO] Running docker command: docker run --rm --user 1000:1000...
[INFO] [docker] Loading configuration...
[INFO] [docker] Processing pipeline...
[INFO] [docker] Pipeline completed successfully
[INFO] Docker process completed successfully for resultUid: ED80D700...
[INFO] ========================================
DB 조회 쿼리
1. 특정 회차의 모든 Step 실행 이력
SELECT
step_name,
status,
started_dttm,
completed_dttm,
EXTRACT(EPOCH FROM (completed_dttm - started_dttm)) AS duration_seconds,
error_message
FROM batch_step_history
WHERE anal_uid = 100
AND result_uid = 'ED80D700A0F5482BB0EC11A366DEA8DE'
ORDER BY started_dttm;
2. 최근 실패한 Step 조회
SELECT
anal_uid,
result_uid,
step_name,
error_message,
started_dttm,
completed_dttm
FROM batch_step_history
WHERE status = 'FAILED'
ORDER BY started_dttm DESC
LIMIT 10;
3. Step별 성공률 통계
SELECT
step_name,
COUNT(*) AS total_executions,
SUM(CASE WHEN status = 'SUCCESS' THEN 1 ELSE 0 END) AS success_count,
SUM(CASE WHEN status = 'FAILED' THEN 1 ELSE 0 END) AS failed_count,
ROUND(
SUM(CASE WHEN status = 'SUCCESS' THEN 1 ELSE 0 END)::NUMERIC / COUNT(*) * 100,
2
) AS success_rate_pct
FROM batch_step_history
GROUP BY step_name;
4. 평균 실행 시간 (Step별)
SELECT
step_name,
COUNT(*) AS total_executions,
AVG(EXTRACT(EPOCH FROM (completed_dttm - started_dttm))) AS avg_duration_seconds,
MIN(EXTRACT(EPOCH FROM (completed_dttm - started_dttm))) AS min_duration_seconds,
MAX(EXTRACT(EPOCH FROM (completed_dttm - started_dttm))) AS max_duration_seconds
FROM batch_step_history
WHERE status = 'SUCCESS'
AND completed_dttm IS NOT NULL
GROUP BY step_name;
5. 특정 기간 동안 처리된 회차 수
SELECT
DATE(started_dttm) AS execution_date,
COUNT(DISTINCT result_uid) AS processed_count
FROM batch_step_history
WHERE step_name = 'makeGeoJsonStep'
AND started_dttm >= CURRENT_DATE - INTERVAL '7 days'
GROUP BY DATE(started_dttm)
ORDER BY execution_date DESC;
트러블슈팅
1. Docker 컨테이너 실행 실패
증상:
[ERROR] Docker process exited with code 1 for resultUid: ED80D700...
FileNotFoundError: Missing training pairs root at /dataset/response/.../tifs/train
원인:
- GeoJSON 파일이 생성되지 않았거나 잘못된 형식
- Docker 볼륨 마운트 경로 불일치
- 파이프라인 실행 중 필수 파일 누락
해결 방법:
-
batch_step_history테이블에서makeGeoJsonStep상태 확인SELECT * FROM batch_step_history WHERE result_uid = 'ED80D700...' AND step_name = 'makeGeoJsonStep'; -
GeoJSON 파일 존재 여부 확인
ls -la /kamco-nfs/dataset/request/ED80D700.../ -
Docker 볼륨 설정 확인
docker: dataset-volume: "/kamco-nfs/dataset:/dataset" # 호스트:컨테이너
2. GeoJSON 파일이 생성되지 않음
증상:
[WARN] 검수 완료된 도엽이 없음. 작업 건너뜀.
원인:
- 검수 완료(
COMPLETE) 상태의 데이터가 없음 inspect_stat_dttm이 오늘 이후 (어제까지만 조회)
해결 방법:
-
검수 완료 데이터 확인
SELECT COUNT(*) FROM tb_labeling_assignment WHERE anal_uid = 100 AND inspect_state = 'COMPLETE'; -
검수 완료 시간 확인
SELECT MAX(inspect_stat_dttm) FROM tb_labeling_assignment WHERE anal_uid = 100 AND inspect_state = 'COMPLETE';
3. ZIP 파일 생성 실패
증상:
[ERROR] Response 디렉토리가 존재하지 않음: /dataset/response/ED80D700...
원인:
- Docker Step에서 결과물이 생성되지 않음
- Docker 컨테이너가 중간에 실패했지만 감지되지 않음
해결 방법:
-
Docker Step 상태 확인
SELECT * FROM batch_step_history WHERE result_uid = 'ED80D700...' AND step_name = 'dockerRunStep'; -
Response 디렉토리 확인
ls -la /kamco-nfs/dataset/response/ED80D700.../ -
Docker 컨테이너 로그 확인 (DB의
error_message컬럼)
4. Child Job이 실행되지 않음
증상:
[INFO] 모든 파일이 이미 처리됨. 건너뜀.
원인:
all_cnt == file_cnt(이미 모든 파일이 생성됨)- 재실행이 필요한 경우 플래그를 초기화하지 않음
해결 방법:
-
파일 생성 플래그 초기화
UPDATE tb_map_sheet_learn_data_geom SET file_create_yn = false, updated_dttm = NOW() WHERE geo_uid IN ( SELECT inference_geom_uid FROM tb_labeling_assignment WHERE anal_uid = 100 ); -
배치 재실행
5. 배치 작업 전체가 실패함
증상:
[ERROR] Child Job 실행 실패 (AnalUid=100, ResultUid=ED80D700...): ...
[WARN] 3 개의 Child Job 실행이 실패했습니다.
원인:
- 여러 회차에서 동시에 실패 발생
- 현재는 부분 실패를 허용하도록 설정됨
해결 방법:
-
실패한 회차 확인
SELECT DISTINCT anal_uid, result_uid FROM batch_step_history WHERE status = 'FAILED' AND started_dttm >= CURRENT_DATE; -
각 회차별로 실패 원인 분석
SELECT step_name, error_message FROM batch_step_history WHERE anal_uid = 100 AND status = 'FAILED'; -
실패 정책 변경 (필요 시)
LaunchChildJobsTasklet.java:87-89주석 해제하여 하나라도 실패 시 Parent Job 실패 처리
개발자 가이드
새로운 Step 추가하기
-
Tasklet 생성
@Component @RequiredArgsConstructor public class NewStepTasklet implements Tasklet { @Value("#{jobParameters['analUid']}") private Long analUid; @Value("#{jobParameters['resultUid']}") private String resultUid; @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) { // 로직 구현 return RepeatStatus.FINISHED; } } -
JobConfig에 Step 등록
@Bean public Step newStep() { return new StepBuilder("newStep", jobRepository) .tasklet(newStepTasklet, transactionManager) .listener(stepHistoryListener) // 이력 기록 .build(); } -
Job Flow에 추가
@Bean public Job processAnalCntInfoJob() { return new JobBuilder("processAnalCntInfoJob", jobRepository) .start(makeGeoJsonStep()) .next(dockerRunStep()) .next(zipResponseStep()) .next(newStep()) // 새로운 Step 추가 .build(); }
로그 커스터마이징
로그 레벨 변경:
logging:
level:
com.kamco.cd.geojsonscheduler.batch.DockerRunTasklet: DEBUG
특정 Step만 로그 출력:
@Slf4j
public class CustomTasklet implements Tasklet {
@Override
public RepeatStatus execute(...) {
if (log.isDebugEnabled()) {
log.debug("상세 디버그 정보: {}", details);
}
return RepeatStatus.FINISHED;
}
}
배포 및 운영
배포 절차
-
빌드
./gradlew clean build -
Docker 이미지 빌드
docker build -t kamco-batch:latest . -
실행
java -jar build/libs/kamco-geojson-scheduler-1.0.0.jar
스케줄링 설정
Spring Scheduler를 사용한 정기 실행:
@Scheduled(cron = "0 0 2 * * *") // 매일 새벽 2시
public void runBatch() {
JobParameters jobParameters = new JobParametersBuilder()
.addLong("timestamp", System.currentTimeMillis())
.toJobParameters();
jobLauncher.run(exportGeoJsonJob, jobParameters);
}
라이선스
Copyright (c) 2024 KAMCO. All rights reserved.
문의
기술 지원: tech-support@kamco.co.kr