Skip to content

Commit 64d0b57

Browse files
google-genai-botcopybara-github
authored andcommitted
feat: Refactor BigQueryAgentAnalyticsPlugin for async and GCS offloading
This change updates the BigQueryAgentAnalyticsPlugin to handle content parsing and logging asynchronously using CompletableFutures. It introduces a GCSOffloader to store large text and binary content in Google Cloud Storage, with references logged to BigQuery. The JsonFormatter is updated to support this offloading. PiperOrigin-RevId: 905233794
1 parent e9184c9 commit 64d0b57

12 files changed

Lines changed: 1813 additions & 757 deletions

File tree

core/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,10 @@
217217
<artifactId>arrow-memory-netty</artifactId>
218218
<version>17.0.0</version>
219219
</dependency>
220+
<dependency>
221+
<groupId>org.apache.tika</groupId>
222+
<artifactId>tika-core</artifactId>
223+
</dependency>
220224
</dependencies>
221225
<build>
222226
<resources>

core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java

Lines changed: 437 additions & 446 deletions
Large diffs are not rendered by default.

core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryLoggerConfig.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ public abstract class BigQueryLoggerConfig {
5959
public abstract ImmutableList<String> clusteringFields();
6060

6161
// Whether to log multi-modal content.
62-
// TODO(b/491852782): Implement logging of multi-modal content.
6362
public abstract boolean logMultiModalContent();
6463

6564
// Retry configuration for BigQuery writes.
@@ -96,7 +95,7 @@ public abstract class BigQueryLoggerConfig {
9695
// GCS bucket name to store multi-modal content.
9796
public abstract String gcsBucketName();
9897

99-
// TODO(b/491852782): Implement connection id.
98+
// Optional BigQuery connection ID for ObjectRef columns
10099
public abstract Optional<String> connectionId();
101100

102101
// Toggle for session metadata (e.g. gchat thread-id).
@@ -118,8 +117,7 @@ public abstract class BigQueryLoggerConfig {
118117
// Default "v" produces views like ``v_llm_request``.
119118
public abstract String viewPrefix();
120119

121-
@Nullable
122-
public abstract Credentials credentials();
120+
public abstract @Nullable Credentials credentials();
123121

124122
public abstract Builder toBuilder();
125123

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.adk.plugins.agentanalytics;
18+
19+
import static java.nio.charset.StandardCharsets.UTF_8;
20+
21+
import com.google.auth.Credentials;
22+
import com.google.cloud.storage.BlobId;
23+
import com.google.cloud.storage.BlobInfo;
24+
import com.google.cloud.storage.Storage;
25+
import com.google.cloud.storage.StorageOptions;
26+
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.Executor;
28+
import org.jspecify.annotations.Nullable;
29+
30+
/** Offloads content to GCS. */
31+
class GcsOffloader {
32+
private final Storage storage;
33+
private final String bucketName;
34+
private final Executor executor;
35+
36+
GcsOffloader(
37+
String projectId,
38+
String bucketName,
39+
Executor executor,
40+
@Nullable Credentials credentials,
41+
@Nullable Storage storageOverride) {
42+
if (storageOverride != null) {
43+
this.storage = storageOverride;
44+
} else {
45+
StorageOptions.Builder builder = StorageOptions.newBuilder().setProjectId(projectId);
46+
if (credentials != null) {
47+
builder.setCredentials(credentials);
48+
}
49+
this.storage = builder.build().getService();
50+
}
51+
this.bucketName = bucketName;
52+
this.executor = executor;
53+
}
54+
55+
/** Async wrapper around blocking GCS upload for binary data. */
56+
CompletableFuture<String> uploadContent(byte[] data, String contentType, String path) {
57+
return CompletableFuture.supplyAsync(
58+
() -> {
59+
BlobId blobId = BlobId.of(bucketName, path);
60+
BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType(contentType).build();
61+
storage.create(blobInfo, data);
62+
return String.format("gs://%s/%s", bucketName, path);
63+
},
64+
executor);
65+
}
66+
67+
/** Async wrapper around blocking GCS upload for text data. */
68+
CompletableFuture<String> uploadContent(String data, String contentType, String path) {
69+
return uploadContent(data.getBytes(UTF_8), contentType, path);
70+
}
71+
72+
String getBucketName() {
73+
return bucketName;
74+
}
75+
76+
void close() throws Exception {
77+
if (storage != null) {
78+
storage.close();
79+
}
80+
}
81+
}

0 commit comments

Comments
 (0)