Skip to content

Commit 5003b43

Browse files
committed
chore: Add FDv2 compatible data source for testing
1 parent 2492c12 commit 5003b43

3 files changed

Lines changed: 992 additions & 0 deletions

File tree

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
require 'concurrent/atomics'
2+
require 'ldclient-rb/impl/data_system'
3+
require 'ldclient-rb/interfaces/data_system'
4+
require 'ldclient-rb/util'
5+
require 'thread'
6+
7+
module LaunchDarkly
8+
module Impl
9+
module Integrations
10+
module TestData
11+
#
12+
# Internal implementation of both Initializer and Synchronizer protocols for TestDataV2.
13+
#
14+
# This component bridges the test data management in TestDataV2 with the FDv2 protocol
15+
# interfaces. Each instance implements both Initializer and Synchronizer protocols
16+
# and receives change notifications for dynamic updates.
17+
#
18+
class TestDataSourceV2
19+
include LaunchDarkly::Interfaces::DataSystem::Initializer
20+
include LaunchDarkly::Interfaces::DataSystem::Synchronizer
21+
22+
# @api private
23+
#
24+
# @param test_data [LaunchDarkly::Integrations::TestDataV2] the test data instance
25+
#
26+
def initialize(test_data)
27+
@test_data = test_data
28+
@closed = false
29+
@update_queue = Queue.new
30+
@lock = Mutex.new
31+
32+
# Always register for change notifications
33+
@test_data.add_instance(self)
34+
end
35+
36+
#
37+
# Return the name of this data source.
38+
#
39+
# @return [String]
40+
#
41+
def name
42+
'TestDataV2'
43+
end
44+
45+
#
46+
# Implementation of the Initializer.fetch method.
47+
#
48+
# Returns the current test data as a Basis for initial data loading.
49+
#
50+
# @param selector_store [LaunchDarkly::Interfaces::DataSystem::SelectorStore] Provides the Selector (unused for test data)
51+
# @return [LaunchDarkly::Result] A Result containing either a Basis or an error message
52+
#
53+
def fetch(selector_store)
54+
begin
55+
@lock.synchronize do
56+
if @closed
57+
return LaunchDarkly::Result.fail('TestDataV2 source has been closed')
58+
end
59+
60+
# Get all current flags from test data
61+
init_data = @test_data.make_init_data
62+
version = @test_data.get_version
63+
64+
# Build a full transfer changeset
65+
builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new
66+
builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_FULL)
67+
68+
# Add all flags to the changeset
69+
init_data.each do |key, flag_data|
70+
builder.add_put(
71+
LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG,
72+
key,
73+
flag_data[:version] || 1,
74+
flag_data
75+
)
76+
end
77+
78+
# Create selector for this version
79+
selector = LaunchDarkly::Interfaces::DataSystem::Selector.new_selector(version.to_s, version)
80+
change_set = builder.finish(selector)
81+
82+
basis = LaunchDarkly::Interfaces::DataSystem::Basis.new(change_set: change_set, persist: false, environment_id: nil)
83+
84+
LaunchDarkly::Result.success(basis)
85+
end
86+
rescue => e
87+
LaunchDarkly::Result.fail("Error fetching test data: #{e.message}", e)
88+
end
89+
end
90+
91+
#
92+
# Implementation of the Synchronizer.sync method.
93+
#
94+
# Yields updates as test data changes occur.
95+
#
96+
# @param selector_store [LaunchDarkly::Interfaces::DataSystem::SelectorStore] Provides the Selector (unused for test data)
97+
# @yield [LaunchDarkly::Interfaces::DataSystem::Update] Yields Update objects as synchronization progresses
98+
# @return [void]
99+
#
100+
def sync(selector_store)
101+
# First yield initial data
102+
initial_result = fetch(selector_store)
103+
unless initial_result.success?
104+
yield LaunchDarkly::Interfaces::DataSystem::Update.new(
105+
state: LaunchDarkly::Interfaces::DataSource::Status::OFF,
106+
error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
107+
LaunchDarkly::Interfaces::DataSource::ErrorInfo::STORE_ERROR,
108+
0,
109+
initial_result.error,
110+
Time.now
111+
)
112+
)
113+
return
114+
end
115+
116+
# Yield the initial successful state
117+
yield LaunchDarkly::Interfaces::DataSystem::Update.new(
118+
state: LaunchDarkly::Interfaces::DataSource::Status::VALID,
119+
change_set: initial_result.value.change_set
120+
)
121+
122+
# Continue yielding updates as they arrive
123+
until @closed
124+
begin
125+
# stop() will push nil to the queue to wake us up when shutting down
126+
update = @update_queue.pop
127+
128+
# Handle nil sentinel for shutdown
129+
break if update.nil?
130+
131+
# Yield the actual update
132+
yield update
133+
rescue => e
134+
yield LaunchDarkly::Interfaces::DataSystem::Update.new(
135+
state: LaunchDarkly::Interfaces::DataSource::Status::OFF,
136+
error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
137+
LaunchDarkly::Interfaces::DataSource::ErrorInfo::UNKNOWN,
138+
0,
139+
"Error in test data synchronizer: #{e.message}",
140+
Time.now
141+
)
142+
)
143+
break
144+
end
145+
end
146+
end
147+
148+
#
149+
# Stop the data source and clean up resources
150+
#
151+
# @return [void]
152+
#
153+
def stop
154+
@lock.synchronize do
155+
return if @closed
156+
@closed = true
157+
end
158+
159+
@test_data.closed_instance(self)
160+
# Signal shutdown to sync generator
161+
@update_queue.push(nil)
162+
end
163+
164+
#
165+
# Called by TestDataV2 when a flag is updated.
166+
#
167+
# This method converts the flag update into an FDv2 changeset and
168+
# queues it for delivery through the sync() generator.
169+
#
170+
# @param flag_data [Hash] the flag data
171+
# @return [void]
172+
#
173+
def upsert_flag(flag_data)
174+
@lock.synchronize do
175+
return if @closed
176+
177+
begin
178+
version = @test_data.get_version
179+
180+
# Build a changes transfer changeset
181+
builder = LaunchDarkly::Interfaces::DataSystem::ChangeSetBuilder.new
182+
builder.start(LaunchDarkly::Interfaces::DataSystem::IntentCode::TRANSFER_CHANGES)
183+
184+
# Add the updated flag
185+
builder.add_put(
186+
LaunchDarkly::Interfaces::DataSystem::ObjectKind::FLAG,
187+
flag_data[:key],
188+
flag_data[:version] || 1,
189+
flag_data
190+
)
191+
192+
# Create selector for this version
193+
selector = LaunchDarkly::Interfaces::DataSystem::Selector.new_selector(version.to_s, version)
194+
change_set = builder.finish(selector)
195+
196+
# Queue the update
197+
update = LaunchDarkly::Interfaces::DataSystem::Update.new(
198+
state: LaunchDarkly::Interfaces::DataSource::Status::VALID,
199+
change_set: change_set
200+
)
201+
202+
@update_queue.push(update)
203+
rescue => e
204+
# Queue an error update
205+
error_update = LaunchDarkly::Interfaces::DataSystem::Update.new(
206+
state: LaunchDarkly::Interfaces::DataSource::Status::OFF,
207+
error: LaunchDarkly::Interfaces::DataSource::ErrorInfo.new(
208+
LaunchDarkly::Interfaces::DataSource::ErrorInfo::STORE_ERROR,
209+
0,
210+
"Error processing flag update: #{e.message}",
211+
Time.now
212+
)
213+
)
214+
@update_queue.push(error_update)
215+
end
216+
end
217+
end
218+
end
219+
end
220+
end
221+
end
222+
end
223+

0 commit comments

Comments
 (0)