commit
This commit is contained in:
@@ -25,7 +25,7 @@ import org.springframework.batch.item.support.CompositeItemWriter;
|
|||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
import org.springframework.core.task.TaskExecutor;
|
import org.springframework.core.task.SyncTaskExecutor;
|
||||||
import org.springframework.transaction.PlatformTransactionManager;
|
import org.springframework.transaction.PlatformTransactionManager;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -224,27 +224,25 @@ public class MergedModeJobConfig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Step 6: Map ID별 개별 파일 생성 (Partitioned Step)
|
* Step 6: Map ID별 개별 파일 생성 (Partitioned Step - Sequential)
|
||||||
*
|
*
|
||||||
* <p>각 map_id마다 개별 shapefile과 geojson 파일을 병렬로 생성합니다.
|
* <p>각 map_id마다 개별 shapefile과 geojson 파일을 순차적으로 생성합니다. SyncTaskExecutor를 명시적으로 지정하여 병렬 실행을 방지하고
|
||||||
|
* DB connection pool 고갈 방지
|
||||||
*
|
*
|
||||||
* @param jobRepository JobRepository
|
* @param jobRepository JobRepository
|
||||||
* @param partitioner MapIdPartitioner
|
* @param partitioner MapIdPartitioner
|
||||||
* @param mapIdWorkerStep Worker Step (각 파티션에서 실행)
|
* @param mapIdWorkerStep Worker Step (각 파티션에서 실행)
|
||||||
* @param mapIdTaskExecutor TaskExecutor for parallel execution
|
|
||||||
* @return Partitioned Step
|
* @return Partitioned Step
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
public Step generateMapIdFilesStep(
|
public Step generateMapIdFilesStep(
|
||||||
JobRepository jobRepository,
|
JobRepository jobRepository, MapIdPartitioner partitioner, Step mapIdWorkerStep) {
|
||||||
MapIdPartitioner partitioner,
|
|
||||||
Step mapIdWorkerStep,
|
|
||||||
TaskExecutor mapIdTaskExecutor) {
|
|
||||||
|
|
||||||
return new StepBuilder("generateMapIdFilesStep", jobRepository)
|
return new StepBuilder("generateMapIdFilesStep", jobRepository)
|
||||||
.partitioner("mapIdWorker", partitioner)
|
.partitioner("mapIdWorker", partitioner)
|
||||||
.step(mapIdWorkerStep)
|
.step(mapIdWorkerStep)
|
||||||
.taskExecutor(mapIdTaskExecutor)
|
.taskExecutor(new SyncTaskExecutor()) // 명시적으로 순차 실행 지정
|
||||||
|
.listener(partitioner) // Register partitioner as StepExecutionListener
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -279,11 +277,13 @@ public class MergedModeJobConfig {
|
|||||||
compositeWriter.setDelegates(Arrays.asList(mapIdShapefileWriter, mapIdGeoJsonWriter));
|
compositeWriter.setDelegates(Arrays.asList(mapIdShapefileWriter, mapIdGeoJsonWriter));
|
||||||
|
|
||||||
return new StepBuilder("mapIdWorkerStep", jobRepository)
|
return new StepBuilder("mapIdWorkerStep", jobRepository)
|
||||||
.<InferenceResult, SimpleFeature>chunk(chunkSize, transactionManager)
|
.<InferenceResult, SimpleFeature>chunk(chunkSize, transactionManager)
|
||||||
.reader(mapIdModeReader)
|
.reader(mapIdModeReader)
|
||||||
.processor(featureConversionProcessor)
|
.processor(featureConversionProcessor)
|
||||||
.writer(compositeWriter)
|
.writer(compositeWriter)
|
||||||
.listener(historyListener)
|
.stream(mapIdShapefileWriter)
|
||||||
.build();
|
.stream(mapIdGeoJsonWriter)
|
||||||
|
.listener(historyListener)
|
||||||
|
.build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,6 +8,11 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.batch.core.ExitStatus;
|
||||||
|
import org.springframework.batch.core.StepExecution;
|
||||||
|
import org.springframework.batch.core.StepExecutionListener;
|
||||||
|
import org.springframework.batch.core.annotation.AfterStep;
|
||||||
|
import org.springframework.batch.core.annotation.BeforeStep;
|
||||||
import org.springframework.batch.core.partition.support.Partitioner;
|
import org.springframework.batch.core.partition.support.Partitioner;
|
||||||
import org.springframework.batch.item.ExecutionContext;
|
import org.springframework.batch.item.ExecutionContext;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
@@ -19,18 +24,49 @@ import org.springframework.stereotype.Component;
|
|||||||
* geojson을 생성합니다.
|
* geojson을 생성합니다.
|
||||||
*/
|
*/
|
||||||
@Component
|
@Component
|
||||||
public class MapIdPartitioner implements Partitioner {
|
public class MapIdPartitioner implements Partitioner, StepExecutionListener {
|
||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(MapIdPartitioner.class);
|
private static final Logger log = LoggerFactory.getLogger(MapIdPartitioner.class);
|
||||||
|
|
||||||
private final InferenceResultRepository repository;
|
private final InferenceResultRepository repository;
|
||||||
private final ConverterProperties properties;
|
private final ConverterProperties properties;
|
||||||
|
|
||||||
|
private String geometryType; // Populated in @BeforeStep, used in partition()
|
||||||
|
|
||||||
public MapIdPartitioner(InferenceResultRepository repository, ConverterProperties properties) {
|
public MapIdPartitioner(InferenceResultRepository repository, ConverterProperties properties) {
|
||||||
this.repository = repository;
|
this.repository = repository;
|
||||||
this.properties = properties;
|
this.properties = properties;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@BeforeStep
|
||||||
|
public void beforeStep(StepExecution stepExecution) {
|
||||||
|
log.info("MapIdPartitioner.beforeStep() - retrieving geometryType from job context");
|
||||||
|
|
||||||
|
ExecutionContext jobExecutionContext = stepExecution.getJobExecution().getExecutionContext();
|
||||||
|
|
||||||
|
if (jobExecutionContext.containsKey("geometryType")) {
|
||||||
|
this.geometryType = jobExecutionContext.getString("geometryType");
|
||||||
|
log.info("Retrieved geometryType from job context: {}", this.geometryType);
|
||||||
|
} else {
|
||||||
|
String errorMsg =
|
||||||
|
"geometryType not found in job execution context. "
|
||||||
|
+ "GeometryTypeValidationTasklet must run before partitioning.";
|
||||||
|
log.error(errorMsg);
|
||||||
|
throw new IllegalStateException(errorMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.geometryType == null || this.geometryType.isEmpty()) {
|
||||||
|
throw new IllegalStateException("geometryType is null or empty in job context");
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info("MapIdPartitioner ready with geometryType: {}", this.geometryType);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterStep
|
||||||
|
public ExitStatus afterStep(StepExecution stepExecution) {
|
||||||
|
return ExitStatus.COMPLETED;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, ExecutionContext> partition(int gridSize) {
|
public Map<String, ExecutionContext> partition(int gridSize) {
|
||||||
List<Long> batchIds = properties.getBatchIds();
|
List<Long> batchIds = properties.getBatchIds();
|
||||||
@@ -60,16 +96,20 @@ public class MapIdPartitioner implements Partitioner {
|
|||||||
context.putString("outputPath", buildShapefilePath(outputBaseDir, inferenceId, mapId));
|
context.putString("outputPath", buildShapefilePath(outputBaseDir, inferenceId, mapId));
|
||||||
context.putString("geoJsonOutputPath", buildGeoJsonPath(outputBaseDir, inferenceId, mapId));
|
context.putString("geoJsonOutputPath", buildGeoJsonPath(outputBaseDir, inferenceId, mapId));
|
||||||
|
|
||||||
|
// Propagate geometryType to partition context
|
||||||
|
context.putString("geometryType", this.geometryType);
|
||||||
|
|
||||||
partitions.put("partition-" + mapId, context);
|
partitions.put("partition-" + mapId, context);
|
||||||
|
|
||||||
log.debug(
|
log.debug(
|
||||||
"Created partition for map_id: {}, shapefile: {}, geojson: {}",
|
"Created partition for map_id: {}, shapefile: {}, geojson: {}, geometryType: {}",
|
||||||
mapId,
|
mapId,
|
||||||
context.getString("outputPath"),
|
context.getString("outputPath"),
|
||||||
context.getString("geoJsonOutputPath"));
|
context.getString("geoJsonOutputPath"),
|
||||||
|
context.getString("geometryType"));
|
||||||
}
|
}
|
||||||
|
|
||||||
log.info("Created {} partitions for parallel processing", partitions.size());
|
log.info("Created {} partitions with geometryType: {}", partitions.size(), this.geometryType);
|
||||||
|
|
||||||
return partitions;
|
return partitions;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ import org.slf4j.LoggerFactory;
|
|||||||
import org.springframework.batch.core.StepExecution;
|
import org.springframework.batch.core.StepExecution;
|
||||||
import org.springframework.batch.core.annotation.BeforeStep;
|
import org.springframework.batch.core.annotation.BeforeStep;
|
||||||
import org.springframework.batch.core.configuration.annotation.StepScope;
|
import org.springframework.batch.core.configuration.annotation.StepScope;
|
||||||
|
import org.springframework.batch.item.ExecutionContext;
|
||||||
import org.springframework.batch.item.ItemProcessor;
|
import org.springframework.batch.item.ItemProcessor;
|
||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
@@ -51,9 +52,19 @@ public class FeatureConversionProcessor implements ItemProcessor<InferenceResult
|
|||||||
|
|
||||||
@BeforeStep
|
@BeforeStep
|
||||||
public void beforeStep(StepExecution stepExecution) {
|
public void beforeStep(StepExecution stepExecution) {
|
||||||
// StepExecutionContext에서 geometry type 읽기
|
// Job ExecutionContext에서 geometry type 읽기 (이전 Step에서 설정한 값)
|
||||||
String geomTypeStr = stepExecution.getExecutionContext().getString("geometryType");
|
ExecutionContext jobExecutionContext = stepExecution.getJobExecution().getExecutionContext();
|
||||||
Class<?> geometryType = featureTypeFactory.parseGeometryType(geomTypeStr);
|
String geomTypeStr = null;
|
||||||
|
Class<?> geometryType;
|
||||||
|
|
||||||
|
// geometryType이 설정되어 있는지 확인 (빈 데이터셋인 경우 설정되지 않을 수 있음)
|
||||||
|
if (jobExecutionContext.containsKey("geometryType")) {
|
||||||
|
geomTypeStr = jobExecutionContext.getString("geometryType");
|
||||||
|
} else {
|
||||||
|
log.warn("geometryType not set in Job ExecutionContext (empty dataset). Using default");
|
||||||
|
}
|
||||||
|
|
||||||
|
geometryType = featureTypeFactory.parseGeometryType(geomTypeStr);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// CRS 설정
|
// CRS 설정
|
||||||
|
|||||||
@@ -197,9 +197,11 @@ public class GeometryTypeValidationTasklet implements Tasklet {
|
|||||||
log.info("========================================");
|
log.info("========================================");
|
||||||
|
|
||||||
// Polygon을 기본 타입으로 설정 (자동 변환 후 모든 geometry가 Polygon이 됨)
|
// Polygon을 기본 타입으로 설정 (자동 변환 후 모든 geometry가 Polygon이 됨)
|
||||||
|
// Job ExecutionContext에 저장 (다음 Step에서 읽을 수 있도록)
|
||||||
chunkContext
|
chunkContext
|
||||||
.getStepContext()
|
.getStepContext()
|
||||||
.getStepExecution()
|
.getStepExecution()
|
||||||
|
.getJobExecution()
|
||||||
.getExecutionContext()
|
.getExecutionContext()
|
||||||
.putString("geometryType", "ST_Polygon");
|
.putString("geometryType", "ST_Polygon");
|
||||||
|
|
||||||
@@ -237,6 +239,18 @@ public class GeometryTypeValidationTasklet implements Tasklet {
|
|||||||
log.warn("========================================");
|
log.warn("========================================");
|
||||||
log.warn("Proceeding with empty dataset (no files will be generated)");
|
log.warn("Proceeding with empty dataset (no files will be generated)");
|
||||||
|
|
||||||
|
// 빈 데이터셋이지만 Writer 초기화를 위해 기본 geometry type 설정
|
||||||
|
// ST_Polygon을 기본값으로 사용 (이 프로젝트의 주요 geometry type)
|
||||||
|
// Job ExecutionContext에 저장 (다음 Step에서 읽을 수 있도록)
|
||||||
|
chunkContext
|
||||||
|
.getStepContext()
|
||||||
|
.getStepExecution()
|
||||||
|
.getJobExecution()
|
||||||
|
.getExecutionContext()
|
||||||
|
.putString("geometryType", "ST_Polygon");
|
||||||
|
|
||||||
|
log.info("Set default geometry type to ST_Polygon for empty dataset");
|
||||||
|
|
||||||
// 빈 데이터셋이지만 Step은 성공으로 처리
|
// 빈 데이터셋이지만 Step은 성공으로 처리
|
||||||
// 다음 Step(Reader)에서 읽을 데이터가 없으므로 자연스럽게 종료됨
|
// 다음 Step(Reader)에서 읽을 데이터가 없으므로 자연스럽게 종료됨
|
||||||
return RepeatStatus.FINISHED;
|
return RepeatStatus.FINISHED;
|
||||||
@@ -264,9 +278,11 @@ public class GeometryTypeValidationTasklet implements Tasklet {
|
|||||||
geometryType = "ST_Polygon";
|
geometryType = "ST_Polygon";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Job ExecutionContext에 저장 (다음 Step에서 읽을 수 있도록)
|
||||||
chunkContext
|
chunkContext
|
||||||
.getStepContext()
|
.getStepContext()
|
||||||
.getStepExecution()
|
.getStepExecution()
|
||||||
|
.getJobExecution()
|
||||||
.getExecutionContext()
|
.getExecutionContext()
|
||||||
.putString("geometryType", geometryType);
|
.putString("geometryType", geometryType);
|
||||||
|
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package com.kamco.makesample.batch.writer;
|
|||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.StringWriter;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
@@ -20,6 +21,7 @@ import org.springframework.batch.item.Chunk;
|
|||||||
import org.springframework.batch.item.ExecutionContext;
|
import org.springframework.batch.item.ExecutionContext;
|
||||||
import org.springframework.batch.item.ItemStreamException;
|
import org.springframework.batch.item.ItemStreamException;
|
||||||
import org.springframework.batch.item.ItemStreamWriter;
|
import org.springframework.batch.item.ItemStreamWriter;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -36,9 +38,12 @@ public class MapIdGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
|
|
||||||
private static final Logger log = LoggerFactory.getLogger(MapIdGeoJsonWriter.class);
|
private static final Logger log = LoggerFactory.getLogger(MapIdGeoJsonWriter.class);
|
||||||
|
|
||||||
private String outputPath;
|
@Value("#{stepExecutionContext['mapId']}")
|
||||||
private String mapId;
|
private String mapId;
|
||||||
|
|
||||||
|
@Value("#{stepExecutionContext['geoJsonOutputPath']}")
|
||||||
|
private String outputPath;
|
||||||
|
|
||||||
private FileOutputStream outputStream;
|
private FileOutputStream outputStream;
|
||||||
private FeatureJSON featureJSON;
|
private FeatureJSON featureJSON;
|
||||||
|
|
||||||
@@ -48,10 +53,21 @@ public class MapIdGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
|
|
||||||
@BeforeStep
|
@BeforeStep
|
||||||
public void beforeStep(StepExecution stepExecution) {
|
public void beforeStep(StepExecution stepExecution) {
|
||||||
// StepExecutionContext에서 partition별 파라미터 읽기
|
log.info(
|
||||||
|
"MapIdGeoJsonWriter injected values: mapId='{}', outputPath='{}'",
|
||||||
|
this.mapId,
|
||||||
|
this.outputPath);
|
||||||
|
|
||||||
|
// @Value로 주입된 값 검증
|
||||||
ExecutionContext executionContext = stepExecution.getExecutionContext();
|
ExecutionContext executionContext = stepExecution.getExecutionContext();
|
||||||
this.mapId = executionContext.getString("mapId");
|
|
||||||
this.outputPath = executionContext.getString("geoJsonOutputPath");
|
if (this.mapId == null || this.outputPath == null) {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
String.format(
|
||||||
|
"MapIdGeoJsonWriter requires non-null 'mapId' and 'geoJsonOutputPath' from @Value injection. "
|
||||||
|
+ "Got mapId='%s', geoJsonOutputPath='%s'. Available keys in ExecutionContext: %s",
|
||||||
|
this.mapId, this.outputPath, executionContext.entrySet()));
|
||||||
|
}
|
||||||
|
|
||||||
log.info("MapIdGeoJsonWriter initialized for map_id: {}, output: {}", mapId, outputPath);
|
log.info("MapIdGeoJsonWriter initialized for map_id: {}, output: {}", mapId, outputPath);
|
||||||
|
|
||||||
@@ -114,7 +130,10 @@ public class MapIdGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Feature를 GeoJSON으로 직렬화
|
// Feature를 GeoJSON으로 직렬화
|
||||||
featureJSON.writeFeature(feature, outputStream);
|
// StringWriter를 사용하여 FeatureJSON이 스트림을 닫지 못하도록 방지
|
||||||
|
StringWriter stringWriter = new StringWriter();
|
||||||
|
featureJSON.writeFeature(feature, stringWriter);
|
||||||
|
outputStream.write(stringWriter.toString().getBytes());
|
||||||
}
|
}
|
||||||
|
|
||||||
isFirstChunk = false;
|
isFirstChunk = false;
|
||||||
|
|||||||
@@ -54,9 +54,15 @@ public class MapIdShapefileWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
@Value("#{jobParameters['crs'] ?: 'EPSG:5186'}")
|
@Value("#{jobParameters['crs'] ?: 'EPSG:5186'}")
|
||||||
private String crsCode;
|
private String crsCode;
|
||||||
|
|
||||||
private String outputPath;
|
@Value("#{stepExecutionContext['mapId']}")
|
||||||
private String mapId;
|
private String mapId;
|
||||||
|
|
||||||
|
@Value("#{stepExecutionContext['outputPath']}")
|
||||||
|
private String outputPath;
|
||||||
|
|
||||||
|
@Value("#{stepExecutionContext['geometryType']}")
|
||||||
|
private String geometryTypeStr;
|
||||||
|
|
||||||
private ShapefileDataStore dataStore;
|
private ShapefileDataStore dataStore;
|
||||||
private Transaction transaction;
|
private Transaction transaction;
|
||||||
private SimpleFeatureStore featureStore;
|
private SimpleFeatureStore featureStore;
|
||||||
@@ -73,21 +79,51 @@ public class MapIdShapefileWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
|
|
||||||
@BeforeStep
|
@BeforeStep
|
||||||
public void beforeStep(StepExecution stepExecution) {
|
public void beforeStep(StepExecution stepExecution) {
|
||||||
// StepExecutionContext에서 partition별 파라미터 읽기
|
log.info("===== MapIdShapefileWriter.beforeStep() START =====");
|
||||||
ExecutionContext executionContext = stepExecution.getExecutionContext();
|
log.info(
|
||||||
this.mapId = executionContext.getString("mapId");
|
"Injected values: mapId='{}', outputPath='{}', geometryTypeStr='{}'",
|
||||||
this.outputPath = executionContext.getString("outputPath");
|
this.mapId,
|
||||||
|
this.outputPath,
|
||||||
|
this.geometryTypeStr);
|
||||||
|
|
||||||
log.info("MapIdShapefileWriter initialized for map_id: {}, output: {}", mapId, outputPath);
|
ExecutionContext stepExecutionContext = stepExecution.getExecutionContext();
|
||||||
|
|
||||||
// Geometry type 읽기 (validation tasklet에서 설정)
|
// Validate all required injections
|
||||||
String geomTypeStr = executionContext.getString("geometryType");
|
if (this.mapId == null || this.outputPath == null) {
|
||||||
if (geomTypeStr != null) {
|
String errorMsg =
|
||||||
this.geometryType = featureTypeFactory.parseGeometryType(geomTypeStr);
|
String.format(
|
||||||
log.debug("Geometry type from validation: {}", geometryType.getSimpleName());
|
"MapIdShapefileWriter requires non-null 'mapId' and 'outputPath'. "
|
||||||
|
+ "Got mapId='%s', outputPath='%s'. Available keys: %s",
|
||||||
|
this.mapId, this.outputPath, stepExecutionContext.entrySet());
|
||||||
|
log.error(errorMsg);
|
||||||
|
throw new IllegalStateException(errorMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 출력 디렉토리 생성
|
// Validate geometryTypeStr (fail fast)
|
||||||
|
if (this.geometryTypeStr == null || this.geometryTypeStr.isEmpty()) {
|
||||||
|
String errorMsg =
|
||||||
|
String.format(
|
||||||
|
"MapIdShapefileWriter requires non-null 'geometryType' from stepExecutionContext. "
|
||||||
|
+ "Got geometryTypeStr='%s'. Should be propagated by MapIdPartitioner. "
|
||||||
|
+ "Available keys: %s",
|
||||||
|
this.geometryTypeStr, stepExecutionContext.entrySet());
|
||||||
|
log.error(errorMsg);
|
||||||
|
throw new IllegalStateException(errorMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
log.info(
|
||||||
|
"MapIdShapefileWriter initialized for map_id: {}, output: {}, geometryType: {}",
|
||||||
|
mapId,
|
||||||
|
outputPath,
|
||||||
|
geometryTypeStr);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void open(ExecutionContext executionContext) throws ItemStreamException {
|
||||||
|
log.info("Opening shapefile writer for map_id: {}", mapId);
|
||||||
|
log.info("Using geometryTypeStr from stepExecutionContext: {}", geometryTypeStr);
|
||||||
|
|
||||||
|
// 출력 디렉토리 생성 (GeoTools가 파일을 만들기 전에 반드시 필요)
|
||||||
try {
|
try {
|
||||||
Path outputDir = Paths.get(outputPath).getParent();
|
Path outputDir = Paths.get(outputPath).getParent();
|
||||||
if (outputDir != null && !Files.exists(outputDir)) {
|
if (outputDir != null && !Files.exists(outputDir)) {
|
||||||
@@ -97,22 +133,19 @@ public class MapIdShapefileWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ItemStreamException("Failed to create output directory for map_id: " + mapId, e);
|
throw new ItemStreamException("Failed to create output directory for map_id: " + mapId, e);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void open(ExecutionContext executionContext) throws ItemStreamException {
|
|
||||||
log.info("Opening shapefile writer for map_id: {}", mapId);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
// Direct parsing (no fallback) - validated in beforeStep()
|
||||||
|
this.geometryType = featureTypeFactory.parseGeometryType(geometryTypeStr);
|
||||||
|
log.info(
|
||||||
|
"Parsed geometry type for map_id {}: {} (from: {})",
|
||||||
|
mapId,
|
||||||
|
geometryType.getSimpleName(),
|
||||||
|
geometryTypeStr);
|
||||||
|
|
||||||
// CRS 설정
|
// CRS 설정
|
||||||
CoordinateReferenceSystem crs = CRS.decode(crsCode);
|
CoordinateReferenceSystem crs = CRS.decode(crsCode);
|
||||||
|
|
||||||
// Geometry type 기본값 설정
|
|
||||||
if (geometryType == null) {
|
|
||||||
geometryType = featureTypeFactory.parseGeometryType(null);
|
|
||||||
log.warn("Geometry type not set for map_id {}, using default: Geometry.class", mapId);
|
|
||||||
}
|
|
||||||
|
|
||||||
// SimpleFeatureType 생성
|
// SimpleFeatureType 생성
|
||||||
featureType = featureTypeFactory.createFeatureType(crs, geometryType);
|
featureType = featureTypeFactory.createFeatureType(crs, geometryType);
|
||||||
|
|
||||||
@@ -135,7 +168,10 @@ public class MapIdShapefileWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
featureStore = (SimpleFeatureStore) dataStore.getFeatureSource(typeName);
|
featureStore = (SimpleFeatureStore) dataStore.getFeatureSource(typeName);
|
||||||
featureStore.setTransaction(transaction);
|
featureStore.setTransaction(transaction);
|
||||||
|
|
||||||
log.info("ShapefileDataStore initialized for map_id: {}", mapId);
|
log.info(
|
||||||
|
"ShapefileDataStore initialized for map_id: {} with geometry type: {}",
|
||||||
|
mapId,
|
||||||
|
geometryType.getSimpleName());
|
||||||
|
|
||||||
} catch (FactoryException e) {
|
} catch (FactoryException e) {
|
||||||
throw new ItemStreamException("Invalid CRS code: " + crsCode, e);
|
throw new ItemStreamException("Invalid CRS code: " + crsCode, e);
|
||||||
@@ -172,7 +208,16 @@ public class MapIdShapefileWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
@AfterStep
|
@AfterStep
|
||||||
public void afterStep() {
|
public void afterStep() {
|
||||||
log.info(
|
log.info(
|
||||||
"[map_id: {}] All chunks written. Committing {} records in {} chunks",
|
"[map_id: {}] AfterStep called. Total {} records in {} chunks",
|
||||||
|
mapId,
|
||||||
|
totalRecordCount,
|
||||||
|
chunkCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws ItemStreamException {
|
||||||
|
log.info(
|
||||||
|
"[map_id: {}] Closing shapefile writer. Committing {} records in {} chunks",
|
||||||
mapId,
|
mapId,
|
||||||
totalRecordCount,
|
totalRecordCount,
|
||||||
chunkCount);
|
chunkCount);
|
||||||
@@ -191,11 +236,6 @@ public class MapIdShapefileWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void close() throws ItemStreamException {
|
|
||||||
cleanup();
|
|
||||||
}
|
|
||||||
|
|
||||||
@OnWriteError
|
@OnWriteError
|
||||||
public void onError(Exception exception, Chunk<? extends SimpleFeature> chunk) {
|
public void onError(Exception exception, Chunk<? extends SimpleFeature> chunk) {
|
||||||
log.error(
|
log.error(
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package com.kamco.makesample.batch.writer;
|
|||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.StringWriter;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.Paths;
|
import java.nio.file.Paths;
|
||||||
@@ -111,7 +112,10 @@ public class StreamingGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Feature를 GeoJSON으로 직렬화
|
// Feature를 GeoJSON으로 직렬화
|
||||||
featureJSON.writeFeature(feature, outputStream);
|
// StringWriter를 사용하여 FeatureJSON이 스트림을 닫지 못하도록 방지
|
||||||
|
StringWriter stringWriter = new StringWriter();
|
||||||
|
featureJSON.writeFeature(feature, stringWriter);
|
||||||
|
outputStream.write(stringWriter.toString().getBytes());
|
||||||
}
|
}
|
||||||
|
|
||||||
isFirstChunk = false;
|
isFirstChunk = false;
|
||||||
|
|||||||
@@ -87,11 +87,19 @@ public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature>
|
|||||||
|
|
||||||
@BeforeStep
|
@BeforeStep
|
||||||
public void beforeStep(StepExecution stepExecution) {
|
public void beforeStep(StepExecution stepExecution) {
|
||||||
// StepExecutionContext에서 geometry type 읽기
|
// Job ExecutionContext에서 geometry type 읽기 (이전 Step에서 설정한 값)
|
||||||
String geomTypeStr = stepExecution.getExecutionContext().getString("geometryType");
|
ExecutionContext jobExecutionContext = stepExecution.getJobExecution().getExecutionContext();
|
||||||
|
String geomTypeStr = null;
|
||||||
|
|
||||||
this.geometryType = featureTypeFactory.parseGeometryType(geomTypeStr);
|
// geometryType이 설정되어 있는지 확인 (빈 데이터셋인 경우 설정되지 않을 수 있음)
|
||||||
log.info("Geometry type from validation: {}", geometryType.getSimpleName());
|
if (jobExecutionContext.containsKey("geometryType")) {
|
||||||
|
geomTypeStr = jobExecutionContext.getString("geometryType");
|
||||||
|
this.geometryType = featureTypeFactory.parseGeometryType(geomTypeStr);
|
||||||
|
log.info("Geometry type from validation: {}", geometryType.getSimpleName());
|
||||||
|
} else {
|
||||||
|
log.warn(
|
||||||
|
"geometryType not set in Job ExecutionContext (empty dataset). Will use default in open()");
|
||||||
|
}
|
||||||
|
|
||||||
// 출력 디렉토리 생성
|
// 출력 디렉토리 생성
|
||||||
try {
|
try {
|
||||||
|
|||||||
@@ -154,7 +154,7 @@ public class ConverterCommandLineRunner implements CommandLineRunner {
|
|||||||
builder.addString("zipBaseName", converterProperties.getInferenceId());
|
builder.addString("zipBaseName", converterProperties.getInferenceId());
|
||||||
|
|
||||||
// Layer name (GeoServer 등록용)
|
// Layer name (GeoServer 등록용)
|
||||||
String layerName = "inference_" + converterProperties.getInferenceId();
|
String layerName = converterProperties.getInferenceId();
|
||||||
builder.addString("layerName", layerName);
|
builder.addString("layerName", layerName);
|
||||||
|
|
||||||
// GeoServer 등록 여부
|
// GeoServer 등록 여부
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ spring:
|
|||||||
password: kamco_cds_Q!W@E#R$
|
password: kamco_cds_Q!W@E#R$
|
||||||
driver-class-name: org.postgresql.Driver
|
driver-class-name: org.postgresql.Driver
|
||||||
hikari:
|
hikari:
|
||||||
maximum-pool-size: 5
|
maximum-pool-size: 10
|
||||||
connection-timeout: 30000
|
connection-timeout: 30000
|
||||||
idle-timeout: 600000
|
idle-timeout: 600000
|
||||||
max-lifetime: 1800000
|
max-lifetime: 1800000
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ spring:
|
|||||||
password: kamco_cds_Q!W@E#R$
|
password: kamco_cds_Q!W@E#R$
|
||||||
driver-class-name: org.postgresql.Driver
|
driver-class-name: org.postgresql.Driver
|
||||||
hikari:
|
hikari:
|
||||||
maximum-pool-size: 5
|
maximum-pool-size: 10
|
||||||
connection-timeout: 30000
|
connection-timeout: 30000
|
||||||
idle-timeout: 600000
|
idle-timeout: 600000
|
||||||
max-lifetime: 1800000
|
max-lifetime: 1800000
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ spring:
|
|||||||
password: kamco_cds_Q!W@E#R$
|
password: kamco_cds_Q!W@E#R$
|
||||||
driver-class-name: org.postgresql.Driver
|
driver-class-name: org.postgresql.Driver
|
||||||
hikari:
|
hikari:
|
||||||
maximum-pool-size: 10 # Increased for batch processing
|
maximum-pool-size: 20 # Increased for batch processing
|
||||||
connection-timeout: 30000
|
connection-timeout: 30000
|
||||||
idle-timeout: 600000
|
idle-timeout: 600000
|
||||||
max-lifetime: 1800000
|
max-lifetime: 1800000
|
||||||
|
|||||||
Reference in New Issue
Block a user