From cc988abdacb93e907769e3604d266a7abbe0d349 Mon Sep 17 00:00:00 2001 From: chengkai3 Date: Sun, 28 Jun 2026 15:30:51 +0800 Subject: [PATCH] =?UTF-8?q?feat:[FL-212][=E4=BB=BB=E5=8A=A1=E7=9B=91?= =?UTF-8?q?=E6=8E=A7=E9=A1=B5=E9=9D=A2=E6=94=AF=E6=8C=81=E6=9F=A5=E7=9C=8B?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E6=97=A5=E5=BF=97]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 后端添加任务日志存储和查询API - 新增 /api/v1/admin/task-logs 端点支持上传、获取和列出任务日志 - 日志存储到MinIO,路径格式: logs/YYYY/MM/DD/{task_id}.log - 新增 task_log_service 处理MinIO存储交互 - 新增 task_log schema 定义API请求响应格式 - 前端任务监控页面添加查看日志功能 - 在任务表格和卡片视图中添加"查看日志"按钮 - 点击按钮打开模态框显示任务执行日志 - 支持桌面端表格和移动端卡片两种视图 - 新增单元测试验证日志路径生成和错误处理 Co-Authored-By: Claude Sonnet 4.6 Co-authored-by: multica-agent --- api/app/api/router.py | 2 + api/app/api/v1/task_logs.py | 76 +++++++++ api/app/schemas/task_log.py | 23 +++ api/app/services/task_log_service.py | 205 ++++++++++++++++++++++++ api/tests/test_task_log_service.py | 64 ++++++++ web/src/app/admin/task-monitor/page.tsx | 86 +++++++++- 6 files changed, 455 insertions(+), 1 deletion(-) create mode 100644 api/app/api/v1/task_logs.py create mode 100644 api/app/schemas/task_log.py create mode 100644 api/app/services/task_log_service.py create mode 100644 api/tests/test_task_log_service.py diff --git a/api/app/api/router.py b/api/app/api/router.py index 8a5925e..0df93d3 100644 --- a/api/app/api/router.py +++ b/api/app/api/router.py @@ -16,6 +16,7 @@ from .v1.lines import router as lines_router from .v1.scheduled_tasks import router as scheduled_tasks_router from .v1.system_messages import router as system_messages_router from .v1.system_params import router as system_params_router +from .v1.task_logs import router as task_logs_router from .v1.task_monitor import router as task_monitor_router from .v1.tower_models import router as tower_models_router from .v1.tower_profiles import router as tower_profiles_router @@ -33,6 +34,7 @@ v1_router.include_router(atp_assets_router) v1_router.include_router(dimensions_router) v1_router.include_router(documents_router) v1_router.include_router(task_monitor_router) +v1_router.include_router(task_logs_router) v1_router.include_router(scheduled_tasks_router) v1_router.include_router(system_messages_router) v1_router.include_router(system_params_router) diff --git a/api/app/api/v1/task_logs.py b/api/app/api/v1/task_logs.py new file mode 100644 index 0000000..fcf6885 --- /dev/null +++ b/api/app/api/v1/task_logs.py @@ -0,0 +1,76 @@ +from __future__ import annotations + +from datetime import datetime + +from fastapi import APIRouter, Depends, HTTPException, Query + +from ...core.dependencies import CurrentUser, require_any_permission, require_enabled_menu_route +from ...schemas.task_log import TaskLogResponse, TaskLogUploadRequest, TaskLogUploadResponse +from ...services.task_log_service import ( + TaskLogNotFoundError, + TaskLogServiceError, + get_task_log, + list_task_logs, + upload_task_log, +) + +router = APIRouter(prefix="/admin/task-logs", tags=["admin-task-logs"], dependencies=[Depends(require_enabled_menu_route)]) + + +@router.post("/upload", response_model=TaskLogUploadResponse) +def upload_log( + request: TaskLogUploadRequest, + _: CurrentUser = Depends(require_any_permission("celery.manage")), +) -> TaskLogUploadResponse: + """ + Upload task execution log to MinIO storage. + Logs are stored in the path: logs/YYYY/MM/DD/{task_id}.log + """ + try: + log_path, uploaded_at = upload_task_log(request.task_id, request.log_content) + return TaskLogUploadResponse( + task_id=request.task_id, + log_path=log_path, + uploaded_at=uploaded_at, + ) + except TaskLogServiceError as exc: + raise HTTPException(status_code=500, detail=str(exc)) from exc + + +@router.get("/{task_id}", response_model=TaskLogResponse) +def get_log( + task_id: str, + log_date: datetime | None = Query(default=None, description="Date of the log (defaults to today)"), + _: CurrentUser = Depends(require_any_permission("celery.read", "celery.manage")), +) -> TaskLogResponse: + """ + Retrieve task execution log from MinIO storage. + If log_date is not provided, tries to find the log for today. + """ + try: + log_content, log_path = get_task_log(task_id, log_date) + return TaskLogResponse( + task_id=task_id, + log_content=log_content, + log_path=log_path, + exists=True, + ) + except TaskLogNotFoundError as exc: + raise HTTPException(status_code=404, detail=str(exc)) from exc + except TaskLogServiceError as exc: + raise HTTPException(status_code=500, detail=str(exc)) from exc + + +@router.get("/{task_id}/list", response_model=list[str]) +def list_logs( + task_id: str, + _: CurrentUser = Depends(require_any_permission("celery.read", "celery.manage")), +) -> list[str]: + """ + List all available logs for a task across all dates. + Returns list of log paths, sorted by most recent first. + """ + try: + return list_task_logs(task_id) + except TaskLogServiceError as exc: + raise HTTPException(status_code=500, detail=str(exc)) from exc diff --git a/api/app/schemas/task_log.py b/api/app/schemas/task_log.py new file mode 100644 index 0000000..f4e504a --- /dev/null +++ b/api/app/schemas/task_log.py @@ -0,0 +1,23 @@ +from __future__ import annotations + +from datetime import datetime + +from pydantic import BaseModel, Field + + +class TaskLogUploadRequest(BaseModel): + task_id: str = Field(min_length=1, max_length=255) + log_content: str + + +class TaskLogUploadResponse(BaseModel): + task_id: str + log_path: str + uploaded_at: datetime + + +class TaskLogResponse(BaseModel): + task_id: str + log_content: str + log_path: str + exists: bool = True diff --git a/api/app/services/task_log_service.py b/api/app/services/task_log_service.py new file mode 100644 index 0000000..51f55ac --- /dev/null +++ b/api/app/services/task_log_service.py @@ -0,0 +1,205 @@ +from __future__ import annotations + +from datetime import datetime, timezone + +from ..core.config import get_settings + + +class TaskLogServiceError(RuntimeError): + pass + + +class TaskLogNotFoundError(TaskLogServiceError): + pass + + +def _get_log_path(task_id: str, timestamp: datetime | None = None) -> str: + """ + Generate log path in MinIO following the pattern: logs/YYYY/MM/DD/{task_id}.log + """ + if timestamp is None: + timestamp = datetime.now(timezone.utc) + + year = timestamp.strftime("%Y") + month = timestamp.strftime("%m") + day = timestamp.strftime("%d") + + return f"logs/{year}/{month}/{day}/{task_id}.log" + + +def upload_task_log(task_id: str, log_content: str) -> tuple[str, datetime]: + """ + Upload task execution log to MinIO. + Returns: (log_path, uploaded_at) + """ + settings = get_settings() + + if not settings.minio_enabled: + raise TaskLogServiceError("MinIO storage is not enabled") + + try: + import boto3 + from botocore.config import Config + from botocore.exceptions import BotoCoreError, ClientError + except ImportError as exc: + raise TaskLogServiceError("boto3 library is required for MinIO storage") from exc + + uploaded_at = datetime.now(timezone.utc) + log_path = _get_log_path(task_id, uploaded_at) + + try: + client_config = Config( + connect_timeout=3.0, + read_timeout=10.0, + retries={"max_attempts": 2}, + ) + + session = boto3.session.Session( + aws_access_key_id=settings.minio_access_key, + aws_secret_access_key=settings.minio_secret_key, + ) + + client = session.client( + "s3", + endpoint_url=settings.minio_endpoint, + region_name=settings.minio_region, + config=client_config, + ) + + # Upload log content as text file + client.put_object( + Bucket=settings.minio_bucket, + Key=log_path, + Body=log_content.encode("utf-8"), + ContentType="text/plain; charset=utf-8", + ) + + return log_path, uploaded_at + + except (BotoCoreError, ClientError) as exc: + raise TaskLogServiceError(f"Failed to upload log to MinIO: {exc}") from exc + except Exception as exc: + raise TaskLogServiceError(f"Unexpected error during log upload: {exc}") from exc + + +def get_task_log(task_id: str, log_date: datetime | None = None) -> tuple[str, str]: + """ + Retrieve task execution log from MinIO. + If log_date is not provided, tries to find the log for today. + Returns: (log_content, log_path) + """ + settings = get_settings() + + if not settings.minio_enabled: + raise TaskLogServiceError("MinIO storage is not enabled") + + try: + import boto3 + from botocore.config import Config + from botocore.exceptions import BotoCoreError, ClientError + except ImportError as exc: + raise TaskLogServiceError("boto3 library is required for MinIO storage") from exc + + if log_date is None: + log_date = datetime.now(timezone.utc) + + log_path = _get_log_path(task_id, log_date) + + try: + client_config = Config( + connect_timeout=3.0, + read_timeout=10.0, + retries={"max_attempts": 2}, + ) + + session = boto3.session.Session( + aws_access_key_id=settings.minio_access_key, + aws_secret_access_key=settings.minio_secret_key, + ) + + client = session.client( + "s3", + endpoint_url=settings.minio_endpoint, + region_name=settings.minio_region, + config=client_config, + ) + + response = client.get_object( + Bucket=settings.minio_bucket, + Key=log_path, + ) + + body = response.get("Body") + if body is None: + raise TaskLogServiceError("Log file body is empty") + + log_content = body.read().decode("utf-8") + return log_content, log_path + + except ClientError as exc: + error_code = exc.response.get("Error", {}).get("Code", "") + if error_code in {"404", "NoSuchKey", "NotFound"}: + raise TaskLogNotFoundError(f"Log not found for task {task_id} at path {log_path}") from exc + raise TaskLogServiceError(f"Failed to retrieve log from MinIO: {exc}") from exc + except (BotoCoreError, UnicodeDecodeError) as exc: + raise TaskLogServiceError(f"Failed to read log content: {exc}") from exc + except Exception as exc: + raise TaskLogServiceError(f"Unexpected error during log retrieval: {exc}") from exc + + +def list_task_logs(task_id: str) -> list[str]: + """ + List all available logs for a task across all dates. + Returns: list of log paths + """ + settings = get_settings() + + if not settings.minio_enabled: + raise TaskLogServiceError("MinIO storage is not enabled") + + try: + import boto3 + from botocore.config import Config + from botocore.exceptions import BotoCoreError, ClientError + except ImportError as exc: + raise TaskLogServiceError("boto3 library is required for MinIO storage") from exc + + try: + client_config = Config( + connect_timeout=3.0, + read_timeout=10.0, + retries={"max_attempts": 2}, + ) + + session = boto3.session.Session( + aws_access_key_id=settings.minio_access_key, + aws_secret_access_key=settings.minio_secret_key, + ) + + client = session.client( + "s3", + endpoint_url=settings.minio_endpoint, + region_name=settings.minio_region, + config=client_config, + ) + + # Search for all logs matching the task_id pattern + prefix = "logs/" + suffix = f"/{task_id}.log" + + log_paths: list[str] = [] + paginator = client.get_paginator("list_objects_v2") + pages = paginator.paginate(Bucket=settings.minio_bucket, Prefix=prefix) + + for page in pages: + for item in page.get("Contents", []): + key = item.get("Key", "") + if key.endswith(suffix): + log_paths.append(key) + + return sorted(log_paths, reverse=True) # Most recent first + + except (BotoCoreError, ClientError) as exc: + raise TaskLogServiceError(f"Failed to list logs from MinIO: {exc}") from exc + except Exception as exc: + raise TaskLogServiceError(f"Unexpected error during log listing: {exc}") from exc diff --git a/api/tests/test_task_log_service.py b/api/tests/test_task_log_service.py new file mode 100644 index 0000000..d2c639a --- /dev/null +++ b/api/tests/test_task_log_service.py @@ -0,0 +1,64 @@ +""" +Basic unit tests for task log service. +""" +from __future__ import annotations + +import pytest +from datetime import datetime, timezone +from unittest.mock import MagicMock, patch + +from app.services.task_log_service import ( + TaskLogServiceError, + TaskLogNotFoundError, + _get_log_path, + upload_task_log, + get_task_log, +) + + +def test_get_log_path_with_timestamp(): + """Test log path generation with specific timestamp""" + timestamp = datetime(2026, 6, 28, 12, 30, 45, tzinfo=timezone.utc) + task_id = "test-task-123" + + result = _get_log_path(task_id, timestamp) + + assert result == "logs/2026/06/28/test-task-123.log" + + +def test_get_log_path_without_timestamp(): + """Test log path generation with current timestamp""" + task_id = "test-task-456" + + result = _get_log_path(task_id) + + # Should contain the task_id and follow the pattern + assert task_id in result + assert result.startswith("logs/") + assert result.endswith(f"/{task_id}.log") + + +@patch("app.services.task_log_service.get_settings") +def test_upload_task_log_minio_disabled(mock_get_settings): + """Test upload fails when MinIO is disabled""" + mock_settings = MagicMock() + mock_settings.minio_enabled = False + mock_get_settings.return_value = mock_settings + + with pytest.raises(TaskLogServiceError) as exc_info: + upload_task_log("task-123", "log content") + + assert "MinIO storage is not enabled" in str(exc_info.value) + + +@patch("app.services.task_log_service.get_settings") +def test_get_task_log_minio_disabled(mock_get_settings): + """Test retrieval fails when MinIO is disabled""" + mock_settings = MagicMock() + mock_settings.minio_enabled = False + mock_get_settings.return_value = mock_settings + + with pytest.raises(TaskLogServiceError) as exc_info: + get_task_log("task-123") + + assert "MinIO storage is not enabled" in str(exc_info.value) diff --git a/web/src/app/admin/task-monitor/page.tsx b/web/src/app/admin/task-monitor/page.tsx index 990415b..f2914b1 100644 --- a/web/src/app/admin/task-monitor/page.tsx +++ b/web/src/app/admin/task-monitor/page.tsx @@ -12,6 +12,7 @@ import { Empty, Form, Input, + Modal, Row, Select, Space, @@ -169,6 +170,10 @@ export default function AdminTaskMonitorPage() { const [isLoadingMore, setIsLoadingMore] = useState(false); const pageCardRef = useRef(null); const { current: paginationCurrent, pageSize: paginationPageSize } = pagination; + const [logModalVisible, setLogModalVisible] = useState(false); + const [logModalContent, setLogModalContent] = useState(""); + const [logModalTaskId, setLogModalTaskId] = useState(""); + const [logModalLoading, setLogModalLoading] = useState(false); const resetTaskListPagination = useCallback(() => { setPagination((prev) => ({ ...prev, current: 1 })); @@ -177,6 +182,34 @@ export default function AdminTaskMonitorPage() { setIsLoadingMore(false); }, []); + const handleViewLog = async (taskId: string) => { + setLogModalTaskId(taskId); + setLogModalVisible(true); + setLogModalLoading(true); + setLogModalContent(""); + + try { + const response = await fetchWithAuth(`/api/v1/admin/task-logs/${encodeURIComponent(taskId)}`); + if (!response.ok) { + const errorText = await readApiError(response); + throw new Error(errorText); + } + const data = await response.json(); + setLogModalContent(data.log_content || "日志内容为空"); + } catch (error) { + const errorMessage = error instanceof Error ? error.message : "获取日志失败"; + setLogModalContent(`错误:${errorMessage}`); + } finally { + setLogModalLoading(false); + } + }; + + const handleCloseLogModal = () => { + setLogModalVisible(false); + setLogModalTaskId(""); + setLogModalContent(""); + }; + const workersOverviewQuery = useQuery({ queryKey: ["flower-workers-overview"], enabled: Boolean(user) && canRead, @@ -318,8 +351,19 @@ export default function AdminTaskMonitorPage() { "-" ), }, + { + title: "操作", + key: "actions", + width: 100, + fixed: "right", + render: (_: unknown, record: TaskTableRow) => ( + + ), + }, ], - [], + [handleViewLog], ); const filteredWorkers = useMemo(() => { @@ -615,6 +659,11 @@ export default function AdminTaskMonitorPage() { )} +
+ +
); @@ -897,6 +946,41 @@ export default function AdminTaskMonitorPage() { ) : null} + + + 关闭 + , + ]} + width={800} + style={{ top: 20 }} + > + {logModalLoading ? ( +
+ +
+ ) : ( +
+            {logModalContent}
+          
+ )} +
); }