# Source: fquiz/api/tests/test_audit_log_coverage.py
# (Python; original file: 490 lines, 17 KiB)

from __future__ import annotations
import os
import unittest
from contextlib import ExitStack
from io import BytesIO
from tempfile import TemporaryDirectory
from types import SimpleNamespace
from unittest.mock import patch
# Fallback settings for the test run. They are set *before* the api.app imports
# below; setdefault() leaves any value already present in the environment as is.
# NOTE(review): presumably api.app reads these variables at import time, which
# would be why the statements sit in the middle of the import block — confirm.
os.environ.setdefault("DATABASE_URL", "sqlite+pysqlite:///:memory:")
os.environ.setdefault("MINIO_ENABLED", "false")
from fastapi import HTTPException
from sqlalchemy import create_engine, select, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from api.app import models # noqa: F401
from api.app.core.database import Base
from api.app.core.security import hash_password
from api.app.models.audit_log import AuditLog
from api.app.models.file_storage import FileStorageBackend, FileStorageMount
from api.app.models.user import User
from api.app.schemas.admin import MenuCreateRequest, MenuUpdateRequest, RoleCreateRequest, RoleUpdateRequest
from api.app.schemas.auth import LoginRequest
from api.app.schemas.file_storage import (
FileCreateDirectoryRequest,
FileDeleteRequest,
FileMoveRequest,
FileRenameRequest,
)
from api.app.schemas.system_param import SystemParamCreateRequest, SystemParamUpdateRequest
from api.app.schemas.user import UserCreateRequest, UserPasswordResetRequest, UserRoleUpdateRequest, UserUpdateRequest
from api.app.services import auth_service, file_service, legacy_admin_rbac_service, system_param_service, user_service
from api.app.services.legacy_authz_service import UserAuthorization
class AuditLogCoverageTest(unittest.TestCase):
    """Verify that service-layer operations leave the expected ``AuditLog`` rows.

    Each test drives a group of service functions (auth, users, system
    parameters, legacy roles/menus, file storage) against a private in-memory
    SQLite database and then asserts on the audit-log actions and details
    that were recorded.
    """

    def setUp(self) -> None:
        """Build an isolated database, session, temp directory and session patch.

        Every resource registers its own cleanup immediately after it is
        acquired. ``unittest`` does not call ``tearDown`` when ``setUp``
        raises, but it does run registered cleanups, so a failure part-way
        through no longer leaks the engine, the temp directory or the patch.
        """
        # Cleanups run last-in-first-out. The temp directory is created first
        # so that it is removed last, after the session and engine are gone.
        self.tempdir = TemporaryDirectory()
        self.addCleanup(self.tempdir.cleanup)

        # StaticPool keeps a single connection, so every session created from
        # this engine sees the same in-memory database.
        self.engine = create_engine(
            "sqlite+pysqlite://",
            connect_args={"check_same_thread": False},
            poolclass=StaticPool,
        )
        self.addCleanup(self.engine.dispose)

        self.SessionLocal = sessionmaker(
            bind=self.engine,
            autocommit=False,
            autoflush=False,
            expire_on_commit=False,
        )

        Base.metadata.create_all(bind=self.engine)
        self.addCleanup(Base.metadata.drop_all, bind=self.engine)

        self.session = self.SessionLocal()
        self.addCleanup(self.session.close)

        # Swap the application's session factory for the test one.
        # NOTE(review): presumably so code that opens its own session writes
        # to this test database — confirm against api.app.core.database users.
        self.session_local_patcher = patch("api.app.core.database.SessionLocal", self.SessionLocal)
        self.session_local_patcher.start()
        self.addCleanup(self.session_local_patcher.stop)

    def test_auth_login_failure_writes_audit_log(self) -> None:
        """A login with a wrong password raises HTTP 401 and logs ``auth.login_failed``."""
        user = self._create_user(user_id="alice", username="alice", password="correct-pass")

        with self.assertRaises(HTTPException) as exc_info:
            auth_service.login_user(
                self.session,
                LoginRequest(user_id=user.id, password="wrong-pass"),
                user_agent="pytest",
                ip_address="127.0.0.1",
            )
        self.assertEqual(exc_info.exception.status_code, 401)

        log = self._latest_log("auth.login_failed")
        detail = log.detail or ""
        self.assertEqual(log.user_id, user.id)
        self.assertIn(f"attempted_user_id={user.id}", detail)
        self.assertIn("reason=invalid_credentials", detail)

    def test_user_and_system_param_actions_write_audit_logs(self) -> None:
        """User and system-parameter create/update/delete calls are all audited."""
        actor = self._create_user(user_id="admin", username="admin", password="admin-pass")

        # --- user lifecycle -------------------------------------------------
        created = user_service.create_user(
            self.session,
            UserCreateRequest(
                user_id="bob",
                email="bob@example.com",
                username="bobby",
                password="initial-pass",
            ),
            actor_user_id=actor.id,
        )
        self.assertIsNotNone(created)

        updated = user_service.update_user(
            self.session,
            "bob",
            UserUpdateRequest(username="robert", status="disabled"),
            actor_user_id=actor.id,
        )
        self.assertIsNotNone(updated)

        reset = user_service.reset_user_password(
            self.session,
            "bob",
            UserPasswordResetRequest(new_password="updated-pass"),
            actor_user_id=actor.id,
        )
        self.assertIsNotNone(reset)

        # --- system parameter lifecycle --------------------------------------
        created_param = system_param_service.create_system_param(
            self.session,
            SystemParamCreateRequest(
                param_key="site_name",
                param_name="Site Name",
                param_value="fquiz",
                description="visible name",
            ),
            actor_user_id=actor.id,
        )
        self.assertIsNotNone(created_param)

        updated_param = system_param_service.update_system_param(
            self.session,
            created_param.id,
            SystemParamUpdateRequest(status="disabled", description="changed description"),
            actor_user_id=actor.id,
        )
        self.assertIsNotNone(updated_param)

        self.assertTrue(
            system_param_service.delete_system_param(
                self.session,
                created_param.id,
                actor_user_id=actor.id,
            )
        )
        self.assertTrue(user_service.delete_user(self.session, "bob", actor_user_id=actor.id))

        # --- audit expectations ----------------------------------------------
        self._assert_actions_logged(
            "user.create",
            "user.update",
            "user.password.reset",
            "user.delete",
            "system_param.create",
            "system_param.update",
            "system_param.delete",
        )

        user_update_detail = self._latest_log("user.update").detail or ""
        self.assertIn("changed_fields=status,username", user_update_detail)
        self.assertIn("status_transition=ENABLED->DISABLED", user_update_detail)

        param_update_detail = self._latest_log("system_param.update").detail or ""
        self.assertIn("param_key=site_name", param_update_detail)
        self.assertIn("changed_fields=description,status", param_update_detail)

    def test_legacy_role_menu_and_user_role_actions_write_audit_logs(self) -> None:
        """Legacy role, menu and user-role operations are all audited."""
        actor = self._create_user(user_id="admin", username="admin", password="admin-pass")
        target = self._create_user(user_id="carol", username="carol", password="carol-pass")

        self._create_legacy_rbac_tables()
        # Seed the one role that is later assigned to the target user.
        self.session.execute(
            text(
                "\n"
                "INSERT INTO user_role (id, name, descr, state, create_date, update_date)\n"
                "VALUES\n"
                "('editor', 'Editor', 'Editor', 'ENABLED', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)\n"
            )
        )
        self.session.commit()

        # The authorization lookup is stubbed while the roles are replaced.
        with patch(
            "api.app.services.user_service.get_user_authorization",
            return_value=UserAuthorization(role_codes={"editor"}, permission_codes=set()),
        ):
            updated_user = user_service.set_user_roles(
                self.session,
                target.id,
                UserRoleUpdateRequest(role_codes=["editor"]),
                actor_user_id=actor.id,
            )
        self.assertIsNotNone(updated_user)

        with ExitStack() as stack:
            # Force the legacy-schema probes to report that the tables exist.
            for probe in (
                "_legacy_role_table_exists",
                "_legacy_menu_table_exists",
                "_legacy_user_role_relation_exists",
            ):
                stack.enter_context(
                    patch(f"api.app.services.legacy_admin_rbac_service.{probe}", return_value=True)
                )

            first_menu = legacy_admin_rbac_service.create_menu(
                self.session,
                MenuCreateRequest(
                    code="custom.audit.menu",
                    name="Custom Audit Menu",
                    path="/admin/custom-audit",
                    icon="Audit",
                    sort_order=10,
                ),
                actor_user_id=actor.id,
            )
            self.assertIsNotNone(first_menu)

            second_menu = legacy_admin_rbac_service.create_menu(
                self.session,
                MenuCreateRequest(
                    code="custom.audit.archive",
                    name="Custom Audit Archive",
                    path="/admin/custom-archive",
                    icon="Archive",
                    sort_order=20,
                ),
                actor_user_id=actor.id,
            )
            self.assertIsNotNone(second_menu)

            updated_menu = legacy_admin_rbac_service.update_menu(
                self.session,
                first_menu.id,
                MenuUpdateRequest(name="Custom Audit Menu 2", status="disabled"),
                actor_user_id=actor.id,
            )
            self.assertIsNotNone(updated_menu)

            created_role = legacy_admin_rbac_service.create_role(
                self.session,
                RoleCreateRequest(code="auditor", name="Auditor", menu_ids=[]),
                actor_user_id=actor.id,
            )
            self.assertIsNotNone(created_role)

            updated_role = legacy_admin_rbac_service.update_role(
                self.session,
                created_role.id,
                RoleUpdateRequest(name="Auditor Updated"),
                actor_user_id=actor.id,
            )
            self.assertIsNotNone(updated_role)

            replaced_role = legacy_admin_rbac_service.replace_role_menus(
                self.session,
                created_role.id,
                [first_menu.id, second_menu.id],
                actor_user_id=actor.id,
            )
            self.assertIsNotNone(replaced_role)

            self.assertTrue(
                legacy_admin_rbac_service.delete_role(
                    self.session,
                    created_role.id,
                    actor_user_id=actor.id,
                )
            )
            self.assertTrue(
                legacy_admin_rbac_service.delete_menu(
                    self.session,
                    second_menu.id,
                    actor_user_id=actor.id,
                )
            )

        self._assert_actions_logged(
            "user.roles.replace",
            "menu.create",
            "menu.update",
            "menu.delete",
            "role.create",
            "role.update",
            "role.menus.replace",
            "role.delete",
        )

        role_replace_detail = self._latest_log("role.menus.replace").detail or ""
        self.assertIn("role_code=auditor", role_replace_detail)
        self.assertIn("menu_ids=", role_replace_detail)

    def test_file_actions_write_audit_logs(self) -> None:
        """Directory, upload, rename, move, download and delete calls are all audited."""
        actor = self._create_user(user_id="admin", username="admin", password="admin-pass")
        mount = self._create_vfs_mount()

        for directory_name in ("docs", "archive"):
            file_service.create_directory(
                self.session,
                FileCreateDirectoryRequest(mount_code=mount.code, parent_path="/", name=directory_name),
                actor=actor,
            )

        # A SimpleNamespace stands in for the upload object; it carries only
        # the three attributes set here.
        uploaded = file_service.upload_file_to_path(
            self.session,
            mount_code=mount.code,
            parent_path="/",
            file=SimpleNamespace(
                filename="guide.txt",
                file=BytesIO(b"audit coverage"),
                content_type="text/plain",
            ),
            actor=actor,
        )
        self.assertEqual(uploaded.path, "/guide.txt")

        renamed = file_service.rename_file_path(
            self.session,
            FileRenameRequest(
                mount_code=mount.code,
                path="/guide.txt",
                is_dir=False,
                new_name="guide-v2.txt",
            ),
            actor=actor,
        )
        self.assertEqual(renamed.target_path, "/guide-v2.txt")

        moved = file_service.move_file_path(
            self.session,
            FileMoveRequest(
                mount_code=mount.code,
                path="/guide-v2.txt",
                is_dir=False,
                target_parent_path="/archive",
            ),
            actor=actor,
        )
        self.assertEqual(moved.target_path, "/archive/guide-v2.txt")

        filename, content, content_type = file_service.download_file_from_path(
            self.session,
            mount_code=mount.code,
            path="/archive/guide-v2.txt",
            actor=actor,
        )
        self.assertEqual(filename, "guide-v2.txt")
        self.assertEqual(content, b"audit coverage")
        self.assertEqual(content_type, "text/plain")

        zip_name, zip_content, zip_content_type = file_service.download_directory_as_zip(
            self.session,
            mount_code=mount.code,
            path="/archive",
            actor=actor,
        )
        self.assertEqual(zip_name, "archive.zip")
        self.assertEqual(zip_content_type, "application/zip")
        self.assertGreater(len(zip_content), 0)

        deleted = file_service.delete_file_path(
            self.session,
            FileDeleteRequest(
                mount_code=mount.code,
                path="/archive/guide-v2.txt",
                is_dir=False,
                recursive=False,
            ),
            actor=actor,
        )
        self.assertEqual(deleted.path, "/archive/guide-v2.txt")

        self._assert_actions_logged(
            "file.mkdir",
            "file.upload",
            "file.rename",
            "file.move",
            "file.download",
            "file.download_zip",
            "file.delete",
        )

        download_detail = self._latest_log("file.download").detail or ""
        # Derive the expectation from the fixture, not a repeated literal.
        self.assertIn(f"mount_code={mount.code}", download_detail)
        self.assertIn("path=/archive/guide-v2.txt", download_detail)

    def _create_user(
        self,
        *,
        user_id: str,
        username: str,
        password: str,
        status: str = "ENABLED",
    ) -> User:
        """Insert and commit a ``User`` whose e-mail is ``<user_id>@example.com``.

        The plain-text *password* is stored via ``hash_password``.
        """
        user = User(
            id=user_id,
            email=f"{user_id}@example.com",
            username=username,
            password_hash=hash_password(password),
            status=status,
        )
        self.session.add(user)
        self.session.commit()
        return user

    def _create_vfs_mount(self) -> FileStorageMount:
        """Commit and return a ``main`` mount on a VFS backend rooted at the temp dir.

        Only the mount is added to the session explicitly.
        NOTE(review): the backend is presumably persisted through the
        ``backend`` relationship cascade — confirm on the model.
        """
        backend = FileStorageBackend(
            code="local",
            name="Local",
            driver_type="VFS",
            status="enabled",
            is_default=True,
            config_json={"root_dir": self.tempdir.name},
        )
        mount = FileStorageMount(
            code="main",
            name="Main",
            backend=backend,
            mount_path="/",
            root_path="/",
            is_enabled=True,
        )
        self.session.add(mount)
        self.session.commit()
        return mount

    def _create_legacy_rbac_tables(self) -> None:
        """Create the four legacy RBAC tables with raw DDL and commit.

        The tables (``user_role``, ``user_role_rela``, ``menu``,
        ``role_menu_rela``) are created here with hand-written SQL and are
        not created by ``Base.metadata.create_all`` in ``setUp``.
        """
        ddl_statements = (
            (
                "\n"
                "CREATE TABLE user_role (\n"
                "id TEXT PRIMARY KEY,\n"
                "name TEXT,\n"
                "descr TEXT,\n"
                "state TEXT,\n"
                "create_date DATETIME,\n"
                "update_date DATETIME\n"
                ")\n"
            ),
            (
                "\n"
                "CREATE TABLE user_role_rela (\n"
                "rela_id TEXT PRIMARY KEY,\n"
                "user_id TEXT,\n"
                "role_id TEXT\n"
                ")\n"
            ),
            (
                "\n"
                "CREATE TABLE menu (\n"
                "menu_id TEXT PRIMARY KEY,\n"
                "menu_name TEXT,\n"
                "menu_label TEXT,\n"
                "menu_type TEXT,\n"
                "parent_id TEXT,\n"
                "url TEXT,\n"
                "menu_icon TEXT,\n"
                "seq INTEGER,\n"
                "state TEXT,\n"
                "menu_descr TEXT,\n"
                "create_date DATETIME,\n"
                "update_date DATETIME\n"
                ")\n"
            ),
            (
                "\n"
                "CREATE TABLE role_menu_rela (\n"
                "rela_id TEXT PRIMARY KEY,\n"
                "role_id TEXT,\n"
                "menu_id TEXT\n"
                ")\n"
            ),
        )
        for ddl in ddl_statements:
            self.session.execute(text(ddl))
        self.session.commit()

    def _latest_log(self, action: str) -> AuditLog:
        """Return the ``AuditLog`` row with the highest id for *action*.

        Fails the test when no row with that action exists.
        """
        newest_first = select(AuditLog).where(AuditLog.action == action).order_by(AuditLog.id.desc())
        log = self.session.scalars(newest_first).first()
        self.assertIsNotNone(log, f"expected audit log for {action}")
        return log

    def _logged_actions(self) -> set[str]:
        """Return the distinct ``action`` values of all audit-log rows."""
        # Only the action column is needed, so select it directly.
        return set(self.session.scalars(select(AuditLog.action)).all())

    def _assert_actions_logged(self, *expected: str) -> None:
        """Fail unless every action in *expected* was logged.

        All missing actions are reported in a single failure message.
        """
        missing = sorted(set(expected) - self._logged_actions())
        self.assertEqual(missing, [], f"audit log is missing actions: {missing}")
# Run the tests in this module when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()