Skip to content
53 changes: 53 additions & 0 deletions lib/ldclient-rb/impl/datasource/null_processor.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
require 'concurrent'

module LaunchDarkly
module Impl
module DataSource
#
# A minimal UpdateProcessor implementation used when the SDK is in offline mode
# or daemon (LDD) mode. It does nothing except mark itself as initialized.
#
# @private
#
class NullUpdateProcessor
include LaunchDarkly::Interfaces::DataSource

#
# Creates a new NullUpdateProcessor.
#
def initialize
@ready = Concurrent::Event.new
end

#
# Starts the data source. Since this is a null implementation, it immediately
# sets the ready event to indicate initialization is complete.
#
# @return [Concurrent::Event] The ready event
#
def start
@ready.set
@ready
end

#
# Stops the data source. This is a no-op for the null implementation.
#
# @return [void]
#
def stop
# Nothing to do
end

#
# Checks if the data source has been initialized.
#
# @return [Boolean] Always returns true since this is a null implementation
#
def initialized?
true
end
end
end
end
end
18 changes: 14 additions & 4 deletions lib/ldclient-rb/impl/datasystem.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@ module DataSystem
#
# Starts the data system.
#
# This method will return immediately. The provided event will be set when the system
# This method will return immediately. The returned event will be set when the system
# has reached an initial state (either permanently failed, e.g. due to bad auth, or succeeded).
#
# @param ready_event [Concurrent::Event] Event to set when initialization is complete
# @return [void]
# @return [Concurrent::Event] Event that will be set when initialization is complete
#
def start(ready_event)
def start
raise NotImplementedError, "#{self.class} must implement #start"
end

Expand Down Expand Up @@ -117,6 +116,17 @@ def set_flag_value_eval_fn(eval_fn)
raise NotImplementedError, "#{self.class} must implement #set_flag_value_eval_fn"
end

#
# Sets the diagnostic accumulator for streaming initialization metrics.
# This should be called before start() to ensure metrics are collected.
#
# @param diagnostic_accumulator [DiagnosticAccumulator] The diagnostic accumulator
# @return [void]
#
def set_diagnostic_accumulator(diagnostic_accumulator)
raise NotImplementedError, "#{self.class} must implement #set_diagnostic_accumulator"
end

#
# Represents the availability of data in the SDK.
#
Expand Down
222 changes: 222 additions & 0 deletions lib/ldclient-rb/impl/datasystem/fdv1.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
require 'concurrent'
require 'ldclient-rb/impl/datasystem'
require 'ldclient-rb/impl/data_source'
require 'ldclient-rb/impl/data_store'
require 'ldclient-rb/impl/datasource/null_processor'
require 'ldclient-rb/impl/flag_tracker'
require 'ldclient-rb/impl/broadcaster'

module LaunchDarkly
module Impl
module DataSystem
#
# FDv1 wires the existing v1 data source and store behavior behind the
# generic DataSystem surface.
#
# @private
Comment thread
jsonbailey marked this conversation as resolved.
Outdated
#
class FDv1
include LaunchDarkly::Impl::DataSystem

#
# Creates a new FDv1 data system.
#
# @param sdk_key [String] The SDK key
# @param config [LaunchDarkly::Config] The SDK configuration
#
def initialize(sdk_key, config)
@sdk_key = sdk_key
@config = config
@shared_executor = Concurrent::SingleThreadExecutor.new

# Set up data store plumbing
@data_store_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)
@data_store_update_sink = LaunchDarkly::Impl::DataStore::UpdateSink.new(
@data_store_broadcaster
)

# Wrap the data store with client wrapper (must be created before status provider)
@store_wrapper = LaunchDarkly::Impl::FeatureStoreClientWrapper.new(
@config.feature_store,
@data_store_update_sink,
@config.logger
)

# Create status provider with store wrapper
@data_store_status_provider = LaunchDarkly::Impl::DataStore::StatusProvider.new(
@store_wrapper,
@data_store_update_sink
)

# Set up data source plumbing
@data_source_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)
@flag_change_broadcaster = LaunchDarkly::Impl::Broadcaster.new(@shared_executor, @config.logger)
@flag_tracker_impl = LaunchDarkly::Impl::FlagTracker.new(
@flag_change_broadcaster,
lambda { |_key, _context| nil } # Replaced by client to use its evaluation method
)
@data_source_update_sink = LaunchDarkly::Impl::DataSource::UpdateSink.new(
@store_wrapper,
@data_source_broadcaster,
@flag_change_broadcaster
)
@data_source_status_provider = LaunchDarkly::Impl::DataSource::StatusProvider.new(
@data_source_broadcaster,
@data_source_update_sink
)

# Ensure v1 processors can find the sink via config for status updates
@config.data_source_update_sink = @data_source_update_sink

# Update processor created in start(), because it needs the ready event
@update_processor = nil

# Diagnostic accumulator provided by client for streaming metrics
@diagnostic_accumulator = nil
end

#
# Starts the v1 update processor and returns immediately. The returned event
# will be set by the processor upon first successful initialization or upon permanent failure.
#
# @return [Concurrent::Event] Event that will be set when initialization is complete
#
def start
@update_processor = make_update_processor
@update_processor.start
end

#
# Halts the data system, stopping the update processor and shutting down the executor.
#
# @return [void]
#
def stop
@update_processor&.stop
@shared_executor.shutdown
end

#
# Returns the feature store wrapper used by this data system.
#
# @return [LaunchDarkly::Impl::DataStore::ClientWrapper]
#
def store
@store_wrapper
end

#
# Injects the flag value evaluation function used by the flag tracker to
# compute FlagValueChange events. The function signature should be
# (key, context) -> value.
#
# @param eval_fn [Proc] The evaluation function
# @return [void]
#
def set_flag_value_eval_fn(eval_fn)
@flag_tracker_impl = LaunchDarkly::Impl::FlagTracker.new(@flag_change_broadcaster, eval_fn)
end
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated

#
# Sets the diagnostic accumulator for streaming initialization metrics.
# This should be called before start() to ensure metrics are collected.
#
# @param diagnostic_accumulator [DiagnosticAccumulator] The diagnostic accumulator
# @return [void]
#
def set_diagnostic_accumulator(diagnostic_accumulator)
@diagnostic_accumulator = diagnostic_accumulator
end

#
# Returns the data source status provider.
#
# @return [LaunchDarkly::Interfaces::DataSource::StatusProvider]
#
def data_source_status_provider
@data_source_status_provider
end

#
# Returns the data store status provider.
#
# @return [LaunchDarkly::Interfaces::DataStore::StatusProvider]
#
def data_store_status_provider
@data_store_status_provider
end

#
# Returns the flag tracker.
#
# @return [LaunchDarkly::Interfaces::FlagTracker]
#
def flag_tracker
@flag_tracker_impl
end

#
# Indicates what form of data is currently available.
#
# This is calculated dynamically based on current system state.
#
# @return [Symbol] One of DataAvailability constants
#
def data_availability
return DataAvailability::DEFAULTS if @config.offline?

unless @config.use_ldd?
return DataAvailability::REFRESHED if @update_processor && @update_processor.initialized?
end
Comment thread
jsonbailey marked this conversation as resolved.
Outdated

return DataAvailability::CACHED if @store_wrapper.initialized?

DataAvailability::DEFAULTS
end

#
# Indicates the ideal form of data attainable given the current configuration.
#
# @return [Symbol] One of DataAvailability constants
#
def target_availability
return DataAvailability::DEFAULTS if @config.offline?
return DataAvailability::CACHED if @config.use_ldd?

DataAvailability::REFRESHED
end

#
# Creates the appropriate update processor based on the configuration.
#
# @return [Object] The update processor
#
private def make_update_processor
# Handle custom data source (factory or instance)
if @config.data_source
return @config.data_source unless @config.data_source.respond_to?(:call)

# Factory - call with appropriate arity
return @config.data_source.arity == 3 ?
@config.data_source.call(@sdk_key, @config, @diagnostic_accumulator) :
@config.data_source.call(@sdk_key, @config)
end

# Create default data source based on config
return LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new if @config.offline? || @config.use_ldd?

if @config.stream?
require 'ldclient-rb/stream'
return LaunchDarkly::StreamProcessor.new(@sdk_key, @config, @diagnostic_accumulator)
end

# Polling processor
require 'ldclient-rb/polling'
requestor = LaunchDarkly::Requestor.new(@sdk_key, @config)
LaunchDarkly::PollingProcessor.new(@config, requestor)
end
end
end
end
end

24 changes: 3 additions & 21 deletions lib/ldclient-rb/ldclient.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
require "ldclient-rb/impl/broadcaster"
require "ldclient-rb/impl/data_source"
require "ldclient-rb/impl/data_store"
require "ldclient-rb/impl/datasource/null_processor"
require "ldclient-rb/impl/diagnostic_events"
require "ldclient-rb/impl/evaluator"
require "ldclient-rb/impl/evaluation_with_hook_result"
Expand Down Expand Up @@ -132,7 +133,7 @@ def postfork(wait_for_sec = 5)

if @config.use_ldd?
@config.logger.info { "[LDClient] Started LaunchDarkly Client in LDD mode" }
@data_source = NullUpdateProcessor.new
@data_source = LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new
return # requestor and update processor are not used in this mode
end

Expand Down Expand Up @@ -710,7 +711,7 @@ def close

def create_default_data_source(sdk_key, config, diagnostic_accumulator)
if config.offline?
return NullUpdateProcessor.new
return LaunchDarkly::Impl::DataSource::NullUpdateProcessor.new
end
raise ArgumentError, "sdk_key must not be nil" if sdk_key.nil? # see LDClient constructor comment on sdk_key
if config.stream?
Expand Down Expand Up @@ -877,23 +878,4 @@ def evaluate_internal(key, context, default, with_reasons)
false
end
end

#
# Used internally when the client is offline.
# @private
#
class NullUpdateProcessor
def start
e = Concurrent::Event.new
e.set
e
end

def initialized?
true
end

def stop
end
end
end
Loading