diff --git a/api/app/services/elevation_service.py b/api/app/services/elevation_service.py index 5ac71f7..915d105 100644 --- a/api/app/services/elevation_service.py +++ b/api/app/services/elevation_service.py @@ -590,18 +590,13 @@ def import_dataset_data_files( warnings=list(existing_job.warnings_json or []), ) - mount = _require_mount(db, dataset.mount_code) - driver = _build_driver_or_400(mount) - dataset_dir = _resolve_dataset_dir(dataset.code) - _ensure_dataset_directory(driver=driver, dataset_dir=dataset_dir) - now = utcnow() job = ElevationDataImportJob( dataset_id=dataset.id, status="pending", progress_percent=0, - current_stage="staging", - detail_message="正在接收上传文件。", + current_stage="pending", + detail_message="导入任务已创建,等待上传文件。", trigger_analysis=trigger_analysis, create_date=now, create_user=actor.id, @@ -615,18 +610,13 @@ def import_dataset_data_files( if saved is None: raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="创建导入任务失败") - stage_result: dict[str, Any] | None = None + upload_result: dict[str, Any] | None = None try: - stage_result = _stage_dataset_import_job_uploads( - driver=driver, - dataset=dataset, - import_job_id=saved.id, - files=files, - ) - saved.uploaded_file_count = stage_result["uploaded_file_count"] - saved.warning_count = len(stage_result["warnings"]) - saved.warnings_json = list(stage_result["warnings"]) - saved.staged_files_json = list(stage_result["staged_files"]) + upload_result = _prepare_upload_files_for_staging(files=files) + saved.uploaded_file_count = upload_result["uploaded_file_count"] + saved.warning_count = len(upload_result["warnings"]) + saved.warnings_json = list(upload_result["warnings"]) + saved.staged_files_json = list(upload_result["upload_files"]) saved.current_stage = "queued" saved.detail_message = "导入任务已提交,等待执行。" saved.progress_percent = 0 @@ -647,11 +637,10 @@ def import_dataset_data_files( failed.progress_percent = 100 failed.current_stage = "failed" failed.detail_message = str(exc.detail) - if stage_result is not None: - failed.uploaded_file_count = stage_result["uploaded_file_count"] - failed.warning_count = len(stage_result["warnings"]) - failed.warnings_json = list(stage_result["warnings"]) - failed.staged_files_json = list(stage_result["staged_files"]) + if upload_result is not None: + failed.uploaded_file_count = upload_result["uploaded_file_count"] + failed.warning_count = len(upload_result["warnings"]) + failed.warnings_json = list(upload_result["warnings"]) failed.staged_files_json = [] failed.finished_at = utcnow() failed.update_date = utcnow() @@ -661,7 +650,6 @@ def import_dataset_data_files( "elevation.dataset.import.failed", {"action": "dataset_import_failed", "dataset_id": dataset.id, "import_job_id": failed.id}, ) - _cleanup_staged_dataset_import_files(driver=driver, dataset_code=dataset.code, import_job_id=saved.id) raise except Exception as exc: db.rollback() @@ -671,11 +659,10 @@ def import_dataset_data_files( failed.progress_percent = 100 failed.current_stage = "failed" failed.detail_message = f"导入任务派发失败:{exc}" - if stage_result is not None: - failed.uploaded_file_count = stage_result["uploaded_file_count"] - failed.warning_count = len(stage_result["warnings"]) - failed.warnings_json = list(stage_result["warnings"]) - failed.staged_files_json = list(stage_result["staged_files"]) + if upload_result is not None: + failed.uploaded_file_count = upload_result["uploaded_file_count"] + failed.warning_count = len(upload_result["warnings"]) + failed.warnings_json = list(upload_result["warnings"]) failed.staged_files_json = [] failed.finished_at = utcnow() failed.update_date = utcnow() @@ -685,7 +672,6 @@ def import_dataset_data_files( "elevation.dataset.import.failed", {"action": "dataset_import_failed", "dataset_id": dataset.id, "import_job_id": failed.id}, ) - _cleanup_staged_dataset_import_files(driver=driver, dataset_code=dataset.code, import_job_id=saved.id) raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"导入任务派发失败: {exc}", @@ -828,6 +814,51 @@ def _close_upload_file(upload: UploadFile) -> None: pass +def _prepare_upload_files_for_staging( + *, + files: list[UploadFile], +) -> dict[str, Any]: + import base64 + + upload_files: list[dict[str, str | None]] = [] + warnings: list[str] = [] + uploaded_file_count = 0 + + for upload in files: + filename = (upload.filename or "").strip() + if not filename: + warnings.append("存在空文件名上传项,已跳过") + _close_upload_file(upload) + continue + + suffix = Path(filename).suffix.lower() + if suffix not in IMPORTABLE_ELEVATION_EXTENSIONS and suffix not in IMPORTABLE_ARCHIVE_EXTENSIONS: + warnings.append(f"文件 {filename} 类型不支持,已跳过(仅支持 csv/img/tif/tiff/zip)") + _close_upload_file(upload) + continue + + content = _read_upload_content(upload) + content_base64 = base64.b64encode(content).decode("utf-8") + + upload_files.append( + { + "filename": filename, + "content_type": upload.content_type, + "content_base64": content_base64, + } + ) + uploaded_file_count += 1 + + if not upload_files: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="未提交任何可用高程文件") + + return { + "uploaded_file_count": uploaded_file_count, + "warnings": warnings, + "upload_files": upload_files, + } + + def _stage_dataset_import_job_uploads( *, driver: Any, @@ -895,6 +926,83 @@ def _stage_dataset_import_job_uploads( } +def _stage_dataset_import_job_uploads_from_serialized( + *, + driver: Any, + dataset: ElevationDataset, + import_job_id: str, + upload_files: list[dict[str, str | None]], +) -> dict[str, Any]: + import base64 + + stage_root_dir = _resolve_dataset_import_stage_root_dir(dataset.code) + stage_dir = _resolve_dataset_import_stage_dir(dataset_code=dataset.code, import_job_id=import_job_id) + try: + driver.ensure_directory(stage_root_dir) + driver.ensure_directory(stage_dir) + except StorageInvalidPathError as exc: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc + except StorageDriverError as exc: + raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc)) from exc + + staged_files: list[dict[str, str | None]] = [] + warnings: list[str] = [] + uploaded_file_count = 0 + + for index, upload_file in enumerate(upload_files, start=1): + filename = _normalize_str(upload_file.get("filename")) + if not filename: + warnings.append("存在空文件名上传项,已跳过") + continue + + suffix = Path(filename).suffix.lower() + if suffix not in IMPORTABLE_ELEVATION_EXTENSIONS and suffix not in IMPORTABLE_ARCHIVE_EXTENSIONS: + warnings.append(f"文件 {filename} 类型不支持,已跳过(仅支持 csv/img/tif/tiff/zip)") + continue + + content_base64 = _normalize_str(upload_file.get("content_base64")) + if not content_base64: + warnings.append(f"文件 {filename} 内容缺失,已跳过") + continue + + try: + content = base64.b64decode(content_base64) + except Exception as exc: + warnings.append(f"文件 {filename} 内容解码失败,已跳过") + continue + + stage_filename = _build_dataset_import_stage_filename(index=index, filename=filename) + stage_path = join_virtual_path(stage_dir, stage_filename) + try: + driver.write_file( + stage_path, + content=content, + content_type=upload_file.get("content_type") or mimetypes.guess_type(filename)[0], + ) + except StorageInvalidPathError as exc: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc + except StorageDriverError as exc: + raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(exc)) from exc + + staged_files.append( + { + "path": stage_path, + "filename": filename, + "content_type": upload_file.get("content_type"), + } + ) + uploaded_file_count += 1 + + if not staged_files: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="未提交任何可用高程文件") + + return { + "uploaded_file_count": uploaded_file_count, + "warnings": warnings, + "staged_files": staged_files, + } + + def _cleanup_staged_dataset_import_files(*, driver: Any, dataset_code: str, import_job_id: str) -> None: stage_dir = _resolve_dataset_import_stage_dir(dataset_code=dataset_code, import_job_id=import_job_id) try: @@ -1799,8 +1907,8 @@ def execute_dataset_data_import_job(*, import_job_id: str, actor_user_id: str | job.status = "running" job.progress_percent = 5 - job.current_stage = "running" - job.detail_message = "导入任务开始执行。" + job.current_stage = "staging" + job.detail_message = "正在暂存上传文件。" job.started_at = utcnow() job.finished_at = None job.update_date = utcnow() @@ -1811,6 +1919,29 @@ def execute_dataset_data_import_job(*, import_job_id: str, actor_user_id: str | {"action": "dataset_import_running", "dataset_id": job.dataset_id, "import_job_id": job.id}, ) + upload_files = list(job.staged_files_json or []) + if not upload_files: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="未找到待暂存的上传文件") + + stage_result = _stage_dataset_import_job_uploads_from_serialized( + driver=driver, + dataset=dataset, + import_job_id=job.id, + upload_files=upload_files, + ) + + combined_warnings = list(job.warnings_json or []) + list(stage_result["warnings"]) + job.uploaded_file_count = stage_result["uploaded_file_count"] + job.warning_count = len(combined_warnings) + job.warnings_json = combined_warnings + job.staged_files_json = list(stage_result["staged_files"]) + job.progress_percent = 10 + job.current_stage = "running" + job.detail_message = "开始导入数据文件。" + job.update_date = utcnow() + job.update_user = actor_user_id or job.update_user + db.commit() + result = _perform_dataset_data_import( db, dataset=dataset, @@ -1824,7 +1955,7 @@ def execute_dataset_data_import_job(*, import_job_id: str, actor_user_id: str | saved = get_data_import_job_by_id(db, import_job_id) if saved is None: return - combined_warnings = list(saved.warnings_json or []) + list(result["warnings"]) + combined_warnings_final = list(saved.warnings_json or []) + list(result["warnings"]) saved.status = "success" saved.progress_percent = 100 saved.current_stage = "completed" @@ -1834,8 +1965,8 @@ def execute_dataset_data_import_job(*, import_job_id: str, actor_user_id: str | saved.uploaded_file_count = result["uploaded_file_count"] saved.extracted_file_count = result["extracted_file_count"] saved.imported_file_count = result["imported_file_count"] - saved.warning_count = len(combined_warnings) - saved.warnings_json = combined_warnings + saved.warning_count = len(combined_warnings_final) + saved.warnings_json = combined_warnings_final saved.imported_files_json = list(result["imported_files"]) saved.staged_files_json = [] saved.finished_at = utcnow() diff --git a/web/src/app/admin/elevation/page.tsx b/web/src/app/admin/elevation/page.tsx index 0e1fa69..9cfdd46 100644 --- a/web/src/app/admin/elevation/page.tsx +++ b/web/src/app/admin/elevation/page.tsx @@ -127,7 +127,8 @@ function importJobStatusLabel(status: string): string { function importJobStageLabel(stage: string | null | undefined): string { if (!stage) return "-"; - if (stage === "staging") return "接收文件"; + if (stage === "pending") return "等待执行"; + if (stage === "staging") return "暂存文件"; if (stage === "queued") return "等待执行"; if (stage === "running") return "开始执行"; if (stage === "importing") return "导入文件";