chunk(chunkSize, transactionManager)
+ .reader(shapefileReader)
+ .processor(featureConversionProcessor)
+ .writer(shapefileWriter)
+ .listener(historyListener)
+ .build();
+ }
+
+ /**
+ * Step 3: GeoJSON 생성 (Chunk-oriented)
+ *
+ * Shapefile과 동일한 데이터를 GeoJSON 형식으로 출력
+ *
+ * @param jobRepository JobRepository
+ * @param transactionManager TransactionManager
+ * @param geoJsonReader ItemReader (GeoJSON용 - 별도 인스턴스)
+ * @param featureConversionProcessor ItemProcessor (재사용)
+ * @param geoJsonWriter ItemWriter
+ * @param chunkSize Chunk size
+ * @param historyListener BatchExecutionHistoryListener
+ * @return Step
+ */
+ @Bean
+ public Step generateGeoJsonStep(
+ JobRepository jobRepository,
+ PlatformTransactionManager transactionManager,
+ JdbcCursorItemReader<InferenceResult> geoJsonReader,
+ FeatureConversionProcessor featureConversionProcessor,
+ StreamingGeoJsonWriter geoJsonWriter,
+ @Value("${converter.batch.chunk-size:1000}") int chunkSize,
+ BatchExecutionHistoryListener historyListener) {
+
+ return new StepBuilder("generateGeoJsonStep", jobRepository)
+ .<InferenceResult, SimpleFeature>chunk(chunkSize, transactionManager)
+ .reader(geoJsonReader)
+ .processor(featureConversionProcessor)
+ .writer(geoJsonWriter)
+ .listener(historyListener)
+ .build();
+ }
+
+ /**
+ * Step 4: ZIP 파일 생성
+ *
+ * @param jobRepository JobRepository
+ * @param transactionManager TransactionManager
+ * @param createZipTasklet CreateZipTasklet
+ * @param historyListener BatchExecutionHistoryListener
+ * @return Step
+ */
+ @Bean
+ public Step createZipStep(
+ JobRepository jobRepository,
+ PlatformTransactionManager transactionManager,
+ CreateZipTasklet createZipTasklet,
+ BatchExecutionHistoryListener historyListener) {
+
+ return new StepBuilder("createZipStep", jobRepository)
+ .tasklet(createZipTasklet, transactionManager)
+ .listener(historyListener)
+ .build();
+ }
+
+ /**
+ * Step 5: GeoServer 등록 (merge 폴더의 shapefile만)
+ *
+ * Conditional execution: geoserver.enabled=true일 때만 실행
+ *
+ * @param jobRepository JobRepository
+ * @param transactionManager TransactionManager
+ * @param registrationTasklet GeoServerRegistrationTasklet
+ * @param historyListener BatchExecutionHistoryListener
+ * @return Step
+ */
+ @Bean
+ public Step registerToGeoServerStep(
+ JobRepository jobRepository,
+ PlatformTransactionManager transactionManager,
+ GeoServerRegistrationTasklet registrationTasklet,
+ BatchExecutionHistoryListener historyListener) {
+
+ return new StepBuilder("registerToGeoServerStep", jobRepository)
+ .tasklet(registrationTasklet, transactionManager)
+ .listener(historyListener)
+ .build();
+ }
+
+ /**
+ * Step 6: Map ID별 개별 파일 생성 (Partitioned Step - Sequential)
+ *
+ *
+ * <p>각 map_id마다 개별 shapefile과 geojson 파일을 순차적으로 생성합니다. SyncTaskExecutor를 명시적으로 지정하여 병렬 실행을 방지하고
+ * DB connection pool 고갈 방지
+ *
+ * @param jobRepository JobRepository
+ * @param partitioner MapIdPartitioner
+ * @param mapIdWorkerStep Worker Step (각 파티션에서 실행)
+ * @return Partitioned Step
+ */
+ @Bean
+ public Step generateMapIdFilesStep(
+ JobRepository jobRepository, MapIdPartitioner partitioner, Step mapIdWorkerStep) {
+
+ return new StepBuilder("generateMapIdFilesStep", jobRepository)
+ .partitioner("mapIdWorker", partitioner)
+ .step(mapIdWorkerStep)
+ .taskExecutor(new SyncTaskExecutor()) // 명시적으로 순차 실행 지정
+ .listener(partitioner) // Register partitioner as StepExecutionListener
+ .build();
+ }
+
+ /**
+ * Worker Step: Map ID별 파일 생성 작업
+ *
+ *
+ * <p>각 파티션에서 실행되며, 해당 map_id의 데이터를 읽어 shapefile과 geojson을 동시에 생성합니다.
+ *
+ * @param jobRepository JobRepository
+ * @param transactionManager TransactionManager
+ * @param mapIdModeReader ItemReader (map_id별)
+ * @param featureConversionProcessor ItemProcessor
+ * @param mapIdShapefileWriter Shapefile Writer
+ * @param mapIdGeoJsonWriter GeoJSON Writer
+ * @param chunkSize Chunk size
+ * @param historyListener BatchExecutionHistoryListener
+ * @return Worker Step
+ */
+ @Bean
+ public Step mapIdWorkerStep(
+ JobRepository jobRepository,
+ PlatformTransactionManager transactionManager,
+ JdbcCursorItemReader<InferenceResult> mapIdModeReader,
+ FeatureConversionProcessor featureConversionProcessor,
+ MapIdShapefileWriter mapIdShapefileWriter,
+ MapIdGeoJsonWriter mapIdGeoJsonWriter,
+ @Value("${converter.batch.chunk-size:1000}") int chunkSize,
+ BatchExecutionHistoryListener historyListener) {
+
+ // CompositeItemWriter로 shapefile과 geojson 동시 생성
+ CompositeItemWriter<SimpleFeature> compositeWriter = new CompositeItemWriter<>();
+ compositeWriter.setDelegates(Arrays.asList(mapIdShapefileWriter, mapIdGeoJsonWriter));
+
+ return new StepBuilder("mapIdWorkerStep", jobRepository)
+ .<InferenceResult, SimpleFeature>chunk(chunkSize, transactionManager)
+ .reader(mapIdModeReader)
+ .processor(featureConversionProcessor)
+ .writer(compositeWriter)
+ .stream(mapIdShapefileWriter)
+ .stream(mapIdGeoJsonWriter)
+ .listener(historyListener)
+ .build();
+ }
+}
diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/listener/BatchExecutionHistoryListener.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/listener/BatchExecutionHistoryListener.java
new file mode 100644
index 0000000..4d4083d
--- /dev/null
+++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/listener/BatchExecutionHistoryListener.java
@@ -0,0 +1,183 @@
+package com.kamco.makesample.batch.listener;
+
+import com.kamco.makesample.batch.model.BatchExecutionHistory;
+import com.kamco.makesample.batch.repository.BatchExecutionHistoryRepository;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.time.LocalDateTime;
+import java.util.List;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.ExitStatus;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.StepExecutionListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * 배치 실행 이력 추적 Listener
+ *
+ * 각 스텝의 시작/종료 시점에 실행 이력을 데이터베이스에 저장
+ *
+ *
+ * <p>기록 항목:
+ *
+ * <ul>
+ *   <li>시작 시간, 종료 시간, 소요 시간
+ *   <li>성공/실패 상태
+ *   <li>에러 발생 시 에러 메시지 및 스택 트레이스
+ *   <li>처리 통계 (read/write/commit/rollback/skip count)
+ * </ul>
+ *
+ */
+@Component
+public class BatchExecutionHistoryListener implements StepExecutionListener {
+
+ private static final Logger log = LoggerFactory.getLogger(BatchExecutionHistoryListener.class);
+
+ private final BatchExecutionHistoryRepository historyRepository;
+
+ // ThreadLocal로 각 스텝별 이력 ID 저장
+ private final ThreadLocal<Long> historyIdHolder = new ThreadLocal<>();
+
+ public BatchExecutionHistoryListener(BatchExecutionHistoryRepository historyRepository) {
+ this.historyRepository = historyRepository;
+ }
+
+ @Override
+ public void beforeStep(StepExecution stepExecution) {
+ try {
+ // 배치 실행 이력 생성
+ BatchExecutionHistory history = new BatchExecutionHistory();
+ history.setJobExecutionId(stepExecution.getJobExecutionId());
+ history.setStepExecutionId(stepExecution.getId());
+ history.setStepName(stepExecution.getStepName());
+ history.setStartTime(LocalDateTime.now());
+ history.setStatus("STARTED");
+
+ // Job Parameters에서 batch_ids, inference_id 추출
+ String batchIds = stepExecution.getJobParameters().getString("batchIds");
+ String inferenceId = stepExecution.getJobParameters().getString("inferenceId");
+ history.setBatchIds(batchIds);
+ history.setInferenceId(inferenceId);
+
+ // Step 타입 추정 (Tasklet vs Chunk)
+ String stepType = estimateStepType(stepExecution.getStepName());
+ history.setStepType(stepType);
+
+ // 이력 저장
+ Long historyId = historyRepository.insert(history);
+ historyIdHolder.set(historyId);
+
+ log.debug(
+ "Step execution history created: id={}, step={}, jobExecutionId={}",
+ historyId,
+ stepExecution.getStepName(),
+ stepExecution.getJobExecutionId());
+
+ } catch (Exception e) {
+ log.error("Failed to save step execution history on start: {}", e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public ExitStatus afterStep(StepExecution stepExecution) {
+ try {
+ Long historyId = historyIdHolder.get();
+ if (historyId == null) {
+ log.warn("No history ID found for step: {}", stepExecution.getStepName());
+ return stepExecution.getExitStatus();
+ }
+
+ // 종료 시간 및 상태
+ LocalDateTime endTime = LocalDateTime.now();
+ LocalDateTime startTime =
+ stepExecution.getStartTime() != null ? stepExecution.getStartTime() : LocalDateTime.now();
+
+ String status = stepExecution.getStatus().toString();
+ String exitCode = stepExecution.getExitStatus().getExitCode();
+ String exitMessage = stepExecution.getExitStatus().getExitDescription();
+
+ // 에러 정보 추출
+ String errorMessage = null;
+ String errorStackTrace = null;
+
+ List<Throwable> failureExceptions = stepExecution.getFailureExceptions();
+ if (!failureExceptions.isEmpty()) {
+ Throwable firstException = failureExceptions.get(0);
+ errorMessage = firstException.getMessage();
+ errorStackTrace = getStackTrace(firstException);
+ }
+
+ // 처리 통계 (Chunk 기반 스텝용)
+ Long readCount = (long) stepExecution.getReadCount();
+ Long writeCount = (long) stepExecution.getWriteCount();
+ Long commitCount = (long) stepExecution.getCommitCount();
+ Long rollbackCount = (long) stepExecution.getRollbackCount();
+ Long skipCount = (long) stepExecution.getSkipCount();
+
+ // 이력 업데이트
+ historyRepository.updateOnCompletion(
+ historyId,
+ endTime,
+ startTime,
+ status,
+ exitCode,
+ exitMessage,
+ errorMessage,
+ errorStackTrace,
+ readCount,
+ writeCount,
+ commitCount,
+ rollbackCount,
+ skipCount);
+
+ log.debug(
+ "Step execution history updated: id={}, step={}, status={}, duration={}ms",
+ historyId,
+ stepExecution.getStepName(),
+ status,
+ java.time.Duration.between(startTime, endTime).toMillis());
+
+ // ThreadLocal 정리
+ historyIdHolder.remove();
+
+ } catch (Exception e) {
+ log.error("Failed to update step execution history on completion: {}", e.getMessage(), e);
+ }
+
+ return stepExecution.getExitStatus();
+ }
+
+ /**
+ * Step 이름으로 Step 타입 추정
+ *
+ * @param stepName Step 이름
+ * @return TASKLET 또는 CHUNK
+ */
+ private String estimateStepType(String stepName) {
+ // Tasklet 스텝들
+ if (stepName.contains("validate")
+ || stepName.contains("Zip")
+ || stepName.contains("GeoServer")) {
+ return "TASKLET";
+ }
+ // Chunk 스텝들
+ if (stepName.contains("generate")
+ || stepName.contains("Shapefile")
+ || stepName.contains("GeoJson")) {
+ return "CHUNK";
+ }
+ return "UNKNOWN";
+ }
+
+ /**
+ * Exception을 스택 트레이스 문자열로 변환
+ *
+ * @param throwable Exception
+ * @return 스택 트레이스 문자열
+ */
+ private String getStackTrace(Throwable throwable) {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ throwable.printStackTrace(pw);
+ return sw.toString();
+ }
+}
diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/model/BatchExecutionHistory.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/model/BatchExecutionHistory.java
new file mode 100644
index 0000000..4107cb7
--- /dev/null
+++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/model/BatchExecutionHistory.java
@@ -0,0 +1,212 @@
+package com.kamco.makesample.batch.model;
+
+import java.time.LocalDateTime;
+
+/**
+ * 배치 실행 이력 엔티티
+ *
+ * 각 스텝별 시작/종료 시간, 소요 시간, 성공 여부, 에러 사유를 추적
+ */
+public class BatchExecutionHistory {
+
+ private Long id;
+ private Long jobExecutionId;
+ private Long stepExecutionId;
+ private String stepName;
+ private String stepType;
+ private LocalDateTime startTime;
+ private LocalDateTime endTime;
+ private Long durationMs;
+ private String status;
+ private String exitCode;
+ private String exitMessage;
+ private String errorMessage;
+ private String errorStackTrace;
+ private Long readCount;
+ private Long writeCount;
+ private Long commitCount;
+ private Long rollbackCount;
+ private Long skipCount;
+ private String batchIds;
+ private String inferenceId;
+ private LocalDateTime createdAt;
+ private LocalDateTime updatedAt;
+
+ // Getters and Setters
+
+ public Long getId() {
+ return id;
+ }
+
+ public void setId(Long id) {
+ this.id = id;
+ }
+
+ public Long getJobExecutionId() {
+ return jobExecutionId;
+ }
+
+ public void setJobExecutionId(Long jobExecutionId) {
+ this.jobExecutionId = jobExecutionId;
+ }
+
+ public Long getStepExecutionId() {
+ return stepExecutionId;
+ }
+
+ public void setStepExecutionId(Long stepExecutionId) {
+ this.stepExecutionId = stepExecutionId;
+ }
+
+ public String getStepName() {
+ return stepName;
+ }
+
+ public void setStepName(String stepName) {
+ this.stepName = stepName;
+ }
+
+ public String getStepType() {
+ return stepType;
+ }
+
+ public void setStepType(String stepType) {
+ this.stepType = stepType;
+ }
+
+ public LocalDateTime getStartTime() {
+ return startTime;
+ }
+
+ public void setStartTime(LocalDateTime startTime) {
+ this.startTime = startTime;
+ }
+
+ public LocalDateTime getEndTime() {
+ return endTime;
+ }
+
+ public void setEndTime(LocalDateTime endTime) {
+ this.endTime = endTime;
+ }
+
+ public Long getDurationMs() {
+ return durationMs;
+ }
+
+ public void setDurationMs(Long durationMs) {
+ this.durationMs = durationMs;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public String getExitCode() {
+ return exitCode;
+ }
+
+ public void setExitCode(String exitCode) {
+ this.exitCode = exitCode;
+ }
+
+ public String getExitMessage() {
+ return exitMessage;
+ }
+
+ public void setExitMessage(String exitMessage) {
+ this.exitMessage = exitMessage;
+ }
+
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+
+ public void setErrorMessage(String errorMessage) {
+ this.errorMessage = errorMessage;
+ }
+
+ public String getErrorStackTrace() {
+ return errorStackTrace;
+ }
+
+ public void setErrorStackTrace(String errorStackTrace) {
+ this.errorStackTrace = errorStackTrace;
+ }
+
+ public Long getReadCount() {
+ return readCount;
+ }
+
+ public void setReadCount(Long readCount) {
+ this.readCount = readCount;
+ }
+
+ public Long getWriteCount() {
+ return writeCount;
+ }
+
+ public void setWriteCount(Long writeCount) {
+ this.writeCount = writeCount;
+ }
+
+ public Long getCommitCount() {
+ return commitCount;
+ }
+
+ public void setCommitCount(Long commitCount) {
+ this.commitCount = commitCount;
+ }
+
+ public Long getRollbackCount() {
+ return rollbackCount;
+ }
+
+ public void setRollbackCount(Long rollbackCount) {
+ this.rollbackCount = rollbackCount;
+ }
+
+ public Long getSkipCount() {
+ return skipCount;
+ }
+
+ public void setSkipCount(Long skipCount) {
+ this.skipCount = skipCount;
+ }
+
+ public String getBatchIds() {
+ return batchIds;
+ }
+
+ public void setBatchIds(String batchIds) {
+ this.batchIds = batchIds;
+ }
+
+ public String getInferenceId() {
+ return inferenceId;
+ }
+
+ public void setInferenceId(String inferenceId) {
+ this.inferenceId = inferenceId;
+ }
+
+ public LocalDateTime getCreatedAt() {
+ return createdAt;
+ }
+
+ public void setCreatedAt(LocalDateTime createdAt) {
+ this.createdAt = createdAt;
+ }
+
+ public LocalDateTime getUpdatedAt() {
+ return updatedAt;
+ }
+
+ public void setUpdatedAt(LocalDateTime updatedAt) {
+ this.updatedAt = updatedAt;
+ }
+}
diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/partitioner/MapIdPartitioner.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/partitioner/MapIdPartitioner.java
new file mode 100644
index 0000000..e0707de
--- /dev/null
+++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/partitioner/MapIdPartitioner.java
@@ -0,0 +1,140 @@
+package com.kamco.makesample.batch.partitioner;
+
+import com.kamco.makesample.config.ConverterProperties;
+import com.kamco.makesample.repository.InferenceResultRepository;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.ExitStatus;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.StepExecutionListener;
+import org.springframework.batch.core.annotation.AfterStep;
+import org.springframework.batch.core.annotation.BeforeStep;
+import org.springframework.batch.core.partition.support.Partitioner;
+import org.springframework.batch.item.ExecutionContext;
+import org.springframework.stereotype.Component;
+
+/**
+ * Map ID별 파티션 생성
+ *
+ *
+ * <p>batch_ids로부터 고유한 map_id 목록을 조회하여 각 map_id마다 ExecutionContext를 생성합니다. 각 파티션은 독립적으로 shapefile과
+ * geojson을 생성합니다.
+ */
+@Component
+public class MapIdPartitioner implements Partitioner, StepExecutionListener {
+
+ private static final Logger log = LoggerFactory.getLogger(MapIdPartitioner.class);
+
+ private final InferenceResultRepository repository;
+ private final ConverterProperties properties;
+
+ private String geometryType; // Populated in @BeforeStep, used in partition()
+
+ public MapIdPartitioner(InferenceResultRepository repository, ConverterProperties properties) {
+ this.repository = repository;
+ this.properties = properties;
+ }
+
+ @BeforeStep
+ public void beforeStep(StepExecution stepExecution) {
+ log.info("MapIdPartitioner.beforeStep() - retrieving geometryType from job context");
+
+ ExecutionContext jobExecutionContext = stepExecution.getJobExecution().getExecutionContext();
+
+ if (jobExecutionContext.containsKey("geometryType")) {
+ this.geometryType = jobExecutionContext.getString("geometryType");
+ log.info("Retrieved geometryType from job context: {}", this.geometryType);
+ } else {
+ String errorMsg =
+ "geometryType not found in job execution context. "
+ + "GeometryTypeValidationTasklet must run before partitioning.";
+ log.error(errorMsg);
+ throw new IllegalStateException(errorMsg);
+ }
+
+ if (this.geometryType == null || this.geometryType.isEmpty()) {
+ throw new IllegalStateException("geometryType is null or empty in job context");
+ }
+
+ log.info("MapIdPartitioner ready with geometryType: {}", this.geometryType);
+ }
+
+ @AfterStep
+ public ExitStatus afterStep(StepExecution stepExecution) {
+ return ExitStatus.COMPLETED;
+ }
+
+ @Override
+ public Map<String, ExecutionContext> partition(int gridSize) {
+ List batchIds = properties.getBatchIds();
+ String inferenceId = properties.getInferenceId();
+ String outputBaseDir = properties.getOutputBaseDir();
+
+ log.info("Creating partitions for batch_ids: {}, inference_id: {}", batchIds, inferenceId);
+
+ // batch_ids로 고유한 map_id 목록 조회
+ List<String> mapIds = repository.findMapIdByBatchIds(batchIds);
+
+ if (mapIds.isEmpty()) {
+ log.warn("No map_ids found for batch_ids: {}", batchIds);
+ return new HashMap<>();
+ }
+
+ log.info("Found {} map_ids to partition: {}", mapIds.size(), mapIds);
+
+ // 각 map_id마다 ExecutionContext 생성
+ Map<String, ExecutionContext> partitions = new HashMap<>();
+
+ for (String mapId : mapIds) {
+ ExecutionContext context = new ExecutionContext();
+
+ // 파티션별 파라미터 설정
+ context.putString("mapId", mapId);
+ context.putString("outputPath", buildShapefilePath(outputBaseDir, inferenceId, mapId));
+ context.putString("geoJsonOutputPath", buildGeoJsonPath(outputBaseDir, inferenceId, mapId));
+
+ // Propagate geometryType to partition context
+ context.putString("geometryType", this.geometryType);
+
+ partitions.put("partition-" + mapId, context);
+
+ log.debug(
+ "Created partition for map_id: {}, shapefile: {}, geojson: {}, geometryType: {}",
+ mapId,
+ context.getString("outputPath"),
+ context.getString("geoJsonOutputPath"),
+ context.getString("geometryType"));
+ }
+
+ log.info("Created {} partitions with geometryType: {}", partitions.size(), this.geometryType);
+
+ return partitions;
+ }
+
+ /**
+ * Shapefile 출력 경로 생성
+ *
+ * @param baseDir 기본 디렉토리
+ * @param inferenceId Inference ID
+ * @param mapId Map ID
+ * @return Shapefile 경로
+ */
+ private String buildShapefilePath(String baseDir, String inferenceId, String mapId) {
+ return Paths.get(baseDir, inferenceId, mapId, mapId + ".shp").toString();
+ }
+
+ /**
+ * GeoJSON 출력 경로 생성
+ *
+ * @param baseDir 기본 디렉토리
+ * @param inferenceId Inference ID
+ * @param mapId Map ID
+ * @return GeoJSON 경로
+ */
+ private String buildGeoJsonPath(String baseDir, String inferenceId, String mapId) {
+ return Paths.get(baseDir, inferenceId, mapId, mapId + ".geojson").toString();
+ }
+}
diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/processor/FeatureConversionProcessor.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/processor/FeatureConversionProcessor.java
new file mode 100644
index 0000000..b2e3d9c
--- /dev/null
+++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/processor/FeatureConversionProcessor.java
@@ -0,0 +1,135 @@
+package com.kamco.makesample.batch.processor;
+
+import com.kamco.makesample.batch.util.FeatureTypeFactory;
+import com.kamco.makesample.model.InferenceResult;
+import org.geotools.api.feature.simple.SimpleFeature;
+import org.geotools.api.feature.simple.SimpleFeatureType;
+import org.geotools.api.referencing.FactoryException;
+import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
+import org.geotools.feature.simple.SimpleFeatureBuilder;
+import org.geotools.referencing.CRS;
+import org.locationtech.jts.geom.Geometry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.annotation.BeforeStep;
+import org.springframework.batch.core.configuration.annotation.StepScope;
+import org.springframework.batch.item.ExecutionContext;
+import org.springframework.batch.item.ItemProcessor;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/**
+ * InferenceResult → SimpleFeature 변환 Processor
+ *
+ * 기존 ShapefileWriter의 buildFeature 로직을 Processor로 분리
+ *
+ *
+ * <p>주요 역할:
+ *
+ * <ul>
+ *   <li>Geometry 검증 (null 체크, isValid 체크)
+ *   <li>InferenceResult 필드를 SimpleFeature 속성으로 변환
+ *   <li>Invalid geometry는 skip (null 반환)
+ * </ul>
+ *
+ */
+@Component
+@StepScope
+ public class FeatureConversionProcessor implements ItemProcessor<InferenceResult, SimpleFeature> {
+
+ private static final Logger log = LoggerFactory.getLogger(FeatureConversionProcessor.class);
+
+ private final FeatureTypeFactory featureTypeFactory;
+
+ @Value("#{jobParameters['crs'] ?: 'EPSG:5186'}")
+ private String crsCode;
+
+ private SimpleFeatureBuilder featureBuilder;
+ private SimpleFeatureType featureType;
+
+ public FeatureConversionProcessor(FeatureTypeFactory featureTypeFactory) {
+ this.featureTypeFactory = featureTypeFactory;
+ }
+
+ @BeforeStep
+ public void beforeStep(StepExecution stepExecution) {
+ // Job ExecutionContext에서 geometry type 읽기 (이전 Step에서 설정한 값)
+ ExecutionContext jobExecutionContext = stepExecution.getJobExecution().getExecutionContext();
+ String geomTypeStr = null;
+ Class<? extends Geometry> geometryType;
+
+ // geometryType이 설정되어 있는지 확인 (빈 데이터셋인 경우 설정되지 않을 수 있음)
+ if (jobExecutionContext.containsKey("geometryType")) {
+ geomTypeStr = jobExecutionContext.getString("geometryType");
+ } else {
+ log.warn("geometryType not set in Job ExecutionContext (empty dataset). Using default");
+ }
+
+ geometryType = featureTypeFactory.parseGeometryType(geomTypeStr);
+
+ try {
+ // CRS 설정
+ CoordinateReferenceSystem crs = CRS.decode(crsCode);
+
+ // FeatureType 생성
+ this.featureType = featureTypeFactory.createFeatureType(crs, geometryType);
+ this.featureBuilder = new SimpleFeatureBuilder(this.featureType);
+
+ log.info(
+ "FeatureConversionProcessor initialized with geometry type: {}",
+ geometryType.getSimpleName());
+
+ } catch (FactoryException e) {
+ throw new RuntimeException("Failed to initialize FeatureConversionProcessor", e);
+ }
+ }
+
+ @Override
+ public SimpleFeature process(InferenceResult result) throws Exception {
+ // Geometry 검증
+ Geometry geometry = result.getGeometry();
+
+ if (geometry == null) {
+ log.warn("Null geometry detected for uid: {} - skipping", result.getUid());
+ return null; // Skip this item
+ }
+
+ if (!geometry.isValid()) {
+ log.warn(
+ "Invalid geometry detected for uid: {} - skipping. Reason: {}",
+ result.getUid(),
+ geometry.getGeometryType());
+ return null; // Skip invalid geometry
+ }
+
+ // SimpleFeature 빌드
+ return buildFeature(result, geometry);
+ }
+
+ /**
+ * InferenceResult를 SimpleFeature로 변환
+ *
+ * 기존 ShapefileWriter.buildFeature() 로직과 동일
+ *
+ * @param result InferenceResult
+ * @param geometry Geometry
+ * @return SimpleFeature
+ */
+ private SimpleFeature buildFeature(InferenceResult result, Geometry geometry) {
+ // Geometry 추가 (the_geom)
+ featureBuilder.add(geometry);
+
+ // 속성 필드 추가
+ featureBuilder.add(result.getUid());
+ featureBuilder.add(result.getMapId());
+ featureBuilder.add(
+ result.getProbability() != null ? String.valueOf(result.getProbability()) : "0.0");
+ featureBuilder.add(result.getBeforeYear() != null ? result.getBeforeYear() : 0L);
+ featureBuilder.add(result.getAfterYear() != null ? result.getAfterYear() : 0L);
+ featureBuilder.add(result.getBeforeC());
+ featureBuilder.add(result.getBeforeP() != null ? String.valueOf(result.getBeforeP()) : "0.0");
+ featureBuilder.add(result.getAfterC());
+ featureBuilder.add(result.getAfterP() != null ? String.valueOf(result.getAfterP()) : "0.0");
+
+ return featureBuilder.buildFeature(null);
+ }
+}
diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/reader/GeometryConvertingRowMapper.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/reader/GeometryConvertingRowMapper.java
new file mode 100644
index 0000000..c3fe37a
--- /dev/null
+++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/reader/GeometryConvertingRowMapper.java
@@ -0,0 +1,56 @@
+package com.kamco.makesample.batch.reader;
+
+import com.kamco.makesample.model.InferenceResult;
+import com.kamco.makesample.service.GeometryConverter;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import org.springframework.jdbc.core.RowMapper;
+import org.springframework.stereotype.Component;
+
+/**
+ * ResultSet을 InferenceResult로 변환하는 RowMapper
+ *
+ *
+ * <p>기존 InferenceResultRepository의 InferenceResultRowMapper와 동일한 로직을 사용하지만, Spring Batch의
+ * ItemReader와 함께 사용하도록 독립 Component로 분리
+ */
+@Component
+ public class GeometryConvertingRowMapper implements RowMapper<InferenceResult> {
+
+ private final GeometryConverter geometryConverter;
+
+ public GeometryConvertingRowMapper(GeometryConverter geometryConverter) {
+ this.geometryConverter = geometryConverter;
+ }
+
+ @Override
+ public InferenceResult mapRow(ResultSet rs, int rowNum) throws SQLException {
+ InferenceResult result = new InferenceResult();
+ result.setUid(rs.getString("uid"));
+ result.setMapId(rs.getString("map_id"));
+ result.setProbability(getDoubleOrNull(rs, "probability"));
+ result.setBeforeYear(getLongOrNull(rs, "before_year"));
+ result.setAfterYear(getLongOrNull(rs, "after_year"));
+ result.setBeforeC(rs.getString("before_c"));
+ result.setBeforeP(getDoubleOrNull(rs, "before_p"));
+ result.setAfterC(rs.getString("after_c"));
+ result.setAfterP(getDoubleOrNull(rs, "after_p"));
+
+ // WKT → JTS Geometry 변환 (per-record conversion)
+ String geometryWkt = rs.getString("geometry_wkt");
+ if (geometryWkt != null) {
+ result.setGeometry(geometryConverter.convertWKTToJTS(geometryWkt));
+ }
+
+ return result;
+ }
+
+ private Long getLongOrNull(ResultSet rs, String columnName) throws SQLException {
+ long value = rs.getLong(columnName);
+ return rs.wasNull() ? null : value;
+ }
+
+ private Double getDoubleOrNull(ResultSet rs, String columnName) throws SQLException {
+ double value = rs.getDouble(columnName);
+ return rs.wasNull() ? null : value;
+ }
+}
diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/reader/InferenceResultItemReaderConfig.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/reader/InferenceResultItemReaderConfig.java
new file mode 100644
index 0000000..fa77218
--- /dev/null
+++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/reader/InferenceResultItemReaderConfig.java
@@ -0,0 +1,221 @@
+package com.kamco.makesample.batch.reader;
+
+import com.kamco.makesample.model.InferenceResult;
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import javax.sql.DataSource;
+import org.springframework.batch.core.configuration.annotation.StepScope;
+import org.springframework.batch.item.database.JdbcCursorItemReader;
+import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.jdbc.core.PreparedStatementSetter;
+
+/**
+ * 커서 기반 ItemReader 설정
+ *
+ * 메모리 최적화의 핵심: 전체 데이터를 List로 로딩하지 않고 커서를 사용하여 스트리밍 처리
+ *
+ *
+ * <p>주요 특징:
+ *
+ * <ul>
+ *   <li>fetch-size: 1000 → DB에서 1000건씩 가져옴
+ *   <li>cursor-based → 전체 ResultSet을 메모리에 로딩하지 않음
+ *   <li>PreparedStatement → PostgreSQL array 파라미터 처리
+ *   <li>EPSG:5186 좌표계 정합성 검증 (SRID, 좌표 범위, geometry 유효성)
+ * </ul>
+ *
+ */
+@Configuration
+public class InferenceResultItemReaderConfig {
+
+ // EPSG:5186 좌표계 정합성 검증 조건:
+ // - SRID = 5186 (한국 2000 / 중부 좌표계)
+ // - ST_IsValid() = true (geometry 유효성)
+ // - X 범위: 125,000 ~ 530,000m (동서 방향)
+ // - Y 범위: -600,000 ~ 988,000m (남북 방향)
+ // 위 조건을 만족하지 않는 잘못된 좌표의 polygon은 배치 대상에서 제외됨
+ private static final String SQL_QUERY =
+ """
+ SELECT uid, map_id, probability, before_year, after_year,
+ before_c, before_p, after_c, after_p,
+ ST_AsText(geometry) as geometry_wkt
+ FROM inference_results_testing
+ WHERE batch_id = ANY(?)
+ AND after_c IS NOT NULL
+ AND after_p IS NOT NULL
+ AND geometry IS NOT NULL
+ AND ST_GeometryType(geometry) IN ('ST_Polygon', 'ST_MultiPolygon')
+ AND ST_SRID(geometry) = 5186
+ AND ST_IsValid(geometry) = true
+ AND ST_XMin(geometry) >= 125000 AND ST_XMax(geometry) <= 530000
+ AND ST_YMin(geometry) >= -600000 AND ST_YMax(geometry) <= 988000
+ ORDER BY map_id, uid
+ """;
+
+ /**
+ * MERGED 모드용 ItemReader (Shapefile 생성용)
+ *
+ * 전체 batch_ids에 대한 데이터를 스트리밍으로 읽어옴
+ *
+ * @param dataSource DataSource
+ * @param batchIdsParam Job Parameter로 전달받은 batch_ids (콤마 구분 문자열, 예: "252,253,257")
+ * @param fetchSize fetch size (기본 1000)
+ * @param rowMapper RowMapper
+ * @return JdbcCursorItemReader
+ */
+ @Bean
+ @StepScope
+ public JdbcCursorItemReader<InferenceResult> shapefileReader(
+ DataSource dataSource,
+ @Value("#{jobParameters['batchIds']}") String batchIdsParam,
+ @Value("#{jobParameters['fetchSize'] ?: 1000}") int fetchSize,
+ GeometryConvertingRowMapper rowMapper) {
+
+ // JobParameter로 받은 "252,253,257" 형태를 Long 배열로 변환
+ Long[] batchIds = parseBatchIds(batchIdsParam);
+
+ return new JdbcCursorItemReaderBuilder<InferenceResult>()
+ .name("shapefileReader")
+ .dataSource(dataSource)
+ .sql(SQL_QUERY)
+ .fetchSize(fetchSize) // 메모리 효율을 위한 fetch size
+ .rowMapper(rowMapper)
+ .preparedStatementSetter(
+ new PreparedStatementSetter() {
+ @Override
+ public void setValues(PreparedStatement ps) throws SQLException {
+ // PostgreSQL array 파라미터 설정
+ Connection conn = ps.getConnection();
+ Array sqlArray = conn.createArrayOf("bigint", batchIds);
+ ps.setArray(1, sqlArray);
+ }
+ })
+ .build();
+ }
+
+ /**
+ * MERGED 모드용 ItemReader (GeoJSON 생성용)
+ *
+ * 전체 batch_ids에 대한 데이터를 스트리밍으로 읽어옴 (Shapefile과 동일한 데이터)
+ *
+ * @param dataSource DataSource
+ * @param batchIdsParam Job Parameter로 전달받은 batch_ids (콤마 구분 문자열, 예: "252,253,257")
+ * @param fetchSize fetch size (기본 1000)
+ * @param rowMapper RowMapper
+ * @return JdbcCursorItemReader
+ */
+ @Bean
+ @StepScope
+ public JdbcCursorItemReader<InferenceResult> geoJsonReader(
+ DataSource dataSource,
+ @Value("#{jobParameters['batchIds']}") String batchIdsParam,
+ @Value("#{jobParameters['fetchSize'] ?: 1000}") int fetchSize,
+ GeometryConvertingRowMapper rowMapper) {
+
+ // JobParameter로 받은 "252,253,257" 형태를 Long 배열로 변환
+ Long[] batchIds = parseBatchIds(batchIdsParam);
+
+ return new JdbcCursorItemReaderBuilder<InferenceResult>()
+ .name("geoJsonReader")
+ .dataSource(dataSource)
+ .sql(SQL_QUERY)
+ .fetchSize(fetchSize) // 메모리 효율을 위한 fetch size
+ .rowMapper(rowMapper)
+ .preparedStatementSetter(
+ new PreparedStatementSetter() {
+ @Override
+ public void setValues(PreparedStatement ps) throws SQLException {
+ // PostgreSQL array 파라미터 설정
+ Connection conn = ps.getConnection();
+ Array sqlArray = conn.createArrayOf("bigint", batchIds);
+ ps.setArray(1, sqlArray);
+ }
+ })
+ .build();
+ }
+
+ /**
+ * MAP_IDS 모드용 ItemReader
+ *
+ * 특정 map_id에 대한 데이터만 읽어옴
+ *
+ * @param dataSource DataSource
+ * @param batchIdsParam Job Parameter로 전달받은 batch_ids
+ * @param mapId Step Execution Context에서 전달받은 map_id
+ * @param fetchSize fetch size
+ * @param rowMapper RowMapper
+ * @return JdbcCursorItemReader
+ */
+ @Bean
+ @StepScope
+ public JdbcCursorItemReader<InferenceResult> mapIdModeReader(
+ DataSource dataSource,
+ @Value("#{jobParameters['batchIds']}") String batchIdsParam,
+ @Value("#{stepExecutionContext['mapId']}") String mapId,
+ @Value("#{jobParameters['fetchSize'] ?: 1000}") int fetchSize,
+ GeometryConvertingRowMapper rowMapper) {
+
+ Long[] batchIds = parseBatchIds(batchIdsParam);
+
+ String sqlWithMapId =
+ """
+ SELECT uid, map_id, probability, before_year, after_year,
+ before_c, before_p, after_c, after_p,
+ ST_AsText(geometry) as geometry_wkt
+ FROM inference_results_testing
+ WHERE batch_id = ANY(?)
+ AND map_id = ?
+ AND after_c IS NOT NULL
+ AND after_p IS NOT NULL
+ AND geometry IS NOT NULL
+ AND ST_GeometryType(geometry) IN ('ST_Polygon', 'ST_MultiPolygon')
+ AND ST_SRID(geometry) = 5186
+ AND ST_IsValid(geometry) = true
+ AND ST_XMin(geometry) >= 125000 AND ST_XMax(geometry) <= 530000
+ AND ST_YMin(geometry) >= -600000 AND ST_YMax(geometry) <= 988000
+ ORDER BY uid
+ """;
+
+ return new JdbcCursorItemReaderBuilder<InferenceResult>()
+ .name("mapIdModeReader")
+ .dataSource(dataSource)
+ .sql(sqlWithMapId)
+ .fetchSize(fetchSize)
+ .rowMapper(rowMapper)
+ .preparedStatementSetter(
+ new PreparedStatementSetter() {
+ @Override
+ public void setValues(PreparedStatement ps) throws SQLException {
+ Connection conn = ps.getConnection();
+ Array sqlArray = conn.createArrayOf("bigint", batchIds);
+ ps.setArray(1, sqlArray);
+ ps.setString(2, mapId);
+ }
+ })
+ .build();
+ }
+
+ /**
+ * JobParameter 문자열을 Long 배열로 변환
+ *
+ * @param batchIdsParam "252,253,257" 형태의 문자열
+ * @return Long 배열
+ */
+ private Long[] parseBatchIds(String batchIdsParam) {
+ if (batchIdsParam == null || batchIdsParam.trim().isEmpty()) {
+ throw new IllegalArgumentException("batchIds parameter is required");
+ }
+
+ String[] parts = batchIdsParam.split(",");
+ Long[] batchIds = new Long[parts.length];
+
+ for (int i = 0; i < parts.length; i++) {
+ batchIds[i] = Long.parseLong(parts[i].trim());
+ }
+
+ return batchIds;
+ }
+}
diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/repository/BatchExecutionHistoryRepository.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/repository/BatchExecutionHistoryRepository.java
new file mode 100644
index 0000000..91ec00a
--- /dev/null
+++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/repository/BatchExecutionHistoryRepository.java
@@ -0,0 +1,206 @@
+package com.kamco.makesample.batch.repository;
+
+import com.kamco.makesample.batch.model.BatchExecutionHistory;
+import java.sql.Timestamp;
+import java.time.Duration;
+import java.time.LocalDateTime;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Repository;
+
+/**
+ * Repository for per-step batch execution history.
+ *
+ * <p>Persists one row per step execution into the {@code batch_execution_history} table via
+ * plain {@link JdbcTemplate} and provides lookups for monitoring.
+ */
+@Repository
+public class BatchExecutionHistoryRepository {
+
+ private final JdbcTemplate jdbcTemplate;
+
+ public BatchExecutionHistoryRepository(JdbcTemplate jdbcTemplate) {
+ this.jdbcTemplate = jdbcTemplate;
+ }
+
+ /**
+ * Inserts a history row when a step starts.
+ *
+ * @param history execution history to persist (start-time fields must be set)
+ * @return generated id of the inserted row (via {@code RETURNING id})
+ */
+ public Long insert(BatchExecutionHistory history) {
+ String sql =
+ """
+ INSERT INTO batch_execution_history (
+ job_execution_id, step_execution_id, step_name, step_type,
+ start_time, status, batch_ids, inference_id
+ ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+ RETURNING id
+ """;
+
+ return jdbcTemplate.queryForObject(
+ sql,
+ Long.class,
+ history.getJobExecutionId(),
+ history.getStepExecutionId(),
+ history.getStepName(),
+ history.getStepType(),
+ Timestamp.valueOf(history.getStartTime()),
+ history.getStatus(),
+ history.getBatchIds(),
+ history.getInferenceId());
+ }
+
+ /**
+ * Updates a history row when the step finishes; the duration is derived from the given
+ * start/end times.
+ *
+ * @param id history row id (value returned by {@link #insert})
+ * @param endTime step end time
+ * @param startTime step start time (used only to compute duration_ms)
+ * @param status final status
+ * @param exitCode exit code
+ * @param exitMessage exit message
+ * @param errorMessage error message, may be null
+ * @param errorStackTrace stack trace, may be null
+ * @param readCount read count
+ * @param writeCount write count
+ * @param commitCount commit count
+ * @param rollbackCount rollback count
+ * @param skipCount skip count
+ */
+ public void updateOnCompletion(
+ Long id,
+ LocalDateTime endTime,
+ LocalDateTime startTime,
+ String status,
+ String exitCode,
+ String exitMessage,
+ String errorMessage,
+ String errorStackTrace,
+ Long readCount,
+ Long writeCount,
+ Long commitCount,
+ Long rollbackCount,
+ Long skipCount) {
+
+ // Elapsed time in milliseconds between step start and end.
+ long durationMs = Duration.between(startTime, endTime).toMillis();
+
+ String sql =
+ """
+ UPDATE batch_execution_history
+ SET end_time = ?,
+ duration_ms = ?,
+ status = ?,
+ exit_code = ?,
+ exit_message = ?,
+ error_message = ?,
+ error_stack_trace = ?,
+ read_count = ?,
+ write_count = ?,
+ commit_count = ?,
+ rollback_count = ?,
+ skip_count = ?,
+ updated_at = CURRENT_TIMESTAMP
+ WHERE id = ?
+ """;
+
+ jdbcTemplate.update(
+ sql,
+ Timestamp.valueOf(endTime),
+ durationMs,
+ status,
+ exitCode,
+ exitMessage,
+ errorMessage,
+ errorStackTrace,
+ readCount,
+ writeCount,
+ commitCount,
+ rollbackCount,
+ skipCount,
+ id);
+ }
+
+ /**
+ * Returns all step histories for a job execution, ordered by start time.
+ *
+ * @param jobExecutionId Job Execution ID
+ * @return matching history rows (empty list when none)
+ */
+ public java.util.List<BatchExecutionHistory> findByJobExecutionId(Long jobExecutionId) {
+ String sql =
+ """
+ SELECT * FROM batch_execution_history
+ WHERE job_execution_id = ?
+ ORDER BY start_time
+ """;
+
+ return jdbcTemplate.query(sql, this::mapFullRow, jobExecutionId);
+ }
+
+ /**
+ * Returns the N most recent step histories, newest first.
+ *
+ * @param limit maximum number of rows
+ * @return matching history rows (empty list when none)
+ */
+ public java.util.List<BatchExecutionHistory> findRecent(int limit) {
+ String sql =
+ """
+ SELECT * FROM batch_execution_history
+ ORDER BY start_time DESC
+ LIMIT ?
+ """;
+
+ return jdbcTemplate.query(sql, this::mapFullRow, limit);
+ }
+
+ /**
+ * Maps a full batch_execution_history row (SELECT *) to the model.
+ *
+ * <p>Shared by all queries so every lookup returns identically populated objects.
+ * Note: {@code getLong} yields 0 for NULL numeric columns (e.g. counts of a running step).
+ */
+ private BatchExecutionHistory mapFullRow(java.sql.ResultSet rs, int rowNum)
+ throws java.sql.SQLException {
+ BatchExecutionHistory history = new BatchExecutionHistory();
+ history.setId(rs.getLong("id"));
+ history.setJobExecutionId(rs.getLong("job_execution_id"));
+ history.setStepExecutionId(rs.getLong("step_execution_id"));
+ history.setStepName(rs.getString("step_name"));
+ history.setStepType(rs.getString("step_type"));
+ history.setStartTime(rs.getTimestamp("start_time").toLocalDateTime());
+
+ // end_time is NULL while the step is still running.
+ Timestamp endTimestamp = rs.getTimestamp("end_time");
+ if (endTimestamp != null) {
+ history.setEndTime(endTimestamp.toLocalDateTime());
+ }
+
+ history.setDurationMs(rs.getLong("duration_ms"));
+ history.setStatus(rs.getString("status"));
+ history.setExitCode(rs.getString("exit_code"));
+ history.setExitMessage(rs.getString("exit_message"));
+ history.setErrorMessage(rs.getString("error_message"));
+ history.setReadCount(rs.getLong("read_count"));
+ history.setWriteCount(rs.getLong("write_count"));
+ history.setCommitCount(rs.getLong("commit_count"));
+ history.setRollbackCount(rs.getLong("rollback_count"));
+ history.setSkipCount(rs.getLong("skip_count"));
+ history.setBatchIds(rs.getString("batch_ids"));
+ history.setInferenceId(rs.getString("inference_id"));
+ return history;
+ }
+}
diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/tasklet/CreateZipTasklet.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/tasklet/CreateZipTasklet.java
new file mode 100644
index 0000000..7a0d96d
--- /dev/null
+++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/tasklet/CreateZipTasklet.java
@@ -0,0 +1,59 @@
+package com.kamco.makesample.batch.tasklet;
+
+import com.kamco.makesample.writer.ResultZipWriter;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.configuration.annotation.StepScope;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/**
+ * Tasklet that creates the ZIP archive of the generated shapefile artifacts.
+ *
+ * <p>Reuses the existing {@link ResultZipWriter} to compress the shapefile-related files, then
+ * stores the resulting ZIP path in the JobExecutionContext under {@code zipFilePath} so the
+ * GeoServer registration step can pick it up.
+ */
+@Component
+@StepScope
+public class CreateZipTasklet implements Tasklet {
+
+ private static final Logger log = LoggerFactory.getLogger(CreateZipTasklet.class);
+
+ @Value("#{jobParameters['outputPath']}")
+ private String outputPath;
+
+ @Value("#{jobParameters['zipBaseName']}")
+ private String zipBaseName;
+
+ /**
+ * Compresses the shapefile output directory and publishes the ZIP path.
+ *
+ * @throws IllegalStateException when required job parameters are missing
+ */
+ @Override
+ public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
+ throws Exception {
+
+ // Fail fast with a clear message instead of an NPE inside the path handling below.
+ if (outputPath == null || zipBaseName == null) {
+ throw new IllegalStateException(
+ "Job parameters 'outputPath' and 'zipBaseName' are required for ZIP creation");
+ }
+
+ log.info("Creating ZIP file for shapefile: {}", outputPath);
+
+ // Derive the directory holding the shapefile artifacts from the output path.
+ Path shapefilePath = Paths.get(outputPath);
+ Path dirPath = shapefilePath.getParent();
+ if (dirPath == null) {
+ // A bare file name has no parent component; fall back to the working directory.
+ dirPath = Paths.get(".");
+ }
+
+ // Reuse the existing ResultZipWriter for the actual compression.
+ ResultZipWriter.createZip(dirPath, zipBaseName);
+
+ log.info("ZIP file created successfully: {}.zip", zipBaseName);
+
+ // Store the ZIP path in the JobExecutionContext (consumed by GeoServer registration).
+ String zipPath = dirPath.resolve(zipBaseName + ".zip").toString();
+ chunkContext
+ .getStepContext()
+ .getStepExecution()
+ .getJobExecution()
+ .getExecutionContext()
+ .putString("zipFilePath", zipPath);
+
+ return RepeatStatus.FINISHED;
+ }
+}
diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/tasklet/GeoServerRegistrationTasklet.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/tasklet/GeoServerRegistrationTasklet.java
new file mode 100644
index 0000000..6a694b2
--- /dev/null
+++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/tasklet/GeoServerRegistrationTasklet.java
@@ -0,0 +1,83 @@
+package com.kamco.makesample.batch.tasklet;
+
+import com.kamco.makesample.service.GeoServerRegistrationService;
+import java.io.File;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.configuration.annotation.StepScope;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/**
+ * Tasklet that registers the generated shapefile with GeoServer.
+ *
+ * <p>Reuses the existing GeoServerRegistrationService to publish the shapefile.
+ *
+ * <p>Registration method: file:// path reference through the external.shp endpoint
+ * (used for all file sizes).
+ *
+ * <p>Conditional execution: skipped when the job parameter geoserver.enabled is false.
+ */
+@Component
+@StepScope
+public class GeoServerRegistrationTasklet implements Tasklet {
+
+ private static final Logger log = LoggerFactory.getLogger(GeoServerRegistrationTasklet.class);
+
+ private final GeoServerRegistrationService geoServerService;
+
+ // Job parameter arrives as a String; Spring converts it to boolean, defaulting to false.
+ @Value("#{jobParameters['geoserver.enabled'] ?: false}")
+ private boolean geoServerEnabled;
+
+ @Value("#{jobParameters['layerName']}")
+ private String layerName;
+
+ public GeoServerRegistrationTasklet(GeoServerRegistrationService geoServerService) {
+ this.geoServerService = geoServerService;
+ }
+
+ /**
+ * Registers the ZIP produced by CreateZipTasklet with GeoServer.
+ *
+ * @throws IllegalStateException when the ZIP path is absent from the JobExecutionContext or
+ *     the file no longer exists on disk
+ */
+ @Override
+ public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
+ throws Exception {
+
+ if (!geoServerEnabled) {
+ log.info("GeoServer registration is disabled. Skipping.");
+ return RepeatStatus.FINISHED;
+ }
+
+ log.info("Starting GeoServer registration for layer: {}", layerName);
+
+ // The ZIP path was stored in the JobExecutionContext by the previous (ZIP) step.
+ String zipPath =
+ (String)
+ chunkContext
+ .getStepContext()
+ .getStepExecution()
+ .getJobExecution()
+ .getExecutionContext()
+ .get("zipFilePath");
+
+ if (zipPath == null) {
+ log.error("ZIP file path not found in JobExecutionContext");
+ throw new IllegalStateException("ZIP file path not available for GeoServer registration");
+ }
+
+ File zipFile = new File(zipPath);
+ // Fail fast when the ZIP is missing on disk (e.g. the ZIP step was skipped or cleaned up);
+ // File.length() would otherwise silently report 0 for a nonexistent file.
+ if (!zipFile.isFile()) {
+ throw new IllegalStateException("ZIP file does not exist: " + zipPath);
+ }
+
+ // Log file size for monitoring
+ long fileSize = zipFile.length();
+ long fileSizeMB = fileSize / 1024 / 1024;
+ log.info("ZIP file size: {} bytes ({} MB)", fileSize, fileSizeMB);
+
+ // Register using file:// path reference (external.shp endpoint) for all file sizes
+ log.info("Using file path reference method (external.shp endpoint)");
+ log.info("Note: GeoServer must have read access to the file path: {}", zipPath);
+ geoServerService.registerShapefileByPath(zipPath, layerName);
+
+ log.info("GeoServer registration completed successfully for layer: {}", layerName);
+
+ return RepeatStatus.FINISHED;
+ }
+}
diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/tasklet/GeometryTypeValidationTasklet.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/tasklet/GeometryTypeValidationTasklet.java
new file mode 100644
index 0000000..70ce750
--- /dev/null
+++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/tasklet/GeometryTypeValidationTasklet.java
@@ -0,0 +1,318 @@
+package com.kamco.makesample.batch.tasklet;
+
+import com.kamco.makesample.exception.MixedGeometryException;
+import java.sql.Array;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.util.ArrayList;
+import java.util.List;
+import javax.sql.DataSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.StepContribution;
+import org.springframework.batch.core.configuration.annotation.StepScope;
+import org.springframework.batch.core.scope.context.ChunkContext;
+import org.springframework.batch.core.step.tasklet.Tasklet;
+import org.springframework.batch.repeat.RepeatStatus;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/**
+ * Tasklet that validates geometry types before chunk processing.
+ *
+ * <p>Shapefiles require a homogeneous geometry type, so this validation must run before the
+ * chunk-oriented steps.
+ *
+ * <p>Responsibilities:
+ *
+ * <ul>
+ *   <li>Query distinct geometry types via SQL (only ST_Polygon / ST_MultiPolygon are selected)
+ *   <li>Fail fast with an error when an unsupported geometry type mix is found
+ *   <li>Store the resolved geometry type in the ExecutionContext (used by the Writer)
+ * </ul>
+ */
+@Component
+@StepScope
+public class GeometryTypeValidationTasklet implements Tasklet {
+
+ private static final Logger log = LoggerFactory.getLogger(GeometryTypeValidationTasklet.class);
+
+ private final DataSource dataSource;
+
+ @Value("#{jobParameters['batchIds']}")
+ private String batchIdsParam;
+
+ public GeometryTypeValidationTasklet(DataSource dataSource) {
+ this.dataSource = dataSource;
+ }
+
+ @Override
+ public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
+ throws Exception {
+
+ log.info("========================================");
+ log.info("Step 1: Geometry Type Validation");
+ log.info("========================================");
+ log.info("Validating geometry types for batch_ids: {}", batchIdsParam);
+
+ // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+ // 1. Convert the job parameter into a Long array
+ // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+ // e.g. "252,253,257" → [252L, 253L, 257L]
+ Long[] batchIds = parseBatchIds(batchIdsParam);
+ log.debug("Parsed batch IDs: {}", (Object) batchIds);
+
+ // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+ // 1.1 Total row count before validation filters
+ // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+ String countAllSql =
+ """
+ SELECT COUNT(*) as total_count
+ FROM inference_results_testing
+ WHERE batch_id = ANY(?)
+ AND geometry IS NOT NULL
+ """;
+
+ long totalRows = 0;
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement ps = conn.prepareStatement(countAllSql)) {
+ Array sqlArray = conn.createArrayOf("bigint", batchIds);
+ ps.setArray(1, sqlArray);
+ try (ResultSet rs = ps.executeQuery()) {
+ if (rs.next()) {
+ totalRows = rs.getLong("total_count");
+ }
+ }
+ }
+ log.info("Total rows with non-null geometry: {}", totalRows);
+
+ // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+ // 2. Distinct geometry types, restricted by EPSG:5186 validity filters
+ // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+ // ST_GeometryType() returns values like "ST_Polygon" / "ST_MultiPolygon";
+ // DISTINCT yields only the unique types.
+ // Only ST_Polygon and ST_MultiPolygon are allowed (Point, LineString, ... excluded);
+ // "geometry IS NOT NULL" filters out null geometries.
+ //
+ // EPSG:5186 coordinate-system consistency checks:
+ // - SRID must equal 5186
+ // - Valid range: X(125000~530000m), Y(-600000~988000m) — central Korea zone
+ // - ST_IsValid() checks geometry validity
+ String sql =
+ """
+ SELECT DISTINCT ST_GeometryType(geometry) as geom_type
+ FROM inference_results_testing
+ WHERE batch_id = ANY(?)
+ AND geometry IS NOT NULL
+ AND ST_GeometryType(geometry) IN ('ST_Polygon', 'ST_MultiPolygon')
+ AND ST_SRID(geometry) = 5186
+ AND ST_IsValid(geometry) = true
+ AND ST_XMin(geometry) >= 125000 AND ST_XMax(geometry) <= 530000
+ AND ST_YMin(geometry) >= -600000 AND ST_YMax(geometry) <= 988000
+ """;
+
+ List<String> geometryTypes = new ArrayList<>();
+
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement ps = conn.prepareStatement(sql)) {
+
+ // Bind the batch ids as a PostgreSQL array parameter.
+ Array sqlArray = conn.createArrayOf("bigint", batchIds);
+ ps.setArray(1, sqlArray);
+
+ try (ResultSet rs = ps.executeQuery()) {
+ while (rs.next()) {
+ String geomType = rs.getString("geom_type");
+ geometryTypes.add(geomType);
+ log.debug("Found geometry type: {}", geomType);
+ }
+ }
+ }
+
+ log.info("Found {} distinct geometry type(s): {}", geometryTypes.size(), geometryTypes);
+
+ // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+ // 2.1 Count of rows passing all validation filters
+ // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+ String countValidSql =
+ """
+ SELECT COUNT(*) as valid_count
+ FROM inference_results_testing
+ WHERE batch_id = ANY(?)
+ AND geometry IS NOT NULL
+ AND ST_GeometryType(geometry) IN ('ST_Polygon', 'ST_MultiPolygon')
+ AND ST_SRID(geometry) = 5186
+ AND ST_IsValid(geometry) = true
+ AND ST_XMin(geometry) >= 125000 AND ST_XMax(geometry) <= 530000
+ AND ST_YMin(geometry) >= -600000 AND ST_YMax(geometry) <= 988000
+ """;
+
+ long validRows = 0;
+ try (Connection conn = dataSource.getConnection();
+ PreparedStatement ps = conn.prepareStatement(countValidSql)) {
+ Array sqlArray = conn.createArrayOf("bigint", batchIds);
+ ps.setArray(1, sqlArray);
+ try (ResultSet rs = ps.executeQuery()) {
+ if (rs.next()) {
+ validRows = rs.getLong("valid_count");
+ }
+ }
+ }
+
+ long excludedRows = totalRows - validRows;
+ log.info("========================================");
+ log.info("📊 Geometry Validation Summary:");
+ log.info("  Total rows: {}", totalRows);
+ log.info("  Valid rows: {} (EPSG:5186 compliant)", validRows);
+ log.info("  Excluded rows: {} (invalid geometry or out of range)", excludedRows);
+ if (excludedRows > 0) {
+ log.warn(
+ "⚠️ {} rows excluded due to invalid geometry or coordinate out of EPSG:5186 range",
+ excludedRows);
+ }
+ log.info("========================================");
+
+ // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+ // 3. Mixed geometry type check and auto-conversion handling
+ // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+ // Shapefile constraint: one shapefile may contain a single geometry type only.
+ // MultiPolygon geometries are auto-converted to Polygon (GeometryConverter).
+ //
+ // ⚠️ Note: the SQL filter selects only ST_Polygon / ST_MultiPolygon, so in theory only
+ // those two types can appear here; anything else indicates a data consistency problem.
+ if (geometryTypes.size() > 1) {
+ // Polygon + MultiPolygon mix → auto-conversion is allowed.
+ boolean hasPolygon = geometryTypes.stream().anyMatch(t -> t.equals("ST_Polygon"));
+ boolean hasMultiPolygon = geometryTypes.stream().anyMatch(t -> t.equals("ST_MultiPolygon"));
+
+ if (hasPolygon && hasMultiPolygon && geometryTypes.size() == 2) {
+ log.info("========================================");
+ log.info("ℹ️ Mixed geometry types detected:");
+ log.info("  Types: {}", geometryTypes);
+ log.info("");
+ log.info("✅ Auto-conversion enabled:");
+ log.info("  ST_MultiPolygon → ST_Polygon (first polygon only)");
+ log.info("  This will unify all geometries to ST_Polygon type");
+ log.info("========================================");
+
+ // Record ST_Polygon as the target type (everything becomes Polygon after conversion).
+ // Stored in the Job ExecutionContext so subsequent steps can read it.
+ chunkContext
+ .getStepContext()
+ .getStepExecution()
+ .getJobExecution()
+ .getExecutionContext()
+ .putString("geometryType", "ST_Polygon");
+
+ log.info("✅ Geometry type validation PASSED with auto-conversion");
+ log.info("  Target Type: ST_Polygon");
+ log.info("  MultiPolygon geometries will be converted during processing");
+ log.info("========================================");
+
+ return RepeatStatus.FINISHED;
+ }
+
+ // Any other mix fails fast, e.g. ST_Polygon + ST_Point
+ // (should already have been excluded by the SQL filter).
+ log.error("❌ Unexpected mixed geometry types detected: {}", geometryTypes);
+ log.error("Shapefile requires homogeneous geometry type");
+ log.error("Only Polygon + MultiPolygon mix is supported with auto-conversion");
+ throw new MixedGeometryException(
+ "Shapefile requires homogeneous geometry type. Found: "
+ + geometryTypes
+ + ". Only Polygon + MultiPolygon mix is supported.");
+ }
+
+ // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+ // 4. Empty dataset check
+ // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+ // All geometries are null, or there is no data at all for the given batch ids.
+ // Warn and pass: with nothing for the Reader to read, the Writer is never reached.
+ if (geometryTypes.isEmpty()) {
+ log.warn("========================================");
+ log.warn("WARNING: No valid geometries found in dataset");
+ log.warn("This may indicate:");
+ log.warn("  1. All geometries are NULL");
+ log.warn("  2. No data exists for the given batch_ids");
+ log.warn("  3. Database connection issues");
+ log.warn("========================================");
+ log.warn("Proceeding with empty dataset (no files will be generated)");
+
+ // Even for an empty dataset the Writer needs a default geometry type to initialize;
+ // ST_Polygon is this project's primary geometry type.
+ // Stored in the Job ExecutionContext so subsequent steps can read it.
+ chunkContext
+ .getStepContext()
+ .getStepExecution()
+ .getJobExecution()
+ .getExecutionContext()
+ .putString("geometryType", "ST_Polygon");
+
+ log.info("Set default geometry type to ST_Polygon for empty dataset");
+
+ // The step still completes successfully; the next step naturally ends with no input.
+ return RepeatStatus.FINISHED;
+ }
+
+ // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+ // 5. Store the geometry type in the ExecutionContext
+ // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+ // Consumed by later steps:
+ // - FeatureConversionProcessor: builds the featureType
+ // - StreamingShapefileWriter: builds the shapefile schema
+ String geometryType = geometryTypes.get(0);
+
+ // MultiPolygon-only datasets are also auto-converted to Polygon.
+ if (geometryType.equals("ST_MultiPolygon")) {
+ log.info("========================================");
+ log.info("ℹ️ Geometry type: {}", geometryType);
+ log.info("");
+ log.info("✅ Auto-conversion will be applied:");
+ log.info("  ST_MultiPolygon → ST_Polygon (first polygon only)");
+ log.info("  All MultiPolygon geometries will be converted during processing");
+ log.info("========================================");
+
+ // Store the post-conversion type (Polygon).
+ geometryType = "ST_Polygon";
+ }
+
+ // Stored in the Job ExecutionContext so subsequent steps can read it.
+ chunkContext
+ .getStepContext()
+ .getStepExecution()
+ .getJobExecution()
+ .getExecutionContext()
+ .putString("geometryType", geometryType);
+
+ log.info("========================================");
+ log.info("✅ Geometry type validation PASSED");
+ log.info("  Geometry Type: {}", geometryType);
+ log.info("  All geometries are homogeneous (or will be converted)");
+ log.info("========================================");
+
+ return RepeatStatus.FINISHED;
+ }
+
+ /**
+ * Parses the comma-separated batchIds job parameter into a Long array.
+ *
+ * @param batchIdsParam comma-separated string such as "252,253,257"
+ * @return parsed Long array
+ * @throws IllegalArgumentException when the parameter is missing or blank
+ */
+ private Long[] parseBatchIds(String batchIdsParam) {
+ if (batchIdsParam == null || batchIdsParam.trim().isEmpty()) {
+ throw new IllegalArgumentException("batchIds parameter is required");
+ }
+
+ String[] parts = batchIdsParam.split(",");
+ Long[] batchIds = new Long[parts.length];
+
+ for (int i = 0; i < parts.length; i++) {
+ batchIds[i] = Long.parseLong(parts[i].trim());
+ }
+
+ return batchIds;
+ }
+}
diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/util/FeatureTypeFactory.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/util/FeatureTypeFactory.java
new file mode 100644
index 0000000..9ee4c10
--- /dev/null
+++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/util/FeatureTypeFactory.java
@@ -0,0 +1,75 @@
+package com.kamco.makesample.batch.util;
+
+import org.geotools.api.feature.simple.SimpleFeatureType;
+import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
+import org.geotools.feature.simple.SimpleFeatureTypeBuilder;
+import org.locationtech.jts.geom.Geometry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * Utility for building the SimpleFeatureType shared by processor and writer.
+ *
+ * <p>Processor and Writer use the same schema, which keeps feature creation and shapefile
+ * storage consistent.
+ */
+@Component
+public class FeatureTypeFactory {
+
+ private static final Logger log = LoggerFactory.getLogger(FeatureTypeFactory.class);
+
+ /**
+ * Builds the SimpleFeatureType used for exported features.
+ *
+ * @param crs coordinate reference system for the geometry attribute
+ * @param geomType geometry class (Polygon, MultiPolygon, ...)
+ * @return feature type with "the_geom" as default geometry plus the attribute columns
+ */
+ public SimpleFeatureType createFeatureType(CoordinateReferenceSystem crs, Class<?> geomType) {
+ SimpleFeatureTypeBuilder builder = new SimpleFeatureTypeBuilder();
+ builder.setName("inference_results");
+ builder.setCRS(crs);
+
+ // Geometry field, registered as the default geometry.
+ builder.add("the_geom", geomType);
+ builder.setDefaultGeometry("the_geom");
+
+ // Attribute fields.
+ builder.add("uid", String.class);
+ builder.add("map_id", String.class);
+ builder.add("chn_dtct_p", String.class);
+ builder.add("cprs_yr", Long.class);
+ builder.add("crtr_yr", Long.class);
+ builder.add("bf_cls_cd", String.class);
+ builder.add("bf_cls_pro", String.class);
+ builder.add("af_cls_cd", String.class);
+ builder.add("af_cls_pro", String.class);
+
+ return builder.buildFeatureType();
+ }
+
+ /**
+ * Resolves a geometry type name to its JTS geometry class.
+ *
+ * @param geomTypeStr "ST_Polygon", "Polygon", etc.; PostGIS ST_GeometryType() returns the
+ *     "ST_"-prefixed form
+ * @return matching org.locationtech.jts.geom class, or Geometry.class when unknown or blank
+ */
+ public Class<?> parseGeometryType(String geomTypeStr) {
+ if (geomTypeStr == null || geomTypeStr.isEmpty()) {
+ return Geometry.class;
+ }
+
+ // Strip only a LEADING "ST_" prefix; replace() would also mangle inner occurrences.
+ String typeName = geomTypeStr.startsWith("ST_") ? geomTypeStr.substring(3) : geomTypeStr;
+
+ try {
+ return Class.forName("org.locationtech.jts.geom." + typeName);
+ } catch (ClassNotFoundException e) {
+ log.warn("Unknown geometry type: {}, using Geometry.class", typeName);
+ return Geometry.class;
+ }
+ }
+}
diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/MapIdGeoJsonWriter.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/MapIdGeoJsonWriter.java
new file mode 100644
index 0000000..83aab92
--- /dev/null
+++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/MapIdGeoJsonWriter.java
@@ -0,0 +1,212 @@
+package com.kamco.makesample.batch.writer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import org.geotools.api.feature.simple.SimpleFeature;
+import org.geotools.geojson.feature.FeatureJSON;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.annotation.AfterStep;
+import org.springframework.batch.core.annotation.BeforeStep;
+import org.springframework.batch.core.annotation.OnWriteError;
+import org.springframework.batch.core.configuration.annotation.StepScope;
+import org.springframework.batch.item.Chunk;
+import org.springframework.batch.item.ExecutionContext;
+import org.springframework.batch.item.ItemStreamException;
+import org.springframework.batch.item.ItemStreamWriter;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/**
+ * Per-map-ID GeoJSON writer.
+ *
+ * <p>Runs inside a worker of a partitioned step; reads the per-map_id geoJsonOutputPath from
+ * the StepExecutionContext and produces an individual GeoJSON file.
+ *
+ * <p>Uses the same streaming pattern as StreamingGeoJsonWriter for memory-efficient output.
+ * Output is encoded as UTF-8 (the charset required by GeoJSON, RFC 7946).
+ */
+@Component
+@StepScope
+public class MapIdGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
+
+ private static final Logger log = LoggerFactory.getLogger(MapIdGeoJsonWriter.class);
+
+ @Value("#{stepExecutionContext['mapId']}")
+ private String mapId;
+
+ @Value("#{stepExecutionContext['geoJsonOutputPath']}")
+ private String outputPath;
+
+ private FileOutputStream outputStream;
+ private FeatureJSON featureJSON;
+
+ // Progress counters; also checkpointed in update().
+ private int chunkCount = 0;
+ private int totalRecordCount = 0;
+ // True until the very first feature has been written; controls comma placement.
+ private boolean isFirstChunk = true;
+
+ /**
+ * Validates the @Value-injected partition keys and prepares the output directory.
+ *
+ * @throws IllegalStateException when mapId or geoJsonOutputPath is missing
+ */
+ @BeforeStep
+ public void beforeStep(StepExecution stepExecution) {
+ log.info(
+ "MapIdGeoJsonWriter injected values: mapId='{}', outputPath='{}'",
+ this.mapId,
+ this.outputPath);
+
+ // Validate the values injected via @Value from the StepExecutionContext.
+ ExecutionContext executionContext = stepExecution.getExecutionContext();
+
+ if (this.mapId == null || this.outputPath == null) {
+ throw new IllegalStateException(
+ String.format(
+ "MapIdGeoJsonWriter requires non-null 'mapId' and 'geoJsonOutputPath' from @Value injection. "
+ + "Got mapId='%s', geoJsonOutputPath='%s'. Available keys in ExecutionContext: %s",
+ this.mapId, this.outputPath, executionContext.entrySet()));
+ }
+
+ log.info("MapIdGeoJsonWriter initialized for map_id: {}, output: {}", mapId, outputPath);
+
+ // Create the output directory if it does not exist yet.
+ try {
+ Path outputDir = Paths.get(outputPath).getParent();
+ if (outputDir != null && !Files.exists(outputDir)) {
+ Files.createDirectories(outputDir);
+ log.info("Created output directory for map_id {} GeoJSON: {}", mapId, outputDir);
+ }
+ } catch (IOException e) {
+ throw new ItemStreamException("Failed to create output directory for map_id: " + mapId, e);
+ }
+ }
+
+ /** Opens the output stream and writes the FeatureCollection header. */
+ @Override
+ public void open(ExecutionContext executionContext) throws ItemStreamException {
+ log.info("Opening GeoJSON writer for map_id: {}", mapId);
+
+ try {
+ File geoJsonFile = new File(outputPath);
+ outputStream = new FileOutputStream(geoJsonFile);
+ featureJSON = new FeatureJSON();
+
+ // Opening of the GeoJSON FeatureCollection; closed in afterStep().
+ outputStream.write(
+ "{\"type\":\"FeatureCollection\",\"features\":[".getBytes(
+ java.nio.charset.StandardCharsets.UTF_8));
+
+ log.info("GeoJSON file initialized for map_id: {}", mapId);
+
+ } catch (IOException e) {
+ throw new ItemStreamException("Failed to open GeoJSON file for map_id " + mapId, e);
+ }
+ }
+
+ /** Appends each feature of the chunk to the GeoJSON array, comma-separated. */
+ @Override
+ public void write(Chunk<? extends SimpleFeature> chunk) throws Exception {
+ if (chunk.isEmpty()) {
+ return;
+ }
+
+ chunkCount++;
+ List<? extends SimpleFeature> items = chunk.getItems();
+ int itemCount = items.size();
+ totalRecordCount += itemCount;
+
+ log.debug(
+ "[map_id: {}] Writing chunk #{} to GeoJSON with {} features (total: {})",
+ mapId,
+ chunkCount,
+ itemCount,
+ totalRecordCount);
+
+ // Serialize each feature and append it to the file.
+ for (int i = 0; i < items.size(); i++) {
+ SimpleFeature feature = items.get(i);
+
+ // Every feature except the very first in the file is preceded by a comma.
+ if (!isFirstChunk || i > 0) {
+ outputStream.write(",".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ }
+
+ // Serialize through a StringWriter so FeatureJSON cannot close the underlying stream.
+ StringWriter stringWriter = new StringWriter();
+ featureJSON.writeFeature(feature, stringWriter);
+ outputStream.write(
+ stringWriter.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ }
+
+ isFirstChunk = false;
+
+ log.debug("[map_id: {}] Chunk #{} written to GeoJSON successfully", mapId, chunkCount);
+ }
+
+ /** Writes the FeatureCollection footer and releases the stream. */
+ @AfterStep
+ public void afterStep() {
+ log.info(
+ "[map_id: {}] All chunks written to GeoJSON. Total {} records in {} chunks",
+ mapId,
+ totalRecordCount,
+ chunkCount);
+
+ try {
+ if (outputStream != null) {
+ // Closing of the GeoJSON FeatureCollection opened in open().
+ outputStream.write("]}".getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ outputStream.flush();
+ log.info("[map_id: {}] GeoJSON file finalized successfully", mapId);
+ }
+ } catch (IOException e) {
+ log.error("[map_id: {}] Failed to finalize GeoJSON file", mapId, e);
+ throw new ItemStreamException("Failed to finalize GeoJSON file for map_id: " + mapId, e);
+ } finally {
+ cleanup();
+ }
+ }
+
+ @Override
+ public void close() throws ItemStreamException {
+ cleanup();
+ }
+
+ /** Logs the failure, releases the stream, and removes the partial output file. */
+ @OnWriteError
+ public void onError(Exception exception, Chunk<? extends SimpleFeature> chunk) {
+ log.error(
+ "[map_id: {}] Error writing chunk #{} to GeoJSON: {}",
+ mapId,
+ chunkCount,
+ exception.getMessage(),
+ exception);
+
+ cleanup();
+
+ // Remove the partial (truncated) GeoJSON file.
+ try {
+ File geoJsonFile = new File(outputPath);
+ if (geoJsonFile.exists()) {
+ if (geoJsonFile.delete()) {
+ log.info("[map_id: {}] Deleted partial GeoJSON file", mapId);
+ } else {
+ log.warn("[map_id: {}] Could not delete partial GeoJSON file: {}", mapId, outputPath);
+ }
+ }
+ } catch (Exception e) {
+ log.warn("[map_id: {}] Failed to delete partial GeoJSON file", mapId, e);
+ }
+ }
+
+ /** Closes the output stream quietly; safe to call multiple times. */
+ private void cleanup() {
+ if (outputStream != null) {
+ try {
+ outputStream.close();
+ } catch (IOException e) {
+ log.warn("[map_id: {}] Failed to close GeoJSON output stream", mapId, e);
+ }
+ outputStream = null;
+ }
+ }
+
+ /** Checkpoints progress counters into the ExecutionContext. */
+ @Override
+ public void update(ExecutionContext executionContext) throws ItemStreamException {
+ executionContext.putInt("chunkCount", chunkCount);
+ executionContext.putInt("totalRecordCount", totalRecordCount);
+ }
+}
diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/MapIdShapefileWriter.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/MapIdShapefileWriter.java
new file mode 100644
index 0000000..7da293f
--- /dev/null
+++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/MapIdShapefileWriter.java
@@ -0,0 +1,290 @@
+package com.kamco.makesample.batch.writer;
+
+import com.kamco.makesample.batch.util.FeatureTypeFactory;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.geotools.api.data.SimpleFeatureStore;
+import org.geotools.api.data.Transaction;
+import org.geotools.api.feature.simple.SimpleFeature;
+import org.geotools.api.feature.simple.SimpleFeatureType;
+import org.geotools.api.referencing.FactoryException;
+import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
+import org.geotools.data.DefaultTransaction;
+import org.geotools.data.collection.ListFeatureCollection;
+import org.geotools.data.shapefile.ShapefileDataStore;
+import org.geotools.data.shapefile.ShapefileDataStoreFactory;
+import org.geotools.referencing.CRS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.annotation.AfterStep;
+import org.springframework.batch.core.annotation.BeforeStep;
+import org.springframework.batch.core.annotation.OnWriteError;
+import org.springframework.batch.core.configuration.annotation.StepScope;
+import org.springframework.batch.item.Chunk;
+import org.springframework.batch.item.ExecutionContext;
+import org.springframework.batch.item.ItemStreamException;
+import org.springframework.batch.item.ItemStreamWriter;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/**
+ * Per-map-id shapefile writer.
+ *
+ * <p>Runs inside a partitioned step worker. Reads the map_id-specific output path from the
+ * StepExecutionContext and writes an individual shapefile for that partition.
+ *
+ * <p>Uses the same streaming pattern as {@link StreamingShapefileWriter}, so memory usage stays
+ * bounded by the chunk size.
+ */
+@Component
+@StepScope
+public class MapIdShapefileWriter implements ItemStreamWriter<SimpleFeature> {
+
+  private static final Logger log = LoggerFactory.getLogger(MapIdShapefileWriter.class);
+
+  private final FeatureTypeFactory featureTypeFactory;
+
+  // Job-level CRS parameter; defaults to EPSG:5186 when not supplied.
+  @Value("#{jobParameters['crs'] ?: 'EPSG:5186'}")
+  private String crsCode;
+
+  // Partition-scoped values propagated by MapIdPartitioner via the StepExecutionContext.
+  @Value("#{stepExecutionContext['mapId']}")
+  private String mapId;
+
+  @Value("#{stepExecutionContext['outputPath']}")
+  private String outputPath;
+
+  @Value("#{stepExecutionContext['geometryType']}")
+  private String geometryTypeStr;
+
+  private ShapefileDataStore dataStore;
+  private Transaction transaction;
+  private SimpleFeatureStore featureStore;
+  private SimpleFeatureType featureType;
+
+  private int chunkCount = 0;
+  private int totalRecordCount = 0;
+
+  // Geometry class parsed from geometryTypeStr in open() (e.g. a JTS Polygon class).
+  private Class<?> geometryType;
+
+  public MapIdShapefileWriter(FeatureTypeFactory featureTypeFactory) {
+    this.featureTypeFactory = featureTypeFactory;
+  }
+
+  /**
+   * Validates the partition-scoped injections (mapId, outputPath, geometryType) and fails fast
+   * when any of them is missing, so a misconfigured partitioner surfaces before any I/O happens.
+   *
+   * @param stepExecution current step execution (used to report available context keys)
+   */
+  @BeforeStep
+  public void beforeStep(StepExecution stepExecution) {
+    log.info("===== MapIdShapefileWriter.beforeStep() START =====");
+    log.info(
+        "Injected values: mapId='{}', outputPath='{}', geometryTypeStr='{}'",
+        this.mapId,
+        this.outputPath,
+        this.geometryTypeStr);
+
+    ExecutionContext stepExecutionContext = stepExecution.getExecutionContext();
+
+    // Validate all required injections
+    if (this.mapId == null || this.outputPath == null) {
+      String errorMsg =
+          String.format(
+              "MapIdShapefileWriter requires non-null 'mapId' and 'outputPath'. "
+                  + "Got mapId='%s', outputPath='%s'. Available keys: %s",
+              this.mapId, this.outputPath, stepExecutionContext.entrySet());
+      log.error(errorMsg);
+      throw new IllegalStateException(errorMsg);
+    }
+
+    // Validate geometryTypeStr (fail fast)
+    if (this.geometryTypeStr == null || this.geometryTypeStr.isEmpty()) {
+      String errorMsg =
+          String.format(
+              "MapIdShapefileWriter requires non-null 'geometryType' from stepExecutionContext. "
+                  + "Got geometryTypeStr='%s'. Should be propagated by MapIdPartitioner. "
+                  + "Available keys: %s",
+              this.geometryTypeStr, stepExecutionContext.entrySet());
+      log.error(errorMsg);
+      throw new IllegalStateException(errorMsg);
+    }
+
+    log.info(
+        "MapIdShapefileWriter initialized for map_id: {}, output: {}, geometryType: {}",
+        mapId,
+        outputPath,
+        geometryTypeStr);
+  }
+
+  /**
+   * Creates the output directory, the shapefile DataStore/schema, and the transaction-backed
+   * FeatureStore that all chunks of this partition write into.
+   */
+  @Override
+  public void open(ExecutionContext executionContext) throws ItemStreamException {
+    log.info("Opening shapefile writer for map_id: {}", mapId);
+    log.info("Using geometryTypeStr from stepExecutionContext: {}", geometryTypeStr);
+
+    // Create the output directory first — GeoTools requires it before creating files.
+    try {
+      Path outputDir = Paths.get(outputPath).getParent();
+      if (outputDir != null && !Files.exists(outputDir)) {
+        Files.createDirectories(outputDir);
+        log.info("Created output directory for map_id {}: {}", mapId, outputDir);
+      }
+    } catch (IOException e) {
+      throw new ItemStreamException("Failed to create output directory for map_id: " + mapId, e);
+    }
+
+    try {
+      // Direct parsing (no fallback) — geometryTypeStr was validated in beforeStep().
+      this.geometryType = featureTypeFactory.parseGeometryType(geometryTypeStr);
+      log.info(
+          "Parsed geometry type for map_id {}: {} (from: {})",
+          mapId,
+          geometryType.getSimpleName(),
+          geometryTypeStr);
+
+      // CRS for the output schema.
+      CoordinateReferenceSystem crs = CRS.decode(crsCode);
+
+      // Feature schema shared by every chunk of this partition.
+      featureType = featureTypeFactory.createFeatureType(crs, geometryType);
+
+      // Create the shapefile-backed DataStore.
+      File shpFile = new File(outputPath);
+      ShapefileDataStoreFactory factory = new ShapefileDataStoreFactory();
+
+      Map<String, Serializable> params = new HashMap<>();
+      params.put("url", shpFile.toURI().toURL());
+      params.put("create spatial index", Boolean.TRUE);
+
+      dataStore = (ShapefileDataStore) factory.createNewDataStore(params);
+      dataStore.createSchema(featureType);
+
+      // One transaction spans all chunks; committed in close().
+      transaction = new DefaultTransaction("create-" + mapId);
+
+      String typeName = dataStore.getTypeNames()[0];
+      featureStore = (SimpleFeatureStore) dataStore.getFeatureSource(typeName);
+      featureStore.setTransaction(transaction);
+
+      log.info(
+          "ShapefileDataStore initialized for map_id: {} with geometry type: {}",
+          mapId,
+          geometryType.getSimpleName());
+
+    } catch (FactoryException e) {
+      throw new ItemStreamException("Invalid CRS code: " + crsCode, e);
+    } catch (IOException e) {
+      throw new ItemStreamException("Failed to create shapefile for map_id " + mapId, e);
+    }
+  }
+
+  /**
+   * Appends one chunk of features to the shapefile within the open transaction.
+   *
+   * @param chunk chunk of converted features (skipped when empty)
+   * @throws Exception if GeoTools fails to append the features
+   */
+  @Override
+  public void write(Chunk<? extends SimpleFeature> chunk) throws Exception {
+    if (chunk.isEmpty()) {
+      return;
+    }
+
+    chunkCount++;
+    @SuppressWarnings("unchecked")
+    List<SimpleFeature> items = (List<SimpleFeature>) chunk.getItems();
+    int itemCount = items.size();
+    totalRecordCount += itemCount;
+
+    log.debug(
+        "[map_id: {}] Writing chunk #{} with {} features (total: {})",
+        mapId,
+        chunkCount,
+        itemCount,
+        totalRecordCount);
+
+    // Only the current chunk is materialized; the collection becomes garbage afterwards.
+    ListFeatureCollection collection = new ListFeatureCollection(featureType, items);
+    featureStore.addFeatures(collection);
+
+    log.debug("[map_id: {}] Chunk #{} written successfully", mapId, chunkCount);
+  }
+
+  /** Logs the final counters for this partition (commit itself happens in close()). */
+  @AfterStep
+  public void afterStep() {
+    log.info(
+        "[map_id: {}] AfterStep called. Total {} records in {} chunks",
+        mapId,
+        totalRecordCount,
+        chunkCount);
+  }
+
+  /** Commits the partition's transaction and releases all GeoTools resources. */
+  @Override
+  public void close() throws ItemStreamException {
+    log.info(
+        "[map_id: {}] Closing shapefile writer. Committing {} records in {} chunks",
+        mapId,
+        totalRecordCount,
+        chunkCount);
+
+    try {
+      if (transaction != null) {
+        transaction.commit();
+        log.info("[map_id: {}] Transaction committed successfully", mapId);
+      }
+    } catch (IOException e) {
+      log.error("[map_id: {}] Failed to commit transaction", mapId, e);
+      throw new ItemStreamException(
+          "Failed to commit shapefile transaction for map_id: " + mapId, e);
+    } finally {
+      cleanup();
+    }
+  }
+
+  /**
+   * Write-error callback: rolls back the transaction and deletes the partial shapefile so a
+   * failed partition does not leave a truncated artifact behind.
+   *
+   * @param exception error raised while writing the chunk
+   * @param chunk the chunk whose write failed
+   */
+  @OnWriteError
+  public void onError(Exception exception, Chunk<? extends SimpleFeature> chunk) {
+    log.error(
+        "[map_id: {}] Error writing chunk #{}: {}",
+        mapId,
+        chunkCount,
+        exception.getMessage(),
+        exception);
+
+    try {
+      if (transaction != null) {
+        transaction.rollback();
+        log.info("[map_id: {}] Transaction rolled back", mapId);
+      }
+
+      // Best-effort removal of the partial file.
+      File shpFile = new File(outputPath);
+      if (shpFile.exists()) {
+        shpFile.delete();
+        log.info("[map_id: {}] Deleted partial shapefile", mapId);
+      }
+
+    } catch (IOException e) {
+      log.error("[map_id: {}] Failed to rollback transaction", mapId, e);
+    } finally {
+      cleanup();
+    }
+  }
+
+  /** Closes the transaction and disposes the DataStore; safe to call multiple times. */
+  private void cleanup() {
+    if (transaction != null) {
+      try {
+        transaction.close();
+      } catch (IOException e) {
+        log.warn("[map_id: {}] Failed to close transaction", mapId, e);
+      }
+      transaction = null;
+    }
+
+    if (dataStore != null) {
+      dataStore.dispose();
+      dataStore = null;
+    }
+  }
+
+  /** Saves progress counters to the ExecutionContext after each chunk (checkpoint). */
+  @Override
+  public void update(ExecutionContext executionContext) throws ItemStreamException {
+    executionContext.putInt("chunkCount", chunkCount);
+    executionContext.putInt("totalRecordCount", totalRecordCount);
+  }
+}
diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/StreamingGeoJsonWriter.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/StreamingGeoJsonWriter.java
new file mode 100644
index 0000000..8be830c
--- /dev/null
+++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/StreamingGeoJsonWriter.java
@@ -0,0 +1,189 @@
+package com.kamco.makesample.batch.writer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import org.geotools.api.feature.simple.SimpleFeature;
+import org.geotools.geojson.feature.FeatureJSON;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.annotation.AfterStep;
+import org.springframework.batch.core.annotation.BeforeStep;
+import org.springframework.batch.core.annotation.OnWriteError;
+import org.springframework.batch.core.configuration.annotation.StepScope;
+import org.springframework.batch.item.Chunk;
+import org.springframework.batch.item.ExecutionContext;
+import org.springframework.batch.item.ItemStreamException;
+import org.springframework.batch.item.ItemStreamWriter;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/**
+ * 스트리밍 GeoJSON Writer
+ *
+ * StreamingShapefileWriter와 유사한 패턴으로 chunk 단위 스트리밍 처리
+ *
+ *
메모리 효과:
+ *
+ *
+ * - 기존: 전체 데이터를 DefaultFeatureCollection에 누적
+ *
- 신규: chunk 단위로 GeoJSON 스트림에 append
+ *
+ */
+@Component
+@StepScope
+public class StreamingGeoJsonWriter implements ItemStreamWriter {
+
+ private static final Logger log = LoggerFactory.getLogger(StreamingGeoJsonWriter.class);
+
+ @Value("#{jobParameters['geoJsonOutputPath']}")
+ private String outputPath;
+
+ private FileOutputStream outputStream;
+ private FeatureJSON featureJSON;
+
+ private int chunkCount = 0;
+ private int totalRecordCount = 0;
+
+ private boolean isFirstChunk = true;
+
+ @BeforeStep
+ public void beforeStep() {
+ // 출력 디렉토리 생성
+ try {
+ Path outputDir = Paths.get(outputPath).getParent();
+ if (outputDir != null && !Files.exists(outputDir)) {
+ Files.createDirectories(outputDir);
+ log.info("Created output directory for GeoJSON: {}", outputDir);
+ }
+ } catch (IOException e) {
+ throw new ItemStreamException("Failed to create output directory", e);
+ }
+ }
+
+ @Override
+ public void open(ExecutionContext executionContext) throws ItemStreamException {
+ log.info("Opening StreamingGeoJsonWriter for: {}", outputPath);
+
+ try {
+ File geoJsonFile = new File(outputPath);
+ outputStream = new FileOutputStream(geoJsonFile);
+ featureJSON = new FeatureJSON();
+
+ // GeoJSON FeatureCollection 시작
+ outputStream.write("{\"type\":\"FeatureCollection\",\"features\":[".getBytes());
+
+ log.info("GeoJSON file initialized successfully");
+
+ } catch (IOException e) {
+ throw new ItemStreamException("Failed to open GeoJSON file: " + outputPath, e);
+ }
+ }
+
+ @Override
+ public void write(Chunk extends SimpleFeature> chunk) throws Exception {
+ if (chunk.isEmpty()) {
+ return;
+ }
+
+ chunkCount++;
+ List items = (List) chunk.getItems();
+ int itemCount = items.size();
+ totalRecordCount += itemCount;
+
+ log.debug(
+ "Writing chunk #{} to GeoJSON with {} features (total so far: {})",
+ chunkCount,
+ itemCount,
+ totalRecordCount);
+
+ // 각 feature를 GeoJSON으로 변환하여 append
+ for (int i = 0; i < items.size(); i++) {
+ SimpleFeature feature = items.get(i);
+
+ // 첫 번째 feature가 아니면 콤마 추가
+ if (!isFirstChunk || i > 0) {
+ outputStream.write(",".getBytes());
+ }
+
+ // Feature를 GeoJSON으로 직렬화
+ // StringWriter를 사용하여 FeatureJSON이 스트림을 닫지 못하도록 방지
+ StringWriter stringWriter = new StringWriter();
+ featureJSON.writeFeature(feature, stringWriter);
+ outputStream.write(stringWriter.toString().getBytes());
+ }
+
+ isFirstChunk = false;
+
+ log.debug("Chunk #{} written to GeoJSON successfully", chunkCount);
+ }
+
+ @AfterStep
+ public void afterStep() {
+ log.info(
+ "All chunks written to GeoJSON. Total {} records in {} chunks",
+ totalRecordCount,
+ chunkCount);
+
+ try {
+ if (outputStream != null) {
+ // GeoJSON FeatureCollection 종료
+ outputStream.write("]}".getBytes());
+ outputStream.flush();
+ log.info("GeoJSON file finalized successfully");
+ }
+ } catch (IOException e) {
+ log.error("Failed to finalize GeoJSON file", e);
+ throw new ItemStreamException("Failed to finalize GeoJSON file", e);
+ } finally {
+ cleanup();
+ }
+ }
+
+ @Override
+ public void close() throws ItemStreamException {
+ cleanup();
+ }
+
+ @OnWriteError
+ public void onError(Exception exception, Chunk extends SimpleFeature> chunk) {
+ log.error(
+ "Error writing chunk #{} to GeoJSON: {}", chunkCount, exception.getMessage(), exception);
+
+ cleanup();
+
+ // 부분 파일 삭제
+ try {
+ File geoJsonFile = new File(outputPath);
+ if (geoJsonFile.exists()) {
+ geoJsonFile.delete();
+ log.info("Deleted partial GeoJSON file: {}", outputPath);
+ }
+ } catch (Exception e) {
+ log.warn("Failed to delete partial GeoJSON file", e);
+ }
+ }
+
+ private void cleanup() {
+ if (outputStream != null) {
+ try {
+ outputStream.close();
+ } catch (IOException e) {
+ log.warn("Failed to close GeoJSON output stream", e);
+ }
+ outputStream = null;
+ }
+ }
+
+ @Override
+ public void update(ExecutionContext executionContext) throws ItemStreamException {
+ // Checkpoint
+ executionContext.putInt("chunkCount", chunkCount);
+ executionContext.putInt("totalRecordCount", totalRecordCount);
+ }
+}
diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/StreamingShapefileWriter.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/StreamingShapefileWriter.java
new file mode 100644
index 0000000..7928f4c
--- /dev/null
+++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/StreamingShapefileWriter.java
@@ -0,0 +1,259 @@
+package com.kamco.makesample.batch.writer;
+
+import com.kamco.makesample.batch.util.FeatureTypeFactory;
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.geotools.api.data.SimpleFeatureStore;
+import org.geotools.api.data.Transaction;
+import org.geotools.api.feature.simple.SimpleFeature;
+import org.geotools.api.feature.simple.SimpleFeatureType;
+import org.geotools.api.referencing.FactoryException;
+import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
+import org.geotools.data.DefaultTransaction;
+import org.geotools.data.collection.ListFeatureCollection;
+import org.geotools.data.shapefile.ShapefileDataStore;
+import org.geotools.data.shapefile.ShapefileDataStoreFactory;
+import org.geotools.referencing.CRS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.StepExecution;
+import org.springframework.batch.core.annotation.AfterStep;
+import org.springframework.batch.core.annotation.BeforeStep;
+import org.springframework.batch.core.annotation.OnWriteError;
+import org.springframework.batch.core.configuration.annotation.StepScope;
+import org.springframework.batch.item.Chunk;
+import org.springframework.batch.item.ExecutionContext;
+import org.springframework.batch.item.ItemStreamException;
+import org.springframework.batch.item.ItemStreamWriter;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+/**
+ * Streaming shapefile writer — the core of the memory-bottleneck fix.
+ *
+ * <p>Previous problem: every feature was accumulated in a DefaultFeatureCollection (500MB-4GB).
+ *
+ * <p>Solution: chunk-wise incremental writes through the GeoTools Transaction API.
+ *
+ * <p>Memory impact:
+ *
+ * <ul>
+ *   <li>Before: whole dataset in memory (500MB-4GB)
+ *   <li>After: one chunk at a time (~40MB per 1000 records)
+ * </ul>
+ *
+ * <p>Lifecycle:
+ *
+ * <ul>
+ *   <li>open(): creates the DataStore and starts the Transaction before the first chunk
+ *   <li>write(): converts each chunk to a ListFeatureCollection and adds it to the FeatureStore
+ *   <li>afterStep(): commits the Transaction once all chunks are written
+ * </ul>
+ */
+@Component
+@StepScope
+public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature> {
+
+  private static final Logger log = LoggerFactory.getLogger(StreamingShapefileWriter.class);
+
+  private final FeatureTypeFactory featureTypeFactory;
+
+  @Value("#{jobParameters['outputPath']}")
+  private String outputPath;
+
+  @Value("#{jobParameters['crs'] ?: 'EPSG:5186'}")
+  private String crsCode;
+
+  private ShapefileDataStore dataStore;
+  private Transaction transaction;
+  private SimpleFeatureStore featureStore;
+  private SimpleFeatureType featureType;
+
+  private int chunkCount = 0;
+  private int totalRecordCount = 0;
+
+  // Geometry class published by the validation tasklet; may stay null for an empty
+  // dataset, in which case open() falls back to the factory default.
+  private Class<?> geometryType;
+
+  public StreamingShapefileWriter(FeatureTypeFactory featureTypeFactory) {
+    this.featureTypeFactory = featureTypeFactory;
+  }
+
+  /**
+   * Reads the geometry type that the validation step stored in the Job ExecutionContext and
+   * creates the output directory.
+   *
+   * @param stepExecution current step execution, used to reach the Job ExecutionContext
+   */
+  @BeforeStep
+  public void beforeStep(StepExecution stepExecution) {
+    ExecutionContext jobExecutionContext = stepExecution.getJobExecution().getExecutionContext();
+
+    // geometryType may be absent when the dataset was empty.
+    if (jobExecutionContext.containsKey("geometryType")) {
+      String geomTypeStr = jobExecutionContext.getString("geometryType");
+      this.geometryType = featureTypeFactory.parseGeometryType(geomTypeStr);
+      log.info("Geometry type from validation: {}", geometryType.getSimpleName());
+    } else {
+      log.warn(
+          "geometryType not set in Job ExecutionContext (empty dataset). Will use default in open()");
+    }
+
+    // Create the output directory before GeoTools tries to create the files.
+    try {
+      Path outputDir = Paths.get(outputPath).getParent();
+      if (outputDir != null && !Files.exists(outputDir)) {
+        Files.createDirectories(outputDir);
+        log.info("Created output directory: {}", outputDir);
+      }
+    } catch (IOException e) {
+      throw new ItemStreamException("Failed to create output directory", e);
+    }
+  }
+
+  /** Creates the shapefile DataStore, schema, and transaction-backed FeatureStore. */
+  @Override
+  public void open(ExecutionContext executionContext) throws ItemStreamException {
+    log.info("Opening StreamingShapefileWriter for: {}", outputPath);
+
+    try {
+      // CRS for the output schema.
+      CoordinateReferenceSystem crs = CRS.decode(crsCode);
+
+      // Fall back to the factory default when validation did not publish a geometry type.
+      if (geometryType == null) {
+        geometryType = featureTypeFactory.parseGeometryType(null);
+        log.warn("Geometry type not set, using default: Geometry.class");
+      }
+
+      // Feature schema shared by every chunk (built by FeatureTypeFactory).
+      featureType = featureTypeFactory.createFeatureType(crs, geometryType);
+
+      // Create the shapefile-backed DataStore.
+      File shpFile = new File(outputPath);
+      ShapefileDataStoreFactory factory = new ShapefileDataStoreFactory();
+
+      Map<String, Serializable> params = new HashMap<>();
+      params.put("url", shpFile.toURI().toURL());
+      params.put("create spatial index", Boolean.TRUE);
+
+      dataStore = (ShapefileDataStore) factory.createNewDataStore(params);
+      dataStore.createSchema(featureType);
+
+      // One transaction spans every chunk; committed in afterStep().
+      transaction = new DefaultTransaction("create");
+
+      String typeName = dataStore.getTypeNames()[0];
+      featureStore = (SimpleFeatureStore) dataStore.getFeatureSource(typeName);
+      featureStore.setTransaction(transaction);
+
+      log.info("ShapefileDataStore initialized successfully");
+
+    } catch (FactoryException e) {
+      throw new ItemStreamException("Invalid CRS code: " + crsCode, e);
+    } catch (IOException e) {
+      throw new ItemStreamException("Failed to create shapefile at: " + outputPath, e);
+    }
+  }
+
+  /**
+   * Appends one chunk of features to the shapefile within the open transaction.
+   *
+   * @param chunk chunk of converted features (skipped when empty)
+   * @throws Exception if GeoTools fails to append the features
+   */
+  @Override
+  public void write(Chunk<? extends SimpleFeature> chunk) throws Exception {
+    if (chunk.isEmpty()) {
+      return;
+    }
+
+    chunkCount++;
+    @SuppressWarnings("unchecked")
+    List<SimpleFeature> items = (List<SimpleFeature>) chunk.getItems();
+    int itemCount = items.size();
+    totalRecordCount += itemCount;
+
+    log.debug(
+        "Writing chunk #{} with {} features (total so far: {})",
+        chunkCount,
+        itemCount,
+        totalRecordCount);
+
+    // Only the current chunk is materialized; the collection becomes garbage afterwards.
+    ListFeatureCollection collection = new ListFeatureCollection(featureType, items);
+
+    // The transaction stays open across chunks.
+    featureStore.addFeatures(collection);
+
+    log.debug("Chunk #{} written successfully", chunkCount);
+  }
+
+  /** Commits the transaction after all chunks are written, then releases resources. */
+  @AfterStep
+  public void afterStep() {
+    log.info(
+        "All chunks written. Committing transaction for {} records in {} chunks",
+        totalRecordCount,
+        chunkCount);
+
+    try {
+      if (transaction != null) {
+        transaction.commit();
+        log.info("Transaction committed successfully");
+      }
+    } catch (IOException e) {
+      log.error("Failed to commit transaction", e);
+      throw new ItemStreamException("Failed to commit shapefile transaction", e);
+    } finally {
+      cleanup();
+    }
+  }
+
+  @Override
+  public void close() throws ItemStreamException {
+    // Commit happens in afterStep(); here we only make sure resources are released.
+    cleanup();
+  }
+
+  /**
+   * Write-error callback: rolls back the transaction and deletes the partial shapefile so a
+   * failed run does not leave a truncated artifact behind.
+   */
+  @OnWriteError
+  public void onError(Exception exception, Chunk<? extends SimpleFeature> chunk) {
+    log.error("Error writing chunk #{}: {}", chunkCount, exception.getMessage(), exception);
+
+    try {
+      if (transaction != null) {
+        transaction.rollback();
+        log.info("Transaction rolled back due to error");
+      }
+
+      // Best-effort removal of the partial file.
+      File shpFile = new File(outputPath);
+      if (shpFile.exists()) {
+        shpFile.delete();
+        log.info("Deleted partial shapefile: {}", outputPath);
+      }
+
+    } catch (IOException e) {
+      log.error("Failed to rollback transaction", e);
+    } finally {
+      cleanup();
+    }
+  }
+
+  /** Closes the transaction and disposes the DataStore; safe to call multiple times. */
+  private void cleanup() {
+    if (transaction != null) {
+      try {
+        transaction.close();
+      } catch (IOException e) {
+        log.warn("Failed to close transaction", e);
+      }
+      transaction = null;
+    }
+
+    if (dataStore != null) {
+      dataStore.dispose();
+      dataStore = null;
+    }
+  }
+
+  /** Saves progress counters to the ExecutionContext after each chunk (checkpoint). */
+  @Override
+  public void update(ExecutionContext executionContext) throws ItemStreamException {
+    executionContext.putInt("chunkCount", chunkCount);
+    executionContext.putInt("totalRecordCount", totalRecordCount);
+  }
+}
diff --git a/shp-exporter/src/main/java/com/kamco/makesample/cli/ConverterCommandLineRunner.java b/shp-exporter/src/main/java/com/kamco/makesample/cli/ConverterCommandLineRunner.java
index 131467d..4542d78 100755
--- a/shp-exporter/src/main/java/com/kamco/makesample/cli/ConverterCommandLineRunner.java
+++ b/shp-exporter/src/main/java/com/kamco/makesample/cli/ConverterCommandLineRunner.java
@@ -7,6 +7,11 @@ import java.nio.file.Paths;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.batch.core.Job;
+import org.springframework.batch.core.JobExecution;
+import org.springframework.batch.core.JobParameters;
+import org.springframework.batch.core.JobParametersBuilder;
+import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.DefaultApplicationArguments;
@@ -20,14 +25,20 @@ public class ConverterCommandLineRunner implements CommandLineRunner {
private final ShapefileConverterService converterService;
private final GeoServerRegistrationService geoServerService;
private final ConverterProperties converterProperties;
+ private final JobLauncher jobLauncher;
+ private final Job mergedModeJob;
public ConverterCommandLineRunner(
ShapefileConverterService converterService,
GeoServerRegistrationService geoServerService,
- ConverterProperties converterProperties) {
+ ConverterProperties converterProperties,
+ JobLauncher jobLauncher,
+ Job mergedModeJob) {
this.converterService = converterService;
this.geoServerService = geoServerService;
this.converterProperties = converterProperties;
+ this.jobLauncher = jobLauncher;
+ this.mergedModeJob = mergedModeJob;
}
@Override
@@ -37,13 +48,21 @@ public class ConverterCommandLineRunner implements CommandLineRunner {
List profiles = appArgs.getOptionValues("spring.profiles.active");
log.info("profiles.active={}", profiles);
+ // GeoServer 등록 모드
if (appArgs.containsOption("upload-shp")) {
handleRegistration(appArgs);
return;
}
- // Existing shapefile generation logic
- log.info("=== PostgreSQL to Shapefile Converter ===");
+ // Batch 모드 체크
+ if (appArgs.containsOption("batch") || appArgs.containsOption("use-batch")) {
+ handleBatchMode(appArgs);
+ return;
+ }
+
+ // 기존 로직 (Legacy mode) - 향후 deprecated 예정
+ log.warn("Running in LEGACY mode. Consider using --batch flag for better performance.");
+ log.info("=== PostgreSQL to Shapefile Converter (Legacy) ===");
log.info("Inference ID: {}", converterProperties.getInferenceId());
List mapIds = converterProperties.getMapIds();
@@ -67,6 +86,103 @@ public class ConverterCommandLineRunner implements CommandLineRunner {
}
}
+ /**
+ * Runs the converter through Spring Batch.
+ *
+ * <p>Logs the effective configuration, builds the job parameters, launches the merged-mode job,
+ * and terminates the process with exit code 1 when the job finishes unsuccessfully.
+ *
+ * @param appArgs ApplicationArguments
+ * @throws Exception if the job launcher fails to start or run the job
+ */
+ private void handleBatchMode(ApplicationArguments appArgs) throws Exception {
+ log.info("=== Spring Batch Mode: Shapefile Converter ===");
+ log.info("Inference ID: {}", converterProperties.getInferenceId());
+ log.info("Batch IDs: {}", converterProperties.getBatchIds());
+ log.info("Output directory: {}", converterProperties.getOutputBaseDir());
+ log.info("CRS: {}", converterProperties.getCrs());
+ log.info("Chunk size: {}", converterProperties.getBatch().getChunkSize());
+ log.info("==============================================");
+
+ // Assemble the job parameters (includes a timestamp for JobInstance uniqueness).
+ JobParameters jobParams = buildJobParameters(appArgs);
+
+ // Launch the job and block until it finishes.
+ JobExecution execution = jobLauncher.run(mergedModeJob, jobParams);
+
+ log.info("==============================================");
+ log.info("Job Execution Status: {}", execution.getStatus());
+ log.info("Exit Status: {}", execution.getExitStatus());
+
+ if (execution.getStatus().isUnsuccessful()) {
+ log.error("Job execution failed");
+ System.exit(1);
+ }
+
+ log.info("Job completed successfully");
+ }
+
+ /**
+ * Builds the JobParameters for the merged-mode job from configuration and CLI arguments.
+ *
+ * @param appArgs ApplicationArguments (source of the geoserver.enabled flag)
+ * @return JobParameters
+ */
+ private JobParameters buildJobParameters(ApplicationArguments appArgs) {
+ JobParametersBuilder builder = new JobParametersBuilder();
+
+ // Core identifying parameters.
+ builder.addString("inferenceId", converterProperties.getInferenceId());
+ builder.addString("batchIds", joinBatchIds(converterProperties.getBatchIds()));
+ builder.addString("crs", converterProperties.getCrs());
+ builder.addLong("timestamp", System.currentTimeMillis()); // launch time; guarantees JobInstance uniqueness
+
+ // Output paths (both Shapefile and GeoJSON are produced; GeoServer registers only the Shapefile).
+ String outputDir =
+ Paths.get(
+ converterProperties.getOutputBaseDir(),
+ converterProperties.getInferenceId(),
+ "merge")
+ .toString();
+
+ String shapefilePath =
+ Paths.get(outputDir, converterProperties.getInferenceId() + ".shp").toString();
+ String geoJsonPath =
+ Paths.get(outputDir, converterProperties.getInferenceId() + ".geojson").toString();
+
+ builder.addString("outputPath", shapefilePath);
+ builder.addString("geoJsonOutputPath", geoJsonPath);
+ builder.addString("zipBaseName", converterProperties.getInferenceId());
+
+ // Layer name used when registering with GeoServer.
+ String layerName = converterProperties.getInferenceId();
+ builder.addString("layerName", layerName);
+
+ // Whether to register the output with GeoServer (CLI flag, defaults to false).
+ boolean geoServerEnabled =
+ appArgs.containsOption("geoserver.enabled")
+ && Boolean.parseBoolean(firstOption(appArgs, "geoserver.enabled"));
+ builder.addString("geoserver.enabled", String.valueOf(geoServerEnabled));
+
+ // Batch tuning.
+ builder.addLong("fetchSize", (long) converterProperties.getBatch().getFetchSize());
+
+ return builder.toJobParameters();
+ }
+
+ /**
+ * Joins the configured batch IDs into a comma-separated string.
+ *
+ * @param batchIds list of batch IDs (must be non-null and non-empty)
+ * @return a string such as "252,253,257"
+ * @throws IllegalStateException when no batch IDs are configured
+ */
+ private String joinBatchIds(List<?> batchIds) {
+ if (batchIds == null || batchIds.isEmpty()) {
+ throw new IllegalStateException("batch-ids must be specified");
+ }
+
+ return String.join(",", batchIds.stream().map(String::valueOf).toArray(String[]::new));
+ }
+
private void handleRegistration(ApplicationArguments appArgs) {
// --help
if (appArgs.containsOption("help") || appArgs.containsOption("h")) {
@@ -74,35 +190,56 @@ public class ConverterCommandLineRunner implements CommandLineRunner {
return;
}
- String filePath = firstOption(appArgs, "upload-shp");
- String layerName = firstOption(appArgs, "layer");
+ // Check for file path reference method (recommended for large files)
+ String filePathReference = firstOption(appArgs, "register-by-path");
+ boolean usePathReference = filePathReference != null && !filePathReference.isBlank();
- if (filePath == null || filePath.isBlank()) {
- log.info("No upload requested. Use --upload-shp option to upload a shapefile.");
+ // Check for upload method (traditional, file size limited)
+ String uploadFilePath = firstOption(appArgs, "upload-shp");
+ boolean useUpload = uploadFilePath != null && !uploadFilePath.isBlank();
+
+ if (!usePathReference && !useUpload) {
+ log.info("No upload or registration requested.");
printUsage();
return;
}
+ if (usePathReference && useUpload) {
+ log.error("Cannot use both --upload-shp and --register-by-path at the same time.");
+ log.error("Choose one method:");
+ log.error(" --upload-shp: Upload file content via REST API (< 100MB)");
+ log.error(" --register-by-path: Reference file path (500MB+)");
+ System.exit(1);
+ }
+
+ String filePath = usePathReference ? filePathReference : uploadFilePath;
+ String layerName = firstOption(appArgs, "layer");
+
if (layerName == null || layerName.isBlank()) {
String fileName = Paths.get(filePath).getFileName().toString();
layerName = fileName.replaceAll("(?i)\\.(zip|shp)$", ""); // 대소문자도 처리
}
log.info("========================================");
- log.info("Shapefile Upload to GeoServer");
+ log.info("Shapefile {} to GeoServer", usePathReference ? "Registration" : "Upload");
log.info("========================================");
+ log.info("Method: {}", usePathReference ? "File Path Reference" : "REST API Upload");
log.info("Input File: {}", filePath);
log.info("Layer Name: {}", layerName);
log.info("========================================");
try {
- geoServerService.uploadShapefileZip(filePath, layerName);
+ if (usePathReference) {
+ geoServerService.registerShapefileByPath(filePath, layerName);
+ } else {
+ geoServerService.uploadShapefileZip(filePath, layerName);
+ }
log.info("========================================");
- log.info("Upload completed successfully!");
+ log.info("{} completed successfully!", usePathReference ? "Registration" : "Upload");
log.info("========================================");
} catch (Exception e) {
log.error("========================================");
- log.error("Upload failed: {}", e.getMessage(), e);
+ log.error("{} failed: {}", usePathReference ? "Registration" : "Upload", e.getMessage(), e);
log.error("========================================");
throw e;
}
@@ -147,21 +284,44 @@ public class ConverterCommandLineRunner implements CommandLineRunner {
System.out.println();
System.out.println("Options:");
System.out.println(
- " --upload-shp Upload shapefile to GeoServer (.shp or .zip)");
+ " --upload-shp Upload shapefile via REST API (< 100MB)");
System.out.println(
- " --layer Specify layer name (optional, defaults to filename)");
- System.out.println(" --help, -h Show this help message");
+ " --register-by-path Register shapefile using file path (500MB+)");
+ System.out.println(
+ " --layer Specify layer name (optional, defaults to filename)");
+ System.out.println(" --help, -h Show this help message");
+ System.out.println();
+ System.out.println("GeoServer Registration Methods:");
+ System.out.println();
+ System.out.println(" 1. REST API Upload (--upload-shp):");
+ System.out.println(" - Uploads file content to GeoServer via HTTP");
+ System.out.println(" - File size limit: < 100MB (HTTP payload limit)");
+ System.out.println(" - Use for small to medium files");
+ System.out.println();
+ System.out.println(" 2. File Path Reference (--register-by-path):");
+ System.out.println(" - GeoServer reads file from its local file system");
+ System.out.println(" - No file size limit (supports 500MB ~ 2GB+)");
+ System.out.println(" - Requirements:");
+ System.out.println(" * GeoServer must have file system access to the path");
+ System.out.println(" * Path must be absolute (e.g., /data/model_output/...)");
+ System.out.println(" * File must be readable by GeoServer user");
System.out.println();
System.out.println("Examples:");
- System.out.println(" # Upload ZIP file directly");
- System.out.println(" java -jar shp-exporter.jar --upload-shp /path/to/shapefile.zip");
System.out.println();
- System.out.println(" # Upload .shp file (will auto-create ZIP with related files)");
- System.out.println(" java -jar shp-exporter.jar --upload-shp /path/to/shapefile.shp");
+ System.out.println(" # Small file (< 100MB): Upload via REST API");
+ System.out.println(" java -jar shp-exporter.jar --upload-shp /path/to/small_file.zip");
System.out.println();
- System.out.println(" # Specify custom layer name");
+ System.out.println(" # Large file (500MB+): Register by file path");
System.out.println(
- " java -jar shp-exporter.jar --upload-shp /path/to/shapefile.shp --layer my_layer");
+ " java -jar shp-exporter.jar --register-by-path"
+ + " /data/model_output/export/inference_id/merge/large_file.zip");
+ System.out.println();
+ System.out.println(" # With custom layer name");
+ System.out.println(
+ " java -jar shp-exporter.jar --register-by-path /path/to/file.zip --layer my_layer");
+ System.out.println();
+ System.out.println(" # Auto-create ZIP from .shp file (upload method)");
+ System.out.println(" java -jar shp-exporter.jar --upload-shp /path/to/shapefile.shp");
System.out.println();
}
}
diff --git a/shp-exporter/src/main/java/com/kamco/makesample/config/ConverterProperties.java b/shp-exporter/src/main/java/com/kamco/makesample/config/ConverterProperties.java
index aae98bf..5946d0d 100755
--- a/shp-exporter/src/main/java/com/kamco/makesample/config/ConverterProperties.java
+++ b/shp-exporter/src/main/java/com/kamco/makesample/config/ConverterProperties.java
@@ -14,6 +14,7 @@ public class ConverterProperties {
private String outputBaseDir;
private String crs;
private String mode;
+ private BatchConfig batch = new BatchConfig(); // Spring Batch 설정
public String getInferenceId() {
return inferenceId;
@@ -62,4 +63,61 @@ public class ConverterProperties {
public String getMode() {
return mode;
}
+
+ public BatchConfig getBatch() {
+ return batch;
+ }
+
+ public void setBatch(BatchConfig batch) {
+ this.batch = batch;
+ }
+
+ /** Spring Batch 관련 설정 */
+ public static class BatchConfig {
+ private int chunkSize = 1000;
+ private int skipLimit = 100;
+ private int fetchSize = 1000;
+ private boolean enablePartitioning = false;
+ private int partitionConcurrency = 4; // Map ID별 병렬 처리 동시성
+
+ public int getChunkSize() {
+ return chunkSize;
+ }
+
+ public void setChunkSize(int chunkSize) {
+ this.chunkSize = chunkSize;
+ }
+
+ public int getSkipLimit() {
+ return skipLimit;
+ }
+
+ public void setSkipLimit(int skipLimit) {
+ this.skipLimit = skipLimit;
+ }
+
+ public int getFetchSize() {
+ return fetchSize;
+ }
+
+ public void setFetchSize(int fetchSize) {
+ this.fetchSize = fetchSize;
+ }
+
+ public boolean isEnablePartitioning() {
+ return enablePartitioning;
+ }
+
+ public void setEnablePartitioning(boolean enablePartitioning) {
+ this.enablePartitioning = enablePartitioning;
+ }
+
+ public int getPartitionConcurrency() {
+ return partitionConcurrency;
+ }
+
+ public void setPartitionConcurrency(int partitionConcurrency) {
+ this.partitionConcurrency = partitionConcurrency;
+ }
+ }
}
diff --git a/shp-exporter/src/main/java/com/kamco/makesample/config/GeoServerProperties.java b/shp-exporter/src/main/java/com/kamco/makesample/config/GeoServerProperties.java
index 3a09a89..3337e4d 100755
--- a/shp-exporter/src/main/java/com/kamco/makesample/config/GeoServerProperties.java
+++ b/shp-exporter/src/main/java/com/kamco/makesample/config/GeoServerProperties.java
@@ -1,7 +1,6 @@
package com.kamco.makesample.config;
import jakarta.validation.constraints.NotBlank;
-import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;
@@ -12,11 +11,9 @@ import org.springframework.validation.annotation.Validated;
public class GeoServerProperties {
@NotBlank(message = "GeoServer base URL must be configured")
- @Value("${layer.geoserver-url}")
private String baseUrl;
@NotBlank(message = "GeoServer workspace must be configured")
- @Value("${layer.workspace}")
private String workspace;
@NotBlank(message = "GeoServer datastore must be configured")
diff --git a/shp-exporter/src/main/java/com/kamco/makesample/service/GeoServerRegistrationService.java b/shp-exporter/src/main/java/com/kamco/makesample/service/GeoServerRegistrationService.java
index c39100d..28be6f5 100644
--- a/shp-exporter/src/main/java/com/kamco/makesample/service/GeoServerRegistrationService.java
+++ b/shp-exporter/src/main/java/com/kamco/makesample/service/GeoServerRegistrationService.java
@@ -38,12 +38,21 @@ public class GeoServerRegistrationService {
this.properties = properties;
}
+ /**
+ * Register shapefile to GeoServer by uploading file content (REST API)
+ *
+ * LIMITATION: File size limited by HTTP request size (typically < 100MB) Use
+ * registerShapefileByPath() for larger files (500MB+)
+ *
+ * @param filePath Path to shapefile (.shp or .zip)
+ * @param layerName GeoServer layer name
+ */
public void uploadShapefileZip(String filePath, String layerName) {
String zipFilePath = filePath;
boolean tempZipCreated = false;
try {
- log.info("Starting shapefile upload to GeoServer");
+ log.info("Starting shapefile upload to GeoServer (REST API upload)");
log.info("Input file: {}", filePath);
log.info("Layer name: {}", layerName);
log.info("Workspace: {}", properties.getWorkspace());
@@ -59,6 +68,18 @@ public class GeoServerRegistrationService {
log.info("Temporary ZIP created: {}", zipFilePath);
}
+ // Check file size and warn if too large
+ Path path = Paths.get(zipFilePath);
+ long fileSize = Files.size(path);
+ log.info("ZIP file size: {} bytes ({} MB)", fileSize, fileSize / 1024 / 1024);
+
+ if (fileSize > 100 * 1024 * 1024) { // 100MB
+ log.warn(
+ "WARNING: File size ({} MB) may exceed HTTP upload limits. Consider using"
+ + " registerShapefileByPath() for files > 100MB",
+ fileSize / 1024 / 1024);
+ }
+
// Check if layer exists and handle overwrite
if (properties.isOverwriteExisting() && layerExists(layerName)) {
log.info("Layer '{}' already exists. Deleting...", layerName);
@@ -66,9 +87,7 @@ public class GeoServerRegistrationService {
}
// Read ZIP file
- Path path = Paths.get(zipFilePath);
byte[] zipData = Files.readAllBytes(path);
- log.info("ZIP file size: {} bytes", zipData.length);
// Upload to GeoServer
String url =
@@ -102,6 +121,24 @@ public class GeoServerRegistrationService {
"GeoServer upload failed. Status: {}, Response: {}",
e.getStatusCode(),
e.getResponseBodyAsString());
+
+ // Provide helpful message for 413 Payload Too Large
+ if (e.getStatusCode() == HttpStatus.PAYLOAD_TOO_LARGE) {
+ log.error("");
+ log.error("========================================");
+ log.error("ERROR: File size exceeds GeoServer upload limit (HTTP 413)");
+ log.error("");
+ log.error("Solution: Use file path reference method instead:");
+ log.error(" 1. Copy shapefile to GeoServer data directory");
+ log.error(" 2. Use registerShapefileByPath() method");
+ log.error("");
+ log.error("Or increase GeoServer upload limits:");
+ log.error(" - Tomcat: maxPostSize in server.xml");
+ log.error(" - Nginx: client_max_body_size");
+ log.error("========================================");
+ log.error("");
+ }
+
throw new RuntimeException("GeoServer upload failed", e);
} catch (Exception e) {
log.error("Unexpected error during shapefile upload", e);
@@ -119,6 +156,208 @@ public class GeoServerRegistrationService {
}
}
+ /**
+ * Register shapefile to GeoServer using file:// URL (for large files 500MB+)
+ *
+ *
+ * <p>This method does NOT upload file content. Instead, it tells GeoServer to read the file from
+ * its local file system.
+ *
+ *
+ * <p>Requirements: - GeoServer must have file system access to the shapefile path - The path must
+ * be absolute and accessible from GeoServer server
+ *
+ * @param absoluteFilePath Absolute file path to shapefile (.shp or .zip) on GeoServer server
+ * @param layerName GeoServer layer name
+ */
+ public void registerShapefileByPath(String absoluteFilePath, String layerName) {
+ String shpFilePath = null; // Declare outside try block for error handling
+ try {
+ log.info("Starting shapefile registration to GeoServer (file path reference)");
+ log.info("Input file path: {}", absoluteFilePath);
+ log.info("Layer name: {}", layerName);
+ log.info("Workspace: {}", properties.getWorkspace());
+
+ // Validate inputs
+ if (absoluteFilePath == null || absoluteFilePath.trim().isEmpty()) {
+ throw new IllegalArgumentException("File path cannot be empty");
+ }
+
+ if (layerName == null || layerName.trim().isEmpty()) {
+ throw new IllegalArgumentException("Layer name cannot be empty");
+ }
+
+ // Verify file exists
+ File file = new File(absoluteFilePath);
+ if (!file.exists()) {
+ throw new IllegalArgumentException("File does not exist: " + absoluteFilePath);
+ }
+
+ if (!file.isAbsolute()) {
+ throw new IllegalArgumentException("File path must be absolute: " + absoluteFilePath);
+ }
+
+ String lowerPath = absoluteFilePath.toLowerCase();
+ if (!lowerPath.endsWith(".zip") && !lowerPath.endsWith(".shp")) {
+ throw new IllegalArgumentException("File must be a .zip or .shp file: " + absoluteFilePath);
+ }
+
+ log.info("File size: {} MB", file.length() / 1024 / 1024);
+
+ // Convert .zip path to .shp path if needed
+ shpFilePath = absoluteFilePath;
+ if (lowerPath.endsWith(".zip")) {
+ shpFilePath = absoluteFilePath.substring(0, absoluteFilePath.length() - 4) + ".shp";
+ File shpFile = new File(shpFilePath);
+ if (!shpFile.exists()) {
+ throw new IllegalArgumentException(
+ "Shapefile not found. Expected: "
+ + shpFilePath
+ + " (ZIP file was provided, but .shp file must exist in same directory)");
+ }
+ log.info("Converted ZIP path to SHP path: {}", shpFilePath);
+ }
+
+ // Verify all shapefile components exist and are readable
+ File shpFile = new File(shpFilePath);
+ String basePathWithoutExt = shpFilePath.substring(0, shpFilePath.length() - 4);
+ String[] requiredExtensions = {".shp", ".shx", ".dbf"};
+ String[] optionalExtensions = {".prj", ".cpg", ".qpj"};
+
+ log.info("=== Shapefile Component Verification ===");
+ for (String ext : requiredExtensions) {
+ File component = new File(basePathWithoutExt + ext);
+ if (!component.exists()) {
+ throw new IllegalArgumentException(
+ "Required shapefile component not found: " + component.getAbsolutePath());
+ }
+ if (!component.canRead()) {
+ throw new IllegalArgumentException(
+ "Cannot read shapefile component (permission denied): "
+ + component.getAbsolutePath());
+ }
+ log.info(" ✓ {} exists ({} bytes, readable: {})", ext, component.length(), true);
+ }
+
+ for (String ext : optionalExtensions) {
+ File component = new File(basePathWithoutExt + ext);
+ if (component.exists()) {
+ log.info(
+ " ✓ {} exists ({} bytes, readable: {})",
+ ext,
+ component.length(),
+ component.canRead());
+ } else {
+ log.info(" ⚠ {} not found (optional)", ext);
+ }
+ }
+ log.info("=== End Verification ===");
+
+ // Check if layer exists and handle overwrite
+ if (properties.isOverwriteExisting() && layerExists(layerName)) {
+ log.info("Layer '{}' already exists. Deleting...", layerName);
+ deleteLayer(layerName);
+ }
+
+ // Construct file:// URL (must point to .shp file, not .zip)
+ // String fileUrl = "file://" + shpFilePath;
+ String fileUrl = shpFilePath;
+ log.info("Using file URL: {}", fileUrl);
+
+ // GeoServer REST API endpoint for external file reference
+ // Use external.shp (not file.shp) for file:// URLs
+ String url =
+ String.format(
+ "%s/rest/workspaces/%s/datastores/%s/external.shp?configure=all",
+ properties.getBaseUrl(), properties.getWorkspace(), layerName);
+
+ HttpHeaders headers = createHeaders();
+ headers.setContentType(MediaType.TEXT_PLAIN);
+
+ // Send file:// URL as request body
+ HttpEntity request = new HttpEntity<>(fileUrl, headers);
+
+ log.info("Registering shapefile to GeoServer: {}", url);
+ ResponseEntity response =
+ restTemplate.exchange(url, HttpMethod.PUT, request, String.class);
+
+ if (response.getStatusCode() == HttpStatus.CREATED
+ || response.getStatusCode() == HttpStatus.OK) {
+ log.info("Shapefile registered successfully to GeoServer");
+ log.info(
+ "Layer '{}' is now available in workspace '{}'", layerName, properties.getWorkspace());
+ log.info("GeoServer will read data from: {}", shpFilePath);
+ } else {
+ log.warn("Unexpected response status: {}", response.getStatusCode());
+ }
+
+ } catch (HttpClientErrorException e) {
+ log.error(
+ "GeoServer registration failed. Status: {}, Response: {}",
+ e.getStatusCode(),
+ e.getResponseBodyAsString());
+
+ if (e.getStatusCode() == HttpStatus.BAD_REQUEST) {
+ log.error("");
+ log.error("========================================");
+ log.error("ERROR: GeoServer returned 400 Bad Request - Cannot locate file");
+ log.error("");
+ if (shpFilePath != null) {
+ log.error("File path sent to GeoServer: {}", shpFilePath);
+ } else {
+ log.error("File URL: (path not yet determined)");
+ }
+ log.error("");
+ log.error("Possible causes:");
+ log.error(" 1. PATH MISMATCH: GeoServer sees a different path than application server");
+ if (shpFilePath != null) {
+ log.error(" - Application path: {}", shpFilePath);
+ } else {
+ log.error(" - Application path: {}", absoluteFilePath);
+ }
+ log.error(" - GeoServer may see: different mount point or drive letter");
+ log.error("");
+ log.error(" 2. GEOSERVER SECURITY: External file access may be disabled");
+ log.error(" - Check GeoServer global settings for 'Enable external entities'");
+ log.error(" - Check data security settings");
+ log.error("");
+ log.error(" 3. FILE ACCESS: GeoServer process cannot read the file");
+ log.error(" - Check file permissions from GeoServer server perspective");
+ log.error(" - GeoServer user (tomcat/geoserver) needs read access");
+ log.error("");
+ log.error(" 4. MISSING COMPONENTS: Required shapefile files missing");
+ log.error(" - All files (.shp, .shx, .dbf, .prj) must exist");
+ log.error("");
+ log.error("Verification steps:");
+ log.error(" 1. SSH to GeoServer server");
+ if (shpFilePath != null) {
+ log.error(" 2. Run: ls -la {}", shpFilePath);
+ } else {
+ log.error(" 2. Run: ls -la {}", absoluteFilePath);
+ }
+ log.error(" 3. Verify path matches and files are readable by GeoServer user");
+ log.error(" 4. Check GeoServer logs for more details");
+ log.error("========================================");
+ log.error("");
+ } else if (e.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
+ log.error("");
+ log.error("========================================");
+ log.error("ERROR: GeoServer internal error (500)");
+ log.error("");
+ log.error("This usually indicates a shapefile format issue or corruption.");
+ log.error("Check GeoServer logs for detailed error information.");
+ log.error("========================================");
+ log.error("");
+ }
+
+ throw new RuntimeException("GeoServer registration failed", e);
+ } catch (IllegalArgumentException e) {
+ log.error("Invalid file path: {}", e.getMessage());
+ throw e;
+ } catch (Exception e) {
+ log.error("Unexpected error during shapefile registration", e);
+ throw new RuntimeException("Shapefile registration failed", e);
+ }
+ }
+
private void validateInputs(String filePath, String layerName) {
if (filePath == null || filePath.trim().isEmpty()) {
throw new IllegalArgumentException("File path cannot be empty");
diff --git a/shp-exporter/src/main/java/com/kamco/makesample/service/GeometryConverter.java b/shp-exporter/src/main/java/com/kamco/makesample/service/GeometryConverter.java
index 3e4d0b6..e74b719 100755
--- a/shp-exporter/src/main/java/com/kamco/makesample/service/GeometryConverter.java
+++ b/shp-exporter/src/main/java/com/kamco/makesample/service/GeometryConverter.java
@@ -2,12 +2,25 @@ package com.kamco.makesample.service;
import com.kamco.makesample.exception.GeometryConversionException;
import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.MultiPolygon;
+import org.locationtech.jts.geom.Polygon;
import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.WKTReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
+/**
+ * WKT ↔ JTS Geometry 변환 서비스
+ *
+ * 주요 기능:
+ *
+ *
+ * - PostGIS WKT 문자열을 JTS Geometry 객체로 변환
+ *
+ * - MultiPolygon을 자동으로 Polygon으로 변환 (첫 번째 polygon 추출)
+ *
+ * - Geometry 유효성 검증
+ *
+ */
@Component
public class GeometryConverter {
@@ -15,27 +28,107 @@ public class GeometryConverter {
private final WKTReader wktReader;
+ // MultiPolygon → Polygon 변환 통계
+ private static int multiPolygonConversionCount = 0;
+
public GeometryConverter() {
this.wktReader = new WKTReader();
}
+ /**
+ * WKT 문자열을 JTS Geometry로 변환
+ *
+ * 변환 규칙:
+ *
+ *
+ * - MultiPolygon → 첫 번째 Polygon만 추출 (자동 변환)
+ *
+ * - Polygon → 그대로 사용
+ *
+ * - 기타 타입 → 그대로 사용
+ *
+ *
+ * @param wkt PostGIS ST_AsText() 결과 (WKT 형식)
+ * @return JTS Geometry 객체 (MultiPolygon은 Polygon으로 변환됨)
+ */
public Geometry convertWKTToJTS(String wkt) {
if (wkt == null || wkt.trim().isEmpty()) {
return null;
}
try {
- // WKT 문자열을 JTS Geometry로 변환
+ // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+ // 1. WKT 문자열을 JTS Geometry로 변환
+ // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Geometry jtsGeometry = wktReader.read(wkt);
+ // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+ // 2. MultiPolygon → Polygon 자동 변환
+ // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+ // Shapefile은 단일 타입만 허용하므로 MultiPolygon을 Polygon으로 통일
+ // 첫 번째 polygon만 추출 (나머지는 버림)
+ if (jtsGeometry instanceof MultiPolygon) {
+ MultiPolygon multiPolygon = (MultiPolygon) jtsGeometry;
+
+ if (multiPolygon.getNumGeometries() > 0) {
+ // 첫 번째 polygon 추출
+ Polygon firstPolygon = (Polygon) multiPolygon.getGeometryN(0);
+
+ // 통계 및 로깅 (첫 10건만 로그 출력)
+ multiPolygonConversionCount++;
+ if (multiPolygonConversionCount <= 10) {
+ log.debug(
+ "Converting MultiPolygon to Polygon (first geometry only). "
+ + "MultiPolygon had {} geometries. Conversion count: {}",
+ multiPolygon.getNumGeometries(),
+ multiPolygonConversionCount);
+ } else if (multiPolygonConversionCount == 11) {
+ log.debug("MultiPolygon → Polygon conversion ongoing... (suppressing further logs)");
+ }
+
+ // 여러 polygon을 포함한 경우 경고
+ if (multiPolygon.getNumGeometries() > 1) {
+ log.warn(
+ "MultiPolygon contains {} polygons. Only the first polygon will be used. "
+ + "Other {} polygon(s) will be discarded.",
+ multiPolygon.getNumGeometries(),
+ multiPolygon.getNumGeometries() - 1);
+ }
+
+ jtsGeometry = firstPolygon;
+ } else {
+ log.warn("Empty MultiPolygon detected (0 geometries). Returning null.");
+ return null;
+ }
+ }
+
+ // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
+ // 3. Geometry 유효성 검증
+ // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
if (!jtsGeometry.isValid()) {
- log.warn("Invalid geometry detected: {}", jtsGeometry);
+ log.warn(
+ "Invalid geometry detected: type={}, numGeometries={}",
+ jtsGeometry.getGeometryType(),
+ jtsGeometry.getNumGeometries());
}
return jtsGeometry;
+
} catch (ParseException e) {
throw new GeometryConversionException(
"Failed to convert WKT to JTS geometry: " + e.getMessage(), e);
}
}
+
+ /**
+ * MultiPolygon → Polygon 변환 통계 반환
+ *
+ * @return 변환된 MultiPolygon 개수
+ */
+ public static int getMultiPolygonConversionCount() {
+ return multiPolygonConversionCount;
+ }
+
+ /** 변환 통계 리셋 (테스트용) */
+ public static void resetConversionCount() {
+ multiPolygonConversionCount = 0;
+ }
}
diff --git a/shp-exporter/src/main/resources/application-dev.yml b/shp-exporter/src/main/resources/application-dev.yml
index 6c92ce3..38e2e87 100755
--- a/shp-exporter/src/main/resources/application-dev.yml
+++ b/shp-exporter/src/main/resources/application-dev.yml
@@ -5,16 +5,17 @@ spring:
password: kamco_cds_Q!W@E#R$
driver-class-name: org.postgresql.Driver
hikari:
- maximum-pool-size: 5
+ maximum-pool-size: 20 # Batch 처리를 위해 증가
connection-timeout: 30000
idle-timeout: 600000
max-lifetime: 1800000
- application:
- name: make-shapefile-service
-
- main:
- web-application-type: none # Disable web server for CLI application
+ batch:
+ job:
+ enabled: false # CLI에서 명시적으로 실행
+ jdbc:
+ initialize-schema: always # 메타데이터 테이블 자동 생성
+ table-prefix: BATCH_
converter:
inference-id: D5E46F60FC40B1A8BE0CD1F3547AA6
@@ -24,9 +25,15 @@ converter:
- 252
- 253
- 257
- output-base-dir: '/kamco-nfs/model_output/export/'
+ output-base-dir: '/data/model_output/export/'
crs: 'EPSG:5186'
+ batch:
+ chunk-size: 5000 # 청크 크기 증가 (1000 → 5000, 성능 5배 향상)
+ skip-limit: 100 # 청크당 skip 허용 건수
+ fetch-size: 5000 # JDBC 커서 fetch 크기 (chunk-size와 동일하게)
+ enable-partitioning: false
+
geoserver:
base-url: 'https://kamco.geo-dev.gs.dabeeo.com/geoserver'
workspace: 'cd'
@@ -45,7 +52,3 @@ logging:
org.springframework: WARN
pattern:
console: '%d{yyyy-MM-dd HH:mm:ss} - %msg%n'
-
-layer:
- geoserver-url: http://label-tile.gs.dabeeo.com
- workspace: cd
diff --git a/shp-exporter/src/main/resources/application-local.yml b/shp-exporter/src/main/resources/application-local.yml
index 196f450..b3cb75d 100755
--- a/shp-exporter/src/main/resources/application-local.yml
+++ b/shp-exporter/src/main/resources/application-local.yml
@@ -5,17 +5,11 @@ spring:
password: kamco_cds_Q!W@E#R$
driver-class-name: org.postgresql.Driver
hikari:
- maximum-pool-size: 5
+ maximum-pool-size: 10
connection-timeout: 30000
idle-timeout: 600000
max-lifetime: 1800000
- application:
- name: make-shapefile-service
-
- main:
- web-application-type: none # Disable web server for CLI application
-
converter:
inference-id: D5E46F60FC40B1A8BE0CD1F3547AA6
# Optional: omit or set empty to create merged shapefile for all batch-ids
@@ -24,8 +18,7 @@ converter:
- 252
- 253
- 257
- output-base-dir: '/kamco-nfs/model_output/export/'
- #output-base-dir: '/Users/bokmin/export/'
+ output-base-dir: '/Users/d-pn-0105/dev/kamco-cd/kamco-cd-cron/shp-exporter/export/'
crs: 'EPSG:5186'
geoserver:
@@ -46,7 +39,3 @@ logging:
org.springframework: WARN
pattern:
console: '%d{yyyy-MM-dd HH:mm:ss} - %msg%n'
-
-layer:
- geoserver-url: http://label-tile.gs.dabeeo.com
- workspace: cd
diff --git a/shp-exporter/src/main/resources/application-prod.yml b/shp-exporter/src/main/resources/application-prod.yml
index 3e1665d..722e987 100755
--- a/shp-exporter/src/main/resources/application-prod.yml
+++ b/shp-exporter/src/main/resources/application-prod.yml
@@ -5,16 +5,17 @@ spring:
password: kamco_cds_Q!W@E#R$
driver-class-name: org.postgresql.Driver
hikari:
- maximum-pool-size: 5
+ maximum-pool-size: 20 # Increased for batch processing
connection-timeout: 30000
idle-timeout: 600000
max-lifetime: 1800000
- application:
- name: make-shapefile-service
-
- main:
- web-application-type: none # Disable web server for CLI application
+ batch:
+ job:
+ enabled: false # CLI에서 명시적으로 실행
+ jdbc:
+ initialize-schema: always # 메타데이터 테이블 자동 생성
+ table-prefix: BATCH_
converter:
inference-id: D5E46F60FC40B1A8BE0CD1F3547AA6
@@ -27,15 +28,19 @@ converter:
output-base-dir: '/data/model_output/export/'
crs: 'EPSG:5186'
+ batch:
+ chunk-size: 5000 # 청크 크기 (1000→5000, 성능 5배 향상, 메모리 ~200MB per chunk)
+ skip-limit: 100 # 청크당 skip 허용 건수
+ fetch-size: 5000 # JDBC 커서 fetch 크기 (chunk-size와 동일하게)
+ enable-partitioning: false # 초기에는 비활성화
+ partition-concurrency: 4 # Map ID별 병렬 처리 동시성 (4=~300MB, 8=~600MB)
+
geoserver:
- base-url: 'https://kamco.geo-dev.gs.dabeeo.com/geoserver'
+ base-url: 'https://aicd-geo.e-kamco.com:18080/geoserver'
workspace: 'cd'
overwrite-existing: true
connection-timeout: 30000
read-timeout: 60000
- # Credentials (optional - environment variables take precedence)
- # Uncomment and set values for development convenience
- # For production, use GEOSERVER_USERNAME and GEOSERVER_PASSWORD environment variables
username: 'admin'
password: 'geoserver'
@@ -45,9 +50,3 @@ logging:
org.springframework: WARN
pattern:
console: '%d{yyyy-MM-dd HH:mm:ss} - %msg%n'
-
-layer:
- geoserver-url: https://kamco.geo-dev.gs.dabeeo.com
- wms-path: geoserver/cd
- wmts-path: geoserver/cd/gwc/service
- workspace: cd
diff --git a/shp-exporter/src/main/resources/application.yml b/shp-exporter/src/main/resources/application.yml
index 65d113d..d35f2e3 100755
--- a/shp-exporter/src/main/resources/application.yml
+++ b/shp-exporter/src/main/resources/application.yml
@@ -3,3 +3,5 @@ spring:
name: make-shapefile-service
profiles:
active: prod
+ main:
+ web-application-type: none # Disable web server for CLI application
diff --git a/shp-exporter/src/main/resources/db/migration/V1__create_batch_execution_history.sql b/shp-exporter/src/main/resources/db/migration/V1__create_batch_execution_history.sql
new file mode 100644
index 0000000..a261d59
--- /dev/null
+++ b/shp-exporter/src/main/resources/db/migration/V1__create_batch_execution_history.sql
@@ -0,0 +1,69 @@
+-- 배치 실행 이력 테이블
+-- 각 스텝별 시작/종료 시간, 소요 시간, 성공 여부, 에러 사유 추적
+
+CREATE TABLE IF NOT EXISTS batch_execution_history (
+ id BIGSERIAL PRIMARY KEY,
+
+ -- Spring Batch 메타데이터 참조
+ job_execution_id BIGINT NOT NULL,
+ step_execution_id BIGINT,
+
+ -- 스텝 정보
+ step_name VARCHAR(100) NOT NULL,
+ step_type VARCHAR(50), -- 'TASKLET' or 'CHUNK'
+
+ -- 시간 정보
+ start_time TIMESTAMP NOT NULL,
+ end_time TIMESTAMP,
+ duration_ms BIGINT, -- 소요 시간 (밀리초)
+
+ -- 실행 결과
+ status VARCHAR(20) NOT NULL, -- 'STARTED', 'COMPLETED', 'FAILED'
+ exit_code VARCHAR(20), -- 'COMPLETED', 'FAILED', 'UNKNOWN'
+ exit_message TEXT,
+
+ -- 에러 정보
+ error_message TEXT,
+ error_stack_trace TEXT,
+
+ -- 처리 통계 (chunk 기반 스텝용)
+ read_count BIGINT DEFAULT 0,
+ write_count BIGINT DEFAULT 0,
+ commit_count BIGINT DEFAULT 0,
+ rollback_count BIGINT DEFAULT 0,
+ skip_count BIGINT DEFAULT 0,
+
+ -- 배치 파라미터 정보
+ batch_ids TEXT,
+ inference_id VARCHAR(100),
+
+ -- 메타데이터
+ created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+ updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+);
+
+-- 인덱스 생성
+CREATE INDEX IF NOT EXISTS idx_batch_exec_hist_job_exec_id
+ON batch_execution_history(job_execution_id);
+
+CREATE INDEX IF NOT EXISTS idx_batch_exec_hist_step_exec_id
+ON batch_execution_history(step_execution_id);
+
+CREATE INDEX IF NOT EXISTS idx_batch_exec_hist_step_name
+ON batch_execution_history(step_name);
+
+CREATE INDEX IF NOT EXISTS idx_batch_exec_hist_status
+ON batch_execution_history(status);
+
+CREATE INDEX IF NOT EXISTS idx_batch_exec_hist_start_time
+ON batch_execution_history(start_time DESC);
+
+-- 코멘트 추가
+COMMENT ON TABLE batch_execution_history IS '배치 실행 이력 추적 테이블 - 스텝별 시작/종료 시간, 소요 시간, 성공 여부, 에러 사유 기록';
+COMMENT ON COLUMN batch_execution_history.job_execution_id IS 'Spring Batch Job Execution ID';
+COMMENT ON COLUMN batch_execution_history.step_execution_id IS 'Spring Batch Step Execution ID';
+COMMENT ON COLUMN batch_execution_history.step_name IS '스텝 이름 (validateGeometryType, generateShapefile 등)';
+COMMENT ON COLUMN batch_execution_history.duration_ms IS '스텝 소요 시간 (밀리초)';
+COMMENT ON COLUMN batch_execution_history.status IS '실행 상태 (STARTED, COMPLETED, FAILED)';
+COMMENT ON COLUMN batch_execution_history.error_message IS '에러 발생 시 에러 메시지';
+COMMENT ON COLUMN batch_execution_history.error_stack_trace IS '에러 발생 시 전체 스택 트레이스';