Skip to content

Commit b5b16d8

Browse files
committed
test: add multi-user data isolation tests
- ServerGroupIsolationTestCase: verify non-admin cannot fetch another user's server group properties - ServerDataIsolationGetTestCase: verify non-admin cannot access another user's private server - SharedServerAccessTestCase: verify shared servers remain accessible cross-user (positive regression test) Tests use create_user_wise_test_client pattern and only run in SERVER_MODE with pgAdmin4_test_non_admin_credentials configured.
1 parent f365785 commit b5b16d8

File tree

7 files changed

+222
-27
lines changed

7 files changed

+222
-27
lines changed

web/migrations/versions/add_user_id_to_debugger_func_args_.py

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -96,26 +96,29 @@ def upgrade():
9696
)
9797

9898
# --- Indexes for data isolation query performance ---
99-
# CREATE INDEX IF NOT EXISTS is supported by both SQLite and PostgreSQL.
100-
# Some tables (e.g. sharedserver) may not exist in older schemas that
101-
# haven't run all prior migrations, so wrap each in try/except.
99+
# Only create indexes on tables that exist (sharedserver may be
100+
# absent in older schemas that haven't run all prior migrations).
101+
inspector = sa.inspect(conn)
102102
index_stmts = [
103-
'CREATE INDEX IF NOT EXISTS ix_server_user_id '
104-
'ON server (user_id)',
105-
'CREATE INDEX IF NOT EXISTS ix_server_servergroup_id '
106-
'ON server (servergroup_id)',
107-
'CREATE INDEX IF NOT EXISTS ix_sharedserver_user_id '
108-
'ON sharedserver (user_id)',
109-
'CREATE INDEX IF NOT EXISTS ix_sharedserver_osid '
110-
'ON sharedserver (osid)',
111-
'CREATE INDEX IF NOT EXISTS ix_servergroup_user_id '
112-
'ON servergroup (user_id)',
103+
('server',
104+
'CREATE INDEX IF NOT EXISTS ix_server_user_id '
105+
'ON server (user_id)'),
106+
('server',
107+
'CREATE INDEX IF NOT EXISTS ix_server_servergroup_id '
108+
'ON server (servergroup_id)'),
109+
('sharedserver',
110+
'CREATE INDEX IF NOT EXISTS ix_sharedserver_user_id '
111+
'ON sharedserver (user_id)'),
112+
('sharedserver',
113+
'CREATE INDEX IF NOT EXISTS ix_sharedserver_osid '
114+
'ON sharedserver (osid)'),
115+
('servergroup',
116+
'CREATE INDEX IF NOT EXISTS ix_servergroup_user_id '
117+
'ON servergroup (user_id)'),
113118
]
114-
for stmt in index_stmts:
115-
try:
119+
for table_name, stmt in index_stmts:
120+
if inspector.has_table(table_name):
116121
op.execute(stmt)
117-
except Exception:
118-
pass
119122

120123

121124
def downgrade():

web/pgadmin/browser/server_groups/servers/databases/schemas/views/__init__.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2436,9 +2436,7 @@ def check_utility_exists(self, gid, sid, did, scid, vid):
24362436
Returns:
24372437
None
24382438
"""
2439-
server = Server.query.filter_by(
2440-
id=sid, user_id=current_user.id
2441-
).first()
2439+
server = get_accessible_server(sid)
24422440

24432441
if server is None:
24442442
return make_json_response(
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
##########################################################################
2+
#
3+
# pgAdmin 4 - PostgreSQL Tools
4+
#
5+
# Copyright (C) 2013 - 2026, The pgAdmin Development Team
6+
# This software is released under the PostgreSQL Licence
7+
#
8+
##########################################################################
9+
10+
"""Tests for server data isolation between users in server mode."""
11+
12+
import json
13+
import config
14+
from pgadmin.utils.route import BaseTestGenerator
15+
from regression.python_test_utils import test_utils as utils
16+
from regression.test_setup import config_data
17+
from regression.python_test_utils.test_utils import \
18+
create_user_wise_test_client
19+
from . import utils as servers_utils
20+
21+
test_user_details = None
22+
if config.SERVER_MODE:
23+
test_user_details = \
24+
config_data['pgAdmin4_test_non_admin_credentials']
25+
26+
27+
class ServerDataIsolationGetTestCase(BaseTestGenerator):
28+
"""Verify that a non-admin user cannot access another user's
29+
private (non-shared) server by ID."""
30+
31+
scenarios = [
32+
('User B gets 410 for User A private server',
33+
dict(is_positive_test=False)),
34+
]
35+
36+
def setUp(self):
37+
if not config.SERVER_MODE:
38+
self.skipTest(
39+
'Data isolation tests only apply to server mode.'
40+
)
41+
42+
# Create a private (non-shared) server as the admin user
43+
self.server['shared'] = False
44+
url = "/browser/server/obj/{0}/".format(utils.SERVER_GROUP)
45+
response = self.tester.post(
46+
url,
47+
data=json.dumps(self.server),
48+
content_type='html/json'
49+
)
50+
response_data = json.loads(response.data.decode('utf-8'))
51+
self.server_id = response_data['node']['_id']
52+
53+
@create_user_wise_test_client(test_user_details)
54+
def runTest(self):
55+
"""Non-admin user should NOT be able to GET another user's
56+
private server."""
57+
if not self.server_id:
58+
raise Exception("Server not found to test isolation")
59+
60+
url = '/browser/server/obj/{0}/{1}'.format(
61+
utils.SERVER_GROUP, self.server_id)
62+
response = self.tester.get(url, follow_redirects=True)
63+
# Expect 410 Gone (server not accessible to this user)
64+
self.assertIn(
65+
response.status_code, [404, 410],
66+
'Non-admin user should not access another user\'s '
67+
'private server. Got status {0}'.format(
68+
response.status_code)
69+
)
70+
71+
def tearDown(self):
72+
# Clean up with the admin tester (which owns the server)
73+
utils.delete_server_with_api(
74+
self.__class__.tester, self.server_id)
75+
76+
77+
class SharedServerAccessTestCase(BaseTestGenerator):
78+
"""Verify that a shared server IS accessible by a non-admin
79+
user (positive test — shared servers should work after the
80+
isolation fixes)."""
81+
82+
scenarios = [
83+
('User B can access shared server from User A',
84+
dict(is_positive_test=True)),
85+
]
86+
87+
def setUp(self):
88+
if not config.SERVER_MODE:
89+
self.skipTest(
90+
'Data isolation tests only apply to server mode.'
91+
)
92+
93+
# Create a shared server as the admin user
94+
self.server['shared'] = True
95+
url = "/browser/server/obj/{0}/".format(utils.SERVER_GROUP)
96+
response = self.tester.post(
97+
url,
98+
data=json.dumps(self.server),
99+
content_type='html/json'
100+
)
101+
response_data = json.loads(response.data.decode('utf-8'))
102+
self.server_id = response_data['node']['_id']
103+
104+
@create_user_wise_test_client(test_user_details)
105+
def runTest(self):
106+
"""Non-admin user SHOULD be able to GET a shared server."""
107+
if not self.server_id:
108+
raise Exception("Server not found to test shared access")
109+
110+
url = '/browser/server/obj/{0}/{1}'.format(
111+
utils.SERVER_GROUP, self.server_id)
112+
response = self.tester.get(url, follow_redirects=True)
113+
self.assertEqual(
114+
response.status_code, 200,
115+
'Non-admin user should be able to access shared server.'
116+
' Got status {0}'.format(response.status_code)
117+
)
118+
119+
def tearDown(self):
120+
utils.delete_server_with_api(
121+
self.__class__.tester, self.server_id)

web/pgadmin/browser/server_groups/servers/utils.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -692,7 +692,8 @@ def delete_adhoc_servers(sid=None):
692692
has_user = (has_request_context() and
693693
current_user and current_user.is_authenticated)
694694
if sid is not None:
695-
q = db.session.query(Server).filter(Server.id == sid)
695+
q = db.session.query(Server).filter(
696+
Server.id == sid, Server.is_adhoc == 1)
696697
if has_user:
697698
q = q.filter(Server.user_id == current_user.id)
698699
q.delete()
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
##########################################################################
2+
#
3+
# pgAdmin 4 - PostgreSQL Tools
4+
#
5+
# Copyright (C) 2013 - 2026, The pgAdmin Development Team
6+
# This software is released under the PostgreSQL Licence
7+
#
8+
##########################################################################
9+
10+
"""Tests for ServerGroup data isolation between users in server mode."""
11+
12+
import json
13+
import config
14+
from pgadmin.utils.route import BaseTestGenerator
15+
from regression.python_test_utils import test_utils as utils
16+
from regression.test_setup import config_data
17+
from regression.python_test_utils.test_utils import \
18+
create_user_wise_test_client
19+
from pgadmin.model import db, ServerGroup
20+
21+
test_user_details = None
22+
if config.SERVER_MODE:
23+
test_user_details = \
24+
config_data['pgAdmin4_test_non_admin_credentials']
25+
26+
27+
class ServerGroupIsolationTestCase(BaseTestGenerator):
28+
"""Verify that a non-admin user cannot fetch another user's
29+
server group properties by ID."""
30+
31+
scenarios = [
32+
('User B cannot fetch User A server group properties',
33+
dict(is_positive_test=False)),
34+
]
35+
36+
def setUp(self):
37+
if not config.SERVER_MODE:
38+
self.skipTest(
39+
'Data isolation tests only apply to server mode.'
40+
)
41+
42+
# Create a server group as the admin user
43+
url = '/browser/server_group/obj/'
44+
response = self.tester.post(
45+
url,
46+
data=json.dumps({'name': 'isolation_test_group'}),
47+
content_type='html/json'
48+
)
49+
response_data = json.loads(response.data.decode('utf-8'))
50+
self.sg_id = response_data['node']['_id']
51+
52+
@create_user_wise_test_client(test_user_details)
53+
def runTest(self):
54+
"""Non-admin user should NOT see another user's server
55+
group properties."""
56+
if not self.sg_id:
57+
raise Exception("Server group not created")
58+
59+
url = '/browser/server_group/obj/{0}'.format(self.sg_id)
60+
response = self.tester.get(url, content_type='html/json')
61+
self.assertIn(
62+
response.status_code, [404, 410],
63+
'Non-admin user should not access another user\'s '
64+
'server group. Got status {0}'.format(
65+
response.status_code)
66+
)
67+
68+
def tearDown(self):
69+
# Clean up with admin
70+
sg = ServerGroup.query.filter_by(id=self.sg_id).first()
71+
if sg:
72+
db.session.delete(sg)
73+
db.session.commit()

web/pgadmin/tools/import_export/__init__.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,7 @@ def cmd_arg(x):
9898

9999
def get_server_name(self):
100100
# Fetch the server details like hostname, port, roles etc
101-
s = Server.query.filter_by(
102-
id=self.sid, user_id=current_user.id
103-
).first()
101+
s = get_accessible_server(self.sid)
104102

105103
if s is None:
106104
return _("Not available")

web/pgadmin/tools/sqleditor/__init__.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -521,9 +521,10 @@ def _init_sqleditor(trans_id, connect, sgid, sid, did, dbname=None, **kwargs):
521521
conn_id_ac = str(secrets.choice(range(1, 9999999)))
522522
server = get_accessible_server(sid)
523523
if server is None:
524-
return internal_server_error(
525-
errormsg=gettext("Could not find the required server.")
526-
)
524+
return True, internal_server_error(
525+
errormsg=gettext(
526+
"Could not find the required server.")
527+
), '', ''
527528
if server.shared and server.user_id != current_user.id:
528529
# Import here to avoid circular dependency
529530
from pgadmin.browser.server_groups.servers import ServerModule

0 commit comments

Comments
 (0)