Skip to content

Commit 5147438

Browse files
committed
feat: enhance runtime scroll creation with UUID generation and timeout settings
- Added UUID generation for runtime scroll IDs when omitted during creation, ensuring unique identifiers.
- Implemented a timeout for HTTP clients in the OpenAPI client to improve reliability.
- Updated OpenAPI specifications and related documentation to reflect changes in ID handling.
- Refactored error handling in the runtime supervisor to manage scroll creation more effectively.
- Enhanced tests to cover new functionality and ensure robustness in scroll management.
1 parent 30128b9 commit 5147438

13 files changed

Lines changed: 290 additions & 127 deletions

File tree

Makefile

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,9 @@ k3d-build-pull-image: ## Build the unified Druid runtime image and import it int
3535
build-x86-docker:
3636
docker run -e GOOS=linux -e GOARCH=amd64 -it --rm -v ./:/app -w /app --entrypoint=/bin/bash docker.elastic.co/beats-dev/golang-crossbuild:1.22.5-main -c 'CGO_ENABLED=1 go build -ldflags "-X github.com/highcard-dev/daemon/internal.Version=$(VERSION)" -o ./bin/x86/druid'
3737

38-
install: ## Install Daemon
39-
cp ./bin/druid /usr/local/bin/druid
38+
install: build ## Build and install Druid binaries
39+
install -m 0755 ./bin/druid /usr/local/bin/druid
40+
install -m 0755 ./bin/druid-coldstarter /usr/local/bin/druid-coldstarter
4041

4142
generate-md-docs:
4243
go run ./docs_md/main.go

api/openapi.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,11 @@ components:
7676
properties:
7777
id:
7878
type: string
79-
description: Deprecated alias for name. Optional local runtime scroll id/name.
79+
description: Deprecated alias for name. Optional stable runtime id/name. If omitted, the daemon generates an id.
8080
example: jobs
8181
name:
8282
type: string
83-
description: Optional local runtime scroll id/name. If omitted, the daemon derives it from scroll.yaml name.
83+
description: Optional stable runtime id/name. If omitted, the daemon generates an id; the display name still comes from scroll.yaml.
8484
example: jobs
8585
artifact:
8686
type: string

apps/druid/adapters/daemonclient/openapi_client.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,19 @@ import (
44
"bytes"
55
"context"
66
"encoding/json"
7-
"errors"
87
"fmt"
98
"io"
109
"net"
1110
"net/http"
1211
"net/url"
1312
"strings"
13+
"time"
1414

1515
"github.com/highcard-dev/daemon/internal/api"
1616
"github.com/highcard-dev/daemon/internal/utils"
1717
)
1818

19-
var ErrMaterializationUnsupported = errors.New("daemon materialization unsupported")
19+
const daemonRequestTimeout = 5 * time.Second
2020

2121
type OpenAPIClient struct {
2222
client *api.ClientWithResponses
@@ -31,21 +31,22 @@ func NewOpenAPIClient(daemonSocket string) (*OpenAPIClient, error) {
3131
func NewOpenAPIClientForTarget(daemonSocket string, daemonURL string) (*OpenAPIClient, error) {
3232
if daemonURL != "" {
3333
server := strings.TrimRight(daemonURL, "/")
34-
client, err := api.NewClientWithResponses(server)
34+
httpClient := &http.Client{Timeout: daemonRequestTimeout}
35+
client, err := api.NewClientWithResponses(server, api.WithHTTPClient(httpClient))
3536
if err != nil {
3637
return nil, err
3738
}
38-
return &OpenAPIClient{client: client, server: server, httpClient: http.DefaultClient}, nil
39+
return &OpenAPIClient{client: client, server: server, httpClient: httpClient}, nil
3940
}
4041
if daemonSocket == "" {
4142
daemonSocket = utils.DefaultRuntimeSocketPath()
4243
}
4344
transport := &http.Transport{
4445
DialContext: func(ctx context.Context, network string, addr string) (net.Conn, error) {
45-
return (&net.Dialer{}).DialContext(ctx, "unix", daemonSocket)
46+
return (&net.Dialer{Timeout: daemonRequestTimeout}).DialContext(ctx, "unix", daemonSocket)
4647
},
4748
}
48-
httpClient := &http.Client{Transport: transport}
49+
httpClient := &http.Client{Transport: transport, Timeout: daemonRequestTimeout}
4950
client, err := api.NewClientWithResponses("http://druid", api.WithHTTPClient(httpClient))
5051
if err != nil {
5152
return nil, err
@@ -69,9 +70,6 @@ func (c *OpenAPIClient) CreateScroll(ctx context.Context, name string, artifact
6970
if err != nil {
7071
return nil, err
7172
}
72-
if res.StatusCode() == http.StatusNotImplemented {
73-
return nil, ErrMaterializationUnsupported
74-
}
7573
if err := ensureStatus(res.StatusCode(), res.Body); err != nil {
7674
return nil, err
7775
}

apps/druid/adapters/daemonclient/openapi_client_test.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,24 @@ import (
99
"github.com/highcard-dev/daemon/internal/api"
1010
)
1111

12+
func TestOpenAPIClientHasDaemonTimeout(t *testing.T) {
13+
socketClient, err := NewOpenAPIClientForTarget("/tmp/druid-test.sock", "")
14+
if err != nil {
15+
t.Fatal(err)
16+
}
17+
if socketClient.httpClient.Timeout != daemonRequestTimeout {
18+
t.Fatalf("socket timeout = %s, want %s", socketClient.httpClient.Timeout, daemonRequestTimeout)
19+
}
20+
21+
urlClient, err := NewOpenAPIClientForTarget("", "http://127.0.0.1:1")
22+
if err != nil {
23+
t.Fatal(err)
24+
}
25+
if urlClient.httpClient.Timeout != daemonRequestTimeout {
26+
t.Fatalf("url timeout = %s, want %s", urlClient.httpClient.Timeout, daemonRequestTimeout)
27+
}
28+
}
29+
1230
func TestCreateScrollDoesNotSendStart(t *testing.T) {
1331
var got map[string]interface{}
1432
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

apps/druid/adapters/http/handlers/scroll_handler.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,6 @@ func (h *ScrollHandler) CreateScroll(c *fiber.Ctx) error {
7979
if errors.Is(err, domain.ErrRuntimeScrollAlreadyExists) {
8080
return fiber.NewError(fiber.StatusConflict, err.Error())
8181
}
82-
if errors.Is(err, appservices.ErrRuntimeMaterializationUnsupported) {
83-
return fiber.NewError(fiber.StatusNotImplemented, err.Error())
84-
}
8582
return err
8683
}
8784
return c.Status(fiber.StatusCreated).JSON(runtimeScroll)
@@ -108,9 +105,6 @@ func (h *ScrollHandler) EnsureScroll(c *fiber.Ctx) error {
108105
}
109106
runtimeScroll, err := h.supervisor.EnsureWithOwner(request.Artifact, name, ownerID, namespace, registryCredentials(request.RegistryCredentials))
110107
if err != nil {
111-
if errors.Is(err, appservices.ErrRuntimeMaterializationUnsupported) {
112-
return fiber.NewError(fiber.StatusNotImplemented, err.Error())
113-
}
114108
return err
115109
}
116110
return c.JSON(runtimeScroll)

apps/druid/core/services/runtime_materialization.go

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,23 +9,19 @@ import (
99

1010
"github.com/highcard-dev/daemon/internal/core/domain"
1111
"github.com/highcard-dev/daemon/internal/core/ports"
12-
coreservices "github.com/highcard-dev/daemon/internal/core/services"
1312
"github.com/highcard-dev/daemon/internal/core/services/registry"
1413
"github.com/highcard-dev/daemon/internal/utils/logger"
1514
"go.uber.org/zap"
1615
)
1716

18-
var ErrRuntimeMaterializationUnsupported = errors.New("runtime backend does not support daemon materialization")
19-
20-
func (s *RuntimeSupervisor) materializeNewScroll(ctx context.Context, runtimeService ports.RuntimeBackendInterface, artifact string, name string, namespace string, registryCredentials []domain.RegistryCredential) (*ports.RuntimeMaterialization, error) {
21-
id := coreservices.RuntimeScrollIDFromName(name)
22-
if id == "" {
23-
return nil, ErrRuntimeMaterializationUnsupported
24-
}
25-
return s.runPullWorker(ctx, runtimeService, ports.RuntimeWorkerModeCreate, id, artifact, runtimeService.RootRef(id, namespace), registryCredentials)
17+
func (s *RuntimeSupervisor) materializeNewScroll(ctx context.Context, runtimeService ports.RuntimeBackendInterface, artifact string, runtimeID string, namespace string, registryCredentials []domain.RegistryCredential) (*ports.RuntimeMaterialization, error) {
18+
return s.runPullWorker(ctx, runtimeService, ports.RuntimeWorkerModeCreate, runtimeID, artifact, runtimeService.RootRef(runtimeID, namespace), registryCredentials)
2619
}
2720

2821
func (s *RuntimeSupervisor) runPullWorker(ctx context.Context, runtimeService ports.RuntimeBackendInterface, mode ports.RuntimeWorkerMode, runtimeID string, artifact string, root string, registryCredentials []domain.RegistryCredential) (*ports.RuntimeMaterialization, error) {
22+
if s.workerCallbacks == nil || s.workerCallbackURL == "" {
23+
return nil, fmt.Errorf("daemon materialization requires --worker-callback-url and --worker-callback-listen")
24+
}
2925
token, resultCh, err := s.workerCallbacks.Register(runtimeID)
3026
if err != nil {
3127
return nil, err

apps/druid/core/services/runtime_supervisor.go

Lines changed: 23 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"strings"
88
"sync"
99

10+
"github.com/google/uuid"
1011
"github.com/highcard-dev/daemon/internal/core/domain"
1112
"github.com/highcard-dev/daemon/internal/core/ports"
1213
coreservices "github.com/highcard-dev/daemon/internal/core/services"
@@ -83,54 +84,44 @@ func (s *RuntimeSupervisor) Create(artifact string, name string, registryCredent
8384

8485
func (s *RuntimeSupervisor) CreateWithOwner(artifact string, name string, ownerID string, namespace string, registryCredentials []domain.RegistryCredential) (*domain.RuntimeScroll, error) {
8586
id := coreservices.RuntimeScrollIDFromName(name)
86-
var placeholder *domain.RuntimeScroll
87-
if id != "" {
88-
if _, err := s.store.GetScroll(id); err == nil {
89-
return nil, fmt.Errorf("%w: %s", domain.ErrRuntimeScrollAlreadyExists, id)
90-
} else if !errors.Is(err, domain.ErrRuntimeScrollNotFound) {
91-
return nil, err
92-
}
93-
placeholder = &domain.RuntimeScroll{
94-
ID: id,
95-
OwnerID: ownerID,
96-
Artifact: artifact,
97-
Root: s.runtimeBackend.RootRef(id, namespace),
98-
Status: domain.RuntimeScrollStatusCreated,
99-
Commands: map[string]domain.LockStatus{},
100-
}
101-
if err := s.store.CreateScroll(placeholder); err != nil {
102-
return nil, err
103-
}
87+
if id == "" {
88+
id = uuid.NewString()
89+
}
90+
if _, err := s.store.GetScroll(id); err == nil {
91+
return nil, fmt.Errorf("%w: %s", domain.ErrRuntimeScrollAlreadyExists, id)
92+
} else if !errors.Is(err, domain.ErrRuntimeScrollNotFound) {
93+
return nil, err
94+
}
95+
placeholder := &domain.RuntimeScroll{
96+
ID: id,
97+
OwnerID: ownerID,
98+
Artifact: artifact,
99+
Root: s.runtimeBackend.RootRef(id, namespace),
100+
Status: domain.RuntimeScrollStatusCreated,
101+
Commands: map[string]domain.LockStatus{},
102+
}
103+
if err := s.store.CreateScroll(placeholder); err != nil {
104+
return nil, err
104105
}
105106
markPlaceholderError := func(cause error) {
106-
if placeholder == nil {
107-
return
108-
}
109107
placeholder.Status = domain.RuntimeScrollStatusError
110108
placeholder.LastError = cause.Error()
111109
_ = s.store.UpdateScroll(placeholder)
112110
}
113111

114-
materialized, err := s.materializeNewScroll(context.Background(), s.runtimeBackend, artifact, name, namespace, registryCredentials)
112+
materialized, err := s.materializeNewScroll(context.Background(), s.runtimeBackend, artifact, id, namespace, registryCredentials)
115113
if err != nil {
116114
markPlaceholderError(err)
117115
return nil, err
118116
}
119117
if materialized.Artifact != "" {
120118
artifact = materialized.Artifact
121119
}
122-
if placeholder != nil {
123-
placeholder, err = s.applyMaterializedScroll(placeholder, artifact, materialized)
124-
if err != nil {
125-
return nil, err
126-
}
127-
return placeholder, nil
128-
}
129-
runtimeScroll, err := s.manager.CreateWithDigest(artifact, materialized.ArtifactDigest, name, ownerID, materialized.Root, materialized.ScrollYAML)
120+
placeholder, err = s.applyMaterializedScroll(placeholder, artifact, materialized)
130121
if err != nil {
131122
return nil, err
132123
}
133-
return runtimeScroll, nil
124+
return placeholder, nil
134125
}
135126

136127
func (s *RuntimeSupervisor) Ensure(artifact string, name string, registryCredentials []domain.RegistryCredential) (*domain.RuntimeScroll, error) {
@@ -161,7 +152,7 @@ func (s *RuntimeSupervisor) EnsureWithOwner(artifact string, name string, ownerI
161152
if artifact == "" {
162153
artifact = runtimeScroll.Artifact
163154
}
164-
materialized, err := s.materializeNewScroll(context.Background(), s.runtimeBackend, artifact, name, namespace, registryCredentials)
155+
materialized, err := s.materializeNewScroll(context.Background(), s.runtimeBackend, artifact, id, namespace, registryCredentials)
165156
if err != nil {
166157
runtimeScroll.Status = domain.RuntimeScrollStatusError
167158
runtimeScroll.LastError = err.Error()

apps/druid/core/services/runtime_supervisor_test.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,38 @@ func TestRuntimeSupervisorCreateCanCreate(t *testing.T) {
218218
}
219219
}
220220

221+
func TestRuntimeSupervisorCreateGeneratesIDWhenNameOmitted(t *testing.T) {
222+
store := newTestStateStore(t)
223+
callbacks := NewWorkerCallbackManager()
224+
backend := &fakeWorkerBackend{callbacks: callbacks, scrollYAML: cachedScrollYAML("start"), digest: "sha256:generated"}
225+
supervisor := NewRuntimeSupervisor(
226+
store,
227+
coreservices.NewRuntimeScrollManager(store),
228+
backend,
229+
)
230+
supervisor.SetWorkerCallbacks(callbacks, "http://druid-cli:8083")
231+
232+
runtimeScroll, err := supervisor.Create("registry.local/lab:1.0", "", nil)
233+
if err != nil {
234+
t.Fatal(err)
235+
}
236+
if runtimeScroll.ID == "" || runtimeScroll.ID == "cached" {
237+
t.Fatalf("id = %q, want generated runtime id independent from scroll.yaml name", runtimeScroll.ID)
238+
}
239+
if runtimeScroll.ScrollName != "cached" {
240+
t.Fatalf("scroll name = %q, want cached", runtimeScroll.ScrollName)
241+
}
242+
if backend.action.RuntimeID != runtimeScroll.ID || backend.action.RootRef != backend.RootRef(runtimeScroll.ID, "") {
243+
t.Fatalf("worker action = %#v scroll = %#v", backend.action, runtimeScroll)
244+
}
245+
if backend.action.Mode != ports.RuntimeWorkerModeCreate || backend.action.CallbackToken == "" {
246+
t.Fatalf("worker action = %#v", backend.action)
247+
}
248+
if runtimeScroll.ArtifactDigest != "sha256:generated" || runtimeScroll.Status != domain.RuntimeScrollStatusCreated {
249+
t.Fatalf("runtime scroll = %#v", runtimeScroll)
250+
}
251+
}
252+
221253
func TestRuntimeSupervisorCreateUsesPullWorkerBeforeStateMutation(t *testing.T) {
222254
store := newTestStateStore(t)
223255
callbacks := NewWorkerCallbackManager()
@@ -250,6 +282,52 @@ func TestRuntimeSupervisorCreateUsesPullWorkerBeforeStateMutation(t *testing.T)
250282
}
251283
}
252284

285+
func TestRuntimeSupervisorCreateWorkerFailureLeavesGeneratedPlaceholder(t *testing.T) {
286+
store := newTestStateStore(t)
287+
callbacks := NewWorkerCallbackManager()
288+
backend := &fakeWorkerBackend{callbacks: callbacks, workerErr: errors.New("pull image failed")}
289+
supervisor := NewRuntimeSupervisor(
290+
store,
291+
coreservices.NewRuntimeScrollManager(store),
292+
backend,
293+
)
294+
supervisor.SetWorkerCallbacks(callbacks, "http://druid-cli:8083")
295+
296+
if _, err := supervisor.Create("registry.local/missing:1.0", "", nil); err == nil {
297+
t.Fatal("Create error = nil, want worker error")
298+
}
299+
scrolls, err := store.ListScrolls()
300+
if err != nil {
301+
t.Fatal(err)
302+
}
303+
if len(scrolls) != 1 {
304+
t.Fatalf("scrolls = %#v, want one failed placeholder", scrolls)
305+
}
306+
if scrolls[0].Status != domain.RuntimeScrollStatusError || !strings.Contains(scrolls[0].LastError, "pull image failed") {
307+
t.Fatalf("placeholder = %#v, want remembered worker failure", scrolls[0])
308+
}
309+
}
310+
311+
func TestRuntimeSupervisorCreateRequiresWorkerCallbackConfig(t *testing.T) {
312+
store := newTestStateStore(t)
313+
supervisor := NewRuntimeSupervisor(
314+
store,
315+
coreservices.NewRuntimeScrollManager(store),
316+
&fakeWorkerBackend{scrollYAML: cachedScrollYAML("start")},
317+
)
318+
319+
if _, err := supervisor.Create("registry.local/lab:1.0", "missing-callbacks", nil); err == nil || !strings.Contains(err.Error(), "daemon materialization requires --worker-callback-url and --worker-callback-listen") {
320+
t.Fatalf("Create error = %v, want explicit callback config error", err)
321+
}
322+
runtimeScroll, err := store.GetScroll("missing-callbacks")
323+
if err != nil {
324+
t.Fatal(err)
325+
}
326+
if runtimeScroll.Status != domain.RuntimeScrollStatusError || !strings.Contains(runtimeScroll.LastError, "--worker-callback-url") {
327+
t.Fatalf("runtime scroll = %#v, want callback config error", runtimeScroll)
328+
}
329+
}
330+
253331
func TestRuntimeSupervisorCreateUsesRequestedNamespaceForRoot(t *testing.T) {
254332
store := newTestStateStore(t)
255333
callbacks := NewWorkerCallbackManager()

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ require (
88
github.com/Masterminds/semver/v3 v3.2.1
99
github.com/gofiber/contrib/websocket v1.3.4
1010
github.com/gofiber/fiber/v2 v2.52.9
11+
github.com/google/uuid v1.6.0
1112
github.com/opencontainers/image-spec v1.1.1
1213
github.com/spf13/cobra v1.9.1
1314
github.com/spf13/viper v1.20.1
@@ -22,7 +23,6 @@ require (
2223
github.com/fsnotify/fsnotify v1.9.0
2324
github.com/go-openapi/jsonpointer v0.21.0 // indirect
2425
github.com/go-openapi/swag v0.23.1 // indirect
25-
github.com/google/uuid v1.6.0 // indirect
2626
github.com/inconshreveable/mousetrap v1.1.0 // indirect
2727
github.com/joho/godotenv v1.5.1
2828
github.com/josharian/intern v1.0.0 // indirect

0 commit comments

Comments
 (0)