Files
fquiz/api/app/services/legacy_admin_rbac_service.py
T
2026-04-26 00:14:25 +08:00

798 lines
25 KiB
Python

from __future__ import annotations
import asyncio
from datetime import datetime
from uuid import uuid4
from sqlalchemy import bindparam, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from ..schemas.admin import (
MenuCreateRequest,
MenuListResponse,
MenuPublic,
MenuUpdateRequest,
RoleCreateRequest,
RoleListResponse,
RolePublic,
RoleUpdateRequest,
)
from .legacy_authz_service import (
DEFAULT_ADMIN_PERMISSION_CODES,
LEGACY_URL_PATH_MAP,
MENU_CODE_PERMISSION_MAP,
)
from .push_service import publish_topic
from .user_service import queue_users_auth_refresh
PROTECTED_ROLE_IDS = {"admin", "user", "sys_mgr"}
REMOVED_MENU_CODES = {
"admin.wxapp",
"admin.system_message",
"admin.inbox",
"admin.code_review",
"admin.git_desktop",
"admin.mdresolve",
"admin.data_query",
"admin.hot_search",
"admin.filedetector",
"admin.baidu_pan",
"admin.tag",
"admin.knowledge_point_mgr",
"admin.life_countdown",
"admin.password",
"admin.token_usage",
"admin.history",
"admin.vocabulary",
"admin.diary",
"admin.homework",
"admin.question_bank",
"admin.cron_task_mgr",
"admin.todos",
"admin.job_mgr",
"admin.jwt_generator",
"admin.queue_mgr",
"admin.knowledge_mastery",
}
PROTECTED_MENU_CODES = {
"dashboard",
"admin.users",
"admin.roles",
"admin.menus",
"admin.system_params",
"admin.wxapp",
"admin.agent",
"admin.mcp_server",
"admin.files",
"admin.filedetector",
"admin.baidu_pan",
"admin.requirements",
"admin.power_lines",
"admin.lightning_currents",
"admin.lightning_distribution",
"admin.task_monitor",
"admin.data_query",
"admin.hot_search",
"admin.schedule",
"admin.cron_task_mgr",
"admin.todos",
"admin.mindmap",
"admin.mdresolve",
"admin.mermaid_mgr",
"admin.tag",
"admin.knowledge_point_mgr",
"admin.job_mgr",
"admin.syslog",
"admin.chat",
"admin.jwt_generator",
"admin.api_tester",
"admin.models",
"admin.orchestration",
"admin.wine_runner",
# quiz legacy defaults
"sys_mgr",
"menu_mgr",
"role_mgr",
"user_mgr",
}
def list_roles(db: Session) -> RoleListResponse:
rows = db.execute(
text(
"""
SELECT id, name
FROM user_role
ORDER BY create_date DESC NULLS LAST, id ASC
"""
)
).mappings().all()
role_ids = [str(row["id"]) for row in rows]
role_menu_ids = _load_role_menu_ids_map(db, role_ids)
menu_rows = _load_menus_map(db, sorted({menu_id for ids in role_menu_ids.values() for menu_id in ids}))
items: list[RolePublic] = []
for row in rows:
role_id = str(row["id"])
menu_ids = sorted(menu_id for menu_id in role_menu_ids.get(role_id, []) if menu_id in menu_rows)
permission_codes = sorted(_permission_codes_from_menu_rows(menu_rows, menu_ids))
items.append(
RolePublic(
id=role_id,
code=role_id,
name=(row.get("name") or role_id).strip(),
permission_codes=permission_codes,
menu_ids=menu_ids,
)
)
return RoleListResponse(items=items, total=len(items))
def get_role_by_id(db: Session, role_id: str) -> RolePublic | None:
role_id = role_id.strip()
if not role_id:
return None
rows = db.execute(
text("SELECT id, name FROM user_role WHERE id = :id"),
{"id": role_id},
).mappings().all()
if not rows:
return None
role_menu_ids = _load_role_menu_ids_map(db, [role_id]).get(role_id, [])
menu_rows = _load_menus_map(db, role_menu_ids)
filtered_menu_ids = sorted(menu_id for menu_id in role_menu_ids if menu_id in menu_rows)
return RolePublic(
id=role_id,
code=role_id,
name=(rows[0].get("name") or role_id).strip(),
permission_codes=sorted(_permission_codes_from_menu_rows(menu_rows, filtered_menu_ids)),
menu_ids=filtered_menu_ids,
)
def create_role(db: Session, payload: RoleCreateRequest) -> RolePublic | None:
role_id = payload.code.strip()
if not role_id:
return None
role_name = payload.name.strip()
if not role_name:
return None
existing = db.scalar(text("SELECT id FROM user_role WHERE id = :id"), {"id": role_id})
if existing:
return None
menu_ids = sorted(set(menu_id.strip() for menu_id in payload.menu_ids if menu_id.strip()))
if not _menu_ids_exist(db, menu_ids):
return None
now = datetime.now()
try:
db.execute(
text(
"""
INSERT INTO user_role (id, name, descr, state, create_date, update_date)
VALUES (:id, :name, :descr, 'ENABLED', :create_date, :update_date)
"""
),
{
"id": role_id,
"name": role_name,
"descr": role_name,
"create_date": now,
"update_date": now,
},
)
_replace_role_menus_internal(db, role_id, menu_ids)
db.commit()
except SQLAlchemyError:
db.rollback()
return None
_fire_and_forget(
publish_topic(
"admin.roles",
name="roles.changed",
payload={"action": "created", "role_id": role_id, "role_code": role_id},
requires_refetch=["/api/v1/admin/roles"],
dedupe_key=f"roles:created:{role_id}",
)
)
return get_role_by_id(db, role_id)
def update_role(db: Session, role_id: str, payload: RoleUpdateRequest) -> RolePublic | None:
role_id = role_id.strip()
if not role_id:
return None
role_exists = db.scalar(text("SELECT id FROM user_role WHERE id = :id"), {"id": role_id})
if not role_exists:
return None
impacted_user_ids = _get_role_user_ids(db, role_id)
menus_changed = False
try:
if payload.name is not None:
role_name = payload.name.strip()
if not role_name:
db.rollback()
return None
db.execute(
text("UPDATE user_role SET name = :name, descr = :descr, update_date = :update_date WHERE id = :id"),
{
"id": role_id,
"name": role_name,
"descr": role_name,
"update_date": datetime.now(),
},
)
if payload.menu_ids is not None:
menu_ids = sorted(set(menu_id.strip() for menu_id in payload.menu_ids if menu_id.strip()))
if not _menu_ids_exist(db, menu_ids):
db.rollback()
return None
_replace_role_menus_internal(db, role_id, menu_ids)
menus_changed = True
db.commit()
except SQLAlchemyError:
db.rollback()
return None
if menus_changed:
queue_users_auth_refresh(db, impacted_user_ids)
_fire_and_forget(
publish_topic(
"admin.roles",
name="roles.changed",
payload={"action": "updated", "role_id": role_id, "role_code": role_id},
requires_refetch=["/api/v1/admin/roles"],
dedupe_key=f"roles:updated:{role_id}",
)
)
if menus_changed:
_fire_and_forget(
publish_topic(
"admin.menus",
name="menus.changed",
payload={"action": "role_menus_updated", "role_id": role_id, "role_code": role_id},
requires_refetch=["/api/v1/admin/menus", "/api/v1/admin/me/menus"],
dedupe_key=f"menus:role_updated:{role_id}",
)
)
return get_role_by_id(db, role_id)
def delete_role(db: Session, role_id: str) -> bool:
role_id = role_id.strip()
if not role_id or role_id in PROTECTED_ROLE_IDS:
return False
exists = db.scalar(text("SELECT id FROM user_role WHERE id = :id"), {"id": role_id})
if not exists:
return False
impacted_user_ids = _get_role_user_ids(db, role_id)
try:
db.execute(text("DELETE FROM role_menu_rela WHERE role_id = :role_id"), {"role_id": role_id})
db.execute(text("DELETE FROM user_role_rela WHERE role_id = :role_id"), {"role_id": role_id})
db.execute(text("DELETE FROM user_role WHERE id = :id"), {"id": role_id})
db.commit()
except SQLAlchemyError:
db.rollback()
return False
queue_users_auth_refresh(db, impacted_user_ids)
_fire_and_forget(
publish_topic(
"admin.roles",
name="roles.changed",
payload={"action": "deleted", "role_id": role_id, "role_code": role_id},
requires_refetch=["/api/v1/admin/roles"],
dedupe_key=f"roles:deleted:{role_id}",
)
)
_fire_and_forget(
publish_topic(
"admin.menus",
name="menus.changed",
payload={"action": "role_deleted", "role_id": role_id, "role_code": role_id},
requires_refetch=["/api/v1/admin/me/menus"],
dedupe_key=f"menus:role_deleted:{role_id}",
)
)
return True
def list_permissions(_: Session) -> list[dict[str, str | int]]:
codes = sorted(set(DEFAULT_ADMIN_PERMISSION_CODES))
return [{"id": idx + 1, "code": code, "name": code} for idx, code in enumerate(codes)]
def list_menus(db: Session) -> MenuListResponse:
rows = _load_menus_rows(db)
items = [serialize_menu_row(row) for row in rows]
return MenuListResponse(items=items, total=len(items))
def get_menu_by_id(db: Session, menu_id: str) -> MenuPublic | None:
normalized_menu_id = menu_id.strip()
if not normalized_menu_id:
return None
row = db.execute(
text(
"""
SELECT menu_id, menu_name, menu_label, menu_type, parent_id, url, menu_icon, seq, state, menu_descr
FROM menu
WHERE menu_id = :menu_id
"""
),
{"menu_id": normalized_menu_id},
).mappings().first()
if not row:
return None
if str(row.get("menu_name") or "").strip() in REMOVED_MENU_CODES:
return None
return serialize_menu_row(row)
def create_menu(db: Session, payload: MenuCreateRequest) -> MenuPublic | None:
menu_code = payload.code.strip()
if not menu_code:
return None
if menu_code in REMOVED_MENU_CODES:
return None
menu_name = payload.name.strip()
if not menu_name:
return None
exists = db.scalar(text("SELECT menu_id FROM menu WHERE menu_name = :menu_name"), {"menu_name": menu_code})
if exists:
return None
parent_id = payload.parent_id.strip() if payload.parent_id else None
if parent_id and parent_id == menu_code:
return None
if parent_id and not db.scalar(text("SELECT menu_id FROM menu WHERE menu_id = :menu_id"), {"menu_id": parent_id}):
return None
menu_id = menu_code if len(menu_code) <= 32 else uuid4().hex
try:
db.execute(
text(
"""
INSERT INTO menu (
menu_id, menu_name, menu_label, menu_type, parent_id, url, menu_icon, seq, state,
menu_descr, create_date, update_date
)
VALUES (
:menu_id, :menu_name, :menu_label, :menu_type, :parent_id, :url, :menu_icon, :seq, :state,
:menu_descr, :create_date, :update_date
)
"""
),
{
"menu_id": menu_id,
"menu_name": menu_code,
"menu_label": menu_name,
"menu_type": _to_legacy_menu_type(payload.type),
"parent_id": parent_id,
"url": _to_legacy_url(payload.path),
"menu_icon": payload.icon,
"seq": payload.sort_order,
"state": _to_legacy_state(payload.status),
"menu_descr": payload.permission_code or payload.component,
"create_date": datetime.now(),
"update_date": datetime.now(),
},
)
db.commit()
except SQLAlchemyError:
db.rollback()
return None
_fire_and_forget(
publish_topic(
"admin.menus",
name="menus.changed",
payload={"action": "created", "menu_id": menu_id, "menu_code": menu_code},
requires_refetch=["/api/v1/admin/menus", "/api/v1/admin/me/menus"],
dedupe_key=f"menus:created:{menu_id}",
)
)
return get_menu_by_id(db, menu_id)
def update_menu(db: Session, menu_id: str, payload: MenuUpdateRequest) -> MenuPublic | None:
menu = get_menu_by_id(db, menu_id)
if not menu:
return None
update_data = payload.model_dump(exclude_unset=True)
next_name = menu.name
if "name" in update_data and update_data["name"] is not None:
candidate_name = str(update_data["name"]).strip()
if not candidate_name:
return None
next_name = candidate_name
next_parent_id = menu.parent_id
if "parent_id" in update_data:
parent_id = update_data["parent_id"]
if parent_id:
normalized_parent = parent_id.strip()
if normalized_parent == menu.id:
return None
parent_exists = db.scalar(text("SELECT menu_id FROM menu WHERE menu_id = :menu_id"), {"menu_id": normalized_parent})
if not parent_exists:
return None
next_parent_id = normalized_parent
else:
next_parent_id = None
impacted_user_ids = _get_users_with_menu_access(db, menu.id)
try:
db.execute(
text(
"""
UPDATE menu
SET
menu_label = :menu_label,
url = :url,
menu_icon = :menu_icon,
parent_id = :parent_id,
menu_type = :menu_type,
seq = :seq,
state = :state,
menu_descr = :menu_descr,
update_date = :update_date
WHERE menu_id = :menu_id
"""
),
{
"menu_id": menu.id,
"menu_label": next_name,
"url": _to_legacy_url(update_data.get("path", menu.path)),
"menu_icon": update_data.get("icon", menu.icon),
"parent_id": next_parent_id,
"menu_type": _to_legacy_menu_type(update_data.get("type", menu.type)),
"seq": int(update_data.get("sort_order", menu.sort_order)),
"state": _to_legacy_state(update_data.get("status", menu.status)),
"menu_descr": update_data.get("permission_code", menu.permission_code) or update_data.get("component"),
"update_date": datetime.now(),
},
)
db.commit()
except SQLAlchemyError:
db.rollback()
return None
queue_users_auth_refresh(db, impacted_user_ids)
_fire_and_forget(
publish_topic(
"admin.menus",
name="menus.changed",
payload={"action": "updated", "menu_id": menu.id, "menu_code": menu.code},
requires_refetch=["/api/v1/admin/menus", "/api/v1/admin/me/menus"],
dedupe_key=f"menus:updated:{menu.id}",
)
)
return get_menu_by_id(db, menu.id)
def delete_menu(db: Session, menu_id: str) -> bool:
menu = get_menu_by_id(db, menu_id)
if not menu:
return False
if menu.code in PROTECTED_MENU_CODES:
return False
child_exists = db.scalar(text("SELECT menu_id FROM menu WHERE parent_id = :parent_id LIMIT 1"), {"parent_id": menu.id})
if child_exists:
return False
impacted_user_ids = _get_users_with_menu_access(db, menu.id)
try:
db.execute(text("DELETE FROM role_menu_rela WHERE menu_id = :menu_id"), {"menu_id": menu.id})
db.execute(text("DELETE FROM menu WHERE menu_id = :menu_id"), {"menu_id": menu.id})
db.commit()
except SQLAlchemyError:
db.rollback()
return False
queue_users_auth_refresh(db, impacted_user_ids)
_fire_and_forget(
publish_topic(
"admin.menus",
name="menus.changed",
payload={"action": "deleted", "menu_id": menu.id, "menu_code": menu.code},
requires_refetch=["/api/v1/admin/menus", "/api/v1/admin/me/menus"],
dedupe_key=f"menus:deleted:{menu.id}",
)
)
return True
def list_role_menu_ids(db: Session, role_id: str) -> list[str] | None:
role_exists = db.scalar(text("SELECT id FROM user_role WHERE id = :id"), {"id": role_id})
if not role_exists:
return None
rows = db.execute(
text("SELECT menu_id FROM role_menu_rela WHERE role_id = :role_id ORDER BY menu_id ASC"),
{"role_id": role_id},
).all()
menu_ids = [str(row[0]) for row in rows]
menu_rows = _load_menus_map(db, menu_ids)
return [menu_id for menu_id in menu_ids if menu_id in menu_rows]
def replace_role_menus(db: Session, role_id: str, menu_ids: list[str]) -> RolePublic | None:
role_exists = db.scalar(text("SELECT id FROM user_role WHERE id = :id"), {"id": role_id})
if not role_exists:
return None
normalized_menu_ids = sorted(set(menu_id.strip() for menu_id in menu_ids if menu_id.strip()))
if not _menu_ids_exist(db, normalized_menu_ids):
return None
impacted_user_ids = _get_role_user_ids(db, role_id)
try:
_replace_role_menus_internal(db, role_id, normalized_menu_ids)
db.commit()
except SQLAlchemyError:
db.rollback()
return None
queue_users_auth_refresh(db, impacted_user_ids)
_fire_and_forget(
publish_topic(
"admin.roles",
name="roles.changed",
payload={"action": "menus_replaced", "role_id": role_id, "role_code": role_id},
requires_refetch=["/api/v1/admin/roles"],
dedupe_key=f"roles:menus_replaced:{role_id}",
)
)
_fire_and_forget(
publish_topic(
"admin.menus",
name="menus.changed",
payload={"action": "role_menus_replaced", "role_id": role_id, "role_code": role_id},
requires_refetch=["/api/v1/admin/menus", "/api/v1/admin/me/menus"],
dedupe_key=f"menus:role_menus_replaced:{role_id}",
)
)
return get_role_by_id(db, role_id)
def serialize_menu_row(row: dict[str, object]) -> MenuPublic:
menu_code = str(row.get("menu_name") or row.get("menu_id") or "")
return MenuPublic(
id=str(row["menu_id"]),
code=menu_code,
name=str(row.get("menu_label") or menu_code),
path=_to_api_path(row.get("url")),
icon=(row.get("menu_icon") or None),
parent_id=(str(row["parent_id"]) if row.get("parent_id") else None),
type=_to_api_menu_type(row.get("menu_type")),
sort_order=int(row.get("seq") or 0),
status=_to_api_state(row.get("state")),
visible=_to_api_state(row.get("state")) == "enabled",
cacheable=False,
component=None,
permission_code=_primary_permission(menu_code, row.get("menu_type")),
)
def _load_role_menu_ids_map(db: Session, role_ids: list[str]) -> dict[str, list[str]]:
mapping = {role_id: [] for role_id in role_ids}
if not role_ids:
return mapping
rows = db.execute(
text(
"""
SELECT role_id, menu_id
FROM role_menu_rela
WHERE role_id IN :role_ids
ORDER BY menu_id ASC
"""
).bindparams(bindparam("role_ids", expanding=True)),
{"role_ids": role_ids},
).mappings().all()
for row in rows:
mapping.setdefault(str(row["role_id"]), []).append(str(row["menu_id"]))
return mapping
def _load_menus_map(db: Session, menu_ids: list[str]) -> dict[str, dict[str, object]]:
if not menu_ids:
return {}
rows = db.execute(
text(
"""
SELECT menu_id, menu_name, menu_type, state
FROM menu
WHERE menu_id IN :menu_ids
"""
).bindparams(bindparam("menu_ids", expanding=True)),
{"menu_ids": menu_ids},
).mappings().all()
return {
str(row["menu_id"]): dict(row)
for row in rows
if str(row.get("menu_name") or "").strip() not in REMOVED_MENU_CODES
}
def _permission_codes_from_menu_rows(
menu_rows: dict[str, dict[str, object]],
menu_ids: list[str],
) -> set[str]:
codes: set[str] = set()
for menu_id in menu_ids:
row = menu_rows.get(menu_id)
if not row:
continue
if _to_api_state(row.get("state")) != "enabled":
continue
menu_name = str(row.get("menu_name") or "")
if menu_name in REMOVED_MENU_CODES:
continue
menu_type = row.get("menu_type")
mapped = MENU_CODE_PERMISSION_MAP.get(menu_name, set())
codes.update(mapped)
if not mapped and str(menu_type or "").upper() == "BUTTON" and "." in menu_name:
codes.add(menu_name)
return codes
def _menu_ids_exist(db: Session, menu_ids: list[str]) -> bool:
if not menu_ids:
return True
rows = db.execute(
text("SELECT menu_id, menu_name FROM menu WHERE menu_id IN :menu_ids").bindparams(bindparam("menu_ids", expanding=True)),
{"menu_ids": menu_ids},
).mappings().all()
existing = {
str(row["menu_id"])
for row in rows
if str(row.get("menu_name") or "").strip() not in REMOVED_MENU_CODES
}
return set(menu_ids).issubset(existing)
def _replace_role_menus_internal(db: Session, role_id: str, menu_ids: list[str]) -> None:
db.execute(text("DELETE FROM role_menu_rela WHERE role_id = :role_id"), {"role_id": role_id})
if not menu_ids:
return
stmt = text(
"""
INSERT INTO role_menu_rela (rela_id, role_id, menu_id)
VALUES (:rela_id, :role_id, :menu_id)
"""
)
for menu_id in menu_ids:
db.execute(
stmt,
{
"rela_id": uuid4().hex,
"role_id": role_id,
"menu_id": menu_id,
},
)
def _get_role_user_ids(db: Session, role_id: str) -> list[str]:
try:
rows = db.execute(
text("SELECT user_id FROM user_role_rela WHERE role_id = :role_id"),
{"role_id": role_id},
).all()
except SQLAlchemyError:
db.rollback()
return []
return sorted({str(row[0]) for row in rows})
def _get_users_with_menu_access(db: Session, menu_id: str) -> list[str]:
try:
rows = db.execute(
text(
"""
SELECT DISTINCT urr.user_id
FROM role_menu_rela rmr
JOIN user_role_rela urr ON urr.role_id = rmr.role_id
WHERE rmr.menu_id = :menu_id
"""
),
{"menu_id": menu_id},
).all()
except SQLAlchemyError:
db.rollback()
return []
return sorted({str(row[0]) for row in rows})
def _load_menus_rows(db: Session) -> list[dict[str, object]]:
rows = db.execute(
text(
"""
SELECT menu_id, menu_name, menu_label, menu_type, parent_id, url, menu_icon, seq, state, menu_descr
FROM menu
ORDER BY seq ASC NULLS LAST, menu_id ASC
"""
)
).mappings().all()
return [
dict(row)
for row in rows
if str(row.get("menu_name") or "").strip() not in REMOVED_MENU_CODES
]
def _primary_permission(menu_name: str, menu_type: object) -> str | None:
mapped = MENU_CODE_PERMISSION_MAP.get(menu_name)
if mapped:
return sorted(mapped)[0]
if str(menu_type or "").upper() == "BUTTON" and "." in menu_name:
return menu_name
return None
def _to_api_state(raw_state: object) -> str:
return "enabled" if str(raw_state or "").upper() in {"ENABLED", "ACTIVE", "1", "TRUE", ""} else "disabled"
def _to_legacy_state(status: str | None) -> str:
return "DISABLED" if (status or "").strip().lower() == "disabled" else "ENABLED"
def _to_api_menu_type(raw_type: object) -> str:
value = str(raw_type or "").strip().upper()
if value == "DIRECTORY":
return "directory"
if value == "BUTTON":
return "button"
return "menu"
def _to_legacy_menu_type(raw_type: str | None) -> str:
value = (raw_type or "").strip().lower()
if value == "directory":
return "DIRECTORY"
if value == "button":
return "BUTTON"
return "MENU"
def _to_api_path(raw_url: object) -> str | None:
url = str(raw_url or "").strip()
if not url:
return None
if url in LEGACY_URL_PATH_MAP:
return LEGACY_URL_PATH_MAP[url]
if url.startswith(("http://", "https://", "/")):
return url
return f"/admin/{url}"
def _to_legacy_url(path: str | None) -> str:
normalized = (path or "").strip()
if not normalized:
return ""
reverse = {value: key for key, value in LEGACY_URL_PATH_MAP.items()}
if normalized in reverse:
return reverse[normalized]
if normalized.startswith("/admin/"):
tail = normalized.removeprefix("/admin/")
if tail:
return tail
return normalized
def _fire_and_forget(coro: object) -> None:
try:
loop = asyncio.get_running_loop()
except RuntimeError:
return
loop.create_task(coro)