diff --git a/kamco-make-dataset-generation/build/libs/generator-dataset-for-training.jar b/kamco-make-dataset-generation/build/libs/generator-dataset-for-training.jar index 310e591..6320c43 100644 Binary files a/kamco-make-dataset-generation/build/libs/generator-dataset-for-training.jar and b/kamco-make-dataset-generation/build/libs/generator-dataset-for-training.jar differ diff --git a/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/ExportGeoJsonTasklet.java.backup b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/ExportGeoJsonTasklet.java.backup new file mode 100644 index 0000000..2931dfe --- /dev/null +++ b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/ExportGeoJsonTasklet.java.backup @@ -0,0 +1,152 @@ +package com.kamco.cd.geojsonscheduler.batch; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.AnalCntInfo; +import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.AnalMapSheetList; +import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.CompleteLabelData; +import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.CompleteLabelData.GeoJsonFeature; +import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.FeatureCollection; +import com.kamco.cd.geojsonscheduler.repository.TrainingDataReviewJobRepository; +import com.kamco.cd.geojsonscheduler.service.DockerRunnerService; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Objects; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +@Log4j2 +@Component +@RequiredArgsConstructor +public class ExportGeoJsonTasklet implements Tasklet { + + private final TrainingDataReviewJobRepository repository; + private final DockerRunnerService dockerRunnerService; + + @Value("${training-data.geojson-dir}") + private String trainingDataDir; + + @Override + public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) { + log.info("========================================"); + log.info("배치 작업 시작"); + log.info("========================================"); + + // 1. StepContext를 통해 바로 가져오기 (가장 추천) + String jobName = chunkContext.getStepContext().getJobName(); + log.info("Job Name: {}", jobName); + + // 진행중인 회차 중, complete_cnt 가 존재하는 회차 목록 가져오기 + log.info("진행중인 회차 목록 조회 중..."); + List analList = repository.findAnalCntInfoList(); + log.info("진행중인 회차 수: {}", analList.size()); + + int processedAnalCount = 0; + for (AnalCntInfo info : analList) { + log.info("----------------------------------------"); + log.info("회차 처리 중: AnalUid={}, ResultUid={}", info.getAnalUid(), info.getResultUid()); + log.info("전체 건수: {}, 파일 건수: {}", info.getAllCnt(), info.getFileCnt()); + + if (Objects.equals(info.getAllCnt(), info.getFileCnt())) { + log.info("모든 파일이 이미 처리됨. 건너뜀."); + continue; + } + + //추론 ID + String resultUid = info.getResultUid(); + log.info("ResultUid: {}", resultUid); + + //insert 하기 jobname, resultUid , 시작시간 + // 어제까지 검수 완료된 총 데이터의 도엽별 목록 가져오기 + log.info("검수 완료된 도엽 목록 조회 중... (AnalUid={})", info.getAnalUid()); + List analMapList = repository.findCompletedAnalMapSheetList(info.getAnalUid()); + log.info("검수 완료된 도엽 수: {}", analMapList.size()); + + //TODO 도엽이 4개이상 존재할때 만 RUN 하기 + if (analMapList.isEmpty()) { + log.warn("검수 완료된 도엽이 없음. 건너뜀."); + continue; + } + + //insert 하기 jobname, resultUid , 시작시간 + boolean anyProcessed = false; + int processedMapSheetCount = 0; + int totalGeoJsonFiles = 0; + + for (AnalMapSheetList mapSheet : analMapList) { + log.info(" 도엽 처리 중: MapSheetNum={}", mapSheet.getMapSheetNum()); + + //도엽별 geom 데이터 가지고 와서 geojson 만들기 + List completeList = + repository.findCompletedYesterdayLabelingList( + info.getAnalUid(), mapSheet.getMapSheetNum()); + log.info(" 완료된 라벨링 데이터 수: {}", completeList.size()); + + if (!completeList.isEmpty()) { + List geoUids = completeList.stream().map(CompleteLabelData::getGeoUid).toList(); + log.info(" GeoUID 목록 생성 완료: {} 건", geoUids.size()); + + List features = completeList.stream().map(GeoJsonFeature::from).toList(); + log.info(" GeoJSON Feature 변환 완료: {} 개", features.size()); + + FeatureCollection collection = new FeatureCollection(features); + String filename = mapSheet.buildFilename(resultUid); + log.info(" GeoJSON 파일명: {}", filename); + + // 형식 /kamco-nfs/dataset/request/uuid/filename + Path outputPath = Paths.get(trainingDataDir + File.separator + "request" + File.separator + resultUid, filename); + log.info(" 출력 경로: {}", outputPath); + + try { + Files.createDirectories(outputPath.getParent()); + log.info(" 디렉토리 생성 완료: {}", outputPath.getParent()); + + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.enable(SerializationFeature.INDENT_OUTPUT); + objectMapper.writeValue(outputPath.toFile(), collection); + log.info(" GeoJSON 파일 저장 완료: {}", outputPath); + + repository.updateLearnDataGeomFileCreateYn(geoUids); + log.info(" DB 업데이트 완료: {} 건", geoUids.size()); + + anyProcessed = true; + processedMapSheetCount++; + totalGeoJsonFiles++; + } catch (IOException e) { + log.error(" GeoJSON 파일 생성 실패: {}", e.getMessage(), e); + } + } + } + + log.info("회차 처리 완료: ResultUid={}", resultUid); + log.info(" 처리된 도엽 수: {}", processedMapSheetCount); + log.info(" 생성된 GeoJSON 파일 수: {}", totalGeoJsonFiles); + + if (anyProcessed) { + log.info("Docker 컨테이너 실행 중... (ResultUid={})", resultUid); + dockerRunnerService.run(resultUid); + log.info("Docker 컨테이너 실행 완료 (ResultUid={})", resultUid); + processedAnalCount++; + } else { + log.warn("처리된 도엽이 없어 Docker 실행 건너뜀 (ResultUid={})", resultUid); + } + } + + log.info("========================================"); + log.info("배치 작업 완료"); + log.info("처리된 회차 수: {}", processedAnalCount); + log.info("========================================"); + + return RepeatStatus.FINISHED; + } +} diff --git a/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/LaunchChildJobsTasklet.java b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/LaunchChildJobsTasklet.java index d99e482..d29e2ef 100644 --- a/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/LaunchChildJobsTasklet.java +++ b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/batch/LaunchChildJobsTasklet.java @@ -6,7 +6,9 @@ import java.util.List; import java.util.Objects; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; +import org.springframework.batch.core.BatchStatus; import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobParameters; import org.springframework.batch.core.JobParametersBuilder; import org.springframework.batch.core.StepContribution; @@ -59,8 +61,9 @@ public class LaunchChildJobsTasklet implements Tasklet { /** 분석 회차 정보 조회를 위한 Repository */ private final TrainingDataReviewJobRepository repository; - /** Child Job을 실행하기 위한 JobLauncher */ - private final JobLauncher jobLauncher; + /** Child Job을 실행하기 위한 비동기 JobLauncher (트랜잭션 충돌 방지) */ + @Qualifier("asyncJobLauncher") + private final JobLauncher asyncJobLauncher; /** 실행할 Child Job (processAnalCntInfoJob) */ @Qualifier("processAnalCntInfoJob") @@ -141,18 +144,56 @@ public class LaunchChildJobsTasklet implements Tasklet { log.info(" - JobParameters: analUid={}, resultUid={}", info.getAnalUid(), info.getResultUid()); - // Child Job 실행 (동기 방식) + // Child Job 실행 (비동기 방식 - 트랜잭션 충돌 방지) + // asyncJobLauncher를 사용하여 별도 쓰레드에서 실행 // 내부적으로 makeGeoJsonStep → dockerRunStep → zipResponseStep 순차 실행 long startTime = System.currentTimeMillis(); - jobLauncher.run(processAnalCntInfoJob, jobParameters); + JobExecution jobExecution = asyncJobLauncher.run(processAnalCntInfoJob, jobParameters); + + // Child Job 완료 대기 (비동기 실행이므로 완료를 폴링) + log.info("[Child Job 대기] 실행 완료 대기 중... (JobExecutionId={})", + jobExecution.getId()); + while (jobExecution.isRunning()) { + try { + Thread.sleep(1000); // 1초마다 상태 확인 + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Child Job 대기 중 인터럽트 발생", e); + } + } + long duration = System.currentTimeMillis() - startTime; + BatchStatus status = jobExecution.getStatus(); - log.info("[Child Job 완료] ✓ 정상 종료"); - log.info(" - AnalUid: {}", info.getAnalUid()); - log.info(" - ResultUid: {}", info.getResultUid()); - log.info(" - 실행 시간: {} ms ({} 초)", duration, duration / 1000); + if (status == BatchStatus.COMPLETED) { + log.info("[Child Job 완료] ✓ 정상 종료"); + log.info(" - AnalUid: {}", info.getAnalUid()); + log.info(" - ResultUid: {}", info.getResultUid()); + log.info(" - 실행 시간: {} ms ({} 초)", duration, duration / 1000); + log.info(" - 최종 상태: {}", status); + processedCount++; + } else { + // Child Job 실패 + log.error("[Child Job 실패] ✗ 비정상 종료"); + log.error(" - AnalUid: {}", info.getAnalUid()); + log.error(" - ResultUid: {}", info.getResultUid()); + log.error(" - 실행 시간: {} ms ({} 초)", duration, duration / 1000); + log.error(" - 최종 상태: {}", status); + log.error(" - Exit 상태: {}", jobExecution.getExitStatus()); - processedCount++; + // 실패 예외 정보 로깅 + if (!jobExecution.getAllFailureExceptions().isEmpty()) { + log.error(" - 실패 예외:"); + for (Throwable t : jobExecution.getAllFailureExceptions()) { + log.error(" * {}: {}", t.getClass().getSimpleName(), t.getMessage()); + } + } + + failedCount++; + // 실패해도 다음 회차 계속 처리 + log.info("[계속 진행] 다음 회차 처리를 계속합니다."); + continue; // 다음 for 루프로 + } } catch (Exception e) { // Child Job 실행 실패 시 (Step 실패 또는 예외 발생) diff --git a/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/config/AsyncJobLauncherConfig.java b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/config/AsyncJobLauncherConfig.java new file mode 100644 index 0000000..8a46e45 --- /dev/null +++ b/kamco-make-dataset-generation/src/main/java/com/kamco/cd/geojsonscheduler/config/AsyncJobLauncherConfig.java @@ -0,0 +1,44 @@ +package com.kamco.cd.geojsonscheduler.config; + +import org.springframework.batch.core.launch.JobLauncher; +import org.springframework.batch.core.launch.support.TaskExecutorJobLauncher; +import org.springframework.batch.core.repository.JobRepository; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.SimpleAsyncTaskExecutor; + +/** + * 비동기 JobLauncher 설정 + * + *

Parent Job 내에서 Child Job을 실행할 때 트랜잭션 충돌을 방지하기 위해 비동기 JobLauncher를 생성합니다. + * + *

문제: Parent Job의 Step이 트랜잭션 내에서 실행되는데, 그 안에서 동기 JobLauncher로 Child Job을 + * 실행하면 "Existing transaction detected in JobRepository" 에러 발생 + * + *

해결: 비동기 TaskExecutor를 사용하는 별도의 JobLauncher를 생성하여 트랜잭션 분리 + * + * @author KAMCO Development Team + * @since 1.0.0 + */ +@Configuration +public class AsyncJobLauncherConfig { + + /** + * 비동기 JobLauncher 생성 + * + *

SimpleAsyncTaskExecutor를 사용하여 Child Job을 별도 쓰레드에서 실행합니다. 이렇게 하면 Parent Job의 + * 트랜잭션과 분리되어 트랜잭션 충돌이 발생하지 않습니다. + * + * @param jobRepository JobRepository + * @return 비동기 JobLauncher + * @throws Exception JobLauncher 초기화 실패 시 + */ + @Bean(name = "asyncJobLauncher") + public JobLauncher asyncJobLauncher(JobRepository jobRepository) throws Exception { + TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher(); + jobLauncher.setJobRepository(jobRepository); + jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); // 비동기 실행 + jobLauncher.afterPropertiesSet(); + return jobLauncher; + } +}