[fix/feat]:[FL-42][清理题库 Markdown 导题 热搜遗留模块]

Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
chengkai3
2026-06-08 12:46:35 +08:00
parent a6bb67752c
commit 5a41cd4d3d
21 changed files with 36 additions and 2366 deletions
-2
View File
@@ -10,7 +10,6 @@ from .v1.fl_analysis import router as fl_analysis_router
from .v1.flower_monitor import router as flower_monitor_router
from .v1.lightning import router as lightning_router
from .v1.lines import router as lines_router
from .v1.question_bank import router as question_bank_router
from .v1.system_params import router as system_params_router
from .v1.task_monitor import router as task_monitor_router
from .v1.tower_models import router as tower_models_router
@@ -35,7 +34,6 @@ v1_router.include_router(lightning_router)
v1_router.include_router(lines_router)
v1_router.include_router(tower_models_router)
v1_router.include_router(tower_profiles_router)
v1_router.include_router(question_bank_router)
v1_router.include_router(wine_router)
v1_router.include_router(ws_router)
-92
View File
@@ -1,92 +0,0 @@
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from ...core.database import get_db
from ...core.dependencies import CurrentUser, require_any_permission, require_permission
from ...schemas.hot_search import (
HotSearchFollowTopicCreateRequest,
HotSearchFollowTopicListResponse,
HotSearchFollowTopicSummary,
HotSearchFollowTopicUpdateRequest,
HotSearchListResponse,
HotSearchQueryRequest,
HotSearchRecordSummary,
)
from ...services.hot_search_service import (
create_hot_search_follow_topic,
delete_hot_search_follow_topic,
get_hot_search_record,
list_hot_search_follow_topics,
list_hot_search_records,
update_hot_search_follow_topic,
)
router = APIRouter(prefix="/admin/hot-search", tags=["admin-hot-search"])
@router.get("/follow-topics", response_model=HotSearchFollowTopicListResponse)
def list_follow_topics_endpoint(
_: CurrentUser = Depends(require_any_permission("question_bank.read", "question_bank.manage")),
db: Session = Depends(get_db),
) -> HotSearchFollowTopicListResponse:
return list_hot_search_follow_topics(db)
@router.post("/follow-topics", response_model=HotSearchFollowTopicSummary)
def create_follow_topic_endpoint(
payload: HotSearchFollowTopicCreateRequest,
current_user: CurrentUser = Depends(require_permission("question_bank.manage")),
db: Session = Depends(get_db),
) -> HotSearchFollowTopicSummary:
item = create_hot_search_follow_topic(db, payload, actor_user_id=current_user.user.id)
if not item:
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Follow topic already exists")
return item
@router.patch("/follow-topics/{topic_id}", response_model=HotSearchFollowTopicSummary)
def update_follow_topic_endpoint(
topic_id: int,
payload: HotSearchFollowTopicUpdateRequest,
current_user: CurrentUser = Depends(require_permission("question_bank.manage")),
db: Session = Depends(get_db),
) -> HotSearchFollowTopicSummary:
item, error = update_hot_search_follow_topic(db, topic_id, payload, actor_user_id=current_user.user.id)
if not item:
if error == "duplicate_topic_name":
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail="Follow topic already exists")
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Follow topic not found")
return item
@router.delete("/follow-topics/{topic_id}")
def delete_follow_topic_endpoint(
topic_id: int,
_: CurrentUser = Depends(require_permission("question_bank.manage")),
db: Session = Depends(get_db),
) -> dict[str, bool]:
deleted = delete_hot_search_follow_topic(db, topic_id)
if not deleted:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Follow topic not found")
return {"success": True}
@router.post("/search", response_model=HotSearchListResponse)
def search_hot_search_records(
payload: HotSearchQueryRequest,
_: CurrentUser = Depends(require_any_permission("question_bank.read", "question_bank.manage")),
db: Session = Depends(get_db),
) -> HotSearchListResponse:
return list_hot_search_records(db, payload)
@router.get("/{record_id}", response_model=HotSearchRecordSummary)
def get_hot_search_record_detail(
record_id: int,
_: CurrentUser = Depends(require_any_permission("question_bank.read", "question_bank.manage")),
db: Session = Depends(get_db),
) -> HotSearchRecordSummary:
item = get_hot_search_record(db, record_id)
if not item:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Hot search record not found")
return item
-31
View File
@@ -1,31 +0,0 @@
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from ...core.database import get_db
from ...core.dependencies import CurrentUser, require_any_permission, require_permission
from ...schemas.mdresolve import (
MdResolveImportRequest,
MdResolveImportResponse,
MdResolveParseRequest,
MdResolveParseResponse,
)
from ...services.mdresolve_service import import_drafts_to_question_bank, parse_markdown_to_drafts
router = APIRouter(prefix="/admin/mdresolve", tags=["admin-mdresolve"])
@router.post("/parse", response_model=MdResolveParseResponse)
def parse_markdown_endpoint(
payload: MdResolveParseRequest,
_: CurrentUser = Depends(require_any_permission("question_bank.read", "question_bank.manage")),
) -> MdResolveParseResponse:
return parse_markdown_to_drafts(payload)
@router.post("/import", response_model=MdResolveImportResponse)
def import_markdown_endpoint(
payload: MdResolveImportRequest,
current_user: CurrentUser = Depends(require_permission("question_bank.manage")),
db: Session = Depends(get_db),
) -> MdResolveImportResponse:
return import_drafts_to_question_bank(db, payload, actor_user_id=current_user.user.id)
-121
View File
@@ -1,121 +0,0 @@
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.orm import Session
from ...core.database import get_db
from ...core.dependencies import CurrentUser, require_any_permission, require_permission
from ...schemas.question_bank import (
QuestionBankCreateRequest,
QuestionBankListResponse,
QuestionBankSummary,
QuestionBankUpdateRequest,
QuestionTagDeleteRequest,
QuestionTagListResponse,
QuestionTagMutationResponse,
QuestionTagRenameRequest,
)
from ...services.question_bank_service import (
create_question,
delete_question,
delete_question_tag,
get_question_by_id,
list_question_tags,
list_questions,
rename_question_tag,
serialize_question,
update_question,
)
router = APIRouter(prefix="/admin/question-bank", tags=["admin-question-bank"])
@router.get("", response_model=QuestionBankListResponse)
def get_question_list(
keyword: str | None = Query(default=None),
status_filter: str | None = Query(default=None, alias="status"),
difficulty: str | None = Query(default=None),
question_type: str | None = Query(default=None),
tag: str | None = Query(default=None),
_: CurrentUser = Depends(require_any_permission("question_bank.read", "question_bank.manage")),
db: Session = Depends(get_db),
) -> QuestionBankListResponse:
return list_questions(
db,
keyword=keyword,
status_filter=status_filter,
difficulty=difficulty,
question_type=question_type,
tag=tag,
)
@router.get("/tags", response_model=QuestionTagListResponse)
def get_question_tag_list(
keyword: str | None = Query(default=None),
_: CurrentUser = Depends(require_any_permission("question_bank.read", "question_bank.manage")),
db: Session = Depends(get_db),
) -> QuestionTagListResponse:
return list_question_tags(db, keyword=keyword)
@router.patch("/tags/rename", response_model=QuestionTagMutationResponse)
def rename_question_tag_endpoint(
payload: QuestionTagRenameRequest,
_: CurrentUser = Depends(require_permission("question_bank.manage")),
db: Session = Depends(get_db),
) -> QuestionTagMutationResponse:
return rename_question_tag(db, payload)
@router.api_route("/tags", methods=["DELETE"], response_model=QuestionTagMutationResponse)
def delete_question_tag_endpoint(
payload: QuestionTagDeleteRequest,
_: CurrentUser = Depends(require_permission("question_bank.manage")),
db: Session = Depends(get_db),
) -> QuestionTagMutationResponse:
return delete_question_tag(db, payload)
@router.post("", response_model=QuestionBankSummary)
def create_question_endpoint(
payload: QuestionBankCreateRequest,
current_user: CurrentUser = Depends(require_permission("question_bank.manage")),
db: Session = Depends(get_db),
) -> QuestionBankSummary:
return create_question(db, payload, actor_user_id=current_user.user.id)
@router.get("/{question_id}", response_model=QuestionBankSummary)
def get_question_detail(
question_id: int,
_: CurrentUser = Depends(require_any_permission("question_bank.read", "question_bank.manage")),
db: Session = Depends(get_db),
) -> QuestionBankSummary:
item = get_question_by_id(db, question_id)
if not item:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Question not found")
return serialize_question(item)
@router.patch("/{question_id}", response_model=QuestionBankSummary)
def update_question_endpoint(
question_id: int,
payload: QuestionBankUpdateRequest,
current_user: CurrentUser = Depends(require_permission("question_bank.manage")),
db: Session = Depends(get_db),
) -> QuestionBankSummary:
updated = update_question(db, question_id, payload, actor_user_id=current_user.user.id)
if not updated:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Question not found")
return updated
@router.delete("/{question_id}")
def delete_question_endpoint(
question_id: int,
_: CurrentUser = Depends(require_permission("question_bank.manage")),
db: Session = Depends(get_db),
) -> dict[str, bool]:
deleted = delete_question(db, question_id)
if not deleted:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Question not found")
return {"success": True}
-2
View File
@@ -384,14 +384,12 @@ def init_db() -> None:
elevation,
file_storage,
fl_analysis,
hot_search,
lightning_event,
lightning_sample,
line,
line_tower,
menu,
object_group,
question_bank,
rbac,
system_param,
tower_model,
+5 -3
View File
@@ -4,25 +4,27 @@ Import all model modules during package initialization so SQLAlchemy can
resolve string-based relationships regardless of route/service import order.
"""
from . import atp_model, audit_log, auth_session, elevation, file_storage, fl_analysis, hot_search, lightning_event, lightning_sample, line, line_tower, menu, object_group, question_bank, rbac, system_param, tower_model, tower_profile, user, worker_registry
from . import atp_model, audit_log, auth_session, calendar_event, elevation, file_storage, fl_analysis, lightning_event, lightning_sample, line, line_tower, menu, model_registry, object_group, rbac, requirement, system_param, todo, tower_model, tower_profile, user, worker_registry
__all__ = [
"atp_model",
"audit_log",
"auth_session",
"calendar_event",
"elevation",
"file_storage",
"fl_analysis",
"hot_search",
"lightning_event",
"lightning_sample",
"line",
"line_tower",
"menu",
"model_registry",
"object_group",
"question_bank",
"rbac",
"requirement",
"system_param",
"todo",
"tower_model",
"tower_profile",
"user",
-94
View File
@@ -1,94 +0,0 @@
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING, Any
from sqlalchemy import JSON, Boolean, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from ..core.database import Base
from .base import utcnow
if TYPE_CHECKING:
from .user import User
class HotSearchRecord(Base):
__tablename__ = "hot_search_records"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
source: Mapped[str] = mapped_column(String(32), default="TOUTIAO", index=True)
external_id: Mapped[str | None] = mapped_column(String(128), default=None, index=True)
title: Mapped[str] = mapped_column(String(512), index=True)
url: Mapped[str | None] = mapped_column(Text(), default=None)
hot_value: Mapped[str | None] = mapped_column(String(128), default=None)
rank_index: Mapped[int | None] = mapped_column(Integer, default=None, index=True)
crawl_time: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow, index=True)
batch_no: Mapped[str | None] = mapped_column(String(64), default=None, index=True)
detail_markdown: Mapped[str | None] = mapped_column(Text(), default=None)
extra_json: Mapped[dict[str, Any] | None] = mapped_column(JSON, default=None)
matched_topics_json: Mapped[list[str] | None] = mapped_column(JSON, default=None)
creator_user_id: Mapped[str | None] = mapped_column(
String(36),
ForeignKey("users.user_id", ondelete="SET NULL"),
index=True,
)
updater_user_id: Mapped[str | None] = mapped_column(
String(36),
ForeignKey("users.user_id", ondelete="SET NULL"),
index=True,
)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=utcnow,
onupdate=utcnow,
)
creator: Mapped[User | None] = relationship(
"User",
foreign_keys=[creator_user_id],
lazy="selectin",
)
updater: Mapped[User | None] = relationship(
"User",
foreign_keys=[updater_user_id],
lazy="selectin",
)
class HotSearchFollowTopic(Base):
__tablename__ = "hot_search_follow_topics"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
topic_name: Mapped[str] = mapped_column(String(128), unique=True, index=True)
keywords: Mapped[str | None] = mapped_column(Text(), default=None)
enabled: Mapped[bool] = mapped_column(Boolean, default=True, index=True)
seq: Mapped[int] = mapped_column(Integer, default=0, index=True)
creator_user_id: Mapped[str | None] = mapped_column(
String(36),
ForeignKey("users.user_id", ondelete="SET NULL"),
index=True,
)
updater_user_id: Mapped[str | None] = mapped_column(
String(36),
ForeignKey("users.user_id", ondelete="SET NULL"),
index=True,
)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=utcnow,
onupdate=utcnow,
)
creator: Mapped[User | None] = relationship(
"User",
foreign_keys=[creator_user_id],
lazy="selectin",
)
updater: Mapped[User | None] = relationship(
"User",
foreign_keys=[updater_user_id],
lazy="selectin",
)
-54
View File
@@ -1,54 +0,0 @@
from __future__ import annotations
from datetime import datetime
from typing import TYPE_CHECKING, Any
from sqlalchemy import JSON, DateTime, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from ..core.database import Base
from .base import utcnow
if TYPE_CHECKING:
from .user import User
class QuestionBank(Base):
__tablename__ = "question_bank"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
question_type: Mapped[str] = mapped_column(String(32), default="single_choice", index=True)
stem: Mapped[str] = mapped_column(Text())
options_json: Mapped[list[dict[str, Any]] | None] = mapped_column(JSON)
answer: Mapped[str] = mapped_column(Text())
analysis: Mapped[str | None] = mapped_column(Text(), default="")
difficulty: Mapped[str] = mapped_column(String(16), default="medium", index=True)
status: Mapped[str] = mapped_column(String(16), default="draft", index=True)
tags_json: Mapped[list[str] | None] = mapped_column(JSON)
creator_user_id: Mapped[str | None] = mapped_column(
String(36),
ForeignKey("users.user_id", ondelete="SET NULL"),
index=True,
)
updater_user_id: Mapped[str | None] = mapped_column(
String(36),
ForeignKey("users.user_id", ondelete="SET NULL"),
index=True,
)
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=utcnow)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
default=utcnow,
onupdate=utcnow,
)
creator: Mapped[User | None] = relationship(
"User",
foreign_keys=[creator_user_id],
lazy="selectin",
)
updater: Mapped[User | None] = relationship(
"User",
foreign_keys=[updater_user_id],
lazy="selectin",
)
-70
View File
@@ -1,70 +0,0 @@
from __future__ import annotations
from datetime import datetime
from pydantic import BaseModel, Field
from .user import UserPublic
class HotSearchRecordSummary(BaseModel):
id: int
source: str
external_id: str | None = None
title: str
url: str | None = None
hot_value: str | None = None
rank_index: int | None = None
crawl_time: datetime
batch_no: str | None = None
detail_markdown: str | None = None
extra_json: dict | None = None
matched_topics: list[str] = Field(default_factory=list)
creator_user_id: str | None = None
updater_user_id: str | None = None
created_at: datetime
updated_at: datetime
creator: UserPublic | None = None
updater: UserPublic | None = None
class HotSearchListResponse(BaseModel):
items: list[HotSearchRecordSummary]
total: int
class HotSearchQueryRequest(BaseModel):
source: str | None = Field(default=None, max_length=32)
title_keyword: str | None = Field(default=None, max_length=255)
followed_only: bool = False
class HotSearchFollowTopicSummary(BaseModel):
id: int
topic_name: str
keywords: str | None = None
enabled: bool = True
seq: int = 0
created_at: datetime
updated_at: datetime
creator: UserPublic | None = None
updater: UserPublic | None = None
class HotSearchFollowTopicListResponse(BaseModel):
items: list[HotSearchFollowTopicSummary]
total: int
class HotSearchFollowTopicCreateRequest(BaseModel):
topic_name: str = Field(min_length=1, max_length=128)
keywords: str | None = Field(default=None, max_length=2000)
enabled: bool = True
seq: int = Field(default=0, ge=0, le=999999)
class HotSearchFollowTopicUpdateRequest(BaseModel):
topic_name: str | None = Field(default=None, min_length=1, max_length=128)
keywords: str | None = Field(default=None, max_length=2000)
enabled: bool | None = None
seq: int | None = Field(default=None, ge=0, le=999999)
-50
View File
@@ -1,50 +0,0 @@
from __future__ import annotations
from typing import Literal
from pydantic import BaseModel, Field
from .question_bank import QuestionBankSummary
QuestionType = Literal["single_choice", "multiple_choice", "true_false", "short_answer"]
QuestionStatus = Literal["draft", "published", "archived"]
QuestionDifficulty = Literal["easy", "medium", "hard"]
class MdResolveOption(BaseModel):
key: str = Field(min_length=1, max_length=16)
content: str = Field(min_length=1, max_length=20000)
class MdResolveQuestionDraft(BaseModel):
question_type: QuestionType = "single_choice"
stem: str = Field(min_length=1, max_length=20000)
options_json: list[MdResolveOption] | None = None
answer: str = Field(min_length=1, max_length=20000)
analysis: str | None = Field(default=None, max_length=20000)
difficulty: QuestionDifficulty = "medium"
status: QuestionStatus = "draft"
tags_json: list[str] = Field(default_factory=list)
class MdResolveParseRequest(BaseModel):
markdown: str = Field(min_length=1, max_length=300000)
default_question_type: QuestionType = "single_choice"
default_difficulty: QuestionDifficulty = "medium"
default_status: QuestionStatus = "draft"
class MdResolveParseResponse(BaseModel):
items: list[MdResolveQuestionDraft]
total: int
warnings: list[str] = Field(default_factory=list)
class MdResolveImportRequest(BaseModel):
items: list[MdResolveQuestionDraft] = Field(min_length=1, max_length=500)
class MdResolveImportResponse(BaseModel):
created_count: int
items: list[QuestionBankSummary]
warnings: list[str] = Field(default_factory=list)
-85
View File
@@ -1,85 +0,0 @@
from __future__ import annotations
from datetime import datetime
from typing import Any, Literal
from pydantic import BaseModel, Field
from .user import UserPublic
QuestionType = Literal["single_choice", "multiple_choice", "true_false", "short_answer"]
QuestionStatus = Literal["draft", "published", "archived"]
QuestionDifficulty = Literal["easy", "medium", "hard"]
class QuestionOption(BaseModel):
key: str = Field(min_length=1, max_length=16)
content: str = Field(min_length=1, max_length=2000)
class QuestionBankSummary(BaseModel):
id: int
question_type: QuestionType
stem: str
options_json: list[dict[str, Any]] | None = None
answer: str
analysis: str | None = None
difficulty: QuestionDifficulty
status: QuestionStatus
tags_json: list[str] | None = None
creator_user_id: str | None = None
updater_user_id: str | None = None
created_at: datetime
updated_at: datetime
creator: UserPublic | None = None
updater: UserPublic | None = None
class QuestionBankListResponse(BaseModel):
items: list[QuestionBankSummary]
total: int
class QuestionBankCreateRequest(BaseModel):
question_type: QuestionType = "single_choice"
stem: str = Field(min_length=1, max_length=20000)
options_json: list[dict[str, Any]] | None = None
answer: str = Field(min_length=1, max_length=20000)
analysis: str | None = Field(default=None, max_length=20000)
difficulty: QuestionDifficulty = "medium"
status: QuestionStatus = "draft"
tags_json: list[str] | None = None
class QuestionBankUpdateRequest(BaseModel):
question_type: QuestionType | None = None
stem: str | None = Field(default=None, min_length=1, max_length=20000)
options_json: list[dict[str, Any]] | None = None
answer: str | None = Field(default=None, min_length=1, max_length=20000)
analysis: str | None = Field(default=None, max_length=20000)
difficulty: QuestionDifficulty | None = None
status: QuestionStatus | None = None
tags_json: list[str] | None = None
class QuestionTagSummary(BaseModel):
name: str
count: int
class QuestionTagListResponse(BaseModel):
items: list[QuestionTagSummary]
total: int
class QuestionTagRenameRequest(BaseModel):
old_tag: str = Field(min_length=1, max_length=128)
new_tag: str = Field(min_length=1, max_length=128)
class QuestionTagDeleteRequest(BaseModel):
tag: str = Field(min_length=1, max_length=128)
class QuestionTagMutationResponse(BaseModel):
affected_questions: int
+1
View File
@@ -53,6 +53,7 @@ REMOVED_MENU_CODES = {
"admin.baidu_pan",
"admin.tag",
"admin.knowledge_point_mgr",
"admin.question_bank",
"admin.cron_task_mgr",
"admin.todos",
"admin.job_mgr",
-359
View File
@@ -1,359 +0,0 @@
from __future__ import annotations
import asyncio
from datetime import datetime
from sqlalchemy import Select, func, or_, select
from sqlalchemy.orm import Session, selectinload
from ..models.hot_search import HotSearchFollowTopic, HotSearchRecord
from ..schemas.hot_search import (
HotSearchFollowTopicCreateRequest,
HotSearchFollowTopicListResponse,
HotSearchFollowTopicSummary,
HotSearchFollowTopicUpdateRequest,
HotSearchListResponse,
HotSearchQueryRequest,
HotSearchRecordSummary,
)
from .push_service import publish_topic
from .user_service import serialize_user
HOT_SEARCH_TOPIC = "admin.hot_search"
HOT_SEARCH_FOLLOW_TOPIC = "admin.hot_search.follow_topics"
def _record_stmt() -> Select[tuple[HotSearchRecord]]:
return select(HotSearchRecord).options(
selectinload(HotSearchRecord.creator),
selectinload(HotSearchRecord.updater),
)
def _topic_stmt() -> Select[tuple[HotSearchFollowTopic]]:
return select(HotSearchFollowTopic).options(
selectinload(HotSearchFollowTopic.creator),
selectinload(HotSearchFollowTopic.updater),
)
def _normalize_topic_name(value: str) -> str:
return value.strip()
def _normalize_keywords(value: str | None) -> list[str]:
if not value:
return []
cleaned = value.replace("", ",").replace("\n", ",")
parts = [part.strip().lower() for part in cleaned.split(",")]
return [part for part in parts if part]
def _extract_text_haystack(record: HotSearchRecord) -> str:
chunks: list[str] = []
for candidate in [record.title, record.detail_markdown, record.hot_value, record.url]:
if candidate:
chunks.append(candidate.lower())
return " ".join(chunks)
def _calc_matched_topics(record: HotSearchRecord, topics: list[HotSearchFollowTopic]) -> list[str]:
if not topics:
return []
haystack = _extract_text_haystack(record)
if not haystack:
return []
matched: list[str] = []
for topic in topics:
if topic.enabled is False:
continue
keywords = _normalize_keywords(topic.keywords)
if not keywords:
continue
if any(keyword in haystack for keyword in keywords):
matched.append(topic.topic_name)
return matched
def _to_record_summary(record: HotSearchRecord, matched_topics: list[str]) -> HotSearchRecordSummary:
return HotSearchRecordSummary(
id=record.id,
source=record.source,
external_id=record.external_id,
title=record.title,
url=record.url,
hot_value=record.hot_value,
rank_index=record.rank_index,
crawl_time=record.crawl_time,
batch_no=record.batch_no,
detail_markdown=record.detail_markdown,
extra_json=record.extra_json,
matched_topics=matched_topics,
creator_user_id=record.creator_user_id,
updater_user_id=record.updater_user_id,
created_at=record.created_at,
updated_at=record.updated_at,
creator=serialize_user(record.creator) if record.creator else None,
updater=serialize_user(record.updater) if record.updater else None,
)
def _to_topic_summary(topic: HotSearchFollowTopic) -> HotSearchFollowTopicSummary:
return HotSearchFollowTopicSummary(
id=topic.id,
topic_name=topic.topic_name,
keywords=topic.keywords,
enabled=topic.enabled,
seq=topic.seq,
created_at=topic.created_at,
updated_at=topic.updated_at,
creator=serialize_user(topic.creator) if topic.creator else None,
updater=serialize_user(topic.updater) if topic.updater else None,
)
def _seed_initial_hot_search_records(db: Session) -> None:
existing = db.scalar(select(func.count(HotSearchRecord.id))) or 0
if existing > 0:
return
now = datetime.now().replace(microsecond=0)
samples = [
HotSearchRecord(
source="TOUTIAO",
external_id="sample-1",
title="AI 模型价格再次下调,开发者关注推理成本",
hot_value="1960万",
rank_index=1,
url="https://example.com/hot-search/sample-1",
detail_markdown="## 事件摘要\n\n多家模型服务商下调 API 价格,企业正评估迁移窗口。",
batch_no="bootstrap",
crawl_time=now,
extra_json={"channel": "sample", "category": "ai"},
),
HotSearchRecord(
source="TOUTIAO",
external_id="sample-2",
title="多地中小学上线 AI 助教系统,作业讲评提效",
hot_value="1320万",
rank_index=2,
url="https://example.com/hot-search/sample-2",
detail_markdown="## 校园场景\n\nAI 助教用于错题归因与个性化讲解,教师反馈效率提升。",
batch_no="bootstrap",
crawl_time=now,
extra_json={"channel": "sample", "category": "education"},
),
HotSearchRecord(
source="TOUTIAO",
external_id="sample-3",
title="开源社区发布新一代推理框架,支持边缘部署",
hot_value="980万",
rank_index=3,
url="https://example.com/hot-search/sample-3",
detail_markdown="## 技术亮点\n\n新增量化与缓存优化,边缘设备延迟降低约 30%",
batch_no="bootstrap",
crawl_time=now,
extra_json={"channel": "sample", "category": "opensource"},
),
]
db.add_all(samples)
db.flush()
def _seed_initial_follow_topics(db: Session) -> None:
existing = db.scalar(select(func.count(HotSearchFollowTopic.id))) or 0
if existing > 0:
return
topics = [
HotSearchFollowTopic(topic_name="AI模型", keywords="ai,模型,推理,大模型", enabled=True, seq=10),
HotSearchFollowTopic(topic_name="教育场景", keywords="作业,学校,助教,教学", enabled=True, seq=20),
HotSearchFollowTopic(topic_name="开源技术", keywords="开源,框架,部署", enabled=True, seq=30),
]
db.add_all(topics)
db.flush()
def seed_hot_search_defaults(db: Session) -> None:
_seed_initial_hot_search_records(db)
_seed_initial_follow_topics(db)
def _build_search_stmt(payload: HotSearchQueryRequest) -> Select[tuple[HotSearchRecord]]:
stmt = _record_stmt()
if payload.source and payload.source.strip():
stmt = stmt.where(HotSearchRecord.source == payload.source.strip().upper())
if payload.title_keyword and payload.title_keyword.strip():
keyword = payload.title_keyword.strip()
like = f"%{keyword}%"
stmt = stmt.where(
or_(
HotSearchRecord.title.ilike(like),
HotSearchRecord.detail_markdown.ilike(like),
HotSearchRecord.hot_value.ilike(like),
)
)
return stmt
def list_hot_search_records(db: Session, payload: HotSearchQueryRequest) -> HotSearchListResponse:
topics = db.execute(
_topic_stmt().where(HotSearchFollowTopic.enabled.is_(True)).order_by(HotSearchFollowTopic.seq.asc(), HotSearchFollowTopic.id.asc())
).scalars().all()
stmt = _build_search_stmt(payload)
rows = db.execute(stmt.order_by(HotSearchRecord.crawl_time.desc(), HotSearchRecord.rank_index.asc().nullslast(), HotSearchRecord.id.desc())).scalars().all()
items: list[HotSearchRecordSummary] = []
for row in rows:
matched_topics = _calc_matched_topics(row, topics)
if payload.followed_only and not matched_topics:
continue
items.append(_to_record_summary(row, matched_topics))
return HotSearchListResponse(items=items, total=len(items))
def get_hot_search_record(db: Session, record_id: int) -> HotSearchRecordSummary | None:
record = db.execute(_record_stmt().where(HotSearchRecord.id == record_id)).scalar_one_or_none()
if not record:
return None
topics = db.execute(
_topic_stmt().where(HotSearchFollowTopic.enabled.is_(True)).order_by(HotSearchFollowTopic.seq.asc(), HotSearchFollowTopic.id.asc())
).scalars().all()
matched_topics = _calc_matched_topics(record, topics)
return _to_record_summary(record, matched_topics)
def list_hot_search_follow_topics(db: Session) -> HotSearchFollowTopicListResponse:
items = db.execute(_topic_stmt().order_by(HotSearchFollowTopic.seq.asc(), HotSearchFollowTopic.id.asc())).scalars().all()
return HotSearchFollowTopicListResponse(items=[_to_topic_summary(item) for item in items], total=len(items))
def _get_topic_by_name(db: Session, topic_name: str) -> HotSearchFollowTopic | None:
normalized = _normalize_topic_name(topic_name).lower()
return db.scalar(select(HotSearchFollowTopic).where(func.lower(HotSearchFollowTopic.topic_name) == normalized))
def create_hot_search_follow_topic(
db: Session,
payload: HotSearchFollowTopicCreateRequest,
*,
actor_user_id: str,
) -> HotSearchFollowTopicSummary | None:
topic_name = _normalize_topic_name(payload.topic_name)
if not topic_name:
return None
existed = _get_topic_by_name(db, topic_name)
if existed:
return None
item = HotSearchFollowTopic(
topic_name=topic_name,
keywords=(payload.keywords or "").strip() or None,
enabled=payload.enabled,
seq=payload.seq,
creator_user_id=actor_user_id,
updater_user_id=actor_user_id,
)
db.add(item)
db.commit()
saved = db.execute(_topic_stmt().where(HotSearchFollowTopic.id == item.id)).scalar_one_or_none()
if not saved:
return None
_fire_and_forget(
publish_topic(
HOT_SEARCH_FOLLOW_TOPIC,
name="hot_search.follow_topic.changed",
payload={"action": "created", "topic_id": saved.id, "topic_name": saved.topic_name},
requires_refetch=["/api/v1/admin/hot-search/follow-topics", "/api/v1/admin/hot-search/search"],
dedupe_key=f"hot-search:follow-topic:created:{saved.id}",
)
)
return _to_topic_summary(saved)
def update_hot_search_follow_topic(
db: Session,
topic_id: int,
payload: HotSearchFollowTopicUpdateRequest,
*,
actor_user_id: str,
) -> tuple[HotSearchFollowTopicSummary | None, str | None]:
item = db.execute(_topic_stmt().where(HotSearchFollowTopic.id == topic_id)).scalar_one_or_none()
if not item:
return None, "not_found"
update_data = payload.model_dump(exclude_unset=True)
if "topic_name" in update_data and update_data["topic_name"] is not None:
topic_name = _normalize_topic_name(str(update_data["topic_name"]))
if not topic_name:
return None, "invalid_topic_name"
existed = _get_topic_by_name(db, topic_name)
if existed and existed.id != item.id:
return None, "duplicate_topic_name"
item.topic_name = topic_name
if "keywords" in update_data:
item.keywords = (str(update_data["keywords"]) if update_data["keywords"] is not None else "").strip() or None
if "enabled" in update_data and update_data["enabled"] is not None:
item.enabled = bool(update_data["enabled"])
if "seq" in update_data and update_data["seq"] is not None:
item.seq = int(update_data["seq"])
item.updater_user_id = actor_user_id
db.commit()
saved = db.execute(_topic_stmt().where(HotSearchFollowTopic.id == topic_id)).scalar_one_or_none()
if not saved:
return None, "not_found"
_fire_and_forget(
publish_topic(
HOT_SEARCH_FOLLOW_TOPIC,
name="hot_search.follow_topic.changed",
payload={"action": "updated", "topic_id": saved.id, "topic_name": saved.topic_name},
requires_refetch=["/api/v1/admin/hot-search/follow-topics", "/api/v1/admin/hot-search/search"],
dedupe_key=f"hot-search:follow-topic:updated:{saved.id}",
)
)
return _to_topic_summary(saved), None
def delete_hot_search_follow_topic(db: Session, topic_id: int) -> bool:
item = db.execute(_topic_stmt().where(HotSearchFollowTopic.id == topic_id)).scalar_one_or_none()
if not item:
return False
deleted_id = item.id
db.delete(item)
db.commit()
_fire_and_forget(
publish_topic(
HOT_SEARCH_FOLLOW_TOPIC,
name="hot_search.follow_topic.changed",
payload={"action": "deleted", "topic_id": deleted_id},
requires_refetch=["/api/v1/admin/hot-search/follow-topics", "/api/v1/admin/hot-search/search"],
dedupe_key=f"hot-search:follow-topic:deleted:{deleted_id}",
)
)
return True
def _fire_and_forget(coro: object) -> None:
try:
loop = asyncio.get_running_loop()
except RuntimeError:
return
loop.create_task(coro)
-2
View File
@@ -37,8 +37,6 @@ DEFAULT_ADMIN_PERMISSION_CODES: set[str] = {
"celery.manage",
"wine.read",
"wine.manage",
"question_bank.read",
"question_bank.manage",
}
ADMIN_ROLE_IDS = {
-336
View File
@@ -1,336 +0,0 @@
from __future__ import annotations
import asyncio
import re
from dataclasses import dataclass
from sqlalchemy.orm import Session
from ..schemas.mdresolve import (
MdResolveImportRequest,
MdResolveImportResponse,
MdResolveOption,
MdResolveParseRequest,
MdResolveParseResponse,
MdResolveQuestionDraft,
)
from ..schemas.question_bank import QuestionBankCreateRequest, QuestionBankSummary
from .push_service import publish_topic
from .question_bank_service import create_question
MDRESOLVE_TOPIC = "admin.question_bank"
@dataclass
class _ParseContext:
default_question_type: str
default_difficulty: str
default_status: str
warnings: list[str]
def parse_markdown_to_drafts(payload: MdResolveParseRequest) -> MdResolveParseResponse:
lines = payload.markdown.splitlines()
blocks = _split_blocks(lines)
warnings: list[str] = []
ctx = _ParseContext(
default_question_type=payload.default_question_type,
default_difficulty=payload.default_difficulty,
default_status=payload.default_status,
warnings=warnings,
)
items: list[MdResolveQuestionDraft] = []
for index, block in enumerate(blocks, start=1):
draft = _parse_block(block, index=index, ctx=ctx)
if draft:
items.append(draft)
return MdResolveParseResponse(items=items, total=len(items), warnings=warnings)
def import_drafts_to_question_bank(
db: Session,
payload: MdResolveImportRequest,
*,
actor_user_id: str,
) -> MdResolveImportResponse:
warnings: list[str] = []
created: list[QuestionBankSummary] = []
for index, item in enumerate(payload.items, start=1):
tags = _normalize_tags(item.tags_json)
create_payload = QuestionBankCreateRequest(
question_type=item.question_type,
stem=item.stem.strip(),
options_json=[opt.model_dump() for opt in item.options_json] if item.options_json else None,
answer=item.answer.strip(),
analysis=(item.analysis or "").strip() or None,
difficulty=item.difficulty,
status=item.status,
tags_json=tags,
)
try:
saved = create_question(db, create_payload, actor_user_id=actor_user_id)
created.append(saved)
except Exception as ex:
warnings.append(f"{index} 条导入失败:{ex}")
if created:
_fire_and_forget(
publish_topic(
MDRESOLVE_TOPIC,
name="mdresolve.imported",
payload={"action": "batch_import", "created_count": len(created)},
requires_refetch=["/api/v1/admin/question-bank"],
dedupe_key=f"mdresolve:import:{actor_user_id}:{len(created)}",
)
)
return MdResolveImportResponse(created_count=len(created), items=created, warnings=warnings)
def _split_blocks(lines: list[str]) -> list[list[str]]:
blocks: list[list[str]] = []
current: list[str] = []
def flush() -> None:
nonlocal current
if current:
blocks.append(current)
current = []
for raw in lines:
line = raw.rstrip()
if re.match(r"^\s*(#+\s*)?(第?\s*\d+\s*[、..))]\s*)?题\b", line):
flush()
current = [line]
continue
if re.match(r"^\s*(\d+[、..))])\s+", line) and current:
flush()
current = [line]
continue
if not current and not line.strip():
continue
current.append(line)
flush()
return blocks
def _parse_block(block: list[str], *, index: int, ctx: _ParseContext) -> MdResolveQuestionDraft | None:
text_lines = [line.strip() for line in block if line.strip()]
if not text_lines:
return None
stem = ""
answer = ""
analysis = ""
options: list[MdResolveOption] = []
tags: list[str] = []
question_type = ctx.default_question_type
difficulty = ctx.default_difficulty
status = ctx.default_status
option_started = False
for i, line in enumerate(text_lines):
key, value = _split_kv(line)
if key in {"题干", "问题", "题目", "stem", "question"}:
stem = value
continue
if key in {"答案", "answer", "正确答案"}:
answer = value
continue
if key in {"解析", "analysis", "说明"}:
analysis = value
continue
if key in {"标签", "tags", "tag"}:
tags = _normalize_tags(re.split(r"[,;\s]+", value))
continue
if key in {"难度", "difficulty"}:
difficulty = _normalize_difficulty(value, default=ctx.default_difficulty)
continue
if key in {"状态", "status"}:
status = _normalize_status(value, default=ctx.default_status)
continue
if key in {"题型", "type", "question_type"}:
question_type = _normalize_question_type(value, default=ctx.default_question_type)
continue
option = _parse_option_line(line)
if option:
options.append(option)
option_started = True
continue
if not stem:
stem = _strip_question_prefix(line)
continue
if option_started and not answer and i == len(text_lines) - 1:
# 常见格式:最后一行直接写答案字母
normalized = _normalize_answer_token(line)
if normalized:
answer = normalized
continue
if analysis:
analysis = f"{analysis}\n{line}" if analysis else line
if not stem:
ctx.warnings.append(f"{index} 题缺少题干,已跳过")
return None
if not answer:
inferred = _infer_answer_from_stem(stem)
if inferred:
answer = inferred
else:
ctx.warnings.append(f"{index} 题缺少答案,已跳过")
return None
if question_type in {"single_choice", "multiple_choice"} and not options:
ctx.warnings.append(f"{index} 题未解析到选项,已降级为简答题")
question_type = "short_answer"
return MdResolveQuestionDraft(
question_type=question_type,
stem=stem,
options_json=options or None,
answer=answer,
analysis=analysis or None,
difficulty=difficulty,
status=status,
tags_json=tags,
)
def _split_kv(line: str) -> tuple[str, str]:
for sep in [":", ""]:
if sep in line:
left, right = line.split(sep, 1)
key = left.strip().lower()
return key, right.strip()
return "", line.strip()
def _parse_option_line(line: str) -> MdResolveOption | None:
m = re.match(r"^\s*([A-Ha-h])[\.、:\)]\s*(.+)$", line)
if m:
return MdResolveOption(key=m.group(1).upper(), content=m.group(2).strip())
m2 = re.match(r"^\s*[-*]\s*([A-Ha-h])\s*[\.、:\)]\s*(.+)$", line)
if m2:
return MdResolveOption(key=m2.group(1).upper(), content=m2.group(2).strip())
return None
def _strip_question_prefix(line: str) -> str:
line = re.sub(r"^\s*(#+\s*)?", "", line)
line = re.sub(r"^\s*(第?\s*\d+\s*[、..))])\s*", "", line)
line = re.sub(r"^\s*题\s*[:]?\s*", "", line)
return line.strip()
def _normalize_answer_token(raw: str) -> str:
value = raw.strip().upper()
value = value.replace("答案", "").replace(":", "").replace("", "").strip()
if re.fullmatch(r"[A-H](\s*[,/\s]\s*[A-H]){0,7}", value):
values = re.split(r"[,/\s]+", value)
values = [v for v in values if v]
return ",".join(values)
return ""
def _infer_answer_from_stem(stem: str) -> str:
match = re.search(r"?答案[:]\s*([A-Ha-h](?:\s*[,/\s]\s*[A-Ha-h])*)", stem)
if not match:
return ""
return _normalize_answer_token(match.group(1))
def _normalize_question_type(raw: str, *, default: str) -> str:
value = raw.strip().lower()
mapping = {
"单选": "single_choice",
"单选题": "single_choice",
"single": "single_choice",
"single_choice": "single_choice",
"多选": "multiple_choice",
"多选题": "multiple_choice",
"multiple": "multiple_choice",
"multiple_choice": "multiple_choice",
"判断": "true_false",
"判断题": "true_false",
"true_false": "true_false",
"简答": "short_answer",
"简答题": "short_answer",
"short_answer": "short_answer",
}
return mapping.get(value, default)
def _normalize_difficulty(raw: str, *, default: str) -> str:
value = raw.strip().lower()
mapping = {
"easy": "easy",
"简单": "easy",
"medium": "medium",
"": "medium",
"中等": "medium",
"hard": "hard",
"困难": "hard",
"": "hard",
}
return mapping.get(value, default)
def _normalize_status(raw: str, *, default: str) -> str:
value = raw.strip().lower()
mapping = {
"draft": "draft",
"草稿": "draft",
"published": "published",
"发布": "published",
"已发布": "published",
"archived": "archived",
"归档": "archived",
"已归档": "archived",
}
return mapping.get(value, default)
def _normalize_tags(tags: list[str] | None) -> list[str]:
if not tags:
return []
dedup: list[str] = []
seen = set()
for tag in tags:
value = str(tag).strip()
if not value or value in seen:
continue
seen.add(value)
dedup.append(value)
return dedup
def _fire_and_forget(coro: object) -> None:
try:
loop = asyncio.get_running_loop()
except RuntimeError:
return
loop.create_task(coro)
-359
View File
@@ -1,359 +0,0 @@
from __future__ import annotations
import asyncio
from sqlalchemy import func, or_, select
from sqlalchemy.orm import Session, selectinload
from ..models.question_bank import QuestionBank
from ..schemas.question_bank import (
QuestionBankCreateRequest,
QuestionBankListResponse,
QuestionBankSummary,
QuestionBankUpdateRequest,
QuestionTagDeleteRequest,
QuestionTagListResponse,
QuestionTagMutationResponse,
QuestionTagRenameRequest,
QuestionTagSummary,
)
from .push_service import publish_topic
from .user_service import serialize_user
QUESTION_BANK_TOPIC = "admin.question_bank"
def _question_bank_stmt():
return select(QuestionBank).options(
selectinload(QuestionBank.creator),
selectinload(QuestionBank.updater),
)
def _normalize_tag(tag: str) -> str:
return str(tag).strip()
def _normalize_tags(tags: list[str] | None) -> list[str]:
if not tags:
return []
dedup: list[str] = []
seen = set()
for tag in tags:
normalized = _normalize_tag(tag)
if not normalized or normalized in seen:
continue
seen.add(normalized)
dedup.append(normalized)
return dedup
def serialize_question(item: QuestionBank) -> QuestionBankSummary:
return QuestionBankSummary(
id=item.id,
question_type=item.question_type,
stem=item.stem,
options_json=item.options_json,
answer=item.answer,
analysis=item.analysis,
difficulty=item.difficulty,
status=item.status,
tags_json=item.tags_json,
creator_user_id=item.creator_user_id,
updater_user_id=item.updater_user_id,
created_at=item.created_at,
updated_at=item.updated_at,
creator=serialize_user(item.creator) if item.creator else None,
updater=serialize_user(item.updater) if item.updater else None,
)
def list_questions(
db: Session,
*,
keyword: str | None,
status_filter: str | None,
difficulty: str | None,
question_type: str | None,
tag: str | None,
) -> QuestionBankListResponse:
stmt = _question_bank_stmt()
normalized_keyword = (keyword or "").strip()
if normalized_keyword:
like = f"%{normalized_keyword}%"
stmt = stmt.where(
or_(
QuestionBank.stem.ilike(like),
QuestionBank.answer.ilike(like),
QuestionBank.analysis.ilike(like),
)
)
if status_filter in {"draft", "published", "archived"}:
stmt = stmt.where(QuestionBank.status == status_filter)
if difficulty in {"easy", "medium", "hard"}:
stmt = stmt.where(QuestionBank.difficulty == difficulty)
if question_type in {"single_choice", "multiple_choice", "true_false", "short_answer"}:
stmt = stmt.where(QuestionBank.question_type == question_type)
normalized_tag = (tag or "").strip()
if normalized_tag:
stmt = stmt.where(QuestionBank.tags_json.contains([normalized_tag]))
total_stmt = select(func.count()).select_from(QuestionBank)
if normalized_keyword:
like = f"%{normalized_keyword}%"
total_stmt = total_stmt.where(
or_(
QuestionBank.stem.ilike(like),
QuestionBank.answer.ilike(like),
QuestionBank.analysis.ilike(like),
)
)
if status_filter in {"draft", "published", "archived"}:
total_stmt = total_stmt.where(QuestionBank.status == status_filter)
if difficulty in {"easy", "medium", "hard"}:
total_stmt = total_stmt.where(QuestionBank.difficulty == difficulty)
if question_type in {"single_choice", "multiple_choice", "true_false", "short_answer"}:
total_stmt = total_stmt.where(QuestionBank.question_type == question_type)
if normalized_tag:
total_stmt = total_stmt.where(QuestionBank.tags_json.contains([normalized_tag]))
total = db.scalar(total_stmt) or 0
items = db.execute(stmt.order_by(QuestionBank.updated_at.desc(), QuestionBank.id.desc())).scalars().all()
return QuestionBankListResponse(items=[serialize_question(item) for item in items], total=total)
def list_question_tags(db: Session, *, keyword: str | None) -> QuestionTagListResponse:
rows = db.execute(select(QuestionBank.tags_json).where(QuestionBank.tags_json.is_not(None))).scalars().all()
counters: dict[str, int] = {}
for row in rows:
if not isinstance(row, list):
continue
tags = _normalize_tags([str(tag) for tag in row])
for name in tags:
counters[name] = counters.get(name, 0) + 1
normalized_keyword = (keyword or "").strip().lower()
items = [
QuestionTagSummary(name=name, count=count)
for name, count in counters.items()
if not normalized_keyword or normalized_keyword in name.lower()
]
items.sort(key=lambda item: (-item.count, item.name))
return QuestionTagListResponse(items=items, total=len(items))
def rename_question_tag(
db: Session,
payload: QuestionTagRenameRequest,
) -> QuestionTagMutationResponse:
old_tag = _normalize_tag(payload.old_tag)
new_tag = _normalize_tag(payload.new_tag)
if not old_tag or not new_tag or old_tag == new_tag:
return QuestionTagMutationResponse(affected_questions=0)
questions = db.execute(select(QuestionBank).where(QuestionBank.tags_json.is_not(None))).scalars().all()
affected = 0
for question in questions:
if not isinstance(question.tags_json, list):
continue
tags = _normalize_tags([str(tag) for tag in question.tags_json])
if old_tag not in tags:
continue
replaced: list[str] = []
seen = set()
for tag in tags:
candidate = new_tag if tag == old_tag else tag
if not candidate or candidate in seen:
continue
seen.add(candidate)
replaced.append(candidate)
question.tags_json = replaced
affected += 1
if affected <= 0:
return QuestionTagMutationResponse(affected_questions=0)
db.commit()
_fire_and_forget(
publish_topic(
QUESTION_BANK_TOPIC,
name="question_bank.tags_changed",
payload={"action": "renamed", "old_tag": old_tag, "new_tag": new_tag, "affected_questions": affected},
requires_refetch=["/api/v1/admin/question-bank", "/api/v1/admin/question-bank/tags"],
dedupe_key=f"question-bank:tag-renamed:{old_tag}:{new_tag}",
)
)
return QuestionTagMutationResponse(affected_questions=affected)
def delete_question_tag(
db: Session,
payload: QuestionTagDeleteRequest,
) -> QuestionTagMutationResponse:
target_tag = _normalize_tag(payload.tag)
if not target_tag:
return QuestionTagMutationResponse(affected_questions=0)
questions = db.execute(select(QuestionBank).where(QuestionBank.tags_json.is_not(None))).scalars().all()
affected = 0
for question in questions:
if not isinstance(question.tags_json, list):
continue
tags = _normalize_tags([str(tag) for tag in question.tags_json])
if target_tag not in tags:
continue
question.tags_json = [tag for tag in tags if tag != target_tag]
affected += 1
if affected <= 0:
return QuestionTagMutationResponse(affected_questions=0)
db.commit()
_fire_and_forget(
publish_topic(
QUESTION_BANK_TOPIC,
name="question_bank.tags_changed",
payload={"action": "deleted", "tag": target_tag, "affected_questions": affected},
requires_refetch=["/api/v1/admin/question-bank", "/api/v1/admin/question-bank/tags"],
dedupe_key=f"question-bank:tag-deleted:{target_tag}",
)
)
return QuestionTagMutationResponse(affected_questions=affected)
def get_question_by_id(db: Session, question_id: int) -> QuestionBank | None:
return db.execute(_question_bank_stmt().where(QuestionBank.id == question_id)).scalar_one_or_none()
def create_question(
db: Session,
payload: QuestionBankCreateRequest,
*,
actor_user_id: str,
) -> QuestionBankSummary:
item = QuestionBank(
question_type=payload.question_type,
stem=payload.stem.strip(),
options_json=payload.options_json,
answer=payload.answer.strip(),
analysis=(payload.analysis or "").strip(),
difficulty=payload.difficulty,
status=payload.status,
tags_json=_normalize_tags(payload.tags_json),
creator_user_id=actor_user_id,
updater_user_id=actor_user_id,
)
db.add(item)
db.commit()
saved = get_question_by_id(db, item.id)
if not saved:
raise RuntimeError("Question create succeeded but reload failed")
_fire_and_forget(
publish_topic(
QUESTION_BANK_TOPIC,
name="question_bank.changed",
payload={"action": "created", "question_id": saved.id},
requires_refetch=["/api/v1/admin/question-bank", "/api/v1/admin/question-bank/tags"],
dedupe_key=f"question-bank:created:{saved.id}",
)
)
return serialize_question(saved)
def update_question(
db: Session,
question_id: int,
payload: QuestionBankUpdateRequest,
*,
actor_user_id: str,
) -> QuestionBankSummary | None:
item = get_question_by_id(db, question_id)
if not item:
return None
update_data = payload.model_dump(exclude_unset=True)
if "question_type" in update_data and update_data["question_type"] is not None:
item.question_type = update_data["question_type"]
if "stem" in update_data and update_data["stem"] is not None:
item.stem = str(update_data["stem"]).strip()
if "options_json" in update_data:
item.options_json = update_data["options_json"]
if "answer" in update_data and update_data["answer"] is not None:
item.answer = str(update_data["answer"]).strip()
if "analysis" in update_data:
item.analysis = (str(update_data["analysis"]) if update_data["analysis"] is not None else "").strip()
if "difficulty" in update_data and update_data["difficulty"] is not None:
item.difficulty = update_data["difficulty"]
if "status" in update_data and update_data["status"] is not None:
item.status = update_data["status"]
if "tags_json" in update_data:
item.tags_json = _normalize_tags(update_data["tags_json"])
item.updater_user_id = actor_user_id
db.commit()
saved = get_question_by_id(db, question_id)
if not saved:
return None
_fire_and_forget(
publish_topic(
QUESTION_BANK_TOPIC,
name="question_bank.changed",
payload={"action": "updated", "question_id": saved.id},
requires_refetch=[
"/api/v1/admin/question-bank",
"/api/v1/admin/question-bank/tags",
f"/api/v1/admin/question-bank/{saved.id}",
],
dedupe_key=f"question-bank:updated:{saved.id}",
)
)
return serialize_question(saved)
def delete_question(db: Session, question_id: int) -> bool:
item = get_question_by_id(db, question_id)
if not item:
return False
deleted_id = item.id
db.delete(item)
db.commit()
_fire_and_forget(
publish_topic(
QUESTION_BANK_TOPIC,
name="question_bank.changed",
payload={"action": "deleted", "question_id": deleted_id},
requires_refetch=["/api/v1/admin/question-bank", "/api/v1/admin/question-bank/tags"],
dedupe_key=f"question-bank:deleted:{deleted_id}",
)
)
return True
def _fire_and_forget(coro: object) -> None:
try:
loop = asyncio.get_running_loop()
except RuntimeError:
return
loop.create_task(coro)
-4
View File
@@ -93,8 +93,6 @@ DEFAULT_PERMISSIONS: dict[str, str] = {
"system_param.manage": "Manage system parameters",
"file.read": "Read file mounts and indexed entries",
"file.manage": "Manage file operations and storage sync",
"question_bank.read": "Read question bank entries",
"question_bank.manage": "Manage question bank entries",
"line.read": "Read power lines",
"line.manage": "Manage power lines",
"tower.read": "Read line towers",
@@ -129,8 +127,6 @@ DEFAULT_ROLES: dict[str, dict[str, object]] = {
"system_param.manage",
"file.read",
"file.manage",
"question_bank.read",
"question_bank.manage",
"line.read",
"line.manage",
"tower.read",
-1
View File
@@ -26,7 +26,6 @@ TOPIC_RULES: dict[str, TopicRule] = {
"admin.elevation": TopicRule(any_permission_codes={"elevation.read", "elevation.manage"}),
"admin.atp-models": TopicRule(any_permission_codes={"atp.read", "atp.run", "atp.manage"}),
"admin.audit_logs": TopicRule(any_permission_codes={"menu.read", "menu.manage"}),
"admin.question_bank": TopicRule(any_permission_codes={"question_bank.read", "question_bank.manage"}),
}