[migrate]:[FL-21][防雷分析普通/同跳计算接入ATP执行链路]

Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
chengkai3
2026-06-07 19:30:47 +08:00
parent 90aba88da0
commit e21472bbd9
5 changed files with 1116 additions and 9 deletions
+4 -1
View File
@@ -89,7 +89,10 @@ class FlAnalysisJobCreateRequest(BaseModel):
job_name: str | None = Field(default=None, min_length=1, max_length=255)
job_type: FlAnalysisJobType = "normal"
external_adapter: FlAnalysisAdapter = "placeholder"
adapter_config_json: dict[str, Any] = Field(default_factory=dict)
adapter_config_json: dict[str, Any] = Field(
default_factory=dict,
description="normal/tongtiao 任务在 external_adapter=atp/wine 时可传 model_id、version_id/version_no、result_file、parameter_bindings 等 ATP 执行配置。",
)
execution_options_json: dict[str, Any] = Field(
default_factory=dict,
description="normal/tongtiao 任务可传波形、闪络判据以及波头/波尾扫描参数;mitigation 任务可传 source_job_id、selected_tower_ids、non_construction。",
+610
View File
@@ -0,0 +1,610 @@
from __future__ import annotations
import json
import os
import re
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from sqlalchemy.orm import Session
from ..models.atp_model import AtpModel, AtpModelVersion
from ..models.base import utcnow
from ..models.fl_analysis import FlAnalysisJob, FlAnalysisTowerSnapshot
from ..schemas.atp_model import AtpSimulationRunRequest
from .atp_model_service import (
_resolve_engine_workdir,
_resolve_native_engine_executable,
_resolve_target_version,
_resolve_timeout,
_resolve_wine_engine_executable,
_safe_entry_filename,
_truncate_output,
get_model_by_id,
)
PLACEHOLDER_PATTERN = re.compile(r"{{\s*([^{}]+?)\s*}}")
DEFAULT_JSON_MARKERS = ("FL_ANALYSIS_RESULT_BEGIN", "FL_ANALYSIS_RESULT_END")
LEGACY_RESULT_KEY_MAP = {
"反击耐雷水平": "counterstrike_withstand_ka",
"反击跳闸率": "counterstrike_trip_rate",
"绕击耐雷水平": "shielding_withstand_ka",
"绕击跳闸率": "shielding_trip_rate",
"雷击风险等级": "risk_grade",
"风险等级": "risk_grade",
"单相闪络相": "flashover_phase",
"闪络相": "flashover_phase",
"主导相组": "dominant_phase_set",
"同跳跳闸率": "counterstrike_trip_rate",
}
NUMERIC_RESULT_KEYS = {
"counterstrike_withstand_ka",
"counterstrike_trip_rate",
"shielding_withstand_ka",
"shielding_trip_rate",
"score",
"risk_grade",
}
@dataclass(slots=True)
class ResolvedExternalWaveformJob:
adapter: str
model: AtpModel
version: AtpModelVersion
timeout_seconds: int
extra_args: list[str]
environment: dict[str, str]
contract: dict[str, Any]
parameter_bindings: dict[str, Any]
@dataclass(slots=True)
class ExternalTowerExecutionResult:
result_json: dict[str, Any]
engine_command: str
working_dir: str
stdout_text: str | None
stderr_text: str | None
def resolve_external_waveform_job(
db: Session,
*,
external_adapter: str,
adapter_config_json: dict[str, Any],
) -> ResolvedExternalWaveformJob:
adapter = str(external_adapter or "").strip().lower()
if adapter not in {"atp", "wine"}:
raise RuntimeError(f"Unsupported external adapter: {external_adapter}")
config = dict(adapter_config_json or {})
model_id = str(config.get("model_id") or "").strip()
if not model_id:
raise RuntimeError("外部 ATP/Wine 任务缺少 adapter_config_json.model_id")
model = get_model_by_id(db, model_id)
if model is None:
raise RuntimeError(f"指定的 ATP 模型不存在: {model_id}")
version_id = str(config.get("version_id") or "").strip() or None
version_no = _coerce_positive_int(config.get("version_no"))
version = _resolve_target_version(
db,
model=model,
payload=AtpSimulationRunRequest(version_id=version_id, version_no=version_no),
)
if not (version.atp_text or "").strip():
raise RuntimeError(f"ATP 模型版本缺少可执行模板: {model.code} v{version.version_no}")
manifest_contract = _read_object((version.artifact_manifest_json or {}).get("fl_analysis"))
adapter_contract = {
key: value
for key, value in config.items()
if key
in {
"result_file",
"result_format",
"result_labels",
"result_json_pointer",
"result_key_map",
"stdout_json_markers",
}
}
parameter_bindings = {
**_read_object(manifest_contract.get("parameter_bindings")),
**_read_object(config.get("parameter_bindings")),
}
contract = {
**manifest_contract,
**adapter_contract,
}
contract["parameter_bindings"] = parameter_bindings
timeout_seconds = _resolve_timeout(_coerce_positive_int(config.get("timeout_seconds")))
extra_args = [str(item).strip() for item in list(config.get("extra_args") or []) if str(item).strip()]
environment = {
str(key): str(value)
for key, value in _read_object(config.get("environment")).items()
if str(key).strip()
}
return ResolvedExternalWaveformJob(
adapter=adapter,
model=model,
version=version,
timeout_seconds=timeout_seconds,
extra_args=extra_args,
environment=environment,
contract=contract,
parameter_bindings=parameter_bindings,
)
def render_atp_template(
template: str,
*,
context: dict[str, Any],
parameter_bindings: dict[str, Any] | None = None,
) -> str:
bindings = parameter_bindings or {}
def replace_placeholder(match: re.Match[str]) -> str:
expression = match.group(1).strip()
return _render_placeholder(expression, context=context, parameter_bindings=bindings)
return PLACEHOLDER_PATTERN.sub(replace_placeholder, template)
def execute_external_waveform_tower_analysis(
resolved_job: ResolvedExternalWaveformJob,
*,
job: FlAnalysisJob,
snapshot: FlAnalysisTowerSnapshot,
execution_options: dict[str, Any],
baseline_result: dict[str, Any],
) -> ExternalTowerExecutionResult:
context = _build_template_context(
job=job,
snapshot=snapshot,
execution_options=execution_options,
baseline_result=baseline_result,
)
rendered_text = render_atp_template(
resolved_job.version.atp_text or "",
context=context,
parameter_bindings=resolved_job.parameter_bindings,
)
run_dir = _prepare_run_directory(job=job, snapshot=snapshot, resolved_job=resolved_job)
input_path = run_dir / _safe_entry_filename(
resolved_job.version.entry_file,
model_code=resolved_job.model.code,
version_no=resolved_job.version.version_no,
)
input_path.write_text(rendered_text, encoding="utf-8")
command = _build_command(
adapter=resolved_job.adapter,
input_path=input_path,
extra_args=resolved_job.extra_args,
)
env = os.environ.copy()
env.update(resolved_job.environment)
try:
result = subprocess.run(
command,
cwd=str(run_dir),
env=env,
capture_output=True,
text=True,
encoding="utf-8",
errors="replace",
timeout=resolved_job.timeout_seconds,
check=False,
)
except subprocess.TimeoutExpired as exc:
raise RuntimeError(
f"{snapshot.tower_no} 外部 ATP/Wine 执行超时({resolved_job.timeout_seconds}s"
) from exc
except OSError as exc:
raise RuntimeError(f"{snapshot.tower_no} 外部 ATP/Wine 启动失败: {exc}") from exc
stdout_text = _truncate_output(result.stdout)
stderr_text = _truncate_output(result.stderr)
if result.returncode != 0:
raise RuntimeError(
f"{snapshot.tower_no} 外部 ATP/Wine 执行失败,退出码 {result.returncode}"
)
parsed_payload = _read_external_result_payload(
contract=resolved_job.contract,
stdout_text=result.stdout,
run_dir=run_dir,
)
merged_result = _merge_external_result(
baseline_result=baseline_result,
external_payload=parsed_payload,
resolved_job=resolved_job,
engine_command=" ".join(command),
working_dir=str(run_dir),
)
return ExternalTowerExecutionResult(
result_json=merged_result,
engine_command=" ".join(command),
working_dir=str(run_dir),
stdout_text=stdout_text,
stderr_text=stderr_text,
)
def _build_template_context(
*,
job: FlAnalysisJob,
snapshot: FlAnalysisTowerSnapshot,
execution_options: dict[str, Any],
baseline_result: dict[str, Any],
) -> dict[str, Any]:
return {
"job": {
"id": job.id,
"job_name": job.job_name,
"job_type": job.job_type,
"external_adapter": job.external_adapter,
},
"snapshot": {
"id": snapshot.id,
"seq_no": snapshot.seq_no,
"tower_no": snapshot.tower_no,
"tower_model": snapshot.tower_model,
"tower_type": snapshot.tower_type,
"longitude": snapshot.longitude,
"latitude": snapshot.latitude,
"altitude_m": snapshot.altitude_m,
"terrain": snapshot.terrain,
},
"base_tower": snapshot.base_tower_json or {},
"profile": snapshot.profile_json or {},
"execution_options": execution_options,
"workflow": baseline_result.get("workflow") or {},
"baseline_result": baseline_result,
}
def _prepare_run_directory(
*,
job: FlAnalysisJob,
snapshot: FlAnalysisTowerSnapshot,
resolved_job: ResolvedExternalWaveformJob,
) -> Path:
base_dir = _resolve_engine_workdir() / "fl-analysis" / job.id / snapshot.id
base_dir.mkdir(parents=True, exist_ok=True)
return base_dir
def _build_command(*, adapter: str, input_path: Path, extra_args: list[str]) -> list[str]:
if adapter == "wine":
wine_binary, engine_path, error = _resolve_wine_engine_executable()
if error or not wine_binary or not engine_path:
raise RuntimeError(error or "Wine ATP engine unavailable")
return [wine_binary, engine_path, str(input_path), *extra_args]
engine_path, error = _resolve_native_engine_executable()
if error or not engine_path:
raise RuntimeError(error or "Native ATP engine unavailable")
return [engine_path, str(input_path), *extra_args]
def _render_placeholder(
expression: str,
*,
context: dict[str, Any],
parameter_bindings: dict[str, Any],
) -> str:
if expression in parameter_bindings:
return _render_bound_placeholder(parameter_bindings[expression], context=context, name=expression)
path, spec, width, default, required = _parse_inline_expression(expression)
value = _read_path(context, path)
return _stringify_value(
value,
format_spec=spec,
width=width,
default=default,
required=required,
name=expression,
)
def _render_bound_placeholder(binding: Any, *, context: dict[str, Any], name: str) -> str:
if isinstance(binding, str):
path, spec, width, default, required = _parse_inline_expression(binding)
value = _read_path(context, path)
return _stringify_value(
value,
format_spec=spec,
width=width,
default=default,
required=required,
name=name,
)
mapping = _read_object(binding)
path = str(mapping.get("path") or mapping.get("source") or "").strip()
if not path:
raise RuntimeError(f"ATP 模板绑定缺少 path/source: {name}")
value = _read_path(context, path)
return _stringify_value(
value,
format_spec=str(mapping.get("format") or "").strip() or None,
width=_coerce_positive_int(mapping.get("width")),
default=mapping.get("default"),
required=bool(mapping.get("required", "default" not in mapping)),
name=name,
)
def _parse_inline_expression(expression: str) -> tuple[str, str | None, int | None, Any, bool]:
parts = [part.strip() for part in expression.split("|")]
path = parts[0]
spec = parts[1] if len(parts) > 1 and parts[1] else None
width = _coerce_positive_int(parts[2]) if len(parts) > 2 and parts[2] else None
if len(parts) > 3:
return path, spec, width, parts[3], False
return path, spec, width, None, True
def _stringify_value(
value: Any,
*,
format_spec: str | None,
width: int | None,
default: Any,
required: bool,
name: str,
) -> str:
if value is None:
if required:
raise RuntimeError(f"ATP 模板占位符缺少取值: {name}")
text = "" if default is None else str(default)
return text.rjust(width) if width else text
text: str
if format_spec:
text = _format_value(value, format_spec)
elif isinstance(value, bool):
text = "1" if value else "0"
else:
text = str(value)
return text.rjust(width) if width else text
def _format_value(value: Any, format_spec: str) -> str:
numeric_value = float(value)
normalized_spec = format_spec.strip()
if normalized_spec.startswith("F"):
precision = abs(int(normalized_spec[1:] or "0"))
text = f"{numeric_value:.{precision}f}"
if precision > 0:
text = text.rstrip("0").rstrip(".")
elif "." not in text:
text = f"{text}."
return text
if normalized_spec.startswith("E"):
precision = abs(int(normalized_spec[1:] or "0"))
return f"{numeric_value:.{precision}E}"
return format(value, normalized_spec)
def _read_external_result_payload(
*,
contract: dict[str, Any],
stdout_text: str,
run_dir: Path,
) -> dict[str, Any]:
result_file = str(contract.get("result_file") or "").strip()
source_text: str
if result_file:
result_path = (run_dir / result_file).resolve(strict=False)
if not result_path.is_relative_to(run_dir.resolve(strict=False)):
raise RuntimeError(f"外部结果文件越界: {result_file}")
if not result_path.exists():
raise RuntimeError(f"外部 ATP/Wine 未生成结果文件: {result_file}")
source_text = result_path.read_text(encoding="utf-8", errors="replace")
else:
source_text = stdout_text
payload = _parse_external_payload(source_text, contract)
pointer = str(contract.get("result_json_pointer") or "").strip()
if pointer:
pointed = _read_path({"payload": payload}, f"payload.{pointer}")
if not isinstance(pointed, dict):
raise RuntimeError(f"外部结果指针无效: {pointer}")
payload = pointed
if not isinstance(payload, dict):
raise RuntimeError("外部 ATP/Wine 结果不是 JSON 对象")
return _normalize_external_result_payload(payload, _read_object(contract.get("result_key_map")))
def _parse_external_payload(source_text: str, contract: dict[str, Any]) -> dict[str, Any]:
labels = [str(item).strip() for item in list(contract.get("result_labels") or []) if str(item).strip()]
if labels:
values = [item.strip() for item in re.split(r"[|\r\n]+", source_text) if item.strip()]
return {
label: _coerce_scalar(values[index]) if index < len(values) else None
for index, label in enumerate(labels)
}
result_format = str(contract.get("result_format") or "").strip().lower()
if result_format in {"key_value", "kv", "text"}:
return _parse_key_value_text(source_text)
markers = contract.get("stdout_json_markers")
if isinstance(markers, list) and len(markers) == 2:
candidate = _extract_marked_json(source_text, str(markers[0]), str(markers[1]))
if candidate is not None:
return json.loads(candidate)
candidate = source_text.strip()
if candidate.startswith("{") and candidate.endswith("}"):
return json.loads(candidate)
marked_json = _extract_marked_json(source_text, *DEFAULT_JSON_MARKERS)
if marked_json is not None:
return json.loads(marked_json)
raise RuntimeError("外部 ATP/Wine 结果无法解析为 JSON")
def _parse_key_value_text(source_text: str) -> dict[str, Any]:
result: dict[str, Any] = {}
for line in source_text.splitlines():
text = line.strip()
if not text:
continue
if "=" in text:
key, value = text.split("=", 1)
elif ":" in text:
key, value = text.split(":", 1)
else:
continue
result[key.strip()] = _coerce_scalar(value.strip())
return result
def _extract_marked_json(source_text: str, start_marker: str, end_marker: str) -> str | None:
start_index = source_text.find(start_marker)
if start_index < 0:
return None
start_index += len(start_marker)
end_index = source_text.find(end_marker, start_index)
if end_index < 0:
return None
return source_text[start_index:end_index].strip()
def _normalize_external_result_payload(
payload: dict[str, Any],
result_key_map: dict[str, Any],
) -> dict[str, Any]:
normalized: dict[str, Any] = {}
for raw_key, raw_value in payload.items():
key = str(result_key_map.get(raw_key) or raw_key).strip()
key = LEGACY_RESULT_KEY_MAP.get(key, key)
value = raw_value
if key in NUMERIC_RESULT_KEYS:
value = _coerce_scalar(raw_value)
normalized[key] = value
risk_level = normalized.get("risk_level")
if isinstance(risk_level, (int, float)) and "risk_grade" not in normalized:
normalized["risk_grade"] = int(risk_level)
normalized["risk_level"] = _risk_level_from_grade(int(risk_level))
if "risk_level" not in normalized and "risk_grade" in normalized:
grade = _coerce_positive_int(normalized.get("risk_grade")) or 1
normalized["risk_grade"] = grade
normalized["risk_level"] = _risk_level_from_grade(grade)
return normalized
def _merge_external_result(
*,
baseline_result: dict[str, Any],
external_payload: dict[str, Any],
resolved_job: ResolvedExternalWaveformJob,
engine_command: str,
working_dir: str,
) -> dict[str, Any]:
merged = dict(baseline_result)
workflow = {
**_read_object(baseline_result.get("workflow")),
**_read_object(external_payload.get("workflow")),
}
if workflow:
merged["workflow"] = workflow
for key, value in external_payload.items():
if key == "workflow" and isinstance(value, dict):
continue
merged[key] = value
if merged.get("selected_case") is None and baseline_result.get("selected_case") is not None:
merged["selected_case"] = baseline_result.get("selected_case")
if merged.get("summary_text") is None and baseline_result.get("summary_text") is not None:
merged["summary_text"] = baseline_result.get("summary_text")
if merged.get("cause_analysis") is None and baseline_result.get("cause_analysis") is not None:
merged["cause_analysis"] = baseline_result.get("cause_analysis")
if merged.get("mitigation_recommendation") is None and baseline_result.get("mitigation_recommendation") is not None:
merged["mitigation_recommendation"] = baseline_result.get("mitigation_recommendation")
external_execution = {
"adapter_status": "executed",
"adapter": resolved_job.adapter,
"model_id": resolved_job.model.id,
"model_code": resolved_job.model.code,
"model_name": resolved_job.model.name,
"version_id": resolved_job.version.id,
"version_no": resolved_job.version.version_no,
"engine_command": engine_command,
"working_dir": working_dir,
"executed_at": utcnow().isoformat(),
}
merged["external_execution"] = external_execution
merged["external_result_json"] = external_payload
return merged
def _read_path(context: dict[str, Any], path: str) -> Any:
current: Any = context
for segment in [item for item in path.split(".") if item]:
if isinstance(current, dict):
current = current.get(segment)
continue
if isinstance(current, list) and segment.isdigit():
index = int(segment)
if 0 <= index < len(current):
current = current[index]
continue
return None
return current
def _read_object(value: Any) -> dict[str, Any]:
if isinstance(value, dict):
return {str(key): item for key, item in value.items()}
return {}
def _coerce_positive_int(value: Any) -> int | None:
try:
parsed = int(value)
except (TypeError, ValueError):
return None
return parsed if parsed > 0 else None
def _coerce_scalar(value: Any) -> Any:
if isinstance(value, (int, float, bool)) or value is None:
return value
text = str(value).strip()
if not text:
return text
lowered = text.lower()
if lowered in {"true", "false"}:
return lowered == "true"
try:
if any(char in text for char in {".", "e", "E"}):
return float(text)
return int(text)
except ValueError:
return text
def _risk_level_from_grade(grade: int) -> str:
if grade >= 3:
return "high"
if grade == 2:
return "medium"
return "low"
+84 -3
View File
@@ -27,6 +27,8 @@ from ..schemas.fl_analysis import (
FlAnalysisTowerResultListResponse,
FlAnalysisTowerResultSummary,
)
from .atp_model_service import _truncate_output
from .fl_analysis_external import execute_external_waveform_tower_analysis, resolve_external_waveform_job
from .fl_analysis_report import build_report_document, build_report_summary_payload
from .fl_analysis_rules import (
grade_mitigation_snapshot_payload,
@@ -180,6 +182,21 @@ def create_job(
)
or 0
)
if payload.job_type in {"normal", "tongtiao"}:
if payload.external_adapter in {"atp", "wine"}:
try:
resolve_external_waveform_job(
db,
external_adapter=payload.external_adapter,
adapter_config_json=payload.adapter_config_json or {},
)
except RuntimeError as exc:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
elif payload.external_adapter != "placeholder":
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="普通计算和同跳计算仅支持 placeholder/atp/wine 适配器",
)
if total_tower_count <= 0:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="当前线路没有可分析的杆塔数据")
@@ -362,6 +379,20 @@ def execute_job(job_id: str) -> None:
result_count = 0
summary = _new_result_summary()
tower_map = {tower.id: tower for tower in towers}
external_job = None
stdout_chunks: list[str] = []
stderr_chunks: list[str] = []
if job.job_type in {"normal", "tongtiao"} and job.external_adapter in {"atp", "wine"}:
external_job = resolve_external_waveform_job(
db,
external_adapter=job.external_adapter,
adapter_config_json=job.adapter_config_json or {},
)
summary["external_model_id"] = external_job.model.id
summary["external_model_code"] = external_job.model.code
summary["external_model_name"] = external_job.model.name
summary["external_version_id"] = external_job.version.id
summary["external_version_no"] = external_job.version.version_no
for snapshot in snapshots:
payload = {
"base_tower_json": snapshot.base_tower_json or {},
@@ -376,9 +407,43 @@ def execute_job(job_id: str) -> None:
non_construction=bool(execution_options.get("non_construction")),
)
elif job.job_type == "normal":
graded = grade_normal_snapshot_payload(payload, execution_options=execution_options)
baseline_result = grade_normal_snapshot_payload(payload, execution_options=execution_options)
if external_job is not None:
execution = execute_external_waveform_tower_analysis(
external_job,
job=job,
snapshot=snapshot,
execution_options=execution_options,
baseline_result=baseline_result,
)
graded = execution.result_json
run.engine_command = run.engine_command or execution.engine_command
run.working_dir = run.working_dir or execution.working_dir
if execution.stdout_text:
stdout_chunks.append(f"[{snapshot.tower_no}] {execution.stdout_text}")
if execution.stderr_text:
stderr_chunks.append(f"[{snapshot.tower_no}] {execution.stderr_text}")
else:
graded = baseline_result
elif job.job_type == "tongtiao":
graded = grade_tongtiao_snapshot_payload(payload, execution_options=execution_options)
baseline_result = grade_tongtiao_snapshot_payload(payload, execution_options=execution_options)
if external_job is not None:
execution = execute_external_waveform_tower_analysis(
external_job,
job=job,
snapshot=snapshot,
execution_options=execution_options,
baseline_result=baseline_result,
)
graded = execution.result_json
run.engine_command = run.engine_command or execution.engine_command
run.working_dir = run.working_dir or execution.working_dir
if execution.stdout_text:
stdout_chunks.append(f"[{snapshot.tower_no}] {execution.stdout_text}")
if execution.stderr_text:
stderr_chunks.append(f"[{snapshot.tower_no}] {execution.stderr_text}")
else:
graded = baseline_result
else:
graded = grade_snapshot_payload(payload)
db.add(
@@ -415,6 +480,12 @@ def execute_job(job_id: str) -> None:
summary["non_construction"] = bool(execution_options.get("non_construction"))
elif job.job_type in {"normal", "tongtiao"}:
summary["workflow"] = _workflow_summary_from_execution_options(execution_options)
if external_job is not None:
summary["external_engine_adapter"] = job.external_adapter
if stdout_chunks:
run.stdout_text = _truncate_output("\n\n".join(stdout_chunks))
if stderr_chunks:
run.stderr_text = _truncate_output("\n\n".join(stderr_chunks))
db.commit()
_finish_rule_based_run(
@@ -423,6 +494,7 @@ def execute_job(job_id: str) -> None:
run_id=run.id,
started_perf=started_perf,
summary=summary,
adapter_status="executed" if external_job is not None else "computed",
)
except Exception as exc:
@@ -448,6 +520,7 @@ def _finish_rule_based_run(
run_id: str,
started_perf: float,
summary: dict[str, Any],
adapter_status: str = "computed",
) -> None:
job = get_job_by_id(db, job_id)
run = db.execute(select(FlAnalysisRun).where(FlAnalysisRun.id == run_id)).scalar_one_or_none()
@@ -467,7 +540,7 @@ def _finish_rule_based_run(
job.error_message = None
job.result_summary_json = {
**summary,
"adapter_status": "computed",
"adapter_status": adapter_status,
"external_adapter": job.external_adapter,
}
job.finished_at = now
@@ -1236,6 +1309,14 @@ def _as_int(value: Any) -> int | None:
return None
def _as_float(value: Any) -> float | None:
try:
parsed = float(value)
except (TypeError, ValueError):
return None
return parsed
def _placeholder_message_for_adapter(adapter: str) -> str:
if adapter == "wine":
return "Wine 外部程序适配器已预留,真实执行链路尚未接入"
+203
View File
@@ -0,0 +1,203 @@
from __future__ import annotations
from types import SimpleNamespace
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session, sessionmaker
from app.core.database import Base
from app.models.atp_model import AtpModel, AtpModelVersion, AtpSimulationRun
from app.models.fl_analysis import FlAnalysisJob, FlAnalysisRun, FlAnalysisTowerResult, FlAnalysisTowerSnapshot
from app.models.line import Line
from app.models.line_tower import LineTower
from app.models.tower_profile import TowerProfile
from app.schemas.fl_analysis import FlAnalysisJobCreateRequest
from app.services import fl_analysis_external, fl_analysis_service
def _build_sessionmaker():
engine = create_engine("sqlite+pysqlite:///:memory:")
Base.metadata.create_all(
bind=engine,
tables=[
Line.__table__,
LineTower.__table__,
TowerProfile.__table__,
FlAnalysisJob.__table__,
FlAnalysisRun.__table__,
FlAnalysisTowerSnapshot.__table__,
FlAnalysisTowerResult.__table__,
AtpModel.__table__,
AtpModelVersion.__table__,
AtpSimulationRun.__table__,
],
)
return sessionmaker(bind=engine, autocommit=False, autoflush=False, expire_on_commit=False)
def test_render_atp_template_supports_binding_formats_and_defaults() -> None:
rendered = fl_analysis_external.render_atp_template(
"R={{GROUND_RES}}\nHEAD={{profile.current_head_time_us|F1|6|2.6}}\nNAME={{snapshot.tower_no}}",
context={
"snapshot": {"tower_no": "N-01"},
"base_tower": {"ground_resistance_ohm": 12.34},
"profile": {},
},
parameter_bindings={
"GROUND_RES": {
"path": "base_tower.ground_resistance_ohm",
"format": "F1",
"width": 6,
}
},
)
assert "R= 12.3" in rendered
assert "HEAD= 2.6" in rendered
assert "NAME=N-01" in rendered
def test_execute_job_runs_external_adapter_and_backfills_results(monkeypatch, tmp_path) -> None:
testing_session = _build_sessionmaker()
session: Session = testing_session()
try:
monkeypatch.setattr(fl_analysis_service, "SessionLocal", testing_session)
monkeypatch.setattr(fl_analysis_service, "_publish_change", lambda *args, **kwargs: None)
monkeypatch.setattr(fl_analysis_external, "_resolve_engine_workdir", lambda: tmp_path)
monkeypatch.setattr(fl_analysis_external, "_resolve_native_engine_executable", lambda: ("/bin/sh", None))
line = Line(code="L-001", name="示例线路", voltage_kv=220, lightning_param_json={})
session.add(line)
session.flush()
tower = LineTower(
line_id=line.id,
seq_no=1,
tower_no="N1",
tower_model="220-TEST-ZX",
tower_type="直线",
altitude_m=1680.0,
ground_resistance_ohm=12.0,
lightning_density=3.2,
span_large_m=260.0,
slope_1=3.0,
slope_2=1.5,
circuit_geometry_json={
"I": {
"phase_spacing_m": {"upper": 9.0, "middle": 4.5, "lower": 8.5},
"phase_height_m": {"upper": 29.0, "middle": 31.0, "lower": 25.0},
},
"lightning_wire": {
"left_mid_distance_m": 9.0,
"right_mid_distance_m": 9.0,
"height_m": 41.0,
},
"insulator_length_mm": 4200.0,
},
lightning_result_json={},
)
session.add(tower)
session.flush()
session.add(
TowerProfile(
tower_id=tower.id,
structure_kind="直线",
arrester_a="",
arrester_b="",
arrester_c="",
shield_wire_height_m=41.0,
insulator_length_m=4200.0,
current_type="Heidler",
current_head_time_us=2.6,
current_tail_time_us=50.0,
)
)
model = AtpModel(code="fl-normal-model", name="普通计算ATP模型", source_type="atp", status="enabled")
session.add(model)
session.flush()
session.add(
AtpModelVersion(
model_id=model.id,
version_no=1,
status="released",
entry_file="runner.sh",
artifact_manifest_json={
"fl_analysis": {
"result_file": "result.json",
"parameter_bindings": {
"GROUND_RES": {
"path": "base_tower.ground_resistance_ohm",
"format": "F1",
}
},
}
},
atp_text="""#!/bin/sh
cat > result.json <<'JSON'
{"counterstrike_withstand_ka": {{GROUND_RES}}, "risk_grade": 3, "score": 91, "summary_text": "ATP执行完成"}
JSON
""",
content_hash="normal-v1",
)
)
session.commit()
created = fl_analysis_service.create_job(
session,
FlAnalysisJobCreateRequest(
line_id=line.id,
job_name="普通计算-ATP",
job_type="normal",
external_adapter="atp",
adapter_config_json={"model_id": model.id},
execution_options_json={
"current_waveform": "double_slope",
"flashover_method": "intersection",
"altitude_correction": "formula1",
"induced_voltage_formula": "formula2",
"head_time_min_us": 2.4,
"head_time_max_us": 2.6,
"head_time_step_us": 0.2,
"tail_time_min_us": 45.0,
"tail_time_max_us": 50.0,
"tail_time_step_us": 5.0,
},
),
actor=SimpleNamespace(id="tester"),
)
session.close()
fl_analysis_service.execute_job(created.job.id)
verify_session: Session = testing_session()
try:
saved_job = fl_analysis_service.get_job_by_id(verify_session, created.job.id)
assert saved_job is not None
assert saved_job.status == "success"
assert saved_job.result_summary_json["adapter_status"] == "executed"
assert saved_job.result_summary_json["external_model_code"] == "fl-normal-model"
assert saved_job.result_summary_json["external_version_no"] == 1
result_row = verify_session.execute(
select(FlAnalysisTowerResult).where(FlAnalysisTowerResult.job_id == created.job.id)
).scalar_one()
assert result_row.risk_level == "high"
assert result_row.summary_text == "ATP执行完成"
assert result_row.result_json["counterstrike_withstand_ka"] == 12.0
assert result_row.result_json["external_execution"]["adapter"] == "atp"
assert result_row.result_json["external_execution"]["model_code"] == "fl-normal-model"
saved_run = verify_session.execute(
select(FlAnalysisRun).where(FlAnalysisRun.job_id == created.job.id)
).scalar_one()
assert saved_run.status == "success"
assert saved_run.runner_kind == "atp"
assert saved_run.engine_command is not None
assert saved_run.working_dir is not None
finally:
verify_session.close()
finally:
session.close()
+215 -5
View File
@@ -26,6 +26,11 @@ import { useAuth } from "@/components/auth-provider";
import { Card } from "@/components/ui-antd";
import { readApiError } from "@/lib/api";
import type {
AtpEngineStatusResponse,
AtpModelListResponse,
AtpModelSummary,
AtpModelVersionListResponse,
AtpModelVersionSummary,
FlAnalysisJobDetail,
FlAnalysisJobListResponse,
FlAnalysisJobSummary,
@@ -39,6 +44,9 @@ type CreateJobFormValues = {
job_name: string;
line_id: string;
job_type: "normal" | "tongtiao" | "risk";
external_adapter: "placeholder" | "wine" | "atp";
atp_model_id: string;
atp_version_id: string;
current_waveform: "heidler" | "double_slope" | "double_exponential";
flashover_method: "guideline" | "intersection" | "leader_development";
altitude_correction: "none" | "formula1" | "formula2";
@@ -138,6 +146,9 @@ const CREATE_JOB_DEFAULTS: CreateJobFormValues = {
job_name: "",
line_id: "",
job_type: "normal",
external_adapter: "placeholder",
atp_model_id: "",
atp_version_id: "",
current_waveform: "heidler",
flashover_method: "intersection",
altitude_correction: "none",
@@ -194,6 +205,14 @@ function formatInducedVoltageFormula(value: string | null | undefined): string {
return value || "-";
}
function formatExternalAdapter(value: string | null | undefined): string {
if (value === "wine") return "Wine / ATP";
if (value === "atp") return "原生 ATP";
if (value === "placeholder") return "规则近似";
if (value === "custom") return "自定义";
return value || "-";
}
function riskColor(value: string | null | undefined): string {
if (value === "high") return "red";
if (value === "medium") return "orange";
@@ -367,9 +386,12 @@ export default function AdminFlAnalysisPage() {
const [messageApi, contextHolder] = message.useMessage();
const selectedLineId = Form.useWatch("line_id", createJobForm);
const selectedCreateJobType = Form.useWatch("job_type", createJobForm) ?? CREATE_JOB_DEFAULTS.job_type;
const selectedExternalAdapter = Form.useWatch("external_adapter", createJobForm) ?? CREATE_JOB_DEFAULTS.external_adapter;
const selectedAtpModelId = Form.useWatch("atp_model_id", createJobForm) ?? "";
const canRead = hasPermission("line.read") || hasPermission("line.manage");
const canManage = hasPermission("line.manage") || hasPermission("tower.manage");
const canReadAtp = hasPermission("atp.read") || hasPermission("atp.run") || hasPermission("atp.manage");
const linesQuery = useQuery({
queryKey: ["/api/v1/lines"],
@@ -395,6 +417,43 @@ export default function AdminFlAnalysisPage() {
},
});
const engineQuery = useQuery({
queryKey: ["/api/v1/atp/models/engine/status"],
enabled: !!user && canReadAtp,
queryFn: async () => {
const response = await fetchWithAuth("/api/v1/atp/models/engine/status");
if (!response.ok) {
throw new Error(await readApiError(response));
}
return (await response.json()) as AtpEngineStatusResponse;
},
staleTime: 30_000,
});
const atpModelsQuery = useQuery({
queryKey: ["/api/v1/atp/models", "enabled"],
enabled: !!user && canReadAtp,
queryFn: async () => {
const response = await fetchWithAuth("/api/v1/atp/models?status=enabled");
if (!response.ok) {
throw new Error(await readApiError(response));
}
return (await response.json()) as AtpModelListResponse;
},
});
const atpVersionsQuery = useQuery({
queryKey: ["/api/v1/atp/models/versions", selectedAtpModelId],
enabled: !!user && canReadAtp && !!selectedAtpModelId,
queryFn: async () => {
const response = await fetchWithAuth(`/api/v1/atp/models/${selectedAtpModelId}/versions?limit=200&offset=0`);
if (!response.ok) {
throw new Error(await readApiError(response));
}
return (await response.json()) as AtpModelVersionListResponse;
},
});
const selectedJob = useMemo(() => {
if (!selectedJobId) {
return jobsQuery.data?.items[0] ?? null;
@@ -439,6 +498,12 @@ export default function AdminFlAnalysisPage() {
return rows.filter((item) => item.risk_level !== "low");
}, [selectedJob?.job_type, towerResultsQuery.data?.items]);
const atpModels = useMemo(() => atpModelsQuery.data?.items ?? [], [atpModelsQuery.data]);
const selectedAtpModel = useMemo(
() => atpModels.find((item) => item.id === selectedAtpModelId) ?? null,
[atpModels, selectedAtpModelId],
);
useEffect(() => {
const firstLine = linesQuery.data?.items[0];
if (firstLine && !createJobForm.getFieldValue("line_id")) {
@@ -446,6 +511,37 @@ export default function AdminFlAnalysisPage() {
}
}, [createJobForm, linesQuery.data?.items]);
useEffect(() => {
if (!["normal", "tongtiao"].includes(selectedCreateJobType)) {
return;
}
if (!["atp", "wine"].includes(selectedExternalAdapter)) {
return;
}
if (!atpModels.length || createJobForm.getFieldValue("atp_model_id")) {
return;
}
createJobForm.setFieldValue("atp_model_id", atpModels[0].id);
}, [atpModels, createJobForm, selectedCreateJobType, selectedExternalAdapter]);
useEffect(() => {
if (!["atp", "wine"].includes(selectedExternalAdapter)) {
return;
}
const versions = atpVersionsQuery.data?.items ?? [];
if (!versions.length) {
return;
}
const currentVersionId = createJobForm.getFieldValue("atp_version_id");
if (currentVersionId && versions.some((item) => item.id === currentVersionId)) {
return;
}
const preferredVersion =
versions.find((item) => item.version_no === selectedAtpModel?.active_version_no)
?? versions[0];
createJobForm.setFieldValue("atp_version_id", preferredVersion.id);
}, [atpVersionsQuery.data?.items, createJobForm, selectedAtpModel?.active_version_no, selectedExternalAdapter]);
async function invalidateFlAnalysisQueries(): Promise<void> {
await queryClient.invalidateQueries({
predicate: (query) =>
@@ -506,7 +602,9 @@ export default function AdminFlAnalysisPage() {
line_id: values.line_id,
job_name: values.job_name.trim() || null,
job_type: values.job_type,
external_adapter: "placeholder",
external_adapter: values.job_type === "normal" || values.job_type === "tongtiao"
? values.external_adapter
: "placeholder",
};
if (values.job_type === "normal" || values.job_type === "tongtiao") {
payload.execution_options_json = {
@@ -521,6 +619,12 @@ export default function AdminFlAnalysisPage() {
tail_time_max_us: values.tail_time_max_us,
tail_time_step_us: values.tail_time_step_us,
};
if (values.external_adapter !== "placeholder") {
payload.adapter_config_json = {
model_id: values.atp_model_id,
version_id: values.atp_version_id || undefined,
};
}
}
return payload;
}
@@ -960,6 +1064,30 @@ export default function AdminFlAnalysisPage() {
const selectedLine = useMemo(() => {
return linesQuery.data?.items.find((item) => item.id === selectedLineId) ?? null;
}, [linesQuery.data?.items, selectedLineId]);
const externalAdapterActive = selectedExternalAdapter === "atp" || selectedExternalAdapter === "wine";
const engineMode = engineQuery.data?.mode;
const adapterOptions = [
{
value: "placeholder",
label: "规则近似(占位)",
disabled: false,
},
{
value: "atp",
label: "原生 ATP",
disabled: !canReadAtp || engineMode === "wine" || engineQuery.data?.available === false,
},
{
value: "wine",
label: "Wine / ATP",
disabled: !canReadAtp || engineMode === "native" || engineQuery.data?.available === false,
},
] as const;
const workflowExecutionMessage = externalAdapterActive
? engineQuery.data?.available
? `当前将通过 ${formatExternalAdapter(selectedExternalAdapter)} 链路执行 ATP 模型,并把外部结果回填到任务明细。`
: `当前已选择 ${formatExternalAdapter(selectedExternalAdapter)},但 ATP 引擎不可用:${engineQuery.data?.error || "请先检查执行器配置" }`
: `当前按 ${formatJobType(selectedCreateJobType)} 口径生成规则近似版结果;切换到 ATP/Wine 适配器后会走真实外部执行链路。`;
if (!initializing && !user) {
return <Alert type="warning" showIcon message="请先登录后查看防雷分析结果。" />;
@@ -981,6 +1109,10 @@ export default function AdminFlAnalysisPage() {
const selectedJobExecutionOptions = readObject(selectedJobDetail?.execution_options_json);
const selectedJobSummary = readObject(selectedJobDetail?.result_summary_json);
const selectedJobWorkflow = readObject(selectedJobDetail?.result_summary_json).workflow as WorkflowSummary | undefined;
const selectedJobExternalModelCode = readOptionalString(selectedJobSummary, "external_model_code");
const selectedJobExternalModelName = readOptionalString(selectedJobSummary, "external_model_name");
const selectedJobExternalVersionNo = readOptionalNumber(selectedJobSummary, "external_version_no");
const detailExternalExecution = readObject(detailResultObject.external_execution);
const sourceJobId = readOptionalString(selectedJobExecutionOptions, "source_job_id");
const canCreateMitigation = selectedJob?.job_type === "risk";
const canCreateReport = selectedJob?.job_type === "risk" || selectedJob?.job_type === "mitigation";
@@ -1000,7 +1132,7 @@ export default function AdminFlAnalysisPage() {
</Typography.Title>
<Typography.Text type="secondary">
/ / / / ATP/Wine Word
/ / / / ATP/Wine 退 Word
</Typography.Text>
</div>
@@ -1017,7 +1149,7 @@ export default function AdminFlAnalysisPage() {
createJobMutation.mutate(values);
}}
>
<div className="grid gap-3 md:grid-cols-4">
<div className="grid gap-3 md:grid-cols-2 xl:grid-cols-5">
<Form.Item
name="line_id"
label="线路"
@@ -1043,6 +1175,15 @@ export default function AdminFlAnalysisPage() {
]}
/>
</Form.Item>
{selectedCreateJobType === "normal" || selectedCreateJobType === "tongtiao" ? (
<Form.Item
name="external_adapter"
label="执行适配器"
rules={[{ required: true, message: "请选择执行适配器" }]}
>
<Select options={adapterOptions.map((item) => ({ ...item }))} />
</Form.Item>
) : null}
<Form.Item name="job_name" label="任务名">
<Input
placeholder={selectedLine
@@ -1060,10 +1201,60 @@ export default function AdminFlAnalysisPage() {
{selectedCreateJobType === "normal" || selectedCreateJobType === "tongtiao" ? (
<>
<Alert
type="info"
type={externalAdapterActive && engineQuery.data?.available === false ? "warning" : "info"}
showIcon
message={`当前按${formatJobType(selectedCreateJobType)}口径生成波头/波尾扫描结果,并在最不利点输出结果。`}
message={workflowExecutionMessage}
/>
{atpModelsQuery.error && externalAdapterActive ? (
<Alert
type="error"
showIcon
message={atpModelsQuery.error instanceof Error ? atpModelsQuery.error.message : "ATP 模型列表加载失败"}
/>
) : null}
{externalAdapterActive ? (
<div className="grid gap-3 md:grid-cols-2 xl:grid-cols-4">
<Form.Item
name="atp_model_id"
label="ATP模型"
rules={[{ required: true, message: "请选择 ATP 模型" }]}
>
<Select
showSearch
optionFilterProp="label"
loading={atpModelsQuery.isLoading}
placeholder="选择 ATP 模型"
options={atpModels.map((item: AtpModelSummary) => ({
value: item.id,
label: `${item.name} / ${item.code}`,
}))}
/>
</Form.Item>
<Form.Item
name="atp_version_id"
label="模型版本"
rules={[{ required: true, message: "请选择模型版本" }]}
>
<Select
showSearch
optionFilterProp="label"
loading={atpVersionsQuery.isLoading}
placeholder="选择模型版本"
options={(atpVersionsQuery.data?.items ?? []).map((item: AtpModelVersionSummary) => ({
value: item.id,
label: `v${item.version_no}${item.version_tag ? ` / ${item.version_tag}` : ""}`,
}))}
/>
</Form.Item>
<Alert
type="info"
showIcon
message={`执行模式:${engineQuery.data ? formatExternalAdapter(engineQuery.data.mode === "wine" ? "wine" : "atp") : "-"}`}
description={selectedAtpModel ? `当前模型:${selectedAtpModel.name} / ${selectedAtpModel.code}` : "从 ATP 模型管理中选择已发布模板版本。"}
className="md:col-span-2"
/>
</div>
) : null}
<div className="grid gap-3 md:grid-cols-4">
<Form.Item name="current_waveform" label="雷电流波形">
<Select
@@ -1196,6 +1387,14 @@ export default function AdminFlAnalysisPage() {
</>
) : selectedJobDetail.job_type === "normal" || selectedJobDetail.job_type === "tongtiao" ? (
<>
<Descriptions.Item label="适配器">{formatExternalAdapter(selectedJobDetail.external_adapter)}</Descriptions.Item>
<Descriptions.Item label="ATP模型">
{selectedJobExternalModelName || selectedJobExternalModelCode || "-"}
{selectedJobExternalModelName && selectedJobExternalModelCode ? ` / ${selectedJobExternalModelCode}` : ""}
</Descriptions.Item>
<Descriptions.Item label="模型版本">
{typeof selectedJobExternalVersionNo === "number" ? `v${selectedJobExternalVersionNo}` : "-"}
</Descriptions.Item>
<Descriptions.Item label="平均得分">{String(selectedJobDetail.result_summary_json?.score_average ?? "-")}</Descriptions.Item>
<Descriptions.Item label="平均扫描点">{String(readObject(selectedJobDetail.result_summary_json).scan_point_average ?? "-")}</Descriptions.Item>
<Descriptions.Item label="雷电流波形">{formatCurrentWaveform(selectedJobWorkflow?.current_waveform)}</Descriptions.Item>
@@ -1330,6 +1529,17 @@ export default function AdminFlAnalysisPage() {
<Descriptions.Item label="感应电压公式">{formatInducedVoltageFormula(detailWorkflow.induced_voltage_formula)}</Descriptions.Item>
<Descriptions.Item label="波头范围">{formatRangeSummary(detailWorkflow.head_time_range_us)}</Descriptions.Item>
<Descriptions.Item label="波尾范围">{formatRangeSummary(detailWorkflow.tail_time_range_us)}</Descriptions.Item>
<Descriptions.Item label="执行适配器">
{formatExternalAdapter(readOptionalString(detailExternalExecution, "adapter"))}
</Descriptions.Item>
<Descriptions.Item label="ATP模型">
{readOptionalString(detailExternalExecution, "model_name") || readOptionalString(detailExternalExecution, "model_code") || "-"}
</Descriptions.Item>
<Descriptions.Item label="模型版本">
{typeof readOptionalNumber(detailExternalExecution, "version_no") === "number"
? `v${readOptionalNumber(detailExternalExecution, "version_no")}`
: "-"}
</Descriptions.Item>
{selectedJob?.job_type === "tongtiao" ? (
<>
<Descriptions.Item label="主导相组">{readOptionalString(detailResultObject, "dominant_phase_set") ?? "-"}</Descriptions.Item>