Skip to content

Commit 28dfa23

Browse files
CCM-12392: Allocation idempotency
1 parent 31deef7 commit 28dfa23

9 files changed

Lines changed: 202 additions & 127 deletions

File tree

infrastructure/terraform/components/api/module_lambda_supplier_allocator.tf

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ module "supplier_allocator" {
3535
log_subscription_role_arn = local.acct.log_subscription_role_arn
3636

3737
lambda_env_vars = merge(local.common_lambda_env_vars, {
38-
UPSERT_LETTERS_QUEUE_URL = module.sqs_letter_updates.sqs_queue_url
38+
UPSERT_LETTERS_QUEUE_URL = module.sqs_letter_updates.sqs_queue_url,
39+
IDEMPOTENCY_TABLE_NAME = aws_dynamodb_table.idempotency.name
3940
})
4041
}
4142

@@ -110,6 +111,7 @@ data "aws_iam_policy_document" "supplier_allocator_lambda" {
110111

111112
resources = [
112113
aws_dynamodb_table.supplier-quotas.arn,
114+
aws_dynamodb_table.idempotency.arn,
113115
"${aws_dynamodb_table.supplier-quotas.arn}/index/*"
114116
]
115117
}

lambdas/supplier-allocator/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{
22
"dependencies": {
3+
"@aws-lambda-powertools/idempotency": "^2.33.0",
34
"@aws-sdk/client-dynamodb": "^3.858.0",
45
"@aws-sdk/client-sqs": "^3.984.0",
56
"@aws-sdk/lib-dynamodb": "^3.1008.0",

lambdas/supplier-allocator/src/config/__tests__/deps.test.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ describe("createDependenciesContainer", () => {
44
const env = {
55
SUPPLIER_CONFIG_TABLE_NAME: "SupplierConfigTable",
66
SUPPLIER_QUOTAS_TABLE_NAME: "SupplierQuotasTable",
7+
IDEMPOTENCY_TABLE_NAME: "IdempotencyTable",
78
};
89

910
beforeEach(() => {
@@ -27,6 +28,10 @@ describe("createDependenciesContainer", () => {
2728
SupplierQuotasRepository: jest.fn(),
2829
}));
2930

31+
jest.mock("@aws-lambda-powertools/idempotency/dynamodb", () => ({
32+
DynamoDBPersistenceLayer: jest.fn(),
33+
}));
34+
3035
// Env
3136
jest.mock("../env", () => ({ envVars: env }));
3237
});
@@ -40,6 +45,9 @@ describe("createDependenciesContainer", () => {
4045
const { SupplierQuotasRepository } = jest.requireMock(
4146
"@internal/datastore",
4247
);
48+
const { DynamoDBPersistenceLayer } = jest.requireMock(
49+
"@aws-lambda-powertools/idempotency/dynamodb",
50+
);
4351
// eslint-disable-next-line @typescript-eslint/no-require-imports
4452
const { createDependenciesContainer } = require("../deps");
4553
const deps: Deps = createDependenciesContainer();
@@ -54,6 +62,11 @@ describe("createDependenciesContainer", () => {
5462
expect(supplierQuotasRepoCtorArgs[1]).toEqual({
5563
supplierQuotasTableName: "SupplierQuotasTable",
5664
});
65+
expect(DynamoDBPersistenceLayer).toHaveBeenCalledTimes(1);
66+
const idempotencyLayerCtorArgs = DynamoDBPersistenceLayer.mock.calls[0][0];
67+
expect(idempotencyLayerCtorArgs).toEqual({
68+
tableName: "IdempotencyTable",
69+
});
5770
expect(deps.env).toEqual(env);
5871
});
5972
});

lambdas/supplier-allocator/src/config/__tests__/env.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@ describe("lambdaEnv", () => {
1616
it("should load all environment variables successfully", () => {
1717
process.env.SUPPLIER_CONFIG_TABLE_NAME = "SupplierConfigTable";
1818
process.env.SUPPLIER_QUOTAS_TABLE_NAME = "SupplierQuotasTable";
19+
process.env.IDEMPOTENCY_TABLE_NAME = "IdempotencyTable";
1920

2021
const { envVars } = require("../env");
2122

2223
expect(envVars).toEqual({
2324
SUPPLIER_CONFIG_TABLE_NAME: "SupplierConfigTable",
2425
SUPPLIER_QUOTAS_TABLE_NAME: "SupplierQuotasTable",
26+
IDEMPOTENCY_TABLE_NAME: "IdempotencyTable",
2527
});
2628
});
2729
});

lambdas/supplier-allocator/src/config/deps.ts

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
22
import { DynamoDBDocumentClient } from "@aws-sdk/lib-dynamodb";
3+
import { DynamoDBPersistenceLayer } from "@aws-lambda-powertools/idempotency/dynamodb";
34
import { SQSClient } from "@aws-sdk/client-sqs";
45
import { Logger } from "pino";
56
import { createLogger } from "@internal/helpers";
@@ -12,6 +13,7 @@ import { EnvVars, envVars } from "./env";
1213
export type Deps = {
1314
supplierConfigRepo: SupplierConfigRepository;
1415
supplierQuotasRepo: SupplierQuotasRepository;
16+
idempotencyLayer: DynamoDBPersistenceLayer;
1517
logger: Logger;
1618
env: EnvVars;
1719
sqsClient: SQSClient;
@@ -22,36 +24,35 @@ function createDocumentClient(): DynamoDBDocumentClient {
2224
return DynamoDBDocumentClient.from(ddbClient);
2325
}
2426

25-
function createSupplierConfigRepository(
26-
log: Logger,
27-
// eslint-disable-next-line @typescript-eslint/no-shadow
28-
envVars: EnvVars,
29-
): SupplierConfigRepository {
27+
function createSupplierConfigRepository(): SupplierConfigRepository {
3028
const config = {
3129
supplierConfigTableName: envVars.SUPPLIER_CONFIG_TABLE_NAME,
3230
};
3331

3432
return new SupplierConfigRepository(createDocumentClient(), config);
3533
}
3634

37-
function createSupplierQuotasRepository(
38-
log: Logger,
39-
// eslint-disable-next-line @typescript-eslint/no-shadow
40-
envVars: EnvVars,
41-
): SupplierQuotasRepository {
35+
function createSupplierQuotasRepository(): SupplierQuotasRepository {
4236
const config = {
4337
supplierQuotasTableName: envVars.SUPPLIER_QUOTAS_TABLE_NAME,
4438
};
4539

4640
return new SupplierQuotasRepository(createDocumentClient(), config);
4741
}
4842

43+
function createIdempotencyLayer(): DynamoDBPersistenceLayer {
44+
return new DynamoDBPersistenceLayer({
45+
tableName: envVars.IDEMPOTENCY_TABLE_NAME,
46+
});
47+
}
48+
4949
export function createDependenciesContainer(): Deps {
5050
const log = createLogger({ logLevel: envVars.PINO_LOG_LEVEL });
5151

5252
return {
53-
supplierConfigRepo: createSupplierConfigRepository(log, envVars),
54-
supplierQuotasRepo: createSupplierQuotasRepository(log, envVars),
53+
supplierConfigRepo: createSupplierConfigRepository(),
54+
supplierQuotasRepo: createSupplierQuotasRepository(),
55+
idempotencyLayer: createIdempotencyLayer(),
5556
logger: log,
5657
env: envVars,
5758
sqsClient: new SQSClient({}),

lambdas/supplier-allocator/src/config/env.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ const EnvVarsSchema = z.object({
44
SUPPLIER_CONFIG_TABLE_NAME: z.string(),
55
SUPPLIER_QUOTAS_TABLE_NAME: z.string(),
66
PINO_LOG_LEVEL: z.coerce.string().optional(),
7+
IDEMPOTENCY_TABLE_NAME: z.string(),
78
});
89

910
export type EnvVars = z.infer<typeof EnvVarsSchema>;

lambdas/supplier-allocator/src/handler/__tests__/allocate-handler.test.ts

Lines changed: 59 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,11 @@ import {
77
$LetterStatusChangeEvent,
88
LetterStatusChangeEvent,
99
} from "@nhsdigital/nhs-notify-event-schemas-supplier-api/src/events/letter-events";
10-
import {
11-
SupplierConfigRepository,
12-
SupplierQuotasRepository,
13-
} from "@internal/datastore";
10+
import { makeIdempotent } from "@aws-lambda-powertools/idempotency";
1411
import createSupplierAllocatorHandler from "../allocate-handler";
1512
import * as supplierConfig from "../../services/supplier-config";
1613
import * as supplierQuotas from "../../services/supplier-quotas";
1714
import * as allocationConfig from "../allocation-config";
18-
1915
import { Deps } from "../../config/deps";
2016
import packageJson from "../../../package.json";
2117

@@ -28,6 +24,14 @@ jest.mock("../../services/supplier-config");
2824
jest.mock("../../services/supplier-quotas");
2925
jest.mock("../allocation-config");
3026

27+
jest.mock("@aws-lambda-powertools/idempotency", () => {
28+
const original = jest.requireActual("@aws-lambda-powertools/idempotency");
29+
return {
30+
...original,
31+
makeIdempotent: jest.fn((fn, _) => fn),
32+
};
33+
});
34+
3135
function createSQSEvent(records: SQSRecord[]): SQSEvent {
3236
return {
3337
Records: records,
@@ -185,16 +189,15 @@ function setupDefaultMocks() {
185189
}
186190

187191
describe("createSupplierAllocatorHandler", () => {
188-
let mockSqsClient: jest.Mocked<SQSClient>;
189-
let mockedDeps: jest.Mocked<Deps>;
190-
let mockedSupplierConfigRepo: jest.Mocked<SupplierConfigRepository>;
191-
let mockedSupplierQuotasRepo: jest.Mocked<SupplierQuotasRepository>;
192-
beforeEach(() => {
193-
mockSqsClient = {
194-
send: jest.fn(),
195-
} as unknown as jest.Mocked<SQSClient>;
196-
197-
mockedSupplierConfigRepo = {
192+
const mockedDeps: jest.Mocked<Deps> = {
193+
logger: { error: jest.fn(), info: jest.fn() } as unknown as pino.Logger,
194+
env: {
195+
SUPPLIER_CONFIG_TABLE_NAME: "SupplierConfigTable",
196+
SUPPLIER_QUOTAS_TABLE_NAME: "SupplierQuotasTable",
197+
IDEMPOTENCY_TABLE_NAME: "IdempotencyTable",
198+
},
199+
sqsClient: { send: jest.fn() } as unknown as SQSClient,
200+
supplierConfigRepo: {
198201
ddbClient: {} as any,
199202
config: {} as any,
200203
getLetterVariant: jest.fn(),
@@ -203,27 +206,18 @@ describe("createSupplierAllocatorHandler", () => {
203206
getSuppliersDetails: jest.fn(),
204207
getSupplierPacksForPackSpecification: jest.fn(),
205208
getPackSpecification: jest.fn(),
206-
};
207-
208-
mockedSupplierQuotasRepo = {
209+
},
210+
supplierQuotasRepo: {
209211
ddbClient: {} as any,
210212
config: {} as any,
211213
getOverallAllocation: jest.fn(),
212214
updateOverallAllocation: jest.fn(),
213215
getDailyAllocation: jest.fn(),
214216
updateDailyAllocation: jest.fn(),
215-
};
217+
},
218+
} as unknown as Deps;
216219

217-
mockedDeps = {
218-
logger: { error: jest.fn(), info: jest.fn() } as unknown as pino.Logger,
219-
env: {
220-
SUPPLIER_CONFIG_TABLE_NAME: "SupplierConfigTable",
221-
SUPPLIER_QUOTAS_TABLE_NAME: "SupplierQuotasTable",
222-
},
223-
sqsClient: mockSqsClient,
224-
supplierConfigRepo: mockedSupplierConfigRepo,
225-
supplierQuotasRepo: mockedSupplierQuotasRepo,
226-
};
220+
beforeEach(() => {
227221
jest.clearAllMocks();
228222
});
229223

@@ -244,8 +238,8 @@ describe("createSupplierAllocatorHandler", () => {
244238

245239
expect(result.batchItemFailures).toHaveLength(0);
246240

247-
expect(mockSqsClient.send).toHaveBeenCalledTimes(1);
248-
const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0];
241+
expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1);
242+
const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0];
249243
expect(sendCall).toBeInstanceOf(SendMessageCommand);
250244

251245
const messageBody = JSON.parse(sendCall.input.MessageBody);
@@ -281,8 +275,8 @@ describe("createSupplierAllocatorHandler", () => {
281275

282276
expect(result.batchItemFailures).toHaveLength(0);
283277

284-
expect(mockSqsClient.send).toHaveBeenCalledTimes(1);
285-
const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0];
278+
expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1);
279+
const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0];
286280
expect(sendCall).toBeInstanceOf(SendMessageCommand);
287281

288282
const messageBody = JSON.parse(sendCall.input.MessageBody);
@@ -315,8 +309,8 @@ describe("createSupplierAllocatorHandler", () => {
315309

316310
expect(result.batchItemFailures).toHaveLength(0);
317311

318-
expect(mockSqsClient.send).toHaveBeenCalledTimes(1);
319-
const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0];
312+
expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1);
313+
const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0];
320314
const messageBody = JSON.parse(sendCall.input.MessageBody);
321315
expect(messageBody.allocationDetails.supplierSpec).toEqual({
322316
supplierId: "supplier1",
@@ -361,7 +355,7 @@ describe("createSupplierAllocatorHandler", () => {
361355
const handler = createSupplierAllocatorHandler(mockedDeps);
362356
await handler(evt, {} as any, {} as any);
363357

364-
const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0];
358+
const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0];
365359
const messageBody = JSON.parse(sendCall.input.MessageBody);
366360
expect(messageBody.letterEvent.data.domainId).toBe("letter-test");
367361
});
@@ -410,7 +404,7 @@ describe("createSupplierAllocatorHandler", () => {
410404
if (!result) throw new Error("expected BatchResponse, got void");
411405

412406
expect(result.batchItemFailures).toHaveLength(0);
413-
expect(mockSqsClient.send).toHaveBeenCalledTimes(2);
407+
expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(2);
414408
});
415409

416410
test("returns batch failure for invalid JSON", async () => {
@@ -480,7 +474,7 @@ describe("createSupplierAllocatorHandler", () => {
480474
process.env.UPSERT_LETTERS_QUEUE_URL = "https://sqs.test.queue";
481475

482476
const sqsError = new Error("SQS send failed");
483-
(mockSqsClient.send as jest.Mock).mockRejectedValueOnce(sqsError);
477+
(mockedDeps.sqsClient.send as jest.Mock).mockRejectedValueOnce(sqsError);
484478

485479
const handler = createSupplierAllocatorHandler(mockedDeps);
486480
const result = await handler(evt, {} as any, {} as any);
@@ -513,7 +507,7 @@ describe("createSupplierAllocatorHandler", () => {
513507
expect(result.batchItemFailures).toHaveLength(1);
514508
expect(result.batchItemFailures[0].itemIdentifier).toBe("fail-msg");
515509

516-
expect(mockSqsClient.send).toHaveBeenCalledTimes(2);
510+
expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(2);
517511
});
518512

519513
test("sends correct queue URL in SQS message command", async () => {
@@ -529,7 +523,7 @@ describe("createSupplierAllocatorHandler", () => {
529523
const handler = createSupplierAllocatorHandler(mockedDeps);
530524
await handler(evt, {} as any, {} as any);
531525

532-
const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0];
526+
const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0];
533527
expect(sendCall.input.QueueUrl).toBe(queueUrl);
534528
});
535529

@@ -557,8 +551,8 @@ describe("createSupplierAllocatorHandler", () => {
557551
variantId: "lv1",
558552
}),
559553
);
560-
expect(mockSqsClient.send).toHaveBeenCalledTimes(1);
561-
const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0];
554+
expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1);
555+
const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0];
562556
expect(sendCall).toBeInstanceOf(SendMessageCommand);
563557

564558
const messageBody = JSON.parse(sendCall.input.MessageBody);
@@ -667,8 +661,9 @@ describe("createSupplierAllocatorHandler", () => {
667661
variantId: "lv1",
668662
}),
669663
);
670-
expect(mockSqsClient.send).toHaveBeenCalledTimes(1);
671-
const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0];
664+
expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1);
665+
const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock
666+
.calls[0][0];
672667
expect(sendCall).toBeInstanceOf(SendMessageCommand);
673668

674669
const messageBody = JSON.parse(sendCall.input.MessageBody);
@@ -711,8 +706,8 @@ describe("createSupplierAllocatorHandler", () => {
711706
variantId: "lv1",
712707
}),
713708
);
714-
expect(mockSqsClient.send).toHaveBeenCalledTimes(1);
715-
const sendCall = (mockSqsClient.send as jest.Mock).mock.calls[0][0];
709+
expect(mockedDeps.sqsClient.send).toHaveBeenCalledTimes(1);
710+
const sendCall = (mockedDeps.sqsClient.send as jest.Mock).mock.calls[0][0];
716711
expect(sendCall).toBeInstanceOf(SendMessageCommand);
717712

718713
const messageBody = JSON.parse(sendCall.input.MessageBody);
@@ -771,4 +766,22 @@ describe("createSupplierAllocatorHandler", () => {
771766
expect(result.batchItemFailures).toHaveLength(0);
772767
expect(allocationConfig.selectSupplierByFactor).toHaveBeenCalledTimes(2);
773768
});
769+
770+
test("does not process a message more than once due to idempotency wrapper", async () => {
771+
const preparedEvent = createPreparedV2Event();
772+
const evt: SQSEvent = createSQSEvent([
773+
createSqsRecord("msg1", JSON.stringify(preparedEvent)),
774+
]);
775+
776+
process.env.UPSERT_LETTERS_QUEUE_URL = "https://sqs.test.queue";
777+
778+
setupDefaultMocks();
779+
(makeIdempotent as jest.Mock).mockImplementationOnce((_fn) => "supplier1");
780+
781+
const handler = createSupplierAllocatorHandler(mockedDeps);
782+
await handler(evt, {} as any, {} as any);
783+
784+
expect(makeIdempotent).toHaveBeenCalledTimes(1);
785+
expect(mockedDeps.sqsClient.send).not.toHaveBeenCalled();
786+
});
774787
});

0 commit comments

Comments
 (0)