Split the function
This commit is contained in:
687
kamco-make-dataset-generation/README.md
Normal file
687
kamco-make-dataset-generation/README.md
Normal file
@@ -0,0 +1,687 @@
|
||||
# KAMCO Dataset Generation Batch System
|
||||
|
||||
KAMCO 학습 데이터 생성 및 처리를 위한 Spring Batch 시스템입니다.
|
||||
|
||||
## 목차
|
||||
- [시스템 개요](#시스템-개요)
|
||||
- [배치 작업 구조](#배치-작업-구조)
|
||||
- [데이터베이스 스키마](#데이터베이스-스키마)
|
||||
- [실행 흐름](#실행-흐름)
|
||||
- [설정 방법](#설정-방법)
|
||||
- [모니터링 및 로그](#모니터링-및-로그)
|
||||
- [트러블슈팅](#트러블슈팅)
|
||||
|
||||
---
|
||||
|
||||
## 시스템 개요
|
||||
|
||||
### 주요 기능
|
||||
- 검수 완료된 라벨링 데이터를 GeoJSON 형식으로 변환
|
||||
- Docker 컨테이너를 통한 학습 데이터 생성 파이프라인 실행
|
||||
- 생성된 결과물을 ZIP 파일로 압축
|
||||
- 각 처리 단계별 성공/실패 이력을 DB에 자동 기록
|
||||
|
||||
### 기술 스택
|
||||
- **Java 17+**
|
||||
- **Spring Boot 3.x**
|
||||
- **Spring Batch 5.x**
|
||||
- **PostgreSQL**
|
||||
- **Docker**
|
||||
|
||||
---
|
||||
|
||||
## 배치 작업 구조
|
||||
|
||||
### 1. Parent Job: `exportGeoJsonJob`
|
||||
|
||||
Parent Job은 진행 중인 모든 분석 회차를 조회하여 각 회차별로 Child Job을 실행합니다.
|
||||
|
||||
```
|
||||
exportGeoJsonJob (Parent Job)
|
||||
└─ Step: launchChildJobsStep
|
||||
└─ Tasklet: LaunchChildJobsTasklet
|
||||
├─ AnalCntInfo 리스트 조회
|
||||
└─ 각 AnalCntInfo마다 Child Job 실행
|
||||
```
|
||||
|
||||
**실행 조건:**
|
||||
- `tb_map_sheet_anal_inference` 테이블의 `anal_state = 'ING'` (진행 중)
|
||||
- 검수 완료(`COMPLETE`) 건수가 1개 이상 존재
|
||||
- `all_cnt != file_cnt` (아직 파일 생성이 완료되지 않은 경우)
|
||||
|
||||
---
|
||||
|
||||
### 2. Child Job: `processAnalCntInfoJob`
|
||||
|
||||
각 AnalCntInfo(분석 회차)마다 독립적으로 실행되는 서브 작업입니다.
|
||||
|
||||
```
|
||||
processAnalCntInfoJob (Child Job)
|
||||
├─ Step 1: makeGeoJsonStep
|
||||
│ └─ Tasklet: MakeGeoJsonTasklet
|
||||
│ └─ 검수 완료된 라벨링 데이터를 GeoJSON 파일로 생성
|
||||
│ → /dataset/request/{resultUid}/*.geojson
|
||||
│
|
||||
├─ Step 2: dockerRunStep
|
||||
│ └─ Tasklet: DockerRunTasklet
|
||||
│ └─ Docker 컨테이너 실행 (학습 데이터 생성 파이프라인)
|
||||
│ → /dataset/response/{resultUid}/*
|
||||
│
|
||||
└─ Step 3: zipResponseStep
|
||||
└─ Tasklet: ZipResponseTasklet
|
||||
└─ 생성된 결과물을 ZIP으로 압축
|
||||
→ /dataset/response/{resultUid}.zip
|
||||
```
|
||||
|
||||
**JobParameters:**
|
||||
- `analUid` (Long): 분석 회차 UID
|
||||
- `resultUid` (String): 결과물 고유 ID (UUID)
|
||||
- `timestamp` (Long): 고유성 보장을 위한 타임스탬프
|
||||
|
||||
---
|
||||
|
||||
## 데이터베이스 스키마
|
||||
|
||||
### 1. `batch_history` 테이블
|
||||
|
||||
전체 배치 작업(Parent Job) 실행 이력을 기록합니다.
|
||||
|
||||
```sql
|
||||
CREATE TABLE public.batch_history (
|
||||
uuid UUID PRIMARY KEY, -- 배치 실행 고유 ID
|
||||
job VARCHAR(255) NOT NULL, -- 배치 작업 이름 (exportGeoJsonJob)
|
||||
id VARCHAR(255) NOT NULL, -- 비즈니스 ID
|
||||
created_dttm TIMESTAMP NOT NULL, -- 생성 일시
|
||||
updated_dttm TIMESTAMP NOT NULL, -- 수정 일시
|
||||
status VARCHAR(50) NOT NULL, -- 상태 (STARTED/COMPLETED/FAILED)
|
||||
completed_dttm TIMESTAMP -- 완료 일시
|
||||
);
|
||||
```
|
||||
|
||||
**인덱스:**
|
||||
- `idx_batch_history_job` (job)
|
||||
- `idx_batch_history_status` (status)
|
||||
- `idx_batch_history_created` (created_dttm DESC)
|
||||
|
||||
---
|
||||
|
||||
### 2. `batch_step_history` 테이블
|
||||
|
||||
각 AnalCntInfo의 Step별 실행 이력을 기록합니다.
|
||||
|
||||
```sql
|
||||
CREATE TABLE public.batch_step_history (
|
||||
id BIGSERIAL PRIMARY KEY, -- Step 이력 고유 ID
|
||||
anal_uid BIGINT NOT NULL, -- 분석 UID
|
||||
result_uid VARCHAR(255) NOT NULL, -- 결과 UID
|
||||
step_name VARCHAR(100) NOT NULL, -- Step 이름
|
||||
status VARCHAR(50) NOT NULL, -- 상태 (STARTED/SUCCESS/FAILED)
|
||||
error_message TEXT, -- 에러 메시지 (최대 1000자)
|
||||
started_dttm TIMESTAMP NOT NULL, -- Step 시작 일시
|
||||
completed_dttm TIMESTAMP, -- Step 완료 일시
|
||||
created_dttm TIMESTAMP NOT NULL, -- 생성 일시
|
||||
updated_dttm TIMESTAMP NOT NULL -- 수정 일시
|
||||
);
|
||||
```
|
||||
|
||||
**Step 이름:**
|
||||
- `makeGeoJsonStep`: GeoJSON 파일 생성
|
||||
- `dockerRunStep`: Docker 컨테이너 실행
|
||||
- `zipResponseStep`: 결과물 ZIP 압축
|
||||
|
||||
**인덱스:**
|
||||
- `idx_batch_step_history_anal_uid` (anal_uid)
|
||||
- `idx_batch_step_history_result_uid` (result_uid)
|
||||
- `idx_batch_step_history_status` (status)
|
||||
- `idx_batch_step_history_step_name` (step_name)
|
||||
|
||||
---
|
||||
|
||||
## 실행 흐름
|
||||
|
||||
### 전체 프로세스
|
||||
|
||||
```
|
||||
1. Parent Job 시작
|
||||
↓
|
||||
2. 진행 중인 AnalCntInfo 리스트 조회
|
||||
↓
|
||||
3. 각 AnalCntInfo마다 반복:
|
||||
↓
|
||||
├─ 3.1. Child Job 실행 (processAnalCntInfoJob)
|
||||
│ ↓
|
||||
│ ├─ Step 1: makeGeoJsonStep
|
||||
│ │ - beforeStep: DB에 STARTED 기록
|
||||
│ │ - Tasklet 실행: GeoJSON 파일 생성
|
||||
│ │ - afterStep: DB에 SUCCESS/FAILED 기록
|
||||
│ │
|
||||
│ ├─ Step 2: dockerRunStep
|
||||
│ │ - beforeStep: DB에 STARTED 기록
|
||||
│ │ - Tasklet 실행: Docker 컨테이너 실행
|
||||
│ │ - afterStep: DB에 SUCCESS/FAILED 기록
|
||||
│ │
|
||||
│ └─ Step 3: zipResponseStep
|
||||
│ - beforeStep: DB에 STARTED 기록
|
||||
│ - Tasklet 실행: 결과물 ZIP 압축
|
||||
│ - afterStep: DB에 SUCCESS/FAILED 기록
|
||||
│
|
||||
└─ 3.2. 다음 AnalCntInfo 처리
|
||||
↓
|
||||
4. Parent Job 종료 (부분 성공 허용)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Step 1: makeGeoJsonStep
|
||||
|
||||
**목적:** 검수 완료된 라벨링 데이터를 GeoJSON 파일로 변환
|
||||
|
||||
**처리 과정:**
|
||||
1. `findCompletedAnalMapSheetList()`: 검수 완료된 도엽 목록 조회
|
||||
2. 각 도엽별로:
|
||||
- `findCompletedYesterdayLabelingList()`: 어제까지 검수 완료된 데이터 조회
|
||||
- GeoJSON Feature 생성
|
||||
- `/dataset/request/{resultUid}/{filename}.geojson` 저장
|
||||
- `updateLearnDataGeomFileCreateYn()`: DB에 파일 생성 완료 플래그 업데이트
|
||||
|
||||
**출력 파일 형식:**
|
||||
```
|
||||
{resultUid_8자}_{compareYyyy}_{targetYyyy}_{mapSheetNum}_D15.geojson
|
||||
```
|
||||
|
||||
**예시:**
|
||||
```
|
||||
ED80D700_2022_2023_3724036_D15.geojson
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Step 2: dockerRunStep
|
||||
|
||||
**목적:** Docker 컨테이너를 통해 학습 데이터 생성 파이프라인 실행
|
||||
|
||||
**Docker 명령어:**
|
||||
```bash
|
||||
docker run --rm \
|
||||
--user {dockerUser} \
|
||||
-v {datasetVolume} \
|
||||
-v {imagesVolume} \
|
||||
--entrypoint python \
|
||||
{dockerImage} \
|
||||
code/kamco_full_pipeline.py \
|
||||
--labelling-folder request/{resultUid} \
|
||||
--output-folder response/{resultUid} \
|
||||
--input_root {inputRoot} \
|
||||
--output_root {outputRoot} \
|
||||
--patch_size {patchSize} \
|
||||
--overlap_pct {overlapPct} \
|
||||
--train_val_test_ratio {train} {val} {test} \
|
||||
--keep_empty_ratio {keepEmptyRatio}
|
||||
```
|
||||
|
||||
**에러 처리:**
|
||||
- Docker 프로세스의 `exitCode != 0` 시 `RuntimeException` 발생
|
||||
- Step 실패로 처리되어 DB에 `FAILED` 상태 기록
|
||||
- 에러 메시지와 exit code가 `error_message` 컬럼에 저장됨
|
||||
|
||||
---
|
||||
|
||||
### Step 3: zipResponseStep
|
||||
|
||||
**목적:** 생성된 학습 데이터를 ZIP 파일로 압축
|
||||
|
||||
**처리 과정:**
|
||||
1. `/dataset/response/{resultUid}/` 디렉토리 검증
|
||||
2. 디렉토리 내 모든 파일과 서브디렉토리를 재귀적으로 압축
|
||||
3. `/dataset/response/{resultUid}.zip` 파일 생성
|
||||
|
||||
**압축 설정:**
|
||||
- Hidden 파일 제외
|
||||
- 디렉토리 구조 유지
|
||||
- 버퍼 크기: 1024 bytes
|
||||
|
||||
---
|
||||
|
||||
## 설정 방법
|
||||
|
||||
### application.yml 설정
|
||||
|
||||
```yaml
|
||||
# 학습 데이터 디렉토리 경로
|
||||
training-data:
|
||||
geojson-dir: /kamco-nfs/dataset
|
||||
|
||||
# Docker 설정
|
||||
docker:
|
||||
user: "1000:1000"
|
||||
image: "kamco/dataset-generator:latest"
|
||||
dataset-volume: "/kamco-nfs/dataset:/dataset"
|
||||
images-volume: "/kamco-nfs/images:/images"
|
||||
input-root: "/dataset"
|
||||
output-root: "/dataset"
|
||||
patch-size: 512
|
||||
overlap-pct: 0.2
|
||||
train-val-test-ratio:
|
||||
- "0.7"
|
||||
- "0.2"
|
||||
- "0.1"
|
||||
keep-empty-ratio: 0.5
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 환경 변수
|
||||
|
||||
| 환경 변수 | 설명 | 기본값 |
|
||||
|----------|------|--------|
|
||||
| `TRAINING_DATA_GEOJSON_DIR` | GeoJSON 파일 저장 경로 | `/kamco-nfs/dataset` |
|
||||
| `DOCKER_USER` | Docker 컨테이너 실행 유저 | `1000:1000` |
|
||||
| `DOCKER_IMAGE` | Docker 이미지 이름 | `kamco/dataset-generator:latest` |
|
||||
|
||||
---
|
||||
|
||||
## 모니터링 및 로그
|
||||
|
||||
### 로그 레벨
|
||||
|
||||
```yaml
|
||||
logging:
|
||||
level:
|
||||
com.kamco.cd.geojsonscheduler: INFO
|
||||
com.kamco.cd.geojsonscheduler.batch: DEBUG
|
||||
org.springframework.batch: INFO
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 주요 로그 포인트
|
||||
|
||||
#### Parent Job 로그
|
||||
```log
|
||||
[INFO] Parent Job 시작: AnalCntInfo 리스트 조회 및 Child Job 실행
|
||||
[INFO] 진행중인 회차 목록 조회 중...
|
||||
[INFO] 진행중인 회차 수: 3
|
||||
[INFO] 회차 검토: AnalUid=100, ResultUid=ED80D700...
|
||||
[INFO] Child Job 실행 중... (AnalUid=100, ResultUid=ED80D700...)
|
||||
[INFO] Child Job 실행 완료 (AnalUid=100, ResultUid=ED80D700...)
|
||||
[INFO] Parent Job 완료 - 성공: 2, 건너뜀: 1, 실패: 0
|
||||
```
|
||||
|
||||
#### Step 로그
|
||||
```log
|
||||
[INFO] ========================================
|
||||
[INFO] GeoJSON 생성 시작 (AnalUid=100, ResultUid=ED80D700...)
|
||||
[INFO] 검수 완료된 도엽 수: 5
|
||||
[INFO] 도엽 처리 중: MapSheetNum=3724036
|
||||
[INFO] 완료된 라벨링 데이터 수: 150
|
||||
[INFO] GeoJSON 파일 저장 완료: /dataset/request/ED80D700.../ED80D700_2022_2023_3724036_D15.geojson
|
||||
[INFO] GeoJSON 생성 완료 (ResultUid=ED80D700...) - 처리된 도엽 수: 5, 생성된 파일 수: 5
|
||||
[INFO] ========================================
|
||||
```
|
||||
|
||||
#### Docker 실행 로그
|
||||
```log
|
||||
[INFO] ========================================
|
||||
[INFO] Docker 컨테이너 실행 시작 (ResultUid=ED80D700...)
|
||||
[INFO] Running docker command: docker run --rm --user 1000:1000...
|
||||
[INFO] [docker] Loading configuration...
|
||||
[INFO] [docker] Processing pipeline...
|
||||
[INFO] [docker] Pipeline completed successfully
|
||||
[INFO] Docker process completed successfully for resultUid: ED80D700...
|
||||
[INFO] ========================================
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### DB 조회 쿼리
|
||||
|
||||
#### 1. 특정 회차의 모든 Step 실행 이력
|
||||
```sql
|
||||
SELECT
|
||||
step_name,
|
||||
status,
|
||||
started_dttm,
|
||||
completed_dttm,
|
||||
EXTRACT(EPOCH FROM (completed_dttm - started_dttm)) AS duration_seconds,
|
||||
error_message
|
||||
FROM batch_step_history
|
||||
WHERE anal_uid = 100
|
||||
AND result_uid = 'ED80D700A0F5482BB0EC11A366DEA8DE'
|
||||
ORDER BY started_dttm;
|
||||
```
|
||||
|
||||
#### 2. 최근 실패한 Step 조회
|
||||
```sql
|
||||
SELECT
|
||||
anal_uid,
|
||||
result_uid,
|
||||
step_name,
|
||||
error_message,
|
||||
started_dttm,
|
||||
completed_dttm
|
||||
FROM batch_step_history
|
||||
WHERE status = 'FAILED'
|
||||
ORDER BY started_dttm DESC
|
||||
LIMIT 10;
|
||||
```
|
||||
|
||||
#### 3. Step별 성공률 통계
|
||||
```sql
|
||||
SELECT
|
||||
step_name,
|
||||
COUNT(*) AS total_executions,
|
||||
SUM(CASE WHEN status = 'SUCCESS' THEN 1 ELSE 0 END) AS success_count,
|
||||
SUM(CASE WHEN status = 'FAILED' THEN 1 ELSE 0 END) AS failed_count,
|
||||
ROUND(
|
||||
SUM(CASE WHEN status = 'SUCCESS' THEN 1 ELSE 0 END)::NUMERIC / COUNT(*) * 100,
|
||||
2
|
||||
) AS success_rate_pct
|
||||
FROM batch_step_history
|
||||
GROUP BY step_name;
|
||||
```
|
||||
|
||||
#### 4. 평균 실행 시간 (Step별)
|
||||
```sql
|
||||
SELECT
|
||||
step_name,
|
||||
COUNT(*) AS total_executions,
|
||||
AVG(EXTRACT(EPOCH FROM (completed_dttm - started_dttm))) AS avg_duration_seconds,
|
||||
MIN(EXTRACT(EPOCH FROM (completed_dttm - started_dttm))) AS min_duration_seconds,
|
||||
MAX(EXTRACT(EPOCH FROM (completed_dttm - started_dttm))) AS max_duration_seconds
|
||||
FROM batch_step_history
|
||||
WHERE status = 'SUCCESS'
|
||||
AND completed_dttm IS NOT NULL
|
||||
GROUP BY step_name;
|
||||
```
|
||||
|
||||
#### 5. 특정 기간 동안 처리된 회차 수
|
||||
```sql
|
||||
SELECT
|
||||
DATE(started_dttm) AS execution_date,
|
||||
COUNT(DISTINCT result_uid) AS processed_count
|
||||
FROM batch_step_history
|
||||
WHERE step_name = 'makeGeoJsonStep'
|
||||
AND started_dttm >= CURRENT_DATE - INTERVAL '7 days'
|
||||
GROUP BY DATE(started_dttm)
|
||||
ORDER BY execution_date DESC;
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 트러블슈팅
|
||||
|
||||
### 1. Docker 컨테이너 실행 실패
|
||||
|
||||
**증상:**
|
||||
```log
|
||||
[ERROR] Docker process exited with code 1 for resultUid: ED80D700...
|
||||
FileNotFoundError: Missing training pairs root at /dataset/response/.../tifs/train
|
||||
```
|
||||
|
||||
**원인:**
|
||||
- GeoJSON 파일이 생성되지 않았거나 잘못된 형식
|
||||
- Docker 볼륨 마운트 경로 불일치
|
||||
- 파이프라인 실행 중 필수 파일 누락
|
||||
|
||||
**해결 방법:**
|
||||
1. `batch_step_history` 테이블에서 `makeGeoJsonStep` 상태 확인
|
||||
```sql
|
||||
SELECT * FROM batch_step_history
|
||||
WHERE result_uid = 'ED80D700...'
|
||||
AND step_name = 'makeGeoJsonStep';
|
||||
```
|
||||
|
||||
2. GeoJSON 파일 존재 여부 확인
|
||||
```bash
|
||||
ls -la /kamco-nfs/dataset/request/ED80D700.../
|
||||
```
|
||||
|
||||
3. Docker 볼륨 설정 확인
|
||||
```yaml
|
||||
docker:
|
||||
dataset-volume: "/kamco-nfs/dataset:/dataset" # 호스트:컨테이너
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 2. GeoJSON 파일이 생성되지 않음
|
||||
|
||||
**증상:**
|
||||
```log
|
||||
[WARN] 검수 완료된 도엽이 없음. 작업 건너뜀.
|
||||
```
|
||||
|
||||
**원인:**
|
||||
- 검수 완료(`COMPLETE`) 상태의 데이터가 없음
|
||||
- `inspect_stat_dttm`이 오늘 이후 (어제까지만 조회)
|
||||
|
||||
**해결 방법:**
|
||||
1. 검수 완료 데이터 확인
|
||||
```sql
|
||||
SELECT COUNT(*)
|
||||
FROM tb_labeling_assignment
|
||||
WHERE anal_uid = 100
|
||||
AND inspect_state = 'COMPLETE';
|
||||
```
|
||||
|
||||
2. 검수 완료 시간 확인
|
||||
```sql
|
||||
SELECT MAX(inspect_stat_dttm)
|
||||
FROM tb_labeling_assignment
|
||||
WHERE anal_uid = 100
|
||||
AND inspect_state = 'COMPLETE';
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 3. ZIP 파일 생성 실패
|
||||
|
||||
**증상:**
|
||||
```log
|
||||
[ERROR] Response 디렉토리가 존재하지 않음: /dataset/response/ED80D700...
|
||||
```
|
||||
|
||||
**원인:**
|
||||
- Docker Step에서 결과물이 생성되지 않음
|
||||
- Docker 컨테이너가 중간에 실패했지만 감지되지 않음
|
||||
|
||||
**해결 방법:**
|
||||
1. Docker Step 상태 확인
|
||||
```sql
|
||||
SELECT * FROM batch_step_history
|
||||
WHERE result_uid = 'ED80D700...'
|
||||
AND step_name = 'dockerRunStep';
|
||||
```
|
||||
|
||||
2. Response 디렉토리 확인
|
||||
```bash
|
||||
ls -la /kamco-nfs/dataset/response/ED80D700.../
|
||||
```
|
||||
|
||||
3. Docker 컨테이너 로그 확인 (DB의 `error_message` 컬럼)
|
||||
|
||||
---
|
||||
|
||||
### 4. Child Job이 실행되지 않음
|
||||
|
||||
**증상:**
|
||||
```log
|
||||
[INFO] 모든 파일이 이미 처리됨. 건너뜀.
|
||||
```
|
||||
|
||||
**원인:**
|
||||
- `all_cnt == file_cnt` (이미 모든 파일이 생성됨)
|
||||
- 재실행이 필요한 경우 플래그를 초기화하지 않음
|
||||
|
||||
**해결 방법:**
|
||||
1. 파일 생성 플래그 초기화
|
||||
```sql
|
||||
UPDATE tb_map_sheet_learn_data_geom
|
||||
SET file_create_yn = false,
|
||||
updated_dttm = NOW()
|
||||
WHERE geo_uid IN (
|
||||
SELECT inference_geom_uid
|
||||
FROM tb_labeling_assignment
|
||||
WHERE anal_uid = 100
|
||||
);
|
||||
```
|
||||
|
||||
2. 배치 재실행
|
||||
|
||||
---
|
||||
|
||||
### 5. 배치 작업 전체가 실패함
|
||||
|
||||
**증상:**
|
||||
```log
|
||||
[ERROR] Child Job 실행 실패 (AnalUid=100, ResultUid=ED80D700...): ...
|
||||
[WARN] 3 개의 Child Job 실행이 실패했습니다.
|
||||
```
|
||||
|
||||
**원인:**
|
||||
- 여러 회차에서 동시에 실패 발생
|
||||
- 현재는 부분 실패를 허용하도록 설정됨
|
||||
|
||||
**해결 방법:**
|
||||
1. 실패한 회차 확인
|
||||
```sql
|
||||
SELECT DISTINCT anal_uid, result_uid
|
||||
FROM batch_step_history
|
||||
WHERE status = 'FAILED'
|
||||
AND started_dttm >= CURRENT_DATE;
|
||||
```
|
||||
|
||||
2. 각 회차별로 실패 원인 분석
|
||||
```sql
|
||||
SELECT step_name, error_message
|
||||
FROM batch_step_history
|
||||
WHERE anal_uid = 100
|
||||
AND status = 'FAILED';
|
||||
```
|
||||
|
||||
3. 실패 정책 변경 (필요 시)
|
||||
- `LaunchChildJobsTasklet.java:87-89` 주석 해제하여 하나라도 실패 시 Parent Job 실패 처리
|
||||
|
||||
---
|
||||
|
||||
## 개발자 가이드
|
||||
|
||||
### 새로운 Step 추가하기
|
||||
|
||||
1. **Tasklet 생성**
|
||||
```java
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class NewStepTasklet implements Tasklet {
|
||||
@Value("#{jobParameters['analUid']}")
|
||||
private Long analUid;
|
||||
|
||||
@Value("#{jobParameters['resultUid']}")
|
||||
private String resultUid;
|
||||
|
||||
@Override
|
||||
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
|
||||
// 로직 구현
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
2. **JobConfig에 Step 등록**
|
||||
```java
|
||||
@Bean
|
||||
public Step newStep() {
|
||||
return new StepBuilder("newStep", jobRepository)
|
||||
.tasklet(newStepTasklet, transactionManager)
|
||||
.listener(stepHistoryListener) // 이력 기록
|
||||
.build();
|
||||
}
|
||||
```
|
||||
|
||||
3. **Job Flow에 추가**
|
||||
```java
|
||||
@Bean
|
||||
public Job processAnalCntInfoJob() {
|
||||
return new JobBuilder("processAnalCntInfoJob", jobRepository)
|
||||
.start(makeGeoJsonStep())
|
||||
.next(dockerRunStep())
|
||||
.next(zipResponseStep())
|
||||
.next(newStep()) // 새로운 Step 추가
|
||||
.build();
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 로그 커스터마이징
|
||||
|
||||
**로그 레벨 변경:**
|
||||
```yaml
|
||||
logging:
|
||||
level:
|
||||
com.kamco.cd.geojsonscheduler.batch.DockerRunTasklet: DEBUG
|
||||
```
|
||||
|
||||
**특정 Step만 로그 출력:**
|
||||
```java
|
||||
@Slf4j
|
||||
public class CustomTasklet implements Tasklet {
|
||||
@Override
|
||||
public RepeatStatus execute(...) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("상세 디버그 정보: {}", details);
|
||||
}
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 배포 및 운영
|
||||
|
||||
### 배포 절차
|
||||
|
||||
1. **빌드**
|
||||
```bash
|
||||
./gradlew clean build
|
||||
```
|
||||
|
||||
2. **Docker 이미지 빌드**
|
||||
```bash
|
||||
docker build -t kamco-batch:latest .
|
||||
```
|
||||
|
||||
3. **실행**
|
||||
```bash
|
||||
java -jar build/libs/kamco-geojson-scheduler-1.0.0.jar
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 스케줄링 설정
|
||||
|
||||
Spring Scheduler를 사용한 정기 실행:
|
||||
|
||||
```java
|
||||
@Scheduled(cron = "0 0 2 * * *") // 매일 새벽 2시
|
||||
public void runBatch() {
|
||||
JobParameters jobParameters = new JobParametersBuilder()
|
||||
.addLong("timestamp", System.currentTimeMillis())
|
||||
.toJobParameters();
|
||||
|
||||
jobLauncher.run(exportGeoJsonJob, jobParameters);
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 라이선스
|
||||
|
||||
Copyright (c) 2024 KAMCO. All rights reserved.
|
||||
|
||||
---
|
||||
|
||||
## 문의
|
||||
|
||||
기술 지원: tech-support@kamco.co.kr
|
||||
@@ -10,26 +10,97 @@ import org.springframework.batch.repeat.RepeatStatus;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* Docker 컨테이너 실행 Tasklet
|
||||
*
|
||||
* <p>학습 데이터 생성 파이프라인이 담긴 Docker 컨테이너를 실행합니다. 이전 Step(makeGeoJsonStep)에서 생성된 GeoJSON
|
||||
* 파일들을 입력으로 받아 학습 데이터셋을 생성합니다.
|
||||
*
|
||||
* <p><b>주요 기능:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>Docker 컨테이너 실행 (kamco_full_pipeline.py)
|
||||
* <li>입력: /dataset/request/{resultUid}/*.geojson
|
||||
* <li>출력: /dataset/response/{resultUid}/*
|
||||
* <li>Docker 실행 실패 시 RuntimeException 발생 (Step 실패 처리)
|
||||
* </ul>
|
||||
*
|
||||
* <p><b>실행 조건:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>makeGeoJsonStep이 성공적으로 완료되어야 함
|
||||
* <li>request/{resultUid}/ 디렉토리에 GeoJSON 파일이 존재해야 함
|
||||
* </ul>
|
||||
*
|
||||
* <p><b>Docker Exit Code 처리:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>Exit Code = 0: 정상 종료 (Step 성공)
|
||||
* <li>Exit Code != 0: 비정상 종료 (RuntimeException 발생 → Step 실패)
|
||||
* </ul>
|
||||
*
|
||||
* @author KAMCO Development Team
|
||||
* @since 1.0.0
|
||||
* @see DockerRunnerService
|
||||
*/
|
||||
@Log4j2
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class DockerRunTasklet implements Tasklet {
|
||||
|
||||
/** Docker 컨테이너 실행을 담당하는 서비스 */
|
||||
private final DockerRunnerService dockerRunnerService;
|
||||
|
||||
/** Job Parameter로 전달받은 결과물 고유 ID (UUID) */
|
||||
@Value("#{jobParameters['resultUid']}")
|
||||
private String resultUid;
|
||||
|
||||
/**
|
||||
* Docker 컨테이너 실행 작업 수행
|
||||
*
|
||||
* <p>DockerRunnerService를 통해 학습 데이터 생성 파이프라인을 실행합니다. Docker 프로세스가 비정상 종료될 경우
|
||||
* RuntimeException이 발생하여 Step이 실패 처리됩니다.
|
||||
*
|
||||
* @param contribution Step 실행 정보를 담는 객체
|
||||
* @param chunkContext Chunk 실행 컨텍스트
|
||||
* @return RepeatStatus.FINISHED - 작업 완료
|
||||
* @throws RuntimeException Docker 프로세스가 비정상 종료(exitCode != 0)된 경우
|
||||
*/
|
||||
@Override
|
||||
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
|
||||
log.info("========================================");
|
||||
log.info("Docker 컨테이너 실행 시작 (ResultUid={})", resultUid);
|
||||
log.info("========================================");
|
||||
|
||||
// 실행 전 사전 정보 로깅
|
||||
log.info("[사전 정보]");
|
||||
log.info(" - 입력 디렉토리: /dataset/request/{}/", resultUid);
|
||||
log.info(" - 출력 디렉토리: /dataset/response/{}/", resultUid);
|
||||
log.info(" - 파이프라인: kamco_full_pipeline.py");
|
||||
|
||||
try {
|
||||
// DockerRunnerService를 통해 Docker 컨테이너 실행
|
||||
// exitCode != 0 시 RuntimeException 발생
|
||||
log.info("[Docker 실행] 컨테이너 시작...");
|
||||
dockerRunnerService.run(resultUid);
|
||||
log.info("[Docker 실행] ✓ 컨테이너 정상 종료 (exitCode=0)");
|
||||
|
||||
} catch (RuntimeException e) {
|
||||
// Docker 실행 실패 시 (DockerRunnerService에서 던진 예외)
|
||||
log.error("[Docker 실행] ✗ 컨테이너 실행 실패!", e);
|
||||
log.error(" - ResultUid: {}", resultUid);
|
||||
log.error(" - 에러 메시지: {}", e.getMessage());
|
||||
log.error(" - 확인사항:");
|
||||
log.error(" 1. request/{}/ 디렉토리에 GeoJSON 파일이 있는지 확인", resultUid);
|
||||
log.error(" 2. Docker 이미지가 정상적으로 존재하는지 확인");
|
||||
log.error(" 3. Docker 볼륨 마운트 경로 확인");
|
||||
log.error(" 4. kamco_full_pipeline.py 스크립트 로그 확인");
|
||||
throw e; // 예외 재발생 → Step 실패 처리
|
||||
}
|
||||
|
||||
log.info("========================================");
|
||||
log.info("Docker 컨테이너 실행 완료 (ResultUid={})", resultUid);
|
||||
log.info(" - 결과물 저장 위치: /dataset/response/{}/", resultUid);
|
||||
log.info("========================================");
|
||||
|
||||
return RepeatStatus.FINISHED;
|
||||
|
||||
@@ -17,76 +17,191 @@ import org.springframework.batch.repeat.RepeatStatus;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* Child Job 실행 Tasklet (Parent Job용)
|
||||
*
|
||||
* <p>진행 중인 모든 분석 회차(AnalCntInfo)를 조회하여 각 회차마다 독립적인 Child Job을 실행합니다. 각 Child Job은 3개의
|
||||
* Step(makeGeoJson → dockerRun → zipResponse)을 순차적으로 실행합니다.
|
||||
*
|
||||
* <p><b>주요 기능:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>진행 중인 분석 회차 목록 조회 (tb_map_sheet_anal_inference, anal_state='ING')
|
||||
* <li>각 회차별 처리 필요 여부 판단 (all_cnt != file_cnt)
|
||||
* <li>회차마다 Child Job(processAnalCntInfoJob) 실행
|
||||
* <li>부분 실패 허용 (한 회차 실패해도 다른 회차 계속 처리)
|
||||
* </ul>
|
||||
*
|
||||
* <p><b>실행 조건:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>tb_map_sheet_anal_inference.anal_state = 'ING' (진행 중)
|
||||
* <li>검수 완료 건수(complete_cnt) > 0
|
||||
* <li>all_cnt != file_cnt (아직 파일 생성이 완료되지 않음)
|
||||
* </ul>
|
||||
*
|
||||
* <p><b>실패 정책:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>현재: 부분 실패 허용 (일부 Child Job 실패해도 Parent Job 성공)
|
||||
* <li>변경 가능: 87-89라인 주석 해제 시 하나라도 실패하면 Parent Job 실패
|
||||
* </ul>
|
||||
*
|
||||
* @author KAMCO Development Team
|
||||
* @since 1.0.0
|
||||
* @see com.kamco.cd.geojsonscheduler.batch.ProcessAnalCntInfoJobConfig
|
||||
*/
|
||||
@Log4j2
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class LaunchChildJobsTasklet implements Tasklet {
|
||||
|
||||
/** 분석 회차 정보 조회를 위한 Repository */
|
||||
private final TrainingDataReviewJobRepository repository;
|
||||
|
||||
/** Child Job을 실행하기 위한 JobLauncher */
|
||||
private final JobLauncher jobLauncher;
|
||||
|
||||
/** 실행할 Child Job (processAnalCntInfoJob) */
|
||||
@Qualifier("processAnalCntInfoJob")
|
||||
private final Job processAnalCntInfoJob;
|
||||
|
||||
/**
|
||||
* Parent Job의 메인 로직 실행
|
||||
*
|
||||
* <p>진행 중인 모든 분석 회차를 조회하여 각 회차마다 Child Job을 실행합니다. 한 회차가 실패해도 다른 회차는 계속 처리되며, 최종적으로 통계를
|
||||
* 로깅합니다.
|
||||
*
|
||||
* @param contribution Step 실행 정보를 담는 객체
|
||||
* @param chunkContext Chunk 실행 컨텍스트
|
||||
* @return RepeatStatus.FINISHED - 작업 완료
|
||||
*/
|
||||
@Override
|
||||
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
|
||||
log.info("========================================");
|
||||
log.info("Parent Job 시작: AnalCntInfo 리스트 조회 및 Child Job 실행");
|
||||
log.info("========================================");
|
||||
|
||||
// 진행중인 회차 중, complete_cnt 가 존재하는 회차 목록 가져오기
|
||||
log.info("진행중인 회차 목록 조회 중...");
|
||||
// Step 1: 진행 중인 분석 회차 목록 조회
|
||||
log.info("[Step 1/3] 진행 중인 분석 회차 목록 조회 중...");
|
||||
log.info(" - 조회 조건: anal_state='ING' AND complete_cnt > 0");
|
||||
|
||||
List<AnalCntInfo> analList = repository.findAnalCntInfoList();
|
||||
log.info("진행중인 회차 수: {}", analList.size());
|
||||
|
||||
int processedCount = 0;
|
||||
int skippedCount = 0;
|
||||
int failedCount = 0;
|
||||
log.info("[Step 1/3] 조회 완료");
|
||||
log.info(" - 진행 중인 회차 수: {} 개", analList.size());
|
||||
|
||||
for (AnalCntInfo info : analList) {
|
||||
log.info("----------------------------------------");
|
||||
log.info("회차 검토: AnalUid={}, ResultUid={}", info.getAnalUid(), info.getResultUid());
|
||||
log.info("전체 건수: {}, 파일 건수: {}", info.getAllCnt(), info.getFileCnt());
|
||||
if (analList.isEmpty()) {
|
||||
log.warn("[경고] 처리할 분석 회차가 없습니다.");
|
||||
log.warn(" - 확인사항:");
|
||||
log.warn(" 1. tb_map_sheet_anal_inference 테이블의 anal_state='ING' 데이터 확인");
|
||||
log.warn(" 2. 검수 완료(COMPLETE) 건수가 있는지 확인");
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
|
||||
// Step 2: 각 회차별 Child Job 실행
|
||||
log.info("[Step 2/3] 회차별 Child Job 실행 시작");
|
||||
|
||||
// 실행 통계를 위한 카운터
|
||||
int processedCount = 0; // 성공적으로 처리된 회차 수
|
||||
int skippedCount = 0; // 건너뛴 회차 수 (이미 처리 완료)
|
||||
int failedCount = 0; // 실패한 회차 수
|
||||
|
||||
// 각 분석 회차별로 Child Job 실행
|
||||
for (int i = 0; i < analList.size(); i++) {
|
||||
AnalCntInfo info = analList.get(i);
|
||||
|
||||
log.info("========================================");
|
||||
log.info("[회차 {}/{}] AnalCntInfo 처리 시작", i + 1, analList.size());
|
||||
log.info(" - AnalUid: {}", info.getAnalUid());
|
||||
log.info(" - ResultUid: {}", info.getResultUid());
|
||||
log.info(" - 전체 건수(all_cnt): {}", info.getAllCnt());
|
||||
log.info(" - 검수 완료 건수(complete_cnt): {}", info.getCompleteCnt());
|
||||
log.info(" - 파일 생성 완료 건수(file_cnt): {}", info.getFileCnt());
|
||||
|
||||
// 처리 필요 여부 판단: all_cnt == file_cnt면 이미 모든 파일이 생성됨
|
||||
if (Objects.equals(info.getAllCnt(), info.getFileCnt())) {
|
||||
log.info("모든 파일이 이미 처리됨. 건너뜀.");
|
||||
log.info("[건너뜀] 모든 파일이 이미 처리 완료됨 (all_cnt={}, file_cnt={})",
|
||||
info.getAllCnt(), info.getFileCnt());
|
||||
log.info(" - 재처리가 필요한 경우 file_create_yn 플래그를 초기화하세요.");
|
||||
skippedCount++;
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
// Child Job 실행
|
||||
// Child Job Parameters 생성
|
||||
JobParameters jobParameters =
|
||||
new JobParametersBuilder()
|
||||
.addLong("analUid", info.getAnalUid())
|
||||
.addString("resultUid", info.getResultUid())
|
||||
.addLong("timestamp", System.currentTimeMillis()) // 고유성 보장
|
||||
.addLong("timestamp", System.currentTimeMillis()) // JobInstance 고유성 보장
|
||||
.toJobParameters();
|
||||
|
||||
log.info("Child Job 실행 중... (AnalUid={}, ResultUid={})", info.getAnalUid(), info.getResultUid());
|
||||
log.info("[Child Job 실행] processAnalCntInfoJob 시작...");
|
||||
log.info(" - JobParameters: analUid={}, resultUid={}", info.getAnalUid(),
|
||||
info.getResultUid());
|
||||
|
||||
// Child Job 실행 (동기 방식)
|
||||
// 내부적으로 makeGeoJsonStep → dockerRunStep → zipResponseStep 순차 실행
|
||||
long startTime = System.currentTimeMillis();
|
||||
jobLauncher.run(processAnalCntInfoJob, jobParameters);
|
||||
log.info("Child Job 실행 완료 (AnalUid={}, ResultUid={})", info.getAnalUid(), info.getResultUid());
|
||||
long duration = System.currentTimeMillis() - startTime;
|
||||
|
||||
log.info("[Child Job 완료] ✓ 정상 종료");
|
||||
log.info(" - AnalUid: {}", info.getAnalUid());
|
||||
log.info(" - ResultUid: {}", info.getResultUid());
|
||||
log.info(" - 실행 시간: {} ms ({} 초)", duration, duration / 1000);
|
||||
|
||||
processedCount++;
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("Child Job 실행 실패 (AnalUid={}, ResultUid={}): {}", info.getAnalUid(), info.getResultUid(), e.getMessage(), e);
|
||||
// Child Job 실행 실패 시 (Step 실패 또는 예외 발생)
|
||||
log.error("[Child Job 실패] ✗ 실행 중 오류 발생", e);
|
||||
log.error(" - AnalUid: {}", info.getAnalUid());
|
||||
log.error(" - ResultUid: {}", info.getResultUid());
|
||||
log.error(" - 에러 메시지: {}", e.getMessage());
|
||||
log.error(" - 확인사항:");
|
||||
log.error(" 1. batch_step_history 테이블에서 실패한 Step 확인");
|
||||
log.error(" 2. error_message 컬럼에서 상세 에러 내용 확인");
|
||||
|
||||
failedCount++;
|
||||
// 한 개 실패해도 계속 진행
|
||||
|
||||
// 한 회차 실패해도 다음 회차 계속 처리 (부분 실패 허용)
|
||||
log.info("[계속 진행] 다음 회차 처리를 계속합니다.");
|
||||
}
|
||||
}
|
||||
|
||||
// Step 3: 최종 통계 및 결과 로깅
|
||||
log.info("========================================");
|
||||
log.info("Parent Job 완료");
|
||||
log.info(" 총 회차 수: {}", analList.size());
|
||||
log.info(" 성공: {}", processedCount);
|
||||
log.info(" 건너뜀: {}", skippedCount);
|
||||
log.info(" 실패: {}", failedCount);
|
||||
log.info("[Step 3/3] Parent Job 실행 결과 요약");
|
||||
log.info(" - 총 회차 수: {} 개", analList.size());
|
||||
log.info(" - 성공: {} 개", processedCount);
|
||||
log.info(" - 건너뜀: {} 개", skippedCount);
|
||||
log.info(" - 실패: {} 개", failedCount);
|
||||
|
||||
// 성공률 계산
|
||||
if (analList.size() > 0) {
|
||||
double successRate =
|
||||
(double) processedCount / (analList.size() - skippedCount) * 100;
|
||||
log.info(" - 성공률: {}% (건너뛴 회차 제외)", String.format("%.2f", successRate));
|
||||
}
|
||||
|
||||
log.info("========================================");
|
||||
|
||||
// 실패 정책 처리
|
||||
if (failedCount > 0) {
|
||||
log.warn("{} 개의 Child Job 실행이 실패했습니다.", failedCount);
|
||||
// 실패가 있어도 Parent Job은 성공으로 처리 (부분 성공)
|
||||
log.warn("[경고] {} 개의 Child Job 실행이 실패했습니다.", failedCount);
|
||||
log.warn(" - 실패 상세 내용은 batch_step_history 테이블을 확인하세요.");
|
||||
log.warn(" - SQL: SELECT * FROM batch_step_history WHERE status='FAILED' ORDER BY"
|
||||
+ " started_dttm DESC;");
|
||||
|
||||
// 실패가 있어도 Parent Job은 성공으로 처리 (부분 성공 정책)
|
||||
// 만약 하나라도 실패하면 Parent Job도 실패로 처리하려면 아래 주석 해제
|
||||
// throw new RuntimeException(failedCount + " 개의 Child Job 실행이 실패했습니다.");
|
||||
// throw new RuntimeException(
|
||||
// String.format("%d 개의 Child Job 실행이 실패했습니다. (성공: %d, 실패: %d)",
|
||||
// failedCount, processedCount, failedCount));
|
||||
} else {
|
||||
log.info("[완료] 모든 Child Job이 정상적으로 완료되었습니다.");
|
||||
}
|
||||
|
||||
return RepeatStatus.FINISHED;
|
||||
|
||||
@@ -22,98 +22,184 @@ import org.springframework.batch.repeat.RepeatStatus;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* GeoJSON 파일 생성 Tasklet
|
||||
*
|
||||
* <p>검수 완료된 라벨링 데이터를 GeoJSON 형식으로 변환하여 파일로 저장합니다. 각 도엽(Map Sheet)별로 별도의 GeoJSON 파일을
|
||||
* 생성하며, 파일 생성 완료 후 DB에 플래그를 업데이트합니다.
|
||||
*
|
||||
* <p><b>주요 기능:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>검수 완료된 도엽 목록 조회
|
||||
* <li>도엽별 라벨링 데이터 조회 및 GeoJSON Feature 변환
|
||||
* <li>GeoJSON 파일 생성 (/dataset/request/{resultUid}/*.geojson)
|
||||
* <li>DB에 파일 생성 완료 플래그 업데이트
|
||||
* </ul>
|
||||
*
|
||||
* <p><b>파일 명명 규칙:</b> {resultUid_8자}_{compareYyyy}_{targetYyyy}_{mapSheetNum}_D15.geojson
|
||||
*
|
||||
* <p><b>예시:</b> ED80D700_2022_2023_3724036_D15.geojson
|
||||
*
|
||||
* @author KAMCO Development Team
|
||||
* @since 1.0.0
|
||||
*/
|
||||
@Log4j2
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class MakeGeoJsonTasklet implements Tasklet {
|
||||
|
||||
/** 라벨링 데이터 조회를 위한 Repository */
|
||||
private final TrainingDataReviewJobRepository repository;
|
||||
|
||||
/** GeoJSON 파일이 저장될 베이스 디렉토리 경로 (예: /kamco-nfs/dataset) */
|
||||
@Value("${training-data.geojson-dir}")
|
||||
private String trainingDataDir;
|
||||
|
||||
/** Job Parameter로 전달받은 분석 회차 UID */
|
||||
@Value("#{jobParameters['analUid']}")
|
||||
private Long analUid;
|
||||
|
||||
/** Job Parameter로 전달받은 결과물 고유 ID (UUID) */
|
||||
@Value("#{jobParameters['resultUid']}")
|
||||
private String resultUid;
|
||||
|
||||
/**
|
||||
* GeoJSON 파일 생성 작업 실행
|
||||
*
|
||||
* <p>검수 완료된 라벨링 데이터를 조회하여 도엽별로 GeoJSON 파일을 생성합니다.
|
||||
*
|
||||
* @param contribution Step 실행 정보를 담는 객체
|
||||
* @param chunkContext Chunk 실행 컨텍스트
|
||||
* @return RepeatStatus.FINISHED - 작업 완료
|
||||
* @throws RuntimeException 생성된 GeoJSON 파일이 없는 경우 또는 파일 생성 실패 시
|
||||
*/
|
||||
@Override
|
||||
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
|
||||
log.info("========================================");
|
||||
log.info("GeoJSON 생성 시작 (AnalUid={}, ResultUid={})", analUid, resultUid);
|
||||
log.info("========================================");
|
||||
|
||||
// 검수 완료된 도엽 목록 조회
|
||||
log.info("검수 완료된 도엽 목록 조회 중... (AnalUid={})", analUid);
|
||||
// Step 1: 검수 완료된 도엽 목록 조회
|
||||
log.info("[Step 1/4] 검수 완료된 도엽 목록 조회 중... (AnalUid={})", analUid);
|
||||
List<AnalMapSheetList> analMapList = repository.findCompletedAnalMapSheetList(analUid);
|
||||
log.info("검수 완료된 도엽 수: {}", analMapList.size());
|
||||
log.info("[Step 1/4] 검수 완료된 도엽 수: {}", analMapList.size());
|
||||
|
||||
// 검수 완료된 도엽이 없으면 작업 종료
|
||||
if (analMapList.isEmpty()) {
|
||||
log.warn("검수 완료된 도엽이 없음. 작업 건너뜀.");
|
||||
log.warn("[경고] 검수 완료된 도엽이 없음. 작업을 건너뜁니다.");
|
||||
log.warn(" - 확인사항: tb_labeling_assignment 테이블의 inspect_state='COMPLETE' 데이터가 있는지 확인");
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
|
||||
int processedMapSheetCount = 0;
|
||||
int totalGeoJsonFiles = 0;
|
||||
// GeoJSON 파일 생성 통계를 위한 카운터
|
||||
int processedMapSheetCount = 0; // 처리된 도엽 수
|
||||
int totalGeoJsonFiles = 0; // 생성된 GeoJSON 파일 수
|
||||
|
||||
// Step 2: 각 도엽별로 GeoJSON 파일 생성
|
||||
log.info("[Step 2/4] 도엽별 GeoJSON 파일 생성 시작 (총 {} 개 도엽)", analMapList.size());
|
||||
|
||||
for (AnalMapSheetList mapSheet : analMapList) {
|
||||
log.info(" 도엽 처리 중: MapSheetNum={}", mapSheet.getMapSheetNum());
|
||||
log.info("----------------------------------------");
|
||||
log.info(" [도엽 처리] MapSheetNum={}", mapSheet.getMapSheetNum());
|
||||
|
||||
// 도엽별 geom 데이터 가져오기
|
||||
// Step 2-1: 도엽별 검수 완료된 라벨링 데이터 조회
|
||||
log.info(" [2-1] 라벨링 데이터 조회 중...");
|
||||
List<CompleteLabelData> completeList =
|
||||
repository.findCompletedYesterdayLabelingList(analUid, mapSheet.getMapSheetNum());
|
||||
log.info(" 완료된 라벨링 데이터 수: {}", completeList.size());
|
||||
log.info(" [2-1] 조회 완료: {} 건", completeList.size());
|
||||
|
||||
if (!completeList.isEmpty()) {
|
||||
// 라벨링 데이터가 없으면 다음 도엽으로
|
||||
if (completeList.isEmpty()) {
|
||||
log.info(" [건너뜀] 라벨링 데이터가 없어 GeoJSON 파일 생성을 건너뜁니다.");
|
||||
continue;
|
||||
}
|
||||
|
||||
// Step 2-2: GeoUID 목록 추출 (DB 업데이트용)
|
||||
log.info(" [2-2] GeoUID 목록 추출 중...");
|
||||
List<Long> geoUids = completeList.stream().map(CompleteLabelData::getGeoUid).toList();
|
||||
log.info(" GeoUID 목록 생성 완료: {} 건", geoUids.size());
|
||||
log.info(" [2-2] GeoUID 목록 생성 완료: {} 건", geoUids.size());
|
||||
|
||||
// Step 2-3: GeoJSON Feature 객체로 변환
|
||||
log.info(" [2-3] GeoJSON Feature 변환 중...");
|
||||
List<GeoJsonFeature> features = completeList.stream().map(GeoJsonFeature::from).toList();
|
||||
log.info(" GeoJSON Feature 변환 완료: {} 개", features.size());
|
||||
log.info(" [2-3] GeoJSON Feature 변환 완료: {} 개", features.size());
|
||||
|
||||
// Step 2-4: FeatureCollection 생성 및 파일명 결정
|
||||
log.info(" [2-4] FeatureCollection 생성 중...");
|
||||
FeatureCollection collection = new FeatureCollection(features);
|
||||
String filename = mapSheet.buildFilename(resultUid);
|
||||
log.info(" GeoJSON 파일명: {}", filename);
|
||||
log.info(" [2-4] GeoJSON 파일명: {}", filename);
|
||||
|
||||
// 형식 /kamco-nfs/dataset/request/uuid/filename
|
||||
// Step 2-5: 파일 저장 경로 생성
|
||||
// 형식: /kamco-nfs/dataset/request/{resultUid}/{filename}.geojson
|
||||
Path outputPath =
|
||||
Paths.get(
|
||||
trainingDataDir + File.separator + "request" + File.separator + resultUid,
|
||||
filename);
|
||||
log.info(" 출력 경로: {}", outputPath);
|
||||
trainingDataDir + File.separator + "request" + File.separator + resultUid, filename);
|
||||
log.info(" [2-5] 출력 경로: {}", outputPath);
|
||||
|
||||
try {
|
||||
// Step 2-6: 디렉토리 생성 (존재하지 않으면)
|
||||
log.info(" [2-6] 디렉토리 생성 중...");
|
||||
Files.createDirectories(outputPath.getParent());
|
||||
log.info(" 디렉토리 생성 완료: {}", outputPath.getParent());
|
||||
log.info(" [2-6] 디렉토리 생성 완료: {}", outputPath.getParent());
|
||||
|
||||
// Step 2-7: GeoJSON 파일 저장 (Pretty Print 포맷)
|
||||
log.info(" [2-7] GeoJSON 파일 저장 중...");
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
|
||||
objectMapper.enable(SerializationFeature.INDENT_OUTPUT); // JSON 들여쓰기 적용
|
||||
objectMapper.writeValue(outputPath.toFile(), collection);
|
||||
log.info(" GeoJSON 파일 저장 완료: {}", outputPath);
|
||||
log.info(" [2-7] ✓ GeoJSON 파일 저장 완료: {}", outputPath);
|
||||
|
||||
// Step 2-8: DB에 파일 생성 완료 플래그 업데이트
|
||||
log.info(" [2-8] DB 파일 생성 플래그 업데이트 중...");
|
||||
repository.updateLearnDataGeomFileCreateYn(geoUids);
|
||||
log.info(" DB 업데이트 완료: {} 건", geoUids.size());
|
||||
log.info(" [2-8] ✓ DB 업데이트 완료: {} 건", geoUids.size());
|
||||
|
||||
// 통계 카운터 증가
|
||||
processedMapSheetCount++;
|
||||
totalGeoJsonFiles++;
|
||||
|
||||
log.info(" [완료] 도엽 '{}' 처리 성공", mapSheet.getMapSheetNum());
|
||||
|
||||
} catch (IOException e) {
|
||||
log.error(" GeoJSON 파일 생성 실패: {}", e.getMessage(), e);
|
||||
// 파일 생성 실패 시 예외 발생 (Step 실패 처리)
|
||||
log.error(" [실패] GeoJSON 파일 생성 실패", e);
|
||||
log.error(" - 파일명: {}", filename);
|
||||
log.error(" - 경로: {}", outputPath);
|
||||
log.error(" - 에러 메시지: {}", e.getMessage());
|
||||
throw new RuntimeException("GeoJSON 파일 생성 실패: " + filename, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Step 3: 처리 결과 요약 및 검증
|
||||
log.info("========================================");
|
||||
log.info("GeoJSON 생성 완료 (ResultUid={})", resultUid);
|
||||
log.info(" 처리된 도엽 수: {}", processedMapSheetCount);
|
||||
log.info(" 생성된 GeoJSON 파일 수: {}", totalGeoJsonFiles);
|
||||
log.info("[Step 3/4] GeoJSON 생성 작업 완료");
|
||||
log.info(" - ResultUid: {}", resultUid);
|
||||
log.info(" - AnalUid: {}", analUid);
|
||||
log.info(" - 처리 대상 도엽 수: {}", analMapList.size());
|
||||
log.info(" - 처리 완료 도엽 수: {}", processedMapSheetCount);
|
||||
log.info(" - 생성된 GeoJSON 파일 수: {}", totalGeoJsonFiles);
|
||||
log.info("========================================");
|
||||
|
||||
// Step 4: 필수 검증 - 최소 1개 이상의 파일이 생성되어야 함
|
||||
log.info("[Step 4/4] 필수 검증 수행 중...");
|
||||
if (totalGeoJsonFiles == 0) {
|
||||
throw new RuntimeException("생성된 GeoJSON 파일이 없습니다. (AnalUid=" + analUid + ")");
|
||||
log.error("[실패] 생성된 GeoJSON 파일이 없습니다!");
|
||||
log.error(" - AnalUid: {}", analUid);
|
||||
log.error(" - ResultUid: {}", resultUid);
|
||||
log.error(" - 조회된 도엽 수: {}", analMapList.size());
|
||||
log.error(" - 확인사항:");
|
||||
log.error(" 1. 각 도엽에 라벨링 데이터가 있는지 확인");
|
||||
log.error(" 2. findCompletedYesterdayLabelingList() 쿼리 결과 확인");
|
||||
throw new RuntimeException(
|
||||
String.format(
|
||||
"생성된 GeoJSON 파일이 없습니다. (AnalUid=%d, ResultUid=%s)", analUid, resultUid));
|
||||
}
|
||||
|
||||
log.info("[Step 4/4] ✓ 검증 완료: {} 개의 GeoJSON 파일이 정상적으로 생성되었습니다.", totalGeoJsonFiles);
|
||||
log.info("GeoJSON 파일 저장 위치: {}/request/{}/", trainingDataDir, resultUid);
|
||||
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,66 +18,190 @@ import org.springframework.batch.repeat.RepeatStatus;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 결과물 ZIP 압축 Tasklet
|
||||
*
|
||||
* <p>Docker 컨테이너 실행으로 생성된 학습 데이터 결과물을 ZIP 파일로 압축합니다. 압축된 파일은 다운로드 또는 배포를 위해
|
||||
* 사용됩니다.
|
||||
*
|
||||
* <p><b>주요 기능:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>response/{resultUid}/ 디렉토리 검증
|
||||
* <li>디렉토리 내 모든 파일과 서브디렉토리 재귀적 압축
|
||||
* <li>압축 파일 생성: response/{resultUid}.zip
|
||||
* </ul>
|
||||
*
|
||||
* <p><b>압축 설정:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>Hidden 파일 제외
|
||||
* <li>디렉토리 구조 유지
|
||||
* <li>버퍼 크기: 1024 bytes
|
||||
* </ul>
|
||||
*
|
||||
* <p><b>실행 조건:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>dockerRunStep이 성공적으로 완료되어야 함
|
||||
* <li>response/{resultUid}/ 디렉토리가 존재해야 함
|
||||
* <li>디렉토리 내에 최소 1개 이상의 파일이 존재해야 함
|
||||
* </ul>
|
||||
*
|
||||
* @author KAMCO Development Team
|
||||
* @since 1.0.0
|
||||
*/
|
||||
@Log4j2
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class ZipResponseTasklet implements Tasklet {
|
||||
|
||||
/** 학습 데이터 저장 베이스 디렉토리 경로 (예: /kamco-nfs/dataset) */
|
||||
@Value("${training-data.geojson-dir}")
|
||||
private String trainingDataDir;
|
||||
|
||||
/** Job Parameter로 전달받은 결과물 고유 ID (UUID) */
|
||||
@Value("#{jobParameters['resultUid']}")
|
||||
private String resultUid;
|
||||
|
||||
/**
|
||||
* 결과물 압축 작업 실행
|
||||
*
|
||||
* <p>response/{resultUid}/ 디렉토리를 검증하고 ZIP 파일로 압축합니다.
|
||||
*
|
||||
* @param contribution Step 실행 정보를 담는 객체
|
||||
* @param chunkContext Chunk 실행 컨텍스트
|
||||
* @return RepeatStatus.FINISHED - 작업 완료
|
||||
* @throws RuntimeException response 디렉토리가 존재하지 않거나 압축 실패 시
|
||||
*/
|
||||
@Override
|
||||
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
|
||||
log.info("========================================");
|
||||
log.info("결과물 압축 시작 (ResultUid={})", resultUid);
|
||||
log.info("========================================");
|
||||
|
||||
// Step 1: 압축 대상 디렉토리 및 출력 파일 경로 설정
|
||||
log.info("[Step 1/5] 경로 설정 중...");
|
||||
Path responseDir =
|
||||
Paths.get(trainingDataDir + File.separator + "response" + File.separator + resultUid);
|
||||
Path zipFile =
|
||||
Paths.get(
|
||||
trainingDataDir + File.separator + "response" + File.separator + resultUid + ".zip");
|
||||
|
||||
log.info("압축 대상 디렉토리: {}", responseDir);
|
||||
log.info("압축 파일 경로: {}", zipFile);
|
||||
log.info("[Step 1/5] 경로 설정 완료");
|
||||
log.info(" - 압축 대상 디렉토리: {}", responseDir);
|
||||
log.info(" - 압축 파일 저장 경로: {}", zipFile);
|
||||
|
||||
// Step 2: response 디렉토리 존재 여부 검증
|
||||
log.info("[Step 2/5] Response 디렉토리 검증 중...");
|
||||
if (!Files.exists(responseDir)) {
|
||||
log.error("Response 디렉토리가 존재하지 않음: {}", responseDir);
|
||||
log.error("[실패] Response 디렉토리가 존재하지 않습니다!");
|
||||
log.error(" - 경로: {}", responseDir);
|
||||
log.error(" - ResultUid: {}", resultUid);
|
||||
log.error(" - 확인사항:");
|
||||
log.error(" 1. dockerRunStep이 정상적으로 완료되었는지 확인");
|
||||
log.error(" 2. Docker 컨테이너가 결과물을 생성했는지 확인");
|
||||
log.error(" 3. Docker 볼륨 마운트 경로가 올바른지 확인");
|
||||
throw new RuntimeException("Response 디렉토리가 존재하지 않습니다: " + responseDir);
|
||||
}
|
||||
log.info("[Step 2/5] ✓ Response 디렉토리 존재 확인");
|
||||
|
||||
// Step 3: 디렉토리 내용 확인 (파일 수 카운트)
|
||||
log.info("[Step 3/5] 디렉토리 내용 분석 중...");
|
||||
File responseDirFile = responseDir.toFile();
|
||||
long fileCount = countFilesRecursively(responseDirFile);
|
||||
log.info("[Step 3/5] 디렉토리 분석 완료");
|
||||
log.info(" - 총 파일 수: {} 개", fileCount);
|
||||
|
||||
if (fileCount == 0) {
|
||||
log.warn("[경고] 압축할 파일이 없습니다. (디렉토리가 비어있음)");
|
||||
log.warn(" - 디렉토리: {}", responseDir);
|
||||
}
|
||||
|
||||
// Step 4: ZIP 압축 실행
|
||||
log.info("[Step 4/5] ZIP 압축 시작...");
|
||||
try {
|
||||
zipDirectory(responseDir.toFile(), zipFile.toFile());
|
||||
log.info("압축 완료: {} (크기: {} bytes)", zipFile, Files.size(zipFile));
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
zipDirectory(responseDirFile, zipFile.toFile());
|
||||
|
||||
long endTime = System.currentTimeMillis();
|
||||
long duration = endTime - startTime;
|
||||
long zipSize = Files.size(zipFile);
|
||||
double zipSizeMB = zipSize / (1024.0 * 1024.0);
|
||||
|
||||
log.info("[Step 4/5] ✓ ZIP 압축 완료");
|
||||
log.info(" - 압축 파일: {}", zipFile);
|
||||
log.info(" - 압축 파일 크기: {} bytes ({} MB)", zipSize, String.format("%.2f", zipSizeMB));
|
||||
log.info(" - 압축 소요 시간: {} ms", duration);
|
||||
|
||||
} catch (IOException e) {
|
||||
log.error("압축 실패: {}", e.getMessage(), e);
|
||||
log.error("[실패] ZIP 압축 중 오류 발생!", e);
|
||||
log.error(" - 소스 디렉토리: {}", responseDir);
|
||||
log.error(" - 대상 ZIP 파일: {}", zipFile);
|
||||
log.error(" - 에러 메시지: {}", e.getMessage());
|
||||
throw new RuntimeException("Response 디렉토리 압축 실패: " + responseDir, e);
|
||||
}
|
||||
|
||||
// Step 5: 최종 검증 및 완료
|
||||
log.info("[Step 5/5] 최종 검증 중...");
|
||||
if (Files.exists(zipFile)) {
|
||||
log.info("[Step 5/5] ✓ ZIP 파일 생성 확인 완료");
|
||||
} else {
|
||||
log.error("[실패] ZIP 파일이 생성되지 않았습니다!");
|
||||
throw new RuntimeException("ZIP 파일 생성 실패: " + zipFile);
|
||||
}
|
||||
|
||||
log.info("========================================");
|
||||
log.info("결과물 압축 완료 (ResultUid={})", resultUid);
|
||||
log.info(" - ZIP 파일 위치: {}", zipFile);
|
||||
log.info("========================================");
|
||||
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
|
||||
/**
|
||||
* 디렉토리를 ZIP 파일로 압축
|
||||
*
|
||||
* @param sourceDir 압축할 소스 디렉토리
|
||||
* @param zipFile 생성될 ZIP 파일
|
||||
* @throws IOException 압축 중 IO 오류 발생 시
|
||||
*/
|
||||
private void zipDirectory(File sourceDir, File zipFile) throws IOException {
|
||||
log.debug("ZIP 압축 시작: {} -> {}", sourceDir.getAbsolutePath(), zipFile.getAbsolutePath());
|
||||
|
||||
try (FileOutputStream fos = new FileOutputStream(zipFile);
|
||||
ZipOutputStream zos = new ZipOutputStream(fos)) {
|
||||
zipDirectoryRecursive(sourceDir, sourceDir.getName(), zos);
|
||||
}
|
||||
|
||||
log.debug("ZIP 압축 완료: {}", zipFile.getAbsolutePath());
|
||||
}
|
||||
|
||||
/**
|
||||
* 디렉토리를 재귀적으로 ZIP으로 압축
|
||||
*
|
||||
* <p>서브디렉토리를 포함한 모든 파일과 디렉토리를 ZIP에 추가합니다. Hidden 파일은 제외됩니다.
|
||||
*
|
||||
* @param fileToZip 압축할 파일 또는 디렉토리
|
||||
* @param fileName ZIP 엔트리 이름
|
||||
* @param zos ZipOutputStream
|
||||
* @throws IOException 압축 중 IO 오류 발생 시
|
||||
*/
|
||||
private void zipDirectoryRecursive(File fileToZip, String fileName, ZipOutputStream zos)
|
||||
throws IOException {
|
||||
|
||||
// Hidden 파일은 제외
|
||||
if (fileToZip.isHidden()) {
|
||||
log.debug("Hidden 파일 제외: {}", fileToZip.getName());
|
||||
return;
|
||||
}
|
||||
|
||||
// 디렉토리인 경우
|
||||
if (fileToZip.isDirectory()) {
|
||||
log.debug("디렉토리 추가: {}", fileName);
|
||||
|
||||
// 디렉토리 엔트리 생성
|
||||
if (fileName.endsWith("/")) {
|
||||
zos.putNextEntry(new ZipEntry(fileName));
|
||||
zos.closeEntry();
|
||||
@@ -86,6 +210,7 @@ public class ZipResponseTasklet implements Tasklet {
|
||||
zos.closeEntry();
|
||||
}
|
||||
|
||||
// 하위 파일 및 디렉토리 재귀 처리
|
||||
File[] children = fileToZip.listFiles();
|
||||
if (children != null) {
|
||||
for (File childFile : children) {
|
||||
@@ -95,9 +220,13 @@ public class ZipResponseTasklet implements Tasklet {
|
||||
return;
|
||||
}
|
||||
|
||||
// 파일인 경우: 파일 내용을 ZIP에 추가
|
||||
log.debug("파일 추가: {}", fileName);
|
||||
try (FileInputStream fis = new FileInputStream(fileToZip)) {
|
||||
ZipEntry zipEntry = new ZipEntry(fileName);
|
||||
zos.putNextEntry(zipEntry);
|
||||
|
||||
// 파일 내용을 버퍼를 통해 복사
|
||||
byte[] buffer = new byte[1024];
|
||||
int length;
|
||||
while ((length = fis.read(buffer)) >= 0) {
|
||||
@@ -105,4 +234,31 @@ public class ZipResponseTasklet implements Tasklet {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 디렉토리 내 파일 수를 재귀적으로 카운트
|
||||
*
|
||||
* @param directory 카운트할 디렉토리
|
||||
* @return 디렉토리 내 총 파일 수 (서브디렉토리 포함)
|
||||
*/
|
||||
private long countFilesRecursively(File directory) {
|
||||
if (!directory.isDirectory()) {
|
||||
return directory.isFile() ? 1 : 0;
|
||||
}
|
||||
|
||||
File[] children = directory.listFiles();
|
||||
if (children == null) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
long count = 0;
|
||||
for (File child : children) {
|
||||
if (child.isFile()) {
|
||||
count++;
|
||||
} else if (child.isDirectory()) {
|
||||
count += countFilesRecursively(child);
|
||||
}
|
||||
}
|
||||
return count;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,11 +8,46 @@ import org.springframework.batch.core.StepExecution;
|
||||
import org.springframework.batch.core.StepExecutionListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* Step 실행 이력 Listener
|
||||
*
|
||||
* <p>각 Step의 시작과 종료 시점에 실행되어 batch_step_history 테이블에 실행 이력을 기록합니다. Step 시작 시 STARTED
|
||||
* 상태로 INSERT하고, Step 종료 시 SUCCESS 또는 FAILED 상태로 UPDATE합니다.
|
||||
*
|
||||
* <p><b>주요 기능:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>Step 시작 시 DB에 STARTED 상태 기록
|
||||
* <li>Step 종료 시 DB에 SUCCESS 또는 FAILED 상태 업데이트
|
||||
* <li>실패 시 에러 메시지 자동 기록
|
||||
* <li>리스너 자체 오류가 Step 실행에 영향을 주지 않도록 예외 처리
|
||||
* </ul>
|
||||
*
|
||||
* <p><b>적용 대상:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>makeGeoJsonStep
|
||||
* <li>dockerRunStep
|
||||
* <li>zipResponseStep
|
||||
* </ul>
|
||||
*
|
||||
* <p><b>필수 JobParameters:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>analUid (Long): 분석 회차 UID
|
||||
* <li>resultUid (String): 결과물 고유 ID
|
||||
* </ul>
|
||||
*
|
||||
* @author KAMCO Development Team
|
||||
* @since 1.0.0
|
||||
* @see BatchStepHistoryRepository
|
||||
*/
|
||||
@Log4j2
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class StepHistoryListener implements StepExecutionListener {
|
||||
|
||||
/** Step 이력 DB 저장을 위한 Repository */
|
||||
private final BatchStepHistoryRepository batchStepHistoryRepository;
|
||||
|
||||
@Override
|
||||
|
||||
@@ -8,11 +8,42 @@ import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
/**
|
||||
* Batch Step 실행 이력 Repository
|
||||
*
|
||||
* <p>각 AnalCntInfo의 Step별 실행 이력을 batch_step_history 테이블에 저장하고 조회합니다. Step 시작 시 STARTED
|
||||
* 상태로 INSERT하고, Step 종료 시 SUCCESS 또는 FAILED 상태로 UPDATE합니다.
|
||||
*
|
||||
* <p><b>주요 기능:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>Step 시작 기록 (status=STARTED)
|
||||
* <li>Step 성공 기록 (status=SUCCESS)
|
||||
* <li>Step 실패 기록 (status=FAILED, error_message 포함)
|
||||
* </ul>
|
||||
*
|
||||
* <p><b>테이블 구조:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>id: Step 이력 고유 ID (BIGSERIAL)
|
||||
* <li>anal_uid: 분석 UID
|
||||
* <li>result_uid: 결과 UID
|
||||
* <li>step_name: Step 이름 (makeGeoJsonStep/dockerRunStep/zipResponseStep)
|
||||
* <li>status: 상태 (STARTED/SUCCESS/FAILED)
|
||||
* <li>error_message: 에러 메시지 (실패 시)
|
||||
* <li>started_dttm: Step 시작 일시
|
||||
* <li>completed_dttm: Step 완료 일시
|
||||
* </ul>
|
||||
*
|
||||
* @author KAMCO Development Team
|
||||
* @since 1.0.0
|
||||
*/
|
||||
@Log4j2
|
||||
@Repository
|
||||
@RequiredArgsConstructor
|
||||
public class BatchStepHistoryRepository {
|
||||
|
||||
/** JDBC 쿼리 실행을 위한 Template */
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
|
||||
/**
|
||||
|
||||
@@ -84,7 +84,7 @@ public class TrainingDataReviewJobRepository {
|
||||
SELECT
|
||||
mslg.geo_uid,
|
||||
'Feature' AS type,
|
||||
ST_AsGeoJSON(mslg.geom) AS geom_str,
|
||||
ST_AsGeoJSON(ST_Transform(mslg.geom, 4326)) AS geom_str,
|
||||
CASE
|
||||
WHEN mslg.class_after_cd IN ('building', 'container') THEN 'M1'
|
||||
WHEN mslg.class_after_cd = 'waste' THEN 'M2'
|
||||
|
||||
@@ -10,79 +10,206 @@ import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* Docker 컨테이너 실행 서비스
|
||||
*
|
||||
* <p>학습 데이터 생성 파이프라인이 포함된 Docker 컨테이너를 실행하고 결과를 모니터링합니다. Docker 프로세스의 표준 출력을 실시간으로
|
||||
* 로깅하며, 비정상 종료 시 RuntimeException을 발생시켜 Batch Step을 실패 처리합니다.
|
||||
*
|
||||
* <p><b>주요 기능:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>Docker run 명령어 생성 (볼륨 마운트, 환경 변수, 파라미터 설정)
|
||||
* <li>Docker 프로세스 실행 및 실시간 로그 출력
|
||||
* <li>Exit Code 검증 (0이 아닐 시 예외 발생)
|
||||
* <li>프로세스 인터럽트 처리
|
||||
* </ul>
|
||||
*
|
||||
* <p><b>Docker 명령어 구조:</b>
|
||||
*
|
||||
* <pre>
|
||||
* docker run --rm
|
||||
* --user {dockerUser}
|
||||
* -v {datasetVolume}
|
||||
* -v {imagesVolume}
|
||||
* --entrypoint python
|
||||
* {dockerImage}
|
||||
* code/kamco_full_pipeline.py
|
||||
* --labelling-folder request/{resultUid}
|
||||
* --output-folder response/{resultUid}
|
||||
* --input_root {inputRoot}
|
||||
* --output_root {outputRoot}
|
||||
* --patch_size {patchSize}
|
||||
* --overlap_pct {overlapPct}
|
||||
* --train_val_test_ratio {train} {val} {test}
|
||||
* --keep_empty_ratio {keepEmptyRatio}
|
||||
* </pre>
|
||||
*
|
||||
* @author KAMCO Development Team
|
||||
* @since 1.0.0
|
||||
* @see DockerProperties
|
||||
*/
|
||||
@Log4j2
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class DockerRunnerService {
|
||||
|
||||
/** Docker 실행 관련 설정 정보 (application.yml) */
|
||||
private final DockerProperties dockerProperties;
|
||||
|
||||
/**
|
||||
* Docker 컨테이너 실행 및 모니터링
|
||||
*
|
||||
* <p>학습 데이터 생성 파이프라인을 Docker 컨테이너로 실행합니다. 컨테이너의 표준 출력을 실시간으로 로깅하며, 비정상 종료 시
|
||||
* RuntimeException을 발생시켜 Step 실패로 처리합니다.
|
||||
*
|
||||
* @param resultUid 결과물 고유 ID (UUID)
|
||||
* @throws RuntimeException Docker 프로세스 실패 (exitCode != 0), IO 오류, 또는 인터럽트 발생 시
|
||||
*/
|
||||
public void run(String resultUid) {
|
||||
// Step 1: Docker 명령어 생성
|
||||
log.info("[Step 1/4] Docker 명령어 생성 중...");
|
||||
List<String> command = buildCommand(resultUid);
|
||||
log.info("Running docker command: {}", String.join(" ", command));
|
||||
log.info("[Step 1/4] Docker 명령어 생성 완료");
|
||||
log.debug(" - 명령어: {}", String.join(" ", command));
|
||||
|
||||
try {
|
||||
// Step 2: Docker 프로세스 시작
|
||||
log.info("[Step 2/4] Docker 프로세스 시작 중...");
|
||||
ProcessBuilder pb = new ProcessBuilder(command);
|
||||
pb.redirectErrorStream(true);
|
||||
pb.redirectErrorStream(true); // stderr를 stdout으로 리다이렉트
|
||||
Process process = pb.start();
|
||||
log.info("[Step 2/4] Docker 프로세스 시작 완료 (PID={})", process.pid());
|
||||
|
||||
// Step 3: Docker 프로세스 출력 실시간 로깅
|
||||
log.info("[Step 3/4] Docker 프로세스 출력 모니터링 중...");
|
||||
int lineCount = 0;
|
||||
try (BufferedReader reader =
|
||||
new BufferedReader(new InputStreamReader(process.getInputStream()))) {
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
log.info("[docker] {}", line);
|
||||
lineCount++;
|
||||
}
|
||||
}
|
||||
log.info("[Step 3/4] Docker 프로세스 출력 완료 (총 {} 라인)", lineCount);
|
||||
|
||||
// Step 4: Exit Code 검증
|
||||
log.info("[Step 4/4] Docker 프로세스 종료 대기 중...");
|
||||
int exitCode = process.waitFor();
|
||||
log.info("[Step 4/4] Docker 프로세스 종료 (exitCode={})", exitCode);
|
||||
|
||||
if (exitCode != 0) {
|
||||
// Docker 프로세스 비정상 종료 (Step 실패 처리)
|
||||
log.error("[실패] Docker 프로세스가 비정상 종료되었습니다!");
|
||||
log.error(" - Exit Code: {}", exitCode);
|
||||
log.error(" - ResultUid: {}", resultUid);
|
||||
log.error(" - 확인사항:");
|
||||
log.error(" 1. Docker 컨테이너 로그 확인 (위 [docker] 로그 참조)");
|
||||
log.error(" 2. request/{}/ 디렉토리에 GeoJSON 파일 확인", resultUid);
|
||||
log.error(" 3. Docker 이미지 및 볼륨 마운트 경로 확인");
|
||||
throw new RuntimeException(
|
||||
String.format(
|
||||
"Docker process failed with exit code %d for resultUid: %s",
|
||||
exitCode, resultUid));
|
||||
} else {
|
||||
// Docker 프로세스 정상 종료
|
||||
log.info("[성공] Docker 프로세스가 정상 종료되었습니다.");
|
||||
log.info(" - ResultUid: {}", resultUid);
|
||||
log.info(" - 결과물 위치: /dataset/response/{}/", resultUid);
|
||||
}
|
||||
|
||||
int exitCode = process.waitFor();
|
||||
if (exitCode != 0) {
|
||||
log.error("Docker process exited with code {} for resultUid: {}", exitCode, resultUid);
|
||||
throw new RuntimeException(
|
||||
String.format("Docker process failed with exit code %d for resultUid: %s", exitCode, resultUid));
|
||||
} else {
|
||||
log.info("Docker process completed successfully for resultUid: {}", resultUid);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
log.error("Failed to run docker command for resultUid {}: {}", resultUid, e.getMessage());
|
||||
// Docker 명령어 실행 실패 (파일 시스템 오류 등)
|
||||
log.error("[실패] Docker 명령어 실행 중 IO 오류 발생!", e);
|
||||
log.error(" - ResultUid: {}", resultUid);
|
||||
log.error(" - 에러 메시지: {}", e.getMessage());
|
||||
throw new RuntimeException("Failed to run docker command for resultUid: " + resultUid, e);
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
log.error("Docker process interrupted for resultUid {}: {}", resultUid, e.getMessage());
|
||||
Thread.currentThread().interrupt();
|
||||
// Docker 프로세스 인터럽트 (사용자 취소 또는 시스템 종료)
|
||||
log.error("[인터럽트] Docker 프로세스가 중단되었습니다!", e);
|
||||
log.error(" - ResultUid: {}", resultUid);
|
||||
log.error(" - 에러 메시지: {}", e.getMessage());
|
||||
Thread.currentThread().interrupt(); // 인터럽트 상태 복원
|
||||
throw new RuntimeException("Docker process interrupted for resultUid: " + resultUid, e);
|
||||
}
|
||||
}
|
||||
|
||||
private List<String> buildCommand(String resultUid) {
|
||||
/**
|
||||
* Docker 명령어 생성
|
||||
*
|
||||
* <p>DockerProperties 설정 정보와 resultUid를 기반으로 Docker run 명령어를 생성합니다.
|
||||
*
|
||||
* @param resultUid 결과물 고유 ID (입력/출력 폴더 경로에 사용)
|
||||
* @return Docker 명령어 문자열 리스트 (ProcessBuilder 실행용)
|
||||
*/
|
||||
private List<String> cmd buildCommand(String resultUid) {
|
||||
log.debug("Docker 명령어 파라미터 구성 중...");
|
||||
|
||||
List<String> cmd = new ArrayList<>();
|
||||
|
||||
// Docker 기본 명령어
|
||||
cmd.add("docker");
|
||||
cmd.add("run");
|
||||
cmd.add("--rm");
|
||||
cmd.add("--rm"); // 컨테이너 종료 시 자동 삭제
|
||||
|
||||
// 사용자 및 권한 설정
|
||||
cmd.add("--user");
|
||||
cmd.add(dockerProperties.getUser());
|
||||
cmd.add(dockerProperties.getUser()); // 예: "1000:1000"
|
||||
log.debug(" - User: {}", dockerProperties.getUser());
|
||||
|
||||
// 볼륨 마운트 (호스트:컨테이너)
|
||||
cmd.add("-v");
|
||||
cmd.add(dockerProperties.getDatasetVolume());
|
||||
cmd.add(dockerProperties.getDatasetVolume()); // 예: "/kamco-nfs/dataset:/dataset"
|
||||
log.debug(" - Dataset Volume: {}", dockerProperties.getDatasetVolume());
|
||||
|
||||
cmd.add("-v");
|
||||
cmd.add(dockerProperties.getImagesVolume());
|
||||
cmd.add(dockerProperties.getImagesVolume()); // 예: "/kamco-nfs/images:/images"
|
||||
log.debug(" - Images Volume: {}", dockerProperties.getImagesVolume());
|
||||
|
||||
// Entrypoint 및 이미지
|
||||
cmd.add("--entrypoint");
|
||||
cmd.add("python");
|
||||
cmd.add(dockerProperties.getImage());
|
||||
cmd.add("python"); // Python으로 스크립트 실행
|
||||
cmd.add(dockerProperties.getImage()); // 예: "kamco/dataset-generator:latest"
|
||||
log.debug(" - Image: {}", dockerProperties.getImage());
|
||||
|
||||
// Python 스크립트 및 파라미터
|
||||
cmd.add("code/kamco_full_pipeline.py");
|
||||
|
||||
// 입출력 폴더 설정
|
||||
cmd.add("--labelling-folder");
|
||||
cmd.add("request/" + resultUid);
|
||||
log.debug(" - Labelling Folder: request/{}", resultUid);
|
||||
|
||||
cmd.add("--output-folder");
|
||||
cmd.add("response/" + resultUid);
|
||||
log.debug(" - Output Folder: response/{}", resultUid);
|
||||
|
||||
// 파이프라인 파라미터
|
||||
cmd.add("--input_root");
|
||||
cmd.add(dockerProperties.getInputRoot());
|
||||
|
||||
cmd.add("--output_root");
|
||||
cmd.add(dockerProperties.getOutputRoot());
|
||||
|
||||
cmd.add("--patch_size");
|
||||
cmd.add(String.valueOf(dockerProperties.getPatchSize()));
|
||||
log.debug(" - Patch Size: {}", dockerProperties.getPatchSize());
|
||||
|
||||
cmd.add("--overlap_pct");
|
||||
cmd.add(String.valueOf(dockerProperties.getOverlapPct()));
|
||||
log.debug(" - Overlap Percent: {}", dockerProperties.getOverlapPct());
|
||||
|
||||
cmd.add("--train_val_test_ratio");
|
||||
cmd.addAll(dockerProperties.getTrainValTestRatio());
|
||||
cmd.addAll(dockerProperties.getTrainValTestRatio()); // 예: ["0.7", "0.2", "0.1"]
|
||||
log.debug(" - Train/Val/Test Ratio: {}", dockerProperties.getTrainValTestRatio());
|
||||
|
||||
cmd.add("--keep_empty_ratio");
|
||||
cmd.add(String.valueOf(dockerProperties.getKeepEmptyRatio()));
|
||||
log.debug(" - Keep Empty Ratio: {}", dockerProperties.getKeepEmptyRatio());
|
||||
|
||||
log.debug("Docker 명령어 파라미터 구성 완료");
|
||||
return cmd;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user