[migrate]:[FL-37][补齐防雷报告图7雷区分布图输出]

Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
chengkai3
2026-06-08 05:15:45 +08:00
parent 171a4a6089
commit 3451589401
2 changed files with 328 additions and 4 deletions
+312 -3
View File
@@ -1,5 +1,8 @@
from __future__ import annotations from __future__ import annotations
import base64
import struct
import zlib
from collections import Counter from collections import Counter
from datetime import datetime from datetime import datetime
from html import escape from html import escape
@@ -219,6 +222,7 @@ def build_report_document(report_data: Mapping[str, Any]) -> tuple[str, bytes]:
risk_threshold_rows = _build_risk_threshold_reference_rows(risk_rows, fallback_voltage=line_voltage) risk_threshold_rows = _build_risk_threshold_reference_rows(risk_rows, fallback_voltage=line_voltage)
risk_result_rows = _build_risk_result_rows(risk_rows) risk_result_rows = _build_risk_result_rows(risk_rows)
lightning_map_rows = _build_lightning_map_rows(risk_rows) lightning_map_rows = _build_lightning_map_rows(risk_rows)
lightning_map_section = _render_lightning_map_section(risk_rows, lightning_map_rows)
high_risk_reason_rows = _build_high_risk_reason_rows(selected_risk_rows) high_risk_reason_rows = _build_high_risk_reason_rows(selected_risk_rows)
mitigation_table_rows = _build_mitigation_detail_rows(selected_mitigation_rows) mitigation_table_rows = _build_mitigation_detail_rows(selected_mitigation_rows)
scenario_table_rows = _build_scenario_table_rows(selected_scenario_rows) scenario_table_rows = _build_scenario_table_rows(selected_scenario_rows)
@@ -323,6 +327,26 @@ def build_report_document(report_data: Mapping[str, Any]) -> tuple[str, bytes]:
.bar-rest {{ .bar-rest {{
background: #e5edf5; background: #e5edf5;
}} }}
.figure-card {{
margin: 10px auto 6px;
width: 100%;
text-align: center;
}}
.report-image {{
display: block;
width: 100%;
max-width: 720px;
margin: 0 auto;
border: 1px solid #cad5df;
background: #fff;
}}
.figure-note {{
margin: 6px 0 14px;
text-align: center;
text-indent: 0;
color: #475569;
font-size: 10.5pt;
}}
</style> </style>
</head> </head>
<body> <body>
@@ -395,9 +419,7 @@ def build_report_document(report_data: Mapping[str, Any]) -> tuple[str, bytes]:
<p>{escape(_build_lightning_distribution_text(lightning_entries, len(risk_rows)))}</p> <p>{escape(_build_lightning_distribution_text(lightning_entries, len(risk_rows)))}</p>
{_render_table(["雷区", "等级", "范围", "杆塔数", "占比"], _lightning_distribution_rows(lightning_entries))} {_render_table(["雷区", "等级", "范围", "杆塔数", "占比"], _lightning_distribution_rows(lightning_entries))}
{_render_bar_chart("图6 线路杆塔雷区统计图", lightning_entries)} {_render_bar_chart("图6 线路杆塔雷区统计图", lightning_entries)}
<p class="muted">说明:源端此处为雷区分布图片,当前 Word 兼容 HTML 以坐标明细表替代,保留雷区分布口径。</p> {lightning_map_section}
<p class="caption">图7 线路杆塔地闪密度雷区分布图(表格替代)</p>
{_render_table(["杆塔号", "经度", "纬度", "雷区", "地闪密度Ng"], lightning_map_rows or [["-", "-", "-", "当前未提取到可用于分布展示的坐标或雷区数据", "-"]])}
<h3>2.7线路杆塔避雷线保护角</h3> <h3>2.7线路杆塔避雷线保护角</h3>
<h4>2.7.1线路杆塔避雷线保护角分类规则</h4> <h4>2.7.1线路杆塔避雷线保护角分类规则</h4>
@@ -571,6 +593,293 @@ def _render_bar_chart(caption: str, entries: Sequence[Mapping[str, Any]]) -> str
) )
def _render_lightning_map_section(
rows: Sequence[Mapping[str, Any]],
fallback_rows: Sequence[Sequence[Any]],
) -> str:
figure_html = _render_lightning_map_figure(rows)
if figure_html:
return (
'<p class="muted">说明:图7按当前报告中的杆塔经纬度与地闪密度结果渲染,颜色与表7雷区划分保持一致,灰色折线表示线路走向。</p>'
'<p class="caption">图7 线路杆塔地闪密度雷区分布图</p>'
f"{figure_html}"
)
return (
'<p class="muted">说明:当前未提取到足够的杆塔坐标或地闪密度数据,图7暂以坐标明细表替代。</p>'
'<p class="caption">图7 线路杆塔地闪密度雷区分布图(表格替代)</p>'
+ _render_table(
["杆塔号", "经度", "纬度", "雷区", "地闪密度Ng"],
fallback_rows or [["-", "-", "-", "当前未提取到可用于分布展示的坐标或雷区数据", "-"]],
)
)
def _render_lightning_map_figure(rows: Sequence[Mapping[str, Any]]) -> str:
points = _build_lightning_map_points(rows)
if not points:
return ""
encoded_png = base64.b64encode(_render_lightning_map_png(points)).decode("ascii")
skipped_count = max(len(rows) - len(points), 0)
note = f"共渲染 {len(points)} 座杆塔,颜色表示雷区等级,灰色折线表示线路走向。"
if skipped_count > 0:
note += f" 另有 {skipped_count} 座因缺少坐标或地闪密度未纳入图形。"
return (
'<div class="figure-card">'
f'<img class="report-image" src="data:image/png;base64,{encoded_png}" alt="线路杆塔地闪密度雷区分布图" />'
"</div>"
f'<p class="figure-note">{escape(note)}</p>'
)
def _build_lightning_map_points(rows: Sequence[Mapping[str, Any]]) -> list[dict[str, Any]]:
rendered: list[dict[str, Any]] = []
for row in rows:
base = _row_base(row)
longitude = _as_optional_float(base.get("longitude"))
latitude = _as_optional_float(base.get("latitude"))
density = _row_lightning_density(row)
code = _classify_lightning_zone_code(density)
if longitude is None or latitude is None or code is None:
continue
if not _is_valid_geo_point(longitude=longitude, latitude=latitude):
continue
zone = next((item for item in _LIGHTNING_ZONE_RULES if item["code"] == code), None)
rendered.append(
{
"tower_no": _display(row.get("tower_no")),
"longitude": longitude,
"latitude": latitude,
"density": density,
"zone_code": code,
"zone_label": f'{zone["code"]} {zone["zone"]}' if zone else code,
"color": str(zone["color"]) if zone else "#0F766E",
}
)
return rendered
def _render_lightning_map_png(points: Sequence[Mapping[str, Any]]) -> bytes:
width = 720
height = 420
plot_left = 54
plot_right = width - 42
plot_top = 28
plot_bottom = height - 58
plot_width = plot_right - plot_left
plot_height = plot_bottom - plot_top
pixels = bytearray(width * height * 3)
_fill_rect(pixels, width, height, 0, 0, width, height, (251, 252, 253))
_fill_rect(pixels, width, height, plot_left, plot_top, plot_width, plot_height, (245, 249, 252))
longitudes = [float(point["longitude"]) for point in points]
latitudes = [float(point["latitude"]) for point in points]
min_lon = min(longitudes)
max_lon = max(longitudes)
min_lat = min(latitudes)
max_lat = max(latitudes)
lon_span = max(max_lon - min_lon, 1e-6)
lat_span = max(max_lat - min_lat, 1e-6)
lon_pad = max(lon_span * 0.08, 0.002)
lat_pad = max(lat_span * 0.08, 0.002)
min_lon -= lon_pad
max_lon += lon_pad
min_lat -= lat_pad
max_lat += lat_pad
for step in range(6):
x = plot_left + round(step * plot_width / 5)
y = plot_top + round(step * plot_height / 5)
_draw_line(pixels, width, height, x, plot_top, x, plot_bottom, (224, 232, 240))
_draw_line(pixels, width, height, plot_left, y, plot_right, y, (224, 232, 240))
_draw_rect(pixels, width, height, plot_left, plot_top, plot_width, plot_height, (148, 163, 184))
def project(longitude: float, latitude: float) -> tuple[int, int]:
lon_ratio = (longitude - min_lon) / max(max_lon - min_lon, 1e-6)
lat_ratio = (latitude - min_lat) / max(max_lat - min_lat, 1e-6)
x = plot_left + round(lon_ratio * plot_width)
y = plot_bottom - round(lat_ratio * plot_height)
return x, y
projected = [project(float(point["longitude"]), float(point["latitude"])) for point in points]
for start, end in zip(projected, projected[1:]):
_draw_thick_line(pixels, width, height, start[0], start[1], end[0], end[1], (100, 116, 139), thickness=3)
peak_density = max((float(point["density"]) for point in points if point.get("density") is not None), default=0.0)
for point, (x, y) in zip(points, projected):
density = _as_optional_float(point.get("density")) or 0.0
radius = 5 if peak_density <= 0 else 5 + int(round((density / max(peak_density, 1e-6)) * 2))
_draw_filled_circle(pixels, width, height, x, y, radius + 2, (255, 255, 255))
_draw_filled_circle(pixels, width, height, x, y, radius, _hex_to_rgb(str(point.get("color") or "#0F766E")))
return _encode_png(width, height, pixels)
def _is_valid_geo_point(*, longitude: float, latitude: float) -> bool:
return -180.0 <= longitude <= 180.0 and -90.0 <= latitude <= 90.0
def _fill_rect(
pixels: bytearray,
width: int,
height: int,
x: int,
y: int,
rect_width: int,
rect_height: int,
color: tuple[int, int, int],
) -> None:
x_start = max(x, 0)
y_start = max(y, 0)
x_end = min(x + rect_width, width)
y_end = min(y + rect_height, height)
for row in range(y_start, y_end):
for col in range(x_start, x_end):
_set_pixel(pixels, width, height, col, row, color)
def _draw_rect(
pixels: bytearray,
width: int,
height: int,
x: int,
y: int,
rect_width: int,
rect_height: int,
color: tuple[int, int, int],
) -> None:
_draw_line(pixels, width, height, x, y, x + rect_width, y, color)
_draw_line(pixels, width, height, x, y, x, y + rect_height, color)
_draw_line(pixels, width, height, x + rect_width, y, x + rect_width, y + rect_height, color)
_draw_line(pixels, width, height, x, y + rect_height, x + rect_width, y + rect_height, color)
def _draw_thick_line(
pixels: bytearray,
width: int,
height: int,
x0: int,
y0: int,
x1: int,
y1: int,
color: tuple[int, int, int],
*,
thickness: int,
) -> None:
half = max(thickness // 2, 0)
if abs(x1 - x0) >= abs(y1 - y0):
for offset in range(-half, half + 1):
_draw_line(pixels, width, height, x0, y0 + offset, x1, y1 + offset, color)
else:
for offset in range(-half, half + 1):
_draw_line(pixels, width, height, x0 + offset, y0, x1 + offset, y1, color)
def _draw_line(
pixels: bytearray,
width: int,
height: int,
x0: int,
y0: int,
x1: int,
y1: int,
color: tuple[int, int, int],
) -> None:
dx = abs(x1 - x0)
dy = -abs(y1 - y0)
sx = 1 if x0 < x1 else -1
sy = 1 if y0 < y1 else -1
err = dx + dy
current_x = x0
current_y = y0
while True:
_set_pixel(pixels, width, height, current_x, current_y, color)
if current_x == x1 and current_y == y1:
break
doubled = err * 2
if doubled >= dy:
err += dy
current_x += sx
if doubled <= dx:
err += dx
current_y += sy
def _draw_filled_circle(
pixels: bytearray,
width: int,
height: int,
center_x: int,
center_y: int,
radius: int,
color: tuple[int, int, int],
) -> None:
if radius <= 0:
return
radius_sq = radius * radius
for offset_y in range(-radius, radius + 1):
for offset_x in range(-radius, radius + 1):
if offset_x * offset_x + offset_y * offset_y <= radius_sq:
_set_pixel(pixels, width, height, center_x + offset_x, center_y + offset_y, color)
def _set_pixel(
pixels: bytearray,
width: int,
height: int,
x: int,
y: int,
color: tuple[int, int, int],
) -> None:
if x < 0 or y < 0 or x >= width or y >= height:
return
offset = (y * width + x) * 3
pixels[offset] = color[0]
pixels[offset + 1] = color[1]
pixels[offset + 2] = color[2]
def _hex_to_rgb(value: str) -> tuple[int, int, int]:
cleaned = value.strip().lstrip("#")
if len(cleaned) != 6:
return (15, 118, 110)
try:
return (
int(cleaned[0:2], 16),
int(cleaned[2:4], 16),
int(cleaned[4:6], 16),
)
except ValueError:
return (15, 118, 110)
def _encode_png(width: int, height: int, pixels: bytearray) -> bytes:
stride = width * 3
raw = bytearray()
for row in range(height):
start = row * stride
raw.append(0)
raw.extend(pixels[start:start + stride])
def chunk(tag: bytes, payload: bytes) -> bytes:
return (
struct.pack("!I", len(payload))
+ tag
+ payload
+ struct.pack("!I", zlib.crc32(tag + payload) & 0xFFFFFFFF)
)
header = struct.pack("!IIBBBBB", width, height, 8, 2, 0, 0, 0)
return (
b"\x89PNG\r\n\x1a\n"
+ chunk(b"IHDR", header)
+ chunk(b"IDAT", zlib.compress(bytes(raw), level=9))
+ chunk(b"IEND", b"")
)
def _read_rows(payload: Mapping[str, Any], key: str) -> list[Mapping[str, Any]]: def _read_rows(payload: Mapping[str, Any], key: str) -> list[Mapping[str, Any]]:
value = payload.get(key) value = payload.get(key)
if not isinstance(value, Sequence) or isinstance(value, (str, bytes, bytearray)): if not isinstance(value, Sequence) or isinstance(value, (str, bytes, bytearray)):
+16 -1
View File
@@ -254,7 +254,9 @@ def test_build_report_document_renders_word_compatible_html() -> None:
assert "示例线路-报告" in filename assert "示例线路-报告" in filename
assert "示例线路" in html assert "示例线路" in html
assert "2.3线路杆塔高度" in html assert "2.3线路杆塔高度" in html
assert "图7 线路杆塔地闪密度雷区分布图(表格替代)" in html assert "图7 线路杆塔地闪密度雷区分布图" in html
assert "图7 线路杆塔地闪密度雷区分布图(表格替代)" not in html
assert 'src="data:image/png;base64,' in html
assert "图8 线路杆塔避雷线保护角信息统计图" in html assert "图8 线路杆塔避雷线保护角信息统计图" in html
assert "表10 输电线路雷击风险等级划分规则" in html assert "表10 输电线路雷击风险等级划分规则" in html
assert "表13 高风险杆塔差异化防雷措施" in html assert "表13 高风险杆塔差异化防雷措施" in html
@@ -263,3 +265,16 @@ def test_build_report_document_renders_word_compatible_html() -> None:
assert "表14 采取措施后的计算结果表" in html assert "表14 采取措施后的计算结果表" in html
assert "反击耐雷水平(kA)" in html assert "反击耐雷水平(kA)" in html
assert "001" in html assert "001" in html
def test_build_report_document_falls_back_to_table_when_map_points_missing() -> None:
report_data = _sample_report_data()
for row in report_data["risk_rows"]:
row["base_tower_json"]["longitude"] = None
row["base_tower_json"]["latitude"] = None
_, content = build_report_document(report_data)
html = content.decode("utf-8")
assert "图7 线路杆塔地闪密度雷区分布图(表格替代)" in html
assert 'src="data:image/png;base64,' not in html