Skip to content

Commit fb7eff4

Browse files
fix: paginate ListNotCompletedDeployments in pipedv1
piped was calling ListNotCompletedDeployments once per sync tick and throwing away the cursor on the response. If the datastore returned a partial page, every deployment past the first page would be silently skipped until the next tick — showing up as stuck PENDING or PLANNED. Fix: - Add cursor and page_size to ListNotCompletedDeploymentsRequest proto - Forward them in PipedAPI.ListNotCompletedDeployments into the datastore ListOptions so the server honours the client's position - Replace the single RPC call in the pipedv1 deployment store with a loop that follows the cursor until the server returns an empty one, accumulating all deployments before classifying them - Add table-driven tests for the v1 store covering single-page, multi-page, and all deployment status classifications v0 store is intentionally untouched (frozen). Fixes #6696 Signed-off-by: Shridhar Panigrahi <sridharpanigrahi2006@gmail.com>
1 parent 78fdbeb commit fb7eff4

6 files changed

Lines changed: 954 additions & 787 deletions

File tree

pkg/app/pipedv1/apistore/deploymentstore/store.go

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -103,23 +103,30 @@ func (s *store) Lister() Lister {
103103
}
104104

105105
func (s *store) sync(ctx context.Context) error {
106-
// TODO: Call ListNotCompletedDeployments itervally until all required deployments are fetched.
107-
resp, err := s.apiClient.ListNotCompletedDeployments(ctx, &pipedservice.ListNotCompletedDeploymentsRequest{})
108-
if err != nil {
109-
s.logger.Error("failed to list unhandled deployment", zap.Error(err))
110-
return err
111-
}
112-
113106
var pendings, planneds, runnings []*model.Deployment
114-
for _, d := range resp.Deployments {
115-
switch d.Status {
116-
case model.DeploymentStatus_DEPLOYMENT_PENDING:
117-
pendings = append(pendings, d)
118-
case model.DeploymentStatus_DEPLOYMENT_PLANNED:
119-
planneds = append(planneds, d)
120-
case model.DeploymentStatus_DEPLOYMENT_RUNNING, model.DeploymentStatus_DEPLOYMENT_ROLLING_BACK:
121-
runnings = append(runnings, d)
107+
cursor := ""
108+
for {
109+
resp, err := s.apiClient.ListNotCompletedDeployments(ctx, &pipedservice.ListNotCompletedDeploymentsRequest{
110+
Cursor: cursor,
111+
})
112+
if err != nil {
113+
s.logger.Error("failed to list unhandled deployment", zap.Error(err))
114+
return err
115+
}
116+
for _, d := range resp.Deployments {
117+
switch d.Status {
118+
case model.DeploymentStatus_DEPLOYMENT_PENDING:
119+
pendings = append(pendings, d)
120+
case model.DeploymentStatus_DEPLOYMENT_PLANNED:
121+
planneds = append(planneds, d)
122+
case model.DeploymentStatus_DEPLOYMENT_RUNNING, model.DeploymentStatus_DEPLOYMENT_ROLLING_BACK:
123+
runnings = append(runnings, d)
124+
}
125+
}
126+
if resp.Cursor == "" {
127+
break
122128
}
129+
cursor = resp.Cursor
123130
}
124131

125132
headDeployments := make(map[string]*model.Deployment)
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
// Copyright 2024 The PipeCD Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package deploymentstore
16+
17+
import (
18+
"context"
19+
"testing"
20+
21+
"github.com/stretchr/testify/assert"
22+
"github.com/stretchr/testify/require"
23+
"go.uber.org/zap"
24+
"google.golang.org/grpc"
25+
26+
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
27+
"github.com/pipe-cd/pipecd/pkg/model"
28+
)
29+
30+
// fakeAPIClient returns a sequence of pages, one per call.
31+
type fakeAPIClient struct {
32+
pages []*pipedservice.ListNotCompletedDeploymentsResponse
33+
call int
34+
err error
35+
}
36+
37+
func (f *fakeAPIClient) ListNotCompletedDeployments(_ context.Context, _ *pipedservice.ListNotCompletedDeploymentsRequest, _ ...grpc.CallOption) (*pipedservice.ListNotCompletedDeploymentsResponse, error) {
38+
if f.err != nil {
39+
return nil, f.err
40+
}
41+
resp := f.pages[f.call]
42+
f.call++
43+
return resp, nil
44+
}
45+
46+
func makeDeployment(id, appID string, status model.DeploymentStatus) *model.Deployment {
47+
return &model.Deployment{Id: id, ApplicationId: appID, Status: status}
48+
}
49+
50+
func TestSync(t *testing.T) {
51+
pending := makeDeployment("d-pending", "app-1", model.DeploymentStatus_DEPLOYMENT_PENDING)
52+
planned := makeDeployment("d-planned", "app-2", model.DeploymentStatus_DEPLOYMENT_PLANNED)
53+
running := makeDeployment("d-running", "app-3", model.DeploymentStatus_DEPLOYMENT_RUNNING)
54+
rollingBack := makeDeployment("d-rolling-back", "app-4", model.DeploymentStatus_DEPLOYMENT_ROLLING_BACK)
55+
56+
tests := []struct {
57+
name string
58+
pages []*pipedservice.ListNotCompletedDeploymentsResponse
59+
wantPendings []*model.Deployment
60+
wantPlanneds []*model.Deployment
61+
wantRunnings []*model.Deployment
62+
wantHeadAppIDs []string
63+
}{
64+
{
65+
name: "empty response",
66+
pages: []*pipedservice.ListNotCompletedDeploymentsResponse{
67+
{Deployments: nil, Cursor: ""},
68+
},
69+
wantPendings: nil,
70+
wantPlanneds: nil,
71+
wantRunnings: nil,
72+
wantHeadAppIDs: nil,
73+
},
74+
{
75+
name: "single page with all statuses",
76+
pages: []*pipedservice.ListNotCompletedDeploymentsResponse{
77+
{Deployments: []*model.Deployment{pending, planned, running, rollingBack}, Cursor: ""},
78+
},
79+
wantPendings: []*model.Deployment{pending},
80+
wantPlanneds: []*model.Deployment{planned},
81+
wantRunnings: []*model.Deployment{running, rollingBack},
82+
wantHeadAppIDs: []string{"app-1", "app-2", "app-3", "app-4"},
83+
},
84+
{
85+
name: "multiple pages are all fetched",
86+
pages: []*pipedservice.ListNotCompletedDeploymentsResponse{
87+
{Deployments: []*model.Deployment{pending}, Cursor: "page2"},
88+
{Deployments: []*model.Deployment{planned, running}, Cursor: ""},
89+
},
90+
wantPendings: []*model.Deployment{pending},
91+
wantPlanneds: []*model.Deployment{planned},
92+
wantRunnings: []*model.Deployment{running},
93+
wantHeadAppIDs: []string{"app-1", "app-2", "app-3"},
94+
},
95+
{
96+
name: "rolling back is classified as running",
97+
pages: []*pipedservice.ListNotCompletedDeploymentsResponse{
98+
{Deployments: []*model.Deployment{rollingBack}, Cursor: ""},
99+
},
100+
wantPendings: nil,
101+
wantPlanneds: nil,
102+
wantRunnings: []*model.Deployment{rollingBack},
103+
wantHeadAppIDs: []string{"app-4"},
104+
},
105+
}
106+
107+
for _, tc := range tests {
108+
t.Run(tc.name, func(t *testing.T) {
109+
s := &store{
110+
apiClient: &fakeAPIClient{pages: tc.pages},
111+
logger: zap.NewNop(),
112+
}
113+
114+
err := s.sync(context.Background())
115+
require.NoError(t, err)
116+
117+
assert.Equal(t, tc.wantPendings, s.ListPendings())
118+
assert.Equal(t, tc.wantPlanneds, s.ListPlanneds())
119+
assert.Equal(t, tc.wantRunnings, s.ListRunnings())
120+
121+
heads := s.ListAppHeadDeployments()
122+
if len(tc.wantHeadAppIDs) == 0 {
123+
assert.Empty(t, heads)
124+
} else {
125+
assert.Len(t, heads, len(tc.wantHeadAppIDs))
126+
for _, appID := range tc.wantHeadAppIDs {
127+
assert.Contains(t, heads, appID)
128+
}
129+
}
130+
})
131+
}
132+
}

pkg/app/server/grpcapi/piped_api.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,6 +372,8 @@ func (a *PipedAPI) ListNotCompletedDeployments(ctx context.Context, req *pipedse
372372
Value: model.GetNotCompletedDeploymentStatuses(),
373373
},
374374
},
375+
Limit: int(req.PageSize),
376+
Cursor: req.Cursor,
375377
}
376378

377379
deployments, cursor, err := a.deploymentStore.List(ctx, opts)

0 commit comments

Comments
 (0)