Skip to content

Commit 1a18845

Browse files
committed
integrity verifier: support sampling a subset (e.g. 1/4) of all files in a single scan
closes #266
1 parent 8f2a292 commit 1a18845

6 files changed

Lines changed: 131 additions & 6 deletions

File tree

pkg/stoserver/commandhandlers.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -412,11 +412,26 @@ func (c *cHandlers) VolumeMigrateData(cmd *stoservertypes.VolumeMigrateData, ctx
412412
func (c *cHandlers) VolumeVerifyIntegrity(cmd *stoservertypes.VolumeVerifyIntegrity, ctx *command.Ctx) error {
413413
jobID := stoutils.NewIntegrityVerificationJobID()
414414

415+
sampleSpecification, err := func() (*string, error) {
416+
if samplingSpec := cmd.Sampling; samplingSpec != "" {
417+
// validate the sampling spec
418+
_, err := stointegrityverifier.CreateSampler(&samplingSpec)
419+
420+
return &samplingSpec, err
421+
} else {
422+
return nil, nil
423+
}
424+
}()
425+
if err != nil {
426+
return err
427+
}
428+
415429
if err := c.db.Update(func(tx *bbolt.Tx) error {
416430
job := &stotypes.IntegrityVerificationJob{
417-
ID: jobID,
418-
Started: ctx.Meta.Timestamp,
419-
VolumeID: cmd.Id,
431+
ID: jobID,
432+
Started: ctx.Meta.Timestamp,
433+
VolumeID: cmd.Id,
434+
SampleSpecification: sampleSpecification,
420435
}
421436

422437
return stodb.IntegrityVerificationJobRepository.Update(job, tx)
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package stointegrityverifier
2+
3+
// With sampling we can select only a subset of the blobs to visit.
4+
// https://en.wikipedia.org/wiki/Sampling_(statistics)
5+
6+
import (
7+
"encoding/binary"
8+
"fmt"
9+
"strconv"
10+
11+
"github.com/function61/varasto/pkg/stotypes"
12+
)
13+
14+
// answers whether we should visit a blob
15+
type batchSampler func(stotypes.BlobRef) bool
16+
17+
func CreateSampler(sampleSpecificationMaybe *string) (batchSampler, error) {
18+
if sampleSpecification := sampleSpecificationMaybe; sampleSpecification != nil {
19+
// bit string like `1111` to number (`15`)
20+
num, err := strconv.ParseUint(*sampleSpecification, 2, 32)
21+
if err != nil {
22+
return nil, fmt.Errorf("invalid sampling spec. expected binary string like 01; got '%s'", *sampleSpecification)
23+
}
24+
bitCount := len(*sampleSpecification)
25+
26+
return prefixSampler(uint32(num), uint8(bitCount)), nil
27+
} else {
28+
return func(_ stotypes.BlobRef) bool { return true }, nil
29+
}
30+
}
31+
32+
// only accepts blob refs that start with a specific bit pattern. that means that we can accept blob refs starting with:
33+
// - `0b0` => accepts 1/2 of refs
34+
// - `0b00` => accepts 1/4 of refs
35+
// - `0b000` => accepts 1/8 of refs
36+
// - and so on...
37+
//
38+
// the interesting property of this, as opposed to something like using random sampling for acceptance is that
39+
// this is deterministic based on blob ref and thus we can integrity verify first batch today and next back next week and
40+
// we are guaranteed that the next batch won't re-visit blobs from first batch.
41+
//
42+
// let's take the 1/4 acceptance as example. we have four batches: 1) `0b00` 2) `0b01` 3) `0b10` 4) `0b11`
43+
//
44+
// we could visit those four batches in four different weeks to guaranteed visit all blobs (except those added later to earlier batches' "partitions")
45+
func prefixSampler(value uint32, bitCount uint8) batchSampler {
46+
return func(blobRef stotypes.BlobRef) bool {
47+
// counter-intuitively: use big-endian encoding to *not* filter on the very first bits of the blob ref, because
48+
// the verifier is iterating the blobs ordered on the blob ref. we don't want us to consecutively ignore a large
49+
// portion of the scan but instead we want to accept blobs uniformly over time
50+
blobRefUint32 := binary.BigEndian.Uint32(blobRef)
51+
return blobRefUint32&bitmask(bitCount) == value
52+
}
53+
}
54+
55+
// bitmask returns a mask with the n lowest bits set:
//
//	`1` => `0b1`
//	`3` => `0b111`
//	`8` => `0b11111111`
func bitmask(n uint8) uint32 {
	var mask uint32 = 1
	mask <<= n
	return mask - 1
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package stointegrityverifier
2+
3+
import (
4+
"testing"
5+
6+
"github.com/function61/gokit/assert"
7+
"github.com/function61/varasto/pkg/stotypes"
8+
)
9+
10+
func TestNoSampler(t *testing.T) {
11+
sample, err := CreateSampler(nil)
12+
assert.Ok(t, err)
13+
14+
assert.Assert(t, sample(stotypes.BlobRef{0x00, 0x00, 0x00, 0x00}) == true)
15+
assert.Assert(t, sample(stotypes.BlobRef{0xFF, 0xFF, 0xFF, 0xFF}) == true)
16+
}
17+
18+
func TestSampler(t *testing.T) {
19+
sampleSpec := "11"
20+
sample, err := CreateSampler(&sampleSpec)
21+
assert.Ok(t, err)
22+
23+
assert.Assert(t, sample(stotypes.BlobRef{0x00, 0x00, 0x00, 0x00}) == false)
24+
assert.Assert(t, sample(stotypes.BlobRef{0x00, 0x00, 0x00, 0b01}) == false)
25+
assert.Assert(t, sample(stotypes.BlobRef{0x00, 0x00, 0x00, 0b10}) == false)
26+
assert.Assert(t, sample(stotypes.BlobRef{0x00, 0x00, 0x00, 0b11}) == true)
27+
assert.Assert(t, sample(stotypes.BlobRef{0xFF, 0xFF, 0xFF, 0xFF}) == true)
28+
}
29+
30+
func TestInvalidSpec(t *testing.T) {
31+
sampleSpec := "wrong"
32+
_, err := CreateSampler(&sampleSpec)
33+
assert.EqualString(t, err.Error(), "invalid sampling spec. expected binary string like 01; got 'wrong'")
34+
}

pkg/stoserver/stointegrityverifier/controller.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,12 @@ func (c *Controller) resumeJobWorker(
197197
return nil
198198
}
199199

200+
// depending on the sampling function we may during this run visit 100 % of blobs, 25 % of blobs, and so on
201+
sample, err := CreateSampler(job.SampleSpecification)
202+
if err != nil {
203+
return err
204+
}
205+
200206
batchLimit := 1000
201207

202208
for {
@@ -232,11 +238,18 @@ func (c *Controller) resumeJobWorker(
232238
}
233239
}
234240

241+
sizeOnDisk := uint64(blob.SizeOnDisk)
242+
235243
blobExistsOnVolumeToVerify := slices.Contains(blob.Volumes, job.VolumeID)
236244
if !blobExistsOnVolumeToVerify {
237245
continue
238246
}
239247

248+
if !sample(blob.Ref) {
249+
job.BytesSkipped += sizeOnDisk
250+
continue
251+
}
252+
240253
bytesScanned, err := c.diskAccess.Scrub(blob.Ref, job.VolumeID)
241254
if err != nil {
242255
descr := fmt.Sprintf("blob %s: %v\n", blob.Ref.AsHex(), err)
@@ -251,7 +264,7 @@ func (c *Controller) resumeJobWorker(
251264
}
252265
}
253266

254-
job.BytesScanned += uint64(blob.SizeOnDisk)
267+
job.BytesScanned += sizeOnDisk
255268
}
256269

257270
if len(blobBatch) < batchLimit { // fewer blobs than requested, so there will be no more

pkg/stoserver/stoservertypes/commands.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
"title": "Verify data integrity",
2424
"fields": [
2525
{ "key": "Id", "type": "integer", "hideIfDefaultValue": true },
26-
{ "key": "Start", "title": "Start immediately", "type": "checkbox" }
26+
{ "key": "Start", "title": "Start immediately", "type": "checkbox" },
27+
{ "key": "Sampling", "title": "Instead of verifying all content, verify a subset by sampling", "placeholder": "00", "optional": true }
2728
]
2829
},
2930
{

pkg/stotypes/dbtypes.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,11 @@ type IntegrityVerificationJob struct {
145145
ID string
146146
Started time.Time
147147
Completed time.Time
148-
VolumeID int `msgpack:"VolumeId"`
148+
SampleSpecification *string // sample only a subset of items. half: `0`. if you later want to process the second half: `1`. if you want to process quarter: `00`. then the remaining quarters would be (`01`, `10` and `11`). 1 % (can be approximated as 1/128) would be `0000000`.
149+
VolumeID int `msgpack:"VolumeId"`
149150
LastCompletedBlobRef BlobRef
150151
BytesScanned uint64
152+
BytesSkipped uint64 // if sampling is in place, we'll be skipping some blobs
151153
ErrorsFound int
152154
Report string
153155
}

0 commit comments

Comments
 (0)