"""Database migration script: add unit-level and file-binding columns.

Runs the schema and data migration that backs the unit-permission
optimisation feature of the licensing database. Can be imported (use
``check_migration_status`` / ``execute_migration``) or run as a script, in
which case it reports the current state and interactively offers to migrate.
"""

from __future__ import annotations

import logging
import os
from contextlib import closing
from typing import Any, Dict, List

from lawrisk.services import licensing_repo as lic_repo

logger = logging.getLogger(__name__)

# Column metadata for one table in the ``public`` schema, in column order.
# NOTE(review): assumes the driver uses the ``%s`` parameter style, as
# psycopg does -- confirm against ``lic_repo``.
_COLUMNS_SQL = """
    SELECT column_name, data_type, is_nullable
    FROM information_schema.columns
    WHERE table_name = %s
      AND table_schema = 'public'
    ORDER BY ordinal_position
"""

# Names of all indexes defined on one table in the ``public`` schema.
_INDEXES_SQL = """
    SELECT indexname
    FROM pg_indexes
    WHERE tablename = %s
      AND schemaname = 'public'
"""

# Per-level row counts; only valid once ``unit_level`` exists.
_UNIT_LEVEL_DISTRIBUTION_SQL = """
    SELECT unit_level, COUNT(*) as count
    FROM service_departments
    GROUP BY unit_level
    ORDER BY unit_level
"""

# Schema changes. Every statement uses IF NOT EXISTS, so this part can be
# re-run safely.
_SCHEMA_MIGRATION_SQL = """
    -- 1. 为service_departments表添加unit_level字段
    ALTER TABLE service_departments
    ADD COLUMN IF NOT EXISTS unit_level VARCHAR(20) DEFAULT 'unit'
    CHECK (unit_level IN ('admin', 'municipal', 'district', 'unit'));

    -- 2. 为service_departments表添加allowed_regions字段(市级单位可访问的行政区)
    ALTER TABLE service_departments
    ADD COLUMN IF NOT EXISTS allowed_regions TEXT;

    -- 3. 为permit_sources表添加uploader_department_id字段
    ALTER TABLE permit_sources
    ADD COLUMN IF NOT EXISTS uploader_department_id uuid REFERENCES service_departments(id);

    -- 4. 为permit_sources表添加bound_department_id字段
    ALTER TABLE permit_sources
    ADD COLUMN IF NOT EXISTS bound_department_id uuid REFERENCES service_departments(id);

    -- 5. 创建索引以提高查询性能
    CREATE INDEX IF NOT EXISTS idx_service_dept_unit_level ON service_departments(unit_level);
    CREATE INDEX IF NOT EXISTS idx_service_dept_parent_level ON service_departments(parent_id, unit_level);
    CREATE INDEX IF NOT EXISTS idx_permit_sources_bound_dept ON permit_sources(bound_department_id);
    CREATE INDEX IF NOT EXISTS idx_permit_sources_uploader ON permit_sources(uploader_department_id);
"""

# Backfill of ``unit_level`` from the existing ``grade`` / ``parent_id``
# columns, followed by the NOT NULL constraint.
_DATA_MIGRATION_SQL = """
    -- 数据迁移:根据现有grade设置unit_level
    -- 市局管理员(grade >= 90)设置为admin
    UPDATE service_departments
    SET unit_level = 'admin'
    WHERE grade >= 90;

    -- 根节点且grade < 90的设置为district(区局子管理员)
    UPDATE service_departments
    SET unit_level = 'district'
    WHERE parent_id IS NULL AND grade < 90;

    -- 有父节点且grade < 90的设置为unit(区级单位)
    UPDATE service_departments
    SET unit_level = 'unit'
    WHERE parent_id IS NOT NULL AND grade < 90;

    -- 为unit_level字段添加NOT NULL约束(在更新数据后)
    ALTER TABLE service_departments
    ALTER COLUMN unit_level SET NOT NULL;
"""


def _lic_pg_conn(autocommit: bool = False):
    """Return a connection to the licensing_risks database.

    Thin wrapper that delegates to ``lic_repo._lic_pg_conn``; the returned
    object is used below as a context manager that exposes ``cursor()``.

    Args:
        autocommit: Forwarded unchanged to ``lic_repo._lic_pg_conn``.
    """
    return lic_repo._lic_pg_conn(autocommit=autocommit)


def _fetch_columns(cur: Any, table_name: str) -> Dict[str, Dict[str, Any]]:
    """Map each column of ``public.<table_name>`` to its type and nullability."""
    cur.execute(_COLUMNS_SQL, (table_name,))
    return {
        column_name: {"type": data_type, "nullable": is_nullable}
        for column_name, data_type, is_nullable in cur.fetchall()
    }


def _fetch_index_names(cur: Any, table_name: str) -> List[str]:
    """List the names of the indexes defined on ``public.<table_name>``."""
    cur.execute(_INDEXES_SQL, (table_name,))
    return [row[0] for row in cur.fetchall()]


def _fetch_unit_level_distribution(cur: Any) -> Dict[Any, int]:
    """Count ``service_departments`` rows per ``unit_level`` (best effort).

    Any failure is logged and reported as an empty distribution so that a
    status check never fails just because this statistic is unavailable.
    """
    try:
        cur.execute(_UNIT_LEVEL_DISTRIBUTION_SQL)
        return {row[0]: row[1] for row in cur.fetchall()}
    except Exception:
        logger.warning("查询unit_level分布失败,返回空分布", exc_info=True)
        return {}


def check_migration_status() -> Dict[str, Any]:
    """Inspect the database and report how far the migration has progressed.

    Returns:
        A dict with the keys:

        * ``service_departments_columns`` / ``permit_sources_columns``:
          ``{column_name: {"type": ..., "nullable": ...}}`` for each table.
        * ``service_departments_indexes`` / ``permit_sources_indexes``:
          lists of index names.
        * ``unit_level_distribution``: ``{unit_level: row_count}``; empty when
          the ``unit_level`` column does not exist yet or the count query
          fails.
        * ``migration_complete``: ``True`` when ``unit_level``,
          ``uploader_department_id`` and ``bound_department_id`` all exist.
    """
    with _lic_pg_conn() as conn, closing(conn.cursor()) as cur:
        service_dept_columns = _fetch_columns(cur, "service_departments")
        permit_sources_columns = _fetch_columns(cur, "permit_sources")
        service_dept_indexes = _fetch_index_names(cur, "service_departments")
        permit_sources_indexes = _fetch_index_names(cur, "permit_sources")

        # Only query the distribution when the column is known to exist:
        # a failing statement would abort the open PostgreSQL transaction,
        # and the outcome is already predictable from the metadata above.
        if "unit_level" in service_dept_columns:
            unit_level_distribution = _fetch_unit_level_distribution(cur)
        else:
            unit_level_distribution = {}

        return {
            "service_departments_columns": service_dept_columns,
            "permit_sources_columns": permit_sources_columns,
            "service_departments_indexes": service_dept_indexes,
            "permit_sources_indexes": permit_sources_indexes,
            "unit_level_distribution": unit_level_distribution,
            "migration_complete": (
                "unit_level" in service_dept_columns
                and "uploader_department_id" in permit_sources_columns
                and "bound_department_id" in permit_sources_columns
            ),
        }


def execute_migration() -> Dict[str, Any]:
    """Run the schema migration followed by the data backfill.

    Both phases run on their own autocommit connection. Exceptions are not
    propagated: they are logged with a traceback and their message is
    appended to ``errors``.

    NOTE(review): the backfill UPDATEs are unconditional, so calling this
    again re-derives ``unit_level`` from ``grade`` / ``parent_id`` and
    overwrites values changed afterwards (for example ``'municipal'``, which
    none of the UPDATEs assigns).

    Returns:
        A dict with the keys ``success`` (both phases finished),
        ``schema_migration`` and ``data_migration`` (per-phase flags) and
        ``errors`` (list of error messages, empty on success).
    """
    results: Dict[str, Any] = {
        "success": False,
        "schema_migration": False,
        "data_migration": False,
        "errors": [],
    }

    try:
        # Phase 1: schema changes (columns, constraints, indexes).
        with _lic_pg_conn(autocommit=True) as conn, closing(conn.cursor()) as cur:
            logger.info("开始执行架构迁移...")
            cur.execute(_SCHEMA_MIGRATION_SQL)
            results["schema_migration"] = True
            logger.info("架构迁移完成")

        # Phase 2: backfill unit_level, then enforce NOT NULL.
        with _lic_pg_conn(autocommit=True) as conn, closing(conn.cursor()) as cur:
            logger.info("开始执行数据迁移...")
            cur.execute(_DATA_MIGRATION_SQL)
            results["data_migration"] = True
            logger.info("数据迁移完成")

        results["success"] = True
        logger.info("数据库迁移全部完成")
    except Exception as e:
        # logger.exception keeps the traceback, which a plain error() drops.
        logger.exception("数据库迁移失败: %s", e)
        results["errors"].append(str(e))

    return results


def main() -> int:
    """Report the migration status and interactively offer to migrate.

    Returns:
        Process exit status: ``1`` when a migration was attempted and failed,
        ``0`` otherwise (success, skipped, or nothing to do).
    """
    exit_code = 0

    print("=" * 60)
    print("许可管理单位权限优化 - 数据库迁移工具")
    print("=" * 60)

    # Show the current state before asking for anything.
    print("\n检查当前迁移状态...")
    status = check_migration_status()

    print(f"\n迁移状态: {'已完成' if status['migration_complete'] else '未完成'}")
    print(f"\nservice_departments表字段数量: {len(status['service_departments_columns'])}")
    print(f"permit_sources表字段数量: {len(status['permit_sources_columns'])}")

    print("\n单位级别分布:")
    for level, count in status['unit_level_distribution'].items():
        print(f" - {level}: {count} 个")

    if status['migration_complete']:
        print("\n数据库已完全迁移,无需重复执行")
    else:
        print("\n" + "=" * 60)
        try:
            response = input("数据库尚未完全迁移,是否执行迁移?(y/N): ").strip().lower()
        except EOFError:
            # No stdin (non-interactive run): fall back to the default "N".
            response = ""

        if response == 'y':
            print("\n开始执行数据库迁移...")
            results = execute_migration()

            if results['success']:
                print("\n✓ 数据库迁移成功完成!")
                print(" - 架构迁移: ✓")
                print(" - 数据迁移: ✓")
            else:
                print("\n✗ 数据库迁移失败!")
                for error in results['errors']:
                    print(f" - 错误: {error}")
                exit_code = 1
        else:
            print("\n跳过数据库迁移")

    print("\n" + "=" * 60)
    return exit_code


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    raise SystemExit(main())