Skip to content

Commit b913595

Browse files
committed
test: add multi-user data isolation tests
- ServerGroupIsolationTestCase: verify non-admin cannot fetch another user's server group properties - ServerDataIsolationGetTestCase: verify non-admin cannot access another user's private server - SharedServerAccessTestCase: verify shared servers remain accessible cross-user (positive regression test) Tests use create_user_wise_test_client pattern and only run in SERVER_MODE with pgAdmin4_test_non_admin_credentials configured.
1 parent 2101685 commit b913595

File tree

6 files changed

+249
-27
lines changed

6 files changed

+249
-27
lines changed

web/pgadmin/browser/server_groups/servers/__init__.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
from pgadmin.utils import get_complete_file_path
4747
from pgadmin.settings.utils import with_object_filters
4848
from pgadmin.utils.server_access import get_accessible_server, \
49-
get_user_server_query
49+
get_user_server_query, get_accessible_server_group
5050

5151

5252
def has_any(data, keys):
@@ -956,8 +956,6 @@ def list(self, gid, object_filters):
956956
servers = get_user_server_query().filter(
957957
Server.servergroup_id == gid,
958958
Server.is_adhoc == 0).order_by(Server.name)
959-
from pgadmin.utils.server_access import \
960-
get_accessible_server_group
961959
sg = get_accessible_server_group(gid)
962960
res = []
963961

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
##########################################################################
2+
#
3+
# pgAdmin 4 - PostgreSQL Tools
4+
#
5+
# Copyright (C) 2013 - 2026, The pgAdmin Development Team
6+
# This software is released under the PostgreSQL Licence
7+
#
8+
##########################################################################
9+
10+
"""Tests for server data isolation between users in server mode."""
11+
12+
import json
13+
import config
14+
from pgadmin.utils.route import BaseTestGenerator
15+
from regression.python_test_utils import test_utils as utils
16+
from regression.test_setup import config_data
17+
from regression.python_test_utils.test_utils import \
18+
create_user_wise_test_client
19+
from . import utils as servers_utils
20+
21+
test_user_details = None
22+
if config.SERVER_MODE:
23+
test_user_details = \
24+
config_data['pgAdmin4_test_non_admin_credentials']
25+
26+
27+
class ServerDataIsolationGetTestCase(BaseTestGenerator):
28+
"""Verify that a non-admin user cannot access another user's
29+
private (non-shared) server by ID."""
30+
31+
scenarios = [
32+
('User B gets 410 for User A private server',
33+
dict(is_positive_test=False)),
34+
]
35+
36+
def setUp(self):
37+
if not config.SERVER_MODE:
38+
self.skipTest(
39+
'Data isolation tests only apply to server mode.'
40+
)
41+
42+
# Create a private (non-shared) server as the admin user
43+
self.server['shared'] = False
44+
url = "/browser/server/obj/{0}/".format(utils.SERVER_GROUP)
45+
response = self.tester.post(
46+
url,
47+
data=json.dumps(self.server),
48+
content_type='html/json'
49+
)
50+
response_data = json.loads(response.data.decode('utf-8'))
51+
self.server_id = response_data['node']['_id']
52+
53+
@create_user_wise_test_client(test_user_details)
54+
def runTest(self):
55+
"""Non-admin user should NOT be able to GET another user's
56+
private server."""
57+
if not self.server_id:
58+
raise Exception("Server not found to test isolation")
59+
60+
url = '/browser/server/obj/{0}/{1}'.format(
61+
utils.SERVER_GROUP, self.server_id)
62+
response = self.tester.get(url, follow_redirects=True)
63+
# Expect 410 Gone (server not accessible to this user)
64+
self.assertIn(
65+
response.status_code, [404, 410],
66+
'Non-admin user should not access another user\'s '
67+
'private server. Got status {0}'.format(
68+
response.status_code)
69+
)
70+
71+
def tearDown(self):
72+
# Clean up with the admin tester (which owns the server)
73+
utils.delete_server_with_api(
74+
self.__class__.tester, self.server_id)
75+
76+
77+
class SharedServerAccessTestCase(BaseTestGenerator):
78+
"""Verify that a shared server IS accessible by a non-admin
79+
user (positive test — shared servers should work after the
80+
isolation fixes)."""
81+
82+
scenarios = [
83+
('User B can access shared server from User A',
84+
dict(is_positive_test=True)),
85+
]
86+
87+
def setUp(self):
88+
if not config.SERVER_MODE:
89+
self.skipTest(
90+
'Data isolation tests only apply to server mode.'
91+
)
92+
93+
# Create a shared server as the admin user
94+
self.server['shared'] = True
95+
url = "/browser/server/obj/{0}/".format(utils.SERVER_GROUP)
96+
response = self.tester.post(
97+
url,
98+
data=json.dumps(self.server),
99+
content_type='html/json'
100+
)
101+
response_data = json.loads(response.data.decode('utf-8'))
102+
self.server_id = response_data['node']['_id']
103+
104+
@create_user_wise_test_client(test_user_details)
105+
def runTest(self):
106+
"""Non-admin user SHOULD be able to GET a shared server."""
107+
if not self.server_id:
108+
raise Exception("Server not found to test shared access")
109+
110+
url = '/browser/server/obj/{0}/{1}'.format(
111+
utils.SERVER_GROUP, self.server_id)
112+
response = self.tester.get(url, follow_redirects=True)
113+
self.assertEqual(
114+
response.status_code, 200,
115+
'Non-admin user should be able to access shared server.'
116+
' Got status {0}'.format(response.status_code)
117+
)
118+
119+
def tearDown(self):
120+
utils.delete_server_with_api(
121+
self.__class__.tester, self.server_id)

web/pgadmin/browser/server_groups/servers/utils.py

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -657,19 +657,29 @@ def disconnect_from_all_servers():
657657
"""
658658
all_servers = get_user_server_query().all()
659659
for server in all_servers:
660-
manager = get_driver(config.PG_DEFAULT_DRIVER).connection_manager(
661-
server.id)
662-
if manager is None:
663-
continue
664-
# Check if any psql terminal is running for the current disconnecting
665-
# server. If any terminate the psql tool connection.
666-
if 'sid_soid_mapping' in current_app.config and str(server.id) in \
667-
current_app.config['sid_soid_mapping'] and \
668-
str(server.id) in current_app.config['sid_soid_mapping']:
669-
for i in current_app.config['sid_soid_mapping'][str(server.id)]:
670-
sio.emit('disconnect-psql', namespace='/pty', to=i)
671-
672-
manager.release()
660+
try:
661+
manager = get_driver(
662+
config.PG_DEFAULT_DRIVER
663+
).connection_manager(server.id)
664+
if manager is None:
665+
continue
666+
# Check if any psql terminal is running for the
667+
# current disconnecting server.
668+
if 'sid_soid_mapping' in current_app.config \
669+
and str(server.id) in \
670+
current_app.config['sid_soid_mapping']:
671+
for i in current_app.config[
672+
'sid_soid_mapping'][str(server.id)]:
673+
sio.emit(
674+
'disconnect-psql',
675+
namespace='/pty', to=i
676+
)
677+
manager.release()
678+
except Exception:
679+
current_app.logger.warning(
680+
'Failed to disconnect server %s',
681+
server.id
682+
)
673683

674684

675685
def delete_adhoc_servers(sid=None):
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
##########################################################################
2+
#
3+
# pgAdmin 4 - PostgreSQL Tools
4+
#
5+
# Copyright (C) 2013 - 2026, The pgAdmin Development Team
6+
# This software is released under the PostgreSQL Licence
7+
#
8+
##########################################################################
9+
10+
"""Tests for ServerGroup data isolation between users in server mode."""
11+
12+
import json
13+
import config
14+
from pgadmin.utils.route import BaseTestGenerator
15+
from regression.python_test_utils import test_utils as utils
16+
from regression.test_setup import config_data
17+
from regression.python_test_utils.test_utils import \
18+
create_user_wise_test_client
19+
from pgadmin.model import db, ServerGroup
20+
21+
test_user_details = None
22+
if config.SERVER_MODE:
23+
test_user_details = \
24+
config_data['pgAdmin4_test_non_admin_credentials']
25+
26+
27+
class ServerGroupIsolationTestCase(BaseTestGenerator):
28+
"""Verify that a non-admin user cannot fetch another user's
29+
server group properties by ID."""
30+
31+
scenarios = [
32+
('User B cannot fetch User A server group properties',
33+
dict(is_positive_test=False)),
34+
]
35+
36+
def setUp(self):
37+
if not config.SERVER_MODE:
38+
self.skipTest(
39+
'Data isolation tests only apply to server mode.'
40+
)
41+
42+
# Create a server group as the admin user
43+
url = '/browser/server_group/obj/'
44+
response = self.tester.post(
45+
url,
46+
data=json.dumps({'name': 'isolation_test_group'}),
47+
content_type='html/json'
48+
)
49+
response_data = json.loads(response.data.decode('utf-8'))
50+
self.sg_id = response_data['node']['_id']
51+
52+
@create_user_wise_test_client(test_user_details)
53+
def runTest(self):
54+
"""Non-admin user should NOT see another user's server
55+
group properties."""
56+
if not self.sg_id:
57+
raise Exception("Server group not created")
58+
59+
url = '/browser/server_group/obj/{0}'.format(self.sg_id)
60+
response = self.tester.get(url, content_type='html/json')
61+
self.assertIn(
62+
response.status_code, [404, 410],
63+
'Non-admin user should not access another user\'s '
64+
'server group. Got status {0}'.format(
65+
response.status_code)
66+
)
67+
68+
def tearDown(self):
69+
# Clean up with admin
70+
sg = ServerGroup.query.filter_by(id=self.sg_id).first()
71+
if sg:
72+
db.session.delete(sg)
73+
db.session.commit()

web/pgadmin/misc/workspaces/__init__.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -145,16 +145,16 @@ def adhoc_connect_server():
145145
server = existing_server
146146
break
147147
else:
148-
server = Server.query.filter_by(host=new_host,
149-
port=new_port,
150-
maintenance_db=new_db,
151-
username=new_username,
152-
name=new_server_name,
153-
role=new_role,
154-
service=new_service,
155-
connection_params=connection_params,
156-
user_id=current_user.id
157-
).first()
148+
server = Server.query.filter_by(
149+
host=new_host, port=new_port,
150+
maintenance_db=new_db,
151+
username=new_username,
152+
name=new_server_name,
153+
role=new_role,
154+
service=new_service,
155+
connection_params=connection_params,
156+
user_id=current_user.id
157+
).first()
158158

159159
# If server is none then no server with the above combination is found.
160160
if server is None:

web/pgadmin/tools/debugger/__init__.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@
3535
import get_extension_details
3636
from pgadmin.utils.constants import PREF_LABEL_KEYBOARD_SHORTCUTS, \
3737
SERVER_CONNECTION_CLOSED
38-
from pgadmin.tools.user_management.PgAdminPermissions import AllPermissionTypes
38+
from pgadmin.tools.user_management.PgAdminPermissions \
39+
import AllPermissionTypes
40+
from pgadmin.utils.server_access import get_accessible_server
3941
from pgadmin.preferences import preferences
4042

4143
MODULE_NAME = 'debugger'
@@ -1803,6 +1805,12 @@ def get_arguments_sqlite(sid, did, scid, func_id):
18031805
- Function Id
18041806
"""
18051807

1808+
if get_accessible_server(sid) is None:
1809+
return make_json_response(
1810+
status=410, success=0,
1811+
errormsg=gettext("Could not find the required server.")
1812+
)
1813+
18061814
"""Get the count of the existing data available in sqlite database"""
18071815
dbg_func_args_count = int(DebuggerFunctionArguments.query.filter_by(
18081816
server_id=sid,
@@ -1890,6 +1898,12 @@ def set_arguments_sqlite(sid, did, scid, func_id):
18901898
- Function Id
18911899
"""
18921900

1901+
if get_accessible_server(sid) is None:
1902+
return make_json_response(
1903+
status=410, success=0,
1904+
errormsg=gettext("Could not find the required server.")
1905+
)
1906+
18931907
if request.data:
18941908
data = json.loads(request.data)
18951909

@@ -1982,6 +1996,12 @@ def clear_arguments_sqlite(sid, did, scid, func_id):
19821996
- Function Id
19831997
"""
19841998

1999+
if get_accessible_server(sid) is None:
2000+
return make_json_response(
2001+
status=410, success=0,
2002+
errormsg=gettext("Could not find the required server.")
2003+
)
2004+
19852005
try:
19862006
db.session.query(DebuggerFunctionArguments) \
19872007
.filter(DebuggerFunctionArguments.server_id == sid,

0 commit comments

Comments
 (0)