Skip to content

Commit 93ee0a7

Browse files
committed
Refactor LitAgentRunnerTests to support configurable chat client and options
- Updated CreateAgent method to accept optional chatClient, options, and loggerFactory parameters. - Modified test methods to utilize enqueued rollouts and validate spans. - Added a new test for retrying failed rollouts with a flaky chat client. Enhance InMemoryLightningStoreTests with additional functionality - Added tests for StartRollout, StartAttempt, WaitForRollouts, and GetNextSpanSequenceId methods. - Improved QueryRollouts to filter by status. Implement TraceToTripletAdapter for converting spans to triplet trajectories - Created TracerTraceToTripletAdapter class to handle span conversion. - Added logic for reward matching policies and deduplication of LLM calls. Add unit tests for TracerTraceToTripletAdapter - Implemented tests to validate token and reward projection, deduplication, and reward policies. Introduce FlakyChatClient for simulating intermittent failures in tests - Created FlakyChatClient to simulate failures before returning a successful response.
1 parent b0bf425 commit 93ee0a7

File tree

10 files changed

+1989
-125
lines changed

10 files changed

+1989
-125
lines changed

MIGRATION_PLAN.md

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,18 @@ This plan tracks parity work between `external/microsoft-agent-lightning` (Pytho
2626

2727
| Adapter | Python Source | Status | Notes |
2828
| --- | --- | --- | --- |
29-
| Trace → messages | `adapter/messages.py` | 🚧 | Convert spans to chat history for `IChatClient` |
30-
| Trace → triplets | `adapter/triplet.py` | 🚧 | Build reward-aware triplets from spans |
29+
| Trace → messages | `adapter/messages.py` | | `TraceToMessagesAdapter` translates GenAI spans into OpenAI chat payloads |
30+
| Trace → triplets | `adapter/triplet.py` | | `TracerTraceToTripletAdapter` exports triplets with reward policies |
3131
| OTEL trace adapter | `adapter/base.py` | 🚧 | Hook Activity -> SpanModel bridging |
3232

3333
## Execution & Store Layers
3434

3535
| Component | Python Source | Status | Notes |
3636
| --- | --- | --- | --- |
37-
| LightningStore (async) | `store/base.py` | 🚧 | Async contract mirroring enqueue/start/query APIs |
38-
| In-memory store | `store/memory.py` | 🚧 | Production-grade concurrency port |
37+
| LightningStore (async) | `store/base.py` | | `ILightningStore` exposes start/enqueue/start-attempt, span sequencing, and wait semantics |
38+
| In-memory store | `store/memory.py` | | Expanded store handles attempts, spans, resources, and polling waits with thread-safe state |
3939
| Client/server bridge | `store/client_server.py` || Decide ASP.NET hosting approach |
40-
| Runner execution strategies | `execution/*` | 🚧 | Channel/task-based equivalents |
40+
| Runner execution strategies | `execution/*` | 🚧 | C# runner tracks worker ids, retries via store requeue; parallel orchestration still pending |
4141

4242
## Algorithms & Training Pipelines
4343

@@ -97,11 +97,10 @@ This plan tracks parity work between `external/microsoft-agent-lightning` (Pytho
9797

9898
## Near-Term Priorities
9999

100-
1. Port trace adapters (messages, triplets) to convert spans into actionable payloads.
101-
2. Implement store abstractions (interface + in-memory implementation) to unblock runner tests.
102-
3. Port runner orchestration and cover end-to-end execution (agent + store + adapters).
103-
4. Reproduce key Python fixtures/tests for adapters and store logic.
104-
5. Plan hosting story for LLM proxy / legacy endpoints (document decisions).
100+
1. Expand runner execution strategies (parallel workers, retries, resource coordination).
101+
2. Reproduce key Python fixtures/tests for adapters, store logic, and integration flows.
102+
3. Plan hosting story for LLM proxy / legacy endpoints and capture decisions.
103+
4. Map persistence backends beyond in-memory (client/server bridge, durable stores).
105104

106105
## Tracking Guidance
107106

src/ManagedCode.AgentLightning.AgentRuntime/Runner/LitAgentRunner.cs

Lines changed: 69 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
14
using ManagedCode.AgentLightning.Core.Adapters;
25
using ManagedCode.AgentLightning.Core.Models;
36
using ManagedCode.AgentLightning.Core.Stores;
@@ -15,6 +18,7 @@ public sealed class LitAgentRunner
1518
private readonly ILightningStore _store;
1619
private readonly ILogger<LitAgentRunner> _logger;
1720
private readonly TimeProvider _timeProvider;
21+
private string? _workerId;
1822

1923
public LitAgentRunner(
2024
LightningAgent agent,
@@ -37,7 +41,7 @@ public async Task<IReadOnlyList<LightningExecutionResult>> RunBatchAsync(int exp
3741

3842
var results = new List<LightningExecutionResult>(expectedRollouts);
3943

40-
for (var i = 0; i < expectedRollouts; i++)
44+
while (results.Count < expectedRollouts && !cancellationToken.IsCancellationRequested)
4145
{
4246
var attempted = await _store.DequeueRolloutAsync(cancellationToken).ConfigureAwait(false);
4347
if (attempted is null)
@@ -47,29 +51,87 @@ public async Task<IReadOnlyList<LightningExecutionResult>> RunBatchAsync(int exp
4751

4852
try
4953
{
54+
var workerId = GetWorkerId();
55+
attempted.Attempt.AttachWorker(workerId);
56+
attempted.Attempt.UpdateStatus(AttemptStatus.Running);
57+
attempted.Attempt.Touch(_timeProvider.GetUtcNow());
58+
await _store.UpdateAttemptAsync(attempted.Attempt, cancellationToken).ConfigureAwait(false);
59+
5060
var execution = await _agent.ExecuteAsync(attempted.Rollout.Input, cancellationToken).ConfigureAwait(false);
5161
results.Add(execution);
5262

5363
attempted.Attempt.UpdateStatus(execution.Attempt.Status, execution.Attempt.EndTime ?? _timeProvider.GetUtcNow());
5464
await _store.UpdateAttemptAsync(attempted.Attempt, cancellationToken).ConfigureAwait(false);
55-
await _store.UpdateRolloutStatusAsync(attempted.Rollout.RolloutId, execution.Rollout.Status, execution.Rollout.EndTime, cancellationToken).ConfigureAwait(false);
5665

57-
var span = BuildSpanFromResult(attempted, execution);
58-
await _store.AddSpanAsync(attempted.Rollout.RolloutId, attempted.Attempt.AttemptId, span, cancellationToken).ConfigureAwait(false);
66+
if (execution.Attempt.Status == AttemptStatus.Succeeded)
67+
{
68+
await _store.UpdateRolloutStatusAsync(
69+
attempted.Rollout.RolloutId,
70+
execution.Rollout.Status,
71+
execution.Rollout.EndTime ?? _timeProvider.GetUtcNow(),
72+
cancellationToken).ConfigureAwait(false);
73+
74+
var sequenceId = await _store.GetNextSpanSequenceIdAsync(
75+
attempted.Rollout.RolloutId,
76+
attempted.Attempt.AttemptId,
77+
cancellationToken).ConfigureAwait(false);
78+
79+
var span = BuildSpanFromResult(attempted, execution, sequenceId);
80+
await _store.AddSpanAsync(span, cancellationToken).ConfigureAwait(false);
81+
}
82+
else
83+
{
84+
await HandleAttemptFailureAsync(attempted, cancellationToken).ConfigureAwait(false);
85+
}
5986
}
6087
catch (Exception ex) when (!cancellationToken.IsCancellationRequested)
6188
{
6289
attempted.Attempt.UpdateStatus(AttemptStatus.Failed, _timeProvider.GetUtcNow());
6390
await _store.UpdateAttemptAsync(attempted.Attempt, cancellationToken).ConfigureAwait(false);
64-
await _store.UpdateRolloutStatusAsync(attempted.Rollout.RolloutId, RolloutStatus.Failed, _timeProvider.GetUtcNow(), cancellationToken).ConfigureAwait(false);
91+
await HandleAttemptFailureAsync(attempted, cancellationToken).ConfigureAwait(false);
6592
_logger.LogError(ex, "Runner failed while executing rollout {RolloutId}.", attempted.Rollout.RolloutId);
6693
}
6794
}
6895

6996
return results;
7097
}
7198

72-
private static SpanModel BuildSpanFromResult(AttemptedRollout attempted, LightningExecutionResult execution)
99+
private string GetWorkerId() =>
100+
_workerId ??= $"runner-{Environment.CurrentManagedThreadId}";
101+
102+
private async Task HandleAttemptFailureAsync(AttemptedRollout attempted, CancellationToken cancellationToken)
103+
{
104+
var shouldRetry = ShouldRetry(attempted.Rollout, attempted.Attempt);
105+
var status = shouldRetry ? RolloutStatus.Requeuing : RolloutStatus.Failed;
106+
var endTime = shouldRetry ? (DateTimeOffset?)null : _timeProvider.GetUtcNow();
107+
108+
await _store.UpdateRolloutStatusAsync(
109+
attempted.Rollout.RolloutId,
110+
status,
111+
endTime,
112+
cancellationToken).ConfigureAwait(false);
113+
114+
if (shouldRetry)
115+
{
116+
_logger.LogInformation("Rollout {RolloutId} requeued after attempt {AttemptId} failed.", attempted.Rollout.RolloutId, attempted.Attempt.AttemptId);
117+
}
118+
else
119+
{
120+
_logger.LogWarning("Rollout {RolloutId} failed after attempt {AttemptId}.", attempted.Rollout.RolloutId, attempted.Attempt.AttemptId);
121+
}
122+
}
123+
124+
private static bool ShouldRetry(Rollout rollout, Attempt attempt)
125+
{
126+
if (attempt.SequenceId >= rollout.Config.MaxAttempts)
127+
{
128+
return false;
129+
}
130+
131+
return rollout.Config.RetryOn.Contains(attempt.Status);
132+
}
133+
134+
private static SpanModel BuildSpanFromResult(AttemptedRollout attempted, LightningExecutionResult execution, int sequenceId)
73135
{
74136
var attributes = new Dictionary<string, object?>(StringComparer.Ordinal);
75137
if (execution.Triplet.Prompt is IEnumerable<object?> prompts)
@@ -103,7 +165,7 @@ private static SpanModel BuildSpanFromResult(AttemptedRollout attempted, Lightni
103165
attributes,
104166
rolloutId: attempted.Rollout.RolloutId,
105167
attemptId: attempted.Attempt.AttemptId,
106-
sequenceId: attempted.Attempt.SequenceId,
168+
sequenceId: sequenceId,
107169
name: "agentlightning.completion",
108170
startTime: attempted.Attempt.StartTime.ToUnixTimeSeconds(),
109171
endTime: attempted.Attempt.EndTime?.ToUnixTimeSeconds());

src/ManagedCode.AgentLightning.AgentRuntime/Trainer/Trainer.cs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,12 @@ public async Task<IReadOnlyList<LightningExecutionResult>> TrainAsync(IEnumerabl
4040
return Array.Empty<LightningExecutionResult>();
4141
}
4242

43-
var now = _timeProvider.GetUtcNow();
4443
foreach (var (payload, index) in taskList.Select((task, index) => (task, index)))
4544
{
46-
var rollout = new Rollout(
47-
rolloutId: $"rollout-{now.ToUnixTimeMilliseconds()}-{index}",
48-
input: payload,
49-
startTime: now,
50-
config: new RolloutConfig().Normalize());
51-
52-
await _store.EnqueueRolloutAsync(rollout, cancellationToken).ConfigureAwait(false);
45+
await _store.EnqueueRolloutAsync(
46+
payload,
47+
config: new RolloutConfig(),
48+
cancellationToken: cancellationToken).ConfigureAwait(false);
5349
}
5450

5551
_logger.LogInformation("Enqueued {Count} rollouts for training.", taskList.Count);

0 commit comments

Comments
 (0)