Skip to content

Commit 343cc20

Browse files
feat: refactor pipeline into shared package with step-aware log streaming
Extract pipeline stage execution from cli/cmd/start_pipeline.go into a reusable pkg/pipeline package with a Runner that orchestrates stage execution, discovers sub-steps from the IDE server's pipeline status API, and streams logs per-step via SSE. - Add pkg/pipeline with Runner, Client interface, and step discovery - Add StreamLogs SSE method to api/workspace.go for real-time log output - Add StreamLogs to cli/cmd Client interface - Refactor start_pipeline.go to delegate to pipeline.Runner - Add pipeline streaming unit tests (single/multi-step, no-stream cases) - Update .mockery.yml for pipeline mock generation Signed-off-by: Alex <132889147+alexvcodesphere@users.noreply.github.com>
1 parent f1af57b commit 343cc20

10 files changed

Lines changed: 948 additions & 118 deletions

File tree

.mockery.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,3 +38,11 @@ packages:
3838
config:
3939
all: true
4040
interfaces:
41+
github.com/codesphere-cloud/cs-go/pkg/deploy:
42+
config:
43+
all: true
44+
interfaces:
45+
github.com/codesphere-cloud/cs-go/pkg/pipeline:
46+
config:
47+
all: true
48+
interfaces:

api/workspace.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,14 @@
44
package api
55

66
import (
7+
"bufio"
8+
"context"
9+
"encoding/json"
710
"fmt"
11+
"io"
12+
"log"
13+
"net/http"
14+
"strings"
815

916
"github.com/codesphere-cloud/cs-go/api/errors"
1017
"github.com/codesphere-cloud/cs-go/api/openapi_client"
@@ -210,3 +217,97 @@ func (c Client) GitPull(workspaceId int, remote string, branch string) error {
210217
_, err := req.Execute()
211218
return errors.FormatAPIError(err)
212219
}
220+
221+
// logEntry represents a single log line from the SSE stream.
222+
type logEntry struct {
223+
Timestamp string `json:"timestamp"`
224+
Kind string `json:"kind"`
225+
Data string `json:"data"`
226+
}
227+
228+
// StreamLogs connects to the Codesphere SSE log endpoint and writes parsed
229+
// log entries to the provided writer until the context is cancelled or the
230+
// stream ends. This is used during pipeline execution to provide real-time
231+
// log output.
232+
func (c *Client) StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error {
233+
endpoint := fmt.Sprintf("%s/workspaces/%d/logs/%s/%d", apiUrl, wsId, stage, step)
234+
235+
req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil)
236+
if err != nil {
237+
return fmt.Errorf("failed to construct log stream request: %w", err)
238+
}
239+
240+
req.Header.Set("Accept", "text/event-stream")
241+
242+
// Set auth from the client's context token
243+
if token, ok := ctx.Value(openapi_client.ContextAccessToken).(string); ok && token != "" {
244+
req.Header.Set("Authorization", "Bearer "+token)
245+
} else if token, ok := c.ctx.Value(openapi_client.ContextAccessToken).(string); ok && token != "" {
246+
req.Header.Set("Authorization", "Bearer "+token)
247+
}
248+
249+
resp, err := http.DefaultClient.Do(req)
250+
if err != nil {
251+
// Context cancellation is expected when the stage finishes
252+
if ctx.Err() != nil {
253+
return nil
254+
}
255+
return fmt.Errorf("failed to connect to log stream: %w", err)
256+
}
257+
defer func() { _ = resp.Body.Close() }()
258+
259+
if resp.StatusCode != http.StatusOK {
260+
return fmt.Errorf("log stream responded with status %d", resp.StatusCode)
261+
}
262+
263+
reader := bufio.NewReader(resp.Body)
264+
265+
for {
266+
// Check if context is done
267+
select {
268+
case <-ctx.Done():
269+
return nil
270+
default:
271+
}
272+
273+
// Parse one SSE event
274+
var eventData string
275+
for {
276+
line, err := reader.ReadString('\n')
277+
if err != nil {
278+
if ctx.Err() != nil || err == io.EOF {
279+
return nil
280+
}
281+
return fmt.Errorf("failed to read log stream: %w", err)
282+
}
283+
284+
line = strings.TrimSpace(line)
285+
286+
if strings.HasPrefix(line, "data:") {
287+
data := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
288+
if eventData != "" {
289+
eventData += "\n" + data
290+
} else {
291+
eventData = data
292+
}
293+
} else if line == "" && eventData != "" {
294+
// Empty line marks end of SSE event
295+
break
296+
}
297+
}
298+
299+
// Parse and print log entries
300+
var entries []logEntry
301+
if err := json.Unmarshal([]byte(eventData), &entries); err != nil {
302+
// Skip unparseable events (e.g. error responses)
303+
log.Printf("⚠ log stream: %s", eventData)
304+
eventData = ""
305+
continue
306+
}
307+
308+
for _, entry := range entries {
309+
_, _ = fmt.Fprintf(w, "%s | %s\n", entry.Timestamp, entry.Data)
310+
}
311+
eventData = ""
312+
}
313+
}

cli/cmd/client.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type Client interface {
3232
GetPipelineState(wsId int, stage string) ([]api.PipelineStatus, error)
3333
GitPull(wsId int, remote string, branch string) error
3434
DeployLandscape(wsId int, profile string) error
35+
StreamLogs(ctx context.Context, apiUrl string, wsId int, stage string, step int, w io.Writer) error
3536
}
3637

3738
// CommandExecutor abstracts command execution for testing

cli/cmd/mocks.go

Lines changed: 81 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cli/cmd/start_pipeline.go

Lines changed: 17 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,11 @@ package cmd
55

66
import (
77
"fmt"
8-
"log"
9-
"slices"
108
"time"
119

1210
"github.com/codesphere-cloud/cs-go/api"
1311
"github.com/codesphere-cloud/cs-go/pkg/io"
12+
"github.com/codesphere-cloud/cs-go/pkg/pipeline"
1413

1514
"github.com/spf13/cobra"
1615
)
@@ -27,8 +26,6 @@ type StartPipelineOpts struct {
2726
Timeout *time.Duration
2827
}
2928

30-
const IdeServer string = "codesphere-ide"
31-
3229
func (c *StartPipelineCmd) RunE(_ *cobra.Command, args []string) error {
3330

3431
workspaceId, err := c.Opts.GetWorkspaceId()
@@ -41,11 +38,11 @@ func (c *StartPipelineCmd) RunE(_ *cobra.Command, args []string) error {
4138
return fmt.Errorf("failed to create Codesphere client: %w", err)
4239
}
4340

44-
return c.StartPipelineStages(client, workspaceId, args)
41+
return c.StartPipelineStages(client, workspaceId, args, c.Opts.GetApiUrl())
4542
}
4643

4744
func AddStartPipelineCmd(start *cobra.Command, opts GlobalOptions) {
48-
pipeline := StartPipelineCmd{
45+
p := StartPipelineCmd{
4946
cmd: &cobra.Command{
5047
Use: "pipeline",
5148
Short: "Start pipeline stages of a workspace",
@@ -72,116 +69,22 @@ func AddStartPipelineCmd(start *cobra.Command, opts GlobalOptions) {
7269
Time: &api.RealTime{},
7370
}
7471

75-
pipeline.Opts.Timeout = pipeline.cmd.Flags().Duration("timeout", 30*time.Minute, "Time to wait per stage before stopping the command execution (e.g. 10m)")
76-
pipeline.Opts.Profile = pipeline.cmd.Flags().StringP("profile", "p", "", "CI profile to use (e.g. 'prod' for the profile defined in 'ci.prod.yml'), defaults to the ci.yml profile")
77-
start.AddCommand(pipeline.cmd)
78-
79-
pipeline.cmd.RunE = pipeline.RunE
80-
}
81-
82-
func (c *StartPipelineCmd) StartPipelineStages(client Client, wsId int, stages []string) error {
83-
for _, stage := range stages {
84-
if !isValidStage(stage) {
85-
return fmt.Errorf("invalid pipeline stage: %s", stage)
86-
}
87-
}
88-
for _, stage := range stages {
89-
err := c.startStage(client, wsId, stage)
90-
if err != nil {
91-
return err
92-
}
93-
}
94-
return nil
95-
}
96-
97-
func isValidStage(stage string) bool {
98-
return slices.Contains([]string{"prepare", "test", "run"}, stage)
99-
}
100-
101-
func (c *StartPipelineCmd) startStage(client Client, wsId int, stage string) error {
102-
log.Printf("starting %s stage on workspace %d...", stage, wsId)
103-
104-
err := client.StartPipelineStage(wsId, *c.Opts.Profile, stage)
105-
if err != nil {
106-
log.Println()
107-
return fmt.Errorf("failed to start pipeline stage %s: %w", stage, err)
108-
}
109-
110-
err = c.waitForPipelineStage(client, wsId, stage)
111-
if err != nil {
112-
return fmt.Errorf("failed waiting for stage %s to finish: %w", stage, err)
72+
p.Opts.Timeout = p.cmd.Flags().Duration("timeout", 30*time.Minute, "Time to wait per stage before stopping the command execution (e.g. 10m)")
73+
p.Opts.Profile = p.cmd.Flags().StringP("profile", "p", "", "CI profile to use (e.g. 'prod' for the profile defined in 'ci.prod.yml'), defaults to the ci.yml profile")
74+
start.AddCommand(p.cmd)
11375

114-
}
115-
return nil
116-
}
117-
118-
func (c *StartPipelineCmd) waitForPipelineStage(client Client, wsId int, stage string) error {
119-
delay := 5 * time.Second
120-
121-
maxWaitTime := c.Time.Now().Add(*c.Opts.Timeout)
122-
for {
123-
status, err := client.GetPipelineState(wsId, stage)
124-
if err != nil {
125-
log.Printf("\nError getting pipeline status: %s, trying again...", err.Error())
126-
c.Time.Sleep(delay)
127-
continue
128-
}
129-
130-
if c.allFinished(status) {
131-
log.Println("(finished)")
132-
break
133-
}
134-
135-
if allRunning(status) && stage == "run" {
136-
log.Println("(running)")
137-
break
138-
}
139-
140-
err = shouldAbort(status)
141-
if err != nil {
142-
log.Println("(failed)")
143-
return fmt.Errorf("stage %s failed: %w", stage, err)
144-
}
145-
146-
log.Print(".")
147-
if c.Time.Now().After(maxWaitTime) {
148-
log.Println()
149-
return fmt.Errorf("timed out waiting for pipeline stage %s to be complete", stage)
150-
}
151-
c.Time.Sleep(delay)
152-
}
153-
return nil
154-
}
155-
156-
func allRunning(status []api.PipelineStatus) bool {
157-
for _, s := range status {
158-
// Run stage is only running customer servers, ignore IDE server
159-
if s.Server != IdeServer && s.State != "running" {
160-
return false
161-
}
162-
}
163-
return true
164-
}
165-
166-
func (c *StartPipelineCmd) allFinished(status []api.PipelineStatus) bool {
167-
io.Verboseln(*c.Opts.Verbose, "====")
168-
for _, s := range status {
169-
io.Verbosef(*c.Opts.Verbose, "Server: %s, State: %s, Replica: %s\n", s.Server, s.State, s.Replica)
170-
}
171-
for _, s := range status {
172-
// Prepare and Test stage is only running in the IDE server, ignore customer servers
173-
if s.Server == IdeServer && s.State != "success" {
174-
return false
175-
}
176-
}
177-
return true
76+
p.cmd.RunE = p.RunE
17877
}
17978

180-
func shouldAbort(status []api.PipelineStatus) error {
181-
for _, s := range status {
182-
if slices.Contains([]string{"failure", "aborted"}, s.State) {
183-
return fmt.Errorf("server %s, replica %s reached unexpected state %s", s.Server, s.Replica, s.State)
184-
}
79+
func (c *StartPipelineCmd) StartPipelineStages(client Client, wsId int, stages []string, apiUrl ...string) error {
80+
url := ""
81+
if len(apiUrl) > 0 {
82+
url = apiUrl[0]
18583
}
186-
return nil
84+
runner := pipeline.NewRunner(client, c.Time)
85+
return runner.RunStages(wsId, stages, pipeline.Config{
86+
Profile: *c.Opts.Profile,
87+
Timeout: *c.Opts.Timeout,
88+
ApiUrl: url,
89+
})
18790
}

0 commit comments

Comments
 (0)