diff --git a/src/main/java/com/kamco/cd/kamcoback/scheduler/config/AsyncConfig.java b/src/main/java/com/kamco/cd/kamcoback/scheduler/config/AsyncConfig.java
new file mode 100644
index 00000000..77b99a93
--- /dev/null
+++ b/src/main/java/com/kamco/cd/kamcoback/scheduler/config/AsyncConfig.java
@@ -0,0 +1,28 @@
+package com.kamco.cd.kamcoback.scheduler.config;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+/** Async executor configuration for the SHP file generation pipeline. */
+@Configuration
+@EnableAsync
+public class AsyncConfig {
+
+    @Bean(name = "shpExecutor")
+    public Executor shpExecutor() {
+        ThreadPoolTaskExecutor ex = new ThreadPoolTaskExecutor();
+        ex.setCorePoolSize(2);
+        ex.setMaxPoolSize(4);
+        ex.setQueueCapacity(50);
+        ex.setThreadNamePrefix("shp-");
+        // Back-pressure: when the queue is full, run the task on the caller thread
+        // instead of throwing TaskRejectedException into the scheduler thread.
+        ex.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+        ex.initialize();
+        return ex;
+    }
+}
diff --git a/src/main/java/com/kamco/cd/kamcoback/scheduler/config/ShpKeyLock.java b/src/main/java/com/kamco/cd/kamcoback/scheduler/config/ShpKeyLock.java
new file mode 100644
index 00000000..11958fea
--- /dev/null
+++ b/src/main/java/com/kamco/cd/kamcoback/scheduler/config/ShpKeyLock.java
@@ -0,0 +1,32 @@
+package com.kamco.cd.kamcoback.scheduler.config;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantLock;
+import org.springframework.stereotype.Component;
+
+/**
+ * Per-key mutual exclusion for SHP pipeline runs (keyed by inferenceId).
+ *
+ * <p>Entries are intentionally never removed: removing a lock after unlock races with a
+ * thread that has already obtained the same lock instance via computeIfAbsent, which could
+ * let two pipelines run concurrently for one key. Map growth is bounded by the number of
+ * distinct inference ids.
+ */
+@Component
+public class ShpKeyLock {
+
+    private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();
+
+    /** Attempts to acquire the lock for the given key without blocking; true if acquired. */
+    public boolean tryLock(String key) {
+        return locks.computeIfAbsent(key, k -> new ReentrantLock()).tryLock();
+    }
+
+    /** Releases the lock for the given key if the current thread holds it. */
+    public void unlock(String key) {
+        ReentrantLock lock = locks.get(key);
+        if (lock != null && lock.isHeldByCurrentThread()) {
+            lock.unlock();
+        }
+    }
+}
diff --git a/src/main/java/com/kamco/cd/kamcoback/scheduler/service/MapSheetInferenceJobService.java b/src/main/java/com/kamco/cd/kamcoback/scheduler/service/MapSheetInferenceJobService.java
index 3ac33839..4012a4a7 100644
--- a/src/main/java/com/kamco/cd/kamcoback/scheduler/service/MapSheetInferenceJobService.java
+++ b/src/main/java/com/kamco/cd/kamcoback/scheduler/service/MapSheetInferenceJobService.java
@@ -37,6 +37,8 @@
 public class MapSheetInferenceJobService {
 
     private final InferenceResultCoreService inferenceResultCoreService;
+    private final ShpPipelineService shpPipelineService;
+    private final ExternalHttpClient externalHttpClient;
     private final ObjectMapper objectMapper;
     private final ExternalJarRunner externalJarRunner;
 
@@ -240,15 +242,8 @@ public class MapSheetInferenceJobService {
         String mapIds = sb.toString();
         String batchId = sheet.getM1BatchId() + "," + sheet.getM2BatchId() + "," + sheet.getM3BatchId();
 
-        // uid 기준 도엽별 shp, geojson 파일 생성
-        externalJarRunner.run(jarPath, batchId, inferenceId, mapIds);
-
-        // uid 기준 merge shp, geojson 파일 생성
-        externalJarRunner.run(jarPath, batchId, inferenceId, "");
-
-        // uid 기준 도엽별 shp 파일 geoserver 등록
-        String register = datasetDir + "/" + inferenceId + "/" + "merge" + "/" + inferenceId + ".shp";
-        externalJarRunner.run(jarPath, register, inferenceId);
+        // Generate shp files asynchronously (at most one run per inferenceId).
+        shpPipelineService.runPipeline(jarPath, datasetDir, batchId, inferenceId, mapIds);
     }
 
     /**
diff --git a/src/main/java/com/kamco/cd/kamcoback/scheduler/service/ShpPipelineService.java b/src/main/java/com/kamco/cd/kamcoback/scheduler/service/ShpPipelineService.java
new file mode 100644
index 00000000..b212a230
--- /dev/null
+++ b/src/main/java/com/kamco/cd/kamcoback/scheduler/service/ShpPipelineService.java
@@ -0,0 +1,54 @@
+package com.kamco.cd.kamcoback.scheduler.service;
+
+import com.kamco.cd.kamcoback.common.service.ExternalJarRunner;
+import com.kamco.cd.kamcoback.scheduler.config.ShpKeyLock;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.log4j.Log4j2;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Service;
+
+/** Runs the three-step SHP generation pipeline off the scheduler thread. */
+@Log4j2
+@Service
+@RequiredArgsConstructor
+public class ShpPipelineService {
+
+    private final ExternalJarRunner externalJarRunner;
+    private final ShpKeyLock shpKeyLock;
+
+    /**
+     * Generates per-sheet and merged shp/geojson files and registers the merged result
+     * with GeoServer. At most one run per inferenceId executes at a time; a duplicate
+     * request is logged and skipped.
+     */
+    @Async("shpExecutor")
+    public void runPipeline(
+            String jarPath, String datasetDir, String batchId, String inferenceId, String mapIds) {
+
+        // Limit to one concurrent run per inferenceId.
+        if (!shpKeyLock.tryLock(inferenceId)) {
+            log.warn("SHP pipeline already running. inferenceId={}", inferenceId);
+            return;
+        }
+
+        try {
+            // 1. Per-map-sheet shp/geojson files for this uid.
+            externalJarRunner.run(jarPath, batchId, inferenceId, mapIds);
+
+            // 2. Merged shp/geojson files for this uid.
+            externalJarRunner.run(jarPath, batchId, inferenceId, "");
+
+            // 3. Register the merged shp file with GeoServer.
+            String register = datasetDir + "/" + inferenceId + "/merge/" + inferenceId + ".shp";
+            externalJarRunner.run(jarPath, register, inferenceId);
+
+            log.info("SHP pipeline finished. inferenceId={}", inferenceId);
+
+        } catch (Exception e) {
+            log.error("SHP pipeline failed. inferenceId={}", inferenceId, e);
+            // TODO: update failure status here if required.
+        } finally {
+            shpKeyLock.unlock(inferenceId);
+        }
+    }
+}