[migrate]:[FL-24][参数准备闭环与就绪校验]

Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
chengkai3
2026-06-07 20:37:53 +08:00
parent e21472bbd9
commit aebf152cd4
16 changed files with 1480 additions and 89 deletions
+24
View File
@@ -11,8 +11,12 @@ from ...schemas.lightning import (
LightningCurrentEventListResponse,
LightningCurrentEventSummary,
LightningCurrentEventUpdateRequest,
LightningCurrentPreparationRequest,
LightningCurrentPreparationResponse,
LightningCurrentExceedanceResponse,
LightningCurrentImportResponse,
LightningDensityPreparationRequest,
LightningDensityPreparationResponse,
LightningCurrentSampleListResponse,
LightningDistributionImportResponse,
LightningDistributionReportResponse,
@@ -35,6 +39,8 @@ from ...services.lightning_service import (
import_lightning_event_from_file,
list_lightning_events,
list_lightning_samples,
prepare_line_lightning_current,
prepare_line_lightning_density,
serialize_lightning_event,
update_lightning_event,
)
@@ -169,6 +175,24 @@ def import_lightning_distribution_file(
)
@router.post("/prepare-current", response_model=LightningCurrentPreparationResponse)
def prepare_lightning_current_for_line(
payload: LightningCurrentPreparationRequest,
current_user: CurrentUser = Depends(require_permission("lightning.manage")),
db: Session = Depends(get_db),
) -> LightningCurrentPreparationResponse:
return prepare_line_lightning_current(db, payload, actor_user_id=current_user.user.id)
@router.post("/prepare-density", response_model=LightningDensityPreparationResponse)
def prepare_lightning_density_for_line(
payload: LightningDensityPreparationRequest,
current_user: CurrentUser = Depends(require_permission("lightning.manage")),
db: Session = Depends(get_db),
) -> LightningDensityPreparationResponse:
return prepare_line_lightning_density(db, payload, actor_user_id=current_user.user.id)
@router.get("/{event_id}", response_model=LightningCurrentEventSummary)
def get_lightning_event_detail(
event_id: str,
+40
View File
@@ -5,6 +5,8 @@ from typing import Any, Literal
from pydantic import BaseModel, Field, field_validator
from .line import LineSummary
LightningPolarity = Literal["positive", "negative", "mixed", "unknown"]
@@ -294,3 +296,41 @@ class LightningDistributionReportResponse(BaseModel):
positive_ratio: float = 0.0
ng_per_km2_year: float = 0.0
most_severe_event: LightningDistributionEventBrief | None = None
class LightningCurrentPreparationRequest(BaseModel):
line_id: str = Field(min_length=1, max_length=64)
region_id: str | None = Field(default=None, max_length=64)
is_synthetic: bool | None = None
class LightningCurrentPreparationResponse(BaseModel):
line: LineSummary
current_a: float
current_b: float
sampled_event_count: int
updated_tower_count: int
created_profile_count: int = 0
warning_count: int = 0
warnings: list[str] = Field(default_factory=list)
class LightningDensityPreparationRequest(BaseModel):
line_id: str = Field(min_length=1, max_length=64)
region_id: str | None = Field(default=None, max_length=64)
is_synthetic: bool | None = None
radius_km: float = Field(default=3.0, gt=0.05, le=50.0)
years: float | None = Field(default=None, gt=0)
class LightningDensityPreparationResponse(BaseModel):
line: LineSummary
updated_tower_count: int
missing_geo_count: int = 0
radius_km: float
data_years: float
avg_density: float | None = None
min_density: float | None = None
max_density: float | None = None
warning_count: int = 0
warnings: list[str] = Field(default_factory=list)
+1
View File
@@ -13,6 +13,7 @@ class LineSummary(BaseModel):
phase_sequence_json: dict[str, Any] = Field(default_factory=dict)
arrester_install_json: dict[str, Any] = Field(default_factory=dict)
lightning_param_json: dict[str, Any] = Field(default_factory=dict)
preparation_json: dict[str, Any] = Field(default_factory=dict)
tower_count: int = 0
create_date: datetime
create_user: str | None = None
+212 -53
View File
@@ -41,6 +41,7 @@ from ..schemas.elevation import (
ElevationDatasetUpdateRequest,
)
from .file_service import _build_driver_or_400, _require_mount, list_enabled_mounts
from .line_preparation_service import record_line_preparation_source
from .push_service import publish_topic
from .storage_driver import StorageDriverError, StorageInvalidPathError, StoragePathNotFoundError, join_virtual_path, normalize_virtual_path
@@ -934,6 +935,23 @@ def execute_apply_job(job_id: str) -> None:
job.error_message = warning_note
job.finished_at = utcnow()
job.update_date = utcnow()
line.update_date = utcnow()
line.update_user = actor_user_id
record_line_preparation_source(
line,
component="ground_slope",
payload={
"prepared_at": utcnow().isoformat(),
"prepared_by_user_id": actor_user_id,
"dataset_id": dataset.id,
"dataset_code": dataset.code,
"job_id": job.id,
"mode": job.mode,
"updated_tower_count": job.updated_tower_count,
"missing_geo_count": job.missing_geo_count,
"unmatched_count": job.unmatched_count,
},
)
db.commit()
_refresh_dataset_usage_status(db, dataset_id=job.dataset_id)
@@ -1326,12 +1344,16 @@ def _apply_points_to_line_towers(
skipped_tower_count = 0
missing_geo_count = 0
unmatched_count = 0
point_sampler = _build_points_sampler(points)
for tower in towers:
for index, tower in enumerate(towers):
if tower.longitude is None or tower.latitude is None:
missing_geo_count += 1
continue
if mode == "fill_null_only" and tower.altitude_m is not None:
skip_altitude_update = mode == "fill_null_only" and tower.altitude_m is not None
skip_slope_update = mode == "fill_null_only" and tower.slope_1 is not None and tower.slope_2 is not None
if skip_altitude_update and skip_slope_update:
skipped_tower_count += 1
continue
@@ -1349,7 +1371,23 @@ def _apply_points_to_line_towers(
unmatched_count += 1
continue
tower.altitude_m = round(altitude, 3)
changed = False
sampled_altitude = round(float(altitude), 3)
if not skip_altitude_update:
tower.altitude_m = sampled_altitude
changed = True
slope_pair = _compute_tower_slope_pair(
towers=towers,
tower_index=index,
center_altitude=sampled_altitude,
sample_altitude=point_sampler,
)
if slope_pair is not None and not skip_slope_update:
tower.slope_1 = round(slope_pair[0], 3)
tower.slope_2 = round(slope_pair[1], 3)
changed = True
raw_extra = dict(tower.raw_extra_json or {})
raw_extra["elevation"] = {
"dataset_id": dataset.id,
@@ -1361,7 +1399,10 @@ def _apply_points_to_line_towers(
}
tower.raw_extra_json = raw_extra
tower.update_date = utcnow()
updated_tower_count += 1
if changed:
updated_tower_count += 1
else:
skipped_tower_count += 1
db.commit()
return {
@@ -1397,6 +1438,146 @@ def _find_nearest_point(
return best_altitude, best_distance
def _build_points_sampler(points: list[ElevationSamplePoint]):
def sample(lon: float, lat: float) -> float | None:
match = _find_nearest_point(lon=lon, lat=lat, points=points)
if match is None:
return None
altitude, distance_m = match
if distance_m > NEAREST_MATCH_MAX_DISTANCE_M:
return None
return float(altitude)
return sample
def _build_raster_sampler(*, rasterio: Any, src: Any, src_crs: Any, band_nodata: Any):
def sample(lon: float, lat: float) -> float | None:
transformed_lon = lon
transformed_lat = lat
if src_crs and str(src_crs) not in {"EPSG:4326", "OGC:CRS84"}:
try:
xs, ys = rasterio.warp.transform(
"EPSG:4326",
src_crs,
[lon],
[lat],
)
transformed_lon = float(xs[0])
transformed_lat = float(ys[0])
except Exception:
return None
if not _is_point_within_bounds(
x=transformed_lon,
y=transformed_lat,
left=float(src.bounds.left),
right=float(src.bounds.right),
bottom=float(src.bounds.bottom),
top=float(src.bounds.top),
):
return None
try:
sampled = next(src.sample([(transformed_lon, transformed_lat)], masked=True), None)
except Exception:
sampled = None
if sampled is None or len(sampled) == 0:
return None
value = sampled[0]
if _is_masked_value(value):
return None
if band_nodata is not None and _almost_equal(float(value), float(band_nodata)):
return None
altitude = float(value)
if not _is_finite_number(altitude):
return None
return altitude
return sample
def _compute_tower_slope_pair(
*,
towers: list[LineTower],
tower_index: int,
center_altitude: float,
sample_altitude: Any,
) -> tuple[float, float] | None:
import math
if len(towers) < 2:
return None
tower = towers[tower_index]
if tower.longitude is None or tower.latitude is None:
return None
neighbor = _resolve_direction_neighbor(towers=towers, tower_index=tower_index)
if neighbor is None or neighbor.longitude is None or neighbor.latitude is None:
return None
dx_m = _longitude_distance_m(
lon_from=float(tower.longitude),
lon_to=float(neighbor.longitude),
latitude=float(tower.latitude),
)
dy_m = (float(neighbor.latitude) - float(tower.latitude)) * 111_320.0
vector_length = math.hypot(dx_m, dy_m)
if vector_length < 1e-6:
return None
unit_x = dx_m / vector_length
unit_y = dy_m / vector_length
negative_samples: list[float] = []
positive_samples: list[float] = []
for offset_m in (-200.0, -150.0, -100.0, -50.0, 50.0, 100.0, 150.0, 200.0):
sample_lon = _offset_longitude(
lon=float(tower.longitude),
latitude=float(tower.latitude),
offset_m=offset_m * unit_x,
)
sample_lat = _offset_latitude(lat=float(tower.latitude), offset_m=offset_m * unit_y)
altitude = sample_altitude(sample_lon, sample_lat)
if altitude is None:
return None
if offset_m < 0:
negative_samples.append(float(altitude))
else:
positive_samples.append(float(altitude))
slope_1 = sum(math.degrees(math.atan((center_altitude - altitude) / 50.0)) for altitude in negative_samples) / 4.0
slope_2 = sum(math.degrees(math.atan((center_altitude - altitude) / 50.0)) for altitude in positive_samples) / 4.0
return slope_1, slope_2
def _resolve_direction_neighbor(*, towers: list[LineTower], tower_index: int) -> LineTower | None:
if tower_index >= len(towers) - 1:
return towers[tower_index - 1] if tower_index > 0 else None
return towers[tower_index + 1]
def _longitude_distance_m(*, lon_from: float, lon_to: float, latitude: float) -> float:
import math
km_per_degree = max(111.32 * abs(math.cos(math.radians(latitude))), 1e-6)
return (lon_to - lon_from) * km_per_degree * 1000.0
def _offset_longitude(*, lon: float, latitude: float, offset_m: float) -> float:
import math
km_per_degree = max(111.32 * abs(math.cos(math.radians(latitude))), 1e-6)
return lon + offset_m / (km_per_degree * 1000.0)
def _offset_latitude(*, lat: float, offset_m: float) -> float:
return lat + offset_m / 111_320.0
def _haversine_distance_m(
*,
lon_a: float,
@@ -1801,68 +1982,43 @@ def _apply_raster_to_line_towers(
warnings.append(warning_text)
src_crs = src.crs
band_nodata = src.nodatavals[0] if src.nodatavals else None
raster_sampler = _build_raster_sampler(rasterio=rasterio, src=src, src_crs=src_crs, band_nodata=band_nodata)
for tower in towers:
for index, tower in enumerate(towers):
if tower.longitude is None or tower.latitude is None:
missing_geo_count += 1
continue
if mode == "fill_null_only" and tower.altitude_m is not None:
skip_altitude_update = mode == "fill_null_only" and tower.altitude_m is not None
skip_slope_update = mode == "fill_null_only" and tower.slope_1 is not None and tower.slope_2 is not None
if skip_altitude_update and skip_slope_update:
skipped_tower_count += 1
continue
lon = float(tower.longitude)
lat = float(tower.latitude)
transformed_lon = lon
transformed_lat = lat
if src_crs and str(src_crs) not in {"EPSG:4326", "OGC:CRS84"}:
try:
xs, ys = rasterio.warp.transform(
"EPSG:4326",
src_crs,
[lon],
[lat],
)
transformed_lon = float(xs[0])
transformed_lat = float(ys[0])
except Exception:
unmatched_count += 1
continue
if not _is_point_within_bounds(
x=transformed_lon,
y=transformed_lat,
left=float(src.bounds.left),
right=float(src.bounds.right),
bottom=float(src.bounds.bottom),
top=float(src.bounds.top),
):
altitude = raster_sampler(lon, lat)
if altitude is None:
unmatched_count += 1
continue
try:
sampled = next(src.sample([(transformed_lon, transformed_lat)], masked=True), None)
except Exception:
sampled = None
changed = False
sampled_altitude = round(float(altitude), 3)
if not skip_altitude_update:
tower.altitude_m = sampled_altitude
changed = True
if sampled is None or len(sampled) == 0:
unmatched_count += 1
continue
slope_pair = _compute_tower_slope_pair(
towers=towers,
tower_index=index,
center_altitude=sampled_altitude,
sample_altitude=raster_sampler,
)
if slope_pair is not None and not skip_slope_update:
tower.slope_1 = round(slope_pair[0], 3)
tower.slope_2 = round(slope_pair[1], 3)
changed = True
value = sampled[0]
if _is_masked_value(value):
unmatched_count += 1
continue
if band_nodata is not None and _almost_equal(float(value), float(band_nodata)):
unmatched_count += 1
continue
altitude = float(value)
if not _is_finite_number(altitude):
unmatched_count += 1
continue
tower.altitude_m = round(altitude, 3)
raw_extra = dict(tower.raw_extra_json or {})
raw_extra["elevation"] = {
"dataset_id": dataset.id,
@@ -1874,7 +2030,10 @@ def _apply_raster_to_line_towers(
}
tower.raw_extra_json = raw_extra
tower.update_date = utcnow()
updated_tower_count += 1
if changed:
updated_tower_count += 1
else:
skipped_tower_count += 1
db.commit()
return (
+11
View File
@@ -36,6 +36,7 @@ from .fl_analysis_rules import (
grade_snapshot_payload,
grade_tongtiao_snapshot_payload,
)
from .line_preparation_service import summarize_line_preparation
from .push_service import publish_topic
FL_ANALYSIS_TOPIC = "admin.fl-analysis"
@@ -170,6 +171,16 @@ def create_job(
if not line:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="线路不存在")
if payload.job_type in {"normal", "tongtiao", "risk"}:
preparation = summarize_line_preparation(db, line)
missing_items = [str(item) for item in preparation.get("missing_items") or []]
if missing_items:
missing_text = "".join(missing_items)
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"线路参数准备未完成:{missing_text}。请先完成相关回填后再创建任务。",
)
execution_options = _normalize_execution_options(payload.job_type, payload.execution_options_json or {})
if payload.job_type == "mitigation":
total_tower_count = _validate_mitigation_options(db, line_id=line.id, execution_options=execution_options)
+360
View File
@@ -15,17 +15,23 @@ from sqlalchemy.orm import Session
from ..models.base import utcnow
from ..models.lightning_event import LightningCurrentEvent
from ..models.lightning_sample import LightningCurrentSample
from ..models.line import Line
from ..models.line_tower import LineTower
from ..models.tower_profile import TowerProfile
from ..schemas.lightning import (
LightningCurrentEventListResponse,
LightningCurrentEventSummary,
LightningCurrentEventUpdateRequest,
LightningCurrentPreparationRequest,
LightningCurrentPreparationResponse,
LightningDistributionEventBrief,
LightningDistributionGridCell,
LightningDistributionImportResponse,
LightningDistributionScatterPoint,
LightningDistributionStatsResponse,
LightningDistributionSummary,
LightningDensityPreparationRequest,
LightningDensityPreparationResponse,
LightningDistributionReportResponse,
LightningCurrentExceedancePoint,
LightningCurrentExceedanceResponse,
@@ -43,9 +49,12 @@ from ..schemas.lightning import (
LightningTowerTerrainComputeResponse,
LightningTowerTerrainMetrics,
)
from .line_preparation_service import record_line_preparation_source, summarize_line_preparation
from .line_service import serialize_line
from .push_service import publish_topic
LIGHTNING_TOPIC = "admin.lightning-currents"
POWER_LINES_TOPIC = "admin.power-lines"
TEXT_ENCODINGS = ("utf-8-sig", "utf-8", "gbk", "latin-1")
MAX_SAMPLES = 2_000_000
INSERT_CHUNK_SIZE = 5_000
@@ -1298,6 +1307,268 @@ def get_peak_exceedance_curve(
return LightningCurrentExceedanceResponse(total_events=total, thresholds=points)
def prepare_line_lightning_current(
db: Session,
payload: LightningCurrentPreparationRequest,
*,
actor_user_id: str,
) -> LightningCurrentPreparationResponse:
line = db.execute(select(Line).where(Line.id == payload.line_id)).scalar_one_or_none()
if not line:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="线路不存在")
towers = db.execute(
select(LineTower)
.where(LineTower.line_id == line.id)
.order_by(LineTower.seq_no.asc(), LineTower.id.asc())
).scalars().all()
if not towers:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="当前线路没有可回填的杆塔数据")
filters: list[Any] = [LightningCurrentEvent.peak_abs_current_ka.is_not(None)]
normalized_region = _normalize_str(payload.region_id)
if normalized_region:
filters.append(LightningCurrentEvent.region_id == normalized_region)
if payload.is_synthetic is not None:
filters.append(LightningCurrentEvent.is_synthetic == payload.is_synthetic)
peaks = [
float(item)
for item in db.execute(select(LightningCurrentEvent.peak_abs_current_ka).where(*filters)).scalars().all()
if item is not None and float(item) > 0
]
if not peaks:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="未找到可用于线路雷电流拟合的幅值样本")
current_a, current_b, warnings = _fit_line_current_parameters(peaks)
now = utcnow()
tower_ids = [tower.id for tower in towers]
existing_profiles = db.execute(select(TowerProfile).where(TowerProfile.tower_id.in_(tower_ids))).scalars().all()
profile_map = {item.tower_id: item for item in existing_profiles}
created_profile_count = 0
for tower in towers:
profile = profile_map.get(tower.id)
if profile is None:
profile = TowerProfile(
tower_id=tower.id,
geometry_layers_json={},
extra_profile_json={},
create_date=now,
create_user=actor_user_id,
update_date=now,
update_user=actor_user_id,
)
db.add(profile)
profile_map[tower.id] = profile
created_profile_count += 1
extra_profile = dict(profile.extra_profile_json or {})
extra_profile["lightning_current_preparation"] = {
"line_id": line.id,
"line_code": line.code,
"current_a": current_a,
"current_b": current_b,
"sampled_event_count": len(peaks),
"region_id": normalized_region,
"is_synthetic": payload.is_synthetic,
"prepared_at": now.isoformat(),
}
profile.current_a = current_a
profile.current_b = current_b
profile.current_type = "line_prepared"
profile.extra_profile_json = extra_profile
profile.update_date = now
profile.update_user = actor_user_id
line_params = dict(line.lightning_param_json or {})
line_params["雷电流幅值a"] = current_a
line_params["雷电流幅值b"] = current_b
line.lightning_param_json = line_params
line.update_date = now
line.update_user = actor_user_id
record_line_preparation_source(
line,
component="lightning_current",
payload={
"prepared_at": now.isoformat(),
"prepared_by_user_id": actor_user_id,
"sampled_event_count": len(peaks),
"region_id": normalized_region,
"is_synthetic": payload.is_synthetic,
"current_a": current_a,
"current_b": current_b,
},
)
db.commit()
preparation_json = summarize_line_preparation(db, line, tower_count=len(towers))
_publish_line_change(
"power-lines.lightning-current.prepared",
{"action": "lightning_current_prepared", "line_id": line.id},
)
return LightningCurrentPreparationResponse(
line=serialize_line(line, tower_count=len(towers), preparation_json=preparation_json),
current_a=current_a,
current_b=current_b,
sampled_event_count=len(peaks),
updated_tower_count=len(towers),
created_profile_count=created_profile_count,
warning_count=len(warnings),
warnings=warnings,
)
def prepare_line_lightning_density(
db: Session,
payload: LightningDensityPreparationRequest,
*,
actor_user_id: str,
) -> LightningDensityPreparationResponse:
line = db.execute(select(Line).where(Line.id == payload.line_id)).scalar_one_or_none()
if not line:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="线路不存在")
towers = db.execute(
select(LineTower)
.where(LineTower.line_id == line.id)
.order_by(LineTower.seq_no.asc(), LineTower.id.asc())
).scalars().all()
if not towers:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="当前线路没有可回填的杆塔数据")
geo_towers = [tower for tower in towers if tower.longitude is not None and tower.latitude is not None]
if not geo_towers:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="当前线路缺少杆塔经纬度,无法计算地闪密度")
lat_delta = payload.radius_km / DEGREE_TO_KM
lon_deltas = [
payload.radius_km / _safe_km_per_lon(float(tower.latitude))
for tower in geo_towers
if tower.latitude is not None
]
lon_delta = max(lon_deltas) if lon_deltas else lat_delta
min_lat = min(float(tower.latitude) for tower in geo_towers) - lat_delta
max_lat = max(float(tower.latitude) for tower in geo_towers) + lat_delta
min_lon = min(float(tower.longitude) for tower in geo_towers) - lon_delta
max_lon = max(float(tower.longitude) for tower in geo_towers) + lon_delta
filters = _build_distribution_filters(
min_lat=min_lat,
max_lat=max_lat,
min_lon=min_lon,
max_lon=max_lon,
region_id=payload.region_id,
city=None,
location_tag=None,
polarity=None,
is_synthetic=payload.is_synthetic,
)
candidate_rows = db.execute(
select(
LightningCurrentEvent.longitude,
LightningCurrentEvent.latitude,
LightningCurrentEvent.event_time,
).where(*filters)
).all()
if not candidate_rows:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="未找到可用于地闪密度计算的雷击分布数据")
now = utcnow()
area_km2 = math.pi * (payload.radius_km ** 2)
updated_tower_count = 0
missing_geo_count = 0
density_values: list[float] = []
all_event_times = [row.event_time for row in candidate_rows if row.event_time is not None]
source_years = payload.years if payload.years is not None else _resolve_data_years_from_timestamps(all_event_times)
warnings: list[str] = []
for tower in towers:
if tower.longitude is None or tower.latitude is None:
missing_geo_count += 1
continue
tower_times: list[datetime] = []
strike_count = 0
for row in candidate_rows:
if row.longitude is None or row.latitude is None:
continue
distance_km = _haversine_km(
float(tower.latitude),
float(tower.longitude),
float(row.latitude),
float(row.longitude),
)
if distance_km > payload.radius_km:
continue
strike_count += 1
if row.event_time is not None:
tower_times.append(row.event_time)
tower_years = payload.years if payload.years is not None else _resolve_data_years_from_timestamps(tower_times)
tower_years = max(tower_years, 1e-6)
density = strike_count / (area_km2 * tower_years) if strike_count > 0 else 0.0
tower.lightning_density = round(density, 6)
tower.update_date = now
tower.update_user = actor_user_id
raw_extra = dict(tower.raw_extra_json or {})
raw_extra["lightning_density"] = {
"line_id": line.id,
"line_code": line.code,
"radius_km": payload.radius_km,
"data_years": round(tower_years, 6),
"strike_count": strike_count,
"region_id": _normalize_str(payload.region_id),
"is_synthetic": payload.is_synthetic,
"prepared_at": now.isoformat(),
}
tower.raw_extra_json = raw_extra
density_values.append(float(tower.lightning_density))
updated_tower_count += 1
if missing_geo_count > 0:
warnings.append(f"{missing_geo_count} 座杆塔缺少经纬度,未能回填地闪密度")
line.update_date = now
line.update_user = actor_user_id
record_line_preparation_source(
line,
component="lightning_density",
payload={
"prepared_at": now.isoformat(),
"prepared_by_user_id": actor_user_id,
"region_id": _normalize_str(payload.region_id),
"is_synthetic": payload.is_synthetic,
"radius_km": payload.radius_km,
"data_years": round(source_years, 6),
"updated_tower_count": updated_tower_count,
"missing_geo_count": missing_geo_count,
"avg_density": round(sum(density_values) / len(density_values), 6) if density_values else None,
"min_density": round(min(density_values), 6) if density_values else None,
"max_density": round(max(density_values), 6) if density_values else None,
},
)
db.commit()
preparation_json = summarize_line_preparation(db, line, tower_count=len(towers))
_publish_line_change(
"power-lines.lightning-density.prepared",
{"action": "lightning_density_prepared", "line_id": line.id},
)
return LightningDensityPreparationResponse(
line=serialize_line(line, tower_count=len(towers), preparation_json=preparation_json),
updated_tower_count=updated_tower_count,
missing_geo_count=missing_geo_count,
radius_km=payload.radius_km,
data_years=round(source_years, 6),
avg_density=round(sum(density_values) / len(density_values), 6) if density_values else None,
min_density=round(min(density_values), 6) if density_values else None,
max_density=round(max(density_values), 6) if density_values else None,
warning_count=len(warnings),
warnings=warnings,
)
def _build_distribution_filters(
*,
min_lat: float | None,
@@ -2085,6 +2356,83 @@ def _parse_float(value: Any) -> float | None:
return None
def _fit_line_current_parameters(values: list[float]) -> tuple[float, float, list[str]]:
cleaned = sorted(float(item) for item in values if item > 0)
if not cleaned:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="雷电流幅值样本为空")
warnings: list[str] = []
unique_values = {round(item, 6) for item in cleaned}
if len(unique_values) == 1:
return round(cleaned[0], 3), 2.6, ["样本幅值单一,已使用默认 b=2.6"]
peak_max = max(cleaned)
peak_min = min(cleaned)
thresholds = [peak_max]
probabilities = [1.0 / len(cleaned)]
current = peak_max
while current > peak_min:
current = round(current - 0.1, 10)
thresholds.append(current)
exceedance_count = sum(1 for item in cleaned if item >= current)
probabilities.append(exceedance_count / len(cleaned))
lower_index: int | None = None
upper_index: int | None = None
best_lower = -1.0
best_upper = 1.0
for index, probability in enumerate(probabilities):
delta = probability - 0.5
if delta < 0:
if delta > best_lower:
best_lower = delta
lower_index = index
else:
if delta < best_upper:
best_upper = delta
upper_index = index
if lower_index is None or upper_index is None or lower_index == upper_index:
current_a = _median(cleaned)
warnings.append("样本分布不足以插值求解 a,已回退到中位数")
else:
upper_probability = probabilities[upper_index]
lower_probability = probabilities[lower_index]
upper_threshold = thresholds[upper_index]
lower_threshold = thresholds[lower_index]
denominator = lower_probability - upper_probability
if abs(denominator) < 1e-9:
current_a = _median(cleaned)
warnings.append("样本概率分布异常,已回退到中位数")
else:
current_a = (0.5 - upper_probability) * (lower_threshold - upper_threshold) / denominator + upper_threshold
exponent_values: list[float] = []
if current_a <= 0:
current_a = _median(cleaned)
warnings.append("样本拟合得到的 a 非法,已回退到中位数")
for threshold, probability in zip(thresholds[:-1], probabilities[:-1], strict=False):
if probability <= 0 or probability >= 1:
continue
if threshold <= current_a:
continue
denominator = math.log(threshold / current_a)
if abs(denominator) < 1e-9:
continue
numerator = math.log(1.0 / probability - 1.0)
exponent = numerator / denominator
if math.isfinite(exponent):
exponent_values.append(exponent)
if exponent_values:
current_b = sum(exponent_values) / len(exponent_values)
else:
current_b = 2.6
warnings.append("样本分布不足以稳定拟合 b,已使用默认 b=2.6")
return round(current_a, 3), round(current_b, 3), warnings
def _normalize_str(value: Any) -> str | None:
if value is None:
return None
@@ -2110,6 +2458,18 @@ def _generate_event_id() -> str:
return f"LC-{now}-{uuid4().hex[:6]}"
def _publish_line_change(event_name: str, payload: dict[str, Any]) -> None:
_fire_and_forget(
publish_topic(
POWER_LINES_TOPIC,
name=event_name,
payload=payload,
requires_refetch=["/api/v1/lines"],
dedupe_key=f"{event_name}:{payload.get('line_id', 'unknown')}",
)
)
def _publish_lightning_change(event_name: str, payload: dict[str, Any]) -> None:
_fire_and_forget(
publish_topic(
@@ -0,0 +1,233 @@
from __future__ import annotations
from typing import Any
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from ..models.line import Line
from ..models.line_tower import LineTower
from ..models.tower_profile import TowerProfile
PREPARATION_LABELS = {
"lightning_current": "雷电流幅值",
"lightning_density": "地闪密度",
"ground_slope": "地面倾角",
}
PREPARATION_SOURCE_KEY = "preparation_sources"
def summarize_line_preparations(
db: Session,
lines: list[Line],
*,
tower_count_map: dict[str, int] | None = None,
) -> dict[str, dict[str, Any]]:
line_ids = [line.id for line in lines]
if not line_ids:
return {}
counts = _load_preparation_count_maps(db, line_ids=line_ids, tower_count_map=tower_count_map)
summaries: dict[str, dict[str, Any]] = {}
for line in lines:
tower_total = counts["tower"].get(line.id, 0)
current_ready = counts["lightning_current"].get(line.id, 0)
density_ready = counts["lightning_density"].get(line.id, 0)
slope_ready = counts["ground_slope"].get(line.id, 0)
summaries[line.id] = _build_summary(
line,
tower_total=tower_total,
current_ready=current_ready,
density_ready=density_ready,
slope_ready=slope_ready,
)
return summaries
def summarize_line_preparation(
db: Session,
line: Line,
*,
tower_count: int | None = None,
) -> dict[str, Any]:
tower_count_map = {line.id: tower_count} if tower_count is not None else None
return summarize_line_preparations(db, [line], tower_count_map=tower_count_map).get(line.id, {})
def record_line_preparation_source(
line: Line,
*,
component: str,
payload: dict[str, Any],
) -> None:
line_params = dict(line.lightning_param_json or {})
sources = _extract_preparation_sources(line_params)
sources[component] = {
**dict(payload),
"component": component,
"label": PREPARATION_LABELS.get(component, component),
}
line_params[PREPARATION_SOURCE_KEY] = sources
line.lightning_param_json = line_params
def _build_summary(
line: Line,
*,
tower_total: int,
current_ready: int,
density_ready: int,
slope_ready: int,
) -> dict[str, Any]:
line_params = dict(line.lightning_param_json or {})
current_a = _coerce_float(line_params.get("雷电流幅值a"))
current_b = _coerce_float(line_params.get("雷电流幅值b"))
sources = _extract_preparation_sources(line_params)
current_summary = _build_component_summary(
component="lightning_current",
tower_total=tower_total,
ready_count=current_ready,
source=sources.get("lightning_current"),
values={
"current_a": current_a,
"current_b": current_b,
},
line_ready=(current_a is not None and current_b is not None),
)
density_summary = _build_component_summary(
component="lightning_density",
tower_total=tower_total,
ready_count=density_ready,
source=sources.get("lightning_density"),
)
slope_summary = _build_component_summary(
component="ground_slope",
tower_total=tower_total,
ready_count=slope_ready,
source=sources.get("ground_slope"),
)
items = [current_summary, density_summary, slope_summary]
missing_items = [str(item["label"]) for item in items if not bool(item["ready"])]
return {
"all_ready": not missing_items,
"missing_items": missing_items,
"lightning_current": current_summary,
"lightning_density": density_summary,
"ground_slope": slope_summary,
}
def _build_component_summary(
*,
component: str,
tower_total: int,
ready_count: int,
source: dict[str, Any] | None,
values: dict[str, Any] | None = None,
line_ready: bool = True,
) -> dict[str, Any]:
ready = tower_total > 0 and ready_count >= tower_total and line_ready
return {
"key": component,
"label": PREPARATION_LABELS.get(component, component),
"ready": ready,
"status": "ready" if ready else "missing",
"tower_total_count": tower_total,
"tower_ready_count": min(ready_count, tower_total),
"missing_tower_count": max(tower_total - ready_count, 0),
"line_ready": line_ready,
"values": dict(values or {}),
"source": dict(source or {}),
}
def _load_preparation_count_maps(
db: Session,
*,
line_ids: list[str],
tower_count_map: dict[str, int] | None,
) -> dict[str, dict[str, int]]:
tower_counts = tower_count_map or _count_towers(db, line_ids)
density_counts = _count_tower_field(
db,
line_ids,
field_name="lightning_density",
)
slope_counts = _count_towers_with_slopes(db, line_ids)
current_counts = _count_profiles_with_currents(db, line_ids)
return {
"tower": tower_counts,
"lightning_current": current_counts,
"lightning_density": density_counts,
"ground_slope": slope_counts,
}
def _count_towers(db: Session, line_ids: list[str]) -> dict[str, int]:
rows = db.execute(
select(LineTower.line_id, func.count())
.where(LineTower.line_id.in_(line_ids))
.group_by(LineTower.line_id)
).all()
return {str(line_id): int(count or 0) for line_id, count in rows}
def _count_tower_field(db: Session, line_ids: list[str], *, field_name: str) -> dict[str, int]:
field = getattr(LineTower, field_name)
rows = db.execute(
select(LineTower.line_id, func.count())
.where(LineTower.line_id.in_(line_ids), field.is_not(None))
.group_by(LineTower.line_id)
).all()
return {str(line_id): int(count or 0) for line_id, count in rows}
def _count_towers_with_slopes(db: Session, line_ids: list[str]) -> dict[str, int]:
rows = db.execute(
select(LineTower.line_id, func.count())
.where(
LineTower.line_id.in_(line_ids),
LineTower.slope_1.is_not(None),
LineTower.slope_2.is_not(None),
)
.group_by(LineTower.line_id)
).all()
return {str(line_id): int(count or 0) for line_id, count in rows}
def _count_profiles_with_currents(db: Session, line_ids: list[str]) -> dict[str, int]:
rows = db.execute(
select(LineTower.line_id, func.count())
.select_from(LineTower)
.join(TowerProfile, TowerProfile.tower_id == LineTower.id)
.where(
LineTower.line_id.in_(line_ids),
TowerProfile.current_a.is_not(None),
TowerProfile.current_b.is_not(None),
)
.group_by(LineTower.line_id)
).all()
return {str(line_id): int(count or 0) for line_id, count in rows}
def _extract_preparation_sources(line_params: dict[str, Any]) -> dict[str, dict[str, Any]]:
raw = line_params.get(PREPARATION_SOURCE_KEY)
if not isinstance(raw, dict):
return {}
normalized: dict[str, dict[str, Any]] = {}
for key, value in raw.items():
if isinstance(key, str) and isinstance(value, dict):
normalized[key] = dict(value)
return normalized
def _coerce_float(value: Any) -> float | None:
if value is None or value == "":
return None
try:
return float(value)
except (TypeError, ValueError):
return None
+28 -5
View File
@@ -29,6 +29,7 @@ from ..schemas.line import (
LineTowerUpdateRequest,
LineUpdateRequest,
)
from .line_preparation_service import summarize_line_preparation, summarize_line_preparations
from .push_service import publish_topic
LINE_TOPIC = "admin.power-lines"
@@ -48,7 +49,12 @@ class CsvImportStats:
self.warnings = []
def serialize_line(line: Line, *, tower_count: int = 0) -> LineSummary:
def serialize_line(
line: Line,
*,
tower_count: int = 0,
preparation_json: dict[str, Any] | None = None,
) -> LineSummary:
return LineSummary(
id=line.id,
code=line.code,
@@ -57,6 +63,7 @@ def serialize_line(line: Line, *, tower_count: int = 0) -> LineSummary:
phase_sequence_json=line.phase_sequence_json or {},
arrester_install_json=line.arrester_install_json or {},
lightning_param_json=line.lightning_param_json or {},
preparation_json=preparation_json or {},
tower_count=tower_count,
create_date=line.create_date,
create_user=line.create_user,
@@ -113,9 +120,17 @@ def list_lines(
items = db.execute(stmt.order_by(Line.update_date.desc(), Line.code.asc())).scalars().all()
line_ids = [item.id for item in items]
tower_count_map = _load_tower_counts(db, line_ids)
preparation_map = summarize_line_preparations(db, items, tower_count_map=tower_count_map)
return LineListResponse(
items=[serialize_line(item, tower_count=tower_count_map.get(item.id, 0)) for item in items],
items=[
serialize_line(
item,
tower_count=tower_count_map.get(item.id, 0),
preparation_json=preparation_map.get(item.id, {}),
)
for item in items
],
total=total,
)
@@ -165,7 +180,7 @@ def create_line(
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to load created line")
_publish_line_change("power-lines.created", {"action": "created", "line_id": saved.id})
return serialize_line(saved, tower_count=0)
return serialize_line(saved, tower_count=0, preparation_json=summarize_line_preparation(db, saved, tower_count=0))
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to generate unique line code")
@@ -203,7 +218,11 @@ def update_line(
tower_count = int(db.scalar(select(func.count()).select_from(LineTower).where(LineTower.line_id == line_id)) or 0)
_publish_line_change("power-lines.updated", {"action": "updated", "line_id": line_id})
return serialize_line(saved, tower_count=tower_count)
return serialize_line(
saved,
tower_count=tower_count,
preparation_json=summarize_line_preparation(db, saved, tower_count=tower_count),
)
def delete_line(db: Session, line_id: str) -> tuple[bool, int]:
@@ -539,7 +558,11 @@ def import_line_towers_from_csv(
)
return LineTowerImportResponse(
line=serialize_line(line, tower_count=tower_count),
line=serialize_line(
line,
tower_count=tower_count,
preparation_json=summarize_line_preparation(db, line, tower_count=tower_count),
),
imported_count=stats.imported_count,
updated_count=stats.updated_count,
skipped_count=stats.skipped_count,
+8 -1
View File
@@ -66,7 +66,12 @@ def test_execute_job_runs_external_adapter_and_backfills_results(monkeypatch, tm
monkeypatch.setattr(fl_analysis_external, "_resolve_engine_workdir", lambda: tmp_path)
monkeypatch.setattr(fl_analysis_external, "_resolve_native_engine_executable", lambda: ("/bin/sh", None))
line = Line(code="L-001", name="示例线路", voltage_kv=220, lightning_param_json={})
line = Line(
code="L-001",
name="示例线路",
voltage_kv=220,
lightning_param_json={"雷电流幅值a": 31.0, "雷电流幅值b": 2.6},
)
session.add(line)
session.flush()
@@ -108,6 +113,8 @@ def test_execute_job_runs_external_adapter_and_backfills_results(monkeypatch, tm
arrester_c="",
shield_wire_height_m=41.0,
insulator_length_m=4200.0,
current_a=31.0,
current_b=2.6,
current_type="Heidler",
current_head_time_us=2.6,
current_tail_time_us=50.0,
+172
View File
@@ -0,0 +1,172 @@
from __future__ import annotations
from types import SimpleNamespace
import pytest
from fastapi import HTTPException
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session, sessionmaker
from app.core.database import Base
from app.models.fl_analysis import FlAnalysisJob
from app.models.lightning_event import LightningCurrentEvent
from app.models.line import Line
from app.models.line_tower import LineTower
from app.models.tower_profile import TowerProfile
from app.schemas.fl_analysis import FlAnalysisJobCreateRequest
from app.schemas.lightning import LightningCurrentPreparationRequest
from app.services import elevation_service, fl_analysis_service, lightning_service
def _build_session() -> Session:
engine = create_engine("sqlite+pysqlite:///:memory:")
Base.metadata.create_all(
bind=engine,
tables=[
Line.__table__,
LineTower.__table__,
TowerProfile.__table__,
LightningCurrentEvent.__table__,
FlAnalysisJob.__table__,
],
)
testing_session = sessionmaker(bind=engine, autocommit=False, autoflush=False, expire_on_commit=False)
return testing_session()
def test_prepare_line_lightning_current_backfills_profiles_and_line_state(monkeypatch) -> None:
session = _build_session()
try:
monkeypatch.setattr(lightning_service, "_publish_line_change", lambda *args, **kwargs: None)
line = Line(code="L-001", name="示例线路", voltage_kv=220, lightning_param_json={})
session.add(line)
session.flush()
session.add_all(
[
LineTower(line_id=line.id, seq_no=1, tower_no="N1", longitude=120.0, latitude=30.0),
LineTower(line_id=line.id, seq_no=2, tower_no="N2", longitude=120.001, latitude=30.001),
]
)
session.add_all(
[
LightningCurrentEvent(
event_id="LC-001",
peak_abs_current_ka=18.0,
peak_current_ka=18.0,
polarity="negative",
sample_count=1,
stroke_count=1,
stroke_peaks_json=[],
region_id="HB",
),
LightningCurrentEvent(
event_id="LC-002",
peak_abs_current_ka=32.0,
peak_current_ka=32.0,
polarity="negative",
sample_count=1,
stroke_count=1,
stroke_peaks_json=[],
region_id="HB",
),
LightningCurrentEvent(
event_id="LC-003",
peak_abs_current_ka=46.0,
peak_current_ka=46.0,
polarity="negative",
sample_count=1,
stroke_count=1,
stroke_peaks_json=[],
region_id="HB",
),
]
)
session.commit()
response = lightning_service.prepare_line_lightning_current(
session,
LightningCurrentPreparationRequest(line_id=line.id, region_id="HB"),
actor_user_id="tester",
)
profiles = session.execute(select(TowerProfile).order_by(TowerProfile.tower_id.asc())).scalars().all()
assert len(profiles) == 2
assert all(profile.current_a == response.current_a for profile in profiles)
assert all(profile.current_b == response.current_b for profile in profiles)
assert response.created_profile_count == 2
assert response.line.preparation_json["lightning_current"]["ready"] is True
assert session.get(Line, line.id).lightning_param_json["雷电流幅值a"] == response.current_a
assert session.get(Line, line.id).lightning_param_json["雷电流幅值b"] == response.current_b
finally:
session.close()
def test_fl_analysis_create_job_rejects_unprepared_line(monkeypatch) -> None:
session = _build_session()
try:
monkeypatch.setattr(fl_analysis_service, "_publish_change", lambda *args, **kwargs: None)
line = Line(code="L-002", name="待分析线路", voltage_kv=500, lightning_param_json={})
session.add(line)
session.flush()
session.add(LineTower(line_id=line.id, seq_no=1, tower_no="T1", longitude=120.0, latitude=30.0))
session.commit()
with pytest.raises(HTTPException) as captured:
fl_analysis_service.create_job(
session,
FlAnalysisJobCreateRequest(line_id=line.id, job_type="normal"),
actor=SimpleNamespace(id="tester"),
)
assert captured.value.status_code == 400
assert "雷电流幅值" in str(captured.value.detail)
assert "地闪密度" in str(captured.value.detail)
assert "地面倾角" in str(captured.value.detail)
finally:
session.close()
def test_apply_points_to_line_towers_computes_ground_slopes() -> None:
session = _build_session()
try:
line = Line(code="L-003", name="高程线路", voltage_kv=110, lightning_param_json={})
session.add(line)
session.flush()
meter_to_lat = 1 / 111_320.0
session.add_all(
[
LineTower(line_id=line.id, seq_no=1, tower_no="P1", longitude=120.0, latitude=30.0 + 300 * meter_to_lat),
LineTower(line_id=line.id, seq_no=2, tower_no="P2", longitude=120.0, latitude=30.0 + 600 * meter_to_lat),
LineTower(line_id=line.id, seq_no=3, tower_no="P3", longitude=120.0, latitude=30.0 + 900 * meter_to_lat),
]
)
session.commit()
points = [
elevation_service.ElevationSamplePoint(
lon=120.0,
lat=30.0 + distance_m * meter_to_lat,
altitude_m=100.0 + distance_m * 0.12,
)
for distance_m in range(0, 1251, 50)
]
stats = elevation_service._apply_points_to_line_towers(
session,
line_id=line.id,
dataset=SimpleNamespace(id="ds-1", code="DEM-001"),
mode="overwrite_all",
points=points,
)
towers = session.execute(select(LineTower).where(LineTower.line_id == line.id).order_by(LineTower.seq_no.asc())).scalars().all()
assert stats["updated_tower_count"] == 3
assert all(tower.altitude_m is not None for tower in towers)
assert towers[1].slope_1 is not None
assert towers[1].slope_2 is not None
finally:
session.close()
+2 -1
View File
@@ -8,13 +8,14 @@ from sqlalchemy.orm import Session, sessionmaker
from app.core.database import Base
from app.models.line import Line
from app.models.line_tower import LineTower
from app.models.tower_profile import TowerProfile
from app.schemas.line import LineCreateRequest
from app.services import line_service
def _build_session() -> Session:
engine = create_engine("sqlite+pysqlite:///:memory:")
Base.metadata.create_all(bind=engine, tables=[Line.__table__, LineTower.__table__])
Base.metadata.create_all(bind=engine, tables=[Line.__table__, LineTower.__table__, TowerProfile.__table__])
testing_session = sessionmaker(bind=engine, autocommit=False, autoflush=False, expire_on_commit=False)
return testing_session()
+31 -3
View File
@@ -29,6 +29,7 @@ import { ElevationPreviewCesiumMap } from "@/components/elevation-preview-cesium
import { Card } from "@/components/ui-antd";
import { useTopicSubscription } from "@/hooks/use-topic-subscription";
import { getApiBaseUrl, readApiError } from "@/lib/api";
import { readLinePreparation } from "@/lib/line-preparation";
import type {
ElevationApplyJobCreateResponse,
ElevationApplyJobListResponse,
@@ -156,6 +157,7 @@ export default function AdminElevationPage() {
const [datasetForm] = Form.useForm<DatasetFormValues>();
const [applyForm] = Form.useForm<ApplyFormValues>();
const selectedApplyLineId = Form.useWatch("line_id", applyForm);
const canRead = hasPermission("elevation.read") || hasPermission("elevation.manage");
const canManage = hasPermission("elevation.manage");
@@ -370,7 +372,7 @@ export default function AdminElevationPage() {
},
onSuccess: async () => {
setError("");
messageApi.success("高程回填任务已提交");
messageApi.success("高程与地面倾角回填任务已提交");
setApplyModalOpen(false);
applyForm.resetFields();
await refreshElevationData();
@@ -443,9 +445,9 @@ export default function AdminElevationPage() {
staleTime: 0,
});
const datasets = datasetsQuery.data?.items ?? [];
const datasets = useMemo(() => datasetsQuery.data?.items ?? [], [datasetsQuery.data?.items]);
const jobs = jobsQuery.data?.items ?? [];
const lines = linesQuery.data?.items ?? [];
const lines = useMemo(() => linesQuery.data?.items ?? [], [linesQuery.data?.items]);
const lineOptions = useMemo(
() =>
@@ -455,6 +457,11 @@ export default function AdminElevationPage() {
})),
[lines],
);
const selectedApplyLine = useMemo(
() => lines.find((item) => item.id === selectedApplyLineId) ?? null,
[lines, selectedApplyLineId],
);
const selectedApplyPreparation = useMemo(() => readLinePreparation(selectedApplyLine), [selectedApplyLine]);
const datasetOptions = useMemo(
() =>
datasets
@@ -1123,6 +1130,27 @@ export default function AdminElevationPage() {
<Form.Item name="line_id" label="线路" rules={[{ required: true, message: "请选择线路" }]}>
<Select showSearch options={lineOptions} optionFilterProp="label" placeholder="选择线路" />
</Form.Item>
{selectedApplyLine ? (
<Alert
type={selectedApplyPreparation.ground_slope.ready ? "success" : "info"}
showIcon
className="mb-4"
message={selectedApplyPreparation.ground_slope.ready ? "该线路已具备地面倾角准备记录" : "本次任务会同时补高程与地面倾角"}
description={
<Space size={[8, 8]} wrap>
{[
selectedApplyPreparation.lightning_current,
selectedApplyPreparation.lightning_density,
selectedApplyPreparation.ground_slope,
].map((item) => (
<Tag key={item.key} color={item.ready ? "green" : "red"}>
{`${item.label} ${item.tower_ready_count}/${item.tower_total_count}`}
</Tag>
))}
</Space>
}
/>
) : null}
<Form.Item name="dataset_id" label="高程数据集" rules={[{ required: true, message: "请选择高程数据集" }]}>
<Select showSearch options={datasetOptions} optionFilterProp="label" placeholder="选择高程数据集" />
</Form.Item>
+43 -1
View File
@@ -25,6 +25,7 @@ import type { ColumnsType } from "antd/es/table";
import { useAuth } from "@/components/auth-provider";
import { Card } from "@/components/ui-antd";
import { readApiError } from "@/lib/api";
import { readLinePreparation } from "@/lib/line-preparation";
import type {
AtpEngineStatusResponse,
AtpModelListResponse,
@@ -281,6 +282,10 @@ function readOptionalNumber(record: Record<string, unknown>, key: string): numbe
return typeof value === "number" ? value : null;
}
function preparationColor(ready: boolean): string {
return ready ? "green" : "red";
}
function readDownloadFilename(headerValue: string | null, fallback: string): string {
if (!headerValue) {
return fallback;
@@ -1064,6 +1069,7 @@ export default function AdminFlAnalysisPage() {
const selectedLine = useMemo(() => {
return linesQuery.data?.items.find((item) => item.id === selectedLineId) ?? null;
}, [linesQuery.data?.items, selectedLineId]);
const selectedLinePreparation = useMemo(() => readLinePreparation(selectedLine), [selectedLine]);
const externalAdapterActive = selectedExternalAdapter === "atp" || selectedExternalAdapter === "wine";
const engineMode = engineQuery.data?.mode;
const adapterOptions = [
@@ -1192,12 +1198,48 @@ export default function AdminFlAnalysisPage() {
/>
</Form.Item>
<Form.Item label=" ">
<Button type="primary" htmlType="submit" loading={createJobMutation.isPending} className="w-full">
<Button
type="primary"
htmlType="submit"
loading={createJobMutation.isPending}
disabled={!selectedLine || !selectedLinePreparation.all_ready}
className="w-full"
>
{formatJobType(selectedCreateJobType)}
</Button>
</Form.Item>
</div>
{selectedLine ? (
<Alert
type={selectedLinePreparation.all_ready ? "success" : "warning"}
showIcon
message={selectedLinePreparation.all_ready ? "参数准备已完成" : `当前线路缺少:${selectedLinePreparation.missing_items.join("、")}`}
description={
<Space size={[8, 8]} wrap>
{[
selectedLinePreparation.lightning_current,
selectedLinePreparation.lightning_density,
selectedLinePreparation.ground_slope,
].map((item) => {
const source = readObject(item.source);
const preparedAt = readOptionalString(source, "prepared_at");
const currentA = readOptionalNumber(readObject(item.values), "current_a");
const currentB = readOptionalNumber(readObject(item.values), "current_b");
const suffix = item.key === "lightning_current" && currentA !== null && currentB !== null
? ` (${currentA.toFixed(3)} / ${currentB.toFixed(3)})`
: "";
return (
<Tag key={item.key} color={preparationColor(item.ready)}>
{`${item.label}${suffix} ${item.tower_ready_count}/${item.tower_total_count}${preparedAt ? ` @ ${formatDateTime(preparedAt)}` : ""}`}
</Tag>
);
})}
</Space>
}
/>
) : null}
{selectedCreateJobType === "normal" || selectedCreateJobType === "tongtiao" ? (
<>
<Alert
+195 -25
View File
@@ -20,20 +20,25 @@ import {
Typography,
} from "antd";
import type { ColumnsType } from "antd/es/table";
import { useCallback, useEffect, useMemo, useRef, useState, type CSSProperties } from "react";
import { useCallback, useMemo, useRef, useState, type CSSProperties } from "react";
import { useAuth } from "@/components/auth-provider";
import { LightningDistributionMap } from "@/components/lightning-distribution-map";
import { Card } from "@/components/ui-antd";
import { useTopicSubscription } from "@/hooks/use-topic-subscription";
import { readApiError } from "@/lib/api";
import { readLinePreparation } from "@/lib/line-preparation";
import type {
LineListResponse,
LineSummary,
LightningCurrentEventListResponse,
LightningCurrentEventSummary,
LightningCurrentExceedanceResponse,
LightningCurrentImportResponse,
LightningCurrentPreparationResponse,
LightningCurrentSampleListResponse,
LightningCurrentSampleItem,
LightningDensityPreparationResponse,
LightningDistributionImportResponse,
LightningDistributionReportResponse,
LightningDistributionStatsResponse,
@@ -174,6 +179,9 @@ export default function AdminLightningCurrentsPage() {
const [towerBufferValues, setTowerBufferValues] = useState<TowerBufferFormValues>(INITIAL_TOWER_BUFFER_VALUES);
const [reportPeriod, setReportPeriod] = useState<"week" | "month">("week");
const [selectedEventId, setSelectedEventId] = useState<string | null>(null);
const [selectedLineId, setSelectedLineId] = useState("");
const [prepareDensityRadiusKm, setPrepareDensityRadiusKm] = useState(3);
const [prepareDensityYears, setPrepareDensityYears] = useState<number | null>(null);
const [error, setError] = useState("");
const [success, setSuccess] = useState("");
@@ -200,13 +208,6 @@ export default function AdminLightningCurrentsPage() {
return `/api/v1/lightning-currents?${params.toString()}`;
}, [keyword, regionFilter, polarityFilter, syntheticFilter]);
const samplePath = useMemo(() => {
if (!selectedEventId) {
return "";
}
return `/api/v1/lightning-currents/${selectedEventId}/samples?limit=200&offset=0`;
}, [selectedEventId]);
const exceedancePath = useMemo(() => {
const params = new URLSearchParams();
if (regionFilter.trim()) {
@@ -280,6 +281,18 @@ export default function AdminLightningCurrentsPage() {
if (syntheticFilter !== "all") params.set("is_synthetic", syntheticFilter);
return `/api/v1/lightning-currents/reports/distribution?${params.toString()}`;
}, [distributionFilters, keyword, regionFilter, reportPeriod, syntheticFilter]);
const linesQuery = useQuery({
queryKey: ["/api/v1/lines"],
enabled: !!user && canRead,
queryFn: async () => {
const response = await fetchWithAuth("/api/v1/lines");
if (!response.ok) {
throw new Error(await readApiError(response));
}
return (await response.json()) as LineListResponse;
},
});
const activeSelectedLineId = selectedLineId || linesQuery.data?.items[0]?.id || "";
const eventsQuery = useQuery({
queryKey: [eventListPath],
@@ -292,10 +305,20 @@ export default function AdminLightningCurrentsPage() {
return (await response.json()) as LightningCurrentEventListResponse;
},
});
const events = useMemo(() => eventsQuery.data?.items ?? [], [eventsQuery.data?.items]);
const activeSelectedEventId = selectedEventId && events.some((item) => item.id === selectedEventId)
? selectedEventId
: (events[0]?.id ?? null);
const samplePath = useMemo(() => {
if (!activeSelectedEventId) {
return "";
}
return `/api/v1/lightning-currents/${activeSelectedEventId}/samples?limit=200&offset=0`;
}, [activeSelectedEventId]);
const samplesQuery = useQuery({
queryKey: [samplePath],
enabled: !!user && canRead && !isDistributionOnly && !!selectedEventId,
enabled: !!user && canRead && !isDistributionOnly && !!activeSelectedEventId,
queryFn: async () => {
if (!samplePath) {
return { items: [], total: 0, limit: 0, offset: 0 } satisfies LightningCurrentSampleListResponse;
@@ -378,7 +401,10 @@ export default function AdminLightningCurrentsPage() {
predicate: (query) =>
Array.isArray(query.queryKey)
&& typeof query.queryKey[0] === "string"
&& query.queryKey[0].startsWith("/api/v1/lightning-currents"),
&& (
query.queryKey[0].startsWith("/api/v1/lightning-currents")
|| query.queryKey[0].startsWith("/api/v1/lines")
),
});
}, [queryClient]);
@@ -389,11 +415,10 @@ export default function AdminLightningCurrentsPage() {
}, [refreshAll]),
);
const events = eventsQuery.data?.items ?? [];
const samples = samplesQuery.data?.items ?? [];
const exceedance = exceedanceQuery.data?.thresholds ?? [];
const distributionStats = distributionStatsQuery.data;
const distributionGridCells = distributionStats?.grid_cells ?? [];
const distributionGridCells = useMemo(() => distributionStats?.grid_cells ?? [], [distributionStats?.grid_cells]);
const distributionScatterPoints = distributionStats?.scatter_points ?? [];
const distributionPCurve = distributionStats?.p_curve ?? [];
const towerBufferStats = towerBufferQuery.data;
@@ -401,19 +426,14 @@ export default function AdminLightningCurrentsPage() {
const syntheticCompare = syntheticCompareQuery.data;
const distributionReport = reportQuery.data;
const selectedEvent = useMemo(
() => events.find((item) => item.id === selectedEventId) ?? null,
[events, selectedEventId],
() => events.find((item) => item.id === activeSelectedEventId) ?? null,
[activeSelectedEventId, events],
);
useEffect(() => {
if (!selectedEventId && events.length > 0) {
setSelectedEventId(events[0].id);
return;
}
if (selectedEventId && !events.some((item) => item.id === selectedEventId)) {
setSelectedEventId(events.length > 0 ? events[0].id : null);
}
}, [events, selectedEventId]);
const selectedLine = useMemo(
() => linesQuery.data?.items.find((item) => item.id === activeSelectedLineId) ?? null,
[activeSelectedLineId, linesQuery.data?.items],
);
const selectedLinePreparation = useMemo(() => readLinePreparation(selectedLine), [selectedLine]);
const importMutation = useMutation({
mutationFn: async (file: File) => {
@@ -519,6 +539,70 @@ export default function AdminLightningCurrentsPage() {
},
});
const prepareCurrentMutation = useMutation({
mutationFn: async () => {
if (!activeSelectedLineId) {
throw new Error("请选择线路");
}
const response = await fetchWithAuth("/api/v1/lightning-currents/prepare-current", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
line_id: activeSelectedLineId,
region_id: regionFilter.trim() || null,
is_synthetic: syntheticFilter === "all" ? null : syntheticFilter === "true",
}),
});
if (!response.ok) {
throw new Error(await readApiError(response));
}
return (await response.json()) as LightningCurrentPreparationResponse;
},
onSuccess: async (payload) => {
setError("");
setSuccess(`已为 ${payload.line.name || payload.line.code} 回填雷电流幅值 a/b = ${payload.current_a} / ${payload.current_b}`);
await refreshAll();
},
onError: (candidate) => {
setSuccess("");
setError(candidate instanceof Error ? candidate.message : "线路雷电流回填失败");
},
});
const prepareDensityMutation = useMutation({
mutationFn: async () => {
if (!activeSelectedLineId) {
throw new Error("请选择线路");
}
const response = await fetchWithAuth("/api/v1/lightning-currents/prepare-density", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
line_id: activeSelectedLineId,
region_id: regionFilter.trim() || null,
is_synthetic: syntheticFilter === "all" ? null : syntheticFilter === "true",
radius_km: prepareDensityRadiusKm,
years: prepareDensityYears,
}),
});
if (!response.ok) {
throw new Error(await readApiError(response));
}
return (await response.json()) as LightningDensityPreparationResponse;
},
onSuccess: async (payload) => {
setError("");
setSuccess(
`已为 ${payload.line.name || payload.line.code} 回填地闪密度,平均值 ${formatNumber(payload.avg_density, 6)} Ng/km²·年`,
);
await refreshAll();
},
onError: (candidate) => {
setSuccess("");
setError(candidate instanceof Error ? candidate.message : "线路地闪密度回填失败");
},
});
const eventColumns = useMemo<ColumnsType<LightningCurrentEventSummary>>(
() => [
{
@@ -742,6 +826,92 @@ export default function AdminLightningCurrentsPage() {
)}
{success && <Alert type="success" showIcon message="操作成功" description={success} />}
<Card title="线路参数准备">
<Space direction="vertical" size={12} className="w-full">
<Typography.Text type="secondary">
线使
</Typography.Text>
<div className="grid gap-3 md:grid-cols-2 xl:grid-cols-4">
<Select
showSearch
optionFilterProp="label"
value={activeSelectedLineId || undefined}
onChange={setSelectedLineId}
placeholder="选择线路"
loading={linesQuery.isLoading}
options={(linesQuery.data?.items ?? []).map((item: LineSummary) => ({
value: item.id,
label: `${item.name || item.code} / ${item.code}`,
}))}
/>
<InputNumber
min={0.05}
max={50}
step={0.5}
value={prepareDensityRadiusKm}
onChange={(value) => setPrepareDensityRadiusKm(typeof value === "number" ? value : 3)}
addonAfter="km"
placeholder="密度半径"
className="w-full"
/>
<InputNumber
min={0.1}
max={100}
step={0.5}
value={prepareDensityYears}
onChange={(value) => setPrepareDensityYears(typeof value === "number" ? value : null)}
addonAfter="年"
placeholder="数据年限(可选)"
className="w-full"
/>
<Space.Compact block>
<Button
type="primary"
onClick={() => prepareCurrentMutation.mutate()}
loading={prepareCurrentMutation.isPending}
disabled={!canManage || !activeSelectedLineId}
>
</Button>
<Button
onClick={() => prepareDensityMutation.mutate()}
loading={prepareDensityMutation.isPending}
disabled={!canManage || !activeSelectedLineId}
>
</Button>
</Space.Compact>
</div>
{selectedLine ? (
<Alert
type={selectedLinePreparation.all_ready ? "success" : "warning"}
showIcon
message={selectedLinePreparation.all_ready ? "当前线路准备已齐备" : `缺少:${selectedLinePreparation.missing_items.join("、")}`}
description={
<Space size={[8, 8]} wrap>
{[
selectedLinePreparation.lightning_current,
selectedLinePreparation.lightning_density,
selectedLinePreparation.ground_slope,
].map((item) => {
const source = item.source;
const preparedAt = typeof source.prepared_at === "string" ? source.prepared_at : null;
const values = item.values;
const currentA = typeof values.current_a === "number" ? values.current_a : null;
const currentB = typeof values.current_b === "number" ? values.current_b : null;
return (
<Tag key={item.key} color={item.ready ? "green" : "red"}>
{`${item.label}${currentA !== null && currentB !== null ? ` (${formatNumber(currentA, 3)} / ${formatNumber(currentB, 3)})` : ""} ${item.tower_ready_count}/${item.tower_total_count}${preparedAt ? ` @ ${new Date(preparedAt).toLocaleString("zh-CN", { hour12: false })}` : ""}`}
</Tag>
);
})}
</Space>
}
/>
) : null}
</Space>
</Card>
{!isDistributionOnly && (
<Card title="雷电幅值统计导入与事件管理">
<Space direction="vertical" size={12} className="w-full">
@@ -1109,7 +1279,7 @@ export default function AdminLightningCurrentsPage() {
loading={eventsQuery.isFetching}
pagination={false}
scroll={{ x: 1700 }}
rowClassName={(row) => (row.id === selectedEventId ? "fquiz-row-selected" : "")}
rowClassName={(row) => (row.id === activeSelectedEventId ? "fquiz-row-selected" : "")}
onRow={(row) => ({
onClick: () => setSelectedEventId(row.id),
})}
+95
View File
@@ -0,0 +1,95 @@
import type { LineSummary } from "@/types/auth";
export type LinePreparationComponentKey = "lightning_current" | "lightning_density" | "ground_slope";
export type LinePreparationComponent = {
key: LinePreparationComponentKey;
label: string;
ready: boolean;
status: string;
tower_total_count: number;
tower_ready_count: number;
missing_tower_count: number;
line_ready: boolean;
values: Record<string, unknown>;
source: Record<string, unknown>;
};
export type LinePreparationSummary = {
all_ready: boolean;
missing_items: string[];
lightning_current: LinePreparationComponent;
lightning_density: LinePreparationComponent;
ground_slope: LinePreparationComponent;
};
const LABELS: Record<LinePreparationComponentKey, string> = {
lightning_current: "雷电流幅值",
lightning_density: "地闪密度",
ground_slope: "地面倾角",
};
function readObject(value: unknown): Record<string, unknown> {
if (value && typeof value === "object" && !Array.isArray(value)) {
return value as Record<string, unknown>;
}
return {};
}
function readBoolean(value: unknown): boolean {
return value === true;
}
function readNumber(value: unknown): number {
return typeof value === "number" && Number.isFinite(value) ? value : 0;
}
function readStringArray(value: unknown): string[] {
if (!Array.isArray(value)) {
return [];
}
return value.filter((item): item is string => typeof item === "string" && item.trim().length > 0);
}
function emptyComponent(key: LinePreparationComponentKey): LinePreparationComponent {
return {
key,
label: LABELS[key],
ready: false,
status: "missing",
tower_total_count: 0,
tower_ready_count: 0,
missing_tower_count: 0,
line_ready: false,
values: {},
source: {},
};
}
function readComponent(value: unknown, key: LinePreparationComponentKey): LinePreparationComponent {
const record = readObject(value);
const fallback = emptyComponent(key);
return {
key,
label: typeof record.label === "string" ? record.label : fallback.label,
ready: readBoolean(record.ready),
status: typeof record.status === "string" ? record.status : fallback.status,
tower_total_count: readNumber(record.tower_total_count),
tower_ready_count: readNumber(record.tower_ready_count),
missing_tower_count: readNumber(record.missing_tower_count),
line_ready: typeof record.line_ready === "boolean" ? record.line_ready : fallback.line_ready,
values: readObject(record.values),
source: readObject(record.source),
};
}
export function readLinePreparation(line: Pick<LineSummary, "preparation_json"> | null | undefined): LinePreparationSummary {
const record = readObject(line?.preparation_json);
return {
all_ready: readBoolean(record.all_ready),
missing_items: readStringArray(record.missing_items),
lightning_current: readComponent(record.lightning_current, "lightning_current"),
lightning_density: readComponent(record.lightning_density, "lightning_density"),
ground_slope: readComponent(record.ground_slope, "ground_slope"),
};
}
+25
View File
@@ -521,6 +521,7 @@ export type LineSummary = {
phase_sequence_json: Record<string, unknown>;
arrester_install_json: Record<string, unknown>;
lightning_param_json: Record<string, unknown>;
preparation_json: Record<string, unknown>;
tower_count: number;
create_date: string;
create_user: string | null;
@@ -878,6 +879,30 @@ export type LightningCurrentImportResponse = {
warnings: string[];
};
export type LightningCurrentPreparationResponse = {
line: LineSummary;
current_a: number;
current_b: number;
sampled_event_count: number;
updated_tower_count: number;
created_profile_count: number;
warning_count: number;
warnings: string[];
};
export type LightningDensityPreparationResponse = {
line: LineSummary;
updated_tower_count: number;
missing_geo_count: number;
radius_km: number;
data_years: number;
avg_density: number | null;
min_density: number | null;
max_density: number | null;
warning_count: number;
warnings: string[];
};
export type LightningCurrentSampleItem = {
id: number;
event_ref_id: string;