修复worker监控页不显示与自动注册异常

Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
2026-05-03 13:16:24 +08:00
parent 7b79e12254
commit d7f05e36bf
4 changed files with 60 additions and 4 deletions
+29 -2
View File
@@ -1,5 +1,7 @@
from __future__ import annotations
import re
from celery import signals
from ..services.worker_registry_service import mark_worker_offline, register_worker
@@ -8,8 +10,33 @@ from ..services.worker_registry_service import mark_worker_offline, register_wor
def _normalize_worker_name(sender: object | None) -> str:
if sender is None:
return ""
text = str(sender).strip()
return text
candidates = (
getattr(sender, "hostname", None),
getattr(getattr(sender, "eventer", None), "hostname", None),
getattr(getattr(sender, "consumer", None), "hostname", None),
sender,
)
for candidate in candidates:
normalized = _clean_worker_name(candidate)
if normalized:
return normalized
return ""
def _clean_worker_name(value: object | None) -> str:
if value is None:
return ""
text = str(value).strip()
if not text:
return ""
if "@" in text and not text.startswith("<"):
return text
if text.startswith("<") and "object at 0x" in text:
return ""
match = re.search(r"([A-Za-z0-9_.-]+@[A-Za-z0-9_.:-]+)", text)
if match:
return match.group(1).strip()
return ""
def _extract_queue_names(sender: object | None) -> list[str]:
@@ -25,6 +25,9 @@ def build_workers_overview(*, force_refresh: bool) -> FlowerWorkersOverviewRespo
refresh = "true" if force_refresh else "false"
workers_map = _call_flower_json(f"/api/workers?refresh={refresh}")
status_map = _call_flower_json("/api/workers?status=true")
if not force_refresh and isinstance(workers_map, dict) and not workers_map and isinstance(status_map, dict) and status_map:
# Flower may return an empty workers cache before a refresh; self-heal once.
workers_map = _call_flower_json("/api/workers?refresh=true")
if not isinstance(workers_map, dict):
workers_map = {}
if not isinstance(status_map, dict):