diff --git a/shp-exporter/.gitignore b/shp-exporter/.gitignore new file mode 100644 index 0000000..0bf169d --- /dev/null +++ b/shp-exporter/.gitignore @@ -0,0 +1,76 @@ +HELP.md +.gradle +build/ +!gradle/wrapper/gradle-wrapper.jar +!**/src/main/**/build/ +!**/src/test/**/build/ + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache +bin/ +!**/src/main/**/bin/ +!**/src/test/**/bin/ + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr +out/ +!**/src/main/**/out/ +!**/src/test/**/out/ + +### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ + +### VS Code ### +.vscode/ + +### QueryDSL ### +/src/main/generated/ +**/generated/ + +### Logs ### +*.log +logs/ +*.log.* + +### OS ### +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes +ehthumbs.db +Thumbs.db + +### Environment ### +.env +.env.local +.env.*.local +application-local.yml +application-secret.yml +metrics-collector/.env + +### Docker (local testing) ### +.dockerignore +docker-compose.override.yml + +### Temporary ### +*.tmp +*.temp +*.swp +*.swo +*~ +!/CLAUDE.md + diff --git a/shp-exporter/.gradle/8.14.3/executionHistory/executionHistory.bin b/shp-exporter/.gradle/8.14.3/executionHistory/executionHistory.bin index 8ff7da3..27f8d5a 100755 Binary files a/shp-exporter/.gradle/8.14.3/executionHistory/executionHistory.bin and b/shp-exporter/.gradle/8.14.3/executionHistory/executionHistory.bin differ diff --git a/shp-exporter/.gradle/8.14.3/executionHistory/executionHistory.lock b/shp-exporter/.gradle/8.14.3/executionHistory/executionHistory.lock index d0d91d8..f68b3e0 100755 Binary files a/shp-exporter/.gradle/8.14.3/executionHistory/executionHistory.lock and b/shp-exporter/.gradle/8.14.3/executionHistory/executionHistory.lock differ diff --git a/shp-exporter/.gradle/8.14.3/fileHashes/fileHashes.bin b/shp-exporter/.gradle/8.14.3/fileHashes/fileHashes.bin index b03a5d7..44b49fe 100755 Binary files 
a/shp-exporter/.gradle/8.14.3/fileHashes/fileHashes.bin and b/shp-exporter/.gradle/8.14.3/fileHashes/fileHashes.bin differ diff --git a/shp-exporter/.gradle/8.14.3/fileHashes/fileHashes.lock b/shp-exporter/.gradle/8.14.3/fileHashes/fileHashes.lock index 26da333..31b7352 100755 Binary files a/shp-exporter/.gradle/8.14.3/fileHashes/fileHashes.lock and b/shp-exporter/.gradle/8.14.3/fileHashes/fileHashes.lock differ diff --git a/shp-exporter/.gradle/8.14.3/fileHashes/resourceHashesCache.bin b/shp-exporter/.gradle/8.14.3/fileHashes/resourceHashesCache.bin index 067ca19..8261caf 100755 Binary files a/shp-exporter/.gradle/8.14.3/fileHashes/resourceHashesCache.bin and b/shp-exporter/.gradle/8.14.3/fileHashes/resourceHashesCache.bin differ diff --git a/shp-exporter/.gradle/buildOutputCleanup/buildOutputCleanup.lock b/shp-exporter/.gradle/buildOutputCleanup/buildOutputCleanup.lock index 3320cbc..ab6e363 100755 Binary files a/shp-exporter/.gradle/buildOutputCleanup/buildOutputCleanup.lock and b/shp-exporter/.gradle/buildOutputCleanup/buildOutputCleanup.lock differ diff --git a/shp-exporter/.gradle/buildOutputCleanup/outputFiles.bin b/shp-exporter/.gradle/buildOutputCleanup/outputFiles.bin index 5709c06..50c5eef 100755 Binary files a/shp-exporter/.gradle/buildOutputCleanup/outputFiles.bin and b/shp-exporter/.gradle/buildOutputCleanup/outputFiles.bin differ diff --git a/shp-exporter/.gradle/file-system.probe b/shp-exporter/.gradle/file-system.probe index aecd84c..277d8aa 100755 Binary files a/shp-exporter/.gradle/file-system.probe and b/shp-exporter/.gradle/file-system.probe differ diff --git a/shp-exporter/CLAUDE.md b/shp-exporter/CLAUDE.md index c093f85..bad6f2e 100755 --- a/shp-exporter/CLAUDE.md +++ b/shp-exporter/CLAUDE.md @@ -1,11 +1,10 @@ - # CLAUDE.md This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. 
## Project Overview -Spring Boot CLI application that queries PostgreSQL PostGIS spatial data and converts it to ESRI shapefiles. The application processes inference results from the KAMCO database and generates geographic shapefiles for visualization in GIS applications. +Spring Boot CLI application that queries PostgreSQL PostGIS spatial data and converts it to ESRI shapefiles and GeoJSON. The application processes AI inference results from the KAMCO database and generates geographic data files for visualization in GIS applications. It also supports automatic registration of shapefiles to GeoServer via REST API. ## Build and Run Commands @@ -14,14 +13,46 @@ Spring Boot CLI application that queries PostgreSQL PostGIS spatial data and con ./gradlew build ``` +The built JAR will be named `shp-exporter.jar` (configured in `bootJar` task). + ### Run Application + +#### Generate Shapefiles ```bash ./gradlew bootRun ``` -Or run the built JAR: +Or using JAR: ```bash -java -jar build/libs/makesample-1.0.0.jar +java -jar build/libs/shp-exporter.jar +``` + +#### Upload Shapefile to GeoServer +Set environment variables first: +```bash +export GEOSERVER_USERNAME=admin +export GEOSERVER_PASSWORD=geoserver +``` + +Then upload: +```bash +./gradlew bootRun --args="--upload-shp /path/to/file.shp --layer layer_name" +``` + +Or using JAR: +```bash +java -jar build/libs/shp-exporter.jar --upload-shp /path/to/file.shp --layer layer_name +``` + +#### Override Configuration via Command Line +Using Gradle (recommended - no quoting issues): +```bash +./gradlew bootRun --args="--converter.inference-id=ABC123 --converter.map-ids[0]=35813030 --converter.batch-ids[0]=252 --converter.mode=MERGED" +``` + +Using JAR with zsh (quote arguments with brackets): +```bash +java -jar build/libs/shp-exporter.jar '--converter.inference-id=ABC123' '--converter.map-ids[0]=35813030' ``` ### Code Formatting @@ -35,88 +66,127 @@ Check formatting without applying: ./gradlew spotlessCheck ``` +### Active 
Profile +By default, the application runs with `spring.profiles.active=prod` (set in `application.yml`). Profile-specific configurations are in `application-{profile}.yml` files. + ## Architecture ### Processing Pipeline The application follows a layered architecture with a linear data flow: -1. **CLI Entry** (`ConverterCommandLineRunner`) → Reads configuration and initiates batch processing -2. **Service Orchestration** (`ShapefileConverterService`) → Coordinates the conversion workflow for each map_id -3. **Data Access** (`InferenceResultRepository`) → Queries PostGIS database and converts WKT to JTS geometries -4. **Geometry Conversion** (`GeometryConverter`) → Converts PostGIS WKT format to JTS Geometry objects -5. **Shapefile Writing** (`ShapefileWriter`) → Uses GeoTools to generate shapefile artifacts (.shp, .shx, .dbf, .prj) +1. **CLI Entry** (`ConverterCommandLineRunner`) → Parses command-line args and routes to either shapefile generation or GeoServer upload +2. **Service Orchestration** (`ShapefileConverterService`) → Coordinates the conversion workflow based on mode (MERGED, MAP_IDS, or RESOLVE) +3. **Data Access** (`InferenceResultRepository`) → Queries PostGIS database using `PreparedStatementCreator` for PostgreSQL array parameters +4. **Geometry Conversion** (`GeometryConverter`) → Converts PostGIS WKT format to JTS Geometry objects using `WKTReader` +5. **File Writing** (`ShapefileWriter`, `GeoJsonWriter`, `ResultZipWriter`) → Generates output files using GeoTools +6. 
**GeoServer Integration** (`GeoServerRegistrationService`) → Registers shapefiles to GeoServer via REST API (optional) ### Key Design Points -**Geometry Handling**: The application uses a two-step geometry conversion process: -- PostGIS returns geometries as WKT (Well-Known Text) via `ST_AsText(geometry)` -- `GeometryConverter` parses WKT to JTS `Geometry` objects -- `ShapefileWriter` uses JTS geometries with GeoTools to write shapefiles +**Conversion Modes**: The application supports three execution modes controlled by `converter.mode`: +- `MERGED`: Creates a single shapefile for all data matching `batch-ids` (ignores `map-ids`) +- `MAP_IDS`: Processes only the `map-ids` specified in configuration (requires `map-ids` to be set) +- `RESOLVE`: Queries the database for all distinct `map-ids` matching `batch-ids`, then processes each (avoids OS command-line length limits) +- If `mode` is unspecified: defaults to `MERGED` if `map-ids` is empty, otherwise `MAP_IDS` -**Batch Processing**: Configuration in `application.yml` drives batch execution: -- Multiple `map-ids` processed sequentially (if specified) -- If `map-ids` is null/empty, creates a merged shapefile for all batch-ids -- Each map_id filtered by `batch-ids` array -- Output directory structure: `{output-base-dir}/{inference-id}/{map-id}/` or `{output-base-dir}/{inference-id}/merge/` for merged mode -- Separate output directory created for each map_id +**Geometry Handling**: Two-step conversion process: +- PostGIS returns geometries as WKT (Well-Known Text) via `ST_AsText(geometry)` in SQL query +- `GeometryConverter` parses WKT to JTS `Geometry` objects using `WKTReader` +- `ShapefileWriter` uses JTS geometries with GeoTools to write shapefile artifacts (.shp, .shx, .dbf, .prj) -**Shapefile Constraints**: The application validates that all geometries for a single shapefile are homogeneous (same type) because shapefiles cannot contain mixed geometry types. 
This validation happens in `ShapefileConverterService.validateGeometries()`. - -**Feature Schema**: GeoTools requires explicit geometry field setup: -- Default geometry field named `the_geom` (not `geometry`) -- Field names truncated to 10 characters for DBF format compatibility +**Shapefile Constraints**: +- Validates all geometries are homogeneous (same type) via `ShapefileConverterService.validateGeometries()` +- Shapefiles cannot contain mixed geometry types (e.g., cannot mix Polygon and Point) - Geometry type determined from first valid geometry in result set +**Output Structure**: +- For MAP_IDS/RESOLVE mode: `{output-base-dir}/{inference-id}/{map-id}/` +- For MERGED mode: `{output-base-dir}/{inference-id}/merge/` +- Each directory contains: `.shp`, `.shx`, `.dbf`, `.prj`, `.geojson`, and `.zip` files + +**PostgreSQL Array Parameters**: The repository uses `PreparedStatementCreator` to handle PostgreSQL array syntax: +```java +Array batchIdsArray = con.createArrayOf("bigint", batchIds.toArray()); +ps.setArray(1, batchIdsArray); +``` +This enables `WHERE batch_id = ANY(?)` queries. 
+ +**GeoServer Integration**: +- Workspace 'cd' must be pre-created in GeoServer before registration +- Uses environment variables `GEOSERVER_USERNAME` and `GEOSERVER_PASSWORD` for authentication +- Supports automatic deletion and re-registration when `overwrite-existing: true` +- Non-blocking: registration failures are logged but don't stop the application + ## Configuration -Primary configuration in `src/main/resources/application.yml`: +Configuration files are located in `src/main/resources/`: +- `application.yml`: Base configuration (sets active profile) +- `application-prod.yml`: Production database and converter settings +- `application-dev.yml`: Development settings +- `application-local.yml`: Local development settings +### Converter Configuration ```yaml converter: - inference-id: 'D5E46F60FC40B1A8BE0CD1F3547AA6' # Inference ID (used for output folder structure) - map-ids: ['35813030'] # List of map_ids to process (text type), omit for merged shapefile - batch-ids: [252, 253, 257] # Batch ID array filter - output-base-dir: '/kamco-nfs/dataset/export/' # Base directory for shapefile output - crs: 'EPSG:5186' # Korean 2000 / Central Belt CRS + inference-id: 'D5E46F60FC40B1A8BE0CD1F3547AA6' + map-ids: [] # Optional: list of map_ids, or empty for merged mode + batch-ids: [252, 253, 257] # Required: batch ID filter + mode: 'MERGED' # Optional: MERGED, MAP_IDS, or RESOLVE + output-base-dir: '/data/model_output/export/' + crs: 'EPSG:5186' # Korean 2000 / Central Belt CRS ``` -Database connection configured via standard Spring Boot datasource properties. 
+### GeoServer Configuration +```yaml +geoserver: + base-url: 'https://kamco.geo-dev.gs.dabeeo.com/geoserver' + workspace: 'cd' + overwrite-existing: true + connection-timeout: 30000 + read-timeout: 60000 + username: 'admin' # Optional: prefer environment variables + password: 'geoserver' # Optional: prefer environment variables +``` ## Database Integration ### Query Pattern -The repository uses `PreparedStatementCreator` to handle PostgreSQL array parameters: -```sql -WHERE batch_id = ANY(?) AND map_id = ? -``` +All queries filter by `batch_id = ANY(?)` and include `after_c IS NOT NULL AND after_p IS NOT NULL` to ensure data quality. -The `ANY(?)` clause requires creating a PostgreSQL array using `Connection.createArrayOf("bigint", ...)`. +Primary queries: +- `findByMapId(batchIds, mapId)`: Retrieve records for a specific map_id +- `findByBatchIds(batchIds)`: Retrieve all records for batch_ids (merged mode) +- `findMapIdByBatchIds(batchIds)`: Query distinct map_ids for RESOLVE mode ### Field Mapping -Database columns are mapped to shapefile fields with Korean naming: +Database columns map to shapefile fields (note: shapefile field names limited to 10 characters): -| Database Column | Shapefile Field | -|-----------------|-----------------| -| uid | uid | -| map_id | map_id | -| probability | chn_dtct_prob | -| before_year | cprs_yr | -| after_year | crtr_yr | -| before_c | bf_cls_cd | -| before_p | bf_cls_prob | -| after_c | af_cls_cd | -| after_p | af_cls_prob | -| geometry | the_geom | +| Database Column | DB Type | Shapefile Field | Shapefile Type | +|-----------------|---------|-----------------|----------------| +| uid | uuid | uid | String | +| map_id | text | map_id | String | +| probability | float8 | chn_dtct_p | String | +| before_year | bigint | cprs_yr | Long | +| after_year | bigint | crtr_yr | Long | +| before_c | text | bf_cls_cd | String | +| before_p | float8 | bf_cls_pro | String | +| after_c | text | af_cls_cd | String | +| after_p | float8 | 
af_cls_pro | String | +| geometry | geom | the_geom | Polygon | + +**Note**: Probability and classification probability fields are stored as Strings in shapefiles (converted via `String.valueOf()`) to preserve precision. ### Coordinate Reference System -All geometries use **EPSG:5186** (Korean 2000 / Central Belt). The PostGIS geometry column is defined as `geometry(Polygon, 5186)`, and this CRS is preserved in the output shapefile's `.prj` file via GeoTools CRS encoding. +All geometries use **EPSG:5186** (Korean 2000 / Central Belt). The PostGIS geometry column is `geometry(Polygon, 5186)`, and this CRS is encoded in the output shapefile's `.prj` file via GeoTools. ## Dependencies -Key libraries and their roles: -- **GeoTools 30.0**: Shapefile generation (`gt-shapefile`, `gt-referencing`, `gt-epsg-hsql`) +Key libraries: +- **Spring Boot 3.5.7**: Framework (DI, JDBC, web for RestTemplate) +- **GeoTools 30.0**: Shapefile and GeoJSON generation (`gt-shapefile`, `gt-referencing`, `gt-epsg-hsql`, `gt-geojson`) - **JTS 1.19.0**: Java Topology Suite for geometry representation - **PostGIS JDBC 2.5.1**: PostgreSQL spatial extension support -- **Spring Boot 3.5.7**: Framework for DI, JDBC, and configuration +- **PostgreSQL JDBC Driver**: Database connectivity +- **HikariCP**: Connection pooling -Note: `javax.media:jai_core` is excluded in `build.gradle` to avoid conflicts. +**Important**: `javax.media:jai_core` is globally excluded in `build.gradle` to avoid conflicts with GeoTools. 
diff --git a/shp-exporter/README.md b/shp-exporter/README.md index 87c6a2f..8233fcb 100755 --- a/shp-exporter/README.md +++ b/shp-exporter/README.md @@ -85,7 +85,7 @@ You can override configuration values using command line arguments: **Using JAR (zsh shell - quote arguments with brackets):** ```bash -java -jar build/libs/makesample-1.0.0.jar \ +java -jar build/libs/shp-exporter.jar \ '--converter.inference-id=D5E46F60FC40B1A8BE0CD1F3547AA6' \ '--converter.map-ids[0]=35813030' \ '--converter.batch-ids[0]=252' \ @@ -97,7 +97,7 @@ java -jar build/libs/makesample-1.0.0.jar \ **Using JAR (bash shell - no quotes needed):** ```bash -java -jar build/libs/makesample-1.0.0.jar \ +java -jar build/libs/shp-exporter.jar \ --converter.inference-id=D5E46F60FC40B1A8BE0CD1F3547AA6 \ --converter.map-ids[0]=35813030 \ --converter.batch-ids[0]=252 \ @@ -116,7 +116,32 @@ java -jar build/libs/makesample-1.0.0.jar \ ## Running -### Generate Shapefiles +### Generate Shapefiles (Spring Batch Mode - Recommended) + +**New in v1.1.0**: Spring Batch mode provides memory-optimized processing for large datasets. + +```bash +# MERGED mode (creates single shapefile + GeoJSON for all batch-ids) +./gradlew bootRun --args="--batch --converter.batch-ids[0]=252 --converter.batch-ids[1]=253" + +# With GeoServer registration +./gradlew bootRun --args="--batch --geoserver.enabled=true --converter.batch-ids[0]=252" +``` + +**Output Files** (in `{output-base-dir}/{inference-id}/merge/`): +- `{inference-id}.shp` (+ .shx, .dbf, .prj) - Shapefile +- `{inference-id}.geojson` - GeoJSON file +- `{inference-id}.zip` - ZIP archive of shapefile + +**Benefits**: +- 90-95% memory reduction (2-13GB → 150-200MB for 1M records) +- Chunk-based streaming (1000 records per chunk) +- Restart capability after failures +- Step-by-step execution support + +See [claudedocs/SPRING_BATCH_MIGRATION.md](claudedocs/SPRING_BATCH_MIGRATION.md) for detailed documentation. 
+ +### Generate Shapefiles (Legacy Mode) ```bash ./gradlew bootRun @@ -125,7 +150,7 @@ java -jar build/libs/makesample-1.0.0.jar \ Or run the JAR directly: ```bash -java -jar build/libs/makesample-1.0.0.jar +java -jar build/libs/shp-exporter.jar ``` ### Register Shapefile to GeoServer @@ -146,7 +171,7 @@ Then register a shapefile: Or using the JAR: ```bash -java -jar build/libs/makesample-1.0.0.jar \ --upload-shp /path/to/shapefile.shp \ --layer layer_name ``` @@ -167,6 +192,7 @@ java -jar build/libs/makesample-1.0.0.jar \ ## Output +### Legacy Mode Output Shapefiles will be created in directories structured as `output-base-dir/inference-id/map-id/`: ``` @@ -177,9 +203,45 @@ Shapefiles will be created in directories structured as `output-base-dir/inferen └── 35813030.prj # Projection information ``` +### Spring Batch Mode Output +Output structure for MERGED mode (`output-base-dir/inference-id/merge/`): + +``` +/kamco-nfs/dataset/export/D5E46F60FC40B1A8BE0CD1F3547AA6/merge/ +├── D5E46F60FC40B1A8BE0CD1F3547AA6.shp # Shapefile geometry +├── D5E46F60FC40B1A8BE0CD1F3547AA6.shx # Shape index +├── D5E46F60FC40B1A8BE0CD1F3547AA6.dbf # Attribute data +├── D5E46F60FC40B1A8BE0CD1F3547AA6.prj # Projection information +├── D5E46F60FC40B1A8BE0CD1F3547AA6.geojson # GeoJSON format +└── D5E46F60FC40B1A8BE0CD1F3547AA6.zip # ZIP archive (for GeoServer) ``` + +**Note**: Only the shapefile (.shp and related files) are registered to GeoServer. GeoJSON files are generated for alternative consumption. + ## Database Query -The application executes the following query for each map_id: +### Spring Batch Mode (Recommended) + +The Spring Batch mode applies comprehensive validation to ensure data quality: + +```sql +-- NOTE(review): reconstructed from the validation criteria below and the legacy query; +-- original text here was IME garbage ('ㄴㅅ') — confirm against InferenceResultItemReaderConfig +SELECT uid, map_id, probability, before_year, after_year, + before_c, before_p, after_c, after_p, + ST_AsText(geometry) AS geometry +FROM inference_results_testing +WHERE batch_id = ANY(?) + AND after_c IS NOT NULL AND after_p IS NOT NULL + AND ST_GeometryType(geometry) IN ('ST_Polygon', 'ST_MultiPolygon') + AND ST_IsValid(geometry) +ORDER BY map_id, uid +``` + +**Validation Criteria**: +- **Geometry Type**: Only ST_Polygon and ST_MultiPolygon (excludes Point, LineString, etc.) 
+- **Coordinate System**: EPSG:5186 (Korean 2000 / Central Belt) +- **Coordinate Range**: Korea territory bounds (X: 125-530km, Y: -600-988km) +- **Geometry Validity**: Valid topology (ST_IsValid) + +Rows failing validation are automatically excluded from processing, ensuring clean shapefile generation. + +**Performance**: See [PERFORMANCE_OPTIMIZATION.md](claudedocs/PERFORMANCE_OPTIMIZATION.md) for indexing recommendations. + +### Legacy Mode + +Legacy mode uses a simpler query without validation: ```sql SELECT uid, map_id, probability, before_year, after_year, @@ -278,8 +340,26 @@ The project uses Google Java Format with 2-space indentation: ``` src/main/java/com/kamco/makesample/ ├── MakeSampleApplication.java # Main application class +├── batch/ # Spring Batch components (v1.1.0+) +│ ├── config/ +│ │ ├── BatchConfiguration.java # Spring Batch configuration +│ │ └── MergedModeJobConfig.java # MERGED mode Job definition +│ ├── processor/ +│ │ └── FeatureConversionProcessor.java # InferenceResult → SimpleFeature processor +│ ├── reader/ +│ │ ├── GeometryConvertingRowMapper.java # WKT → JTS converter +│ │ └── InferenceResultItemReaderConfig.java # Cursor-based DB reader +│ ├── tasklet/ +│ │ ├── CreateZipTasklet.java # ZIP creation tasklet +│ │ ├── GeoServerRegistrationTasklet.java # GeoServer registration tasklet +│ │ └── GeometryTypeValidationTasklet.java # Geometry validation tasklet +│ ├── util/ +│ │ └── FeatureTypeFactory.java # Shared feature type creation +│ └── writer/ +│ ├── StreamingGeoJsonWriter.java # Streaming GeoJSON writer +│ └── StreamingShapefileWriter.java # Streaming shapefile writer ├── cli/ -│ └── ConverterCommandLineRunner.java # CLI entry point +│ └── ConverterCommandLineRunner.java # CLI entry point (batch + legacy) ├── config/ │ ├── ConverterProperties.java # Shapefile converter configuration │ ├── GeoServerProperties.java # GeoServer configuration @@ -293,14 +373,14 @@ src/main/java/com/kamco/makesample/ ├── model/ │ └── 
InferenceResult.java # Domain model ├── repository/ -│ └── InferenceResultRepository.java # Data access layer +│ └── InferenceResultRepository.java # Data access layer (legacy) ├── service/ │ ├── GeometryConverter.java # PostGIS to JTS conversion -│ ├── ShapefileConverterService.java # Orchestration service +│ ├── ShapefileConverterService.java # Orchestration service (legacy) │ └── GeoServerRegistrationService.java # GeoServer REST API integration └── writer/ - ├── ShapefileWriter.java # GeoTools shapefile writer - └── GeoJsonWriter.java # GeoJSON export writer + ├── ShapefileWriter.java # GeoTools shapefile writer (legacy) + └── GeoJsonWriter.java # GeoJSON export writer (legacy) ``` ## Dependencies @@ -308,6 +388,7 @@ src/main/java/com/kamco/makesample/ - Spring Boot 3.5.7 - spring-boot-starter - spring-boot-starter-jdbc + - spring-boot-starter-batch (v1.1.0+) - spring-boot-starter-web (for RestTemplate) - spring-boot-starter-validation (for @NotBlank annotations) - GeoTools 30.0 @@ -383,6 +464,57 @@ SELECT COUNT(*) FROM inference_results_testing WHERE batch_id IN (252, 253, 257) AND map_id = '35813030'; ``` +## Batch Execution History + +### Overview + +Spring Batch mode automatically tracks execution history for each step, recording: +- Start time, end time, duration +- Success/failure status +- Error messages and stack traces (if failed) +- Processing statistics (read/write/commit/rollback/skip counts) + +### Table Setup + +Create the `batch_execution_history` table before running batch jobs: + +```bash +psql -h 192.168.2.127 -p 15432 -U kamco_cds -d kamco_cds \ + -f src/main/resources/db/migration/V1__create_batch_execution_history.sql +``` + +### Query Examples + +**View execution history for a specific job**: +```sql +SELECT step_name, start_time, end_time, duration_ms, status, read_count, write_count +FROM batch_execution_history +WHERE job_execution_id = 123 +ORDER BY start_time; +``` + +**Check failed steps**: +```sql +SELECT job_execution_id, 
step_name, start_time, error_message +FROM batch_execution_history +WHERE status = 'FAILED' +ORDER BY start_time DESC +LIMIT 10; +``` + +**Average step duration**: +```sql +SELECT step_name, + COUNT(*) as executions, + ROUND(AVG(duration_ms) / 1000.0, 2) as avg_duration_sec +FROM batch_execution_history +WHERE status = 'COMPLETED' +GROUP BY step_name +ORDER BY avg_duration_sec DESC; +``` + +For more query examples and detailed documentation, see [BATCH_EXECUTION_HISTORY.md](claudedocs/BATCH_EXECUTION_HISTORY.md). + ## License KAMCO Internal Use Only diff --git a/shp-exporter/build.gradle b/shp-exporter/build.gradle index b734704..5399294 100755 --- a/shp-exporter/build.gradle +++ b/shp-exporter/build.gradle @@ -49,6 +49,7 @@ dependencies { implementation 'org.springframework.boot:spring-boot-starter-jdbc' implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.boot:spring-boot-starter-validation' + implementation 'org.springframework.boot:spring-boot-starter-batch' // Database implementation 'org.postgresql:postgresql' diff --git a/shp-exporter/build/classes/java/main/com/kamco/makesample/cli/ConverterCommandLineRunner.class b/shp-exporter/build/classes/java/main/com/kamco/makesample/cli/ConverterCommandLineRunner.class index b449731..fe2a4d7 100644 Binary files a/shp-exporter/build/classes/java/main/com/kamco/makesample/cli/ConverterCommandLineRunner.class and b/shp-exporter/build/classes/java/main/com/kamco/makesample/cli/ConverterCommandLineRunner.class differ diff --git a/shp-exporter/build/classes/java/main/com/kamco/makesample/config/ConverterProperties.class b/shp-exporter/build/classes/java/main/com/kamco/makesample/config/ConverterProperties.class index 991fbcb..e389323 100644 Binary files a/shp-exporter/build/classes/java/main/com/kamco/makesample/config/ConverterProperties.class and b/shp-exporter/build/classes/java/main/com/kamco/makesample/config/ConverterProperties.class differ diff --git 
a/shp-exporter/build/classes/java/main/com/kamco/makesample/config/GeoServerProperties.class b/shp-exporter/build/classes/java/main/com/kamco/makesample/config/GeoServerProperties.class index 804fb8e..ef0b846 100644 Binary files a/shp-exporter/build/classes/java/main/com/kamco/makesample/config/GeoServerProperties.class and b/shp-exporter/build/classes/java/main/com/kamco/makesample/config/GeoServerProperties.class differ diff --git a/shp-exporter/build/classes/java/main/com/kamco/makesample/service/GeometryConverter.class b/shp-exporter/build/classes/java/main/com/kamco/makesample/service/GeometryConverter.class index b987073..47401e5 100644 Binary files a/shp-exporter/build/classes/java/main/com/kamco/makesample/service/GeometryConverter.class and b/shp-exporter/build/classes/java/main/com/kamco/makesample/service/GeometryConverter.class differ diff --git a/shp-exporter/build/classes/java/main/com/kamco/makesample/service/ShapefileConverterService.class b/shp-exporter/build/classes/java/main/com/kamco/makesample/service/ShapefileConverterService.class index 1c94970..d753643 100644 Binary files a/shp-exporter/build/classes/java/main/com/kamco/makesample/service/ShapefileConverterService.class and b/shp-exporter/build/classes/java/main/com/kamco/makesample/service/ShapefileConverterService.class differ diff --git a/shp-exporter/build/libs/shp-exporter.jar b/shp-exporter/build/libs/shp-exporter.jar index 65670d5..368531d 100644 Binary files a/shp-exporter/build/libs/shp-exporter.jar and b/shp-exporter/build/libs/shp-exporter.jar differ diff --git a/shp-exporter/build/reports/problems/problems-report.html b/shp-exporter/build/reports/problems/problems-report.html index 8b9c943..55fb4ed 100644 --- a/shp-exporter/build/reports/problems/problems-report.html +++ b/shp-exporter/build/reports/problems/problems-report.html @@ -650,7 +650,7 @@ code + .copy-button { diff --git a/shp-exporter/build/resources/main/application-dev.yml 
b/shp-exporter/build/resources/main/application-dev.yml old mode 100644 new mode 100755 index 6c92ce3..05583c9 --- a/shp-exporter/build/resources/main/application-dev.yml +++ b/shp-exporter/build/resources/main/application-dev.yml @@ -10,12 +10,6 @@ spring: idle-timeout: 600000 max-lifetime: 1800000 - application: - name: make-shapefile-service - - main: - web-application-type: none # Disable web server for CLI application - converter: inference-id: D5E46F60FC40B1A8BE0CD1F3547AA6 # Optional: omit or set empty to create merged shapefile for all batch-ids @@ -45,7 +39,3 @@ logging: org.springframework: WARN pattern: console: '%d{yyyy-MM-dd HH:mm:ss} - %msg%n' - -layer: - geoserver-url: http://label-tile.gs.dabeeo.com - workspace: cd diff --git a/shp-exporter/build/resources/main/application-local.yml b/shp-exporter/build/resources/main/application-local.yml old mode 100644 new mode 100755 diff --git a/shp-exporter/build/resources/main/application-prod.yml b/shp-exporter/build/resources/main/application-prod.yml old mode 100644 new mode 100755 index 3e1665d..4bfd289 --- a/shp-exporter/build/resources/main/application-prod.yml +++ b/shp-exporter/build/resources/main/application-prod.yml @@ -5,16 +5,17 @@ spring: password: kamco_cds_Q!W@E#R$ driver-class-name: org.postgresql.Driver hikari: - maximum-pool-size: 5 + maximum-pool-size: 10 # Increased for batch processing connection-timeout: 30000 idle-timeout: 600000 max-lifetime: 1800000 - application: - name: make-shapefile-service - - main: - web-application-type: none # Disable web server for CLI application + batch: + job: + enabled: false # CLI에서 명시적으로 실행 + jdbc: + initialize-schema: always # 메타데이터 테이블 자동 생성 + table-prefix: BATCH_ converter: inference-id: D5E46F60FC40B1A8BE0CD1F3547AA6 @@ -27,15 +28,19 @@ converter: output-base-dir: '/data/model_output/export/' crs: 'EPSG:5186' + batch: + chunk-size: 1000 # 청크 크기 (메모리 ~40MB per chunk) + skip-limit: 100 # 청크당 skip 허용 건수 + fetch-size: 1000 # JDBC 커서 fetch 크기 + 
enable-partitioning: false # 초기에는 비활성화 + partition-concurrency: 4 # Map ID별 병렬 처리 동시성 (4=~300MB, 8=~600MB) + geoserver: - base-url: 'https://kamco.geo-dev.gs.dabeeo.com/geoserver' + base-url: 'https://aicd-geo.e-kamco.com:18080/geoserver' workspace: 'cd' overwrite-existing: true connection-timeout: 30000 read-timeout: 60000 - # Credentials (optional - environment variables take precedence) - # Uncomment and set values for development convenience - # For production, use GEOSERVER_USERNAME and GEOSERVER_PASSWORD environment variables username: 'admin' password: 'geoserver' @@ -45,9 +50,3 @@ logging: org.springframework: WARN pattern: console: '%d{yyyy-MM-dd HH:mm:ss} - %msg%n' - -layer: - geoserver-url: https://kamco.geo-dev.gs.dabeeo.com - wms-path: geoserver/cd - wmts-path: geoserver/cd/gwc/service - workspace: cd diff --git a/shp-exporter/build/resources/main/application.yml b/shp-exporter/build/resources/main/application.yml index 65d113d..d35f2e3 100755 --- a/shp-exporter/build/resources/main/application.yml +++ b/shp-exporter/build/resources/main/application.yml @@ -3,3 +3,5 @@ spring: name: make-shapefile-service profiles: active: prod + main: + web-application-type: none # Disable web server for CLI application diff --git a/shp-exporter/build/tmp/compileJava/previous-compilation-data.bin b/shp-exporter/build/tmp/compileJava/previous-compilation-data.bin index 1a25040..11066ae 100644 Binary files a/shp-exporter/build/tmp/compileJava/previous-compilation-data.bin and b/shp-exporter/build/tmp/compileJava/previous-compilation-data.bin differ diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/config/BatchConfiguration.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/config/BatchConfiguration.java new file mode 100644 index 0000000..d72d0f6 --- /dev/null +++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/config/BatchConfiguration.java @@ -0,0 +1,72 @@ +package com.kamco.makesample.batch.config; + +import 
com.kamco.makesample.config.ConverterProperties; +import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.TaskExecutor; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +/** + * Spring Batch 기본 설정 + * + *

Spring Boot Auto-configuration이 다음을 자동으로 설정: - JobRepository (BATCH_* 테이블 사용) - JobLauncher - + * PlatformTransactionManager + * + *

메타데이터 테이블: - BATCH_JOB_INSTANCE - BATCH_JOB_EXECUTION - BATCH_JOB_EXECUTION_PARAMS - + * BATCH_STEP_EXECUTION - BATCH_STEP_EXECUTION_CONTEXT - BATCH_JOB_EXECUTION_CONTEXT + */ +@Configuration +@EnableBatchProcessing +public class BatchConfiguration { + + private final ConverterProperties properties; + + public BatchConfiguration(ConverterProperties properties) { + this.properties = properties; + } + + /** + * Spring Boot Auto-configuration이 자동으로 설정하는 내용: + * + *

1. JobRepository: DataSource를 사용하여 BATCH_* 테이블에 메타데이터 저장 + * + *

2. JobLauncher: CLI에서 Job 실행을 위한 런처 + * + *

3. TransactionManager: DataSource에 대한 트랜잭션 관리 + * + *

추가 설정이 필요하면 여기에 Bean을 정의 + */ + + /** + * Map ID별 파일 생성을 위한 TaskExecutor + * + *

병렬 처리를 통해 각 map_id별 shapefile/geojson을 동시에 생성합니다. + * + *

설정: + * + *

+ * + * @return TaskExecutor for partitioned step + */ + @Bean + public TaskExecutor mapIdTaskExecutor() { + int concurrency = properties.getBatch().getPartitionConcurrency(); + + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(concurrency); + executor.setMaxPoolSize(concurrency * 2); + executor.setQueueCapacity(50); + executor.setThreadNamePrefix("mapid-worker-"); + executor.setWaitForTasksToCompleteOnShutdown(true); + executor.setAwaitTerminationSeconds(60); + executor.initialize(); + + return executor; + } +} diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/config/MergedModeJobConfig.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/config/MergedModeJobConfig.java new file mode 100644 index 0000000..03899a2 --- /dev/null +++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/config/MergedModeJobConfig.java @@ -0,0 +1,289 @@ +package com.kamco.makesample.batch.config; + +import com.kamco.makesample.batch.listener.BatchExecutionHistoryListener; +import com.kamco.makesample.batch.partitioner.MapIdPartitioner; +import com.kamco.makesample.batch.processor.FeatureConversionProcessor; +import com.kamco.makesample.batch.tasklet.CreateZipTasklet; +import com.kamco.makesample.batch.tasklet.GeoServerRegistrationTasklet; +import com.kamco.makesample.batch.tasklet.GeometryTypeValidationTasklet; +import com.kamco.makesample.batch.writer.MapIdGeoJsonWriter; +import com.kamco.makesample.batch.writer.MapIdShapefileWriter; +import com.kamco.makesample.batch.writer.StreamingGeoJsonWriter; +import com.kamco.makesample.batch.writer.StreamingShapefileWriter; +import com.kamco.makesample.model.InferenceResult; +import java.util.Arrays; +import org.geotools.api.feature.simple.SimpleFeature; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.Step; +import org.springframework.batch.core.job.builder.JobBuilder; +import 
org.springframework.batch.core.repository.JobRepository; +import org.springframework.batch.core.step.builder.StepBuilder; +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.batch.item.support.CompositeItemWriter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.TaskExecutor; +import org.springframework.transaction.PlatformTransactionManager; + +/** + * MERGED 모드 Job 설정 + * + *

전체 batch_ids에 대한 데이터를 하나의 shapefile과 GeoJSON으로 생성하고, merge 폴더의 shapefile만 GeoServer에 등록 + * + *

Job Flow: + * + *

    + *
  1. validateGeometryTypeStep: Geometry type 사전 검증 (Tasklet) + *
  2. generateShapefileStep: Shapefile 생성 (Chunk-oriented) + *
  3. generateGeoJsonStep: GeoJSON 생성 (Chunk-oriented) + *
  4. createZipStep: ZIP 파일 생성 (Tasklet) + *
  5. registerToGeoServerStep: GeoServer 등록 - merge 폴더의 shapefile만 (Tasklet, conditional) + *
  6. generateMapIdFilesStep: Map ID별 개별 shapefile/geojson 생성 (Partitioned, parallel) + *
+ */ +@Configuration +public class MergedModeJobConfig { + + private static final Logger log = LoggerFactory.getLogger(MergedModeJobConfig.class); + + /** + * MERGED 모드 Job 정의 + * + * @param jobRepository JobRepository + * @param validateGeometryTypeStep Geometry type 검증 Step + * @param generateShapefileStep Shapefile 생성 Step + * @param generateGeoJsonStep GeoJSON 생성 Step + * @param createZipStep ZIP 생성 Step + * @param registerToGeoServerStep GeoServer 등록 Step (merge 폴더의 shapefile만) + * @param generateMapIdFilesStep Map ID별 파일 생성 Step (병렬 처리) + * @return Job + */ + @Bean + public Job mergedModeJob( + JobRepository jobRepository, + Step validateGeometryTypeStep, + Step generateShapefileStep, + Step generateGeoJsonStep, + Step createZipStep, + Step registerToGeoServerStep, + Step generateMapIdFilesStep) { + + return new JobBuilder("mergedModeJob", jobRepository) + .start(validateGeometryTypeStep) + .next(generateShapefileStep) + .next(generateGeoJsonStep) + .next(createZipStep) + .next(registerToGeoServerStep) // Conditional execution + .next(generateMapIdFilesStep) // Map ID별 개별 파일 생성 + .build(); + } + + /** + * Step 1: Geometry Type 검증 + * + *

Shapefile은 homogeneous geometry type을 요구하므로 사전 검증 + * + * @param jobRepository JobRepository + * @param transactionManager TransactionManager + * @param validationTasklet GeometryTypeValidationTasklet + * @param historyListener BatchExecutionHistoryListener + * @return Step + */ + @Bean + public Step validateGeometryTypeStep( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + GeometryTypeValidationTasklet validationTasklet, + BatchExecutionHistoryListener historyListener) { + + return new StepBuilder("validateGeometryTypeStep", jobRepository) + .tasklet(validationTasklet, transactionManager) + .listener(historyListener) + .build(); + } + + /** + * Step 2: Shapefile 생성 (Chunk-oriented) + * + *

메모리 최적화: + * + *

+ * + * @param jobRepository JobRepository + * @param transactionManager TransactionManager + * @param shapefileReader ItemReader (Shapefile용) + * @param featureConversionProcessor ItemProcessor + * @param shapefileWriter ItemWriter + * @param chunkSize Chunk size (default: 1000) + * @param historyListener BatchExecutionHistoryListener + * @return Step + */ + @Bean + public Step generateShapefileStep( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + JdbcCursorItemReader shapefileReader, + FeatureConversionProcessor featureConversionProcessor, + StreamingShapefileWriter shapefileWriter, + @Value("${converter.batch.chunk-size:1000}") int chunkSize, + BatchExecutionHistoryListener historyListener) { + + return new StepBuilder("generateShapefileStep", jobRepository) + .chunk(chunkSize, transactionManager) + .reader(shapefileReader) + .processor(featureConversionProcessor) + .writer(shapefileWriter) + .listener(historyListener) + .build(); + } + + /** + * Step 3: GeoJSON 생성 (Chunk-oriented) + * + *

Shapefile과 동일한 데이터를 GeoJSON 형식으로 출력 + * + * @param jobRepository JobRepository + * @param transactionManager TransactionManager + * @param geoJsonReader ItemReader (GeoJSON용 - 별도 인스턴스) + * @param featureConversionProcessor ItemProcessor (재사용) + * @param geoJsonWriter ItemWriter + * @param chunkSize Chunk size + * @param historyListener BatchExecutionHistoryListener + * @return Step + */ + @Bean + public Step generateGeoJsonStep( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + JdbcCursorItemReader geoJsonReader, + FeatureConversionProcessor featureConversionProcessor, + StreamingGeoJsonWriter geoJsonWriter, + @Value("${converter.batch.chunk-size:1000}") int chunkSize, + BatchExecutionHistoryListener historyListener) { + + return new StepBuilder("generateGeoJsonStep", jobRepository) + .chunk(chunkSize, transactionManager) + .reader(geoJsonReader) + .processor(featureConversionProcessor) + .writer(geoJsonWriter) + .listener(historyListener) + .build(); + } + + /** + * Step 4: ZIP 파일 생성 + * + * @param jobRepository JobRepository + * @param transactionManager TransactionManager + * @param createZipTasklet CreateZipTasklet + * @param historyListener BatchExecutionHistoryListener + * @return Step + */ + @Bean + public Step createZipStep( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + CreateZipTasklet createZipTasklet, + BatchExecutionHistoryListener historyListener) { + + return new StepBuilder("createZipStep", jobRepository) + .tasklet(createZipTasklet, transactionManager) + .listener(historyListener) + .build(); + } + + /** + * Step 5: GeoServer 등록 (merge 폴더의 shapefile만) + * + *

Conditional execution: geoserver.enabled=true일 때만 실행 + * + * @param jobRepository JobRepository + * @param transactionManager TransactionManager + * @param registrationTasklet GeoServerRegistrationTasklet + * @param historyListener BatchExecutionHistoryListener + * @return Step + */ + @Bean + public Step registerToGeoServerStep( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + GeoServerRegistrationTasklet registrationTasklet, + BatchExecutionHistoryListener historyListener) { + + return new StepBuilder("registerToGeoServerStep", jobRepository) + .tasklet(registrationTasklet, transactionManager) + .listener(historyListener) + .build(); + } + + /** + * Step 6: Map ID별 개별 파일 생성 (Partitioned Step) + * + *

각 map_id마다 개별 shapefile과 geojson 파일을 병렬로 생성합니다. + * + * @param jobRepository JobRepository + * @param partitioner MapIdPartitioner + * @param mapIdWorkerStep Worker Step (각 파티션에서 실행) + * @param mapIdTaskExecutor TaskExecutor for parallel execution + * @return Partitioned Step + */ + @Bean + public Step generateMapIdFilesStep( + JobRepository jobRepository, + MapIdPartitioner partitioner, + Step mapIdWorkerStep, + TaskExecutor mapIdTaskExecutor) { + + return new StepBuilder("generateMapIdFilesStep", jobRepository) + .partitioner("mapIdWorker", partitioner) + .step(mapIdWorkerStep) + .taskExecutor(mapIdTaskExecutor) + .build(); + } + + /** + * Worker Step: Map ID별 파일 생성 작업 + * + *

각 파티션에서 실행되며, 해당 map_id의 데이터를 읽어 shapefile과 geojson을 동시에 생성합니다. + * + * @param jobRepository JobRepository + * @param transactionManager TransactionManager + * @param mapIdModeReader ItemReader (map_id별) + * @param featureConversionProcessor ItemProcessor + * @param mapIdShapefileWriter Shapefile Writer + * @param mapIdGeoJsonWriter GeoJSON Writer + * @param chunkSize Chunk size + * @param historyListener BatchExecutionHistoryListener + * @return Worker Step + */ + @Bean + public Step mapIdWorkerStep( + JobRepository jobRepository, + PlatformTransactionManager transactionManager, + JdbcCursorItemReader mapIdModeReader, + FeatureConversionProcessor featureConversionProcessor, + MapIdShapefileWriter mapIdShapefileWriter, + MapIdGeoJsonWriter mapIdGeoJsonWriter, + @Value("${converter.batch.chunk-size:1000}") int chunkSize, + BatchExecutionHistoryListener historyListener) { + + // CompositeItemWriter로 shapefile과 geojson 동시 생성 + CompositeItemWriter compositeWriter = new CompositeItemWriter<>(); + compositeWriter.setDelegates(Arrays.asList(mapIdShapefileWriter, mapIdGeoJsonWriter)); + + return new StepBuilder("mapIdWorkerStep", jobRepository) + .chunk(chunkSize, transactionManager) + .reader(mapIdModeReader) + .processor(featureConversionProcessor) + .writer(compositeWriter) + .listener(historyListener) + .build(); + } +} diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/listener/BatchExecutionHistoryListener.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/listener/BatchExecutionHistoryListener.java new file mode 100644 index 0000000..4d4083d --- /dev/null +++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/listener/BatchExecutionHistoryListener.java @@ -0,0 +1,183 @@ +package com.kamco.makesample.batch.listener; + +import com.kamco.makesample.batch.model.BatchExecutionHistory; +import com.kamco.makesample.batch.repository.BatchExecutionHistoryRepository; +import java.io.PrintWriter; +import java.io.StringWriter; +import 
java.time.LocalDateTime; +import java.util.List; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.ExitStatus; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.StepExecutionListener; +import org.springframework.stereotype.Component; + +/** + * 배치 실행 이력 추적 Listener + * + *

각 스텝의 시작/종료 시점에 실행 이력을 데이터베이스에 저장 + * + *

기록 항목: + * + *

+ */ +@Component +public class BatchExecutionHistoryListener implements StepExecutionListener { + + private static final Logger log = LoggerFactory.getLogger(BatchExecutionHistoryListener.class); + + private final BatchExecutionHistoryRepository historyRepository; + + // ThreadLocal로 각 스텝별 이력 ID 저장 + private final ThreadLocal historyIdHolder = new ThreadLocal<>(); + + public BatchExecutionHistoryListener(BatchExecutionHistoryRepository historyRepository) { + this.historyRepository = historyRepository; + } + + @Override + public void beforeStep(StepExecution stepExecution) { + try { + // 배치 실행 이력 생성 + BatchExecutionHistory history = new BatchExecutionHistory(); + history.setJobExecutionId(stepExecution.getJobExecutionId()); + history.setStepExecutionId(stepExecution.getId()); + history.setStepName(stepExecution.getStepName()); + history.setStartTime(LocalDateTime.now()); + history.setStatus("STARTED"); + + // Job Parameters에서 batch_ids, inference_id 추출 + String batchIds = stepExecution.getJobParameters().getString("batchIds"); + String inferenceId = stepExecution.getJobParameters().getString("inferenceId"); + history.setBatchIds(batchIds); + history.setInferenceId(inferenceId); + + // Step 타입 추정 (Tasklet vs Chunk) + String stepType = estimateStepType(stepExecution.getStepName()); + history.setStepType(stepType); + + // 이력 저장 + Long historyId = historyRepository.insert(history); + historyIdHolder.set(historyId); + + log.debug( + "Step execution history created: id={}, step={}, jobExecutionId={}", + historyId, + stepExecution.getStepName(), + stepExecution.getJobExecutionId()); + + } catch (Exception e) { + log.error("Failed to save step execution history on start: {}", e.getMessage(), e); + } + } + + @Override + public ExitStatus afterStep(StepExecution stepExecution) { + try { + Long historyId = historyIdHolder.get(); + if (historyId == null) { + log.warn("No history ID found for step: {}", stepExecution.getStepName()); + return stepExecution.getExitStatus(); + } + 
+ // 종료 시간 및 상태 + LocalDateTime endTime = LocalDateTime.now(); + LocalDateTime startTime = + stepExecution.getStartTime() != null ? stepExecution.getStartTime() : LocalDateTime.now(); + + String status = stepExecution.getStatus().toString(); + String exitCode = stepExecution.getExitStatus().getExitCode(); + String exitMessage = stepExecution.getExitStatus().getExitDescription(); + + // 에러 정보 추출 + String errorMessage = null; + String errorStackTrace = null; + + List failureExceptions = stepExecution.getFailureExceptions(); + if (!failureExceptions.isEmpty()) { + Throwable firstException = failureExceptions.get(0); + errorMessage = firstException.getMessage(); + errorStackTrace = getStackTrace(firstException); + } + + // 처리 통계 (Chunk 기반 스텝용) + Long readCount = (long) stepExecution.getReadCount(); + Long writeCount = (long) stepExecution.getWriteCount(); + Long commitCount = (long) stepExecution.getCommitCount(); + Long rollbackCount = (long) stepExecution.getRollbackCount(); + Long skipCount = (long) stepExecution.getSkipCount(); + + // 이력 업데이트 + historyRepository.updateOnCompletion( + historyId, + endTime, + startTime, + status, + exitCode, + exitMessage, + errorMessage, + errorStackTrace, + readCount, + writeCount, + commitCount, + rollbackCount, + skipCount); + + log.debug( + "Step execution history updated: id={}, step={}, status={}, duration={}ms", + historyId, + stepExecution.getStepName(), + status, + java.time.Duration.between(startTime, endTime).toMillis()); + + // ThreadLocal 정리 + historyIdHolder.remove(); + + } catch (Exception e) { + log.error("Failed to update step execution history on completion: {}", e.getMessage(), e); + } + + return stepExecution.getExitStatus(); + } + + /** + * Step 이름으로 Step 타입 추정 + * + * @param stepName Step 이름 + * @return TASKLET 또는 CHUNK + */ + private String estimateStepType(String stepName) { + // Tasklet 스텝들 + if (stepName.contains("validate") + || stepName.contains("Zip") + || stepName.contains("GeoServer")) { + return 
"TASKLET"; + } + // Chunk 스텝들 + if (stepName.contains("generate") + || stepName.contains("Shapefile") + || stepName.contains("GeoJson")) { + return "CHUNK"; + } + return "UNKNOWN"; + } + + /** + * Exception을 스택 트레이스 문자열로 변환 + * + * @param throwable Exception + * @return 스택 트레이스 문자열 + */ + private String getStackTrace(Throwable throwable) { + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + throwable.printStackTrace(pw); + return sw.toString(); + } +} diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/model/BatchExecutionHistory.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/model/BatchExecutionHistory.java new file mode 100644 index 0000000..4107cb7 --- /dev/null +++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/model/BatchExecutionHistory.java @@ -0,0 +1,212 @@ +package com.kamco.makesample.batch.model; + +import java.time.LocalDateTime; + +/** + * 배치 실행 이력 엔티티 + * + *

각 스텝별 시작/종료 시간, 소요 시간, 성공 여부, 에러 사유를 추적 + */ +public class BatchExecutionHistory { + + private Long id; + private Long jobExecutionId; + private Long stepExecutionId; + private String stepName; + private String stepType; + private LocalDateTime startTime; + private LocalDateTime endTime; + private Long durationMs; + private String status; + private String exitCode; + private String exitMessage; + private String errorMessage; + private String errorStackTrace; + private Long readCount; + private Long writeCount; + private Long commitCount; + private Long rollbackCount; + private Long skipCount; + private String batchIds; + private String inferenceId; + private LocalDateTime createdAt; + private LocalDateTime updatedAt; + + // Getters and Setters + + public Long getId() { + return id; + } + + public void setId(Long id) { + this.id = id; + } + + public Long getJobExecutionId() { + return jobExecutionId; + } + + public void setJobExecutionId(Long jobExecutionId) { + this.jobExecutionId = jobExecutionId; + } + + public Long getStepExecutionId() { + return stepExecutionId; + } + + public void setStepExecutionId(Long stepExecutionId) { + this.stepExecutionId = stepExecutionId; + } + + public String getStepName() { + return stepName; + } + + public void setStepName(String stepName) { + this.stepName = stepName; + } + + public String getStepType() { + return stepType; + } + + public void setStepType(String stepType) { + this.stepType = stepType; + } + + public LocalDateTime getStartTime() { + return startTime; + } + + public void setStartTime(LocalDateTime startTime) { + this.startTime = startTime; + } + + public LocalDateTime getEndTime() { + return endTime; + } + + public void setEndTime(LocalDateTime endTime) { + this.endTime = endTime; + } + + public Long getDurationMs() { + return durationMs; + } + + public void setDurationMs(Long durationMs) { + this.durationMs = durationMs; + } + + public String getStatus() { + return status; + } + + public void setStatus(String 
status) { + this.status = status; + } + + public String getExitCode() { + return exitCode; + } + + public void setExitCode(String exitCode) { + this.exitCode = exitCode; + } + + public String getExitMessage() { + return exitMessage; + } + + public void setExitMessage(String exitMessage) { + this.exitMessage = exitMessage; + } + + public String getErrorMessage() { + return errorMessage; + } + + public void setErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; + } + + public String getErrorStackTrace() { + return errorStackTrace; + } + + public void setErrorStackTrace(String errorStackTrace) { + this.errorStackTrace = errorStackTrace; + } + + public Long getReadCount() { + return readCount; + } + + public void setReadCount(Long readCount) { + this.readCount = readCount; + } + + public Long getWriteCount() { + return writeCount; + } + + public void setWriteCount(Long writeCount) { + this.writeCount = writeCount; + } + + public Long getCommitCount() { + return commitCount; + } + + public void setCommitCount(Long commitCount) { + this.commitCount = commitCount; + } + + public Long getRollbackCount() { + return rollbackCount; + } + + public void setRollbackCount(Long rollbackCount) { + this.rollbackCount = rollbackCount; + } + + public Long getSkipCount() { + return skipCount; + } + + public void setSkipCount(Long skipCount) { + this.skipCount = skipCount; + } + + public String getBatchIds() { + return batchIds; + } + + public void setBatchIds(String batchIds) { + this.batchIds = batchIds; + } + + public String getInferenceId() { + return inferenceId; + } + + public void setInferenceId(String inferenceId) { + this.inferenceId = inferenceId; + } + + public LocalDateTime getCreatedAt() { + return createdAt; + } + + public void setCreatedAt(LocalDateTime createdAt) { + this.createdAt = createdAt; + } + + public LocalDateTime getUpdatedAt() { + return updatedAt; + } + + public void setUpdatedAt(LocalDateTime updatedAt) { + this.updatedAt = updatedAt; + } 
+} diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/partitioner/MapIdPartitioner.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/partitioner/MapIdPartitioner.java new file mode 100644 index 0000000..b06c9fe --- /dev/null +++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/partitioner/MapIdPartitioner.java @@ -0,0 +1,100 @@ +package com.kamco.makesample.batch.partitioner; + +import com.kamco.makesample.config.ConverterProperties; +import com.kamco.makesample.repository.InferenceResultRepository; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.partition.support.Partitioner; +import org.springframework.batch.item.ExecutionContext; +import org.springframework.stereotype.Component; + +/** + * Map ID별 파티션 생성 + * + *

batch_ids로부터 고유한 map_id 목록을 조회하여 각 map_id마다 ExecutionContext를 생성합니다. 각 파티션은 독립적으로 shapefile과 + * geojson을 생성합니다. + */ +@Component +public class MapIdPartitioner implements Partitioner { + + private static final Logger log = LoggerFactory.getLogger(MapIdPartitioner.class); + + private final InferenceResultRepository repository; + private final ConverterProperties properties; + + public MapIdPartitioner(InferenceResultRepository repository, ConverterProperties properties) { + this.repository = repository; + this.properties = properties; + } + + @Override + public Map partition(int gridSize) { + List batchIds = properties.getBatchIds(); + String inferenceId = properties.getInferenceId(); + String outputBaseDir = properties.getOutputBaseDir(); + + log.info("Creating partitions for batch_ids: {}, inference_id: {}", batchIds, inferenceId); + + // batch_ids로 고유한 map_id 목록 조회 + List mapIds = repository.findMapIdByBatchIds(batchIds); + + if (mapIds.isEmpty()) { + log.warn("No map_ids found for batch_ids: {}", batchIds); + return new HashMap<>(); + } + + log.info("Found {} map_ids to partition: {}", mapIds.size(), mapIds); + + // 각 map_id마다 ExecutionContext 생성 + Map partitions = new HashMap<>(); + + for (String mapId : mapIds) { + ExecutionContext context = new ExecutionContext(); + + // 파티션별 파라미터 설정 + context.putString("mapId", mapId); + context.putString("outputPath", buildShapefilePath(outputBaseDir, inferenceId, mapId)); + context.putString("geoJsonOutputPath", buildGeoJsonPath(outputBaseDir, inferenceId, mapId)); + + partitions.put("partition-" + mapId, context); + + log.debug( + "Created partition for map_id: {}, shapefile: {}, geojson: {}", + mapId, + context.getString("outputPath"), + context.getString("geoJsonOutputPath")); + } + + log.info("Created {} partitions for parallel processing", partitions.size()); + + return partitions; + } + + /** + * Shapefile 출력 경로 생성 + * + * @param baseDir 기본 디렉토리 + * @param inferenceId Inference ID + * @param mapId Map ID + * @return 
Shapefile 경로 + */ + private String buildShapefilePath(String baseDir, String inferenceId, String mapId) { + return Paths.get(baseDir, inferenceId, mapId, mapId + ".shp").toString(); + } + + /** + * GeoJSON 출력 경로 생성 + * + * @param baseDir 기본 디렉토리 + * @param inferenceId Inference ID + * @param mapId Map ID + * @return GeoJSON 경로 + */ + private String buildGeoJsonPath(String baseDir, String inferenceId, String mapId) { + return Paths.get(baseDir, inferenceId, mapId, mapId + ".geojson").toString(); + } +} diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/processor/FeatureConversionProcessor.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/processor/FeatureConversionProcessor.java new file mode 100644 index 0000000..1b92b9c --- /dev/null +++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/processor/FeatureConversionProcessor.java @@ -0,0 +1,124 @@ +package com.kamco.makesample.batch.processor; + +import com.kamco.makesample.batch.util.FeatureTypeFactory; +import com.kamco.makesample.model.InferenceResult; +import org.geotools.api.feature.simple.SimpleFeature; +import org.geotools.api.feature.simple.SimpleFeatureType; +import org.geotools.api.referencing.FactoryException; +import org.geotools.api.referencing.crs.CoordinateReferenceSystem; +import org.geotools.feature.simple.SimpleFeatureBuilder; +import org.geotools.referencing.CRS; +import org.locationtech.jts.geom.Geometry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.annotation.BeforeStep; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.ItemProcessor; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +/** + * InferenceResult → SimpleFeature 변환 Processor + * + *

기존 ShapefileWriter의 buildFeature 로직을 Processor로 분리 + * + *

주요 역할: + * + *

    + *
  • Geometry 검증 (null 체크, isValid 체크) + *
  • InferenceResult 필드를 SimpleFeature 속성으로 변환 + *
  • Invalid geometry는 skip (null 반환) + *
+ */ +@Component +@StepScope +public class FeatureConversionProcessor implements ItemProcessor { + + private static final Logger log = LoggerFactory.getLogger(FeatureConversionProcessor.class); + + private final FeatureTypeFactory featureTypeFactory; + + @Value("#{jobParameters['crs'] ?: 'EPSG:5186'}") + private String crsCode; + + private SimpleFeatureBuilder featureBuilder; + private SimpleFeatureType featureType; + + public FeatureConversionProcessor(FeatureTypeFactory featureTypeFactory) { + this.featureTypeFactory = featureTypeFactory; + } + + @BeforeStep + public void beforeStep(StepExecution stepExecution) { + // StepExecutionContext에서 geometry type 읽기 + String geomTypeStr = stepExecution.getExecutionContext().getString("geometryType"); + Class geometryType = featureTypeFactory.parseGeometryType(geomTypeStr); + + try { + // CRS 설정 + CoordinateReferenceSystem crs = CRS.decode(crsCode); + + // FeatureType 생성 + this.featureType = featureTypeFactory.createFeatureType(crs, geometryType); + this.featureBuilder = new SimpleFeatureBuilder(this.featureType); + + log.info( + "FeatureConversionProcessor initialized with geometry type: {}", + geometryType.getSimpleName()); + + } catch (FactoryException e) { + throw new RuntimeException("Failed to initialize FeatureConversionProcessor", e); + } + } + + @Override + public SimpleFeature process(InferenceResult result) throws Exception { + // Geometry 검증 + Geometry geometry = result.getGeometry(); + + if (geometry == null) { + log.warn("Null geometry detected for uid: {} - skipping", result.getUid()); + return null; // Skip this item + } + + if (!geometry.isValid()) { + log.warn( + "Invalid geometry detected for uid: {} - skipping. Reason: {}", + result.getUid(), + geometry.getGeometryType()); + return null; // Skip invalid geometry + } + + // SimpleFeature 빌드 + return buildFeature(result, geometry); + } + + /** + * InferenceResult를 SimpleFeature로 변환 + * + *

기존 ShapefileWriter.buildFeature() 로직과 동일 + * + * @param result InferenceResult + * @param geometry Geometry + * @return SimpleFeature + */ + private SimpleFeature buildFeature(InferenceResult result, Geometry geometry) { + // Geometry 추가 (the_geom) + featureBuilder.add(geometry); + + // 속성 필드 추가 + featureBuilder.add(result.getUid()); + featureBuilder.add(result.getMapId()); + featureBuilder.add( + result.getProbability() != null ? String.valueOf(result.getProbability()) : "0.0"); + featureBuilder.add(result.getBeforeYear() != null ? result.getBeforeYear() : 0L); + featureBuilder.add(result.getAfterYear() != null ? result.getAfterYear() : 0L); + featureBuilder.add(result.getBeforeC()); + featureBuilder.add(result.getBeforeP() != null ? String.valueOf(result.getBeforeP()) : "0.0"); + featureBuilder.add(result.getAfterC()); + featureBuilder.add(result.getAfterP() != null ? String.valueOf(result.getAfterP()) : "0.0"); + + return featureBuilder.buildFeature(null); + } +} diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/reader/GeometryConvertingRowMapper.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/reader/GeometryConvertingRowMapper.java new file mode 100644 index 0000000..c3fe37a --- /dev/null +++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/reader/GeometryConvertingRowMapper.java @@ -0,0 +1,56 @@ +package com.kamco.makesample.batch.reader; + +import com.kamco.makesample.model.InferenceResult; +import com.kamco.makesample.service.GeometryConverter; +import java.sql.ResultSet; +import java.sql.SQLException; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.stereotype.Component; + +/** + * ResultSet을 InferenceResult로 변환하는 RowMapper + * + *

기존 InferenceResultRepository의 InferenceResultRowMapper와 동일한 로직을 사용하지만, Spring Batch의 + * ItemReader와 함께 사용하도록 독립 Component로 분리 + */ +@Component +public class GeometryConvertingRowMapper implements RowMapper { + + private final GeometryConverter geometryConverter; + + public GeometryConvertingRowMapper(GeometryConverter geometryConverter) { + this.geometryConverter = geometryConverter; + } + + @Override + public InferenceResult mapRow(ResultSet rs, int rowNum) throws SQLException { + InferenceResult result = new InferenceResult(); + result.setUid(rs.getString("uid")); + result.setMapId(rs.getString("map_id")); + result.setProbability(getDoubleOrNull(rs, "probability")); + result.setBeforeYear(getLongOrNull(rs, "before_year")); + result.setAfterYear(getLongOrNull(rs, "after_year")); + result.setBeforeC(rs.getString("before_c")); + result.setBeforeP(getDoubleOrNull(rs, "before_p")); + result.setAfterC(rs.getString("after_c")); + result.setAfterP(getDoubleOrNull(rs, "after_p")); + + // WKT → JTS Geometry 변환 (per-record conversion) + String geometryWkt = rs.getString("geometry_wkt"); + if (geometryWkt != null) { + result.setGeometry(geometryConverter.convertWKTToJTS(geometryWkt)); + } + + return result; + } + + private Long getLongOrNull(ResultSet rs, String columnName) throws SQLException { + long value = rs.getLong(columnName); + return rs.wasNull() ? null : value; + } + + private Double getDoubleOrNull(ResultSet rs, String columnName) throws SQLException { + double value = rs.getDouble(columnName); + return rs.wasNull() ? 
null : value; + } +} diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/reader/InferenceResultItemReaderConfig.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/reader/InferenceResultItemReaderConfig.java new file mode 100644 index 0000000..fa77218 --- /dev/null +++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/reader/InferenceResultItemReaderConfig.java @@ -0,0 +1,221 @@ +package com.kamco.makesample.batch.reader; + +import com.kamco.makesample.model.InferenceResult; +import java.sql.Array; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import javax.sql.DataSource; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.database.JdbcCursorItemReader; +import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.PreparedStatementSetter; + +/** + * 커서 기반 ItemReader 설정 + * + *

메모리 최적화의 핵심: 전체 데이터를 List로 로딩하지 않고 커서를 사용하여 스트리밍 처리 + * + *

주요 특징: + * + *

    + *
  • fetch-size: 1000 → DB에서 1000건씩 가져옴 + *
  • cursor-based → 전체 ResultSet을 메모리에 로딩하지 않음 + *
  • PreparedStatement → PostgreSQL array 파라미터 처리 + *
  • EPSG:5186 좌표계 정합성 검증 (SRID, 좌표 범위, geometry 유효성) + *
+ */ +@Configuration +public class InferenceResultItemReaderConfig { + + // EPSG:5186 좌표계 정합성 검증 조건: + // - SRID = 5186 (한국 2000 / 중부 좌표계) + // - ST_IsValid() = true (geometry 유효성) + // - X 범위: 125,000 ~ 530,000m (동서 방향) + // - Y 범위: -600,000 ~ 988,000m (남북 방향) + // 위 조건을 만족하지 않는 잘못된 좌표의 polygon은 배치 대상에서 제외됨 + private static final String SQL_QUERY = + """ + SELECT uid, map_id, probability, before_year, after_year, + before_c, before_p, after_c, after_p, + ST_AsText(geometry) as geometry_wkt + FROM inference_results_testing + WHERE batch_id = ANY(?) + AND after_c IS NOT NULL + AND after_p IS NOT NULL + AND geometry IS NOT NULL + AND ST_GeometryType(geometry) IN ('ST_Polygon', 'ST_MultiPolygon') + AND ST_SRID(geometry) = 5186 + AND ST_IsValid(geometry) = true + AND ST_XMin(geometry) >= 125000 AND ST_XMax(geometry) <= 530000 + AND ST_YMin(geometry) >= -600000 AND ST_YMax(geometry) <= 988000 + ORDER BY map_id, uid + """; + + /** + * MERGED 모드용 ItemReader (Shapefile 생성용) + * + *

전체 batch_ids에 대한 데이터를 스트리밍으로 읽어옴 + * + * @param dataSource DataSource + * @param batchIdsParam Job Parameter로 전달받은 batch_ids (콤마 구분 문자열, 예: "252,253,257") + * @param fetchSize fetch size (기본 1000) + * @param rowMapper RowMapper + * @return JdbcCursorItemReader + */ + @Bean + @StepScope + public JdbcCursorItemReader shapefileReader( + DataSource dataSource, + @Value("#{jobParameters['batchIds']}") String batchIdsParam, + @Value("#{jobParameters['fetchSize'] ?: 1000}") int fetchSize, + GeometryConvertingRowMapper rowMapper) { + + // JobParameter로 받은 "252,253,257" 형태를 Long 배열로 변환 + Long[] batchIds = parseBatchIds(batchIdsParam); + + return new JdbcCursorItemReaderBuilder() + .name("shapefileReader") + .dataSource(dataSource) + .sql(SQL_QUERY) + .fetchSize(fetchSize) // 메모리 효율을 위한 fetch size + .rowMapper(rowMapper) + .preparedStatementSetter( + new PreparedStatementSetter() { + @Override + public void setValues(PreparedStatement ps) throws SQLException { + // PostgreSQL array 파라미터 설정 + Connection conn = ps.getConnection(); + Array sqlArray = conn.createArrayOf("bigint", batchIds); + ps.setArray(1, sqlArray); + } + }) + .build(); + } + + /** + * MERGED 모드용 ItemReader (GeoJSON 생성용) + * + *

전체 batch_ids에 대한 데이터를 스트리밍으로 읽어옴 (Shapefile과 동일한 데이터) + * + * @param dataSource DataSource + * @param batchIdsParam Job Parameter로 전달받은 batch_ids (콤마 구분 문자열, 예: "252,253,257") + * @param fetchSize fetch size (기본 1000) + * @param rowMapper RowMapper + * @return JdbcCursorItemReader + */ + @Bean + @StepScope + public JdbcCursorItemReader geoJsonReader( + DataSource dataSource, + @Value("#{jobParameters['batchIds']}") String batchIdsParam, + @Value("#{jobParameters['fetchSize'] ?: 1000}") int fetchSize, + GeometryConvertingRowMapper rowMapper) { + + // JobParameter로 받은 "252,253,257" 형태를 Long 배열로 변환 + Long[] batchIds = parseBatchIds(batchIdsParam); + + return new JdbcCursorItemReaderBuilder() + .name("geoJsonReader") + .dataSource(dataSource) + .sql(SQL_QUERY) + .fetchSize(fetchSize) // 메모리 효율을 위한 fetch size + .rowMapper(rowMapper) + .preparedStatementSetter( + new PreparedStatementSetter() { + @Override + public void setValues(PreparedStatement ps) throws SQLException { + // PostgreSQL array 파라미터 설정 + Connection conn = ps.getConnection(); + Array sqlArray = conn.createArrayOf("bigint", batchIds); + ps.setArray(1, sqlArray); + } + }) + .build(); + } + + /** + * MAP_IDS 모드용 ItemReader + * + *

특정 map_id에 대한 데이터만 읽어옴 + * + * @param dataSource DataSource + * @param batchIdsParam Job Parameter로 전달받은 batch_ids + * @param mapId Step Execution Context에서 전달받은 map_id + * @param fetchSize fetch size + * @param rowMapper RowMapper + * @return JdbcCursorItemReader + */ + @Bean + @StepScope + public JdbcCursorItemReader mapIdModeReader( + DataSource dataSource, + @Value("#{jobParameters['batchIds']}") String batchIdsParam, + @Value("#{stepExecutionContext['mapId']}") String mapId, + @Value("#{jobParameters['fetchSize'] ?: 1000}") int fetchSize, + GeometryConvertingRowMapper rowMapper) { + + Long[] batchIds = parseBatchIds(batchIdsParam); + + String sqlWithMapId = + """ + SELECT uid, map_id, probability, before_year, after_year, + before_c, before_p, after_c, after_p, + ST_AsText(geometry) as geometry_wkt + FROM inference_results_testing + WHERE batch_id = ANY(?) + AND map_id = ? + AND after_c IS NOT NULL + AND after_p IS NOT NULL + AND geometry IS NOT NULL + AND ST_GeometryType(geometry) IN ('ST_Polygon', 'ST_MultiPolygon') + AND ST_SRID(geometry) = 5186 + AND ST_IsValid(geometry) = true + AND ST_XMin(geometry) >= 125000 AND ST_XMax(geometry) <= 530000 + AND ST_YMin(geometry) >= -600000 AND ST_YMax(geometry) <= 988000 + ORDER BY uid + """; + + return new JdbcCursorItemReaderBuilder() + .name("mapIdModeReader") + .dataSource(dataSource) + .sql(sqlWithMapId) + .fetchSize(fetchSize) + .rowMapper(rowMapper) + .preparedStatementSetter( + new PreparedStatementSetter() { + @Override + public void setValues(PreparedStatement ps) throws SQLException { + Connection conn = ps.getConnection(); + Array sqlArray = conn.createArrayOf("bigint", batchIds); + ps.setArray(1, sqlArray); + ps.setString(2, mapId); + } + }) + .build(); + } + + /** + * JobParameter 문자열을 Long 배열로 변환 + * + * @param batchIdsParam "252,253,257" 형태의 문자열 + * @return Long 배열 + */ + private Long[] parseBatchIds(String batchIdsParam) { + if (batchIdsParam == null || batchIdsParam.trim().isEmpty()) { + throw new 
IllegalArgumentException("batchIds parameter is required"); + } + + String[] parts = batchIdsParam.split(","); + Long[] batchIds = new Long[parts.length]; + + for (int i = 0; i < parts.length; i++) { + batchIds[i] = Long.parseLong(parts[i].trim()); + } + + return batchIds; + } +} diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/repository/BatchExecutionHistoryRepository.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/repository/BatchExecutionHistoryRepository.java new file mode 100644 index 0000000..91ec00a --- /dev/null +++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/repository/BatchExecutionHistoryRepository.java @@ -0,0 +1,206 @@ +package com.kamco.makesample.batch.repository; + +import com.kamco.makesample.batch.model.BatchExecutionHistory; +import java.sql.Timestamp; +import java.time.Duration; +import java.time.LocalDateTime; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.stereotype.Repository; + +/** + * 배치 실행 이력 Repository + * + *

스텝별 실행 이력을 데이터베이스에 저장하고 조회 + */ +@Repository +public class BatchExecutionHistoryRepository { + + private final JdbcTemplate jdbcTemplate; + + public BatchExecutionHistoryRepository(JdbcTemplate jdbcTemplate) { + this.jdbcTemplate = jdbcTemplate; + } + + /** + * 스텝 시작 시 이력 생성 + * + * @param history 배치 실행 이력 + * @return 생성된 이력의 ID + */ + public Long insert(BatchExecutionHistory history) { + String sql = + """ + INSERT INTO batch_execution_history ( + job_execution_id, step_execution_id, step_name, step_type, + start_time, status, batch_ids, inference_id + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?) + RETURNING id + """; + + return jdbcTemplate.queryForObject( + sql, + Long.class, + history.getJobExecutionId(), + history.getStepExecutionId(), + history.getStepName(), + history.getStepType(), + Timestamp.valueOf(history.getStartTime()), + history.getStatus(), + history.getBatchIds(), + history.getInferenceId()); + } + + /** + * 스텝 종료 시 이력 업데이트 + * + * @param id 이력 ID + * @param endTime 종료 시간 + * @param status 상태 + * @param exitCode Exit Code + * @param exitMessage Exit Message + * @param errorMessage 에러 메시지 + * @param errorStackTrace 스택 트레이스 + * @param readCount Read Count + * @param writeCount Write Count + * @param commitCount Commit Count + * @param rollbackCount Rollback Count + * @param skipCount Skip Count + */ + public void updateOnCompletion( + Long id, + LocalDateTime endTime, + LocalDateTime startTime, + String status, + String exitCode, + String exitMessage, + String errorMessage, + String errorStackTrace, + Long readCount, + Long writeCount, + Long commitCount, + Long rollbackCount, + Long skipCount) { + + // 소요 시간 계산 + long durationMs = Duration.between(startTime, endTime).toMillis(); + + String sql = + """ + UPDATE batch_execution_history + SET end_time = ?, + duration_ms = ?, + status = ?, + exit_code = ?, + exit_message = ?, + error_message = ?, + error_stack_trace = ?, + read_count = ?, + write_count = ?, + commit_count = ?, + rollback_count = ?, + skip_count = 
?, + updated_at = CURRENT_TIMESTAMP + WHERE id = ? + """; + + jdbcTemplate.update( + sql, + Timestamp.valueOf(endTime), + durationMs, + status, + exitCode, + exitMessage, + errorMessage, + errorStackTrace, + readCount, + writeCount, + commitCount, + rollbackCount, + skipCount, + id); + } + + /** + * Job Execution ID로 모든 스텝 이력 조회 + * + * @param jobExecutionId Job Execution ID + * @return 이력 목록 + */ + public java.util.List findByJobExecutionId(Long jobExecutionId) { + String sql = + """ + SELECT * FROM batch_execution_history + WHERE job_execution_id = ? + ORDER BY start_time + """; + + return jdbcTemplate.query( + sql, + (rs, rowNum) -> { + BatchExecutionHistory history = new BatchExecutionHistory(); + history.setId(rs.getLong("id")); + history.setJobExecutionId(rs.getLong("job_execution_id")); + history.setStepExecutionId(rs.getLong("step_execution_id")); + history.setStepName(rs.getString("step_name")); + history.setStepType(rs.getString("step_type")); + history.setStartTime(rs.getTimestamp("start_time").toLocalDateTime()); + + Timestamp endTimestamp = rs.getTimestamp("end_time"); + if (endTimestamp != null) { + history.setEndTime(endTimestamp.toLocalDateTime()); + } + + history.setDurationMs(rs.getLong("duration_ms")); + history.setStatus(rs.getString("status")); + history.setExitCode(rs.getString("exit_code")); + history.setExitMessage(rs.getString("exit_message")); + history.setErrorMessage(rs.getString("error_message")); + history.setReadCount(rs.getLong("read_count")); + history.setWriteCount(rs.getLong("write_count")); + history.setCommitCount(rs.getLong("commit_count")); + history.setRollbackCount(rs.getLong("rollback_count")); + history.setSkipCount(rs.getLong("skip_count")); + history.setBatchIds(rs.getString("batch_ids")); + history.setInferenceId(rs.getString("inference_id")); + return history; + }, + jobExecutionId); + } + + /** + * 최근 N개 실행 이력 조회 + * + * @param limit 조회 개수 + * @return 이력 목록 + */ + public java.util.List findRecent(int limit) { + String 
sql = + """ + SELECT * FROM batch_execution_history + ORDER BY start_time DESC + LIMIT ? + """; + + return jdbcTemplate.query( + sql, + (rs, rowNum) -> { + BatchExecutionHistory history = new BatchExecutionHistory(); + history.setId(rs.getLong("id")); + history.setJobExecutionId(rs.getLong("job_execution_id")); + history.setStepExecutionId(rs.getLong("step_execution_id")); + history.setStepName(rs.getString("step_name")); + history.setStartTime(rs.getTimestamp("start_time").toLocalDateTime()); + + Timestamp endTimestamp = rs.getTimestamp("end_time"); + if (endTimestamp != null) { + history.setEndTime(endTimestamp.toLocalDateTime()); + } + + history.setDurationMs(rs.getLong("duration_ms")); + history.setStatus(rs.getString("status")); + history.setExitCode(rs.getString("exit_code")); + return history; + }, + limit); + } +} diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/tasklet/CreateZipTasklet.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/tasklet/CreateZipTasklet.java new file mode 100644 index 0000000..7a0d96d --- /dev/null +++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/tasklet/CreateZipTasklet.java @@ -0,0 +1,59 @@ +package com.kamco.makesample.batch.tasklet; + +import com.kamco.makesample.writer.ResultZipWriter; +import java.nio.file.Path; +import java.nio.file.Paths; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +/** + * ZIP 파일 생성 Tasklet + * + *

기존 ResultZipWriter를 재사용하여 shapefile 관련 파일들을 압축 + */ +@Component +@StepScope +public class CreateZipTasklet implements Tasklet { + + private static final Logger log = LoggerFactory.getLogger(CreateZipTasklet.class); + + @Value("#{jobParameters['outputPath']}") + private String outputPath; + + @Value("#{jobParameters['zipBaseName']}") + private String zipBaseName; + + @Override + public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) + throws Exception { + + log.info("Creating ZIP file for shapefile: {}", outputPath); + + // outputPath에서 디렉토리 추출 + Path shapefilePath = Paths.get(outputPath); + Path dirPath = shapefilePath.getParent(); + + // 기존 ResultZipWriter 재사용 + ResultZipWriter.createZip(dirPath, zipBaseName); + + log.info("ZIP file created successfully: {}.zip", zipBaseName); + + // ZIP 파일 경로를 JobExecutionContext에 저장 (GeoServer 등록에서 사용) + String zipPath = dirPath.resolve(zipBaseName + ".zip").toString(); + chunkContext + .getStepContext() + .getStepExecution() + .getJobExecution() + .getExecutionContext() + .putString("zipFilePath", zipPath); + + return RepeatStatus.FINISHED; + } +} diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/tasklet/GeoServerRegistrationTasklet.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/tasklet/GeoServerRegistrationTasklet.java new file mode 100644 index 0000000..7d14ccf --- /dev/null +++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/tasklet/GeoServerRegistrationTasklet.java @@ -0,0 +1,72 @@ +package com.kamco.makesample.batch.tasklet; + +import com.kamco.makesample.service.GeoServerRegistrationService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.Tasklet; +import 
org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * GeoServer registration Tasklet.
 *
 * <p>Reuses the existing GeoServerRegistrationService to register the shapefile with GeoServer.
 *
 * <p>Conditional execution: skipped entirely when geoserver.enabled=false.
 */
@Component
@StepScope
public class GeoServerRegistrationTasklet implements Tasklet {

  private static final Logger log = LoggerFactory.getLogger(GeoServerRegistrationTasklet.class);

  private final GeoServerRegistrationService geoServerService;

  // Toggle from job parameters; defaults to false so registration is opt-in
  @Value("#{jobParameters['geoserver.enabled'] ?: false}")
  private boolean geoServerEnabled;

  // Target layer name on GeoServer
  @Value("#{jobParameters['layerName']}")
  private String layerName;

  public GeoServerRegistrationTasklet(GeoServerRegistrationService geoServerService) {
    this.geoServerService = geoServerService;
  }

  /**
   * Uploads the ZIP produced by the previous step (path read from the JobExecutionContext under
   * "zipFilePath") to GeoServer as a shapefile layer.
   */
  @Override
  public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext)
      throws Exception {

    // Fast exit when the feature toggle is off
    if (!geoServerEnabled) {
      log.info("GeoServer registration is disabled. Skipping.");
      return RepeatStatus.FINISHED;
    }

    log.info("Starting GeoServer registration for layer: {}", layerName);

    // Fetch the ZIP file path published by CreateZipTasklet
    Object stored =
        chunkContext
            .getStepContext()
            .getStepExecution()
            .getJobExecution()
            .getExecutionContext()
            .get("zipFilePath");
    String zipFilePath = (String) stored;

    if (zipFilePath == null) {
      log.error("ZIP file path not found in JobExecutionContext");
      throw new IllegalStateException("ZIP file path not available for GeoServer registration");
    }

    // Delegate to the existing GeoServerRegistrationService
    geoServerService.uploadShapefileZip(zipFilePath, layerName);

    log.info("GeoServer registration completed successfully for layer: {}", layerName);

    return RepeatStatus.FINISHED;
  }
}

diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/tasklet/GeometryTypeValidationTasklet.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/tasklet/GeometryTypeValidationTasklet.java
new file mode 100644 index 0000000..fe9d2dc

package com.kamco.makesample.batch.tasklet;
+import com.kamco.makesample.exception.MixedGeometryException; +import java.sql.Array; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.ArrayList; +import java.util.List; +import javax.sql.DataSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.StepContribution; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.core.scope.context.ChunkContext; +import org.springframework.batch.core.step.tasklet.Tasklet; +import org.springframework.batch.repeat.RepeatStatus; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +/** + * Geometry Type 검증 Tasklet + * + *

Shapefile은 homogeneous geometry type을 요구하므로, chunk 처리 전에 사전 검증 필요 + * + *

주요 역할: + * + *

    + *
  • SQL DISTINCT 쿼리로 geometry type 확인 (ST_Polygon, ST_MultiPolygon만 조회) + *
  • 지원하지 않는 geometry 타입 발견 시 즉시 에러 발생 (fast-fail) + *
  • StepExecutionContext에 geometry type 저장 (Writer가 사용) + *
+ */ +@Component +@StepScope +public class GeometryTypeValidationTasklet implements Tasklet { + + private static final Logger log = LoggerFactory.getLogger(GeometryTypeValidationTasklet.class); + + private final DataSource dataSource; + + @Value("#{jobParameters['batchIds']}") + private String batchIdsParam; + + public GeometryTypeValidationTasklet(DataSource dataSource) { + this.dataSource = dataSource; + } + + @Override + public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) + throws Exception { + + log.info("========================================"); + log.info("Step 1: Geometry Type Validation"); + log.info("========================================"); + log.info("Validating geometry types for batch_ids: {}", batchIdsParam); + + // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + // 1. JobParameter를 Long 배열로 변환 + // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + // 예: "252,253,257" → [252L, 253L, 257L] + Long[] batchIds = parseBatchIds(batchIdsParam); + log.debug("Parsed batch IDs: {}", (Object) batchIds); + + // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + // 1.1 전체 row 개수 조회 (검증 전) + // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + String countAllSql = + """ + SELECT COUNT(*) as total_count + FROM inference_results_testing + WHERE batch_id = ANY(?) + AND geometry IS NOT NULL + """; + + long totalRows = 0; + try (Connection conn = dataSource.getConnection(); + PreparedStatement ps = conn.prepareStatement(countAllSql)) { + Array sqlArray = conn.createArrayOf("bigint", batchIds); + ps.setArray(1, sqlArray); + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + totalRows = rs.getLong("total_count"); + } + } + } + log.info("Total rows with non-null geometry: {}", totalRows); + + // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + // 2. 
SQL로 고유한 geometry type 조회 및 좌표계 검증 + // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + // ST_GeometryType()는 "ST_Polygon", "ST_MultiPolygon" 등을 반환 + // DISTINCT로 고유한 타입만 조회 + // ST_Polygon, ST_MultiPolygon만 허용 (Point, LineString 등은 제외) + // geometry IS NOT NULL 조건으로 null geometry 제외 + // + // EPSG:5186 좌표계 정합성 검증: + // - SRID가 5186인지 확인 + // - 유효 범위: X(125000~530000m), Y(-600000~988000m) - 한국 중부 영역 + // - ST_IsValid()로 geometry 유효성 검증 + String sql = + """ + SELECT DISTINCT ST_GeometryType(geometry) as geom_type + FROM inference_results_testing + WHERE batch_id = ANY(?) + AND geometry IS NOT NULL + AND ST_GeometryType(geometry) IN ('ST_Polygon', 'ST_MultiPolygon') + AND ST_SRID(geometry) = 5186 + AND ST_IsValid(geometry) = true + AND ST_XMin(geometry) >= 125000 AND ST_XMax(geometry) <= 530000 + AND ST_YMin(geometry) >= -600000 AND ST_YMax(geometry) <= 988000 + """; + + List geometryTypes = new ArrayList<>(); + + try (Connection conn = dataSource.getConnection(); + PreparedStatement ps = conn.prepareStatement(sql)) { + + // PostgreSQL array 파라미터 설정 + Array sqlArray = conn.createArrayOf("bigint", batchIds); + ps.setArray(1, sqlArray); + + try (ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + String geomType = rs.getString("geom_type"); + geometryTypes.add(geomType); + log.debug("Found geometry type: {}", geomType); + } + } + } + + log.info("Found {} distinct geometry type(s): {}", geometryTypes.size(), geometryTypes); + + // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + // 2.1 검증 통과 row 개수 조회 + // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + String countValidSql = + """ + SELECT COUNT(*) as valid_count + FROM inference_results_testing + WHERE batch_id = ANY(?) 
+ AND geometry IS NOT NULL + AND ST_GeometryType(geometry) IN ('ST_Polygon', 'ST_MultiPolygon') + AND ST_SRID(geometry) = 5186 + AND ST_IsValid(geometry) = true + AND ST_XMin(geometry) >= 125000 AND ST_XMax(geometry) <= 530000 + AND ST_YMin(geometry) >= -600000 AND ST_YMax(geometry) <= 988000 + """; + + long validRows = 0; + try (Connection conn = dataSource.getConnection(); + PreparedStatement ps = conn.prepareStatement(countValidSql)) { + Array sqlArray = conn.createArrayOf("bigint", batchIds); + ps.setArray(1, sqlArray); + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + validRows = rs.getLong("valid_count"); + } + } + } + + long excludedRows = totalRows - validRows; + log.info("========================================"); + log.info("📊 Geometry Validation Summary:"); + log.info(" Total rows: {}", totalRows); + log.info(" Valid rows: {} (EPSG:5186 compliant)", validRows); + log.info(" Excluded rows: {} (invalid geometry or out of range)", excludedRows); + if (excludedRows > 0) { + log.warn( + "⚠️ {} rows excluded due to invalid geometry or coordinate out of EPSG:5186 range", + excludedRows); + } + log.info("========================================"); + + // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + // 3. 
Mixed geometry type 체크 및 자동 변환 안내 + // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + // Shapefile 제약사항: 하나의 shapefile은 단일 geometry type만 허용 + // MultiPolygon이 포함된 경우 자동으로 Polygon으로 변환됨 (GeometryConverter) + // + // ⚠️ 참고: SQL 필터로 ST_Polygon, ST_MultiPolygon만 조회하므로 + // 이론적으로는 이 두 타입만 존재해야 함 + // 만약 다른 타입이 섞여 있다면 데이터 정합성 문제 + if (geometryTypes.size() > 1) { + // ST_Polygon과 ST_MultiPolygon이 섞인 경우 → 자동 변환 허용 + boolean hasPolygon = geometryTypes.stream().anyMatch(t -> t.equals("ST_Polygon")); + boolean hasMultiPolygon = geometryTypes.stream().anyMatch(t -> t.equals("ST_MultiPolygon")); + + if (hasPolygon && hasMultiPolygon && geometryTypes.size() == 2) { + log.info("========================================"); + log.info("ℹ️ Mixed geometry types detected:"); + log.info(" Types: {}", geometryTypes); + log.info(""); + log.info("✅ Auto-conversion enabled:"); + log.info(" ST_MultiPolygon → ST_Polygon (first polygon only)"); + log.info(" This will unify all geometries to ST_Polygon type"); + log.info("========================================"); + + // Polygon을 기본 타입으로 설정 (자동 변환 후 모든 geometry가 Polygon이 됨) + chunkContext + .getStepContext() + .getStepExecution() + .getExecutionContext() + .putString("geometryType", "ST_Polygon"); + + log.info("✅ Geometry type validation PASSED with auto-conversion"); + log.info(" Target Type: ST_Polygon"); + log.info(" MultiPolygon geometries will be converted during processing"); + log.info("========================================"); + + return RepeatStatus.FINISHED; + } + + // 그 외의 혼합 타입은 즉시 에러 발생 (fast-fail) + // 예: ST_Polygon + ST_Point 등 (하지만 SQL 필터로 이미 제외되었어야 함) + log.error("❌ Unexpected mixed geometry types detected: {}", geometryTypes); + log.error("Shapefile requires homogeneous geometry type"); + log.error("Only Polygon + MultiPolygon mix is supported with auto-conversion"); + throw new MixedGeometryException( + "Shapefile requires homogeneous geometry type. Found: " + + geometryTypes + + ". 
Only Polygon + MultiPolygon mix is supported."); + } + + // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + // 4. 빈 데이터셋 체크 + // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + // 모든 geometry가 null이거나 데이터가 없는 경우 + // 경고만 출력하고 통과 (Reader에서 읽을 데이터가 없으므로 Writer까지 가지 않음) + if (geometryTypes.isEmpty()) { + log.warn("========================================"); + log.warn("WARNING: No valid geometries found in dataset"); + log.warn("This may indicate:"); + log.warn(" 1. All geometries are NULL"); + log.warn(" 2. No data exists for the given batch_ids"); + log.warn(" 3. Database connection issues"); + log.warn("========================================"); + log.warn("Proceeding with empty dataset (no files will be generated)"); + + // 빈 데이터셋이지만 Step은 성공으로 처리 + // 다음 Step(Reader)에서 읽을 데이터가 없으므로 자연스럽게 종료됨 + return RepeatStatus.FINISHED; + } + + // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + // 5. StepExecutionContext에 geometry type 저장 + // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + // 이후 Step에서 사용: + // - FeatureConversionProcessor: featureType 생성 시 사용 + // - StreamingShapefileWriter: shapefile schema 생성 시 사용 + String geometryType = geometryTypes.get(0); + + // MultiPolygon 타입인 경우 Polygon으로 변환됨을 안내 + if (geometryType.equals("ST_MultiPolygon")) { + log.info("========================================"); + log.info("ℹ️ Geometry type: {}", geometryType); + log.info(""); + log.info("✅ Auto-conversion will be applied:"); + log.info(" ST_MultiPolygon → ST_Polygon (first polygon only)"); + log.info(" All MultiPolygon geometries will be converted during processing"); + log.info("========================================"); + + // Polygon으로 저장 (변환 후 타입) + geometryType = "ST_Polygon"; + } + + chunkContext + .getStepContext() + .getStepExecution() + .getExecutionContext() + .putString("geometryType", geometryType); + + log.info("========================================"); + log.info("✅ Geometry type validation PASSED"); + log.info(" Geometry Type: {}", 
geometryType); + log.info(" All geometries are homogeneous (or will be converted)"); + log.info("========================================"); + + return RepeatStatus.FINISHED; + } + + /** + * JobParameter 문자열을 Long 배열로 변환 + * + * @param batchIdsParam "252,253,257" 형태의 문자열 + * @return Long 배열 + */ + private Long[] parseBatchIds(String batchIdsParam) { + if (batchIdsParam == null || batchIdsParam.trim().isEmpty()) { + throw new IllegalArgumentException("batchIds parameter is required"); + } + + String[] parts = batchIdsParam.split(","); + Long[] batchIds = new Long[parts.length]; + + for (int i = 0; i < parts.length; i++) { + batchIds[i] = Long.parseLong(parts[i].trim()); + } + + return batchIds; + } +} diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/util/FeatureTypeFactory.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/util/FeatureTypeFactory.java new file mode 100644 index 0000000..9ee4c10 --- /dev/null +++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/util/FeatureTypeFactory.java @@ -0,0 +1,75 @@ +package com.kamco.makesample.batch.util; + +import org.geotools.api.feature.simple.SimpleFeatureType; +import org.geotools.api.referencing.crs.CoordinateReferenceSystem; +import org.geotools.feature.simple.SimpleFeatureTypeBuilder; +import org.locationtech.jts.geom.Geometry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +/** + * SimpleFeatureType 생성 유틸리티 + * + *

Processor와 Writer가 공유하는 featureType 생성 로직 + * + *

일관성 보장: 동일한 스키마를 사용하여 feature 생성 및 저장 + */ +@Component +public class FeatureTypeFactory { + + private static final Logger log = LoggerFactory.getLogger(FeatureTypeFactory.class); + + /** + * SimpleFeatureType 생성 + * + * @param crs CoordinateReferenceSystem + * @param geomType Geometry 타입 (Polygon, MultiPolygon 등) + * @return SimpleFeatureType + */ + public SimpleFeatureType createFeatureType(CoordinateReferenceSystem crs, Class geomType) { + SimpleFeatureTypeBuilder builder = new SimpleFeatureTypeBuilder(); + builder.setName("inference_results"); + builder.setCRS(crs); + + // Geometry 필드를 기본 geometry로 설정 + builder.add("the_geom", geomType); + builder.setDefaultGeometry("the_geom"); + + // 속성 필드들 + builder.add("uid", String.class); + builder.add("map_id", String.class); + builder.add("chn_dtct_p", String.class); + builder.add("cprs_yr", Long.class); + builder.add("crtr_yr", Long.class); + builder.add("bf_cls_cd", String.class); + builder.add("bf_cls_pro", String.class); + builder.add("af_cls_cd", String.class); + builder.add("af_cls_pro", String.class); + + return builder.buildFeatureType(); + } + + /** + * Geometry type 문자열을 Class로 변환 + * + * @param geomTypeStr "ST_Polygon", "Polygon" 등 + * @return Geometry Class + */ + public Class parseGeometryType(String geomTypeStr) { + if (geomTypeStr == null || geomTypeStr.isEmpty()) { + return Geometry.class; + } + + // PostGIS ST_GeometryType() 함수는 "ST_Polygon" 형태로 반환 + // "ST_" 접두어 제거 + String typeName = geomTypeStr.replace("ST_", ""); + + try { + return Class.forName("org.locationtech.jts.geom." 
+ typeName); + } catch (ClassNotFoundException e) { + log.warn("Unknown geometry type: {}, using Geometry.class", typeName); + return Geometry.class; + } + } +} diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/MapIdGeoJsonWriter.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/MapIdGeoJsonWriter.java new file mode 100644 index 0000000..a55b5f4 --- /dev/null +++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/MapIdGeoJsonWriter.java @@ -0,0 +1,193 @@ +package com.kamco.makesample.batch.writer; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import org.geotools.api.feature.simple.SimpleFeature; +import org.geotools.geojson.feature.FeatureJSON; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.annotation.AfterStep; +import org.springframework.batch.core.annotation.BeforeStep; +import org.springframework.batch.core.annotation.OnWriteError; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.ExecutionContext; +import org.springframework.batch.item.ItemStreamException; +import org.springframework.batch.item.ItemStreamWriter; +import org.springframework.stereotype.Component; + +/** + * Map ID별 GeoJSON Writer + * + *

Partitioned Step의 worker에서 실행되며, StepExecutionContext로부터 map_id별 geoJsonOutputPath를 읽어 개별 + * GeoJSON 파일을 생성합니다. + * + *

StreamingGeoJsonWriter와 동일한 스트리밍 패턴을 사용하여 메모리 효율적으로 처리합니다. + */ +@Component +@StepScope +public class MapIdGeoJsonWriter implements ItemStreamWriter { + + private static final Logger log = LoggerFactory.getLogger(MapIdGeoJsonWriter.class); + + private String outputPath; + private String mapId; + + private FileOutputStream outputStream; + private FeatureJSON featureJSON; + + private int chunkCount = 0; + private int totalRecordCount = 0; + private boolean isFirstChunk = true; + + @BeforeStep + public void beforeStep(StepExecution stepExecution) { + // StepExecutionContext에서 partition별 파라미터 읽기 + ExecutionContext executionContext = stepExecution.getExecutionContext(); + this.mapId = executionContext.getString("mapId"); + this.outputPath = executionContext.getString("geoJsonOutputPath"); + + log.info("MapIdGeoJsonWriter initialized for map_id: {}, output: {}", mapId, outputPath); + + // 출력 디렉토리 생성 + try { + Path outputDir = Paths.get(outputPath).getParent(); + if (outputDir != null && !Files.exists(outputDir)) { + Files.createDirectories(outputDir); + log.info("Created output directory for map_id {} GeoJSON: {}", mapId, outputDir); + } + } catch (IOException e) { + throw new ItemStreamException("Failed to create output directory for map_id: " + mapId, e); + } + } + + @Override + public void open(ExecutionContext executionContext) throws ItemStreamException { + log.info("Opening GeoJSON writer for map_id: {}", mapId); + + try { + File geoJsonFile = new File(outputPath); + outputStream = new FileOutputStream(geoJsonFile); + featureJSON = new FeatureJSON(); + + // GeoJSON FeatureCollection 시작 + outputStream.write("{\"type\":\"FeatureCollection\",\"features\":[".getBytes()); + + log.info("GeoJSON file initialized for map_id: {}", mapId); + + } catch (IOException e) { + throw new ItemStreamException("Failed to open GeoJSON file for map_id " + mapId, e); + } + } + + @Override + public void write(Chunk chunk) throws Exception { + if (chunk.isEmpty()) { + return; + } + + 
chunkCount++; + List items = (List) chunk.getItems(); + int itemCount = items.size(); + totalRecordCount += itemCount; + + log.debug( + "[map_id: {}] Writing chunk #{} to GeoJSON with {} features (total: {})", + mapId, + chunkCount, + itemCount, + totalRecordCount); + + // 각 feature를 GeoJSON으로 변환하여 append + for (int i = 0; i < items.size(); i++) { + SimpleFeature feature = items.get(i); + + // 첫 번째 feature가 아니면 콤마 추가 + if (!isFirstChunk || i > 0) { + outputStream.write(",".getBytes()); + } + + // Feature를 GeoJSON으로 직렬화 + featureJSON.writeFeature(feature, outputStream); + } + + isFirstChunk = false; + + log.debug("[map_id: {}] Chunk #{} written to GeoJSON successfully", mapId, chunkCount); + } + + @AfterStep + public void afterStep() { + log.info( + "[map_id: {}] All chunks written to GeoJSON. Total {} records in {} chunks", + mapId, + totalRecordCount, + chunkCount); + + try { + if (outputStream != null) { + // GeoJSON FeatureCollection 종료 + outputStream.write("]}".getBytes()); + outputStream.flush(); + log.info("[map_id: {}] GeoJSON file finalized successfully", mapId); + } + } catch (IOException e) { + log.error("[map_id: {}] Failed to finalize GeoJSON file", mapId, e); + throw new ItemStreamException("Failed to finalize GeoJSON file for map_id: " + mapId, e); + } finally { + cleanup(); + } + } + + @Override + public void close() throws ItemStreamException { + cleanup(); + } + + @OnWriteError + public void onError(Exception exception, Chunk chunk) { + log.error( + "[map_id: {}] Error writing chunk #{} to GeoJSON: {}", + mapId, + chunkCount, + exception.getMessage(), + exception); + + cleanup(); + + // 부분 파일 삭제 + try { + File geoJsonFile = new File(outputPath); + if (geoJsonFile.exists()) { + geoJsonFile.delete(); + log.info("[map_id: {}] Deleted partial GeoJSON file", mapId); + } + } catch (Exception e) { + log.warn("[map_id: {}] Failed to delete partial GeoJSON file", mapId, e); + } + } + + private void cleanup() { + if (outputStream != null) { + try { + 
outputStream.close(); + } catch (IOException e) { + log.warn("[map_id: {}] Failed to close GeoJSON output stream", mapId, e); + } + outputStream = null; + } + } + + @Override + public void update(ExecutionContext executionContext) throws ItemStreamException { + // Checkpoint + executionContext.putInt("chunkCount", chunkCount); + executionContext.putInt("totalRecordCount", totalRecordCount); + } +} diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/MapIdShapefileWriter.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/MapIdShapefileWriter.java new file mode 100644 index 0000000..93b3077 --- /dev/null +++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/MapIdShapefileWriter.java @@ -0,0 +1,250 @@ +package com.kamco.makesample.batch.writer; + +import com.kamco.makesample.batch.util.FeatureTypeFactory; +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.geotools.api.data.SimpleFeatureStore; +import org.geotools.api.data.Transaction; +import org.geotools.api.feature.simple.SimpleFeature; +import org.geotools.api.feature.simple.SimpleFeatureType; +import org.geotools.api.referencing.FactoryException; +import org.geotools.api.referencing.crs.CoordinateReferenceSystem; +import org.geotools.data.DefaultTransaction; +import org.geotools.data.collection.ListFeatureCollection; +import org.geotools.data.shapefile.ShapefileDataStore; +import org.geotools.data.shapefile.ShapefileDataStoreFactory; +import org.geotools.referencing.CRS; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.annotation.AfterStep; +import org.springframework.batch.core.annotation.BeforeStep; +import org.springframework.batch.core.annotation.OnWriteError; 
+import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.ExecutionContext; +import org.springframework.batch.item.ItemStreamException; +import org.springframework.batch.item.ItemStreamWriter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +/** + * Map ID별 Shapefile Writer + * + *

Partitioned Step의 worker에서 실행되며, StepExecutionContext로부터 map_id별 outputPath를 읽어 개별 shapefile을 + * 생성합니다. + * + *

StreamingShapefileWriter와 동일한 스트리밍 패턴을 사용하여 메모리 효율적으로 처리합니다. + */ +@Component +@StepScope +public class MapIdShapefileWriter implements ItemStreamWriter { + + private static final Logger log = LoggerFactory.getLogger(MapIdShapefileWriter.class); + + private final FeatureTypeFactory featureTypeFactory; + + @Value("#{jobParameters['crs'] ?: 'EPSG:5186'}") + private String crsCode; + + private String outputPath; + private String mapId; + + private ShapefileDataStore dataStore; + private Transaction transaction; + private SimpleFeatureStore featureStore; + private SimpleFeatureType featureType; + + private int chunkCount = 0; + private int totalRecordCount = 0; + + private Class geometryType; + + public MapIdShapefileWriter(FeatureTypeFactory featureTypeFactory) { + this.featureTypeFactory = featureTypeFactory; + } + + @BeforeStep + public void beforeStep(StepExecution stepExecution) { + // StepExecutionContext에서 partition별 파라미터 읽기 + ExecutionContext executionContext = stepExecution.getExecutionContext(); + this.mapId = executionContext.getString("mapId"); + this.outputPath = executionContext.getString("outputPath"); + + log.info("MapIdShapefileWriter initialized for map_id: {}, output: {}", mapId, outputPath); + + // Geometry type 읽기 (validation tasklet에서 설정) + String geomTypeStr = executionContext.getString("geometryType"); + if (geomTypeStr != null) { + this.geometryType = featureTypeFactory.parseGeometryType(geomTypeStr); + log.debug("Geometry type from validation: {}", geometryType.getSimpleName()); + } + + // 출력 디렉토리 생성 + try { + Path outputDir = Paths.get(outputPath).getParent(); + if (outputDir != null && !Files.exists(outputDir)) { + Files.createDirectories(outputDir); + log.info("Created output directory for map_id {}: {}", mapId, outputDir); + } + } catch (IOException e) { + throw new ItemStreamException("Failed to create output directory for map_id: " + mapId, e); + } + } + + @Override + public void open(ExecutionContext executionContext) throws 
ItemStreamException { + log.info("Opening shapefile writer for map_id: {}", mapId); + + try { + // CRS 설정 + CoordinateReferenceSystem crs = CRS.decode(crsCode); + + // Geometry type 기본값 설정 + if (geometryType == null) { + geometryType = featureTypeFactory.parseGeometryType(null); + log.warn("Geometry type not set for map_id {}, using default: Geometry.class", mapId); + } + + // SimpleFeatureType 생성 + featureType = featureTypeFactory.createFeatureType(crs, geometryType); + + // ShapefileDataStore 생성 + File shpFile = new File(outputPath); + ShapefileDataStoreFactory factory = new ShapefileDataStoreFactory(); + + Map params = new HashMap<>(); + params.put("url", shpFile.toURI().toURL()); + params.put("create spatial index", Boolean.TRUE); + + dataStore = (ShapefileDataStore) factory.createNewDataStore(params); + dataStore.createSchema(featureType); + + // Transaction 시작 + transaction = new DefaultTransaction("create-" + mapId); + + // FeatureStore 가져오기 + String typeName = dataStore.getTypeNames()[0]; + featureStore = (SimpleFeatureStore) dataStore.getFeatureSource(typeName); + featureStore.setTransaction(transaction); + + log.info("ShapefileDataStore initialized for map_id: {}", mapId); + + } catch (FactoryException e) { + throw new ItemStreamException("Invalid CRS code: " + crsCode, e); + } catch (IOException e) { + throw new ItemStreamException("Failed to create shapefile for map_id " + mapId, e); + } + } + + @Override + public void write(Chunk chunk) throws Exception { + if (chunk.isEmpty()) { + return; + } + + chunkCount++; + List items = (List) chunk.getItems(); + int itemCount = items.size(); + totalRecordCount += itemCount; + + log.debug( + "[map_id: {}] Writing chunk #{} with {} features (total: {})", + mapId, + chunkCount, + itemCount, + totalRecordCount); + + // ListFeatureCollection으로 변환하여 FeatureStore에 추가 + ListFeatureCollection collection = new ListFeatureCollection(featureType, items); + featureStore.addFeatures(collection); + + log.debug("[map_id: {}] 
Chunk #{} written successfully", mapId, chunkCount); + } + + @AfterStep + public void afterStep() { + log.info( + "[map_id: {}] All chunks written. Committing {} records in {} chunks", + mapId, + totalRecordCount, + chunkCount); + + try { + if (transaction != null) { + transaction.commit(); + log.info("[map_id: {}] Transaction committed successfully", mapId); + } + } catch (IOException e) { + log.error("[map_id: {}] Failed to commit transaction", mapId, e); + throw new ItemStreamException( + "Failed to commit shapefile transaction for map_id: " + mapId, e); + } finally { + cleanup(); + } + } + + @Override + public void close() throws ItemStreamException { + cleanup(); + } + + @OnWriteError + public void onError(Exception exception, Chunk chunk) { + log.error( + "[map_id: {}] Error writing chunk #{}: {}", + mapId, + chunkCount, + exception.getMessage(), + exception); + + try { + if (transaction != null) { + transaction.rollback(); + log.info("[map_id: {}] Transaction rolled back", mapId); + } + + // 부분 파일 삭제 + File shpFile = new File(outputPath); + if (shpFile.exists()) { + shpFile.delete(); + log.info("[map_id: {}] Deleted partial shapefile", mapId); + } + + } catch (IOException e) { + log.error("[map_id: {}] Failed to rollback transaction", mapId, e); + } finally { + cleanup(); + } + } + + private void cleanup() { + if (transaction != null) { + try { + transaction.close(); + } catch (IOException e) { + log.warn("[map_id: {}] Failed to close transaction", mapId, e); + } + transaction = null; + } + + if (dataStore != null) { + dataStore.dispose(); + dataStore = null; + } + } + + @Override + public void update(ExecutionContext executionContext) throws ItemStreamException { + // Checkpoint + executionContext.putInt("chunkCount", chunkCount); + executionContext.putInt("totalRecordCount", totalRecordCount); + } +} diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/StreamingGeoJsonWriter.java 
b/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/StreamingGeoJsonWriter.java new file mode 100644 index 0000000..fef6c2b --- /dev/null +++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/StreamingGeoJsonWriter.java @@ -0,0 +1,185 @@ +package com.kamco.makesample.batch.writer; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import org.geotools.api.feature.simple.SimpleFeature; +import org.geotools.geojson.feature.FeatureJSON; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.annotation.AfterStep; +import org.springframework.batch.core.annotation.BeforeStep; +import org.springframework.batch.core.annotation.OnWriteError; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.ExecutionContext; +import org.springframework.batch.item.ItemStreamException; +import org.springframework.batch.item.ItemStreamWriter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +/** + * 스트리밍 GeoJSON Writer + * + *

StreamingShapefileWriter와 유사한 패턴으로 chunk 단위 스트리밍 처리 + * + *

메모리 효과: + * + *

    + *
  • 기존: 전체 데이터를 DefaultFeatureCollection에 누적 + *
  • 신규: chunk 단위로 GeoJSON 스트림에 append + *
+ */ +@Component +@StepScope +public class StreamingGeoJsonWriter implements ItemStreamWriter { + + private static final Logger log = LoggerFactory.getLogger(StreamingGeoJsonWriter.class); + + @Value("#{jobParameters['geoJsonOutputPath']}") + private String outputPath; + + private FileOutputStream outputStream; + private FeatureJSON featureJSON; + + private int chunkCount = 0; + private int totalRecordCount = 0; + + private boolean isFirstChunk = true; + + @BeforeStep + public void beforeStep() { + // 출력 디렉토리 생성 + try { + Path outputDir = Paths.get(outputPath).getParent(); + if (outputDir != null && !Files.exists(outputDir)) { + Files.createDirectories(outputDir); + log.info("Created output directory for GeoJSON: {}", outputDir); + } + } catch (IOException e) { + throw new ItemStreamException("Failed to create output directory", e); + } + } + + @Override + public void open(ExecutionContext executionContext) throws ItemStreamException { + log.info("Opening StreamingGeoJsonWriter for: {}", outputPath); + + try { + File geoJsonFile = new File(outputPath); + outputStream = new FileOutputStream(geoJsonFile); + featureJSON = new FeatureJSON(); + + // GeoJSON FeatureCollection 시작 + outputStream.write("{\"type\":\"FeatureCollection\",\"features\":[".getBytes()); + + log.info("GeoJSON file initialized successfully"); + + } catch (IOException e) { + throw new ItemStreamException("Failed to open GeoJSON file: " + outputPath, e); + } + } + + @Override + public void write(Chunk chunk) throws Exception { + if (chunk.isEmpty()) { + return; + } + + chunkCount++; + List items = (List) chunk.getItems(); + int itemCount = items.size(); + totalRecordCount += itemCount; + + log.debug( + "Writing chunk #{} to GeoJSON with {} features (total so far: {})", + chunkCount, + itemCount, + totalRecordCount); + + // 각 feature를 GeoJSON으로 변환하여 append + for (int i = 0; i < items.size(); i++) { + SimpleFeature feature = items.get(i); + + // 첫 번째 feature가 아니면 콤마 추가 + if (!isFirstChunk || i > 0) { + 
outputStream.write(",".getBytes()); + } + + // Feature를 GeoJSON으로 직렬화 + featureJSON.writeFeature(feature, outputStream); + } + + isFirstChunk = false; + + log.debug("Chunk #{} written to GeoJSON successfully", chunkCount); + } + + @AfterStep + public void afterStep() { + log.info( + "All chunks written to GeoJSON. Total {} records in {} chunks", + totalRecordCount, + chunkCount); + + try { + if (outputStream != null) { + // GeoJSON FeatureCollection 종료 + outputStream.write("]}".getBytes()); + outputStream.flush(); + log.info("GeoJSON file finalized successfully"); + } + } catch (IOException e) { + log.error("Failed to finalize GeoJSON file", e); + throw new ItemStreamException("Failed to finalize GeoJSON file", e); + } finally { + cleanup(); + } + } + + @Override + public void close() throws ItemStreamException { + cleanup(); + } + + @OnWriteError + public void onError(Exception exception, Chunk chunk) { + log.error( + "Error writing chunk #{} to GeoJSON: {}", chunkCount, exception.getMessage(), exception); + + cleanup(); + + // 부분 파일 삭제 + try { + File geoJsonFile = new File(outputPath); + if (geoJsonFile.exists()) { + geoJsonFile.delete(); + log.info("Deleted partial GeoJSON file: {}", outputPath); + } + } catch (Exception e) { + log.warn("Failed to delete partial GeoJSON file", e); + } + } + + private void cleanup() { + if (outputStream != null) { + try { + outputStream.close(); + } catch (IOException e) { + log.warn("Failed to close GeoJSON output stream", e); + } + outputStream = null; + } + } + + @Override + public void update(ExecutionContext executionContext) throws ItemStreamException { + // Checkpoint + executionContext.putInt("chunkCount", chunkCount); + executionContext.putInt("totalRecordCount", totalRecordCount); + } +} diff --git a/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/StreamingShapefileWriter.java b/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/StreamingShapefileWriter.java new file mode 100644 index 
0000000..c3b24d5 --- /dev/null +++ b/shp-exporter/src/main/java/com/kamco/makesample/batch/writer/StreamingShapefileWriter.java @@ -0,0 +1,253 @@ +package com.kamco.makesample.batch.writer; + +import com.kamco.makesample.batch.util.FeatureTypeFactory; +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.geotools.api.data.SimpleFeatureStore; +import org.geotools.api.data.Transaction; +import org.geotools.api.feature.simple.SimpleFeature; +import org.geotools.api.feature.simple.SimpleFeatureType; +import org.geotools.api.referencing.FactoryException; +import org.geotools.api.referencing.crs.CoordinateReferenceSystem; +import org.geotools.data.DefaultTransaction; +import org.geotools.data.collection.ListFeatureCollection; +import org.geotools.data.shapefile.ShapefileDataStore; +import org.geotools.data.shapefile.ShapefileDataStoreFactory; +import org.geotools.referencing.CRS; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.core.StepExecution; +import org.springframework.batch.core.annotation.AfterStep; +import org.springframework.batch.core.annotation.BeforeStep; +import org.springframework.batch.core.annotation.OnWriteError; +import org.springframework.batch.core.configuration.annotation.StepScope; +import org.springframework.batch.item.Chunk; +import org.springframework.batch.item.ExecutionContext; +import org.springframework.batch.item.ItemStreamException; +import org.springframework.batch.item.ItemStreamWriter; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +/** + * 스트리밍 Shapefile Writer - 메모리 병목 해결의 핵심 + * + *

기존 문제: DefaultFeatureCollection에 모든 feature를 누적 (500MB-4GB) + * + *

해결 방안: GeoTools Transaction API를 사용한 chunk 단위 incremental write + * + *

메모리 효과: + * + *

    + *
  • 기존: 전체 데이터 (500MB-4GB) + *
  • 신규: 청크 크기만 (40MB per 1000 records) + *
+ * + *

동작 방식: + * + *

    + *
  1. open(): 첫 번째 chunk 전에 DataStore 생성, Transaction 시작 + *
  2. write(): 각 chunk를 ListFeatureCollection으로 변환하여 FeatureStore에 추가 + *
  3. afterStep(): 모든 chunk 완료 후 Transaction commit + *
+ */ +@Component +@StepScope +public class StreamingShapefileWriter implements ItemStreamWriter { + + private static final Logger log = LoggerFactory.getLogger(StreamingShapefileWriter.class); + + private final FeatureTypeFactory featureTypeFactory; + + @Value("#{jobParameters['outputPath']}") + private String outputPath; + + @Value("#{jobParameters['crs'] ?: 'EPSG:5186'}") + private String crsCode; + + private ShapefileDataStore dataStore; + private Transaction transaction; + private SimpleFeatureStore featureStore; + private SimpleFeatureType featureType; + + private int chunkCount = 0; + private int totalRecordCount = 0; + + private Class geometryType; // Geometry type from validation tasklet + + public StreamingShapefileWriter(FeatureTypeFactory featureTypeFactory) { + this.featureTypeFactory = featureTypeFactory; + } + + @BeforeStep + public void beforeStep(StepExecution stepExecution) { + // StepExecutionContext에서 geometry type 읽기 + String geomTypeStr = stepExecution.getExecutionContext().getString("geometryType"); + + if (geomTypeStr != null) { + this.geometryType = featureTypeFactory.parseGeometryType(geomTypeStr); + log.info("Geometry type from validation: {}", geometryType.getSimpleName()); + } + + // 출력 디렉토리 생성 + try { + Path outputDir = Paths.get(outputPath).getParent(); + if (outputDir != null && !Files.exists(outputDir)) { + Files.createDirectories(outputDir); + log.info("Created output directory: {}", outputDir); + } + } catch (IOException e) { + throw new ItemStreamException("Failed to create output directory", e); + } + } + + @Override + public void open(ExecutionContext executionContext) throws ItemStreamException { + log.info("Opening StreamingShapefileWriter for: {}", outputPath); + + try { + // CRS 설정 + CoordinateReferenceSystem crs = CRS.decode(crsCode); + + // Geometry type이 아직 설정되지 않은 경우 기본값 사용 + if (geometryType == null) { + geometryType = featureTypeFactory.parseGeometryType(null); + log.warn("Geometry type not set, using default: 
Geometry.class"); + } + + // SimpleFeatureType 생성 (FeatureTypeFactory 사용) + featureType = featureTypeFactory.createFeatureType(crs, geometryType); + + // ShapefileDataStore 생성 + File shpFile = new File(outputPath); + ShapefileDataStoreFactory factory = new ShapefileDataStoreFactory(); + + Map params = new HashMap<>(); + params.put("url", shpFile.toURI().toURL()); + params.put("create spatial index", Boolean.TRUE); + + dataStore = (ShapefileDataStore) factory.createNewDataStore(params); + dataStore.createSchema(featureType); + + // Transaction 시작 + transaction = new DefaultTransaction("create"); + + // FeatureStore 가져오기 + String typeName = dataStore.getTypeNames()[0]; + featureStore = (SimpleFeatureStore) dataStore.getFeatureSource(typeName); + featureStore.setTransaction(transaction); + + log.info("ShapefileDataStore initialized successfully"); + + } catch (FactoryException e) { + throw new ItemStreamException("Invalid CRS code: " + crsCode, e); + } catch (IOException e) { + throw new ItemStreamException("Failed to create shapefile at: " + outputPath, e); + } + } + + @Override + public void write(Chunk chunk) throws Exception { + if (chunk.isEmpty()) { + return; + } + + chunkCount++; + List items = (List) chunk.getItems(); + int itemCount = items.size(); + totalRecordCount += itemCount; + + log.debug( + "Writing chunk #{} with {} features (total so far: {})", + chunkCount, + itemCount, + totalRecordCount); + + // ListFeatureCollection으로 변환 (청크만 담김) + ListFeatureCollection collection = new ListFeatureCollection(featureType, items); + + // FeatureStore에 추가 (트랜잭션은 열린 상태 유지) + featureStore.addFeatures(collection); + + // 청크 완료 후 collection은 GC됨 + log.debug("Chunk #{} written successfully", chunkCount); + } + + @AfterStep + public void afterStep() { + log.info( + "All chunks written. 
Committing transaction for {} records in {} chunks", + totalRecordCount, + chunkCount); + + try { + if (transaction != null) { + transaction.commit(); + log.info("Transaction committed successfully"); + } + } catch (IOException e) { + log.error("Failed to commit transaction", e); + throw new ItemStreamException("Failed to commit shapefile transaction", e); + } finally { + cleanup(); + } + } + + @Override + public void close() throws ItemStreamException { + cleanup(); + } + + @OnWriteError + public void onError(Exception exception, Chunk chunk) { + log.error("Error writing chunk #{}: {}", chunkCount, exception.getMessage(), exception); + + try { + if (transaction != null) { + transaction.rollback(); + log.info("Transaction rolled back due to error"); + } + + // 부분 파일 삭제 + File shpFile = new File(outputPath); + if (shpFile.exists()) { + shpFile.delete(); + log.info("Deleted partial shapefile: {}", outputPath); + } + + } catch (IOException e) { + log.error("Failed to rollback transaction", e); + } finally { + cleanup(); + } + } + + private void cleanup() { + if (transaction != null) { + try { + transaction.close(); + } catch (IOException e) { + log.warn("Failed to close transaction", e); + } + transaction = null; + } + + if (dataStore != null) { + dataStore.dispose(); + dataStore = null; + } + } + + @Override + public void update(ExecutionContext executionContext) throws ItemStreamException { + // Checkpoint: chunk 완료 시 호출됨 + executionContext.putInt("chunkCount", chunkCount); + executionContext.putInt("totalRecordCount", totalRecordCount); + } +} diff --git a/shp-exporter/src/main/java/com/kamco/makesample/cli/ConverterCommandLineRunner.java b/shp-exporter/src/main/java/com/kamco/makesample/cli/ConverterCommandLineRunner.java index 131467d..88833ec 100755 --- a/shp-exporter/src/main/java/com/kamco/makesample/cli/ConverterCommandLineRunner.java +++ b/shp-exporter/src/main/java/com/kamco/makesample/cli/ConverterCommandLineRunner.java @@ -7,6 +7,11 @@ import 
java.nio.file.Paths; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.batch.core.Job; +import org.springframework.batch.core.JobExecution; +import org.springframework.batch.core.JobParameters; +import org.springframework.batch.core.JobParametersBuilder; +import org.springframework.batch.core.launch.JobLauncher; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.DefaultApplicationArguments; @@ -20,14 +25,20 @@ public class ConverterCommandLineRunner implements CommandLineRunner { private final ShapefileConverterService converterService; private final GeoServerRegistrationService geoServerService; private final ConverterProperties converterProperties; + private final JobLauncher jobLauncher; + private final Job mergedModeJob; public ConverterCommandLineRunner( ShapefileConverterService converterService, GeoServerRegistrationService geoServerService, - ConverterProperties converterProperties) { + ConverterProperties converterProperties, + JobLauncher jobLauncher, + Job mergedModeJob) { this.converterService = converterService; this.geoServerService = geoServerService; this.converterProperties = converterProperties; + this.jobLauncher = jobLauncher; + this.mergedModeJob = mergedModeJob; } @Override @@ -37,13 +48,21 @@ public class ConverterCommandLineRunner implements CommandLineRunner { List profiles = appArgs.getOptionValues("spring.profiles.active"); log.info("profiles.active={}", profiles); + // GeoServer 등록 모드 if (appArgs.containsOption("upload-shp")) { handleRegistration(appArgs); return; } - // Existing shapefile generation logic - log.info("=== PostgreSQL to Shapefile Converter ==="); + // Batch 모드 체크 + if (appArgs.containsOption("batch") || appArgs.containsOption("use-batch")) { + handleBatchMode(appArgs); + return; + } + + // 기존 로직 (Legacy mode) - 향후 deprecated 예정 + log.warn("Running in LEGACY mode. 
Consider using --batch flag for better performance."); + log.info("=== PostgreSQL to Shapefile Converter (Legacy) ==="); log.info("Inference ID: {}", converterProperties.getInferenceId()); List mapIds = converterProperties.getMapIds(); @@ -67,6 +86,103 @@ public class ConverterCommandLineRunner implements CommandLineRunner { } } + /** + * Spring Batch 기반 처리 + * + *

메모리 최적화와 단계별 실행 지원 + * + * @param appArgs ApplicationArguments + * @throws Exception Exception + */ + private void handleBatchMode(ApplicationArguments appArgs) throws Exception { + log.info("=== Spring Batch Mode: Shapefile Converter ==="); + log.info("Inference ID: {}", converterProperties.getInferenceId()); + log.info("Batch IDs: {}", converterProperties.getBatchIds()); + log.info("Output directory: {}", converterProperties.getOutputBaseDir()); + log.info("CRS: {}", converterProperties.getCrs()); + log.info("Chunk size: {}", converterProperties.getBatch().getChunkSize()); + log.info("=============================================="); + + // Job Parameters 구성 + JobParameters jobParams = buildJobParameters(appArgs); + + // Job 실행 + JobExecution execution = jobLauncher.run(mergedModeJob, jobParams); + + log.info("=============================================="); + log.info("Job Execution Status: {}", execution.getStatus()); + log.info("Exit Status: {}", execution.getExitStatus()); + + if (execution.getStatus().isUnsuccessful()) { + log.error("Job execution failed"); + System.exit(1); + } + + log.info("Job completed successfully"); + } + + /** + * Job Parameters 구성 + * + * @param appArgs ApplicationArguments + * @return JobParameters + */ + private JobParameters buildJobParameters(ApplicationArguments appArgs) { + JobParametersBuilder builder = new JobParametersBuilder(); + + // 기본 파라미터 + builder.addString("inferenceId", converterProperties.getInferenceId()); + builder.addString("batchIds", joinBatchIds(converterProperties.getBatchIds())); + builder.addString("crs", converterProperties.getCrs()); + builder.addLong("timestamp", System.currentTimeMillis()); // Job 실행 시각 (고유성 보장) + + // 출력 경로 구성 (Shapefile + GeoJSON 생성, GeoServer는 Shapefile만 등록) + String outputDir = + Paths.get( + converterProperties.getOutputBaseDir(), + converterProperties.getInferenceId(), + "merge") + .toString(); + + String shapefilePath = + Paths.get(outputDir, converterProperties.getInferenceId() 
+ ".shp").toString(); + String geoJsonPath = + Paths.get(outputDir, converterProperties.getInferenceId() + ".geojson").toString(); + + builder.addString("outputPath", shapefilePath); + builder.addString("geoJsonOutputPath", geoJsonPath); + builder.addString("zipBaseName", converterProperties.getInferenceId()); + + // Layer name (GeoServer 등록용) + String layerName = "inference_" + converterProperties.getInferenceId(); + builder.addString("layerName", layerName); + + // GeoServer 등록 여부 + boolean geoServerEnabled = + appArgs.containsOption("geoserver.enabled") + && Boolean.parseBoolean(firstOption(appArgs, "geoserver.enabled")); + builder.addString("geoserver.enabled", String.valueOf(geoServerEnabled)); + + // Batch 설정 + builder.addLong("fetchSize", (long) converterProperties.getBatch().getFetchSize()); + + return builder.toJobParameters(); + } + + /** + * Batch IDs를 콤마로 구분된 문자열로 변환 + * + * @param batchIds List of batch IDs + * @return "252,253,257" 형태의 문자열 + */ + private String joinBatchIds(List batchIds) { + if (batchIds == null || batchIds.isEmpty()) { + throw new IllegalStateException("batch-ids must be specified"); + } + + return String.join(",", batchIds.stream().map(String::valueOf).toArray(String[]::new)); + } + private void handleRegistration(ApplicationArguments appArgs) { // --help if (appArgs.containsOption("help") || appArgs.containsOption("h")) { diff --git a/shp-exporter/src/main/java/com/kamco/makesample/config/ConverterProperties.java b/shp-exporter/src/main/java/com/kamco/makesample/config/ConverterProperties.java index aae98bf..5946d0d 100755 --- a/shp-exporter/src/main/java/com/kamco/makesample/config/ConverterProperties.java +++ b/shp-exporter/src/main/java/com/kamco/makesample/config/ConverterProperties.java @@ -14,6 +14,7 @@ public class ConverterProperties { private String outputBaseDir; private String crs; private String mode; + private BatchConfig batch = new BatchConfig(); // Spring Batch 설정 public String getInferenceId() { return 
inferenceId; @@ -62,4 +63,61 @@ public class ConverterProperties { public String getMode() { return mode; } + + public BatchConfig getBatch() { + return batch; + } + + public void setBatch(BatchConfig batch) { + this.batch = batch; + } + + /** Spring Batch 관련 설정 */ + public static class BatchConfig { + private int chunkSize = 1000; + private int skipLimit = 100; + private int fetchSize = 1000; + private boolean enablePartitioning = false; + private int partitionConcurrency = 4; // Map ID별 병렬 처리 동시성 + + public int getChunkSize() { + return chunkSize; + } + + public void setChunkSize(int chunkSize) { + this.chunkSize = chunkSize; + } + + public int getSkipLimit() { + return skipLimit; + } + + public void setSkipLimit(int skipLimit) { + this.skipLimit = skipLimit; + } + + public int getFetchSize() { + return fetchSize; + } + + public void setFetchSize(int fetchSize) { + this.fetchSize = fetchSize; + } + + public boolean isEnablePartitioning() { + return enablePartitioning; + } + + public void setEnablePartitioning(boolean enablePartitioning) { + this.enablePartitioning = enablePartitioning; + } + + public int getPartitionConcurrency() { + return partitionConcurrency; + } + + public void setPartitionConcurrency(int partitionConcurrency) { + this.partitionConcurrency = partitionConcurrency; + } + } } diff --git a/shp-exporter/src/main/java/com/kamco/makesample/config/GeoServerProperties.java b/shp-exporter/src/main/java/com/kamco/makesample/config/GeoServerProperties.java index 3a09a89..3337e4d 100755 --- a/shp-exporter/src/main/java/com/kamco/makesample/config/GeoServerProperties.java +++ b/shp-exporter/src/main/java/com/kamco/makesample/config/GeoServerProperties.java @@ -1,7 +1,6 @@ package com.kamco.makesample.config; import jakarta.validation.constraints.NotBlank; -import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import 
org.springframework.validation.annotation.Validated; @@ -12,11 +11,9 @@ import org.springframework.validation.annotation.Validated; public class GeoServerProperties { @NotBlank(message = "GeoServer base URL must be configured") - @Value("${layer.geoserver-url}") private String baseUrl; @NotBlank(message = "GeoServer workspace must be configured") - @Value("${layer.workspace}") private String workspace; @NotBlank(message = "GeoServer datastore must be configured") diff --git a/shp-exporter/src/main/java/com/kamco/makesample/service/GeometryConverter.java b/shp-exporter/src/main/java/com/kamco/makesample/service/GeometryConverter.java index 3e4d0b6..e74b719 100755 --- a/shp-exporter/src/main/java/com/kamco/makesample/service/GeometryConverter.java +++ b/shp-exporter/src/main/java/com/kamco/makesample/service/GeometryConverter.java @@ -2,12 +2,25 @@ package com.kamco.makesample.service; import com.kamco.makesample.exception.GeometryConversionException; import org.locationtech.jts.geom.Geometry; +import org.locationtech.jts.geom.MultiPolygon; +import org.locationtech.jts.geom.Polygon; import org.locationtech.jts.io.ParseException; import org.locationtech.jts.io.WKTReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +/** + * WKT ↔ JTS Geometry 변환 서비스 + * + *

주요 기능: + * + *

    + *
  • PostGIS WKT 문자열을 JTS Geometry 객체로 변환 + *
  • MultiPolygon을 자동으로 Polygon으로 변환 (첫 번째 polygon 추출) + *
  • Geometry 유효성 검증 + *
+ */ @Component public class GeometryConverter { @@ -15,27 +28,107 @@ public class GeometryConverter { private final WKTReader wktReader; + // MultiPolygon → Polygon 변환 통계 + private static int multiPolygonConversionCount = 0; + public GeometryConverter() { this.wktReader = new WKTReader(); } + /** + * WKT 문자열을 JTS Geometry로 변환 + * + *

변환 규칙: + * + *

    + *
  • MultiPolygon → 첫 번째 Polygon만 추출 (자동 변환) + *
  • Polygon → 그대로 사용 + *
  • 기타 타입 → 그대로 사용 + *
+ * + * @param wkt PostGIS ST_AsText() 결과 (WKT 형식) + * @return JTS Geometry 객체 (MultiPolygon은 Polygon으로 변환됨) + */ public Geometry convertWKTToJTS(String wkt) { if (wkt == null || wkt.trim().isEmpty()) { return null; } try { - // WKT 문자열을 JTS Geometry로 변환 + // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + // 1. WKT 문자열을 JTS Geometry로 변환 + // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ Geometry jtsGeometry = wktReader.read(wkt); + // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + // 2. MultiPolygon → Polygon 자동 변환 + // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + // Shapefile은 단일 타입만 허용하므로 MultiPolygon을 Polygon으로 통일 + // 첫 번째 polygon만 추출 (나머지는 버림) + if (jtsGeometry instanceof MultiPolygon) { + MultiPolygon multiPolygon = (MultiPolygon) jtsGeometry; + + if (multiPolygon.getNumGeometries() > 0) { + // 첫 번째 polygon 추출 + Polygon firstPolygon = (Polygon) multiPolygon.getGeometryN(0); + + // 통계 및 로깅 (첫 10건만 로그 출력) + multiPolygonConversionCount++; + if (multiPolygonConversionCount <= 10) { + log.debug( + "Converting MultiPolygon to Polygon (first geometry only). " + + "MultiPolygon had {} geometries. Conversion count: {}", + multiPolygon.getNumGeometries(), + multiPolygonConversionCount); + } else if (multiPolygonConversionCount == 11) { + log.debug("MultiPolygon → Polygon conversion ongoing... (suppressing further logs)"); + } + + // 여러 polygon을 포함한 경우 경고 + if (multiPolygon.getNumGeometries() > 1) { + log.warn( + "MultiPolygon contains {} polygons. Only the first polygon will be used. " + + "Other {} polygon(s) will be discarded.", + multiPolygon.getNumGeometries(), + multiPolygon.getNumGeometries() - 1); + } + + jtsGeometry = firstPolygon; + } else { + log.warn("Empty MultiPolygon detected (0 geometries). Returning null."); + return null; + } + } + + // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + // 3. 
Geometry 유효성 검증 + // ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ if (!jtsGeometry.isValid()) { - log.warn("Invalid geometry detected: {}", jtsGeometry); + log.warn( + "Invalid geometry detected: type={}, reason={}", + jtsGeometry.getGeometryType(), + jtsGeometry.isValid() ? "valid" : "invalid"); } return jtsGeometry; + } catch (ParseException e) { throw new GeometryConversionException( "Failed to convert WKT to JTS geometry: " + e.getMessage(), e); } } + + /** + * MultiPolygon → Polygon 변환 통계 반환 + * + * @return 변환된 MultiPolygon 개수 + */ + public static int getMultiPolygonConversionCount() { + return multiPolygonConversionCount; + } + + /** 변환 통계 리셋 (테스트용) */ + public static void resetConversionCount() { + multiPolygonConversionCount = 0; + } } diff --git a/shp-exporter/src/main/resources/application-dev.yml b/shp-exporter/src/main/resources/application-dev.yml index 6c92ce3..05583c9 100755 --- a/shp-exporter/src/main/resources/application-dev.yml +++ b/shp-exporter/src/main/resources/application-dev.yml @@ -10,12 +10,6 @@ spring: idle-timeout: 600000 max-lifetime: 1800000 - application: - name: make-shapefile-service - - main: - web-application-type: none # Disable web server for CLI application - converter: inference-id: D5E46F60FC40B1A8BE0CD1F3547AA6 # Optional: omit or set empty to create merged shapefile for all batch-ids @@ -45,7 +39,3 @@ logging: org.springframework: WARN pattern: console: '%d{yyyy-MM-dd HH:mm:ss} - %msg%n' - -layer: - geoserver-url: http://label-tile.gs.dabeeo.com - workspace: cd diff --git a/shp-exporter/src/main/resources/application-prod.yml b/shp-exporter/src/main/resources/application-prod.yml index 3e1665d..4bfd289 100755 --- a/shp-exporter/src/main/resources/application-prod.yml +++ b/shp-exporter/src/main/resources/application-prod.yml @@ -5,16 +5,17 @@ spring: password: kamco_cds_Q!W@E#R$ driver-class-name: org.postgresql.Driver hikari: - maximum-pool-size: 5 + maximum-pool-size: 10 # Increased for batch processing 
connection-timeout: 30000 idle-timeout: 600000 max-lifetime: 1800000 - application: - name: make-shapefile-service - - main: - web-application-type: none # Disable web server for CLI application + batch: + job: + enabled: false # CLI에서 명시적으로 실행 + jdbc: + initialize-schema: always # 메타데이터 테이블 자동 생성 + table-prefix: BATCH_ converter: inference-id: D5E46F60FC40B1A8BE0CD1F3547AA6 @@ -27,15 +28,19 @@ converter: output-base-dir: '/data/model_output/export/' crs: 'EPSG:5186' + batch: + chunk-size: 1000 # 청크 크기 (메모리 ~40MB per chunk) + skip-limit: 100 # 청크당 skip 허용 건수 + fetch-size: 1000 # JDBC 커서 fetch 크기 + enable-partitioning: false # 초기에는 비활성화 + partition-concurrency: 4 # Map ID별 병렬 처리 동시성 (4=~300MB, 8=~600MB) + geoserver: - base-url: 'https://kamco.geo-dev.gs.dabeeo.com/geoserver' + base-url: 'https://aicd-geo.e-kamco.com:18080/geoserver' workspace: 'cd' overwrite-existing: true connection-timeout: 30000 read-timeout: 60000 - # Credentials (optional - environment variables take precedence) - # Uncomment and set values for development convenience - # For production, use GEOSERVER_USERNAME and GEOSERVER_PASSWORD environment variables username: 'admin' password: 'geoserver' @@ -45,9 +50,3 @@ logging: org.springframework: WARN pattern: console: '%d{yyyy-MM-dd HH:mm:ss} - %msg%n' - -layer: - geoserver-url: https://kamco.geo-dev.gs.dabeeo.com - wms-path: geoserver/cd - wmts-path: geoserver/cd/gwc/service - workspace: cd diff --git a/shp-exporter/src/main/resources/application.yml b/shp-exporter/src/main/resources/application.yml index 65d113d..d35f2e3 100755 --- a/shp-exporter/src/main/resources/application.yml +++ b/shp-exporter/src/main/resources/application.yml @@ -3,3 +3,5 @@ spring: name: make-shapefile-service profiles: active: prod + main: + web-application-type: none # Disable web server for CLI application diff --git a/shp-exporter/src/main/resources/db/migration/V1__create_batch_execution_history.sql 
-- Batch execution history table.
-- Tracks per-step start/end time, duration, success/failure, and error cause,
-- written by a Spring Batch step/job listener. Column names are part of the
-- writer's contract — keep them stable.

CREATE TABLE IF NOT EXISTS batch_execution_history (
    id BIGSERIAL PRIMARY KEY,

    -- References into the Spring Batch metadata tables (BATCH_JOB_EXECUTION /
    -- BATCH_STEP_EXECUTION). Deliberately no FK: the metadata tables may be
    -- purged on a different schedule than this history table.
    job_execution_id BIGINT NOT NULL,
    step_execution_id BIGINT,              -- NULL for job-level rows

    -- Step identity
    step_name VARCHAR(100) NOT NULL,
    step_type VARCHAR(50),                 -- 'TASKLET' or 'CHUNK'

    -- Timing
    start_time TIMESTAMP NOT NULL,
    end_time TIMESTAMP,                    -- NULL while the step is still running
    duration_ms BIGINT,                    -- elapsed time in milliseconds

    -- Outcome
    status VARCHAR(20) NOT NULL,           -- 'STARTED', 'COMPLETED', 'FAILED'
    exit_code VARCHAR(20),                 -- 'COMPLETED', 'FAILED', 'UNKNOWN'
    exit_message TEXT,

    -- Error details (populated only on failure)
    error_message TEXT,
    error_stack_trace TEXT,

    -- Chunk-step statistics. NOT NULL so SUM()/AVG() over the history never
    -- silently drops rows (SQL aggregates ignore NULL inputs); DEFAULT 0 keeps
    -- inserts that omit these columns working as before.
    read_count BIGINT NOT NULL DEFAULT 0,
    write_count BIGINT NOT NULL DEFAULT 0,
    commit_count BIGINT NOT NULL DEFAULT 0,
    rollback_count BIGINT NOT NULL DEFAULT 0,
    skip_count BIGINT NOT NULL DEFAULT 0,

    -- Snapshot of the job parameters this execution ran with
    batch_ids TEXT,                        -- comma-separated list, e.g. '252,253,257'
    inference_id VARCHAR(100),

    -- Row audit metadata. NOT NULL: a history row must always carry its
    -- creation timestamp. NOTE(review): updated_at is only refreshed if the
    -- writer sets it explicitly — there is no ON UPDATE trigger here.
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);

-- Lookup indexes for the common query paths: by job execution, by step
-- execution, by step name, by status, and newest-first time-range scans.
CREATE INDEX IF NOT EXISTS idx_batch_exec_hist_job_exec_id
ON batch_execution_history(job_execution_id);

CREATE INDEX IF NOT EXISTS idx_batch_exec_hist_step_exec_id
ON batch_execution_history(step_execution_id);

CREATE INDEX IF NOT EXISTS idx_batch_exec_hist_step_name
ON batch_execution_history(step_name);

CREATE INDEX IF NOT EXISTS idx_batch_exec_hist_status
ON batch_execution_history(status);

CREATE INDEX IF NOT EXISTS idx_batch_exec_hist_start_time
ON batch_execution_history(start_time DESC);

-- Catalog comments (user-facing strings preserved verbatim).
COMMENT ON TABLE batch_execution_history IS '배치 실행 이력 추적 테이블 - 스텝별 시작/종료 시간, 소요 시간, 성공 여부, 에러 사유 기록';
COMMENT ON COLUMN batch_execution_history.job_execution_id IS 'Spring Batch Job Execution ID';
COMMENT ON COLUMN batch_execution_history.step_execution_id IS 'Spring Batch Step Execution ID';
COMMENT ON COLUMN batch_execution_history.step_name IS '스텝 이름 (validateGeometryType, generateShapefile 등)';
COMMENT ON COLUMN batch_execution_history.duration_ms IS '스텝 소요 시간 (밀리초)';
COMMENT ON COLUMN batch_execution_history.status IS '실행 상태 (STARTED, COMPLETED, FAILED)';
COMMENT ON COLUMN batch_execution_history.error_message IS '에러 발생 시 에러 메시지';
COMMENT ON COLUMN batch_execution_history.error_stack_trace IS '에러 발생 시 전체 스택 트레이스';