|
|
|
|
@@ -1,74 +1,451 @@
|
|
|
|
|
package com.kamco.cd.training.train.service;
|
|
|
|
|
|
|
|
|
|
import com.kamco.cd.training.model.dto.ModelTrainMngDto;
|
|
|
|
|
import com.kamco.cd.training.postgres.core.ModelTrainJobCoreService;
|
|
|
|
|
import com.kamco.cd.training.postgres.core.ModelTrainMngCoreService;
|
|
|
|
|
import com.kamco.cd.training.train.dto.ModelTrainJobDto;
|
|
|
|
|
import java.io.BufferedReader;
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
import java.io.InputStreamReader;
|
|
|
|
|
import java.nio.charset.StandardCharsets;
|
|
|
|
|
import java.nio.file.DirectoryStream;
|
|
|
|
|
import java.nio.file.Files;
|
|
|
|
|
import java.nio.file.Path;
|
|
|
|
|
import java.nio.file.Paths;
|
|
|
|
|
import java.util.List;
|
|
|
|
|
import java.util.Map;
|
|
|
|
|
import java.util.Optional;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
import java.util.stream.Stream;
|
|
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
|
|
import lombok.extern.log4j.Log4j2;
|
|
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
|
|
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
|
|
|
|
import org.springframework.context.annotation.Profile;
|
|
|
|
|
import org.springframework.context.event.EventListener;
|
|
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
import org.springframework.transaction.annotation.Transactional;
|
|
|
|
|
|
|
|
|
|
/** 실행중 학습이 있을때 처리 */
|
|
|
|
|
/**
|
|
|
|
|
* 서버 재기동 시 "RUNNING 상태로 남아있는 학습 Job"을 복구(정리)하기 위한 서비스.
|
|
|
|
|
*
|
|
|
|
|
* <p>상황 예시: - 서버가 강제 재기동/장애로 내려감 - DB 상에서는 job_state가 RUNNING(진행중)으로 남아있음 - 실제 docker 컨테이너는: 1) 아직
|
|
|
|
|
* 살아있거나(running=true) 2) 종료되었거나(exited) 3) --rm 옵션으로 인해 컨테이너가 이미 삭제되어 존재하지 않을 수 있음
|
|
|
|
|
*
|
|
|
|
|
* <p>이 클래스는 ApplicationReadyEvent(스프링 부팅 완료) 시점에 실행되어, DB의 RUNNING 잡들을 조회한 뒤 컨테이너 상태를 점검하고,
|
|
|
|
|
* SUCCESS/FAILED 처리를 수행합니다.
|
|
|
|
|
*/
|
|
|
|
|
@Profile("!local")
|
|
|
|
|
@Component
|
|
|
|
|
@RequiredArgsConstructor
|
|
|
|
|
@Log4j2
|
|
|
|
|
@Transactional(readOnly = true)
|
|
|
|
|
public class JobRecoveryOnStartupService {
|
|
|
|
|
|
|
|
|
|
private final ModelTrainJobCoreService modelTrainJobCoreService;
|
|
|
|
|
private final ModelTrainMngCoreService modelTrainMngCoreService;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Docker 컨테이너가 쓰는 response(산출물) 디렉토리의 "호스트 측" 베이스 경로. 예) /data/train/response
|
|
|
|
|
*
|
|
|
|
|
* <p>컨테이너가 --rm 으로 삭제된 경우에도 이 경로에 val.csv / *.pth 등이 남아있으면 정상 종료 여부를 "파일 기반"으로 판정합니다.
|
|
|
|
|
*/
|
|
|
|
|
@Value("${train.docker.responseDir}")
|
|
|
|
|
private String responseDir;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 스프링 부팅 완료 시점(빈 생성/초기화 모두 끝난 뒤)에 복구 로직 실행.
|
|
|
|
|
*
|
|
|
|
|
* <p>@Transactional: - recover() 메서드 전체가 하나의 트랜잭션으로 감싸집니다. - Job 하나씩 처리하다가 예외가 발생하면 전체 롤백이 될 수
|
|
|
|
|
* 있으므로 "잡 단위로 확실히 커밋"이 필요하면 (권장) 잡 단위로 분리 트랜잭션(REQUIRES_NEW) 고려하세요.
|
|
|
|
|
*/
|
|
|
|
|
@EventListener(ApplicationReadyEvent.class)
|
|
|
|
|
@Transactional
|
|
|
|
|
public void recover() {
|
|
|
|
|
// RUNNING 중인 학습이 있는지 조회
|
|
|
|
|
ModelTrainJobDto runningJobs = modelTrainJobCoreService.findRunningJobs();
|
|
|
|
|
|
|
|
|
|
if (runningJobs == null) {
|
|
|
|
|
// 1) DB에서 "RUNNING(진행중) 상태"로 남아있는 job 목록을 조회
|
|
|
|
|
List<ModelTrainJobDto> runningJobs = modelTrainJobCoreService.findRunningJobs();
|
|
|
|
|
|
|
|
|
|
// 실행중 job이 없으면 할 일 없음
|
|
|
|
|
if (runningJobs == null || runningJobs.isEmpty()) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
String containerName = runningJobs.getContainerName();
|
|
|
|
|
// 2) 각 job에 대해 docker 컨테이너 상태를 확인하고, 상태에 따라 조치
|
|
|
|
|
for (ModelTrainJobDto job : runningJobs) {
|
|
|
|
|
String containerName = job.getContainerName();
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
boolean containerAlive = isContainerRunning(containerName);
|
|
|
|
|
try {
|
|
|
|
|
// 2-1) docker inspect로 컨테이너 상태 조회
|
|
|
|
|
DockerInspectState state = inspectContainer(containerName);
|
|
|
|
|
|
|
|
|
|
if (containerAlive) {
|
|
|
|
|
// 컨테이너 살아있으면 → RUNNING 유지
|
|
|
|
|
log.info("[RECOVERY] container still running: {}", containerName);
|
|
|
|
|
// 3) 컨테이너가 "없음"
|
|
|
|
|
// - docker run --rm 로 실행한 컨테이너는 정상 종료 시 바로 삭제될 수 있음
|
|
|
|
|
// - 즉 "컨테이너 없음"이 무조건 실패는 아님
|
|
|
|
|
if (!state.exists()) {
|
|
|
|
|
log.warn(
|
|
|
|
|
"[RECOVERY] container missing. try file-based reconcile. container={}",
|
|
|
|
|
containerName);
|
|
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
// 컨테이너 죽었으면 → FAILED 처리
|
|
|
|
|
log.info("[RECOVERY] container not found. mark FAILED: {}", containerName);
|
|
|
|
|
// 3-1) 컨테이너가 없을 때는 산출물(responseDir)을 보고 완료 여부를 "추정"
|
|
|
|
|
OutputResult out = probeOutputs(job);
|
|
|
|
|
|
|
|
|
|
// 3-2) 산출물이 충분하면 성공 처리
|
|
|
|
|
if (out.completed()) {
|
|
|
|
|
log.info("[RECOVERY] outputs look completed. mark SUCCESS. jobId={}", job.getId());
|
|
|
|
|
modelTrainJobCoreService.markSuccess(job.getId(), 0);
|
|
|
|
|
markStepSuccessByJobType(job);
|
|
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
// 3-3) 산출물이 부족하면 실패 처리(운영 정책에 따라 "유예"도 가능)
|
|
|
|
|
log.warn(
|
|
|
|
|
"[RECOVERY] outputs incomplete. mark FAILED. jobId={} reason={}",
|
|
|
|
|
job.getId(),
|
|
|
|
|
out.reason());
|
|
|
|
|
|
|
|
|
|
modelTrainJobCoreService.markFailed(
|
|
|
|
|
job.getId(), -1, "SERVER_RESTART_CONTAINER_MISSING_OUTPUT_INCOMPLETE");
|
|
|
|
|
|
|
|
|
|
markStepErrorByJobType(job, out.reason());
|
|
|
|
|
}
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 4) 컨테이너는 존재하고, 아직 running=true
|
|
|
|
|
// - 서버만 재기동됐고 컨테이너는 그대로 살아있는 케이스
|
|
|
|
|
// - 이 경우 DB를 건드리면 오히려 꼬일 수 있으니 RUNNING 유지
|
|
|
|
|
if (state.running()) {
|
|
|
|
|
log.info("[RECOVERY] container still running. container={}", containerName);
|
|
|
|
|
try {
|
|
|
|
|
ProcessBuilder pb = new ProcessBuilder("docker", "stop", "-t", "20", containerName);
|
|
|
|
|
pb.redirectErrorStream(true);
|
|
|
|
|
|
|
|
|
|
Process p = pb.start();
|
|
|
|
|
|
|
|
|
|
boolean finished = p.waitFor(30, TimeUnit.SECONDS);
|
|
|
|
|
if (!finished) {
|
|
|
|
|
p.destroyForcibly();
|
|
|
|
|
throw new IOException("docker stop timeout");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int code = p.exitValue();
|
|
|
|
|
if (code != 0) {
|
|
|
|
|
throw new IOException("docker stop failed. exit=" + code);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.info(
|
|
|
|
|
"[RECOVERY] container stopped (will be auto removed by --rm). container={}",
|
|
|
|
|
containerName);
|
|
|
|
|
|
|
|
|
|
// 여기서 상태를 PAUSED로 바꿔도 되고
|
|
|
|
|
modelTrainJobCoreService.markPaused(job.getId(), -1, "AUTO_STOP_FAILED_ON_RESTART");
|
|
|
|
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
log.error("[RECOVERY] docker stop failed. container={}", containerName, e);
|
|
|
|
|
|
|
|
|
|
modelTrainJobCoreService.markFailed(job.getId(), -1, "AUTO_STOP_FAILED_ON_RESTART");
|
|
|
|
|
}
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 5) 컨테이너는 존재하지만 running=false
|
|
|
|
|
// - exited / dead 등의 상태
|
|
|
|
|
Integer exitCode = state.exitCode();
|
|
|
|
|
String status = state.status();
|
|
|
|
|
|
|
|
|
|
// 5-1) exitCode=0이면 정상 종료로 간주 → SUCCESS 처리
|
|
|
|
|
if (exitCode != null && exitCode == 0) {
|
|
|
|
|
log.info("[RECOVERY] container exited(0). mark SUCCESS. container={}", containerName);
|
|
|
|
|
modelTrainJobCoreService.markSuccess(job.getId(), 0);
|
|
|
|
|
markStepSuccessByJobType(job);
|
|
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
// 5-2) exitCode != 0 이거나 null이면 실패로 간주 → FAILED 처리
|
|
|
|
|
log.warn(
|
|
|
|
|
"[RECOVERY] container exited non-zero. mark FAILED. container={} status={} exitCode={}",
|
|
|
|
|
containerName,
|
|
|
|
|
status,
|
|
|
|
|
exitCode);
|
|
|
|
|
|
|
|
|
|
modelTrainJobCoreService.markFailed(
|
|
|
|
|
job.getId(), exitCode, "SERVER_RESTART_CONTAINER_EXIT_NONZERO");
|
|
|
|
|
|
|
|
|
|
markStepErrorByJobType(job, "exit=" + exitCode + " status=" + status);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
// 6) docker inspect 자체가 실패한 경우
|
|
|
|
|
// - docker 데몬 문제/권한 문제/일시적 오류 가능
|
|
|
|
|
// - 운영 정책에 따라 "바로 실패" 대신 "유예" 처리도 고려 가능
|
|
|
|
|
log.error("[RECOVERY] container inspect failed. container={}", containerName, e);
|
|
|
|
|
|
|
|
|
|
modelTrainJobCoreService.markFailed(
|
|
|
|
|
runningJobs.getId(), null, "SERVER_RESTART_CONTAINER_NOT_FOUND");
|
|
|
|
|
}
|
|
|
|
|
} catch (IOException e) {
|
|
|
|
|
log.error("[RECOVERY] container check failed. mark FAILED: {}", containerName, e);
|
|
|
|
|
job.getId(), -1, "SERVER_RESTART_CONTAINER_INSPECT_ERROR");
|
|
|
|
|
|
|
|
|
|
modelTrainJobCoreService.markFailed(
|
|
|
|
|
runningJobs.getId(), null, "SERVER_RESTART_CONTAINER_CHECK_ERROR");
|
|
|
|
|
markStepErrorByJobType(job, "inspect-error");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* docker 실행중인지 확인하기
|
|
|
|
|
* jobType에 따라 학습 관리 테이블의 "성공 단계"를 업데이트.
|
|
|
|
|
*
|
|
|
|
|
* @param containerName container name
|
|
|
|
|
* @return true, false
|
|
|
|
|
* @throws IOException
|
|
|
|
|
* <p>예: - jobType == "EVAL" → step2(평가 단계) 성공 - 그 외 → step1(학습 단계) 성공
|
|
|
|
|
*/
|
|
|
|
|
private boolean isContainerRunning(String containerName) throws IOException {
|
|
|
|
|
private void markStepSuccessByJobType(ModelTrainJobDto job) {
|
|
|
|
|
Map<String, Object> params = job.getParamsJson();
|
|
|
|
|
boolean isEval = params != null && "EVAL".equals(String.valueOf(params.get("jobType")));
|
|
|
|
|
if (isEval) {
|
|
|
|
|
modelTrainMngCoreService.markStep2Success(job.getModelId());
|
|
|
|
|
} else {
|
|
|
|
|
modelTrainMngCoreService.markStep1Success(job.getModelId());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* jobType에 따라 학습 관리 테이블의 "에러 단계"를 업데이트.
|
|
|
|
|
*
|
|
|
|
|
* <p>예: - jobType == "EVAL" → step2(평가 단계) 에러 - 그 외 → step1 혹은 전체 에러
|
|
|
|
|
*/
|
|
|
|
|
private void markStepErrorByJobType(ModelTrainJobDto job, String msg) {
|
|
|
|
|
Map<String, Object> params = job.getParamsJson();
|
|
|
|
|
boolean isEval = params != null && "EVAL".equals(String.valueOf(params.get("jobType")));
|
|
|
|
|
if (isEval) {
|
|
|
|
|
modelTrainMngCoreService.markStep2Error(job.getModelId(), msg);
|
|
|
|
|
} else {
|
|
|
|
|
modelTrainMngCoreService.markError(job.getModelId(), msg);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* docker inspect를 사용해서 컨테이너 상태를 조회합니다.
|
|
|
|
|
*
|
|
|
|
|
* <p>사용하는 템플릿: {{.State.Status}} {{.State.Running}} {{.State.ExitCode}}
|
|
|
|
|
*
|
|
|
|
|
* <p>예상 출력 예: - "running true 0" - "exited false 0" - "exited false 137"
|
|
|
|
|
*
|
|
|
|
|
* <p>주의: - 컨테이너가 없거나 inspect 실패 시 exitCode != 0 또는 output이 비어서 missing() 반환 - 무한 대기 방지를 위해 5초
|
|
|
|
|
* 타임아웃을 둠
|
|
|
|
|
*/
|
|
|
|
|
private DockerInspectState inspectContainer(String containerName)
|
|
|
|
|
throws IOException, InterruptedException {
|
|
|
|
|
|
|
|
|
|
ProcessBuilder pb =
|
|
|
|
|
new ProcessBuilder("docker", "inspect", "-f", "{{.State.Running}}", containerName);
|
|
|
|
|
new ProcessBuilder(
|
|
|
|
|
"docker",
|
|
|
|
|
"inspect",
|
|
|
|
|
"-f",
|
|
|
|
|
"{{.State.Status}} {{.State.Running}} {{.State.ExitCode}}",
|
|
|
|
|
containerName);
|
|
|
|
|
|
|
|
|
|
// stderr를 stdout으로 합쳐서 한 스트림으로 읽기(에러 메시지도 함께 받음)
|
|
|
|
|
pb.redirectErrorStream(true);
|
|
|
|
|
|
|
|
|
|
Process p = pb.start();
|
|
|
|
|
BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()));
|
|
|
|
|
|
|
|
|
|
String line = br.readLine();
|
|
|
|
|
return "true".equals(line);
|
|
|
|
|
// inspect 출력은 1줄이면 충분하므로 readLine()만 수행
|
|
|
|
|
String output;
|
|
|
|
|
try (BufferedReader br = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
|
|
|
|
|
output = br.readLine();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 무한대기 방지: 5초 내에 종료되지 않으면 강제 종료
|
|
|
|
|
boolean finished = p.waitFor(5, TimeUnit.SECONDS);
|
|
|
|
|
if (!finished) {
|
|
|
|
|
p.destroyForcibly();
|
|
|
|
|
throw new IOException("docker inspect timeout");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// docker inspect 자체의 프로세스 exit code
|
|
|
|
|
int code = p.exitValue();
|
|
|
|
|
|
|
|
|
|
// 실패(코드 !=0) 또는 출력이 없으면 "컨테이너 없음"으로 간주
|
|
|
|
|
if (code != 0 || output == null || output.isBlank()) {
|
|
|
|
|
return DockerInspectState.missing();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// "status running exitCode" 형태로 split
|
|
|
|
|
String[] parts = output.trim().split("\\s+");
|
|
|
|
|
|
|
|
|
|
// status: running/exited/dead 등
|
|
|
|
|
String status = parts.length > 0 ? parts[0] : "unknown";
|
|
|
|
|
|
|
|
|
|
// running: true/false
|
|
|
|
|
boolean running = parts.length > 1 && Boolean.parseBoolean(parts[1]);
|
|
|
|
|
|
|
|
|
|
// exitCode: 정수 파싱(파싱 실패하면 null)
|
|
|
|
|
Integer exitCode = null;
|
|
|
|
|
if (parts.length > 2) {
|
|
|
|
|
try {
|
|
|
|
|
exitCode = Integer.parseInt(parts[2]);
|
|
|
|
|
} catch (Exception ignore) {
|
|
|
|
|
// ignore
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return new DockerInspectState(true, running, exitCode, status);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* docker inspect 결과를 담는 레코드.
|
|
|
|
|
*
|
|
|
|
|
* <p>exists: - true : docker inspect 성공 (컨테이너 존재) - false : 컨테이너 없음(또는 inspect 실패를 missing으로 간주)
|
|
|
|
|
*/
|
|
|
|
|
private record DockerInspectState(
|
|
|
|
|
boolean exists, boolean running, Integer exitCode, String status) {
|
|
|
|
|
static DockerInspectState missing() {
|
|
|
|
|
return new DockerInspectState(false, false, null, "missing");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ============================================================================================
|
|
|
|
|
// 컨테이너가 "없을 때" 파일 기반으로 완료/미완료를 판정하는 로직
|
|
|
|
|
// ============================================================================================
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 컨테이너가 없을 때(responseDir 산출물만 남아있는 상태) 완료 여부를 파일 기반으로 판정합니다.
|
|
|
|
|
*
|
|
|
|
|
* <p>판정 규칙(보수적으로 설계): 1) total_epoch가 paramsJson에 있어야 함 (없으면 완료 판단 불가) 2) val.csv 존재 + 헤더 제외 라인 수
|
|
|
|
|
* >= total_epoch 이어야 함 3) *.pth 파일이 total_epoch 이상 존재하거나, best*.pth(또는 *best*.pth)가 존재해야 함
|
|
|
|
|
*
|
|
|
|
|
* <p>왜 이렇게? - 어떤 학습은 epoch마다 pth를 남기고 - 어떤 학습은 best만 남기기도 해서 "pthCount >= total_epoch"만 쓰면 정상 종료를
|
|
|
|
|
* 실패로 오판할 수 있음.
|
|
|
|
|
*/
|
|
|
|
|
private OutputResult probeOutputs(ModelTrainJobDto job) {
|
|
|
|
|
try {
|
|
|
|
|
Path outDir = resolveOutputDir(job);
|
|
|
|
|
if (outDir == null || !Files.isDirectory(outDir)) {
|
|
|
|
|
return new OutputResult(false, "output-dir-missing");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Integer totalEpoch = extractTotalEpoch(job).orElse(null);
|
|
|
|
|
if (totalEpoch == null || totalEpoch <= 0) {
|
|
|
|
|
return new OutputResult(false, "total-epoch-missing");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Path valCsv = outDir.resolve("val.csv");
|
|
|
|
|
if (!Files.exists(valCsv)) {
|
|
|
|
|
return new OutputResult(false, "val.csv-missing");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
long lines = countNonHeaderLines(valCsv);
|
|
|
|
|
|
|
|
|
|
// “같아야 완료” 정책
|
|
|
|
|
if (lines == totalEpoch) {
|
|
|
|
|
return new OutputResult(true, "ok");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return new OutputResult(
|
|
|
|
|
false, "val.csv-lines-mismatch lines=" + lines + " expected=" + totalEpoch);
|
|
|
|
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
|
|
log.error("[RECOVERY] probeOutputs error. jobId={}", job.getId(), e);
|
|
|
|
|
return new OutputResult(false, "probe-error");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* responseDir 아래에서 job 산출물 디렉토리를 찾습니다.
|
|
|
|
|
*
|
|
|
|
|
* <p>가장 중요한 커스터마이징 포인트: - 실제 운영 환경에서 산출물이 어떤 경로 규칙으로 저장되는지에 따라 여기만 수정하면 됩니다.
|
|
|
|
|
*
|
|
|
|
|
* <p>현재 기본 탐색 순서: 1) {responseDir}/{jobId} 2) {responseDir}/{modelId} 3)
|
|
|
|
|
* {responseDir}/{containerName} 4) 마지막 fallback: responseDir 자체
|
|
|
|
|
*
|
|
|
|
|
* <p>추천: - 여러분 규칙이 "{responseDir}/{modelId}/{jobId}" 같은 형태라면 base.resolve(modelId).resolve(jobId)
|
|
|
|
|
* 형태를 1순위로 두세요.
|
|
|
|
|
*/
|
|
|
|
|
private Path resolveOutputDir(ModelTrainJobDto job) {
|
|
|
|
|
ModelTrainMngDto.Basic model = modelTrainMngCoreService.findModelById(job.getModelId());
|
|
|
|
|
|
|
|
|
|
Path base = Paths.get(responseDir, model.getUuid().toString(), "metrics");
|
|
|
|
|
|
|
|
|
|
return Files.isDirectory(base) ? base : null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* paramsJson에서 total_epoch 값을 추출합니다.
|
|
|
|
|
*
|
|
|
|
|
* <p>키 후보: - "total_epoch" (snake_case) - "totalEpoch" (camelCase)
|
|
|
|
|
*
|
|
|
|
|
* <p>예: paramsJson = {"jobType":"TRAIN","total_epoch":50,...}
|
|
|
|
|
*/
|
|
|
|
|
private Optional<Integer> extractTotalEpoch(ModelTrainJobDto job) {
|
|
|
|
|
Map<String, Object> params = job.getParamsJson();
|
|
|
|
|
if (params == null) return Optional.empty();
|
|
|
|
|
|
|
|
|
|
Object v = params.get("total_epoch");
|
|
|
|
|
if (v == null) v = params.get("totalEpoch");
|
|
|
|
|
if (v == null) return Optional.empty();
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
return Optional.of(Integer.parseInt(String.valueOf(v)));
|
|
|
|
|
} catch (Exception ignore) {
|
|
|
|
|
return Optional.empty();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* CSV 파일에서 "헤더(첫 줄)"를 제외한 라인 수를 계산합니다.
|
|
|
|
|
*
|
|
|
|
|
* <p>가정: - val.csv 첫 줄은 헤더 - 이후 라인들이 epoch별 기록(또는 유사한 누적 기록)
|
|
|
|
|
*
|
|
|
|
|
* <p>주의: - 파일 인코딩은 UTF-8로 가정 - 빈 줄은 제외
|
|
|
|
|
*/
|
|
|
|
|
private long countNonHeaderLines(Path csv) throws IOException {
|
|
|
|
|
try (Stream<String> lines = Files.lines(csv, StandardCharsets.UTF_8)) {
|
|
|
|
|
return lines.skip(1).filter(s -> s != null && !s.isBlank()).count();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 디렉토리에서 glob 패턴에 맞는 파일 수를 셉니다.
|
|
|
|
|
*
|
|
|
|
|
* <p>예: - "*.pth" - "best*.pth"
|
|
|
|
|
*/
|
|
|
|
|
private long countFilesByGlob(Path dir, String glob) throws IOException {
|
|
|
|
|
try (DirectoryStream<Path> ds = Files.newDirectoryStream(dir, glob)) {
|
|
|
|
|
long cnt = 0;
|
|
|
|
|
for (Path p : ds) {
|
|
|
|
|
if (Files.isRegularFile(p)) cnt++;
|
|
|
|
|
}
|
|
|
|
|
return cnt;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/** 디렉토리에서 glob 패턴에 맞는 파일이 "하나라도" 존재하는지 체크합니다. */
|
|
|
|
|
private boolean existsByGlob(Path dir, String glob) throws IOException {
|
|
|
|
|
try (DirectoryStream<Path> ds = Files.newDirectoryStream(dir, glob)) {
|
|
|
|
|
return ds.iterator().hasNext();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ============================================================================================
|
|
|
|
|
// probeOutputs() 결과 객체
|
|
|
|
|
// ============================================================================================
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 컨테이너가 없을 때(responseDir 기반) 완료 여부 판정 결과.
|
|
|
|
|
*
|
|
|
|
|
* <p>completed: - true : 산출물이 완료로 보임(성공 처리 가능) - false : 산출물이 부족/불명확(실패 또는 유예 판단)
|
|
|
|
|
*
|
|
|
|
|
* <p>reason: - 실패/미완료 사유(로그/DB 메시지로 남기기 용도)
|
|
|
|
|
*/
|
|
|
|
|
private static final class OutputResult {
|
|
|
|
|
|
|
|
|
|
private final boolean completed;
|
|
|
|
|
private final String reason;
|
|
|
|
|
|
|
|
|
|
private OutputResult(boolean completed, String reason) {
|
|
|
|
|
this.completed = completed;
|
|
|
|
|
this.reason = reason;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
boolean completed() {
|
|
|
|
|
return completed;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
String reason() {
|
|
|
|
|
return reason;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|