Skip to content

Commit 2a375ac

Browse files
authored
Merge pull request #212 from anyproto/GO-6461-fix-space-index
GO-6461 fix space index + spacechecker
2 parents 6567b53 + eebb2af commit 2a375ac

8 files changed

Lines changed: 500 additions & 1 deletion

File tree

cmd/any-sync-node.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ import (
5151
"github.com/anyproto/any-sync-node/account"
5252
"github.com/anyproto/any-sync-node/config"
5353
"github.com/anyproto/any-sync-node/debug/nodedebugrpc"
54+
"github.com/anyproto/any-sync-node/debug/spacechecker"
5455
"github.com/anyproto/any-sync-node/nodespace"
5556
"github.com/anyproto/any-sync-node/nodespace/nodecache"
5657
"github.com/anyproto/any-sync-node/nodestorage"
@@ -152,6 +153,7 @@ func Bootstrap(a *app.App) {
152153
Register(spacedeleter.New()).
153154
Register(peermanager.New()).
154155
Register(debugserver.New()).
156+
Register(spacechecker.New()).
155157
Register(nodedebugrpc.New()).
156158
Register(archivestore.New()).
157159
Register(archive.New()).

debug/nodedebugrpc/nodedebugrpc.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"go.uber.org/zap"
1919

2020
"github.com/anyproto/any-sync-node/debug/nodedebugrpc/nodedebugrpcproto"
21+
"github.com/anyproto/any-sync-node/debug/spacechecker"
2122
"github.com/anyproto/any-sync-node/nodespace"
2223
nodestorage "github.com/anyproto/any-sync-node/nodestorage"
2324
"github.com/anyproto/any-sync-node/nodesync"
@@ -45,6 +46,7 @@ type nodeDebugRpc struct {
4546
nodeConf nodeconf.Service
4647
server debugserver.DebugServer
4748
statService debugstat.StatService
49+
spaceChecker spacechecker.SpaceChecker
4850
}
4951

5052
type statsError struct {
@@ -61,8 +63,10 @@ func (s *nodeDebugRpc) Init(a *app.App) (err error) {
6163
s.nodeConf = a.MustComponent(nodeconf.CName).(nodeconf.Service)
6264
s.server = a.MustComponent(debugserver.CName).(debugserver.DebugServer)
6365
s.statService = a.MustComponent(debugstat.CName).(debugstat.StatService)
66+
s.spaceChecker = a.MustComponent(spacechecker.CName).(spacechecker.SpaceChecker)
6467
http.HandleFunc("/stat/{spaceId}", s.handleSpaceStats)
6568
http.HandleFunc("/stats", s.handleStats)
69+
http.HandleFunc("/check/{spaceId}", s.handleCheck)
6670
return nil
6771
}
6872

@@ -100,6 +104,39 @@ func (s *nodeDebugRpc) handleStats(rw http.ResponseWriter, req *http.Request) {
100104
_, _ = rw.Write(marshalled)
101105
}
102106

107+
func (s *nodeDebugRpc) handleCheck(rw http.ResponseWriter, req *http.Request) {
108+
spaceId := req.PathValue("spaceId")
109+
fix := req.URL.Query().Get("fix") == "1"
110+
111+
var (
112+
result spacechecker.Result
113+
err error
114+
)
115+
if fix {
116+
result, err = s.spaceChecker.Fix(req.Context(), spaceId)
117+
} else {
118+
result, err = s.spaceChecker.Check(req.Context(), spaceId)
119+
}
120+
if err != nil {
121+
rw.Header().Set("Content-Type", "application/json")
122+
rw.WriteHeader(http.StatusInternalServerError)
123+
marshalledErr, _ := json.MarshalIndent(statsError{Error: err.Error()}, "", " ")
124+
rw.Write(marshalledErr)
125+
return
126+
}
127+
128+
rw.Header().Set("Content-Type", "application/json")
129+
marshalled, err := json.MarshalIndent(result, "", " ")
130+
if err != nil {
131+
log.Error("failed to marshal check result", zap.Error(err))
132+
rw.WriteHeader(http.StatusInternalServerError)
133+
rw.Write([]byte("{\"error\": \"failed to marshal check result\"}"))
134+
return
135+
}
136+
rw.WriteHeader(http.StatusOK)
137+
_, _ = rw.Write(marshalled)
138+
}
139+
103140
func (s *nodeDebugRpc) handleSpaceStats(rw http.ResponseWriter, req *http.Request) {
104141
spaceId := req.PathValue("spaceId")
105142
reqCtx := req.Context()

debug/spacechecker/spacechecker.go

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
package spacechecker
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"os"
8+
"path/filepath"
9+
10+
"github.com/anyproto/any-sync/app"
11+
"github.com/anyproto/any-sync/app/logger"
12+
"github.com/anyproto/any-sync/commonspace/spacestorage"
13+
"github.com/anyproto/any-sync/coordinator/coordinatorclient"
14+
"github.com/anyproto/any-sync/coordinator/coordinatorproto"
15+
"github.com/anyproto/any-sync/nodeconf"
16+
17+
"github.com/anyproto/any-sync-node/nodestorage"
18+
)
19+
20+
const CName = "node.debug.spacechecker"
21+
22+
var log = logger.NewNamed(CName)
23+
24+
type Result struct {
25+
SpaceId string `json:"spaceId"`
26+
LocalStatus string `json:"localStatus"`
27+
LocalStatusError string `json:"localStatusError,omitempty"`
28+
CoordinatorStatus string `json:"coordinatorStatus"`
29+
IsResponsible bool `json:"isResponsible"`
30+
SpaceStorageExists bool `json:"spaceStorageExists"`
31+
IsFixed bool `json:"isFixed"`
32+
Problems []string `json:"problems"`
33+
Log []string `json:"log"`
34+
}
35+
36+
type SpaceChecker interface {
37+
Check(ctx context.Context, spaceId string) (Result, error)
38+
Fix(ctx context.Context, spaceId string) (Result, error)
39+
app.Component
40+
}
41+
42+
func New() SpaceChecker {
43+
return &spaceChecker{}
44+
}
45+
46+
type spaceChecker struct {
47+
storageService nodestorage.NodeStorage
48+
coordClient coordinatorclient.CoordinatorClient
49+
nodeConf nodeconf.Service
50+
}
51+
52+
func (s *spaceChecker) Init(a *app.App) (err error) {
53+
s.storageService = a.MustComponent(spacestorage.CName).(nodestorage.NodeStorage)
54+
s.coordClient = a.MustComponent(coordinatorclient.CName).(coordinatorclient.CoordinatorClient)
55+
s.nodeConf = a.MustComponent(nodeconf.CName).(nodeconf.Service)
56+
return nil
57+
}
58+
59+
func (s *spaceChecker) Name() (name string) {
60+
return CName
61+
}
62+
63+
func (s *spaceChecker) Check(ctx context.Context, spaceId string) (res Result, err error) {
64+
res.SpaceId = spaceId
65+
66+
// 1. Get local status from index storage
67+
localStatusStr, localErr := s.getLocalStatus(ctx, spaceId, &res)
68+
if localErr != nil {
69+
return res, fmt.Errorf("get local status: %w", localErr)
70+
}
71+
72+
// 2. Get coordinator status
73+
coordStatusStr, coordErr := s.getCoordinatorStatus(ctx, spaceId, &res)
74+
if coordErr != nil {
75+
return res, fmt.Errorf("get coordinator status: %w", coordErr)
76+
}
77+
78+
// 3. Check if node is responsible
79+
res.IsResponsible = s.nodeConf.IsResponsible(spaceId)
80+
res.Log = append(res.Log, fmt.Sprintf("isResponsible: %v", res.IsResponsible))
81+
82+
// 4. Check if storage exists
83+
storeDir := s.storageService.StoreDir(spaceId)
84+
_, statErr := os.Stat(storeDir)
85+
res.SpaceStorageExists = statErr == nil
86+
res.Log = append(res.Log, fmt.Sprintf("spaceStorageExists: %v (path: %s)", res.SpaceStorageExists, storeDir))
87+
88+
// Validate state combinations
89+
s.validate(&res, localStatusStr, coordStatusStr)
90+
91+
return res, nil
92+
}
93+
94+
func (s *spaceChecker) Fix(ctx context.Context, spaceId string) (Result, error) {
95+
res, err := s.Check(ctx, spaceId)
96+
if err != nil {
97+
return res, err
98+
}
99+
if len(res.Problems) == 0 {
100+
return res, nil
101+
}
102+
103+
coordStatus := res.CoordinatorStatus
104+
localStatus := res.LocalStatus
105+
storageExists := res.SpaceStorageExists
106+
isResponsible := res.IsResponsible
107+
indexStorage := s.storageService.IndexStorage()
108+
109+
switch {
110+
// coordStatus: removed, localStatus: not removed or storageExists: true - remove space and switch local status
111+
case coordStatus == "removed" && (localStatus != "removed" || storageExists):
112+
if storageExists {
113+
err = s.storageService.DeleteSpaceStorage(ctx, spaceId)
114+
if err != nil && !errors.Is(err, spacestorage.ErrSpaceStorageMissing) {
115+
return res, fmt.Errorf("delete space storage: %w", err)
116+
}
117+
res.Log = append(res.Log, "fix: deleted space storage")
118+
}
119+
if localStatus != "removed" {
120+
err = indexStorage.SetSpaceStatus(ctx, spaceId, nodestorage.SpaceStatusRemove, "")
121+
if err != nil {
122+
return res, fmt.Errorf("set status removed: %w", err)
123+
}
124+
res.Log = append(res.Log, "fix: set local status to removed")
125+
}
126+
res.IsFixed = true
127+
128+
// coordStatus: remPrepare, localStatus: not remPrepare - switch local status
129+
case coordStatus == "remPrepare" && localStatus != "remPrepare":
130+
err = indexStorage.SetSpaceStatus(ctx, spaceId, nodestorage.SpaceStatusRemovePrepare, "")
131+
if err != nil {
132+
return res, fmt.Errorf("set status remPrepare: %w", err)
133+
}
134+
res.Log = append(res.Log, "fix: set local status to remPrepare")
135+
res.IsFixed = true
136+
137+
// coordStatus: ok, localStatus: ok, isResponsible: false - set notResponsible, move storage if exists
138+
case coordStatus == "ok" && localStatus == "ok" && !isResponsible:
139+
err = indexStorage.SetSpaceStatus(ctx, spaceId, nodestorage.SpaceStatusNotResponsible, "")
140+
if err != nil {
141+
return res, fmt.Errorf("set status notResponsible: %w", err)
142+
}
143+
res.Log = append(res.Log, "fix: set local status to notResponsible")
144+
if storageExists {
145+
srcDir := s.storageService.StoreDir(spaceId)
146+
dstDir := s.storageService.StoreDir(filepath.Join("notresponsible", spaceId))
147+
if err = os.MkdirAll(filepath.Dir(dstDir), 0755); err != nil {
148+
return res, fmt.Errorf("create notresponsible dir: %w", err)
149+
}
150+
if err = os.Rename(srcDir, dstDir); err != nil {
151+
return res, fmt.Errorf("move storage: %w", err)
152+
}
153+
res.Log = append(res.Log, fmt.Sprintf("fix: moved storage from %s to %s", srcDir, dstDir))
154+
}
155+
res.IsFixed = true
156+
157+
default:
158+
res.Log = append(res.Log, "fix: no automatic fix available for this state")
159+
}
160+
161+
return res, nil
162+
}
163+
164+
func (s *spaceChecker) getLocalStatus(ctx context.Context, spaceId string, res *Result) (statusStr string, err error) {
165+
status, err := s.storageService.IndexStorage().SpaceStatus(ctx, spaceId)
166+
if err != nil {
167+
res.LocalStatus = "unknown"
168+
res.LocalStatusError = err.Error()
169+
res.Log = append(res.Log, fmt.Sprintf("localStatus: error getting status: %s", err))
170+
return "unknown", err
171+
}
172+
statusStr = localStatusString(status)
173+
res.LocalStatus = statusStr
174+
res.Log = append(res.Log, fmt.Sprintf("localStatus: %s", statusStr))
175+
return statusStr, nil
176+
}
177+
178+
func (s *spaceChecker) getCoordinatorStatus(ctx context.Context, spaceId string, res *Result) (statusStr string, err error) {
179+
payload, err := s.coordClient.StatusCheck(ctx, spaceId)
180+
if err != nil {
181+
res.CoordinatorStatus = "error"
182+
res.Log = append(res.Log, fmt.Sprintf("coordinatorStatus: error: %s", err))
183+
return "error", err
184+
}
185+
status := payload.GetStatus()
186+
statusStr = coordStatusString(status)
187+
res.CoordinatorStatus = statusStr
188+
res.Log = append(res.Log, fmt.Sprintf("coordinatorStatus: %s", statusStr))
189+
return statusStr, nil
190+
}
191+
192+
func (s *spaceChecker) validate(res *Result, localStr string, coordStr string) {
193+
coordStatus := coordStr
194+
localStatus := localStr
195+
isResponsible := res.IsResponsible
196+
storageExists := res.SpaceStorageExists
197+
198+
valid := false
199+
switch {
200+
// coordStatus: ok, localStatus: ok, isResponsible: true, storageExists: true
201+
case coordStatus == "ok" && localStatus == "ok" && isResponsible && storageExists:
202+
valid = true
203+
// coordStatus: ok, localStatus: archived, isResponsible: true, storageExists: false
204+
case coordStatus == "ok" && localStatus == "archived" && isResponsible && !storageExists:
205+
valid = true
206+
// coordStatus: remPrepare, localStatus: remPrepare, isResponsible: true, storageExists: true
207+
case coordStatus == "remPrepare" && localStatus == "remPrepare" && isResponsible && storageExists:
208+
valid = true
209+
// coordStatus: remPrepare, localStatus: remPrepare, isResponsible: false, storageExists: false
210+
case coordStatus == "remPrepare" && localStatus == "remPrepare" && !isResponsible && !storageExists:
211+
valid = true
212+
// coordStatus: removed, localStatus: removed, storageExists: false
213+
case coordStatus == "removed" && localStatus == "removed" && !storageExists:
214+
valid = true
215+
}
216+
217+
if !valid {
218+
var problems []string
219+
if coordStatus == "error" {
220+
problems = append(problems, "coordinator_error")
221+
}
222+
if localStatus == "unknown" {
223+
problems = append(problems, "local_status_unknown")
224+
}
225+
problems = append(problems, fmt.Sprintf("invalid_state(C:%s,L:%s,R:%v,E:%v)", coordStatus, localStatus, isResponsible, storageExists))
226+
res.Problems = problems
227+
}
228+
}
229+
230+
func localStatusString(s nodestorage.SpaceStatus) string {
231+
switch s {
232+
case nodestorage.SpaceStatusOk:
233+
return "ok"
234+
case nodestorage.SpaceStatusRemove:
235+
return "removed"
236+
case nodestorage.SpaceStatusRemovePrepare:
237+
return "remPrepare"
238+
case nodestorage.SpaceStatusArchived:
239+
return "archived"
240+
case nodestorage.SpaceStatusError:
241+
return "error"
242+
case nodestorage.SpaceStatusNotResponsible:
243+
return "notResponsible"
244+
default:
245+
return fmt.Sprintf("unknown(%d)", s)
246+
}
247+
}
248+
249+
func coordStatusString(s coordinatorproto.SpaceStatus) string {
250+
switch s {
251+
case coordinatorproto.SpaceStatus_SpaceStatusCreated:
252+
return "ok"
253+
case coordinatorproto.SpaceStatus_SpaceStatusPendingDeletion:
254+
return "remPrepare"
255+
case coordinatorproto.SpaceStatus_SpaceStatusDeletionStarted:
256+
return "remPrepare"
257+
case coordinatorproto.SpaceStatus_SpaceStatusDeleted:
258+
return "removed"
259+
case coordinatorproto.SpaceStatus_SpaceStatusNotExists:
260+
return "notExists"
261+
default:
262+
return fmt.Sprintf("unknown(%d)", s)
263+
}
264+
}

0 commit comments

Comments
 (0)