Skip to content

Commit df35d11

Browse files
committed
feat: add one-per-thread trace listing
1 parent 9026c9c commit df35d11

4 files changed

Lines changed: 310 additions & 1 deletion

File tree

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,9 @@ langsmith trace list --project my-app --full # All fields (metadat
137137
# Show trace hierarchy (fetches full run tree for each trace)
138138
langsmith trace list --project my-app --show-hierarchy --limit 3
139139

140+
# Collapse traces by thread_id (unthreaded traces are still listed individually)
141+
langsmith trace list --project my-app --one-per-thread
142+
140143
# Get a specific trace
141144
langsmith trace get <trace-id> --project my-app --full
142145

internal/cmd/helpers.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,59 @@ func queryRuns(ctx context.Context, c *client.Client, params langsmith.RunQueryP
5656
return allRuns, nil
5757
}
5858

59+
// queryRunsOnePerThread queries runs like queryRuns but collapses runs that
60+
// share a non-empty thread_id, keeping only the first-seen run per thread.
61+
// Because runs are returned newest-first, the first-seen run for a thread is
62+
// the newest one. Runs with an empty thread_id are kept individually.
63+
//
64+
// min-tokens is applied before de-duping, and duplicate runs do not consume the
65+
// output limit, so pagination continues until enough unique rows are collected
66+
// or the API has no further pages.
67+
func queryRunsOnePerThread(ctx context.Context, c *client.Client, params langsmith.RunQueryParams, projectName string, limit int, minTokens int) ([]langsmith.RunSchema, error) {
68+
if projectName != "" {
69+
sessionID, err := c.ResolveSessionID(ctx, projectName)
70+
if err != nil {
71+
return nil, err
72+
}
73+
params.Session = langsmith.F([]string{sessionID})
74+
}
75+
76+
var result []langsmith.RunSchema
77+
seenThreads := make(map[string]bool)
78+
79+
for {
80+
resp, err := c.SDK.Runs.Query(ctx, params)
81+
if err != nil {
82+
return nil, fmt.Errorf("querying runs: %w", err)
83+
}
84+
85+
for _, run := range resp.Runs {
86+
if len(result) >= limit {
87+
return result, nil
88+
}
89+
// Client-side token filter, applied before de-duping.
90+
if minTokens > 0 && run.TotalTokens < int64(minTokens) {
91+
continue
92+
}
93+
if tid := run.ThreadID; tid != "" {
94+
if seenThreads[tid] {
95+
continue
96+
}
97+
seenThreads[tid] = true
98+
}
99+
result = append(result, run)
100+
}
101+
102+
// Keep paginating until the limit is met or there are no more pages.
103+
if resp.Cursors == nil || resp.Cursors["next"] == "" || len(result) >= limit {
104+
break
105+
}
106+
params.Cursor = langsmith.F(resp.Cursors["next"])
107+
}
108+
109+
return result, nil
110+
}
111+
59112
// buildRunSelect returns the Select fields needed for the given include flags.
60113
// Returns nil when neither IO nor feedback is requested, letting the API use its defaults.
61114
// When set, includes all base/metadata fields so they aren't stripped from the response.

internal/cmd/trace.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func newTraceListCmd() *cobra.Command {
4747
includeFlagged bool
4848
full bool
4949
showHierarchy bool
50+
onePerThread bool
5051
outputFile string
5152
)
5253

@@ -74,9 +75,20 @@ func newTraceListCmd() *cobra.Command {
7475

7576
params := BuildRunQueryParams(&ff, true, ff.Limit)
7677
if sel := buildRunSelect(includeIO, includeFeedback); sel != nil {
78+
// When de-duping by thread, thread_id must be selected so it
79+
// is present on the returned runs.
80+
if onePerThread {
81+
sel = append(sel, langsmith.RunQueryParamsSelectThreadID)
82+
}
7783
params.Select = langsmith.F(sel)
7884
}
79-
runs, err := queryRuns(ctx, c, params, projectName, ff.Limit, ff.MinTokens)
85+
var runs []langsmith.RunSchema
86+
var err error
87+
if onePerThread {
88+
runs, err = queryRunsOnePerThread(ctx, c, params, projectName, ff.Limit, ff.MinTokens)
89+
} else {
90+
runs, err = queryRuns(ctx, c, params, projectName, ff.Limit, ff.MinTokens)
91+
}
8092
if err != nil {
8193
ExitErrorf("%v", err)
8294
}
@@ -152,6 +164,7 @@ func newTraceListCmd() *cobra.Command {
152164
cmd.Flags().BoolVar(&includeFlagged, "include-flagged", false, "Add flagged_comment field populated from user-flagged trace feedback")
153165
cmd.Flags().BoolVar(&full, "full", false, "Shorthand for --include-metadata --include-io --include-feedback")
154166
cmd.Flags().BoolVar(&showHierarchy, "show-hierarchy", false, "Fetch the full run tree for each trace")
167+
cmd.Flags().BoolVar(&onePerThread, "one-per-thread", false, "Collapse traces sharing a thread_id to the newest one per thread (traces without a thread_id are kept individually)")
155168
cmd.Flags().StringVarP(&outputFile, "output", "o", "", "Write JSON output to a file")
156169

157170
return cmd

internal/cmd/trace_test.go

Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package cmd
22

33
import (
4+
"encoding/json"
5+
"net/http"
6+
"net/http/httptest"
47
"testing"
58
)
69

@@ -41,6 +44,7 @@ func TestTraceListCmd_Flags(t *testing.T) {
4144
{"include-io", "false", ""},
4245
{"full", "false", ""},
4346
{"show-hierarchy", "false", ""},
47+
{"one-per-thread", "false", ""},
4448
{"output", "", "o"},
4549
}
4650
for _, tc := range tests {
@@ -76,6 +80,242 @@ func TestTraceListCmd_NoRunTypeFlag(t *testing.T) {
7680
}
7781
}
7882

83+
// traceListPage is one canned response served by the fake runs-query endpoint
// that newTraceListPagingServer builds.
type traceListPage struct {
	// runs is encoded as the "runs" array of the response.
	runs []map[string]any
	// nextCursor is returned as the "next" cursor when non-empty; an empty
	// value means the response carries no next cursor.
	nextCursor string
}
87+
88+
func newTraceListPagingServer(t *testing.T, pages []traceListPage, requests *[]map[string]any) *httptest.Server {
89+
t.Helper()
90+
91+
return newTestServer(t, func(w http.ResponseWriter, r *http.Request) {
92+
w.Header().Set("Content-Type", "application/json")
93+
switch {
94+
case r.URL.Path == "/api/v1/sessions" && r.Method == "GET":
95+
name := r.URL.Query().Get("name")
96+
if name != "my-app" {
97+
http.Error(w, "not found", 404)
98+
return
99+
}
100+
_ = json.NewEncoder(w).Encode([]map[string]any{{"id": "session-123", "name": "my-app"}})
101+
case r.URL.Path == "/api/v1/runs/query" && r.Method == "POST":
102+
var body map[string]any
103+
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
104+
t.Errorf("decoding request body: %v", err)
105+
http.Error(w, "bad request", 400)
106+
return
107+
}
108+
*requests = append(*requests, body)
109+
110+
pageIdx := len(*requests) - 1
111+
if pageIdx >= len(pages) {
112+
http.Error(w, "unexpected page", 500)
113+
return
114+
}
115+
116+
cursors := map[string]string{}
117+
if pages[pageIdx].nextCursor != "" {
118+
cursors["next"] = pages[pageIdx].nextCursor
119+
}
120+
_ = json.NewEncoder(w).Encode(map[string]any{
121+
"runs": pages[pageIdx].runs,
122+
"cursors": cursors,
123+
})
124+
default:
125+
t.Errorf("unexpected request: %s %s", r.Method, r.URL.Path)
126+
http.Error(w, "not found", 404)
127+
}
128+
})
129+
}
130+
131+
func runTraceListJSON(t *testing.T, serverURL string, args ...string) []map[string]any {
132+
t.Helper()
133+
134+
cleanup := setupTestEnv(t, serverURL)
135+
defer cleanup()
136+
flagOutputFormat = "json"
137+
138+
cmd := newTraceListCmd()
139+
cmd.SetArgs(append([]string{"--project", "my-app"}, args...))
140+
141+
out := captureStdout(t, func() {
142+
if err := cmd.Execute(); err != nil {
143+
t.Fatalf("trace list failed: %v", err)
144+
}
145+
})
146+
147+
var result []map[string]any
148+
if err := json.Unmarshal([]byte(out), &result); err != nil {
149+
t.Fatalf("output is not valid JSON: %v\noutput: %s", err, out)
150+
}
151+
return result
152+
}
153+
154+
// traceRun builds the JSON-shaped fixture for a single run of type "chain".
// The run's name is set to its id. The "thread_id" key is only present when
// threadID is non-empty, so unthreaded runs omit it entirely.
func traceRun(id, traceID, threadID, startTime string) map[string]any {
	fields := make(map[string]any, 6)
	fields["id"] = id
	fields["trace_id"] = traceID
	fields["name"] = id
	fields["run_type"] = "chain"
	fields["start_time"] = startTime

	if threadID == "" {
		return fields
	}
	fields["thread_id"] = threadID
	return fields
}
167+
168+
func traceRunWithTokens(id, traceID, threadID, startTime string, totalTokens int) map[string]any {
169+
run := traceRun(id, traceID, threadID, startTime)
170+
run["total_tokens"] = totalTokens
171+
return run
172+
}
173+
174+
func TestTraceListCmd_OnePerThreadFlagDefault(t *testing.T) {
175+
cmd := newTraceListCmd()
176+
f := cmd.Flags().Lookup("one-per-thread")
177+
if f == nil {
178+
t.Fatal("--one-per-thread flag not found")
179+
}
180+
if f.DefValue != "false" {
181+
t.Errorf("expected --one-per-thread default false, got %q", f.DefValue)
182+
}
183+
if f.Shorthand != "" {
184+
t.Errorf("expected --one-per-thread to have no shorthand, got %q", f.Shorthand)
185+
}
186+
}
187+
188+
func TestTraceListCmd_WithoutOnePerThreadKeepsDuplicateThreads(t *testing.T) {
189+
var requests []map[string]any
190+
ts := newTraceListPagingServer(t, []traceListPage{{
191+
runs: []map[string]any{
192+
traceRun("run-a-new", "trace-a-new", "thread-a", "2026-01-03T00:00:00Z"),
193+
traceRun("run-a-old", "trace-a-old", "thread-a", "2026-01-02T00:00:00Z"),
194+
traceRun("run-b", "trace-b", "thread-b", "2026-01-01T00:00:00Z"),
195+
},
196+
}}, &requests)
197+
198+
result := runTraceListJSON(t, ts.URL, "--limit", "3")
199+
200+
if len(result) != 3 {
201+
t.Fatalf("expected 3 traces, got %d: %#v", len(result), result)
202+
}
203+
if result[0]["trace_id"] != "trace-a-new" || result[1]["trace_id"] != "trace-a-old" {
204+
t.Fatalf("expected duplicate thread traces to remain, got %#v", result)
205+
}
206+
}
207+
208+
func TestTraceListCmd_OnePerThreadCollapsesDuplicateThreadsAndKeepsUnthreaded(t *testing.T) {
209+
var requests []map[string]any
210+
ts := newTraceListPagingServer(t, []traceListPage{{
211+
runs: []map[string]any{
212+
traceRun("run-a-new", "trace-a-new", "thread-a", "2026-01-04T00:00:00Z"),
213+
traceRun("run-a-old", "trace-a-old", "thread-a", "2026-01-03T00:00:00Z"),
214+
traceRun("run-unthreaded-1", "trace-unthreaded-1", "", "2026-01-02T00:00:00Z"),
215+
traceRun("run-unthreaded-2", "trace-unthreaded-2", "", "2026-01-01T00:00:00Z"),
216+
},
217+
}}, &requests)
218+
219+
result := runTraceListJSON(t, ts.URL, "--one-per-thread", "--limit", "4")
220+
221+
want := []any{"trace-a-new", "trace-unthreaded-1", "trace-unthreaded-2"}
222+
if len(result) != len(want) {
223+
t.Fatalf("expected %d traces, got %d: %#v", len(want), len(result), result)
224+
}
225+
for i := range want {
226+
if got := result[i]["trace_id"]; got != want[i] {
227+
t.Fatalf("trace %d: expected %v, got %v; all results: %#v", i, want[i], got, result)
228+
}
229+
}
230+
}
231+
232+
func TestTraceListCmd_OnePerThreadPaginatesPastDuplicatesToFillLimit(t *testing.T) {
233+
var requests []map[string]any
234+
ts := newTraceListPagingServer(t, []traceListPage{
235+
{
236+
runs: []map[string]any{
237+
traceRun("run-a-new", "trace-a-new", "thread-a", "2026-01-05T00:00:00Z"),
238+
traceRun("run-a-old-1", "trace-a-old-1", "thread-a", "2026-01-04T00:00:00Z"),
239+
traceRun("run-a-old-2", "trace-a-old-2", "thread-a", "2026-01-03T00:00:00Z"),
240+
},
241+
nextCursor: "page-2",
242+
},
243+
{
244+
runs: []map[string]any{
245+
traceRun("run-a-old-3", "trace-a-old-3", "thread-a", "2026-01-02T00:00:00Z"),
246+
traceRun("run-unthreaded", "trace-unthreaded", "", "2026-01-01T12:00:00Z"),
247+
traceRun("run-b", "trace-b", "thread-b", "2026-01-01T00:00:00Z"),
248+
},
249+
},
250+
}, &requests)
251+
252+
result := runTraceListJSON(t, ts.URL, "--one-per-thread", "--limit", "3")
253+
254+
if len(result) != 3 {
255+
t.Fatalf("expected 3 traces, got %d: %#v", len(result), result)
256+
}
257+
if len(requests) != 2 {
258+
t.Fatalf("expected 2 run query requests, got %d", len(requests))
259+
}
260+
if requests[1]["cursor"] != "page-2" {
261+
t.Fatalf("expected second request cursor page-2, got %#v", requests[1]["cursor"])
262+
}
263+
wantTraceIDs := []string{"trace-a-new", "trace-unthreaded", "trace-b"}
264+
for i, want := range wantTraceIDs {
265+
if result[i]["trace_id"] != want {
266+
t.Fatalf("trace %d: expected %s, got %v; all results: %#v", i, want, result[i]["trace_id"], result)
267+
}
268+
}
269+
}
270+
271+
func TestTraceListCmd_OnePerThreadAppliesMinTokensBeforeDedupe(t *testing.T) {
272+
var requests []map[string]any
273+
ts := newTraceListPagingServer(t, []traceListPage{{
274+
runs: []map[string]any{
275+
traceRunWithTokens("run-a-low", "trace-a-low", "thread-a", "2026-01-03T00:00:00Z", 10),
276+
traceRunWithTokens("run-a-high", "trace-a-high", "thread-a", "2026-01-02T00:00:00Z", 200),
277+
traceRunWithTokens("run-b", "trace-b", "thread-b", "2026-01-01T00:00:00Z", 200),
278+
},
279+
}}, &requests)
280+
281+
result := runTraceListJSON(t, ts.URL, "--one-per-thread", "--min-tokens", "100", "--limit", "2")
282+
283+
if len(result) != 2 {
284+
t.Fatalf("expected 2 traces, got %d: %#v", len(result), result)
285+
}
286+
if result[0]["trace_id"] != "trace-a-high" {
287+
t.Fatalf("expected high-token trace for thread-a after low-token trace was filtered, got %#v", result)
288+
}
289+
if result[1]["trace_id"] != "trace-b" {
290+
t.Fatalf("expected trace-b second, got %#v", result)
291+
}
292+
}
293+
294+
func TestTraceListCmd_OnePerThreadIncludeIOSelectsThreadID(t *testing.T) {
295+
var requests []map[string]any
296+
ts := newTraceListPagingServer(t, []traceListPage{{
297+
runs: []map[string]any{
298+
traceRun("run-a", "trace-a", "thread-a", "2026-01-01T00:00:00Z"),
299+
},
300+
}}, &requests)
301+
302+
_ = runTraceListJSON(t, ts.URL, "--one-per-thread", "--include-io", "--limit", "1")
303+
304+
if len(requests) != 1 {
305+
t.Fatalf("expected 1 run query request, got %d", len(requests))
306+
}
307+
selects, ok := requests[0]["select"].([]any)
308+
if !ok {
309+
t.Fatalf("expected select array in request, got %#v", requests[0]["select"])
310+
}
311+
for _, sel := range selects {
312+
if sel == "thread_id" {
313+
return
314+
}
315+
}
316+
t.Fatalf("expected select to include thread_id, got %#v", selects)
317+
}
318+
79319
// ==================== trace get flags ====================
80320

81321
func TestTraceGetCmd_Flags(t *testing.T) {

0 commit comments

Comments
 (0)