Skip to content

Commit c704dd8

Browse files
refactor: address PR review — simplify streaming, remove apiUrl, extract stepStreamer
- Remove apiUrl from StreamLogs interface/Config — client uses internal baseUrl - Extract inline closures into stepStreamer struct (step_streamer.go) - Simplify drain: cancel() + wg.Wait() instead of timeout race - Use defer streamer.drain() for cleanup safety - Rename waitForStageWithStepCallback → waitForStage (no callback indirection) - Replace fmt with log framework in pipeline.go - Only stream steps with state 'running' (not already-completed ones) - Regenerate mocks Signed-off-by: Alex <132889147+alexvcodesphere@users.noreply.github.com>
1 parent 343cc20 commit c704dd8

9 files changed

Lines changed: 139 additions & 139 deletions

File tree

api/client.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,10 @@ import (
1313
)
1414

1515
type Client struct {
16-
ctx context.Context
17-
api *openapi_client.APIClient
18-
time Time
16+
ctx context.Context
17+
api *openapi_client.APIClient
18+
time Time
19+
baseUrl *url.URL
1920
}
2021

2122
type Configuration struct {
@@ -40,9 +41,10 @@ func (c Configuration) GetApiUrl() *url.URL {
4041
// For use in tests
4142
func NewClientWithCustomDeps(ctx context.Context, opts Configuration, api *openapi_client.APIClient, time Time) *Client {
4243
return &Client{
43-
ctx: context.WithValue(ctx, openapi_client.ContextAccessToken, opts.Token),
44-
api: api,
45-
time: time,
44+
ctx: context.WithValue(ctx, openapi_client.ContextAccessToken, opts.Token),
45+
api: api,
46+
time: time,
47+
baseUrl: opts.GetApiUrl(),
4648
}
4749
}
4850

api/workspace.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,8 @@ type logEntry struct {
229229
// log entries to the provided writer until the context is cancelled or the
230230
// stream ends. This is used during pipeline execution to provide real-time
231231
// log output.
232-
func (c *Client) StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error {
233-
endpoint := fmt.Sprintf("%s/workspaces/%d/logs/%s/%d", apiUrl, wsId, stage, step)
232+
func (c *Client) StreamLogs(ctx context.Context, wsId int, stage string, step int, w io.Writer) error {
233+
endpoint := fmt.Sprintf("%s/workspaces/%d/logs/%s/%d", c.baseUrl.String(), wsId, stage, step)
234234

235235
req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil)
236236
if err != nil {

cli/cmd/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ type Client interface {
3232
GetPipelineState(wsId int, stage string) ([]api.PipelineStatus, error)
3333
GitPull(wsId int, remote string, branch string) error
3434
DeployLandscape(wsId int, profile string) error
35-
StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error
35+
StreamLogs(ctx context.Context, wsId int, stage string, step int, w io.Writer) error
3636
}
3737

3838
// CommandExecutor abstracts command execution for testing

cli/cmd/mocks.go

Lines changed: 16 additions & 22 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cli/cmd/start_pipeline.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func (c *StartPipelineCmd) RunE(_ *cobra.Command, args []string) error {
3838
return fmt.Errorf("failed to create Codesphere client: %w", err)
3939
}
4040

41-
return c.StartPipelineStages(client, workspaceId, args, c.Opts.GetApiUrl())
41+
return c.StartPipelineStages(client, workspaceId, args)
4242
}
4343

4444
func AddStartPipelineCmd(start *cobra.Command, opts GlobalOptions) {
@@ -76,15 +76,10 @@ func AddStartPipelineCmd(start *cobra.Command, opts GlobalOptions) {
7676
p.cmd.RunE = p.RunE
7777
}
7878

79-
func (c *StartPipelineCmd) StartPipelineStages(client Client, wsId int, stages []string, apiUrl ...string) error {
80-
url := ""
81-
if len(apiUrl) > 0 {
82-
url = apiUrl[0]
83-
}
79+
func (c *StartPipelineCmd) StartPipelineStages(client Client, wsId int, stages []string) error {
8480
runner := pipeline.NewRunner(client, c.Time)
8581
return runner.RunStages(wsId, stages, pipeline.Config{
8682
Profile: *c.Opts.Profile,
8783
Timeout: *c.Opts.Timeout,
88-
ApiUrl: url,
8984
})
9085
}

pkg/pipeline/mocks.go

Lines changed: 16 additions & 22 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/pipeline/pipeline.go

Lines changed: 13 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@ import (
1010
"fmt"
1111
"io"
1212
"log"
13-
"os"
1413
"slices"
15-
"sync"
1614
"time"
1715

1816
"github.com/codesphere-cloud/cs-go/api"
@@ -25,14 +23,13 @@ type Client interface {
2523
StartPipelineStage(wsId int, profile string, stage string) error
2624
GetPipelineState(wsId int, stage string) ([]api.PipelineStatus, error)
2725
DeployLandscape(wsId int, profile string) error
28-
StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error
26+
StreamLogs(ctx context.Context, wsId int, stage string, step int, w io.Writer) error
2927
}
3028

3129
// Config holds parameters for pipeline execution.
3230
type Config struct {
3331
Profile string
3432
Timeout time.Duration
35-
ApiUrl string
3633
}
3734

3835
// Runner orchestrates pipeline stage execution.
@@ -61,11 +58,11 @@ func (r *Runner) RunStages(wsId int, stages []string, cfg Config) error {
6158
for _, stage := range stages {
6259
// Sync the landscape before the run stage
6360
if stage == "run" {
64-
fmt.Println(" 🔄 Syncing landscape...")
61+
log.Println(" 🔄 Syncing landscape...")
6562
if err := r.Client.DeployLandscape(wsId, cfg.Profile); err != nil {
6663
return fmt.Errorf("syncing landscape: %w", err)
6764
}
68-
fmt.Println(" ✅ Landscape synced.")
65+
log.Println(" ✅ Landscape synced.")
6966
}
7067

7168
if err := r.runStage(wsId, stage, cfg); err != nil {
@@ -83,63 +80,13 @@ func (r *Runner) runStage(wsId int, stage string, cfg Config) error {
8380
return fmt.Errorf("failed to start pipeline stage %s: %w", stage, err)
8481
}
8582

86-
// Step-aware log streaming for non-run stages.
87-
// Each step gets its own context; when a new step is discovered the
88-
// previous step's stream is cancelled and drained before moving on.
89-
streamEnabled := stage != "run" && cfg.ApiUrl != ""
90-
streamingStep := -1
91-
var stepCancel context.CancelFunc
92-
var stepWg sync.WaitGroup
93-
94-
// drainStream waits for the current stream to deliver logs, then cancels.
95-
drainStream := func() {
96-
if stepCancel == nil {
97-
return
98-
}
99-
done := make(chan struct{})
100-
go func() {
101-
stepWg.Wait()
102-
close(done)
103-
}()
104-
select {
105-
case <-done:
106-
case <-time.After(3 * time.Second):
107-
stepCancel()
108-
stepWg.Wait()
109-
}
110-
}
111-
112-
startStreamForStep := func(step int, totalSteps int) {
113-
if !streamEnabled || step <= streamingStep {
114-
return
115-
}
116-
117-
// Drain previous step before starting next
118-
drainStream()
119-
120-
streamingStep = step
121-
fmt.Printf("\n 📋 Step %d/%d\n", step+1, totalSteps)
122-
123-
ctx, cancel := context.WithCancel(context.Background())
124-
stepCancel = cancel
125-
stepWg.Add(1)
126-
go func() {
127-
defer stepWg.Done()
128-
if err := r.Client.StreamLogs(ctx, cfg.ApiUrl, wsId, stage, step, os.Stdout); err != nil {
129-
_, _ = fmt.Fprintf(os.Stderr, "⚠ log stream error (step %d): %v\n", step, err)
130-
}
131-
}()
132-
}
133-
134-
err := r.waitForStageWithStepCallback(wsId, stage, cfg, startStreamForStep)
135-
136-
// Drain final step's logs
137-
drainStream()
83+
streamer := newStepStreamer(r.Client, wsId, stage)
84+
defer streamer.drain()
13885

139-
return err
86+
return r.waitForStage(wsId, stage, cfg, streamer)
14087
}
14188

142-
func (r *Runner) waitForStageWithStepCallback(wsId int, stage string, cfg Config, onStep func(step int, total int)) error {
89+
func (r *Runner) waitForStage(wsId int, stage string, cfg Config, streamer *stepStreamer) error {
14390
delay := 5 * time.Second
14491
timeout := cfg.Timeout
14592
if timeout == 0 {
@@ -156,17 +103,14 @@ func (r *Runner) waitForStageWithStepCallback(wsId int, stage string, cfg Config
156103
}
157104

158105
// Discover active step from IDE server's Steps array
159-
if onStep != nil {
160-
for _, s := range status {
161-
if s.Server == IdeServer {
162-
total := len(s.Steps)
163-
for i, step := range s.Steps {
164-
if step.State == "running" || step.State == "success" {
165-
onStep(i, total)
166-
}
106+
for _, s := range status {
107+
if s.Server == IdeServer {
108+
for i, step := range s.Steps {
109+
if step.State == "running" {
110+
streamer.startStep(i, len(s.Steps))
167111
}
168-
break
169112
}
113+
break
170114
}
171115
}
172116

0 commit comments

Comments
 (0)