seaweedfs/weed/util/buffered_queue/buffered_queue.go
2024-05-05 12:20:08 -07:00

157 lines
3.4 KiB
Go

package buffered_queue
import (
"fmt"
"sync"
)
// ItemChunkNode represents a node in the linked list of job chunks
type ItemChunkNode[T any] struct {
items []T
headIndex int
tailIndex int
next *ItemChunkNode[T]
nodeId int
}
// BufferedQueue implements a buffered queue using a linked list of job chunks
type BufferedQueue[T any] struct {
chunkSize int // Maximum number of items per chunk
head *ItemChunkNode[T]
tail *ItemChunkNode[T]
last *ItemChunkNode[T] // Pointer to the last chunk, for reclaiming memory
count int // Total number of items in the queue
mutex sync.Mutex
nodeCounter int
waitCond *sync.Cond
isClosed bool
}
// NewBufferedQueue creates a new buffered queue with the specified chunk size
func NewBufferedQueue[T any](chunkSize int) *BufferedQueue[T] {
// Create an empty chunk to initialize head and tail
chunk := &ItemChunkNode[T]{items: make([]T, chunkSize), nodeId: 0}
bq := &BufferedQueue[T]{
chunkSize: chunkSize,
head: chunk,
tail: chunk,
last: chunk,
count: 0,
mutex: sync.Mutex{},
}
bq.waitCond = sync.NewCond(&bq.mutex)
return bq
}
// Enqueue adds a job to the queue
func (q *BufferedQueue[T]) Enqueue(job T) error {
if q.isClosed {
return fmt.Errorf("queue is closed")
}
q.mutex.Lock()
defer q.mutex.Unlock()
// If the tail chunk is full, create a new chunk (reusing empty chunks if available)
if q.tail.tailIndex == q.chunkSize {
if q.tail == q.last {
// Create a new chunk
q.nodeCounter++
newChunk := &ItemChunkNode[T]{items: make([]T, q.chunkSize), nodeId: q.nodeCounter}
q.tail.next = newChunk
q.tail = newChunk
q.last = newChunk
} else {
// Reuse an empty chunk
q.tail = q.tail.next
q.tail.headIndex = 0
q.tail.tailIndex = 0
// println("tail moved to chunk", q.tail.nodeId)
}
}
// Add the job to the tail chunk
q.tail.items[q.tail.tailIndex] = job
q.tail.tailIndex++
q.count++
if q.count == 1 {
q.waitCond.Signal()
}
return nil
}
// Dequeue removes and returns a job from the queue
func (q *BufferedQueue[T]) Dequeue() (T, bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
for q.count <= 0 && !q.isClosed {
q.waitCond.Wait()
}
if q.count <= 0 && q.isClosed {
var a T
return a, false
}
q.maybeAdjustHeadIndex()
job := q.head.items[q.head.headIndex]
q.head.headIndex++
q.count--
return job, true
}
func (q *BufferedQueue[T]) maybeAdjustHeadIndex() {
if q.head.headIndex == q.chunkSize {
q.last.next = q.head
q.head = q.head.next
q.last = q.last.next
q.last.next = nil
//println("reusing chunk", q.last.nodeId)
//fmt.Printf("head: %+v\n", q.head)
//fmt.Printf("tail: %+v\n", q.tail)
//fmt.Printf("last: %+v\n", q.last)
//fmt.Printf("count: %d\n", q.count)
//for p := q.head; p != nil ; p = p.next {
// fmt.Printf("Node: %+v\n", p)
//}
}
}
func (q *BufferedQueue[T]) PeekHead() (T, bool) {
q.mutex.Lock()
defer q.mutex.Unlock()
if q.count <= 0 {
var a T
return a, false
}
q.maybeAdjustHeadIndex()
job := q.head.items[q.head.headIndex]
return job, true
}
// Size returns the number of items in the queue
func (q *BufferedQueue[T]) Size() int {
q.mutex.Lock()
defer q.mutex.Unlock()
return q.count
}
// IsEmpty returns true if the queue is empty
func (q *BufferedQueue[T]) IsEmpty() bool {
return q.Size() == 0
}
func (q *BufferedQueue[T]) CloseInput() {
q.mutex.Lock()
defer q.mutex.Unlock()
q.isClosed = true
q.waitCond.Broadcast()
}