Skip to content

Commit 8017d4e

Browse files
committed
re-implement compose logic
- build observed state - compute reconciliation plan by comparing observed vs desired state - execute plan this allow to decorelate reconciliation (aka "convergence") logic from docker API, and write simpler and efficient tests to cover various scenarios Signed-off-by: Nicolas De Loof <nicolas.deloof@gmail.com>
1 parent e8c2143 commit 8017d4e

16 files changed

Lines changed: 3282 additions & 432 deletions

pkg/compose/convergence.go

Lines changed: 0 additions & 380 deletions
Large diffs are not rendered by default.

pkg/compose/create.go

Lines changed: 81 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -87,82 +87,116 @@ func (s *composeService) create(ctx context.Context, project *types.Project, opt
8787

8888
prepareNetworks(project)
8989

90-
networks, err := s.ensureNetworks(ctx, project)
90+
// Temporary implementation of use_api_socket until we get actual support inside docker engine
91+
project, err = s.useAPISocket(project)
9192
if err != nil {
9293
return err
9394
}
9495

95-
volumes, err := s.ensureProjectVolumes(ctx, project)
96+
// Phase 1: Inspect current state
97+
observed, err := s.InspectState(ctx, project)
9698
if err != nil {
9799
return err
98100
}
99101

100-
var observedState Containers
101-
observedState, err = s.getContainers(ctx, project.Name, oneOffInclude, true)
102-
if err != nil {
103-
return err
104-
}
105-
orphans := observedState.filter(isOrphaned(project))
106-
if len(orphans) > 0 && !options.IgnoreOrphans {
107-
if options.RemoveOrphans {
108-
err := s.removeContainers(ctx, orphans, nil, nil, false)
109-
if err != nil {
110-
return err
111-
}
112-
} else {
102+
// Handle orphan containers
103+
if len(observed.Orphans) > 0 && !options.IgnoreOrphans {
104+
if !options.RemoveOrphans {
113105
logrus.Warnf("Found orphan containers (%s) for this project. If "+
114106
"you removed or renamed this service in your compose "+
115107
"file, you can run this command with the "+
116-
"--remove-orphans flag to clean it up.", orphans.names())
108+
"--remove-orphans flag to clean it up.", observed.Orphans.names())
117109
}
118110
}
119111

120-
// Temporary implementation of use_api_socket until we get actual support inside docker engine
121-
project, err = s.useAPISocket(project)
122-
if err != nil {
112+
// Validate external networks exist before reconciling
113+
if err := s.validateExternalNetworks(ctx, project, options.Services); err != nil {
123114
return err
124115
}
125116

126-
return newConvergence(options.Services, observedState, networks, volumes, s).apply(ctx, project, options)
127-
}
117+
// Phase 2: Reconcile desired vs observed state (pure function)
118+
plan, err := Reconcile(project, observed, ReconcileOptions{
119+
Recreate: options.Recreate,
120+
RecreateDependencies: options.RecreateDependencies,
121+
Services: options.Services,
122+
Inherit: options.Inherit,
123+
Timeout: options.Timeout,
124+
RemoveOrphans: options.RemoveOrphans,
125+
})
126+
if err != nil {
127+
return err
128+
}
128129

129-
func prepareNetworks(project *types.Project) {
130-
for k, nw := range project.Networks {
131-
nw.CustomLabels = nw.CustomLabels.
132-
Add(api.NetworkLabel, k).
133-
Add(api.ProjectLabel, project.Name).
134-
Add(api.VersionLabel, api.ComposeVersion)
135-
project.Networks[k] = nw
130+
if plan.IsEmpty() {
131+
return nil
136132
}
133+
134+
s.emitUntouchedContainerEvents(project, observed, plan)
135+
136+
// Phase 3: Execute the plan
137+
return s.ExecutePlan(ctx, project, plan)
137138
}
138139

139-
func (s *composeService) ensureNetworks(ctx context.Context, project *types.Project) (map[string]string, error) {
140-
networks := map[string]string{}
141-
for name, nw := range project.Networks {
142-
id, err := s.ensureNetwork(ctx, project, name, &nw)
143-
if err != nil {
144-
return nil, err
140+
// emitUntouchedContainerEvents emits progress events for containers that are
141+
// already up-to-date and running, so that callers (e.g. scale) can see them.
142+
func (s *composeService) emitUntouchedContainerEvents(project *types.Project, observed *ObservedState, plan *ReconciliationPlan) {
143+
for _, service := range project.Services {
144+
for _, ctr := range observed.Containers[service.Name] {
145+
ctrName := getCanonicalContainerName(ctr)
146+
if _, touched := plan.Operations["recreate-container:"+ctrName]; touched {
147+
continue
148+
}
149+
if _, touched := plan.Operations["stop-container:"+ctrName]; touched {
150+
continue
151+
}
152+
if _, touched := plan.Operations["create-container:"+ctrName]; touched {
153+
continue
154+
}
155+
if ctr.State == container.StateRunning {
156+
s.events.On(runningEvent(getContainerProgressName(ctr)))
157+
}
145158
}
146-
networks[name] = id
147-
project.Networks[name] = nw
148159
}
149-
return networks, nil
150160
}
151161

152-
func (s *composeService) ensureProjectVolumes(ctx context.Context, project *types.Project) (map[string]string, error) {
153-
ids := map[string]string{}
154-
for k, volume := range project.Volumes {
155-
volume.CustomLabels = volume.CustomLabels.Add(api.VolumeLabel, k)
156-
volume.CustomLabels = volume.CustomLabels.Add(api.ProjectLabel, project.Name)
157-
volume.CustomLabels = volume.CustomLabels.Add(api.VersionLabel, api.ComposeVersion)
158-
id, err := s.ensureVolume(ctx, k, volume, project)
162+
// validateExternalNetworks checks that external networks exist for services
163+
// that are part of the current operation. Returns an error if a required
164+
// external network is not found.
165+
func (s *composeService) validateExternalNetworks(ctx context.Context, project *types.Project, services []string) error {
166+
for key, net := range project.Networks {
167+
if !net.External {
168+
continue
169+
}
170+
// Check if any targeted service uses this network
171+
usedByTargetedService := false
172+
for _, service := range project.Services {
173+
if len(services) > 0 && !slices.Contains(services, service.Name) {
174+
continue
175+
}
176+
if _, ok := service.Networks[key]; ok {
177+
usedByTargetedService = true
178+
break
179+
}
180+
}
181+
if !usedByTargetedService {
182+
continue
183+
}
184+
_, err := s.resolveExternalNetwork(ctx, &net)
159185
if err != nil {
160-
return nil, err
186+
return err
161187
}
162-
ids[k] = id
163188
}
189+
return nil
190+
}
164191

165-
return ids, nil
192+
func prepareNetworks(project *types.Project) {
193+
for k, nw := range project.Networks {
194+
nw.CustomLabels = nw.CustomLabels.
195+
Add(api.NetworkLabel, k).
196+
Add(api.ProjectLabel, project.Name).
197+
Add(api.VersionLabel, api.ComposeVersion)
198+
project.Networks[k] = nw
199+
}
166200
}
167201

168202
//nolint:gocyclo

pkg/compose/observed_state.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/*
2+
Copyright 2020 Docker Compose CLI authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package compose
18+
19+
import (
20+
"context"
21+
22+
"github.com/compose-spec/compose-go/v2/types"
23+
"github.com/moby/moby/client"
24+
"golang.org/x/sync/errgroup"
25+
26+
"github.com/docker/compose/v5/pkg/api"
27+
)
28+
29+
// ObservedState captures the current state of a Compose project as seen by the Docker engine.
30+
type ObservedState struct {
31+
ProjectName string
32+
Containers map[string]Containers
33+
Networks map[string]ObservedNetwork
34+
Volumes map[string]ObservedVolume
35+
Orphans Containers
36+
}
37+
38+
// ObservedNetwork represents a Docker network associated with a Compose project.
39+
type ObservedNetwork struct {
40+
ID string
41+
Name string
42+
Driver string
43+
Labels map[string]string
44+
ConfigHash string
45+
}
46+
47+
// ObservedVolume represents a Docker volume associated with a Compose project.
48+
type ObservedVolume struct {
49+
Name string
50+
Driver string
51+
Labels map[string]string
52+
ConfigHash string
53+
}
54+
55+
// allContainers returns all containers from the observed state (across all services and orphans).
56+
func (s *ObservedState) allContainers() Containers {
57+
var result Containers
58+
for _, ctrs := range s.Containers {
59+
result = append(result, ctrs...)
60+
}
61+
result = append(result, s.Orphans...)
62+
return result
63+
}
64+
65+
// InspectState queries the Docker engine to build an ObservedState for the given project.
66+
func (s *composeService) InspectState(ctx context.Context, project *types.Project) (*ObservedState, error) {
67+
var (
68+
allContainers Containers
69+
networks client.NetworkListResult
70+
volumes client.VolumeListResult
71+
)
72+
73+
eg, ctx := errgroup.WithContext(ctx)
74+
75+
eg.Go(func() error {
76+
var err error
77+
allContainers, err = s.getContainers(ctx, project.Name, oneOffInclude, true)
78+
return err
79+
})
80+
81+
eg.Go(func() error {
82+
var err error
83+
networks, err = s.apiClient().NetworkList(ctx, client.NetworkListOptions{
84+
Filters: projectFilter(project.Name),
85+
})
86+
return err
87+
})
88+
89+
eg.Go(func() error {
90+
var err error
91+
volumes, err = s.apiClient().VolumeList(ctx, client.VolumeListOptions{
92+
Filters: projectFilter(project.Name),
93+
})
94+
return err
95+
})
96+
97+
if err := eg.Wait(); err != nil {
98+
return nil, err
99+
}
100+
101+
// Partition containers by service
102+
containersByService := map[string]Containers{}
103+
for _, c := range allContainers {
104+
service := c.Labels[api.ServiceLabel]
105+
containersByService[service] = append(containersByService[service], c)
106+
}
107+
108+
// Identify orphan containers
109+
orphans := allContainers.filter(isOrphaned(project))
110+
111+
// Map networks by their Compose network name
112+
observedNetworks := map[string]ObservedNetwork{}
113+
for _, n := range networks.Items {
114+
name := n.Labels[api.NetworkLabel]
115+
observedNetworks[name] = ObservedNetwork{
116+
ID: n.ID,
117+
Name: n.Name,
118+
Driver: n.Driver,
119+
Labels: n.Labels,
120+
ConfigHash: n.Labels[api.ConfigHashLabel],
121+
}
122+
}
123+
124+
// Map volumes by their Compose volume name
125+
observedVolumes := map[string]ObservedVolume{}
126+
for _, v := range volumes.Items {
127+
name := v.Labels[api.VolumeLabel]
128+
observedVolumes[name] = ObservedVolume{
129+
Name: v.Name,
130+
Driver: v.Driver,
131+
Labels: v.Labels,
132+
ConfigHash: v.Labels[api.ConfigHashLabel],
133+
}
134+
}
135+
136+
return &ObservedState{
137+
ProjectName: project.Name,
138+
Containers: containersByService,
139+
Networks: observedNetworks,
140+
Volumes: observedVolumes,
141+
Orphans: orphans,
142+
}, nil
143+
}

0 commit comments

Comments
 (0)