oom처리
This commit is contained in:
@@ -4,14 +4,14 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
|
|||||||
|
|
||||||
## Project Overview
|
## Project Overview
|
||||||
|
|
||||||
Spring Boot 3.5.7 CLI application that converts PostgreSQL PostGIS spatial data to ESRI shapefiles and GeoJSON formats. The application uses **Spring Batch** for memory-efficient processing of large datasets (1M+ records) and supports automatic GeoServer layer registration via REST API.
|
Spring Boot 3.5.7 / Java 21 CLI application that converts PostgreSQL PostGIS spatial data to ESRI shapefiles and GeoJSON formats. The application uses **Spring Batch** for memory-efficient processing of large datasets (1M+ records) and supports automatic GeoServer layer registration via REST API.
|
||||||
|
|
||||||
**Key Features**:
|
**Key Features**:
|
||||||
- Memory-optimized batch processing (90-95% reduction: 2-13GB → 150-200MB)
|
- Memory-optimized batch processing (90-95% reduction: 2-13GB → 150-200MB)
|
||||||
- Chunk-based streaming with cursor pagination (fetch-size: 1000)
|
- Chunk-based streaming with cursor pagination (fetch-size: 1000)
|
||||||
- Automatic geometry validation and type conversion (MultiPolygon → Polygon)
|
- Automatic geometry validation and type conversion (MultiPolygon → Polygon)
|
||||||
- Coordinate system validation (EPSG:5186 Korean 2000 / Central Belt)
|
- Coordinate system validation (EPSG:5186 Korean 2000 / Central Belt)
|
||||||
- Dual execution modes: Spring Batch (recommended) and Legacy mode
|
- Three execution modes: Spring Batch (recommended), Legacy, and GeoServer registration-only
|
||||||
|
|
||||||
## Build and Run Commands
|
## Build and Run Commands
|
||||||
|
|
||||||
@@ -25,6 +25,8 @@ Spring Boot 3.5.7 CLI application that converts PostgreSQL PostGIS spatial data
|
|||||||
|
|
||||||
Output: `build/libs/shp-exporter.jar` (fixed name, no version suffix)
|
Output: `build/libs/shp-exporter.jar` (fixed name, no version suffix)
|
||||||
|
|
||||||
|
> **Note**: The `Dockerfile` currently references `shp-exporter-v2.jar` in its `COPY` step, which does not match the actual build output. Update the Dockerfile if building a Docker image.
|
||||||
|
|
||||||
### Run Application
|
### Run Application
|
||||||
|
|
||||||
#### Spring Batch Mode (Recommended)
|
#### Spring Batch Mode (Recommended)
|
||||||
@@ -113,6 +115,7 @@ ConverterCommandLineRunner
|
|||||||
→ JdbcCursorItemReader (fetch-size: 1000)
|
→ JdbcCursorItemReader (fetch-size: 1000)
|
||||||
→ FeatureConversionProcessor (InferenceResult → SimpleFeature)
|
→ FeatureConversionProcessor (InferenceResult → SimpleFeature)
|
||||||
→ StreamingShapefileWriter (chunk-based append)
|
→ StreamingShapefileWriter (chunk-based append)
|
||||||
|
→ Step 2-1: PostShapefileUpdateTasklet (post-export DB UPDATE hook; note: `mergedModeJob` currently chains this step last, after `generateMapIdFilesStep`)
|
||||||
→ Step 3: generateGeoJsonStep (chunk-oriented, same pattern)
|
→ Step 3: generateGeoJsonStep (chunk-oriented, same pattern)
|
||||||
→ Step 4: CreateZipTasklet (creates .zip for GeoServer)
|
→ Step 4: CreateZipTasklet (creates .zip for GeoServer)
|
||||||
→ Step 5: GeoServerRegistrationTasklet (conditional, if --geoserver.enabled=true)
|
→ Step 5: GeoServerRegistrationTasklet (conditional, if --geoserver.enabled=true)
|
||||||
@@ -379,6 +382,21 @@ public Step myNewStep(JobRepository jobRepository,
|
|||||||
```
|
```
|
||||||
4. **Always include `BatchExecutionHistoryListener`** to track execution metrics
|
4. **Always include `BatchExecutionHistoryListener`** to track execution metrics
|
||||||
|
|
||||||
|
### Post-Export DB Hook (`PostShapefileUpdateTasklet`)
|
||||||
|
|
||||||
|
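`PostShapefileUpdateTasklet` is a placeholder hook for running UPDATE SQL once shapefile export has finished (e.g., marking rows as exported). Although it is labelled "Step 2-1", `mergedModeJob` currently chains `postShapefileUpdateStep` as the final step, after `generateMapIdFilesStep`, so it runs at the end of the job rather than immediately after `generateShapefileStep`. The SQL body is intentionally left as a `// TODO` — add your UPDATE statement inside `execute()`: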
|
||||||
|
|
||||||
|
```java
|
||||||
|
// batch/tasklet/PostShapefileUpdateTasklet.java
|
||||||
|
int updated = jdbcTemplate.update(
|
||||||
|
"UPDATE some_table SET status = 'EXPORTED' WHERE batch_id = ANY(?)",
|
||||||
|
ps -> {
|
||||||
|
ps.setArray(1, ps.getConnection().createArrayOf("bigint", batchIdList.toArray()));
|
||||||
|
});
|
||||||
|
```
|
||||||
|
|
||||||
|
Job parameters available: `inferenceId` (String), `batchIds` (comma-separated String → `List<Long>`).
|
||||||
|
|
||||||
### Modifying ItemReader Configuration
|
### Modifying ItemReader Configuration
|
||||||
|
|
||||||
ItemReaders are **not thread-safe**. Each step requires its own instance:
|
ItemReaders are **not thread-safe**. Each step requires its own instance:
|
||||||
|
|||||||
@@ -30,7 +30,7 @@ ENV GEOSERVER_USERNAME=""
|
|||||||
ENV GEOSERVER_PASSWORD=""
|
ENV GEOSERVER_PASSWORD=""
|
||||||
|
|
||||||
ENTRYPOINT ["java", \
|
ENTRYPOINT ["java", \
|
||||||
"-Xmx4g", "-Xms512m", \
|
"-Xmx128g", "-Xms8g", \
|
||||||
"-XX:+UseG1GC", \
|
"-XX:+UseG1GC", \
|
||||||
"-XX:MaxGCPauseMillis=200", \
|
"-XX:MaxGCPauseMillis=200", \
|
||||||
"-XX:G1HeapRegionSize=16m", \
|
"-XX:G1HeapRegionSize=16m", \
|
||||||
|
|||||||
@@ -6,12 +6,15 @@ import com.kamco.makesample.batch.processor.FeatureConversionProcessor;
|
|||||||
import com.kamco.makesample.batch.tasklet.CreateZipTasklet;
|
import com.kamco.makesample.batch.tasklet.CreateZipTasklet;
|
||||||
import com.kamco.makesample.batch.tasklet.GeoServerRegistrationTasklet;
|
import com.kamco.makesample.batch.tasklet.GeoServerRegistrationTasklet;
|
||||||
import com.kamco.makesample.batch.tasklet.GeometryTypeValidationTasklet;
|
import com.kamco.makesample.batch.tasklet.GeometryTypeValidationTasklet;
|
||||||
|
import com.kamco.makesample.batch.tasklet.PostShapefileUpdateTasklet;
|
||||||
import com.kamco.makesample.batch.writer.MapIdGeoJsonWriter;
|
import com.kamco.makesample.batch.writer.MapIdGeoJsonWriter;
|
||||||
import com.kamco.makesample.batch.writer.MapIdShapefileWriter;
|
import com.kamco.makesample.batch.writer.MapIdShapefileWriter;
|
||||||
import com.kamco.makesample.batch.writer.StreamingGeoJsonWriter;
|
import com.kamco.makesample.batch.writer.StreamingGeoJsonWriter;
|
||||||
import com.kamco.makesample.batch.writer.StreamingShapefileWriter;
|
import com.kamco.makesample.batch.writer.StreamingShapefileWriter;
|
||||||
import com.kamco.makesample.model.InferenceResult;
|
import com.kamco.makesample.model.InferenceResult;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
||||||
import org.geotools.api.feature.simple.SimpleFeature;
|
import org.geotools.api.feature.simple.SimpleFeature;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@@ -52,33 +55,35 @@ public class MergedModeJobConfig {
|
|||||||
/**
|
/**
|
||||||
* MERGED 모드 Job 정의
|
* MERGED 모드 Job 정의
|
||||||
*
|
*
|
||||||
* @param jobRepository JobRepository
|
* @param jobRepository JobRepository
|
||||||
* @param validateGeometryTypeStep Geometry type 검증 Step
|
* @param validateGeometryTypeStep Geometry type 검증 Step
|
||||||
* @param generateShapefileStep Shapefile 생성 Step
|
* @param generateShapefileStep Shapefile 생성 Step
|
||||||
* @param generateGeoJsonStep GeoJSON 생성 Step
|
* @param generateGeoJsonStep GeoJSON 생성 Step
|
||||||
* @param createZipStep ZIP 생성 Step
|
* @param createZipStep ZIP 생성 Step
|
||||||
* @param registerToGeoServerStep GeoServer 등록 Step (merge 폴더의 shapefile만)
|
* @param registerToGeoServerStep GeoServer 등록 Step (merge 폴더의 shapefile만)
|
||||||
* @param generateMapIdFilesStep Map ID별 파일 생성 Step (병렬 처리)
|
* @param generateMapIdFilesStep Map ID별 파일 생성 Step (병렬 처리)
|
||||||
* @return Job
|
* @return Job
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
public Job mergedModeJob(
|
public Job mergedModeJob(
|
||||||
JobRepository jobRepository,
|
JobRepository jobRepository,
|
||||||
Step validateGeometryTypeStep,
|
Step validateGeometryTypeStep,
|
||||||
Step generateShapefileStep,
|
Step generateShapefileStep,
|
||||||
Step generateGeoJsonStep,
|
Step postShapefileUpdateStep,
|
||||||
Step createZipStep,
|
Step generateGeoJsonStep,
|
||||||
Step registerToGeoServerStep,
|
Step createZipStep,
|
||||||
Step generateMapIdFilesStep) {
|
Step registerToGeoServerStep,
|
||||||
|
Step generateMapIdFilesStep) {
|
||||||
|
|
||||||
return new JobBuilder("mergedModeJob", jobRepository)
|
return new JobBuilder("mergedModeJob", jobRepository)
|
||||||
.start(validateGeometryTypeStep)
|
.start(validateGeometryTypeStep)
|
||||||
.next(generateShapefileStep)
|
.next(generateShapefileStep)
|
||||||
.next(generateGeoJsonStep)
|
.next(generateGeoJsonStep)
|
||||||
.next(createZipStep)
|
.next(createZipStep)
|
||||||
.next(registerToGeoServerStep) // Conditional execution
|
.next(registerToGeoServerStep) // Conditional execution
|
||||||
.next(generateMapIdFilesStep) // Map ID별 개별 파일 생성
|
.next(generateMapIdFilesStep) // Map ID별 개별 파일 생성
|
||||||
.build();
|
.next(postShapefileUpdateStep) // Shapefile 생성 후 UPDATE 실행
|
||||||
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -86,23 +91,23 @@ public class MergedModeJobConfig {
|
|||||||
*
|
*
|
||||||
* <p>Shapefile은 homogeneous geometry type을 요구하므로 사전 검증
|
* <p>Shapefile은 homogeneous geometry type을 요구하므로 사전 검증
|
||||||
*
|
*
|
||||||
* @param jobRepository JobRepository
|
* @param jobRepository JobRepository
|
||||||
* @param transactionManager TransactionManager
|
* @param transactionManager TransactionManager
|
||||||
* @param validationTasklet GeometryTypeValidationTasklet
|
* @param validationTasklet GeometryTypeValidationTasklet
|
||||||
* @param historyListener BatchExecutionHistoryListener
|
* @param historyListener BatchExecutionHistoryListener
|
||||||
* @return Step
|
* @return Step
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
public Step validateGeometryTypeStep(
|
public Step validateGeometryTypeStep(
|
||||||
JobRepository jobRepository,
|
JobRepository jobRepository,
|
||||||
PlatformTransactionManager transactionManager,
|
PlatformTransactionManager transactionManager,
|
||||||
GeometryTypeValidationTasklet validationTasklet,
|
GeometryTypeValidationTasklet validationTasklet,
|
||||||
BatchExecutionHistoryListener historyListener) {
|
BatchExecutionHistoryListener historyListener) {
|
||||||
|
|
||||||
return new StepBuilder("validateGeometryTypeStep", jobRepository)
|
return new StepBuilder("validateGeometryTypeStep", jobRepository)
|
||||||
.tasklet(validationTasklet, transactionManager)
|
.tasklet(validationTasklet, transactionManager)
|
||||||
.listener(historyListener)
|
.listener(historyListener)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -116,32 +121,54 @@ public class MergedModeJobConfig {
|
|||||||
* <li>Writer: StreamingShapefileWriter (chunk 단위 쓰기)
|
* <li>Writer: StreamingShapefileWriter (chunk 단위 쓰기)
|
||||||
* </ul>
|
* </ul>
|
||||||
*
|
*
|
||||||
* @param jobRepository JobRepository
|
* @param jobRepository JobRepository
|
||||||
* @param transactionManager TransactionManager
|
* @param transactionManager TransactionManager
|
||||||
* @param shapefileReader ItemReader (Shapefile용)
|
* @param shapefileReader ItemReader (Shapefile용)
|
||||||
* @param featureConversionProcessor ItemProcessor
|
* @param featureConversionProcessor ItemProcessor
|
||||||
* @param shapefileWriter ItemWriter
|
* @param shapefileWriter ItemWriter
|
||||||
* @param chunkSize Chunk size (default: 1000)
|
* @param chunkSize Chunk size (default: 1000)
|
||||||
* @param historyListener BatchExecutionHistoryListener
|
* @param historyListener BatchExecutionHistoryListener
|
||||||
* @return Step
|
* @return Step
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
public Step generateShapefileStep(
|
public Step generateShapefileStep(
|
||||||
JobRepository jobRepository,
|
JobRepository jobRepository,
|
||||||
PlatformTransactionManager transactionManager,
|
PlatformTransactionManager transactionManager,
|
||||||
JdbcCursorItemReader<InferenceResult> shapefileReader,
|
JdbcCursorItemReader<InferenceResult> shapefileReader,
|
||||||
FeatureConversionProcessor featureConversionProcessor,
|
FeatureConversionProcessor featureConversionProcessor,
|
||||||
StreamingShapefileWriter shapefileWriter,
|
StreamingShapefileWriter shapefileWriter,
|
||||||
@Value("${converter.batch.chunk-size:1000}") int chunkSize,
|
@Value("${converter.batch.chunk-size:1000}") int chunkSize,
|
||||||
BatchExecutionHistoryListener historyListener) {
|
BatchExecutionHistoryListener historyListener) {
|
||||||
|
|
||||||
return new StepBuilder("generateShapefileStep", jobRepository)
|
return new StepBuilder("generateShapefileStep", jobRepository)
|
||||||
.<InferenceResult, SimpleFeature>chunk(chunkSize, transactionManager)
|
.<InferenceResult, SimpleFeature>chunk(chunkSize, transactionManager)
|
||||||
.reader(shapefileReader)
|
.reader(shapefileReader)
|
||||||
.processor(featureConversionProcessor)
|
.processor(featureConversionProcessor)
|
||||||
.writer(shapefileWriter)
|
.writer(shapefileWriter)
|
||||||
.listener(historyListener)
|
.listener(historyListener)
|
||||||
.build();
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Step 2-1: Shapefile 생성 후 UPDATE 실행
|
||||||
|
*
|
||||||
|
* @param jobRepository JobRepository
|
||||||
|
* @param transactionManager TransactionManager
|
||||||
|
* @param postShapefileUpdateTasklet PostShapefileUpdateTasklet
|
||||||
|
* @param historyListener BatchExecutionHistoryListener
|
||||||
|
* @return Step
|
||||||
|
*/
|
||||||
|
@Bean
|
||||||
|
public Step postShapefileUpdateStep(
|
||||||
|
JobRepository jobRepository,
|
||||||
|
PlatformTransactionManager transactionManager,
|
||||||
|
PostShapefileUpdateTasklet postShapefileUpdateTasklet,
|
||||||
|
BatchExecutionHistoryListener historyListener) {
|
||||||
|
|
||||||
|
return new StepBuilder("postShapefileUpdateStep", jobRepository)
|
||||||
|
.tasklet(postShapefileUpdateTasklet, transactionManager)
|
||||||
|
.listener(historyListener)
|
||||||
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -149,54 +176,54 @@ public class MergedModeJobConfig {
|
|||||||
*
|
*
|
||||||
* <p>Shapefile과 동일한 데이터를 GeoJSON 형식으로 출력
|
* <p>Shapefile과 동일한 데이터를 GeoJSON 형식으로 출력
|
||||||
*
|
*
|
||||||
* @param jobRepository JobRepository
|
* @param jobRepository JobRepository
|
||||||
* @param transactionManager TransactionManager
|
* @param transactionManager TransactionManager
|
||||||
* @param geoJsonReader ItemReader (GeoJSON용 - 별도 인스턴스)
|
* @param geoJsonReader ItemReader (GeoJSON용 - 별도 인스턴스)
|
||||||
* @param featureConversionProcessor ItemProcessor (재사용)
|
* @param featureConversionProcessor ItemProcessor (재사용)
|
||||||
* @param geoJsonWriter ItemWriter
|
* @param geoJsonWriter ItemWriter
|
||||||
* @param chunkSize Chunk size
|
* @param chunkSize Chunk size
|
||||||
* @param historyListener BatchExecutionHistoryListener
|
* @param historyListener BatchExecutionHistoryListener
|
||||||
* @return Step
|
* @return Step
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
public Step generateGeoJsonStep(
|
public Step generateGeoJsonStep(
|
||||||
JobRepository jobRepository,
|
JobRepository jobRepository,
|
||||||
PlatformTransactionManager transactionManager,
|
PlatformTransactionManager transactionManager,
|
||||||
JdbcCursorItemReader<InferenceResult> geoJsonReader,
|
JdbcCursorItemReader<InferenceResult> geoJsonReader,
|
||||||
FeatureConversionProcessor featureConversionProcessor,
|
FeatureConversionProcessor featureConversionProcessor,
|
||||||
StreamingGeoJsonWriter geoJsonWriter,
|
StreamingGeoJsonWriter geoJsonWriter,
|
||||||
@Value("${converter.batch.chunk-size:1000}") int chunkSize,
|
@Value("${converter.batch.chunk-size:1000}") int chunkSize,
|
||||||
BatchExecutionHistoryListener historyListener) {
|
BatchExecutionHistoryListener historyListener) {
|
||||||
|
|
||||||
return new StepBuilder("generateGeoJsonStep", jobRepository)
|
return new StepBuilder("generateGeoJsonStep", jobRepository)
|
||||||
.<InferenceResult, SimpleFeature>chunk(chunkSize, transactionManager)
|
.<InferenceResult, SimpleFeature>chunk(chunkSize, transactionManager)
|
||||||
.reader(geoJsonReader)
|
.reader(geoJsonReader)
|
||||||
.processor(featureConversionProcessor)
|
.processor(featureConversionProcessor)
|
||||||
.writer(geoJsonWriter)
|
.writer(geoJsonWriter)
|
||||||
.listener(historyListener)
|
.listener(historyListener)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Step 4: ZIP 파일 생성
|
* Step 4: ZIP 파일 생성
|
||||||
*
|
*
|
||||||
* @param jobRepository JobRepository
|
* @param jobRepository JobRepository
|
||||||
* @param transactionManager TransactionManager
|
* @param transactionManager TransactionManager
|
||||||
* @param createZipTasklet CreateZipTasklet
|
* @param createZipTasklet CreateZipTasklet
|
||||||
* @param historyListener BatchExecutionHistoryListener
|
* @param historyListener BatchExecutionHistoryListener
|
||||||
* @return Step
|
* @return Step
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
public Step createZipStep(
|
public Step createZipStep(
|
||||||
JobRepository jobRepository,
|
JobRepository jobRepository,
|
||||||
PlatformTransactionManager transactionManager,
|
PlatformTransactionManager transactionManager,
|
||||||
CreateZipTasklet createZipTasklet,
|
CreateZipTasklet createZipTasklet,
|
||||||
BatchExecutionHistoryListener historyListener) {
|
BatchExecutionHistoryListener historyListener) {
|
||||||
|
|
||||||
return new StepBuilder("createZipStep", jobRepository)
|
return new StepBuilder("createZipStep", jobRepository)
|
||||||
.tasklet(createZipTasklet, transactionManager)
|
.tasklet(createZipTasklet, transactionManager)
|
||||||
.listener(historyListener)
|
.listener(historyListener)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -204,23 +231,23 @@ public class MergedModeJobConfig {
|
|||||||
*
|
*
|
||||||
* <p>Conditional execution: geoserver.enabled=true일 때만 실행
|
* <p>Conditional execution: geoserver.enabled=true일 때만 실행
|
||||||
*
|
*
|
||||||
* @param jobRepository JobRepository
|
* @param jobRepository JobRepository
|
||||||
* @param transactionManager TransactionManager
|
* @param transactionManager TransactionManager
|
||||||
* @param registrationTasklet GeoServerRegistrationTasklet
|
* @param registrationTasklet GeoServerRegistrationTasklet
|
||||||
* @param historyListener BatchExecutionHistoryListener
|
* @param historyListener BatchExecutionHistoryListener
|
||||||
* @return Step
|
* @return Step
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
public Step registerToGeoServerStep(
|
public Step registerToGeoServerStep(
|
||||||
JobRepository jobRepository,
|
JobRepository jobRepository,
|
||||||
PlatformTransactionManager transactionManager,
|
PlatformTransactionManager transactionManager,
|
||||||
GeoServerRegistrationTasklet registrationTasklet,
|
GeoServerRegistrationTasklet registrationTasklet,
|
||||||
BatchExecutionHistoryListener historyListener) {
|
BatchExecutionHistoryListener historyListener) {
|
||||||
|
|
||||||
return new StepBuilder("registerToGeoServerStep", jobRepository)
|
return new StepBuilder("registerToGeoServerStep", jobRepository)
|
||||||
.tasklet(registrationTasklet, transactionManager)
|
.tasklet(registrationTasklet, transactionManager)
|
||||||
.listener(historyListener)
|
.listener(historyListener)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -229,21 +256,21 @@ public class MergedModeJobConfig {
|
|||||||
* <p>각 map_id마다 개별 shapefile과 geojson 파일을 순차적으로 생성합니다. SyncTaskExecutor를 명시적으로 지정하여 병렬 실행을 방지하고
|
* <p>각 map_id마다 개별 shapefile과 geojson 파일을 순차적으로 생성합니다. SyncTaskExecutor를 명시적으로 지정하여 병렬 실행을 방지하고
|
||||||
* DB connection pool 고갈 방지
|
* DB connection pool 고갈 방지
|
||||||
*
|
*
|
||||||
* @param jobRepository JobRepository
|
* @param jobRepository JobRepository
|
||||||
* @param partitioner MapIdPartitioner
|
* @param partitioner MapIdPartitioner
|
||||||
* @param mapIdWorkerStep Worker Step (각 파티션에서 실행)
|
* @param mapIdWorkerStep Worker Step (각 파티션에서 실행)
|
||||||
* @return Partitioned Step
|
* @return Partitioned Step
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
public Step generateMapIdFilesStep(
|
public Step generateMapIdFilesStep(
|
||||||
JobRepository jobRepository, MapIdPartitioner partitioner, Step mapIdWorkerStep) {
|
JobRepository jobRepository, MapIdPartitioner partitioner, Step mapIdWorkerStep) {
|
||||||
|
|
||||||
return new StepBuilder("generateMapIdFilesStep", jobRepository)
|
return new StepBuilder("generateMapIdFilesStep", jobRepository)
|
||||||
.partitioner("mapIdWorker", partitioner)
|
.partitioner("mapIdWorker", partitioner)
|
||||||
.step(mapIdWorkerStep)
|
.step(mapIdWorkerStep)
|
||||||
.taskExecutor(new SyncTaskExecutor()) // 명시적으로 순차 실행 지정
|
.taskExecutor(new SyncTaskExecutor()) // 명시적으로 순차 실행 지정
|
||||||
.listener(partitioner) // Register partitioner as StepExecutionListener
|
.listener(partitioner) // Register partitioner as StepExecutionListener
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -251,39 +278,39 @@ public class MergedModeJobConfig {
|
|||||||
*
|
*
|
||||||
* <p>각 파티션에서 실행되며, 해당 map_id의 데이터를 읽어 shapefile과 geojson을 동시에 생성합니다.
|
* <p>각 파티션에서 실행되며, 해당 map_id의 데이터를 읽어 shapefile과 geojson을 동시에 생성합니다.
|
||||||
*
|
*
|
||||||
* @param jobRepository JobRepository
|
* @param jobRepository JobRepository
|
||||||
* @param transactionManager TransactionManager
|
* @param transactionManager TransactionManager
|
||||||
* @param mapIdModeReader ItemReader (map_id별)
|
* @param mapIdModeReader ItemReader (map_id별)
|
||||||
* @param featureConversionProcessor ItemProcessor
|
* @param featureConversionProcessor ItemProcessor
|
||||||
* @param mapIdShapefileWriter Shapefile Writer
|
* @param mapIdShapefileWriter Shapefile Writer
|
||||||
* @param mapIdGeoJsonWriter GeoJSON Writer
|
* @param mapIdGeoJsonWriter GeoJSON Writer
|
||||||
* @param chunkSize Chunk size
|
* @param chunkSize Chunk size
|
||||||
* @param historyListener BatchExecutionHistoryListener
|
* @param historyListener BatchExecutionHistoryListener
|
||||||
* @return Worker Step
|
* @return Worker Step
|
||||||
*/
|
*/
|
||||||
@Bean
|
@Bean
|
||||||
public Step mapIdWorkerStep(
|
public Step mapIdWorkerStep(
|
||||||
JobRepository jobRepository,
|
JobRepository jobRepository,
|
||||||
PlatformTransactionManager transactionManager,
|
PlatformTransactionManager transactionManager,
|
||||||
JdbcCursorItemReader<InferenceResult> mapIdModeReader,
|
JdbcCursorItemReader<InferenceResult> mapIdModeReader,
|
||||||
FeatureConversionProcessor featureConversionProcessor,
|
FeatureConversionProcessor featureConversionProcessor,
|
||||||
MapIdShapefileWriter mapIdShapefileWriter,
|
MapIdShapefileWriter mapIdShapefileWriter,
|
||||||
MapIdGeoJsonWriter mapIdGeoJsonWriter,
|
MapIdGeoJsonWriter mapIdGeoJsonWriter,
|
||||||
@Value("${converter.batch.chunk-size:1000}") int chunkSize,
|
@Value("${converter.batch.chunk-size:1000}") int chunkSize,
|
||||||
BatchExecutionHistoryListener historyListener) {
|
BatchExecutionHistoryListener historyListener) {
|
||||||
|
|
||||||
// CompositeItemWriter로 shapefile과 geojson 동시 생성
|
// CompositeItemWriter로 shapefile과 geojson 동시 생성
|
||||||
CompositeItemWriter<SimpleFeature> compositeWriter = new CompositeItemWriter<>();
|
CompositeItemWriter<SimpleFeature> compositeWriter = new CompositeItemWriter<>();
|
||||||
compositeWriter.setDelegates(Arrays.asList(mapIdShapefileWriter, mapIdGeoJsonWriter));
|
compositeWriter.setDelegates(Arrays.asList(mapIdShapefileWriter, mapIdGeoJsonWriter));
|
||||||
|
|
||||||
return new StepBuilder("mapIdWorkerStep", jobRepository)
|
return new StepBuilder("mapIdWorkerStep", jobRepository)
|
||||||
.<InferenceResult, SimpleFeature>chunk(chunkSize, transactionManager)
|
.<InferenceResult, SimpleFeature>chunk(chunkSize, transactionManager)
|
||||||
.reader(mapIdModeReader)
|
.reader(mapIdModeReader)
|
||||||
.processor(featureConversionProcessor)
|
.processor(featureConversionProcessor)
|
||||||
.writer(compositeWriter)
|
.writer(compositeWriter)
|
||||||
.stream(mapIdShapefileWriter)
|
.stream(mapIdShapefileWriter)
|
||||||
.stream(mapIdGeoJsonWriter)
|
.stream(mapIdGeoJsonWriter)
|
||||||
.listener(historyListener)
|
.listener(historyListener)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -139,6 +139,8 @@ public class GeometryTypeValidationTasklet implements Tasklet {
|
|||||||
SELECT COUNT(*) as valid_count
|
SELECT COUNT(*) as valid_count
|
||||||
FROM inference_results_testing
|
FROM inference_results_testing
|
||||||
WHERE batch_id = ANY(?)
|
WHERE batch_id = ANY(?)
|
||||||
|
AND after_c IS NOT NULL
|
||||||
|
AND after_p IS NOT NULL
|
||||||
AND geometry IS NOT NULL
|
AND geometry IS NOT NULL
|
||||||
AND ST_GeometryType(geometry) IN ('ST_Polygon', 'ST_MultiPolygon')
|
AND ST_GeometryType(geometry) IN ('ST_Polygon', 'ST_MultiPolygon')
|
||||||
AND ST_SRID(geometry) = 5186
|
AND ST_SRID(geometry) = 5186
|
||||||
|
|||||||
@@ -10,13 +10,11 @@ import java.nio.file.Paths;
|
|||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import org.geotools.api.data.SimpleFeatureStore;
|
import org.geotools.api.data.FeatureWriter;
|
||||||
import org.geotools.api.data.Transaction;
|
import org.geotools.api.data.Transaction;
|
||||||
import org.geotools.api.feature.simple.SimpleFeature;
|
import org.geotools.api.feature.simple.SimpleFeature;
|
||||||
import org.geotools.api.feature.simple.SimpleFeatureType;
|
import org.geotools.api.feature.simple.SimpleFeatureType;
|
||||||
import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
|
import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
|
||||||
import org.geotools.data.DefaultTransaction;
|
|
||||||
import org.geotools.data.collection.ListFeatureCollection;
|
|
||||||
import org.geotools.data.shapefile.ShapefileDataStore;
|
import org.geotools.data.shapefile.ShapefileDataStore;
|
||||||
import org.geotools.data.shapefile.ShapefileDataStoreFactory;
|
import org.geotools.data.shapefile.ShapefileDataStoreFactory;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@@ -69,8 +67,7 @@ public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature>
|
|||||||
private String outputPath;
|
private String outputPath;
|
||||||
|
|
||||||
private ShapefileDataStore dataStore;
|
private ShapefileDataStore dataStore;
|
||||||
private Transaction transaction;
|
private FeatureWriter<SimpleFeatureType, SimpleFeature> featureWriter;
|
||||||
private SimpleFeatureStore featureStore;
|
|
||||||
private SimpleFeatureType featureType;
|
private SimpleFeatureType featureType;
|
||||||
|
|
||||||
private int chunkCount = 0;
|
private int chunkCount = 0;
|
||||||
@@ -145,13 +142,9 @@ public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature>
|
|||||||
dataStore = (ShapefileDataStore) factory.createNewDataStore(params);
|
dataStore = (ShapefileDataStore) factory.createNewDataStore(params);
|
||||||
dataStore.createSchema(featureType);
|
dataStore.createSchema(featureType);
|
||||||
|
|
||||||
// Transaction 시작
|
// FeatureWriter를 append 모드로 직접 열기 (Diff 누적 없이 파일에 직접 씀)
|
||||||
transaction = new DefaultTransaction("create");
|
|
||||||
|
|
||||||
// FeatureStore 가져오기
|
|
||||||
String typeName = dataStore.getTypeNames()[0];
|
String typeName = dataStore.getTypeNames()[0];
|
||||||
featureStore = (SimpleFeatureStore) dataStore.getFeatureSource(typeName);
|
featureWriter = dataStore.getFeatureWriterAppend(typeName, Transaction.AUTO_COMMIT);
|
||||||
featureStore.setTransaction(transaction);
|
|
||||||
|
|
||||||
startTimeMs = System.currentTimeMillis();
|
startTimeMs = System.currentTimeMillis();
|
||||||
log.info("ShapefileDataStore initialized successfully");
|
log.info("ShapefileDataStore initialized successfully");
|
||||||
@@ -172,10 +165,13 @@ public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature>
|
|||||||
int itemCount = items.size();
|
int itemCount = items.size();
|
||||||
totalRecordCount += itemCount;
|
totalRecordCount += itemCount;
|
||||||
|
|
||||||
// FeatureStore에 추가 - GeoTools ShapefileDataStore는 Diff 없이 파일에 직접 씀
|
// FeatureWriter로 직접 append - Diff 누적 없이 O(1) per record
|
||||||
// 트랜잭션은 afterStep()에서 단일 커밋 (per-chunk 커밋 시 setTransaction()이 .shx 재스캔 → O(n²))
|
for (SimpleFeature feature : items) {
|
||||||
ListFeatureCollection collection = new ListFeatureCollection(featureType, items);
|
SimpleFeature newFeature = featureWriter.next();
|
||||||
featureStore.addFeatures(collection);
|
newFeature.setAttributes(feature.getAttributes());
|
||||||
|
newFeature.setDefaultGeometry(feature.getDefaultGeometry());
|
||||||
|
featureWriter.write();
|
||||||
|
}
|
||||||
|
|
||||||
if (chunkCount % LOG_INTERVAL_CHUNKS == 0) {
|
if (chunkCount % LOG_INTERVAL_CHUNKS == 0) {
|
||||||
logProgress();
|
logProgress();
|
||||||
@@ -191,15 +187,10 @@ public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature>
|
|||||||
chunkCount);
|
chunkCount);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (transaction != null) {
|
|
||||||
transaction.commit();
|
|
||||||
log.info("Final transaction committed successfully");
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
log.error("Failed to commit final transaction", e);
|
|
||||||
throw new ItemStreamException("Failed to commit shapefile transaction", e);
|
|
||||||
} finally {
|
|
||||||
cleanup();
|
cleanup();
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Failed to close shapefile writer", e);
|
||||||
|
throw new ItemStreamException("Failed to close shapefile writer", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -212,23 +203,13 @@ public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature>
|
|||||||
public void onError(Exception exception, Chunk<? extends SimpleFeature> chunk) {
|
public void onError(Exception exception, Chunk<? extends SimpleFeature> chunk) {
|
||||||
log.error("Error writing chunk #{}: {}", chunkCount, exception.getMessage(), exception);
|
log.error("Error writing chunk #{}: {}", chunkCount, exception.getMessage(), exception);
|
||||||
|
|
||||||
try {
|
cleanup();
|
||||||
if (transaction != null) {
|
|
||||||
transaction.rollback();
|
|
||||||
log.info("Transaction rolled back due to error");
|
|
||||||
}
|
|
||||||
|
|
||||||
// 부분 파일 삭제
|
// 부분 파일 삭제
|
||||||
File shpFile = new File(outputPath);
|
File shpFile = new File(outputPath);
|
||||||
if (shpFile.exists()) {
|
if (shpFile.exists()) {
|
||||||
shpFile.delete();
|
shpFile.delete();
|
||||||
log.info("Deleted partial shapefile: {}", outputPath);
|
log.info("Deleted partial shapefile: {}", outputPath);
|
||||||
}
|
|
||||||
|
|
||||||
} catch (IOException e) {
|
|
||||||
log.error("Failed to rollback transaction", e);
|
|
||||||
} finally {
|
|
||||||
cleanup();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -264,13 +245,13 @@ public class StreamingShapefileWriter implements ItemStreamWriter<SimpleFeature>
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void cleanup() {
|
private void cleanup() {
|
||||||
if (transaction != null) {
|
if (featureWriter != null) {
|
||||||
try {
|
try {
|
||||||
transaction.close();
|
featureWriter.close();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
log.warn("Failed to close transaction", e);
|
log.warn("Failed to close feature writer", e);
|
||||||
}
|
}
|
||||||
transaction = null;
|
featureWriter = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (dataStore != null) {
|
if (dataStore != null) {
|
||||||
|
|||||||
Reference in New Issue
Block a user