|
19 | 19 | from tests.live import ChannelsE2ETestCase |
20 | 20 |
|
21 | 21 |
|
| 22 | +def clean_response(response): |
| 23 | + # Remove raw_attributes to make checking easier |
| 24 | + for obj in response: |
| 25 | + del obj["raw_attributes"] |
| 26 | + del obj["raw_dn"] |
| 27 | + obj["attributes"] = dict(obj["attributes"]) |
| 28 | + obj["attributes"].pop("uid", None) |
| 29 | + return response |
| 30 | + |
| 31 | + |
22 | 32 | class TestProviderLDAP(ChannelsE2ETestCase): |
23 | 33 | """LDAP and Outpost e2e tests""" |
24 | 34 |
|
| 35 | + def assert_list_dict_equal(self, expected: list[dict], actual: list[dict], match_key="dn"): |
| 36 | + """Assert a list of dictionaries is identical, ignoring the ordering of items""" |
| 37 | + self.assertEqual(len(expected), len(actual)) |
| 38 | + for res_item in actual: |
| 39 | + all_matching = [x for x in expected if x[match_key] == res_item[match_key]] |
| 40 | + self.assertEqual(len(all_matching), 1) |
| 41 | + matching = all_matching[0] |
| 42 | + self.assertDictEqual(res_item, matching) |
| 43 | + |
25 | 44 | def start_ldap(self, outpost: Outpost): |
26 | 45 | """Start ldap container based on outpost created""" |
27 | 46 | self.run_container( |
@@ -211,20 +230,14 @@ def test_ldap_bind_search(self): |
211 | 230 | search_scope=SUBTREE, |
212 | 231 | attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES], |
213 | 232 | ) |
214 | | - response: list = _connection.response |
215 | | - # Remove raw_attributes to make checking easier |
216 | | - for obj in response: |
217 | | - del obj["raw_attributes"] |
218 | | - del obj["raw_dn"] |
219 | | - obj["attributes"] = dict(obj["attributes"]) |
| 233 | + response = clean_response(_connection.response) |
220 | 234 | o_user = outpost.user |
221 | 235 | expected = [ |
222 | 236 | { |
223 | 237 | "dn": f"cn={o_user.username},ou=users,dc=ldap,dc=goauthentik,dc=io", |
224 | 238 | "attributes": { |
225 | 239 | "cn": o_user.username, |
226 | 240 | "sAMAccountName": o_user.username, |
227 | | - "uid": o_user.uid, |
228 | 241 | "name": o_user.name, |
229 | 242 | "displayName": o_user.name, |
230 | 243 | "sn": o_user.name, |
@@ -255,7 +268,6 @@ def test_ldap_bind_search(self): |
255 | 268 | "attributes": { |
256 | 269 | "cn": embedded_account.username, |
257 | 270 | "sAMAccountName": embedded_account.username, |
258 | | - "uid": embedded_account.uid, |
259 | 271 | "name": embedded_account.name, |
260 | 272 | "displayName": embedded_account.name, |
261 | 273 | "sn": embedded_account.name, |
@@ -286,7 +298,6 @@ def test_ldap_bind_search(self): |
286 | 298 | "attributes": { |
287 | 299 | "cn": self.user.username, |
288 | 300 | "sAMAccountName": self.user.username, |
289 | | - "uid": self.user.uid, |
290 | 301 | "name": self.user.name, |
291 | 302 | "displayName": self.user.name, |
292 | 303 | "sn": self.user.name, |
@@ -355,19 +366,13 @@ def test_ldap_bind_search_no_perms(self): |
355 | 366 | search_scope=SUBTREE, |
356 | 367 | attributes=[ALL_ATTRIBUTES, ALL_OPERATIONAL_ATTRIBUTES], |
357 | 368 | ) |
358 | | - response: list = _connection.response |
359 | | - # Remove raw_attributes to make checking easier |
360 | | - for obj in response: |
361 | | - del obj["raw_attributes"] |
362 | | - del obj["raw_dn"] |
363 | | - obj["attributes"] = dict(obj["attributes"]) |
| 369 | + response = clean_response(_connection.response) |
364 | 370 | expected = [ |
365 | 371 | { |
366 | 372 | "dn": f"cn={user.username},ou=users,dc=ldap,dc=goauthentik,dc=io", |
367 | 373 | "attributes": { |
368 | 374 | "cn": user.username, |
369 | 375 | "sAMAccountName": user.username, |
370 | | - "uid": user.uid, |
371 | 376 | "name": user.name, |
372 | 377 | "displayName": user.name, |
373 | 378 | "sn": user.name, |
@@ -399,15 +404,6 @@ def test_ldap_bind_search_no_perms(self): |
399 | 404 | ] |
400 | 405 | self.assert_list_dict_equal(expected, response) |
401 | 406 |
|
402 | | - def assert_list_dict_equal(self, expected: list[dict], actual: list[dict], match_key="dn"): |
403 | | - """Assert a list of dictionaries is identical, ignoring the ordering of items""" |
404 | | - self.assertEqual(len(expected), len(actual)) |
405 | | - for res_item in actual: |
406 | | - all_matching = [x for x in expected if x[match_key] == res_item[match_key]] |
407 | | - self.assertEqual(len(all_matching), 1) |
408 | | - matching = all_matching[0] |
409 | | - self.assertDictEqual(res_item, matching) |
410 | | - |
411 | 407 | @retry() |
412 | 408 | @apply_blueprint( |
413 | 409 | "default/flow-default-authentication-flow.yaml", |
@@ -471,11 +467,8 @@ def test_ldap_search_attrs_filter(self): |
471 | 467 | search_scope=SUBTREE, |
472 | 468 | attributes=["cn"], |
473 | 469 | ) |
474 | | - response: list = _connection.response |
475 | | - # Remove raw_attributes to make checking easier |
476 | | - for obj in response: |
477 | | - del obj["raw_attributes"] |
478 | | - del obj["raw_dn"] |
| 470 | + response = clean_response(_connection.response) |
| 471 | + |
479 | 472 | o_user = outpost.user |
480 | 473 | self.assert_list_dict_equal( |
481 | 474 | [ |
|
0 commit comments