@@ -45,10 +45,12 @@ def create_user_account(
|
||||
def list_all_users(
|
||||
limit: int = Query(default=50, ge=1, le=200),
|
||||
offset: int = Query(default=0, ge=0),
|
||||
keyword: str | None = Query(default=None, max_length=128),
|
||||
status_filter: str | None = Query(default=None, alias="status"),
|
||||
_: CurrentUser = Depends(require_permission("user.manage")),
|
||||
db: Session = Depends(get_db),
|
||||
) -> UserListResponse:
|
||||
return list_users(db, limit=limit, offset=offset)
|
||||
return list_users(db, limit=limit, offset=offset, keyword=keyword, status=status_filter)
|
||||
|
||||
|
||||
@router.get("/{user_id}", response_model=UserPublic)
|
||||
@@ -81,7 +83,7 @@ def update_user_profile(
|
||||
if not updated:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail="User not found or username exists",
|
||||
detail="User not found or email/username exists",
|
||||
)
|
||||
return updated
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@ class UserListResponse(BaseModel):
|
||||
|
||||
|
||||
class UserUpdateRequest(BaseModel):
|
||||
email: str | None = None
|
||||
username: str | None = Field(default=None, min_length=3, max_length=64)
|
||||
status: Literal["active", "disabled", "enabled"] | None = None
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
from uuid import uuid4
|
||||
|
||||
from sqlalchemy import and_, bindparam, func, select, text
|
||||
from sqlalchemy import and_, bindparam, func, or_, select, text
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
from sqlalchemy.orm import Session, object_session
|
||||
|
||||
@@ -33,14 +33,40 @@ def _user_with_rbac_stmt():
|
||||
return select(User)
|
||||
|
||||
|
||||
def list_users(db: Session, *, limit: int, offset: int) -> UserListResponse:
|
||||
total = db.scalar(select(func.count()).select_from(User)) or 0
|
||||
stmt = (
|
||||
_user_with_rbac_stmt()
|
||||
.order_by(User.created_at.desc())
|
||||
.offset(offset)
|
||||
.limit(limit)
|
||||
)
|
||||
def list_users(
|
||||
db: Session,
|
||||
*,
|
||||
limit: int,
|
||||
offset: int,
|
||||
keyword: str | None = None,
|
||||
status: str | None = None,
|
||||
) -> UserListResponse:
|
||||
conditions = []
|
||||
normalized_keyword = (keyword or "").strip()
|
||||
if normalized_keyword:
|
||||
like = f"%{normalized_keyword}%"
|
||||
conditions.append(
|
||||
or_(
|
||||
User.id.ilike(like),
|
||||
User.email.ilike(like),
|
||||
User.username.ilike(like),
|
||||
)
|
||||
)
|
||||
normalized_status = (status or "").strip().lower()
|
||||
if normalized_status in {"active", "enabled"}:
|
||||
conditions.append(User.status.in_(["active", "ACTIVE", "ENABLED"]))
|
||||
elif normalized_status == "disabled":
|
||||
conditions.append(User.status.in_(["disabled", "DISABLED", "INACTIVE"]))
|
||||
|
||||
total_stmt = select(func.count()).select_from(User)
|
||||
if conditions:
|
||||
total_stmt = total_stmt.where(*conditions)
|
||||
total = db.scalar(total_stmt) or 0
|
||||
|
||||
stmt = _user_with_rbac_stmt().order_by(User.created_at.desc())
|
||||
if conditions:
|
||||
stmt = stmt.where(*conditions)
|
||||
stmt = stmt.offset(offset).limit(limit)
|
||||
users = db.execute(stmt).unique().scalars().all()
|
||||
return UserListResponse(items=[serialize_user(user) for user in users], total=total)
|
||||
|
||||
@@ -168,13 +194,29 @@ def update_user(
|
||||
if not user:
|
||||
return None
|
||||
|
||||
if payload.username and payload.username != user.username:
|
||||
duplicate = db.scalar(
|
||||
select(User.id).where(User.username == payload.username, User.id != user.id)
|
||||
)
|
||||
if duplicate:
|
||||
if payload.email is not None:
|
||||
next_email = payload.email.strip().lower()
|
||||
if not next_email:
|
||||
return None
|
||||
user.username = payload.username
|
||||
if next_email != user.email:
|
||||
duplicate = db.scalar(
|
||||
select(User.id).where(User.email == next_email, User.id != user.id)
|
||||
)
|
||||
if duplicate:
|
||||
return None
|
||||
user.email = next_email
|
||||
|
||||
if payload.username is not None:
|
||||
next_username = payload.username.strip()
|
||||
if not next_username:
|
||||
return None
|
||||
if next_username != user.username:
|
||||
duplicate = db.scalar(
|
||||
select(User.id).where(User.username == next_username, User.id != user.id)
|
||||
)
|
||||
if duplicate:
|
||||
return None
|
||||
user.username = next_username
|
||||
|
||||
status_changed = False
|
||||
if payload.status:
|
||||
|
||||
Reference in New Issue
Block a user