#!/usr/bin/env python # -*- coding: utf-8 -*- """ 生成许可事项三位一体对比表 - 增强版 功能: 1. 物理文件追踪:记录原始文件名 2. 模糊匹配:将带有编号和部门的文件名映射到标准事项名 3. 状态预警:精确定位漏导入的文件 """ import os import re import pandas as pd from pathlib import Path from openpyxl import Workbook from openpyxl.styles import Font, PatternFill, Alignment, Border, Side import pg8000.dbapi as pg from lawrisk.utils.env_loader import load_env # 加载环境变量 load_env() def get_db_connection(): conn_params = { "host": os.getenv("LIC_PG_HOST", "172.24.240.1"), "port": int(os.getenv("LIC_PG_PORT", "5432")), "user": os.getenv("LIC_PG_USER", "postgres"), "password": os.getenv("LIC_PG_PASSWORD", ""), "database": os.getenv("LIC_PG_DATABASE", "licensing_risks"), } return pg.connect(**conn_params) def clean_name(name): """提取核心事项名称用于匹配,优化括号处理以区分细分事项""" if not name: return "" # 移除文件扩展名 name = os.path.splitext(name)[0] # 移除开头的数字和干扰项 name = re.sub(r'^\d+[\s\-\.]*', '', name) name = re.sub(r'风险提示表[\(\(].*?[\)\)]', '', name) name = name.replace('风险提示表', '').replace('清单', '').replace('事项', '') # 移除已知无关后缀 junk = ['_转自XLS', '_v2', '_final', '_市级', '(转自XLS)', '(1)', '(2)', '-职建科', '-就业科提出修改意见', '(以此件为准)'] for j in junk: name = name.replace(j, '') # 移除末尾的部门信息 name = re.sub(r'[,,][\u4e00-\u9fa5]+(部门|局).*?$', '', name) # 智能处理括号:保留含区分性关键词的内容,移除无效信息(如纯数字、部门名) def cleanup_brackets(text): def repl(match): content = match.group(1).strip() # 必须保留的关键词,用于区分“告知承诺”与“非告知承诺”等 keep_keywords = ["告知承诺", "电子烟", "预包装", "经营", "许可", "备案", "小作坊", "生产", "销售", "设置审批", "执业登记"] if any(kw in content for kw in keep_keywords): return f"({content})" return "" return re.sub(r'[\(\(](.*?)[\)\)]', repl, text) name = cleanup_brackets(name) # 移除多余标点 name = name.strip(',,。 ))))') return name.strip() def get_client_items_map(): """获取客户需求事项清单""" file_path = "需要显示在系统上面的事项.xlsx" if not os.path.exists(file_path): return {} df = pd.read_excel(file_path) # 假设第一列是事项名称 items = {} for val in df.iloc[:, 0].dropna().astype(str): name = val.strip() items[name] = {"original": name, "cleaned": clean_name(name)} return items import json def get_received_files_map(): """扫描文件夹,从 JSON 文件内部读取许可名称,建立 核心名 -> 原始文件名 的映射""" folder_path = "市级初版-20251219/许可风险提示" if not os.path.exists(folder_path): return {}, {} files_map = {} # cleaned_name -> [filename1, filename2] 用于匹配 file_to_main_name = {} # filename -> JSON内部找到的第一个事项全称 json_files = [f for f in os.listdir(folder_path) if f.endswith('.json')] print(f"检测到 {len(json_files)} 个 JSON 文件") for filename in json_files: file_path = os.path.join(folder_path, filename) try: with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) permit_names = [] # 遍历所有 sheet 和所有行 sheets = data.get('sheets', {}) for sheet_data in sheets.values(): for row in sheet_data.get('rows', []): p_name = row.get('permit_name') if p_name: p_name = p_name.strip() if p_name and p_name not in permit_names: permit_names.append(p_name) if permit_names: # 记录该文件的代表性名称(第一个) file_to_main_name[filename] = permit_names[0] # 记录该文件包含的所有名称,用于后续匹配 for p_name in permit_names: cleaned = clean_name(p_name) if cleaned: if cleaned not in files_map: files_map[cleaned] = [] if filename not in files_map[cleaned]: files_map[cleaned].append(filename) else: # 如果没找到名称,退回到文件名 rep_name = clean_name(filename) file_to_main_name[filename] = rep_name if rep_name: if rep_name not in files_map: files_map[rep_name] = [] files_map[rep_name].append(filename) except Exception as e: print(f"警告: 解析文件 {filename} 失败: {e}") return files_map, file_to_main_name def get_database_items(): """获取数据库中的事项""" try: conn = get_db_connection() cursor = conn.cursor() cursor.execute("SELECT DISTINCT name FROM permits WHERE name IS NOT NULL") db_items = {row[0].strip() for row in cursor.fetchall()} conn.close() return db_items except: return set() def main(): print("正在搜集三维度数据...") # 定义需要排除的测试或无效事项 BLACKLIST = { "测试许可_SearchTest", "演出经纪机构变更", "演出经纪机构从事营业性演出经营活动审批" } client_map = get_client_items_map() # 标准名 -> {original, cleaned} received_map, file_to_main_name = get_received_files_map() # 清理后的名 -> [原始文件名s], 文件名 -> 代表性名称 db_items = get_database_items() # 数据库里的标准名 # 过滤黑名单 db_items = {name for name in db_items if name not in BLACKLIST} # 过滤代表名称中的黑名单 file_to_main_name = {f: n for f, n in file_to_main_name.items() if n not in BLACKLIST} # 建立数据库名的清理后映射 db_cleaned_map = {clean_name(name): name for name in db_items} results = [] processed_cleaned_files = set() used_files = set() # 1. 以客户列表为基准 for std_name, info in client_map.items(): if std_name in BLACKLIST: continue cleaned = info['cleaned'] # 查找物理文件 matched_files = received_map.get(cleaned, []) if matched_files: processed_cleaned_files.add(cleaned) used_files.update(matched_files) # 标记这些文件已被占用 # 查找数据库 in_db = std_name in db_items db_canonical_name = std_name if in_db else db_cleaned_map.get(cleaned, "") status = "" if matched_files and db_canonical_name: status = "完整(三方都有)" elif matched_files and not db_canonical_name: status = "待入库" elif not matched_files and db_canonical_name: status = "缺少源文件" else: status = "缺少文件和数据" results.append({ "事项名称": std_name, "客户提供 (Excel)": "✔", "我们接收到 (Files)": "✔" if matched_files else "", "系统已存在 (DB)": "✔" if db_canonical_name else "", "状态说明": status }) # 2. 处理那些客户清单里没有被“消耗”的物理文件 (真正的事项名称) # 只要这个文件没在第1步被匹配到任何一个客户事项,就把它作为一个额外项列出 for filename, internal_name in file_to_main_name.items(): if filename not in used_files: cleaned = clean_name(internal_name) # 二次检查:防止清理后的名在 client_map 中已经存在 is_new = True for info in client_map.values(): if info['cleaned'] == cleaned: is_new = False break if not is_new: used_files.add(filename) # 实际上是匹配到了但没在 used_files 里,修正下 continue db_name = db_cleaned_map.get(cleaned, "") results.append({ "事项名称": internal_name, "客户提供 (Excel)": "", "我们接收到 (Files)": "✔", "系统已存在 (DB)": "✔" if db_name else "", "状态说明": "未入库且客户未要求" if not db_name else "已接收且已入库(客户未要求)" }) used_files.add(filename) processed_cleaned_files.add(cleaned) # 3. 处理那些仅在数据库里,其他地方都没有的事项 for db_name in db_items: db_cleaned = clean_name(db_name) # 检查这个事项是否已经在前面的步骤中被记录过 if db_cleaned not in processed_cleaned_files: results.append({ "事项名称": db_name, "客户提供 (Excel)": "", "我们接收到 (Files)": "", "系统已存在 (DB)": "✔", "状态说明": "仅数据库有" }) # 保存为 Excel df = pd.DataFrame(results) output_file = "许可事项三位一体对比表_v3.xlsx" # 使用 openpyxl 进行美化保存 wb = Workbook() ws = wb.active ws.title = "三位一体对比表" # 样式定义 thin = Side(border_style="thin", color="000000") border = Border(top=thin, left=thin, right=thin, bottom=thin) header_fill = PatternFill(start_color="D9D9D9", end_color="D9D9D9", fill_type="solid") headers = list(df.columns) for col, header in enumerate(headers, 1): cell = ws.cell(row=1, column=col, value=header) cell.fill = header_fill cell.font = Font(bold=True) cell.alignment = Alignment(horizontal="center", vertical="center") cell.border = border for r_idx, row_data in enumerate(results, 2): for c_idx, (key, value) in enumerate(row_data.items(), 1): cell = ws.cell(row=r_idx, column=c_idx, value=value) cell.border = border cell.alignment = Alignment(wrap_text=True, vertical="center") # 状态说明列如果含有“✔”则居中 if key in ["客户提供 (Excel)", "我们接收到 (Files)", "系统已存在 (DB)"] and "✔" in str(value): cell.alignment = Alignment(horizontal="center", vertical="center") # 调整列宽 ws.column_dimensions['A'].width = 50 ws.column_dimensions['B'].width = 15 ws.column_dimensions['C'].width = 30 ws.column_dimensions['D'].width = 30 ws.column_dimensions['E'].width = 25 ws.freeze_panes = "A2" wb.save(output_file) print(f"成功生成对比表: {output_file}") if __name__ == "__main__": main()