라벨링 어제 완료된 건 검수할당 스케줄링 + 라벨 저장 시 실시간 할당 추가
This commit is contained in:
@@ -0,0 +1,31 @@
|
||||
package com.kamco.cd.kamcoback.scheduler.dto;
|
||||
|
||||
import java.util.UUID;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.Setter;
|
||||
|
||||
public class TrainingDataReviewJobDto {
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
@RequiredArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public static class Tasks {
|
||||
|
||||
private UUID assignmentUid;
|
||||
private Long inferenceUid;
|
||||
private Long analUid;
|
||||
}
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
@RequiredArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public static class InspectorPendingDto {
|
||||
|
||||
String inspectorUid;
|
||||
Long pendingCount;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,131 @@
|
||||
package com.kamco.cd.kamcoback.scheduler.service;
|
||||
|
||||
import com.kamco.cd.kamcoback.postgres.core.TrainingDataReviewJobCoreService;
|
||||
import com.kamco.cd.kamcoback.scheduler.dto.TrainingDataReviewJobDto.InspectorPendingDto;
|
||||
import com.kamco.cd.kamcoback.scheduler.dto.TrainingDataReviewJobDto.Tasks;
|
||||
import jakarta.transaction.Transactional;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.log4j.Log4j2;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
@Log4j2
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class TrainingDataReviewJobService {
|
||||
|
||||
private final TrainingDataReviewJobCoreService trainingDataReviewJobCoreService;
|
||||
|
||||
@Value("${spring.profiles.active}")
|
||||
private String profile;
|
||||
|
||||
private boolean isLocalProfile() {
|
||||
return "local".equalsIgnoreCase(profile);
|
||||
}
|
||||
|
||||
@Transactional
|
||||
@Scheduled(cron = "0 0 0 * * *")
|
||||
public void assignReviewerYesterdayLabelComplete() {
|
||||
|
||||
if (isLocalProfile()) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
List<Tasks> tasks = trainingDataReviewJobCoreService.findCompletedYesterdayUnassigned();
|
||||
|
||||
if (tasks.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// 회차별로 그룹핑
|
||||
Map<Long, List<Tasks>> taskByRound =
|
||||
tasks.stream().collect(Collectors.groupingBy(Tasks::getAnalUid));
|
||||
|
||||
// 회차별 분배
|
||||
for (Map.Entry<Long, List<Tasks>> entry : taskByRound.entrySet()) {
|
||||
Long analUid = entry.getKey();
|
||||
List<Tasks> analTasks = entry.getValue();
|
||||
|
||||
// pending 계산
|
||||
List<InspectorPendingDto> pendings =
|
||||
trainingDataReviewJobCoreService.findInspectorPendingByRound(analUid);
|
||||
|
||||
if (pendings.isEmpty()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
List<String> reviewerIds =
|
||||
pendings.stream().map(InspectorPendingDto::getInspectorUid).toList();
|
||||
|
||||
// Lock 걸릴 수 있기 때문에 엔티티 조회하는 Repository 에서 구현
|
||||
trainingDataReviewJobCoreService.lockInspectors(analUid, reviewerIds);
|
||||
|
||||
// 균등 분배
|
||||
Map<String, List<Tasks>> assignMap = distributeByLeastPending(analTasks, reviewerIds);
|
||||
|
||||
// reviewer별 batch update
|
||||
assignMap.forEach(
|
||||
(reviewerId, assignedTasks) -> {
|
||||
if (assignedTasks.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<UUID> assignmentUids =
|
||||
assignedTasks.stream().map(Tasks::getAssignmentUid).toList();
|
||||
|
||||
trainingDataReviewJobCoreService.assignReviewerBatch(assignmentUids, reviewerId);
|
||||
});
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("배치 처리 중 예외", e);
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, List<Tasks>> distributeByLeastPending(
|
||||
List<Tasks> tasks, List<String> reviewerIds) {
|
||||
Map<String, List<Tasks>> result = new LinkedHashMap<>();
|
||||
|
||||
// 순서 유지 중요 (ASC 정렬된 상태)
|
||||
for (String reviewerId : reviewerIds) {
|
||||
result.put(reviewerId, new ArrayList<>());
|
||||
}
|
||||
|
||||
int reviewerCount = reviewerIds.size();
|
||||
|
||||
for (int i = 0; i < tasks.size(); i++) {
|
||||
String reviewerId = reviewerIds.get(i % reviewerCount);
|
||||
result.get(reviewerId).add(tasks.get(i));
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// 라벨러 완료,SKIP 시 호출
|
||||
@Transactional
|
||||
public void assignRealtime(String assignmentUid) {
|
||||
Tasks task = trainingDataReviewJobCoreService.findAssignmentTask(assignmentUid);
|
||||
Long analUid = task.getAnalUid();
|
||||
|
||||
// pending 계산
|
||||
List<InspectorPendingDto> pendings =
|
||||
trainingDataReviewJobCoreService.findInspectorPendingByRound(analUid);
|
||||
|
||||
if (pendings.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<String> order = pendings.stream().map(InspectorPendingDto::getInspectorUid).toList();
|
||||
|
||||
trainingDataReviewJobCoreService.lockInspectors(analUid, order);
|
||||
|
||||
trainingDataReviewJobCoreService.assignReviewer(task.getAssignmentUid(), order.getFirst());
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user