-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathupdate-letter-queue.ts
More file actions
149 lines (133 loc) · 4.28 KB
/
Copy pathupdate-letter-queue.ts
File metadata and controls
149 lines (133 loc) · 4.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import {
DynamoDBRecord,
Handler,
KinesisStreamEvent,
KinesisStreamRecord,
} from "aws-lambda";
import { unmarshall } from "@aws-sdk/util-dynamodb";
import { Unit } from "aws-embedded-metrics";
import { MetricStatus, buildEMFObject } from "@internal/helpers";
import {
InsertPendingLetter,
Letter,
LetterAlreadyExistsError,
LetterSchema,
} from "@internal/datastore";
import { Deps } from "./deps";
export default function createHandler(deps: Deps): Handler<KinesisStreamEvent> {
return async (streamEvent: KinesisStreamEvent) => {
let successCount = 0;
deps.logger.info({ description: "Received event", streamEvent });
deps.logger.info({
description: "Number of records",
count: streamEvent.Records?.length || 0,
});
for (const record of streamEvent.Records) {
const ddbRecord = extractPayload(record, deps);
if (isNewPendingLetter(ddbRecord)) {
const letter = extractNewLetter(ddbRecord);
const pendingLetter = mapLetterToPendingLetter(letter);
try {
deps.logger.info({
description: "Persisting pending letter",
pendingLetter,
});
await deps.letterQueueRepository.putLetter(pendingLetter);
successCount += 1;
} catch (error) {
if (error instanceof LetterAlreadyExistsError) {
deps.logger.warn({
description: "Letter already exists",
supplierId: pendingLetter.supplierId,
letterId: pendingLetter.letterId,
});
} else {
deps.logger.error({
description: "Error persisting pending letter",
error,
pendingLetter,
});
recordProcessing(deps, successCount, 1);
// If we get a failure, return immediately without processing the remaining records. Since we are
// working with a Kinesis stream, AWS will retry from the point of failure and no records will be lost.
// See https://docs.aws.amazon.com/lambda/latest/dg/example_serverless_Kinesis_Lambda_batch_item_failures_section.html
return {
batchItemFailures: [
{ itemIdentifier: record.kinesis.sequenceNumber },
],
};
}
}
}
}
recordProcessing(deps, successCount, 0);
return { batchItemFailures: [] };
};
}
function recordProcessing(
deps: Deps,
successCount: number,
failureCount: number,
) {
deps.logger.info({
description: "Processing complete",
successCount,
failureCount,
totalProcessed: successCount + failureCount,
});
deps.logger.info(buildMetric(MetricStatus.Success, successCount));
deps.logger.info(buildMetric(MetricStatus.Failure, failureCount));
}
function isNewPendingLetter(record: DynamoDBRecord): boolean {
const isInsert = record.eventName === "INSERT";
const newImage = record.dynamodb?.NewImage;
const isPending = newImage?.status?.S === "PENDING";
return isInsert && isPending;
}
function extractPayload(
record: KinesisStreamRecord,
deps: Deps,
): DynamoDBRecord {
try {
deps.logger.info({
description: "Processing Kinesis record",
recordId: record.kinesis.sequenceNumber,
});
// Kinesis data is base64 encoded
const payload = Buffer.from(record.kinesis.data, "base64").toString("utf8");
deps.logger.info({ description: "Decoded payload", payload });
const jsonParsed = JSON.parse(payload);
deps.logger.info({ description: "Extracted dynamoDBRecord", jsonParsed });
return jsonParsed;
} catch (error) {
deps.logger.error({
description: "Error extracting payload",
err: error,
eventId: record.eventID,
});
throw error;
}
}
function extractNewLetter(record: DynamoDBRecord): Letter {
const newImage = record.dynamodb?.NewImage!;
return LetterSchema.parse(unmarshall(newImage as any));
}
function mapLetterToPendingLetter(letter: Letter): InsertPendingLetter {
return {
supplierId: letter.supplierId,
letterId: letter.id,
specificationId: letter.specificationId,
groupId: letter.groupId,
};
}
function buildMetric(status: MetricStatus, count: number) {
return buildEMFObject(
"update-letter-queue",
{},
{
key: status,
value: count,
unit: Unit.Count,
},
);
}