Skip to content

Commit e5ae665

Browse files
committed
Add upload deduping to avoid race. Uploads are idempotent but races could still occur and waste time/bandwidth.
1 parent cfd9c1f commit e5ae665

2 files changed

Lines changed: 35 additions & 6 deletions

File tree

contrib/external-storage-s3/src/__tests__/test-driver.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,18 @@ test('identical payloads in the same scope deduplicate to one upload', async (t)
136136
t.is(client.putCount, 1);
137137
});
138138

139+
test('concurrent identical payloads in one batch upload once', async (t) => {
140+
const client = new FakeS3Client();
141+
const driver = new S3StorageDriver({ client, bucket: 'b' });
142+
143+
const [first, second] = await driver.store(workflowContext, [makePayload('"hello"'), makePayload('"hello"')]);
144+
assert(first);
145+
assert(second);
146+
147+
t.is(client.putCount, 1);
148+
t.is(first.claimData.key, second.claimData.key);
149+
});
150+
139151
test('retrieve rejects when stored bytes fail the integrity check', async (t) => {
140152
const client = new FakeS3Client();
141153
const driver = new S3StorageDriver({ client, bucket: 'b' });

contrib/external-storage-s3/src/driver.ts

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,10 @@ export class S3StorageDriver implements StorageDriver {
142142

143143
async store(context: StorageDriverStoreContext, payloads: Payload[]): Promise<StorageDriverClaim[]> {
144144
const contextSegments = buildContextSegments(context.target);
145-
return runAllWithAbortOnError(context.abortSignal, (signal) =>
146-
payloads.map((payload) => this.storePayload(context, payload, contextSegments, signal))
147-
);
145+
return runAllWithAbortOnError(context.abortSignal, (signal) => {
146+
const uploads = new Map<string, Promise<void>>();
147+
return payloads.map((payload) => this.storePayload(context, payload, contextSegments, signal, uploads));
148+
});
148149
}
149150

150151
async retrieve(context: StorageDriverRetrieveContext, claims: StorageDriverClaim[]): Promise<Payload[]> {
@@ -157,7 +158,8 @@ export class S3StorageDriver implements StorageDriver {
157158
context: StorageDriverStoreContext,
158159
payload: Payload,
159160
contextSegments: string,
160-
abortSignal: AbortSignal
161+
abortSignal: AbortSignal,
162+
uploads: Map<string, Promise<void>>
161163
): Promise<StorageDriverClaim> {
162164
const bucket = this.bucket(context, payload);
163165

@@ -172,9 +174,13 @@ export class S3StorageDriver implements StorageDriver {
172174
const key = `v0${contextSegments}/d/sha256/${hashValue}`;
173175

174176
try {
175-
if (!(await this.client.objectExists(bucket, key, { abortSignal }))) {
176-
await this.client.putObject(bucket, key, payloadBytes, { abortSignal });
177+
const dedupeKey = `${bucket} ${key}`;
178+
let upload = uploads.get(dedupeKey);
179+
if (!upload) {
180+
upload = this.uploadIfAbsent(bucket, key, payloadBytes, abortSignal);
181+
uploads.set(dedupeKey, upload);
177182
}
183+
await upload;
178184
} catch (err) {
179185
throw new Error(
180186
`S3StorageDriver store failed [bucket=${bucket}, key=${key}${formatClientContext(this.client)}]`,
@@ -185,6 +191,17 @@ export class S3StorageDriver implements StorageDriver {
185191
return new StorageDriverClaim({ bucket, key, hashAlgorithm: 'sha256', hashValue });
186192
}
187193

194+
private async uploadIfAbsent(
195+
bucket: string,
196+
key: string,
197+
payloadBytes: Uint8Array,
198+
abortSignal: AbortSignal
199+
): Promise<void> {
200+
if (!(await this.client.objectExists(bucket, key, { abortSignal }))) {
201+
await this.client.putObject(bucket, key, payloadBytes, { abortSignal });
202+
}
203+
}
204+
188205
private async retrievePayload(claim: StorageDriverClaim, abortSignal: AbortSignal): Promise<Payload> {
189206
const { bucket, key, hashAlgorithm, hashValue: expectedHash } = claim.claimData;
190207
if (!bucket || !key) {

0 commit comments

Comments
 (0)