스케줄러로 변경

This commit is contained in:
2026-03-08 21:33:41 +09:00
parent a6bb589189
commit b156b61caf
49 changed files with 3572 additions and 126 deletions

View File

@@ -0,0 +1,72 @@
package com.kamco.makesample.batch.config;
import com.kamco.makesample.config.ConverterProperties;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
 * Spring Batch base configuration.
 *
 * <p>Spring Boot auto-configuration supplies the following automatically: JobRepository (backed
 * by the BATCH_* metadata tables), JobLauncher (for launching jobs from the CLI), and
 * PlatformTransactionManager (transaction management over the DataSource). Additional beans can
 * be declared here when needed.
 *
 * <p>Metadata tables: BATCH_JOB_INSTANCE, BATCH_JOB_EXECUTION, BATCH_JOB_EXECUTION_PARAMS,
 * BATCH_STEP_EXECUTION, BATCH_STEP_EXECUTION_CONTEXT, BATCH_JOB_EXECUTION_CONTEXT.
 */
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

  private final ConverterProperties properties;

  public BatchConfiguration(ConverterProperties properties) {
    this.properties = properties;
  }

  /**
   * TaskExecutor used to generate the per-map-id output files.
   *
   * <p>Each map_id's shapefile/geojson pair is produced concurrently on this pool.
   *
   * <p>Settings:
   *
   * <ul>
   *   <li>corePoolSize: the partition-concurrency property (default 4)
   *   <li>maxPoolSize: corePoolSize * 2 (allows temporary expansion)
   *   <li>queueCapacity: 50 (pending task queue)
   *   <li>threadNamePrefix: mapid-worker- (for log tracing)
   * </ul>
   *
   * @return TaskExecutor for the partitioned step
   */
  @Bean
  public TaskExecutor mapIdTaskExecutor() {
    final int poolSize = properties.getBatch().getPartitionConcurrency();

    final ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(poolSize);
    taskExecutor.setMaxPoolSize(poolSize * 2);
    taskExecutor.setQueueCapacity(50);
    taskExecutor.setThreadNamePrefix("mapid-worker-");
    // Drain in-flight partition work (up to 60s) before the application context shuts down.
    taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
    taskExecutor.setAwaitTerminationSeconds(60);
    taskExecutor.initialize();
    return taskExecutor;
  }
}

View File

@@ -0,0 +1,289 @@
package com.kamco.makesample.batch.config;
import com.kamco.makesample.batch.listener.BatchExecutionHistoryListener;
import com.kamco.makesample.batch.partitioner.MapIdPartitioner;
import com.kamco.makesample.batch.processor.FeatureConversionProcessor;
import com.kamco.makesample.batch.tasklet.CreateZipTasklet;
import com.kamco.makesample.batch.tasklet.GeoServerRegistrationTasklet;
import com.kamco.makesample.batch.tasklet.GeometryTypeValidationTasklet;
import com.kamco.makesample.batch.writer.MapIdGeoJsonWriter;
import com.kamco.makesample.batch.writer.MapIdShapefileWriter;
import com.kamco.makesample.batch.writer.StreamingGeoJsonWriter;
import com.kamco.makesample.batch.writer.StreamingShapefileWriter;
import com.kamco.makesample.model.InferenceResult;
import java.util.Arrays;
import org.geotools.api.feature.simple.SimpleFeature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
/**
 * MERGED mode job configuration.
 *
 * <p>Generates a single shapefile and GeoJSON covering every requested batch_id, then registers
 * only the shapefile in the merge folder with GeoServer.
 *
 * <p>Job flow:
 *
 * <ol>
 *   <li>validateGeometryTypeStep: up-front geometry type validation (tasklet)
 *   <li>generateShapefileStep: shapefile generation (chunk-oriented)
 *   <li>generateGeoJsonStep: GeoJSON generation (chunk-oriented)
 *   <li>createZipStep: ZIP archive creation (tasklet)
 *   <li>registerToGeoServerStep: GeoServer registration - merge-folder shapefile only (tasklet,
 *       conditional)
 *   <li>generateMapIdFilesStep: per-map_id shapefile/geojson generation (partitioned, parallel)
 * </ol>
 */
@Configuration
public class MergedModeJobConfig {
  // NOTE: a private static SLF4J logger used to be declared here but was never referenced;
  // it has been removed as dead code.

  /**
   * MERGED mode job definition.
   *
   * @param jobRepository JobRepository
   * @param validateGeometryTypeStep geometry type validation step
   * @param generateShapefileStep shapefile generation step
   * @param generateGeoJsonStep GeoJSON generation step
   * @param createZipStep ZIP creation step
   * @param registerToGeoServerStep GeoServer registration step (merge-folder shapefile only)
   * @param generateMapIdFilesStep per-map_id file generation step (parallel)
   * @return Job
   */
  @Bean
  public Job mergedModeJob(
      JobRepository jobRepository,
      Step validateGeometryTypeStep,
      Step generateShapefileStep,
      Step generateGeoJsonStep,
      Step createZipStep,
      Step registerToGeoServerStep,
      Step generateMapIdFilesStep) {
    return new JobBuilder("mergedModeJob", jobRepository)
        .start(validateGeometryTypeStep)
        .next(generateShapefileStep)
        .next(generateGeoJsonStep)
        .next(createZipStep)
        .next(registerToGeoServerStep) // Conditional execution
        .next(generateMapIdFilesStep) // per-map_id individual file generation
        .build();
  }

  /**
   * Step 1: geometry type validation.
   *
   * <p>Shapefiles require a homogeneous geometry type, so this is validated up front.
   *
   * @param jobRepository JobRepository
   * @param transactionManager TransactionManager
   * @param validationTasklet GeometryTypeValidationTasklet
   * @param historyListener BatchExecutionHistoryListener
   * @return Step
   */
  @Bean
  public Step validateGeometryTypeStep(
      JobRepository jobRepository,
      PlatformTransactionManager transactionManager,
      GeometryTypeValidationTasklet validationTasklet,
      BatchExecutionHistoryListener historyListener) {
    return new StepBuilder("validateGeometryTypeStep", jobRepository)
        .tasklet(validationTasklet, transactionManager)
        .listener(historyListener)
        .build();
  }

  /**
   * Step 2: shapefile generation (chunk-oriented).
   *
   * <p>Memory optimization:
   *
   * <ul>
   *   <li>Reader: JdbcCursorItemReader (streaming)
   *   <li>Processor: InferenceResult to SimpleFeature conversion
   *   <li>Writer: StreamingShapefileWriter (writes one chunk at a time)
   * </ul>
   *
   * @param jobRepository JobRepository
   * @param transactionManager TransactionManager
   * @param shapefileReader ItemReader (for the shapefile)
   * @param featureConversionProcessor ItemProcessor
   * @param shapefileWriter ItemWriter
   * @param chunkSize chunk size (default: 1000)
   * @param historyListener BatchExecutionHistoryListener
   * @return Step
   */
  @Bean
  public Step generateShapefileStep(
      JobRepository jobRepository,
      PlatformTransactionManager transactionManager,
      JdbcCursorItemReader<InferenceResult> shapefileReader,
      FeatureConversionProcessor featureConversionProcessor,
      StreamingShapefileWriter shapefileWriter,
      @Value("${converter.batch.chunk-size:1000}") int chunkSize,
      BatchExecutionHistoryListener historyListener) {
    return new StepBuilder("generateShapefileStep", jobRepository)
        .<InferenceResult, SimpleFeature>chunk(chunkSize, transactionManager)
        .reader(shapefileReader)
        .processor(featureConversionProcessor)
        .writer(shapefileWriter)
        .listener(historyListener)
        .build();
  }

  /**
   * Step 3: GeoJSON generation (chunk-oriented).
   *
   * <p>Writes the same data as the shapefile step, in GeoJSON format.
   *
   * @param jobRepository JobRepository
   * @param transactionManager TransactionManager
   * @param geoJsonReader ItemReader (separate instance for GeoJSON)
   * @param featureConversionProcessor ItemProcessor (reused)
   * @param geoJsonWriter ItemWriter
   * @param chunkSize chunk size
   * @param historyListener BatchExecutionHistoryListener
   * @return Step
   */
  @Bean
  public Step generateGeoJsonStep(
      JobRepository jobRepository,
      PlatformTransactionManager transactionManager,
      JdbcCursorItemReader<InferenceResult> geoJsonReader,
      FeatureConversionProcessor featureConversionProcessor,
      StreamingGeoJsonWriter geoJsonWriter,
      @Value("${converter.batch.chunk-size:1000}") int chunkSize,
      BatchExecutionHistoryListener historyListener) {
    return new StepBuilder("generateGeoJsonStep", jobRepository)
        .<InferenceResult, SimpleFeature>chunk(chunkSize, transactionManager)
        .reader(geoJsonReader)
        .processor(featureConversionProcessor)
        .writer(geoJsonWriter)
        .listener(historyListener)
        .build();
  }

  /**
   * Step 4: ZIP archive creation.
   *
   * @param jobRepository JobRepository
   * @param transactionManager TransactionManager
   * @param createZipTasklet CreateZipTasklet
   * @param historyListener BatchExecutionHistoryListener
   * @return Step
   */
  @Bean
  public Step createZipStep(
      JobRepository jobRepository,
      PlatformTransactionManager transactionManager,
      CreateZipTasklet createZipTasklet,
      BatchExecutionHistoryListener historyListener) {
    return new StepBuilder("createZipStep", jobRepository)
        .tasklet(createZipTasklet, transactionManager)
        .listener(historyListener)
        .build();
  }

  /**
   * Step 5: GeoServer registration (merge-folder shapefile only).
   *
   * <p>Conditional execution: only runs when geoserver.enabled=true.
   *
   * @param jobRepository JobRepository
   * @param transactionManager TransactionManager
   * @param registrationTasklet GeoServerRegistrationTasklet
   * @param historyListener BatchExecutionHistoryListener
   * @return Step
   */
  @Bean
  public Step registerToGeoServerStep(
      JobRepository jobRepository,
      PlatformTransactionManager transactionManager,
      GeoServerRegistrationTasklet registrationTasklet,
      BatchExecutionHistoryListener historyListener) {
    return new StepBuilder("registerToGeoServerStep", jobRepository)
        .tasklet(registrationTasklet, transactionManager)
        .listener(historyListener)
        .build();
  }

  /**
   * Step 6: per-map_id file generation (partitioned step).
   *
   * <p>Generates an individual shapefile and geojson for each map_id, in parallel.
   *
   * @param jobRepository JobRepository
   * @param partitioner MapIdPartitioner
   * @param mapIdWorkerStep worker step executed in each partition
   * @param mapIdTaskExecutor TaskExecutor for parallel execution
   * @return partitioned Step
   */
  @Bean
  public Step generateMapIdFilesStep(
      JobRepository jobRepository,
      MapIdPartitioner partitioner,
      Step mapIdWorkerStep,
      TaskExecutor mapIdTaskExecutor) {
    return new StepBuilder("generateMapIdFilesStep", jobRepository)
        .partitioner("mapIdWorker", partitioner)
        .step(mapIdWorkerStep)
        .taskExecutor(mapIdTaskExecutor)
        .build();
  }

  /**
   * Worker step: per-map_id file generation.
   *
   * <p>Runs in each partition; reads the rows of its map_id and generates the shapefile and the
   * geojson at the same time.
   *
   * @param jobRepository JobRepository
   * @param transactionManager TransactionManager
   * @param mapIdModeReader ItemReader (per map_id)
   * @param featureConversionProcessor ItemProcessor
   * @param mapIdShapefileWriter shapefile writer
   * @param mapIdGeoJsonWriter GeoJSON writer
   * @param chunkSize chunk size
   * @param historyListener BatchExecutionHistoryListener
   * @return worker Step
   */
  @Bean
  public Step mapIdWorkerStep(
      JobRepository jobRepository,
      PlatformTransactionManager transactionManager,
      JdbcCursorItemReader<InferenceResult> mapIdModeReader,
      FeatureConversionProcessor featureConversionProcessor,
      MapIdShapefileWriter mapIdShapefileWriter,
      MapIdGeoJsonWriter mapIdGeoJsonWriter,
      @Value("${converter.batch.chunk-size:1000}") int chunkSize,
      BatchExecutionHistoryListener historyListener) {
    // CompositeItemWriter produces the shapefile and the geojson in the same chunk pass.
    CompositeItemWriter<SimpleFeature> compositeWriter = new CompositeItemWriter<>();
    compositeWriter.setDelegates(Arrays.asList(mapIdShapefileWriter, mapIdGeoJsonWriter));
    return new StepBuilder("mapIdWorkerStep", jobRepository)
        .<InferenceResult, SimpleFeature>chunk(chunkSize, transactionManager)
        .reader(mapIdModeReader)
        .processor(featureConversionProcessor)
        .writer(compositeWriter)
        .listener(historyListener)
        .build();
  }
}

View File

@@ -0,0 +1,183 @@
package com.kamco.makesample.batch.listener;
import com.kamco.makesample.batch.model.BatchExecutionHistory;
import com.kamco.makesample.batch.repository.BatchExecutionHistoryRepository;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.time.LocalDateTime;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.stereotype.Component;
/**
 * Listener that tracks batch step execution history.
 *
 * <p>Persists an execution-history row to the database when each step starts, and updates it when
 * the step finishes.
 *
 * <p>Recorded data:
 *
 * <ul>
 *   <li>start time, end time, duration
 *   <li>success/failure status
 *   <li>error message and stack trace when a failure occurred
 *   <li>processing statistics (read/write/commit/rollback/skip counts)
 * </ul>
 */
@Component
public class BatchExecutionHistoryListener implements StepExecutionListener {
  private static final Logger log = LoggerFactory.getLogger(BatchExecutionHistoryListener.class);
  private final BatchExecutionHistoryRepository historyRepository;
  // Holds the history row id per executing thread (partitioned steps run on pooled threads).
  private final ThreadLocal<Long> historyIdHolder = new ThreadLocal<>();

  public BatchExecutionHistoryListener(BatchExecutionHistoryRepository historyRepository) {
    this.historyRepository = historyRepository;
  }

  @Override
  public void beforeStep(StepExecution stepExecution) {
    // Defensively clear any id left over from a previous step executed by this pooled thread,
    // so a failed insert below cannot cause afterStep to update a stale row.
    historyIdHolder.remove();
    try {
      // Create the execution-history row for this step.
      BatchExecutionHistory history = new BatchExecutionHistory();
      history.setJobExecutionId(stepExecution.getJobExecutionId());
      history.setStepExecutionId(stepExecution.getId());
      history.setStepName(stepExecution.getStepName());
      history.setStartTime(LocalDateTime.now());
      history.setStatus("STARTED");
      // Extract batch_ids and inference_id from the job parameters.
      String batchIds = stepExecution.getJobParameters().getString("batchIds");
      String inferenceId = stepExecution.getJobParameters().getString("inferenceId");
      history.setBatchIds(batchIds);
      history.setInferenceId(inferenceId);
      // Guess the step type (tasklet vs chunk) from the step name.
      String stepType = estimateStepType(stepExecution.getStepName());
      history.setStepType(stepType);
      // Persist and remember the generated id for afterStep.
      Long historyId = historyRepository.insert(history);
      historyIdHolder.set(historyId);
      log.debug(
          "Step execution history created: id={}, step={}, jobExecutionId={}",
          historyId,
          stepExecution.getStepName(),
          stepExecution.getJobExecutionId());
    } catch (Exception e) {
      // Best-effort: history tracking must never fail the batch itself.
      log.error("Failed to save step execution history on start: {}", e.getMessage(), e);
    }
  }

  @Override
  public ExitStatus afterStep(StepExecution stepExecution) {
    try {
      Long historyId = historyIdHolder.get();
      if (historyId == null) {
        log.warn("No history ID found for step: {}", stepExecution.getStepName());
        return stepExecution.getExitStatus();
      }
      // End time and status.
      LocalDateTime endTime = LocalDateTime.now();
      LocalDateTime startTime =
          stepExecution.getStartTime() != null ? stepExecution.getStartTime() : LocalDateTime.now();
      String status = stepExecution.getStatus().toString();
      String exitCode = stepExecution.getExitStatus().getExitCode();
      String exitMessage = stepExecution.getExitStatus().getExitDescription();
      // Error information (first failure only).
      String errorMessage = null;
      String errorStackTrace = null;
      List<Throwable> failureExceptions = stepExecution.getFailureExceptions();
      if (!failureExceptions.isEmpty()) {
        Throwable firstException = failureExceptions.get(0);
        errorMessage = firstException.getMessage();
        errorStackTrace = getStackTrace(firstException);
      }
      // Processing statistics (meaningful for chunk-oriented steps).
      Long readCount = (long) stepExecution.getReadCount();
      Long writeCount = (long) stepExecution.getWriteCount();
      Long commitCount = (long) stepExecution.getCommitCount();
      Long rollbackCount = (long) stepExecution.getRollbackCount();
      Long skipCount = (long) stepExecution.getSkipCount();
      // Update the history row.
      historyRepository.updateOnCompletion(
          historyId,
          endTime,
          startTime,
          status,
          exitCode,
          exitMessage,
          errorMessage,
          errorStackTrace,
          readCount,
          writeCount,
          commitCount,
          rollbackCount,
          skipCount);
      log.debug(
          "Step execution history updated: id={}, step={}, status={}, duration={}ms",
          historyId,
          stepExecution.getStepName(),
          status,
          java.time.Duration.between(startTime, endTime).toMillis());
    } catch (Exception e) {
      log.error("Failed to update step execution history on completion: {}", e.getMessage(), e);
    } finally {
      // Always clear the ThreadLocal, even when the update fails, so the id cannot leak onto
      // the next step executed by this pooled thread. (Previously this only ran on success.)
      historyIdHolder.remove();
    }
    return stepExecution.getExitStatus();
  }

  /**
   * Guesses the step type from the step name.
   *
   * @param stepName step name
   * @return TASKLET, CHUNK, or UNKNOWN
   */
  private String estimateStepType(String stepName) {
    // Tasklet steps.
    if (stepName.contains("validate")
        || stepName.contains("Zip")
        || stepName.contains("GeoServer")) {
      return "TASKLET";
    }
    // Chunk-oriented steps.
    if (stepName.contains("generate")
        || stepName.contains("Shapefile")
        || stepName.contains("GeoJson")) {
      return "CHUNK";
    }
    return "UNKNOWN";
  }

  /**
   * Renders an exception's stack trace as a string.
   *
   * @param throwable the exception
   * @return the full stack trace text
   */
  private String getStackTrace(Throwable throwable) {
    StringWriter sw = new StringWriter();
    PrintWriter pw = new PrintWriter(sw);
    throwable.printStackTrace(pw);
    pw.flush();
    return sw.toString();
  }
}

View File

@@ -0,0 +1,212 @@
package com.kamco.makesample.batch.model;
import java.time.LocalDateTime;
/**
 * Batch execution history entity.
 *
 * <p>Tracks, per step, the start/end times, duration, success status and, on failure, the error
 * cause. Plain mutable bean mapped by the history repository.
 */
public class BatchExecutionHistory {
  // --- identity ---
  private Long id;
  private Long jobExecutionId;
  private Long stepExecutionId;
  private String stepName;
  private String stepType;
  // --- timing ---
  private LocalDateTime startTime;
  private LocalDateTime endTime;
  private Long durationMs;
  // --- outcome ---
  private String status;
  private String exitCode;
  private String exitMessage;
  private String errorMessage;
  private String errorStackTrace;
  // --- chunk statistics ---
  private Long readCount;
  private Long writeCount;
  private Long commitCount;
  private Long rollbackCount;
  private Long skipCount;
  // --- job parameters ---
  private String batchIds;
  private String inferenceId;
  // --- audit ---
  private LocalDateTime createdAt;
  private LocalDateTime updatedAt;

  // Accessors (setter first, then getter, per property).

  public void setId(Long id) {
    this.id = id;
  }

  public Long getId() {
    return id;
  }

  public void setJobExecutionId(Long jobExecutionId) {
    this.jobExecutionId = jobExecutionId;
  }

  public Long getJobExecutionId() {
    return jobExecutionId;
  }

  public void setStepExecutionId(Long stepExecutionId) {
    this.stepExecutionId = stepExecutionId;
  }

  public Long getStepExecutionId() {
    return stepExecutionId;
  }

  public void setStepName(String stepName) {
    this.stepName = stepName;
  }

  public String getStepName() {
    return stepName;
  }

  public void setStepType(String stepType) {
    this.stepType = stepType;
  }

  public String getStepType() {
    return stepType;
  }

  public void setStartTime(LocalDateTime startTime) {
    this.startTime = startTime;
  }

  public LocalDateTime getStartTime() {
    return startTime;
  }

  public void setEndTime(LocalDateTime endTime) {
    this.endTime = endTime;
  }

  public LocalDateTime getEndTime() {
    return endTime;
  }

  public void setDurationMs(Long durationMs) {
    this.durationMs = durationMs;
  }

  public Long getDurationMs() {
    return durationMs;
  }

  public void setStatus(String status) {
    this.status = status;
  }

  public String getStatus() {
    return status;
  }

  public void setExitCode(String exitCode) {
    this.exitCode = exitCode;
  }

  public String getExitCode() {
    return exitCode;
  }

  public void setExitMessage(String exitMessage) {
    this.exitMessage = exitMessage;
  }

  public String getExitMessage() {
    return exitMessage;
  }

  public void setErrorMessage(String errorMessage) {
    this.errorMessage = errorMessage;
  }

  public String getErrorMessage() {
    return errorMessage;
  }

  public void setErrorStackTrace(String errorStackTrace) {
    this.errorStackTrace = errorStackTrace;
  }

  public String getErrorStackTrace() {
    return errorStackTrace;
  }

  public void setReadCount(Long readCount) {
    this.readCount = readCount;
  }

  public Long getReadCount() {
    return readCount;
  }

  public void setWriteCount(Long writeCount) {
    this.writeCount = writeCount;
  }

  public Long getWriteCount() {
    return writeCount;
  }

  public void setCommitCount(Long commitCount) {
    this.commitCount = commitCount;
  }

  public Long getCommitCount() {
    return commitCount;
  }

  public void setRollbackCount(Long rollbackCount) {
    this.rollbackCount = rollbackCount;
  }

  public Long getRollbackCount() {
    return rollbackCount;
  }

  public void setSkipCount(Long skipCount) {
    this.skipCount = skipCount;
  }

  public Long getSkipCount() {
    return skipCount;
  }

  public void setBatchIds(String batchIds) {
    this.batchIds = batchIds;
  }

  public String getBatchIds() {
    return batchIds;
  }

  public void setInferenceId(String inferenceId) {
    this.inferenceId = inferenceId;
  }

  public String getInferenceId() {
    return inferenceId;
  }

  public void setCreatedAt(LocalDateTime createdAt) {
    this.createdAt = createdAt;
  }

  public LocalDateTime getCreatedAt() {
    return createdAt;
  }

  public void setUpdatedAt(LocalDateTime updatedAt) {
    this.updatedAt = updatedAt;
  }

  public LocalDateTime getUpdatedAt() {
    return updatedAt;
  }
}

View File

@@ -0,0 +1,100 @@
package com.kamco.makesample.batch.partitioner;
import com.kamco.makesample.config.ConverterProperties;
import com.kamco.makesample.repository.InferenceResultRepository;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.stereotype.Component;
/**
 * Creates one partition per map_id.
 *
 * <p>Looks up the distinct map_id values for the configured batch_ids and builds an
 * ExecutionContext for each of them. Every partition then generates its own shapefile and
 * geojson independently.
 */
@Component
public class MapIdPartitioner implements Partitioner {
  private static final Logger log = LoggerFactory.getLogger(MapIdPartitioner.class);
  private final InferenceResultRepository repository;
  private final ConverterProperties properties;

  public MapIdPartitioner(InferenceResultRepository repository, ConverterProperties properties) {
    this.repository = repository;
    this.properties = properties;
  }

  @Override
  public Map<String, ExecutionContext> partition(int gridSize) {
    final List<Long> batchIds = properties.getBatchIds();
    final String inferenceId = properties.getInferenceId();
    final String baseDir = properties.getOutputBaseDir();
    log.info("Creating partitions for batch_ids: {}, inference_id: {}", batchIds, inferenceId);

    // Distinct map_ids for the requested batches; one partition is created for each.
    final List<String> mapIds = repository.findMapIdByBatchIds(batchIds);
    if (mapIds.isEmpty()) {
      log.warn("No map_ids found for batch_ids: {}", batchIds);
      return new HashMap<>();
    }
    log.info("Found {} map_ids to partition: {}", mapIds.size(), mapIds);

    final Map<String, ExecutionContext> partitions = new HashMap<>();
    for (final String mapId : mapIds) {
      final ExecutionContext ctx = new ExecutionContext();
      // Per-partition parameters consumed by the worker step's reader and writers.
      ctx.putString("mapId", mapId);
      ctx.putString("outputPath", outputFile(baseDir, inferenceId, mapId, ".shp"));
      ctx.putString("geoJsonOutputPath", outputFile(baseDir, inferenceId, mapId, ".geojson"));
      partitions.put("partition-" + mapId, ctx);
      log.debug(
          "Created partition for map_id: {}, shapefile: {}, geojson: {}",
          mapId,
          ctx.getString("outputPath"),
          ctx.getString("geoJsonOutputPath"));
    }
    log.info("Created {} partitions for parallel processing", partitions.size());
    return partitions;
  }

  /**
   * Builds the output path {@code <baseDir>/<inferenceId>/<mapId>/<mapId><extension>}.
   *
   * @param baseDir base output directory
   * @param inferenceId inference id
   * @param mapId map id
   * @param extension file extension, including the leading dot
   * @return output file path as a string
   */
  private String outputFile(String baseDir, String inferenceId, String mapId, String extension) {
    return Paths.get(baseDir, inferenceId, mapId, mapId + extension).toString();
  }
}

View File

@@ -0,0 +1,124 @@
package com.kamco.makesample.batch.processor;
import com.kamco.makesample.batch.util.FeatureTypeFactory;
import com.kamco.makesample.model.InferenceResult;
import org.geotools.api.feature.simple.SimpleFeature;
import org.geotools.api.feature.simple.SimpleFeatureType;
import org.geotools.api.referencing.FactoryException;
import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
import org.geotools.feature.simple.SimpleFeatureBuilder;
import org.geotools.referencing.CRS;
import org.locationtech.jts.geom.Geometry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Processor that converts an InferenceResult into a SimpleFeature.
 *
 * <p>Extracted from the former ShapefileWriter.buildFeature logic.
 *
 * <p>Responsibilities:
 *
 * <ul>
 *   <li>geometry validation (null check, isValid check)
 *   <li>mapping InferenceResult fields to SimpleFeature attributes
 *   <li>skipping invalid geometries (by returning null)
 * </ul>
 */
@Component
@StepScope
public class FeatureConversionProcessor implements ItemProcessor<InferenceResult, SimpleFeature> {
  private static final Logger log = LoggerFactory.getLogger(FeatureConversionProcessor.class);
  private final FeatureTypeFactory featureTypeFactory;

  // Job parameter 'crs', defaulting to Korea 2000 / Central Belt.
  @Value("#{jobParameters['crs'] ?: 'EPSG:5186'}")
  private String crsCode;

  // Initialized once per step in beforeStep(); not thread-safe, but the bean is @StepScope so
  // each (partitioned) step execution gets its own instance.
  private SimpleFeatureBuilder featureBuilder;
  private SimpleFeatureType featureType;

  public FeatureConversionProcessor(FeatureTypeFactory featureTypeFactory) {
    this.featureTypeFactory = featureTypeFactory;
  }

  @BeforeStep
  public void beforeStep(StepExecution stepExecution) {
    // The geometry type is determined by the earlier validation step and published through the
    // step execution context.
    String geomTypeStr = stepExecution.getExecutionContext().getString("geometryType");
    Class<?> geometryType = featureTypeFactory.parseGeometryType(geomTypeStr);
    try {
      // Resolve the coordinate reference system from the job parameter.
      CoordinateReferenceSystem crs = CRS.decode(crsCode);
      // Build the feature type and the reusable feature builder.
      this.featureType = featureTypeFactory.createFeatureType(crs, geometryType);
      this.featureBuilder = new SimpleFeatureBuilder(this.featureType);
      log.info(
          "FeatureConversionProcessor initialized with geometry type: {}",
          geometryType.getSimpleName());
    } catch (FactoryException e) {
      // Unchecked: an unresolvable CRS is a configuration error the step cannot recover from.
      throw new IllegalStateException("Failed to initialize FeatureConversionProcessor", e);
    }
  }

  @Override
  public SimpleFeature process(InferenceResult result) throws Exception {
    // Geometry validation; returning null tells Spring Batch to skip the item.
    Geometry geometry = result.getGeometry();
    if (geometry == null) {
      log.warn("Null geometry detected for uid: {} - skipping", result.getUid());
      return null; // Skip this item
    }
    if (!geometry.isValid()) {
      // FIX: getGeometryType() is the geometry's type name (e.g. "Polygon"), not a validity
      // failure reason, so label it accordingly instead of presenting it as the cause.
      log.warn(
          "Invalid geometry detected for uid: {} (geometry type: {}) - skipping",
          result.getUid(),
          geometry.getGeometryType());
      return null; // Skip invalid geometry
    }
    return buildFeature(result, geometry);
  }

  /**
   * Converts an InferenceResult into a SimpleFeature.
   *
   * <p>Attribute order must match the FeatureType created by {@code featureTypeFactory}.
   *
   * @param result source record
   * @param geometry the record's (already validated) geometry
   * @return the built SimpleFeature
   */
  private SimpleFeature buildFeature(InferenceResult result, Geometry geometry) {
    // Geometry attribute (the_geom).
    featureBuilder.add(geometry);
    // Scalar attributes; nullable numerics are written as string "0.0" / long 0 defaults.
    featureBuilder.add(result.getUid());
    featureBuilder.add(result.getMapId());
    featureBuilder.add(
        result.getProbability() != null ? String.valueOf(result.getProbability()) : "0.0");
    featureBuilder.add(result.getBeforeYear() != null ? result.getBeforeYear() : 0L);
    featureBuilder.add(result.getAfterYear() != null ? result.getAfterYear() : 0L);
    featureBuilder.add(result.getBeforeC());
    featureBuilder.add(result.getBeforeP() != null ? String.valueOf(result.getBeforeP()) : "0.0");
    featureBuilder.add(result.getAfterC());
    featureBuilder.add(result.getAfterP() != null ? String.valueOf(result.getAfterP()) : "0.0");
    // null fid lets GeoTools generate the feature id.
    return featureBuilder.buildFeature(null);
  }
}

View File

@@ -0,0 +1,56 @@
package com.kamco.makesample.batch.reader;
import com.kamco.makesample.model.InferenceResult;
import com.kamco.makesample.service.GeometryConverter;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;
/**
 * RowMapper that converts a ResultSet row into an InferenceResult.
 *
 * <p>Mirrors the mapping logic of InferenceResultRepository's internal row mapper, but is exposed
 * as a standalone component so Spring Batch ItemReaders can reuse it.
 */
@Component
public class GeometryConvertingRowMapper implements RowMapper<InferenceResult> {
  private final GeometryConverter geometryConverter;

  public GeometryConvertingRowMapper(GeometryConverter geometryConverter) {
    this.geometryConverter = geometryConverter;
  }

  @Override
  public InferenceResult mapRow(ResultSet rs, int rowNum) throws SQLException {
    final InferenceResult row = new InferenceResult();
    row.setUid(rs.getString("uid"));
    row.setMapId(rs.getString("map_id"));
    row.setProbability(readNullableDouble(rs, "probability"));
    row.setBeforeYear(readNullableLong(rs, "before_year"));
    row.setAfterYear(readNullableLong(rs, "after_year"));
    row.setBeforeC(rs.getString("before_c"));
    row.setBeforeP(readNullableDouble(rs, "before_p"));
    row.setAfterC(rs.getString("after_c"));
    row.setAfterP(readNullableDouble(rs, "after_p"));

    // WKT to JTS Geometry, converted one record at a time.
    final String wkt = rs.getString("geometry_wkt");
    if (wkt != null) {
      row.setGeometry(geometryConverter.convertWKTToJTS(wkt));
    }
    return row;
  }

  /** Reads a long column, returning null when the column value was SQL NULL. */
  private Long readNullableLong(ResultSet rs, String columnName) throws SQLException {
    final long raw = rs.getLong(columnName);
    return rs.wasNull() ? null : raw;
  }

  /** Reads a double column, returning null when the column value was SQL NULL. */
  private Double readNullableDouble(ResultSet rs, String columnName) throws SQLException {
    final double raw = rs.getDouble(columnName);
    return rs.wasNull() ? null : raw;
  }
}

View File

@@ -0,0 +1,221 @@
package com.kamco.makesample.batch.reader;
import com.kamco.makesample.model.InferenceResult;
import java.sql.Array;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.sql.DataSource;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.PreparedStatementSetter;
/**
* 커서 기반 ItemReader 설정
*
* <p>메모리 최적화의 핵심: 전체 데이터를 List로 로딩하지 않고 커서를 사용하여 스트리밍 처리
*
* <p>주요 특징:
*
* <ul>
* <li>fetch-size: 1000 → DB에서 1000건씩 가져옴
* <li>cursor-based → 전체 ResultSet을 메모리에 로딩하지 않음
* <li>PreparedStatement → PostgreSQL array 파라미터 처리
* <li>EPSG:5186 좌표계 정합성 검증 (SRID, 좌표 범위, geometry 유효성)
* </ul>
*/
@Configuration
public class InferenceResultItemReaderConfig {
// EPSG:5186 coordinate-consistency conditions enforced in the WHERE clause:
// - SRID = 5186 (Korea 2000 / Central Belt)
// - ST_IsValid() = true (geometry validity)
// - X range: 125,000 ~ 530,000 m (east-west)
// - Y range: -600,000 ~ 988,000 m (north-south)
// Polygons whose coordinates fail any of these conditions are excluded from the batch.
private static final String SQL_QUERY =
"""
SELECT uid, map_id, probability, before_year, after_year,
before_c, before_p, after_c, after_p,
ST_AsText(geometry) as geometry_wkt
FROM inference_results_testing
WHERE batch_id = ANY(?)
AND after_c IS NOT NULL
AND after_p IS NOT NULL
AND geometry IS NOT NULL
AND ST_GeometryType(geometry) IN ('ST_Polygon', 'ST_MultiPolygon')
AND ST_SRID(geometry) = 5186
AND ST_IsValid(geometry) = true
AND ST_XMin(geometry) >= 125000 AND ST_XMax(geometry) <= 530000
AND ST_YMin(geometry) >= -600000 AND ST_YMax(geometry) <= 988000
ORDER BY map_id, uid
""";
/**
 * ItemReader for MERGED mode (shapefile generation).
 *
 * <p>Streams every row for the requested batch_ids through a database cursor.
 *
 * @param dataSource DataSource
 * @param batchIdsParam batch ids received as a job parameter (comma separated, e.g. "252,253,257")
 * @param fetchSize fetch size (default 1000)
 * @param rowMapper RowMapper
 * @return JdbcCursorItemReader
 */
@Bean
@StepScope
public JdbcCursorItemReader<InferenceResult> shapefileReader(
    DataSource dataSource,
    @Value("#{jobParameters['batchIds']}") String batchIdsParam,
    @Value("#{jobParameters['fetchSize'] ?: 1000}") int fetchSize,
    GeometryConvertingRowMapper rowMapper) {
  // Convert the "252,253,257"-style job parameter into a Long array.
  Long[] batchIds = parseBatchIds(batchIdsParam);
  return new JdbcCursorItemReaderBuilder<InferenceResult>()
      .name("shapefileReader")
      .dataSource(dataSource)
      .sql(SQL_QUERY)
      .fetchSize(fetchSize) // bounds memory usage during streaming
      .rowMapper(rowMapper)
      // PreparedStatementSetter is a functional interface, so a lambda replaces the former
      // anonymous class. Binds the batch ids as a PostgreSQL bigint[] array parameter.
      .preparedStatementSetter(
          ps -> {
            Array sqlArray = ps.getConnection().createArrayOf("bigint", batchIds);
            ps.setArray(1, sqlArray);
          })
      .build();
}
/**
* MERGED 모드용 ItemReader (GeoJSON 생성용)
*
* <p>전체 batch_ids에 대한 데이터를 스트리밍으로 읽어옴 (Shapefile과 동일한 데이터)
*
* @param dataSource DataSource
* @param batchIdsParam Job Parameter로 전달받은 batch_ids (콤마 구분 문자열, 예: "252,253,257")
* @param fetchSize fetch size (기본 1000)
* @param rowMapper RowMapper
* @return JdbcCursorItemReader
*/
@Bean
@StepScope
public JdbcCursorItemReader<InferenceResult> geoJsonReader(
DataSource dataSource,
@Value("#{jobParameters['batchIds']}") String batchIdsParam,
@Value("#{jobParameters['fetchSize'] ?: 1000}") int fetchSize,
GeometryConvertingRowMapper rowMapper) {
// JobParameter로 받은 "252,253,257" 형태를 Long 배열로 변환
Long[] batchIds = parseBatchIds(batchIdsParam);
return new JdbcCursorItemReaderBuilder<InferenceResult>()
.name("geoJsonReader")
.dataSource(dataSource)
.sql(SQL_QUERY)
.fetchSize(fetchSize) // 메모리 효율을 위한 fetch size
.rowMapper(rowMapper)
.preparedStatementSetter(
new PreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps) throws SQLException {
// PostgreSQL array 파라미터 설정
Connection conn = ps.getConnection();
Array sqlArray = conn.createArrayOf("bigint", batchIds);
ps.setArray(1, sqlArray);
}
})
.build();
}
/**
* MAP_IDS 모드용 ItemReader
*
* <p>특정 map_id에 대한 데이터만 읽어옴
*
* @param dataSource DataSource
* @param batchIdsParam Job Parameter로 전달받은 batch_ids
* @param mapId Step Execution Context에서 전달받은 map_id
* @param fetchSize fetch size
* @param rowMapper RowMapper
* @return JdbcCursorItemReader
*/
@Bean
@StepScope
public JdbcCursorItemReader<InferenceResult> mapIdModeReader(
DataSource dataSource,
@Value("#{jobParameters['batchIds']}") String batchIdsParam,
@Value("#{stepExecutionContext['mapId']}") String mapId,
@Value("#{jobParameters['fetchSize'] ?: 1000}") int fetchSize,
GeometryConvertingRowMapper rowMapper) {
Long[] batchIds = parseBatchIds(batchIdsParam);
String sqlWithMapId =
"""
SELECT uid, map_id, probability, before_year, after_year,
before_c, before_p, after_c, after_p,
ST_AsText(geometry) as geometry_wkt
FROM inference_results_testing
WHERE batch_id = ANY(?)
AND map_id = ?
AND after_c IS NOT NULL
AND after_p IS NOT NULL
AND geometry IS NOT NULL
AND ST_GeometryType(geometry) IN ('ST_Polygon', 'ST_MultiPolygon')
AND ST_SRID(geometry) = 5186
AND ST_IsValid(geometry) = true
AND ST_XMin(geometry) >= 125000 AND ST_XMax(geometry) <= 530000
AND ST_YMin(geometry) >= -600000 AND ST_YMax(geometry) <= 988000
ORDER BY uid
""";
return new JdbcCursorItemReaderBuilder<InferenceResult>()
.name("mapIdModeReader")
.dataSource(dataSource)
.sql(sqlWithMapId)
.fetchSize(fetchSize)
.rowMapper(rowMapper)
.preparedStatementSetter(
new PreparedStatementSetter() {
@Override
public void setValues(PreparedStatement ps) throws SQLException {
Connection conn = ps.getConnection();
Array sqlArray = conn.createArrayOf("bigint", batchIds);
ps.setArray(1, sqlArray);
ps.setString(2, mapId);
}
})
.build();
}
/**
* JobParameter 문자열을 Long 배열로 변환
*
* @param batchIdsParam "252,253,257" 형태의 문자열
* @return Long 배열
*/
private Long[] parseBatchIds(String batchIdsParam) {
if (batchIdsParam == null || batchIdsParam.trim().isEmpty()) {
throw new IllegalArgumentException("batchIds parameter is required");
}
String[] parts = batchIdsParam.split(",");
Long[] batchIds = new Long[parts.length];
for (int i = 0; i < parts.length; i++) {
batchIds[i] = Long.parseLong(parts[i].trim());
}
return batchIds;
}
}

View File

@@ -0,0 +1,206 @@
package com.kamco.makesample.batch.repository;
import com.kamco.makesample.batch.model.BatchExecutionHistory;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.LocalDateTime;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
/**
 * Repository for batch execution history.
 *
 * <p>Persists and queries per-step execution history rows.
 */
@Repository
public class BatchExecutionHistoryRepository {

  private final JdbcTemplate jdbcTemplate;

  public BatchExecutionHistoryRepository(JdbcTemplate jdbcTemplate) {
    this.jdbcTemplate = jdbcTemplate;
  }

  /**
   * Inserts a history row when a step starts.
   *
   * @param history execution history (start-time side populated)
   * @return generated id of the inserted row
   */
  public Long insert(BatchExecutionHistory history) {
    String sql =
        """
        INSERT INTO batch_execution_history (
            job_execution_id, step_execution_id, step_name, step_type,
            start_time, status, batch_ids, inference_id
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        RETURNING id
        """;
    return jdbcTemplate.queryForObject(
        sql,
        Long.class,
        history.getJobExecutionId(),
        history.getStepExecutionId(),
        history.getStepName(),
        history.getStepType(),
        Timestamp.valueOf(history.getStartTime()),
        history.getStatus(),
        history.getBatchIds(),
        history.getInferenceId());
  }

  /**
   * Updates a history row when the step finishes.
   *
   * @param id history row id
   * @param endTime step end time
   * @param startTime step start time (used to compute duration_ms)
   * @param status final status
   * @param exitCode exit code
   * @param exitMessage exit message
   * @param errorMessage error message, if any
   * @param errorStackTrace stack trace, if any
   * @param readCount read count
   * @param writeCount write count
   * @param commitCount commit count
   * @param rollbackCount rollback count
   * @param skipCount skip count
   */
  public void updateOnCompletion(
      Long id,
      LocalDateTime endTime,
      LocalDateTime startTime,
      String status,
      String exitCode,
      String exitMessage,
      String errorMessage,
      String errorStackTrace,
      Long readCount,
      Long writeCount,
      Long commitCount,
      Long rollbackCount,
      Long skipCount) {
    // Elapsed time is derived here rather than stored by the caller.
    long durationMs = Duration.between(startTime, endTime).toMillis();
    String sql =
        """
        UPDATE batch_execution_history
        SET end_time = ?,
            duration_ms = ?,
            status = ?,
            exit_code = ?,
            exit_message = ?,
            error_message = ?,
            error_stack_trace = ?,
            read_count = ?,
            write_count = ?,
            commit_count = ?,
            rollback_count = ?,
            skip_count = ?,
            updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
        """;
    jdbcTemplate.update(
        sql,
        Timestamp.valueOf(endTime),
        durationMs,
        status,
        exitCode,
        exitMessage,
        errorMessage,
        errorStackTrace,
        readCount,
        writeCount,
        commitCount,
        rollbackCount,
        skipCount,
        id);
  }

  /**
   * Finds all step histories for one job execution, ordered by start time.
   *
   * @param jobExecutionId Job Execution ID
   * @return history rows
   */
  public java.util.List<BatchExecutionHistory> findByJobExecutionId(Long jobExecutionId) {
    String sql =
        """
        SELECT * FROM batch_execution_history
        WHERE job_execution_id = ?
        ORDER BY start_time
        """;
    return jdbcTemplate.query(sql, (rs, rowNum) -> mapRow(rs), jobExecutionId);
  }

  /**
   * Finds the most recent {@code limit} step histories.
   *
   * @param limit maximum number of rows
   * @return history rows, newest first
   */
  public java.util.List<BatchExecutionHistory> findRecent(int limit) {
    String sql =
        """
        SELECT * FROM batch_execution_history
        ORDER BY start_time DESC
        LIMIT ?
        """;
    return jdbcTemplate.query(sql, (rs, rowNum) -> mapRow(rs), limit);
  }

  /**
   * Maps one batch_execution_history row to a {@link BatchExecutionHistory}.
   *
   * <p>Shared by both finders (each already issues SELECT *), so they return identically
   * populated objects instead of two divergent hand-written mappings.
   *
   * <p>NOTE(review): {@code rs.getLong} maps SQL NULL to 0, so counts/duration of a still-running
   * step read as 0 rather than null — confirm callers do not rely on null semantics.
   */
  private BatchExecutionHistory mapRow(java.sql.ResultSet rs) throws java.sql.SQLException {
    BatchExecutionHistory history = new BatchExecutionHistory();
    history.setId(rs.getLong("id"));
    history.setJobExecutionId(rs.getLong("job_execution_id"));
    history.setStepExecutionId(rs.getLong("step_execution_id"));
    history.setStepName(rs.getString("step_name"));
    history.setStepType(rs.getString("step_type"));
    history.setStartTime(rs.getTimestamp("start_time").toLocalDateTime());
    Timestamp endTimestamp = rs.getTimestamp("end_time");
    if (endTimestamp != null) {
      history.setEndTime(endTimestamp.toLocalDateTime());
    }
    history.setDurationMs(rs.getLong("duration_ms"));
    history.setStatus(rs.getString("status"));
    history.setExitCode(rs.getString("exit_code"));
    history.setExitMessage(rs.getString("exit_message"));
    history.setErrorMessage(rs.getString("error_message"));
    history.setReadCount(rs.getLong("read_count"));
    history.setWriteCount(rs.getLong("write_count"));
    history.setCommitCount(rs.getLong("commit_count"));
    history.setRollbackCount(rs.getLong("rollback_count"));
    history.setSkipCount(rs.getLong("skip_count"));
    history.setBatchIds(rs.getString("batch_ids"));
    history.setInferenceId(rs.getString("inference_id"));
    return history;
  }
}

View File

@@ -0,0 +1,59 @@
package com.kamco.makesample.batch.tasklet;
import com.kamco.makesample.writer.ResultZipWriter;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Tasklet that creates the ZIP file.
 *
 * <p>Reuses the existing {@link ResultZipWriter} to compress the shapefile sidecar files.
 */
@Component
@StepScope
public class CreateZipTasklet implements Tasklet {

  private static final Logger log = LoggerFactory.getLogger(CreateZipTasklet.class);

  // Full path of the generated shapefile (job parameter); its directory is zipped.
  @Value("#{jobParameters['outputPath']}")
  private String outputPath;

  // Base name (without ".zip") of the archive to create (job parameter).
  @Value("#{jobParameters['zipBaseName']}")
  private String zipBaseName;

  /**
   * Zips the shapefile directory and publishes the resulting ZIP path into the
   * JobExecutionContext under {@code "zipFilePath"} (consumed by the GeoServer step).
   *
   * @throws IllegalStateException when a required job parameter is missing
   */
  @Override
  public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
      throws Exception {
    // Fail fast with clear messages instead of an NPE deep inside path handling.
    if (outputPath == null || outputPath.isBlank()) {
      throw new IllegalStateException("Job parameter 'outputPath' is required");
    }
    if (zipBaseName == null || zipBaseName.isBlank()) {
      throw new IllegalStateException("Job parameter 'zipBaseName' is required");
    }
    log.info("Creating ZIP file for shapefile: {}", outputPath);

    // Directory containing the shapefile. A bare file name has no parent, so fall back to
    // the absolute path's parent rather than passing null to the writer / resolve below.
    Path shapefilePath = Paths.get(outputPath);
    Path dirPath = shapefilePath.getParent();
    if (dirPath == null) {
      dirPath = shapefilePath.toAbsolutePath().getParent();
    }

    // Reuse the existing ResultZipWriter
    ResultZipWriter.createZip(dirPath, zipBaseName);
    log.info("ZIP file created successfully: {}.zip", zipBaseName);

    // Publish the ZIP path for the GeoServer registration step.
    String zipPath = dirPath.resolve(zipBaseName + ".zip").toString();
    chunkContext
        .getStepContext()
        .getStepExecution()
        .getJobExecution()
        .getExecutionContext()
        .putString("zipFilePath", zipPath);
    return RepeatStatus.FINISHED;
  }
}

View File

@@ -0,0 +1,72 @@
package com.kamco.makesample.batch.tasklet;
import com.kamco.makesample.service.GeoServerRegistrationService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Tasklet that registers the generated shapefile with GeoServer.
 *
 * <p>Reuses the existing {@link GeoServerRegistrationService} for the upload.
 *
 * <p>Conditional execution: skipped entirely when {@code geoserver.enabled=false}.
 */
@Component
@StepScope
public class GeoServerRegistrationTasklet implements Tasklet {

  private static final Logger log = LoggerFactory.getLogger(GeoServerRegistrationTasklet.class);

  private final GeoServerRegistrationService geoServerService;

  // Whether registration runs at all; defaults to false when the job parameter is absent.
  @Value("#{jobParameters['geoserver.enabled'] ?: false}")
  private boolean geoServerEnabled;

  // Target layer name on the GeoServer side.
  @Value("#{jobParameters['layerName']}")
  private String layerName;

  public GeoServerRegistrationTasklet(GeoServerRegistrationService geoServerService) {
    this.geoServerService = geoServerService;
  }

  /**
   * Uploads the shapefile ZIP produced by the previous step.
   *
   * <p>The ZIP path is taken from the JobExecutionContext key {@code "zipFilePath"}; a missing
   * path is a pipeline error and aborts the step.
   */
  @Override
  public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
      throws Exception {
    if (!geoServerEnabled) {
      log.info("GeoServer registration is disabled. Skipping.");
      return RepeatStatus.FINISHED;
    }
    log.info("Starting GeoServer registration for layer: {}", layerName);

    // Path of the ZIP created by CreateZipTasklet, shared via the job-level context.
    Object zipPathValue =
        chunkContext
            .getStepContext()
            .getStepExecution()
            .getJobExecution()
            .getExecutionContext()
            .get("zipFilePath");
    if (zipPathValue == null) {
      log.error("ZIP file path not found in JobExecutionContext");
      throw new IllegalStateException("ZIP file path not available for GeoServer registration");
    }

    // Delegate the actual upload to the existing service.
    geoServerService.uploadShapefileZip((String) zipPathValue, layerName);
    log.info("GeoServer registration completed successfully for layer: {}", layerName);
    return RepeatStatus.FINISHED;
  }
}

View File

@@ -0,0 +1,302 @@
package com.kamco.makesample.batch.tasklet;
import com.kamco.makesample.exception.MixedGeometryException;
import java.sql.Array;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Tasklet validating geometry types before chunk processing.
 *
 * <p>Shapefiles require a homogeneous geometry type, so this pre-check runs before any chunk is
 * processed.
 *
 * <ul>
 *   <li>Queries the DISTINCT geometry types (only ST_Polygon / ST_MultiPolygon are selected)
 *   <li>Fails fast when an unsupported mix of geometry types is found
 *   <li>Stores the resolved geometry type in the StepExecutionContext for the writers
 * </ul>
 */
@Component
@StepScope
public class GeometryTypeValidationTasklet implements Tasklet {

  private static final Logger log = LoggerFactory.getLogger(GeometryTypeValidationTasklet.class);

  // All rows with a non-null geometry, before any validity filtering.
  private static final String COUNT_ALL_SQL =
      """
      SELECT COUNT(*) as total_count
      FROM inference_results_testing
      WHERE batch_id = ANY(?)
        AND geometry IS NOT NULL
      """;

  // Distinct geometry types among rows passing the EPSG:5186 validity filter:
  // - type is ST_Polygon or ST_MultiPolygon (Point, LineString etc. excluded)
  // - SRID = 5186 (Korea 2000 / Central Belt)
  // - ST_IsValid() = true
  // - X within 125,000~530,000 m and Y within -600,000~988,000 m
  private static final String DISTINCT_TYPES_SQL =
      """
      SELECT DISTINCT ST_GeometryType(geometry) as geom_type
      FROM inference_results_testing
      WHERE batch_id = ANY(?)
        AND geometry IS NOT NULL
        AND ST_GeometryType(geometry) IN ('ST_Polygon', 'ST_MultiPolygon')
        AND ST_SRID(geometry) = 5186
        AND ST_IsValid(geometry) = true
        AND ST_XMin(geometry) >= 125000 AND ST_XMax(geometry) <= 530000
        AND ST_YMin(geometry) >= -600000 AND ST_YMax(geometry) <= 988000
      """;

  // Count of rows passing the same validity filter as DISTINCT_TYPES_SQL.
  private static final String COUNT_VALID_SQL =
      """
      SELECT COUNT(*) as valid_count
      FROM inference_results_testing
      WHERE batch_id = ANY(?)
        AND geometry IS NOT NULL
        AND ST_GeometryType(geometry) IN ('ST_Polygon', 'ST_MultiPolygon')
        AND ST_SRID(geometry) = 5186
        AND ST_IsValid(geometry) = true
        AND ST_XMin(geometry) >= 125000 AND ST_XMax(geometry) <= 530000
        AND ST_YMin(geometry) >= -600000 AND ST_YMax(geometry) <= 988000
      """;

  private final DataSource dataSource;

  @Value("#{jobParameters['batchIds']}")
  private String batchIdsParam;

  public GeometryTypeValidationTasklet(DataSource dataSource) {
    this.dataSource = dataSource;
  }

  /**
   * Validates geometry types for the configured batch ids and records the resolved type
   * ("geometryType") in the StepExecutionContext.
   *
   * @throws MixedGeometryException when an unsupported mix of geometry types is present
   */
  @Override
  public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
      throws Exception {
    log.info("========================================");
    log.info("Step 1: Geometry Type Validation");
    log.info("========================================");
    log.info("Validating geometry types for batch_ids: {}", batchIdsParam);

    // "252,253,257" -> [252L, 253L, 257L]
    Long[] batchIds = parseBatchIds(batchIdsParam);
    log.debug("Parsed batch IDs: {}", (Object) batchIds);

    long totalRows = queryCount(COUNT_ALL_SQL, batchIds);
    log.info("Total rows with non-null geometry: {}", totalRows);

    List<String> geometryTypes = queryDistinctGeometryTypes(batchIds);
    log.info("Found {} distinct geometry type(s): {}", geometryTypes.size(), geometryTypes);

    long validRows = queryCount(COUNT_VALID_SQL, batchIds);
    logValidationSummary(totalRows, validRows);

    if (geometryTypes.size() > 1) {
      return handleMixedTypes(geometryTypes, chunkContext);
    }

    // Empty dataset: warn and pass — the reader will simply find nothing to read.
    if (geometryTypes.isEmpty()) {
      log.warn("========================================");
      log.warn("WARNING: No valid geometries found in dataset");
      log.warn("This may indicate:");
      log.warn("  1. All geometries are NULL");
      log.warn("  2. No data exists for the given batch_ids");
      log.warn("  3. Database connection issues");
      log.warn("========================================");
      log.warn("Proceeding with empty dataset (no files will be generated)");
      return RepeatStatus.FINISHED;
    }

    // Single type. MultiPolygon is auto-converted downstream, so record Polygon in that case.
    String geometryType = geometryTypes.get(0);
    if (geometryType.equals("ST_MultiPolygon")) {
      log.info("========================================");
      log.info(" Geometry type: {}", geometryType);
      log.info("");
      log.info("✅ Auto-conversion will be applied:");
      log.info("   ST_MultiPolygon → ST_Polygon (first polygon only)");
      log.info("   All MultiPolygon geometries will be converted during processing");
      log.info("========================================");
      geometryType = "ST_Polygon";
    }
    storeGeometryType(chunkContext, geometryType);
    log.info("========================================");
    log.info("✅ Geometry type validation PASSED");
    log.info("   Geometry Type: {}", geometryType);
    log.info("   All geometries are homogeneous (or will be converted)");
    log.info("========================================");
    return RepeatStatus.FINISHED;
  }

  /**
   * Handles more than one distinct geometry type.
   *
   * <p>The Polygon + MultiPolygon pair is allowed (MultiPolygon is auto-converted to Polygon by
   * the downstream GeometryConverter); any other mix fails fast, since the SQL filter should
   * already have excluded other types and their presence indicates a data-consistency problem.
   */
  private RepeatStatus handleMixedTypes(List<String> geometryTypes, ChunkContext chunkContext) {
    boolean hasPolygon = geometryTypes.contains("ST_Polygon");
    boolean hasMultiPolygon = geometryTypes.contains("ST_MultiPolygon");
    if (hasPolygon && hasMultiPolygon && geometryTypes.size() == 2) {
      log.info("========================================");
      log.info(" Mixed geometry types detected:");
      log.info("   Types: {}", geometryTypes);
      log.info("");
      log.info("✅ Auto-conversion enabled:");
      log.info("   ST_MultiPolygon → ST_Polygon (first polygon only)");
      log.info("   This will unify all geometries to ST_Polygon type");
      log.info("========================================");
      storeGeometryType(chunkContext, "ST_Polygon");
      log.info("✅ Geometry type validation PASSED with auto-conversion");
      log.info("   Target Type: ST_Polygon");
      log.info("   MultiPolygon geometries will be converted during processing");
      log.info("========================================");
      return RepeatStatus.FINISHED;
    }
    log.error("❌ Unexpected mixed geometry types detected: {}", geometryTypes);
    log.error("Shapefile requires homogeneous geometry type");
    log.error("Only Polygon + MultiPolygon mix is supported with auto-conversion");
    throw new MixedGeometryException(
        "Shapefile requires homogeneous geometry type. Found: "
            + geometryTypes
            + ". Only Polygon + MultiPolygon mix is supported.");
  }

  /** Logs how many rows passed / were excluded by the EPSG:5186 validity filter. */
  private void logValidationSummary(long totalRows, long validRows) {
    long excludedRows = totalRows - validRows;
    log.info("========================================");
    log.info("📊 Geometry Validation Summary:");
    log.info("   Total rows: {}", totalRows);
    log.info("   Valid rows: {} (EPSG:5186 compliant)", validRows);
    log.info("   Excluded rows: {} (invalid geometry or out of range)", excludedRows);
    if (excludedRows > 0) {
      log.warn(
          "⚠️ {} rows excluded due to invalid geometry or coordinate out of EPSG:5186 range",
          excludedRows);
    }
    log.info("========================================");
  }

  /** Runs a single-value COUNT query with the batch ids bound as a PostgreSQL bigint array. */
  private long queryCount(String sql, Long[] batchIds) throws java.sql.SQLException {
    try (Connection conn = dataSource.getConnection();
        PreparedStatement ps = conn.prepareStatement(sql)) {
      Array sqlArray = conn.createArrayOf("bigint", batchIds);
      ps.setArray(1, sqlArray);
      try (ResultSet rs = ps.executeQuery()) {
        return rs.next() ? rs.getLong(1) : 0L;
      }
    }
  }

  /** Queries the distinct geometry types among EPSG:5186-valid rows. */
  private List<String> queryDistinctGeometryTypes(Long[] batchIds) throws java.sql.SQLException {
    List<String> geometryTypes = new ArrayList<>();
    try (Connection conn = dataSource.getConnection();
        PreparedStatement ps = conn.prepareStatement(DISTINCT_TYPES_SQL)) {
      Array sqlArray = conn.createArrayOf("bigint", batchIds);
      ps.setArray(1, sqlArray);
      try (ResultSet rs = ps.executeQuery()) {
        while (rs.next()) {
          String geomType = rs.getString("geom_type");
          geometryTypes.add(geomType);
          log.debug("Found geometry type: {}", geomType);
        }
      }
    }
    return geometryTypes;
  }

  /**
   * Records the resolved geometry type for later steps (FeatureConversionProcessor and
   * StreamingShapefileWriter read it when building the feature type / shapefile schema).
   */
  private void storeGeometryType(ChunkContext chunkContext, String geometryType) {
    chunkContext
        .getStepContext()
        .getStepExecution()
        .getExecutionContext()
        .putString("geometryType", geometryType);
  }

  /**
   * Converts the job-parameter string to a Long array.
   *
   * @param batchIdsParam comma-separated ids, e.g. "252,253,257"
   * @return Long array
   * @throws IllegalArgumentException when the parameter is missing or blank
   */
  private Long[] parseBatchIds(String batchIdsParam) {
    if (batchIdsParam == null || batchIdsParam.trim().isEmpty()) {
      throw new IllegalArgumentException("batchIds parameter is required");
    }
    String[] parts = batchIdsParam.split(",");
    Long[] batchIds = new Long[parts.length];
    for (int i = 0; i < parts.length; i++) {
      batchIds[i] = Long.parseLong(parts[i].trim());
    }
    return batchIds;
  }
}

View File

@@ -0,0 +1,75 @@
package com.kamco.makesample.batch.util;
import org.geotools.api.feature.simple.SimpleFeatureType;
import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
import org.geotools.feature.simple.SimpleFeatureTypeBuilder;
import org.locationtech.jts.geom.Geometry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
 * Factory for the shared {@link SimpleFeatureType}.
 *
 * <p>Feature-type creation logic shared by the processor and the writers, guaranteeing that
 * features are created and written with the same schema.
 */
@Component
public class FeatureTypeFactory {

  private static final Logger log = LoggerFactory.getLogger(FeatureTypeFactory.class);

  /**
   * Creates the SimpleFeatureType used for all output features.
   *
   * @param crs CoordinateReferenceSystem
   * @param geomType geometry class (Polygon, MultiPolygon, ...)
   * @return SimpleFeatureType
   */
  public SimpleFeatureType createFeatureType(CoordinateReferenceSystem crs, Class<?> geomType) {
    SimpleFeatureTypeBuilder builder = new SimpleFeatureTypeBuilder();
    builder.setName("inference_results");
    builder.setCRS(crs);
    // Geometry column, registered as the default geometry.
    builder.add("the_geom", geomType);
    builder.setDefaultGeometry("the_geom");
    // Attribute columns.
    builder.add("uid", String.class);
    builder.add("map_id", String.class);
    builder.add("chn_dtct_p", String.class);
    builder.add("cprs_yr", Long.class);
    builder.add("crtr_yr", Long.class);
    builder.add("bf_cls_cd", String.class);
    builder.add("bf_cls_pro", String.class);
    builder.add("af_cls_cd", String.class);
    builder.add("af_cls_pro", String.class);
    return builder.buildFeatureType();
  }

  /**
   * Resolves a geometry type name to a JTS geometry class.
   *
   * @param geomTypeStr "ST_Polygon", "Polygon", etc.
   * @return geometry class; {@code Geometry.class} for null/empty/unknown names
   */
  public Class<?> parseGeometryType(String geomTypeStr) {
    if (geomTypeStr == null || geomTypeStr.isEmpty()) {
      return Geometry.class;
    }
    // PostGIS ST_GeometryType() returns names like "ST_Polygon". Strip only a LEADING "ST_":
    // the previous replace("ST_", "") removed the substring anywhere in the name, which would
    // corrupt a name containing "ST_" elsewhere.
    String typeName = geomTypeStr.startsWith("ST_") ? geomTypeStr.substring(3) : geomTypeStr;
    try {
      return Class.forName("org.locationtech.jts.geom." + typeName);
    } catch (ClassNotFoundException e) {
      log.warn("Unknown geometry type: {}, using Geometry.class", typeName);
      return Geometry.class;
    }
  }
}

View File

@@ -0,0 +1,193 @@
package com.kamco.makesample.batch.writer;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import org.geotools.api.feature.simple.SimpleFeature;
import org.geotools.geojson.feature.FeatureJSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.annotation.OnWriteError;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamWriter;
import org.springframework.stereotype.Component;
/**
 * Per-map_id GeoJSON writer.
 *
 * <p>Runs inside a partitioned-step worker; reads the map_id-specific geoJsonOutputPath from the
 * StepExecutionContext and produces one GeoJSON file per map_id.
 *
 * <p>Uses the same streaming pattern as StreamingGeoJsonWriter for memory efficiency: the
 * FeatureCollection header, each feature, and the footer are appended directly to the stream.
 */
@Component
@StepScope
public class MapIdGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {

  private static final Logger log = LoggerFactory.getLogger(MapIdGeoJsonWriter.class);

  private String outputPath;
  private String mapId;
  private FileOutputStream outputStream;
  private FeatureJSON featureJSON;
  private int chunkCount = 0;
  private int totalRecordCount = 0;
  // True until the first feature has been written; controls comma placement between features.
  private boolean isFirstChunk = true;

  /**
   * Reads the partition-specific parameters (mapId, geoJsonOutputPath) from the
   * StepExecutionContext and ensures the output directory exists.
   */
  @BeforeStep
  public void beforeStep(StepExecution stepExecution) {
    ExecutionContext executionContext = stepExecution.getExecutionContext();
    this.mapId = executionContext.getString("mapId");
    this.outputPath = executionContext.getString("geoJsonOutputPath");
    log.info("MapIdGeoJsonWriter initialized for map_id: {}, output: {}", mapId, outputPath);
    try {
      Path outputDir = Paths.get(outputPath).getParent();
      if (outputDir != null && !Files.exists(outputDir)) {
        Files.createDirectories(outputDir);
        log.info("Created output directory for map_id {} GeoJSON: {}", mapId, outputDir);
      }
    } catch (IOException e) {
      throw new ItemStreamException("Failed to create output directory for map_id: " + mapId, e);
    }
  }

  /** Opens the output stream and writes the FeatureCollection header. */
  @Override
  public void open(ExecutionContext executionContext) throws ItemStreamException {
    log.info("Opening GeoJSON writer for map_id: {}", mapId);
    try {
      File geoJsonFile = new File(outputPath);
      outputStream = new FileOutputStream(geoJsonFile);
      featureJSON = new FeatureJSON();
      // GeoJSON is UTF-8 by specification; never rely on the platform default charset.
      outputStream.write(
          "{\"type\":\"FeatureCollection\",\"features\":[".getBytes(StandardCharsets.UTF_8));
      log.info("GeoJSON file initialized for map_id: {}", mapId);
    } catch (IOException e) {
      throw new ItemStreamException("Failed to open GeoJSON file for map_id " + mapId, e);
    }
  }

  /** Appends each feature of the chunk to the file, comma-separated. */
  @Override
  public void write(Chunk<? extends SimpleFeature> chunk) throws Exception {
    if (chunk.isEmpty()) {
      return;
    }
    chunkCount++;
    int itemCount = chunk.size();
    totalRecordCount += itemCount;
    log.debug(
        "[map_id: {}] Writing chunk #{} to GeoJSON with {} features (total: {})",
        mapId,
        chunkCount,
        itemCount,
        totalRecordCount);
    // Iterate the Chunk directly instead of an unchecked List cast.
    boolean first = isFirstChunk;
    for (SimpleFeature feature : chunk) {
      // Every feature except the very first in the file is preceded by a comma.
      if (!first) {
        outputStream.write(",".getBytes(StandardCharsets.UTF_8));
      }
      first = false;
      featureJSON.writeFeature(feature, outputStream);
    }
    isFirstChunk = false;
    log.debug("[map_id: {}] Chunk #{} written to GeoJSON successfully", mapId, chunkCount);
  }

  /**
   * Writes the FeatureCollection footer and releases the stream.
   *
   * <p>NOTE(review): @AfterStep also runs after a failed step, so the footer may be appended to a
   * partial file; @OnWriteError deletes the file on write errors, but confirm other failure paths.
   */
  @AfterStep
  public void afterStep() {
    log.info(
        "[map_id: {}] All chunks written to GeoJSON. Total {} records in {} chunks",
        mapId,
        totalRecordCount,
        chunkCount);
    try {
      if (outputStream != null) {
        outputStream.write("]}".getBytes(StandardCharsets.UTF_8));
        outputStream.flush();
        log.info("[map_id: {}] GeoJSON file finalized successfully", mapId);
      }
    } catch (IOException e) {
      log.error("[map_id: {}] Failed to finalize GeoJSON file", mapId, e);
      throw new ItemStreamException("Failed to finalize GeoJSON file for map_id: " + mapId, e);
    } finally {
      cleanup();
    }
  }

  @Override
  public void close() throws ItemStreamException {
    cleanup();
  }

  /** On write error: close the stream and delete the partial file. */
  @OnWriteError
  public void onError(Exception exception, Chunk<? extends SimpleFeature> chunk) {
    log.error(
        "[map_id: {}] Error writing chunk #{} to GeoJSON: {}",
        mapId,
        chunkCount,
        exception.getMessage(),
        exception);
    cleanup();
    try {
      File geoJsonFile = new File(outputPath);
      if (geoJsonFile.exists()) {
        geoJsonFile.delete();
        log.info("[map_id: {}] Deleted partial GeoJSON file", mapId);
      }
    } catch (Exception e) {
      log.warn("[map_id: {}] Failed to delete partial GeoJSON file", mapId, e);
    }
  }

  /** Closes the stream if open; idempotent. */
  private void cleanup() {
    if (outputStream != null) {
      try {
        outputStream.close();
      } catch (IOException e) {
        log.warn("[map_id: {}] Failed to close GeoJSON output stream", mapId, e);
      }
      outputStream = null;
    }
  }

  /** Checkpoints the progress counters. */
  @Override
  public void update(ExecutionContext executionContext) throws ItemStreamException {
    executionContext.putInt("chunkCount", chunkCount);
    executionContext.putInt("totalRecordCount", totalRecordCount);
  }
}

View File

@@ -0,0 +1,250 @@
package com.kamco.makesample.batch.writer;
import com.kamco.makesample.batch.util.FeatureTypeFactory;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.geotools.api.data.SimpleFeatureStore;
import org.geotools.api.data.Transaction;
import org.geotools.api.feature.simple.SimpleFeature;
import org.geotools.api.feature.simple.SimpleFeatureType;
import org.geotools.api.referencing.FactoryException;
import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
import org.geotools.data.DefaultTransaction;
import org.geotools.data.collection.ListFeatureCollection;
import org.geotools.data.shapefile.ShapefileDataStore;
import org.geotools.data.shapefile.ShapefileDataStoreFactory;
import org.geotools.referencing.CRS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.annotation.OnWriteError;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamWriter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* Map ID별 Shapefile Writer
*
* <p>Partitioned Step의 worker에서 실행되며, StepExecutionContext로부터 map_id별 outputPath를 읽어 개별 shapefile을
* 생성합니다.
*
* <p>StreamingShapefileWriter와 동일한 스트리밍 패턴을 사용하여 메모리 효율적으로 처리합니다.
*/
@Component
@StepScope
public class MapIdShapefileWriter implements ItemStreamWriter<SimpleFeature> {
private static final Logger log = LoggerFactory.getLogger(MapIdShapefileWriter.class);
private final FeatureTypeFactory featureTypeFactory;
@Value("#{jobParameters['crs'] ?: 'EPSG:5186'}")
private String crsCode;
private String outputPath;
private String mapId;
private ShapefileDataStore dataStore;
private Transaction transaction;
private SimpleFeatureStore featureStore;
private SimpleFeatureType featureType;
private int chunkCount = 0;
private int totalRecordCount = 0;
private Class<?> geometryType;
public MapIdShapefileWriter(FeatureTypeFactory featureTypeFactory) {
this.featureTypeFactory = featureTypeFactory;
}
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
// StepExecutionContext에서 partition별 파라미터 읽기
ExecutionContext executionContext = stepExecution.getExecutionContext();
this.mapId = executionContext.getString("mapId");
this.outputPath = executionContext.getString("outputPath");
log.info("MapIdShapefileWriter initialized for map_id: {}, output: {}", mapId, outputPath);
// Geometry type 읽기 (validation tasklet에서 설정)
String geomTypeStr = executionContext.getString("geometryType");
if (geomTypeStr != null) {
this.geometryType = featureTypeFactory.parseGeometryType(geomTypeStr);
log.debug("Geometry type from validation: {}", geometryType.getSimpleName());
}
// 출력 디렉토리 생성
try {
Path outputDir = Paths.get(outputPath).getParent();
if (outputDir != null && !Files.exists(outputDir)) {
Files.createDirectories(outputDir);
log.info("Created output directory for map_id {}: {}", mapId, outputDir);
}
} catch (IOException e) {
throw new ItemStreamException("Failed to create output directory for map_id: " + mapId, e);
}
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
log.info("Opening shapefile writer for map_id: {}", mapId);
try {
// CRS 설정
CoordinateReferenceSystem crs = CRS.decode(crsCode);
// Geometry type 기본값 설정
if (geometryType == null) {
geometryType = featureTypeFactory.parseGeometryType(null);
log.warn("Geometry type not set for map_id {}, using default: Geometry.class", mapId);
}
// SimpleFeatureType 생성
featureType = featureTypeFactory.createFeatureType(crs, geometryType);
// ShapefileDataStore 생성
File shpFile = new File(outputPath);
ShapefileDataStoreFactory factory = new ShapefileDataStoreFactory();
Map<String, Serializable> params = new HashMap<>();
params.put("url", shpFile.toURI().toURL());
params.put("create spatial index", Boolean.TRUE);
dataStore = (ShapefileDataStore) factory.createNewDataStore(params);
dataStore.createSchema(featureType);
// Transaction 시작
transaction = new DefaultTransaction("create-" + mapId);
// FeatureStore 가져오기
String typeName = dataStore.getTypeNames()[0];
featureStore = (SimpleFeatureStore) dataStore.getFeatureSource(typeName);
featureStore.setTransaction(transaction);
log.info("ShapefileDataStore initialized for map_id: {}", mapId);
} catch (FactoryException e) {
throw new ItemStreamException("Invalid CRS code: " + crsCode, e);
} catch (IOException e) {
throw new ItemStreamException("Failed to create shapefile for map_id " + mapId, e);
}
}
@Override
public void write(Chunk<? extends SimpleFeature> chunk) throws Exception {
if (chunk.isEmpty()) {
return;
}
chunkCount++;
List<SimpleFeature> items = (List<SimpleFeature>) chunk.getItems();
int itemCount = items.size();
totalRecordCount += itemCount;
log.debug(
"[map_id: {}] Writing chunk #{} with {} features (total: {})",
mapId,
chunkCount,
itemCount,
totalRecordCount);
// ListFeatureCollection으로 변환하여 FeatureStore에 추가
ListFeatureCollection collection = new ListFeatureCollection(featureType, items);
featureStore.addFeatures(collection);
log.debug("[map_id: {}] Chunk #{} written successfully", mapId, chunkCount);
}
@AfterStep
public void afterStep() {
log.info(
"[map_id: {}] All chunks written. Committing {} records in {} chunks",
mapId,
totalRecordCount,
chunkCount);
try {
if (transaction != null) {
transaction.commit();
log.info("[map_id: {}] Transaction committed successfully", mapId);
}
} catch (IOException e) {
log.error("[map_id: {}] Failed to commit transaction", mapId, e);
throw new ItemStreamException(
"Failed to commit shapefile transaction for map_id: " + mapId, e);
} finally {
cleanup();
}
}
@Override
public void close() throws ItemStreamException {
cleanup();
}
@OnWriteError
public void onError(Exception exception, Chunk<? extends SimpleFeature> chunk) {
log.error(
"[map_id: {}] Error writing chunk #{}: {}",
mapId,
chunkCount,
exception.getMessage(),
exception);
try {
if (transaction != null) {
transaction.rollback();
log.info("[map_id: {}] Transaction rolled back", mapId);
}
// 부분 파일 삭제
File shpFile = new File(outputPath);
if (shpFile.exists()) {
shpFile.delete();
log.info("[map_id: {}] Deleted partial shapefile", mapId);
}
} catch (IOException e) {
log.error("[map_id: {}] Failed to rollback transaction", mapId, e);
} finally {
cleanup();
}
}
private void cleanup() {
if (transaction != null) {
try {
transaction.close();
} catch (IOException e) {
log.warn("[map_id: {}] Failed to close transaction", mapId, e);
}
transaction = null;
}
if (dataStore != null) {
dataStore.dispose();
dataStore = null;
}
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
// Checkpoint
executionContext.putInt("chunkCount", chunkCount);
executionContext.putInt("totalRecordCount", totalRecordCount);
}
}

View File

@@ -0,0 +1,185 @@
package com.kamco.makesample.batch.writer;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import org.geotools.api.feature.simple.SimpleFeature;
import org.geotools.geojson.feature.FeatureJSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.annotation.OnWriteError;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamWriter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* 스트리밍 GeoJSON Writer
*
* <p>StreamingShapefileWriter와 유사한 패턴으로 chunk 단위 스트리밍 처리
*
* <p>메모리 효과:
*
* <ul>
* <li>기존: 전체 데이터를 DefaultFeatureCollection에 누적
* <li>신규: chunk 단위로 GeoJSON 스트림에 append
* </ul>
*/
@Component
@StepScope
public class StreamingGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
private static final Logger log = LoggerFactory.getLogger(StreamingGeoJsonWriter.class);
@Value("#{jobParameters['geoJsonOutputPath']}")
private String outputPath;
private FileOutputStream outputStream;
private FeatureJSON featureJSON;
private int chunkCount = 0;
private int totalRecordCount = 0;
private boolean isFirstChunk = true;
@BeforeStep
public void beforeStep() {
// 출력 디렉토리 생성
try {
Path outputDir = Paths.get(outputPath).getParent();
if (outputDir != null && !Files.exists(outputDir)) {
Files.createDirectories(outputDir);
log.info("Created output directory for GeoJSON: {}", outputDir);
}
} catch (IOException e) {
throw new ItemStreamException("Failed to create output directory", e);
}
}
@Override
public void open(ExecutionContext executionContext) throws ItemStreamException {
log.info("Opening StreamingGeoJsonWriter for: {}", outputPath);
try {
File geoJsonFile = new File(outputPath);
outputStream = new FileOutputStream(geoJsonFile);
featureJSON = new FeatureJSON();
// GeoJSON FeatureCollection 시작
outputStream.write("{\"type\":\"FeatureCollection\",\"features\":[".getBytes());
log.info("GeoJSON file initialized successfully");
} catch (IOException e) {
throw new ItemStreamException("Failed to open GeoJSON file: " + outputPath, e);
}
}
@Override
public void write(Chunk<? extends SimpleFeature> chunk) throws Exception {
if (chunk.isEmpty()) {
return;
}
chunkCount++;
List<SimpleFeature> items = (List<SimpleFeature>) chunk.getItems();
int itemCount = items.size();
totalRecordCount += itemCount;
log.debug(
"Writing chunk #{} to GeoJSON with {} features (total so far: {})",
chunkCount,
itemCount,
totalRecordCount);
// 각 feature를 GeoJSON으로 변환하여 append
for (int i = 0; i < items.size(); i++) {
SimpleFeature feature = items.get(i);
// 첫 번째 feature가 아니면 콤마 추가
if (!isFirstChunk || i > 0) {
outputStream.write(",".getBytes());
}
// Feature를 GeoJSON으로 직렬화
featureJSON.writeFeature(feature, outputStream);
}
isFirstChunk = false;
log.debug("Chunk #{} written to GeoJSON successfully", chunkCount);
}
@AfterStep
public void afterStep() {
log.info(
"All chunks written to GeoJSON. Total {} records in {} chunks",
totalRecordCount,
chunkCount);
try {
if (outputStream != null) {
// GeoJSON FeatureCollection 종료
outputStream.write("]}".getBytes());
outputStream.flush();
log.info("GeoJSON file finalized successfully");
}
} catch (IOException e) {
log.error("Failed to finalize GeoJSON file", e);
throw new ItemStreamException("Failed to finalize GeoJSON file", e);
} finally {
cleanup();
}
}
@Override
public void close() throws ItemStreamException {
cleanup();
}
@OnWriteError
public void onError(Exception exception, Chunk<? extends SimpleFeature> chunk) {
log.error(
"Error writing chunk #{} to GeoJSON: {}", chunkCount, exception.getMessage(), exception);
cleanup();
// 부분 파일 삭제
try {
File geoJsonFile = new File(outputPath);
if (geoJsonFile.exists()) {
geoJsonFile.delete();
log.info("Deleted partial GeoJSON file: {}", outputPath);
}
} catch (Exception e) {
log.warn("Failed to delete partial GeoJSON file", e);
}
}
private void cleanup() {
if (outputStream != null) {
try {
outputStream.close();
} catch (IOException e) {
log.warn("Failed to close GeoJSON output stream", e);
}
outputStream = null;
}
}
@Override
public void update(ExecutionContext executionContext) throws ItemStreamException {
// Checkpoint
executionContext.putInt("chunkCount", chunkCount);
executionContext.putInt("totalRecordCount", totalRecordCount);
}
}

View File

@@ -0,0 +1,253 @@
package com.kamco.makesample.batch.writer;
import com.kamco.makesample.batch.util.FeatureTypeFactory;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.geotools.api.data.SimpleFeatureStore;
import org.geotools.api.data.Transaction;
import org.geotools.api.feature.simple.SimpleFeature;
import org.geotools.api.feature.simple.SimpleFeatureType;
import org.geotools.api.referencing.FactoryException;
import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
import org.geotools.data.DefaultTransaction;
import org.geotools.data.collection.ListFeatureCollection;
import org.geotools.data.shapefile.ShapefileDataStore;
import org.geotools.data.shapefile.ShapefileDataStoreFactory;
import org.geotools.referencing.CRS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.annotation.OnWriteError;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamWriter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Streaming shapefile writer — the key fix for the memory bottleneck.
 *
 * <p>Previous problem: every feature was accumulated in a DefaultFeatureCollection
 * (500MB-4GB).
 *
 * <p>Solution: incremental, chunk-wise writes through the GeoTools Transaction API.
 *
 * <p>Memory effect:
 *
 * <ul>
 *   <li>Before: the whole data set (500MB-4GB)
 *   <li>Now: one chunk at a time (~40MB per 1000 records)
 * </ul>
 *
 * <p>Lifecycle:
 *
 * <ol>
 *   <li>open(): create the DataStore and start the transaction before the first chunk
 *   <li>write(): convert each chunk to a ListFeatureCollection and add it to the store
 *   <li>afterStep(): commit the transaction once all chunks succeeded
 * </ol>
 */
@Component
@StepScope
public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature> {

  private static final Logger log = LoggerFactory.getLogger(StreamingShapefileWriter.class);

  /** All file extensions a ShapefileDataStore may create for a single layer. */
  private static final String[] SHAPEFILE_EXTENSIONS = {
    ".shp", ".dbf", ".shx", ".prj", ".fix", ".qix"
  };

  private final FeatureTypeFactory featureTypeFactory;

  @Value("#{jobParameters['outputPath']}")
  private String outputPath;

  /** Target CRS; defaults to EPSG:5186 when the job parameter is absent. */
  @Value("#{jobParameters['crs'] ?: 'EPSG:5186'}")
  private String crsCode;

  private ShapefileDataStore dataStore;
  private Transaction transaction;
  private SimpleFeatureStore featureStore;
  private SimpleFeatureType featureType;
  private int chunkCount = 0;
  private int totalRecordCount = 0;
  private Class<?> geometryType; // Geometry type from validation tasklet

  public StreamingShapefileWriter(FeatureTypeFactory featureTypeFactory) {
    this.featureTypeFactory = featureTypeFactory;
  }

  /**
   * Reads the geometry type determined by the validation tasklet and prepares
   * the output directory.
   *
   * @param stepExecution current step execution
   * @throws ItemStreamException if the output directory cannot be created
   */
  @BeforeStep
  public void beforeStep(StepExecution stepExecution) {
    // Guard with containsKey: ExecutionContext.getString() throws on a missing
    // key, so the previous null check could never take effect.
    ExecutionContext executionContext = stepExecution.getExecutionContext();
    if (executionContext.containsKey("geometryType")) {
      String geomTypeStr = executionContext.getString("geometryType");
      this.geometryType = featureTypeFactory.parseGeometryType(geomTypeStr);
      log.info("Geometry type from validation: {}", geometryType.getSimpleName());
    }

    try {
      Path outputDir = Paths.get(outputPath).getParent();
      if (outputDir != null && !Files.exists(outputDir)) {
        Files.createDirectories(outputDir);
        log.info("Created output directory: {}", outputDir);
      }
    } catch (IOException e) {
      throw new ItemStreamException("Failed to create output directory", e);
    }
  }

  /**
   * Creates the ShapefileDataStore, opens one transaction for the whole step,
   * and binds the FeatureStore to it.
   *
   * @param executionContext step execution context (state comes from beforeStep)
   * @throws ItemStreamException on invalid CRS or I/O failure
   */
  @Override
  public void open(ExecutionContext executionContext) throws ItemStreamException {
    log.info("Opening StreamingShapefileWriter for: {}", outputPath);
    try {
      CoordinateReferenceSystem crs = CRS.decode(crsCode);

      // Fall back to the generic type when the validation tasklet did not run.
      if (geometryType == null) {
        geometryType = featureTypeFactory.parseGeometryType(null);
        log.warn("Geometry type not set, using default: Geometry.class");
      }

      featureType = featureTypeFactory.createFeatureType(crs, geometryType);

      File shpFile = new File(outputPath);
      ShapefileDataStoreFactory factory = new ShapefileDataStoreFactory();
      Map<String, Serializable> params = new HashMap<>();
      params.put("url", shpFile.toURI().toURL());
      params.put("create spatial index", Boolean.TRUE);
      dataStore = (ShapefileDataStore) factory.createNewDataStore(params);
      dataStore.createSchema(featureType);

      // Single transaction spanning all chunks; committed in afterStep().
      transaction = new DefaultTransaction("create");
      String typeName = dataStore.getTypeNames()[0];
      featureStore = (SimpleFeatureStore) dataStore.getFeatureSource(typeName);
      featureStore.setTransaction(transaction);
      log.info("ShapefileDataStore initialized successfully");
    } catch (FactoryException e) {
      throw new ItemStreamException("Invalid CRS code: " + crsCode, e);
    } catch (IOException e) {
      throw new ItemStreamException("Failed to create shapefile at: " + outputPath, e);
    }
  }

  /**
   * Appends one chunk of features to the open FeatureStore.
   *
   * @param chunk features produced for this chunk
   * @throws Exception on write failure (handled by {@link #onError})
   */
  @Override
  public void write(Chunk<? extends SimpleFeature> chunk) throws Exception {
    if (chunk.isEmpty()) {
      return;
    }
    chunkCount++;
    List<SimpleFeature> items = (List<SimpleFeature>) chunk.getItems();
    int itemCount = items.size();
    totalRecordCount += itemCount;
    log.debug(
        "Writing chunk #{} with {} features (total so far: {})",
        chunkCount,
        itemCount,
        totalRecordCount);

    // Only the current chunk is materialized; the transaction stays open and
    // the collection is GC-eligible once the chunk completes.
    ListFeatureCollection collection = new ListFeatureCollection(featureType, items);
    featureStore.addFeatures(collection);
    log.debug("Chunk #{} written successfully", chunkCount);
  }

  /**
   * Commits the transaction after a successful step. When the step failed
   * (e.g. in the reader), the transaction is only closed — implicit rollback —
   * so partial data is never committed.
   *
   * @param stepExecution current step execution, used to check the outcome
   */
  @AfterStep
  public void afterStep(StepExecution stepExecution) {
    if (stepExecution.getStatus().isUnsuccessful()) {
      log.warn("Step was unsuccessful; skipping commit");
      cleanup();
      return;
    }
    log.info(
        "All chunks written. Committing transaction for {} records in {} chunks",
        totalRecordCount,
        chunkCount);
    try {
      if (transaction != null) {
        transaction.commit();
        log.info("Transaction committed successfully");
      }
    } catch (IOException e) {
      log.error("Failed to commit transaction", e);
      throw new ItemStreamException("Failed to commit shapefile transaction", e);
    } finally {
      cleanup();
    }
  }

  @Override
  public void close() throws ItemStreamException {
    cleanup();
  }

  /**
   * Rolls back the transaction and removes partial output when a chunk fails.
   *
   * @param exception the write failure
   * @param chunk the chunk that failed
   */
  @OnWriteError
  public void onError(Exception exception, Chunk<? extends SimpleFeature> chunk) {
    log.error("Error writing chunk #{}: {}", chunkCount, exception.getMessage(), exception);
    try {
      if (transaction != null) {
        transaction.rollback();
        log.info("Transaction rolled back due to error");
      }
      // Remove the .shp AND its sidecar files; deleting only the .shp would
      // leave orphaned .dbf/.shx/.prj files behind.
      deletePartialFiles();
    } catch (IOException e) {
      log.error("Failed to rollback transaction", e);
    } finally {
      cleanup();
    }
  }

  /** Deletes the partial .shp file together with all of its sidecar files. */
  private void deletePartialFiles() {
    String basePath =
        outputPath.endsWith(".shp")
            ? outputPath.substring(0, outputPath.length() - 4)
            : outputPath;
    for (String extension : SHAPEFILE_EXTENSIONS) {
      File file = new File(basePath + extension);
      if (file.exists() && file.delete()) {
        log.info("Deleted partial file: {}", file.getName());
      }
    }
  }

  /** Closes the transaction (uncommitted work is rolled back) and disposes the store. */
  private void cleanup() {
    if (transaction != null) {
      try {
        transaction.close();
      } catch (IOException e) {
        log.warn("Failed to close transaction", e);
      }
      transaction = null;
    }
    if (dataStore != null) {
      dataStore.dispose();
      dataStore = null;
    }
  }

  /**
   * Records progress counters as checkpoint data; called after each chunk.
   *
   * @param executionContext context to write checkpoint data into
   */
  @Override
  public void update(ExecutionContext executionContext) throws ItemStreamException {
    executionContext.putInt("chunkCount", chunkCount);
    executionContext.putInt("totalRecordCount", totalRecordCount);
  }
}

View File

@@ -7,6 +7,11 @@ import java.nio.file.Paths;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.DefaultApplicationArguments;
@@ -20,14 +25,20 @@ public class ConverterCommandLineRunner implements CommandLineRunner {
private final ShapefileConverterService converterService;
private final GeoServerRegistrationService geoServerService;
private final ConverterProperties converterProperties;
private final JobLauncher jobLauncher;
private final Job mergedModeJob;
public ConverterCommandLineRunner(
ShapefileConverterService converterService,
GeoServerRegistrationService geoServerService,
ConverterProperties converterProperties) {
ConverterProperties converterProperties,
JobLauncher jobLauncher,
Job mergedModeJob) {
this.converterService = converterService;
this.geoServerService = geoServerService;
this.converterProperties = converterProperties;
this.jobLauncher = jobLauncher;
this.mergedModeJob = mergedModeJob;
}
@Override
@@ -37,13 +48,21 @@ public class ConverterCommandLineRunner implements CommandLineRunner {
List<String> profiles = appArgs.getOptionValues("spring.profiles.active");
log.info("profiles.active={}", profiles);
// GeoServer 등록 모드
if (appArgs.containsOption("upload-shp")) {
handleRegistration(appArgs);
return;
}
// Existing shapefile generation logic
log.info("=== PostgreSQL to Shapefile Converter ===");
// Batch 모드 체크
if (appArgs.containsOption("batch") || appArgs.containsOption("use-batch")) {
handleBatchMode(appArgs);
return;
}
// 기존 로직 (Legacy mode) - 향후 deprecated 예정
log.warn("Running in LEGACY mode. Consider using --batch flag for better performance.");
log.info("=== PostgreSQL to Shapefile Converter (Legacy) ===");
log.info("Inference ID: {}", converterProperties.getInferenceId());
List<String> mapIds = converterProperties.getMapIds();
@@ -67,6 +86,103 @@ public class ConverterCommandLineRunner implements CommandLineRunner {
}
}
/**
 * Runs the converter as a Spring Batch job.
 *
 * <p>Provides memory-optimized, step-by-step execution.
 *
 * @param appArgs ApplicationArguments
 * @throws Exception if the job launcher fails
 */
private void handleBatchMode(ApplicationArguments appArgs) throws Exception {
    // Announce the effective configuration before launching.
    log.info("=== Spring Batch Mode: Shapefile Converter ===");
    log.info("Inference ID: {}", converterProperties.getInferenceId());
    log.info("Batch IDs: {}", converterProperties.getBatchIds());
    log.info("Output directory: {}", converterProperties.getOutputBaseDir());
    log.info("CRS: {}", converterProperties.getCrs());
    log.info("Chunk size: {}", converterProperties.getBatch().getChunkSize());
    log.info("==============================================");

    JobParameters parameters = buildJobParameters(appArgs);
    JobExecution jobExecution = jobLauncher.run(mergedModeJob, parameters);

    log.info("==============================================");
    log.info("Job Execution Status: {}", jobExecution.getStatus());
    log.info("Exit Status: {}", jobExecution.getExitStatus());

    // Exit with a non-zero code so callers (scheduler/CI) can detect failure.
    if (jobExecution.getStatus().isUnsuccessful()) {
        log.error("Job execution failed");
        System.exit(1);
    }
    log.info("Job completed successfully");
}
/**
 * Assembles the JobParameters for the merged-mode job.
 *
 * @param appArgs ApplicationArguments
 * @return JobParameters
 */
private JobParameters buildJobParameters(ApplicationArguments appArgs) {
    String inferenceId = converterProperties.getInferenceId();

    // Output layout: <base>/<inferenceId>/merge/<inferenceId>.{shp,geojson}
    // (both Shapefile and GeoJSON are produced; only the Shapefile is
    // registered with GeoServer)
    String outputDir =
        Paths.get(converterProperties.getOutputBaseDir(), inferenceId, "merge").toString();
    String shapefilePath = Paths.get(outputDir, inferenceId + ".shp").toString();
    String geoJsonPath = Paths.get(outputDir, inferenceId + ".geojson").toString();

    // GeoServer registration is opt-in via --geoserver.enabled=true
    boolean geoServerEnabled =
        appArgs.containsOption("geoserver.enabled")
            && Boolean.parseBoolean(firstOption(appArgs, "geoserver.enabled"));

    return new JobParametersBuilder()
        .addString("inferenceId", inferenceId)
        .addString("batchIds", joinBatchIds(converterProperties.getBatchIds()))
        .addString("crs", converterProperties.getCrs())
        .addLong("timestamp", System.currentTimeMillis()) // launch time, keeps job instances unique
        .addString("outputPath", shapefilePath)
        .addString("geoJsonOutputPath", geoJsonPath)
        .addString("zipBaseName", inferenceId)
        .addString("layerName", "inference_" + inferenceId) // GeoServer layer name
        .addString("geoserver.enabled", String.valueOf(geoServerEnabled))
        .addLong("fetchSize", (long) converterProperties.getBatch().getFetchSize())
        .toJobParameters();
}
/**
 * Joins the batch IDs into a comma-separated string.
 *
 * @param batchIds List of batch IDs
 * @return string of the form "252,253,257"
 * @throws IllegalStateException when no batch IDs are configured
 */
private String joinBatchIds(List<Long> batchIds) {
    if (batchIds == null || batchIds.isEmpty()) {
        throw new IllegalStateException("batch-ids must be specified");
    }
    StringBuilder joined = new StringBuilder();
    for (Long id : batchIds) {
        if (joined.length() > 0) {
            joined.append(',');
        }
        joined.append(id);
    }
    return joined.toString();
}
private void handleRegistration(ApplicationArguments appArgs) {
// --help
if (appArgs.containsOption("help") || appArgs.containsOption("h")) {

View File

@@ -14,6 +14,7 @@ public class ConverterProperties {
private String outputBaseDir;
private String crs;
private String mode;
private BatchConfig batch = new BatchConfig(); // Spring Batch 설정
public String getInferenceId() {
return inferenceId;
@@ -62,4 +63,61 @@ public class ConverterProperties {
public String getMode() {
return mode;
}
/** Returns the Spring Batch tuning options for the converter. */
public BatchConfig getBatch() {
return batch;
}
/** Replaces the Spring Batch tuning options (bound from configuration). */
public void setBatch(BatchConfig batch) {
this.batch = batch;
}
/** Spring Batch tuning options, bound from the {@code converter.batch} configuration block. */
public static class BatchConfig {

    /** Items per chunk transaction (defaults to 1000). */
    private int chunkSize = 1000;

    /** Maximum number of item failures that may be skipped per step. */
    private int skipLimit = 100;

    /** JDBC cursor fetch size used by the reader. */
    private int fetchSize = 1000;

    /** Whether the per-map_id partitioned flow is enabled. */
    private boolean enablePartitioning = false;

    /** Degree of parallelism for per-map_id processing. */
    private int partitionConcurrency = 4;

    public int getChunkSize() {
        return chunkSize;
    }

    public void setChunkSize(int chunkSize) {
        this.chunkSize = chunkSize;
    }

    public int getSkipLimit() {
        return skipLimit;
    }

    public void setSkipLimit(int skipLimit) {
        this.skipLimit = skipLimit;
    }

    public int getFetchSize() {
        return fetchSize;
    }

    public void setFetchSize(int fetchSize) {
        this.fetchSize = fetchSize;
    }

    public boolean isEnablePartitioning() {
        return enablePartitioning;
    }

    public void setEnablePartitioning(boolean enablePartitioning) {
        this.enablePartitioning = enablePartitioning;
    }

    public int getPartitionConcurrency() {
        return partitionConcurrency;
    }

    public void setPartitionConcurrency(int partitionConcurrency) {
        this.partitionConcurrency = partitionConcurrency;
    }
}
}

View File

@@ -1,7 +1,6 @@
package com.kamco.makesample.config;
import jakarta.validation.constraints.NotBlank;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;
@@ -12,11 +11,9 @@ import org.springframework.validation.annotation.Validated;
public class GeoServerProperties {
@NotBlank(message = "GeoServer base URL must be configured")
@Value("${layer.geoserver-url}")
private String baseUrl;
@NotBlank(message = "GeoServer workspace must be configured")
@Value("${layer.workspace}")
private String workspace;
@NotBlank(message = "GeoServer datastore must be configured")

View File

@@ -2,12 +2,25 @@ package com.kamco.makesample.service;
import com.kamco.makesample.exception.GeometryConversionException;
import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.MultiPolygon;
import org.locationtech.jts.geom.Polygon;
import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.WKTReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
* WKT ↔ JTS Geometry 변환 서비스
*
* <p>주요 기능:
*
* <ul>
* <li>PostGIS WKT 문자열을 JTS Geometry 객체로 변환
* <li>MultiPolygon을 자동으로 Polygon으로 변환 (첫 번째 polygon 추출)
* <li>Geometry 유효성 검증
* </ul>
*/
@Component
public class GeometryConverter {
@@ -15,27 +28,107 @@ public class GeometryConverter {
private final WKTReader wktReader;
// MultiPolygon → Polygon 변환 통계
private static int multiPolygonConversionCount = 0;
/** Creates the converter with a default JTS WKT reader. */
public GeometryConverter() {
this.wktReader = new WKTReader();
}
/**
 * Converts a WKT string into a JTS Geometry.
 *
 * <p>Conversion rules:
 *
 * <ul>
 *   <li>MultiPolygon → only the first Polygon is extracted (automatic conversion)
 *   <li>Polygon → used as-is
 *   <li>other types → used as-is
 * </ul>
 *
 * @param wkt result of PostGIS ST_AsText() (WKT format); may be null or blank
 * @return JTS Geometry (a MultiPolygon is collapsed to its first Polygon), or null
 *     for blank input or an empty MultiPolygon
 * @throws GeometryConversionException if the WKT cannot be parsed
 */
public Geometry convertWKTToJTS(String wkt) {
    if (wkt == null || wkt.trim().isEmpty()) {
        return null;
    }
    try {
        // 1. Parse the WKT string into a JTS Geometry
        Geometry jtsGeometry = wktReader.read(wkt);

        // 2. MultiPolygon → Polygon auto-conversion.
        // A shapefile layer allows a single geometry type, so MultiPolygons are
        // collapsed to their first polygon; remaining polygons are discarded.
        if (jtsGeometry instanceof MultiPolygon) {
            MultiPolygon multiPolygon = (MultiPolygon) jtsGeometry;
            if (multiPolygon.getNumGeometries() == 0) {
                log.warn("Empty MultiPolygon detected (0 geometries). Returning null.");
                return null;
            }
            Polygon firstPolygon = (Polygon) multiPolygon.getGeometryN(0);

            // NOTE(review): the static counter is incremented without synchronization;
            // with partitioned (multi-threaded) steps the count may drift — consider
            // an AtomicInteger field.
            multiPolygonConversionCount++;

            // Log the first 10 conversions only, to keep logs bounded.
            if (multiPolygonConversionCount <= 10) {
                log.debug(
                    "Converting MultiPolygon to Polygon (first geometry only). "
                        + "MultiPolygon had {} geometries. Conversion count: {}",
                    multiPolygon.getNumGeometries(),
                    multiPolygonConversionCount);
            } else if (multiPolygonConversionCount == 11) {
                log.debug("MultiPolygon → Polygon conversion ongoing... (suppressing further logs)");
            }

            // Warn when data is actually discarded.
            if (multiPolygon.getNumGeometries() > 1) {
                log.warn(
                    "MultiPolygon contains {} polygons. Only the first polygon will be used. "
                        + "Other {} polygon(s) will be discarded.",
                    multiPolygon.getNumGeometries(),
                    multiPolygon.getNumGeometries() - 1);
            }
            jtsGeometry = firstPolygon;
        }

        // 3. Validity check. Previously this logged twice — once dumping the whole
        // geometry WKT, once with an isValid() ternary that was always "invalid"
        // inside this branch. A single bounded warn replaces both.
        if (!jtsGeometry.isValid()) {
            log.warn("Invalid geometry detected: type={}", jtsGeometry.getGeometryType());
        }
        return jtsGeometry;
    } catch (ParseException e) {
        throw new GeometryConversionException(
            "Failed to convert WKT to JTS geometry: " + e.getMessage(), e);
    }
}
/**
 * Returns the MultiPolygon → Polygon conversion statistics.
 *
 * @return number of MultiPolygons converted so far (process-wide static counter)
 */
public static int getMultiPolygonConversionCount() {
return multiPolygonConversionCount;
}
/** Resets the conversion counter (intended for tests). */
public static void resetConversionCount() {
multiPolygonConversionCount = 0;
}
}

View File

@@ -10,12 +10,6 @@ spring:
idle-timeout: 600000
max-lifetime: 1800000
application:
name: make-shapefile-service
main:
web-application-type: none # Disable web server for CLI application
converter:
inference-id: D5E46F60FC40B1A8BE0CD1F3547AA6
# Optional: omit or set empty to create merged shapefile for all batch-ids
@@ -45,7 +39,3 @@ logging:
org.springframework: WARN
pattern:
console: '%d{yyyy-MM-dd HH:mm:ss} - %msg%n'
layer:
geoserver-url: http://label-tile.gs.dabeeo.com
workspace: cd

View File

@@ -5,16 +5,17 @@ spring:
# NOTE(review): plaintext database password committed to source control — move it
# to an environment variable or secret store and rotate this credential.
password: kamco_cds_Q!W@E#R$
driver-class-name: org.postgresql.Driver
hikari:
maximum-pool-size: 5
maximum-pool-size: 10 # Increased for batch processing
connection-timeout: 30000
idle-timeout: 600000
max-lifetime: 1800000
application:
name: make-shapefile-service
main:
web-application-type: none # Disable web server for CLI application
batch:
job:
enabled: false # launched explicitly from the CLI, not on startup
jdbc:
initialize-schema: always # auto-create the BATCH_* metadata tables
table-prefix: BATCH_
converter:
inference-id: D5E46F60FC40B1A8BE0CD1F3547AA6
@@ -27,15 +28,19 @@ converter:
output-base-dir: '/data/model_output/export/'
crs: 'EPSG:5186'
batch:
chunk-size: 1000 # 청크 크기 (메모리 ~40MB per chunk)
skip-limit: 100 # 청크당 skip 허용 건수
fetch-size: 1000 # JDBC 커서 fetch 크기
enable-partitioning: false # 초기에는 비활성화
partition-concurrency: 4 # Map ID별 병렬 처리 동시성 (4=~300MB, 8=~600MB)
geoserver:
base-url: 'https://kamco.geo-dev.gs.dabeeo.com/geoserver'
base-url: 'https://aicd-geo.e-kamco.com:18080/geoserver'
workspace: 'cd'
overwrite-existing: true
connection-timeout: 30000
read-timeout: 60000
# Credentials (optional - environment variables take precedence)
# NOTE(review): default admin credentials are hard-coded below for development
# convenience; for production, set the GEOSERVER_USERNAME and GEOSERVER_PASSWORD
# environment variables and remove these values
username: 'admin'
password: 'geoserver'
@@ -45,9 +50,3 @@ logging:
org.springframework: WARN
pattern:
console: '%d{yyyy-MM-dd HH:mm:ss} - %msg%n'
layer:
geoserver-url: https://kamco.geo-dev.gs.dabeeo.com
wms-path: geoserver/cd
wmts-path: geoserver/cd/gwc/service
workspace: cd

View File

@@ -3,3 +3,5 @@ spring:
name: make-shapefile-service
profiles:
active: prod
main:
web-application-type: none # Disable web server for CLI application

View File

@@ -0,0 +1,69 @@
-- Batch execution history table.
-- Tracks, per step: start/end time, elapsed duration, success status, and error cause.
-- Supplements the standard Spring Batch BATCH_* metadata tables with a flat,
-- query-friendly audit record written by the application.
CREATE TABLE IF NOT EXISTS batch_execution_history (
id BIGSERIAL PRIMARY KEY,
-- References into Spring Batch metadata (BATCH_JOB_EXECUTION / BATCH_STEP_EXECUTION ids).
-- NOTE(review): no FK constraints here — presumably intentional so history survives
-- Spring Batch metadata cleanup; confirm.
job_execution_id BIGINT NOT NULL,
step_execution_id BIGINT,
-- Step identification
step_name VARCHAR(100) NOT NULL,
step_type VARCHAR(50), -- 'TASKLET' or 'CHUNK'
-- Timing
start_time TIMESTAMP NOT NULL,
end_time TIMESTAMP,
duration_ms BIGINT, -- elapsed time in milliseconds
-- Execution result
status VARCHAR(20) NOT NULL, -- 'STARTED', 'COMPLETED', 'FAILED'
exit_code VARCHAR(20), -- 'COMPLETED', 'FAILED', 'UNKNOWN'
exit_message TEXT,
-- Error details (populated only on failure)
error_message TEXT,
error_stack_trace TEXT,
-- Processing statistics (meaningful for chunk-oriented steps; default 0 for tasklets)
read_count BIGINT DEFAULT 0,
write_count BIGINT DEFAULT 0,
commit_count BIGINT DEFAULT 0,
rollback_count BIGINT DEFAULT 0,
skip_count BIGINT DEFAULT 0,
-- Job parameters captured for this run
batch_ids TEXT,
inference_id VARCHAR(100),
-- Row metadata.
-- NOTE(review): no ON UPDATE trigger is defined in this script, so updated_at is
-- presumably maintained by application code — confirm.
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Indexes for the common lookup paths: by job/step execution id, by step name,
-- by status, and recent-first by start time.
CREATE INDEX IF NOT EXISTS idx_batch_exec_hist_job_exec_id
ON batch_execution_history(job_execution_id);
CREATE INDEX IF NOT EXISTS idx_batch_exec_hist_step_exec_id
ON batch_execution_history(step_execution_id);
CREATE INDEX IF NOT EXISTS idx_batch_exec_hist_step_name
ON batch_execution_history(step_name);
CREATE INDEX IF NOT EXISTS idx_batch_exec_hist_status
ON batch_execution_history(status);
CREATE INDEX IF NOT EXISTS idx_batch_exec_hist_start_time
ON batch_execution_history(start_time DESC);
-- Catalog comments (stored in the database; the Korean text below is runtime data
-- shipped with the schema and is intentionally left unchanged).
COMMENT ON TABLE batch_execution_history IS '배치 실행 이력 추적 테이블 - 스텝별 시작/종료 시간, 소요 시간, 성공 여부, 에러 사유 기록';
COMMENT ON COLUMN batch_execution_history.job_execution_id IS 'Spring Batch Job Execution ID';
COMMENT ON COLUMN batch_execution_history.step_execution_id IS 'Spring Batch Step Execution ID';
COMMENT ON COLUMN batch_execution_history.step_name IS '스텝 이름 (validateGeometryType, generateShapefile 등)';
COMMENT ON COLUMN batch_execution_history.duration_ms IS '스텝 소요 시간 (밀리초)';
COMMENT ON COLUMN batch_execution_history.status IS '실행 상태 (STARTED, COMPLETED, FAILED)';
COMMENT ON COLUMN batch_execution_history.error_message IS '에러 발생 시 에러 메시지';
COMMENT ON COLUMN batch_execution_history.error_stack_trace IS '에러 발생 시 전체 스택 트레이스';