migrated multi host connection pool from godropbox package

removing unneeded dependencies, which involved etcd versions.
This commit is contained in:
Chris Lu 2021-03-06 14:24:01 -08:00
parent 3a96461be3
commit 1444e9d275
12 changed files with 1538 additions and 0 deletions

View File

@ -0,0 +1,159 @@
package net2
import (
"net"
"strings"
"time"
rp "github.com/chrislusf/seaweedfs/weed/wdclient/resource_pool"
)
const defaultDialTimeout = 1 * time.Second
func defaultDialFunc(network string, address string) (net.Conn, error) {
return net.DialTimeout(network, address, defaultDialTimeout)
}
func parseResourceLocation(resourceLocation string) (
network string,
address string) {
idx := strings.Index(resourceLocation, " ")
if idx >= 0 {
return resourceLocation[:idx], resourceLocation[idx+1:]
}
return "", resourceLocation
}
// A thin wrapper around the underlying resource pool.
type connectionPoolImpl struct {
options ConnectionOptions
pool rp.ResourcePool
}
// This returns a connection pool where all connections are connected
// to the same (network, address)
func newBaseConnectionPool(
options ConnectionOptions,
createPool func(rp.Options) rp.ResourcePool) ConnectionPool {
dial := options.Dial
if dial == nil {
dial = defaultDialFunc
}
openFunc := func(loc string) (interface{}, error) {
network, address := parseResourceLocation(loc)
return dial(network, address)
}
closeFunc := func(handle interface{}) error {
return handle.(net.Conn).Close()
}
poolOptions := rp.Options{
MaxActiveHandles: options.MaxActiveConnections,
MaxIdleHandles: options.MaxIdleConnections,
MaxIdleTime: options.MaxIdleTime,
OpenMaxConcurrency: options.DialMaxConcurrency,
Open: openFunc,
Close: closeFunc,
NowFunc: options.NowFunc,
}
return &connectionPoolImpl{
options: options,
pool: createPool(poolOptions),
}
}
// This returns a connection pool where all connections are connected
// to the same (network, address)
func NewSimpleConnectionPool(options ConnectionOptions) ConnectionPool {
return newBaseConnectionPool(options, rp.NewSimpleResourcePool)
}
// This returns a connection pool that manages multiple (network, address)
// entries. The connections to each (network, address) entry acts
// independently. For example ("tcp", "localhost:11211") could act as memcache
// shard 0 and ("tcp", "localhost:11212") could act as memcache shard 1.
func NewMultiConnectionPool(options ConnectionOptions) ConnectionPool {
return newBaseConnectionPool(
options,
func(poolOptions rp.Options) rp.ResourcePool {
return rp.NewMultiResourcePool(poolOptions, nil)
})
}
// See ConnectionPool for documentation.
func (p *connectionPoolImpl) NumActive() int32 {
return p.pool.NumActive()
}
// See ConnectionPool for documentation.
func (p *connectionPoolImpl) ActiveHighWaterMark() int32 {
return p.pool.ActiveHighWaterMark()
}
// This returns the number of alive idle connections. This method is not part
// of ConnectionPool's API. It is used only for testing.
func (p *connectionPoolImpl) NumIdle() int {
return p.pool.NumIdle()
}
// BaseConnectionPool can only register a single (network, address) entry.
// Register should be call before any Get calls.
func (p *connectionPoolImpl) Register(network string, address string) error {
return p.pool.Register(network + " " + address)
}
// BaseConnectionPool has nothing to do on Unregister.
func (p *connectionPoolImpl) Unregister(network string, address string) error {
return nil
}
func (p *connectionPoolImpl) ListRegistered() []NetworkAddress {
result := make([]NetworkAddress, 0, 1)
for _, location := range p.pool.ListRegistered() {
network, address := parseResourceLocation(location)
result = append(
result,
NetworkAddress{
Network: network,
Address: address,
})
}
return result
}
// This gets an active connection from the connection pool. Note that network
// and address arguments are ignored (The connections with point to the
// network/address provided by the first Register call).
func (p *connectionPoolImpl) Get(
network string,
address string) (ManagedConn, error) {
handle, err := p.pool.Get(network + " " + address)
if err != nil {
return nil, err
}
return NewManagedConn(network, address, handle, p, p.options), nil
}
// See ConnectionPool for documentation.
func (p *connectionPoolImpl) Release(conn ManagedConn) error {
return conn.ReleaseConnection()
}
// See ConnectionPool for documentation.
func (p *connectionPoolImpl) Discard(conn ManagedConn) error {
return conn.DiscardConnection()
}
// See ConnectionPool for documentation.
func (p *connectionPoolImpl) EnterLameDuckMode() {
p.pool.EnterLameDuckMode()
}

View File

@ -0,0 +1,97 @@
package net2
import (
"net"
"time"
)
type ConnectionOptions struct {
// The maximum number of connections that can be active per host at any
// given time (A non-positive value indicates the number of connections
// is unbounded).
MaxActiveConnections int32
// The maximum number of idle connections per host that are kept alive by
// the connection pool.
MaxIdleConnections uint32
// The maximum amount of time an idle connection can alive (if specified).
MaxIdleTime *time.Duration
// This limits the number of concurrent Dial calls (there's no limit when
// DialMaxConcurrency is non-positive).
DialMaxConcurrency int
// Dial specifies the dial function for creating network connections.
// If Dial is nil, net.DialTimeout is used, with timeout set to 1 second.
Dial func(network string, address string) (net.Conn, error)
// This specifies the now time function. When the function is non-nil, the
// connection pool will use the specified function instead of time.Now to
// generate the current time.
NowFunc func() time.Time
// This specifies the timeout for any Read() operation.
// Note that setting this to 0 (i.e. not setting it) will make
// read operations block indefinitely.
ReadTimeout time.Duration
// This specifies the timeout for any Write() operation.
// Note that setting this to 0 (i.e. not setting it) will make
// write operations block indefinitely.
WriteTimeout time.Duration
}
func (o ConnectionOptions) getCurrentTime() time.Time {
if o.NowFunc == nil {
return time.Now()
} else {
return o.NowFunc()
}
}
// A generic interface for managed connection pool. All connection pool
// implementations must be threadsafe.
type ConnectionPool interface {
// This returns the number of active connections that are on loan.
NumActive() int32
// This returns the highest number of active connections for the entire
// lifetime of the pool.
ActiveHighWaterMark() int32
// This returns the number of idle connections that are in the pool.
NumIdle() int
// This associates (network, address) to the connection pool; afterwhich,
// the user can get connections to (network, address).
Register(network string, address string) error
// This dissociate (network, address) from the connection pool;
// afterwhich, the user can no longer get connections to
// (network, address).
Unregister(network string, address string) error
// This returns the list of registered (network, address) entries.
ListRegistered() []NetworkAddress
// This gets an active connection from the connection pool. The connection
// will remain active until one of the following is called:
// 1. conn.ReleaseConnection()
// 2. conn.DiscardConnection()
// 3. pool.Release(conn)
// 4. pool.Discard(conn)
Get(network string, address string) (ManagedConn, error)
// This releases an active connection back to the connection pool.
Release(conn ManagedConn) error
// This discards an active connection from the connection pool.
Discard(conn ManagedConn) error
// Enter the connection pool into lame duck mode. The connection pool
// will no longer return connections, and all idle connections are closed
// immediately (including active connections that are released back to the
// pool afterward).
EnterLameDuckMode()
}

View File

@ -0,0 +1,6 @@
// net2 is a collection of functions meant to supplement the capabilities
// provided by the standard "net" package.
package net2
// copied from https://github.com/dropbox/godropbox/tree/master/net2
// removed other dependencies

177
weed/wdclient/net2/ip.go Normal file
View File

@ -0,0 +1,177 @@
package net2
import (
"fmt"
"log"
"net"
"os"
"strings"
"sync"
)
var myHostname string
var myHostnameOnce sync.Once
// Like os.Hostname but caches first successful result, making it cheap to call it
// over and over.
// It will also crash whole process if fetching Hostname fails!
func MyHostname() string {
myHostnameOnce.Do(func() {
var err error
myHostname, err = os.Hostname()
if err != nil {
log.Fatal(err)
}
})
return myHostname
}
var myIp4 *net.IPAddr
var myIp4Once sync.Once
// Resolves `MyHostname()` to an Ip4 address. Caches first successful result, making it
// cheap to call it over and over.
// It will also crash whole process if resolving the IP fails!
func MyIp4() *net.IPAddr {
myIp4Once.Do(func() {
var err error
myIp4, err = net.ResolveIPAddr("ip4", MyHostname())
if err != nil {
log.Fatal(err)
}
})
return myIp4
}
var myIp6 *net.IPAddr
var myIp6Once sync.Once
// Resolves `MyHostname()` to an Ip6 address. Caches first successful result, making it
// cheap to call it over and over.
// It will also crash whole process if resolving the IP fails!
func MyIp6() *net.IPAddr {
myIp6Once.Do(func() {
var err error
myIp6, err = net.ResolveIPAddr("ip6", MyHostname())
if err != nil {
log.Fatal(err)
}
})
return myIp6
}
// This returns the list of local ip addresses which other hosts can connect
// to (NOTE: Loopback ip is ignored).
// Also resolves Hostname to an address and adds it to the list too, so
// IPs from /etc/hosts can work too.
func GetLocalIPs() ([]*net.IP, error) {
hostname, err := os.Hostname()
if err != nil {
return nil, fmt.Errorf("Failed to lookup hostname: %v", err)
}
// Resolves IP Address from Hostname, this way overrides in /etc/hosts
// can work too for IP resolution.
ipInfo, err := net.ResolveIPAddr("ip4", hostname)
if err != nil {
return nil, fmt.Errorf("Failed to resolve ip: %v", err)
}
ips := []*net.IP{&ipInfo.IP}
// TODO(zviad): Is rest of the code really necessary?
addrs, err := net.InterfaceAddrs()
if err != nil {
return nil, fmt.Errorf( "Failed to get interface addresses: %v", err)
}
for _, addr := range addrs {
ipnet, ok := addr.(*net.IPNet)
if !ok {
continue
}
if ipnet.IP.IsLoopback() {
continue
}
ips = append(ips, &ipnet.IP)
}
return ips, nil
}
var localhostIPNets []*net.IPNet
func init() {
for _, mask := range []string{"127.0.0.1/8", "::1/128"} {
_, ipnet, err := net.ParseCIDR(mask)
if err != nil {
panic(err)
}
localhostIPNets = append(localhostIPNets, ipnet)
}
}
func IsLocalhostIp(ipStr string) bool {
ip := net.ParseIP(ipStr)
if ip == nil {
return false
}
for _, ipnet := range localhostIPNets {
if ipnet.Contains(ip) {
return true
}
}
return false
}
// Given a host string, return true if the host is an ip (v4/v6) localhost.
func IsLocalhost(host string) bool {
return IsLocalhostIp(host) ||
host == "localhost" ||
host == "ip6-localhost" ||
host == "ipv6-localhost"
}
// Resolves hostnames in addresses to actual IP4 addresses. Skips all invalid addresses
// and all addresses that can't be resolved.
// `addrs` are assumed to be of form: ["<hostname>:<port>", ...]
// Returns an error in addition to resolved addresses if not all resolutions succeed.
func ResolveIP4s(addrs []string) ([]string, error) {
resolvedAddrs := make([]string, 0, len(addrs))
var lastErr error
for _, server := range addrs {
hostPort := strings.Split(server, ":")
if len(hostPort) != 2 {
lastErr = fmt.Errorf("Skipping invalid address: %s", server)
continue
}
ip, err := net.ResolveIPAddr("ip4", hostPort[0])
if err != nil {
lastErr = err
continue
}
resolvedAddrs = append(resolvedAddrs, ip.IP.String()+":"+hostPort[1])
}
return resolvedAddrs, lastErr
}
func LookupValidAddrs() (map[string]bool, error) {
hostName, err := os.Hostname()
if err != nil {
return nil, err
}
addrs, err := net.LookupHost(hostName)
if err != nil {
return nil, err
}
validAddrs := make(map[string]bool)
validAddrs[hostName] = true
for _, addr := range addrs {
validAddrs[addr] = true
}
// Special case localhost/127.0.0.1 so that this works on devVMs. It should
// have no affect in production.
validAddrs["127.0.0.1"] = true
validAddrs["localhost"] = true
return validAddrs, nil
}

View File

@ -0,0 +1,185 @@
package net2
import (
"fmt"
"net"
"time"
"errors"
"github.com/chrislusf/seaweedfs/weed/wdclient/resource_pool"
)
// Dial's arguments.
type NetworkAddress struct {
Network string
Address string
}
// A connection managed by a connection pool. NOTE: SetDeadline,
// SetReadDeadline and SetWriteDeadline are disabled for managed connections.
// (The deadlines are set by the connection pool).
type ManagedConn interface {
net.Conn
// This returns the original (network, address) entry used for creating
// the connection.
Key() NetworkAddress
// This returns the underlying net.Conn implementation.
RawConn() net.Conn
// This returns the connection pool which owns this connection.
Owner() ConnectionPool
// This indictes a user is done with the connection and releases the
// connection back to the connection pool.
ReleaseConnection() error
// This indicates the connection is an invalid state, and that the
// connection should be discarded from the connection pool.
DiscardConnection() error
}
// A physical implementation of ManagedConn
type managedConnImpl struct {
addr NetworkAddress
handle resource_pool.ManagedHandle
pool ConnectionPool
options ConnectionOptions
}
// This creates a managed connection wrapper.
func NewManagedConn(
network string,
address string,
handle resource_pool.ManagedHandle,
pool ConnectionPool,
options ConnectionOptions) ManagedConn {
addr := NetworkAddress{
Network: network,
Address: address,
}
return &managedConnImpl{
addr: addr,
handle: handle,
pool: pool,
options: options,
}
}
func (c *managedConnImpl) rawConn() (net.Conn, error) {
h, err := c.handle.Handle()
return h.(net.Conn), err
}
// See ManagedConn for documentation.
func (c *managedConnImpl) RawConn() net.Conn {
h, _ := c.handle.Handle()
return h.(net.Conn)
}
// See ManagedConn for documentation.
func (c *managedConnImpl) Key() NetworkAddress {
return c.addr
}
// See ManagedConn for documentation.
func (c *managedConnImpl) Owner() ConnectionPool {
return c.pool
}
// See ManagedConn for documentation.
func (c *managedConnImpl) ReleaseConnection() error {
return c.handle.Release()
}
// See ManagedConn for documentation.
func (c *managedConnImpl) DiscardConnection() error {
return c.handle.Discard()
}
// See net.Conn for documentation
func (c *managedConnImpl) Read(b []byte) (n int, err error) {
conn, err := c.rawConn()
if err != nil {
return 0, err
}
if c.options.ReadTimeout > 0 {
deadline := c.options.getCurrentTime().Add(c.options.ReadTimeout)
_ = conn.SetReadDeadline(deadline)
}
n, err = conn.Read(b)
if err != nil {
var localAddr string
if conn.LocalAddr() != nil {
localAddr = conn.LocalAddr().String()
} else {
localAddr = "(nil)"
}
var remoteAddr string
if conn.RemoteAddr() != nil {
remoteAddr = conn.RemoteAddr().String()
} else {
remoteAddr = "(nil)"
}
err = fmt.Errorf("Read error from host: %s <-> %s: %v", localAddr, remoteAddr, err)
}
return
}
// See net.Conn for documentation
func (c *managedConnImpl) Write(b []byte) (n int, err error) {
conn, err := c.rawConn()
if err != nil {
return 0, err
}
if c.options.WriteTimeout > 0 {
deadline := c.options.getCurrentTime().Add(c.options.WriteTimeout)
_ = conn.SetWriteDeadline(deadline)
}
n, err = conn.Write(b)
if err != nil {
err = fmt.Errorf("Write error: %v", err)
}
return
}
// See net.Conn for documentation
func (c *managedConnImpl) Close() error {
return c.handle.Discard()
}
// See net.Conn for documentation
func (c *managedConnImpl) LocalAddr() net.Addr {
conn, _ := c.rawConn()
return conn.LocalAddr()
}
// See net.Conn for documentation
func (c *managedConnImpl) RemoteAddr() net.Addr {
conn, _ := c.rawConn()
return conn.RemoteAddr()
}
// SetDeadline is disabled for managed connection (The deadline is set by
// us, with respect to the read/write timeouts specified in ConnectionOptions).
func (c *managedConnImpl) SetDeadline(t time.Time) error {
return errors.New("Cannot set deadline for managed connection")
}
// SetReadDeadline is disabled for managed connection (The deadline is set by
// us with respect to the read timeout specified in ConnectionOptions).
func (c *managedConnImpl) SetReadDeadline(t time.Time) error {
return errors.New("Cannot set read deadline for managed connection")
}
// SetWriteDeadline is disabled for managed connection (The deadline is set by
// us with respect to the write timeout specified in ConnectionOptions).
func (c *managedConnImpl) SetWriteDeadline(t time.Time) error {
return errors.New("Cannot set write deadline for managed connection")
}

View File

@ -0,0 +1,19 @@
package net2
import (
"net"
"strconv"
)
// Returns the port information.
func GetPort(addr net.Addr) (int, error) {
_, lport, err := net.SplitHostPort(addr.String())
if err != nil {
return -1, err
}
lportInt, err := strconv.Atoi(lport)
if err != nil {
return -1, err
}
return lportInt, nil
}

View File

@ -0,0 +1,5 @@
// A generic resource pool for managing resources such as network connections.
package resource_pool
// copied from https://github.com/dropbox/godropbox/tree/master/resource_pool
// removed other dependencies

View File

@ -0,0 +1,97 @@
package resource_pool
import (
"sync/atomic"
"errors"
)
// A resource handle managed by a resource pool.
type ManagedHandle interface {
// This returns the handle's resource location.
ResourceLocation() string
// This returns the underlying resource handle (or error if the handle
// is no longer active).
Handle() (interface{}, error)
// This returns the resource pool which owns this handle.
Owner() ResourcePool
// The releases the underlying resource handle to the caller and marks the
// managed handle as inactive. The caller is responsible for cleaning up
// the released handle. This returns nil if the managed handle no longer
// owns the resource.
ReleaseUnderlyingHandle() interface{}
// This indictes a user is done with the handle and releases the handle
// back to the resource pool.
Release() error
// This indicates the handle is an invalid state, and that the
// connection should be discarded from the connection pool.
Discard() error
}
// A physical implementation of ManagedHandle
type managedHandleImpl struct {
location string
handle interface{}
pool ResourcePool
isActive int32 // atomic bool
options Options
}
// This creates a managed handle wrapper.
func NewManagedHandle(
resourceLocation string,
handle interface{},
pool ResourcePool,
options Options) ManagedHandle {
h := &managedHandleImpl{
location: resourceLocation,
handle: handle,
pool: pool,
options: options,
}
atomic.StoreInt32(&h.isActive, 1)
return h
}
// See ManagedHandle for documentation.
func (c *managedHandleImpl) ResourceLocation() string {
return c.location
}
// See ManagedHandle for documentation.
func (c *managedHandleImpl) Handle() (interface{}, error) {
if atomic.LoadInt32(&c.isActive) == 0 {
return c.handle, errors.New("Resource handle is no longer valid")
}
return c.handle, nil
}
// See ManagedHandle for documentation.
func (c *managedHandleImpl) Owner() ResourcePool {
return c.pool
}
// See ManagedHandle for documentation.
func (c *managedHandleImpl) ReleaseUnderlyingHandle() interface{} {
if atomic.CompareAndSwapInt32(&c.isActive, 1, 0) {
return c.handle
}
return nil
}
// See ManagedHandle for documentation.
func (c *managedHandleImpl) Release() error {
return c.pool.Release(c)
}
// See ManagedHandle for documentation.
func (c *managedHandleImpl) Discard() error {
return c.pool.Discard(c)
}

View File

@ -0,0 +1,200 @@
package resource_pool
import (
"fmt"
"sync"
"errors"
)
// A resource pool implementation that manages multiple resource location
// entries. The handles to each resource location entry acts independently.
// For example "tcp localhost:11211" could act as memcache
// shard 0 and "tcp localhost:11212" could act as memcache shard 1.
type multiResourcePool struct {
options Options
createPool func(Options) ResourcePool
rwMutex sync.RWMutex
isLameDuck bool // guarded by rwMutex
// NOTE: the locationPools is guarded by rwMutex, but the pool entries
// are not.
locationPools map[string]ResourcePool
}
// This returns a MultiResourcePool, which manages multiple
// resource location entries. The handles to each resource location
// entry acts independently.
//
// When createPool is nil, NewSimpleResourcePool is used as default.
func NewMultiResourcePool(
options Options,
createPool func(Options) ResourcePool) ResourcePool {
if createPool == nil {
createPool = NewSimpleResourcePool
}
return &multiResourcePool{
options: options,
createPool: createPool,
rwMutex: sync.RWMutex{},
isLameDuck: false,
locationPools: make(map[string]ResourcePool),
}
}
// See ResourcePool for documentation.
func (p *multiResourcePool) NumActive() int32 {
total := int32(0)
p.rwMutex.RLock()
defer p.rwMutex.RUnlock()
for _, pool := range p.locationPools {
total += pool.NumActive()
}
return total
}
// See ResourcePool for documentation.
func (p *multiResourcePool) ActiveHighWaterMark() int32 {
high := int32(0)
p.rwMutex.RLock()
defer p.rwMutex.RUnlock()
for _, pool := range p.locationPools {
val := pool.ActiveHighWaterMark()
if val > high {
high = val
}
}
return high
}
// See ResourcePool for documentation.
func (p *multiResourcePool) NumIdle() int {
total := 0
p.rwMutex.RLock()
defer p.rwMutex.RUnlock()
for _, pool := range p.locationPools {
total += pool.NumIdle()
}
return total
}
// See ResourcePool for documentation.
func (p *multiResourcePool) Register(resourceLocation string) error {
if resourceLocation == "" {
return errors.New("Registering invalid resource location")
}
p.rwMutex.Lock()
defer p.rwMutex.Unlock()
if p.isLameDuck {
return fmt.Errorf(
"Cannot register %s to lame duck resource pool",
resourceLocation)
}
if _, inMap := p.locationPools[resourceLocation]; inMap {
return nil
}
pool := p.createPool(p.options)
if err := pool.Register(resourceLocation); err != nil {
return err
}
p.locationPools[resourceLocation] = pool
return nil
}
// See ResourcePool for documentation.
func (p *multiResourcePool) Unregister(resourceLocation string) error {
p.rwMutex.Lock()
defer p.rwMutex.Unlock()
if pool, inMap := p.locationPools[resourceLocation]; inMap {
_ = pool.Unregister("")
pool.EnterLameDuckMode()
delete(p.locationPools, resourceLocation)
}
return nil
}
func (p *multiResourcePool) ListRegistered() []string {
p.rwMutex.RLock()
defer p.rwMutex.RUnlock()
result := make([]string, 0, len(p.locationPools))
for key, _ := range p.locationPools {
result = append(result, key)
}
return result
}
// See ResourcePool for documentation.
func (p *multiResourcePool) Get(
resourceLocation string) (ManagedHandle, error) {
pool := p.getPool(resourceLocation)
if pool == nil {
return nil, fmt.Errorf(
"%s is not registered in the resource pool",
resourceLocation)
}
return pool.Get(resourceLocation)
}
// See ResourcePool for documentation.
func (p *multiResourcePool) Release(handle ManagedHandle) error {
pool := p.getPool(handle.ResourceLocation())
if pool == nil {
return errors.New(
"Resource pool cannot take control of a handle owned " +
"by another resource pool")
}
return pool.Release(handle)
}
// See ResourcePool for documentation.
func (p *multiResourcePool) Discard(handle ManagedHandle) error {
pool := p.getPool(handle.ResourceLocation())
if pool == nil {
return errors.New(
"Resource pool cannot take control of a handle owned " +
"by another resource pool")
}
return pool.Discard(handle)
}
// See ResourcePool for documentation.
func (p *multiResourcePool) EnterLameDuckMode() {
p.rwMutex.Lock()
defer p.rwMutex.Unlock()
p.isLameDuck = true
for _, pool := range p.locationPools {
pool.EnterLameDuckMode()
}
}
func (p *multiResourcePool) getPool(resourceLocation string) ResourcePool {
p.rwMutex.RLock()
defer p.rwMutex.RUnlock()
if pool, inMap := p.locationPools[resourceLocation]; inMap {
return pool
}
return nil
}

View File

@ -0,0 +1,96 @@
package resource_pool
import (
"time"
)
type Options struct {
// The maximum number of active resource handles per resource location. (A
// non-positive value indicates the number of active resource handles is
// unbounded).
MaxActiveHandles int32
// The maximum number of idle resource handles per resource location that
// are kept alive by the resource pool.
MaxIdleHandles uint32
// The maximum amount of time an idle resource handle can remain alive (if
// specified).
MaxIdleTime *time.Duration
// This limits the number of concurrent Open calls (there's no limit when
// OpenMaxConcurrency is non-positive).
OpenMaxConcurrency int
// This function creates a resource handle (e.g., a connection) for a
// resource location. The function must be thread-safe.
Open func(resourceLocation string) (
handle interface{},
err error)
// This function destroys a resource handle and performs the necessary
// cleanup to free up resources. The function must be thread-safe.
Close func(handle interface{}) error
// This specifies the now time function. When the function is non-nil, the
// resource pool will use the specified function instead of time.Now to
// generate the current time.
NowFunc func() time.Time
}
func (o Options) getCurrentTime() time.Time {
if o.NowFunc == nil {
return time.Now()
} else {
return o.NowFunc()
}
}
// A generic interface for managed resource pool. All resource pool
// implementations must be threadsafe.
type ResourcePool interface {
// This returns the number of active resource handles.
NumActive() int32
// This returns the highest number of actives handles for the entire
// lifetime of the pool. If the pool contains multiple sub-pools, the
// high water mark is the max of the sub-pools' high water marks.
ActiveHighWaterMark() int32
// This returns the number of alive idle handles. NOTE: This is only used
// for testing.
NumIdle() int
// This associates a resource location to the resource pool; afterwhich,
// the user can get resource handles for the resource location.
Register(resourceLocation string) error
// This dissociates a resource location from the resource pool; afterwhich,
// the user can no longer get resource handles for the resource location.
// If the given resource location corresponds to a sub-pool, the unregistered
// sub-pool will enter lame duck mode.
Unregister(resourceLocation string) error
// This returns the list of registered resource location entries.
ListRegistered() []string
// This gets an active resource handle from the resource pool. The
// handle will remain active until one of the following is called:
// 1. handle.Release()
// 2. handle.Discard()
// 3. pool.Release(handle)
// 4. pool.Discard(handle)
Get(key string) (ManagedHandle, error)
// This releases an active resource handle back to the resource pool.
Release(handle ManagedHandle) error
// This discards an active resource from the resource pool.
Discard(handle ManagedHandle) error
// Enter the resource pool into lame duck mode. The resource pool
// will no longer return resource handles, and all idle resource handles
// are closed immediately (including active resource handles that are
// released back to the pool afterward).
EnterLameDuckMode()
}

View File

@ -0,0 +1,154 @@
package resource_pool
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
type Semaphore interface {
// Increment the semaphore counter by one.
Release()
// Decrement the semaphore counter by one, and block if counter < 0
Acquire()
// Decrement the semaphore counter by one, and block if counter < 0
// Wait for up to the given duration. Returns true if did not timeout
TryAcquire(timeout time.Duration) bool
}
// A simple counting Semaphore.
type boundedSemaphore struct {
slots chan struct{}
}
// Create a bounded semaphore. The count parameter must be a positive number.
// NOTE: The bounded semaphore will panic if the user tries to Release
// beyond the specified count.
func NewBoundedSemaphore(count uint) Semaphore {
sem := &boundedSemaphore{
slots: make(chan struct{}, int(count)),
}
for i := 0; i < cap(sem.slots); i++ {
sem.slots <- struct{}{}
}
return sem
}
// Acquire returns on successful acquisition.
func (sem *boundedSemaphore) Acquire() {
<-sem.slots
}
// TryAcquire returns true if it acquires a resource slot within the
// timeout, false otherwise.
func (sem *boundedSemaphore) TryAcquire(timeout time.Duration) bool {
if timeout > 0 {
// Wait until we get a slot or timeout expires.
tm := time.NewTimer(timeout)
defer tm.Stop()
select {
case <-sem.slots:
return true
case <-tm.C:
// Timeout expired. In very rare cases this might happen even if
// there is a slot available, e.g. GC pause after we create the timer
// and select randomly picked this one out of the two available channels.
// We should do one final immediate check below.
}
}
// Return true if we have a slot available immediately and false otherwise.
select {
case <-sem.slots:
return true
default:
return false
}
}
// Release the acquired semaphore. You must not release more than you
// have acquired.
func (sem *boundedSemaphore) Release() {
select {
case sem.slots <- struct{}{}:
default:
// slots is buffered. If a send blocks, it indicates a programming
// error.
panic(fmt.Errorf("too many releases for boundedSemaphore"))
}
}
// This returns an unbound counting semaphore with the specified initial count.
// The semaphore counter can be arbitrary large (i.e., Release can be called
// unlimited amount of times).
//
// NOTE: In general, users should use bounded semaphore since it is more
// efficient than unbounded semaphore.
func NewUnboundedSemaphore(initialCount int) Semaphore {
res := &unboundedSemaphore{
counter: int64(initialCount),
}
res.cond.L = &res.lock
return res
}
type unboundedSemaphore struct {
lock sync.Mutex
cond sync.Cond
counter int64
}
func (s *unboundedSemaphore) Release() {
s.lock.Lock()
s.counter += 1
if s.counter > 0 {
// Not broadcasting here since it's unlike we can satify all waiting
// goroutines. Instead, we will Signal again if there are left over
// quota after Acquire, in case of lost wakeups.
s.cond.Signal()
}
s.lock.Unlock()
}
func (s *unboundedSemaphore) Acquire() {
s.lock.Lock()
for s.counter < 1 {
s.cond.Wait()
}
s.counter -= 1
if s.counter > 0 {
s.cond.Signal()
}
s.lock.Unlock()
}
func (s *unboundedSemaphore) TryAcquire(timeout time.Duration) bool {
done := make(chan bool, 1)
// Gate used to communicate between the threads and decide what the result
// is. If the main thread decides, we have timed out, otherwise we succeed.
decided := new(int32)
atomic.StoreInt32(decided, 0)
go func() {
s.Acquire()
if atomic.SwapInt32(decided, 1) == 0 {
// Acquire won the race
done <- true
} else {
// If we already decided the result, and this thread did not win
s.Release()
}
}()
select {
case <-done:
return true
case <-time.After(timeout):
if atomic.SwapInt32(decided, 1) == 1 {
// The other thread already decided the result
return true
}
return false
}
}

View File

@ -0,0 +1,343 @@
package resource_pool
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
)
type idleHandle struct {
handle interface{}
keepUntil *time.Time
}
type TooManyHandles struct {
location string
}
func (t TooManyHandles) Error() string {
return fmt.Sprintf("Too many handles to %s", t.location)
}
type OpenHandleError struct {
location string
err error
}
func (o OpenHandleError) Error() string {
return fmt.Sprintf("Failed to open resource handle: %s (%v)", o.location, o.err)
}
// A resource pool implementation where all handles are associated to the
// same resource location.
type simpleResourcePool struct {
options Options
numActive *int32 // atomic counter
activeHighWaterMark *int32 // atomic / monotonically increasing value
openTokens Semaphore
mutex sync.Mutex
location string // guard by mutex
idleHandles []*idleHandle // guarded by mutex
isLameDuck bool // guarded by mutex
}
// This returns a SimpleResourcePool, where all handles are associated to a
// single resource location.
func NewSimpleResourcePool(options Options) ResourcePool {
numActive := new(int32)
atomic.StoreInt32(numActive, 0)
activeHighWaterMark := new(int32)
atomic.StoreInt32(activeHighWaterMark, 0)
var tokens Semaphore
if options.OpenMaxConcurrency > 0 {
tokens = NewBoundedSemaphore(uint(options.OpenMaxConcurrency))
}
return &simpleResourcePool{
location: "",
options: options,
numActive: numActive,
activeHighWaterMark: activeHighWaterMark,
openTokens: tokens,
mutex: sync.Mutex{},
idleHandles: make([]*idleHandle, 0, 0),
isLameDuck: false,
}
}
// See ResourcePool for documentation.
func (p *simpleResourcePool) NumActive() int32 {
return atomic.LoadInt32(p.numActive)
}
// See ResourcePool for documentation.
func (p *simpleResourcePool) ActiveHighWaterMark() int32 {
return atomic.LoadInt32(p.activeHighWaterMark)
}
// See ResourcePool for documentation.
func (p *simpleResourcePool) NumIdle() int {
p.mutex.Lock()
defer p.mutex.Unlock()
return len(p.idleHandles)
}
// SimpleResourcePool can only register a single (network, address) entry.
// Register should be call before any Get calls.
func (p *simpleResourcePool) Register(resourceLocation string) error {
if resourceLocation == "" {
return errors.New("Invalid resource location")
}
p.mutex.Lock()
defer p.mutex.Unlock()
if p.isLameDuck {
return fmt.Errorf(
"cannot register %s to lame duck resource pool",
resourceLocation)
}
if p.location == "" {
p.location = resourceLocation
return nil
}
return errors.New("SimpleResourcePool can only register one location")
}
// SimpleResourcePool will enter lame duck mode upon calling Unregister.
func (p *simpleResourcePool) Unregister(resourceLocation string) error {
p.EnterLameDuckMode()
return nil
}
func (p *simpleResourcePool) ListRegistered() []string {
p.mutex.Lock()
defer p.mutex.Unlock()
if p.location != "" {
return []string{p.location}
}
return []string{}
}
func (p *simpleResourcePool) getLocation() (string, error) {
p.mutex.Lock()
defer p.mutex.Unlock()
if p.location == "" {
return "", fmt.Errorf(
"resource location is not set for SimpleResourcePool")
}
if p.isLameDuck {
return "", fmt.Errorf(
"lame duck resource pool cannot return handles to %s",
p.location)
}
return p.location, nil
}
// This gets an active resource from the resource pool. Note that the
// resourceLocation argument is ignored (The handles are associated to the
// resource location provided by the first Register call).
func (p *simpleResourcePool) Get(unused string) (ManagedHandle, error) {
activeCount := atomic.AddInt32(p.numActive, 1)
if p.options.MaxActiveHandles > 0 &&
activeCount > p.options.MaxActiveHandles {
atomic.AddInt32(p.numActive, -1)
return nil, TooManyHandles{p.location}
}
highest := atomic.LoadInt32(p.activeHighWaterMark)
for activeCount > highest &&
!atomic.CompareAndSwapInt32(
p.activeHighWaterMark,
highest,
activeCount) {
highest = atomic.LoadInt32(p.activeHighWaterMark)
}
if h := p.getIdleHandle(); h != nil {
return h, nil
}
location, err := p.getLocation()
if err != nil {
atomic.AddInt32(p.numActive, -1)
return nil, err
}
if p.openTokens != nil {
// Current implementation does not wait for tokens to become available.
// If that causes availability hits, we could increase the wait,
// similar to simple_pool.go.
if p.openTokens.TryAcquire(0) {
defer p.openTokens.Release()
} else {
// We could not immediately acquire a token.
// Instead of waiting
atomic.AddInt32(p.numActive, -1)
return nil, OpenHandleError{
p.location, errors.New("Open Error: reached OpenMaxConcurrency")}
}
}
handle, err := p.options.Open(location)
if err != nil {
atomic.AddInt32(p.numActive, -1)
return nil, OpenHandleError{p.location, err}
}
return NewManagedHandle(p.location, handle, p, p.options), nil
}
// See ResourcePool for documentation.
func (p *simpleResourcePool) Release(handle ManagedHandle) error {
if pool, ok := handle.Owner().(*simpleResourcePool); !ok || pool != p {
return errors.New(
"Resource pool cannot take control of a handle owned " +
"by another resource pool")
}
h := handle.ReleaseUnderlyingHandle()
if h != nil {
// We can unref either before or after queuing the idle handle.
// The advantage of unref-ing before queuing is that there is
// a higher chance of successful Get when number of active handles
// is close to the limit (but potentially more handle creation).
// The advantage of queuing before unref-ing is that there's a
// higher chance of reusing handle (but potentially more Get failures).
atomic.AddInt32(p.numActive, -1)
p.queueIdleHandles(h)
}
return nil
}
// See ResourcePool for documentation.
func (p *simpleResourcePool) Discard(handle ManagedHandle) error {
if pool, ok := handle.Owner().(*simpleResourcePool); !ok || pool != p {
return errors.New(
"Resource pool cannot take control of a handle owned " +
"by another resource pool")
}
h := handle.ReleaseUnderlyingHandle()
if h != nil {
atomic.AddInt32(p.numActive, -1)
if err := p.options.Close(h); err != nil {
return fmt.Errorf("failed to close resource handle: %v", err)
}
}
return nil
}
// See ResourcePool for documentation.
func (p *simpleResourcePool) EnterLameDuckMode() {
p.mutex.Lock()
toClose := p.idleHandles
p.isLameDuck = true
p.idleHandles = []*idleHandle{}
p.mutex.Unlock()
p.closeHandles(toClose)
}
// This returns an idle resource, if there is one.
func (p *simpleResourcePool) getIdleHandle() ManagedHandle {
var toClose []*idleHandle
defer func() {
// NOTE: Must keep the closure around to late bind the toClose slice.
p.closeHandles(toClose)
}()
now := p.options.getCurrentTime()
p.mutex.Lock()
defer p.mutex.Unlock()
var i int
for i = 0; i < len(p.idleHandles); i++ {
idle := p.idleHandles[i]
if idle.keepUntil == nil || now.Before(*idle.keepUntil) {
break
}
}
if i > 0 {
toClose = p.idleHandles[0:i]
}
if i < len(p.idleHandles) {
idle := p.idleHandles[i]
p.idleHandles = p.idleHandles[i+1:]
return NewManagedHandle(p.location, idle.handle, p, p.options)
}
if len(p.idleHandles) > 0 {
p.idleHandles = []*idleHandle{}
}
return nil
}
// This adds an idle resource to the pool.
func (p *simpleResourcePool) queueIdleHandles(handle interface{}) {
var toClose []*idleHandle
defer func() {
// NOTE: Must keep the closure around to late bind the toClose slice.
p.closeHandles(toClose)
}()
now := p.options.getCurrentTime()
var keepUntil *time.Time
if p.options.MaxIdleTime != nil {
// NOTE: Assign to temp variable first to work around compiler bug
x := now.Add(*p.options.MaxIdleTime)
keepUntil = &x
}
p.mutex.Lock()
defer p.mutex.Unlock()
if p.isLameDuck {
toClose = []*idleHandle{
{handle: handle},
}
return
}
p.idleHandles = append(
p.idleHandles,
&idleHandle{
handle: handle,
keepUntil: keepUntil,
})
nIdleHandles := uint32(len(p.idleHandles))
if nIdleHandles > p.options.MaxIdleHandles {
handlesToClose := nIdleHandles - p.options.MaxIdleHandles
toClose = p.idleHandles[0:handlesToClose]
p.idleHandles = p.idleHandles[handlesToClose:nIdleHandles]
}
}
// Closes resources, at this point it is assumed that this resources
// are no longer referenced from the main idleHandles slice.
func (p *simpleResourcePool) closeHandles(handles []*idleHandle) {
for _, handle := range handles {
_ = p.options.Close(handle.handle)
}
}