Skip to content

Commit f09ab23

Browse files
google-genai-bot authored and copybara-github committed
feat: Adding functionality to support custom content formatting
Allow plugins to modify the content before logging. This is useful for masking sensitive data, formatting content, etc.

PiperOrigin-RevId: 904056388
1 parent 02a08a1 commit f09ab23

4 files changed

Lines changed: 156 additions & 13 deletions

File tree

core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPlugin.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,18 @@ private void logEvent(
210210
if (state.isProcessed(invocationContext.invocationId())) {
211211
return;
212212
}
213+
if (config.contentFormatter() != null && content != null) {
214+
try {
215+
content = config.contentFormatter().apply(content, eventType);
216+
} catch (RuntimeException e) {
217+
218+
logger.log(
219+
Level.WARNING,
220+
"Failed to format content for invocation ID: " + invocationContext.invocationId(),
221+
e);
222+
content = null; // Fail-closed to avoid leaking unmasked sensitive data
223+
}
224+
}
213225
String invocationId = invocationContext.invocationId();
214226
BatchProcessor processor = state.getBatchProcessor(invocationId);
215227
// Ensure table exists before logging.
@@ -223,10 +235,12 @@ private void logEvent(
223235
row.put("invocation_id", invocationContext.invocationId());
224236
row.put("user_id", invocationContext.userId());
225237
// Parse and log content
226-
ParsedContent parsedContent = JsonFormatter.parse(content, config.maxContentLength());
227-
row.put("content_parts", parsedContent.parts());
228-
row.put("content", parsedContent.content());
229-
row.put("is_truncated", isContentTruncated || parsedContent.isTruncated());
238+
if (content != null) {
239+
ParsedContent parsedContent = JsonFormatter.parse(content, config.maxContentLength());
240+
row.put("content_parts", parsedContent.parts());
241+
row.put("content", parsedContent.content());
242+
row.put("is_truncated", isContentTruncated || parsedContent.isTruncated());
243+
}
230244

231245
EventData data = eventData.orElse(EventData.builder().build());
232246
row.put("status", data.status());
@@ -301,7 +315,10 @@ private Map<String, Object> getAttributes(
301315
}
302316
attributes.put("session_metadata", sessionMeta);
303317
} catch (RuntimeException e) {
304-
// Ignore session enrichment errors as in Python.
318+
logger.log(
319+
Level.WARNING,
320+
"Failed to log session metadata for invocation ID: " + invocationContext.invocationId(),
321+
e);
305322
}
306323
}
307324

core/src/main/java/com/google/adk/plugins/agentanalytics/BigQueryLoggerConfig.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,20 +77,32 @@ public abstract class BigQueryLoggerConfig {
7777
// Max size of the batch processor queue.
7878
public abstract int queueMaxSize();
7979

80-
// Optional custom formatter for content.
81-
// TODO(b/491852782): Implement content formatter.
82-
@Nullable
83-
public abstract BiFunction<Object, String, Object> contentFormatter();
80+
/**
81+
* Optional custom formatter for content.
82+
*
83+
* <p>Allow plugins to modify the content before logging. This is useful for masking sensitive
84+
* data, formatting content, etc.
85+
*
86+
* <p>The contentFormatter must be <b>thread-safe</b> as it may be called concurrently across
87+
* different agent invocations and <b>fast/non-blocking</b> to avoid adding latency to the agent's
88+
* event processing pipeline.
89+
*
90+
* <p><b>Important:</b> To avoid corruption of the logs, the incoming content object should
91+
* <b>not</b> be mutated. Modifying code should return a <b>new copy</b> of the object with
92+
* desired changes.
93+
*/
94+
public abstract @Nullable BiFunction<Object, String, Object> contentFormatter();
95+
96+
// GCS bucket name to store multi-modal content.
97+
public abstract String gcsBucketName();
8498

8599
// TODO(b/491852782): Implement connection id.
86100
public abstract Optional<String> connectionId();
87101

88102
// Toggle for session metadata (e.g. gchat thread-id).
89-
// TODO(b/491852782): Implement logging of session metadata.
90103
public abstract boolean logSessionMetadata();
91104

92105
// Static custom tags (e.g. {"agent_role": "sales"}).
93-
// TODO(b/491852782): Implement custom tags.
94106
public abstract ImmutableMap<String, Object> customTags();
95107

96108
// Automatically add new columns to existing tables when the plugin
@@ -120,6 +132,7 @@ public static Builder builder() {
120132
.tableName("events")
121133
.clusteringFields(ImmutableList.of("event_type", "agent", "user_id"))
122134
.logMultiModalContent(true)
135+
.gcsBucketName("")
123136
.retryConfig(RetryConfig.builder().build())
124137
.batchSize(1)
125138
.batchFlushInterval(Duration.ofSeconds(1))
@@ -205,6 +218,9 @@ public abstract Builder contentFormatter(
205218
@CanIgnoreReturnValue
206219
public abstract Builder viewPrefix(String viewPrefix);
207220

221+
@CanIgnoreReturnValue
222+
public abstract Builder gcsBucketName(String gcsBucketName);
223+
208224
@CanIgnoreReturnValue
209225
public abstract Builder credentials(Credentials credentials);
210226

core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPluginE2ETest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.junit.Assert.assertEquals;
2020
import static org.junit.Assert.assertFalse;
2121
import static org.junit.Assert.assertNotNull;
22+
import static org.junit.Assert.assertNull;
2223
import static org.junit.Assert.assertTrue;
2324
import static org.mockito.ArgumentMatchers.any;
2425
import static org.mockito.Mockito.mock;
@@ -200,7 +201,8 @@ public void runAgent_logsAgentStartingAndCompleted() throws Exception {
200201
assertEquals("user", agentStartingRow.get("user_id"));
201202
assertNotNull("invocation_id should be populated", agentStartingRow.get("invocation_id"));
202203
assertTrue("timestamp should be positive", (Long) agentStartingRow.get("timestamp") > 0);
203-
assertEquals(false, agentStartingRow.get("is_truncated"));
204+
// AGENT_STARTING is not a content-bearing event, so is_truncated is not set and should be null.
205+
assertNull(agentStartingRow.get("is_truncated"));
204206

205207
// Verify content for USER_MESSAGE_RECEIVED
206208
Map<String, Object> userMessageRow =

core/src/test/java/com/google/adk/plugins/agentanalytics/BigQueryAgentAnalyticsPluginTest.java

Lines changed: 109 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
import java.util.concurrent.CountDownLatch;
8080
import java.util.concurrent.ExecutorService;
8181
import java.util.concurrent.Executors;
82+
import java.util.function.BiFunction;
8283
import java.util.logging.Handler;
8384
import java.util.logging.Level;
8485
import java.util.logging.LogRecord;
@@ -493,7 +494,12 @@ public void onModelErrorCallback_populatesCorrectFields() throws Exception {
493494
assertEquals("ERROR", row.get("status"));
494495
assertEquals("model error message", row.get("error_message"));
495496
assertNotNull(row.get("latency_ms"));
496-
assertEquals(false, row.get("is_truncated"));
497+
assertFalse("Row should not contain content when it is null", row.containsKey("content"));
498+
assertFalse(
499+
"Row should not contain content_parts when it is null", row.containsKey("content_parts"));
500+
assertFalse(
501+
"Row should not contain is_truncated when content is null",
502+
row.containsKey("is_truncated"));
497503
}
498504

499505
@Test
@@ -649,6 +655,108 @@ protected StreamWriter createWriter() {
649655
"attributes should not contain session_metadata", attributes.has("session_metadata"));
650656
}
651657

658+
@Test
659+
public void logEvent_usesContentFormatter_whenConfigured() throws Exception {
660+
BiFunction<Object, String, Object> formatter =
661+
(content, eventType) -> {
662+
if (Objects.equals(eventType, "USER_MESSAGE_RECEIVED") && content instanceof Content) {
663+
return "Formatted: " + content;
664+
}
665+
return content;
666+
};
667+
668+
BigQueryLoggerConfig formattedConfig = config.toBuilder().contentFormatter(formatter).build();
669+
PluginState formattedState =
670+
new PluginState(formattedConfig) {
671+
@Override
672+
protected BigQueryWriteClient createWriteClient(BigQueryLoggerConfig config) {
673+
return mockWriteClient;
674+
}
675+
676+
@Override
677+
protected StreamWriter createWriter() {
678+
return mockWriter;
679+
}
680+
};
681+
BigQueryAgentAnalyticsPlugin formattedPlugin =
682+
new BigQueryAgentAnalyticsPlugin(formattedConfig, mockBigQuery, formattedState);
683+
684+
Content content = Content.fromParts(Part.fromText("test message"));
685+
formattedPlugin.onUserMessageCallback(mockInvocationContext, content).blockingSubscribe();
686+
687+
Map<String, Object> row = formattedState.getBatchProcessor("invocation_id").queue.poll();
688+
assertNotNull(row);
689+
assertTrue(row.get("content").toString().contains("Formatted: "));
690+
}
691+
692+
@Test
693+
public void logEvent_handlesNullContentFromFormatter() throws Exception {
694+
BiFunction<Object, String, Object> formatter = (content, eventType) -> null;
695+
696+
BigQueryLoggerConfig formattedConfig = config.toBuilder().contentFormatter(formatter).build();
697+
PluginState formattedState =
698+
new PluginState(formattedConfig) {
699+
@Override
700+
protected BigQueryWriteClient createWriteClient(BigQueryLoggerConfig config) {
701+
return mockWriteClient;
702+
}
703+
704+
@Override
705+
protected StreamWriter createWriter() {
706+
return mockWriter;
707+
}
708+
};
709+
BigQueryAgentAnalyticsPlugin formattedPlugin =
710+
new BigQueryAgentAnalyticsPlugin(formattedConfig, mockBigQuery, formattedState);
711+
712+
Content content = Content.fromParts(Part.fromText("test message"));
713+
formattedPlugin.onUserMessageCallback(mockInvocationContext, content).blockingSubscribe();
714+
715+
Map<String, Object> row = formattedState.getBatchProcessor("invocation_id").queue.poll();
716+
assertNotNull(row);
717+
assertFalse(
718+
"Row should not contain content when formatter returns null", row.containsKey("content"));
719+
assertFalse(
720+
"Row should not contain content_parts when formatter returns null",
721+
row.containsKey("content_parts"));
722+
}
723+
724+
@Test
725+
public void logEvent_handlesExceptionFromFormatter() throws Exception {
726+
BiFunction<Object, String, Object> formatter =
727+
(content, eventType) -> {
728+
throw new RuntimeException("Formatter error");
729+
};
730+
731+
BigQueryLoggerConfig formattedConfig = config.toBuilder().contentFormatter(formatter).build();
732+
PluginState formattedState =
733+
new PluginState(formattedConfig) {
734+
@Override
735+
protected BigQueryWriteClient createWriteClient(BigQueryLoggerConfig config) {
736+
return mockWriteClient;
737+
}
738+
739+
@Override
740+
protected StreamWriter createWriter() {
741+
return mockWriter;
742+
}
743+
};
744+
BigQueryAgentAnalyticsPlugin formattedPlugin =
745+
new BigQueryAgentAnalyticsPlugin(formattedConfig, mockBigQuery, formattedState);
746+
747+
Content content = Content.fromParts(Part.fromText("test message"));
748+
formattedPlugin.onUserMessageCallback(mockInvocationContext, content).blockingSubscribe();
749+
750+
Map<String, Object> row = formattedState.getBatchProcessor("invocation_id").queue.poll();
751+
assertNotNull(row);
752+
assertFalse(
753+
"Row should not contain content when formatter throws exception",
754+
row.containsKey("content"));
755+
assertFalse(
756+
"Row should not contain content_parts when formatter throws exception",
757+
row.containsKey("content_parts"));
758+
}
759+
652760
@Test
653761
public void maybeUpgradeSchema_addsNewTopLevelField() throws Exception {
654762
Table mockTable = mock(Table.class);

0 commit comments

Comments
 (0)