
Commit f71dbf3

refi64 authored and neolynx committed
Split reflists to share their contents across snapshots
In current aptly, each repository and snapshot has its own reflist in the database. This brings a few problems with it:

- Given sufficiently large repositories and snapshots, these lists can get enormous, reaching >1MB. This is a problem for LevelDB's overall performance, as it tends to prefer values around the configured block size (defaults to just 4KiB).
- When you take these large repositories and snapshot them, you have a full, new copy of the reflist, even if only a few packages changed. This means that having a lot of snapshots with a few changes causes the database to basically be full of largely duplicate reflists.
- All the duplication also means that many of the same refs are being loaded repeatedly, which can cause some slowdown but, more notably, eats up huge amounts of memory.
- Adding on more and more new repositories and snapshots will cause the time and memory spent on things like cleanup and publishing to grow roughly linearly.

At the core, there are two problems here:

- Reflists get very big because there are just a lot of packages.
- Different reflists tend to duplicate much of the same contents.

*Split reflists* aim at solving this by separating reflists into 64 *buckets*. Package refs are sorted into individual buckets according to the following system:

- Take the first 3 letters of the package name, after dropping a `lib` prefix. (Using only the first 3 letters will cause packages with similar prefixes to end up in the same bucket, under the assumption that packages with similar names tend to be updated together.)
- Take the 64-bit xxhash of these letters. (xxhash was chosen because it has relatively good distribution across the individual bits, which is important for the next step.)
- Use the first 6 bits of the hash (range [0:63]) as an index into the buckets.

Once refs are placed in buckets, a sha256 digest of all the refs in the bucket is taken.
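
The bucketing steps above can be sketched roughly as follows. This is a minimal illustration and not the code from this commit: to stay within the standard library it uses FNV-1a as a stand-in for xxhash, it assumes "the first 6 bits" means the low 6 bits, and the helper names (`bucketPrefix`, `bucketIndex`, `bucketDigest`) and the digest input layout are hypothetical.

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"hash/fnv"
	"sort"
	"strings"
)

const bucketCount = 64

// bucketPrefix returns up to the first 3 bytes of the package name after
// dropping a leading "lib".
func bucketPrefix(name string) string {
	name = strings.TrimPrefix(name, "lib")
	if len(name) > 3 {
		name = name[:3]
	}
	return name
}

// bucketIndex hashes the prefix and keeps 6 bits, giving an index in [0, 63].
// ASSUMPTION: FNV-1a stands in for xxhash, and the low 6 bits are used.
func bucketIndex(name string) int {
	h := fnv.New64a()
	h.Write([]byte(bucketPrefix(name)))
	return int(h.Sum64() & (bucketCount - 1))
}

// bucketDigest returns the SHA-256 over the sorted refs of one bucket.
// ASSUMPTION: the exact byte layout fed to the hash is illustrative only.
func bucketDigest(refs []string) [sha256.Size]byte {
	sorted := append([]string(nil), refs...)
	sort.Strings(sorted)
	h := sha256.New()
	for _, r := range sorted {
		h.Write([]byte(r))
		h.Write([]byte{0}) // separator so ref boundaries are unambiguous
	}
	var out [sha256.Size]byte
	copy(out[:], h.Sum(nil))
	return out
}

func main() {
	for _, name := range []string{"libssl3", "ssl-cert", "zlib1g"} {
		fmt.Printf("%-10s prefix=%q bucket=%d\n", name, bucketPrefix(name), bucketIndex(name))
	}
}
```

Because `libssl3` and `ssl-cert` reduce to the same prefix, they land in the same bucket, which is the grouping effect the message describes.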
These buckets are then stored in the database, split into roughly block-sized segments, and all the repositories and snapshots simply store an array of bucket digests.

This approach means that *repositories and snapshots can share their reflist buckets*. If a snapshot is taken of a repository, it will have the same contents, so its split reflist will point to the same buckets as the base repository, and only one copy of each bucket is stored in the database. When some packages in the repository change, only the buckets containing those packages will be modified; all the other buckets will remain unchanged, and thus their contents will still be shared. Later on, when these reflists are loaded, each bucket is only loaded once, short-cutting the loading of many megabytes of data. In effect, split reflists are essentially copy-on-write, with only the changed buckets stored individually.

Changing the disk format means that a migration needs to take place, so that task is moved into the database cleanup step, which will migrate reflists over to split reflists, as well as delete any unused reflist buckets.

All the reflist tests are also changed to additionally test out split reflists; although the internal logic is all shared (since buckets are, themselves, just normal reflists), some special additions are needed to have native versions of the various reflist helper methods.

In our tests, we've observed the following improvements:

- Memory usage during publish and database cleanup, with `GOMEMLIMIT=2GiB`, goes down from ~3.2GiB (larger than the memory limit!) to ~0.7GiB, a decrease of ~4.5x.
- Database size decreases from 1.3GB to 367MB.

*In my local tests*, publish times had also decreased down to mere seconds, but the same effect wasn't observed on the server, with the times staying around the same. My suspicion is that this is due to I/O performance: my local system is an M1 MBP, which almost certainly has much faster disk speeds than our DigitalOcean block volumes.
Split reflists include a side effect of requiring more random accesses from reading all the buckets by their keys, so if your random I/O performance is slower, it might cancel out the benefits. That being said, even in that case, the memory usage and database size advantages still persist.

Signed-off-by: Ryan Gonzalez <ryan.gonzalez@collabora.com>
1 parent 3c64f86 commit f71dbf3

70 files changed

Lines changed: 1962 additions & 669 deletions


api/api.go

Lines changed: 1 addition & 1 deletion
@@ -189,7 +189,7 @@ func maybeRunTaskInBackground(c *gin.Context, name string, resources []string, p

 // Common piece of code to show list of packages,
 // with searching & details if requested
-func showPackages(c *gin.Context, reflist *deb.PackageRefList, collectionFactory *deb.CollectionFactory) {
+func showPackages(c *gin.Context, reflist deb.AnyRefList, collectionFactory *deb.CollectionFactory) {
     result := []*deb.Package{}

     list, err := deb.NewPackageListFromRefList(reflist, collectionFactory.PackageCollection(), nil)
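
The signature change above swaps a concrete pointer type for an interface, so callers no longer care whether refs live in one slice or in buckets. A minimal sketch of that idea follows; the interface `anyRefList`, its method set, and both list types are hypothetical stand-ins, not the definitions from the `deb` package.

```go
package main

import "fmt"

// anyRefList is a hypothetical stand-in for deb.AnyRefList: the read-only
// operations a caller such as showPackages needs, however refs are stored.
type anyRefList interface {
	Len() int
	ForEach(fn func(ref string) error) error
}

// flatRefList keeps every ref in a single slice.
type flatRefList struct{ refs []string }

func (l *flatRefList) Len() int { return len(l.refs) }

func (l *flatRefList) ForEach(fn func(ref string) error) error {
	for _, r := range l.refs {
		if err := fn(r); err != nil {
			return err
		}
	}
	return nil
}

// splitRefList keeps refs in buckets; each bucket is itself a flat list.
type splitRefList struct{ buckets []*flatRefList }

// Len sums the bucket lengths, since there is no single slice to measure.
func (l *splitRefList) Len() int {
	n := 0
	for _, b := range l.buckets {
		n += b.Len()
	}
	return n
}

func (l *splitRefList) ForEach(fn func(ref string) error) error {
	for _, b := range l.buckets {
		if err := b.ForEach(fn); err != nil {
			return err
		}
	}
	return nil
}

// collect accepts either representation through the interface.
func collect(list anyRefList) []string {
	out := make([]string, 0, list.Len())
	_ = list.ForEach(func(ref string) error {
		out = append(out, ref)
		return nil
	})
	return out
}

func main() {
	flat := &flatRefList{refs: []string{"Pamd64 bash 5.2 abc", "Pamd64 curl 8.5 def"}}
	split := &splitRefList{buckets: []*flatRefList{
		{refs: []string{"Pamd64 bash 5.2 abc"}},
		{refs: []string{"Pamd64 curl 8.5 def"}},
	}}
	fmt.Println(collect(flat), collect(split))
}
```

The same reasoning would explain the `len(...Refs)` to `.Len()` change in `api/metrics.go`: a bucketed list has no single slice whose length can be taken.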

api/db.go

Lines changed: 61 additions & 20 deletions
@@ -5,6 +5,7 @@ import (
     "sort"

     "github.com/aptly-dev/aptly/aptly"
+    "github.com/aptly-dev/aptly/database"
     "github.com/aptly-dev/aptly/deb"
     "github.com/aptly-dev/aptly/task"
     "github.com/aptly-dev/aptly/utils"
@@ -20,18 +21,22 @@ func apiDbCleanup(c *gin.Context) {

         collectionFactory := context.NewCollectionFactory()

-        // collect information about referenced packages...
-        existingPackageRefs := deb.NewPackageRefList()
+        // collect information about referenced packages and their reflist buckets...
+        existingPackageRefs := deb.NewSplitRefList()
+        existingBuckets := deb.NewRefListDigestSet()
+
+        reflistMigration := collectionFactory.RefListCollection().NewMigration()

         out.Printf("Loading mirrors, local repos, snapshots and published repos...")
         err = collectionFactory.RemoteRepoCollection().ForEach(func(repo *deb.RemoteRepo) error {
-            e := collectionFactory.RemoteRepoCollection().LoadComplete(repo)
-            if e != nil {
+            sl := deb.NewSplitRefList()
+            e := collectionFactory.RefListCollection().LoadCompleteAndMigrate(sl, repo.RefKey(), reflistMigration)
+            if e != nil && e != database.ErrNotFound {
                 return e
             }
-            if repo.RefList() != nil {
-                existingPackageRefs = existingPackageRefs.Merge(repo.RefList(), false, true)
-            }
+
+            existingPackageRefs = existingPackageRefs.Merge(sl, false, true)
+            existingBuckets.AddAllInRefList(sl)

             return nil
         })
@@ -40,14 +45,14 @@ func apiDbCleanup(c *gin.Context) {
         }

         err = collectionFactory.LocalRepoCollection().ForEach(func(repo *deb.LocalRepo) error {
-            e := collectionFactory.LocalRepoCollection().LoadComplete(repo)
-            if e != nil {
+            sl := deb.NewSplitRefList()
+            e := collectionFactory.RefListCollection().LoadCompleteAndMigrate(sl, repo.RefKey(), reflistMigration)
+            if e != nil && e != database.ErrNotFound {
                 return e
             }

-            if repo.RefList() != nil {
-                existingPackageRefs = existingPackageRefs.Merge(repo.RefList(), false, true)
-            }
+            existingPackageRefs = existingPackageRefs.Merge(sl, false, true)
+            existingBuckets.AddAllInRefList(sl)

             return nil
         })
@@ -56,12 +61,14 @@ func apiDbCleanup(c *gin.Context) {
         }

         err = collectionFactory.SnapshotCollection().ForEach(func(snapshot *deb.Snapshot) error {
-            e := collectionFactory.SnapshotCollection().LoadComplete(snapshot)
+            sl := deb.NewSplitRefList()
+            e := collectionFactory.RefListCollection().LoadCompleteAndMigrate(sl, snapshot.RefKey(), reflistMigration)
             if e != nil {
                 return e
             }

-            existingPackageRefs = existingPackageRefs.Merge(snapshot.RefList(), false, true)
+            existingPackageRefs = existingPackageRefs.Merge(sl, false, true)
+            existingBuckets.AddAllInRefList(sl)

             return nil
         })
@@ -73,25 +80,37 @@ func apiDbCleanup(c *gin.Context) {
             if published.SourceKind != deb.SourceLocalRepo {
                 return nil
             }
-            e := collectionFactory.PublishedRepoCollection().LoadComplete(published, collectionFactory)
-            if e != nil {
-                return e
-            }

             for _, component := range published.Components() {
-                existingPackageRefs = existingPackageRefs.Merge(published.RefList(component), false, true)
+                sl := deb.NewSplitRefList()
+                e := collectionFactory.RefListCollection().LoadCompleteAndMigrate(sl, published.RefKey(component), reflistMigration)
+                if e != nil {
+                    return e
+                }
+
+                existingPackageRefs = existingPackageRefs.Merge(sl, false, true)
+                existingBuckets.AddAllInRefList(sl)
             }
             return nil
         })
         if err != nil {
             return nil, err
         }

+        err = reflistMigration.Flush()
+        if err != nil {
+            return nil, err
+        }
+        if stats := reflistMigration.Stats(); stats.Reflists > 0 {
+            out.Printf("Split %d reflist(s) into %d bucket(s) (%d segment(s))",
+                stats.Reflists, stats.Buckets, stats.Segments)
+        }
+
         // ... and compare it to the list of all packages
         out.Printf("Loading list of all packages...")
         allPackageRefs := collectionFactory.PackageCollection().AllPackageRefs()

-        toDelete := allPackageRefs.Subtract(existingPackageRefs)
+        toDelete := allPackageRefs.Subtract(existingPackageRefs.Flatten())

         // delete packages that are no longer referenced
         out.Printf("Deleting unreferenced packages (%d)...", toDelete.Len())
@@ -112,6 +131,28 @@ func apiDbCleanup(c *gin.Context) {
             }
         }

+        bucketsToDelete, err := collectionFactory.RefListCollection().AllBucketDigests()
+        if err != nil {
+            return nil, err
+        }
+
+        bucketsToDelete.RemoveAll(existingBuckets)
+
+        out.Printf("Deleting unreferenced reflist buckets (%d)...", bucketsToDelete.Len())
+        if bucketsToDelete.Len() > 0 {
+            batch := db.CreateBatch()
+            err := bucketsToDelete.ForEach(func(digest []byte) error {
+                return collectionFactory.RefListCollection().UnsafeDropBucket(digest, batch)
+            })
+            if err != nil {
+                return nil, err
+            }
+
+            if err := batch.Write(); err != nil {
+                return nil, err
+            }
+        }
+
         // now, build a list of files that should be present in Repository (package pool)
         out.Printf("Building list of files referenced by packages...")
         referencedFiles := make([]string, 0, existingPackageRefs.Len())
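
The bucket cleanup in this file amounts to a mark-and-sweep over digests: collect every digest still referenced by some reflist, subtract that set from all stored digests, and drop the remainder. A minimal, self-contained sketch of that pattern follows, which also shows why a snapshot with unchanged contents adds nothing to the store. The `bucketStore` type and the `put` and `sweep` helpers are hypothetical and use an in-memory map rather than aptly's database layer.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

type digest [sha256.Size]byte

// bucketStore is a hypothetical content-addressed store: digest -> contents.
type bucketStore map[digest][]byte

// put stores contents under their SHA-256 digest and returns the digest.
// Storing identical contents twice keeps a single copy.
func (s bucketStore) put(contents []byte) digest {
	d := digest(sha256.Sum256(contents))
	s[d] = contents
	return d
}

// sweep deletes every bucket that no reflist references and returns how
// many buckets were removed.
func sweep(store bucketStore, reflists [][]digest) int {
	live := make(map[digest]struct{})
	for _, rl := range reflists {
		for _, d := range rl {
			live[d] = struct{}{} // mark
		}
	}
	removed := 0
	for d := range store {
		if _, ok := live[d]; !ok {
			delete(store, d) // deleting during range is safe in Go
			removed++
		}
	}
	return removed
}

func main() {
	store := bucketStore{}
	a := store.put([]byte("Pamd64 bash 5.2"))
	b := store.put([]byte("Pamd64 curl 8.5"))

	repo := []digest{a, b}
	snapshot := []digest{a, b} // same contents, so the same digests are reused

	// The repo changes one bucket; the snapshot still points at the old one.
	repo[1] = store.put([]byte("Pamd64 curl 8.6"))
	fmt.Println("buckets stored:", len(store))

	// Once the snapshot is gone, the old bucket is unreferenced and swept.
	fmt.Println("removed:", sweep(store, [][]digest{repo}))
	_ = snapshot
}
```

In the diff above, `existingBuckets` plays the role of the live set and `AllBucketDigests()` minus that set is what gets dropped in a single batch.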

api/metrics.go

Lines changed: 1 addition & 1 deletion
@@ -102,7 +102,7 @@ func countPackagesByRepos() {

         components := repo.Components()
         for _, c := range components {
-            count := float64(len(repo.RefList(c).Refs))
+            count := float64(repo.RefList(c).Len())
             apiReposPackageCountGauge.WithLabelValues(fmt.Sprintf("%s", (repo.SourceNames())), repo.Distribution, c).Set(count)
         }

api/mirror.go

Lines changed: 6 additions & 6 deletions
@@ -124,7 +124,7 @@ func apiMirrorsCreate(c *gin.Context) {
         return
     }

-    err = collection.Add(repo)
+    err = collection.Add(repo, collectionFactory.RefListCollection())
     if err != nil {
         AbortWithJSONError(c, 500, fmt.Errorf("unable to add mirror: %s", err))
         return
@@ -184,7 +184,7 @@ func apiMirrorsShow(c *gin.Context) {
         return
     }

-    err = collection.LoadComplete(repo)
+    err = collection.LoadComplete(repo, collectionFactory.RefListCollection())
     if err != nil {
         AbortWithJSONError(c, 500, fmt.Errorf("unable to show: %s", err))
     }
@@ -204,7 +204,7 @@ func apiMirrorsPackages(c *gin.Context) {
         return
     }

-    err = collection.LoadComplete(repo)
+    err = collection.LoadComplete(repo, collectionFactory.RefListCollection())
     if err != nil {
         AbortWithJSONError(c, 500, fmt.Errorf("unable to show: %s", err))
     }
@@ -402,12 +402,12 @@ func apiMirrorsUpdate(c *gin.Context) {
             e := context.ReOpenDatabase()
             if e == nil {
                 remote.MarkAsIdle()
-                collection.Update(remote)
+                collection.Update(remote, collectionFactory.RefListCollection())
             }
         }()

         remote.MarkAsUpdating()
-        err = collection.Update(remote)
+        err = collection.Update(remote, collectionFactory.RefListCollection())
         if err != nil {
             return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
         }
@@ -565,7 +565,7 @@ func apiMirrorsUpdate(c *gin.Context) {

         log.Info().Msgf("%s: Finalizing download...", b.Name)
         remote.FinalizeDownload(collectionFactory, out)
-        err = collectionFactory.RemoteRepoCollection().Update(remote)
+        err = collectionFactory.RemoteRepoCollection().Update(remote, collectionFactory.RefListCollection())
         if err != nil {
             return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
         }

api/publish.go

Lines changed: 5 additions & 5 deletions
@@ -141,7 +141,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
             }

             resources = append(resources, string(snapshot.ResourceKey()))
-            err = snapshotCollection.LoadComplete(snapshot)
+            err = snapshotCollection.LoadComplete(snapshot, collectionFactory.RefListCollection())
             if err != nil {
                 AbortWithJSONError(c, 500, fmt.Errorf("unable to publish: %s", err))
                 return
@@ -165,7 +165,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
             }

             resources = append(resources, string(localRepo.Key()))
-            err = localCollection.LoadComplete(localRepo)
+            err = localCollection.LoadComplete(localRepo, collectionFactory.RefListCollection())
             if err != nil {
                 AbortWithJSONError(c, 500, fmt.Errorf("unable to publish: %s", err))
             }
@@ -232,7 +232,7 @@ func apiPublishRepoOrSnapshot(c *gin.Context) {
             return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to publish: %s", err)
         }

-        err = collection.Add(published)
+        err = collection.Add(published, collectionFactory.RefListCollection())
         if err != nil {
             return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
         }
@@ -313,7 +313,7 @@ func apiPublishUpdateSwitch(c *gin.Context) {
                 return
             }

-            err2 = snapshotCollection.LoadComplete(snapshot)
+            err2 = snapshotCollection.LoadComplete(snapshot, collectionFactory.RefListCollection())
             if err2 != nil {
                 AbortWithJSONError(c, 500, err2)
                 return
@@ -348,7 +348,7 @@ func apiPublishUpdateSwitch(c *gin.Context) {
             return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to update: %s", err)
         }

-        err = collection.Update(published)
+        err = collection.Update(published, collectionFactory.RefListCollection())
         if err != nil {
             return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save to DB: %s", err)
         }

api/repos.go

Lines changed: 10 additions & 10 deletions
@@ -82,7 +82,7 @@ func apiReposCreate(c *gin.Context) {

     collectionFactory := context.NewCollectionFactory()
     collection := collectionFactory.LocalRepoCollection()
-    err := collection.Add(repo)
+    err := collection.Add(repo, collectionFactory.RefListCollection())
     if err != nil {
         AbortWithJSONError(c, 400, err)
         return
@@ -132,7 +132,7 @@ func apiReposEdit(c *gin.Context) {
         repo.DefaultComponent = *b.DefaultComponent
     }

-    err = collection.Update(repo)
+    err = collection.Update(repo, collectionFactory.RefListCollection())
     if err != nil {
         AbortWithJSONError(c, 500, err)
         return
@@ -201,7 +201,7 @@ func apiReposPackagesShow(c *gin.Context) {
         return
     }

-    err = collection.LoadComplete(repo)
+    err = collection.LoadComplete(repo, collectionFactory.RefListCollection())
     if err != nil {
         AbortWithJSONError(c, 500, err)
         return
@@ -229,7 +229,7 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
         return
     }

-    err = collection.LoadComplete(repo)
+    err = collection.LoadComplete(repo, collectionFactory.RefListCollection())
     if err != nil {
         AbortWithJSONError(c, 500, err)
         return
@@ -261,9 +261,9 @@ func apiReposPackagesAddDelete(c *gin.Context, taskNamePrefix string, cb func(li
             }
         }

-        repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
+        repo.UpdateRefList(deb.NewSplitRefListFromPackageList(list))

-        err = collectionFactory.LocalRepoCollection().Update(repo)
+        err = collectionFactory.LocalRepoCollection().Update(repo, collectionFactory.RefListCollection())
         if err != nil {
             return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
         }
@@ -320,7 +320,7 @@ func apiReposPackageFromDir(c *gin.Context) {
         return
     }

-    err = collection.LoadComplete(repo)
+    err = collection.LoadComplete(repo, collectionFactory.RefListCollection())
     if err != nil {
         AbortWithJSONError(c, 500, err)
         return
@@ -369,9 +369,9 @@ func apiReposPackageFromDir(c *gin.Context) {
             return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to import package files: %s", err)
         }

-        repo.UpdateRefList(deb.NewPackageRefListFromPackageList(list))
+        repo.UpdateRefList(deb.NewSplitRefListFromPackageList(list))

-        err = collectionFactory.LocalRepoCollection().Update(repo)
+        err = collectionFactory.LocalRepoCollection().Update(repo, collectionFactory.RefListCollection())
         if err != nil {
             return &task.ProcessReturnValue{Code: http.StatusInternalServerError, Value: nil}, fmt.Errorf("unable to save: %s", err)
         }
@@ -489,7 +489,7 @@ func apiReposIncludePackageFromDir(c *gin.Context) {
         _, failedFiles2, err = deb.ImportChangesFiles(
             changesFiles, reporter, acceptUnsigned, ignoreSignature, forceReplace, noRemoveFiles, verifier,
             repoTemplate, context.Progress(), collectionFactory.LocalRepoCollection(), collectionFactory.PackageCollection(),
-            context.PackagePool(), collectionFactory.ChecksumCollection, nil, query.Parse)
+            collectionFactory.RefListCollection(), context.PackagePool(), collectionFactory.ChecksumCollection, nil, query.Parse)
         failedFiles = append(failedFiles, failedFiles2...)

         if err != nil {
