diff --git a/api/app/services/fl_analysis_external.py b/api/app/services/fl_analysis_external.py index 4377a09..0795cf9 100644 --- a/api/app/services/fl_analysis_external.py +++ b/api/app/services/fl_analysis_external.py @@ -8,21 +8,18 @@ from dataclasses import dataclass from pathlib import Path from typing import Any +from sqlalchemy import select from sqlalchemy.orm import Session -from ..models.atp_model import AtpModel, AtpModelVersion +from ..models.atp_asset import AtpAsset, AtpAssetRelease from ..models.base import utcnow from ..models.fl_analysis import FlAnalysisJob, FlAnalysisTowerSnapshot -from ..schemas.atp_model import AtpSimulationRunRequest -from .atp_model_service import ( +from .atp_asset_service import ( _resolve_engine_workdir, _resolve_native_engine_executable, - _resolve_target_version, _resolve_timeout, _resolve_wine_engine_executable, - _safe_entry_filename, _truncate_output, - get_model_by_id, ) PLACEHOLDER_PATTERN = re.compile(r"{{\s*([^{}]+?)\s*}}") @@ -49,11 +46,96 @@ NUMERIC_RESULT_KEYS = { } +def get_asset_by_id(db: Session, asset_id: str) -> AtpAsset | None: + """Get ATP asset by ID.""" + return db.execute(select(AtpAsset).where(AtpAsset.id == asset_id)).scalar_one_or_none() + + +def _resolve_target_release( + db: Session, + *, + asset: AtpAsset, + release_id: str | None = None, + release_no: int | None = None, +) -> AtpAssetRelease: + """Resolve target release from asset.""" + if release_id: + matched = db.execute( + select(AtpAssetRelease).where( + AtpAssetRelease.asset_id == asset.id, + AtpAssetRelease.id == release_id, + ) + ).scalar_one_or_none() + if not matched: + raise RuntimeError(f"指定的 ATP 资产版本不存在: {release_id}") + return matched + + if release_no is not None: + matched = db.execute( + select(AtpAssetRelease).where( + AtpAssetRelease.asset_id == asset.id, + AtpAssetRelease.release_no == release_no, + ) + ).scalar_one_or_none() + if not matched: + raise RuntimeError(f"指定的 ATP 资产版本不存在: release_no={release_no}") + return matched + + # Use active release + if asset.active_release_no is not None: + matched = db.execute( + select(AtpAssetRelease).where( + AtpAssetRelease.asset_id == asset.id, + AtpAssetRelease.release_no == asset.active_release_no, + ) + ).scalar_one_or_none() + if matched: + return matched + + # Fall back to latest release + matched = db.execute( + select(AtpAssetRelease) + .where(AtpAssetRelease.asset_id == asset.id) + .order_by(AtpAssetRelease.release_no.desc()) + ).first() + if matched: + return matched[0] + + raise RuntimeError(f"ATP 资产没有可用的版本: {asset.code}") + + +def _safe_entry_filename(raw_name: str | None, *, asset_code: str, release_no: int) -> str: + """Generate safe entry filename.""" + import os + import re + from pathlib import Path + + FILENAME_SANITIZE_PATTERN = re.compile(r'[<>:"/\\|?*\x00-\x1f]') + fallback = f"{asset_code}_v{release_no}.atp" + if not raw_name: + return fallback + + filename = Path(raw_name).name.strip() + if not filename: + return fallback + + cleaned = FILENAME_SANITIZE_PATTERN.sub("_", filename) + cleaned = cleaned.strip("._") + if not cleaned: + return fallback + + if len(cleaned) > 220: + stem, suffix = os.path.splitext(cleaned) + cleaned = f"{stem[:200]}{suffix[:20]}" + + return cleaned + + @dataclass(slots=True) class ResolvedExternalWaveformJob: adapter: str - model: AtpModel - version: AtpModelVersion + asset: AtpAsset + release: AtpAssetRelease timeout_seconds: int extra_args: list[str] environment: dict[str, str] @@ -81,25 +163,31 @@ def resolve_external_waveform_job( raise RuntimeError(f"Unsupported external adapter: {external_adapter}") config = dict(adapter_config_json or {}) - model_id = str(config.get("model_id") or "").strip() - if not model_id: - raise RuntimeError("外部 ATP/Wine 任务缺少 adapter_config_json.model_id") + asset_id = str(config.get("asset_id") or config.get("model_id") or "").strip() + if not asset_id: + raise RuntimeError("外部 ATP/Wine 任务缺少 adapter_config_json.asset_id") - model = get_model_by_id(db, model_id) - if model is None: - raise RuntimeError(f"指定的 ATP 模型不存在: {model_id}") + asset = get_asset_by_id(db, asset_id) + if asset is None: + raise RuntimeError(f"指定的 ATP 资产不存在: {asset_id}") - version_id = str(config.get("version_id") or "").strip() or None - version_no = _coerce_positive_int(config.get("version_no")) - version = _resolve_target_version( + release_id = str(config.get("release_id") or config.get("version_id") or "").strip() or None + release_no = _coerce_positive_int(config.get("release_no") or config.get("version_no")) + release = _resolve_target_release( db, - model=model, - payload=AtpSimulationRunRequest(version_id=version_id, version_no=version_no), + asset=asset, + release_id=release_id, + release_no=release_no, ) - if not (version.atp_text or "").strip(): - raise RuntimeError(f"ATP 模型版本缺少可执行模板: {model.code} v{version.version_no}") - manifest_contract = _read_object((version.artifact_manifest_json or {}).get("fl_analysis")) + # For AtpAssetRelease, the template content might be in manifest_json + # Check if there's an atp_text equivalent or entry_file content + manifest = release.manifest_json or {} + atp_text = manifest.get("atp_text", "") + if not atp_text.strip(): + raise RuntimeError(f"ATP 资产版本缺少可执行模板: {asset.code} v{release.release_no}") + + manifest_contract = _read_object(manifest.get("fl_analysis")) adapter_contract = { key: value for key, value in config.items() @@ -132,8 +220,8 @@ def resolve_external_waveform_job( } return ResolvedExternalWaveformJob( adapter=adapter, - model=model, - version=version, + asset=asset, + release=release, timeout_seconds=timeout_seconds, extra_args=extra_args, environment=environment, @@ -171,17 +259,21 @@ def execute_external_waveform_tower_analysis( execution_options=execution_options, baseline_result=baseline_result, ) + # Get atp_text from manifest_json + manifest = resolved_job.release.manifest_json or {} + atp_text = manifest.get("atp_text", "") + rendered_text = render_atp_template( - resolved_job.version.atp_text or "", + atp_text, context=context, parameter_bindings=resolved_job.parameter_bindings, ) run_dir = _prepare_run_directory(job=job, snapshot=snapshot, resolved_job=resolved_job) input_path = run_dir / _safe_entry_filename( - resolved_job.version.entry_file, - model_code=resolved_job.model.code, - version_no=resolved_job.version.version_no, + resolved_job.release.entry_file, + asset_code=resolved_job.asset.code, + release_no=resolved_job.release.release_no, ) input_path.write_text(rendered_text, encoding="utf-8") @@ -541,11 +633,11 @@ def _merge_external_result( external_execution = { "adapter_status": "executed", "adapter": resolved_job.adapter, - "model_id": resolved_job.model.id, - "model_code": resolved_job.model.code, - "model_name": resolved_job.model.name, - "version_id": resolved_job.version.id, - "version_no": resolved_job.version.version_no, + "asset_id": resolved_job.asset.id, + "asset_code": resolved_job.asset.code, + "asset_name": resolved_job.asset.name, + "release_id": resolved_job.release.id, + "release_no": resolved_job.release.release_no, "engine_command": engine_command, "working_dir": working_dir, "executed_at": utcnow().isoformat(),