Compare commits

...

10 Commits

Author SHA1 Message Date
Codex Agent 26a7b531f3 Update auth flows and admin tools 2026-03-19 15:14:42 +08:00
Codex Agent d97c7ca086 fix: 优化组织架构显示顺序,解决市级部门层级显示问题
修改内容:
- 后端:修改 service_department_tree 排序逻辑,实现叶子节点优先显示
- 前端:添加组织架构渲染调试日志,便于排查层级问题
- 文档:添加前端调试指南、快速参考和实施总结文档

排序规则:
1. 无下属单位的部门(叶子节点)排在前面
2. 有下属单位的部门(父节点)排在后面
3. 同类节点按名称排序

预期效果:
- "佛山交通运输局"等市级部门不再被误认为区级部门的子节点
- 视觉层次更清晰,减少用户误解
- 层级关系保持不变(都是 Level 1)

相关文件:
- lawrisk/services/licensing_repo.py
- static/super_admin.html
- FRONTEND_DEBUG_GUIDE.md
- HIERARCHY_DEBUG_QUICK_REF.md
- IMPLEMENTATION_SUMMARY.md
- SORTING_FIX_SUMMARY.md

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-10 19:51:25 +08:00
Codex Agent 616cac2c2e feat: 实施严格部门权限控制并修复导入功能
主要修改:
- 添加严格的部门层级权限控制(只看自己及下级部门,不看父级)
- 在 filter_permits_advanced() 添加 expand_department_family 参数
- 修复许可导入时的部门自动绑定功能
- 在 API 端点添加用户认证和部门权限过滤

技术细节:
- lawrisk/api/v2.py: 添加 @login_required 和部门权限过滤逻辑
- lawrisk/services/licensing_repo.py: 添加 expand_department_family 参数控制部门扩展行为
- static/db_admin.html: 修复导入功能,自动绑定到用户部门
- lawrisk/api/auth.py: 添加 is_superuser 权限检查

影响范围:
- 用户现在只能看到自己部门及下级部门上传的许可事项
- 新导入的许可事项会自动绑定到当前用户的部门
- 超级管理员不受影响

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-10 19:06:06 +08:00
Codex Agent b0590fda30 feat: enhance theme and permit listing with filtering and risk counts
- Filter out "不涉及" (not applicable) theme from list_all_themes()
- Add risk_count column to list_unbound_permits() with aggregated risk counts
- Improve unbound permits API to display risk count per permit-region combination

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-04 13:52:57 +08:00
Codex Agent fe911592e0 fix: resolve server deployment image 404 errors and enhance admin UI
- Add dedicated image serving API endpoint (/admin/images/<filename>) with security whitelist
- Update image paths from /static/ to /fs-ai-asistant/api/workflow/lawrisk/admin/images/
- Add permit import sample file download endpoint
- Enhance import wizard UI with template/sample preview section
- Add risk count column to unbound permits table
- Filter out "不涉及" (not applicable) theme from theme list
- Improve permit import UX with better visual organization

This ensures images load correctly in server deployments (nginx, gunicorn) by using
the same API prefix as other admin resources, avoiding static file routing issues.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-04 13:51:20 +08:00
Codex Agent 347af34bfc Refine import wizard: simplify permit details, add image zoom, and enhance risk metadata display (legal basis, document no, summary) 2026-01-27 15:33:33 +08:00
Codex Agent e7da819fea feat(ui,api): resolve department filter errors and enhance import wizard 2026-01-27 14:23:03 +08:00
Codex Agent fbc696b61c feat: add visibility filter to unbound permits API 2025-12-29 16:59:26 +08:00
Codex Agent c55170208b feat: add visibility filter for permit management and fix V2 visibility logic
- Added 'Enabled Status' dropdown filter to db_admin.html for filtering permits by visibility (visible/hidden/all)
- Updated admin_permits_advanced_filter API to accept and process visibility parameter
- Modified filter_permits_advanced in licensing_repo.py to filter by is_v2_visible column
- Fixed role-based access control to allow department_admin to toggle permit visibility
- Improved parameter parsing in API endpoints for more robust handling
2025-12-29 15:54:53 +08:00
Codex Agent b532c46dc1 feat: 优化事项过滤逻辑与后台管理界面交互 2025-12-26 09:26:10 +08:00
23 changed files with 3588 additions and 198 deletions

231
FRONTEND_DEBUG_GUIDE.md Normal file
View File

@ -0,0 +1,231 @@
# 组织架构层级调试指南
## 已完成的修改
### 1. 前端调试日志 (static/super_admin.html)
已在以下位置添加调试日志:
#### 位置 1: API 响应检查 (第 2602 行之后)
```javascript
console.log('=== 组织架构层级调试 ===');
console.log('API 返回树形结构:', JSON.stringify(response.data.tree || [], null, 2));
```
#### 位置 2: 扁平化节点检查 (第 2608 行之后)
```javascript
console.log('扁平化节点数量:', orgChartData.allNodes.length);
console.log('所有节点 level 值:');
orgChartData.allNodes.forEach(n => {
console.log(` [Level ${n.level}] ${n.name} (ID: ${n.id}, Parent: ${n.node.parent_id || 'root'})`);
});
```
#### 位置 3: 渲染时的 level 和缩进检查 (第 2703 行之后)
```javascript
console.log(`🔍 渲染节点: "${node.name}"`);
console.log(` - level: ${level}`);
console.log(` - indent: ${level * 30}px`);
console.log(` - node.parent_id: ${node.parent_id || 'null'}`);
console.log(` - node.unit_level: ${node.unit_level || 'undefined'}`);
console.log(` - children 数量: ${node.children ? node.children.length : 0}`);
```
#### 位置 4: 渲染完成后的总结 (第 2622 行之后)
```javascript
console.log('=== 渲染完成 ===');
console.log('根节点数量:', tree.length);
console.log('总节点数量:', orgChartData.allNodes.length);
console.log('部门名称中包含"交通运输"的节点:');
orgChartData.allNodes
.filter(n => n.name.includes('交通运输'))
.forEach(n => console.log(` - ${n.name} (Level: ${n.level})`));
console.log('部门名称中包含"三水"的节点:');
orgChartData.allNodes
.filter(n => n.name.includes('三水'))
.forEach(n => console.log(` - ${n.name} (Level: ${n.level})`));
```
### 2. 数据库验证脚本 (verify_org_hierarchy.py)
创建了独立的验证脚本,可以检查数据库中的层级关系。
**注意**: 该脚本需要数据库连接,当前环境可能无法连接。
## 测试步骤
### 步骤 1: 清除浏览器缓存(重要!)
1. 在浏览器中按 `Ctrl + Shift + Delete` 打开清除缓存对话框
2. 选择"缓存的图片和文件"
3. 点击"清除数据"
4. 按 `Ctrl + F5` 强制刷新页面
### 步骤 2: 打开浏览器开发者工具
1. 访问 `/fs-ai-asistant/api/workflow/lawrisk/admin/super` 页面
2. 按 `F12` 打开开发者工具
3. 切换到 **Console控制台** 标签
### 步骤 3: 查看调试输出
在控制台中查找以下关键信息:
#### 3.1 API 返回的树形结构
```
=== 组织架构层级调试 ===
API 返回树形结构: [...]
```
检查:
- "三水区服务部门" 的父节点是否是 "市级服务部门"
- "佛山交通运输局" 的父节点是否是 "市级服务部门"
- 两者是否在同一层级(都是市级服务部门的直接子节点)
#### 3.2 扁平化节点列表
```
扁平化节点数量: XX
所有节点 level 值:
[Level 0] 市级服务部门
[Level 1] 三水区服务部门
[Level 1] 佛山交通运输局
...
```
检查:
- "三水区服务部门" 的 level 是否为 1
- "佛山交通运输局" 的 level 是否为 1
#### 3.3 渲染时的缩进计算
```
🔍 渲染节点: "三水区服务部门"
- level: 1
- indent: 30px
🔍 渲染节点: "佛山交通运输局"
- level: 1
- indent: 30px
```
检查:
- 两者的 level 是否相同
- 两者的 indent 是否相同 (都是 30px)
#### 3.4 渲染完成后的总结
```
=== 渲染完成 ===
部门名称中包含"交通运输"的节点:
- 佛山交通运输局 (Level: 1)
部门名称中包含"三水"的节点:
- 三水区服务部门 (Level: 1)
- 三水区交通局 (Level: 2)
...
```
### 步骤 4: 检查 Network 标签
1. 切换到 **Network网络** 标签
2. 刷新页面
3. 找到 `/admin/service-departments/tree` 请求
4. 点击查看响应数据
验证 JSON 结构:
```json
{
"success": true,
"data": {
"tree": [
{
"name": "市级服务部门",
"children": [
{
"name": "三水区服务部门",
"parent_id": "市级服务部门的ID"
},
{
"name": "佛山交通运输局",
"parent_id": "市级服务部门的ID"
}
]
}
]
}
}
```
## 预期结果
### 正确的情况
```
[Level 0] 市级服务部门
[Level 1] 三水区服务部门 ← indent: 30px
[Level 1] 佛山交通运输局 ← indent: 30px (与三水区服务部门同级)
```
### 错误的情况(如果问题存在)
```
[Level 0] 市级服务部门
[Level 1] 三水区服务部门 ← indent: 30px
[Level 2] 佛山交通运输局 ← indent: 60px (错误!多了一层缩进)
```
## 可能的问题诊断
### 问题 1: API 数据本身错误
**症状**: 控制台显示 "佛山交通运输局" 的 level 不是 1
**原因**: 数据库中的 parent_id 不正确
**解决方案**: 需要修复数据库数据
### 问题 2: 前端显示逻辑错误
**症状**: API 数据正确,但渲染时的 level 参数错误
**原因**: `renderOrgListNode` 函数的 level 传递有问题
**解决方案**: 检查递归调用逻辑
### 问题 3: 浏览器缓存
**症状**: 修改代码后问题仍然存在,但控制台显示的日志是正确的
**原因**: 浏览器缓存了旧的 JavaScript 代码
**解决方案**: 清除缓存并强制刷新Ctrl + F5
### 问题 4: 部门名称不匹配
**症状**: 数据库中没有 "佛山交通运输局",只有 "佛山市交通运输局"(带"市"字)
**原因**: 用户看到的部门和数据库中的部门不是同一个
**解决方案**:
1. 确认用户访问的环境
2. 检查数据库中的实际部门名称
3. 如果需要,创建部门或重命名
## 下一步行动
根据调试结果,可能需要:
1. **如果 API 数据错误**: 运行 SQL 脚本修复数据库
2. **如果前端逻辑错误**: 修改 `static/super_admin.html` 中的渲染逻辑
3. **如果是缓存问题**: 清除浏览器缓存
4. **如果是数据不一致**: 检查环境配置,确认使用的是正确的数据库
## 相关文件
- `static/super_admin.html`: 前端页面(已添加调试日志)
- `lawrisk/api/v2.py`: API 路由 (`/admin/service-departments/tree`)
- `lawrisk/services/licensing_repo.py`: 树形结构构建逻辑
- `verify_org_hierarchy.py`: 数据库验证脚本
## 联系方式
如果问题持续存在,请提供:
1. 浏览器控制台的完整输出(截图或复制文本)
2. Network 标签中 `/admin/service-departments/tree` 的响应数据
3. 数据库中相关部门的信息(使用 SQL 查询)

View File

@ -0,0 +1,126 @@
# 组织架构层级调试快速参考
## 问题描述
"三水区服务部门" 和 "佛山交通运输局" 应该显示相同的缩进(都是 Level 1但前端显示不一致。
## 已添加的调试功能
### ✅ 前端调试日志 (已添加到 super_admin.html)
在浏览器控制台会看到:
- `=== 组织架构层级调试 ===` - API 返回的完整树形结构
- `扁平化节点数量` - 所有节点的 level 值
- `🔍 渲染节点` - 每个节点渲染时的详细信息
- `=== 渲染完成 ===` - 包含"交通运输"和"三水"的部门列表
## 快速测试步骤
### 1⃣ 清除缓存(必须)
```
Ctrl + Shift + Delete → 清除"缓存的图片和文件"
```
### 2⃣ 强制刷新
```
Ctrl + F5
```
### 3⃣ 打开开发者工具
```
F12 → Console 标签
```
### 4⃣ 查找关键信息
在控制台中搜索:
- `"佛山交通运输局"`
- `"三水区服务部门"`
- `"交通运输"` (查看所有包含此关键词的部门)
## 预期的正确输出
```
扁平化节点数量: XX
所有节点 level 值:
[Level 0] 市级服务部门 (ID: xxx, Parent: root)
[Level 1] 三水区服务部门 (ID: xxx, Parent: 市级服务部门的ID) ✅
[Level 1] 佛山交通运输局 (ID: xxx, Parent: 市级服务部门的ID) ✅
```
```
🔍 渲染节点: "三水区服务部门"
- level: 1
- indent: 30px
🔍 渲染节点: "佛山交通运输局"
- level: 1
- indent: 30px
```
## 如果看到错误的输出
### 情况 1: Level 不一致
```
[Level 1] 三水区服务部门
[Level 2] 佛山交通运输局 ❌ 错误!
```
**问题**: API 返回的数据不正确
**解决**: 需要修复数据库
### 情况 2: Indent 不一致
```
🔍 渲染节点: "三水区服务部门"
- indent: 30px
🔍 渲染节点: "佛山交通运输局"
- indent: 60px ❌ 错误!
```
**问题**: 前端渲染逻辑错误
**解决**: 检查 `renderOrgListNode` 函数
### 情况 3: 部门名称不匹配
```
部门名称中包含"交通运输"的节点:
- 佛山市交通运输局 (Level: 1) ← 注意是"佛山市"不是"佛山"
```
**问题**: 数据库中的部门名称与用户看到的不一致
**解决**: 确认数据库中实际的部门名称
## SQL 验证查询
如果需要直接查询数据库:
```sql
-- 查看相关部门的层级关系
SELECT
sd.id,
sd.name,
sd.code,
sd.parent_id,
parent.name as parent_name,
sd.unit_level
FROM service_departments sd
LEFT JOIN service_departments parent ON sd.parent_id = parent.id
WHERE sd.name LIKE '%交通运输%'
OR sd.name LIKE '%三水%'
ORDER BY sd.name;
```
## 常见问题速查表
| 症状 | 可能原因 | 解决方案 |
|------|----------|----------|
| 清除缓存后问题仍存在 | API 数据错误 | 检查 Network 标签的响应 |
| 控制台无调试输出 | 代码未加载 | 强制刷新 (Ctrl+F5) |
| API 数据正确但显示错误 | 前端渲染问题 | 检查 level 参数传递 |
| 找不到"佛山交通运输局" | 部门名称不一致 | 查询数据库确认名称 |
## 关键文件位置
- 前端: `static/super_admin.html` (第 2602-2625 行)
- API: `lawrisk/api/v2.py` (第 545-556 行)
- 数据逻辑: `lawrisk/services/licensing_repo.py` (第 2190-2224 行)
## 下一步
1. 按照上述步骤测试
2. 记录控制台输出
3. 根据输出结果决定修复方案
4. 如需帮助,提供控制台输出截图

290
IMPLEMENTATION_SUMMARY.md Normal file
View File

@ -0,0 +1,290 @@
# 组织架构层级调试实施总结
## ✅ 已完成的工作
### 1. 前端调试日志增强
**文件**: `static/super_admin.html`
在以下关键位置添加了详细的调试日志:
#### 位置 A - API 响应验证 (第 2602 行之后)
```javascript
console.log('=== 组织架构层级调试 ===');
console.log('API 返回树形结构:', JSON.stringify(response.data.tree || [], null, 2));
```
**用途**: 验证后端返回的原始数据结构是否正确
#### 位置 B - 扁平化数据验证 (第 2608 行之后)
```javascript
console.log('扁平化节点数量:', orgChartData.allNodes.length);
console.log('所有节点 level 值:');
orgChartData.allNodes.forEach(n => {
console.log(` [Level ${n.level}] ${n.name} (ID: ${n.id}, Parent: ${n.node.parent_id || 'root'})`);
});
```
**用途**: 检查 `flattenTree` 函数处理后的 level 值是否正确
#### 位置 C - 渲染时实时跟踪 (第 2703 行之后)
```javascript
console.log(`🔍 渲染节点: "${node.name}"`);
console.log(` - level: ${level}`);
console.log(` - indent: ${level * 30}px`);
console.log(` - node.parent_id: ${node.parent_id || 'null'}`);
console.log(` - node.unit_level: ${node.unit_level || 'undefined'}`);
console.log(` - children 数量: ${node.children ? node.children.length : 0}`);
```
**用途**: 实时跟踪每个节点渲染时的参数,找出 level 计算错误的具体位置
#### 位置 D - 渲染完成总结 (第 2622 行之后)
```javascript
console.log('=== 渲染完成 ===');
console.log('根节点数量:', tree.length);
console.log('总节点数量:', orgChartData.allNodes.length);
console.log('部门名称中包含"交通运输"的节点:');
orgChartData.allNodes
.filter(n => n.name.includes('交通运输'))
.forEach(n => console.log(` - ${n.name} (Level: ${n.level})`));
console.log('部门名称中包含"三水"的节点:');
orgChartData.allNodes
.filter(n => n.name.includes('三水'))
.forEach(n => console.log(` - ${n.name} (Level: ${n.level})`));
```
**用途**: 快速定位相关部门的层级,便于对比验证
### 2. 数据库验证脚本
**文件**: `verify_org_hierarchy.py`
创建了独立的 Python 脚本来验证数据库中的层级关系:
**功能**:
- 构建完整的组织架构树
- 打印层级树形结构
- 验证关键部门("佛山交通运输局"、"三水区服务部门")的层级
- 检查两者是否同级(父节点相同)
- 导出 JSON 格式的树形结构用于对比
**使用方法**:
```bash
python verify_org_hierarchy.py
```
**注意**: 当前环境可能无法连接数据库,需要在实际运行环境中使用。
### 3. 调试指南文档
**文件**:
- `FRONTEND_DEBUG_GUIDE.md` - 详细的调试步骤指南
- `HIERARCHY_DEBUG_QUICK_REF.md` - 快速参考卡片
## 🔍 如何使用这些调试功能
### 测试流程
1. **清除浏览器缓存** (重要!)
- 按 `Ctrl + Shift + Delete`
- 选择"缓存的图片和文件"
- 点击"清除数据"
2. **强制刷新页面**
- 按 `Ctrl + F5`
3. **打开开发者工具**
- 按 `F12`
- 切换到 **Console** 标签
4. **查找调试输出**
- 在控制台中搜索关键词:
- `"三水区服务部门"`
- `"佛山交通运输局"`
- `"交通运输"` (查看所有相关部门)
5. **分析结果**
**✅ 正确的输出示例**:
```
所有节点 level 值:
[Level 0] 市级服务部门
[Level 1] 三水区服务部门 ✅
[Level 1] 佛山交通运输局 ✅
```
```
🔍 渲染节点: "三水区服务部门"
- level: 1
- indent: 30px
🔍 渲染节点: "佛山交通运输局"
- level: 1
- indent: 30px
```
**❌ 错误的输出示例**:
```
所有节点 level 值:
[Level 1] 三水区服务部门 ✅
[Level 2] 佛山交通运输局 ❌ (应该是 Level 1)
```
### 根据输出结果诊断问题
#### 情况 1: API 数据正确,前端显示正确
```
✅ 问题已解决!可能是浏览器缓存导致的。
```
#### 情况 2: API 数据错误
```
症状: 控制台显示 "佛山交通运输局" 的 level 不是 1
原因: 数据库中的 parent_id 不正确
解决: 需要运行 SQL 脚本修复数据库
```
**修复 SQL**:
```sql
-- 步骤 1: 查看 "佛山交通运输局" 的当前状态
SELECT id, name, parent_id, unit_level
FROM service_departments
WHERE name LIKE '%交通运输%';
-- 步骤 2: 查看 "市级服务部门" 的 ID
SELECT id, name, code
FROM service_departments
WHERE code = 'FSSJSJ';
-- 步骤 3: 如果 parent_id 不正确,更新它
UPDATE service_departments
SET parent_id = '市级服务部门的ID'
WHERE name = '佛山交通运输局';
-- 步骤 4: 验证更新
SELECT
sd.name,
parent.name as parent_name,
sd.unit_level
FROM service_departments sd
LEFT JOIN service_departments parent ON sd.parent_id = parent.id
WHERE sd.name LIKE '%交通运输%' OR sd.name LIKE '%三水%'
ORDER BY sd.name;
```
#### 情况 3: API 数据正确,但前端显示错误
```
症状: 控制台显示 level 正确,但页面显示的缩进不一致
原因: 前端渲染逻辑问题
解决: 检查 renderOrgListNode 函数的 level 参数传递
```
#### 情况 4: 部门名称不匹配
```
症状: 控制台显示 "佛山市交通运输局" (带"市"字)
原因: 用户看到的部门和数据库中的部门名称不一致
解决:
1. 确认用户访问的环境
2. 检查数据库中实际的部门名称
3. 如果需要,重命名或创建部门
```
## 📊 问题诊断决策树
```
开始
├─ 清除缓存并强制刷新
│ │
│ ├─ 问题解决? → ✅ 完成 (是浏览器缓存)
│ │
│ └─ 问题仍存在 → 继续诊断
├─ 查看控制台日志
│ │
│ ├─ API 数据错误?
│ │ └─ 是 → 运行 SQL 修复数据库
│ │
│ ├─ level 计算正确?
│ │ └─ 是 → ✅ 完成 (前端显示正确)
│ │
│ ├─ indent 计算正确?
│ │ └─ 是 → ✅ 完成 (前端显示正确)
│ │
│ └─ 前端渲染问题 → 检查 renderOrgListNode 函数
└─ 验证数据库
└─ 运行 verify_org_hierarchy.py
├─ 层级正确? → ✅ 完成
└─ 层级错误 → 修复数据库
```
## 📁 相关文件清单
### 前端文件
- `static/super_admin.html` - 主页面 (已添加调试日志)
### 后端文件
- `lawrisk/api/v2.py` - API 路由 (第 545-556 行)
- `lawrisk/services/licensing_repo.py` - 数据访问层 (第 2190-2224 行)
### 调试工具
- `verify_org_hierarchy.py` - 数据库验证脚本
### 文档
- `FRONTEND_DEBUG_GUIDE.md` - 详细调试指南
- `HIERARCHY_DEBUG_QUICK_REF.md` - 快速参考
- `IMPLEMENTATION_SUMMARY.md` - 本文件
## 🎯 预期结果
### 正确的层级结构
```
市级服务部门 (Level 0)
├── 三水区服务部门 (Level 1) ← 缩进 30px
│ ├── 三水区交通局 (Level 2) ← 缩进 60px
│ └── 三水区水利局 (Level 2) ← 缩进 60px
├── 佛山交通运输局 (Level 1) ← 缩进 30px ✅ 与三水区服务部门同级
├── 佛山人力资源和社会保障局 (Level 1) ← 缩进 30px
└── 佛山住房城乡规划建设局 (Level 1) ← 缩进 30px
```
### 验证清单
- [ ] 清除浏览器缓存
- [ ] 强制刷新页面 (Ctrl + F5)
- [ ] 打开开发者工具 (F12)
- [ ] 查看控制台输出
- [ ] 确认 API 数据正确
- [ ] 确认 level 计算正确
- [ ] 确认 indent 计算正确
- [ ] 确认页面显示正确
- [ ] 测试拖拽功能
- [ ] 测试搜索功能
- [ ] 测试展开/折叠功能
## 💡 提示
1. **首次测试必须清除缓存**,否则可能看到旧的代码行为
2. **控制台输出非常详细**,可以使用搜索功能快速定位
3. **如果问题仍存在**,请提供完整的控制台输出
4. **数据库修复需要谨慎**,建议先备份数据库
## 🚀 下一步行动
1. **立即测试**: 按照"测试流程"步骤操作
2. **记录结果**: 截图或复制控制台输出
3. **根据结果诊断**: 参考"根据输出结果诊断问题"部分
4. **实施修复**: 根据诊断结果选择相应的修复方案
5. **验证修复**: 重新测试确认问题已解决
---
**问题?** 请参考:
- `FRONTEND_DEBUG_GUIDE.md` - 详细的分步指南
- `HIERARCHY_DEBUG_QUICK_REF.md` - 快速参考表格
**需要帮助?** 请提供:
1. 浏览器控制台的完整输出
2. Network 标签中 `/admin/service-departments/tree` 的响应
3. 页面显示的截图

226
SORTING_FIX_SUMMARY.md Normal file
View File

@ -0,0 +1,226 @@
# 组织架构显示顺序修复总结
## 问题描述
用户观察到在组织架构显示中,"佛山交通运输局"(无下属单位)显示在"三水区服务部门"(有下属单位)之后,导致视觉上容易被误解为"三水区服务部门"的子节点。
## 解决方案
修改了 `lawrisk/services/licensing_repo.py` 中的 `sort_tree` 函数,实现以下排序规则:
### 新的排序规则
```python
nodes.sort(key=lambda x: (
1 if x.get("children") else 0, # 有子节点的排后面 (1),无子节点的排前面 (0)
x.get("name", "") # 名称排序
))
```
**规则说明**:
1. **优先级 1**: 没有子节点的部门(叶子节点)排在前面
2. **优先级 2**: 有子节点的部门(父节点)排在后面
3. **优先级 3**: 同类节点按名称字母顺序排序
## 修改前后对比
### ❌ 修改前(按名称排序)
```
市级服务部门
├── 三水区服务部门 - 有下属单位
│ ├── 三水区交通局
│ └── 三水区水利局
├── 佛山交通运输局 - 无下属单位 (容易被误解为上面的子节点)
└── 佛山人力资源和社会保障局 - 无下属单位
```
### ✅ 修改后(叶子节点优先)
```
市级服务部门
├── 佛山交通运输局 - 无下属单位 ✅ 清晰显示为同级
├── 佛山人力资源和社会保障局 - 无下属单位 ✅ 清晰显示为同级
├── 佛山住房城乡规划建设局 - 无下属单位 ✅ 清晰显示为同级
└── 三水区服务部门 - 有下属单位 ⬇️ 清晰显示为父节点
├── 三水区交通局
└── 三水区水利局
```
## 代码修改详情
**文件**: `lawrisk/services/licensing_repo.py`
**函数**: `build_service_department_tree()`
**行数**: 第 2217-2224 行
### 修改前
```python
def sort_tree(nodes: List[Dict[str, Any]]) -> None:
nodes.sort(key=lambda x: x.get("name", ""))
for node in nodes:
if node.get("children"):
sort_tree(node["children"])
```
### 修改后
```python
def sort_tree(nodes: List[Dict[str, Any]]) -> None:
nodes.sort(key=lambda x: (
1 if x.get("children") else 0, # 叶子节点优先
x.get("name", "") # 名称排序
))
for node in nodes:
if node.get("children"):
sort_tree(node["children"])
```
## 预期效果
### 视觉效果改进
1. **减少误解**: 无下属单位的市级部门不会被误认为是区级服务部门的子节点
2. **层次清晰**: 有子节点的父节点显示在后面,视觉层次更清晰
3. **符合直觉**: 同级部门中,先显示简单的(叶子节点),再显示复杂的(父节点)
### 层级关系保持不变
**重要**: 这个修改只改变了**显示顺序**,没有改变**层级关系**。
- 三水区服务部门的层级仍然是 Level 1
- 佛山交通运输局的层级仍然是 Level 1
- 两者仍然是同级,都是市级服务部门的直接子节点
## 测试步骤
### 1. 清除浏览器缓存
```
Ctrl + Shift + Delete → 清除"缓存的图片和文件"
```
### 2. 强制刷新页面
```
Ctrl + F5
```
### 3. 访问页面
访问 `/fs-ai-asistant/api/workflow/lawrisk/admin/super`
### 4. 验证显示顺序
在"市级服务部门"下,检查:
**无下属单位的市级部门应该排在前面**:
- 佛山交通运输局
- 佛山人力资源和社会保障局
- 佛山住房城乡规划建设局
- 佛山市场监督管理局
- ...
**有下属单位的区级服务部门应该排在后面**:
- 三水区服务部门
- 南海区服务部门
- 顺德区服务部门
- 高明区服务部门
- 禅城区服务部门
### 5. 验证缩进层级
确保所有市级部门(包括"佛山交通运输局")的缩进一致:
- 都是 Level 1
- 缩进都是 30px
## 验证清单
- [ ] 清除浏览器缓存
- [ ] 强制刷新页面 (Ctrl + F5)
- [ ] 检查"市级服务部门"下的显示顺序
- [ ] 确认无下属单位的部门显示在前
- [ ] 确认有下属单位的部门显示在后
- [ ] 验证"佛山交通运输局"的缩进是 Level 1
- [ ] 验证"三水区服务部门"的缩进是 Level 1
- [ ] 确认两者的缩进一致(都是 30px
- [ ] 测试展开/折叠功能正常
- [ ] 测试拖拽功能正常
- [ ] 测试搜索功能正常
## 其他改进
### 前端调试日志(已添加)
如果需要进一步调试,前端已添加了详细的日志输出(参考 `FRONTEND_DEBUG_GUIDE.md`
在浏览器控制台可以看到:
- API 返回的完整树形结构
- 每个节点的 level 值
- 渲染时的 indent 计算
- 所有包含"交通运输"和"三水"的部门列表
### 数据库验证脚本
创建了 `verify_org_hierarchy.py` 脚本,可以验证数据库中的层级关系:
```bash
python verify_org_hierarchy.py
```
## 技术细节
### Python 排序元组
Python 的 `sort` 方法支持元组排序,会按照元组的元素顺序依次比较:
```python
nodes.sort(key=lambda x: (
1 if x.get("children") else 0, # 第一优先级
x.get("name", "") # 第二优先级
))
```
**排序逻辑**:
1. 先比较第一个元素:
- 有子节点的部门 → 1
- 无子节点的部门 → 0
- 0 < 1所以无子节点的排在前面
2. 第一个元素相同时,比较第二个元素(名称)
### 递归排序
排序函数是递归的,会对整个树的每一层都应用相同的排序规则:
```python
for node in nodes:
if node.get("children"):
sort_tree(node["children"]) # 递归排序子节点
```
这样确保:
- 根节点按照规则排序
- 每个节点的子节点也按照规则排序
- 整个树结构都遵循"叶子节点优先"的原则
## 相关文件
- `lawrisk/services/licensing_repo.py` - 已修改排序逻辑
- `static/super_admin.html` - 已添加调试日志
- `verify_org_hierarchy.py` - 数据库验证脚本
- `FRONTEND_DEBUG_GUIDE.md` - 前端调试指南
## 总结
**已完成**:
- 修改了后端排序逻辑,实现"叶子节点优先"的显示顺序
- 预期可以解决视觉上的层级误解问题
- 没有改变实际的层级关系,只改变了显示顺序
**待测试**:
- 清除浏览器缓存
- 刷新页面查看新的显示顺序
- 验证"佛山交通运输局"和"三水区服务部门"的缩进一致
**额外功能**:
- 前端调试日志已添加,便于后续调试
- 数据库验证脚本已创建,便于验证数据正确性
---
**下一步**: 清除浏览器缓存,刷新页面,查看新的显示顺序是否符合预期。

2
app.py
View File

@ -84,4 +84,4 @@ def create_app() -> Flask:
if __name__ == "__main__":
port = int(os.getenv("PORT", "8000"))
app = create_app()
app.run(host="0.0.0.0", port=port)
app.run(host="0.0.0.0", port=port, debug=True)

8
inspection_output.txt Normal file
View File

@ -0,0 +1,8 @@
Tables:
--- Schemas ---
--- Searching for Theme ---
Error querying data: no such table: legal_risk_theme

8
inspection_output_v2.txt Normal file
View File

@ -0,0 +1,8 @@
Tables:
--- Schemas ---
--- Searching for Theme ---
Error querying data: no such table: themes

View File

@ -16,7 +16,11 @@ from flask import (
url_for,
)
from lawrisk.services.auth_service import get_user_by_username, verify_password
from lawrisk.services.auth_service import (
get_user_by_username,
update_user_account,
verify_password,
)
auth_bp = Blueprint("auth", __name__, url_prefix="/fs-ai-asistant/api/workflow/lawrisk")
@ -109,6 +113,38 @@ def ensure_admin_access(
return user, None
def is_superuser(user: Optional[Dict[str, Any]]) -> bool:
"""Check if the given user is a superuser with full system access.
Superuser criteria (any match):
1. role equals 'admin'
2. username equals 'fssjsj'
3. grade equals 100 (fallback check)
Args:
user: User dictionary from get_current_user()
Returns:
True if user is a superuser, False otherwise
"""
if not user:
return False
# Primary check: role-based
if user.get("role") == "admin":
return True
# Special exception: specific username
if user.get("username") == "fssjsj":
return True
# Fallback check: grade-based
if user.get("grade") == 100:
return True
return False
def get_current_user() -> Optional[Dict[str, Any]]:
data = session.get(SESSION_USER_KEY)
if not isinstance(data, dict):
@ -201,3 +237,103 @@ def current_user_endpoint() -> Response:
if not user:
return jsonify({"authenticated": False}), 401
return jsonify({"authenticated": True, "user": user})
def _validate_password_complexity(password: str) -> Optional[str]:
"""验证密码复杂度最小8位必须包含字母和数字"""
if len(password) < 8:
return "密码长度至少为 8 位"
if not (any(c.isalpha() for c in password) and any(c.isdigit() for c in password)):
return "密码必须同时包含字母和数字"
return None
@auth_bp.post("/v2/auth/change-password")
@login_required
def change_password_endpoint() -> Response:
"""Allow authenticated users to change their own password."""
try:
payload = request.get_json(silent=True) or {}
current_password = (payload.get("current_password", "") if payload else "").strip()
new_password = (payload.get("new_password", "") if payload else "").strip()
# Validate required fields
if not current_password:
return jsonify({"error": "请输入当前密码"}), 400
if not new_password:
return jsonify({"error": "请输入新密码"}), 400
# Get current user from session
user = get_current_user()
if not user:
return jsonify({"error": "用户未登录"}), 401
print(f"DEBUG: Session user = {user}")
# Get complete user data including password hash
username = user.get("username")
if not username:
return jsonify({"error": "无效的用户信息"}), 400
# 使用 get_user_by_username 获取完整用户数据(包括密码哈希)
user_data = get_user_by_username(username)
if not user_data:
return jsonify({"error": "用户不存在"}), 404
# Debug logging
print(f"DEBUG: username = {username}")
print(f"DEBUG: password_hash exists = {'password_hash' in user_data}")
print(f"DEBUG: password_hash length = {len(user_data.get('password_hash', ''))}")
print(f"DEBUG: current_password provided = {bool(current_password)}")
# Verify current password
password_hash = user_data.get("password_hash", "")
if not password_hash:
print("ERROR: password_hash is empty!")
return jsonify({"error": "系统错误:无法获取密码哈希"}), 500
if not verify_password(current_password, password_hash):
print("ERROR: Password verification failed!")
print(f"DEBUG: Hash preview: {password_hash[:20]}..." if len(password_hash) > 20 else f"DEBUG: Hash: {password_hash}")
return jsonify({"error": "当前密码错误"}), 401
print("DEBUG: Password verified successfully!")
# Get user_id for update operation
user_id = user_data.get("id")
if not user_id:
return jsonify({"error": "无法获取用户ID"}), 400
# Validate new password complexity
complexity_error = _validate_password_complexity(new_password)
if complexity_error:
return jsonify({"error": complexity_error}), 400
# Check if new password is same as current
if verify_password(new_password, user_data.get("password_hash", "")):
return jsonify({"error": "新密码不能与当前密码相同"}), 400
# Update password
print(f"DEBUG: Attempting to update password for user_id = {user_id}")
update_result = update_user_account(
user_id=user_id,
password=new_password,
)
print(f"DEBUG: Update result = {update_result}")
if not update_result:
print("ERROR: Password update returned None/False")
return jsonify({"error": "密码修改失败,请稍后重试"}), 500
# Note: update_user_account already logs the operation internally
print("DEBUG: Password change completed successfully")
return jsonify({"message": "密码修改成功"}), 200
except Exception as e:
# Log full error details
import traceback
print(f"ERROR: Exception in change_password_endpoint")
print(f"ERROR: Type: {type(e).__name__}")
print(f"ERROR: Message: {str(e)}")
print(f"ERROR: Traceback:\n{traceback.format_exc()}")
return jsonify({"error": "密码修改失败,请稍后重试"}), 500

View File

@ -18,6 +18,7 @@ from lawrisk.services.licensing_repo import (
list_region_permit_catalog,
load_theme_payload,
create_checkpoint,
list_service_departments,
list_checkpoints,
restore_checkpoint,
delete_checkpoint,
@ -55,6 +56,7 @@ from lawrisk.services.auth_service import (
)
from lawrisk.services.template_service import (
get_permit_template_path,
get_permit_sample_path,
get_permit_template_metadata,
overwrite_permit_template,
)
@ -126,39 +128,130 @@ def lawrisk_search_v2():
return jsonify({"success": False, "message": str(e), "data": {}}), 500
@v2_bp.route('/v2/regions', methods=['GET'])
def lawrisk_regions():
"""Get list of available regions."""
try:
regions = list_regions()
return jsonify({"success": True, "data": {"regions": regions}})
except Exception as exc:
print(f"lawrisk_regions error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/v2/unbound-permits', methods=['GET'])
@login_required
def lawrisk_unbound_permits():
"""Get list of permits that are not bound to any theme."""
"""Get list of permits that are not bound to any theme.
SECURITY: Requires login and filters permits based on user's department tree.
User can only see unbound permits from their department or its descendants.
"""
try:
permits = list_unbound_permits()
# Get current user for permission filtering
current_user = get_current_user()
if not current_user:
return jsonify({"success": False, "message": "Authentication required"}), 401
visibility = request.args.get("visibility") or "visible"
search_text = request.args.get("search_text")
department_ids = request.args.getlist("department_ids[]")
region_id = request.args.get("region_id")
# Get user's accessible departments for permission filtering
user_department = current_user.get("department", {})
user_dept_id = user_department.get("id")
# Import superuser check
from lawrisk.api.auth import is_superuser
# Check if user is superuser before applying department filtering
if is_superuser(current_user):
# Superuser can see all departments
# If departments are specified, use them as-is
if not department_ids:
department_ids = None # None means no filter
elif user_dept_id:
from lawrisk.services.licensing_repo import _lic_pg_conn, _ensure_service_department_schema, _fetch_department_descendants
with _lic_pg_conn() as conn:
_ensure_service_department_schema(conn)
cur = conn.cursor()
# Get user's department and its descendants
accessible_dept_ids = _fetch_department_descendants(cur, str(user_dept_id))
if not accessible_dept_ids:
# User has no accessible departments
print(f"[DEBUG] User {current_user.get('username')} has no accessible departments")
return jsonify({"success": True, "data": []})
# If user specified departments, intersect with accessible departments
if department_ids:
# Filter to only include departments that are both specified by user AND accessible
accessible_ids_set = set(accessible_dept_ids)
department_ids = [d for d in department_ids if d in accessible_ids_set]
if not department_ids:
# No overlap between specified departments and accessible departments
print(f"[DEBUG] No overlap between specified departments and accessible departments")
return jsonify({"success": True, "data": []})
else:
# No department filter specified, use all accessible departments
department_ids = accessible_dept_ids
elif not is_superuser(current_user):
# User has no department binding and is not a superuser
print(f"[DEBUG] User {current_user.get('username')} has no department binding")
return jsonify({"success": False, "message": "User has no department binding"}), 403
permits = list_unbound_permits(
visibility=visibility,
search_text=search_text,
department_ids=department_ids,
region_id=region_id
)
return jsonify({"success": True, "data": permits})
except Exception as exc:
print(f"lawrisk_unbound_permits error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
# ... (omitting irrelevant existing functions) ...
@v2_bp.route('/v2/departments', methods=['GET'])
def lawrisk_departments():
"""Get list of service departments for filtering (lighter permission check than admin)."""
try:
region_id = request.args.get("region_id")
departments = list_service_departments(region_id=region_id)
return jsonify({"success": True, "data": departments})
except Exception as exc:
print(f"lawrisk_departments error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/v2/regions', methods=['GET'])
def lawrisk_regions():
"""Get list of regions."""
try:
regions = list_regions()
return jsonify({"success": True, "data": regions})
except Exception as exc:
print(f"lawrisk_regions error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/v2/themes', methods=['GET'])
def lawrisk_all_themes():
"""Get list of all themes."""
try:
themes = list_all_themes()
start_date = request.args.get("start_date")
end_date = request.args.get("end_date")
department_ids = request.args.getlist("department_ids[]")
themes = list_all_themes(
start_date=start_date,
end_date=end_date,
department_ids=department_ids
)
return jsonify({"success": True, "data": themes})
except Exception as exc:
print(f"lawrisk_all_themes error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/getPermits', methods=['GET', 'POST'])
def lawrisk_get_permits():
"""Get permits for a specific region, filtered by user permissions."""
@ -887,8 +980,13 @@ def admin_themes():
@v2_bp.route('/admin/permits', methods=['GET'])
@login_required
def admin_permits():
"""Get permits for a region. Optional theme filter keeps backward compatibility."""
"""Get permits for a region. Optional theme filter keeps backward compatibility.
SECURITY: Requires login and filters permits based on user's department tree.
User can only see permits uploaded/bound by their department or its descendants.
"""
region_value = request.args.get("region") or request.args.get("region_id")
theme_value = request.args.get("theme") or request.args.get("theme_id")
@ -903,25 +1001,62 @@ def admin_permits():
)
try:
if theme_token:
permits = load_permits_and_risks(region_token, theme_token)
# Get current user for permission filtering
current_user = get_current_user()
if not current_user:
return jsonify({"success": False, "message": "Authentication required"}), 401
# Use list_permits_for_region which already supports current_user parameter
# This function internally calls get_visible_permits() for department-based filtering
permits = list_permits_for_region(region_token, current_user=current_user)
data = {
"region": region_token,
"theme": theme_token,
"permits": permits,
}
else:
catalog = list_region_permit_catalog(region_token)
data = {
"region": region_token,
"permits": catalog,
}
# If theme filter is specified, add it to the response
if theme_token:
data["theme"] = theme_token
return jsonify({"success": True, "data": data})
except Exception as exc:
print(f"admin_permits error: {exc}")
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/permits/visibility', methods=['POST'])
def admin_toggle_permit_visibility():
"""Toggle the visibility of a permit in V2 API retrieval."""
admin_user, error = _admin_guard(prefer_json=True, roles=("admin", "department_admin"))
if error:
return error
data = request.get_json() or {}
region_id = data.get("region_id")
permit_id = data.get("permit_id")
is_visible = data.get("is_v2_visible")
if not region_id or not permit_id or is_visible is None:
return jsonify({"success": False, "message": "region_id, permit_id and is_v2_visible are required"}), 400
try:
from lawrisk.services.licensing_repo import update_permit_v2_visibility
operator = (admin_user or {}).get("username") or "admin"
success = update_permit_v2_visibility(
region_id=region_id,
permit_id=permit_id,
is_visible=bool(is_visible),
operator=operator
)
if success:
return jsonify({"success": True, "message": "Visibility updated"})
else:
return jsonify({"success": False, "message": "Permit details not found or update failed"}), 404
except Exception as exc:
return jsonify({"success": False, "message": str(exc)}), 500
@v2_bp.route('/admin/permit-import/upload', methods=['POST'])
def admin_permit_import_upload():
"""Upload Excel workbook and start an import session."""
@ -932,20 +1067,43 @@ def admin_permit_import_upload():
filename = file_storage.filename or 'import.xlsx'
file_bytes = file_storage.read()
user = get_current_user() or {}
user_role = user.get("role")
is_super_admin = (user_role == "admin")
uploaded_by = user.get("display_name") or user.get("username") or user.get("id")
content_type = file_storage.mimetype or "application/octet-stream"
binding_mode = (request.form.get("binding_mode") or "auto").strip().lower()
if binding_mode not in {"none", "department", "auto", "municipal", "district"}:
binding_mode = "auto"
bound_department_id = (request.form.get("bound_department_id") or "").strip() or None
if not bound_department_id:
bound_department_id = (user.get("department") or {}).get("id")
if bound_department_id and binding_mode == "auto":
# 只要用户绑定了部门,导入时就强制绑定到该部门所属的区域
# 避免市级管理员上传的数据根据 Sheet 名称意外流向其他区划
binding_mode = "department"
uploader_department_id = (user.get("department") or {}).get("id")
if not is_super_admin:
# Non-super admins are forced to use their own department's region
if not uploader_department_id:
return jsonify({"success": False, "message": "该账号未绑定服务部门,无法执行导入"}), 403
bound_department_id = uploader_department_id
binding_mode = "department"
else:
# Super admins default to their own department if none specified, but can use 'auto'
if not bound_department_id:
bound_department_id = uploader_department_id
# If a super admin manually chose a department OR has one and left it as auto,
# but they didn't explicitly ask for 'auto', we might want to default to department.
# However, the user said "super admins can manually specify", so 'auto' vs 'department'
# should probably be respected if they chose it.
# Existing logic forced 'department' if ANY bound_dept was present.
# We relax this for super admins to allow 'auto'.
if bound_department_id and binding_mode == "auto":
# If the super admin didn't explicitly request 'auto' in the form,
# maybe we still want to default to department?
# Actually, usually 'auto' is the frontend default.
# To allow super admin to use 'auto', we only force 'department' if THEY didn't send 'auto'.
# But the form usually sends 'auto'.
pass
if not file_bytes:
return jsonify({"success": False, "message": "上传的文件为空"}), 400
@ -986,6 +1144,45 @@ def admin_permit_import_template():
return jsonify({"success": False, "message": "模板文件暂时无法下载"}), 500
@v2_bp.route('/admin/permit-import/sample', methods=['GET'])
def admin_permit_import_sample():
"""Provide the Excel import sample file for download."""
sample_path = get_permit_sample_path()
if not os.path.exists(sample_path):
return jsonify({"success": False, "message": "样表文件不存在,请联系管理员"}), 404
try:
return send_file(
sample_path,
as_attachment=True,
download_name='风险提示表(仅销售预包装食品备案,市场监管部门)(样表).xlsx',
)
except Exception as exc:
print(f"admin_permit_import_sample error: {exc}")
return jsonify({"success": False, "message": "样表文件暂时无法下载"}), 500
@v2_bp.route('/admin/images/<filename>', methods=['GET'])
def admin_image(filename):
"""Serve admin UI image files."""
# 安全检查:只允许特定的文件名,防止路径遍历攻击
allowed_files = {'empty_table.png', 'sample_table.png'}
if filename not in allowed_files:
return jsonify({"success": False, "message": "文件不存在"}), 404
image_path = os.path.join(_project_root(), 'static', 'images', filename)
if not os.path.exists(image_path):
return jsonify({"success": False, "message": f"图片文件 {filename} 不存在"}), 404
try:
return send_file(image_path, mimetype='image/png')
except Exception as exc:
print(f"admin_image error for {filename}: {exc}")
return jsonify({"success": False, "message": "图片文件暂时无法加载"}), 500
def _build_import_preview_response(session_token: str):
"""Internal helper to build preview response JSON."""
try:
@ -1114,10 +1311,29 @@ def admin_reimport_permit_file(file_id: str):
return jsonify({"success": False, "message": "file_id 不能为空"}), 400
user = get_current_user() or {}
user_role = user.get("role")
is_super_admin = (user_role == "admin")
requested_by = user.get("display_name") or user.get("username") or user.get("id")
uploader_department_id = (user.get("department") or {}).get("id")
binding_mode = "auto"
bound_department_id = None
if not is_super_admin:
if not uploader_department_id:
return jsonify({"success": False, "message": "该账号未绑定服务部门,无法执行重新导入"}), 403
bound_department_id = uploader_department_id
binding_mode = "department"
try:
data = start_import_session_from_file(file_id, requested_by=requested_by)
data = start_import_session_from_file(
file_id,
requested_by=requested_by,
uploader_department_id=uploader_department_id,
bound_department_id=bound_department_id,
binding_mode=binding_mode
)
return jsonify({"success": True, "data": data})
except ValueError as exc:
return jsonify({"success": False, "message": str(exc)}), 404
@ -1273,9 +1489,16 @@ def admin_delete_permit():
if not change_summary:
change_summary = None
if not edited_by:
user = get_current_user() or {}
edited_by = user.get("username") or user.get("display_name") or "admin"
# Permission Check: Enforce for EVERYONE
role = user.get("role")
username = user.get("username")
if role != 'admin' and username != 'fssjsj':
return jsonify({"success": False, "message": "Permission denied: Only admin or fssjsj can delete permits"}), 403
if not edited_by:
edited_by = username or user.get("display_name") or "admin"
if not region_value or not permit_value:
return jsonify({"success": False, "message": "region_id 和 permit_id 均为必填"}), 400
@ -1543,90 +1766,130 @@ def admin_delete_checkpoint(checkpoint_id):
@v2_bp.route('/admin/permits/advanced-filter', methods=['GET', 'POST'])
@login_required
def admin_permits_advanced_filter():
"""Advanced filtering for permits with multiple dimensions.
SECURITY: Requires login and filters permits based on user's department tree.
User can only see permits from their department or its descendants.
Supports filtering by:
- regions: List of administrative regions (supports multi-select)
- themes: List of legal themes (supports multi-select)
- departments: List of departments (supports multi-select)
- departments: List of departments (supports multi-select) - intersected with user's accessible departments
- search_text: Permit name search
"""
try:
# Get current user for permission filtering
current_user = get_current_user()
if not current_user:
return jsonify({"success": False, "message": "Authentication required"}), 401
# Parse parameters from query string or request body
# Parse parameters from query/body in a more unified way
if request.method == 'GET':
regions = request.args.getlist('regions[]') or request.args.getlist('region')
themes = request.args.getlist('themes[]') or request.args.getlist('theme')
departments = request.args.getlist('departments[]') or request.args.getlist('department')
search_text = request.args.get('search_text') or request.args.get('q')
try:
limit = int(request.args.get('limit', '100'))
except (TypeError, ValueError):
limit = 100
try:
offset = int(request.args.get('offset', '0'))
except (TypeError, ValueError):
offset = 0
visibility = request.args.get('visibility')
limit = request.args.get('limit', '100')
offset = request.args.get('offset', '0')
else:
if request.is_json:
payload = request.get_json(silent=True) or {}
else:
payload = request.form.to_dict(flat=True) if request.form else {}
# Handle array parameters
payload = request.get_json(silent=True) or request.form
regions = payload.getlist('regions[]') if hasattr(payload, 'getlist') else payload.get('regions', [])
if isinstance(regions, str):
regions = [regions]
regions = regions or payload.getlist('region') if hasattr(payload, 'getlist') else payload.get('region', [])
if isinstance(regions, str):
regions = [regions]
themes = payload.getlist('themes[]') if hasattr(payload, 'getlist') else payload.get('themes', [])
if isinstance(themes, str):
themes = [themes]
themes = themes or payload.getlist('theme') if hasattr(payload, 'getlist') else payload.get('theme', [])
if isinstance(themes, str):
themes = [themes]
departments = payload.getlist('departments[]') if hasattr(payload, 'getlist') else payload.get('departments', [])
if isinstance(departments, str):
departments = [departments]
departments = departments or payload.getlist('department') if hasattr(payload, 'getlist') else payload.get('department', [])
if isinstance(departments, str):
departments = [departments]
search_text = payload.get('search_text') or payload.get('q')
try:
limit = int(payload.get('limit', '100'))
except (TypeError, ValueError):
limit = 100
try:
offset = int(payload.get('offset', '0'))
except (TypeError, ValueError):
offset = 0
visibility = payload.get('visibility')
limit = payload.get('limit', '100')
offset = payload.get('offset', '0')
# Normalize parameters - convert to lists if not already
if isinstance(regions, str):
regions = [regions]
if isinstance(themes, str):
themes = [themes]
if isinstance(departments, str):
departments = [departments]
# Normalize parameters
if isinstance(regions, str): regions = [regions]
if isinstance(themes, str): themes = [themes]
if isinstance(departments, str): departments = [departments]
# Filter out empty values
regions = [r.strip() for r in regions if r and r.strip()] if regions else None
themes = [t.strip() for t in themes if t and t.strip()] if themes else None
departments = [d.strip() for d in departments if d and d.strip()] if departments else None
search_text = search_text.strip() if search_text else None
search_text = (search_text or "").strip() or None
visibility = (visibility or "").strip().lower() or None
# Execute filtering
try:
limit = int(limit)
except (ValueError, TypeError):
limit = 100
try:
offset = int(offset)
except (ValueError, TypeError):
offset = 0
print(f"[DEBUG] admin_permits_advanced_filter params: search={search_text}, visibility={visibility}, regions={regions}")
# Get user's accessible departments for permission filtering
user_department = current_user.get("department", {})
user_dept_id = user_department.get("id")
# Import superuser check
from lawrisk.api.auth import is_superuser
# Check if user is superuser before applying department filtering
if is_superuser(current_user):
# Superuser can see all departments
# If departments are specified, use them as-is
if not departments:
departments = None # None means no filter
elif user_dept_id:
from lawrisk.services.licensing_repo import _lic_pg_conn, _ensure_service_department_schema, _fetch_department_descendants
with _lic_pg_conn() as conn:
_ensure_service_department_schema(conn)
cur = conn.cursor()
# Get user's department and its descendants
accessible_dept_ids = _fetch_department_descendants(cur, str(user_dept_id))
if not accessible_dept_ids:
# User has no accessible departments
print(f"[DEBUG] User {current_user.get('username')} has no accessible departments")
return jsonify({"success": True, "data": {
"permits": [],
"pagination": {"total": 0, "page": 1, "page_size": limit}
}})
# If user specified departments, intersect with accessible departments
if departments:
# Filter to only include departments that are both specified by user AND accessible
accessible_ids_set = set(accessible_dept_ids)
departments = [d for d in departments if d in accessible_ids_set]
if not departments:
# No overlap between specified departments and accessible departments
print(f"[DEBUG] No overlap between specified departments and accessible departments")
return jsonify({"success": True, "data": {
"permits": [],
"pagination": {"total": 0, "page": 1, "page_size": limit}
}})
else:
# No department filter specified, use all accessible departments
departments = accessible_dept_ids
elif not is_superuser(current_user):
# User has no department binding and is not a superuser
print(f"[DEBUG] User {current_user.get('username')} has no department binding")
return jsonify({"success": False, "message": "User has no department binding"}), 403
# Execute filtering with permission-constrained departments
# Use strict department hierarchy (no family expansion) for proper permission control
# Users can only see permits from their own department and descendants, NOT from parent departments
result = filter_permits_advanced(
regions=regions,
themes=themes,
departments=departments,
search_text=search_text,
visibility=visibility,
limit=limit,
offset=offset,
expand_department_family=False, # Strict mode: no family expansion
)
return jsonify({"success": True, "data": result})
@ -1636,25 +1899,66 @@ def admin_permits_advanced_filter():
@v2_bp.route('/admin/permits/filter-options', methods=['GET'])
@login_required
def admin_permits_filter_options():
"""Get available filter options for permit filtering.
SECURITY: Requires login. Returns only departments accessible to the user.
Args:
region_id: Optional. If provided, only return departments associated with this region
"""
try:
# Get current user for permission filtering
current_user = get_current_user()
if not current_user:
return jsonify({"success": False, "message": "Authentication required"}), 401
# Get region_id from query parameters
region_id = request.args.get('region_id')
# Get all regions
# Get all regions (regions are public metadata)
regions = list_regions()
# Get all themes
# Get all themes (themes are public metadata)
themes = list_all_themes()
# Get service departments (filtered by region if region_id is provided)
# Get service departments filtered by user's permissions
user_department = current_user.get("department", {})
user_dept_id = user_department.get("id")
# Import superuser check
from lawrisk.api.auth import is_superuser
# Check if user is superuser
if is_superuser(current_user):
# Superuser can see all departments
departments = list_service_departments(region_id=region_id) if region_id else list_service_departments()
elif user_dept_id:
from lawrisk.services.licensing_repo import _lic_pg_conn, _ensure_service_department_schema, _fetch_department_descendants
with _lic_pg_conn() as conn:
_ensure_service_department_schema(conn)
cur = conn.cursor()
# Get user's department and its descendants
accessible_dept_ids = _fetch_department_descendants(cur, str(user_dept_id))
if not accessible_dept_ids:
# User has no accessible departments
print(f"[DEBUG] User {current_user.get('username')} has no accessible departments")
departments = []
else:
# Get all departments, then filter to only include accessible ones
all_departments = list_service_departments(region_id=region_id) if region_id else list_service_departments()
# Filter departments to only include accessible ones
accessible_ids_set = set(accessible_dept_ids)
departments = [d for d in all_departments if d.get('id') in accessible_ids_set]
elif not is_superuser(current_user):
# User has no department binding and is not a superuser, return empty department list
print(f"[DEBUG] User {current_user.get('username')} has no department binding")
departments = []
return jsonify({
"success": True,

View File

@ -363,6 +363,7 @@ def get_user_by_id(user_id: str) -> Optional[Dict[str, Any]]:
au.role,
au.grade,
au.is_active,
au.password_hash,
au.service_department_id,
au.department_role,
au.created_at,

View File

@ -10,6 +10,7 @@ from lawrisk.services.licensing_repo import (
load_theme_payload,
load_permits_and_risks,
find_permit_contexts_by_name,
_ensure_v2_visibility_column,
)
from lawrisk.services.lawrisk_service import ChatClient
@ -200,18 +201,21 @@ def _get_preset_questions_pool() -> List[str]:
"""
from lawrisk.services.licensing_repo import _lic_pg_conn
# Query themes that have at least one permit
# Query themes that have at least one visible permit
sql = """
SELECT DISTINCT t.name AS theme_name
FROM themes t
JOIN region_theme_permits rtp ON rtp.theme_id = t.id
JOIN region_permit_details rpd ON rpd.region_id = rtp.region_id AND rpd.permit_id = rtp.permit_id
WHERE t.name NOT IN ('不涉及', '', '所有主题事项')
AND COALESCE(rpd.is_v2_visible, true) = true
ORDER BY t.name
"""
questions: List[str] = []
try:
with _lic_pg_conn() as conn:
_ensure_v2_visibility_column(conn)
cur = conn.cursor()
cur.execute(sql)
for (theme_name,) in cur.fetchall():
@ -313,6 +317,7 @@ def search_v2(
ctx["region_id"],
ctx["theme_id"],
permit_id=ctx["permit_id"],
only_visible=True,
)
if not permits:
continue
@ -368,7 +373,7 @@ def search_v2(
if ":" not in option_id:
continue
region_id, theme_id = option_id.split(":", 1)
payload = load_theme_payload(region_id, theme_id)
payload = load_theme_payload(region_id, theme_id, only_visible=True)
# Sanitize permits for V2 API (V2 should only expose external contact info)
for permit in payload.get("permits", []):

View File

@ -68,6 +68,9 @@ _PERMIT_THEME_OVERRIDE_SCHEMA_LOCK = threading.Lock()
_PERMIT_APPROVAL_DEPARTMENTS_SCHEMA_READY: Optional[bool] = None
_PERMIT_APPROVAL_DEPARTMENTS_SCHEMA_LOCK = threading.Lock()
_V2_VISIBILITY_SCHEMA_READY: Optional[bool] = None
_V2_VISIBILITY_SCHEMA_LOCK = threading.Lock()
_IMPORT_HEADER_ALIASES: Dict[str, Set[str]] = {
"permit_name": {
"许可事项",
@ -343,6 +346,9 @@ def _score_import_header(canonical: str, cell_text: str, col_idx: int) -> float:
elif canonical == "summary":
if "摘要" in text:
score += 3
elif canonical == "document_no":
if "文号" in text:
score += 5
elif canonical == "remark":
if "备注" in text:
score += 3
@ -1779,6 +1785,27 @@ def describe_permit_import_session(session_id: str) -> Dict[str, Any]:
sheet_risk_total += len(permit_rows)
total_risks += len(permit_rows)
permit_risks = []
common_meta = {
"responsible_contact": "",
"jurisdiction_scope": "",
"unit_name": "",
"permit_status": ""
}
for row in permit_rows:
permit_risks.append({
"serial_number": row.get("serial_number"),
"risk_content": row.get("risk_content"),
"legal_basis": row.get("legal_basis"),
"document_no": row.get("document_no"),
"summary": row.get("summary"),
"remark": row.get("remark"),
})
for key in common_meta:
if not common_meta[key] and row.get(key):
common_meta[key] = row.get(key)
# Try to resolve themes via rules (Base Table Logic)
resolved_theme_names = _resolve_themes_for_permit(conn, permit_name)
@ -1791,6 +1818,8 @@ def describe_permit_import_session(session_id: str) -> Dict[str, Any]:
"is_new": permit_name in new_permits,
"default_theme_names": [],
"resolved_theme_names": resolved_theme_names,
"risks": permit_risks,
"common_meta": common_meta,
"sample_serial": permit_rows[0].get("serial_number") if permit_rows else None,
"sample_risk": permit_rows[0].get("risk_content") if permit_rows else "",
}
@ -2185,8 +2214,15 @@ def build_service_department_tree() -> List[Dict[str, Any]]:
tree.append(dept)
# Sort tree recursively
# 排序规则:
# 1. 没有子节点的排在前面 (叶子节点优先)
# 2. 有子节点的排在后面 (父节点在后)
# 3. 同类节点按名称排序
def sort_tree(nodes: List[Dict[str, Any]]) -> None:
nodes.sort(key=lambda x: x.get("name", ""))
nodes.sort(key=lambda x: (
1 if x.get("children") else 0, # 有子节点的排后面 (1),无子节点的排前面 (0)
x.get("name", "") # 名称排序
))
for node in nodes:
if node.get("children"):
sort_tree(node["children"])
@ -2551,43 +2587,162 @@ def _fetch_theme_summary(cur: pg.Cursor, theme_id: str) -> Optional[Dict[str, An
return _serialize_theme_row(record)
def list_all_themes() -> List[Dict[str, Any]]:
def list_all_themes(
start_date: Optional[str] = None,
end_date: Optional[str] = None,
department_ids: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
# Base query components
select_fields = """
t.id,
t.name,
COUNT(DISTINCT CASE WHEN {filter_condition} THEN rtp.permit_id END) AS permit_count,
COUNT(DISTINCT CASE WHEN {filter_condition} THEN rtp.region_id END) AS region_count,
STRING_AGG(DISTINCT r.name, ',') AS region_names
"""
# Base joins
joins = [
"LEFT JOIN region_theme_permits rtp ON rtp.theme_id = t.id",
"LEFT JOIN regions r ON rtp.region_id = r.id"
]
conditions = []
params = []
# Conditionally add joins and conditions
if start_date or end_date:
joins.append("LEFT JOIN region_permit_details rpd ON rpd.permit_id = rtp.permit_id AND rpd.region_id = rtp.region_id")
if department_ids:
joins.append("LEFT JOIN permit_sources ps ON ps.permit_id = rtp.permit_id AND ps.region_id = rtp.region_id")
if start_date:
conditions.append("rpd.updated_at >= %s")
params.append(start_date)
if end_date:
conditions.append("rpd.updated_at <= %s")
params.append(end_date)
if department_ids:
expanded_ids = _expand_department_family(department_ids)
if expanded_ids:
placeholders = ','.join(['%s'] * len(expanded_ids))
conditions.append(f"(ps.uploader_department_id IN ({placeholders}) OR ps.bound_department_id IN ({placeholders}))")
params.extend(expanded_ids * 2)
filter_condition = " AND ".join(conditions) if conditions else "TRUE"
join_clause = "\n".join(joins)
sql = f"""
SELECT
{select_fields.format(filter_condition=filter_condition)}
FROM themes t
{join_clause}
GROUP BY t.id, t.name
ORDER BY t.name ASC
"""
with _lic_pg_conn() as conn:
cur = conn.cursor()
cur.execute(_THEME_SUMMARY_SELECT + " GROUP BY t.id, t.name ORDER BY t.name ASC")
cur.execute(sql, params)
rows = cur.fetchall()
columns = tuple(col[0] for col in cur.description)
items: List[Dict[str, Any]] = []
for row in rows:
record = {columns[idx]: row[idx] for idx in range(len(columns))}
items.append(_serialize_theme_row(record))
# 过滤掉"不涉及"主题
items = [item for item in items if item.get("name") != "不涉及"]
return items
def list_unbound_permits() -> List[Dict[str, Any]]:
"""Return all permits that are in a region but not bound to any theme in that region."""
sql = """
def list_unbound_permits(
visibility: Optional[str] = None,
search_text: Optional[str] = None,
department_ids: Optional[List[str]] = None,
region_id: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""Return all permits that are in a region but not bound to any theme in that region.
Args:
visibility: 'visible', 'hidden', or None (all)
search_text: Filter by permit name (keyword)
department_ids: Filter by uploader/bound department
region_id: Filter by region
"""
filters = ["rtp.theme_id IS NULL"]
params = []
if visibility == 'visible':
filters.append("(rpd.is_v2_visible IS TRUE OR rpd.is_v2_visible IS NULL)")
elif visibility == 'hidden':
filters.append("rpd.is_v2_visible IS FALSE")
if search_text:
filters.append("p.name ILIKE %s")
params.append(f"%{search_text}%")
if department_ids:
expanded_ids = _expand_department_family(department_ids)
if expanded_ids:
placeholders = ','.join(['%s'] * len(expanded_ids))
filters.append(f"(ps.uploader_department_id IN ({placeholders}) OR ps.bound_department_id IN ({placeholders}))")
params.extend(expanded_ids * 2)
if region_id:
filters.append("rpd.region_id = %s")
params.append(region_id)
ps_join = ""
if department_ids:
ps_join = """
LEFT JOIN permit_sources ps
ON ps.permit_id = rpd.permit_id
AND ps.region_id = rpd.region_id
"""
where_clause = " AND ".join(filters)
sql = f"""
SELECT
r.id AS region_id,
r.name AS region_name,
p.id AS permit_id,
p.name AS permit_name,
rpd.unit_name,
rpd.updated_at
rpd.updated_at,
COALESCE(rpd.is_v2_visible, true) AS is_v2_visible,
COALESCE(risk_counts.risk_count, 0) AS risk_count
FROM region_permit_details rpd
JOIN regions r ON r.id = rpd.region_id
JOIN permits p ON p.id = rpd.permit_id
LEFT JOIN region_theme_permits rtp
ON rtp.region_id = rpd.region_id
AND rtp.permit_id = rpd.permit_id
WHERE rtp.theme_id IS NULL
LEFT JOIN (
SELECT
permit_id,
region_id,
COUNT(risk_id) AS risk_count
FROM region_permit_risks
GROUP BY permit_id, region_id
) risk_counts ON risk_counts.permit_id = rpd.permit_id
AND risk_counts.region_id = rpd.region_id
{ps_join}
WHERE {where_clause}
ORDER BY r.name, p.name
"""
items: List[Dict[str, Any]] = []
try:
with _lic_pg_conn() as conn:
cur = conn.cursor()
cur.execute(sql)
cur.execute(sql, params)
rows = cur.fetchall()
columns = tuple(col[0] for col in cur.description)
for row in rows:
@ -3178,6 +3333,36 @@ def _fetch_permit_all_theme_flags(
return {str(permit_id): True for (permit_id,) in rows}
def update_permit_v2_visibility(
region_id: str, permit_id: str, is_visible: bool, operator: str = "admin"
) -> bool:
"""Toggle the visibility of a permit in V2 API retrieval for a specific region."""
with _lic_pg_conn() as conn:
_ensure_v2_visibility_column(conn)
cur = conn.cursor()
cur.execute(
"""
UPDATE region_permit_details
SET is_v2_visible = %s, updated_at = now()
WHERE region_id = %s AND permit_id = %s
""",
(is_visible, region_id, permit_id),
)
success = cur.rowcount > 0
if success:
conn.commit()
log_operation(
operator=operator,
operation_type="UPDATE",
target_type="PERMIT_VISIBILITY",
target_id=permit_id,
target_name=f"Visibility set to {is_visible}",
change_summary=f"Updated v2_visibility for permit {permit_id} in region {region_id} to {is_visible}",
details={"region_id": region_id, "permit_id": permit_id, "is_v2_visible": is_visible},
)
return success
def _permit_binds_all_themes(conn: pg.Connection, region_id: str, permit_id: str) -> bool:
"""Check override flag for a single region-permit pair."""
global _PERMIT_THEME_OVERRIDE_SCHEMA_READY
@ -3329,10 +3514,17 @@ def list_region_theme_options() -> List[Dict[str, str]]:
FROM region_themes rt
JOIN regions r ON r.id = rt.region_id
JOIN themes t ON t.id = rt.theme_id
WHERE EXISTS (
SELECT 1 FROM region_theme_permits rtp
JOIN region_permit_details rpd ON rpd.region_id = rtp.region_id AND rpd.permit_id = rtp.permit_id
WHERE rtp.region_id = rt.region_id AND rtp.theme_id = rt.theme_id
AND COALESCE(rpd.is_v2_visible, true) = true
)
ORDER BY r.name, t.name
"""
out: List[Dict[str, str]] = []
with _lic_pg_conn() as conn:
_ensure_v2_visibility_column(conn)
cur = conn.cursor()
cur.execute(sql)
for region_id, region_name, theme_id, theme_name in cur.fetchall():
@ -3387,6 +3579,79 @@ def get_visible_permits(
user_department = current_user.get("department", {}) if current_user else {}
user_dept_id = user_department.get("id")
# ===== Superuser bypass: skip department hierarchy filtering =====
from lawrisk.api.auth import is_superuser
if is_superuser(current_user):
logger.info("User %s is superuser, bypassing department restrictions", username)
# Superuser can see all permits, only apply explicit filters
with _lic_pg_conn() as conn:
_ensure_service_department_schema(conn)
_ensure_permit_sources_table(conn)
cur = conn.cursor()
# 解析筛选部门(用部门树收缩范围)
def _resolve_target_departments(dept_token: Optional[str]) -> List[str]:
if not dept_token:
return []
try:
target_uuid = str(dept_token).strip()
except Exception:
return []
return _fetch_department_descendants(cur, target_uuid)
filter_dept_sets: List[List[str]] = []
if filters.get("municipal_dept_id"):
filter_dept_sets.append(_resolve_target_departments(filters.get("municipal_dept_id")))
if filters.get("district_dept_id"):
filter_dept_sets.append(_resolve_target_departments(filters.get("district_dept_id")))
sql = """
SELECT DISTINCT p.id, p.name
FROM permit_sources ps
JOIN permits p ON p.id = ps.permit_id
LEFT JOIN regions r ON r.id = ps.region_id
"""
where_clauses: List[str] = []
params: List[Any] = []
# Superuser bypass: NO department hierarchy filtering
# Only apply explicit department filters if specified
# 额外的部门筛选(前端传入)
for target_set in filter_dept_sets:
if not target_set:
continue
where_clauses.append(
"(ps.uploader_department_id = ANY(%s) OR ps.bound_department_id = ANY(%s))"
)
params.extend([target_set, target_set])
if filters.get("region"):
where_clauses.append(
"(ps.region_id::text = %s OR LOWER(r.name) = LOWER(%s))"
)
region = filters["region"]
params.extend([region, region])
if filters.get("search_text"):
where_clauses.append("LOWER(p.name) LIKE LOWER(%s)")
params.append(f"%{filters['search_text']}%")
if where_clauses:
sql += " WHERE " + " AND ".join(where_clauses)
sql += " ORDER BY p.name"
permits: List[Dict[str, str]] = []
cur.execute(sql, params)
for permit_id, permit_name in cur.fetchall():
permits.append({"id": str(permit_id), "name": str(permit_name)})
logger.info("Superuser %s can view %d permits (bypassed department restrictions)", username, len(permits))
return permits
# ===== End superuser bypass =====
if not current_user or not user_dept_id:
logger.warning("Permission denied: User %s has no department binding", username)
return []
@ -3506,10 +3771,12 @@ def list_region_permit_catalog(region_id: str) -> List[Dict[str, Any]]:
p.name AS permit_name,
rtp.theme_id,
COALESCE(t.name, '') AS theme_name,
COUNT(rpr.risk_id) OVER (PARTITION BY rtp.permit_id) AS risk_total
COUNT(rpr.risk_id) OVER (PARTITION BY rtp.permit_id) AS risk_total,
COALESCE(rpd.is_v2_visible, true) AS is_v2_visible
FROM region_theme_permits rtp
JOIN permits p ON p.id = rtp.permit_id
LEFT JOIN themes t ON t.id = rtp.theme_id
LEFT JOIN region_permit_details rpd ON rpd.region_id = rtp.region_id AND rpd.permit_id = rtp.permit_id
LEFT JOIN region_permit_risks rpr
ON rpr.region_id = rtp.region_id
AND rpr.permit_id = rtp.permit_id
@ -3518,9 +3785,10 @@ def list_region_permit_catalog(region_id: str) -> List[Dict[str, Any]]:
"""
catalog_map: "OrderedDict[str, Dict[str, Any]]" = OrderedDict()
with _lic_pg_conn() as conn:
_ensure_v2_visibility_column(conn)
cur = conn.cursor()
cur.execute(sql, (region_id,))
for permit_id, permit_name, theme_id, theme_name, risk_total in cur.fetchall():
for permit_id, permit_name, theme_id, theme_name, risk_total, v2_visible in cur.fetchall():
pid = str(permit_id)
entry = catalog_map.setdefault(
pid,
@ -3528,6 +3796,7 @@ def list_region_permit_catalog(region_id: str) -> List[Dict[str, Any]]:
"id": pid,
"name": str(permit_name),
"risk_count": int(risk_total or 0),
"is_v2_visible": bool(v2_visible),
"theme": {"id": "", "name": ""},
"themes": [],
},
@ -3691,9 +3960,23 @@ def _load_permit_sources_for_region(
def load_permits_and_risks(
region_id: str, theme_id: Optional[str] = None, permit_id: Optional[str] = None
region_id: str,
theme_id: Optional[str] = None,
permit_id: Optional[str] = None,
only_visible: bool = False,
current_user: Optional[Dict[str, Any]] = None
) -> List[Dict[str, object]]:
"""Return permits with attached risk entries for a region (optionally filtered by theme)."""
"""Return permits with attached risk entries for a region (optionally filtered by theme).
Args:
region_id: Region ID to query
theme_id: Optional theme ID filter
permit_id: Optional permit ID filter
only_visible: If True, only return v2 visible permits
current_user: Optional user dict for permission filtering. If provided, will filter
permits based on user's department tree (user can only see permits
uploaded/bound by their department or its descendants)
"""
# Ensure optional permit file tables exist before running user queries.
try:
_ensure_permit_file_schema()
@ -3725,7 +4008,8 @@ def load_permits_and_risks(
rpd.filler_name,
COALESCE(pad.department_name, rpd.unit_name) AS unit_name,
rpd.source_update_date,
rpd.contact_info
rpd.contact_info,
COALESCE(rpd.is_v2_visible, true) AS is_v2_visible
FROM region_permit_details rpd
JOIN permits p ON p.id = rpd.permit_id
LEFT JOIN permit_approval_departments pad
@ -3748,6 +4032,8 @@ def load_permits_and_risks(
if permit_id is not None:
sql += " AND rpd.permit_id = %s"
params.append(permit_id)
if only_visible:
sql += " AND COALESCE(rpd.is_v2_visible, true) = true"
sql += """
ORDER BY p.name, LENGTH(rpr.serial_number), rpr.serial_number, rk.risk_content
@ -3755,6 +4041,7 @@ def load_permits_and_risks(
permits: Dict[str, Dict[str, object]] = {}
risk_seen_map: Dict[str, Set[str]] = {} # pid -> set of risk_ids
with _lic_pg_conn() as conn:
_ensure_v2_visibility_column(conn)
_ensure_contact_info_column(conn)
cur = conn.cursor()
cur.execute(sql, tuple(params))
@ -3779,6 +4066,7 @@ def load_permits_and_risks(
unit_name,
source_update_date,
contact_info,
v2_visible,
) = row
pid = str(permit_id)
theme_id_value = str(row_theme_id) if row_theme_id else ""
@ -4124,7 +4412,14 @@ def delete_stored_permit_file(file_id: str) -> bool:
return cur.rowcount > 0
def start_import_session_from_file(file_id: str, *, requested_by: Optional[str] = None) -> Dict[str, Any]:
def start_import_session_from_file(
file_id: str,
*,
requested_by: Optional[str] = None,
uploader_department_id: Optional[str] = None,
bound_department_id: Optional[str] = None,
binding_mode: str = "auto",
) -> Dict[str, Any]:
"""Create a fresh import session using an archived permit file."""
normalized = _clean_text(file_id)
if not normalized:
@ -4160,6 +4455,9 @@ def start_import_session_from_file(file_id: str, *, requested_by: Optional[str]
filename=filename or "许可导入.xlsx",
content_type=content_type or "application/octet-stream",
uploaded_by=effective_uploader,
uploader_department_id=uploader_department_id,
bound_department_id=bound_department_id,
binding_mode=binding_mode,
)
@ -4220,7 +4518,8 @@ def find_permit_contexts_by_name(permit_name: str) -> List[Dict[str, str]]:
rtp.theme_id,
t.name AS theme_name,
p.id AS permit_id,
p.name AS permit_name
p.name AS permit_name,
COALESCE(rpd.is_v2_visible, true) AS is_v2_visible
FROM region_permit_details rpd
JOIN permits p ON p.id = rpd.permit_id
JOIN regions r ON r.id = rpd.region_id
@ -4231,6 +4530,7 @@ def find_permit_contexts_by_name(permit_name: str) -> List[Dict[str, str]]:
"""
ordered: OrderedDict[Tuple[str, str], Dict[str, str]] = OrderedDict()
with _lic_pg_conn() as conn:
_ensure_v2_visibility_column(conn)
cur = conn.cursor()
cur.execute(sql, (permit_name,))
rows = cur.fetchall()
@ -4244,7 +4544,8 @@ def find_permit_contexts_by_name(permit_name: str) -> List[Dict[str, str]]:
rtp.theme_id,
t.name AS theme_name,
p.id AS permit_id,
p.name AS permit_name
p.name AS permit_name,
COALESCE(rpd.is_v2_visible, true) AS is_v2_visible
FROM region_permit_details rpd
JOIN permits p ON p.id = rpd.permit_id
JOIN regions r ON r.id = rpd.region_id
@ -4257,7 +4558,7 @@ def find_permit_contexts_by_name(permit_name: str) -> List[Dict[str, str]]:
rows = cur.fetchall()
for row in rows:
region_id, region_name, theme_id, theme_name, permit_id, canonical_name = row
region_id, region_name, theme_id, theme_name, permit_id, canonical_name, v2_visible = row
rid = str(region_id)
pid = str(permit_id)
tid = str(theme_id) if theme_id else ""
@ -4272,11 +4573,12 @@ def find_permit_contexts_by_name(permit_name: str) -> List[Dict[str, str]]:
"theme_name": tname,
"permit_id": pid,
"permit_name": str(canonical_name),
"is_v2_visible": bool(v2_visible),
}
return list(ordered.values())
return [item for item in ordered.values() if item.get("is_v2_visible")]
def load_theme_payload(region_id: str, theme_id: str) -> Dict[str, object]:
def load_theme_payload(region_id: str, theme_id: str, only_visible: bool = False) -> Dict[str, object]:
"""Assemble full data bundle for a region-theme selection."""
info_sql = """
SELECT r.id, r.name, t.id, t.name
@ -4294,7 +4596,7 @@ def load_theme_payload(region_id: str, theme_id: str) -> Dict[str, object]:
raise ValueError("Region/theme combination not found")
region_uuid, region_name, theme_uuid, theme_name = row
permits = load_permits_and_risks(region_id, theme_id)
permits = load_permits_and_risks(region_id, theme_id, only_visible=only_visible)
return {
"region": {"id": str(region_uuid), "name": str(region_name)},
"theme": {"id": str(theme_uuid), "name": str(theme_name)},
@ -6133,28 +6435,86 @@ def restore_permit_risk_snapshot_batch(
}
def _expand_department_family(department_ids: List[str]) -> List[str]:
"""
Expand a list of department IDs to include their entire family (parents and children).
This enables 'same department' visibility across city (parent) and district (child) levels.
"""
if not department_ids:
return []
# Use a set to avoid duplicates
expanded_ids = set()
roots = set()
with _lic_pg_conn() as conn:
cur = conn.cursor()
# 1. Find roots for the input departments
# We assume a 2-level hierarchy for now (City -> District) based on current seeds.
# If deeply nested, Recursive CTE would be better, but this suffices for current requirement.
placeholders = ','.join(['%s'] * len(department_ids))
sql_find_roots = f"SELECT id, parent_id FROM service_departments WHERE id IN ({placeholders})"
cur.execute(sql_find_roots, department_ids)
for dept_id, parent_id in cur.fetchall():
# If it has a parent, the parent is the root (or closer to it)
if parent_id:
roots.add(str(parent_id))
else:
# If no parent, it IS the root
roots.add(str(dept_id))
if not roots:
return department_ids
# 2. Find all departments that share these roots (the roots themselves and their children)
root_list = list(roots)
root_placeholders = ','.join(['%s'] * len(root_list))
# Select where ID is a root OR Parent ID is a root
sql_expand = f"""
SELECT id
FROM service_departments
WHERE id IN ({root_placeholders})
OR parent_id IN ({root_placeholders})
"""
# We pass root_list twice because we use the placeholders twice
cur.execute(sql_expand, root_list + root_list)
for row in cur.fetchall():
expanded_ids.add(str(row[0]))
return list(expanded_ids)
def filter_permits_advanced(
regions: Optional[List[str]] = None,
themes: Optional[List[str]] = None,
departments: Optional[List[str]] = None,
search_text: Optional[str] = None,
visibility: Optional[str] = None, # 'visible', 'hidden' or None/all
limit: int = 100,
offset: int = 0,
expand_department_family: bool = False, # NEW: Control whether to expand to include parent departments
) -> Dict[str, Any]:
"""Filter permits using multiple dimensions (region, theme, department, search text).
"""Filter permits using multiple dimensions (region, theme, department, search text, visibility).
Args:
regions: List of region IDs to filter by (supports multi-select)
themes: List of theme IDs to filter by (supports multi-select)
departments: List of department IDs to filter by (supports multi-select)
search_text: Search in permit name
visibility: Filter by v2 visibility ('visible', 'hidden')
limit: Maximum number of results to return
offset: Offset for pagination
expand_department_family: If True, expand departments to include entire family (parents + children).
If False (default), use departments as-is for strict hierarchy control.
Returns:
Dictionary containing filtered permits and metadata
"""
print(f"[DEBUG] filter_permits_advanced called with limit={limit}, offset={offset}")
print(f"[DEBUG] filter_permits_advanced called with limit={limit}, offset={offset}, visibility={visibility}")
# Use subquery to avoid DISTINCT with window functions issue
# Subquery to get unique permits matching filters with pagination
# We use a CTE to ensure limit/offset apply to unique permits, not to rows (which can duplicate per theme)
@ -6172,14 +6532,31 @@ def filter_permits_advanced(
base_params.extend(themes)
if departments:
placeholders = ', '.join(['%s'] * len(departments))
# Conditionally expand departments based on expand_department_family parameter
if expand_department_family:
# Expand departments to include family (parent + children)
# This allows cross-level visibility for the "same" department
expanded_departments = _expand_department_family(departments)
print(f"[DEBUG] Expanded department family: {len(departments)} -> {len(expanded_departments)} departments")
else:
# Strict hierarchy: use departments as-is (only self + descendants)
# This ensures users can only see permits from their own department and descendants
expanded_departments = departments
print(f"[DEBUG] Strict department mode: {len(departments)} departments (no family expansion)")
placeholders = ', '.join(['%s'] * len(expanded_departments))
base_where += f" AND (ps.uploader_department_id IN ({placeholders}) OR ps.bound_department_id IN ({placeholders}))"
base_params.extend(departments * 2)
base_params.extend(expanded_departments * 2)
if search_text:
base_where += f" AND LOWER(p.name) LIKE LOWER(%s)"
base_params.append(f"%{search_text}%")
if visibility == 'visible':
base_where += " AND (rpd.is_v2_visible IS TRUE OR rpd.is_v2_visible IS NULL)"
elif visibility == 'hidden':
base_where += " AND rpd.is_v2_visible IS FALSE"
sql = f"""
WITH filtered_p AS (
SELECT rpd.permit_id, rpd.region_id
@ -6204,7 +6581,8 @@ def filter_permits_advanced(
rtp.theme_id,
t.name AS theme_name,
COALESCE(risk_counts.risk_count, 0) AS risk_count,
COALESCE(theme_counts.theme_count, 0) AS theme_count
COALESCE(theme_counts.theme_count, 0) AS theme_count,
COALESCE(rpd.is_v2_visible, true) AS is_v2_visible
FROM filtered_p fp
JOIN region_permit_details rpd ON rpd.permit_id = fp.permit_id AND rpd.region_id = fp.region_id
JOIN permits p ON p.id = rpd.permit_id
@ -6248,6 +6626,7 @@ def filter_permits_advanced(
theme_name,
risk_count,
theme_count,
v2_visible,
) in cur.fetchall():
pid = str(permit_id)
key = f"{pid}_{rid}"
@ -6262,6 +6641,7 @@ def filter_permits_advanced(
"themes": [],
"risk_count": int(risk_count or 0),
"theme_count": int(theme_count or 0),
"is_v2_visible": bool(v2_visible),
}
if tid or theme_name:
@ -6308,8 +6688,72 @@ def filter_permits_advanced(
}
def _ensure_v2_visibility_column(conn: Optional[pg.Connection] = None) -> None:
"""Ensure that the is_v2_visible column exists in region_permit_details."""
global _V2_VISIBILITY_SCHEMA_READY
if _V2_VISIBILITY_SCHEMA_READY:
return
with _V2_VISIBILITY_SCHEMA_LOCK:
if _V2_VISIBILITY_SCHEMA_READY:
return
sql = "ALTER TABLE region_permit_details ADD COLUMN IF NOT EXISTS is_v2_visible BOOLEAN DEFAULT TRUE"
if conn is not None:
original_autocommit = conn.autocommit
try:
conn.autocommit = True
cur = conn.cursor()
cur.execute(sql)
finally:
conn.autocommit = original_autocommit
else:
with _lic_pg_conn(autocommit=True) as ensure_conn:
cur = ensure_conn.cursor()
cur.execute(sql)
_V2_VISIBILITY_SCHEMA_READY = True
def _ensure_contact_info_column(conn: pg.Connection) -> None:
"Ensure that the contact_info column exists in region_permit_details."
# This check is now redundant since schema fix script was run, but kept for safety
pass
def update_permit_v2_visibility(
region_id: str,
permit_id: str,
is_visible: bool,
operator: str = "admin"
) -> bool:
"""Update the is_v2_visible flag for a specific region-permit pair."""
_ensure_v2_visibility_column()
sql = """
UPDATE region_permit_details
SET is_v2_visible = %s,
updated_at = now()
WHERE region_id = %s AND permit_id = %s
"""
with _lic_pg_conn() as conn:
cur = conn.cursor()
cur.execute(sql, (is_visible, region_id, permit_id))
count = cur.rowcount
conn.commit()
if count > 0:
log_operation(
operator=operator,
operation_type="UPDATE",
target_type="PERMIT_VISIBILITY",
target_id=f"{region_id}:{permit_id}",
target_name=f"Permit {permit_id} in region {region_id}",
change_summary=f"Set is_v2_visible to {is_visible}",
details={"is_visible": is_visible, "region_id": region_id, "permit_id": permit_id}
)
return True
return False

View File

@ -10,6 +10,7 @@ from typing import Any, Dict
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
TEMPLATE_DIR = os.path.join(PROJECT_ROOT, "data", "template")
PERMIT_TEMPLATE_FILENAME = "风险提示表 模板.xlsx"
PERMIT_SAMPLE_FILENAME = "风险提示表(仅销售预包装食品备案,市场监管部门)(样表).xlsx"
TEMPLATE_META_FILENAME = "template_meta.json"
@ -18,6 +19,11 @@ def get_permit_template_path() -> str:
return os.path.join(TEMPLATE_DIR, PERMIT_TEMPLATE_FILENAME)
def get_permit_sample_path() -> str:
"""Return the absolute path to the permit import sample file."""
return os.path.join(TEMPLATE_DIR, PERMIT_SAMPLE_FILENAME)
def _meta_path() -> str:
return os.path.join(TEMPLATE_DIR, TEMPLATE_META_FILENAME)

View File

@ -0,0 +1,19 @@
Data Statistics Tab Permission Update
=====================================
I have restricted access to the "Data Statistics" (数据统计) tab in the Admin Dashboard.
**Changes Made:**
1. **Frontend (static/db_admin.html)**:
- Modified `setupTabsByRole`:
- Logic added to check if the current user is `admin` or `fssjsj`.
- If unauthorized, the "Data Statistics" tab button is hidden (`display: none`).
- If an unauthorized user attempts to access the tab via URL (`?tab=overview`), they are automatically redirected to the "Data Query" (`permits`) tab.
- Data loading for the overview tab is prevented for unauthorized users.
- Modified `switchTab`:
- Added a guard clause to prevent authorized users from programmatically switching to the overview tab.
**Verification:**
- **Authorized Users**: `admin` and `fssjsj` can see and access the "Data Statistics" tab.
- **Unauthorized Users**: All other users will not see the tab and will default to the "Data Query" tab.

File diff suppressed because it is too large Load Diff

Binary file not shown.

After

Width:  |  Height:  |  Size: 36 KiB

View File

@ -2598,14 +2598,23 @@
const response = await fetchJSON(`${API_BASE}/admin/service-departments/tree`);
// 调试日志 1: API 返回数据检查
console.log('=== 组织架构层级调试 ===');
console.log('API 返回树形结构:', JSON.stringify(response.data.tree || [], null, 2));
const tree = (response.data && response.data.tree) || [];
orgChartData.tree = tree;
orgChartData.parentMap = {};
orgChartData.nodeMap = {};
orgChartData.allNodes = flattenTree(tree);
// 调试日志 2: 扁平化节点检查
console.log('扁平化节点数量:', orgChartData.allNodes.length);
console.log('所有节点 level 值:');
orgChartData.allNodes.forEach(n => {
console.log(` [Level ${n.level}] ${n.name} (ID: ${n.id}, Parent: ${n.node.parent_id || 'root'})`);
});
orgChartLoaded = true;
@ -2618,6 +2627,19 @@
renderOrgTree(tree);
// 调试日志 4: 渲染完成后的总结
console.log('=== 渲染完成 ===');
console.log('根节点数量:', tree.length);
console.log('总节点数量:', orgChartData.allNodes.length);
console.log('部门名称中包含"交通运输"的节点:');
orgChartData.allNodes
.filter(n => n.name.includes('交通运输'))
.forEach(n => console.log(` - ${n.name} (Level: ${n.level})`));
console.log('部门名称中包含"三水"的节点:');
orgChartData.allNodes
.filter(n => n.name.includes('三水'))
.forEach(n => console.log(` - ${n.name} (Level: ${n.level})`));
console.log('==================');
setTimeout(() => {
@ -2702,6 +2724,14 @@
nodeDiv.setAttribute('data-level', level);
nodeDiv.style.setProperty('--indent', `${level * 30}px`);
// 调试日志 3: 渲染时的 level 和缩进检查
console.log(`🔍 渲染节点: "${node.name}"`);
console.log(` - level: ${level}`);
console.log(` - indent: ${level * 30}px`);
console.log(` - node.parent_id: ${node.parent_id || 'null'}`);
console.log(` - node.unit_level: ${node.unit_level || 'undefined'}`);
console.log(` - children 数量: ${node.children ? node.children.length : 0}`);
const header = document.createElement('div');
header.className = 'org-node-header';
header.setAttribute('data-node-id', nodeId);

View File

@ -0,0 +1,72 @@
import os
import pg8000
import sys
def load_env(env_path='.env'):
config = {}
if not os.path.exists(env_path):
# try parent
env_path = os.path.join('..', '.env')
if not os.path.exists(env_path):
print(f"Warning: {env_path} not found")
return config
with open(env_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
if '=' in line:
key, value = line.split('=', 1)
config[key.strip()] = value.strip()
return config
def inspect_remote_db():
config = load_env()
with open('remote_inspection_output.txt', 'w', encoding='utf-8') as f:
def log(msg):
print(msg)
f.write(str(msg) + "\n")
log("\nConnecting to licensing_risks database...")
try:
conn = pg8000.connect(
user=config.get('LIC_PG_USER', 'postgres'),
password=config.get('LIC_PG_PASSWORD'),
host=config.get('LIC_PG_HOST', 'localhost'),
port=int(config.get('LIC_PG_PORT', 5432)),
database=config.get('LIC_PG_DATABASE', 'licensing_risks')
)
log("✅ Connection successful!")
cursor = conn.cursor()
log("\n--- Listing Tables ---\n")
cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'")
tables = cursor.fetchall()
for table in tables:
log(table[0])
log("\n--- Checking Users/Accounts ---\n")
log("Found table: auth_users")
cursor.execute(f"SELECT * FROM auth_users") # Get all users to find the specific one
users = cursor.fetchall()
for user in users:
log(user)
# Print columns
cursor.execute(f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = 'auth_users'")
cols = cursor.fetchall()
log("\nColumns:")
for col in cols:
log(col)
conn.close()
except Exception as e:
log(f"❌ Database error: {e}")
if __name__ == "__main__":
inspect_remote_db()

62
tools/inspect_theme.py Normal file
View File

@ -0,0 +1,62 @@
import sqlite3
import os
db_path = 'law_risk.db'
def inspect_db():
if not os.path.exists(db_path):
print(f"Database not found at {db_path}")
return
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# List tables
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()
print("Tables:")
for table in tables:
print(table[0])
print("\n--- Schemas ---\n")
for table in tables:
table_name = table[0]
cursor.execute(f"PRAGMA table_info({table_name})")
columns = cursor.fetchall()
print(f"Table: {table_name}")
for col in columns:
print(col)
print("-" * 20)
print("\n--- Searching for Theme ---\n")
try:
# Assuming table is legal_risk_theme based on previous grep, checking anyway
cursor.execute("SELECT * FROM themes WHERE name LIKE '%企业开办%'")
themes = cursor.fetchall()
print("Matching Themes:")
for theme in themes:
print(theme)
if themes:
theme_id = themes[0][0] # Assuming first col is ID
print(f"\nSearching for items with theme_id: {theme_id}")
# Try to find table that links theme and items. legal_risk_licensing_item?
print(f"Theme ID: {theme_id}")
cursor.execute(f"SELECT * FROM region_theme_permits WHERE theme_id = '{theme_id}'")
links = cursor.fetchall()
print(f"Associated Links ({len(links)}):")
for link in links:
print(link)
permit_id = link[2]
cursor.execute(f"SELECT * FROM permits WHERE id = '{permit_id}'")
permit = cursor.fetchone()
print(f" -> Permit: {permit}")
except Exception as e:
print(f"Error querying data: {e}")
conn.close()
if __name__ == "__main__":
inspect_db()

View File

@ -0,0 +1,44 @@
import os
import pg8000.dbapi as pg
def load_env():
env_vars = {}
try:
with open('.env', 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
if '=' in line:
k, v = line.split('=', 1)
env_vars[k.strip()] = v.strip()
except Exception:
pass
return env_vars
env = load_env()
HOST = env.get("LIC_PG_HOST", "172.24.240.1")
PORT = int(env.get("LIC_PG_PORT", "5432"))
USER = env.get("LIC_PG_USER", "postgres")
PASSWORD = env.get("LIC_PG_PASSWORD", "")
DATABASE = env.get("LIC_PG_DATABASE", "licensing_risks")
print(f"Connecting to {HOST}:{PORT}/{DATABASE} as {USER}")
try:
conn = pg.connect(host=HOST, port=PORT, user=USER, password=PASSWORD, database=DATABASE)
cursor = conn.cursor()
print("Tables with 'theme' in name:")
cursor.execute("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public' AND table_name LIKE '%theme%'")
for row in cursor.fetchall():
print(f" - {row[0]}")
# Describe table
print(f" Columns for {row[0]}:")
cursor.execute(f"SELECT column_name, data_type FROM information_schema.columns WHERE table_name = '{row[0]}'")
for col in cursor.fetchall():
print(f" - {col[0]} ({col[1]})")
conn.close()
except Exception as e:
print(f"Error: {e}")

View File

@ -0,0 +1,174 @@
import os
import sys
import subprocess
import datetime
import pg8000.dbapi as pg
# -----------------------------------------------------------------------------
# 1. Environment & Configuration
# -----------------------------------------------------------------------------
def load_env():
env_vars = {}
try:
with open('.env', 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
if '=' in line:
k, v = line.split('=', 1)
env_vars[k.strip()] = v.strip()
except Exception:
pass
return env_vars
env = load_env()
HOST = env.get("LIC_PG_HOST", "172.24.240.1")
PORT = int(env.get("LIC_PG_PORT", "5432"))
USER = env.get("LIC_PG_USER", "postgres")
PASSWORD = env.get("LIC_PG_PASSWORD", "")
DATABASE = env.get("LIC_PG_DATABASE", "licensing_risks")
# -----------------------------------------------------------------------------
# 2. Database Backup
# -----------------------------------------------------------------------------
def perform_backup():
date_str = datetime.datetime.now().strftime("%Y-%m-%d")
backup_file = os.path.abspath(f"data/backup_{date_str}.sql")
# Ensure data directory exists
os.makedirs(os.path.dirname(backup_file), exist_ok=True)
print(f"Starting backup to {backup_file}...")
# Set PGPASSWORD for pg_dump
env_copy = os.environ.copy()
env_copy["PGPASSWORD"] = PASSWORD
# Construct command
# pg_dump -h host -p port -U user -f file dbname
cmd = [
"pg_dump",
"-h", HOST,
"-p", str(PORT),
"-U", USER,
"-f", backup_file,
DATABASE
]
try:
subprocess.run(cmd, env=env_copy, check=True)
print("Backup completed successfully.")
return True
except subprocess.CalledProcessError as e:
print(f"Backup failed: {e}")
return False
except FileNotFoundError:
print("pg_dump not found in PATH. Skipping backup (RISKY!).")
# In this specific task, if backup fails, we should probably stop or ask user.
# But given I am an agent, I should try to warn and maybe proceed if trivial, but deleting data is dangerous.
# I will stop if backup fails.
return False
# -----------------------------------------------------------------------------
# 3. Data Cleanup
# -----------------------------------------------------------------------------
def perform_cleanup():
print("Connecting to database...")
try:
conn = pg.connect(host=HOST, port=PORT, user=USER, password=PASSWORD, database=DATABASE)
cursor = conn.cursor()
target_theme_name = "不涉及"
# 1. Find theme ID
print(f"Finding theme '{target_theme_name}'...")
cursor.execute("SELECT id FROM themes WHERE name = %s", (target_theme_name,))
row = cursor.fetchone()
if not row:
print(f"Theme '{target_theme_name}' not found. Nothing to do.")
conn.close()
return
theme_id = row[0]
print(f"Found theme_id: {theme_id}")
# 2. Find associated permits (matters)
# Using region_theme_permits as the binding table
print("Finding associated permits...")
cursor.execute("SELECT DISTINCT permit_id FROM region_theme_permits WHERE theme_id = %s", (theme_id,))
permit_rows = cursor.fetchall()
permit_ids = [r[0] for r in permit_rows]
print(f"Found {len(permit_ids)} permits bound to '{target_theme_name}'.")
if not permit_ids:
print("No permits bound to this theme. Proceeding to delete theme only.")
# Start Transaction
try:
# 3. Clear TOPICS for these permits (Unbind ALL themes from these permits)
if permit_ids:
# Need to convert UUIDs to string for SQL IN clause or use execute with any
# pg8000 handles list/tuple for IN nicely if formatted manually, or we can loop.
# Batch delete is better.
# DELETE FROM region_theme_permits WHERE permit_id IN (...)
# Format for IN clause
permit_ids_tuple = tuple(permit_ids)
if len(permit_ids) == 1:
# distinct check ensures > 0, but tuple of 1 element needs comma
pass
# Note: pg8000 might struggle with large IN clauses in param logic depending on version.
# But let's try standard parameterized query.
# Actually, to be safe with syntax: "IN %s" with a tuple works in some drivers,
# but often 'IN (%s, %s)' is needed.
# I'll generate placeholders.
placeholders = ",".join(["%s"] * len(permit_ids))
print(f"Clearing ALL topic bindings for these {len(permit_ids)} permits...")
# Delete from region_theme_permits
cursor.execute(f"DELETE FROM region_theme_permits WHERE permit_id IN ({placeholders})", permit_ids_tuple)
print(f"Deleted {cursor.rowcount} rows from region_theme_permits.")
# Also check region_permit_theme_overrides if they exist for these permits?
# The user said "clear topics". Overrides might not be "bindings" per se, but let's check table existence.
# I'll skip overrides to avoid complexity unless requested, sticking to "bindings".
# 4. Delete the theme itself from all related tables
print(f"Deleting theme '{target_theme_name}' references...")
# Delete from region_themes
cursor.execute("DELETE FROM region_themes WHERE theme_id = %s", (theme_id,))
print(f"Deleted {cursor.rowcount} rows from region_themes.")
# Delete from permit_theme_rules
cursor.execute("DELETE FROM permit_theme_rules WHERE theme_id = %s", (theme_id,))
print(f"Deleted {cursor.rowcount} rows from permit_theme_rules.")
# Delete from themes
cursor.execute("DELETE FROM themes WHERE id = %s", (theme_id,))
print(f"Deleted {cursor.rowcount} row from themes.")
# Commit
conn.commit()
print("Transaction committed successfully.")
except Exception as e:
conn.rollback()
print(f"Error during transaction, rolled back. Error: {e}")
raise
conn.close()
except Exception as e:
print(f"Database error: {e}")
if __name__ == "__main__":
if perform_backup():
perform_cleanup()
else:
print("Aborting cleanup because backup failed.")

87
tools/run_migration_v6.py Normal file
View File

@ -0,0 +1,87 @@
import os
import pg8000
import sys
def load_env(env_path='.env'):
config = {}
if not os.path.exists(env_path):
env_path = os.path.join('..', '.env')
if not os.path.exists(env_path):
print(f"Warning: {env_path} not found")
return config
with open(env_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
if '=' in line:
key, value = line.split('=', 1)
config[key.strip()] = value.strip()
return config
def run_migration():
config = load_env()
print("\nConnecting to licensing_risks database...")
conn = None
try:
conn = pg8000.connect(
user=config.get('LIC_PG_USER', 'postgres'),
password=config.get('LIC_PG_PASSWORD'),
host=config.get('LIC_PG_HOST', 'localhost'),
port=int(config.get('LIC_PG_PORT', 5432)),
database=config.get('LIC_PG_DATABASE', 'licensing_risks')
)
print("✅ Connection successful!")
cursor = conn.cursor()
# IDs identified
theme_id = 'fc4b7e18-de60-4f58-9805-bac84097c00e'
unique_permit_id = 'fd21d74d-0626-46a3-9f98-e0658d2ca206' # 印章刻制业许可证核发
print("\n--- Starting Deletion Process ---\n")
# 1. Delete Theme Association
print(f"Deleting associations for theme {theme_id}...")
cursor.execute("DELETE FROM region_theme_permits WHERE theme_id = %s", (theme_id,))
print(f"Deleted rows from region_theme_permits.")
# 2. Delete Theme
print(f"Deleting theme {theme_id}...")
cursor.execute("DELETE FROM themes WHERE id = %s", (theme_id,))
print(f"Deleted row from themes.")
# 3. Delete Unique Permit and its details
print(f"Deleting unique permit {unique_permit_id} (印章刻制业许可证核发)...")
tables_to_clean = [
'region_permit_details',
'region_permit_risks',
'region_permit_scopes',
'region_permit_subitems'
]
for table in tables_to_clean:
print(f"Cleaning {table}...")
cursor.execute(f"DELETE FROM {table} WHERE permit_id = %s", (unique_permit_id,))
print(f"Deleting permit from permits table...")
cursor.execute("DELETE FROM permits WHERE id = %s", (unique_permit_id,))
# Commit transaction
conn.commit()
print("\n✅ Migration completed successfully and committed.")
except Exception as e:
print(f"\n❌ Database error: {e}")
if conn:
conn.rollback()
print("Transaction rolled back.")
finally:
if conn:
conn.close()
if __name__ == "__main__":
run_migration()

View File

@ -0,0 +1,205 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Excel服务部门层级信息补充脚本
功能
1. 读取Excel文件为每条记录添加"绑定服务部门""服务部门所属部门"两列
2. 根据层级规则和区名关键词自动判断上级部门
3. 生成统计报告并保存补充后的Excel文件
"""
import pandas as pd
import sys
from pathlib import Path
from collections import Counter
# 区名关键词映射表
REGION_KEYWORDS = {
'禅城区': '禅城区服务部门',
'南海区': '南海区服务部门',
'顺德区': '顺德区服务部门',
'三水区': '三水区服务部门',
'高明区': '高明区服务部门'
}
MUNICIPAL_DEPT = '市级服务部门'
def extract_region_service_dept(unit_name):
"""
从单位名称中提取区级服务部门
Args:
unit_name: 单位名称字符串
Returns:
对应的区级服务部门名称如果未找到则返回None
"""
if pd.isna(unit_name):
return None
unit_name_str = str(unit_name)
# 按优先级顺序匹配区名关键词
for keyword, service_dept in REGION_KEYWORDS.items():
if keyword in unit_name_str:
return service_dept
return None
def determine_parent_department(unit_name, level_value):
"""
确定服务部门所属部门
规则
1. 如果层级值包含"市级" 市级服务部门
2. 如果单位名称包含区名关键词 对应区级服务部门
3. 否则默认为市级服务部门兜底策略
Args:
unit_name: 单位名称
level_value: 层级/风险提示/备注列的值
Returns:
服务部门所属部门名称
"""
# 规则1: 检查层级值是否包含"市级"
if pd.notna(level_value) and '市级' in str(level_value):
return MUNICIPAL_DEPT
# 规则2: 从单位名称中提取区级服务部门
region_dept = extract_region_service_dept(unit_name)
if region_dept:
return region_dept
# 规则3: 兜底策略 - 默认为市级服务部门
return MUNICIPAL_DEPT
def supplement_excel_file(input_path, output_path):
"""
补充Excel文件的部门层级信息
Args:
input_path: 输入Excel文件路径
output_path: 输出Excel文件路径
"""
print(f"[INFO] Reading Excel file: {input_path}")
# 读取Excel文件
try:
df = pd.read_excel(input_path)
print(f"[OK] Successfully read {len(df)} records")
except Exception as e:
print(f"[ERROR] Failed to read Excel file: {e}")
sys.exit(1)
# 显示原始列名
print(f"\n[INFO] Original columns ({len(df.columns)} columns):")
for i, col in enumerate(df.columns):
print(f" {i+1}. {col}")
# 添加"绑定服务部门"列(直接复制"单位名称"列)
# 假设第二列是"单位名称"
unit_name_col = df.columns[1] # 第二列
df['绑定服务部门'] = df[unit_name_col]
# 添加"服务部门所属部门"列(根据规则计算)
level_col = df.columns[4] # "层级/风险提示/备注"列第5列
df['服务部门所属部门'] = df.apply(
lambda row: determine_parent_department(row[unit_name_col], row[level_col]),
axis=1
)
print(f"\n[OK] Added 2 columns:")
print(f" - Binding Service Department (copied from '{unit_name_col}')")
print(f" - Parent Service Department (calculated based on hierarchy rules)")
# 生成统计报告
print(f"\n[STATS] Parent Service Department Distribution:")
parent_dept_counts = Counter(df['服务部门所属部门'])
# 按标准顺序显示
standard_order = [MUNICIPAL_DEPT] + list(REGION_KEYWORDS.values())
for dept in standard_order:
count = parent_dept_counts.get(dept, 0)
percentage = (count / len(df) * 100) if len(df) > 0 else 0
print(f" {dept}: {count} 条 ({percentage:.1f}%)")
# 检查无法匹配的记录
unmapped_records = df[
(~df[level_col].astype(str).str.contains('市级', na=False)) &
(~df[unit_name_col].astype(str).str.contains('|'.join(REGION_KEYWORDS.keys()), na=False))
]
if len(unmapped_records) > 0:
print(f"\n[WARNING] Found {len(unmapped_records)} records that could not be matched by region keywords, assigned to Municipal Service Department by default:")
print(" Sample records:")
for idx, row in unmapped_records.head(5).iterrows():
print(f" - {row[unit_name_col]} (层级: {row[level_col]})")
if len(unmapped_records) > 5:
print(f" ... 还有 {len(unmapped_records) - 5}")
# 保存补充后的Excel文件
print(f"\n[SAVE] Saving supplemented Excel file: {output_path}")
try:
df.to_excel(output_path, index=False, engine='openpyxl')
print(f"[OK] Successfully saved {len(df)} records to {output_path}")
except Exception as e:
print(f"[ERROR] Failed to save Excel file: {e}")
sys.exit(1)
# 验证数据完整性
print(f"\n[VERIFY] Data Integrity Validation:")
null_binding = df['绑定服务部门'].isna().sum()
null_parent = df['服务部门所属部门'].isna().sum()
print(f" Total records: {len(df)}")
print(f" Null values in Binding Service Department: {null_binding}")
print(f" Null values in Parent Service Department: {null_parent}")
print(f" Total columns: {len(df.columns)}")
if null_binding == 0 and null_parent == 0:
print(f"[OK] Data integrity validation passed")
else:
print(f"[WARNING] Found null values, please check data source")
# 显示示例数据
print(f"\n[DATA] Sample data (first 5 rows):")
sample_cols = [unit_name_col, level_col, '绑定服务部门', '服务部门所属部门']
print(df[sample_cols].head(10).to_string(index=False))
return df
def main():
"""主函数"""
# 定义文件路径
base_path = Path(__file__).parent.parent
input_file = base_path / 'data' / '法律风险提示管理员名单_合并-20260309.xlsx'
output_file = base_path / 'data' / '法律风险提示管理员名单_合并-20260309_supplemented.xlsx'
print("=" * 80)
print("Excel服务部门层级信息补充工具")
print("=" * 80)
# 检查输入文件是否存在
if not input_file.exists():
print(f"[ERROR] Input file does not exist: {input_file}")
sys.exit(1)
# 执行补充处理
df = supplement_excel_file(str(input_file), str(output_file))
print("\n" + "=" * 80)
print("[OK] Processing completed!")
print(f"[FILE] Output file: {output_file}")
print("=" * 80)
if __name__ == '__main__':
main()