diff --git a/src/main/java/com/kamco/cd/training/common/utils/FIleChecker.java b/src/main/java/com/kamco/cd/training/common/utils/FIleChecker.java index 6ba9a5b..7c99e88 100644 --- a/src/main/java/com/kamco/cd/training/common/utils/FIleChecker.java +++ b/src/main/java/com/kamco/cd/training/common/utils/FIleChecker.java @@ -3,7 +3,6 @@ package com.kamco.cd.training.common.utils; import static java.lang.String.CASE_INSENSITIVE_ORDER; import com.jcraft.jsch.ChannelExec; -import com.jcraft.jsch.ChannelSftp; import com.jcraft.jsch.JSch; import com.jcraft.jsch.Session; import com.kamco.cd.training.common.exception.CustomApiException; @@ -795,92 +794,6 @@ public class FIleChecker { return destFile; } - public static void uploadTo86(Path localFile) { - - String host = "192.168.2.86"; - int port = 22; - String username = "kcomu"; - String password = "Kamco2025!"; - - String remoteDir = "/home/kcomu/data/request"; - - Session session = null; - ChannelSftp channel = null; - - try { - JSch jsch = new JSch(); - - session = jsch.getSession(username, host, port); - session.setPassword(password); - - Properties config = new Properties(); - config.put("StrictHostKeyChecking", "no"); - session.setConfig(config); - - session.connect(10_000); - - channel = (ChannelSftp) session.openChannel("sftp"); - channel.connect(10_000); - - // 목적지 디렉토리 이동 - channel.cd(remoteDir); - - // 업로드 - channel.put(localFile.toString(), localFile.getFileName().toString()); - - } catch (Exception e) { - throw new RuntimeException("SFTP upload failed", e); - } finally { - if (channel != null) channel.disconnect(); - if (session != null) session.disconnect(); - } - } - - public static void unzipOn86Server(String zipPath, String targetDir) { - - String host = "192.168.2.86"; - String user = "kcomu"; - String password = "Kamco2025!"; - - Session session = null; - ChannelExec channel = null; - - try { - JSch jsch = new JSch(); - - session = jsch.getSession(user, host, 22); - session.setPassword(password); - - Properties config = new Properties(); - config.put("StrictHostKeyChecking", "no"); - session.setConfig(config); - - session.connect(10_000); - - String command = "unzip -o " + zipPath + " -d " + targetDir; - - channel = (ChannelExec) session.openChannel("exec"); - channel.setCommand(command); - channel.setErrStream(System.err); - - InputStream in = channel.getInputStream(); - channel.connect(); - - // 출력 읽기(선택) - try (BufferedReader br = new BufferedReader(new InputStreamReader(in))) { - while (br.readLine() != null) { - // 필요하면 로그 - } - } - - } catch (Exception e) { - throw new RuntimeException(e); - } finally { - if (channel != null) channel.disconnect(); - if (session != null) session.disconnect(); - } - } - public static List execCommandAndReadLines(String command) { List result = new ArrayList<>(); diff --git a/src/main/java/com/kamco/cd/training/dataset/service/DatasetService.java b/src/main/java/com/kamco/cd/training/dataset/service/DatasetService.java index cc0088e..ba54cb1 100644 --- a/src/main/java/com/kamco/cd/training/dataset/service/DatasetService.java +++ b/src/main/java/com/kamco/cd/training/dataset/service/DatasetService.java @@ -417,6 +417,7 @@ public class DatasetService { } private String escape(String path) { + // 쉘 커맨드에서 안전하게 사용할 수 있도록 문자열을 작은따옴표로 감싸면서, 내부의 작은따옴표를 이스케이프 처리 return "'" + path.replace("'", "'\"'\"'") + "'"; } diff --git a/src/main/java/com/kamco/cd/training/postgres/core/ModelTrainJobCoreService.java b/src/main/java/com/kamco/cd/training/postgres/core/ModelTrainJobCoreService.java index b8c396d..8cc1fe8 100644 --- a/src/main/java/com/kamco/cd/training/postgres/core/ModelTrainJobCoreService.java +++ b/src/main/java/com/kamco/cd/training/postgres/core/ModelTrainJobCoreService.java @@ -5,6 +5,8 @@ import com.kamco.cd.training.postgres.entity.ModelTrainJobEntity; import com.kamco.cd.training.postgres.repository.train.ModelTrainJobRepository; import com.kamco.cd.training.train.dto.ModelTrainJobDto; import java.time.ZonedDateTime; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -118,6 +120,28 @@ public class ModelTrainJobCoreService { log.info("[TRAIN JOB FAIL] jobId={}, modelId={}", jobId, errorMessage); } + /** + * 중단됨 처리 + * + * @param jobId + * @param exitCode + * @param errorMessage + */ + @Transactional + public void markPaused(Long jobId, Integer exitCode, String errorMessage) { + ModelTrainJobEntity job = + modelTrainJobRepository + .findById(jobId) + .orElseThrow(() -> new CustomApiException("NOT_FOUND_DATA", HttpStatus.NOT_FOUND)); + + job.setStatusCd("STOPPED"); + job.setExitCode(exitCode); + job.setErrorMessage(errorMessage); + job.setFinishedDttm(ZonedDateTime.now()); + + log.info("[TRAIN JOB FAIL] jobId={}, modelId={}", jobId, errorMessage); + } + /** 취소 처리 */ @Transactional public void markCanceled(Long jobId) { @@ -151,11 +175,13 @@ public class ModelTrainJobCoreService { * * @return */ - public ModelTrainJobDto findRunningJobs() { - ModelTrainJobEntity entity = modelTrainJobRepository.findRunningJobs().orElse(null); - if (entity == null) { - return null; + public List findRunningJobs() { + List entity = modelTrainJobRepository.findRunningJobs(); + + if (entity == null || entity.isEmpty()) { + return Collections.emptyList(); } - return entity.toDto(); + + return entity.stream().map(ModelTrainJobEntity::toDto).toList(); } } diff --git a/src/main/java/com/kamco/cd/training/postgres/repository/train/ModelTrainJobRepositoryCustom.java b/src/main/java/com/kamco/cd/training/postgres/repository/train/ModelTrainJobRepositoryCustom.java index 61724a9..e81fb80 100644 --- a/src/main/java/com/kamco/cd/training/postgres/repository/train/ModelTrainJobRepositoryCustom.java +++ b/src/main/java/com/kamco/cd/training/postgres/repository/train/ModelTrainJobRepositoryCustom.java @@ -1,6 +1,7 @@ package com.kamco.cd.training.postgres.repository.train; import com.kamco.cd.training.postgres.entity.ModelTrainJobEntity; +import java.util.List; import java.util.Optional; public interface ModelTrainJobRepositoryCustom { @@ -12,5 +13,5 @@ public interface ModelTrainJobRepositoryCustom { void insertModelTestTrainingRun(Long modelId, Long jobId, int epoch); - Optional findRunningJobs(); + List findRunningJobs(); } diff --git a/src/main/java/com/kamco/cd/training/postgres/repository/train/ModelTrainJobRepositoryImpl.java b/src/main/java/com/kamco/cd/training/postgres/repository/train/ModelTrainJobRepositoryImpl.java index 9a296ab..698e302 100644 --- a/src/main/java/com/kamco/cd/training/postgres/repository/train/ModelTrainJobRepositoryImpl.java +++ b/src/main/java/com/kamco/cd/training/postgres/repository/train/ModelTrainJobRepositoryImpl.java @@ -9,6 +9,7 @@ import com.kamco.cd.training.postgres.entity.ModelTrainJobEntity; import com.kamco.cd.training.postgres.entity.QModelTrainJobEntity; import com.querydsl.jpa.impl.JPAQueryFactory; import jakarta.persistence.EntityManager; +import java.util.List; import java.util.Optional; import org.springframework.stereotype.Repository; @@ -83,18 +84,16 @@ public class ModelTrainJobRepositoryImpl implements ModelTrainJobRepositoryCusto } @Override - public Optional findRunningJobs() { - return Optional.ofNullable( - queryFactory - .select(modelTrainJobEntity) - .from(modelTrainJobEntity) - .where( - modelTrainJobEntity - .statusCd - .eq(JobStatusType.RUNNING.getId()) - .and(modelTrainJobEntity.jobType.eq(JobType.TRAIN.getId()))) - .orderBy(modelTrainJobEntity.id.desc()) - .limit(1) - .fetchOne()); + public List findRunningJobs() { + return queryFactory + .select(modelTrainJobEntity) + .from(modelTrainJobEntity) + .where( + modelTrainJobEntity + .statusCd + .eq(JobStatusType.RUNNING.getId()) + .and(modelTrainJobEntity.jobType.eq(JobType.TRAIN.getId()))) + .orderBy(modelTrainJobEntity.id.desc()) + .fetch(); } } diff --git a/src/main/java/com/kamco/cd/training/train/TrainApiController.java b/src/main/java/com/kamco/cd/training/train/TrainApiController.java index 941278b..1c5f9d4 100644 --- a/src/main/java/com/kamco/cd/training/train/TrainApiController.java +++ b/src/main/java/com/kamco/cd/training/train/TrainApiController.java @@ -185,7 +185,7 @@ public class TrainApiController { }) @PostMapping("/create-tmp/{uuid}") public ApiResponseDto createTmpFile( - @Parameter(description = "uuid", example = "80a0e544-36ed-4999-b705-97427f23337d") + @Parameter(description = "model uuid", example = "80a0e544-36ed-4999-b705-97427f23337d") @PathVariable UUID uuid) { diff --git a/src/main/java/com/kamco/cd/training/train/service/DataSetCountersService.java b/src/main/java/com/kamco/cd/training/train/service/DataSetCountersService.java index 82eeaaa..20f5766 100644 --- a/src/main/java/com/kamco/cd/training/train/service/DataSetCountersService.java +++ b/src/main/java/com/kamco/cd/training/train/service/DataSetCountersService.java @@ -6,13 +6,17 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +/** 학습실행 파일 하드링크 */ @Service @Log4j2 @RequiredArgsConstructor @@ -42,6 +46,10 @@ public class DataSetCountersService { // tmp Path tmpPath = Path.of(trainBaseDir, "tmp", basic.getRequestPath()); + + // 차이나는거 + diffMergedRequestsVsTmp(uids, tmpPath); + DatasetCounters counters2 = countTmpAfterBuild(tmpPath); allLogs .append(counters2.prints(basic.getRequestPath(), "TMP")) @@ -163,4 +171,58 @@ public class DataSetCountersService { test + test2); } } + + private Set listTifRelative(Path root) throws IOException { + if (!Files.isDirectory(root)) return Set.of(); + + try (var stream = Files.walk(root)) { + return stream + .filter(Files::isRegularFile) + .filter(p -> p.getFileName().toString().toLowerCase().endsWith(".tif")) + .map(p -> root.relativize(p).toString().replace("\\", "/")) + .collect(Collectors.toSet()); + } + } + + private Set listTifFileNameOnly(Path root) throws IOException { + if (!Files.isDirectory(root)) return Set.of(); + + try (var stream = Files.walk(root)) { + return stream + .filter(Files::isRegularFile) + .filter(p -> p.getFileName().toString().toLowerCase().endsWith(".tif")) + .map(p -> p.getFileName().toString()) // 파일명만 + .collect(Collectors.toSet()); + } + } + + public void diffMergedRequestsVsTmp(List uids, Path tmpRoot) throws IOException { + + // 1) 요청 uids 전체를 합친 tif "파일명" 집합 + Set reqAll = new HashSet<>(); + for (String uid : uids) { + Path reqRoot = Path.of(requestDir, uid); + + // ★합본 tmp는 보통 폴더 구조가 바뀌므로 "상대경로" 비교보다 파일명 비교가 먼저 유용합니다. + reqAll.addAll(listTifFileNameOnly(reqRoot)); + } + + // 2) tmp tif 파일명 집합 + Set tmpAll = listTifFileNameOnly(tmpRoot); + + Set missing = new HashSet<>(reqAll); + missing.removeAll(tmpAll); + + Set extra = new HashSet<>(tmpAll); + extra.removeAll(reqAll); + + log.info("==== MERGED DIFF (filename-based) ===="); + log.info("request(all uids) tif = {}", reqAll.size()); + log.info("tmp tif = {}", tmpAll.size()); + log.info("missing = {}", missing.size()); + log.info("extra = {}", extra.size()); + + missing.stream().sorted().limit(50).forEach(f -> log.warn("[MISSING] {}", f)); + extra.stream().sorted().limit(50).forEach(f -> log.warn("[EXTRA] {}", f)); + } } diff --git a/src/main/java/com/kamco/cd/training/train/service/JobRecoveryOnStartupService.java b/src/main/java/com/kamco/cd/training/train/service/JobRecoveryOnStartupService.java index 9ea39ca..3e3439a 100644 --- a/src/main/java/com/kamco/cd/training/train/service/JobRecoveryOnStartupService.java +++ b/src/main/java/com/kamco/cd/training/train/service/JobRecoveryOnStartupService.java @@ -1,74 +1,449 @@ package com.kamco.cd.training.train.service; +import com.kamco.cd.training.model.dto.ModelTrainMngDto; import com.kamco.cd.training.postgres.core.ModelTrainJobCoreService; +import com.kamco.cd.training.postgres.core.ModelTrainMngCoreService; import com.kamco.cd.training.train.dto.ModelTrainJobDto; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; -import org.springframework.boot.context.event.ApplicationReadyEvent; -import org.springframework.context.event.EventListener; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Profile; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; -/** 실행중 학습이 있을때 처리 */ +/** + * 서버 재기동 시 "RUNNING 상태로 남아있는 학습 Job"을 복구(정리)하기 위한 서비스. + * + *

상황 예시: - 서버가 강제 재기동/장애로 내려감 - DB 상에서는 job_state가 RUNNING(진행중)으로 남아있음 - 실제 docker 컨테이너는: 1) 아직 + * 살아있거나(running=true) 2) 종료되었거나(exited) 3) --rm 옵션으로 인해 컨테이너가 이미 삭제되어 존재하지 않을 수 있음 + * + *

이 클래스는 ApplicationReadyEvent(스프링 부팅 완료) 시점에 실행되어, DB의 RUNNING 잡들을 조회한 뒤 컨테이너 상태를 점검하고, + * SUCCESS/FAILED 처리를 수행합니다. + */ +@Profile("!local") @Component @RequiredArgsConstructor @Log4j2 -@Transactional(readOnly = true) public class JobRecoveryOnStartupService { + private final ModelTrainJobCoreService modelTrainJobCoreService; + private final ModelTrainMngCoreService modelTrainMngCoreService; - @EventListener(ApplicationReadyEvent.class) + /** + * Docker 컨테이너가 쓰는 response(산출물) 디렉토리의 "호스트 측" 베이스 경로. 예) /data/train/response + * + *

컨테이너가 --rm 으로 삭제된 경우에도 이 경로에 val.csv / *.pth 등이 남아있으면 정상 종료 여부를 "파일 기반"으로 판정합니다. + */ + @Value("${train.docker.responseDir}") + private String responseDir; + + /** + * 스프링 부팅 완료 시점(빈 생성/초기화 모두 끝난 뒤)에 복구 로직 실행. + * + *

@Transactional: - recover() 메서드 전체가 하나의 트랜잭션으로 감싸집니다. - Job 하나씩 처리하다가 예외가 발생하면 전체 롤백이 될 수 + * 있으므로 "잡 단위로 확실히 커밋"이 필요하면 (권장) 잡 단위로 분리 트랜잭션(REQUIRES_NEW) 고려하세요. + */ + // @EventListener(ApplicationReadyEvent.class) + @Transactional public void recover() { - // RUNNING 중인 학습이 있는지 조회 - ModelTrainJobDto runningJobs = modelTrainJobCoreService.findRunningJobs(); - if (runningJobs == null) { + // 1) DB에서 "RUNNING(진행중) 상태"로 남아있는 job 목록을 조회 + List runningJobs = modelTrainJobCoreService.findRunningJobs(); + + // 실행중 job이 없으면 할 일 없음 + if (runningJobs == null || runningJobs.isEmpty()) { return; } - String containerName = runningJobs.getContainerName(); + // 2) 각 job에 대해 docker 컨테이너 상태를 확인하고, 상태에 따라 조치 + for (ModelTrainJobDto job : runningJobs) { + String containerName = job.getContainerName(); - try { - boolean containerAlive = isContainerRunning(containerName); + try { + // 2-1) docker inspect로 컨테이너 상태 조회 + DockerInspectState state = inspectContainer(containerName); - if (containerAlive) { - // 컨테이너 살아있으면 → RUNNING 유지 - log.info("[RECOVERY] container still running: {}", containerName); + // 3) 컨테이너가 "없음" + // - docker run --rm 로 실행한 컨테이너는 정상 종료 시 바로 삭제될 수 있음 + // - 즉 "컨테이너 없음"이 무조건 실패는 아님 + if (!state.exists()) { + log.warn( + "[RECOVERY] container missing. try file-based reconcile. container={}", + containerName); - } else { - // 컨테이너 죽었으면 → FAILED 처리 - log.info("[RECOVERY] container not found. mark FAILED: {}", containerName); + // 3-1) 컨테이너가 없을 때는 산출물(responseDir)을 보고 완료 여부를 "추정" + OutputResult out = probeOutputs(job); + + // 3-2) 산출물이 충분하면 성공 처리 + if (out.completed()) { + log.info("[RECOVERY] outputs look completed. mark SUCCESS. jobId={}", job.getId()); + modelTrainJobCoreService.markSuccess(job.getId(), 0); + markStepSuccessByJobType(job); + + } else { + // 3-3) 산출물이 부족하면 실패 처리(운영 정책에 따라 "유예"도 가능) + log.warn( + "[RECOVERY] outputs incomplete. mark FAILED. jobId={} reason={}", + job.getId(), + out.reason()); + + modelTrainJobCoreService.markFailed( + job.getId(), -1, "SERVER_RESTART_CONTAINER_MISSING_OUTPUT_INCOMPLETE"); + + markStepErrorByJobType(job, out.reason()); + } + continue; + } + + // 4) 컨테이너는 존재하고, 아직 running=true + // - 서버만 재기동됐고 컨테이너는 그대로 살아있는 케이스 + // - 이 경우 DB를 건드리면 오히려 꼬일 수 있으니 RUNNING 유지 + if (state.running()) { + log.info("[RECOVERY] container still running. container={}", containerName); + try { + ProcessBuilder pb = new ProcessBuilder("docker", "stop", "-t", "20", containerName); + pb.redirectErrorStream(true); + + Process p = pb.start(); + + boolean finished = p.waitFor(30, TimeUnit.SECONDS); + if (!finished) { + p.destroyForcibly(); + throw new IOException("docker stop timeout"); + } + + int code = p.exitValue(); + if (code != 0) { + throw new IOException("docker stop failed. exit=" + code); + } + + log.info( + "[RECOVERY] container stopped (will be auto removed by --rm). container={}", + containerName); + + // 여기서 상태를 PAUSED로 바꿔도 되고 + modelTrainJobCoreService.markPaused(job.getId(), -1, "AUTO_STOP_FAILED_ON_RESTART"); + + } catch (Exception e) { + log.error("[RECOVERY] docker stop failed. container={}", containerName, e); + + modelTrainJobCoreService.markFailed(job.getId(), -1, "AUTO_STOP_FAILED_ON_RESTART"); + } + continue; + } + + // 5) 컨테이너는 존재하지만 running=false + // - exited / dead 등의 상태 + Integer exitCode = state.exitCode(); + String status = state.status(); + + // 5-1) exitCode=0이면 정상 종료로 간주 → SUCCESS 처리 + if (exitCode != null && exitCode == 0) { + log.info("[RECOVERY] container exited(0). mark SUCCESS. container={}", containerName); + modelTrainJobCoreService.markSuccess(job.getId(), 0); + markStepSuccessByJobType(job); + + } else { + // 5-2) exitCode != 0 이거나 null이면 실패로 간주 → FAILED 처리 + log.warn( + "[RECOVERY] container exited non-zero. mark FAILED. container={} status={} exitCode={}", + containerName, + status, + exitCode); + + modelTrainJobCoreService.markFailed( + job.getId(), exitCode, "SERVER_RESTART_CONTAINER_EXIT_NONZERO"); + + markStepErrorByJobType(job, "exit=" + exitCode + " status=" + status); + } + + } catch (Exception e) { + // 6) docker inspect 자체가 실패한 경우 + // - docker 데몬 문제/권한 문제/일시적 오류 가능 + // - 운영 정책에 따라 "바로 실패" 대신 "유예" 처리도 고려 가능 + log.error("[RECOVERY] container inspect failed. container={}", containerName, e); modelTrainJobCoreService.markFailed( - runningJobs.getId(), null, "SERVER_RESTART_CONTAINER_NOT_FOUND"); - } - } catch (IOException e) { - log.error("[RECOVERY] container check failed. mark FAILED: {}", containerName, e); + job.getId(), -1, "SERVER_RESTART_CONTAINER_INSPECT_ERROR"); - modelTrainJobCoreService.markFailed( - runningJobs.getId(), null, "SERVER_RESTART_CONTAINER_CHECK_ERROR"); + markStepErrorByJobType(job, "inspect-error"); + } } } /** - * docker 실행중인지 확인하기 + * jobType에 따라 학습 관리 테이블의 "성공 단계"를 업데이트. * - * @param containerName container name - * @return true, false - * @throws IOException + *

예: - jobType == "EVAL" → step2(평가 단계) 성공 - 그 외 → step1(학습 단계) 성공 */ - private boolean isContainerRunning(String containerName) throws IOException { + private void markStepSuccessByJobType(ModelTrainJobDto job) { + Map params = job.getParamsJson(); + boolean isEval = params != null && "EVAL".equals(String.valueOf(params.get("jobType"))); + if (isEval) { + modelTrainMngCoreService.markStep2Success(job.getModelId()); + } else { + modelTrainMngCoreService.markStep1Success(job.getModelId()); + } + } + + /** + * jobType에 따라 학습 관리 테이블의 "에러 단계"를 업데이트. + * + *

예: - jobType == "EVAL" → step2(평가 단계) 에러 - 그 외 → step1 혹은 전체 에러 + */ + private void markStepErrorByJobType(ModelTrainJobDto job, String msg) { + Map params = job.getParamsJson(); + boolean isEval = params != null && "EVAL".equals(String.valueOf(params.get("jobType"))); + if (isEval) { + modelTrainMngCoreService.markStep2Error(job.getModelId(), msg); + } else { + modelTrainMngCoreService.markError(job.getModelId(), msg); + } + } + + /** + * docker inspect를 사용해서 컨테이너 상태를 조회합니다. + * + *

사용하는 템플릿: {{.State.Status}} {{.State.Running}} {{.State.ExitCode}} + * + *

예상 출력 예: - "running true 0" - "exited false 0" - "exited false 137" + * + *

주의: - 컨테이너가 없거나 inspect 실패 시 exitCode != 0 또는 output이 비어서 missing() 반환 - 무한 대기 방지를 위해 5초 + * 타임아웃을 둠 + */ + private DockerInspectState inspectContainer(String containerName) + throws IOException, InterruptedException { ProcessBuilder pb = - new ProcessBuilder("docker", "inspect", "-f", "{{.State.Running}}", containerName); + new ProcessBuilder( + "docker", + "inspect", + "-f", + "{{.State.Status}} {{.State.Running}} {{.State.ExitCode}}", + containerName); + + // stderr를 stdout으로 합쳐서 한 스트림으로 읽기(에러 메시지도 함께 받음) + pb.redirectErrorStream(true); Process p = pb.start(); - BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream())); - String line = br.readLine(); - return "true".equals(line); + // inspect 출력은 1줄이면 충분하므로 readLine()만 수행 + String output; + try (BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()))) { + output = br.readLine(); + } + + // 무한대기 방지: 5초 내에 종료되지 않으면 강제 종료 + boolean finished = p.waitFor(5, TimeUnit.SECONDS); + if (!finished) { + p.destroyForcibly(); + throw new IOException("docker inspect timeout"); + } + + // docker inspect 자체의 프로세스 exit code + int code = p.exitValue(); + + // 실패(코드 !=0) 또는 출력이 없으면 "컨테이너 없음"으로 간주 + if (code != 0 || output == null || output.isBlank()) { + return DockerInspectState.missing(); + } + + // "status running exitCode" 형태로 split + String[] parts = output.trim().split("\\s+"); + + // status: running/exited/dead 등 + String status = parts.length > 0 ? parts[0] : "unknown"; + + // running: true/false + boolean running = parts.length > 1 && Boolean.parseBoolean(parts[1]); + + // exitCode: 정수 파싱(파싱 실패하면 null) + Integer exitCode = null; + if (parts.length > 2) { + try { + exitCode = Integer.parseInt(parts[2]); + } catch (Exception ignore) { + // ignore + } + } + + return new DockerInspectState(true, running, exitCode, status); + } + + /** + * docker inspect 결과를 담는 레코드. + * + *

exists: - true : docker inspect 성공 (컨테이너 존재) - false : 컨테이너 없음(또는 inspect 실패를 missing으로 간주) + */ + private record DockerInspectState( + boolean exists, boolean running, Integer exitCode, String status) { + static DockerInspectState missing() { + return new DockerInspectState(false, false, null, "missing"); + } + } + + // ============================================================================================ + // 컨테이너가 "없을 때" 파일 기반으로 완료/미완료를 판정하는 로직 + // ============================================================================================ + + /** + * 컨테이너가 없을 때(responseDir 산출물만 남아있는 상태) 완료 여부를 파일 기반으로 판정합니다. + * + *

판정 규칙(보수적으로 설계): 1) total_epoch가 paramsJson에 있어야 함 (없으면 완료 판단 불가) 2) val.csv 존재 + 헤더 제외 라인 수 + * >= total_epoch 이어야 함 3) *.pth 파일이 total_epoch 이상 존재하거나, best*.pth(또는 *best*.pth)가 존재해야 함 + * + *

왜 이렇게? - 어떤 학습은 epoch마다 pth를 남기고 - 어떤 학습은 best만 남기기도 해서 "pthCount >= total_epoch"만 쓰면 정상 종료를 + * 실패로 오판할 수 있음. + */ + private OutputResult probeOutputs(ModelTrainJobDto job) { + try { + Path outDir = resolveOutputDir(job); + if (outDir == null || !Files.isDirectory(outDir)) { + return new OutputResult(false, "output-dir-missing"); + } + + Integer totalEpoch = extractTotalEpoch(job).orElse(null); + if (totalEpoch == null || totalEpoch <= 0) { + return new OutputResult(false, "total-epoch-missing"); + } + + Path valCsv = outDir.resolve("val.csv"); + if (!Files.exists(valCsv)) { + return new OutputResult(false, "val.csv-missing"); + } + + long lines = countNonHeaderLines(valCsv); + + // “같아야 완료” 정책 + if (lines == totalEpoch) { + return new OutputResult(true, "ok"); + } + + return new OutputResult( + false, "val.csv-lines-mismatch lines=" + lines + " expected=" + totalEpoch); + + } catch (Exception e) { + log.error("[RECOVERY] probeOutputs error. jobId={}", job.getId(), e); + return new OutputResult(false, "probe-error"); + } + } + + /** + * responseDir 아래에서 job 산출물 디렉토리를 찾습니다. + * + *

가장 중요한 커스터마이징 포인트: - 실제 운영 환경에서 산출물이 어떤 경로 규칙으로 저장되는지에 따라 여기만 수정하면 됩니다. + * + *

현재 기본 탐색 순서: 1) {responseDir}/{jobId} 2) {responseDir}/{modelId} 3) + * {responseDir}/{containerName} 4) 마지막 fallback: responseDir 자체 + * + *

추천: - 여러분 규칙이 "{responseDir}/{modelId}/{jobId}" 같은 형태라면 base.resolve(modelId).resolve(jobId) + * 형태를 1순위로 두세요. + */ + private Path resolveOutputDir(ModelTrainJobDto job) { + ModelTrainMngDto.Basic model = modelTrainMngCoreService.findModelById(job.getModelId()); + + Path base = Paths.get(responseDir, model.getUuid().toString(), "metrics"); + + return Files.isDirectory(base) ? base : null; + } + + /** + * paramsJson에서 total_epoch 값을 추출합니다. + * + *

키 후보: - "total_epoch" (snake_case) - "totalEpoch" (camelCase) + * + *

예: paramsJson = {"jobType":"TRAIN","total_epoch":50,...} + */ + private Optional extractTotalEpoch(ModelTrainJobDto job) { + Map params = job.getParamsJson(); + if (params == null) return Optional.empty(); + + Object v = params.get("total_epoch"); + if (v == null) v = params.get("totalEpoch"); + if (v == null) return Optional.empty(); + + try { + return Optional.of(Integer.parseInt(String.valueOf(v))); + } catch (Exception ignore) { + return Optional.empty(); + } + } + + /** + * CSV 파일에서 "헤더(첫 줄)"를 제외한 라인 수를 계산합니다. + * + *

가정: - val.csv 첫 줄은 헤더 - 이후 라인들이 epoch별 기록(또는 유사한 누적 기록) + * + *

주의: - 파일 인코딩은 UTF-8로 가정 - 빈 줄은 제외 + */ + private long countNonHeaderLines(Path csv) throws IOException { + try (Stream lines = Files.lines(csv, StandardCharsets.UTF_8)) { + return lines.skip(1).filter(s -> s != null && !s.isBlank()).count(); + } + } + + /** + * 디렉토리에서 glob 패턴에 맞는 파일 수를 셉니다. + * + *

예: - "*.pth" - "best*.pth" + */ + private long countFilesByGlob(Path dir, String glob) throws IOException { + try (DirectoryStream ds = Files.newDirectoryStream(dir, glob)) { + long cnt = 0; + for (Path p : ds) { + if (Files.isRegularFile(p)) cnt++; + } + return cnt; + } + } + + /** 디렉토리에서 glob 패턴에 맞는 파일이 "하나라도" 존재하는지 체크합니다. */ + private boolean existsByGlob(Path dir, String glob) throws IOException { + try (DirectoryStream ds = Files.newDirectoryStream(dir, glob)) { + return ds.iterator().hasNext(); + } + } + + // ============================================================================================ + // probeOutputs() 결과 객체 + // ============================================================================================ + + /** + * 컨테이너가 없을 때(responseDir 기반) 완료 여부 판정 결과. + * + *

completed: - true : 산출물이 완료로 보임(성공 처리 가능) - false : 산출물이 부족/불명확(실패 또는 유예 판단) + * + *

reason: - 실패/미완료 사유(로그/DB 메시지로 남기기 용도) + */ + private static final class OutputResult { + + private final boolean completed; + private final String reason; + + private OutputResult(boolean completed, String reason) { + this.completed = completed; + this.reason = reason; + } + + boolean completed() { + return completed; + } + + String reason() { + return reason; + } } } diff --git a/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java b/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java index cecceb1..978fe2b 100644 --- a/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java +++ b/src/main/java/com/kamco/cd/training/train/service/TmpDatasetService.java @@ -26,6 +26,7 @@ public class TmpDatasetService { * @param uid 임시폴더 uuid * @param type train, val, test * @param links tif pull path + * @return * @throws IOException */ public void buildTmpDatasetHardlink(String uid, String type, List links) diff --git a/src/main/java/com/kamco/cd/training/train/service/TrainJobService.java b/src/main/java/com/kamco/cd/training/train/service/TrainJobService.java index b5dd84e..d2dd5a6 100644 --- a/src/main/java/com/kamco/cd/training/train/service/TrainJobService.java +++ b/src/main/java/com/kamco/cd/training/train/service/TrainJobService.java @@ -266,6 +266,8 @@ public class TrainJobService { List uids = modelTrainMngCoreService.findDatasetUid(datasetIds); try { + // 데이터셋 심볼링크 생성 + // String pathUid = tmpDatasetService.buildTmpDatasetSymlink(raw, uids); // train path List trainList = modelTrainMngCoreService.findDatasetTrainPath(modelId); // validation path