From 4fcc645f63c017e70e193997bb3214ea4212327a Mon Sep 17 00:00:00 2001 From: dean Date: Thu, 26 Mar 2026 14:54:24 +0900 Subject: [PATCH] =?UTF-8?q?oom=EC=B2=98=EB=A6=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../batch/config/BatchConfiguration.java | 15 ++++ .../processor/FeatureConversionProcessor.java | 31 +++----- .../GeometryTypeValidationTasklet.java | 8 ++ .../batch/writer/MapIdGeoJsonWriter.java | 14 ++-- .../batch/writer/MapIdShapefileWriter.java | 53 +++++++------ .../writer/StreamingShapefileWriter.java | 75 +++++++++++++------ 6 files changed, 122 insertions(+), 74 deletions(-) diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/config/BatchConfiguration.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/config/BatchConfiguration.java index d72d0f6..d93dde8 100644 --- a/shp-exporter/src/main/java/com/kamco/makesample/batch/config/BatchConfiguration.java +++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/config/BatchConfiguration.java @@ -1,6 +1,9 @@ package com.kamco.makesample.batch.config; import com.kamco.makesample.config.ConverterProperties; +import org.geotools.api.referencing.FactoryException; +import org.geotools.api.referencing.crs.CoordinateReferenceSystem; +import org.geotools.referencing.CRS; import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -38,6 +41,18 @@ public class BatchConfiguration { *

추가 설정이 필요하면 여기에 Bean을 정의 */ + /** + * 애플리케이션 전체에서 재사용되는 CoordinateReferenceSystem 싱글톤 빈 + * + *

CRS.decode()는 GeoTools HSQL EPSG 팩토리를 초기화하므로 비용이 큼. @StepScope 컴포넌트마다 호출되면 파티셔닝 단계에서 CRS 객체가 + * 누적되어 OOM을 유발합니다. 싱글톤으로 캐싱하여 한 번만 초기화합니다. + */ + @Bean + public CoordinateReferenceSystem coordinateReferenceSystem() throws FactoryException { + String crsCode = properties.getCrs() != null ? properties.getCrs() : "EPSG:5186"; + return CRS.decode(crsCode); + } + /** * Map ID별 파일 생성을 위한 TaskExecutor * diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/processor/FeatureConversionProcessor.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/processor/FeatureConversionProcessor.java index b2e3d9c..d6ced76 100644 --- a/shp-exporter/src/main/java/com/kamco/makesample/batch/processor/FeatureConversionProcessor.java +++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/processor/FeatureConversionProcessor.java @@ -4,10 +4,8 @@ import com.kamco.makesample.batch.util.FeatureTypeFactory; import com.kamco.makesample.model.InferenceResult; import org.geotools.api.feature.simple.SimpleFeature; import org.geotools.api.feature.simple.SimpleFeatureType; -import org.geotools.api.referencing.FactoryException; import org.geotools.api.referencing.crs.CoordinateReferenceSystem; import org.geotools.feature.simple.SimpleFeatureBuilder; -import org.geotools.referencing.CRS; import org.locationtech.jts.geom.Geometry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,7 +14,6 @@ import org.springframework.batch.core.annotation.BeforeStep; import org.springframework.batch.core.configuration.annotation.StepScope; import org.springframework.batch.item.ExecutionContext; import org.springframework.batch.item.ItemProcessor; -import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** @@ -39,15 +36,15 @@ public class FeatureConversionProcessor implements ItemProcessor { private FileOutputStream outputStream; private FeatureJSON featureJSON; + private ByteArrayOutputStream buffer; private int chunkCount = 0; private int totalRecordCount = 0; @@ -91,6 +92,7 @@ public class MapIdGeoJsonWriter implements ItemStreamWriter { File geoJsonFile = new File(outputPath); outputStream = new FileOutputStream(geoJsonFile); featureJSON = new FeatureJSON(); + buffer = new ByteArrayOutputStream(8192); // GeoJSON FeatureCollection 시작 outputStream.write("{\"type\":\"FeatureCollection\",\"features\":[".getBytes()); @@ -129,11 +131,11 @@ public class MapIdGeoJsonWriter implements ItemStreamWriter { outputStream.write(",".getBytes()); } - // Feature를 GeoJSON으로 직렬화 - // StringWriter를 사용하여 FeatureJSON이 스트림을 닫지 못하도록 방지 - StringWriter stringWriter = new StringWriter(); - featureJSON.writeFeature(feature, stringWriter); - outputStream.write(stringWriter.toString().getBytes()); + // Feature를 GeoJSON으로 직렬화 (ByteArrayOutputStream 재사용으로 String 객체 생성 방지) + // ByteArrayOutputStream.close()는 no-op이므로 FeatureJSON이 닫아도 안전 + buffer.reset(); + featureJSON.writeFeature(feature, buffer); + buffer.writeTo(outputStream); } isFirstChunk = false; diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/MapIdShapefileWriter.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/MapIdShapefileWriter.java index c989c04..59d7ae1 100644 --- a/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/MapIdShapefileWriter.java +++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/MapIdShapefileWriter.java @@ -14,13 +14,11 @@ import org.geotools.api.data.SimpleFeatureStore; import org.geotools.api.data.Transaction; import org.geotools.api.feature.simple.SimpleFeature; import org.geotools.api.feature.simple.SimpleFeatureType; -import org.geotools.api.referencing.FactoryException; import org.geotools.api.referencing.crs.CoordinateReferenceSystem; import org.geotools.data.DefaultTransaction; import org.geotools.data.collection.ListFeatureCollection; import org.geotools.data.shapefile.ShapefileDataStore; import org.geotools.data.shapefile.ShapefileDataStoreFactory; -import org.geotools.referencing.CRS; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.StepExecution; @@ -48,11 +46,10 @@ import org.springframework.stereotype.Component; public class MapIdShapefileWriter implements ItemStreamWriter { private static final Logger log = LoggerFactory.getLogger(MapIdShapefileWriter.class); + private static final int LOG_INTERVAL_CHUNKS = 5; // 5청크(=5,000건)마다 진행 로그 private final FeatureTypeFactory featureTypeFactory; - - @Value("#{jobParameters['crs'] ?: 'EPSG:5186'}") - private String crsCode; + private final CoordinateReferenceSystem crs; @Value("#{stepExecutionContext['mapId']}") private String mapId; @@ -70,11 +67,14 @@ public class MapIdShapefileWriter implements ItemStreamWriter { private int chunkCount = 0; private int totalRecordCount = 0; + private long startTimeMs = 0; private Class geometryType; - public MapIdShapefileWriter(FeatureTypeFactory featureTypeFactory) { + public MapIdShapefileWriter( + FeatureTypeFactory featureTypeFactory, CoordinateReferenceSystem coordinateReferenceSystem) { this.featureTypeFactory = featureTypeFactory; + this.crs = coordinateReferenceSystem; } @BeforeStep @@ -143,10 +143,7 @@ public class MapIdShapefileWriter implements ItemStreamWriter { geometryType.getSimpleName(), geometryTypeStr); - // CRS 설정 - CoordinateReferenceSystem crs = CRS.decode(crsCode); - - // SimpleFeatureType 생성 + // SimpleFeatureType 생성 (주입된 싱글톤 CRS 재사용 - CRS.decode() 반복 호출 방지) featureType = featureTypeFactory.createFeatureType(crs, geometryType); // ShapefileDataStore 생성 @@ -168,13 +165,12 @@ public class MapIdShapefileWriter implements ItemStreamWriter { featureStore = (SimpleFeatureStore) dataStore.getFeatureSource(typeName); featureStore.setTransaction(transaction); + startTimeMs = System.currentTimeMillis(); log.info( "ShapefileDataStore initialized for map_id: {} with geometry type: {}", mapId, geometryType.getSimpleName()); - } catch (FactoryException e) { - throw new ItemStreamException("Invalid CRS code: " + crsCode, e); } catch (IOException e) { throw new ItemStreamException("Failed to create shapefile for map_id " + mapId, e); } @@ -191,27 +187,36 @@ public class MapIdShapefileWriter implements ItemStreamWriter { int itemCount = items.size(); totalRecordCount += itemCount; - log.debug( - "[map_id: {}] Writing chunk #{} with {} features (total: {})", - mapId, - chunkCount, - itemCount, - totalRecordCount); - - // ListFeatureCollection으로 변환하여 FeatureStore에 추가 ListFeatureCollection collection = new ListFeatureCollection(featureType, items); featureStore.addFeatures(collection); - log.debug("[map_id: {}] Chunk #{} written successfully", mapId, chunkCount); + if (chunkCount % LOG_INTERVAL_CHUNKS == 0) { + long elapsedSec = Math.max((System.currentTimeMillis() - startTimeMs) / 1000, 1); + long throughput = totalRecordCount / elapsedSec; + log.info( + "[map_id: {}] 진행: {}건 | 경과: {} | 속도: {}건/s", + mapId, + String.format("%,d", totalRecordCount), + formatDuration(elapsedSec), + String.format("%,d", throughput)); + } } @AfterStep public void afterStep() { + long elapsedSec = Math.max((System.currentTimeMillis() - startTimeMs) / 1000, 1); + long throughput = totalRecordCount / elapsedSec; log.info( - "[map_id: {}] AfterStep called. Total {} records in {} chunks", + "[map_id: {}] 완료: {}건 처리 | 소요: {} | 평균 속도: {}건/s", mapId, - totalRecordCount, - chunkCount); + String.format("%,d", totalRecordCount), + formatDuration(elapsedSec), + String.format("%,d", throughput)); + } + + private String formatDuration(long seconds) { + if (seconds < 60) return seconds + "초"; + return (seconds / 60) + "분 " + (seconds % 60) + "초"; } @Override diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/StreamingShapefileWriter.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/StreamingShapefileWriter.java index e0f949e..34d34b1 100644 --- a/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/StreamingShapefileWriter.java +++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/StreamingShapefileWriter.java @@ -14,13 +14,11 @@ import org.geotools.api.data.SimpleFeatureStore; import org.geotools.api.data.Transaction; import org.geotools.api.feature.simple.SimpleFeature; import org.geotools.api.feature.simple.SimpleFeatureType; -import org.geotools.api.referencing.FactoryException; import org.geotools.api.referencing.crs.CoordinateReferenceSystem; import org.geotools.data.DefaultTransaction; import org.geotools.data.collection.ListFeatureCollection; import org.geotools.data.shapefile.ShapefileDataStore; import org.geotools.data.shapefile.ShapefileDataStoreFactory; -import org.geotools.referencing.CRS; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.batch.core.StepExecution; @@ -62,15 +60,14 @@ import org.springframework.stereotype.Component; public class StreamingShapefileWriter implements ItemStreamWriter { private static final Logger log = LoggerFactory.getLogger(StreamingShapefileWriter.class); + private static final int LOG_INTERVAL_CHUNKS = 10; // 10청크(=10,000건)마다 진행 로그 private final FeatureTypeFactory featureTypeFactory; + private final CoordinateReferenceSystem crs; @Value("#{jobParameters['outputPath']}") private String outputPath; - @Value("#{jobParameters['crs'] ?: 'EPSG:5186'}") - private String crsCode; - private ShapefileDataStore dataStore; private Transaction transaction; private SimpleFeatureStore featureStore; @@ -78,11 +75,15 @@ public class StreamingShapefileWriter implements ItemStreamWriter private int chunkCount = 0; private int totalRecordCount = 0; + private long totalExpected = 0; // Validation Tasklet에서 조회한 전체 건수 + private long startTimeMs = 0; private Class geometryType; // Geometry type from validation tasklet - public StreamingShapefileWriter(FeatureTypeFactory featureTypeFactory) { + public StreamingShapefileWriter( + FeatureTypeFactory featureTypeFactory, CoordinateReferenceSystem coordinateReferenceSystem) { this.featureTypeFactory = featureTypeFactory; + this.crs = coordinateReferenceSystem; } @BeforeStep @@ -101,6 +102,12 @@ public class StreamingShapefileWriter implements ItemStreamWriter "geometryType not set in Job ExecutionContext (empty dataset). Will use default in open()"); } + // 진행률 표시용 전체 건수 읽기 + if (jobExecutionContext.containsKey("totalValidRecords")) { + totalExpected = jobExecutionContext.getLong("totalValidRecords"); + log.info("[Shapefile] 처리 예정 건수: {}건", String.format("%,d", totalExpected)); + } + // 출력 디렉토리 생성 try { Path outputDir = Paths.get(outputPath).getParent(); @@ -118,9 +125,6 @@ public class StreamingShapefileWriter implements ItemStreamWriter log.info("Opening StreamingShapefileWriter for: {}", outputPath); try { - // CRS 설정 - CoordinateReferenceSystem crs = CRS.decode(crsCode); - // Geometry type이 아직 설정되지 않은 경우 기본값 사용 if (geometryType == null) { geometryType = featureTypeFactory.parseGeometryType(null); @@ -149,10 +153,9 @@ public class StreamingShapefileWriter implements ItemStreamWriter featureStore = (SimpleFeatureStore) dataStore.getFeatureSource(typeName); featureStore.setTransaction(transaction); + startTimeMs = System.currentTimeMillis(); log.info("ShapefileDataStore initialized successfully"); - } catch (FactoryException e) { - throw new ItemStreamException("Invalid CRS code: " + crsCode, e); } catch (IOException e) { throw new ItemStreamException("Failed to create shapefile at: " + outputPath, e); } @@ -169,27 +172,22 @@ public class StreamingShapefileWriter implements ItemStreamWriter int itemCount = items.size(); totalRecordCount += itemCount; - log.debug( - "Writing chunk #{} with {} features (total so far: {})", - chunkCount, - itemCount, - totalRecordCount); - - // ListFeatureCollection으로 변환 (청크만 담김) - ListFeatureCollection collection = new ListFeatureCollection(featureType, items); - // FeatureStore에 추가 - GeoTools ShapefileDataStore는 Diff 없이 파일에 직접 씀 // 트랜잭션은 afterStep()에서 단일 커밋 (per-chunk 커밋 시 setTransaction()이 .shx 재스캔 → O(n²)) + ListFeatureCollection collection = new ListFeatureCollection(featureType, items); featureStore.addFeatures(collection); - log.debug("Chunk #{} written successfully", chunkCount); + if (chunkCount % LOG_INTERVAL_CHUNKS == 0) { + logProgress(); + } } @AfterStep public void afterStep() { + logProgress(); // 완료 시 최종 진행 상황 출력 log.info( - "All chunks written ({} records in {} chunks). Finalizing shapefile.", - totalRecordCount, + "[Shapefile] 완료: {}건 처리 ({}개 청크). 파일 저장 중...", + String.format("%,d", totalRecordCount), chunkCount); try { @@ -234,6 +232,37 @@ public class StreamingShapefileWriter implements ItemStreamWriter } } + private void logProgress() { + long elapsedMs = System.currentTimeMillis() - startTimeMs; + long elapsedSec = Math.max(elapsedMs / 1000, 1); + long throughput = totalRecordCount / elapsedSec; + + if (totalExpected > 0) { + double pct = totalRecordCount * 100.0 / totalExpected; + long remaining = totalRecordCount < totalExpected ? totalExpected - totalRecordCount : 0; + long etaSec = throughput > 0 ? remaining / throughput : 0; + log.info( + "[Shapefile] 진행: {} / {}건 ({}) | 경과: {} | 속도: {}건/s | 예상 남은: {}", + String.format("%,d", totalRecordCount), + String.format("%,d", totalExpected), + String.format("%.1f%%", pct), + formatDuration(elapsedSec), + String.format("%,d", throughput), + formatDuration(etaSec)); + } else { + log.info( + "[Shapefile] 진행: {}건 처리 중 | 경과: {} | 속도: {}건/s", + String.format("%,d", totalRecordCount), + formatDuration(elapsedSec), + String.format("%,d", throughput)); + } + } + + private String formatDuration(long seconds) { + if (seconds < 60) return seconds + "초"; + return (seconds / 60) + "분 " + (seconds % 60) + "초"; + } + private void cleanup() { if (transaction != null) { try {