diff --git a/api/app/services/fl_analysis_report.py b/api/app/services/fl_analysis_report.py index aa45111..c5e5d16 100644 --- a/api/app/services/fl_analysis_report.py +++ b/api/app/services/fl_analysis_report.py @@ -1,5 +1,8 @@ from __future__ import annotations +import base64 +import struct +import zlib from collections import Counter from datetime import datetime from html import escape @@ -219,6 +222,7 @@ def build_report_document(report_data: Mapping[str, Any]) -> tuple[str, bytes]: risk_threshold_rows = _build_risk_threshold_reference_rows(risk_rows, fallback_voltage=line_voltage) risk_result_rows = _build_risk_result_rows(risk_rows) lightning_map_rows = _build_lightning_map_rows(risk_rows) + lightning_map_section = _render_lightning_map_section(risk_rows, lightning_map_rows) high_risk_reason_rows = _build_high_risk_reason_rows(selected_risk_rows) mitigation_table_rows = _build_mitigation_detail_rows(selected_mitigation_rows) scenario_table_rows = _build_scenario_table_rows(selected_scenario_rows) @@ -323,6 +327,26 @@ def build_report_document(report_data: Mapping[str, Any]) -> tuple[str, bytes]: .bar-rest {{ background: #e5edf5; }} + .figure-card {{ + margin: 10px auto 6px; + width: 100%; + text-align: center; + }} + .report-image {{ + display: block; + width: 100%; + max-width: 720px; + margin: 0 auto; + border: 1px solid #cad5df; + background: #fff; + }} + .figure-note {{ + margin: 6px 0 14px; + text-align: center; + text-indent: 0; + color: #475569; + font-size: 10.5pt; + }}
@@ -395,9 +419,7 @@ def build_report_document(report_data: Mapping[str, Any]) -> tuple[str, bytes]:{escape(_build_lightning_distribution_text(lightning_entries, len(risk_rows)))}
{_render_table(["雷区", "等级", "范围", "杆塔数", "占比"], _lightning_distribution_rows(lightning_entries))} {_render_bar_chart("图6 线路杆塔雷区统计图", lightning_entries)} -说明:源端此处为雷区分布图片,当前 Word 兼容 HTML 以坐标明细表替代,保留雷区分布口径。
-图7 线路杆塔地闪密度雷区分布图(表格替代)
- {_render_table(["杆塔号", "经度", "纬度", "雷区", "地闪密度Ng"], lightning_map_rows or [["-", "-", "-", "当前未提取到可用于分布展示的坐标或雷区数据", "-"]])} + {lightning_map_section}说明:图7按当前报告中的杆塔经纬度与地闪密度结果渲染,颜色与表7雷区划分保持一致,灰色折线表示线路走向。
' + '图7 线路杆塔地闪密度雷区分布图
' + f"{figure_html}" + ) + return ( + '说明:当前未提取到足够的杆塔坐标或地闪密度数据,图7暂以坐标明细表替代。
' + '图7 线路杆塔地闪密度雷区分布图(表格替代)
' + + _render_table( + ["杆塔号", "经度", "纬度", "雷区", "地闪密度Ng"], + fallback_rows or [["-", "-", "-", "当前未提取到可用于分布展示的坐标或雷区数据", "-"]], + ) + ) + + +def _render_lightning_map_figure(rows: Sequence[Mapping[str, Any]]) -> str: + points = _build_lightning_map_points(rows) + if not points: + return "" + + encoded_png = base64.b64encode(_render_lightning_map_png(points)).decode("ascii") + skipped_count = max(len(rows) - len(points), 0) + note = f"共渲染 {len(points)} 座杆塔,颜色表示雷区等级,灰色折线表示线路走向。" + if skipped_count > 0: + note += f" 另有 {skipped_count} 座因缺少坐标或地闪密度未纳入图形。" + return ( + '{escape(note)}
' + ) + + +def _build_lightning_map_points(rows: Sequence[Mapping[str, Any]]) -> list[dict[str, Any]]: + rendered: list[dict[str, Any]] = [] + for row in rows: + base = _row_base(row) + longitude = _as_optional_float(base.get("longitude")) + latitude = _as_optional_float(base.get("latitude")) + density = _row_lightning_density(row) + code = _classify_lightning_zone_code(density) + if longitude is None or latitude is None or code is None: + continue + if not _is_valid_geo_point(longitude=longitude, latitude=latitude): + continue + zone = next((item for item in _LIGHTNING_ZONE_RULES if item["code"] == code), None) + rendered.append( + { + "tower_no": _display(row.get("tower_no")), + "longitude": longitude, + "latitude": latitude, + "density": density, + "zone_code": code, + "zone_label": f'{zone["code"]} {zone["zone"]}' if zone else code, + "color": str(zone["color"]) if zone else "#0F766E", + } + ) + return rendered + + +def _render_lightning_map_png(points: Sequence[Mapping[str, Any]]) -> bytes: + width = 720 + height = 420 + plot_left = 54 + plot_right = width - 42 + plot_top = 28 + plot_bottom = height - 58 + plot_width = plot_right - plot_left + plot_height = plot_bottom - plot_top + + pixels = bytearray(width * height * 3) + _fill_rect(pixels, width, height, 0, 0, width, height, (251, 252, 253)) + _fill_rect(pixels, width, height, plot_left, plot_top, plot_width, plot_height, (245, 249, 252)) + + longitudes = [float(point["longitude"]) for point in points] + latitudes = [float(point["latitude"]) for point in points] + min_lon = min(longitudes) + max_lon = max(longitudes) + min_lat = min(latitudes) + max_lat = max(latitudes) + lon_span = max(max_lon - min_lon, 1e-6) + lat_span = max(max_lat - min_lat, 1e-6) + lon_pad = max(lon_span * 0.08, 0.002) + lat_pad = max(lat_span * 0.08, 0.002) + min_lon -= lon_pad + max_lon += lon_pad + min_lat -= lat_pad + max_lat += lat_pad + + for step in range(6): + x = plot_left + round(step * plot_width / 5) + y = plot_top + round(step * plot_height / 5) + _draw_line(pixels, width, height, x, plot_top, x, plot_bottom, (224, 232, 240)) + _draw_line(pixels, width, height, plot_left, y, plot_right, y, (224, 232, 240)) + + _draw_rect(pixels, width, height, plot_left, plot_top, plot_width, plot_height, (148, 163, 184)) + + def project(longitude: float, latitude: float) -> tuple[int, int]: + lon_ratio = (longitude - min_lon) / max(max_lon - min_lon, 1e-6) + lat_ratio = (latitude - min_lat) / max(max_lat - min_lat, 1e-6) + x = plot_left + round(lon_ratio * plot_width) + y = plot_bottom - round(lat_ratio * plot_height) + return x, y + + projected = [project(float(point["longitude"]), float(point["latitude"])) for point in points] + for start, end in zip(projected, projected[1:]): + _draw_thick_line(pixels, width, height, start[0], start[1], end[0], end[1], (100, 116, 139), thickness=3) + + peak_density = max((float(point["density"]) for point in points if point.get("density") is not None), default=0.0) + for point, (x, y) in zip(points, projected): + density = _as_optional_float(point.get("density")) or 0.0 + radius = 5 if peak_density <= 0 else 5 + int(round((density / max(peak_density, 1e-6)) * 2)) + _draw_filled_circle(pixels, width, height, x, y, radius + 2, (255, 255, 255)) + _draw_filled_circle(pixels, width, height, x, y, radius, _hex_to_rgb(str(point.get("color") or "#0F766E"))) + + return _encode_png(width, height, pixels) + + +def _is_valid_geo_point(*, longitude: float, latitude: float) -> bool: + return -180.0 <= longitude <= 180.0 and -90.0 <= latitude <= 90.0 + + +def _fill_rect( + pixels: bytearray, + width: int, + height: int, + x: int, + y: int, + rect_width: int, + rect_height: int, + color: tuple[int, int, int], +) -> None: + x_start = max(x, 0) + y_start = max(y, 0) + x_end = min(x + rect_width, width) + y_end = min(y + rect_height, height) + for row in range(y_start, y_end): + for col in range(x_start, x_end): + _set_pixel(pixels, width, height, col, row, color) + + +def _draw_rect( + pixels: bytearray, + width: int, + height: int, + x: int, + y: int, + rect_width: int, + rect_height: int, + color: tuple[int, int, int], +) -> None: + _draw_line(pixels, width, height, x, y, x + rect_width, y, color) + _draw_line(pixels, width, height, x, y, x, y + rect_height, color) + _draw_line(pixels, width, height, x + rect_width, y, x + rect_width, y + rect_height, color) + _draw_line(pixels, width, height, x, y + rect_height, x + rect_width, y + rect_height, color) + + +def _draw_thick_line( + pixels: bytearray, + width: int, + height: int, + x0: int, + y0: int, + x1: int, + y1: int, + color: tuple[int, int, int], + *, + thickness: int, +) -> None: + half = max(thickness // 2, 0) + if abs(x1 - x0) >= abs(y1 - y0): + for offset in range(-half, half + 1): + _draw_line(pixels, width, height, x0, y0 + offset, x1, y1 + offset, color) + else: + for offset in range(-half, half + 1): + _draw_line(pixels, width, height, x0 + offset, y0, x1 + offset, y1, color) + + +def _draw_line( + pixels: bytearray, + width: int, + height: int, + x0: int, + y0: int, + x1: int, + y1: int, + color: tuple[int, int, int], +) -> None: + dx = abs(x1 - x0) + dy = -abs(y1 - y0) + sx = 1 if x0 < x1 else -1 + sy = 1 if y0 < y1 else -1 + err = dx + dy + current_x = x0 + current_y = y0 + while True: + _set_pixel(pixels, width, height, current_x, current_y, color) + if current_x == x1 and current_y == y1: + break + doubled = err * 2 + if doubled >= dy: + err += dy + current_x += sx + if doubled <= dx: + err += dx + current_y += sy + + +def _draw_filled_circle( + pixels: bytearray, + width: int, + height: int, + center_x: int, + center_y: int, + radius: int, + color: tuple[int, int, int], +) -> None: + if radius <= 0: + return + radius_sq = radius * radius + for offset_y in range(-radius, radius + 1): + for offset_x in range(-radius, radius + 1): + if offset_x * offset_x + offset_y * offset_y <= radius_sq: + _set_pixel(pixels, width, height, center_x + offset_x, center_y + offset_y, color) + + +def _set_pixel( + pixels: bytearray, + width: int, + height: int, + x: int, + y: int, + color: tuple[int, int, int], +) -> None: + if x < 0 or y < 0 or x >= width or y >= height: + return + offset = (y * width + x) * 3 + pixels[offset] = color[0] + pixels[offset + 1] = color[1] + pixels[offset + 2] = color[2] + + +def _hex_to_rgb(value: str) -> tuple[int, int, int]: + cleaned = value.strip().lstrip("#") + if len(cleaned) != 6: + return (15, 118, 110) + try: + return ( + int(cleaned[0:2], 16), + int(cleaned[2:4], 16), + int(cleaned[4:6], 16), + ) + except ValueError: + return (15, 118, 110) + + +def _encode_png(width: int, height: int, pixels: bytearray) -> bytes: + stride = width * 3 + raw = bytearray() + for row in range(height): + start = row * stride + raw.append(0) + raw.extend(pixels[start:start + stride]) + + def chunk(tag: bytes, payload: bytes) -> bytes: + return ( + struct.pack("!I", len(payload)) + + tag + + payload + + struct.pack("!I", zlib.crc32(tag + payload) & 0xFFFFFFFF) + ) + + header = struct.pack("!IIBBBBB", width, height, 8, 2, 0, 0, 0) + return ( + b"\x89PNG\r\n\x1a\n" + + chunk(b"IHDR", header) + + chunk(b"IDAT", zlib.compress(bytes(raw), level=9)) + + chunk(b"IEND", b"") + ) + + def _read_rows(payload: Mapping[str, Any], key: str) -> list[Mapping[str, Any]]: value = payload.get(key) if not isinstance(value, Sequence) or isinstance(value, (str, bytes, bytearray)): diff --git a/api/tests/test_fl_analysis_report.py b/api/tests/test_fl_analysis_report.py index 39549ea..4576b55 100644 --- a/api/tests/test_fl_analysis_report.py +++ b/api/tests/test_fl_analysis_report.py @@ -254,7 +254,9 @@ def test_build_report_document_renders_word_compatible_html() -> None: assert "示例线路-报告" in filename assert "示例线路" in html assert "2.3线路杆塔高度" in html - assert "图7 线路杆塔地闪密度雷区分布图(表格替代)" in html + assert "图7 线路杆塔地闪密度雷区分布图" in html + assert "图7 线路杆塔地闪密度雷区分布图(表格替代)" not in html + assert 'src="data:image/png;base64,' in html assert "图8 线路杆塔避雷线保护角信息统计图" in html assert "表10 输电线路雷击风险等级划分规则" in html assert "表13 高风险杆塔差异化防雷措施" in html @@ -263,3 +265,16 @@ def test_build_report_document_renders_word_compatible_html() -> None: assert "表14 采取措施后的计算结果表" in html assert "反击耐雷水平(kA)" in html assert "001" in html + + +def test_build_report_document_falls_back_to_table_when_map_points_missing() -> None: + report_data = _sample_report_data() + for row in report_data["risk_rows"]: + row["base_tower_json"]["longitude"] = None + row["base_tower_json"]["latitude"] = None + + _, content = build_report_document(report_data) + html = content.decode("utf-8") + + assert "图7 线路杆塔地闪密度雷区分布图(表格替代)" in html + assert 'src="data:image/png;base64,' not in html