Merge branch 'develop' of https://kamco.git.gs.dabeeo.com/MVPTeam/kamco-train-api into develop
This commit is contained in:
@@ -3,7 +3,6 @@ package com.kamco.cd.training.common.utils;
|
||||
import static java.lang.String.CASE_INSENSITIVE_ORDER;
|
||||
|
||||
import com.jcraft.jsch.ChannelExec;
|
||||
import com.jcraft.jsch.ChannelSftp;
|
||||
import com.jcraft.jsch.JSch;
|
||||
import com.jcraft.jsch.Session;
|
||||
import com.kamco.cd.training.common.exception.CustomApiException;
|
||||
|
||||
@@ -398,9 +398,8 @@ public class DatasetService {
|
||||
return datasetCoreService.getFilePathByUUIDPathType(uuid, pathType);
|
||||
}
|
||||
|
||||
|
||||
|
||||
private String escape(String path) {
|
||||
// 쉘 커맨드에서 안전하게 사용할 수 있도록 문자열을 작은따옴표로 감싸면서, 내부의 작은따옴표를 이스케이프 처리
|
||||
return "'" + path.replace("'", "'\"'\"'") + "'";
|
||||
}
|
||||
|
||||
|
||||
@@ -430,6 +430,44 @@ public class ModelTrainMngCoreService {
|
||||
master.setUpdatedDttm(ZonedDateTime.now());
|
||||
}
|
||||
|
||||
/**
|
||||
* step1 정지 처리
|
||||
*
|
||||
* @param modelId
|
||||
* @param errorMessage
|
||||
*/
|
||||
public void markStep1Stop(Long modelId, String errorMessage) {
|
||||
ModelMasterEntity master =
|
||||
modelMngRepository
|
||||
.findById(modelId)
|
||||
.orElseThrow(() -> new IllegalArgumentException("Model not found: " + modelId));
|
||||
|
||||
master.setStatusCd(TrainStatusType.STOPPED.getId());
|
||||
master.setStep1State(TrainStatusType.STOPPED.getId());
|
||||
master.setLastError(errorMessage);
|
||||
master.setUpdatedUid(userUtil.getId());
|
||||
master.setUpdatedDttm(ZonedDateTime.now());
|
||||
}
|
||||
|
||||
/**
|
||||
* step2 정지 처리
|
||||
*
|
||||
* @param modelId
|
||||
* @param errorMessage
|
||||
*/
|
||||
public void markStep2Stop(Long modelId, String errorMessage) {
|
||||
ModelMasterEntity master =
|
||||
modelMngRepository
|
||||
.findById(modelId)
|
||||
.orElseThrow(() -> new IllegalArgumentException("Model not found: " + modelId));
|
||||
|
||||
master.setStatusCd(TrainStatusType.STOPPED.getId());
|
||||
master.setStep2State(TrainStatusType.STOPPED.getId());
|
||||
master.setLastError(errorMessage);
|
||||
master.setUpdatedUid(userUtil.getId());
|
||||
master.setUpdatedDttm(ZonedDateTime.now());
|
||||
}
|
||||
|
||||
@Transactional
|
||||
public void markSuccess(Long modelId) {
|
||||
ModelMasterEntity master =
|
||||
|
||||
@@ -222,7 +222,9 @@ public class DataSetCountersService {
|
||||
log.info("missing = {}", missing.size());
|
||||
log.info("extra = {}", extra.size());
|
||||
|
||||
log.info("[MISSING] total = {}", missing.size());
|
||||
missing.stream().sorted().limit(50).forEach(f -> log.warn("[MISSING] {}", f));
|
||||
log.info("[EXTRA] total = {}", extra.size());
|
||||
extra.stream().sorted().limit(50).forEach(f -> log.warn("[EXTRA] {}", f));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,7 +20,9 @@ import java.util.stream.Stream;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.context.event.ApplicationReadyEvent;
|
||||
import org.springframework.context.annotation.Profile;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
@@ -41,6 +43,7 @@ public class JobRecoveryOnStartupService {
|
||||
|
||||
private final ModelTrainJobCoreService modelTrainJobCoreService;
|
||||
private final ModelTrainMngCoreService modelTrainMngCoreService;
|
||||
private final ModelTrainMetricsJobService modelTrainMetricsJobService;
|
||||
|
||||
/**
|
||||
* Docker 컨테이너가 쓰는 response(산출물) 디렉토리의 "호스트 측" 베이스 경로. 예) /data/train/response
|
||||
@@ -56,7 +59,7 @@ public class JobRecoveryOnStartupService {
|
||||
* <p>@Transactional: - recover() 메서드 전체가 하나의 트랜잭션으로 감싸집니다. - Job 하나씩 처리하다가 예외가 발생하면 전체 롤백이 될 수
|
||||
* 있으므로 "잡 단위로 확실히 커밋"이 필요하면 (권장) 잡 단위로 분리 트랜잭션(REQUIRES_NEW) 고려하세요.
|
||||
*/
|
||||
// @EventListener(ApplicationReadyEvent.class)
|
||||
@EventListener(ApplicationReadyEvent.class)
|
||||
@Transactional
|
||||
public void recover() {
|
||||
|
||||
@@ -91,57 +94,92 @@ public class JobRecoveryOnStartupService {
|
||||
if (out.completed()) {
|
||||
log.info("[RECOVERY] outputs look completed. mark SUCCESS. jobId={}", job.getId());
|
||||
modelTrainJobCoreService.markSuccess(job.getId(), 0);
|
||||
// model 상태 변경
|
||||
markStepSuccessByJobType(job);
|
||||
// 결과 csv 파일 정보 등록
|
||||
modelTrainMetricsJobService.findTrainValidMetricCsvFiles();
|
||||
|
||||
} else {
|
||||
// 3-3) 산출물이 부족하면 실패 처리(운영 정책에 따라 "유예"도 가능)
|
||||
|
||||
// 3-3) 산출물이 부족하면 중단처리
|
||||
// 산출물이 부족하면 "중단/보류"로 처리
|
||||
// 운영자가 재시작 할 수 있게 한다.
|
||||
log.warn(
|
||||
"[RECOVERY] outputs incomplete. mark FAILED. jobId={} reason={}",
|
||||
"[RECOVERY] outputs incomplete. mark PAUSED/STOP for restart. jobId={} reason={}",
|
||||
job.getId(),
|
||||
out.reason());
|
||||
|
||||
modelTrainJobCoreService.markFailed(
|
||||
job.getId(), -1, "SERVER_RESTART_CONTAINER_MISSING_OUTPUT_INCOMPLETE");
|
||||
Integer modelId = job.getModelId() == null ? null : Math.toIntExact(job.getModelId());
|
||||
|
||||
markStepErrorByJobType(job, out.reason());
|
||||
// PAUSED/STOP
|
||||
modelTrainJobCoreService.markPaused(
|
||||
job.getId(), modelId, "SERVER_RESTART_CONTAINER_MISSING_OUTPUT_INCOMPLETE");
|
||||
|
||||
// 모델도 에러가 아니라 STOP으로
|
||||
markStepStopByJobType(
|
||||
job, "SERVER_RESTART_CONTAINER_MISSING_OUTPUT_INCOMPLETE: " + out.reason());
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// 4) 컨테이너는 존재하고, 아직 running=true
|
||||
// - 서버만 재기동됐고 컨테이너는 그대로 살아있는 케이스
|
||||
// - 이 경우 DB를 건드리면 오히려 꼬일 수 있으니 RUNNING 유지
|
||||
// - 실행중 docker 를 kill 하고 이어하기를 한다,
|
||||
if (state.running()) {
|
||||
log.info("[RECOVERY] container still running. container={}", containerName);
|
||||
log.warn("[RECOVERY] container still running. force kill. container={}", containerName);
|
||||
|
||||
try {
|
||||
ProcessBuilder pb = new ProcessBuilder("docker", "stop", "-t", "20", containerName);
|
||||
// ============================================================
|
||||
// 1) docker kill (SIGKILL) 바로 전송
|
||||
// - kill은 즉시 종료
|
||||
// ============================================================
|
||||
ProcessBuilder pb = new ProcessBuilder("docker", "kill", containerName);
|
||||
pb.redirectErrorStream(true);
|
||||
|
||||
Process p = pb.start();
|
||||
|
||||
boolean finished = p.waitFor(30, TimeUnit.SECONDS);
|
||||
boolean finished = p.waitFor(20, TimeUnit.SECONDS);
|
||||
if (!finished) {
|
||||
p.destroyForcibly();
|
||||
throw new IOException("docker stop timeout");
|
||||
throw new IOException("docker kill timeout");
|
||||
}
|
||||
|
||||
int code = p.exitValue();
|
||||
if (code != 0) {
|
||||
throw new IOException("docker stop failed. exit=" + code);
|
||||
throw new IOException("docker kill failed. exit=" + code);
|
||||
}
|
||||
|
||||
log.info(
|
||||
"[RECOVERY] container stopped (will be auto removed by --rm). container={}",
|
||||
containerName);
|
||||
// ============================================================
|
||||
// 2) kill 후 실제로 죽었는지 확인
|
||||
// ============================================================
|
||||
DockerInspectState after = inspectContainer(containerName);
|
||||
if (after.exists() && after.running()) {
|
||||
throw new IOException("docker kill returned 0 but container still running");
|
||||
}
|
||||
|
||||
// 여기서 상태를 PAUSED로 바꿔도 되고
|
||||
modelTrainJobCoreService.markPaused(job.getId(), -1, "AUTO_STOP_FAILED_ON_RESTART");
|
||||
log.info("[RECOVERY] container killed successfully. container={}", containerName);
|
||||
|
||||
// ============================================================
|
||||
// 3) job 상태를 PAUSED로 변경 (서버 재기동으로 강제 중단)
|
||||
// ============================================================
|
||||
Integer modelId = job.getModelId() == null ? null : Math.toIntExact(job.getModelId());
|
||||
|
||||
modelTrainJobCoreService.markPaused(
|
||||
job.getId(), modelId, "AUTO_KILLED_ON_SERVER_RESTART");
|
||||
|
||||
log.info("job = {}", job);
|
||||
markStepStopByJobType(job, "AUTO_KILLED_ON_SERVER_RESTART");
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[RECOVERY] docker stop failed. container={}", containerName, e);
|
||||
|
||||
modelTrainJobCoreService.markFailed(job.getId(), -1, "AUTO_STOP_FAILED_ON_RESTART");
|
||||
log.error("[RECOVERY] docker kill failed. container={}", containerName, e);
|
||||
|
||||
modelTrainJobCoreService.markFailed(
|
||||
job.getId(), -1, "AUTO_KILL_FAILED_ON_SERVER_RESTART");
|
||||
|
||||
markStepErrorByJobType(job, "AUTO_KILL_FAILED_ON_SERVER_RESTART");
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -154,7 +192,10 @@ public class JobRecoveryOnStartupService {
|
||||
if (exitCode != null && exitCode == 0) {
|
||||
log.info("[RECOVERY] container exited(0). mark SUCCESS. container={}", containerName);
|
||||
modelTrainJobCoreService.markSuccess(job.getId(), 0);
|
||||
// model 상태 변경
|
||||
markStepSuccessByJobType(job);
|
||||
// 결과 csv 파일 정보 등록
|
||||
modelTrainMetricsJobService.findTrainValidMetricCsvFiles();
|
||||
|
||||
} else {
|
||||
// 5-2) exitCode != 0 이거나 null이면 실패로 간주 → FAILED 처리
|
||||
@@ -166,7 +207,7 @@ public class JobRecoveryOnStartupService {
|
||||
|
||||
modelTrainJobCoreService.markFailed(
|
||||
job.getId(), exitCode, "SERVER_RESTART_CONTAINER_EXIT_NONZERO");
|
||||
|
||||
// model 상태 변경
|
||||
markStepErrorByJobType(job, "exit=" + exitCode + " status=" + status);
|
||||
}
|
||||
|
||||
@@ -178,7 +219,7 @@ public class JobRecoveryOnStartupService {
|
||||
|
||||
modelTrainJobCoreService.markFailed(
|
||||
job.getId(), -1, "SERVER_RESTART_CONTAINER_INSPECT_ERROR");
|
||||
|
||||
// model 상태 변경
|
||||
markStepErrorByJobType(job, "inspect-error");
|
||||
}
|
||||
}
|
||||
@@ -204,6 +245,16 @@ public class JobRecoveryOnStartupService {
|
||||
*
|
||||
* <p>예: - jobType == "EVAL" → step2(평가 단계) 에러 - 그 외 → step1 혹은 전체 에러
|
||||
*/
|
||||
private void markStepStopByJobType(ModelTrainJobDto job, String msg) {
|
||||
Map<String, Object> params = job.getParamsJson();
|
||||
boolean isEval = params != null && "EVAL".equals(String.valueOf(params.get("jobType")));
|
||||
if (isEval) {
|
||||
modelTrainMngCoreService.markStep2Stop(job.getModelId(), msg);
|
||||
} else {
|
||||
modelTrainMngCoreService.markStep1Stop(job.getModelId(), msg);
|
||||
}
|
||||
}
|
||||
|
||||
private void markStepErrorByJobType(ModelTrainJobDto job, String msg) {
|
||||
Map<String, Object> params = job.getParamsJson();
|
||||
boolean isEval = params != null && "EVAL".equals(String.valueOf(params.get("jobType")));
|
||||
@@ -310,33 +361,66 @@ public class JobRecoveryOnStartupService {
|
||||
*/
|
||||
private OutputResult probeOutputs(ModelTrainJobDto job) {
|
||||
try {
|
||||
|
||||
log.info(
|
||||
"[RECOVERY] probeOutputs start. jobId={}, modelId={}", job.getId(), job.getModelId());
|
||||
|
||||
// 1) 출력 디렉토리 확인
|
||||
Path outDir = resolveOutputDir(job);
|
||||
if (outDir == null || !Files.isDirectory(outDir)) {
|
||||
log.warn("[RECOVERY] output directory missing. jobId={}, path={}", job.getId(), outDir);
|
||||
return new OutputResult(false, "output-dir-missing");
|
||||
}
|
||||
|
||||
log.info("[RECOVERY] output directory found. jobId={}, path={}", job.getId(), outDir);
|
||||
|
||||
// 2) totalEpoch 확인
|
||||
Integer totalEpoch = extractTotalEpoch(job).orElse(null);
|
||||
if (totalEpoch == null || totalEpoch <= 0) {
|
||||
log.warn(
|
||||
"[RECOVERY] totalEpoch missing or invalid. jobId={}, totalEpoch={}",
|
||||
job.getId(),
|
||||
totalEpoch);
|
||||
return new OutputResult(false, "total-epoch-missing");
|
||||
}
|
||||
|
||||
log.info("[RECOVERY] totalEpoch={}. jobId={}", totalEpoch, job.getId());
|
||||
|
||||
// 3) val.csv 존재 확인
|
||||
Path valCsv = outDir.resolve("val.csv");
|
||||
if (!Files.exists(valCsv)) {
|
||||
log.warn("[RECOVERY] val.csv missing. jobId={}, path={}", job.getId(), valCsv);
|
||||
return new OutputResult(false, "val.csv-missing");
|
||||
}
|
||||
|
||||
// 4) val.csv 라인 수 확인
|
||||
long lines = countNonHeaderLines(valCsv);
|
||||
|
||||
// “같아야 완료” 정책
|
||||
log.info(
|
||||
"[RECOVERY] val.csv lines counted. jobId={}, lines={}, expected={}",
|
||||
job.getId(),
|
||||
lines,
|
||||
totalEpoch);
|
||||
|
||||
// 5) 완료 판정
|
||||
if (lines == totalEpoch) {
|
||||
log.info("[RECOVERY] outputs look COMPLETE. jobId={}", job.getId());
|
||||
return new OutputResult(true, "ok");
|
||||
}
|
||||
|
||||
log.warn(
|
||||
"[RECOVERY] val.csv line mismatch. jobId={}, lines={}, expected={}",
|
||||
job.getId(),
|
||||
lines,
|
||||
totalEpoch);
|
||||
|
||||
return new OutputResult(
|
||||
false, "val.csv-lines-mismatch lines=" + lines + " expected=" + totalEpoch);
|
||||
|
||||
} catch (Exception e) {
|
||||
|
||||
log.error("[RECOVERY] probeOutputs error. jobId={}", job.getId(), e);
|
||||
|
||||
return new OutputResult(false, "probe-error");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,71 +20,187 @@ public class TmpDatasetService {
|
||||
@Value("${train.docker.basePath}")
|
||||
private String trainBaseDir;
|
||||
|
||||
private final DataSetCountersService dataSetCountersService;
|
||||
|
||||
/**
|
||||
* 다른 데이터셋 파일과 이름이 겹치면 그 파일은 skip함
|
||||
* train, val, test 폴더별로 link
|
||||
*
|
||||
* @param uid
|
||||
* @param type
|
||||
* @param links
|
||||
* @param uid 임시폴더 uuid
|
||||
* @param type train, val, test
|
||||
* @param links tif pull path
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public String buildTmpDatasetSymlink(String uid, String type, List<ModelTrainLinkDto> links)
|
||||
public void buildTmpDatasetHardlink(String uid, String type, List<ModelTrainLinkDto> links)
|
||||
throws IOException {
|
||||
|
||||
if (uid == null || uid.isBlank()) throw new IOException("uid is empty");
|
||||
if (type == null || type.isBlank()) throw new IOException("type is empty");
|
||||
if (links == null || links.isEmpty()) throw new IOException("links is empty");
|
||||
if (links == null || links.isEmpty()) {
|
||||
throw new IOException("links is empty");
|
||||
}
|
||||
|
||||
log.info("========== buildTmpDatasetHardlink MERGE START ==========");
|
||||
log.info("uid={}, type={}, links.size={}", uid, type, links.size());
|
||||
Path tmp = Path.of(trainBaseDir, "tmp", uid);
|
||||
|
||||
long hardlinksMade = 0;
|
||||
|
||||
for (ModelTrainLinkDto dto : links) {
|
||||
|
||||
if (type == null) {
|
||||
log.warn("SKIP - trainType null: {}", dto);
|
||||
continue;
|
||||
}
|
||||
|
||||
// type별 디렉토리 생성
|
||||
Files.createDirectories(tmp.resolve(type).resolve("input1"));
|
||||
Files.createDirectories(tmp.resolve(type).resolve("input2"));
|
||||
Files.createDirectories(tmp.resolve(type).resolve("label"));
|
||||
Files.createDirectories(tmp.resolve(type).resolve("label-json"));
|
||||
|
||||
// comparePath → input1
|
||||
hardlinksMade += link(tmp, type, "input1", dto.getComparePath());
|
||||
|
||||
// targetPath → input2
|
||||
hardlinksMade += link(tmp, type, "input2", dto.getTargetPath());
|
||||
|
||||
// labelPath → label
|
||||
hardlinksMade += link(tmp, type, "label", dto.getLabelPath());
|
||||
|
||||
// geoJsonPath -> label-json
|
||||
hardlinksMade += link(tmp, type, "label-json", dto.getGeoJsonPath());
|
||||
}
|
||||
|
||||
if (hardlinksMade == 0) {
|
||||
throw new IOException("No hardlinks created.");
|
||||
}
|
||||
|
||||
log.info("tmp dataset created: {}, hardlinksMade={}", tmp, hardlinksMade);
|
||||
}
|
||||
|
||||
private long link(Path tmp, String type, String part, String fullPath) throws IOException {
|
||||
|
||||
if (fullPath == null || fullPath.isBlank()) return 0;
|
||||
|
||||
Path src = Path.of(fullPath);
|
||||
|
||||
if (!Files.isRegularFile(src)) {
|
||||
log.warn("SKIP (not file): {}", src);
|
||||
return 0;
|
||||
}
|
||||
|
||||
String fileName = src.getFileName().toString();
|
||||
Path dst = tmp.resolve(type).resolve(part).resolve(fileName);
|
||||
|
||||
// 충돌 시 덮어쓰기
|
||||
if (Files.exists(dst)) {
|
||||
Files.delete(dst);
|
||||
}
|
||||
|
||||
Files.createLink(dst, src);
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
private String safe(String s) {
|
||||
return (s == null || s.isBlank()) ? null : s.trim();
|
||||
}
|
||||
|
||||
/**
|
||||
* request 전체 폴더 link
|
||||
*
|
||||
* @param uid
|
||||
* @param datasetUids
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
public String buildTmpDatasetSymlink(String uid, List<String> datasetUids) throws IOException {
|
||||
|
||||
log.info("========== buildTmpDatasetHardlink START ==========");
|
||||
log.info("uid={}", uid);
|
||||
log.info("datasetUids={}", datasetUids);
|
||||
log.info("requestDir(raw)={}", requestDir);
|
||||
|
||||
Path BASE = toPath(requestDir);
|
||||
Path tmp = Path.of(trainBaseDir, "tmp", uid);
|
||||
|
||||
long hardlinksMade = 0;
|
||||
long skippedCollision = 0;
|
||||
long noDir = 0;
|
||||
log.info("BASE={}", BASE);
|
||||
log.info("BASE exists? {}", Files.isDirectory(BASE));
|
||||
log.info("tmp={}", tmp);
|
||||
|
||||
// tmp/<type>/<part> 준비
|
||||
long noDir = 0, scannedDirs = 0, regularFiles = 0, hardlinksMade = 0;
|
||||
|
||||
// tmp 디렉토리 준비
|
||||
for (String type : List.of("train", "val", "test")) {
|
||||
for (String part : List.of("input1", "input2", "label", "label-json")) {
|
||||
Files.createDirectories(tmp.resolve(type).resolve(part));
|
||||
Path dir = tmp.resolve(type).resolve(part);
|
||||
Files.createDirectories(dir);
|
||||
log.info("createDirectories: {}", dir);
|
||||
}
|
||||
}
|
||||
|
||||
for (ModelTrainLinkDto dto : links) {
|
||||
String datasetUid = safe(dto.getDatasetUid());
|
||||
if (datasetUid == null) {
|
||||
log.warn("SKIP dto (datasetUid null): {}", dto);
|
||||
continue;
|
||||
// 하드링크는 "같은 파일시스템"에서만 가능하므로 BASE/tmp가 같은 FS인지 미리 확인(권장)
|
||||
try {
|
||||
var baseStore = Files.getFileStore(BASE);
|
||||
var tmpStore = Files.getFileStore(tmp.getParent()); // BASE/tmp
|
||||
if (!baseStore.name().equals(tmpStore.name()) || !baseStore.type().equals(tmpStore.type())) {
|
||||
throw new IOException(
|
||||
"Hardlink requires same filesystem. baseStore="
|
||||
+ baseStore.name()
|
||||
+ "("
|
||||
+ baseStore.type()
|
||||
+ "), tmpStore="
|
||||
+ tmpStore.name()
|
||||
+ "("
|
||||
+ tmpStore.type()
|
||||
+ ")");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// FileStore 비교가 환경마다 애매할 수 있어서, 여기서는 경고만 주고 실제 createLink에서 최종 판단하게 둘 수도 있음.
|
||||
log.warn("FileStore check skipped/failed (will rely on createLink): {}", e.toString());
|
||||
}
|
||||
|
||||
Path srcRoot = BASE.resolve(datasetUid);
|
||||
for (String id : datasetUids) {
|
||||
Path srcRoot = BASE.resolve(id);
|
||||
log.info("---- dataset id={} srcRoot={} exists? {}", id, srcRoot, Files.isDirectory(srcRoot));
|
||||
|
||||
for (String type : List.of("train", "val", "test")) {
|
||||
for (String part : List.of("input1", "input2", "label", "label-json")) {
|
||||
|
||||
Path srcDir = srcRoot.resolve(type).resolve(part);
|
||||
if (!Files.isDirectory(srcDir)) {
|
||||
log.warn("SKIP (not directory): {}", srcDir);
|
||||
noDir++;
|
||||
continue;
|
||||
}
|
||||
|
||||
// 하위폴더까지 전부
|
||||
try (var walk = Files.walk(srcDir)) {
|
||||
for (Path f : walk.filter(Files::isRegularFile).toList()) {
|
||||
scannedDirs++;
|
||||
log.info("SCAN dir={}", srcDir);
|
||||
|
||||
String fileName = f.getFileName().toString();
|
||||
Path dst = tmp.resolve(type).resolve(part).resolve(fileName);
|
||||
|
||||
// 이름 유지 + 충돌은 skip
|
||||
if (Files.exists(dst)) {
|
||||
skippedCollision++;
|
||||
try (DirectoryStream<Path> stream = Files.newDirectoryStream(srcDir)) {
|
||||
for (Path f : stream) {
|
||||
if (!Files.isRegularFile(f)) {
|
||||
log.debug("skip non-regular file: {}", f);
|
||||
continue;
|
||||
}
|
||||
|
||||
regularFiles++;
|
||||
|
||||
String dstName = id + "__" + f.getFileName();
|
||||
Path dst = tmp.resolve(type).resolve(part).resolve(dstName);
|
||||
|
||||
// dst가 남아있으면 삭제(심볼릭링크든 파일이든)
|
||||
if (Files.exists(dst) || Files.isSymbolicLink(dst)) {
|
||||
Files.delete(dst);
|
||||
log.debug("deleted existing: {}", dst);
|
||||
}
|
||||
|
||||
try {
|
||||
// 하드링크 생성 (dst가 새 파일로 생기지만 inode는 f와 동일)
|
||||
Files.createLink(dst, f);
|
||||
hardlinksMade++;
|
||||
log.debug("created hardlink: {} => {}", dst, f);
|
||||
} catch (IOException e) {
|
||||
// 여기서 바로 실패시키면 “tmp는 만들었는데 내용은 0개” 같은 상태를 방지할 수 있음
|
||||
log.error("FAILED create hardlink: {} => {}", dst, f, e);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -92,35 +208,29 @@ public class TmpDatasetService {
|
||||
|
||||
if (hardlinksMade == 0) {
|
||||
throw new IOException(
|
||||
"No hardlinks created. noDir=" + noDir + ", skippedCollision=" + skippedCollision);
|
||||
"No hardlinks created. regularFiles="
|
||||
+ regularFiles
|
||||
+ ", scannedDirs="
|
||||
+ scannedDirs
|
||||
+ ", noDir="
|
||||
+ noDir);
|
||||
}
|
||||
|
||||
log.info("tmp dataset created: {}", tmp);
|
||||
log.info(
|
||||
"tmp dataset merged: {} (type={}), hardlinksMade={}, skippedCollision={}, noDir={}",
|
||||
tmp,
|
||||
type,
|
||||
hardlinksMade,
|
||||
skippedCollision,
|
||||
noDir);
|
||||
"summary: scannedDirs={}, noDir={}, regularFiles={}, hardlinksMade={}",
|
||||
scannedDirs,
|
||||
noDir,
|
||||
regularFiles,
|
||||
hardlinksMade);
|
||||
|
||||
return uid;
|
||||
}
|
||||
|
||||
private static Path toPath(String p) {
|
||||
if (p == null || p.isBlank()) {
|
||||
throw new IllegalArgumentException("path is null or blank");
|
||||
if (p.startsWith("~/")) {
|
||||
return Paths.get(System.getProperty("user.home")).resolve(p.substring(2)).normalize();
|
||||
}
|
||||
String trimmed = p.trim();
|
||||
if (trimmed.startsWith("~/")) {
|
||||
return Paths.get(System.getProperty("user.home"))
|
||||
.resolve(trimmed.substring(2))
|
||||
.toAbsolutePath()
|
||||
.normalize();
|
||||
}
|
||||
return Paths.get(trimmed).toAbsolutePath().normalize();
|
||||
}
|
||||
|
||||
private static String safe(String s) {
|
||||
return (s == null || s.isBlank()) ? null : s.trim();
|
||||
return Paths.get(p).toAbsolutePath().normalize();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -266,21 +266,28 @@ public class TrainJobService {
|
||||
List<String> uids = modelTrainMngCoreService.findDatasetUid(datasetIds);
|
||||
|
||||
try {
|
||||
// train path
|
||||
// 데이터셋 심볼링크 생성
|
||||
// String pathUid = tmpDatasetService.buildTmpDatasetSymlink(raw, uids);
|
||||
// train path 모델 클래스별 조회
|
||||
List<ModelTrainLinkDto> trainList = modelTrainMngCoreService.findDatasetTrainPath(modelId);
|
||||
// validation path
|
||||
// validation path 모델 클래스별 조회
|
||||
List<ModelTrainLinkDto> valList = modelTrainMngCoreService.findDatasetValPath(modelId);
|
||||
// test path
|
||||
// test path 모델 클래스별 조회
|
||||
List<ModelTrainLinkDto> testList = modelTrainMngCoreService.findDatasetTestPath(modelId);
|
||||
|
||||
log.info(
|
||||
"createTmpFile class list trainList = {} valList = {} testList = {}",
|
||||
trainList.size(),
|
||||
valList.size(),
|
||||
testList.size());
|
||||
|
||||
// train 데이터셋 심볼링크 생성
|
||||
tmpDatasetService.buildTmpDatasetSymlink(raw, "train", trainList);
|
||||
tmpDatasetService.buildTmpDatasetHardlink(raw, "train", trainList);
|
||||
// val 데이터셋 심볼링크 생성
|
||||
tmpDatasetService.buildTmpDatasetSymlink(raw, "val", valList);
|
||||
tmpDatasetService.buildTmpDatasetHardlink(raw, "val", valList);
|
||||
// test 데이터셋 심볼링크 생성
|
||||
tmpDatasetService.buildTmpDatasetSymlink(raw, "test", testList);
|
||||
// 카운트 로그
|
||||
dataSetCounters.getCount(modelId);
|
||||
tmpDatasetService.buildTmpDatasetHardlink(raw, "test", testList);
|
||||
|
||||
ModelTrainMngDto.UpdateReq updateReq = new ModelTrainMngDto.UpdateReq();
|
||||
updateReq.setRequestPath(raw);
|
||||
|
||||
|
||||
@@ -108,6 +108,10 @@ public class TrainJobWorker {
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* 0 정상 종료 SUCCESS 1~125 학습 코드 에러 FAILED 137 OOMKill FAILED 143 SIGTERM (stop) STOP -1 우리 내부
|
||||
* 강제 중단 STOP
|
||||
*/
|
||||
if (result.getExitCode() == 0) {
|
||||
// 성공 처리
|
||||
modelTrainJobCoreService.markSuccess(jobId, result.getExitCode());
|
||||
@@ -124,7 +128,23 @@ public class TrainJobWorker {
|
||||
}
|
||||
|
||||
} else {
|
||||
|
||||
String failMsg = result.getStatus() + "\n" + result.getLogs();
|
||||
log.info("training fail exitCode={} Msg ={}", result.getExitCode(), failMsg);
|
||||
|
||||
if (result.getExitCode() == -1 || result.getExitCode() == 143) {
|
||||
// 실패 처리
|
||||
modelTrainJobCoreService.markPaused(
|
||||
jobId, result.getExitCode(), result.getStatus() + "\n" + result.getLogs());
|
||||
|
||||
if (isEval) {
|
||||
// 오류 정보 등록
|
||||
modelTrainMngCoreService.markStep2Stop(modelId, "exit=" + result.getExitCode());
|
||||
} else {
|
||||
// 오류 정보 등록
|
||||
modelTrainMngCoreService.markStep1Stop(modelId, "exit=" + result.getExitCode());
|
||||
}
|
||||
} else {
|
||||
// 실패 처리
|
||||
modelTrainJobCoreService.markFailed(
|
||||
jobId, result.getExitCode(), result.getStatus() + "\n" + result.getLogs());
|
||||
@@ -137,6 +157,7 @@ public class TrainJobWorker {
|
||||
modelTrainMngCoreService.markError(modelId, "exit=" + result.getExitCode());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
modelTrainJobCoreService.markFailed(jobId, null, e.getMessage());
|
||||
|
||||
Reference in New Issue
Block a user