Skip to content

Commit bd8471b

Browse files
committed
feat: Add optional support for per-context summary events.
1 parent bfde41d commit bd8471b

6 files changed

Lines changed: 238 additions & 36 deletions

File tree

lib/shared/internal/src/main/java/com/launchdarkly/sdk/internal/events/DefaultEventProcessor.java

Lines changed: 67 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ public Thread newThread(Runnable r) {
363363
// all the workers are busy.
364364
final BlockingQueue<FlushPayload> payloadQueue = new ArrayBlockingQueue<>(1);
365365

366-
final EventBuffer outbox = new EventBuffer(eventsConfig.capacity, logger);
366+
final EventBuffer outbox = new EventBuffer(eventsConfig.capacity, eventsConfig.perContextSummarization, logger);
367367
this.contextDeduplicator = eventsConfig.contextDeduplicator;
368368

369369
Thread mainThread = threadFactory.newThread(new Thread() {
@@ -608,7 +608,13 @@ private void triggerFlush(EventBuffer outbox, BlockingQueue<FlushPayload> payloa
608608
}
609609
FlushPayload payload = outbox.getPayload();
610610
if (diagnosticStore != null) {
611-
int eventCount = payload.events.length + (payload.summary.isEmpty() ? 0 : 1);
611+
int summaryCount = 0;
612+
for (EventSummary summary : payload.summaries) {
613+
if (!summary.isEmpty()) {
614+
summaryCount++;
615+
}
616+
}
617+
int eventCount = payload.events.length + summaryCount;
612618
diagnosticStore.recordEventsInBatch(eventCount);
613619
}
614620
busyFlushWorkersCount.incrementAndGet();
@@ -618,7 +624,10 @@ private void triggerFlush(EventBuffer outbox, BlockingQueue<FlushPayload> payloa
618624
} else {
619625
logger.debug("Skipped flushing because all workers are busy");
620626
// All the workers are busy so we can't flush now; keep the events in our state
621-
outbox.summarizer.restoreTo(payload.summary);
627+
// Only restore if using single summarizer (not per-context)
628+
if (outbox.summarizer != null && !payload.summaries.isEmpty()) {
629+
outbox.summarizer.restoreTo(payload.summaries.get(0));
630+
}
622631
synchronized(busyFlushWorkersCount) {
623632
busyFlushWorkersCount.decrementAndGet();
624633
busyFlushWorkersCount.notify();
@@ -661,15 +670,25 @@ public void run() {
661670

662671
private static final class EventBuffer {
663672
final List<Event> events = new ArrayList<>();
664-
final EventSummarizer summarizer = new EventSummarizer();
673+
final EventSummarizer summarizer; // used when perContextSummarization is false
674+
final MultiContextEventSummarizer multiContextSummarizer; // used when perContextSummarization is true
675+
private final boolean perContextSummarization;
665676
private final int capacity;
666677
private final LDLogger logger;
667678
private boolean capacityExceeded = false;
668679
private long droppedEventCount = 0;
669680

670-
EventBuffer(int capacity, LDLogger logger) {
681+
EventBuffer(int capacity, boolean perContextSummarization, LDLogger logger) {
671682
this.capacity = capacity;
683+
this.perContextSummarization = perContextSummarization;
672684
this.logger = logger;
685+
if (perContextSummarization) {
686+
this.summarizer = null;
687+
this.multiContextSummarizer = new MultiContextEventSummarizer();
688+
} else {
689+
this.summarizer = new EventSummarizer();
690+
this.multiContextSummarizer = null;
691+
}
673692
}
674693

675694
void add(Event e) {
@@ -686,19 +705,35 @@ void add(Event e) {
686705
}
687706

688707
void addToSummary(Event.FeatureRequest e) {
689-
summarizer.summarizeEvent(
690-
e.getCreationDate(),
691-
e.getKey(),
692-
e.getVersion(),
693-
e.getVariation(),
694-
e.getValue(),
695-
e.getDefaultVal(),
696-
e.getContext()
697-
);
708+
if (perContextSummarization) {
709+
multiContextSummarizer.summarizeEvent(
710+
e.getCreationDate(),
711+
e.getKey(),
712+
e.getVersion(),
713+
e.getVariation(),
714+
e.getValue(),
715+
e.getDefaultVal(),
716+
e.getContext()
717+
);
718+
} else {
719+
summarizer.summarizeEvent(
720+
e.getCreationDate(),
721+
e.getKey(),
722+
e.getVersion(),
723+
e.getVariation(),
724+
e.getValue(),
725+
e.getDefaultVal(),
726+
e.getContext()
727+
);
728+
}
698729
}
699730

700731
boolean isEmpty() {
701-
return events.isEmpty() && summarizer.isEmpty();
732+
if (perContextSummarization) {
733+
return events.isEmpty() && multiContextSummarizer.isEmpty();
734+
} else {
735+
return events.isEmpty() && summarizer.isEmpty();
736+
}
702737
}
703738

704739
long getAndClearDroppedCount() {
@@ -709,23 +744,33 @@ long getAndClearDroppedCount() {
709744

710745
FlushPayload getPayload() {
711746
Event[] eventsOut = events.toArray(new Event[events.size()]);
712-
EventSummarizer.EventSummary summary = summarizer.getSummaryAndReset();
713-
return new FlushPayload(eventsOut, summary);
747+
List<EventSummarizer.EventSummary> summaries;
748+
if (perContextSummarization) {
749+
summaries = multiContextSummarizer.getSummariesAndReset();
750+
} else {
751+
EventSummarizer.EventSummary summary = summarizer.getSummaryAndReset();
752+
summaries = java.util.Collections.singletonList(summary);
753+
}
754+
return new FlushPayload(eventsOut, summaries);
714755
}
715756

716757
void clear() {
717758
events.clear();
718-
summarizer.clear();
759+
if (perContextSummarization) {
760+
multiContextSummarizer.clear();
761+
} else {
762+
summarizer.clear();
763+
}
719764
}
720765
}
721766

722767
private static final class FlushPayload {
723768
final Event[] events;
724-
final EventSummary summary;
769+
final List<EventSummary> summaries;
725770

726-
FlushPayload(Event[] events, EventSummary summary) {
771+
FlushPayload(Event[] events, List<EventSummary> summaries) {
727772
this.events = events;
728-
this.summary = summary;
773+
this.summaries = summaries;
729774
}
730775
}
731776

@@ -774,7 +819,7 @@ public void run() {
774819
try {
775820
ByteArrayOutputStream buffer = new ByteArrayOutputStream(INITIAL_OUTPUT_BUFFER_SIZE);
776821
Writer writer = new BufferedWriter(new OutputStreamWriter(buffer, Charset.forName("UTF-8")), INITIAL_OUTPUT_BUFFER_SIZE);
777-
int outputEventCount = formatter.writeOutputEvents(payload.events, payload.summary, writer);
822+
int outputEventCount = formatter.writeOutputEvents(payload.events, payload.summaries, writer);
778823
writer.flush();
779824
EventSender.Result result = eventsConfig.eventSender.sendAnalyticsEvents(
780825
buffer.toByteArray(),

lib/shared/internal/src/main/java/com/launchdarkly/sdk/internal/events/EventOutputFormatter.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import java.io.IOException;
1313
import java.io.Writer;
14+
import java.util.List;
1415
import java.util.Map;
1516

1617
import static com.launchdarkly.sdk.internal.GsonHelpers.gsonInstance;
@@ -33,7 +34,7 @@ final class EventOutputFormatter {
3334
config.privateAttributes.toArray(new AttributeRef[config.privateAttributes.size()]));
3435
}
3536

36-
int writeOutputEvents(Event[] events, EventSummarizer.EventSummary summary, Writer writer) throws IOException {
37+
int writeOutputEvents(Event[] events, List<EventSummarizer.EventSummary> summaries, Writer writer) throws IOException {
3738
int count = 0;
3839
JsonWriter jsonWriter = new JsonWriter(writer);
3940
jsonWriter.beginArray();
@@ -42,9 +43,11 @@ int writeOutputEvents(Event[] events, EventSummarizer.EventSummary summary, Writ
4243
count++;
4344
}
4445
}
45-
if (!summary.isEmpty()) {
46-
writeSummaryEvent(summary, jsonWriter);
47-
count++;
46+
for (EventSummarizer.EventSummary summary : summaries) {
47+
if (!summary.isEmpty()) {
48+
writeSummaryEvent(summary, jsonWriter);
49+
count++;
50+
}
4851
}
4952
jsonWriter.endArray();
5053
jsonWriter.flush();
@@ -234,6 +237,11 @@ private void writeSummaryEvent(EventSummarizer.EventSummary summary, JsonWriter
234237
jw.name("endDate");
235238
jw.value(summary.endDate);
236239

240+
// Include context if present (per-context summarization)
241+
if (summary.context != null) {
242+
writeContext(summary.context, jw, true); // redact anonymous attributes
243+
}
244+
237245
jw.name("features");
238246
jw.beginObject();
239247

lib/shared/internal/src/main/java/com/launchdarkly/sdk/internal/events/EventSummarizer.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,15 @@
1616
*/
1717
final class EventSummarizer {
1818
private EventSummary eventsState;
19-
19+
private final LDContext context; // nullable - only set for per-context summarization
20+
2021
EventSummarizer() {
21-
this.eventsState = new EventSummary();
22+
this(null);
23+
}
24+
25+
EventSummarizer(LDContext context) {
26+
this.context = context;
27+
this.eventsState = new EventSummary(context);
2228
}
2329

2430
/**
@@ -76,22 +82,29 @@ boolean isEmpty() {
7682
}
7783

7884
void clear() {
79-
eventsState = new EventSummary();
85+
eventsState = new EventSummary(context);
8086
}
8187

8288
static final class EventSummary {
8389
final Map<String, FlagInfo> counters;
8490
long startDate;
8591
long endDate;
86-
92+
final LDContext context; // nullable for backward compatibility
93+
8794
EventSummary() {
88-
counters = new HashMap<>();
95+
this((LDContext) null);
96+
}
97+
98+
EventSummary(LDContext context) {
99+
this.counters = new HashMap<>();
100+
this.context = context;
89101
}
90102

91103
EventSummary(EventSummary from) {
92104
counters = new HashMap<>(from.counters);
93105
startDate = from.startDate;
94106
endDate = from.endDate;
107+
context = from.context;
95108
}
96109

97110
boolean isEmpty() {
@@ -142,7 +155,8 @@ void noteTimestamp(long time) {
142155
public boolean equals(Object other) {
143156
if (other instanceof EventSummary) {
144157
EventSummary o = (EventSummary)other;
145-
return o.counters.equals(counters) && startDate == o.startDate && endDate == o.endDate;
158+
return o.counters.equals(counters) && startDate == o.startDate && endDate == o.endDate &&
159+
Objects.equals(context, o.context);
146160
}
147161
return false;
148162
}

lib/shared/internal/src/main/java/com/launchdarkly/sdk/internal/events/EventsConfiguration.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,11 @@ public final class EventsConfiguration {
3030
final boolean initiallyInBackground;
3131
final boolean initiallyOffline;
3232
final List<AttributeRef> privateAttributes;
33+
final boolean perContextSummarization;
3334

3435
/**
3536
* Creates an instance.
36-
*
37+
*
3738
* @param allAttributesPrivate true if all attributes are private
3839
* @param capacity event buffer capacity (if zero or negative, a value of 1 is used to prevent errors)
3940
* @param contextDeduplicator optional EventContextDeduplicator; null for client-side SDK
@@ -63,6 +64,45 @@ public EventsConfiguration(
6364
boolean initiallyOffline,
6465
Collection<AttributeRef> privateAttributes
6566
) {
67+
this(allAttributesPrivate, capacity, contextDeduplicator, diagnosticRecordingIntervalMillis,
68+
diagnosticStore, eventSender, eventSendingThreadPoolSize, eventsUri, flushIntervalMillis,
69+
initiallyInBackground, initiallyOffline, privateAttributes, false);
70+
}
71+
72+
/**
73+
* Creates an instance.
74+
*
75+
* @param allAttributesPrivate true if all attributes are private
76+
* @param capacity event buffer capacity (if zero or negative, a value of 1 is used to prevent errors)
77+
* @param contextDeduplicator optional EventContextDeduplicator; null for client-side SDK
78+
* @param diagnosticRecordingIntervalMillis diagnostic recording interval
79+
* @param diagnosticStore optional DiagnosticStore; null if diagnostics are disabled
80+
* @param eventSender event delivery component; must not be null
81+
* @param eventSendingThreadPoolSize number of worker threads for event delivery; zero to use the default
82+
* @param eventsUri events base URI
83+
* @param flushIntervalMillis event flush interval
84+
* @param initiallyInBackground true if we should start out in background mode (see
85+
* {@link DefaultEventProcessor#setInBackground(boolean)})
86+
* @param initiallyOffline true if we should start out in offline mode (see
87+
* {@link DefaultEventProcessor#setOffline(boolean)})
88+
* @param privateAttributes list of private attribute references; may be null
89+
* @param perContextSummarization true to generate separate summary events per context
90+
*/
91+
public EventsConfiguration(
92+
boolean allAttributesPrivate,
93+
int capacity,
94+
EventContextDeduplicator contextDeduplicator,
95+
long diagnosticRecordingIntervalMillis,
96+
DiagnosticStore diagnosticStore,
97+
EventSender eventSender,
98+
int eventSendingThreadPoolSize,
99+
URI eventsUri,
100+
long flushIntervalMillis,
101+
boolean initiallyInBackground,
102+
boolean initiallyOffline,
103+
Collection<AttributeRef> privateAttributes,
104+
boolean perContextSummarization
105+
) {
66106
super();
67107
this.allAttributesPrivate = allAttributesPrivate;
68108
this.capacity = capacity >= 0 ? capacity : 1;
@@ -77,5 +117,6 @@ public EventsConfiguration(
77117
this.initiallyInBackground = initiallyInBackground;
78118
this.initiallyOffline = initiallyOffline;
79119
this.privateAttributes = privateAttributes == null ? Collections.emptyList() : new ArrayList<>(privateAttributes);
120+
this.perContextSummarization = perContextSummarization;
80121
}
81122
}

0 commit comments

Comments
 (0)