Skip to content

Commit 272529a

Browse files
authored
Merge pull request #184 from anyproto/GO-4043-archive-cold-spaces
GO-4043 archive cold spaces
2 parents 4a4e0e0 + 471efe8 commit 272529a

39 files changed

Lines changed: 2530 additions & 720 deletions

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ vendor
2424

2525
# database
2626
db
27+
anyDb
28+
29+
# network config
30+
N*.yml
2731

2832
# artefacts for Intelli-J fleet
2933
.fleet

Makefile

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Targets that do not produce a file of the same name; `mocks` only runs
# `go generate` and must always be re-run when requested.
.PHONY: proto build test deps mocks
22
SHELL=/usr/bin/env bash
33
export GOPRIVATE=github.com/anyproto
4-
export PATH:=deps:$(PATH)
4+
export PATH:=$(CURDIR)/deps:$(PATH)
55
export CGO_ENABLED:=1
66
BUILD_GOOS:=$(shell go env GOOS)
77
BUILD_GOARCH:=$(shell go env GOARCH)
@@ -52,7 +52,11 @@ deps:
5252
go build -o deps google.golang.org/protobuf/cmd/protoc-gen-go
5353
go build -o deps github.com/planetscale/vtprotobuf/cmd/protoc-gen-go-vtproto
5454
go build -o deps github.com/ahmetb/govvv
55+
go build -o deps go.uber.org/mock/mockgen
5556

57+
mocks:
58+
echo 'Generating mocks...'
59+
go generate ./...
5660

5761
proto:
5862
$(call generate_drpc,,nodesync/nodesyncproto/protos)

archive/archive.go

Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
//go:generate mockgen -destination mock_archive/mock_archive.go github.com/anyproto/any-sync-node/archive Archive
2+
3+
package archive
4+
5+
import (
6+
"compress/gzip"
7+
"context"
8+
"errors"
9+
"io"
10+
"os"
11+
"path/filepath"
12+
"time"
13+
14+
anystore "github.com/anyproto/any-store"
15+
"github.com/anyproto/any-sync/app"
16+
"github.com/anyproto/any-sync/app/logger"
17+
"github.com/anyproto/any-sync/metric"
18+
"github.com/anyproto/any-sync/util/periodicsync"
19+
"go.uber.org/zap"
20+
21+
"github.com/anyproto/any-sync-node/archive/archivestore"
22+
"github.com/anyproto/any-sync-node/nodestorage"
23+
"github.com/anyproto/any-sync-node/nodesync"
24+
)
25+
26+
const CName = "node.archive"
27+
28+
var log = logger.NewNamed(CName)
29+
30+
func New() Archive {
31+
return new(archive)
32+
}
33+
34+
type Archive interface {
35+
app.ComponentRunnable
36+
Restore(ctx context.Context, spaceId string) (err error)
37+
}
38+
39+
type archive struct {
40+
storageProvider nodestorage.NodeStorage
41+
archiveStore archivestore.ArchiveStore
42+
config Config
43+
checker periodicsync.PeriodicSync
44+
accessDurCutoff time.Duration
45+
stat *archiveStat
46+
syncWaiter <-chan struct{}
47+
runCtx context.Context
48+
runCtxCancel context.CancelFunc
49+
}
50+
51+
func (a *archive) Init(ap *app.App) (err error) {
52+
a.storageProvider = ap.MustComponent(nodestorage.CName).(nodestorage.NodeStorage)
53+
a.archiveStore = ap.MustComponent(archivestore.CName).(archivestore.ArchiveStore)
54+
a.config = ap.MustComponent("config").(configSource).GetArchive()
55+
if a.config.ArchiveAfterDays <= 0 {
56+
a.config.ArchiveAfterDays = 7
57+
}
58+
a.accessDurCutoff = time.Duration(a.config.ArchiveAfterDays) * time.Hour * 24
59+
a.syncWaiter = ap.MustComponent(nodesync.CName).(nodesync.NodeSync).WaitSyncOnStart()
60+
a.runCtx, a.runCtxCancel = context.WithCancel(context.Background())
61+
if a.config.CheckPeriodMinutes <= 0 {
62+
a.config.CheckPeriodMinutes = 2
63+
}
64+
period := time.Minute * time.Duration(a.config.CheckPeriodMinutes)
65+
a.checker = periodicsync.NewPeriodicSyncDuration(period, time.Hour, a.check, log)
66+
a.stat = new(archiveStat)
67+
if m := ap.Component(metric.CName); m != nil {
68+
registerMetric(a.stat, m.(metric.Metric).Registry())
69+
}
70+
return
71+
}
72+
73+
func (a *archive) Name() (name string) {
74+
return CName
75+
}
76+
77+
func (a *archive) Run(_ context.Context) (err error) {
78+
if !a.config.Enabled {
79+
return
80+
}
81+
go func() {
82+
select {
83+
case <-a.runCtx.Done():
84+
return
85+
case <-a.syncWaiter:
86+
}
87+
a.checker.Run()
88+
}()
89+
return
90+
}
91+
92+
var errArchived = errors.New("archived")
93+
94+
func (a *archive) Archive(ctx context.Context, spaceId string) (err error) {
95+
var gzSize, dbSize int64
96+
tmpDir, err := os.MkdirTemp("", spaceId)
97+
if err != nil {
98+
return
99+
}
100+
defer func() {
101+
_ = os.RemoveAll(tmpDir)
102+
}()
103+
err = a.storageProvider.TryLockAndOpenDb(ctx, spaceId, func(db anystore.DB) error {
104+
storePath := filepath.Join(tmpDir, "store.db")
105+
if err = db.Backup(ctx, storePath); err != nil {
106+
return err
107+
}
108+
gzPath, gzSz, dbSz, err := a.createGzipFromStore(tmpDir)
109+
if err != nil {
110+
return err
111+
}
112+
gzSize, dbSize = gzSz, dbSz
113+
114+
r, err := os.Open(gzPath)
115+
if err != nil {
116+
return err
117+
}
118+
defer func() {
119+
if cerr := r.Close(); err == nil && cerr != nil {
120+
err = cerr
121+
}
122+
}()
123+
124+
if err = a.archiveStore.Put(ctx, spaceId, r); err != nil {
125+
return err
126+
}
127+
128+
if err = a.storageProvider.IndexStorage().MarkArchived(ctx, spaceId, gzSize, dbSize); err != nil {
129+
return err
130+
}
131+
132+
_ = db.Close()
133+
_ = os.RemoveAll(a.storageProvider.StoreDir(spaceId))
134+
a.stat.archived.Add(1)
135+
return errArchived
136+
})
137+
138+
if errors.Is(err, errArchived) {
139+
return nil
140+
}
141+
return
142+
}
143+
144+
// createGzipFromStore creates store.gz from store.db inside spaceDir.
145+
// Returns path to .gz, its size and original db size.
146+
func (a *archive) createGzipFromStore(spaceDir string) (gzPath string, gzSize, dbSize int64, err error) {
147+
storePath := filepath.Join(spaceDir, "store.db")
148+
gzPath = filepath.Join(spaceDir, "store.gz")
149+
150+
storeFile, err := os.Open(storePath)
151+
if err != nil {
152+
return "", 0, 0, err
153+
}
154+
defer func() {
155+
if cerr := storeFile.Close(); err == nil && cerr != nil {
156+
err = cerr
157+
}
158+
}()
159+
160+
gzFile, err := os.Create(gzPath)
161+
if err != nil {
162+
return "", 0, 0, err
163+
}
164+
defer func() {
165+
if cerr := gzFile.Close(); err == nil && cerr != nil {
166+
err = cerr
167+
}
168+
}()
169+
170+
gw := gzip.NewWriter(gzFile)
171+
dbSize, err = io.Copy(gw, storeFile)
172+
if err != nil {
173+
_ = gw.Close()
174+
return "", 0, 0, err
175+
}
176+
177+
if err = gw.Close(); err != nil {
178+
return "", 0, 0, err
179+
}
180+
181+
info, err := gzFile.Stat()
182+
if err != nil {
183+
return "", 0, 0, err
184+
}
185+
gzSize = info.Size()
186+
187+
return gzPath, gzSize, dbSize, nil
188+
}
189+
190+
func (a *archive) Restore(ctx context.Context, spaceId string) (err error) {
191+
if err = a.restoreFile(ctx, spaceId); err != nil {
192+
_ = os.RemoveAll(a.storageProvider.StoreDir(spaceId))
193+
return err
194+
}
195+
if err = a.storageProvider.IndexStorage().SetSpaceStatus(ctx, spaceId, nodestorage.SpaceStatusOk, ""); err != nil {
196+
return
197+
}
198+
a.stat.restored.Add(1)
199+
return a.archiveStore.Delete(ctx, spaceId)
200+
}
201+
202+
func (a *archive) restoreFile(ctx context.Context, spaceId string) (err error) {
203+
reader, err := a.archiveStore.Get(ctx, spaceId)
204+
if err != nil {
205+
return
206+
}
207+
defer func() {
208+
_ = reader.Close()
209+
}()
210+
211+
gzipReader, err := gzip.NewReader(reader)
212+
if err != nil {
213+
return
214+
}
215+
216+
defer func() {
217+
_ = gzipReader.Close()
218+
}()
219+
220+
storeDir := a.storageProvider.StoreDir(spaceId)
221+
storePath := filepath.Join(storeDir, "store.db")
222+
if err = os.MkdirAll(storeDir, 0755); err != nil {
223+
return
224+
}
225+
226+
var cleanup = func() {
227+
_ = os.RemoveAll(storePath)
228+
}
229+
230+
storeFile, err := os.Create(storePath)
231+
if err != nil {
232+
cleanup()
233+
return
234+
}
235+
defer func() {
236+
if cErr := storeFile.Close(); cErr != nil {
237+
err = errors.Join(err, cErr)
238+
}
239+
if err != nil {
240+
cleanup()
241+
}
242+
}()
243+
244+
if _, err = io.Copy(storeFile, gzipReader); err != nil {
245+
return
246+
}
247+
return
248+
}
249+
250+
func (a *archive) check(ctx context.Context) error {
251+
indexStore := a.storageProvider.IndexStorage()
252+
deadline, _ := ctx.Deadline()
253+
var skip int
254+
for {
255+
log.Info("check spaces", zap.Time("lastAccessTime", time.Now().Add(-a.accessDurCutoff)))
256+
spaceId, err := indexStore.FindOldestInactiveSpace(ctx, a.accessDurCutoff, skip)
257+
if err != nil {
258+
if errors.Is(err, anystore.ErrDocNotFound) {
259+
return nil
260+
}
261+
return err
262+
}
263+
st := time.Now()
264+
if err = a.Archive(ctx, spaceId); err != nil {
265+
log.Error("space archive failed", zap.String("spaceId", spaceId), zap.Error(err))
266+
if errors.Is(err, nodestorage.ErrLocked) {
267+
skip++
268+
continue
269+
}
270+
return err
271+
}
272+
log.Info("space is archived", zap.String("spaceId", spaceId), zap.Duration("dur", time.Since(st)))
273+
if !deadline.IsZero() && deadline.Sub(time.Now()) < time.Minute*10 {
274+
return nil
275+
}
276+
}
277+
}
278+
279+
func (a *archive) Close(_ context.Context) (err error) {
280+
if a.checker != nil {
281+
a.checker.Close()
282+
}
283+
if a.runCtxCancel != nil {
284+
a.runCtxCancel()
285+
}
286+
return
287+
}

0 commit comments

Comments
 (0)