-
Notifications
You must be signed in to change notification settings - Fork 12
feat: add WsAllowOutOfOrder config #68
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: v2-preview
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,59 @@ | ||
| package streams | ||
|
|
||
| const seenBufferSize = 32 | ||
|
|
||
| type Verdict int | ||
|
|
||
| const ( | ||
| Accept Verdict = iota | ||
| Duplicate | ||
| OutOfOrder | ||
| ) | ||
|
|
||
| type feedState struct { | ||
| watermark int64 | ||
| ring [seenBufferSize]int64 | ||
| set map[int64]struct{} | ||
| cursor int | ||
| count int | ||
| } | ||
|
|
||
| type FeedDeduplicator struct { | ||
| feeds map[string]*feedState | ||
| } | ||
|
|
||
| func NewFeedDeduplicator() *FeedDeduplicator { | ||
| return &FeedDeduplicator{feeds: make(map[string]*feedState)} | ||
| } | ||
|
|
||
| func (d *FeedDeduplicator) Check(feedID string, ts int64) Verdict { | ||
| fs := d.feeds[feedID] | ||
| if fs == nil { | ||
| fs = &feedState{set: make(map[int64]struct{}, seenBufferSize)} | ||
| d.feeds[feedID] = fs | ||
| } | ||
|
Comment on lines
+30
to
+34
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would mutex-lock this change. Actually, let's
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This might be unnecessary because of this section: but it will still be cleaner and will avoid potential bugs if some decides to optimise that part by removing the locking. |
||
|
|
||
| if _, dup := fs.set[ts]; dup { | ||
| return Duplicate | ||
| } | ||
|
|
||
| if fs.count == seenBufferSize { | ||
| evict := fs.ring[fs.cursor] | ||
| delete(fs.set, evict) | ||
| } else { | ||
| fs.count++ | ||
| } | ||
| fs.ring[fs.cursor] = ts | ||
| fs.set[ts] = struct{}{} | ||
| fs.cursor = (fs.cursor + 1) % seenBufferSize | ||
|
|
||
| isOutOfOrder := fs.watermark > 0 && ts < fs.watermark | ||
| if ts > fs.watermark { | ||
| fs.watermark = ts | ||
| } | ||
|
Comment on lines
+51
to
+53
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: Let's move this block right under the |
||
|
|
||
| if isOutOfOrder { | ||
| return OutOfOrder | ||
| } | ||
| return Accept | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| package streams | ||
|
|
||
| import "testing" | ||
|
|
||
| func TestFeedDeduplicator_Accept(t *testing.T) { | ||
| d := NewFeedDeduplicator() | ||
| if v := d.Check("feed-1", 100); v != Accept { | ||
| t.Fatalf("expected Accept, got %d", v) | ||
| } | ||
| } | ||
|
|
||
| func TestFeedDeduplicator_Duplicate(t *testing.T) { | ||
| d := NewFeedDeduplicator() | ||
| d.Check("feed-1", 100) | ||
| if v := d.Check("feed-1", 100); v != Duplicate { | ||
| t.Fatalf("expected Duplicate, got %d", v) | ||
| } | ||
| } | ||
|
|
||
| func TestFeedDeduplicator_OutOfOrder(t *testing.T) { | ||
| d := NewFeedDeduplicator() | ||
| d.Check("feed-1", 200) | ||
| if v := d.Check("feed-1", 100); v != OutOfOrder { | ||
| t.Fatalf("expected OutOfOrder, got %d", v) | ||
| } | ||
| } | ||
|
|
||
| func TestFeedDeduplicator_OutOfOrderNotDuplicate(t *testing.T) { | ||
| d := NewFeedDeduplicator() | ||
| d.Check("feed-1", 200) | ||
| v := d.Check("feed-1", 100) | ||
| if v != OutOfOrder { | ||
| t.Fatalf("expected OutOfOrder for first OOO delivery, got %d", v) | ||
| } | ||
| if v := d.Check("feed-1", 100); v != Duplicate { | ||
| t.Fatalf("expected Duplicate for second OOO delivery, got %d", v) | ||
| } | ||
| } | ||
|
|
||
| func TestFeedDeduplicator_FIFOEviction(t *testing.T) { | ||
| d := NewFeedDeduplicator() | ||
| for i := int64(1); i <= seenBufferSize; i++ { | ||
| d.Check("feed-1", i) | ||
| } | ||
| d.Check("feed-1", 33) | ||
| // ts=2 (second inserted) should still be in the buffer | ||
| if v := d.Check("feed-1", 2); v != Duplicate { | ||
| t.Fatalf("expected ts=2 still in buffer, got %d", v) | ||
| } | ||
| // ts=1 (first inserted) was evicted by ts=33 | ||
| if v := d.Check("feed-1", 1); v == Duplicate { | ||
| t.Fatal("expected ts=1 to be evicted (FIFO), but got Duplicate") | ||
| } | ||
| } | ||
|
|
||
| func TestFeedDeduplicator_FIFOEvictsOldestInsertedNotSmallest(t *testing.T) { | ||
| d := NewFeedDeduplicator() | ||
| // Insert out of order: 100, 1, 2, 3, ..., 31 (total 32 entries) | ||
| d.Check("feed-1", 100) | ||
| for i := int64(1); i <= seenBufferSize-1; i++ { | ||
| d.Check("feed-1", i) | ||
| } | ||
| // Buffer is full. ts=100 was inserted first (oldest by insertion). | ||
| // Adding ts=999 should evict ts=100, NOT ts=1 (the smallest value). | ||
| d.Check("feed-1", 999) | ||
| // ts=1 should still be present (smallest value, but NOT oldest inserted) | ||
| if v := d.Check("feed-1", 1); v != Duplicate { | ||
| t.Fatalf("expected ts=1 (smallest value, but not oldest inserted) to remain, got %d", v) | ||
| } | ||
| // ts=100 should have been evicted (oldest inserted) | ||
| if v := d.Check("feed-1", 100); v == Duplicate { | ||
| t.Fatal("expected ts=100 (oldest inserted) to be evicted, but got Duplicate") | ||
| } | ||
| } | ||
|
|
||
| func TestFeedDeduplicator_IndependentFeeds(t *testing.T) { | ||
| d := NewFeedDeduplicator() | ||
| d.Check("feed-a", 100) | ||
| d.Check("feed-b", 100) | ||
|
|
||
| if v := d.Check("feed-a", 100); v != Duplicate { | ||
| t.Fatalf("expected Duplicate for feed-a, got %d", v) | ||
| } | ||
| if v := d.Check("feed-b", 100); v != Duplicate { | ||
| t.Fatalf("expected Duplicate for feed-b, got %d", v) | ||
| } | ||
| // Different feed, same ts is not a duplicate | ||
| if v := d.Check("feed-c", 100); v != Accept { | ||
| t.Fatalf("expected Accept for new feed-c, got %d", v) | ||
| } | ||
| } | ||
|
|
||
| func TestFeedDeduplicator_WatermarkZeroNotOutOfOrder(t *testing.T) { | ||
| d := NewFeedDeduplicator() | ||
| // First report at ts=0 should be Accept, not OutOfOrder | ||
| if v := d.Check("feed-1", 0); v != Accept { | ||
| t.Fatalf("expected Accept for first report at ts=0, got %d", v) | ||
| } | ||
| } | ||
|
|
||
| func TestFeedDeduplicator_HADuplicateAfterWatermarkAdvance(t *testing.T) { | ||
| d := NewFeedDeduplicator() | ||
| d.Check("feed-1", 100) // Accept | ||
| d.Check("feed-1", 200) // Accept, watermark -> 200 | ||
| // HA duplicate of ts=100 arrives from second connection | ||
| if v := d.Check("feed-1", 100); v != Duplicate { | ||
| t.Fatalf("expected Duplicate for HA retransmit, got %d", v) | ||
| } | ||
| } |
Uh oh!
There was an error while loading. Please reload this page.