diff --git a/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/DockerRunTasklet.java b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/DockerRunTasklet.java
new file mode 100644
index 0000000..4b8b1be
--- /dev/null
+++ b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/DockerRunTasklet.java
@@ -0,0 +1,37 @@
+package com.kamco.cd.geojsonscheduler.batch;
+
+import com.kamco.cd.geojsonscheduler.service.DockerRunnerService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Log4j2
+@Component
+@RequiredArgsConstructor
+public class DockerRunTasklet implements Tasklet {
+
+  private final DockerRunnerService dockerRunnerService;
+
+  @Value("#{jobParameters['resultUid']}")
+  private String resultUid;
+
+  @Override
+  public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
+    log.info("========================================");
+    log.info("Docker 컨테이너 실행 시작 (ResultUid={})", resultUid);
+    log.info("========================================");
+
+    dockerRunnerService.run(resultUid);
+
+    log.info("========================================");
+    log.info("Docker 컨테이너 실행 완료 (ResultUid={})", resultUid);
+    log.info("========================================");
+
+    return RepeatStatus.FINISHED;
+  }
+}
diff --git a/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/ExportGeoJsonJobConfig.java b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/ExportGeoJsonJobConfig.java
index fd023cb..d999d8d 100644
--- a/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/ExportGeoJsonJobConfig.java
+++ b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/ExportGeoJsonJobConfig.java
@@ -4,7 +4,6 @@ import com.kamco.cd.geojsonscheduler.listener.BatchHistoryListener;
 import lombok.RequiredArgsConstructor;
 import org.springframework.batch.core.Job;
 import org.springframework.batch.core.Step;
-import org.springframework.batch.core.StepExecutionListener;
 import org.springframework.batch.core.job.builder.JobBuilder;
 import org.springframework.batch.core.repository.JobRepository;
 import org.springframework.batch.core.step.builder.StepBuilder;
@@ -18,20 +17,20 @@ public class ExportGeoJsonJobConfig {
 
   private final JobRepository jobRepository;
   private final PlatformTransactionManager transactionManager;
-  private final ExportGeoJsonTasklet exportGeoJsonTasklet;
+  private final LaunchChildJobsTasklet launchChildJobsTasklet;
 
   @Bean
-  public Job exportGeoJsonJob(BatchHistoryListener historyListener) { // 1. 리스너 주입 받기
+  public Job exportGeoJsonJob(BatchHistoryListener historyListener) {
     return new JobBuilder("exportGeoJsonJob", jobRepository)
-        .listener(historyListener) // 2. 리스너 등록
-        .start(exportGeoJsonStep())
+        .listener(historyListener)
+        .start(launchChildJobsStep())
         .build();
   }
 
   @Bean
-  public Step exportGeoJsonStep() {
-    return new StepBuilder("exportGeoJsonStep", jobRepository)
-        .tasklet(exportGeoJsonTasklet, transactionManager)
+  public Step launchChildJobsStep() {
+    return new StepBuilder("launchChildJobsStep", jobRepository)
+        .tasklet(launchChildJobsTasklet, transactionManager)
         .build();
   }
-}
\ No newline at end of file
+}
diff --git a/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/LaunchChildJobsTasklet.java b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/LaunchChildJobsTasklet.java
new file mode 100644
index 0000000..e73123a
--- /dev/null
+++ b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/LaunchChildJobsTasklet.java
@@ -0,0 +1,94 @@
+package com.kamco.cd.geojsonscheduler.batch;
+
+import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.AnalCntInfo;
+import com.kamco.cd.geojsonscheduler.repository.TrainingDataReviewJobRepository;
+import java.util.List;
+import java.util.Objects;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.batch.core.Job;
+import org.springframework.batch.core.JobParameters;
+import org.springframework.batch.core.JobParametersBuilder;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.launch.JobLauncher;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Component;
+
+@Log4j2
+@Component
+@RequiredArgsConstructor
+public class LaunchChildJobsTasklet implements Tasklet {
+
+  private final TrainingDataReviewJobRepository repository;
+  private final JobLauncher jobLauncher;
+
+  @Qualifier("processAnalCntInfoJob")
+  private final Job processAnalCntInfoJob;
+
+  @Override
+  public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
+    log.info("========================================");
+    log.info("Parent Job 시작: AnalCntInfo 리스트 조회 및 Child Job 실행");
+    log.info("========================================");
+
+    // 진행중인 회차 중, complete_cnt 가 존재하는 회차 목록 가져오기
+    log.info("진행중인 회차 목록 조회 중...");
+    List<AnalCntInfo> analList = repository.findAnalCntInfoList();
+    log.info("진행중인 회차 수: {}", analList.size());
+
+    int processedCount = 0;
+    int skippedCount = 0;
+    int failedCount = 0;
+
+    for (AnalCntInfo info : analList) {
+      log.info("----------------------------------------");
+      log.info("회차 검토: AnalUid={}, ResultUid={}", info.getAnalUid(), info.getResultUid());
+      log.info("전체 건수: {}, 파일 건수: {}", info.getAllCnt(), info.getFileCnt());
+
+      if (Objects.equals(info.getAllCnt(), info.getFileCnt())) {
+        log.info("모든 파일이 이미 처리됨. 건너뜀.");
+        skippedCount++;
+        continue;
+      }
+
+      try {
+        // Child Job 실행
+        JobParameters jobParameters =
+            new JobParametersBuilder()
+                .addLong("analUid", info.getAnalUid())
+                .addString("resultUid", info.getResultUid())
+                .addLong("timestamp", System.currentTimeMillis()) // 고유성 보장
+                .toJobParameters();
+
+        log.info("Child Job 실행 중... (AnalUid={}, ResultUid={})", info.getAnalUid(), info.getResultUid());
+        jobLauncher.run(processAnalCntInfoJob, jobParameters);
+        log.info("Child Job 실행 완료 (AnalUid={}, ResultUid={})", info.getAnalUid(), info.getResultUid());
+        processedCount++;
+      } catch (Exception e) {
+        log.error("Child Job 실행 실패 (AnalUid={}, ResultUid={}): {}", info.getAnalUid(), info.getResultUid(), e.getMessage(), e);
+        failedCount++;
+        // 한 개 실패해도 계속 진행
+      }
+    }
+
+    log.info("========================================");
+    log.info("Parent Job 완료");
+    log.info(" 총 회차 수: {}", analList.size());
+    log.info(" 성공: {}", processedCount);
+    log.info(" 건너뜀: {}", skippedCount);
+    log.info(" 실패: {}", failedCount);
+    log.info("========================================");
+
+    if (failedCount > 0) {
+      log.warn("{} 개의 Child Job 실행이 실패했습니다.", failedCount);
+      // 실패가 있어도 Parent Job은 성공으로 처리 (부분 성공)
+      // 만약 하나라도 실패하면 Parent Job도 실패로 처리하려면 아래 주석 해제
+      // throw new RuntimeException(failedCount + " 개의 Child Job 실행이 실패했습니다.");
+    }
+
+    return RepeatStatus.FINISHED;
+  }
+}
diff --git a/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/MakeGeoJsonTasklet.java b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/MakeGeoJsonTasklet.java
new file mode 100644
index 0000000..7c4b6a2
--- /dev/null
+++ b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/MakeGeoJsonTasklet.java
@@ -0,0 +1,119 @@
+package com.kamco.cd.geojsonscheduler.batch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.AnalMapSheetList;
+import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.CompleteLabelData;
+import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.CompleteLabelData.GeoJsonFeature;
+import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.FeatureCollection;
+import com.kamco.cd.geojsonscheduler.repository.TrainingDataReviewJobRepository;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Log4j2
+@Component
+@RequiredArgsConstructor
+public class MakeGeoJsonTasklet implements Tasklet {
+
+  private final TrainingDataReviewJobRepository repository;
+
+  @Value("${training-data.geojson-dir}")
+  private String trainingDataDir;
+
+  @Value("#{jobParameters['analUid']}")
+  private Long analUid;
+
+  @Value("#{jobParameters['resultUid']}")
+  private String resultUid;
+
+  @Override
+  public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
+    log.info("========================================");
+    log.info("GeoJSON 생성 시작 (AnalUid={}, ResultUid={})", analUid, resultUid);
+    log.info("========================================");
+
+    // 검수 완료된 도엽 목록 조회
(AnalUid={})", analUid); + List analMapList = repository.findCompletedAnalMapSheetList(analUid); + log.info("검수 완료된 도엽 수: {}", analMapList.size()); + + if (analMapList.isEmpty()) { + log.warn("검수 완료된 도엽이 없음. 작업 건너뜀."); + return RepeatStatus.FINISHED; + } + + int processedMapSheetCount = 0; + int totalGeoJsonFiles = 0; + + for (AnalMapSheetList mapSheet : analMapList) { + log.info(" 도엽 처리 중: MapSheetNum={}", mapSheet.getMapSheetNum()); + + // 도엽별 geom 데이터 가져오기 + List completeList = + repository.findCompletedYesterdayLabelingList(analUid, mapSheet.getMapSheetNum()); + log.info(" 완료된 라벨링 데이터 수: {}", completeList.size()); + + if (!completeList.isEmpty()) { + List geoUids = completeList.stream().map(CompleteLabelData::getGeoUid).toList(); + log.info(" GeoUID 목록 생성 완료: {} 건", geoUids.size()); + + List features = completeList.stream().map(GeoJsonFeature::from).toList(); + log.info(" GeoJSON Feature 변환 완료: {} 개", features.size()); + + FeatureCollection collection = new FeatureCollection(features); + String filename = mapSheet.buildFilename(resultUid); + log.info(" GeoJSON 파일명: {}", filename); + + // 형식 /kamco-nfs/dataset/request/uuid/filename + Path outputPath = + Paths.get( + trainingDataDir + File.separator + "request" + File.separator + resultUid, + filename); + log.info(" 출력 경로: {}", outputPath); + + try { + Files.createDirectories(outputPath.getParent()); + log.info(" 디렉토리 생성 완료: {}", outputPath.getParent()); + + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.enable(SerializationFeature.INDENT_OUTPUT); + objectMapper.writeValue(outputPath.toFile(), collection); + log.info(" GeoJSON 파일 저장 완료: {}", outputPath); + + repository.updateLearnDataGeomFileCreateYn(geoUids); + log.info(" DB 업데이트 완료: {} 건", geoUids.size()); + + processedMapSheetCount++; + totalGeoJsonFiles++; + } catch (IOException e) { + log.error(" GeoJSON 파일 생성 실패: {}", e.getMessage(), e); + throw new RuntimeException("GeoJSON 파일 생성 실패: " + filename, e); + } + } + } + + log.info("========================================"); + log.info("GeoJSON 생성 완료 (ResultUid={})", resultUid); + log.info(" 처리된 도엽 수: {}", processedMapSheetCount); + log.info(" 생성된 GeoJSON 파일 수: {}", totalGeoJsonFiles); + log.info("========================================"); + + if (totalGeoJsonFiles == 0) { + throw new RuntimeException("생성된 GeoJSON 파일이 없습니다. 
(AnalUid=" + analUid + ")"); + } + + return RepeatStatus.FINISHED; + } +} diff --git a/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/ProcessAnalCntInfoJobConfig.java b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/ProcessAnalCntInfoJobConfig.java new file mode 100644 index 0000000..3a6038e --- /dev/null +++ b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/ProcessAnalCntInfoJobConfig.java @@ -0,0 +1,57 @@ +package com.kamco.cd.geojsonscheduler.batch; + +import com.kamco.cd.geojsonscheduler.listener.StepHistoryListener; +import lombok.RequiredArgsConstructor; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; + +@Configuration +@RequiredArgsConstructor +public class ProcessAnalCntInfoJobConfig { + + private final JobRepository jobRepository; + private final PlatformTransactionManager transactionManager; + private final MakeGeoJsonTasklet makeGeoJsonTasklet; + private final DockerRunTasklet dockerRunTasklet; + private final ZipResponseTasklet zipResponseTasklet; + private final StepHistoryListener stepHistoryListener; + + @Bean + public Job processAnalCntInfoJob() { + return new JobBuilder("processAnalCntInfoJob", jobRepository) + .start(makeGeoJsonStep()) + .next(dockerRunStep()) + .next(zipResponseStep()) + .build(); + } + + @Bean + public Step makeGeoJsonStep() { + return new StepBuilder("makeGeoJsonStep", jobRepository) + .tasklet(makeGeoJsonTasklet, transactionManager) + .listener(stepHistoryListener) + .build(); + } + + @Bean + public Step dockerRunStep() { + return new StepBuilder("dockerRunStep", jobRepository) + .tasklet(dockerRunTasklet, transactionManager) + .listener(stepHistoryListener) + .build(); + } + + @Bean + public Step zipResponseStep() { + return new StepBuilder("zipResponseStep", jobRepository) + .tasklet(zipResponseTasklet, transactionManager) + .listener(stepHistoryListener) + .build(); + } +} diff --git a/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/TrainModelJobConfig.java b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/TrainModelJobConfig.java new file mode 100644 index 0000000..67a4b2d --- /dev/null +++ b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/TrainModelJobConfig.java @@ -0,0 +1,36 @@ +package com.kamco.cd.geojsonscheduler.batch; + +import com.kamco.cd.geojsonscheduler.listener.BatchHistoryListener; +import lombok.RequiredArgsConstructor; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.job.builder.JobBuilder; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.transaction.PlatformTransactionManager; + +@Configuration +@RequiredArgsConstructor +public class TrainModelJobConfig { + + private final JobRepository jobRepository; + private final 
+  private final PlatformTransactionManager transactionManager;
+  private final TrainModelTasklet trainModelTasklet;
+
+  @Bean
+  public Job trainModelJob(BatchHistoryListener historyListener) {
+    return new JobBuilder("trainModelJob", jobRepository)
+        .listener(historyListener)
+        .start(trainModelStep())
+        .build();
+  }
+
+  @Bean
+  public Step trainModelStep() {
+    return new StepBuilder("trainModelStep", jobRepository)
+        .tasklet(trainModelTasklet, transactionManager)
+        .build();
+  }
+}
diff --git a/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/TrainModelTasklet.java b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/TrainModelTasklet.java
new file mode 100644
index 0000000..b8979cf
--- /dev/null
+++ b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/TrainModelTasklet.java
@@ -0,0 +1,60 @@
+package com.kamco.cd.geojsonscheduler.batch;
+
+import com.kamco.cd.geojsonscheduler.service.TrainDockerRunnerService;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.stereotype.Component;
+
+@Log4j2
+@Component
+@RequiredArgsConstructor
+public class TrainModelTasklet implements Tasklet {
+
+  private final TrainDockerRunnerService trainDockerRunnerService;
+
+  @Override
+  public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
+    log.info("========================================");
+    log.info("학습 배치 작업 시작");
+    log.info("========================================");
+
+    String jobName = chunkContext.getStepContext().getJobName();
+    log.info("Job Name: {}", jobName);
+
+    // Job 파라미터에서 dataset-folder와 output-folder 가져오기
+    String datasetFolder = (String) chunkContext.getStepContext()
+        .getJobParameters()
+        .get("dataset-folder");
+    String outputFolder = (String) chunkContext.getStepContext()
+        .getJobParameters()
+        .get("output-folder");
+
+    log.info("Dataset Folder Parameter: {}", datasetFolder);
+    log.info("Output Folder Parameter: {}", outputFolder);
+
+    if (datasetFolder == null || datasetFolder.isBlank()) {
+      log.error("dataset-folder 파라미터가 없습니다!");
+      throw new IllegalArgumentException("dataset-folder parameter is required");
+    }
+
+    if (outputFolder == null || outputFolder.isBlank()) {
+      log.error("output-folder 파라미터가 없습니다!");
+      throw new IllegalArgumentException("output-folder parameter is required");
+    }
+
+    // Train Docker 실행
+    log.info("Train Docker 실행 중...");
+    trainDockerRunnerService.runTraining(datasetFolder, outputFolder);
+    log.info("Train Docker 실행 완료");
+
+    log.info("========================================");
+    log.info("학습 배치 작업 완료");
+    log.info("========================================");
+
+    return RepeatStatus.FINISHED;
+  }
+}
diff --git a/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/ZipResponseTasklet.java b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/ZipResponseTasklet.java
new file mode 100644
index 0000000..1f12ed1
--- /dev/null
+++ b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/ZipResponseTasklet.java
@@ -0,0 +1,108 @@
+package com.kamco.cd.geojsonscheduler.batch;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Log4j2
+@Component
+@RequiredArgsConstructor
+public class ZipResponseTasklet implements Tasklet {
+
+  @Value("${training-data.geojson-dir}")
+  private String trainingDataDir;
+
+  @Value("#{jobParameters['resultUid']}")
+  private String resultUid;
+
+  @Override
+  public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
+    log.info("========================================");
+    log.info("결과물 압축 시작 (ResultUid={})", resultUid);
+    log.info("========================================");
+
+    Path responseDir =
+        Paths.get(trainingDataDir + File.separator + "response" + File.separator + resultUid);
+    Path zipFile =
+        Paths.get(
+            trainingDataDir + File.separator + "response" + File.separator + resultUid + ".zip");
+
+    log.info("압축 대상 디렉토리: {}", responseDir);
+    log.info("압축 파일 경로: {}", zipFile);
+
+    if (!Files.exists(responseDir)) {
+      log.error("Response 디렉토리가 존재하지 않음: {}", responseDir);
+      throw new RuntimeException("Response 디렉토리가 존재하지 않습니다: " + responseDir);
+    }
+
+    try {
+      zipDirectory(responseDir.toFile(), zipFile.toFile());
+      log.info("압축 완료: {} (크기: {} bytes)", zipFile, Files.size(zipFile));
+    } catch (IOException e) {
+      log.error("압축 실패: {}", e.getMessage(), e);
+      throw new RuntimeException("Response 디렉토리 압축 실패: " + responseDir, e);
+    }
+
+    log.info("========================================");
+    log.info("결과물 압축 완료 (ResultUid={})", resultUid);
+    log.info("========================================");
+
+    return RepeatStatus.FINISHED;
+  }
+
+  private void zipDirectory(File sourceDir, File zipFile) throws IOException {
+    try (FileOutputStream fos = new FileOutputStream(zipFile);
+        ZipOutputStream zos = new ZipOutputStream(fos)) {
+      zipDirectoryRecursive(sourceDir, sourceDir.getName(), zos);
+    }
+  }
+
+  private void zipDirectoryRecursive(File fileToZip, String fileName, ZipOutputStream zos)
+      throws IOException {
+    if (fileToZip.isHidden()) {
+      return;
+    }
+
+    if (fileToZip.isDirectory()) {
+      if (fileName.endsWith("/")) {
+        zos.putNextEntry(new ZipEntry(fileName));
+        zos.closeEntry();
+      } else {
+        zos.putNextEntry(new ZipEntry(fileName + "/"));
+        zos.closeEntry();
+      }
+
+      File[] children = fileToZip.listFiles();
+      if (children != null) {
+        for (File childFile : children) {
+          zipDirectoryRecursive(childFile, fileName + "/" + childFile.getName(), zos);
+        }
+      }
+      return;
+    }
+
+    try (FileInputStream fis = new FileInputStream(fileToZip)) {
+      ZipEntry zipEntry = new ZipEntry(fileName);
+      zos.putNextEntry(zipEntry);
+      byte[] buffer = new byte[1024];
+      int length;
+      while ((length = fis.read(buffer)) >= 0) {
+        zos.write(buffer, 0, length);
+      }
+    }
+  }
+}
diff --git a/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/config/TrainDockerProperties.java b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/config/TrainDockerProperties.java
new file mode 100644
index 0000000..559f0c5
--- /dev/null
+++ b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/config/TrainDockerProperties.java
@@ -0,0 +1,25 @@
+package com.kamco.cd.geojsonscheduler.config;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@Getter
+@Setter
+@ConfigurationProperties(prefix = "train-data.docker")
+public class TrainDockerProperties {
+
+  private String image;
+  private String dataVolume;
+  private String checkpointsVolume;
+  private String datasetFolder;
+  private String outputFolder;
+  private String inputSize;
+  private String cropSize;
+  private int batchSize;
+  private String gpuIds;
+  private int gpus;
+  private String lr;
+  private String backbone;
+  private int epochs;
+}
diff --git a/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/listener/StepHistoryListener.java b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/listener/StepHistoryListener.java
new file mode 100644
index 0000000..14a34c7
--- /dev/null
+++ b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/listener/StepHistoryListener.java
@@ -0,0 +1,113 @@
+package com.kamco.cd.geojsonscheduler.listener;
+
+import com.kamco.cd.geojsonscheduler.repository.BatchStepHistoryRepository;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.batch.core.ExitStatus;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.StepExecutionListener;
+import org.springframework.stereotype.Component;
+
+@Log4j2
+@Component
+@RequiredArgsConstructor
+public class StepHistoryListener implements StepExecutionListener {
+
+  private final BatchStepHistoryRepository batchStepHistoryRepository;
+
+  @Override
+  public void beforeStep(StepExecution stepExecution) {
+    log.info("=========================================================");
+    log.info("Step 시작 - StepHistoryListener");
+    log.info("=========================================================");
+
+    String stepName = stepExecution.getStepName();
+    log.info("Step Name: {}", stepName);
+
+    try {
+      Long analUid = stepExecution.getJobParameters().getLong("analUid");
+      String resultUid = stepExecution.getJobParameters().getString("resultUid");
+
+      if (analUid == null || resultUid == null) {
+        log.warn(
+            "JobParameters에 analUid 또는 resultUid가 없어 Step 이력을 기록할 수 없습니다.");
+        return;
+      }
+
+      log.info("AnalUid: {}, ResultUid: {}", analUid, resultUid);
+
+      // Step 시작 기록
+      batchStepHistoryRepository.startStep(analUid, resultUid, stepName);
+      log.info("Step 시작 기록 저장 완료");
+
+    } catch (Exception e) {
+      log.error("Step 시작 기록 저장 실패: {}", e.getMessage(), e);
+      // 리스너 오류가 Step 실행을 방해하지 않도록 예외를 던지지 않음
+    }
+  }
+
+  @Override
+  public ExitStatus afterStep(StepExecution stepExecution) {
+    log.info("=========================================================");
+    log.info("Step 종료 - StepHistoryListener");
+    log.info("=========================================================");
+
+    String stepName = stepExecution.getStepName();
+    log.info("Step Name: {}", stepName);
+    log.info("Step Exit Status: {}", stepExecution.getExitStatus());
+
+    try {
+      Long analUid = stepExecution.getJobParameters().getLong("analUid");
+      String resultUid = stepExecution.getJobParameters().getString("resultUid");
+
+      if (analUid == null || resultUid == null) {
+        log.warn(
+            "JobParameters에 analUid 또는 resultUid가 없어 Step 이력을 기록할 수 없습니다.");
+        return stepExecution.getExitStatus();
+      }
+
ResultUid: {}", analUid, resultUid); + + // Step 성공 여부 판단 + boolean isSuccess = ExitStatus.COMPLETED.equals(stepExecution.getExitStatus()); + log.info("Step 성공 여부: {}", isSuccess ? "성공" : "실패"); + + if (isSuccess) { + // Step 성공 기록 + batchStepHistoryRepository.finishStepSuccess(analUid, resultUid, stepName); + log.info("Step 성공 기록 저장 완료"); + } else { + // Step 실패 기록 + String errorMessage = buildErrorMessage(stepExecution); + batchStepHistoryRepository.finishStepFailed(analUid, resultUid, stepName, errorMessage); + log.info("Step 실패 기록 저장 완료"); + } + + } catch (Exception e) { + log.error("Step 종료 기록 저장 실패: {}", e.getMessage(), e); + // 리스너 오류가 Step 실행을 방해하지 않도록 예외를 던지지 않음 + } + + log.info("========================================================="); + return stepExecution.getExitStatus(); + } + + /** + * Step 실행 실패 시 에러 메시지 생성 + */ + private String buildErrorMessage(StepExecution stepExecution) { + StringBuilder sb = new StringBuilder(); + + sb.append("ExitStatus: ").append(stepExecution.getExitStatus()).append("\n"); + + if (!stepExecution.getFailureExceptions().isEmpty()) { + sb.append("Failure Exceptions:\n"); + for (Throwable t : stepExecution.getFailureExceptions()) { + sb.append("- ").append(t.getClass().getSimpleName()).append(": ").append(t.getMessage()) + .append("\n"); + } + } + + return sb.toString(); + } +} diff --git a/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/repository/BatchStepHistoryRepository.java b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/repository/BatchStepHistoryRepository.java new file mode 100644 index 0000000..fbaf37f --- /dev/null +++ b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/repository/BatchStepHistoryRepository.java @@ -0,0 +1,138 @@ +package com.kamco.cd.geojsonscheduler.repository; + +import java.sql.Timestamp; +import java.time.LocalDateTime; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; +import org.springframework.transaction.annotation.Transactional; + +@Log4j2 +@Repository +@RequiredArgsConstructor +public class BatchStepHistoryRepository { + + private final JdbcTemplate jdbcTemplate; + + /** + * Step 시작 기록 + */ + @Transactional + public void startStep(Long analUid, String resultUid, String stepName) { + log.info("[BatchStepHistoryRepository] Step 시작 기록 저장"); + log.info(" AnalUid: {}, ResultUid: {}, StepName: {}", analUid, resultUid, stepName); + + String sql = + """ + INSERT INTO public.batch_step_history + (anal_uid, result_uid, step_name, status, started_dttm, created_dttm, updated_dttm) + VALUES (?, ?, ?, ?, ?, ?, ?) + """; + + Timestamp now = Timestamp.valueOf(LocalDateTime.now()); + + int rowsAffected = + jdbcTemplate.update( + sql, + analUid, + resultUid, + stepName, + "STARTED", + now, // started_dttm + now, // created_dttm + now // updated_dttm + ); + + log.info( + "[BatchStepHistoryRepository] Step 시작 기록 저장 완료 ({} rows affected)", rowsAffected); + } + + /** + * Step 성공 기록 + */ + @Transactional + public void finishStepSuccess(Long analUid, String resultUid, String stepName) { + log.info("[BatchStepHistoryRepository] Step 성공 기록 업데이트"); + log.info(" AnalUid: {}, ResultUid: {}, StepName: {}", analUid, resultUid, stepName); + + String sql = + """ + UPDATE public.batch_step_history + SET status = ?, + completed_dttm = ?, + updated_dttm = ? + WHERE anal_uid = ? + AND result_uid = ? + AND step_name = ? 
+          AND status = 'STARTED'
+        """;
+
+    Timestamp now = Timestamp.valueOf(LocalDateTime.now());
+
+    int rowsAffected =
+        jdbcTemplate.update(sql, "SUCCESS", now, now, analUid, resultUid, stepName);
+
+    if (rowsAffected > 0) {
+      log.info(
+          "[BatchStepHistoryRepository] Step 성공 기록 업데이트 완료 ({} rows affected)",
+          rowsAffected);
+    } else {
+      log.warn(
+          "[BatchStepHistoryRepository] 업데이트된 row가 없습니다. AnalUid: {}, ResultUid: {},"
+              + " StepName: {}",
+          analUid,
+          resultUid,
+          stepName);
+    }
+  }
+
+  /**
+   * Step 실패 기록
+   */
+  @Transactional
+  public void finishStepFailed(
+      Long analUid, String resultUid, String stepName, String errorMessage) {
+    log.info("[BatchStepHistoryRepository] Step 실패 기록 업데이트");
+    log.info(" AnalUid: {}, ResultUid: {}, StepName: {}", analUid, resultUid, stepName);
+    log.info(" ErrorMessage: {}", errorMessage);
+
+    String sql =
+        """
+        UPDATE public.batch_step_history
+        SET status = ?,
+            error_message = ?,
+            completed_dttm = ?,
+            updated_dttm = ?
+        WHERE anal_uid = ?
+          AND result_uid = ?
+          AND step_name = ?
+          AND status = 'STARTED'
+        """;
+
+    Timestamp now = Timestamp.valueOf(LocalDateTime.now());
+
+    // error_message는 최대 1000자로 제한
+    String truncatedError =
+        errorMessage != null && errorMessage.length() > 1000
+            ? errorMessage.substring(0, 1000)
+            : errorMessage;
+
+    int rowsAffected =
+        jdbcTemplate.update(
+            sql, "FAILED", truncatedError, now, now, analUid, resultUid, stepName);
+
+    if (rowsAffected > 0) {
+      log.info(
+          "[BatchStepHistoryRepository] Step 실패 기록 업데이트 완료 ({} rows affected)",
+          rowsAffected);
+    } else {
+      log.warn(
+          "[BatchStepHistoryRepository] 업데이트된 row가 없습니다. AnalUid: {}, ResultUid: {},"
+              + " StepName: {}",
+          analUid,
+          resultUid,
+          stepName);
+    }
+  }
+}
diff --git a/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/service/DockerRunnerService.java b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/service/DockerRunnerService.java
index 42e6e47..f2c9da6 100644
--- a/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/service/DockerRunnerService.java
+++ b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/service/DockerRunnerService.java
@@ -37,14 +37,18 @@
       int exitCode = process.waitFor();
       if (exitCode != 0) {
         log.error("Docker process exited with code {} for resultUid: {}", exitCode, resultUid);
+        throw new RuntimeException(
+            String.format("Docker process failed with exit code %d for resultUid: %s", exitCode, resultUid));
       } else {
         log.info("Docker process completed successfully for resultUid: {}", resultUid);
       }
     } catch (IOException e) {
       log.error("Failed to run docker command for resultUid {}: {}", resultUid, e.getMessage());
+      throw new RuntimeException("Failed to run docker command for resultUid: " + resultUid, e);
     } catch (InterruptedException e) {
       log.error("Docker process interrupted for resultUid {}: {}", resultUid, e.getMessage());
       Thread.currentThread().interrupt();
+      throw new RuntimeException("Docker process interrupted for resultUid: " + resultUid, e);
     }
   }
 
diff --git a/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/service/TrainDockerRunnerService.java b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/service/TrainDockerRunnerService.java
new file mode 100644
index 0000000..c94bdc9
--- /dev/null
+++ b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/service/TrainDockerRunnerService.java
@@ -0,0 +1,121 @@
+package com.kamco.cd.geojsonscheduler.service;
+
+import com.kamco.cd.geojsonscheduler.config.TrainDockerProperties;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.stereotype.Service;
+
+@Log4j2
+@Service
+@RequiredArgsConstructor
+public class TrainDockerRunnerService {
+
+  private final TrainDockerProperties trainDockerProperties;
+
+  public void runTraining(String datasetFolder, String outputFolder) {
+    log.info("========================================");
+    log.info("Train Docker 실행 시작");
+    log.info("Dataset Folder: {}", datasetFolder);
+    log.info("Output Folder: {}", outputFolder);
+    log.info("========================================");
+
+    List<String> command = buildTrainCommand(datasetFolder, outputFolder);
+    log.info("Docker 명령어: {}", String.join(" ", command));
+
+    try {
+      ProcessBuilder pb = new ProcessBuilder(command);
+      pb.redirectErrorStream(true);
+      Process process = pb.start();
+
+      log.info("Docker 프로세스 시작됨 (Detached mode)");
+
+      try (BufferedReader reader =
+          new BufferedReader(new InputStreamReader(process.getInputStream()))) {
+        String line;
+        while ((line = reader.readLine()) != null) {
+          log.info("[train-docker] {}", line);
+        }
+      }
+
+      int exitCode = process.waitFor();
+      if (exitCode != 0) {
+        log.error("Train Docker 프로세스 실패: exitCode={}, datasetFolder={}, outputFolder={}",
+            exitCode, datasetFolder, outputFolder);
+      } else {
+        log.info("Train Docker 프로세스 시작 완료: datasetFolder={}, outputFolder={}",
+            datasetFolder, outputFolder);
+      }
+    } catch (IOException e) {
+      log.error("Train Docker 실행 실패: datasetFolder={}, outputFolder={}, error={}",
+          datasetFolder, outputFolder, e.getMessage(), e);
+    } catch (InterruptedException e) {
+      log.error("Train Docker 프로세스 중단: datasetFolder={}, outputFolder={}, error={}",
+          datasetFolder, outputFolder, e.getMessage(), e);
+      Thread.currentThread().interrupt();
+    }
+
+    log.info("========================================");
+    log.info("Train Docker 실행 완료");
+    log.info("========================================");
+  }
+
+  private List<String> buildTrainCommand(String datasetFolder, String outputFolder) {
+    List<String> cmd = new ArrayList<>();
+    cmd.add("docker");
+    cmd.add("run");
+    cmd.add("-d"); // detached mode
+    cmd.add("--name");
+    cmd.add("train-cd");
+    cmd.add("--rm");
+    cmd.add("--gpus");
+    cmd.add("all");
+    cmd.add("--ipc=host");
+    cmd.add("--shm-size=16g");
+    cmd.add("--ulimit");
+    cmd.add("memlock=-1");
+    cmd.add("--ulimit");
+    cmd.add("stack=67108864");
+    cmd.add("-e");
+    cmd.add("NCCL_DEBUG=INFO");
+    cmd.add("-e");
+    cmd.add("NCCL_IB_DISABLE=1");
+    cmd.add("-e");
+    cmd.add("NCCL_P2P_DISABLE=0");
+    cmd.add("-e");
+    cmd.add("NCCL_SOCKET_IFNAME=eth0");
+    cmd.add("-v");
+    cmd.add(trainDockerProperties.getDataVolume());
+    cmd.add("-v");
+    cmd.add(trainDockerProperties.getCheckpointsVolume());
+    cmd.add("-it");
+    cmd.add(trainDockerProperties.getImage());
+    cmd.add("python");
+    cmd.add("/workspace/change-detection-code/train_wrapper.py");
+    cmd.add("--dataset-folder");
+    cmd.add(datasetFolder);
+    cmd.add("--output-folder");
+    cmd.add(outputFolder);
+    cmd.add("--input-size");
+    cmd.add(trainDockerProperties.getInputSize());
+    cmd.add("--crop-size");
+    cmd.add(trainDockerProperties.getCropSize());
+    cmd.add("--batch-size");
+    cmd.add(String.valueOf(trainDockerProperties.getBatchSize()));
+    cmd.add("--gpu-ids");
+    cmd.add(trainDockerProperties.getGpuIds());
cmd.add("--gpus"); + cmd.add(String.valueOf(trainDockerProperties.getGpus())); + cmd.add("--lr"); + cmd.add(trainDockerProperties.getLr()); + cmd.add("--backbone"); + cmd.add(trainDockerProperties.getBackbone()); + cmd.add("--epochs"); + cmd.add(String.valueOf(trainDockerProperties.getEpochs())); + return cmd; + } +} diff --git a/kamco-make-dataset-generation/src/main/resources/sql/schema.sql b/kamco-make-dataset-generation/src/main/resources/sql/schema.sql index fba8ca4..ba97357 100644 --- a/kamco-make-dataset-generation/src/main/resources/sql/schema.sql +++ b/kamco-make-dataset-generation/src/main/resources/sql/schema.sql @@ -23,3 +23,36 @@ COMMENT ON COLUMN public.batch_history.created_dttm IS '생성 일시'; COMMENT ON COLUMN public.batch_history.updated_dttm IS '수정 일시'; COMMENT ON COLUMN public.batch_history.status IS '상태 (STARTED/COMPLETED/FAILED)'; COMMENT ON COLUMN public.batch_history.completed_dttm IS '완료 일시'; + +-- batch_step_history 테이블 생성 +CREATE TABLE IF NOT EXISTS public.batch_step_history ( + id BIGSERIAL PRIMARY KEY, + anal_uid BIGINT NOT NULL, + result_uid VARCHAR(255) NOT NULL, + step_name VARCHAR(100) NOT NULL, + status VARCHAR(50) NOT NULL, + error_message TEXT, + started_dttm TIMESTAMP NOT NULL, + completed_dttm TIMESTAMP, + created_dttm TIMESTAMP NOT NULL, + updated_dttm TIMESTAMP NOT NULL +); + +-- 인덱스 생성 +CREATE INDEX IF NOT EXISTS idx_batch_step_history_anal_uid ON public.batch_step_history(anal_uid); +CREATE INDEX IF NOT EXISTS idx_batch_step_history_result_uid ON public.batch_step_history(result_uid); +CREATE INDEX IF NOT EXISTS idx_batch_step_history_status ON public.batch_step_history(status); +CREATE INDEX IF NOT EXISTS idx_batch_step_history_step_name ON public.batch_step_history(step_name); + +-- 코멘트 +COMMENT ON TABLE public.batch_step_history IS '배치 Step 실행 이력'; +COMMENT ON COLUMN public.batch_step_history.id IS 'Step 이력 고유 ID'; +COMMENT ON COLUMN public.batch_step_history.anal_uid IS '분석 UID'; +COMMENT ON COLUMN public.batch_step_history.result_uid IS '결과 UID'; +COMMENT ON COLUMN public.batch_step_history.step_name IS 'Step 이름 (makeGeoJsonStep/dockerRunStep/zipResponseStep)'; +COMMENT ON COLUMN public.batch_step_history.status IS '상태 (STARTED/SUCCESS/FAILED)'; +COMMENT ON COLUMN public.batch_step_history.error_message IS '에러 메시지'; +COMMENT ON COLUMN public.batch_step_history.started_dttm IS 'Step 시작 일시'; +COMMENT ON COLUMN public.batch_step_history.completed_dttm IS 'Step 완료 일시'; +COMMENT ON COLUMN public.batch_step_history.created_dttm IS '생성 일시'; +COMMENT ON COLUMN public.batch_step_history.updated_dttm IS '수정 일시';