make shapefile
This commit is contained in:
@@ -155,7 +155,7 @@ public class MapIdShapefileWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
|
|
||||||
Map<String, Serializable> params = new HashMap<>();
|
Map<String, Serializable> params = new HashMap<>();
|
||||||
params.put("url", shpFile.toURI().toURL());
|
params.put("url", shpFile.toURI().toURL());
|
||||||
params.put("create spatial index", Boolean.TRUE);
|
params.put("create spatial index", Boolean.FALSE);
|
||||||
|
|
||||||
dataStore = (ShapefileDataStore) factory.createNewDataStore(params);
|
dataStore = (ShapefileDataStore) factory.createNewDataStore(params);
|
||||||
dataStore.createSchema(featureType);
|
dataStore.createSchema(featureType);
|
||||||
|
|||||||
@@ -1,9 +1,12 @@
|
|||||||
package com.kamco.makesample.batch.writer;
|
package com.kamco.makesample.batch.writer;
|
||||||
|
|
||||||
|
import java.io.BufferedOutputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.StringWriter;
|
import java.io.OutputStreamWriter;
|
||||||
|
import java.io.Writer;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
@@ -31,8 +34,8 @@ import org.springframework.stereotype.Component;
|
|||||||
* <p>메모리 효과:
|
* <p>메모리 효과:
|
||||||
*
|
*
|
||||||
* <ul>
|
* <ul>
|
||||||
* <li>기존: 전체 데이터를 DefaultFeatureCollection에 누적
|
* <li>기존: StringWriter 생성 → String 변환 → getBytes() → 시스템콜 → GC 압박 누적
|
||||||
* <li>신규: chunk 단위로 GeoJSON 스트림에 append
|
* <li>신규: BufferedOutputStream(64KB) + OutputStreamWriter 직접 쓰기 → 중간 객체 없음
|
||||||
* </ul>
|
* </ul>
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
@@ -41,20 +44,22 @@ public class StreamingGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(StreamingGeoJsonWriter.class);
|
private static final Logger log = LoggerFactory.getLogger(StreamingGeoJsonWriter.class);
|
||||||
|
|
||||||
|
private static final int BUFFER_SIZE = 64 * 1024; // 64KB 버퍼: 시스템콜 횟수 대폭 감소
|
||||||
|
|
||||||
@Value("#{jobParameters['geoJsonOutputPath']}")
|
@Value("#{jobParameters['geoJsonOutputPath']}")
|
||||||
private String outputPath;
|
private String outputPath;
|
||||||
|
|
||||||
private FileOutputStream outputStream;
|
private BufferedOutputStream bufferedOut;
|
||||||
|
private Writer writer; // OutputStreamWriter → BufferedOutputStream → FileOutputStream
|
||||||
private FeatureJSON featureJSON;
|
private FeatureJSON featureJSON;
|
||||||
|
|
||||||
private int chunkCount = 0;
|
private int chunkCount = 0;
|
||||||
private int totalRecordCount = 0;
|
private int totalRecordCount = 0;
|
||||||
|
|
||||||
private boolean isFirstChunk = true;
|
private boolean isFirstFeature = true;
|
||||||
|
|
||||||
@BeforeStep
|
@BeforeStep
|
||||||
public void beforeStep() {
|
public void beforeStep() {
|
||||||
// 출력 디렉토리 생성
|
|
||||||
try {
|
try {
|
||||||
Path outputDir = Paths.get(outputPath).getParent();
|
Path outputDir = Paths.get(outputPath).getParent();
|
||||||
if (outputDir != null && !Files.exists(outputDir)) {
|
if (outputDir != null && !Files.exists(outputDir)) {
|
||||||
@@ -72,11 +77,12 @@ public class StreamingGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
File geoJsonFile = new File(outputPath);
|
File geoJsonFile = new File(outputPath);
|
||||||
outputStream = new FileOutputStream(geoJsonFile);
|
bufferedOut = new BufferedOutputStream(new FileOutputStream(geoJsonFile), BUFFER_SIZE);
|
||||||
|
writer = new OutputStreamWriter(bufferedOut, StandardCharsets.UTF_8);
|
||||||
featureJSON = new FeatureJSON();
|
featureJSON = new FeatureJSON();
|
||||||
|
|
||||||
// GeoJSON FeatureCollection 시작
|
// GeoJSON FeatureCollection 시작
|
||||||
outputStream.write("{\"type\":\"FeatureCollection\",\"features\":[".getBytes());
|
writer.write("{\"type\":\"FeatureCollection\",\"features\":[");
|
||||||
|
|
||||||
log.info("GeoJSON file initialized successfully");
|
log.info("GeoJSON file initialized successfully");
|
||||||
|
|
||||||
@@ -97,28 +103,23 @@ public class StreamingGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
totalRecordCount += itemCount;
|
totalRecordCount += itemCount;
|
||||||
|
|
||||||
log.debug(
|
log.debug(
|
||||||
"Writing chunk #{} to GeoJSON with {} features (total so far: {})",
|
"|₩Writing chunk #{} to GeoJSON with {} features (total so far: {})",
|
||||||
chunkCount,
|
chunkCount,
|
||||||
itemCount,
|
itemCount,
|
||||||
totalRecordCount);
|
totalRecordCount);
|
||||||
|
|
||||||
// 각 feature를 GeoJSON으로 변환하여 append
|
// 각 feature를 GeoJSON으로 직렬화 → OutputStreamWriter 직접 쓰기 (중간 String 객체 없음)
|
||||||
for (int i = 0; i < items.size(); i++) {
|
for (SimpleFeature feature : items) {
|
||||||
SimpleFeature feature = items.get(i);
|
if (!isFirstFeature) {
|
||||||
|
writer.write(",");
|
||||||
// 첫 번째 feature가 아니면 콤마 추가
|
|
||||||
if (!isFirstChunk || i > 0) {
|
|
||||||
outputStream.write(",".getBytes());
|
|
||||||
}
|
}
|
||||||
|
// featureJSON이 Writer에 직접 쓰므로 StringWriter + toString() + getBytes() 불필요
|
||||||
// Feature를 GeoJSON으로 직렬화
|
featureJSON.writeFeature(feature, writer);
|
||||||
// StringWriter를 사용하여 FeatureJSON이 스트림을 닫지 못하도록 방지
|
isFirstFeature = false;
|
||||||
StringWriter stringWriter = new StringWriter();
|
|
||||||
featureJSON.writeFeature(feature, stringWriter);
|
|
||||||
outputStream.write(stringWriter.toString().getBytes());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
isFirstChunk = false;
|
// 청크 완료 후 64KB 버퍼를 OS로 플러시
|
||||||
|
writer.flush();
|
||||||
|
|
||||||
log.debug("Chunk #{} written to GeoJSON successfully", chunkCount);
|
log.debug("Chunk #{} written to GeoJSON successfully", chunkCount);
|
||||||
}
|
}
|
||||||
@@ -131,10 +132,9 @@ public class StreamingGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
chunkCount);
|
chunkCount);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (outputStream != null) {
|
if (writer != null) {
|
||||||
// GeoJSON FeatureCollection 종료
|
writer.write("]}");
|
||||||
outputStream.write("]}".getBytes());
|
writer.flush();
|
||||||
outputStream.flush();
|
|
||||||
log.info("GeoJSON file finalized successfully");
|
log.info("GeoJSON file finalized successfully");
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
@@ -157,7 +157,6 @@ public class StreamingGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
|
|
||||||
cleanup();
|
cleanup();
|
||||||
|
|
||||||
// 부분 파일 삭제
|
|
||||||
try {
|
try {
|
||||||
File geoJsonFile = new File(outputPath);
|
File geoJsonFile = new File(outputPath);
|
||||||
if (geoJsonFile.exists()) {
|
if (geoJsonFile.exists()) {
|
||||||
@@ -170,19 +169,19 @@ public class StreamingGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void cleanup() {
|
private void cleanup() {
|
||||||
if (outputStream != null) {
|
if (writer != null) {
|
||||||
try {
|
try {
|
||||||
outputStream.close();
|
writer.close();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
log.warn("Failed to close GeoJSON output stream", e);
|
log.warn("Failed to close GeoJSON writer", e);
|
||||||
}
|
}
|
||||||
outputStream = null;
|
writer = null;
|
||||||
|
bufferedOut = null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void update(ExecutionContext executionContext) throws ItemStreamException {
|
public void update(ExecutionContext executionContext) throws ItemStreamException {
|
||||||
// Checkpoint
|
|
||||||
executionContext.putInt("chunkCount", chunkCount);
|
executionContext.putInt("chunkCount", chunkCount);
|
||||||
executionContext.putInt("totalRecordCount", totalRecordCount);
|
executionContext.putInt("totalRecordCount", totalRecordCount);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -136,7 +136,7 @@ public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature>
|
|||||||
|
|
||||||
Map<String, Serializable> params = new HashMap<>();
|
Map<String, Serializable> params = new HashMap<>();
|
||||||
params.put("url", shpFile.toURI().toURL());
|
params.put("url", shpFile.toURI().toURL());
|
||||||
params.put("create spatial index", Boolean.TRUE);
|
params.put("create spatial index", Boolean.FALSE); // 4.7M 건 QIX 인덱스 in-memory 빌드 → OOM 방지
|
||||||
|
|
||||||
dataStore = (ShapefileDataStore) factory.createNewDataStore(params);
|
dataStore = (ShapefileDataStore) factory.createNewDataStore(params);
|
||||||
dataStore.createSchema(featureType);
|
dataStore.createSchema(featureType);
|
||||||
@@ -178,27 +178,27 @@ public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature>
|
|||||||
// ListFeatureCollection으로 변환 (청크만 담김)
|
// ListFeatureCollection으로 변환 (청크만 담김)
|
||||||
ListFeatureCollection collection = new ListFeatureCollection(featureType, items);
|
ListFeatureCollection collection = new ListFeatureCollection(featureType, items);
|
||||||
|
|
||||||
// FeatureStore에 추가 (트랜잭션은 열린 상태 유지)
|
// FeatureStore에 추가 - GeoTools ShapefileDataStore는 Diff 없이 파일에 직접 씀
|
||||||
|
// 트랜잭션은 afterStep()에서 단일 커밋 (per-chunk 커밋 시 setTransaction()이 .shx 재스캔 → O(n²))
|
||||||
featureStore.addFeatures(collection);
|
featureStore.addFeatures(collection);
|
||||||
|
|
||||||
// 청크 완료 후 collection은 GC됨
|
|
||||||
log.debug("Chunk #{} written successfully", chunkCount);
|
log.debug("Chunk #{} written successfully", chunkCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterStep
|
@AfterStep
|
||||||
public void afterStep() {
|
public void afterStep() {
|
||||||
log.info(
|
log.info(
|
||||||
"All chunks written. Committing transaction for {} records in {} chunks",
|
"All chunks written ({} records in {} chunks). Finalizing shapefile.",
|
||||||
totalRecordCount,
|
totalRecordCount,
|
||||||
chunkCount);
|
chunkCount);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (transaction != null) {
|
if (transaction != null) {
|
||||||
transaction.commit();
|
transaction.commit();
|
||||||
log.info("Transaction committed successfully");
|
log.info("Final transaction committed successfully");
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
log.error("Failed to commit transaction", e);
|
log.error("Failed to commit final transaction", e);
|
||||||
throw new ItemStreamException("Failed to commit shapefile transaction", e);
|
throw new ItemStreamException("Failed to commit shapefile transaction", e);
|
||||||
} finally {
|
} finally {
|
||||||
cleanup();
|
cleanup();
|
||||||
|
|||||||
Reference in New Issue
Block a user