diff --git a/MEMORY.md b/MEMORY.md index 5225460..24b07c5 100644 --- a/MEMORY.md +++ b/MEMORY.md @@ -226,6 +226,16 @@ - `DB_SCHEMA` 通过 PostgreSQL `search_path` 注入,语义等价 JDBC 的 `currentSchema`。 - API 启动初始化口径:`seed_defaults` 对本地目标执行;为兼容老表状态约束,初始管理员状态写入值统一为 `ENABLED`(不使用 `active`)。 - 用户表兼容口径:用户主键列对齐旧库 `users.user_id`,与用户关联的外键统一引用 `users.user_id`(不再引用 `users.id`)。 + +## 高程数据管理口径(2026-05-01) + +- 高程管理一期采用“文件 + 元数据 + 异步回填”三层: + - 文件层:高程源数据文件由现有文件管理模块承载(`mount_code + file_path`)。 + - 元数据层:`elevation_dataset` 记录数据集来源、分辨率、样本统计、覆盖 bbox、状态。 + - 任务层:`elevation_apply_job` 记录线路高程回填任务状态与统计。 +- 线路渲染继续复用 `power_line_tower.altitude_m` 作为高度来源;回填后在 `raw_extra_json.elevation` 写入溯源信息(dataset/sample_distance/sampled_at)。 +- 一期采样策略为 CSV 点集最近邻(非栅格插值),用于先打通管理与回填闭环;默认推荐回填模式 `fill_null_only`,避免覆盖人工高程。 +- 高频通知 topic 为 `admin.elevation`,线路联动通知沿用 `admin.power-lines`。 - 用户名列口径:历史环境存在 `users.username` 与 `users.user_name` 双形态;运行时通过 `USER_USERNAME_COLUMN`(`username`/`user_name`)与目标库对齐,避免启动阶段关系预加载触发 `UndefinedColumn`。 - 密码列口径:历史环境存在 `users.password` 与 `users.password_hash` 双形态;运行时通过 `USER_PASSWORD_COLUMN`(`password`/`password_hash`)与目标库对齐,避免启动阶段关系预加载触发 `UndefinedColumn`。 diff --git a/api/app/api/router.py b/api/app/api/router.py index a3c74bc..91374ce 100644 --- a/api/app/api/router.py +++ b/api/app/api/router.py @@ -5,6 +5,7 @@ from .v1.admin_files import router as admin_files_router from .v1.atp_models import router as atp_models_router from .v1.auth import router as auth_router from .v1.diary import router as diary_router +from .v1.elevation import router as elevation_router from .v1.life_countdown import router as life_countdown_router from .v1.lightning import router as lightning_router from .v1.lines import router as lines_router @@ -27,6 +28,7 @@ v1_router.include_router(task_monitor_router) v1_router.include_router(token_usage_router) v1_router.include_router(system_params_router) v1_router.include_router(diary_router) +v1_router.include_router(elevation_router) v1_router.include_router(life_countdown_router) v1_router.include_router(lightning_router) v1_router.include_router(lines_router) diff --git a/api/app/api/v1/elevation.py b/api/app/api/v1/elevation.py new file mode 100644 index 0000000..0f40bf9 --- /dev/null +++ b/api/app/api/v1/elevation.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +from fastapi import APIRouter, Depends, HTTPException, Query, status +from sqlalchemy.orm import Session + +from ...core.database import get_db +from ...core.dependencies import CurrentUser, require_any_permission, require_permission +from ...schemas.elevation import ( + ElevationApplyJobCreateRequest, + ElevationApplyJobCreateResponse, + ElevationApplyJobListResponse, + ElevationApplyJobSummary, + ElevationDatasetAnalyzeResponse, + ElevationDatasetCreateRequest, + ElevationDatasetListResponse, + ElevationDatasetSummary, + ElevationDatasetUpdateRequest, +) +from ...services.elevation_service import ( + analyze_dataset, + create_apply_job, + create_dataset, + get_job_by_id, + list_datasets, + list_jobs, + serialize_job, + update_dataset, +) + +router = APIRouter(prefix="/elevation", tags=["elevation"]) + + +@router.get("/datasets", response_model=ElevationDatasetListResponse) +def get_elevation_datasets( + keyword: str | None = Query(default=None), + status_filter: str | None = Query(default=None, alias="status"), + _: CurrentUser = Depends(require_any_permission("elevation.read", "elevation.manage")), + db: Session = Depends(get_db), +) -> ElevationDatasetListResponse: + return list_datasets( + db, + keyword=keyword, + status_filter=status_filter, + ) + + +@router.post("/datasets", response_model=ElevationDatasetSummary) +def create_elevation_dataset( + payload: ElevationDatasetCreateRequest, + current_user: CurrentUser = Depends(require_permission("elevation.manage")), + db: Session = Depends(get_db), +) -> ElevationDatasetSummary: + created = create_dataset(db, payload, actor=current_user.user) + if not created: + raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="高程数据集编码已存在") + return created + + +@router.patch("/datasets/{dataset_id}", response_model=ElevationDatasetSummary) +def update_elevation_dataset( + dataset_id: str, + payload: ElevationDatasetUpdateRequest, + current_user: CurrentUser = Depends(require_permission("elevation.manage")), + db: Session = Depends(get_db), +) -> ElevationDatasetSummary: + updated = update_dataset(db, dataset_id, payload, actor=current_user.user) + if not updated: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="高程数据集不存在") + return updated + + +@router.post("/datasets/{dataset_id}/analyze", response_model=ElevationDatasetAnalyzeResponse) +def analyze_elevation_dataset( + dataset_id: str, + current_user: CurrentUser = Depends(require_permission("elevation.manage")), + db: Session = Depends(get_db), +) -> ElevationDatasetAnalyzeResponse: + return analyze_dataset(db, dataset_id=dataset_id, actor=current_user.user) + + +@router.get("/jobs", response_model=ElevationApplyJobListResponse) +def get_elevation_jobs( + line_id: str | None = Query(default=None), + dataset_id: str | None = Query(default=None), + status_filter: str | None = Query(default=None, alias="status"), + limit: int = Query(default=50, ge=1, le=200), + _: CurrentUser = Depends(require_any_permission("elevation.read", "elevation.manage")), + db: Session = Depends(get_db), +) -> ElevationApplyJobListResponse: + return list_jobs( + db, + line_id=line_id, + dataset_id=dataset_id, + status_filter=status_filter, + limit=limit, + ) + + +@router.get("/jobs/{job_id}", response_model=ElevationApplyJobSummary) +def get_elevation_job_detail( + job_id: str, + _: CurrentUser = Depends(require_any_permission("elevation.read", "elevation.manage")), + db: Session = Depends(get_db), +) -> ElevationApplyJobSummary: + item = get_job_by_id(db, job_id) + if not item: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="高程回填任务不存在") + return serialize_job(item) + + +@router.post("/jobs/apply-line", response_model=ElevationApplyJobCreateResponse) +def create_elevation_apply_line_job( + payload: ElevationApplyJobCreateRequest, + current_user: CurrentUser = Depends(require_permission("elevation.manage")), + db: Session = Depends(get_db), +) -> ElevationApplyJobCreateResponse: + return create_apply_job(db, payload, actor=current_user.user) diff --git a/api/app/core/celery_app.py b/api/app/core/celery_app.py index b0ffe58..9754156 100644 --- a/api/app/core/celery_app.py +++ b/api/app/core/celery_app.py @@ -10,7 +10,7 @@ celery_app = Celery( "fquiz", broker=settings.resolved_celery_broker_url, backend=settings.resolved_celery_result_backend, - include=["app.tasks.schedule_tasks"], + include=["app.tasks.schedule_tasks", "app.tasks.elevation_tasks"], ) celery_app.conf.update( diff --git a/api/app/core/database.py b/api/app/core/database.py index e8da541..1a9bf6e 100644 --- a/api/app/core/database.py +++ b/api/app/core/database.py @@ -200,6 +200,7 @@ def init_db() -> None: auth_session, calendar_event, diary, + elevation, file_storage, hot_search, life_countdown, diff --git a/api/app/models/__init__.py b/api/app/models/__init__.py index 33776dc..8f35cfc 100644 --- a/api/app/models/__init__.py +++ b/api/app/models/__init__.py @@ -4,7 +4,7 @@ Import all model modules during package initialization so SQLAlchemy can resolve string-based relationships regardless of route/service import order. """ -from . import atp_model, audit_log, auth_session, calendar_event, diary, file_storage, hot_search, life_countdown, lightning_event, lightning_sample, line, line_tower, menu, model_registry, object_group, question_bank, rbac, requirement, system_param, todo, user, vocabulary_word +from . import atp_model, audit_log, auth_session, calendar_event, diary, elevation, file_storage, hot_search, life_countdown, lightning_event, lightning_sample, line, line_tower, menu, model_registry, object_group, question_bank, rbac, requirement, system_param, todo, user, vocabulary_word __all__ = [ "atp_model", @@ -12,6 +12,7 @@ __all__ = [ "auth_session", "calendar_event", "diary", + "elevation", "file_storage", "hot_search", "life_countdown", diff --git a/api/app/models/elevation.py b/api/app/models/elevation.py new file mode 100644 index 0000000..932ef21 --- /dev/null +++ b/api/app/models/elevation.py @@ -0,0 +1,99 @@ +from __future__ import annotations + +from datetime import datetime +from typing import TYPE_CHECKING +from uuid import uuid4 + +from sqlalchemy import DateTime, Float, ForeignKey, Index, Integer, String, Text +from sqlalchemy.orm import Mapped, mapped_column, relationship + +from ..core.database import Base +from .base import utcnow + +if TYPE_CHECKING: + from .line import Line + + +class ElevationDataset(Base): + __tablename__ = "elevation_dataset" + __table_args__ = ( + Index("idx_elevation_dataset_status", "status"), + Index("idx_elevation_dataset_mount_code", "mount_code"), + ) + + id: Mapped[str] = mapped_column( + String(32), + primary_key=True, + default=lambda: uuid4().hex, + ) + code: Mapped[str] = mapped_column(String(64), unique=True, nullable=False, index=True) + name: Mapped[str] = mapped_column(String(255), nullable=False) + source: Mapped[str | None] = mapped_column(String(128), index=True) + file_format: Mapped[str] = mapped_column(String(32), default="csv", index=True) + mount_code: Mapped[str] = mapped_column(String(64), nullable=False, index=True) + file_path: Mapped[str] = mapped_column(String(2048), nullable=False) + resolution_m: Mapped[float | None] = mapped_column(Float) + status: Mapped[str] = mapped_column(String(32), default="active", index=True) + sample_count: Mapped[int] = mapped_column(Integer, default=0) + bbox_min_lon: Mapped[float | None] = mapped_column(Float) + bbox_max_lon: Mapped[float | None] = mapped_column(Float) + bbox_min_lat: Mapped[float | None] = mapped_column(Float) + bbox_max_lat: Mapped[float | None] = mapped_column(Float) + notes: Mapped[str | None] = mapped_column(Text) + create_date: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, index=True) + create_user: Mapped[str | None] = mapped_column(String(64), index=True) + update_date: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + default=utcnow, + onupdate=utcnow, + ) + update_user: Mapped[str | None] = mapped_column(String(64), index=True) + + +class ElevationApplyJob(Base): + __tablename__ = "elevation_apply_job" + __table_args__ = ( + Index("idx_elevation_apply_job_status", "status"), + Index("idx_elevation_apply_job_line", "line_id"), + Index("idx_elevation_apply_job_dataset", "dataset_id"), + ) + + id: Mapped[str] = mapped_column( + String(32), + primary_key=True, + default=lambda: uuid4().hex, + ) + line_id: Mapped[str] = mapped_column( + String(32), + ForeignKey("power_line.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + dataset_id: Mapped[str] = mapped_column( + String(32), + ForeignKey("elevation_dataset.id", ondelete="CASCADE"), + nullable=False, + index=True, + ) + mode: Mapped[str] = mapped_column(String(32), default="fill_null_only", index=True) + status: Mapped[str] = mapped_column(String(32), default="pending", index=True) + task_id: Mapped[str | None] = mapped_column(String(128), index=True) + total_tower_count: Mapped[int] = mapped_column(Integer, default=0) + updated_tower_count: Mapped[int] = mapped_column(Integer, default=0) + skipped_tower_count: Mapped[int] = mapped_column(Integer, default=0) + missing_geo_count: Mapped[int] = mapped_column(Integer, default=0) + unmatched_count: Mapped[int] = mapped_column(Integer, default=0) + error_message: Mapped[str | None] = mapped_column(Text) + started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + finished_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + create_date: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, index=True) + create_user: Mapped[str | None] = mapped_column(String(64), index=True) + update_date: Mapped[datetime] = mapped_column( + DateTime(timezone=True), + default=utcnow, + onupdate=utcnow, + ) + update_user: Mapped[str | None] = mapped_column(String(64), index=True) + + line: Mapped[Line] = relationship("Line", lazy="selectin") + dataset: Mapped[ElevationDataset] = relationship("ElevationDataset", lazy="selectin") diff --git a/api/app/schemas/elevation.py b/api/app/schemas/elevation.py new file mode 100644 index 0000000..e048773 --- /dev/null +++ b/api/app/schemas/elevation.py @@ -0,0 +1,102 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Literal + +from pydantic import BaseModel, Field + + +ElevationDatasetStatus = Literal["active", "disabled"] +ElevationApplyMode = Literal["fill_null_only", "overwrite_all"] +ElevationApplyJobStatus = Literal["pending", "running", "success", "failed"] + + +class ElevationDatasetSummary(BaseModel): + id: str + code: str + name: str + source: str | None = None + file_format: str + mount_code: str + file_path: str + resolution_m: float | None = None + status: ElevationDatasetStatus + sample_count: int = 0 + bbox_min_lon: float | None = None + bbox_max_lon: float | None = None + bbox_min_lat: float | None = None + bbox_max_lat: float | None = None + notes: str | None = None + create_date: datetime + create_user: str | None = None + update_date: datetime + update_user: str | None = None + + +class ElevationDatasetListResponse(BaseModel): + items: list[ElevationDatasetSummary] + total: int + + +class ElevationDatasetCreateRequest(BaseModel): + code: str = Field(min_length=2, max_length=64) + name: str = Field(min_length=2, max_length=255) + source: str | None = Field(default=None, max_length=128) + mount_code: str = Field(min_length=2, max_length=64) + file_path: str = Field(min_length=1, max_length=2048) + resolution_m: float | None = Field(default=None, gt=0) + notes: str | None = Field(default=None, max_length=2000) + + +class ElevationDatasetUpdateRequest(BaseModel): + name: str | None = Field(default=None, min_length=2, max_length=255) + source: str | None = Field(default=None, max_length=128) + resolution_m: float | None = Field(default=None, gt=0) + status: ElevationDatasetStatus | None = None + notes: str | None = Field(default=None, max_length=2000) + + +class ElevationDatasetAnalyzeResponse(BaseModel): + dataset: ElevationDatasetSummary + warnings: list[str] = Field(default_factory=list) + + +class ElevationApplyJobSummary(BaseModel): + id: str + line_id: str + line_code: str | None = None + line_name: str | None = None + dataset_id: str + dataset_code: str | None = None + dataset_name: str | None = None + mode: ElevationApplyMode + status: ElevationApplyJobStatus + task_id: str | None = None + total_tower_count: int = 0 + updated_tower_count: int = 0 + skipped_tower_count: int = 0 + missing_geo_count: int = 0 + unmatched_count: int = 0 + error_message: str | None = None + started_at: datetime | None = None + finished_at: datetime | None = None + create_date: datetime + create_user: str | None = None + update_date: datetime + update_user: str | None = None + + +class ElevationApplyJobListResponse(BaseModel): + items: list[ElevationApplyJobSummary] + total: int + + +class ElevationApplyJobCreateRequest(BaseModel): + line_id: str = Field(min_length=1, max_length=64) + dataset_id: str = Field(min_length=1, max_length=64) + mode: ElevationApplyMode = "fill_null_only" + + +class ElevationApplyJobCreateResponse(BaseModel): + job: ElevationApplyJobSummary + queued: bool = True diff --git a/api/app/services/admin_service.py b/api/app/services/admin_service.py index a45b58d..03587e8 100644 --- a/api/app/services/admin_service.py +++ b/api/app/services/admin_service.py @@ -403,7 +403,7 @@ def update_menu(db: Session, menu_id: int, payload: MenuUpdateRequest) -> MenuPu def delete_menu(db: Session, menu_id: int) -> bool: menu = get_menu_by_id(db, menu_id) - if not menu or menu.code in {"dashboard", "admin.users", "admin.roles", "admin.menus", "admin.system_params", "admin.power_lines", "admin.lightning_currents", "admin.lightning_distribution", "admin.task_monitor", "admin.atp_models", "admin.files", "admin.syslog", "admin.wine_runner"}: + if not menu or menu.code in {"dashboard", "admin.users", "admin.roles", "admin.menus", "admin.system_params", "admin.power_lines", "admin.lightning_currents", "admin.lightning_distribution", "admin.task_monitor", "admin.atp_models", "admin.files", "admin.elevation", "admin.syslog", "admin.wine_runner"}: return False child_exists = db.scalar(select(Menu.id).where(Menu.parent_id == menu_id)) if child_exists is not None: diff --git a/api/app/services/elevation_service.py b/api/app/services/elevation_service.py new file mode 100644 index 0000000..b566d41 --- /dev/null +++ b/api/app/services/elevation_service.py @@ -0,0 +1,691 @@ +from __future__ import annotations + +import asyncio +import csv +import io +from dataclasses import dataclass +from typing import Any + +from fastapi import HTTPException, status +from sqlalchemy import func, select +from sqlalchemy.orm import Session + +from ..core.database import SessionLocal +from ..models.base import utcnow +from ..models.elevation import ElevationApplyJob, ElevationDataset +from ..models.line import Line +from ..models.line_tower import LineTower +from ..models.user import User +from ..schemas.elevation import ( + ElevationApplyJobCreateRequest, + ElevationApplyJobCreateResponse, + ElevationApplyJobListResponse, + ElevationApplyJobSummary, + ElevationDatasetAnalyzeResponse, + ElevationDatasetCreateRequest, + ElevationDatasetListResponse, + ElevationDatasetSummary, + ElevationDatasetUpdateRequest, +) +from .file_service import _build_driver_or_400, _require_mount +from .push_service import publish_topic +from .storage_driver import StorageInvalidPathError, StoragePathNotFoundError + +ELEVATION_TOPIC = "admin.elevation" +POWER_LINES_TOPIC = "admin.power-lines" +CSV_ENCODINGS = ("utf-8-sig", "utf-8", "gbk", "latin-1") +NEAREST_MATCH_MAX_DISTANCE_M = 2000.0 + + +@dataclass +class ElevationSamplePoint: + lon: float + lat: float + altitude_m: float + + +def serialize_dataset(item: ElevationDataset) -> ElevationDatasetSummary: + return ElevationDatasetSummary( + id=item.id, + code=item.code, + name=item.name, + source=item.source, + file_format=item.file_format, + mount_code=item.mount_code, + file_path=item.file_path, + resolution_m=item.resolution_m, + status=item.status, # type: ignore[arg-type] + sample_count=item.sample_count, + bbox_min_lon=item.bbox_min_lon, + bbox_max_lon=item.bbox_max_lon, + bbox_min_lat=item.bbox_min_lat, + bbox_max_lat=item.bbox_max_lat, + notes=item.notes, + create_date=item.create_date, + create_user=item.create_user, + update_date=item.update_date, + update_user=item.update_user, + ) + + +def serialize_job(item: ElevationApplyJob) -> ElevationApplyJobSummary: + line = item.line + dataset = item.dataset + return ElevationApplyJobSummary( + id=item.id, + line_id=item.line_id, + line_code=line.code if line else None, + line_name=line.name if line else None, + dataset_id=item.dataset_id, + dataset_code=dataset.code if dataset else None, + dataset_name=dataset.name if dataset else None, + mode=item.mode, # type: ignore[arg-type] + status=item.status, # type: ignore[arg-type] + task_id=item.task_id, + total_tower_count=item.total_tower_count, + updated_tower_count=item.updated_tower_count, + skipped_tower_count=item.skipped_tower_count, + missing_geo_count=item.missing_geo_count, + unmatched_count=item.unmatched_count, + error_message=item.error_message, + started_at=item.started_at, + finished_at=item.finished_at, + create_date=item.create_date, + create_user=item.create_user, + update_date=item.update_date, + update_user=item.update_user, + ) + + +def list_datasets( + db: Session, + *, + keyword: str | None, + status_filter: str | None, +) -> ElevationDatasetListResponse: + stmt = select(ElevationDataset) + total_stmt = select(func.count()).select_from(ElevationDataset) + + normalized_keyword = (keyword or "").strip() + if normalized_keyword: + like = f"%{normalized_keyword}%" + predicate = ( + ElevationDataset.code.ilike(like) + | ElevationDataset.name.ilike(like) + | ElevationDataset.source.ilike(like) + ) + stmt = stmt.where(predicate) + total_stmt = total_stmt.where(predicate) + + if status_filter in {"active", "disabled"}: + stmt = stmt.where(ElevationDataset.status == status_filter) + total_stmt = total_stmt.where(ElevationDataset.status == status_filter) + + total = int(db.scalar(total_stmt) or 0) + items = db.execute( + stmt.order_by(ElevationDataset.update_date.desc(), ElevationDataset.code.asc()) + ).scalars().all() + return ElevationDatasetListResponse( + items=[serialize_dataset(item) for item in items], + total=total, + ) + + +def get_dataset_by_id(db: Session, dataset_id: str) -> ElevationDataset | None: + return db.execute( + select(ElevationDataset).where(ElevationDataset.id == dataset_id) + ).scalar_one_or_none() + + +def get_dataset_by_code(db: Session, code: str) -> ElevationDataset | None: + normalized = code.strip() + if not normalized: + return None + return db.execute( + select(ElevationDataset).where( + func.lower(ElevationDataset.code) == normalized.lower() + ) + ).scalar_one_or_none() + + +def create_dataset( + db: Session, + payload: ElevationDatasetCreateRequest, + *, + actor: User, +) -> ElevationDatasetSummary | None: + if get_dataset_by_code(db, payload.code): + return None + + _ensure_dataset_file_exists(db, mount_code=payload.mount_code, file_path=payload.file_path) + + now = utcnow() + item = ElevationDataset( + code=payload.code.strip(), + name=payload.name.strip(), + source=_normalize_str(payload.source), + file_format="csv", + mount_code=payload.mount_code.strip(), + file_path=payload.file_path.strip(), + resolution_m=payload.resolution_m, + status="active", + notes=_normalize_str(payload.notes), + create_date=now, + create_user=actor.id, + update_date=now, + update_user=actor.id, + ) + db.add(item) + db.commit() + saved = get_dataset_by_id(db, item.id) + if not saved: + return None + + _publish_elevation_change( + "elevation.dataset.created", + {"action": "dataset_created", "dataset_id": saved.id}, + ) + return serialize_dataset(saved) + + +def update_dataset( + db: Session, + dataset_id: str, + payload: ElevationDatasetUpdateRequest, + *, + actor: User, +) -> ElevationDatasetSummary | None: + item = get_dataset_by_id(db, dataset_id) + if not item: + return None + + update_data = payload.model_dump(exclude_unset=True) + if "name" in update_data and update_data["name"] is not None: + item.name = str(update_data["name"]).strip() + if "source" in update_data: + item.source = _normalize_str(update_data["source"]) + if "resolution_m" in update_data: + item.resolution_m = update_data["resolution_m"] + if "status" in update_data and update_data["status"] is not None: + item.status = str(update_data["status"]).strip().lower() + if "notes" in update_data: + item.notes = _normalize_str(update_data["notes"]) + + item.update_user = actor.id + item.update_date = utcnow() + db.commit() + saved = get_dataset_by_id(db, dataset_id) + if not saved: + return None + _publish_elevation_change( + "elevation.dataset.updated", + {"action": "dataset_updated", "dataset_id": saved.id}, + ) + return serialize_dataset(saved) + + +def analyze_dataset( + db: Session, + *, + dataset_id: str, + actor: User, +) -> ElevationDatasetAnalyzeResponse: + item = get_dataset_by_id(db, dataset_id) + if not item: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="高程数据集不存在") + if item.status != "active": + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="高程数据集未启用") + + sample_points, warnings = _load_dataset_points(db, item) + stats = _compute_dataset_stats(sample_points) + + item.sample_count = stats["sample_count"] + item.bbox_min_lon = stats["bbox_min_lon"] + item.bbox_max_lon = stats["bbox_max_lon"] + item.bbox_min_lat = stats["bbox_min_lat"] + item.bbox_max_lat = stats["bbox_max_lat"] + item.update_user = actor.id + item.update_date = utcnow() + db.commit() + + saved = get_dataset_by_id(db, dataset_id) + if not saved: + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="高程数据集分析保存失败") + + _publish_elevation_change( + "elevation.dataset.analyzed", + {"action": "dataset_analyzed", "dataset_id": saved.id}, + ) + return ElevationDatasetAnalyzeResponse( + dataset=serialize_dataset(saved), + warnings=warnings, + ) + + +def list_jobs( + db: Session, + *, + line_id: str | None, + dataset_id: str | None, + status_filter: str | None, + limit: int, +) -> ElevationApplyJobListResponse: + stmt = select(ElevationApplyJob) + total_stmt = select(func.count()).select_from(ElevationApplyJob) + + if line_id: + stmt = stmt.where(ElevationApplyJob.line_id == line_id) + total_stmt = total_stmt.where(ElevationApplyJob.line_id == line_id) + if dataset_id: + stmt = stmt.where(ElevationApplyJob.dataset_id == dataset_id) + total_stmt = total_stmt.where(ElevationApplyJob.dataset_id == dataset_id) + if status_filter in {"pending", "running", "success", "failed"}: + stmt = stmt.where(ElevationApplyJob.status == status_filter) + total_stmt = total_stmt.where(ElevationApplyJob.status == status_filter) + + total = int(db.scalar(total_stmt) or 0) + items = db.execute( + stmt.order_by(ElevationApplyJob.create_date.desc(), ElevationApplyJob.id.desc()).limit(limit) + ).scalars().all() + return ElevationApplyJobListResponse( + items=[serialize_job(item) for item in items], + total=total, + ) + + +def get_job_by_id(db: Session, job_id: str) -> ElevationApplyJob | None: + return db.execute( + select(ElevationApplyJob).where(ElevationApplyJob.id == job_id) + ).scalar_one_or_none() + + +def create_apply_job( + db: Session, + payload: ElevationApplyJobCreateRequest, + *, + actor: User, +) -> ElevationApplyJobCreateResponse: + line = db.execute(select(Line).where(Line.id == payload.line_id)).scalar_one_or_none() + if not line: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="线路不存在") + + dataset = get_dataset_by_id(db, payload.dataset_id) + if not dataset: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="高程数据集不存在") + if dataset.status != "active": + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="高程数据集未启用") + + allowed_modes = {"fill_null_only", "overwrite_all"} + if payload.mode not in allowed_modes: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="不支持的回填模式") + + total_tower_count = int( + db.scalar( + select(func.count()) + .select_from(LineTower) + .where(LineTower.line_id == line.id) + ) + or 0 + ) + if total_tower_count <= 0: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="当前线路没有可回填的杆塔数据") + + now = utcnow() + job = ElevationApplyJob( + line_id=line.id, + dataset_id=dataset.id, + mode=payload.mode, + status="pending", + total_tower_count=total_tower_count, + create_date=now, + create_user=actor.id, + update_date=now, + update_user=actor.id, + ) + db.add(job) + db.commit() + saved = get_job_by_id(db, job.id) + if not saved: + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="创建任务失败") + + from ..tasks.elevation_tasks import apply_elevation_for_line_job + + task = apply_elevation_for_line_job.delay(saved.id) + saved.task_id = task.id + saved.update_user = actor.id + saved.update_date = utcnow() + db.commit() + + latest = get_job_by_id(db, saved.id) + if not latest: + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="任务派发失败") + + _publish_elevation_change( + "elevation.job.created", + {"action": "job_created", "job_id": latest.id, "line_id": latest.line_id}, + ) + return ElevationApplyJobCreateResponse(job=serialize_job(latest), queued=True) + + +def execute_apply_job(job_id: str) -> None: + db = SessionLocal() + try: + job = get_job_by_id(db, job_id) + if not job: + return + if job.status in {"success", "failed"}: + return + + job.status = "running" + job.started_at = utcnow() + job.update_date = utcnow() + db.commit() + _publish_elevation_change( + "elevation.job.running", + {"action": "job_running", "job_id": job.id, "line_id": job.line_id}, + ) + + line = db.execute(select(Line).where(Line.id == job.line_id)).scalar_one_or_none() + dataset = get_dataset_by_id(db, job.dataset_id) + if not line or not dataset: + job.status = "failed" + job.error_message = "线路或高程数据集不存在" + job.finished_at = utcnow() + job.update_date = utcnow() + db.commit() + _publish_elevation_change( + "elevation.job.failed", + {"action": "job_failed", "job_id": job.id, "line_id": job.line_id}, + ) + return + + points, warnings = _load_dataset_points(db, dataset) + if warnings: + warning_note = "; ".join(warnings[:5]) + else: + warning_note = None + stats = _apply_points_to_line_towers( + db, + line_id=line.id, + dataset=dataset, + mode=job.mode, + points=points, + ) + job.updated_tower_count = stats["updated_tower_count"] + job.skipped_tower_count = stats["skipped_tower_count"] + job.missing_geo_count = stats["missing_geo_count"] + job.unmatched_count = stats["unmatched_count"] + job.status = "success" + job.error_message = warning_note + job.finished_at = utcnow() + job.update_date = utcnow() + db.commit() + + _publish_elevation_change( + "elevation.job.success", + { + "action": "job_success", + "job_id": job.id, + "line_id": line.id, + "updated_tower_count": job.updated_tower_count, + "skipped_tower_count": job.skipped_tower_count, + }, + ) + _publish_line_change( + "power-lines.elevation.updated", + {"action": "elevation_updated", "line_id": line.id, "job_id": job.id}, + ) + except Exception as exc: + db.rollback() + failed = get_job_by_id(db, job_id) + if failed: + failed.status = "failed" + failed.error_message = str(exc) + failed.finished_at = utcnow() + failed.update_date = utcnow() + db.commit() + _publish_elevation_change( + "elevation.job.failed", + {"action": "job_failed", "job_id": failed.id, "line_id": failed.line_id}, + ) + raise + finally: + db.close() + + +def _ensure_dataset_file_exists(db: Session, *, mount_code: str, file_path: str) -> None: + mount = _require_mount(db, mount_code.strip()) + driver = _build_driver_or_400(mount) + try: + driver.read_file(file_path.strip()) + except StoragePathNotFoundError as exc: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"数据文件不存在: {file_path}") from exc + except StorageInvalidPathError as exc: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc + + +def _load_dataset_points( + db: Session, + dataset: ElevationDataset, +) -> tuple[list[ElevationSamplePoint], list[str]]: + mount = _require_mount(db, dataset.mount_code) + driver = _build_driver_or_400(mount) + try: + read_result = driver.read_file(dataset.file_path) + except StoragePathNotFoundError as exc: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"高程数据文件不存在: {dataset.file_path}") from exc + except StorageInvalidPathError as exc: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc + text = _decode_csv_bytes(read_result.content) + rows = list(csv.DictReader(io.StringIO(text))) + if not rows: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="高程数据文件为空") + + points: list[ElevationSamplePoint] = [] + warnings: list[str] = [] + for index, row in enumerate(rows, start=2): + lon = _pick_float(row, ["longitude", "lon", "lng", "经度"]) + lat = _pick_float(row, ["latitude", "lat", "纬度"]) + altitude = _pick_float(row, ["altitude_m", "altitude", "elevation", "dem", "海拔m", "高程"]) + if lon is None or lat is None or altitude is None: + warnings.append(f"第 {index} 行缺少经纬度或高程,已忽略") + continue + if lon < -180 or lon > 180 or lat < -90 or lat > 90: + warnings.append(f"第 {index} 行经纬度越界,已忽略") + continue + points.append(ElevationSamplePoint(lon=lon, lat=lat, altitude_m=altitude)) + + if not points: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="高程数据文件没有有效样本点") + return points, warnings + + +def _compute_dataset_stats(points: list[ElevationSamplePoint]) -> dict[str, float | int]: + lon_values = [item.lon for item in points] + lat_values = [item.lat for item in points] + return { + "sample_count": len(points), + "bbox_min_lon": min(lon_values), + "bbox_max_lon": max(lon_values), + "bbox_min_lat": min(lat_values), + "bbox_max_lat": max(lat_values), + } + + +def _apply_points_to_line_towers( + db: Session, + *, + line_id: str, + dataset: ElevationDataset, + mode: str, + points: list[ElevationSamplePoint], +) -> dict[str, int]: + towers = db.execute( + select(LineTower) + .where(LineTower.line_id == line_id) + .order_by(LineTower.seq_no.asc(), LineTower.id.asc()) + ).scalars().all() + + updated_tower_count = 0 + skipped_tower_count = 0 + missing_geo_count = 0 + unmatched_count = 0 + + for tower in towers: + if tower.longitude is None or tower.latitude is None: + missing_geo_count += 1 + continue + if mode == "fill_null_only" and tower.altitude_m is not None: + skipped_tower_count += 1 + continue + + match = _find_nearest_point( + lon=float(tower.longitude), + lat=float(tower.latitude), + points=points, + ) + if match is None: + unmatched_count += 1 + continue + + altitude, distance_m = match + if distance_m > NEAREST_MATCH_MAX_DISTANCE_M: + unmatched_count += 1 + continue + + tower.altitude_m = round(altitude, 3) + raw_extra = dict(tower.raw_extra_json or {}) + raw_extra["elevation"] = { + "dataset_id": dataset.id, + "dataset_code": dataset.code, + "sample_method": "nearest", + "sample_distance_m": round(distance_m, 3), + "sampled_at": utcnow().isoformat(), + } + tower.raw_extra_json = raw_extra + tower.update_date = utcnow() + updated_tower_count += 1 + + db.commit() + return { + "updated_tower_count": updated_tower_count, + "skipped_tower_count": skipped_tower_count, + "missing_geo_count": missing_geo_count, + "unmatched_count": unmatched_count, + } + + +def _find_nearest_point( + *, + lon: float, + lat: float, + points: list[ElevationSamplePoint], +) -> tuple[float, float] | None: + best_altitude: float | None = None + best_distance: float | None = None + + for point in points: + distance = _haversine_distance_m( + lon_a=lon, + lat_a=lat, + lon_b=point.lon, + lat_b=point.lat, + ) + if best_distance is None or distance < best_distance: + best_distance = distance + best_altitude = point.altitude_m + + if best_altitude is None or best_distance is None: + return None + return best_altitude, best_distance + + +def _haversine_distance_m( + *, + lon_a: float, + lat_a: float, + lon_b: float, + lat_b: float, +) -> float: + import math + + r = 6371000.0 + lon1 = math.radians(lon_a) + lat1 = math.radians(lat_a) + lon2 = math.radians(lon_b) + lat2 = math.radians(lat_b) + d_lon = lon2 - lon1 + d_lat = lat2 - lat1 + + h = ( + math.sin(d_lat / 2) ** 2 + + math.cos(lat1) * math.cos(lat2) * math.sin(d_lon / 2) ** 2 + ) + return 2 * r * math.asin(min(1.0, math.sqrt(h))) + + +def _pick_float(row: dict[str, Any], keys: list[str]) -> float | None: + for key in keys: + value = row.get(key) + number = _parse_float(value) + if number is not None: + return number + return None + + +def _decode_csv_bytes(content: bytes) -> str: + for encoding in CSV_ENCODINGS: + try: + return content.decode(encoding) + except UnicodeDecodeError: + continue + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="CSV 编码不受支持") + + +def _normalize_str(value: Any) -> str | None: + if value is None: + return None + text = str(value).strip() + return text or None + + +def _parse_float(value: Any) -> float | None: + text = _normalize_str(value) + if text is None: + return None + try: + return float(text) + except ValueError: + return None + + +def _publish_elevation_change(event_name: str, payload: dict[str, Any]) -> None: + _fire_and_forget( + publish_topic( + ELEVATION_TOPIC, + name=event_name, + payload=payload, + requires_refetch=["/api/v1/elevation/datasets", "/api/v1/elevation/jobs"], + dedupe_key=f"{event_name}:{payload.get('job_id') or payload.get('dataset_id') or 'unknown'}", + ) + ) + + +def _publish_line_change(event_name: str, payload: dict[str, Any]) -> None: + _fire_and_forget( + publish_topic( + POWER_LINES_TOPIC, + name=event_name, + payload=payload, + requires_refetch=["/api/v1/lines"], + dedupe_key=f"{event_name}:{payload.get('line_id', 'unknown')}", + ) + ) + + +def _fire_and_forget(coro: object) -> None: + try: + loop = asyncio.get_running_loop() + except RuntimeError: + return + loop.create_task(coro) diff --git a/api/app/services/legacy_admin_rbac_service.py b/api/app/services/legacy_admin_rbac_service.py index d152110..40b4076 100644 --- a/api/app/services/legacy_admin_rbac_service.py +++ b/api/app/services/legacy_admin_rbac_service.py @@ -76,6 +76,7 @@ PROTECTED_MENU_CODES = { "admin.system_params", "admin.wxapp", "admin.files", + "admin.elevation", "admin.filedetector", "admin.baidu_pan", "admin.power_lines", diff --git a/api/app/services/legacy_authz_service.py b/api/app/services/legacy_authz_service.py index 7531ad6..311f68a 100644 --- a/api/app/services/legacy_authz_service.py +++ b/api/app/services/legacy_authz_service.py @@ -26,6 +26,8 @@ DEFAULT_ADMIN_PERMISSION_CODES: set[str] = { "tower.manage", "lightning.read", "lightning.manage", + "elevation.read", + "elevation.manage", "atp.read", "atp.manage", "atp.run", @@ -97,6 +99,7 @@ MENU_CODE_PERMISSION_MAP: dict[str, set[str]] = { "admin.menus": {"menu.read", "menu.manage"}, "admin.system_params": {"system_param.read", "system_param.manage"}, "admin.files": {"file.read", "file.manage"}, + "admin.elevation": {"elevation.read", "elevation.manage"}, "admin.task_monitor": {"celery.read", "celery.manage"}, "admin.atp_models": {"atp.read", "atp.manage", "atp.run"}, "admin.lightning_currents": {"lightning.read", "lightning.manage"}, @@ -117,6 +120,17 @@ SYNTHETIC_LEGACY_MENU_ROWS: list[dict[str, Any]] = [ "seq": 54, "state": "ENABLED", }, + { + "menu_id": "admin.elevation", + "menu_name": "admin.elevation", + "menu_label": "高程数据管理", + "menu_type": "MENU", + "parent_id": None, + "url": "/admin/elevation", + "menu_icon": "Database", + "seq": 56, + "state": "ENABLED", + }, { "menu_id": "admin.task_monitor", "menu_name": "admin.task_monitor", diff --git a/api/app/services/seed_service.py b/api/app/services/seed_service.py index b6c6647..8e375ac 100644 --- a/api/app/services/seed_service.py +++ b/api/app/services/seed_service.py @@ -32,6 +32,8 @@ DEFAULT_PERMISSIONS: dict[str, str] = { "tower.manage": "Manage line towers", "lightning.read": "Read lightning current events and features", "lightning.manage": "Manage lightning current events and data imports", + "elevation.read": "Read elevation datasets and apply jobs", + "elevation.manage": "Manage elevation datasets and run altitude apply jobs", "atp.read": "Read ATP models and versions", "atp.manage": "Manage ATP models and version artifacts", "atp.run": "Run ATP simulations", @@ -66,6 +68,8 @@ DEFAULT_ROLES: dict[str, dict[str, object]] = { "tower.manage", "lightning.read", "lightning.manage", + "elevation.read", + "elevation.manage", "atp.read", "atp.manage", "atp.run", @@ -225,6 +229,19 @@ DEFAULT_MENUS: list[dict[str, object]] = [ "cacheable": False, "permission_code": "file.read", }, + { + "code": "admin.elevation", + "name": "高程数据管理", + "path": "/admin/elevation", + "icon": "Database", + "parent_code": None, + "type": "menu", + "sort_order": 56, + "status": "enabled", + "visible": True, + "cacheable": False, + "permission_code": "elevation.read", + }, { "code": "admin.syslog", "name": "系统日志", @@ -254,7 +271,7 @@ DEFAULT_MENUS: list[dict[str, object]] = [ ] ROLE_MENU_BINDINGS: dict[str, list[str]] = { - "admin": ["dashboard", "admin.users", "admin.roles", "admin.menus", "admin.system_params", "admin.power_lines", "admin.lightning_currents", "admin.lightning_distribution", "admin.task_monitor", "admin.atp_models", "admin.files", "admin.syslog", "admin.wine_runner"], + "admin": ["dashboard", "admin.users", "admin.roles", "admin.menus", "admin.system_params", "admin.power_lines", "admin.lightning_currents", "admin.lightning_distribution", "admin.task_monitor", "admin.atp_models", "admin.files", "admin.elevation", "admin.syslog", "admin.wine_runner"], "user": ["dashboard"], } diff --git a/api/app/services/topic_registry.py b/api/app/services/topic_registry.py index a8d1451..58f1b5f 100644 --- a/api/app/services/topic_registry.py +++ b/api/app/services/topic_registry.py @@ -23,6 +23,7 @@ TOPIC_RULES: dict[str, TopicRule] = { "admin.menus": TopicRule(any_permission_codes={"menu.read", "menu.manage"}), "admin.system-params": TopicRule(any_permission_codes={"system_param.read", "system_param.manage"}), "admin.files": TopicRule(any_permission_codes={"file.read", "file.manage"}), + "admin.elevation": TopicRule(any_permission_codes={"elevation.read", "elevation.manage"}), "admin.atp-models": TopicRule(any_permission_codes={"atp.read", "atp.run", "atp.manage"}), "admin.audit_logs": TopicRule(any_permission_codes={"menu.read", "menu.manage"}), "admin.question_bank": TopicRule(any_permission_codes={"question_bank.read", "question_bank.manage"}), diff --git a/api/app/tasks/elevation_tasks.py b/api/app/tasks/elevation_tasks.py new file mode 100644 index 0000000..b916654 --- /dev/null +++ b/api/app/tasks/elevation_tasks.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +from ..core.celery_app import celery_app +from ..services.elevation_service import execute_apply_job + + +@celery_app.task(name="app.tasks.elevation_tasks.apply_elevation_for_line_job") +def apply_elevation_for_line_job(job_id: str) -> dict[str, str]: + execute_apply_job(job_id) + return {"job_id": job_id, "status": "done"} diff --git a/memory/2026-05-01.md b/memory/2026-05-01.md index 707f7c4..dc51286 100644 --- a/memory/2026-05-01.md +++ b/memory/2026-05-01.md @@ -319,3 +319,69 @@ - 风险与影响: - 影响范围仅为线路管理分布图控件区 UI 与交互;后端接口和数据结构无影响。 + +## Work Log - 新增高程数据管理功能(2026-05-01) + +- 背景: + - 目标是支撑线路走向图高程渲染,提供“高程数据集管理 + 杆塔高程回填任务”闭环能力。 + - 约束:最小改动优先,不引入重型 GDAL 依赖,先落可运行方案。 + +- 本次改动(最小闭环): + - 后端模型与任务: + - 新增 `api/app/models/elevation.py` + - `elevation_dataset`:存高程数据集元信息(挂载、文件路径、分辨率、样本统计、bbox、状态)。 + - `elevation_apply_job`:存线路回填任务(模式、进度统计、状态、错误信息)。 + - 新增 `api/app/tasks/elevation_tasks.py` + - Celery 任务 `apply_elevation_for_line_job`,异步执行指定 job。 + - `api/app/core/celery_app.py` + - Celery include 扩展:加入 `app.tasks.elevation_tasks`。 + - 后端 API 与服务: + - 新增 `api/app/schemas/elevation.py`(dataset/job 请求与响应模型)。 + - 新增 `api/app/services/elevation_service.py`,提供: + - 数据集列表/创建/更新/分析; + - 回填任务列表/详情/创建; + - 回填执行逻辑(最近邻采样 CSV 点,写回 `power_line_tower.altitude_m`)。 + - 回填结果写入 `raw_extra_json.elevation`(数据集来源、采样距离、时间)。 + - 新增 `api/app/api/v1/elevation.py`: + - `GET /api/v1/elevation/datasets` + - `POST /api/v1/elevation/datasets` + - `PATCH /api/v1/elevation/datasets/{dataset_id}` + - `POST /api/v1/elevation/datasets/{dataset_id}/analyze` + - `GET /api/v1/elevation/jobs` + - `GET /api/v1/elevation/jobs/{job_id}` + - `POST /api/v1/elevation/jobs/apply-line` + - `api/app/api/router.py` 注册 elevation 路由。 + - `api/app/core/database.py` + `api/app/models/__init__.py` 注册新模型,确保 `init_db` 自动建表。 + - 权限/菜单/订阅: + - `api/app/services/seed_service.py` + - 新增权限:`elevation.read` / `elevation.manage`。 + - 新增后台菜单:`admin.elevation` -> `/admin/elevation`。 + - admin 默认菜单绑定新增 `admin.elevation`。 + - `api/app/services/legacy_authz_service.py` + - admin 默认权限加入 elevation 权限。 + - `MENU_CODE_PERMISSION_MAP` 增加 `admin.elevation`。 + - legacy synthetic 菜单补齐 `admin.elevation`。 + - `api/app/services/topic_registry.py` + - 新增 topic 规则:`admin.elevation`。 + - `api/app/services/admin_service.py` / `legacy_admin_rbac_service.py` + - 将 `admin.elevation` 设为受保护菜单(不可误删)。 + - 前端页面与类型: + - 新增 `web/src/app/admin/elevation/page.tsx` + - 高程数据集管理(创建、分析、列表)。 + - 回填任务管理(创建、进度/结果查看)。 + - 支持跳转文件管理上传 CSV(复用现有文件系统)。 + - `web/src/types/auth.ts` 增加 elevation 相关类型定义。 + - `web/src/app/admin/page.tsx` 新增“高程数据管理”卡片入口。 + - `web/src/app/admin/menus/page.tsx` 将 `admin.elevation` 加入前端受保护菜单编码集合。 + +- 验证: + - 后端语法编译: + - `python3 -m compileall api/app` -> 通过。 + - 前端构建: + - `npm run build:web` -> 通过。 + - 构建产物中已包含路由:`/admin/elevation`。 + +- 风险与影响: + - 当前实现使用 CSV 点集“最近邻采样”,适合先跑通管理与回填流程;不是严格栅格插值方案。 + - 未引入 GDAL/rasterio,部署更稳,但精度依赖 CSV 样本密度与坐标质量。 + - 回填默认允许 `overwrite_all`,存在覆盖人工高程风险;前端默认展示“仅填空(推荐)”。 diff --git a/web/src/app/admin/elevation/page.tsx b/web/src/app/admin/elevation/page.tsx new file mode 100644 index 0000000..3717b59 --- /dev/null +++ b/web/src/app/admin/elevation/page.tsx @@ -0,0 +1,566 @@ +"use client"; + +import Link from "next/link"; +import { useCallback, useMemo, useState } from "react"; +import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query"; +import { + Alert, + Empty, + Form, + Input, + InputNumber, + Modal, + Select, + Space, + Spin, + Table, + Tag, + Typography, + message, +} from "antd"; +import type { ColumnsType } from "antd/es/table"; + +import { useAuth } from "@/components/auth-provider"; +import { Card } from "@/components/ui-antd"; +import { useTopicSubscription } from "@/hooks/use-topic-subscription"; +import { readApiError } from "@/lib/api"; +import type { + ElevationApplyJobCreateResponse, + ElevationApplyJobListResponse, + ElevationApplyJobSummary, + ElevationDatasetAnalyzeResponse, + ElevationDatasetListResponse, + ElevationDatasetSummary, + LineListResponse, + LineSummary, +} from "@/types/auth"; + +type DatasetFormValues = { + code: string; + name: string; + source: string; + mount_code: string; + file_path: string; + resolution_m: number | null; + notes: string; +}; + +type ApplyFormValues = { + line_id: string; + dataset_id: string; + mode: "fill_null_only" | "overwrite_all"; +}; + +const DEFAULT_DATASET_FORM: DatasetFormValues = { + code: "", + name: "", + source: "", + mount_code: "main", + file_path: "", + resolution_m: null, + notes: "", +}; + +const DEFAULT_APPLY_FORM: ApplyFormValues = { + line_id: "", + dataset_id: "", + mode: "fill_null_only", +}; + +function statusTagColor(status: string): string { + if (status === "success" || status === "active") return "green"; + if (status === "running") return "blue"; + if (status === "pending") return "orange"; + if (status === "failed" || status === "disabled") return "red"; + return "default"; +} + +function applyModeLabel(mode: string): string { + if (mode === "fill_null_only") return "仅填空"; + if (mode === "overwrite_all") return "全部覆盖"; + return mode; +} + +function formatDate(value: string | null): string { + if (!value) return "-"; + return new Date(value).toLocaleString(); +} + +export default function AdminElevationPage() { + const queryClient = useQueryClient(); + const { user, initializing, hasPermission, fetchWithAuth } = useAuth(); + const [messageApi, messageContextHolder] = message.useMessage(); + const [error, setError] = useState(""); + const [success, setSuccess] = useState(""); + const [datasetModalOpen, setDatasetModalOpen] = useState(false); + const [applyModalOpen, setApplyModalOpen] = useState(false); + const [analyzingDatasetId, setAnalyzingDatasetId] = useState(null); + const [datasetForm] = Form.useForm(); + const [applyForm] = Form.useForm(); + + const canRead = hasPermission("elevation.read") || hasPermission("elevation.manage"); + const canManage = hasPermission("elevation.manage"); + + const datasetListPath = "/api/v1/elevation/datasets"; + const jobListPath = "/api/v1/elevation/jobs?limit=100"; + const lineListPath = "/api/v1/lines"; + + const datasetsQuery = useQuery({ + queryKey: [datasetListPath], + enabled: !!user && canRead, + queryFn: async () => { + const response = await fetchWithAuth(datasetListPath); + if (!response.ok) { + throw new Error(await readApiError(response)); + } + return (await response.json()) as ElevationDatasetListResponse; + }, + }); + + const jobsQuery = useQuery({ + queryKey: [jobListPath], + enabled: !!user && canRead, + queryFn: async () => { + const response = await fetchWithAuth(jobListPath); + if (!response.ok) { + throw new Error(await readApiError(response)); + } + return (await response.json()) as ElevationApplyJobListResponse; + }, + }); + + const linesQuery = useQuery({ + queryKey: [lineListPath], + enabled: !!user && canRead, + queryFn: async () => { + const response = await fetchWithAuth(lineListPath); + if (!response.ok) { + throw new Error(await readApiError(response)); + } + return (await response.json()) as LineListResponse; + }, + }); + + const refreshElevationData = useCallback(async () => { + await queryClient.invalidateQueries({ + predicate: (query) => + Array.isArray(query.queryKey) + && typeof query.queryKey[0] === "string" + && ( + query.queryKey[0].startsWith("/api/v1/elevation/datasets") + || query.queryKey[0].startsWith("/api/v1/elevation/jobs") + ), + }); + }, [queryClient]); + + const refreshPowerLines = useCallback(async () => { + await queryClient.invalidateQueries({ + predicate: (query) => + Array.isArray(query.queryKey) + && typeof query.queryKey[0] === "string" + && query.queryKey[0].startsWith("/api/v1/lines"), + }); + }, [queryClient]); + + useTopicSubscription( + "admin.elevation", + useCallback(() => { + void refreshElevationData(); + }, [refreshElevationData]), + ); + useTopicSubscription( + "admin.power-lines", + useCallback(() => { + void refreshPowerLines(); + }, [refreshPowerLines]), + ); + + const datasetCreateMutation = useMutation({ + mutationFn: async (values: DatasetFormValues) => { + const payload = { + code: values.code.trim(), + name: values.name.trim(), + source: values.source.trim() || null, + mount_code: values.mount_code.trim(), + file_path: values.file_path.trim(), + resolution_m: values.resolution_m, + notes: values.notes.trim() || null, + }; + const response = await fetchWithAuth("/api/v1/elevation/datasets", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify(payload), + }); + if (!response.ok) { + throw new Error(await readApiError(response)); + } + return (await response.json()) as ElevationDatasetSummary; + }, + onSuccess: async () => { + setSuccess("高程数据集已创建"); + setError(""); + messageApi.success("高程数据集已创建"); + setDatasetModalOpen(false); + datasetForm.resetFields(); + await refreshElevationData(); + }, + onError: (candidate) => { + const nextError = candidate instanceof Error ? candidate.message : "创建高程数据集失败"; + setError(nextError); + setSuccess(""); + messageApi.error(nextError); + }, + }); + + const analyzeMutation = useMutation({ + mutationFn: async (datasetId: string) => { + const response = await fetchWithAuth(`/api/v1/elevation/datasets/${datasetId}/analyze`, { + method: "POST", + }); + if (!response.ok) { + throw new Error(await readApiError(response)); + } + return (await response.json()) as ElevationDatasetAnalyzeResponse; + }, + onMutate: (datasetId) => { + setAnalyzingDatasetId(datasetId); + }, + onSuccess: async (payload) => { + const warnings = payload.warnings.length; + const msg = warnings > 0 ? `分析完成(${warnings} 条告警)` : "分析完成"; + setSuccess(msg); + setError(""); + messageApi.success(msg); + await refreshElevationData(); + }, + onError: (candidate) => { + const nextError = candidate instanceof Error ? candidate.message : "分析失败"; + setError(nextError); + setSuccess(""); + messageApi.error(nextError); + }, + onSettled: () => { + setAnalyzingDatasetId(null); + }, + }); + + const applyMutation = useMutation({ + mutationFn: async (values: ApplyFormValues) => { + const response = await fetchWithAuth("/api/v1/elevation/jobs/apply-line", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ + line_id: values.line_id, + dataset_id: values.dataset_id, + mode: values.mode, + }), + }); + if (!response.ok) { + throw new Error(await readApiError(response)); + } + return (await response.json()) as ElevationApplyJobCreateResponse; + }, + onSuccess: async () => { + setSuccess("高程回填任务已提交"); + setError(""); + messageApi.success("高程回填任务已提交"); + setApplyModalOpen(false); + applyForm.resetFields(); + await refreshElevationData(); + }, + onError: (candidate) => { + const nextError = candidate instanceof Error ? candidate.message : "提交回填任务失败"; + setError(nextError); + setSuccess(""); + messageApi.error(nextError); + }, + }); + + const datasets = datasetsQuery.data?.items ?? []; + const jobs = jobsQuery.data?.items ?? []; + const lines = linesQuery.data?.items ?? []; + + const lineOptions = useMemo( + () => + lines.map((item: LineSummary) => ({ + value: item.id, + label: `${item.code} - ${item.name}`, + })), + [lines], + ); + const datasetOptions = useMemo( + () => + datasets + .filter((item) => item.status === "active") + .map((item) => ({ + value: item.id, + label: `${item.code} - ${item.name}`, + })), + [datasets], + ); + + const datasetColumns = useMemo>( + () => [ + { title: "编码", dataIndex: "code", width: 140 }, + { title: "名称", dataIndex: "name", width: 220 }, + { title: "来源", dataIndex: "source", width: 140, render: (value: string | null) => value || "-" }, + { title: "挂载", dataIndex: "mount_code", width: 100 }, + { title: "文件路径", dataIndex: "file_path", width: 260 }, + { title: "分辨率(m)", dataIndex: "resolution_m", width: 110, render: (value: number | null) => value ?? "-" }, + { title: "样本数", dataIndex: "sample_count", width: 100 }, + { + title: "状态", + dataIndex: "status", + width: 90, + render: (value: string) => {value}, + }, + { + title: "边界框", + key: "bbox", + width: 280, + render: (_, row) => ( + + {row.bbox_min_lon ?? "-"}, {row.bbox_min_lat ?? "-"} ~ {row.bbox_max_lon ?? "-"}, {row.bbox_max_lat ?? "-"} + + ), + }, + { + title: "更新时间", + dataIndex: "update_date", + width: 170, + render: (value: string) => formatDate(value), + }, + { + title: "操作", + key: "actions", + fixed: "right", + width: 120, + render: (_, row) => ( + + { + if (!canManage) return; + analyzeMutation.mutate(row.id); + }} + > + {analyzingDatasetId === row.id ? "分析中..." : "分析"} + + + ), + }, + ], + [analyzeMutation, analyzingDatasetId, canManage], + ); + + const jobColumns = useMemo>( + () => [ + { + title: "任务ID", + dataIndex: "id", + width: 180, + render: (value: string) => {value}, + }, + { title: "线路", width: 220, render: (_, row) => `${row.line_code || "-"} ${row.line_name || ""}`.trim() || "-" }, + { title: "数据集", width: 220, render: (_, row) => `${row.dataset_code || "-"} ${row.dataset_name || ""}`.trim() || "-" }, + { title: "模式", dataIndex: "mode", width: 110, render: (value: string) => applyModeLabel(value) }, + { title: "状态", dataIndex: "status", width: 100, render: (value: string) => {value} }, + { title: "总杆塔", dataIndex: "total_tower_count", width: 90 }, + { title: "更新", dataIndex: "updated_tower_count", width: 80 }, + { title: "跳过", dataIndex: "skipped_tower_count", width: 80 }, + { title: "缺坐标", dataIndex: "missing_geo_count", width: 90 }, + { title: "未匹配", dataIndex: "unmatched_count", width: 90 }, + { title: "开始时间", dataIndex: "started_at", width: 170, render: (value: string | null) => formatDate(value) }, + { title: "结束时间", dataIndex: "finished_at", width: 170, render: (value: string | null) => formatDate(value) }, + { + title: "错误", + dataIndex: "error_message", + width: 240, + render: (value: string | null) => value || "-", + }, + ], + [], + ); + + if (initializing || datasetsQuery.isLoading || jobsQuery.isLoading || linesQuery.isLoading) { + return ( +
+ +
+ ); + } + + if (!user) { + return ( +
+ 请先登录后再访问高程数据管理页面。 + + 返回首页 + +
+ ); + } + + if (!canRead) { + return ( +
+ 你没有访问该页面的权限(需要 `elevation.read`)。 + + 返回首页 + +
+ ); + } + + return ( +
+ {messageContextHolder} + + {(error || success || datasetsQuery.error || jobsQuery.error || linesQuery.error) && ( + + )} + + + + 去文件管理上传 + + {canManage && ( + { + event.preventDefault(); + datasetForm.setFieldsValue(DEFAULT_DATASET_FORM); + setDatasetModalOpen(true); + }} + > + 新建数据集 + + )} + + )} + > + {datasets.length === 0 ? ( + + ) : ( + + rowKey={(row) => row.id} + columns={datasetColumns} + dataSource={datasets} + pagination={false} + scroll={{ x: 1650 }} + /> + )} + + + { + event.preventDefault(); + applyForm.setFieldsValue(DEFAULT_APPLY_FORM); + setApplyModalOpen(true); + }} + > + 新建回填任务 + + ) : null} + > + {jobs.length === 0 ? ( + + ) : ( + + rowKey={(row) => row.id} + columns={jobColumns} + dataSource={jobs} + pagination={false} + scroll={{ x: 1900 }} + /> + )} + + + { + if (datasetCreateMutation.isPending) return; + setDatasetModalOpen(false); + }} + onOk={() => { + void datasetForm.validateFields().then((values) => { + datasetCreateMutation.mutate(values); + }); + }} + confirmLoading={datasetCreateMutation.isPending} + > + form={datasetForm} layout="vertical" initialValues={DEFAULT_DATASET_FORM}> + + + + + + + + + + + + + + + + + + + + + + + + + { + if (applyMutation.isPending) return; + setApplyModalOpen(false); + }} + onOk={() => { + void applyForm.validateFields().then((values) => { + applyMutation.mutate(values); + }); + }} + confirmLoading={applyMutation.isPending} + > + form={applyForm} layout="vertical" initialValues={DEFAULT_APPLY_FORM}> + + + + +