@@ -6,6 +6,7 @@ import { RedisAdapter } from '../src/drivers/redis_adapter.js'
66import { KnexAdapter } from '../src/drivers/knex_adapter.js'
77import { QueueSchemaService } from '../src/services/queue_schema.js'
88import { registerDriverTestSuite } from './_utils/register_driver_test_suite.js'
9+ import { withRedisWriteSpy } from './_utils/with_redis_write_spy.js'
910
1011const KEY_PREFIX = 'boringnode::queue::test::'
1112
@@ -65,26 +66,17 @@ test.group('Adapter | Redis', (group) => {
6566 } )
6667 }
6768
68- const stream = ( connection as any ) . stream as { write : ( ...args : any [ ] ) => any }
69- const originalWrite = stream . write . bind ( stream )
70- let writes = 0
71-
72- stream . write = ( ( ...args : any [ ] ) => {
73- writes ++
74- return originalWrite ( ...args )
75- } ) as typeof stream . write
76-
77- try {
78- const schedules = await adapter . listSchedules ( )
79- assert . lengthOf ( schedules , 50 )
80- assert . isAtMost (
81- writes ,
82- 4 ,
83- `Expected bounded write count with pipelining, got ${ writes } writes for 50 schedules`
84- )
85- } finally {
86- stream . write = originalWrite
87- }
69+ const { result : schedules , writes } = await withRedisWriteSpy ( {
70+ connection,
71+ run : ( ) => adapter . listSchedules ( ) ,
72+ } )
73+
74+ assert . lengthOf ( schedules , 50 )
75+ assert . isAtMost (
76+ writes ,
77+ 4 ,
78+ `Expected bounded write count with pipelining, got ${ writes } writes for 50 schedules`
79+ )
8880 } )
8981
9082 test ( 'deleteSchedule should not leave ghost index under write-failure chaos' , async ( { assert } ) => {
@@ -99,23 +91,15 @@ test.group('Adapter | Redis', (group) => {
9991 timezone : 'UTC' ,
10092 } )
10193
102- const stream = ( connection as any ) . stream as { write : ( ...args : any [ ] ) => any }
103- const originalWrite = stream . write . bind ( stream )
104- let writes = 0
105-
106- stream . write = ( ( ...args : any [ ] ) => {
107- writes ++
108- if ( writes === 2 ) {
109- throw new Error ( 'chaos: second network write blocked' )
110- }
111- return originalWrite ( ...args )
112- } ) as typeof stream . write
113-
114- try {
115- await adapter . deleteSchedule ( id )
116- } finally {
117- stream . write = originalWrite
118- }
94+ const { writes } = await withRedisWriteSpy ( {
95+ connection,
96+ run : ( ) => adapter . deleteSchedule ( id ) ,
97+ onWrite : ( writeCount ) => {
98+ if ( writeCount === 2 ) {
99+ throw new Error ( 'chaos: second network write blocked' )
100+ }
101+ } ,
102+ } )
119103
120104 const scheduleExists = await connection . exists ( `schedules::${ id } ` )
121105 const indexContains = await connection . sismember ( 'schedules::index' , id )
0 commit comments