-
Notifications
You must be signed in to change notification settings - Fork 13
Expand file tree
/
Copy pathcontext_graph.py
More file actions
2636 lines (2348 loc) · 83 KB
/
Copy pathcontext_graph.py
File metadata and controls
2636 lines (2348 loc) · 83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Context Graph: Property Graph for Agent Trace + Business Entity linking.
This module provides the "System of Reasoning" for enterprise agents by
cross-linking the **Technical Graph** (execution lineage from the ADK
BigQuery Agent Analytics Plugin) with a **Business Graph** (domain
entities extracted via ``AI.GENERATE``).
Key capabilities:
- **Business entity extraction** — Use ``AI.GENERATE`` to extract
structured entities (e.g. Products, Targeting segments, Campaigns)
from unstructured agent payloads. The model is prompted to return
a JSON array; the SQL strips markdown fences and JSON_EXTRACT_ARRAY
parses each entity.
- **Property Graph DDL** — Generate ``CREATE PROPERTY GRAPH`` DDL
that formalizes Tech nodes, Biz nodes, ``CAUSED`` edges (parent→child
span linkage), and ``EVALUATED`` cross-links.
- **GQL traversal** — Quantified-path GQL queries to answer "Why was
X selected?" by tracing causal chains from a decision back to the
business inputs.
- **World Change detection** — Compare business entities evaluated at
agent-execution time against current availability to detect stale
context in long-running A2A tasks.
- **Decision Semantics** — Model agent decision points with candidates,
scores, selection/rejection status, and rejection rationale for
EU audit compliance.
Example usage::
from bigquery_agent_analytics.context_graph import ContextGraphManager
cgm = ContextGraphManager(
project_id="my-project",
dataset_id="agent_analytics",
)
# Extract business entities from agent traces
biz_nodes = cgm.extract_biz_nodes(session_ids=["sess-1"])
# Generate Property Graph DDL
ddl = cgm.get_property_graph_ddl(graph_name="my_context_graph")
# Traverse reasoning chains via GQL
chain = cgm.explain_decision(
decision_event_type="HITL_CONFIRMATION_REQUEST_COMPLETED",
biz_entity="Yahoo Homepage",
)
"""
from __future__ import annotations
from dataclasses import dataclass
from dataclasses import field
from datetime import datetime
from datetime import timezone
import logging
from typing import Any, Optional
from google.cloud import bigquery
from pydantic import BaseModel
from pydantic import Field
from ._telemetry import LabeledBigQueryClient
from ._telemetry import make_bq_client
from ._telemetry import with_sdk_labels
logger = logging.getLogger("bigquery_agent_analytics." + __name__)
# ------------------------------------------------------------------ #
# Data Models #
# ------------------------------------------------------------------ #
@dataclass
class BizNode:
"""A business-domain entity extracted from agent traces.
Attributes:
span_id: The span from which this entity was extracted.
session_id: Session that produced this entity.
node_type: Entity category (e.g. "Product", "Targeting",
"Campaign", "Budget").
node_value: Entity value (e.g. "Yahoo Homepage",
"Millennials", "$8,000").
confidence: Extraction confidence score (0.0-1.0).
metadata: Additional extraction metadata.
"""
span_id: str
session_id: str
node_type: str
node_value: str
confidence: float = 1.0
evaluated_at: Optional[datetime] = None
artifact_uri: Optional[str] = None
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass
class DecisionPoint:
"""A decision point identified in an agent trace.
Represents a moment where the agent evaluated multiple candidates
and selected or rejected them based on scores and criteria.
Attributes:
decision_id: Unique identifier for this decision point.
session_id: Session that contains this decision.
span_id: The span where the decision was made.
decision_type: Category of decision (e.g. "audience_selection",
"placement_selection", "budget_allocation").
description: Human-readable description of the decision.
timestamp: When the decision was made.
metadata: Additional decision metadata.
"""
decision_id: str
session_id: str
span_id: str
decision_type: str
description: str = ""
timestamp: Optional[datetime] = None
metadata: dict[str, Any] = field(default_factory=dict)
@dataclass
class Candidate:
"""A candidate option evaluated at a decision point.
Attributes:
candidate_id: Unique identifier for this candidate.
decision_id: The decision point this candidate belongs to.
session_id: Session containing this candidate.
name: Candidate name/label.
score: Evaluation score (0.0-1.0).
status: "SELECTED" or "DROPPED".
rejection_rationale: Why the candidate was dropped (required for
DROPPED candidates, supports EU audit compliance).
properties: Additional candidate properties (e.g. reach, cost).
"""
candidate_id: str
decision_id: str
session_id: str
name: str
score: float = 0.0
status: str = "SELECTED"
rejection_rationale: Optional[str] = None
properties: dict[str, Any] = field(default_factory=dict)
class WorldChangeAlert(BaseModel):
"""An alert indicating a business entity has changed since evaluation.
Attributes:
biz_node: The business entity that changed.
original_state: State at the time the agent evaluated it.
current_state: Current state.
drift_type: Type of drift (e.g. "unavailable",
"price_changed", "inventory_depleted").
severity: Drift severity (0.0-1.0).
recommendation: Suggested action.
"""
biz_node: str = Field(description="The business entity that changed.")
original_state: str = Field(description="State when the agent evaluated it.")
current_state: str = Field(description="Current state.")
drift_type: str = Field(description="Type of drift detected.")
severity: float = Field(description="Drift severity (0.0-1.0).")
recommendation: str = Field(
default="Review before approving.",
description="Suggested action.",
)
class WorldChangeReport(BaseModel):
"""Report on world-state drift for a long-running agent task.
Attributes:
session_id: The session under review.
alerts: List of detected world changes.
total_entities_checked: Number of entities checked.
stale_entities: Number of entities that drifted.
is_safe_to_approve: Whether the context is still valid.
checked_at: When the check was performed.
"""
session_id: str = Field(description="Session under review.")
alerts: list[WorldChangeAlert] = Field(default_factory=list)
total_entities_checked: int = Field(default=0)
stale_entities: int = Field(default=0)
is_safe_to_approve: bool = Field(default=True)
check_failed: bool = Field(
default=False,
description=(
"True when the underlying query or state check could "
"not complete. When True, is_safe_to_approve=False "
"and the report should NOT be used for HITL approval."
),
)
checked_at: datetime = Field(
default_factory=lambda: datetime.now(timezone.utc)
)
model_config = {"arbitrary_types_allowed": True}
def summary(self) -> str:
"""Returns a human-readable summary."""
lines = [
f"World Change Report — Session: {self.session_id}",
f" Entities checked : {self.total_entities_checked}",
f" Stale entities : {self.stale_entities}",
f" Safe to approve : {self.is_safe_to_approve}",
]
if self.check_failed:
lines.append(" CHECK FAILED : query or state check error")
for alert in self.alerts:
lines.append(
f" [{alert.drift_type}] {alert.biz_node}: "
f"{alert.original_state} -> {alert.current_state} "
f"(severity={alert.severity:.2f})"
)
return "\n".join(lines)
class ContextGraphConfig(BaseModel):
"""Configuration for the Context Graph.
Attributes:
biz_nodes_table: Table name for extracted business nodes.
cross_links_table: Table name for cross-link edges.
graph_name: Name for the Property Graph.
endpoint: AI.GENERATE endpoint for entity extraction.
entity_types: Domain-specific entity types to extract.
max_hops: Maximum causal hops for GQL traversal.
"""
biz_nodes_table: str = Field(default="extracted_biz_nodes")
cross_links_table: str = Field(default="context_cross_links")
decision_points_table: str = Field(default="decision_points")
candidates_table: str = Field(default="candidates")
made_decision_edges_table: str = Field(default="made_decision_edges")
candidate_edges_table: str = Field(default="candidate_edges")
graph_name: str = Field(default="agent_context_graph")
endpoint: str = Field(default="gemini-2.5-flash")
entity_types: list[str] = Field(
default_factory=lambda: [
"Product",
"Targeting",
"Campaign",
"Budget",
"Audience",
"Creative",
"Placement",
]
)
max_hops: int = Field(default=20)
# ------------------------------------------------------------------ #
# SQL Templates #
# ------------------------------------------------------------------ #
_EXTRACT_BIZ_NODES_QUERY = """\
MERGE `{project}.{dataset}.{biz_table}` AS target
USING (
SELECT
CONCAT(base.span_id, ':', JSON_EXTRACT_SCALAR(entity, '$.entity_type'),
':', JSON_EXTRACT_SCALAR(entity, '$.entity_value')
) AS biz_node_id,
base.span_id,
base.session_id,
JSON_EXTRACT_SCALAR(entity, '$.entity_type') AS node_type,
JSON_EXTRACT_SCALAR(entity, '$.entity_value') AS node_value,
CAST(
COALESCE(JSON_EXTRACT_SCALAR(entity, '$.confidence'), '1.0')
AS FLOAT64
) AS confidence,
-- Persisted artifact URI from content_parts[].object_ref.uri
(SELECT JSON_EXTRACT_SCALAR(cp, '$.object_ref.uri')
FROM UNNEST(JSON_EXTRACT_ARRAY(TO_JSON_STRING(base.content_parts)))
AS cp WITH OFFSET
WHERE JSON_EXTRACT_SCALAR(cp, '$.object_ref.uri') IS NOT NULL
ORDER BY OFFSET LIMIT 1
) AS artifact_uri
FROM `{project}.{dataset}.{table}` AS base,
UNNEST(JSON_EXTRACT_ARRAY(
-- Strip markdown code fences (```json ... ```) from LLM output
REGEXP_REPLACE(
REGEXP_REPLACE(
AI.GENERATE(
CONCAT(
'Extract business entities from this agent payload. ',
'Entity types: {entity_types}. ',
'Return a JSON array of objects with entity_type, ',
'entity_value, and confidence (0-1).',
'\\n\\nPayload:\\n',
COALESCE(
JSON_EXTRACT_SCALAR(base.content, '$.text_summary'),
JSON_EXTRACT_SCALAR(base.content, '$.response'),
JSON_EXTRACT_SCALAR(base.content, '$.text'),
TO_JSON_STRING(base.content)
)
),
endpoint => '{endpoint}'
).result,
r'^```(?:json)?\\s*', ''),
r'\\s*```$', '')
)) AS entity
WHERE base.session_id IN UNNEST(@session_ids)
AND base.event_type IN (
'USER_MESSAGE_RECEIVED',
'LLM_RESPONSE',
'TOOL_COMPLETED',
'AGENT_COMPLETED'
)
AND base.content IS NOT NULL
) AS source
ON target.biz_node_id = source.biz_node_id
WHEN MATCHED THEN
UPDATE SET confidence = source.confidence,
artifact_uri = source.artifact_uri
WHEN NOT MATCHED BY TARGET THEN
INSERT (biz_node_id, span_id, session_id, node_type, node_value,
confidence, artifact_uri)
VALUES (source.biz_node_id, source.span_id, source.session_id,
source.node_type, source.node_value, source.confidence,
source.artifact_uri)
WHEN NOT MATCHED BY SOURCE
AND target.session_id IN UNNEST(@session_ids) THEN
DELETE
"""
_EXTRACT_BIZ_NODES_SIMPLE_QUERY = """\
SELECT
base.span_id,
base.session_id,
base.event_type,
COALESCE(
JSON_EXTRACT_SCALAR(base.content, '$.text_summary'),
JSON_EXTRACT_SCALAR(base.content, '$.response'),
JSON_EXTRACT_SCALAR(base.content, '$.text'),
TO_JSON_STRING(base.content)
) AS payload_text
FROM `{project}.{dataset}.{table}` AS base
WHERE base.session_id IN UNNEST(@session_ids)
AND base.event_type IN (
'USER_MESSAGE_RECEIVED',
'LLM_RESPONSE',
'TOOL_COMPLETED',
'AGENT_COMPLETED'
)
AND base.content IS NOT NULL
ORDER BY base.timestamp ASC
"""
_CREATE_BIZ_NODES_TABLE_QUERY = """\
CREATE TABLE IF NOT EXISTS `{project}.{dataset}.{biz_table}` (
biz_node_id STRING,
span_id STRING,
session_id STRING,
node_type STRING,
node_value STRING,
confidence FLOAT64,
artifact_uri STRING
)
"""
_DELETE_BIZ_NODES_FOR_SESSIONS_QUERY = """\
DELETE FROM `{project}.{dataset}.{biz_table}`
WHERE session_id IN UNNEST(@session_ids)
"""
_INSERT_BIZ_NODES_QUERY = """\
INSERT INTO `{project}.{dataset}.{biz_table}`
(biz_node_id, span_id, session_id, node_type, node_value, confidence)
VALUES
{values}
"""
_CREATE_CROSS_LINKS_TABLE_QUERY = """\
CREATE TABLE IF NOT EXISTS `{project}.{dataset}.{cross_links_table}` (
link_id STRING,
span_id STRING,
session_id STRING,
biz_node_id STRING,
node_value STRING,
link_type STRING,
artifact_uri STRING,
created_at TIMESTAMP
)
"""
_DELETE_CROSS_LINKS_FOR_SESSIONS_QUERY = """\
DELETE FROM `{project}.{dataset}.{cross_links_table}`
WHERE session_id IN UNNEST(@session_ids)
"""
_INSERT_CROSS_LINKS_QUERY = """\
INSERT INTO `{project}.{dataset}.{cross_links_table}`
(link_id, span_id, session_id, biz_node_id, node_value, link_type,
artifact_uri, created_at)
SELECT
b.biz_node_id AS link_id,
b.span_id,
b.session_id,
b.biz_node_id,
b.node_value,
'EVALUATED' AS link_type,
b.artifact_uri,
CURRENT_TIMESTAMP() AS created_at
FROM `{project}.{dataset}.{biz_table}` b
WHERE b.session_id IN UNNEST(@session_ids)
"""
_CREATE_DECISION_POINTS_TABLE_QUERY = """\
CREATE TABLE IF NOT EXISTS `{project}.{dataset}.{decision_points_table}` (
decision_id STRING,
session_id STRING,
span_id STRING,
decision_type STRING,
description STRING
)
"""
_CREATE_CANDIDATES_TABLE_QUERY = """\
CREATE TABLE IF NOT EXISTS `{project}.{dataset}.{candidates_table}` (
candidate_id STRING,
decision_id STRING,
session_id STRING,
name STRING,
score FLOAT64,
status STRING,
rejection_rationale STRING
)
"""
_CREATE_MADE_DECISION_EDGES_TABLE_QUERY = """\
CREATE TABLE IF NOT EXISTS
`{project}.{dataset}.{made_decision_edges_table}` (
edge_id STRING,
span_id STRING,
decision_id STRING,
created_at TIMESTAMP
)
"""
_CREATE_CANDIDATE_EDGES_TABLE_QUERY = """\
CREATE TABLE IF NOT EXISTS
`{project}.{dataset}.{candidate_edges_table}` (
edge_id STRING,
decision_id STRING,
candidate_id STRING,
edge_type STRING,
rejection_rationale STRING,
created_at TIMESTAMP
)
"""
_EXTRACT_DECISION_POINTS_QUERY = """\
SELECT
base.span_id,
base.session_id,
base.event_type,
COALESCE(
JSON_EXTRACT_SCALAR(base.content, '$.text_summary'),
JSON_EXTRACT_SCALAR(base.content, '$.response'),
JSON_EXTRACT_SCALAR(base.content, '$.text'),
TO_JSON_STRING(base.content)
) AS payload_text
FROM `{project}.{dataset}.{table}` AS base
WHERE base.session_id IN UNNEST(@session_ids)
AND base.event_type IN (
'LLM_RESPONSE',
'TOOL_COMPLETED',
'AGENT_COMPLETED',
'HITL_CONFIRMATION_REQUEST_COMPLETED'
)
AND base.content IS NOT NULL
ORDER BY base.timestamp ASC
"""
_EXTRACT_DECISION_POINTS_AI_QUERY = """\
SELECT
base.span_id,
base.session_id,
TO_JSON_STRING(
AI.GENERATE(
CONCAT(
'Extract agent decision points from the payload. ',
'A decision point is present only when the payload shows the ',
'agent evaluated multiple candidates/options and selected or ',
'rejected them. Return zero decisions if no such decision is ',
'present. Preserve candidate names and rejection rationale text ',
'from the payload. Use status SELECTED or DROPPED. Use null ',
'rejection_rationale for selected candidates. Scores must be ',
'FLOAT64 values between 0 and 1 when present, otherwise 0.0.',
'\\n\\nPayload:\\n',
COALESCE(
JSON_EXTRACT_SCALAR(base.content, '$.text_summary'),
JSON_EXTRACT_SCALAR(base.content, '$.response'),
JSON_EXTRACT_SCALAR(base.content, '$.text'),
TO_JSON_STRING(base.content)
)
),
endpoint => '{endpoint}',
model_params => JSON '{{"generationConfig": {{"temperature": 0.0, "topP": 0.1, "maxOutputTokens": 2048}}}}',
output_schema => 'decisions ARRAY<STRUCT<decision_type STRING, description STRING, candidates ARRAY<STRUCT<name STRING, score FLOAT64, status STRING, rejection_rationale STRING>>>>'
).decisions
) AS decisions_json
FROM `{project}.{dataset}.{table}` AS base
WHERE base.session_id IN UNNEST(@session_ids)
AND base.event_type IN (
'LLM_RESPONSE',
'TOOL_COMPLETED',
'AGENT_COMPLETED',
'HITL_CONFIRMATION_REQUEST_COMPLETED'
)
AND base.content IS NOT NULL
ORDER BY base.timestamp ASC
"""
_DELETE_DECISION_POINTS_FOR_SESSIONS_QUERY = """\
DELETE FROM `{project}.{dataset}.{decision_points_table}`
WHERE session_id IN UNNEST(@session_ids)
"""
_DELETE_CANDIDATES_FOR_SESSIONS_QUERY = """\
DELETE FROM `{project}.{dataset}.{candidates_table}`
WHERE session_id IN UNNEST(@session_ids)
"""
_DELETE_MADE_DECISION_EDGES_FOR_SESSIONS_QUERY = """\
DELETE FROM `{project}.{dataset}.{made_decision_edges_table}`
WHERE decision_id IN (
SELECT decision_id
FROM `{project}.{dataset}.{decision_points_table}`
WHERE session_id IN UNNEST(@session_ids)
)
"""
_DELETE_CANDIDATE_EDGES_FOR_SESSIONS_QUERY = """\
DELETE FROM `{project}.{dataset}.{candidate_edges_table}`
WHERE decision_id IN (
SELECT decision_id
FROM `{project}.{dataset}.{decision_points_table}`
WHERE session_id IN UNNEST(@session_ids)
)
"""
_INSERT_MADE_DECISION_EDGES_QUERY = """\
INSERT INTO `{project}.{dataset}.{made_decision_edges_table}`
(edge_id, span_id, decision_id, created_at)
SELECT
CONCAT(dp.span_id, ':MADE_DECISION:', dp.decision_id) AS edge_id,
dp.span_id,
dp.decision_id,
CURRENT_TIMESTAMP() AS created_at
FROM `{project}.{dataset}.{decision_points_table}` dp
WHERE dp.session_id IN UNNEST(@session_ids)
"""
_INSERT_CANDIDATE_EDGES_QUERY = """\
INSERT INTO `{project}.{dataset}.{candidate_edges_table}`
(edge_id, decision_id, candidate_id, edge_type,
rejection_rationale, created_at)
SELECT
CONCAT(c.decision_id, ':', c.status, ':', c.candidate_id) AS edge_id,
c.decision_id,
c.candidate_id,
CASE c.status
WHEN 'SELECTED' THEN 'SELECTED_CANDIDATE'
ELSE 'DROPPED_CANDIDATE'
END AS edge_type,
c.rejection_rationale,
CURRENT_TIMESTAMP() AS created_at
FROM `{project}.{dataset}.{candidates_table}` c
WHERE c.session_id IN UNNEST(@session_ids)
"""
_DECISION_POINTS_FOR_SESSION_QUERY = """\
SELECT
dp.decision_id,
dp.session_id,
dp.span_id,
dp.decision_type,
dp.description
FROM `{project}.{dataset}.{decision_points_table}` dp
WHERE dp.session_id = @session_id
"""
_CANDIDATES_FOR_DECISION_QUERY = """\
SELECT
c.candidate_id,
c.decision_id,
c.session_id,
c.name,
c.score,
c.status,
c.rejection_rationale
FROM `{project}.{dataset}.{candidates_table}` c
WHERE c.decision_id = @decision_id
ORDER BY c.score DESC
"""
_CANDIDATES_FOR_SESSION_QUERY = """\
SELECT
c.candidate_id,
c.decision_id,
c.session_id,
c.name,
c.score,
c.status,
c.rejection_rationale
FROM `{project}.{dataset}.{candidates_table}` c
WHERE c.session_id = @session_id
ORDER BY c.decision_id, c.score DESC
"""
_PROPERTY_GRAPH_DDL = """\
CREATE OR REPLACE PROPERTY GRAPH `{project}.{dataset}.{graph_name}`
NODE TABLES (
-- Technical execution nodes (spans from ADK plugin)
`{project}.{dataset}.{table}` AS TechNode
KEY (span_id)
LABEL TechNode
PROPERTIES (
event_type,
agent,
timestamp,
session_id,
invocation_id,
content,
latency_ms,
status,
error_message
),
-- Business domain nodes (extracted entities, keyed by composite ID)
`{project}.{dataset}.{biz_table}` AS BizNode
KEY (biz_node_id)
LABEL BizNode
PROPERTIES (
node_type,
node_value,
confidence,
session_id,
span_id,
artifact_uri
)
)
EDGE TABLES (
-- Causal lineage: parent span -> child span
`{project}.{dataset}.{table}` AS Caused
KEY (span_id)
SOURCE KEY (parent_span_id) REFERENCES TechNode (span_id)
DESTINATION KEY (span_id) REFERENCES TechNode (span_id)
LABEL Caused,
-- Cross-link: technical event -> business entity it evaluated
`{project}.{dataset}.{cross_links_table}` AS Evaluated
KEY (link_id)
SOURCE KEY (span_id) REFERENCES TechNode (span_id)
DESTINATION KEY (biz_node_id) REFERENCES BizNode (biz_node_id)
LABEL Evaluated
PROPERTIES (
artifact_uri,
link_type,
created_at
)
)
"""
_GQL_REASONING_CHAIN_QUERY = """\
GRAPH `{project}.{dataset}.{graph_name}`
MATCH
(decision:TechNode)-[c:Caused]->{{1,{max_hops}}}(step:TechNode)
-[e:Evaluated]->(biz:BizNode)
WHERE decision.event_type = @decision_event_type
{biz_filter_clause}
RETURN
TO_JSON(decision) AS decision_node,
decision.span_id AS decision_span_id,
decision.event_type AS decision_type,
step.span_id AS reasoning_span_id,
step.event_type AS step_type,
step.agent AS step_agent,
COALESCE(
JSON_EXTRACT_SCALAR(step.content, '$.text_summary'),
JSON_EXTRACT_SCALAR(step.content, '$.response'),
''
) AS reasoning_text,
step.latency_ms AS step_latency_ms,
biz.node_type AS entity_type,
biz.node_value AS entity_value,
biz.confidence AS entity_confidence,
TO_JSON(step) AS step_node,
TO_JSON(biz) AS biz_node
ORDER BY step.timestamp ASC
LIMIT @result_limit
"""
_GQL_FULL_CAUSAL_CHAIN_QUERY = """\
GRAPH `{project}.{dataset}.{graph_name}`
MATCH
(root:TechNode)-[c:Caused]->{{1,{max_hops}}}(leaf:TechNode)
WHERE root.session_id = @session_id
AND root.event_type = 'USER_MESSAGE_RECEIVED'
RETURN
TO_JSON(root) AS root_node,
root.span_id AS root_span_id,
leaf.span_id AS leaf_span_id,
leaf.event_type AS leaf_event_type,
leaf.agent AS leaf_agent,
COALESCE(
JSON_EXTRACT_SCALAR(leaf.content, '$.text_summary'),
JSON_EXTRACT_SCALAR(leaf.content, '$.response'),
''
) AS leaf_content,
leaf.latency_ms AS leaf_latency_ms,
TO_JSON(leaf) AS leaf_node,
TO_JSON(c) AS edge
ORDER BY leaf.timestamp ASC
LIMIT @result_limit
"""
_GQL_TRACE_RECONSTRUCTION_QUERY = """\
GRAPH `{project}.{dataset}.{graph_name}`
MATCH
(parent:TechNode)-[c:Caused]->(child:TechNode)
WHERE parent.session_id = @session_id
OR child.session_id = @session_id
RETURN
parent.span_id AS parent_span_id,
parent.event_type AS parent_event_type,
parent.agent AS parent_agent,
parent.timestamp AS parent_timestamp,
parent.session_id AS session_id,
parent.invocation_id AS parent_invocation_id,
parent.content AS parent_content,
parent.latency_ms AS parent_latency_ms,
parent.status AS parent_status,
parent.error_message AS parent_error_message,
child.span_id AS child_span_id,
child.event_type AS child_event_type,
child.agent AS child_agent,
child.timestamp AS child_timestamp,
child.invocation_id AS child_invocation_id,
child.content AS child_content,
child.latency_ms AS child_latency_ms,
child.status AS child_status,
child.error_message AS child_error_message
ORDER BY child.timestamp ASC
LIMIT @result_limit
"""
_BIZ_NODES_FOR_SESSION_QUERY = """\
SELECT
biz_node_id,
node_type,
node_value,
confidence,
span_id,
session_id,
artifact_uri
FROM `{project}.{dataset}.{biz_table}`
WHERE session_id = @session_id
ORDER BY confidence DESC
"""
_WORLD_CHANGE_CHECK_QUERY = """\
SELECT
b.node_type,
b.node_value,
b.confidence,
b.span_id,
e.timestamp AS evaluated_at
FROM `{project}.{dataset}.{biz_table}` b
JOIN `{project}.{dataset}.{table}` e
ON b.span_id = e.span_id
WHERE b.session_id = @session_id
ORDER BY e.timestamp ASC
"""
# ------------------------------------------------------------------ #
# Decision Semantics: Extended Property Graph DDL #
# ------------------------------------------------------------------ #
_DECISION_PROPERTY_GRAPH_DDL = """\
CREATE OR REPLACE PROPERTY GRAPH `{project}.{dataset}.{graph_name}`
NODE TABLES (
-- Technical execution nodes (spans from ADK plugin)
`{project}.{dataset}.{table}` AS TechNode
KEY (span_id)
LABEL TechNode
PROPERTIES (
event_type,
agent,
timestamp,
session_id,
invocation_id,
content,
latency_ms,
status,
error_message
),
-- Business domain nodes (extracted entities)
`{project}.{dataset}.{biz_table}` AS BizNode
KEY (biz_node_id)
LABEL BizNode
PROPERTIES (
node_type,
node_value,
confidence,
session_id,
span_id,
artifact_uri
),
-- Decision point nodes
`{project}.{dataset}.{decision_points_table}` AS DecisionPoint
KEY (decision_id)
LABEL DecisionPoint
PROPERTIES (
session_id,
span_id,
decision_type,
description
),
-- Candidate nodes
`{project}.{dataset}.{candidates_table}` AS CandidateNode
KEY (candidate_id)
LABEL CandidateNode
PROPERTIES (
decision_id,
session_id,
name,
score,
status,
rejection_rationale
)
)
EDGE TABLES (
-- Causal lineage: parent span -> child span
`{project}.{dataset}.{table}` AS Caused
KEY (span_id)
SOURCE KEY (parent_span_id) REFERENCES TechNode (span_id)
DESTINATION KEY (span_id) REFERENCES TechNode (span_id)
LABEL Caused,
-- Cross-link: technical event -> business entity it evaluated
`{project}.{dataset}.{cross_links_table}` AS Evaluated
KEY (link_id)
SOURCE KEY (span_id) REFERENCES TechNode (span_id)
DESTINATION KEY (biz_node_id) REFERENCES BizNode (biz_node_id)
LABEL Evaluated
PROPERTIES (
artifact_uri,
link_type,
created_at
),
-- TechNode -> DecisionPoint (span that made the decision)
`{project}.{dataset}.{made_decision_edges_table}` AS MadeDecision
KEY (edge_id)
SOURCE KEY (span_id) REFERENCES TechNode (span_id)
DESTINATION KEY (decision_id) REFERENCES DecisionPoint (decision_id)
LABEL MadeDecision,
-- DecisionPoint -> CandidateNode (selected or dropped)
`{project}.{dataset}.{candidate_edges_table}` AS CandidateEdge
KEY (edge_id)
SOURCE KEY (decision_id) REFERENCES DecisionPoint (decision_id)
DESTINATION KEY (candidate_id) REFERENCES CandidateNode (candidate_id)
LABEL CandidateEdge
PROPERTIES (
edge_type,
rejection_rationale,
created_at
)
)
"""
# ------------------------------------------------------------------ #
# Decision Semantics: GQL Queries #
# ------------------------------------------------------------------ #
_GQL_EU_AUDIT_QUERY = """\
GRAPH `{project}.{dataset}.{graph_name}`
MATCH
(step:TechNode)-[md:MadeDecision]->(dp:DecisionPoint)
-[ce:CandidateEdge]->(cand:CandidateNode)
WHERE dp.session_id = @session_id
{decision_type_clause}
RETURN
dp.decision_id,
dp.decision_type,
dp.description AS decision_description,
cand.name AS candidate_name,
cand.score AS candidate_score,
cand.status AS candidate_status,
cand.rejection_rationale,
ce.edge_type,
step.span_id,
step.event_type,
step.agent
ORDER BY dp.decision_id, cand.score DESC
LIMIT @result_limit
"""
_GQL_DROPPED_CANDIDATES_QUERY = """\
GRAPH `{project}.{dataset}.{graph_name}`
MATCH
(dp:DecisionPoint)-[ce:CandidateEdge]->(cand:CandidateNode)
WHERE dp.session_id = @session_id
AND ce.edge_type = 'DROPPED_CANDIDATE'
RETURN
dp.decision_id,
dp.decision_type,
dp.description AS decision_description,
cand.name AS candidate_name,
cand.score AS candidate_score,
cand.rejection_rationale
ORDER BY dp.decision_id, cand.score DESC
LIMIT @result_limit
"""
def _dedupe_rows_by_key(
rows: list[dict[str, Any]], key: str
) -> list[dict[str, Any]]:
"""Return ``rows`` with one entry per ``rows[i][key]``, last-wins.
The property-graph DDL declares ``decision_id`` and
``candidate_id`` (and ``biz_node_id``) as ``KEY``, so the backing
tables must have at most one row per key. This helper handles
the **in-batch** duplicate case: ``AI.GENERATE`` can return
multiple objects that the SDK maps to the same composite key
(e.g. the same decision surfacing from two LLM_RESPONSE rows).
The cross-batch / rerun case (a prior load is invisible to a
subsequent ``DELETE``) is handled by writing through
``_append_rows_via_load_job`` — load jobs write to managed
storage, not the streaming buffer.
"""
seen: dict[Any, dict[str, Any]] = {}
for row in rows:
seen[row[key]] = row
return list(seen.values())
# ------------------------------------------------------------------ #
# ContextGraphManager #
# ------------------------------------------------------------------ #
class ContextGraphManager:
"""Manages the Context Graph linking technical traces to business entities.
This is the main entry point for building and querying the
"System of Reasoning" Property Graph.
Args:
project_id: Google Cloud project ID.
dataset_id: BigQuery dataset ID.
table_id: Agent events table name.
config: Optional context graph configuration.
client: Optional BigQuery client instance.
"""
def __init__(
self,
project_id: str,
dataset_id: str,
table_id: str = "agent_events",
config: Optional[ContextGraphConfig] = None,
client: Optional[bigquery.Client] = None,
location: str = "US",
) -> None:
self.project_id = project_id
self.dataset_id = dataset_id
self.table_id = table_id
self.config = config or ContextGraphConfig()
self._client = client
self._warned_unlabeled_client = False
self.location = location