[migrate]:[FL-31][add fl-analysis result export]

Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
chengkai3
2026-06-07 23:28:54 +08:00
parent 0d88ac2eac
commit 0746e3ce28
4 changed files with 504 additions and 0 deletions
+13
View File
@@ -17,6 +17,7 @@ from ...schemas.fl_analysis import (
from ...schemas.tower_profile import TowerProfileDetail, TowerProfileUpsertRequest
from ...services.fl_analysis_service import (
create_job,
download_result_csv,
download_report_document,
get_job_by_id,
list_jobs,
@@ -90,6 +91,18 @@ def get_fl_analysis_job_results(
return list_tower_results(db, job_id=job_id, run_id=run_id or item.latest_run_id)
@router.get("/jobs/{job_id}/results/download")
def download_fl_analysis_results(
job_id: str,
run_id: str | None = Query(default=None),
_: CurrentUser = Depends(require_any_permission("line.read", "line.manage")),
db: Session = Depends(get_db),
) -> StreamingResponse:
filename, content = download_result_csv(db, job_id=job_id, run_id=run_id)
headers = {"Content-Disposition": f'attachment; filename="{filename}"'}
return StreamingResponse(iter([content]), media_type="text/csv", headers=headers)
@router.get("/jobs/{job_id}/report/download")
def download_fl_analysis_report(
job_id: str,
+253
View File
@@ -1,7 +1,11 @@
from __future__ import annotations
import asyncio
import csv
import io
import json
import time
from collections.abc import Callable
from datetime import datetime
from typing import Any
@@ -297,6 +301,35 @@ def download_report_document(db: Session, *, job_id: str) -> tuple[str, bytes]:
return build_report_document(report_data)
def download_result_csv(db: Session, *, job_id: str, run_id: str | None = None) -> tuple[str, bytes]:
job = get_job_by_id(db, job_id)
if not job:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="防雷分析任务不存在")
if job.job_type == "report":
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="报告任务请使用报告下载入口")
target_run_id = run_id or job.latest_run_id
if not target_run_id:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="当前任务暂无可导出的结果")
rows = _load_result_rows(db, job_id=job.id, run_id=target_run_id)
return export_fl_analysis_results_to_csv(job, rows)
def export_fl_analysis_results_to_csv(job: FlAnalysisJob, rows: list[FlAnalysisTowerResult]) -> tuple[str, bytes]:
columns = _result_export_columns(job.job_type)
output = io.StringIO(newline="")
writer = csv.writer(output)
writer.writerow([header for header, _getter in columns])
for item in rows:
context = _build_result_export_context(item)
writer.writerow([_format_export_cell(getter(context)) for _header, getter in columns])
filename = f"fl_analysis_{job.job_type}_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
return filename, output.getvalue().encode("utf-8-sig")
def execute_job(job_id: str) -> None:
db = SessionLocal()
started_perf = time.perf_counter()
@@ -818,6 +851,226 @@ def _load_result_rows(db: Session, *, job_id: str, run_id: str) -> list[FlAnalys
)
def _build_result_export_context(item: FlAnalysisTowerResult) -> dict[str, Any]:
snapshot = item.snapshot
base = _safe_dict(snapshot.base_tower_json if snapshot is not None else None)
profile = _safe_dict(snapshot.profile_json if snapshot is not None else None)
result = _safe_dict(item.result_json)
inputs = _safe_dict(result.get("inputs"))
workflow = _safe_dict(result.get("workflow"))
selected_case = _safe_dict(result.get("selected_case"))
actions = [entry for entry in result.get("mitigation_actions") or [] if isinstance(entry, dict)]
reason_details = [entry for entry in result.get("reason_details") or [] if isinstance(entry, dict)]
return {
"item": item,
"snapshot": snapshot,
"base": base,
"profile": profile,
"result": result,
"inputs": inputs,
"workflow": workflow,
"selected_case": selected_case,
"actions": actions,
"actions_by_code": {
str(action.get("code") or "").strip(): action
for action in actions
if str(action.get("code") or "").strip()
},
"reason_details": reason_details,
}
def _result_export_columns(job_type: str) -> list[tuple[str, Callable[[dict[str, Any]], Any]]]:
columns = _common_result_export_columns()
if job_type == "mitigation":
return columns + _mitigation_result_export_columns()
if job_type == "normal":
return columns + _normal_result_export_columns() + _risk_result_export_columns()
if job_type == "tongtiao":
return columns + _normal_result_export_columns() + _tongtiao_result_export_columns() + _risk_result_export_columns()
return columns + _risk_result_export_columns()
def _common_result_export_columns() -> list[tuple[str, Callable[[dict[str, Any]], Any]]]:
return [
("序号", lambda ctx: ctx["snapshot"].seq_no if ctx["snapshot"] is not None else None),
("杆塔号", lambda ctx: ctx["snapshot"].tower_no if ctx["snapshot"] is not None else None),
("杆塔模型", lambda ctx: ctx["snapshot"].tower_model if ctx["snapshot"] is not None else None),
(
"塔型",
lambda ctx: _pick_export_value(
ctx["snapshot"].tower_type if ctx["snapshot"] is not None else None,
ctx["base"].get("tower_type"),
ctx["profile"].get("structure_kind"),
),
),
("反击方式", lambda ctx: _pick_export_value(ctx["profile"].get("stroke_mode"), ctx["inputs"].get("stroke_mode"))),
("雷电流幅值a", lambda ctx: _pick_export_number(ctx["inputs"].get("current_a"), ctx["profile"].get("current_a"))),
("雷电流幅值b", lambda ctx: _pick_export_number(ctx["inputs"].get("current_b"), ctx["profile"].get("current_b"))),
("地闪密度", lambda ctx: _pick_export_number(ctx["inputs"].get("lightning_density"), ctx["base"].get("lightning_density"))),
("波头时间/μs", lambda ctx: _pick_export_number(ctx["profile"].get("current_head_time_us"))),
("波尾时间/μs", lambda ctx: _pick_export_number(ctx["profile"].get("current_tail_time_us"))),
("地面倾角1", lambda ctx: _pick_export_number(ctx["base"].get("slope_1"))),
("地面倾角2", lambda ctx: _pick_export_number(ctx["base"].get("slope_2"))),
("海拔(m)", lambda ctx: _pick_export_number(ctx["base"].get("altitude_m"))),
("地形", lambda ctx: ctx["base"].get("terrain")),
("接地电阻(Ω)", lambda ctx: _pick_export_number(ctx["inputs"].get("ground_resistance_ohm"), ctx["base"].get("ground_resistance_ohm"))),
("绝缘子串长(mm)", lambda ctx: _pick_export_number(ctx["inputs"].get("insulator_length_mm"))),
("保护角(°)", lambda ctx: _pick_export_number(ctx["inputs"].get("protection_angle_deg"))),
]
def _normal_result_export_columns() -> list[tuple[str, Callable[[dict[str, Any]], Any]]]:
return [
("最不利波头时间(μs)", lambda ctx: _pick_export_number(ctx["selected_case"].get("head_time_us"))),
("最不利波尾时间(μs)", lambda ctx: _pick_export_number(ctx["selected_case"].get("tail_time_us"))),
("雷电流波形", lambda ctx: ctx["workflow"].get("current_waveform")),
("闪络判据", lambda ctx: ctx["workflow"].get("flashover_method")),
("海拔修正", lambda ctx: ctx["workflow"].get("altitude_correction")),
("感应电压公式", lambda ctx: ctx["workflow"].get("induced_voltage_formula")),
("扫描点数", lambda ctx: _pick_export_number(ctx["workflow"].get("scan_point_count"))),
]
def _tongtiao_result_export_columns() -> list[tuple[str, Callable[[dict[str, Any]], Any]]]:
return [
("主导相组", lambda ctx: ctx["result"].get("dominant_phase_set")),
("闪络相", lambda ctx: ctx["result"].get("flashover_phase")),
("相别结果(JSON)", lambda ctx: ctx["result"].get("phase_results") or []),
("多相结果(JSON)", lambda ctx: ctx["result"].get("multi_phase_results") or []),
]
def _risk_result_export_columns() -> list[tuple[str, Callable[[dict[str, Any]], Any]]]:
return [
("反击耐雷水平(kA)", lambda ctx: _pick_export_number(ctx["result"].get("counterstrike_withstand_ka"))),
("反击跳闸率", lambda ctx: _pick_export_number(ctx["result"].get("counterstrike_trip_rate"))),
("绕击耐雷水平(kA)", lambda ctx: _pick_export_number(ctx["result"].get("shielding_withstand_ka"))),
("绕击跳闸率", lambda ctx: _pick_export_number(ctx["result"].get("shielding_trip_rate"))),
("风险等级", lambda ctx: _format_export_risk_level(_pick_export_value(ctx["item"].risk_level, ctx["result"].get("risk_level")))),
("风险档次", lambda ctx: _pick_export_number(ctx["result"].get("risk_grade"))),
("得分", lambda ctx: _pick_export_number(ctx["result"].get("score"))),
("高风险原因", lambda ctx: ctx["result"].get("cause_analysis")),
("措施建议", lambda ctx: ctx["result"].get("mitigation_recommendation")),
("原因明细(JSON)", lambda ctx: ctx["reason_details"]),
("综合结论", lambda ctx: _pick_export_value(ctx["item"].summary_text, ctx["result"].get("summary_text"))),
]
def _mitigation_result_export_columns() -> list[tuple[str, Callable[[dict[str, Any]], Any]]]:
return [
("当前风险等级", lambda ctx: _format_export_risk_level(ctx["result"].get("current_risk_level"))),
("当前得分", lambda ctx: _pick_export_number(ctx["result"].get("current_score"))),
("预期风险等级", lambda ctx: _format_export_risk_level(_pick_export_value(ctx["result"].get("expected_risk_level"), ctx["item"].risk_level))),
("预期得分", lambda ctx: _pick_export_number(ctx["result"].get("expected_score"), ctx["result"].get("score"))),
("改造结论", lambda ctx: ctx["result"].get("recommendation_result")),
("推荐措施", lambda ctx: ctx["result"].get("mitigation_recommendation")),
("推荐动作", lambda ctx: _join_action_summaries(ctx["actions"])),
("保护角推荐值(°)", lambda ctx: _pick_export_number(_read_action_value(_read_export_action(ctx, "shielding_geometry"), "target_value"))),
("保护角原始值(°)", lambda ctx: _pick_export_number(ctx["inputs"].get("protection_angle_deg"))),
("绝缘子串长推荐值(mm)", lambda ctx: _pick_export_number(_read_action_value(_read_export_action(ctx, "insulator_upgrade"), "target_value"))),
("绝缘子串长原始值(mm)", lambda ctx: _pick_export_number(_read_action_value(_read_export_action(ctx, "insulator_upgrade"), "current_value"), ctx["inputs"].get("insulator_length_mm"))),
("接地电阻推荐值(Ω)", lambda ctx: _pick_export_number(_read_action_value(_read_export_action(ctx, "grounding_upgrade"), "target_value"))),
("接地电阻原始值(Ω)", lambda ctx: _pick_export_number(_read_action_value(_read_export_action(ctx, "grounding_upgrade"), "current_value"), ctx["inputs"].get("ground_resistance_ohm"), ctx["base"].get("ground_resistance_ohm"))),
("安装避雷器", lambda ctx: ctx["result"].get("recommendation_result")),
("避雷器推荐相别", lambda ctx: _read_action_phase_text(_read_export_action(ctx, "arrester_install"))),
("高风险原因", lambda ctx: ctx["result"].get("cause_analysis")),
("原因明细(JSON)", lambda ctx: ctx["reason_details"]),
("推荐动作明细(JSON)", lambda ctx: ctx["actions"]),
("综合结论", lambda ctx: _pick_export_value(ctx["item"].summary_text, ctx["result"].get("summary_text"))),
]
def _read_export_action(ctx: dict[str, Any], code: str) -> dict[str, Any] | None:
return ctx["actions_by_code"].get(code)
def _read_action_value(action: dict[str, Any] | None, key: str) -> Any:
if action is None:
return None
return action.get(key)
def _read_action_phase_text(action: dict[str, Any] | None) -> str | None:
if action is None:
return None
phases = action.get("phases")
if not isinstance(phases, list):
return None
values = [str(phase).strip() for phase in phases if str(phase).strip()]
return ",".join(values) if values else None
def _join_action_summaries(actions: list[dict[str, Any]]) -> str | None:
values = [str(action.get("summary") or "").strip() for action in actions if str(action.get("summary") or "").strip()]
if not values:
return None
return "".join(values)
def _format_export_cell(value: Any) -> str:
if value is None:
return ""
if isinstance(value, bool):
return "true" if value else "false"
if isinstance(value, int):
return str(value)
if isinstance(value, float):
if value.is_integer():
return str(int(value))
return f"{value:.6f}".rstrip("0").rstrip(".")
if isinstance(value, (dict, list)):
return _json_dumps_compact(value)
return str(value)
def _safe_dict(value: Any) -> dict[str, Any]:
if isinstance(value, dict):
return value
return {}
def _json_dumps_compact(payload: Any) -> str:
return json.dumps(payload, ensure_ascii=False, separators=(",", ":"))
def _pick_export_value(*values: Any) -> Any:
for value in values:
if value is None:
continue
if isinstance(value, str) and not value.strip():
continue
return value
return None
def _pick_export_number(*values: Any) -> float | int | None:
for value in values:
if isinstance(value, bool):
continue
if isinstance(value, int):
return value
if isinstance(value, float):
return value
parsed = _as_float(value)
if parsed is not None:
return parsed
return None
def _format_export_risk_level(value: Any) -> str | None:
normalized = str(value or "").strip().lower()
if not normalized:
return None
if normalized == "high":
return "高风险"
if normalized == "medium":
return "中风险"
if normalized == "low":
return "低风险"
return str(value)
def _read_report_generated_at(job: FlAnalysisJob) -> datetime | None:
value = (job.result_summary_json or {}).get("generated_at")
if not isinstance(value, str) or not value.strip():
+187
View File
@@ -0,0 +1,187 @@
from __future__ import annotations
import csv
import io
from pathlib import Path
from types import SimpleNamespace
from app.services.fl_analysis_service import export_fl_analysis_results_to_csv
API_FILE = Path(__file__).resolve().parents[1] / "app" / "api" / "v1" / "fl_analysis.py"
def _decode_csv(content: bytes) -> list[list[str]]:
return list(csv.reader(io.StringIO(content.decode("utf-8-sig"))))
def _build_job(job_type: str) -> SimpleNamespace:
return SimpleNamespace(
job_type=job_type,
job_name=f"{job_type}-job",
line=SimpleNamespace(code="XL-001"),
)
def _build_row(*, result_json: dict[str, object], risk_level: str | None = None) -> SimpleNamespace:
return SimpleNamespace(
id="result-1",
risk_level=risk_level,
summary_text=result_json.get("summary_text"),
result_json=result_json,
snapshot=SimpleNamespace(
seq_no=1,
tower_no="001",
tower_model="220-TEST-ZX",
tower_type="直线",
base_tower_json={
"tower_no": "001",
"tower_type": "直线",
"slope_1": 3.2,
"slope_2": 1.8,
"altitude_m": 1680.0,
"terrain": "山地",
"ground_resistance_ohm": 22.0,
"lightning_density": 4.8,
},
profile_json={
"stroke_mode": "反击",
"current_a": 72.5,
"current_b": 1.35,
"current_head_time_us": 2.6,
"current_tail_time_us": 50.0,
},
),
)
def test_export_normal_results_csv_contains_waveform_columns() -> None:
result_json = {
"risk_level": "high",
"risk_grade": 3,
"score": 88,
"summary_text": "001普通计算结果,高风险。",
"cause_analysis": "接地电阻偏高",
"mitigation_recommendation": "优先降低接地电阻",
"counterstrike_withstand_ka": 71.2,
"counterstrike_trip_rate": 0.0123,
"shielding_withstand_ka": 64.8,
"shielding_trip_rate": 0.0081,
"reason_details": [{"code": "ground_resistance", "label": "接地电阻", "grade": 1, "triggered": True}],
"inputs": {
"current_a": 72.5,
"current_b": 1.35,
"ground_resistance_ohm": 22.0,
"lightning_density": 4.8,
"insulator_length_mm": 4200.0,
"protection_angle_deg": 24.5,
},
"workflow": {
"current_waveform": "double_slope",
"flashover_method": "intersection",
"altitude_correction": "formula1",
"induced_voltage_formula": "formula2",
"scan_point_count": 4,
},
"selected_case": {
"head_time_us": 2.4,
"tail_time_us": 45.0,
},
}
_filename, content = export_fl_analysis_results_to_csv(_build_job("normal"), [_build_row(result_json=result_json)])
rows = _decode_csv(content)
assert len(rows) == 2
header = rows[0]
values = rows[1]
assert "最不利波头时间(μs)" in header
assert "雷电流波形" in header
assert "当前风险等级" not in header
assert values[header.index("风险等级")] == "高风险"
assert values[header.index("雷电流波形")] == "double_slope"
assert values[header.index("最不利波头时间(μs)")] == "2.4"
def test_export_mitigation_results_csv_contains_recommendation_columns() -> None:
result_json = {
"risk_level": "medium",
"current_risk_level": "high",
"current_score": 92,
"expected_risk_level": "medium",
"expected_score": 63,
"summary_text": "001当前高风险,建议后预期降为中风险。",
"cause_analysis": "接地电阻偏高;保护角暴露偏大",
"mitigation_recommendation": "降低接地电阻;优化保护角;补装避雷器",
"recommendation_result": "需要安装避雷器",
"reason_details": [{"code": "ground_resistance", "label": "接地电阻", "grade": 1, "triggered": True}],
"inputs": {
"current_a": 72.5,
"ground_resistance_ohm": 35.0,
"insulator_length_mm": 4200.0,
"protection_angle_deg": 27.0,
},
"mitigation_actions": [
{
"code": "grounding_upgrade",
"label": "降低接地电阻",
"summary": "将接地电阻优化至 5.0 Ω 以内",
"current_value": 35.0,
"target_value": 5.0,
"unit": "ohm",
},
{
"code": "insulator_upgrade",
"label": "提高绝缘子串长度",
"summary": "将绝缘子串长度提高至约 5200.0 mm",
"current_value": 4200.0,
"target_value": 5200.0,
"unit": "mm",
},
{
"code": "shielding_geometry",
"label": "优化保护角",
"summary": "按非建线口径将保护角收紧至约 18.5°",
"target_value": 18.5,
"unit": "deg",
},
{
"code": "arrester_install",
"label": "补装避雷器",
"summary": "建议在 A,C 相补装或复核避雷器",
"phases": ["A", "C"],
},
],
}
_filename, content = export_fl_analysis_results_to_csv(
_build_job("mitigation"),
[_build_row(result_json=result_json, risk_level="medium")],
)
rows = _decode_csv(content)
assert len(rows) == 2
header = rows[0]
values = rows[1]
assert "绝缘子串长推荐值(mm)" in header
assert "避雷器推荐相别" in header
assert "最不利波头时间(μs)" not in header
assert values[header.index("当前风险等级")] == "高风险"
assert values[header.index("避雷器推荐相别")] == "A,C"
assert values[header.index("绝缘子串长推荐值(mm)")] == "5200"
def test_export_risk_results_csv_keeps_header_when_rows_are_empty() -> None:
_filename, content = export_fl_analysis_results_to_csv(_build_job("risk"), [])
rows = _decode_csv(content)
assert len(rows) == 1
assert "风险等级" in rows[0]
assert "综合结论" in rows[0]
def test_fl_analysis_api_exposes_results_download_route() -> None:
source = API_FILE.read_text(encoding="utf-8")
assert '@router.get("/jobs/{job_id}/results/download")' in source
assert "download_result_csv" in source
+51
View File
@@ -761,6 +761,39 @@ export default function AdminFlAnalysisPage() {
},
});
const downloadResultsMutation = useMutation({
mutationFn: async ({ jobId, runId, jobType }: { jobId: string; runId?: string | null; jobType: string }) => {
const params = new URLSearchParams();
if (runId) {
params.set("run_id", runId);
}
const suffix = params.size > 0 ? `?${params.toString()}` : "";
const response = await fetchWithAuth(`/api/v1/fl-analysis/jobs/${jobId}/results/download${suffix}`);
if (!response.ok) {
throw new Error(await readApiError(response));
}
const blob = await response.blob();
const filename = readDownloadFilename(
response.headers.get("content-disposition"),
`防雷分析-${jobType}-结果.csv`,
);
const objectUrl = URL.createObjectURL(blob);
const anchor = document.createElement("a");
anchor.href = objectUrl;
anchor.download = filename;
document.body.appendChild(anchor);
anchor.click();
anchor.remove();
URL.revokeObjectURL(objectUrl);
},
onSuccess: () => {
messageApi.success("结果导出成功");
},
onError: (error) => {
messageApi.error(error instanceof Error ? error.message : "结果导出失败");
},
});
const resultColumns = useMemo<ColumnsType<FlAnalysisTowerResultSummary>>(() => {
const columns: ColumnsType<FlAnalysisTowerResultSummary> = [
{
@@ -1126,6 +1159,9 @@ export default function AdminFlAnalysisPage() {
const reportSourceJobName = readOptionalString(selectedJobSummary, "source_job_name");
const reportMitigationJobName = readOptionalString(selectedJobSummary, "mitigation_job_name");
const reportDocumentName = readOptionalString(selectedJobSummary, "document_filename");
const canDownloadResults = selectedJob?.job_type !== "report"
&& selectedJob?.status === "success"
&& (towerResultsQuery.data?.items.length ?? 0) > 0;
return (
<>
@@ -1485,6 +1521,21 @@ export default function AdminFlAnalysisPage() {
</Button>
) : null}
{selectedJob.job_type !== "report" ? (
<Button
onClick={() => {
downloadResultsMutation.mutate({
jobId: selectedJob.id,
runId: selectedJobDetail?.latest_run_id ?? selectedJob.latest_run_id,
jobType: selectedJob.job_type,
});
}}
loading={downloadResultsMutation.isPending}
disabled={!canDownloadResults}
>
</Button>
) : null}
{selectedJob.job_type === "report" ? (
<Button
type="primary"