"""Database migration script: add unit-level and file-binding columns.

Runs the database migration that supports the unit-permission
optimization feature.
"""
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import os
|
||
from typing import Any, Dict
|
||
|
||
from lawrisk.services import licensing_repo as lic_repo
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def _lic_pg_conn(autocommit: bool = False):
    """Open a connection to the licensing_risks database.

    Module-local shortcut that forwards to
    ``licensing_repo._lic_pg_conn`` with the same ``autocommit`` flag and
    returns whatever that helper returns.
    """
    connection = lic_repo._lic_pg_conn(autocommit=autocommit)
    return connection
|
||
|
||
|
||
def _fetch_table_columns(cur, table_name: str) -> Dict[str, Dict[str, Any]]:
    """Return ``{column_name: {"type", "nullable"}}`` for a ``public`` table.

    An unknown table simply yields an empty dict, because
    ``information_schema.columns`` has no rows for it.
    """
    # The table name is bound as a query parameter (DB-API "%s" style).
    # NOTE(review): assumes a psycopg-style driver behind licensing_repo.
    cur.execute(
        """
        SELECT column_name, data_type, is_nullable
        FROM information_schema.columns
        WHERE table_name = %s
          AND table_schema = 'public'
        ORDER BY ordinal_position
        """,
        (table_name,),
    )
    return {
        row[0]: {"type": row[1], "nullable": row[2]}
        for row in cur.fetchall()
    }


def _fetch_table_indexes(cur, table_name: str) -> list:
    """Return the names of all indexes defined on a ``public`` table."""
    cur.execute(
        """
        SELECT indexname
        FROM pg_indexes
        WHERE tablename = %s
          AND schemaname = 'public'
        """,
        (table_name,),
    )
    return [row[0] for row in cur.fetchall()]


def check_migration_status() -> Dict[str, Any]:
    """Inspect the database and report how far the migration has progressed.

    Returns:
        A dict with the following keys:

        * ``service_departments_columns`` / ``permit_sources_columns``:
          ``{column_name: {"type": ..., "nullable": ...}}`` per table.
        * ``service_departments_indexes`` / ``permit_sources_indexes``:
          lists of index names per table.
        * ``unit_level_distribution``: ``{unit_level: row_count}`` for
          ``service_departments``; empty when the column does not exist
          yet or the count query fails.
        * ``migration_complete``: ``True`` when ``unit_level`` exists on
          ``service_departments`` and both ``uploader_department_id`` and
          ``bound_department_id`` exist on ``permit_sources``.
    """
    with _lic_pg_conn() as conn:
        cur = conn.cursor()

        service_dept_columns = _fetch_table_columns(cur, "service_departments")
        permit_sources_columns = _fetch_table_columns(cur, "permit_sources")

        service_dept_indexes = _fetch_table_indexes(cur, "service_departments")
        permit_sources_indexes = _fetch_table_indexes(cur, "permit_sources")

        # Only count by unit_level when the column is known to exist.
        # Issuing the query blindly and catching the error would abort the
        # surrounding PostgreSQL transaction for no benefit.
        unit_level_distribution: Dict[Any, Any] = {}
        if "unit_level" in service_dept_columns:
            try:
                cur.execute(
                    """
                    SELECT unit_level, COUNT(*) as count
                    FROM service_departments
                    GROUP BY unit_level
                    ORDER BY unit_level
                    """
                )
                unit_level_distribution = {
                    row[0]: row[1] for row in cur.fetchall()
                }
            except Exception:
                # Stay best-effort (the status report is still useful
                # without the distribution), but leave a trace in the log.
                logger.warning(
                    "查询unit_level分布失败,返回空分布", exc_info=True
                )
                unit_level_distribution = {}

        return {
            "service_departments_columns": service_dept_columns,
            "permit_sources_columns": permit_sources_columns,
            "service_departments_indexes": service_dept_indexes,
            "permit_sources_indexes": permit_sources_indexes,
            "unit_level_distribution": unit_level_distribution,
            "migration_complete": (
                "unit_level" in service_dept_columns
                and "uploader_department_id" in permit_sources_columns
                and "bound_department_id" in permit_sources_columns
            ),
        }
|
||
|
||
|
||
def execute_migration() -> Dict[str, Any]:
    """Run the schema migration, then the data migration.

    Each phase is sent as one multi-statement script over its own
    autocommit connection. The schema statements use ``IF NOT EXISTS``
    so they can be re-run; the data phase re-derives ``unit_level`` from
    ``grade`` / ``parent_id`` and then makes the column ``NOT NULL``.

    Returns:
        A dict with:

        * ``schema_migration`` / ``data_migration``: ``True`` once the
          corresponding script has been executed.
        * ``success``: ``True`` only when both phases finished.
        * ``errors``: list of error messages (``str(exc)``); exceptions
          are caught and reported here rather than propagated.
    """
    migration_sql = """
        -- 1. 为service_departments表添加unit_level字段
        ALTER TABLE service_departments
        ADD COLUMN IF NOT EXISTS unit_level VARCHAR(20) DEFAULT 'unit'
        CHECK (unit_level IN ('admin', 'municipal', 'district', 'unit'));

        -- 2. 为service_departments表添加allowed_regions字段(市级单位可访问的行政区)
        ALTER TABLE service_departments
        ADD COLUMN IF NOT EXISTS allowed_regions TEXT;

        -- 3. 为permit_sources表添加uploader_department_id字段
        ALTER TABLE permit_sources
        ADD COLUMN IF NOT EXISTS uploader_department_id uuid REFERENCES service_departments(id);

        -- 4. 为permit_sources表添加bound_department_id字段
        ALTER TABLE permit_sources
        ADD COLUMN IF NOT EXISTS bound_department_id uuid REFERENCES service_departments(id);

        -- 5. 创建索引以提高查询性能
        CREATE INDEX IF NOT EXISTS idx_service_dept_unit_level
        ON service_departments(unit_level);

        CREATE INDEX IF NOT EXISTS idx_service_dept_parent_level
        ON service_departments(parent_id, unit_level);

        CREATE INDEX IF NOT EXISTS idx_permit_sources_bound_dept
        ON permit_sources(bound_department_id);

        CREATE INDEX IF NOT EXISTS idx_permit_sources_uploader
        ON permit_sources(uploader_department_id);
    """

    data_migration_sql = """
        -- 数据迁移:根据现有grade设置unit_level
        -- 市局管理员(grade >= 90)设置为admin
        UPDATE service_departments
        SET unit_level = 'admin'
        WHERE grade >= 90;

        -- 根节点且grade < 90的设置为district(区局子管理员)
        UPDATE service_departments
        SET unit_level = 'district'
        WHERE parent_id IS NULL AND grade < 90;

        -- 有父节点且grade < 90的设置为unit(区级单位)
        UPDATE service_departments
        SET unit_level = 'unit'
        WHERE parent_id IS NOT NULL AND grade < 90;

        -- Backfill rows none of the UPDATEs above matched (e.g. NULL grade
        -- on a pre-existing nullable unit_level column); otherwise the
        -- NOT NULL constraint below would be rejected.
        UPDATE service_departments
        SET unit_level = 'unit'
        WHERE unit_level IS NULL;

        -- 为unit_level字段添加NOT NULL约束(在更新数据后)
        ALTER TABLE service_departments
        ALTER COLUMN unit_level SET NOT NULL;
    """

    results: Dict[str, Any] = {
        "success": False,
        "schema_migration": False,
        "data_migration": False,
        "errors": [],
    }

    # (result key, SQL script, start log message, completion log message)
    phases = (
        ("schema_migration", migration_sql, "开始执行架构迁移...", "架构迁移完成"),
        ("data_migration", data_migration_sql, "开始执行数据迁移...", "数据迁移完成"),
    )

    try:
        # Order matters: the data phase updates the column the schema
        # phase creates, so a schema failure must skip the data phase.
        for result_key, sql_script, start_message, done_message in phases:
            with _lic_pg_conn(autocommit=True) as conn:
                cur = conn.cursor()
                logger.info(start_message)
                cur.execute(sql_script)
                results[result_key] = True
                logger.info(done_message)

        results["success"] = True
        logger.info("数据库迁移全部完成")

    except Exception as e:
        # Top-level boundary: report the failure through ``results`` and
        # keep the traceback in the log for diagnosis.
        logger.exception("数据库迁移失败: %s", e)
        results["errors"].append(str(e))

    return results
|
||
|
||
|
||
def main() -> int:
    """Interactive command-line entry point for the migration tool.

    Prints the current migration status, asks for confirmation when the
    migration is incomplete, and runs it on ``y``.

    Returns:
        Process exit code: ``1`` when a migration was attempted and
        failed, ``0`` otherwise (already migrated, skipped, or succeeded).
    """
    logging.basicConfig(level=logging.INFO)

    separator = "=" * 60
    print(separator)
    print("许可管理单位权限优化 - 数据库迁移工具")
    print(separator)

    # Report the current state before offering to change anything.
    print("\n检查当前迁移状态...")
    status = check_migration_status()

    print(f"\n迁移状态: {'已完成' if status['migration_complete'] else '未完成'}")
    print(f"\nservice_departments表字段数量: {len(status['service_departments_columns'])}")
    print(f"permit_sources表字段数量: {len(status['permit_sources_columns'])}")

    print("\n单位级别分布:")
    for level, count in status['unit_level_distribution'].items():
        print(f" - {level}: {count} 个")

    exit_code = 0
    if status['migration_complete']:
        print("\n数据库已完全迁移,无需重复执行")
    else:
        print("\n" + separator)
        response = input("数据库尚未完全迁移,是否执行迁移?(y/N): ").strip().lower()
        if response == 'y':
            print("\n开始执行数据库迁移...")
            results = execute_migration()

            if results['success']:
                print("\n✓ 数据库迁移成功完成!")
                print(" - 架构迁移: ✓")
                print(" - 数据迁移: ✓")
            else:
                print("\n✗ 数据库迁移失败!")
                for error in results['errors']:
                    print(f" - 错误: {error}")
                # Signal the failure to the calling shell / automation.
                exit_code = 1
        else:
            print("\n跳过数据库迁移")

    print("\n" + separator)
    return exit_code


if __name__ == "__main__":
    raise SystemExit(main())
|