Skip to content

Commit e349fe0

Browse files
authored
Pipelines: R2 Data Catalog sinks enforce minimum write interval of 60 seconds (#13951)
1 parent 62abf97 commit e349fe0

5 files changed

Lines changed: 63 additions & 5 deletions

File tree

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
---
2+
"wrangler": patch
3+
---
4+
5+
Enforce minimum 60 second interval for R2 Data Catalog sinks
6+
7+
R2 Data Catalog sinks now require a minimum `--roll-interval` of 60 seconds to prevent compaction issues in the R2 Data Catalog. This validation is applied when creating sinks via `wrangler pipelines sinks create` with type `r2-data-catalog`, and during the interactive `wrangler pipelines setup` flow.
8+
9+
Regular R2 sinks are not affected and can still use intervals as low as 10 seconds.

packages/wrangler/src/__tests__/pipelines.test.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1720,6 +1720,34 @@ describe("wrangler pipelines", () => {
17201720
`[Error: --namespace is required for r2-data-catalog sinks]`
17211721
);
17221722
});
1723+
1724+
it("should error when r2-data-catalog has interval below minimum", async ({
1725+
expect,
1726+
}) => {
1727+
await expect(
1728+
runWrangler(
1729+
"pipelines sinks create my_sink --type r2-data-catalog --bucket catalog-bucket --namespace default --table my-table --catalog-token token123 --roll-interval 30"
1730+
)
1731+
).rejects.toThrowErrorMatchingInlineSnapshot(
1732+
`[Error: Pipeline frequency must be at least 60 seconds for R2 Data Catalog sinks to prevent compaction issues. Current value: 30 seconds.]`
1733+
);
1734+
});
1735+
1736+
it("should allow r2 sinks with interval below 60 seconds", async ({
1737+
expect,
1738+
}) => {
1739+
const createRequest = mockCreateSinkRequest(expect, {
1740+
name: "my_sink",
1741+
type: "r2",
1742+
});
1743+
1744+
await runWrangler(
1745+
"pipelines sinks create my_sink --type r2 --bucket my-bucket --access-key-id mykey --secret-access-key mysecret --roll-interval 30"
1746+
);
1747+
1748+
expect(createRequest.count).toBe(1);
1749+
expect(std.err).toMatchInlineSnapshot(`""`);
1750+
});
17231751
});
17241752

17251753
describe("pipelines sinks list", () => {

packages/wrangler/src/pipelines/cli/setup.ts

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -191,10 +191,14 @@ async function promptCompression(): Promise<string> {
191191
});
192192
}
193193

194-
async function promptRollingPolicy(): Promise<{
194+
async function promptRollingPolicy(options?: {
195+
minIntervalSeconds?: number;
196+
}): Promise<{
195197
fileSizeBytes: number;
196198
intervalSeconds: number;
197199
}> {
200+
const minInterval = options?.minIntervalSeconds ?? 10;
201+
198202
const fileSizeMB = await promptWithRetry(
199203
() => "File size",
200204
() =>
@@ -214,13 +218,13 @@ async function promptRollingPolicy(): Promise<{
214218
const intervalSeconds = await promptWithRetry(
215219
() => "Interval",
216220
() =>
217-
prompt("Roll file when time reaches (seconds, minimum 10):", {
221+
prompt(`Roll file when time reaches (seconds, minimum ${minInterval}):`, {
218222
defaultValue: String(SINK_DEFAULTS.rolling_policy.interval_seconds),
219223
}),
220224
(value) => {
221225
const num = parseInt(value, 10);
222-
if (isNaN(num) || num < 10) {
223-
throw new UserError("Interval must be a number >= 10", {
226+
if (isNaN(num) || num < minInterval) {
227+
throw new UserError(`Interval must be a number >= ${minInterval}`, {
224228
telemetryMessage: "pipelines setup invalid interval",
225229
});
226230
}
@@ -960,7 +964,10 @@ async function setupDataCatalogSink(
960964
const token = await promptCatalogToken(config, accountId, bucket);
961965

962966
const compression = await promptCompression();
963-
const rollingPolicy = await promptRollingPolicy();
967+
// R2 Data Catalog sinks require minimum 60 second intervals to prevent compaction issues
968+
const rollingPolicy = await promptRollingPolicy({
969+
minIntervalSeconds: SINK_DEFAULTS.rolling_policy.min_interval_seconds,
970+
});
964971

965972
setupConfig.sinkConfig = {
966973
name: setupConfig.sinkName,

packages/wrangler/src/pipelines/cli/sinks/create.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,19 @@ export const pipelinesSinksCreateCommand = createCommand({
144144
{ telemetryMessage: "pipelines sinks create invalid format" }
145145
);
146146
}
147+
// Enforce minimum interval for R2 Data Catalog to prevent compaction issues
148+
if (
149+
args.rollInterval !== undefined &&
150+
args.rollInterval < SINK_DEFAULTS.rolling_policy.min_interval_seconds
151+
) {
152+
throw new CommandLineArgsError(
153+
`Pipeline frequency must be at least ${SINK_DEFAULTS.rolling_policy.min_interval_seconds} seconds for R2 Data Catalog sinks to prevent compaction issues. Current value: ${args.rollInterval} seconds.`,
154+
{
155+
telemetryMessage:
156+
"pipelines r2 data catalog interval below minimum threshold",
157+
}
158+
);
159+
}
147160
}
148161
},
149162
async handler(args, { config }) {

packages/wrangler/src/pipelines/defaults.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ export const SINK_DEFAULTS = {
99
rolling_policy: {
1010
file_size_bytes: undefined,
1111
interval_seconds: 300,
12+
min_interval_seconds: 60,
1213
},
1314
r2: {
1415
path: "",

0 commit comments

Comments
 (0)