-
Notifications
You must be signed in to change notification settings - Fork 105
Expand file tree
/
Copy pathsend-webhook-worker.ts
More file actions
109 lines (102 loc) · 3.37 KB
/
Copy pathsend-webhook-worker.ts
File metadata and controls
109 lines (102 loc) · 3.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import type { Static } from "@sinclair/typebox";
import { Worker, type Job, type Processor } from "bullmq";
import superjson from "superjson";
import { TransactionDB } from "../../shared/db/transactions/db";
import {
WebhooksEventTypes,
type BackendWalletBalanceWebhookParams,
type WalletSubscriptionWebhookParams,
} from "../../shared/schemas/webhooks";
import { toEventLogSchema } from "../../server/schemas/event-log";
import {
toTransactionSchema,
type TransactionSchema,
} from "../../server/schemas/transaction";
import { toTransactionReceiptSchema } from "../../server/schemas/transaction-receipt";
import { logger } from "../../shared/utils/logger";
import { redis } from "../../shared/utils/redis/redis";
import {
sendWebhookRequest,
type WebhookResponse,
} from "../../shared/utils/webhook";
import {
SendWebhookQueue,
type WebhookJob,
} from "../queues/send-webhook-queue";
import { env } from "../../shared/utils/env";
/**
 * BullMQ processor for the send-webhook queue.
 *
 * Decodes the superjson-serialized `WebhookJob` carried in `job.data`, builds
 * the request body that matches the event type, and posts it to the target
 * webhook via `sendWebhookRequest`.
 *
 * - CONTRACT_SUBSCRIPTION: sends either the event log or the transaction
 *   receipt, wrapped in a `{ type, data }` envelope.
 * - SENT_TX / MINED_TX / ERRORED_TX / CANCELLED_TX: loads the transaction by
 *   `queueId` and sends its schema representation. If the transaction cannot
 *   be found, a note is written to the job log and the job ends without
 *   sending anything.
 * - BACKEND_WALLET_BALANCE / WALLET_SUBSCRIPTION: forwards `data.body` as is.
 *
 * @param job - Queue job whose `data` is a superjson string of a `WebhookJob`.
 * @throws Error when a CONTRACT_SUBSCRIPTION job carries neither an event log
 *   nor a transaction receipt.
 * @throws Error when the webhook endpoint answers with a status of 500 or above.
 */
const handler: Processor<string, void, string> = async (job: Job<string>) => {
  const { data, webhook } = superjson.parse<WebhookJob>(job.data);

  // Remains undefined when no branch below sends a request.
  let response: WebhookResponse | undefined;

  switch (data.type) {
    case WebhooksEventTypes.CONTRACT_SUBSCRIPTION: {
      type ContractSubscriptionBody = {
        type: "event-log" | "transaction-receipt";
        data: unknown;
      };

      let payload: ContractSubscriptionBody;
      // The event log takes precedence when both fields are present.
      if (data.eventLog) {
        payload = { type: "event-log", data: toEventLogSchema(data.eventLog) };
      } else if (data.transactionReceipt) {
        payload = {
          type: "transaction-receipt",
          data: toTransactionReceiptSchema(data.transactionReceipt),
        };
      } else {
        throw new Error(
          'Missing "eventLog" or "transactionReceipt" for CONTRACT_SUBSCRIPTION webhook.',
        );
      }

      response = await sendWebhookRequest(webhook, payload);
      break;
    }

    case WebhooksEventTypes.SENT_TX:
    case WebhooksEventTypes.MINED_TX:
    case WebhooksEventTypes.ERRORED_TX:
    case WebhooksEventTypes.CANCELLED_TX: {
      const tx = await TransactionDB.get(data.queueId);
      if (!tx) {
        // Nothing to send; finish the job without raising an error.
        job.log("Transaction not found.");
        return;
      }

      const payload: Static<typeof TransactionSchema> = toTransactionSchema(tx);
      response = await sendWebhookRequest(webhook, payload);
      break;
    }

    case WebhooksEventTypes.BACKEND_WALLET_BALANCE: {
      const payload: BackendWalletBalanceWebhookParams = data.body;
      response = await sendWebhookRequest(webhook, payload);
      break;
    }

    case WebhooksEventTypes.WALLET_SUBSCRIPTION: {
      const payload: WalletSubscriptionWebhookParams = data.body;
      response = await sendWebhookRequest(
        webhook,
        payload as unknown as Record<string, unknown>,
      );
      break;
    }
  }

  // Throw on 5xx so it remains in the queue to retry later.
  if (response && response.status >= 500) {
    const message = `Received status ${response.status} from webhook ${webhook.url}.`;
    job.log(message);
    logger({
      level: "debug",
      message,
      service: "worker",
    });
    throw new Error(message);
  }
};
/**
 * Starts the BullMQ worker that consumes the send-webhook queue.
 *
 * Must be explicitly called for the worker to run on this host.
 *
 * The worker is bound to `SendWebhookQueue.q.name`, uses the shared `redis`
 * connection, and runs `handler` with the concurrency configured in
 * `env.SEND_WEBHOOK_QUEUE_CONCURRENCY`.
 */
export const initSendWebhookWorker = () => {
  const worker = new Worker(SendWebhookQueue.q.name, handler, {
    concurrency: env.SEND_WEBHOOK_QUEUE_CONCURRENCY,
    connection: redis,
  });

  // BullMQ recommends always attaching an "error" listener: an "error" event
  // with no listener is thrown by the underlying EventEmitter and may stop the
  // worker from processing further jobs.
  worker.on("error", (error: Error) => {
    logger({
      level: "error",
      message: `Send webhook worker error: ${error.message}`,
      service: "worker",
    });
  });
};