Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,6 @@
<artifactId>arrow-memory-netty</artifactId>
<version>17.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.tika</groupId>
<artifactId>tika-core</artifactId>
</dependency>
</dependencies>
<build>
<resources>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,7 @@ private Completable logEvent(
parseFuture =
state
.getParser()
.parse(
content,
traceIds.traceId(),
traceIds.spanId() != null ? traceIds.spanId() : "no_span")
.parse(content)
.thenAccept(
parsedContent -> {
row.put(
Expand Down

This file was deleted.

146 changes: 17 additions & 129 deletions core/src/main/java/com/google/adk/plugins/agentanalytics/Parser.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static com.google.adk.plugins.agentanalytics.JsonFormatter.mapper;
import static com.google.adk.plugins.agentanalytics.JsonFormatter.smartTruncate;
import static com.google.adk.plugins.agentanalytics.JsonFormatter.truncate;
import static com.google.adk.plugins.agentanalytics.JsonFormatter.truncateAndAddSuffix;
import static com.google.adk.plugins.agentanalytics.JsonFormatter.truncateWithStatus;

import com.fasterxml.jackson.annotation.JsonProperty;
Expand All @@ -40,43 +39,16 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.tika.mime.MimeTypeException;
import org.apache.tika.mime.MimeTypes;
import org.jspecify.annotations.Nullable;
import org.threeten.bp.Instant;
import org.threeten.bp.LocalDate;
import org.threeten.bp.ZoneOffset;

/** Utility for parsing content for BigQuery logging. */
final class Parser {
private static final String DEFAULT_EXTENSION = ".bin";
private static final int MAX_OFFLOADED_TEXT_LENGTH = 200;
private static final Logger logger = Logger.getLogger(Parser.class.getName());
private static final int INLINE_TEXT_LIMIT = 32 * 1024; // 32KB limit
private static final String UPLOAD_FAILED_MESSAGE = "[UPLOAD FAILED]";
private static final String MEDIA_OFFLOADED_MESSAGE = "[MEDIA OFFLOADED]";
private static final String BINARY_DATA_MESSAGE = "[BINARY DATA]";
private static final String TEXT_OFFLOADED_SUFFIX = "... [OFFLOADED]";
private static final MimeTypes MIME_TYPES = MimeTypes.getDefaultMimeTypes();

private final @Nullable GcsOffloader offloader;
private final int maxLength;
private final @Nullable String connectionId;
private final boolean logMultiModalContent;

Parser(
@Nullable GcsOffloader offloader,
int maxLength,
@Nullable String connectionId,
boolean logMultiModalContent) {
this.offloader = offloader;

Parser(int maxLength) {
this.maxLength = maxLength;
this.connectionId = connectionId;
this.logMultiModalContent = logMultiModalContent;
}

@AutoValue
Expand Down Expand Up @@ -180,27 +152,23 @@ static ObjectRef create(
* Parses content into JSON payload and content parts, matching Python implementation.
*
* @param content the content to parse
* @param traceId the trace ID for GCS path
* @param spanId the span ID for GCS path
* @return a CompletableFuture of ParsedContent object
*/
CompletableFuture<ParsedContent> parse(Object content, String traceId, String spanId) {
CompletableFuture<ParsedContent> parse(Object content) {
if (content instanceof LlmRequest llmRequest) {
ObjectNode jsonPayload = mapper.createObjectNode();
ArrayNode messages = mapper.createArrayNode();
List<CompletableFuture<ParsedContentObject>> futures = new ArrayList<>();
List<Content> contents = llmRequest.contents();

for (Content c : contents) {
futures.add(parseContentObject(c, traceId, spanId));
futures.add(parseContentObject(c));
}

CompletableFuture<ParsedContentObject> systemFuture = null;
if (llmRequest.config().isPresent()
&& llmRequest.config().get().systemInstruction().isPresent()) {
systemFuture =
parseContentObject(
llmRequest.config().get().systemInstruction().get(), traceId, spanId);
systemFuture = parseContentObject(llmRequest.config().get().systemInstruction().get());
futures.add(systemFuture);
}
CompletableFuture<ParsedContentObject> finalSystemFuture = systemFuture;
Expand Down Expand Up @@ -234,7 +202,7 @@ CompletableFuture<ParsedContent> parse(Object content, String traceId, String sp
}
if (content instanceof LlmResponse llmResponse) {
ObjectNode jsonPayload = mapper.createObjectNode();
return parseContentObject(llmResponse.content().orElse(null), traceId, spanId)
return parseContentObject(llmResponse.content().orElse(null))
.thenApply(
parsed -> {
ObjectNode summaryNode = mapper.createObjectNode();
Expand All @@ -257,7 +225,7 @@ CompletableFuture<ParsedContent> parse(Object content, String traceId, String sp
});
}
if (content instanceof Content || content instanceof Part) {
return parseContentObject(content, traceId, spanId)
return parseContentObject(content)
.thenApply(
parsed -> {
ObjectNode summaryNode = mapper.createObjectNode();
Expand All @@ -281,13 +249,10 @@ CompletableFuture<ParsedContent> parse(Object content, String traceId, String sp
* Parses a Content or Part object into summary text and content parts.
*
* @param content the Content or Part object to parse
* @param traceId the trace ID for GCS path
* @param spanId the span ID for GCS path
* @return a CompletableFuture of ParsedContentObject containing parts, summary, and truncation
* flag
*/
private CompletableFuture<ParsedContentObject> parseContentObject(
Object content, String traceId, String spanId) {
private CompletableFuture<ParsedContentObject> parseContentObject(Object content) {
List<Part> parts;
if (content instanceof Content c) {
parts = c.parts().orElse(ImmutableList.of());
Expand All @@ -300,7 +265,7 @@ private CompletableFuture<ParsedContentObject> parseContentObject(

List<CompletableFuture<TruncationResult>> partFutures = new ArrayList<>();
for (int i = 0; i < parts.size(); i++) {
partFutures.add(processPart(parts.get(i), i, traceId, spanId));
partFutures.add(processPart(parts.get(i), i));
}

return CompletableFuture.allOf(partFutures.toArray(new CompletableFuture<?>[0]))
Expand Down Expand Up @@ -330,8 +295,7 @@ private CompletableFuture<ParsedContentObject> parseContentObject(
});
}

private CompletableFuture<TruncationResult> processPart(
Part part, int index, String traceId, String spanId) {
private CompletableFuture<TruncationResult> processPart(Part part, int index) {
ContentPart.Builder partBuilder =
ContentPart.builder()
.setPartIndex(index)
Expand All @@ -356,89 +320,17 @@ private CompletableFuture<TruncationResult> processPart(
if (part.inlineData().isPresent()) {
Blob blob = part.inlineData().get();
String mimeType = blob.mimeType().orElse("application/octet-stream");
if (logMultiModalContent && offloader != null) {
String ext = DEFAULT_EXTENSION;
try {
ext = MIME_TYPES.forName(mimeType).getExtension();
} catch (MimeTypeException e) {
logger.log(Level.WARNING, "Failed to get extension for mime type " + mimeType, e);
}
String path =
String.format(
"%s/%s/%s_p%d_%s%s",
getLocalDate(), traceId, spanId, index, UUID.randomUUID(), ext);
return offloader
.uploadContent(blob.data().orElse(new byte[0]), mimeType, path)
.handle(
(uri, ex) -> {
if (ex != null) {
logger.log(Level.WARNING, "Failed to offload content to GCS", ex);
partBuilder.setText(UPLOAD_FAILED_MESSAGE);
} else {
ObjectNode details = mapper.createObjectNode();
ObjectNode gcsMetadata = details.putObject("gcs_metadata");
gcsMetadata.put("content_type", mimeType);

partBuilder
.setStorageMode("GCS_REFERENCE")
.setUri(uri)
.setMimeType(mimeType)
.setText(MEDIA_OFFLOADED_MESSAGE)
.setObjectRef(
mapper.valueToTree(ObjectRef.create(uri, null, connectionId, details)));
}
return TruncationResult.create(mapper.valueToTree(partBuilder.build()), false);
});
} else {
partBuilder.setText(BINARY_DATA_MESSAGE).setMimeType(mimeType);
return CompletableFuture.completedFuture(
TruncationResult.create(mapper.valueToTree(partBuilder.build()), false));
}
partBuilder.setText(BINARY_DATA_MESSAGE).setMimeType(mimeType);
return CompletableFuture.completedFuture(
TruncationResult.create(mapper.valueToTree(partBuilder.build()), false));
}
// CASE C: Text
if (part.text().isPresent()) {
String text = part.text().get();
int textLen = Utf8.encodedLength(text);
int offloadThreshold = Math.min(INLINE_TEXT_LIMIT, maxLength);

if (offloader != null && textLen > offloadThreshold) {

String path =
String.format(
"%s/%s/%s_p%d_%s.txt", getLocalDate(), traceId, spanId, index, UUID.randomUUID());
return offloader
.uploadContent(text, "text/plain", path)
.handle(
(uri, ex) -> {
if (ex != null) {
logger.log(Level.WARNING, "Failed to offload text to GCS", ex);
TruncationResult res = truncateWithStatus(text, maxLength);
partBuilder.setText(res.node().asText());
return TruncationResult.create(
mapper.valueToTree(partBuilder.build()), res.isTruncated());
} else {
ObjectNode details = mapper.createObjectNode();
ObjectNode gcsMetadata = details.putObject("gcs_metadata");
gcsMetadata.put("content_type", "text/plain");

partBuilder
.setStorageMode("GCS_REFERENCE")
.setUri(uri)
.setMimeType("text/plain")
.setText(
truncateAndAddSuffix(
text, MAX_OFFLOADED_TEXT_LENGTH, TEXT_OFFLOADED_SUFFIX))
.setObjectRef(
mapper.valueToTree(ObjectRef.create(uri, null, connectionId, details)));
return TruncationResult.create(mapper.valueToTree(partBuilder.build()), true);
}
});
} else {
TruncationResult res = truncateWithStatus(text, maxLength);
partBuilder.setText(res.node().asText());
return CompletableFuture.completedFuture(
TruncationResult.create(mapper.valueToTree(partBuilder.build()), res.isTruncated()));
}
TruncationResult res = truncateWithStatus(text, maxLength);
partBuilder.setText(res.node().asText());
return CompletableFuture.completedFuture(
TruncationResult.create(mapper.valueToTree(partBuilder.build()), res.isTruncated()));
}
if (part.functionCall().isPresent()) {
FunctionCall fc = part.functionCall().get();
Expand Down Expand Up @@ -487,8 +379,4 @@ ArrayNode formatContentParts(Optional<Content> content) {
}
return partsArray;
}

private LocalDate getLocalDate() {
return Instant.now().atZone(ZoneOffset.UTC).toLocalDate();
}
}
Loading
Loading