diff --git a/.env.example b/.env.example index 25b39a2..a49510b 100644 --- a/.env.example +++ b/.env.example @@ -35,6 +35,16 @@ CELERY_TIMEZONE=Asia/Shanghai CELERY_LOG_LEVEL=INFO CELERY_WORKER_CONCURRENCY=2 SCHEDULER_EXPIRE_INTERVAL_SECONDS=60 +SCHEDULER_API_BASE_URL=http://scheduler:19100 +SCHEDULER_API_TOKEN= +SCHEDULER_DEFAULT_QUEUE=default +FLOWER_API_BASE_URL=http://flower:5555 +FLOWER_API_TIMEOUT_SECONDS=10 +FLOWER_BASIC_AUTH=admin:admin +WORKER_REGISTRY_TTL_SECONDS=90 +CELERY_WORKER_QUEUES=default,celery +SCHEDULER_PORT=19100 +FLOWER_PORT=5555 WINE_BINARY_PATH=wine WINE_ALLOWED_ROOT=./data/wine WINE_DEFAULT_TIMEOUT_SECONDS=300 diff --git a/MEMORY.md b/MEMORY.md index 598fe43..bb2ea27 100644 --- a/MEMORY.md +++ b/MEMORY.md @@ -125,6 +125,32 @@ - 运行时真实 Provider Key 不从数据库反解,统一从环境变量 `LLM_PROVIDER_API_KEYS` 注入(支持 `openai=sk-...` 或 JSON 字典字符串)。 - 一期模型调用采用非流式 OpenAI-compatible `POST /chat/completions`,后续如需流式再扩展 SSE/WS。 +## Celery 监控口径(2026-05-01) + +- Celery 监控采用“双页面”: + - `Worker监控`:`/admin/workers`(聚焦 worker 在线状态、并发、队列、单 worker 任务快照) + - `任务监控`:`/admin/task-monitor`(聚焦全局任务状态分布、队列积压、任务明细) +- Worker 监控 API 入口: + - `GET /api/v1/admin/workers/overview` + - `GET /api/v1/admin/workers/tasks?worker=...&recent_limit=...` +- 两页统一复用权限码:`celery.read` / `celery.manage`。 + +## 调度与监控口径(2026-05-01) + +- 调度能力拆分为独立 `scheduler` 服务: + - scheduler 入口:`/api/v1/scheduler/*` + - 任务入队:`POST /api/v1/scheduler/v1/tasks/enqueue` + - 任务撤销:`POST /api/v1/scheduler/v1/tasks/revoke` + - 通过 `x-scheduler-token`(`SCHEDULER_API_TOKEN`)做可选鉴权。 +- 监控能力统一走 Flower 代理: + - 后端代理入口: + - `GET /api/v1/admin/flower/workers` + - `GET /api/v1/admin/flower/worker-tasks` + - 前端 `Worker监控` 与 `任务监控` 页面都通过后端代理读取 Flower 数据,不直接连 Flower。 +- Worker 自动注册机制: + - Celery worker 启停/心跳通过 signals 写入 `worker_registry` 表。 + - Beat 定时任务 `app.tasks.worker_registry_tasks.sweep_worker_registry_offline` 负责离线兜底标记。 + ## 前端 Radix 全量化口径(2026-04-18) - `web/src/app/**` 已完成“去语义类 + 组件全量 Radix 化”:不再依赖 `surface-card` / `btn-*` / `control` / 
`table-*` / `notice*` / `text-muted` 等自定义语义类。 diff --git a/api/app/api/router.py b/api/app/api/router.py index c9c6965..08b36ec 100644 --- a/api/app/api/router.py +++ b/api/app/api/router.py @@ -5,9 +5,11 @@ from .v1.admin_files import router as admin_files_router from .v1.atp_models import router as atp_models_router from .v1.auth import router as auth_router from .v1.elevation import router as elevation_router +from .v1.flower_monitor import router as flower_monitor_router from .v1.lightning import router as lightning_router from .v1.lines import router as lines_router from .v1.question_bank import router as question_bank_router +from .v1.scheduler import router as scheduler_router from .v1.system_params import router as system_params_router from .v1.task_monitor import router as task_monitor_router from .v1.users import router as users_router @@ -23,12 +25,15 @@ v1_router.include_router(atp_models_router) v1_router.include_router(task_monitor_router) v1_router.include_router(system_params_router) v1_router.include_router(elevation_router) +v1_router.include_router(flower_monitor_router) v1_router.include_router(lightning_router) v1_router.include_router(lines_router) v1_router.include_router(question_bank_router) +v1_router.include_router(scheduler_router) v1_router.include_router(wine_router) v1_router.include_router(ws_router) + @v1_router.get("/ping") def ping() -> dict[str, str]: return {"message": "pong"} diff --git a/api/app/api/v1/elevation.py b/api/app/api/v1/elevation.py index 0f40bf9..a50d50b 100644 --- a/api/app/api/v1/elevation.py +++ b/api/app/api/v1/elevation.py @@ -111,7 +111,13 @@ def get_elevation_job_detail( @router.post("/jobs/apply-line", response_model=ElevationApplyJobCreateResponse) def create_elevation_apply_line_job( payload: ElevationApplyJobCreateRequest, + dispatch_mode: str = Query(default="celery_direct", alias="dispatchMode"), current_user: CurrentUser = Depends(require_permission("elevation.manage")), db: Session = 
Depends(get_db), ) -> ElevationApplyJobCreateResponse: - return create_apply_job(db, payload, actor=current_user.user) + return create_apply_job( + db, + payload, + actor=current_user.user, + dispatch_mode=dispatch_mode, + ) diff --git a/api/app/api/v1/flower_monitor.py b/api/app/api/v1/flower_monitor.py new file mode 100644 index 0000000..3e952a8 --- /dev/null +++ b/api/app/api/v1/flower_monitor.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from fastapi import APIRouter, Depends, Query + +from ...core.dependencies import CurrentUser, require_any_permission +from ...schemas.flower_monitor import ( + FlowerWorkerTaskOverviewResponse, + FlowerWorkersOverviewResponse, +) +from ...services.flower_monitor_service import ( + build_worker_task_overview, + build_workers_overview, +) + +router = APIRouter(prefix="/admin/flower", tags=["admin-flower"]) + + +@router.get("/workers", response_model=FlowerWorkersOverviewResponse) +def get_flower_workers_overview( + force_refresh: bool = Query(default=False, alias="forceRefresh"), + _: CurrentUser = Depends(require_any_permission("celery.read", "celery.manage")), +) -> FlowerWorkersOverviewResponse: + return build_workers_overview(force_refresh=force_refresh) + + +@router.get("/worker-tasks", response_model=FlowerWorkerTaskOverviewResponse) +def get_flower_worker_tasks( + worker: str = Query(min_length=1, max_length=255), + force_refresh: bool = Query(default=False, alias="forceRefresh"), + recent_limit: int = Query(default=100, alias="recentLimit", ge=1, le=200), + _: CurrentUser = Depends(require_any_permission("celery.read", "celery.manage")), +) -> FlowerWorkerTaskOverviewResponse: + return build_worker_task_overview( + worker=worker, + force_refresh=force_refresh, + recent_limit=recent_limit, + ) diff --git a/api/app/api/v1/scheduler.py b/api/app/api/v1/scheduler.py new file mode 100644 index 0000000..2655b42 --- /dev/null +++ b/api/app/api/v1/scheduler.py @@ -0,0 +1,51 @@ +from __future__ import annotations + 
+from fastapi import APIRouter, Depends, Header, HTTPException, status + +from ...core.config import get_settings +from ...schemas.scheduler import ( + SchedulerEnqueueTaskRequest, + SchedulerEnqueueTaskResponse, + SchedulerRevokeTaskRequest, + SchedulerRevokeTaskResponse, +) +from ...services.scheduler_service import enqueue_task, revoke_task + +router = APIRouter(prefix="/scheduler", tags=["scheduler"]) + + +def _require_scheduler_token( + x_scheduler_token: str | None = Header(default=None, alias="x-scheduler-token"), +) -> None: + expected = get_settings().resolved_scheduler_api_token + if not expected: + return + provided = (x_scheduler_token or "").strip() + if provided == expected: + return + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized scheduler request") + + +@router.get("/healthz") +def scheduler_healthz() -> dict[str, object]: + return { + "success": True, + "status": "ok", + "service": "fquiz-scheduler", + } + + +@router.post("/v1/tasks/enqueue", response_model=SchedulerEnqueueTaskResponse) +def enqueue_scheduler_task( + payload: SchedulerEnqueueTaskRequest, + _: None = Depends(_require_scheduler_token), +) -> SchedulerEnqueueTaskResponse: + return enqueue_task(payload) + + +@router.post("/v1/tasks/revoke", response_model=SchedulerRevokeTaskResponse) +def revoke_scheduler_task( + payload: SchedulerRevokeTaskRequest, + _: None = Depends(_require_scheduler_token), +) -> SchedulerRevokeTaskResponse: + return revoke_task(payload) diff --git a/api/app/core/celery_app.py b/api/app/core/celery_app.py index 9754156..730b4d1 100644 --- a/api/app/core/celery_app.py +++ b/api/app/core/celery_app.py @@ -10,7 +10,11 @@ celery_app = Celery( "fquiz", broker=settings.resolved_celery_broker_url, backend=settings.resolved_celery_result_backend, - include=["app.tasks.schedule_tasks", "app.tasks.elevation_tasks"], + include=[ + "app.tasks.schedule_tasks", + "app.tasks.elevation_tasks", + "app.tasks.worker_registry_tasks", + ], ) 
celery_app.conf.update( @@ -20,6 +24,10 @@ celery_app.conf.update( "task": "app.tasks.schedule_tasks.expire_overdue_schedule_items", "schedule": settings.scheduler_expire_interval_seconds, }, + "sweep-worker-registry-offline": { + "task": "app.tasks.worker_registry_tasks.sweep_worker_registry_offline", + "schedule": 30.0, + }, }, enable_utc=True, result_serializer="json", @@ -29,3 +37,6 @@ celery_app.conf.update( timezone=settings.celery_timezone, worker_prefetch_multiplier=1, ) + +# Register worker lifecycle signals for auto-registration/heartbeat/offline states. +from . import worker_signals as _worker_signals # noqa: F401,E402 diff --git a/api/app/core/config.py b/api/app/core/config.py index 91fa77b..a13146e 100644 --- a/api/app/core/config.py +++ b/api/app/core/config.py @@ -52,6 +52,13 @@ class Settings(BaseSettings): celery_result_backend: str | None = None celery_timezone: str = "Asia/Shanghai" scheduler_expire_interval_seconds: int = 60 + scheduler_api_token: str = "" + scheduler_default_queue: str = "default" + scheduler_api_base_url: str = "http://scheduler:19100" + flower_api_base_url: str = "http://flower:5555" + flower_api_timeout_seconds: int = 10 + flower_basic_auth: str = "" + worker_registry_ttl_seconds: int = 90 wine_binary_path: str = "wine" wine_allowed_root: str = "./data/wine" @@ -83,6 +90,8 @@ class Settings(BaseSettings): "chat_context_message_limit", "db_port", "scheduler_expire_interval_seconds", + "flower_api_timeout_seconds", + "worker_registry_ttl_seconds", "wine_default_timeout_seconds", "wine_max_timeout_seconds", "atp_engine_default_timeout_seconds", @@ -192,6 +201,33 @@ class Settings(BaseSettings): def resolved_celery_result_backend(self) -> str: return (self.celery_result_backend or "redis://redis:6379/1").strip() + @property + def resolved_scheduler_api_token(self) -> str: + return (self.scheduler_api_token or "").strip() + + @property + def resolved_scheduler_default_queue(self) -> str: + queue = (self.scheduler_default_queue 
or "").strip() + return queue or "default" + + @property + def resolved_scheduler_api_base_url(self) -> str: + value = (self.scheduler_api_base_url or "").strip().rstrip("/") + if not value: + return "http://scheduler:19100" + return value + + @property + def resolved_flower_api_base_url(self) -> str: + value = (self.flower_api_base_url or "").strip().rstrip("/") + if not value: + return "http://flower:5555" + return value + + @property + def resolved_flower_basic_auth(self) -> str: + return (self.flower_basic_auth or "").strip() + @lru_cache def get_settings() -> Settings: diff --git a/api/app/core/database.py b/api/app/core/database.py index 2b84725..4af67ba 100644 --- a/api/app/core/database.py +++ b/api/app/core/database.py @@ -215,6 +215,7 @@ def init_db() -> None: system_param, todo, user, + worker_registry, ) # noqa: F401 from ..services.seed_service import seed_defaults diff --git a/api/app/core/worker_signals.py b/api/app/core/worker_signals.py new file mode 100644 index 0000000..1e42636 --- /dev/null +++ b/api/app/core/worker_signals.py @@ -0,0 +1,70 @@ +from __future__ import annotations + +from celery import signals + +from ..services.worker_registry_service import mark_worker_offline, register_worker + + +def _normalize_worker_name(sender: object | None) -> str: + if sender is None: + return "" + text = getattr(sender, "hostname", None) or getattr(getattr(sender, "eventer", None), "hostname", None) or sender  # signal senders are Consumer/Heart objects, so prefer their node hostname over the object repr + return str(text).strip() + + +def _extract_queue_names(sender: object | None) -> list[str]: + consumer = getattr(sender, "consumer", None) + if consumer is None: + return [] + queues = getattr(consumer, "queues", None) + if queues is None: + return [] + names: list[str] = [] + try: + iterator = list(queues) + except TypeError: + iterator = [] + for item in iterator: + name = getattr(item, "name", None) + if name is None: + continue + value = str(name).strip() + if value: + names.append(value) + return names + + +@signals.worker_ready.connect +def on_worker_ready(sender=None, **kwargs): # type: ignore[no-untyped-def] + worker_name = 
_normalize_worker_name(sender) + if not worker_name: + return + queues = _extract_queue_names(sender) + register_worker( + worker_name=worker_name, + status="online", + queues=queues, + heartbeat_increment=False, + metadata={"event": "worker_ready"}, + ) + + +@signals.heartbeat_sent.connect +def on_worker_heartbeat(sender=None, **kwargs): # type: ignore[no-untyped-def] + worker_name = _normalize_worker_name(sender) + if not worker_name: + return + register_worker( + worker_name=worker_name, + status="online", + heartbeat_increment=True, + metadata={"event": "heartbeat_sent"}, + ) + + +@signals.worker_shutdown.connect +def on_worker_shutdown(sender=None, **kwargs): # type: ignore[no-untyped-def] + worker_name = _normalize_worker_name(sender) + if not worker_name: + return + mark_worker_offline(worker_name) diff --git a/api/app/models/__init__.py b/api/app/models/__init__.py index de396c2..7f7ead1 100644 --- a/api/app/models/__init__.py +++ b/api/app/models/__init__.py @@ -4,7 +4,7 @@ Import all model modules during package initialization so SQLAlchemy can resolve string-based relationships regardless of route/service import order. """ -from . import atp_model, audit_log, auth_session, calendar_event, elevation, file_storage, hot_search, lightning_event, lightning_sample, line, line_tower, menu, model_registry, object_group, question_bank, rbac, requirement, system_param, todo, user +from . 
import atp_model, audit_log, auth_session, calendar_event, elevation, file_storage, hot_search, lightning_event, lightning_sample, line, line_tower, menu, model_registry, object_group, question_bank, rbac, requirement, system_param, todo, user, worker_registry __all__ = [ "atp_model", @@ -27,4 +27,5 @@ __all__ = [ "system_param", "todo", "user", + "worker_registry", ] diff --git a/api/app/models/worker_registry.py b/api/app/models/worker_registry.py new file mode 100644 index 0000000..d59ee6b --- /dev/null +++ b/api/app/models/worker_registry.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +from datetime import datetime +from uuid import uuid4 + +from sqlalchemy import DateTime, Index, Integer, String +from sqlalchemy.orm import Mapped, mapped_column + +from ..core.database import Base +from .base import utcnow + + +class WorkerRegistry(Base): + __tablename__ = "worker_registry" + __table_args__ = ( + Index("idx_worker_registry_worker", "worker_name"), + Index("idx_worker_registry_status", "status"), + Index("idx_worker_registry_last_seen", "last_seen_at"), + ) + + id: Mapped[str] = mapped_column( + String(32), + primary_key=True, + default=lambda: uuid4().hex, + ) + worker_name: Mapped[str] = mapped_column(String(255), unique=True, nullable=False) + status: Mapped[str] = mapped_column(String(32), default="online", index=True) + queues_csv: Mapped[str | None] = mapped_column(String(2000)) + pid: Mapped[int | None] = mapped_column(Integer) + heartbeat_count: Mapped[int] = mapped_column(Integer, default=0) + first_seen_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow) + last_seen_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, index=True) + metadata_json: Mapped[str | None] = mapped_column(String(4000)) + create_date: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, index=True) + create_user: Mapped[str | None] = mapped_column(String(64), index=True) + update_date: 
Mapped[datetime] = mapped_column( + DateTime(timezone=True), + default=utcnow, + onupdate=utcnow, + ) + update_user: Mapped[str | None] = mapped_column(String(64), index=True) diff --git a/api/app/scheduler_main.py b/api/app/scheduler_main.py new file mode 100644 index 0000000..5a8e1f7 --- /dev/null +++ b/api/app/scheduler_main.py @@ -0,0 +1,10 @@ +from fastapi import FastAPI + +from .api.v1.scheduler import router as scheduler_router + +app = FastAPI( + title="fquiz-scheduler", + version="0.1.0", +) + +app.include_router(scheduler_router, prefix="/api/v1") diff --git a/api/app/schemas/flower_monitor.py b/api/app/schemas/flower_monitor.py new file mode 100644 index 0000000..0eae387 --- /dev/null +++ b/api/app/schemas/flower_monitor.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +from datetime import datetime + +from pydantic import BaseModel, Field + + +class FlowerWorkerItem(BaseModel): + worker: str + status: str + queue_names: list[str] = Field(default_factory=list) + registered_count: int = 0 + processed_count: int = 0 + concurrency: int = 0 + prefetch_count: int = 0 + active_count: int = 0 + reserved_count: int = 0 + scheduled_count: int = 0 + last_heartbeat_at: datetime | None = None + + +class FlowerWorkersSummary(BaseModel): + total: int = 0 + online: int = 0 + offline: int = 0 + + +class FlowerWorkersOverviewResponse(BaseModel): + generated_at: datetime + workers: list[FlowerWorkerItem] = Field(default_factory=list) + summary: FlowerWorkersSummary = Field(default_factory=FlowerWorkersSummary) + + +class FlowerTaskItem(BaseModel): + task_id: str + name: str + state: str + source: str + worker: str + queue_name: str | None = None + args_text: str | None = None + kwargs_text: str | None = None + eta: datetime | None = None + received_at: datetime | None = None + started_at: datetime | None = None + finished_at: datetime | None = None + runtime_seconds: float | None = None + result_text: str | None = None + exception_text: str | None = None + + 
+class FlowerWorkerTaskSummary(BaseModel): + active: int = 0 + reserved: int = 0 + scheduled: int = 0 + recent: int = 0 + + +class FlowerWorkerTaskOverviewResponse(BaseModel): + generated_at: datetime + worker: str + active_tasks: list[FlowerTaskItem] = Field(default_factory=list) + reserved_tasks: list[FlowerTaskItem] = Field(default_factory=list) + scheduled_tasks: list[FlowerTaskItem] = Field(default_factory=list) + recent_tasks: list[FlowerTaskItem] = Field(default_factory=list) + summary: FlowerWorkerTaskSummary = Field(default_factory=FlowerWorkerTaskSummary) diff --git a/api/app/schemas/scheduler.py b/api/app/schemas/scheduler.py new file mode 100644 index 0000000..8ff65ba --- /dev/null +++ b/api/app/schemas/scheduler.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +from typing import Any + +from pydantic import BaseModel, Field + + +class SchedulerEnqueueTaskRequest(BaseModel): + task_name: str = Field(min_length=1, max_length=255, alias="taskName") + queue_name: str | None = Field(default=None, max_length=255, alias="queueName") + task_id: str | None = Field(default=None, max_length=255, alias="taskId") + args: list[Any] = Field(default_factory=list) + kwargs: dict[str, Any] = Field(default_factory=dict) + + model_config = {"populate_by_name": True} + + +class SchedulerEnqueueTaskResponse(BaseModel): + queued: bool = True + task_id: str = Field(alias="taskId") + queue_name: str = Field(alias="queueName") + task_name: str = Field(alias="taskName") + + model_config = {"populate_by_name": True} + + +class SchedulerRevokeTaskRequest(BaseModel): + task_id: str = Field(min_length=1, max_length=255, alias="taskId") + terminate: bool = True + signal: str = Field(default="SIGTERM", max_length=32) + + model_config = {"populate_by_name": True} + + +class SchedulerRevokeTaskResponse(BaseModel): + revoked: bool = True + task_id: str = Field(alias="taskId") + terminate: bool + signal: str + + model_config = {"populate_by_name": True} diff --git 
a/api/app/services/admin_service.py b/api/app/services/admin_service.py index 03587e8..110ccf8 100644 --- a/api/app/services/admin_service.py +++ b/api/app/services/admin_service.py @@ -403,7 +403,7 @@ def update_menu(db: Session, menu_id: int, payload: MenuUpdateRequest) -> MenuPu def delete_menu(db: Session, menu_id: int) -> bool: menu = get_menu_by_id(db, menu_id) - if not menu or menu.code in {"dashboard", "admin.users", "admin.roles", "admin.menus", "admin.system_params", "admin.power_lines", "admin.lightning_currents", "admin.lightning_distribution", "admin.task_monitor", "admin.atp_models", "admin.files", "admin.elevation", "admin.syslog", "admin.wine_runner"}: + if not menu or menu.code in {"dashboard", "admin.users", "admin.roles", "admin.menus", "admin.system_params", "admin.power_lines", "admin.lightning_currents", "admin.lightning_distribution", "admin.workers", "admin.task_monitor", "admin.atp_models", "admin.files", "admin.elevation", "admin.syslog", "admin.wine_runner"}: return False child_exists = db.scalar(select(Menu.id).where(Menu.parent_id == menu_id)) if child_exists is not None: diff --git a/api/app/services/elevation_service.py b/api/app/services/elevation_service.py index 8e4d831..dee4a61 100644 --- a/api/app/services/elevation_service.py +++ b/api/app/services/elevation_service.py @@ -335,6 +335,7 @@ def create_apply_job( payload: ElevationApplyJobCreateRequest, *, actor: User, + dispatch_mode: str = "celery_direct", ) -> ElevationApplyJobCreateResponse: line = db.execute(select(Line).where(Line.id == payload.line_id)).scalar_one_or_none() if not line: @@ -379,9 +380,10 @@ def create_apply_job( if not saved: raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="创建任务失败") - from ..tasks.elevation_tasks import apply_elevation_for_line_job - - task = apply_elevation_for_line_job.delay(saved.id) + task = _dispatch_elevation_apply_task( + job_id=saved.id, + dispatch_mode=dispatch_mode, + ) saved.task_id = task.id 
saved.update_user = actor.id saved.update_date = utcnow() @@ -398,6 +400,60 @@ def create_apply_job( return ElevationApplyJobCreateResponse(job=serialize_job(latest), queued=True) +def _dispatch_elevation_apply_task(*, job_id: str, dispatch_mode: str): + normalized_mode = (dispatch_mode or "").strip().lower() + if normalized_mode == "scheduler_api": + return _enqueue_via_scheduler_api(job_id) + + from ..tasks.elevation_tasks import apply_elevation_for_line_job + + return apply_elevation_for_line_job.delay(job_id) + + +def _enqueue_via_scheduler_api(job_id: str): + from ..core.config import get_settings + + import httpx + + settings = get_settings() + scheduler_base_url = settings.resolved_scheduler_api_base_url + path = "/api/v1/scheduler/v1/tasks/enqueue"  # scheduler router prefix "/scheduler" is mounted under "/api/v1" + payload = { + "taskName": "app.tasks.elevation_tasks.apply_elevation_for_line_job", + "taskId": job_id, + "queueName": settings.resolved_scheduler_default_queue, + "args": [job_id], + "kwargs": {}, + } + headers = {"Content-Type": "application/json"} + token = settings.resolved_scheduler_api_token + if token: + headers["x-scheduler-token"] = token + try: + with httpx.Client(timeout=15) as client: + response = client.post(f"{scheduler_base_url}{path}", json=payload, headers=headers) + if response.status_code >= 400: + raise HTTPException( + status_code=status.HTTP_502_BAD_GATEWAY, + detail=f"scheduler enqueue failed: {response.status_code} {response.text}", + ) + data = response.json() + task_id = str(data.get("taskId") or data.get("task_id") or job_id).strip() or job_id + except HTTPException: + raise + except Exception as exc: + raise HTTPException( + status_code=status.HTTP_502_BAD_GATEWAY, + detail=f"scheduler request failed: {exc}", + ) from exc + + class _TaskRef: + def __init__(self, value: str) -> None: + self.id = value + + return _TaskRef(task_id) + + def execute_apply_job(job_id: str) -> None: db = SessionLocal() try: diff --git a/api/app/services/flower_monitor_service.py 
b/api/app/services/flower_monitor_service.py new file mode 100644 index 0000000..9f4bb6b --- /dev/null +++ b/api/app/services/flower_monitor_service.py @@ -0,0 +1,334 @@ +from __future__ import annotations + +import base64 +import json +from datetime import datetime, timezone +from typing import Any +from urllib.parse import quote_plus + +import httpx +from fastapi import HTTPException, status + +from ..core.config import get_settings +from ..models.base import utcnow +from ..schemas.flower_monitor import ( + FlowerTaskItem, + FlowerWorkerItem, + FlowerWorkerTaskOverviewResponse, + FlowerWorkerTaskSummary, + FlowerWorkersOverviewResponse, + FlowerWorkersSummary, +) + + +def build_workers_overview(*, force_refresh: bool) -> FlowerWorkersOverviewResponse: + refresh = "true" if force_refresh else "false" + workers_map = _call_flower_json(f"/api/workers?refresh={refresh}") + status_map = _call_flower_json("/api/workers?status=true") + if not isinstance(workers_map, dict): + workers_map = {} + if not isinstance(status_map, dict): + status_map = {} + + worker_names = sorted(set(workers_map.keys()) | set(status_map.keys())) + workers: list[FlowerWorkerItem] = [] + for worker_name in worker_names: + worker_raw = _as_record(workers_map.get(worker_name)) + is_online = bool(status_map.get(worker_name)) + workers.append(_normalize_worker(worker_name, worker_raw, is_online)) + + workers.sort(key=lambda item: (0 if item.status == "ONLINE" else 1, item.worker)) + summary = FlowerWorkersSummary( + total=len(workers), + online=sum(1 for item in workers if item.status == "ONLINE"), + offline=sum(1 for item in workers if item.status != "ONLINE"), + ) + return FlowerWorkersOverviewResponse( + generated_at=utcnow(), + workers=workers, + summary=summary, + ) + + +def build_worker_task_overview( + *, + worker: str, + force_refresh: bool, + recent_limit: int, +) -> FlowerWorkerTaskOverviewResponse: + normalized_worker = worker.strip() + if not normalized_worker: + raise 
HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="worker is required") + + refresh = "true" if force_refresh else "false" + safe_recent_limit = max(1, min(200, int(recent_limit))) + workers_map = _call_flower_json( + f"/api/workers?refresh={refresh}&workername={quote_plus(normalized_worker)}" + ) + tasks_map = _call_flower_json( + f"/api/tasks?limit={safe_recent_limit}&workername={quote_plus(normalized_worker)}" + ) + worker_raw = _pick_worker_raw(_as_record(workers_map), normalized_worker) + active_tasks = _build_snapshot_tasks(normalized_worker, worker_raw, "ACTIVE") + reserved_tasks = _build_snapshot_tasks(normalized_worker, worker_raw, "RESERVED") + scheduled_tasks = _build_snapshot_tasks(normalized_worker, worker_raw, "SCHEDULED") + recent_tasks = _build_recent_tasks(normalized_worker, _as_record(tasks_map)) + summary = FlowerWorkerTaskSummary( + active=len(active_tasks), + reserved=len(reserved_tasks), + scheduled=len(scheduled_tasks), + recent=len(recent_tasks), + ) + return FlowerWorkerTaskOverviewResponse( + generated_at=utcnow(), + worker=normalized_worker, + active_tasks=active_tasks, + reserved_tasks=reserved_tasks, + scheduled_tasks=scheduled_tasks, + recent_tasks=recent_tasks, + summary=summary, + ) + + +def _call_flower_json(path: str) -> Any: + settings = get_settings() + url = f"{settings.resolved_flower_api_base_url}{path}" + headers = {"Accept": "application/json"} + basic_auth = settings.resolved_flower_basic_auth + if basic_auth: + token = base64.b64encode(basic_auth.encode("utf-8")).decode("ascii") + headers["Authorization"] = f"Basic {token}" + timeout = max(3, int(settings.flower_api_timeout_seconds)) + try: + with httpx.Client(timeout=timeout) as client: + response = client.get(url, headers=headers) + except httpx.TimeoutException as exc: + raise HTTPException(status_code=status.HTTP_504_GATEWAY_TIMEOUT, detail=f"flower request timeout: {path}") from exc + except httpx.HTTPError as exc: + raise 
HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=f"flower request failed: {path}: {exc}") from exc + if response.status_code >= 400: + detail = (response.text or "").strip() or response.reason_phrase or "flower error" + raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=f"flower error {response.status_code}: {detail}") + try: + return response.json() + except ValueError: + raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=f"flower returned non-json payload: {path}") + + +def _normalize_worker(worker_name: str, worker_raw: dict[str, Any], is_online: bool) -> FlowerWorkerItem: + stats = _as_record(worker_raw.get("stats")) + return FlowerWorkerItem( + worker=worker_name, + status="ONLINE" if is_online else "OFFLINE", + queue_names=_parse_active_queue_names(worker_raw.get("active_queues")), + registered_count=len(_as_array(worker_raw.get("registered"))), + processed_count=_parse_processed_count(stats.get("total")), + concurrency=_safe_int(_as_record(stats.get("pool")).get("max-concurrency") or _as_record(stats.get("pool")).get("max_concurrency")), + prefetch_count=_safe_int(stats.get("prefetch_count")), + active_count=len(_as_array(worker_raw.get("active"))), + reserved_count=len(_as_array(worker_raw.get("reserved"))), + scheduled_count=len(_as_array(worker_raw.get("scheduled"))), + last_heartbeat_at=_parse_datetime(worker_raw.get("timestamp")), + ) + + +def _pick_worker_raw(workers_map: dict[str, Any], worker_name: str) -> dict[str, Any]: + if worker_name in workers_map: + return _as_record(workers_map.get(worker_name)) + entries = list(workers_map.items()) + if len(entries) == 1: + return _as_record(entries[0][1]) + normalized = worker_name.strip().lower() + for name, value in entries: + if str(name).strip().lower() == normalized: + return _as_record(value) + return {} + + +def _build_snapshot_tasks(worker_name: str, worker_raw: dict[str, Any], source: str) -> list[FlowerTaskItem]: + key = source.lower() + snapshot = 
_as_array(worker_raw.get(key)) + items: list[FlowerTaskItem] = [] + for index, raw in enumerate(snapshot, start=1): + items.append( + _normalize_task_item( + raw, + source=source, + fallback_task_id=f"{worker_name}:{source}:{index}", + default_worker=worker_name, + ) + ) + return items + + +def _build_recent_tasks(worker_name: str, tasks_map: dict[str, Any]) -> list[FlowerTaskItem]: + items: list[FlowerTaskItem] = [] + for task_id, raw in tasks_map.items(): + item = _normalize_task_item(raw, source="RECENT", fallback_task_id=str(task_id), default_worker=worker_name) + if item.worker.strip().lower() != worker_name.strip().lower(): + continue + items.append(item) + items.sort( + key=lambda item: _sortable_timestamp( + item.received_at or item.started_at or item.finished_at or item.eta + ), + reverse=True, + ) + return items + + +def _normalize_task_item( + raw_task: Any, + *, + source: str, + fallback_task_id: str, + default_worker: str, +) -> FlowerTaskItem: + raw = _as_record(raw_task) + request = _as_record(raw.get("request")) + task_data = dict(raw) + if request: + task_data.update(request) + task_id = _safe_text(task_data.get("uuid") or task_data.get("id") or fallback_task_id) or fallback_task_id + task_name = _safe_text(task_data.get("name") or task_data.get("type")) or "-" + worker_name = _safe_text(task_data.get("worker") or task_data.get("hostname")) or default_worker + state_raw = _safe_text(task_data.get("state")) + state = state_raw.upper() if state_raw else ("UNKNOWN" if source == "RECENT" else source) + queue_name = _extract_queue_name(task_data) + return FlowerTaskItem( + task_id=task_id, + name=task_name, + state=state, + source=source, + worker=worker_name, + queue_name=queue_name, + args_text=_stringify_payload(task_data.get("args")), + kwargs_text=_stringify_payload(task_data.get("kwargs")), + eta=_parse_datetime(task_data.get("eta")), + received_at=_parse_datetime(task_data.get("received") or task_data.get("timestamp")), + 
started_at=_parse_datetime(task_data.get("started") or task_data.get("time_start")), + finished_at=_parse_datetime(task_data.get("succeeded") or task_data.get("failed") or task_data.get("revoked")), + runtime_seconds=_safe_float(task_data.get("runtime")), + result_text=_stringify_payload(task_data.get("result")), + exception_text=_stringify_payload(task_data.get("exception")), + ) + + +def _parse_active_queue_names(raw: Any) -> list[str]: + values = [] + for item in _as_array(raw): + name = _safe_text(_as_record(item).get("name")) + if name: + values.append(name) + return sorted(set(values)) + + +def _parse_processed_count(raw_total: Any) -> int: + total = _as_record(raw_total) + count = 0 + for value in total.values(): + count += _safe_int(value) + return count + + +def _extract_queue_name(task_data: dict[str, Any]) -> str | None: + delivery = _as_record(task_data.get("delivery_info")) + routing_key = _safe_text(delivery.get("routing_key")) + if routing_key: + return routing_key + exchange = _safe_text(task_data.get("exchange") or delivery.get("exchange")) + return exchange + + +def _parse_datetime(value: Any) -> datetime | None: + if value is None: + return None + if isinstance(value, datetime): + if value.tzinfo is None: + return value.replace(tzinfo=timezone.utc) + return value.astimezone(timezone.utc) + if isinstance(value, (int, float)): + numeric = float(value) + if numeric <= 0: + return None + if numeric < 1e11: + numeric *= 1000 + try: + return datetime.fromtimestamp(numeric / 1000, timezone.utc) + except (OverflowError, OSError, ValueError): + return None + text = str(value).strip() + if not text: + return None + if text.replace(".", "", 1).isdigit(): + try: + return _parse_datetime(float(text)) + except ValueError: + return None + text = text.replace("Z", "+00:00") + try: + parsed = datetime.fromisoformat(text) + except ValueError: + return None + if parsed.tzinfo is None: + return parsed.replace(tzinfo=timezone.utc) + return 
parsed.astimezone(timezone.utc) + + +def _stringify_payload(value: Any) -> str | None: + if value is None: + return None + if isinstance(value, str): + text = value.strip() + else: + try: + text = json.dumps(value, ensure_ascii=False) + except TypeError: + text = str(value) + text = text.strip() + if not text: + return None + if len(text) > 1000: + return text[:997] + "..." + return text + + +def _safe_text(value: Any) -> str: + return str(value or "").strip() + + +def _safe_int(value: Any) -> int: + try: + return int(value) + except (TypeError, ValueError): + return 0 + + +def _safe_float(value: Any) -> float | None: + try: + parsed = float(value) + except (TypeError, ValueError): + return None + if parsed < 0: + return None + return parsed + + +def _as_record(value: Any) -> dict[str, Any]: + if isinstance(value, dict): + return dict(value) + return {} + + +def _as_array(value: Any) -> list[Any]: + if isinstance(value, list): + return list(value) + return [] + + +def _sortable_timestamp(value: datetime | None) -> float: + if value is None: + return 0.0 + if value.tzinfo is None: + value = value.replace(tzinfo=timezone.utc) + return value.timestamp() diff --git a/api/app/services/legacy_admin_rbac_service.py b/api/app/services/legacy_admin_rbac_service.py index a6a9a84..341082b 100644 --- a/api/app/services/legacy_admin_rbac_service.py +++ b/api/app/services/legacy_admin_rbac_service.py @@ -75,6 +75,7 @@ PROTECTED_MENU_CODES = { "admin.power_lines", "admin.lightning_currents", "admin.lightning_distribution", + "admin.workers", "admin.task_monitor", "admin.atp_models", "admin.data_query", diff --git a/api/app/services/legacy_authz_service.py b/api/app/services/legacy_authz_service.py index ac915f4..09db2f5 100644 --- a/api/app/services/legacy_authz_service.py +++ b/api/app/services/legacy_authz_service.py @@ -91,6 +91,7 @@ MENU_CODE_PERMISSION_MAP: dict[str, set[str]] = { "admin.system_params": {"system_param.read", "system_param.manage"}, "admin.files": 
{"file.read", "file.manage"}, "admin.elevation": {"elevation.read", "elevation.manage"}, + "admin.workers": {"celery.read", "celery.manage"}, "admin.task_monitor": {"celery.read", "celery.manage"}, "admin.atp_models": {"atp.read", "atp.manage", "atp.run"}, "admin.lightning_currents": {"lightning.read", "lightning.manage"}, @@ -100,6 +101,17 @@ MENU_CODE_PERMISSION_MAP: dict[str, set[str]] = { } SYNTHETIC_LEGACY_MENU_ROWS: list[dict[str, Any]] = [ + { + "menu_id": "admin.workers", + "menu_name": "admin.workers", + "menu_label": "Worker监控", + "menu_type": "MENU", + "parent_id": None, + "url": "/admin/workers", + "menu_icon": "DeploymentUnitOutlined", + "seq": 53, + "state": "ENABLED", + }, { "menu_id": "admin.files", "menu_name": "admin.files", @@ -108,7 +120,7 @@ SYNTHETIC_LEGACY_MENU_ROWS: list[dict[str, Any]] = [ "parent_id": None, "url": "/admin/files", "menu_icon": "FolderTree", - "seq": 54, + "seq": 56, "state": "ENABLED", }, { @@ -119,7 +131,7 @@ SYNTHETIC_LEGACY_MENU_ROWS: list[dict[str, Any]] = [ "parent_id": None, "url": "/admin/elevation", "menu_icon": "Database", - "seq": 56, + "seq": 57, "state": "ENABLED", }, { @@ -130,7 +142,7 @@ SYNTHETIC_LEGACY_MENU_ROWS: list[dict[str, Any]] = [ "parent_id": None, "url": "/admin/task-monitor", "menu_icon": "RadarChart", - "seq": 53, + "seq": 54, "state": "ENABLED", }, { @@ -141,7 +153,7 @@ SYNTHETIC_LEGACY_MENU_ROWS: list[dict[str, Any]] = [ "parent_id": None, "url": "/admin/power-lines/atp-viewer", "menu_icon": "Experiment", - "seq": 54, + "seq": 55, "state": "ENABLED", }, { diff --git a/api/app/services/scheduler_service.py b/api/app/services/scheduler_service.py new file mode 100644 index 0000000..1ea4b27 --- /dev/null +++ b/api/app/services/scheduler_service.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +from fastapi import HTTPException, status + +from ..core.celery_app import celery_app +from ..core.config import get_settings +from ..schemas.scheduler import ( + SchedulerEnqueueTaskRequest, + 
SchedulerEnqueueTaskResponse, + SchedulerRevokeTaskRequest, + SchedulerRevokeTaskResponse, +) + + +def enqueue_task(payload: SchedulerEnqueueTaskRequest) -> SchedulerEnqueueTaskResponse: + settings = get_settings() + task_name = payload.task_name.strip() + if not task_name: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="taskName is required") + + queue_name = (payload.queue_name or "").strip() or settings.resolved_scheduler_default_queue + task_id = (payload.task_id or "").strip() or None + + try: + result = celery_app.send_task( + task_name, + args=list(payload.args), + kwargs=dict(payload.kwargs), + task_id=task_id, + queue=queue_name, + ) + except Exception as exc: + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"scheduler enqueue failed: {exc}") from exc + + return SchedulerEnqueueTaskResponse( + queued=True, + taskId=result.id, + queueName=queue_name, + taskName=task_name, + ) + + +def revoke_task(payload: SchedulerRevokeTaskRequest) -> SchedulerRevokeTaskResponse: + task_id = payload.task_id.strip() + if not task_id: + raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="taskId is required") + + signal = (payload.signal or "").strip() or "SIGTERM" + terminate = bool(payload.terminate) + + try: + celery_app.control.revoke(task_id, terminate=terminate, signal=signal) + except Exception as exc: + raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"scheduler revoke failed: {exc}") from exc + + return SchedulerRevokeTaskResponse( + revoked=True, + taskId=task_id, + terminate=terminate, + signal=signal, + ) diff --git a/api/app/services/seed_service.py b/api/app/services/seed_service.py index 0fc0675..7eeebfe 100644 --- a/api/app/services/seed_service.py +++ b/api/app/services/seed_service.py @@ -186,6 +186,19 @@ DEFAULT_MENUS: list[dict[str, object]] = [ "cacheable": False, "permission_code": "lightning.read", }, + { + "code": "admin.workers", + "name": "Worker监控", 
+ "path": "/admin/workers", + "icon": "DeploymentUnitOutlined", + "parent_code": None, + "type": "menu", + "sort_order": 53, + "status": "enabled", + "visible": True, + "cacheable": False, + "permission_code": "celery.read", + }, { "code": "admin.task_monitor", "name": "任务监控", @@ -193,7 +206,7 @@ DEFAULT_MENUS: list[dict[str, object]] = [ "icon": "RadarChart", "parent_code": None, "type": "menu", - "sort_order": 53, + "sort_order": 54, "status": "enabled", "visible": True, "cacheable": False, @@ -206,7 +219,7 @@ DEFAULT_MENUS: list[dict[str, object]] = [ "icon": "Experiment", "parent_code": None, "type": "menu", - "sort_order": 54, + "sort_order": 55, "status": "enabled", "visible": True, "cacheable": False, @@ -219,7 +232,7 @@ DEFAULT_MENUS: list[dict[str, object]] = [ "icon": "FolderTree", "parent_code": None, "type": "menu", - "sort_order": 55, + "sort_order": 56, "status": "enabled", "visible": True, "cacheable": False, @@ -232,7 +245,7 @@ DEFAULT_MENUS: list[dict[str, object]] = [ "icon": "Database", "parent_code": None, "type": "menu", - "sort_order": 56, + "sort_order": 57, "status": "enabled", "visible": True, "cacheable": False, @@ -245,7 +258,7 @@ DEFAULT_MENUS: list[dict[str, object]] = [ "icon": "FileText", "parent_code": None, "type": "menu", - "sort_order": 57, + "sort_order": 58, "status": "enabled", "visible": True, "cacheable": False, @@ -267,7 +280,7 @@ DEFAULT_MENUS: list[dict[str, object]] = [ ] ROLE_MENU_BINDINGS: dict[str, list[str]] = { - "admin": ["dashboard", "admin.users", "admin.roles", "admin.menus", "admin.system_params", "admin.power_lines", "admin.lightning_currents", "admin.lightning_distribution", "admin.task_monitor", "admin.atp_models", "admin.files", "admin.elevation", "admin.syslog", "admin.wine_runner"], + "admin": ["dashboard", "admin.users", "admin.roles", "admin.menus", "admin.system_params", "admin.power_lines", "admin.lightning_currents", "admin.lightning_distribution", "admin.workers", "admin.task_monitor", 
"admin.atp_models", "admin.files", "admin.elevation", "admin.syslog", "admin.wine_runner"], "user": ["dashboard"], } diff --git a/api/app/services/task_monitor_service.py b/api/app/services/task_monitor_service.py index 31976b6..dd99365 100644 --- a/api/app/services/task_monitor_service.py +++ b/api/app/services/task_monitor_service.py @@ -1,25 +1,16 @@ from __future__ import annotations -import json from collections.abc import Iterable from datetime import datetime, timezone -from urllib.parse import urlsplit, urlunsplit +from typing import Any -try: - import redis -except Exception: # pragma: no cover - optional dependency in non-runtime environments - redis = None - -from ..core.celery_app import celery_app -from ..core.config import get_settings -from ..models.base import utcnow from ..schemas.task_monitor import ( TaskMonitorBucketItem, TaskMonitorOverviewResponse, TaskMonitorQueueItem, TaskMonitorTaskItem, - TaskMonitorWorkerItem, ) +from .flower_monitor_service import build_worker_task_overview, build_workers_overview STATE_LABELS = { "PENDING": "待执行", @@ -45,193 +36,103 @@ STATE_PRIORITY = { def build_task_monitor_overview(*, task_limit: int, history_limit: int) -> TaskMonitorOverviewResponse: - settings = get_settings() - now = utcnow() - overview = TaskMonitorOverviewResponse( - generated_at=now, - broker_url=_mask_url(settings.resolved_celery_broker_url), - result_backend=_mask_url(settings.resolved_celery_result_backend), - ) + workers_overview = build_workers_overview(force_refresh=False) + task_items: list[TaskMonitorTaskItem] = [] + queue_runtime_counts: dict[str, dict[str, int]] = {} + queue_consumer_counts: dict[str, int] = {} - inspector = celery_app.control.inspect(timeout=1.0) - stats = _safe_inspect_call(inspector.stats) - active = _safe_inspect_call(inspector.active) - reserved = _safe_inspect_call(inspector.reserved) - scheduled = _safe_inspect_call(inspector.scheduled) - active_queues = _safe_inspect_call(inspector.active_queues) - ping = 
_safe_inspect_call(inspector.ping) + for worker in workers_overview.workers: + queue_names = worker.queue_names + for queue_name in queue_names: + queue_consumer_counts[queue_name] = queue_consumer_counts.get(queue_name, 0) + 1 + queue_runtime_counts.setdefault( + queue_name, + {"active": 0, "reserved": 0, "scheduled": 0}, + ) - worker_names = sorted(set(stats) | set(active) | set(reserved) | set(scheduled) | set(active_queues) | set(ping)) - overview.workers = [ - _build_worker_item( - worker, - stats=stats.get(worker) or {}, - active_tasks=active.get(worker) or [], - reserved_tasks=reserved.get(worker) or [], - scheduled_tasks=scheduled.get(worker) or [], - queues=active_queues.get(worker) or [], - online=worker in ping if ping else True, + worker_tasks = build_worker_task_overview( + worker=worker.worker, + force_refresh=False, + recent_limit=max(history_limit, 1), ) - for worker in worker_names - ] - overview.workers_online = sum(1 for item in overview.workers if item.online) - overview.worker_concurrency_total = sum(item.max_concurrency for item in overview.workers) + task_items.extend(_convert_flower_tasks(worker_tasks.active_tasks, source_state="STARTED")) + task_items.extend(_convert_flower_tasks(worker_tasks.reserved_tasks, source_state="RECEIVED")) + task_items.extend(_convert_flower_tasks(worker_tasks.scheduled_tasks, source_state="SCHEDULED")) + task_items.extend(_convert_flower_tasks(worker_tasks.recent_tasks, source_state=None)) - runtime_tasks = [ - *_build_task_items(active, state="STARTED", now=now), - *_build_task_items(reserved, state="RECEIVED", now=now), - *_build_task_items(scheduled, state="SCHEDULED", now=now), - ] - runtime_tasks_by_id = {item.task_id: item for item in runtime_tasks if item.task_id} + for task in worker_tasks.active_tasks: + if task.queue_name: + queue_runtime_counts.setdefault(task.queue_name, {"active": 0, "reserved": 0, "scheduled": 0}) + queue_runtime_counts[task.queue_name]["active"] += 1 + for task in 
worker_tasks.reserved_tasks: + if task.queue_name: + queue_runtime_counts.setdefault(task.queue_name, {"active": 0, "reserved": 0, "scheduled": 0}) + queue_runtime_counts[task.queue_name]["reserved"] += 1 + for task in worker_tasks.scheduled_tasks: + if task.queue_name: + queue_runtime_counts.setdefault(task.queue_name, {"active": 0, "reserved": 0, "scheduled": 0}) + queue_runtime_counts[task.queue_name]["scheduled"] += 1 - history_tasks = _load_history_task_items(settings.resolved_celery_result_backend, limit=history_limit) - for item in history_tasks: - if not item.task_id or item.task_id in runtime_tasks_by_id: + runtime_tasks_by_id: dict[str, TaskMonitorTaskItem] = {} + for item in task_items: + if not item.task_id: continue - runtime_tasks_by_id[item.task_id] = item + existing = runtime_tasks_by_id.get(item.task_id) + if existing is None: + runtime_tasks_by_id[item.task_id] = item + continue + if _task_priority(item.state) < _task_priority(existing.state): + runtime_tasks_by_id[item.task_id] = item all_tasks = sorted( runtime_tasks_by_id.values(), key=lambda item: ( - STATE_PRIORITY.get(item.state, 99), + _task_priority(item.state), -_task_sort_timestamp(item).timestamp(), item.task_id, ), ) - overview.tasks = all_tasks[:task_limit] - overview.task_state_buckets = _build_state_buckets(runtime_tasks_by_id.values()) - queue_names = _collect_queue_names(active_queues, runtime_tasks_by_id.values()) - queue_pending_counts = _load_queue_pending_counts(settings.resolved_celery_broker_url, queue_names) - overview.queues = _build_queue_items( - active_queues=active_queues, - tasks=runtime_tasks_by_id.values(), - pending_counts=queue_pending_counts, - ) - overview.queue_pending_total = sum(item.pending_count for item in overview.queues) - - return overview - - -def _safe_inspect_call(call): - try: - result = call() - except Exception: - return {} - if not isinstance(result, dict): - return {} - return result - - -def _build_worker_item( - worker: str, - *, - stats: 
dict, - active_tasks: list[dict], - reserved_tasks: list[dict], - scheduled_tasks: list[dict], - queues: list[dict], - online: bool, -) -> TaskMonitorWorkerItem: - pool = stats.get("pool") if isinstance(stats.get("pool"), dict) else {} - max_concurrency = _to_int( - pool.get("max-concurrency") - or pool.get("max_concurrency") - or len(pool.get("processes") or []) - ) - total = stats.get("total") if isinstance(stats.get("total"), dict) else {} - - return TaskMonitorWorkerItem( - worker=worker, - online=online, - queue_names=sorted({_queue_name_from_queue(item) for item in queues if _queue_name_from_queue(item)}), - max_concurrency=max_concurrency, - prefetch_count=_to_int(stats.get("prefetch_count")), - uptime_seconds=_to_int(stats.get("uptime")), - processed_total=sum(_to_int(value) for value in total.values()), - active_count=len(active_tasks), - reserved_count=len(reserved_tasks), - scheduled_count=len(scheduled_tasks), - ) - - -def _build_task_items(tasks_by_worker: dict, *, state: str, now: datetime) -> list[TaskMonitorTaskItem]: - items: list[TaskMonitorTaskItem] = [] - for worker, raw_tasks in tasks_by_worker.items(): - for raw_item in raw_tasks or []: - task = raw_item.get("request") if state == "SCHEDULED" else raw_item - if not isinstance(task, dict): - continue - task_id = str(task.get("id") or "").strip() - if not task_id: - continue - - eta = _parse_datetime(raw_item.get("eta")) if state == "SCHEDULED" else _parse_datetime(task.get("eta")) - started_at = _timestamp_to_datetime(task.get("time_start")) - runtime_seconds = None - if started_at and state == "STARTED": - runtime_seconds = max(0.0, round((now - started_at).total_seconds(), 3)) - - items.append( - TaskMonitorTaskItem( - task_id=task_id, - name=str(task.get("name") or task.get("task") or "-"), - state=state, - queue_name=_queue_name_from_task(task), - worker=str(worker), - retries=_to_int(task.get("retries")), - eta=eta, - started_at=started_at, - runtime_seconds=runtime_seconds, - ) - ) - 
return items - - -def _build_queue_items( - *, - active_queues: dict, - tasks: Iterable[TaskMonitorTaskItem], - pending_counts: dict[str, int], -) -> list[TaskMonitorQueueItem]: - queue_names: set[str] = set() - consumer_counts: dict[str, int] = {} - active_counts: dict[str, int] = {} - reserved_counts: dict[str, int] = {} - scheduled_counts: dict[str, int] = {} - - for queues in active_queues.values(): - for queue in queues or []: - name = _queue_name_from_queue(queue) - if not name: - continue - queue_names.add(name) - consumer_counts[name] = consumer_counts.get(name, 0) + 1 - - for task in tasks: - if not task.queue_name: - continue - queue_names.add(task.queue_name) - if task.state == "STARTED": - active_counts[task.queue_name] = active_counts.get(task.queue_name, 0) + 1 - elif task.state == "RECEIVED": - reserved_counts[task.queue_name] = reserved_counts.get(task.queue_name, 0) + 1 - elif task.state == "SCHEDULED": - scheduled_counts[task.queue_name] = scheduled_counts.get(task.queue_name, 0) + 1 - - return sorted( + queues = sorted( [ TaskMonitorQueueItem( name=name, - pending_count=max(0, _to_int(pending_counts.get(name))), - consumer_count=consumer_counts.get(name, 0), - active_count=active_counts.get(name, 0), - reserved_count=reserved_counts.get(name, 0), - scheduled_count=scheduled_counts.get(name, 0), + pending_count=0, + consumer_count=queue_consumer_counts.get(name, 0), + active_count=queue_runtime_counts.get(name, {}).get("active", 0), + reserved_count=queue_runtime_counts.get(name, {}).get("reserved", 0), + scheduled_count=queue_runtime_counts.get(name, {}).get("scheduled", 0), ) - for name in queue_names + for name in sorted(set(queue_runtime_counts) | set(queue_consumer_counts)) ], - key=lambda item: (-item.pending_count, item.name), + key=lambda item: (-item.active_count - item.reserved_count - item.scheduled_count, item.name), + ) + + return TaskMonitorOverviewResponse( + generated_at=workers_overview.generated_at, + broker_url="flower", + 
result_backend="flower", + workers_online=workers_overview.summary.online, + worker_concurrency_total=sum(item.concurrency for item in workers_overview.workers), + queue_pending_total=sum(item.pending_count for item in queues), + task_state_buckets=_build_state_buckets(runtime_tasks_by_id.values()), + workers=[ + { + "worker": worker.worker, + "online": worker.status == "ONLINE", + "queue_names": worker.queue_names, + "max_concurrency": worker.concurrency, + "prefetch_count": worker.prefetch_count, + "uptime_seconds": 0, + "processed_total": worker.processed_count, + "active_count": worker.active_count, + "reserved_count": worker.reserved_count, + "scheduled_count": worker.scheduled_count, + } + for worker in workers_overview.workers + ], + queues=queues, + tasks=all_tasks[:task_limit], ) @@ -245,201 +146,35 @@ def _build_state_buckets(tasks: Iterable[TaskMonitorTaskItem]) -> list[TaskMonit ] -def _collect_queue_names(active_queues: dict, tasks: Iterable[TaskMonitorTaskItem]) -> list[str]: - queue_names: set[str] = set() - - for queues in active_queues.values(): - for queue in queues or []: - name = _queue_name_from_queue(queue) - if name: - queue_names.add(name) - - for task in tasks: - if task.queue_name: - queue_names.add(task.queue_name) - - return sorted(queue_names) - - -def _load_queue_pending_counts(broker_url: str, queue_names: list[str]) -> dict[str, int]: - if not queue_names or not _is_redis_url(broker_url): - return {name: 0 for name in queue_names} - - client = _build_redis_client(broker_url) - if client is None: - return {name: 0 for name in queue_names} - - counts: dict[str, int] = {} - try: - for queue_name in queue_names: - counts[queue_name] = _to_int(client.llen(queue_name)) - except Exception: - return {name: 0 for name in queue_names} - return counts - - -def _load_history_task_items(result_backend_url: str, *, limit: int) -> list[TaskMonitorTaskItem]: - if limit <= 0 or not _is_redis_url(result_backend_url): - return [] - - client = 
_build_redis_client(result_backend_url) - if client is None: - return [] - - scan_max = max(200, limit * 20) - keys: list[str] = [] - cursor = 0 - - try: - while True: - cursor, batch = client.scan(cursor=cursor, match="celery-task-meta-*", count=200) - keys.extend(str(key) for key in batch) - if cursor == 0 or len(keys) >= scan_max: - break - except Exception: - return [] - +def _convert_flower_tasks(tasks: list[Any], *, source_state: str | None) -> list[TaskMonitorTaskItem]: items: list[TaskMonitorTaskItem] = [] - for key in keys[:scan_max]: - try: - payload_raw = client.get(key) - except Exception: - continue - if not payload_raw: - continue - - try: - payload = json.loads(payload_raw) - except (TypeError, json.JSONDecodeError): - continue - if not isinstance(payload, dict): - continue - - state = str(payload.get("status") or "").strip().upper() - if not state: - continue - - task_id = str(payload.get("task_id") or key.removeprefix("celery-task-meta-")).strip() - if not task_id: - continue - - done_at = _parse_datetime(payload.get("date_done")) - error = _build_error(payload.get("result"), payload.get("traceback")) if state in {"FAILURE", "RETRY", "REVOKED"} else None - + for task in tasks: + state = source_state or task.state items.append( TaskMonitorTaskItem( - task_id=task_id, - name=str(payload.get("name") or "-"), + task_id=task.task_id, + name=task.name, state=state, - done_at=done_at, - error=error, + queue_name=task.queue_name, + worker=task.worker, + retries=0, + eta=task.eta, + started_at=task.started_at, + done_at=task.finished_at, + runtime_seconds=task.runtime_seconds, + error=task.exception_text, ) ) - - items.sort(key=lambda item: -_task_sort_timestamp(item).timestamp()) - return items[:limit] + return items -def _queue_name_from_queue(queue: dict) -> str: - if not isinstance(queue, dict): - return "" - return str(queue.get("name") or queue.get("routing_key") or "").strip() - - -def _queue_name_from_task(task: dict) -> str | None: - delivery_info 
= task.get("delivery_info") if isinstance(task.get("delivery_info"), dict) else {} - queue_name = delivery_info.get("routing_key") or task.get("queue") - if not queue_name: - return None - return str(queue_name) - - -def _is_redis_url(value: str) -> bool: - scheme = urlsplit(value).scheme - return scheme in {"redis", "rediss"} - - -def _build_redis_client(redis_url: str): - if redis is None: - return None - try: - return redis.Redis.from_url( - redis_url, - decode_responses=True, - socket_connect_timeout=1, - socket_timeout=1, - ) - except Exception: - return None - - -def _parse_datetime(value: object) -> datetime | None: - if not isinstance(value, str) or not value.strip(): - return None - normalized = value.strip().replace("Z", "+00:00") - try: - parsed = datetime.fromisoformat(normalized) - except ValueError: - return None - if parsed.tzinfo is None: - return parsed.replace(tzinfo=timezone.utc) - return parsed.astimezone(timezone.utc) - - -def _timestamp_to_datetime(value: object) -> datetime | None: - try: - timestamp = float(value) - except (TypeError, ValueError): - return None - if timestamp <= 0: - return None - try: - return datetime.fromtimestamp(timestamp, timezone.utc) - except (OSError, OverflowError, ValueError): - return None +def _task_priority(state: str) -> int: + normalized = (state or "").strip().upper() + return STATE_PRIORITY.get(normalized, 99) def _task_sort_timestamp(item: TaskMonitorTaskItem) -> datetime: for candidate in [item.started_at, item.done_at, item.eta]: - if candidate is None: - continue - if candidate.tzinfo is None: - return candidate.replace(tzinfo=timezone.utc) - return candidate.astimezone(timezone.utc) - return datetime.fromtimestamp(0, timezone.utc) - - -def _build_error(result, traceback_value) -> str | None: - result_text = None - if result is not None: - result_text = str(result).strip() - - traceback_text = None - if isinstance(traceback_value, str): - traceback_text = traceback_value.strip() - - for candidate in 
[result_text, traceback_text]: - if not candidate: - continue - if len(candidate) <= 400: + if candidate is not None: return candidate - return f"{candidate[:397]}..." - return None - - -def _to_int(value: object) -> int: - try: - return int(value or 0) - except (TypeError, ValueError): - return 0 - - -def _mask_url(value: str) -> str: - parsed = urlsplit(value) - if not parsed.password: - return value - username = parsed.username or "" - hostname = parsed.hostname or "" - port = f":{parsed.port}" if parsed.port else "" - netloc = f"{username}:***@{hostname}{port}" - return urlunsplit((parsed.scheme, netloc, parsed.path, parsed.query, parsed.fragment)) + return datetime.fromtimestamp(0, tz=timezone.utc) diff --git a/api/app/services/worker_registry_service.py b/api/app/services/worker_registry_service.py new file mode 100644 index 0000000..d1c8441 --- /dev/null +++ b/api/app/services/worker_registry_service.py @@ -0,0 +1,165 @@ +from __future__ import annotations + +import json +import os +from datetime import timedelta +from typing import Any + +from sqlalchemy import select + +from ..core.database import SessionLocal +from ..models.base import utcnow +from ..models.worker_registry import WorkerRegistry + + +def register_worker( + *, + worker_name: str, + status: str, + queues: list[str] | None = None, + pid: int | None = None, + heartbeat_increment: bool = False, + metadata: dict[str, Any] | None = None, +) -> None: + normalized_worker = worker_name.strip() + if not normalized_worker: + return + + now = utcnow() + queues_csv = _normalize_queues(queues) + metadata_text = _to_json_text(metadata) + normalized_status = (status or "").strip().lower() or "online" + normalized_pid = _coerce_pid(pid) + + db = SessionLocal() + try: + row = db.execute( + select(WorkerRegistry).where(WorkerRegistry.worker_name == normalized_worker) + ).scalar_one_or_none() + if row is None: + row = WorkerRegistry( + worker_name=normalized_worker, + status=normalized_status, + 
queues_csv=queues_csv, + pid=normalized_pid, + heartbeat_count=1 if heartbeat_increment else 0, + first_seen_at=now, + last_seen_at=now, + metadata_json=metadata_text, + create_date=now, + update_date=now, + ) + db.add(row) + db.commit() + return + + row.status = normalized_status + if queues_csv is not None: + row.queues_csv = queues_csv + if normalized_pid is not None: + row.pid = normalized_pid + if heartbeat_increment: + row.heartbeat_count = int(row.heartbeat_count or 0) + 1 + if metadata_text is not None: + row.metadata_json = metadata_text + row.last_seen_at = now + row.update_date = now + db.commit() + except Exception: + db.rollback() + finally: + db.close() + + +def mark_worker_offline(worker_name: str) -> None: + normalized_worker = worker_name.strip() + if not normalized_worker: + return + now = utcnow() + db = SessionLocal() + try: + row = db.execute( + select(WorkerRegistry).where(WorkerRegistry.worker_name == normalized_worker) + ).scalar_one_or_none() + if row is None: + row = WorkerRegistry( + worker_name=normalized_worker, + status="offline", + heartbeat_count=0, + first_seen_at=now, + last_seen_at=now, + create_date=now, + update_date=now, + ) + db.add(row) + db.commit() + return + row.status = "offline" + row.last_seen_at = now + row.update_date = now + db.commit() + except Exception: + db.rollback() + finally: + db.close() + + +def sweep_offline_workers(*, ttl_seconds: int) -> int: + safe_ttl = max(10, int(ttl_seconds)) + threshold = utcnow() - timedelta(seconds=safe_ttl) + db = SessionLocal() + changed = 0 + try: + rows = db.execute( + select(WorkerRegistry).where( + WorkerRegistry.status == "online", + WorkerRegistry.last_seen_at < threshold, + ) + ).scalars().all() + if not rows: + return 0 + now = utcnow() + for row in rows: + row.status = "offline" + row.update_date = now + changed += 1 + db.commit() + return changed + except Exception: + db.rollback() + return 0 + finally: + db.close() + + +def _normalize_queues(queues: list[str] | None) 
-> str | None: + if queues is None: + return None + normalized = sorted({item.strip() for item in queues if isinstance(item, str) and item.strip()}) + if not normalized: + return "" + return ",".join(normalized) + + +def _to_json_text(value: dict[str, Any] | None) -> str | None: + if value is None: + return None + try: + text = json.dumps(value, ensure_ascii=False) + except TypeError: + text = json.dumps({"repr": repr(value)}, ensure_ascii=False) + if len(text) > 3900: + text = text[:3897] + "..." + return text + + +def _coerce_pid(value: int | None) -> int | None: + if value is not None: + try: + pid = int(value) + except (TypeError, ValueError): + pid = None + if pid and pid > 0: + return pid + fallback = os.getpid() + return fallback if fallback > 0 else None diff --git a/api/app/tasks/worker_registry_tasks.py b/api/app/tasks/worker_registry_tasks.py new file mode 100644 index 0000000..865689c --- /dev/null +++ b/api/app/tasks/worker_registry_tasks.py @@ -0,0 +1,14 @@ +from __future__ import annotations + +from ..core.celery_app import celery_app +from ..core.config import get_settings +from ..services.worker_registry_service import sweep_offline_workers + + +@celery_app.task(name="app.tasks.worker_registry_tasks.sweep_worker_registry_offline") +def sweep_worker_registry_offline() -> dict[str, int]: + settings = get_settings() + updated_count = sweep_offline_workers(ttl_seconds=settings.worker_registry_ttl_seconds) + return { + "updated_count": int(updated_count), + } diff --git a/docker-compose.yml b/docker-compose.yml index f532aac..bf6671e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -125,6 +125,9 @@ services: CELERY_RESULT_BACKEND: ${CELERY_RESULT_BACKEND:-redis://redis:6379/1} CELERY_TIMEZONE: ${CELERY_TIMEZONE:-Asia/Shanghai} SCHEDULER_EXPIRE_INTERVAL_SECONDS: ${SCHEDULER_EXPIRE_INTERVAL_SECONDS:-60} + SCHEDULER_API_BASE_URL: ${SCHEDULER_API_BASE_URL:-http://scheduler:19100} + SCHEDULER_API_TOKEN: ${SCHEDULER_API_TOKEN:-} + 
SCHEDULER_DEFAULT_QUEUE: ${SCHEDULER_DEFAULT_QUEUE:-default} WINE_BINARY_PATH: ${WINE_BINARY_PATH:-wine} WINE_ALLOWED_ROOT: ${WINE_ALLOWED_ROOT:-./data/wine} WINE_DEFAULT_TIMEOUT_SECONDS: ${WINE_DEFAULT_TIMEOUT_SECONDS:-300} @@ -165,6 +168,7 @@ services: - worker - --loglevel=${CELERY_LOG_LEVEL:-INFO} - --concurrency=${CELERY_WORKER_CONCURRENCY:-2} + - --queues=${CELERY_WORKER_QUEUES:-default,celery} depends_on: api: condition: service_healthy @@ -185,6 +189,13 @@ services: CELERY_RESULT_BACKEND: ${CELERY_RESULT_BACKEND:-redis://redis:6379/1} CELERY_TIMEZONE: ${CELERY_TIMEZONE:-Asia/Shanghai} SCHEDULER_EXPIRE_INTERVAL_SECONDS: ${SCHEDULER_EXPIRE_INTERVAL_SECONDS:-60} + SCHEDULER_API_BASE_URL: ${SCHEDULER_API_BASE_URL:-http://scheduler:19100} + SCHEDULER_API_TOKEN: ${SCHEDULER_API_TOKEN:-} + SCHEDULER_DEFAULT_QUEUE: ${SCHEDULER_DEFAULT_QUEUE:-default} + FLOWER_API_BASE_URL: ${FLOWER_API_BASE_URL:-http://flower:5555} + FLOWER_API_TIMEOUT_SECONDS: ${FLOWER_API_TIMEOUT_SECONDS:-10} + FLOWER_BASIC_AUTH: ${FLOWER_BASIC_AUTH:-} + WORKER_REGISTRY_TTL_SECONDS: ${WORKER_REGISTRY_TTL_SECONDS:-90} restart: unless-stopped celery-beat: @@ -224,6 +235,73 @@ services: CELERY_RESULT_BACKEND: ${CELERY_RESULT_BACKEND:-redis://redis:6379/1} CELERY_TIMEZONE: ${CELERY_TIMEZONE:-Asia/Shanghai} SCHEDULER_EXPIRE_INTERVAL_SECONDS: ${SCHEDULER_EXPIRE_INTERVAL_SECONDS:-60} + SCHEDULER_API_BASE_URL: ${SCHEDULER_API_BASE_URL:-http://scheduler:19100} + SCHEDULER_API_TOKEN: ${SCHEDULER_API_TOKEN:-} + SCHEDULER_DEFAULT_QUEUE: ${SCHEDULER_DEFAULT_QUEUE:-default} + WORKER_REGISTRY_TTL_SECONDS: ${WORKER_REGISTRY_TTL_SECONDS:-90} + restart: unless-stopped + + scheduler: + build: + context: ./api + dockerfile: Dockerfile + args: + PYTHON_BASE_IMAGE: ${PYTHON_BASE_IMAGE:-docker.m.daocloud.io/library/python:3.11-slim} + PIP_INDEX_URL: ${PIP_INDEX_URL:-https://pypi.org/simple} + PIP_DEFAULT_TIMEOUT: ${PIP_DEFAULT_TIMEOUT:-300} + PIP_RETRIES: ${PIP_RETRIES:-20} + container_name: fquiz-scheduler + command: 
+ - uvicorn + - app.scheduler_main:app + - --host + - 0.0.0.0 + - --port + - "19100" + depends_on: + redis: + condition: service_healthy + api: + condition: service_healthy + environment: + CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis:6379/0} + CELERY_RESULT_BACKEND: ${CELERY_RESULT_BACKEND:-redis://redis:6379/1} + CELERY_TIMEZONE: ${CELERY_TIMEZONE:-Asia/Shanghai} + SCHEDULER_API_TOKEN: ${SCHEDULER_API_TOKEN:-} + SCHEDULER_DEFAULT_QUEUE: ${SCHEDULER_DEFAULT_QUEUE:-default} + ports: + - "${SCHEDULER_PORT:-19100}:19100" + restart: unless-stopped + + flower: + build: + context: ./api + dockerfile: Dockerfile + args: + PYTHON_BASE_IMAGE: ${PYTHON_BASE_IMAGE:-docker.m.daocloud.io/library/python:3.11-slim} + PIP_INDEX_URL: ${PIP_INDEX_URL:-https://pypi.org/simple} + PIP_DEFAULT_TIMEOUT: ${PIP_DEFAULT_TIMEOUT:-300} + PIP_RETRIES: ${PIP_RETRIES:-20} + container_name: fquiz-flower + command: + - celery + - -A + - app.core.celery_app.celery_app + - flower + - --address=0.0.0.0 + - --port=5555 + - --persistent=False + - --basic-auth=${FLOWER_BASIC_AUTH:-admin:admin} + depends_on: + redis: + condition: service_healthy + environment: + CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis:6379/0} + CELERY_RESULT_BACKEND: ${CELERY_RESULT_BACKEND:-redis://redis:6379/1} + CELERY_TIMEZONE: ${CELERY_TIMEZONE:-Asia/Shanghai} + FLOWER_BASIC_AUTH: ${FLOWER_BASIC_AUTH:-admin:admin} + ports: + - "${FLOWER_PORT:-5555}:5555" restart: unless-stopped web: diff --git a/memory/2026-05-01.md b/memory/2026-05-01.md index ce6cbf9..bc6a9e0 100644 --- a/memory/2026-05-01.md +++ b/memory/2026-05-01.md @@ -598,3 +598,152 @@ - 风险与影响: - 影响范围仅前端全局 404 页面。 - 文案当前为英文(对齐原闪现页面),若后续需要中英双语可再单独调整。 + +## Work Log - 迁移 Celery 与任务/Worker 双监控页面到 fquiz(2026-05-01) + +- 背景: + - 需求要求将 `modo-next` 的 Celery 调度与两类监控页(任务监控、Worker 监控)迁移到当前 `fquiz`。 + - 仓库已有 Celery 与 `/admin/task-monitor`,缺口是“独立 Worker 监控页 + 对应 API/菜单入口”。 + +- 本次改动(最小闭环): + - 后端新增 Worker 监控模型与接口: + - `api/app/schemas/worker_monitor.py` + - 新增 
`WorkerMonitorOverviewResponse`、`WorkerMonitorTaskOverviewResponse` 及相关 item/summary 模型。 + - `api/app/services/worker_monitor_service.py` + - 基于 Celery inspect 提供 Worker 概览(状态/队列/并发/处理量)。 + - 提供单 Worker 任务快照(active/reserved/scheduled)与 recent 历史任务聚合。 + - 复用现有 `task_monitor_service` 的通用解析工具,保持行为一致性。 + - `api/app/api/v1/worker_monitor.py` + - 新增路由: + - `GET /api/v1/admin/workers/overview` + - `GET /api/v1/admin/workers/tasks?worker=...&recent_limit=...` + - 权限统一 `celery.read` / `celery.manage`。 + - `api/app/api/router.py` + - 挂载 `worker_monitor` 路由。 + + - 前端新增独立 Worker 监控页: + - `web/src/app/admin/workers/page.tsx` + - 新增 Worker 列表页:支持名称/队列/在线状态筛选、自动刷新、手动刷新。 + - 新增 Worker 任务抽屉:查看 active/reserved/scheduled/recent 四类任务明细(含 args/kwargs/error)。 + - 数据源对齐新后端 API。 + + - 菜单与权限入口联动: + - `api/app/services/seed_service.py` + - 新增菜单 `admin.workers`(`/admin/workers`)。 + - admin 默认菜单绑定加入 `admin.workers`。 + - 调整后续菜单 `sort_order`,避免排序冲突。 + - `api/app/services/legacy_authz_service.py` + - 增加 `admin.workers -> celery.read/celery.manage` 权限映射。 + - synthetic legacy menu rows 增加 `admin.workers`。 + - 同步顺序号,保持 legacy 菜单序一致。 + - `api/app/services/legacy_admin_rbac_service.py` + - `PROTECTED_MENU_CODES` 加入 `admin.workers`,避免误删。 + - `api/app/services/admin_service.py` + - 菜单删除保护集合加入 `admin.workers`。 + - `web/src/app/admin/menus/page.tsx` + - 前端受保护菜单集合加入 `admin.workers`。 + - `web/src/app/admin/page.tsx` + - 后台首页新增“Worker监控”卡片入口。 + +- 验证: + - 后端语法检查: + - `python3 -m py_compile api/app/schemas/worker_monitor.py api/app/services/worker_monitor_service.py api/app/api/v1/worker_monitor.py api/app/api/router.py api/app/services/seed_service.py api/app/services/legacy_authz_service.py api/app/services/admin_service.py` -> 通过。 + - 前端构建: + - `rm -f web/.next/lock && npm run build:web` -> 通过。 + - 构建产物路由包含:`/admin/workers`、`/admin/task-monitor`。 + +- 风险与影响: + - 影响范围:Celery 监控模块(新增 Worker 监控维度),权限仍复用 `celery.read/manage`。 + - `recent_tasks` 依赖 Celery Redis result backend 中任务元数据是否含 `worker/hostname` 
字段;若上游未写入,recent 列表会偏少,但不影响 active/reserved/scheduled 实时快照。 + +## Work Log - 新增 Scheduler + Flower + Worker 自动注册并切换监控数据源(2026-05-01) + +- 背景: + - 新需求要求当前 `fquiz` 引入独立 `scheduler` 服务、`flower` 监控服务,监控页面改走 Flower,并提供 worker 自动注册机制。 + +- 本次改动(最小闭环): + + - 后端新增 `scheduler` 调度 API: + - `api/app/schemas/scheduler.py` + - 定义任务入队/撤销请求响应模型。 + - `api/app/services/scheduler_service.py` + - 封装 `celery_app.send_task` 与 `celery_app.control.revoke`。 + - `api/app/api/v1/scheduler.py` + - 提供: + - `GET /api/v1/scheduler/healthz` + - `POST /api/v1/scheduler/v1/tasks/enqueue` + - `POST /api/v1/scheduler/v1/tasks/revoke` + - 支持 `x-scheduler-token`(环境变量配置后生效)。 + - `api/app/scheduler_main.py` + - 独立 scheduler 进程入口(只挂 scheduler 路由)。 + - `api/app/api/router.py` + - 主 API 路由挂载 scheduler 与 flower monitor 路由。 + + - 后端新增 Flower 监控代理层(页面不直连 Flower): + - `api/app/schemas/flower_monitor.py` + - 定义 worker 概览与任务快照模型。 + - `api/app/services/flower_monitor_service.py` + - 代理调用 Flower API: + - `/api/workers?refresh=...` + - `/api/workers?status=true` + - `/api/tasks?limit=...&workername=...` + - 统一解析 worker/task 字段、状态、时间戳。 + - `api/app/api/v1/flower_monitor.py` + - 提供: + - `GET /api/v1/admin/flower/workers` + - `GET /api/v1/admin/flower/worker-tasks` + - 权限:`celery.read`/`celery.manage`。 + + - Worker 自动注册机制: + - `api/app/models/worker_registry.py` + - 新增 `worker_registry` 表模型。 + - `api/app/services/worker_registry_service.py` + - worker 注册、离线标记、离线扫描。 + - `api/app/core/worker_signals.py` + - 绑定 Celery 信号:`worker_ready` / `heartbeat_sent` / `worker_shutdown`。 + - `api/app/tasks/worker_registry_tasks.py` + - 定时离线扫描任务。 + - `api/app/core/celery_app.py` + - include 新任务模块,beat schedule 增加 `sweep_worker_registry_offline`。 + - 引入 worker signal 注册。 + - `api/app/core/database.py` + `api/app/models/__init__.py` + - 把 `worker_registry` 纳入 ORM 初始化与建表。 + + - 监控页面切 Flower: + - `web/src/app/admin/workers/page.tsx` + - 改为请求 `/api/v1/admin/flower/workers` 与 `/api/v1/admin/flower/worker-tasks`。 + - 展示 Flower 
维度字段(在线状态、并发、registered、processed、心跳、任务来源等)。 + - `web/src/app/admin/task-monitor/page.tsx` + - 重构为 Flower 聚合视图:批量拉取每个 worker 任务快照并汇总状态分布/任务表。 + - `api/app/services/task_monitor_service.py` + - 从原 inspect+redis 直读切换为基于 Flower 代理服务聚合,保持原接口契约。 + + - 调度链路接入 scheduler(可选): + - `api/app/services/elevation_service.py` + - `create_apply_job` 增加 `dispatch_mode`,支持 `scheduler_api`。 + - `api/app/api/v1/elevation.py` 新增 `dispatchMode` 查询参数透传。 + - 默认仍为 `celery_direct`,不破坏现有行为。 + + - 运行环境与编排配置: + - `docker-compose.yml` + - 新增 `scheduler` 服务(19100)。 + - 新增 `flower` 服务(5555,basic auth)。 + - `celery-worker` 增加 `CELERY_WORKER_QUEUES` 配置。 + - API/worker/beat 注入 `SCHEDULER_*`、`FLOWER_*`、`WORKER_REGISTRY_TTL_SECONDS`。 + - `.env.example` + - 新增 `SCHEDULER_API_BASE_URL`、`SCHEDULER_API_TOKEN`、`SCHEDULER_DEFAULT_QUEUE`。 + - 新增 `FLOWER_API_BASE_URL`、`FLOWER_API_TIMEOUT_SECONDS`、`FLOWER_BASIC_AUTH`、`FLOWER_PORT`。 + - 新增 `WORKER_REGISTRY_TTL_SECONDS`、`CELERY_WORKER_QUEUES`、`SCHEDULER_PORT`。 + +- 验证: + - 后端语法校验通过: + - `python3 -m py_compile ...`(覆盖 scheduler/flower/worker_registry/celery/task-monitor 相关文件) + - 前端构建通过: + - `npm run build:web` -> 通过,页面路由包含 `/admin/workers`、`/admin/task-monitor`。 + - Compose 渲染校验通过: + - `docker compose config` 渲染包含 `scheduler`、`flower` 服务以及关键 env/command 字段。 + +- 风险与影响: + - Flower 代理链路新增了对 `FLOWER_BASIC_AUTH`、`FLOWER_API_BASE_URL` 的配置依赖,配置不一致会导致监控页 502。 + - `task-monitor` 改为按 worker 聚合调用,worker 数量多时会增加 API 聚合开销。 + - `dispatchMode=scheduler_api` 需确保 `scheduler` 服务可达;默认 `celery_direct` 兼容旧行为。 diff --git a/web/src/app/admin/menus/page.tsx b/web/src/app/admin/menus/page.tsx index 0aebc5e..5f8901a 100644 --- a/web/src/app/admin/menus/page.tsx +++ b/web/src/app/admin/menus/page.tsx @@ -70,6 +70,7 @@ const PROTECTED_MENU_CODES = new Set([ "admin.power_lines", "admin.lightning_currents", "admin.lightning_distribution", + "admin.workers", "admin.data_query", "admin.hot_search", "admin.task_monitor", diff --git a/web/src/app/admin/page.tsx b/web/src/app/admin/page.tsx 
index b452c23..f6eeb0c 100644 --- a/web/src/app/admin/page.tsx +++ b/web/src/app/admin/page.tsx @@ -6,6 +6,7 @@ import { AuditOutlined, CodeOutlined, DatabaseOutlined, + DeploymentUnitOutlined, FileSearchOutlined, FolderOpenOutlined, GlobalOutlined, @@ -72,6 +73,14 @@ const CARDS: DashboardCard[] = [ icon: , visible: (hasPermission) => hasPermission("file.read") || hasPermission("file.manage"), }, + { + href: "/workers", + title: "Worker监控", + description: "查看 Worker 在线状态、并发、活跃队列和每个 Worker 的任务快照。", + category: "协作", + icon: , + visible: (hasPermission) => hasPermission("celery.read") || hasPermission("celery.manage"), + }, { href: "/task-monitor", title: "任务监控", diff --git a/web/src/app/admin/task-monitor/page.tsx b/web/src/app/admin/task-monitor/page.tsx index d81091a..cb6e9a4 100644 --- a/web/src/app/admin/task-monitor/page.tsx +++ b/web/src/app/admin/task-monitor/page.tsx @@ -11,8 +11,9 @@ import { Card, Col, Empty, - InputNumber, + Input, Row, + Select, Space, Spin, Statistic, @@ -30,62 +31,67 @@ import { readApiError } from "@/lib/api"; const { Text } = Typography; const AntCard = Card as unknown as ComponentType; -const DEFAULT_TASK_LIMIT = 100; -const DEFAULT_HISTORY_LIMIT = 100; +const DEFAULT_RECENT_LIMIT = 100; -type TaskMonitorBucketItem = { - key: string; - label: string; - count: number; -}; - -type TaskMonitorWorkerItem = { +type FlowerWorkerItem = { worker: string; - online: boolean; + status: string; queue_names: string[]; - max_concurrency: number; + registered_count: number; + processed_count: number; + concurrency: number; prefetch_count: number; - uptime_seconds: number; - processed_total: number; active_count: number; reserved_count: number; scheduled_count: number; + last_heartbeat_at: string | null; }; -type TaskMonitorQueueItem = { - name: string; - pending_count: number; - consumer_count: number; - active_count: number; - reserved_count: number; - scheduled_count: number; +type FlowerWorkersOverviewResponse = { + generated_at: string; + 
workers: FlowerWorkerItem[]; + summary: { + total: number; + online: number; + offline: number; + }; }; -type TaskMonitorTaskItem = { +type FlowerTaskItem = { task_id: string; name: string; state: string; + source: string; + worker: string; queue_name: string | null; - worker: string | null; - retries: number; + args_text: string | null; + kwargs_text: string | null; eta: string | null; + received_at: string | null; started_at: string | null; - done_at: string | null; + finished_at: string | null; runtime_seconds: number | null; - error: string | null; + result_text: string | null; + exception_text: string | null; }; -type TaskMonitorOverviewResponse = { +type FlowerWorkerTaskOverviewResponse = { generated_at: string; - broker_url: string; - result_backend: string; - workers_online: number; - worker_concurrency_total: number; - queue_pending_total: number; - task_state_buckets: TaskMonitorBucketItem[]; - workers: TaskMonitorWorkerItem[]; - queues: TaskMonitorQueueItem[]; - tasks: TaskMonitorTaskItem[]; + worker: string; + active_tasks: FlowerTaskItem[]; + reserved_tasks: FlowerTaskItem[]; + scheduled_tasks: FlowerTaskItem[]; + recent_tasks: FlowerTaskItem[]; + summary: { + active: number; + reserved: number; + scheduled: number; + recent: number; + }; +}; + +type TaskTableRow = FlowerTaskItem & { + key: string; }; function formatDateTime(value: string | null | undefined): string { @@ -99,27 +105,6 @@ function formatDateTime(value: string | null | undefined): string { return parsed.format("YYYY-MM-DD HH:mm:ss"); } -function formatDuration(seconds: number): string { - const total = Math.max(0, Math.trunc(seconds)); - const hours = Math.floor(total / 3600); - const minutes = Math.floor((total % 3600) / 60); - const remain = total % 60; - if (hours > 0) { - return `${hours}h ${minutes}m ${remain}s`; - } - if (minutes > 0) { - return `${minutes}m ${remain}s`; - } - return `${remain}s`; -} - -function normalizePositiveInt(value: number | null | undefined, fallback: 
number, min: number, max: number): number { - if (typeof value !== "number" || Number.isNaN(value)) { - return fallback; - } - return Math.min(max, Math.max(min, Math.trunc(value))); -} - function renderTaskStateTag(state: string) { const normalized = (state || "").toUpperCase(); const color = @@ -141,53 +126,96 @@ function renderTaskStateTag(state: string) { return {normalized || "UNKNOWN"}; } -function renderOnlineTag(online: boolean) { - return online ? 在线 : 离线; +function renderWorkerStatusTag(status: string) { + return (status || "").toUpperCase() === "ONLINE" ? 在线 : 离线; +} + +function containsText(source: string | null | undefined, keyword: string): boolean { + if (!keyword) { + return true; + } + return (source || "").toLowerCase().includes(keyword.toLowerCase()); +} + +function toTaskRows(workerName: string, source: string, tasks: FlowerTaskItem[]): TaskTableRow[] { + return tasks.map((item, index) => ({ + ...item, + source: item.source || source, + worker: item.worker || workerName, + key: `${workerName}:${source}:${item.task_id}:${index + 1}`, + })); +} + +function parseStatusFilter(value: string | undefined): "all" | "online" | "offline" { + if (value === "online" || value === "offline") { + return value; + } + return "all"; } export default function AdminTaskMonitorPage() { const { user, initializing, fetchWithAuth, hasPermission } = useAuth(); - const [taskLimit, setTaskLimit] = useState(DEFAULT_TASK_LIMIT); - const [historyLimit, setHistoryLimit] = useState(DEFAULT_HISTORY_LIMIT); - const [autoRefresh, setAutoRefresh] = useState(true); - const canRead = hasPermission("celery.read") || hasPermission("celery.manage"); - const overviewPath = useMemo(() => { - const params = new URLSearchParams(); - params.set("task_limit", String(taskLimit)); - params.set("history_limit", String(historyLimit)); - return `/api/v1/admin/task-monitor/overview?${params.toString()}`; - }, [historyLimit, taskLimit]); + const [autoRefresh, setAutoRefresh] = useState(true); + 
const [workerKeyword, setWorkerKeyword] = useState(""); + const [queueKeyword, setQueueKeyword] = useState(""); + const [taskKeyword, setTaskKeyword] = useState(""); + const [statusFilter, setStatusFilter] = useState<"all" | "online" | "offline">("all"); - const overviewQuery = useQuery({ - queryKey: ["task-monitor-overview", overviewPath], + const workersOverviewQuery = useQuery({ + queryKey: ["flower-workers-overview"], enabled: Boolean(user) && canRead, queryFn: async () => { - const response = await fetchWithAuth(overviewPath); + const response = await fetchWithAuth("/api/v1/admin/flower/workers?forceRefresh=false"); if (!response.ok) { throw new Error(await readApiError(response)); } - return (await response.json()) as TaskMonitorOverviewResponse; + return (await response.json()) as FlowerWorkersOverviewResponse; }, refetchInterval: autoRefresh ? 5_000 : false, staleTime: 15_000, }); - const workerColumns = useMemo>( + const workerNames = useMemo(() => (workersOverviewQuery.data?.workers || []).map((item) => item.worker), [workersOverviewQuery.data?.workers]); + + const allTasksQuery = useQuery({ + queryKey: ["flower-worker-tasks-batch", workerNames], + enabled: Boolean(user) && canRead && workerNames.length > 0, + queryFn: async () => { + const settled = await Promise.all( + workerNames.map(async (worker) => { + const params = new URLSearchParams(); + params.set("worker", worker); + params.set("recentLimit", String(DEFAULT_RECENT_LIMIT)); + params.set("forceRefresh", "false"); + const response = await fetchWithAuth(`/api/v1/admin/flower/worker-tasks?${params.toString()}`); + if (!response.ok) { + throw new Error(await readApiError(response)); + } + return (await response.json()) as FlowerWorkerTaskOverviewResponse; + }), + ); + return settled; + }, + refetchInterval: autoRefresh ? 
5_000 : false, + staleTime: 15_000, + }); + + const workerColumns = useMemo>( () => [ { title: "Worker", dataIndex: "worker", key: "worker", - width: 260, + width: 280, }, { title: "状态", - dataIndex: "online", - key: "online", + dataIndex: "status", + key: "status", width: 90, - render: (value: boolean) => renderOnlineTag(value), + render: (value: string) => renderWorkerStatusTag(value), }, { title: "队列", @@ -197,28 +225,27 @@ export default function AdminTaskMonitorPage() { }, { title: "并发", - dataIndex: "max_concurrency", - key: "max_concurrency", - width: 90, + dataIndex: "concurrency", + key: "concurrency", + width: 80, }, { - title: "预取", + title: "Prefetch", dataIndex: "prefetch_count", key: "prefetch_count", width: 90, }, { - title: "在线时长", - dataIndex: "uptime_seconds", - key: "uptime_seconds", - width: 120, - render: (value: number) => formatDuration(value), + title: "已注册任务", + dataIndex: "registered_count", + key: "registered_count", + width: 110, }, { title: "累计处理", - dataIndex: "processed_total", - key: "processed_total", - width: 120, + dataIndex: "processed_count", + key: "processed_count", + width: 110, }, { title: "Active/Reserved/Scheduled", @@ -226,65 +253,31 @@ export default function AdminTaskMonitorPage() { width: 190, render: (_: unknown, record) => `${record.active_count}/${record.reserved_count}/${record.scheduled_count}`, }, - ], - [], - ); - - const queueColumns = useMemo>( - () => [ { - title: "队列", - dataIndex: "name", - key: "name", - }, - { - title: "Pending", - dataIndex: "pending_count", - key: "pending_count", - width: 110, - }, - { - title: "Consumer", - dataIndex: "consumer_count", - key: "consumer_count", - width: 110, - }, - { - title: "Active", - dataIndex: "active_count", - key: "active_count", - width: 100, - }, - { - title: "Reserved", - dataIndex: "reserved_count", - key: "reserved_count", - width: 110, - }, - { - title: "Scheduled", - dataIndex: "scheduled_count", - key: "scheduled_count", - width: 120, + title: "最近心跳", + 
dataIndex: "last_heartbeat_at", + key: "last_heartbeat_at", + width: 170, + render: (value: string | null) => formatDateTime(value), }, ], [], ); - const taskColumns = useMemo>( + const taskColumns = useMemo>( () => [ { title: "Task ID", dataIndex: "task_id", key: "task_id", - width: 280, + width: 260, render: (value: string) => {value}, }, { title: "任务名", dataIndex: "name", key: "name", - width: 260, + width: 220, render: (value: string) => value || "-", }, { @@ -294,11 +287,18 @@ export default function AdminTaskMonitorPage() { width: 110, render: (value: string) => renderTaskStateTag(value), }, + { + title: "来源", + dataIndex: "source", + key: "source", + width: 100, + render: (value: string) => value || "-", + }, { title: "队列", dataIndex: "queue_name", key: "queue_name", - width: 130, + width: 120, render: (value: string | null) => value || "-", }, { @@ -306,18 +306,12 @@ export default function AdminTaskMonitorPage() { dataIndex: "worker", key: "worker", width: 220, - render: (value: string | null) => value || "-", + render: (value: string) => value || "-", }, { - title: "重试", - dataIndex: "retries", - key: "retries", - width: 80, - }, - { - title: "ETA", - dataIndex: "eta", - key: "eta", + title: "接收", + dataIndex: "received_at", + key: "received_at", width: 170, render: (value: string | null) => formatDateTime(value), }, @@ -330,8 +324,8 @@ export default function AdminTaskMonitorPage() { }, { title: "完成", - dataIndex: "done_at", - key: "done_at", + dataIndex: "finished_at", + key: "finished_at", width: 170, render: (value: string | null) => formatDateTime(value), }, @@ -340,13 +334,27 @@ export default function AdminTaskMonitorPage() { dataIndex: "runtime_seconds", key: "runtime_seconds", width: 110, - render: (value: number | null) => (value === null ? "-" : `${value.toFixed(1)}s`), + render: (value: number | null) => (value === null ? 
"-" : `${value.toFixed(3)}s`), }, { - title: "错误", - dataIndex: "error", - key: "error", - width: 260, + title: "Args", + dataIndex: "args_text", + key: "args_text", + width: 220, + render: (value: string | null) => (value ? {value} : "-"), + }, + { + title: "Kwargs", + dataIndex: "kwargs_text", + key: "kwargs_text", + width: 220, + render: (value: string | null) => (value ? {value} : "-"), + }, + { + title: "Exception", + dataIndex: "exception_text", + key: "exception_text", + width: 220, render: (value: string | null) => value ? ( @@ -360,7 +368,75 @@ export default function AdminTaskMonitorPage() { [], ); - if (initializing || (overviewQuery.isLoading && !overviewQuery.data && canRead && Boolean(user))) { + const filteredWorkers = useMemo(() => { + const rows = workersOverviewQuery.data?.workers || []; + return rows.filter((item) => { + const online = (item.status || "").toUpperCase() === "ONLINE"; + if (statusFilter === "online" && !online) { + return false; + } + if (statusFilter === "offline" && online) { + return false; + } + if (!containsText(item.worker, workerKeyword.trim())) { + return false; + } + if (queueKeyword.trim()) { + const text = item.queue_names.join(", "); + if (!containsText(text, queueKeyword.trim())) { + return false; + } + } + return true; + }); + }, [workersOverviewQuery.data?.workers, statusFilter, workerKeyword, queueKeyword]); + + const allTaskRows = useMemo(() => { + const workerTaskOverviews = allTasksQuery.data || []; + const rows: TaskTableRow[] = []; + for (const overview of workerTaskOverviews) { + rows.push(...toTaskRows(overview.worker, "ACTIVE", overview.active_tasks)); + rows.push(...toTaskRows(overview.worker, "RESERVED", overview.reserved_tasks)); + rows.push(...toTaskRows(overview.worker, "SCHEDULED", overview.scheduled_tasks)); + rows.push(...toTaskRows(overview.worker, "RECENT", overview.recent_tasks)); + } + return rows; + }, [allTasksQuery.data]); + + const filteredTaskRows = useMemo(() => { + const workerSet = new 
Set(filteredWorkers.map((item) => item.worker)); + const keyword = taskKeyword.trim(); + return allTaskRows.filter((item) => { + if (!workerSet.has(item.worker)) { + return false; + } + if (!keyword) { + return true; + } + const haystack = [item.task_id, item.name, item.queue_name || "", item.worker, item.args_text || "", item.kwargs_text || ""] + .join(" ") + .toLowerCase(); + return haystack.includes(keyword.toLowerCase()); + }); + }, [allTaskRows, filteredWorkers, taskKeyword]); + + const stateBuckets = useMemo(() => { + const counts = new Map(); + for (const row of filteredTaskRows) { + const state = (row.state || "UNKNOWN").toUpperCase(); + counts.set(state, (counts.get(state) || 0) + 1); + } + return Array.from(counts.entries()) + .sort((left, right) => right[1] - left[1] || left[0].localeCompare(right[0])) + .map(([state, count]) => ({ state, count })); + }, [filteredTaskRows]); + + const queuePendingTotal = filteredWorkers.reduce( + (sum, item) => sum + item.active_count + item.reserved_count + item.scheduled_count, + 0, + ); + + if (initializing || (workersOverviewQuery.isLoading && !workersOverviewQuery.data && canRead && Boolean(user))) { return (
@@ -396,85 +472,111 @@ export default function AdminTaskMonitorPage() { ); } - const overview = overviewQuery.data; + const workersOverview = workersOverviewQuery.data; return ( - - 任务列表上限 - setTaskLimit(normalizePositiveInt(value, DEFAULT_TASK_LIMIT, 1, 500))} - /> - - - 历史任务扫描上限 - setHistoryLimit(normalizePositiveInt(value, DEFAULT_HISTORY_LIMIT, 0, 500))} - /> - + setWorkerKeyword(event.target.value)} + style={{ width: 220 }} + /> + setQueueKeyword(event.target.value)} + style={{ width: 220 }} + /> + setTaskKeyword(event.target.value)} + style={{ width: 240 }} + /> + setWorkerKeyword(event.target.value)} + style={{ width: 220 }} + /> + setQueueKeyword(event.target.value)} + style={{ width: 220 }} + /> +