diff --git a/src/main/java/com/kamco/cd/kamcoback/postgres/core/TrainingDataReviewJobCoreService.java b/src/main/java/com/kamco/cd/kamcoback/postgres/core/TrainingDataReviewJobCoreService.java new file mode 100644 index 00000000..5260f7af --- /dev/null +++ b/src/main/java/com/kamco/cd/kamcoback/postgres/core/TrainingDataReviewJobCoreService.java @@ -0,0 +1,40 @@ +package com.kamco.cd.kamcoback.postgres.core; + +import com.kamco.cd.kamcoback.postgres.repository.scheduler.TrainingDataReviewJobRepository; +import com.kamco.cd.kamcoback.scheduler.dto.TrainingDataReviewJobDto.InspectorPendingDto; +import com.kamco.cd.kamcoback.scheduler.dto.TrainingDataReviewJobDto.Tasks; +import java.util.List; +import java.util.UUID; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; + +@Service +@RequiredArgsConstructor +public class TrainingDataReviewJobCoreService { + + private final TrainingDataReviewJobRepository trainingDataReviewJobRepository; + + public List findCompletedYesterdayUnassigned() { + return trainingDataReviewJobRepository.findCompletedYesterdayUnassigned(); + } + + public void assignReviewer(UUID assignmentUid, String reviewerId) { + trainingDataReviewJobRepository.assignReviewer(assignmentUid, reviewerId); + } + + public void assignReviewerBatch(List assignmentUids, String reviewerId) { + trainingDataReviewJobRepository.assignReviewerBatch(assignmentUids, reviewerId); + } + + public Tasks findAssignmentTask(String assignmentUid) { + return trainingDataReviewJobRepository.findAssignmentTask(assignmentUid); + } + + public List findInspectorPendingByRound(Long analUid) { + return trainingDataReviewJobRepository.findInspectorPendingByRound(analUid); + } + + public void lockInspectors(Long analUid, List reviewerIds) { + trainingDataReviewJobRepository.lockInspectors(analUid, reviewerIds); + } +} diff --git a/src/main/java/com/kamco/cd/kamcoback/postgres/repository/scheduler/TrainingDataReviewJobRepository.java b/src/main/java/com/kamco/cd/kamcoback/postgres/repository/scheduler/TrainingDataReviewJobRepository.java new file mode 100644 index 00000000..430dacd0 --- /dev/null +++ b/src/main/java/com/kamco/cd/kamcoback/postgres/repository/scheduler/TrainingDataReviewJobRepository.java @@ -0,0 +1,23 @@ +package com.kamco.cd.kamcoback.postgres.repository.scheduler; + +import com.kamco.cd.kamcoback.postgres.entity.LabelingInspectorEntity; +import jakarta.persistence.LockModeType; +import java.util.List; +import java.util.UUID; +import org.springframework.data.jpa.repository.JpaRepository; +import org.springframework.data.jpa.repository.Lock; +import org.springframework.data.jpa.repository.Query; + +public interface TrainingDataReviewJobRepository + extends JpaRepository, TrainingDataReviewJobRepositoryCustom { + + @Lock(LockModeType.PESSIMISTIC_WRITE) + @Query( + """ + select r + from LabelingInspectorEntity r + where r.analUid = :analUid + and r.inspectorUid in :inspectorUids + """) + List lockInspectors(Long analUid, List inspectorUids); +} diff --git a/src/main/java/com/kamco/cd/kamcoback/postgres/repository/scheduler/TrainingDataReviewJobRepositoryCustom.java b/src/main/java/com/kamco/cd/kamcoback/postgres/repository/scheduler/TrainingDataReviewJobRepositoryCustom.java new file mode 100644 index 00000000..f83b2f77 --- /dev/null +++ b/src/main/java/com/kamco/cd/kamcoback/postgres/repository/scheduler/TrainingDataReviewJobRepositoryCustom.java @@ -0,0 +1,19 @@ +package com.kamco.cd.kamcoback.postgres.repository.scheduler; + +import com.kamco.cd.kamcoback.scheduler.dto.TrainingDataReviewJobDto.InspectorPendingDto; +import com.kamco.cd.kamcoback.scheduler.dto.TrainingDataReviewJobDto.Tasks; +import java.util.List; +import java.util.UUID; + +public interface TrainingDataReviewJobRepositoryCustom { + + List findCompletedYesterdayUnassigned(); + + List findInspectorPendingByRound(Long analUid); + + void assignReviewer(UUID assignmentUid, String reviewerId); + + void assignReviewerBatch(List assignmentUids, String reviewerId); + + Tasks findAssignmentTask(String assignmentUid); +} diff --git a/src/main/java/com/kamco/cd/kamcoback/postgres/repository/scheduler/TrainingDataReviewJobRepositoryImpl.java b/src/main/java/com/kamco/cd/kamcoback/postgres/repository/scheduler/TrainingDataReviewJobRepositoryImpl.java new file mode 100644 index 00000000..d7f22994 --- /dev/null +++ b/src/main/java/com/kamco/cd/kamcoback/postgres/repository/scheduler/TrainingDataReviewJobRepositoryImpl.java @@ -0,0 +1,135 @@ +package com.kamco.cd.kamcoback.postgres.repository.scheduler; + +import static com.kamco.cd.kamcoback.postgres.entity.QLabelingAssignmentEntity.labelingAssignmentEntity; +import static com.kamco.cd.kamcoback.postgres.entity.QLabelingInspectorEntity.labelingInspectorEntity; + +import com.kamco.cd.kamcoback.label.dto.LabelAllocateDto.InspectState; +import com.kamco.cd.kamcoback.label.dto.LabelAllocateDto.LabelState; +import com.kamco.cd.kamcoback.postgres.entity.LabelingAssignmentEntity; +import com.kamco.cd.kamcoback.scheduler.dto.TrainingDataReviewJobDto.InspectorPendingDto; +import com.kamco.cd.kamcoback.scheduler.dto.TrainingDataReviewJobDto.Tasks; +import com.querydsl.core.types.Projections; +import com.querydsl.core.types.dsl.BooleanExpression; +import com.querydsl.core.types.dsl.Expressions; +import com.querydsl.core.types.dsl.StringExpression; +import com.querydsl.jpa.impl.JPAQueryFactory; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.UUID; +import org.springframework.data.jpa.repository.support.QuerydslRepositorySupport; + +public class TrainingDataReviewJobRepositoryImpl extends QuerydslRepositorySupport + implements TrainingDataReviewJobRepositoryCustom { + + private final JPAQueryFactory queryFactory; + private final StringExpression NULL_STRING = Expressions.stringTemplate("cast(null as text)"); + + public TrainingDataReviewJobRepositoryImpl(JPAQueryFactory queryFactory) { + super(LabelingAssignmentEntity.class); + this.queryFactory = queryFactory; + } + + @Override + public List findCompletedYesterdayUnassigned() { + ZoneId zone = ZoneId.of("Asia/Seoul"); + ZonedDateTime todayStart = ZonedDateTime.now(zone).toLocalDate().atStartOfDay(zone); + ZonedDateTime yesterdayStart = todayStart.minusDays(1); + + BooleanExpression isYesterday = + labelingAssignmentEntity + .workStatDttm + .goe(yesterdayStart) + .and(labelingAssignmentEntity.workStatDttm.lt(todayStart)); + + return queryFactory + .select( + Projections.constructor( + Tasks.class, + labelingAssignmentEntity.assignmentUid, + labelingAssignmentEntity.inferenceGeomUid, + labelingAssignmentEntity.analUid)) + .from(labelingAssignmentEntity) + .where( + labelingAssignmentEntity.workState.in(LabelState.SKIP.getId(), LabelState.DONE.getId()), + labelingAssignmentEntity.inspectorUid.isNull(), + isYesterday) + .orderBy( + labelingAssignmentEntity.analUid.asc(), + labelingAssignmentEntity.assignGroupId.asc(), + labelingAssignmentEntity.inferenceGeomUid.asc()) + .fetch(); + } + + /** + * 해당 회차에 라벨링 할당받은 검수자별 완료 건수 count(), 완료한 게 적은 순으로 해야 일이 한 사람에게 몰리지 않음 + * + * @param analUid + * @return + */ + @Override + public List findInspectorPendingByRound(Long analUid) { + return queryFactory + .select( + Projections.constructor( + InspectorPendingDto.class, + labelingInspectorEntity.inspectorUid, + labelingAssignmentEntity.assignmentUid.count())) + .from(labelingInspectorEntity) + .leftJoin(labelingAssignmentEntity) + .on( + labelingInspectorEntity.inspectorUid.eq(labelingAssignmentEntity.inspectorUid), + labelingAssignmentEntity.inspectState.in( + InspectState.EXCEPT.getId(), InspectState.COMPLETE.getId())) + .where(labelingInspectorEntity.analUid.eq(analUid)) + .groupBy(labelingInspectorEntity.inspectorUid) + .orderBy(labelingAssignmentEntity.assignmentUid.count().asc()) + .fetch(); + } + + /** + * 실시간 분배용 1건 update + * + * @param assignmentUid + * @param reviewerId + */ + @Override + public void assignReviewer(UUID assignmentUid, String reviewerId) { + queryFactory + .update(labelingAssignmentEntity) + .set(labelingAssignmentEntity.inspectorUid, reviewerId) + .set(labelingAssignmentEntity.inspectState, InspectState.UNCONFIRM.getId()) + .where(labelingAssignmentEntity.assignmentUid.eq(assignmentUid)) + .execute(); + } + + /** + * 배치용 여러 건 update + * + * @param assignmentUids + * @param reviewerId + */ + @Override + public void assignReviewerBatch(List assignmentUids, String reviewerId) { + queryFactory + .update(labelingAssignmentEntity) + .set(labelingAssignmentEntity.inspectorUid, reviewerId) + .set(labelingAssignmentEntity.inspectState, InspectState.UNCONFIRM.getId()) + .where(labelingAssignmentEntity.assignmentUid.in(assignmentUids)) + .execute(); + } + + @Override + public Tasks findAssignmentTask(String assignmentUid) { + return queryFactory + .select( + Projections.constructor( + Tasks.class, + labelingAssignmentEntity.assignmentUid, + labelingAssignmentEntity.inferenceGeomUid, + labelingAssignmentEntity.analUid)) + .from(labelingAssignmentEntity) + .where(labelingAssignmentEntity.assignmentUid.eq(UUID.fromString(assignmentUid))) + .fetchOne(); + } +} diff --git a/src/main/java/com/kamco/cd/kamcoback/scheduler/dto/TrainingDataReviewJobDto.java b/src/main/java/com/kamco/cd/kamcoback/scheduler/dto/TrainingDataReviewJobDto.java new file mode 100644 index 00000000..343d9d4d --- /dev/null +++ b/src/main/java/com/kamco/cd/kamcoback/scheduler/dto/TrainingDataReviewJobDto.java @@ -0,0 +1,31 @@ +package com.kamco.cd.kamcoback.scheduler.dto; + +import java.util.UUID; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.Setter; + +public class TrainingDataReviewJobDto { + + @Getter + @Setter + @RequiredArgsConstructor + @AllArgsConstructor + public static class Tasks { + + private UUID assignmentUid; + private Long inferenceUid; + private Long analUid; + } + + @Getter + @Setter + @RequiredArgsConstructor + @AllArgsConstructor + public static class InspectorPendingDto { + + String inspectorUid; + Long pendingCount; + } +} diff --git a/src/main/java/com/kamco/cd/kamcoback/scheduler/service/TrainingDataReviewJobService.java b/src/main/java/com/kamco/cd/kamcoback/scheduler/service/TrainingDataReviewJobService.java new file mode 100644 index 00000000..25bf791b --- /dev/null +++ b/src/main/java/com/kamco/cd/kamcoback/scheduler/service/TrainingDataReviewJobService.java @@ -0,0 +1,131 @@ +package com.kamco.cd.kamcoback.scheduler.service; + +import com.kamco.cd.kamcoback.postgres.core.TrainingDataReviewJobCoreService; +import com.kamco.cd.kamcoback.scheduler.dto.TrainingDataReviewJobDto.InspectorPendingDto; +import com.kamco.cd.kamcoback.scheduler.dto.TrainingDataReviewJobDto.Tasks; +import jakarta.transaction.Transactional; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Service; + +@Log4j2 +@Service +@RequiredArgsConstructor +public class TrainingDataReviewJobService { + + private final TrainingDataReviewJobCoreService trainingDataReviewJobCoreService; + + @Value("${spring.profiles.active}") + private String profile; + + private boolean isLocalProfile() { + return "local".equalsIgnoreCase(profile); + } + + @Transactional + @Scheduled(cron = "0 0 0 * * *") + public void assignReviewerYesterdayLabelComplete() { + + if (isLocalProfile()) { + return; + } + + try { + List tasks = trainingDataReviewJobCoreService.findCompletedYesterdayUnassigned(); + + if (tasks.isEmpty()) { + return; + } + + // 회차별로 그룹핑 + Map> taskByRound = + tasks.stream().collect(Collectors.groupingBy(Tasks::getAnalUid)); + + // 회차별 분배 + for (Map.Entry> entry : taskByRound.entrySet()) { + Long analUid = entry.getKey(); + List analTasks = entry.getValue(); + + // pending 계산 + List pendings = + trainingDataReviewJobCoreService.findInspectorPendingByRound(analUid); + + if (pendings.isEmpty()) { + continue; + } + + List reviewerIds = + pendings.stream().map(InspectorPendingDto::getInspectorUid).toList(); + + // Lock 걸릴 수 있기 때문에 엔티티 조회하는 Repository 에서 구현 + trainingDataReviewJobCoreService.lockInspectors(analUid, reviewerIds); + + // 균등 분배 + Map> assignMap = distributeByLeastPending(analTasks, reviewerIds); + + // reviewer별 batch update + assignMap.forEach( + (reviewerId, assignedTasks) -> { + if (assignedTasks.isEmpty()) { + return; + } + + List assignmentUids = + assignedTasks.stream().map(Tasks::getAssignmentUid).toList(); + + trainingDataReviewJobCoreService.assignReviewerBatch(assignmentUids, reviewerId); + }); + } + } catch (Exception e) { + log.error("배치 처리 중 예외", e); + } + } + + private Map> distributeByLeastPending( + List tasks, List reviewerIds) { + Map> result = new LinkedHashMap<>(); + + // 순서 유지 중요 (ASC 정렬된 상태) + for (String reviewerId : reviewerIds) { + result.put(reviewerId, new ArrayList<>()); + } + + int reviewerCount = reviewerIds.size(); + + for (int i = 0; i < tasks.size(); i++) { + String reviewerId = reviewerIds.get(i % reviewerCount); + result.get(reviewerId).add(tasks.get(i)); + } + + return result; + } + + // 라벨러 완료,SKIP 시 호출 + @Transactional + public void assignRealtime(String assignmentUid) { + Tasks task = trainingDataReviewJobCoreService.findAssignmentTask(assignmentUid); + Long analUid = task.getAnalUid(); + + // pending 계산 + List pendings = + trainingDataReviewJobCoreService.findInspectorPendingByRound(analUid); + + if (pendings.isEmpty()) { + return; + } + + List order = pendings.stream().map(InspectorPendingDto::getInspectorUid).toList(); + + trainingDataReviewJobCoreService.lockInspectors(analUid, order); + + trainingDataReviewJobCoreService.assignReviewer(task.getAssignmentUid(), order.getFirst()); + } +} diff --git a/src/main/java/com/kamco/cd/kamcoback/trainingdata/TrainingDataReviewApiController.java b/src/main/java/com/kamco/cd/kamcoback/trainingdata/TrainingDataReviewApiController.java index c0e3d1ee..18ead37a 100644 --- a/src/main/java/com/kamco/cd/kamcoback/trainingdata/TrainingDataReviewApiController.java +++ b/src/main/java/com/kamco/cd/kamcoback/trainingdata/TrainingDataReviewApiController.java @@ -3,6 +3,7 @@ package com.kamco.cd.kamcoback.trainingdata; import com.kamco.cd.kamcoback.code.dto.CommonCodeDto; import com.kamco.cd.kamcoback.config.api.ApiResponseDto; import com.kamco.cd.kamcoback.config.api.ApiResponseDto.ResponseObj; +import com.kamco.cd.kamcoback.scheduler.service.TrainingDataReviewJobService; import com.kamco.cd.kamcoback.trainingdata.dto.TrainingDataReviewDto; import com.kamco.cd.kamcoback.trainingdata.dto.TrainingDataReviewDto.ReviewGeometryInfo; import com.kamco.cd.kamcoback.trainingdata.dto.TrainingDataReviewDto.ReviewListDto; @@ -31,6 +32,7 @@ import org.springframework.web.bind.annotation.RestController; public class TrainingDataReviewApiController { private final TrainingDataReviewService trainingDataReviewService; + private final TrainingDataReviewJobService trainingDataReviewJobService; @Operation(summary = "목록 조회", description = "검수 할당 목록 조회") @ApiResponses( @@ -554,4 +556,11 @@ public class TrainingDataReviewApiController { return ApiResponseDto.ok( trainingDataReviewService.getCogImageUrl(mapSheetNum, beforeYear, afterYear)); } + + @Hidden + @GetMapping("/run-schedule") + public ApiResponseDto runTrainingReviewSchedule() { + trainingDataReviewJobService.assignReviewerYesterdayLabelComplete(); + return ApiResponseDto.ok(null); + } } diff --git a/src/main/java/com/kamco/cd/kamcoback/trainingdata/service/TrainingDataLabelService.java b/src/main/java/com/kamco/cd/kamcoback/trainingdata/service/TrainingDataLabelService.java index f4063521..b7af6862 100644 --- a/src/main/java/com/kamco/cd/kamcoback/trainingdata/service/TrainingDataLabelService.java +++ b/src/main/java/com/kamco/cd/kamcoback/trainingdata/service/TrainingDataLabelService.java @@ -3,6 +3,7 @@ package com.kamco.cd.kamcoback.trainingdata.service; import com.kamco.cd.kamcoback.config.api.ApiResponseDto.ApiResponseCode; import com.kamco.cd.kamcoback.config.api.ApiResponseDto.ResponseObj; import com.kamco.cd.kamcoback.postgres.core.TrainingDataLabelCoreService; +import com.kamco.cd.kamcoback.scheduler.service.TrainingDataReviewJobService; import com.kamco.cd.kamcoback.trainingdata.dto.TrainingDataLabelDto; import com.kamco.cd.kamcoback.trainingdata.dto.TrainingDataLabelDto.DefaultPaging; import com.kamco.cd.kamcoback.trainingdata.dto.TrainingDataLabelDto.DetailRes; @@ -22,9 +23,13 @@ import org.springframework.stereotype.Service; public class TrainingDataLabelService { private final TrainingDataLabelCoreService trainingDataLabelCoreService; + private final TrainingDataReviewJobService trainingDataReviewJobService; - public TrainingDataLabelService(TrainingDataLabelCoreService trainingDataLabelCoreService) { + public TrainingDataLabelService( + TrainingDataLabelCoreService trainingDataLabelCoreService, + TrainingDataReviewJobService trainingDataReviewJobService) { this.trainingDataLabelCoreService = trainingDataLabelCoreService; + this.trainingDataReviewJobService = trainingDataReviewJobService; } public Page findLabelingAssignedList(searchReq searchReq, String userId) { @@ -52,6 +57,10 @@ public class TrainingDataLabelService { trainingDataLabelCoreService.updateLabelingPolygonClass( inferenceGeomUid, request.getGeometry(), request.getProperties(), status); } + + // 라벨링 완료하면 실시간 검수 할당 (1건) + trainingDataReviewJobService.assignRealtime(assignmentUid); + return status; }