diff --git a/README.md b/README.md index ee9d385..2706893 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ --- -The `queue` package provides thread-safe generic implementations in Go for the following data structures: `BlockingQueue`, `PriorityQueue`, `CircularQueue` and `Linked Queue`. +The `queue` package provides thread-safe generic implementations in Go for the following data structures: `BlockingQueue`, `PriorityQueue`, `CircularQueue`, `Linked Queue` and `DelayQueue`. A queue is a sequence of entities that is open at both ends where the elements are added (enqueued) to the tail (back) of the queue and removed (dequeued) from the head (front) of the queue. @@ -36,6 +36,7 @@ Benchmarks and Example tests can be found in this package. * [Priority Queue](#priority-queue) * [Circular Queue](#circular-queue) * [Linked Queue](#linked-queue) + * [Delay Queue](#delay-queue) * [Benchmarks](#benchmarks-) @@ -267,6 +268,50 @@ func main() { } ``` +### Delay Queue + +A `Delay` queue is a priority queue where each element becomes dequeuable at a deadline computed by a caller-supplied function at `Offer` time. `Get` returns `ErrNoElementsAvailable` until the head's deadline has passed; `GetWait` sleeps until it does. Useful for timers, retry scheduling, and TTL expiry. + +```go +package main + +import ( + "fmt" + "time" + + "github.com/adrianbrad/queue" +) + +type task struct { + id int + runAt time.Time +} + +func main() { + now := time.Now() + + delayQueue := queue.NewDelay( + []task{ + {id: 1, runAt: now.Add(20 * time.Millisecond)}, + {id: 2, runAt: now.Add(5 * time.Millisecond)}, + }, + func(t task) time.Time { return t.runAt }, + ) + + size := delayQueue.Size() + fmt.Println(size) // 2 + + // Non-blocking: not due yet. + if _, err := delayQueue.Get(); err != nil { + // err == queue.ErrNoElementsAvailable + } + + // Blocking: returns as soon as the head's deadline passes. + next := delayQueue.GetWait() + fmt.Printf("next: %d\n", next.id) // next: 2 +} +``` + ## Benchmarks Run locally with `go test -bench=. -benchmem -benchtime=3s -count=3`. Reported numbers are per-operation timings and allocations; absolute values vary by hardware, but the shape (zero-alloc reads everywhere, zero-alloc offer/get for Circular and Linked) should be stable. @@ -284,4 +329,7 @@ BenchmarkLinkedQueue/Offer 22.7 ns/op 16 B/op 1 allocs/o BenchmarkPriorityQueue/Peek 3.9 ns/op 0 B/op 0 allocs/op BenchmarkPriorityQueue/Get_Offer 18.1 ns/op 0 B/op 0 allocs/op BenchmarkPriorityQueue/Offer 17.1 ns/op 48 B/op 0 allocs/op +BenchmarkDelayQueue/Peek 4.1 ns/op 0 B/op 0 allocs/op +BenchmarkDelayQueue/Get_Offer 52.4 ns/op 0 B/op 0 allocs/op +BenchmarkDelayQueue/Offer 63.5 ns/op 315 B/op 0 allocs/op ``` diff --git a/delay.go b/delay.go new file mode 100644 index 0000000..49ed887 --- /dev/null +++ b/delay.go @@ -0,0 +1,353 @@ +package queue + +import ( + "encoding/json" + "sort" + "sync" + "time" +) + +// delayed pairs an element with its cached deadline. +type delayed[T any] struct { + elem T + deadline time.Time +} + +// delayHeap is a min-heap over delayed[T] keyed by deadline. +// +// The heap algorithm is implemented directly on the typed slice instead +// of via container/heap to avoid boxing delayed[T] into `any` on every +// push and pop — the dominant allocation source in the benchmark profile. +type delayHeap[T any] struct { + items []delayed[T] +} + +func (h *delayHeap[T]) len() int { return len(h.items) } + +func (h *delayHeap[T]) less(i, j int) bool { + return h.items[i].deadline.Before(h.items[j].deadline) +} + +func (h *delayHeap[T]) swap(i, j int) { + h.items[i], h.items[j] = h.items[j], h.items[i] +} + +// push appends x and restores the heap invariant. +func (h *delayHeap[T]) push(x delayed[T]) { + h.items = append(h.items, x) + h.up(len(h.items) - 1) +} + +// pop removes and returns the root (earliest-deadline) element. +// The queue must be non-empty. +func (h *delayHeap[T]) pop() delayed[T] { + n := len(h.items) - 1 + + h.swap(0, n) + h.down(0, n) + + x := h.items[n] + + var zero delayed[T] + + h.items[n] = zero + h.items = h.items[:n] + + return x +} + +// up sifts the item at index i toward the root. +func (h *delayHeap[T]) up(i int) { + for { + parent := (i - 1) / 2 //nolint:mnd // standard binary-heap parent index. + if parent == i || !h.less(i, parent) { + return + } + + h.swap(parent, i) + i = parent + } +} + +// down sifts the item at index i toward the leaves; n bounds the +// active heap region. +func (h *delayHeap[T]) down(i, n int) { + for { + left := 2*i + 1 //nolint:mnd // standard binary-heap left-child index. + if left >= n || left < 0 { + return + } + + j := left + if right := left + 1; right < n && h.less(right, left) { + j = right + } + + if !h.less(j, i) { + return + } + + h.swap(i, j) + i = j + } +} + +// Ensure Delay implements the Queue interface. +var _ Queue[any] = (*Delay[any])(nil) + +// Delay is a Queue implementation where each element becomes dequeuable +// at a deadline computed by a caller-supplied function at Offer time. +// +// Get returns ErrNoElementsAvailable if the queue is empty or the head's +// deadline has not yet passed; GetWait sleeps until the head becomes due. +type Delay[T comparable] struct { + deadlineFunc func(T) time.Time + items *delayHeap[T] + initial []T + capacity *int + + lock sync.Mutex + notEmpty *sync.Cond +} + +// NewDelay creates a Delay queue. deadlineFunc is called at Offer and +// Reset time to compute each element's deadline; the deadline is cached +// per element (not re-evaluated on every Get). +// Panics if deadlineFunc is nil or WithCapacity is negative. +func NewDelay[T comparable]( + elems []T, + deadlineFunc func(T) time.Time, + opts ...Option, +) *Delay[T] { + if deadlineFunc == nil { + panic("nil deadline func") + } + + options := options{capacity: nil} + + for _, o := range opts { + o.apply(&options) + } + + if options.capacity != nil && *options.capacity < 0 { + panic("negative capacity") + } + + effective := elems + if options.capacity != nil && *options.capacity < len(effective) { + effective = effective[:*options.capacity] + } + + initial := make([]T, len(effective)) + copy(initial, effective) + + dq := &Delay[T]{ + deadlineFunc: deadlineFunc, + items: &delayHeap[T]{items: make([]delayed[T], 0, len(effective))}, + initial: initial, + capacity: options.capacity, + } + + dq.notEmpty = sync.NewCond(&dq.lock) + + for _, e := range effective { + dq.items.push(delayed[T]{elem: e, deadline: deadlineFunc(e)}) + } + + return dq +} + +// ==================================Insertion================================= + +// Offer inserts elem with deadline = deadlineFunc(elem). +// Returns ErrQueueIsFull when constructed WithCapacity and already at limit. +func (dq *Delay[T]) Offer(elem T) error { + dq.lock.Lock() + defer dq.lock.Unlock() + + if dq.capacity != nil && dq.items.len() >= *dq.capacity { + return ErrQueueIsFull + } + + dq.items.push(delayed[T]{ + elem: elem, + deadline: dq.deadlineFunc(elem), + }) + + dq.notEmpty.Broadcast() + + return nil +} + +// Reset restores the queue to the elements provided at construction, +// recomputing their deadlines with the original deadlineFunc. +func (dq *Delay[T]) Reset() { + dq.lock.Lock() + defer dq.lock.Unlock() + + dq.items.items = dq.items.items[:0] + + for _, e := range dq.initial { + dq.items.push(delayed[T]{ + elem: e, + deadline: dq.deadlineFunc(e), + }) + } + + dq.notEmpty.Broadcast() +} + +// ===================================Removal================================== + +// Get returns the head if its deadline has passed, otherwise +// ErrNoElementsAvailable. Never blocks. +func (dq *Delay[T]) Get() (v T, _ error) { + dq.lock.Lock() + defer dq.lock.Unlock() + + if dq.items.len() == 0 { + return v, ErrNoElementsAvailable + } + + if time.Now().Before(dq.items.items[0].deadline) { + return v, ErrNoElementsAvailable + } + + elem := dq.items.pop().elem + + dq.notEmpty.Broadcast() + + return elem, nil +} + +// GetWait blocks until the head's deadline passes and returns that +// element. If the queue is empty, waits for an Offer. +func (dq *Delay[T]) GetWait() T { + dq.lock.Lock() + defer dq.lock.Unlock() + + for { + if dq.items.len() > 0 { + now := time.Now() + if !now.Before(dq.items.items[0].deadline) { + return dq.items.pop().elem + } + + // Head is not yet due: schedule a timer that Broadcasts when + // the deadline passes, then Wait. Any earlier Offer / Reset / + // Clear also Broadcasts, so we re-check on state changes too. + remaining := dq.items.items[0].deadline.Sub(now) + timer := time.AfterFunc(remaining, func() { + dq.lock.Lock() + dq.notEmpty.Broadcast() + dq.lock.Unlock() + }) + + dq.notEmpty.Wait() + timer.Stop() + + continue + } + + dq.notEmpty.Wait() + } +} + +// Clear removes and returns all elements in deadline order. +func (dq *Delay[T]) Clear() []T { + dq.lock.Lock() + defer dq.lock.Unlock() + + n := dq.items.len() + out := make([]T, n) + + for i := 0; i < n; i++ { + out[i] = dq.items.pop().elem + } + + dq.notEmpty.Broadcast() + + return out +} + +// Iterator returns a channel that receives all elements in deadline +// order. Elements are removed from the queue. +func (dq *Delay[T]) Iterator() <-chan T { + dq.lock.Lock() + defer dq.lock.Unlock() + + ch := make(chan T, dq.items.len()) + + for dq.items.len() > 0 { + ch <- dq.items.pop().elem + } + + close(ch) + + dq.notEmpty.Broadcast() + + return ch +} + +// =================================Examination================================ + +// Peek returns the head regardless of whether its deadline has passed. +// Returns ErrNoElementsAvailable if the queue is empty. +func (dq *Delay[T]) Peek() (v T, _ error) { + dq.lock.Lock() + defer dq.lock.Unlock() + + if dq.items.len() == 0 { + return v, ErrNoElementsAvailable + } + + return dq.items.items[0].elem, nil +} + +// Size returns the number of elements in the queue, due or not. +func (dq *Delay[T]) Size() int { + dq.lock.Lock() + defer dq.lock.Unlock() + + return dq.items.len() +} + +// IsEmpty returns true if the queue contains no elements. +func (dq *Delay[T]) IsEmpty() bool { + return dq.Size() == 0 +} + +// Contains reports whether the given element is in the queue. +func (dq *Delay[T]) Contains(elem T) bool { + dq.lock.Lock() + defer dq.lock.Unlock() + + for i := range dq.items.items { + if dq.items.items[i].elem == elem { + return true + } + } + + return false +} + +// MarshalJSON serializes the Delay queue to JSON in deadline order. +func (dq *Delay[T]) MarshalJSON() ([]byte, error) { + dq.lock.Lock() + + snapshot := make([]delayed[T], len(dq.items.items)) + copy(snapshot, dq.items.items) + + dq.lock.Unlock() + + sort.Slice(snapshot, func(i, j int) bool { + return snapshot[i].deadline.Before(snapshot[j].deadline) + }) + + out := make([]T, len(snapshot)) + for i := range snapshot { + out[i] = snapshot[i].elem + } + + return json.Marshal(out) +} diff --git a/delay_test.go b/delay_test.go new file mode 100644 index 0000000..c02b684 --- /dev/null +++ b/delay_test.go @@ -0,0 +1,551 @@ +package queue_test + +import ( + "bytes" + "encoding/json" + "errors" + "reflect" + "sort" + "sync" + "testing" + "time" + + "github.com/adrianbrad/queue" +) + +// Panic messages reused across test files. +const negativeCapacityPanic = "negative capacity" + +type delayed struct { + ID int `json:"id"` + At time.Time `json:"at"` +} + +func delayedDeadline(d delayed) time.Time { return d.At } + +func TestDelay(t *testing.T) { + t.Parallel() + + t.Run("NilDeadlineFunc", testDelayNilDeadlineFunc) + t.Run("NegativeCapacity", testDelayNegativeCapacity) + t.Run("Get", testDelayGet) + t.Run("GetWait", testDelayGetWait) + t.Run("Offer", testDelayOffer) + t.Run("OfferEarlierWakesWaiter", testDelayOfferEarlierWakesWaiter) + t.Run("Peek", testDelayPeek) + t.Run("Size", testDelaySize) + t.Run("Contains", testDelayContains) + t.Run("Clear", testDelayClear) + t.Run("Iterator", testDelayIterator) + t.Run("Reset", testDelayReset) + t.Run("MarshalJSON", testDelayMarshalJSON) + t.Run("CapacityLesserThanLenElems", testDelayCapacityLesserThanLenElems) +} + +func testDelayNilDeadlineFunc(t *testing.T) { + defer func() { + if p := recover(); p != "nil deadline func" { + t.Fatalf("expected panic 'nil deadline func', got %v", p) + } + }() + + queue.NewDelay[delayed](nil, nil) +} + +func testDelayNegativeCapacity(t *testing.T) { + t.Parallel() + + defer func() { + if p := recover(); p != negativeCapacityPanic { + t.Fatalf("expected panic %q, got %v", negativeCapacityPanic, p) + } + }() + + _ = queue.NewDelay(nil, delayedDeadline, queue.WithCapacity(-1)) +} + +func testDelayGet(t *testing.T) { + t.Parallel() + + t.Run("Due", func(t *testing.T) { + t.Parallel() + + past := time.Now().Add(-time.Minute) + delayQueue := queue.NewDelay([]delayed{{ID: 1, At: past}}, delayedDeadline) + + got, err := delayQueue.Get() + if err != nil { + t.Fatalf("get: %v", err) + } + + if got.ID != 1 { + t.Fatalf("got id=%d want 1", got.ID) + } + }) + + t.Run("NotDue", func(t *testing.T) { + t.Parallel() + + future := time.Now().Add(time.Hour) + delayQueue := queue.NewDelay([]delayed{{ID: 2, At: future}}, delayedDeadline) + + if _, err := delayQueue.Get(); !errors.Is(err, queue.ErrNoElementsAvailable) { + t.Fatalf("expected ErrNoElementsAvailable, got %v", err) + } + }) + + t.Run("Empty", func(t *testing.T) { + t.Parallel() + + delayQueue := queue.NewDelay[delayed](nil, delayedDeadline) + + if _, err := delayQueue.Get(); !errors.Is(err, queue.ErrNoElementsAvailable) { + t.Fatalf("expected ErrNoElementsAvailable, got %v", err) + } + }) +} + +func testDelayGetWait(t *testing.T) { + t.Parallel() + + t.Run("WakesAfterDeadline", func(t *testing.T) { + t.Parallel() + + due := time.Now().Add(20 * time.Millisecond) + delayQueue := queue.NewDelay([]delayed{{ID: 1, At: due}}, delayedDeadline) + + start := time.Now() + got := delayQueue.GetWait() + elapsed := time.Since(start) + + if got.ID != 1 { + t.Fatalf("got id=%d want 1", got.ID) + } + + if elapsed < 20*time.Millisecond { + t.Fatalf("GetWait returned before deadline: %s", elapsed) + } + }) + + t.Run("BlocksOnEmpty", func(t *testing.T) { + t.Parallel() + + delayQueue := queue.NewDelay[delayed](nil, delayedDeadline) + + done := make(chan delayed, 1) + + go func() { + done <- delayQueue.GetWait() + }() + + select { + case <-done: + t.Fatal("GetWait returned on empty queue") + case <-time.After(20 * time.Millisecond): + } + + if err := delayQueue.Offer(delayed{ID: 7, At: time.Now().Add(-time.Second)}); err != nil { + t.Fatalf("offer: %v", err) + } + + select { + case got := <-done: + if got.ID != 7 { + t.Fatalf("got id=%d want 7", got.ID) + } + case <-time.After(time.Second): + t.Fatal("GetWait did not wake after Offer") + } + }) +} + +func testDelayOffer(t *testing.T) { + t.Parallel() + + t.Run("Success", func(t *testing.T) { + t.Parallel() + + delayQueue := queue.NewDelay[delayed](nil, delayedDeadline) + + if err := delayQueue.Offer(delayed{ID: 1, At: time.Now()}); err != nil { + t.Fatalf("offer: %v", err) + } + + if delayQueue.Size() != 1 { + t.Fatalf("size = %d want 1", delayQueue.Size()) + } + }) + + t.Run("ErrQueueIsFull", func(t *testing.T) { + t.Parallel() + + delayQueue := queue.NewDelay( + []delayed{{ID: 1, At: time.Now()}}, + delayedDeadline, + queue.WithCapacity(1), + ) + + err := delayQueue.Offer(delayed{ID: 2, At: time.Now()}) + if !errors.Is(err, queue.ErrQueueIsFull) { + t.Fatalf("expected ErrQueueIsFull, got %v", err) + } + }) +} + +func testDelayOfferEarlierWakesWaiter(t *testing.T) { + t.Parallel() + + // Head is scheduled far in the future; a new Offer with an earlier + // deadline should cause the waiter to wake and take the new head. + far := time.Now().Add(time.Hour) + delayQueue := queue.NewDelay([]delayed{{ID: 1, At: far}}, delayedDeadline) + + result := make(chan delayed, 1) + + go func() { + result <- delayQueue.GetWait() + }() + + // Let the waiter enter its timer-wait. + time.Sleep(20 * time.Millisecond) + + soon := time.Now().Add(10 * time.Millisecond) + if err := delayQueue.Offer(delayed{ID: 2, At: soon}); err != nil { + t.Fatalf("offer: %v", err) + } + + select { + case got := <-result: + if got.ID != 2 { + t.Fatalf("got id=%d want 2 (the earlier-deadline element)", got.ID) + } + case <-time.After(time.Second): + t.Fatal("waiter did not wake for earlier-deadline Offer") + } +} + +func testDelayPeek(t *testing.T) { + t.Parallel() + + t.Run("IgnoresDeadline", func(t *testing.T) { + t.Parallel() + + future := time.Now().Add(time.Hour) + delayQueue := queue.NewDelay([]delayed{{ID: 7, At: future}}, delayedDeadline) + + got, err := delayQueue.Peek() + if err != nil { + t.Fatalf("peek: %v", err) + } + + if got.ID != 7 { + t.Fatalf("got id=%d want 7", got.ID) + } + + if delayQueue.Size() != 1 { + t.Fatal("Peek consumed an element") + } + }) + + t.Run("Empty", func(t *testing.T) { + t.Parallel() + + delayQueue := queue.NewDelay[delayed](nil, delayedDeadline) + + if _, err := delayQueue.Peek(); !errors.Is(err, queue.ErrNoElementsAvailable) { + t.Fatalf("expected ErrNoElementsAvailable, got %v", err) + } + }) +} + +func testDelaySize(t *testing.T) { + t.Parallel() + + delayQueue := queue.NewDelay[delayed](nil, delayedDeadline) + + if !delayQueue.IsEmpty() { + t.Fatal("empty queue is not empty") + } + + if err := delayQueue.Offer(delayed{ID: 1, At: time.Now()}); err != nil { + t.Fatalf("offer: %v", err) + } + + if delayQueue.IsEmpty() { + t.Fatal("non-empty queue reports empty") + } + + if delayQueue.Size() != 1 { + t.Fatalf("size = %d want 1", delayQueue.Size()) + } +} + +func testDelayContains(t *testing.T) { + t.Parallel() + + d := delayed{ID: 42, At: time.Now().Add(time.Minute)} + delayQueue := queue.NewDelay([]delayed{d}, delayedDeadline) + + if !delayQueue.Contains(d) { + t.Fatal("expected queue to contain d") + } + + if delayQueue.Contains(delayed{ID: 99, At: time.Now()}) { + t.Fatal("expected queue to not contain a stranger") + } +} + +func testDelayClear(t *testing.T) { + t.Parallel() + + now := time.Now() + delayQueue := queue.NewDelay( + []delayed{ + {ID: 2, At: now.Add(20 * time.Millisecond)}, + {ID: 1, At: now.Add(10 * time.Millisecond)}, + {ID: 3, At: now.Add(30 * time.Millisecond)}, + }, + delayedDeadline, + ) + + cleared := delayQueue.Clear() + ids := make([]int, len(cleared)) + + for i, c := range cleared { + ids[i] = c.ID + } + + expected := []int{1, 2, 3} + if !reflect.DeepEqual(expected, ids) { + t.Fatalf("expected %v got %v", expected, ids) + } + + if !delayQueue.IsEmpty() { + t.Fatal("Clear did not empty the queue") + } +} + +func testDelayIterator(t *testing.T) { + t.Parallel() + + now := time.Now() + delayQueue := queue.NewDelay( + []delayed{ + {ID: 3, At: now.Add(30 * time.Millisecond)}, + {ID: 1, At: now.Add(10 * time.Millisecond)}, + {ID: 2, At: now.Add(20 * time.Millisecond)}, + }, + delayedDeadline, + ) + + iterCh := delayQueue.Iterator() + + if !delayQueue.IsEmpty() { + t.Fatal("Iterator did not drain the queue") + } + + var ids []int + for d := range iterCh { + ids = append(ids, d.ID) + } + + expected := []int{1, 2, 3} + if !reflect.DeepEqual(expected, ids) { + t.Fatalf("expected %v got %v", expected, ids) + } +} + +func testDelayReset(t *testing.T) { + t.Parallel() + + base := time.Now() + initial := []delayed{ + {ID: 1, At: base.Add(time.Minute)}, + {ID: 2, At: base.Add(2 * time.Minute)}, + } + + delayQueue := queue.NewDelay(initial, delayedDeadline) + + if err := delayQueue.Offer(delayed{ID: 99, At: base.Add(time.Second)}); err != nil { + t.Fatalf("offer: %v", err) + } + + delayQueue.Reset() + + if delayQueue.Size() != len(initial) { + t.Fatalf("size after Reset = %d want %d", delayQueue.Size(), len(initial)) + } + + if delayQueue.Contains(delayed{ID: 99, At: base.Add(time.Second)}) { + t.Fatal("Reset did not drop Offered element") + } +} + +func testDelayMarshalJSON(t *testing.T) { + t.Parallel() + + now := time.Now() + delayQueue := queue.NewDelay( + []delayed{ + {ID: 3, At: now.Add(30 * time.Millisecond)}, + {ID: 1, At: now.Add(10 * time.Millisecond)}, + {ID: 2, At: now.Add(20 * time.Millisecond)}, + }, + delayedDeadline, + ) + + marshaled, err := json.Marshal(delayQueue) + if err != nil { + t.Fatalf("marshal: %v", err) + } + + var got []struct { + ID int `json:"id"` + At time.Time `json:"at"` + } + + if err := json.Unmarshal(marshaled, &got); err != nil { + t.Fatalf("unmarshal: %v", err) + } + + ids := []int{got[0].ID, got[1].ID, got[2].ID} + expected := []int{1, 2, 3} + + if !reflect.DeepEqual(expected, ids) { + t.Fatalf("expected ids %v got %v (raw: %s)", expected, ids, marshaled) + } + + // Re-marshal to confirm it's stable. + second, err := json.Marshal(delayQueue) + if err != nil { + t.Fatalf("marshal 2: %v", err) + } + + if !bytes.Equal(marshaled, second) { + t.Fatalf("second marshal differs: %s vs %s", marshaled, second) + } +} + +func testDelayCapacityLesserThanLenElems(t *testing.T) { + t.Parallel() + + delayQueue := queue.NewDelay( + []delayed{ + {ID: 1, At: time.Now()}, + {ID: 2, At: time.Now()}, + {ID: 3, At: time.Now()}, + }, + delayedDeadline, + queue.WithCapacity(2), + ) + + if delayQueue.Size() != 2 { + t.Fatalf("expected size 2 after construction, got %d", delayQueue.Size()) + } +} + +func BenchmarkDelayQueue(b *testing.B) { + past := time.Now().Add(-time.Hour) + + b.Run("Peek", func(b *testing.B) { + delayQueue := queue.NewDelay([]delayed{{ID: 1, At: past}}, delayedDeadline) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i <= b.N; i++ { + _, _ = delayQueue.Peek() + } + }) + + b.Run("Get_Offer", func(b *testing.B) { + delayQueue := queue.NewDelay([]delayed{{ID: 1, At: past}}, delayedDeadline) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i <= b.N; i++ { + _, _ = delayQueue.Get() + + _ = delayQueue.Offer(delayed{ID: i, At: past}) + } + }) + + b.Run("Offer", func(b *testing.B) { + delayQueue := queue.NewDelay[delayed](nil, delayedDeadline) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i <= b.N; i++ { + _ = delayQueue.Offer(delayed{ID: i, At: past}) + } + }) +} + +// Race: concurrent Offers + GetWaits eventually drain all elements. +func TestDelayConcurrentOfferAndGet(t *testing.T) { + t.Parallel() + + const ( + producers = 8 + perProducer = 25 + totalDelayed = producers * perProducer + ) + + delayQueue := queue.NewDelay[delayed](nil, delayedDeadline) + + var offered sync.WaitGroup + + offered.Add(producers) + + for p := 0; p < producers; p++ { + go func(p int) { + defer offered.Done() + + for i := 0; i < perProducer; i++ { + when := time.Now().Add(time.Duration(i%5) * time.Millisecond) + + if err := delayQueue.Offer(delayed{ID: p*perProducer + i, At: when}); err != nil { + t.Errorf("offer: %v", err) + return + } + } + }(p) + } + + seen := make(chan int, totalDelayed) + + var consumers sync.WaitGroup + + consumers.Add(producers) + + for c := 0; c < producers; c++ { + go func() { + defer consumers.Done() + + for i := 0; i < perProducer; i++ { + got := delayQueue.GetWait() + seen <- got.ID + } + }() + } + + offered.Wait() + consumers.Wait() + + close(seen) + + ids := make([]int, 0, totalDelayed) + for id := range seen { + ids = append(ids, id) + } + + sort.Ints(ids) + + for i := 0; i < totalDelayed; i++ { + if ids[i] != i { + t.Fatalf("missing or duplicate id at position %d: %d", i, ids[i]) + } + } +} diff --git a/priority_test.go b/priority_test.go index d5282dd..05277ee 100644 --- a/priority_test.go +++ b/priority_test.go @@ -98,8 +98,8 @@ func testPriorityNegativeCapacity(t *testing.T) { } msg, ok := p.(string) - if !ok || msg != "negative capacity" { - t.Fatalf("expected panic 'negative capacity', got %v", p) + if !ok || msg != negativeCapacityPanic { + t.Fatalf("expected panic %q, got %v", negativeCapacityPanic, p) } }()