Skip to content

Commit d837ef0

Browse files
google-genai-bot authored and copybara-github committed
feat: Refactor BigQueryAgentAnalyticsPlugin for async in preparation for GCS offloading
This change updates the BigQueryAgentAnalyticsPlugin to handle content parsing and logging asynchronously using CompletableFutures. PiperOrigin-RevId: 913808143
1 parent 509c4aa commit d837ef0

9 files changed

Lines changed: 1487 additions & 758 deletions

File tree

core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java

Lines changed: 434 additions & 446 deletions
Large diffs are not rendered by default.

core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryLoggerConfig.java

Lines changed: 2 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -59,7 +59,6 @@ public abstract class BigQueryLoggerConfig {
5959
public abstract ImmutableList<String> clusteringFields();
6060

6161
// Whether to log multi-modal content.
62-
// TODO(b/491852782): Implement logging of multi-modal content.
6362
public abstract boolean logMultiModalContent();
6463

6564
// Retry configuration for BigQuery writes.
@@ -96,7 +95,7 @@ public abstract class BigQueryLoggerConfig {
9695
// GCS bucket name to store multi-modal content.
9796
public abstract String gcsBucketName();
9897

99-
// TODO(b/491852782): Implement connection id.
98+
// Optional BigQuery connection ID for ObjectRef columns
10099
public abstract Optional<String> connectionId();
101100

102101
// Toggle for session metadata (e.g. gchat thread-id).
@@ -118,8 +117,7 @@ public abstract class BigQueryLoggerConfig {
118117
// Default "v" produces views like ``v_llm_request``.
119118
public abstract String viewPrefix();
120119

121-
@Nullable
122-
public abstract Credentials credentials();
120+
public abstract @Nullable Credentials credentials();
123121

124122
public abstract Builder toBuilder();
125123

core/src/main/java/com/google/adk/plugins/agentanalytics/JsonFormatter.java

Lines changed: 47 additions & 266 deletions
Original file line number | Diff line number | Diff line change
@@ -16,29 +16,20 @@
1616

1717
package com.google.adk.plugins.agentanalytics;
1818

19-
import com.fasterxml.jackson.annotation.JsonProperty;
2019
import com.fasterxml.jackson.databind.JsonNode;
2120
import com.fasterxml.jackson.databind.ObjectMapper;
2221
import com.fasterxml.jackson.databind.node.ArrayNode;
2322
import com.fasterxml.jackson.databind.node.ObjectNode;
24-
import com.google.adk.models.LlmRequest;
2523
import com.google.auto.value.AutoValue;
26-
import com.google.common.collect.ImmutableList;
27-
import com.google.genai.types.Blob;
28-
import com.google.genai.types.Content;
29-
import com.google.genai.types.FileData;
30-
import com.google.genai.types.FunctionCall;
31-
import com.google.genai.types.Part;
32-
import java.util.ArrayList;
33-
import java.util.List;
24+
import com.google.common.base.Utf8;
3425
import java.util.Map;
35-
import java.util.Optional;
3626
import java.util.Set;
3727
import org.jspecify.annotations.Nullable;
3828

3929
/** Utility for parsing, formatting and truncating content for BigQuery logging. */
4030
final class JsonFormatter {
41-
private static final ObjectMapper mapper = new ObjectMapper().findAndRegisterModules();
31+
static final ObjectMapper mapper = new ObjectMapper().findAndRegisterModules();
32+
static final String TRUNCATION_SUFFIX = "...[truncated]";
4233

4334
@AutoValue
4435
abstract static class TruncationResult {
@@ -51,254 +42,6 @@ static TruncationResult create(JsonNode node, boolean isTruncated) {
5142
}
5243
}
5344

54-
@AutoValue
55-
abstract static class ParsedContent {
56-
abstract ImmutableList<JsonNode> parts();
57-
58-
abstract JsonNode content();
59-
60-
abstract boolean isTruncated();
61-
62-
static ParsedContent create(
63-
ImmutableList<JsonNode> parts, JsonNode content, boolean isTruncated) {
64-
return new AutoValue_JsonFormatter_ParsedContent(parts, content, isTruncated);
65-
}
66-
}
67-
68-
@AutoValue
69-
abstract static class ParsedContentObject {
70-
abstract ArrayNode parts();
71-
72-
abstract String summary();
73-
74-
abstract boolean isTruncated();
75-
76-
static ParsedContentObject create(ArrayNode parts, String summary, boolean isTruncated) {
77-
return new AutoValue_JsonFormatter_ParsedContentObject(parts, summary, isTruncated);
78-
}
79-
}
80-
81-
@AutoValue
82-
abstract static class ContentPart {
83-
@JsonProperty("part_index")
84-
abstract int partIndex();
85-
86-
@JsonProperty("mime_type")
87-
abstract @Nullable String mimeType();
88-
89-
@JsonProperty("uri")
90-
abstract @Nullable String uri();
91-
92-
@JsonProperty("text")
93-
abstract @Nullable String text();
94-
95-
@JsonProperty("part_attributes")
96-
abstract String partAttributes();
97-
98-
@JsonProperty("storage_mode")
99-
abstract String storageMode();
100-
101-
@JsonProperty("object_ref")
102-
abstract @Nullable String objectRef();
103-
104-
static Builder builder() {
105-
return new AutoValue_JsonFormatter_ContentPart.Builder();
106-
}
107-
108-
@AutoValue.Builder
109-
abstract static class Builder {
110-
abstract Builder setPartIndex(int value);
111-
112-
abstract Builder setMimeType(@Nullable String value);
113-
114-
abstract Builder setUri(@Nullable String value);
115-
116-
abstract Builder setText(@Nullable String value);
117-
118-
abstract Builder setPartAttributes(String value);
119-
120-
abstract Builder setStorageMode(String value);
121-
122-
abstract Builder setObjectRef(@Nullable String value);
123-
124-
abstract ContentPart build();
125-
}
126-
}
127-
128-
/**
129-
* Parses content into JSON payload and content parts, matching Python implementation.
130-
*
131-
* @param content the content to parse
132-
* @param maxLength the maximum length for text fields
133-
* @return a ParsedContent object
134-
*/
135-
static ParsedContent parse(Object content, int maxLength) {
136-
JsonNode contentNode = mapper.nullNode();
137-
ArrayNode contentParts = mapper.createArrayNode();
138-
boolean isTruncated = false;
139-
140-
if (content instanceof LlmRequest llmRequest) {
141-
ObjectNode jsonPayload = mapper.createObjectNode();
142-
// Handle prompt
143-
ArrayNode messages = mapper.createArrayNode();
144-
List<Content> contents = llmRequest.contents();
145-
for (Content c : contents) {
146-
String role = c.role().orElse("unknown");
147-
ParsedContentObject parsedContentObject = parseContentObject(c, maxLength);
148-
isTruncated = isTruncated || parsedContentObject.isTruncated();
149-
contentParts.addAll(parsedContentObject.parts());
150-
151-
ObjectNode message = mapper.createObjectNode();
152-
message.put("role", role);
153-
message.put("content", parsedContentObject.summary());
154-
messages.add(message);
155-
}
156-
if (!messages.isEmpty()) {
157-
jsonPayload.set("prompt", messages);
158-
}
159-
// Handle system instruction
160-
if (llmRequest.config().isPresent()
161-
&& llmRequest.config().get().systemInstruction().isPresent()) {
162-
Content systemInstruction = llmRequest.config().get().systemInstruction().get();
163-
ParsedContentObject parsedSystemInstruction =
164-
parseContentObject(systemInstruction, maxLength);
165-
isTruncated = isTruncated || parsedSystemInstruction.isTruncated();
166-
contentParts.addAll(parsedSystemInstruction.parts());
167-
jsonPayload.put("system_prompt", parsedSystemInstruction.summary());
168-
}
169-
contentNode = jsonPayload;
170-
} else if (content instanceof Content || content instanceof Part) {
171-
ParsedContentObject parsedContentObject = parseContentObject(content, maxLength);
172-
ObjectNode summaryNode = mapper.createObjectNode();
173-
summaryNode.put("text_summary", parsedContentObject.summary());
174-
return ParsedContent.create(
175-
ImmutableList.copyOf(parsedContentObject.parts()),
176-
summaryNode,
177-
parsedContentObject.isTruncated());
178-
} else if (content instanceof String s) {
179-
TruncationResult result = truncateWithStatus(s, maxLength);
180-
contentNode = result.node();
181-
isTruncated = result.isTruncated();
182-
} else {
183-
TruncationResult result = smartTruncate(content, maxLength);
184-
contentNode = result.node();
185-
isTruncated = result.isTruncated();
186-
}
187-
return ParsedContent.create(ImmutableList.copyOf(contentParts), contentNode, isTruncated);
188-
}
189-
190-
/**
191-
* Parses a Content or Part object into summary text and content parts.
192-
*
193-
* @param content the Content or Part object to parse
194-
* @param maxLength the maximum length of text fields before truncation
195-
* @return a ParsedContentObject containing parts, summary, and truncation flag
196-
*/
197-
private static ParsedContentObject parseContentObject(Object content, int maxLength) {
198-
ArrayNode contentParts = mapper.createArrayNode();
199-
boolean isTruncated = false;
200-
List<String> summaryText = new ArrayList<>();
201-
202-
List<Part> parts;
203-
if (content instanceof Content c) {
204-
parts = c.parts().orElse(ImmutableList.of());
205-
} else if (content instanceof Part p) {
206-
parts = ImmutableList.of(p);
207-
} else {
208-
return ParsedContentObject.create(contentParts, "", false);
209-
}
210-
211-
for (int i = 0; i < parts.size(); i++) {
212-
Part part = parts.get(i);
213-
ContentPart.Builder partBuilder =
214-
ContentPart.builder()
215-
.setPartIndex(i)
216-
.setMimeType("text/plain")
217-
.setUri(null)
218-
.setText(null)
219-
.setPartAttributes("{}")
220-
.setStorageMode("INLINE")
221-
.setObjectRef(null);
222-
223-
// CASE A: It is already a URI (e.g. from user input)
224-
if (part.fileData().isPresent()) {
225-
FileData fileData = part.fileData().get();
226-
partBuilder
227-
.setStorageMode("EXTERNAL_URI")
228-
.setUri(fileData.fileUri().orElse(null))
229-
.setMimeType(fileData.mimeType().orElse(null));
230-
}
231-
// CASE B: It is Binary/Inline Data (Image/Blob)
232-
else if (part.inlineData().isPresent()) {
233-
// TODO: (b/485571635) Implement GCS offloading here.
234-
partBuilder
235-
.setText("[BINARY DATA]")
236-
.setMimeType(part.inlineData().get().mimeType().orElse(""));
237-
}
238-
// CASE C: Text
239-
else if (part.text().isPresent()) {
240-
String text = part.text().get();
241-
// TODO: (b/485571635) Implement GCS offloading if text length exceeds maxLength.
242-
if (text.length() > maxLength) {
243-
text = truncate(text, maxLength);
244-
isTruncated = true;
245-
}
246-
partBuilder.setText(text);
247-
summaryText.add(text);
248-
} else if (part.functionCall().isPresent()) {
249-
FunctionCall fc = part.functionCall().get();
250-
ObjectNode partAttributes = mapper.createObjectNode();
251-
partAttributes.put("function_name", fc.name().orElse("unknown"));
252-
partBuilder
253-
.setMimeType("application/json")
254-
.setText("Function: " + fc.name().orElse("unknown"))
255-
.setPartAttributes(partAttributes.toString());
256-
}
257-
contentParts.add(mapper.valueToTree(partBuilder.build()));
258-
}
259-
260-
String summaryResult = String.join(" | ", summaryText);
261-
if (summaryResult.length() > maxLength) {
262-
summaryResult = truncate(summaryResult, maxLength);
263-
isTruncated = true;
264-
}
265-
266-
return ParsedContentObject.create(contentParts, summaryResult, isTruncated);
267-
}
268-
269-
/** Formats Content parts into an ArrayNode for BigQuery logging. */
270-
static ArrayNode formatContentParts(Optional<Content> content, int maxLength) {
271-
ArrayNode partsArray = mapper.createArrayNode();
272-
if (content.isEmpty() || content.get().parts() == null) {
273-
return partsArray;
274-
}
275-
276-
List<Part> parts = content.get().parts().orElse(ImmutableList.of());
277-
278-
for (int i = 0; i < parts.size(); i++) {
279-
Part part = parts.get(i);
280-
ObjectNode partObj = mapper.createObjectNode();
281-
partObj.put("part_index", i);
282-
partObj.put("storage_mode", "INLINE");
283-
284-
if (part.text().isPresent()) {
285-
partObj.put("mime_type", "text/plain");
286-
partObj.put("text", truncate(part.text().get(), maxLength));
287-
} else if (part.inlineData().isPresent()) {
288-
Blob blob = part.inlineData().get();
289-
partObj.put("mime_type", blob.mimeType().orElse(""));
290-
partObj.put("text", "[BINARY DATA]");
291-
} else if (part.fileData().isPresent()) {
292-
FileData fileData = part.fileData().get();
293-
partObj.put("mime_type", fileData.mimeType().orElse(""));
294-
partObj.put("uri", fileData.fileUri().orElse(""));
295-
partObj.put("storage_mode", "EXTERNAL_URI");
296-
}
297-
partsArray.add(partObj);
298-
}
299-
return partsArray;
300-
}
301-
30245
/** Recursively truncates long strings inside an object and returns a TruncationResult. */
30346
static TruncationResult smartTruncate(Object obj, int maxLength) {
30447
if (obj == null) {
@@ -328,7 +71,7 @@ private static TruncationResult recursiveSmartTruncate(JsonNode node, int maxLen
32871
boolean isTruncated = false;
32972
if (node.isTextual()) {
33073
String text = node.asText();
331-
if (text.length() > maxLength) {
74+
if (Utf8.encodedLength(text) > maxLength) {
33275
return TruncationResult.create(mapper.valueToTree(truncate(text, maxLength)), true);
33376
}
33477
return TruncationResult.create(node, false);
@@ -353,21 +96,59 @@ private static TruncationResult recursiveSmartTruncate(JsonNode node, int maxLen
35396
return TruncationResult.create(node, false);
35497
}
35598

356-
private static TruncationResult truncateWithStatus(String s, int maxLength) {
99+
static TruncationResult truncateWithStatus(String s, int maxLength) {
357100
if (s == null) {
358101
return TruncationResult.create(mapper.nullNode(), false);
359102
}
360-
if (s.length() <= maxLength) {
103+
if (Utf8.encodedLength(s) <= maxLength) {
361104
return TruncationResult.create(mapper.valueToTree(s), false);
362105
}
363106
return TruncationResult.create(mapper.valueToTree(truncate(s, maxLength)), true);
364107
}
365108

366-
private static String truncate(String s, int maxLength) {
367-
if (s == null || s.length() <= maxLength) {
109+
static @Nullable String truncate(String s, int budget) {
110+
return truncateAndAddSuffix(s, budget, TRUNCATION_SUFFIX);
111+
}
112+
113+
static @Nullable String truncateAndAddSuffix(String s, int budget, String suffix) {
114+
if (s == null) {
115+
return null;
116+
}
117+
if (Utf8.encodedLength(s) <= budget) {
368118
return s;
369119
}
370-
return s.substring(0, maxLength) + "...[truncated]";
120+
int suffixBytes = Utf8.encodedLength(suffix);
121+
int effectiveBudget = Math.max(0, budget - suffixBytes);
122+
// Fallback in case the budget is too small
123+
if (effectiveBudget == 0) {
124+
return suffix.substring(0, budget);
125+
}
126+
127+
int byteCount = 0;
128+
int charIndex = 0;
129+
for (int i = 0; i < s.length(); ) {
130+
int codePoint = s.codePointAt(i);
131+
int codePointLen = Character.charCount(codePoint);
132+
int codePointBytes;
133+
if (codePoint < 0x80) {
134+
codePointBytes = 1;
135+
} else if (codePoint < 0x800) {
136+
codePointBytes = 2;
137+
} else if (codePoint < 0x10000) {
138+
codePointBytes = 3;
139+
} else {
140+
codePointBytes = 4;
141+
}
142+
143+
if (byteCount + codePointBytes > effectiveBudget) {
144+
break;
145+
}
146+
byteCount += codePointBytes;
147+
charIndex += codePointLen;
148+
i += codePointLen;
149+
}
150+
151+
return s.substring(0, charIndex) + suffix;
371152
}
372153

373154
/** Converts a JsonNode to a standard Java object (Map, List, etc.). */

0 commit comments

Comments
 (0)