refactor: remove scheduler service and unify celery dispatch
This commit is contained in:
@@ -35,15 +35,11 @@ CELERY_TIMEZONE=Asia/Shanghai
|
||||
CELERY_LOG_LEVEL=INFO
|
||||
CELERY_WORKER_CONCURRENCY=2
|
||||
SCHEDULER_EXPIRE_INTERVAL_SECONDS=60
|
||||
SCHEDULER_API_BASE_URL=http://scheduler:19100
|
||||
SCHEDULER_API_TOKEN=
|
||||
SCHEDULER_DEFAULT_QUEUE=default
|
||||
FLOWER_API_BASE_URL=http://flower:5555
|
||||
FLOWER_API_TIMEOUT_SECONDS=10
|
||||
FLOWER_BASIC_AUTH=admin:admin
|
||||
WORKER_REGISTRY_TTL_SECONDS=90
|
||||
CELERY_WORKER_QUEUES=default,celery
|
||||
SCHEDULER_PORT=19100
|
||||
FLOWER_PORT=5555
|
||||
WINE_BINARY_PATH=wine
|
||||
WINE_ALLOWED_ROOT=./data/wine
|
||||
|
||||
@@ -242,9 +242,6 @@ jobs:
|
||||
CELERY_RESULT_BACKEND: ${CELERY_RESULT_BACKEND:-redis://redis:6379/1}
|
||||
CELERY_TIMEZONE: ${CELERY_TIMEZONE:-Asia/Shanghai}
|
||||
SCHEDULER_EXPIRE_INTERVAL_SECONDS: ${SCHEDULER_EXPIRE_INTERVAL_SECONDS:-60}
|
||||
SCHEDULER_API_BASE_URL: ${SCHEDULER_API_BASE_URL:-http://scheduler:19100}
|
||||
SCHEDULER_API_TOKEN: ${SCHEDULER_API_TOKEN:-}
|
||||
SCHEDULER_DEFAULT_QUEUE: ${SCHEDULER_DEFAULT_QUEUE:-default}
|
||||
WINE_BINARY_PATH: ${WINE_BINARY_PATH:-wine}
|
||||
WINE_ALLOWED_ROOT: ${WINE_ALLOWED_ROOT:-./data/wine}
|
||||
WINE_DEFAULT_TIMEOUT_SECONDS: ${WINE_DEFAULT_TIMEOUT_SECONDS:-300}
|
||||
@@ -293,9 +290,6 @@ jobs:
|
||||
CELERY_RESULT_BACKEND: ${CELERY_RESULT_BACKEND:-redis://redis:6379/1}
|
||||
CELERY_TIMEZONE: ${CELERY_TIMEZONE:-Asia/Shanghai}
|
||||
SCHEDULER_EXPIRE_INTERVAL_SECONDS: ${SCHEDULER_EXPIRE_INTERVAL_SECONDS:-60}
|
||||
SCHEDULER_API_BASE_URL: ${SCHEDULER_API_BASE_URL:-http://scheduler:19100}
|
||||
SCHEDULER_API_TOKEN: ${SCHEDULER_API_TOKEN:-}
|
||||
SCHEDULER_DEFAULT_QUEUE: ${SCHEDULER_DEFAULT_QUEUE:-default}
|
||||
FLOWER_API_BASE_URL: ${FLOWER_API_BASE_URL:-http://flower:5555}
|
||||
FLOWER_API_TIMEOUT_SECONDS: ${FLOWER_API_TIMEOUT_SECONDS:-10}
|
||||
FLOWER_BASIC_AUTH: ${FLOWER_BASIC_AUTH:-admin:admin}
|
||||
@@ -332,37 +326,9 @@ jobs:
|
||||
CELERY_RESULT_BACKEND: ${CELERY_RESULT_BACKEND:-redis://redis:6379/1}
|
||||
CELERY_TIMEZONE: ${CELERY_TIMEZONE:-Asia/Shanghai}
|
||||
SCHEDULER_EXPIRE_INTERVAL_SECONDS: ${SCHEDULER_EXPIRE_INTERVAL_SECONDS:-60}
|
||||
SCHEDULER_API_BASE_URL: ${SCHEDULER_API_BASE_URL:-http://scheduler:19100}
|
||||
SCHEDULER_API_TOKEN: ${SCHEDULER_API_TOKEN:-}
|
||||
SCHEDULER_DEFAULT_QUEUE: ${SCHEDULER_DEFAULT_QUEUE:-default}
|
||||
WORKER_REGISTRY_TTL_SECONDS: ${WORKER_REGISTRY_TTL_SECONDS:-90}
|
||||
restart: unless-stopped
|
||||
|
||||
scheduler:
|
||||
image: ${API_IMAGE}
|
||||
container_name: fquiz-scheduler
|
||||
command:
|
||||
- uvicorn
|
||||
- app.scheduler_main:app
|
||||
- --host
|
||||
- 0.0.0.0
|
||||
- --port
|
||||
- "19100"
|
||||
depends_on:
|
||||
redis:
|
||||
condition: service_healthy
|
||||
api:
|
||||
condition: service_healthy
|
||||
environment:
|
||||
CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis:6379/0}
|
||||
CELERY_RESULT_BACKEND: ${CELERY_RESULT_BACKEND:-redis://redis:6379/1}
|
||||
CELERY_TIMEZONE: ${CELERY_TIMEZONE:-Asia/Shanghai}
|
||||
SCHEDULER_API_TOKEN: ${SCHEDULER_API_TOKEN:-}
|
||||
SCHEDULER_DEFAULT_QUEUE: ${SCHEDULER_DEFAULT_QUEUE:-default}
|
||||
ports:
|
||||
- "${SCHEDULER_PORT:-19100}:19100"
|
||||
restart: unless-stopped
|
||||
|
||||
flower:
|
||||
image: ${API_IMAGE}
|
||||
container_name: fquiz-flower
|
||||
@@ -449,10 +415,6 @@ jobs:
|
||||
CELERY_WORKER_CONCURRENCY=2
|
||||
CELERY_WORKER_QUEUES=default,celery
|
||||
SCHEDULER_EXPIRE_INTERVAL_SECONDS=60
|
||||
SCHEDULER_API_BASE_URL=http://scheduler:19100
|
||||
SCHEDULER_API_TOKEN=
|
||||
SCHEDULER_DEFAULT_QUEUE=default
|
||||
SCHEDULER_PORT=19100
|
||||
FLOWER_API_BASE_URL=http://flower:5555
|
||||
FLOWER_API_TIMEOUT_SECONDS=10
|
||||
FLOWER_BASIC_AUTH=admin:admin
|
||||
@@ -539,7 +501,6 @@ jobs:
|
||||
docker logs --tail 200 fquiz-redis || true
|
||||
docker logs --tail 200 fquiz-celery-worker || true
|
||||
docker logs --tail 200 fquiz-celery-beat || true
|
||||
docker logs --tail 200 fquiz-scheduler || true
|
||||
docker logs --tail 200 fquiz-flower || true
|
||||
exit 1
|
||||
fi
|
||||
|
||||
@@ -135,13 +135,11 @@
|
||||
- `GET /api/v1/admin/workers/tasks?worker=...&recent_limit=...`
|
||||
- 两页统一复用权限码:`celery.read` / `celery.manage`。
|
||||
|
||||
## 调度与监控口径(2026-05-01)
|
||||
## 调度与监控口径(2026-05-02)
|
||||
|
||||
- 调度能力拆分为独立 `scheduler` 服务:
|
||||
- scheduler 入口:`/api/v1/scheduler/*`
|
||||
- 任务入队:`POST /api/v1/scheduler/v1/tasks/enqueue`
|
||||
- 任务撤销:`POST /api/v1/scheduler/v1/tasks/revoke`
|
||||
- 通过 `x-scheduler-token`(`SCHEDULER_API_TOKEN`)做可选鉴权。
|
||||
- 调度链路已统一为 API 直连 Celery(不再保留独立 `scheduler` 服务与 `SCHEDULER_API_*` 配置):
|
||||
- Web 任务调用 API 业务接口后,由后端服务层直接 `.delay()` 入队。
|
||||
- 任务执行与定时触发继续由 `celery-worker` / `celery-beat` 负责。
|
||||
- 监控能力统一走 Flower 代理:
|
||||
- 后端代理入口:
|
||||
- `GET /api/v1/admin/flower/workers`
|
||||
|
||||
@@ -9,7 +9,6 @@ from .v1.flower_monitor import router as flower_monitor_router
|
||||
from .v1.lightning import router as lightning_router
|
||||
from .v1.lines import router as lines_router
|
||||
from .v1.question_bank import router as question_bank_router
|
||||
from .v1.scheduler import router as scheduler_router
|
||||
from .v1.system_params import router as system_params_router
|
||||
from .v1.task_monitor import router as task_monitor_router
|
||||
from .v1.users import router as users_router
|
||||
@@ -29,7 +28,6 @@ v1_router.include_router(flower_monitor_router)
|
||||
v1_router.include_router(lightning_router)
|
||||
v1_router.include_router(lines_router)
|
||||
v1_router.include_router(question_bank_router)
|
||||
v1_router.include_router(scheduler_router)
|
||||
v1_router.include_router(wine_router)
|
||||
v1_router.include_router(ws_router)
|
||||
|
||||
|
||||
@@ -111,7 +111,6 @@ def get_elevation_job_detail(
|
||||
@router.post("/jobs/apply-line", response_model=ElevationApplyJobCreateResponse)
|
||||
def create_elevation_apply_line_job(
|
||||
payload: ElevationApplyJobCreateRequest,
|
||||
dispatch_mode: str = Query(default="celery_direct", alias="dispatchMode"),
|
||||
current_user: CurrentUser = Depends(require_permission("elevation.manage")),
|
||||
db: Session = Depends(get_db),
|
||||
) -> ElevationApplyJobCreateResponse:
|
||||
@@ -119,5 +118,4 @@ def create_elevation_apply_line_job(
|
||||
db,
|
||||
payload,
|
||||
actor=current_user.user,
|
||||
dispatch_mode=dispatch_mode,
|
||||
)
|
||||
|
||||
@@ -1,51 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from fastapi import APIRouter, Depends, Header, HTTPException, status
|
||||
|
||||
from ...core.config import get_settings
|
||||
from ...schemas.scheduler import (
|
||||
SchedulerEnqueueTaskRequest,
|
||||
SchedulerEnqueueTaskResponse,
|
||||
SchedulerRevokeTaskRequest,
|
||||
SchedulerRevokeTaskResponse,
|
||||
)
|
||||
from ...services.scheduler_service import enqueue_task, revoke_task
|
||||
|
||||
router = APIRouter(prefix="/scheduler", tags=["scheduler"])
|
||||
|
||||
|
||||
def _require_scheduler_token(
|
||||
x_scheduler_token: str | None = Header(default=None, alias="x-scheduler-token"),
|
||||
) -> None:
|
||||
expected = get_settings().resolved_scheduler_api_token
|
||||
if not expected:
|
||||
return
|
||||
provided = (x_scheduler_token or "").strip()
|
||||
if provided == expected:
|
||||
return
|
||||
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized scheduler request")
|
||||
|
||||
|
||||
@router.get("/healthz")
|
||||
def scheduler_healthz() -> dict[str, object]:
|
||||
return {
|
||||
"success": True,
|
||||
"status": "ok",
|
||||
"service": "fquiz-scheduler",
|
||||
}
|
||||
|
||||
|
||||
@router.post("/v1/tasks/enqueue", response_model=SchedulerEnqueueTaskResponse)
|
||||
def enqueue_scheduler_task(
|
||||
payload: SchedulerEnqueueTaskRequest,
|
||||
_: None = Depends(_require_scheduler_token),
|
||||
) -> SchedulerEnqueueTaskResponse:
|
||||
return enqueue_task(payload)
|
||||
|
||||
|
||||
@router.post("/v1/tasks/revoke", response_model=SchedulerRevokeTaskResponse)
|
||||
def revoke_scheduler_task(
|
||||
payload: SchedulerRevokeTaskRequest,
|
||||
_: None = Depends(_require_scheduler_token),
|
||||
) -> SchedulerRevokeTaskResponse:
|
||||
return revoke_task(payload)
|
||||
@@ -52,9 +52,6 @@ class Settings(BaseSettings):
|
||||
celery_result_backend: str | None = None
|
||||
celery_timezone: str = "Asia/Shanghai"
|
||||
scheduler_expire_interval_seconds: int = 60
|
||||
scheduler_api_token: str = ""
|
||||
scheduler_default_queue: str = "default"
|
||||
scheduler_api_base_url: str = "http://scheduler:19100"
|
||||
flower_api_base_url: str = "http://flower:5555"
|
||||
flower_api_timeout_seconds: int = 10
|
||||
flower_basic_auth: str = ""
|
||||
@@ -201,22 +198,6 @@ class Settings(BaseSettings):
|
||||
def resolved_celery_result_backend(self) -> str:
|
||||
return (self.celery_result_backend or "redis://redis:6379/1").strip()
|
||||
|
||||
@property
|
||||
def resolved_scheduler_api_token(self) -> str:
|
||||
return (self.scheduler_api_token or "").strip()
|
||||
|
||||
@property
|
||||
def resolved_scheduler_default_queue(self) -> str:
|
||||
queue = (self.scheduler_default_queue or "").strip()
|
||||
return queue or "default"
|
||||
|
||||
@property
|
||||
def resolved_scheduler_api_base_url(self) -> str:
|
||||
value = (self.scheduler_api_base_url or "").strip().rstrip("/")
|
||||
if not value:
|
||||
return "http://scheduler:19100"
|
||||
return value
|
||||
|
||||
@property
|
||||
def resolved_flower_api_base_url(self) -> str:
|
||||
value = (self.flower_api_base_url or "").strip().rstrip("/")
|
||||
|
||||
@@ -1,10 +0,0 @@
|
||||
from fastapi import FastAPI
|
||||
|
||||
from .api.v1.scheduler import router as scheduler_router
|
||||
|
||||
app = FastAPI(
|
||||
title="fquiz-scheduler",
|
||||
version="0.1.0",
|
||||
)
|
||||
|
||||
app.include_router(scheduler_router, prefix="/api/v1")
|
||||
@@ -1,41 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class SchedulerEnqueueTaskRequest(BaseModel):
|
||||
task_name: str = Field(min_length=1, max_length=255, alias="taskName")
|
||||
queue_name: str | None = Field(default=None, max_length=255, alias="queueName")
|
||||
task_id: str | None = Field(default=None, max_length=255, alias="taskId")
|
||||
args: list[Any] = Field(default_factory=list)
|
||||
kwargs: dict[str, Any] = Field(default_factory=dict)
|
||||
|
||||
model_config = {"populate_by_name": True}
|
||||
|
||||
|
||||
class SchedulerEnqueueTaskResponse(BaseModel):
|
||||
queued: bool = True
|
||||
task_id: str = Field(alias="taskId")
|
||||
queue_name: str = Field(alias="queueName")
|
||||
task_name: str = Field(alias="taskName")
|
||||
|
||||
model_config = {"populate_by_name": True}
|
||||
|
||||
|
||||
class SchedulerRevokeTaskRequest(BaseModel):
|
||||
task_id: str = Field(min_length=1, max_length=255, alias="taskId")
|
||||
terminate: bool = True
|
||||
signal: str = Field(default="SIGTERM", max_length=32)
|
||||
|
||||
model_config = {"populate_by_name": True}
|
||||
|
||||
|
||||
class SchedulerRevokeTaskResponse(BaseModel):
|
||||
revoked: bool = True
|
||||
task_id: str = Field(alias="taskId")
|
||||
terminate: bool
|
||||
signal: str
|
||||
|
||||
model_config = {"populate_by_name": True}
|
||||
@@ -335,7 +335,6 @@ def create_apply_job(
|
||||
payload: ElevationApplyJobCreateRequest,
|
||||
*,
|
||||
actor: User,
|
||||
dispatch_mode: str = "celery_direct",
|
||||
) -> ElevationApplyJobCreateResponse:
|
||||
line = db.execute(select(Line).where(Line.id == payload.line_id)).scalar_one_or_none()
|
||||
if not line:
|
||||
@@ -380,10 +379,7 @@ def create_apply_job(
|
||||
if not saved:
|
||||
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="创建任务失败")
|
||||
|
||||
task = _dispatch_elevation_apply_task(
|
||||
job_id=saved.id,
|
||||
dispatch_mode=dispatch_mode,
|
||||
)
|
||||
task = _dispatch_elevation_apply_task(job_id=saved.id)
|
||||
saved.task_id = task.id
|
||||
saved.update_user = actor.id
|
||||
saved.update_date = utcnow()
|
||||
@@ -400,60 +396,12 @@ def create_apply_job(
|
||||
return ElevationApplyJobCreateResponse(job=serialize_job(latest), queued=True)
|
||||
|
||||
|
||||
def _dispatch_elevation_apply_task(*, job_id: str, dispatch_mode: str):
|
||||
normalized_mode = (dispatch_mode or "").strip().lower()
|
||||
if normalized_mode == "scheduler_api":
|
||||
return _enqueue_via_scheduler_api(job_id)
|
||||
|
||||
def _dispatch_elevation_apply_task(*, job_id: str):
|
||||
from ..tasks.elevation_tasks import apply_elevation_for_line_job
|
||||
|
||||
return apply_elevation_for_line_job.delay(job_id)
|
||||
|
||||
|
||||
def _enqueue_via_scheduler_api(job_id: str):
|
||||
from ..core.config import get_settings
|
||||
|
||||
import httpx
|
||||
|
||||
settings = get_settings()
|
||||
scheduler_base_url = settings.resolved_scheduler_api_base_url
|
||||
path = "/api/v1/v1/tasks/enqueue"
|
||||
payload = {
|
||||
"taskName": "app.tasks.elevation_tasks.apply_elevation_for_line_job",
|
||||
"taskId": job_id,
|
||||
"queueName": settings.resolved_scheduler_default_queue,
|
||||
"args": [job_id],
|
||||
"kwargs": {},
|
||||
}
|
||||
headers = {"Content-Type": "application/json"}
|
||||
token = settings.resolved_scheduler_api_token
|
||||
if token:
|
||||
headers["x-scheduler-token"] = token
|
||||
try:
|
||||
with httpx.Client(timeout=15) as client:
|
||||
response = client.post(f"{scheduler_base_url}{path}", json=payload, headers=headers)
|
||||
if response.status_code >= 400:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_502_BAD_GATEWAY,
|
||||
detail=f"scheduler enqueue failed: {response.status_code} {response.text}",
|
||||
)
|
||||
data = response.json()
|
||||
task_id = str(data.get("taskId") or data.get("task_id") or job_id).strip() or job_id
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as exc:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_502_BAD_GATEWAY,
|
||||
detail=f"scheduler request failed: {exc}",
|
||||
) from exc
|
||||
|
||||
class _TaskRef:
|
||||
def __init__(self, value: str) -> None:
|
||||
self.id = value
|
||||
|
||||
return _TaskRef(task_id)
|
||||
|
||||
|
||||
def execute_apply_job(job_id: str) -> None:
|
||||
db = SessionLocal()
|
||||
try:
|
||||
|
||||
@@ -1,61 +0,0 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from fastapi import HTTPException, status
|
||||
|
||||
from ..core.celery_app import celery_app
|
||||
from ..core.config import get_settings
|
||||
from ..schemas.scheduler import (
|
||||
SchedulerEnqueueTaskRequest,
|
||||
SchedulerEnqueueTaskResponse,
|
||||
SchedulerRevokeTaskRequest,
|
||||
SchedulerRevokeTaskResponse,
|
||||
)
|
||||
|
||||
|
||||
def enqueue_task(payload: SchedulerEnqueueTaskRequest) -> SchedulerEnqueueTaskResponse:
|
||||
settings = get_settings()
|
||||
task_name = payload.task_name.strip()
|
||||
if not task_name:
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="taskName is required")
|
||||
|
||||
queue_name = (payload.queue_name or "").strip() or settings.resolved_scheduler_default_queue
|
||||
task_id = (payload.task_id or "").strip() or None
|
||||
|
||||
try:
|
||||
result = celery_app.send_task(
|
||||
task_name,
|
||||
args=list(payload.args),
|
||||
kwargs=dict(payload.kwargs),
|
||||
task_id=task_id,
|
||||
queue=queue_name,
|
||||
)
|
||||
except Exception as exc:
|
||||
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"scheduler enqueue failed: {exc}") from exc
|
||||
|
||||
return SchedulerEnqueueTaskResponse(
|
||||
queued=True,
|
||||
taskId=result.id,
|
||||
queueName=queue_name,
|
||||
taskName=task_name,
|
||||
)
|
||||
|
||||
|
||||
def revoke_task(payload: SchedulerRevokeTaskRequest) -> SchedulerRevokeTaskResponse:
|
||||
task_id = payload.task_id.strip()
|
||||
if not task_id:
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="taskId is required")
|
||||
|
||||
signal = (payload.signal or "").strip() or "SIGTERM"
|
||||
terminate = bool(payload.terminate)
|
||||
|
||||
try:
|
||||
celery_app.control.revoke(task_id, terminate=terminate, signal=signal)
|
||||
except Exception as exc:
|
||||
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"scheduler revoke failed: {exc}") from exc
|
||||
|
||||
return SchedulerRevokeTaskResponse(
|
||||
revoked=True,
|
||||
taskId=task_id,
|
||||
terminate=terminate,
|
||||
signal=signal,
|
||||
)
|
||||
@@ -125,9 +125,6 @@ services:
|
||||
CELERY_RESULT_BACKEND: ${CELERY_RESULT_BACKEND:-redis://redis:6379/1}
|
||||
CELERY_TIMEZONE: ${CELERY_TIMEZONE:-Asia/Shanghai}
|
||||
SCHEDULER_EXPIRE_INTERVAL_SECONDS: ${SCHEDULER_EXPIRE_INTERVAL_SECONDS:-60}
|
||||
SCHEDULER_API_BASE_URL: ${SCHEDULER_API_BASE_URL:-http://scheduler:19100}
|
||||
SCHEDULER_API_TOKEN: ${SCHEDULER_API_TOKEN:-}
|
||||
SCHEDULER_DEFAULT_QUEUE: ${SCHEDULER_DEFAULT_QUEUE:-default}
|
||||
WINE_BINARY_PATH: ${WINE_BINARY_PATH:-wine}
|
||||
WINE_ALLOWED_ROOT: ${WINE_ALLOWED_ROOT:-./data/wine}
|
||||
WINE_DEFAULT_TIMEOUT_SECONDS: ${WINE_DEFAULT_TIMEOUT_SECONDS:-300}
|
||||
@@ -189,9 +186,6 @@ services:
|
||||
CELERY_RESULT_BACKEND: ${CELERY_RESULT_BACKEND:-redis://redis:6379/1}
|
||||
CELERY_TIMEZONE: ${CELERY_TIMEZONE:-Asia/Shanghai}
|
||||
SCHEDULER_EXPIRE_INTERVAL_SECONDS: ${SCHEDULER_EXPIRE_INTERVAL_SECONDS:-60}
|
||||
SCHEDULER_API_BASE_URL: ${SCHEDULER_API_BASE_URL:-http://scheduler:19100}
|
||||
SCHEDULER_API_TOKEN: ${SCHEDULER_API_TOKEN:-}
|
||||
SCHEDULER_DEFAULT_QUEUE: ${SCHEDULER_DEFAULT_QUEUE:-default}
|
||||
FLOWER_API_BASE_URL: ${FLOWER_API_BASE_URL:-http://flower:5555}
|
||||
FLOWER_API_TIMEOUT_SECONDS: ${FLOWER_API_TIMEOUT_SECONDS:-10}
|
||||
FLOWER_BASIC_AUTH: ${FLOWER_BASIC_AUTH:-}
|
||||
@@ -235,44 +229,9 @@ services:
|
||||
CELERY_RESULT_BACKEND: ${CELERY_RESULT_BACKEND:-redis://redis:6379/1}
|
||||
CELERY_TIMEZONE: ${CELERY_TIMEZONE:-Asia/Shanghai}
|
||||
SCHEDULER_EXPIRE_INTERVAL_SECONDS: ${SCHEDULER_EXPIRE_INTERVAL_SECONDS:-60}
|
||||
SCHEDULER_API_BASE_URL: ${SCHEDULER_API_BASE_URL:-http://scheduler:19100}
|
||||
SCHEDULER_API_TOKEN: ${SCHEDULER_API_TOKEN:-}
|
||||
SCHEDULER_DEFAULT_QUEUE: ${SCHEDULER_DEFAULT_QUEUE:-default}
|
||||
WORKER_REGISTRY_TTL_SECONDS: ${WORKER_REGISTRY_TTL_SECONDS:-90}
|
||||
restart: unless-stopped
|
||||
|
||||
scheduler:
|
||||
build:
|
||||
context: ./api
|
||||
dockerfile: Dockerfile
|
||||
args:
|
||||
PYTHON_BASE_IMAGE: ${PYTHON_BASE_IMAGE:-docker.m.daocloud.io/library/python:3.11-slim}
|
||||
PIP_INDEX_URL: ${PIP_INDEX_URL:-https://pypi.org/simple}
|
||||
PIP_DEFAULT_TIMEOUT: ${PIP_DEFAULT_TIMEOUT:-300}
|
||||
PIP_RETRIES: ${PIP_RETRIES:-20}
|
||||
container_name: fquiz-scheduler
|
||||
command:
|
||||
- uvicorn
|
||||
- app.scheduler_main:app
|
||||
- --host
|
||||
- 0.0.0.0
|
||||
- --port
|
||||
- "19100"
|
||||
depends_on:
|
||||
redis:
|
||||
condition: service_healthy
|
||||
api:
|
||||
condition: service_healthy
|
||||
environment:
|
||||
CELERY_BROKER_URL: ${CELERY_BROKER_URL:-redis://redis:6379/0}
|
||||
CELERY_RESULT_BACKEND: ${CELERY_RESULT_BACKEND:-redis://redis:6379/1}
|
||||
CELERY_TIMEZONE: ${CELERY_TIMEZONE:-Asia/Shanghai}
|
||||
SCHEDULER_API_TOKEN: ${SCHEDULER_API_TOKEN:-}
|
||||
SCHEDULER_DEFAULT_QUEUE: ${SCHEDULER_DEFAULT_QUEUE:-default}
|
||||
ports:
|
||||
- "${SCHEDULER_PORT:-19100}:19100"
|
||||
restart: unless-stopped
|
||||
|
||||
flower:
|
||||
build:
|
||||
context: ./api
|
||||
|
||||
@@ -51,3 +51,53 @@
|
||||
|
||||
- 风险与影响:
|
||||
- 若服务器上已有其他容器占用 `3000`,本次部署会失败(可预期失败),需先释放端口或手动调整冲突容器。
|
||||
|
||||
## Work Log - 移除 scheduler 服务并统一任务调度为 API 直连 Celery(2026-05-02)
|
||||
|
||||
- 背景:
|
||||
- 用户明确要求“去掉 scheduler”。
|
||||
- 当前仓库默认调度本就为 `celery_direct`,`scheduler` 仅作为可选分支与独立容器存在。
|
||||
|
||||
- 本次改动(最小闭环):
|
||||
- 后端任务派发收敛为直连 Celery:
|
||||
- `api/app/services/elevation_service.py`
|
||||
- 删除 `dispatch_mode` 分支与 `_enqueue_via_scheduler_api` 转发实现。
|
||||
- 保留单一路径:`apply_elevation_for_line_job.delay(job_id)`。
|
||||
- `api/app/api/v1/elevation.py`
|
||||
- 删除 `dispatchMode` 查询参数透传。
|
||||
- 后端路由与配置清理:
|
||||
- `api/app/api/router.py`
|
||||
- 移除 `scheduler` 路由注册。
|
||||
- `api/app/core/config.py`
|
||||
- 删除 `scheduler_api_token` / `scheduler_default_queue` / `scheduler_api_base_url` 及其 `resolved_*` 属性。
|
||||
- 保留 `scheduler_expire_interval_seconds` 作为 Celery Beat 定时任务间隔配置。
|
||||
- 删除 scheduler 相关源文件:
|
||||
- `api/app/api/v1/scheduler.py`
|
||||
- `api/app/services/scheduler_service.py`
|
||||
- `api/app/schemas/scheduler.py`
|
||||
- `api/app/scheduler_main.py`
|
||||
- 运行与部署配置同步:
|
||||
- `docker-compose.yml`
|
||||
- 删除 `scheduler` 服务。
|
||||
- 删除 `api` / `celery-worker` / `celery-beat` 的 `SCHEDULER_API_BASE_URL`、`SCHEDULER_API_TOKEN`、`SCHEDULER_DEFAULT_QUEUE`。
|
||||
- `.env.example`
|
||||
- 删除 `SCHEDULER_API_BASE_URL`、`SCHEDULER_API_TOKEN`、`SCHEDULER_DEFAULT_QUEUE`、`SCHEDULER_PORT`。
|
||||
- `.github/workflows/main.yml`
|
||||
- 删除生产 compose 模板中的 `scheduler` 服务块。
|
||||
- 删除部署模板与默认 `.env` 中的 `SCHEDULER_API_*` / `SCHEDULER_PORT`。
|
||||
- 删除故障诊断日志中的 `fquiz-scheduler`。
|
||||
- 长期记忆更新:
|
||||
- `MEMORY.md`
|
||||
- 将“调度与监控口径”更新为“API 直连 Celery,不再保留 scheduler 服务”。
|
||||
|
||||
- 验证:
|
||||
- 语法检查通过:
|
||||
- `python3 -m py_compile api/app/api/router.py api/app/api/v1/elevation.py api/app/services/elevation_service.py api/app/core/config.py`
|
||||
- 关键残留检查:
|
||||
- `rg -n "scheduler_main|services/scheduler_service|api/v1/scheduler|SCHEDULER_API_BASE_URL|SCHEDULER_API_TOKEN|SCHEDULER_DEFAULT_QUEUE|scheduler_api|fquiz-scheduler|SCHEDULER_PORT|resolved_scheduler_" .`
|
||||
- 仅命中文档历史记录,不再命中运行代码与部署配置。
|
||||
|
||||
- 风险与影响:
|
||||
- 影响范围:任务调度入口、部署编排与环境模板。
|
||||
- 行为变化:不再支持 `dispatchMode=scheduler_api` 与独立 scheduler HTTP 网关调用。
|
||||
- 保持不变:默认任务链路(API 直连 Celery)与 Flower 监控链路。
|
||||
|
||||
Reference in New Issue
Block a user