@@ -6,7 +6,7 @@ import { AnalyticsService } from "../Services/AnalyticsService.ts";
66import { getTelemetryIdentifier } from "../Identify.ts" ;
77import {
88 JIRA_ACCESS_TOKEN_HEADER ,
9- makeProductSpanPayload ,
9+ makeProductSpanBatchPayload ,
1010 productAnalyticsUrlFromConfig ,
1111 shouldAttachJiraProof ,
1212} from "../OtlpProduct.ts" ;
@@ -24,16 +24,35 @@ interface BufferedAnalyticsEvent {
2424
2525const MAX_BUFFERED_EVENTS = 1_000 ;
2626const FLUSH_BATCH_SIZE = 20 ;
27+ const FLUSH_INTERVAL_MS = 1_000 ;
28+ const INITIAL_RETRY_DELAY_MS = 30_000 ;
29+ const MAX_RETRY_DELAY_MS = 5 * 60_000 ;
30+ const FAILURE_LOG_THROTTLE_MS = 60_000 ;
31+
32+ interface ProductAnalyticsExportState {
33+ readonly failures : number ;
34+ readonly nextFlushAt : number ;
35+ readonly lastWarningAt : number ;
36+ }
2737
2838class ProductAnalyticsExportError extends Data . TaggedError ( "ProductAnalyticsExportError" ) < {
2939 readonly cause : unknown ;
3040} > { }
3141
42+ export function productAnalyticsRetryDelayMs ( failures : number ) : number {
43+ return Math . min ( MAX_RETRY_DELAY_MS , INITIAL_RETRY_DELAY_MS * 2 ** Math . max ( 0 , failures - 1 ) ) ;
44+ }
45+
3246const makeAnalyticsService = Effect . gen ( function * ( ) {
3347 const config = yield * ServerConfig ;
3448 const jiraTokenService = yield * Effect . serviceOption ( JiraTokenService ) ;
3549 const identifier = yield * getTelemetryIdentifier ;
3650 const bufferRef = yield * Ref . make < ReadonlyArray < BufferedAnalyticsEvent > > ( [ ] ) ;
51+ const exportStateRef = yield * Ref . make < ProductAnalyticsExportState > ( {
52+ failures : 0 ,
53+ nextFlushAt : 0 ,
54+ lastWarningAt : 0 ,
55+ } ) ;
3756 const productAnalyticsUrl = productAnalyticsUrlFromConfig ( config ) ;
3857
3958 const makeBaseAttributes = ( ) => ( {
@@ -71,40 +90,41 @@ const makeAnalyticsService = Effect.gen(function* () {
7190 Effect . gen ( function * ( ) {
7291 if ( ! productAnalyticsUrl || events . length === 0 ) return ;
7392 const proofHeader = yield * getJiraProofHeader ;
74- yield * Effect . forEach (
75- events ,
76- ( event ) =>
77- Effect . tryPromise ( {
78- try : async ( ) => {
79- const response = await fetch ( productAnalyticsUrl , {
80- method : "POST" ,
81- headers : {
82- "Content-Type" : "application/json" ,
83- ...proofHeader ,
84- } ,
85- body : JSON . stringify (
86- makeProductSpanPayload ( {
87- event : event . event ,
88- capturedAt : event . capturedAt ,
89- attributes : {
90- ...makeBaseAttributes ( ) ,
91- ...event . properties ,
92- } ,
93- } ) ,
94- ) ,
95- } ) ;
96- if ( ! response . ok ) {
97- throw new Error ( `Product analytics export failed with status ${ response . status } ` ) ;
98- }
93+ yield * Effect . tryPromise ( {
94+ try : async ( ) => {
95+ const response = await fetch ( productAnalyticsUrl , {
96+ method : "POST" ,
97+ headers : {
98+ "Content-Type" : "application/json" ,
99+ ...proofHeader ,
99100 } ,
100- catch : ( cause ) => new ProductAnalyticsExportError ( { cause } ) ,
101- } ) ,
102- { discard : true , concurrency : 2 } ,
103- ) ;
101+ body : JSON . stringify (
102+ makeProductSpanBatchPayload (
103+ events . map ( ( event ) => ( {
104+ event : event . event ,
105+ capturedAt : event . capturedAt ,
106+ attributes : {
107+ ...makeBaseAttributes ( ) ,
108+ ...event . properties ,
109+ } ,
110+ } ) ) ,
111+ ) ,
112+ ) ,
113+ } ) ;
114+ if ( ! response . ok ) {
115+ throw new Error ( `Product analytics export failed with status ${ response . status } ` ) ;
116+ }
117+ } ,
118+ catch : ( cause ) => new ProductAnalyticsExportError ( { cause } ) ,
119+ } ) ;
104120 } ) ;
105121
106122 const flush = Effect . gen ( function * ( ) {
107123 while ( true ) {
124+ const now = Date . now ( ) ;
125+ const exportState = yield * Ref . get ( exportStateRef ) ;
126+ if ( now < exportState . nextFlushAt ) return ;
127+
108128 const batch = yield * Ref . modify ( bufferRef , ( current ) => {
109129 if ( current . length === 0 ) {
110130 return [ [ ] as ReadonlyArray < BufferedAnalyticsEvent > , current ] as const ;
@@ -115,14 +135,34 @@ const makeAnalyticsService = Effect.gen(function* () {
115135 } ) ;
116136 if ( batch . length === 0 ) return ;
117137 yield * sendBatch ( batch ) . pipe (
138+ Effect . tap ( ( ) =>
139+ Ref . set ( exportStateRef , {
140+ failures : 0 ,
141+ nextFlushAt : 0 ,
142+ lastWarningAt : 0 ,
143+ } ) ,
144+ ) ,
118145 Effect . catch ( ( cause ) =>
119- Effect . all (
120- [
121- Ref . update ( bufferRef , ( current ) => [ ...batch , ...current ] ) ,
122- Effect . logWarning ( "Failed to flush product analytics" , { cause } ) ,
123- ] ,
124- { discard : true } ,
125- ) ,
146+ Effect . gen ( function * ( ) {
147+ const previous = yield * Ref . get ( exportStateRef ) ;
148+ const failures = previous . failures + 1 ;
149+ const retryDelayMs = productAnalyticsRetryDelayMs ( failures ) ;
150+ const shouldLog =
151+ previous . failures === 0 || now - previous . lastWarningAt >= FAILURE_LOG_THROTTLE_MS ;
152+
153+ yield * Ref . update ( bufferRef , ( current ) => [ ...batch , ...current ] ) ;
154+ yield * Ref . set ( exportStateRef , {
155+ failures,
156+ nextFlushAt : now + retryDelayMs ,
157+ lastWarningAt : shouldLog ? now : previous . lastWarningAt ,
158+ } ) ;
159+ if ( shouldLog ) {
160+ yield * Effect . logWarning ( "Failed to flush product analytics" , {
161+ cause,
162+ retryDelayMs,
163+ } ) ;
164+ }
165+ } ) ,
126166 ) ,
127167 ) ;
128168 }
@@ -151,7 +191,7 @@ const makeAnalyticsService = Effect.gen(function* () {
151191 } ) ;
152192 } ) ;
153193
154- yield * Effect . forever ( Effect . sleep ( 1000 ) . pipe ( Effect . flatMap ( ( ) => flush ) ) , {
194+ yield * Effect . forever ( Effect . sleep ( FLUSH_INTERVAL_MS ) . pipe ( Effect . flatMap ( ( ) => flush ) ) , {
155195 disableYield : true ,
156196 } ) . pipe ( Effect . forkScoped ) ;
157197 yield * Effect . addFinalizer ( ( ) => flush ) ;
0 commit comments