Skip to content

Commit 5bad20a

Browse files
google-genai-botcopybara-github
authored andcommitted
feat: Add GcsOffloader for asynchronously uploading content to Google Cloud Storage
This change enables user with possibility to upload content to GCP PiperOrigin-RevId: 919952620
1 parent a1d2c1c commit 5bad20a

11 files changed

Lines changed: 40 additions & 688 deletions

File tree

core/pom.xml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -217,10 +217,6 @@
217217
<artifactId>arrow-memory-netty</artifactId>
218218
<version>17.0.0</version>
219219
</dependency>
220-
<dependency>
221-
<groupId>org.apache.tika</groupId>
222-
<artifactId>tika-core</artifactId>
223-
</dependency>
224220
</dependencies>
225221
<build>
226222
<resources>

core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -253,10 +253,7 @@ private Completable logEvent(
253253
parseFuture =
254254
state
255255
.getParser()
256-
.parse(
257-
content,
258-
traceIds.traceId(),
259-
traceIds.spanId() != null ? traceIds.spanId() : "no_span")
256+
.parse(content)
260257
.thenAccept(
261258
parsedContent -> {
262259
row.put(

core/src/main/java/com/google/adk/plugins/agentanalytics/GcsOffloader.java

Lines changed: 0 additions & 94 deletions
This file was deleted.

core/src/main/java/com/google/adk/plugins/agentanalytics/Parser.java

Lines changed: 17 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import static com.google.adk.plugins.agentanalytics.JsonFormatter.mapper;
2020
import static com.google.adk.plugins.agentanalytics.JsonFormatter.smartTruncate;
2121
import static com.google.adk.plugins.agentanalytics.JsonFormatter.truncate;
22-
import static com.google.adk.plugins.agentanalytics.JsonFormatter.truncateAndAddSuffix;
2322
import static com.google.adk.plugins.agentanalytics.JsonFormatter.truncateWithStatus;
2423

2524
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -40,43 +39,16 @@
4039
import java.util.ArrayList;
4140
import java.util.List;
4241
import java.util.Optional;
43-
import java.util.UUID;
4442
import java.util.concurrent.CompletableFuture;
45-
import java.util.logging.Level;
46-
import java.util.logging.Logger;
47-
import org.apache.tika.mime.MimeTypeException;
48-
import org.apache.tika.mime.MimeTypes;
4943
import org.jspecify.annotations.Nullable;
50-
import org.threeten.bp.Instant;
51-
import org.threeten.bp.LocalDate;
52-
import org.threeten.bp.ZoneOffset;
5344

5445
/** Utility for parsing content for BigQuery logging. */
5546
final class Parser {
56-
private static final String DEFAULT_EXTENSION = ".bin";
57-
private static final int MAX_OFFLOADED_TEXT_LENGTH = 200;
58-
private static final Logger logger = Logger.getLogger(Parser.class.getName());
59-
private static final int INLINE_TEXT_LIMIT = 32 * 1024; // 32KB limit
60-
private static final String UPLOAD_FAILED_MESSAGE = "[UPLOAD FAILED]";
61-
private static final String MEDIA_OFFLOADED_MESSAGE = "[MEDIA OFFLOADED]";
6247
private static final String BINARY_DATA_MESSAGE = "[BINARY DATA]";
63-
private static final String TEXT_OFFLOADED_SUFFIX = "... [OFFLOADED]";
64-
private static final MimeTypes MIME_TYPES = MimeTypes.getDefaultMimeTypes();
65-
66-
private final @Nullable GcsOffloader offloader;
6748
private final int maxLength;
68-
private final @Nullable String connectionId;
69-
private final boolean logMultiModalContent;
70-
71-
Parser(
72-
@Nullable GcsOffloader offloader,
73-
int maxLength,
74-
@Nullable String connectionId,
75-
boolean logMultiModalContent) {
76-
this.offloader = offloader;
49+
50+
Parser(int maxLength) {
7751
this.maxLength = maxLength;
78-
this.connectionId = connectionId;
79-
this.logMultiModalContent = logMultiModalContent;
8052
}
8153

8254
@AutoValue
@@ -180,27 +152,23 @@ static ObjectRef create(
180152
* Parses content into JSON payload and content parts, matching Python implementation.
181153
*
182154
* @param content the content to parse
183-
* @param traceId the trace ID for GCS path
184-
* @param spanId the span ID for GCS path
185155
* @return a CompletableFuture of ParsedContent object
186156
*/
187-
CompletableFuture<ParsedContent> parse(Object content, String traceId, String spanId) {
157+
CompletableFuture<ParsedContent> parse(Object content) {
188158
if (content instanceof LlmRequest llmRequest) {
189159
ObjectNode jsonPayload = mapper.createObjectNode();
190160
ArrayNode messages = mapper.createArrayNode();
191161
List<CompletableFuture<ParsedContentObject>> futures = new ArrayList<>();
192162
List<Content> contents = llmRequest.contents();
193163

194164
for (Content c : contents) {
195-
futures.add(parseContentObject(c, traceId, spanId));
165+
futures.add(parseContentObject(c));
196166
}
197167

198168
CompletableFuture<ParsedContentObject> systemFuture = null;
199169
if (llmRequest.config().isPresent()
200170
&& llmRequest.config().get().systemInstruction().isPresent()) {
201-
systemFuture =
202-
parseContentObject(
203-
llmRequest.config().get().systemInstruction().get(), traceId, spanId);
171+
systemFuture = parseContentObject(llmRequest.config().get().systemInstruction().get());
204172
futures.add(systemFuture);
205173
}
206174
CompletableFuture<ParsedContentObject> finalSystemFuture = systemFuture;
@@ -234,7 +202,7 @@ CompletableFuture<ParsedContent> parse(Object content, String traceId, String sp
234202
}
235203
if (content instanceof LlmResponse llmResponse) {
236204
ObjectNode jsonPayload = mapper.createObjectNode();
237-
return parseContentObject(llmResponse.content().orElse(null), traceId, spanId)
205+
return parseContentObject(llmResponse.content().orElse(null))
238206
.thenApply(
239207
parsed -> {
240208
ObjectNode summaryNode = mapper.createObjectNode();
@@ -257,7 +225,7 @@ CompletableFuture<ParsedContent> parse(Object content, String traceId, String sp
257225
});
258226
}
259227
if (content instanceof Content || content instanceof Part) {
260-
return parseContentObject(content, traceId, spanId)
228+
return parseContentObject(content)
261229
.thenApply(
262230
parsed -> {
263231
ObjectNode summaryNode = mapper.createObjectNode();
@@ -281,13 +249,10 @@ CompletableFuture<ParsedContent> parse(Object content, String traceId, String sp
281249
* Parses a Content or Part object into summary text and content parts.
282250
*
283251
* @param content the Content or Part object to parse
284-
* @param traceId the trace ID for GCS path
285-
* @param spanId the span ID for GCS path
286252
* @return a CompletableFuture of ParsedContentObject containing parts, summary, and truncation
287253
* flag
288254
*/
289-
private CompletableFuture<ParsedContentObject> parseContentObject(
290-
Object content, String traceId, String spanId) {
255+
private CompletableFuture<ParsedContentObject> parseContentObject(Object content) {
291256
List<Part> parts;
292257
if (content instanceof Content c) {
293258
parts = c.parts().orElse(ImmutableList.of());
@@ -300,7 +265,7 @@ private CompletableFuture<ParsedContentObject> parseContentObject(
300265

301266
List<CompletableFuture<TruncationResult>> partFutures = new ArrayList<>();
302267
for (int i = 0; i < parts.size(); i++) {
303-
partFutures.add(processPart(parts.get(i), i, traceId, spanId));
268+
partFutures.add(processPart(parts.get(i), i));
304269
}
305270

306271
return CompletableFuture.allOf(partFutures.toArray(new CompletableFuture<?>[0]))
@@ -330,8 +295,7 @@ private CompletableFuture<ParsedContentObject> parseContentObject(
330295
});
331296
}
332297

333-
private CompletableFuture<TruncationResult> processPart(
334-
Part part, int index, String traceId, String spanId) {
298+
private CompletableFuture<TruncationResult> processPart(Part part, int index) {
335299
ContentPart.Builder partBuilder =
336300
ContentPart.builder()
337301
.setPartIndex(index)
@@ -356,89 +320,17 @@ private CompletableFuture<TruncationResult> processPart(
356320
if (part.inlineData().isPresent()) {
357321
Blob blob = part.inlineData().get();
358322
String mimeType = blob.mimeType().orElse("application/octet-stream");
359-
if (logMultiModalContent && offloader != null) {
360-
String ext = DEFAULT_EXTENSION;
361-
try {
362-
ext = MIME_TYPES.forName(mimeType).getExtension();
363-
} catch (MimeTypeException e) {
364-
logger.log(Level.WARNING, "Failed to get extension for mime type " + mimeType, e);
365-
}
366-
String path =
367-
String.format(
368-
"%s/%s/%s_p%d_%s%s",
369-
getLocalDate(), traceId, spanId, index, UUID.randomUUID(), ext);
370-
return offloader
371-
.uploadContent(blob.data().orElse(new byte[0]), mimeType, path)
372-
.handle(
373-
(uri, ex) -> {
374-
if (ex != null) {
375-
logger.log(Level.WARNING, "Failed to offload content to GCS", ex);
376-
partBuilder.setText(UPLOAD_FAILED_MESSAGE);
377-
} else {
378-
ObjectNode details = mapper.createObjectNode();
379-
ObjectNode gcsMetadata = details.putObject("gcs_metadata");
380-
gcsMetadata.put("content_type", mimeType);
381-
382-
partBuilder
383-
.setStorageMode("GCS_REFERENCE")
384-
.setUri(uri)
385-
.setMimeType(mimeType)
386-
.setText(MEDIA_OFFLOADED_MESSAGE)
387-
.setObjectRef(
388-
mapper.valueToTree(ObjectRef.create(uri, null, connectionId, details)));
389-
}
390-
return TruncationResult.create(mapper.valueToTree(partBuilder.build()), false);
391-
});
392-
} else {
393-
partBuilder.setText(BINARY_DATA_MESSAGE).setMimeType(mimeType);
394-
return CompletableFuture.completedFuture(
395-
TruncationResult.create(mapper.valueToTree(partBuilder.build()), false));
396-
}
323+
partBuilder.setText(BINARY_DATA_MESSAGE).setMimeType(mimeType);
324+
return CompletableFuture.completedFuture(
325+
TruncationResult.create(mapper.valueToTree(partBuilder.build()), false));
397326
}
398327
// CASE C: Text
399328
if (part.text().isPresent()) {
400329
String text = part.text().get();
401-
int textLen = Utf8.encodedLength(text);
402-
int offloadThreshold = Math.min(INLINE_TEXT_LIMIT, maxLength);
403-
404-
if (offloader != null && textLen > offloadThreshold) {
405-
406-
String path =
407-
String.format(
408-
"%s/%s/%s_p%d_%s.txt", getLocalDate(), traceId, spanId, index, UUID.randomUUID());
409-
return offloader
410-
.uploadContent(text, "text/plain", path)
411-
.handle(
412-
(uri, ex) -> {
413-
if (ex != null) {
414-
logger.log(Level.WARNING, "Failed to offload text to GCS", ex);
415-
TruncationResult res = truncateWithStatus(text, maxLength);
416-
partBuilder.setText(res.node().asText());
417-
return TruncationResult.create(
418-
mapper.valueToTree(partBuilder.build()), res.isTruncated());
419-
} else {
420-
ObjectNode details = mapper.createObjectNode();
421-
ObjectNode gcsMetadata = details.putObject("gcs_metadata");
422-
gcsMetadata.put("content_type", "text/plain");
423-
424-
partBuilder
425-
.setStorageMode("GCS_REFERENCE")
426-
.setUri(uri)
427-
.setMimeType("text/plain")
428-
.setText(
429-
truncateAndAddSuffix(
430-
text, MAX_OFFLOADED_TEXT_LENGTH, TEXT_OFFLOADED_SUFFIX))
431-
.setObjectRef(
432-
mapper.valueToTree(ObjectRef.create(uri, null, connectionId, details)));
433-
return TruncationResult.create(mapper.valueToTree(partBuilder.build()), true);
434-
}
435-
});
436-
} else {
437-
TruncationResult res = truncateWithStatus(text, maxLength);
438-
partBuilder.setText(res.node().asText());
439-
return CompletableFuture.completedFuture(
440-
TruncationResult.create(mapper.valueToTree(partBuilder.build()), res.isTruncated()));
441-
}
330+
TruncationResult res = truncateWithStatus(text, maxLength);
331+
partBuilder.setText(res.node().asText());
332+
return CompletableFuture.completedFuture(
333+
TruncationResult.create(mapper.valueToTree(partBuilder.build()), res.isTruncated()));
442334
}
443335
if (part.functionCall().isPresent()) {
444336
FunctionCall fc = part.functionCall().get();
@@ -487,8 +379,4 @@ ArrayNode formatContentParts(Optional<Content> content) {
487379
}
488380
return partsArray;
489381
}
490-
491-
private LocalDate getLocalDate() {
492-
return Instant.now().atZone(ZoneOffset.UTC).toLocalDate();
493-
}
494382
}

0 commit comments

Comments
 (0)