fix: honor permit overrides and bindings

This commit is contained in:
Codex Agent 2025-11-14 11:43:43 +08:00
parent 0076d2db2f
commit 90fa969046
2 changed files with 160 additions and 21 deletions

View File

@ -210,6 +210,26 @@ def _clean_text(value: Any) -> str:
return str(value).strip()
def _normalize_permit_token(value: Any) -> str:
"""Normalize permit names for dictionary lookups (case/whitespace insensitive)."""
text = _clean_text(value)
if not text:
return ""
return text.lower()
def _permit_name_aliases(value: Any) -> Set[str]:
"""Return canonical + token aliases for a permit name."""
aliases: Set[str] = set()
canonical = _clean_text(value)
token = _normalize_permit_token(value)
if canonical:
aliases.add(canonical)
if token and token != canonical:
aliases.add(token)
return aliases
def _normalize_theme_binding_value(value: Any) -> str:
"""Normalize theme binding payload values (including virtual markers)."""
text = _clean_text(value)
@ -318,6 +338,16 @@ def _canonicalize_region_label(label: str) -> str:
return text
def _normalize_sheet_token(value: Any) -> str:
"""Normalize sheet/region identifiers for override maps."""
text = _clean_text(value)
if not text:
return ""
canonical = _canonicalize_region_label(text)
token_source = canonical or text
return token_source.lower()
def _normalize_import_row(
raw_row: Dict[str, Any],
sheet_name: str,
@ -690,7 +720,13 @@ def start_permit_import_session(
)
for region_id, permit_id, permit_name in cur.fetchall():
rid = str(region_id)
permit_lookup.setdefault(rid, {})[str(permit_name)] = str(permit_id)
canonical_name = _clean_text(permit_name)
normalized_permit = _normalize_permit_token(permit_name)
permit_id_str = str(permit_id)
if canonical_name:
permit_lookup.setdefault(rid, {})[canonical_name] = permit_id_str
if normalized_permit:
permit_lookup.setdefault(rid, {})[normalized_permit] = permit_id_str
sheet_summaries: List[Dict[str, Any]] = []
session_sheets: Dict[str, Dict[str, Any]] = {}
@ -707,10 +743,18 @@ def start_permit_import_session(
existing_permits = permit_lookup.get(region_id_str or "", {})
duplicate_permits = sorted(
[name for name in permit_groups.keys() if name in existing_permits]
[
name
for name in permit_groups.keys()
if _normalize_permit_token(name) in existing_permits
]
)
new_permits = sorted(
[name for name in permit_groups.keys() if name not in existing_permits]
[
name
for name in permit_groups.keys()
if _normalize_permit_token(name) not in existing_permits
]
)
logger.info(
@ -903,7 +947,16 @@ def _fetch_region_permit_name_map(conn: pg.Connection, region_id: str) -> Dict[s
""",
(region_id,),
)
return {str(name): str(pid) for name, pid in cur.fetchall()}
mapping: Dict[str, str] = {}
for name, pid in cur.fetchall():
canonical = _clean_text(name)
token = _normalize_permit_token(name)
permit_id = str(pid)
if canonical:
mapping[canonical] = permit_id
if token:
mapping[token] = permit_id
return mapping
def _backup_permit_before_import(
@ -1045,33 +1098,44 @@ def commit_permit_import_session(
overrides_map: Dict[str, Set[str]] = {}
if overrides:
for sheet_key, permit_names in overrides.items():
sheet_token = _clean_text(sheet_key)
sheet_token = _normalize_sheet_token(sheet_key) or _clean_text(sheet_key)
if not sheet_token:
continue
overrides_map[sheet_token] = {
_clean_text(name) for name in (permit_names or []) if _clean_text(name)
}
bucket = overrides_map.setdefault(sheet_token, set())
for raw_name in permit_names or []:
aliases = _permit_name_aliases(raw_name)
if aliases:
bucket.update(aliases)
theme_binding_map: Dict[str, Dict[str, List[str]]] = {}
if theme_bindings:
for sheet_key, permit_map in theme_bindings.items():
sheet_token = _clean_text(sheet_key)
sheet_token = _normalize_sheet_token(sheet_key) or _clean_text(sheet_key)
if not sheet_token:
continue
permit_binding: Dict[str, List[str]] = {}
permit_binding = theme_binding_map.setdefault(sheet_token, {})
for permit_key, theme_values in (permit_map or {}).items():
permit_token = _clean_text(permit_key)
if not permit_token:
canonical_permit = _clean_text(permit_key)
permit_token = _normalize_permit_token(permit_key)
if not canonical_permit and not permit_token:
continue
normalized_themes: List[str] = []
for raw_theme in theme_values or []:
normalized = _normalize_theme_binding_value(raw_theme)
if normalized:
normalized_themes.append(normalized)
if normalized_themes:
if not normalized_themes:
continue
if canonical_permit:
permit_binding[canonical_permit] = normalized_themes
if permit_token and permit_token != canonical_permit:
permit_binding[permit_token] = normalized_themes
if permit_binding:
theme_binding_map[sheet_token] = permit_binding
if overrides_map:
override_debug = ", ".join(f"{sheet}:{len(names)}" for sheet, names in overrides_map.items())
logger.info("[PERMIT-IMPORT] Confirmed overrides => %s", override_debug)
else:
logger.info("[PERMIT-IMPORT] No override confirmations supplied")
default_change_summary = change_summary or (f"Excel导入{workbook_filename}" if workbook_filename else "Excel导入")
@ -1130,7 +1194,18 @@ def commit_permit_import_session(
sheet_data["existing_permits"] = {}
existing_permits = dict(sheet_data.get("existing_permits", {}))
override_set = overrides_map.get(sheet_name, set())
sheet_clean_name = _clean_text(sheet_name)
sheet_token = _normalize_sheet_token(sheet_name) or sheet_clean_name
override_set = overrides_map.get(sheet_token)
if override_set is None and sheet_clean_name != sheet_token:
override_set = overrides_map.get(sheet_clean_name)
if override_set is None:
override_set = set()
binding_sheet_map = theme_binding_map.get(sheet_token)
if binding_sheet_map is None and sheet_clean_name != sheet_token:
binding_sheet_map = theme_binding_map.get(sheet_clean_name)
if binding_sheet_map is None:
binding_sheet_map = {}
permit_groups: Dict[str, List[Dict[str, Any]]] = sheet_data.get("permit_groups", {})
sheet_snapshot_count = 0
@ -1141,11 +1216,12 @@ def commit_permit_import_session(
for permit_name, permit_rows in permit_groups.items():
canonical_permit_name = _clean_text(permit_name)
if not canonical_permit_name:
permit_token = _normalize_permit_token(permit_name)
if not canonical_permit_name or not permit_token:
continue
permit_id = existing_permits.get(canonical_permit_name)
should_override = canonical_permit_name in override_set
permit_id = existing_permits.get(permit_token) or existing_permits.get(canonical_permit_name)
should_override = (permit_token in override_set) or (canonical_permit_name in override_set)
permit_modified = False
if permit_id and not should_override:
@ -1187,7 +1263,8 @@ def commit_permit_import_session(
permit_modified = True
else:
permit_id = _ensure_permit(conn, canonical_permit_name)
existing_permits[canonical_permit_name] = permit_id
for alias in _permit_name_aliases(canonical_permit_name) or {canonical_permit_name}:
existing_permits[alias] = permit_id
sheet_created.append(canonical_permit_name)
result["created_permits"].append(
{
@ -1198,7 +1275,11 @@ def commit_permit_import_session(
)
permit_modified = True
binding_override = theme_binding_map.get(sheet_name, {}).get(canonical_permit_name)
binding_override = None
for alias in _permit_name_aliases(canonical_permit_name) or {canonical_permit_name}:
binding_override = binding_sheet_map.get(alias)
if binding_override:
break
override_theme_names: Set[str] = set()
binds_all_themes = False
if binding_override:

View File

@ -0,0 +1,58 @@
from __future__ import annotations
from io import BytesIO
from openpyxl import Workbook
from lawrisk.services import licensing_repo
def _build_workbook() -> bytes:
wb = Workbook()
ws = wb.active
ws.title = "市级"
ws.append(["许可事项", "主题", "风险提示"])
ws.append([
"成品油、危险化学品经营许可",
"市场监管、应急管理",
"测试风险",
])
buf = BytesIO()
wb.save(buf)
return buf.getvalue()
def test_split_multi_value_does_not_use_dunhao_by_default() -> None:
assert licensing_repo._split_multi_value("甲、乙") == ["甲、乙"]
assert licensing_repo._split_multi_value("甲、乙", allow_dunhao=True) == ["", ""]
def test_parse_workbook_keeps_permit_name_with_dunhao() -> None:
file_bytes = _build_workbook()
parsed = licensing_repo._parse_import_workbook(file_bytes, "sample.xlsx")
sheet = parsed["sheets"]["市级"]
rows = sheet["rows"]
assert len(rows) == 1
row = rows[0]
assert row["permit_name"] == "成品油、危险化学品经营许可"
# Themes still split because allow_dunhao=True for that field
assert row["theme_names"] == ["市场监管", "应急管理"]
def test_normalize_theme_binding_value_maps_all_theme_marker() -> None:
assert licensing_repo._normalize_theme_binding_value("__ALL_THEMES__") == licensing_repo.ALL_THEMES_SENTINEL
assert licensing_repo._normalize_theme_binding_value("所有主题") == licensing_repo.ALL_THEMES_SENTINEL
def test_normalize_permit_token_lowercases_ascii_names() -> None:
assert licensing_repo._normalize_permit_token(" PermitX ") == "permitx"
def test_normalize_sheet_token_canonicalizes_alias_names() -> None:
assert licensing_repo._normalize_sheet_token("禅城区(无意见)") == "禅城区"
def test_permit_name_aliases_cover_canonical_and_token() -> None:
aliases = licensing_repo._permit_name_aliases(" 许可A ")
assert "许可A" in aliases
assert "许可a" in aliases