19a39e0433
Co-authored-by: multica-agent <github@multica.ai>
565 lines
17 KiB
Python
565 lines
17 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
|
|
from sqlalchemy import bindparam, text
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
from sqlalchemy.orm import Session
|
|
|
|
# Keep this list aligned with runtime permission checks used by API endpoints.
|
|
DEFAULT_ADMIN_PERMISSION_CODES: set[str] = {
|
|
"user.read",
|
|
"user.write",
|
|
"user.manage",
|
|
"role.read",
|
|
"role.manage",
|
|
"menu.read",
|
|
"menu.manage",
|
|
"system_param.read",
|
|
"system_param.manage",
|
|
"file.read",
|
|
"file.manage",
|
|
"line.read",
|
|
"line.manage",
|
|
"tower.read",
|
|
"tower.manage",
|
|
"lightning.read",
|
|
"lightning.manage",
|
|
"elevation.read",
|
|
"elevation.manage",
|
|
"atp.read",
|
|
"atp.manage",
|
|
"atp.run",
|
|
"celery.read",
|
|
"celery.manage",
|
|
"wine.read",
|
|
"wine.manage",
|
|
"question_bank.read",
|
|
"question_bank.manage",
|
|
}
|
|
|
|
ADMIN_ROLE_IDS = {
|
|
"admin",
|
|
"sys_mgr",
|
|
"sysadmin",
|
|
"administrator",
|
|
}
|
|
|
|
DISABLED_MENU_CODES: set[str] = {
|
|
"dashboard",
|
|
"admin.wxapp",
|
|
"admin.system_message",
|
|
"admin.inbox",
|
|
"admin.code_review",
|
|
"admin.git_desktop",
|
|
"admin.agent",
|
|
"admin.mcp_server",
|
|
"admin.requirements",
|
|
"admin.schedule",
|
|
"admin.mindmap",
|
|
"admin.mermaid_mgr",
|
|
"admin.chat",
|
|
"admin.api_tester",
|
|
"admin.models",
|
|
"admin.orchestration",
|
|
"admin.mdresolve",
|
|
"admin.data_query",
|
|
"admin.hot_search",
|
|
"admin.filedetector",
|
|
"admin.baidu_pan",
|
|
"admin.tag",
|
|
"admin.knowledge_point_mgr",
|
|
"admin.question_bank",
|
|
"admin.cron_task_mgr",
|
|
"admin.todos",
|
|
"admin.job_mgr",
|
|
"admin.jwt_generator",
|
|
"admin.queue_mgr",
|
|
"admin.knowledge_mastery",
|
|
}
|
|
|
|
MENU_CODE_PERMISSION_MAP: dict[str, set[str]] = {
|
|
# quiz legacy menu codes
|
|
"user_mgr": {"user.read", "user.manage"},
|
|
"role_mgr": {"role.read", "role.manage"},
|
|
"menu_mgr": {"menu.read", "menu.manage"},
|
|
"sys_mgr": {"menu.read", "menu.manage"},
|
|
# fquiz menu codes
|
|
"admin.users": {"user.read", "user.manage"},
|
|
"admin.roles": {"role.read", "role.manage"},
|
|
"admin.menus": {"menu.read", "menu.manage"},
|
|
"admin.system_params": {"system_param.read", "system_param.manage"},
|
|
"admin.files": {"file.read", "file.manage"},
|
|
"admin.elevation": {"elevation.read", "elevation.manage"},
|
|
"admin.workers": {"celery.read", "celery.manage"},
|
|
"admin.task_monitor": {"celery.read", "celery.manage"},
|
|
"admin.atp_models": {"atp.read", "atp.manage", "atp.run"},
|
|
"admin.lightning_currents": {"lightning.read", "lightning.manage"},
|
|
"admin.lightning_distribution": {"lightning.read", "lightning.manage"},
|
|
"admin.wine_runner": {"wine.read", "wine.manage"},
|
|
}
|
|
|
|
SYNTHETIC_LEGACY_MENU_ROWS: list[dict[str, Any]] = [
|
|
{
|
|
"menu_id": "admin.workers",
|
|
"menu_name": "admin.workers",
|
|
"menu_label": "Worker监控",
|
|
"menu_type": "MENU",
|
|
"parent_id": None,
|
|
"url": "/admin/workers",
|
|
"menu_icon": "DeploymentUnitOutlined",
|
|
"seq": 53,
|
|
"state": "ENABLED",
|
|
},
|
|
{
|
|
"menu_id": "admin.files",
|
|
"menu_name": "admin.files",
|
|
"menu_label": "文件管理",
|
|
"menu_type": "MENU",
|
|
"parent_id": None,
|
|
"url": "/admin/files",
|
|
"menu_icon": "FolderTree",
|
|
"seq": 56,
|
|
"state": "ENABLED",
|
|
},
|
|
{
|
|
"menu_id": "admin.elevation",
|
|
"menu_name": "admin.elevation",
|
|
"menu_label": "高程数据管理",
|
|
"menu_type": "MENU",
|
|
"parent_id": None,
|
|
"url": "/admin/elevation",
|
|
"menu_icon": "Database",
|
|
"seq": 57,
|
|
"state": "ENABLED",
|
|
},
|
|
{
|
|
"menu_id": "admin.task_monitor",
|
|
"menu_name": "admin.task_monitor",
|
|
"menu_label": "任务监控",
|
|
"menu_type": "MENU",
|
|
"parent_id": None,
|
|
"url": "/admin/task-monitor",
|
|
"menu_icon": "RadarChart",
|
|
"seq": 54,
|
|
"state": "ENABLED",
|
|
},
|
|
{
|
|
"menu_id": "admin.atp_models",
|
|
"menu_name": "admin.atp_models",
|
|
"menu_label": "ATP模型管理",
|
|
"menu_type": "MENU",
|
|
"parent_id": None,
|
|
"url": "/admin/power-lines/atp-viewer",
|
|
"menu_icon": "Experiment",
|
|
"seq": 55,
|
|
"state": "ENABLED",
|
|
},
|
|
{
|
|
"menu_id": "admin.wine_runner",
|
|
"menu_name": "admin.wine_runner",
|
|
"menu_label": "Wine执行器",
|
|
"menu_type": "MENU",
|
|
"parent_id": None,
|
|
"url": "/admin/wine-runner",
|
|
"menu_icon": "Terminal",
|
|
"seq": 65,
|
|
"state": "ENABLED",
|
|
},
|
|
]
|
|
|
|
LEGACY_URL_PATH_MAP = {
|
|
"user": "/admin/users",
|
|
"role": "/admin/roles",
|
|
"menu": "/admin/menus",
|
|
}
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class UserAuthorization:
|
|
role_codes: set[str]
|
|
permission_codes: set[str]
|
|
|
|
|
|
def normalize_user_status(raw_status: str | None) -> str:
|
|
state = (raw_status or "").strip().upper()
|
|
if state in {"ENABLED", "ACTIVE", "1", "TRUE"}:
|
|
return "active"
|
|
if state in {"DISABLED", "INACTIVE", "0", "FALSE"}:
|
|
return "disabled"
|
|
return "disabled" if not state else raw_status.strip().lower()
|
|
|
|
|
|
def is_user_enabled(raw_status: str | None) -> bool:
|
|
return normalize_user_status(raw_status) == "active"
|
|
|
|
|
|
def get_user_authorization(db: Session, user_id: str) -> UserAuthorization:
|
|
role_rows = _load_legacy_roles(db, user_id)
|
|
if not role_rows:
|
|
# Fallback for non-legacy tables.
|
|
role_rows = _load_modern_roles(db, user_id)
|
|
|
|
enabled_roles: list[tuple[str, str | None]] = []
|
|
for role_id, role_name, role_state in role_rows:
|
|
normalized_id = (role_id or "").strip()
|
|
if not normalized_id:
|
|
continue
|
|
if role_state is not None and not _is_role_enabled(role_state):
|
|
continue
|
|
enabled_roles.append((normalized_id, role_name))
|
|
|
|
role_codes = {role_id for role_id, _ in enabled_roles}
|
|
is_admin = any(_is_admin_role(role_id, role_name) for role_id, role_name in enabled_roles)
|
|
if not is_admin and not role_codes and _is_builtin_admin_user_id(user_id):
|
|
is_admin = True
|
|
if not is_admin and not role_codes and _should_apply_default_user_role_fallback(db):
|
|
role_codes.add("user")
|
|
if is_admin:
|
|
role_codes.add("admin")
|
|
|
|
permission_codes: set[str] = set()
|
|
if is_admin:
|
|
permission_codes.update(DEFAULT_ADMIN_PERMISSION_CODES)
|
|
elif role_codes:
|
|
permission_codes.update(_load_legacy_permissions(db, role_codes))
|
|
if not permission_codes:
|
|
permission_codes.update(_load_modern_permissions(db, role_codes))
|
|
|
|
return UserAuthorization(role_codes=role_codes, permission_codes=permission_codes)
|
|
|
|
|
|
def _is_builtin_admin_user_id(user_id: str) -> bool:
|
|
normalized = user_id.strip().lower()
|
|
return normalized in {"admin", "administrator", "root", "sysadmin"}
|
|
|
|
|
|
def _should_apply_default_user_role_fallback(db: Session) -> bool:
|
|
# Compatibility fallback: some legacy dumps contain user/role/menu tables
|
|
# but miss the user_role_rela mapping table used by the old auth queries.
|
|
if _legacy_user_role_relation_exists(db):
|
|
return False
|
|
if _legacy_user_role_exists(db):
|
|
return True
|
|
return _modern_user_role_exists(db)
|
|
|
|
|
|
def _legacy_user_role_relation_exists(db: Session) -> bool:
|
|
try:
|
|
result = db.scalar(text("SELECT to_regclass('public.user_role_rela')"))
|
|
except SQLAlchemyError:
|
|
_rollback_safely(db)
|
|
return True
|
|
return result is not None
|
|
|
|
|
|
def _legacy_user_role_exists(db: Session) -> bool:
|
|
try:
|
|
result = db.scalar(
|
|
text(
|
|
"""
|
|
SELECT id
|
|
FROM user_role
|
|
WHERE id = 'user' AND UPPER(COALESCE(state, 'ENABLED')) IN ('ENABLED', 'ACTIVE', '1', 'TRUE')
|
|
LIMIT 1
|
|
"""
|
|
)
|
|
)
|
|
except SQLAlchemyError:
|
|
_rollback_safely(db)
|
|
return False
|
|
return result is not None
|
|
|
|
|
|
def _modern_user_role_exists(db: Session) -> bool:
|
|
try:
|
|
from sqlalchemy import select
|
|
|
|
from ..models.rbac import Role
|
|
|
|
role_id = db.scalar(select(Role.id).where(Role.code == "user").limit(1))
|
|
except SQLAlchemyError:
|
|
_rollback_safely(db)
|
|
return False
|
|
return role_id is not None
|
|
|
|
|
|
def build_legacy_menu_tree(
|
|
db: Session,
|
|
*,
|
|
role_codes: set[str] | None,
|
|
) -> list[dict[str, Any]]:
|
|
rows = _load_legacy_menus(db)
|
|
if not rows:
|
|
return []
|
|
|
|
if role_codes is not None and "admin" not in role_codes:
|
|
allowed_menu_ids = _load_legacy_allowed_menu_ids(db, role_codes)
|
|
if not allowed_menu_ids:
|
|
return []
|
|
rows = [row for row in rows if row["menu_id"] in allowed_menu_ids]
|
|
|
|
# Keep only enabled menus for menu tree response.
|
|
rows = [row for row in rows if _is_menu_enabled(row.get("state"))]
|
|
rows = [row for row in rows if str(row.get("menu_name") or "").strip() not in DISABLED_MENU_CODES]
|
|
|
|
rows.sort(key=lambda row: ((row.get("seq") or 0), row.get("menu_id") or ""))
|
|
nodes: dict[str, dict[str, Any]] = {}
|
|
roots: list[dict[str, Any]] = []
|
|
for row in rows:
|
|
legacy_id = str(row["menu_id"])
|
|
menu_name = (row.get("menu_name") or legacy_id).strip()
|
|
node: dict[str, Any] = {
|
|
"id": legacy_id,
|
|
"code": menu_name,
|
|
"name": (row.get("menu_label") or menu_name).strip(),
|
|
"path": _normalize_menu_path(row.get("url")),
|
|
"icon": row.get("menu_icon"),
|
|
"parent_id": (str(row.get("parent_id")) if row.get("parent_id") else None),
|
|
"type": _normalize_menu_type(row.get("menu_type")),
|
|
"sort_order": int(row.get("seq") or 0),
|
|
"status": "enabled" if _is_menu_enabled(row.get("state")) else "disabled",
|
|
"visible": True,
|
|
"cacheable": False,
|
|
"component": None,
|
|
"permission_code": _primary_permission_for_menu(menu_name, row.get("menu_type")),
|
|
"children": [],
|
|
}
|
|
nodes[legacy_id] = node
|
|
|
|
for node in nodes.values():
|
|
parent_id = node.get("parent_id")
|
|
if parent_id and parent_id in nodes:
|
|
nodes[parent_id]["children"].append(node)
|
|
else:
|
|
roots.append(node)
|
|
|
|
_sort_menu_tree(roots)
|
|
return roots
|
|
|
|
|
|
def _sort_menu_tree(nodes: list[dict[str, Any]]) -> None:
|
|
nodes.sort(key=lambda item: (item["sort_order"], item["id"]))
|
|
for node in nodes:
|
|
children = node.get("children")
|
|
if isinstance(children, list):
|
|
_sort_menu_tree(children)
|
|
|
|
|
|
def _load_legacy_roles(db: Session, user_id: str) -> list[tuple[str, str | None, str | None]]:
|
|
stmt = text(
|
|
"""
|
|
SELECT ur.id AS role_id, ur.name AS role_name, ur.state AS role_state
|
|
FROM user_role_rela urr
|
|
JOIN user_role ur ON ur.id = urr.role_id
|
|
WHERE urr.user_id = :user_id
|
|
"""
|
|
)
|
|
try:
|
|
rows = db.execute(stmt, {"user_id": user_id}).mappings().all()
|
|
except SQLAlchemyError:
|
|
_rollback_safely(db)
|
|
return []
|
|
return [
|
|
(
|
|
str(row["role_id"]),
|
|
row.get("role_name"),
|
|
row.get("role_state"),
|
|
)
|
|
for row in rows
|
|
]
|
|
|
|
|
|
def _load_modern_roles(db: Session, user_id: str) -> list[tuple[str, str | None, str | None]]:
|
|
try:
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import joinedload
|
|
|
|
# Ensure Role mapper is registered before resolving User.roles.
|
|
from ..models.rbac import Role as _Role # noqa: F401
|
|
from ..models.user import User
|
|
|
|
user = db.execute(
|
|
select(User).options(joinedload(User.roles)).where(User.id == user_id)
|
|
).unique().scalar_one_or_none()
|
|
except SQLAlchemyError:
|
|
_rollback_safely(db)
|
|
return []
|
|
if not user:
|
|
return []
|
|
roles = []
|
|
for role in getattr(user, "roles", []):
|
|
role_id = str(getattr(role, "code", None) or getattr(role, "id", ""))
|
|
role_name = getattr(role, "name", None)
|
|
role_state = getattr(role, "state", None)
|
|
if role_id:
|
|
roles.append((role_id, role_name, role_state))
|
|
return roles
|
|
|
|
|
|
def _load_legacy_permissions(db: Session, role_codes: set[str]) -> set[str]:
|
|
real_role_ids = sorted(code for code in role_codes if code != "admin")
|
|
if not real_role_ids:
|
|
return set()
|
|
|
|
stmt = text(
|
|
"""
|
|
SELECT DISTINCT m.menu_name, m.menu_type, m.state
|
|
FROM role_menu_rela rmr
|
|
JOIN menu m ON m.menu_id = rmr.menu_id
|
|
WHERE rmr.role_id IN :role_ids
|
|
"""
|
|
).bindparams(bindparam("role_ids", expanding=True))
|
|
try:
|
|
rows = db.execute(stmt, {"role_ids": real_role_ids}).mappings().all()
|
|
except SQLAlchemyError:
|
|
_rollback_safely(db)
|
|
return set()
|
|
|
|
permission_codes: set[str] = set()
|
|
for row in rows:
|
|
if not _is_menu_enabled(row.get("state")):
|
|
continue
|
|
menu_name = (row.get("menu_name") or "").strip()
|
|
if menu_name in DISABLED_MENU_CODES:
|
|
continue
|
|
permission_codes.update(_permissions_for_menu(menu_name, row.get("menu_type")))
|
|
return permission_codes
|
|
|
|
|
|
def _load_modern_permissions(db: Session, role_codes: set[str]) -> set[str]:
|
|
real_role_ids = sorted(code for code in role_codes if code != "admin")
|
|
if not real_role_ids:
|
|
return set()
|
|
|
|
try:
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import joinedload
|
|
|
|
from ..models.rbac import Role
|
|
|
|
roles = db.execute(
|
|
select(Role)
|
|
.options(joinedload(Role.permissions))
|
|
.where(Role.code.in_(real_role_ids))
|
|
).unique().scalars().all()
|
|
except SQLAlchemyError:
|
|
_rollback_safely(db)
|
|
return set()
|
|
|
|
return {
|
|
permission.code
|
|
for role in roles
|
|
for permission in getattr(role, "permissions", [])
|
|
if getattr(permission, "code", None)
|
|
}
|
|
|
|
|
|
def _load_legacy_menus(db: Session) -> list[dict[str, Any]]:
|
|
stmt = text(
|
|
"""
|
|
SELECT
|
|
m.menu_id,
|
|
m.menu_name,
|
|
m.menu_label,
|
|
m.menu_type,
|
|
m.parent_id,
|
|
m.url,
|
|
m.menu_icon,
|
|
m.seq,
|
|
m.state
|
|
FROM menu m
|
|
"""
|
|
)
|
|
try:
|
|
rows = db.execute(stmt).mappings().all()
|
|
except SQLAlchemyError:
|
|
_rollback_safely(db)
|
|
return []
|
|
items = [dict(row) for row in rows]
|
|
existing_codes = {str(row.get("menu_name") or "").strip() for row in items}
|
|
for row in SYNTHETIC_LEGACY_MENU_ROWS:
|
|
if str(row["menu_name"]) not in existing_codes:
|
|
items.append(dict(row))
|
|
return items
|
|
|
|
|
|
def _load_legacy_allowed_menu_ids(db: Session, role_codes: set[str]) -> set[str]:
|
|
real_role_ids = sorted(code for code in role_codes if code != "admin")
|
|
if not real_role_ids:
|
|
return set()
|
|
|
|
stmt = text(
|
|
"""
|
|
SELECT DISTINCT menu_id
|
|
FROM role_menu_rela
|
|
WHERE role_id IN :role_ids
|
|
"""
|
|
).bindparams(bindparam("role_ids", expanding=True))
|
|
try:
|
|
rows = db.execute(stmt, {"role_ids": real_role_ids}).all()
|
|
except SQLAlchemyError:
|
|
_rollback_safely(db)
|
|
return set()
|
|
return {str(row[0]) for row in rows}
|
|
|
|
|
|
def _rollback_safely(db: Session) -> None:
|
|
try:
|
|
db.rollback()
|
|
except SQLAlchemyError:
|
|
return
|
|
|
|
|
|
def _is_role_enabled(raw_state: str) -> bool:
|
|
state = raw_state.strip().upper()
|
|
return state in {"ENABLED", "ACTIVE", "1", "TRUE"}
|
|
|
|
|
|
def _is_admin_role(role_id: str, role_name: str | None) -> bool:
|
|
normalized_id = role_id.strip().lower()
|
|
if normalized_id in ADMIN_ROLE_IDS:
|
|
return True
|
|
normalized_name = (role_name or "").strip().lower()
|
|
return "admin" in normalized_name or "管理员" in (role_name or "")
|
|
|
|
|
|
def _is_menu_enabled(raw_state: str | None) -> bool:
|
|
state = (raw_state or "").strip().upper()
|
|
return state in {"ENABLED", "ACTIVE", "1", "TRUE", ""}
|
|
|
|
|
|
def _permissions_for_menu(menu_name: str, menu_type: str | None) -> set[str]:
|
|
if not menu_name:
|
|
return set()
|
|
mapped = MENU_CODE_PERMISSION_MAP.get(menu_name)
|
|
if mapped:
|
|
return set(mapped)
|
|
if (menu_type or "").strip().upper() == "BUTTON" and "." in menu_name:
|
|
return {menu_name}
|
|
return set()
|
|
|
|
|
|
def _primary_permission_for_menu(menu_name: str, menu_type: str | None) -> str | None:
|
|
permissions = sorted(_permissions_for_menu(menu_name, menu_type))
|
|
return permissions[0] if permissions else None
|
|
|
|
|
|
def _normalize_menu_type(raw_menu_type: str | None) -> str:
|
|
menu_type = (raw_menu_type or "").strip().lower()
|
|
if menu_type in {"directory", "button", "menu"}:
|
|
return menu_type
|
|
return "menu"
|
|
|
|
|
|
def _normalize_menu_path(raw_url: str | None) -> str | None:
|
|
url = (raw_url or "").strip()
|
|
if not url:
|
|
return None
|
|
if url in LEGACY_URL_PATH_MAP:
|
|
return LEGACY_URL_PATH_MAP[url]
|
|
if url.startswith(("http://", "https://", "/")):
|
|
return url
|
|
return f"/admin/{url}"
|