305 lines
11 KiB
Python
305 lines
11 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
生成许可事项三位一体对比表 - 增强版
|
||
功能:
|
||
1. 物理文件追踪:记录原始文件名
|
||
2. 模糊匹配:将带有编号和部门的文件名映射到标准事项名
|
||
3. 状态预警:精确定位漏导入的文件
|
||
"""
|
||
|
||
import os
|
||
import re
|
||
import pandas as pd
|
||
from pathlib import Path
|
||
from openpyxl import Workbook
|
||
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
|
||
import pg8000.dbapi as pg
|
||
from lawrisk.utils.env_loader import load_env
|
||
|
||
# 加载环境变量
|
||
load_env()
|
||
|
||
def get_db_connection():
|
||
conn_params = {
|
||
"host": os.getenv("LIC_PG_HOST", "172.24.240.1"),
|
||
"port": int(os.getenv("LIC_PG_PORT", "5432")),
|
||
"user": os.getenv("LIC_PG_USER", "postgres"),
|
||
"password": os.getenv("LIC_PG_PASSWORD", ""),
|
||
"database": os.getenv("LIC_PG_DATABASE", "licensing_risks"),
|
||
}
|
||
return pg.connect(**conn_params)
|
||
|
||
def clean_name(name):
|
||
"""提取核心事项名称用于匹配,优化括号处理以区分细分事项"""
|
||
if not name: return ""
|
||
# 移除文件扩展名
|
||
name = os.path.splitext(name)[0]
|
||
# 移除开头的数字和干扰项
|
||
name = re.sub(r'^\d+[\s\-\.]*', '', name)
|
||
name = re.sub(r'风险提示表[\(\(].*?[\)\)]', '', name)
|
||
name = name.replace('风险提示表', '').replace('清单', '').replace('事项', '')
|
||
|
||
# 移除已知无关后缀
|
||
junk = ['_转自XLS', '_v2', '_final', '_市级', '(转自XLS)', '(1)', '(2)', '-职建科', '-就业科提出修改意见', '(以此件为准)']
|
||
for j in junk:
|
||
name = name.replace(j, '')
|
||
|
||
# 移除末尾的部门信息
|
||
name = re.sub(r'[,,][\u4e00-\u9fa5]+(部门|局).*?$', '', name)
|
||
|
||
# 智能处理括号:保留含区分性关键词的内容,移除无效信息(如纯数字、部门名)
|
||
def cleanup_brackets(text):
|
||
def repl(match):
|
||
content = match.group(1).strip()
|
||
# 必须保留的关键词,用于区分“告知承诺”与“非告知承诺”等
|
||
keep_keywords = ["告知承诺", "电子烟", "预包装", "经营", "许可", "备案", "小作坊", "生产", "销售", "设置审批", "执业登记"]
|
||
if any(kw in content for kw in keep_keywords):
|
||
return f"({content})"
|
||
return ""
|
||
return re.sub(r'[\(\(](.*?)[\)\)]', repl, text)
|
||
|
||
name = cleanup_brackets(name)
|
||
|
||
# 移除多余标点
|
||
name = name.strip(',,。 ))))')
|
||
return name.strip()
|
||
|
||
|
||
def get_client_items_map():
|
||
"""获取客户需求事项清单"""
|
||
file_path = "需要显示在系统上面的事项.xlsx"
|
||
if not os.path.exists(file_path):
|
||
return {}
|
||
df = pd.read_excel(file_path)
|
||
# 假设第一列是事项名称
|
||
items = {}
|
||
for val in df.iloc[:, 0].dropna().astype(str):
|
||
name = val.strip()
|
||
items[name] = {"original": name, "cleaned": clean_name(name)}
|
||
return items
|
||
|
||
import json
|
||
|
||
def get_received_files_map():
|
||
"""扫描文件夹,从 JSON 文件内部读取许可名称,建立 核心名 -> 原始文件名 的映射"""
|
||
folder_path = "市级初版-20251219/许可风险提示"
|
||
if not os.path.exists(folder_path):
|
||
return {}, {}
|
||
|
||
files_map = {} # cleaned_name -> [filename1, filename2] 用于匹配
|
||
file_to_main_name = {} # filename -> JSON内部找到的第一个事项全称
|
||
|
||
json_files = [f for f in os.listdir(folder_path) if f.endswith('.json')]
|
||
print(f"检测到 {len(json_files)} 个 JSON 文件")
|
||
|
||
for filename in json_files:
|
||
file_path = os.path.join(folder_path, filename)
|
||
try:
|
||
with open(file_path, 'r', encoding='utf-8') as f:
|
||
data = json.load(f)
|
||
permit_names = []
|
||
|
||
# 遍历所有 sheet 和所有行
|
||
sheets = data.get('sheets', {})
|
||
for sheet_data in sheets.values():
|
||
for row in sheet_data.get('rows', []):
|
||
p_name = row.get('permit_name')
|
||
if p_name:
|
||
p_name = p_name.strip()
|
||
if p_name and p_name not in permit_names:
|
||
permit_names.append(p_name)
|
||
|
||
if permit_names:
|
||
# 记录该文件的代表性名称(第一个)
|
||
file_to_main_name[filename] = permit_names[0]
|
||
# 记录该文件包含的所有名称,用于后续匹配
|
||
for p_name in permit_names:
|
||
cleaned = clean_name(p_name)
|
||
if cleaned:
|
||
if cleaned not in files_map:
|
||
files_map[cleaned] = []
|
||
if filename not in files_map[cleaned]:
|
||
files_map[cleaned].append(filename)
|
||
else:
|
||
# 如果没找到名称,退回到文件名
|
||
rep_name = clean_name(filename)
|
||
file_to_main_name[filename] = rep_name
|
||
if rep_name:
|
||
if rep_name not in files_map:
|
||
files_map[rep_name] = []
|
||
files_map[rep_name].append(filename)
|
||
|
||
except Exception as e:
|
||
print(f"警告: 解析文件 {filename} 失败: {e}")
|
||
|
||
return files_map, file_to_main_name
|
||
|
||
|
||
def get_database_items():
|
||
"""获取数据库中的事项"""
|
||
try:
|
||
conn = get_db_connection()
|
||
cursor = conn.cursor()
|
||
cursor.execute("SELECT DISTINCT name FROM permits WHERE name IS NOT NULL")
|
||
db_items = {row[0].strip() for row in cursor.fetchall()}
|
||
conn.close()
|
||
return db_items
|
||
except:
|
||
return set()
|
||
|
||
def main():
|
||
print("正在搜集三维度数据...")
|
||
|
||
# 定义需要排除的测试或无效事项
|
||
BLACKLIST = {
|
||
"测试许可_SearchTest",
|
||
"演出经纪机构变更",
|
||
"演出经纪机构从事营业性演出经营活动审批"
|
||
}
|
||
|
||
client_map = get_client_items_map() # 标准名 -> {original, cleaned}
|
||
received_map, file_to_main_name = get_received_files_map() # 清理后的名 -> [原始文件名s], 文件名 -> 代表性名称
|
||
db_items = get_database_items() # 数据库里的标准名
|
||
|
||
# 过滤黑名单
|
||
db_items = {name for name in db_items if name not in BLACKLIST}
|
||
# 过滤代表名称中的黑名单
|
||
file_to_main_name = {f: n for f, n in file_to_main_name.items() if n not in BLACKLIST}
|
||
|
||
# 建立数据库名的清理后映射
|
||
db_cleaned_map = {clean_name(name): name for name in db_items}
|
||
|
||
results = []
|
||
processed_cleaned_files = set()
|
||
used_files = set()
|
||
|
||
# 1. 以客户列表为基准
|
||
for std_name, info in client_map.items():
|
||
if std_name in BLACKLIST:
|
||
continue
|
||
|
||
cleaned = info['cleaned']
|
||
|
||
# 查找物理文件
|
||
matched_files = received_map.get(cleaned, [])
|
||
if matched_files:
|
||
processed_cleaned_files.add(cleaned)
|
||
used_files.update(matched_files) # 标记这些文件已被占用
|
||
|
||
# 查找数据库
|
||
in_db = std_name in db_items
|
||
db_canonical_name = std_name if in_db else db_cleaned_map.get(cleaned, "")
|
||
|
||
status = ""
|
||
if matched_files and db_canonical_name:
|
||
status = "完整(三方都有)"
|
||
elif matched_files and not db_canonical_name:
|
||
status = "待入库"
|
||
elif not matched_files and db_canonical_name:
|
||
status = "缺少源文件"
|
||
else:
|
||
status = "缺少文件和数据"
|
||
|
||
results.append({
|
||
"事项名称": std_name,
|
||
"客户提供 (Excel)": "✔",
|
||
"我们接收到 (Files)": "✔" if matched_files else "",
|
||
"系统已存在 (DB)": "✔" if db_canonical_name else "",
|
||
"状态说明": status
|
||
})
|
||
|
||
# 2. 处理那些客户清单里没有被“消耗”的物理文件 (真正的事项名称)
|
||
# 只要这个文件没在第1步被匹配到任何一个客户事项,就把它作为一个额外项列出
|
||
for filename, internal_name in file_to_main_name.items():
|
||
if filename not in used_files:
|
||
cleaned = clean_name(internal_name)
|
||
|
||
# 二次检查:防止清理后的名在 client_map 中已经存在
|
||
is_new = True
|
||
for info in client_map.values():
|
||
if info['cleaned'] == cleaned:
|
||
is_new = False
|
||
break
|
||
|
||
if not is_new:
|
||
used_files.add(filename) # 实际上是匹配到了但没在 used_files 里,修正下
|
||
continue
|
||
|
||
db_name = db_cleaned_map.get(cleaned, "")
|
||
|
||
results.append({
|
||
"事项名称": internal_name,
|
||
"客户提供 (Excel)": "",
|
||
"我们接收到 (Files)": "✔",
|
||
"系统已存在 (DB)": "✔" if db_name else "",
|
||
"状态说明": "未入库且客户未要求" if not db_name else "已接收且已入库(客户未要求)"
|
||
})
|
||
used_files.add(filename)
|
||
processed_cleaned_files.add(cleaned)
|
||
|
||
|
||
# 3. 处理那些仅在数据库里,其他地方都没有的事项
|
||
for db_name in db_items:
|
||
db_cleaned = clean_name(db_name)
|
||
# 检查这个事项是否已经在前面的步骤中被记录过
|
||
if db_cleaned not in processed_cleaned_files:
|
||
results.append({
|
||
"事项名称": db_name,
|
||
"客户提供 (Excel)": "",
|
||
"我们接收到 (Files)": "",
|
||
"系统已存在 (DB)": "✔",
|
||
"状态说明": "仅数据库有"
|
||
})
|
||
|
||
|
||
|
||
|
||
# 保存为 Excel
|
||
df = pd.DataFrame(results)
|
||
output_file = "许可事项三位一体对比表_v3.xlsx"
|
||
|
||
# 使用 openpyxl 进行美化保存
|
||
wb = Workbook()
|
||
ws = wb.active
|
||
ws.title = "三位一体对比表"
|
||
|
||
# 样式定义
|
||
thin = Side(border_style="thin", color="000000")
|
||
border = Border(top=thin, left=thin, right=thin, bottom=thin)
|
||
header_fill = PatternFill(start_color="D9D9D9", end_color="D9D9D9", fill_type="solid")
|
||
|
||
headers = list(df.columns)
|
||
for col, header in enumerate(headers, 1):
|
||
cell = ws.cell(row=1, column=col, value=header)
|
||
cell.fill = header_fill
|
||
cell.font = Font(bold=True)
|
||
cell.alignment = Alignment(horizontal="center", vertical="center")
|
||
cell.border = border
|
||
|
||
for r_idx, row_data in enumerate(results, 2):
|
||
for c_idx, (key, value) in enumerate(row_data.items(), 1):
|
||
cell = ws.cell(row=r_idx, column=c_idx, value=value)
|
||
cell.border = border
|
||
cell.alignment = Alignment(wrap_text=True, vertical="center")
|
||
|
||
# 状态说明列如果含有“✔”则居中
|
||
if key in ["客户提供 (Excel)", "我们接收到 (Files)", "系统已存在 (DB)"] and "✔" in str(value):
|
||
cell.alignment = Alignment(horizontal="center", vertical="center")
|
||
|
||
|
||
# 调整列宽
|
||
ws.column_dimensions['A'].width = 50
|
||
ws.column_dimensions['B'].width = 15
|
||
ws.column_dimensions['C'].width = 30
|
||
ws.column_dimensions['D'].width = 30
|
||
ws.column_dimensions['E'].width = 25
|
||
|
||
ws.freeze_panes = "A2"
|
||
wb.save(output_file)
|
||
print(f"成功生成对比表: {output_file}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|