[fix/feat]:[FL-76][高程数据导入报错]

Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
chengkai3
2026-06-10 08:26:51 +08:00
parent f19f694098
commit 2a54857fe1
4 changed files with 302 additions and 39 deletions
+3 -1
View File
@@ -1,6 +1,6 @@
from __future__ import annotations
from fastapi import APIRouter, Depends, File, HTTPException, Query, Response, UploadFile, status
from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, Response, UploadFile, status
from sqlalchemy.orm import Session
from ...core.database import get_db
@@ -91,6 +91,7 @@ def import_elevation_datasets(
def import_elevation_dataset_data(
dataset_id: str,
files: list[UploadFile] = File(...),
trigger_analysis: bool = Form(True),
current_user: CurrentUser = Depends(require_permission("elevation.manage")),
db: Session = Depends(get_db),
) -> ElevationDatasetDataImportResponse:
@@ -99,6 +100,7 @@ def import_elevation_dataset_data(
dataset_id=dataset_id,
files=files,
actor=current_user.user,
trigger_analysis=trigger_analysis,
)
+60 -19
View File
@@ -520,6 +520,7 @@ def import_dataset_data_files(
dataset_id: str,
files: list[UploadFile],
actor: User,
trigger_analysis: bool = True,
) -> ElevationDatasetDataImportResponse:
dataset = get_dataset_by_id(db, dataset_id)
if not dataset:
@@ -573,13 +574,25 @@ def import_dataset_data_files(
uploaded_file_count += 1
imported_files.append(target_path)
preferred_file_path = _pick_preferred_dataset_file_path(paths=imported_files)
available_file_paths = _list_dataset_imported_file_paths(
driver=driver,
dataset_dir=dataset_dir,
)
preferred_file_path = _pick_preferred_dataset_file_path(
paths=available_file_paths,
current_path=dataset.file_path,
)
if preferred_file_path is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="未导入任何可用高程文件")
dataset.dataset_dir = dataset_dir
dataset.file_path = preferred_file_path
dataset.file_format = _detect_file_format(preferred_file_path)
dataset.sample_count = 0
dataset.bbox_min_lon = None
dataset.bbox_max_lon = None
dataset.bbox_min_lat = None
dataset.bbox_max_lat = None
dataset.usage_status = "idle"
dataset.terrain_status = _default_terrain_status_for_format(dataset.file_format)
dataset.terrain_task_id = None
@@ -590,26 +603,32 @@ def import_dataset_data_files(
dataset.terrain_max_zoom = None
dataset.terrain_bounds = None
dataset.terrain_metadata = None
dataset.analysis_task_id = None
dataset.analysis_status = "not_started"
dataset.analysis_error_message = None
dataset.analysis_started_at = None
dataset.analysis_finished_at = None
dataset.update_user = actor.id
dataset.update_date = utcnow()
db.commit()
analysis_task_queued = False
analysis_task_id: str | None = None
try:
task = _dispatch_elevation_dataset_analysis_task(dataset_id=dataset.id, actor_user_id=actor.id)
analysis_task_queued = True
analysis_task_id = str(task.id)
dataset.analysis_task_id = analysis_task_id
dataset.analysis_status = "queued"
dataset.analysis_error_message = None
dataset.analysis_started_at = None
dataset.analysis_finished_at = None
dataset.update_date = utcnow()
dataset.update_user = actor.id
db.commit()
except Exception as exc: # pragma: no cover
warnings.append(f"自动分析任务派发失败:{exc}")
if trigger_analysis:
try:
task = _dispatch_elevation_dataset_analysis_task(dataset_id=dataset.id, actor_user_id=actor.id)
analysis_task_queued = True
analysis_task_id = str(task.id)
dataset.analysis_task_id = analysis_task_id
dataset.analysis_status = "queued"
dataset.analysis_error_message = None
dataset.analysis_started_at = None
dataset.analysis_finished_at = None
dataset.update_date = utcnow()
dataset.update_user = actor.id
db.commit()
except Exception as exc: # pragma: no cover
warnings.append(f"自动分析任务派发失败:{exc}")
refreshed = get_dataset_by_id(db, dataset.id)
if refreshed is None:
@@ -1656,13 +1675,35 @@ def _extract_zip_to_dataset_directory(
}
def _pick_preferred_dataset_file_path(*, paths: list[str]) -> str | None:
def _list_dataset_imported_file_paths(*, driver: Any, dataset_dir: str) -> list[str]:
    """Return the paths of importable elevation files directly inside ``dataset_dir``.

    The directory is listed through ``driver.list_dir``. Sub-directories are
    skipped, and so is every file whose lower-cased suffix is not in
    ``IMPORTABLE_ELEVATION_EXTENSIONS``. Paths are returned in the order the
    driver listed them.

    Raises:
        HTTPException: 404 when the directory does not exist, 400 when the
            driver rejects the path as invalid, 502 for any other storage
            driver error.
    """
    try:
        listing = driver.list_dir(dataset_dir)
    except StoragePathNotFoundError as exc:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"高程数据目录不存在: {dataset_dir}") from exc
    except StorageInvalidPathError as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
    except StorageDriverError as exc:
        raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc)) from exc

    importable_paths: list[str] = []
    for item in listing:
        if item.is_dir:
            continue
        if Path(item.name).suffix.lower() not in IMPORTABLE_ELEVATION_EXTENSIONS:
            continue
        importable_paths.append(item.path)
    return importable_paths
def _pick_preferred_dataset_file_path(*, paths: list[str], current_path: str | None = None) -> str | None:
    """Choose which file in ``paths`` should act as the dataset's primary file.

    Extensions are tried in priority order: ``.img``, ``.tif``, ``.tiff``,
    ``.csv`` (matched case-insensitively). The first extension that has any
    matching path decides the result: ``current_path`` is kept when it is one
    of the matches, otherwise the first match in ``paths`` order is used.

    When no path carries a prioritised extension, ``current_path`` is kept if
    it is present in ``paths``; failing that, the first path is returned.

    Args:
        paths: Candidate file paths.
        current_path: The file currently selected for the dataset, if any.

    Returns:
        The chosen path, or ``None`` when ``paths`` is empty.
    """
    if not paths:
        return None
    for extension in (".img", ".tif", ".tiff", ".csv"):
        # Collect every candidate for this extension first: returning on the
        # first hit would bypass the current_path preference below.
        matching_paths = [path for path in paths if path.lower().endswith(extension)]
        if not matching_paths:
            continue
        if current_path in matching_paths:
            return current_path
        return matching_paths[0]
    if current_path in paths:
        return current_path
    return paths[0]
+138
View File
@@ -1,5 +1,6 @@
from __future__ import annotations
import io
from pathlib import Path
from types import SimpleNamespace
@@ -16,6 +17,54 @@ from app.schemas.wine import WineRunRequest
from app.services import atp_model_service, elevation_service, wine_service
class _MemoryStorageDriver:
    """In-memory stand-in for a storage driver.

    Written files are kept in ``files`` (path -> bytes) and ensured
    directories in ``directories``; entries are returned as
    ``SimpleNamespace`` objects.
    """

    def __init__(self) -> None:
        self.directories: set[str] = set()
        self.files: dict[str, bytes] = {}

    def _describe(self, file_path: str, *, parent_path: str, mime_type: str | None) -> SimpleNamespace:
        """Build the entry object for a file already stored in ``files``."""
        return SimpleNamespace(
            path=file_path,
            parent_path=parent_path,
            name=Path(file_path).name,
            is_dir=False,
            size=len(self.files[file_path]),
            modified_at=None,
            mime_type=mime_type,
        )

    def ensure_directory(self, path: str) -> None:
        """Record ``path`` without trailing slashes; an empty result becomes "/"."""
        trimmed = path.rstrip("/")
        self.directories.add(trimmed if trimmed else "/")

    def write_file(self, path: str, *, content: bytes, content_type: str | None = None) -> SimpleNamespace:
        """Store ``content`` under ``path`` and return the entry describing it."""
        self.files[path] = content
        containing_dir = path.rsplit("/", 1)[0] or "/"
        return self._describe(path, parent_path=containing_dir, mime_type=content_type)

    def list_dir(self, path: str) -> list[SimpleNamespace]:
        """List the files that sit directly under ``path``, sorted by full path.

        Files in nested sub-directories are left out, and no directory entries
        are produced.
        """
        prefix = path.rstrip("/") + "/"
        return [
            self._describe(stored_path, parent_path=path, mime_type=None)
            for stored_path in sorted(self.files)
            if stored_path.startswith(prefix) and "/" not in stored_path[len(prefix):]
        ]
def _build_upload(filename: str, content: bytes, content_type: str = "application/octet-stream") -> SimpleNamespace:
    """Create a lightweight upload object exposing ``filename``, ``file`` and ``content_type``.

    ``file`` is an ``io.BytesIO`` wrapping ``content``.
    """
    upload = SimpleNamespace()
    upload.filename = filename
    upload.file = io.BytesIO(content)
    upload.content_type = content_type
    return upload
def _build_sessionmaker(*tables):
engine = create_engine("sqlite+pysqlite:///:memory:")
Base.metadata.create_all(bind=engine, tables=list(tables))
@@ -165,6 +214,95 @@ def test_queue_dataset_terrain_build_reuses_existing_running_task(monkeypatch) -
session.close()
def test_import_dataset_data_files_batches_keep_preferred_raster_and_only_queue_final_analysis(monkeypatch) -> None:
    """Import two batches into one dataset and check file selection and analysis dispatch.

    The first batch (``terrain.img``) is imported with ``trigger_analysis=False``
    and the second (``points.csv``) with ``trigger_analysis=True``. The test
    asserts that the ``.img`` file stays the dataset's file after the csv batch
    and that the analysis dispatcher is called exactly once.
    """
    testing_session = _build_sessionmaker(ElevationDataset.__table__)
    session: Session = testing_session()
    try:
        # Start from a csv dataset that already carries statistics and a
        # finished analysis, so the resets asserted below are observable.
        dataset = ElevationDataset(
            code="ELEV-IMPORT-001",
            name="批量导入样例",
            file_format="csv",
            mount_code="default",
            dataset_dir="/elevation/datasets/ELEV-IMPORT-001",
            file_path="/elevation/datasets/ELEV-IMPORT-001/dataset.csv",
            status="active",
            usage_status="idle",
            sample_count=128,
            bbox_min_lon=100.0,
            bbox_max_lon=120.0,
            bbox_min_lat=20.0,
            bbox_max_lat=30.0,
            analysis_task_id="old-task",
            analysis_status="success",
            terrain_status="not_supported",
        )
        session.add(dataset)
        session.commit()
        actor = User(
            id="actor-1",
            email="actor@example.com",
            username="actor",
            password_hash="hashed",
            status="active",
        )
        driver = _MemoryStorageDriver()
        # Records every (dataset_id, actor_user_id) passed to the stubbed dispatcher.
        analysis_calls: list[tuple[str, str | None]] = []
        monkeypatch.setattr(elevation_service, "_require_mount", lambda *_args, **_kwargs: SimpleNamespace(code="default"))
        monkeypatch.setattr(elevation_service, "_build_driver_or_400", lambda *_args, **_kwargs: driver)
        monkeypatch.setattr(
            elevation_service,
            "_dispatch_elevation_dataset_analysis_task",
            # list.append returns None, so `or` yields the fake task object.
            lambda *, dataset_id, actor_user_id: analysis_calls.append((dataset_id, actor_user_id)) or SimpleNamespace(id="new-task"),
        )
        monkeypatch.setattr(elevation_service, "_publish_elevation_change", lambda *_args, **_kwargs: None)

        # Batch 1: raster upload with analysis suppressed.
        first = elevation_service.import_dataset_data_files(
            session,
            dataset_id=dataset.id,
            files=[_build_upload("terrain.img", b"img-bytes", "application/octet-stream")],
            actor=actor,
            trigger_analysis=False,
        )
        session.refresh(dataset)
        assert first.analysis_task_queued is False
        assert first.analysis_task_id is None
        # The .img upload replaces the original csv as the dataset's file.
        assert dataset.file_path.endswith("/terrain.img")
        assert dataset.file_format == "img"
        # The previous analysis state and statistics are cleared.
        assert dataset.analysis_status == "not_started"
        assert dataset.analysis_task_id is None
        assert dataset.sample_count == 0
        assert dataset.bbox_min_lon is None
        assert dataset.terrain_status == "pending"
        assert analysis_calls == []

        # Batch 2: csv upload with analysis requested.
        second = elevation_service.import_dataset_data_files(
            session,
            dataset_id=dataset.id,
            files=[_build_upload("points.csv", b"lon,lat,elevation\n1,2,3\n", "text/csv")],
            actor=actor,
            trigger_analysis=True,
        )
        session.refresh(dataset)
        assert second.analysis_task_queued is True
        assert second.analysis_task_id == "new-task"
        # The csv batch must not displace the previously imported .img file.
        assert dataset.file_path.endswith("/terrain.img")
        assert dataset.file_format == "img"
        assert dataset.analysis_status == "queued"
        assert dataset.analysis_task_id == "new-task"
        assert dataset.terrain_status == "pending"
        # Exactly one dispatch across both batches.
        assert analysis_calls == [(dataset.id, actor.id)]
        # Both uploads were written through the driver.
        assert set(driver.files) == {
            "/elevation/datasets/ELEV-IMPORT-001/terrain.img",
            "/elevation/datasets/ELEV-IMPORT-001/points.csv",
        }
    finally:
        session.close()
def test_wine_create_run_queues_task_and_worker_records_failure(monkeypatch) -> None:
testing_session = _build_sessionmaker(WineRun.__table__)
session: Session = testing_session()
+101 -19
View File
@@ -77,6 +77,8 @@ const DEFAULT_APPLY_FORM: ApplyFormValues = {
mode: "fill_null_only",
};
const DATASET_IMPORT_BATCH_SIZE = 20;
function statusTagColor(status: string): string {
if (status === "success" || status === "active") return "green";
if (status === "running") return "blue";
@@ -146,6 +148,17 @@ function readXhrError(xhr: XMLHttpRequest): string {
}
}
/**
 * Split `files` into consecutive batches of at most `size` items, keeping order.
 *
 * A `size` of zero or less disables batching: the whole list is returned as a
 * single batch. An empty list with a positive `size` yields no batches.
 */
function chunkFiles(files: File[], size: number): File[][] {
  if (size <= 0) {
    return [files];
  }
  const batches: File[][] = [];
  let start = 0;
  while (start < files.length) {
    const end = start + size;
    batches.push(files.slice(start, end));
    start = end;
  }
  return batches;
}
export default function AdminElevationPage() {
const queryClient = useQueryClient();
const {
@@ -316,7 +329,11 @@ export default function AdminElevationPage() {
payload.files.length === 1 ? payload.files[0].name : `${payload.files.length} 个文件`,
);
const uploadWithXhr = (token: string | null) =>
const uploadWithXhr = (
token: string | null,
files: File[],
options: { completedBytes: number; totalBytes: number; triggerAnalysis: boolean; label: string },
) =>
new Promise<ElevationDatasetDataImportResponse>((resolve, reject) => {
const xhr = new XMLHttpRequest();
xhr.open("POST", `${getApiBaseUrl()}/api/v1/elevation/datasets/${payload.datasetId}/data/import`);
@@ -326,16 +343,16 @@ export default function AdminElevationPage() {
}
xhr.upload.onprogress = (event: ProgressEvent<EventTarget>) => {
if (!event.lengthComputable || event.total <= 0) {
if (options.totalBytes <= 0) {
return;
}
const percent = Math.min(99, Math.max(0, Math.round((event.loaded / event.total) * 100)));
const loadedBytes = options.completedBytes + (event.lengthComputable ? event.loaded : 0);
const percent = Math.min(99, Math.max(0, Math.round((loadedBytes / options.totalBytes) * 100)));
setDatasetDataUploadProgress(percent);
};
xhr.onload = () => {
if (xhr.status >= 200 && xhr.status < 300) {
setDatasetDataUploadProgress(100);
try {
resolve(JSON.parse(xhr.responseText) as ElevationDatasetDataImportResponse);
} catch {
@@ -350,31 +367,96 @@ export default function AdminElevationPage() {
xhr.onabort = () => reject(new Error("导入已取消"));
const formData = new FormData();
for (const file of payload.files) {
formData.append("trigger_analysis", String(options.triggerAnalysis));
for (const file of files) {
formData.append("files", file);
}
setDatasetDataUploadFileName(options.label);
xhr.send(formData);
});
let result: ElevationDatasetDataImportResponse;
try {
result = await uploadWithXhr(getAccessToken());
} catch (error) {
const message = error instanceof Error ? error.message : "导入失败";
const isUnauthorized = message.includes("401") || message.includes("未授权");
if (!isUnauthorized) {
throw error;
const batches = chunkFiles(payload.files, DATASET_IMPORT_BATCH_SIZE);
const totalBytes = payload.files.reduce((sum, file) => sum + file.size, 0);
let completedBytes = 0;
let latestResult: ElevationDatasetDataImportResponse | null = null;
let uploadedFileCount = 0;
let extractedFileCount = 0;
let importedFileCount = 0;
let analysisTaskQueued = false;
let analysisTaskId: string | null = null;
const warnings: string[] = [];
const importedFiles = new Set<string>();
for (const [index, batch] of batches.entries()) {
const batchBytes = batch.reduce((sum, file) => sum + file.size, 0);
const triggerAnalysis = index === batches.length - 1;
const label = batches.length === 1
? (batch.length === 1 ? batch[0].name : `${batch.length} 个文件`)
: `${index + 1}/${batches.length} 批,共 ${batch.length} 个文件`;
let result: ElevationDatasetDataImportResponse;
try {
result = await uploadWithXhr(getAccessToken(), batch, {
completedBytes,
totalBytes,
triggerAnalysis,
label,
});
} catch (error) {
const message = error instanceof Error ? error.message : "导入失败";
const isUnauthorized = message.includes("401") || message.includes("未授权");
if (!isUnauthorized) {
throw error;
}
const refreshed = await refreshAccessToken();
if (!refreshed) {
throw error;
}
result = await uploadWithXhr(getAccessToken(), batch, {
completedBytes,
totalBytes,
triggerAnalysis,
label,
});
}
const refreshed = await refreshAccessToken();
if (!refreshed) {
throw error;
latestResult = result;
uploadedFileCount += result.uploaded_file_count;
extractedFileCount += result.extracted_file_count;
importedFileCount += result.imported_file_count;
analysisTaskQueued = analysisTaskQueued || result.analysis_task_queued;
analysisTaskId = result.analysis_task_id ?? analysisTaskId;
for (const warning of result.warnings) {
warnings.push(warning);
}
result = await uploadWithXhr(getAccessToken());
for (const importedFile of result.imported_files) {
importedFiles.add(importedFile);
}
completedBytes += batchBytes;
setDatasetDataUploadProgress(Math.min(99, Math.max(0, Math.round((completedBytes / Math.max(totalBytes, 1)) * 100))));
}
return result;
setDatasetDataUploadProgress(100);
if (!latestResult) {
throw new Error("导入失败");
}
return {
...latestResult,
uploaded_file_count: uploadedFileCount,
extracted_file_count: extractedFileCount,
imported_file_count: importedFileCount,
analysis_task_queued: analysisTaskQueued,
analysis_task_id: analysisTaskId,
warning_count: warnings.length,
warnings,
imported_files: Array.from(importedFiles).sort(),
};
},
onSuccess: async (payload) => {
const monitorHint = payload.analysis_task_id ? `,分析任务ID${payload.analysis_task_id}` : "";
const monitorHint = payload.analysis_task_queued && payload.analysis_task_id
? `,分析任务ID${payload.analysis_task_id}`
: "";
const msg = payload.warning_count > 0
? `数据导入完成:上传 ${payload.uploaded_file_count} 个、解压 ${payload.extracted_file_count} 个、可用 ${payload.imported_file_count} 个,告警 ${payload.warning_count}${monitorHint}`
: `数据导入完成:上传 ${payload.uploaded_file_count} 个、解压 ${payload.extracted_file_count} 个、可用 ${payload.imported_file_count}${monitorHint}`;