Skip to content

Commit 7111190

Browse files
committed
feat: consume is fully typed
1 parent 9282484 commit 7111190

File tree

7 files changed

+70
-76
lines changed

7 files changed

+70
-76
lines changed

README.md

Lines changed: 28 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -30,29 +30,32 @@ npm install @nextorders/queue
3030
Create type definitions for your events:
3131

3232
```typescript
33-
import type { BaseEventMessage } from '@nextorders/queue'
33+
import type { BaseEventMap, BaseEventMessage, BaseEventMessageHandlerMap } from '@nextorders/queue'
3434

3535
export enum Events {
3636
UserCreated = 'userCreated',
3737
EmailSent = 'emailSent',
3838
}
3939

40-
export type EventMessage = UserCreated | EmailSent
40+
type EventMessage = UserCreated | EmailSent
41+
type EventMap = BaseEventMap<EventMessage>
4142

42-
export interface UserCreated extends BaseEventMessage {
43+
export type EventHandlerMap = Partial<BaseEventMessageHandlerMap<EventMap>>
44+
45+
type UserCreatedData = {
46+
id: string
47+
name: string
48+
email: string
49+
}
50+
export interface UserCreated extends BaseEventMessage<UserCreatedData> {
4351
event: typeof Events.UserCreated
44-
data: {
45-
id: string
46-
name: string
47-
email: string
48-
}
4952
}
5053

51-
export interface EmailSent extends BaseEventMessage {
54+
type EmailSentData = {
55+
email: string
56+
}
57+
export interface EmailSent extends BaseEventMessage<EmailSentData> {
5258
event: typeof Events.EmailSent
53-
data: {
54-
email: string
55-
}
5659
}
5760
```
5861

@@ -97,14 +100,6 @@ import { Email, User } from './entities'
97100
class QueueRepository extends Repository {
98101
user: User = new User(this)
99102
email: Email = new Email(this)
100-
101-
// Override publish method with proper typing
102-
publish<T extends EventMessage>(
103-
event: T['event'],
104-
data: T['data']
105-
): Promise<void> {
106-
return super.publish(event, data)
107-
}
108103
}
109104

110105
export const repository = new QueueRepository()
@@ -125,7 +120,7 @@ await repository.connect('amqp://guest:guest@localhost:5672')
125120
Create and publish events from your services:
126121

127122
```typescript
128-
await repository.publish(Events.UserCreated, {
123+
await repository.publish<UserCreated>(Events.UserCreated, {
129124
id: newUser.id,
130125
name: newUser.name,
131126
email: newUser.email,
@@ -137,9 +132,14 @@ await repository.publish(Events.UserCreated, {
137132
Subscribe to events and handle them:
138133

139134
```typescript
140-
import type { UserCreated } from './types'
141-
import { repository } from './repository'
142-
import { Events } from './types'
135+
import type { EmailSent, EventHandlerMap, UserCreated } from '../repository/types'
136+
import { repository } from '../repository'
137+
import { Events } from '../repository/types'
138+
139+
// Subscribe to Events and handle them
140+
repository.consume<EventHandlerMap>(repository.email.name, {
141+
userCreated: handleUserCreated,
142+
})
143143

144144
// Define event handlers
145145
async function handleUserCreated(data: UserCreated['data']): Promise<boolean> {
@@ -155,14 +155,11 @@ async function handleUserCreated(data: UserCreated['data']): Promise<boolean> {
155155
async function sendEmail(email: string): Promise<void> {
156156
console.warn('Sending email to:', email)
157157

158-
// Publish EmailSent event
159-
await repository.publish(Events.EmailSent, { email })
158+
// Publish Event for other services
159+
await repository.publish<EmailSent>(Events.EmailSent, {
160+
email,
161+
})
160162
}
161-
162-
// Subscribe to events
163-
await repository.consume(repository.email.name, {
164-
[Events.UserCreated]: handleUserCreated,
165-
})
166163
```
167164

168165
## 💁‍♂️ Example: Microservices Architecture
Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,10 @@
1-
import type { EventMessage } from './types'
21
import { Repository } from '@nextorders/queue'
32
import { Email } from './entities/email'
43
import { User } from './entities/user'
54

65
class QueueRepository extends Repository {
76
user = new User(this)
87
email = new Email(this)
9-
10-
async publish<T extends EventMessage>(event: T['event'], data: T['data']) {
11-
return super.publish(event, data)
12-
}
138
}
149

1510
export const repository = new QueueRepository()
Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,28 @@
1-
import type { BaseEventMessage, Status } from '@nextorders/queue'
1+
import type { BaseEventMap, BaseEventMessage, BaseEventMessageHandlerMap } from '@nextorders/queue'
22

33
// All possible events
44
export enum Events {
55
UserCreated = 'userCreated',
66
EmailSent = 'emailSent',
77
}
88

9-
export type EventMessage = UserCreated | EmailSent
9+
type EventMessage = UserCreated | EmailSent
10+
type EventMap = BaseEventMap<EventMessage>
1011

11-
export type EventHandler = (msg: EventMessage) => Promise<Status>
12-
export type EventMessageHandler<T = EventMessage['data']> = (data: T) => Promise<boolean>
12+
export type EventHandlerMap = Partial<BaseEventMessageHandlerMap<EventMap>>
1313

14-
export type EventHandlerMap = Record<EventMessage['event'], EventMessageHandler>
15-
16-
export interface UserCreated extends BaseEventMessage {
14+
type UserCreatedData = {
15+
id: string
16+
name: string
17+
email: string
18+
}
19+
export interface UserCreated extends BaseEventMessage<UserCreatedData> {
1720
event: typeof Events.UserCreated
18-
data: {
19-
id: string
20-
name: string
21-
email: string
22-
}
2321
}
2422

25-
export interface EmailSent extends BaseEventMessage {
23+
type EmailSentData = {
24+
email: string
25+
}
26+
export interface EmailSent extends BaseEventMessage<EmailSentData> {
2627
event: typeof Events.EmailSent
27-
data: {
28-
email: string
29-
}
3028
}

examples/microservices/service1/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import type { UserCreated } from '../repository/types'
12
import { repository } from '../repository'
23
import { Events } from '../repository/types'
34

@@ -14,7 +15,7 @@ const newUser = {
1415
}
1516

1617
// Publish Event for other services
17-
repository.publish(Events.UserCreated, {
18+
repository.publish<UserCreated>(Events.UserCreated, {
1819
id: newUser.id,
1920
name: newUser.name,
2021
email: newUser.email,
Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,17 @@
1-
import type { EventHandlerMap, UserCreated } from '../repository/types'
1+
import type { EmailSent, EventHandlerMap, UserCreated } from '../repository/types'
22
import { repository } from '../repository'
33
import { Events } from '../repository/types'
44

55
// Service 2: Email Service
66

7-
// Consume to Events
8-
repository.consume(repository.email.name, {
9-
[Events.UserCreated]: handleUserCreated,
10-
} as EventHandlerMap)
7+
// Subscribe to Events and handle them
8+
repository.consume<EventHandlerMap>(repository.email.name, {
9+
userCreated: handleUserCreated,
10+
})
1111

1212
// Business logic
1313
async function handleUserCreated(data: UserCreated['data']): Promise<boolean> {
1414
try {
15-
// Service logic: Send email
1615
await sendEmail(data.email)
1716
return true
1817
} catch (error) {
@@ -25,7 +24,7 @@ async function sendEmail(email: string) {
2524
console.warn('Sending email to', email)
2625

2726
// Publish Event for other services
28-
await repository.publish(Events.EmailSent, {
27+
await repository.publish<EmailSent>(Events.EmailSent, {
2928
email,
3029
})
3130
}

packages/queue/src/repository.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { Consumer, Publisher } from 'rabbitmq-client'
2-
import type { BaseEventHandlerMap, BaseEventMessage, Binding, ExchangesList, Queue, QueueRepository, Status } from './types'
2+
import type { BaseEventMap, BaseEventMessage, BaseEventMessageHandlerMap, Binding, ExchangesList, Queue, QueueRepository, Status } from './types'
33
import { Connection, ConsumerStatus } from 'rabbitmq-client'
44
import { COMMON_EXCHANGES } from './exchanges'
55

@@ -48,7 +48,7 @@ export class Repository implements QueueRepository {
4848
return this.#connection
4949
}
5050

51-
async publish<T extends BaseEventMessage>(event: T['event'], data: T['data']) {
51+
async publish<T extends BaseEventMessage<any>>(event: T['event'], data: T['data']) {
5252
return this.publisher.send({
5353
exchange: this.exchanges.events.exchange,
5454
routingKey: event,
@@ -58,7 +58,7 @@ export class Repository implements QueueRepository {
5858
})
5959
}
6060

61-
async consume<T extends BaseEventMessage['data']>(queue: string, eventHandlers: BaseEventHandlerMap<T>): Promise<Consumer> {
61+
async consume<T extends BaseEventMessageHandlerMap<BaseEventMap<any>>>(queue: string, eventHandlers: T): Promise<Consumer> {
6262
if (!queue) {
6363
throw new Error(`Queue "${queue}" not found`)
6464
}
@@ -72,7 +72,7 @@ export class Repository implements QueueRepository {
7272
qos: {
7373
prefetchCount: 1,
7474
},
75-
}, async (msg) => this.handleEvent(eventHandlers, msg.body))
75+
}, async (msg) => this.#handleEvent(eventHandlers, msg.body))
7676

7777
consumer.on('error', (err) => {
7878
// Maybe the consumer was cancelled, or the connection was reset before a
@@ -83,7 +83,7 @@ export class Repository implements QueueRepository {
8383
return consumer
8484
}
8585

86-
async handleEvent<T extends BaseEventMessage['data']>(eventHandlers: BaseEventHandlerMap<T>, msg: BaseEventMessage<T>): Promise<Status> {
86+
async #handleEvent(eventHandlers: BaseEventMessageHandlerMap<BaseEventMap<any>>, msg: BaseEventMessage<any>): Promise<Status> {
8787
const handler = eventHandlers[msg.event]
8888
if (!handler) {
8989
return this.ignore()

packages/queue/src/types.ts

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@ export interface QueueRepository {
1515
exchanges: ExchangesList
1616
bindings: Binding[]
1717
queues: Queue[]
18-
publish: <T extends BaseEventMessage>(event: T['event'], data: T['data']) => Promise<void>
19-
consume: <T extends BaseEventMessage['data']>(queue: string, eventHandlers: BaseEventHandlerMap<T>) => Promise<Consumer>
20-
handleEvent: <T extends BaseEventMessage['data']>(eventHandlers: BaseEventHandlerMap<T>, msg: BaseEventMessage<T>) => Promise<Status>
18+
publish: <T extends BaseEventMessage<any>>(event: T['event'], data: T['data']) => Promise<void>
19+
consume: <T extends BaseEventMessageHandlerMap<BaseEventMap<any>>>(queue: string, eventHandlers: T) => Promise<Consumer>
2120
connect: (connectionString: string) => Promise<void>
2221
checkHealth: () => boolean
2322
success: () => Status
@@ -32,25 +31,30 @@ export interface QueueEntity {
3231
}
3332

3433
/**
35-
* @example export interface UserCreated extends BaseEventMessage {
36-
* event: 'userCreated'
37-
* data: {
34+
* @example export interface UserCreated extends BaseEventMessage<{
3835
* id: string
3936
* name: string
4037
* email: string
41-
* }
38+
* }> {
39+
* event: 'userCreated'
4240
* }
4341
*/
44-
export interface BaseEventMessage<T = Record<string, unknown>> {
42+
export interface BaseEventMessage<T> {
4543
event: string
4644
data: T
4745
}
4846

49-
export type BaseEventHandler<T = Record<string, unknown>> = (message: BaseEventMessage<T>) => Promise<ConsumerStatus>
47+
export type BaseEventMessageHandler<T> = (data: T) => Promise<boolean>
5048

51-
export type BaseEventMessageHandler<T = Record<string, unknown>> = (data: T) => Promise<boolean>
49+
export type BaseEventMap<T extends Record<string, any>> = {
50+
[K in T as K['event']]: K
51+
}
52+
53+
export type BaseEventMessageHandlerMap<T extends BaseEventMap<any>> = {
54+
[K in keyof T]: BaseEventMessageHandler<T[K]['data']>
55+
}
5256

53-
export type BaseEventHandlerMap<T = Record<string, unknown>> = Record<string, BaseEventMessageHandler<T>>
57+
export type DistributePick<T, K extends keyof any> = T extends any ? Pick<T, Extract<keyof T, K>> : never
5458

5559
export type Queue = MethodParams[Cmd.QueueDeclare]
5660
export type QueuesList = Record<string, Queue>

0 commit comments

Comments
 (0)