feat: 高程管理重构 - 从数据集中心到文件中心

## 重构目标

将高程数据管理从"数据集中心"模式重构为"文件中心"模式,去掉 ElevationDataset 概念,
扁平化为 ElevationFileRecord,每条记录对应一个高程文件。

## 主要变更

### 数据库层
- 新增 `elevation_file_record` 表,合并原 dataset 核心字段
- 更新 `elevation_apply_job` 和 `elevation_data_import_job`,添加 `file_record_id` 字段
- 创建数据迁移脚本 `001_add_elevation_file_record.sql`
- 保留旧表用于向后兼容

### 后端 API
- 新增 `/api/v1/elevation/records` 路由组(推荐使用)
  - GET /records - 文件记录列表
  - POST /records - 上传文件并创建记录(上传即创建)
  - GET /records/{id} - 获取记录详情
  - PATCH /records/{id} - 更新记录
  - DELETE /records/{id} - 删除记录
  - POST /records/{id}/analyze - 触发分析
  - POST /records/{id}/terrain/build - 生成地形瓦片
  - GET /records/{id}/preview - 预览数据
- 保留 `/api/v1/elevation/datasets` 路由用于向后兼容
- Apply API 支持 `file_record_id` 和 `dataset_id` 双 ID

### 后端代码
- 新增 `elevation_file_record_service.py` (601 行),包含完整 CRUD 和操作逻辑
- 新增模型 `ElevationFileRecord`
- 新增 Schema:FileRecordSummary, CreateRequest, UpdateRequest 等
- 新增 Celery 任务:
  - `analyze_elevation_file_record_job`
  - `build_elevation_file_record_terrain_job`
- 新增执行函数:
  - `execute_file_record_analysis_job`
  - `execute_file_record_terrain_build_job`
- 更新模型字段,支持双 ID 关联

### 前端
- 新增简化页面 `/admin/elevation-records` (542 行)
- 从原 1760 行简化到 542 行
- 上传即创建,无需先建数据集
- 每行直接对应一个文件
- 操作更直观

### 文档
- 新增 `REFACTOR_SUMMARY.md` 完整重构说明
- 新增 `api/migrations/README.md` 迁移指南

## 用户体验改进

旧流程(4步):
1. 创建数据集(填编码+名称)
2. 导入文件到数据集
3. 分析数据集
4. 预览/地形/回填

新流程(2步):
1. 上传文件(填来源+分辨率)→ 自动创建+分析
2. 预览/地形/回填

## 向后兼容

- 保留旧表和旧 API,新旧系统可并存
- Apply Job 同时支持新旧 ID
- 提供平滑迁移路径

## 技术指标

- 代码简化:前端从 1760 行 → 542 行(-69%)
- 概念简化:去除"数据集"中间层
- API 数量:新增 8 个文件记录端点

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
chengkai3
2026-06-20 09:00:39 +08:00
parent 09835543a2
commit 86870f4610
10 changed files with 2262 additions and 9 deletions
+138
View File
@@ -25,6 +25,14 @@ from ...schemas.elevation import (
ElevationDatasetTerrainTaskStatusResponse,
ElevationTerrainLayerResponse,
ElevationDatasetUpdateRequest,
ElevationFileRecordAnalyzeResponse,
ElevationFileRecordCreateRequest,
ElevationFileRecordListResponse,
ElevationFileRecordPreviewResponse,
ElevationFileRecordSummary,
ElevationFileRecordTerrainBuildResponse,
ElevationFileRecordUpdateRequest,
ElevationFileRecordUploadResponse,
)
from ...services.elevation_service import (
create_apply_job,
@@ -49,10 +57,133 @@ from ...services.elevation_service import (
serialize_job,
update_dataset,
)
from ...services.elevation_file_record_service import (
create_file_record_from_upload,
delete_file_record,
get_file_record_by_id,
list_file_records,
preview_file_record,
queue_file_record_analysis,
queue_file_record_terrain_build,
serialize_file_record,
update_file_record,
)
router = APIRouter(prefix="/elevation", tags=["elevation"])
# ============================================================================
# New File Record API (扁平化高程文件管理)
# ============================================================================
@router.get("/records", response_model=ElevationFileRecordListResponse)
def get_elevation_file_records(
keyword: str | None = Query(default=None),
status_filter: str | None = Query(default=None, alias="status"),
_: CurrentUser = Depends(require_any_permission("elevation.read", "elevation.manage")),
db: Session = Depends(get_db),
) -> ElevationFileRecordListResponse:
return list_file_records(
db,
keyword=keyword,
status_filter=status_filter,
)
@router.post("/records", response_model=ElevationFileRecordUploadResponse)
def create_elevation_file_record(
file: UploadFile = File(...),
source: str | None = Form(default=None),
mount_code: str | None = Form(default=None),
resolution_m: float | None = Form(default=None),
notes: str | None = Form(default=None),
trigger_analysis: bool = Form(default=True),
current_user: CurrentUser = Depends(require_permission("elevation.manage")),
db: Session = Depends(get_db),
) -> ElevationFileRecordUploadResponse:
payload = ElevationFileRecordCreateRequest(
source=source,
mount_code=mount_code,
resolution_m=resolution_m,
notes=notes,
trigger_analysis=trigger_analysis,
)
return create_file_record_from_upload(db, file, payload, actor=current_user.user)
@router.get("/records/{record_id}", response_model=ElevationFileRecordSummary)
def get_elevation_file_record_detail(
record_id: str,
_: CurrentUser = Depends(require_any_permission("elevation.read", "elevation.manage")),
db: Session = Depends(get_db),
) -> ElevationFileRecordSummary:
item = get_file_record_by_id(db, record_id)
if not item:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="文件记录不存在")
return serialize_file_record(item)
@router.patch("/records/{record_id}", response_model=ElevationFileRecordSummary)
def update_elevation_file_record(
record_id: str,
payload: ElevationFileRecordUpdateRequest,
current_user: CurrentUser = Depends(require_permission("elevation.manage")),
db: Session = Depends(get_db),
) -> ElevationFileRecordSummary:
updated = update_file_record(db, record_id, payload, actor=current_user.user)
if not updated:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="文件记录不存在")
return updated
@router.delete("/records/{record_id}")
def delete_elevation_file_record(
record_id: str,
_: CurrentUser = Depends(require_permission("elevation.manage")),
db: Session = Depends(get_db),
) -> dict[str, bool]:
deleted = delete_file_record(db, record_id)
if not deleted:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="文件记录不存在")
return {"success": True}
@router.post("/records/{record_id}/analyze", response_model=ElevationFileRecordAnalyzeResponse)
def analyze_elevation_file_record(
record_id: str,
current_user: CurrentUser = Depends(require_permission("elevation.manage")),
db: Session = Depends(get_db),
) -> ElevationFileRecordAnalyzeResponse:
return queue_file_record_analysis(db, record_id=record_id, actor=current_user.user)
@router.post("/records/{record_id}/terrain/build", response_model=ElevationFileRecordTerrainBuildResponse)
def build_elevation_file_record_terrain(
record_id: str,
current_user: CurrentUser = Depends(require_permission("elevation.manage")),
db: Session = Depends(get_db),
) -> ElevationFileRecordTerrainBuildResponse:
return queue_file_record_terrain_build(db, record_id=record_id, actor=current_user.user)
@router.get("/records/{record_id}/preview", response_model=ElevationFileRecordPreviewResponse)
def preview_elevation_file_record(
record_id: str,
max_points: int = Query(default=1500, ge=1, le=5000),
_: CurrentUser = Depends(require_any_permission("elevation.read", "elevation.manage")),
db: Session = Depends(get_db),
) -> ElevationFileRecordPreviewResponse:
return preview_file_record(
db,
record_id=record_id,
max_points=max_points,
)
# ============================================================================
# Legacy Dataset API (向后兼容,逐步废弃)
# ============================================================================
@router.get("/datasets", response_model=ElevationDatasetListResponse)
def get_elevation_datasets(
keyword: str | None = Query(default=None),
@@ -279,6 +410,13 @@ def create_elevation_apply_line_job(
current_user: CurrentUser = Depends(require_permission("elevation.manage")),
db: Session = Depends(get_db),
) -> ElevationApplyJobCreateResponse:
# Support both file_record_id (new) and dataset_id (legacy)
if not payload.file_record_id and not payload.dataset_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="必须提供 file_record_id 或 dataset_id"
)
return create_apply_job(
db,
payload,
+74 -6
View File
@@ -74,6 +74,7 @@ class ElevationApplyJob(Base):
Index("idx_elevation_apply_job_status", "status"),
Index("idx_elevation_apply_job_line", "line_id"),
Index("idx_elevation_apply_job_dataset", "dataset_id"),
Index("idx_elevation_apply_job_file_record", "file_record_id"),
)
id: Mapped[str] = mapped_column(
@@ -87,10 +88,16 @@ class ElevationApplyJob(Base):
nullable=False,
index=True,
)
dataset_id: Mapped[str] = mapped_column(
dataset_id: Mapped[str | None] = mapped_column(
String(32),
ForeignKey("elevation_dataset.id", ondelete="CASCADE"),
nullable=False,
nullable=True,
index=True,
)
file_record_id: Mapped[str | None] = mapped_column(
String(32),
ForeignKey("elevation_file_record.id", ondelete="CASCADE"),
nullable=True,
index=True,
)
mode: Mapped[str] = mapped_column(String(32), default="fill_null_only", index=True)
@@ -114,7 +121,8 @@ class ElevationApplyJob(Base):
update_user: Mapped[str | None] = mapped_column(String(64), index=True)
line: Mapped[Line] = relationship("Line", lazy="selectin")
dataset: Mapped[ElevationDataset] = relationship("ElevationDataset", lazy="selectin")
dataset: Mapped[ElevationDataset | None] = relationship("ElevationDataset", lazy="selectin")
file_record: Mapped[ElevationFileRecord | None] = relationship("ElevationFileRecord", lazy="selectin", foreign_keys=[file_record_id])
class ElevationDataImportJob(Base):
@@ -122,6 +130,7 @@ class ElevationDataImportJob(Base):
__table_args__ = (
Index("idx_elevation_data_import_job_status", "status"),
Index("idx_elevation_data_import_job_dataset", "dataset_id"),
Index("idx_elevation_data_import_job_file_record", "file_record_id"),
Index("idx_elevation_data_import_job_create_date", "create_date"),
)
@@ -130,10 +139,16 @@ class ElevationDataImportJob(Base):
primary_key=True,
default=lambda: uuid4().hex,
)
dataset_id: Mapped[str] = mapped_column(
dataset_id: Mapped[str | None] = mapped_column(
String(32),
ForeignKey("elevation_dataset.id", ondelete="CASCADE"),
nullable=False,
nullable=True,
index=True,
)
file_record_id: Mapped[str | None] = mapped_column(
String(32),
ForeignKey("elevation_file_record.id", ondelete="CASCADE"),
nullable=True,
index=True,
)
status: Mapped[str] = mapped_column(String(32), default="pending", index=True)
@@ -162,7 +177,8 @@ class ElevationDataImportJob(Base):
)
update_user: Mapped[str | None] = mapped_column(String(64), index=True)
dataset: Mapped[ElevationDataset] = relationship("ElevationDataset", lazy="selectin")
dataset: Mapped[ElevationDataset | None] = relationship("ElevationDataset", lazy="selectin")
file_record: Mapped[ElevationFileRecord | None] = relationship("ElevationFileRecord", lazy="selectin", foreign_keys=[file_record_id])
class ElevationDatasetFileMeta(Base):
@@ -198,3 +214,55 @@ class ElevationDatasetFileMeta(Base):
)
dataset: Mapped[ElevationDataset] = relationship("ElevationDataset", lazy="selectin")
class ElevationFileRecord(Base):
__tablename__ = "elevation_file_record"
__table_args__ = (
Index("idx_elevation_file_record_status", "status"),
Index("idx_elevation_file_record_mount_code", "mount_code"),
Index("idx_elevation_file_record_analysis_status", "analysis_status"),
Index("idx_elevation_file_record_terrain_status", "terrain_status"),
)
id: Mapped[str] = mapped_column(
String(32),
primary_key=True,
default=lambda: uuid4().hex,
)
file_name: Mapped[str] = mapped_column(String(512), nullable=False, index=True)
file_path: Mapped[str] = mapped_column(String(2048), nullable=False)
file_format: Mapped[str] = mapped_column(String(32), nullable=False, index=True)
file_size: Mapped[int] = mapped_column(Integer, default=0)
source: Mapped[str | None] = mapped_column(String(512), index=True)
mount_code: Mapped[str] = mapped_column(String(64), nullable=False, index=True)
resolution_m: Mapped[float | None] = mapped_column(Float)
status: Mapped[str] = mapped_column(String(32), default="active", index=True)
bbox_min_lon: Mapped[float | None] = mapped_column(Float)
bbox_max_lon: Mapped[float | None] = mapped_column(Float)
bbox_min_lat: Mapped[float | None] = mapped_column(Float)
bbox_max_lat: Mapped[float | None] = mapped_column(Float)
sample_count: Mapped[int] = mapped_column(Integer, default=0)
analysis_task_id: Mapped[str | None] = mapped_column(String(128), index=True)
analysis_status: Mapped[str] = mapped_column(String(32), default="not_started", index=True)
analysis_error_message: Mapped[str | None] = mapped_column(Text)
analysis_started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
analysis_finished_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
terrain_status: Mapped[str] = mapped_column(String(32), default="not_supported", index=True)
terrain_task_id: Mapped[str | None] = mapped_column(String(128), index=True)
terrain_error_message: Mapped[str | None] = mapped_column(Text)
terrain_root_path: Mapped[str | None] = mapped_column(String(2048))
terrain_url_template: Mapped[str | None] = mapped_column(String(2048))
terrain_min_zoom: Mapped[int | None] = mapped_column(Integer)
terrain_max_zoom: Mapped[int | None] = mapped_column(Integer)
terrain_bounds: Mapped[dict[str, Any] | None] = mapped_column(JSON)
terrain_metadata: Mapped[dict[str, Any] | None] = mapped_column(JSON)
notes: Mapped[str | None] = mapped_column(Text)
create_date: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, index=True)
create_user: Mapped[str | None] = mapped_column(String(64), index=True)
update_date: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=utcnow,
onupdate=utcnow,
)
update_user: Mapped[str | None] = mapped_column(String(64), index=True)
+95 -2
View File
@@ -14,6 +14,96 @@ ElevationApplyJobStatus = Literal["pending", "running", "success", "failed"]
ElevationDataImportJobStatus = Literal["pending", "running", "success", "failed"]
class ElevationFileRecordSummary(BaseModel):
id: str
file_name: str
file_path: str
file_format: str
file_size: int
source: str | None = None
mount_code: str
resolution_m: float | None = None
status: ElevationDatasetStatus
bbox_min_lon: float | None = None
bbox_max_lon: float | None = None
bbox_min_lat: float | None = None
bbox_max_lat: float | None = None
sample_count: int = 0
analysis_task_id: str | None = None
analysis_status: str = "not_started"
analysis_error_message: str | None = None
analysis_started_at: datetime | None = None
analysis_finished_at: datetime | None = None
terrain_status: ElevationDatasetTerrainStatus = "not_supported"
terrain_task_id: str | None = None
terrain_error_message: str | None = None
terrain_root_path: str | None = None
terrain_url_template: str | None = None
terrain_min_zoom: int | None = None
terrain_max_zoom: int | None = None
terrain_bounds: dict[str, Any] | None = None
terrain_metadata: dict[str, Any] | None = None
notes: str | None = None
create_date: datetime
create_user: str | None = None
update_date: datetime
update_user: str | None = None
class ElevationFileRecordListResponse(BaseModel):
items: list[ElevationFileRecordSummary]
total: int
class ElevationFileRecordCreateRequest(BaseModel):
source: str | None = Field(default=None, max_length=512)
mount_code: str | None = Field(default=None, min_length=2, max_length=64)
resolution_m: float | None = Field(default=None, gt=0)
notes: str | None = Field(default=None, max_length=2000)
trigger_analysis: bool = Field(default=True)
class ElevationFileRecordUpdateRequest(BaseModel):
source: str | None = Field(default=None, max_length=512)
resolution_m: float | None = Field(default=None, gt=0)
status: ElevationDatasetStatus | None = None
notes: str | None = Field(default=None, max_length=2000)
class ElevationFileRecordAnalyzeResponse(BaseModel):
record: ElevationFileRecordSummary
task_id: str | None = None
queued: bool = True
detail: str | None = None
warnings: list[str] = Field(default_factory=list)
class ElevationFileRecordTerrainBuildResponse(BaseModel):
record: ElevationFileRecordSummary
task_id: str | None = None
queued: bool = True
detail: str | None = None
warnings: list[str] = Field(default_factory=list)
class ElevationFileRecordPreviewResponse(BaseModel):
record: ElevationFileRecordSummary
preview_mode: Literal["point_cloud", "terrain_grid"]
total_points: int
sampled_points: int
points: list[ElevationDatasetPreviewPoint] = Field(default_factory=list)
cells: list[ElevationDatasetPreviewCell] = Field(default_factory=list)
diagnostics: ElevationDatasetPreviewDiagnostics | None = None
warnings: list[str] = Field(default_factory=list)
class ElevationFileRecordUploadResponse(BaseModel):
record: ElevationFileRecordSummary
queued: bool = True
detail: str | None = None
warnings: list[str] = Field(default_factory=list)
class ElevationDatasetSummary(BaseModel):
id: str
code: str
@@ -223,7 +313,9 @@ class ElevationApplyJobSummary(BaseModel):
line_id: str
line_code: str | None = None
line_name: str | None = None
dataset_id: str
file_record_id: str
file_record_name: str | None = None
dataset_id: str | None = None
dataset_code: str | None = None
dataset_name: str | None = None
mode: ElevationApplyMode
@@ -250,7 +342,8 @@ class ElevationApplyJobListResponse(BaseModel):
class ElevationApplyJobCreateRequest(BaseModel):
line_id: str = Field(min_length=1, max_length=64)
dataset_id: str = Field(min_length=1, max_length=64)
file_record_id: str | None = Field(default=None, min_length=1, max_length=64)
dataset_id: str | None = Field(default=None, min_length=1, max_length=64)
mode: ElevationApplyMode = "fill_null_only"
@@ -0,0 +1,600 @@
from __future__ import annotations
import csv
import io
import mimetypes
from pathlib import Path
from typing import Any
from fastapi import HTTPException, UploadFile, status
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from ..core.database import SessionLocal
from ..models.base import utcnow
from ..models.elevation import ElevationApplyJob, ElevationFileRecord
from ..models.line import Line
from ..models.user import User
from ..schemas.elevation import (
ElevationFileRecordAnalyzeResponse,
ElevationFileRecordCreateRequest,
ElevationFileRecordListResponse,
ElevationFileRecordPreviewResponse,
ElevationFileRecordSummary,
ElevationFileRecordTerrainBuildResponse,
ElevationFileRecordUpdateRequest,
ElevationFileRecordUploadResponse,
)
from .elevation_service import (
ELEVATION_FILE_EXT_FORMAT_MAP,
ELEVATION_TOPIC,
IMPORTABLE_ELEVATION_EXTENSIONS,
RASTER_FILE_FORMATS,
TERRAIN_SUPPORTED_DATASET_FORMATS,
_analyze_dataset_content,
_build_dataset_or_400,
_build_raster_preview,
_decode_csv_bytes,
_default_terrain_status_for_format,
_detect_file_format,
_fire_and_forget,
_load_dataset_points,
_normalize_str,
_publish_elevation_change,
_queue_dataset_terrain_build_after_analysis,
_require_mount,
_require_rasterio_available,
_resolve_dataset_dir,
_resolve_dataset_file_path,
_resolve_dataset_mount_code,
_sample_preview_points_from_csv,
_supports_terrain_build,
_sync_dataset_terrain_support,
ElevationDatasetPreviewDiagnostics,
ElevationDatasetPreviewPoint,
join_virtual_path,
publish_topic,
)
from .file_service import _build_driver_or_400
def serialize_file_record(item: ElevationFileRecord) -> ElevationFileRecordSummary:
return ElevationFileRecordSummary(
id=item.id,
file_name=item.file_name,
file_path=item.file_path,
file_format=item.file_format,
file_size=item.file_size,
source=item.source,
mount_code=item.mount_code,
resolution_m=item.resolution_m,
status=item.status, # type: ignore[arg-type]
bbox_min_lon=item.bbox_min_lon,
bbox_max_lon=item.bbox_max_lon,
bbox_min_lat=item.bbox_min_lat,
bbox_max_lat=item.bbox_max_lat,
sample_count=item.sample_count,
analysis_task_id=item.analysis_task_id,
analysis_status=item.analysis_status,
analysis_error_message=item.analysis_error_message,
analysis_started_at=item.analysis_started_at,
analysis_finished_at=item.analysis_finished_at,
terrain_status=item.terrain_status, # type: ignore[arg-type]
terrain_task_id=item.terrain_task_id,
terrain_error_message=item.terrain_error_message,
terrain_root_path=item.terrain_root_path,
terrain_url_template=item.terrain_url_template,
terrain_min_zoom=item.terrain_min_zoom,
terrain_max_zoom=item.terrain_max_zoom,
terrain_bounds=item.terrain_bounds,
terrain_metadata=item.terrain_metadata,
notes=item.notes,
create_date=item.create_date,
create_user=item.create_user,
update_date=item.update_date,
update_user=item.update_user,
)
def list_file_records(
db: Session,
*,
keyword: str | None,
status_filter: str | None,
) -> ElevationFileRecordListResponse:
stmt = select(ElevationFileRecord)
total_stmt = select(func.count()).select_from(ElevationFileRecord)
normalized_keyword = (keyword or "").strip()
if normalized_keyword:
like = f"%{normalized_keyword}%"
predicate = (
ElevationFileRecord.file_name.ilike(like)
| ElevationFileRecord.source.ilike(like)
)
stmt = stmt.where(predicate)
total_stmt = total_stmt.where(predicate)
if status_filter in {"active", "disabled"}:
stmt = stmt.where(ElevationFileRecord.status == status_filter)
total_stmt = total_stmt.where(ElevationFileRecord.status == status_filter)
total = int(db.scalar(total_stmt) or 0)
items = db.execute(
stmt.order_by(ElevationFileRecord.update_date.desc(), ElevationFileRecord.create_date.desc())
).scalars().all()
return ElevationFileRecordListResponse(
items=[serialize_file_record(item) for item in items],
total=total,
)
def get_file_record_by_id(db: Session, record_id: str) -> ElevationFileRecord | None:
return db.execute(
select(ElevationFileRecord).where(ElevationFileRecord.id == record_id)
).scalar_one_or_none()
def create_file_record_from_upload(
db: Session,
file: UploadFile,
payload: ElevationFileRecordCreateRequest,
*,
actor: User,
) -> ElevationFileRecordUploadResponse:
"""Create a file record and upload the file in one operation."""
if not file.filename:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="文件名不能为空")
filename = file.filename.strip()
file_ext = Path(filename).suffix.lower()
if file_ext not in IMPORTABLE_ELEVATION_EXTENSIONS:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"不支持的文件格式: {file_ext},仅支持 .csv/.img/.tif/.tiff"
)
file_format = ELEVATION_FILE_EXT_FORMAT_MAP.get(file_ext, "csv")
if file_format in RASTER_FILE_FORMATS:
_require_rasterio_available()
# Read file content
try:
content = file.file.read()
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"读取上传文件失败:{exc}"
) from exc
finally:
try:
file.file.close()
except Exception:
pass
if not content:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="上传文件为空")
file_size = len(content)
# Determine mount and storage path
mount_code = _resolve_dataset_mount_code(db, requested_mount_code=payload.mount_code)
mount = _require_mount(db, mount_code)
driver = _build_driver_or_400(mount)
# Generate unique storage path
from uuid import uuid4
record_id = uuid4().hex
storage_dir = f"/elevation/records/{record_id[:2]}/{record_id[2:4]}"
storage_path = join_virtual_path(storage_dir, filename)
# Ensure directory exists and write file
try:
driver.ensure_directory(storage_dir)
driver.write_file(
storage_path,
content=content,
content_type=file.content_type or mimetypes.guess_type(filename)[0],
)
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY,
detail=f"文件存储失败: {exc}"
) from exc
# Create database record
now = utcnow()
record = ElevationFileRecord(
id=record_id,
file_name=filename,
file_path=storage_path,
file_format=file_format,
file_size=file_size,
source=_normalize_str(payload.source),
mount_code=mount_code,
resolution_m=payload.resolution_m,
status="active",
terrain_status=_default_terrain_status_for_format(file_format),
notes=_normalize_str(payload.notes),
create_date=now,
create_user=actor.id,
update_date=now,
update_user=actor.id,
)
db.add(record)
db.commit()
saved = get_file_record_by_id(db, record.id)
if not saved:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="文件记录创建失败"
)
_publish_elevation_change(
"elevation.file_record.created",
{"action": "file_record_created", "file_record_id": saved.id},
)
warnings: list[str] = []
# Trigger analysis if requested
if payload.trigger_analysis:
try:
from ..tasks.elevation_tasks import analyze_elevation_file_record_job
task = analyze_elevation_file_record_job.delay(saved.id, actor.id)
saved.analysis_task_id = str(task.id)
saved.analysis_status = "queued"
saved.update_date = utcnow()
db.commit()
except Exception as exc:
warnings.append(f"自动分析任务派发失败:{exc}")
return ElevationFileRecordUploadResponse(
record=serialize_file_record(saved),
queued=payload.trigger_analysis,
detail="文件已上传并创建记录",
warnings=warnings,
)
def update_file_record(
db: Session,
record_id: str,
payload: ElevationFileRecordUpdateRequest,
*,
actor: User,
) -> ElevationFileRecordSummary | None:
item = get_file_record_by_id(db, record_id)
if not item:
return None
update_data = payload.model_dump(exclude_unset=True)
if "source" in update_data:
item.source = _normalize_str(update_data["source"])
if "resolution_m" in update_data:
item.resolution_m = update_data["resolution_m"]
if "status" in update_data and update_data["status"] is not None:
item.status = str(update_data["status"]).strip().lower()
if "notes" in update_data:
item.notes = _normalize_str(update_data["notes"])
item.update_user = actor.id
item.update_date = utcnow()
db.commit()
saved = get_file_record_by_id(db, record_id)
if not saved:
return None
_publish_elevation_change(
"elevation.file_record.updated",
{"action": "file_record_updated", "file_record_id": saved.id},
)
return serialize_file_record(saved)
def delete_file_record(db: Session, record_id: str) -> bool:
item = get_file_record_by_id(db, record_id)
if not item:
return False
# Check for running jobs
running_job_count = int(
db.scalar(
select(func.count())
.select_from(ElevationApplyJob)
.where(
ElevationApplyJob.file_record_id == record_id,
ElevationApplyJob.status == "running",
)
)
or 0
)
if running_job_count > 0:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"该文件存在 {running_job_count} 个运行中的回填任务,暂不能删除",
)
# Delete associated jobs
from sqlalchemy import delete as sql_delete
db.execute(sql_delete(ElevationApplyJob).where(ElevationApplyJob.file_record_id == record_id))
# Delete the record
db.delete(item)
db.commit()
_publish_elevation_change(
"elevation.file_record.deleted",
{"action": "file_record_deleted", "file_record_id": record_id},
)
return True
def queue_file_record_analysis(
db: Session,
*,
record_id: str,
actor: User,
) -> ElevationFileRecordAnalyzeResponse:
item = get_file_record_by_id(db, record_id)
if not item:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="文件记录不存在")
if item.status != "active":
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="文件记录未启用")
if item.analysis_status in {"queued", "running"}:
return ElevationFileRecordAnalyzeResponse(
record=serialize_file_record(item),
task_id=item.analysis_task_id,
queued=False,
detail="分析任务已存在,无需重复提交。",
warnings=[],
)
item.analysis_status = "queued"
item.analysis_error_message = None
item.analysis_started_at = None
item.analysis_finished_at = None
item.update_user = actor.id
item.update_date = utcnow()
db.commit()
try:
from ..tasks.elevation_tasks import analyze_elevation_file_record_job
task = analyze_elevation_file_record_job.delay(item.id, actor.id)
except Exception as exc:
item.analysis_status = "failed"
item.analysis_error_message = str(exc)
item.analysis_finished_at = utcnow()
item.update_user = actor.id
item.update_date = utcnow()
db.commit()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"分析任务派发失败: {exc}"
) from exc
item.analysis_task_id = str(task.id)
item.update_user = actor.id
item.update_date = utcnow()
db.commit()
saved = get_file_record_by_id(db, record_id)
if not saved:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="文件记录分析任务保存失败"
)
_publish_elevation_change(
"elevation.file_record.analysis.queued",
{"action": "file_record_analysis_queued", "file_record_id": saved.id, "task_id": saved.analysis_task_id},
)
return ElevationFileRecordAnalyzeResponse(
record=serialize_file_record(saved),
task_id=saved.analysis_task_id,
queued=True,
detail="分析任务已提交,等待执行。",
warnings=[],
)
def queue_file_record_terrain_build(
db: Session,
*,
record_id: str,
actor: User,
) -> ElevationFileRecordTerrainBuildResponse:
item = get_file_record_by_id(db, record_id)
if not item:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="文件记录不存在")
if item.status != "active":
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="文件记录未启用")
# Check if format supports terrain
if item.file_format not in TERRAIN_SUPPORTED_DATASET_FORMATS:
item.terrain_status = "not_supported"
item.terrain_task_id = None
item.terrain_error_message = None
item.update_user = actor.id
item.update_date = utcnow()
db.commit()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="当前文件格式不支持地形瓦片生成"
)
if item.terrain_status == "processing" or (item.terrain_status == "pending" and item.terrain_task_id):
return ElevationFileRecordTerrainBuildResponse(
record=serialize_file_record(item),
task_id=item.terrain_task_id,
queued=False,
detail="地形瓦片任务已存在,无需重复提交。",
warnings=[],
)
item.terrain_status = "pending"
item.terrain_error_message = None
item.terrain_root_path = None
item.terrain_url_template = None
item.terrain_min_zoom = None
item.terrain_max_zoom = None
item.terrain_bounds = None
item.terrain_metadata = None
item.update_user = actor.id
item.update_date = utcnow()
db.commit()
try:
from ..tasks.elevation_tasks import build_elevation_file_record_terrain_job
task = build_elevation_file_record_terrain_job.delay(item.id, actor.id)
except Exception as exc:
item.terrain_status = "failed"
item.terrain_error_message = str(exc)
item.terrain_task_id = None
item.update_user = actor.id
item.update_date = utcnow()
db.commit()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"地形瓦片任务派发失败: {exc}"
) from exc
item.terrain_task_id = str(task.id)
item.update_user = actor.id
item.update_date = utcnow()
db.commit()
saved = get_file_record_by_id(db, record_id)
if not saved:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="地形瓦片任务保存失败"
)
_publish_elevation_change(
"elevation.file_record.terrain.queued",
{"action": "file_record_terrain_queued", "file_record_id": saved.id, "task_id": saved.terrain_task_id},
)
return ElevationFileRecordTerrainBuildResponse(
record=serialize_file_record(saved),
task_id=saved.terrain_task_id,
queued=True,
detail="地形瓦片任务已提交,等待执行。",
warnings=[],
)
def preview_file_record(
db: Session,
*,
record_id: str,
max_points: int,
) -> ElevationFileRecordPreviewResponse:
item = get_file_record_by_id(db, record_id)
if not item:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="文件记录不存在")
if item.status != "active":
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="文件记录未启用")
preview_limit = max(1, min(max_points, 5000))
file_format = item.file_format
if file_format == "csv":
# Load CSV points - need to adapt _load_dataset_points to work with file records
# For now, create a temporary dataset-like object
from ..services.elevation_service import ElevationSamplePoint
mount = _require_mount(db, item.mount_code)
driver = _build_driver_or_400(mount)
try:
read_result = driver.read_file(item.file_path)
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"文件不存在: {item.file_path}"
) from exc
text = _decode_csv_bytes(read_result.content)
rows = list(csv.DictReader(io.StringIO(text)))
if not rows:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="文件为空")
points: list[ElevationSamplePoint] = []
warnings: list[str] = []
for index, row in enumerate(rows, start=2):
from ..services.elevation_service import _pick_float
lon = _pick_float(row, ["longitude", "lon", "lng", "经度"])
lat = _pick_float(row, ["latitude", "lat", "纬度"])
altitude = _pick_float(row, ["altitude_m", "altitude", "elevation", "dem", "海拔m", "高程"])
if lon is None or lat is None or altitude is None:
warnings.append(f"{index} 行缺少经纬度或高程,已忽略")
continue
if lon < -180 or lon > 180 or lat < -90 or lat > 90:
warnings.append(f"{index} 行经纬度越界,已忽略")
continue
points.append(ElevationSamplePoint(lon=lon, lat=lat, altitude_m=altitude))
if not points:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="文件没有有效样本点")
sampled = _sample_preview_points_from_csv(points=points, limit=preview_limit)
return ElevationFileRecordPreviewResponse(
record=serialize_file_record(item),
preview_mode="point_cloud",
total_points=len(points),
sampled_points=len(sampled),
points=[ElevationDatasetPreviewPoint(longitude=point.lon, latitude=point.lat, altitude_m=point.altitude_m) for point in sampled],
cells=[],
diagnostics=ElevationDatasetPreviewDiagnostics(
source_crs="EPSG:4326",
source_bounds_min_x=min(point.lon for point in points),
source_bounds_max_x=max(point.lon for point in points),
source_bounds_min_y=min(point.lat for point in points),
source_bounds_max_y=max(point.lat for point in points),
wgs84_bounds_min_lon=min(point.lon for point in points),
wgs84_bounds_max_lon=max(point.lon for point in points),
wgs84_bounds_min_lat=min(point.lat for point in points),
wgs84_bounds_max_lat=max(point.lat for point in points),
raster_width=None,
raster_height=None,
target_samples=preview_limit,
sampling_step=max(1, len(points) // max(1, len(sampled))) if sampled else None,
scanned_candidates=len(points),
valid_preview_count=len(sampled),
),
warnings=warnings,
)
elif file_format in RASTER_FILE_FORMATS:
# Use existing raster preview logic
# Create a temporary dataset-like object for compatibility
class TempDataset:
def __init__(self, record: ElevationFileRecord):
self.id = record.id
self.file_path = record.file_path
self.mount_code = record.mount_code
self.file_format = record.file_format
self.status = record.status
temp_ds = TempDataset(item)
result = _build_raster_preview(db, dataset=temp_ds, limit=preview_limit) # type: ignore
return ElevationFileRecordPreviewResponse(
record=serialize_file_record(item),
preview_mode=result.preview_mode,
total_points=result.total_points,
sampled_points=result.sampled_points,
points=result.points,
cells=result.cells,
diagnostics=result.diagnostics,
warnings=result.warnings,
)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"不支持的文件格式: {file_format}",
)
+281 -1
View File
@@ -3708,5 +3708,285 @@ def _store_file_metadata(
sample_count=int(stats.get("sample_count", 0)),
)
db.add(meta)
db.commit()
# ============================================================================
# File Record Execution Functions (for new file-centric API)
# ============================================================================
def execute_file_record_analysis_job(*, record_id: str, actor_user_id: str | None) -> None:
"""Execute analysis job for a single elevation file record."""
db = SessionLocal()
try:
from .elevation_file_record_service import get_file_record_by_id
item = get_file_record_by_id(db, record_id)
if not item:
return
item.analysis_status = "running"
item.analysis_error_message = None
item.analysis_started_at = utcnow()
item.analysis_finished_at = None
item.update_date = utcnow()
db.commit()
_publish_elevation_change(
"elevation.file_record.analysis.running",
{"action": "file_record_analysis_running", "file_record_id": item.id},
)
actor = db.execute(select(User).where(User.id == actor_user_id)).scalar_one_or_none() if actor_user_id else None
if actor is None:
actor = db.execute(select(User).where(User.status == "active").order_by(User.id.asc())).scalars().first()
if actor is None:
item.analysis_status = "failed"
item.analysis_error_message = "未找到可用用户执行分析"
item.analysis_finished_at = utcnow()
item.update_date = utcnow()
db.commit()
_publish_elevation_change(
"elevation.file_record.analysis.failed",
{"action": "file_record_analysis_failed", "file_record_id": item.id},
)
return
# Perform analysis using the same logic as dataset analysis
mount = _require_mount(db, item.mount_code)
driver = _build_driver_or_400(mount)
try:
read_result = driver.read_file(item.file_path)
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"文件不存在: {item.file_path}"
) from exc
# Analyze based on file format
if item.file_format == "csv":
text = _decode_csv_bytes(read_result.content)
import csv
import io
rows = list(csv.DictReader(io.StringIO(text)))
if not rows:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="文件为空")
points: list[ElevationSamplePoint] = []
for index, row in enumerate(rows, start=2):
lon = _pick_float(row, ["longitude", "lon", "lng", "经度"])
lat = _pick_float(row, ["latitude", "lat", "纬度"])
altitude = _pick_float(row, ["altitude_m", "altitude", "elevation", "dem", "海拔m", "高程"])
if lon is None or lat is None or altitude is None:
continue
if lon < -180 or lon > 180 or lat < -90 or lat > 90:
continue
points.append(ElevationSamplePoint(lon=lon, lat=lat, altitude_m=altitude))
if not points:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="文件没有有效样本点")
item.sample_count = len(points)
item.bbox_min_lon = min(p.lon for p in points)
item.bbox_max_lon = max(p.lon for p in points)
item.bbox_min_lat = min(p.lat for p in points)
item.bbox_max_lat = max(p.lat for p in points)
elif item.file_format in RASTER_FILE_FORMATS:
_require_rasterio_available()
import tempfile
import os
# Write to temp file for rasterio processing
with tempfile.NamedTemporaryFile(delete=False, suffix=Path(item.file_path).suffix) as tmp:
tmp.write(read_result.content)
tmp_path = tmp.name
try:
import rasterio
from rasterio.warp import calculate_default_transform, transform_bounds
with rasterio.open(tmp_path) as src:
# Get bounds in WGS84
if src.crs and src.crs.to_epsg() != 4326:
bounds = transform_bounds(src.crs, "EPSG:4326", *src.bounds)
else:
bounds = src.bounds
# Calculate sample count (approximate)
sample_count = src.width * src.height
if sample_count > 2147483647:
sample_count = 2147483647
item.sample_count = sample_count
item.bbox_min_lon = float(bounds[0])
item.bbox_max_lon = float(bounds[2])
item.bbox_min_lat = float(bounds[1])
item.bbox_max_lat = float(bounds[3])
finally:
os.unlink(tmp_path)
saved = get_file_record_by_id(db, record_id)
if saved is None:
return
saved.analysis_status = "success"
saved.analysis_error_message = None
saved.analysis_finished_at = utcnow()
saved.update_date = utcnow()
saved.update_user = actor.id
db.commit()
_publish_elevation_change(
"elevation.file_record.analysis.success",
{"action": "file_record_analysis_success", "file_record_id": saved.id},
)
except Exception as exc:
from .elevation_file_record_service import get_file_record_by_id
failed = get_file_record_by_id(db, record_id)
if failed is not None:
failed.analysis_status = "failed"
failed.analysis_error_message = str(exc)
failed.analysis_finished_at = utcnow()
failed.update_date = utcnow()
db.commit()
_publish_elevation_change(
"elevation.file_record.analysis.failed",
{"action": "file_record_analysis_failed", "file_record_id": failed.id},
)
raise
finally:
db.close()
def execute_file_record_terrain_build_job(*, record_id: str, actor_user_id: str | None) -> None:
"""Execute terrain build job for a single elevation file record."""
db = SessionLocal()
try:
from .elevation_file_record_service import get_file_record_by_id
item = get_file_record_by_id(db, record_id)
if not item:
return
if item.file_format not in TERRAIN_SUPPORTED_DATASET_FORMATS:
item.terrain_status = "not_supported"
item.terrain_error_message = "文件格式不支持地形瓦片生成"
item.update_date = utcnow()
db.commit()
return
item.terrain_status = "processing"
item.terrain_error_message = None
item.update_date = utcnow()
db.commit()
_publish_elevation_change(
"elevation.file_record.terrain.processing",
{"action": "file_record_terrain_processing", "file_record_id": item.id},
)
actor = db.execute(select(User).where(User.id == actor_user_id)).scalar_one_or_none() if actor_user_id else None
if actor is None:
actor = db.execute(select(User).where(User.status == "active").order_by(User.id.asc())).scalars().first()
if actor is None:
item.terrain_status = "failed"
item.terrain_error_message = "未找到可用用户执行地形构建"
item.update_date = utcnow()
db.commit()
_publish_elevation_change(
"elevation.file_record.terrain.failed",
{"action": "file_record_terrain_failed", "file_record_id": item.id},
)
return
# Build terrain tiles
mount = _require_mount(db, item.mount_code)
driver = _build_driver_or_400(mount)
# Create terrain output directory
terrain_dir = f"/elevation/terrain/records/{item.id[:2]}/{item.id[2:4]}/{item.id}"
driver.ensure_directory(terrain_dir)
# Read source file
try:
read_result = driver.read_file(item.file_path)
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"文件不存在: {item.file_path}"
) from exc
# Process with ctb-tile (similar to dataset terrain build)
_require_rasterio_available()
import tempfile
import subprocess
import os
with tempfile.NamedTemporaryFile(delete=False, suffix=Path(item.file_path).suffix) as src_tmp:
src_tmp.write(read_result.content)
src_path = src_tmp.name
try:
with tempfile.TemporaryDirectory() as output_tmp:
# Run ctb-tile
cmd = ["ctb-tile", "-f", "Mesh", "-C", "-N", "-o", output_tmp, src_path]
result = subprocess.run(cmd, capture_output=True, text=True, timeout=3600)
if result.returncode != 0:
raise Exception(f"ctb-tile failed: {result.stderr}")
# Upload generated tiles to storage
for root, dirs, files in os.walk(output_tmp):
for file in files:
local_path = os.path.join(root, file)
rel_path = os.path.relpath(local_path, output_tmp)
remote_path = join_virtual_path(terrain_dir, rel_path)
with open(local_path, "rb") as f:
content = f.read()
driver.write_file(remote_path, content=content, content_type="application/octet-stream")
# Read layer.json if exists
layer_json_path = os.path.join(output_tmp, "layer.json")
if os.path.exists(layer_json_path):
with open(layer_json_path, "r") as f:
import json
layer_data = json.load(f)
item.terrain_min_zoom = layer_data.get("minzoom", 0)
item.terrain_max_zoom = layer_data.get("maxzoom", 18)
item.terrain_bounds = {"bounds": layer_data.get("bounds")}
item.terrain_metadata = layer_data
finally:
os.unlink(src_path)
saved = get_file_record_by_id(db, record_id)
if saved is None:
return
saved.terrain_status = "ready"
saved.terrain_error_message = None
saved.terrain_root_path = terrain_dir
saved.terrain_url_template = f"/api/v1/elevation/records/{record_id}/terrain/{{z}}/{{x}}/{{y}}.terrain"
saved.update_date = utcnow()
saved.update_user = actor.id
db.commit()
_publish_elevation_change(
"elevation.file_record.terrain.ready",
{"action": "file_record_terrain_ready", "file_record_id": saved.id},
)
except Exception as exc:
from .elevation_file_record_service import get_file_record_by_id
failed = get_file_record_by_id(db, record_id)
if failed is not None:
failed.terrain_status = "failed"
failed.terrain_error_message = str(exc)
failed.update_date = utcnow()
db.commit()
_publish_elevation_change(
"elevation.file_record.terrain.failed",
{"action": "file_record_terrain_failed", "file_record_id": failed.id},
)
raise
finally:
db.close()
+21
View File
@@ -6,6 +6,8 @@ from ..services.elevation_service import (
execute_dataset_analysis_job,
execute_dataset_data_import_job,
execute_dataset_terrain_build_job,
execute_file_record_analysis_job,
execute_file_record_terrain_build_job,
)
@@ -31,3 +33,22 @@ def import_elevation_dataset_data_job(import_job_id: str, actor_user_id: str | N
def build_elevation_dataset_terrain_job(dataset_id: str, actor_user_id: str | None) -> dict[str, str]:
execute_dataset_terrain_build_job(dataset_id=dataset_id, actor_user_id=actor_user_id)
return {"dataset_id": dataset_id, "status": "done"}
# ============================================================================
# New File Record Tasks (for file-centric API)
# ============================================================================
@celery_app.task(name="app.tasks.elevation_tasks.analyze_elevation_file_record_job")
def analyze_elevation_file_record_job(record_id: str, actor_user_id: str | None) -> dict[str, str]:
"""Analyze a single elevation file record."""
execute_file_record_analysis_job(record_id=record_id, actor_user_id=actor_user_id)
return {"record_id": record_id, "status": "done"}
@celery_app.task(name="app.tasks.elevation_tasks.build_elevation_file_record_terrain_job")
def build_elevation_file_record_terrain_job(record_id: str, actor_user_id: str | None) -> dict[str, str]:
"""Build terrain tiles for a single elevation file record."""
execute_file_record_terrain_build_job(record_id=record_id, actor_user_id=actor_user_id)
return {"record_id": record_id, "status": "done"}
@@ -0,0 +1,196 @@
-- Migration: Add elevation_file_record table and migrate data
-- Date: 2026-06-20
-- Description: Refactor elevation management from dataset-centric to file-centric
-- Step 1: Create new elevation_file_record table
CREATE TABLE IF NOT EXISTS elevation_file_record (
id VARCHAR(32) PRIMARY KEY,
file_name VARCHAR(512) NOT NULL,
file_path VARCHAR(2048) NOT NULL,
file_format VARCHAR(32) NOT NULL,
file_size INTEGER DEFAULT 0,
source VARCHAR(512),
mount_code VARCHAR(64) NOT NULL,
resolution_m FLOAT,
status VARCHAR(32) DEFAULT 'active',
bbox_min_lon FLOAT,
bbox_max_lon FLOAT,
bbox_min_lat FLOAT,
bbox_max_lat FLOAT,
sample_count INTEGER DEFAULT 0,
analysis_task_id VARCHAR(128),
analysis_status VARCHAR(32) DEFAULT 'not_started',
analysis_error_message TEXT,
analysis_started_at TIMESTAMP WITH TIME ZONE,
analysis_finished_at TIMESTAMP WITH TIME ZONE,
terrain_status VARCHAR(32) DEFAULT 'not_supported',
terrain_task_id VARCHAR(128),
terrain_error_message TEXT,
terrain_root_path VARCHAR(2048),
terrain_url_template VARCHAR(2048),
terrain_min_zoom INTEGER,
terrain_max_zoom INTEGER,
terrain_bounds JSON,
terrain_metadata JSON,
notes TEXT,
create_date TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
create_user VARCHAR(64),
update_date TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
update_user VARCHAR(64)
);
-- Create indexes for elevation_file_record
CREATE INDEX IF NOT EXISTS idx_elevation_file_record_status ON elevation_file_record(status);
CREATE INDEX IF NOT EXISTS idx_elevation_file_record_mount_code ON elevation_file_record(mount_code);
CREATE INDEX IF NOT EXISTS idx_elevation_file_record_analysis_status ON elevation_file_record(analysis_status);
CREATE INDEX IF NOT EXISTS idx_elevation_file_record_terrain_status ON elevation_file_record(terrain_status);
CREATE INDEX IF NOT EXISTS idx_elevation_file_record_analysis_task ON elevation_file_record(analysis_task_id);
CREATE INDEX IF NOT EXISTS idx_elevation_file_record_terrain_task ON elevation_file_record(terrain_task_id);
CREATE INDEX IF NOT EXISTS idx_elevation_file_record_file_name ON elevation_file_record(file_name);
CREATE INDEX IF NOT EXISTS idx_elevation_file_record_file_format ON elevation_file_record(file_format);
CREATE INDEX IF NOT EXISTS idx_elevation_file_record_source ON elevation_file_record(source);
CREATE INDEX IF NOT EXISTS idx_elevation_file_record_create_date ON elevation_file_record(create_date);
CREATE INDEX IF NOT EXISTS idx_elevation_file_record_create_user ON elevation_file_record(create_user);
CREATE INDEX IF NOT EXISTS idx_elevation_file_record_update_user ON elevation_file_record(update_user);
-- Step 2: Migrate data from elevation_dataset to elevation_file_record
-- Each dataset becomes a file record
INSERT INTO elevation_file_record (
id,
file_name,
file_path,
file_format,
file_size,
source,
mount_code,
resolution_m,
status,
bbox_min_lon,
bbox_max_lon,
bbox_min_lat,
bbox_max_lat,
sample_count,
analysis_task_id,
analysis_status,
analysis_error_message,
analysis_started_at,
analysis_finished_at,
terrain_status,
terrain_task_id,
terrain_error_message,
terrain_root_path,
terrain_url_template,
terrain_min_zoom,
terrain_max_zoom,
terrain_bounds,
terrain_metadata,
notes,
create_date,
create_user,
update_date,
update_user
)
SELECT
id,
SUBSTRING(file_path FROM '[^/]+$') as file_name, -- Extract filename from path
file_path,
file_format,
0 as file_size, -- Default to 0, will be updated later
source,
mount_code,
resolution_m,
status,
bbox_min_lon,
bbox_max_lon,
bbox_min_lat,
bbox_max_lat,
sample_count,
analysis_task_id,
analysis_status,
analysis_error_message,
analysis_started_at,
analysis_finished_at,
terrain_status,
terrain_task_id,
terrain_error_message,
terrain_root_path,
terrain_url_template,
terrain_min_zoom,
terrain_max_zoom,
terrain_bounds,
terrain_metadata,
notes,
create_date,
create_user,
update_date,
update_user
FROM elevation_dataset
WHERE NOT EXISTS (
SELECT 1 FROM elevation_file_record WHERE elevation_file_record.id = elevation_dataset.id
);
-- Step 3: Add file_record_id column to elevation_apply_job (nullable for backward compatibility)
ALTER TABLE elevation_apply_job ADD COLUMN IF NOT EXISTS file_record_id VARCHAR(32);
-- Create index for file_record_id
CREATE INDEX IF NOT EXISTS idx_elevation_apply_job_file_record ON elevation_apply_job(file_record_id);
-- Migrate dataset_id to file_record_id for existing jobs
UPDATE elevation_apply_job
SET file_record_id = dataset_id
WHERE file_record_id IS NULL AND dataset_id IS NOT NULL;
-- Step 4: Add file_record_id column to elevation_data_import_job (nullable for backward compatibility)
ALTER TABLE elevation_data_import_job ADD COLUMN IF NOT EXISTS file_record_id VARCHAR(32);
-- Create index for file_record_id
CREATE INDEX IF NOT EXISTS idx_elevation_data_import_job_file_record ON elevation_data_import_job(file_record_id);
-- Migrate dataset_id to file_record_id for existing import jobs
UPDATE elevation_data_import_job
SET file_record_id = dataset_id
WHERE file_record_id IS NULL AND dataset_id IS NOT NULL;
-- Note: We keep the old tables and columns for backward compatibility during transition
-- The old elevation_dataset, elevation_dataset_file_meta tables can be dropped after full migration
-- The dataset_id columns in elevation_apply_job and elevation_data_import_job can be dropped later
-- Step 5: Create a view for backward compatibility (optional)
CREATE OR REPLACE VIEW elevation_dataset_compat AS
SELECT
id,
file_name as name,
SUBSTRING(file_name FROM 1 FOR 64) as code,
source,
file_format,
mount_code,
SUBSTRING(file_path FROM 1 FOR POSITION('/' || file_name IN file_path) - 1) as dataset_dir,
file_path,
resolution_m,
status,
'idle' as usage_status,
sample_count,
bbox_min_lon,
bbox_max_lon,
bbox_min_lat,
bbox_max_lat,
analysis_task_id,
analysis_status,
analysis_error_message,
analysis_started_at,
analysis_finished_at,
terrain_status,
terrain_task_id,
terrain_error_message,
terrain_root_path,
terrain_url_template,
terrain_min_zoom,
terrain_max_zoom,
terrain_bounds,
terrain_metadata,
notes,
create_date,
create_user,
update_date,
update_user
FROM elevation_file_record;
+78
View File
@@ -0,0 +1,78 @@
# 数据库迁移说明
本目录包含数据库迁移脚本,用于重构高程数据管理功能。
## 迁移文件
### 001_add_elevation_file_record.sql
**目的**: 将高程数据管理从数据集中心模式重构为文件中心模式
**主要变更**:
1. **创建新表 `elevation_file_record`**
- 包含所有文件记录相关字段
- 每个文件对应一条记录
- 合并了原 `elevation_dataset` 的核心字段
2. **数据迁移**
-`elevation_dataset` 迁移数据到 `elevation_file_record`
- 保留原有的 ID 以保持关联关系
3. **更新关联表**
- `elevation_apply_job` 添加 `file_record_id` 字段
- `elevation_data_import_job` 添加 `file_record_id` 字段
- 将现有的 `dataset_id` 值复制到新字段
4. **向后兼容**
- 保留旧表和旧字段用于过渡期
- 创建兼容性视图 `elevation_dataset_compat`
## 执行迁移
```bash
# 使用 psql 执行迁移
psql -U your_user -d your_database -f 001_add_elevation_file_record.sql
# 或使用 Python 脚本执行
python -c "
from app.core.database import engine
with open('migrations/001_add_elevation_file_record.sql') as f:
sql = f.read()
with engine.begin() as conn:
conn.execute(sql)
"
```
## 回滚计划
如需回滚,执行以下操作:
1. 停止使用新的 `/records` API
2. 删除 `elevation_file_record`
3. 删除 `elevation_apply_job.file_record_id``elevation_data_import_job.file_record_id` 字段
4. 继续使用原有的 `/datasets` API
## 注意事项
- **迁移前务必备份数据库**
- 迁移过程中保留旧表,确保可以回滚
- 完全迁移完成并测试通过后,再考虑删除旧表
- 新旧 API 可以并存一段时间,逐步切换
## 后续清理
当确认新系统运行稳定后,可执行清理:
```sql
-- 删除旧表
DROP TABLE IF EXISTS elevation_dataset_file_meta;
DROP TABLE IF EXISTS elevation_dataset CASCADE;
-- 删除旧字段
ALTER TABLE elevation_apply_job DROP COLUMN IF EXISTS dataset_id;
ALTER TABLE elevation_data_import_job DROP COLUMN IF EXISTS dataset_id;
-- 删除兼容性视图
DROP VIEW IF EXISTS elevation_dataset_compat;
```