206 lines
6.6 KiB
Python
206 lines
6.6 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
Excel服务部门层级信息补充脚本
|
||
|
||
功能:
|
||
1. 读取Excel文件,为每条记录添加"绑定服务部门"和"服务部门所属部门"两列
|
||
2. 根据层级规则和区名关键词自动判断上级部门
|
||
3. 生成统计报告并保存补充后的Excel文件
|
||
"""
|
||
|
||
import pandas as pd
|
||
import sys
|
||
from pathlib import Path
|
||
from collections import Counter
|
||
|
||
|
||
# 区名关键词映射表
|
||
REGION_KEYWORDS = {
|
||
'禅城区': '禅城区服务部门',
|
||
'南海区': '南海区服务部门',
|
||
'顺德区': '顺德区服务部门',
|
||
'三水区': '三水区服务部门',
|
||
'高明区': '高明区服务部门'
|
||
}
|
||
|
||
MUNICIPAL_DEPT = '市级服务部门'
|
||
|
||
|
||
def extract_region_service_dept(unit_name):
|
||
"""
|
||
从单位名称中提取区级服务部门
|
||
|
||
Args:
|
||
unit_name: 单位名称字符串
|
||
|
||
Returns:
|
||
对应的区级服务部门名称,如果未找到则返回None
|
||
"""
|
||
if pd.isna(unit_name):
|
||
return None
|
||
|
||
unit_name_str = str(unit_name)
|
||
|
||
# 按优先级顺序匹配区名关键词
|
||
for keyword, service_dept in REGION_KEYWORDS.items():
|
||
if keyword in unit_name_str:
|
||
return service_dept
|
||
|
||
return None
|
||
|
||
|
||
def determine_parent_department(unit_name, level_value):
|
||
"""
|
||
确定服务部门所属部门
|
||
|
||
规则:
|
||
1. 如果层级值包含"市级" → 市级服务部门
|
||
2. 如果单位名称包含区名关键词 → 对应区级服务部门
|
||
3. 否则默认为市级服务部门(兜底策略)
|
||
|
||
Args:
|
||
unit_name: 单位名称
|
||
level_value: 层级/风险提示/备注列的值
|
||
|
||
Returns:
|
||
服务部门所属部门名称
|
||
"""
|
||
# 规则1: 检查层级值是否包含"市级"
|
||
if pd.notna(level_value) and '市级' in str(level_value):
|
||
return MUNICIPAL_DEPT
|
||
|
||
# 规则2: 从单位名称中提取区级服务部门
|
||
region_dept = extract_region_service_dept(unit_name)
|
||
if region_dept:
|
||
return region_dept
|
||
|
||
# 规则3: 兜底策略 - 默认为市级服务部门
|
||
return MUNICIPAL_DEPT
|
||
|
||
|
||
def supplement_excel_file(input_path, output_path):
|
||
"""
|
||
补充Excel文件的部门层级信息
|
||
|
||
Args:
|
||
input_path: 输入Excel文件路径
|
||
output_path: 输出Excel文件路径
|
||
"""
|
||
print(f"[INFO] Reading Excel file: {input_path}")
|
||
|
||
# 读取Excel文件
|
||
try:
|
||
df = pd.read_excel(input_path)
|
||
print(f"[OK] Successfully read {len(df)} records")
|
||
except Exception as e:
|
||
print(f"[ERROR] Failed to read Excel file: {e}")
|
||
sys.exit(1)
|
||
|
||
# 显示原始列名
|
||
print(f"\n[INFO] Original columns ({len(df.columns)} columns):")
|
||
for i, col in enumerate(df.columns):
|
||
print(f" {i+1}. {col}")
|
||
|
||
# 添加"绑定服务部门"列(直接复制"单位名称"列)
|
||
# 假设第二列是"单位名称"
|
||
unit_name_col = df.columns[1] # 第二列
|
||
df['绑定服务部门'] = df[unit_name_col]
|
||
|
||
# 添加"服务部门所属部门"列(根据规则计算)
|
||
level_col = df.columns[4] # "层级/风险提示/备注"列(第5列)
|
||
|
||
df['服务部门所属部门'] = df.apply(
|
||
lambda row: determine_parent_department(row[unit_name_col], row[level_col]),
|
||
axis=1
|
||
)
|
||
|
||
print(f"\n[OK] Added 2 columns:")
|
||
print(f" - Binding Service Department (copied from '{unit_name_col}')")
|
||
print(f" - Parent Service Department (calculated based on hierarchy rules)")
|
||
|
||
# 生成统计报告
|
||
print(f"\n[STATS] Parent Service Department Distribution:")
|
||
parent_dept_counts = Counter(df['服务部门所属部门'])
|
||
|
||
# 按标准顺序显示
|
||
standard_order = [MUNICIPAL_DEPT] + list(REGION_KEYWORDS.values())
|
||
for dept in standard_order:
|
||
count = parent_dept_counts.get(dept, 0)
|
||
percentage = (count / len(df) * 100) if len(df) > 0 else 0
|
||
print(f" {dept}: {count} 条 ({percentage:.1f}%)")
|
||
|
||
# 检查无法匹配的记录
|
||
unmapped_records = df[
|
||
(~df[level_col].astype(str).str.contains('市级', na=False)) &
|
||
(~df[unit_name_col].astype(str).str.contains('|'.join(REGION_KEYWORDS.keys()), na=False))
|
||
]
|
||
|
||
if len(unmapped_records) > 0:
|
||
print(f"\n[WARNING] Found {len(unmapped_records)} records that could not be matched by region keywords, assigned to Municipal Service Department by default:")
|
||
print(" Sample records:")
|
||
for idx, row in unmapped_records.head(5).iterrows():
|
||
print(f" - {row[unit_name_col]} (层级: {row[level_col]})")
|
||
if len(unmapped_records) > 5:
|
||
print(f" ... 还有 {len(unmapped_records) - 5} 条")
|
||
|
||
# 保存补充后的Excel文件
|
||
print(f"\n[SAVE] Saving supplemented Excel file: {output_path}")
|
||
try:
|
||
df.to_excel(output_path, index=False, engine='openpyxl')
|
||
print(f"[OK] Successfully saved {len(df)} records to {output_path}")
|
||
except Exception as e:
|
||
print(f"[ERROR] Failed to save Excel file: {e}")
|
||
sys.exit(1)
|
||
|
||
# 验证数据完整性
|
||
print(f"\n[VERIFY] Data Integrity Validation:")
|
||
null_binding = df['绑定服务部门'].isna().sum()
|
||
null_parent = df['服务部门所属部门'].isna().sum()
|
||
|
||
print(f" Total records: {len(df)}")
|
||
print(f" Null values in Binding Service Department: {null_binding}")
|
||
print(f" Null values in Parent Service Department: {null_parent}")
|
||
print(f" Total columns: {len(df.columns)}")
|
||
|
||
if null_binding == 0 and null_parent == 0:
|
||
print(f"[OK] Data integrity validation passed")
|
||
else:
|
||
print(f"[WARNING] Found null values, please check data source")
|
||
|
||
# 显示示例数据
|
||
print(f"\n[DATA] Sample data (first 5 rows):")
|
||
sample_cols = [unit_name_col, level_col, '绑定服务部门', '服务部门所属部门']
|
||
print(df[sample_cols].head(10).to_string(index=False))
|
||
|
||
return df
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
# 定义文件路径
|
||
base_path = Path(__file__).parent.parent
|
||
input_file = base_path / 'data' / '法律风险提示管理员名单_合并-20260309.xlsx'
|
||
output_file = base_path / 'data' / '法律风险提示管理员名单_合并-20260309_supplemented.xlsx'
|
||
|
||
print("=" * 80)
|
||
print("Excel服务部门层级信息补充工具")
|
||
print("=" * 80)
|
||
|
||
# 检查输入文件是否存在
|
||
if not input_file.exists():
|
||
print(f"[ERROR] Input file does not exist: {input_file}")
|
||
sys.exit(1)
|
||
|
||
# 执行补充处理
|
||
df = supplement_excel_file(str(input_file), str(output_file))
|
||
|
||
print("\n" + "=" * 80)
|
||
print("[OK] Processing completed!")
|
||
print(f"[FILE] Output file: {output_file}")
|
||
print("=" * 80)
|
||
|
||
|
||
if __name__ == '__main__':
|
||
main()
|