oom처리
This commit is contained in:
@@ -1,6 +1,9 @@
|
|||||||
package com.kamco.makesample.batch.config;
|
package com.kamco.makesample.batch.config;
|
||||||
|
|
||||||
import com.kamco.makesample.config.ConverterProperties;
|
import com.kamco.makesample.config.ConverterProperties;
|
||||||
|
import org.geotools.api.referencing.FactoryException;
|
||||||
|
import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
|
||||||
|
import org.geotools.referencing.CRS;
|
||||||
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
|
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
@@ -38,6 +41,18 @@ public class BatchConfiguration {
|
|||||||
* <p>추가 설정이 필요하면 여기에 Bean을 정의
|
* <p>추가 설정이 필요하면 여기에 Bean을 정의
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 애플리케이션 전체에서 재사용되는 CoordinateReferenceSystem 싱글톤 빈
|
||||||
|
*
|
||||||
|
* <p>CRS.decode()는 GeoTools HSQL EPSG 팩토리를 초기화하므로 비용이 큼. @StepScope 컴포넌트마다 호출되면 파티셔닝 단계에서 CRS 객체가
|
||||||
|
* 누적되어 OOM을 유발합니다. 싱글톤으로 캐싱하여 한 번만 초기화합니다.
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
|
public CoordinateReferenceSystem coordinateReferenceSystem() throws FactoryException {
|
||||||
|
String crsCode = properties.getCrs() != null ? properties.getCrs() : "EPSG:5186";
|
||||||
|
return CRS.decode(crsCode);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Map ID별 파일 생성을 위한 TaskExecutor
|
* Map ID별 파일 생성을 위한 TaskExecutor
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -4,10 +4,8 @@ import com.kamco.makesample.batch.util.FeatureTypeFactory;
|
|||||||
import com.kamco.makesample.model.InferenceResult;
|
import com.kamco.makesample.model.InferenceResult;
|
||||||
import org.geotools.api.feature.simple.SimpleFeature;
|
import org.geotools.api.feature.simple.SimpleFeature;
|
||||||
import org.geotools.api.feature.simple.SimpleFeatureType;
|
import org.geotools.api.feature.simple.SimpleFeatureType;
|
||||||
import org.geotools.api.referencing.FactoryException;
|
|
||||||
import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
|
import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
|
||||||
import org.geotools.feature.simple.SimpleFeatureBuilder;
|
import org.geotools.feature.simple.SimpleFeatureBuilder;
|
||||||
import org.geotools.referencing.CRS;
|
|
||||||
import org.locationtech.jts.geom.Geometry;
|
import org.locationtech.jts.geom.Geometry;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@@ -16,7 +14,6 @@ import org.springframework.batch.core.annotation.BeforeStep;
|
|||||||
import org.springframework.batch.core.configuration.annotation.StepScope;
|
import org.springframework.batch.core.configuration.annotation.StepScope;
|
||||||
import org.springframework.batch.item.ExecutionContext;
|
import org.springframework.batch.item.ExecutionContext;
|
||||||
import org.springframework.batch.item.ItemProcessor;
|
import org.springframework.batch.item.ItemProcessor;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -39,15 +36,15 @@ public class FeatureConversionProcessor implements ItemProcessor<InferenceResult
|
|||||||
private static final Logger log = LoggerFactory.getLogger(FeatureConversionProcessor.class);
|
private static final Logger log = LoggerFactory.getLogger(FeatureConversionProcessor.class);
|
||||||
|
|
||||||
private final FeatureTypeFactory featureTypeFactory;
|
private final FeatureTypeFactory featureTypeFactory;
|
||||||
|
private final CoordinateReferenceSystem crs;
|
||||||
@Value("#{jobParameters['crs'] ?: 'EPSG:5186'}")
|
|
||||||
private String crsCode;
|
|
||||||
|
|
||||||
private SimpleFeatureBuilder featureBuilder;
|
private SimpleFeatureBuilder featureBuilder;
|
||||||
private SimpleFeatureType featureType;
|
private SimpleFeatureType featureType;
|
||||||
|
|
||||||
public FeatureConversionProcessor(FeatureTypeFactory featureTypeFactory) {
|
public FeatureConversionProcessor(
|
||||||
|
FeatureTypeFactory featureTypeFactory, CoordinateReferenceSystem coordinateReferenceSystem) {
|
||||||
this.featureTypeFactory = featureTypeFactory;
|
this.featureTypeFactory = featureTypeFactory;
|
||||||
|
this.crs = coordinateReferenceSystem;
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeStep
|
@BeforeStep
|
||||||
@@ -66,21 +63,13 @@ public class FeatureConversionProcessor implements ItemProcessor<InferenceResult
|
|||||||
|
|
||||||
geometryType = featureTypeFactory.parseGeometryType(geomTypeStr);
|
geometryType = featureTypeFactory.parseGeometryType(geomTypeStr);
|
||||||
|
|
||||||
try {
|
// FeatureType 생성 (주입된 싱글톤 CRS 재사용 - CRS.decode() 반복 호출 방지)
|
||||||
// CRS 설정
|
this.featureType = featureTypeFactory.createFeatureType(crs, geometryType);
|
||||||
CoordinateReferenceSystem crs = CRS.decode(crsCode);
|
this.featureBuilder = new SimpleFeatureBuilder(this.featureType);
|
||||||
|
|
||||||
// FeatureType 생성
|
log.info(
|
||||||
this.featureType = featureTypeFactory.createFeatureType(crs, geometryType);
|
"FeatureConversionProcessor initialized with geometry type: {}",
|
||||||
this.featureBuilder = new SimpleFeatureBuilder(this.featureType);
|
geometryType.getSimpleName());
|
||||||
|
|
||||||
log.info(
|
|
||||||
"FeatureConversionProcessor initialized with geometry type: {}",
|
|
||||||
geometryType.getSimpleName());
|
|
||||||
|
|
||||||
} catch (FactoryException e) {
|
|
||||||
throw new RuntimeException("Failed to initialize FeatureConversionProcessor", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -172,6 +172,14 @@ public class GeometryTypeValidationTasklet implements Tasklet {
|
|||||||
}
|
}
|
||||||
log.info("========================================");
|
log.info("========================================");
|
||||||
|
|
||||||
|
// 진행률 표시를 위해 유효 건수를 JobExecutionContext에 저장
|
||||||
|
chunkContext
|
||||||
|
.getStepContext()
|
||||||
|
.getStepExecution()
|
||||||
|
.getJobExecution()
|
||||||
|
.getExecutionContext()
|
||||||
|
.putLong("totalValidRecords", validRows);
|
||||||
|
|
||||||
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||||
// 3. Mixed geometry type 체크 및 자동 변환 안내
|
// 3. Mixed geometry type 체크 및 자동 변환 안내
|
||||||
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
|
||||||
|
|||||||
@@ -1,9 +1,9 @@
|
|||||||
package com.kamco.makesample.batch.writer;
|
package com.kamco.makesample.batch.writer;
|
||||||
|
|
||||||
|
import java.io.ByteArrayOutputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.StringWriter;
|
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
@@ -46,6 +46,7 @@ public class MapIdGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
|
|
||||||
private FileOutputStream outputStream;
|
private FileOutputStream outputStream;
|
||||||
private FeatureJSON featureJSON;
|
private FeatureJSON featureJSON;
|
||||||
|
private ByteArrayOutputStream buffer;
|
||||||
|
|
||||||
private int chunkCount = 0;
|
private int chunkCount = 0;
|
||||||
private int totalRecordCount = 0;
|
private int totalRecordCount = 0;
|
||||||
@@ -91,6 +92,7 @@ public class MapIdGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
File geoJsonFile = new File(outputPath);
|
File geoJsonFile = new File(outputPath);
|
||||||
outputStream = new FileOutputStream(geoJsonFile);
|
outputStream = new FileOutputStream(geoJsonFile);
|
||||||
featureJSON = new FeatureJSON();
|
featureJSON = new FeatureJSON();
|
||||||
|
buffer = new ByteArrayOutputStream(8192);
|
||||||
|
|
||||||
// GeoJSON FeatureCollection 시작
|
// GeoJSON FeatureCollection 시작
|
||||||
outputStream.write("{\"type\":\"FeatureCollection\",\"features\":[".getBytes());
|
outputStream.write("{\"type\":\"FeatureCollection\",\"features\":[".getBytes());
|
||||||
@@ -129,11 +131,11 @@ public class MapIdGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
outputStream.write(",".getBytes());
|
outputStream.write(",".getBytes());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Feature를 GeoJSON으로 직렬화
|
// Feature를 GeoJSON으로 직렬화 (ByteArrayOutputStream 재사용으로 String 객체 생성 방지)
|
||||||
// StringWriter를 사용하여 FeatureJSON이 스트림을 닫지 못하도록 방지
|
// ByteArrayOutputStream.close()는 no-op이므로 FeatureJSON이 닫아도 안전
|
||||||
StringWriter stringWriter = new StringWriter();
|
buffer.reset();
|
||||||
featureJSON.writeFeature(feature, stringWriter);
|
featureJSON.writeFeature(feature, buffer);
|
||||||
outputStream.write(stringWriter.toString().getBytes());
|
buffer.writeTo(outputStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
isFirstChunk = false;
|
isFirstChunk = false;
|
||||||
|
|||||||
@@ -14,13 +14,11 @@ import org.geotools.api.data.SimpleFeatureStore;
|
|||||||
import org.geotools.api.data.Transaction;
|
import org.geotools.api.data.Transaction;
|
||||||
import org.geotools.api.feature.simple.SimpleFeature;
|
import org.geotools.api.feature.simple.SimpleFeature;
|
||||||
import org.geotools.api.feature.simple.SimpleFeatureType;
|
import org.geotools.api.feature.simple.SimpleFeatureType;
|
||||||
import org.geotools.api.referencing.FactoryException;
|
|
||||||
import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
|
import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
|
||||||
import org.geotools.data.DefaultTransaction;
|
import org.geotools.data.DefaultTransaction;
|
||||||
import org.geotools.data.collection.ListFeatureCollection;
|
import org.geotools.data.collection.ListFeatureCollection;
|
||||||
import org.geotools.data.shapefile.ShapefileDataStore;
|
import org.geotools.data.shapefile.ShapefileDataStore;
|
||||||
import org.geotools.data.shapefile.ShapefileDataStoreFactory;
|
import org.geotools.data.shapefile.ShapefileDataStoreFactory;
|
||||||
import org.geotools.referencing.CRS;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.batch.core.StepExecution;
|
import org.springframework.batch.core.StepExecution;
|
||||||
@@ -48,11 +46,10 @@ import org.springframework.stereotype.Component;
|
|||||||
public class MapIdShapefileWriter implements ItemStreamWriter<SimpleFeature> {
|
public class MapIdShapefileWriter implements ItemStreamWriter<SimpleFeature> {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(MapIdShapefileWriter.class);
|
private static final Logger log = LoggerFactory.getLogger(MapIdShapefileWriter.class);
|
||||||
|
private static final int LOG_INTERVAL_CHUNKS = 5; // 5청크(=5,000건)마다 진행 로그
|
||||||
|
|
||||||
private final FeatureTypeFactory featureTypeFactory;
|
private final FeatureTypeFactory featureTypeFactory;
|
||||||
|
private final CoordinateReferenceSystem crs;
|
||||||
@Value("#{jobParameters['crs'] ?: 'EPSG:5186'}")
|
|
||||||
private String crsCode;
|
|
||||||
|
|
||||||
@Value("#{stepExecutionContext['mapId']}")
|
@Value("#{stepExecutionContext['mapId']}")
|
||||||
private String mapId;
|
private String mapId;
|
||||||
@@ -70,11 +67,14 @@ public class MapIdShapefileWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
|
|
||||||
private int chunkCount = 0;
|
private int chunkCount = 0;
|
||||||
private int totalRecordCount = 0;
|
private int totalRecordCount = 0;
|
||||||
|
private long startTimeMs = 0;
|
||||||
|
|
||||||
private Class<?> geometryType;
|
private Class<?> geometryType;
|
||||||
|
|
||||||
public MapIdShapefileWriter(FeatureTypeFactory featureTypeFactory) {
|
public MapIdShapefileWriter(
|
||||||
|
FeatureTypeFactory featureTypeFactory, CoordinateReferenceSystem coordinateReferenceSystem) {
|
||||||
this.featureTypeFactory = featureTypeFactory;
|
this.featureTypeFactory = featureTypeFactory;
|
||||||
|
this.crs = coordinateReferenceSystem;
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeStep
|
@BeforeStep
|
||||||
@@ -143,10 +143,7 @@ public class MapIdShapefileWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
geometryType.getSimpleName(),
|
geometryType.getSimpleName(),
|
||||||
geometryTypeStr);
|
geometryTypeStr);
|
||||||
|
|
||||||
// CRS 설정
|
// SimpleFeatureType 생성 (주입된 싱글톤 CRS 재사용 - CRS.decode() 반복 호출 방지)
|
||||||
CoordinateReferenceSystem crs = CRS.decode(crsCode);
|
|
||||||
|
|
||||||
// SimpleFeatureType 생성
|
|
||||||
featureType = featureTypeFactory.createFeatureType(crs, geometryType);
|
featureType = featureTypeFactory.createFeatureType(crs, geometryType);
|
||||||
|
|
||||||
// ShapefileDataStore 생성
|
// ShapefileDataStore 생성
|
||||||
@@ -168,13 +165,12 @@ public class MapIdShapefileWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
featureStore = (SimpleFeatureStore) dataStore.getFeatureSource(typeName);
|
featureStore = (SimpleFeatureStore) dataStore.getFeatureSource(typeName);
|
||||||
featureStore.setTransaction(transaction);
|
featureStore.setTransaction(transaction);
|
||||||
|
|
||||||
|
startTimeMs = System.currentTimeMillis();
|
||||||
log.info(
|
log.info(
|
||||||
"ShapefileDataStore initialized for map_id: {} with geometry type: {}",
|
"ShapefileDataStore initialized for map_id: {} with geometry type: {}",
|
||||||
mapId,
|
mapId,
|
||||||
geometryType.getSimpleName());
|
geometryType.getSimpleName());
|
||||||
|
|
||||||
} catch (FactoryException e) {
|
|
||||||
throw new ItemStreamException("Invalid CRS code: " + crsCode, e);
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ItemStreamException("Failed to create shapefile for map_id " + mapId, e);
|
throw new ItemStreamException("Failed to create shapefile for map_id " + mapId, e);
|
||||||
}
|
}
|
||||||
@@ -191,27 +187,36 @@ public class MapIdShapefileWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
int itemCount = items.size();
|
int itemCount = items.size();
|
||||||
totalRecordCount += itemCount;
|
totalRecordCount += itemCount;
|
||||||
|
|
||||||
log.debug(
|
|
||||||
"[map_id: {}] Writing chunk #{} with {} features (total: {})",
|
|
||||||
mapId,
|
|
||||||
chunkCount,
|
|
||||||
itemCount,
|
|
||||||
totalRecordCount);
|
|
||||||
|
|
||||||
// ListFeatureCollection으로 변환하여 FeatureStore에 추가
|
|
||||||
ListFeatureCollection collection = new ListFeatureCollection(featureType, items);
|
ListFeatureCollection collection = new ListFeatureCollection(featureType, items);
|
||||||
featureStore.addFeatures(collection);
|
featureStore.addFeatures(collection);
|
||||||
|
|
||||||
log.debug("[map_id: {}] Chunk #{} written successfully", mapId, chunkCount);
|
if (chunkCount % LOG_INTERVAL_CHUNKS == 0) {
|
||||||
|
long elapsedSec = Math.max((System.currentTimeMillis() - startTimeMs) / 1000, 1);
|
||||||
|
long throughput = totalRecordCount / elapsedSec;
|
||||||
|
log.info(
|
||||||
|
"[map_id: {}] 진행: {}건 | 경과: {} | 속도: {}건/s",
|
||||||
|
mapId,
|
||||||
|
String.format("%,d", totalRecordCount),
|
||||||
|
formatDuration(elapsedSec),
|
||||||
|
String.format("%,d", throughput));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterStep
|
@AfterStep
|
||||||
public void afterStep() {
|
public void afterStep() {
|
||||||
|
long elapsedSec = Math.max((System.currentTimeMillis() - startTimeMs) / 1000, 1);
|
||||||
|
long throughput = totalRecordCount / elapsedSec;
|
||||||
log.info(
|
log.info(
|
||||||
"[map_id: {}] AfterStep called. Total {} records in {} chunks",
|
"[map_id: {}] 완료: {}건 처리 | 소요: {} | 평균 속도: {}건/s",
|
||||||
mapId,
|
mapId,
|
||||||
totalRecordCount,
|
String.format("%,d", totalRecordCount),
|
||||||
chunkCount);
|
formatDuration(elapsedSec),
|
||||||
|
String.format("%,d", throughput));
|
||||||
|
}
|
||||||
|
|
||||||
|
private String formatDuration(long seconds) {
|
||||||
|
if (seconds < 60) return seconds + "초";
|
||||||
|
return (seconds / 60) + "분 " + (seconds % 60) + "초";
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -14,13 +14,11 @@ import org.geotools.api.data.SimpleFeatureStore;
|
|||||||
import org.geotools.api.data.Transaction;
|
import org.geotools.api.data.Transaction;
|
||||||
import org.geotools.api.feature.simple.SimpleFeature;
|
import org.geotools.api.feature.simple.SimpleFeature;
|
||||||
import org.geotools.api.feature.simple.SimpleFeatureType;
|
import org.geotools.api.feature.simple.SimpleFeatureType;
|
||||||
import org.geotools.api.referencing.FactoryException;
|
|
||||||
import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
|
import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
|
||||||
import org.geotools.data.DefaultTransaction;
|
import org.geotools.data.DefaultTransaction;
|
||||||
import org.geotools.data.collection.ListFeatureCollection;
|
import org.geotools.data.collection.ListFeatureCollection;
|
||||||
import org.geotools.data.shapefile.ShapefileDataStore;
|
import org.geotools.data.shapefile.ShapefileDataStore;
|
||||||
import org.geotools.data.shapefile.ShapefileDataStoreFactory;
|
import org.geotools.data.shapefile.ShapefileDataStoreFactory;
|
||||||
import org.geotools.referencing.CRS;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.batch.core.StepExecution;
|
import org.springframework.batch.core.StepExecution;
|
||||||
@@ -62,15 +60,14 @@ import org.springframework.stereotype.Component;
|
|||||||
public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature> {
|
public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature> {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(StreamingShapefileWriter.class);
|
private static final Logger log = LoggerFactory.getLogger(StreamingShapefileWriter.class);
|
||||||
|
private static final int LOG_INTERVAL_CHUNKS = 10; // 10청크(=10,000건)마다 진행 로그
|
||||||
|
|
||||||
private final FeatureTypeFactory featureTypeFactory;
|
private final FeatureTypeFactory featureTypeFactory;
|
||||||
|
private final CoordinateReferenceSystem crs;
|
||||||
|
|
||||||
@Value("#{jobParameters['outputPath']}")
|
@Value("#{jobParameters['outputPath']}")
|
||||||
private String outputPath;
|
private String outputPath;
|
||||||
|
|
||||||
@Value("#{jobParameters['crs'] ?: 'EPSG:5186'}")
|
|
||||||
private String crsCode;
|
|
||||||
|
|
||||||
private ShapefileDataStore dataStore;
|
private ShapefileDataStore dataStore;
|
||||||
private Transaction transaction;
|
private Transaction transaction;
|
||||||
private SimpleFeatureStore featureStore;
|
private SimpleFeatureStore featureStore;
|
||||||
@@ -78,11 +75,15 @@ public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature>
|
|||||||
|
|
||||||
private int chunkCount = 0;
|
private int chunkCount = 0;
|
||||||
private int totalRecordCount = 0;
|
private int totalRecordCount = 0;
|
||||||
|
private long totalExpected = 0; // Validation Tasklet에서 조회한 전체 건수
|
||||||
|
private long startTimeMs = 0;
|
||||||
|
|
||||||
private Class<?> geometryType; // Geometry type from validation tasklet
|
private Class<?> geometryType; // Geometry type from validation tasklet
|
||||||
|
|
||||||
public StreamingShapefileWriter(FeatureTypeFactory featureTypeFactory) {
|
public StreamingShapefileWriter(
|
||||||
|
FeatureTypeFactory featureTypeFactory, CoordinateReferenceSystem coordinateReferenceSystem) {
|
||||||
this.featureTypeFactory = featureTypeFactory;
|
this.featureTypeFactory = featureTypeFactory;
|
||||||
|
this.crs = coordinateReferenceSystem;
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeStep
|
@BeforeStep
|
||||||
@@ -101,6 +102,12 @@ public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature>
|
|||||||
"geometryType not set in Job ExecutionContext (empty dataset). Will use default in open()");
|
"geometryType not set in Job ExecutionContext (empty dataset). Will use default in open()");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 진행률 표시용 전체 건수 읽기
|
||||||
|
if (jobExecutionContext.containsKey("totalValidRecords")) {
|
||||||
|
totalExpected = jobExecutionContext.getLong("totalValidRecords");
|
||||||
|
log.info("[Shapefile] 처리 예정 건수: {}건", String.format("%,d", totalExpected));
|
||||||
|
}
|
||||||
|
|
||||||
// 출력 디렉토리 생성
|
// 출력 디렉토리 생성
|
||||||
try {
|
try {
|
||||||
Path outputDir = Paths.get(outputPath).getParent();
|
Path outputDir = Paths.get(outputPath).getParent();
|
||||||
@@ -118,9 +125,6 @@ public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature>
|
|||||||
log.info("Opening StreamingShapefileWriter for: {}", outputPath);
|
log.info("Opening StreamingShapefileWriter for: {}", outputPath);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// CRS 설정
|
|
||||||
CoordinateReferenceSystem crs = CRS.decode(crsCode);
|
|
||||||
|
|
||||||
// Geometry type이 아직 설정되지 않은 경우 기본값 사용
|
// Geometry type이 아직 설정되지 않은 경우 기본값 사용
|
||||||
if (geometryType == null) {
|
if (geometryType == null) {
|
||||||
geometryType = featureTypeFactory.parseGeometryType(null);
|
geometryType = featureTypeFactory.parseGeometryType(null);
|
||||||
@@ -149,10 +153,9 @@ public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature>
|
|||||||
featureStore = (SimpleFeatureStore) dataStore.getFeatureSource(typeName);
|
featureStore = (SimpleFeatureStore) dataStore.getFeatureSource(typeName);
|
||||||
featureStore.setTransaction(transaction);
|
featureStore.setTransaction(transaction);
|
||||||
|
|
||||||
|
startTimeMs = System.currentTimeMillis();
|
||||||
log.info("ShapefileDataStore initialized successfully");
|
log.info("ShapefileDataStore initialized successfully");
|
||||||
|
|
||||||
} catch (FactoryException e) {
|
|
||||||
throw new ItemStreamException("Invalid CRS code: " + crsCode, e);
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ItemStreamException("Failed to create shapefile at: " + outputPath, e);
|
throw new ItemStreamException("Failed to create shapefile at: " + outputPath, e);
|
||||||
}
|
}
|
||||||
@@ -169,27 +172,22 @@ public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature>
|
|||||||
int itemCount = items.size();
|
int itemCount = items.size();
|
||||||
totalRecordCount += itemCount;
|
totalRecordCount += itemCount;
|
||||||
|
|
||||||
log.debug(
|
|
||||||
"Writing chunk #{} with {} features (total so far: {})",
|
|
||||||
chunkCount,
|
|
||||||
itemCount,
|
|
||||||
totalRecordCount);
|
|
||||||
|
|
||||||
// ListFeatureCollection으로 변환 (청크만 담김)
|
|
||||||
ListFeatureCollection collection = new ListFeatureCollection(featureType, items);
|
|
||||||
|
|
||||||
// FeatureStore에 추가 - GeoTools ShapefileDataStore는 Diff 없이 파일에 직접 씀
|
// FeatureStore에 추가 - GeoTools ShapefileDataStore는 Diff 없이 파일에 직접 씀
|
||||||
// 트랜잭션은 afterStep()에서 단일 커밋 (per-chunk 커밋 시 setTransaction()이 .shx 재스캔 → O(n²))
|
// 트랜잭션은 afterStep()에서 단일 커밋 (per-chunk 커밋 시 setTransaction()이 .shx 재스캔 → O(n²))
|
||||||
|
ListFeatureCollection collection = new ListFeatureCollection(featureType, items);
|
||||||
featureStore.addFeatures(collection);
|
featureStore.addFeatures(collection);
|
||||||
|
|
||||||
log.debug("Chunk #{} written successfully", chunkCount);
|
if (chunkCount % LOG_INTERVAL_CHUNKS == 0) {
|
||||||
|
logProgress();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterStep
|
@AfterStep
|
||||||
public void afterStep() {
|
public void afterStep() {
|
||||||
|
logProgress(); // 완료 시 최종 진행 상황 출력
|
||||||
log.info(
|
log.info(
|
||||||
"All chunks written ({} records in {} chunks). Finalizing shapefile.",
|
"[Shapefile] 완료: {}건 처리 ({}개 청크). 파일 저장 중...",
|
||||||
totalRecordCount,
|
String.format("%,d", totalRecordCount),
|
||||||
chunkCount);
|
chunkCount);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@@ -234,6 +232,37 @@ public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature>
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void logProgress() {
|
||||||
|
long elapsedMs = System.currentTimeMillis() - startTimeMs;
|
||||||
|
long elapsedSec = Math.max(elapsedMs / 1000, 1);
|
||||||
|
long throughput = totalRecordCount / elapsedSec;
|
||||||
|
|
||||||
|
if (totalExpected > 0) {
|
||||||
|
double pct = totalRecordCount * 100.0 / totalExpected;
|
||||||
|
long remaining = totalRecordCount < totalExpected ? totalExpected - totalRecordCount : 0;
|
||||||
|
long etaSec = throughput > 0 ? remaining / throughput : 0;
|
||||||
|
log.info(
|
||||||
|
"[Shapefile] 진행: {} / {}건 ({}) | 경과: {} | 속도: {}건/s | 예상 남은: {}",
|
||||||
|
String.format("%,d", totalRecordCount),
|
||||||
|
String.format("%,d", totalExpected),
|
||||||
|
String.format("%.1f%%", pct),
|
||||||
|
formatDuration(elapsedSec),
|
||||||
|
String.format("%,d", throughput),
|
||||||
|
formatDuration(etaSec));
|
||||||
|
} else {
|
||||||
|
log.info(
|
||||||
|
"[Shapefile] 진행: {}건 처리 중 | 경과: {} | 속도: {}건/s",
|
||||||
|
String.format("%,d", totalRecordCount),
|
||||||
|
formatDuration(elapsedSec),
|
||||||
|
String.format("%,d", throughput));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private String formatDuration(long seconds) {
|
||||||
|
if (seconds < 60) return seconds + "초";
|
||||||
|
return (seconds / 60) + "분 " + (seconds % 60) + "초";
|
||||||
|
}
|
||||||
|
|
||||||
private void cleanup() {
|
private void cleanup() {
|
||||||
if (transaction != null) {
|
if (transaction != null) {
|
||||||
try {
|
try {
|
||||||
|
|||||||
Reference in New Issue
Block a user