Split the function
This commit is contained in:
Binary file not shown.
@@ -0,0 +1,152 @@
|
||||
package com.kamco.cd.geojsonscheduler.batch;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.SerializationFeature;
|
||||
import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.AnalCntInfo;
|
||||
import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.AnalMapSheetList;
|
||||
import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.CompleteLabelData;
|
||||
import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.CompleteLabelData.GeoJsonFeature;
|
||||
import com.kamco.cd.geojsonscheduler.dto.TrainingDataReviewJobDto.FeatureCollection;
|
||||
import com.kamco.cd.geojsonscheduler.repository.TrainingDataReviewJobRepository;
|
||||
import com.kamco.cd.geojsonscheduler.service.DockerRunnerService;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.batch.core.StepContribution;
|
||||
import org.springframework.batch.core.scope.context.ChunkContext;
|
||||
import org.springframework.batch.core.step.tasklet.Tasklet;
|
||||
import org.springframework.batch.repeat.RepeatStatus;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Log4j2
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class ExportGeoJsonTasklet implements Tasklet {
|
||||
|
||||
private final TrainingDataReviewJobRepository repository;
|
||||
private final DockerRunnerService dockerRunnerService;
|
||||
|
||||
@Value("${training-data.geojson-dir}")
|
||||
private String trainingDataDir;
|
||||
|
||||
@Override
|
||||
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) {
|
||||
log.info("========================================");
|
||||
log.info("배치 작업 시작");
|
||||
log.info("========================================");
|
||||
|
||||
// 1. StepContext를 통해 바로 가져오기 (가장 추천)
|
||||
String jobName = chunkContext.getStepContext().getJobName();
|
||||
log.info("Job Name: {}", jobName);
|
||||
|
||||
// 진행중인 회차 중, complete_cnt 가 존재하는 회차 목록 가져오기
|
||||
log.info("진행중인 회차 목록 조회 중...");
|
||||
List<AnalCntInfo> analList = repository.findAnalCntInfoList();
|
||||
log.info("진행중인 회차 수: {}", analList.size());
|
||||
|
||||
int processedAnalCount = 0;
|
||||
for (AnalCntInfo info : analList) {
|
||||
log.info("----------------------------------------");
|
||||
log.info("회차 처리 중: AnalUid={}, ResultUid={}", info.getAnalUid(), info.getResultUid());
|
||||
log.info("전체 건수: {}, 파일 건수: {}", info.getAllCnt(), info.getFileCnt());
|
||||
|
||||
if (Objects.equals(info.getAllCnt(), info.getFileCnt())) {
|
||||
log.info("모든 파일이 이미 처리됨. 건너뜀.");
|
||||
continue;
|
||||
}
|
||||
|
||||
//추론 ID
|
||||
String resultUid = info.getResultUid();
|
||||
log.info("ResultUid: {}", resultUid);
|
||||
|
||||
//insert 하기 jobname, resultUid , 시작시간
|
||||
// 어제까지 검수 완료된 총 데이터의 도엽별 목록 가져오기
|
||||
log.info("검수 완료된 도엽 목록 조회 중... (AnalUid={})", info.getAnalUid());
|
||||
List<AnalMapSheetList> analMapList = repository.findCompletedAnalMapSheetList(info.getAnalUid());
|
||||
log.info("검수 완료된 도엽 수: {}", analMapList.size());
|
||||
|
||||
//TODO 도엽이 4개이상 존재할때 만 RUN 하기
|
||||
if (analMapList.isEmpty()) {
|
||||
log.warn("검수 완료된 도엽이 없음. 건너뜀.");
|
||||
continue;
|
||||
}
|
||||
|
||||
//insert 하기 jobname, resultUid , 시작시간
|
||||
boolean anyProcessed = false;
|
||||
int processedMapSheetCount = 0;
|
||||
int totalGeoJsonFiles = 0;
|
||||
|
||||
for (AnalMapSheetList mapSheet : analMapList) {
|
||||
log.info(" 도엽 처리 중: MapSheetNum={}", mapSheet.getMapSheetNum());
|
||||
|
||||
//도엽별 geom 데이터 가지고 와서 geojson 만들기
|
||||
List<CompleteLabelData> completeList =
|
||||
repository.findCompletedYesterdayLabelingList(
|
||||
info.getAnalUid(), mapSheet.getMapSheetNum());
|
||||
log.info(" 완료된 라벨링 데이터 수: {}", completeList.size());
|
||||
|
||||
if (!completeList.isEmpty()) {
|
||||
List<Long> geoUids = completeList.stream().map(CompleteLabelData::getGeoUid).toList();
|
||||
log.info(" GeoUID 목록 생성 완료: {} 건", geoUids.size());
|
||||
|
||||
List<GeoJsonFeature> features = completeList.stream().map(GeoJsonFeature::from).toList();
|
||||
log.info(" GeoJSON Feature 변환 완료: {} 개", features.size());
|
||||
|
||||
FeatureCollection collection = new FeatureCollection(features);
|
||||
String filename = mapSheet.buildFilename(resultUid);
|
||||
log.info(" GeoJSON 파일명: {}", filename);
|
||||
|
||||
// 형식 /kamco-nfs/dataset/request/uuid/filename
|
||||
Path outputPath = Paths.get(trainingDataDir + File.separator + "request" + File.separator + resultUid, filename);
|
||||
log.info(" 출력 경로: {}", outputPath);
|
||||
|
||||
try {
|
||||
Files.createDirectories(outputPath.getParent());
|
||||
log.info(" 디렉토리 생성 완료: {}", outputPath.getParent());
|
||||
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
|
||||
objectMapper.writeValue(outputPath.toFile(), collection);
|
||||
log.info(" GeoJSON 파일 저장 완료: {}", outputPath);
|
||||
|
||||
repository.updateLearnDataGeomFileCreateYn(geoUids);
|
||||
log.info(" DB 업데이트 완료: {} 건", geoUids.size());
|
||||
|
||||
anyProcessed = true;
|
||||
processedMapSheetCount++;
|
||||
totalGeoJsonFiles++;
|
||||
} catch (IOException e) {
|
||||
log.error(" GeoJSON 파일 생성 실패: {}", e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.info("회차 처리 완료: ResultUid={}", resultUid);
|
||||
log.info(" 처리된 도엽 수: {}", processedMapSheetCount);
|
||||
log.info(" 생성된 GeoJSON 파일 수: {}", totalGeoJsonFiles);
|
||||
|
||||
if (anyProcessed) {
|
||||
log.info("Docker 컨테이너 실행 중... (ResultUid={})", resultUid);
|
||||
dockerRunnerService.run(resultUid);
|
||||
log.info("Docker 컨테이너 실행 완료 (ResultUid={})", resultUid);
|
||||
processedAnalCount++;
|
||||
} else {
|
||||
log.warn("처리된 도엽이 없어 Docker 실행 건너뜀 (ResultUid={})", resultUid);
|
||||
}
|
||||
}
|
||||
|
||||
log.info("========================================");
|
||||
log.info("배치 작업 완료");
|
||||
log.info("처리된 회차 수: {}", processedAnalCount);
|
||||
log.info("========================================");
|
||||
|
||||
return RepeatStatus.FINISHED;
|
||||
}
|
||||
}
|
||||
@@ -6,7 +6,9 @@ import java.util.List;
|
||||
import java.util.Objects;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.batch.core.BatchStatus;
|
||||
import org.springframework.batch.core.Job;
|
||||
import org.springframework.batch.core.JobExecution;
|
||||
import org.springframework.batch.core.JobParameters;
|
||||
import org.springframework.batch.core.JobParametersBuilder;
|
||||
import org.springframework.batch.core.StepContribution;
|
||||
@@ -59,8 +61,9 @@ public class LaunchChildJobsTasklet implements Tasklet {
|
||||
/** 분석 회차 정보 조회를 위한 Repository */
|
||||
private final TrainingDataReviewJobRepository repository;
|
||||
|
||||
/** Child Job을 실행하기 위한 JobLauncher */
|
||||
private final JobLauncher jobLauncher;
|
||||
/** Child Job을 실행하기 위한 비동기 JobLauncher (트랜잭션 충돌 방지) */
|
||||
@Qualifier("asyncJobLauncher")
|
||||
private final JobLauncher asyncJobLauncher;
|
||||
|
||||
/** 실행할 Child Job (processAnalCntInfoJob) */
|
||||
@Qualifier("processAnalCntInfoJob")
|
||||
@@ -141,18 +144,56 @@ public class LaunchChildJobsTasklet implements Tasklet {
|
||||
log.info(" - JobParameters: analUid={}, resultUid={}", info.getAnalUid(),
|
||||
info.getResultUid());
|
||||
|
||||
// Child Job 실행 (동기 방식)
|
||||
// Child Job 실행 (비동기 방식 - 트랜잭션 충돌 방지)
|
||||
// asyncJobLauncher를 사용하여 별도 쓰레드에서 실행
|
||||
// 내부적으로 makeGeoJsonStep → dockerRunStep → zipResponseStep 순차 실행
|
||||
long startTime = System.currentTimeMillis();
|
||||
jobLauncher.run(processAnalCntInfoJob, jobParameters);
|
||||
JobExecution jobExecution = asyncJobLauncher.run(processAnalCntInfoJob, jobParameters);
|
||||
|
||||
// Child Job 완료 대기 (비동기 실행이므로 완료를 폴링)
|
||||
log.info("[Child Job 대기] 실행 완료 대기 중... (JobExecutionId={})",
|
||||
jobExecution.getId());
|
||||
while (jobExecution.isRunning()) {
|
||||
try {
|
||||
Thread.sleep(1000); // 1초마다 상태 확인
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new RuntimeException("Child Job 대기 중 인터럽트 발생", e);
|
||||
}
|
||||
}
|
||||
|
||||
long duration = System.currentTimeMillis() - startTime;
|
||||
BatchStatus status = jobExecution.getStatus();
|
||||
|
||||
log.info("[Child Job 완료] ✓ 정상 종료");
|
||||
log.info(" - AnalUid: {}", info.getAnalUid());
|
||||
log.info(" - ResultUid: {}", info.getResultUid());
|
||||
log.info(" - 실행 시간: {} ms ({} 초)", duration, duration / 1000);
|
||||
if (status == BatchStatus.COMPLETED) {
|
||||
log.info("[Child Job 완료] ✓ 정상 종료");
|
||||
log.info(" - AnalUid: {}", info.getAnalUid());
|
||||
log.info(" - ResultUid: {}", info.getResultUid());
|
||||
log.info(" - 실행 시간: {} ms ({} 초)", duration, duration / 1000);
|
||||
log.info(" - 최종 상태: {}", status);
|
||||
processedCount++;
|
||||
} else {
|
||||
// Child Job 실패
|
||||
log.error("[Child Job 실패] ✗ 비정상 종료");
|
||||
log.error(" - AnalUid: {}", info.getAnalUid());
|
||||
log.error(" - ResultUid: {}", info.getResultUid());
|
||||
log.error(" - 실행 시간: {} ms ({} 초)", duration, duration / 1000);
|
||||
log.error(" - 최종 상태: {}", status);
|
||||
log.error(" - Exit 상태: {}", jobExecution.getExitStatus());
|
||||
|
||||
processedCount++;
|
||||
// 실패 예외 정보 로깅
|
||||
if (!jobExecution.getAllFailureExceptions().isEmpty()) {
|
||||
log.error(" - 실패 예외:");
|
||||
for (Throwable t : jobExecution.getAllFailureExceptions()) {
|
||||
log.error(" * {}: {}", t.getClass().getSimpleName(), t.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
failedCount++;
|
||||
// 실패해도 다음 회차 계속 처리
|
||||
log.info("[계속 진행] 다음 회차 처리를 계속합니다.");
|
||||
continue; // 다음 for 루프로
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
// Child Job 실행 실패 시 (Step 실패 또는 예외 발생)
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
package com.kamco.cd.geojsonscheduler.config;
|
||||
|
||||
import org.springframework.batch.core.launch.JobLauncher;
|
||||
import org.springframework.batch.core.launch.support.TaskExecutorJobLauncher;
|
||||
import org.springframework.batch.core.repository.JobRepository;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.core.task.SimpleAsyncTaskExecutor;
|
||||
|
||||
/**
|
||||
* 비동기 JobLauncher 설정
|
||||
*
|
||||
* <p>Parent Job 내에서 Child Job을 실행할 때 트랜잭션 충돌을 방지하기 위해 비동기 JobLauncher를 생성합니다.
|
||||
*
|
||||
* <p><b>문제:</b> Parent Job의 Step이 트랜잭션 내에서 실행되는데, 그 안에서 동기 JobLauncher로 Child Job을
|
||||
* 실행하면 "Existing transaction detected in JobRepository" 에러 발생
|
||||
*
|
||||
* <p><b>해결:</b> 비동기 TaskExecutor를 사용하는 별도의 JobLauncher를 생성하여 트랜잭션 분리
|
||||
*
|
||||
* @author KAMCO Development Team
|
||||
* @since 1.0.0
|
||||
*/
|
||||
@Configuration
|
||||
public class AsyncJobLauncherConfig {
|
||||
|
||||
/**
|
||||
* 비동기 JobLauncher 생성
|
||||
*
|
||||
* <p>SimpleAsyncTaskExecutor를 사용하여 Child Job을 별도 쓰레드에서 실행합니다. 이렇게 하면 Parent Job의
|
||||
* 트랜잭션과 분리되어 트랜잭션 충돌이 발생하지 않습니다.
|
||||
*
|
||||
* @param jobRepository JobRepository
|
||||
* @return 비동기 JobLauncher
|
||||
* @throws Exception JobLauncher 초기화 실패 시
|
||||
*/
|
||||
@Bean(name = "asyncJobLauncher")
|
||||
public JobLauncher asyncJobLauncher(JobRepository jobRepository) throws Exception {
|
||||
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
|
||||
jobLauncher.setJobRepository(jobRepository);
|
||||
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor()); // 비동기 실행
|
||||
jobLauncher.afterPropertiesSet();
|
||||
return jobLauncher;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user