|
14 | 14 |
|
15 | 15 | RC_PRODUCT = "FFE_FLAGS" |
16 | 16 | RC_PATH = f"datadog/2/{RC_PRODUCT}" |
| 17 | +EXPOSURES_PATH = "/api/v2/exposures" |
| 18 | +EXPOSURE_WAIT_TIMEOUT_SECONDS = 30 |
| 19 | + |
| 20 | + |
| 21 | +def exposure_events_from_data( |
| 22 | + data: dict, flag_keys: set[str] | None = None, subject_id: str | None = None |
| 23 | +) -> list[dict]: |
| 24 | + """Return exposure events from one agent payload matching the optional flag/subject filters.""" |
| 25 | + if data.get("path") != EXPOSURES_PATH: |
| 26 | + return [] |
| 27 | + |
| 28 | + exposure_data = data.get("request", {}).get("content") |
| 29 | + if not isinstance(exposure_data, dict): |
| 30 | + return [] |
| 31 | + |
| 32 | + exposures = exposure_data.get("exposures") |
| 33 | + if not isinstance(exposures, list): |
| 34 | + return [] |
| 35 | + |
| 36 | + events = [] |
| 37 | + for event in exposures: |
| 38 | + if not isinstance(event, dict): |
| 39 | + continue |
| 40 | + |
| 41 | + flag = event.get("flag") |
| 42 | + subject = event.get("subject") |
| 43 | + event_flag_key = flag.get("key") if isinstance(flag, dict) else None |
| 44 | + event_subject_id = subject.get("id") if isinstance(subject, dict) else None |
| 45 | + |
| 46 | + if flag_keys is not None and event_flag_key not in flag_keys: |
| 47 | + continue |
| 48 | + if subject_id is not None and event_subject_id != subject_id: |
| 49 | + continue |
| 50 | + events.append(event) |
| 51 | + return events |
| 52 | + |
| 53 | + |
| 54 | +def find_exposure_events(flag_key: str, subject_id: str | None = None) -> list[dict]: |
| 55 | + """Find captured exposure events for a specific flag key and optionally a specific subject.""" |
| 56 | + events = [] |
| 57 | + for data in interfaces.agent.get_data(path_filters=EXPOSURES_PATH): |
| 58 | + events.extend(exposure_events_from_data(data, {flag_key}, subject_id)) |
| 59 | + return events |
| 60 | + |
| 61 | + |
| 62 | +def wait_for_exposure_event(flag_keys: set[str], subject_id: str | None = None) -> None: |
| 63 | + """Wait until the agent receives an exposure event for one of the given flags.""" |
| 64 | + assert interfaces.agent.wait_for( |
| 65 | + lambda data: bool(exposure_events_from_data(data, flag_keys, subject_id)), |
| 66 | + timeout=EXPOSURE_WAIT_TIMEOUT_SECONDS, |
| 67 | + ), f"Timed out waiting for exposure event for flags {sorted(flag_keys)} and subject {subject_id!r}" |
| 68 | + |
| 69 | + |
| 70 | +def wait_for_min_exposure_count(flag_key: str, expected: int, subject_id: str | None = None) -> int: |
| 71 | + """Wait until enough matching exposure events are available, then return the current count.""" |
| 72 | + count = count_exposure_events(flag_key, subject_id) |
| 73 | + |
| 74 | + if count < expected: |
| 75 | + assert interfaces.agent.wait_for( |
| 76 | + lambda _: count_exposure_events(flag_key, subject_id) >= expected, |
| 77 | + timeout=EXPOSURE_WAIT_TIMEOUT_SECONDS, |
| 78 | + ), f"Timed out waiting for exposure count >= {expected} for flag {flag_key} and subject {subject_id!r}" |
| 79 | + count = count_exposure_events(flag_key, subject_id) |
| 80 | + |
| 81 | + return count |
17 | 82 |
|
18 | 83 |
|
19 | 84 | # Simple UFC fixture for testing with doLog: true |
@@ -71,12 +136,13 @@ def setup_ffe_exposure_event_generation(self): |
71 | 136 | def test_ffe_exposure_event_generation(self): |
72 | 137 | """Test that FFE generates exposure events when flags are evaluated via weblog.""" |
73 | 138 | assert self.r.status_code == 200, f"Flag evaluation failed: {self.r.text}" |
| 139 | + wait_for_exposure_event({self.flag}, self.targeting_key) |
74 | 140 |
|
75 | 141 | # Search for our specific flag in all exposure events |
76 | 142 | matching_event = None |
77 | 143 | context_validated = False |
78 | 144 |
|
79 | | - for data in interfaces.agent.get_data(path_filters="/api/v2/exposures"): |
| 145 | + for data in interfaces.agent.get_data(path_filters=EXPOSURES_PATH): |
80 | 146 | # validate data sent to /api/v2/exposures |
81 | 147 |
|
82 | 148 | exposure_data = data["request"]["content"] |
@@ -216,11 +282,12 @@ def test_ffe_multiple_remote_config_files(self): |
216 | 282 | """Test that FFE correctly handles multiple remote config files with different flags.""" |
217 | 283 | assert self.r1.status_code == 200, f"First flag evaluation failed: {self.r1.text}" |
218 | 284 | assert self.r2.status_code == 200, f"Second flag evaluation failed: {self.r2.text}" |
| 285 | + wait_for_exposure_event({self.flag_1, self.flag_2}, self.targeting_key) |
219 | 286 |
|
220 | 287 | # Collect all exposure events for our specific flags |
221 | 288 | flags_found = set() |
222 | 289 |
|
223 | | - for data in interfaces.agent.get_data(path_filters="/api/v2/exposures"): |
| 290 | + for data in interfaces.agent.get_data(path_filters=EXPOSURES_PATH): |
224 | 291 | exposure_data = data["request"]["content"] |
225 | 292 | assert exposure_data is not None, "No exposure events were sent to agent" |
226 | 293 |
|
@@ -286,7 +353,7 @@ def test_ffe_empty_remote_config(self): |
286 | 353 |
|
287 | 354 | # When no remote config is set, FFE should still work but return default value |
288 | 355 | # The exposure events should still be generated based on library configuration |
289 | | - for data in interfaces.agent.get_data(path_filters="/api/v2/exposures"): |
| 356 | + for data in interfaces.agent.get_data(path_filters=EXPOSURES_PATH): |
290 | 357 | exposure_data = data["request"]["content"] |
291 | 358 | if exposure_data is not None: |
292 | 359 | # Validate that context is still present |
@@ -381,12 +448,13 @@ def test_ffe_malformed_remote_config_rejection(self): |
381 | 448 | """Test that FFE rejects malformed remote config and preserves the old valid configuration.""" |
382 | 449 | assert self.r1.status_code == 200, f"First flag evaluation failed: {self.r1.text}" |
383 | 450 | assert self.r2.status_code == 200, f"Second flag evaluation failed: {self.r2.text}" |
| 451 | + wait_for_exposure_event({self.flag}, self.targeting_key) |
384 | 452 |
|
385 | 453 | # Verify that exposure events are still generated for both requests |
386 | 454 | # and the flag configuration remained valid despite the malformed update |
387 | 455 | events_found = [] |
388 | 456 |
|
389 | | - for data in interfaces.agent.get_data(path_filters="/api/v2/exposures"): |
| 457 | + for data in interfaces.agent.get_data(path_filters=EXPOSURES_PATH): |
390 | 458 | exposure_data = data["request"]["content"] |
391 | 459 | assert exposure_data is not None, "No exposure events were sent to agent" |
392 | 460 |
|
@@ -430,21 +498,7 @@ def count_exposure_events(flag_key: str, subject_id: str | None = None) -> int: |
430 | 498 | Number of matching exposure events found |
431 | 499 |
|
432 | 500 | """ |
433 | | - count = 0 |
434 | | - for data in interfaces.agent.get_data(path_filters="/api/v2/exposures"): |
435 | | - exposure_data = data["request"]["content"] |
436 | | - if exposure_data is None: |
437 | | - continue |
438 | | - |
439 | | - exposures = exposure_data.get("exposures", []) |
440 | | - for event in exposures: |
441 | | - event_flag_key = event.get("flag", {}).get("key") |
442 | | - event_subject_id = event.get("subject", {}).get("id") |
443 | | - |
444 | | - if event_flag_key == flag_key: |
445 | | - if subject_id is None or event_subject_id == subject_id: |
446 | | - count += 1 |
447 | | - return count |
| 501 | + return len(find_exposure_events(flag_key, subject_id)) |
448 | 502 |
|
449 | 503 |
|
450 | 504 | @scenarios.feature_flagging_and_experimentation |
@@ -488,7 +542,7 @@ def test_ffe_exposure_caching_same_subject(self): |
488 | 542 | assert result["value"] == "value-a", f"Request {i + 1}: expected 'value-a', got '{result['value']}'" |
489 | 543 |
|
490 | 544 | # Count exposure events for this specific subject |
491 | | - exposure_count = count_exposure_events(self.flag_key, self.targeting_key) |
| 545 | + exposure_count = wait_for_min_exposure_count(self.flag_key, 1, self.targeting_key) |
492 | 546 |
|
493 | 547 | # The exposure cache should deduplicate events - we expect exactly 1 exposure |
494 | 548 | # for the same (subject, allocation, variant) tuple |
@@ -538,6 +592,10 @@ def test_ffe_exposure_caching_different_subjects(self): |
538 | 592 | result = json.loads(r.text) |
539 | 593 | assert result["value"] == "value-a", f"Request {i + 1}: expected 'value-a', got '{result['value']}'" |
540 | 594 |
|
| 595 | + # Wait for each subject to be observed before asserting exact totals. |
| 596 | + for subject in self.subjects: |
| 597 | + wait_for_min_exposure_count(self.flag_key, 1, subject) |
| 598 | + |
541 | 599 | # Count total exposure events for this flag |
542 | 600 | total_exposure_count = count_exposure_events(self.flag_key) |
543 | 601 |
|
@@ -642,7 +700,7 @@ def test_ffe_exposure_caching_allocation_cycle(self): |
642 | 700 | # - Exposure #1: default-allocation |
643 | 701 | # - Exposure #2: different-allocation (allocation changed) |
644 | 702 | # - Exposure #3: default-allocation (allocation changed back) |
645 | | - exposure_count = count_exposure_events(self.flag_key, self.targeting_key) |
| 703 | + exposure_count = wait_for_min_exposure_count(self.flag_key, 3, self.targeting_key) |
646 | 704 |
|
647 | 705 | assert exposure_count == 3, ( |
648 | 706 | f"Expected exactly 3 exposure events for subject '{self.targeting_key}' " |
@@ -737,7 +795,7 @@ def test_ffe_exposure_caching_variant_cycle(self): |
737 | 795 | # - Exposure #1: variant-a |
738 | 796 | # - Exposure #2: variant-b (variant changed) |
739 | 797 | # - Exposure #3: variant-a (variant changed back) |
740 | | - exposure_count = count_exposure_events(self.flag_key, self.targeting_key) |
| 798 | + exposure_count = wait_for_min_exposure_count(self.flag_key, 3, self.targeting_key) |
741 | 799 |
|
742 | 800 | assert exposure_count == 3, ( |
743 | 801 | f"Expected exactly 3 exposure events for subject '{self.targeting_key}' " |
@@ -911,11 +969,12 @@ def test_ffe_exp_5_missing_targeting_key(self): |
911 | 969 |
|
912 | 970 | result = json.loads(self.response.text) |
913 | 971 | assert result["value"] == "value-a", f"Expected 'value-a', got '{result['value']}'" |
| 972 | + wait_for_exposure_event({self.flag_key}, "") |
914 | 973 |
|
915 | 974 | # Search for exposure event with empty subject.id |
916 | 975 | matching_event = None |
917 | 976 | all_events_for_flag = [] # Collect all events for debugging |
918 | | - for data in interfaces.agent.get_data(path_filters="/api/v2/exposures"): |
| 977 | + for data in interfaces.agent.get_data(path_filters=EXPOSURES_PATH): |
919 | 978 | exposure_data = data["request"]["content"] |
920 | 979 | if exposure_data is None: |
921 | 980 | continue |
|
0 commit comments