diff --git a/core/pom.xml b/core/pom.xml
index 3ebeaebb2..f0bafe01b 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -217,10 +217,6 @@
arrow-memory-netty
17.0.0
-
- org.apache.tika
- tika-core
-
diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java
index 566dbd5a4..59e09c8a7 100644
--- a/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java
+++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java
@@ -253,10 +253,7 @@ private Completable logEvent(
parseFuture =
state
.getParser()
- .parse(
- content,
- traceIds.traceId(),
- traceIds.spanId() != null ? traceIds.spanId() : "no_span")
+ .parse(content)
.thenAccept(
parsedContent -> {
row.put(
diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/GcsOffloader.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/GcsOffloader.java
deleted file mode 100644
index 17993bb8e..000000000
--- a/core/src/main/java/com/google/adk/plugins/agentanalytics/GcsOffloader.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Copyright 2026 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.adk.plugins.agentanalytics;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import com.google.auth.Credentials;
-import com.google.cloud.storage.BlobId;
-import com.google.cloud.storage.BlobInfo;
-import com.google.cloud.storage.Storage;
-import com.google.cloud.storage.StorageOptions;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
-import org.jspecify.annotations.Nullable;
-
-/** Offloads content to GCS. */
-class GcsOffloader {
- private final Storage storage;
- private final String bucketName;
- private final Executor executor;
- private final boolean isStorageOverride;
-
- GcsOffloader(
- String projectId,
- String bucketName,
- Executor executor,
- @Nullable Credentials credentials,
- @Nullable Storage storageOverride) {
- if (storageOverride != null) {
- this.isStorageOverride = true;
- this.storage = storageOverride;
- } else {
- this.isStorageOverride = false;
- StorageOptions.Builder builder = StorageOptions.newBuilder().setProjectId(projectId);
- if (credentials != null) {
- builder.setCredentials(credentials);
- }
- this.storage = builder.build().getService();
- }
- this.bucketName = bucketName;
- this.executor = executor;
- }
-
- /** Async wrapper around blocking GCS upload for binary data. */
- CompletableFuture uploadContent(byte[] data, String contentType, String path) {
- try {
- return CompletableFuture.supplyAsync(
- () -> {
- BlobId blobId = BlobId.of(bucketName, path);
- BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType(contentType).build();
- storage.create(blobInfo, data);
- return String.format("gs://%s/%s", bucketName, path);
- },
- executor);
- } catch (RejectedExecutionException e) {
- return CompletableFuture.failedFuture(e);
- }
- }
-
- /** Async wrapper around blocking GCS upload for text data. */
- CompletableFuture uploadContent(String data, String contentType, String path) {
- try {
- return CompletableFuture.supplyAsync(() -> data.getBytes(UTF_8), executor)
- .thenCompose(bytes -> uploadContent(bytes, contentType, path));
- } catch (RejectedExecutionException e) {
- return CompletableFuture.failedFuture(e);
- }
- }
-
- String getBucketName() {
- return bucketName;
- }
-
- void close() throws Exception {
- if (storage != null && !isStorageOverride) {
- storage.close();
- }
- }
-}
diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/Parser.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/Parser.java
index f4eff09f7..5db8be46c 100644
--- a/core/src/main/java/com/google/adk/plugins/agentanalytics/Parser.java
+++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/Parser.java
@@ -19,7 +19,6 @@
import static com.google.adk.plugins.agentanalytics.JsonFormatter.mapper;
import static com.google.adk.plugins.agentanalytics.JsonFormatter.smartTruncate;
import static com.google.adk.plugins.agentanalytics.JsonFormatter.truncate;
-import static com.google.adk.plugins.agentanalytics.JsonFormatter.truncateAndAddSuffix;
import static com.google.adk.plugins.agentanalytics.JsonFormatter.truncateWithStatus;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -40,43 +39,16 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-import org.apache.tika.mime.MimeTypeException;
-import org.apache.tika.mime.MimeTypes;
import org.jspecify.annotations.Nullable;
-import org.threeten.bp.Instant;
-import org.threeten.bp.LocalDate;
-import org.threeten.bp.ZoneOffset;
/** Utility for parsing content for BigQuery logging. */
final class Parser {
- private static final String DEFAULT_EXTENSION = ".bin";
- private static final int MAX_OFFLOADED_TEXT_LENGTH = 200;
- private static final Logger logger = Logger.getLogger(Parser.class.getName());
- private static final int INLINE_TEXT_LIMIT = 32 * 1024; // 32KB limit
- private static final String UPLOAD_FAILED_MESSAGE = "[UPLOAD FAILED]";
- private static final String MEDIA_OFFLOADED_MESSAGE = "[MEDIA OFFLOADED]";
private static final String BINARY_DATA_MESSAGE = "[BINARY DATA]";
- private static final String TEXT_OFFLOADED_SUFFIX = "... [OFFLOADED]";
- private static final MimeTypes MIME_TYPES = MimeTypes.getDefaultMimeTypes();
-
- private final @Nullable GcsOffloader offloader;
private final int maxLength;
- private final @Nullable String connectionId;
- private final boolean logMultiModalContent;
-
- Parser(
- @Nullable GcsOffloader offloader,
- int maxLength,
- @Nullable String connectionId,
- boolean logMultiModalContent) {
- this.offloader = offloader;
+
+ Parser(int maxLength) {
this.maxLength = maxLength;
- this.connectionId = connectionId;
- this.logMultiModalContent = logMultiModalContent;
}
@AutoValue
@@ -180,11 +152,9 @@ static ObjectRef create(
* Parses content into JSON payload and content parts, matching Python implementation.
*
* @param content the content to parse
- * @param traceId the trace ID for GCS path
- * @param spanId the span ID for GCS path
* @return a CompletableFuture of ParsedContent object
*/
- CompletableFuture parse(Object content, String traceId, String spanId) {
+ CompletableFuture parse(Object content) {
if (content instanceof LlmRequest llmRequest) {
ObjectNode jsonPayload = mapper.createObjectNode();
ArrayNode messages = mapper.createArrayNode();
@@ -192,15 +162,13 @@ CompletableFuture parse(Object content, String traceId, String sp
List contents = llmRequest.contents();
for (Content c : contents) {
- futures.add(parseContentObject(c, traceId, spanId));
+ futures.add(parseContentObject(c));
}
CompletableFuture systemFuture = null;
if (llmRequest.config().isPresent()
&& llmRequest.config().get().systemInstruction().isPresent()) {
- systemFuture =
- parseContentObject(
- llmRequest.config().get().systemInstruction().get(), traceId, spanId);
+ systemFuture = parseContentObject(llmRequest.config().get().systemInstruction().get());
futures.add(systemFuture);
}
CompletableFuture finalSystemFuture = systemFuture;
@@ -234,7 +202,7 @@ CompletableFuture parse(Object content, String traceId, String sp
}
if (content instanceof LlmResponse llmResponse) {
ObjectNode jsonPayload = mapper.createObjectNode();
- return parseContentObject(llmResponse.content().orElse(null), traceId, spanId)
+ return parseContentObject(llmResponse.content().orElse(null))
.thenApply(
parsed -> {
ObjectNode summaryNode = mapper.createObjectNode();
@@ -257,7 +225,7 @@ CompletableFuture parse(Object content, String traceId, String sp
});
}
if (content instanceof Content || content instanceof Part) {
- return parseContentObject(content, traceId, spanId)
+ return parseContentObject(content)
.thenApply(
parsed -> {
ObjectNode summaryNode = mapper.createObjectNode();
@@ -281,13 +249,10 @@ CompletableFuture parse(Object content, String traceId, String sp
* Parses a Content or Part object into summary text and content parts.
*
* @param content the Content or Part object to parse
- * @param traceId the trace ID for GCS path
- * @param spanId the span ID for GCS path
* @return a CompletableFuture of ParsedContentObject containing parts, summary, and truncation
* flag
*/
- private CompletableFuture parseContentObject(
- Object content, String traceId, String spanId) {
+ private CompletableFuture parseContentObject(Object content) {
List parts;
if (content instanceof Content c) {
parts = c.parts().orElse(ImmutableList.of());
@@ -300,7 +265,7 @@ private CompletableFuture parseContentObject(
List> partFutures = new ArrayList<>();
for (int i = 0; i < parts.size(); i++) {
- partFutures.add(processPart(parts.get(i), i, traceId, spanId));
+ partFutures.add(processPart(parts.get(i), i));
}
return CompletableFuture.allOf(partFutures.toArray(new CompletableFuture>[0]))
@@ -330,8 +295,7 @@ private CompletableFuture parseContentObject(
});
}
- private CompletableFuture processPart(
- Part part, int index, String traceId, String spanId) {
+ private CompletableFuture processPart(Part part, int index) {
ContentPart.Builder partBuilder =
ContentPart.builder()
.setPartIndex(index)
@@ -356,89 +320,17 @@ private CompletableFuture processPart(
if (part.inlineData().isPresent()) {
Blob blob = part.inlineData().get();
String mimeType = blob.mimeType().orElse("application/octet-stream");
- if (logMultiModalContent && offloader != null) {
- String ext = DEFAULT_EXTENSION;
- try {
- ext = MIME_TYPES.forName(mimeType).getExtension();
- } catch (MimeTypeException e) {
- logger.log(Level.WARNING, "Failed to get extension for mime type " + mimeType, e);
- }
- String path =
- String.format(
- "%s/%s/%s_p%d_%s%s",
- getLocalDate(), traceId, spanId, index, UUID.randomUUID(), ext);
- return offloader
- .uploadContent(blob.data().orElse(new byte[0]), mimeType, path)
- .handle(
- (uri, ex) -> {
- if (ex != null) {
- logger.log(Level.WARNING, "Failed to offload content to GCS", ex);
- partBuilder.setText(UPLOAD_FAILED_MESSAGE);
- } else {
- ObjectNode details = mapper.createObjectNode();
- ObjectNode gcsMetadata = details.putObject("gcs_metadata");
- gcsMetadata.put("content_type", mimeType);
-
- partBuilder
- .setStorageMode("GCS_REFERENCE")
- .setUri(uri)
- .setMimeType(mimeType)
- .setText(MEDIA_OFFLOADED_MESSAGE)
- .setObjectRef(
- mapper.valueToTree(ObjectRef.create(uri, null, connectionId, details)));
- }
- return TruncationResult.create(mapper.valueToTree(partBuilder.build()), false);
- });
- } else {
- partBuilder.setText(BINARY_DATA_MESSAGE).setMimeType(mimeType);
- return CompletableFuture.completedFuture(
- TruncationResult.create(mapper.valueToTree(partBuilder.build()), false));
- }
+ partBuilder.setText(BINARY_DATA_MESSAGE).setMimeType(mimeType);
+ return CompletableFuture.completedFuture(
+ TruncationResult.create(mapper.valueToTree(partBuilder.build()), false));
}
// CASE C: Text
if (part.text().isPresent()) {
String text = part.text().get();
- int textLen = Utf8.encodedLength(text);
- int offloadThreshold = Math.min(INLINE_TEXT_LIMIT, maxLength);
-
- if (offloader != null && textLen > offloadThreshold) {
-
- String path =
- String.format(
- "%s/%s/%s_p%d_%s.txt", getLocalDate(), traceId, spanId, index, UUID.randomUUID());
- return offloader
- .uploadContent(text, "text/plain", path)
- .handle(
- (uri, ex) -> {
- if (ex != null) {
- logger.log(Level.WARNING, "Failed to offload text to GCS", ex);
- TruncationResult res = truncateWithStatus(text, maxLength);
- partBuilder.setText(res.node().asText());
- return TruncationResult.create(
- mapper.valueToTree(partBuilder.build()), res.isTruncated());
- } else {
- ObjectNode details = mapper.createObjectNode();
- ObjectNode gcsMetadata = details.putObject("gcs_metadata");
- gcsMetadata.put("content_type", "text/plain");
-
- partBuilder
- .setStorageMode("GCS_REFERENCE")
- .setUri(uri)
- .setMimeType("text/plain")
- .setText(
- truncateAndAddSuffix(
- text, MAX_OFFLOADED_TEXT_LENGTH, TEXT_OFFLOADED_SUFFIX))
- .setObjectRef(
- mapper.valueToTree(ObjectRef.create(uri, null, connectionId, details)));
- return TruncationResult.create(mapper.valueToTree(partBuilder.build()), true);
- }
- });
- } else {
- TruncationResult res = truncateWithStatus(text, maxLength);
- partBuilder.setText(res.node().asText());
- return CompletableFuture.completedFuture(
- TruncationResult.create(mapper.valueToTree(partBuilder.build()), res.isTruncated()));
- }
+ TruncationResult res = truncateWithStatus(text, maxLength);
+ partBuilder.setText(res.node().asText());
+ return CompletableFuture.completedFuture(
+ TruncationResult.create(mapper.valueToTree(partBuilder.build()), res.isTruncated()));
}
if (part.functionCall().isPresent()) {
FunctionCall fc = part.functionCall().get();
@@ -487,8 +379,4 @@ ArrayNode formatContentParts(Optional content) {
}
return partsArray;
}
-
- private LocalDate getLocalDate() {
- return Instant.now().atZone(ZoneOffset.UTC).toLocalDate();
- }
}
diff --git a/core/src/main/java/com/google/adk/plugins/agentanalytics/PluginState.java b/core/src/main/java/com/google/adk/plugins/agentanalytics/PluginState.java
index d1826ec5e..0654fab5d 100644
--- a/core/src/main/java/com/google/adk/plugins/agentanalytics/PluginState.java
+++ b/core/src/main/java/com/google/adk/plugins/agentanalytics/PluginState.java
@@ -3,7 +3,6 @@
import static com.google.adk.plugins.agentanalytics.BigQueryUtils.getVersionHeaderValue;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static java.util.concurrent.TimeUnit.SECONDS;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.retrying.RetrySettings;
@@ -22,35 +21,21 @@
import java.util.Collection;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.jspecify.annotations.Nullable;
import org.threeten.bp.Duration;
-import org.threeten.bp.Instant;
/** Manages state for the BigQueryAgentAnalyticsPlugin. */
class PluginState {
private static final Logger logger = Logger.getLogger(PluginState.class.getName());
- private static final int GCS_OFFLOAD_CORE_POOL_SIZE = 2;
- private static final int GCS_OFFLOAD_MAX_THREADS = 10;
- // Max number of tasks in the queue before we start rejecting tasks and executing them in the
- // caller thread.
- private static final int GCS_OFFLOAD_QUEUE_SIZE = 100;
- // Idle time before threads are terminated.
- private static final int GCS_OFFLOAD_IDLE_TIME_SECONDS = 30;
-
private final BigQueryLoggerConfig config;
private final ScheduledExecutorService executor;
- private final ExecutorService offloadExecutor;
private final BigQueryWriteClient writeClient;
private static final AtomicLong threadCounter = new AtomicLong(0);
// Map of invocation ID to BatchProcessor.
@@ -60,7 +45,6 @@ class PluginState {
private final ConcurrentHashMap traceManagers = new ConcurrentHashMap<>();
// Cache of invocation ID to Boolean indicating invocation ID has been processed.
private final Cache processedInvocations;
- private final GcsOffloader offloader;
private final Parser parser;
private final ConcurrentHashMap>> pendingTasks =
new ConcurrentHashMap<>();
@@ -70,7 +54,6 @@ class PluginState {
this.executor =
Executors.newScheduledThreadPool(
2, r -> new Thread(r, "bq-analytics-plugin-" + threadCounter.getAndIncrement()));
- this.offloadExecutor = createGcsOffloadThreadPool();
// One write client per plugin instance, shared by all invocations.
this.writeClient = createWriteClient(config);
this.processedInvocations =
@@ -78,25 +61,7 @@ class PluginState {
.maximumSize(10000)
.expireAfterWrite(java.time.Duration.ofMinutes(10))
.build();
- this.offloader = getGcsOffloader(config);
- this.parser =
- new Parser(
- offloader,
- config.maxContentLength(),
- config.connectionId().orElse(null),
- config.logMultiModalContent());
- }
-
- private static ExecutorService createGcsOffloadThreadPool() {
- return new ThreadPoolExecutor(
- GCS_OFFLOAD_CORE_POOL_SIZE, // The lower limit of threads.
- GCS_OFFLOAD_MAX_THREADS, // The upper limit of threads.
- GCS_OFFLOAD_IDLE_TIME_SECONDS, // Time to keep idle threads alive.
- SECONDS,
- new ArrayBlockingQueue<>(GCS_OFFLOAD_QUEUE_SIZE), // workQueue: Hand off tasks directly.
- r -> new Thread(r, "bq-analytics-plugin-offload-" + threadCounter.getAndIncrement()),
- // Reject tasks if the queue is full.
- new ThreadPoolExecutor.AbortPolicy());
+ this.parser = new Parser(config.maxContentLength());
}
ScheduledExecutorService getExecutor() {
@@ -177,14 +142,6 @@ BatchProcessor getBatchProcessor(String invocationId) {
});
}
- protected @Nullable GcsOffloader getGcsOffloader(BigQueryLoggerConfig config) {
- if (config.gcsBucketName().isEmpty()) {
- return null;
- }
- return new GcsOffloader(
- config.projectId(), config.gcsBucketName(), offloadExecutor, config.credentials(), null);
- }
-
Parser getParser() {
return parser;
}
@@ -217,8 +174,7 @@ void clearBatchProcessors() {
batchProcessors.clear();
}
- @VisibleForTesting
- protected Set> getPendingTasksForInvocation(String invocationId) {
+ private Set> getPendingTasksForInvocation(String invocationId) {
return pendingTasks.computeIfAbsent(invocationId, k -> ConcurrentHashMap.newKeySet());
}
@@ -307,34 +263,13 @@ Completable close() {
}
try {
executor.shutdown();
- offloadExecutor.shutdown();
- long totalTimeoutMillis = config.shutdownTimeout().toMillis();
- Instant startTime = Instant.now();
- if (!executor.awaitTermination(totalTimeoutMillis, MILLISECONDS)) {
+ if (!executor.awaitTermination(config.shutdownTimeout().toMillis(), MILLISECONDS)) {
executor.shutdownNow();
}
- long elapsedTimeMillis = Duration.between(startTime, Instant.now()).toMillis();
- long remainingMillis = totalTimeoutMillis - elapsedTimeMillis;
- if (remainingMillis > 0) {
- if (!offloadExecutor.awaitTermination(remainingMillis, MILLISECONDS)) {
- offloadExecutor.shutdownNow();
- }
- } else {
- offloadExecutor.shutdownNow();
- }
} catch (InterruptedException e) {
executor.shutdownNow();
- offloadExecutor.shutdownNow();
Thread.currentThread().interrupt();
}
-
- try {
- if (offloader != null) {
- offloader.close();
- }
- } catch (Exception e) {
- logger.log(Level.WARNING, "Failed to close GCS offloader", e);
- }
});
}
}
diff --git a/core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPluginTest.java b/core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPluginTest.java
index 6d65e2f15..836442cad 100644
--- a/core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPluginTest.java
+++ b/core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPluginTest.java
@@ -17,7 +17,6 @@
package com.google.adk.plugins.agentanalytics;
import static com.google.common.collect.ImmutableList.toImmutableList;
-import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -25,13 +24,11 @@
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
-import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.adk.agents.BaseAgent;
@@ -59,8 +56,6 @@
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
-import com.google.cloud.storage.BlobInfo;
-import com.google.cloud.storage.Storage;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.genai.types.Candidate;
@@ -807,7 +802,6 @@ public void logEvent_handlesExceptionFromFormatter() throws Exception {
(content, eventType) -> {
throw new RuntimeException("Formatter error");
};
-
BigQueryLoggerConfig formattedConfig = config.toBuilder().contentFormatter(formatter).build();
PluginState formattedState =
new PluginState(formattedConfig) {
@@ -1048,133 +1042,6 @@ public void logEvent_createsUniqueProcessorPerInvocation() throws Exception {
testExecutor.shutdown();
}
- @Test
- public void logEvent_offloadsToGcs_whenLargeContent() throws Exception {
- GcsOffloader mockOffloader = mock(GcsOffloader.class);
- when(mockOffloader.uploadContent(anyString(), anyString(), anyString()))
- .thenReturn(CompletableFuture.completedFuture("gs://test-bucket/large.txt"));
-
- BigQueryLoggerConfig gcsConfig = config.toBuilder().gcsBucketName("test-bucket").build();
- PluginState gcsState =
- new PluginState(gcsConfig) {
- @Override
- protected BigQueryWriteClient createWriteClient(BigQueryLoggerConfig config) {
- return mockWriteClient;
- }
-
- @Override
- protected StreamWriter createWriter() {
- return mockWriter;
- }
-
- @Override
- protected GcsOffloader getGcsOffloader(BigQueryLoggerConfig config) {
- return mockOffloader;
- }
- };
- BigQueryAgentAnalyticsPlugin gcsPlugin =
- new BigQueryAgentAnalyticsPlugin(gcsConfig, mockBigQuery, gcsState);
-
- // Large text (> 32KB default threshold)
- String largeText = "a".repeat(40000);
- Content content = Content.fromParts(Part.fromText(largeText));
- gcsPlugin.onUserMessageCallback(mockInvocationContext, content).blockingSubscribe();
-
- verify(mockOffloader, atLeastOnce()).uploadContent(anyString(), anyString(), anyString());
-
- Map row = gcsState.getBatchProcessor("invocation_id").queue.poll();
- assertNotNull(row);
- @SuppressWarnings("unchecked") // Test only
- List contentParts = (List) row.get("content_parts");
- assertEquals("GCS_REFERENCE", contentParts.get(0).get("storage_mode").asText());
- assertEquals("gs://test-bucket/large.txt", contentParts.get(0).get("uri").asText());
- }
-
- @Test
- public void logEvent_offloadsToGcs_whenMultimodalContent() throws Exception {
- GcsOffloader mockOffloader = mock(GcsOffloader.class);
- when(mockOffloader.uploadContent(any(byte[].class), anyString(), anyString()))
- .thenReturn(CompletableFuture.completedFuture("gs://test-bucket/image.png"));
-
- BigQueryLoggerConfig gcsConfig = config.toBuilder().gcsBucketName("test-bucket").build();
- PluginState gcsState =
- new PluginState(gcsConfig) {
- @Override
- protected BigQueryWriteClient createWriteClient(BigQueryLoggerConfig config) {
- return mockWriteClient;
- }
-
- @Override
- protected StreamWriter createWriter() {
- return mockWriter;
- }
-
- @Override
- protected GcsOffloader getGcsOffloader(BigQueryLoggerConfig config) {
- return mockOffloader;
- }
- };
- BigQueryAgentAnalyticsPlugin gcsPlugin =
- new BigQueryAgentAnalyticsPlugin(gcsConfig, mockBigQuery, gcsState);
-
- Content content = Content.fromParts(Part.fromBytes("test-data".getBytes(UTF_8), "image/png"));
- gcsPlugin.onUserMessageCallback(mockInvocationContext, content).blockingSubscribe();
-
- verify(mockOffloader, atLeastOnce()).uploadContent(any(byte[].class), anyString(), anyString());
-
- Map row = gcsState.getBatchProcessor("invocation_id").queue.poll();
- assertNotNull(row);
- @SuppressWarnings("unchecked") // Test only
- List contentParts = (List) row.get("content_parts");
- assertEquals("GCS_REFERENCE", contentParts.get(0).get("storage_mode").asText());
- assertEquals("gs://test-bucket/image.png", contentParts.get(0).get("uri").asText());
- }
-
- @Test
- public void logEvent_integrationWithRealGcsOffloader_whenLargeContent() throws Exception {
- Storage mockStorage = mock(Storage.class);
-
- BigQueryLoggerConfig gcsConfig = config.toBuilder().gcsBucketName("test-bucket").build();
- PluginState gcsState =
- new PluginState(gcsConfig) {
- @Override
- protected BigQueryWriteClient createWriteClient(BigQueryLoggerConfig config) {
- return mockWriteClient;
- }
-
- @Override
- protected StreamWriter createWriter() {
- return mockWriter;
- }
-
- @Override
- protected GcsOffloader getGcsOffloader(BigQueryLoggerConfig config) {
- return new GcsOffloader(
- config.projectId(),
- config.gcsBucketName(),
- Runnable::run, // Use direct executor for synchronous execution
- config.credentials(),
- mockStorage);
- }
- };
- BigQueryAgentAnalyticsPlugin gcsPlugin =
- new BigQueryAgentAnalyticsPlugin(gcsConfig, mockBigQuery, gcsState);
-
- // Large text (> 32KB default threshold)
- String largeText = "a".repeat(40000);
- Content content = Content.fromParts(Part.fromText(largeText));
- gcsPlugin.onUserMessageCallback(mockInvocationContext, content).blockingSubscribe();
-
- verify(mockStorage, atLeastOnce()).create(any(BlobInfo.class), any(byte[].class));
-
- Map row = gcsState.getBatchProcessor("invocation_id").queue.poll();
- assertNotNull(row);
- @SuppressWarnings("unchecked") // Test only
- List contentParts = (List) row.get("content_parts");
- assertEquals("GCS_REFERENCE", contentParts.get(0).get("storage_mode").asText());
- assertTrue(contentParts.get(0).get("uri").asText().startsWith("gs://test-bucket/"));
- }
-
private static class FakeAgent extends BaseAgent {
FakeAgent(String name) {
super(name, "description", null, null, null);
diff --git a/core/src/test/java/com/google/adk/plugins/agentanalytics/GcsOffloaderTest.java b/core/src/test/java/com/google/adk/plugins/agentanalytics/GcsOffloaderTest.java
deleted file mode 100644
index 7b8de3813..000000000
--- a/core/src/test/java/com/google/adk/plugins/agentanalytics/GcsOffloaderTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Copyright 2026 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.adk.plugins.agentanalytics;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-
-import com.google.cloud.storage.BlobId;
-import com.google.cloud.storage.BlobInfo;
-import com.google.cloud.storage.Storage;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.ArgumentCaptor;
-
-@RunWith(JUnit4.class)
-public final class GcsOffloaderTest {
- private static final String PROJECT_ID = "test-project";
- private static final String BUCKET_NAME = "test-bucket";
- private static final String PATH = "test-path/file.txt";
- private static final String CONTENT_TYPE = "text/plain";
-
- private Storage mockStorage;
- private ExecutorService executor;
- private GcsOffloader gcsOffloader;
-
- @Before
- public void setUp() {
- mockStorage = mock(Storage.class);
- executor = Executors.newSingleThreadExecutor();
- gcsOffloader = new GcsOffloader(PROJECT_ID, BUCKET_NAME, executor, null, mockStorage);
- }
-
- @After
- public void tearDown() {
- executor.shutdown();
- }
-
- @Test
- public void uploadContent_bytes_succeeds() throws Exception {
- byte[] data = "hello world".getBytes(UTF_8);
- CompletableFuture future = gcsOffloader.uploadContent(data, CONTENT_TYPE, PATH);
-
- String result = future.get();
-
- assertEquals("gs://" + BUCKET_NAME + "/" + PATH, result);
-
- ArgumentCaptor blobInfoCaptor = ArgumentCaptor.forClass(BlobInfo.class);
- verify(mockStorage).create(blobInfoCaptor.capture(), any(byte[].class));
-
- BlobInfo blobInfo = blobInfoCaptor.getValue();
- assertEquals(BlobId.of(BUCKET_NAME, PATH), blobInfo.getBlobId());
- assertEquals(CONTENT_TYPE, blobInfo.getContentType());
- }
-
- @Test
- public void uploadContent_string_succeeds() throws Exception {
- String data = "hello world string";
- CompletableFuture future = gcsOffloader.uploadContent(data, CONTENT_TYPE, PATH);
-
- String result = future.get();
-
- assertEquals("gs://" + BUCKET_NAME + "/" + PATH, result);
-
- ArgumentCaptor blobInfoCaptor = ArgumentCaptor.forClass(BlobInfo.class);
- verify(mockStorage).create(blobInfoCaptor.capture(), any(byte[].class));
-
- BlobInfo blobInfo = blobInfoCaptor.getValue();
- assertEquals(BlobId.of(BUCKET_NAME, PATH), blobInfo.getBlobId());
- assertEquals(CONTENT_TYPE, blobInfo.getContentType());
- }
-
- @Test
- public void uploadContent_executorRejected_returnsFailedFuture() {
- Executor rejectingExecutor =
- r -> {
- throw new RejectedExecutionException("Rejected");
- };
- GcsOffloader offloaderWithRejectingExecutor =
- new GcsOffloader(PROJECT_ID, BUCKET_NAME, rejectingExecutor, null, mockStorage);
-
- CompletableFuture future =
- offloaderWithRejectingExecutor.uploadContent("data".getBytes(UTF_8), CONTENT_TYPE, PATH);
-
- assertTrue(future.isCompletedExceptionally());
- assertThrows(ExecutionException.class, future::get);
- }
-
- @Test
- public void close_doesNotCloseStorageOverride() throws Exception {
- gcsOffloader.close();
- verify(mockStorage, never()).close();
- }
-}
diff --git a/core/src/test/java/com/google/adk/plugins/agentanalytics/JsonFormatterTest.java b/core/src/test/java/com/google/adk/plugins/agentanalytics/JsonFormatterTest.java
index 663f5e5cd..4883438b6 100644
--- a/core/src/test/java/com/google/adk/plugins/agentanalytics/JsonFormatterTest.java
+++ b/core/src/test/java/com/google/adk/plugins/agentanalytics/JsonFormatterTest.java
@@ -16,22 +16,16 @@
package com.google.adk.plugins.agentanalytics;
-import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.adk.models.LlmRequest;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.genai.types.Blob;
import com.google.genai.types.Content;
import com.google.genai.types.FileData;
import com.google.genai.types.FunctionCall;
@@ -39,7 +33,6 @@
import com.google.genai.types.Part;
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -56,8 +49,7 @@ public void parse_llmRequest_populatesPrompt() throws Exception {
Content.fromParts(Part.fromText("hello")).toBuilder().role("user").build()))
.build();
- Parser.ParsedContent result =
- new Parser(null, 100, null, true).parse(request, "trace", "span").get();
+ Parser.ParsedContent result = new Parser(100).parse(request).get();
assertTrue(result.content().has("prompt"));
ArrayNode prompt = (ArrayNode) result.content().get("prompt");
@@ -77,8 +69,7 @@ public void parse_llmRequest_populatesSystemPrompt() throws Exception {
.build())
.build();
- Parser.ParsedContent result =
- new Parser(null, 100, null, true).parse(request, "trace", "span").get();
+ Parser.ParsedContent result = new Parser(100).parse(request).get();
assertTrue(result.content().has("system_prompt"));
assertEquals("be helpful", result.content().get("system_prompt").asText());
@@ -88,8 +79,7 @@ public void parse_llmRequest_populatesSystemPrompt() throws Exception {
@Test
public void parse_string_truncates() throws Exception {
String longString = "this is a very long string that should be truncated";
- Parser.ParsedContent result =
- new Parser(null, 24, null, true).parse(longString, "trace", "span").get();
+ Parser.ParsedContent result = new Parser(24).parse(longString).get();
assertTrue(result.isTruncated());
assertEquals("this is a ...[truncated]", result.content().asText());
@@ -99,8 +89,7 @@ public void parse_string_truncates() throws Exception {
public void parse_map_truncatesNested() throws Exception {
ImmutableMap map =
ImmutableMap.of("key", "this is a very long value that should definitely be truncated");
- Parser.ParsedContent result =
- new Parser(null, 24, null, true).parse(map, "trace", "span").get();
+ Parser.ParsedContent result = new Parser(24).parse(map).get();
assertTrue(result.isTruncated());
assertEquals("this is a ...[truncated]", result.content().get("key").asText());
@@ -109,8 +98,7 @@ public void parse_map_truncatesNested() throws Exception {
@Test
public void parse_content_returnsSummary() throws Exception {
Content content = Content.fromParts(Part.fromText("part 1"), Part.fromText("part 2"));
- Parser.ParsedContent result =
- new Parser(null, 100, null, true).parse(content, "trace", "span").get();
+ Parser.ParsedContent result = new Parser(100).parse(content).get();
assertEquals("part 1 | part 2", result.content().get("text_summary").asText());
assertEquals(2, result.parts().size());
@@ -121,8 +109,7 @@ public void parse_content_withFileData() throws Exception {
FileData fileData =
FileData.builder().fileUri("gs://bucket/file.txt").mimeType("text/plain").build();
Content content = Content.fromParts(Part.builder().fileData(fileData).build());
- Parser.ParsedContent result =
- new Parser(null, 100, null, true).parse(content, "trace", "span").get();
+ Parser.ParsedContent result = new Parser(100).parse(content).get();
assertEquals(1, result.parts().size());
JsonNode partData = result.parts().get(0);
@@ -135,8 +122,7 @@ public void parse_content_withFileData() throws Exception {
public void parse_content_withFunctionCall() throws Exception {
FunctionCall fc = FunctionCall.builder().name("myFunction").build();
Content content = Content.fromParts(Part.builder().functionCall(fc).build());
- Parser.ParsedContent result =
- new Parser(null, 100, null, true).parse(content, "trace", "span").get();
+ Parser.ParsedContent result = new Parser(100).parse(content).get();
assertEquals(1, result.parts().size());
JsonNode partData = result.parts().get(0);
@@ -149,8 +135,7 @@ public void parse_content_withFunctionCall() throws Exception {
public void parse_list_truncatesElements() throws Exception {
List list =
Arrays.asList("short", "this is a very long string that should be truncated");
- Parser.ParsedContent result =
- new Parser(null, 24, null, true).parse(list, "trace", "span").get();
+ Parser.ParsedContent result = new Parser(24).parse(list).get();
assertTrue(result.isTruncated());
JsonNode arrayNode = result.content();
@@ -160,44 +145,6 @@ public void parse_list_truncatesElements() throws Exception {
assertEquals("this is a ...[truncated]", arrayNode.get(1).asText());
}
- @Test
- public void parse_withOffloader_offloadsLargeText() throws Exception {
- GcsOffloader offloader = mock(GcsOffloader.class);
- when(offloader.uploadContent(anyString(), anyString(), anyString()))
- .thenReturn(CompletableFuture.completedFuture("gs://mock-bucket/path"));
-
- Content content =
- Content.fromParts(Part.fromText("this text is longer than 10 characters".repeat(100)));
- Parser.ParsedContent result =
- new Parser(offloader, 10, "conn", true).parse(content, "trace", "span").get();
-
- assertEquals(1, result.parts().size());
- JsonNode partData = result.parts().get(0);
- assertEquals("GCS_REFERENCE", partData.get("storage_mode").asText());
- assertEquals("gs://mock-bucket/path", partData.get("uri").asText());
- assertTrue(partData.get("text").asText().contains("[OFFLOADED]"));
- assertEquals("conn", partData.get("object_ref").get("authorizer").asText());
- }
-
- @Test
- public void parse_withOffloader_offloadsBinaryData() throws Exception {
- GcsOffloader offloader = mock(GcsOffloader.class);
- when(offloader.uploadContent(any(byte[].class), anyString(), anyString()))
- .thenReturn(CompletableFuture.completedFuture("gs://mock-bucket/image.png"));
-
- Blob blob = Blob.builder().data("fake-image".getBytes(UTF_8)).mimeType("image/png").build();
- Content content = Content.fromParts(Part.builder().inlineData(blob).build());
- Parser.ParsedContent result =
- new Parser(offloader, 100, "conn", true).parse(content, "trace", "span").get();
-
- assertEquals(1, result.parts().size());
- JsonNode partData = result.parts().get(0);
- assertEquals("GCS_REFERENCE", partData.get("storage_mode").asText());
- assertEquals("gs://mock-bucket/image.png", partData.get("uri").asText());
- assertEquals("image/png", partData.get("mime_type").asText());
- assertEquals("[MEDIA OFFLOADED]", partData.get("text").asText());
- }
-
@Test
public void truncate_variousInputs() {
assertNull(JsonFormatter.truncate(null, 10));
@@ -241,8 +188,7 @@ public void parse_multibyteString_truncatesBasedOnBytes() throws Exception {
// "こんにちはこんにちは" is 30 bytes, but 10 characters.
String nihongo = "こんにちはこんにちは";
// With budget 20, effective budget is 6, so only 2 characters (6 bytes) should be kept.
- Parser.ParsedContent result =
- new Parser(null, 20, null, true).parse(nihongo, "trace", "span").get();
+ Parser.ParsedContent result = new Parser(20).parse(nihongo).get();
assertTrue(result.isTruncated());
assertEquals("こん...[truncated]", result.content().asText());
@@ -251,8 +197,7 @@ public void parse_multibyteString_truncatesBasedOnBytes() throws Exception {
@Test
public void parse_multibyteContent_truncatesBasedOnBytes() throws Exception {
Content content = Content.fromParts(Part.fromText("こんにちはこんにちは"));
- Parser.ParsedContent result =
- new Parser(null, 20, null, true).parse(content, "trace", "span").get();
+ Parser.ParsedContent result = new Parser(20).parse(content).get();
assertTrue(result.isTruncated());
assertEquals("こん...[truncated]", result.content().get("text_summary").asText());
diff --git a/core/src/test/java/com/google/adk/plugins/agentanalytics/ParserTest.java b/core/src/test/java/com/google/adk/plugins/agentanalytics/ParserTest.java
index 385e81082..9bae03331 100644
--- a/core/src/test/java/com/google/adk/plugins/agentanalytics/ParserTest.java
+++ b/core/src/test/java/com/google/adk/plugins/agentanalytics/ParserTest.java
@@ -38,13 +38,13 @@ public final class ParserTest {
@Before
public void setUp() {
- parser = new Parser(null, 100, "connectionId", true);
+ parser = new Parser(100);
}
@Test
public void parse_part_coversLine280() throws Exception {
Part part = Part.fromText("test part");
- CompletableFuture future = parser.parse(part, "traceId", "spanId");
+ CompletableFuture future = parser.parse(part);
Parser.ParsedContent result = future.get();
assertEquals("{\"text_summary\":\"test part\"}", result.content().toString());
@@ -56,7 +56,7 @@ public void parse_part_coversLine280() throws Exception {
public void parse_part_withInlineData_coversProcessPart() throws Exception {
Blob blob = Blob.builder().mimeType("image/png").data(new byte[] {1, 2, 3}).build();
Part part = Part.builder().inlineData(blob).build();
- CompletableFuture future = parser.parse(part, "traceId", "spanId");
+ CompletableFuture future = parser.parse(part);
Parser.ParsedContent result = future.get();
assertEquals(1, result.parts().size());
@@ -104,7 +104,7 @@ public void parse_multipartContent_coversLine310() throws Exception {
// Call private method using helper if necessary, but parseContentObject is private.
// However, parse(Object content, ...) calls it.
- CompletableFuture future = parser.parse(content, "traceId", "spanId");
+ CompletableFuture future = parser.parse(content);
Parser.ParsedContent result = future.get();
assertTrue(result.isTruncated());
diff --git a/core/src/test/java/com/google/adk/plugins/agentanalytics/PluginStateTest.java b/core/src/test/java/com/google/adk/plugins/agentanalytics/PluginStateTest.java
index 14dcc390e..444cc8a6d 100644
--- a/core/src/test/java/com/google/adk/plugins/agentanalytics/PluginStateTest.java
+++ b/core/src/test/java/com/google/adk/plugins/agentanalytics/PluginStateTest.java
@@ -17,7 +17,6 @@
package com.google.adk.plugins.agentanalytics;
import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeastOnce;
@@ -29,14 +28,10 @@
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
-import com.google.common.util.concurrent.Uninterruptibles;
import java.io.IOException;
-import java.lang.reflect.Field;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
@@ -70,6 +65,10 @@ protected BigQueryWriteClient createWriteClient(BigQueryLoggerConfig config) {
return mockWriteClient;
}
+ BigQueryWriteClient getMockWriteClient() {
+ return mockWriteClient;
+ }
+
@Override
protected StreamWriter createWriter() {
StreamWriter writer = mock(StreamWriter.class);
@@ -103,11 +102,6 @@ public void tearDown() {
pluginLogger.setLevel(originalLevel);
}
- @Test
- public void getGcsOffloader_emptyBucketName_returnsNull() {
- assertNull(pluginState.getGcsOffloader(config));
- }
-
@Test
public void addPendingTask_removedTaskOnCompletion() {
String invocationId = "testInvocation";
@@ -213,8 +207,7 @@ public void ensureInvocationCompleted_timeout_cleansUpState() throws IOException
// Wait for cleanup side effects which run after terminal signal.
long deadline = Instant.now().plusMillis(1000).toEpochMilli();
- while (!pluginState.getPendingTasksForInvocation(invocationId).isEmpty()
- && Instant.now().toEpochMilli() < deadline) {
+ while (!pluginState.isProcessed(invocationId) && Instant.now().toEpochMilli() < deadline) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
@@ -249,51 +242,4 @@ public void close_succeedsAndCleansUp() throws Exception {
assertTrue(pluginState.getTraceManagers().isEmpty());
assertTrue(pluginState.getExecutor().isShutdown());
}
-
- @Test
- public void close_respectsRemainingTimeoutBudget() throws Exception {
- config = config.toBuilder().shutdownTimeout(Duration.ofMillis(500)).build();
- pluginState = new TestPluginState(config);
-
- ExecutorService mockOffloadExecutor = mock(ExecutorService.class);
- Field field = PluginState.class.getDeclaredField("offloadExecutor");
- field.setAccessible(true);
- field.set(pluginState, mockOffloadExecutor);
-
- pluginState
- .getExecutor()
- .execute(
- () -> {
- Uninterruptibles.sleepUninterruptibly(Duration.ofMillis(200));
- });
-
- when(mockOffloadExecutor.awaitTermination(any(Long.class), any(TimeUnit.class)))
- .thenReturn(true);
-
- pluginState.close().test().awaitDone(2, SECONDS);
-
- ArgumentCaptor timeoutCaptor = ArgumentCaptor.forClass(Long.class);
- verify(mockOffloadExecutor).awaitTermination(timeoutCaptor.capture(), any(TimeUnit.class));
-
- long capturedTimeout = timeoutCaptor.getValue();
- assertTrue("Timeout should be less than 400", capturedTimeout < 400);
- assertTrue("Timeout should be greater than 100", capturedTimeout > 100);
- }
-
- @Test
- public void close_closesGcsOffloader() throws Exception {
- GcsOffloader mockOffloader = mock(GcsOffloader.class);
- BigQueryLoggerConfig gcsConfig = config.toBuilder().gcsBucketName("test-bucket").build();
- PluginState gcsState =
- new TestPluginState(gcsConfig) {
- @Override
- protected GcsOffloader getGcsOffloader(BigQueryLoggerConfig config) {
- return mockOffloader;
- }
- };
-
- gcsState.close().test().assertComplete();
-
- verify(mockOffloader).close();
- }
}
diff --git a/pom.xml b/pom.xml
index d63ef5268..665b696a0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,7 +75,6 @@
3.9.0
5.6.1
4.1.118.Final
- 3.3.0
@{jacoco.agent.argLine} --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED -Dio.netty.tryReflectionSetAccessible=true
@@ -295,11 +294,6 @@
assertj-core
${assertj.version}
-
- org.apache.tika
- tika-core
- ${tika.version}
-