Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions blocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (bq *Blocking[T]) Iterator() <-chan T {
defer bq.lock.Unlock()

// use a buffered channel to avoid blocking the iterator.
iteratorCh := make(chan T, bq.size())
iteratorCh := make(chan T, len(bq.elems))

// close the channel when the function returns.
defer close(iteratorCh)
Expand Down Expand Up @@ -276,10 +276,6 @@ func (bq *Blocking[T]) isFull() bool {
return len(bq.elems) >= *bq.capacity
}

func (bq *Blocking[T]) size() int {
return len(bq.elems)
}

func (bq *Blocking[T]) get() (v T, _ error) {
if bq.isEmpty() {
return v, ErrNoElementsAvailable
Expand Down
37 changes: 26 additions & 11 deletions circular.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sync"
)

// Ensure Priority implements the Queue interface.
// Ensure Circular implements the Queue interface.
var _ Queue[any] = (*Circular[any])(nil)

// Circular is a Queue implementation.
Expand Down Expand Up @@ -189,18 +189,29 @@ func (q *Circular[T]) Contains(elem T) bool {
defer q.lock.RUnlock()

if q.isEmpty() {
return false // queue is empty, item not found
return false
}

for i := 0; i < q.size; i++ {
idx := (q.head + i) % len(q.elems)
// Walk head..end, then wrap to 0..tail. Avoids a modulo per
// iteration in the hot path.
firstChunk := len(q.elems) - q.head
if firstChunk > q.size {
firstChunk = q.size
}

if q.elems[idx] == elem {
return true // item found
for i := 0; i < firstChunk; i++ {
if q.elems[q.head+i] == elem {
return true
}
}

return false // item not found
for i := 0; i < q.size-firstChunk; i++ {
if q.elems[i] == elem {
return true
}
}

return false
}

// Peek returns the element at the head of the queue.
Expand Down Expand Up @@ -274,14 +285,18 @@ func (q *Circular[T]) MarshalJSON() ([]byte, error) {
return []byte("[]"), nil
}

// Collect elements in logical order from head to tail.
// Collect elements in logical order: head..end of array, then
// wrap to 0..tail. Two contiguous copies, no per-element modulo.
elements := make([]T, q.size)

for i := 0; i < q.size; i++ {
index := (q.head + i) % len(q.elems)
elements[i] = q.elems[index]
firstChunk := len(q.elems) - q.head
if firstChunk > q.size {
firstChunk = q.size
}

copy(elements, q.elems[q.head:q.head+firstChunk])
copy(elements[firstChunk:], q.elems[:q.size-firstChunk])

q.lock.RUnlock()

return json.Marshal(elements)
Expand Down
31 changes: 31 additions & 0 deletions circular_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,37 @@ func testCircularContains(t *testing.T) {
t.Fatal("expected elem to not be found")
}
})

t.Run("WrapsAroundBackingArray", func(t *testing.T) {
t.Parallel()

// Force the queue to span the array's wrap point so Contains
// has to walk both the head..end and 0..tail chunks.
circularQueue := queue.NewCircular([]int{1, 2, 3, 4}, 4)

// Pop two from the head, push two new at the tail. Logical
// queue is now [3, 4, 5, 6] but the backing array is
// [5, 6, 3, 4] with head=2, tail=2.
_, _ = circularQueue.Get()
_, _ = circularQueue.Get()

if err := circularQueue.Offer(5); err != nil {
t.Fatalf("offer 5: %v", err)
}

if err := circularQueue.Offer(6); err != nil {
t.Fatalf("offer 6: %v", err)
}

// 4 lives in the head chunk, 5 lives in the wrapped chunk.
if !circularQueue.Contains(4) {
t.Fatal("expected to find 4 in the head chunk")
}

if !circularQueue.Contains(5) {
t.Fatal("expected to find 5 in the wrapped chunk")
}
})
}

func testCircularClear(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions doc.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
// Package queue provides multiple thread-safe generic queue implementations.
// Currently, there are 2 available implementations:
// Currently, there are 4 available implementations:
//
// A blocking queue, which provides methods that wait for the
// queue to have available elements when attempting to retrieve an element, and
// waits for a free slot when attempting to insert an element.
//
// A priority queue based on a container.Heap. The elements in the queue
// must implement the Lesser interface, and are ordered based on the
// Less method. The head of the queue is always the highest priority element.
// A priority queue based on container/heap. Order is defined by a less
// function supplied at construction; the head of the queue is always the
// highest priority element.
//
// A circular queue, which is a queue that uses a fixed-size slice as
// if it were connected end-to-end. When the queue is full, adding a new element to the queue
Expand Down
13 changes: 11 additions & 2 deletions linked.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,15 +188,18 @@ func (lq *Linked[T]) IsEmpty() bool {
return lq.isEmpty()
}

// IsEmpty returns true if the queue is empty, false otherwise.
// isEmpty returns true if the queue is empty, false otherwise.
func (lq *Linked[T]) isEmpty() bool {
return lq.size == 0
}

// Iterator returns a channel that will be filled with the elements.
// It removes the elements from the queue.
func (lq *Linked[T]) Iterator() <-chan T {
elems := lq.Clear()
lq.lock.Lock()
defer lq.lock.Unlock()

elems := lq.drainLocked()

ch := make(chan T, len(elems))

Expand All @@ -214,6 +217,12 @@ func (lq *Linked[T]) Clear() []T {
lq.lock.Lock()
defer lq.lock.Unlock()

return lq.drainLocked()
}

// drainLocked collects all elements in order and resets the queue.
// Caller must hold the write lock.
func (lq *Linked[T]) drainLocked() []T {
elements := make([]T, lq.size)

current := lq.head
Expand Down
28 changes: 9 additions & 19 deletions priority.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,31 +272,21 @@ func (pq *Priority[T]) Size() int {
return pq.elements.Len()
}

// MarshalJSON serializes the Priority queue to JSON.
// MarshalJSON serializes the Priority queue to JSON in priority order.
func (pq *Priority[T]) MarshalJSON() ([]byte, error) {
pq.lock.RLock()

// Create a temporary copy of the heap to extract elements in order.
tempHeap := &priorityHeap[T]{
elems: make([]T, len(pq.elements.elems)),
lessFunc: pq.elements.lessFunc,
}

copy(tempHeap.elems, pq.elements.elems)
output := make([]T, len(pq.elements.elems))
copy(output, pq.elements.elems)
lessFunc := pq.elements.lessFunc

pq.lock.RUnlock()

heap.Init(tempHeap)

output := make([]T, len(tempHeap.elems))

i := 0

for tempHeap.Len() > 0 {
// nolint: forcetypeassert, revive
output[i] = heap.Pop(tempHeap).(T)
i++
}
// Sorting the copy gives the same result as draining a heap and is
// cache-friendlier than heap.Init + N heap.Pop calls.
sort.Slice(output, func(i, j int) bool {
return lessFunc(output[i], output[j])
})

return json.Marshal(output)
}
Loading