Split the function

This commit is contained in:
2026-02-09 18:29:35 +09:00
parent 48369486a3
commit d0a6b88eba
14 changed files with 953 additions and 9 deletions

View File

@@ -0,0 +1,37 @@
package com.kamco.cd.geojsonscheduler.batch;
import com.kamco.cd.geojsonscheduler.service.DockerRunnerService;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Log4j2
@Component
@RequiredArgsConstructor
public class DockerRunTasklet implements Tasklet {
private final DockerRunnerService dockerRunnerService;
@Value("#{jobParameters['resultUid']}")
private String resultUid;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
log.info("========================================");
log.info("Docker 컨테이너 실행 시작 (ResultUid={})", resultUid);
log.info("========================================");
dockerRunnerService.run(resultUid);
log.info("========================================");
log.info("Docker 컨테이너 실행 완료 (ResultUid={})", resultUid);
log.info("========================================");
return RepeatStatus.FINISHED;
}
}

View File

@@ -4,7 +4,6 @@ import com.kamco.cd.geojsonscheduler.listener.BatchHistoryListener;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job; import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step; import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.job.builder.JobBuilder; import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder; import org.springframework.batch.core.step.builder.StepBuilder;
@@ -18,20 +17,20 @@ public class ExportGeoJsonJobConfig {
private final JobRepository jobRepository; private final JobRepository jobRepository;
private final PlatformTransactionManager transactionManager; private final PlatformTransactionManager transactionManager;
private final ExportGeoJsonTasklet exportGeoJsonTasklet; private final LaunchChildJobsTasklet launchChildJobsTasklet;
@Bean @Bean
public Job exportGeoJsonJob(BatchHistoryListener historyListener) { // 1. 리스너 주입 받기 public Job exportGeoJsonJob(BatchHistoryListener historyListener) {
return new JobBuilder("exportGeoJsonJob", jobRepository) return new JobBuilder("exportGeoJsonJob", jobRepository)
.listener(historyListener) // 2. 리스너 등록 .listener(historyListener)
.start(exportGeoJsonStep()) .start(launchChildJobsStep())
.build(); .build();
} }
@Bean @Bean
public Step exportGeoJsonStep() { public Step launchChildJobsStep() {
return new StepBuilder("exportGeoJsonStep", jobRepository) return new StepBuilder("launchChildJobsStep", jobRepository)
.tasklet(exportGeoJsonTasklet, transactionManager) .tasklet(launchChildJobsTasklet, transactionManager)
.build(); .build();
} }
} }

View File

@@ -0,0 +1,94 @@
package com.kamco.cd.geojsonscheduler.batch;
import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.AnalCntInfo;
import com.kamco.cd.geojsonscheduler.repository.TrainingDataReviewJobRepository;
import java.util.List;
import java.util.Objects;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
@Log4j2
@Component
public class LaunchChildJobsTasklet implements Tasklet {

    private final TrainingDataReviewJobRepository repository;
    private final JobLauncher jobLauncher;
    private final Job processAnalCntInfoJob;

    // Explicit constructor instead of Lombok's @RequiredArgsConstructor:
    // @Qualifier on a field is NOT copied onto the Lombok-generated constructor
    // parameter (unless lombok.config opts in via copyableAnnotations), so with
    // several Job beans in the context the child-job injection was ambiguous.
    // Putting the qualifier on the constructor parameter pins the right bean.
    public LaunchChildJobsTasklet(
            TrainingDataReviewJobRepository repository,
            JobLauncher jobLauncher,
            @Qualifier("processAnalCntInfoJob") Job processAnalCntInfoJob) {
        this.repository = repository;
        this.jobLauncher = jobLauncher;
        this.processAnalCntInfoJob = processAnalCntInfoJob;
    }

    /**
     * Parent-job tasklet: fetches every in-progress round with completed work
     * and launches the child job once per round. A single child failure is
     * logged and counted but does not stop the remaining launches.
     *
     * @return always {@link RepeatStatus#FINISHED} (partial success tolerated)
     */
    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
        log.info("========================================");
        log.info("Parent Job 시작: AnalCntInfo 리스트 조회 및 Child Job 실행");
        log.info("========================================");
        // Rounds currently in progress that have at least one completed count.
        log.info("진행중인 회차 목록 조회 중...");
        List<AnalCntInfo> analList = repository.findAnalCntInfoList();
        log.info("진행중인 회차 수: {}", analList.size());
        int processedCount = 0;
        int skippedCount = 0;
        int failedCount = 0;
        for (AnalCntInfo info : analList) {
            log.info("----------------------------------------");
            log.info("회차 검토: AnalUid={}, ResultUid={}", info.getAnalUid(), info.getResultUid());
            log.info("전체 건수: {}, 파일 건수: {}", info.getAllCnt(), info.getFileCnt());
            // All files already exported for this round — nothing to do.
            if (Objects.equals(info.getAllCnt(), info.getFileCnt())) {
                log.info("모든 파일이 이미 처리됨. 건너뜀.");
                skippedCount++;
                continue;
            }
            try {
                // Launch the child job for this round.
                JobParameters jobParameters =
                    new JobParametersBuilder()
                        .addLong("analUid", info.getAnalUid())
                        .addString("resultUid", info.getResultUid())
                        .addLong("timestamp", System.currentTimeMillis()) // guarantees a unique JobInstance
                        .toJobParameters();
                log.info("Child Job 실행 중... (AnalUid={}, ResultUid={})", info.getAnalUid(), info.getResultUid());
                jobLauncher.run(processAnalCntInfoJob, jobParameters);
                log.info("Child Job 실행 완료 (AnalUid={}, ResultUid={})", info.getAnalUid(), info.getResultUid());
                processedCount++;
            } catch (Exception e) {
                log.error("Child Job 실행 실패 (AnalUid={}, ResultUid={}): {}", info.getAnalUid(), info.getResultUid(), e.getMessage(), e);
                failedCount++;
                // Deliberately continue with the remaining rounds.
            }
        }
        log.info("========================================");
        log.info("Parent Job 완료");
        log.info(" 총 회차 수: {}", analList.size());
        log.info(" 성공: {}", processedCount);
        log.info(" 건너뜀: {}", skippedCount);
        log.info(" 실패: {}", failedCount);
        log.info("========================================");
        if (failedCount > 0) {
            log.warn("{} 개의 Child Job 실행이 실패했습니다.", failedCount);
            // Policy: partial success — child failures do not fail the parent job.
            // Throw here instead if all-or-nothing semantics are ever required.
        }
        return RepeatStatus.FINISHED;
    }
}

View File

@@ -0,0 +1,119 @@
package com.kamco.cd.geojsonscheduler.batch;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.AnalMapSheetList;
import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.CompleteLabelData;
import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.CompleteLabelData.GeoJsonFeature;
import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.FeatureCollection;
import com.kamco.cd.geojsonscheduler.repository.TrainingDataReviewJobRepository;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Log4j2
@Component
@RequiredArgsConstructor
public class MakeGeoJsonTasklet implements Tasklet {
private final TrainingDataReviewJobRepository repository;
@Value("${training-data.geojson-dir}")
private String trainingDataDir;
@Value("#{jobParameters['analUid']}")
private Long analUid;
@Value("#{jobParameters['resultUid']}")
private String resultUid;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
log.info("========================================");
log.info("GeoJSON 생성 시작 (AnalUid={}, ResultUid={})", analUid, resultUid);
log.info("========================================");
// 검수 완료된 도엽 목록 조회
log.info("검수 완료된 도엽 목록 조회 중... (AnalUid={})", analUid);
List<AnalMapSheetList> analMapList = repository.findCompletedAnalMapSheetList(analUid);
log.info("검수 완료된 도엽 수: {}", analMapList.size());
if (analMapList.isEmpty()) {
log.warn("검수 완료된 도엽이 없음. 작업 건너뜀.");
return RepeatStatus.FINISHED;
}
int processedMapSheetCount = 0;
int totalGeoJsonFiles = 0;
for (AnalMapSheetList mapSheet : analMapList) {
log.info(" 도엽 처리 중: MapSheetNum={}", mapSheet.getMapSheetNum());
// 도엽별 geom 데이터 가져오기
List<CompleteLabelData> completeList =
repository.findCompletedYesterdayLabelingList(analUid, mapSheet.getMapSheetNum());
log.info(" 완료된 라벨링 데이터 수: {}", completeList.size());
if (!completeList.isEmpty()) {
List<Long> geoUids = completeList.stream().map(CompleteLabelData::getGeoUid).toList();
log.info(" GeoUID 목록 생성 완료: {} 건", geoUids.size());
List<GeoJsonFeature> features = completeList.stream().map(GeoJsonFeature::from).toList();
log.info(" GeoJSON Feature 변환 완료: {} 개", features.size());
FeatureCollection collection = new FeatureCollection(features);
String filename = mapSheet.buildFilename(resultUid);
log.info(" GeoJSON 파일명: {}", filename);
// 형식 /kamco-nfs/dataset/request/uuid/filename
Path outputPath =
Paths.get(
trainingDataDir + File.separator + "request" + File.separator + resultUid,
filename);
log.info(" 출력 경로: {}", outputPath);
try {
Files.createDirectories(outputPath.getParent());
log.info(" 디렉토리 생성 완료: {}", outputPath.getParent());
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
objectMapper.writeValue(outputPath.toFile(), collection);
log.info(" GeoJSON 파일 저장 완료: {}", outputPath);
repository.updateLearnDataGeomFileCreateYn(geoUids);
log.info(" DB 업데이트 완료: {} 건", geoUids.size());
processedMapSheetCount++;
totalGeoJsonFiles++;
} catch (IOException e) {
log.error(" GeoJSON 파일 생성 실패: {}", e.getMessage(), e);
throw new RuntimeException("GeoJSON 파일 생성 실패: " + filename, e);
}
}
}
log.info("========================================");
log.info("GeoJSON 생성 완료 (ResultUid={})", resultUid);
log.info(" 처리된 도엽 수: {}", processedMapSheetCount);
log.info(" 생성된 GeoJSON 파일 수: {}", totalGeoJsonFiles);
log.info("========================================");
if (totalGeoJsonFiles == 0) {
throw new RuntimeException("생성된 GeoJSON 파일이 없습니다. (AnalUid=" + analUid + ")");
}
return RepeatStatus.FINISHED;
}
}

View File

@@ -0,0 +1,57 @@
package com.kamco.cd.geojsonscheduler.batch;
import com.kamco.cd.geojsonscheduler.listener.StepHistoryListener;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
/**
 * Child-job definition: for a single (analUid, resultUid) pair this job builds
 * the per-map-sheet GeoJSON files, runs the Docker container, then zips the
 * response directory. Each step registers {@link StepHistoryListener} so its
 * start/finish is persisted to the step-history table.
 */
@Configuration
@RequiredArgsConstructor
public class ProcessAnalCntInfoJobConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;
    private final MakeGeoJsonTasklet makeGeoJsonTasklet;
    private final DockerRunTasklet dockerRunTasklet;
    private final ZipResponseTasklet zipResponseTasklet;
    private final StepHistoryListener stepHistoryListener;

    /** Step 1: export reviewed geometries as GeoJSON files. */
    @Bean
    public Step makeGeoJsonStep() {
        return new StepBuilder("makeGeoJsonStep", jobRepository)
            .tasklet(makeGeoJsonTasklet, transactionManager)
            .listener(stepHistoryListener)
            .build();
    }

    /** Step 2: run the Docker container for the result. */
    @Bean
    public Step dockerRunStep() {
        return new StepBuilder("dockerRunStep", jobRepository)
            .tasklet(dockerRunTasklet, transactionManager)
            .listener(stepHistoryListener)
            .build();
    }

    /** Step 3: archive the response directory into a zip file. */
    @Bean
    public Step zipResponseStep() {
        return new StepBuilder("zipResponseStep", jobRepository)
            .tasklet(zipResponseTasklet, transactionManager)
            .listener(stepHistoryListener)
            .build();
    }

    /** The three steps above wired into a single sequential job. */
    @Bean
    public Job processAnalCntInfoJob() {
        return new JobBuilder("processAnalCntInfoJob", jobRepository)
            .start(makeGeoJsonStep())
            .next(dockerRunStep())
            .next(zipResponseStep())
            .build();
    }
}

View File

@@ -0,0 +1,36 @@
package com.kamco.cd.geojsonscheduler.batch;
import com.kamco.cd.geojsonscheduler.listener.BatchHistoryListener;
import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
/**
 * Single-step job that launches the model-training tasklet. Job-level history
 * is recorded through the injected {@link BatchHistoryListener}.
 */
@Configuration
@RequiredArgsConstructor
public class TrainModelJobConfig {

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;
    private final TrainModelTasklet trainModelTasklet;

    /** The one and only step: delegate to {@link TrainModelTasklet}. */
    @Bean
    public Step trainModelStep() {
        return new StepBuilder("trainModelStep", jobRepository)
            .tasklet(trainModelTasklet, transactionManager)
            .build();
    }

    /** Assembles the job and attaches the history listener. */
    @Bean
    public Job trainModelJob(BatchHistoryListener historyListener) {
        return new JobBuilder("trainModelJob", jobRepository)
            .listener(historyListener)
            .start(trainModelStep())
            .build();
    }
}

View File

@@ -0,0 +1,60 @@
package com.kamco.cd.geojsonscheduler.batch;
import com.kamco.cd.geojsonscheduler.service.TrainDockerRunnerService;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.stereotype.Component;
@Log4j2
@Component
@RequiredArgsConstructor
public class TrainModelTasklet implements Tasklet {
private final TrainDockerRunnerService trainDockerRunnerService;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
log.info("========================================");
log.info("학습 배치 작업 시작");
log.info("========================================");
String jobName = chunkContext.getStepContext().getJobName();
log.info("Job Name: {}", jobName);
// Job 파라미터에서 dataset-folder와 output-folder 가져오기
String datasetFolder = (String) chunkContext.getStepContext()
.getJobParameters()
.get("dataset-folder");
String outputFolder = (String) chunkContext.getStepContext()
.getJobParameters()
.get("output-folder");
log.info("Dataset Folder Parameter: {}", datasetFolder);
log.info("Output Folder Parameter: {}", outputFolder);
if (datasetFolder == null || datasetFolder.isBlank()) {
log.error("dataset-folder 파라미터가 없습니다!");
throw new IllegalArgumentException("dataset-folder parameter is required");
}
if (outputFolder == null || outputFolder.isBlank()) {
log.error("output-folder 파라미터가 없습니다!");
throw new IllegalArgumentException("output-folder parameter is required");
}
// Train Docker 실행
log.info("Train Docker 실행 중...");
trainDockerRunnerService.runTraining(datasetFolder, outputFolder);
log.info("Train Docker 실행 완료");
log.info("========================================");
log.info("학습 배치 작업 완료");
log.info("========================================");
return RepeatStatus.FINISHED;
}
}

View File

@@ -0,0 +1,108 @@
package com.kamco.cd.geojsonscheduler.batch;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Log4j2
@Component
@RequiredArgsConstructor
public class ZipResponseTasklet implements Tasklet {
@Value("${training-data.geojson-dir}")
private String trainingDataDir;
@Value("#{jobParameters['resultUid']}")
private String resultUid;
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
log.info("========================================");
log.info("결과물 압축 시작 (ResultUid={})", resultUid);
log.info("========================================");
Path responseDir =
Paths.get(trainingDataDir + File.separator + "response" + File.separator + resultUid);
Path zipFile =
Paths.get(
trainingDataDir + File.separator + "response" + File.separator + resultUid + ".zip");
log.info("압축 대상 디렉토리: {}", responseDir);
log.info("압축 파일 경로: {}", zipFile);
if (!Files.exists(responseDir)) {
log.error("Response 디렉토리가 존재하지 않음: {}", responseDir);
throw new RuntimeException("Response 디렉토리가 존재하지 않습니다: " + responseDir);
}
try {
zipDirectory(responseDir.toFile(), zipFile.toFile());
log.info("압축 완료: {} (크기: {} bytes)", zipFile, Files.size(zipFile));
} catch (IOException e) {
log.error("압축 실패: {}", e.getMessage(), e);
throw new RuntimeException("Response 디렉토리 압축 실패: " + responseDir, e);
}
log.info("========================================");
log.info("결과물 압축 완료 (ResultUid={})", resultUid);
log.info("========================================");
return RepeatStatus.FINISHED;
}
private void zipDirectory(File sourceDir, File zipFile) throws IOException {
try (FileOutputStream fos = new FileOutputStream(zipFile);
ZipOutputStream zos = new ZipOutputStream(fos)) {
zipDirectoryRecursive(sourceDir, sourceDir.getName(), zos);
}
}
private void zipDirectoryRecursive(File fileToZip, String fileName, ZipOutputStream zos)
throws IOException {
if (fileToZip.isHidden()) {
return;
}
if (fileToZip.isDirectory()) {
if (fileName.endsWith("/")) {
zos.putNextEntry(new ZipEntry(fileName));
zos.closeEntry();
} else {
zos.putNextEntry(new ZipEntry(fileName + "/"));
zos.closeEntry();
}
File[] children = fileToZip.listFiles();
if (children != null) {
for (File childFile : children) {
zipDirectoryRecursive(childFile, fileName + "/" + childFile.getName(), zos);
}
}
return;
}
try (FileInputStream fis = new FileInputStream(fileToZip)) {
ZipEntry zipEntry = new ZipEntry(fileName);
zos.putNextEntry(zipEntry);
byte[] buffer = new byte[1024];
int length;
while ((length = fis.read(buffer)) >= 0) {
zos.write(buffer, 0, length);
}
}
}
}

View File

@@ -0,0 +1,25 @@
package com.kamco.cd.geojsonscheduler.config;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
@Getter
@Setter
@ConfigurationProperties(prefix = "train-data.docker")
public class TrainDockerProperties {

    // Docker image for the training container (used by TrainDockerRunnerService#buildTrainCommand).
    private String image;
    // Volume mapping passed to `docker run -v` for the data directory.
    private String dataVolume;
    // Volume mapping passed to `docker run -v` for model checkpoints.
    private String checkpointsVolume;
    // NOTE(review): datasetFolder/outputFolder are not read by TrainDockerRunnerService,
    // which takes both folders from job parameters instead — confirm these two
    // properties are still needed.
    private String datasetFolder;
    private String outputFolder;
    // Forwarded to the training script as --input-size.
    private String inputSize;
    // Forwarded as --crop-size.
    private String cropSize;
    // Forwarded as --batch-size.
    private int batchSize;
    // Forwarded as --gpu-ids (e.g. a comma-separated list — TODO confirm format).
    private String gpuIds;
    // Forwarded as --gpus (count).
    private int gpus;
    // Learning rate, forwarded as --lr (kept as String, passed through verbatim).
    private String lr;
    // Model backbone name, forwarded as --backbone.
    private String backbone;
    // Forwarded as --epochs.
    private int epochs;
}

View File

@@ -0,0 +1,113 @@
package com.kamco.cd.geojsonscheduler.listener;
import com.kamco.cd.geojsonscheduler.repository.BatchStepHistoryRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.stereotype.Component;
@Log4j2
@Component
@RequiredArgsConstructor
public class StepHistoryListener implements StepExecutionListener {

    private final BatchStepHistoryRepository batchStepHistoryRepository;

    /**
     * Records a STARTED row in the step-history table before the step runs.
     * Persistence errors are logged and swallowed so the listener can never
     * prevent the step itself from executing.
     */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        log.info("=========================================================");
        log.info("Step 시작 - StepHistoryListener");
        log.info("=========================================================");
        String stepName = stepExecution.getStepName();
        log.info("Step Name: {}", stepName);
        try {
            Long analUid = analUid(stepExecution);
            String resultUid = resultUid(stepExecution);
            if (analUid == null || resultUid == null) {
                logMissingParameters();
                return;
            }
            log.info("AnalUid: {}, ResultUid: {}", analUid, resultUid);
            batchStepHistoryRepository.startStep(analUid, resultUid, stepName);
            log.info("Step 시작 기록 저장 완료");
        } catch (Exception e) {
            log.error("Step 시작 기록 저장 실패: {}", e.getMessage(), e);
            // Listener failures must not break the step.
        }
    }

    /**
     * Marks the history row SUCCESS or FAILED after the step finishes and
     * returns the step's own exit status unchanged. Persistence errors are
     * logged and swallowed.
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        log.info("=========================================================");
        log.info("Step 종료 - StepHistoryListener");
        log.info("=========================================================");
        String stepName = stepExecution.getStepName();
        log.info("Step Name: {}", stepName);
        log.info("Step Exit Status: {}", stepExecution.getExitStatus());
        try {
            Long analUid = analUid(stepExecution);
            String resultUid = resultUid(stepExecution);
            if (analUid == null || resultUid == null) {
                logMissingParameters();
                return stepExecution.getExitStatus();
            }
            log.info("AnalUid: {}, ResultUid: {}", analUid, resultUid);
            // Compare exit CODES rather than ExitStatus.equals(): ExitStatus
            // equality also takes the exit description into account, so a
            // completed step carrying a description was recorded as FAILED.
            boolean isSuccess =
                ExitStatus.COMPLETED.getExitCode().equals(stepExecution.getExitStatus().getExitCode());
            log.info("Step 성공 여부: {}", isSuccess ? "성공" : "실패");
            if (isSuccess) {
                batchStepHistoryRepository.finishStepSuccess(analUid, resultUid, stepName);
                log.info("Step 성공 기록 저장 완료");
            } else {
                String errorMessage = buildErrorMessage(stepExecution);
                batchStepHistoryRepository.finishStepFailed(analUid, resultUid, stepName, errorMessage);
                log.info("Step 실패 기록 저장 완료");
            }
        } catch (Exception e) {
            log.error("Step 종료 기록 저장 실패: {}", e.getMessage(), e);
            // Listener failures must not alter the step's outcome.
        }
        log.info("=========================================================");
        return stepExecution.getExitStatus();
    }

    // Shared extraction of the two correlation keys (was duplicated in both callbacks).
    private static Long analUid(StepExecution stepExecution) {
        return stepExecution.getJobParameters().getLong("analUid");
    }

    private static String resultUid(StepExecution stepExecution) {
        return stepExecution.getJobParameters().getString("resultUid");
    }

    private static void logMissingParameters() {
        log.warn("JobParameters에 analUid 또는 resultUid가 없어 Step 이력을 기록할 수 없습니다.");
    }

    /** Builds a human-readable failure summary from the exit status and failure exceptions. */
    private String buildErrorMessage(StepExecution stepExecution) {
        StringBuilder sb = new StringBuilder();
        sb.append("ExitStatus: ").append(stepExecution.getExitStatus()).append("\n");
        if (!stepExecution.getFailureExceptions().isEmpty()) {
            sb.append("Failure Exceptions:\n");
            for (Throwable t : stepExecution.getFailureExceptions()) {
                sb.append("- ").append(t.getClass().getSimpleName()).append(": ").append(t.getMessage())
                    .append("\n");
            }
        }
        return sb.toString();
    }
}

View File

@@ -0,0 +1,138 @@
package com.kamco.cd.geojsonscheduler.repository;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
@Log4j2
@Repository
@RequiredArgsConstructor
public class BatchStepHistoryRepository {

    private final JdbcTemplate jdbcTemplate;

    /**
     * Inserts a STARTED row for the given step execution; started/created/updated
     * timestamps are all set to "now".
     */
    @Transactional
    public void startStep(Long analUid, String resultUid, String stepName) {
        log.info("[BatchStepHistoryRepository] Step 시작 기록 저장");
        log.info(" AnalUid: {}, ResultUid: {}, StepName: {}", analUid, resultUid, stepName);
        String sql =
            """
            INSERT INTO public.batch_step_history
            (anal_uid, result_uid, step_name, status, started_dttm, created_dttm, updated_dttm)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """;
        Timestamp now = now();
        int rowsAffected =
            jdbcTemplate.update(
                sql,
                analUid,
                resultUid,
                stepName,
                "STARTED",
                now, // started_dttm
                now, // created_dttm
                now // updated_dttm
                );
        log.info(
            "[BatchStepHistoryRepository] Step 시작 기록 저장 완료 ({} rows affected)", rowsAffected);
    }

    /**
     * Flips the matching STARTED row to SUCCESS. A zero-row update (no matching
     * STARTED row) is only logged as a warning, never thrown.
     */
    @Transactional
    public void finishStepSuccess(Long analUid, String resultUid, String stepName) {
        log.info("[BatchStepHistoryRepository] Step 성공 기록 업데이트");
        log.info(" AnalUid: {}, ResultUid: {}, StepName: {}", analUid, resultUid, stepName);
        String sql =
            """
            UPDATE public.batch_step_history
            SET status = ?,
            completed_dttm = ?,
            updated_dttm = ?
            WHERE anal_uid = ?
            AND result_uid = ?
            AND step_name = ?
            AND status = 'STARTED'
            """;
        Timestamp now = now();
        int rowsAffected =
            jdbcTemplate.update(sql, "SUCCESS", now, now, analUid, resultUid, stepName);
        if (rowsAffected > 0) {
            log.info(
                "[BatchStepHistoryRepository] Step 성공 기록 업데이트 완료 ({} rows affected)",
                rowsAffected);
        } else {
            warnNoRowUpdated(analUid, resultUid, stepName);
        }
    }

    /**
     * Flips the matching STARTED row to FAILED, storing the (truncated) error
     * message. A zero-row update is only logged as a warning, never thrown.
     */
    @Transactional
    public void finishStepFailed(
        Long analUid, String resultUid, String stepName, String errorMessage) {
        log.info("[BatchStepHistoryRepository] Step 실패 기록 업데이트");
        log.info(" AnalUid: {}, ResultUid: {}, StepName: {}", analUid, resultUid, stepName);
        log.info(" ErrorMessage: {}", errorMessage);
        String sql =
            """
            UPDATE public.batch_step_history
            SET status = ?,
            error_message = ?,
            completed_dttm = ?,
            updated_dttm = ?
            WHERE anal_uid = ?
            AND result_uid = ?
            AND step_name = ?
            AND status = 'STARTED'
            """;
        Timestamp now = now();
        // Cap the stored error message at 1000 characters. NOTE(review): the
        // column is TEXT per the migration, so this is an application-level
        // policy rather than a schema constraint — confirm the limit is wanted.
        String truncatedError =
            errorMessage != null && errorMessage.length() > 1000
                ? errorMessage.substring(0, 1000)
                : errorMessage;
        int rowsAffected =
            jdbcTemplate.update(
                sql, "FAILED", truncatedError, now, now, analUid, resultUid, stepName);
        if (rowsAffected > 0) {
            log.info(
                "[BatchStepHistoryRepository] Step 실패 기록 업데이트 완료 ({} rows affected)",
                rowsAffected);
        } else {
            warnNoRowUpdated(analUid, resultUid, stepName);
        }
    }

    // Single timestamp source for all three writers (was duplicated inline).
    private static Timestamp now() {
        return Timestamp.valueOf(LocalDateTime.now());
    }

    // Shared "no matching STARTED row" warning (was duplicated verbatim in both
    // finish methods); the rendered log text is unchanged.
    private static void warnNoRowUpdated(Long analUid, String resultUid, String stepName) {
        log.warn(
            "[BatchStepHistoryRepository] 업데이트된 row가 없습니다. AnalUid: {}, ResultUid: {},"
                + " StepName: {}",
            analUid,
            resultUid,
            stepName);
    }
}

View File

@@ -37,14 +37,18 @@ public class DockerRunnerService {
int exitCode = process.waitFor(); int exitCode = process.waitFor();
if (exitCode != 0) { if (exitCode != 0) {
log.error("Docker process exited with code {} for resultUid: {}", exitCode, resultUid); log.error("Docker process exited with code {} for resultUid: {}", exitCode, resultUid);
throw new RuntimeException(
String.format("Docker process failed with exit code %d for resultUid: %s", exitCode, resultUid));
} else { } else {
log.info("Docker process completed successfully for resultUid: {}", resultUid); log.info("Docker process completed successfully for resultUid: {}", resultUid);
} }
} catch (IOException e) { } catch (IOException e) {
log.error("Failed to run docker command for resultUid {}: {}", resultUid, e.getMessage()); log.error("Failed to run docker command for resultUid {}: {}", resultUid, e.getMessage());
throw new RuntimeException("Failed to run docker command for resultUid: " + resultUid, e);
} catch (InterruptedException e) { } catch (InterruptedException e) {
log.error("Docker process interrupted for resultUid {}: {}", resultUid, e.getMessage()); log.error("Docker process interrupted for resultUid {}: {}", resultUid, e.getMessage());
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
throw new RuntimeException("Docker process interrupted for resultUid: " + resultUid, e);
} }
} }

View File

@@ -0,0 +1,121 @@
package com.kamco.cd.geojsonscheduler.service;
import com.kamco.cd.geojsonscheduler.config.TrainDockerProperties;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Service;
@Log4j2
@Service
@RequiredArgsConstructor
public class TrainDockerRunnerService {

    private final TrainDockerProperties trainDockerProperties;

    /**
     * Launches the model-training Docker container via the local docker CLI,
     * streaming the process output into the application log.
     *
     * NOTE(review): all failures here (non-zero exit code, IOException,
     * InterruptedException) are logged but NOT rethrown, unlike
     * DockerRunnerService which throws on the same conditions — confirm this
     * fire-and-forget behavior is intentional for training runs.
     *
     * @param datasetFolder value forwarded to the script's --dataset-folder
     * @param outputFolder  value forwarded to the script's --output-folder
     */
    public void runTraining(String datasetFolder, String outputFolder) {
        log.info("========================================");
        log.info("Train Docker 실행 시작");
        log.info("Dataset Folder: {}", datasetFolder);
        log.info("Output Folder: {}", outputFolder);
        log.info("========================================");
        List<String> command = buildTrainCommand(datasetFolder, outputFolder);
        log.info("Docker 명령어: {}", String.join(" ", command));
        try {
            ProcessBuilder pb = new ProcessBuilder(command);
            // Merge stderr into stdout so one reader drains everything.
            pb.redirectErrorStream(true);
            Process process = pb.start();
            log.info("Docker 프로세스 시작됨 (Detached mode)");
            // NOTE(review): with `docker run -d` the CLI prints the container id
            // and returns immediately, so this loop and waitFor() reflect only
            // the launch, not training completion — confirm that is the intent.
            try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(process.getInputStream()))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    log.info("[train-docker] {}", line);
                }
            }
            int exitCode = process.waitFor();
            if (exitCode != 0) {
                log.error("Train Docker 프로세스 실패: exitCode={}, datasetFolder={}, outputFolder={}",
                    exitCode, datasetFolder, outputFolder);
            } else {
                log.info("Train Docker 프로세스 시작 완료: datasetFolder={}, outputFolder={}",
                    datasetFolder, outputFolder);
            }
        } catch (IOException e) {
            log.error("Train Docker 실행 실패: datasetFolder={}, outputFolder={}, error={}",
                datasetFolder, outputFolder, e.getMessage(), e);
        } catch (InterruptedException e) {
            log.error("Train Docker 프로세스 중단: datasetFolder={}, outputFolder={}, error={}",
                datasetFolder, outputFolder, e.getMessage(), e);
            // Restore the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
        }
        log.info("========================================");
        log.info("Train Docker 실행 완료");
        log.info("========================================");
    }

    /**
     * Assembles the full `docker run` argument list from configuration plus the
     * two per-run folders. Using the List form of ProcessBuilder avoids any
     * shell interpretation of the arguments.
     */
    private List<String> buildTrainCommand(String datasetFolder, String outputFolder) {
        List<String> cmd = new ArrayList<>();
        cmd.add("docker");
        cmd.add("run");
        cmd.add("-d"); // detached mode
        cmd.add("--name");
        cmd.add("train-cd"); // NOTE(review): fixed name — a second concurrent run would collide
        cmd.add("--rm");
        // GPU access and IPC/memory settings for multi-GPU training.
        cmd.add("--gpus");
        cmd.add("all");
        cmd.add("--ipc=host");
        cmd.add("--shm-size=16g");
        cmd.add("--ulimit");
        cmd.add("memlock=-1");
        cmd.add("--ulimit");
        cmd.add("stack=67108864");
        // NCCL environment for multi-GPU communication.
        cmd.add("-e");
        cmd.add("NCCL_DEBUG=INFO");
        cmd.add("-e");
        cmd.add("NCCL_IB_DISABLE=1");
        cmd.add("-e");
        cmd.add("NCCL_P2P_DISABLE=0");
        cmd.add("-e");
        cmd.add("NCCL_SOCKET_IFNAME=eth0");
        // Data and checkpoint volume mounts from configuration.
        cmd.add("-v");
        cmd.add(trainDockerProperties.getDataVolume());
        cmd.add("-v");
        cmd.add(trainDockerProperties.getCheckpointsVolume());
        // NOTE(review): `-it` combined with `-d` is unusual — interactive stdin
        // has no effect on a detached container; confirm `-i` is needed.
        cmd.add("-it");
        cmd.add(trainDockerProperties.getImage());
        // Training entry point and its arguments.
        cmd.add("python");
        cmd.add("/workspace/change-detection-code/train_wrapper.py");
        cmd.add("--dataset-folder");
        cmd.add(datasetFolder);
        cmd.add("--output-folder");
        cmd.add(outputFolder);
        cmd.add("--input-size");
        cmd.add(trainDockerProperties.getInputSize());
        cmd.add("--crop-size");
        cmd.add(trainDockerProperties.getCropSize());
        cmd.add("--batch-size");
        cmd.add(String.valueOf(trainDockerProperties.getBatchSize()));
        cmd.add("--gpu-ids");
        cmd.add(trainDockerProperties.getGpuIds());
        cmd.add("--gpus");
        cmd.add(String.valueOf(trainDockerProperties.getGpus()));
        cmd.add("--lr");
        cmd.add(trainDockerProperties.getLr());
        cmd.add("--backbone");
        cmd.add(trainDockerProperties.getBackbone());
        cmd.add("--epochs");
        cmd.add(String.valueOf(trainDockerProperties.getEpochs()));
        return cmd;
    }
}

View File

@@ -23,3 +23,36 @@ COMMENT ON COLUMN public.batch_history.created_dttm IS '생성 일시';
COMMENT ON COLUMN public.batch_history.updated_dttm IS '수정 일시'; COMMENT ON COLUMN public.batch_history.updated_dttm IS '수정 일시';
COMMENT ON COLUMN public.batch_history.status IS '상태 (STARTED/COMPLETED/FAILED)'; COMMENT ON COLUMN public.batch_history.status IS '상태 (STARTED/COMPLETED/FAILED)';
COMMENT ON COLUMN public.batch_history.completed_dttm IS '완료 일시'; COMMENT ON COLUMN public.batch_history.completed_dttm IS '완료 일시';
-- Create the batch_step_history table: one row per step execution of the
-- child job, written by StepHistoryListener via BatchStepHistoryRepository.
CREATE TABLE IF NOT EXISTS public.batch_step_history (
id BIGSERIAL PRIMARY KEY,
anal_uid BIGINT NOT NULL,
result_uid VARCHAR(255) NOT NULL,
step_name VARCHAR(100) NOT NULL,
status VARCHAR(50) NOT NULL,
error_message TEXT,
started_dttm TIMESTAMP NOT NULL,
completed_dttm TIMESTAMP,
created_dttm TIMESTAMP NOT NULL,
updated_dttm TIMESTAMP NOT NULL
);
-- Indexes for the lookup columns used by the repository's UPDATE statements.
CREATE INDEX IF NOT EXISTS idx_batch_step_history_anal_uid ON public.batch_step_history(anal_uid);
CREATE INDEX IF NOT EXISTS idx_batch_step_history_result_uid ON public.batch_step_history(result_uid);
CREATE INDEX IF NOT EXISTS idx_batch_step_history_status ON public.batch_step_history(status);
CREATE INDEX IF NOT EXISTS idx_batch_step_history_step_name ON public.batch_step_history(step_name);
-- Table and column comments (stored descriptions; left in the original language).
COMMENT ON TABLE public.batch_step_history IS '배치 Step 실행 이력';
COMMENT ON COLUMN public.batch_step_history.id IS 'Step 이력 고유 ID';
COMMENT ON COLUMN public.batch_step_history.anal_uid IS '분석 UID';
COMMENT ON COLUMN public.batch_step_history.result_uid IS '결과 UID';
COMMENT ON COLUMN public.batch_step_history.step_name IS 'Step 이름 (makeGeoJsonStep/dockerRunStep/zipResponseStep)';
COMMENT ON COLUMN public.batch_step_history.status IS '상태 (STARTED/SUCCESS/FAILED)';
COMMENT ON COLUMN public.batch_step_history.error_message IS '에러 메시지';
COMMENT ON COLUMN public.batch_step_history.started_dttm IS 'Step 시작 일시';
COMMENT ON COLUMN public.batch_step_history.completed_dttm IS 'Step 완료 일시';
COMMENT ON COLUMN public.batch_step_history.created_dttm IS '생성 일시';
COMMENT ON COLUMN public.batch_step_history.updated_dttm IS '수정 일시';