diff --git a/core/pom.xml b/core/pom.xml index 3ebeaebb2..f0bafe01b 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -217,10 +217,6 @@ arrow-memory-netty 17.0.0 - - org.apache.tika - tika-core - diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java index 566dbd5a4..59e09c8a7 100644 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java @@ -253,10 +253,7 @@ private Completable logEvent( parseFuture = state .getParser() - .parse( - content, - traceIds.traceId(), - traceIds.spanId() != null ? traceIds.spanId() : "no_span") + .parse(content) .thenAccept( parsedContent -> { row.put( diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/GcsOffloader.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/GcsOffloader.java deleted file mode 100644 index 17993bb8e..000000000 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/GcsOffloader.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright 2026 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.adk.plugins.agentanalytics; - -import static java.nio.charset.StandardCharsets.UTF_8; - -import com.google.auth.Credentials; -import com.google.cloud.storage.BlobId; -import com.google.cloud.storage.BlobInfo; -import com.google.cloud.storage.Storage; -import com.google.cloud.storage.StorageOptions; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.RejectedExecutionException; -import org.jspecify.annotations.Nullable; - -/** Offloads content to GCS. */ -class GcsOffloader { - private final Storage storage; - private final String bucketName; - private final Executor executor; - private final boolean isStorageOverride; - - GcsOffloader( - String projectId, - String bucketName, - Executor executor, - @Nullable Credentials credentials, - @Nullable Storage storageOverride) { - if (storageOverride != null) { - this.isStorageOverride = true; - this.storage = storageOverride; - } else { - this.isStorageOverride = false; - StorageOptions.Builder builder = StorageOptions.newBuilder().setProjectId(projectId); - if (credentials != null) { - builder.setCredentials(credentials); - } - this.storage = builder.build().getService(); - } - this.bucketName = bucketName; - this.executor = executor; - } - - /** Async wrapper around blocking GCS upload for binary data. */ - CompletableFuture uploadContent(byte[] data, String contentType, String path) { - try { - return CompletableFuture.supplyAsync( - () -> { - BlobId blobId = BlobId.of(bucketName, path); - BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType(contentType).build(); - storage.create(blobInfo, data); - return String.format("gs://%s/%s", bucketName, path); - }, - executor); - } catch (RejectedExecutionException e) { - return CompletableFuture.failedFuture(e); - } - } - - /** Async wrapper around blocking GCS upload for text data. */ - CompletableFuture uploadContent(String data, String contentType, String path) { - try { - return CompletableFuture.supplyAsync(() -> data.getBytes(UTF_8), executor) - .thenCompose(bytes -> uploadContent(bytes, contentType, path)); - } catch (RejectedExecutionException e) { - return CompletableFuture.failedFuture(e); - } - } - - String getBucketName() { - return bucketName; - } - - void close() throws Exception { - if (storage != null && !isStorageOverride) { - storage.close(); - } - } -} diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/Parser.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/Parser.java index f4eff09f7..5db8be46c 100644 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/Parser.java +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/Parser.java @@ -19,7 +19,6 @@ import static com.google.adk.plugins.agentanalytics.JsonFormatter.mapper; import static com.google.adk.plugins.agentanalytics.JsonFormatter.smartTruncate; import static com.google.adk.plugins.agentanalytics.JsonFormatter.truncate; -import static com.google.adk.plugins.agentanalytics.JsonFormatter.truncateAndAddSuffix; import static com.google.adk.plugins.agentanalytics.JsonFormatter.truncateWithStatus; import com.fasterxml.jackson.annotation.JsonProperty; @@ -40,43 +39,16 @@ import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.logging.Level; -import java.util.logging.Logger; -import org.apache.tika.mime.MimeTypeException; -import org.apache.tika.mime.MimeTypes; import org.jspecify.annotations.Nullable; -import org.threeten.bp.Instant; -import org.threeten.bp.LocalDate; -import org.threeten.bp.ZoneOffset; /** Utility for parsing content for BigQuery logging. */ final class Parser { - private static final String DEFAULT_EXTENSION = ".bin"; - private static final int MAX_OFFLOADED_TEXT_LENGTH = 200; - private static final Logger logger = Logger.getLogger(Parser.class.getName()); - private static final int INLINE_TEXT_LIMIT = 32 * 1024; // 32KB limit - private static final String UPLOAD_FAILED_MESSAGE = "[UPLOAD FAILED]"; - private static final String MEDIA_OFFLOADED_MESSAGE = "[MEDIA OFFLOADED]"; private static final String BINARY_DATA_MESSAGE = "[BINARY DATA]"; - private static final String TEXT_OFFLOADED_SUFFIX = "... [OFFLOADED]"; - private static final MimeTypes MIME_TYPES = MimeTypes.getDefaultMimeTypes(); - - private final @Nullable GcsOffloader offloader; private final int maxLength; - private final @Nullable String connectionId; - private final boolean logMultiModalContent; - - Parser( - @Nullable GcsOffloader offloader, - int maxLength, - @Nullable String connectionId, - boolean logMultiModalContent) { - this.offloader = offloader; + + Parser(int maxLength) { this.maxLength = maxLength; - this.connectionId = connectionId; - this.logMultiModalContent = logMultiModalContent; } @AutoValue @@ -180,11 +152,9 @@ static ObjectRef create( * Parses content into JSON payload and content parts, matching Python implementation. * * @param content the content to parse - * @param traceId the trace ID for GCS path - * @param spanId the span ID for GCS path * @return a CompletableFuture of ParsedContent object */ - CompletableFuture parse(Object content, String traceId, String spanId) { + CompletableFuture parse(Object content) { if (content instanceof LlmRequest llmRequest) { ObjectNode jsonPayload = mapper.createObjectNode(); ArrayNode messages = mapper.createArrayNode(); @@ -192,15 +162,13 @@ CompletableFuture parse(Object content, String traceId, String sp List contents = llmRequest.contents(); for (Content c : contents) { - futures.add(parseContentObject(c, traceId, spanId)); + futures.add(parseContentObject(c)); } CompletableFuture systemFuture = null; if (llmRequest.config().isPresent() && llmRequest.config().get().systemInstruction().isPresent()) { - systemFuture = - parseContentObject( - llmRequest.config().get().systemInstruction().get(), traceId, spanId); + systemFuture = parseContentObject(llmRequest.config().get().systemInstruction().get()); futures.add(systemFuture); } CompletableFuture finalSystemFuture = systemFuture; @@ -234,7 +202,7 @@ CompletableFuture parse(Object content, String traceId, String sp } if (content instanceof LlmResponse llmResponse) { ObjectNode jsonPayload = mapper.createObjectNode(); - return parseContentObject(llmResponse.content().orElse(null), traceId, spanId) + return parseContentObject(llmResponse.content().orElse(null)) .thenApply( parsed -> { ObjectNode summaryNode = mapper.createObjectNode(); @@ -257,7 +225,7 @@ CompletableFuture parse(Object content, String traceId, String sp }); } if (content instanceof Content || content instanceof Part) { - return parseContentObject(content, traceId, spanId) + return parseContentObject(content) .thenApply( parsed -> { ObjectNode summaryNode = mapper.createObjectNode(); @@ -281,13 +249,10 @@ CompletableFuture parse(Object content, String traceId, String sp * Parses a Content or Part object into summary text and content parts. * * @param content the Content or Part object to parse - * @param traceId the trace ID for GCS path - * @param spanId the span ID for GCS path * @return a CompletableFuture of ParsedContentObject containing parts, summary, and truncation * flag */ - private CompletableFuture parseContentObject( - Object content, String traceId, String spanId) { + private CompletableFuture parseContentObject(Object content) { List parts; if (content instanceof Content c) { parts = c.parts().orElse(ImmutableList.of()); @@ -300,7 +265,7 @@ private CompletableFuture parseContentObject( List> partFutures = new ArrayList<>(); for (int i = 0; i < parts.size(); i++) { - partFutures.add(processPart(parts.get(i), i, traceId, spanId)); + partFutures.add(processPart(parts.get(i), i)); } return CompletableFuture.allOf(partFutures.toArray(new CompletableFuture[0])) @@ -330,8 +295,7 @@ private CompletableFuture parseContentObject( }); } - private CompletableFuture processPart( - Part part, int index, String traceId, String spanId) { + private CompletableFuture processPart(Part part, int index) { ContentPart.Builder partBuilder = ContentPart.builder() .setPartIndex(index) @@ -356,89 +320,17 @@ private CompletableFuture processPart( if (part.inlineData().isPresent()) { Blob blob = part.inlineData().get(); String mimeType = blob.mimeType().orElse("application/octet-stream"); - if (logMultiModalContent && offloader != null) { - String ext = DEFAULT_EXTENSION; - try { - ext = MIME_TYPES.forName(mimeType).getExtension(); - } catch (MimeTypeException e) { - logger.log(Level.WARNING, "Failed to get extension for mime type " + mimeType, e); - } - String path = - String.format( - "%s/%s/%s_p%d_%s%s", - getLocalDate(), traceId, spanId, index, UUID.randomUUID(), ext); - return offloader - .uploadContent(blob.data().orElse(new byte[0]), mimeType, path) - .handle( - (uri, ex) -> { - if (ex != null) { - logger.log(Level.WARNING, "Failed to offload content to GCS", ex); - partBuilder.setText(UPLOAD_FAILED_MESSAGE); - } else { - ObjectNode details = mapper.createObjectNode(); - ObjectNode gcsMetadata = details.putObject("gcs_metadata"); - gcsMetadata.put("content_type", mimeType); - - partBuilder - .setStorageMode("GCS_REFERENCE") - .setUri(uri) - .setMimeType(mimeType) - .setText(MEDIA_OFFLOADED_MESSAGE) - .setObjectRef( - mapper.valueToTree(ObjectRef.create(uri, null, connectionId, details))); - } - return TruncationResult.create(mapper.valueToTree(partBuilder.build()), false); - }); - } else { - partBuilder.setText(BINARY_DATA_MESSAGE).setMimeType(mimeType); - return CompletableFuture.completedFuture( - TruncationResult.create(mapper.valueToTree(partBuilder.build()), false)); - } + partBuilder.setText(BINARY_DATA_MESSAGE).setMimeType(mimeType); + return CompletableFuture.completedFuture( + TruncationResult.create(mapper.valueToTree(partBuilder.build()), false)); } // CASE C: Text if (part.text().isPresent()) { String text = part.text().get(); - int textLen = Utf8.encodedLength(text); - int offloadThreshold = Math.min(INLINE_TEXT_LIMIT, maxLength); - - if (offloader != null && textLen > offloadThreshold) { - - String path = - String.format( - "%s/%s/%s_p%d_%s.txt", getLocalDate(), traceId, spanId, index, UUID.randomUUID()); - return offloader - .uploadContent(text, "text/plain", path) - .handle( - (uri, ex) -> { - if (ex != null) { - logger.log(Level.WARNING, "Failed to offload text to GCS", ex); - TruncationResult res = truncateWithStatus(text, maxLength); - partBuilder.setText(res.node().asText()); - return TruncationResult.create( - mapper.valueToTree(partBuilder.build()), res.isTruncated()); - } else { - ObjectNode details = mapper.createObjectNode(); - ObjectNode gcsMetadata = details.putObject("gcs_metadata"); - gcsMetadata.put("content_type", "text/plain"); - - partBuilder - .setStorageMode("GCS_REFERENCE") - .setUri(uri) - .setMimeType("text/plain") - .setText( - truncateAndAddSuffix( - text, MAX_OFFLOADED_TEXT_LENGTH, TEXT_OFFLOADED_SUFFIX)) - .setObjectRef( - mapper.valueToTree(ObjectRef.create(uri, null, connectionId, details))); - return TruncationResult.create(mapper.valueToTree(partBuilder.build()), true); - } - }); - } else { - TruncationResult res = truncateWithStatus(text, maxLength); - partBuilder.setText(res.node().asText()); - return CompletableFuture.completedFuture( - TruncationResult.create(mapper.valueToTree(partBuilder.build()), res.isTruncated())); - } + TruncationResult res = truncateWithStatus(text, maxLength); + partBuilder.setText(res.node().asText()); + return CompletableFuture.completedFuture( + TruncationResult.create(mapper.valueToTree(partBuilder.build()), res.isTruncated())); } if (part.functionCall().isPresent()) { FunctionCall fc = part.functionCall().get(); @@ -487,8 +379,4 @@ ArrayNode formatContentParts(Optional content) { } return partsArray; } - - private LocalDate getLocalDate() { - return Instant.now().atZone(ZoneOffset.UTC).toLocalDate(); - } } diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/PluginState.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/PluginState.java index d1826ec5e..0654fab5d 100644 --- a/core/src/main/java/com/google/adk/plugins/agentanalytics/PluginState.java +++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/PluginState.java @@ -3,7 +3,6 @@ import static com.google.adk.plugins.agentanalytics.BigQueryUtils.getVersionHeaderValue; import static com.google.common.collect.ImmutableList.toImmutableList; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.retrying.RetrySettings; @@ -22,35 +21,21 @@ import java.util.Collection; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; -import org.jspecify.annotations.Nullable; import org.threeten.bp.Duration; -import org.threeten.bp.Instant; /** Manages state for the BigQueryAgentAnalyticsPlugin. */ class PluginState { private static final Logger logger = Logger.getLogger(PluginState.class.getName()); - private static final int GCS_OFFLOAD_CORE_POOL_SIZE = 2; - private static final int GCS_OFFLOAD_MAX_THREADS = 10; - // Max number of tasks in the queue before we start rejecting tasks and executing them in the - // caller thread. - private static final int GCS_OFFLOAD_QUEUE_SIZE = 100; - // Idle time before threads are terminated. - private static final int GCS_OFFLOAD_IDLE_TIME_SECONDS = 30; - private final BigQueryLoggerConfig config; private final ScheduledExecutorService executor; - private final ExecutorService offloadExecutor; private final BigQueryWriteClient writeClient; private static final AtomicLong threadCounter = new AtomicLong(0); // Map of invocation ID to BatchProcessor. @@ -60,7 +45,6 @@ class PluginState { private final ConcurrentHashMap traceManagers = new ConcurrentHashMap<>(); // Cache of invocation ID to Boolean indicating invocation ID has been processed. private final Cache processedInvocations; - private final GcsOffloader offloader; private final Parser parser; private final ConcurrentHashMap>> pendingTasks = new ConcurrentHashMap<>(); @@ -70,7 +54,6 @@ class PluginState { this.executor = Executors.newScheduledThreadPool( 2, r -> new Thread(r, "bq-analytics-plugin-" + threadCounter.getAndIncrement())); - this.offloadExecutor = createGcsOffloadThreadPool(); // One write client per plugin instance, shared by all invocations. this.writeClient = createWriteClient(config); this.processedInvocations = @@ -78,25 +61,7 @@ class PluginState { .maximumSize(10000) .expireAfterWrite(java.time.Duration.ofMinutes(10)) .build(); - this.offloader = getGcsOffloader(config); - this.parser = - new Parser( - offloader, - config.maxContentLength(), - config.connectionId().orElse(null), - config.logMultiModalContent()); - } - - private static ExecutorService createGcsOffloadThreadPool() { - return new ThreadPoolExecutor( - GCS_OFFLOAD_CORE_POOL_SIZE, // The lower limit of threads. - GCS_OFFLOAD_MAX_THREADS, // The upper limit of threads. - GCS_OFFLOAD_IDLE_TIME_SECONDS, // Time to keep idle threads alive. - SECONDS, - new ArrayBlockingQueue<>(GCS_OFFLOAD_QUEUE_SIZE), // workQueue: Hand off tasks directly. - r -> new Thread(r, "bq-analytics-plugin-offload-" + threadCounter.getAndIncrement()), - // Reject tasks if the queue is full. - new ThreadPoolExecutor.AbortPolicy()); + this.parser = new Parser(config.maxContentLength()); } ScheduledExecutorService getExecutor() { @@ -177,14 +142,6 @@ BatchProcessor getBatchProcessor(String invocationId) { }); } - protected @Nullable GcsOffloader getGcsOffloader(BigQueryLoggerConfig config) { - if (config.gcsBucketName().isEmpty()) { - return null; - } - return new GcsOffloader( - config.projectId(), config.gcsBucketName(), offloadExecutor, config.credentials(), null); - } - Parser getParser() { return parser; } @@ -217,8 +174,7 @@ void clearBatchProcessors() { batchProcessors.clear(); } - @VisibleForTesting - protected Set> getPendingTasksForInvocation(String invocationId) { + private Set> getPendingTasksForInvocation(String invocationId) { return pendingTasks.computeIfAbsent(invocationId, k -> ConcurrentHashMap.newKeySet()); } @@ -307,34 +263,13 @@ Completable close() { } try { executor.shutdown(); - offloadExecutor.shutdown(); - long totalTimeoutMillis = config.shutdownTimeout().toMillis(); - Instant startTime = Instant.now(); - if (!executor.awaitTermination(totalTimeoutMillis, MILLISECONDS)) { + if (!executor.awaitTermination(config.shutdownTimeout().toMillis(), MILLISECONDS)) { executor.shutdownNow(); } - long elapsedTimeMillis = Duration.between(startTime, Instant.now()).toMillis(); - long remainingMillis = totalTimeoutMillis - elapsedTimeMillis; - if (remainingMillis > 0) { - if (!offloadExecutor.awaitTermination(remainingMillis, MILLISECONDS)) { - offloadExecutor.shutdownNow(); - } - } else { - offloadExecutor.shutdownNow(); - } } catch (InterruptedException e) { executor.shutdownNow(); - offloadExecutor.shutdownNow(); Thread.currentThread().interrupt(); } - - try { - if (offloader != null) { - offloader.close(); - } - } catch (Exception e) { - logger.log(Level.WARNING, "Failed to close GCS offloader", e); - } }); } } diff --git a/core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPluginTest.java b/core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPluginTest.java index 6d65e2f15..836442cad 100644 --- a/core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPluginTest.java +++ b/core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPluginTest.java @@ -17,7 +17,6 @@ package com.google.adk.plugins.agentanalytics; import static com.google.common.collect.ImmutableList.toImmutableList; -import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -25,13 +24,11 @@ import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyMap; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.adk.agents.BaseAgent; @@ -59,8 +56,6 @@ import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; import com.google.cloud.bigquery.storage.v1.StreamWriter; -import com.google.cloud.storage.BlobInfo; -import com.google.cloud.storage.Storage; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.genai.types.Candidate; @@ -807,7 +802,6 @@ public void logEvent_handlesExceptionFromFormatter() throws Exception { (content, eventType) -> { throw new RuntimeException("Formatter error"); }; - BigQueryLoggerConfig formattedConfig = config.toBuilder().contentFormatter(formatter).build(); PluginState formattedState = new PluginState(formattedConfig) { @@ -1048,133 +1042,6 @@ public void logEvent_createsUniqueProcessorPerInvocation() throws Exception { testExecutor.shutdown(); } - @Test - public void logEvent_offloadsToGcs_whenLargeContent() throws Exception { - GcsOffloader mockOffloader = mock(GcsOffloader.class); - when(mockOffloader.uploadContent(anyString(), anyString(), anyString())) - .thenReturn(CompletableFuture.completedFuture("gs://test-bucket/large.txt")); - - BigQueryLoggerConfig gcsConfig = config.toBuilder().gcsBucketName("test-bucket").build(); - PluginState gcsState = - new PluginState(gcsConfig) { - @Override - protected BigQueryWriteClient createWriteClient(BigQueryLoggerConfig config) { - return mockWriteClient; - } - - @Override - protected StreamWriter createWriter() { - return mockWriter; - } - - @Override - protected GcsOffloader getGcsOffloader(BigQueryLoggerConfig config) { - return mockOffloader; - } - }; - BigQueryAgentAnalyticsPlugin gcsPlugin = - new BigQueryAgentAnalyticsPlugin(gcsConfig, mockBigQuery, gcsState); - - // Large text (> 32KB default threshold) - String largeText = "a".repeat(40000); - Content content = Content.fromParts(Part.fromText(largeText)); - gcsPlugin.onUserMessageCallback(mockInvocationContext, content).blockingSubscribe(); - - verify(mockOffloader, atLeastOnce()).uploadContent(anyString(), anyString(), anyString()); - - Map row = gcsState.getBatchProcessor("invocation_id").queue.poll(); - assertNotNull(row); - @SuppressWarnings("unchecked") // Test only - List contentParts = (List) row.get("content_parts"); - assertEquals("GCS_REFERENCE", contentParts.get(0).get("storage_mode").asText()); - assertEquals("gs://test-bucket/large.txt", contentParts.get(0).get("uri").asText()); - } - - @Test - public void logEvent_offloadsToGcs_whenMultimodalContent() throws Exception { - GcsOffloader mockOffloader = mock(GcsOffloader.class); - when(mockOffloader.uploadContent(any(byte[].class), anyString(), anyString())) - .thenReturn(CompletableFuture.completedFuture("gs://test-bucket/image.png")); - - BigQueryLoggerConfig gcsConfig = config.toBuilder().gcsBucketName("test-bucket").build(); - PluginState gcsState = - new PluginState(gcsConfig) { - @Override - protected BigQueryWriteClient createWriteClient(BigQueryLoggerConfig config) { - return mockWriteClient; - } - - @Override - protected StreamWriter createWriter() { - return mockWriter; - } - - @Override - protected GcsOffloader getGcsOffloader(BigQueryLoggerConfig config) { - return mockOffloader; - } - }; - BigQueryAgentAnalyticsPlugin gcsPlugin = - new BigQueryAgentAnalyticsPlugin(gcsConfig, mockBigQuery, gcsState); - - Content content = Content.fromParts(Part.fromBytes("test-data".getBytes(UTF_8), "image/png")); - gcsPlugin.onUserMessageCallback(mockInvocationContext, content).blockingSubscribe(); - - verify(mockOffloader, atLeastOnce()).uploadContent(any(byte[].class), anyString(), anyString()); - - Map row = gcsState.getBatchProcessor("invocation_id").queue.poll(); - assertNotNull(row); - @SuppressWarnings("unchecked") // Test only - List contentParts = (List) row.get("content_parts"); - assertEquals("GCS_REFERENCE", contentParts.get(0).get("storage_mode").asText()); - assertEquals("gs://test-bucket/image.png", contentParts.get(0).get("uri").asText()); - } - - @Test - public void logEvent_integrationWithRealGcsOffloader_whenLargeContent() throws Exception { - Storage mockStorage = mock(Storage.class); - - BigQueryLoggerConfig gcsConfig = config.toBuilder().gcsBucketName("test-bucket").build(); - PluginState gcsState = - new PluginState(gcsConfig) { - @Override - protected BigQueryWriteClient createWriteClient(BigQueryLoggerConfig config) { - return mockWriteClient; - } - - @Override - protected StreamWriter createWriter() { - return mockWriter; - } - - @Override - protected GcsOffloader getGcsOffloader(BigQueryLoggerConfig config) { - return new GcsOffloader( - config.projectId(), - config.gcsBucketName(), - Runnable::run, // Use direct executor for synchronous execution - config.credentials(), - mockStorage); - } - }; - BigQueryAgentAnalyticsPlugin gcsPlugin = - new BigQueryAgentAnalyticsPlugin(gcsConfig, mockBigQuery, gcsState); - - // Large text (> 32KB default threshold) - String largeText = "a".repeat(40000); - Content content = Content.fromParts(Part.fromText(largeText)); - gcsPlugin.onUserMessageCallback(mockInvocationContext, content).blockingSubscribe(); - - verify(mockStorage, atLeastOnce()).create(any(BlobInfo.class), any(byte[].class)); - - Map row = gcsState.getBatchProcessor("invocation_id").queue.poll(); - assertNotNull(row); - @SuppressWarnings("unchecked") // Test only - List contentParts = (List) row.get("content_parts"); - assertEquals("GCS_REFERENCE", contentParts.get(0).get("storage_mode").asText()); - assertTrue(contentParts.get(0).get("uri").asText().startsWith("gs://test-bucket/")); - } - private static class FakeAgent extends BaseAgent { FakeAgent(String name) { super(name, "description", null, null, null); diff --git a/core/src/test/java/com/google/adk/plugins/agentanalytics/GcsOffloaderTest.java b/core/src/test/java/com/google/adk/plugins/agentanalytics/GcsOffloaderTest.java deleted file mode 100644 index 7b8de3813..000000000 --- a/core/src/test/java/com/google/adk/plugins/agentanalytics/GcsOffloaderTest.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Copyright 2026 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.adk.plugins.agentanalytics; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; - -import com.google.cloud.storage.BlobId; -import com.google.cloud.storage.BlobInfo; -import com.google.cloud.storage.Storage; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.ArgumentCaptor; - -@RunWith(JUnit4.class) -public final class GcsOffloaderTest { - private static final String PROJECT_ID = "test-project"; - private static final String BUCKET_NAME = "test-bucket"; - private static final String PATH = "test-path/file.txt"; - private static final String CONTENT_TYPE = "text/plain"; - - private Storage mockStorage; - private ExecutorService executor; - private GcsOffloader gcsOffloader; - - @Before - public void setUp() { - mockStorage = mock(Storage.class); - executor = Executors.newSingleThreadExecutor(); - gcsOffloader = new GcsOffloader(PROJECT_ID, BUCKET_NAME, executor, null, mockStorage); - } - - @After - public void tearDown() { - executor.shutdown(); - } - - @Test - public void uploadContent_bytes_succeeds() throws Exception { - byte[] data = "hello world".getBytes(UTF_8); - CompletableFuture future = gcsOffloader.uploadContent(data, CONTENT_TYPE, PATH); - - String result = future.get(); - - assertEquals("gs://" + BUCKET_NAME + "/" + PATH, result); - - ArgumentCaptor blobInfoCaptor = ArgumentCaptor.forClass(BlobInfo.class); - verify(mockStorage).create(blobInfoCaptor.capture(), any(byte[].class)); - - BlobInfo blobInfo = blobInfoCaptor.getValue(); - assertEquals(BlobId.of(BUCKET_NAME, PATH), blobInfo.getBlobId()); - assertEquals(CONTENT_TYPE, blobInfo.getContentType()); - } - - @Test - public void uploadContent_string_succeeds() throws Exception { - String data = "hello world string"; - CompletableFuture future = gcsOffloader.uploadContent(data, CONTENT_TYPE, PATH); - - String result = future.get(); - - assertEquals("gs://" + BUCKET_NAME + "/" + PATH, result); - - ArgumentCaptor blobInfoCaptor = ArgumentCaptor.forClass(BlobInfo.class); - verify(mockStorage).create(blobInfoCaptor.capture(), any(byte[].class)); - - BlobInfo blobInfo = blobInfoCaptor.getValue(); - assertEquals(BlobId.of(BUCKET_NAME, PATH), blobInfo.getBlobId()); - assertEquals(CONTENT_TYPE, blobInfo.getContentType()); - } - - @Test - public void uploadContent_executorRejected_returnsFailedFuture() { - Executor rejectingExecutor = - r -> { - throw new RejectedExecutionException("Rejected"); - }; - GcsOffloader offloaderWithRejectingExecutor = - new GcsOffloader(PROJECT_ID, BUCKET_NAME, rejectingExecutor, null, mockStorage); - - CompletableFuture future = - offloaderWithRejectingExecutor.uploadContent("data".getBytes(UTF_8), CONTENT_TYPE, PATH); - - assertTrue(future.isCompletedExceptionally()); - assertThrows(ExecutionException.class, future::get); - } - - @Test - public void close_doesNotCloseStorageOverride() throws Exception { - gcsOffloader.close(); - verify(mockStorage, never()).close(); - } -} diff --git a/core/src/test/java/com/google/adk/plugins/agentanalytics/JsonFormatterTest.java b/core/src/test/java/com/google/adk/plugins/agentanalytics/JsonFormatterTest.java index 663f5e5cd..4883438b6 100644 --- a/core/src/test/java/com/google/adk/plugins/agentanalytics/JsonFormatterTest.java +++ b/core/src/test/java/com/google/adk/plugins/agentanalytics/JsonFormatterTest.java @@ -16,22 +16,16 @@ package com.google.adk.plugins.agentanalytics; -import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.google.adk.models.LlmRequest; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.genai.types.Blob; import com.google.genai.types.Content; import com.google.genai.types.FileData; import com.google.genai.types.FunctionCall; @@ -39,7 +33,6 @@ import com.google.genai.types.Part; import java.util.Arrays; import java.util.List; -import java.util.concurrent.CompletableFuture; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -56,8 +49,7 @@ public void parse_llmRequest_populatesPrompt() throws Exception { Content.fromParts(Part.fromText("hello")).toBuilder().role("user").build())) .build(); - Parser.ParsedContent result = - new Parser(null, 100, null, true).parse(request, "trace", "span").get(); + Parser.ParsedContent result = new Parser(100).parse(request).get(); assertTrue(result.content().has("prompt")); ArrayNode prompt = (ArrayNode) result.content().get("prompt"); @@ -77,8 +69,7 @@ public void parse_llmRequest_populatesSystemPrompt() throws Exception { .build()) .build(); - Parser.ParsedContent result = - new Parser(null, 100, null, true).parse(request, "trace", "span").get(); + Parser.ParsedContent result = new Parser(100).parse(request).get(); assertTrue(result.content().has("system_prompt")); assertEquals("be helpful", result.content().get("system_prompt").asText()); @@ -88,8 +79,7 @@ public void parse_llmRequest_populatesSystemPrompt() throws Exception { @Test public void parse_string_truncates() throws Exception { String longString = "this is a very long string that should be truncated"; - Parser.ParsedContent result = - new Parser(null, 24, null, true).parse(longString, "trace", "span").get(); + Parser.ParsedContent result = new Parser(24).parse(longString).get(); assertTrue(result.isTruncated()); assertEquals("this is a ...[truncated]", result.content().asText()); @@ -99,8 +89,7 @@ public void parse_string_truncates() throws Exception { public void parse_map_truncatesNested() throws Exception { ImmutableMap map = ImmutableMap.of("key", "this is a very long value that should definitely be truncated"); - Parser.ParsedContent result = - new Parser(null, 24, null, true).parse(map, "trace", "span").get(); + Parser.ParsedContent result = new Parser(24).parse(map).get(); assertTrue(result.isTruncated()); assertEquals("this is a ...[truncated]", result.content().get("key").asText()); @@ -109,8 +98,7 @@ public void parse_map_truncatesNested() throws Exception { @Test public void parse_content_returnsSummary() throws Exception { Content content = Content.fromParts(Part.fromText("part 1"), Part.fromText("part 2")); - Parser.ParsedContent result = - new Parser(null, 100, null, true).parse(content, "trace", "span").get(); + Parser.ParsedContent result = new Parser(100).parse(content).get(); assertEquals("part 1 | part 2", result.content().get("text_summary").asText()); assertEquals(2, result.parts().size()); @@ -121,8 +109,7 @@ public void parse_content_withFileData() throws Exception { FileData fileData = FileData.builder().fileUri("gs://bucket/file.txt").mimeType("text/plain").build(); Content content = Content.fromParts(Part.builder().fileData(fileData).build()); - Parser.ParsedContent result = - new Parser(null, 100, null, true).parse(content, "trace", "span").get(); + Parser.ParsedContent result = new Parser(100).parse(content).get(); assertEquals(1, result.parts().size()); JsonNode partData = result.parts().get(0); @@ -135,8 +122,7 @@ public void parse_content_withFileData() throws Exception { public void parse_content_withFunctionCall() throws Exception { FunctionCall fc = FunctionCall.builder().name("myFunction").build(); Content content = Content.fromParts(Part.builder().functionCall(fc).build()); - Parser.ParsedContent result = - new Parser(null, 100, null, true).parse(content, "trace", "span").get(); + Parser.ParsedContent result = new Parser(100).parse(content).get(); assertEquals(1, result.parts().size()); JsonNode partData = result.parts().get(0); @@ -149,8 +135,7 @@ public void parse_content_withFunctionCall() throws Exception { public void parse_list_truncatesElements() throws Exception { List list = Arrays.asList("short", "this is a very long string that should be truncated"); - Parser.ParsedContent result = - new Parser(null, 24, null, true).parse(list, "trace", "span").get(); + Parser.ParsedContent result = new Parser(24).parse(list).get(); assertTrue(result.isTruncated()); JsonNode arrayNode = result.content(); @@ -160,44 +145,6 @@ public void parse_list_truncatesElements() throws Exception { assertEquals("this is a ...[truncated]", arrayNode.get(1).asText()); } - @Test - public void parse_withOffloader_offloadsLargeText() throws Exception { - GcsOffloader offloader = mock(GcsOffloader.class); - when(offloader.uploadContent(anyString(), anyString(), anyString())) - .thenReturn(CompletableFuture.completedFuture("gs://mock-bucket/path")); - - Content content = - Content.fromParts(Part.fromText("this text is longer than 10 characters".repeat(100))); - Parser.ParsedContent result = - new Parser(offloader, 10, "conn", true).parse(content, "trace", "span").get(); - - assertEquals(1, result.parts().size()); - JsonNode partData = result.parts().get(0); - assertEquals("GCS_REFERENCE", partData.get("storage_mode").asText()); - assertEquals("gs://mock-bucket/path", partData.get("uri").asText()); - assertTrue(partData.get("text").asText().contains("[OFFLOADED]")); - assertEquals("conn", partData.get("object_ref").get("authorizer").asText()); - } - - @Test - public void parse_withOffloader_offloadsBinaryData() throws Exception { - GcsOffloader offloader = mock(GcsOffloader.class); - when(offloader.uploadContent(any(byte[].class), anyString(), anyString())) - .thenReturn(CompletableFuture.completedFuture("gs://mock-bucket/image.png")); - - Blob blob = Blob.builder().data("fake-image".getBytes(UTF_8)).mimeType("image/png").build(); - Content content = Content.fromParts(Part.builder().inlineData(blob).build()); - Parser.ParsedContent result = - new Parser(offloader, 100, "conn", true).parse(content, "trace", "span").get(); - - assertEquals(1, result.parts().size()); - JsonNode partData = result.parts().get(0); - assertEquals("GCS_REFERENCE", partData.get("storage_mode").asText()); - assertEquals("gs://mock-bucket/image.png", partData.get("uri").asText()); - assertEquals("image/png", partData.get("mime_type").asText()); - assertEquals("[MEDIA OFFLOADED]", partData.get("text").asText()); - } - @Test public void truncate_variousInputs() { assertNull(JsonFormatter.truncate(null, 10)); @@ -241,8 +188,7 @@ public void parse_multibyteString_truncatesBasedOnBytes() throws Exception { // "こんにちはこんにちは" is 30 bytes, but 10 characters. String nihongo = "こんにちはこんにちは"; // With budget 20, effective budget is 6, so only 2 characters (6 bytes) should be kept. - Parser.ParsedContent result = - new Parser(null, 20, null, true).parse(nihongo, "trace", "span").get(); + Parser.ParsedContent result = new Parser(20).parse(nihongo).get(); assertTrue(result.isTruncated()); assertEquals("こん...[truncated]", result.content().asText()); @@ -251,8 +197,7 @@ public void parse_multibyteString_truncatesBasedOnBytes() throws Exception { @Test public void parse_multibyteContent_truncatesBasedOnBytes() throws Exception { Content content = Content.fromParts(Part.fromText("こんにちはこんにちは")); - Parser.ParsedContent result = - new Parser(null, 20, null, true).parse(content, "trace", "span").get(); + Parser.ParsedContent result = new Parser(20).parse(content).get(); assertTrue(result.isTruncated()); assertEquals("こん...[truncated]", result.content().get("text_summary").asText()); diff --git a/core/src/test/java/com/google/adk/plugins/agentanalytics/ParserTest.java b/core/src/test/java/com/google/adk/plugins/agentanalytics/ParserTest.java index 385e81082..9bae03331 100644 --- a/core/src/test/java/com/google/adk/plugins/agentanalytics/ParserTest.java +++ b/core/src/test/java/com/google/adk/plugins/agentanalytics/ParserTest.java @@ -38,13 +38,13 @@ public final class ParserTest { @Before public void setUp() { - parser = new Parser(null, 100, "connectionId", true); + parser = new Parser(100); } @Test public void parse_part_coversLine280() throws Exception { Part part = Part.fromText("test part"); - CompletableFuture future = parser.parse(part, "traceId", "spanId"); + CompletableFuture future = parser.parse(part); Parser.ParsedContent result = future.get(); assertEquals("{\"text_summary\":\"test part\"}", result.content().toString()); @@ -56,7 +56,7 @@ public void parse_part_coversLine280() throws Exception { public void parse_part_withInlineData_coversProcessPart() throws Exception { Blob blob = Blob.builder().mimeType("image/png").data(new byte[] {1, 2, 3}).build(); Part part = Part.builder().inlineData(blob).build(); - CompletableFuture future = parser.parse(part, "traceId", "spanId"); + CompletableFuture future = parser.parse(part); Parser.ParsedContent result = future.get(); assertEquals(1, result.parts().size()); @@ -104,7 +104,7 @@ public void parse_multipartContent_coversLine310() throws Exception { // Call private method using helper if necessary, but parseContentObject is private. // However, parse(Object content, ...) calls it. - CompletableFuture future = parser.parse(content, "traceId", "spanId"); + CompletableFuture future = parser.parse(content); Parser.ParsedContent result = future.get(); assertTrue(result.isTruncated()); diff --git a/core/src/test/java/com/google/adk/plugins/agentanalytics/PluginStateTest.java b/core/src/test/java/com/google/adk/plugins/agentanalytics/PluginStateTest.java index 14dcc390e..444cc8a6d 100644 --- a/core/src/test/java/com/google/adk/plugins/agentanalytics/PluginStateTest.java +++ b/core/src/test/java/com/google/adk/plugins/agentanalytics/PluginStateTest.java @@ -17,7 +17,6 @@ package com.google.adk.plugins.agentanalytics; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.atLeastOnce; @@ -29,14 +28,10 @@ import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; import com.google.cloud.bigquery.storage.v1.StreamWriter; -import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; -import java.lang.reflect.Field; import java.time.Duration; import java.time.Instant; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import java.util.logging.Handler; import java.util.logging.Level; import java.util.logging.LogRecord; @@ -70,6 +65,10 @@ protected BigQueryWriteClient createWriteClient(BigQueryLoggerConfig config) { return mockWriteClient; } + BigQueryWriteClient getMockWriteClient() { + return mockWriteClient; + } + @Override protected StreamWriter createWriter() { StreamWriter writer = mock(StreamWriter.class); @@ -103,11 +102,6 @@ public void tearDown() { pluginLogger.setLevel(originalLevel); } - @Test - public void getGcsOffloader_emptyBucketName_returnsNull() { - assertNull(pluginState.getGcsOffloader(config)); - } - @Test public void addPendingTask_removedTaskOnCompletion() { String invocationId = "testInvocation"; @@ -213,8 +207,7 @@ public void ensureInvocationCompleted_timeout_cleansUpState() throws IOException // Wait for cleanup side effects which run after terminal signal. long deadline = Instant.now().plusMillis(1000).toEpochMilli(); - while (!pluginState.getPendingTasksForInvocation(invocationId).isEmpty() - && Instant.now().toEpochMilli() < deadline) { + while (!pluginState.isProcessed(invocationId) && Instant.now().toEpochMilli() < deadline) { try { Thread.sleep(10); } catch (InterruptedException e) { @@ -249,51 +242,4 @@ public void close_succeedsAndCleansUp() throws Exception { assertTrue(pluginState.getTraceManagers().isEmpty()); assertTrue(pluginState.getExecutor().isShutdown()); } - - @Test - public void close_respectsRemainingTimeoutBudget() throws Exception { - config = config.toBuilder().shutdownTimeout(Duration.ofMillis(500)).build(); - pluginState = new TestPluginState(config); - - ExecutorService mockOffloadExecutor = mock(ExecutorService.class); - Field field = PluginState.class.getDeclaredField("offloadExecutor"); - field.setAccessible(true); - field.set(pluginState, mockOffloadExecutor); - - pluginState - .getExecutor() - .execute( - () -> { - Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(200)); - }); - - when(mockOffloadExecutor.awaitTermination(any(Long.class), any(TimeUnit.class))) - .thenReturn(true); - - pluginState.close().test().awaitDone(2, SECONDS); - - ArgumentCaptor timeoutCaptor = ArgumentCaptor.forClass(Long.class); - verify(mockOffloadExecutor).awaitTermination(timeoutCaptor.capture(), any(TimeUnit.class)); - - long capturedTimeout = timeoutCaptor.getValue(); - assertTrue("Timeout should be less than 400", capturedTimeout < 400); - assertTrue("Timeout should be greater than 100", capturedTimeout > 100); - } - - @Test - public void close_closesGcsOffloader() throws Exception { - GcsOffloader mockOffloader = mock(GcsOffloader.class); - BigQueryLoggerConfig gcsConfig = config.toBuilder().gcsBucketName("test-bucket").build(); - PluginState gcsState = - new TestPluginState(gcsConfig) { - @Override - protected GcsOffloader getGcsOffloader(BigQueryLoggerConfig config) { - return mockOffloader; - } - }; - - gcsState.close().test().assertComplete(); - - verify(mockOffloader).close(); - } } diff --git a/pom.xml b/pom.xml index d63ef5268..665b696a0 100644 --- a/pom.xml +++ b/pom.xml @@ -75,7 +75,6 @@ 3.9.0 5.6.1 4.1.118.Final - 3.3.0 @{jacoco.agent.argLine} --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true @@ -295,11 +294,6 @@ assertj-core ${assertj.version} - - org.apache.tika - tika-core - ${tika.version} -