From 2a54857fe1d2a64bb5eeac02f38c3829d931990b Mon Sep 17 00:00:00 2001 From: chengkai3 Date: Wed, 10 Jun 2026 08:26:51 +0800 Subject: [PATCH] =?UTF-8?q?[fix/feat]:[FL-76][=E9=AB=98=E7=A8=8B=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=AF=BC=E5=85=A5=E6=8A=A5=E9=94=99]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: multica-agent --- api/app/api/v1/elevation.py | 4 +- api/app/services/elevation_service.py | 79 ++++++++++--- api/tests/test_async_dispatch_services.py | 138 ++++++++++++++++++++++ web/src/app/admin/elevation/page.tsx | 120 ++++++++++++++++--- 4 files changed, 302 insertions(+), 39 deletions(-) diff --git a/api/app/api/v1/elevation.py b/api/app/api/v1/elevation.py index b3b955d..c7d37ca 100644 --- a/api/app/api/v1/elevation.py +++ b/api/app/api/v1/elevation.py @@ -1,6 +1,6 @@ from __future__ import annotations -from fastapi import APIRouter, Depends, File, HTTPException, Query, Response, UploadFile, status +from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, Response, UploadFile, status from sqlalchemy.orm import Session from ...core.database import get_db @@ -91,6 +91,7 @@ def import_elevation_datasets( def import_elevation_dataset_data( dataset_id: str, files: list[UploadFile] = File(...), + trigger_analysis: bool = Form(True), current_user: CurrentUser = Depends(require_permission("elevation.manage")), db: Session = Depends(get_db), ) -> ElevationDatasetDataImportResponse: @@ -99,6 +100,7 @@ def import_elevation_dataset_data( dataset_id=dataset_id, files=files, actor=current_user.user, + trigger_analysis=trigger_analysis, ) diff --git a/api/app/services/elevation_service.py b/api/app/services/elevation_service.py index dc98f69..c7d6e9e 100644 --- a/api/app/services/elevation_service.py +++ b/api/app/services/elevation_service.py @@ -520,6 +520,7 @@ def import_dataset_data_files( dataset_id: str, files: list[UploadFile], actor: User, + trigger_analysis: bool = True, ) -> ElevationDatasetDataImportResponse: dataset = get_dataset_by_id(db, dataset_id) if not dataset: @@ -573,13 +574,25 @@ def import_dataset_data_files( uploaded_file_count += 1 imported_files.append(target_path) - preferred_file_path = _pick_preferred_dataset_file_path(paths=imported_files) + available_file_paths = _list_dataset_imported_file_paths( + driver=driver, + dataset_dir=dataset_dir, + ) + preferred_file_path = _pick_preferred_dataset_file_path( + paths=available_file_paths, + current_path=dataset.file_path, + ) if preferred_file_path is None: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="未导入任何可用高程文件") dataset.dataset_dir = dataset_dir dataset.file_path = preferred_file_path dataset.file_format = _detect_file_format(preferred_file_path) + dataset.sample_count = 0 + dataset.bbox_min_lon = None + dataset.bbox_max_lon = None + dataset.bbox_min_lat = None + dataset.bbox_max_lat = None dataset.usage_status = "idle" dataset.terrain_status = _default_terrain_status_for_format(dataset.file_format) dataset.terrain_task_id = None @@ -590,26 +603,32 @@ def import_dataset_data_files( dataset.terrain_max_zoom = None dataset.terrain_bounds = None dataset.terrain_metadata = None + dataset.analysis_task_id = None + dataset.analysis_status = "not_started" + dataset.analysis_error_message = None + dataset.analysis_started_at = None + dataset.analysis_finished_at = None dataset.update_user = actor.id dataset.update_date = utcnow() db.commit() analysis_task_queued = False analysis_task_id: str | None = None - try: - task = _dispatch_elevation_dataset_analysis_task(dataset_id=dataset.id, actor_user_id=actor.id) - analysis_task_queued = True - analysis_task_id = str(task.id) - dataset.analysis_task_id = analysis_task_id - dataset.analysis_status = "queued" - dataset.analysis_error_message = None - dataset.analysis_started_at = None - dataset.analysis_finished_at = None - dataset.update_date = utcnow() - dataset.update_user = actor.id - db.commit() - except Exception as exc: # pragma: no cover - warnings.append(f"自动分析任务派发失败:{exc}") + if trigger_analysis: + try: + task = _dispatch_elevation_dataset_analysis_task(dataset_id=dataset.id, actor_user_id=actor.id) + analysis_task_queued = True + analysis_task_id = str(task.id) + dataset.analysis_task_id = analysis_task_id + dataset.analysis_status = "queued" + dataset.analysis_error_message = None + dataset.analysis_started_at = None + dataset.analysis_finished_at = None + dataset.update_date = utcnow() + dataset.update_user = actor.id + db.commit() + except Exception as exc: # pragma: no cover + warnings.append(f"自动分析任务派发失败:{exc}") refreshed = get_dataset_by_id(db, dataset.id) if refreshed is None: @@ -1656,13 +1675,35 @@ def _extract_zip_to_dataset_directory( } -def _pick_preferred_dataset_file_path(*, paths: list[str]) -> str | None: +def _list_dataset_imported_file_paths(*, driver: Any, dataset_dir: str) -> list[str]: + try: + entries = driver.list_dir(dataset_dir) + except StoragePathNotFoundError as exc: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"高程数据目录不存在: {dataset_dir}") from exc + except StorageInvalidPathError as exc: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc + except StorageDriverError as exc: + raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc)) from exc + + return [ + entry.path + for entry in entries + if not entry.is_dir and Path(entry.name).suffix.lower() in IMPORTABLE_ELEVATION_EXTENSIONS + ] + + +def _pick_preferred_dataset_file_path(*, paths: list[str], current_path: str | None = None) -> str | None: if not paths: return None for extension in (".img", ".tif", ".tiff", ".csv"): - for path in paths: - if path.lower().endswith(extension): - return path + matching_paths = [path for path in paths if path.lower().endswith(extension)] + if not matching_paths: + continue + if current_path in matching_paths: + return current_path + return matching_paths[0] + if current_path in paths: + return current_path return paths[0] diff --git a/api/tests/test_async_dispatch_services.py b/api/tests/test_async_dispatch_services.py index 5ab051e..f1afd2b 100644 --- a/api/tests/test_async_dispatch_services.py +++ b/api/tests/test_async_dispatch_services.py @@ -1,5 +1,6 @@ from __future__ import annotations +import io from pathlib import Path from types import SimpleNamespace @@ -16,6 +17,54 @@ from app.schemas.wine import WineRunRequest from app.services import atp_model_service, elevation_service, wine_service +class _MemoryStorageDriver: + def __init__(self) -> None: + self.directories: set[str] = set() + self.files: dict[str, bytes] = {} + + def ensure_directory(self, path: str) -> None: + self.directories.add(path.rstrip("/") or "/") + + def write_file(self, path: str, *, content: bytes, content_type: str | None = None) -> SimpleNamespace: + self.files[path] = content + parent_path = path.rsplit("/", 1)[0] or "/" + return SimpleNamespace( + path=path, + parent_path=parent_path, + name=Path(path).name, + is_dir=False, + size=len(content), + modified_at=None, + mime_type=content_type, + ) + + def list_dir(self, path: str) -> list[SimpleNamespace]: + prefix = f"{path.rstrip('/')}/" + entries: list[SimpleNamespace] = [] + for file_path in sorted(self.files): + if not file_path.startswith(prefix): + continue + suffix = file_path[len(prefix):] + if "/" in suffix: + continue + entries.append( + SimpleNamespace( + path=file_path, + parent_path=path, + name=Path(file_path).name, + is_dir=False, + size=len(self.files[file_path]), + modified_at=None, + mime_type=None, + ) + ) + return entries + + +def _build_upload(filename: str, content: bytes, content_type: str = "application/octet-stream") -> SimpleNamespace: + return SimpleNamespace(filename=filename, file=io.BytesIO(content), content_type=content_type) + + def _build_sessionmaker(*tables): engine = create_engine("sqlite+pysqlite:///:memory:") Base.metadata.create_all(bind=engine, tables=list(tables)) @@ -165,6 +214,95 @@ def test_queue_dataset_terrain_build_reuses_existing_running_task(monkeypatch) - session.close() +def test_import_dataset_data_files_batches_keep_preferred_raster_and_only_queue_final_analysis(monkeypatch) -> None: + testing_session = _build_sessionmaker(ElevationDataset.__table__) + session: Session = testing_session() + try: + dataset = ElevationDataset( + code="ELEV-IMPORT-001", + name="批量导入样例", + file_format="csv", + mount_code="default", + dataset_dir="/elevation/datasets/ELEV-IMPORT-001", + file_path="/elevation/datasets/ELEV-IMPORT-001/dataset.csv", + status="active", + usage_status="idle", + sample_count=128, + bbox_min_lon=100.0, + bbox_max_lon=120.0, + bbox_min_lat=20.0, + bbox_max_lat=30.0, + analysis_task_id="old-task", + analysis_status="success", + terrain_status="not_supported", + ) + session.add(dataset) + session.commit() + + actor = User( + id="actor-1", + email="actor@example.com", + username="actor", + password_hash="hashed", + status="active", + ) + driver = _MemoryStorageDriver() + analysis_calls: list[tuple[str, str | None]] = [] + + monkeypatch.setattr(elevation_service, "_require_mount", lambda *_args, **_kwargs: SimpleNamespace(code="default")) + monkeypatch.setattr(elevation_service, "_build_driver_or_400", lambda *_args, **_kwargs: driver) + monkeypatch.setattr( + elevation_service, + "_dispatch_elevation_dataset_analysis_task", + lambda *, dataset_id, actor_user_id: analysis_calls.append((dataset_id, actor_user_id)) or SimpleNamespace(id="new-task"), + ) + monkeypatch.setattr(elevation_service, "_publish_elevation_change", lambda *_args, **_kwargs: None) + + first = elevation_service.import_dataset_data_files( + session, + dataset_id=dataset.id, + files=[_build_upload("terrain.img", b"img-bytes", "application/octet-stream")], + actor=actor, + trigger_analysis=False, + ) + session.refresh(dataset) + + assert first.analysis_task_queued is False + assert first.analysis_task_id is None + assert dataset.file_path.endswith("/terrain.img") + assert dataset.file_format == "img" + assert dataset.analysis_status == "not_started" + assert dataset.analysis_task_id is None + assert dataset.sample_count == 0 + assert dataset.bbox_min_lon is None + assert dataset.terrain_status == "pending" + assert analysis_calls == [] + + second = elevation_service.import_dataset_data_files( + session, + dataset_id=dataset.id, + files=[_build_upload("points.csv", b"lon,lat,elevation\n1,2,3\n", "text/csv")], + actor=actor, + trigger_analysis=True, + ) + session.refresh(dataset) + + assert second.analysis_task_queued is True + assert second.analysis_task_id == "new-task" + assert dataset.file_path.endswith("/terrain.img") + assert dataset.file_format == "img" + assert dataset.analysis_status == "queued" + assert dataset.analysis_task_id == "new-task" + assert dataset.terrain_status == "pending" + assert analysis_calls == [(dataset.id, actor.id)] + assert set(driver.files) == { + "/elevation/datasets/ELEV-IMPORT-001/terrain.img", + "/elevation/datasets/ELEV-IMPORT-001/points.csv", + } + finally: + session.close() + + def test_wine_create_run_queues_task_and_worker_records_failure(monkeypatch) -> None: testing_session = _build_sessionmaker(WineRun.__table__) session: Session = testing_session() diff --git a/web/src/app/admin/elevation/page.tsx b/web/src/app/admin/elevation/page.tsx index a3f3f7e..d9579df 100644 --- a/web/src/app/admin/elevation/page.tsx +++ b/web/src/app/admin/elevation/page.tsx @@ -77,6 +77,8 @@ const DEFAULT_APPLY_FORM: ApplyFormValues = { mode: "fill_null_only", }; +const DATASET_IMPORT_BATCH_SIZE = 20; + function statusTagColor(status: string): string { if (status === "success" || status === "active") return "green"; if (status === "running") return "blue"; @@ -146,6 +148,17 @@ function readXhrError(xhr: XMLHttpRequest): string { } } +function chunkFiles(files: File[], size: number): File[][] { + if (size <= 0) { + return [files]; + } + const chunks: File[][] = []; + for (let index = 0; index < files.length; index += size) { + chunks.push(files.slice(index, index + size)); + } + return chunks; +} + export default function AdminElevationPage() { const queryClient = useQueryClient(); const { @@ -316,7 +329,11 @@ export default function AdminElevationPage() { payload.files.length === 1 ? payload.files[0].name : `共 ${payload.files.length} 个文件`, ); - const uploadWithXhr = (token: string | null) => + const uploadWithXhr = ( + token: string | null, + files: File[], + options: { completedBytes: number; totalBytes: number; triggerAnalysis: boolean; label: string }, + ) => new Promise((resolve, reject) => { const xhr = new XMLHttpRequest(); xhr.open("POST", `${getApiBaseUrl()}/api/v1/elevation/datasets/${payload.datasetId}/data/import`); @@ -326,16 +343,16 @@ export default function AdminElevationPage() { } xhr.upload.onprogress = (event: ProgressEvent) => { - if (!event.lengthComputable || event.total <= 0) { + if (options.totalBytes <= 0) { return; } - const percent = Math.min(99, Math.max(0, Math.round((event.loaded / event.total) * 100))); + const loadedBytes = options.completedBytes + (event.lengthComputable ? event.loaded : 0); + const percent = Math.min(99, Math.max(0, Math.round((loadedBytes / options.totalBytes) * 100))); setDatasetDataUploadProgress(percent); }; xhr.onload = () => { if (xhr.status >= 200 && xhr.status < 300) { - setDatasetDataUploadProgress(100); try { resolve(JSON.parse(xhr.responseText) as ElevationDatasetDataImportResponse); } catch { @@ -350,31 +367,96 @@ export default function AdminElevationPage() { xhr.onabort = () => reject(new Error("导入已取消")); const formData = new FormData(); - for (const file of payload.files) { + formData.append("trigger_analysis", String(options.triggerAnalysis)); + for (const file of files) { formData.append("files", file); } + setDatasetDataUploadFileName(options.label); xhr.send(formData); }); - let result: ElevationDatasetDataImportResponse; - try { - result = await uploadWithXhr(getAccessToken()); - } catch (error) { - const message = error instanceof Error ? error.message : "导入失败"; - const isUnauthorized = message.includes("401") || message.includes("未授权"); - if (!isUnauthorized) { - throw error; + const batches = chunkFiles(payload.files, DATASET_IMPORT_BATCH_SIZE); + const totalBytes = payload.files.reduce((sum, file) => sum + file.size, 0); + let completedBytes = 0; + let latestResult: ElevationDatasetDataImportResponse | null = null; + let uploadedFileCount = 0; + let extractedFileCount = 0; + let importedFileCount = 0; + let analysisTaskQueued = false; + let analysisTaskId: string | null = null; + const warnings: string[] = []; + const importedFiles = new Set(); + + for (const [index, batch] of batches.entries()) { + const batchBytes = batch.reduce((sum, file) => sum + file.size, 0); + const triggerAnalysis = index === batches.length - 1; + const label = batches.length === 1 + ? (batch.length === 1 ? batch[0].name : `共 ${batch.length} 个文件`) + : `第 ${index + 1}/${batches.length} 批,共 ${batch.length} 个文件`; + + let result: ElevationDatasetDataImportResponse; + try { + result = await uploadWithXhr(getAccessToken(), batch, { + completedBytes, + totalBytes, + triggerAnalysis, + label, + }); + } catch (error) { + const message = error instanceof Error ? error.message : "导入失败"; + const isUnauthorized = message.includes("401") || message.includes("未授权"); + if (!isUnauthorized) { + throw error; + } + const refreshed = await refreshAccessToken(); + if (!refreshed) { + throw error; + } + result = await uploadWithXhr(getAccessToken(), batch, { + completedBytes, + totalBytes, + triggerAnalysis, + label, + }); } - const refreshed = await refreshAccessToken(); - if (!refreshed) { - throw error; + + latestResult = result; + uploadedFileCount += result.uploaded_file_count; + extractedFileCount += result.extracted_file_count; + importedFileCount += result.imported_file_count; + analysisTaskQueued = analysisTaskQueued || result.analysis_task_queued; + analysisTaskId = result.analysis_task_id ?? analysisTaskId; + for (const warning of result.warnings) { + warnings.push(warning); } - result = await uploadWithXhr(getAccessToken()); + for (const importedFile of result.imported_files) { + importedFiles.add(importedFile); + } + completedBytes += batchBytes; + setDatasetDataUploadProgress(Math.min(99, Math.max(0, Math.round((completedBytes / Math.max(totalBytes, 1)) * 100)))); } - return result; + setDatasetDataUploadProgress(100); + + if (!latestResult) { + throw new Error("导入失败"); + } + + return { + ...latestResult, + uploaded_file_count: uploadedFileCount, + extracted_file_count: extractedFileCount, + imported_file_count: importedFileCount, + analysis_task_queued: analysisTaskQueued, + analysis_task_id: analysisTaskId, + warning_count: warnings.length, + warnings, + imported_files: Array.from(importedFiles).sort(), + }; }, onSuccess: async (payload) => { - const monitorHint = payload.analysis_task_id ? `,分析任务ID:${payload.analysis_task_id}` : ""; + const monitorHint = payload.analysis_task_queued && payload.analysis_task_id + ? `,分析任务ID:${payload.analysis_task_id}` + : ""; const msg = payload.warning_count > 0 ? `数据导入完成:上传 ${payload.uploaded_file_count} 个、解压 ${payload.extracted_file_count} 个、可用 ${payload.imported_file_count} 个,告警 ${payload.warning_count} 条${monitorHint}` : `数据导入完成:上传 ${payload.uploaded_file_count} 个、解压 ${payload.extracted_file_count} 个、可用 ${payload.imported_file_count} 个${monitorHint}`;