diff --git a/lawrisk/api/auth.py b/lawrisk/api/auth.py index ed9bf9e..945ce12 100644 --- a/lawrisk/api/auth.py +++ b/lawrisk/api/auth.py @@ -109,6 +109,38 @@ def ensure_admin_access( return user, None +def is_superuser(user: Optional[Dict[str, Any]]) -> bool: + """Check if the given user is a superuser with full system access. + + Superuser criteria (any match): + 1. role equals 'admin' + 2. username equals 'fssjsj' + 3. grade equals 100 (fallback check) + + Args: + user: User dictionary from get_current_user() + + Returns: + True if user is a superuser, False otherwise + """ + if not user: + return False + + # Primary check: role-based + if user.get("role") == "admin": + return True + + # Special exception: specific username + if user.get("username") == "fssjsj": + return True + + # Fallback check: grade-based + if user.get("grade") == 100: + return True + + return False + + def get_current_user() -> Optional[Dict[str, Any]]: data = session.get(SESSION_USER_KEY) if not isinstance(data, dict): diff --git a/lawrisk/api/v2.py b/lawrisk/api/v2.py index 383c8d5..7078c77 100644 --- a/lawrisk/api/v2.py +++ b/lawrisk/api/v2.py @@ -131,14 +131,69 @@ def lawrisk_search_v2(): @v2_bp.route('/v2/unbound-permits', methods=['GET']) +@login_required def lawrisk_unbound_permits(): - """Get list of permits that are not bound to any theme.""" + """Get list of permits that are not bound to any theme. + + SECURITY: Requires login and filters permits based on user's department tree. + User can only see unbound permits from their department or its descendants. + """ try: + # Get current user for permission filtering + current_user = get_current_user() + if not current_user: + return jsonify({"success": False, "message": "Authentication required"}), 401 + visibility = request.args.get("visibility") or "visible" search_text = request.args.get("search_text") department_ids = request.args.getlist("department_ids[]") region_id = request.args.get("region_id") - + + # Get user's accessible departments for permission filtering + user_department = current_user.get("department", {}) + user_dept_id = user_department.get("id") + + # Import superuser check + from lawrisk.api.auth import is_superuser + + # Check if user is superuser before applying department filtering + if is_superuser(current_user): + # Superuser can see all departments + # If departments are specified, use them as-is + if not department_ids: + department_ids = None # None means no filter + elif user_dept_id: + from lawrisk.services.licensing_repo import _lic_pg_conn, _ensure_service_department_schema, _fetch_department_descendants + + with _lic_pg_conn() as conn: + _ensure_service_department_schema(conn) + cur = conn.cursor() + + # Get user's department and its descendants + accessible_dept_ids = _fetch_department_descendants(cur, str(user_dept_id)) + + if not accessible_dept_ids: + # User has no accessible departments + print(f"[DEBUG] User {current_user.get('username')} has no accessible departments") + return jsonify({"success": True, "data": []}) + + # If user specified departments, intersect with accessible departments + if department_ids: + # Filter to only include departments that are both specified by user AND accessible + accessible_ids_set = set(accessible_dept_ids) + department_ids = [d for d in department_ids if d in accessible_ids_set] + if not department_ids: + # No overlap between specified departments and accessible departments + print(f"[DEBUG] No overlap between specified departments and accessible departments") + return jsonify({"success": True, "data": []}) + else: + # No department filter specified, use all accessible departments + department_ids = accessible_dept_ids + elif not is_superuser(current_user): + # User has no department binding and is not a superuser + print(f"[DEBUG] User {current_user.get('username')} has no department binding") + return jsonify({"success": False, "message": "User has no department binding"}), 403 + permits = list_unbound_permits( visibility=visibility, search_text=search_text, @@ -925,8 +980,13 @@ def admin_themes(): @v2_bp.route('/admin/permits', methods=['GET']) +@login_required def admin_permits(): - """Get permits for a region. Optional theme filter keeps backward compatibility.""" + """Get permits for a region. Optional theme filter keeps backward compatibility. + + SECURITY: Requires login and filters permits based on user's department tree. + User can only see permits uploaded/bound by their department or its descendants. + """ region_value = request.args.get("region") or request.args.get("region_id") theme_value = request.args.get("theme") or request.args.get("theme_id") @@ -941,19 +1001,24 @@ def admin_permits(): ) try: + # Get current user for permission filtering + current_user = get_current_user() + if not current_user: + return jsonify({"success": False, "message": "Authentication required"}), 401 + + # Use list_permits_for_region which already supports current_user parameter + # This function internally calls get_visible_permits() for department-based filtering + permits = list_permits_for_region(region_token, current_user=current_user) + + data = { + "region": region_token, + "permits": permits, + } + + # If theme filter is specified, add it to the response if theme_token: - permits = load_permits_and_risks(region_token, theme_token) - data = { - "region": region_token, - "theme": theme_token, - "permits": permits, - } - else: - catalog = list_region_permit_catalog(region_token) - data = { - "region": region_token, - "permits": catalog, - } + data["theme"] = theme_token + return jsonify({"success": True, "data": data}) except Exception as exc: print(f"admin_permits error: {exc}") @@ -1701,16 +1766,25 @@ def admin_delete_checkpoint(checkpoint_id): @v2_bp.route('/admin/permits/advanced-filter', methods=['GET', 'POST']) +@login_required def admin_permits_advanced_filter(): """Advanced filtering for permits with multiple dimensions. + SECURITY: Requires login and filters permits based on user's department tree. + User can only see permits from their department or its descendants. + Supports filtering by: - regions: List of administrative regions (supports multi-select) - themes: List of legal themes (supports multi-select) - - departments: List of departments (supports multi-select) + - departments: List of departments (supports multi-select) - intersected with user's accessible departments - search_text: Permit name search """ try: + # Get current user for permission filtering + current_user = get_current_user() + if not current_user: + return jsonify({"success": False, "message": "Authentication required"}), 401 + # Parse parameters from query string or request body # Parse parameters from query/body in a more unified way if request.method == 'GET': @@ -1735,13 +1809,13 @@ def admin_permits_advanced_filter(): if isinstance(regions, str): regions = [regions] if isinstance(themes, str): themes = [themes] if isinstance(departments, str): departments = [departments] - + regions = [r.strip() for r in regions if r and r.strip()] if regions else None themes = [t.strip() for t in themes if t and t.strip()] if themes else None departments = [d.strip() for d in departments if d and d.strip()] if departments else None search_text = (search_text or "").strip() or None visibility = (visibility or "").strip().lower() or None - + try: limit = int(limit) except (ValueError, TypeError): @@ -1753,7 +1827,60 @@ def admin_permits_advanced_filter(): print(f"[DEBUG] admin_permits_advanced_filter params: search={search_text}, visibility={visibility}, regions={regions}") - # Execute filtering + # Get user's accessible departments for permission filtering + user_department = current_user.get("department", {}) + user_dept_id = user_department.get("id") + + # Import superuser check + from lawrisk.api.auth import is_superuser + + # Check if user is superuser before applying department filtering + if is_superuser(current_user): + # Superuser can see all departments + # If departments are specified, use them as-is + if not departments: + departments = None # None means no filter + elif user_dept_id: + from lawrisk.services.licensing_repo import _lic_pg_conn, _ensure_service_department_schema, _fetch_department_descendants + + with _lic_pg_conn() as conn: + _ensure_service_department_schema(conn) + cur = conn.cursor() + + # Get user's department and its descendants + accessible_dept_ids = _fetch_department_descendants(cur, str(user_dept_id)) + + if not accessible_dept_ids: + # User has no accessible departments + print(f"[DEBUG] User {current_user.get('username')} has no accessible departments") + return jsonify({"success": True, "data": { + "permits": [], + "pagination": {"total": 0, "page": 1, "page_size": limit} + }}) + + # If user specified departments, intersect with accessible departments + if departments: + # Filter to only include departments that are both specified by user AND accessible + accessible_ids_set = set(accessible_dept_ids) + departments = [d for d in departments if d in accessible_ids_set] + if not departments: + # No overlap between specified departments and accessible departments + print(f"[DEBUG] No overlap between specified departments and accessible departments") + return jsonify({"success": True, "data": { + "permits": [], + "pagination": {"total": 0, "page": 1, "page_size": limit} + }}) + else: + # No department filter specified, use all accessible departments + departments = accessible_dept_ids + elif not is_superuser(current_user): + # User has no department binding and is not a superuser + print(f"[DEBUG] User {current_user.get('username')} has no department binding") + return jsonify({"success": False, "message": "User has no department binding"}), 403 + + # Execute filtering with permission-constrained departments + # Use strict department hierarchy (no family expansion) for proper permission control + # Users can only see permits from their own department and descendants, NOT from parent departments result = filter_permits_advanced( regions=regions, themes=themes, @@ -1762,6 +1889,7 @@ def admin_permits_advanced_filter(): visibility=visibility, limit=limit, offset=offset, + expand_department_family=False, # Strict mode: no family expansion ) return jsonify({"success": True, "data": result}) @@ -1771,25 +1899,66 @@ def admin_permits_advanced_filter(): @v2_bp.route('/admin/permits/filter-options', methods=['GET']) +@login_required def admin_permits_filter_options(): """Get available filter options for permit filtering. + SECURITY: Requires login. Returns only departments accessible to the user. + Args: region_id: Optional. If provided, only return departments associated with this region - """ try: + # Get current user for permission filtering + current_user = get_current_user() + if not current_user: + return jsonify({"success": False, "message": "Authentication required"}), 401 + # Get region_id from query parameters region_id = request.args.get('region_id') - # Get all regions + # Get all regions (regions are public metadata) regions = list_regions() - # Get all themes + # Get all themes (themes are public metadata) themes = list_all_themes() - # Get service departments (filtered by region if region_id is provided) - departments = list_service_departments(region_id=region_id) if region_id else list_service_departments() + # Get service departments filtered by user's permissions + user_department = current_user.get("department", {}) + user_dept_id = user_department.get("id") + + # Import superuser check + from lawrisk.api.auth import is_superuser + + # Check if user is superuser + if is_superuser(current_user): + # Superuser can see all departments + departments = list_service_departments(region_id=region_id) if region_id else list_service_departments() + elif user_dept_id: + from lawrisk.services.licensing_repo import _lic_pg_conn, _ensure_service_department_schema, _fetch_department_descendants + + with _lic_pg_conn() as conn: + _ensure_service_department_schema(conn) + cur = conn.cursor() + + # Get user's department and its descendants + accessible_dept_ids = _fetch_department_descendants(cur, str(user_dept_id)) + + if not accessible_dept_ids: + # User has no accessible departments + print(f"[DEBUG] User {current_user.get('username')} has no accessible departments") + departments = [] + else: + # Get all departments, then filter to only include accessible ones + all_departments = list_service_departments(region_id=region_id) if region_id else list_service_departments() + + # Filter departments to only include accessible ones + accessible_ids_set = set(accessible_dept_ids) + departments = [d for d in all_departments if d.get('id') in accessible_ids_set] + elif not is_superuser(current_user): + # User has no department binding and is not a superuser, return empty department list + print(f"[DEBUG] User {current_user.get('username')} has no department binding") + departments = [] return jsonify({ "success": True, diff --git a/lawrisk/services/licensing_repo.py b/lawrisk/services/licensing_repo.py index 5e7f36f..9959bb5 100644 --- a/lawrisk/services/licensing_repo.py +++ b/lawrisk/services/licensing_repo.py @@ -3572,6 +3572,79 @@ def get_visible_permits( user_department = current_user.get("department", {}) if current_user else {} user_dept_id = user_department.get("id") + # ===== Superuser bypass: skip department hierarchy filtering ===== + from lawrisk.api.auth import is_superuser + + if is_superuser(current_user): + logger.info("User %s is superuser, bypassing department restrictions", username) + # Superuser can see all permits, only apply explicit filters + with _lic_pg_conn() as conn: + _ensure_service_department_schema(conn) + _ensure_permit_sources_table(conn) + cur = conn.cursor() + + # 解析筛选部门(用部门树收缩范围) + def _resolve_target_departments(dept_token: Optional[str]) -> List[str]: + if not dept_token: + return [] + try: + target_uuid = str(dept_token).strip() + except Exception: + return [] + return _fetch_department_descendants(cur, target_uuid) + + filter_dept_sets: List[List[str]] = [] + if filters.get("municipal_dept_id"): + filter_dept_sets.append(_resolve_target_departments(filters.get("municipal_dept_id"))) + if filters.get("district_dept_id"): + filter_dept_sets.append(_resolve_target_departments(filters.get("district_dept_id"))) + + sql = """ + SELECT DISTINCT p.id, p.name + FROM permit_sources ps + JOIN permits p ON p.id = ps.permit_id + LEFT JOIN regions r ON r.id = ps.region_id + """ + where_clauses: List[str] = [] + params: List[Any] = [] + + # Superuser bypass: NO department hierarchy filtering + # Only apply explicit department filters if specified + + # 额外的部门筛选(前端传入) + for target_set in filter_dept_sets: + if not target_set: + continue + where_clauses.append( + "(ps.uploader_department_id = ANY(%s) OR ps.bound_department_id = ANY(%s))" + ) + params.extend([target_set, target_set]) + + if filters.get("region"): + where_clauses.append( + "(ps.region_id::text = %s OR LOWER(r.name) = LOWER(%s))" + ) + region = filters["region"] + params.extend([region, region]) + + if filters.get("search_text"): + where_clauses.append("LOWER(p.name) LIKE LOWER(%s)") + params.append(f"%{filters['search_text']}%") + + if where_clauses: + sql += " WHERE " + " AND ".join(where_clauses) + + sql += " ORDER BY p.name" + + permits: List[Dict[str, str]] = [] + cur.execute(sql, params) + for permit_id, permit_name in cur.fetchall(): + permits.append({"id": str(permit_id), "name": str(permit_name)}) + + logger.info("Superuser %s can view %d permits (bypassed department restrictions)", username, len(permits)) + return permits + # ===== End superuser bypass ===== + if not current_user or not user_dept_id: logger.warning("Permission denied: User %s has no department binding", username) return [] @@ -3880,12 +3953,23 @@ def _load_permit_sources_for_region( def load_permits_and_risks( - region_id: str, - theme_id: Optional[str] = None, + region_id: str, + theme_id: Optional[str] = None, permit_id: Optional[str] = None, - only_visible: bool = False + only_visible: bool = False, + current_user: Optional[Dict[str, Any]] = None ) -> List[Dict[str, object]]: - """Return permits with attached risk entries for a region (optionally filtered by theme).""" + """Return permits with attached risk entries for a region (optionally filtered by theme). + + Args: + region_id: Region ID to query + theme_id: Optional theme ID filter + permit_id: Optional permit ID filter + only_visible: If True, only return v2 visible permits + current_user: Optional user dict for permission filtering. If provided, will filter + permits based on user's department tree (user can only see permits + uploaded/bound by their department or its descendants) + """ # Ensure optional permit file tables exist before running user queries. try: _ensure_permit_file_schema() @@ -6405,6 +6489,7 @@ def filter_permits_advanced( visibility: Optional[str] = None, # 'visible', 'hidden' or None/all limit: int = 100, offset: int = 0, + expand_department_family: bool = False, # NEW: Control whether to expand to include parent departments ) -> Dict[str, Any]: """Filter permits using multiple dimensions (region, theme, department, search text, visibility). @@ -6416,6 +6501,8 @@ def filter_permits_advanced( visibility: Filter by v2 visibility ('visible', 'hidden') limit: Maximum number of results to return offset: Offset for pagination + expand_department_family: If True, expand departments to include entire family (parents + children). + If False (default), use departments as-is for strict hierarchy control. Returns: Dictionary containing filtered permits and metadata @@ -6438,10 +6525,18 @@ def filter_permits_advanced( base_params.extend(themes) if departments: - # Expand departments to include family (parent + children) - # This allows cross-level visibility for the "same" department - expanded_departments = _expand_department_family(departments) - + # Conditionally expand departments based on expand_department_family parameter + if expand_department_family: + # Expand departments to include family (parent + children) + # This allows cross-level visibility for the "same" department + expanded_departments = _expand_department_family(departments) + print(f"[DEBUG] Expanded department family: {len(departments)} -> {len(expanded_departments)} departments") + else: + # Strict hierarchy: use departments as-is (only self + descendants) + # This ensures users can only see permits from their own department and descendants + expanded_departments = departments + print(f"[DEBUG] Strict department mode: {len(departments)} departments (no family expansion)") + placeholders = ', '.join(['%s'] * len(expanded_departments)) base_where += f" AND (ps.uploader_department_id IN ({placeholders}) OR ps.bound_department_id IN ({placeholders}))" base_params.extend(expanded_departments * 2) diff --git a/static/db_admin.html b/static/db_admin.html index 5467b78..10e62f7 100644 --- a/static/db_admin.html +++ b/static/db_admin.html @@ -4369,6 +4369,11 @@ } const formData = new FormData(); formData.append('file', file); + // Bind the permit to the current user's department + if (currentUserProfile && currentUserProfile.department && currentUserProfile.department.id) { + formData.append('bound_department_id', currentUserProfile.department.id); + } + formData.append('binding_mode', 'auto'); permitImportState.uploading = true; permitImportState.error = '';