Skip to content

Commit d94e36e

Browse files
committed
s3: fix pathCache race condition
Make sure pathCache is properly locked for concurrent access. Add RWMutex to the PublishedStorage struct: - Cache initialization Read-lock to test for nil, then write-lock with a second nil check before populating - Cache reads RLock/RUnlock, allowing concurrent readers - Cache writes / deletes Lock/Unlock
1 parent 947887e commit d94e36e

2 files changed

Lines changed: 33 additions & 9 deletions

File tree

api/s3.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ import (
1414
// @Router /api/s3 [get]
1515
func apiS3List(c *gin.Context) {
1616
keys := []string{}
17-
for k := range context.Config().S3PublishRoots {
17+
s3Roots := context.Config().S3PublishRoots
18+
for k := range s3Roots {
1819
keys = append(keys, k)
1920
}
2021
c.JSON(200, keys)

s3/public.go

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"os"
88
"path/filepath"
99
"strings"
10+
"sync"
1011

1112
"github.com/aptly-dev/aptly/aptly"
1213
"github.com/aptly-dev/aptly/utils"
@@ -51,6 +52,7 @@ type PublishedStorage struct {
5152
plusWorkaround bool
5253
disableMultiDel bool
5354
pathCache map[string]string
55+
pathCacheMutex sync.RWMutex
5456

5557
// True if the bucket encrypts objects by default.
5658
encryptByDefault bool
@@ -251,7 +253,9 @@ func (storage *PublishedStorage) Remove(path string) error {
251253
_ = storage.Remove(strings.Replace(path, "+", " ", -1))
252254
}
253255

256+
storage.pathCacheMutex.Lock()
254257
delete(storage.pathCache, path)
258+
storage.pathCacheMutex.Unlock()
255259

256260
return nil
257261
}
@@ -280,7 +284,9 @@ func (storage *PublishedStorage) RemoveDirs(path string, _ aptly.Progress) error
280284
if err != nil {
281285
return fmt.Errorf("error deleting path %s from %s: %s", filelist[i], storage, err)
282286
}
287+
storage.pathCacheMutex.Lock()
283288
delete(storage.pathCache, filepath.Join(path, filelist[i]))
289+
storage.pathCacheMutex.Unlock()
284290
}
285291
} else {
286292
numParts := (len(filelist) + page - 1) / page
@@ -313,9 +319,11 @@ func (storage *PublishedStorage) RemoveDirs(path string, _ aptly.Progress) error
313319
if err != nil {
314320
return fmt.Errorf("error deleting multiple paths from %s: %s", storage, err)
315321
}
322+
storage.pathCacheMutex.Lock()
316323
for i := range part {
317324
delete(storage.pathCache, filepath.Join(path, part[i]))
318325
}
326+
storage.pathCacheMutex.Unlock()
319327
}
320328
}
321329

@@ -337,20 +345,31 @@ func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath,
337345
relPath := filepath.Join(publishedDirectory, fileName)
338346
poolPath := filepath.Join(storage.prefix, relPath)
339347

340-
if storage.pathCache == nil {
341-
paths, md5s, err := storage.internalFilelist(filepath.Join(publishedPrefix, "pool"), true)
342-
if err != nil {
343-
return errors.Wrap(err, "error caching paths under prefix")
344-
}
348+
storage.pathCacheMutex.RLock()
349+
cacheNil := storage.pathCache == nil
350+
storage.pathCacheMutex.RUnlock()
351+
352+
if cacheNil {
353+
storage.pathCacheMutex.Lock()
354+
if storage.pathCache == nil {
355+
paths, md5s, err := storage.internalFilelist(filepath.Join(publishedPrefix, "pool"), true)
356+
if err != nil {
357+
storage.pathCacheMutex.Unlock()
358+
return errors.Wrap(err, "error caching paths under prefix")
359+
}
345360

346-
storage.pathCache = make(map[string]string, len(paths))
361+
storage.pathCache = make(map[string]string, len(paths))
347362

348-
for i := range paths {
349-
storage.pathCache[filepath.Join(publishedPrefix, "pool", paths[i])] = md5s[i]
363+
for i := range paths {
364+
storage.pathCache[filepath.Join(publishedPrefix, "pool", paths[i])] = md5s[i]
365+
}
350366
}
367+
storage.pathCacheMutex.Unlock()
351368
}
352369

370+
storage.pathCacheMutex.RLock()
353371
destinationMD5, exists := storage.pathCache[relPath]
372+
storage.pathCacheMutex.RUnlock()
354373
sourceMD5 := sourceChecksums.MD5
355374

356375
if exists {
@@ -367,7 +386,9 @@ func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath,
367386
err = errors.Wrap(err, fmt.Sprintf("error verifying MD5 for %s: %s", storage, poolPath))
368387
return err
369388
}
389+
storage.pathCacheMutex.Lock()
370390
storage.pathCache[relPath] = destinationMD5
391+
storage.pathCacheMutex.Unlock()
371392
}
372393

373394
if destinationMD5 == sourceMD5 {
@@ -388,7 +409,9 @@ func (storage *PublishedStorage) LinkFromPool(publishedPrefix, publishedRelPath,
388409
log.Debug().Msgf("S3: LinkFromPool '%s'", relPath)
389410
err = storage.putFile(relPath, source, sourceMD5)
390411
if err == nil {
412+
storage.pathCacheMutex.Lock()
391413
storage.pathCache[relPath] = sourceMD5
414+
storage.pathCacheMutex.Unlock()
392415
} else {
393416
err = errors.Wrap(err, fmt.Sprintf("error uploading %s to %s: %s", sourcePath, storage, poolPath))
394417
}

0 commit comments

Comments
 (0)