From 6d423534f19ff2dd69acbdd06d130e69ccf8ac4d Mon Sep 17 00:00:00 2001 From: teddy Date: Tue, 20 Jan 2026 17:12:46 +0900 Subject: [PATCH] =?UTF-8?q?=EC=B6=94=EB=A1=A0=20=EC=99=84=EB=A3=8C=20shp?= =?UTF-8?q?=ED=8C=8C=EC=9D=BC=20=EC=83=9D=EC=84=B1=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../scheduler/config/AsyncConfig.java | 23 +++++++++ .../scheduler/config/ShpKeyLock.java | 27 +++++++++++ .../service/MapSheetInferenceJobService.java | 13 ++--- .../scheduler/service/ShpPipelineService.java | 48 +++++++++++++++++++ 4 files changed, 102 insertions(+), 9 deletions(-) create mode 100644 src/main/java/com/kamco/cd/kamcoback/scheduler/config/AsyncConfig.java create mode 100644 src/main/java/com/kamco/cd/kamcoback/scheduler/config/ShpKeyLock.java create mode 100644 src/main/java/com/kamco/cd/kamcoback/scheduler/service/ShpPipelineService.java diff --git a/src/main/java/com/kamco/cd/kamcoback/scheduler/config/AsyncConfig.java b/src/main/java/com/kamco/cd/kamcoback/scheduler/config/AsyncConfig.java new file mode 100644 index 00000000..77b99a93 --- /dev/null +++ b/src/main/java/com/kamco/cd/kamcoback/scheduler/config/AsyncConfig.java @@ -0,0 +1,23 @@ +package com.kamco.cd.kamcoback.scheduler.config; + +import java.util.concurrent.Executor; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +@Configuration +@EnableAsync +public class AsyncConfig { + + @Bean(name = "shpExecutor") + public Executor shpExecutor() { + ThreadPoolTaskExecutor ex = new ThreadPoolTaskExecutor(); + ex.setCorePoolSize(2); + ex.setMaxPoolSize(4); + ex.setQueueCapacity(50); + ex.setThreadNamePrefix("shp-"); + ex.initialize(); + return ex; + } +} diff --git a/src/main/java/com/kamco/cd/kamcoback/scheduler/config/ShpKeyLock.java b/src/main/java/com/kamco/cd/kamcoback/scheduler/config/ShpKeyLock.java new file mode 100644 index 00000000..11958fea --- /dev/null +++ b/src/main/java/com/kamco/cd/kamcoback/scheduler/config/ShpKeyLock.java @@ -0,0 +1,27 @@ +package com.kamco.cd.kamcoback.scheduler.config; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; +import org.springframework.stereotype.Component; + +@Component +public class ShpKeyLock { + + private final ConcurrentHashMap locks = new ConcurrentHashMap<>(); + + public boolean tryLock(String key) { + ReentrantLock lock = locks.computeIfAbsent(key, k -> new ReentrantLock()); + return lock.tryLock(); + } + + public void unlock(String key) { + ReentrantLock lock = locks.get(key); + if (lock != null && lock.isHeldByCurrentThread()) { + lock.unlock(); + // 메모리 누수 방지(락이 비어있으면 제거) + if (!lock.hasQueuedThreads()) { + locks.remove(key, lock); + } + } + } +} diff --git a/src/main/java/com/kamco/cd/kamcoback/scheduler/service/MapSheetInferenceJobService.java b/src/main/java/com/kamco/cd/kamcoback/scheduler/service/MapSheetInferenceJobService.java index 3ac33839..4012a4a7 100644 --- a/src/main/java/com/kamco/cd/kamcoback/scheduler/service/MapSheetInferenceJobService.java +++ b/src/main/java/com/kamco/cd/kamcoback/scheduler/service/MapSheetInferenceJobService.java @@ -37,6 +37,8 @@ import org.springframework.stereotype.Service; public class MapSheetInferenceJobService { private final InferenceResultCoreService inferenceResultCoreService; + private final ShpPipelineService shpPipelineService; + private final ExternalHttpClient externalHttpClient; private final ObjectMapper objectMapper; private final ExternalJarRunner externalJarRunner; @@ -240,15 +242,8 @@ public class MapSheetInferenceJobService { String mapIds = sb.toString(); String batchId = sheet.getM1BatchId() + "," + sheet.getM2BatchId() + "," + sheet.getM3BatchId(); - // uid 기준 도엽별 shp, geojson 파일 생성 - externalJarRunner.run(jarPath, batchId, inferenceId, mapIds); - - // uid 기준 merge shp, geojson 파일 생성 - externalJarRunner.run(jarPath, batchId, inferenceId, ""); - - // uid 기준 도엽별 shp 파일 geoserver 등록 - String register = datasetDir + "/" + inferenceId + "/" + "merge" + "/" + inferenceId + ".shp"; - externalJarRunner.run(jarPath, register, inferenceId); + // shp 파일 비동기 생성 + shpPipelineService.runPipeline(jarPath, datasetDir, batchId, inferenceId, mapIds); } /** diff --git a/src/main/java/com/kamco/cd/kamcoback/scheduler/service/ShpPipelineService.java b/src/main/java/com/kamco/cd/kamcoback/scheduler/service/ShpPipelineService.java new file mode 100644 index 00000000..b212a230 --- /dev/null +++ b/src/main/java/com/kamco/cd/kamcoback/scheduler/service/ShpPipelineService.java @@ -0,0 +1,48 @@ +package com.kamco.cd.kamcoback.scheduler.service; + +import com.kamco.cd.kamcoback.common.service.ExternalJarRunner; +import com.kamco.cd.kamcoback.scheduler.config.ShpKeyLock; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Service; + +@Log4j2 +@Service +@RequiredArgsConstructor +public class ShpPipelineService { + + private final ExternalJarRunner externalJarRunner; + private final ShpKeyLock shpKeyLock; + + @Async("shpExecutor") + public void runPipeline( + String jarPath, String datasetDir, String batchId, String inferenceId, String mapIds) { + + // inferenceId 기준 동시 실행 제한 + if (!shpKeyLock.tryLock(inferenceId)) { + log.warn("SHP pipeline already running. inferenceId={}", inferenceId); + return; + } + + try { + // 1 uid 기준 도엽별 shp, geojson 파일 생성 + externalJarRunner.run(jarPath, batchId, inferenceId, mapIds); + + // 2 uid 기준 merge shp, geojson 파일 생성 + externalJarRunner.run(jarPath, batchId, inferenceId, ""); + + // 3 uid 기준 shp 파일 geoserver 등록 + String register = datasetDir + "/" + inferenceId + "/merge/" + inferenceId + ".shp"; + externalJarRunner.run(jarPath, register, inferenceId); + + log.info("SHP pipeline finished. inferenceId={}", inferenceId); + + } catch (Exception e) { + log.error("SHP pipeline failed. inferenceId={}", inferenceId, e); + // TODO 필요하면 실패 상태 업데이트 로직 추가 + } finally { + shpKeyLock.unlock(inferenceId); + } + } +}