Files

110 lines
3.8 KiB
Python
Raw Permalink Normal View History

from __future__ import annotations
from datetime import datetime, timedelta, timezone
from sqlalchemy import create_engine, select
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.pool import StaticPool
from app.core.database import Base
from app.models.audit_log import AuditLog
from app.models.scheduled_task import ScheduledTask
from app.schemas.scheduled_task import ScheduledTaskCreateRequest, ScheduledTaskUpdateRequest
from app.services.scheduled_task_service import (
cleanup_audit_logs,
compute_next_run_at,
create_scheduled_task,
get_scheduled_task_by_key,
seed_default_scheduled_tasks,
update_scheduled_task,
)
def _build_sessionmaker(*tables):
    """Return a session factory bound to a fresh in-memory SQLite database.

    Args:
        *tables: SQLAlchemy ``Table`` objects to create on the new engine.
            They are forwarded to ``Base.metadata.create_all`` as a list, so
            only the tables named here are created.

    Returns:
        A ``sessionmaker`` bound to the new engine, configured with
        ``autocommit=False``, ``autoflush=False`` and
        ``expire_on_commit=False``.
    """
    # A URL with no path is an in-memory database. StaticPool together with
    # check_same_thread=False is the setup SQLAlchemy documents for sharing
    # that single in-memory connection across all sessions.
    in_memory_engine = create_engine(
        "sqlite+pysqlite://",
        poolclass=StaticPool,
        connect_args={"check_same_thread": False},
    )
    Base.metadata.create_all(bind=in_memory_engine, tables=[*tables])
    session_factory = sessionmaker(
        bind=in_memory_engine,
        autocommit=False,
        autoflush=False,
        expire_on_commit=False,
    )
    return session_factory
def test_seed_default_scheduled_tasks_creates_syslog_cleanup_task() -> None:
    """Seeding creates the ``syslog.cleanup.default`` task with its defaults.

    Checks the task type, the enabled flag, the 30-day retention and that a
    next run time has been populated.
    """
    session_factory = _build_sessionmaker(ScheduledTask.__table__)
    # The Session context manager closes the session on exit, pass or fail.
    with session_factory() as db:
        seed_default_scheduled_tasks(db)
        seeded_task = get_scheduled_task_by_key(db, "syslog.cleanup.default")

        assert seeded_task is not None
        assert seeded_task.task_type == "syslog_cleanup"
        assert seeded_task.enabled is True
        assert seeded_task.retain_days == 30
        assert seeded_task.next_run_at is not None
def test_create_and_update_scheduled_task_recomputes_next_run() -> None:
    """Updating a task's cron expression yields a different ``next_run_at``.

    A weekly cleanup task is created for Monday 02:00 (Asia/Shanghai), then
    updated to Monday 04:00 with a longer retention. The test checks that the
    retention is stored and that the next run time changed.
    """
    create_payload = ScheduledTaskCreateRequest(
        task_key="syslog.cleanup.weekly",
        name="每周日志清理",
        task_type="syslog_cleanup",
        description="weekly cleanup",
        cron_expression="0 2 * * 1",
        timezone="Asia/Shanghai",
        retain_days=14,
        enabled=True,
    )
    update_payload = ScheduledTaskUpdateRequest(
        cron_expression="0 4 * * 1",
        retain_days=21,
    )

    session_factory = _build_sessionmaker(ScheduledTask.__table__)
    # The Session context manager closes the session on exit, pass or fail.
    with session_factory() as db:
        task = create_scheduled_task(db, create_payload, actor_user_id=None)
        assert task is not None

        # Capture the schedule before the update so the two can be compared.
        next_run_before_update = task.next_run_at
        assert next_run_before_update is not None

        refreshed = update_scheduled_task(
            db,
            task.id,
            update_payload,
            actor_user_id=None,
        )
        assert refreshed is not None
        assert refreshed.retain_days == 21
        assert refreshed.next_run_at is not None
        assert refreshed.next_run_at != next_run_before_update
def test_cleanup_audit_logs_only_removes_expired_rows() -> None:
    """``cleanup_audit_logs`` deletes rows older than the retention window only.

    Two audit rows are inserted and backdated by 45 and 5 days. With
    ``retain_days=30`` exactly one row is reported deleted and only the
    "fresh" row remains.
    """
    session_factory = _build_sessionmaker(AuditLog.__table__)
    # The Session context manager closes the session on exit, pass or fail.
    with session_factory() as db:
        stale_row = AuditLog(user_id=None, action="expired", detail="old")
        recent_row = AuditLog(user_id=None, action="fresh", detail="new")
        db.add_all([stale_row, recent_row])
        # Flush before backdating: the test relies on created_at being
        # readable on both rows at this point.
        db.flush()

        stale_row.created_at -= timedelta(days=45)
        recent_row.created_at -= timedelta(days=5)
        db.commit()

        removed = cleanup_audit_logs(db, retain_days=30)
        db.commit()
        assert removed == 1

        surviving_actions_query = select(AuditLog.action).order_by(AuditLog.id.asc())
        surviving_actions = db.scalars(surviving_actions_query).all()
        assert surviving_actions == ["fresh"]
def test_compute_next_run_at_returns_future_utc_timestamp() -> None:
    """``compute_next_run_at`` returns a timezone-aware time in the future.

    Note: the assertions verify that the result carries timezone information
    and is later than the current UTC time. They do not check that the
    result's offset is actually UTC.
    """
    scheduled_for = compute_next_run_at("0 3 * * *", "Asia/Shanghai", from_time=None)
    assert scheduled_for.tzinfo is not None

    # Sample "now" after the call so the comparison is against the latest time.
    current_utc = datetime.now(timezone.utc)
    assert scheduled_for > current_utc