修复角色列表接口在缺少user_role表时回退modern角色表

Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
2026-05-01 13:51:42 +08:00
parent 9a849077f1
commit 2c3ad31ec8
+165 -39
View File
@@ -103,30 +103,30 @@ PROTECTED_MENU_CODES = {
def list_roles(db: Session) -> RoleListResponse:
rows = db.execute(
text(
"""
SELECT id, name
FROM user_role
ORDER BY create_date DESC NULLS LAST, id ASC
"""
)
).mappings().all()
role_source = "legacy" if _legacy_role_table_exists(db) else "modern"
rows = _load_role_rows(db, role_source=role_source)
role_ids = [str(row["id"]) for row in rows]
role_menu_ids = _load_role_menu_ids_map(db, role_ids)
menu_rows = _load_menus_map(db, sorted({menu_id for ids in role_menu_ids.values() for menu_id in ids}))
role_menu_ids = _load_role_menu_ids_map(db, role_ids, role_source=role_source)
menu_rows = _load_menus_map(
db,
sorted({menu_id for ids in role_menu_ids.values() for menu_id in ids}),
role_source=role_source,
)
role_permission_codes = _load_role_permission_codes_map(db, role_ids, role_source=role_source)
items: list[RolePublic] = []
for row in rows:
role_id = str(row["id"])
role_code = role_id if role_source == "legacy" else str(row.get("code") or role_id).strip() or role_id
menu_ids = sorted(menu_id for menu_id in role_menu_ids.get(role_id, []) if menu_id in menu_rows)
permission_codes = sorted(_permission_codes_from_menu_rows(menu_rows, menu_ids))
permission_codes = set(role_permission_codes.get(role_id, []))
permission_codes.update(_permission_codes_from_menu_rows(menu_rows, menu_ids))
items.append(
RolePublic(
id=role_id,
code=role_id,
name=(row.get("name") or role_id).strip(),
permission_codes=permission_codes,
code=role_code,
name=(row.get("name") or role_code).strip(),
permission_codes=sorted(permission_codes),
menu_ids=menu_ids,
)
)
@@ -137,20 +137,41 @@ def get_role_by_id(db: Session, role_id: str) -> RolePublic | None:
role_id = role_id.strip()
if not role_id:
return None
rows = db.execute(
text("SELECT id, name FROM user_role WHERE id = :id"),
{"id": role_id},
).mappings().all()
role_source = "legacy" if _legacy_role_table_exists(db) else "modern"
if role_source == "legacy":
rows = db.execute(
text("SELECT id, name FROM user_role WHERE id = :id"),
{"id": role_id},
).mappings().all()
role_code = role_id
resolved_role_id = role_id
else:
rows = db.execute(
text(
"""
SELECT id::text AS id, code, name
FROM roles
WHERE id::text = :id OR code = :id
LIMIT 1
"""
),
{"id": role_id},
).mappings().all()
resolved_role_id = str(rows[0]["id"]) if rows else ""
role_code = str(rows[0].get("code") or resolved_role_id).strip() if rows else ""
if not rows:
return None
role_menu_ids = _load_role_menu_ids_map(db, [role_id]).get(role_id, [])
menu_rows = _load_menus_map(db, role_menu_ids)
role_menu_ids = _load_role_menu_ids_map(db, [resolved_role_id], role_source=role_source).get(resolved_role_id, [])
menu_rows = _load_menus_map(db, role_menu_ids, role_source=role_source)
role_permission_codes = _load_role_permission_codes_map(db, [resolved_role_id], role_source=role_source)
filtered_menu_ids = sorted(menu_id for menu_id in role_menu_ids if menu_id in menu_rows)
permission_codes = set(role_permission_codes.get(resolved_role_id, []))
permission_codes.update(_permission_codes_from_menu_rows(menu_rows, filtered_menu_ids))
return RolePublic(
id=role_id,
code=role_id,
name=(rows[0].get("name") or role_id).strip(),
permission_codes=sorted(_permission_codes_from_menu_rows(menu_rows, filtered_menu_ids)),
id=resolved_role_id,
code=role_code or resolved_role_id,
name=(rows[0].get("name") or role_code or resolved_role_id).strip(),
permission_codes=sorted(permission_codes),
menu_ids=filtered_menu_ids,
)
@@ -518,15 +539,37 @@ def delete_menu(db: Session, menu_id: str) -> bool:
def list_role_menu_ids(db: Session, role_id: str) -> list[str] | None:
role_exists = db.scalar(text("SELECT id FROM user_role WHERE id = :id"), {"id": role_id})
if not role_exists:
return None
role_source = "legacy" if _legacy_role_table_exists(db) else "modern"
resolved_role_id = role_id
if role_source == "legacy":
role_exists = db.scalar(text("SELECT id FROM user_role WHERE id = :id"), {"id": role_id})
if not role_exists:
return None
else:
role_row = db.execute(
text(
"""
SELECT id::text AS id
FROM roles
WHERE id::text = :id OR code = :id
LIMIT 1
"""
),
{"id": role_id},
).mappings().first()
if not role_row:
return None
resolved_role_id = str(role_row["id"])
rows = db.execute(
text("SELECT menu_id FROM role_menu_rela WHERE role_id = :role_id ORDER BY menu_id ASC"),
{"role_id": role_id},
text(
"SELECT menu_id FROM role_menu_rela WHERE role_id = :role_id ORDER BY menu_id ASC"
if role_source == "legacy"
else "SELECT menu_id::text AS menu_id FROM role_menus WHERE role_id::text = :role_id ORDER BY menu_id ASC"
),
{"role_id": resolved_role_id},
).all()
menu_ids = [str(row[0]) for row in rows]
menu_rows = _load_menus_map(db, menu_ids)
menu_rows = _load_menus_map(db, menu_ids, role_source=role_source)
return [menu_id for menu_id in menu_ids if menu_id in menu_rows]
@@ -587,19 +630,30 @@ def serialize_menu_row(row: dict[str, object]) -> MenuPublic:
)
def _load_role_menu_ids_map(db: Session, role_ids: list[str]) -> dict[str, list[str]]:
def _load_role_menu_ids_map(db: Session, role_ids: list[str], *, role_source: str = "legacy") -> dict[str, list[str]]:
mapping = {role_id: [] for role_id in role_ids}
if not role_ids:
return mapping
rows = db.execute(
text(
if role_source == "legacy":
stmt = text(
"""
SELECT role_id, menu_id
FROM role_menu_rela
WHERE role_id IN :role_ids
ORDER BY menu_id ASC
"""
).bindparams(bindparam("role_ids", expanding=True)),
)
else:
stmt = text(
"""
SELECT role_id::text AS role_id, menu_id::text AS menu_id
FROM role_menus
WHERE role_id::text IN :role_ids
ORDER BY menu_id ASC
"""
)
rows = db.execute(
stmt.bindparams(bindparam("role_ids", expanding=True)),
{"role_ids": role_ids},
).mappings().all()
for row in rows:
@@ -607,17 +661,27 @@ def _load_role_menu_ids_map(db: Session, role_ids: list[str]) -> dict[str, list[
return mapping
def _load_menus_map(db: Session, menu_ids: list[str]) -> dict[str, dict[str, object]]:
def _load_menus_map(db: Session, menu_ids: list[str], *, role_source: str = "legacy") -> dict[str, dict[str, object]]:
if not menu_ids:
return {}
rows = db.execute(
text(
if role_source == "legacy":
stmt = text(
"""
SELECT menu_id, menu_name, menu_type, state
FROM menu
WHERE menu_id IN :menu_ids
"""
).bindparams(bindparam("menu_ids", expanding=True)),
)
else:
stmt = text(
"""
SELECT id::text AS menu_id, code AS menu_name, type AS menu_type, status AS state
FROM menus
WHERE id::text IN :menu_ids
"""
)
rows = db.execute(
stmt.bindparams(bindparam("menu_ids", expanding=True)),
{"menu_ids": menu_ids},
).mappings().all()
return {
@@ -627,6 +691,68 @@ def _load_menus_map(db: Session, menu_ids: list[str]) -> dict[str, dict[str, obj
}
def _load_role_rows(db: Session, *, role_source: str) -> list[dict[str, object]]:
if role_source == "legacy":
rows = db.execute(
text(
"""
SELECT id, name
FROM user_role
ORDER BY create_date DESC NULLS LAST, id ASC
"""
)
).mappings().all()
else:
rows = db.execute(
text(
"""
SELECT id::text AS id, code, name
FROM roles
ORDER BY id ASC
"""
)
).mappings().all()
return [dict(row) for row in rows]
def _load_role_permission_codes_map(
db: Session,
role_ids: list[str],
*,
role_source: str,
) -> dict[str, list[str]]:
mapping = {role_id: [] for role_id in role_ids}
if not role_ids or role_source == "legacy":
return mapping
rows = db.execute(
text(
"""
SELECT rp.role_id::text AS role_id, p.code AS permission_code
FROM role_permissions rp
JOIN permissions p ON p.id = rp.permission_id
WHERE rp.role_id::text IN :role_ids
ORDER BY p.code ASC
"""
).bindparams(bindparam("role_ids", expanding=True)),
{"role_ids": role_ids},
).mappings().all()
for row in rows:
role_key = str(row["role_id"])
code = str(row.get("permission_code") or "").strip()
if not code:
continue
mapping.setdefault(role_key, []).append(code)
return mapping
def _legacy_role_table_exists(db: Session) -> bool:
try:
return bool(db.scalar(text("SELECT to_regclass('public.user_role')")))
except SQLAlchemyError:
db.rollback()
return False
def _permission_codes_from_menu_rows(
menu_rows: dict[str, dict[str, object]],
menu_ids: list[str],