Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -319,13 +319,30 @@ brightdata scraper create <url> <description> [options]
| `--name <name>` | Scraper template name (default: `cli-scraper-<timestamp>`) |
| `--deliver-webhook <url>` | Webhook URL for the deliver stub (default: `https://example.com/webhook`) |
| `--timeout <seconds>` | Polling timeout in seconds (default: `600`) |
| `--max-retries <n>` | Max retries when the AI-Flow concurrent-job cap is hit (HTTP 429) (default: `4`). See below. |
| `--no-retry` | Fail immediately on 429 instead of waiting. Same as `--max-retries 0`. |
| `-o, --output <path>` | Write output to file |
| `--json` / `--pretty` | JSON output (raw / indented) |
| `--timing` | Show request timing |
| `-k, --api-key <key>` | Override API key |

> **Note:** The scraper is created with a placeholder webhook delivery target (`https://example.com/webhook`). You can reconfigure the actual delivery endpoint in the [Bright Data web UI](https://brightdata.com/cp/scrapers) after creation.

#### Concurrent-job cap & auto-backoff

Bright Data's AI-Flow caps the number of concurrent `scraper create` generations per account (currently 3). If you exceed the cap, the API returns `429 Cannot run more than N jobs in parallel`. The CLI handles this automatically: it waits with exponential backoff plus jitter and retries up to `--max-retries` times (default: 4). While waiting, the CLI prints status lines so you know it isn't hung:

```
Triggering AI generation...
Hit AI-Flow concurrent-job cap (429). Waiting 32s before retry 1/4...
Hit AI-Flow concurrent-job cap (429). Waiting 67s before retry 2/4...
Generating scraper...
```

If the cap is still hit after all retries, the CLI exits with a stderr note pointing at the half-built collector's dashboard URL so you can inspect or delete it manually (Bright Data does not yet expose programmatic collector deletion).

Use `--no-retry` if you want the old fail-fast behavior — typically for scripts that prefer to handle backoff themselves.

**Examples**

```bash
Expand All @@ -338,6 +355,17 @@ brightdata scraper create https://example.com/product/1 \
"Extract title, price, and image URL from this product page" \
--name my-product-scraper --pretty -o scraper-output.json

# Fan out parallel creates (one per URL in urls.txt) — the CLI throttles itself to the concurrent-job cap via 429 backoff
for url in $(cat urls.txt); do
brightdata scraper create "$url" "Extract title, price, ..." \
--name "scraper-$(basename $url)" &
done; wait

# Disable the auto-backoff (fail fast on 429)
brightdata scraper create https://example.com/product/1 \
"Extract title, price, and image URL from this product page" \
--no-retry

# Use a custom webhook delivery URL
brightdata scraper create https://example.com/product/1 \
"Extract title, price, and image URL from this product page" \
Expand Down
230 changes: 228 additions & 2 deletions src/__tests__/commands/scraper.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ import {
parse_sync_timeout,
is_realtime_page_limit_error,
classify_dataset,
parse_max_retries,
build_ai_trigger_retry,
print_stub_recovery_note,
AI_TRIGGER_DEFAULT_RETRIES,
AI_TRIGGER_RETRY_BASE_MS,
AI_TRIGGER_RETRY_MAX_MS,
} from '../../commands/scraper';

describe('commands/scraper', ()=>{
Expand Down Expand Up @@ -176,15 +182,15 @@ describe('commands/scraper', ()=>{
expect.objectContaining({
deliver: expect.objectContaining({type: 'webhook'}),
}),
{timing: undefined}
expect.objectContaining({timing: undefined})
);
expect(mocks.post).toHaveBeenNthCalledWith(
2,
'api_key',
'/dca/collectors/c_abc/automate_template',
{description: 'extract title',
urls: ['https://example.com/p/1']},
{timing: undefined}
expect.objectContaining({timing: undefined})
);
expect(mocks.poll_until).toHaveBeenCalledTimes(1);
expect(mocks.poll_until).toHaveBeenCalledWith(
Expand Down Expand Up @@ -624,4 +630,224 @@ describe('commands/scraper', ()=>{
);
});
});

// PR-11: AI-Flow concurrent-job cap (429) auto-backoff. The
// mechanism in client.ts is generic; these tests assert the
// scraper command wires it with the right config + opts.
// Exercises the --max-retries option parser: a missing value falls back
// to the AI-trigger default, and anything other than a non-negative
// integer string is expected to throw.
describe('parse_max_retries (PR-11)', ()=>{
    it('defaults to the AI-trigger default when undefined', ()=>{
        const parsed = parse_max_retries(undefined);
        expect(parsed).toBe(AI_TRIGGER_DEFAULT_RETRIES);
    });

    it('parses a non-negative integer', ()=>{
        // [raw CLI string, expected parsed value]
        const accepted: Array<[string, number]> = [
            ['0', 0],
            ['1', 1],
            ['8', 8],
        ];
        for (const [raw, want] of accepted)
            expect(parse_max_retries(raw)).toBe(want);
    });

    it('rejects negatives, floats, and non-numeric', ()=>{
        const rejected = ['-1', '1.5', 'abc'];
        for (const raw of rejected)
            expect(()=>parse_max_retries(raw)).toThrow(/non-negative/);
    });
});

// Covers the retry config that build_ai_trigger_retry derives from the
// CLI options, plus the stderr wording produced by its on_retry callback.
describe('build_ai_trigger_retry (PR-11)', ()=>{
    // Runs `run` with console.error silenced and returns everything it
    // logged (first argument of each call, newline-joined). The spy is
    // restored in `finally`, so a throwing callback or a failing
    // assertion afterwards can never leave console.error mocked for the
    // tests that follow.
    const capture_stderr = (run: ()=>void): string=>{
        const spy = vi.spyOn(console, 'error')
            .mockImplementation(()=>{});
        try {
            run();
            return spy.mock.calls.map(c=>String(c[0])).join('\n');
        } finally {
            spy.mockRestore();
        }
    };

    it('returns the default schedule when no flags are set', ()=>{
        const cfg = build_ai_trigger_retry({});
        expect(cfg).toBeDefined();
        expect(cfg!.max_attempts).toBe(AI_TRIGGER_DEFAULT_RETRIES);
        expect(cfg!.base_ms).toBe(AI_TRIGGER_RETRY_BASE_MS);
        expect(cfg!.max_ms).toBe(AI_TRIGGER_RETRY_MAX_MS);
        expect(typeof cfg!.on_retry).toBe('function');
    });

    it('--max-retries overrides max_attempts', ()=>{
        const cfg = build_ai_trigger_retry({maxRetries: '8'});
        expect(cfg!.max_attempts).toBe(8);
        // Base/ceiling stay at the AI-Flow values.
        expect(cfg!.base_ms).toBe(AI_TRIGGER_RETRY_BASE_MS);
    });

    it('--no-retry disables retries entirely', ()=>{
        const cfg = build_ai_trigger_retry({noRetry: true});
        expect(cfg!.max_attempts).toBe(0);
    });

    it('on_retry emits a 429-specific stderr line when status=429',
    ()=>{
        // The config is built inside the capture so the spy is already
        // installed when build_ai_trigger_retry runs.
        const msg = capture_stderr(()=>{
            const cfg = build_ai_trigger_retry({});
            cfg!.on_retry!({
                attempt: 1, max_attempts: 4,
                delay_ms: 32_000, status: 429,
            });
        });
        expect(msg).toMatch(/AI-Flow concurrent-job cap/);
        expect(msg).toMatch(/32s/);
        expect(msg).toMatch(/retry 1\/4/);
    });

    it('on_retry emits a generic transient line for non-429', ()=>{
        const msg = capture_stderr(()=>{
            const cfg = build_ai_trigger_retry({});
            cfg!.on_retry!({
                attempt: 2, max_attempts: 4,
                delay_ms: 60_000, status: 503,
            });
        });
        expect(msg).toMatch(/Transient error/);
        expect(msg).toMatch(/status 503/);
    });

    it('on_retry handles network-error case (status=0)', ()=>{
        const msg = capture_stderr(()=>{
            const cfg = build_ai_trigger_retry({});
            cfg!.on_retry!({
                attempt: 1, max_attempts: 4,
                delay_ms: 30_000, status: 0,
            });
        });
        expect(msg).toMatch(/status network/);
    });
});

// Checks the stderr note printed for a half-built collector: it must
// name the collector, link to its dashboard page, and stay silent when
// no collector id is available.
describe('print_stub_recovery_note (PR-11)', ()=>{
    it('prints a dashboard URL and a manual-deletion notice', ()=>{
        const error = vi.spyOn(console, 'error')
            .mockImplementation(()=>{});
        // try/finally: a failing expectation must not leave console.error
        // mocked for subsequent tests.
        try {
            print_stub_recovery_note('c_xyz');
            const msg = error.mock.calls.map(c=>String(c[0])).join('\n');
            expect(msg).toContain('c_xyz');
            expect(msg).toMatch(
                /https:\/\/brightdata\.com\/cp\/scrapers\/c_xyz/);
            expect(msg).toMatch(/inspect or delete it manually/);
            expect(msg).toMatch(/does not yet expose programmatic/);
        } finally {
            error.mockRestore();
        }
    });

    it('does nothing when collector_id is empty', ()=>{
        const error = vi.spyOn(console, 'error')
            .mockImplementation(()=>{});
        try {
            print_stub_recovery_note('');
            expect(error).not.toHaveBeenCalled();
        } finally {
            error.mockRestore();
        }
    });
});

// Wiring tests: handle_create_scraper must hand the AI-trigger retry
// config to its second post() call, and must print the stub-recovery
// note on each of the three failure paths exercised below.
describe('handle_create_scraper retry + stub-nudge wiring (PR-11)',
()=>{
    // Arms the mocks for a run in which both post() calls succeed and
    // polling reports status 'done'.
    const arm_success = ()=>{
        mocks.post
            .mockResolvedValueOnce({id: 'c_abc'})
            .mockResolvedValueOnce({id: 'ia_xyz', queued: false});
        mocks.poll_until.mockResolvedValue({
            result: {status: 'done', completed_steps: []},
            attempts: 1,
        });
    };

    // Options object (4th argument) of the second post() call, i.e. the
    // AI-trigger request.
    const ai_trigger_opts = ()=>mocks.post.mock.calls[1][3] as
        {retry?: {max_attempts: number; base_ms: number}};

    // Runs the handler with process.exit and console.error stubbed and
    // returns the captured stderr text. Both spies are restored in
    // `finally` so a rejected handler or a later failing assertion cannot
    // leave process.exit / console.error mocked for other tests.
    const run_and_capture_stderr = async(): Promise<string>=>{
        const exit = vi.spyOn(process, 'exit')
            .mockImplementation(()=>undefined as never);
        const error = vi.spyOn(console, 'error')
            .mockImplementation(()=>{});
        try {
            await handle_create_scraper('https://x.com', 'd', {});
            return error.mock.calls.map(c=>String(c[0])).join('\n');
        } finally {
            exit.mockRestore();
            error.mockRestore();
        }
    };

    it('passes the AI-trigger retry config to the second post call',
    async()=>{
        arm_success();
        await handle_create_scraper('https://x.com', 'd', {});
        const opts = ai_trigger_opts();
        expect(opts.retry).toBeDefined();
        expect(opts.retry!.max_attempts)
            .toBe(AI_TRIGGER_DEFAULT_RETRIES);
        expect(opts.retry!.base_ms)
            .toBe(AI_TRIGGER_RETRY_BASE_MS);
    });

    it('honors --max-retries on the AI-trigger', async()=>{
        arm_success();
        await handle_create_scraper('https://x.com', 'd',
            {maxRetries: '10'});
        expect(ai_trigger_opts().retry!.max_attempts).toBe(10);
    });

    it('honors --no-retry (max_attempts = 0)', async()=>{
        arm_success();
        await handle_create_scraper('https://x.com', 'd',
            {noRetry: true});
        expect(ai_trigger_opts().retry!.max_attempts).toBe(0);
    });

    it('emits the stub-recovery note when AI-trigger ultimately fails',
    async()=>{
        // Stub creation succeeds, then the AI-trigger post rejects.
        mocks.post
            .mockResolvedValueOnce({id: 'c_stub'})
            .mockRejectedValueOnce(
                new Error('Cannot run more than 3 jobs in parallel'));
        const msg = await run_and_capture_stderr();
        expect(msg).toContain('c_stub');
        expect(msg).toMatch(/inspect or delete it manually/);
    });

    it('emits the stub-recovery note when poll status != done',
    async()=>{
        mocks.post
            .mockResolvedValueOnce({id: 'c_abc'})
            .mockResolvedValueOnce({id: 'ia_xyz', queued: false});
        mocks.poll_until.mockResolvedValue({
            result: {status: 'failed', completed_steps: []},
            attempts: 2,
        });
        const msg = await run_and_capture_stderr();
        expect(msg).toMatch(/inspect or delete it manually/);
        expect(msg).toContain('c_abc');
    });

    it('emits the stub-recovery note when polling itself throws',
    async()=>{
        mocks.post
            .mockResolvedValueOnce({id: 'c_abc'})
            .mockResolvedValueOnce({id: 'ia_xyz', queued: false});
        mocks.poll_until.mockRejectedValue(
            new Error('Timeout after 600 seconds'));
        const msg = await run_and_capture_stderr();
        expect(msg).toMatch(/inspect or delete it manually/);
    });
});
});
61 changes: 61 additions & 0 deletions src/__tests__/utils/client.retry.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import {describe, it, expect} from 'vitest';
import {
compute_backoff,
RETRY_BASE_MS,
RETRY_MAX_MS_DEFAULT,
} from '../../utils/client';

// PR-11: the per-request retry config + jitter is the mechanism that
// makes the AI-Flow concurrent-job cap (429) recoverable client-side.
// These tests cover the pure backoff math; the integration tests for
// the AI-trigger wiring live in commands/scraper.test.ts.

// Pure-math tests for compute_backoff, called here as
// compute_backoff(attempt, base, max). The assertions pin an
// "equal jitter" schedule: with exp = min(max, base * 2^attempt), every
// delay is expected to fall in [exp/2, exp].
describe('utils/client.compute_backoff', ()=>{
    it('grows exponentially with attempt', ()=>{
        // With base=1000, max=large: attempt 0 → exp=1000 → range [500,1000]
        // attempt 1 → exp=2000 → range [1000,2000]; etc. Take min of many
        // samples so jitter doesn't flake the test.
        const sample_min = (attempt: number)=>{
            let m = Infinity;
            for (let i = 0; i < 50; i++)
            {
                const d = compute_backoff(attempt, 1000, 1_000_000);
                if (d < m) m = d;
            }
            return m;
        };
        expect(sample_min(0)).toBeGreaterThanOrEqual(500);
        expect(sample_min(1)).toBeGreaterThanOrEqual(1000);
        expect(sample_min(2)).toBeGreaterThanOrEqual(2000);
    });

    it('honors the max_ms ceiling', ()=>{
        // At attempt 20, base * 2^20 would be huge; max_ms should cap it.
        for (let i = 0; i < 50; i++)
        {
            const d = compute_backoff(20, 1000, 5000);
            expect(d).toBeLessThanOrEqual(5000);
        }
    });

    // Titled "equal-jitter" rather than "full-jitter": full jitter draws
    // from [0, exp], whereas the lower bound asserted here is exp/2.
    it('uses equal-jitter (delay falls in [exp/2, exp])', ()=>{
        // Sample many to ensure both halves of the range are reachable.
        const samples: number[] = [];
        for (let i = 0; i < 200; i++)
            samples.push(compute_backoff(0, 1000, 1_000_000));
        // exp = 1000, so range is [500, 1000].
        expect(Math.min(...samples)).toBeGreaterThanOrEqual(500);
        expect(Math.max(...samples)).toBeLessThanOrEqual(1000);
        // With 200 samples we'd expect both halves to be hit.
        const below_mid = samples.filter(d=>d < 750).length;
        const above_mid = samples.filter(d=>d >= 750).length;
        expect(below_mid).toBeGreaterThan(20);
        expect(above_mid).toBeGreaterThan(20);
    });

    it('exported defaults match the documented short schedule', ()=>{
        // Sanity: don't accidentally make every command wait minutes.
        expect(RETRY_BASE_MS).toBe(500);
        expect(RETRY_MAX_MS_DEFAULT).toBe(16_000);
    });
});
Loading