Skip to content

Commit 1a5c7fb

Browse files
authored
fix: prevent concurrent publish from producing empty files (#428)
## Summary Fixes a bug where two concurrent `mops publish` sessions for the same `package@version` could produce empty files in storage. This happened with `core@2.3.0` where 3 files (`src/Queue.mo`, `src/Result.mo`, `src/Runtime.mo`) were stored with empty chunk data. **Root cause**: the storage canister's `startUpload` overwrites active uploads via `put()`, so a second session's `startUpload` resets chunks the first session had already filled. `finishUploads` then committed the empty chunks. **Fix (defense in depth):** - **Storage canister**: validate all chunks are non-empty in `finishUploads` before committing to permanent storage - **CLI `parallel.ts`**: propagate errors from concurrent tasks instead of silently swallowing rejections (and keep `busyThreads` balanced on failure) ## Test plan - [x] New test file `storage-publish-bug.test.mo` covers: - Idempotent `startUpload` reset + `finishUploads` catching empty chunks - Successful retry after reset with fresh data - Partial chunk upload rejection - Normal multi-chunk upload regression - `chunkCount=0` (legitimately empty files) regression - [x] Updated existing `storage.test.mo` and `storage-actor.test.mo` to upload a chunk before `finishUploads` (required by new validation) - [x] All 17 test files pass locally; `npm run lint` + `npm run format:check` pass
1 parent 19e38c1 commit 1a5c7fb

7 files changed

Lines changed: 230 additions & 8 deletions

File tree

backend/storage/storage-canister.mo

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,12 @@ shared ({ caller = parent }) persistent actor class Storage() {
9494
if (Option.isNull(activeUploadsMeta.get(fileId))) {
9595
return #err("File '" # fileId # "' is not uploading");
9696
};
97-
if (Option.isNull(activeUploadsChunks.get(fileId))) {
98-
return #err("File '" # fileId # "' is not uploading");
97+
let ?chunks = activeUploadsChunks.get(fileId) else return #err("File '" # fileId # "' is not uploading");
98+
99+
for (i in chunks.keys()) {
100+
if (chunks[i].size() == 0) {
101+
return #err("File '" # fileId # "' has empty chunk at index " # Nat.toText(i));
102+
};
99103
};
100104
};
101105

cli/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
## Next
44
- Support `MOPS_REGISTRY_HOST` and `MOPS_REGISTRY_CANISTER_ID` environment variables for custom registry endpoints
55
- Fix `mops build` crashing with `__wbindgen_malloc` error in bundled CLI distribution
6+
- Fix `parallel()` swallowing errors from concurrent tasks (e.g. `mops publish` uploads), which could hang or leave failures unreported
67

78
## 2.4.0
89
- Support `[build].outputDir` config in `mops.toml` for custom build output directory

cli/parallel.ts

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,15 @@ export async function parallel<T>(
33
items: T[],
44
fn: (item: T) => Promise<void>,
55
) {
6-
return new Promise<void>((resolve) => {
6+
return new Promise<void>((resolve, reject) => {
77
let busyThreads = 0;
8+
let failed = false;
89
items = items.slice();
910

1011
let loop = () => {
12+
if (failed) {
13+
return;
14+
}
1115
if (!items.length) {
1216
if (busyThreads === 0) {
1317
resolve();
@@ -18,10 +22,17 @@ export async function parallel<T>(
1822
return;
1923
}
2024
busyThreads++;
21-
fn(items.shift() as T).then(() => {
22-
busyThreads--;
23-
loop();
24-
});
25+
fn(items.shift() as T).then(
26+
() => {
27+
busyThreads--;
28+
loop();
29+
},
30+
(err) => {
31+
busyThreads--;
32+
failed = true;
33+
reject(err);
34+
},
35+
);
2536
loop();
2637
};
2738
loop();

cli/tests/build.test.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { describe, expect, test } from "@jest/globals";
1+
import { describe, expect, jest, test } from "@jest/globals";
22
import { execa } from "execa";
33
import { existsSync, rmSync } from "node:fs";
44
import path from "path";
@@ -14,6 +14,9 @@ function cleanFixture(cwd: string, ...extras: string[]) {
1414
}
1515

1616
describe("build", () => {
17+
// Several dfx/pocket-ic builds per test; slow CI (e.g. node 20 matrix) can exceed 60s default.
18+
jest.setTimeout(120_000);
19+
1720
test("ok", async () => {
1821
const cwd = path.join(import.meta.dirname, "build/success");
1922
try {

test/storage-actor.test.mo

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,13 @@ actor {
4141
},
4242
);
4343

44+
await test(
45+
"upload chunk",
46+
func() : async () {
47+
assert Result.isOk(await storage.uploadChunk(fileId, 0, Blob.fromArray([1, 2, 3])));
48+
},
49+
);
50+
4451
await test(
4552
"try to finish upload with unknown file id",
4653
func() : async () {

test/storage-publish-bug.test.mo

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
import Result "mo:base/Result";
2+
import Blob "mo:base/Blob";
3+
import { test; suite } "mo:test/async";
4+
5+
import Storage "../backend/storage/storage-canister";
6+
7+
var storage = await Storage.Storage();
8+
9+
// PRECONDITION: startUpload is idempotent — resets active upload (prevents stale data)
10+
await suite(
11+
"PRECONDITION: startUpload reset + finishUploads rejects empty chunks",
12+
func() : async () {
13+
let fileId = "core@2.3.0/src/Runtime.mo";
14+
let realData = Blob.fromArray([1, 2, 3, 4, 5, 6, 7, 8]);
15+
16+
await test(
17+
"start upload and fill chunk",
18+
func() : async () {
19+
assert Result.isOk(await storage.startUpload({ id = fileId; path = "src/Runtime.mo"; chunkCount = 1; owners = [] }));
20+
assert Result.isOk(await storage.uploadChunk(fileId, 0, realData));
21+
},
22+
);
23+
24+
await test(
25+
"second startUpload resets the active upload (idempotent)",
26+
func() : async () {
27+
assert Result.isOk(await storage.startUpload({ id = fileId; path = "src/Runtime.mo"; chunkCount = 1; owners = [] }));
28+
},
29+
);
30+
31+
await test(
32+
"finishUploads rejects because reset cleared the chunk data",
33+
func() : async () {
34+
assert Result.isErr(await storage.finishUploads([fileId]));
35+
},
36+
);
37+
},
38+
);
39+
40+
// PRECONDITION: idempotent startUpload allows a clean retry to succeed
41+
await suite(
42+
"PRECONDITION: retry after reset succeeds with fresh data",
43+
func() : async () {
44+
storage := await Storage.Storage();
45+
let fileId = "core@2.3.0/src/Runtime.mo";
46+
let staleData = Blob.fromArray([1, 2, 3]);
47+
let freshData = Blob.fromArray([10, 20, 30, 40, 50]);
48+
49+
await test(
50+
"stale session: start and upload",
51+
func() : async () {
52+
assert Result.isOk(await storage.startUpload({ id = fileId; path = "src/Runtime.mo"; chunkCount = 1; owners = [] }));
53+
assert Result.isOk(await storage.uploadChunk(fileId, 0, staleData));
54+
},
55+
);
56+
57+
await test(
58+
"retry session: startUpload resets, then upload fresh data",
59+
func() : async () {
60+
assert Result.isOk(await storage.startUpload({ id = fileId; path = "src/Runtime.mo"; chunkCount = 1; owners = [] }));
61+
assert Result.isOk(await storage.uploadChunk(fileId, 0, freshData));
62+
},
63+
);
64+
65+
await test(
66+
"finishUploads succeeds with fresh data",
67+
func() : async () {
68+
assert Result.isOk(await storage.finishUploads([fileId]));
69+
},
70+
);
71+
72+
await test(
73+
"downloaded file has the fresh data",
74+
func() : async () {
75+
let chunkRes = await storage.downloadChunk(fileId, 0);
76+
switch (chunkRes) {
77+
case (#ok(chunk)) {
78+
assert chunk == freshData;
79+
};
80+
case (#err(_)) {
81+
assert false;
82+
};
83+
};
84+
},
85+
);
86+
},
87+
);
88+
89+
// FIX VERIFICATION: finishUploads rejects files with empty chunks
90+
await suite(
91+
"FIX: finishUploads rejects empty chunks",
92+
func() : async () {
93+
storage := await Storage.Storage();
94+
let fileId = "pkg@1.0.0/src/Lib.mo";
95+
96+
await test(
97+
"start upload with chunkCount=2 but upload only chunk 0",
98+
func() : async () {
99+
assert Result.isOk(await storage.startUpload({ id = fileId; path = "src/Lib.mo"; chunkCount = 2; owners = [] }));
100+
assert Result.isOk(await storage.uploadChunk(fileId, 0, Blob.fromArray([10, 20, 30])));
101+
},
102+
);
103+
104+
await test(
105+
"finishUploads is rejected because chunk 1 was never uploaded",
106+
func() : async () {
107+
let res = await storage.finishUploads([fileId]);
108+
assert Result.isErr(res);
109+
},
110+
);
111+
},
112+
);
113+
114+
// REGRESSION: normal upload flow still works
115+
await suite(
116+
"REGRESSION: normal upload flow",
117+
func() : async () {
118+
storage := await Storage.Storage();
119+
let fileId = "pkg@2.0.0/src/Main.mo";
120+
let data1 = Blob.fromArray([10, 20, 30, 40, 50]);
121+
let data2 = Blob.fromArray([60, 70, 80]);
122+
123+
await test(
124+
"upload file with 2 chunks",
125+
func() : async () {
126+
assert Result.isOk(await storage.startUpload({ id = fileId; path = "src/Main.mo"; chunkCount = 2; owners = [] }));
127+
assert Result.isOk(await storage.uploadChunk(fileId, 0, data1));
128+
assert Result.isOk(await storage.uploadChunk(fileId, 1, data2));
129+
},
130+
);
131+
132+
await test(
133+
"finishUploads succeeds when all chunks are present",
134+
func() : async () {
135+
assert Result.isOk(await storage.finishUploads([fileId]));
136+
},
137+
);
138+
139+
await test(
140+
"downloaded chunks have correct data",
141+
func() : async () {
142+
let c0 = await storage.downloadChunk(fileId, 0);
143+
switch (c0) {
144+
case (#ok(chunk)) { assert chunk == data1 };
145+
case (#err(_)) { assert false };
146+
};
147+
let c1 = await storage.downloadChunk(fileId, 1);
148+
switch (c1) {
149+
case (#ok(chunk)) { assert chunk == data2 };
150+
case (#err(_)) { assert false };
151+
};
152+
},
153+
);
154+
},
155+
);
156+
157+
// REGRESSION: empty files (chunkCount=0) still work
158+
await suite(
159+
"REGRESSION: empty files (chunkCount=0) are allowed",
160+
func() : async () {
161+
storage := await Storage.Storage();
162+
let fileId = "pkg@3.0.0/src/Empty.mo";
163+
164+
await test(
165+
"start upload with chunkCount=0",
166+
func() : async () {
167+
assert Result.isOk(await storage.startUpload({ id = fileId; path = "src/Empty.mo"; chunkCount = 0; owners = [] }));
168+
},
169+
);
170+
171+
await test(
172+
"finishUploads succeeds for empty files",
173+
func() : async () {
174+
assert Result.isOk(await storage.finishUploads([fileId]));
175+
},
176+
);
177+
178+
await test(
179+
"file meta reports 0 chunks",
180+
func() : async () {
181+
let res = await storage.getFileMeta(fileId);
182+
switch (res) {
183+
case (#ok(meta)) { assert meta.chunkCount == 0 };
184+
case (#err(_)) { assert false };
185+
};
186+
},
187+
);
188+
},
189+
);

test/storage.test.mo

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,13 @@ await suite(
3333
},
3434
);
3535

36+
await test(
37+
"upload chunk",
38+
func() : async () {
39+
assert Result.isOk(await storage.uploadChunk(fileId, 0, Blob.fromArray([1, 2, 3])));
40+
},
41+
);
42+
3643
await test(
3744
"try to finish upload with unknown file id",
3845
func() : async () {

0 commit comments

Comments
 (0)