-
Notifications
You must be signed in to change notification settings - Fork 859
Expand file tree
/
Copy pathserver_access.py
More file actions
160 lines (121 loc) · 4.85 KB
/
server_access.py
File metadata and controls
160 lines (121 loc) · 4.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
##########################################################################
#
# pgAdmin 4 - PostgreSQL Tools
#
# Copyright (C) 2013 - 2026, The pgAdmin Development Team
# This software is released under the PostgreSQL Licence
#
##########################################################################
"""Centralized server access-checking utilities for data isolation.
In server mode, multiple users share the same pgAdmin instance. These
helpers enforce that users can only access servers they own or that
are marked as shared (Server.shared=True); administrators may see all
servers when ADMIN_CAN_SEE_ALL_SERVERS is enabled.
"""
from sqlalchemy import or_
from flask_security import current_user
from pgadmin.model import db, Server, ServerGroup
import config
def _is_admin():
    """Return whether the current user holds the 'Administrator' role.

    Delegates to ``current_user.has_role``; no configuration flags are
    consulted here.
    """
    admin_role = 'Administrator'
    return current_user.has_role(admin_role)
def get_server(sid, only_owned=False):
    """Look up a server by ID, restricted to what the current user may see.

    Args:
        sid: Server ID.
        only_owned: When True, a regular user only gets the server if
            they own it; shared servers are excluded. Intended for
            write operations (change_password, clear_saved_password,
            etc.) that must not mutate another user's server record
            via shared access.

    Returns:
        The matching Server when any of the following holds, otherwise
        None (the caller should respond with 404):

        - pgAdmin runs in desktop mode (no isolation is applied), or
        - the user is an Administrator and ADMIN_CAN_SEE_ALL_SERVERS
          is enabled (this bypass is checked before ``only_owned``), or
        - the user owns the server, or
        - the server is shared and ``only_owned`` is False.

    Note: In pgAdmin, Server.shared=True means the server is visible
    to all authenticated users. SharedServer records are created
    lazily for per-user customization, not for access control.
    """
    # Desktop mode and "admin sees everything" both skip ownership
    # checks. The short-circuit keeps _is_admin() from being called in
    # desktop mode.
    unrestricted = not config.SERVER_MODE or (
        _is_admin() and config.ADMIN_CAN_SEE_ALL_SERVERS
    )
    if unrestricted:
        return Server.query.filter_by(id=sid).first()

    if only_owned:
        lookup = Server.query.filter_by(id=sid, user_id=current_user.id)
    else:
        # One query covering both cases: owned OR shared.
        accessible = or_(
            Server.user_id == current_user.id,
            Server.shared
        )
        lookup = Server.query.filter(Server.id == sid, accessible)

    return lookup.first()
def get_servers_from_group(gid, only_owned=False):
    """Build a query for the accessible servers inside one server group.

    Starts from get_user_server_query() and narrows it to the given
    group.

    Args:
        gid: Server group ID.
        only_owned: If True, additionally restrict the result to
            servers owned by the current user.

    Returns:
        A query object (not yet executed) for the matching servers.
    """
    servers_in_group = get_user_server_query().filter(
        Server.servergroup_id == gid
    )
    if not only_owned:
        return servers_in_group

    return servers_in_group.filter(Server.user_id == current_user.id)
def get_server_group(gid, only_owned=False):
    """Look up a server group by ID, restricted to what the user may see.

    Args:
        gid: Server group ID.
        only_owned: Passed through to get_server_groups_for_user() for
            regular users; when True, groups that are visible only
            because they contain shared servers are excluded.

    Returns:
        The matching ServerGroup when any of the following holds,
        otherwise None:

        - pgAdmin runs in desktop mode, or
        - the user is an Administrator and ADMIN_CAN_SEE_ALL_SERVERS
          is enabled (even if they own no servers in the group), or
        - the group is among those returned by
          get_server_groups_for_user() for the current user.
    """
    # Desktop mode and the admin bypass search every group; everyone
    # else is limited to the groups visible to them.
    if not config.SERVER_MODE or (
            _is_admin() and config.ADMIN_CAN_SEE_ALL_SERVERS):
        candidates = ServerGroup.query
    else:
        candidates = get_server_groups_for_user(only_owned=only_owned)

    return candidates.filter_by(id=gid).first()
def get_server_groups_for_user(only_owned=False):
    """Build a query for the server groups visible to the current user.

    Args:
        only_owned: If True, a regular user in server mode only gets
            the groups they own. If False, groups containing at least
            one shared server (Server.shared=True, visible to all
            authenticated users) are included as well.

    Returns:
        A query object (not yet executed):

        - desktop mode: groups whose user_id is the current user;
        - Administrator with ADMIN_CAN_SEE_ALL_SERVERS enabled: all
          groups, even those in which they own no servers;
        - otherwise: owned groups, optionally unioned with groups that
          contain shared servers.
    """
    if not config.SERVER_MODE:
        return ServerGroup.query.filter_by(user_id=current_user.id)

    if _is_admin() and config.ADMIN_CAN_SEE_ALL_SERVERS:
        return ServerGroup.query

    owned_groups = ServerGroup.query.filter(
        ServerGroup.user_id == current_user.id
    )
    if only_owned:
        return owned_groups

    # Groups owned by anyone that hold at least one shared server.
    groups_with_shared = ServerGroup.query.join(
        ServerGroup.servers
    ).filter(Server.shared)
    return owned_groups.union(groups_with_shared)
def get_user_server_query():
    """Build the base query for servers the current user may access.

    Returns:
        A query object (not yet executed):

        - desktop mode, or Administrator with
          ADMIN_CAN_SEE_ALL_SERVERS enabled: every server;
        - otherwise: servers owned by the current user plus servers
          marked as shared (visible to all users).
    """
    # Short-circuit keeps _is_admin() from being called in desktop mode.
    sees_everything = not config.SERVER_MODE or (
        _is_admin() and config.ADMIN_CAN_SEE_ALL_SERVERS
    )
    if sees_everything:
        return Server.query

    owned_or_shared = or_(
        Server.user_id == current_user.id,
        Server.shared
    )
    return Server.query.filter(owned_or_shared)