-
Notifications
You must be signed in to change notification settings - Fork 38
Expand file tree
/
Copy pathDynamoDBClientState.ts
More file actions
124 lines (109 loc) · 3.35 KB
/
DynamoDBClientState.ts
File metadata and controls
124 lines (109 loc) · 3.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import {
AttributeValue,
BatchWriteItemCommand,
ConditionalCheckFailedException,
DynamoDBClient,
GetItemCommand,
paginateQuery,
PutItemCommand,
PutItemCommandInput,
QueryCommandInput,
WriteRequest,
} from '@aws-sdk/client-dynamodb';
import LDDynamoDBOptions from './LDDynamoDBOptions';
// Unlike some other database integrations where the key prefix is mandatory and has
// a default value, in DynamoDB it is fine to not have a prefix. If there is one, we
// prepend it to keys with a ':' separator.
const DEFAULT_PREFIX = '';

// BatchWrite can only accept 25 items at a time, so split up the writes into batches of 25.
const WRITE_BATCH_SIZE = 25;

// A BatchWriteItem call can succeed while still handing some of its requests back in
// `UnprocessedItems`. Those requests have NOT been written and must be sent again, so
// each batch is re-sent (with exponential backoff) up to this many times in total.
const MAX_BATCH_WRITE_ATTEMPTS = 8;

// Delay before the first re-send; it doubles on every subsequent attempt.
const BATCH_WRITE_BASE_DELAY_MS = 25;

// Upper bound for the delay between two re-sends.
const BATCH_WRITE_MAX_DELAY_MS = 1000;

/**
 * Class for managing the state of a dynamodb client.
 *
 * Used for the dynamodb persistent store as well as the dynamodb big segment store.
 *
 * @internal
 */
export default class DynamoDBClientState {
  // This will include the ':' if a prefix is set.
  private _prefix: string;

  private _client: DynamoDBClient;

  // True when this instance created the client and is therefore responsible for
  // destroying it in `close`.
  private _owned: boolean;

  /**
   * @param options Optional configuration. When `dynamoDBClient` is supplied it is used
   * as-is and never destroyed by this class; otherwise a new client is created from
   * `clientOptions` (or from an empty configuration) and is owned by this instance.
   */
  constructor(options?: LDDynamoDBOptions) {
    this._prefix = options?.prefix ? `${options.prefix}:` : DEFAULT_PREFIX;

    // We track if we own the client so that we can destroy clients that we own.
    if (options?.dynamoDBClient) {
      this._client = options.dynamoDBClient;
      this._owned = false;
    } else {
      this._client = new DynamoDBClient(options?.clientOptions ?? {});
      this._owned = true;
    }
  }

  /**
   * Get a key with prefix prepended.
   * @param key The key to prefix.
   * @returns The prefixed key.
   */
  prefixedKey(key: string): string {
    return `${this._prefix}${key}`;
  }

  /**
   * Run a query and collect the items of every result page.
   * @param params The query input.
   * @returns All items from all pages, in the order the pages were received.
   */
  async query(params: QueryCommandInput): Promise<Record<string, AttributeValue>[]> {
    const records: Record<string, AttributeValue>[] = [];
    // Using a generator here is a substantial ergonomic improvement.
    for await (const page of paginateQuery({ client: this._client }, params)) {
      if (page.Items) {
        records.push(...page.Items);
      }
    }
    return records;
  }

  /**
   * Write a list of requests to a table, split into batches of at most
   * `WRITE_BATCH_SIZE` requests. The batches are sent concurrently.
   *
   * Requests that the service returns as unprocessed are re-sent with exponential
   * backoff rather than being silently dropped.
   *
   * @param table The table to write to.
   * @param params The write requests to execute.
   * @throws If sending a batch fails, or if some requests are still unprocessed after
   * `MAX_BATCH_WRITE_ATTEMPTS` attempts.
   */
  async batchWrite(table: string, params: WriteRequest[]): Promise<void> {
    const batches: WriteRequest[][] = [];
    // Split into batches of at most 25 commands.
    for (let i = 0; i < params.length; i += WRITE_BATCH_SIZE) {
      batches.push(params.slice(i, i + WRITE_BATCH_SIZE));
    }

    // Execute all the batches and wait for them to complete.
    await Promise.all(batches.map((batch) => this._writeBatch(table, batch, 1)));
  }

  /**
   * Send a single batch, re-sending whatever the service reports as unprocessed.
   * @param table The table to write to.
   * @param batch The requests to send; at most `WRITE_BATCH_SIZE` of them.
   * @param attempt The 1-based number of this attempt.
   */
  private async _writeBatch(table: string, batch: WriteRequest[], attempt: number): Promise<void> {
    const res = await this._client.send(
      new BatchWriteItemCommand({
        RequestItems: { [table]: batch },
      }),
    );

    // An absent entry or an empty list both mean that everything was written.
    const unprocessed = res.UnprocessedItems?.[table];
    if (!unprocessed || unprocessed.length === 0) {
      return;
    }

    if (attempt >= MAX_BATCH_WRITE_ATTEMPTS) {
      throw new Error(
        `Unable to write ${unprocessed.length} item(s) to DynamoDB table "${table}" after ${MAX_BATCH_WRITE_ATTEMPTS} attempts`,
      );
    }

    // Back off before retrying so that a throttled table has a chance to recover.
    const delayMs = Math.min(
      BATCH_WRITE_BASE_DELAY_MS * 2 ** (attempt - 1),
      BATCH_WRITE_MAX_DELAY_MS,
    );
    await new Promise<void>((resolve) => {
      setTimeout(resolve, delayMs);
    });

    await this._writeBatch(table, unprocessed, attempt + 1);
  }

  /**
   * Read a single item.
   * @param table The table to read from.
   * @param key The key of the item.
   * @returns The item, or `undefined` when the response contains no item.
   */
  async get(
    table: string,
    key: Record<string, AttributeValue>,
  ): Promise<Record<string, AttributeValue> | undefined> {
    const res = await this._client.send(
      new GetItemCommand({
        TableName: table,
        Key: key,
      }),
    );
    return res.Item;
  }

  /**
   * Put a single item. A failed condition check is not treated as an error.
   * @param params The put input, including any condition expression.
   * @throws Any error other than `ConditionalCheckFailedException`.
   */
  async put(params: PutItemCommandInput): Promise<void> {
    try {
      await this._client.send(new PutItemCommand(params));
    } catch (err) {
      // If we couldn't upsert because of the version, then that is fine.
      // Otherwise we return failure.
      if (!(err instanceof ConditionalCheckFailedException)) {
        throw err;
      }
    }
  }

  /**
   * Destroy the underlying client, but only if it was created by this instance.
   */
  close(): void {
    if (this._owned) {
      this._client.destroy();
    }
  }
}