From b8f61a72aac7270d49b33905b23adbd9fa76519f Mon Sep 17 00:00:00 2001 From: chengkai3 Date: Sat, 27 Jun 2026 10:13:51 +0800 Subject: [PATCH] =?UTF-8?q?[feat]:[FL-194][=E5=88=A0=E9=99=A4AtpModel?= =?UTF-8?q?=E7=B3=BB=E7=BB=9F=EF=BC=8C=E4=BF=9D=E7=95=99AtpAsset]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 删除AtpModel、AtpModelVersion、AtpSimulationRun模型及相关代码 - 删除/api/v1/atp/models API端点 - 将engine status功能迁移到atp_asset_service - 更新路由和模型注册,移除atp_model引用 - 删除相关测试文件 - 更新fl_analysis_service使用atp_asset_service的_truncate_output Co-Authored-By: Claude Sonnet 4.6 Co-authored-by: multica-agent --- api/app/api/router.py | 2 - api/app/api/v1/atp_assets.py | 4 +- api/app/api/v1/atp_models.py | 222 ----- api/app/models/__init__.py | 3 +- api/app/models/atp_model.py | 152 --- api/app/schemas/atp_asset.py | 13 + api/app/schemas/atp_model.py | 155 --- api/app/services/atp_asset_service.py | 82 ++ api/app/services/atp_model_service.py | 1107 --------------------- api/app/services/fl_analysis_service.py | 2 +- api/app/tasks/atp_model_tasks.py | 20 - api/tests/test_async_dispatch_services.py | 437 -------- api/tests/test_atp_engine_task.py | 110 -- api/tests/test_atp_model_service.py | 66 -- 14 files changed, 99 insertions(+), 2276 deletions(-) delete mode 100644 api/app/api/v1/atp_models.py delete mode 100644 api/app/models/atp_model.py delete mode 100644 api/app/schemas/atp_model.py delete mode 100644 api/app/services/atp_model_service.py delete mode 100644 api/app/tasks/atp_model_tasks.py delete mode 100644 api/tests/test_async_dispatch_services.py delete mode 100644 api/tests/test_atp_engine_task.py delete mode 100644 api/tests/test_atp_model_service.py diff --git a/api/app/api/router.py b/api/app/api/router.py index 6117fd5..70b31a3 100644 --- a/api/app/api/router.py +++ b/api/app/api/router.py @@ -4,7 +4,6 @@ from .v1.admin import router as admin_router from .v1.admin_files import router as admin_files_router from .v1.ai_chat import router as ai_chat_router from .v1.atp_assets import router as atp_assets_router -from .v1.atp_models import router as atp_models_router from .v1.auth import router as auth_router from .v1.documents import router as documents_router from .v1.elevation import router as elevation_router @@ -30,7 +29,6 @@ v1_router.include_router(admin_router) v1_router.include_router(admin_files_router) v1_router.include_router(ai_chat_router) v1_router.include_router(atp_assets_router) -v1_router.include_router(atp_models_router) v1_router.include_router(documents_router) v1_router.include_router(task_monitor_router) v1_router.include_router(scheduled_tasks_router) diff --git a/api/app/api/v1/atp_assets.py b/api/app/api/v1/atp_assets.py index dbca9a5..78e0431 100644 --- a/api/app/api/v1/atp_assets.py +++ b/api/app/api/v1/atp_assets.py @@ -19,8 +19,8 @@ from ...schemas.atp_asset import ( AtpAssetRunListResponse, AtpAssetRunRequest, AtpAssetUpdateRequest, + AtpEngineStatusResponse, ) -from ...schemas.atp_model import AtpEngineStatusResponse from ...services.atp_asset_service import ( activate_release, create_asset, @@ -28,6 +28,7 @@ from ...services.atp_asset_service import ( create_release_from_archive, delete_asset, get_asset_by_id, + get_engine_status, get_release_by_id, get_run_detail, list_assets, @@ -40,7 +41,6 @@ from ...services.atp_asset_service import ( update_asset, update_release, ) -from ...services.atp_model_service import get_engine_status router = APIRouter(prefix="/atp", tags=["atp-assets"], dependencies=[Depends(require_enabled_menu_route)]) diff --git a/api/app/api/v1/atp_models.py b/api/app/api/v1/atp_models.py deleted file mode 100644 index fd98a13..0000000 --- a/api/app/api/v1/atp_models.py +++ /dev/null @@ -1,222 +0,0 @@ -from __future__ import annotations - -from fastapi import APIRouter, Depends, HTTPException, Query, status -from sqlalchemy.orm import Session - -from ...core.database import get_db -from ...core.dependencies import CurrentUser, require_any_permission, require_enabled_menu_route, require_permission -from ...schemas.atp_model import ( - AtpEngineStatusResponse, - AtpModelCreateRequest, - AtpModelListResponse, - AtpModelSummary, - AtpModelUpdateRequest, - AtpModelVersionCreateRequest, - AtpModelVersionDetail, - AtpModelVersionListResponse, - AtpModelVersionUpdateRequest, - AtpSimulationRunDetail, - AtpSimulationRunListResponse, - AtpSimulationRunRequest, -) -from ...services.atp_model_service import ( - activate_model_version, - create_model, - create_model_version, - delete_model, - get_engine_status, - get_model_by_id, - get_model_run_detail, - get_model_version_by_id, - list_model_runs, - list_model_versions, - list_models, - run_model_version, - serialize_model, - serialize_version_detail, - update_model, - update_model_version, -) - -router = APIRouter(prefix="/atp/models", tags=["atp-models"], dependencies=[Depends(require_enabled_menu_route)]) - - -@router.get("/engine/status", response_model=AtpEngineStatusResponse) -def get_atp_engine_status_endpoint( - _: CurrentUser = Depends(require_any_permission("atp.read", "atp.run", "atp.manage")), -) -> AtpEngineStatusResponse: - return get_engine_status() - - -@router.get("", response_model=AtpModelListResponse) -def get_atp_model_list( - keyword: str | None = Query(default=None), - status_filter: str | None = Query(default=None, alias="status"), - limit: int = Query(default=100, ge=1, le=500), - offset: int = Query(default=0, ge=0), - _: CurrentUser = Depends(require_any_permission("atp.read", "atp.run", "atp.manage")), - db: Session = Depends(get_db), -) -> AtpModelListResponse: - return list_models(db, keyword=keyword, status_filter=status_filter, limit=limit, offset=offset) - - -@router.post("", response_model=AtpModelSummary) -def create_atp_model_endpoint( - payload: AtpModelCreateRequest, - current_user: CurrentUser = Depends(require_permission("atp.manage")), - db: Session = Depends(get_db), -) -> AtpModelSummary: - created = create_model(db, payload, actor_user_id=current_user.user.id) - if not created: - raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Model code already exists") - return created - - -@router.get("/{model_id}", response_model=AtpModelSummary) -def get_atp_model_detail( - model_id: str, - _: CurrentUser = Depends(require_any_permission("atp.read", "atp.run", "atp.manage")), - db: Session = Depends(get_db), -) -> AtpModelSummary: - item = get_model_by_id(db, model_id) - if not item: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Model not found") - - version_count = int(len(item.versions)) - run_count = int(len(item.runs)) - last_run = item.runs[0] if item.runs else None - return serialize_model( - item, - version_count=version_count, - run_count=run_count, - last_run_status=last_run.status if last_run else None, - last_run_date=last_run.create_date if last_run else None, - ) - - -@router.patch("/{model_id}", response_model=AtpModelSummary) -def update_atp_model_endpoint( - model_id: str, - payload: AtpModelUpdateRequest, - current_user: CurrentUser = Depends(require_permission("atp.manage")), - db: Session = Depends(get_db), -) -> AtpModelSummary: - updated = update_model(db, model_id, payload, actor_user_id=current_user.user.id) - if not updated: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Model not found") - return updated - - -@router.delete("/{model_id}") -def delete_atp_model_endpoint( - model_id: str, - _: CurrentUser = Depends(require_permission("atp.manage")), - db: Session = Depends(get_db), -) -> dict[str, bool]: - deleted = delete_model(db, model_id) - if not deleted: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Model not found") - return {"success": True} - - -@router.get("/{model_id}/versions", response_model=AtpModelVersionListResponse) -def get_atp_model_versions( - model_id: str, - limit: int = Query(default=100, ge=1, le=500), - offset: int = Query(default=0, ge=0), - _: CurrentUser = Depends(require_any_permission("atp.read", "atp.run", "atp.manage")), - db: Session = Depends(get_db), -) -> AtpModelVersionListResponse: - if not get_model_by_id(db, model_id): - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Model not found") - return list_model_versions(db, model_id=model_id, limit=limit, offset=offset) - - -@router.post("/{model_id}/versions", response_model=AtpModelVersionDetail) -def create_atp_model_version_endpoint( - model_id: str, - payload: AtpModelVersionCreateRequest, - current_user: CurrentUser = Depends(require_permission("atp.manage")), - db: Session = Depends(get_db), -) -> AtpModelVersionDetail: - return create_model_version(db, model_id=model_id, payload=payload, actor_user_id=current_user.user.id) - - -@router.get("/{model_id}/versions/{version_id}", response_model=AtpModelVersionDetail) -def get_atp_model_version_detail( - model_id: str, - version_id: str, - _: CurrentUser = Depends(require_any_permission("atp.read", "atp.run", "atp.manage")), - db: Session = Depends(get_db), -) -> AtpModelVersionDetail: - item = get_model_version_by_id(db, model_id=model_id, version_id=version_id) - if not item: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Version not found") - return serialize_version_detail(item) - - -@router.patch("/{model_id}/versions/{version_id}", response_model=AtpModelVersionDetail) -def update_atp_model_version_endpoint( - model_id: str, - version_id: str, - payload: AtpModelVersionUpdateRequest, - current_user: CurrentUser = Depends(require_permission("atp.manage")), - db: Session = Depends(get_db), -) -> AtpModelVersionDetail: - return update_model_version( - db, - model_id=model_id, - version_id=version_id, - payload=payload, - actor_user_id=current_user.user.id, - ) - - -@router.post("/{model_id}/versions/{version_id}/activate", response_model=AtpModelSummary) -def activate_atp_model_version_endpoint( - model_id: str, - version_id: str, - current_user: CurrentUser = Depends(require_permission("atp.manage")), - db: Session = Depends(get_db), -) -> AtpModelSummary: - return activate_model_version( - db, - model_id=model_id, - version_id=version_id, - actor_user_id=current_user.user.id, - ) - - -@router.get("/{model_id}/runs", response_model=AtpSimulationRunListResponse) -def get_atp_model_runs( - model_id: str, - limit: int = Query(default=100, ge=1, le=500), - offset: int = Query(default=0, ge=0), - _: CurrentUser = Depends(require_any_permission("atp.read", "atp.run", "atp.manage")), - db: Session = Depends(get_db), -) -> AtpSimulationRunListResponse: - if not get_model_by_id(db, model_id): - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Model not found") - return list_model_runs(db, model_id=model_id, limit=limit, offset=offset) - - -@router.post("/{model_id}/runs", response_model=AtpSimulationRunDetail) -def run_atp_model_endpoint( - model_id: str, - payload: AtpSimulationRunRequest, - current_user: CurrentUser = Depends(require_any_permission("atp.run", "atp.manage")), - db: Session = Depends(get_db), -) -> AtpSimulationRunDetail: - return run_model_version(db, model_id=model_id, payload=payload, actor_user_id=current_user.user.id) - - -@router.get("/{model_id}/runs/{run_id}", response_model=AtpSimulationRunDetail) -def get_atp_model_run_detail( - model_id: str, - run_id: str, - _: CurrentUser = Depends(require_any_permission("atp.read", "atp.run", "atp.manage")), - db: Session = Depends(get_db), -) -> AtpSimulationRunDetail: - if not get_model_by_id(db, model_id): - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Model not found") - return get_model_run_detail(db, model_id=model_id, run_id=run_id) diff --git a/api/app/models/__init__.py b/api/app/models/__init__.py index c5e156a..1471483 100644 --- a/api/app/models/__init__.py +++ b/api/app/models/__init__.py @@ -4,12 +4,11 @@ Import all model modules during package initialization so SQLAlchemy can resolve string-based relationships regardless of route/service import order. """ -from . import ai_chat, atp_asset, atp_model, audit_log, auth_session, document, elevation, file_storage, fl_analysis, lightning_event, lightning_sample, line, line_tower, menu, object_group, rbac, scheduled_task, system_message, system_param, tower_model, tower_profile, user, wine, worker_registry +from . import ai_chat, atp_asset, audit_log, auth_session, document, elevation, file_storage, fl_analysis, lightning_event, lightning_sample, line, line_tower, menu, object_group, rbac, scheduled_task, system_message, system_param, tower_model, tower_profile, user, wine, worker_registry __all__ = [ "ai_chat", "atp_asset", - "atp_model", "audit_log", "auth_session", "document", diff --git a/api/app/models/atp_model.py b/api/app/models/atp_model.py deleted file mode 100644 index 3c62adc..0000000 --- a/api/app/models/atp_model.py +++ /dev/null @@ -1,152 +0,0 @@ -from __future__ import annotations - -from datetime import datetime -from typing import Any -from uuid import uuid4 - -from sqlalchemy import JSON, DateTime, ForeignKey, Index, Integer, String, Text, UniqueConstraint -from sqlalchemy.orm import Mapped, mapped_column, relationship - -from ..core.database import Base -from .base import utcnow - -class AtpModel(Base): - __tablename__ = "atp_model" - __table_args__ = ( - UniqueConstraint("code", name="uq_atp_model_code"), - Index("idx_atp_model_status", "status"), - Index("idx_atp_model_source", "source_type"), - ) - - id: Mapped[str] = mapped_column( - String(32), - primary_key=True, - default=lambda: uuid4().hex, - ) - code: Mapped[str] = mapped_column(String(64), nullable=False, index=True) - name: Mapped[str] = mapped_column(String(255), nullable=False) - source_type: Mapped[str] = mapped_column(String(32), default="atpdraw", index=True) - description: Mapped[str] = mapped_column(Text(), default="") - status: Mapped[str] = mapped_column(String(20), default="enabled", index=True) - tags_json: Mapped[list[str]] = mapped_column(JSON, default=list) - latest_version_no: Mapped[int] = mapped_column(Integer, default=0) - active_version_no: Mapped[int | None] = mapped_column(Integer) - create_date: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, index=True) - create_user: Mapped[str | None] = mapped_column(String(64), index=True) - update_date: Mapped[datetime] = mapped_column( - DateTime(timezone=True), - default=utcnow, - onupdate=utcnow, - ) - update_user: Mapped[str | None] = mapped_column(String(64), index=True) - - versions: Mapped[list[AtpModelVersion]] = relationship( - "AtpModelVersion", - back_populates="model", - lazy="selectin", - cascade="all, delete-orphan", - order_by="AtpModelVersion.version_no.desc()", - ) - runs: Mapped[list[AtpSimulationRun]] = relationship( - "AtpSimulationRun", - back_populates="model", - lazy="selectin", - cascade="all, delete-orphan", - order_by="AtpSimulationRun.create_date.desc()", - ) - - -class AtpModelVersion(Base): - __tablename__ = "atp_model_version" - __table_args__ = ( - UniqueConstraint("model_id", "version_no", name="uq_atp_model_version_model_no"), - Index("idx_atp_model_version_status", "status"), - Index("idx_atp_model_version_model_status", "model_id", "status"), - Index("idx_atp_model_version_content_hash", "content_hash"), - ) - - id: Mapped[str] = mapped_column( - String(32), - primary_key=True, - default=lambda: uuid4().hex, - ) - model_id: Mapped[str] = mapped_column( - String(32), - ForeignKey("atp_model.id", ondelete="CASCADE"), - nullable=False, - index=True, - ) - version_no: Mapped[int] = mapped_column(Integer, nullable=False, index=True) - version_tag: Mapped[str | None] = mapped_column(String(64), index=True) - status: Mapped[str] = mapped_column(String(20), default="draft", index=True) - entry_file: Mapped[str | None] = mapped_column(String(255)) - change_note: Mapped[str] = mapped_column(Text(), default="") - artifact_manifest_json: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict) - graph_json: Mapped[dict[str, Any]] = mapped_column(JSON, default=dict) - atp_text: Mapped[str] = mapped_column(Text(), default="") - content_hash: Mapped[str] = mapped_column(String(64), index=True) - create_date: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, index=True) - create_user: Mapped[str | None] = mapped_column(String(64), index=True) - update_date: Mapped[datetime] = mapped_column( - DateTime(timezone=True), - default=utcnow, - onupdate=utcnow, - ) - update_user: Mapped[str | None] = mapped_column(String(64), index=True) - - model: Mapped[AtpModel] = relationship("AtpModel", back_populates="versions", lazy="selectin") - runs: Mapped[list[AtpSimulationRun]] = relationship( - "AtpSimulationRun", - back_populates="version", - lazy="selectin", - order_by="AtpSimulationRun.create_date.desc()", - ) - - -class AtpSimulationRun(Base): - __tablename__ = "atp_simulation_run" - __table_args__ = ( - Index("idx_atp_simulation_run_status", "status"), - Index("idx_atp_simulation_run_model", "model_id", "create_date"), - ) - - id: Mapped[str] = mapped_column( - String(32), - primary_key=True, - default=lambda: uuid4().hex, - ) - model_id: Mapped[str] = mapped_column( - String(32), - ForeignKey("atp_model.id", ondelete="CASCADE"), - nullable=False, - index=True, - ) - version_id: Mapped[str | None] = mapped_column( - String(32), - ForeignKey("atp_model_version.id", ondelete="SET NULL"), - index=True, - ) - status: Mapped[str] = mapped_column(String(20), default="pending", index=True) - engine_mode: Mapped[str] = mapped_column(String(20), default="wine", index=True) - task_id: Mapped[str | None] = mapped_column(String(128), index=True) - engine_command: Mapped[str | None] = mapped_column(String(1000)) - working_dir: Mapped[str | None] = mapped_column(String(1000)) - timeout_seconds: Mapped[int] = mapped_column(Integer, default=600) - exit_code: Mapped[int | None] = mapped_column(Integer) - started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) - finished_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) - duration_ms: Mapped[int | None] = mapped_column(Integer) - stdout_text: Mapped[str | None] = mapped_column(Text()) - stderr_text: Mapped[str | None] = mapped_column(Text()) - error_message: Mapped[str | None] = mapped_column(Text()) - create_date: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, index=True) - create_user: Mapped[str | None] = mapped_column(String(64), index=True) - update_date: Mapped[datetime] = mapped_column( - DateTime(timezone=True), - default=utcnow, - onupdate=utcnow, - ) - update_user: Mapped[str | None] = mapped_column(String(64), index=True) - - model: Mapped[AtpModel] = relationship("AtpModel", back_populates="runs", lazy="selectin") - version: Mapped[AtpModelVersion | None] = relationship("AtpModelVersion", back_populates="runs", lazy="selectin") diff --git a/api/app/schemas/atp_asset.py b/api/app/schemas/atp_asset.py index 8913b1d..c818718 100644 --- a/api/app/schemas/atp_asset.py +++ b/api/app/schemas/atp_asset.py @@ -209,3 +209,16 @@ class AtpAssetRunRequest(BaseModel): class AtpAssetReleaseUploadResponse(BaseModel): task_id: str status: str + + +class AtpEngineStatusResponse(BaseModel): + mode: AtpAssetEngineMode + available: bool + executable_path: str + resolved_executable: str | None = None + storage_root: str + workdir: str + default_timeout_seconds: int + max_timeout_seconds: int + checks: dict[str, dict[str, Any]] = Field(default_factory=dict) + error: str | None = None diff --git a/api/app/schemas/atp_model.py b/api/app/schemas/atp_model.py deleted file mode 100644 index 6da228f..0000000 --- a/api/app/schemas/atp_model.py +++ /dev/null @@ -1,155 +0,0 @@ -from __future__ import annotations - -from datetime import datetime -from typing import Any, Literal - -from pydantic import BaseModel, Field - -AtpModelStatus = Literal["enabled", "disabled"] -AtpModelSourceType = Literal["atpdraw", "atp", "manual"] -AtpModelVersionStatus = Literal["draft", "released", "archived"] -AtpSimulationRunStatus = Literal["pending", "running", "success", "failed"] -AtpEngineMode = Literal["wine", "native"] - - -class AtpModelSummary(BaseModel): - id: str - code: str - name: str - source_type: AtpModelSourceType - description: str - status: AtpModelStatus - tags_json: list[str] = Field(default_factory=list) - latest_version_no: int = 0 - active_version_no: int | None = None - version_count: int = 0 - run_count: int = 0 - last_run_status: AtpSimulationRunStatus | None = None - last_run_date: datetime | None = None - create_date: datetime - create_user: str | None = None - update_date: datetime - update_user: str | None = None - - -class AtpModelListResponse(BaseModel): - items: list[AtpModelSummary] - total: int - - -class AtpModelCreateRequest(BaseModel): - code: str = Field(min_length=1, max_length=64) - name: str = Field(min_length=1, max_length=255) - source_type: AtpModelSourceType = "atpdraw" - description: str = Field(default="", max_length=8000) - status: AtpModelStatus = "enabled" - tags_json: list[str] = Field(default_factory=list, max_length=128) - - -class AtpModelUpdateRequest(BaseModel): - name: str | None = Field(default=None, min_length=1, max_length=255) - source_type: AtpModelSourceType | None = None - description: str | None = Field(default=None, max_length=8000) - status: AtpModelStatus | None = None - tags_json: list[str] | None = Field(default=None, max_length=128) - - -class AtpModelVersionSummary(BaseModel): - id: str - model_id: str - version_no: int - version_tag: str | None = None - status: AtpModelVersionStatus - entry_file: str | None = None - change_note: str - artifact_manifest_json: dict[str, Any] = Field(default_factory=dict) - content_hash: str - atp_text_size: int - create_date: datetime - create_user: str | None = None - update_date: datetime - update_user: str | None = None - - -class AtpModelVersionDetail(AtpModelVersionSummary): - atp_text: str - graph_json: dict[str, Any] = Field(default_factory=dict) - - -class AtpModelVersionListResponse(BaseModel): - items: list[AtpModelVersionSummary] - total: int - - -class AtpModelVersionCreateRequest(BaseModel): - version_tag: str | None = Field(default=None, max_length=64) - status: AtpModelVersionStatus = "released" - entry_file: str | None = Field(default=None, max_length=255) - change_note: str = Field(default="", max_length=8000) - artifact_manifest_json: dict[str, Any] = Field(default_factory=dict) - graph_json: dict[str, Any] = Field(default_factory=dict) - atp_text: str = Field(min_length=1) - - -class AtpModelVersionUpdateRequest(BaseModel): - version_tag: str | None = Field(default=None, max_length=64) - status: AtpModelVersionStatus | None = None - entry_file: str | None = Field(default=None, max_length=255) - change_note: str | None = Field(default=None, max_length=8000) - artifact_manifest_json: dict[str, Any] | None = None - graph_json: dict[str, Any] | None = None - atp_text: str | None = Field(default=None, min_length=1) - - -class AtpSimulationRunSummary(BaseModel): - id: str - model_id: str - version_id: str | None = None - version_no: int | None = None - task_id: str | None = None - status: AtpSimulationRunStatus - engine_mode: AtpEngineMode - engine_command: str | None = None - working_dir: str | None = None - timeout_seconds: int - exit_code: int | None = None - started_at: datetime | None = None - finished_at: datetime | None = None - duration_ms: int | None = None - error_message: str | None = None - stdout_size: int = 0 - stderr_size: int = 0 - create_date: datetime - create_user: str | None = None - - -class AtpSimulationRunDetail(AtpSimulationRunSummary): - stdout_text: str | None = None - stderr_text: str | None = None - - -class AtpSimulationRunListResponse(BaseModel): - items: list[AtpSimulationRunSummary] - total: int - - -class AtpSimulationRunRequest(BaseModel): - version_id: str | None = None - version_no: int | None = Field(default=None, ge=1) - timeout_seconds: int | None = Field(default=None, ge=1) - extra_args: list[str] = Field(default_factory=list, max_length=32) - environment: dict[str, str] = Field(default_factory=dict, max_length=16) - dry_run: bool = False - - -class AtpEngineStatusResponse(BaseModel): - mode: AtpEngineMode - available: bool - executable_path: str - resolved_executable: str | None = None - storage_root: str - workdir: str - default_timeout_seconds: int - max_timeout_seconds: int - checks: dict[str, dict[str, Any]] = Field(default_factory=dict) - error: str | None = None diff --git a/api/app/services/atp_asset_service.py b/api/app/services/atp_asset_service.py index b5328cb..5c443df 100644 --- a/api/app/services/atp_asset_service.py +++ b/api/app/services/atp_asset_service.py @@ -41,8 +41,11 @@ from ..schemas.atp_asset import ( AtpAssetRunSummary, AtpAssetSummary, AtpAssetUpdateRequest, + AtpEngineStatusResponse, ) +from .legacy_atp_adapter import build_legacy_atp_status_checks from .push_service import publish_topic +from .wine_probe import probe_wine_binary from .storage_driver import ( StorageDriver, StorageDriverError, @@ -164,6 +167,85 @@ def _resolve_binary(raw_path: str) -> str | None: return None +def _resolve_storage_root() -> Path: + root = Path(settings.atp_storage_root).expanduser() + return root.resolve(strict=False) + + +def _resolve_engine_workdir() -> Path: + configured = Path(settings.atp_engine_workdir).expanduser() + if configured.is_absolute(): + return configured.resolve(strict=False) + return (_resolve_storage_root() / configured).resolve(strict=False) + + +def _resolve_wine_engine_executable() -> tuple[str | None, str | None, str | None]: + wine_binary = _resolve_binary(settings.wine_binary_path) + if not wine_binary: + return None, None, "Wine binary not found" + + allowed_root = Path(settings.wine_allowed_root).expanduser().resolve(strict=False) + configured = Path(settings.atp_engine_executable).expanduser() + if not configured.is_absolute(): + configured = (allowed_root / configured).resolve(strict=False) + else: + configured = configured.resolve(strict=False) + + if not configured.is_relative_to(allowed_root): + return wine_binary, None, f"ATP engine executable must be inside {allowed_root}" + if not configured.exists() or not configured.is_file(): + return wine_binary, None, f"ATP engine executable not found: {configured}" + + probe = probe_wine_binary(wine_binary) + return wine_binary, str(configured), None if probe.available else (probe.error or "Wine binary unavailable") + + +def _resolve_native_engine_executable() -> tuple[str | None, str | None]: + resolved = _resolve_binary(settings.atp_engine_executable) + if not resolved: + return None, "ATP engine executable not found" + return resolved, None + + +def get_engine_status() -> AtpEngineStatusResponse: + mode = _resolve_engine_mode() + storage_root = str(_resolve_storage_root()) + workdir = str(_resolve_engine_workdir()) + checks = build_legacy_atp_status_checks() + + if mode == "wine": + wine_binary, resolved_engine, error = _resolve_wine_engine_executable() + available = error is None + executable_path = settings.atp_engine_executable.strip() + resolved_binary = f"{wine_binary} -> {resolved_engine}" if wine_binary and resolved_engine else wine_binary + return AtpEngineStatusResponse( + mode="wine", + available=available, + executable_path=executable_path, + resolved_executable=resolved_binary, + storage_root=storage_root, + workdir=workdir, + default_timeout_seconds=settings.atp_engine_default_timeout_seconds, + max_timeout_seconds=settings.atp_engine_max_timeout_seconds, + checks=checks, + error=error, + ) + + resolved_engine, error = _resolve_native_engine_executable() + return AtpEngineStatusResponse( + mode="native", + available=error is None, + executable_path=settings.atp_engine_executable.strip(), + resolved_executable=resolved_engine, + storage_root=storage_root, + workdir=workdir, + default_timeout_seconds=settings.atp_engine_default_timeout_seconds, + max_timeout_seconds=settings.atp_engine_max_timeout_seconds, + checks=checks, + error=error, + ) + + def _resolve_mount(db: Session, mount_code: str) -> FileStorageMount: mount = db.execute( select(FileStorageMount) diff --git a/api/app/services/atp_model_service.py b/api/app/services/atp_model_service.py deleted file mode 100644 index d9588c1..0000000 --- a/api/app/services/atp_model_service.py +++ /dev/null @@ -1,1107 +0,0 @@ -from __future__ import annotations - -import asyncio -import hashlib -import json -import os -from pathlib import Path -import re -import shutil -import subprocess -import time -from typing import Any - -from fastapi import HTTPException, status -from sqlalchemy import func, or_, select -from sqlalchemy.orm import Session - -from ..core.config import get_settings -from ..models.atp_model import AtpModel, AtpModelVersion, AtpSimulationRun -from ..models.base import utcnow -from ..schemas.atp_model import ( - AtpEngineStatusResponse, - AtpModelCreateRequest, - AtpModelListResponse, - AtpModelSummary, - AtpModelUpdateRequest, - AtpModelVersionCreateRequest, - AtpModelVersionDetail, - AtpModelVersionListResponse, - AtpModelVersionSummary, - AtpModelVersionUpdateRequest, - AtpSimulationRunDetail, - AtpSimulationRunListResponse, - AtpSimulationRunRequest, - AtpSimulationRunSummary, -) -from .push_service import publish_topic -from .legacy_atp_adapter import build_legacy_atp_status_checks -from .wine_probe import probe_wine_binary - - -settings = get_settings() -ATP_TOPIC = "admin.atp-models" -VALID_MODEL_STATUS = {"enabled", "disabled"} -VALID_VERSION_STATUS = {"draft", "released", "archived"} -LOG_MAX_CHARS = 200_000 -FILENAME_SANITIZE_PATTERN = re.compile(r"[^A-Za-z0-9._-]+") - - -def _fire_and_forget(coro: Any) -> None: - try: - loop = asyncio.get_running_loop() - except RuntimeError: - close = getattr(coro, "close", None) - if callable(close): - close() - return - loop.create_task(coro) - - -def _normalize_optional_str(value: str | None) -> str | None: - if value is None: - return None - normalized = value.strip() - return normalized or None - - -def _normalize_tags(values: list[str] | None) -> list[str]: - if not values: - return [] - dedup: dict[str, None] = {} - for candidate in values: - normalized = candidate.strip() - if not normalized: - continue - dedup[normalized] = None - return list(dedup.keys())[:128] - - -def _hash_text(value: str) -> str: - return hashlib.sha256(value.encode("utf-8", errors="ignore")).hexdigest() - - -def _truncate_output(value: str | None) -> str | None: - if value is None: - return None - if len(value) <= LOG_MAX_CHARS: - return value - return f"{value[:LOG_MAX_CHARS]}\n...[truncated]" - - -def _safe_entry_filename(raw_name: str | None, *, model_code: str, version_no: int) -> str: - fallback = f"{model_code}_v{version_no}.atp" - if not raw_name: - return fallback - - filename = Path(raw_name).name.strip() - if not filename: - return fallback - - cleaned = FILENAME_SANITIZE_PATTERN.sub("_", filename) - cleaned = cleaned.strip("._") - if not cleaned: - return fallback - - if len(cleaned) > 220: - stem, suffix = os.path.splitext(cleaned) - cleaned = f"{stem[:200]}{suffix[:20]}" - - return cleaned - - -def _resolve_timeout(payload_timeout: int | None) -> int: - timeout_seconds = payload_timeout or settings.atp_engine_default_timeout_seconds - if timeout_seconds > settings.atp_engine_max_timeout_seconds: - raise HTTPException( - status_code=status.HTTP_400_BAD_REQUEST, - detail=f"timeout_seconds cannot exceed {settings.atp_engine_max_timeout_seconds}", - ) - return timeout_seconds - - -def _resolve_engine_mode() -> str: - mode = settings.atp_engine_mode.strip().lower() - return "native" if mode == "native" else "wine" - - -def _resolve_storage_root() -> Path: - root = Path(settings.atp_storage_root).expanduser() - return root.resolve(strict=False) - - -def _resolve_engine_workdir() -> Path: - configured = Path(settings.atp_engine_workdir).expanduser() - if configured.is_absolute(): - return configured.resolve(strict=False) - return (_resolve_storage_root() / configured).resolve(strict=False) - - -def _resolve_binary(raw_path: str) -> str | None: - configured = raw_path.strip() - if not configured: - return None - - resolved = shutil.which(configured) - if resolved: - return resolved - - candidate = Path(configured).expanduser() - if candidate.exists() and candidate.is_file() and os.access(candidate, os.X_OK): - return str(candidate.resolve()) - return None - - -def _resolve_wine_engine_executable() -> tuple[str | None, str | None, str | None]: - wine_binary = _resolve_binary(settings.wine_binary_path) - if not wine_binary: - return None, None, "Wine binary not found" - - allowed_root = Path(settings.wine_allowed_root).expanduser().resolve(strict=False) - configured = Path(settings.atp_engine_executable).expanduser() - if not configured.is_absolute(): - configured = (allowed_root / configured).resolve(strict=False) - else: - configured = configured.resolve(strict=False) - - if not configured.is_relative_to(allowed_root): - return wine_binary, None, f"ATP engine executable must be inside {allowed_root}" - if not configured.exists() or not configured.is_file(): - return wine_binary, None, f"ATP engine executable not found: {configured}" - - probe = probe_wine_binary(wine_binary) - return wine_binary, str(configured), None if probe.available else (probe.error or "Wine binary unavailable") - - -def _resolve_native_engine_executable() -> tuple[str | None, str | None]: - resolved = _resolve_binary(settings.atp_engine_executable) - if not resolved: - return None, "ATP engine executable not found" - return resolved, None - - -def get_engine_status() -> AtpEngineStatusResponse: - mode = _resolve_engine_mode() - storage_root = str(_resolve_storage_root()) - workdir = str(_resolve_engine_workdir()) - checks = build_legacy_atp_status_checks() - - if mode == "wine": - wine_binary, resolved_engine, error = _resolve_wine_engine_executable() - available = error is None - executable_path = settings.atp_engine_executable.strip() - resolved_binary = f"{wine_binary} -> {resolved_engine}" if wine_binary and resolved_engine else wine_binary - return AtpEngineStatusResponse( - mode="wine", - available=available, - executable_path=executable_path, - resolved_executable=resolved_binary, - storage_root=storage_root, - workdir=workdir, - default_timeout_seconds=settings.atp_engine_default_timeout_seconds, - max_timeout_seconds=settings.atp_engine_max_timeout_seconds, - checks=checks, - error=error, - ) - - resolved_engine, error = _resolve_native_engine_executable() - return AtpEngineStatusResponse( - mode="native", - available=error is None, - executable_path=settings.atp_engine_executable.strip(), - resolved_executable=resolved_engine, - storage_root=storage_root, - workdir=workdir, - default_timeout_seconds=settings.atp_engine_default_timeout_seconds, - max_timeout_seconds=settings.atp_engine_max_timeout_seconds, - checks=checks, - error=error, - ) - - -def serialize_model( - item: AtpModel, - *, - version_count: int, - run_count: int, - last_run_status: str | None, - last_run_date, -) -> AtpModelSummary: - return AtpModelSummary( - id=item.id, - code=item.code, - name=item.name, - source_type=item.source_type, # type: ignore[arg-type] - description=item.description, - status=item.status, # type: ignore[arg-type] - tags_json=item.tags_json or [], - latest_version_no=item.latest_version_no, - active_version_no=item.active_version_no, - version_count=version_count, - run_count=run_count, - last_run_status=last_run_status, # type: ignore[arg-type] - last_run_date=last_run_date, - create_date=item.create_date, - create_user=item.create_user, - update_date=item.update_date, - update_user=item.update_user, - ) - - -def serialize_version(item: AtpModelVersion) -> AtpModelVersionSummary: - return AtpModelVersionSummary( - id=item.id, - model_id=item.model_id, - version_no=item.version_no, - version_tag=item.version_tag, - status=item.status, # type: ignore[arg-type] - entry_file=item.entry_file, - change_note=item.change_note, - artifact_manifest_json=item.artifact_manifest_json or {}, - content_hash=item.content_hash, - atp_text_size=len(item.atp_text or ""), - create_date=item.create_date, - create_user=item.create_user, - update_date=item.update_date, - update_user=item.update_user, - ) - - -def serialize_version_detail(item: AtpModelVersion) -> AtpModelVersionDetail: - summary = serialize_version(item) - return AtpModelVersionDetail( - **summary.model_dump(), - atp_text=item.atp_text or "", - graph_json=item.graph_json or {}, - ) - - -def serialize_run(item: AtpSimulationRun) -> AtpSimulationRunSummary: - version_no = item.version.version_no if item.version is not None else None - stdout_text = item.stdout_text or "" - stderr_text = item.stderr_text or "" - - return AtpSimulationRunSummary( - id=item.id, - model_id=item.model_id, - version_id=item.version_id, - version_no=version_no, - task_id=item.task_id, - status=item.status, # type: ignore[arg-type] - engine_mode=item.engine_mode, # type: ignore[arg-type] - engine_command=item.engine_command, - working_dir=item.working_dir, - timeout_seconds=item.timeout_seconds, - exit_code=item.exit_code, - started_at=item.started_at, - finished_at=item.finished_at, - duration_ms=item.duration_ms, - error_message=item.error_message, - stdout_size=len(stdout_text), - stderr_size=len(stderr_text), - create_date=item.create_date, - create_user=item.create_user, - ) - - -def serialize_run_detail(item: AtpSimulationRun) -> AtpSimulationRunDetail: - summary = serialize_run(item) - return AtpSimulationRunDetail( - **summary.model_dump(), - stdout_text=item.stdout_text, - stderr_text=item.stderr_text, - ) - - -def list_models( - db: Session, - *, - keyword: str | None, - status_filter: str | None, - limit: int = 100, - offset: int = 0, -) -> AtpModelListResponse: - stmt = select(AtpModel) - total_stmt = select(func.count()).select_from(AtpModel) - - normalized_keyword = (keyword or "").strip() - if normalized_keyword: - like = f"%{normalized_keyword}%" - predicate = or_(AtpModel.code.ilike(like), AtpModel.name.ilike(like), AtpModel.description.ilike(like)) - stmt = stmt.where(predicate) - total_stmt = total_stmt.where(predicate) - - normalized_status = (status_filter or "").strip().lower() - if normalized_status: - if normalized_status not in VALID_MODEL_STATUS: - raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid status filter: {status_filter}") - stmt = stmt.where(AtpModel.status == normalized_status) - total_stmt = total_stmt.where(AtpModel.status == normalized_status) - - total = int(db.scalar(total_stmt) or 0) - items = db.execute( - stmt.order_by(AtpModel.update_date.desc(), AtpModel.code.asc()) - .offset(offset) - .limit(limit) - ).scalars().all() - model_ids = [item.id for item in items] - - version_count_map = _load_model_version_count_map(db, model_ids) - run_count_map = _load_model_run_count_map(db, model_ids) - last_run_map = _load_model_last_run_map(db, model_ids) - - return AtpModelListResponse( - items=[ - serialize_model( - item, - version_count=version_count_map.get(item.id, 0), - run_count=run_count_map.get(item.id, 0), - last_run_status=last_run_map.get(item.id, (None, None))[0], - last_run_date=last_run_map.get(item.id, (None, None))[1], - ) - for item in items - ], - total=total, - ) - - -def get_model_by_id(db: Session, model_id: str) -> AtpModel | None: - return db.execute(select(AtpModel).where(AtpModel.id == model_id)).scalar_one_or_none() - - -def get_model_by_code(db: Session, code: str) -> AtpModel | None: - normalized = code.strip().lower() - if not normalized: - return None - return db.execute(select(AtpModel).where(func.lower(AtpModel.code) == normalized)).scalar_one_or_none() - - -def create_model( - db: Session, - payload: AtpModelCreateRequest, - *, - actor_user_id: str, -) -> AtpModelSummary | None: - if get_model_by_code(db, payload.code): - return None - - now = utcnow() - item = AtpModel( - code=payload.code.strip(), - name=payload.name.strip(), - source_type=payload.source_type, - description=payload.description.strip(), - status=payload.status, - tags_json=_normalize_tags(payload.tags_json), - latest_version_no=0, - active_version_no=None, - create_user=actor_user_id, - update_user=actor_user_id, - create_date=now, - update_date=now, - ) - db.add(item) - db.commit() - - saved = get_model_by_id(db, item.id) - if not saved: - return None - - _publish_change("model.created", {"action": "created", "model_id": saved.id}) - return serialize_model(saved, version_count=0, run_count=0, last_run_status=None, last_run_date=None) - - -def update_model( - db: Session, - model_id: str, - payload: AtpModelUpdateRequest, - *, - actor_user_id: str, -) -> AtpModelSummary | None: - item = get_model_by_id(db, model_id) - if not item: - return None - - update_data = payload.model_dump(exclude_unset=True) - - if "name" in update_data and update_data["name"] is not None: - item.name = str(update_data["name"]).strip() - if "source_type" in update_data and update_data["source_type"] is not None: - item.source_type = str(update_data["source_type"]) - if "description" in update_data and update_data["description"] is not None: - item.description = str(update_data["description"]).strip() - if "status" in update_data and update_data["status"] is not None: - item.status = str(update_data["status"]) - if "tags_json" in update_data: - item.tags_json = _normalize_tags(update_data["tags_json"]) - - item.update_user = actor_user_id - item.update_date = utcnow() - db.commit() - - saved = get_model_by_id(db, model_id) - if not saved: - return None - - version_count = _load_model_version_count_map(db, [saved.id]).get(saved.id, 0) - run_count = _load_model_run_count_map(db, [saved.id]).get(saved.id, 0) - last_run_status, last_run_date = _load_model_last_run_map(db, [saved.id]).get(saved.id, (None, None)) - _publish_change("model.updated", {"action": "updated", "model_id": saved.id}) - return serialize_model( - saved, - version_count=version_count, - run_count=run_count, - last_run_status=last_run_status, - last_run_date=last_run_date, - ) - - -def delete_model(db: Session, model_id: str) -> bool: - item = get_model_by_id(db, model_id) - if not item: - return False - - db.delete(item) - db.commit() - _publish_change("model.deleted", {"action": "deleted", "model_id": model_id}) - return True - - -def list_model_versions( - db: Session, - *, - model_id: str, - limit: int, - offset: int, -) -> AtpModelVersionListResponse: - total = int( - db.scalar( - select(func.count()) - .select_from(AtpModelVersion) - .where(AtpModelVersion.model_id == model_id) - ) - or 0 - ) - items = db.execute( - select(AtpModelVersion) - .where(AtpModelVersion.model_id == model_id) - .order_by(AtpModelVersion.version_no.desc(), AtpModelVersion.id.desc()) - .offset(offset) - .limit(limit) - ).scalars().all() - - return AtpModelVersionListResponse(items=[serialize_version(item) for item in items], total=total) - - -def get_model_version_by_id(db: Session, *, model_id: str, version_id: str) -> AtpModelVersion | None: - return db.execute( - select(AtpModelVersion).where( - AtpModelVersion.model_id == model_id, - AtpModelVersion.id == version_id, - ) - ).scalar_one_or_none() - - -def create_model_version( - db: Session, - *, - model_id: str, - payload: AtpModelVersionCreateRequest, - actor_user_id: str, -) -> AtpModelVersionDetail: - model = get_model_by_id(db, model_id) - if not model: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Model not found") - - max_version_no = int( - db.scalar( - select(func.max(AtpModelVersion.version_no)).where(AtpModelVersion.model_id == model_id) - ) - or 0 - ) - next_version_no = max_version_no + 1 - now = utcnow() - content = payload.atp_text - - item = AtpModelVersion( - model_id=model_id, - version_no=next_version_no, - version_tag=_normalize_optional_str(payload.version_tag), - status=payload.status, - entry_file=_normalize_optional_str(payload.entry_file), - change_note=payload.change_note.strip(), - artifact_manifest_json=payload.artifact_manifest_json, - graph_json=payload.graph_json, - atp_text=content, - content_hash=_hash_text(content), - create_user=actor_user_id, - update_user=actor_user_id, - create_date=now, - update_date=now, - ) - db.add(item) - - model.latest_version_no = max(model.latest_version_no, next_version_no) - if model.active_version_no is None and payload.status != "archived": - model.active_version_no = next_version_no - model.update_user = actor_user_id - model.update_date = now - - db.commit() - - saved = get_model_version_by_id(db, model_id=model_id, version_id=item.id) - if not saved: - raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Version save failed") - - _publish_change( - "version.created", - { - "action": "version_created", - "model_id": model_id, - "version_id": saved.id, - "version_no": saved.version_no, - }, - ) - return serialize_version_detail(saved) - - -def update_model_version( - db: Session, - *, - model_id: str, - version_id: str, - payload: AtpModelVersionUpdateRequest, - actor_user_id: str, -) -> AtpModelVersionDetail: - item = get_model_version_by_id(db, model_id=model_id, version_id=version_id) - if not item: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Version not found") - - model = get_model_by_id(db, model_id) - if not model: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Model not found") - - update_data = payload.model_dump(exclude_unset=True) - - if "version_tag" in update_data: - item.version_tag = _normalize_optional_str(update_data["version_tag"]) - if "status" in update_data and update_data["status"] is not None: - item.status = str(update_data["status"]) - if "entry_file" in update_data: - item.entry_file = _normalize_optional_str(update_data["entry_file"]) - if "change_note" in update_data and update_data["change_note"] is not None: - item.change_note = str(update_data["change_note"]).strip() - if "artifact_manifest_json" in update_data and update_data["artifact_manifest_json"] is not None: - item.artifact_manifest_json = dict(update_data["artifact_manifest_json"]) - if "graph_json" in update_data and update_data["graph_json"] is not None: - item.graph_json = dict(update_data["graph_json"]) - if "atp_text" in update_data and update_data["atp_text"] is not None: - content = str(update_data["atp_text"]) - item.atp_text = content - item.content_hash = _hash_text(content) - - now = utcnow() - item.update_user = actor_user_id - item.update_date = now - - if item.status == "archived" and model.active_version_no == item.version_no: - model.active_version_no = None - model.latest_version_no = max(model.latest_version_no, item.version_no) - model.update_user = actor_user_id - model.update_date = now - - db.commit() - - saved = get_model_version_by_id(db, model_id=model_id, version_id=version_id) - if not saved: - raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Version load failed") - - _publish_change( - "version.updated", - { - "action": "version_updated", - "model_id": model_id, - "version_id": saved.id, - "version_no": saved.version_no, - }, - ) - return serialize_version_detail(saved) - - -def activate_model_version( - db: Session, - *, - model_id: str, - version_id: str, - actor_user_id: str, -) -> AtpModelSummary: - model = get_model_by_id(db, model_id) - if not model: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Model not found") - - version = get_model_version_by_id(db, model_id=model_id, version_id=version_id) - if not version: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Version not found") - if version.status == "archived": - raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Archived version cannot be activated") - - model.active_version_no = version.version_no - model.latest_version_no = max(model.latest_version_no, version.version_no) - model.update_user = actor_user_id - model.update_date = utcnow() - - db.commit() - - version_count = _load_model_version_count_map(db, [model.id]).get(model.id, 0) - run_count = _load_model_run_count_map(db, [model.id]).get(model.id, 0) - last_run_status, last_run_date = _load_model_last_run_map(db, [model.id]).get(model.id, (None, None)) - - _publish_change( - "version.activated", - { - "action": "version_activated", - "model_id": model.id, - "version_id": version.id, - "version_no": version.version_no, - }, - ) - return serialize_model( - model, - version_count=version_count, - run_count=run_count, - last_run_status=last_run_status, - last_run_date=last_run_date, - ) - - -def list_model_runs( - db: Session, - *, - model_id: str, - limit: int, - offset: int, -) -> AtpSimulationRunListResponse: - total = int( - db.scalar( - select(func.count()) - .select_from(AtpSimulationRun) - .where(AtpSimulationRun.model_id == model_id) - ) - or 0 - ) - - runs = db.execute( - select(AtpSimulationRun) - .where(AtpSimulationRun.model_id == model_id) - .order_by(AtpSimulationRun.create_date.desc(), AtpSimulationRun.id.desc()) - .offset(offset) - .limit(limit) - ).scalars().all() - - return AtpSimulationRunListResponse(items=[serialize_run(item) for item in runs], total=total) - - -def get_model_run_by_id(db: Session, *, model_id: str, run_id: str) -> AtpSimulationRun | None: - return db.execute( - select(AtpSimulationRun).where( - AtpSimulationRun.model_id == model_id, - AtpSimulationRun.id == run_id, - ) - ).scalar_one_or_none() - - -def get_model_run_detail(db: Session, *, model_id: str, run_id: str) -> AtpSimulationRunDetail: - run = get_model_run_by_id(db, model_id=model_id, run_id=run_id) - if not run: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Run not found") - return serialize_run_detail(run) - - -def run_model_version( - db: Session, - *, - model_id: str, - payload: AtpSimulationRunRequest, - actor_user_id: str, -) -> AtpSimulationRunDetail: - model = get_model_by_id(db, model_id) - if not model: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Model not found") - - version = _resolve_target_version(db, model=model, payload=payload) - timeout_seconds = _resolve_timeout(payload.timeout_seconds) - - run = AtpSimulationRun( - model_id=model.id, - version_id=version.id, - status="pending", - engine_mode=_resolve_engine_mode(), - timeout_seconds=timeout_seconds, - create_user=actor_user_id, - update_user=actor_user_id, - ) - db.add(run) - db.flush() - run.update_date = utcnow() - db.commit() - - if payload.dry_run: - execute_model_run_job( - run_id=run.id, - payload_data=payload.model_dump(), - actor_user_id=actor_user_id, - ) - db.expire_all() - else: - try: - task = _dispatch_atp_model_run_task( - run_id=run.id, - payload_data=payload.model_dump(), - actor_user_id=actor_user_id, - ) - except Exception as exc: - _mark_run_failed( - db, - run=run, - actor_user_id=actor_user_id, - reason=f"Celery dispatch failed: {exc}", - ) - saved = get_model_run_by_id(db, model_id=model.id, run_id=run.id) - if not saved: - raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Run save failed") from exc - return serialize_run_detail(saved) - run.task_id = str(task.id) - run.update_date = utcnow() - run.update_user = actor_user_id - db.commit() - - _publish_change( - "run.queued", - { - "action": "run_queued", - "model_id": model.id, - "version_id": version.id, - "run_id": run.id, - "task_id": run.task_id, - }, - ) - - saved = get_model_run_by_id(db, model_id=model.id, run_id=run.id) - if not saved: - raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Run save failed") - return serialize_run_detail(saved) - - -def execute_model_run_job( - *, - run_id: str, - payload_data: dict[str, Any], - actor_user_id: str | None, -) -> None: - from ..core.database import SessionLocal - - db = SessionLocal() - try: - run = db.execute(select(AtpSimulationRun).where(AtpSimulationRun.id == run_id)).scalar_one_or_none() - if run is None or run.status in {"success", "failed"}: - return - - model = get_model_by_id(db, run.model_id) - if model is None: - return - - if run.version_id is None: - _mark_run_failed( - db, - run=run, - actor_user_id=actor_user_id, - reason="Version not found", - ) - return - - version = get_model_version_by_id(db, model_id=model.id, version_id=run.version_id) - if version is None: - _mark_run_failed( - db, - run=run, - actor_user_id=actor_user_id, - reason="Version not found", - ) - return - - payload = AtpSimulationRunRequest.model_validate(payload_data) - run.started_at = utcnow() - run.status = "running" - run.error_message = None - run.finished_at = None - run.duration_ms = None - run.update_date = utcnow() - run.update_user = actor_user_id - - command, working_dir, error = _build_run_command(model=model, version=version, run=run, payload=payload) - run.engine_command = " ".join(command) if command else None - run.working_dir = str(working_dir) if working_dir else None - - if error: - _mark_run_failed(db, run=run, actor_user_id=actor_user_id, reason=error) - return - - if payload.dry_run: - run.status = "success" - run.exit_code = 0 - run.stdout_text = json.dumps( - { - "dry_run": True, - "command": command, - "working_dir": str(working_dir), - "timeout_seconds": run.timeout_seconds, - }, - ensure_ascii=False, - indent=2, - ) - run.stderr_text = "" - run.finished_at = utcnow() - run.duration_ms = 0 - run.update_user = actor_user_id - run.update_date = utcnow() - db.commit() - _publish_change( - "run.finished", - { - "action": "run_dry_finished", - "model_id": model.id, - "version_id": version.id, - "run_id": run.id, - "status": run.status, - }, - ) - return - - db.commit() - - env = os.environ.copy() - env.update(payload.environment) - started_perf = time.perf_counter() - try: - result = subprocess.run( - command, - cwd=str(working_dir), - env=env, - capture_output=True, - text=True, - encoding="utf-8", - errors="replace", - timeout=run.timeout_seconds, - check=False, - ) - run.exit_code = result.returncode - run.stdout_text = _truncate_output(result.stdout) - run.stderr_text = _truncate_output(result.stderr) - if result.returncode == 0: - run.status = "success" - run.error_message = None - else: - run.status = "failed" - run.error_message = f"ATP engine exited with code {result.returncode}" - except subprocess.TimeoutExpired as exc: - run.status = "failed" - run.exit_code = None - run.stdout_text = _truncate_output((exc.stdout or "") if isinstance(exc.stdout, str) else "") - run.stderr_text = _truncate_output((exc.stderr or "") if isinstance(exc.stderr, str) else "") - run.error_message = f"Execution timed out after {run.timeout_seconds} seconds" - except OSError as exc: - run.status = "failed" - run.exit_code = None - run.stdout_text = None - run.stderr_text = None - run.error_message = str(exc) - - run.duration_ms = max(int((time.perf_counter() - started_perf) * 1000), 0) - run.finished_at = utcnow() - run.update_user = actor_user_id - run.update_date = utcnow() - db.commit() - _publish_change( - "run.finished", - { - "action": "run_finished", - "model_id": model.id, - "version_id": version.id, - "run_id": run.id, - "status": run.status, - "exit_code": run.exit_code, - }, - ) - finally: - db.close() - - -def _dispatch_atp_model_run_task(*, run_id: str, payload_data: dict[str, Any], actor_user_id: str | None): - from ..tasks.atp_model_tasks import execute_atp_model_run_job - - return execute_atp_model_run_job.delay(run_id, payload_data, actor_user_id) - - -def _mark_run_failed( - db: Session, - *, - run: AtpSimulationRun, - actor_user_id: str | None, - reason: str, -) -> None: - run.status = "failed" - run.error_message = reason - run.finished_at = utcnow() - run.duration_ms = 0 if run.duration_ms is None else run.duration_ms - run.update_user = actor_user_id - run.update_date = utcnow() - db.commit() - _publish_change( - "run.failed", - { - "action": "run_failed", - "model_id": run.model_id, - "version_id": run.version_id, - "run_id": run.id, - "reason": reason, - }, - ) - - -def _resolve_target_version(db: Session, *, model: AtpModel, payload: AtpSimulationRunRequest) -> AtpModelVersion: - if payload.version_id: - matched = get_model_version_by_id(db, model_id=model.id, version_id=payload.version_id) - if not matched: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Version not found") - return matched - - if payload.version_no is not None: - matched = db.execute( - select(AtpModelVersion).where( - AtpModelVersion.model_id == model.id, - AtpModelVersion.version_no == payload.version_no, - ) - ).scalar_one_or_none() - if not matched: - raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Version not found") - return matched - - if model.active_version_no is not None: - matched = db.execute( - select(AtpModelVersion).where( - AtpModelVersion.model_id == model.id, - AtpModelVersion.version_no == model.active_version_no, - ) - ).scalar_one_or_none() - if matched is not None: - return matched - - matched = db.execute( - select(AtpModelVersion) - .where(AtpModelVersion.model_id == model.id) - .order_by(AtpModelVersion.version_no.desc(), AtpModelVersion.id.desc()) - .limit(1) - ).scalar_one_or_none() - if not matched: - raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="No version available for simulation") - return matched - - -def _build_run_command( - *, - model: AtpModel, - version: AtpModelVersion, - run: AtpSimulationRun, - payload: AtpSimulationRunRequest, -) -> tuple[list[str] | None, Path | None, str | None]: - storage_root = _resolve_storage_root() - workdir_base = _resolve_engine_workdir() - - try: - storage_root.mkdir(parents=True, exist_ok=True) - workdir_base.mkdir(parents=True, exist_ok=True) - except OSError as exc: - return None, None, f"Failed to prepare ATP storage directory: {exc}" - - run_dir = workdir_base / model.code / f"v{version.version_no}" / run.id - try: - run_dir.mkdir(parents=True, exist_ok=True) - except OSError as exc: - return None, None, f"Failed to prepare run directory: {exc}" - - entry_filename = _safe_entry_filename(version.entry_file, model_code=model.code, version_no=version.version_no) - input_path = run_dir / entry_filename - - try: - input_path.write_text(version.atp_text or "", encoding="utf-8") - except OSError as exc: - return None, run_dir, f"Failed to write ATP input file: {exc}" - - mode = _resolve_engine_mode() - extra_args = [arg for arg in payload.extra_args if arg] - - if mode == "wine": - wine_binary, resolved_engine, error = _resolve_wine_engine_executable() - if error or not wine_binary or not resolved_engine: - return None, run_dir, error or "Wine ATP engine unavailable" - command = [wine_binary, resolved_engine, str(input_path), *extra_args] - return command, run_dir, None - - resolved_engine, error = _resolve_native_engine_executable() - if error or not resolved_engine: - return None, run_dir, error or "Native ATP engine unavailable" - command = [resolved_engine, str(input_path), *extra_args] - return command, run_dir, None - - -def _load_model_version_count_map(db: Session, model_ids: list[str]) -> dict[str, int]: - if not model_ids: - return {} - rows = db.execute( - select(AtpModelVersion.model_id, func.count()) - .where(AtpModelVersion.model_id.in_(model_ids)) - .group_by(AtpModelVersion.model_id) - ).all() - return {str(model_id): int(count) for model_id, count in rows} - - -def _load_model_run_count_map(db: Session, model_ids: list[str]) -> dict[str, int]: - if not model_ids: - return {} - rows = db.execute( - select(AtpSimulationRun.model_id, func.count()) - .where(AtpSimulationRun.model_id.in_(model_ids)) - .group_by(AtpSimulationRun.model_id) - ).all() - return {str(model_id): int(count) for model_id, count in rows} - - -def _load_model_last_run_map(db: Session, model_ids: list[str]) -> dict[str, tuple[str | None, Any]]: - if not model_ids: - return {} - - rows = db.execute( - select(AtpSimulationRun) - .where(AtpSimulationRun.model_id.in_(model_ids)) - .order_by(AtpSimulationRun.model_id.asc(), AtpSimulationRun.create_date.desc(), AtpSimulationRun.id.desc()) - ).scalars().all() - - result: dict[str, tuple[str | None, Any]] = {} - for row in rows: - if row.model_id in result: - continue - result[row.model_id] = (row.status, row.create_date) - return result - - -def _publish_change(event_name: str, payload: dict[str, Any]) -> None: - _fire_and_forget( - publish_topic( - ATP_TOPIC, - name=event_name, - payload=payload, - requires_refetch=[], - dedupe_key=f"atp:{event_name}:{payload.get('model_id', '-')}:" - f"{payload.get('version_id', payload.get('run_id', '-'))}", - ) - ) diff --git a/api/app/services/fl_analysis_service.py b/api/app/services/fl_analysis_service.py index 882a0f6..82c9191 100644 --- a/api/app/services/fl_analysis_service.py +++ b/api/app/services/fl_analysis_service.py @@ -31,7 +31,7 @@ from ..schemas.fl_analysis import ( FlAnalysisTowerResultListResponse, FlAnalysisTowerResultSummary, ) -from .atp_model_service import _truncate_output +from .atp_asset_service import _truncate_output from .fl_analysis_external import execute_external_waveform_tower_analysis, resolve_external_waveform_job from .legacy_atp_adapter import execute_legacy_atp_tower_analysis, resolve_legacy_atp_job from .fl_analysis_report import build_report_document, build_report_summary_payload diff --git a/api/app/tasks/atp_model_tasks.py b/api/app/tasks/atp_model_tasks.py deleted file mode 100644 index 2daf2a1..0000000 --- a/api/app/tasks/atp_model_tasks.py +++ /dev/null @@ -1,20 +0,0 @@ -from __future__ import annotations - -from typing import Any - -from ..core.celery_app import celery_app -from ..services.atp_model_service import execute_model_run_job - - -@celery_app.task(name="app.tasks.atp_model_tasks.execute_atp_model_run_job") -def execute_atp_model_run_job( - run_id: str, - payload_data: dict[str, Any], - actor_user_id: str | None, -) -> dict[str, str]: - execute_model_run_job( - run_id=run_id, - payload_data=payload_data, - actor_user_id=actor_user_id, - ) - return {"run_id": run_id, "status": "done"} diff --git a/api/tests/test_async_dispatch_services.py b/api/tests/test_async_dispatch_services.py deleted file mode 100644 index e5381ae..0000000 --- a/api/tests/test_async_dispatch_services.py +++ /dev/null @@ -1,437 +0,0 @@ -from __future__ import annotations - -import io -from pathlib import Path -from types import SimpleNamespace - -from sqlalchemy import create_engine, select -from sqlalchemy.orm import Session, sessionmaker - -from app.core.database import Base -from app.models.atp_model import AtpModel, AtpModelVersion, AtpSimulationRun -from app.models.elevation import ElevationDataImportJob, ElevationDataset, ElevationFileRecord -from app.models.user import User -from app.models.wine import WineRun -from app.schemas.atp_model import AtpSimulationRunRequest -from app.schemas.wine import WineRunRequest -from app.services import atp_model_service, elevation_service, wine_service - - -class _MemoryStorageDriver: - def __init__(self) -> None: - self.directories: set[str] = set() - self.files: dict[str, bytes] = {} - - def ensure_directory(self, path: str) -> None: - self.directories.add(path.rstrip("/") or "/") - - def write_file(self, path: str, *, content: bytes, content_type: str | None = None) -> SimpleNamespace: - self.files[path] = content - parent_path = path.rsplit("/", 1)[0] or "/" - return SimpleNamespace( - path=path, - parent_path=parent_path, - name=Path(path).name, - is_dir=False, - size=len(content), - modified_at=None, - mime_type=content_type, - ) - - def list_dir(self, path: str) -> list[SimpleNamespace]: - prefix = f"{path.rstrip('/')}/" - entries: list[SimpleNamespace] = [] - for file_path in sorted(self.files): - if not file_path.startswith(prefix): - continue - suffix = file_path[len(prefix):] - if "/" in suffix: - continue - entries.append( - SimpleNamespace( - path=file_path, - parent_path=path, - name=Path(file_path).name, - is_dir=False, - size=len(self.files[file_path]), - modified_at=None, - mime_type=None, - ) - ) - return entries - - def read_file(self, path: str) -> SimpleNamespace: - return SimpleNamespace( - path=path, - name=Path(path).name, - content=self.files[path], - mime_type=None, - ) - - def delete_path(self, path: str, *, is_dir: bool, recursive: bool) -> None: - normalized = path.rstrip("/") - if is_dir: - prefix = f"{normalized}/" - for file_path in list(self.files): - if file_path.startswith(prefix): - del self.files[file_path] - self.directories = {item for item in self.directories if item != normalized and not item.startswith(prefix)} - return - self.files.pop(path, None) - - -def _build_upload(filename: str, content: bytes, content_type: str = "application/octet-stream") -> SimpleNamespace: - return SimpleNamespace(filename=filename, file=io.BytesIO(content), content_type=content_type) - - -def _build_sessionmaker(*tables): - engine = create_engine("sqlite+pysqlite:///:memory:") - Base.metadata.create_all(bind=engine, tables=list(tables)) - return sessionmaker(bind=engine, autocommit=False, autoflush=False, expire_on_commit=False) - - -def test_run_model_version_queues_celery_task(monkeypatch) -> None: - testing_session = _build_sessionmaker( - AtpModel.__table__, - AtpModelVersion.__table__, - AtpSimulationRun.__table__, - ) - session: Session = testing_session() - try: - model = AtpModel( - code="ATP-ASYNC-001", - name="异步仿真模型", - source_type="atp", - status="enabled", - latest_version_no=1, - active_version_no=1, - ) - session.add(model) - session.flush() - - version = AtpModelVersion( - model_id=model.id, - version_no=1, - status="released", - atp_text="sample", - content_hash="hash-v1", - ) - session.add(version) - session.commit() - - monkeypatch.setattr( - atp_model_service, - "_dispatch_atp_model_run_task", - lambda **_: SimpleNamespace(id="celery-atp-1"), - ) - - result = atp_model_service.run_model_version( - session, - model_id=model.id, - payload=AtpSimulationRunRequest(version_id=version.id), - actor_user_id="tester", - ) - - assert result.status == "pending" - assert result.task_id == "celery-atp-1" - - saved = session.execute(select(AtpSimulationRun).where(AtpSimulationRun.id == result.id)).scalar_one() - assert saved.task_id == "celery-atp-1" - assert saved.status == "pending" - assert saved.started_at is None - finally: - session.close() - - -def test_queue_dataset_analysis_reuses_existing_running_task(monkeypatch) -> None: - testing_session = _build_sessionmaker(ElevationDataset.__table__) - session: Session = testing_session() - try: - dataset = ElevationDataset( - code="ELEV-001", - name="样例高程集", - file_format="csv", - mount_code="default", - dataset_dir="/elevation/datasets/ELEV-001", - file_path="/elevation/datasets/ELEV-001/data.csv", - status="active", - usage_status="idle", - ) - session.add(dataset) - session.commit() - - actor = User( - id="actor-1", - email="actor@example.com", - username="actor", - password_hash="hashed", - status="active", - ) - - monkeypatch.setattr( - elevation_service, - "_dispatch_elevation_dataset_analysis_task", - lambda **_: SimpleNamespace(id="elev-task-1"), - ) - - first = elevation_service.queue_dataset_analysis(session, dataset_id=dataset.id, actor=actor) - assert first.queued is True - assert first.task_id == "elev-task-1" - assert first.dataset.analysis_status == "queued" - assert first.dataset.terrain_status == "not_supported" - - second = elevation_service.queue_dataset_analysis(session, dataset_id=dataset.id, actor=actor) - assert second.queued is False - assert second.task_id == "elev-task-1" - assert second.detail == "分析任务已存在,无需重复提交。" - finally: - session.close() - - -def test_queue_dataset_terrain_build_reuses_existing_running_task(monkeypatch) -> None: - testing_session = _build_sessionmaker(ElevationDataset.__table__) - session: Session = testing_session() - try: - dataset = ElevationDataset( - code="ELEV-TERRAIN-001", - name="样例地形集", - file_format="tif", - mount_code="default", - dataset_dir="/elevation/datasets/ELEV-TERRAIN-001", - file_path="/elevation/datasets/ELEV-TERRAIN-001/data.tif", - status="active", - usage_status="idle", - terrain_status="pending", - ) - session.add(dataset) - session.commit() - - actor = User( - id="actor-1", - email="actor@example.com", - username="actor", - password_hash="hashed", - status="active", - ) - - monkeypatch.setattr( - elevation_service, - "_dispatch_elevation_dataset_terrain_task", - lambda **_: SimpleNamespace(id="terrain-task-1"), - ) - - first = elevation_service.queue_dataset_terrain_build(session, dataset_id=dataset.id, actor=actor) - assert first.queued is True - assert first.task_id == "terrain-task-1" - assert first.dataset.terrain_status == "pending" - - second = elevation_service.queue_dataset_terrain_build(session, dataset_id=dataset.id, actor=actor) - assert second.queued is False - assert second.task_id == "terrain-task-1" - assert second.detail == "地形瓦片任务已存在,无需重复提交。" - finally: - session.close() - - -def test_file_record_terrain_layer_and_tile_read_from_record_storage(monkeypatch) -> None: - testing_session = _build_sessionmaker(ElevationFileRecord.__table__) - session: Session = testing_session() - try: - record = ElevationFileRecord( - id="abcdef1234567890abcdef1234567890", - file_name="terrain.tif", - file_path="/elevation/records/ab/cd/terrain.tif", - file_format="tif", - file_size=128, - mount_code="default", - status="active", - terrain_status="ready", - terrain_root_path="/elevation/terrain/records/ab/cd/abcdef1234567890abcdef1234567890", - terrain_url_template="/api/v1/elevation/records/abcdef1234567890abcdef1234567890/terrain/{z}/{x}/{y}.terrain?v=1.0.0", - terrain_min_zoom=0, - terrain_max_zoom=0, - ) - session.add(record) - session.commit() - - driver = _MemoryStorageDriver() - layer_payload = b'{"tilejson":"2.1.0","format":"heightmap-1.0","version":"1.0.0","scheme":"tms","projection":"EPSG:4326","tiles":["{z}/{x}/{y}.terrain?v=1.0.0"],"minzoom":0,"maxzoom":0}' - driver.write_file( - "/elevation/terrain/records/ab/cd/abcdef1234567890abcdef1234567890/layer.json", - content=layer_payload, - content_type="application/json", - ) - driver.write_file( - "/elevation/terrain/records/ab/cd/abcdef1234567890abcdef1234567890/0/0/0.terrain", - content=b"tile-bytes", - content_type="application/octet-stream", - ) - - monkeypatch.setattr(elevation_service, "_require_mount", lambda *_args, **_kwargs: SimpleNamespace(code="default")) - monkeypatch.setattr(elevation_service, "_build_driver_or_400", lambda *_args, **_kwargs: driver) - - layer = elevation_service.get_file_record_terrain_layer(session, record_id=record.id) - tile = elevation_service.get_file_record_terrain_tile(session, record_id=record.id, z=0, x=0, y=0) - - assert layer.maxzoom == 0 - assert layer.tiles == ["{z}/{x}/{y}.terrain?v=1.0.0"] - assert tile == b"tile-bytes" - finally: - session.close() - - -def test_import_dataset_data_files_queue_job_and_worker_keeps_preferred_raster(monkeypatch) -> None: - testing_session = _build_sessionmaker(ElevationDataset.__table__, ElevationDataImportJob.__table__) - session: Session = testing_session() - try: - dataset = ElevationDataset( - code="ELEV-IMPORT-001", - name="批量导入样例", - file_format="csv", - mount_code="default", - dataset_dir="/elevation/datasets/ELEV-IMPORT-001", - file_path="/elevation/datasets/ELEV-IMPORT-001/dataset.csv", - status="active", - usage_status="idle", - sample_count=128, - bbox_min_lon=100.0, - bbox_max_lon=120.0, - bbox_min_lat=20.0, - bbox_max_lat=30.0, - analysis_task_id="old-task", - analysis_status="success", - terrain_status="not_supported", - ) - session.add(dataset) - session.commit() - - actor = User( - id="actor-1", - email="actor@example.com", - username="actor", - password_hash="hashed", - status="active", - ) - driver = _MemoryStorageDriver() - import_calls: list[tuple[str, str | None]] = [] - analysis_calls: list[tuple[str, str | None]] = [] - - monkeypatch.setattr(elevation_service, "_require_mount", lambda *_args, **_kwargs: SimpleNamespace(code="default")) - monkeypatch.setattr(elevation_service, "_build_driver_or_400", lambda *_args, **_kwargs: driver) - monkeypatch.setattr( - elevation_service, - "_dispatch_elevation_dataset_data_import_task", - lambda *, import_job_id, actor_user_id: import_calls.append((import_job_id, actor_user_id)) or SimpleNamespace(id="import-task-1"), - ) - monkeypatch.setattr( - elevation_service, - "_dispatch_elevation_dataset_analysis_task", - lambda *, dataset_id, actor_user_id: analysis_calls.append((dataset_id, actor_user_id)) or SimpleNamespace(id="new-task"), - ) - monkeypatch.setattr(elevation_service, "_publish_elevation_change", lambda *_args, **_kwargs: None) - - first = elevation_service.import_dataset_data_files( - session, - dataset_id=dataset.id, - files=[_build_upload("terrain.img", b"img-bytes", "application/octet-stream")], - actor=actor, - trigger_analysis=True, - ) - assert first.queued is True - assert first.job.task_id == "import-task-1" - assert first.job.status == "pending" - assert first.job.uploaded_file_count == 1 - assert first.job.analysis_task_queued is False - assert import_calls == [(first.job.id, actor.id)] - saved_pending_job = session.get(ElevationDataImportJob, first.job.id) - assert saved_pending_job is not None - assert saved_pending_job.staged_files_json[0]["filename"] == "terrain.img" - assert "content_base64" in saved_pending_job.staged_files_json[0] - - second = elevation_service.import_dataset_data_files( - session, - dataset_id=dataset.id, - files=[_build_upload("points.csv", b"lon,lat,elevation\n1,2,3\n", "text/csv")], - actor=actor, - trigger_analysis=True, - ) - assert second.queued is False - assert second.job.id == first.job.id - assert second.detail == "导入任务已存在,无需重复提交。" - - monkeypatch.setattr(elevation_service, "SessionLocal", testing_session) - elevation_service.execute_dataset_data_import_job(import_job_id=first.job.id, actor_user_id=actor.id) - - verification = testing_session() - try: - saved_dataset = verification.get(ElevationDataset, dataset.id) - saved_job = verification.get(ElevationDataImportJob, first.job.id) - assert saved_dataset is not None - assert saved_job is not None - assert saved_job.status == "success" - assert saved_job.progress_percent == 100 - assert saved_job.analysis_task_queued is True - assert saved_job.analysis_task_id == "new-task" - assert saved_job.imported_file_count == 1 - assert saved_dataset.file_path.endswith("/terrain.img") - assert saved_dataset.file_format == "img" - assert saved_dataset.analysis_status == "queued" - assert saved_dataset.analysis_task_id == "new-task" - assert saved_dataset.sample_count == 0 - assert saved_dataset.bbox_min_lon is None - assert saved_dataset.terrain_status == "pending" - assert analysis_calls == [(dataset.id, actor.id)] - assert set(driver.files) == { - "/elevation/datasets/ELEV-IMPORT-001/terrain.img", - } - finally: - verification.close() - finally: - session.close() - - -def test_wine_create_run_queues_task_and_worker_records_failure(monkeypatch) -> None: - testing_session = _build_sessionmaker(WineRun.__table__) - session: Session = testing_session() - try: - monkeypatch.setattr(wine_service, "_resolve_binary", lambda: "/usr/bin/wine") - monkeypatch.setattr( - wine_service, - "probe_wine_binary", - lambda *_args, **_kwargs: SimpleNamespace(available=True, error=None, version="wine-10.0"), - ) - monkeypatch.setattr(wine_service, "_resolve_executable", lambda _path: Path("/tmp/demo.exe")) - monkeypatch.setattr(wine_service, "_resolve_working_dir", lambda _path, _exe: Path("/tmp")) - monkeypatch.setattr( - wine_service, - "_dispatch_wine_run_task", - lambda **_: SimpleNamespace(id="wine-task-1"), - ) - - created = wine_service.create_run( - session, - payload=WineRunRequest(exe_path="demo.exe", arguments=["/silent"]), - actor_user_id="tester", - ) - - assert created.status == "pending" - assert created.task_id == "wine-task-1" - - monkeypatch.setattr(wine_service, "SessionLocal", testing_session) - monkeypatch.setattr( - wine_service.subprocess, - "run", - lambda *args, **kwargs: SimpleNamespace(returncode=9, stdout="stdout", stderr="stderr"), - ) - - wine_service.execute_run_job(run_id=created.id, actor_user_id="tester") - - saved = session.execute(select(WineRun).where(WineRun.id == created.id)).scalar_one() - session.refresh(saved) - assert saved.status == "failed" - assert saved.exit_code == 9 - assert saved.error_message == "Wine process exited with code 9" - assert saved.stdout_text == "stdout" - assert saved.stderr_text == "stderr" - finally: - session.close() diff --git a/api/tests/test_atp_engine_task.py b/api/tests/test_atp_engine_task.py deleted file mode 100644 index 7413284..0000000 --- a/api/tests/test_atp_engine_task.py +++ /dev/null @@ -1,110 +0,0 @@ -from __future__ import annotations - -from sqlalchemy import create_engine, select -from sqlalchemy.orm import Session, sessionmaker - -from app.core import database as core_database -from app.core.config import get_settings -from app.core.database import Base -from app.models.atp_model import AtpModel, AtpModelVersion, AtpSimulationRun -from app.schemas.atp_model import AtpSimulationRunRequest -from app.services import atp_model_service - - -def _build_sessionmaker(): - engine = create_engine("sqlite+pysqlite:///:memory:") - Base.metadata.create_all( - bind=engine, - tables=[ - AtpModel.__table__, - AtpModelVersion.__table__, - AtpSimulationRun.__table__, - ], - ) - return sessionmaker(bind=engine, autocommit=False, autoflush=False, expire_on_commit=False) - - -def test_run_model_version_dry_run_records_worker_command(monkeypatch, tmp_path) -> None: - testing_session = _build_sessionmaker() - monkeypatch.setattr(core_database, "SessionLocal", testing_session) - monkeypatch.setattr(atp_model_service, "_publish_change", lambda *args, **kwargs: None) - monkeypatch.setattr(atp_model_service, "_resolve_storage_root", lambda: tmp_path / "storage") - monkeypatch.setattr(atp_model_service, "_resolve_engine_workdir", lambda: tmp_path / "runs") - monkeypatch.setattr( - atp_model_service, - "_resolve_wine_engine_executable", - lambda: ("/usr/bin/wine", "/tmp/tpbig.exe", None), - ) - - session: Session = testing_session() - try: - model = AtpModel( - code="ATP-DRY-001", - name="Dry Run ATP", - source_type="atp", - status="enabled", - latest_version_no=1, - active_version_no=1, - ) - session.add(model) - session.flush() - - version = AtpModelVersion( - model_id=model.id, - version_no=1, - status="released", - entry_file="case.atp", - atp_text="BEGIN ATP CASE", - content_hash="dry-hash-v1", - ) - session.add(version) - session.commit() - - result = atp_model_service.run_model_version( - session, - model_id=model.id, - payload=AtpSimulationRunRequest(version_id=version.id, dry_run=True), - actor_user_id="tester", - ) - - assert result.status == "success" - assert result.engine_command is not None - assert result.engine_command.startswith("/usr/bin/wine /tmp/tpbig.exe ") - assert result.engine_command.endswith("/case.atp") - assert result.working_dir is not None - assert result.stdout_text is not None - - saved = session.execute(select(AtpSimulationRun).where(AtpSimulationRun.id == result.id)).scalar_one() - assert saved.status == "success" - assert saved.exit_code == 0 - assert saved.error_message is None - assert "dry_run" in (saved.stdout_text or "") - finally: - session.close() - - -def test_get_engine_status_includes_legacy_asset_checks(monkeypatch, tmp_path) -> None: - allowed_root = tmp_path / "wine-root" - template_root = allowed_root / "ATP" / "templates" - template_root.mkdir(parents=True) - (template_root / "EGM").mkdir() - (allowed_root / "ATP").mkdir(exist_ok=True) - (allowed_root / "ATP" / "tpbig.exe").write_text("binary", encoding="utf-8") - (allowed_root / "ATP" / "rjtzl.exe").write_text("binary", encoding="utf-8") - - settings = get_settings() - monkeypatch.setattr(settings, "wine_allowed_root", str(allowed_root)) - monkeypatch.setattr(settings, "atp_legacy_root", str(allowed_root / "ATP")) - monkeypatch.setattr(settings, "atp_template_root", str(template_root)) - monkeypatch.setattr(settings, "atp_run_root", str(allowed_root / "runs")) - monkeypatch.setattr(settings, "atp_tpbig_executable", "ATP/tpbig.exe") - monkeypatch.setattr(settings, "atp_rjtzl_executable", "ATP/rjtzl.exe") - monkeypatch.setattr(atp_model_service, "_resolve_wine_engine_executable", lambda: ("/usr/bin/wine", "/tmp/tpbig.exe", None)) - - result = atp_model_service.get_engine_status() - - assert "legacy_root" in result.checks - assert result.checks["legacy_root"]["available"] is True - assert result.checks["tpbig_executable"]["available"] is True - assert result.checks["rjtzl_executable"]["available"] is True - assert result.checks["egm_subdir"]["available"] is True diff --git a/api/tests/test_atp_model_service.py b/api/tests/test_atp_model_service.py deleted file mode 100644 index c37431d..0000000 --- a/api/tests/test_atp_model_service.py +++ /dev/null @@ -1,66 +0,0 @@ -from __future__ import annotations - -from sqlalchemy import create_engine, select -from sqlalchemy.orm import Session, sessionmaker - -from app.core.database import Base -from app.models.atp_model import AtpModel, AtpModelVersion, AtpSimulationRun -from app.services.atp_model_service import delete_model - - -def _build_sessionmaker(): - engine = create_engine("sqlite+pysqlite:///:memory:") - Base.metadata.create_all( - bind=engine, - tables=[ - AtpModel.__table__, - AtpModelVersion.__table__, - AtpSimulationRun.__table__, - ], - ) - return sessionmaker(bind=engine, autocommit=False, autoflush=False, expire_on_commit=False) - - -def test_delete_model_cascades_hidden_versions_and_runs() -> None: - testing_session = _build_sessionmaker() - session: Session = testing_session() - try: - model = AtpModel( - code="ATP-001", - name="示例模型", - source_type="atp", - status="enabled", - latest_version_no=1, - active_version_no=1, - ) - session.add(model) - session.flush() - - version = AtpModelVersion( - model_id=model.id, - version_no=1, - status="released", - atp_text="sample", - content_hash="hash-v1", - ) - session.add(version) - session.flush() - - session.add( - AtpSimulationRun( - model_id=model.id, - version_id=version.id, - status="success", - engine_mode="native", - timeout_seconds=60, - ) - ) - session.commit() - - assert delete_model(session, model.id) is True - - assert session.execute(select(AtpModel).where(AtpModel.id == model.id)).scalar_one_or_none() is None - assert session.execute(select(AtpModelVersion).where(AtpModelVersion.model_id == model.id)).scalar_one_or_none() is None - assert session.execute(select(AtpSimulationRun).where(AtpSimulationRun.model_id == model.id)).scalar_one_or_none() is None - finally: - session.close()