실패 메시지 저장 추가
This commit is contained in:
@@ -4,7 +4,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.kamco.cd.kamcoback.common.exception.CustomApiException;
|
||||
import com.kamco.cd.kamcoback.common.service.ExternalJarRunner;
|
||||
import com.kamco.cd.kamcoback.config.resttemplate.ExternalHttpClient;
|
||||
import com.kamco.cd.kamcoback.config.resttemplate.ExternalHttpClient.ExternalCallResult;
|
||||
import com.kamco.cd.kamcoback.inference.dto.InferenceDetailDto.InferenceBatchSheet;
|
||||
@@ -14,12 +13,15 @@ import com.kamco.cd.kamcoback.inference.dto.InferenceResultDto.Status;
|
||||
import com.kamco.cd.kamcoback.inference.dto.InferenceResultsTestingDto;
|
||||
import com.kamco.cd.kamcoback.inference.dto.InferenceSendDto;
|
||||
import com.kamco.cd.kamcoback.postgres.core.InferenceResultCoreService;
|
||||
import com.kamco.cd.kamcoback.scheduler.dto.BatchStatusDto;
|
||||
import com.kamco.cd.kamcoback.scheduler.dto.JobStatusDto;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
@@ -41,7 +43,6 @@ public class MapSheetInferenceJobService {
|
||||
|
||||
private final ExternalHttpClient externalHttpClient;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final ExternalJarRunner externalJarRunner;
|
||||
|
||||
@Value("${inference.batch-url}")
|
||||
private String batchUrl;
|
||||
@@ -79,7 +80,7 @@ public class MapSheetInferenceJobService {
|
||||
}
|
||||
|
||||
// 추론실행 상태 정보 가져오기
|
||||
JobStatusDto job = fetchJobStatus(batchId);
|
||||
BatchStatusDto job = batchesStatus(batchId);
|
||||
if (job == null) {
|
||||
// TODO END가 아닌 상태의 추론목록이 있고 추론 api 호출할때 404면 end로 상태 업데이트
|
||||
// 학습 상태 종료
|
||||
@@ -139,7 +140,7 @@ public class MapSheetInferenceJobService {
|
||||
* @return
|
||||
* @throws JsonProcessingException
|
||||
*/
|
||||
private JobStatusDto fetchJobStatus(Long batchId) throws JsonProcessingException {
|
||||
private BatchStatusDto batchesStatus(Long batchId) throws JsonProcessingException {
|
||||
String url = batchUrl + "/" + batchId;
|
||||
|
||||
ExternalCallResult<String> result =
|
||||
@@ -154,6 +155,31 @@ public class MapSheetInferenceJobService {
|
||||
return null;
|
||||
}
|
||||
|
||||
return objectMapper.readValue(result.body(), BatchStatusDto.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* job 정보 조회
|
||||
*
|
||||
* @param jobId
|
||||
* @return
|
||||
* @throws JsonProcessingException
|
||||
*/
|
||||
private JobStatusDto jobStatus(Long jobId) throws JsonProcessingException {
|
||||
String url = inferenceUrl + "/" + jobId;
|
||||
|
||||
ExternalCallResult<String> result =
|
||||
externalHttpClient.call(url, HttpMethod.GET, null, jsonHeaders(), String.class);
|
||||
|
||||
int status = result.statusCode();
|
||||
if (status == 404) {
|
||||
log.info("Batch not found. batchId={}", jobId);
|
||||
return null;
|
||||
}
|
||||
if (status < 200 || status >= 300) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return objectMapper.readValue(result.body(), JobStatusDto.class);
|
||||
}
|
||||
|
||||
@@ -170,7 +196,7 @@ public class MapSheetInferenceJobService {
|
||||
* @param dto
|
||||
* @return
|
||||
*/
|
||||
private boolean isCompleted(JobStatusDto dto) {
|
||||
private boolean isCompleted(BatchStatusDto dto) {
|
||||
return dto.getTotalJobs() <= (dto.getCompletedJobs() + dto.getFailedJobs());
|
||||
}
|
||||
|
||||
@@ -180,7 +206,7 @@ public class MapSheetInferenceJobService {
|
||||
* @param sheet
|
||||
* @param job
|
||||
*/
|
||||
private void onCompleted(InferenceBatchSheet sheet, JobStatusDto job) {
|
||||
private void onCompleted(InferenceBatchSheet sheet, BatchStatusDto job) {
|
||||
String currentType = sheet.getRunningModelType();
|
||||
ZonedDateTime now = ZonedDateTime.now();
|
||||
|
||||
@@ -288,7 +314,7 @@ public class MapSheetInferenceJobService {
|
||||
* @param sheet
|
||||
* @param job
|
||||
*/
|
||||
private void onProcessing(InferenceBatchSheet sheet, JobStatusDto job) {
|
||||
private void onProcessing(InferenceBatchSheet sheet, BatchStatusDto job) {
|
||||
SaveInferenceAiDto save = new SaveInferenceAiDto();
|
||||
save.setUuid(sheet.getUuid());
|
||||
save.setStatus(Status.IN_PROGRESS.getId());
|
||||
@@ -298,6 +324,11 @@ public class MapSheetInferenceJobService {
|
||||
save.setFailedJobs(job.getFailedJobs());
|
||||
save.setType(sheet.getRunningModelType());
|
||||
inferenceResultCoreService.update(save);
|
||||
|
||||
// 실패 메시지 저장
|
||||
saveFail5k(job, sheet.getUuid(), sheet.getRunningModelType());
|
||||
// 성공 job id 저장
|
||||
saveCompleted5k(job, sheet.getUuid(), sheet.getRunningModelType());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -444,7 +475,7 @@ public class MapSheetInferenceJobService {
|
||||
|
||||
/** 모델별 추론 종료 update */
|
||||
private void updateProcessingEndTimeByModel(
|
||||
JobStatusDto dto, UUID uuid, ZonedDateTime dateTime, String type) {
|
||||
BatchStatusDto dto, UUID uuid, ZonedDateTime dateTime, String type) {
|
||||
SaveInferenceAiDto saveInferenceAiDto = new SaveInferenceAiDto();
|
||||
saveInferenceAiDto.setUuid(uuid);
|
||||
saveInferenceAiDto.setUpdateUid(0L);
|
||||
@@ -456,13 +487,10 @@ public class MapSheetInferenceJobService {
|
||||
saveInferenceAiDto.setFailedJobs(dto.getFailedJobs());
|
||||
inferenceResultCoreService.update(saveInferenceAiDto);
|
||||
|
||||
List<Long> failedIds =
|
||||
Optional.ofNullable(dto.getFailedIds()).orElse(List.of()).stream()
|
||||
.map(Long::valueOf)
|
||||
.toList();
|
||||
|
||||
// 도엽별 실패여부 업데이트
|
||||
inferenceResultCoreService.saveFail5k(uuid, failedIds, type);
|
||||
// 실패 메시지 저장
|
||||
saveFail5k(dto, uuid, type);
|
||||
// 성공 job id 저장
|
||||
saveCompleted5k(dto, uuid, type);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -477,4 +505,72 @@ public class MapSheetInferenceJobService {
|
||||
request.setUpdateUid(0L);
|
||||
inferenceResultCoreService.update(request);
|
||||
}
|
||||
|
||||
/**
|
||||
* 도엽별 실패여부 업데이트
|
||||
*
|
||||
* @param dto batch 정보
|
||||
* @param uuid uuid
|
||||
* @param type 모델 타입
|
||||
*/
|
||||
private void saveFail5k(BatchStatusDto dto, UUID uuid, String type) {
|
||||
|
||||
List<Long> failedIds =
|
||||
Optional.ofNullable(dto.getFailedIds()).orElse(List.of()).stream()
|
||||
.map(Long::valueOf)
|
||||
.toList();
|
||||
|
||||
List<Long> jobIds = inferenceResultCoreService.findFail5kList(uuid, failedIds, type);
|
||||
|
||||
Set<Long> jobIdSet = new HashSet<>(jobIds);
|
||||
|
||||
List<Long> remainFailedIds = failedIds.stream().filter(id -> !jobIdSet.contains(id)).toList();
|
||||
|
||||
if (remainFailedIds.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
for (Long failedId : remainFailedIds) {
|
||||
JobStatusDto jobStatusDto = jobStatus(failedId);
|
||||
inferenceResultCoreService.saveFail5k(uuid, jobStatusDto, type);
|
||||
}
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 도엽별 job id 저장
|
||||
*
|
||||
* @param dto batch 정보
|
||||
* @param uuid uuid
|
||||
* @param type 모델 타입
|
||||
*/
|
||||
private void saveCompleted5k(BatchStatusDto dto, UUID uuid, String type) {
|
||||
|
||||
List<Long> completedIds =
|
||||
Optional.ofNullable(dto.getCompletedIds()).orElse(List.of()).stream()
|
||||
.map(Long::valueOf)
|
||||
.toList();
|
||||
|
||||
List<Long> jobIds = inferenceResultCoreService.findCompleted5kList(uuid, completedIds, type);
|
||||
|
||||
Set<Long> jobIdSet = new HashSet<>(jobIds);
|
||||
|
||||
List<Long> remainIds = completedIds.stream().filter(id -> !jobIdSet.contains(id)).toList();
|
||||
|
||||
if (remainIds.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
for (Long id : remainIds) {
|
||||
JobStatusDto jobStatusDto = jobStatus(id);
|
||||
inferenceResultCoreService.saveJobId(uuid, jobStatusDto, type);
|
||||
}
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,7 +40,7 @@ public class ShpPipelineService {
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("SHP pipeline failed. inferenceId={}", inferenceId, e);
|
||||
// TODO 필요하면 실패 상태 업데이트 로직 추가
|
||||
// TODO 실패 상태 업데이트 로직 추가
|
||||
} finally {
|
||||
shpKeyLock.unlock(inferenceId);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user