#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Excel服务部门层级信息补充脚本 功能: 1. 读取Excel文件,为每条记录添加"绑定服务部门"和"服务部门所属部门"两列 2. 根据层级规则和区名关键词自动判断上级部门 3. 生成统计报告并保存补充后的Excel文件 """ import pandas as pd import sys from pathlib import Path from collections import Counter # 区名关键词映射表 REGION_KEYWORDS = { '禅城区': '禅城区服务部门', '南海区': '南海区服务部门', '顺德区': '顺德区服务部门', '三水区': '三水区服务部门', '高明区': '高明区服务部门' } MUNICIPAL_DEPT = '市级服务部门' def extract_region_service_dept(unit_name): """ 从单位名称中提取区级服务部门 Args: unit_name: 单位名称字符串 Returns: 对应的区级服务部门名称,如果未找到则返回None """ if pd.isna(unit_name): return None unit_name_str = str(unit_name) # 按优先级顺序匹配区名关键词 for keyword, service_dept in REGION_KEYWORDS.items(): if keyword in unit_name_str: return service_dept return None def determine_parent_department(unit_name, level_value): """ 确定服务部门所属部门 规则: 1. 如果层级值包含"市级" → 市级服务部门 2. 如果单位名称包含区名关键词 → 对应区级服务部门 3. 否则默认为市级服务部门(兜底策略) Args: unit_name: 单位名称 level_value: 层级/风险提示/备注列的值 Returns: 服务部门所属部门名称 """ # 规则1: 检查层级值是否包含"市级" if pd.notna(level_value) and '市级' in str(level_value): return MUNICIPAL_DEPT # 规则2: 从单位名称中提取区级服务部门 region_dept = extract_region_service_dept(unit_name) if region_dept: return region_dept # 规则3: 兜底策略 - 默认为市级服务部门 return MUNICIPAL_DEPT def supplement_excel_file(input_path, output_path): """ 补充Excel文件的部门层级信息 Args: input_path: 输入Excel文件路径 output_path: 输出Excel文件路径 """ print(f"[INFO] Reading Excel file: {input_path}") # 读取Excel文件 try: df = pd.read_excel(input_path) print(f"[OK] Successfully read {len(df)} records") except Exception as e: print(f"[ERROR] Failed to read Excel file: {e}") sys.exit(1) # 显示原始列名 print(f"\n[INFO] Original columns ({len(df.columns)} columns):") for i, col in enumerate(df.columns): print(f" {i+1}. {col}") # 添加"绑定服务部门"列(直接复制"单位名称"列) # 假设第二列是"单位名称" unit_name_col = df.columns[1] # 第二列 df['绑定服务部门'] = df[unit_name_col] # 添加"服务部门所属部门"列(根据规则计算) level_col = df.columns[4] # "层级/风险提示/备注"列(第5列) df['服务部门所属部门'] = df.apply( lambda row: determine_parent_department(row[unit_name_col], row[level_col]), axis=1 ) print(f"\n[OK] Added 2 columns:") print(f" - Binding Service Department (copied from '{unit_name_col}')") print(f" - Parent Service Department (calculated based on hierarchy rules)") # 生成统计报告 print(f"\n[STATS] Parent Service Department Distribution:") parent_dept_counts = Counter(df['服务部门所属部门']) # 按标准顺序显示 standard_order = [MUNICIPAL_DEPT] + list(REGION_KEYWORDS.values()) for dept in standard_order: count = parent_dept_counts.get(dept, 0) percentage = (count / len(df) * 100) if len(df) > 0 else 0 print(f" {dept}: {count} 条 ({percentage:.1f}%)") # 检查无法匹配的记录 unmapped_records = df[ (~df[level_col].astype(str).str.contains('市级', na=False)) & (~df[unit_name_col].astype(str).str.contains('|'.join(REGION_KEYWORDS.keys()), na=False)) ] if len(unmapped_records) > 0: print(f"\n[WARNING] Found {len(unmapped_records)} records that could not be matched by region keywords, assigned to Municipal Service Department by default:") print(" Sample records:") for idx, row in unmapped_records.head(5).iterrows(): print(f" - {row[unit_name_col]} (层级: {row[level_col]})") if len(unmapped_records) > 5: print(f" ... 还有 {len(unmapped_records) - 5} 条") # 保存补充后的Excel文件 print(f"\n[SAVE] Saving supplemented Excel file: {output_path}") try: df.to_excel(output_path, index=False, engine='openpyxl') print(f"[OK] Successfully saved {len(df)} records to {output_path}") except Exception as e: print(f"[ERROR] Failed to save Excel file: {e}") sys.exit(1) # 验证数据完整性 print(f"\n[VERIFY] Data Integrity Validation:") null_binding = df['绑定服务部门'].isna().sum() null_parent = df['服务部门所属部门'].isna().sum() print(f" Total records: {len(df)}") print(f" Null values in Binding Service Department: {null_binding}") print(f" Null values in Parent Service Department: {null_parent}") print(f" Total columns: {len(df.columns)}") if null_binding == 0 and null_parent == 0: print(f"[OK] Data integrity validation passed") else: print(f"[WARNING] Found null values, please check data source") # 显示示例数据 print(f"\n[DATA] Sample data (first 5 rows):") sample_cols = [unit_name_col, level_col, '绑定服务部门', '服务部门所属部门'] print(df[sample_cols].head(10).to_string(index=False)) return df def main(): """主函数""" # 定义文件路径 base_path = Path(__file__).parent.parent input_file = base_path / 'data' / '法律风险提示管理员名单_合并-20260309.xlsx' output_file = base_path / 'data' / '法律风险提示管理员名单_合并-20260309_supplemented.xlsx' print("=" * 80) print("Excel服务部门层级信息补充工具") print("=" * 80) # 检查输入文件是否存在 if not input_file.exists(): print(f"[ERROR] Input file does not exist: {input_file}") sys.exit(1) # 执行补充处理 df = supplement_excel_file(str(input_file), str(output_file)) print("\n" + "=" * 80) print("[OK] Processing completed!") print(f"[FILE] Output file: {output_file}") print("=" * 80) if __name__ == '__main__': main()