Skip to content

Commit 9812d38

Browse files
khaliqgantProactive Runtime Bot
andauthored
feat(cloud): upload workflow code through cloud storage API (#938)
Co-authored-by: Proactive Runtime Bot <agent@agent-relay.com>
1 parent cb8cfc0 commit 9812d38

2 files changed

Lines changed: 123 additions & 21 deletions

File tree

packages/cloud/src/workflows.test.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,74 @@ describe('runWorkflow code sync', () => {
425425
});
426426
expect((runBodies[0] as { paths?: unknown }).paths).toBeUndefined();
427427
});
428+
429+
it('uploads code through the cloud API when prepare returns cloud-api storage', async () => {
430+
await writeFile('README.md', 'cloud-api\n');
431+
const workflowPath = path.join(tmpRoot, 'workflow.yaml');
432+
await writeFile(
433+
workflowPath,
434+
['version: "1.0"', 'name: cloud-api', 'swarm:', ' pattern: dag', 'agents: []', 'workflows: []'].join(
435+
'\n'
436+
)
437+
);
438+
439+
const runBodies: unknown[] = [];
440+
const uploadPaths: string[] = [];
441+
authorizedApiFetchMock.mockImplementation(async (_auth, requestPath, init) => {
442+
if (requestPath === '/api/v1/workflows/prepare') {
443+
return {
444+
auth: { accessToken: 'token' },
445+
response: new Response(
446+
JSON.stringify({
447+
runId: 'run-1',
448+
s3Credentials: {
449+
...s3Credentials,
450+
backend: 'cloud-api',
451+
cloudApiUrl: 'https://agentrelay.com/cloud',
452+
cloudApiAccessToken: 'token',
453+
},
454+
s3CodeKey: 'code.tar.gz',
455+
workflowStorage: { backend: 'cloud-api' },
456+
}),
457+
{ status: 200, headers: { 'Content-Type': 'application/json' } }
458+
),
459+
};
460+
}
461+
if (requestPath === '/api/v1/workflows/runs/run-1/storage/code.tar.gz') {
462+
uploadPaths.push(requestPath);
463+
expect(init?.method).toBe('PUT');
464+
expect(init?.headers).toMatchObject({ 'content-type': 'application/gzip' });
465+
expect(init?.body).toBeInstanceOf(Buffer);
466+
return {
467+
auth: { accessToken: 'token' },
468+
response: new Response(JSON.stringify({ ok: true }), {
469+
status: 200,
470+
headers: { 'Content-Type': 'application/json' },
471+
}),
472+
};
473+
}
474+
if (requestPath === '/api/v1/workflows/run') {
475+
runBodies.push(JSON.parse(String(init?.body)));
476+
return {
477+
auth: { accessToken: 'token' },
478+
response: new Response(JSON.stringify({ runId: 'run-1', status: 'pending' }), {
479+
status: 200,
480+
headers: { 'Content-Type': 'application/json' },
481+
}),
482+
};
483+
}
484+
throw new Error(`unexpected request: ${requestPath}`);
485+
});
486+
487+
await runWorkflow(workflowPath);
488+
489+
expect(s3SendMock).not.toHaveBeenCalled();
490+
expect(uploadPaths).toEqual(['/api/v1/workflows/runs/run-1/storage/code.tar.gz']);
491+
expect(runBodies[0]).toMatchObject({
492+
runId: 'run-1',
493+
s3CodeKey: 'code.tar.gz',
494+
});
495+
});
428496
});
429497

430498
describe('workflow schedules', () => {

packages/cloud/src/workflows.ts

Lines changed: 55 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,23 @@ type ResolvedWorkflowInput = {
2525
};
2626

2727
type S3Credentials = {
28+
backend?: 's3' | 'cloud-api';
2829
accessKeyId: string;
2930
secretAccessKey: string;
3031
sessionToken: string;
3132
bucket: string;
3233
prefix: string;
34+
cloudApiUrl?: string;
35+
cloudApiAccessToken?: string;
3336
};
3437

3538
type PrepareWorkflowResponse = {
3639
runId: string;
3740
s3Credentials: S3Credentials;
3841
s3CodeKey: string;
42+
workflowStorage?: {
43+
backend?: 's3' | 'cloud-api';
44+
};
3945
};
4046

4147
type WorkflowPathDefinition = {
@@ -525,7 +531,40 @@ export async function runWorkflow(
525531
const prepared = prepPayload;
526532
console.error(` Prepared in ${((Date.now() - t0) / 1000).toFixed(1)}s`);
527533

528-
const s3Client = createScopedS3Client(prepared.s3Credentials);
534+
let s3Client: S3Client | null = null;
535+
const uploadCodeObject = async (objectKey: string, tarball: Buffer) => {
536+
if (isCloudApiWorkflowStorage(prepared)) {
537+
const { response, auth: uploadAuth } = await authorizedApiFetch(
538+
auth,
539+
workflowStorageObjectPath(prepared.runId, objectKey),
540+
{
541+
method: 'PUT',
542+
headers: {
543+
'content-type': 'application/gzip',
544+
accept: 'application/json',
545+
},
546+
body: tarball as unknown as BodyInit,
547+
}
548+
);
549+
auth = uploadAuth;
550+
const payload = await readJsonResponse(response);
551+
if (!response.ok) {
552+
throw new Error(`Workflow storage upload failed: ${describeResponseError(response, payload)}`);
553+
}
554+
return;
555+
}
556+
557+
s3Client ??= createScopedS3Client(prepared.s3Credentials);
558+
const key = scopedCodeKey(prepared.s3Credentials.prefix, objectKey);
559+
await s3Client.send(
560+
new PutObjectCommand({
561+
Bucket: prepared.s3Credentials.bucket,
562+
Key: key,
563+
Body: tarball,
564+
ContentType: 'application/gzip',
565+
})
566+
);
567+
};
529568
requestBody.runId = prepared.runId;
530569

531570
const declaredPaths = parseWorkflowPaths(input.workflow, input.fileType);
@@ -552,15 +591,7 @@ export async function runWorkflow(
552591
);
553592

554593
const t2 = Date.now();
555-
const key = scopedCodeKey(prepared.s3Credentials.prefix, s3CodeKey);
556-
await s3Client.send(
557-
new PutObjectCommand({
558-
Bucket: prepared.s3Credentials.bucket,
559-
Key: key,
560-
Body: tarball,
561-
ContentType: 'application/gzip',
562-
})
563-
);
594+
await uploadCodeObject(s3CodeKey, tarball);
564595
console.error(` ${pathDef.name}: uploaded in ${((Date.now() - t2) / 1000).toFixed(1)}s`);
565596

566597
const repo = parseGitHubRemoteForPath(absolutePath);
@@ -595,16 +626,8 @@ export async function runWorkflow(
595626
);
596627

597628
const t2 = Date.now();
598-
console.error('Uploading to S3...');
599-
const key = scopedCodeKey(prepared.s3Credentials.prefix, prepared.s3CodeKey);
600-
await s3Client.send(
601-
new PutObjectCommand({
602-
Bucket: prepared.s3Credentials.bucket,
603-
Key: key,
604-
Body: tarball,
605-
ContentType: 'application/gzip',
606-
})
607-
);
629+
console.error('Uploading to workflow storage...');
630+
await uploadCodeObject(prepared.s3CodeKey, tarball);
608631
console.error(` Uploaded in ${((Date.now() - t2) / 1000).toFixed(1)}s`);
609632

610633
requestBody.s3CodeKey = prepared.s3CodeKey;
@@ -981,10 +1004,21 @@ function isPrepareWorkflowResponse(payload: unknown): payload is PrepareWorkflow
9811004
typeof creds.secretAccessKey === 'string' &&
9821005
typeof creds.sessionToken === 'string' &&
9831006
typeof creds.bucket === 'string' &&
984-
typeof creds.prefix === 'string'
1007+
typeof creds.prefix === 'string' &&
1008+
(creds.backend === undefined || creds.backend === 's3' || creds.backend === 'cloud-api')
9851009
);
9861010
}
9871011

1012+
function isCloudApiWorkflowStorage(prepared: PrepareWorkflowResponse): boolean {
1013+
return prepared.workflowStorage?.backend === 'cloud-api' || prepared.s3Credentials.backend === 'cloud-api';
1014+
}
1015+
1016+
function workflowStorageObjectPath(runId: string, objectKey: string): string {
1017+
const encodedRunId = encodeURIComponent(runId);
1018+
const encodedKey = objectKey.split('/').map(encodeURIComponent).join('/');
1019+
return `/api/v1/workflows/runs/${encodedRunId}/storage/${encodedKey}`;
1020+
}
1021+
9881022
function createScopedS3Client(s3Credentials: S3Credentials): S3Client {
9891023
return new S3Client({
9901024
region: process.env.AWS_REGION ?? process.env.AWS_DEFAULT_REGION ?? 'us-east-1',

0 commit comments

Comments
 (0)