diff --git a/api/app/services/atp_asset_service.py b/api/app/services/atp_asset_service.py index 6bcfd0e..0a0b5c5 100644 --- a/api/app/services/atp_asset_service.py +++ b/api/app/services/atp_asset_service.py @@ -350,10 +350,9 @@ def _sanitize_storage_segment(value: str, *, fallback: str) -> str: def _build_release_storage_root(asset_code: str, release_no: int, voltage_level: str, tower_type: str) -> str: - asset_segment = _sanitize_storage_segment(asset_code, fallback="asset") voltage_segment = _sanitize_storage_segment(voltage_level, fallback="unknown-voltage") tower_segment = _sanitize_storage_segment(tower_type, fallback="unknown-tower") - return normalize_virtual_path(f"{ATP_ASSET_RELEASES_ROOT}/{voltage_segment}/{tower_segment}/{asset_segment}/r{release_no}") + return normalize_virtual_path(f"{ATP_ASSET_RELEASES_ROOT}/{voltage_segment}/{tower_segment}/r{release_no}") def _write_archive_to_storage( @@ -970,6 +969,9 @@ def create_release( ) next_release_no = max_release_no + 1 + # Check for storage path conflict before preparing payload + _check_storage_path_conflict(db, payload.storage_root_path, asset_id) + prepared = _prepare_release_payload( db, storage_mount_code=payload.storage_mount_code, @@ -1030,6 +1032,29 @@ def create_release( return serialize_release_detail(saved) +def _check_storage_path_conflict(db: Session, storage_root_path: str, current_asset_id: str) -> None: + """ + Check if the storage path is already used by a different asset. + Raises HTTPException if conflict detected. + """ + existing_release = db.execute( + select(AtpAssetRelease) + .where( + AtpAssetRelease.storage_root_path == storage_root_path, + AtpAssetRelease.asset_id != current_asset_id, + ) + ).scalar_one_or_none() + + if existing_release: + conflicting_asset = get_asset_by_id(db, existing_release.asset_id) + conflict_info = f"模型 {conflicting_asset.code} ({conflicting_asset.name})" if conflicting_asset else "其他模型" + raise HTTPException( + status_code=status.HTTP_409_CONFLICT, + detail=f"存储路径冲突:该路径已被{conflict_info}的 Release #{existing_release.release_no} 占用。" + f"无法上传,可能会覆盖现有文件。请检查电压等级、塔型和版本号配置。", + ) + + def create_release_from_archive( db: Session, *, @@ -1049,6 +1074,9 @@ def create_release_from_archive( ) + 1 storage_root_path = _build_release_storage_root(asset.code, next_release_no, voltage_level, tower_type) + # Check for storage path conflict before writing + _check_storage_path_conflict(db, storage_root_path, asset_id) + mount = _resolve_mount(db, "main") driver = _build_driver_or_400(mount) _write_archive_to_storage( diff --git a/api/tests/test_atp_asset_service.py b/api/tests/test_atp_asset_service.py index de83b7f..ae7a465 100644 --- a/api/tests/test_atp_asset_service.py +++ b/api/tests/test_atp_asset_service.py @@ -140,14 +140,14 @@ def test_create_release_from_archive_extracts_zip_and_inherits_asset_dimensions( assert created.release_no == 1 assert created.release_tag == "首版" - assert created.storage_root_path == "/atp-library/220/sihuita/ATP-ASSET-UPLOAD/r1" + assert created.storage_root_path == "/atp-library/220/sihuita/r1" assert created.entry_file == "work.atp" assert created.runner_kind == "hybrid" assert created.voltage_level == "220" assert created.tower_type == "sihuita" assert created.scene_type == "raoji3" - assert (tmp_path / "vfs" / "atp-library" / "220" / "sihuita" / "ATP-ASSET-UPLOAD" / "r1" / "work.atp").exists() - assert (tmp_path / "vfs" / "atp-library" / "220" / "sihuita" / "ATP-ASSET-UPLOAD" / "r1" / "EGM" / "config.txt").exists() + assert (tmp_path / "vfs" / "atp-library" / "220" / "sihuita" / "r1" / "work.atp").exists() + assert (tmp_path / "vfs" / "atp-library" / "220" / "sihuita" / "r1" / "EGM" / "config.txt").exists() finally: session.close() @@ -177,6 +177,64 @@ def test_create_release_from_archive_requires_asset_dimensions(tmp_path) -> None session.close() +def test_create_release_from_archive_detects_storage_path_conflict(tmp_path) -> None: + testing_session = _build_sessionmaker() + session: Session = testing_session() + try: + _seed_vfs_mount(session, root_dir=tmp_path / "vfs") + + # Create first asset and upload a release + asset1 = atp_asset_service.create_asset( + session, + AtpAssetCreateRequest( + code="ATP-ASSET-001", + name="模型1", + voltage_level="220", + tower_type="sihuita", + scene_type="raoji3", + ), + actor_user_id="tester", + ) + assert asset1 is not None + + release1 = atp_asset_service.create_release_from_archive( + session, + asset_id=asset1.id, + release_tag="v1", + archive_filename="release.zip", + archive_content=_build_zip({"work.atp": b"ATP INPUT 1"}), + actor_user_id="tester", + ) + assert release1.storage_root_path == "/atp-library/220/sihuita/r1" + + # Create second asset with same voltage_level and tower_type + asset2 = atp_asset_service.create_asset( + session, + AtpAssetCreateRequest( + code="ATP-ASSET-002", + name="模型2", + voltage_level="220", + tower_type="sihuita", + scene_type="raoji3", + ), + actor_user_id="tester", + ) + assert asset2 is not None + + # Try to upload a release for asset2 - should conflict because path is same + with pytest.raises(HTTPException, match="存储路径冲突"): + atp_asset_service.create_release_from_archive( + session, + asset_id=asset2.id, + release_tag="v1", + archive_filename="release.zip", + archive_content=_build_zip({"work.atp": b"ATP INPUT 2"}), + actor_user_id="tester", + ) + finally: + session.close() + + def test_list_assets_paginates_after_filtering(tmp_path) -> None: testing_session = _build_sessionmaker() session: Session = testing_session()