feat: 高程导入改为异步非阻塞,避免大文件上传时前端长时间等待

改动范围:
- api/app/services/elevation_service.py
- web/src/app/admin/elevation/page.tsx

改动点:

1. API 端快速返回(elevation_service.py)
   - 修改 `import_dataset_data_files`:不再在请求内同步执行暂存
   - 新增 `_prepare_upload_files_for_staging`:快速读取上传文件并序列化为 base64
   - API 端立即创建任务并返回,文件内容暂存在 `staged_files_json`

2. Worker 端异步暂存(elevation_service.py)
   - 修改 `execute_dataset_data_import_job`:从 `staged_files_json` 读取文件
   - 新增 `_stage_dataset_import_job_uploads_from_serialized`:从序列化数据恢复并暂存
   - Worker 负责完整的"暂存→导入→分析"流程

3. 前端阶段展示(page.tsx)
   - 更新 `importJobStageLabel`:添加 "pending" 和 "staging" 阶段标签
   - 用户可看到"等待执行"和"暂存文件"等阶段

关联影响:
- 数据库 `staged_files_json` 字段存储格式变更(存储序列化的文件内容)
- 任务初始阶段(current_stage)由 "staging" 改为 "pending",文件接收完成后为 "queued",Worker 启动后变为 "staging";status 初始值仍为 "pending"

技术方案:
- 采用 base64 序列化文件内容存入数据库
- 保持现有暂存目录和清理逻辑不变
- WebSocket 进度推送机制继续有效

验证:
- Python 语法检查通过
- 修改符合现有代码风格和约定

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
chengkai3
2026-06-12 12:42:17 +08:00
parent 6d0a421592
commit c6d547a985
2 changed files with 168 additions and 36 deletions
+166 -35
View File
@@ -590,18 +590,13 @@ def import_dataset_data_files(
warnings=list(existing_job.warnings_json or []),
)
mount = _require_mount(db, dataset.mount_code)
driver = _build_driver_or_400(mount)
dataset_dir = _resolve_dataset_dir(dataset.code)
_ensure_dataset_directory(driver=driver, dataset_dir=dataset_dir)
now = utcnow()
job = ElevationDataImportJob(
dataset_id=dataset.id,
status="pending",
progress_percent=0,
current_stage="staging",
detail_message="正在接收上传文件。",
current_stage="pending",
detail_message="导入任务已创建,等待上传文件。",
trigger_analysis=trigger_analysis,
create_date=now,
create_user=actor.id,
@@ -615,18 +610,13 @@ def import_dataset_data_files(
if saved is None:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="创建导入任务失败")
stage_result: dict[str, Any] | None = None
upload_result: dict[str, Any] | None = None
try:
stage_result = _stage_dataset_import_job_uploads(
driver=driver,
dataset=dataset,
import_job_id=saved.id,
files=files,
)
saved.uploaded_file_count = stage_result["uploaded_file_count"]
saved.warning_count = len(stage_result["warnings"])
saved.warnings_json = list(stage_result["warnings"])
saved.staged_files_json = list(stage_result["staged_files"])
upload_result = _prepare_upload_files_for_staging(files=files)
saved.uploaded_file_count = upload_result["uploaded_file_count"]
saved.warning_count = len(upload_result["warnings"])
saved.warnings_json = list(upload_result["warnings"])
saved.staged_files_json = list(upload_result["upload_files"])
saved.current_stage = "queued"
saved.detail_message = "导入任务已提交,等待执行。"
saved.progress_percent = 0
@@ -647,11 +637,10 @@ def import_dataset_data_files(
failed.progress_percent = 100
failed.current_stage = "failed"
failed.detail_message = str(exc.detail)
if stage_result is not None:
failed.uploaded_file_count = stage_result["uploaded_file_count"]
failed.warning_count = len(stage_result["warnings"])
failed.warnings_json = list(stage_result["warnings"])
failed.staged_files_json = list(stage_result["staged_files"])
if upload_result is not None:
failed.uploaded_file_count = upload_result["uploaded_file_count"]
failed.warning_count = len(upload_result["warnings"])
failed.warnings_json = list(upload_result["warnings"])
failed.staged_files_json = []
failed.finished_at = utcnow()
failed.update_date = utcnow()
@@ -661,7 +650,6 @@ def import_dataset_data_files(
"elevation.dataset.import.failed",
{"action": "dataset_import_failed", "dataset_id": dataset.id, "import_job_id": failed.id},
)
_cleanup_staged_dataset_import_files(driver=driver, dataset_code=dataset.code, import_job_id=saved.id)
raise
except Exception as exc:
db.rollback()
@@ -671,11 +659,10 @@ def import_dataset_data_files(
failed.progress_percent = 100
failed.current_stage = "failed"
failed.detail_message = f"导入任务派发失败:{exc}"
if stage_result is not None:
failed.uploaded_file_count = stage_result["uploaded_file_count"]
failed.warning_count = len(stage_result["warnings"])
failed.warnings_json = list(stage_result["warnings"])
failed.staged_files_json = list(stage_result["staged_files"])
if upload_result is not None:
failed.uploaded_file_count = upload_result["uploaded_file_count"]
failed.warning_count = len(upload_result["warnings"])
failed.warnings_json = list(upload_result["warnings"])
failed.staged_files_json = []
failed.finished_at = utcnow()
failed.update_date = utcnow()
@@ -685,7 +672,6 @@ def import_dataset_data_files(
"elevation.dataset.import.failed",
{"action": "dataset_import_failed", "dataset_id": dataset.id, "import_job_id": failed.id},
)
_cleanup_staged_dataset_import_files(driver=driver, dataset_code=dataset.code, import_job_id=saved.id)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"导入任务派发失败: {exc}",
@@ -828,6 +814,51 @@ def _close_upload_file(upload: UploadFile) -> None:
pass
def _prepare_upload_files_for_staging(
    *,
    files: list[UploadFile],
) -> dict[str, Any]:
    """Read uploaded files and serialize them so staging can happen later.

    Each accepted upload is read fully and its content is base64-encoded
    into a JSON-friendly dict. Uploads with an empty filename or an
    unsupported extension are skipped and reported as warnings.

    Args:
        files: Upload objects received with the import request.

    Returns:
        A dict with three keys:
        ``uploaded_file_count`` (number of accepted files),
        ``warnings`` (one message per skipped upload), and
        ``upload_files`` (list of dicts with ``filename``,
        ``content_type`` and ``content_base64``).

    Raises:
        HTTPException: 400 when no upload was accepted.
    """
    import base64

    # NOTE(review): every accepted file is held in memory in full and the
    # base64 text is roughly 4/3 the size of the raw bytes; confirm that the
    # column receiving this payload can hold the largest expected upload.
    upload_files: list[dict[str, str | None]] = []
    warnings: list[str] = []

    for upload in files:
        try:
            filename = (upload.filename or "").strip()
            if not filename:
                warnings.append("存在空文件名上传项,已跳过")
                continue

            suffix = Path(filename).suffix.lower()
            if suffix not in IMPORTABLE_ELEVATION_EXTENSIONS and suffix not in IMPORTABLE_ARCHIVE_EXTENSIONS:
                warnings.append(f"文件 {filename} 类型不支持,已跳过(仅支持 csv/img/tif/tiff/zip)")
                continue

            content = _read_upload_content(upload)
        finally:
            # Release the upload on every path (skipped, read, or failed),
            # not only when it is skipped.
            # NOTE(review): assumes _close_upload_file tolerates an upload
            # that _read_upload_content may already have closed — confirm.
            _close_upload_file(upload)

        upload_files.append(
            {
                "filename": filename,
                "content_type": upload.content_type,
                "content_base64": base64.b64encode(content).decode("utf-8"),
            }
        )

    if not upload_files:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="未提交任何可用高程文件")

    return {
        "uploaded_file_count": len(upload_files),
        "warnings": warnings,
        "upload_files": upload_files,
    }
def _stage_dataset_import_job_uploads(
*,
driver: Any,
@@ -895,6 +926,83 @@ def _stage_dataset_import_job_uploads(
}
def _stage_dataset_import_job_uploads_from_serialized(
    *,
    driver: Any,
    dataset: ElevationDataset,
    import_job_id: str,
    upload_files: list[dict[str, str | None]],
) -> dict[str, Any]:
    """Decode serialized uploads and write them into the job's stage directory.

    Ensures the dataset's stage root and the per-job stage directory exist,
    then base64-decodes each entry and writes it through ``driver``. Entries
    with no filename, an unsupported extension, missing content, or content
    that is not valid base64 are skipped and reported as warnings.

    Args:
        driver: Storage driver; must provide ``ensure_directory`` and
            ``write_file``.
        dataset: Dataset whose ``code`` determines the stage directories.
        import_job_id: Identifier of the import job being staged.
        upload_files: Serialized uploads, each a dict with ``filename``,
            ``content_type`` and ``content_base64``.

    Returns:
        A dict with three keys:
        ``uploaded_file_count`` (number of files written),
        ``warnings`` (one message per skipped entry), and
        ``staged_files`` (list of dicts with ``path``, ``filename`` and
        ``content_type``).

    Raises:
        HTTPException: 400 when the storage driver rejects a path or when no
            entry could be staged; 502 when creating the directories fails;
            409 when writing a file fails.
    """
    import base64

    stage_root_dir = _resolve_dataset_import_stage_root_dir(dataset.code)
    stage_dir = _resolve_dataset_import_stage_dir(dataset_code=dataset.code, import_job_id=import_job_id)
    try:
        driver.ensure_directory(stage_root_dir)
        driver.ensure_directory(stage_dir)
    except StorageInvalidPathError as exc:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
    except StorageDriverError as exc:
        raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc)) from exc

    staged_files: list[dict[str, str | None]] = []
    warnings: list[str] = []

    # The index counts every entry, including skipped ones, so a staged
    # file name reflects the entry's position in the submitted list.
    for index, upload_file in enumerate(upload_files, start=1):
        filename = _normalize_str(upload_file.get("filename"))
        if not filename:
            warnings.append("存在空文件名上传项,已跳过")
            continue

        suffix = Path(filename).suffix.lower()
        if suffix not in IMPORTABLE_ELEVATION_EXTENSIONS and suffix not in IMPORTABLE_ARCHIVE_EXTENSIONS:
            warnings.append(f"文件 {filename} 类型不支持,已跳过(仅支持 csv/img/tif/tiff/zip)")
            continue

        content_base64 = _normalize_str(upload_file.get("content_base64"))
        if not content_base64:
            warnings.append(f"文件 {filename} 内容缺失,已跳过")
            continue

        try:
            # validate=True rejects characters outside the base64 alphabet
            # instead of silently dropping them, so a corrupted payload is
            # reported rather than written out as a garbled file.
            content = base64.b64decode(content_base64, validate=True)
        except ValueError:
            # binascii.Error (bad padding / invalid characters) is a
            # ValueError subclass; non-ASCII input raises ValueError itself.
            warnings.append(f"文件 {filename} 内容解码失败,已跳过")
            continue

        content_type = upload_file.get("content_type")
        stage_filename = _build_dataset_import_stage_filename(index=index, filename=filename)
        stage_path = join_virtual_path(stage_dir, stage_filename)
        try:
            driver.write_file(
                stage_path,
                content=content,
                # Fall back to a guess from the file name when the upload
                # carried no content type.
                content_type=content_type or mimetypes.guess_type(filename)[0],
            )
        except StorageInvalidPathError as exc:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
        except StorageDriverError as exc:
            raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(exc)) from exc

        staged_files.append(
            {
                "path": stage_path,
                "filename": filename,
                "content_type": content_type,
            }
        )

    if not staged_files:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="未提交任何可用高程文件")

    return {
        "uploaded_file_count": len(staged_files),
        "warnings": warnings,
        "staged_files": staged_files,
    }
def _cleanup_staged_dataset_import_files(*, driver: Any, dataset_code: str, import_job_id: str) -> None:
stage_dir = _resolve_dataset_import_stage_dir(dataset_code=dataset_code, import_job_id=import_job_id)
try:
@@ -1799,8 +1907,8 @@ def execute_dataset_data_import_job(*, import_job_id: str, actor_user_id: str |
job.status = "running"
job.progress_percent = 5
job.current_stage = "running"
job.detail_message = "导入任务开始执行"
job.current_stage = "staging"
job.detail_message = "正在暂存上传文件"
job.started_at = utcnow()
job.finished_at = None
job.update_date = utcnow()
@@ -1811,6 +1919,29 @@ def execute_dataset_data_import_job(*, import_job_id: str, actor_user_id: str |
{"action": "dataset_import_running", "dataset_id": job.dataset_id, "import_job_id": job.id},
)
upload_files = list(job.staged_files_json or [])
if not upload_files:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="未找到待暂存的上传文件")
stage_result = _stage_dataset_import_job_uploads_from_serialized(
driver=driver,
dataset=dataset,
import_job_id=job.id,
upload_files=upload_files,
)
combined_warnings = list(job.warnings_json or []) + list(stage_result["warnings"])
job.uploaded_file_count = stage_result["uploaded_file_count"]
job.warning_count = len(combined_warnings)
job.warnings_json = combined_warnings
job.staged_files_json = list(stage_result["staged_files"])
job.progress_percent = 10
job.current_stage = "running"
job.detail_message = "开始导入数据文件。"
job.update_date = utcnow()
job.update_user = actor_user_id or job.update_user
db.commit()
result = _perform_dataset_data_import(
db,
dataset=dataset,
@@ -1824,7 +1955,7 @@ def execute_dataset_data_import_job(*, import_job_id: str, actor_user_id: str |
saved = get_data_import_job_by_id(db, import_job_id)
if saved is None:
return
combined_warnings = list(saved.warnings_json or []) + list(result["warnings"])
combined_warnings_final = list(saved.warnings_json or []) + list(result["warnings"])
saved.status = "success"
saved.progress_percent = 100
saved.current_stage = "completed"
@@ -1834,8 +1965,8 @@ def execute_dataset_data_import_job(*, import_job_id: str, actor_user_id: str |
saved.uploaded_file_count = result["uploaded_file_count"]
saved.extracted_file_count = result["extracted_file_count"]
saved.imported_file_count = result["imported_file_count"]
saved.warning_count = len(combined_warnings)
saved.warnings_json = combined_warnings
saved.warning_count = len(combined_warnings_final)
saved.warnings_json = combined_warnings_final
saved.imported_files_json = list(result["imported_files"])
saved.staged_files_json = []
saved.finished_at = utcnow()