fs-lawrisk/tools/supplement_excel_with_depar...

206 lines
6.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Excel服务部门层级信息补充脚本
功能:
1. 读取Excel文件为每条记录添加"绑定服务部门""服务部门所属部门"两列
2. 根据层级规则和区名关键词自动判断上级部门
3. 生成统计报告并保存补充后的Excel文件
"""
import pandas as pd
import sys
from pathlib import Path
from collections import Counter
# 区名关键词映射表
REGION_KEYWORDS = {
'禅城区': '禅城区服务部门',
'南海区': '南海区服务部门',
'顺德区': '顺德区服务部门',
'三水区': '三水区服务部门',
'高明区': '高明区服务部门'
}
MUNICIPAL_DEPT = '市级服务部门'
def extract_region_service_dept(unit_name):
"""
从单位名称中提取区级服务部门
Args:
unit_name: 单位名称字符串
Returns:
对应的区级服务部门名称如果未找到则返回None
"""
if pd.isna(unit_name):
return None
unit_name_str = str(unit_name)
# 按优先级顺序匹配区名关键词
for keyword, service_dept in REGION_KEYWORDS.items():
if keyword in unit_name_str:
return service_dept
return None
def determine_parent_department(unit_name, level_value):
"""
确定服务部门所属部门
规则:
1. 如果层级值包含"市级" → 市级服务部门
2. 如果单位名称包含区名关键词 → 对应区级服务部门
3. 否则默认为市级服务部门(兜底策略)
Args:
unit_name: 单位名称
level_value: 层级/风险提示/备注列的值
Returns:
服务部门所属部门名称
"""
# 规则1: 检查层级值是否包含"市级"
if pd.notna(level_value) and '市级' in str(level_value):
return MUNICIPAL_DEPT
# 规则2: 从单位名称中提取区级服务部门
region_dept = extract_region_service_dept(unit_name)
if region_dept:
return region_dept
# 规则3: 兜底策略 - 默认为市级服务部门
return MUNICIPAL_DEPT
def supplement_excel_file(input_path, output_path):
"""
补充Excel文件的部门层级信息
Args:
input_path: 输入Excel文件路径
output_path: 输出Excel文件路径
"""
print(f"[INFO] Reading Excel file: {input_path}")
# 读取Excel文件
try:
df = pd.read_excel(input_path)
print(f"[OK] Successfully read {len(df)} records")
except Exception as e:
print(f"[ERROR] Failed to read Excel file: {e}")
sys.exit(1)
# 显示原始列名
print(f"\n[INFO] Original columns ({len(df.columns)} columns):")
for i, col in enumerate(df.columns):
print(f" {i+1}. {col}")
# 添加"绑定服务部门"列(直接复制"单位名称"列)
# 假设第二列是"单位名称"
unit_name_col = df.columns[1] # 第二列
df['绑定服务部门'] = df[unit_name_col]
# 添加"服务部门所属部门"列(根据规则计算)
level_col = df.columns[4] # "层级/风险提示/备注"列第5列
df['服务部门所属部门'] = df.apply(
lambda row: determine_parent_department(row[unit_name_col], row[level_col]),
axis=1
)
print(f"\n[OK] Added 2 columns:")
print(f" - Binding Service Department (copied from '{unit_name_col}')")
print(f" - Parent Service Department (calculated based on hierarchy rules)")
# 生成统计报告
print(f"\n[STATS] Parent Service Department Distribution:")
parent_dept_counts = Counter(df['服务部门所属部门'])
# 按标准顺序显示
standard_order = [MUNICIPAL_DEPT] + list(REGION_KEYWORDS.values())
for dept in standard_order:
count = parent_dept_counts.get(dept, 0)
percentage = (count / len(df) * 100) if len(df) > 0 else 0
print(f" {dept}: {count} 条 ({percentage:.1f}%)")
# 检查无法匹配的记录
unmapped_records = df[
(~df[level_col].astype(str).str.contains('市级', na=False)) &
(~df[unit_name_col].astype(str).str.contains('|'.join(REGION_KEYWORDS.keys()), na=False))
]
if len(unmapped_records) > 0:
print(f"\n[WARNING] Found {len(unmapped_records)} records that could not be matched by region keywords, assigned to Municipal Service Department by default:")
print(" Sample records:")
for idx, row in unmapped_records.head(5).iterrows():
print(f" - {row[unit_name_col]} (层级: {row[level_col]})")
if len(unmapped_records) > 5:
print(f" ... 还有 {len(unmapped_records) - 5}")
# 保存补充后的Excel文件
print(f"\n[SAVE] Saving supplemented Excel file: {output_path}")
try:
df.to_excel(output_path, index=False, engine='openpyxl')
print(f"[OK] Successfully saved {len(df)} records to {output_path}")
except Exception as e:
print(f"[ERROR] Failed to save Excel file: {e}")
sys.exit(1)
# 验证数据完整性
print(f"\n[VERIFY] Data Integrity Validation:")
null_binding = df['绑定服务部门'].isna().sum()
null_parent = df['服务部门所属部门'].isna().sum()
print(f" Total records: {len(df)}")
print(f" Null values in Binding Service Department: {null_binding}")
print(f" Null values in Parent Service Department: {null_parent}")
print(f" Total columns: {len(df.columns)}")
if null_binding == 0 and null_parent == 0:
print(f"[OK] Data integrity validation passed")
else:
print(f"[WARNING] Found null values, please check data source")
# 显示示例数据
print(f"\n[DATA] Sample data (first 5 rows):")
sample_cols = [unit_name_col, level_col, '绑定服务部门', '服务部门所属部门']
print(df[sample_cols].head(10).to_string(index=False))
return df
def main():
"""主函数"""
# 定义文件路径
base_path = Path(__file__).parent.parent
input_file = base_path / 'data' / '法律风险提示管理员名单_合并-20260309.xlsx'
output_file = base_path / 'data' / '法律风险提示管理员名单_合并-20260309_supplemented.xlsx'
print("=" * 80)
print("Excel服务部门层级信息补充工具")
print("=" * 80)
# 检查输入文件是否存在
if not input_file.exists():
print(f"[ERROR] Input file does not exist: {input_file}")
sys.exit(1)
# 执行补充处理
df = supplement_excel_file(str(input_file), str(output_file))
print("\n" + "=" * 80)
print("[OK] Processing completed!")
print(f"[FILE] Output file: {output_file}")
print("=" * 80)
if __name__ == '__main__':
main()