Files
fquiz/api/tests/test_elevation_apply_service.py
T
2026-06-10 08:31:11 +08:00

163 lines
6.3 KiB
Python

from __future__ import annotations
from types import SimpleNamespace
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import StaticPool
from app.core.database import Base
from app.models.elevation import ElevationApplyJob, ElevationDataset
from app.models.line import Line
from app.models.line_tower import LineTower
from app.models.tower_profile import TowerProfile
from app.schemas.elevation import ElevationApplyJobCreateRequest
from app.services import elevation_service
def _build_session_factory() -> sessionmaker[Session]:
    """Return a session factory bound to a fresh SQLite test engine.

    Only the tables for the models listed below are created. The engine URL
    carries no database path (an in-memory database per the SQLAlchemy SQLite
    dialect docs) and uses ``StaticPool`` with ``check_same_thread=False`` so
    that sessions produced by the factory reuse one underlying connection.
    """
    required_tables = [
        model.__table__
        for model in (Line, LineTower, TowerProfile, ElevationDataset, ElevationApplyJob)
    ]
    memory_engine = create_engine(
        "sqlite+pysqlite://",
        poolclass=StaticPool,
        connect_args={"check_same_thread": False},
    )
    Base.metadata.create_all(bind=memory_engine, tables=required_tables)
    return sessionmaker(
        bind=memory_engine,
        autoflush=False,
        autocommit=False,
        expire_on_commit=False,
    )
def test_create_apply_job_dispatches_actor_user_id(monkeypatch) -> None:
    """Check that ``create_apply_job`` hands the actor id to the dispatch hook.

    The dispatch and change-publishing helpers on ``elevation_service`` are
    replaced with local stand-ins, so the test only observes the keyword
    arguments the service passes to dispatch and the job fetched back through
    the same session afterwards.
    """
    session_factory = _build_session_factory()
    # The session context manager closes the session on exit, success or failure.
    with session_factory() as db:
        target_line = Line(code="L-APPLY-001", name="回填线路", voltage_kv=220, lightning_param_json={})
        source_dataset = ElevationDataset(
            code="DEM-APPLY-001",
            name="高程数据集",
            file_format="csv",
            mount_code="default",
            dataset_dir="/elevation/datasets/DEM-APPLY-001",
            file_path="/elevation/datasets/DEM-APPLY-001/dataset.csv",
            status="active",
            usage_status="idle",
        )
        db.add_all([target_line, source_dataset])
        # Flush before creating the tower because it references target_line.id
        # (presumably populated at flush time).
        db.flush()
        db.add(LineTower(line_id=target_line.id, seq_no=1, tower_no="T1", longitude=120.0, latitude=30.0))
        db.commit()

        captured: dict[str, str | None] = {}

        def _record_dispatch(*, job_id: str, actor_user_id: str | None) -> SimpleNamespace:
            # Remember what the service passed and return an object exposing a task id.
            captured.update(job_id=job_id, actor_user_id=actor_user_id)
            return SimpleNamespace(id="celery-task-1")

        def _ignore(*args, **kwargs) -> None:
            return None

        monkeypatch.setattr(elevation_service, "_dispatch_elevation_apply_task", _record_dispatch)
        monkeypatch.setattr(elevation_service, "_publish_elevation_change", _ignore)

        request = ElevationApplyJobCreateRequest(
            line_id=target_line.id,
            dataset_id=source_dataset.id,
            mode="overwrite_all",
        )
        response = elevation_service.create_apply_job(
            db,
            request,
            actor=SimpleNamespace(id="tester"),
        )

        saved_job = db.get(ElevationApplyJob, response.job.id)
        assert response.queued is True
        assert captured == {"job_id": response.job.id, "actor_user_id": "tester"}
        assert saved_job is not None
        assert saved_job.task_id == "celery-task-1"
        assert saved_job.create_user == "tester"
        assert saved_job.update_user == "tester"
def test_execute_apply_job_uses_saved_actor_for_preparation_source(monkeypatch) -> None:
    """Check that ``execute_apply_job`` attributes its results to the job's saved user.

    A pending job whose ``create_user``/``update_user`` is ``"tester"`` is
    stored together with a three-tower line. After running the job with the
    dataset loader and the publish/refresh helpers stubbed out, a separate
    session verifies that the job succeeded, every tower received an altitude,
    and the ``ground_slope`` preparation source on the line records the user,
    dataset and job.
    """
    testing_session = _build_session_factory()
    # Both sessions are closed by their context managers, even if an assertion fails.
    with testing_session() as session:
        line = Line(code="L-APPLY-002", name="高程回填线路", voltage_kv=110, lightning_param_json={})
        dataset = ElevationDataset(
            code="DEM-APPLY-002",
            name="高程数据集",
            file_format="csv",
            mount_code="default",
            dataset_dir="/elevation/datasets/DEM-APPLY-002",
            file_path="/elevation/datasets/DEM-APPLY-002/dataset.csv",
            status="active",
            usage_status="idle",
        )
        session.add_all([line, dataset])
        session.flush()

        # Converts a north-south offset in metres to degrees of latitude
        # (presumably based on ~111.32 km per degree).
        meter_to_lat = 1 / 111_320.0
        # (seq_no, tower_no, offset in metres along the sampled profile)
        tower_specs = ((1, "P1", 300), (2, "P2", 600), (3, "P3", 900))
        session.add_all(
            [
                LineTower(
                    line_id=line.id,
                    seq_no=seq_no,
                    tower_no=tower_no,
                    longitude=120.0,
                    latitude=30.0 + offset_m * meter_to_lat,
                )
                for seq_no, tower_no, offset_m in tower_specs
            ]
        )
        session.flush()

        job = ElevationApplyJob(
            line_id=line.id,
            dataset_id=dataset.id,
            mode="overwrite_all",
            status="pending",
            total_tower_count=len(tower_specs),
            create_user="tester",
            update_user="tester",
        )
        session.add(job)
        session.commit()

        # Synthetic samples every 50 m from 0 m to 1250 m on the same meridian,
        # with altitude rising linearly (0.12 m per metre of distance).
        points = [
            elevation_service.ElevationSamplePoint(
                lon=120.0,
                lat=30.0 + distance_m * meter_to_lat,
                altitude_m=100.0 + distance_m * 0.12,
            )
            for distance_m in range(0, 1251, 50)
        ]

        # Point the service at the test session factory (presumably what
        # execute_apply_job uses to open its own sessions) and stub out the
        # dataset loader plus the publish/refresh helpers.
        monkeypatch.setattr(elevation_service, "SessionLocal", testing_session)
        monkeypatch.setattr(elevation_service, "_load_dataset_points", lambda *_args, **_kwargs: (points, []))
        monkeypatch.setattr(elevation_service, "_publish_elevation_change", lambda *args, **kwargs: None)
        monkeypatch.setattr(elevation_service, "_publish_line_change", lambda *args, **kwargs: None)
        monkeypatch.setattr(elevation_service, "_refresh_dataset_usage_status", lambda *args, **kwargs: None)

        elevation_service.execute_apply_job(job.id)

        # Read the results back through a separate session rather than the one
        # that created the fixtures.
        with testing_session() as verification_session:
            saved_job = verification_session.get(ElevationApplyJob, job.id)
            saved_line = verification_session.get(Line, line.id)
            towers_query = (
                select(LineTower)
                .where(LineTower.line_id == line.id)
                .order_by(LineTower.seq_no.asc())
            )
            towers = verification_session.execute(towers_query).scalars().all()

            assert saved_job is not None
            assert saved_job.status == "success"
            assert saved_line is not None
            assert saved_line.update_user == "tester"
            # all() is True for an empty sequence, so make sure the towers were
            # actually read back before checking their altitudes.
            assert len(towers) == len(tower_specs)
            assert all(tower.altitude_m is not None for tower in towers)

            source = saved_line.lightning_param_json["preparation_sources"]["ground_slope"]
            assert source["prepared_by_user_id"] == "tester"
            assert source["dataset_id"] == dataset.id
            assert source["job_id"] == job.id