Skip to content

Commit 65c3689

Browse files
google-genai-bot and copybara-github
authored and committed
feat: Add GcsOffloader for asynchronously uploading content to Google Cloud Storage
This change gives users the ability to upload content to GCP. PiperOrigin-RevId: 913845322
1 parent 69638df commit 65c3689

11 files changed

Lines changed: 688 additions & 40 deletions

File tree

core/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,10 @@
217217
<artifactId>arrow-memory-netty</artifactId>
218218
<version>17.0.0</version>
219219
</dependency>
220+
<dependency>
221+
<groupId>org.apache.tika</groupId>
222+
<artifactId>tika-core</artifactId>
223+
</dependency>
220224
</dependencies>
221225
<build>
222226
<resources>

core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,10 @@ private Completable logEvent(
253253
parseFuture =
254254
state
255255
.getParser()
256-
.parse(content)
256+
.parse(
257+
content,
258+
traceIds.traceId(),
259+
traceIds.spanId() != null ? traceIds.spanId() : "no_span")
257260
.thenAccept(
258261
parsedContent -> {
259262
row.put(
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright 2026 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.adk.plugins.agentanalytics;
18+
19+
import static java.nio.charset.StandardCharsets.UTF_8;
20+
21+
import com.google.auth.Credentials;
22+
import com.google.cloud.storage.BlobId;
23+
import com.google.cloud.storage.BlobInfo;
24+
import com.google.cloud.storage.Storage;
25+
import com.google.cloud.storage.StorageOptions;
26+
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.Executor;
28+
import java.util.concurrent.RejectedExecutionException;
29+
import org.jspecify.annotations.Nullable;
30+
31+
/** Offloads content to GCS. */
32+
class GcsOffloader {
33+
private final Storage storage;
34+
private final String bucketName;
35+
private final Executor executor;
36+
private final boolean isStorageOverride;
37+
38+
GcsOffloader(
39+
String projectId,
40+
String bucketName,
41+
Executor executor,
42+
@Nullable Credentials credentials,
43+
@Nullable Storage storageOverride) {
44+
if (storageOverride != null) {
45+
this.isStorageOverride = true;
46+
this.storage = storageOverride;
47+
} else {
48+
this.isStorageOverride = false;
49+
StorageOptions.Builder builder = StorageOptions.newBuilder().setProjectId(projectId);
50+
if (credentials != null) {
51+
builder.setCredentials(credentials);
52+
}
53+
this.storage = builder.build().getService();
54+
}
55+
this.bucketName = bucketName;
56+
this.executor = executor;
57+
}
58+
59+
/** Async wrapper around blocking GCS upload for binary data. */
60+
CompletableFuture<String> uploadContent(byte[] data, String contentType, String path) {
61+
try {
62+
return CompletableFuture.supplyAsync(
63+
() -> {
64+
BlobId blobId = BlobId.of(bucketName, path);
65+
BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType(contentType).build();
66+
storage.create(blobInfo, data);
67+
return String.format("gs://%s/%s", bucketName, path);
68+
},
69+
executor);
70+
} catch (RejectedExecutionException e) {
71+
return CompletableFuture.failedFuture(e);
72+
}
73+
}
74+
75+
/** Async wrapper around blocking GCS upload for text data. */
76+
CompletableFuture<String> uploadContent(String data, String contentType, String path) {
77+
try {
78+
return CompletableFuture.supplyAsync(() -> data.getBytes(UTF_8), executor)
79+
.thenCompose(bytes -> uploadContent(bytes, contentType, path));
80+
} catch (RejectedExecutionException e) {
81+
return CompletableFuture.failedFuture(e);
82+
}
83+
}
84+
85+
String getBucketName() {
86+
return bucketName;
87+
}
88+
89+
void close() throws Exception {
90+
if (storage != null && !isStorageOverride) {
91+
storage.close();
92+
}
93+
}
94+
}

core/src/main/java/com/google/adk/plugins/agentanalytics/Parser.java

Lines changed: 129 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.google.adk.plugins.agentanalytics.JsonFormatter.mapper;
2020
import static com.google.adk.plugins.agentanalytics.JsonFormatter.smartTruncate;
2121
import static com.google.adk.plugins.agentanalytics.JsonFormatter.truncate;
22+
import static com.google.adk.plugins.agentanalytics.JsonFormatter.truncateAndAddSuffix;
2223
import static com.google.adk.plugins.agentanalytics.JsonFormatter.truncateWithStatus;
2324

2425
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -39,16 +40,43 @@
3940
import java.util.ArrayList;
4041
import java.util.List;
4142
import java.util.Optional;
43+
import java.util.UUID;
4244
import java.util.concurrent.CompletableFuture;
45+
import java.util.logging.Level;
46+
import java.util.logging.Logger;
47+
import org.apache.tika.mime.MimeTypeException;
48+
import org.apache.tika.mime.MimeTypes;
4349
import org.jspecify.annotations.Nullable;
50+
import org.threeten.bp.Instant;
51+
import org.threeten.bp.LocalDate;
52+
import org.threeten.bp.ZoneOffset;
4453

4554
/** Utility for parsing content for BigQuery logging. */
4655
final class Parser {
56+
private static final String DEFAULT_EXTENSION = ".bin";
57+
private static final int MAX_OFFLOADED_TEXT_LENGTH = 200;
58+
private static final Logger logger = Logger.getLogger(Parser.class.getName());
59+
private static final int INLINE_TEXT_LIMIT = 32 * 1024; // 32KB limit
60+
private static final String UPLOAD_FAILED_MESSAGE = "[UPLOAD FAILED]";
61+
private static final String MEDIA_OFFLOADED_MESSAGE = "[MEDIA OFFLOADED]";
4762
private static final String BINARY_DATA_MESSAGE = "[BINARY DATA]";
48-
private final int maxLength;
63+
private static final String TEXT_OFFLOADED_SUFFIX = "... [OFFLOADED]";
64+
private static final MimeTypes MIME_TYPES = MimeTypes.getDefaultMimeTypes();
4965

50-
Parser(int maxLength) {
66+
private final @Nullable GcsOffloader offloader;
67+
private final int maxLength;
68+
private final @Nullable String connectionId;
69+
private final boolean logMultiModalContent;
70+
71+
/**
 * Creates a parser.
 *
 * @param offloader destination for offloaded content, or null when offloading is unavailable
 * @param maxLength length limit passed to the truncation helpers
 * @param connectionId optional connection id stored in the object references of offloaded parts
 * @param logMultiModalContent whether inline binary data may be offloaded
 */
Parser(
    @Nullable GcsOffloader offloader,
    int maxLength,
    @Nullable String connectionId,
    boolean logMultiModalContent) {
  this.maxLength = maxLength;
  this.logMultiModalContent = logMultiModalContent;
  this.connectionId = connectionId;
  this.offloader = offloader;
}
5381

5482
@AutoValue
@@ -152,23 +180,27 @@ static ObjectRef create(
152180
* Parses content into JSON payload and content parts, matching Python implementation.
153181
*
154182
* @param content the content to parse
183+
* @param traceId the trace ID for GCS path
184+
* @param spanId the span ID for GCS path
155185
* @return a CompletableFuture of ParsedContent object
156186
*/
157-
CompletableFuture<ParsedContent> parse(Object content) {
187+
CompletableFuture<ParsedContent> parse(Object content, String traceId, String spanId) {
158188
if (content instanceof LlmRequest llmRequest) {
159189
ObjectNode jsonPayload = mapper.createObjectNode();
160190
ArrayNode messages = mapper.createArrayNode();
161191
List<CompletableFuture<ParsedContentObject>> futures = new ArrayList<>();
162192
List<Content> contents = llmRequest.contents();
163193

164194
for (Content c : contents) {
165-
futures.add(parseContentObject(c));
195+
futures.add(parseContentObject(c, traceId, spanId));
166196
}
167197

168198
CompletableFuture<ParsedContentObject> systemFuture = null;
169199
if (llmRequest.config().isPresent()
170200
&& llmRequest.config().get().systemInstruction().isPresent()) {
171-
systemFuture = parseContentObject(llmRequest.config().get().systemInstruction().get());
201+
systemFuture =
202+
parseContentObject(
203+
llmRequest.config().get().systemInstruction().get(), traceId, spanId);
172204
futures.add(systemFuture);
173205
}
174206
CompletableFuture<ParsedContentObject> finalSystemFuture = systemFuture;
@@ -202,7 +234,7 @@ CompletableFuture<ParsedContent> parse(Object content) {
202234
}
203235
if (content instanceof LlmResponse llmResponse) {
204236
ObjectNode jsonPayload = mapper.createObjectNode();
205-
return parseContentObject(llmResponse.content().orElse(null))
237+
return parseContentObject(llmResponse.content().orElse(null), traceId, spanId)
206238
.thenApply(
207239
parsed -> {
208240
ObjectNode summaryNode = mapper.createObjectNode();
@@ -225,7 +257,7 @@ CompletableFuture<ParsedContent> parse(Object content) {
225257
});
226258
}
227259
if (content instanceof Content || content instanceof Part) {
228-
return parseContentObject(content)
260+
return parseContentObject(content, traceId, spanId)
229261
.thenApply(
230262
parsed -> {
231263
ObjectNode summaryNode = mapper.createObjectNode();
@@ -249,10 +281,13 @@ CompletableFuture<ParsedContent> parse(Object content) {
249281
* Parses a Content or Part object into summary text and content parts.
250282
*
251283
* @param content the Content or Part object to parse
284+
* @param traceId the trace ID for GCS path
285+
* @param spanId the span ID for GCS path
252286
* @return a CompletableFuture of ParsedContentObject containing parts, summary, and truncation
253287
* flag
254288
*/
255-
private CompletableFuture<ParsedContentObject> parseContentObject(Object content) {
289+
private CompletableFuture<ParsedContentObject> parseContentObject(
290+
Object content, String traceId, String spanId) {
256291
List<Part> parts;
257292
if (content instanceof Content c) {
258293
parts = c.parts().orElse(ImmutableList.of());
@@ -265,7 +300,7 @@ private CompletableFuture<ParsedContentObject> parseContentObject(Object content
265300

266301
List<CompletableFuture<TruncationResult>> partFutures = new ArrayList<>();
267302
for (int i = 0; i < parts.size(); i++) {
268-
partFutures.add(processPart(parts.get(i), i));
303+
partFutures.add(processPart(parts.get(i), i, traceId, spanId));
269304
}
270305

271306
return CompletableFuture.allOf(partFutures.toArray(new CompletableFuture<?>[0]))
@@ -295,7 +330,8 @@ private CompletableFuture<ParsedContentObject> parseContentObject(Object content
295330
});
296331
}
297332

298-
private CompletableFuture<TruncationResult> processPart(Part part, int index) {
333+
private CompletableFuture<TruncationResult> processPart(
334+
Part part, int index, String traceId, String spanId) {
299335
ContentPart.Builder partBuilder =
300336
ContentPart.builder()
301337
.setPartIndex(index)
@@ -320,17 +356,89 @@ private CompletableFuture<TruncationResult> processPart(Part part, int index) {
320356
if (part.inlineData().isPresent()) {
321357
Blob blob = part.inlineData().get();
322358
String mimeType = blob.mimeType().orElse("application/octet-stream");
323-
partBuilder.setText(BINARY_DATA_MESSAGE).setMimeType(mimeType);
324-
return CompletableFuture.completedFuture(
325-
TruncationResult.create(mapper.valueToTree(partBuilder.build()), false));
359+
if (logMultiModalContent && offloader != null) {
360+
String ext = DEFAULT_EXTENSION;
361+
try {
362+
ext = MIME_TYPES.forName(mimeType).getExtension();
363+
} catch (MimeTypeException e) {
364+
logger.log(Level.WARNING, "Failed to get extension for mime type " + mimeType, e);
365+
}
366+
String path =
367+
String.format(
368+
"%s/%s/%s_p%d_%s%s",
369+
getLocalDate(), traceId, spanId, index, UUID.randomUUID(), ext);
370+
return offloader
371+
.uploadContent(blob.data().orElse(new byte[0]), mimeType, path)
372+
.handle(
373+
(uri, ex) -> {
374+
if (ex != null) {
375+
logger.log(Level.WARNING, "Failed to offload content to GCS", ex);
376+
partBuilder.setText(UPLOAD_FAILED_MESSAGE);
377+
} else {
378+
ObjectNode details = mapper.createObjectNode();
379+
ObjectNode gcsMetadata = details.putObject("gcs_metadata");
380+
gcsMetadata.put("content_type", mimeType);
381+
382+
partBuilder
383+
.setStorageMode("GCS_REFERENCE")
384+
.setUri(uri)
385+
.setMimeType(mimeType)
386+
.setText(MEDIA_OFFLOADED_MESSAGE)
387+
.setObjectRef(
388+
mapper.valueToTree(ObjectRef.create(uri, null, connectionId, details)));
389+
}
390+
return TruncationResult.create(mapper.valueToTree(partBuilder.build()), false);
391+
});
392+
} else {
393+
partBuilder.setText(BINARY_DATA_MESSAGE).setMimeType(mimeType);
394+
return CompletableFuture.completedFuture(
395+
TruncationResult.create(mapper.valueToTree(partBuilder.build()), false));
396+
}
326397
}
327398
// CASE C: Text
328399
if (part.text().isPresent()) {
329400
String text = part.text().get();
330-
TruncationResult res = truncateWithStatus(text, maxLength);
331-
partBuilder.setText(res.node().asText());
332-
return CompletableFuture.completedFuture(
333-
TruncationResult.create(mapper.valueToTree(partBuilder.build()), res.isTruncated()));
401+
int textLen = Utf8.encodedLength(text);
402+
int offloadThreshold = Math.min(INLINE_TEXT_LIMIT, maxLength);
403+
404+
if (offloader != null && textLen > offloadThreshold) {
405+
406+
String path =
407+
String.format(
408+
"%s/%s/%s_p%d_%s.txt", getLocalDate(), traceId, spanId, index, UUID.randomUUID());
409+
return offloader
410+
.uploadContent(text, "text/plain", path)
411+
.handle(
412+
(uri, ex) -> {
413+
if (ex != null) {
414+
logger.log(Level.WARNING, "Failed to offload text to GCS", ex);
415+
TruncationResult res = truncateWithStatus(text, maxLength);
416+
partBuilder.setText(res.node().asText());
417+
return TruncationResult.create(
418+
mapper.valueToTree(partBuilder.build()), res.isTruncated());
419+
} else {
420+
ObjectNode details = mapper.createObjectNode();
421+
ObjectNode gcsMetadata = details.putObject("gcs_metadata");
422+
gcsMetadata.put("content_type", "text/plain");
423+
424+
partBuilder
425+
.setStorageMode("GCS_REFERENCE")
426+
.setUri(uri)
427+
.setMimeType("text/plain")
428+
.setText(
429+
truncateAndAddSuffix(
430+
text, MAX_OFFLOADED_TEXT_LENGTH, TEXT_OFFLOADED_SUFFIX))
431+
.setObjectRef(
432+
mapper.valueToTree(ObjectRef.create(uri, null, connectionId, details)));
433+
return TruncationResult.create(mapper.valueToTree(partBuilder.build()), true);
434+
}
435+
});
436+
} else {
437+
TruncationResult res = truncateWithStatus(text, maxLength);
438+
partBuilder.setText(res.node().asText());
439+
return CompletableFuture.completedFuture(
440+
TruncationResult.create(mapper.valueToTree(partBuilder.build()), res.isTruncated()));
441+
}
334442
}
335443
if (part.functionCall().isPresent()) {
336444
FunctionCall fc = part.functionCall().get();
@@ -379,4 +487,8 @@ ArrayNode formatContentParts(Optional<Content> content) {
379487
}
380488
return partsArray;
381489
}
490+
491+
private LocalDate getLocalDate() {
492+
return Instant.now().atZone(ZoneOffset.UTC).toLocalDate();
493+
}
382494
}

0 commit comments

Comments
 (0)