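import os
import sys

import pg8000.native as pg

sys.path.append(os.getcwd())
from lawrisk.utils.env_loader import load_env  # noqa: E402 -- needs cwd on sys.path first

load_env()

# Visual separator between the four investigation steps.
SEPARATOR = "-" * 30


def _connect():
    """Open a pg8000 connection from the ``LIC_PG_*`` environment variables.

    Returns:
        An open ``pg8000.native.Connection``; the caller owns closing it.

    Raises:
        Whatever ``pg.Connection`` raises when the server is unreachable or
        the credentials are rejected.
    """
    # NOTE(review): the fallbacks are a hard-coded public host address and an
    # empty password. A safer default would be to fail fast when LIC_PG_HOST /
    # LIC_PG_PASSWORD are unset rather than silently connecting elsewhere.
    return pg.Connection(
        user=os.getenv('LIC_PG_USER', 'postgres'),
        host=os.getenv('LIC_PG_HOST', '8.138.196.105'),
        port=int(os.getenv('LIC_PG_PORT', 5432)),
        password=os.getenv('LIC_PG_PASSWORD', ''),
        database=os.getenv('LIC_PG_DATABASE', 'licensing_risks'),
    )


def _investigate_user(conn):
    """Step 1: print the FSSJSS user's department row.

    Args:
        conn: An open pg8000 connection.

    Returns:
        ``(dept_id, parent_id)`` of the user's service department, or ``None``
        when no ``auth_users`` row has username ``fssjss``.
    """
    print(SEPARATOR)
    print("Investigating User 'FSSJSS'")
    rows = conn.run("""
        SELECT au.id, au.username, au.service_department_id,
               sd.name, sd.code, sd.parent_id
        FROM auth_users au
        JOIN service_departments sd ON sd.id = au.service_department_id
        WHERE au.username = 'fssjss'
    """)
    if not rows:
        print("User FSSJSS not found!")
        return None
    _u_id, u_name, u_dept_id, u_dept_name, u_dept_code, u_parent_id = rows[0]
    print(f"User: {u_name}")
    print(f"Department: {u_dept_name} ({u_dept_code}) ID: {u_dept_id}")
    print(f"Parent Dept ID: {u_parent_id}")
    return u_dept_id, u_parent_id


def _investigate_sysadmin(conn, u_dept_id, u_parent_id):
    """Step 2: look up the SYSADMIN department and flag direct membership.

    Args:
        conn: An open pg8000 connection.
        u_dept_id: The FSSJSS user's department id.
        u_parent_id: The parent id of that department.

    Returns:
        The SYSADMIN department id, or ``None`` when no department has code
        ``SYSADMIN``. Callers must treat ``None`` as "no such department"
        rather than comparing it against other ids.
    """
    print(SEPARATOR)
    print("Investigating Department 'SYSADMIN'")
    rows = conn.run(
        "SELECT id, name, parent_id FROM service_departments WHERE code = 'SYSADMIN'"
    )
    if not rows:
        print("SYSADMIN Department not found!")
        return None
    sys_id, sys_name, sys_parent = rows[0]
    print(f"SYSADMIN ID: {sys_id}, Name: {sys_name}, Parent: {sys_parent}")
    # Ids may come back as int or UUID depending on the schema; compare as text.
    if str(sys_id) == str(u_dept_id):
        print("ALERT: FSSJSS is directly in SYSADMIN department!")
    if str(sys_id) == str(u_parent_id):
        print("ALERT: FSSJSS is a direct child of SYSADMIN!")
    return sys_id


def _accessible_departments(conn, root_id):
    """Step 3: list the department subtree FSSJSS can see.

    Walks ``parent_id`` downwards with a recursive CTE, so the result is
    ``root_id`` itself plus every descendant.

    Args:
        conn: An open pg8000 connection.
        root_id: The department id to start from (the user's own department).

    Returns:
        The reachable department ids as strings, so they compare uniformly
        with ids from the other queries.
    """
    print(SEPARATOR)
    print("Checking accessible departments for FSSJSS...")
    query = """
        WITH RECURSIVE sub AS (
            SELECT id, name, code FROM service_departments WHERE id = :root_id
            UNION ALL
            SELECT sd.id, sd.name, sd.code
            FROM service_departments sd
            JOIN sub ON sd.parent_id = sub.id
        )
        SELECT id, name, code FROM sub
    """
    rows = conn.run(query, root_id=root_id)
    accessible_ids = [str(row[0]) for row in rows]
    print(f"FSSJSS can see data from {len(accessible_ids)} departments.")
    for dept_id, name, code in rows:
        print(f" - {name} ({code}) ID: {dept_id}")
    return accessible_ids


def _investigate_city_permit(conn, sys_id, accessible_ids):
    """Step 4: check one '市级' permit's binding against SYSADMIN and FSSJSS's reach.

    Args:
        conn: An open pg8000 connection.
        sys_id: The SYSADMIN department id, or ``None`` if it does not exist.
        accessible_ids: Department ids (as strings) FSSJSS can see.
    """
    print(SEPARATOR)
    print("Investigating a sample City Permit")
    # Find a permit in '市级'
    rows = conn.run("""
        SELECT ps.permit_id, ps.bound_department_id, ps.uploader_department_id, p.name
        FROM permit_sources ps
        JOIN region_permit_details rpd
          ON rpd.permit_id = ps.permit_id AND rpd.region_id = ps.region_id
        JOIN regions r ON r.id = rpd.region_id
        JOIN permits p ON p.id = ps.permit_id
        WHERE r.name = '市级'
        LIMIT 1
    """)
    if not rows:
        print("No city permits found for check.")
        return
    _p_id, bound_id, uploader_id, p_name = rows[0]
    print(f"Permit: {p_name}")
    print(f"Bound Dept ID: {bound_id}")
    print(f"Uploader Dept ID: {uploader_id}")
    # Only compare when SYSADMIN exists: str(None) == str(None) would otherwise
    # report an unbound permit as "correctly bound".
    if sys_id is not None and str(bound_id) == str(sys_id):
        print("Permit is correctly bound to SYSADMIN.")
    else:
        print(f"Permit is bound to {bound_id}, NOT SYSADMIN!")
    if str(bound_id) in accessible_ids:
        print("CRITICAL: The bound department IS in FSSJSS's accessible list.")
    else:
        print("OK: The bound department is NOT in FSSJSS's accessible list.")


def main():
    """Run the FSSJSS / SYSADMIN access investigation and print findings.

    Connects via ``_connect()``, runs the four checks in order, and always
    closes the connection. Any exception is reported on stdout as
    ``Error: ...`` instead of propagating, since this is a one-off
    diagnostic rather than library code.
    """
    conn = None
    try:
        conn = _connect()
        print("Connected to DB.")

        # 1. Investigate User FSSJSS
        user = _investigate_user(conn)
        if user is None:
            return
        u_dept_id, u_parent_id = user

        # 2. Investigate SYSADMIN Department (may legitimately be absent)
        sys_id = _investigate_sysadmin(conn, u_dept_id, u_parent_id)

        # 3. Check Recursive Access for FSSJSS
        accessible_ids = _accessible_departments(conn, u_dept_id)
        if sys_id is not None and str(sys_id) in accessible_ids:
            print("CRITICAL: SYSADMIN is in the accessible list! (This implies SYSADMIN is a descendant of FSSJSS??)")
        else:
            print("OK: SYSADMIN is NOT in the accessible list.")

        # 4. Investigate a City Permit
        _investigate_city_permit(conn, sys_id, accessible_ids)
    except Exception as e:  # diagnostic script: report and exit, no traceback
        print(f"Error: {e}")
    finally:
        if conn is not None:
            conn.close()


if __name__ == "__main__":
    main()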