Merge pull request 'feat/dean/project-change-structure' (#3) from feat/dean/project-change-structure into main

Reviewed-on: #3
This commit was merged in pull request #3.
This commit is contained in:
2026-03-11 23:44:18 +09:00
111 changed files with 4440 additions and 1131 deletions

77
.gitignore vendored Normal file
View File

@@ -0,0 +1,77 @@
HELP.md
.gradle
build/
!gradle/wrapper/gradle-wrapper.jar
!**/src/main/**/build/
!**/src/test/**/build/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
bin/
!**/src/main/**/bin/
!**/src/test/**/bin/
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
out/
!**/src/main/**/out/
!**/src/test/**/out/
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
### VS Code ###
.vscode/
### QueryDSL ###
/src/main/generated/
**/generated/
### Logs ###
*.log
logs/
*.log.*
### OS ###
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
### Environment ###
.env
.env.local
.env.*.local
application-local.yml
application-secret.yml
metrics-collector/.env
### Docker (local testing) ###
.dockerignore
docker-compose.override.yml
### Temporary ###
*.tmp
*.temp
*.swp
*.swo
*~
!/CLAUDE.md

86
shp-exporter/.gitignore vendored Normal file
View File

@@ -0,0 +1,86 @@
HELP.md
.gradle
build/
!gradle/wrapper/gradle-wrapper.jar
!**/src/main/**/build/
!**/src/test/**/build/
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
bin/
!**/src/main/**/bin/
!**/src/test/**/bin/
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
out/
!**/src/main/**/out/
!**/src/test/**/out/
### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
### VS Code ###
.vscode/
### QueryDSL ###
/src/main/generated/
**/generated/
### Logs ###
*.log
logs/
*.log.*
### OS ###
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db
### Environment ###
.env
.env.local
.env.*.local
application-local.yml
application-secret.yml
metrics-collector/.env
### Docker (local testing) ###
.dockerignore
docker-compose.override.yml
### Temporary ###
*.tmp
*.temp
*.swp
*.swo
*~
!/CLAUDE.md
*.jar
*.class
/build
### Output directories ###
export/
merge/
### Documentation (temporary) ###
claudedocs/

View File

@@ -1,2 +0,0 @@
#Wed Jan 14 15:14:03 KST 2026
gradle.version=8.14.3

8
shp-exporter/.idea/.gitignore generated vendored
View File

@@ -1,8 +0,0 @@
# Default ignored files
/shelf/
/workspace.xml
# Editor-based HTTP Client requests
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

View File

@@ -1,9 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="CompilerConfiguration">
<bytecodeTargetLevel target="17" />
</component>
<component name="JavacSettings">
<option name="ADDITIONAL_OPTIONS_STRING" value="-parameters" />
</component>
</project>

View File

@@ -1,19 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="GradleMigrationSettings" migrationVersion="1" />
<component name="GradleSettings">
<option name="linkedExternalProjectsSettings">
<GradleProjectSettings>
<option name="delegatedBuild" value="false" />
<option name="testRunner" value="PLATFORM" />
<option name="externalProjectPath" value="$PROJECT_DIR$" />
<option name="modules">
<set>
<option value="$PROJECT_DIR$" />
</set>
</option>
<option name="resolveExternalAnnotations" value="true" />
</GradleProjectSettings>
</option>
</component>
</project>

View File

@@ -1,35 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="RemoteRepositoriesConfiguration">
<remote-repository>
<option name="id" value="central" />
<option name="name" value="Maven Central repository" />
<option name="url" value="https://repo1.maven.org/maven2" />
</remote-repository>
<remote-repository>
<option name="id" value="jboss.community" />
<option name="name" value="JBoss Community repository" />
<option name="url" value="https://repository.jboss.org/nexus/content/repositories/public/" />
</remote-repository>
<remote-repository>
<option name="id" value="MavenRepo" />
<option name="name" value="MavenRepo" />
<option name="url" value="https://repo.maven.apache.org/maven2/" />
</remote-repository>
<remote-repository>
<option name="id" value="maven" />
<option name="name" value="maven" />
<option name="url" value="https://repo.osgeo.org/repository/release/" />
</remote-repository>
<remote-repository>
<option name="id" value="maven2" />
<option name="name" value="maven2" />
<option name="url" value="https://repo.osgeo.org/repository/geotools-releases/" />
</remote-repository>
<remote-repository>
<option name="id" value="maven3" />
<option name="name" value="maven3" />
<option name="url" value="https://repo.osgeo.org/repository/snapshot/" />
</remote-repository>
</component>
</project>

View File

@@ -1,9 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="JAVA_MODULE" version="4">
<component name="NewModuleRootManager" inherit-compiler-output="true">
<exclude-output />
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

View File

@@ -1,10 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="FrameworkDetectionExcludesConfiguration">
<file type="web" url="file://$PROJECT_DIR$" />
</component>
<component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="17" project-jdk-type="JavaSDK">
<output url="file://$PROJECT_DIR$/out" />
</component>
</project>

View File

@@ -1,6 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$/.." vcs="Git" />
</component>
</project>

View File

@@ -1,27 +1,82 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Project Overview
Spring Boot CLI application that queries PostgreSQL PostGIS spatial data and converts it to ESRI shapefiles. The application processes inference results from the KAMCO database and generates geographic shapefiles for visualization in GIS applications.
Spring Boot 3.5.7 CLI application that converts PostgreSQL PostGIS spatial data to ESRI shapefiles and GeoJSON formats. The application uses **Spring Batch** for memory-efficient processing of large datasets (1M+ records) and supports automatic GeoServer layer registration via REST API.
**Key Features**:
- Memory-optimized batch processing (90-95% reduction: 2-13GB → 150-200MB)
- Chunk-based streaming with cursor pagination (fetch-size: 1000)
- Automatic geometry validation and type conversion (MultiPolygon → Polygon)
- Coordinate system validation (EPSG:5186 Korean 2000 / Central Belt)
- Dual execution modes: Spring Batch (recommended) and Legacy mode
## Build and Run Commands
### Build
```bash
./gradlew build
./gradlew build # Full build with tests
./gradlew clean build -x test # Skip tests
./gradlew spotlessApply # Apply Google Java Format (2-space indentation)
./gradlew spotlessCheck # Verify formatting without applying
```
Output: `build/libs/shp-exporter.jar` (fixed name, no version suffix)
### Run Application
#### Spring Batch Mode (Recommended)
```bash
./gradlew bootRun
# Generate shapefile + GeoJSON
./gradlew bootRun --args="--batch --converter.batch-ids[0]=252"
# With GeoServer registration
export GEOSERVER_USERNAME=admin
export GEOSERVER_PASSWORD=geoserver
./gradlew bootRun --args="--batch --geoserver.enabled=true --converter.batch-ids[0]=252"
# Using JAR (production)
java -jar build/libs/shp-exporter.jar \
--batch \
--converter.inference-id=D5E46F60FC40B1A8BE0CD1F3547AA6 \
--converter.batch-ids[0]=252 \
--converter.batch-ids[1]=253
```
Or run the built JAR:
#### Legacy Mode (Small Datasets Only)
```bash
java -jar build/libs/makesample-1.0.0.jar
./gradlew bootRun # No --batch flag
# Warning: May OOM on large datasets
```
#### Upload Shapefile to GeoServer
Set environment variables first:
```bash
export GEOSERVER_USERNAME=admin
export GEOSERVER_PASSWORD=geoserver
```
Then upload:
```bash
./gradlew bootRun --args="--upload-shp /path/to/file.shp --layer layer_name"
```
Or using JAR:
```bash
java -jar build/libs/shp-exporter.jar --upload-shp /path/to/file.shp --layer layer_name
```
#### Override Configuration via Command Line
Using Gradle (recommended - no quoting issues):
```bash
./gradlew bootRun --args="--converter.inference-id=ABC123 --converter.map-ids[0]=35813030 --converter.batch-ids[0]=252 --converter.mode=MERGED"
```
Using JAR with zsh (quote arguments with brackets):
```bash
java -jar build/libs/shp-exporter.jar '--converter.inference-id=ABC123' '--converter.map-ids[0]=35813030'
```
### Code Formatting
@@ -35,88 +90,415 @@ Check formatting without applying:
./gradlew spotlessCheck
```
### Active Profile
By default, the application runs with `spring.profiles.active=prod` (set in `application.yml`). Profile-specific configurations are in `application-{profile}.yml` files.
## Architecture
### Processing Pipeline
The application follows a layered architecture with a linear data flow:
### Dual Execution Modes
1. **CLI Entry** (`ConverterCommandLineRunner`) → Reads configuration and initiates batch processing
2. **Service Orchestration** (`ShapefileConverterService`) → Coordinates the conversion workflow for each map_id
3. **Data Access** (`InferenceResultRepository`) → Queries PostGIS database and converts WKT to JTS geometries
4. **Geometry Conversion** (`GeometryConverter`) → Converts PostGIS WKT format to JTS Geometry objects
5. **Shapefile Writing** (`ShapefileWriter`) → Uses GeoTools to generate shapefile artifacts (.shp, .shx, .dbf, .prj)
The application supports two execution modes with distinct processing pipelines:
### Key Design Points
#### Spring Batch Mode (Recommended)
**Trigger**: `--batch` flag
**Use Case**: Large datasets (100K+ records), production workloads
**Memory**: 150-200MB constant (chunk-based streaming)
**Geometry Handling**: The application uses a two-step geometry conversion process:
- PostGIS returns geometries as WKT (Well-Known Text) via `ST_AsText(geometry)`
- `GeometryConverter` parses WKT to JTS `Geometry` objects
- `ShapefileWriter` uses JTS geometries with GeoTools to write shapefiles
**Pipeline Flow**:
```
ConverterCommandLineRunner
→ JobLauncher.run(mergedModeJob)
→ Step 1: GeometryTypeValidationTasklet (validates geometry homogeneity)
→ Step 2: generateShapefileStep (chunk-oriented)
→ JdbcCursorItemReader (fetch-size: 1000)
→ FeatureConversionProcessor (InferenceResult → SimpleFeature)
→ StreamingShapefileWriter (chunk-based append)
→ Step 3: generateGeoJsonStep (chunk-oriented, same pattern)
→ Step 4: CreateZipTasklet (creates .zip for GeoServer)
→ Step 5: GeoServerRegistrationTasklet (conditional, if --geoserver.enabled=true)
→ Step 6: generateMapIdFilesStep (partitioned, sequential map_id processing)
```
**Batch Processing**: Configuration in `application.yml` drives batch execution:
- Multiple `map-ids` processed sequentially (if specified)
- If `map-ids` is null/empty, creates a merged shapefile for all batch-ids
- Each map_id filtered by `batch-ids` array
- Output directory structure: `{output-base-dir}/{inference-id}/{map-id}/` or `{output-base-dir}/{inference-id}/merge/` for merged mode
- Separate output directory created for each map_id
**Key Components**:
- `JdbcCursorItemReader`: Cursor-based streaming (no full result set loading)
- `StreamingShapefileWriter`: Opens GeoTools transaction, writes chunks incrementally, commits at end
- `GeometryTypeValidationTasklet`: Pre-validates with SQL `DISTINCT ST_GeometryType()`, auto-converts MultiPolygon
- `CompositeItemWriter`: Simultaneously writes shapefile and GeoJSON in map_id worker step
**Shapefile Constraints**: The application validates that all geometries for a single shapefile are homogeneous (same type) because shapefiles cannot contain mixed geometry types. This validation happens in `ShapefileConverterService.validateGeometries()`.
#### Legacy Mode
**Trigger**: No `--batch` flag (deprecated)
**Use Case**: Small datasets (<10K records)
**Memory**: 1.4-9GB (loads entire result set)
**Feature Schema**: GeoTools requires explicit geometry field setup:
- Default geometry field named `the_geom` (not `geometry`)
- Field names truncated to 10 characters for DBF format compatibility
- Geometry type determined from first valid geometry in result set
**Pipeline Flow**:
```
ConverterCommandLineRunner
→ ShapefileConverterService.convertAll()
→ InferenceResultRepository.findByBatchIds() (full List<InferenceResult>)
→ validateGeometries() (in-memory validation)
→ ShapefileWriter.write() (DefaultFeatureCollection accumulation)
→ GeoJsonWriter.write()
```
### Key Design Patterns
**Geometry Type Validation & Auto-Conversion**:
- Pre-validation step runs SQL `SELECT DISTINCT ST_GeometryType(geometry)` to detect mixed types
- Supports automatic conversion: `ST_MultiPolygon` → `ST_Polygon` (extracts first polygon only)
- Fails fast on unsupported mixed types (e.g., Polygon + LineString)
- Validates EPSG:5186 coordinate bounds (X: 125 to 530 km, Y: -600 to 988 km) and ST_IsValid()
- See `GeometryTypeValidationTasklet` (batch/tasklet/GeometryTypeValidationTasklet.java:1-290)
**WKT to JTS Conversion Pipeline**:
1. PostGIS query returns `ST_AsText(geometry)` as WKT string
2. `GeometryConvertingRowMapper` converts ResultSet row to `InferenceResult` with WKT string (batch/reader/GeometryConvertingRowMapper.java:1-74)
3. `FeatureConversionProcessor` uses `GeometryConverter.parseGeometry()` to convert WKT → JTS Geometry (service/GeometryConverter.java:1-92)
4. `StreamingShapefileWriter` wraps JTS geometry in GeoTools `SimpleFeature` and writes to shapefile
**Chunk-Based Transaction Management** (Spring Batch only):
```java
// StreamingShapefileWriter
@BeforeStep
public void open() {
transaction = new DefaultTransaction("create");
featureStore.setTransaction(transaction); // Long-running transaction
}
@Override
public void write(Chunk<SimpleFeature> chunk) {
ListFeatureCollection collection = new ListFeatureCollection(featureType, chunk.getItems());
featureStore.addFeatures(collection); // Append chunk to shapefile
// chunk goes out of scope → GC eligible
}
@AfterStep
public void afterStep() {
transaction.commit(); // Commit all chunks at once
transaction.close();
}
```
**PostgreSQL Array Parameter Handling**:
```java
// InferenceResultItemReaderConfig uses PreparedStatementSetter
ps -> {
Array batchIdsArray = ps.getConnection().createArrayOf("bigint", batchIds.toArray());
ps.setArray(1, batchIdsArray); // WHERE batch_id = ANY(?)
ps.setString(2, mapId);
}
```
**Output Directory Strategy**:
- Batch mode (MERGED): `{output-base-dir}/{inference-id}/merge/` → Single merged shapefile + GeoJSON
- Batch mode (map_id partitioning): `{output-base-dir}/{inference-id}/{map-id}/` → Per-map_id files
- Legacy mode: `{output-base-dir}/{inference-id}/{map-id}/` (no merge folder)
**GeoServer Registration**:
- Only shapefile ZIP is uploaded (GeoJSON not registered)
- Requires pre-created workspace 'cd' and environment variables for auth
- Conditional execution via JobParameter `geoserver.enabled`
- Non-blocking: failures logged but don't stop batch job
## Configuration
Primary configuration in `src/main/resources/application.yml`:
### Profile System
- Default profile: `prod` (set in application.yml)
- Configuration hierarchy: `application.yml` → `application-{profile}.yml`
- Override via: `--spring.profiles.active=dev`
### Key Configuration Properties
**Converter Settings** (`ConverterProperties.java`):
```yaml
converter:
inference-id: 'D5E46F60FC40B1A8BE0CD1F3547AA6' # Inference ID (used for output folder structure)
map-ids: ['35813030'] # List of map_ids to process (text type), omit for merged shapefile
batch-ids: [252, 253, 257] # Batch ID array filter
output-base-dir: '/kamco-nfs/dataset/export/' # Base directory for shapefile output
crs: 'EPSG:5186' # Korean 2000 / Central Belt CRS
inference-id: 'D5E46F60FC40B1A8BE0CD1F3547AA6' # Output folder name
batch-ids: [252, 253, 257] # PostgreSQL batch_id filter (required)
map-ids: [] # Legacy mode only (ignored in batch mode)
mode: 'MERGED' # Legacy mode only: MERGED, MAP_IDS, or RESOLVE
output-base-dir: '/data/model_output/export/'
crs: 'EPSG:5186' # Korean 2000 / Central Belt
batch:
chunk-size: 1000 # Records per chunk (affects memory usage)
fetch-size: 1000 # JDBC cursor fetch size
skip-limit: 100 # Max skippable records per chunk
enable-partitioning: false # Future: parallel map_id processing
```
Database connection configured via standard Spring Boot datasource properties.
**GeoServer Settings** (`GeoServerProperties.java`):
```yaml
geoserver:
base-url: 'https://kamco.geo-dev.gs.dabeeo.com/geoserver'
workspace: 'cd' # Must be pre-created in GeoServer
overwrite-existing: true # Delete existing layer before registration
connection-timeout: 30000 # 30 seconds
read-timeout: 60000 # 60 seconds
# Credentials from environment variables (preferred):
# GEOSERVER_USERNAME, GEOSERVER_PASSWORD
```
**Spring Batch Metadata**:
```yaml
spring:
batch:
job:
enabled: false # Prevent auto-run on startup
jdbc:
initialize-schema: always # Auto-create BATCH_* tables
```
## Database Integration
### Query Pattern
The repository uses `PreparedStatementCreator` to handle PostgreSQL array parameters:
### Query Strategies
**Spring Batch Mode** (streaming):
```sql
WHERE batch_id = ANY(?) AND map_id = ?
-- InferenceResultItemReaderConfig.java
SELECT uid, map_id, probability, before_year, after_year,
before_c, before_p, after_c, after_p,
ST_AsText(geometry) as geometry_wkt
FROM inference_results_testing
WHERE batch_id = ANY(?)
AND ST_GeometryType(geometry) IN ('ST_Polygon', 'ST_MultiPolygon')
AND ST_SRID(geometry) = 5186
AND ST_X(ST_Centroid(geometry)) BETWEEN 125000 AND 530000
AND ST_Y(ST_Centroid(geometry)) BETWEEN -600000 AND 988000
AND ST_IsValid(geometry) = true
ORDER BY map_id, uid
-- Uses server-side cursor with fetch-size=1000
```
The `ANY(?)` clause requires creating a PostgreSQL array using `Connection.createArrayOf("bigint", ...)`.
**Legacy Mode** (full load):
```sql
-- InferenceResultRepository.java
SELECT uid, map_id, probability, before_year, after_year,
before_c, before_p, after_c, after_p,
ST_AsText(geometry) as geometry_wkt
FROM inference_results_testing
WHERE batch_id = ANY(?) AND map_id = ?
-- Returns full List<InferenceResult> in memory
```
**Geometry Type Validation**:
```sql
-- GeometryTypeValidationTasklet.java
SELECT DISTINCT ST_GeometryType(geometry)
FROM inference_results_testing
WHERE batch_id = ANY(?) AND geometry IS NOT NULL
-- Pre-validates homogeneous geometry requirement
```
### Field Mapping
Database columns are mapped to shapefile fields with Korean naming:
Database columns map to shapefile fields (10-character limit):
| Database Column | Shapefile Field |
|-----------------|-----------------|
| uid | uid |
| map_id | map_id |
| probability | chn_dtct_prob |
| before_year | cprs_yr |
| after_year | crtr_yr |
| before_c | bf_cls_cd |
| before_p | bf_cls_prob |
| after_c | af_cls_cd |
| after_p | af_cls_prob |
| geometry | the_geom |
| Database Column | DB Type | Shapefile Field | Shapefile Type | Notes |
|-----------------|---------|-----------------|----------------|-------|
| uid | uuid | chnDtctId | String | Change detection ID |
| map_id | text | mpqd_no | String | Map quadrant number |
| probability | float8 | chn_dtct_p | Double | Change detection probability |
| before_year | bigint | cprs_yr | Long | Comparison year |
| after_year | bigint | crtr_yr | Long | Criteria year |
| before_c | text | bf_cls_cd | String | Before classification code |
| before_p | float8 | bf_cls_pro | Double | Before classification probability |
| after_c | text | af_cls_cd | String | After classification code |
| after_p | float8 | af_cls_pro | Double | After classification probability |
| geometry | geom | the_geom | Polygon | Geometry in EPSG:5186 |
**Field name source**: See `FeatureTypeFactory.java` (batch/util/FeatureTypeFactory.java:1-104)
### Coordinate Reference System
All geometries use **EPSG:5186** (Korean 2000 / Central Belt). The PostGIS geometry column is defined as `geometry(Polygon, 5186)`, and this CRS is preserved in the output shapefile's `.prj` file via GeoTools CRS encoding.
- **CRS**: EPSG:5186 (Korean 2000 / Central Belt)
- **Valid Coordinate Bounds**: X ∈ [125km, 530km], Y ∈ [-600km, 988km]
- **Encoding**: WKT in SQL → JTS Geometry → GeoTools SimpleFeature → `.prj` file
- **Validation**: Automatic in batch mode via `ST_X(ST_Centroid())` range check
## Dependencies
Key libraries and their roles:
- **GeoTools 30.0**: Shapefile generation (`gt-shapefile`, `gt-referencing`, `gt-epsg-hsql`)
- **JTS 1.19.0**: Java Topology Suite for geometry representation
- **PostGIS JDBC 2.5.1**: PostgreSQL spatial extension support
- **Spring Boot 3.5.7**: Framework for DI, JDBC, and configuration
**Core Framework**:
- Spring Boot 3.5.7
- `spring-boot-starter`: DI container, logging
- `spring-boot-starter-jdbc`: JDBC template, HikariCP
- `spring-boot-starter-batch`: Spring Batch framework, job repository
- `spring-boot-starter-web`: RestTemplate for GeoServer API calls
- `spring-boot-starter-validation`: @NotBlank annotations
Note: `javax.media:jai_core` is excluded in `build.gradle` to avoid conflicts.
**Spatial Libraries**:
- GeoTools 30.0 (via OSGeo repository)
- `gt-shapefile`: Shapefile I/O (DataStore, FeatureStore, Transaction)
- `gt-geojson`: GeoJSON encoding/decoding
- `gt-referencing`: CRS transformations
- `gt-epsg-hsql`: EPSG database for CRS lookups
- JTS 1.19.0: Geometry primitives (Polygon, MultiPolygon, GeometryFactory)
- PostGIS JDBC 2.5.1: PostGIS geometry type support
**Database**:
- PostgreSQL JDBC Driver (latest)
- HikariCP (bundled with Spring Boot)
**Build Configuration**:
```gradle
// build.gradle
configurations.all {
exclude group: 'javax.media', module: 'jai_core' // Conflicts with GeoTools
}
bootJar {
archiveFileName = "shp-exporter.jar" // Fixed JAR name
}
spotless {
java {
googleJavaFormat('1.19.2') // 2-space indentation
}
}
```
## Development Patterns
### Adding a New Step to Spring Batch Job
When adding steps to `mergedModeJob`, follow this pattern:
1. **Create Tasklet or ItemWriter** in `batch/tasklet/` or `batch/writer/`
2. **Define Step Bean** in `MergedModeJobConfig.java`:
```java
@Bean
public Step myNewStep(JobRepository jobRepository,
PlatformTransactionManager transactionManager,
MyTasklet tasklet,
BatchExecutionHistoryListener historyListener) {
return new StepBuilder("myNewStep", jobRepository)
.tasklet(tasklet, transactionManager)
.listener(historyListener) // REQUIRED for history tracking
.build();
}
```
3. **Add to Job Flow** in `mergedModeJob()`:
```java
.next(myNewStep)
```
4. **Always include `BatchExecutionHistoryListener`** to track execution metrics
### Modifying ItemReader Configuration
ItemReaders are **not thread-safe**. Each step requires its own instance:
```java
// WRONG: Sharing reader between steps
@Bean
public JdbcCursorItemReader<InferenceResult> reader() { ... }
// RIGHT: Separate readers with @StepScope
@Bean
@StepScope // Creates new instance per step
public JdbcCursorItemReader<InferenceResult> shapefileReader() { ... }
@Bean
@StepScope
public JdbcCursorItemReader<InferenceResult> geoJsonReader() { ... }
```
See `InferenceResultItemReaderConfig.java` for working examples.
### Streaming Writers Pattern
When writing custom streaming writers, follow `StreamingShapefileWriter` pattern:
```java
@Component
@StepScope
public class MyStreamingWriter implements ItemStreamWriter<MyType> {
private Transaction transaction;
@BeforeStep
public void open(ExecutionContext context) {
// Open resources, start transaction
transaction = new DefaultTransaction("create");
}
@Override
public void write(Chunk<? extends MyType> chunk) {
// Write chunk incrementally
// Do NOT accumulate in memory
}
@AfterStep
public ExitStatus afterStep(StepExecution stepExecution) {
transaction.commit(); // Commit all chunks
transaction.close();
return ExitStatus.COMPLETED;
}
}
```
### JobParameters and StepExecutionContext
**Pass data between steps** using `StepExecutionContext`:
```java
// Step 1: Store data
stepExecution.getExecutionContext().putString("geometryType", "ST_Polygon");
// Step 2: Retrieve data
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
String geomType = stepExecution.getJobExecution()
.getExecutionContext()
.getString("geometryType");
}
```
**Job-level parameters** from command line:
```java
// ConverterCommandLineRunner.buildJobParameters()
JobParametersBuilder builder = new JobParametersBuilder();
builder.addString("inferenceId", converterProperties.getInferenceId());
builder.addLong("timestamp", System.currentTimeMillis()); // Ensures uniqueness
```
### Partitioning Pattern (Map ID Processing)
The `generateMapIdFilesStep` uses partitioning but runs **sequentially** to avoid DB connection pool exhaustion:
```java
@Bean
public Step generateMapIdFilesStep(...) {
return new StepBuilder("generateMapIdFilesStep", jobRepository)
.partitioner("mapIdWorker", partitioner)
.step(mapIdWorkerStep)
.taskExecutor(new SyncTaskExecutor()) // SEQUENTIAL execution
.build();
}
```
For parallel execution in future (requires connection pool tuning):
```java
.taskExecutor(new SimpleAsyncTaskExecutor())
.gridSize(4) // 4 concurrent workers
```
### GeoServer REST API Integration
GeoServer operations use `RestTemplate` with custom error handling:
```java
// GeoServerRegistrationService.java
try {
restTemplate.exchange(url, HttpMethod.PUT, entity, String.class);
} catch (HttpClientErrorException e) {
if (e.getStatusCode() == HttpStatus.NOT_FOUND) {
// Handle workspace not found
}
}
```
Always check workspace existence before layer registration.
### Testing Considerations
- **Unit tests**: Mock `JdbcTemplate`, `DataSource` for repository tests
- **Integration tests**: Use `@SpringBatchTest` with embedded H2 database
- **GeoTools**: Use `MemoryDataStore` for shapefile writer tests
- **Current state**: Limited test coverage (focus on critical path validation)
Refer to `claudedocs/SPRING_BATCH_MIGRATION.md` for detailed batch architecture documentation.

View File

@@ -85,7 +85,7 @@ You can override configuration values using command line arguments:
**Using JAR (zsh shell - quote arguments with brackets):**
```bash
java -jar build/libs/makesample-1.0.0.jar \
java -jar build/libs/shp-exporter.jar \
'--converter.inference-id=D5E46F60FC40B1A8BE0CD1F3547AA6' \
'--converter.map-ids[0]=35813030' \
'--converter.batch-ids[0]=252' \
@@ -97,7 +97,7 @@ java -jar build/libs/makesample-1.0.0.jar \
**Using JAR (bash shell - no quotes needed):**
```bash
java -jar build/libs/makesample-1.0.0.jar \
java -jar build/libs/shp-exporter.jar \
--converter.inference-id=D5E46F60FC40B1A8BE0CD1F3547AA6 \
--converter.map-ids[0]=35813030 \
--converter.batch-ids[0]=252 \
@@ -106,6 +106,19 @@ java -jar build/libs/makesample-1.0.0.jar \
--converter.mode=MERGED
```
```bash
java -jar build/libs/shp-exporter.jar \
--batch \
--converter.inference-id=test009 \
--converter.batch-ids[0]=111 \
--converter.batch-ids[1]=114 \
--converter.batch-ids[2]=162 \
--geoserver.enabled=true
```
**Note for zsh users:** zsh interprets square brackets `[]` as glob patterns. Always quote arguments containing brackets when using zsh.
## Building
@@ -116,7 +129,32 @@ java -jar build/libs/makesample-1.0.0.jar \
## Running
### Generate Shapefiles
### Generate Shapefiles (Spring Batch Mode - Recommended)
**New in v1.1.0**: Spring Batch mode provides memory-optimized processing for large datasets.
```bash
# MERGED mode (creates single shapefile + GeoJSON for all batch-ids)
./gradlew bootRun --args="--batch --converter.batch-ids[0]=252 --converter.batch-ids[1]=253"
# With GeoServer registration
./gradlew bootRun --args="--batch --geoserver.enabled=true --converter.batch-ids[0]=252"
```
**Output Files** (in `{output-base-dir}/{inference-id}/merge/`):
- `{inference-id}.shp` (+ .shx, .dbf, .prj) - Shapefile
- `{inference-id}.geojson` - GeoJSON file
- `{inference-id}.zip` - ZIP archive of shapefile
**Benefits**:
- 90-95% memory reduction (2-13GB → 150-200MB for 1M records)
- Chunk-based streaming (1000 records per chunk)
- Restart capability after failures
- Step-by-step execution support
See [claudedocs/SPRING_BATCH_MIGRATION.md](claudedocs/SPRING_BATCH_MIGRATION.md) for detailed documentation.
### Generate Shapefiles (Legacy Mode)
```bash
./gradlew bootRun
@@ -125,7 +163,7 @@ java -jar build/libs/makesample-1.0.0.jar \
Or run the JAR directly:
```bash
java -jar build/libs/makesample-1.0.0.jar
java -jar build/libs/shp-exporter.jar
```
### Register Shapefile to GeoServer
@@ -146,7 +184,7 @@ Then register a shapefile:
Or using the JAR:
```bash
java -jar build/libs/makesample-1.0.0.jar \
java -jar build/libs/shp-exporter.jar \
--upload-shp /path/to/shapefile.shp \
--layer layer_name
```
@@ -167,6 +205,7 @@ java -jar build/libs/makesample-1.0.0.jar \
## Output
### Legacy Mode Output
Shapefiles will be created in directories structured as `output-base-dir/inference-id/map-id/`:
```
@@ -177,9 +216,45 @@ Shapefiles will be created in directories structured as `output-base-dir/inferen
└── 35813030.prj # Projection information
```
### Spring Batch Mode Output
Output structure for MERGED mode (`output-base-dir/inference-id/merge/`):
```
/kamco-nfs/dataset/export/D5E46F60FC40B1A8BE0CD1F3547AA6/merge/
├── D5E46F60FC40B1A8BE0CD1F3547AA6.shp # Shapefile geometry
├── D5E46F60FC40B1A8BE0CD1F3547AA6.shx # Shape index
├── D5E46F60FC40B1A8BE0CD1F3547AA6.dbf # Attribute data
├── D5E46F60FC40B1A8BE0CD1F3547AA6.prj # Projection information
├── D5E46F60FC40B1A8BE0CD1F3547AA6.geojson # GeoJSON format
└── D5E46F60FC40B1A8BE0CD1F3547AA6.zip # ZIP archive (for GeoServer)
```
**Note**: Only the shapefile (.shp and related files) is registered to GeoServer. GeoJSON files are generated for alternative consumption.
## Database Query
The application executes the following query for each map_id:
### Spring Batch Mode (Recommended)
The Spring Batch mode applies comprehensive validation to ensure data quality:
```sql
ORDER BY map_id, uid
```
**Validation Criteria**:
- **Geometry Type**: Only ST_Polygon and ST_MultiPolygon (excludes Point, LineString, etc.)
- **Coordinate System**: EPSG:5186 (Korean 2000 / Central Belt)
- **Coordinate Range**: Korea territory bounds (X: 125 to 530 km, Y: -600 to 988 km)
- **Geometry Validity**: Valid topology (ST_IsValid)
Rows failing validation are automatically excluded from processing, ensuring clean shapefile generation.
**Performance**: See [PERFORMANCE_OPTIMIZATION.md](claudedocs/PERFORMANCE_OPTIMIZATION.md) for indexing recommendations.
### Legacy Mode
Legacy mode uses a simpler query without validation:
```sql
SELECT uid, map_id, probability, before_year, after_year,
@@ -278,8 +353,26 @@ The project uses Google Java Format with 2-space indentation:
```
src/main/java/com/kamco/makesample/
├── MakeSampleApplication.java # Main application class
├── batch/ # Spring Batch components (v1.1.0+)
│ ├── config/
│ │ ├── BatchConfiguration.java # Spring Batch configuration
│ │ └── MergedModeJobConfig.java # MERGED mode Job definition
│ ├── processor/
│ │ └── FeatureConversionProcessor.java # InferenceResult → SimpleFeature processor
│ ├── reader/
│ │ ├── GeometryConvertingRowMapper.java # WKT → JTS converter
│ │ └── InferenceResultItemReaderConfig.java # Cursor-based DB reader
│ ├── tasklet/
│ │ ├── CreateZipTasklet.java # ZIP creation tasklet
│ │ ├── GeoServerRegistrationTasklet.java # GeoServer registration tasklet
│ │ └── GeometryTypeValidationTasklet.java # Geometry validation tasklet
│ ├── util/
│ │ └── FeatureTypeFactory.java # Shared feature type creation
│ └── writer/
│ ├── StreamingGeoJsonWriter.java # Streaming GeoJSON writer
│ └── StreamingShapefileWriter.java # Streaming shapefile writer
├── cli/
│ └── ConverterCommandLineRunner.java # CLI entry point
│ └── ConverterCommandLineRunner.java # CLI entry point (batch + legacy)
├── config/
│ ├── ConverterProperties.java # Shapefile converter configuration
│ ├── GeoServerProperties.java # GeoServer configuration
@@ -293,14 +386,14 @@ src/main/java/com/kamco/makesample/
├── model/
│ └── InferenceResult.java # Domain model
├── repository/
│ └── InferenceResultRepository.java # Data access layer
│ └── InferenceResultRepository.java # Data access layer (legacy)
├── service/
│ ├── GeometryConverter.java # PostGIS to JTS conversion
│ ├── ShapefileConverterService.java # Orchestration service
│ ├── ShapefileConverterService.java # Orchestration service (legacy)
│ └── GeoServerRegistrationService.java # GeoServer REST API integration
└── writer/
├── ShapefileWriter.java # GeoTools shapefile writer
└── GeoJsonWriter.java # GeoJSON export writer
├── ShapefileWriter.java # GeoTools shapefile writer (legacy)
└── GeoJsonWriter.java # GeoJSON export writer (legacy)
```
## Dependencies
@@ -308,6 +401,7 @@ src/main/java/com/kamco/makesample/
- Spring Boot 3.5.7
- spring-boot-starter
- spring-boot-starter-jdbc
- spring-boot-starter-batch (v1.1.0+)
- spring-boot-starter-web (for RestTemplate)
- spring-boot-starter-validation (for @NotBlank annotations)
- GeoTools 30.0
@@ -383,6 +477,57 @@ SELECT COUNT(*) FROM inference_results_testing
WHERE batch_id IN (252, 253, 257) AND map_id = '35813030';
```
## Batch Execution History
### Overview
Spring Batch mode automatically tracks execution history for each step, recording:
- Start time, end time, duration
- Success/failure status
- Error messages and stack traces (if failed)
- Processing statistics (read/write/commit/rollback/skip counts)
### Table Setup
Create the `batch_execution_history` table before running batch jobs:
```bash
psql -h 192.168.2.127 -p 15432 -U kamco_cds -d kamco_cds \
-f src/main/resources/db/migration/V1__create_batch_execution_history.sql
```
### Query Examples
**View execution history for a specific job**:
```sql
SELECT step_name, start_time, end_time, duration_ms, status, read_count, write_count
FROM batch_execution_history
WHERE job_execution_id = 123
ORDER BY start_time;
```
**Check failed steps**:
```sql
SELECT job_execution_id, step_name, start_time, error_message
FROM batch_execution_history
WHERE status = 'FAILED'
ORDER BY start_time DESC
LIMIT 10;
```
**Average step duration**:
```sql
SELECT step_name,
COUNT(*) as executions,
ROUND(AVG(duration_ms) / 1000.0, 2) as avg_duration_sec
FROM batch_execution_history
WHERE status = 'COMPLETED'
GROUP BY step_name
ORDER BY avg_duration_sec DESC;
```
For more query examples and detailed documentation, see [BATCH_EXECUTION_HISTORY.md](claudedocs/BATCH_EXECUTION_HISTORY.md).
## License
KAMCO Internal Use Only

View File

@@ -10,7 +10,7 @@ version = '1.0.0'
java {
toolchain {
languageVersion = JavaLanguageVersion.of(17)
languageVersion = JavaLanguageVersion.of(21)
}
}
@@ -36,7 +36,7 @@ configurations.all {
}
bootJar {
archiveFileName = "shp-exporter.jar"
archiveFileName = "shp-exporter-v2.jar"
}
jar {
@@ -49,6 +49,7 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-jdbc'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'org.springframework.boot:spring-boot-starter-batch'
// Database
implementation 'org.postgresql:postgresql'

File diff suppressed because one or more lines are too long

View File

@@ -1 +0,0 @@
com.kamco.makesample.MakeSampleApplication

View File

@@ -1,51 +0,0 @@
spring:
datasource:
url: jdbc:postgresql://192.168.2.127:15432/kamco_cds
username: kamco_cds
password: kamco_cds_Q!W@E#R$
driver-class-name: org.postgresql.Driver
hikari:
maximum-pool-size: 5
connection-timeout: 30000
idle-timeout: 600000
max-lifetime: 1800000
application:
name: make-shapefile-service
main:
web-application-type: none # Disable web server for CLI application
converter:
inference-id: D5E46F60FC40B1A8BE0CD1F3547AA6
# Optional: omit or set empty to create merged shapefile for all batch-ids
batch-ids: # Required
- 252
- 253
- 257
output-base-dir: '/kamco-nfs/model_output/export/'
crs: 'EPSG:5186'
geoserver:
base-url: 'https://kamco.geo-dev.gs.dabeeo.com/geoserver'
workspace: 'cd'
overwrite-existing: true
connection-timeout: 30000
read-timeout: 60000
# Credentials (optional - environment variables take precedence)
# Uncomment and set values for development convenience
# For production, use GEOSERVER_USERNAME and GEOSERVER_PASSWORD environment variables
username: 'admin'
password: 'geoserver'
logging:
level:
com.kamco.makesample: DEBUG
org.springframework: WARN
pattern:
console: '%d{yyyy-MM-dd HH:mm:ss} - %msg%n'
layer:
geoserver-url: http://label-tile.gs.dabeeo.com
workspace: cd

View File

@@ -1,52 +0,0 @@
spring:
datasource:
url: jdbc:postgresql://192.168.2.127:15432/kamco_cds
username: kamco_cds
password: kamco_cds_Q!W@E#R$
driver-class-name: org.postgresql.Driver
hikari:
maximum-pool-size: 5
connection-timeout: 30000
idle-timeout: 600000
max-lifetime: 1800000
application:
name: make-shapefile-service
main:
web-application-type: none # Disable web server for CLI application
converter:
inference-id: D5E46F60FC40B1A8BE0CD1F3547AA6
# Optional: omit or set empty to create merged shapefile for all batch-ids
batch-ids: # Required
- 252
- 253
- 257
output-base-dir: '/kamco-nfs/model_output/export/'
#output-base-dir: '/Users/bokmin/export/'
crs: 'EPSG:5186'
geoserver:
base-url: 'https://kamco.geo-dev.gs.dabeeo.com/geoserver'
workspace: 'cd'
overwrite-existing: true
connection-timeout: 30000
read-timeout: 60000
# Credentials (optional - environment variables take precedence)
# Uncomment and set values for development convenience
# For production, use GEOSERVER_USERNAME and GEOSERVER_PASSWORD environment variables
username: 'admin'
password: 'geoserver'
logging:
level:
com.kamco.makesample: DEBUG
org.springframework: WARN
pattern:
console: '%d{yyyy-MM-dd HH:mm:ss} - %msg%n'
layer:
geoserver-url: http://label-tile.gs.dabeeo.com
workspace: cd

View File

@@ -1,53 +0,0 @@
spring:
datasource:
url: jdbc:postgresql://kamco-cd-postgis:5432/kamco_cds
username: kamco_cds
password: kamco_cds_Q!W@E#R$
driver-class-name: org.postgresql.Driver
hikari:
maximum-pool-size: 5
connection-timeout: 30000
idle-timeout: 600000
max-lifetime: 1800000
application:
name: make-shapefile-service
main:
web-application-type: none # Disable web server for CLI application
converter:
inference-id: D5E46F60FC40B1A8BE0CD1F3547AA6
# Optional: omit or set empty to create merged shapefile for all batch-ids
batch-ids: # Required
- 252
- 253
- 257
output-base-dir: '/data/model_output/export/'
crs: 'EPSG:5186'
geoserver:
base-url: 'https://kamco.geo-dev.gs.dabeeo.com/geoserver'
workspace: 'cd'
overwrite-existing: true
connection-timeout: 30000
read-timeout: 60000
# Credentials (optional - environment variables take precedence)
# Uncomment and set values for development convenience
# For production, use GEOSERVER_USERNAME and GEOSERVER_PASSWORD environment variables
username: 'admin'
password: 'geoserver'
logging:
level:
com.kamco.makesample: DEBUG
org.springframework: WARN
pattern:
console: '%d{yyyy-MM-dd HH:mm:ss} - %msg%n'
layer:
geoserver-url: https://kamco.geo-dev.gs.dabeeo.com
wms-path: geoserver/cd
wmts-path: geoserver/cd/gwc/service
workspace: cd

View File

@@ -1,5 +0,0 @@
spring:
application:
name: make-shapefile-service
profiles:
active: prod

View File

@@ -1,12 +0,0 @@
Manifest-Version: 1.0
Main-Class: org.springframework.boot.loader.launch.JarLauncher
Start-Class: com.kamco.makesample.MakeSampleApplication
Spring-Boot-Version: 3.5.7
Spring-Boot-Classes: BOOT-INF/classes/
Spring-Boot-Lib: BOOT-INF/lib/
Spring-Boot-Classpath-Index: BOOT-INF/classpath.idx
Spring-Boot-Layers-Index: BOOT-INF/layers.idx
Build-Jdk-Spec: 17
Implementation-Title: shp-exporter
Implementation-Version: 1.0.0

File diff suppressed because one or more lines are too long

View File

@@ -1 +0,0 @@
PROJCS["Korea 2000 / Central Belt 2010", GEOGCS["Korea 2000", DATUM["Geocentric datum of Korea", SPHEROID["GRS 1980", 6378137.0, 298.257222101, AUTHORITY["EPSG","7019"]], TOWGS84[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], AUTHORITY["EPSG","6737"]], PRIMEM["Greenwich", 0.0, AUTHORITY["EPSG","8901"]], UNIT["degree", 0.017453292519943295], AXIS["Geodetic latitude", NORTH], AXIS["Geodetic longitude", EAST], AUTHORITY["EPSG","4737"]], PROJECTION["Transverse_Mercator", AUTHORITY["EPSG","9807"]], PARAMETER["central_meridian", 127.0], PARAMETER["latitude_of_origin", 38.0], PARAMETER["scale_factor", 1.0], PARAMETER["false_easting", 200000.0], PARAMETER["false_northing", 600000.0], UNIT["m", 1.0], AXIS["Northing", NORTH], AXIS["Easting", EAST], AUTHORITY["EPSG","5186"]]

View File

@@ -1,52 +0,0 @@
spring:
datasource:
url: jdbc:postgresql://192.168.2.127:15432/kamco_cds
username: kamco_cds
password: kamco_cds_Q!W@E#R$
driver-class-name: org.postgresql.Driver
hikari:
maximum-pool-size: 5
connection-timeout: 30000
idle-timeout: 600000
max-lifetime: 1800000
application:
name: make-shapefile-service
main:
web-application-type: none # Disable web server for CLI application
converter:
inference-id: D5E46F60FC40B1A8BE0CD1F3547AA6
# Optional: omit or set empty to create merged shapefile for all batch-ids
batch-ids: # Required
- 252
- 253
- 257
output-base-dir: '/kamco-nfs/dataset/export/'
#output-base-dir: '/Users/bokmin/export/'
crs: 'EPSG:5186'
geoserver:
base-url: 'http://label-tile.gs.dabeeo.com/geoserver'
workspace: 'cd'
overwrite-existing: true
connection-timeout: 30000
read-timeout: 60000
# Credentials (optional - environment variables take precedence)
# Uncomment and set values for development convenience
# For production, use GEOSERVER_USERNAME and GEOSERVER_PASSWORD environment variables
username: 'admin'
password: 'geoserver'
logging:
level:
com.kamco.makesample: DEBUG
org.springframework: WARN
pattern:
console: '%d{yyyy-MM-dd HH:mm:ss} - %msg%n'
layer:
geoserver-url: http://label-tile.gs.dabeeo.com
workspace: cd

View File

@@ -0,0 +1,72 @@
package com.kamco.makesample.batch.config;
import com.kamco.makesample.config.ConverterProperties;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
/**
 * Base Spring Batch configuration.
 *
 * <p>Spring Boot auto-configuration supplies the core batch infrastructure: the JobRepository
 * (persisted in the BATCH_* metadata tables: BATCH_JOB_INSTANCE, BATCH_JOB_EXECUTION,
 * BATCH_JOB_EXECUTION_PARAMS, BATCH_STEP_EXECUTION, BATCH_STEP_EXECUTION_CONTEXT,
 * BATCH_JOB_EXECUTION_CONTEXT), the JobLauncher, and the PlatformTransactionManager. Only beans
 * beyond that baseline are declared here.
 */
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

  private final ConverterProperties properties;

  public BatchConfiguration(ConverterProperties properties) {
    this.properties = properties;
  }

  /**
   * Task executor for per-map-id file generation.
   *
   * <p>Sized from the {@code partition-concurrency} property: the core pool equals the configured
   * concurrency, the pool may grow to twice that size, and up to 50 tasks can queue. Threads are
   * named {@code mapid-worker-*} so log lines can be traced back to a partition worker, and
   * shutdown waits up to 60 seconds for in-flight tasks to drain.
   *
   * @return executor intended for the partitioned map-id step
   */
  @Bean
  public TaskExecutor mapIdTaskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    int workers = properties.getBatch().getPartitionConcurrency();
    taskExecutor.setCorePoolSize(workers);
    taskExecutor.setMaxPoolSize(workers * 2);
    taskExecutor.setQueueCapacity(50);
    taskExecutor.setThreadNamePrefix("mapid-worker-");
    taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
    taskExecutor.setAwaitTerminationSeconds(60);
    taskExecutor.initialize();
    return taskExecutor;
  }
}

View File

@@ -0,0 +1,289 @@
package com.kamco.makesample.batch.config;
import com.kamco.makesample.batch.listener.BatchExecutionHistoryListener;
import com.kamco.makesample.batch.partitioner.MapIdPartitioner;
import com.kamco.makesample.batch.processor.FeatureConversionProcessor;
import com.kamco.makesample.batch.tasklet.CreateZipTasklet;
import com.kamco.makesample.batch.tasklet.GeoServerRegistrationTasklet;
import com.kamco.makesample.batch.tasklet.GeometryTypeValidationTasklet;
import com.kamco.makesample.batch.writer.MapIdGeoJsonWriter;
import com.kamco.makesample.batch.writer.MapIdShapefileWriter;
import com.kamco.makesample.batch.writer.StreamingGeoJsonWriter;
import com.kamco.makesample.batch.writer.StreamingShapefileWriter;
import com.kamco.makesample.model.InferenceResult;
import java.util.Arrays;
import org.geotools.api.feature.simple.SimpleFeature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
/**
 * MERGED-mode job configuration.
 *
 * <p>Produces one shapefile and one GeoJSON file covering every configured batch_id, then
 * registers only the shapefile from the merge folder with GeoServer.
 *
 * <p>Job flow:
 *
 * <ol>
 *   <li>validateGeometryTypeStep: pre-validates the geometry type (tasklet)
 *   <li>generateShapefileStep: generates the merged shapefile (chunk-oriented)
 *   <li>generateGeoJsonStep: generates the merged GeoJSON (chunk-oriented)
 *   <li>createZipStep: packages the output as a ZIP file (tasklet)
 *   <li>registerToGeoServerStep: registers the merge-folder shapefile with GeoServer (tasklet,
 *       conditional)
 *   <li>generateMapIdFilesStep: generates one shapefile/GeoJSON pair per map_id (partitioned;
 *       executed sequentially via SyncTaskExecutor — see that step's Javadoc)
 * </ol>
 */
@Configuration
public class MergedModeJobConfig {

  private static final Logger log = LoggerFactory.getLogger(MergedModeJobConfig.class);

  /**
   * Defines the MERGED-mode job: the six steps above wired in fixed order.
   *
   * @param jobRepository JobRepository
   * @param validateGeometryTypeStep geometry-type validation step
   * @param generateShapefileStep shapefile generation step
   * @param generateGeoJsonStep GeoJSON generation step
   * @param createZipStep ZIP packaging step
   * @param registerToGeoServerStep GeoServer registration step (merge-folder shapefile only)
   * @param generateMapIdFilesStep per-map-id file generation step
   * @return the MERGED-mode Job
   */
  @Bean
  public Job mergedModeJob(
      JobRepository jobRepository,
      Step validateGeometryTypeStep,
      Step generateShapefileStep,
      Step generateGeoJsonStep,
      Step createZipStep,
      Step registerToGeoServerStep,
      Step generateMapIdFilesStep) {
    return new JobBuilder("mergedModeJob", jobRepository)
        .start(validateGeometryTypeStep)
        .next(generateShapefileStep)
        .next(generateGeoJsonStep)
        .next(createZipStep)
        .next(registerToGeoServerStep) // conditional: tasklet self-skips unless GeoServer enabled
        .next(generateMapIdFilesStep) // per-map-id file generation
        .build();
  }

  /**
   * Step 1: geometry type validation.
   *
   * <p>Shapefiles require a homogeneous geometry type, so the type is validated up front.
   *
   * @param jobRepository JobRepository
   * @param transactionManager TransactionManager
   * @param validationTasklet GeometryTypeValidationTasklet
   * @param historyListener BatchExecutionHistoryListener
   * @return Step
   */
  @Bean
  public Step validateGeometryTypeStep(
      JobRepository jobRepository,
      PlatformTransactionManager transactionManager,
      GeometryTypeValidationTasklet validationTasklet,
      BatchExecutionHistoryListener historyListener) {
    return new StepBuilder("validateGeometryTypeStep", jobRepository)
        .tasklet(validationTasklet, transactionManager)
        .listener(historyListener)
        .build();
  }

  /**
   * Step 2: shapefile generation (chunk-oriented).
   *
   * <p>Memory optimization:
   *
   * <ul>
   *   <li>Reader: JdbcCursorItemReader (streaming)
   *   <li>Processor: InferenceResult → SimpleFeature conversion
   *   <li>Writer: StreamingShapefileWriter (writes one chunk at a time)
   * </ul>
   *
   * @param jobRepository JobRepository
   * @param transactionManager TransactionManager
   * @param shapefileReader ItemReader (dedicated to the shapefile step)
   * @param featureConversionProcessor ItemProcessor
   * @param shapefileWriter ItemWriter
   * @param chunkSize chunk size (default: 1000)
   * @param historyListener BatchExecutionHistoryListener
   * @return Step
   */
  @Bean
  public Step generateShapefileStep(
      JobRepository jobRepository,
      PlatformTransactionManager transactionManager,
      JdbcCursorItemReader<InferenceResult> shapefileReader,
      FeatureConversionProcessor featureConversionProcessor,
      StreamingShapefileWriter shapefileWriter,
      @Value("${converter.batch.chunk-size:1000}") int chunkSize,
      BatchExecutionHistoryListener historyListener) {
    return new StepBuilder("generateShapefileStep", jobRepository)
        .<InferenceResult, SimpleFeature>chunk(chunkSize, transactionManager)
        .reader(shapefileReader)
        .processor(featureConversionProcessor)
        .writer(shapefileWriter)
        .listener(historyListener)
        .build();
  }

  /**
   * Step 3: GeoJSON generation (chunk-oriented).
   *
   * <p>Writes the same data as the shapefile step, but in GeoJSON format.
   *
   * @param jobRepository JobRepository
   * @param transactionManager TransactionManager
   * @param geoJsonReader ItemReader (separate instance for the GeoJSON step)
   * @param featureConversionProcessor ItemProcessor (shared with the shapefile step)
   * @param geoJsonWriter ItemWriter
   * @param chunkSize chunk size
   * @param historyListener BatchExecutionHistoryListener
   * @return Step
   */
  @Bean
  public Step generateGeoJsonStep(
      JobRepository jobRepository,
      PlatformTransactionManager transactionManager,
      JdbcCursorItemReader<InferenceResult> geoJsonReader,
      FeatureConversionProcessor featureConversionProcessor,
      StreamingGeoJsonWriter geoJsonWriter,
      @Value("${converter.batch.chunk-size:1000}") int chunkSize,
      BatchExecutionHistoryListener historyListener) {
    return new StepBuilder("generateGeoJsonStep", jobRepository)
        .<InferenceResult, SimpleFeature>chunk(chunkSize, transactionManager)
        .reader(geoJsonReader)
        .processor(featureConversionProcessor)
        .writer(geoJsonWriter)
        .listener(historyListener)
        .build();
  }

  /**
   * Step 4: ZIP file creation.
   *
   * @param jobRepository JobRepository
   * @param transactionManager TransactionManager
   * @param createZipTasklet CreateZipTasklet
   * @param historyListener BatchExecutionHistoryListener
   * @return Step
   */
  @Bean
  public Step createZipStep(
      JobRepository jobRepository,
      PlatformTransactionManager transactionManager,
      CreateZipTasklet createZipTasklet,
      BatchExecutionHistoryListener historyListener) {
    return new StepBuilder("createZipStep", jobRepository)
        .tasklet(createZipTasklet, transactionManager)
        .listener(historyListener)
        .build();
  }

  /**
   * Step 5: GeoServer registration (merge-folder shapefile only).
   *
   * <p>Conditional execution: only effective when geoserver.enabled=true.
   *
   * @param jobRepository JobRepository
   * @param transactionManager TransactionManager
   * @param registrationTasklet GeoServerRegistrationTasklet
   * @param historyListener BatchExecutionHistoryListener
   * @return Step
   */
  @Bean
  public Step registerToGeoServerStep(
      JobRepository jobRepository,
      PlatformTransactionManager transactionManager,
      GeoServerRegistrationTasklet registrationTasklet,
      BatchExecutionHistoryListener historyListener) {
    return new StepBuilder("registerToGeoServerStep", jobRepository)
        .tasklet(registrationTasklet, transactionManager)
        .listener(historyListener)
        .build();
  }

  /**
   * Step 6: per-map-id file generation (partitioned step, sequential).
   *
   * <p>Generates an individual shapefile and GeoJSON for each map_id, one partition at a time. A
   * SyncTaskExecutor is set explicitly to prevent parallel execution and DB connection-pool
   * exhaustion.
   *
   * <p>NOTE(review): BatchConfiguration also defines a {@code mapIdTaskExecutor} thread pool that
   * is not wired here — confirm whether parallel execution was intended.
   *
   * @param jobRepository JobRepository
   * @param partitioner MapIdPartitioner
   * @param mapIdWorkerStep worker step executed for each partition
   * @return partitioned Step
   */
  @Bean
  public Step generateMapIdFilesStep(
      JobRepository jobRepository, MapIdPartitioner partitioner, Step mapIdWorkerStep) {
    return new StepBuilder("generateMapIdFilesStep", jobRepository)
        .partitioner("mapIdWorker", partitioner)
        .step(mapIdWorkerStep)
        .taskExecutor(new SyncTaskExecutor()) // explicitly sequential execution
        .listener(partitioner) // Register partitioner as StepExecutionListener
        .build();
  }

  /**
   * Worker step: per-map-id file generation.
   *
   * <p>Runs once per partition, reading that map_id's rows and producing its shapefile and GeoJSON
   * together in a single pass.
   *
   * @param jobRepository JobRepository
   * @param transactionManager TransactionManager
   * @param mapIdModeReader ItemReader scoped to one map_id
   * @param featureConversionProcessor ItemProcessor
   * @param mapIdShapefileWriter shapefile writer
   * @param mapIdGeoJsonWriter GeoJSON writer
   * @param chunkSize chunk size
   * @param historyListener BatchExecutionHistoryListener
   * @return worker Step
   */
  @Bean
  public Step mapIdWorkerStep(
      JobRepository jobRepository,
      PlatformTransactionManager transactionManager,
      JdbcCursorItemReader<InferenceResult> mapIdModeReader,
      FeatureConversionProcessor featureConversionProcessor,
      MapIdShapefileWriter mapIdShapefileWriter,
      MapIdGeoJsonWriter mapIdGeoJsonWriter,
      @Value("${converter.batch.chunk-size:1000}") int chunkSize,
      BatchExecutionHistoryListener historyListener) {
    // CompositeItemWriter produces the shapefile and the GeoJSON in one pass
    CompositeItemWriter<SimpleFeature> compositeWriter = new CompositeItemWriter<>();
    compositeWriter.setDelegates(Arrays.asList(mapIdShapefileWriter, mapIdGeoJsonWriter));
    return new StepBuilder("mapIdWorkerStep", jobRepository)
        .<InferenceResult, SimpleFeature>chunk(chunkSize, transactionManager)
        .reader(mapIdModeReader)
        .processor(featureConversionProcessor)
        .writer(compositeWriter)
        .stream(mapIdShapefileWriter)
        .stream(mapIdGeoJsonWriter)
        .listener(historyListener)
        .build();
  }
}

View File

@@ -0,0 +1,183 @@
package com.kamco.makesample.batch.listener;
import com.kamco.makesample.batch.model.BatchExecutionHistory;
import com.kamco.makesample.batch.repository.BatchExecutionHistoryRepository;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.time.LocalDateTime;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.stereotype.Component;
/**
 * Step listener that records batch execution history.
 *
 * <p>Inserts a history row into the database when a step starts and updates it when the step
 * finishes. Recorded fields:
 *
 * <ul>
 *   <li>start time, end time, and duration
 *   <li>success/failure status and exit code
 *   <li>error message and stack trace on failure
 *   <li>processing statistics (read/write/commit/rollback/skip counts)
 * </ul>
 *
 * <p>History persistence is best-effort: failures are logged and never propagated, so the batch
 * job itself is unaffected by history errors.
 */
@Component
public class BatchExecutionHistoryListener implements StepExecutionListener {

  private static final Logger log = LoggerFactory.getLogger(BatchExecutionHistoryListener.class);

  private final BatchExecutionHistoryRepository historyRepository;

  // Holds the inserted history row id per thread between beforeStep and afterStep.
  // Must be cleared in afterStep: worker threads may be pooled and reused across steps.
  private final ThreadLocal<Long> historyIdHolder = new ThreadLocal<>();

  public BatchExecutionHistoryListener(BatchExecutionHistoryRepository historyRepository) {
    this.historyRepository = historyRepository;
  }

  @Override
  public void beforeStep(StepExecution stepExecution) {
    // Defensively drop any id left over from a previous step on this (possibly reused) thread,
    // so a failed insert below cannot cause afterStep to update the wrong history row.
    historyIdHolder.remove();
    try {
      // Create the execution-history row for this step.
      BatchExecutionHistory history = new BatchExecutionHistory();
      history.setJobExecutionId(stepExecution.getJobExecutionId());
      history.setStepExecutionId(stepExecution.getId());
      history.setStepName(stepExecution.getStepName());
      history.setStartTime(LocalDateTime.now());
      history.setStatus("STARTED");
      // Extract batch_ids / inference_id from the job parameters (may be null).
      String batchIds = stepExecution.getJobParameters().getString("batchIds");
      String inferenceId = stepExecution.getJobParameters().getString("inferenceId");
      history.setBatchIds(batchIds);
      history.setInferenceId(inferenceId);
      // Estimate the step type (Tasklet vs Chunk) from the step name.
      String stepType = estimateStepType(stepExecution.getStepName());
      history.setStepType(stepType);
      // Persist and remember the row id for afterStep.
      Long historyId = historyRepository.insert(history);
      historyIdHolder.set(historyId);
      log.debug(
          "Step execution history created: id={}, step={}, jobExecutionId={}",
          historyId,
          stepExecution.getStepName(),
          stepExecution.getJobExecutionId());
    } catch (Exception e) {
      // Best-effort: never let history persistence break the step itself.
      log.error("Failed to save step execution history on start: {}", e.getMessage(), e);
    }
  }

  @Override
  public ExitStatus afterStep(StepExecution stepExecution) {
    try {
      Long historyId = historyIdHolder.get();
      if (historyId == null) {
        log.warn("No history ID found for step: {}", stepExecution.getStepName());
        return stepExecution.getExitStatus();
      }
      // End time and status.
      LocalDateTime endTime = LocalDateTime.now();
      LocalDateTime startTime =
          stepExecution.getStartTime() != null ? stepExecution.getStartTime() : LocalDateTime.now();
      String status = stepExecution.getStatus().toString();
      String exitCode = stepExecution.getExitStatus().getExitCode();
      String exitMessage = stepExecution.getExitStatus().getExitDescription();
      // Error details, when the step failed.
      String errorMessage = null;
      String errorStackTrace = null;
      List<Throwable> failureExceptions = stepExecution.getFailureExceptions();
      if (!failureExceptions.isEmpty()) {
        Throwable firstException = failureExceptions.get(0);
        errorMessage = firstException.getMessage();
        errorStackTrace = getStackTrace(firstException);
      }
      // Processing statistics (meaningful for chunk-based steps; zeros for tasklets).
      Long readCount = (long) stepExecution.getReadCount();
      Long writeCount = (long) stepExecution.getWriteCount();
      Long commitCount = (long) stepExecution.getCommitCount();
      Long rollbackCount = (long) stepExecution.getRollbackCount();
      Long skipCount = (long) stepExecution.getSkipCount();
      // Update the history row with the final outcome.
      historyRepository.updateOnCompletion(
          historyId,
          endTime,
          startTime,
          status,
          exitCode,
          exitMessage,
          errorMessage,
          errorStackTrace,
          readCount,
          writeCount,
          commitCount,
          rollbackCount,
          skipCount);
      log.debug(
          "Step execution history updated: id={}, step={}, status={}, duration={}ms",
          historyId,
          stepExecution.getStepName(),
          status,
          java.time.Duration.between(startTime, endTime).toMillis());
    } catch (Exception e) {
      log.error("Failed to update step execution history on completion: {}", e.getMessage(), e);
    } finally {
      // Always clear the ThreadLocal, even when the update above throws; otherwise a stale id
      // survives on the pooled thread and a later step's history would overwrite this row.
      historyIdHolder.remove();
    }
    return stepExecution.getExitStatus();
  }

  /**
   * Estimates the step type from the step name.
   *
   * <p>NOTE(review): names matching none of the patterns (e.g. "mapIdWorkerStep") fall through to
   * UNKNOWN — confirm whether worker steps should be classified as CHUNK.
   *
   * @param stepName step name
   * @return "TASKLET", "CHUNK", or "UNKNOWN"
   */
  private String estimateStepType(String stepName) {
    // Tasklet-style steps.
    if (stepName.contains("validate")
        || stepName.contains("Zip")
        || stepName.contains("GeoServer")) {
      return "TASKLET";
    }
    // Chunk-style steps.
    if (stepName.contains("generate")
        || stepName.contains("Shapefile")
        || stepName.contains("GeoJson")) {
      return "CHUNK";
    }
    return "UNKNOWN";
  }

  /**
   * Renders an exception's stack trace as a string.
   *
   * @param throwable the exception
   * @return the full stack trace text
   */
  private String getStackTrace(Throwable throwable) {
    StringWriter sw = new StringWriter();
    PrintWriter pw = new PrintWriter(sw);
    throwable.printStackTrace(pw);
    return sw.toString();
  }
}

View File

@@ -0,0 +1,212 @@
package com.kamco.makesample.batch.model;
import java.time.LocalDateTime;
/**
 * Batch execution history entity.
 *
 * <p>Tracks, for each step, the start/end time, duration, success status, and failure reason.
 * Maps to one row of the {@code batch_execution_history} table; this is a plain mutable DTO
 * populated via setters by the repository/listener layer.
 */
public class BatchExecutionHistory {
  // Surrogate primary key (assigned by the database on insert).
  private Long id;
  // Spring Batch identifiers linking this row to BATCH_JOB_EXECUTION / BATCH_STEP_EXECUTION.
  private Long jobExecutionId;
  private Long stepExecutionId;
  // Step identity: name plus estimated type ("TASKLET", "CHUNK", or "UNKNOWN").
  private String stepName;
  private String stepType;
  // Timing: wall-clock start/end and total duration in milliseconds.
  private LocalDateTime startTime;
  private LocalDateTime endTime;
  private Long durationMs;
  // Outcome: batch status string plus Spring Batch exit code/description.
  private String status;
  private String exitCode;
  private String exitMessage;
  // Failure details, populated only when the step failed.
  private String errorMessage;
  private String errorStackTrace;
  // Chunk-processing statistics (zeros for tasklet steps).
  private Long readCount;
  private Long writeCount;
  private Long commitCount;
  private Long rollbackCount;
  private Long skipCount;
  // Job parameters captured at step start (batchIds is a serialized list string).
  private String batchIds;
  private String inferenceId;
  // Row audit timestamps.
  private LocalDateTime createdAt;
  private LocalDateTime updatedAt;

  // Getters and Setters
  public Long getId() {
    return id;
  }

  public void setId(Long id) {
    this.id = id;
  }

  public Long getJobExecutionId() {
    return jobExecutionId;
  }

  public void setJobExecutionId(Long jobExecutionId) {
    this.jobExecutionId = jobExecutionId;
  }

  public Long getStepExecutionId() {
    return stepExecutionId;
  }

  public void setStepExecutionId(Long stepExecutionId) {
    this.stepExecutionId = stepExecutionId;
  }

  public String getStepName() {
    return stepName;
  }

  public void setStepName(String stepName) {
    this.stepName = stepName;
  }

  public String getStepType() {
    return stepType;
  }

  public void setStepType(String stepType) {
    this.stepType = stepType;
  }

  public LocalDateTime getStartTime() {
    return startTime;
  }

  public void setStartTime(LocalDateTime startTime) {
    this.startTime = startTime;
  }

  public LocalDateTime getEndTime() {
    return endTime;
  }

  public void setEndTime(LocalDateTime endTime) {
    this.endTime = endTime;
  }

  public Long getDurationMs() {
    return durationMs;
  }

  public void setDurationMs(Long durationMs) {
    this.durationMs = durationMs;
  }

  public String getStatus() {
    return status;
  }

  public void setStatus(String status) {
    this.status = status;
  }

  public String getExitCode() {
    return exitCode;
  }

  public void setExitCode(String exitCode) {
    this.exitCode = exitCode;
  }

  public String getExitMessage() {
    return exitMessage;
  }

  public void setExitMessage(String exitMessage) {
    this.exitMessage = exitMessage;
  }

  public String getErrorMessage() {
    return errorMessage;
  }

  public void setErrorMessage(String errorMessage) {
    this.errorMessage = errorMessage;
  }

  public String getErrorStackTrace() {
    return errorStackTrace;
  }

  public void setErrorStackTrace(String errorStackTrace) {
    this.errorStackTrace = errorStackTrace;
  }

  public Long getReadCount() {
    return readCount;
  }

  public void setReadCount(Long readCount) {
    this.readCount = readCount;
  }

  public Long getWriteCount() {
    return writeCount;
  }

  public void setWriteCount(Long writeCount) {
    this.writeCount = writeCount;
  }

  public Long getCommitCount() {
    return commitCount;
  }

  public void setCommitCount(Long commitCount) {
    this.commitCount = commitCount;
  }

  public Long getRollbackCount() {
    return rollbackCount;
  }

  public void setRollbackCount(Long rollbackCount) {
    this.rollbackCount = rollbackCount;
  }

  public Long getSkipCount() {
    return skipCount;
  }

  public void setSkipCount(Long skipCount) {
    this.skipCount = skipCount;
  }

  public String getBatchIds() {
    return batchIds;
  }

  public void setBatchIds(String batchIds) {
    this.batchIds = batchIds;
  }

  public String getInferenceId() {
    return inferenceId;
  }

  public void setInferenceId(String inferenceId) {
    this.inferenceId = inferenceId;
  }

  public LocalDateTime getCreatedAt() {
    return createdAt;
  }

  public void setCreatedAt(LocalDateTime createdAt) {
    this.createdAt = createdAt;
  }

  public LocalDateTime getUpdatedAt() {
    return updatedAt;
  }

  public void setUpdatedAt(LocalDateTime updatedAt) {
    this.updatedAt = updatedAt;
  }
}

View File

@@ -0,0 +1,140 @@
package com.kamco.makesample.batch.partitioner;
import com.kamco.makesample.config.ConverterProperties;
import com.kamco.makesample.repository.InferenceResultRepository;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.stereotype.Component;
/**
 * Creates one partition per map ID.
 *
 * <p>Looks up the distinct map_id values for the configured batch_ids and builds an
 * ExecutionContext for each one. Every partition independently produces a shapefile and a GeoJSON
 * file.
 */
@Component
public class MapIdPartitioner implements Partitioner, StepExecutionListener {

  private static final Logger log = LoggerFactory.getLogger(MapIdPartitioner.class);

  private final InferenceResultRepository repository;
  private final ConverterProperties properties;

  /**
   * Geometry type resolved by a prior validation step; populated in {@link #beforeStep} and
   * propagated into every partition context. NOTE(review): mutable state on a singleton
   * {@code @Component} — safe only while jobs do not run concurrently; consider step scope if
   * parallel launches are possible.
   */
  private String geometryType;

  public MapIdPartitioner(InferenceResultRepository repository, ConverterProperties properties) {
    this.repository = repository;
    this.properties = properties;
  }

  /**
   * Reads the geometry type from the job-level ExecutionContext.
   *
   * @param stepExecution current step execution (provides the job ExecutionContext)
   * @throws IllegalStateException if the value is missing or blank, i.e. the
   *     GeometryTypeValidationTasklet did not run before partitioning
   */
  @BeforeStep
  public void beforeStep(StepExecution stepExecution) {
    log.info("MapIdPartitioner.beforeStep() - retrieving geometryType from job context");
    ExecutionContext jobExecutionContext = stepExecution.getJobExecution().getExecutionContext();
    if (!jobExecutionContext.containsKey("geometryType")) {
      String errorMsg =
          "geometryType not found in job execution context. "
              + "GeometryTypeValidationTasklet must run before partitioning.";
      log.error(errorMsg);
      throw new IllegalStateException(errorMsg);
    }
    this.geometryType = jobExecutionContext.getString("geometryType");
    log.info("Retrieved geometryType from job context: {}", this.geometryType);
    if (this.geometryType == null || this.geometryType.isEmpty()) {
      throw new IllegalStateException("geometryType is null or empty in job context");
    }
    log.info("MapIdPartitioner ready with geometryType: {}", this.geometryType);
  }

  /**
   * Step-completion callback. Returns {@code null} so the step's own ExitStatus is preserved.
   *
   * <p>Fix: the previous implementation returned {@code ExitStatus.COMPLETED} unconditionally,
   * which could mask the real exit status of a failed step.
   */
  @AfterStep
  public ExitStatus afterStep(StepExecution stepExecution) {
    return null;
  }

  /**
   * Builds one ExecutionContext per distinct map_id found for the configured batch_ids.
   *
   * @param gridSize suggested partition count (ignored; partitioning is data-driven)
   * @return map of partition name ("partition-&lt;mapId&gt;") to its ExecutionContext; empty when
   *     no map_ids match
   */
  @Override
  public Map<String, ExecutionContext> partition(int gridSize) {
    List<Long> batchIds = properties.getBatchIds();
    String inferenceId = properties.getInferenceId();
    String outputBaseDir = properties.getOutputBaseDir();
    log.info("Creating partitions for batch_ids: {}, inference_id: {}", batchIds, inferenceId);

    // Distinct map_ids for the requested batches drive the partition set.
    List<String> mapIds = repository.findMapIdByBatchIds(batchIds);
    if (mapIds.isEmpty()) {
      log.warn("No map_ids found for batch_ids: {}", batchIds);
      return new HashMap<>();
    }
    log.info("Found {} map_ids to partition: {}", mapIds.size(), mapIds);

    Map<String, ExecutionContext> partitions = new HashMap<>();
    for (String mapId : mapIds) {
      ExecutionContext context = new ExecutionContext();
      // Per-partition parameters: the map id plus both output file paths.
      context.putString("mapId", mapId);
      context.putString("outputPath", buildShapefilePath(outputBaseDir, inferenceId, mapId));
      context.putString("geoJsonOutputPath", buildGeoJsonPath(outputBaseDir, inferenceId, mapId));
      // Propagate geometryType so each partition's writer can build its schema.
      context.putString("geometryType", this.geometryType);
      partitions.put("partition-" + mapId, context);
      log.debug(
          "Created partition for map_id: {}, shapefile: {}, geojson: {}, geometryType: {}",
          mapId,
          context.getString("outputPath"),
          context.getString("geoJsonOutputPath"),
          context.getString("geometryType"));
    }
    log.info("Created {} partitions with geometryType: {}", partitions.size(), this.geometryType);
    return partitions;
  }

  /**
   * Builds the shapefile output path: {@code baseDir/inferenceId/mapId/mapId.shp}.
   *
   * @param baseDir base output directory
   * @param inferenceId inference ID
   * @param mapId map ID
   * @return shapefile path
   */
  private String buildShapefilePath(String baseDir, String inferenceId, String mapId) {
    return Paths.get(baseDir, inferenceId, mapId, mapId + ".shp").toString();
  }

  /**
   * Builds the GeoJSON output path: {@code baseDir/inferenceId/mapId/mapId.geojson}.
   *
   * @param baseDir base output directory
   * @param inferenceId inference ID
   * @param mapId map ID
   * @return GeoJSON path
   */
  private String buildGeoJsonPath(String baseDir, String inferenceId, String mapId) {
    return Paths.get(baseDir, inferenceId, mapId, mapId + ".geojson").toString();
  }
}

View File

@@ -0,0 +1,135 @@
package com.kamco.makesample.batch.processor;
import com.kamco.makesample.batch.util.FeatureTypeFactory;
import com.kamco.makesample.model.InferenceResult;
import org.geotools.api.feature.simple.SimpleFeature;
import org.geotools.api.feature.simple.SimpleFeatureType;
import org.geotools.api.referencing.FactoryException;
import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
import org.geotools.feature.simple.SimpleFeatureBuilder;
import org.geotools.referencing.CRS;
import org.locationtech.jts.geom.Geometry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Processor converting InferenceResult → SimpleFeature.
 *
 * <p>The buildFeature logic of the legacy ShapefileWriter, extracted into a Processor.
 *
 * <p>Responsibilities:
 *
 * <ul>
 *   <li>Geometry validation (null check, isValid check)
 *   <li>Mapping InferenceResult fields to SimpleFeature attributes
 *   <li>Skipping invalid geometries (returns {@code null} so Spring Batch filters the item)
 * </ul>
 */
@Component
@StepScope
public class FeatureConversionProcessor implements ItemProcessor<InferenceResult, SimpleFeature> {

  private static final Logger log = LoggerFactory.getLogger(FeatureConversionProcessor.class);

  private final FeatureTypeFactory featureTypeFactory;

  /** CRS code from job parameters; defaults to EPSG:5186 (Korea 2000 / Central Belt). */
  @Value("#{jobParameters['crs'] ?: 'EPSG:5186'}")
  private String crsCode;

  private SimpleFeatureBuilder featureBuilder;
  private SimpleFeatureType featureType;

  public FeatureConversionProcessor(FeatureTypeFactory featureTypeFactory) {
    this.featureTypeFactory = featureTypeFactory;
  }

  /**
   * Initializes the FeatureType and builder from the geometry type stored in the Job
   * ExecutionContext by a previous step.
   *
   * @param stepExecution current step execution (provides the job ExecutionContext)
   * @throws IllegalStateException if the configured CRS cannot be decoded
   */
  @BeforeStep
  public void beforeStep(StepExecution stepExecution) {
    ExecutionContext jobExecutionContext = stepExecution.getJobExecution().getExecutionContext();
    String geomTypeStr = null;
    Class<?> geometryType;
    // geometryType may be absent (empty dataset); the factory resolves a default from null.
    if (jobExecutionContext.containsKey("geometryType")) {
      geomTypeStr = jobExecutionContext.getString("geometryType");
    } else {
      log.warn("geometryType not set in Job ExecutionContext (empty dataset). Using default");
    }
    geometryType = featureTypeFactory.parseGeometryType(geomTypeStr);
    try {
      CoordinateReferenceSystem crs = CRS.decode(crsCode);
      this.featureType = featureTypeFactory.createFeatureType(crs, geometryType);
      this.featureBuilder = new SimpleFeatureBuilder(this.featureType);
      log.info(
          "FeatureConversionProcessor initialized with geometry type: {}",
          geometryType.getSimpleName());
    } catch (FactoryException e) {
      // IllegalStateException keeps the original unchecked contract (it is a RuntimeException)
      // while signalling a configuration problem; the cause is preserved.
      throw new IllegalStateException("Failed to initialize FeatureConversionProcessor", e);
    }
  }

  /**
   * Validates the item's geometry and converts it to a SimpleFeature.
   *
   * @param result source record
   * @return the feature, or {@code null} to skip items with null/invalid geometry
   */
  @Override
  public SimpleFeature process(InferenceResult result) throws Exception {
    Geometry geometry = result.getGeometry();
    if (geometry == null) {
      log.warn("Null geometry detected for uid: {} - skipping", result.getUid());
      return null; // Spring Batch drops null items
    }
    if (!geometry.isValid()) {
      // Fix: the previous message labelled the geometry type as "Reason", which was misleading.
      log.warn(
          "Invalid geometry detected for uid: {} (geometry type: {}) - skipping",
          result.getUid(),
          geometry.getGeometryType());
      return null; // skip invalid geometry
    }
    return buildFeature(result, geometry);
  }

  /**
   * Maps an InferenceResult to a SimpleFeature using the same attribute order as the legacy
   * ShapefileWriter.buildFeature().
   *
   * @param result source record
   * @param geometry validated geometry (added first, as "the_geom")
   * @return built feature with an auto-generated feature id
   */
  private SimpleFeature buildFeature(InferenceResult result, Geometry geometry) {
    featureBuilder.add(geometry);
    // Attribute order must match the FeatureType created in beforeStep().
    featureBuilder.add(result.getUid());
    featureBuilder.add(result.getMapId());
    featureBuilder.add(
        result.getProbability() != null ? String.valueOf(result.getProbability()) : "0.0");
    featureBuilder.add(result.getBeforeYear() != null ? result.getBeforeYear() : 0L);
    featureBuilder.add(result.getAfterYear() != null ? result.getAfterYear() : 0L);
    featureBuilder.add(result.getBeforeC());
    featureBuilder.add(result.getBeforeP() != null ? String.valueOf(result.getBeforeP()) : "0.0");
    featureBuilder.add(result.getAfterC());
    featureBuilder.add(result.getAfterP() != null ? String.valueOf(result.getAfterP()) : "0.0");
    return featureBuilder.buildFeature(null);
  }
}

View File

@@ -0,0 +1,56 @@
package com.kamco.makesample.batch.reader;
import com.kamco.makesample.model.InferenceResult;
import com.kamco.makesample.service.GeometryConverter;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;
/**
 * RowMapper that turns a ResultSet row into an InferenceResult.
 *
 * <p>Same mapping as InferenceResultRepository's internal InferenceResultRowMapper, extracted as a
 * standalone component so it can be plugged into a Spring Batch ItemReader.
 */
@Component
public class GeometryConvertingRowMapper implements RowMapper<InferenceResult> {

  private final GeometryConverter geometryConverter;

  public GeometryConvertingRowMapper(GeometryConverter geometryConverter) {
    this.geometryConverter = geometryConverter;
  }

  /**
   * Maps one row; the WKT geometry column is converted to a JTS geometry when present.
   *
   * @param rs current result set, positioned on the row to map
   * @param rowNum row index (unused)
   * @return populated InferenceResult
   * @throws SQLException on column access failure
   */
  @Override
  public InferenceResult mapRow(ResultSet rs, int rowNum) throws SQLException {
    InferenceResult row = new InferenceResult();
    row.setUid(rs.getString("uid"));
    row.setMapId(rs.getString("map_id"));
    row.setProbability(nullableDouble(rs, "probability"));
    row.setBeforeYear(nullableLong(rs, "before_year"));
    row.setAfterYear(nullableLong(rs, "after_year"));
    row.setBeforeC(rs.getString("before_c"));
    row.setBeforeP(nullableDouble(rs, "before_p"));
    row.setAfterC(rs.getString("after_c"));
    row.setAfterP(nullableDouble(rs, "after_p"));
    // Per-record WKT → JTS conversion; geometry stays null when the column is NULL.
    String geometryWkt = rs.getString("geometry_wkt");
    if (geometryWkt != null) {
      row.setGeometry(geometryConverter.convertWKTToJTS(geometryWkt));
    }
    return row;
  }

  /** Reads a BIGINT column, returning {@code null} when the SQL value was NULL. */
  private Long nullableLong(ResultSet rs, String column) throws SQLException {
    long raw = rs.getLong(column);
    return rs.wasNull() ? null : raw;
  }

  /** Reads a DOUBLE column, returning {@code null} when the SQL value was NULL. */
  private Double nullableDouble(ResultSet rs, String column) throws SQLException {
    double raw = rs.getDouble(column);
    return rs.wasNull() ? null : raw;
  }
}

View File

@@ -0,0 +1,221 @@
package com.kamco.makesample.batch.reader;
import com.kamco.makesample.model.InferenceResult;
import java.sql.Array;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.sql.DataSource;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.PreparedStatementSetter;
/**
 * Cursor-based ItemReader configuration.
 *
 * <p>Core of the memory optimisation: instead of loading the whole dataset into a List, rows are
 * streamed through a JDBC cursor.
 *
 * <p>Key characteristics:
 *
 * <ul>
 *   <li>fetch-size 1000 → rows are pulled from the DB in batches of 1000
 *   <li>cursor-based → the full ResultSet is never held in memory
 *   <li>PreparedStatement → PostgreSQL array parameter binding
 *   <li>EPSG:5186 consistency checks (SRID, coordinate range, geometry validity)
 * </ul>
 */
@Configuration
public class InferenceResultItemReaderConfig {

  // EPSG:5186 consistency filter:
  // - SRID = 5186 (Korea 2000 / Central Belt)
  // - ST_IsValid() = true (geometry validity)
  // - X range: 125,000 ~ 530,000 m (east-west)
  // - Y range: -600,000 ~ 988,000 m (north-south)
  // Polygons with coordinates outside these bounds are excluded from the batch.
  private static final String SQL_QUERY =
      """
      SELECT uid, map_id, probability, before_year, after_year,
      before_c, before_p, after_c, after_p,
      ST_AsText(geometry) as geometry_wkt
      FROM inference_results_testing
      WHERE batch_id = ANY(?)
      AND after_c IS NOT NULL
      AND after_p IS NOT NULL
      AND geometry IS NOT NULL
      AND ST_GeometryType(geometry) IN ('ST_Polygon', 'ST_MultiPolygon')
      AND ST_SRID(geometry) = 5186
      AND ST_IsValid(geometry) = true
      AND ST_XMin(geometry) >= 125000 AND ST_XMax(geometry) <= 530000
      AND ST_YMin(geometry) >= -600000 AND ST_YMax(geometry) <= 988000
      ORDER BY map_id, uid
      """;

  /**
   * ItemReader for MERGED mode (shapefile generation).
   *
   * <p>Streams all rows for the given batch_ids.
   *
   * @param dataSource DataSource
   * @param batchIdsParam batch_ids job parameter (comma separated, e.g. "252,253,257")
   * @param fetchSize fetch size (default 1000)
   * @param rowMapper RowMapper
   * @return JdbcCursorItemReader
   */
  @Bean
  @StepScope
  public JdbcCursorItemReader<InferenceResult> shapefileReader(
      DataSource dataSource,
      @Value("#{jobParameters['batchIds']}") String batchIdsParam,
      @Value("#{jobParameters['fetchSize'] ?: 1000}") int fetchSize,
      GeometryConvertingRowMapper rowMapper) {
    Long[] batchIds = parseBatchIds(batchIdsParam);
    return new JdbcCursorItemReaderBuilder<InferenceResult>()
        .name("shapefileReader")
        .dataSource(dataSource)
        .sql(SQL_QUERY)
        .fetchSize(fetchSize) // keeps memory bounded
        .rowMapper(rowMapper)
        .preparedStatementSetter(batchIdArraySetter(batchIds))
        .build();
  }

  /**
   * ItemReader for MERGED mode (GeoJSON generation).
   *
   * <p>Streams the same rows as the shapefile reader.
   *
   * @param dataSource DataSource
   * @param batchIdsParam batch_ids job parameter (comma separated, e.g. "252,253,257")
   * @param fetchSize fetch size (default 1000)
   * @param rowMapper RowMapper
   * @return JdbcCursorItemReader
   */
  @Bean
  @StepScope
  public JdbcCursorItemReader<InferenceResult> geoJsonReader(
      DataSource dataSource,
      @Value("#{jobParameters['batchIds']}") String batchIdsParam,
      @Value("#{jobParameters['fetchSize'] ?: 1000}") int fetchSize,
      GeometryConvertingRowMapper rowMapper) {
    Long[] batchIds = parseBatchIds(batchIdsParam);
    return new JdbcCursorItemReaderBuilder<InferenceResult>()
        .name("geoJsonReader")
        .dataSource(dataSource)
        .sql(SQL_QUERY)
        .fetchSize(fetchSize) // keeps memory bounded
        .rowMapper(rowMapper)
        .preparedStatementSetter(batchIdArraySetter(batchIds))
        .build();
  }

  /**
   * ItemReader for MAP_IDS mode.
   *
   * <p>Reads only the rows belonging to a single map_id.
   *
   * @param dataSource DataSource
   * @param batchIdsParam batch_ids job parameter
   * @param mapId map_id from the partition's Step ExecutionContext
   * @param fetchSize fetch size
   * @param rowMapper RowMapper
   * @return JdbcCursorItemReader
   */
  @Bean
  @StepScope
  public JdbcCursorItemReader<InferenceResult> mapIdModeReader(
      DataSource dataSource,
      @Value("#{jobParameters['batchIds']}") String batchIdsParam,
      @Value("#{stepExecutionContext['mapId']}") String mapId,
      @Value("#{jobParameters['fetchSize'] ?: 1000}") int fetchSize,
      GeometryConvertingRowMapper rowMapper) {
    Long[] batchIds = parseBatchIds(batchIdsParam);
    String sqlWithMapId =
        """
        SELECT uid, map_id, probability, before_year, after_year,
        before_c, before_p, after_c, after_p,
        ST_AsText(geometry) as geometry_wkt
        FROM inference_results_testing
        WHERE batch_id = ANY(?)
        AND map_id = ?
        AND after_c IS NOT NULL
        AND after_p IS NOT NULL
        AND geometry IS NOT NULL
        AND ST_GeometryType(geometry) IN ('ST_Polygon', 'ST_MultiPolygon')
        AND ST_SRID(geometry) = 5186
        AND ST_IsValid(geometry) = true
        AND ST_XMin(geometry) >= 125000 AND ST_XMax(geometry) <= 530000
        AND ST_YMin(geometry) >= -600000 AND ST_YMax(geometry) <= 988000
        ORDER BY uid
        """;
    return new JdbcCursorItemReaderBuilder<InferenceResult>()
        .name("mapIdModeReader")
        .dataSource(dataSource)
        .sql(sqlWithMapId)
        .fetchSize(fetchSize)
        .rowMapper(rowMapper)
        .preparedStatementSetter(
            // Binds the batch-id array (param 1) plus the partition's map_id (param 2).
            ps -> {
              Connection conn = ps.getConnection();
              Array sqlArray = conn.createArrayOf("bigint", batchIds);
              ps.setArray(1, sqlArray);
              ps.setString(2, mapId);
            })
        .build();
  }

  /**
   * Shared setter binding the batch IDs as a PostgreSQL bigint array to parameter 1.
   *
   * <p>Extracted to remove the duplicated anonymous classes in the two MERGED-mode readers.
   *
   * @param batchIds parsed batch IDs
   * @return PreparedStatementSetter binding the array parameter
   */
  private PreparedStatementSetter batchIdArraySetter(Long[] batchIds) {
    return ps -> {
      Connection conn = ps.getConnection();
      Array sqlArray = conn.createArrayOf("bigint", batchIds);
      ps.setArray(1, sqlArray);
    };
  }

  /**
   * Parses the job-parameter string into a Long array.
   *
   * @param batchIdsParam string of the form "252,253,257"
   * @return Long array
   * @throws IllegalArgumentException if the parameter is missing, blank, or contains a
   *     non-numeric token
   */
  private Long[] parseBatchIds(String batchIdsParam) {
    if (batchIdsParam == null || batchIdsParam.trim().isEmpty()) {
      throw new IllegalArgumentException("batchIds parameter is required");
    }
    String[] parts = batchIdsParam.split(",");
    Long[] batchIds = new Long[parts.length];
    for (int i = 0; i < parts.length; i++) {
      try {
        batchIds[i] = Long.parseLong(parts[i].trim());
      } catch (NumberFormatException e) {
        // Surface which token was bad instead of a bare NumberFormatException.
        throw new IllegalArgumentException("Invalid batch id '" + parts[i] + "' in: " + batchIdsParam, e);
      }
    }
    return batchIds;
  }
}

View File

@@ -0,0 +1,206 @@
package com.kamco.makesample.batch.repository;
import com.kamco.makesample.batch.model.BatchExecutionHistory;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.LocalDateTime;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
/**
 * Repository for batch execution history.
 *
 * <p>Persists and queries per-step execution history records.
 */
@Repository
public class BatchExecutionHistoryRepository {

  private final JdbcTemplate jdbcTemplate;

  public BatchExecutionHistoryRepository(JdbcTemplate jdbcTemplate) {
    this.jdbcTemplate = jdbcTemplate;
  }

  /**
   * Creates a history row when a step starts.
   *
   * @param history execution history (startTime must be non-null)
   * @return generated ID of the inserted row
   */
  public Long insert(BatchExecutionHistory history) {
    String sql =
        """
        INSERT INTO batch_execution_history (
        job_execution_id, step_execution_id, step_name, step_type,
        start_time, status, batch_ids, inference_id
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        RETURNING id
        """;
    return jdbcTemplate.queryForObject(
        sql,
        Long.class,
        history.getJobExecutionId(),
        history.getStepExecutionId(),
        history.getStepName(),
        history.getStepType(),
        Timestamp.valueOf(history.getStartTime()),
        history.getStatus(),
        history.getBatchIds(),
        history.getInferenceId());
  }

  /**
   * Updates a history row when a step finishes.
   *
   * @param id history row ID
   * @param endTime end time
   * @param startTime start time (used to compute duration_ms)
   * @param status final status
   * @param exitCode exit code
   * @param exitMessage exit message
   * @param errorMessage error message (nullable)
   * @param errorStackTrace stack trace (nullable)
   * @param readCount read count
   * @param writeCount write count
   * @param commitCount commit count
   * @param rollbackCount rollback count
   * @param skipCount skip count
   */
  public void updateOnCompletion(
      Long id,
      LocalDateTime endTime,
      LocalDateTime startTime,
      String status,
      String exitCode,
      String exitMessage,
      String errorMessage,
      String errorStackTrace,
      Long readCount,
      Long writeCount,
      Long commitCount,
      Long rollbackCount,
      Long skipCount) {
    // Elapsed time in milliseconds, derived from the caller-supplied start/end pair.
    long durationMs = Duration.between(startTime, endTime).toMillis();
    String sql =
        """
        UPDATE batch_execution_history
        SET end_time = ?,
        duration_ms = ?,
        status = ?,
        exit_code = ?,
        exit_message = ?,
        error_message = ?,
        error_stack_trace = ?,
        read_count = ?,
        write_count = ?,
        commit_count = ?,
        rollback_count = ?,
        skip_count = ?,
        updated_at = CURRENT_TIMESTAMP
        WHERE id = ?
        """;
    jdbcTemplate.update(
        sql,
        Timestamp.valueOf(endTime),
        durationMs,
        status,
        exitCode,
        exitMessage,
        errorMessage,
        errorStackTrace,
        readCount,
        writeCount,
        commitCount,
        rollbackCount,
        skipCount,
        id);
  }

  /**
   * Finds all step histories for a job execution, ordered by start time.
   *
   * @param jobExecutionId Job Execution ID
   * @return history list
   */
  public java.util.List<BatchExecutionHistory> findByJobExecutionId(Long jobExecutionId) {
    String sql =
        """
        SELECT * FROM batch_execution_history
        WHERE job_execution_id = ?
        ORDER BY start_time
        """;
    return jdbcTemplate.query(sql, (rs, rowNum) -> mapHistoryRow(rs), jobExecutionId);
  }

  /**
   * Finds the most recent N executions.
   *
   * <p>Now uses the same full row mapping as {@link #findByJobExecutionId} (the previous
   * implementation populated only a subset of fields).
   *
   * @param limit number of rows to return
   * @return history list, newest first
   */
  public java.util.List<BatchExecutionHistory> findRecent(int limit) {
    String sql =
        """
        SELECT * FROM batch_execution_history
        ORDER BY start_time DESC
        LIMIT ?
        """;
    return jdbcTemplate.query(sql, (rs, rowNum) -> mapHistoryRow(rs), limit);
  }

  /**
   * Maps one batch_execution_history row to the model.
   *
   * <p>Shared by both finders to remove the previously duplicated mapping lambdas.
   *
   * <p>NOTE(review): counter columns are read with {@code rs.getLong(...)} without a
   * {@code wasNull()} check, preserving the original behavior — SQL NULL comes back as 0.
   * error_stack_trace is intentionally not mapped, matching the original mapper.
   *
   * @param rs result set positioned on the row
   * @return populated history
   * @throws SQLException on column access failure
   */
  private BatchExecutionHistory mapHistoryRow(ResultSet rs) throws SQLException {
    BatchExecutionHistory history = new BatchExecutionHistory();
    history.setId(rs.getLong("id"));
    history.setJobExecutionId(rs.getLong("job_execution_id"));
    history.setStepExecutionId(rs.getLong("step_execution_id"));
    history.setStepName(rs.getString("step_name"));
    history.setStepType(rs.getString("step_type"));
    history.setStartTime(rs.getTimestamp("start_time").toLocalDateTime());
    Timestamp endTimestamp = rs.getTimestamp("end_time");
    if (endTimestamp != null) {
      history.setEndTime(endTimestamp.toLocalDateTime());
    }
    history.setDurationMs(rs.getLong("duration_ms"));
    history.setStatus(rs.getString("status"));
    history.setExitCode(rs.getString("exit_code"));
    history.setExitMessage(rs.getString("exit_message"));
    history.setErrorMessage(rs.getString("error_message"));
    history.setReadCount(rs.getLong("read_count"));
    history.setWriteCount(rs.getLong("write_count"));
    history.setCommitCount(rs.getLong("commit_count"));
    history.setRollbackCount(rs.getLong("rollback_count"));
    history.setSkipCount(rs.getLong("skip_count"));
    history.setBatchIds(rs.getString("batch_ids"));
    history.setInferenceId(rs.getString("inference_id"));
    return history;
  }
}

View File

@@ -0,0 +1,59 @@
package com.kamco.makesample.batch.tasklet;
import com.kamco.makesample.writer.ResultZipWriter;
import java.nio.file.Path;
import java.nio.file.Paths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Tasklet that creates the ZIP archive.
 *
 * <p>Reuses the existing ResultZipWriter to compress the shapefile sidecar files.
 */
@Component
@StepScope
public class CreateZipTasklet implements Tasklet {

  private static final Logger log = LoggerFactory.getLogger(CreateZipTasklet.class);

  @Value("#{jobParameters['outputPath']}")
  private String outputPath;

  @Value("#{jobParameters['zipBaseName']}")
  private String zipBaseName;

  /**
   * Zips the shapefile output directory and stores the resulting ZIP path in the Job
   * ExecutionContext (key "zipFilePath") for the GeoServer registration step.
   *
   * @throws IllegalStateException if required job parameters are missing or the output path has
   *     no parent directory
   */
  @Override
  public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
      throws Exception {
    // Fail fast on missing job parameters instead of a later NPE.
    if (outputPath == null || outputPath.isBlank()) {
      throw new IllegalStateException("Job parameter 'outputPath' is required for ZIP creation");
    }
    if (zipBaseName == null || zipBaseName.isBlank()) {
      throw new IllegalStateException("Job parameter 'zipBaseName' is required for ZIP creation");
    }
    log.info("Creating ZIP file for shapefile: {}", outputPath);

    // The directory containing the shapefile is what gets zipped.
    Path shapefilePath = Paths.get(outputPath);
    Path dirPath = shapefilePath.getParent();
    if (dirPath == null) {
      // Paths.get("x.shp").getParent() is null; the original code would NPE here.
      throw new IllegalStateException("outputPath has no parent directory: " + outputPath);
    }

    // Reuse the existing ResultZipWriter.
    ResultZipWriter.createZip(dirPath, zipBaseName);
    log.info("ZIP file created successfully: {}.zip", zipBaseName);

    // Publish the ZIP path for downstream steps (GeoServer registration).
    String zipPath = dirPath.resolve(zipBaseName + ".zip").toString();
    chunkContext
        .getStepContext()
        .getStepExecution()
        .getJobExecution()
        .getExecutionContext()
        .putString("zipFilePath", zipPath);
    return RepeatStatus.FINISHED;
  }
}

View File

@@ -0,0 +1,83 @@
package com.kamco.makesample.batch.tasklet;
import com.kamco.makesample.service.GeoServerRegistrationService;
import java.io.File;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * GeoServer registration Tasklet.
 *
 * <p>Reuses the existing GeoServerRegistrationService to register the shapefile with GeoServer.
 *
 * <p>Registration method: file:// path reference via the external.shp endpoint (all file sizes).
 *
 * <p>Conditional execution: skipped when geoserver.enabled=false.
 */
@Component
@StepScope
public class GeoServerRegistrationTasklet implements Tasklet {

  private static final Logger log = LoggerFactory.getLogger(GeoServerRegistrationTasklet.class);

  private final GeoServerRegistrationService geoServerService;

  @Value("#{jobParameters['geoserver.enabled'] ?: false}")
  private boolean geoServerEnabled;

  @Value("#{jobParameters['layerName']}")
  private String layerName;

  public GeoServerRegistrationTasklet(GeoServerRegistrationService geoServerService) {
    this.geoServerService = geoServerService;
  }

  /**
   * Registers the previously created ZIP with GeoServer via a file path reference.
   *
   * @throws IllegalStateException if the ZIP path is missing from the Job ExecutionContext or the
   *     file does not exist on disk
   */
  @Override
  public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
      throws Exception {
    if (!geoServerEnabled) {
      log.info("GeoServer registration is disabled. Skipping.");
      return RepeatStatus.FINISHED;
    }
    log.info("Starting GeoServer registration for layer: {}", layerName);

    // ZIP path published by CreateZipTasklet.
    String zipPath =
        (String)
            chunkContext
                .getStepContext()
                .getStepExecution()
                .getJobExecution()
                .getExecutionContext()
                .get("zipFilePath");
    if (zipPath == null) {
      log.error("ZIP file path not found in JobExecutionContext");
      throw new IllegalStateException("ZIP file path not available for GeoServer registration");
    }

    File zipFile = new File(zipPath);
    // Fix: File.length() silently returns 0 for a missing file; verify existence explicitly
    // before handing the path to GeoServer.
    if (!zipFile.isFile()) {
      log.error("ZIP file does not exist or is not a regular file: {}", zipPath);
      throw new IllegalStateException("ZIP file not found for GeoServer registration: " + zipPath);
    }

    // Log file size for monitoring.
    long fileSize = zipFile.length();
    long fileSizeMB = fileSize / 1024 / 1024;
    log.info("ZIP file size: {} bytes ({} MB)", fileSize, fileSizeMB);

    // Register using file:// path reference (external.shp endpoint) for all file sizes.
    log.info("Using file path reference method (external.shp endpoint)");
    log.info("Note: GeoServer must have read access to the file path: {}", zipPath);
    geoServerService.registerShapefileByPath(zipPath, layerName);
    log.info("GeoServer registration completed successfully for layer: {}", layerName);
    return RepeatStatus.FINISHED;
  }
}

View File

@@ -0,0 +1,318 @@
package com.kamco.makesample.batch.tasklet;
import com.kamco.makesample.exception.MixedGeometryException;
import java.sql.Array;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* Geometry Type 검증 Tasklet
*
* <p>Shapefile은 homogeneous geometry type을 요구하므로, chunk 처리 전에 사전 검증 필요
*
* <p>주요 역할:
*
* <ul>
* <li>SQL DISTINCT 쿼리로 geometry type 확인 (ST_Polygon, ST_MultiPolygon만 조회)
* <li>지원하지 않는 geometry 타입 발견 시 즉시 에러 발생 (fast-fail)
* <li>StepExecutionContext에 geometry type 저장 (Writer가 사용)
* </ul>
*/
@Component
@StepScope
public class GeometryTypeValidationTasklet implements Tasklet {
private static final Logger log = LoggerFactory.getLogger(GeometryTypeValidationTasklet.class);
private final DataSource dataSource;
@Value("#{jobParameters['batchIds']}")
private String batchIdsParam;
public GeometryTypeValidationTasklet(DataSource dataSource) {
this.dataSource = dataSource;
}
@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
throws Exception {
log.info("========================================");
log.info("Step 1: Geometry Type Validation");
log.info("========================================");
log.info("Validating geometry types for batch_ids: {}", batchIdsParam);
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 1. JobParameter를 Long 배열로 변환
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 예: "252,253,257" → [252L, 253L, 257L]
Long[] batchIds = parseBatchIds(batchIdsParam);
log.debug("Parsed batch IDs: {}", (Object) batchIds);
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 1.1 전체 row 개수 조회 (검증 전)
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
String countAllSql =
"""
SELECT COUNT(*) as total_count
FROM inference_results_testing
WHERE batch_id = ANY(?)
AND geometry IS NOT NULL
""";
long totalRows = 0;
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(countAllSql)) {
Array sqlArray = conn.createArrayOf("bigint", batchIds);
ps.setArray(1, sqlArray);
try (ResultSet rs = ps.executeQuery()) {
if (rs.next()) {
totalRows = rs.getLong("total_count");
}
}
}
log.info("Total rows with non-null geometry: {}", totalRows);
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 2. SQL로 고유한 geometry type 조회 및 좌표계 검증
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// ST_GeometryType()는 "ST_Polygon", "ST_MultiPolygon" 등을 반환
// DISTINCT로 고유한 타입만 조회
// ST_Polygon, ST_MultiPolygon만 허용 (Point, LineString 등은 제외)
// geometry IS NOT NULL 조건으로 null geometry 제외
//
// EPSG:5186 좌표계 정합성 검증:
// - SRID가 5186인지 확인
// - 유효 범위: X(125000~530000m), Y(-600000~988000m) - 한국 중부 영역
// - ST_IsValid()로 geometry 유효성 검증
String sql =
"""
SELECT DISTINCT ST_GeometryType(geometry) as geom_type
FROM inference_results_testing
WHERE batch_id = ANY(?)
AND geometry IS NOT NULL
AND ST_GeometryType(geometry) IN ('ST_Polygon', 'ST_MultiPolygon')
AND ST_SRID(geometry) = 5186
AND ST_IsValid(geometry) = true
AND ST_XMin(geometry) >= 125000 AND ST_XMax(geometry) <= 530000
AND ST_YMin(geometry) >= -600000 AND ST_YMax(geometry) <= 988000
""";
List<String> geometryTypes = new ArrayList<>();
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(sql)) {
// PostgreSQL array 파라미터 설정
Array sqlArray = conn.createArrayOf("bigint", batchIds);
ps.setArray(1, sqlArray);
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
String geomType = rs.getString("geom_type");
geometryTypes.add(geomType);
log.debug("Found geometry type: {}", geomType);
}
}
}
log.info("Found {} distinct geometry type(s): {}", geometryTypes.size(), geometryTypes);
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 2.1 검증 통과 row 개수 조회
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
String countValidSql =
"""
SELECT COUNT(*) as valid_count
FROM inference_results_testing
WHERE batch_id = ANY(?)
AND geometry IS NOT NULL
AND ST_GeometryType(geometry) IN ('ST_Polygon', 'ST_MultiPolygon')
AND ST_SRID(geometry) = 5186
AND ST_IsValid(geometry) = true
AND ST_XMin(geometry) >= 125000 AND ST_XMax(geometry) <= 530000
AND ST_YMin(geometry) >= -600000 AND ST_YMax(geometry) <= 988000
""";
long validRows = 0;
try (Connection conn = dataSource.getConnection();
PreparedStatement ps = conn.prepareStatement(countValidSql)) {
Array sqlArray = conn.createArrayOf("bigint", batchIds);
ps.setArray(1, sqlArray);
try (ResultSet rs = ps.executeQuery()) {
if (rs.next()) {
validRows = rs.getLong("valid_count");
}
}
}
long excludedRows = totalRows - validRows;
log.info("========================================");
log.info("📊 Geometry Validation Summary:");
log.info(" Total rows: {}", totalRows);
log.info(" Valid rows: {} (EPSG:5186 compliant)", validRows);
log.info(" Excluded rows: {} (invalid geometry or out of range)", excludedRows);
if (excludedRows > 0) {
log.warn(
"⚠️ {} rows excluded due to invalid geometry or coordinate out of EPSG:5186 range",
excludedRows);
}
log.info("========================================");
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 3. Mixed geometry type 체크 및 자동 변환 안내
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// Shapefile 제약사항: 하나의 shapefile은 단일 geometry type만 허용
// MultiPolygon이 포함된 경우 자동으로 Polygon으로 변환됨 (GeometryConverter)
//
// ⚠️ 참고: SQL 필터로 ST_Polygon, ST_MultiPolygon만 조회하므로
// 이론적으로는 이 두 타입만 존재해야 함
// 만약 다른 타입이 섞여 있다면 데이터 정합성 문제
if (geometryTypes.size() > 1) {
// ST_Polygon과 ST_MultiPolygon이 섞인 경우 → 자동 변환 허용
boolean hasPolygon = geometryTypes.stream().anyMatch(t -> t.equals("ST_Polygon"));
boolean hasMultiPolygon = geometryTypes.stream().anyMatch(t -> t.equals("ST_MultiPolygon"));
if (hasPolygon && hasMultiPolygon && geometryTypes.size() == 2) {
log.info("========================================");
log.info(" Mixed geometry types detected:");
log.info(" Types: {}", geometryTypes);
log.info("");
log.info("✅ Auto-conversion enabled:");
log.info(" ST_MultiPolygon → ST_Polygon (first polygon only)");
log.info(" This will unify all geometries to ST_Polygon type");
log.info("========================================");
// Polygon을 기본 타입으로 설정 (자동 변환 후 모든 geometry가 Polygon이 됨)
// Job ExecutionContext에 저장 (다음 Step에서 읽을 수 있도록)
chunkContext
.getStepContext()
.getStepExecution()
.getJobExecution()
.getExecutionContext()
.putString("geometryType", "ST_Polygon");
log.info("✅ Geometry type validation PASSED with auto-conversion");
log.info(" Target Type: ST_Polygon");
log.info(" MultiPolygon geometries will be converted during processing");
log.info("========================================");
return RepeatStatus.FINISHED;
}
// 그 외의 혼합 타입은 즉시 에러 발생 (fast-fail)
// 예: ST_Polygon + ST_Point 등 (하지만 SQL 필터로 이미 제외되었어야 함)
log.error("❌ Unexpected mixed geometry types detected: {}", geometryTypes);
log.error("Shapefile requires homogeneous geometry type");
log.error("Only Polygon + MultiPolygon mix is supported with auto-conversion");
throw new MixedGeometryException(
"Shapefile requires homogeneous geometry type. Found: "
+ geometryTypes
+ ". Only Polygon + MultiPolygon mix is supported.");
}
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 4. 빈 데이터셋 체크
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 모든 geometry가 null이거나 데이터가 없는 경우
// 경고만 출력하고 통과 (Reader에서 읽을 데이터가 없으므로 Writer까지 가지 않음)
if (geometryTypes.isEmpty()) {
log.warn("========================================");
log.warn("WARNING: No valid geometries found in dataset");
log.warn("This may indicate:");
log.warn(" 1. All geometries are NULL");
log.warn(" 2. No data exists for the given batch_ids");
log.warn(" 3. Database connection issues");
log.warn("========================================");
log.warn("Proceeding with empty dataset (no files will be generated)");
// 빈 데이터셋이지만 Writer 초기화를 위해 기본 geometry type 설정
// ST_Polygon을 기본값으로 사용 (이 프로젝트의 주요 geometry type)
// Job ExecutionContext에 저장 (다음 Step에서 읽을 수 있도록)
chunkContext
.getStepContext()
.getStepExecution()
.getJobExecution()
.getExecutionContext()
.putString("geometryType", "ST_Polygon");
log.info("Set default geometry type to ST_Polygon for empty dataset");
// 빈 데이터셋이지만 Step은 성공으로 처리
// 다음 Step(Reader)에서 읽을 데이터가 없으므로 자연스럽게 종료됨
return RepeatStatus.FINISHED;
}
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 5. StepExecutionContext에 geometry type 저장
// ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
// 이후 Step에서 사용:
// - FeatureConversionProcessor: featureType 생성 시 사용
// - StreamingShapefileWriter: shapefile schema 생성 시 사용
String geometryType = geometryTypes.get(0);
// MultiPolygon 타입인 경우 Polygon으로 변환됨을 안내
if (geometryType.equals("ST_MultiPolygon")) {
log.info("========================================");
log.info(" Geometry type: {}", geometryType);
log.info("");
log.info("✅ Auto-conversion will be applied:");
log.info(" ST_MultiPolygon → ST_Polygon (first polygon only)");
log.info(" All MultiPolygon geometries will be converted during processing");
log.info("========================================");
// Polygon으로 저장 (변환 후 타입)
geometryType = "ST_Polygon";
}
// Job ExecutionContext에 저장 (다음 Step에서 읽을 수 있도록)
chunkContext
.getStepContext()
.getStepExecution()
.getJobExecution()
.getExecutionContext()
.putString("geometryType", geometryType);
log.info("========================================");
log.info("✅ Geometry type validation PASSED");
log.info(" Geometry Type: {}", geometryType);
log.info(" All geometries are homogeneous (or will be converted)");
log.info("========================================");
return RepeatStatus.FINISHED;
}
/**
* JobParameter 문자열을 Long 배열로 변환
*
* @param batchIdsParam "252,253,257" 형태의 문자열
* @return Long 배열
*/
private Long[] parseBatchIds(String batchIdsParam) {
if (batchIdsParam == null || batchIdsParam.trim().isEmpty()) {
throw new IllegalArgumentException("batchIds parameter is required");
}
String[] parts = batchIdsParam.split(",");
Long[] batchIds = new Long[parts.length];
for (int i = 0; i < parts.length; i++) {
batchIds[i] = Long.parseLong(parts[i].trim());
}
return batchIds;
}
}

View File

@@ -0,0 +1,75 @@
package com.kamco.makesample.batch.util;
import org.geotools.api.feature.simple.SimpleFeatureType;
import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
import org.geotools.feature.simple.SimpleFeatureTypeBuilder;
import org.locationtech.jts.geom.Geometry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
/**
 * Factory for the {@link SimpleFeatureType} shared by the feature-conversion processor and the
 * writers.
 *
 * <p>Keeping schema creation in one place guarantees features are built and persisted with an
 * identical schema.
 */
@Component
public class FeatureTypeFactory {

  private static final Logger log = LoggerFactory.getLogger(FeatureTypeFactory.class);

  /** Prefix returned by PostGIS ST_GeometryType(), e.g. "ST_Polygon". */
  private static final String POSTGIS_TYPE_PREFIX = "ST_";

  /**
   * Builds the SimpleFeatureType used for exported inference results.
   *
   * @param crs coordinate reference system for the geometry attribute
   * @param geomType JTS geometry class (Polygon, MultiPolygon, ...)
   * @return feature type named "inference_results" with "the_geom" as the default geometry
   */
  public SimpleFeatureType createFeatureType(CoordinateReferenceSystem crs, Class<?> geomType) {
    SimpleFeatureTypeBuilder builder = new SimpleFeatureTypeBuilder();
    builder.setName("inference_results");
    builder.setCRS(crs);

    // Geometry attribute first, flagged as the default geometry.
    builder.add("the_geom", geomType);
    builder.setDefaultGeometry("the_geom");

    // Attribute columns. NOTE(review): names appear chosen to fit the 10-character DBF
    // field-name limit of shapefiles — confirm before renaming any of them.
    builder.add("uid", String.class);
    builder.add("map_id", String.class);
    builder.add("chn_dtct_p", String.class);
    builder.add("cprs_yr", Long.class);
    builder.add("crtr_yr", Long.class);
    builder.add("bf_cls_cd", String.class);
    builder.add("bf_cls_pro", String.class);
    builder.add("af_cls_cd", String.class);
    builder.add("af_cls_pro", String.class);
    return builder.buildFeatureType();
  }

  /**
   * Resolves a geometry-type name to its JTS class.
   *
   * @param geomTypeStr "ST_Polygon", "Polygon", etc.; null/empty falls back to Geometry
   * @return matching JTS geometry class, or {@code Geometry.class} when the name is unknown
   */
  public Class<?> parseGeometryType(String geomTypeStr) {
    if (geomTypeStr == null || geomTypeStr.isEmpty()) {
      return Geometry.class;
    }
    // Strip only the leading PostGIS prefix. The previous String.replace("ST_", "") removed
    // every occurrence of "ST_" anywhere in the name, not just the prefix.
    String typeName =
        geomTypeStr.startsWith(POSTGIS_TYPE_PREFIX)
            ? geomTypeStr.substring(POSTGIS_TYPE_PREFIX.length())
            : geomTypeStr;
    try {
      return Class.forName("org.locationtech.jts.geom." + typeName);
    } catch (ClassNotFoundException e) {
      log.warn("Unknown geometry type: {}, using Geometry.class", typeName);
      return Geometry.class;
    }
  }
}

View File

@@ -0,0 +1,212 @@
package com.kamco.makesample.batch.writer;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import org.geotools.api.feature.simple.SimpleFeature;
import org.geotools.geojson.feature.FeatureJSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.annotation.OnWriteError;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamWriter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Per-map-id GeoJSON writer.
 *
 * <p>Runs inside a worker of a partitioned step; reads the map_id and its geoJsonOutputPath from
 * the StepExecutionContext and produces one GeoJSON file per map_id.
 *
 * <p>Uses the same streaming pattern as StreamingGeoJsonWriter: the FeatureCollection wrapper is
 * written by hand and each feature is appended as it arrives, so memory usage stays flat.
 */
@Component
@StepScope
public class MapIdGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {

  private static final Logger log = LoggerFactory.getLogger(MapIdGeoJsonWriter.class);

  // GeoJSON (RFC 7946) is always UTF-8; never rely on the platform default charset.
  private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

  @Value("#{stepExecutionContext['mapId']}")
  private String mapId;

  @Value("#{stepExecutionContext['geoJsonOutputPath']}")
  private String outputPath;

  private FileOutputStream outputStream;
  private FeatureJSON featureJSON;

  private int chunkCount = 0;
  private int totalRecordCount = 0;
  // True until the very first feature has been written; drives comma separation.
  private boolean isFirstChunk = true;

  /**
   * Validates the injected step-scope values and creates the output directory.
   *
   * @param stepExecution current step execution, used for diagnostics when injection failed
   * @throws IllegalStateException if mapId or geoJsonOutputPath was not injected
   * @throws ItemStreamException if the output directory cannot be created
   */
  @BeforeStep
  public void beforeStep(StepExecution stepExecution) {
    log.info(
        "MapIdGeoJsonWriter injected values: mapId='{}', outputPath='{}'",
        this.mapId,
        this.outputPath);
    ExecutionContext executionContext = stepExecution.getExecutionContext();
    if (this.mapId == null || this.outputPath == null) {
      throw new IllegalStateException(
          String.format(
              "MapIdGeoJsonWriter requires non-null 'mapId' and 'geoJsonOutputPath' from @Value injection. "
                  + "Got mapId='%s', geoJsonOutputPath='%s'. Available keys in ExecutionContext: %s",
              this.mapId, this.outputPath, executionContext.entrySet()));
    }
    log.info("MapIdGeoJsonWriter initialized for map_id: {}, output: {}", mapId, outputPath);
    try {
      Path outputDir = Paths.get(outputPath).getParent();
      if (outputDir != null && !Files.exists(outputDir)) {
        Files.createDirectories(outputDir);
        log.info("Created output directory for map_id {} GeoJSON: {}", mapId, outputDir);
      }
    } catch (IOException e) {
      throw new ItemStreamException("Failed to create output directory for map_id: " + mapId, e);
    }
  }

  /** Opens the output file and writes the FeatureCollection header. */
  @Override
  public void open(ExecutionContext executionContext) throws ItemStreamException {
    log.info("Opening GeoJSON writer for map_id: {}", mapId);
    try {
      File geoJsonFile = new File(outputPath);
      outputStream = new FileOutputStream(geoJsonFile);
      featureJSON = new FeatureJSON();
      // Opening brace of the FeatureCollection; features are appended incrementally.
      outputStream.write("{\"type\":\"FeatureCollection\",\"features\":[".getBytes(UTF_8));
      log.info("GeoJSON file initialized for map_id: {}", mapId);
    } catch (IOException e) {
      throw new ItemStreamException("Failed to open GeoJSON file for map_id " + mapId, e);
    }
  }

  /**
   * Appends one chunk of features to the GeoJSON feature array.
   *
   * @param chunk features produced by the processor; empty chunks are ignored
   */
  @Override
  public void write(Chunk<? extends SimpleFeature> chunk) throws Exception {
    if (chunk.isEmpty()) {
      return;
    }
    chunkCount++;
    int itemCount = chunk.size();
    totalRecordCount += itemCount;
    log.debug(
        "[map_id: {}] Writing chunk #{} to GeoJSON with {} features (total: {})",
        mapId,
        chunkCount,
        itemCount,
        totalRecordCount);
    // A comma precedes every feature except the first one in the whole file.
    boolean firstInFile = isFirstChunk;
    for (SimpleFeature feature : chunk) {
      if (!firstInFile) {
        outputStream.write(",".getBytes(UTF_8));
      }
      firstInFile = false;
      // Serialize through a StringWriter so FeatureJSON cannot close the output stream.
      StringWriter stringWriter = new StringWriter();
      featureJSON.writeFeature(feature, stringWriter);
      outputStream.write(stringWriter.toString().getBytes(UTF_8));
    }
    isFirstChunk = false;
    log.debug("[map_id: {}] Chunk #{} written to GeoJSON successfully", mapId, chunkCount);
  }

  /**
   * Writes the FeatureCollection terminator and flushes the file.
   *
   * <p>NOTE(review): this relies on Spring Batch invoking @AfterStep listeners before the
   * registered stream's close() — confirm against the step configuration.
   */
  @AfterStep
  public void afterStep() {
    log.info(
        "[map_id: {}] All chunks written to GeoJSON. Total {} records in {} chunks",
        mapId,
        totalRecordCount,
        chunkCount);
    try {
      if (outputStream != null) {
        outputStream.write("]}".getBytes(UTF_8));
        outputStream.flush();
        log.info("[map_id: {}] GeoJSON file finalized successfully", mapId);
      }
    } catch (IOException e) {
      log.error("[map_id: {}] Failed to finalize GeoJSON file", mapId, e);
      throw new ItemStreamException("Failed to finalize GeoJSON file for map_id: " + mapId, e);
    } finally {
      cleanup();
    }
  }

  /** Releases the output stream; safe to call more than once. */
  @Override
  public void close() throws ItemStreamException {
    cleanup();
  }

  /**
   * On a write failure: close the stream and delete the partial file so that no truncated
   * GeoJSON is left on disk.
   */
  @OnWriteError
  public void onError(Exception exception, Chunk<? extends SimpleFeature> chunk) {
    log.error(
        "[map_id: {}] Error writing chunk #{} to GeoJSON: {}",
        mapId,
        chunkCount,
        exception.getMessage(),
        exception);
    cleanup();
    try {
      File geoJsonFile = new File(outputPath);
      if (geoJsonFile.exists()) {
        geoJsonFile.delete();
        log.info("[map_id: {}] Deleted partial GeoJSON file", mapId);
      }
    } catch (Exception e) {
      log.warn("[map_id: {}] Failed to delete partial GeoJSON file", mapId, e);
    }
  }

  /** Idempotent close of the underlying stream. */
  private void cleanup() {
    if (outputStream != null) {
      try {
        outputStream.close();
      } catch (IOException e) {
        log.warn("[map_id: {}] Failed to close GeoJSON output stream", mapId, e);
      }
      outputStream = null;
    }
  }

  /** Records progress counters in the step ExecutionContext (checkpoint). */
  @Override
  public void update(ExecutionContext executionContext) throws ItemStreamException {
    executionContext.putInt("chunkCount", chunkCount);
    executionContext.putInt("totalRecordCount", totalRecordCount);
  }
}

View File

@@ -0,0 +1,290 @@
package com.kamco.makesample.batch.writer;
import com.kamco.makesample.batch.util.FeatureTypeFactory;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.geotools.api.data.SimpleFeatureStore;
import org.geotools.api.data.Transaction;
import org.geotools.api.feature.simple.SimpleFeature;
import org.geotools.api.feature.simple.SimpleFeatureType;
import org.geotools.api.referencing.FactoryException;
import org.geotools.api.referencing.crs.CoordinateReferenceSystem;
import org.geotools.data.DefaultTransaction;
import org.geotools.data.collection.ListFeatureCollection;
import org.geotools.data.shapefile.ShapefileDataStore;
import org.geotools.data.shapefile.ShapefileDataStoreFactory;
import org.geotools.referencing.CRS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.annotation.OnWriteError;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamWriter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Per-map-id Shapefile writer.
 *
 * <p>Runs inside a worker of a partitioned step; reads the map_id, outputPath and geometryType
 * from the StepExecutionContext and produces one shapefile per map_id.
 *
 * <p>Uses the same streaming pattern as StreamingShapefileWriter: each chunk is appended inside a
 * single transaction that is committed when the stream is closed.
 */
@Component
@StepScope
public class MapIdShapefileWriter implements ItemStreamWriter<SimpleFeature> {

  private static final Logger log = LoggerFactory.getLogger(MapIdShapefileWriter.class);

  // Every component file GeoTools may create for one shapefile; all must go on failure.
  private static final String[] SHAPEFILE_SIDECAR_EXTENSIONS = {
    ".shp", ".shx", ".dbf", ".prj", ".fix", ".qix"
  };

  private final FeatureTypeFactory featureTypeFactory;

  @Value("#{jobParameters['crs'] ?: 'EPSG:5186'}")
  private String crsCode;

  @Value("#{stepExecutionContext['mapId']}")
  private String mapId;

  @Value("#{stepExecutionContext['outputPath']}")
  private String outputPath;

  @Value("#{stepExecutionContext['geometryType']}")
  private String geometryTypeStr;

  private ShapefileDataStore dataStore;
  private Transaction transaction;
  private SimpleFeatureStore featureStore;
  private SimpleFeatureType featureType;

  private int chunkCount = 0;
  private int totalRecordCount = 0;
  private Class<?> geometryType;

  public MapIdShapefileWriter(FeatureTypeFactory featureTypeFactory) {
    this.featureTypeFactory = featureTypeFactory;
  }

  /**
   * Validates all required step-scope injections before the step runs (fail fast).
   *
   * @param stepExecution current step execution, used for diagnostics when injection failed
   * @throws IllegalStateException if mapId, outputPath or geometryType was not injected
   */
  @BeforeStep
  public void beforeStep(StepExecution stepExecution) {
    log.info("===== MapIdShapefileWriter.beforeStep() START =====");
    log.info(
        "Injected values: mapId='{}', outputPath='{}', geometryTypeStr='{}'",
        this.mapId,
        this.outputPath,
        this.geometryTypeStr);
    ExecutionContext stepExecutionContext = stepExecution.getExecutionContext();
    if (this.mapId == null || this.outputPath == null) {
      String errorMsg =
          String.format(
              "MapIdShapefileWriter requires non-null 'mapId' and 'outputPath'. "
                  + "Got mapId='%s', outputPath='%s'. Available keys: %s",
              this.mapId, this.outputPath, stepExecutionContext.entrySet());
      log.error(errorMsg);
      throw new IllegalStateException(errorMsg);
    }
    // geometryType must be propagated by MapIdPartitioner; missing it here would otherwise
    // only surface later as an unusable shapefile schema.
    if (this.geometryTypeStr == null || this.geometryTypeStr.isEmpty()) {
      String errorMsg =
          String.format(
              "MapIdShapefileWriter requires non-null 'geometryType' from stepExecutionContext. "
                  + "Got geometryTypeStr='%s'. Should be propagated by MapIdPartitioner. "
                  + "Available keys: %s",
              this.geometryTypeStr, stepExecutionContext.entrySet());
      log.error(errorMsg);
      throw new IllegalStateException(errorMsg);
    }
    log.info(
        "MapIdShapefileWriter initialized for map_id: {}, output: {}, geometryType: {}",
        mapId,
        outputPath,
        geometryTypeStr);
  }

  /**
   * Creates the output directory, the shapefile schema and the transaction-bound feature store.
   *
   * @throws ItemStreamException if the CRS code is invalid or the shapefile cannot be created
   */
  @Override
  public void open(ExecutionContext executionContext) throws ItemStreamException {
    log.info("Opening shapefile writer for map_id: {}", mapId);
    log.info("Using geometryTypeStr from stepExecutionContext: {}", geometryTypeStr);
    // The directory must exist before GeoTools creates any of the shapefile components.
    try {
      Path outputDir = Paths.get(outputPath).getParent();
      if (outputDir != null && !Files.exists(outputDir)) {
        Files.createDirectories(outputDir);
        log.info("Created output directory for map_id {}: {}", mapId, outputDir);
      }
    } catch (IOException e) {
      throw new ItemStreamException("Failed to create output directory for map_id: " + mapId, e);
    }
    try {
      // Direct parsing (no fallback) — value was validated in beforeStep().
      this.geometryType = featureTypeFactory.parseGeometryType(geometryTypeStr);
      log.info(
          "Parsed geometry type for map_id {}: {} (from: {})",
          mapId,
          geometryType.getSimpleName(),
          geometryTypeStr);
      CoordinateReferenceSystem crs = CRS.decode(crsCode);
      featureType = featureTypeFactory.createFeatureType(crs, geometryType);

      File shpFile = new File(outputPath);
      ShapefileDataStoreFactory factory = new ShapefileDataStoreFactory();
      Map<String, Serializable> params = new HashMap<>();
      params.put("url", shpFile.toURI().toURL());
      params.put("create spatial index", Boolean.TRUE);
      dataStore = (ShapefileDataStore) factory.createNewDataStore(params);
      dataStore.createSchema(featureType);

      // One transaction spans the whole step; committed in close().
      transaction = new DefaultTransaction("create-" + mapId);
      String typeName = dataStore.getTypeNames()[0];
      featureStore = (SimpleFeatureStore) dataStore.getFeatureSource(typeName);
      featureStore.setTransaction(transaction);
      log.info(
          "ShapefileDataStore initialized for map_id: {} with geometry type: {}",
          mapId,
          geometryType.getSimpleName());
    } catch (FactoryException e) {
      throw new ItemStreamException("Invalid CRS code: " + crsCode, e);
    } catch (IOException e) {
      throw new ItemStreamException("Failed to create shapefile for map_id " + mapId, e);
    }
  }

  /**
   * Appends one chunk of features to the shapefile inside the open transaction.
   *
   * @param chunk features produced by the processor; empty chunks are ignored
   */
  @Override
  public void write(Chunk<? extends SimpleFeature> chunk) throws Exception {
    if (chunk.isEmpty()) {
      return;
    }
    chunkCount++;
    // Immutable snapshot; also avoids the unchecked cast from List<? extends SimpleFeature>.
    List<SimpleFeature> items = List.copyOf(chunk.getItems());
    int itemCount = items.size();
    totalRecordCount += itemCount;
    log.debug(
        "[map_id: {}] Writing chunk #{} with {} features (total: {})",
        mapId,
        chunkCount,
        itemCount,
        totalRecordCount);
    ListFeatureCollection collection = new ListFeatureCollection(featureType, items);
    featureStore.addFeatures(collection);
    log.debug("[map_id: {}] Chunk #{} written successfully", mapId, chunkCount);
  }

  /** Logs final counters; the commit itself happens in close(). */
  @AfterStep
  public void afterStep() {
    log.info(
        "[map_id: {}] AfterStep called. Total {} records in {} chunks",
        mapId,
        totalRecordCount,
        chunkCount);
  }

  /**
   * Commits the transaction and releases all GeoTools resources.
   *
   * @throws ItemStreamException if the commit fails
   */
  @Override
  public void close() throws ItemStreamException {
    log.info(
        "[map_id: {}] Closing shapefile writer. Committing {} records in {} chunks",
        mapId,
        totalRecordCount,
        chunkCount);
    try {
      if (transaction != null) {
        transaction.commit();
        log.info("[map_id: {}] Transaction committed successfully", mapId);
      }
    } catch (IOException e) {
      log.error("[map_id: {}] Failed to commit transaction", mapId, e);
      throw new ItemStreamException(
          "Failed to commit shapefile transaction for map_id: " + mapId, e);
    } finally {
      cleanup();
    }
  }

  /**
   * On a write failure: roll back the transaction and remove every partial shapefile component
   * (.shp alone is not enough — GeoTools also writes .shx/.dbf/.prj and index files).
   */
  @OnWriteError
  public void onError(Exception exception, Chunk<? extends SimpleFeature> chunk) {
    log.error(
        "[map_id: {}] Error writing chunk #{}: {}",
        mapId,
        chunkCount,
        exception.getMessage(),
        exception);
    try {
      if (transaction != null) {
        transaction.rollback();
        log.info("[map_id: {}] Transaction rolled back", mapId);
      }
      deletePartialFiles();
    } catch (IOException e) {
      log.error("[map_id: {}] Failed to rollback transaction", mapId, e);
    } finally {
      cleanup();
    }
  }

  /** Deletes all shapefile component files sharing this writer's base path. */
  private void deletePartialFiles() {
    String basePath =
        outputPath.endsWith(".shp")
            ? outputPath.substring(0, outputPath.length() - ".shp".length())
            : outputPath;
    for (String extension : SHAPEFILE_SIDECAR_EXTENSIONS) {
      File component = new File(basePath + extension);
      if (component.exists() && component.delete()) {
        log.info("[map_id: {}] Deleted partial shapefile component: {}", mapId, component);
      }
    }
  }

  /** Idempotent release of transaction and data store. */
  private void cleanup() {
    if (transaction != null) {
      try {
        transaction.close();
      } catch (IOException e) {
        log.warn("[map_id: {}] Failed to close transaction", mapId, e);
      }
      transaction = null;
    }
    if (dataStore != null) {
      dataStore.dispose();
      dataStore = null;
    }
  }

  /** Records progress counters in the step ExecutionContext (checkpoint). */
  @Override
  public void update(ExecutionContext executionContext) throws ItemStreamException {
    executionContext.putInt("chunkCount", chunkCount);
    executionContext.putInt("totalRecordCount", totalRecordCount);
  }
}

View File

@@ -0,0 +1,189 @@
package com.kamco.makesample.batch.writer;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import org.geotools.api.feature.simple.SimpleFeature;
import org.geotools.geojson.feature.FeatureJSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.annotation.AfterStep;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.core.annotation.OnWriteError;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamWriter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
 * Streaming GeoJSON writer.
 *
 * <p>Chunk-streaming counterpart of StreamingShapefileWriter.
 *
 * <p>Memory profile:
 *
 * <ul>
 *   <li>before: every feature accumulated in a DefaultFeatureCollection
 *   <li>now: each chunk appended directly to the GeoJSON stream
 * </ul>
 */
@Component
@StepScope
public class StreamingGeoJsonWriter implements ItemStreamWriter<SimpleFeature> {

  private static final Logger log = LoggerFactory.getLogger(StreamingGeoJsonWriter.class);

  // GeoJSON (RFC 7946) is always UTF-8; never rely on the platform default charset.
  private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

  @Value("#{jobParameters['geoJsonOutputPath']}")
  private String outputPath;

  private FileOutputStream outputStream;
  private FeatureJSON featureJSON;

  private int chunkCount = 0;
  private int totalRecordCount = 0;
  // True until the very first feature has been written; drives comma separation.
  private boolean isFirstChunk = true;

  /**
   * Validates the output path and creates the output directory.
   *
   * @throws IllegalStateException if the 'geoJsonOutputPath' job parameter is missing
   * @throws ItemStreamException if the output directory cannot be created
   */
  @BeforeStep
  public void beforeStep() {
    // Fail fast with a clear message instead of an NPE from Paths.get below.
    if (outputPath == null || outputPath.isBlank()) {
      throw new IllegalStateException(
          "StreamingGeoJsonWriter requires non-blank jobParameter 'geoJsonOutputPath'");
    }
    try {
      Path outputDir = Paths.get(outputPath).getParent();
      if (outputDir != null && !Files.exists(outputDir)) {
        Files.createDirectories(outputDir);
        log.info("Created output directory for GeoJSON: {}", outputDir);
      }
    } catch (IOException e) {
      throw new ItemStreamException("Failed to create output directory", e);
    }
  }

  /** Opens the output file and writes the FeatureCollection header. */
  @Override
  public void open(ExecutionContext executionContext) throws ItemStreamException {
    log.info("Opening StreamingGeoJsonWriter for: {}", outputPath);
    try {
      File geoJsonFile = new File(outputPath);
      outputStream = new FileOutputStream(geoJsonFile);
      featureJSON = new FeatureJSON();
      // Opening brace of the FeatureCollection; features are appended incrementally.
      outputStream.write("{\"type\":\"FeatureCollection\",\"features\":[".getBytes(UTF_8));
      log.info("GeoJSON file initialized successfully");
    } catch (IOException e) {
      throw new ItemStreamException("Failed to open GeoJSON file: " + outputPath, e);
    }
  }

  /**
   * Appends one chunk of features to the GeoJSON feature array.
   *
   * @param chunk features produced by the processor; empty chunks are ignored
   */
  @Override
  public void write(Chunk<? extends SimpleFeature> chunk) throws Exception {
    if (chunk.isEmpty()) {
      return;
    }
    chunkCount++;
    int itemCount = chunk.size();
    totalRecordCount += itemCount;
    log.debug(
        "Writing chunk #{} to GeoJSON with {} features (total so far: {})",
        chunkCount,
        itemCount,
        totalRecordCount);
    // A comma precedes every feature except the first one in the whole file.
    boolean firstInFile = isFirstChunk;
    for (SimpleFeature feature : chunk) {
      if (!firstInFile) {
        outputStream.write(",".getBytes(UTF_8));
      }
      firstInFile = false;
      // Serialize through a StringWriter so FeatureJSON cannot close the output stream.
      StringWriter stringWriter = new StringWriter();
      featureJSON.writeFeature(feature, stringWriter);
      outputStream.write(stringWriter.toString().getBytes(UTF_8));
    }
    isFirstChunk = false;
    log.debug("Chunk #{} written to GeoJSON successfully", chunkCount);
  }

  /**
   * Writes the FeatureCollection terminator and flushes the file.
   *
   * <p>NOTE(review): this relies on Spring Batch invoking @AfterStep listeners before the
   * registered stream's close() — confirm against the step configuration.
   */
  @AfterStep
  public void afterStep() {
    log.info(
        "All chunks written to GeoJSON. Total {} records in {} chunks",
        totalRecordCount,
        chunkCount);
    try {
      if (outputStream != null) {
        outputStream.write("]}".getBytes(UTF_8));
        outputStream.flush();
        log.info("GeoJSON file finalized successfully");
      }
    } catch (IOException e) {
      log.error("Failed to finalize GeoJSON file", e);
      throw new ItemStreamException("Failed to finalize GeoJSON file", e);
    } finally {
      cleanup();
    }
  }

  /** Releases the output stream; safe to call more than once. */
  @Override
  public void close() throws ItemStreamException {
    cleanup();
  }

  /**
   * On a write failure: close the stream and delete the partial file so that no truncated
   * GeoJSON is left on disk.
   */
  @OnWriteError
  public void onError(Exception exception, Chunk<? extends SimpleFeature> chunk) {
    log.error(
        "Error writing chunk #{} to GeoJSON: {}", chunkCount, exception.getMessage(), exception);
    cleanup();
    try {
      File geoJsonFile = new File(outputPath);
      if (geoJsonFile.exists()) {
        geoJsonFile.delete();
        log.info("Deleted partial GeoJSON file: {}", outputPath);
      }
    } catch (Exception e) {
      log.warn("Failed to delete partial GeoJSON file", e);
    }
  }

  /** Idempotent close of the underlying stream. */
  private void cleanup() {
    if (outputStream != null) {
      try {
        outputStream.close();
      } catch (IOException e) {
        log.warn("Failed to close GeoJSON output stream", e);
      }
      outputStream = null;
    }
  }

  /** Records progress counters in the step ExecutionContext (checkpoint). */
  @Override
  public void update(ExecutionContext executionContext) throws ItemStreamException {
    executionContext.putInt("chunkCount", chunkCount);
    executionContext.putInt("totalRecordCount", totalRecordCount);
  }
}

Some files were not shown because too many files have changed in this diff Show More