@@ -115,12 +115,8 @@ function isUpToDateMessage<T extends Row<unknown>>(
115115// Check if a message contains txids in its headers
116116function hasTxids < T extends Row < unknown > > (
117117 message : Message < T >
118- ) : message is Message < T > & { headers : { txids ?: Array < number > } } {
119- return (
120- `headers` in message &&
121- `txids` in message . headers &&
122- Array . isArray ( message . headers . txids )
123- )
118+ ) : message is Message < T > & { headers : { txids ?: Array < string > } } {
119+ return `txids` in message . headers && Array . isArray ( message . headers . txids )
124120}
125121
126122/**
@@ -149,7 +145,7 @@ export function electricCollectionOptions<
149145 TSchema extends StandardSchemaV1 = never ,
150146 TFallback extends Row < unknown > = Row < unknown > ,
151147> ( config : ElectricCollectionConfig < TExplicit , TSchema , TFallback > ) {
152- const seenTxids = new Store < Set < string > > ( new Set ( [ ` ${ Math . random ( ) } ` ] ) )
148+ const seenTxids = new Store < Set < string > > ( new Set ( [ ] ) )
153149 const sync = createElectricSync < ResolveType < TExplicit , TSchema , TFallback > > (
154150 config . shapeOptions ,
155151 {
@@ -165,7 +161,7 @@ export function electricCollectionOptions<
165161 */
166162 const awaitTxId : AwaitTxIdFn = async (
167163 txId : string ,
168- timeout = 30000
164+ timeout : number = 30000
169165 ) : Promise < boolean > => {
170166 if ( typeof txId !== `string` ) {
171167 throw new TypeError (
@@ -246,18 +242,14 @@ export function electricCollectionOptions<
246242 ResolveType < TExplicit , TSchema , TFallback >
247243 >
248244 ) => {
249- // Runtime check (that doesn't follow type)
250- // eslint-disable-next-line
251- const handlerResult = ( await config . onDelete ! ( params ) ) ?? { }
252- const txid = ( handlerResult as { txid ?: string } ) . txid
253-
254- if ( ! txid ) {
245+ const handlerResult = await config . onDelete ! ( params )
246+ if ( ! handlerResult . txid ) {
255247 throw new Error (
256248 `Electric collection onDelete handler must return a txid`
257249 )
258250 }
259251
260- await awaitTxId ( txid )
252+ await awaitTxId ( handlerResult . txid )
261253 return handlerResult
262254 }
263255 : undefined
@@ -333,43 +325,37 @@ function createElectricSync<T extends Row<unknown>>(
333325 signal : abortController . signal ,
334326 } )
335327 let transactionStarted = false
336- let newTxids = new Set < string > ( )
328+ const newTxids = new Set < string > ( )
337329
338330 unsubscribeStream = stream . subscribe ( ( messages : Array < Message < T > > ) => {
339331 let hasUpToDate = false
340332
341333 for ( const message of messages ) {
342334 // Check for txids in the message and add them to our store
343- if ( hasTxids ( message ) && message . headers . txids ) {
344- message . headers . txids . forEach ( ( txid ) => newTxids . add ( String ( txid ) ) )
335+ if ( hasTxids ( message ) ) {
336+ message . headers . txids ? .forEach ( ( txid ) => newTxids . add ( txid ) )
345337 }
346338
347- // Check if the message contains schema information
348- if ( isChangeMessage ( message ) && message . headers . schema ) {
349- // Store the schema for future use if it's a valid string
350- if ( typeof message . headers . schema === `string` ) {
351- const schema : string = message . headers . schema
339+ if ( isChangeMessage ( message ) ) {
340+ // Check if the message contains schema information
341+ const schema = message . headers . schema
342+ if ( schema && typeof schema === `string` ) {
343+ // Store the schema for future use if it's a valid string
352344 relationSchema . setState ( ( ) => schema )
353345 }
354- }
355346
356- if ( isChangeMessage ( message ) ) {
357347 if ( ! transactionStarted ) {
358348 begin ( )
359349 transactionStarted = true
360350 }
361351
362- const value = message . value as unknown as T
363-
364- // Include the primary key and relation info in the metadata
365- const enhancedMetadata = {
366- ...message . headers ,
367- }
368-
369352 write ( {
370353 type : message . headers . operation ,
371- value,
372- metadata : enhancedMetadata ,
354+ value : message . value ,
355+ // Include the primary key and relation info in the metadata
356+ metadata : {
357+ ...message . headers ,
358+ } ,
373359 } )
374360 } else if ( isUpToDateMessage ( message ) ) {
375361 hasUpToDate = true
@@ -390,10 +376,9 @@ function createElectricSync<T extends Row<unknown>>(
390376
391377 // Always commit txids when we receive up-to-date, regardless of transaction state
392378 seenTxids . setState ( ( currentTxids ) => {
393- const clonedSeen = new Set ( currentTxids )
394- newTxids . forEach ( ( txid ) => clonedSeen . add ( String ( txid ) ) )
395-
396- newTxids = new Set ( )
379+ const clonedSeen = new Set < string > ( currentTxids )
380+ newTxids . forEach ( ( txid ) => clonedSeen . add ( txid ) )
381+ newTxids . clear ( )
397382 return clonedSeen
398383 } )
399384 }
0 commit comments