-
Notifications
You must be signed in to change notification settings - Fork 54
Expand file tree
/
Copy pathfdv1.rb
More file actions
205 lines (177 loc) · 6.79 KB
/
fdv1.rb
File metadata and controls
205 lines (177 loc) · 6.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
require 'concurrent'
require 'ldclient-rb/impl/datasystem'
require 'ldclient-rb/impl/data_source'
require 'ldclient-rb/impl/data_store'
require 'ldclient-rb/impl/datasource/null_processor'
require 'ldclient-rb/impl/broadcaster'
module LaunchDarkly
  module Impl
    module DataSystem
      #
      # FDv1 wires the existing v1 data source and store behavior behind the
      # generic DataSystem surface.
      #
      # @private
      #
      class FDv1
        include LaunchDarkly::Impl::DataSystem

        #
        # Creates a new FDv1 data system.
        #
        # Builds the store wrapper, the broadcasters and the status providers, and
        # registers the data source update sink on the supplied config. The update
        # processor itself is not created here; see {#start}.
        #
        # @param sdk_key [String] The SDK key
        # @param config [LaunchDarkly::Config] The SDK configuration. Its
        #   +data_source_update_sink+ attribute is assigned by this constructor.
        #
        def initialize(sdk_key, config)
          @sdk_key = sdk_key
          @config = config
          # A single executor is shared by every broadcaster created below.
          @shared_executor = Concurrent::SingleThreadExecutor.new

          # Set up data store plumbing
          @data_store_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)
          @data_store_update_sink = LaunchDarkly::Impl::DataStore::UpdateSink.new(
            @data_store_broadcaster
          )

          # Wrap the data store with client wrapper (must be created before status provider)
          @store_wrapper = LaunchDarkly::Impl::FeatureStoreClientWrapper.new(
            @config.feature_store,
            @data_store_update_sink,
            @config.logger
          )

          # Create status provider with store wrapper
          @data_store_status_provider = LaunchDarkly::Impl::DataStore::StatusProvider.new(
            @store_wrapper,
            @data_store_update_sink
          )

          # Set up data source plumbing
          @data_source_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)
          @flag_change_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)
          @data_source_update_sink = LaunchDarkly::Impl::DataSource::UpdateSink.new(
            @store_wrapper,
            @data_source_broadcaster,
            @flag_change_broadcaster
          )
          @data_source_status_provider = LaunchDarkly::Impl::DataSource::StatusProvider.new(
            @data_source_broadcaster,
            @data_source_update_sink
          )

          # Ensure v1 processors can find the sink via config for status updates
          @config.data_source_update_sink = @data_source_update_sink

          # Update processor created in start(), because it needs the ready event
          @update_processor = nil

          # Diagnostic accumulator provided by client for streaming metrics
          @diagnostic_accumulator = nil
        end

        #
        # Starts the v1 update processor and returns immediately. The returned event
        # will be set by the processor upon first successful initialization or upon permanent failure.
        #
        # The processor is created on the first call only. Later calls reuse it and
        # forward to its +start+ again, so a repeated call never replaces (and thereby
        # orphans) a processor that {#stop} could no longer reach.
        #
        # @return [Concurrent::Event] Event that will be set when initialization is complete
        #
        def start
          @update_processor ||= make_update_processor
          @update_processor.start
        end

        #
        # Halts the data system, stopping the update processor and shutting down the executor.
        #
        # Safe to call before {#start}: the processor is only stopped if one exists.
        #
        # @return [void]
        #
        def stop
          @update_processor&.stop
          @shared_executor.shutdown
        end

        #
        # Returns the feature store wrapper used by this data system.
        #
        # @return [LaunchDarkly::Impl::FeatureStoreClientWrapper]
        #
        def store
          @store_wrapper
        end

        #
        # Sets the diagnostic accumulator for streaming initialization metrics.
        # This should be called before start() to ensure metrics are collected,
        # because the accumulator is only read when the update processor is built.
        #
        # @param diagnostic_accumulator [DiagnosticAccumulator] The diagnostic accumulator
        # @return [void]
        #
        def set_diagnostic_accumulator(diagnostic_accumulator)
          @diagnostic_accumulator = diagnostic_accumulator
        end

        #
        # Returns the data source status provider.
        #
        # @return [LaunchDarkly::Interfaces::DataSource::StatusProvider]
        #
        def data_source_status_provider
          @data_source_status_provider
        end

        #
        # Returns the data store status provider.
        #
        # @return [LaunchDarkly::Interfaces::DataStore::StatusProvider]
        #
        def data_store_status_provider
          @data_store_status_provider
        end

        #
        # Returns the broadcaster for flag change notifications.
        #
        # @return [LaunchDarkly::Impl::Broadcaster]
        #
        def flag_change_broadcaster
          @flag_change_broadcaster
        end

        #
        # Indicates what form of data is currently available.
        #
        # This is calculated dynamically based on current system state.
        #
        # @return [Symbol] One of DataAvailability constants
        #
        def data_availability
          return DataAvailability::DEFAULTS if @config.offline?

          # In LDD mode the update processor's state is ignored; only the store
          # can report data, so REFRESHED is never returned there.
          unless @config.use_ldd?
            return DataAvailability::REFRESHED if @update_processor && @update_processor.initialized?
          end

          return DataAvailability::CACHED if @store_wrapper.initialized?

          DataAvailability::DEFAULTS
        end

        #
        # Indicates the ideal form of data attainable given the current configuration.
        #
        # @return [Symbol] One of DataAvailability constants
        #
        def target_availability
          return DataAvailability::DEFAULTS if @config.offline?
          return DataAvailability::CACHED if @config.use_ldd?

          DataAvailability::REFRESHED
        end

        #
        # Creates the appropriate update processor based on the configuration.
        #
        # Precedence: a custom +config.data_source+ (instance or factory) wins; then
        # a null processor for offline/LDD mode; then streaming; then polling.
        #
        # @return [Object] The update processor
        #
        private def make_update_processor
          # Handle custom data source (factory or instance)
          if @config.data_source
            # Anything that is not callable is treated as a ready-made processor instance.
            return @config.data_source unless @config.data_source.respond_to?(:call)

            # Factory - call with appropriate arity. Only a factory whose arity is
            # exactly 3 receives the diagnostic accumulator.
            return @config.data_source.arity == 3 ?
              @config.data_source.call(@sdk_key, @config, @diagnostic_accumulator) :
              @config.data_source.call(@sdk_key, @config)
          end

          # Create default data source based on config
          return LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new if @config.offline? || @config.use_ldd?

          if @config.stream?
            # Required here rather than at file top, so it only loads when streaming is used.
            require 'ldclient-rb/stream'
            return LaunchDarkly::StreamProcessor.new(@sdk_key, @config, @diagnostic_accumulator)
          end

          # Polling processor (required here for the same reason as streaming above)
          require 'ldclient-rb/polling'
          requestor = LaunchDarkly::Requestor.new(@sdk_key, @config)
          LaunchDarkly::PollingProcessor.new(@config, requestor)
        end
      end
    end
  end
end