Skip to content

Commit 42c9273

Browse files
Add Composer Airflow class. Update GoogleCloud.getCloudLoggingLogs, add executeBqCommand
1 parent 2b4f3cc commit 42c9273

File tree

10 files changed

+336
-65
lines changed

10 files changed

+336
-65
lines changed

shared/files/fileSystemService.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,10 @@ export interface IFileSystemService {
154154
*/
155155
writeFile(filePath: string, contents: string): Promise<void>;
156156

157+
/**
158+
* Deletes a file.
159+
* @param filePath The file path (either full filesystem path or relative to current working directory)
160+
*/
157161
deleteFile(filePath: string): Promise<void>;
158162

159163
/**

src/cli/llmAliases.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { FastMediumLLM } from '#llm/multi-agent/fastMedium';
2+
import { openAIFlexGPT5Mini } from '#llm/multi-agent/openaiFlex';
23
import { MAD_Balanced, MAD_Fast, MAD_SOTA } from '#llm/multi-agent/reasoning-debate';
34
import { Claude4_1_Opus_Vertex } from '#llm/services/anthropic-vertex';
45
import { cerebrasQwen3_235b_Thinking, cerebrasQwen3_Coder } from '#llm/services/cerebras';
@@ -18,7 +19,7 @@ export const LLM_CLI_ALIAS: Record<string, () => LLM> = {
1819
cc: cerebrasQwen3_Coder,
1920
g5: openaiGPT5,
2021
g5p: openaiGPT5priority,
21-
g5f: openaiGPT5flex,
22+
g5mf: openAIFlexGPT5Mini,
2223
gpt5: openaiGPT5,
2324
g5m: openaiGPT5mini,
2425
g5n: openaiGPT5nano,

src/functions/cloud/google/bigquery.ts

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import { humanInTheLoop } from '#agent/autonomous/humanInTheLoop';
44
import { func, funcClass } from '#functionSchema/functionDecorators';
55
import { logger } from '#o11y/logger';
66
import { execCmd, execCommand, failOnError } from '#utils/exec';
7-
const Table = require('table');
87

98
// Should use either bq or the node library in all functions
109
@funcClass(__filename)
@@ -16,14 +15,10 @@ export class BigQuery {
1615
* @param projectId The Google Cloud project id to run the query from. Defaults to the GCLOUD_PROJECT environment variable
1716
*/
1817
@func()
19-
async query(sqlQuery: string, location: string, projectId: string | undefined): Promise<string> {
18+
async query(sqlQuery: string, location: string, projectId: string | undefined): Promise<any[][]> {
2019
projectId ??= process.env.GCLOUD_PROJECT;
2120
if (!projectId) throw new Error('GCLOUD_PROJECT environment variable not set');
22-
const result = await new BigQueryDriver(projectId, location).query(sqlQuery);
23-
if (result.length > 5001) {
24-
return `${result.substring(0, 5000)}\n<truncated>`;
25-
}
26-
return result;
21+
return await new BigQueryDriver(projectId, location).query(sqlQuery);
2722
}
2823

2924
/**
@@ -39,7 +34,7 @@ export class BigQuery {
3934
}
4035
}
4136

42-
class BigQueryDriver {
37+
export class BigQueryDriver {
4338
private bigqueryClient: BigQueryClient;
4439

4540
constructor(
@@ -49,10 +44,11 @@ class BigQueryDriver {
4944
this.bigqueryClient = new BigQueryClient({ projectId });
5045
}
5146

52-
async query<T>(query: string): Promise<string> {
47+
async query<T>(query: string, queryParameters?: Record<string, any>): Promise<any[][]> {
5348
const [dryRun] = await this.bigqueryClient.createQueryJob({
5449
query,
5550
location: this.defaultLocation,
51+
params: queryParameters,
5652
dryRun: true,
5753
});
5854

@@ -63,6 +59,7 @@ class BigQueryDriver {
6359
const [job] = await this.bigqueryClient.createQueryJob({
6460
query,
6561
location: this.defaultLocation,
62+
params: queryParameters,
6663
});
6764

6865
// Wait for the query to finish
@@ -78,9 +75,6 @@ class BigQueryDriver {
7875
const headers = Object.keys(rows[0]);
7976
tableData.unshift(headers);
8077

81-
// Create and print the table
82-
return Table.table(tableData, {
83-
columns: headers.map((header) => ({ alignment: 'left', width: 20 })),
84-
});
78+
return tableData;
8579
}
8680
}
Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
import { existsSync, readFile, readFileSync } from 'node:fs';
2+
import { join } from 'node:path';
3+
import axios, { AxiosInstance, AxiosResponse } from 'axios';
4+
import { GoogleAuth } from 'google-auth-library';
5+
import { func, funcClass } from '#functionSchema/functionDecorators';
6+
7+
const AUTH_SCOPE = 'https://www.googleapis.com/auth/cloud-platform';
8+
9+
interface DagConfig {
10+
[key: string]: any;
11+
}
12+
13+
interface DagRun {
14+
conf: any;
15+
dag_id: string;
16+
dag_run_id: string;
17+
data_interval_end: string;
18+
data_interval_start: string;
19+
end_date: string;
20+
execution_date: string;
21+
external_trigger: boolean;
22+
last_scheduling_decision: string;
23+
logical_date: string;
24+
note: string;
25+
run_type: string;
26+
start_date: string;
27+
state: string;
28+
}
29+
30+
interface TaskInstance {
31+
dag_id: string;
32+
dag_run_id: string;
33+
duration: number;
34+
end_date: string;
35+
execution_date: string;
36+
executor: string;
37+
executor_config: string;
38+
hostname: string;
39+
map_index: number;
40+
max_tries: number;
41+
note: string;
42+
operator: string;
43+
pid: number;
44+
pool: string;
45+
pool_slots: number;
46+
priority_weight: number;
47+
queue: string;
48+
queued_when: string;
49+
rendered_fields: any;
50+
rendered_map_index: number;
51+
sla_miss: any | null;
52+
start_date: string;
53+
state: string;
54+
task_display_name: string;
55+
task_id: string;
56+
trigger: any | null;
57+
triggerer_job: any | null;
58+
try_number: number;
59+
unixname: string;
60+
}
61+
62+
let airflowMapping: Record<string, string> | undefined;
63+
64+
/**
65+
* Required the file airflow.json to be present in the root of the project.
66+
* The file should contain a JSON object with the following format:
67+
* {
68+
* "gcpProjectId": "https://airflow.example.com"
69+
* }
70+
*/
71+
@funcClass(__filename)
72+
export class ComposerAirflowClient {
73+
private auth: GoogleAuth;
74+
private httpClient: AxiosInstance;
75+
76+
constructor() {
77+
// Initialize GoogleAuth client using Application Default Credentials (ADC)
78+
this.auth = new GoogleAuth({ scopes: [AUTH_SCOPE] });
79+
this.httpClient = axios.create({ timeout: 90000 });
80+
}
81+
82+
/**
83+
* Helper function to determine the Composer Airflow Web Server URL based on Google Cloud project ID.
84+
*/
85+
private getWebServerUrl(gcpProjectId: string): string {
86+
if (!airflowMapping) {
87+
const airflowFilePath = join(process.cwd(), 'airflow.json');
88+
if (!existsSync(airflowFilePath)) throw new Error(`Airflow config file not found at: ${airflowFilePath}`);
89+
airflowMapping = JSON.parse(readFileSync(airflowFilePath).toString());
90+
if (!airflowMapping) throw new Error('Invalid Airflow config');
91+
}
92+
if (!airflowMapping[gcpProjectId]) {
93+
throw new Error(`No Airflow config found for project ID: ${gcpProjectId} Valid project IDs: ${Object.keys(airflowMapping).join(', ')}`);
94+
}
95+
return airflowMapping[gcpProjectId];
96+
}
97+
98+
/**
99+
* Fetches DAG runs for the given DAG ID and Google Cloud Project.
100+
*
101+
* @param gcpProjectId The Google Cloud Project ID where the Composer environment lives.
102+
* @param dagId The ID of the DAG to fetch runs for.
103+
* @param limit The maximum number of runs to fetch. (Defaults to 20)
104+
*/
105+
@func()
106+
public async fetchDagRuns(gcpProjectId: string, dagId: string, limit = 20): Promise<DagRun[]> {
107+
const airflowWebServerUrl = this.getWebServerUrl(gcpProjectId);
108+
const token = await this.getAuthToken();
109+
110+
const url = `${airflowWebServerUrl}/api/v1/dags/${dagId}/dagRuns?limit=${limit}`;
111+
const response = await this.makeRequest(url, 'GET', token);
112+
113+
return response.data.dag_runs;
114+
}
115+
116+
/**
117+
* Fetches all task instances for a specific DAG run.
118+
* @param gcpProjectId The Google Cloud Project ID.
119+
* @param dagId The ID of the DAG.
120+
* @param dagRunId The ID of the specific DAG run.
121+
* @returns A promise that resolves to an array of task instance objects.
122+
*/
123+
@func()
124+
public async fetchTaskInstances(gcpProjectId: string, dagId: string, dagRunId: string): Promise<TaskInstance[]> {
125+
const airflowWebServerUrl = this.getWebServerUrl(gcpProjectId);
126+
const token = await this.getAuthToken();
127+
128+
const url = `${airflowWebServerUrl}/api/v1/dags/${dagId}/dagRuns/${dagRunId}/taskInstances`;
129+
const response = await this.makeRequest(url, 'GET', token);
130+
131+
return response.data.task_instances;
132+
}
133+
134+
/**
135+
* Fetches the raw log for a specific task attempt.
136+
* @param gcpProjectId The Google Cloud Project ID.
137+
* @param dagId The ID of the DAG.
138+
* @param dagRunId The ID of the DAG run.
139+
* @param taskId The ID of the task.
140+
* @param tryNumber The attempt number of the task.
141+
* @returns A promise that resolves to the raw log content as a string.
142+
*/
143+
@func()
144+
public async fetchTaskLog(gcpProjectId: string, dagId: string, dagRunId: string, taskId: string, tryNumber: number): Promise<string> {
145+
const airflowWebServerUrl = this.getWebServerUrl(gcpProjectId);
146+
const token = await this.getAuthToken();
147+
148+
const url = `${airflowWebServerUrl}/api/v1/dags/${dagId}/dagRuns/${dagRunId}/taskInstances/${taskId}/logs/${tryNumber}`;
149+
const response = await this.makeRequest(url, 'GET', token);
150+
151+
return response.data;
152+
}
153+
154+
/**
155+
* Fetches detailed metadata for a specific DAG.
156+
* @param gcpProjectId The Google Cloud Project ID.
157+
* @param dagId The ID of the DAG.
158+
* @returns A promise that resolves to the DAG detail object.
159+
*/
160+
@func()
161+
public async fetchDagDetails(gcpProjectId: string, dagId: string): Promise<any> {
162+
const airflowWebServerUrl = this.getWebServerUrl(gcpProjectId);
163+
const token = await this.getAuthToken();
164+
const url = `${airflowWebServerUrl}/api/v1/dags/${dagId}`;
165+
const response = await this.makeRequest(url, 'GET', token);
166+
return response.data;
167+
}
168+
169+
/**
170+
* Fetches the current Airflow configuration (airflow.cfg).
171+
* @param gcpProjectId The Google Cloud Project ID.
172+
* @returns A promise that resolves to the Airflow configuration object.
173+
*/
174+
@func()
175+
public async fetchAirflowConfig(gcpProjectId: string): Promise<any> {
176+
const airflowWebServerUrl = this.getWebServerUrl(gcpProjectId);
177+
const token = await this.getAuthToken();
178+
179+
const url = `${airflowWebServerUrl}/api/v1/config`;
180+
const response = await this.makeRequest(url, 'GET', token);
181+
182+
return response.data;
183+
}
184+
185+
/**
186+
* Fetches a short-lived access token needed for authorization.
187+
* This method supports the manual token handling approach seen in fetchDagRuns.
188+
* @returns The access token string.
189+
*/
190+
private async getAuthToken(): Promise<string> {
191+
const token = await this.auth.getAccessToken();
192+
if (!token || typeof token !== 'string' || token.length === 0) throw new Error('Failed to retrieve access token.');
193+
return token;
194+
}
195+
196+
/**
197+
* Generic request handler that uses the retrieved token for authorization.
198+
* @param url The full URL to fetch.
199+
* @param method The HTTP method ('GET', 'POST', etc.).
200+
* @param token The Bearer token for Authorization.
201+
* @param data Optional payload data for POST/PUT requests.
202+
* @returns The Axios response object.
203+
*/
204+
private async makeRequest(url: string, method: 'GET' | 'POST' | 'PUT' | 'DELETE', token: string, data?: object): Promise<AxiosResponse> {
205+
try {
206+
console.debug(`Making ${method} request to: ${url}`);
207+
const response = await this.httpClient({
208+
method,
209+
url,
210+
data: data,
211+
headers: {
212+
Authorization: `Bearer ${token}`,
213+
'Content-Type': 'application/json',
214+
},
215+
});
216+
return response;
217+
} catch (error) {
218+
if (axios.isAxiosError(error) && error.response) {
219+
const status = error.response.status;
220+
if (status === 403) throw new Error(`403 Forbidden: Check Airflow RBAC roles for your account. Details: ${JSON.stringify(error.response.data)}`);
221+
throw new Error(`Request failed with status ${status}: ${error.response.statusText}. ` + `Response data: ${JSON.stringify(error.response.data)}`);
222+
}
223+
throw error;
224+
}
225+
}
226+
}

0 commit comments

Comments
 (0)