test3
This commit is contained in:
@@ -103,8 +103,8 @@ public class BatchJobConfig {
|
|||||||
+ " AND ST_GeometryType(geometry) IN ('ST_Polygon', 'ST_MultiPolygon') "
|
+ " AND ST_GeometryType(geometry) IN ('ST_Polygon', 'ST_MultiPolygon') "
|
||||||
+ " AND ST_SRID(geometry) = 5186 "
|
+ " AND ST_SRID(geometry) = 5186 "
|
||||||
+ " AND ST_IsValid(geometry) = true "
|
+ " AND ST_IsValid(geometry) = true "
|
||||||
+ " AND ST_XMin(geometry) >= 125000 AND ST_XMax(geometry) <= 530000 "
|
+ " AND after_c IS NOT NULL "
|
||||||
+ " AND ST_YMin(geometry) >= -600000 AND ST_YMax(geometry) <= 988000 "
|
+ " AND after_p IS NOT NULL "
|
||||||
+ "ORDER BY map_id, uid";
|
+ "ORDER BY map_id, uid";
|
||||||
|
|
||||||
PreparedStatementSetter pss = ps -> {
|
PreparedStatementSetter pss = ps -> {
|
||||||
|
|||||||
@@ -54,11 +54,37 @@ public class GeomTypeTasklet implements Tasklet {
|
|||||||
String resolved = "Polygon";
|
String resolved = "Polygon";
|
||||||
log.info("Using geometry type: {}", resolved);
|
log.info("Using geometry type: {}", resolved);
|
||||||
|
|
||||||
chunkContext.getStepContext()
|
// 전체 map_id 수 및 레코드 수 사전 집계
|
||||||
|
long[] counts = new long[]{0, 0};
|
||||||
|
jdbcTemplate.query(
|
||||||
|
con -> {
|
||||||
|
var ps = con.prepareStatement(
|
||||||
|
"SELECT COUNT(DISTINCT map_id), COUNT(*) "
|
||||||
|
+ "FROM inference_results_testing "
|
||||||
|
+ "WHERE batch_id = ANY(?) "
|
||||||
|
+ " AND ST_GeometryType(geometry) IN ('ST_Polygon', 'ST_MultiPolygon') "
|
||||||
|
+ " AND ST_SRID(geometry) = 5186 "
|
||||||
|
+ " AND ST_IsValid(geometry) = true "
|
||||||
|
+ " AND after_c IS NOT NULL "
|
||||||
|
+ " AND after_p IS NOT NULL");
|
||||||
|
Array arr = con.createArrayOf("bigint", batchIds.toArray());
|
||||||
|
ps.setArray(1, arr);
|
||||||
|
return ps;
|
||||||
|
},
|
||||||
|
rs -> {
|
||||||
|
counts[0] = rs.getLong(1);
|
||||||
|
counts[1] = rs.getLong(2);
|
||||||
|
});
|
||||||
|
|
||||||
|
log.info("처리 대상: map_id {}개, 레코드 {}건", counts[0], counts[1]);
|
||||||
|
|
||||||
|
var jobCtx = chunkContext.getStepContext()
|
||||||
.getStepExecution()
|
.getStepExecution()
|
||||||
.getJobExecution()
|
.getJobExecution()
|
||||||
.getExecutionContext()
|
.getExecutionContext();
|
||||||
.putString("geometryType", resolved);
|
jobCtx.putString("geometryType", resolved);
|
||||||
|
jobCtx.putLong("totalMapIds", counts[0]);
|
||||||
|
jobCtx.putLong("totalExpectedRecords", counts[1]);
|
||||||
|
|
||||||
return RepeatStatus.FINISHED;
|
return RepeatStatus.FINISHED;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -49,7 +49,7 @@ import org.springframework.stereotype.Component;
|
|||||||
public class MapIdSwitchingWriter implements ItemStreamWriter<InferenceResult> {
|
public class MapIdSwitchingWriter implements ItemStreamWriter<InferenceResult> {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(MapIdSwitchingWriter.class);
|
private static final Logger log = LoggerFactory.getLogger(MapIdSwitchingWriter.class);
|
||||||
private static final int LOG_INTERVAL = 500; // 500 map_id마다 진행 로그
|
private static final int LOG_INTERVAL_DEFAULT = 100; // 기본 진행 로그 간격
|
||||||
|
|
||||||
private final ExporterProperties properties;
|
private final ExporterProperties properties;
|
||||||
private final CoordinateReferenceSystem crs;
|
private final CoordinateReferenceSystem crs;
|
||||||
@@ -68,6 +68,8 @@ public class MapIdSwitchingWriter implements ItemStreamWriter<InferenceResult> {
|
|||||||
private int totalFiles = 0;
|
private int totalFiles = 0;
|
||||||
private long totalRecords = 0;
|
private long totalRecords = 0;
|
||||||
private long startTimeMs;
|
private long startTimeMs;
|
||||||
|
private long totalMapIds = 0; // GeomTypeTasklet에서 사전 집계
|
||||||
|
private int logInterval = LOG_INTERVAL_DEFAULT;
|
||||||
|
|
||||||
public MapIdSwitchingWriter(ExporterProperties properties, CoordinateReferenceSystem crs) {
|
public MapIdSwitchingWriter(ExporterProperties properties, CoordinateReferenceSystem crs) {
|
||||||
this.properties = properties;
|
this.properties = properties;
|
||||||
@@ -76,17 +78,24 @@ public class MapIdSwitchingWriter implements ItemStreamWriter<InferenceResult> {
|
|||||||
|
|
||||||
@BeforeStep
|
@BeforeStep
|
||||||
public void beforeStep(StepExecution stepExecution) {
|
public void beforeStep(StepExecution stepExecution) {
|
||||||
String geomTypeStr = stepExecution.getJobExecution()
|
var jobCtx = stepExecution.getJobExecution().getExecutionContext();
|
||||||
.getExecutionContext()
|
String geomTypeStr = jobCtx.getString("geometryType", "Polygon");
|
||||||
.getString("geometryType", "Polygon");
|
this.totalMapIds = jobCtx.getLong("totalMapIds", 0L);
|
||||||
|
long totalExpectedRecords = jobCtx.getLong("totalExpectedRecords", 0L);
|
||||||
|
|
||||||
|
// 전체의 10% 단위로 로그 (최소 1, 최대 500)
|
||||||
|
if (totalMapIds > 0) {
|
||||||
|
logInterval = (int) Math.min(500, Math.max(1, totalMapIds / 10));
|
||||||
|
}
|
||||||
|
|
||||||
Class<?> geomClass = resolveGeometryClass(geomTypeStr);
|
Class<?> geomClass = resolveGeometryClass(geomTypeStr);
|
||||||
this.featureType = buildFeatureType(geomClass);
|
this.featureType = buildFeatureType(geomClass);
|
||||||
this.featureBuilder = new SimpleFeatureBuilder(featureType);
|
this.featureBuilder = new SimpleFeatureBuilder(featureType);
|
||||||
this.startTimeMs = System.currentTimeMillis();
|
this.startTimeMs = System.currentTimeMillis();
|
||||||
|
|
||||||
log.info("MapIdSwitchingWriter initialized. geometryType={}, outputBase={}/{}",
|
log.info("MapIdSwitchingWriter initialized. geometryType={}, outputBase={}/{}, 처리 대상: map_id {}개 / 레코드 {}건",
|
||||||
geomTypeStr, properties.getOutputBaseDir(), properties.getInferenceId());
|
geomTypeStr, properties.getOutputBaseDir(), properties.getInferenceId(),
|
||||||
|
totalMapIds, totalExpectedRecords);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -117,10 +126,18 @@ public class MapIdSwitchingWriter implements ItemStreamWriter<InferenceResult> {
|
|||||||
currentMapId = mapId;
|
currentMapId = mapId;
|
||||||
totalFiles++;
|
totalFiles++;
|
||||||
|
|
||||||
if (totalFiles % LOG_INTERVAL == 0) {
|
if (totalFiles % logInterval == 0) {
|
||||||
long elapsed = Math.max((System.currentTimeMillis() - startTimeMs) / 1000, 1);
|
long elapsed = Math.max((System.currentTimeMillis() - startTimeMs) / 1000, 1);
|
||||||
log.info("진행: map_id {}개 완료 | 총 {}건 | 경과 {}s | 속도 {}건/s",
|
double rate = (double) totalFiles / elapsed; // map_id/s 기준 ETA 계산
|
||||||
totalFiles, totalRecords, elapsed, totalRecords / elapsed);
|
String progress = totalMapIds > 0
|
||||||
|
? String.format("%d / %d (%.1f%%)", totalFiles, totalMapIds, totalFiles * 100.0 / totalMapIds)
|
||||||
|
: String.valueOf(totalFiles);
|
||||||
|
String eta = (totalMapIds > 0 && rate > 0)
|
||||||
|
? formatSeconds((long) ((totalMapIds - totalFiles) / rate))
|
||||||
|
: "N/A";
|
||||||
|
log.info("진행: map_id {} 완료 | 총 {}건 | 경과 {}s | 속도 {}건/s | 남은시간 {} | thread={}",
|
||||||
|
progress, totalRecords, elapsed, totalRecords / elapsed,
|
||||||
|
eta, Thread.currentThread().getName());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -232,6 +249,12 @@ public class MapIdSwitchingWriter implements ItemStreamWriter<InferenceResult> {
|
|||||||
return builder.buildFeatureType();
|
return builder.buildFeatureType();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static String formatSeconds(long seconds) {
|
||||||
|
if (seconds < 60) return seconds + "s";
|
||||||
|
if (seconds < 3600) return String.format("%dm %ds", seconds / 60, seconds % 60);
|
||||||
|
return String.format("%dh %dm", seconds / 3600, (seconds % 3600) / 60);
|
||||||
|
}
|
||||||
|
|
||||||
private Class<?> resolveGeometryClass(String typeStr) {
|
private Class<?> resolveGeometryClass(String typeStr) {
|
||||||
try {
|
try {
|
||||||
String name = typeStr.replace("ST_", "");
|
String name = typeStr.replace("ST_", "");
|
||||||
|
|||||||
Reference in New Issue
Block a user