// Copyright 2019 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
|
|
|
|
package sync
|
|
|
|
import (
|
|
"sync/atomic"
|
|
"unsafe"
|
|
)
|
|
|
|
// poolDequeue is a lock-free fixed-size single-producer,
|
|
// multi-consumer queue. The single producer can both push and pop
|
|
// from the head, and consumers can pop from the tail.
|
|
//
|
|
// It has the added feature that it nils out unused slots to avoid
|
|
// unnecessary retention of objects. This is important for sync.Pool,
|
|
// but not typically a property considered in the literature.
|
|
type poolDequeue struct {
|
|
// headTail packs together a 32-bit head index and a 32-bit
|
|
// tail index. Both are indexes into vals modulo len(vals)-1.
|
|
//
|
|
// tail = index of oldest data in queue
|
|
// head = index of next slot to fill
|
|
//
|
|
// Slots in the range [tail, head) are owned by consumers.
|
|
// A consumer continues to own a slot outside this range until
|
|
// it nils the slot, at which point ownership passes to the
|
|
// producer.
|
|
//
|
|
// The head index is stored in the most-significant bits so
|
|
// that we can atomically add to it and the overflow is
|
|
// harmless.
|
|
headTail uint64
|
|
|
|
// vals is a ring buffer of interface{} values stored in this
|
|
// dequeue. The size of this must be a power of 2.
|
|
//
|
|
// vals[i].typ is nil if the slot is empty and non-nil
|
|
// otherwise. A slot is still in use until *both* the tail
|
|
// index has moved beyond it and typ has been set to nil. This
|
|
// is set to nil atomically by the consumer and read
|
|
// atomically by the producer.
|
|
vals []eface
|
|
}
|
|
|
|
// eface is the two-word representation this file uses for a stored
// value. Slots of this type are read and written through *any casts,
// so its layout must match that of an empty interface: a type word
// followed by a data word.
type eface struct {
	typ, val unsafe.Pointer
}
|
|
|
|
// dequeueBits is the number of bits used for each of the head and
// tail indexes packed into poolDequeue.headTail.
const dequeueBits = 32

// dequeueLimit is the maximum size of a poolDequeue.
//
// This must be at most (1<<dequeueBits)/2 because detecting fullness
// depends on wrapping around the ring buffer without wrapping around
// the index. We divide by 4 so this fits in an int on 32-bit.
const dequeueLimit = (1 << dequeueBits) / 4
|
|
|
|
// dequeueNil is used in poolDequeue to represent interface{}(nil).
// Since we use nil to represent empty slots, we need a sentinel value
// to represent nil.
type dequeueNil *struct{}
|
|
|
|
func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
|
|
const mask = 1<<dequeueBits - 1
|
|
head = uint32((ptrs >> dequeueBits) & mask)
|
|
tail = uint32(ptrs & mask)
|
|
return
|
|
}
|
|
|
|
func (d *poolDequeue) pack(head, tail uint32) uint64 {
|
|
const mask = 1<<dequeueBits - 1
|
|
return (uint64(head) << dequeueBits) |
|
|
uint64(tail&mask)
|
|
}
|
|
|
|
// pushHead adds val at the head of the queue. It returns false if the
|
|
// queue is full. It must only be called by a single producer.
|
|
func (d *poolDequeue) pushHead(val any) bool {
|
|
ptrs := atomic.LoadUint64(&d.headTail)
|
|
head, tail := d.unpack(ptrs)
|
|
if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
|
|
// Queue is full.
|
|
return false
|
|
}
|
|
slot := &d.vals[head&uint32(len(d.vals)-1)]
|
|
|
|
// Check if the head slot has been released by popTail.
|
|
typ := atomic.LoadPointer(&slot.typ)
|
|
if typ != nil {
|
|
// Another goroutine is still cleaning up the tail, so
|
|
// the queue is actually still full.
|
|
return false
|
|
}
|
|
|
|
// The head slot is free, so we own it.
|
|
if val == nil {
|
|
val = dequeueNil(nil)
|
|
}
|
|
*(*any)(unsafe.Pointer(slot)) = val
|
|
|
|
// Increment head. This passes ownership of slot to popTail
|
|
// and acts as a store barrier for writing the slot.
|
|
atomic.AddUint64(&d.headTail, 1<<dequeueBits)
|
|
return true
|
|
}
|
|
|
|
// popHead removes and returns the element at the head of the queue.
|
|
// It returns false if the queue is empty. It must only be called by a
|
|
// single producer.
|
|
func (d *poolDequeue) popHead() (any, bool) {
|
|
var slot *eface
|
|
for {
|
|
ptrs := atomic.LoadUint64(&d.headTail)
|
|
head, tail := d.unpack(ptrs)
|
|
if tail == head {
|
|
// Queue is empty.
|
|
return nil, false
|
|
}
|
|
|
|
// Confirm tail and decrement head. We do this before
|
|
// reading the value to take back ownership of this
|
|
// slot.
|
|
head--
|
|
ptrs2 := d.pack(head, tail)
|
|
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
|
|
// We successfully took back slot.
|
|
slot = &d.vals[head&uint32(len(d.vals)-1)]
|
|
break
|
|
}
|
|
}
|
|
|
|
val := *(*any)(unsafe.Pointer(slot))
|
|
if val == dequeueNil(nil) {
|
|
val = nil
|
|
}
|
|
// Zero the slot. Unlike popTail, this isn't racing with
|
|
// pushHead, so we don't need to be careful here.
|
|
*slot = eface{}
|
|
return val, true
|
|
}
|
|
|
|
// popTail removes and returns the element at the tail of the queue.
|
|
// It returns false if the queue is empty. It may be called by any
|
|
// number of consumers.
|
|
func (d *poolDequeue) popTail() (any, bool) {
|
|
var slot *eface
|
|
for {
|
|
ptrs := atomic.LoadUint64(&d.headTail)
|
|
head, tail := d.unpack(ptrs)
|
|
if tail == head {
|
|
// Queue is empty.
|
|
return nil, false
|
|
}
|
|
|
|
// Confirm head and tail (for our speculative check
|
|
// above) and increment tail. If this succeeds, then
|
|
// we own the slot at tail.
|
|
ptrs2 := d.pack(head, tail+1)
|
|
if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
|
|
// Success.
|
|
slot = &d.vals[tail&uint32(len(d.vals)-1)]
|
|
break
|
|
}
|
|
}
|
|
|
|
// We now own slot.
|
|
val := *(*any)(unsafe.Pointer(slot))
|
|
if val == dequeueNil(nil) {
|
|
val = nil
|
|
}
|
|
|
|
// Tell pushHead that we're done with this slot. Zeroing the
|
|
// slot is also important so we don't leave behind references
|
|
// that could keep this object live longer than necessary.
|
|
//
|
|
// We write to val first and then publish that we're done with
|
|
// this slot by atomically writing to typ.
|
|
slot.val = nil
|
|
atomic.StorePointer(&slot.typ, nil)
|
|
// At this point pushHead owns the slot.
|
|
|
|
return val, true
|
|
}
|
|
|
|
// poolChain is a dynamically-sized version of poolDequeue.
|
|
//
|
|
// This is implemented as a doubly-linked list queue of poolDequeues
|
|
// where each dequeue is double the size of the previous one. Once a
|
|
// dequeue fills up, this allocates a new one and only ever pushes to
|
|
// the latest dequeue. Pops happen from the other end of the list and
|
|
// once a dequeue is exhausted, it gets removed from the list.
|
|
type poolChain struct {
|
|
// head is the poolDequeue to push to. This is only accessed
|
|
// by the producer, so doesn't need to be synchronized.
|
|
head *poolChainElt
|
|
|
|
// tail is the poolDequeue to popTail from. This is accessed
|
|
// by consumers, so reads and writes must be atomic.
|
|
tail *poolChainElt
|
|
}
|
|
|
|
type poolChainElt struct {
|
|
poolDequeue
|
|
|
|
// next and prev link to the adjacent poolChainElts in this
|
|
// poolChain.
|
|
//
|
|
// next is written atomically by the producer and read
|
|
// atomically by the consumer. It only transitions from nil to
|
|
// non-nil.
|
|
//
|
|
// prev is written atomically by the consumer and read
|
|
// atomically by the producer. It only transitions from
|
|
// non-nil to nil.
|
|
next, prev *poolChainElt
|
|
}
|
|
|
|
func storePoolChainElt(pp **poolChainElt, v *poolChainElt) {
|
|
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v))
|
|
}
|
|
|
|
func loadPoolChainElt(pp **poolChainElt) *poolChainElt {
|
|
return (*poolChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp))))
|
|
}
|
|
|
|
func (c *poolChain) pushHead(val any) {
|
|
d := c.head
|
|
if d == nil {
|
|
// Initialize the chain.
|
|
const initSize = 8 // Must be a power of 2
|
|
d = new(poolChainElt)
|
|
d.vals = make([]eface, initSize)
|
|
c.head = d
|
|
storePoolChainElt(&c.tail, d)
|
|
}
|
|
|
|
if d.pushHead(val) {
|
|
return
|
|
}
|
|
|
|
// The current dequeue is full. Allocate a new one of twice
|
|
// the size.
|
|
newSize := len(d.vals) * 2
|
|
if newSize >= dequeueLimit {
|
|
// Can't make it any bigger.
|
|
newSize = dequeueLimit
|
|
}
|
|
|
|
d2 := &poolChainElt{prev: d}
|
|
d2.vals = make([]eface, newSize)
|
|
c.head = d2
|
|
storePoolChainElt(&d.next, d2)
|
|
d2.pushHead(val)
|
|
}
|
|
|
|
func (c *poolChain) popHead() (any, bool) {
|
|
d := c.head
|
|
for d != nil {
|
|
if val, ok := d.popHead(); ok {
|
|
return val, ok
|
|
}
|
|
// There may still be unconsumed elements in the
|
|
// previous dequeue, so try backing up.
|
|
d = loadPoolChainElt(&d.prev)
|
|
}
|
|
return nil, false
|
|
}
|
|
|
|
func (c *poolChain) popTail() (any, bool) {
|
|
d := loadPoolChainElt(&c.tail)
|
|
if d == nil {
|
|
return nil, false
|
|
}
|
|
|
|
for {
|
|
// It's important that we load the next pointer
|
|
// *before* popping the tail. In general, d may be
|
|
// transiently empty, but if next is non-nil before
|
|
// the pop and the pop fails, then d is permanently
|
|
// empty, which is the only condition under which it's
|
|
// safe to drop d from the chain.
|
|
d2 := loadPoolChainElt(&d.next)
|
|
|
|
if val, ok := d.popTail(); ok {
|
|
return val, ok
|
|
}
|
|
|
|
if d2 == nil {
|
|
// This is the only dequeue. It's empty right
|
|
// now, but could be pushed to in the future.
|
|
return nil, false
|
|
}
|
|
|
|
// The tail of the chain has been drained, so move on
|
|
// to the next dequeue. Try to drop it from the chain
|
|
// so the next pop doesn't have to look at the empty
|
|
// dequeue again.
|
|
if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
|
|
// We won the race. Clear the prev pointer so
|
|
// the garbage collector can collect the empty
|
|
// dequeue and so popHead doesn't back up
|
|
// further than necessary.
|
|
storePoolChainElt(&d2.prev, nil)
|
|
}
|
|
d = d2
|
|
}
|
|
}
|