# Source: fs-lawrisk/lawrisk/utils/migrate_unit_permission.py (237 lines, 7.8 KiB, Python)
#
# Repository viewer notice: this file contains Unicode characters that might be
# confused with other characters (ambiguous Unicode). If that is intentional,
# the warning can be safely ignored.


"""数据库迁移脚本:添加单位级别和文件绑定字段。
执行数据库迁移以支持单位权限优化功能。
"""
from __future__ import annotations
import logging
import os
from typing import Any, Dict
from lawrisk.services import licensing_repo as lic_repo
logger = logging.getLogger(__name__)
def _lic_pg_conn(autocommit: bool = False):
    """Return a connection to the licensing_risks database.

    Thin module-local alias that forwards to
    ``lawrisk.services.licensing_repo._lic_pg_conn``.

    Args:
        autocommit: Forwarded unchanged to the underlying connection factory.

    Returns:
        Whatever the repository-level factory returns; callers in this module
        use it as a context manager and call ``.cursor()`` on it.
    """
    connection = lic_repo._lic_pg_conn(autocommit=autocommit)
    return connection
def _fetch_table_columns(cur, table_name: str) -> Dict[str, Dict[str, Any]]:
    """Return ``{column_name: {"type": ..., "nullable": ...}}`` for a public-schema table."""
    cur.execute(
        """
        SELECT column_name, data_type, is_nullable
        FROM information_schema.columns
        WHERE table_name = %s
        AND table_schema = 'public'
        ORDER BY ordinal_position
        """,
        (table_name,),
    )
    return {
        row[0]: {"type": row[1], "nullable": row[2]}
        for row in cur.fetchall()
    }


def _fetch_table_index_names(cur, table_name: str) -> list:
    """Return the names of all indexes defined on a public-schema table."""
    cur.execute(
        """
        SELECT indexname
        FROM pg_indexes
        WHERE tablename = %s
        AND schemaname = 'public'
        """,
        (table_name,),
    )
    return [row[0] for row in cur.fetchall()]


def check_migration_status() -> Dict[str, Any]:
    """Inspect the database and report how far the unit-permission migration got.

    Reads column and index metadata for ``service_departments`` and
    ``permit_sources`` and, when the ``unit_level`` column exists, the number
    of departments per ``unit_level`` value.

    Returns:
        A dict with the keys:

        * ``service_departments_columns`` / ``permit_sources_columns``:
          ``{column_name: {"type": data_type, "nullable": is_nullable}}``.
        * ``service_departments_indexes`` / ``permit_sources_indexes``:
          lists of index names.
        * ``unit_level_distribution``: ``{unit_level: row_count}``; empty when
          the column does not exist or the query fails.
        * ``migration_complete``: ``True`` when ``unit_level`` exists on
          ``service_departments`` and both ``uploader_department_id`` and
          ``bound_department_id`` exist on ``permit_sources``.

    NOTE(review): ``migration_complete`` does not consider the
    ``allowed_regions`` column or any of the indexes that
    ``execute_migration`` creates.
    """
    with _lic_pg_conn() as conn:
        cur = conn.cursor()

        service_dept_columns = _fetch_table_columns(cur, "service_departments")
        permit_sources_columns = _fetch_table_columns(cur, "permit_sources")

        service_dept_indexes = _fetch_table_index_names(cur, "service_departments")
        permit_sources_indexes = _fetch_table_index_names(cur, "permit_sources")

        # Only aggregate when the column is known to exist. Issuing the query
        # against a missing column would fail and abort the open transaction
        # just to learn something the metadata above already told us.
        unit_level_distribution: Dict[Any, Any] = {}
        if "unit_level" in service_dept_columns:
            try:
                cur.execute("""
                    SELECT unit_level, COUNT(*) as count
                    FROM service_departments
                    GROUP BY unit_level
                    ORDER BY unit_level
                """)
                unit_level_distribution = {
                    row[0]: row[1] for row in cur.fetchall()
                }
            except Exception:
                # Stay best-effort (an empty distribution), but record the
                # unexpected failure instead of hiding it.
                logger.warning("查询unit_level分布失败, 返回空分布", exc_info=True)
                unit_level_distribution = {}

    return {
        "service_departments_columns": service_dept_columns,
        "permit_sources_columns": permit_sources_columns,
        "service_departments_indexes": service_dept_indexes,
        "permit_sources_indexes": permit_sources_indexes,
        "unit_level_distribution": unit_level_distribution,
        "migration_complete": (
            "unit_level" in service_dept_columns and
            "uploader_department_id" in permit_sources_columns and
            "bound_department_id" in permit_sources_columns
        )
    }
def execute_migration() -> Dict[str, Any]:
    """Run the unit-permission schema migration followed by the data backfill.

    Step 1 (schema) adds ``unit_level`` and ``allowed_regions`` to
    ``service_departments``, adds ``uploader_department_id`` and
    ``bound_department_id`` to ``permit_sources``, and creates the supporting
    indexes. Every statement uses ``IF NOT EXISTS``.

    Step 2 (data) derives ``unit_level`` from ``grade`` / ``parent_id`` and then
    marks the column ``NOT NULL``.

    Each step runs on its own connection opened with ``autocommit=True``.

    Returns:
        A dict with the keys ``success``, ``schema_migration``,
        ``data_migration`` (booleans) and ``errors`` (list of error strings).
        Exceptions are caught, logged and reported via ``errors``; they are
        not re-raised.

    NOTE(review): the backfill UPDATEs are unconditional, so re-running this
    function rewrites ``unit_level`` from ``grade`` for every matching row.
    The CHECK constraint allows ``'municipal'`` but the backfill never assigns
    it, so any such value set later would be overwritten. Confirm before
    re-running on a populated database.

    NOTE(review): rows whose ``grade`` is NULL match none of the three UPDATEs
    and keep their current ``unit_level``. Verify this is intended.
    """
    migration_sql = """
    -- 1. 为service_departments表添加unit_level字段
    ALTER TABLE service_departments
    ADD COLUMN IF NOT EXISTS unit_level VARCHAR(20) DEFAULT 'unit'
    CHECK (unit_level IN ('admin', 'municipal', 'district', 'unit'));
    -- 2. 为service_departments表添加allowed_regions字段市级单位可访问的行政区
    ALTER TABLE service_departments
    ADD COLUMN IF NOT EXISTS allowed_regions TEXT;
    -- 3. 为permit_sources表添加uploader_department_id字段
    ALTER TABLE permit_sources
    ADD COLUMN IF NOT EXISTS uploader_department_id uuid REFERENCES service_departments(id);
    -- 4. 为permit_sources表添加bound_department_id字段
    ALTER TABLE permit_sources
    ADD COLUMN IF NOT EXISTS bound_department_id uuid REFERENCES service_departments(id);
    -- 5. 创建索引以提高查询性能
    CREATE INDEX IF NOT EXISTS idx_service_dept_unit_level
    ON service_departments(unit_level);
    CREATE INDEX IF NOT EXISTS idx_service_dept_parent_level
    ON service_departments(parent_id, unit_level);
    CREATE INDEX IF NOT EXISTS idx_permit_sources_bound_dept
    ON permit_sources(bound_department_id);
    CREATE INDEX IF NOT EXISTS idx_permit_sources_uploader
    ON permit_sources(uploader_department_id);
    """
    data_migration_sql = """
    -- 数据迁移根据现有grade设置unit_level
    -- 市局管理员grade >= 90设置为admin
    UPDATE service_departments
    SET unit_level = 'admin'
    WHERE grade >= 90;
    -- 根节点且grade < 90的设置为district区局子管理员
    UPDATE service_departments
    SET unit_level = 'district'
    WHERE parent_id IS NULL AND grade < 90;
    -- 有父节点且grade < 90的设置为unit区级单位
    UPDATE service_departments
    SET unit_level = 'unit'
    WHERE parent_id IS NOT NULL AND grade < 90;
    -- 为unit_level字段添加NOT NULL约束在更新数据后
    ALTER TABLE service_departments
    ALTER COLUMN unit_level SET NOT NULL;
    """
    results: Dict[str, Any] = {
        "success": False,
        "schema_migration": False,
        "data_migration": False,
        "errors": []
    }
    try:
        # Step 1: schema changes.
        with _lic_pg_conn(autocommit=True) as conn:
            cur = conn.cursor()
            logger.info("开始执行架构迁移...")
            cur.execute(migration_sql)
            results["schema_migration"] = True
            logger.info("架构迁移完成")
        # Step 2: backfill, run only after the schema step succeeded.
        with _lic_pg_conn(autocommit=True) as conn:
            cur = conn.cursor()
            logger.info("开始执行数据迁移...")
            cur.execute(data_migration_sql)
            results["data_migration"] = True
            logger.info("数据迁移完成")
        results["success"] = True
        logger.info("数据库迁移全部完成")
    except Exception as e:
        # logger.exception keeps the traceback, which a bare error message
        # drops; the step flags in `results` show which step failed.
        logger.exception("数据库迁移失败: %s", e)
        results["errors"].append(str(e))
    return results
if __name__ == "__main__":
    # Interactive entry point: print the current migration status and, when the
    # migration is incomplete, ask for confirmation before running it.
    logging.basicConfig(level=logging.INFO)
    print("=" * 60)
    print("许可管理单位权限优化 - 数据库迁移工具")
    print("=" * 60)

    # Report the current state.
    print("\n检查当前迁移状态...")
    status = check_migration_status()
    print(f"\n迁移状态: {'已完成' if status['migration_complete'] else '未完成'}")
    print(f"\nservice_departments表字段数量: {len(status['service_departments_columns'])}")
    print(f"permit_sources表字段数量: {len(status['permit_sources_columns'])}")
    print("\n单位级别分布:")
    for level, count in status['unit_level_distribution'].items():
        print(f" - {level}: {count}")

    # Run the migration only when it is incomplete and the operator confirms.
    migration_failed = False
    if not status['migration_complete']:
        print("\n" + "=" * 60)
        response = input("数据库尚未完全迁移,是否执行迁移?(y/N): ").strip().lower()
        if response == 'y':
            print("\n开始执行数据库迁移...")
            results = execute_migration()
            if results['success']:
                print("\n✓ 数据库迁移成功完成!")
                print(" - 架构迁移: ✓")
                print(" - 数据迁移: ✓")
            else:
                migration_failed = True
                print("\n✗ 数据库迁移失败!")
                for error in results['errors']:
                    print(f" - 错误: {error}")
        else:
            print("\n跳过数据库迁移")
    else:
        print("\n数据库已完全迁移,无需重复执行")
    print("\n" + "=" * 60)

    # Exit non-zero on failure so shells and automation can detect it.
    if migration_failed:
        raise SystemExit(1)