Skip to content

Commit 67a6f4b

Browse files
committed
test: add multi-user data isolation tests
Three test classes verifying server/group isolation and shared server access. Uses create_user_wise_test_client pattern, SERVER_MODE only.
1 parent 2f10925 commit 67a6f4b

File tree

4 files changed

+214
-1
lines changed

4 files changed

+214
-1
lines changed
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
##########################################################################
2+
#
3+
# pgAdmin 4 - PostgreSQL Tools
4+
#
5+
# Copyright (C) 2013 - 2026, The pgAdmin Development Team
6+
# This software is released under the PostgreSQL Licence
7+
#
8+
##########################################################################
9+
10+
"""Tests for server data isolation between users in server mode."""
11+
12+
import json
13+
import config
14+
from pgadmin.utils.route import BaseTestGenerator
15+
from regression.python_test_utils import test_utils as utils
16+
from regression.test_setup import config_data
17+
from regression.python_test_utils.test_utils import \
18+
create_user_wise_test_client
19+
from . import utils as servers_utils
20+
21+
test_user_details = None
22+
if config.SERVER_MODE:
23+
test_user_details = \
24+
config_data['pgAdmin4_test_non_admin_credentials']
25+
26+
27+
class ServerDataIsolationGetTestCase(BaseTestGenerator):
28+
"""Verify that a non-admin user cannot access another user's
29+
private (non-shared) server by ID."""
30+
31+
scenarios = [
32+
('User B gets 410 for User A private server',
33+
dict(is_positive_test=False)),
34+
]
35+
36+
def setUp(self):
37+
self.server_id = None
38+
if not config.SERVER_MODE:
39+
self.skipTest(
40+
'Data isolation tests only apply to server mode.'
41+
)
42+
43+
# Create a private (non-shared) server as the admin user
44+
self.server['shared'] = False
45+
url = "/browser/server/obj/{0}/".format(utils.SERVER_GROUP)
46+
response = self.tester.post(
47+
url,
48+
data=json.dumps(self.server),
49+
content_type='html/json'
50+
)
51+
self.assertEqual(response.status_code, 200)
52+
response_data = json.loads(response.data.decode('utf-8'))
53+
self.assertIn('node', response_data)
54+
self.server_id = response_data['node']['_id']
55+
56+
@create_user_wise_test_client(test_user_details)
57+
def runTest(self):
58+
"""Non-admin user should NOT be able to GET another user's
59+
private server."""
60+
if not self.server_id:
61+
raise Exception("Server not found to test isolation")
62+
63+
url = '/browser/server/obj/{0}/{1}'.format(
64+
utils.SERVER_GROUP, self.server_id)
65+
response = self.tester.get(url, follow_redirects=True)
66+
# Expect 410 Gone (server not accessible to this user)
67+
self.assertIn(
68+
response.status_code, [404, 410],
69+
'Non-admin user should not access another user\'s '
70+
'private server. Got status {0}'.format(
71+
response.status_code)
72+
)
73+
74+
def tearDown(self):
75+
if self.server_id is None:
76+
return
77+
# Clean up with the admin tester (which owns the server)
78+
utils.delete_server_with_api(
79+
self.__class__.tester, self.server_id)
80+
81+
82+
class SharedServerAccessTestCase(BaseTestGenerator):
83+
"""Verify that a shared server IS accessible by a non-admin
84+
user (positive test — shared servers should work after the
85+
isolation fixes)."""
86+
87+
scenarios = [
88+
('User B can access shared server from User A',
89+
dict(is_positive_test=True)),
90+
]
91+
92+
def setUp(self):
93+
self.server_id = None
94+
if not config.SERVER_MODE:
95+
self.skipTest(
96+
'Data isolation tests only apply to server mode.'
97+
)
98+
99+
# Create a shared server as the admin user
100+
self.server['shared'] = True
101+
url = "/browser/server/obj/{0}/".format(utils.SERVER_GROUP)
102+
response = self.tester.post(
103+
url,
104+
data=json.dumps(self.server),
105+
content_type='html/json'
106+
)
107+
self.assertEqual(response.status_code, 200)
108+
response_data = json.loads(response.data.decode('utf-8'))
109+
self.assertIn('node', response_data)
110+
self.server_id = response_data['node']['_id']
111+
112+
@create_user_wise_test_client(test_user_details)
113+
def runTest(self):
114+
"""Non-admin user SHOULD be able to GET a shared server."""
115+
if not self.server_id:
116+
raise Exception("Server not found to test shared access")
117+
118+
url = '/browser/server/obj/{0}/{1}'.format(
119+
utils.SERVER_GROUP, self.server_id)
120+
response = self.tester.get(url, follow_redirects=True)
121+
self.assertEqual(
122+
response.status_code, 200,
123+
'Non-admin user should be able to access shared server.'
124+
' Got status {0}'.format(response.status_code)
125+
)
126+
127+
def tearDown(self):
128+
if self.server_id is None:
129+
return
130+
utils.delete_server_with_api(
131+
self.__class__.tester, self.server_id)
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
##########################################################################
2+
#
3+
# pgAdmin 4 - PostgreSQL Tools
4+
#
5+
# Copyright (C) 2013 - 2026, The pgAdmin Development Team
6+
# This software is released under the PostgreSQL Licence
7+
#
8+
##########################################################################
9+
10+
"""Tests for ServerGroup data isolation between users in server mode."""
11+
12+
import json
13+
import config
14+
from pgadmin.utils.route import BaseTestGenerator
15+
from regression.python_test_utils import test_utils as utils
16+
from regression.test_setup import config_data
17+
from regression.python_test_utils.test_utils import \
18+
create_user_wise_test_client
19+
from pgadmin.model import db, ServerGroup
20+
21+
test_user_details = None
22+
if config.SERVER_MODE:
23+
test_user_details = \
24+
config_data['pgAdmin4_test_non_admin_credentials']
25+
26+
27+
class ServerGroupIsolationTestCase(BaseTestGenerator):
28+
"""Verify that a non-admin user cannot fetch another user's
29+
server group properties by ID."""
30+
31+
scenarios = [
32+
('User B cannot fetch User A server group properties',
33+
dict(is_positive_test=False)),
34+
]
35+
36+
def setUp(self):
37+
self.sg_id = None
38+
if not config.SERVER_MODE:
39+
self.skipTest(
40+
'Data isolation tests only apply to server mode.'
41+
)
42+
43+
# Create a server group as the admin user
44+
url = '/browser/server_group/obj/'
45+
response = self.tester.post(
46+
url,
47+
data=json.dumps({'name': 'isolation_test_group'}),
48+
content_type='html/json'
49+
)
50+
self.assertEqual(response.status_code, 200)
51+
response_data = json.loads(response.data.decode('utf-8'))
52+
self.assertIn('node', response_data)
53+
self.sg_id = response_data['node']['_id']
54+
55+
@create_user_wise_test_client(test_user_details)
56+
def runTest(self):
57+
"""Non-admin user should NOT see another user's server
58+
group properties."""
59+
if not self.sg_id:
60+
raise Exception("Server group not created")
61+
62+
url = '/browser/server_group/obj/{0}'.format(self.sg_id)
63+
response = self.tester.get(url, content_type='html/json')
64+
self.assertIn(
65+
response.status_code, [404, 410],
66+
'Non-admin user should not access another user\'s '
67+
'server group. Got status {0}'.format(
68+
response.status_code)
69+
)
70+
71+
def tearDown(self):
72+
# Clean up with admin
73+
if self.sg_id is None:
74+
return
75+
sg = ServerGroup.query.filter_by(id=self.sg_id).first()
76+
if sg:
77+
db.session.delete(sg)
78+
db.session.commit()

web/pgadmin/tools/erd/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -637,6 +637,8 @@ def _get_connection(sid, did, trans_id, db_name=None):
637637
:return:
638638
"""
639639
manager = get_driver(PG_DEFAULT_DRIVER).connection_manager(sid)
640+
if manager is None:
641+
raise ConnectionLost(sid, None, trans_id)
640642
try:
641643
conn = manager.connection(conn_id=trans_id,
642644
auto_reconnect=True,

web/pgadmin/tools/schema_diff/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,9 @@ def compare_database(params):
521521
if schema_result is None:
522522
socketio.emit(
523523
'compare_database_failed',
524-
gettext("Could not find the required server."),
524+
gettext(
525+
"Failed to fetch schemas from the"
526+
" server."),
525527
namespace=SOCKETIO_NAMESPACE, to=request.sid)
526528
return
527529

0 commit comments

Comments
 (0)