From 7ca37bf1e47556f41e6c709bba283938d7af2f17 Mon Sep 17 00:00:00 2001 From: teddy Date: Fri, 27 Feb 2026 22:51:27 +0900 Subject: [PATCH 1/9] =?UTF-8?q?=ED=95=98=EB=93=9C=EB=A7=81=ED=81=AC=20?= =?UTF-8?q?=EB=A1=9C=EA=B7=B8=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cd/training/train/TrainApiController.java | 2 +- .../train/service/DataSetCountersService.java | 62 +++++++++++++++++++ .../train/service/TmpDatasetService.java | 5 +- 3 files changed, 66 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/kamco/cd/training/train/TrainApiController.java b/src/main/java/com/kamco/cd/training/train/TrainApiController.java index 941278b..1c5f9d4 100644 --- a/src/main/java/com/kamco/cd/training/train/TrainApiController.java +++ b/src/main/java/com/kamco/cd/training/train/TrainApiController.java @@ -185,7 +185,7 @@ public class TrainApiController { }) @PostMapping("/create-tmp/{uuid}") public ApiResponseDto createTmpFile( - @Parameter(description = "uuid", example = "80a0e544-36ed-4999-b705-97427f23337d") + @Parameter(description = "model uuid", example = "80a0e544-36ed-4999-b705-97427f23337d") @PathVariable UUID uuid) { diff --git a/src/main/java/com/kamco/cd/training/train/service/DataSetCountersService.java b/src/main/java/com/kamco/cd/training/train/service/DataSetCountersService.java index 82eeaaa..20f5766 100644 --- a/src/main/java/com/kamco/cd/training/train/service/DataSetCountersService.java +++ b/src/main/java/com/kamco/cd/training/train/service/DataSetCountersService.java @@ -6,13 +6,17 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +/** 학습실행 파일 하드링크 */ @Service @Log4j2 @RequiredArgsConstructor @@ -42,6 +46,10 @@ public class DataSetCountersService { // tmp Path tmpPath = Path.of(trainBaseDir, "tmp", basic.getRequestPath()); + + // 차이나는거 + diffMergedRequestsVsTmp(uids, tmpPath); + DatasetCounters counters2 = countTmpAfterBuild(tmpPath); allLogs .append(counters2.prints(basic.getRequestPath(), "TMP")) @@ -163,4 +171,58 @@ public class DataSetCountersService { test + test2); } } + + private Set listTifRelative(Path root) throws IOException { + if (!Files.isDirectory(root)) return Set.of(); + + try (var stream = Files.walk(root)) { + return stream + .filter(Files::isRegularFile) + .filter(p -> p.getFileName().toString().toLowerCase().endsWith(".tif")) + .map(p -> root.relativize(p).toString().replace("\\", "/")) + .collect(Collectors.toSet()); + } + } + + private Set listTifFileNameOnly(Path root) throws IOException { + if (!Files.isDirectory(root)) return Set.of(); + + try (var stream = Files.walk(root)) { + return stream + .filter(Files::isRegularFile) + .filter(p -> p.getFileName().toString().toLowerCase().endsWith(".tif")) + .map(p -> p.getFileName().toString()) // 파일명만 + .collect(Collectors.toSet()); + } + } + + public void diffMergedRequestsVsTmp(List uids, Path tmpRoot) throws IOException { + + // 1) 요청 uids 전체를 합친 tif "파일명" 집합 + Set reqAll = new HashSet<>(); + for (String uid : uids) { + Path reqRoot = Path.of(requestDir, uid); + + // ★합본 tmp는 보통 폴더 구조가 바뀌므로 "상대경로" 비교보다 파일명 비교가 먼저 유용합니다. + reqAll.addAll(listTifFileNameOnly(reqRoot)); + } + + // 2) tmp tif 파일명 집합 + Set tmpAll = listTifFileNameOnly(tmpRoot); + + Set missing = new HashSet<>(reqAll); + missing.removeAll(tmpAll); + + Set extra = new HashSet<>(tmpAll); + extra.removeAll(reqAll); + + log.info("==== MERGED DIFF (filename-based) ===="); + log.info("request(all uids) tif = {}", reqAll.size()); + log.info("tmp tif = {}", tmpAll.size()); + log.info("missing = {}", missing.size()); + log.info("extra = {}", extra.size()); + + missing.stream().sorted().limit(50).forEach(f -> log.warn("[MISSING] {}", f)); + extra.stream().sorted().limit(50).forEach(f -> log.warn("[EXTRA] {}", f)); + } } diff --git a/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java b/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java index cecceb1..183b87d 100644 --- a/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java +++ b/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java @@ -88,6 +88,7 @@ public class TmpDatasetService { // 충돌 시 덮어쓰기 if (Files.exists(dst)) { + log.warn("COLLISION overwrite: dst={} src={}", dst, src); Files.delete(dst); } @@ -171,8 +172,8 @@ public class TmpDatasetService { scannedDirs++; log.info("SCAN dir={}", srcDir); - try (DirectoryStream stream = Files.newDirectoryStream(srcDir)) { - for (Path f : stream) { + try (var stream = Files.walk(srcDir)) { + for (Path f : stream.filter(Files::isRegularFile).toList()) { if (!Files.isRegularFile(f)) { log.debug("skip non-regular file: {}", f); continue; From aa3af4e9d0516a40306afa8f6c319a1b227915a6 Mon Sep 17 00:00:00 2001 From: teddy Date: Fri, 27 Feb 2026 23:12:00 +0900 Subject: [PATCH 2/9] =?UTF-8?q?=ED=95=98=EB=93=9C=EB=A7=81=ED=81=AC=20?= =?UTF-8?q?=EB=A1=9C=EA=B7=B8=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kamco/cd/training/train/service/TmpDatasetService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java b/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java index 183b87d..dbbbd45 100644 --- a/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java +++ b/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java @@ -181,10 +181,10 @@ public class TmpDatasetService { regularFiles++; - String dstName = id + "__" + f.getFileName(); + String dstName = f.getFileName().toString(); Path dst = tmp.resolve(type).resolve(part).resolve(dstName); - // dst가 남아있으면 삭제(심볼릭링크든 파일이든) + // dst가 남아있으면 삭제(심볼릭링크든 파일이든) 하고 다시만듬 if (Files.exists(dst) || Files.isSymbolicLink(dst)) { Files.delete(dst); log.debug("deleted existing: {}", dst); From 12f6bb715413705c53ca9a867091313b8d6aac86 Mon Sep 17 00:00:00 2001 From: teddy Date: Fri, 27 Feb 2026 23:31:04 +0900 Subject: [PATCH 3/9] =?UTF-8?q?=ED=95=98=EB=93=9C=EB=A7=81=ED=81=AC=20?= =?UTF-8?q?=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../train/service/TmpDatasetService.java | 242 +++++------------- .../train/service/TrainJobService.java | 6 +- 2 files changed, 68 insertions(+), 180 deletions(-) diff --git a/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java b/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java index dbbbd45..0b7547b 100644 --- a/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java +++ b/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java @@ -21,186 +21,68 @@ public class TmpDatasetService { private String trainBaseDir; /** - * train, val, test 폴더별로 link - * - * @param uid 임시폴더 uuid - * @param type train, val, test - * @param links tif pull path - * @throws IOException - */ - public void buildTmpDatasetHardlink(String uid, String type, List links) - throws IOException { - - if (links == null || links.isEmpty()) { - throw new IOException("links is empty"); - } - - Path tmp = Path.of(trainBaseDir, "tmp", uid); - - long hardlinksMade = 0; - - for (ModelTrainLinkDto dto : links) { - - if (type == null) { - log.warn("SKIP - trainType null: {}", dto); - continue; - } - - // type별 디렉토리 생성 - Files.createDirectories(tmp.resolve(type).resolve("input1")); - Files.createDirectories(tmp.resolve(type).resolve("input2")); - Files.createDirectories(tmp.resolve(type).resolve("label")); - Files.createDirectories(tmp.resolve(type).resolve("label-json")); - - // comparePath → input1 - hardlinksMade += link(tmp, type, "input1", dto.getComparePath()); - - // targetPath → input2 - hardlinksMade += link(tmp, type, "input2", dto.getTargetPath()); - - // labelPath → label - hardlinksMade += link(tmp, type, "label", dto.getLabelPath()); - - // geoJsonPath -> label-json - hardlinksMade += link(tmp, type, "label-json", dto.getGeoJsonPath()); - } - - if (hardlinksMade == 0) { - throw new IOException("No hardlinks created."); - } - - log.info("tmp dataset created: {}, hardlinksMade={}", tmp, hardlinksMade); - } - - private long link(Path tmp, String type, String part, String fullPath) throws IOException { - - if (fullPath == null || fullPath.isBlank()) return 0; - - Path src = Path.of(fullPath); - - if (!Files.isRegularFile(src)) { - log.warn("SKIP (not file): {}", src); - return 0; - } - - String fileName = src.getFileName().toString(); - Path dst = tmp.resolve(type).resolve(part).resolve(fileName); - - // 충돌 시 덮어쓰기 - if (Files.exists(dst)) { - log.warn("COLLISION overwrite: dst={} src={}", dst, src); - Files.delete(dst); - } - - Files.createLink(dst, src); - - return 1; - } - - private String safe(String s) { - return (s == null || s.isBlank()) ? null : s.trim(); - } - - /** - * request 전체 폴더 link + * 다른 데이터셋 파일과 이름이 겹치면 그 파일은 skip함 * * @param uid - * @param datasetUids + * @param type + * @param links * @return * @throws IOException */ - public String buildTmpDatasetSymlink(String uid, List datasetUids) throws IOException { + public String buildTmpDatasetSymlink(String uid, String type, List links) + throws IOException { - log.info("========== buildTmpDatasetHardlink START =========="); - log.info("uid={}", uid); - log.info("datasetUids={}", datasetUids); - log.info("requestDir(raw)={}", requestDir); + if (uid == null || uid.isBlank()) throw new IOException("uid is empty"); + if (type == null || type.isBlank()) throw new IOException("type is empty"); + if (links == null || links.isEmpty()) throw new IOException("links is empty"); + + log.info("========== buildTmpDatasetHardlink MERGE START =========="); + log.info("uid={}, type={}, links.size={}", uid, type, links.size()); Path BASE = toPath(requestDir); Path tmp = Path.of(trainBaseDir, "tmp", uid); - log.info("BASE={}", BASE); - log.info("BASE exists? {}", Files.isDirectory(BASE)); - log.info("tmp={}", tmp); + long hardlinksMade = 0; + long skippedCollision = 0; + long noDir = 0; - long noDir = 0, scannedDirs = 0, regularFiles = 0, hardlinksMade = 0; + // tmp// 준비 + for (String part : List.of("input1", "input2", "label", "label-json")) { + Files.createDirectories(tmp.resolve(type).resolve(part)); + } + + for (ModelTrainLinkDto dto : links) { + String datasetUid = safe(dto.getDatasetUid()); + if (datasetUid == null) { + log.warn("SKIP dto (datasetUid null): {}", dto); + continue; + } + + Path srcRoot = BASE.resolve(datasetUid); - // tmp 디렉토리 준비 - for (String type : List.of("train", "val", "test")) { for (String part : List.of("input1", "input2", "label", "label-json")) { - Path dir = tmp.resolve(type).resolve(part); - Files.createDirectories(dir); - log.info("createDirectories: {}", dir); - } - } - // 하드링크는 "같은 파일시스템"에서만 가능하므로 BASE/tmp가 같은 FS인지 미리 확인(권장) - try { - var baseStore = Files.getFileStore(BASE); - var tmpStore = Files.getFileStore(tmp.getParent()); // BASE/tmp - if (!baseStore.name().equals(tmpStore.name()) || !baseStore.type().equals(tmpStore.type())) { - throw new IOException( - "Hardlink requires same filesystem. baseStore=" - + baseStore.name() - + "(" - + baseStore.type() - + "), tmpStore=" - + tmpStore.name() - + "(" - + tmpStore.type() - + ")"); - } - } catch (Exception e) { - // FileStore 비교가 환경마다 애매할 수 있어서, 여기서는 경고만 주고 실제 createLink에서 최종 판단하게 둘 수도 있음. - log.warn("FileStore check skipped/failed (will rely on createLink): {}", e.toString()); - } + Path srcDir = srcRoot.resolve(type).resolve(part); + if (!Files.isDirectory(srcDir)) { + noDir++; + continue; + } - for (String id : datasetUids) { - Path srcRoot = BASE.resolve(id); - log.info("---- dataset id={} srcRoot={} exists? {}", id, srcRoot, Files.isDirectory(srcRoot)); + // ✅ 하위폴더까지 전부 + try (var walk = Files.walk(srcDir)) { + for (Path f : walk.filter(Files::isRegularFile).toList()) { - for (String type : List.of("train", "val", "test")) { - for (String part : List.of("input1", "input2", "label", "label-json")) { + String fileName = f.getFileName().toString(); + Path dst = tmp.resolve(type).resolve(part).resolve(fileName); - Path srcDir = srcRoot.resolve(type).resolve(part); - if (!Files.isDirectory(srcDir)) { - log.warn("SKIP (not directory): {}", srcDir); - noDir++; - continue; - } - - scannedDirs++; - log.info("SCAN dir={}", srcDir); - - try (var stream = Files.walk(srcDir)) { - for (Path f : stream.filter(Files::isRegularFile).toList()) { - if (!Files.isRegularFile(f)) { - log.debug("skip non-regular file: {}", f); - continue; - } - - regularFiles++; - - String dstName = f.getFileName().toString(); - Path dst = tmp.resolve(type).resolve(part).resolve(dstName); - - // dst가 남아있으면 삭제(심볼릭링크든 파일이든) 하고 다시만듬 - if (Files.exists(dst) || Files.isSymbolicLink(dst)) { - Files.delete(dst); - log.debug("deleted existing: {}", dst); - } - - try { - // 하드링크 생성 (dst가 새 파일로 생기지만 inode는 f와 동일) - Files.createLink(dst, f); - hardlinksMade++; - log.debug("created hardlink: {} => {}", dst, f); - } catch (IOException e) { - // 여기서 바로 실패시키면 “tmp는 만들었는데 내용은 0개” 같은 상태를 방지할 수 있음 - log.error("FAILED create hardlink: {} => {}", dst, f, e); - throw e; - } + // ✅ 이름 유지 + 충돌은 skip + if (Files.exists(dst)) { + skippedCollision++; + continue; } + + Files.createLink(dst, f); + hardlinksMade++; } } } @@ -208,29 +90,35 @@ public class TmpDatasetService { if (hardlinksMade == 0) { throw new IOException( - "No hardlinks created. regularFiles=" - + regularFiles - + ", scannedDirs=" - + scannedDirs - + ", noDir=" - + noDir); + "No hardlinks created. noDir=" + noDir + ", skippedCollision=" + skippedCollision); } - log.info("tmp dataset created: {}", tmp); log.info( - "summary: scannedDirs={}, noDir={}, regularFiles={}, hardlinksMade={}", - scannedDirs, - noDir, - regularFiles, - hardlinksMade); + "tmp dataset merged: {} (type={}), hardlinksMade={}, skippedCollision={}, noDir={}", + tmp, + type, + hardlinksMade, + skippedCollision, + noDir); return uid; } private static Path toPath(String p) { - if (p.startsWith("~/")) { - return Paths.get(System.getProperty("user.home")).resolve(p.substring(2)).normalize(); + if (p == null || p.isBlank()) { + throw new IllegalArgumentException("path is null or blank"); } - return Paths.get(p).toAbsolutePath().normalize(); + String trimmed = p.trim(); + if (trimmed.startsWith("~/")) { + return Paths.get(System.getProperty("user.home")) + .resolve(trimmed.substring(2)) + .toAbsolutePath() + .normalize(); + } + return Paths.get(trimmed).toAbsolutePath().normalize(); + } + + private static String safe(String s) { + return (s == null || s.isBlank()) ? null : s.trim(); } } diff --git a/src/main/java/com/kamco/cd/training/train/service/TrainJobService.java b/src/main/java/com/kamco/cd/training/train/service/TrainJobService.java index b5dd84e..c519fd2 100644 --- a/src/main/java/com/kamco/cd/training/train/service/TrainJobService.java +++ b/src/main/java/com/kamco/cd/training/train/service/TrainJobService.java @@ -274,11 +274,11 @@ public class TrainJobService { List testList = modelTrainMngCoreService.findDatasetTestPath(modelId); // train 데이터셋 심볼링크 생성 - tmpDatasetService.buildTmpDatasetHardlink(raw, "train", trainList); + tmpDatasetService.buildTmpDatasetSymlink(raw, "train", trainList); // val 데이터셋 심볼링크 생성 - tmpDatasetService.buildTmpDatasetHardlink(raw, "val", valList); + tmpDatasetService.buildTmpDatasetSymlink(raw, "val", valList); // test 데이터셋 심볼링크 생성 - tmpDatasetService.buildTmpDatasetHardlink(raw, "test", testList); + tmpDatasetService.buildTmpDatasetSymlink(raw, "test", testList); ModelTrainMngDto.UpdateReq updateReq = new ModelTrainMngDto.UpdateReq(); updateReq.setRequestPath(raw); From 9dfa54fbf9d3cfed844ed0fd7ab2a67fb0c5a177 Mon Sep 17 00:00:00 2001 From: teddy Date: Sat, 28 Feb 2026 01:01:38 +0900 Subject: [PATCH 4/9] =?UTF-8?q?=EB=A6=AC=EC=BB=A4=EB=B2=84=EB=A6=AC=20?= =?UTF-8?q?=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/ModelTrainJobCoreService.java | 36 +- .../train/ModelTrainJobRepositoryCustom.java | 3 +- .../train/ModelTrainJobRepositoryImpl.java | 25 +- .../service/JobRecoveryOnStartupService.java | 435 ++++++++++++++++-- .../train/service/TmpDatasetService.java | 6 +- .../train/service/TrainJobService.java | 3 +- 6 files changed, 457 insertions(+), 51 deletions(-) diff --git a/src/main/java/com/kamco/cd/training/postgres/core/ModelTrainJobCoreService.java b/src/main/java/com/kamco/cd/training/postgres/core/ModelTrainJobCoreService.java index b8c396d..8cc1fe8 100644 --- a/src/main/java/com/kamco/cd/training/postgres/core/ModelTrainJobCoreService.java +++ b/src/main/java/com/kamco/cd/training/postgres/core/ModelTrainJobCoreService.java @@ -5,6 +5,8 @@ import com.kamco.cd.training.postgres.entity.ModelTrainJobEntity; import com.kamco.cd.training.postgres.repository.train.ModelTrainJobRepository; import com.kamco.cd.training.train.dto.ModelTrainJobDto; import java.time.ZonedDateTime; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -118,6 +120,28 @@ public class ModelTrainJobCoreService { log.info("[TRAIN JOB FAIL] jobId={}, modelId={}", jobId, errorMessage); } + /** + * 중단됨 처리 + * + * @param jobId + * @param exitCode + * @param errorMessage + */ + @Transactional + public void markPaused(Long jobId, Integer exitCode, String errorMessage) { + ModelTrainJobEntity job = + modelTrainJobRepository + .findById(jobId) + .orElseThrow(() -> new CustomApiException("NOT_FOUND_DATA", HttpStatus.NOT_FOUND)); + + job.setStatusCd("STOPPED"); + job.setExitCode(exitCode); + job.setErrorMessage(errorMessage); + job.setFinishedDttm(ZonedDateTime.now()); + + log.info("[TRAIN JOB FAIL] jobId={}, modelId={}", jobId, errorMessage); + } + /** 취소 처리 */ @Transactional public void markCanceled(Long jobId) { @@ -151,11 +175,13 @@ public class ModelTrainJobCoreService { * * @return */ - public ModelTrainJobDto findRunningJobs() { - ModelTrainJobEntity entity = modelTrainJobRepository.findRunningJobs().orElse(null); - if (entity == null) { - return null; + public List findRunningJobs() { + List entity = modelTrainJobRepository.findRunningJobs(); + + if (entity == null || entity.isEmpty()) { + return Collections.emptyList(); } - return entity.toDto(); + + return entity.stream().map(ModelTrainJobEntity::toDto).toList(); } } diff --git a/src/main/java/com/kamco/cd/training/postgres/repository/train/ModelTrainJobRepositoryCustom.java b/src/main/java/com/kamco/cd/training/postgres/repository/train/ModelTrainJobRepositoryCustom.java index 61724a9..e81fb80 100644 --- a/src/main/java/com/kamco/cd/training/postgres/repository/train/ModelTrainJobRepositoryCustom.java +++ b/src/main/java/com/kamco/cd/training/postgres/repository/train/ModelTrainJobRepositoryCustom.java @@ -1,6 +1,7 @@ package com.kamco.cd.training.postgres.repository.train; import com.kamco.cd.training.postgres.entity.ModelTrainJobEntity; +import java.util.List; import java.util.Optional; public interface ModelTrainJobRepositoryCustom { @@ -12,5 +13,5 @@ public interface ModelTrainJobRepositoryCustom { void insertModelTestTrainingRun(Long modelId, Long jobId, int epoch); - Optional findRunningJobs(); + List findRunningJobs(); } diff --git a/src/main/java/com/kamco/cd/training/postgres/repository/train/ModelTrainJobRepositoryImpl.java b/src/main/java/com/kamco/cd/training/postgres/repository/train/ModelTrainJobRepositoryImpl.java index 9a296ab..698e302 100644 --- a/src/main/java/com/kamco/cd/training/postgres/repository/train/ModelTrainJobRepositoryImpl.java +++ b/src/main/java/com/kamco/cd/training/postgres/repository/train/ModelTrainJobRepositoryImpl.java @@ -9,6 +9,7 @@ import com.kamco.cd.training.postgres.entity.ModelTrainJobEntity; import com.kamco.cd.training.postgres.entity.QModelTrainJobEntity; import com.querydsl.jpa.impl.JPAQueryFactory; import jakarta.persistence.EntityManager; +import java.util.List; import java.util.Optional; import org.springframework.stereotype.Repository; @@ -83,18 +84,16 @@ public class ModelTrainJobRepositoryImpl implements ModelTrainJobRepositoryCusto } @Override - public Optional findRunningJobs() { - return Optional.ofNullable( - queryFactory - .select(modelTrainJobEntity) - .from(modelTrainJobEntity) - .where( - modelTrainJobEntity - .statusCd - .eq(JobStatusType.RUNNING.getId()) - .and(modelTrainJobEntity.jobType.eq(JobType.TRAIN.getId()))) - .orderBy(modelTrainJobEntity.id.desc()) - .limit(1) - .fetchOne()); + public List findRunningJobs() { + return queryFactory + .select(modelTrainJobEntity) + .from(modelTrainJobEntity) + .where( + modelTrainJobEntity + .statusCd + .eq(JobStatusType.RUNNING.getId()) + .and(modelTrainJobEntity.jobType.eq(JobType.TRAIN.getId()))) + .orderBy(modelTrainJobEntity.id.desc()) + .fetch(); } } diff --git a/src/main/java/com/kamco/cd/training/train/service/JobRecoveryOnStartupService.java b/src/main/java/com/kamco/cd/training/train/service/JobRecoveryOnStartupService.java index 9ea39ca..526f046 100644 --- a/src/main/java/com/kamco/cd/training/train/service/JobRecoveryOnStartupService.java +++ b/src/main/java/com/kamco/cd/training/train/service/JobRecoveryOnStartupService.java @@ -1,74 +1,451 @@ package com.kamco.cd.training.train.service; +import com.kamco.cd.training.model.dto.ModelTrainMngDto; import com.kamco.cd.training.postgres.core.ModelTrainJobCoreService; +import com.kamco.cd.training.postgres.core.ModelTrainMngCoreService; import com.kamco.cd.training.train.dto.ModelTrainJobDto; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; +import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.annotation.Profile; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; -/** 실행중 학습이 있을때 처리 */ +/** + * 서버 재기동 시 "RUNNING 상태로 남아있는 학습 Job"을 복구(정리)하기 위한 서비스. + * + *

상황 예시: - 서버가 강제 재기동/장애로 내려감 - DB 상에서는 job_state가 RUNNING(진행중)으로 남아있음 - 실제 docker 컨테이너는: 1) 아직 + * 살아있거나(running=true) 2) 종료되었거나(exited) 3) --rm 옵션으로 인해 컨테이너가 이미 삭제되어 존재하지 않을 수 있음 + * + *

이 클래스는 ApplicationReadyEvent(스프링 부팅 완료) 시점에 실행되어, DB의 RUNNING 잡들을 조회한 뒤 컨테이너 상태를 점검하고, + * SUCCESS/FAILED 처리를 수행합니다. + */ +@Profile("!local") @Component @RequiredArgsConstructor @Log4j2 -@Transactional(readOnly = true) public class JobRecoveryOnStartupService { + private final ModelTrainJobCoreService modelTrainJobCoreService; + private final ModelTrainMngCoreService modelTrainMngCoreService; + /** + * Docker 컨테이너가 쓰는 response(산출물) 디렉토리의 "호스트 측" 베이스 경로. 예) /data/train/response + * + *

컨테이너가 --rm 으로 삭제된 경우에도 이 경로에 val.csv / *.pth 등이 남아있으면 정상 종료 여부를 "파일 기반"으로 판정합니다. + */ + @Value("${train.docker.responseDir}") + private String responseDir; + + /** + * 스프링 부팅 완료 시점(빈 생성/초기화 모두 끝난 뒤)에 복구 로직 실행. + * + *

@Transactional: - recover() 메서드 전체가 하나의 트랜잭션으로 감싸집니다. - Job 하나씩 처리하다가 예외가 발생하면 전체 롤백이 될 수 + * 있으므로 "잡 단위로 확실히 커밋"이 필요하면 (권장) 잡 단위로 분리 트랜잭션(REQUIRES_NEW) 고려하세요. + */ @EventListener(ApplicationReadyEvent.class) + @Transactional public void recover() { - // RUNNING 중인 학습이 있는지 조회 - ModelTrainJobDto runningJobs = modelTrainJobCoreService.findRunningJobs(); - if (runningJobs == null) { + // 1) DB에서 "RUNNING(진행중) 상태"로 남아있는 job 목록을 조회 + List runningJobs = modelTrainJobCoreService.findRunningJobs(); + + // 실행중 job이 없으면 할 일 없음 + if (runningJobs == null || runningJobs.isEmpty()) { return; } - String containerName = runningJobs.getContainerName(); + // 2) 각 job에 대해 docker 컨테이너 상태를 확인하고, 상태에 따라 조치 + for (ModelTrainJobDto job : runningJobs) { + String containerName = job.getContainerName(); - try { - boolean containerAlive = isContainerRunning(containerName); + try { + // 2-1) docker inspect로 컨테이너 상태 조회 + DockerInspectState state = inspectContainer(containerName); - if (containerAlive) { - // 컨테이너 살아있으면 → RUNNING 유지 - log.info("[RECOVERY] container still running: {}", containerName); + // 3) 컨테이너가 "없음" + // - docker run --rm 로 실행한 컨테이너는 정상 종료 시 바로 삭제될 수 있음 + // - 즉 "컨테이너 없음"이 무조건 실패는 아님 + if (!state.exists()) { + log.warn( + "[RECOVERY] container missing. try file-based reconcile. container={}", + containerName); - } else { - // 컨테이너 죽었으면 → FAILED 처리 - log.info("[RECOVERY] container not found. mark FAILED: {}", containerName); + // 3-1) 컨테이너가 없을 때는 산출물(responseDir)을 보고 완료 여부를 "추정" + OutputResult out = probeOutputs(job); + + // 3-2) 산출물이 충분하면 성공 처리 + if (out.completed()) { + log.info("[RECOVERY] outputs look completed. mark SUCCESS. jobId={}", job.getId()); + modelTrainJobCoreService.markSuccess(job.getId(), 0); + markStepSuccessByJobType(job); + + } else { + // 3-3) 산출물이 부족하면 실패 처리(운영 정책에 따라 "유예"도 가능) + log.warn( + "[RECOVERY] outputs incomplete. mark FAILED. jobId={} reason={}", + job.getId(), + out.reason()); + + modelTrainJobCoreService.markFailed( + job.getId(), -1, "SERVER_RESTART_CONTAINER_MISSING_OUTPUT_INCOMPLETE"); + + markStepErrorByJobType(job, out.reason()); + } + continue; + } + + // 4) 컨테이너는 존재하고, 아직 running=true + // - 서버만 재기동됐고 컨테이너는 그대로 살아있는 케이스 + // - 이 경우 DB를 건드리면 오히려 꼬일 수 있으니 RUNNING 유지 + if (state.running()) { + log.info("[RECOVERY] container still running. container={}", containerName); + try { + ProcessBuilder pb = new ProcessBuilder("docker", "stop", "-t", "20", containerName); + pb.redirectErrorStream(true); + + Process p = pb.start(); + + boolean finished = p.waitFor(30, TimeUnit.SECONDS); + if (!finished) { + p.destroyForcibly(); + throw new IOException("docker stop timeout"); + } + + int code = p.exitValue(); + if (code != 0) { + throw new IOException("docker stop failed. exit=" + code); + } + + log.info( + "[RECOVERY] container stopped (will be auto removed by --rm). container={}", + containerName); + + // 여기서 상태를 PAUSED로 바꿔도 되고 + modelTrainJobCoreService.markPaused(job.getId(), -1, "AUTO_STOP_FAILED_ON_RESTART"); + + } catch (Exception e) { + log.error("[RECOVERY] docker stop failed. container={}", containerName, e); + + modelTrainJobCoreService.markFailed(job.getId(), -1, "AUTO_STOP_FAILED_ON_RESTART"); + } + continue; + } + + // 5) 컨테이너는 존재하지만 running=false + // - exited / dead 등의 상태 + Integer exitCode = state.exitCode(); + String status = state.status(); + + // 5-1) exitCode=0이면 정상 종료로 간주 → SUCCESS 처리 + if (exitCode != null && exitCode == 0) { + log.info("[RECOVERY] container exited(0). mark SUCCESS. container={}", containerName); + modelTrainJobCoreService.markSuccess(job.getId(), 0); + markStepSuccessByJobType(job); + + } else { + // 5-2) exitCode != 0 이거나 null이면 실패로 간주 → FAILED 처리 + log.warn( + "[RECOVERY] container exited non-zero. mark FAILED. container={} status={} exitCode={}", + containerName, + status, + exitCode); + + modelTrainJobCoreService.markFailed( + job.getId(), exitCode, "SERVER_RESTART_CONTAINER_EXIT_NONZERO"); + + markStepErrorByJobType(job, "exit=" + exitCode + " status=" + status); + } + + } catch (Exception e) { + // 6) docker inspect 자체가 실패한 경우 + // - docker 데몬 문제/권한 문제/일시적 오류 가능 + // - 운영 정책에 따라 "바로 실패" 대신 "유예" 처리도 고려 가능 + log.error("[RECOVERY] container inspect failed. container={}", containerName, e); modelTrainJobCoreService.markFailed( - runningJobs.getId(), null, "SERVER_RESTART_CONTAINER_NOT_FOUND"); - } - } catch (IOException e) { - log.error("[RECOVERY] container check failed. mark FAILED: {}", containerName, e); + job.getId(), -1, "SERVER_RESTART_CONTAINER_INSPECT_ERROR"); - modelTrainJobCoreService.markFailed( - runningJobs.getId(), null, "SERVER_RESTART_CONTAINER_CHECK_ERROR"); + markStepErrorByJobType(job, "inspect-error"); + } } } /** - * docker 실행중인지 확인하기 + * jobType에 따라 학습 관리 테이블의 "성공 단계"를 업데이트. * - * @param containerName container name - * @return true, false - * @throws IOException + *

예: - jobType == "EVAL" → step2(평가 단계) 성공 - 그 외 → step1(학습 단계) 성공 */ - private boolean isContainerRunning(String containerName) throws IOException { + private void markStepSuccessByJobType(ModelTrainJobDto job) { + Map params = job.getParamsJson(); + boolean isEval = params != null && "EVAL".equals(String.valueOf(params.get("jobType"))); + if (isEval) { + modelTrainMngCoreService.markStep2Success(job.getModelId()); + } else { + modelTrainMngCoreService.markStep1Success(job.getModelId()); + } + } + + /** + * jobType에 따라 학습 관리 테이블의 "에러 단계"를 업데이트. + * + *

예: - jobType == "EVAL" → step2(평가 단계) 에러 - 그 외 → step1 혹은 전체 에러 + */ + private void markStepErrorByJobType(ModelTrainJobDto job, String msg) { + Map params = job.getParamsJson(); + boolean isEval = params != null && "EVAL".equals(String.valueOf(params.get("jobType"))); + if (isEval) { + modelTrainMngCoreService.markStep2Error(job.getModelId(), msg); + } else { + modelTrainMngCoreService.markError(job.getModelId(), msg); + } + } + + /** + * docker inspect를 사용해서 컨테이너 상태를 조회합니다. + * + *

사용하는 템플릿: {{.State.Status}} {{.State.Running}} {{.State.ExitCode}} + * + *

예상 출력 예: - "running true 0" - "exited false 0" - "exited false 137" + * + *

주의: - 컨테이너가 없거나 inspect 실패 시 exitCode != 0 또는 output이 비어서 missing() 반환 - 무한 대기 방지를 위해 5초 + * 타임아웃을 둠 + */ + private DockerInspectState inspectContainer(String containerName) + throws IOException, InterruptedException { ProcessBuilder pb = - new ProcessBuilder("docker", "inspect", "-f", "{{.State.Running}}", containerName); + new ProcessBuilder( + "docker", + "inspect", + "-f", + "{{.State.Status}} {{.State.Running}} {{.State.ExitCode}}", + containerName); + + // stderr를 stdout으로 합쳐서 한 스트림으로 읽기(에러 메시지도 함께 받음) + pb.redirectErrorStream(true); Process p = pb.start(); - BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream())); - String line = br.readLine(); - return "true".equals(line); + // inspect 출력은 1줄이면 충분하므로 readLine()만 수행 + String output; + try (BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()))) { + output = br.readLine(); + } + + // 무한대기 방지: 5초 내에 종료되지 않으면 강제 종료 + boolean finished = p.waitFor(5, TimeUnit.SECONDS); + if (!finished) { + p.destroyForcibly(); + throw new IOException("docker inspect timeout"); + } + + // docker inspect 자체의 프로세스 exit code + int code = p.exitValue(); + + // 실패(코드 !=0) 또는 출력이 없으면 "컨테이너 없음"으로 간주 + if (code != 0 || output == null || output.isBlank()) { + return DockerInspectState.missing(); + } + + // "status running exitCode" 형태로 split + String[] parts = output.trim().split("\\s+"); + + // status: running/exited/dead 등 + String status = parts.length > 0 ? parts[0] : "unknown"; + + // running: true/false + boolean running = parts.length > 1 && Boolean.parseBoolean(parts[1]); + + // exitCode: 정수 파싱(파싱 실패하면 null) + Integer exitCode = null; + if (parts.length > 2) { + try { + exitCode = Integer.parseInt(parts[2]); + } catch (Exception ignore) { + // ignore + } + } + + return new DockerInspectState(true, running, exitCode, status); + } + + /** + * docker inspect 결과를 담는 레코드. + * + *

exists: - true : docker inspect 성공 (컨테이너 존재) - false : 컨테이너 없음(또는 inspect 실패를 missing으로 간주) + */ + private record DockerInspectState( + boolean exists, boolean running, Integer exitCode, String status) { + static DockerInspectState missing() { + return new DockerInspectState(false, false, null, "missing"); + } + } + + // ============================================================================================ + // 컨테이너가 "없을 때" 파일 기반으로 완료/미완료를 판정하는 로직 + // ============================================================================================ + + /** + * 컨테이너가 없을 때(responseDir 산출물만 남아있는 상태) 완료 여부를 파일 기반으로 판정합니다. + * + *

판정 규칙(보수적으로 설계): 1) total_epoch가 paramsJson에 있어야 함 (없으면 완료 판단 불가) 2) val.csv 존재 + 헤더 제외 라인 수 + * >= total_epoch 이어야 함 3) *.pth 파일이 total_epoch 이상 존재하거나, best*.pth(또는 *best*.pth)가 존재해야 함 + * + *

왜 이렇게? - 어떤 학습은 epoch마다 pth를 남기고 - 어떤 학습은 best만 남기기도 해서 "pthCount >= total_epoch"만 쓰면 정상 종료를 + * 실패로 오판할 수 있음. + */ + private OutputResult probeOutputs(ModelTrainJobDto job) { + try { + Path outDir = resolveOutputDir(job); + if (outDir == null || !Files.isDirectory(outDir)) { + return new OutputResult(false, "output-dir-missing"); + } + + Integer totalEpoch = extractTotalEpoch(job).orElse(null); + if (totalEpoch == null || totalEpoch <= 0) { + return new OutputResult(false, "total-epoch-missing"); + } + + Path valCsv = outDir.resolve("val.csv"); + if (!Files.exists(valCsv)) { + return new OutputResult(false, "val.csv-missing"); + } + + long lines = countNonHeaderLines(valCsv); + + // “같아야 완료” 정책 + if (lines == totalEpoch) { + return new OutputResult(true, "ok"); + } + + return new OutputResult( + false, "val.csv-lines-mismatch lines=" + lines + " expected=" + totalEpoch); + + } catch (Exception e) { + log.error("[RECOVERY] probeOutputs error. jobId={}", job.getId(), e); + return new OutputResult(false, "probe-error"); + } + } + + /** + * responseDir 아래에서 job 산출물 디렉토리를 찾습니다. + * + *

가장 중요한 커스터마이징 포인트: - 실제 운영 환경에서 산출물이 어떤 경로 규칙으로 저장되는지에 따라 여기만 수정하면 됩니다. + * + *

현재 기본 탐색 순서: 1) {responseDir}/{jobId} 2) {responseDir}/{modelId} 3) + * {responseDir}/{containerName} 4) 마지막 fallback: responseDir 자체 + * + *

추천: - 여러분 규칙이 "{responseDir}/{modelId}/{jobId}" 같은 형태라면 base.resolve(modelId).resolve(jobId) + * 형태를 1순위로 두세요. + */ + private Path resolveOutputDir(ModelTrainJobDto job) { + ModelTrainMngDto.Basic model = modelTrainMngCoreService.findModelById(job.getModelId()); + + Path base = Paths.get(responseDir, model.getUuid().toString(), "metrics"); + + return Files.isDirectory(base) ? base : null; + } + + /** + * paramsJson에서 total_epoch 값을 추출합니다. + * + *

키 후보: - "total_epoch" (snake_case) - "totalEpoch" (camelCase) + * + *

예: paramsJson = {"jobType":"TRAIN","total_epoch":50,...} + */ + private Optional extractTotalEpoch(ModelTrainJobDto job) { + Map params = job.getParamsJson(); + if (params == null) return Optional.empty(); + + Object v = params.get("total_epoch"); + if (v == null) v = params.get("totalEpoch"); + if (v == null) return Optional.empty(); + + try { + return Optional.of(Integer.parseInt(String.valueOf(v))); + } catch (Exception ignore) { + return Optional.empty(); + } + } + + /** + * CSV 파일에서 "헤더(첫 줄)"를 제외한 라인 수를 계산합니다. + * + *

가정: - val.csv 첫 줄은 헤더 - 이후 라인들이 epoch별 기록(또는 유사한 누적 기록) + * + *

주의: - 파일 인코딩은 UTF-8로 가정 - 빈 줄은 제외 + */ + private long countNonHeaderLines(Path csv) throws IOException { + try (Stream lines = Files.lines(csv, StandardCharsets.UTF_8)) { + return lines.skip(1).filter(s -> s != null && !s.isBlank()).count(); + } + } + + /** + * 디렉토리에서 glob 패턴에 맞는 파일 수를 셉니다. + * + *

예: - "*.pth" - "best*.pth" + */ + private long countFilesByGlob(Path dir, String glob) throws IOException { + try (DirectoryStream ds = Files.newDirectoryStream(dir, glob)) { + long cnt = 0; + for (Path p : ds) { + if (Files.isRegularFile(p)) cnt++; + } + return cnt; + } + } + + /** 디렉토리에서 glob 패턴에 맞는 파일이 "하나라도" 존재하는지 체크합니다. */ + private boolean existsByGlob(Path dir, String glob) throws IOException { + try (DirectoryStream ds = Files.newDirectoryStream(dir, glob)) { + return ds.iterator().hasNext(); + } + } + + // ============================================================================================ + // probeOutputs() 결과 객체 + // ============================================================================================ + + /** + * 컨테이너가 없을 때(responseDir 기반) 완료 여부 판정 결과. + * + *

completed: - true : 산출물이 완료로 보임(성공 처리 가능) - false : 산출물이 부족/불명확(실패 또는 유예 판단) + * + *

reason: - 실패/미완료 사유(로그/DB 메시지로 남기기 용도) + */ + private static final class OutputResult { + + private final boolean completed; + private final String reason; + + private OutputResult(boolean completed, String reason) { + this.completed = completed; + this.reason = reason; + } + + boolean completed() { + return completed; + } + + String reason() { + return reason; + } } } diff --git a/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java b/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java index 0b7547b..27b59ae 100644 --- a/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java +++ b/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java @@ -20,6 +20,8 @@ public class TmpDatasetService { @Value("${train.docker.basePath}") private String trainBaseDir; + private final DataSetCountersService dataSetCountersService; + /** * 다른 데이터셋 파일과 이름이 겹치면 그 파일은 skip함 * @@ -68,14 +70,14 @@ public class TmpDatasetService { continue; } - // ✅ 하위폴더까지 전부 + // 하위폴더까지 전부 try (var walk = Files.walk(srcDir)) { for (Path f : walk.filter(Files::isRegularFile).toList()) { String fileName = f.getFileName().toString(); Path dst = tmp.resolve(type).resolve(part).resolve(fileName); - // ✅ 이름 유지 + 충돌은 skip + // 이름 유지 + 충돌은 skip if (Files.exists(dst)) { skippedCollision++; continue; diff --git a/src/main/java/com/kamco/cd/training/train/service/TrainJobService.java b/src/main/java/com/kamco/cd/training/train/service/TrainJobService.java index c519fd2..5193cda 100644 --- a/src/main/java/com/kamco/cd/training/train/service/TrainJobService.java +++ b/src/main/java/com/kamco/cd/training/train/service/TrainJobService.java @@ -279,7 +279,8 @@ public class TrainJobService { tmpDatasetService.buildTmpDatasetSymlink(raw, "val", valList); // test 데이터셋 심볼링크 생성 tmpDatasetService.buildTmpDatasetSymlink(raw, "test", testList); - + // 카운트 로그 + dataSetCounters.getCount(modelId); ModelTrainMngDto.UpdateReq updateReq = new ModelTrainMngDto.UpdateReq(); updateReq.setRequestPath(raw); From 365ad81cad91bf50d7b8ae28be2b485e69688af4 Mon Sep 17 00:00:00 2001 From: teddy Date: Sat, 28 Feb 2026 01:24:34 +0900 Subject: [PATCH 5/9] =?UTF-8?q?=EB=A6=AC=EC=BB=A4=EB=B2=84=EB=A6=AC=20?= =?UTF-8?q?=EC=82=AD=EC=A0=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cd/training/train/service/JobRecoveryOnStartupService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/kamco/cd/training/train/service/JobRecoveryOnStartupService.java b/src/main/java/com/kamco/cd/training/train/service/JobRecoveryOnStartupService.java index 526f046..88b9bc7 100644 --- a/src/main/java/com/kamco/cd/training/train/service/JobRecoveryOnStartupService.java +++ b/src/main/java/com/kamco/cd/training/train/service/JobRecoveryOnStartupService.java @@ -58,7 +58,7 @@ public class JobRecoveryOnStartupService { *

@Transactional: - recover() 메서드 전체가 하나의 트랜잭션으로 감싸집니다. - Job 하나씩 처리하다가 예외가 발생하면 전체 롤백이 될 수 * 있으므로 "잡 단위로 확실히 커밋"이 필요하면 (권장) 잡 단위로 분리 트랜잭션(REQUIRES_NEW) 고려하세요. */ - @EventListener(ApplicationReadyEvent.class) +// @EventListener(ApplicationReadyEvent.class) @Transactional public void recover() { From 69eaba1a839eed270564bcc639206581f1260bcf Mon Sep 17 00:00:00 2001 From: teddy Date: Tue, 3 Mar 2026 22:51:10 +0900 Subject: [PATCH 6/9] =?UTF-8?q?=ED=95=98=EB=93=9C=EB=A7=81=ED=81=AC=20?= =?UTF-8?q?=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../train/service/TmpDatasetService.java | 236 +++++++++++++----- .../train/service/TrainJobService.java | 11 +- 2 files changed, 179 insertions(+), 68 deletions(-) diff --git a/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java b/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java index 27b59ae..978fe2b 100644 --- a/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java +++ b/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java @@ -20,71 +20,187 @@ public class TmpDatasetService { @Value("${train.docker.basePath}") private String trainBaseDir; - private final DataSetCountersService dataSetCountersService; - /** - * 다른 데이터셋 파일과 이름이 겹치면 그 파일은 skip함 + * train, val, test 폴더별로 link * - * @param uid - * @param type - * @param links + * @param uid 임시폴더 uuid + * @param type train, val, test + * @param links tif pull path * @return * @throws IOException */ - public String buildTmpDatasetSymlink(String uid, String type, List links) + public void buildTmpDatasetHardlink(String uid, String type, List links) throws IOException { - if (uid == null || uid.isBlank()) throw new IOException("uid is empty"); - if (type == null || type.isBlank()) throw new IOException("type is empty"); - if (links == null || links.isEmpty()) throw new IOException("links is empty"); + if (links == null || links.isEmpty()) { + throw new IOException("links is empty"); + } - log.info("========== buildTmpDatasetHardlink MERGE START =========="); - log.info("uid={}, type={}, links.size={}", uid, type, links.size()); + Path tmp = Path.of(trainBaseDir, "tmp", uid); + + long hardlinksMade = 0; + + for (ModelTrainLinkDto dto : links) { + + if (type == null) { + log.warn("SKIP - trainType null: {}", dto); + continue; + } + + // type별 디렉토리 생성 + Files.createDirectories(tmp.resolve(type).resolve("input1")); + Files.createDirectories(tmp.resolve(type).resolve("input2")); + Files.createDirectories(tmp.resolve(type).resolve("label")); + Files.createDirectories(tmp.resolve(type).resolve("label-json")); + + // comparePath → input1 + hardlinksMade += link(tmp, type, "input1", dto.getComparePath()); + + // targetPath → input2 + hardlinksMade += link(tmp, type, "input2", dto.getTargetPath()); + + // labelPath → label + hardlinksMade += link(tmp, type, "label", dto.getLabelPath()); + + // geoJsonPath -> label-json + hardlinksMade += link(tmp, type, "label-json", dto.getGeoJsonPath()); + } + + if (hardlinksMade == 0) { + throw new IOException("No hardlinks created."); + } + + log.info("tmp dataset created: {}, hardlinksMade={}", tmp, hardlinksMade); + } + + private long link(Path tmp, String type, String part, String fullPath) throws IOException { + + if (fullPath == null || fullPath.isBlank()) return 0; + + Path src = Path.of(fullPath); + + if (!Files.isRegularFile(src)) { + log.warn("SKIP (not file): {}", src); + return 0; + } + + String fileName = src.getFileName().toString(); + Path dst = tmp.resolve(type).resolve(part).resolve(fileName); + + // 충돌 시 덮어쓰기 + if (Files.exists(dst)) { + Files.delete(dst); + } + + Files.createLink(dst, src); + + return 1; + } + + private String safe(String s) { + return (s == null || s.isBlank()) ? null : s.trim(); + } + + /** + * request 전체 폴더 link + * + * @param uid + * @param datasetUids + * @return + * @throws IOException + */ + public String buildTmpDatasetSymlink(String uid, List datasetUids) throws IOException { + + log.info("========== buildTmpDatasetHardlink START =========="); + log.info("uid={}", uid); + log.info("datasetUids={}", datasetUids); + log.info("requestDir(raw)={}", requestDir); Path BASE = toPath(requestDir); Path tmp = Path.of(trainBaseDir, "tmp", uid); - long hardlinksMade = 0; - long skippedCollision = 0; - long noDir = 0; + log.info("BASE={}", BASE); + log.info("BASE exists? {}", Files.isDirectory(BASE)); + log.info("tmp={}", tmp); - // tmp// 준비 - for (String part : List.of("input1", "input2", "label", "label-json")) { - Files.createDirectories(tmp.resolve(type).resolve(part)); + long noDir = 0, scannedDirs = 0, regularFiles = 0, hardlinksMade = 0; + + // tmp 디렉토리 준비 + for (String type : List.of("train", "val", "test")) { + for (String part : List.of("input1", "input2", "label", "label-json")) { + Path dir = tmp.resolve(type).resolve(part); + Files.createDirectories(dir); + log.info("createDirectories: {}", dir); + } } - for (ModelTrainLinkDto dto : links) { - String datasetUid = safe(dto.getDatasetUid()); - if (datasetUid == null) { - log.warn("SKIP dto (datasetUid null): {}", dto); - continue; + // 하드링크는 "같은 파일시스템"에서만 가능하므로 BASE/tmp가 같은 FS인지 미리 확인(권장) + try { + var baseStore = Files.getFileStore(BASE); + var tmpStore = Files.getFileStore(tmp.getParent()); // BASE/tmp + if (!baseStore.name().equals(tmpStore.name()) || !baseStore.type().equals(tmpStore.type())) { + throw new IOException( + "Hardlink requires same filesystem. baseStore=" + + baseStore.name() + + "(" + + baseStore.type() + + "), tmpStore=" + + tmpStore.name() + + "(" + + tmpStore.type() + + ")"); } + } catch (Exception e) { + // FileStore 비교가 환경마다 애매할 수 있어서, 여기서는 경고만 주고 실제 createLink에서 최종 판단하게 둘 수도 있음. + log.warn("FileStore check skipped/failed (will rely on createLink): {}", e.toString()); + } - Path srcRoot = BASE.resolve(datasetUid); + for (String id : datasetUids) { + Path srcRoot = BASE.resolve(id); + log.info("---- dataset id={} srcRoot={} exists? {}", id, srcRoot, Files.isDirectory(srcRoot)); - for (String part : List.of("input1", "input2", "label", "label-json")) { + for (String type : List.of("train", "val", "test")) { + for (String part : List.of("input1", "input2", "label", "label-json")) { - Path srcDir = srcRoot.resolve(type).resolve(part); - if (!Files.isDirectory(srcDir)) { - noDir++; - continue; - } + Path srcDir = srcRoot.resolve(type).resolve(part); + if (!Files.isDirectory(srcDir)) { + log.warn("SKIP (not directory): {}", srcDir); + noDir++; + continue; + } - // 하위폴더까지 전부 - try (var walk = Files.walk(srcDir)) { - for (Path f : walk.filter(Files::isRegularFile).toList()) { + scannedDirs++; + log.info("SCAN dir={}", srcDir); - String fileName = f.getFileName().toString(); - Path dst = tmp.resolve(type).resolve(part).resolve(fileName); + try (DirectoryStream stream = Files.newDirectoryStream(srcDir)) { + for (Path f : stream) { + if (!Files.isRegularFile(f)) { + log.debug("skip non-regular file: {}", f); + continue; + } - // 이름 유지 + 충돌은 skip - if (Files.exists(dst)) { - skippedCollision++; - continue; + regularFiles++; + + String dstName = id + "__" + f.getFileName(); + Path dst = tmp.resolve(type).resolve(part).resolve(dstName); + + // dst가 남아있으면 삭제(심볼릭링크든 파일이든) + if (Files.exists(dst) || Files.isSymbolicLink(dst)) { + Files.delete(dst); + log.debug("deleted existing: {}", dst); + } + + try { + // 하드링크 생성 (dst가 새 파일로 생기지만 inode는 f와 동일) + Files.createLink(dst, f); + hardlinksMade++; + log.debug("created hardlink: {} => {}", dst, f); + } catch (IOException e) { + // 여기서 바로 실패시키면 “tmp는 만들었는데 내용은 0개” 같은 상태를 방지할 수 있음 + log.error("FAILED create hardlink: {} => {}", dst, f, e); + throw e; + } } - - Files.createLink(dst, f); - hardlinksMade++; } } } @@ -92,35 +208,29 @@ public class TmpDatasetService { if (hardlinksMade == 0) { throw new IOException( - "No hardlinks created. noDir=" + noDir + ", skippedCollision=" + skippedCollision); + "No hardlinks created. regularFiles=" + + regularFiles + + ", scannedDirs=" + + scannedDirs + + ", noDir=" + + noDir); } + log.info("tmp dataset created: {}", tmp); log.info( - "tmp dataset merged: {} (type={}), hardlinksMade={}, skippedCollision={}, noDir={}", - tmp, - type, - hardlinksMade, - skippedCollision, - noDir); + "summary: scannedDirs={}, noDir={}, regularFiles={}, hardlinksMade={}", + scannedDirs, + noDir, + regularFiles, + hardlinksMade); return uid; } private static Path toPath(String p) { - if (p == null || p.isBlank()) { - throw new IllegalArgumentException("path is null or blank"); + if (p.startsWith("~/")) { + return Paths.get(System.getProperty("user.home")).resolve(p.substring(2)).normalize(); } - String trimmed = p.trim(); - if (trimmed.startsWith("~/")) { - return Paths.get(System.getProperty("user.home")) - .resolve(trimmed.substring(2)) - .toAbsolutePath() - .normalize(); - } - return Paths.get(trimmed).toAbsolutePath().normalize(); - } - - private static String safe(String s) { - return (s == null || s.isBlank()) ? null : s.trim(); + return Paths.get(p).toAbsolutePath().normalize(); } } diff --git a/src/main/java/com/kamco/cd/training/train/service/TrainJobService.java b/src/main/java/com/kamco/cd/training/train/service/TrainJobService.java index 5193cda..d2dd5a6 100644 --- a/src/main/java/com/kamco/cd/training/train/service/TrainJobService.java +++ b/src/main/java/com/kamco/cd/training/train/service/TrainJobService.java @@ -266,6 +266,8 @@ public class TrainJobService { List uids = modelTrainMngCoreService.findDatasetUid(datasetIds); try { + // 데이터셋 심볼링크 생성 + // String pathUid = tmpDatasetService.buildTmpDatasetSymlink(raw, uids); // train path List trainList = modelTrainMngCoreService.findDatasetTrainPath(modelId); // validation path @@ -274,13 +276,12 @@ public class TrainJobService { List testList = modelTrainMngCoreService.findDatasetTestPath(modelId); // train 데이터셋 심볼링크 생성 - tmpDatasetService.buildTmpDatasetSymlink(raw, "train", trainList); + tmpDatasetService.buildTmpDatasetHardlink(raw, "train", trainList); // val 데이터셋 심볼링크 생성 - tmpDatasetService.buildTmpDatasetSymlink(raw, "val", valList); + tmpDatasetService.buildTmpDatasetHardlink(raw, "val", valList); // test 데이터셋 심볼링크 생성 - tmpDatasetService.buildTmpDatasetSymlink(raw, "test", testList); - // 카운트 로그 - dataSetCounters.getCount(modelId); + tmpDatasetService.buildTmpDatasetHardlink(raw, "test", testList); + ModelTrainMngDto.UpdateReq updateReq = new ModelTrainMngDto.UpdateReq(); updateReq.setRequestPath(raw); From 335f0dbb9bc0313d8d14fddda6d8400cd8b49b4b Mon Sep 17 00:00:00 2001 From: teddy Date: Tue, 3 Mar 2026 23:01:22 +0900 Subject: [PATCH 7/9] =?UTF-8?q?spotless=20=EC=A0=81=EC=9A=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/kamco/cd/training/dataset/service/DatasetService.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/com/kamco/cd/training/dataset/service/DatasetService.java b/src/main/java/com/kamco/cd/training/dataset/service/DatasetService.java index cc0088e..ba54cb1 100644 --- a/src/main/java/com/kamco/cd/training/dataset/service/DatasetService.java +++ b/src/main/java/com/kamco/cd/training/dataset/service/DatasetService.java @@ -417,6 +417,7 @@ public class DatasetService { } private String escape(String path) { + // 쉘 커맨드에서 안전하게 사용할 수 있도록 문자열을 작은따옴표로 감싸면서, 내부의 작은따옴표를 이스케이프 처리 return "'" + path.replace("'", "'\"'\"'") + "'"; } From f3c822587fe90a461367d5ec7676956c96e881a8 Mon Sep 17 00:00:00 2001 From: teddy Date: Tue, 3 Mar 2026 23:02:53 +0900 Subject: [PATCH 8/9] =?UTF-8?q?spotless=20=EC=A0=81=EC=9A=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../training/train/service/JobRecoveryOnStartupService.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/java/com/kamco/cd/training/train/service/JobRecoveryOnStartupService.java b/src/main/java/com/kamco/cd/training/train/service/JobRecoveryOnStartupService.java index 88b9bc7..3e3439a 100644 --- a/src/main/java/com/kamco/cd/training/train/service/JobRecoveryOnStartupService.java +++ b/src/main/java/com/kamco/cd/training/train/service/JobRecoveryOnStartupService.java @@ -20,9 +20,7 @@ import java.util.stream.Stream; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Value; -import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.annotation.Profile; -import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -58,7 +56,7 @@ public class JobRecoveryOnStartupService { *

@Transactional: - recover() 메서드 전체가 하나의 트랜잭션으로 감싸집니다. - Job 하나씩 처리하다가 예외가 발생하면 전체 롤백이 될 수 * 있으므로 "잡 단위로 확실히 커밋"이 필요하면 (권장) 잡 단위로 분리 트랜잭션(REQUIRES_NEW) 고려하세요. */ -// @EventListener(ApplicationReadyEvent.class) + // @EventListener(ApplicationReadyEvent.class) @Transactional public void recover() { From e9f8bb37fab0655ce2a7e3788f51c75a5338f976 Mon Sep 17 00:00:00 2001 From: teddy Date: Tue, 3 Mar 2026 23:06:45 +0900 Subject: [PATCH 9/9] =?UTF-8?q?spotless=20=EC=A0=81=EC=9A=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cd/training/common/utils/FIleChecker.java | 87 ------------------- 1 file changed, 87 deletions(-) diff --git a/src/main/java/com/kamco/cd/training/common/utils/FIleChecker.java b/src/main/java/com/kamco/cd/training/common/utils/FIleChecker.java index 12364b8..a44e9c5 100644 --- a/src/main/java/com/kamco/cd/training/common/utils/FIleChecker.java +++ b/src/main/java/com/kamco/cd/training/common/utils/FIleChecker.java @@ -3,7 +3,6 @@ package com.kamco.cd.training.common.utils; import static java.lang.String.CASE_INSENSITIVE_ORDER; import com.jcraft.jsch.ChannelExec; -import com.jcraft.jsch.ChannelSftp; import com.jcraft.jsch.JSch; import com.jcraft.jsch.Session; import com.kamco.cd.training.common.exception.CustomApiException; @@ -787,92 +786,6 @@ public class FIleChecker { return destFile; } - public static void uploadTo86(Path localFile) { - - String host = "192.168.2.86"; - int port = 22; - String username = "kcomu"; - String password = "Kamco2025!"; - - String remoteDir = "/home/kcomu/data/request"; - - Session session = null; - ChannelSftp channel = null; - - try { - JSch jsch = new JSch(); - - session = jsch.getSession(username, host, port); - session.setPassword(password); - - Properties config = new Properties(); - config.put("StrictHostKeyChecking", "no"); - session.setConfig(config); - - session.connect(10_000); - - channel = (ChannelSftp) session.openChannel("sftp"); - channel.connect(10_000); - - // 목적지 디렉토리 이동 - channel.cd(remoteDir); - - // 업로드 - channel.put(localFile.toString(), localFile.getFileName().toString()); - - } catch (Exception e) { - throw new RuntimeException("SFTP upload failed", e); - } finally { - if (channel != null) channel.disconnect(); - if (session != null) session.disconnect(); - } - } - - public static void unzipOn86Server(String zipPath, String targetDir) { - - String host = "192.168.2.86"; - String user = "kcomu"; - String password = "Kamco2025!"; - - Session session = null; - ChannelExec channel = null; - - try { - JSch jsch = new JSch(); - - session = jsch.getSession(user, host, 22); - session.setPassword(password); - - Properties config = new Properties(); - config.put("StrictHostKeyChecking", "no"); - session.setConfig(config); - - session.connect(10_000); - - String command = "unzip -o " + zipPath + " -d " + targetDir; - - channel = (ChannelExec) session.openChannel("exec"); - channel.setCommand(command); - channel.setErrStream(System.err); - - InputStream in = channel.getInputStream(); - channel.connect(); - - // 출력 읽기(선택) - try (BufferedReader br = new BufferedReader(new InputStreamReader(in))) { - while (br.readLine() != null) { - // 필요하면 로그 - } - } - - } catch (Exception e) { - throw new RuntimeException(e); - } finally { - if (channel != null) channel.disconnect(); - if (session != null) session.disconnect(); - } - } - public static List execCommandAndReadLines(String command) { List result = new ArrayList<>();