Skip to content

Commit 08270d8

Browse files
committed
test: data isolation and shared server security tests
Integration tests (test_server_data_isolation.py, 5 cases): - Private server denied to non-admin user (410) - Shared server accessible by non-admin user (200) - passexec_cmd/post_connection_sql suppressed in properties() - Owner SSL paths stripped from connection_params for non-owners - Server rename does not orphan SharedServer records Unit tests with mocks (test_shared_server_unit.py, 22 cases): - get_shared_server_properties(): suppression of passexec, post_connection_sql; SSL path stripping and override; service override; tunnel field override; None connection_params handling; session expunge verification - create_shared_server(): connection_params sanitization; tunnel port/keep_alive copied from owner; None connection_params - update_connection_parameter(): non-owner routing to SharedServer copy vs owner routing to Server directly - _update_server_details(): write routing by ownership - delete_shared_server owner guard: non-owner cannot unshare - get_shared_server(): raises on None after failed creation Server group isolation (test_sg_data_isolation.py, 1 case): - Non-admin cannot fetch another user's server group properties Batch process mock updates: - backup, import_export, maintenance, restore test mocks updated for Process.for_user() API
1 parent b4b897a commit 08270d8

File tree

7 files changed

+925
-0
lines changed

7 files changed

+925
-0
lines changed
Lines changed: 352 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,352 @@
1+
##########################################################################
2+
#
3+
# pgAdmin 4 - PostgreSQL Tools
4+
#
5+
# Copyright (C) 2013 - 2026, The pgAdmin Development Team
6+
# This software is released under the PostgreSQL Licence
7+
#
8+
##########################################################################
9+
10+
"""Tests for server data isolation between users in server mode."""
11+
12+
import json
13+
import config
14+
from pgadmin.utils.route import BaseTestGenerator
15+
from regression.python_test_utils import test_utils as utils
16+
from regression.test_setup import config_data
17+
from regression.python_test_utils.test_utils import \
18+
create_user_wise_test_client
19+
20+
test_user_details = None
21+
if config.SERVER_MODE:
22+
test_user_details = \
23+
config_data['pgAdmin4_test_non_admin_credentials']
24+
25+
26+
class ServerDataIsolationGetTestCase(BaseTestGenerator):
27+
"""Verify that a non-admin user cannot access another user's
28+
private (non-shared) server by ID."""
29+
30+
scenarios = [
31+
('User B gets 410 for User A private server',
32+
dict(is_positive_test=False)),
33+
]
34+
35+
def setUp(self):
36+
self.server_id = None
37+
if not config.SERVER_MODE:
38+
self.skipTest(
39+
'Data isolation tests only apply to server mode.'
40+
)
41+
42+
# Create a private (non-shared) server as the admin user
43+
self.server['shared'] = False
44+
url = "/browser/server/obj/{0}/".format(utils.SERVER_GROUP)
45+
response = self.tester.post(
46+
url,
47+
data=json.dumps(self.server),
48+
content_type='html/json'
49+
)
50+
self.assertEqual(response.status_code, 200)
51+
response_data = json.loads(response.data.decode('utf-8'))
52+
self.assertIn('node', response_data)
53+
self.server_id = response_data['node']['_id']
54+
55+
@create_user_wise_test_client(test_user_details)
56+
def runTest(self):
57+
"""Non-admin user should NOT be able to GET another user's
58+
private server."""
59+
if not self.server_id:
60+
raise Exception("Server not found to test isolation")
61+
62+
url = '/browser/server/obj/{0}/{1}'.format(
63+
utils.SERVER_GROUP, self.server_id)
64+
response = self.tester.get(url, follow_redirects=True)
65+
# Expect 410 Gone (server not accessible to this user)
66+
self.assertEqual(
67+
response.status_code, 410,
68+
'Non-admin user should not access another user\'s '
69+
'private server. Got status {0}'.format(
70+
response.status_code)
71+
)
72+
73+
def tearDown(self):
74+
if self.server_id is None:
75+
return
76+
# Clean up with the admin tester (which owns the server)
77+
utils.delete_server_with_api(
78+
self.__class__.tester, self.server_id)
79+
80+
81+
class SharedServerAccessTestCase(BaseTestGenerator):
82+
"""Verify that a shared server IS accessible by a non-admin
83+
user (positive test — shared servers should work after the
84+
isolation fixes)."""
85+
86+
scenarios = [
87+
('User B can access shared server from User A',
88+
dict(is_positive_test=True)),
89+
]
90+
91+
def setUp(self):
92+
self.server_id = None
93+
if not config.SERVER_MODE:
94+
self.skipTest(
95+
'Data isolation tests only apply to server mode.'
96+
)
97+
98+
# Create a shared server as the admin user
99+
self.server['shared'] = True
100+
url = "/browser/server/obj/{0}/".format(utils.SERVER_GROUP)
101+
response = self.tester.post(
102+
url,
103+
data=json.dumps(self.server),
104+
content_type='html/json'
105+
)
106+
self.assertEqual(response.status_code, 200)
107+
response_data = json.loads(response.data.decode('utf-8'))
108+
self.assertIn('node', response_data)
109+
self.server_id = response_data['node']['_id']
110+
111+
@create_user_wise_test_client(test_user_details)
112+
def runTest(self):
113+
"""Non-admin user SHOULD be able to GET a shared server."""
114+
if not self.server_id:
115+
raise Exception("Server not found to test shared access")
116+
117+
url = '/browser/server/obj/{0}/{1}'.format(
118+
utils.SERVER_GROUP, self.server_id)
119+
response = self.tester.get(url, follow_redirects=True)
120+
self.assertEqual(
121+
response.status_code, 200,
122+
'Non-admin user should be able to access shared server.'
123+
' Got status {0}'.format(response.status_code)
124+
)
125+
126+
def tearDown(self):
127+
if self.server_id is None:
128+
return
129+
utils.delete_server_with_api(
130+
self.__class__.tester, self.server_id)
131+
132+
133+
class SharedServerFieldSuppressionTestCase(BaseTestGenerator):
134+
"""Verify that owner-only sensitive fields are suppressed
135+
when a non-owner accesses a shared server's properties."""
136+
137+
scenarios = [
138+
('Shared server suppresses passexec_cmd and '
139+
'post_connection_sql for non-owner',
140+
dict(is_positive_test=True)),
141+
]
142+
143+
def setUp(self):
144+
self.server_id = None
145+
if not config.SERVER_MODE:
146+
self.skipTest(
147+
'Data isolation tests only apply to server mode.'
148+
)
149+
150+
# Create a shared server with sensitive owner-only fields
151+
self.server['shared'] = True
152+
self.server['passexec_cmd'] = '/usr/bin/get-secret'
153+
self.server['passexec_expiration'] = 100
154+
self.server['post_connection_sql'] = 'SET role admin;'
155+
url = "/browser/server/obj/{0}/".format(utils.SERVER_GROUP)
156+
response = self.tester.post(
157+
url,
158+
data=json.dumps(self.server),
159+
content_type='html/json'
160+
)
161+
self.assertEqual(response.status_code, 200)
162+
response_data = json.loads(response.data.decode('utf-8'))
163+
self.assertIn('node', response_data)
164+
self.server_id = response_data['node']['_id']
165+
166+
@create_user_wise_test_client(test_user_details)
167+
def runTest(self):
168+
"""Non-owner should NOT see passexec_cmd or
169+
post_connection_sql in properties response."""
170+
if not self.server_id:
171+
raise Exception("Server not found to test suppression")
172+
173+
url = '/browser/server/obj/{0}/{1}'.format(
174+
utils.SERVER_GROUP, self.server_id)
175+
response = self.tester.get(url, follow_redirects=True)
176+
self.assertEqual(response.status_code, 200)
177+
data = json.loads(response.data.decode('utf-8'))
178+
179+
# passexec_cmd must be None/null for non-owners
180+
self.assertIsNone(
181+
data.get('passexec_cmd'),
182+
'passexec_cmd should be suppressed for non-owners.'
183+
' Got: {0}'.format(data.get('passexec_cmd'))
184+
)
185+
self.assertIsNone(
186+
data.get('passexec_expiration'),
187+
'passexec_expiration should be suppressed for '
188+
'non-owners.'
189+
)
190+
# post_connection_sql must be None/null for non-owners
191+
self.assertIsNone(
192+
data.get('post_connection_sql'),
193+
'post_connection_sql should be suppressed for '
194+
'non-owners. Got: {0}'.format(
195+
data.get('post_connection_sql'))
196+
)
197+
198+
def tearDown(self):
199+
if self.server_id is None:
200+
return
201+
utils.delete_server_with_api(
202+
self.__class__.tester, self.server_id)
203+
204+
205+
class SharedServerConnectionParamsIsolationTestCase(
206+
BaseTestGenerator):
207+
"""Verify that owner's SSL file paths in connection_params
208+
are not leaked to non-owners of shared servers."""
209+
210+
scenarios = [
211+
('Shared server strips owner SSL paths for non-owner',
212+
dict(is_positive_test=True)),
213+
]
214+
215+
def setUp(self):
216+
self.server_id = None
217+
if not config.SERVER_MODE:
218+
self.skipTest(
219+
'Data isolation tests only apply to server mode.'
220+
)
221+
222+
# Create shared server with owner SSL paths
223+
self.server['shared'] = True
224+
# Set connection_params with owner-specific paths
225+
conn_params = self.server.get('connection_params', {})
226+
conn_params['sslcert'] = '/home/owner/.ssl/cert.pem'
227+
conn_params['sslkey'] = '/home/owner/.ssl/key.pem'
228+
conn_params['sslrootcert'] = '/home/owner/.ssl/ca.pem'
229+
self.server['connection_params'] = conn_params
230+
url = "/browser/server/obj/{0}/".format(utils.SERVER_GROUP)
231+
response = self.tester.post(
232+
url,
233+
data=json.dumps(self.server),
234+
content_type='html/json'
235+
)
236+
self.assertEqual(response.status_code, 200)
237+
response_data = json.loads(response.data.decode('utf-8'))
238+
self.assertIn('node', response_data)
239+
self.server_id = response_data['node']['_id']
240+
241+
@create_user_wise_test_client(test_user_details)
242+
def runTest(self):
243+
"""Non-owner should NOT see owner's SSL file paths
244+
in connection_params."""
245+
if not self.server_id:
246+
raise Exception("Server not found")
247+
248+
url = '/browser/server/obj/{0}/{1}'.format(
249+
utils.SERVER_GROUP, self.server_id)
250+
response = self.tester.get(url, follow_redirects=True)
251+
self.assertEqual(response.status_code, 200)
252+
data = json.loads(response.data.decode('utf-8'))
253+
254+
conn_params = data.get('connection_params', {})
255+
# Owner SSL paths should be stripped for non-owners
256+
# (non-owner has no SharedServer SSL paths configured,
257+
# so keys should be absent)
258+
for key in ('sslcert', 'sslkey', 'sslrootcert',
259+
'sslcrl', 'sslcrldir'):
260+
val = None
261+
if isinstance(conn_params, list):
262+
for item in conn_params:
263+
if item.get('name') == key:
264+
val = item.get('value')
265+
break
266+
elif isinstance(conn_params, dict):
267+
val = conn_params.get(key)
268+
self.assertIsNone(
269+
val,
270+
'Owner SSL path "{0}" should not leak to '
271+
'non-owner. Got: {1}'.format(key, val)
272+
)
273+
274+
def tearDown(self):
275+
if self.server_id is None:
276+
return
277+
utils.delete_server_with_api(
278+
self.__class__.tester, self.server_id)
279+
280+
281+
class SharedServerRenameDoesNotOrphanTestCase(BaseTestGenerator):
282+
"""Verify that renaming a shared server does not create
283+
orphan SharedServer records (Issue 20 fix — lookup uses
284+
osid, not name)."""
285+
286+
scenarios = [
287+
('Rename shared server preserves non-owner access',
288+
dict(is_positive_test=True)),
289+
]
290+
291+
def setUp(self):
292+
self.server_id = None
293+
if not config.SERVER_MODE:
294+
self.skipTest(
295+
'Data isolation tests only apply to server mode.'
296+
)
297+
298+
# Save admin tester BEFORE the decorator replaces it.
299+
self.admin_tester = self.tester
300+
301+
self.server['shared'] = True
302+
url = "/browser/server/obj/{0}/".format(utils.SERVER_GROUP)
303+
response = self.tester.post(
304+
url,
305+
data=json.dumps(self.server),
306+
content_type='html/json'
307+
)
308+
self.assertEqual(response.status_code, 200)
309+
response_data = json.loads(response.data.decode('utf-8'))
310+
self.assertIn('node', response_data)
311+
self.server_id = response_data['node']['_id']
312+
313+
@create_user_wise_test_client(test_user_details)
314+
def runTest(self):
315+
"""After owner renames the shared server, non-owner
316+
should still be able to access it."""
317+
if not self.server_id:
318+
raise Exception("Server not found")
319+
320+
# First access as non-owner to create SharedServer record
321+
url = '/browser/server/obj/{0}/{1}'.format(
322+
utils.SERVER_GROUP, self.server_id)
323+
response = self.tester.get(url, follow_redirects=True)
324+
self.assertEqual(response.status_code, 200)
325+
326+
# Rename the server as admin (saved in setUp before
327+
# the decorator replaced self.tester).
328+
response = self.admin_tester.put(
329+
'/browser/server/obj/{0}/{1}'.format(
330+
utils.SERVER_GROUP, self.server_id),
331+
data=json.dumps(
332+
{'name': 'renamed_shared_server'}),
333+
content_type='html/json'
334+
)
335+
self.assertIn(
336+
response.status_code, [200],
337+
'Admin should be able to rename shared server.'
338+
)
339+
340+
# Access again as non-owner — should still work
341+
response = self.tester.get(url, follow_redirects=True)
342+
self.assertEqual(
343+
response.status_code, 200,
344+
'Non-owner should still access shared server after '
345+
'rename. Got status {0}'.format(response.status_code)
346+
)
347+
348+
def tearDown(self):
349+
if self.server_id is None:
350+
return
351+
utils.delete_server_with_api(
352+
self.__class__.tester, self.server_id)

0 commit comments

Comments
 (0)