Skip to content

Commit 7085990

Browse files
committed
Refactor to be purely event-driven in order to support direct Rpc use
1 parent f8b9c73 commit 7085990

12 files changed

Lines changed: 659 additions & 940 deletions

dotnet/src/CopilotTelemetry.cs

Lines changed: 320 additions & 537 deletions
Large diffs are not rendered by default.

dotnet/src/Session.cs

Lines changed: 28 additions & 47 deletions
Original file line number | Diff line number | Diff line change
@@ -54,7 +54,7 @@ public partial class CopilotSession : IAsyncDisposable
5454
private event SessionEventHandler? _eventHandlers;
5555
private readonly Dictionary<string, AIFunction> _toolHandlers = new();
5656
private readonly JsonRpc _rpc;
57-
private readonly CopilotTelemetry.AgentTurnTracker? _turnTracker;
57+
private readonly CopilotTelemetry.AgentTurnTracker? _telemetryTracker;
5858
private volatile PermissionRequestHandler? _permissionHandler;
5959
private volatile UserInputHandler? _userInputHandler;
6060
private SessionHooks? _hooks;
@@ -82,12 +82,12 @@ public partial class CopilotSession : IAsyncDisposable
8282
/// </value>
8383
public string? WorkspacePath { get; internal set; }
8484

85-
internal string TelemetryProviderName => _turnTracker?.ProviderName ?? OpenTelemetryConsts.DefaultProviderName;
86-
internal string? TelemetryServerAddress => _turnTracker?.ServerAddress;
87-
internal int? TelemetryServerPort => _turnTracker?.ServerPort;
88-
internal ActivityContext TelemetryActivityContext => _turnTracker?.GetActivityContext() ?? default;
85+
internal string TelemetryProviderName => _telemetryTracker?.ProviderName ?? OpenTelemetryConsts.DefaultProviderName;
86+
internal string? TelemetryServerAddress => _telemetryTracker?.ServerAddress;
87+
internal int? TelemetryServerPort => _telemetryTracker?.ServerPort;
88+
internal ActivityContext TelemetryActivityContext => _telemetryTracker?.GetActivityContext() ?? default;
8989
internal ActivityContext GetTelemetryToolCallParentContext(string toolCallId) =>
90-
_turnTracker?.GetToolCallParentContext(toolCallId) ?? default;
90+
_telemetryTracker?.GetToolCallParentContext(toolCallId) ?? default;
9191

9292
/// <summary>
9393
/// Initializes a new instance of the <see cref="CopilotSession"/> class.
@@ -119,7 +119,7 @@ internal CopilotSession(
119119
{
120120
SessionId = sessionId;
121121
_rpc = rpc;
122-
_turnTracker = telemetry is not null ? new CopilotTelemetry.AgentTurnTracker(telemetry, sessionId, model, provider, systemMessage, tools, streaming, agentName, agentDescription) : null;
122+
_telemetryTracker = telemetry is not null ? new CopilotTelemetry.AgentTurnTracker(telemetry, sessionId, model, provider, systemMessage, tools, streaming, agentName, agentDescription) : null;
123123
WorkspacePath = workspacePath;
124124
}
125125

@@ -156,27 +156,18 @@ private Task<T> InvokeRpcAsync<T>(string method, object?[]? args, CancellationTo
156156
/// </example>
157157
public async Task<string> SendAsync(MessageOptions options, CancellationToken cancellationToken = default)
158158
{
159-
_turnTracker?.BeginSend(options.Prompt);
160-
try
159+
var request = new SendMessageRequest
161160
{
162-
var request = new SendMessageRequest
163-
{
164-
SessionId = SessionId,
165-
Prompt = options.Prompt,
166-
Attachments = options.Attachments,
167-
Mode = options.Mode
168-
};
161+
SessionId = SessionId,
162+
Prompt = options.Prompt,
163+
Attachments = options.Attachments,
164+
Mode = options.Mode
165+
};
169166

170-
var response = await InvokeRpcAsync<SendMessageResponse>(
171-
"session.send", [request], cancellationToken);
167+
var response = await InvokeRpcAsync<SendMessageResponse>(
168+
"session.send", [request], cancellationToken);
172169

173-
return response.MessageId;
174-
}
175-
catch (Exception ex) when (_turnTracker is { } tracker)
176-
{
177-
tracker.CompleteTurnWithError(ex);
178-
throw;
179-
}
170+
return response.MessageId;
180171
}
181172

182173
/// <summary>
@@ -238,28 +229,18 @@ void Handler(SessionEvent evt)
238229

239230
await SendAsync(options, cancellationToken);
240231

241-
try
242-
{
243-
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
244-
cts.CancelAfter(effectiveTimeout);
245-
246-
using var registration = cts.Token.Register(() =>
247-
{
248-
if (cancellationToken.IsCancellationRequested)
249-
tcs.TrySetCanceled(cancellationToken);
250-
else
251-
tcs.TrySetException(new TimeoutException($"SendAndWaitAsync timed out after {effectiveTimeout}"));
252-
});
232+
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
233+
cts.CancelAfter(effectiveTimeout);
253234

254-
return await tcs.Task;
255-
}
256-
catch (Exception ex) when (_turnTracker is { } tracker)
235+
using var registration = cts.Token.Register(() =>
257236
{
258-
// If timeout/cancellation occurs before DispatchEvent handles the turn-ending event,
259-
// complete the telemetry span with the error (idempotent if already completed).
260-
tracker.CompleteTurnWithError(ex);
261-
throw;
262-
}
237+
if (cancellationToken.IsCancellationRequested)
238+
tcs.TrySetCanceled(cancellationToken);
239+
else
240+
tcs.TrySetException(new TimeoutException($"SendAndWaitAsync timed out after {effectiveTimeout}"));
241+
});
242+
243+
return await tcs.Task;
263244
}
264245

265246
/// <summary>
@@ -309,7 +290,7 @@ public IDisposable On(SessionEventHandler handler)
309290
/// </remarks>
310291
internal void DispatchEvent(SessionEvent sessionEvent)
311292
{
312-
_turnTracker?.ProcessEvent(sessionEvent);
293+
_telemetryTracker?.ProcessEvent(sessionEvent);
313294

314295
// Reading the field once gives us a snapshot; delegates are immutable.
315296
_eventHandlers?.Invoke(sessionEvent);
@@ -620,7 +601,7 @@ await InvokeRpcAsync<object>(
620601

621602
_eventHandlers = null;
622603
_toolHandlers.Clear();
623-
_turnTracker?.CompleteOnDispose();
604+
_telemetryTracker?.CompleteOnDispose();
624605

625606
_permissionHandler = null;
626607
}

go/client.go

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -1354,7 +1354,7 @@ func (c *Client) executeToolCall(
13541354

13551355
if c.telemetry != nil {
13561356
toolSpanCtx := context.Background()
1357-
if session.turnTracker != nil {
1357+
if session.telemetryTracker != nil {
13581358
if tCtx := session.getToolCallParentContext(toolCallID); tCtx != nil {
13591359
toolSpanCtx = tCtx
13601360
}
@@ -1380,10 +1380,10 @@ func (c *Client) executeToolCall(
13801380
providerName := otelDefaultProviderName
13811381
var serverAddress string
13821382
var serverPort int
1383-
if session.turnTracker != nil {
1384-
providerName = session.turnTracker.getProviderName()
1385-
serverAddress = session.turnTracker.getServerAddress()
1386-
serverPort = session.turnTracker.getServerPort()
1383+
if session.telemetryTracker != nil {
1384+
providerName = session.telemetryTracker.getProviderName()
1385+
serverAddress = session.telemetryTracker.getServerAddress()
1386+
serverPort = session.telemetryTracker.getServerPort()
13871387
}
13881388
ctx := spanCtx
13891389
if ctx == nil {

go/copilot_telemetry.go

Lines changed: 51 additions & 55 deletions
Original file line number | Diff line number | Diff line change
@@ -573,8 +573,6 @@ type agentTurnTracker struct {
573573
turnID *string
574574
turnInteractionID *string
575575

576-
// Stashed user prompt for the first chat turn
577-
pendingUserPrompt string
578576
}
579577

580578
func newAgentTurnTracker(telemetry *copilotTelemetry, sessionID string, model string, provider *ProviderConfig, systemMessage *SystemMessageConfig, tools []Tool, streaming bool, agentName string, agentDescription string) *agentTurnTracker {
@@ -641,45 +639,34 @@ func (t *agentTurnTracker) completeOnDispose() {
641639
}
642640
}
643641

644-
// beginSend is called at the start of Send() to start a span and record the user message.
645-
func (t *agentTurnTracker) beginSend(ctx context.Context, prompt string) {
646-
t.mu.Lock()
647-
defer t.mu.Unlock()
648-
if t.agentSpan == nil {
649-
spanCtx, span := t.telemetry.startInvokeAgentSpan(
650-
ctx,
651-
t.sessionID,
652-
t.requestModel,
653-
t.providerName,
654-
t.serverAddress,
655-
t.serverPort,
656-
t.agentName,
657-
t.agentDescription,
658-
)
659-
t.agentSpan = span
660-
t.agentSpanCtx = spanCtx
661-
t.agentStartTime = time.Now()
662-
t.agentInputMsgs = nil
663-
t.agentOutputMsgs = nil
664-
}
665-
666-
// Agent-level input = what the caller sent (all user prompts).
667-
if prompt != "" {
668-
t.agentInputMsgs = append(t.agentInputMsgs, otelMsg{
669-
Role: "user",
670-
Parts: []otelPart{{Type: "text", Content: prompt}},
671-
})
672-
}
673-
674-
// Stash user prompt for the first chat turn's input messages.
675-
t.pendingUserPrompt = prompt
676-
}
677-
678642
// processEvent handles telemetry enrichment for dispatched events.
679643
func (t *agentTurnTracker) processEvent(event SessionEvent) {
680644
t.mu.Lock()
681645
defer t.mu.Unlock()
682646

647+
// A user.message event starts a new invoke_agent span (if not already
648+
// active) and records the user prompt.
649+
if event.Type == UserMessage {
650+
prompt := ""
651+
if event.Data.Content != nil {
652+
prompt = *event.Data.Content
653+
}
654+
t.ensureAgentSpan()
655+
656+
if prompt != "" {
657+
t.agentInputMsgs = append(t.agentInputMsgs, otelMsg{
658+
Role: "user",
659+
Parts: []otelPart{{Type: "text", Content: prompt}},
660+
})
661+
t.turnInputMsgs = append(t.turnInputMsgs, otelMsg{
662+
Role: "user",
663+
Parts: []otelPart{{Type: "text", Content: prompt}},
664+
})
665+
}
666+
667+
return
668+
}
669+
683670
// Route subagent events by parentToolCallId.
684671
parentToolCallID := getParentToolCallID(event)
685672
if parentToolCallID != "" {
@@ -968,23 +955,43 @@ func (t *agentTurnTracker) processEvent(event SessionEvent) {
968955
}
969956
}
970957

971-
// completeTurnWithError completes the current turn with an error.
972-
func (t *agentTurnTracker) completeTurnWithError(err error) {
973-
t.mu.Lock()
974-
defer t.mu.Unlock()
975-
t.completeChatTurnLocked(err)
976-
t.completeAgentTurnLocked(err)
977-
}
978-
979958
// ============================================================================
980959
// Chat turn lifecycle
981960
// ============================================================================
982961

983962
// beginChatTurnLocked starts a new chat child span for an LLM turn. Caller must hold mu.
963+
// ensureAgentSpan ensures the invoke_agent span exists, creating it on demand
964+
// if needed. Called from both the user.message handler and beginChatTurnLocked
965+
// so that RPC-initiated turns (no user.message) still get an agent span.
966+
// Caller must hold mu.
967+
func (t *agentTurnTracker) ensureAgentSpan() {
968+
if t.agentSpan == nil {
969+
spanCtx, span := t.telemetry.startInvokeAgentSpan(
970+
context.Background(),
971+
t.sessionID,
972+
t.requestModel,
973+
t.providerName,
974+
t.serverAddress,
975+
t.serverPort,
976+
t.agentName,
977+
t.agentDescription,
978+
)
979+
t.agentSpan = span
980+
t.agentSpanCtx = spanCtx
981+
t.agentStartTime = time.Now()
982+
t.agentInputMsgs = nil
983+
t.agentOutputMsgs = nil
984+
}
985+
}
986+
984987
func (t *agentTurnTracker) beginChatTurnLocked() {
985988
// If there's already an active turn, complete it first.
986989
t.completeChatTurnLocked(nil)
987990

991+
// Ensure the parent agent span exists — covers RPC-initiated turns
992+
// where no user.message event preceded the assistant.turn_start.
993+
t.ensureAgentSpan()
994+
988995
t.turnResponseModel = ""
989996
t.turnResponseID = ""
990997
t.turnInputTokens = 0
@@ -993,7 +1000,6 @@ func (t *agentTurnTracker) beginChatTurnLocked() {
9931000
t.turnCacheCreationTokens = 0
9941001
t.firstOutputChunkRecorded = false
9951002
t.lastOutputChunkTime = time.Time{}
996-
t.turnInputMsgs = nil
9971003
t.turnOutputMsgs = nil
9981004
t.turnCost = nil
9991005
t.turnServerDuration = nil
@@ -1002,15 +1008,6 @@ func (t *agentTurnTracker) beginChatTurnLocked() {
10021008
t.turnID = nil
10031009
t.turnInteractionID = nil
10041010

1005-
// Add stashed user prompt as input message for the first turn.
1006-
if t.pendingUserPrompt != "" {
1007-
t.turnInputMsgs = append(t.turnInputMsgs, otelMsg{
1008-
Role: "user",
1009-
Parts: []otelPart{{Type: "text", Content: t.pendingUserPrompt}},
1010-
})
1011-
t.pendingUserPrompt = ""
1012-
}
1013-
10141011
parentCtx := t.agentSpanCtx
10151012
if parentCtx == nil {
10161013
parentCtx = context.Background()
@@ -1222,7 +1219,6 @@ func (t *agentTurnTracker) completeAgentTurnLocked(err error) {
12221219
t.agentSpan = nil
12231220
t.agentSpanCtx = nil
12241221
t.agentStartTime = time.Time{}
1225-
t.pendingUserPrompt = ""
12261222
t.agentInputMsgs = nil
12271223
t.agentOutputMsgs = nil
12281224

0 commit comments

Comments
 (0)