Skip to content

Commit 9c01476

Browse files
authored
Bump @platformatic/kafka version (#484)
* Bump platformatic/kafka version * Fix topics deletion issue
1 parent 8a9555b commit 9c01476

6 files changed

Lines changed: 30 additions & 31 deletions

File tree

packages/kafka/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"name": "@message-queue-toolkit/kafka",
33
"version": "0.11.4",
44
"engines": {
5-
"node": ">= 22.14.0"
5+
"node": ">= 22.22.0"
66
},
77
"private": false,
88
"license": "MIT",
@@ -53,7 +53,7 @@
5353
"dependencies": {
5454
"@lokalise/node-core": "^14.8.1",
5555
"@lokalise/universal-ts-utils": "^4.10.0",
56-
"@platformatic/kafka": "^2.2.3"
56+
"@platformatic/kafka": "^2.3.1"
5757
},
5858
"peerDependencies": {
5959
"@message-queue-toolkit/core": ">=26.1.1",

packages/kafka/test/consumer/PermissionBatchConsumer.spec.ts

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import {
1414
type PermissionAdded,
1515
TOPICS,
1616
} from '../utils/permissionSchemas.ts'
17-
import { createTestContext, type TestContext } from '../utils/testContext.ts'
17+
import { createTestContext, deleteExistingTopics, type TestContext } from '../utils/testContext.ts'
1818
import { PermissionBatchConsumer } from './PermissionBatchConsumer.ts'
1919

2020
describe('PermissionBatchConsumer', () => {
@@ -35,13 +35,7 @@ describe('PermissionBatchConsumer', () => {
3535

3636
describe('init - close', () => {
3737
beforeEach(async () => {
38-
try {
39-
await testContext.cradle.kafkaAdmin.deleteTopics({
40-
topics: TOPICS,
41-
})
42-
} catch (_) {
43-
// Ignore errors if the topic does not exist
44-
}
38+
await deleteExistingTopics(testContext.cradle.kafkaAdmin, TOPICS)
4539
})
4640

4741
it('should thrown an error if topics is empty', async () => {

packages/kafka/test/consumer/PermissionConsumer.spec.ts

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import {
1111
type PermissionAdded,
1212
TOPICS,
1313
} from '../utils/permissionSchemas.ts'
14-
import { createTestContext, type TestContext } from '../utils/testContext.ts'
14+
import { createTestContext, deleteExistingTopics, type TestContext } from '../utils/testContext.ts'
1515
import { PermissionConsumer } from './PermissionConsumer.ts'
1616

1717
describe('PermissionConsumer', () => {
@@ -32,13 +32,7 @@ describe('PermissionConsumer', () => {
3232

3333
describe('init - close', () => {
3434
beforeEach(async () => {
35-
try {
36-
await testContext.cradle.kafkaAdmin.deleteTopics({
37-
topics: TOPICS,
38-
})
39-
} catch (_) {
40-
// Ignore errors if the topic does not exist
41-
}
35+
await deleteExistingTopics(testContext.cradle.kafkaAdmin, TOPICS)
4236
})
4337

4438
it('should thrown an error if topics is empty', async () => {

packages/kafka/test/publisher/PermissionPublisher.spec.ts

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import {
77
type PermissionRemoved,
88
TOPICS,
99
} from '../utils/permissionSchemas.ts'
10-
import { createTestContext, type TestContext } from '../utils/testContext.ts'
10+
import { createTestContext, deleteExistingTopics, type TestContext } from '../utils/testContext.ts'
1111
import { PermissionPublisher } from './PermissionPublisher.ts'
1212

1313
describe('PermissionPublisher', () => {
@@ -28,13 +28,7 @@ describe('PermissionPublisher', () => {
2828

2929
describe('init - close', () => {
3030
beforeEach(async () => {
31-
try {
32-
await testContext.cradle.kafkaAdmin.deleteTopics({
33-
topics: TOPICS,
34-
})
35-
} catch (_) {
36-
// Ignore errors if the topic does not exist
37-
}
31+
await deleteExistingTopics(testContext.cradle.kafkaAdmin, TOPICS)
3832
})
3933

4034
it('should thrown an error if topics is empty', () => {

packages/kafka/test/utils/testContext.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,23 @@ export const getKafkaConfig = (): KafkaConfig => ({
5555
clientId: randomUUID(),
5656
})
5757

58+
/**
59+
* Deletes only the topics that currently exist.
60+
*
61+
* Workaround for a regression in @platformatic/kafka >= 2.3.0 where Admin#deleteTopics
62+
* leaves a stale entry in its internal deduplication cache when the operation errors
63+
* (e.g. deleting a non-existent topic). Reusing the same Admin instance afterwards causes
64+
* subsequent deleteTopics calls for the same topics to hang. Filtering to existing topics
65+
* avoids triggering that error path.
66+
*/
67+
export const deleteExistingTopics = async (kafkaAdmin: Admin, topics: string[]): Promise<void> => {
68+
const existingTopics = await kafkaAdmin.listTopics()
69+
const topicsToDelete = topics.filter((topic) => existingTopics.includes(topic))
70+
if (topicsToDelete.length === 0) return
71+
72+
await kafkaAdmin.deleteTopics({ topics: topicsToDelete })
73+
}
74+
5875
const resolveDIConfig = (awilixManager: AwilixManager): DiConfig => ({
5976
awilixManager: asFunction(() => awilixManager, SINGLETON_CONFIG),
6077
kafkaConfig: asFunction(getKafkaConfig, SINGLETON_CONFIG),

pnpm-lock.yaml

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)