支持高程批量导入进度与异步分析任务监控

Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
2026-05-03 14:20:19 +08:00
parent df7c6ca14e
commit 0952edd402
5 changed files with 170 additions and 28 deletions
+2 -1
View File
@@ -130,7 +130,8 @@ class ElevationDatasetDataImportResponse(BaseModel):
uploaded_file_count: int
extracted_file_count: int
imported_file_count: int
analyzed: bool = False
analysis_task_queued: bool = False
analysis_task_id: str | None = None
warning_count: int
warnings: list[str] = Field(default_factory=list)
imported_files: list[str] = Field(default_factory=list)
+32 -8
View File
@@ -439,15 +439,14 @@ def import_dataset_data_files(
dataset.update_date = utcnow()
db.commit()
analyzed = False
analysis_task_queued = False
analysis_task_id: str | None = None
try:
analyze_dataset(db, dataset_id=dataset.id, actor=actor)
analyzed = True
except HTTPException as exc:
detail = str(exc.detail) if exc.detail else "未知错误"
warnings.append(f"自动分析失败:{detail}")
task = _dispatch_elevation_dataset_analysis_task(dataset_id=dataset.id, actor_user_id=actor.id)
analysis_task_queued = True
analysis_task_id = str(task.id)
except Exception as exc: # pragma: no cover
warnings.append(f"自动分析异常{exc}")
warnings.append(f"自动分析任务派发失败{exc}")
refreshed = get_dataset_by_id(db, dataset.id)
if refreshed is None:
@@ -462,7 +461,8 @@ def import_dataset_data_files(
uploaded_file_count=uploaded_file_count,
extracted_file_count=extracted_file_count,
imported_file_count=len(imported_files),
analyzed=analyzed,
analysis_task_queued=analysis_task_queued,
analysis_task_id=analysis_task_id,
warning_count=len(warnings),
warnings=warnings,
imported_files=sorted(set(imported_files)),
@@ -741,6 +741,12 @@ def _dispatch_elevation_apply_task(*, job_id: str):
return apply_elevation_for_line_job.delay(job_id)
def _dispatch_elevation_dataset_analysis_task(*, dataset_id: str, actor_user_id: str | None):
from ..tasks.elevation_tasks import analyze_elevation_dataset_job
return analyze_elevation_dataset_job.delay(dataset_id, actor_user_id)
def execute_apply_job(job_id: str) -> None:
db = SessionLocal()
try:
@@ -845,6 +851,24 @@ def execute_apply_job(job_id: str) -> None:
db.close()
def execute_dataset_analysis_job(*, dataset_id: str, actor_user_id: str | None) -> None:
db = SessionLocal()
try:
item = get_dataset_by_id(db, dataset_id)
if not item:
return
actor = db.execute(select(User).where(User.id == actor_user_id)).scalar_one_or_none() if actor_user_id else None
if actor is None:
actor = db.execute(select(User).where(User.status == "active").order_by(User.id.asc())).scalars().first()
if actor is None:
return
analyze_dataset(db, dataset_id=dataset_id, actor=actor)
finally:
db.close()
def _ensure_dataset_file_exists(db: Session, *, mount_code: str, file_path: str) -> None:
mount = _require_mount(db, mount_code.strip())
driver = _build_driver_or_400(mount)
+7 -1
View File
@@ -1,10 +1,16 @@
from __future__ import annotations
from ..core.celery_app import celery_app
from ..services.elevation_service import execute_apply_job
from ..services.elevation_service import execute_apply_job, execute_dataset_analysis_job
@celery_app.task(name="app.tasks.elevation_tasks.apply_elevation_for_line_job")
def apply_elevation_for_line_job(job_id: str) -> dict[str, str]:
execute_apply_job(job_id)
return {"job_id": job_id, "status": "done"}
@celery_app.task(name="app.tasks.elevation_tasks.analyze_elevation_dataset_job")
def analyze_elevation_dataset_job(dataset_id: str, actor_user_id: str | None) -> dict[str, str]:
execute_dataset_analysis_job(dataset_id=dataset_id, actor_user_id=actor_user_id)
return {"dataset_id": dataset_id, "status": "done"}