Skip to content

Commit 82e6ce6

Browse files
authored
fix: support for cluster with bus (#98)
1 parent 67e059b commit 82e6ce6

19 files changed

Lines changed: 164 additions & 113 deletions

File tree

.changeset/fine-rabbits-glow.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'bentocache': patch
3+
---
4+
5+
Add support for Cluster with bus

.gitignore

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ build
33
coverage
44
poc
55
doc.md
6-
.env*
7-
!.env.*.example
6+
.env
87
cache.sqlite3
98
.todo.md
109

docs/vscode_grammars/dotenv.tm_language.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
".env.dev",
1010
".env.test",
1111
".env.testing",
12-
".env.testing.example",
1312
".env.production",
1413
".env.prod"
1514
],
@@ -147,4 +146,4 @@
147146
]
148147
}
149148
}
150-
}
149+
}

packages/bentocache/.env.testing.example

Lines changed: 0 additions & 10 deletions
This file was deleted.

packages/bentocache/bin/test.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
1-
import { config } from 'dotenv'
2-
import { resolve } from 'node:path'
1+
import 'dotenv/config'
2+
33
import { assert } from '@japa/assert'
44
import { fileSystem } from '@japa/file-system'
55
import { expectTypeOf } from '@japa/expect-type'
66
import { processCLIArgs, configure, run } from '@japa/runner'
77

88
import { BASE_URL } from '../tests/helpers/index.js'
99

10-
config({ path: resolve(process.cwd(), '.env.testing') })
11-
1210
/*
1311
|--------------------------------------------------------------------------
1412
| Configure tests

packages/bentocache/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@
9191
}
9292
},
9393
"dependencies": {
94-
"@boringnode/bus": "^0.7.1",
94+
"@boringnode/bus": "^0.9.0",
9595
"@julr/utils": "^1.9.0",
9696
"@poppinss/exception": "^1.2.1",
9797
"async-mutex": "^0.5.0",

packages/bentocache/src/drivers/redis.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,24 @@ export function redisDriver(options: RedisConfig): CreateDriverResult<RedisDrive
2323
/**
2424
* Create a new bus redis driver. It leverages the Pub/sub capabilities of Redis
2525
* to sending messages between your different processes.
26+
*
27+
* You can pass either connection options or an existing Redis/Cluster instance.
2628
*/
2729
export function redisBusDriver(
28-
options: { connection: IoRedisOptions } & BusOptions,
30+
options: { connection: IoRedisOptions | IoRedis | IoRedisCluster } & BusOptions,
2931
): CreateBusDriverResult {
3032
return {
3133
options,
3234
factory: () => {
35+
/**
36+
* If an existing Redis or Cluster instance is passed, use it directly
37+
*/
38+
if (options.connection instanceof IoRedis || options.connection instanceof IoRedisCluster) {
39+
return new RedisTransport(options.connection, new BinaryEncoder(), {
40+
useMessageBuffer: true,
41+
})
42+
}
43+
3344
return new RedisTransport(
3445
{ ...options.connection, useMessageBuffer: true } as RedisTransportConfig,
3546
new BinaryEncoder(),

packages/bentocache/tests/bus/bus.spec.ts

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { test } from '@japa/runner'
22
import { sleep } from '@julr/utils/misc'
33
import { MemoryTransport } from '@boringnode/bus/transports/memory'
4+
import { Redis as IoRedis, Cluster as IoRedisCluster } from 'ioredis'
45

56
import { ChaosBus } from '../helpers/chaos/chaos_bus.js'
67
import { ChaosCache } from '../helpers/chaos/chaos_cache.js'
@@ -411,4 +412,70 @@ test.group('Bus synchronization', () => {
411412

412413
await sleep(200)
413414
}).waitForDone()
415+
416+
test('works with an existing Redis instance', async ({ assert }, done) => {
417+
const redis = new IoRedis(REDIS_CREDENTIALS)
418+
419+
const bus1 = redisBusDriver({ connection: redis })
420+
.factory(null as any)
421+
.setId('foo')
422+
423+
const bus2 = redisBusDriver({ connection: redis })
424+
.factory(null as any)
425+
.setId('bar')
426+
427+
const data = {
428+
keys: ['foo'],
429+
type: CacheBusMessageType.Set,
430+
}
431+
432+
bus1.subscribe('foo', (message: any) => {
433+
assert.deepInclude(message, data)
434+
done()
435+
})
436+
437+
await sleep(200)
438+
439+
await bus2.publish('foo', data)
440+
441+
await bus1.disconnect()
442+
await bus2.disconnect()
443+
await redis.quit()
444+
445+
await sleep(200)
446+
}).waitForDone()
447+
448+
test('works with an existing Cluster instance', async ({ assert }, done) => {
449+
const cluster = new IoRedisCluster([{ host: '127.0.0.1', port: 7000 }])
450+
451+
const bus1 = redisBusDriver({ connection: cluster })
452+
.factory(null as any)
453+
.setId('foo')
454+
455+
const bus2 = redisBusDriver({ connection: cluster })
456+
.factory(null as any)
457+
.setId('bar')
458+
459+
const data = {
460+
keys: ['foo'],
461+
type: CacheBusMessageType.Set,
462+
}
463+
464+
bus1.subscribe('foo', (message: any) => {
465+
assert.deepInclude(message, data)
466+
done()
467+
})
468+
469+
await sleep(200)
470+
471+
await bus2.publish('foo', data)
472+
473+
await bus1.disconnect()
474+
await bus2.disconnect()
475+
await cluster.quit()
476+
477+
await sleep(200)
478+
})
479+
.waitForDone()
480+
.skip(!!process.env.CI, 'Skipping cluster test on CI')
414481
})

packages/bentocache/tests/drivers/dynamodb.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { registerCacheDriverTestSuite } from '../helpers/driver_test_suite.js'
66

77
const dynamoClient = new DynamoDBClient({
88
region: 'eu-west-3',
9-
endpoint: process.env.DYNAMODB_ENDPOINT,
9+
endpoint: 'http://localhost:8000',
1010
credentials: { accessKeyId: 'foo', secretAccessKey: 'foo' },
1111
})
1212

packages/bentocache/tests/drivers/knex/mysql.spec.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ import { test } from '@japa/runner'
33
import { sleep } from '@julr/utils/misc'
44

55
import { createKnexStore } from './helpers.js'
6-
import { DatabaseType, getDbConfig } from '../../helpers/db_config.js'
76
import { registerCacheDriverTestSuite } from '../../helpers/driver_test_suite.js'
87

98
test.group('Knex | MySQL driver', (group) => {
@@ -12,13 +11,20 @@ test.group('Knex | MySQL driver', (group) => {
1211
group,
1312
supportsMilliseconds: false,
1413
createDriver: (options) => {
15-
const db = knex(getDbConfig(DatabaseType.MYSQL))
14+
const db = knex({
15+
client: 'mysql2',
16+
connection: { user: 'root', password: 'root', database: 'mysql', port: 3306 },
17+
})
18+
1619
return createKnexStore({ connection: db, prefix: 'japa', ...options })
1720
},
1821
})
1922

2023
test('should not throw error when disconnecting immediately', async () => {
21-
const db = knex(getDbConfig(DatabaseType.MYSQL))
24+
const db = knex({
25+
client: 'mysql2',
26+
connection: { user: 'root', password: 'root', database: 'mysql', port: 3306 },
27+
})
2228

2329
const store = createKnexStore({ connection: db, prefix: 'japa' })
2430

0 commit comments

Comments
 (0)