[fix/feat]:[FL-82][ATP模型管理改造]

Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
chengkai3
2026-06-11 23:45:57 +08:00
parent fac37ddb8d
commit 4328d9fd34
16 changed files with 640 additions and 2480 deletions
+83
View File
@@ -1,7 +1,11 @@
from __future__ import annotations
import io
from pathlib import Path
import zipfile
from fastapi import HTTPException
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
@@ -55,6 +59,14 @@ def _seed_vfs_mount(session: Session, *, root_dir: Path) -> None:
session.commit()
def _build_zip(entries: dict[str, bytes]) -> bytes:
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as archive:
for path, content in entries.items():
archive.writestr(path, content)
return buffer.getvalue()
def test_create_release_auto_detects_entry_file_and_manifest(tmp_path) -> None:
testing_session = _build_sessionmaker()
session: Session = testing_session()
@@ -94,6 +106,77 @@ def test_create_release_auto_detects_entry_file_and_manifest(tmp_path) -> None:
session.close()
def test_create_release_from_archive_extracts_zip_and_inherits_asset_dimensions(tmp_path) -> None:
testing_session = _build_sessionmaker()
session: Session = testing_session()
try:
_seed_vfs_mount(session, root_dir=tmp_path / "vfs")
asset = atp_asset_service.create_asset(
session,
AtpAssetCreateRequest(
code="ATP-ASSET-UPLOAD",
name="ZIP 导入模型",
voltage_level="220",
tower_type="sihuita",
scene_type="raoji3",
),
actor_user_id="tester",
)
assert asset is not None
created = atp_asset_service.create_release_from_archive(
session,
asset_id=asset.id,
release_tag="首版",
archive_filename="release.zip",
archive_content=_build_zip(
{
"work.atp": b"ATP INPUT",
"EGM/config.txt": b"egm",
}
),
actor_user_id="tester",
)
assert created.release_no == 1
assert created.release_tag == "首版"
assert created.storage_root_path == "/atp-library/releases/ATP-ASSET-UPLOAD/r1"
assert created.entry_file == "work.atp"
assert created.runner_kind == "hybrid"
assert created.voltage_level == "220"
assert created.tower_type == "sihuita"
assert created.scene_type == "raoji3"
assert (tmp_path / "vfs" / "atp-library" / "releases" / "ATP-ASSET-UPLOAD" / "r1" / "work.atp").exists()
assert (tmp_path / "vfs" / "atp-library" / "releases" / "ATP-ASSET-UPLOAD" / "r1" / "EGM" / "config.txt").exists()
finally:
session.close()
def test_create_release_from_archive_requires_asset_dimensions(tmp_path) -> None:
testing_session = _build_sessionmaker()
session: Session = testing_session()
try:
_seed_vfs_mount(session, root_dir=tmp_path / "vfs")
asset = atp_asset_service.create_asset(
session,
AtpAssetCreateRequest(code="ATP-ASSET-MISSING", name="缺维度模型"),
actor_user_id="tester",
)
assert asset is not None
with pytest.raises(HTTPException, match="电压等级、塔型和场景"):
atp_asset_service.create_release_from_archive(
session,
asset_id=asset.id,
release_tag="r1",
archive_filename="release.zip",
archive_content=_build_zip({"work.atp": b"ATP INPUT"}),
actor_user_id="tester",
)
finally:
session.close()
def test_run_release_dry_run_materializes_directory(tmp_path, monkeypatch) -> None:
testing_session = _build_sessionmaker()
monkeypatch.setattr(core_database, "SessionLocal", testing_session)