Compare commits
4 Commits
feat/modif
...
feat/dean/
| Author | SHA1 | Date | |
|---|---|---|---|
| 34b5dd928a | |||
| d0a6b88eba | |||
| 48369486a3 | |||
| f55e29f0cf |
687
kamco-make-dataset-generation/README.md
Normal file
687
kamco-make-dataset-generation/README.md
Normal file
@@ -0,0 +1,687 @@
|
||||
# KAMCO Dataset Generation Batch System
|
||||
|
||||
KAMCO 학습 데이터 생성 및 처리를 위한 Spring Batch 시스템입니다.
|
||||
|
||||
## 목차
|
||||
- [시스템 개요](#시스템-개요)
|
||||
- [배치 작업 구조](#배치-작업-구조)
|
||||
- [데이터베이스 스키마](#데이터베이스-스키마)
|
||||
- [실행 흐름](#실행-흐름)
|
||||
- [설정 방법](#설정-방법)
|
||||
- [모니터링 및 로그](#모니터링-및-로그)
|
||||
- [트러블슈팅](#트러블슈팅)
|
||||
|
||||
---
|
||||
|
||||
## 시스템 개요
|
||||
|
||||
### 주요 기능
|
||||
- 검수 완료된 라벨링 데이터를 GeoJSON 형식으로 변환
|
||||
- Docker 컨테이너를 통한 학습 데이터 생성 파이프라인 실행
|
||||
- 생성된 결과물을 ZIP 파일로 압축
|
||||
- 각 처리 단계별 성공/실패 이력을 DB에 자동 기록
|
||||
|
||||
### 기술 스택
|
||||
- **Java 17+**
|
||||
- **Spring Boot 3.x**
|
||||
- **Spring Batch 5.x**
|
||||
- **PostgreSQL**
|
||||
- **Docker**
|
||||
|
||||
---
|
||||
|
||||
## 배치 작업 구조
|
||||
|
||||
### 1. Parent Job: `exportGeoJsonJob`
|
||||
|
||||
Parent Job은 진행 중인 모든 분석 회차를 조회하여 각 회차별로 Child Job을 실행합니다.
|
||||
|
||||
```
|
||||
exportGeoJsonJob (Parent Job)
|
||||
└─ Step: launchChildJobsStep
|
||||
└─ Tasklet: LaunchChildJobsTasklet
|
||||
├─ AnalCntInfo 리스트 조회
|
||||
└─ 각 AnalCntInfo마다 Child Job 실행
|
||||
```
|
||||
|
||||
**실행 조건:**
|
||||
- `tb_map_sheet_anal_inference` 테이블의 `anal_state = 'ING'` (진행 중)
|
||||
- 검수 완료(`COMPLETE`) 건수가 1개 이상 존재
|
||||
- `all_cnt != file_cnt` (아직 파일 생성이 완료되지 않은 경우)
|
||||
|
||||
---
|
||||
|
||||
### 2. Child Job: `processAnalCntInfoJob`
|
||||
|
||||
각 AnalCntInfo(분석 회차)마다 독립적으로 실행되는 서브 작업입니다.
|
||||
|
||||
```
|
||||
processAnalCntInfoJob (Child Job)
|
||||
├─ Step 1: makeGeoJsonStep
|
||||
│ └─ Tasklet: MakeGeoJsonTasklet
|
||||
│ └─ 검수 완료된 라벨링 데이터를 GeoJSON 파일로 생성
|
||||
│ → /dataset/request/{resultUid}/*.geojson
|
||||
│
|
||||
├─ Step 2: dockerRunStep
|
||||
│ └─ Tasklet: DockerRunTasklet
|
||||
│ └─ Docker 컨테이너 실행 (학습 데이터 생성 파이프라인)
|
||||
│ → /dataset/response/{resultUid}/*
|
||||
│
|
||||
└─ Step 3: zipResponseStep
|
||||
└─ Tasklet: ZipResponseTasklet
|
||||
└─ 생성된 결과물을 ZIP으로 압축
|
||||
→ /dataset/response/{resultUid}.zip
|
||||
```
|
||||
|
||||
**JobParameters:**
|
||||
- `analUid` (Long): 분석 회차 UID
|
||||
- `resultUid` (String): 결과물 고유 ID (UUID)
|
||||
- `timestamp` (Long): 고유성 보장을 위한 타임스탬프
|
||||
|
||||
---
|
||||
|
||||
## 데이터베이스 스키마
|
||||
|
||||
### 1. `batch_history` 테이블
|
||||
|
||||
전체 배치 작업(Parent Job) 실행 이력을 기록합니다.
|
||||
|
||||
```sql
|
||||
CREATE TABLE public.batch_history (
|
||||
uuid UUID PRIMARY KEY, -- 배치 실행 고유 ID
|
||||
job VARCHAR(255) NOT NULL, -- 배치 작업 이름 (exportGeoJsonJob)
|
||||
id VARCHAR(255) NOT NULL, -- 비즈니스 ID
|
||||
created_dttm TIMESTAMP NOT NULL, -- 생성 일시
|
||||
updated_dttm TIMESTAMP NOT NULL, -- 수정 일시
|
||||
status VARCHAR(50) NOT NULL, -- 상태 (STARTED/COMPLETED/FAILED)
|
||||
completed_dttm TIMESTAMP -- 완료 일시
|
||||
);
|
||||
```
|
||||
|
||||
**인덱스:**
|
||||
- `idx_batch_history_job` (job)
|
||||
- `idx_batch_history_status` (status)
|
||||
- `idx_batch_history_created` (created_dttm DESC)
|
||||
|
||||
---
|
||||
|
||||
### 2. `batch_step_history` 테이블
|
||||
|
||||
각 AnalCntInfo의 Step별 실행 이력을 기록합니다.
|
||||
|
||||
```sql
|
||||
CREATE TABLE public.batch_step_history (
|
||||
id BIGSERIAL PRIMARY KEY, -- Step 이력 고유 ID
|
||||
anal_uid BIGINT NOT NULL, -- 분석 UID
|
||||
result_uid VARCHAR(255) NOT NULL, -- 결과 UID
|
||||
step_name VARCHAR(100) NOT NULL, -- Step 이름
|
||||
status VARCHAR(50) NOT NULL, -- 상태 (STARTED/SUCCESS/FAILED)
|
||||
error_message TEXT, -- 에러 메시지 (최대 1000자)
|
||||
started_dttm TIMESTAMP NOT NULL, -- Step 시작 일시
|
||||
completed_dttm TIMESTAMP, -- Step 완료 일시
|
||||
created_dttm TIMESTAMP NOT NULL, -- 생성 일시
|
||||
updated_dttm TIMESTAMP NOT NULL -- 수정 일시
|
||||
);
|
||||
```
|
||||
|
||||
**Step 이름:**
|
||||
- `makeGeoJsonStep`: GeoJSON 파일 생성
|
||||
- `dockerRunStep`: Docker 컨테이너 실행
|
||||
- `zipResponseStep`: 결과물 ZIP 압축
|
||||
|
||||
**인덱스:**
|
||||
- `idx_batch_step_history_anal_uid` (anal_uid)
|
||||
- `idx_batch_step_history_result_uid` (result_uid)
|
||||
- `idx_batch_step_history_status` (status)
|
||||
- `idx_batch_step_history_step_name` (step_name)
|
||||
|
||||
---
|
||||
|
||||
## 실행 흐름
|
||||
|
||||
### 전체 프로세스
|
||||
|
||||
```
|
||||
1. Parent Job 시작
|
||||
↓
|
||||
2. 진행 중인 AnalCntInfo 리스트 조회
|
||||
↓
|
||||
3. 각 AnalCntInfo마다 반복:
|
||||
↓
|
||||
├─ 3.1. Child Job 실행 (processAnalCntInfoJob)
|
||||
│ ↓
|
||||
│ ├─ Step 1: makeGeoJsonStep
|
||||
│ │ - beforeStep: DB에 STARTED 기록
|
||||
│ │ - Tasklet 실행: GeoJSON 파일 생성
|
||||
│ │ - afterStep: DB에 SUCCESS/FAILED 기록
|
||||
│ │
|
||||
│ ├─ Step 2: dockerRunStep
|
||||
│ │ - beforeStep: DB에 STARTED 기록
|
||||
│ │ - Tasklet 실행: Docker 컨테이너 실행
|
||||
│ │ - afterStep: DB에 SUCCESS/FAILED 기록
|
||||
│ │
|
||||
│ └─ Step 3: zipResponseStep
|
||||
│ - beforeStep: DB에 STARTED 기록
|
||||
│ - Tasklet 실행: 결과물 ZIP 압축
|
||||
│ - afterStep: DB에 SUCCESS/FAILED 기록
|
||||
│
|
||||
└─ 3.2. 다음 AnalCntInfo 처리
|
||||
↓
|
||||
4. Parent Job 종료 (부분 성공 허용)
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Step 1: makeGeoJsonStep
|
||||
|
||||
**목적:** 검수 완료된 라벨링 데이터를 GeoJSON 파일로 변환
|
||||
|
||||
**처리 과정:**
|
||||
1. `findCompletedAnalMapSheetList()`: 검수 완료된 도엽 목록 조회
|
||||
2. 각 도엽별로:
|
||||
- `findCompletedYesterdayLabelingList()`: 어제까지 검수 완료된 데이터 조회
|
||||
- GeoJSON Feature 생성
|
||||
- `/dataset/request/{resultUid}/{filename}.geojson` 저장
|
||||
- `updateLearnDataGeomFileCreateYn()`: DB에 파일 생성 완료 플래그 업데이트
|
||||
|
||||
**출력 파일 형식:**
|
||||
```
|
||||
{resultUid_8자}_{compareYyyy}_{targetYyyy}_{mapSheetNum}_D15.geojson
|
||||
```
|
||||
|
||||
**예시:**
|
||||
```
|
||||
ED80D700_2022_2023_3724036_D15.geojson
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Step 2: dockerRunStep
|
||||
|
||||
**목적:** Docker 컨테이너를 통해 학습 데이터 생성 파이프라인 실행
|
||||
|
||||
**Docker 명령어:**
|
||||
```bash
|
||||
docker run --rm \
|
||||
--user {dockerUser} \
|
||||
-v {datasetVolume} \
|
||||
-v {imagesVolume} \
|
||||
--entrypoint python \
|
||||
{dockerImage} \
|
||||
code/kamco_full_pipeline.py \
|
||||
--labelling-folder request/{resultUid} \
|
||||
--output-folder response/{resultUid} \
|
||||
--input_root {inputRoot} \
|
||||
--output_root {outputRoot} \
|
||||
--patch_size {patchSize} \
|
||||
--overlap_pct {overlapPct} \
|
||||
--train_val_test_ratio {train} {val} {test} \
|
||||
--keep_empty_ratio {keepEmptyRatio}
|
||||
```
|
||||
|
||||
**에러 처리:**
|
||||
- Docker 프로세스의 `exitCode != 0` 시 `RuntimeException` 발생
|
||||
- Step 실패로 처리되어 DB에 `FAILED` 상태 기록
|
||||
- 에러 메시지와 exit code가 `error_message` 컬럼에 저장됨
|
||||
|
||||
---
|
||||
|
||||
### Step 3: zipResponseStep
|
||||
|
||||
**목적:** 생성된 학습 데이터를 ZIP 파일로 압축
|
||||
|
||||
**처리 과정:**
|
||||
1. `/dataset/response/{resultUid}/` 디렉토리 검증
|
||||
2. 디렉토리 내 모든 파일과 서브디렉토리를 재귀적으로 압축
|
||||
3. `/dataset/response/{resultUid}.zip` 파일 생성
|
||||
|
||||
**압축 설정:**
|
||||
- Hidden 파일 제외
|
||||
- 디렉토리 구조 유지
|
||||
- 버퍼 크기: 1024 bytes
|
||||
|
||||
---
|
||||
|
||||
## 설정 방법
|
||||
|
||||
### application.yml 설정
|
||||
|
||||
```yaml
|
||||
# 학습 데이터 디렉토리 경로
|
||||
training-data:
|
||||
geojson-dir: /kamco-nfs/dataset
|
||||
|
||||
# Docker 설정
|
||||
docker:
|
||||
user: "1000:1000"
|
||||
image: "kamco/dataset-generator:latest"
|
||||
dataset-volume: "/kamco-nfs/dataset:/dataset"
|
||||
images-volume: "/kamco-nfs/images:/images"
|
||||
input-root: "/dataset"
|
||||
output-root: "/dataset"
|
||||
patch-size: 512
|
||||
overlap-pct: 0.2
|
||||
train-val-test-ratio:
|
||||
- "0.7"
|
||||
- "0.2"
|
||||
- "0.1"
|
||||
keep-empty-ratio: 0.5
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 환경 변수
|
||||
|
||||
| 환경 변수 | 설명 | 기본값 |
|
||||
|----------|------|--------|
|
||||
| `TRAINING_DATA_GEOJSON_DIR` | GeoJSON 파일 저장 경로 | `/kamco-nfs/dataset` |
|
||||
| `DOCKER_USER` | Docker 컨테이너 실행 유저 | `1000:1000` |
|
||||
| `DOCKER_IMAGE` | Docker 이미지 이름 | `kamco/dataset-generator:latest` |
|
||||
|
||||
---
|
||||
|
||||
## 모니터링 및 로그
|
||||
|
||||
### 로그 레벨
|
||||
|
||||
```yaml
|
||||
logging:
|
||||
level:
|
||||
com.kamco.cd.geojsonscheduler: INFO
|
||||
com.kamco.cd.geojsonscheduler.batch: DEBUG
|
||||
org.springframework.batch: INFO
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 주요 로그 포인트
|
||||
|
||||
#### Parent Job 로그
|
||||
```log
|
||||
[INFO] Parent Job 시작: AnalCntInfo 리스트 조회 및 Child Job 실행
|
||||
[INFO] 진행중인 회차 목록 조회 중...
|
||||
[INFO] 진행중인 회차 수: 3
|
||||
[INFO] 회차 검토: AnalUid=100, ResultUid=ED80D700...
|
||||
[INFO] Child Job 실행 중... (AnalUid=100, ResultUid=ED80D700...)
|
||||
[INFO] Child Job 실행 완료 (AnalUid=100, ResultUid=ED80D700...)
|
||||
[INFO] Parent Job 완료 - 성공: 2, 건너뜀: 1, 실패: 0
|
||||
```
|
||||
|
||||
#### Step 로그
|
||||
```log
|
||||
[INFO] ========================================
|
||||
[INFO] GeoJSON 생성 시작 (AnalUid=100, ResultUid=ED80D700...)
|
||||
[INFO] 검수 완료된 도엽 수: 5
|
||||
[INFO] 도엽 처리 중: MapSheetNum=3724036
|
||||
[INFO] 완료된 라벨링 데이터 수: 150
|
||||
[INFO] GeoJSON 파일 저장 완료: /dataset/request/ED80D700.../ED80D700_2022_2023_3724036_D15.geojson
|
||||
[INFO] GeoJSON 생성 완료 (ResultUid=ED80D700...) - 처리된 도엽 수: 5, 생성된 파일 수: 5
|
||||
[INFO] ========================================
|
||||
```
|
||||
|
||||
#### Docker 실행 로그
|
||||
```log
|
||||
[INFO] ========================================
|
||||
[INFO] Docker 컨테이너 실행 시작 (ResultUid=ED80D700...)
|
||||
[INFO] Running docker command: docker run --rm --user 1000:1000...
|
||||
[INFO] [docker] Loading configuration...
|
||||
[INFO] [docker] Processing pipeline...
|
||||
[INFO] [docker] Pipeline completed successfully
|
||||
[INFO] Docker process completed successfully for resultUid: ED80D700...
|
||||
[INFO] ========================================
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### DB 조회 쿼리
|
||||
|
||||
#### 1. 특정 회차의 모든 Step 실행 이력
|
||||
```sql
|
||||
SELECT
|
||||
step_name,
|
||||
status,
|
||||
started_dttm,
|
||||
completed_dttm,
|
||||
EXTRACT(EPOCH FROM (completed_dttm - started_dttm)) AS duration_seconds,
|
||||
error_message
|
||||
FROM batch_step_history
|
||||
WHERE anal_uid = 100
|
||||
AND result_uid = 'ED80D700A0F5482BB0EC11A366DEA8DE'
|
||||
ORDER BY started_dttm;
|
||||
```
|
||||
|
||||
#### 2. 최근 실패한 Step 조회
|
||||
```sql
|
||||
SELECT
|
||||
anal_uid,
|
||||
result_uid,
|
||||
step_name,
|
||||
error_message,
|
||||
started_dttm,
|
||||
completed_dttm
|
||||
FROM batch_step_history
|
||||
WHERE status = 'FAILED'
|
||||
ORDER BY started_dttm DESC
|
||||
LIMIT 10;
|
||||
```
|
||||
|
||||
#### 3. Step별 성공률 통계
|
||||
```sql
|
||||
SELECT
|
||||
step_name,
|
||||
COUNT(*) AS total_executions,
|
||||
SUM(CASE WHEN status = 'SUCCESS' THEN 1 ELSE 0 END) AS success_count,
|
||||
SUM(CASE WHEN status = 'FAILED' THEN 1 ELSE 0 END) AS failed_count,
|
||||
ROUND(
|
||||
SUM(CASE WHEN status = 'SUCCESS' THEN 1 ELSE 0 END)::NUMERIC / COUNT(*) * 100,
|
||||
2
|
||||
) AS success_rate_pct
|
||||
FROM batch_step_history
|
||||
GROUP BY step_name;
|
||||
```
|
||||
|
||||
#### 4. 평균 실행 시간 (Step별)
|
||||
```sql
|
||||
SELECT
|
||||
step_name,
|
||||
COUNT(*) AS total_executions,
|
||||
AVG(EXTRACT(EPOCH FROM (completed_dttm - started_dttm))) AS avg_duration_seconds,
|
||||
MIN(EXTRACT(EPOCH FROM (completed_dttm - started_dttm))) AS min_duration_seconds,
|
||||
MAX(EXTRACT(EPOCH FROM (completed_dttm - started_dttm))) AS max_duration_seconds
|
||||
FROM batch_step_history
|
||||
WHERE status = 'SUCCESS'
|
||||
AND completed_dttm IS NOT NULL
|
||||
GROUP BY step_name;
|
||||
```
|
||||
|
||||
#### 5. 특정 기간 동안 처리된 회차 수
|
||||
```sql
|
||||
SELECT
|
||||
DATE(started_dttm) AS execution_date,
|
||||
COUNT(DISTINCT result_uid) AS processed_count
|
||||
FROM batch_step_history
|
||||
WHERE step_name = 'makeGeoJsonStep'
|
||||
AND started_dttm >= CURRENT_DATE - INTERVAL '7 days'
|
||||
GROUP BY DATE(started_dttm)
|
||||
ORDER BY execution_date DESC;
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 트러블슈팅
|
||||
|
||||
### 1. Docker 컨테이너 실행 실패
|
||||
|
||||
**증상:**
|
||||
```log
|
||||
[ERROR] Docker process exited with code 1 for resultUid: ED80D700...
|
||||
FileNotFoundError: Missing training pairs root at /dataset/response/.../tifs/train
|
||||
```
|
||||
|
||||
**원인:**
|
||||
- GeoJSON 파일이 생성되지 않았거나 잘못된 형식
|
||||
- Docker 볼륨 마운트 경로 불일치
|
||||
- 파이프라인 실행 중 필수 파일 누락
|
||||
|
||||
**해결 방법:**
|
||||
1. `batch_step_history` 테이블에서 `makeGeoJsonStep` 상태 확인
|
||||
```sql
|
||||
SELECT * FROM batch_step_history
|
||||
WHERE result_uid = 'ED80D700...'
|
||||
AND step_name = 'makeGeoJsonStep';
|
||||
```
|
||||
|
||||
2. GeoJSON 파일 존재 여부 확인
|
||||
```bash
|
||||
ls -la /kamco-nfs/dataset/request/ED80D700.../
|
||||
```
|
||||
|
||||
3. Docker 볼륨 설정 확인
|
||||
```yaml
|
||||
docker:
|
||||
dataset-volume: "/kamco-nfs/dataset:/dataset" # 호스트:컨테이너
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 2. GeoJSON 파일이 생성되지 않음
|
||||
|
||||
**증상:**
|
||||
```log
|
||||
[WARN] 검수 완료된 도엽이 없음. 작업 건너뜀.
|
||||
```
|
||||
|
||||
**원인:**
|
||||
- 검수 완료(`COMPLETE`) 상태의 데이터가 없음
|
||||
- `inspect_stat_dttm`이 오늘 이후 (어제까지만 조회)
|
||||
|
||||
**해결 방법:**
|
||||
1. 검수 완료 데이터 확인
|
||||
```sql
|
||||
SELECT COUNT(*)
|
||||
FROM tb_labeling_assignment
|
||||
WHERE anal_uid = 100
|
||||
AND inspect_state = 'COMPLETE';
|
||||
```
|
||||
|
||||
2. 검수 완료 시간 확인
|
||||
```sql
|
||||
SELECT MAX(inspect_stat_dttm)
|
||||
FROM tb_labeling_assignment
|
||||
WHERE anal_uid = 100
|
||||
AND inspect_state = 'COMPLETE';
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 3. ZIP 파일 생성 실패
|
||||
|
||||
**증상:**
|
||||
```log
|
||||
[ERROR] Response 디렉토리가 존재하지 않음: /dataset/response/ED80D700...
|
||||
```
|
||||
|
||||
**원인:**
|
||||
- Docker Step에서 결과물이 생성되지 않음
|
||||
- Docker 컨테이너가 중간에 실패했지만 감지되지 않음
|
||||
|
||||
**해결 방법:**
|
||||
1. Docker Step 상태 확인
|
||||
```sql
|
||||
SELECT * FROM batch_step_history
|
||||
WHERE result_uid = 'ED80D700...'
|
||||
AND step_name = 'dockerRunStep';
|
||||
```
|
||||
|
||||
2. Response 디렉토리 확인
|
||||
```bash
|
||||
ls -la /kamco-nfs/dataset/response/ED80D700.../
|
||||
```
|
||||
|
||||
3. Docker 컨테이너 로그 확인 (DB의 `error_message` 컬럼)
|
||||
|
||||
---
|
||||
|
||||
### 4. Child Job이 실행되지 않음
|
||||
|
||||
**증상:**
|
||||
```log
|
||||
[INFO] 모든 파일이 이미 처리됨. 건너뜀.
|
||||
```
|
||||
|
||||
**원인:**
|
||||
- `all_cnt == file_cnt` (이미 모든 파일이 생성됨)
|
||||
- 재실행이 필요한 경우 플래그를 초기화하지 않음
|
||||
|
||||
**해결 방법:**
|
||||
1. 파일 생성 플래그 초기화
|
||||
```sql
|
||||
UPDATE tb_map_sheet_learn_data_geom
|
||||
SET file_create_yn = false,
|
||||
updated_dttm = NOW()
|
||||
WHERE geo_uid IN (
|
||||
SELECT inference_geom_uid
|
||||
FROM tb_labeling_assignment
|
||||
WHERE anal_uid = 100
|
||||
);
|
||||
```
|
||||
|
||||
2. 배치 재실행
|
||||
|
||||
---
|
||||
|
||||
### 5. 배치 작업 전체가 실패함
|
||||
|
||||
**증상:**
|
||||
```log
|
||||
[ERROR] Child Job 실행 실패 (AnalUid=100, ResultUid=ED80D700...): ...
|
||||
[WARN] 3 개의 Child Job 실행이 실패했습니다.
|
||||
```
|
||||
|
||||
**원인:**
|
||||
- 여러 회차에서 동시에 실패 발생
|
||||
- 현재는 부분 실패를 허용하도록 설정됨
|
||||
|
||||
**해결 방법:**
|
||||
1. 실패한 회차 확인
|
||||
```sql
|
||||
SELECT DISTINCT anal_uid, result_uid
|
||||
FROM batch_step_history
|
||||
WHERE status = 'FAILED'
|
||||
AND started_dttm >= CURRENT_DATE;
|
||||
```
|
||||
|
||||
2. 각 회차별로 실패 원인 분석
|
||||
```sql
|
||||
SELECT step_name, error_message
|
||||
FROM batch_step_history
|
||||
WHERE anal_uid = 100
|
||||
AND status = 'FAILED';
|
||||
```
|
||||
|
||||
3. 실패 정책 변경 (필요 시)
|
||||
- `LaunchChildJobsTasklet.java:87-89` 주석 해제하여 하나라도 실패 시 Parent Job 실패 처리
|
||||
|
||||
---
|
||||
|
||||
## 개발자 가이드
|
||||
|
||||
### 새로운 Step 추가하기
|
||||
|
||||
1. **Tasklet 생성**
|
||||
```java
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class NewStepTasklet implements Tasklet {
|
||||
@Value("#{jobParameters['analUid']}")
|
||||
private Long analUid;
|
||||
|
||||
@Value("#{jobParameters['resultUid']}")
|
||||
private String resultUid;
|
||||
|
||||
@Override
|
||||
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
|
||||
// 로직 구현
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
2. **JobConfig에 Step 등록**
|
||||
```java
|
||||
@Bean
|
||||
public Step newStep() {
|
||||
return new StepBuilder("newStep", jobRepository)
|
||||
.tasklet(newStepTasklet, transactionManager)
|
||||
.listener(stepHistoryListener) // 이력 기록
|
||||
.build();
|
||||
}
|
||||
```
|
||||
|
||||
3. **Job Flow에 추가**
|
||||
```java
|
||||
@Bean
|
||||
public Job processAnalCntInfoJob() {
|
||||
return new JobBuilder("processAnalCntInfoJob", jobRepository)
|
||||
.start(makeGeoJsonStep())
|
||||
.next(dockerRunStep())
|
||||
.next(zipResponseStep())
|
||||
.next(newStep()) // 새로운 Step 추가
|
||||
.build();
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 로그 커스터마이징
|
||||
|
||||
**로그 레벨 변경:**
|
||||
```yaml
|
||||
logging:
|
||||
level:
|
||||
com.kamco.cd.geojsonscheduler.batch.DockerRunTasklet: DEBUG
|
||||
```
|
||||
|
||||
**특정 Step만 로그 출력:**
|
||||
```java
|
||||
@Slf4j
|
||||
public class CustomTasklet implements Tasklet {
|
||||
@Override
|
||||
public RepeatStatus execute(...) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("상세 디버그 정보: {}", details);
|
||||
}
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 배포 및 운영
|
||||
|
||||
### 배포 절차
|
||||
|
||||
1. **빌드**
|
||||
```bash
|
||||
./gradlew clean build
|
||||
```
|
||||
|
||||
2. **Docker 이미지 빌드**
|
||||
```bash
|
||||
docker build -t kamco-batch:latest .
|
||||
```
|
||||
|
||||
3. **실행**
|
||||
```bash
|
||||
java -jar build/libs/kamco-geojson-scheduler-1.0.0.jar
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 스케줄링 설정
|
||||
|
||||
Spring Scheduler를 사용한 정기 실행:
|
||||
|
||||
```java
|
||||
@Scheduled(cron = "0 0 2 * * *") // 매일 새벽 2시
|
||||
public void runBatch() {
|
||||
JobParameters jobParameters = new JobParametersBuilder()
|
||||
.addLong("timestamp", System.currentTimeMillis())
|
||||
.toJobParameters();
|
||||
|
||||
jobLauncher.run(exportGeoJsonJob, jobParameters);
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 라이선스
|
||||
|
||||
Copyright (c) 2024 KAMCO. All rights reserved.
|
||||
|
||||
---
|
||||
|
||||
## 문의
|
||||
|
||||
기술 지원: tech-support@kamco.co.kr
|
||||
Binary file not shown.
@@ -0,0 +1,108 @@
|
||||
package com.kamco.cd.geojsonscheduler.batch;
|
||||
|
||||
import com.kamco.cd.geojsonscheduler.service.DockerRunnerService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.batch.core.StepContribution;
|
||||
import org.springframework.batch.core.scope.context.ChunkContext;
|
||||
import org.springframework.batch.core.step.tasklet.Tasklet;
|
||||
import org.springframework.batch.repeat.RepeatStatus;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* Docker 컨테이너 실행 Tasklet
|
||||
*
|
||||
* <p>학습 데이터 생성 파이프라인이 담긴 Docker 컨테이너를 실행합니다. 이전 Step(makeGeoJsonStep)에서 생성된 GeoJSON
|
||||
* 파일들을 입력으로 받아 학습 데이터셋을 생성합니다.
|
||||
*
|
||||
* <p><b>주요 기능:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>Docker 컨테이너 실행 (kamco_full_pipeline.py)
|
||||
* <li>입력: /dataset/request/{resultUid}/*.geojson
|
||||
* <li>출력: /dataset/response/{resultUid}/*
|
||||
* <li>Docker 실행 실패 시 RuntimeException 발생 (Step 실패 처리)
|
||||
* </ul>
|
||||
*
|
||||
* <p><b>실행 조건:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>makeGeoJsonStep이 성공적으로 완료되어야 함
|
||||
* <li>request/{resultUid}/ 디렉토리에 GeoJSON 파일이 존재해야 함
|
||||
* </ul>
|
||||
*
|
||||
* <p><b>Docker Exit Code 처리:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>Exit Code = 0: 정상 종료 (Step 성공)
|
||||
* <li>Exit Code != 0: 비정상 종료 (RuntimeException 발생 → Step 실패)
|
||||
* </ul>
|
||||
*
|
||||
* @author KAMCO Development Team
|
||||
* @since 1.0.0
|
||||
* @see DockerRunnerService
|
||||
*/
|
||||
@Log4j2
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class DockerRunTasklet implements Tasklet {
|
||||
|
||||
/** Docker 컨테이너 실행을 담당하는 서비스 */
|
||||
private final DockerRunnerService dockerRunnerService;
|
||||
|
||||
/** Job Parameter로 전달받은 결과물 고유 ID (UUID) */
|
||||
@Value("#{jobParameters['resultUid']}")
|
||||
private String resultUid;
|
||||
|
||||
/**
|
||||
* Docker 컨테이너 실행 작업 수행
|
||||
*
|
||||
* <p>DockerRunnerService를 통해 학습 데이터 생성 파이프라인을 실행합니다. Docker 프로세스가 비정상 종료될 경우
|
||||
* RuntimeException이 발생하여 Step이 실패 처리됩니다.
|
||||
*
|
||||
* @param contribution Step 실행 정보를 담는 객체
|
||||
* @param chunkContext Chunk 실행 컨텍스트
|
||||
* @return RepeatStatus.FINISHED - 작업 완료
|
||||
* @throws RuntimeException Docker 프로세스가 비정상 종료(exitCode != 0)된 경우
|
||||
*/
|
||||
@Override
|
||||
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
|
||||
log.info("========================================");
|
||||
log.info("Docker 컨테이너 실행 시작 (ResultUid={})", resultUid);
|
||||
log.info("========================================");
|
||||
|
||||
// 실행 전 사전 정보 로깅
|
||||
log.info("[사전 정보]");
|
||||
log.info(" - 입력 디렉토리: /dataset/request/{}/", resultUid);
|
||||
log.info(" - 출력 디렉토리: /dataset/response/{}/", resultUid);
|
||||
log.info(" - 파이프라인: kamco_full_pipeline.py");
|
||||
|
||||
try {
|
||||
// DockerRunnerService를 통해 Docker 컨테이너 실행
|
||||
// exitCode != 0 시 RuntimeException 발생
|
||||
log.info("[Docker 실행] 컨테이너 시작...");
|
||||
dockerRunnerService.run(resultUid);
|
||||
log.info("[Docker 실행] ✓ 컨테이너 정상 종료 (exitCode=0)");
|
||||
|
||||
} catch (RuntimeException e) {
|
||||
// Docker 실행 실패 시 (DockerRunnerService에서 던진 예외)
|
||||
log.error("[Docker 실행] ✗ 컨테이너 실행 실패!", e);
|
||||
log.error(" - ResultUid: {}", resultUid);
|
||||
log.error(" - 에러 메시지: {}", e.getMessage());
|
||||
log.error(" - 확인사항:");
|
||||
log.error(" 1. request/{}/ 디렉토리에 GeoJSON 파일이 있는지 확인", resultUid);
|
||||
log.error(" 2. Docker 이미지가 정상적으로 존재하는지 확인");
|
||||
log.error(" 3. Docker 볼륨 마운트 경로 확인");
|
||||
log.error(" 4. kamco_full_pipeline.py 스크립트 로그 확인");
|
||||
throw e; // 예외 재발생 → Step 실패 처리
|
||||
}
|
||||
|
||||
log.info("========================================");
|
||||
log.info("Docker 컨테이너 실행 완료 (ResultUid={})", resultUid);
|
||||
log.info(" - 결과물 저장 위치: /dataset/response/{}/", resultUid);
|
||||
log.info("========================================");
|
||||
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
}
|
||||
@@ -4,7 +4,6 @@ import com.kamco.cd.geojsonscheduler.listener.BatchHistoryListener;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.Step;
|
||||
import org.springframework.batch.core.StepExecutionListener;
|
||||
import org.springframework.batch.core.job.builder.JobBuilder;
|
||||
import org.springframework.batch.core.repository.JobRepository;
|
||||
import org.springframework.batch.core.step.builder.StepBuilder;
|
||||
@@ -18,20 +17,20 @@ public class ExportGeoJsonJobConfig {
|
||||
|
||||
private final JobRepository jobRepository;
|
||||
private final PlatformTransactionManager transactionManager;
|
||||
private final ExportGeoJsonTasklet exportGeoJsonTasklet;
|
||||
private final LaunchChildJobsTasklet launchChildJobsTasklet;
|
||||
|
||||
@Bean
|
||||
public Job exportGeoJsonJob(BatchHistoryListener historyListener) { // 1. 리스너 주입 받기
|
||||
public Job exportGeoJsonJob(BatchHistoryListener historyListener) {
|
||||
return new JobBuilder("exportGeoJsonJob", jobRepository)
|
||||
.listener(historyListener) // 2. 리스너 등록
|
||||
.start(exportGeoJsonStep())
|
||||
.listener(historyListener)
|
||||
.start(launchChildJobsStep())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Step exportGeoJsonStep() {
|
||||
return new StepBuilder("exportGeoJsonStep", jobRepository)
|
||||
.tasklet(exportGeoJsonTasklet, transactionManager)
|
||||
public Step launchChildJobsStep() {
|
||||
return new StepBuilder("launchChildJobsStep", jobRepository)
|
||||
.tasklet(launchChildJobsTasklet, transactionManager)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,209 @@
|
||||
package com.kamco.cd.geojsonscheduler.batch;
|
||||
|
||||
import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.AnalCntInfo;
|
||||
import com.kamco.cd.geojsonscheduler.repository.TrainingDataReviewJobRepository;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.JobParameters;
|
||||
import org.springframework.batch.core.JobParametersBuilder;
|
||||
import org.springframework.batch.core.StepContribution;
|
||||
import org.springframework.batch.core.launch.JobLauncher;
|
||||
import org.springframework.batch.core.scope.context.ChunkContext;
|
||||
import org.springframework.batch.core.step.tasklet.Tasklet;
|
||||
import org.springframework.batch.repeat.RepeatStatus;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* Child Job 실행 Tasklet (Parent Job용)
|
||||
*
|
||||
* <p>진행 중인 모든 분석 회차(AnalCntInfo)를 조회하여 각 회차마다 독립적인 Child Job을 실행합니다. 각 Child Job은 3개의
|
||||
* Step(makeGeoJson → dockerRun → zipResponse)을 순차적으로 실행합니다.
|
||||
*
|
||||
* <p><b>주요 기능:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>진행 중인 분석 회차 목록 조회 (tb_map_sheet_anal_inference, anal_state='ING')
|
||||
* <li>각 회차별 처리 필요 여부 판단 (all_cnt != file_cnt)
|
||||
* <li>회차마다 Child Job(processAnalCntInfoJob) 실행
|
||||
* <li>부분 실패 허용 (한 회차 실패해도 다른 회차 계속 처리)
|
||||
* </ul>
|
||||
*
|
||||
* <p><b>실행 조건:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>tb_map_sheet_anal_inference.anal_state = 'ING' (진행 중)
|
||||
* <li>검수 완료 건수(complete_cnt) > 0
|
||||
* <li>all_cnt != file_cnt (아직 파일 생성이 완료되지 않음)
|
||||
* </ul>
|
||||
*
|
||||
* <p><b>실패 정책:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>현재: 부분 실패 허용 (일부 Child Job 실패해도 Parent Job 성공)
|
||||
* <li>변경 가능: 87-89라인 주석 해제 시 하나라도 실패하면 Parent Job 실패
|
||||
* </ul>
|
||||
*
|
||||
* @author KAMCO Development Team
|
||||
* @since 1.0.0
|
||||
* @see com.kamco.cd.geojsonscheduler.batch.ProcessAnalCntInfoJobConfig
|
||||
*/
|
||||
@Log4j2
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class LaunchChildJobsTasklet implements Tasklet {
|
||||
|
||||
/** 분석 회차 정보 조회를 위한 Repository */
|
||||
private final TrainingDataReviewJobRepository repository;
|
||||
|
||||
/** Child Job을 실행하기 위한 JobLauncher */
|
||||
private final JobLauncher jobLauncher;
|
||||
|
||||
/** 실행할 Child Job (processAnalCntInfoJob) */
|
||||
@Qualifier("processAnalCntInfoJob")
|
||||
private final Job processAnalCntInfoJob;
|
||||
|
||||
/**
|
||||
* Parent Job의 메인 로직 실행
|
||||
*
|
||||
* <p>진행 중인 모든 분석 회차를 조회하여 각 회차마다 Child Job을 실행합니다. 한 회차가 실패해도 다른 회차는 계속 처리되며, 최종적으로 통계를
|
||||
* 로깅합니다.
|
||||
*
|
||||
* @param contribution Step 실행 정보를 담는 객체
|
||||
* @param chunkContext Chunk 실행 컨텍스트
|
||||
* @return RepeatStatus.FINISHED - 작업 완료
|
||||
*/
|
||||
@Override
|
||||
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
|
||||
log.info("========================================");
|
||||
log.info("Parent Job 시작: AnalCntInfo 리스트 조회 및 Child Job 실행");
|
||||
log.info("========================================");
|
||||
|
||||
// Step 1: 진행 중인 분석 회차 목록 조회
|
||||
log.info("[Step 1/3] 진행 중인 분석 회차 목록 조회 중...");
|
||||
log.info(" - 조회 조건: anal_state='ING' AND complete_cnt > 0");
|
||||
|
||||
List<AnalCntInfo> analList = repository.findAnalCntInfoList();
|
||||
|
||||
log.info("[Step 1/3] 조회 완료");
|
||||
log.info(" - 진행 중인 회차 수: {} 개", analList.size());
|
||||
|
||||
if (analList.isEmpty()) {
|
||||
log.warn("[경고] 처리할 분석 회차가 없습니다.");
|
||||
log.warn(" - 확인사항:");
|
||||
log.warn(" 1. tb_map_sheet_anal_inference 테이블의 anal_state='ING' 데이터 확인");
|
||||
log.warn(" 2. 검수 완료(COMPLETE) 건수가 있는지 확인");
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
|
||||
// Step 2: 각 회차별 Child Job 실행
|
||||
log.info("[Step 2/3] 회차별 Child Job 실행 시작");
|
||||
|
||||
// 실행 통계를 위한 카운터
|
||||
int processedCount = 0; // 성공적으로 처리된 회차 수
|
||||
int skippedCount = 0; // 건너뛴 회차 수 (이미 처리 완료)
|
||||
int failedCount = 0; // 실패한 회차 수
|
||||
|
||||
// 각 분석 회차별로 Child Job 실행
|
||||
for (int i = 0; i < analList.size(); i++) {
|
||||
AnalCntInfo info = analList.get(i);
|
||||
|
||||
log.info("========================================");
|
||||
log.info("[회차 {}/{}] AnalCntInfo 처리 시작", i + 1, analList.size());
|
||||
log.info(" - AnalUid: {}", info.getAnalUid());
|
||||
log.info(" - ResultUid: {}", info.getResultUid());
|
||||
log.info(" - 전체 건수(all_cnt): {}", info.getAllCnt());
|
||||
log.info(" - 검수 완료 건수(complete_cnt): {}", info.getCompleteCnt());
|
||||
log.info(" - 파일 생성 완료 건수(file_cnt): {}", info.getFileCnt());
|
||||
|
||||
// 처리 필요 여부 판단: all_cnt == file_cnt면 이미 모든 파일이 생성됨
|
||||
if (Objects.equals(info.getAllCnt(), info.getFileCnt())) {
|
||||
log.info("[건너뜀] 모든 파일이 이미 처리 완료됨 (all_cnt={}, file_cnt={})",
|
||||
info.getAllCnt(), info.getFileCnt());
|
||||
log.info(" - 재처리가 필요한 경우 file_create_yn 플래그를 초기화하세요.");
|
||||
skippedCount++;
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
// Child Job Parameters 생성
|
||||
JobParameters jobParameters =
|
||||
new JobParametersBuilder()
|
||||
.addLong("analUid", info.getAnalUid())
|
||||
.addString("resultUid", info.getResultUid())
|
||||
.addLong("timestamp", System.currentTimeMillis()) // JobInstance 고유성 보장
|
||||
.toJobParameters();
|
||||
|
||||
log.info("[Child Job 실행] processAnalCntInfoJob 시작...");
|
||||
log.info(" - JobParameters: analUid={}, resultUid={}", info.getAnalUid(),
|
||||
info.getResultUid());
|
||||
|
||||
// Child Job 실행 (동기 방식)
|
||||
// 내부적으로 makeGeoJsonStep → dockerRunStep → zipResponseStep 순차 실행
|
||||
long startTime = System.currentTimeMillis();
|
||||
jobLauncher.run(processAnalCntInfoJob, jobParameters);
|
||||
long duration = System.currentTimeMillis() - startTime;
|
||||
|
||||
log.info("[Child Job 완료] ✓ 정상 종료");
|
||||
log.info(" - AnalUid: {}", info.getAnalUid());
|
||||
log.info(" - ResultUid: {}", info.getResultUid());
|
||||
log.info(" - 실행 시간: {} ms ({} 초)", duration, duration / 1000);
|
||||
|
||||
processedCount++;
|
||||
|
||||
} catch (Exception e) {
|
||||
// Child Job 실행 실패 시 (Step 실패 또는 예외 발생)
|
||||
log.error("[Child Job 실패] ✗ 실행 중 오류 발생", e);
|
||||
log.error(" - AnalUid: {}", info.getAnalUid());
|
||||
log.error(" - ResultUid: {}", info.getResultUid());
|
||||
log.error(" - 에러 메시지: {}", e.getMessage());
|
||||
log.error(" - 확인사항:");
|
||||
log.error(" 1. batch_step_history 테이블에서 실패한 Step 확인");
|
||||
log.error(" 2. error_message 컬럼에서 상세 에러 내용 확인");
|
||||
|
||||
failedCount++;
|
||||
|
||||
// 한 회차 실패해도 다음 회차 계속 처리 (부분 실패 허용)
|
||||
log.info("[계속 진행] 다음 회차 처리를 계속합니다.");
|
||||
}
|
||||
}
|
||||
|
||||
// Step 3: 최종 통계 및 결과 로깅
|
||||
log.info("========================================");
|
||||
log.info("[Step 3/3] Parent Job 실행 결과 요약");
|
||||
log.info(" - 총 회차 수: {} 개", analList.size());
|
||||
log.info(" - 성공: {} 개", processedCount);
|
||||
log.info(" - 건너뜀: {} 개", skippedCount);
|
||||
log.info(" - 실패: {} 개", failedCount);
|
||||
|
||||
// 성공률 계산
|
||||
if (analList.size() > 0) {
|
||||
double successRate =
|
||||
(double) processedCount / (analList.size() - skippedCount) * 100;
|
||||
log.info(" - 성공률: {}% (건너뛴 회차 제외)", String.format("%.2f", successRate));
|
||||
}
|
||||
|
||||
log.info("========================================");
|
||||
|
||||
// 실패 정책 처리
|
||||
if (failedCount > 0) {
|
||||
log.warn("[경고] {} 개의 Child Job 실행이 실패했습니다.", failedCount);
|
||||
log.warn(" - 실패 상세 내용은 batch_step_history 테이블을 확인하세요.");
|
||||
log.warn(" - SQL: SELECT * FROM batch_step_history WHERE status='FAILED' ORDER BY"
|
||||
+ " started_dttm DESC;");
|
||||
|
||||
// 실패가 있어도 Parent Job은 성공으로 처리 (부분 성공 정책)
|
||||
// 만약 하나라도 실패하면 Parent Job도 실패로 처리하려면 아래 주석 해제
|
||||
// throw new RuntimeException(
|
||||
// String.format("%d 개의 Child Job 실행이 실패했습니다. (성공: %d, 실패: %d)",
|
||||
// failedCount, processedCount, failedCount));
|
||||
} else {
|
||||
log.info("[완료] 모든 Child Job이 정상적으로 완료되었습니다.");
|
||||
}
|
||||
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,205 @@
|
||||
package com.kamco.cd.geojsonscheduler.batch;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.SerializationFeature;
|
||||
import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.AnalMapSheetList;
|
||||
import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.CompleteLabelData;
|
||||
import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.CompleteLabelData.GeoJsonFeature;
|
||||
import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.FeatureCollection;
|
||||
import com.kamco.cd.geojsonscheduler.repository.TrainingDataReviewJobRepository;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.List;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.batch.core.StepContribution;
|
||||
import org.springframework.batch.core.scope.context.ChunkContext;
|
||||
import org.springframework.batch.core.step.tasklet.Tasklet;
|
||||
import org.springframework.batch.repeat.RepeatStatus;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* GeoJSON 파일 생성 Tasklet
|
||||
*
|
||||
* <p>검수 완료된 라벨링 데이터를 GeoJSON 형식으로 변환하여 파일로 저장합니다. 각 도엽(Map Sheet)별로 별도의 GeoJSON 파일을
|
||||
* 생성하며, 파일 생성 완료 후 DB에 플래그를 업데이트합니다.
|
||||
*
|
||||
* <p><b>주요 기능:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>검수 완료된 도엽 목록 조회
|
||||
* <li>도엽별 라벨링 데이터 조회 및 GeoJSON Feature 변환
|
||||
* <li>GeoJSON 파일 생성 (/dataset/request/{resultUid}/*.geojson)
|
||||
* <li>DB에 파일 생성 완료 플래그 업데이트
|
||||
* </ul>
|
||||
*
|
||||
* <p><b>파일 명명 규칙:</b> {resultUid_8자}_{compareYyyy}_{targetYyyy}_{mapSheetNum}_D15.geojson
|
||||
*
|
||||
* <p><b>예시:</b> ED80D700_2022_2023_3724036_D15.geojson
|
||||
*
|
||||
* @author KAMCO Development Team
|
||||
* @since 1.0.0
|
||||
*/
|
||||
@Log4j2
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class MakeGeoJsonTasklet implements Tasklet {
|
||||
|
||||
/** 라벨링 데이터 조회를 위한 Repository */
|
||||
private final TrainingDataReviewJobRepository repository;
|
||||
|
||||
/** GeoJSON 파일이 저장될 베이스 디렉토리 경로 (예: /kamco-nfs/dataset) */
|
||||
@Value("${training-data.geojson-dir}")
|
||||
private String trainingDataDir;
|
||||
|
||||
/** Job Parameter로 전달받은 분석 회차 UID */
|
||||
@Value("#{jobParameters['analUid']}")
|
||||
private Long analUid;
|
||||
|
||||
/** Job Parameter로 전달받은 결과물 고유 ID (UUID) */
|
||||
@Value("#{jobParameters['resultUid']}")
|
||||
private String resultUid;
|
||||
|
||||
/**
|
||||
* GeoJSON 파일 생성 작업 실행
|
||||
*
|
||||
* <p>검수 완료된 라벨링 데이터를 조회하여 도엽별로 GeoJSON 파일을 생성합니다.
|
||||
*
|
||||
* @param contribution Step 실행 정보를 담는 객체
|
||||
* @param chunkContext Chunk 실행 컨텍스트
|
||||
* @return RepeatStatus.FINISHED - 작업 완료
|
||||
* @throws RuntimeException 생성된 GeoJSON 파일이 없는 경우 또는 파일 생성 실패 시
|
||||
*/
|
||||
@Override
|
||||
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
|
||||
log.info("========================================");
|
||||
log.info("GeoJSON 생성 시작 (AnalUid={}, ResultUid={})", analUid, resultUid);
|
||||
log.info("========================================");
|
||||
|
||||
// Step 1: 검수 완료된 도엽 목록 조회
|
||||
log.info("[Step 1/4] 검수 완료된 도엽 목록 조회 중... (AnalUid={})", analUid);
|
||||
List<AnalMapSheetList> analMapList = repository.findCompletedAnalMapSheetList(analUid);
|
||||
log.info("[Step 1/4] 검수 완료된 도엽 수: {}", analMapList.size());
|
||||
|
||||
// 검수 완료된 도엽이 없으면 작업 종료
|
||||
if (analMapList.isEmpty()) {
|
||||
log.warn("[경고] 검수 완료된 도엽이 없음. 작업을 건너뜁니다.");
|
||||
log.warn(" - 확인사항: tb_labeling_assignment 테이블의 inspect_state='COMPLETE' 데이터가 있는지 확인");
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
|
||||
// GeoJSON 파일 생성 통계를 위한 카운터
|
||||
int processedMapSheetCount = 0; // 처리된 도엽 수
|
||||
int totalGeoJsonFiles = 0; // 생성된 GeoJSON 파일 수
|
||||
|
||||
// Step 2: 각 도엽별로 GeoJSON 파일 생성
|
||||
log.info("[Step 2/4] 도엽별 GeoJSON 파일 생성 시작 (총 {} 개 도엽)", analMapList.size());
|
||||
|
||||
for (AnalMapSheetList mapSheet : analMapList) {
|
||||
log.info("----------------------------------------");
|
||||
log.info(" [도엽 처리] MapSheetNum={}", mapSheet.getMapSheetNum());
|
||||
|
||||
// Step 2-1: 도엽별 검수 완료된 라벨링 데이터 조회
|
||||
log.info(" [2-1] 라벨링 데이터 조회 중...");
|
||||
List<CompleteLabelData> completeList =
|
||||
repository.findCompletedYesterdayLabelingList(analUid, mapSheet.getMapSheetNum());
|
||||
log.info(" [2-1] 조회 완료: {} 건", completeList.size());
|
||||
|
||||
// 라벨링 데이터가 없으면 다음 도엽으로
|
||||
if (completeList.isEmpty()) {
|
||||
log.info(" [건너뜀] 라벨링 데이터가 없어 GeoJSON 파일 생성을 건너뜁니다.");
|
||||
continue;
|
||||
}
|
||||
|
||||
// Step 2-2: GeoUID 목록 추출 (DB 업데이트용)
|
||||
log.info(" [2-2] GeoUID 목록 추출 중...");
|
||||
List<Long> geoUids = completeList.stream().map(CompleteLabelData::getGeoUid).toList();
|
||||
log.info(" [2-2] GeoUID 목록 생성 완료: {} 건", geoUids.size());
|
||||
|
||||
// Step 2-3: GeoJSON Feature 객체로 변환
|
||||
log.info(" [2-3] GeoJSON Feature 변환 중...");
|
||||
List<GeoJsonFeature> features = completeList.stream().map(GeoJsonFeature::from).toList();
|
||||
log.info(" [2-3] GeoJSON Feature 변환 완료: {} 개", features.size());
|
||||
|
||||
// Step 2-4: FeatureCollection 생성 및 파일명 결정
|
||||
log.info(" [2-4] FeatureCollection 생성 중...");
|
||||
FeatureCollection collection = new FeatureCollection(features);
|
||||
String filename = mapSheet.buildFilename(resultUid);
|
||||
log.info(" [2-4] GeoJSON 파일명: {}", filename);
|
||||
|
||||
// Step 2-5: 파일 저장 경로 생성
|
||||
// 형식: /kamco-nfs/dataset/request/{resultUid}/{filename}.geojson
|
||||
Path outputPath =
|
||||
Paths.get(
|
||||
trainingDataDir + File.separator + "request" + File.separator + resultUid, filename);
|
||||
log.info(" [2-5] 출력 경로: {}", outputPath);
|
||||
|
||||
try {
|
||||
// Step 2-6: 디렉토리 생성 (존재하지 않으면)
|
||||
log.info(" [2-6] 디렉토리 생성 중...");
|
||||
Files.createDirectories(outputPath.getParent());
|
||||
log.info(" [2-6] 디렉토리 생성 완료: {}", outputPath.getParent());
|
||||
|
||||
// Step 2-7: GeoJSON 파일 저장 (Pretty Print 포맷)
|
||||
log.info(" [2-7] GeoJSON 파일 저장 중...");
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
objectMapper.enable(SerializationFeature.INDENT_OUTPUT); // JSON 들여쓰기 적용
|
||||
objectMapper.writeValue(outputPath.toFile(), collection);
|
||||
log.info(" [2-7] ✓ GeoJSON 파일 저장 완료: {}", outputPath);
|
||||
|
||||
// Step 2-8: DB에 파일 생성 완료 플래그 업데이트
|
||||
log.info(" [2-8] DB 파일 생성 플래그 업데이트 중...");
|
||||
repository.updateLearnDataGeomFileCreateYn(geoUids);
|
||||
log.info(" [2-8] ✓ DB 업데이트 완료: {} 건", geoUids.size());
|
||||
|
||||
// 통계 카운터 증가
|
||||
processedMapSheetCount++;
|
||||
totalGeoJsonFiles++;
|
||||
|
||||
log.info(" [완료] 도엽 '{}' 처리 성공", mapSheet.getMapSheetNum());
|
||||
|
||||
} catch (IOException e) {
|
||||
// 파일 생성 실패 시 예외 발생 (Step 실패 처리)
|
||||
log.error(" [실패] GeoJSON 파일 생성 실패", e);
|
||||
log.error(" - 파일명: {}", filename);
|
||||
log.error(" - 경로: {}", outputPath);
|
||||
log.error(" - 에러 메시지: {}", e.getMessage());
|
||||
throw new RuntimeException("GeoJSON 파일 생성 실패: " + filename, e);
|
||||
}
|
||||
}
|
||||
|
||||
// Step 3: 처리 결과 요약 및 검증
|
||||
log.info("========================================");
|
||||
log.info("[Step 3/4] GeoJSON 생성 작업 완료");
|
||||
log.info(" - ResultUid: {}", resultUid);
|
||||
log.info(" - AnalUid: {}", analUid);
|
||||
log.info(" - 처리 대상 도엽 수: {}", analMapList.size());
|
||||
log.info(" - 처리 완료 도엽 수: {}", processedMapSheetCount);
|
||||
log.info(" - 생성된 GeoJSON 파일 수: {}", totalGeoJsonFiles);
|
||||
log.info("========================================");
|
||||
|
||||
// Step 4: 필수 검증 - 최소 1개 이상의 파일이 생성되어야 함
|
||||
log.info("[Step 4/4] 필수 검증 수행 중...");
|
||||
if (totalGeoJsonFiles == 0) {
|
||||
log.error("[실패] 생성된 GeoJSON 파일이 없습니다!");
|
||||
log.error(" - AnalUid: {}", analUid);
|
||||
log.error(" - ResultUid: {}", resultUid);
|
||||
log.error(" - 조회된 도엽 수: {}", analMapList.size());
|
||||
log.error(" - 확인사항:");
|
||||
log.error(" 1. 각 도엽에 라벨링 데이터가 있는지 확인");
|
||||
log.error(" 2. findCompletedYesterdayLabelingList() 쿼리 결과 확인");
|
||||
throw new RuntimeException(
|
||||
String.format(
|
||||
"생성된 GeoJSON 파일이 없습니다. (AnalUid=%d, ResultUid=%s)", analUid, resultUid));
|
||||
}
|
||||
|
||||
log.info("[Step 4/4] ✓ 검증 완료: {} 개의 GeoJSON 파일이 정상적으로 생성되었습니다.", totalGeoJsonFiles);
|
||||
log.info("GeoJSON 파일 저장 위치: {}/request/{}/", trainingDataDir, resultUid);
|
||||
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
package com.kamco.cd.geojsonscheduler.batch;
|
||||
|
||||
import com.kamco.cd.geojsonscheduler.listener.StepHistoryListener;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.Step;
|
||||
import org.springframework.batch.core.job.builder.JobBuilder;
|
||||
import org.springframework.batch.core.repository.JobRepository;
|
||||
import org.springframework.batch.core.step.builder.StepBuilder;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
|
||||
@Configuration
|
||||
@RequiredArgsConstructor
|
||||
public class ProcessAnalCntInfoJobConfig {
|
||||
|
||||
private final JobRepository jobRepository;
|
||||
private final PlatformTransactionManager transactionManager;
|
||||
private final MakeGeoJsonTasklet makeGeoJsonTasklet;
|
||||
private final DockerRunTasklet dockerRunTasklet;
|
||||
private final ZipResponseTasklet zipResponseTasklet;
|
||||
private final StepHistoryListener stepHistoryListener;
|
||||
|
||||
@Bean
|
||||
public Job processAnalCntInfoJob() {
|
||||
return new JobBuilder("processAnalCntInfoJob", jobRepository)
|
||||
.start(makeGeoJsonStep())
|
||||
.next(dockerRunStep())
|
||||
.next(zipResponseStep())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Step makeGeoJsonStep() {
|
||||
return new StepBuilder("makeGeoJsonStep", jobRepository)
|
||||
.tasklet(makeGeoJsonTasklet, transactionManager)
|
||||
.listener(stepHistoryListener)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Step dockerRunStep() {
|
||||
return new StepBuilder("dockerRunStep", jobRepository)
|
||||
.tasklet(dockerRunTasklet, transactionManager)
|
||||
.listener(stepHistoryListener)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Step zipResponseStep() {
|
||||
return new StepBuilder("zipResponseStep", jobRepository)
|
||||
.tasklet(zipResponseTasklet, transactionManager)
|
||||
.listener(stepHistoryListener)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
package com.kamco.cd.geojsonscheduler.batch;
|
||||
|
||||
import com.kamco.cd.geojsonscheduler.listener.BatchHistoryListener;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.Step;
|
||||
import org.springframework.batch.core.job.builder.JobBuilder;
|
||||
import org.springframework.batch.core.repository.JobRepository;
|
||||
import org.springframework.batch.core.step.builder.StepBuilder;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.transaction.PlatformTransactionManager;
|
||||
|
||||
@Configuration
|
||||
@RequiredArgsConstructor
|
||||
public class TrainModelJobConfig {
|
||||
|
||||
private final JobRepository jobRepository;
|
||||
private final PlatformTransactionManager transactionManager;
|
||||
private final TrainModelTasklet trainModelTasklet;
|
||||
|
||||
@Bean
|
||||
public Job trainModelJob(BatchHistoryListener historyListener) {
|
||||
return new JobBuilder("trainModelJob", jobRepository)
|
||||
.listener(historyListener)
|
||||
.start(trainModelStep())
|
||||
.build();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Step trainModelStep() {
|
||||
return new StepBuilder("trainModelStep", jobRepository)
|
||||
.tasklet(trainModelTasklet, transactionManager)
|
||||
.build();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,60 @@
|
||||
package com.kamco.cd.geojsonscheduler.batch;
|
||||
|
||||
import com.kamco.cd.geojsonscheduler.service.TrainDockerRunnerService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.batch.core.StepContribution;
|
||||
import org.springframework.batch.core.scope.context.ChunkContext;
|
||||
import org.springframework.batch.core.step.tasklet.Tasklet;
|
||||
import org.springframework.batch.repeat.RepeatStatus;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Log4j2
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class TrainModelTasklet implements Tasklet {
|
||||
|
||||
private final TrainDockerRunnerService trainDockerRunnerService;
|
||||
|
||||
@Override
|
||||
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
|
||||
log.info("========================================");
|
||||
log.info("학습 배치 작업 시작");
|
||||
log.info("========================================");
|
||||
|
||||
String jobName = chunkContext.getStepContext().getJobName();
|
||||
log.info("Job Name: {}", jobName);
|
||||
|
||||
// Job 파라미터에서 dataset-folder와 output-folder 가져오기
|
||||
String datasetFolder = (String) chunkContext.getStepContext()
|
||||
.getJobParameters()
|
||||
.get("dataset-folder");
|
||||
String outputFolder = (String) chunkContext.getStepContext()
|
||||
.getJobParameters()
|
||||
.get("output-folder");
|
||||
|
||||
log.info("Dataset Folder Parameter: {}", datasetFolder);
|
||||
log.info("Output Folder Parameter: {}", outputFolder);
|
||||
|
||||
if (datasetFolder == null || datasetFolder.isBlank()) {
|
||||
log.error("dataset-folder 파라미터가 없습니다!");
|
||||
throw new IllegalArgumentException("dataset-folder parameter is required");
|
||||
}
|
||||
|
||||
if (outputFolder == null || outputFolder.isBlank()) {
|
||||
log.error("output-folder 파라미터가 없습니다!");
|
||||
throw new IllegalArgumentException("output-folder parameter is required");
|
||||
}
|
||||
|
||||
// Train Docker 실행
|
||||
log.info("Train Docker 실행 중...");
|
||||
trainDockerRunnerService.runTraining(datasetFolder, outputFolder);
|
||||
log.info("Train Docker 실행 완료");
|
||||
|
||||
log.info("========================================");
|
||||
log.info("학습 배치 작업 완료");
|
||||
log.info("========================================");
|
||||
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,264 @@
|
||||
package com.kamco.cd.geojsonscheduler.batch;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.zip.ZipEntry;
|
||||
import java.util.zip.ZipOutputStream;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.batch.core.StepContribution;
|
||||
import org.springframework.batch.core.scope.context.ChunkContext;
|
||||
import org.springframework.batch.core.step.tasklet.Tasklet;
|
||||
import org.springframework.batch.repeat.RepeatStatus;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 결과물 ZIP 압축 Tasklet
|
||||
*
|
||||
* <p>Docker 컨테이너 실행으로 생성된 학습 데이터 결과물을 ZIP 파일로 압축합니다. 압축된 파일은 다운로드 또는 배포를 위해
|
||||
* 사용됩니다.
|
||||
*
|
||||
* <p><b>주요 기능:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>response/{resultUid}/ 디렉토리 검증
|
||||
* <li>디렉토리 내 모든 파일과 서브디렉토리 재귀적 압축
|
||||
* <li>압축 파일 생성: response/{resultUid}.zip
|
||||
* </ul>
|
||||
*
|
||||
* <p><b>압축 설정:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>Hidden 파일 제외
|
||||
* <li>디렉토리 구조 유지
|
||||
* <li>버퍼 크기: 1024 bytes
|
||||
* </ul>
|
||||
*
|
||||
* <p><b>실행 조건:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>dockerRunStep이 성공적으로 완료되어야 함
|
||||
* <li>response/{resultUid}/ 디렉토리가 존재해야 함
|
||||
* <li>디렉토리 내에 최소 1개 이상의 파일이 존재해야 함
|
||||
* </ul>
|
||||
*
|
||||
* @author KAMCO Development Team
|
||||
* @since 1.0.0
|
||||
*/
|
||||
@Log4j2
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class ZipResponseTasklet implements Tasklet {
|
||||
|
||||
/** 학습 데이터 저장 베이스 디렉토리 경로 (예: /kamco-nfs/dataset) */
|
||||
@Value("${training-data.geojson-dir}")
|
||||
private String trainingDataDir;
|
||||
|
||||
/** Job Parameter로 전달받은 결과물 고유 ID (UUID) */
|
||||
@Value("#{jobParameters['resultUid']}")
|
||||
private String resultUid;
|
||||
|
||||
/**
|
||||
* 결과물 압축 작업 실행
|
||||
*
|
||||
* <p>response/{resultUid}/ 디렉토리를 검증하고 ZIP 파일로 압축합니다.
|
||||
*
|
||||
* @param contribution Step 실행 정보를 담는 객체
|
||||
* @param chunkContext Chunk 실행 컨텍스트
|
||||
* @return RepeatStatus.FINISHED - 작업 완료
|
||||
* @throws RuntimeException response 디렉토리가 존재하지 않거나 압축 실패 시
|
||||
*/
|
||||
@Override
|
||||
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
|
||||
log.info("========================================");
|
||||
log.info("결과물 압축 시작 (ResultUid={})", resultUid);
|
||||
log.info("========================================");
|
||||
|
||||
// Step 1: 압축 대상 디렉토리 및 출력 파일 경로 설정
|
||||
log.info("[Step 1/5] 경로 설정 중...");
|
||||
Path responseDir =
|
||||
Paths.get(trainingDataDir + File.separator + "response" + File.separator + resultUid);
|
||||
Path zipFile =
|
||||
Paths.get(
|
||||
trainingDataDir + File.separator + "response" + File.separator + resultUid + ".zip");
|
||||
|
||||
log.info("[Step 1/5] 경로 설정 완료");
|
||||
log.info(" - 압축 대상 디렉토리: {}", responseDir);
|
||||
log.info(" - 압축 파일 저장 경로: {}", zipFile);
|
||||
|
||||
// Step 2: response 디렉토리 존재 여부 검증
|
||||
log.info("[Step 2/5] Response 디렉토리 검증 중...");
|
||||
if (!Files.exists(responseDir)) {
|
||||
log.error("[실패] Response 디렉토리가 존재하지 않습니다!");
|
||||
log.error(" - 경로: {}", responseDir);
|
||||
log.error(" - ResultUid: {}", resultUid);
|
||||
log.error(" - 확인사항:");
|
||||
log.error(" 1. dockerRunStep이 정상적으로 완료되었는지 확인");
|
||||
log.error(" 2. Docker 컨테이너가 결과물을 생성했는지 확인");
|
||||
log.error(" 3. Docker 볼륨 마운트 경로가 올바른지 확인");
|
||||
throw new RuntimeException("Response 디렉토리가 존재하지 않습니다: " + responseDir);
|
||||
}
|
||||
log.info("[Step 2/5] ✓ Response 디렉토리 존재 확인");
|
||||
|
||||
// Step 3: 디렉토리 내용 확인 (파일 수 카운트)
|
||||
log.info("[Step 3/5] 디렉토리 내용 분석 중...");
|
||||
File responseDirFile = responseDir.toFile();
|
||||
long fileCount = countFilesRecursively(responseDirFile);
|
||||
log.info("[Step 3/5] 디렉토리 분석 완료");
|
||||
log.info(" - 총 파일 수: {} 개", fileCount);
|
||||
|
||||
if (fileCount == 0) {
|
||||
log.warn("[경고] 압축할 파일이 없습니다. (디렉토리가 비어있음)");
|
||||
log.warn(" - 디렉토리: {}", responseDir);
|
||||
}
|
||||
|
||||
// Step 4: ZIP 압축 실행
|
||||
log.info("[Step 4/5] ZIP 압축 시작...");
|
||||
try {
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
zipDirectory(responseDirFile, zipFile.toFile());
|
||||
|
||||
long endTime = System.currentTimeMillis();
|
||||
long duration = endTime - startTime;
|
||||
long zipSize = Files.size(zipFile);
|
||||
double zipSizeMB = zipSize / (1024.0 * 1024.0);
|
||||
|
||||
log.info("[Step 4/5] ✓ ZIP 압축 완료");
|
||||
log.info(" - 압축 파일: {}", zipFile);
|
||||
log.info(" - 압축 파일 크기: {} bytes ({} MB)", zipSize, String.format("%.2f", zipSizeMB));
|
||||
log.info(" - 압축 소요 시간: {} ms", duration);
|
||||
|
||||
} catch (IOException e) {
|
||||
log.error("[실패] ZIP 압축 중 오류 발생!", e);
|
||||
log.error(" - 소스 디렉토리: {}", responseDir);
|
||||
log.error(" - 대상 ZIP 파일: {}", zipFile);
|
||||
log.error(" - 에러 메시지: {}", e.getMessage());
|
||||
throw new RuntimeException("Response 디렉토리 압축 실패: " + responseDir, e);
|
||||
}
|
||||
|
||||
// Step 5: 최종 검증 및 완료
|
||||
log.info("[Step 5/5] 최종 검증 중...");
|
||||
if (Files.exists(zipFile)) {
|
||||
log.info("[Step 5/5] ✓ ZIP 파일 생성 확인 완료");
|
||||
} else {
|
||||
log.error("[실패] ZIP 파일이 생성되지 않았습니다!");
|
||||
throw new RuntimeException("ZIP 파일 생성 실패: " + zipFile);
|
||||
}
|
||||
|
||||
log.info("========================================");
|
||||
log.info("결과물 압축 완료 (ResultUid={})", resultUid);
|
||||
log.info(" - ZIP 파일 위치: {}", zipFile);
|
||||
log.info("========================================");
|
||||
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
|
||||
/**
|
||||
* 디렉토리를 ZIP 파일로 압축
|
||||
*
|
||||
* @param sourceDir 압축할 소스 디렉토리
|
||||
* @param zipFile 생성될 ZIP 파일
|
||||
* @throws IOException 압축 중 IO 오류 발생 시
|
||||
*/
|
||||
private void zipDirectory(File sourceDir, File zipFile) throws IOException {
|
||||
log.debug("ZIP 압축 시작: {} -> {}", sourceDir.getAbsolutePath(), zipFile.getAbsolutePath());
|
||||
|
||||
try (FileOutputStream fos = new FileOutputStream(zipFile);
|
||||
ZipOutputStream zos = new ZipOutputStream(fos)) {
|
||||
zipDirectoryRecursive(sourceDir, sourceDir.getName(), zos);
|
||||
}
|
||||
|
||||
log.debug("ZIP 압축 완료: {}", zipFile.getAbsolutePath());
|
||||
}
|
||||
|
||||
/**
|
||||
* 디렉토리를 재귀적으로 ZIP으로 압축
|
||||
*
|
||||
* <p>서브디렉토리를 포함한 모든 파일과 디렉토리를 ZIP에 추가합니다. Hidden 파일은 제외됩니다.
|
||||
*
|
||||
* @param fileToZip 압축할 파일 또는 디렉토리
|
||||
* @param fileName ZIP 엔트리 이름
|
||||
* @param zos ZipOutputStream
|
||||
* @throws IOException 압축 중 IO 오류 발생 시
|
||||
*/
|
||||
private void zipDirectoryRecursive(File fileToZip, String fileName, ZipOutputStream zos)
|
||||
throws IOException {
|
||||
|
||||
// Hidden 파일은 제외
|
||||
if (fileToZip.isHidden()) {
|
||||
log.debug("Hidden 파일 제외: {}", fileToZip.getName());
|
||||
return;
|
||||
}
|
||||
|
||||
// 디렉토리인 경우
|
||||
if (fileToZip.isDirectory()) {
|
||||
log.debug("디렉토리 추가: {}", fileName);
|
||||
|
||||
// 디렉토리 엔트리 생성
|
||||
if (fileName.endsWith("/")) {
|
||||
zos.putNextEntry(new ZipEntry(fileName));
|
||||
zos.closeEntry();
|
||||
} else {
|
||||
zos.putNextEntry(new ZipEntry(fileName + "/"));
|
||||
zos.closeEntry();
|
||||
}
|
||||
|
||||
// 하위 파일 및 디렉토리 재귀 처리
|
||||
File[] children = fileToZip.listFiles();
|
||||
if (children != null) {
|
||||
for (File childFile : children) {
|
||||
zipDirectoryRecursive(childFile, fileName + "/" + childFile.getName(), zos);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// 파일인 경우: 파일 내용을 ZIP에 추가
|
||||
log.debug("파일 추가: {}", fileName);
|
||||
try (FileInputStream fis = new FileInputStream(fileToZip)) {
|
||||
ZipEntry zipEntry = new ZipEntry(fileName);
|
||||
zos.putNextEntry(zipEntry);
|
||||
|
||||
// 파일 내용을 버퍼를 통해 복사
|
||||
byte[] buffer = new byte[1024];
|
||||
int length;
|
||||
while ((length = fis.read(buffer)) >= 0) {
|
||||
zos.write(buffer, 0, length);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 디렉토리 내 파일 수를 재귀적으로 카운트
|
||||
*
|
||||
* @param directory 카운트할 디렉토리
|
||||
* @return 디렉토리 내 총 파일 수 (서브디렉토리 포함)
|
||||
*/
|
||||
private long countFilesRecursively(File directory) {
|
||||
if (!directory.isDirectory()) {
|
||||
return directory.isFile() ? 1 : 0;
|
||||
}
|
||||
|
||||
File[] children = directory.listFiles();
|
||||
if (children == null) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
long count = 0;
|
||||
for (File child : children) {
|
||||
if (child.isFile()) {
|
||||
count++;
|
||||
} else if (child.isDirectory()) {
|
||||
count += countFilesRecursively(child);
|
||||
}
|
||||
}
|
||||
return count;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
package com.kamco.cd.geojsonscheduler.config;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
@ConfigurationProperties(prefix = "train-data.docker")
|
||||
public class TrainDockerProperties {
|
||||
|
||||
private String image;
|
||||
private String dataVolume;
|
||||
private String checkpointsVolume;
|
||||
private String datasetFolder;
|
||||
private String outputFolder;
|
||||
private String inputSize;
|
||||
private String cropSize;
|
||||
private int batchSize;
|
||||
private String gpuIds;
|
||||
private int gpus;
|
||||
private String lr;
|
||||
private String backbone;
|
||||
private int epochs;
|
||||
}
|
||||
@@ -0,0 +1,148 @@
|
||||
package com.kamco.cd.geojsonscheduler.listener;
|
||||
|
||||
import com.kamco.cd.geojsonscheduler.repository.BatchStepHistoryRepository;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.batch.core.ExitStatus;
|
||||
import org.springframework.batch.core.StepExecution;
|
||||
import org.springframework.batch.core.StepExecutionListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* Step 실행 이력 Listener
|
||||
*
|
||||
* <p>각 Step의 시작과 종료 시점에 실행되어 batch_step_history 테이블에 실행 이력을 기록합니다. Step 시작 시 STARTED
|
||||
* 상태로 INSERT하고, Step 종료 시 SUCCESS 또는 FAILED 상태로 UPDATE합니다.
|
||||
*
|
||||
* <p><b>주요 기능:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>Step 시작 시 DB에 STARTED 상태 기록
|
||||
* <li>Step 종료 시 DB에 SUCCESS 또는 FAILED 상태 업데이트
|
||||
* <li>실패 시 에러 메시지 자동 기록
|
||||
* <li>리스너 자체 오류가 Step 실행에 영향을 주지 않도록 예외 처리
|
||||
* </ul>
|
||||
*
|
||||
* <p><b>적용 대상:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>makeGeoJsonStep
|
||||
* <li>dockerRunStep
|
||||
* <li>zipResponseStep
|
||||
* </ul>
|
||||
*
|
||||
* <p><b>필수 JobParameters:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>analUid (Long): 분석 회차 UID
|
||||
* <li>resultUid (String): 결과물 고유 ID
|
||||
* </ul>
|
||||
*
|
||||
* @author KAMCO Development Team
|
||||
* @since 1.0.0
|
||||
* @see BatchStepHistoryRepository
|
||||
*/
|
||||
@Log4j2
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class StepHistoryListener implements StepExecutionListener {
|
||||
|
||||
/** Step 이력 DB 저장을 위한 Repository */
|
||||
private final BatchStepHistoryRepository batchStepHistoryRepository;
|
||||
|
||||
@Override
|
||||
public void beforeStep(StepExecution stepExecution) {
|
||||
log.info("=========================================================");
|
||||
log.info("Step 시작 - StepHistoryListener");
|
||||
log.info("=========================================================");
|
||||
|
||||
String stepName = stepExecution.getStepName();
|
||||
log.info("Step Name: {}", stepName);
|
||||
|
||||
try {
|
||||
Long analUid = stepExecution.getJobParameters().getLong("analUid");
|
||||
String resultUid = stepExecution.getJobParameters().getString("resultUid");
|
||||
|
||||
if (analUid == null || resultUid == null) {
|
||||
log.warn(
|
||||
"JobParameters에 analUid 또는 resultUid가 없어 Step 이력을 기록할 수 없습니다.");
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("AnalUid: {}, ResultUid: {}", analUid, resultUid);
|
||||
|
||||
// Step 시작 기록
|
||||
batchStepHistoryRepository.startStep(analUid, resultUid, stepName);
|
||||
log.info("Step 시작 기록 저장 완료");
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("Step 시작 기록 저장 실패: {}", e.getMessage(), e);
|
||||
// 리스너 오류가 Step 실행을 방해하지 않도록 예외를 던지지 않음
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExitStatus afterStep(StepExecution stepExecution) {
|
||||
log.info("=========================================================");
|
||||
log.info("Step 종료 - StepHistoryListener");
|
||||
log.info("=========================================================");
|
||||
|
||||
String stepName = stepExecution.getStepName();
|
||||
log.info("Step Name: {}", stepName);
|
||||
log.info("Step Exit Status: {}", stepExecution.getExitStatus());
|
||||
|
||||
try {
|
||||
Long analUid = stepExecution.getJobParameters().getLong("analUid");
|
||||
String resultUid = stepExecution.getJobParameters().getString("resultUid");
|
||||
|
||||
if (analUid == null || resultUid == null) {
|
||||
log.warn(
|
||||
"JobParameters에 analUid 또는 resultUid가 없어 Step 이력을 기록할 수 없습니다.");
|
||||
return stepExecution.getExitStatus();
|
||||
}
|
||||
|
||||
log.info("AnalUid: {}, ResultUid: {}", analUid, resultUid);
|
||||
|
||||
// Step 성공 여부 판단
|
||||
boolean isSuccess = ExitStatus.COMPLETED.equals(stepExecution.getExitStatus());
|
||||
log.info("Step 성공 여부: {}", isSuccess ? "성공" : "실패");
|
||||
|
||||
if (isSuccess) {
|
||||
// Step 성공 기록
|
||||
batchStepHistoryRepository.finishStepSuccess(analUid, resultUid, stepName);
|
||||
log.info("Step 성공 기록 저장 완료");
|
||||
} else {
|
||||
// Step 실패 기록
|
||||
String errorMessage = buildErrorMessage(stepExecution);
|
||||
batchStepHistoryRepository.finishStepFailed(analUid, resultUid, stepName, errorMessage);
|
||||
log.info("Step 실패 기록 저장 완료");
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("Step 종료 기록 저장 실패: {}", e.getMessage(), e);
|
||||
// 리스너 오류가 Step 실행을 방해하지 않도록 예외를 던지지 않음
|
||||
}
|
||||
|
||||
log.info("=========================================================");
|
||||
return stepExecution.getExitStatus();
|
||||
}
|
||||
|
||||
/**
|
||||
* Step 실행 실패 시 에러 메시지 생성
|
||||
*/
|
||||
private String buildErrorMessage(StepExecution stepExecution) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
||||
sb.append("ExitStatus: ").append(stepExecution.getExitStatus()).append("\n");
|
||||
|
||||
if (!stepExecution.getFailureExceptions().isEmpty()) {
|
||||
sb.append("Failure Exceptions:\n");
|
||||
for (Throwable t : stepExecution.getFailureExceptions()) {
|
||||
sb.append("- ").append(t.getClass().getSimpleName()).append(": ").append(t.getMessage())
|
||||
.append("\n");
|
||||
}
|
||||
}
|
||||
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,169 @@
|
||||
package com.kamco.cd.geojsonscheduler.repository;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
import java.time.LocalDateTime;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.stereotype.Repository;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
/**
|
||||
* Batch Step 실행 이력 Repository
|
||||
*
|
||||
* <p>각 AnalCntInfo의 Step별 실행 이력을 batch_step_history 테이블에 저장하고 조회합니다. Step 시작 시 STARTED
|
||||
* 상태로 INSERT하고, Step 종료 시 SUCCESS 또는 FAILED 상태로 UPDATE합니다.
|
||||
*
|
||||
* <p><b>주요 기능:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>Step 시작 기록 (status=STARTED)
|
||||
* <li>Step 성공 기록 (status=SUCCESS)
|
||||
* <li>Step 실패 기록 (status=FAILED, error_message 포함)
|
||||
* </ul>
|
||||
*
|
||||
* <p><b>테이블 구조:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>id: Step 이력 고유 ID (BIGSERIAL)
|
||||
* <li>anal_uid: 분석 UID
|
||||
* <li>result_uid: 결과 UID
|
||||
* <li>step_name: Step 이름 (makeGeoJsonStep/dockerRunStep/zipResponseStep)
|
||||
* <li>status: 상태 (STARTED/SUCCESS/FAILED)
|
||||
* <li>error_message: 에러 메시지 (실패 시)
|
||||
* <li>started_dttm: Step 시작 일시
|
||||
* <li>completed_dttm: Step 완료 일시
|
||||
* </ul>
|
||||
*
|
||||
* @author KAMCO Development Team
|
||||
* @since 1.0.0
|
||||
*/
|
||||
@Log4j2
|
||||
@Repository
|
||||
@RequiredArgsConstructor
|
||||
public class BatchStepHistoryRepository {
|
||||
|
||||
/** JDBC 쿼리 실행을 위한 Template */
|
||||
private final JdbcTemplate jdbcTemplate;
|
||||
|
||||
/**
|
||||
* Step 시작 기록
|
||||
*/
|
||||
@Transactional
|
||||
public void startStep(Long analUid, String resultUid, String stepName) {
|
||||
log.info("[BatchStepHistoryRepository] Step 시작 기록 저장");
|
||||
log.info(" AnalUid: {}, ResultUid: {}, StepName: {}", analUid, resultUid, stepName);
|
||||
|
||||
String sql =
|
||||
"""
|
||||
INSERT INTO public.batch_step_history
|
||||
(anal_uid, result_uid, step_name, status, started_dttm, created_dttm, updated_dttm)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
""";
|
||||
|
||||
Timestamp now = Timestamp.valueOf(LocalDateTime.now());
|
||||
|
||||
int rowsAffected =
|
||||
jdbcTemplate.update(
|
||||
sql,
|
||||
analUid,
|
||||
resultUid,
|
||||
stepName,
|
||||
"STARTED",
|
||||
now, // started_dttm
|
||||
now, // created_dttm
|
||||
now // updated_dttm
|
||||
);
|
||||
|
||||
log.info(
|
||||
"[BatchStepHistoryRepository] Step 시작 기록 저장 완료 ({} rows affected)", rowsAffected);
|
||||
}
|
||||
|
||||
/**
|
||||
* Step 성공 기록
|
||||
*/
|
||||
@Transactional
|
||||
public void finishStepSuccess(Long analUid, String resultUid, String stepName) {
|
||||
log.info("[BatchStepHistoryRepository] Step 성공 기록 업데이트");
|
||||
log.info(" AnalUid: {}, ResultUid: {}, StepName: {}", analUid, resultUid, stepName);
|
||||
|
||||
String sql =
|
||||
"""
|
||||
UPDATE public.batch_step_history
|
||||
SET status = ?,
|
||||
completed_dttm = ?,
|
||||
updated_dttm = ?
|
||||
WHERE anal_uid = ?
|
||||
AND result_uid = ?
|
||||
AND step_name = ?
|
||||
AND status = 'STARTED'
|
||||
""";
|
||||
|
||||
Timestamp now = Timestamp.valueOf(LocalDateTime.now());
|
||||
|
||||
int rowsAffected =
|
||||
jdbcTemplate.update(sql, "SUCCESS", now, now, analUid, resultUid, stepName);
|
||||
|
||||
if (rowsAffected > 0) {
|
||||
log.info(
|
||||
"[BatchStepHistoryRepository] Step 성공 기록 업데이트 완료 ({} rows affected)",
|
||||
rowsAffected);
|
||||
} else {
|
||||
log.warn(
|
||||
"[BatchStepHistoryRepository] 업데이트된 row가 없습니다. AnalUid: {}, ResultUid: {},"
|
||||
+ " StepName: {}",
|
||||
analUid,
|
||||
resultUid,
|
||||
stepName);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Step 실패 기록
|
||||
*/
|
||||
@Transactional
|
||||
public void finishStepFailed(
|
||||
Long analUid, String resultUid, String stepName, String errorMessage) {
|
||||
log.info("[BatchStepHistoryRepository] Step 실패 기록 업데이트");
|
||||
log.info(" AnalUid: {}, ResultUid: {}, StepName: {}", analUid, resultUid, stepName);
|
||||
log.info(" ErrorMessage: {}", errorMessage);
|
||||
|
||||
String sql =
|
||||
"""
|
||||
UPDATE public.batch_step_history
|
||||
SET status = ?,
|
||||
error_message = ?,
|
||||
completed_dttm = ?,
|
||||
updated_dttm = ?
|
||||
WHERE anal_uid = ?
|
||||
AND result_uid = ?
|
||||
AND step_name = ?
|
||||
AND status = 'STARTED'
|
||||
""";
|
||||
|
||||
Timestamp now = Timestamp.valueOf(LocalDateTime.now());
|
||||
|
||||
// error_message는 최대 1000자로 제한
|
||||
String truncatedError =
|
||||
errorMessage != null && errorMessage.length() > 1000
|
||||
? errorMessage.substring(0, 1000)
|
||||
: errorMessage;
|
||||
|
||||
int rowsAffected =
|
||||
jdbcTemplate.update(
|
||||
sql, "FAILED", truncatedError, now, now, analUid, resultUid, stepName);
|
||||
|
||||
if (rowsAffected > 0) {
|
||||
log.info(
|
||||
"[BatchStepHistoryRepository] Step 실패 기록 업데이트 완료 ({} rows affected)",
|
||||
rowsAffected);
|
||||
} else {
|
||||
log.warn(
|
||||
"[BatchStepHistoryRepository] 업데이트된 row가 없습니다. AnalUid: {}, ResultUid: {},"
|
||||
+ " StepName: {}",
|
||||
analUid,
|
||||
resultUid,
|
||||
stepName);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -84,7 +84,7 @@ public class TrainingDataReviewJobRepository {
|
||||
SELECT
|
||||
mslg.geo_uid,
|
||||
'Feature' AS type,
|
||||
ST_AsGeoJSON(mslg.geom) AS geom_str,
|
||||
ST_AsGeoJSON(ST_Transform(mslg.geom, 4326)) AS geom_str,
|
||||
CASE
|
||||
WHEN mslg.class_after_cd IN ('building', 'container') THEN 'M1'
|
||||
WHEN mslg.class_after_cd = 'waste' THEN 'M2'
|
||||
|
||||
@@ -10,75 +10,206 @@ import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* Docker 컨테이너 실행 서비스
|
||||
*
|
||||
* <p>학습 데이터 생성 파이프라인이 포함된 Docker 컨테이너를 실행하고 결과를 모니터링합니다. Docker 프로세스의 표준 출력을 실시간으로
|
||||
* 로깅하며, 비정상 종료 시 RuntimeException을 발생시켜 Batch Step을 실패 처리합니다.
|
||||
*
|
||||
* <p><b>주요 기능:</b>
|
||||
*
|
||||
* <ul>
|
||||
* <li>Docker run 명령어 생성 (볼륨 마운트, 환경 변수, 파라미터 설정)
|
||||
* <li>Docker 프로세스 실행 및 실시간 로그 출력
|
||||
* <li>Exit Code 검증 (0이 아닐 시 예외 발생)
|
||||
* <li>프로세스 인터럽트 처리
|
||||
* </ul>
|
||||
*
|
||||
* <p><b>Docker 명령어 구조:</b>
|
||||
*
|
||||
* <pre>
|
||||
* docker run --rm
|
||||
* --user {dockerUser}
|
||||
* -v {datasetVolume}
|
||||
* -v {imagesVolume}
|
||||
* --entrypoint python
|
||||
* {dockerImage}
|
||||
* code/kamco_full_pipeline.py
|
||||
* --labelling-folder request/{resultUid}
|
||||
* --output-folder response/{resultUid}
|
||||
* --input_root {inputRoot}
|
||||
* --output_root {outputRoot}
|
||||
* --patch_size {patchSize}
|
||||
* --overlap_pct {overlapPct}
|
||||
* --train_val_test_ratio {train} {val} {test}
|
||||
* --keep_empty_ratio {keepEmptyRatio}
|
||||
* </pre>
|
||||
*
|
||||
* @author KAMCO Development Team
|
||||
* @since 1.0.0
|
||||
* @see DockerProperties
|
||||
*/
|
||||
@Log4j2
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class DockerRunnerService {
|
||||
|
||||
/** Docker 실행 관련 설정 정보 (application.yml) */
|
||||
private final DockerProperties dockerProperties;
|
||||
|
||||
/**
|
||||
* Docker 컨테이너 실행 및 모니터링
|
||||
*
|
||||
* <p>학습 데이터 생성 파이프라인을 Docker 컨테이너로 실행합니다. 컨테이너의 표준 출력을 실시간으로 로깅하며, 비정상 종료 시
|
||||
* RuntimeException을 발생시켜 Step 실패로 처리합니다.
|
||||
*
|
||||
* @param resultUid 결과물 고유 ID (UUID)
|
||||
* @throws RuntimeException Docker 프로세스 실패 (exitCode != 0), IO 오류, 또는 인터럽트 발생 시
|
||||
*/
|
||||
public void run(String resultUid) {
|
||||
// Step 1: Docker 명령어 생성
|
||||
log.info("[Step 1/4] Docker 명령어 생성 중...");
|
||||
List<String> command = buildCommand(resultUid);
|
||||
log.info("Running docker command: {}", String.join(" ", command));
|
||||
log.info("[Step 1/4] Docker 명령어 생성 완료");
|
||||
log.debug(" - 명령어: {}", String.join(" ", command));
|
||||
|
||||
try {
|
||||
// Step 2: Docker 프로세스 시작
|
||||
log.info("[Step 2/4] Docker 프로세스 시작 중...");
|
||||
ProcessBuilder pb = new ProcessBuilder(command);
|
||||
pb.redirectErrorStream(true);
|
||||
pb.redirectErrorStream(true); // stderr를 stdout으로 리다이렉트
|
||||
Process process = pb.start();
|
||||
log.info("[Step 2/4] Docker 프로세스 시작 완료 (PID={})", process.pid());
|
||||
|
||||
// Step 3: Docker 프로세스 출력 실시간 로깅
|
||||
log.info("[Step 3/4] Docker 프로세스 출력 모니터링 중...");
|
||||
int lineCount = 0;
|
||||
try (BufferedReader reader =
|
||||
new BufferedReader(new InputStreamReader(process.getInputStream()))) {
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
log.info("[docker] {}", line);
|
||||
lineCount++;
|
||||
}
|
||||
}
|
||||
log.info("[Step 3/4] Docker 프로세스 출력 완료 (총 {} 라인)", lineCount);
|
||||
|
||||
// Step 4: Exit Code 검증
|
||||
log.info("[Step 4/4] Docker 프로세스 종료 대기 중...");
|
||||
int exitCode = process.waitFor();
|
||||
log.info("[Step 4/4] Docker 프로세스 종료 (exitCode={})", exitCode);
|
||||
|
||||
if (exitCode != 0) {
|
||||
log.error("Docker process exited with code {} for resultUid: {}", exitCode, resultUid);
|
||||
// Docker 프로세스 비정상 종료 (Step 실패 처리)
|
||||
log.error("[실패] Docker 프로세스가 비정상 종료되었습니다!");
|
||||
log.error(" - Exit Code: {}", exitCode);
|
||||
log.error(" - ResultUid: {}", resultUid);
|
||||
log.error(" - 확인사항:");
|
||||
log.error(" 1. Docker 컨테이너 로그 확인 (위 [docker] 로그 참조)");
|
||||
log.error(" 2. request/{}/ 디렉토리에 GeoJSON 파일 확인", resultUid);
|
||||
log.error(" 3. Docker 이미지 및 볼륨 마운트 경로 확인");
|
||||
throw new RuntimeException(
|
||||
String.format(
|
||||
"Docker process failed with exit code %d for resultUid: %s",
|
||||
exitCode, resultUid));
|
||||
} else {
|
||||
log.info("Docker process completed successfully for resultUid: {}", resultUid);
|
||||
// Docker 프로세스 정상 종료
|
||||
log.info("[성공] Docker 프로세스가 정상 종료되었습니다.");
|
||||
log.info(" - ResultUid: {}", resultUid);
|
||||
log.info(" - 결과물 위치: /dataset/response/{}/", resultUid);
|
||||
}
|
||||
|
||||
} catch (IOException e) {
|
||||
log.error("Failed to run docker command for resultUid {}: {}", resultUid, e.getMessage());
|
||||
// Docker 명령어 실행 실패 (파일 시스템 오류 등)
|
||||
log.error("[실패] Docker 명령어 실행 중 IO 오류 발생!", e);
|
||||
log.error(" - ResultUid: {}", resultUid);
|
||||
log.error(" - 에러 메시지: {}", e.getMessage());
|
||||
throw new RuntimeException("Failed to run docker command for resultUid: " + resultUid, e);
|
||||
|
||||
} catch (InterruptedException e) {
|
||||
log.error("Docker process interrupted for resultUid {}: {}", resultUid, e.getMessage());
|
||||
Thread.currentThread().interrupt();
|
||||
// Docker 프로세스 인터럽트 (사용자 취소 또는 시스템 종료)
|
||||
log.error("[인터럽트] Docker 프로세스가 중단되었습니다!", e);
|
||||
log.error(" - ResultUid: {}", resultUid);
|
||||
log.error(" - 에러 메시지: {}", e.getMessage());
|
||||
Thread.currentThread().interrupt(); // 인터럽트 상태 복원
|
||||
throw new RuntimeException("Docker process interrupted for resultUid: " + resultUid, e);
|
||||
}
|
||||
}
|
||||
|
||||
private List<String> buildCommand(String resultUid) {
|
||||
/**
|
||||
* Docker 명령어 생성
|
||||
*
|
||||
* <p>DockerProperties 설정 정보와 resultUid를 기반으로 Docker run 명령어를 생성합니다.
|
||||
*
|
||||
* @param resultUid 결과물 고유 ID (입력/출력 폴더 경로에 사용)
|
||||
* @return Docker 명령어 문자열 리스트 (ProcessBuilder 실행용)
|
||||
*/
|
||||
private List<String> cmd buildCommand(String resultUid) {
|
||||
log.debug("Docker 명령어 파라미터 구성 중...");
|
||||
|
||||
List<String> cmd = new ArrayList<>();
|
||||
|
||||
// Docker 기본 명령어
|
||||
cmd.add("docker");
|
||||
cmd.add("run");
|
||||
cmd.add("--rm");
|
||||
cmd.add("--rm"); // 컨테이너 종료 시 자동 삭제
|
||||
|
||||
// 사용자 및 권한 설정
|
||||
cmd.add("--user");
|
||||
cmd.add(dockerProperties.getUser());
|
||||
cmd.add(dockerProperties.getUser()); // 예: "1000:1000"
|
||||
log.debug(" - User: {}", dockerProperties.getUser());
|
||||
|
||||
// 볼륨 마운트 (호스트:컨테이너)
|
||||
cmd.add("-v");
|
||||
cmd.add(dockerProperties.getDatasetVolume());
|
||||
cmd.add(dockerProperties.getDatasetVolume()); // 예: "/kamco-nfs/dataset:/dataset"
|
||||
log.debug(" - Dataset Volume: {}", dockerProperties.getDatasetVolume());
|
||||
|
||||
cmd.add("-v");
|
||||
cmd.add(dockerProperties.getImagesVolume());
|
||||
cmd.add(dockerProperties.getImagesVolume()); // 예: "/kamco-nfs/images:/images"
|
||||
log.debug(" - Images Volume: {}", dockerProperties.getImagesVolume());
|
||||
|
||||
// Entrypoint 및 이미지
|
||||
cmd.add("--entrypoint");
|
||||
cmd.add("python");
|
||||
cmd.add(dockerProperties.getImage());
|
||||
cmd.add("python"); // Python으로 스크립트 실행
|
||||
cmd.add(dockerProperties.getImage()); // 예: "kamco/dataset-generator:latest"
|
||||
log.debug(" - Image: {}", dockerProperties.getImage());
|
||||
|
||||
// Python 스크립트 및 파라미터
|
||||
cmd.add("code/kamco_full_pipeline.py");
|
||||
|
||||
// 입출력 폴더 설정
|
||||
cmd.add("--labelling-folder");
|
||||
cmd.add("request/" + resultUid);
|
||||
log.debug(" - Labelling Folder: request/{}", resultUid);
|
||||
|
||||
cmd.add("--output-folder");
|
||||
cmd.add("response/" + resultUid);
|
||||
log.debug(" - Output Folder: response/{}", resultUid);
|
||||
|
||||
// 파이프라인 파라미터
|
||||
cmd.add("--input_root");
|
||||
cmd.add(dockerProperties.getInputRoot());
|
||||
|
||||
cmd.add("--output_root");
|
||||
cmd.add(dockerProperties.getOutputRoot());
|
||||
|
||||
cmd.add("--patch_size");
|
||||
cmd.add(String.valueOf(dockerProperties.getPatchSize()));
|
||||
log.debug(" - Patch Size: {}", dockerProperties.getPatchSize());
|
||||
|
||||
cmd.add("--overlap_pct");
|
||||
cmd.add(String.valueOf(dockerProperties.getOverlapPct()));
|
||||
log.debug(" - Overlap Percent: {}", dockerProperties.getOverlapPct());
|
||||
|
||||
cmd.add("--train_val_test_ratio");
|
||||
cmd.addAll(dockerProperties.getTrainValTestRatio());
|
||||
cmd.addAll(dockerProperties.getTrainValTestRatio()); // 예: ["0.7", "0.2", "0.1"]
|
||||
log.debug(" - Train/Val/Test Ratio: {}", dockerProperties.getTrainValTestRatio());
|
||||
|
||||
cmd.add("--keep_empty_ratio");
|
||||
cmd.add(String.valueOf(dockerProperties.getKeepEmptyRatio()));
|
||||
log.debug(" - Keep Empty Ratio: {}", dockerProperties.getKeepEmptyRatio());
|
||||
|
||||
log.debug("Docker 명령어 파라미터 구성 완료");
|
||||
return cmd;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,121 @@
|
||||
package com.kamco.cd.geojsonscheduler.service;
|
||||
|
||||
import com.kamco.cd.geojsonscheduler.config.TrainDockerProperties;
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Log4j2
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class TrainDockerRunnerService {
|
||||
|
||||
private final TrainDockerProperties trainDockerProperties;
|
||||
|
||||
public void runTraining(String datasetFolder, String outputFolder) {
|
||||
log.info("========================================");
|
||||
log.info("Train Docker 실행 시작");
|
||||
log.info("Dataset Folder: {}", datasetFolder);
|
||||
log.info("Output Folder: {}", outputFolder);
|
||||
log.info("========================================");
|
||||
|
||||
List<String> command = buildTrainCommand(datasetFolder, outputFolder);
|
||||
log.info("Docker 명령어: {}", String.join(" ", command));
|
||||
|
||||
try {
|
||||
ProcessBuilder pb = new ProcessBuilder(command);
|
||||
pb.redirectErrorStream(true);
|
||||
Process process = pb.start();
|
||||
|
||||
log.info("Docker 프로세스 시작됨 (Detached mode)");
|
||||
|
||||
try (BufferedReader reader =
|
||||
new BufferedReader(new InputStreamReader(process.getInputStream()))) {
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
log.info("[train-docker] {}", line);
|
||||
}
|
||||
}
|
||||
|
||||
int exitCode = process.waitFor();
|
||||
if (exitCode != 0) {
|
||||
log.error("Train Docker 프로세스 실패: exitCode={}, datasetFolder={}, outputFolder={}",
|
||||
exitCode, datasetFolder, outputFolder);
|
||||
} else {
|
||||
log.info("Train Docker 프로세스 시작 완료: datasetFolder={}, outputFolder={}",
|
||||
datasetFolder, outputFolder);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
log.error("Train Docker 실행 실패: datasetFolder={}, outputFolder={}, error={}",
|
||||
datasetFolder, outputFolder, e.getMessage(), e);
|
||||
} catch (InterruptedException e) {
|
||||
log.error("Train Docker 프로세스 중단: datasetFolder={}, outputFolder={}, error={}",
|
||||
datasetFolder, outputFolder, e.getMessage(), e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
|
||||
log.info("========================================");
|
||||
log.info("Train Docker 실행 완료");
|
||||
log.info("========================================");
|
||||
}
|
||||
|
||||
private List<String> buildTrainCommand(String datasetFolder, String outputFolder) {
|
||||
List<String> cmd = new ArrayList<>();
|
||||
cmd.add("docker");
|
||||
cmd.add("run");
|
||||
cmd.add("-d"); // detached mode
|
||||
cmd.add("--name");
|
||||
cmd.add("train-cd");
|
||||
cmd.add("--rm");
|
||||
cmd.add("--gpus");
|
||||
cmd.add("all");
|
||||
cmd.add("--ipc=host");
|
||||
cmd.add("--shm-size=16g");
|
||||
cmd.add("--ulimit");
|
||||
cmd.add("memlock=-1");
|
||||
cmd.add("--ulimit");
|
||||
cmd.add("stack=67108864");
|
||||
cmd.add("-e");
|
||||
cmd.add("NCCL_DEBUG=INFO");
|
||||
cmd.add("-e");
|
||||
cmd.add("NCCL_IB_DISABLE=1");
|
||||
cmd.add("-e");
|
||||
cmd.add("NCCL_P2P_DISABLE=0");
|
||||
cmd.add("-e");
|
||||
cmd.add("NCCL_SOCKET_IFNAME=eth0");
|
||||
cmd.add("-v");
|
||||
cmd.add(trainDockerProperties.getDataVolume());
|
||||
cmd.add("-v");
|
||||
cmd.add(trainDockerProperties.getCheckpointsVolume());
|
||||
cmd.add("-it");
|
||||
cmd.add(trainDockerProperties.getImage());
|
||||
cmd.add("python");
|
||||
cmd.add("/workspace/change-detection-code/train_wrapper.py");
|
||||
cmd.add("--dataset-folder");
|
||||
cmd.add(datasetFolder);
|
||||
cmd.add("--output-folder");
|
||||
cmd.add(outputFolder);
|
||||
cmd.add("--input-size");
|
||||
cmd.add(trainDockerProperties.getInputSize());
|
||||
cmd.add("--crop-size");
|
||||
cmd.add(trainDockerProperties.getCropSize());
|
||||
cmd.add("--batch-size");
|
||||
cmd.add(String.valueOf(trainDockerProperties.getBatchSize()));
|
||||
cmd.add("--gpu-ids");
|
||||
cmd.add(trainDockerProperties.getGpuIds());
|
||||
cmd.add("--gpus");
|
||||
cmd.add(String.valueOf(trainDockerProperties.getGpus()));
|
||||
cmd.add("--lr");
|
||||
cmd.add(trainDockerProperties.getLr());
|
||||
cmd.add("--backbone");
|
||||
cmd.add(trainDockerProperties.getBackbone());
|
||||
cmd.add("--epochs");
|
||||
cmd.add(String.valueOf(trainDockerProperties.getEpochs()));
|
||||
return cmd;
|
||||
}
|
||||
}
|
||||
@@ -23,3 +23,36 @@ COMMENT ON COLUMN public.batch_history.created_dttm IS '생성 일시';
|
||||
COMMENT ON COLUMN public.batch_history.updated_dttm IS '수정 일시';
|
||||
COMMENT ON COLUMN public.batch_history.status IS '상태 (STARTED/COMPLETED/FAILED)';
|
||||
COMMENT ON COLUMN public.batch_history.completed_dttm IS '완료 일시';
|
||||
|
||||
-- batch_step_history 테이블 생성
|
||||
CREATE TABLE IF NOT EXISTS public.batch_step_history (
|
||||
id BIGSERIAL PRIMARY KEY,
|
||||
anal_uid BIGINT NOT NULL,
|
||||
result_uid VARCHAR(255) NOT NULL,
|
||||
step_name VARCHAR(100) NOT NULL,
|
||||
status VARCHAR(50) NOT NULL,
|
||||
error_message TEXT,
|
||||
started_dttm TIMESTAMP NOT NULL,
|
||||
completed_dttm TIMESTAMP,
|
||||
created_dttm TIMESTAMP NOT NULL,
|
||||
updated_dttm TIMESTAMP NOT NULL
|
||||
);
|
||||
|
||||
-- 인덱스 생성
|
||||
CREATE INDEX IF NOT EXISTS idx_batch_step_history_anal_uid ON public.batch_step_history(anal_uid);
|
||||
CREATE INDEX IF NOT EXISTS idx_batch_step_history_result_uid ON public.batch_step_history(result_uid);
|
||||
CREATE INDEX IF NOT EXISTS idx_batch_step_history_status ON public.batch_step_history(status);
|
||||
CREATE INDEX IF NOT EXISTS idx_batch_step_history_step_name ON public.batch_step_history(step_name);
|
||||
|
||||
-- 코멘트
|
||||
COMMENT ON TABLE public.batch_step_history IS '배치 Step 실행 이력';
|
||||
COMMENT ON COLUMN public.batch_step_history.id IS 'Step 이력 고유 ID';
|
||||
COMMENT ON COLUMN public.batch_step_history.anal_uid IS '분석 UID';
|
||||
COMMENT ON COLUMN public.batch_step_history.result_uid IS '결과 UID';
|
||||
COMMENT ON COLUMN public.batch_step_history.step_name IS 'Step 이름 (makeGeoJsonStep/dockerRunStep/zipResponseStep)';
|
||||
COMMENT ON COLUMN public.batch_step_history.status IS '상태 (STARTED/SUCCESS/FAILED)';
|
||||
COMMENT ON COLUMN public.batch_step_history.error_message IS '에러 메시지';
|
||||
COMMENT ON COLUMN public.batch_step_history.started_dttm IS 'Step 시작 일시';
|
||||
COMMENT ON COLUMN public.batch_step_history.completed_dttm IS 'Step 완료 일시';
|
||||
COMMENT ON COLUMN public.batch_step_history.created_dttm IS '생성 일시';
|
||||
COMMENT ON COLUMN public.batch_step_history.updated_dttm IS '수정 일시';
|
||||
|
||||
Reference in New Issue
Block a user