Skip to content

Commit 9c500a7

Browse files
committed
feat: stream-driven progress for explain --generate
Switches `entire explain --generate` from silent wait to real-time stream-driven progress. Independent of typed Claude error classification (#963). ### User-visible changes **Interactive (TTY):** live progress phases with in-place updates Generating checkpoint summary... (transcript: 47.0 KB) → Sending request to Anthropic... → Anthropic responded (TTFT 1.9s, 35.9k cached input tokens) -- generating... → Writing summary... (~1.2k tokens) ✓ Summary generated (3.1s, 1.9k output tokens) ✓ Summary generated and saved **Non-TTY (CI):** same lines, one per event (no in-place updates) **ACCESSIBLE=1:** one line per event, no ANSI escape sequences ### Architecture - `agent.StreamingTextGenerator` optional capability interface with `AsStreamingTextGenerator` helper. Plain-error contract — typed error classification will be wired separately (see #963). - Claude Code implements it via `GenerateTextStreaming` in a new file (`generate_streaming.go`), leaving the existing `GenerateText` path untouched. Falls back internally to `--output-format json` when the CLI rejects streaming flags. - `summaryProgressWriter` renders `GenerationProgress` phases using existing `statusStyles` for consistent styling with `entire status`. - TTY-gated deadline: no deadline interactive (Ctrl+C is the timeout); idle-based 5min watchdog non-TTY with a 30min wall-clock cap. - `ProgressFn` threads through `summarize.GenerateFromTranscript` and `ClaudeGenerator`; preserved as nil for the condensation path. ### Scope note This PR intentionally does not map Claude auth/rate-limit/config errors to actionable messages — that's #963's typed-error work. When #963 lands on main, a small follow-up will wire typed errors into the streaming path. Users hitting stream errors in the meantime see a generic "claude CLI reported error: <text>" (HTTP status included when present). ### Tests - Stream parser: success + error fixtures, malformed-line resilience, reader errors - GenerateTextStreaming: phase emission, envelope error surfacing, legacy fallback, context cancellation - Progress writer: all phases, Generating dedup, lastActivity tracking - Idle watchdog: stale-fire, bump-prevents-fire, zero-idle noop - Deadline resolver: interactive no-deadline vs non-interactive idle Entire-Checkpoint: 56ee3789dcea
1 parent 96867cd commit 9c500a7

15 files changed

Lines changed: 999 additions & 109 deletions

cmd/entire/cli/agent/agent.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,49 @@ type TextGenerator interface {
187187
GenerateText(ctx context.Context, prompt string, model string) (string, error)
188188
}
189189

190+
// ProgressPhase identifies a coarse stage in streaming text generation.
191+
type ProgressPhase string
192+
193+
const (
194+
// PhaseConnecting is emitted once when the CLI signals it is making the upstream request.
195+
PhaseConnecting ProgressPhase = "connecting"
196+
// PhaseFirstToken is emitted once when the upstream responds with the first event,
197+
// carrying TTFT and input/cache token counts.
198+
PhaseFirstToken ProgressPhase = "first-token"
199+
// PhaseGenerating is emitted repeatedly as text or thinking deltas arrive.
200+
// OutputTokens carries a running estimate based on delta sizes.
201+
PhaseGenerating ProgressPhase = "generating"
202+
// PhaseDone is emitted once when the final result event is received without error.
203+
PhaseDone ProgressPhase = "done"
204+
)
205+
206+
// GenerationProgress reports a snapshot of streaming text generation progress.
207+
// Fields not relevant to the current Phase may be zero-valued.
208+
type GenerationProgress struct {
209+
Phase ProgressPhase
210+
OutputTokens int // running estimate during PhaseGenerating; final at PhaseDone
211+
InputTokens int // populated at PhaseFirstToken
212+
CachedInputTokens int // populated at PhaseFirstToken
213+
TTFTms int // time-to-first-token, populated at PhaseFirstToken
214+
DurationMs int // populated at PhaseDone (final result event)
215+
}
216+
217+
// ProgressFn receives streaming progress updates. It must not block — invoke it
218+
// from the same goroutine that reads the stream and keep handlers fast.
219+
type ProgressFn func(GenerationProgress)
220+
221+
// StreamingTextGenerator is an optional interface for text generators whose
222+
// underlying CLI exposes a streaming output mode. Callers can use AsStreamingTextGenerator
223+
// to detect support and fall back to plain GenerateText when unavailable.
224+
type StreamingTextGenerator interface {
225+
Agent
226+
227+
// GenerateTextStreaming invokes the agent's streaming text generation and
228+
// calls progress for each phase update. progress may be nil to suppress
229+
// reporting. The returned string is the final response text.
230+
GenerateTextStreaming(ctx context.Context, prompt, model string, progress ProgressFn) (string, error)
231+
}
232+
190233
// HookResponseWriter is implemented by agents that support structured hook responses.
191234
// Agents that implement this can output messages (e.g., banners) to the user via
192235
// the agent's response protocol. For example, Claude Code outputs JSON with a

cmd/entire/cli/agent/capabilities.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ type DeclaredCaps struct {
2121
TranscriptPreparer bool `json:"transcript_preparer"`
2222
TokenCalculator bool `json:"token_calculator"`
2323
TextGenerator bool `json:"text_generator"`
24+
StreamingTextGenerator bool `json:"streaming_text_generator"`
2425
HookResponseWriter bool `json:"hook_response_writer"`
2526
SubagentAwareExtractor bool `json:"subagent_aware_extractor"`
2627
}
@@ -105,6 +106,22 @@ func AsTextGenerator(ag Agent) (TextGenerator, bool) { //nolint:ireturn // type-
105106
return tg, true
106107
}
107108

109+
// AsStreamingTextGenerator returns the agent as StreamingTextGenerator if it both
110+
// implements the interface and (for CapabilityDeclarer agents) has declared the capability.
111+
func AsStreamingTextGenerator(ag Agent) (StreamingTextGenerator, bool) { //nolint:ireturn // type-assertion helper must return interface
112+
if ag == nil {
113+
return nil, false
114+
}
115+
stg, ok := ag.(StreamingTextGenerator)
116+
if !ok {
117+
return nil, false
118+
}
119+
if cd, ok := ag.(CapabilityDeclarer); ok {
120+
return stg, cd.DeclaredCapabilities().StreamingTextGenerator
121+
}
122+
return stg, true
123+
}
124+
108125
// AsHookResponseWriter returns the agent as HookResponseWriter if it both
109126
// implements the interface and (for CapabilityDeclarer agents) has declared the capability.
110127
func AsHookResponseWriter(ag Agent) (HookResponseWriter, bool) { //nolint:ireturn // type-assertion helper must return interface

cmd/entire/cli/agent/capabilities_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,20 @@ func (m *mockFullAgent) CalculateTotalTokenUsage([]byte, int, string) (*TokenUsa
9494
return nil, nil //nolint:nilnil // test mock
9595
}
9696

97+
// StreamingTextGenerator
98+
func (m *mockFullAgent) GenerateTextStreaming(context.Context, string, string, ProgressFn) (string, error) {
99+
return "", nil
100+
}
101+
102+
// mockBuiltinStreamingAgent is a built-in agent that implements StreamingTextGenerator but NOT CapabilityDeclarer.
103+
type mockBuiltinStreamingAgent struct {
104+
mockBaseAgent
105+
}
106+
107+
func (m *mockBuiltinStreamingAgent) GenerateTextStreaming(context.Context, string, string, ProgressFn) (string, error) {
108+
return "", nil
109+
}
110+
97111
// mockBuiltinPromptAgent is a built-in agent that implements PromptExtractor but NOT CapabilityDeclarer.
98112
type mockBuiltinPromptAgent struct {
99113
mockBaseAgent
@@ -371,3 +385,43 @@ func TestAsPromptExtractor(t *testing.T) {
371385
}
372386
})
373387
}
388+
389+
func TestAsStreamingTextGenerator(t *testing.T) {
390+
t.Parallel()
391+
392+
t.Run("not implemented", func(t *testing.T) {
393+
t.Parallel()
394+
ag := &mockBaseAgent{}
395+
_, ok := AsStreamingTextGenerator(ag)
396+
if ok {
397+
t.Error("expected false for agent not implementing StreamingTextGenerator")
398+
}
399+
})
400+
401+
t.Run("builtin agent", func(t *testing.T) {
402+
t.Parallel()
403+
ag := &mockBuiltinStreamingAgent{}
404+
stg, ok := AsStreamingTextGenerator(ag)
405+
if !ok || stg == nil {
406+
t.Error("expected true for built-in agent implementing StreamingTextGenerator")
407+
}
408+
})
409+
410+
t.Run("declared true", func(t *testing.T) {
411+
t.Parallel()
412+
ag := &mockFullAgent{caps: DeclaredCaps{StreamingTextGenerator: true}}
413+
stg, ok := AsStreamingTextGenerator(ag)
414+
if !ok || stg == nil {
415+
t.Error("expected true when capability declared true")
416+
}
417+
})
418+
419+
t.Run("declared false", func(t *testing.T) {
420+
t.Parallel()
421+
ag := &mockFullAgent{caps: DeclaredCaps{StreamingTextGenerator: false}}
422+
_, ok := AsStreamingTextGenerator(ag)
423+
if ok {
424+
t.Error("expected false when capability declared false")
425+
}
426+
})
427+
}
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package claudecode
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"errors"
7+
"fmt"
8+
"os"
9+
"os/exec"
10+
"strings"
11+
12+
"github.com/entireio/cli/cmd/entire/cli/agent"
13+
)
14+
15+
// GenerateTextStreaming runs the Claude CLI in stream-json mode, dispatches
16+
// progress events to the optional callback, and returns the final result text.
17+
// Implements the agent.StreamingTextGenerator interface.
18+
//
19+
// If the CLI rejects the stream-json flags (older Claude CLI), this falls back
20+
// to the non-streaming GenerateText path — without progress events.
21+
func (c *ClaudeCodeAgent) GenerateTextStreaming(
22+
ctx context.Context,
23+
prompt, model string,
24+
progress agent.ProgressFn,
25+
) (string, error) {
26+
if model == "" {
27+
model = "haiku"
28+
}
29+
30+
commandRunner := c.CommandRunner
31+
if commandRunner == nil {
32+
commandRunner = exec.CommandContext
33+
}
34+
35+
cmd := commandRunner(ctx, "claude",
36+
"--print",
37+
"--output-format", "stream-json",
38+
"--verbose",
39+
"--model", model,
40+
"--setting-sources", "")
41+
42+
cmd.Dir = os.TempDir()
43+
cmd.Env = agent.StripGitEnv(os.Environ())
44+
cmd.Stdin = strings.NewReader(prompt)
45+
46+
stdout, err := cmd.StdoutPipe()
47+
if err != nil {
48+
return "", fmt.Errorf("claude stream stdout pipe: %w", err)
49+
}
50+
var stderr bytes.Buffer
51+
cmd.Stderr = &stderr
52+
53+
if err := cmd.Start(); err != nil {
54+
return "", fmt.Errorf("claude stream start: %w", err)
55+
}
56+
57+
final, parseErr := streamClaudeResponse(stdout, makeProgressDispatcher(progress))
58+
waitErr := cmd.Wait()
59+
60+
// Context errors pass through as sentinels so callers can use errors.Is.
61+
if ctx.Err() != nil {
62+
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
63+
return "", context.DeadlineExceeded
64+
}
65+
return "", context.Canceled
66+
}
67+
68+
if final != nil {
69+
if !final.IsError {
70+
if final.Result == nil {
71+
return "", errors.New("claude returned empty result")
72+
}
73+
if progress != nil {
74+
progress(agent.GenerationProgress{
75+
Phase: agent.PhaseDone,
76+
OutputTokens: outputTokensFromUsage(final.Usage),
77+
DurationMs: final.DurationMs,
78+
})
79+
}
80+
return *final.Result, nil
81+
}
82+
msg := "claude CLI reported error"
83+
if final.Result != nil && *final.Result != "" {
84+
msg = fmt.Sprintf("%s: %s", msg, *final.Result)
85+
}
86+
if final.APIErrorStatus != nil {
87+
msg = fmt.Sprintf("%s (HTTP %d)", msg, *final.APIErrorStatus)
88+
}
89+
return "", errors.New(msg)
90+
}
91+
92+
// No envelope: check if the CLI rejected streaming flags (older version).
93+
// If so, fall back to the non-streaming path.
94+
if waitErr != nil {
95+
stderrStr := stderr.String()
96+
if looksLikeUnrecognizedFlag(stderrStr) {
97+
return c.GenerateText(ctx, prompt, model)
98+
}
99+
if stderrStr != "" {
100+
return "", fmt.Errorf("claude stream failed: %s: %w", strings.TrimSpace(stderrStr), waitErr)
101+
}
102+
return "", fmt.Errorf("claude stream failed: %w", waitErr)
103+
}
104+
105+
if parseErr != nil {
106+
return "", fmt.Errorf("claude stream parse: %w", parseErr)
107+
}
108+
return "", errors.New("claude exited without producing a result")
109+
}
110+
111+
// makeProgressDispatcher returns a per-event handler that translates raw
112+
// stream events into agent.GenerationProgress callbacks. PhaseDone is
113+
// emitted by GenerateTextStreaming after cmd.Wait, because it needs data
114+
// from the parsed final envelope (OutputTokens, DurationMs).
115+
func makeProgressDispatcher(progress agent.ProgressFn) func(StreamEvent) {
116+
if progress == nil {
117+
return func(StreamEvent) {} // no-op: drain events
118+
}
119+
var outputTokensEstimate int
120+
return func(ev StreamEvent) {
121+
switch {
122+
case ev.Type == "system" && ev.Subtype == "status" && ev.Status == "requesting":
123+
progress(agent.GenerationProgress{Phase: agent.PhaseConnecting})
124+
case ev.Type == "stream_event" && ev.Event.Type == "message_start":
125+
p := agent.GenerationProgress{Phase: agent.PhaseFirstToken, TTFTms: ev.TTFTms}
126+
if ev.Event.Message != nil && ev.Event.Message.Usage != nil {
127+
p.InputTokens = ev.Event.Message.Usage.InputTokens
128+
p.CachedInputTokens = ev.Event.Message.Usage.CacheReadInputTokens
129+
}
130+
progress(p)
131+
case ev.Type == "stream_event" && ev.Event.Type == "content_block_delta" && ev.Event.Delta != nil:
132+
text := ev.Event.Delta.Text
133+
if text == "" {
134+
text = ev.Event.Delta.Thinking
135+
}
136+
outputTokensEstimate += len(text) / 4 // rough estimate: ~4 chars/token
137+
progress(agent.GenerationProgress{Phase: agent.PhaseGenerating, OutputTokens: outputTokensEstimate})
138+
}
139+
}
140+
}
141+
142+
func outputTokensFromUsage(u *messageUsage) int {
143+
if u == nil {
144+
return 0
145+
}
146+
return u.OutputTokens
147+
}
148+
149+
// looksLikeUnrecognizedFlag returns true if stderr indicates the CLI
150+
// rejected one of the streaming-specific flags (older Claude CLI that
151+
// doesn't support stream-json or --verbose). Requires both a rejection
152+
// phrase AND a streaming flag name to avoid false-positives on unrelated
153+
// errors that happen to contain "unknown option".
154+
func looksLikeUnrecognizedFlag(stderr string) bool {
155+
lower := strings.ToLower(stderr)
156+
hasRejectPhrase := strings.Contains(lower, "unrecognized option") ||
157+
strings.Contains(lower, "unknown flag") ||
158+
strings.Contains(lower, "unknown option") ||
159+
strings.Contains(lower, "invalid option")
160+
if !hasRejectPhrase {
161+
return false
162+
}
163+
return strings.Contains(lower, "stream-json") ||
164+
strings.Contains(lower, "verbose") ||
165+
strings.Contains(lower, "include-partial")
166+
}

0 commit comments

Comments
 (0)