4 Commits

Author SHA1 Message Date
dean
4fcc645f63 oom처리 2026-03-26 14:54:24 +09:00
dean
df00af784e make shapefile 2026-03-20 13:33:36 +09:00
dean
17ca27087a docker 2026-03-20 13:31:16 +09:00
dean
efbdbfd632 make shapefile 2026-03-20 08:27:24 +09:00
14 changed files with 327 additions and 139 deletions

98
shp-exporter/DOCKER.md Normal file
View File

@@ -0,0 +1,98 @@
# Docker 빌드 및 실행 가이드
## 추가된 파일
| 파일 | 설명 |
|------|------|
| `Dockerfile` | 멀티스테이지 빌드 정의 |
| `.dockerignore` | Docker 빌드 컨텍스트 제외 목록 |
---
## Dockerfile 구조
### Stage 1 — Builder (`eclipse-temurin:21-jdk-jammy`)
1. Gradle wrapper 및 `build.gradle` 복사 → 의존성 레이어 캐싱
2. 소스 코드 복사 후 `./gradlew clean bootJar -x test` 실행
3. 출력: `build/libs/shp-exporter-v2.jar`
### Stage 2 — Runtime (`eclipse-temurin:21-jre-jammy`)
- JDK 없이 JRE만 포함 → 이미지 크기 최소화
- 출력 디렉토리 `/data/model_output/export` 사전 생성
- JVM 옵션 포함 (`-Xmx4g`, G1GC 튜닝 등)
---
## 빌드
```bash
docker build -t shp-exporter .
```
---
## 실행
### Spring Batch 모드 (권장)
```bash
docker run --rm \
-v /data/model_output/export:/data/model_output/export \
-e GEOSERVER_USERNAME=admin \
-e GEOSERVER_PASSWORD=geoserver \
shp-exporter \
--batch \
--converter.batch-ids[0]=252
```
### GeoServer 등록 포함
```bash
docker run --rm \
-v /data/model_output/export:/data/model_output/export \
-e GEOSERVER_USERNAME=admin \
-e GEOSERVER_PASSWORD=geoserver \
shp-exporter \
--batch \
--geoserver.enabled=true \
--converter.inference-id=D5E46F60FC40B1A8BE0CD1F3547AA6 \
--converter.batch-ids[0]=252 \
--converter.batch-ids[1]=253
```
### Shapefile 업로드 모드
```bash
docker run --rm \
-v /data/model_output/export:/data/model_output/export \
-e GEOSERVER_USERNAME=admin \
-e GEOSERVER_PASSWORD=geoserver \
shp-exporter \
--upload-shp /data/model_output/export/path/to/file.shp \
--layer layer_name
```
### DB 주소 오버라이드
`application-prod.yml`의 DB 호스트(`kamco-cd-postgis`)가 네트워크 환경과 다를 경우:
```bash
docker run --rm \
--network your-network \
-v /data/model_output/export:/data/model_output/export \
shp-exporter \
--batch \
--spring.datasource.url=jdbc:postgresql://HOST:5432/kamco_cds \
--converter.batch-ids[0]=252
```
---
## 주의사항
- **볼륨 마운트 필수**: 출력 파일(`/data/model_output/export`)은 호스트 볼륨으로 마운트하지 않으면 컨테이너 종료 시 소실됨
- **네트워크**: DB 호스트명 `kamco-cd-postgis`가 컨테이너에서 resolve되어야 함 (`--network` 또는 `--add-host` 활용)
- **GeoServer 크리덴셜**: `GEOSERVER_USERNAME` / `GEOSERVER_PASSWORD` 환경변수로 주입 (`application-prod.yml`에 하드코딩된 값은 덮어씌워짐)
- **`.dockerignore`는 `.gitignore`에 등록**되어 있어 git에 커밋되지 않음 — 필요 시 `.gitignore`에서 해당 라인 제거

38
shp-exporter/Dockerfile Normal file
View File

@@ -0,0 +1,38 @@
# ---- Build Stage ----
FROM eclipse-temurin:21-jdk-jammy AS builder
WORKDIR /workspace
# Gradle wrapper 및 빌드 설정 복사 (캐시 레이어 최적화)
COPY gradlew settings.gradle build.gradle ./
COPY gradle ./gradle
# 의존성 다운로드 (소스 변경 시 재빌드 방지)
RUN chmod +x gradlew && ./gradlew dependencies --no-daemon -q || true
# 소스 복사 후 빌드 (테스트 제외)
COPY src ./src
RUN ./gradlew clean bootJar -x test --no-daemon -q
# ---- Runtime Stage ----
FROM eclipse-temurin:21-jre-jammy
WORKDIR /app
# 출력 디렉토리 생성
RUN mkdir -p /data/model_output/export
# JAR 복사
COPY --from=builder /workspace/build/libs/shp-exporter-v2.jar app.jar
# GeoServer 크리덴셜은 환경변수로 주입
ENV GEOSERVER_USERNAME=""
ENV GEOSERVER_PASSWORD=""
ENTRYPOINT ["java", \
"-Xmx4g", "-Xms512m", \
"-XX:+UseG1GC", \
"-XX:MaxGCPauseMillis=200", \
"-XX:G1HeapRegionSize=16m", \
"-XX:+ParallelRefProcEnabled", \
"-jar", "app.jar"]

View File

@@ -14,27 +14,16 @@ java {
}
}
//repositories {
// mavenCentral()
// maven {
// url 'https://repo.osgeo.org/repository/release/'
// }
// maven {
// url 'https://repo.osgeo.org/repository/geotools-releases/'
// }
// maven {
// url 'https://repo.osgeo.org/repository/snapshot/'
// }
//}
def repoUrl = System.getProperty("org.gradle.project.repoUrl")
?: System.getenv("ORG_GRADLE_PROJECT_repoUrl")
?: "http://172.16.4.56:18100/repository/maven-public/"
repositories {
mavenCentral()
maven {
url = uri(repoUrl)
allowInsecureProtocol = true
url 'https://repo.osgeo.org/repository/release/'
}
maven {
url 'https://repo.osgeo.org/repository/geotools-releases/'
}
maven {
url 'https://repo.osgeo.org/repository/snapshot/'
}
}
@@ -98,3 +87,10 @@ spotless {
tasks.named('test') {
useJUnitPlatform()
}
bootRun {
jvmArgs = ['-Xmx4g', '-Xms512m', '-XX:+UseG1GC',
'-XX:MaxGCPauseMillis=200', // GC 목표 멈춤 시간
'-XX:G1HeapRegionSize=16m', // 큰 geometry 객체 처리
'-XX:+ParallelRefProcEnabled'] // 참조 처리 병렬화
}

View File

@@ -1,7 +1,7 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
#distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.3-bin.zip
distributionUrl=http\://172.16.4.56:18100/repository/gradle-distributions/gradle-8.14.3-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.14.3-bin.zip
#distributionUrl=http\://172.16.4.56:18100/repository/gradle-distributions/gradle-8.14.3-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME

View File

@@ -1,6 +1,9 @@
package com.kamco.makesample.batch.config;
import com.kamco.makesample.config.ConverterProperties;
import org.geotools.api.referencing.FactoryException;
import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
import org.geotools.referencing.CRS;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -38,6 +41,18 @@ public class BatchConfiguration {
* <p>추가 설정이 필요하면 여기에 Bean을 정의
*/
/**
* 애플리케이션 전체에서 재사용되는 CoordinateReferenceSystem 싱글톤 빈
*
* <p>CRS.decode()는 GeoTools HSQL EPSG 팩토리를 초기화하므로 비용이 큼. @StepScope 컴포넌트마다 호출되면 파티셔닝 단계에서 CRS 객체가
* 누적되어 OOM을 유발합니다. 싱글톤으로 캐싱하여 한 번만 초기화합니다.
*/
@Bean
public CoordinateReferenceSystem coordinateReferenceSystem() throws FactoryException {
String crsCode = properties.getCrs() != null ? properties.getCrs() : "EPSG:5186";
return CRS.decode(crsCode);
}
/**
* Map ID별 파일 생성을 위한 TaskExecutor
*

View File

@@ -4,10 +4,8 @@ import com.kamco.makesample.batch.util.FeatureTypeFactory;
import com.kamco.makesample.model.InferenceResult;
import org.geotools.api.feature.simple.SimpleFeature;
import org.geotools.api.feature.simple.SimpleFeatureType;
import org.geotools.api.referencing.FactoryException;
import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
import org.geotools.feature.simple.SimpleFeatureBuilder;
import org.geotools.referencing.CRS;
import org.locationtech.jts.geom.Geometry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -16,7 +14,6 @@ import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
@@ -39,15 +36,15 @@ public class FeatureConversionProcessor implements ItemProcessor<InferenceResult
private static final Logger log = LoggerFactory.getLogger(FeatureConversionProcessor.class);
private final FeatureTypeFactory featureTypeFactory;
@Value("#{jobParameters['crs'] ?: 'EPSG:5186'}")
private String crsCode;
private final CoordinateReferenceSystem crs;
private SimpleFeatureBuilder featureBuilder;
private SimpleFeatureType featureType;
public FeatureConversionProcessor(FeatureTypeFactory featureTypeFactory) {
public FeatureConversionProcessor(
FeatureTypeFactory featureTypeFactory, CoordinateReferenceSystem coordinateReferenceSystem) {
this.featureTypeFactory = featureTypeFactory;
this.crs = coordinateReferenceSystem;
}
@BeforeStep
@@ -66,21 +63,13 @@ public class FeatureConversionProcessor implements ItemProcessor<InferenceResult
geometryType = featureTypeFactory.parseGeometryType(geomTypeStr);
try {
// CRS 설정
CoordinateReferenceSystem crs = CRS.decode(crsCode);
// FeatureType 생성 (주입된 싱글톤 CRS 재사용 - CRS.decode() 반복 호출 방지)
this.featureType = featureTypeFactory.createFeatureType(crs, geometryType);
this.featureBuilder = new SimpleFeatureBuilder(this.featureType);
// FeatureType 생성
this.featureType = featureTypeFactory.createFeatureType(crs, geometryType);
this.featureBuilder = new SimpleFeatureBuilder(this.featureType);
log.info(
"FeatureConversionProcessor initialized with geometry type: {}",
geometryType.getSimpleName());
} catch (FactoryException e) {
throw new RuntimeException("Failed to initialize FeatureConversionProcessor", e);
}
log.info(
"FeatureConversionProcessor initialized with geometry type: {}",
geometryType.getSimpleName());
}
@Override

View File

@@ -172,6 +172,14 @@ public class GeometryTypeValidationTasklet implements Tasklet {
}
log.info("========================================");
// 진행률 표시를 위해 유효 건수를 JobExecutionContext에 저장
chunkContext
.getStepContext()
.getStepExecution()
.getJobExecution()
.getExecutionContext()
.putLong("totalValidRecords", validRows);
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 3. Mixed geometry type 체크 및 자동 변환 안내
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

View File

@@ -1,9 +1,9 @@
package com.kamco.makesample.batch.writer;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -46,6 +46,7 @@ public class MapIdGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
private FileOutputStream outputStream;
private FeatureJSON featureJSON;
private ByteArrayOutputStream buffer;
private int chunkCount = 0;
private int totalRecordCount = 0;
@@ -91,6 +92,7 @@ public class MapIdGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
File geoJsonFile = new File(outputPath);
outputStream = new FileOutputStream(geoJsonFile);
featureJSON = new FeatureJSON();
buffer = new ByteArrayOutputStream(8192);
// GeoJSON FeatureCollection 시작
outputStream.write("{\"type\":\"FeatureCollection\",\"features\":[".getBytes());
@@ -129,11 +131,11 @@ public class MapIdGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
outputStream.write(",".getBytes());
}
// Feature를 GeoJSON으로 직렬화
// StringWriter를 사용하여 FeatureJSON이 스트림을 닫지 못하도록 방지
StringWriter stringWriter = new StringWriter();
featureJSON.writeFeature(feature, stringWriter);
outputStream.write(stringWriter.toString().getBytes());
// Feature를 GeoJSON으로 직렬화 (ByteArrayOutputStream 재사용으로 String 객체 생성 방지)
// ByteArrayOutputStream.close()는 no-op이므로 FeatureJSON이 닫아도 안전
buffer.reset();
featureJSON.writeFeature(feature, buffer);
buffer.writeTo(outputStream);
}
isFirstChunk = false;

View File

@@ -14,13 +14,11 @@ import org.geotools.api.data.SimpleFeatureStore;
import org.geotools.api.data.Transaction;
import org.geotools.api.feature.simple.SimpleFeature;
import org.geotools.api.feature.simple.SimpleFeatureType;
import org.geotools.api.referencing.FactoryException;
import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
import org.geotools.data.DefaultTransaction;
import org.geotools.data.collection.ListFeatureCollection;
import org.geotools.data.shapefile.ShapefileDataStore;
import org.geotools.data.shapefile.ShapefileDataStoreFactory;
import org.geotools.referencing.CRS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepExecution;
@@ -48,11 +46,10 @@ import org.springframework.stereotype.Component;
public class MapIdShapefileWriter implements ItemStreamWriter<SimpleFeature> {
private static final Logger log = LoggerFactory.getLogger(MapIdShapefileWriter.class);
private static final int LOG_INTERVAL_CHUNKS = 5; // 5청크(=5,000건)마다 진행 로그
private final FeatureTypeFactory featureTypeFactory;
@Value("#{jobParameters['crs'] ?: 'EPSG:5186'}")
private String crsCode;
private final CoordinateReferenceSystem crs;
@Value("#{stepExecutionContext['mapId']}")
private String mapId;
@@ -70,11 +67,14 @@ public class MapIdShapefileWriter implements ItemStreamWriter<SimpleFeature> {
private int chunkCount = 0;
private int totalRecordCount = 0;
private long startTimeMs = 0;
private Class<?> geometryType;
public MapIdShapefileWriter(FeatureTypeFactory featureTypeFactory) {
public MapIdShapefileWriter(
FeatureTypeFactory featureTypeFactory, CoordinateReferenceSystem coordinateReferenceSystem) {
this.featureTypeFactory = featureTypeFactory;
this.crs = coordinateReferenceSystem;
}
@BeforeStep
@@ -143,10 +143,7 @@ public class MapIdShapefileWriter implements ItemStreamWriter<SimpleFeature> {
geometryType.getSimpleName(),
geometryTypeStr);
// CRS 설정
CoordinateReferenceSystem crs = CRS.decode(crsCode);
// SimpleFeatureType 생성
// SimpleFeatureType 생성 (주입된 싱글톤 CRS 재사용 - CRS.decode() 반복 호출 방지)
featureType = featureTypeFactory.createFeatureType(crs, geometryType);
// ShapefileDataStore 생성
@@ -155,7 +152,7 @@ public class MapIdShapefileWriter implements ItemStreamWriter<SimpleFeature> {
Map<String, Serializable> params = new HashMap<>();
params.put("url", shpFile.toURI().toURL());
params.put("create spatial index", Boolean.TRUE);
params.put("create spatial index", Boolean.FALSE);
dataStore = (ShapefileDataStore) factory.createNewDataStore(params);
dataStore.createSchema(featureType);
@@ -168,13 +165,12 @@ public class MapIdShapefileWriter implements ItemStreamWriter<SimpleFeature> {
featureStore = (SimpleFeatureStore) dataStore.getFeatureSource(typeName);
featureStore.setTransaction(transaction);
startTimeMs = System.currentTimeMillis();
log.info(
"ShapefileDataStore initialized for map_id: {} with geometry type: {}",
mapId,
geometryType.getSimpleName());
} catch (FactoryException e) {
throw new ItemStreamException("Invalid CRS code: " + crsCode, e);
} catch (IOException e) {
throw new ItemStreamException("Failed to create shapefile for map_id " + mapId, e);
}
@@ -191,27 +187,36 @@ public class MapIdShapefileWriter implements ItemStreamWriter<SimpleFeature> {
int itemCount = items.size();
totalRecordCount += itemCount;
log.debug(
"[map_id: {}] Writing chunk #{} with {} features (total: {})",
mapId,
chunkCount,
itemCount,
totalRecordCount);
// ListFeatureCollection으로 변환하여 FeatureStore에 추가
ListFeatureCollection collection = new ListFeatureCollection(featureType, items);
featureStore.addFeatures(collection);
log.debug("[map_id: {}] Chunk #{} written successfully", mapId, chunkCount);
if (chunkCount % LOG_INTERVAL_CHUNKS == 0) {
long elapsedSec = Math.max((System.currentTimeMillis() - startTimeMs) / 1000, 1);
long throughput = totalRecordCount / elapsedSec;
log.info(
"[map_id: {}] 진행: {}건 | 경과: {} | 속도: {}건/s",
mapId,
String.format("%,d", totalRecordCount),
formatDuration(elapsedSec),
String.format("%,d", throughput));
}
}
@AfterStep
public void afterStep() {
long elapsedSec = Math.max((System.currentTimeMillis() - startTimeMs) / 1000, 1);
long throughput = totalRecordCount / elapsedSec;
log.info(
"[map_id: {}] AfterStep called. Total {} records in {} chunks",
"[map_id: {}] 완료: {}건 처리 | 소요: {} | 평균 속도: {}건/s",
mapId,
totalRecordCount,
chunkCount);
String.format("%,d", totalRecordCount),
formatDuration(elapsedSec),
String.format("%,d", throughput));
}
/**
 * Formats an elapsed duration as a human-readable Korean string for progress logs.
 *
 * <p>NOTE(review): the original body concatenated empty strings ({@code seconds + ""}),
 * which is a no-op — the unit suffixes were evidently lost. Restored as "초"/"분" to match
 * the surrounding Korean log messages ("경과: {}", "소요: {}") — confirm intended units.
 *
 * @param seconds elapsed time in seconds (assumed non-negative)
 * @return {@code "N초"} when under one minute, otherwise {@code "M분 S초"}
 */
private String formatDuration(long seconds) {
  if (seconds < 60) {
    return seconds + "초";
  }
  return (seconds / 60) + "분 " + (seconds % 60) + "초";
}
@Override

View File

@@ -1,9 +1,12 @@
package com.kamco.makesample.batch.writer;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.StringWriter;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -31,8 +34,8 @@ import org.springframework.stereotype.Component;
* <p>메모리 효과:
*
* <ul>
* <li>기존: 전체 데이터를 DefaultFeatureCollection에 누적
* <li>신규: chunk 단위로 GeoJSON 스트림에 append
* <li>기존: StringWriter 생성 → String 변환 → getBytes() → 시스템콜 → GC 압박 누적
* <li>신규: BufferedOutputStream(64KB) + OutputStreamWriter 직접 쓰기 → 중간 객체 없음
* </ul>
*/
@Component
@@ -41,20 +44,22 @@ public class StreamingGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
private static final Logger log = LoggerFactory.getLogger(StreamingGeoJsonWriter.class);
private static final int BUFFER_SIZE = 64 * 1024; // 64KB 버퍼: 시스템콜 횟수 대폭 감소
@Value("#{jobParameters['geoJsonOutputPath']}")
private String outputPath;
private FileOutputStream outputStream;
private BufferedOutputStream bufferedOut;
private Writer writer; // OutputStreamWriter → BufferedOutputStream → FileOutputStream
private FeatureJSON featureJSON;
private int chunkCount = 0;
private int totalRecordCount = 0;
private boolean isFirstChunk = true;
private boolean isFirstFeature = true;
@BeforeStep
public void beforeStep() {
// 출력 디렉토리 생성
try {
Path outputDir = Paths.get(outputPath).getParent();
if (outputDir != null && !Files.exists(outputDir)) {
@@ -72,11 +77,12 @@ public class StreamingGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
try {
File geoJsonFile = new File(outputPath);
outputStream = new FileOutputStream(geoJsonFile);
bufferedOut = new BufferedOutputStream(new FileOutputStream(geoJsonFile), BUFFER_SIZE);
writer = new OutputStreamWriter(bufferedOut, StandardCharsets.UTF_8);
featureJSON = new FeatureJSON();
// GeoJSON FeatureCollection 시작
outputStream.write("{\"type\":\"FeatureCollection\",\"features\":[".getBytes());
writer.write("{\"type\":\"FeatureCollection\",\"features\":[");
log.info("GeoJSON file initialized successfully");
@@ -97,28 +103,23 @@ public class StreamingGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
totalRecordCount += itemCount;
log.debug(
"Writing chunk #{} to GeoJSON with {} features (total so far: {})",
"Writing chunk #{} to GeoJSON with {} features (total so far: {})",
chunkCount,
itemCount,
totalRecordCount);
// 각 feature를 GeoJSON으로 변환하여 append
for (int i = 0; i < items.size(); i++) {
SimpleFeature feature = items.get(i);
// 첫 번째 feature가 아니면 콤마 추가
if (!isFirstChunk || i > 0) {
outputStream.write(",".getBytes());
// 각 feature를 GeoJSON으로 직렬화 → OutputStreamWriter 직접 쓰기 (중간 String 객체 없음)
for (SimpleFeature feature : items) {
if (!isFirstFeature) {
writer.write(",");
}
// Feature를 GeoJSON으로 직렬화
// StringWriter를 사용하여 FeatureJSON이 스트림을 닫지 못하도록 방지
StringWriter stringWriter = new StringWriter();
featureJSON.writeFeature(feature, stringWriter);
outputStream.write(stringWriter.toString().getBytes());
// featureJSON이 Writer에 직접 쓰므로 StringWriter + toString() + getBytes() 불필요
featureJSON.writeFeature(feature, writer);
isFirstFeature = false;
}
isFirstChunk = false;
// 청크 완료 후 64KB 버퍼를 OS로 플러시
writer.flush();
log.debug("Chunk #{} written to GeoJSON successfully", chunkCount);
}
@@ -131,10 +132,9 @@ public class StreamingGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
chunkCount);
try {
if (outputStream != null) {
// GeoJSON FeatureCollection 종료
outputStream.write("]}".getBytes());
outputStream.flush();
if (writer != null) {
writer.write("]}");
writer.flush();
log.info("GeoJSON file finalized successfully");
}
} catch (IOException e) {
@@ -157,7 +157,6 @@ public class StreamingGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
cleanup();
// 부분 파일 삭제
try {
File geoJsonFile = new File(outputPath);
if (geoJsonFile.exists()) {
@@ -170,19 +169,19 @@ public class StreamingGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
}
private void cleanup() {
if (outputStream != null) {
if (writer != null) {
try {
outputStream.close();
writer.close();
} catch (IOException e) {
log.warn("Failed to close GeoJSON output stream", e);
log.warn("Failed to close GeoJSON writer", e);
}
outputStream = null;
writer = null;
bufferedOut = null;
}
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
// Checkpoint
executionContext.putInt("chunkCount", chunkCount);
executionContext.putInt("totalRecordCount", totalRecordCount);
}

View File

@@ -14,13 +14,11 @@ import org.geotools.api.data.SimpleFeatureStore;
import org.geotools.api.data.Transaction;
import org.geotools.api.feature.simple.SimpleFeature;
import org.geotools.api.feature.simple.SimpleFeatureType;
import org.geotools.api.referencing.FactoryException;
import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
import org.geotools.data.DefaultTransaction;
import org.geotools.data.collection.ListFeatureCollection;
import org.geotools.data.shapefile.ShapefileDataStore;
import org.geotools.data.shapefile.ShapefileDataStoreFactory;
import org.geotools.referencing.CRS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepExecution;
@@ -62,15 +60,14 @@ import org.springframework.stereotype.Component;
public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature> {
private static final Logger log = LoggerFactory.getLogger(StreamingShapefileWriter.class);
private static final int LOG_INTERVAL_CHUNKS = 10; // 10청크(=10,000건)마다 진행 로그
private final FeatureTypeFactory featureTypeFactory;
private final CoordinateReferenceSystem crs;
@Value("#{jobParameters['outputPath']}")
private String outputPath;
@Value("#{jobParameters['crs'] ?: 'EPSG:5186'}")
private String crsCode;
private ShapefileDataStore dataStore;
private Transaction transaction;
private SimpleFeatureStore featureStore;
@@ -78,11 +75,15 @@ public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature>
private int chunkCount = 0;
private int totalRecordCount = 0;
private long totalExpected = 0; // Validation Tasklet에서 조회한 전체 건수
private long startTimeMs = 0;
private Class<?> geometryType; // Geometry type from validation tasklet
public StreamingShapefileWriter(FeatureTypeFactory featureTypeFactory) {
public StreamingShapefileWriter(
FeatureTypeFactory featureTypeFactory, CoordinateReferenceSystem coordinateReferenceSystem) {
this.featureTypeFactory = featureTypeFactory;
this.crs = coordinateReferenceSystem;
}
@BeforeStep
@@ -101,6 +102,12 @@ public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature>
"geometryType not set in Job ExecutionContext (empty dataset). Will use default in open()");
}
// 진행률 표시용 전체 건수 읽기
if (jobExecutionContext.containsKey("totalValidRecords")) {
totalExpected = jobExecutionContext.getLong("totalValidRecords");
log.info("[Shapefile] 처리 예정 건수: {}건", String.format("%,d", totalExpected));
}
// 출력 디렉토리 생성
try {
Path outputDir = Paths.get(outputPath).getParent();
@@ -118,9 +125,6 @@ public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature>
log.info("Opening StreamingShapefileWriter for: {}", outputPath);
try {
// CRS 설정
CoordinateReferenceSystem crs = CRS.decode(crsCode);
// Geometry type이 아직 설정되지 않은 경우 기본값 사용
if (geometryType == null) {
geometryType = featureTypeFactory.parseGeometryType(null);
@@ -136,7 +140,7 @@ public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature>
Map<String, Serializable> params = new HashMap<>();
params.put("url", shpFile.toURI().toURL());
params.put("create spatial index", Boolean.TRUE);
params.put("create spatial index", Boolean.FALSE); // 4.7M 건 QIX 인덱스 in-memory 빌드 → OOM 방지
dataStore = (ShapefileDataStore) factory.createNewDataStore(params);
dataStore.createSchema(featureType);
@@ -149,10 +153,9 @@ public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature>
featureStore = (SimpleFeatureStore) dataStore.getFeatureSource(typeName);
featureStore.setTransaction(transaction);
startTimeMs = System.currentTimeMillis();
log.info("ShapefileDataStore initialized successfully");
} catch (FactoryException e) {
throw new ItemStreamException("Invalid CRS code: " + crsCode, e);
} catch (IOException e) {
throw new ItemStreamException("Failed to create shapefile at: " + outputPath, e);
}
@@ -169,36 +172,31 @@ public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature>
int itemCount = items.size();
totalRecordCount += itemCount;
log.debug(
"Writing chunk #{} with {} features (total so far: {})",
chunkCount,
itemCount,
totalRecordCount);
// ListFeatureCollection으로 변환 (청크만 담김)
// FeatureStore에 추가 - GeoTools ShapefileDataStore는 Diff 없이 파일에 직접 씀
// 트랜잭션은 afterStep()에서 단일 커밋 (per-chunk 커밋 시 setTransaction()이 .shx 재스캔 → O(n²))
ListFeatureCollection collection = new ListFeatureCollection(featureType, items);
// FeatureStore에 추가 (트랜잭션은 열린 상태 유지)
featureStore.addFeatures(collection);
// 청크 완료 후 collection은 GC됨
log.debug("Chunk #{} written successfully", chunkCount);
if (chunkCount % LOG_INTERVAL_CHUNKS == 0) {
logProgress();
}
}
@AfterStep
public void afterStep() {
logProgress(); // 완료 시 최종 진행 상황 출력
log.info(
"All chunks written. Committing transaction for {} records in {} chunks",
totalRecordCount,
"[Shapefile] 완료: {}건 처리 ({}개 청크). 파일 저장 중...",
String.format("%,d", totalRecordCount),
chunkCount);
try {
if (transaction != null) {
transaction.commit();
log.info("Transaction committed successfully");
log.info("Final transaction committed successfully");
}
} catch (IOException e) {
log.error("Failed to commit transaction", e);
log.error("Failed to commit final transaction", e);
throw new ItemStreamException("Failed to commit shapefile transaction", e);
} finally {
cleanup();
@@ -234,6 +232,37 @@ public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature>
}
}
private void logProgress() {
long elapsedMs = System.currentTimeMillis() - startTimeMs;
long elapsedSec = Math.max(elapsedMs / 1000, 1);
long throughput = totalRecordCount / elapsedSec;
if (totalExpected > 0) {
double pct = totalRecordCount * 100.0 / totalExpected;
long remaining = totalRecordCount < totalExpected ? totalExpected - totalRecordCount : 0;
long etaSec = throughput > 0 ? remaining / throughput : 0;
log.info(
"[Shapefile] 진행: {} / {}건 ({}) | 경과: {} | 속도: {}건/s | 예상 남은: {}",
String.format("%,d", totalRecordCount),
String.format("%,d", totalExpected),
String.format("%.1f%%", pct),
formatDuration(elapsedSec),
String.format("%,d", throughput),
formatDuration(etaSec));
} else {
log.info(
"[Shapefile] 진행: {}건 처리 중 | 경과: {} | 속도: {}건/s",
String.format("%,d", totalRecordCount),
formatDuration(elapsedSec),
String.format("%,d", throughput));
}
}
/**
 * Formats an elapsed duration as a human-readable Korean string for progress/ETA logs.
 *
 * <p>NOTE(review): the original body concatenated empty strings ({@code seconds + ""}),
 * which is a no-op — the unit suffixes were evidently lost. Restored as "초"/"분" to match
 * the surrounding Korean log messages ("경과: {}", "예상 남은: {}") — confirm intended units.
 *
 * @param seconds elapsed time in seconds (assumed non-negative)
 * @return {@code "N초"} when under one minute, otherwise {@code "M분 S초"}
 */
private String formatDuration(long seconds) {
  if (seconds < 60) {
    return seconds + "초";
  }
  return (seconds / 60) + "분 " + (seconds % 60) + "초";
}
private void cleanup() {
if (transaction != null) {
try {

View File

@@ -29,9 +29,9 @@ converter:
crs: 'EPSG:5186'
batch:
chunk-size: 5000 # 청크 크기 증가 (1000 → 5000, 성능 5배 향상)
chunk-size: 1000 # 청크 크기 축소 (5000 → 1000, 4.7M 건 처리 시 OOM 방지)
skip-limit: 100 # 청크당 skip 허용 건수
fetch-size: 5000 # JDBC 커서 fetch 크기 (chunk-size와 동일하게)
fetch-size: 1000 # JDBC 커서 fetch 크기 (chunk-size와 동일하게)
enable-partitioning: false
geoserver:

View File

@@ -1,6 +1,6 @@
spring:
datasource:
url: jdbc:postgresql://kamco-cd-postgis:5432/kamco_cds
url: jdbc:postgresql://172.16.4.56:15432/kamco_cds
username: kamco_cds
password: kamco_cds_Q!W@E#R$
driver-class-name: org.postgresql.Driver
@@ -29,9 +29,9 @@ converter:
crs: 'EPSG:5186'
batch:
chunk-size: 5000 # 청크 크기 (1000→5000, 성능 5배 향상, 메모리 ~200MB per chunk)
chunk-size: 1000 # 4.7M 건 OOM 방지: 청크별 트랜잭션 커밋으로 GeoTools 메모리 해제
skip-limit: 100 # 청크당 skip 허용 건수
fetch-size: 5000 # JDBC 커서 fetch 크기 (chunk-size와 동일하게)
fetch-size: 1000 # JDBC 커서 fetch 크기 (chunk-size와 동일하게)
enable-partitioning: false # 초기에는 비활성화
partition-concurrency: 4 # Map ID별 병렬 처리 동시성 (4=~300MB, 8=~600MB)
@@ -46,7 +46,7 @@ geoserver:
logging:
level:
com.kamco.makesample: DEBUG
com.kamco.makesample: INFO # DEBUG → INFO: 4.7M 건 write 중 불필요한 로그 I/O 제거
org.springframework: WARN
pattern:
console: '%d{yyyy-MM-dd HH:mm:ss} - %msg%n'

View File

@@ -0,0 +1,9 @@
java -Xmx4g -Xms512m -XX:+UseG1GC \
-XX:MaxGCPauseMillis=200 -XX:G1HeapRegionSize=16m \
-jar build/libs/shp-exporter.jar \
--batch \
--converter.inference-id=test009 \
--converter.batch-ids[0]=111 \
--converter.batch-ids[1]=114 \
--converter.batch-ids[2]=162 \
--geoserver.enabled=true