From 9ba1cc4388c7a48020086cb61865b92e01fb64f7 Mon Sep 17 00:00:00 2001 From: chengkai3 Date: Sun, 14 Jun 2026 01:06:53 +0800 Subject: [PATCH] =?UTF-8?q?[feat]:[FL-118][=E5=A2=9E=E5=8A=A0=E5=AF=86?= =?UTF-8?q?=E7=A0=81=E9=94=99=E8=AF=AF5=E6=AC=A1=E7=A6=81=E6=AD=A2?= =?UTF-8?q?=E7=99=BB=E5=BD=95=E5=8D=8A=E5=B0=8F=E6=97=B6=E5=8A=9F=E8=83=BD?= =?UTF-8?q?]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 在User模型添加failed_login_attempts和failed_login_locked_until字段 - 在database.py添加字段迁移兼容性函数_ensure_user_login_lockout_column_compatibility - 修改auth_service.py的login_user函数实现登录锁定逻辑: * 检查账户是否处于锁定状态 * 密码错误时递增失败计数 * 失败5次后锁定账户30分钟 * 登录成功后重置失败计数和锁定状态 - 添加单元测试test_login_lockout.py验证功能 Co-authored-by: multica-agent --- api/app/core/database.py | 42 ++++++++ api/app/models/user.py | 11 +- api/app/services/auth_service.py | 73 ++++++++++++- api/tests/test_login_lockout.py | 169 +++++++++++++++++++++++++++++++ 4 files changed, 292 insertions(+), 3 deletions(-) create mode 100644 api/tests/test_login_lockout.py diff --git a/api/app/core/database.py b/api/app/core/database.py index 5b0f9af..863da64 100644 --- a/api/app/core/database.py +++ b/api/app/core/database.py @@ -479,6 +479,47 @@ def _ensure_tower_profile_column_compatibility() -> None: ) +def _ensure_user_login_lockout_column_compatibility() -> None: + """ + Keep `users` login lockout columns aligned with the current ORM mapping. + """ + if not database_url.startswith("postgresql"): + return + + schema = settings.resolved_db_schema + with engine.begin() as connection: + db_inspector = inspect(connection) + if not db_inspector.has_table("users", schema=schema): + return + + column_names = { + column["name"] + for column in db_inspector.get_columns("users", schema=schema) + } + + if "failed_login_attempts" not in column_names: + connection.execute( + text("ALTER TABLE users ADD COLUMN IF NOT EXISTS failed_login_attempts INTEGER"), + ) + connection.execute( + text("UPDATE users SET failed_login_attempts = 0 WHERE failed_login_attempts IS NULL"), + ) + connection.execute( + text("ALTER TABLE users ALTER COLUMN failed_login_attempts SET NOT NULL"), + ) + logger.warning( + "Detected missing users.failed_login_attempts; added with default 0.", + ) + + if "failed_login_locked_until" not in column_names: + connection.execute( + text("ALTER TABLE users ADD COLUMN IF NOT EXISTS failed_login_locked_until TIMESTAMP"), + ) + logger.warning( + "Detected missing users.failed_login_locked_until; added nullable lockout time column.", + ) + + def get_db() -> Generator[Session, None, None]: db = SessionLocal() try: @@ -515,6 +556,7 @@ def init_db() -> None: _ensure_user_pk_column_compatibility() _ensure_user_timestamp_column_compatibility() _ensure_user_audit_column_compatibility() + _ensure_user_login_lockout_column_compatibility() _ensure_elevation_dataset_column_compatibility() _ensure_atp_simulation_run_column_compatibility() _ensure_tower_model_column_compatibility() diff --git a/api/app/models/user.py b/api/app/models/user.py index ef5bb14..f2ac271 100644 --- a/api/app/models/user.py +++ b/api/app/models/user.py @@ -4,7 +4,7 @@ from datetime import datetime from typing import TYPE_CHECKING from uuid import uuid4 -from sqlalchemy import DateTime, String +from sqlalchemy import DateTime, Integer, String from sqlalchemy.orm import Mapped, mapped_column, relationship from ..core.config import get_settings @@ -56,6 +56,15 @@ class User(Base): default=utcnow, onupdate=utcnow, ) + failed_login_attempts: Mapped[int] = mapped_column( + Integer, + default=0, + nullable=False, + ) + failed_login_locked_until: Mapped[datetime | None] = mapped_column( + DateTime(timezone=False), + nullable=True, + ) @property def last_login_at(self) -> datetime | None: return self.updated_at diff --git a/api/app/services/auth_service.py b/api/app/services/auth_service.py index 3ddf949..c5d41c5 100644 --- a/api/app/services/auth_service.py +++ b/api/app/services/auth_service.py @@ -101,14 +101,79 @@ def login_user( ) -> AuthResult: requested_user_id = payload.user_id.strip() user = get_user_by_id(db, requested_user_id) - if not user or not verify_password(payload.password, user.password_hash): + + # Check if user exists + if not user: write_audit_log( db, action="auth.login_failed", - actor_user_id=user.id if user else None, + actor_user_id=None, + detail=compose_audit_detail( + f"attempted_user_id={requested_user_id}", + "reason=user_not_found", + ), + ) + db.commit() + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid user_id or password", + ) + + # Check if account is locked + now = utcnow() + if user.failed_login_locked_until and user.failed_login_locked_until > now: + remaining_seconds = int((user.failed_login_locked_until - now).total_seconds()) + write_audit_log( + db, + action="auth.login_failed", + actor_user_id=user.id, + detail=compose_audit_detail( + f"attempted_user_id={requested_user_id}", + f"username={user.username}", + "reason=account_locked", + f"remaining_seconds={remaining_seconds}", + ), + ) + db.commit() + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail=f"Account is locked. Please try again in {remaining_seconds} seconds.", + ) + + # Verify password + if not verify_password(payload.password, user.password_hash): + # Increment failed login attempts + user.failed_login_attempts += 1 + + # Lock account if attempts >= 5 + if user.failed_login_attempts >= 5: + user.failed_login_locked_until = now + timedelta(minutes=30) + write_audit_log( + db, + action="auth.login_failed", + actor_user_id=user.id, + detail=compose_audit_detail( + f"attempted_user_id={requested_user_id}", + "reason=invalid_credentials", + f"failed_attempts={user.failed_login_attempts}", + "account_locked=true", + "lock_duration_minutes=30", + ), + ) + db.commit() + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Too many failed login attempts. Account locked for 30 minutes.", + ) + + write_audit_log( + db, + action="auth.login_failed", + actor_user_id=user.id, detail=compose_audit_detail( f"attempted_user_id={requested_user_id}", "reason=invalid_credentials", + f"failed_attempts={user.failed_login_attempts}", ), ) db.commit() @@ -134,6 +199,10 @@ def login_user( detail="User is disabled", ) + # Reset failed login attempts on successful login + user.failed_login_attempts = 0 + user.failed_login_locked_until = None + return issue_auth_result_for_user( db, user_id=user.id, diff --git a/api/tests/test_login_lockout.py b/api/tests/test_login_lockout.py new file mode 100644 index 0000000..815a434 --- /dev/null +++ b/api/tests/test_login_lockout.py @@ -0,0 +1,169 @@ +"""Test login lockout functionality.""" +from __future__ import annotations + +import os +import unittest +from datetime import timedelta + +os.environ.setdefault("DATABASE_URL", "sqlite+pysqlite:///:memory:") +os.environ.setdefault("MINIO_ENABLED", "false") + +from fastapi import HTTPException +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from sqlalchemy.pool import StaticPool + +from api.app.core.database import Base +from api.app.models.base import utcnow +from api.app.models.user import User +from api.app.schemas.auth import LoginRequest +from api.app.services.auth_service import login_user +from api.app.core.security import hash_password + + +class LoginLockoutTestCase(unittest.TestCase): + def setUp(self) -> None: + self.engine = create_engine( + "sqlite+pysqlite://", + connect_args={"check_same_thread": False}, + poolclass=StaticPool, + ) + self.SessionLocal = sessionmaker( + bind=self.engine, + autocommit=False, + autoflush=False, + expire_on_commit=False, + ) + Base.metadata.create_all(bind=self.engine) + + def tearDown(self) -> None: + Base.metadata.drop_all(bind=self.engine) + self.engine.dispose() + + def test_login_lockout_after_5_failed_attempts(self) -> None: + """Test that account is locked after 5 failed login attempts.""" + db = self.SessionLocal() + try: + # Create test user + test_user = User( + id="test_lockout_user", + email="lockout@test.com", + username="lockout_user", + password_hash=hash_password("correct_password"), + status="ENABLED", + failed_login_attempts=0, + ) + db.add(test_user) + db.commit() + + # Attempt login 5 times with wrong password + for i in range(5): + with self.assertRaises(HTTPException) as exc_context: + login_user( + db, + LoginRequest(user_id="test_lockout_user", password="wrong_password"), + user_agent=None, + ip_address=None, + ) + + # First 4 attempts should return 401 + if i < 4: + self.assertEqual(exc_context.exception.status_code, 401) + # 5th attempt should lock account and return 403 + else: + self.assertEqual(exc_context.exception.status_code, 403) + self.assertIn("locked", exc_context.exception.detail.lower()) + + # Verify user is locked + db.refresh(test_user) + self.assertEqual(test_user.failed_login_attempts, 5) + self.assertIsNotNone(test_user.failed_login_locked_until) + self.assertGreater(test_user.failed_login_locked_until, utcnow()) + + # Attempt login with correct password should still fail due to lock + with self.assertRaises(HTTPException) as exc_context: + login_user( + db, + LoginRequest(user_id="test_lockout_user", password="correct_password"), + user_agent=None, + ip_address=None, + ) + self.assertEqual(exc_context.exception.status_code, 403) + self.assertIn("locked", exc_context.exception.detail.lower()) + + finally: + db.close() + + def test_login_success_resets_failed_attempts(self) -> None: + """Test that successful login resets failed login attempts.""" + db = self.SessionLocal() + try: + # Create test user with some failed attempts + test_user = User( + id="test_reset_user", + email="reset@test.com", + username="reset_user", + password_hash=hash_password("correct_password"), + status="ENABLED", + failed_login_attempts=3, + ) + db.add(test_user) + db.commit() + + # Successful login should reset counter + result = login_user( + db, + LoginRequest(user_id="test_reset_user", password="correct_password"), + user_agent=None, + ip_address=None, + ) + + self.assertEqual(result.user.id, "test_reset_user") + + # Verify failed attempts are reset + db.refresh(test_user) + self.assertEqual(test_user.failed_login_attempts, 0) + self.assertIsNone(test_user.failed_login_locked_until) + + finally: + db.close() + + def test_login_after_lock_expires(self) -> None: + """Test that user can login after lock period expires.""" + db = self.SessionLocal() + try: + # Create test user locked in the past + past_time = utcnow() - timedelta(minutes=1) + test_user = User( + id="test_expired_lock_user", + email="expired@test.com", + username="expired_user", + password_hash=hash_password("correct_password"), + status="ENABLED", + failed_login_attempts=5, + failed_login_locked_until=past_time, + ) + db.add(test_user) + db.commit() + + # Login should succeed since lock has expired + result = login_user( + db, + LoginRequest(user_id="test_expired_lock_user", password="correct_password"), + user_agent=None, + ip_address=None, + ) + + self.assertEqual(result.user.id, "test_expired_lock_user") + + # Verify failed attempts are reset + db.refresh(test_user) + self.assertEqual(test_user.failed_login_attempts, 0) + self.assertIsNone(test_user.failed_login_locked_until) + + finally: + db.close() + + +if __name__ == "__main__": + unittest.main()