8 Commits

Author SHA1 Message Date
265813e6f7 Merge branch 'feat/training_260202' of https://kamco.git.gs.dabeeo.com/MVPTeam/kamco-train-api into feat/training_260202 2026-03-10 14:19:40 +09:00
8190a6e9c8 unzip 2026-03-10 14:19:22 +09:00
e9f8bb37fa spotless 적용 2026-03-03 23:06:45 +09:00
f3c822587f spotless 적용 2026-03-03 23:02:53 +09:00
335f0dbb9b spotless 적용 2026-03-03 23:01:22 +09:00
69eaba1a83 하드링크 수정 2026-03-03 22:51:10 +09:00
365ad81cad 리커버리 삭제 2026-02-28 01:24:34 +09:00
9dfa54fbf9 리커버리 추가 2026-02-28 01:01:38 +09:00
8 changed files with 639 additions and 202 deletions

View File

@@ -3,7 +3,6 @@ package com.kamco.cd.training.common.utils;
import static java.lang.String.CASE_INSENSITIVE_ORDER; import static java.lang.String.CASE_INSENSITIVE_ORDER;
import com.jcraft.jsch.ChannelExec; import com.jcraft.jsch.ChannelExec;
import com.jcraft.jsch.ChannelSftp;
import com.jcraft.jsch.JSch; import com.jcraft.jsch.JSch;
import com.jcraft.jsch.Session; import com.jcraft.jsch.Session;
import com.kamco.cd.training.common.exception.CustomApiException; import com.kamco.cd.training.common.exception.CustomApiException;
@@ -720,18 +719,26 @@ public class FIleChecker {
public static void unzip(String fileName, String destDirectory) throws IOException { public static void unzip(String fileName, String destDirectory) throws IOException {
String zipFilePath = destDirectory + File.separator + fileName; String zipFilePath = destDirectory + File.separator + fileName;
log.info("fileName : {}", fileName);
log.info("destDirectory : {}", destDirectory);
log.info("zipFilePath : {}", zipFilePath);
// zip 이름으로 폴더 생성 (확장자 제거) // zip 이름으로 폴더 생성 (확장자 제거)
String folderName = String folderName =
fileName.endsWith(".zip") ? fileName.substring(0, fileName.length() - 4) : fileName; fileName.endsWith(".zip") ? fileName.substring(0, fileName.length() - 4) : fileName;
log.info("folderName : {}", folderName);
File destDir = new File(destDirectory, folderName); File destDir = new File(destDirectory, folderName);
log.info("destDir : {}", destDir);
// 동일 폴더가 이미 있으면 삭제 // 동일 폴더가 이미 있으면 삭제
log.info("111 destDir.exists() : {}", destDir.exists());
if (destDir.exists()) { if (destDir.exists()) {
deleteDirectoryRecursively(destDir.toPath()); deleteDirectoryRecursively(destDir.toPath());
} }
log.info("222 destDir.exists() : {}", destDir.exists());
if (!destDir.exists()) { if (!destDir.exists()) {
log.info("mkdirs : {}", destDir.exists());
destDir.mkdirs(); destDir.mkdirs();
} }
@@ -787,92 +794,6 @@ public class FIleChecker {
return destFile; return destFile;
} }
public static void uploadTo86(Path localFile) {
String host = "192.168.2.86";
int port = 22;
String username = "kcomu";
String password = "Kamco2025!";
String remoteDir = "/home/kcomu/data/request";
Session session = null;
ChannelSftp channel = null;
try {
JSch jsch = new JSch();
session = jsch.getSession(username, host, port);
session.setPassword(password);
Properties config = new Properties();
config.put("StrictHostKeyChecking", "no");
session.setConfig(config);
session.connect(10_000);
channel = (ChannelSftp) session.openChannel("sftp");
channel.connect(10_000);
// 목적지 디렉토리 이동
channel.cd(remoteDir);
// 업로드
channel.put(localFile.toString(), localFile.getFileName().toString());
} catch (Exception e) {
throw new RuntimeException("SFTP upload failed", e);
} finally {
if (channel != null) channel.disconnect();
if (session != null) session.disconnect();
}
}
public static void unzipOn86Server(String zipPath, String targetDir) {
String host = "192.168.2.86";
String user = "kcomu";
String password = "Kamco2025!";
Session session = null;
ChannelExec channel = null;
try {
JSch jsch = new JSch();
session = jsch.getSession(user, host, 22);
session.setPassword(password);
Properties config = new Properties();
config.put("StrictHostKeyChecking", "no");
session.setConfig(config);
session.connect(10_000);
String command = "unzip -o " + zipPath + " -d " + targetDir;
channel = (ChannelExec) session.openChannel("exec");
channel.setCommand(command);
channel.setErrStream(System.err);
InputStream in = channel.getInputStream();
channel.connect();
// 출력 읽기(선택)
try (BufferedReader br = new BufferedReader(new InputStreamReader(in))) {
while (br.readLine() != null) {
// 필요하면 로그
}
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
if (channel != null) channel.disconnect();
if (session != null) session.disconnect();
}
}
public static List<String> execCommandAndReadLines(String command) { public static List<String> execCommandAndReadLines(String command) {
List<String> result = new ArrayList<>(); List<String> result = new ArrayList<>();

View File

@@ -417,6 +417,7 @@ public class DatasetService {
} }
private String escape(String path) { private String escape(String path) {
// 쉘 커맨드에서 안전하게 사용할 수 있도록 문자열을 작은따옴표로 감싸면서, 내부의 작은따옴표를 이스케이프 처리
return "'" + path.replace("'", "'\"'\"'") + "'"; return "'" + path.replace("'", "'\"'\"'") + "'";
} }

View File

@@ -5,6 +5,8 @@ import com.kamco.cd.training.postgres.entity.ModelTrainJobEntity;
import com.kamco.cd.training.postgres.repository.train.ModelTrainJobRepository; import com.kamco.cd.training.postgres.repository.train.ModelTrainJobRepository;
import com.kamco.cd.training.train.dto.ModelTrainJobDto; import com.kamco.cd.training.train.dto.ModelTrainJobDto;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
@@ -118,6 +120,28 @@ public class ModelTrainJobCoreService {
log.info("[TRAIN JOB FAIL] jobId={}, modelId={}", jobId, errorMessage); log.info("[TRAIN JOB FAIL] jobId={}, modelId={}", jobId, errorMessage);
} }
/**
* 중단됨 처리
*
* @param jobId
* @param exitCode
* @param errorMessage
*/
@Transactional
public void markPaused(Long jobId, Integer exitCode, String errorMessage) {
ModelTrainJobEntity job =
modelTrainJobRepository
.findById(jobId)
.orElseThrow(() -> new CustomApiException("NOT_FOUND_DATA", HttpStatus.NOT_FOUND));
job.setStatusCd("STOPPED");
job.setExitCode(exitCode);
job.setErrorMessage(errorMessage);
job.setFinishedDttm(ZonedDateTime.now());
log.info("[TRAIN JOB FAIL] jobId={}, modelId={}", jobId, errorMessage);
}
/** 취소 처리 */ /** 취소 처리 */
@Transactional @Transactional
public void markCanceled(Long jobId) { public void markCanceled(Long jobId) {
@@ -151,11 +175,13 @@ public class ModelTrainJobCoreService {
* *
* @return * @return
*/ */
public ModelTrainJobDto findRunningJobs() { public List<ModelTrainJobDto> findRunningJobs() {
ModelTrainJobEntity entity = modelTrainJobRepository.findRunningJobs().orElse(null); List<ModelTrainJobEntity> entity = modelTrainJobRepository.findRunningJobs();
if (entity == null) {
return null; if (entity == null || entity.isEmpty()) {
return Collections.emptyList();
} }
return entity.toDto();
return entity.stream().map(ModelTrainJobEntity::toDto).toList();
} }
} }

View File

@@ -1,6 +1,7 @@
package com.kamco.cd.training.postgres.repository.train; package com.kamco.cd.training.postgres.repository.train;
import com.kamco.cd.training.postgres.entity.ModelTrainJobEntity; import com.kamco.cd.training.postgres.entity.ModelTrainJobEntity;
import java.util.List;
import java.util.Optional; import java.util.Optional;
public interface ModelTrainJobRepositoryCustom { public interface ModelTrainJobRepositoryCustom {
@@ -12,5 +13,5 @@ public interface ModelTrainJobRepositoryCustom {
void insertModelTestTrainingRun(Long modelId, Long jobId, int epoch); void insertModelTestTrainingRun(Long modelId, Long jobId, int epoch);
Optional<ModelTrainJobEntity> findRunningJobs(); List<ModelTrainJobEntity> findRunningJobs();
} }

View File

@@ -9,6 +9,7 @@ import com.kamco.cd.training.postgres.entity.ModelTrainJobEntity;
import com.kamco.cd.training.postgres.entity.QModelTrainJobEntity; import com.kamco.cd.training.postgres.entity.QModelTrainJobEntity;
import com.querydsl.jpa.impl.JPAQueryFactory; import com.querydsl.jpa.impl.JPAQueryFactory;
import jakarta.persistence.EntityManager; import jakarta.persistence.EntityManager;
import java.util.List;
import java.util.Optional; import java.util.Optional;
import org.springframework.stereotype.Repository; import org.springframework.stereotype.Repository;
@@ -83,9 +84,8 @@ public class ModelTrainJobRepositoryImpl implements ModelTrainJobRepositoryCusto
} }
@Override @Override
public Optional<ModelTrainJobEntity> findRunningJobs() { public List<ModelTrainJobEntity> findRunningJobs() {
return Optional.ofNullable( return queryFactory
queryFactory
.select(modelTrainJobEntity) .select(modelTrainJobEntity)
.from(modelTrainJobEntity) .from(modelTrainJobEntity)
.where( .where(
@@ -94,7 +94,6 @@ public class ModelTrainJobRepositoryImpl implements ModelTrainJobRepositoryCusto
.eq(JobStatusType.RUNNING.getId()) .eq(JobStatusType.RUNNING.getId())
.and(modelTrainJobEntity.jobType.eq(JobType.TRAIN.getId()))) .and(modelTrainJobEntity.jobType.eq(JobType.TRAIN.getId())))
.orderBy(modelTrainJobEntity.id.desc()) .orderBy(modelTrainJobEntity.id.desc())
.limit(1) .fetch();
.fetchOne());
} }
} }

View File

@@ -1,74 +1,449 @@
package com.kamco.cd.training.train.service; package com.kamco.cd.training.train.service;
import com.kamco.cd.training.model.dto.ModelTrainMngDto;
import com.kamco.cd.training.postgres.core.ModelTrainJobCoreService; import com.kamco.cd.training.postgres.core.ModelTrainJobCoreService;
import com.kamco.cd.training.postgres.core.ModelTrainMngCoreService;
import com.kamco.cd.training.train.dto.ModelTrainJobDto; import com.kamco.cd.training.train.dto.ModelTrainJobDto;
import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2; import lombok.extern.log4j.Log4j2;
import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.event.EventListener; import org.springframework.context.annotation.Profile;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
/** 실행중 학습이 있을때 처리 */ /**
* 서버 재기동 시 "RUNNING 상태로 남아있는 학습 Job"을 복구(정리)하기 위한 서비스.
*
* <p>상황 예시: - 서버가 강제 재기동/장애로 내려감 - DB 상에서는 job_state가 RUNNING(진행중)으로 남아있음 - 실제 docker 컨테이너는: 1) 아직
* 살아있거나(running=true) 2) 종료되었거나(exited) 3) --rm 옵션으로 인해 컨테이너가 이미 삭제되어 존재하지 않을 수 있음
*
* <p>이 클래스는 ApplicationReadyEvent(스프링 부팅 완료) 시점에 실행되어, DB의 RUNNING 잡들을 조회한 뒤 컨테이너 상태를 점검하고,
* SUCCESS/FAILED 처리를 수행합니다.
*/
@Profile("!local")
@Component @Component
@RequiredArgsConstructor @RequiredArgsConstructor
@Log4j2 @Log4j2
@Transactional(readOnly = true)
public class JobRecoveryOnStartupService { public class JobRecoveryOnStartupService {
private final ModelTrainJobCoreService modelTrainJobCoreService; private final ModelTrainJobCoreService modelTrainJobCoreService;
private final ModelTrainMngCoreService modelTrainMngCoreService;
@EventListener(ApplicationReadyEvent.class) /**
* Docker 컨테이너가 쓰는 response(산출물) 디렉토리의 "호스트 측" 베이스 경로. 예) /data/train/response
*
* <p>컨테이너가 --rm 으로 삭제된 경우에도 이 경로에 val.csv / *.pth 등이 남아있으면 정상 종료 여부를 "파일 기반"으로 판정합니다.
*/
@Value("${train.docker.responseDir}")
private String responseDir;
/**
* 스프링 부팅 완료 시점(빈 생성/초기화 모두 끝난 뒤)에 복구 로직 실행.
*
* <p>@Transactional: - recover() 메서드 전체가 하나의 트랜잭션으로 감싸집니다. - Job 하나씩 처리하다가 예외가 발생하면 전체 롤백이 될 수
* 있으므로 "잡 단위로 확실히 커밋"이 필요하면 (권장) 잡 단위로 분리 트랜잭션(REQUIRES_NEW) 고려하세요.
*/
// @EventListener(ApplicationReadyEvent.class)
@Transactional
public void recover() { public void recover() {
// RUNNING 중인 학습이 있는지 조회
ModelTrainJobDto runningJobs = modelTrainJobCoreService.findRunningJobs();
if (runningJobs == null) { // 1) DB에서 "RUNNING(진행중) 상태"로 남아있는 job 목록을 조회
List<ModelTrainJobDto> runningJobs = modelTrainJobCoreService.findRunningJobs();
// 실행중 job이 없으면 할 일 없음
if (runningJobs == null || runningJobs.isEmpty()) {
return; return;
} }
String containerName = runningJobs.getContainerName(); // 2) 각 job에 대해 docker 컨테이너 상태를 확인하고, 상태에 따라 조치
for (ModelTrainJobDto job : runningJobs) {
String containerName = job.getContainerName();
try { try {
boolean containerAlive = isContainerRunning(containerName); // 2-1) docker inspect로 컨테이너 상태 조회
DockerInspectState state = inspectContainer(containerName);
if (containerAlive) { // 3) 컨테이너가 "없음"
// 컨테이너 살아있으면 → RUNNING 유지 // - docker run --rm 로 실행한 컨테이너는 정상 종료 시 바로 삭제될 수 있음
log.info("[RECOVERY] container still running: {}", containerName); // - 즉 "컨테이너 없음"이 무조건 실패는 아님
if (!state.exists()) {
log.warn(
"[RECOVERY] container missing. try file-based reconcile. container={}",
containerName);
// 3-1) 컨테이너가 없을 때는 산출물(responseDir)을 보고 완료 여부를 "추정"
OutputResult out = probeOutputs(job);
// 3-2) 산출물이 충분하면 성공 처리
if (out.completed()) {
log.info("[RECOVERY] outputs look completed. mark SUCCESS. jobId={}", job.getId());
modelTrainJobCoreService.markSuccess(job.getId(), 0);
markStepSuccessByJobType(job);
} else { } else {
// 컨테이너 죽었으면 → FAILED 처리 // 3-3) 산출물이 부족하면 실패 처리(운영 정책에 따라 "유예"도 가능)
log.info("[RECOVERY] container not found. mark FAILED: {}", containerName); log.warn(
"[RECOVERY] outputs incomplete. mark FAILED. jobId={} reason={}",
job.getId(),
out.reason());
modelTrainJobCoreService.markFailed( modelTrainJobCoreService.markFailed(
runningJobs.getId(), null, "SERVER_RESTART_CONTAINER_NOT_FOUND"); job.getId(), -1, "SERVER_RESTART_CONTAINER_MISSING_OUTPUT_INCOMPLETE");
markStepErrorByJobType(job, out.reason());
} }
} catch (IOException e) { continue;
log.error("[RECOVERY] container check failed. mark FAILED: {}", containerName, e); }
// 4) 컨테이너는 존재하고, 아직 running=true
// - 서버만 재기동됐고 컨테이너는 그대로 살아있는 케이스
// - 이 경우 DB를 건드리면 오히려 꼬일 수 있으니 RUNNING 유지
if (state.running()) {
log.info("[RECOVERY] container still running. container={}", containerName);
try {
ProcessBuilder pb = new ProcessBuilder("docker", "stop", "-t", "20", containerName);
pb.redirectErrorStream(true);
Process p = pb.start();
boolean finished = p.waitFor(30, TimeUnit.SECONDS);
if (!finished) {
p.destroyForcibly();
throw new IOException("docker stop timeout");
}
int code = p.exitValue();
if (code != 0) {
throw new IOException("docker stop failed. exit=" + code);
}
log.info(
"[RECOVERY] container stopped (will be auto removed by --rm). container={}",
containerName);
// 여기서 상태를 PAUSED로 바꿔도 되고
modelTrainJobCoreService.markPaused(job.getId(), -1, "AUTO_STOP_FAILED_ON_RESTART");
} catch (Exception e) {
log.error("[RECOVERY] docker stop failed. container={}", containerName, e);
modelTrainJobCoreService.markFailed(job.getId(), -1, "AUTO_STOP_FAILED_ON_RESTART");
}
continue;
}
// 5) 컨테이너는 존재하지만 running=false
// - exited / dead 등의 상태
Integer exitCode = state.exitCode();
String status = state.status();
// 5-1) exitCode=0이면 정상 종료로 간주 → SUCCESS 처리
if (exitCode != null && exitCode == 0) {
log.info("[RECOVERY] container exited(0). mark SUCCESS. container={}", containerName);
modelTrainJobCoreService.markSuccess(job.getId(), 0);
markStepSuccessByJobType(job);
} else {
// 5-2) exitCode != 0 이거나 null이면 실패로 간주 → FAILED 처리
log.warn(
"[RECOVERY] container exited non-zero. mark FAILED. container={} status={} exitCode={}",
containerName,
status,
exitCode);
modelTrainJobCoreService.markFailed( modelTrainJobCoreService.markFailed(
runningJobs.getId(), null, "SERVER_RESTART_CONTAINER_CHECK_ERROR"); job.getId(), exitCode, "SERVER_RESTART_CONTAINER_EXIT_NONZERO");
markStepErrorByJobType(job, "exit=" + exitCode + " status=" + status);
}
} catch (Exception e) {
// 6) docker inspect 자체가 실패한 경우
// - docker 데몬 문제/권한 문제/일시적 오류 가능
// - 운영 정책에 따라 "바로 실패" 대신 "유예" 처리도 고려 가능
log.error("[RECOVERY] container inspect failed. container={}", containerName, e);
modelTrainJobCoreService.markFailed(
job.getId(), -1, "SERVER_RESTART_CONTAINER_INSPECT_ERROR");
markStepErrorByJobType(job, "inspect-error");
}
} }
} }
/** /**
* docker 실행중인지 확인하기 * jobType에 따라 학습 관리 테이블의 "성공 단계"를 업데이트.
* *
* @param containerName container name * <p>예: - jobType == "EVAL" → step2(평가 단계) 성공 - 그 외 → step1(학습 단계) 성공
* @return true, false
* @throws IOException
*/ */
private boolean isContainerRunning(String containerName) throws IOException { private void markStepSuccessByJobType(ModelTrainJobDto job) {
Map<String, Object> params = job.getParamsJson();
boolean isEval = params != null && "EVAL".equals(String.valueOf(params.get("jobType")));
if (isEval) {
modelTrainMngCoreService.markStep2Success(job.getModelId());
} else {
modelTrainMngCoreService.markStep1Success(job.getModelId());
}
}
/**
* jobType에 따라 학습 관리 테이블의 "에러 단계"를 업데이트.
*
* <p>예: - jobType == "EVAL" → step2(평가 단계) 에러 - 그 외 → step1 혹은 전체 에러
*/
private void markStepErrorByJobType(ModelTrainJobDto job, String msg) {
Map<String, Object> params = job.getParamsJson();
boolean isEval = params != null && "EVAL".equals(String.valueOf(params.get("jobType")));
if (isEval) {
modelTrainMngCoreService.markStep2Error(job.getModelId(), msg);
} else {
modelTrainMngCoreService.markError(job.getModelId(), msg);
}
}
/**
* docker inspect를 사용해서 컨테이너 상태를 조회합니다.
*
* <p>사용하는 템플릿: {{.State.Status}} {{.State.Running}} {{.State.ExitCode}}
*
* <p>예상 출력 예: - "running true 0" - "exited false 0" - "exited false 137"
*
* <p>주의: - 컨테이너가 없거나 inspect 실패 시 exitCode != 0 또는 output이 비어서 missing() 반환 - 무한 대기 방지를 위해 5초
* 타임아웃을 둠
*/
private DockerInspectState inspectContainer(String containerName)
throws IOException, InterruptedException {
ProcessBuilder pb = ProcessBuilder pb =
new ProcessBuilder("docker", "inspect", "-f", "{{.State.Running}}", containerName); new ProcessBuilder(
"docker",
"inspect",
"-f",
"{{.State.Status}} {{.State.Running}} {{.State.ExitCode}}",
containerName);
// stderr를 stdout으로 합쳐서 한 스트림으로 읽기(에러 메시지도 함께 받음)
pb.redirectErrorStream(true);
Process p = pb.start(); Process p = pb.start();
BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()));
String line = br.readLine(); // inspect 출력은 1줄이면 충분하므로 readLine()만 수행
return "true".equals(line); String output;
try (BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
output = br.readLine();
}
// 무한대기 방지: 5초 내에 종료되지 않으면 강제 종료
boolean finished = p.waitFor(5, TimeUnit.SECONDS);
if (!finished) {
p.destroyForcibly();
throw new IOException("docker inspect timeout");
}
// docker inspect 자체의 프로세스 exit code
int code = p.exitValue();
// 실패(코드 !=0) 또는 출력이 없으면 "컨테이너 없음"으로 간주
if (code != 0 || output == null || output.isBlank()) {
return DockerInspectState.missing();
}
// "status running exitCode" 형태로 split
String[] parts = output.trim().split("\\s+");
// status: running/exited/dead 등
String status = parts.length > 0 ? parts[0] : "unknown";
// running: true/false
boolean running = parts.length > 1 && Boolean.parseBoolean(parts[1]);
// exitCode: 정수 파싱(파싱 실패하면 null)
Integer exitCode = null;
if (parts.length > 2) {
try {
exitCode = Integer.parseInt(parts[2]);
} catch (Exception ignore) {
// ignore
}
}
return new DockerInspectState(true, running, exitCode, status);
}
/**
* docker inspect 결과를 담는 레코드.
*
* <p>exists: - true : docker inspect 성공 (컨테이너 존재) - false : 컨테이너 없음(또는 inspect 실패를 missing으로 간주)
*/
private record DockerInspectState(
boolean exists, boolean running, Integer exitCode, String status) {
static DockerInspectState missing() {
return new DockerInspectState(false, false, null, "missing");
}
}
// ============================================================================================
// 컨테이너가 "없을 때" 파일 기반으로 완료/미완료를 판정하는 로직
// ============================================================================================
/**
* 컨테이너가 없을 때(responseDir 산출물만 남아있는 상태) 완료 여부를 파일 기반으로 판정합니다.
*
* <p>판정 규칙(보수적으로 설계): 1) total_epoch가 paramsJson에 있어야 함 (없으면 완료 판단 불가) 2) val.csv 존재 + 헤더 제외 라인 수
* >= total_epoch 이어야 함 3) *.pth 파일이 total_epoch 이상 존재하거나, best*.pth(또는 *best*.pth)가 존재해야 함
*
* <p>왜 이렇게? - 어떤 학습은 epoch마다 pth를 남기고 - 어떤 학습은 best만 남기기도 해서 "pthCount >= total_epoch"만 쓰면 정상 종료를
* 실패로 오판할 수 있음.
*/
private OutputResult probeOutputs(ModelTrainJobDto job) {
try {
Path outDir = resolveOutputDir(job);
if (outDir == null || !Files.isDirectory(outDir)) {
return new OutputResult(false, "output-dir-missing");
}
Integer totalEpoch = extractTotalEpoch(job).orElse(null);
if (totalEpoch == null || totalEpoch <= 0) {
return new OutputResult(false, "total-epoch-missing");
}
Path valCsv = outDir.resolve("val.csv");
if (!Files.exists(valCsv)) {
return new OutputResult(false, "val.csv-missing");
}
long lines = countNonHeaderLines(valCsv);
// “같아야 완료” 정책
if (lines == totalEpoch) {
return new OutputResult(true, "ok");
}
return new OutputResult(
false, "val.csv-lines-mismatch lines=" + lines + " expected=" + totalEpoch);
} catch (Exception e) {
log.error("[RECOVERY] probeOutputs error. jobId={}", job.getId(), e);
return new OutputResult(false, "probe-error");
}
}
/**
* responseDir 아래에서 job 산출물 디렉토리를 찾습니다.
*
* <p>가장 중요한 커스터마이징 포인트: - 실제 운영 환경에서 산출물이 어떤 경로 규칙으로 저장되는지에 따라 여기만 수정하면 됩니다.
*
* <p>현재 기본 탐색 순서: 1) {responseDir}/{jobId} 2) {responseDir}/{modelId} 3)
* {responseDir}/{containerName} 4) 마지막 fallback: responseDir 자체
*
* <p>추천: - 여러분 규칙이 "{responseDir}/{modelId}/{jobId}" 같은 형태라면 base.resolve(modelId).resolve(jobId)
* 형태를 1순위로 두세요.
*/
private Path resolveOutputDir(ModelTrainJobDto job) {
ModelTrainMngDto.Basic model = modelTrainMngCoreService.findModelById(job.getModelId());
Path base = Paths.get(responseDir, model.getUuid().toString(), "metrics");
return Files.isDirectory(base) ? base : null;
}
/**
* paramsJson에서 total_epoch 값을 추출합니다.
*
* <p>키 후보: - "total_epoch" (snake_case) - "totalEpoch" (camelCase)
*
* <p>예: paramsJson = {"jobType":"TRAIN","total_epoch":50,...}
*/
private Optional<Integer> extractTotalEpoch(ModelTrainJobDto job) {
Map<String, Object> params = job.getParamsJson();
if (params == null) return Optional.empty();
Object v = params.get("total_epoch");
if (v == null) v = params.get("totalEpoch");
if (v == null) return Optional.empty();
try {
return Optional.of(Integer.parseInt(String.valueOf(v)));
} catch (Exception ignore) {
return Optional.empty();
}
}
/**
* CSV 파일에서 "헤더(첫 줄)"를 제외한 라인 수를 계산합니다.
*
* <p>가정: - val.csv 첫 줄은 헤더 - 이후 라인들이 epoch별 기록(또는 유사한 누적 기록)
*
* <p>주의: - 파일 인코딩은 UTF-8로 가정 - 빈 줄은 제외
*/
private long countNonHeaderLines(Path csv) throws IOException {
try (Stream<String> lines = Files.lines(csv, StandardCharsets.UTF_8)) {
return lines.skip(1).filter(s -> s != null && !s.isBlank()).count();
}
}
/**
* 디렉토리에서 glob 패턴에 맞는 파일 수를 셉니다.
*
* <p>예: - "*.pth" - "best*.pth"
*/
private long countFilesByGlob(Path dir, String glob) throws IOException {
try (DirectoryStream<Path> ds = Files.newDirectoryStream(dir, glob)) {
long cnt = 0;
for (Path p : ds) {
if (Files.isRegularFile(p)) cnt++;
}
return cnt;
}
}
/** 디렉토리에서 glob 패턴에 맞는 파일이 "하나라도" 존재하는지 체크합니다. */
private boolean existsByGlob(Path dir, String glob) throws IOException {
try (DirectoryStream<Path> ds = Files.newDirectoryStream(dir, glob)) {
return ds.iterator().hasNext();
}
}
// ============================================================================================
// probeOutputs() 결과 객체
// ============================================================================================
/**
* 컨테이너가 없을 때(responseDir 기반) 완료 여부 판정 결과.
*
* <p>completed: - true : 산출물이 완료로 보임(성공 처리 가능) - false : 산출물이 부족/불명확(실패 또는 유예 판단)
*
* <p>reason: - 실패/미완료 사유(로그/DB 메시지로 남기기 용도)
*/
private static final class OutputResult {
private final boolean completed;
private final String reason;
private OutputResult(boolean completed, String reason) {
this.completed = completed;
this.reason = reason;
}
boolean completed() {
return completed;
}
String reason() {
return reason;
}
} }
} }

View File

@@ -21,68 +21,186 @@ public class TmpDatasetService {
private String trainBaseDir; private String trainBaseDir;
/** /**
* 다른 데이터셋 파일과 이름이 겹치면 그 파일은 skip함 * train, val, test 폴더별로 link
* *
* @param uid * @param uid 임시폴더 uuid
* @param type * @param type train, val, test
* @param links * @param links tif pull path
* @return * @return
* @throws IOException * @throws IOException
*/ */
public String buildTmpDatasetSymlink(String uid, String type, List<ModelTrainLinkDto> links) public void buildTmpDatasetHardlink(String uid, String type, List<ModelTrainLinkDto> links)
throws IOException { throws IOException {
if (uid == null || uid.isBlank()) throw new IOException("uid is empty"); if (links == null || links.isEmpty()) {
if (type == null || type.isBlank()) throw new IOException("type is empty"); throw new IOException("links is empty");
if (links == null || links.isEmpty()) throw new IOException("links is empty"); }
log.info("========== buildTmpDatasetHardlink MERGE START =========="); Path tmp = Path.of(trainBaseDir, "tmp", uid);
log.info("uid={}, type={}, links.size={}", uid, type, links.size());
long hardlinksMade = 0;
for (ModelTrainLinkDto dto : links) {
if (type == null) {
log.warn("SKIP - trainType null: {}", dto);
continue;
}
// type별 디렉토리 생성
Files.createDirectories(tmp.resolve(type).resolve("input1"));
Files.createDirectories(tmp.resolve(type).resolve("input2"));
Files.createDirectories(tmp.resolve(type).resolve("label"));
Files.createDirectories(tmp.resolve(type).resolve("label-json"));
// comparePath → input1
hardlinksMade += link(tmp, type, "input1", dto.getComparePath());
// targetPath → input2
hardlinksMade += link(tmp, type, "input2", dto.getTargetPath());
// labelPath → label
hardlinksMade += link(tmp, type, "label", dto.getLabelPath());
// geoJsonPath -> label-json
hardlinksMade += link(tmp, type, "label-json", dto.getGeoJsonPath());
}
if (hardlinksMade == 0) {
throw new IOException("No hardlinks created.");
}
log.info("tmp dataset created: {}, hardlinksMade={}", tmp, hardlinksMade);
}
private long link(Path tmp, String type, String part, String fullPath) throws IOException {
if (fullPath == null || fullPath.isBlank()) return 0;
Path src = Path.of(fullPath);
if (!Files.isRegularFile(src)) {
log.warn("SKIP (not file): {}", src);
return 0;
}
String fileName = src.getFileName().toString();
Path dst = tmp.resolve(type).resolve(part).resolve(fileName);
// 충돌 시 덮어쓰기
if (Files.exists(dst)) {
Files.delete(dst);
}
Files.createLink(dst, src);
return 1;
}
private String safe(String s) {
return (s == null || s.isBlank()) ? null : s.trim();
}
/**
* request 전체 폴더 link
*
* @param uid
* @param datasetUids
* @return
* @throws IOException
*/
public String buildTmpDatasetSymlink(String uid, List<String> datasetUids) throws IOException {
log.info("========== buildTmpDatasetHardlink START ==========");
log.info("uid={}", uid);
log.info("datasetUids={}", datasetUids);
log.info("requestDir(raw)={}", requestDir);
Path BASE = toPath(requestDir); Path BASE = toPath(requestDir);
Path tmp = Path.of(trainBaseDir, "tmp", uid); Path tmp = Path.of(trainBaseDir, "tmp", uid);
long hardlinksMade = 0; log.info("BASE={}", BASE);
long skippedCollision = 0; log.info("BASE exists? {}", Files.isDirectory(BASE));
long noDir = 0; log.info("tmp={}", tmp);
// tmp/<type>/<part> 준비 long noDir = 0, scannedDirs = 0, regularFiles = 0, hardlinksMade = 0;
// tmp 디렉토리 준비
for (String type : List.of("train", "val", "test")) {
for (String part : List.of("input1", "input2", "label", "label-json")) { for (String part : List.of("input1", "input2", "label", "label-json")) {
Files.createDirectories(tmp.resolve(type).resolve(part)); Path dir = tmp.resolve(type).resolve(part);
Files.createDirectories(dir);
log.info("createDirectories: {}", dir);
}
} }
for (ModelTrainLinkDto dto : links) { // 하드링크는 "같은 파일시스템"에서만 가능하므로 BASE/tmp가 같은 FS인지 미리 확인(권장)
String datasetUid = safe(dto.getDatasetUid()); try {
if (datasetUid == null) { var baseStore = Files.getFileStore(BASE);
log.warn("SKIP dto (datasetUid null): {}", dto); var tmpStore = Files.getFileStore(tmp.getParent()); // BASE/tmp
continue; if (!baseStore.name().equals(tmpStore.name()) || !baseStore.type().equals(tmpStore.type())) {
throw new IOException(
"Hardlink requires same filesystem. baseStore="
+ baseStore.name()
+ "("
+ baseStore.type()
+ "), tmpStore="
+ tmpStore.name()
+ "("
+ tmpStore.type()
+ ")");
}
} catch (Exception e) {
// FileStore 비교가 환경마다 애매할 수 있어서, 여기서는 경고만 주고 실제 createLink에서 최종 판단하게 둘 수도 있음.
log.warn("FileStore check skipped/failed (will rely on createLink): {}", e.toString());
} }
Path srcRoot = BASE.resolve(datasetUid); for (String id : datasetUids) {
Path srcRoot = BASE.resolve(id);
log.info("---- dataset id={} srcRoot={} exists? {}", id, srcRoot, Files.isDirectory(srcRoot));
for (String type : List.of("train", "val", "test")) {
for (String part : List.of("input1", "input2", "label", "label-json")) { for (String part : List.of("input1", "input2", "label", "label-json")) {
Path srcDir = srcRoot.resolve(type).resolve(part); Path srcDir = srcRoot.resolve(type).resolve(part);
if (!Files.isDirectory(srcDir)) { if (!Files.isDirectory(srcDir)) {
log.warn("SKIP (not directory): {}", srcDir);
noDir++; noDir++;
continue; continue;
} }
// ✅ 하위폴더까지 전부 scannedDirs++;
try (var walk = Files.walk(srcDir)) { log.info("SCAN dir={}", srcDir);
for (Path f : walk.filter(Files::isRegularFile).toList()) {
String fileName = f.getFileName().toString(); try (DirectoryStream<Path> stream = Files.newDirectoryStream(srcDir)) {
Path dst = tmp.resolve(type).resolve(part).resolve(fileName); for (Path f : stream) {
if (!Files.isRegularFile(f)) {
// ✅ 이름 유지 + 충돌은 skip log.debug("skip non-regular file: {}", f);
if (Files.exists(dst)) {
skippedCollision++;
continue; continue;
} }
regularFiles++;
String dstName = id + "__" + f.getFileName();
Path dst = tmp.resolve(type).resolve(part).resolve(dstName);
// dst가 남아있으면 삭제(심볼릭링크든 파일이든)
if (Files.exists(dst) || Files.isSymbolicLink(dst)) {
Files.delete(dst);
log.debug("deleted existing: {}", dst);
}
try {
// 하드링크 생성 (dst가 새 파일로 생기지만 inode는 f와 동일)
Files.createLink(dst, f); Files.createLink(dst, f);
hardlinksMade++; hardlinksMade++;
log.debug("created hardlink: {} => {}", dst, f);
} catch (IOException e) {
// 여기서 바로 실패시키면 “tmp는 만들었는데 내용은 0개” 같은 상태를 방지할 수 있음
log.error("FAILED create hardlink: {} => {}", dst, f, e);
throw e;
}
}
} }
} }
} }
@@ -90,35 +208,29 @@ public class TmpDatasetService {
if (hardlinksMade == 0) { if (hardlinksMade == 0) {
throw new IOException( throw new IOException(
"No hardlinks created. noDir=" + noDir + ", skippedCollision=" + skippedCollision); "No hardlinks created. regularFiles="
+ regularFiles
+ ", scannedDirs="
+ scannedDirs
+ ", noDir="
+ noDir);
} }
log.info("tmp dataset created: {}", tmp);
log.info( log.info(
"tmp dataset merged: {} (type={}), hardlinksMade={}, skippedCollision={}, noDir={}", "summary: scannedDirs={}, noDir={}, regularFiles={}, hardlinksMade={}",
tmp, scannedDirs,
type, noDir,
hardlinksMade, regularFiles,
skippedCollision, hardlinksMade);
noDir);
return uid; return uid;
} }
private static Path toPath(String p) { private static Path toPath(String p) {
if (p == null || p.isBlank()) { if (p.startsWith("~/")) {
throw new IllegalArgumentException("path is null or blank"); return Paths.get(System.getProperty("user.home")).resolve(p.substring(2)).normalize();
} }
String trimmed = p.trim(); return Paths.get(p).toAbsolutePath().normalize();
if (trimmed.startsWith("~/")) {
return Paths.get(System.getProperty("user.home"))
.resolve(trimmed.substring(2))
.toAbsolutePath()
.normalize();
}
return Paths.get(trimmed).toAbsolutePath().normalize();
}
private static String safe(String s) {
return (s == null || s.isBlank()) ? null : s.trim();
} }
} }

View File

@@ -266,6 +266,8 @@ public class TrainJobService {
List<String> uids = modelTrainMngCoreService.findDatasetUid(datasetIds); List<String> uids = modelTrainMngCoreService.findDatasetUid(datasetIds);
try { try {
// 데이터셋 심볼링크 생성
// String pathUid = tmpDatasetService.buildTmpDatasetSymlink(raw, uids);
// train path // train path
List<ModelTrainLinkDto> trainList = modelTrainMngCoreService.findDatasetTrainPath(modelId); List<ModelTrainLinkDto> trainList = modelTrainMngCoreService.findDatasetTrainPath(modelId);
// validation path // validation path
@@ -274,11 +276,11 @@ public class TrainJobService {
List<ModelTrainLinkDto> testList = modelTrainMngCoreService.findDatasetTestPath(modelId); List<ModelTrainLinkDto> testList = modelTrainMngCoreService.findDatasetTestPath(modelId);
// train 데이터셋 심볼링크 생성 // train 데이터셋 심볼링크 생성
tmpDatasetService.buildTmpDatasetSymlink(raw, "train", trainList); tmpDatasetService.buildTmpDatasetHardlink(raw, "train", trainList);
// val 데이터셋 심볼링크 생성 // val 데이터셋 심볼링크 생성
tmpDatasetService.buildTmpDatasetSymlink(raw, "val", valList); tmpDatasetService.buildTmpDatasetHardlink(raw, "val", valList);
// test 데이터셋 심볼링크 생성 // test 데이터셋 심볼링크 생성
tmpDatasetService.buildTmpDatasetSymlink(raw, "test", testList); tmpDatasetService.buildTmpDatasetHardlink(raw, "test", testList);
ModelTrainMngDto.UpdateReq updateReq = new ModelTrainMngDto.UpdateReq(); ModelTrainMngDto.UpdateReq updateReq = new ModelTrainMngDto.UpdateReq();
updateReq.setRequestPath(raw); updateReq.setRequestPath(raw);