完成高程数据集优化并支持多文件导入自动分析
Co-authored-by: multica-agent <github@multica.ai>
This commit is contained in:
@@ -12,6 +12,7 @@ from ...schemas.elevation import (
|
||||
ElevationApplyJobSummary,
|
||||
ElevationDatasetAnalyzeResponse,
|
||||
ElevationDatasetBatchImportResponse,
|
||||
ElevationDatasetDataImportResponse,
|
||||
ElevationDatasetCreateRequest,
|
||||
ElevationDatasetListResponse,
|
||||
ElevationDatasetPreviewResponse,
|
||||
@@ -24,6 +25,7 @@ from ...services.elevation_service import (
|
||||
create_dataset,
|
||||
delete_dataset,
|
||||
get_job_by_id,
|
||||
import_dataset_data_files,
|
||||
import_datasets_from_csv,
|
||||
list_datasets,
|
||||
list_jobs,
|
||||
@@ -74,6 +76,21 @@ def import_elevation_datasets(
|
||||
)
|
||||
|
||||
|
||||
@router.post("/datasets/{dataset_id}/data/import", response_model=ElevationDatasetDataImportResponse)
|
||||
def import_elevation_dataset_data(
|
||||
dataset_id: str,
|
||||
files: list[UploadFile] = File(...),
|
||||
current_user: CurrentUser = Depends(require_permission("elevation.manage")),
|
||||
db: Session = Depends(get_db),
|
||||
) -> ElevationDatasetDataImportResponse:
|
||||
return import_dataset_data_files(
|
||||
db,
|
||||
dataset_id=dataset_id,
|
||||
files=files,
|
||||
actor=current_user.user,
|
||||
)
|
||||
|
||||
|
||||
@router.patch("/datasets/{dataset_id}", response_model=ElevationDatasetSummary)
|
||||
def update_elevation_dataset(
|
||||
dataset_id: str,
|
||||
|
||||
@@ -184,6 +184,53 @@ def _ensure_user_audit_column_compatibility() -> None:
|
||||
)
|
||||
|
||||
|
||||
def _ensure_elevation_dataset_column_compatibility() -> None:
|
||||
"""
|
||||
Keep `elevation_dataset` columns aligned with the current ORM mapping.
|
||||
"""
|
||||
if not database_url.startswith("postgresql"):
|
||||
return
|
||||
|
||||
schema = settings.resolved_db_schema
|
||||
with engine.begin() as connection:
|
||||
db_inspector = inspect(connection)
|
||||
if not db_inspector.has_table("elevation_dataset", schema=schema):
|
||||
return
|
||||
|
||||
column_names = {
|
||||
column["name"]
|
||||
for column in db_inspector.get_columns("elevation_dataset", schema=schema)
|
||||
}
|
||||
|
||||
if "dataset_dir" not in column_names:
|
||||
connection.execute(
|
||||
text("ALTER TABLE elevation_dataset ADD COLUMN IF NOT EXISTS dataset_dir VARCHAR(2048)"),
|
||||
)
|
||||
connection.execute(
|
||||
text("UPDATE elevation_dataset SET dataset_dir = '/elevation/datasets/' || code WHERE dataset_dir IS NULL"),
|
||||
)
|
||||
connection.execute(
|
||||
text("ALTER TABLE elevation_dataset ALTER COLUMN dataset_dir SET NOT NULL"),
|
||||
)
|
||||
logger.warning(
|
||||
"Detected missing elevation_dataset.dataset_dir; added and backfilled from dataset code.",
|
||||
)
|
||||
|
||||
if "usage_status" not in column_names:
|
||||
connection.execute(
|
||||
text("ALTER TABLE elevation_dataset ADD COLUMN IF NOT EXISTS usage_status VARCHAR(32)"),
|
||||
)
|
||||
connection.execute(
|
||||
text("UPDATE elevation_dataset SET usage_status = 'idle' WHERE usage_status IS NULL"),
|
||||
)
|
||||
connection.execute(
|
||||
text("ALTER TABLE elevation_dataset ALTER COLUMN usage_status SET NOT NULL"),
|
||||
)
|
||||
logger.warning(
|
||||
"Detected missing elevation_dataset.usage_status; added with default 'idle'.",
|
||||
)
|
||||
|
||||
|
||||
def get_db() -> Generator[Session, None, None]:
|
||||
db = SessionLocal()
|
||||
try:
|
||||
@@ -222,6 +269,7 @@ def init_db() -> None:
|
||||
_ensure_user_pk_column_compatibility()
|
||||
_ensure_user_timestamp_column_compatibility()
|
||||
_ensure_user_audit_column_compatibility()
|
||||
_ensure_elevation_dataset_column_compatibility()
|
||||
Base.metadata.create_all(bind=engine)
|
||||
with SessionLocal() as db:
|
||||
local_hosts = {"db", "localhost", "127.0.0.1", "::1"}
|
||||
|
||||
@@ -18,6 +18,7 @@ class ElevationDataset(Base):
|
||||
__tablename__ = "elevation_dataset"
|
||||
__table_args__ = (
|
||||
Index("idx_elevation_dataset_status", "status"),
|
||||
Index("idx_elevation_dataset_usage_status", "usage_status"),
|
||||
Index("idx_elevation_dataset_mount_code", "mount_code"),
|
||||
)
|
||||
|
||||
@@ -31,9 +32,11 @@ class ElevationDataset(Base):
|
||||
source: Mapped[str | None] = mapped_column(String(128), index=True)
|
||||
file_format: Mapped[str] = mapped_column(String(32), default="csv", index=True)
|
||||
mount_code: Mapped[str] = mapped_column(String(64), nullable=False, index=True)
|
||||
dataset_dir: Mapped[str] = mapped_column(String(2048), nullable=False)
|
||||
file_path: Mapped[str] = mapped_column(String(2048), nullable=False)
|
||||
resolution_m: Mapped[float | None] = mapped_column(Float)
|
||||
status: Mapped[str] = mapped_column(String(32), default="active", index=True)
|
||||
usage_status: Mapped[str] = mapped_column(String(32), default="idle", index=True)
|
||||
sample_count: Mapped[int] = mapped_column(Integer, default=0)
|
||||
bbox_min_lon: Mapped[float | None] = mapped_column(Float)
|
||||
bbox_max_lon: Mapped[float | None] = mapped_column(Float)
|
||||
|
||||
@@ -7,6 +7,7 @@ from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
ElevationDatasetStatus = Literal["active", "disabled"]
|
||||
ElevationDatasetUsageStatus = Literal["idle", "in_use"]
|
||||
ElevationApplyMode = Literal["fill_null_only", "overwrite_all"]
|
||||
ElevationApplyJobStatus = Literal["pending", "running", "success", "failed"]
|
||||
|
||||
@@ -18,9 +19,11 @@ class ElevationDatasetSummary(BaseModel):
|
||||
source: str | None = None
|
||||
file_format: str
|
||||
mount_code: str
|
||||
dataset_dir: str
|
||||
file_path: str
|
||||
resolution_m: float | None = None
|
||||
status: ElevationDatasetStatus
|
||||
usage_status: ElevationDatasetUsageStatus
|
||||
sample_count: int = 0
|
||||
bbox_min_lon: float | None = None
|
||||
bbox_max_lon: float | None = None
|
||||
@@ -51,8 +54,8 @@ class ElevationDatasetCreateRequest(BaseModel):
|
||||
code: str = Field(min_length=2, max_length=64)
|
||||
name: str = Field(min_length=2, max_length=255)
|
||||
source: str | None = Field(default=None, max_length=128)
|
||||
mount_code: str = Field(min_length=2, max_length=64)
|
||||
file_path: str = Field(min_length=1, max_length=2048)
|
||||
mount_code: str | None = Field(default=None, min_length=2, max_length=64)
|
||||
file_name: str | None = Field(default=None, min_length=1, max_length=255)
|
||||
resolution_m: float | None = Field(default=None, gt=0)
|
||||
notes: str | None = Field(default=None, max_length=2000)
|
||||
|
||||
@@ -122,6 +125,17 @@ class ElevationDatasetPreviewResponse(BaseModel):
|
||||
warnings: list[str] = Field(default_factory=list)
|
||||
|
||||
|
||||
class ElevationDatasetDataImportResponse(BaseModel):
|
||||
dataset: ElevationDatasetSummary
|
||||
uploaded_file_count: int
|
||||
extracted_file_count: int
|
||||
imported_file_count: int
|
||||
analyzed: bool = False
|
||||
warning_count: int
|
||||
warnings: list[str] = Field(default_factory=list)
|
||||
imported_files: list[str] = Field(default_factory=list)
|
||||
|
||||
|
||||
class ElevationApplyJobSummary(BaseModel):
|
||||
id: str
|
||||
line_id: str
|
||||
|
||||
@@ -3,6 +3,8 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
import csv
|
||||
import io
|
||||
import mimetypes
|
||||
import zipfile
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from tempfile import NamedTemporaryFile
|
||||
@@ -25,6 +27,7 @@ from ..schemas.elevation import (
|
||||
ElevationApplyJobSummary,
|
||||
ElevationDatasetAnalyzeResponse,
|
||||
ElevationDatasetBatchImportResponse,
|
||||
ElevationDatasetDataImportResponse,
|
||||
ElevationDatasetPreviewCell,
|
||||
ElevationDatasetPreviewDiagnostics,
|
||||
ElevationDatasetCreateRequest,
|
||||
@@ -34,15 +37,16 @@ from ..schemas.elevation import (
|
||||
ElevationDatasetSummary,
|
||||
ElevationDatasetUpdateRequest,
|
||||
)
|
||||
from .file_service import _build_driver_or_400, _require_mount
|
||||
from .file_service import _build_driver_or_400, _require_mount, list_enabled_mounts
|
||||
from .push_service import publish_topic
|
||||
from .storage_driver import StorageInvalidPathError, StoragePathNotFoundError
|
||||
from .storage_driver import StorageDriverError, StorageInvalidPathError, StoragePathNotFoundError, join_virtual_path, normalize_virtual_path
|
||||
|
||||
ELEVATION_TOPIC = "admin.elevation"
|
||||
POWER_LINES_TOPIC = "admin.power-lines"
|
||||
CSV_ENCODINGS = ("utf-8-sig", "utf-8", "gbk", "latin-1")
|
||||
CSV_IMPORT_TEXT_ENCODINGS = ("utf-8-sig", "utf-8", "gbk", "latin-1")
|
||||
NEAREST_MATCH_MAX_DISTANCE_M = 2000.0
|
||||
ELEVATION_DATASET_ROOT = "/elevation/datasets"
|
||||
ELEVATION_FILE_EXT_FORMAT_MAP = {
|
||||
".csv": "csv",
|
||||
".img": "img",
|
||||
@@ -50,6 +54,8 @@ ELEVATION_FILE_EXT_FORMAT_MAP = {
|
||||
".tiff": "tiff",
|
||||
}
|
||||
RASTER_FILE_FORMATS = {"img", "tif", "tiff"}
|
||||
IMPORTABLE_ELEVATION_EXTENSIONS = set(ELEVATION_FILE_EXT_FORMAT_MAP.keys())
|
||||
IMPORTABLE_ARCHIVE_EXTENSIONS = {".zip"}
|
||||
MAX_SAMPLE_COUNT_INT = 2_147_483_647
|
||||
|
||||
|
||||
@@ -103,9 +109,11 @@ def serialize_dataset(item: ElevationDataset) -> ElevationDatasetSummary:
|
||||
source=item.source,
|
||||
file_format=item.file_format,
|
||||
mount_code=item.mount_code,
|
||||
dataset_dir=item.dataset_dir,
|
||||
file_path=item.file_path,
|
||||
resolution_m=item.resolution_m,
|
||||
status=item.status, # type: ignore[arg-type]
|
||||
usage_status=item.usage_status, # type: ignore[arg-type]
|
||||
sample_count=item.sample_count,
|
||||
bbox_min_lon=item.bbox_min_lon,
|
||||
bbox_max_lon=item.bbox_max_lon,
|
||||
@@ -205,25 +213,47 @@ def create_dataset(
|
||||
*,
|
||||
actor: User,
|
||||
) -> ElevationDatasetSummary | None:
|
||||
if get_dataset_by_code(db, payload.code):
|
||||
normalized_code = payload.code.strip()
|
||||
if get_dataset_by_code(db, normalized_code):
|
||||
return None
|
||||
|
||||
normalized_file_path = payload.file_path.strip()
|
||||
file_format = _detect_file_format(normalized_file_path)
|
||||
if file_format in RASTER_FILE_FORMATS:
|
||||
_require_rasterio_available()
|
||||
_ensure_dataset_file_exists(db, mount_code=payload.mount_code, file_path=normalized_file_path)
|
||||
mount_code = _resolve_dataset_mount_code(db, requested_mount_code=payload.mount_code)
|
||||
dataset_dir = _resolve_dataset_dir(normalized_code)
|
||||
normalized_file_path: str
|
||||
file_format: str
|
||||
has_bound_file = _normalize_str(payload.file_name) is not None
|
||||
if has_bound_file:
|
||||
normalized_file_path = _resolve_dataset_file_path(
|
||||
dataset_code=normalized_code,
|
||||
filename=payload.file_name,
|
||||
)
|
||||
file_format = _detect_file_format(normalized_file_path)
|
||||
if file_format in RASTER_FILE_FORMATS:
|
||||
_require_rasterio_available()
|
||||
_ensure_dataset_file_exists(
|
||||
db,
|
||||
mount_code=mount_code,
|
||||
file_path=normalized_file_path,
|
||||
)
|
||||
else:
|
||||
normalized_file_path = _resolve_dataset_file_path(
|
||||
dataset_code=normalized_code,
|
||||
filename="dataset.csv",
|
||||
)
|
||||
file_format = "csv"
|
||||
|
||||
now = utcnow()
|
||||
item = ElevationDataset(
|
||||
code=payload.code.strip(),
|
||||
code=normalized_code,
|
||||
name=payload.name.strip(),
|
||||
source=_normalize_str(payload.source),
|
||||
file_format=file_format,
|
||||
mount_code=payload.mount_code.strip(),
|
||||
mount_code=mount_code,
|
||||
dataset_dir=dataset_dir,
|
||||
file_path=normalized_file_path,
|
||||
resolution_m=payload.resolution_m,
|
||||
status="active",
|
||||
usage_status="idle",
|
||||
notes=_normalize_str(payload.notes),
|
||||
create_date=now,
|
||||
create_user=actor.id,
|
||||
@@ -259,7 +289,7 @@ def import_datasets_from_csv(
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="CSV 文件没有可导入的数据行")
|
||||
|
||||
stats = ElevationDatasetBatchImportStats()
|
||||
batch_created_ids: list[str] = []
|
||||
batch_created_ids: list[tuple[str, bool]] = []
|
||||
|
||||
for row_index, row in enumerate(rows, start=2):
|
||||
normalized_row = {
|
||||
@@ -273,15 +303,18 @@ def import_datasets_from_csv(
|
||||
code = _pick_csv_value(normalized_row, ["code", "编码"])
|
||||
name = _pick_csv_value(normalized_row, ["name", "名称"])
|
||||
mount_code = _pick_csv_value(normalized_row, ["mount_code", "挂载编码", "挂载"])
|
||||
file_name = _pick_csv_value(normalized_row, ["file_name", "文件名", "filename"])
|
||||
file_path = _pick_csv_value(normalized_row, ["file_path", "文件路径", "路径"])
|
||||
if not file_name and file_path:
|
||||
file_name = Path(file_path).name
|
||||
source = _pick_csv_value(normalized_row, ["source", "来源"])
|
||||
notes = _pick_csv_value(normalized_row, ["notes", "备注"])
|
||||
resolution_text = _pick_csv_value(normalized_row, ["resolution_m", "分辨率", "分辨率m", "分辨率(米)"])
|
||||
|
||||
if not code or not name or not mount_code or not file_path:
|
||||
if not code or not name:
|
||||
stats.skipped_count += 1
|
||||
if stats.warnings is not None:
|
||||
stats.warnings.append(f"第 {row_index} 行缺少必填字段(code/name/mount_code/file_path),已跳过")
|
||||
stats.warnings.append(f"第 {row_index} 行缺少必填字段(code/name),已跳过")
|
||||
continue
|
||||
|
||||
try:
|
||||
@@ -290,7 +323,7 @@ def import_datasets_from_csv(
|
||||
name=name,
|
||||
source=source,
|
||||
mount_code=mount_code,
|
||||
file_path=file_path,
|
||||
file_name=file_name,
|
||||
resolution_m=_parse_csv_optional_positive_float(resolution_text),
|
||||
notes=notes,
|
||||
)
|
||||
@@ -303,7 +336,7 @@ def import_datasets_from_csv(
|
||||
stats.imported_count += 1
|
||||
if stats.items is not None:
|
||||
stats.items.append(created)
|
||||
batch_created_ids.append(created.id)
|
||||
batch_created_ids.append((created.id, bool(_normalize_str(file_name))))
|
||||
except HTTPException as exc:
|
||||
stats.skipped_count += 1
|
||||
detail = str(exc.detail) if exc.detail else "未知错误"
|
||||
@@ -311,7 +344,9 @@ def import_datasets_from_csv(
|
||||
stats.warnings.append(f"第 {row_index} 行导入失败({code}):{detail}")
|
||||
continue
|
||||
|
||||
for dataset_id in batch_created_ids:
|
||||
for dataset_id, should_analyze in batch_created_ids:
|
||||
if not should_analyze:
|
||||
continue
|
||||
try:
|
||||
analyze_dataset(db, dataset_id=dataset_id, actor=actor)
|
||||
stats.analyzed_count += 1
|
||||
@@ -333,6 +368,107 @@ def import_datasets_from_csv(
|
||||
)
|
||||
|
||||
|
||||
def import_dataset_data_files(
|
||||
db: Session,
|
||||
*,
|
||||
dataset_id: str,
|
||||
files: list[UploadFile],
|
||||
actor: User,
|
||||
) -> ElevationDatasetDataImportResponse:
|
||||
dataset = get_dataset_by_id(db, dataset_id)
|
||||
if not dataset:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="高程数据集不存在")
|
||||
if dataset.status != "active":
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="高程数据集未启用")
|
||||
if not files:
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="请至少上传一个文件")
|
||||
|
||||
mount = _require_mount(db, dataset.mount_code)
|
||||
driver = _build_driver_or_400(mount)
|
||||
dataset_dir = _resolve_dataset_dir(dataset.code)
|
||||
_ensure_dataset_directory(driver=driver, dataset_dir=dataset_dir)
|
||||
|
||||
uploaded_file_count = 0
|
||||
extracted_file_count = 0
|
||||
warnings: list[str] = []
|
||||
imported_files: list[str] = []
|
||||
|
||||
for upload in files:
|
||||
filename = (upload.filename or "").strip()
|
||||
suffix = Path(filename).suffix.lower()
|
||||
if not filename:
|
||||
warnings.append("存在空文件名上传项,已跳过")
|
||||
continue
|
||||
|
||||
if suffix in IMPORTABLE_ARCHIVE_EXTENSIONS:
|
||||
archive_bytes = _read_upload_content(upload)
|
||||
zip_result = _extract_zip_to_dataset_directory(
|
||||
driver=driver,
|
||||
dataset_dir=dataset_dir,
|
||||
zip_content=archive_bytes,
|
||||
)
|
||||
extracted_file_count += zip_result["extracted_count"]
|
||||
imported_files.extend(zip_result["imported_files"])
|
||||
warnings.extend(zip_result["warnings"])
|
||||
continue
|
||||
|
||||
if suffix not in IMPORTABLE_ELEVATION_EXTENSIONS:
|
||||
warnings.append(f"文件 {filename} 类型不支持,已跳过(仅支持 csv/img/tif/tiff/zip)")
|
||||
continue
|
||||
|
||||
content = _read_upload_content(upload)
|
||||
target_path = _write_dataset_file(
|
||||
driver=driver,
|
||||
dataset_dir=dataset_dir,
|
||||
filename=filename,
|
||||
content=content,
|
||||
content_type=upload.content_type,
|
||||
)
|
||||
uploaded_file_count += 1
|
||||
imported_files.append(target_path)
|
||||
|
||||
preferred_file_path = _pick_preferred_dataset_file_path(paths=imported_files)
|
||||
if preferred_file_path is None:
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="未导入任何可用高程文件")
|
||||
|
||||
dataset.dataset_dir = dataset_dir
|
||||
dataset.file_path = preferred_file_path
|
||||
dataset.file_format = _detect_file_format(preferred_file_path)
|
||||
dataset.usage_status = "idle"
|
||||
dataset.update_user = actor.id
|
||||
dataset.update_date = utcnow()
|
||||
db.commit()
|
||||
|
||||
analyzed = False
|
||||
try:
|
||||
analyze_dataset(db, dataset_id=dataset.id, actor=actor)
|
||||
analyzed = True
|
||||
except HTTPException as exc:
|
||||
detail = str(exc.detail) if exc.detail else "未知错误"
|
||||
warnings.append(f"自动分析失败:{detail}")
|
||||
except Exception as exc: # pragma: no cover
|
||||
warnings.append(f"自动分析异常:{exc}")
|
||||
|
||||
refreshed = get_dataset_by_id(db, dataset.id)
|
||||
if refreshed is None:
|
||||
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="数据集刷新失败")
|
||||
|
||||
_publish_elevation_change(
|
||||
"elevation.dataset.data_imported",
|
||||
{"action": "dataset_data_imported", "dataset_id": refreshed.id},
|
||||
)
|
||||
return ElevationDatasetDataImportResponse(
|
||||
dataset=serialize_dataset(refreshed),
|
||||
uploaded_file_count=uploaded_file_count,
|
||||
extracted_file_count=extracted_file_count,
|
||||
imported_file_count=len(imported_files),
|
||||
analyzed=analyzed,
|
||||
warning_count=len(warnings),
|
||||
warnings=warnings,
|
||||
imported_files=sorted(set(imported_files)),
|
||||
)
|
||||
|
||||
|
||||
def update_dataset(
|
||||
db: Session,
|
||||
dataset_id: str,
|
||||
@@ -590,6 +726,8 @@ def create_apply_job(
|
||||
if not latest:
|
||||
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="任务派发失败")
|
||||
|
||||
_refresh_dataset_usage_status(db, dataset_id=latest.dataset_id)
|
||||
|
||||
_publish_elevation_change(
|
||||
"elevation.job.created",
|
||||
{"action": "job_created", "job_id": latest.id, "line_id": latest.line_id},
|
||||
@@ -616,6 +754,7 @@ def execute_apply_job(job_id: str) -> None:
|
||||
job.started_at = utcnow()
|
||||
job.update_date = utcnow()
|
||||
db.commit()
|
||||
_refresh_dataset_usage_status(db, dataset_id=job.dataset_id)
|
||||
_publish_elevation_change(
|
||||
"elevation.job.running",
|
||||
{"action": "job_running", "job_id": job.id, "line_id": job.line_id},
|
||||
@@ -629,6 +768,7 @@ def execute_apply_job(job_id: str) -> None:
|
||||
job.finished_at = utcnow()
|
||||
job.update_date = utcnow()
|
||||
db.commit()
|
||||
_refresh_dataset_usage_status(db, dataset_id=job.dataset_id)
|
||||
_publish_elevation_change(
|
||||
"elevation.job.failed",
|
||||
{"action": "job_failed", "job_id": job.id, "line_id": job.line_id},
|
||||
@@ -658,6 +798,8 @@ def execute_apply_job(job_id: str) -> None:
|
||||
detail=f"不支持的高程文件格式: {file_format}",
|
||||
)
|
||||
|
||||
_refresh_dataset_usage_status(db, dataset_id=dataset.id)
|
||||
|
||||
warning_note = "; ".join(warnings[:5]) if warnings else None
|
||||
job.updated_tower_count = stats["updated_tower_count"]
|
||||
job.skipped_tower_count = stats["skipped_tower_count"]
|
||||
@@ -668,6 +810,7 @@ def execute_apply_job(job_id: str) -> None:
|
||||
job.finished_at = utcnow()
|
||||
job.update_date = utcnow()
|
||||
db.commit()
|
||||
_refresh_dataset_usage_status(db, dataset_id=job.dataset_id)
|
||||
|
||||
_publish_elevation_change(
|
||||
"elevation.job.success",
|
||||
@@ -692,6 +835,7 @@ def execute_apply_job(job_id: str) -> None:
|
||||
failed.finished_at = utcnow()
|
||||
failed.update_date = utcnow()
|
||||
db.commit()
|
||||
_refresh_dataset_usage_status(db, dataset_id=failed.dataset_id)
|
||||
_publish_elevation_change(
|
||||
"elevation.job.failed",
|
||||
{"action": "job_failed", "job_id": failed.id, "line_id": failed.line_id},
|
||||
@@ -704,14 +848,197 @@ def execute_apply_job(job_id: str) -> None:
|
||||
def _ensure_dataset_file_exists(db: Session, *, mount_code: str, file_path: str) -> None:
|
||||
mount = _require_mount(db, mount_code.strip())
|
||||
driver = _build_driver_or_400(mount)
|
||||
normalized_path = normalize_virtual_path(file_path.strip())
|
||||
try:
|
||||
driver.read_file(file_path.strip())
|
||||
driver.read_file(normalized_path)
|
||||
except StoragePathNotFoundError as exc:
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"数据文件不存在: {file_path}") from exc
|
||||
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"数据文件不存在: {normalized_path}") from exc
|
||||
except StorageInvalidPathError as exc:
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
|
||||
|
||||
|
||||
def _resolve_dataset_mount_code(db: Session, *, requested_mount_code: str | None) -> str:
|
||||
code = _normalize_str(requested_mount_code)
|
||||
if code is not None:
|
||||
_require_mount(db, code)
|
||||
return code
|
||||
|
||||
mounts = list_enabled_mounts(db)
|
||||
if not mounts:
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="无可用文件挂载点")
|
||||
return mounts[0].code
|
||||
|
||||
|
||||
def _resolve_dataset_dir(dataset_code: str) -> str:
|
||||
normalized_code = _normalize_dataset_code(dataset_code)
|
||||
return f"{ELEVATION_DATASET_ROOT}/{normalized_code}"
|
||||
|
||||
|
||||
def _resolve_dataset_file_path(*, dataset_code: str, filename: str | None) -> str:
|
||||
normalized_name = _normalize_dataset_filename(filename)
|
||||
dataset_dir = _resolve_dataset_dir(dataset_code)
|
||||
return join_virtual_path(dataset_dir, normalized_name)
|
||||
|
||||
|
||||
def _normalize_dataset_code(value: str) -> str:
|
||||
code = value.strip()
|
||||
if not code:
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="数据集编码不能为空")
|
||||
if any(char in code for char in ("/", "\\")):
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="数据集编码不能包含路径分隔符")
|
||||
return code
|
||||
|
||||
|
||||
def _normalize_dataset_filename(value: str | None) -> str:
|
||||
name = (value or "").strip()
|
||||
if not name:
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="文件名不能为空")
|
||||
name = Path(name).name.strip()
|
||||
if not name or name in {".", ".."}:
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="文件名不合法")
|
||||
suffix = Path(name).suffix.lower()
|
||||
if suffix not in IMPORTABLE_ELEVATION_EXTENSIONS:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_400_BAD_REQUEST,
|
||||
detail=f"不支持的高程文件类型: {suffix or 'unknown'},仅支持 .csv/.img/.tif/.tiff",
|
||||
)
|
||||
return name
|
||||
|
||||
|
||||
def _ensure_dataset_directory(*, driver: Any, dataset_dir: str) -> None:
|
||||
try:
|
||||
normalized_dir = normalize_virtual_path(dataset_dir)
|
||||
driver.ensure_directory(ELEVATION_DATASET_ROOT)
|
||||
driver.ensure_directory(normalized_dir)
|
||||
except StorageInvalidPathError as exc:
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
|
||||
except StorageDriverError as exc:
|
||||
raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail=str(exc)) from exc
|
||||
|
||||
|
||||
def _read_upload_content(upload: UploadFile) -> bytes:
|
||||
name = (upload.filename or "-").strip() or "-"
|
||||
try:
|
||||
content = upload.file.read()
|
||||
except Exception as exc:
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"读取上传文件失败:{exc}") from exc
|
||||
finally:
|
||||
try:
|
||||
upload.file.close()
|
||||
except Exception:
|
||||
pass
|
||||
if not content:
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"上传文件为空:{name}")
|
||||
return content
|
||||
|
||||
|
||||
def _write_dataset_file(
|
||||
*,
|
||||
driver: Any,
|
||||
dataset_dir: str,
|
||||
filename: str,
|
||||
content: bytes,
|
||||
content_type: str | None,
|
||||
) -> str:
|
||||
normalized_name = _normalize_dataset_filename(filename)
|
||||
target_path = join_virtual_path(dataset_dir, normalized_name)
|
||||
try:
|
||||
driver.write_file(
|
||||
target_path,
|
||||
content=content,
|
||||
content_type=content_type or mimetypes.guess_type(normalized_name)[0],
|
||||
)
|
||||
except StorageInvalidPathError as exc:
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(exc)) from exc
|
||||
except StorageDriverError as exc:
|
||||
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(exc)) from exc
|
||||
return target_path
|
||||
|
||||
|
||||
def _extract_zip_to_dataset_directory(
|
||||
*,
|
||||
driver: Any,
|
||||
dataset_dir: str,
|
||||
zip_content: bytes,
|
||||
) -> dict[str, Any]:
|
||||
warnings: list[str] = []
|
||||
imported_files: list[str] = []
|
||||
extracted_count = 0
|
||||
try:
|
||||
with zipfile.ZipFile(io.BytesIO(zip_content)) as archive:
|
||||
for member in archive.infolist():
|
||||
if member.is_dir():
|
||||
continue
|
||||
member_name = Path(member.filename).name
|
||||
if not member_name:
|
||||
warnings.append(f"压缩包条目 {member.filename} 文件名无效,已跳过")
|
||||
continue
|
||||
suffix = Path(member_name).suffix.lower()
|
||||
if suffix not in IMPORTABLE_ELEVATION_EXTENSIONS:
|
||||
warnings.append(f"压缩包条目 {member_name} 类型不支持,已跳过")
|
||||
continue
|
||||
try:
|
||||
data = archive.read(member)
|
||||
except Exception as exc:
|
||||
warnings.append(f"压缩包条目 {member_name} 读取失败:{exc}")
|
||||
continue
|
||||
if not data:
|
||||
warnings.append(f"压缩包条目 {member_name} 内容为空,已跳过")
|
||||
continue
|
||||
path = _write_dataset_file(
|
||||
driver=driver,
|
||||
dataset_dir=dataset_dir,
|
||||
filename=member_name,
|
||||
content=data,
|
||||
content_type=mimetypes.guess_type(member_name)[0],
|
||||
)
|
||||
imported_files.append(path)
|
||||
extracted_count += 1
|
||||
except zipfile.BadZipFile as exc:
|
||||
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"ZIP 文件损坏:{exc}") from exc
|
||||
return {
|
||||
"warnings": warnings,
|
||||
"imported_files": imported_files,
|
||||
"extracted_count": extracted_count,
|
||||
}
|
||||
|
||||
|
||||
def _pick_preferred_dataset_file_path(*, paths: list[str]) -> str | None:
|
||||
if not paths:
|
||||
return None
|
||||
for extension in (".img", ".tif", ".tiff", ".csv"):
|
||||
for path in paths:
|
||||
if path.lower().endswith(extension):
|
||||
return path
|
||||
return paths[0]
|
||||
|
||||
|
||||
def _refresh_dataset_usage_status(db: Session, *, dataset_id: str) -> None:
|
||||
dataset = get_dataset_by_id(db, dataset_id)
|
||||
if dataset is None:
|
||||
return
|
||||
running_count = int(
|
||||
db.scalar(
|
||||
select(func.count())
|
||||
.select_from(ElevationApplyJob)
|
||||
.where(
|
||||
ElevationApplyJob.dataset_id == dataset_id,
|
||||
ElevationApplyJob.status.in_(("pending", "running")),
|
||||
)
|
||||
)
|
||||
or 0
|
||||
)
|
||||
next_status = "in_use" if running_count > 0 else "idle"
|
||||
if dataset.usage_status != next_status:
|
||||
dataset.usage_status = next_status
|
||||
dataset.update_date = utcnow()
|
||||
db.commit()
|
||||
_publish_elevation_change(
|
||||
"elevation.dataset.usage_status_changed",
|
||||
{"action": "dataset_usage_status_changed", "dataset_id": dataset_id},
|
||||
)
|
||||
|
||||
|
||||
def _load_dataset_points(
|
||||
db: Session,
|
||||
dataset: ElevationDataset,
|
||||
|
||||
Reference in New Issue
Block a user