diff --git a/blocking.go b/blocking.go index 2335f8e..d93b6c0 100644 --- a/blocking.go +++ b/blocking.go @@ -181,7 +181,7 @@ func (bq *Blocking[T]) Iterator() <-chan T { defer bq.lock.Unlock() // use a buffered channel to avoid blocking the iterator. - iteratorCh := make(chan T, bq.size()) + iteratorCh := make(chan T, len(bq.elems)) // close the channel when the function returns. defer close(iteratorCh) @@ -276,10 +276,6 @@ func (bq *Blocking[T]) isFull() bool { return len(bq.elems) >= *bq.capacity } -func (bq *Blocking[T]) size() int { - return len(bq.elems) -} - func (bq *Blocking[T]) get() (v T, _ error) { if bq.isEmpty() { return v, ErrNoElementsAvailable diff --git a/circular.go b/circular.go index 8af2fa4..9e2a307 100644 --- a/circular.go +++ b/circular.go @@ -5,7 +5,7 @@ import ( "sync" ) -// Ensure Priority implements the Queue interface. +// Ensure Circular implements the Queue interface. var _ Queue[any] = (*Circular[any])(nil) // Circular is a Queue implementation. @@ -189,18 +189,29 @@ func (q *Circular[T]) Contains(elem T) bool { defer q.lock.RUnlock() if q.isEmpty() { - return false // queue is empty, item not found + return false } - for i := 0; i < q.size; i++ { - idx := (q.head + i) % len(q.elems) + // Walk head..end, then wrap to 0..tail. Avoids a modulo per + // iteration in the hot path. + firstChunk := len(q.elems) - q.head + if firstChunk > q.size { + firstChunk = q.size + } - if q.elems[idx] == elem { - return true // item found + for i := 0; i < firstChunk; i++ { + if q.elems[q.head+i] == elem { + return true } } - return false // item not found + for i := 0; i < q.size-firstChunk; i++ { + if q.elems[i] == elem { + return true + } + } + + return false } // Peek returns the element at the head of the queue. @@ -274,14 +285,18 @@ func (q *Circular[T]) MarshalJSON() ([]byte, error) { return []byte("[]"), nil } - // Collect elements in logical order from head to tail. + // Collect elements in logical order: head..end of array, then + // wrap to 0..tail. Two contiguous copies, no per-element modulo. elements := make([]T, q.size) - for i := 0; i < q.size; i++ { - index := (q.head + i) % len(q.elems) - elements[i] = q.elems[index] + firstChunk := len(q.elems) - q.head + if firstChunk > q.size { + firstChunk = q.size } + copy(elements, q.elems[q.head:q.head+firstChunk]) + copy(elements[firstChunk:], q.elems[:q.size-firstChunk]) + q.lock.RUnlock() return json.Marshal(elements) diff --git a/circular_test.go b/circular_test.go index d551bc5..35a3825 100644 --- a/circular_test.go +++ b/circular_test.go @@ -316,6 +316,37 @@ func testCircularContains(t *testing.T) { t.Fatal("expected elem to not be found") } }) + + t.Run("WrapsAroundBackingArray", func(t *testing.T) { + t.Parallel() + + // Force the queue to span the array's wrap point so Contains + // has to walk both the head..end and 0..tail chunks. + circularQueue := queue.NewCircular([]int{1, 2, 3, 4}, 4) + + // Pop two from the head, push two new at the tail. Logical + // queue is now [3, 4, 5, 6] but the backing array is + // [5, 6, 3, 4] with head=2, tail=2. + _, _ = circularQueue.Get() + _, _ = circularQueue.Get() + + if err := circularQueue.Offer(5); err != nil { + t.Fatalf("offer 5: %v", err) + } + + if err := circularQueue.Offer(6); err != nil { + t.Fatalf("offer 6: %v", err) + } + + // 4 lives in the head chunk, 5 lives in the wrapped chunk. + if !circularQueue.Contains(4) { + t.Fatal("expected to find 4 in the head chunk") + } + + if !circularQueue.Contains(5) { + t.Fatal("expected to find 5 in the wrapped chunk") + } + }) } func testCircularClear(t *testing.T) { diff --git a/doc.go b/doc.go index aa34e48..a284e34 100644 --- a/doc.go +++ b/doc.go @@ -1,13 +1,13 @@ // Package queue provides multiple thread-safe generic queue implementations. -// Currently, there are 2 available implementations: +// Currently, there are 4 available implementations: // // A blocking queue, which provides methods that wait for the // queue to have available elements when attempting to retrieve an element, and // waits for a free slot when attempting to insert an element. // -// A priority queue based on a container.Heap. The elements in the queue -// must implement the Lesser interface, and are ordered based on the -// Less method. The head of the queue is always the highest priority element. +// A priority queue based on container/heap. Order is defined by a less +// function supplied at construction; the head of the queue is always the +// highest priority element. // // A circular queue, which is a queue that uses a fixed-size slice as // if it were connected end-to-end. When the queue is full, adding a new element to the queue diff --git a/linked.go b/linked.go index fa02972..8d0f045 100644 --- a/linked.go +++ b/linked.go @@ -188,7 +188,7 @@ func (lq *Linked[T]) IsEmpty() bool { return lq.isEmpty() } -// IsEmpty returns true if the queue is empty, false otherwise. +// isEmpty returns true if the queue is empty, false otherwise. func (lq *Linked[T]) isEmpty() bool { return lq.size == 0 } @@ -196,7 +196,10 @@ func (lq *Linked[T]) isEmpty() bool { // Iterator returns a channel that will be filled with the elements. // It removes the elements from the queue. func (lq *Linked[T]) Iterator() <-chan T { - elems := lq.Clear() + lq.lock.Lock() + defer lq.lock.Unlock() + + elems := lq.drainLocked() ch := make(chan T, len(elems)) @@ -214,6 +217,12 @@ func (lq *Linked[T]) Clear() []T { lq.lock.Lock() defer lq.lock.Unlock() + return lq.drainLocked() +} + +// drainLocked collects all elements in order and resets the queue. +// Caller must hold the write lock. +func (lq *Linked[T]) drainLocked() []T { elements := make([]T, lq.size) current := lq.head diff --git a/priority.go b/priority.go index f601944..2b51202 100644 --- a/priority.go +++ b/priority.go @@ -272,31 +272,21 @@ func (pq *Priority[T]) Size() int { return pq.elements.Len() } -// MarshalJSON serializes the Priority queue to JSON. +// MarshalJSON serializes the Priority queue to JSON in priority order. func (pq *Priority[T]) MarshalJSON() ([]byte, error) { pq.lock.RLock() - // Create a temporary copy of the heap to extract elements in order. - tempHeap := &priorityHeap[T]{ - elems: make([]T, len(pq.elements.elems)), - lessFunc: pq.elements.lessFunc, - } - - copy(tempHeap.elems, pq.elements.elems) + output := make([]T, len(pq.elements.elems)) + copy(output, pq.elements.elems) + lessFunc := pq.elements.lessFunc pq.lock.RUnlock() - heap.Init(tempHeap) - - output := make([]T, len(tempHeap.elems)) - - i := 0 - - for tempHeap.Len() > 0 { - // nolint: forcetypeassert, revive - output[i] = heap.Pop(tempHeap).(T) - i++ - } + // Sorting the copy gives the same result as draining a heap and is + // cache-friendlier than heap.Init + N heap.Pop calls. + sort.Slice(output, func(i, j int) bool { + return lessFunc(output[i], output[j]) + }) return json.Marshal(output) }