-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathletter-operations.ts
More file actions
144 lines (127 loc) · 3.87 KB
/
Copy pathletter-operations.ts
File metadata and controls
144 lines (127 loc) · 3.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import { LetterBase, LetterRepository } from "@internal/datastore";
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
import { GetObjectCommand, S3Client } from "@aws-sdk/client-s3";
import { SendMessageBatchCommand } from "@aws-sdk/client-sqs";
import NotFoundError from "../errors/not-found-error";
import { UpdateLetterCommand } from "../contracts/letters";
import { ApiErrorDetail } from "../contracts/errors";
import { Deps } from "../config/deps";
function isNotFoundError(error: any) {
return (
error instanceof Error &&
/^Letter with id \w+ not found for supplier \w+$/.test(error.message)
);
}
async function getDownloadUrl(
s3Uri: string,
s3Client: S3Client,
expiry: number,
) {
const url = new URL(s3Uri); // works for s3:// URIs
const bucket = url.hostname;
const key = url.pathname.slice(1); // remove leading '/'
const command = new GetObjectCommand({
Bucket: bucket,
Key: key,
});
return getSignedUrl(s3Client, command, { expiresIn: expiry });
}
export const getLettersForSupplier = async (
supplierId: string,
status: string,
limit: number,
letterRepo: LetterRepository,
): Promise<LetterBase[]> => {
return letterRepo.getLettersBySupplier(supplierId, status, limit);
};
export const getLetterById = async (
supplierId: string,
letterId: string,
letterRepo: LetterRepository,
): Promise<LetterBase> => {
let letter;
try {
letter = await letterRepo.getLetterById(supplierId, letterId);
} catch (error) {
if (isNotFoundError(error)) {
throw new NotFoundError(ApiErrorDetail.NotFoundLetterId);
}
throw error;
}
return letter;
};
export const getLetterDataUrl = async (
supplierId: string,
letterId: string,
deps: Deps,
): Promise<string> => {
let letter;
try {
letter = await deps.letterRepo.getLetterById(supplierId, letterId);
return await getDownloadUrl(
letter.url,
deps.s3Client,
deps.env.DOWNLOAD_URL_TTL_SECONDS,
);
} catch (error) {
if (isNotFoundError(error)) {
throw new NotFoundError(ApiErrorDetail.NotFoundLetterId);
}
throw error;
}
};
function chunk(
arr: UpdateLetterCommand[],
size: number,
): UpdateLetterCommand[][] {
const chunks: UpdateLetterCommand[][] = [];
for (let i = 0; i < arr.length; i += size)
chunks.push(arr.slice(i, i + size));
return chunks;
}
export async function enqueueLetterUpdateRequests(
updateLetterCommands: UpdateLetterCommand[],
correlationId: string,
deps: Deps,
) {
const BATCH_SIZE = 10; // SQS SendMessageBatch max
const CONCURRENCY = 5; // number of parallel batch API calls
const batches = chunk(updateLetterCommands, BATCH_SIZE);
// send batches in groups with limited concurrency
// BATCH_SIZE * CONCURRENCY is the number of total updates / db calls in-flight
for (let i = 0; i < batches.length; i += CONCURRENCY) {
const window = batches.slice(i, i + CONCURRENCY);
await Promise.all(
window.map(async (batch, batchIdx) => {
const entries = batch.map((request, idx) => ({
Id: `${i + batchIdx}-${idx}`, // unique per batch entry
MessageBody: JSON.stringify(request),
MessageAttributes: {
CorrelationId: { DataType: "String", StringValue: correlationId },
},
}));
const cmd = new SendMessageBatchCommand({
QueueUrl: deps.env.QUEUE_URL,
Entries: entries,
});
try {
const result = await deps.sqsClient.send(cmd);
if (result.Failed && result.Failed.length > 0) {
deps.logger.error(
{ failed: result.Failed },
"Some batch entries failed",
);
}
} catch (error) {
deps.logger.error(
{
err: error,
correlationId,
},
"Error enqueuing letter status updates",
);
}
}),
);
}
}