支持高程数据管理批量导入

Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
2026-05-03 11:59:40 +08:00
parent 4aad80b34f
commit 779acf8e1e
6 changed files with 279 additions and 13 deletions
+16 -1
View File
@@ -1,6 +1,6 @@
from __future__ import annotations
from fastapi import APIRouter, Depends, HTTPException, Query, status
from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile, status
from sqlalchemy.orm import Session
from ...core.database import get_db
@@ -11,6 +11,7 @@ from ...schemas.elevation import (
ElevationApplyJobListResponse,
ElevationApplyJobSummary,
ElevationDatasetAnalyzeResponse,
ElevationDatasetBatchImportResponse,
ElevationDatasetCreateRequest,
ElevationDatasetListResponse,
ElevationDatasetPreviewResponse,
@@ -23,6 +24,7 @@ from ...services.elevation_service import (
create_dataset,
delete_dataset,
get_job_by_id,
import_datasets_from_csv,
list_datasets,
list_jobs,
preview_dataset,
@@ -59,6 +61,19 @@ def create_elevation_dataset(
return created
@router.post("/datasets/import", response_model=ElevationDatasetBatchImportResponse)
def import_elevation_datasets(
file: UploadFile = File(...),
current_user: CurrentUser = Depends(require_permission("elevation.manage")),
db: Session = Depends(get_db),
) -> ElevationDatasetBatchImportResponse:
return import_datasets_from_csv(
db,
file=file,
actor=current_user.user,
)
@router.patch("/datasets/{dataset_id}", response_model=ElevationDatasetSummary)
def update_elevation_dataset(
dataset_id: str,
+9
View File
@@ -38,6 +38,15 @@ class ElevationDatasetListResponse(BaseModel):
total: int
class ElevationDatasetBatchImportResponse(BaseModel):
imported_count: int
analyzed_count: int
skipped_count: int
warning_count: int
warnings: list[str] = Field(default_factory=list)
items: list[ElevationDatasetSummary] = Field(default_factory=list)
class ElevationDatasetCreateRequest(BaseModel):
code: str = Field(min_length=2, max_length=64)
name: str = Field(min_length=2, max_length=255)
+141 -1
View File
@@ -8,7 +8,7 @@ from pathlib import Path
from tempfile import NamedTemporaryFile
from typing import Any
from fastapi import HTTPException, status
from fastapi import HTTPException, UploadFile, status
from sqlalchemy import delete, func, select
from sqlalchemy.orm import Session
@@ -24,6 +24,7 @@ from ..schemas.elevation import (
ElevationApplyJobListResponse,
ElevationApplyJobSummary,
ElevationDatasetAnalyzeResponse,
ElevationDatasetBatchImportResponse,
ElevationDatasetPreviewCell,
ElevationDatasetPreviewDiagnostics,
ElevationDatasetCreateRequest,
@@ -40,6 +41,7 @@ from .storage_driver import StorageInvalidPathError, StoragePathNotFoundError
ELEVATION_TOPIC = "admin.elevation"
POWER_LINES_TOPIC = "admin.power-lines"
CSV_ENCODINGS = ("utf-8-sig", "utf-8", "gbk", "latin-1")
CSV_IMPORT_TEXT_ENCODINGS = ("utf-8-sig", "utf-8", "gbk", "latin-1")
NEAREST_MATCH_MAX_DISTANCE_M = 2000.0
ELEVATION_FILE_EXT_FORMAT_MAP = {
".csv": "csv",
@@ -78,6 +80,21 @@ class _OpenedRasterDataset:
return False
@dataclass
class ElevationDatasetBatchImportStats:
imported_count: int = 0
analyzed_count: int = 0
skipped_count: int = 0
warnings: list[str] | None = None
items: list[ElevationDatasetSummary] | None = None
def __post_init__(self) -> None:
if self.warnings is None:
self.warnings = []
if self.items is None:
self.items = []
def serialize_dataset(item: ElevationDataset) -> ElevationDatasetSummary:
return ElevationDatasetSummary(
id=item.id,
@@ -226,6 +243,96 @@ def create_dataset(
return serialize_dataset(saved)
def import_datasets_from_csv(
db: Session,
*,
file: UploadFile,
actor: User,
) -> ElevationDatasetBatchImportResponse:
content = file.file.read()
if not content:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="上传文件为空")
text = _decode_text_bytes_for_import(content)
rows = list(csv.DictReader(io.StringIO(text)))
if not rows:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="CSV 文件没有可导入的数据行")
stats = ElevationDatasetBatchImportStats()
batch_created_ids: list[str] = []
for row_index, row in enumerate(rows, start=2):
normalized_row = {
str(key).strip(): value
for key, value in row.items()
if key is not None
}
if all(not str(value or "").strip() for value in normalized_row.values()):
continue
code = _pick_csv_value(normalized_row, ["code", "编码"])
name = _pick_csv_value(normalized_row, ["name", "名称"])
mount_code = _pick_csv_value(normalized_row, ["mount_code", "挂载编码", "挂载"])
file_path = _pick_csv_value(normalized_row, ["file_path", "文件路径", "路径"])
source = _pick_csv_value(normalized_row, ["source", "来源"])
notes = _pick_csv_value(normalized_row, ["notes", "备注"])
resolution_text = _pick_csv_value(normalized_row, ["resolution_m", "分辨率", "分辨率m", "分辨率(米)"])
if not code or not name or not mount_code or not file_path:
stats.skipped_count += 1
if stats.warnings is not None:
stats.warnings.append(f"{row_index} 行缺少必填字段(code/name/mount_code/file_path),已跳过")
continue
try:
payload = ElevationDatasetCreateRequest(
code=code,
name=name,
source=source,
mount_code=mount_code,
file_path=file_path,
resolution_m=_parse_csv_optional_positive_float(resolution_text),
notes=notes,
)
created = create_dataset(db, payload, actor=actor)
if created is None:
stats.skipped_count += 1
if stats.warnings is not None:
stats.warnings.append(f"{row_index} 行编码重复({code}),已跳过")
continue
stats.imported_count += 1
if stats.items is not None:
stats.items.append(created)
batch_created_ids.append(created.id)
except HTTPException as exc:
stats.skipped_count += 1
detail = str(exc.detail) if exc.detail else "未知错误"
if stats.warnings is not None:
stats.warnings.append(f"{row_index} 行导入失败({code}):{detail}")
continue
for dataset_id in batch_created_ids:
try:
analyze_dataset(db, dataset_id=dataset_id, actor=actor)
stats.analyzed_count += 1
except HTTPException as exc:
detail = str(exc.detail) if exc.detail else "未知错误"
if stats.warnings is not None:
stats.warnings.append(f"数据集 {dataset_id} 自动分析失败:{detail}")
except Exception as exc:
if stats.warnings is not None:
stats.warnings.append(f"数据集 {dataset_id} 自动分析异常:{exc}")
return ElevationDatasetBatchImportResponse(
imported_count=stats.imported_count,
analyzed_count=stats.analyzed_count,
skipped_count=stats.skipped_count,
warning_count=len(stats.warnings or []),
warnings=stats.warnings or [],
items=stats.items or [],
)
def update_dataset(
db: Session,
dataset_id: str,
@@ -1276,6 +1383,39 @@ def _parse_float(value: Any) -> float | None:
return None
def _decode_text_bytes_for_import(content: bytes) -> str:
for encoding in CSV_IMPORT_TEXT_ENCODINGS:
try:
return content.decode(encoding)
except UnicodeDecodeError:
continue
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="CSV 编码不受支持")
def _pick_csv_value(row: dict[str, Any], keys: list[str]) -> str | None:
normalized_keys = {str(key).strip(): key for key in row.keys()}
for key in keys:
actual_key = normalized_keys.get(key)
if actual_key is None:
continue
value = _normalize_str(row.get(actual_key))
if value is not None:
return value
return None
def _parse_csv_optional_positive_float(value: str | None) -> float | None:
if value is None:
return None
try:
number = float(value)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"无效分辨率:{value}") from exc
if number <= 0:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"分辨率必须大于 0{value}")
return number
def _publish_elevation_change(event_name: str, payload: dict[str, Any]) -> None:
_fire_and_forget(
publish_topic(
+38
View File
@@ -146,3 +146,41 @@
- 风险与影响:
- 删除数据集会同时删除其关联的回填任务记录(仅记录,不会回滚已写入杆塔的高程值)。
- 若数据集存在运行中任务,接口会拒绝删除并提示先等待任务结束。
## Work Log - 高程数据管理支持批量导入(2026-05-03)
- 背景:
- Issue `FL-182` 要求“高程数据管理要支持批量导入”。
- 现有能力仅支持单条新建数据集,不适合一次导入多条高程数据集元信息。
- 本次改动(最小闭环):
- 后端新增批量导入 API
- 文件:`api/app/api/v1/elevation.py`
- 新增 `POST /api/v1/elevation/datasets/import`(权限:`elevation.manage``multipart/form-data` 上传 CSV)。
- 后端新增批量导入服务逻辑:
- 文件:`api/app/services/elevation_service.py`
- 新增 `import_datasets_from_csv(...)`
- 解析 CSV 行级导入,支持中英文列名:`code/编码``name/名称``mount_code/挂载编码``file_path/文件路径``source/来源``resolution_m/分辨率``notes/备注`
- 复用现有 `create_dataset` 校验(格式识别、文件存在性检查、编码唯一性)。
- 对导入成功的数据集自动触发 `analyze_dataset`,补齐样本数和边界框。
- 返回导入统计(新增/分析/跳过/告警)与成功导入的数据集清单。
- 新增辅助函数:CSV 编码解码、列名取值、分辨率解析校验。
- 后端 schema 扩展:
- 文件:`api/app/schemas/elevation.py`
- 新增 `ElevationDatasetBatchImportResponse`
- 前端高程管理页接入批量导入入口:
- 文件:`web/src/app/admin/elevation/page.tsx`
- 在“高程数据集”卡片右上新增“批量导入”按钮(上传 CSV)。
- 调用 `/api/v1/elevation/datasets/import`,导入后提示统计结果并刷新数据。
- 页面提示文案补充“支持通过批量导入上传数据集元数据 CSV”。
- 前端类型补充:
- 文件:`web/src/types/auth.ts`
- 新增 `ElevationDatasetBatchImportResponse`
- 验证:
- 后端语法编译:`python3 -m compileall api/app` 通过。
- 遵循任务约束,未执行前端构建检查、未安装依赖。
- 风险与影响:
- 批量导入 CSV 仅导入“数据集元信息”,不会上传实际高程文件;文件需先在文件管理中准备到目标挂载路径。
- 单行存在字段/路径/编码问题时按行跳过并返回告警,不会中断整个批次。
+66 -11
View File
@@ -1,7 +1,7 @@
"use client";
import Link from "next/link";
import { useCallback, useMemo, useState } from "react";
import { ChangeEvent, useCallback, useMemo, useRef, useState } from "react";
import { useMutation, useQuery, useQueryClient } from "@tanstack/react-query";
import {
App,
@@ -32,6 +32,7 @@ import type {
ElevationApplyJobListResponse,
ElevationApplyJobSummary,
ElevationDatasetAnalyzeResponse,
ElevationDatasetBatchImportResponse,
ElevationDatasetListResponse,
ElevationDatasetPreviewResponse,
ElevationDatasetSummary,
@@ -109,6 +110,7 @@ export default function AdminElevationPage() {
const [previewData, setPreviewData] = useState<ElevationDatasetPreviewResponse | null>(null);
const [previewLoading, setPreviewLoading] = useState(false);
const [analyzingDatasetId, setAnalyzingDatasetId] = useState<string | null>(null);
const importInputRef = useRef<HTMLInputElement | null>(null);
const [datasetForm] = Form.useForm<DatasetFormValues>();
const [applyForm] = Form.useForm<ApplyFormValues>();
@@ -259,6 +261,36 @@ export default function AdminElevationPage() {
},
});
const datasetImportMutation = useMutation({
mutationFn: async (file: File) => {
const formData = new FormData();
formData.append("file", file);
const response = await fetchWithAuth("/api/v1/elevation/datasets/import", {
method: "POST",
body: formData,
});
if (!response.ok) {
throw new Error(await readApiError(response));
}
return (await response.json()) as ElevationDatasetBatchImportResponse;
},
onSuccess: async (payload) => {
const msg = payload.warning_count > 0
? `批量导入完成:新增 ${payload.imported_count} 条,分析 ${payload.analyzed_count} 条,跳过 ${payload.skipped_count} 条,告警 ${payload.warning_count}`
: `批量导入完成:新增 ${payload.imported_count} 条,分析 ${payload.analyzed_count} 条,跳过 ${payload.skipped_count}`;
setSuccess(msg);
setError("");
messageApi.success(msg);
await refreshElevationData();
},
onError: (candidate) => {
const nextError = candidate instanceof Error ? candidate.message : "批量导入高程数据集失败";
setError(nextError);
setSuccess("");
messageApi.error(nextError);
},
});
const applyMutation = useMutation({
mutationFn: async (values: ApplyFormValues) => {
const response = await fetchWithAuth("/api/v1/elevation/jobs/apply-line", {
@@ -528,15 +560,38 @@ export default function AdminElevationPage() {
<Typography.Link></Typography.Link>
</Link>
{canManage && (
<a
onClick={(event) => {
event.preventDefault();
datasetForm.setFieldsValue(DEFAULT_DATASET_FORM);
setDatasetModalOpen(true);
}}
>
</a>
<>
<Typography.Link
onClick={() => {
if (datasetImportMutation.isPending) return;
importInputRef.current?.click();
}}
>
</Typography.Link>
<input
ref={importInputRef}
type="file"
accept=".csv,text/csv"
className="hidden"
onChange={(event: ChangeEvent<HTMLInputElement>) => {
const file = event.target.files?.[0];
if (file) {
datasetImportMutation.mutate(file);
}
event.target.value = "";
}}
/>
<a
onClick={(event) => {
event.preventDefault();
datasetForm.setFieldsValue(DEFAULT_DATASET_FORM);
setDatasetModalOpen(true);
}}
>
</a>
</>
)}
</Space>
)}
@@ -545,7 +600,7 @@ export default function AdminElevationPage() {
type="info"
showIcon
message="支持文件格式:CSV(点集)/ IMG / TIF / TIFF(栅格)"
description="CSV 预览为点云;IMG/TIF/TIFF 预览为地形网格高低色带(与杆塔无关)。"
description="CSV 预览为点云;IMG/TIF/TIFF 预览为地形网格高低色带(与杆塔无关)。支持通过“批量导入”上传数据集元数据 CSV。"
className="mb-4"
/>
{datasets.length === 0 ? (
+9
View File
@@ -314,6 +314,15 @@ export type ElevationDatasetAnalyzeResponse = {
warnings: string[];
};
export type ElevationDatasetBatchImportResponse = {
imported_count: number;
analyzed_count: number;
skipped_count: number;
warning_count: number;
warnings: string[];
items: ElevationDatasetSummary[];
};
export type ElevationDatasetPreviewPoint = {
longitude: number;
latitude: number;