Skip to content

Commit 4610e6f

Browse files
committed
chore: add DataSourceUpdatesSinkV2 support to DataSourceUpdatesImpl
1 parent 89fab04 commit 4610e6f

9 files changed

Lines changed: 1129 additions & 4 deletions

File tree

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataModelDependencies.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import com.google.common.collect.Iterables;
77
import com.launchdarkly.sdk.LDValue;
88
import com.launchdarkly.sdk.server.DataModel.Operator;
9+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSet;
10+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSetType;
911
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind;
1012
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet;
1113
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor;
@@ -132,6 +134,23 @@ public static FullDataSet<ItemDescriptor> sortAllCollections(FullDataSet<ItemDes
132134
return new FullDataSet<>(builder.build().entrySet());
133135
}
134136

137+
/**
138+
* Sort the data in the changeset in dependency order. If there are any duplicates, then the highest version
139+
* of the duplicate item will be retained.
140+
*
141+
* @param inSet the changeset to sort
142+
* @return a sorted copy of the changeset
143+
*/
144+
public static ChangeSet<ItemDescriptor> sortChangeset(ChangeSet<ItemDescriptor> inSet) {
145+
ImmutableSortedMap.Builder<DataKind, KeyedItems<ItemDescriptor>> builder =
146+
ImmutableSortedMap.orderedBy(dataKindPriorityOrder);
147+
for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> entry: inSet.getData()) {
148+
DataKind kind = entry.getKey();
149+
builder.put(kind, sortCollection(kind, entry.getValue()));
150+
}
151+
return new ChangeSet<>(inSet.getType(), inSet.getSelector(), builder.build().entrySet(), inSet.getEnvironmentId());
152+
}
153+
135154
private static KeyedItems<ItemDescriptor> sortCollection(DataKind kind, KeyedItems<ItemDescriptor> input) {
136155
if (!isDependencyOrdered(kind) || isEmpty(input.getItems())) {
137156
return input;

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/DataSourceUpdatesImpl.java

Lines changed: 178 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,14 @@
1212
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.Status;
1313
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.StatusListener;
1414
import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSink;
15+
import com.launchdarkly.sdk.server.subsystems.DataSourceUpdateSinkV2;
1516
import com.launchdarkly.sdk.server.subsystems.DataStore;
17+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSet;
1618
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.DataKind;
1719
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.FullDataSet;
1820
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor;
1921
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.KeyedItems;
22+
import com.launchdarkly.sdk.server.subsystems.TransactionalDataStore;
2023
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider;
2124
import com.launchdarkly.sdk.server.interfaces.FlagChangeEvent;
2225
import com.launchdarkly.sdk.server.interfaces.FlagChangeListener;
@@ -48,7 +51,7 @@
4851
*
4952
* @since 4.11.0
5053
*/
51-
final class DataSourceUpdatesImpl implements DataSourceUpdateSink {
54+
final class DataSourceUpdatesImpl implements DataSourceUpdateSink, DataSourceUpdateSinkV2 {
5255
private final DataStore store;
5356
private final EventBroadcasterImpl<FlagChangeListener, FlagChangeEvent> flagChangeEventNotifier;
5457
private final EventBroadcasterImpl<StatusListener, Status> dataSourceStatusNotifier;
@@ -365,4 +368,178 @@ private void onTimeout() {
365368
private static String describeErrorCount(Map.Entry<ErrorInfo, Integer> entry) {
366369
return entry.getKey() + " (" + entry.getValue() + (entry.getValue() == 1 ? " time" : " times") + ")";
367370
}
371+
372+
// ===== ITransactionalDataSourceUpdates methods =====
373+
374+
@Override
375+
public boolean apply(ChangeSet<ItemDescriptor> changeSet) {
376+
if (store instanceof TransactionalDataStore) {
377+
return applyToTransactionalStore((TransactionalDataStore) store, changeSet);
378+
}
379+
380+
// Legacy update path for non-transactional stores
381+
return applyToLegacyStore(changeSet);
382+
}
383+
384+
private boolean applyToTransactionalStore(TransactionalDataStore transactionalDataStore,
385+
ChangeSet<ItemDescriptor> changeSet) {
386+
Map<DataKind, Map<String, ItemDescriptor>> oldData;
387+
// Getting the old values requires accessing the store, which can fail.
388+
// If there is a failure to read the store, then we stop treating it as a failure.
389+
try {
390+
oldData = getOldDataIfFlagChangeListeners();
391+
} catch (RuntimeException e) {
392+
reportStoreFailure(e);
393+
return false;
394+
}
395+
396+
ChangeSet<ItemDescriptor> sortedChangeSet = DataModelDependencies.sortChangeset(changeSet);
397+
398+
try {
399+
transactionalDataStore.apply(sortedChangeSet);
400+
lastStoreUpdateFailed = false;
401+
} catch (RuntimeException e) {
402+
reportStoreFailure(e);
403+
return false;
404+
}
405+
406+
// Calling Apply implies that the data source is now in a valid state.
407+
updateStatus(State.VALID, null);
408+
409+
Set<KindAndKey> changes = updateDependencyTrackerForChangesetAndDetermineChanges(oldData, sortedChangeSet);
410+
411+
// Now, if we previously queried the old data because someone is listening for flag change events, compare
412+
// the versions of all items and generate events for those (and any other items that depend on them)
413+
if (changes != null) {
414+
sendChangeEvents(changes);
415+
}
416+
417+
return true;
418+
}
419+
420+
private boolean applyToLegacyStore(ChangeSet<ItemDescriptor> sortedChangeSet) {
421+
switch (sortedChangeSet.getType()) {
422+
case Full:
423+
return applyFullChangeSetToLegacyStore(sortedChangeSet);
424+
case Partial:
425+
return applyPartialChangeSetToLegacyStore(sortedChangeSet);
426+
case None:
427+
default:
428+
return true;
429+
}
430+
}
431+
432+
private boolean applyFullChangeSetToLegacyStore(ChangeSet<ItemDescriptor> unsortedChangeset) {
433+
// Convert ChangeSet to FullDataSet for legacy init path
434+
return init(new FullDataSet<>(unsortedChangeset.getData()));
435+
}
436+
437+
private boolean applyPartialChangeSetToLegacyStore(ChangeSet<ItemDescriptor> changeSet) {
438+
// Sorting isn't strictly required here, as upsert behavior didn't traditionally have it,
439+
// but it also doesn't hurt, and there could be cases where it results in slightly
440+
// greater store consistency for persistent stores.
441+
ChangeSet<ItemDescriptor> sortedChangeset = DataModelDependencies.sortChangeset(changeSet);
442+
443+
for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> kindItemsPair: sortedChangeset.getData()) {
444+
for (Map.Entry<String, ItemDescriptor> item: kindItemsPair.getValue().getItems()) {
445+
boolean applySuccess = upsert(kindItemsPair.getKey(), item.getKey(), item.getValue());
446+
if (!applySuccess) {
447+
return false;
448+
}
449+
}
450+
}
451+
// The upsert will update the store status in the case of a store failure.
452+
// The application of the upserts does not set the store initialized.
453+
454+
// Considering the store will be the same for the duration of the application
455+
// lifecycle we will not be applying a partial update to a store that didn't
456+
// already get a full update. The non-transactional store will also not support a selector.
457+
458+
return true;
459+
}
460+
461+
private Map<DataKind, Map<String, ItemDescriptor>> getOldDataIfFlagChangeListeners() {
462+
if (hasFlagChangeEventListeners()) {
463+
// Query the existing data if any, so that after the update we can send events for
464+
// whatever was changed
465+
Map<DataKind, Map<String, ItemDescriptor>> oldData = new HashMap<>();
466+
for (DataKind kind: ALL_DATA_KINDS) {
467+
KeyedItems<ItemDescriptor> items = store.getAll(kind);
468+
oldData.put(kind, ImmutableMap.copyOf(items.getItems()));
469+
}
470+
return oldData;
471+
} else {
472+
return null;
473+
}
474+
}
475+
476+
private Map<DataKind, Map<String, ItemDescriptor>> changeSetToMap(ChangeSet<ItemDescriptor> changeSet) {
477+
Map<DataKind, Map<String, ItemDescriptor>> ret = new HashMap<>();
478+
for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> e: changeSet.getData()) {
479+
ret.put(e.getKey(), ImmutableMap.copyOf(e.getValue().getItems()));
480+
}
481+
return ret;
482+
}
483+
484+
private Set<KindAndKey> updateDependencyTrackerForChangesetAndDetermineChanges(
485+
Map<DataKind, Map<String, ItemDescriptor>> oldDataMap,
486+
ChangeSet<ItemDescriptor> changeSet) {
487+
switch (changeSet.getType()) {
488+
case Full:
489+
return handleFullChangeset(oldDataMap, changeSet);
490+
case Partial:
491+
return handlePartialChangeset(oldDataMap, changeSet);
492+
case None:
493+
return null;
494+
default:
495+
return null;
496+
}
497+
}
498+
499+
private Set<KindAndKey> handleFullChangeset(
500+
Map<DataKind, Map<String, ItemDescriptor>> oldDataMap,
501+
ChangeSet<ItemDescriptor> changeSet) {
502+
dependencyTracker.reset();
503+
for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> kindEntry: changeSet.getData()) {
504+
DataKind kind = kindEntry.getKey();
505+
for (Map.Entry<String, ItemDescriptor> itemEntry: kindEntry.getValue().getItems()) {
506+
String key = itemEntry.getKey();
507+
dependencyTracker.updateDependenciesFrom(kind, key, itemEntry.getValue());
508+
}
509+
}
510+
511+
if (oldDataMap == null) {
512+
return null;
513+
}
514+
515+
Map<DataKind, Map<String, ItemDescriptor>> newDataMap = changeSetToMap(changeSet);
516+
return computeChangedItemsForFullDataSet(oldDataMap, newDataMap);
517+
}
518+
519+
private Set<KindAndKey> handlePartialChangeset(
520+
Map<DataKind, Map<String, ItemDescriptor>> oldDataMap,
521+
ChangeSet<ItemDescriptor> changeSet) {
522+
if (oldDataMap == null) {
523+
// Update dependencies but don't track changes when no listeners
524+
for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> kindEntry: changeSet.getData()) {
525+
DataKind kind = kindEntry.getKey();
526+
for (Map.Entry<String, ItemDescriptor> itemEntry: kindEntry.getValue().getItems()) {
527+
dependencyTracker.updateDependenciesFrom(kind, itemEntry.getKey(), itemEntry.getValue());
528+
}
529+
}
530+
return null;
531+
}
532+
533+
Set<KindAndKey> affectedItems = new HashSet<>();
534+
for (Map.Entry<DataKind, KeyedItems<ItemDescriptor>> kindEntry: changeSet.getData()) {
535+
DataKind kind = kindEntry.getKey();
536+
for (Map.Entry<String, ItemDescriptor> itemEntry: kindEntry.getValue().getItems()) {
537+
String key = itemEntry.getKey();
538+
dependencyTracker.updateDependenciesFrom(kind, key, itemEntry.getValue());
539+
dependencyTracker.addAffectedItems(affectedItems, new KindAndKey(kind, key));
540+
}
541+
}
542+
543+
return affectedItems;
544+
}
368545
}

lib/sdk/server/src/main/java/com/launchdarkly/sdk/server/Version.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,7 @@ abstract class Version {
44
private Version() {}
55

66
// This constant is updated automatically by our Gradle script during a release, if the project version has changed
7+
// x-release-please-start-version
78
static final String SDK_VERSION = "7.10.2";
9+
// x-release-please-end
810
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.launchdarkly.sdk.server.subsystems;
2+
3+
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider;
4+
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.ErrorInfo;
5+
import com.launchdarkly.sdk.server.interfaces.DataSourceStatusProvider.State;
6+
import com.launchdarkly.sdk.server.interfaces.DataStoreStatusProvider;
7+
8+
/**
9+
* Interfaces required by data source updates implementations in FDv2.
10+
* <p>
11+
* This interface extends {@link TransactionalDataSourceUpdateSink} to add status tracking
12+
* and status update capabilities required for FDv2 data sources.
13+
* <p>
14+
* This interface is not stable, and not subject to any backwards compatibility guarantees or semantic versioning.
15+
* It is in early access. If you want access to this feature please join the EAP. https://launchdarkly.com/docs/sdk/features/data-saving-mode
16+
*
17+
* @since 5.0.0
18+
* @see TransactionalDataSourceUpdateSink
19+
* @see DataSource
20+
*/
21+
public interface DataSourceUpdateSinkV2 extends TransactionalDataSourceUpdateSink {
22+
/**
23+
* An object that provides status tracking for the data store, if applicable.
24+
* <p>
25+
* This may be useful if the data source needs to be aware of storage problems that might require it
26+
* to take some special action: for instance, if a database outage may have caused some data to be
27+
* lost and therefore the data should be re-requested from LaunchDarkly.
28+
*
29+
* @return a {@link DataStoreStatusProvider}
30+
*/
31+
DataStoreStatusProvider getDataStoreStatusProvider();
32+
33+
/**
34+
* Informs the SDK of a change in the data source's status.
35+
* <p>
36+
* Data source implementations should use this method if they have any concept of being in a valid
37+
* state, a temporarily disconnected state, or a permanently stopped state.
38+
* <p>
39+
* If {@code newState} is different from the previous state, and/or {@code newError} is non-null, the
40+
* SDK will start returning the new status (adding a timestamp for the change) from
41+
* {@link DataSourceStatusProvider#getStatus()}, and will trigger status change events to any
42+
* registered listeners.
43+
* <p>
44+
* A special case is that if {@code newState} is {@link State#INTERRUPTED},
45+
* but the previous state was {@link State#INITIALIZING}, the state will
46+
* remain at {@link State#INITIALIZING} because {@link State#INTERRUPTED}
47+
* is only meaningful after a successful startup.
48+
*
49+
* @param newState the data source state
50+
* @param newError information about a new error, if any
51+
* @see DataSourceStatusProvider
52+
*/
53+
void updateStatus(State newState, ErrorInfo newError);
54+
}
55+
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.launchdarkly.sdk.server.subsystems;
2+
3+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ChangeSet;
4+
import com.launchdarkly.sdk.server.subsystems.DataStoreTypes.ItemDescriptor;
5+
6+
/**
7+
* Interface that an implementation of {@link DataSource} will use to push data into the SDK transactionally.
8+
* <p>
9+
* The data source interacts with this object, rather than manipulating the data store directly, so
10+
* that the SDK can perform any other necessary operations that must happen when data is updated. This
11+
* object also provides a mechanism to report status changes.
12+
* <p>
13+
* Component factories for {@link DataSource} implementations will receive an implementation of this
14+
* interface in the {@link ClientContext#getDataSourceUpdateSink()} property of {@link ClientContext}.
15+
* <p>
16+
* This interface is not stable, and not subject to any backwards compatibility guarantees or semantic versioning.
17+
* It is in early access. If you want access to this feature please join the EAP. https://launchdarkly.com/docs/sdk/features/data-saving-mode
18+
*
19+
* @since 5.0.0
20+
* @see DataSource
21+
* @see ClientContext
22+
*/
23+
public interface TransactionalDataSourceUpdateSink {
24+
/**
25+
* Apply the given change set to the store. This should be done atomically if possible.
26+
*
27+
* @param changeSet the changeset to apply
28+
* @return true if the update succeeded, false if it failed
29+
*/
30+
boolean apply(ChangeSet<ItemDescriptor> changeSet);
31+
}
32+

0 commit comments

Comments
 (0)