代码积累 --通用连接池实现-pool

  |   0 评论   |   0 浏览

首先定义连接池的参数结构体和接口。这里需要引入之前介绍过的 time 包(即下面以 xtime 为别名导入的 common/library/time),代码如下:

连接池接口定义

package pool

import (
	"context"
	"errors"
	"io"
	"time"

	xtime "common/library/time"
)

// Package-level sentinels shared by the pool implementations.
var (
	// nowFunc returns the current time; it is time.Now by default.
	nowFunc = time.Now
	// ErrPoolClosed is returned once the pool has been closed.
	ErrPoolClosed = errors.New("container/pool closed")
	// ErrPoolExhausted is returned when all connections are in use.
	ErrPoolExhausted = errors.New("container/pool exhausted")
)

// Config holds the pool configuration parameters.
type Config struct {
	// Active is the number of items the pool may have allocated at a given
	// time. When zero, there is no limit on the number of items in the pool.
	Active int
	// Idle is the number of idle items kept in the pool.
	Idle int
	// IdleTimeout is how long an item may stay idle.
	IdleTimeout xtime.Duration
	// WaitTimeout is how long Get waits for an item.
	WaitTimeout xtime.Duration
	// Wait, when true, makes Get wait until ctx times out; the default is
	// false, in which case Get returns immediately.
	Wait bool
}

// item pairs a pooled io.Closer with the timestamp recorded for it.
type item struct {
	createdAt time.Time
	c         io.Closer
}

// expired reports whether createdAt+timeout lies before the current time as
// returned by nowFunc. A timeout of zero or less disables expiry, so the
// result is always false in that case.
func (i *item) expired(timeout time.Duration) bool {
	if timeout > 0 {
		deadline := i.createdAt.Add(timeout)
		return nowFunc().After(deadline)
	}
	return false
}

// close closes the wrapped io.Closer and returns its error.
func (i *item) close() error {
	err := i.c.Close()
	return err
}

// Pool is the connection pool interface implemented by the pool types in
// this package.
type Pool interface {
	// Get returns an item from the pool, or an error.
	Get(ctx context.Context) (io.Closer, error)
	// Put hands c back to the pool; forceClose asks the pool to close c
	// instead of keeping it.
	Put(ctx context.Context, c io.Closer, forceClose bool) error
	// Close shuts the pool down.
	Close() error
}

连接池实现

slice

package pool

import (
	"context"
	"io"
	"sync"
	"time"
)

// Compile-time check that *Slice implements Pool.
var _ Pool = &Slice{}

// Slice is a Pool implementation that keeps its idle items in a slice and
// queues waiting Get calls in a map of request channels.
type Slice struct {
	// New is an application supplied function for creating and configuring a
	// item.
	//
	// The item returned from new must not be in a special state
	// (subscribed to pubsub channel, transaction started, ...).
	New  func(ctx context.Context) (io.Closer, error)
	stop func() // stop cancels the context the itemOpener goroutine runs under.

	// mu protects fields defined below.
	mu           sync.Mutex
	// freeItem holds the idle items; Get takes from the front and
	// putItemLocked appends to the back.
	freeItem     []*item
	// itemRequests maps a request key to the channel a waiting Get reads from.
	itemRequests map[uint64]chan item
	nextRequest  uint64 // Next key to use in itemRequests.
	active       int    // number of opened and pending open items
	// Used to signal the need for new items
	// a goroutine running itemOpener() reads on this chan and
	// maybeOpenNewItems sends on the chan (one send per needed item).
	// The channel itself is not closed by Close; the itemOpener goroutine
	// exits when stop cancels its context.
	openerCh  chan struct{}
	// closed is set by Close; Get and putItemLocked check it.
	closed    bool
	// cleanerCh wakes the staleCleaner goroutine; Close closes it.
	cleanerCh chan struct{}

	// Config pool configuration
	conf *Config
}

// NewSlice creates a new slice-backed pool from c and starts its background
// goroutines: the item opener and, when c.IdleTimeout > 0, the stale cleaner.
//
// It panics when c is nil or when a positive Active limit is smaller than
// Idle. Active <= 0 means "no limit" (Get only enforces the limit when
// Active > 0), so any Idle value is accepted in that case.
//
// NewSlice does not set the New field; the caller must assign it before the
// first Get.
func NewSlice(c *Config) *Slice {
	// check Config: only a real (positive) Active limit can conflict with Idle.
	if c == nil || (c.Active > 0 && c.Active < c.Idle) {
		panic("config nil or Idle Must <= Active")
	}
	ctx, cancel := context.WithCancel(context.Background())
	// new pool
	p := &Slice{
		conf:         c,
		stop:         cancel,
		itemRequests: make(map[uint64]chan item),
		openerCh:     make(chan struct{}, 1000000),
	}
	// The pool is not shared yet, so no locking is needed here.
	p.startCleanerLocked(time.Duration(c.IdleTimeout))

	// itemOpener exits when Close calls p.stop and cancels ctx.
	go p.itemOpener(ctx)
	return p
}

// Reload applies a new configuration to the pool: it wakes or starts the
// stale cleaner for the new IdleTimeout, applies the Active and Idle limits
// (closing surplus idle items), and finally installs c as the pool config.
// c must not be nil. Reload always returns nil.
func (p *Slice) Reload(c *Config) error {
	p.mu.Lock()
	p.startCleanerLocked(time.Duration(c.IdleTimeout))
	p.mu.Unlock()

	// setActive and setIdle acquire p.mu themselves. sync.Mutex is not
	// reentrant, so they must be called with the lock released; calling them
	// while holding p.mu deadlocks.
	p.setActive(c.Active)
	p.setIdle(c.Idle)

	p.mu.Lock()
	p.conf = c
	p.mu.Unlock()
	return nil
}

// Get returns a cached or newly-opened item.
//
// Idle items are preferred; expired ones are closed and skipped. When the
// Active limit is reached, Get either fails immediately with ErrPoolExhausted
// (WaitTimeout == 0 and Wait == false) or queues a request and waits until an
// item is handed over, the pool is closed (ErrPoolClosed), or ctx is done
// (ctx.Err()). Otherwise a new item is created with p.New, whose error is
// returned unchanged.
func (p *Slice) Get(ctx context.Context) (io.Closer, error) {
	p.mu.Lock()
	if p.closed {
		p.mu.Unlock()
		return nil, ErrPoolClosed
	}
	idleTimeout := time.Duration(p.conf.IdleTimeout)
	// Prefer a free item, if possible. p.mu is held at the top of every
	// iteration and again when the loop exits.
	numFree := len(p.freeItem)
	for numFree > 0 {
		i := p.freeItem[0]
		copy(p.freeItem, p.freeItem[1:])
		// Clear the vacated tail slot so the backing array does not keep a
		// second reference to the last item.
		p.freeItem[numFree-1] = nil
		p.freeItem = p.freeItem[:numFree-1]
		p.mu.Unlock()
		if !i.expired(idleTimeout) {
			return i.c, nil
		}
		// Stale item: close it (best effort) and give its slot back.
		i.close()
		p.mu.Lock()
		p.release()
		numFree = len(p.freeItem)
	}

	// Out of free items. If we're not allowed to open any more items, make a
	// request and wait.
	if p.conf.Active > 0 && p.active >= p.conf.Active {
		// check WaitTimeout and return directly
		if p.conf.WaitTimeout == 0 && !p.conf.Wait {
			p.mu.Unlock()
			return nil, ErrPoolExhausted
		}
		// Make the item channel. It's buffered so that the sender
		// (putItemLocked) doesn't block while waiting for the req to be read.
		req := make(chan item, 1)
		reqKey := p.nextRequestKeyLocked()
		p.itemRequests[reqKey] = req
		wt := p.conf.WaitTimeout
		p.mu.Unlock()

		// reset context timeout
		// NOTE(review): Shrink presumably caps ctx's deadline at WaitTimeout;
		// confirm against the xtime package.
		if wt > 0 {
			var cancel func()
			_, ctx, cancel = wt.Shrink(ctx)
			defer cancel()
		}
		// Timeout the item request with the context.
		select {
		case <-ctx.Done():
			// Remove the item request so no further value can be sent on it.
			p.mu.Lock()
			delete(p.itemRequests, reqKey)
			p.mu.Unlock()
			// An item may already have been delivered before the request was
			// removed. Hand it back to the pool; dropping it would leak both
			// the item and its active slot.
			select {
			case ret, ok := <-req:
				if ok {
					// The timeout is the error reported to the caller; a
					// close error from Put is deliberately ignored.
					_ = p.Put(ctx, ret.c, false)
				}
			default:
			}
			return nil, ctx.Err()
		case ret, ok := <-req:
			if !ok {
				// Close closed the request channel.
				return nil, ErrPoolClosed
			}
			if !ret.expired(idleTimeout) {
				return ret.c, nil
			}
			// Expired hand-over: close it and fall through to open a new one
			// with p.mu held.
			ret.close()
			p.mu.Lock()
			p.release()
		}
	}

	p.active++ // optimistically
	p.mu.Unlock()
	c, err := p.New(ctx)
	if err != nil {
		// Opening failed: undo the optimistic increment.
		p.mu.Lock()
		p.release()
		p.mu.Unlock()
		return nil, err
	}
	return c, nil
}

// Put returns c to the pool.
//
// When forceClose is true, c is not reused: its active slot is released and
// c is closed. Otherwise c is handed to a waiting Get or stored as an idle
// item; if neither is possible (pool closed, over the Active limit, or idle
// list full) the active count is decremented and c is closed. The returned
// error is the result of c.Close() when c was closed, nil otherwise.
//
// c.Close() is called after p.mu has been released so that a slow close does
// not block other pool operations.
func (p *Slice) Put(ctx context.Context, c io.Closer, forceClose bool) error {
	p.mu.Lock()
	if forceClose {
		p.release()
		p.mu.Unlock()
		return c.Close()
	}
	added := p.putItemLocked(c)
	if !added {
		p.active--
	}
	p.mu.Unlock()
	if added {
		return nil
	}
	return c.Close()
}

// putItemLocked tries to find a home for c and reports whether it did.
//
// It returns false when the pool is closed or when a positive Active limit is
// already exceeded. Otherwise c is wrapped in an item stamped with nowFunc()
// and either sent to one pending request (which is removed from
// itemRequests) or appended to freeItem if the idle limit allows. If neither
// applies it returns false and the caller remains responsible for c.
func (p *Slice) putItemLocked(c io.Closer) bool {
	if p.closed || (p.conf.Active > 0 && p.active > p.conf.Active) {
		return false
	}
	it := item{createdAt: nowFunc(), c: c}
	// Serve an arbitrary pending request first; map iteration picks one.
	for key, waiter := range p.itemRequests {
		delete(p.itemRequests, key) // Remove from pending requests.
		waiter <- it
		return true
	}
	if len(p.freeItem) < p.maxIdleItemsLocked() {
		p.freeItem = append(p.freeItem, &it)
		return true
	}
	return false
}

// itemOpener runs in its own goroutine. It opens one new item for every
// value received on p.openerCh and returns once ctx is done.
func (p *Slice) itemOpener(ctx context.Context) {
	done := ctx.Done()
	for {
		select {
		case <-p.openerCh:
			p.openNewItem(ctx)
		case <-done:
			return
		}
	}
}

// maybeOpenNewItems asks the itemOpener goroutine to open one item per
// pending request, capped by the remaining room under a positive Active
// limit. Each request sent on p.openerCh is preceded by an optimistic
// p.active++. It stops as soon as it observes p.closed.
func (p *Slice) maybeOpenNewItems() {
	pending := len(p.itemRequests)
	if limit := p.conf.Active; limit > 0 {
		if room := limit - p.active; pending > room {
			pending = room
		}
	}
	for ; pending > 0; pending-- {
		p.active++ // optimistically
		if p.closed {
			return
		}
		p.openerCh <- struct{}{}
	}
}

// openNewItem opens a single item with p.New and offers it to the pool.
//
// maybeOpenNewItems has already executed p.active++ before it sent on
// p.openerCh, so this function gives the slot back when New fails or when
// putItemLocked cannot place the item (in which case the item is closed).
func (p *Slice) openNewItem(ctx context.Context) {
	c, err := p.New(ctx)
	p.mu.Lock()
	defer p.mu.Unlock()
	switch {
	case err != nil:
		p.release()
	case !p.putItemLocked(c):
		p.active--
		c.Close()
	}
}

// setIdle sets the maximum number of items kept in the idle list and closes
// any idle items beyond the new limit.
//
// If Active is greater than 0 but less than the new idle limit, the idle
// limit is reduced to match Active.
//
// If n <= 0, no idle items are retained.
//
// setIdle writes through p.conf, so the *Config the pool was given is
// modified. It acquires p.mu itself and must not be called with it held.
func (p *Slice) setIdle(n int) {
	p.mu.Lock()
	if n > 0 {
		p.conf.Idle = n
	} else {
		// No idle items.
		p.conf.Idle = -1
	}
	// Make sure the idle limit doesn't exceed Active.
	if p.conf.Active > 0 && p.maxIdleItemsLocked() > p.conf.Active {
		p.conf.Idle = p.conf.Active
	}
	var closing []*item
	idleCount := len(p.freeItem)
	maxIdle := p.maxIdleItemsLocked()
	if idleCount > maxIdle {
		// Copy the surplus out instead of aliasing the tail of freeItem: once
		// p.mu is released, an append to freeItem could overwrite an aliased
		// slice before its items have been closed.
		closing = append(closing, p.freeItem[maxIdle:]...)
		for i := maxIdle; i < idleCount; i++ {
			p.freeItem[i] = nil
		}
		p.freeItem = p.freeItem[:maxIdle]
		// Idle items are counted in p.active; closing them frees their slots.
		p.active -= len(closing)
	}
	p.mu.Unlock()
	for _, c := range closing {
		c.close()
	}
}

// setActive stores n as the Active limit in p.conf; negative values are
// stored as 0, and Get treats an Active of 0 as "no limit".
//
// When the resulting limit is positive and smaller than the current idle
// limit, setIdle(n) is called afterwards so the idle limit is lowered to
// match. setActive acquires p.mu itself.
func (p *Slice) setActive(n int) {
	p.mu.Lock()
	if n < 0 {
		p.conf.Active = 0
	} else {
		p.conf.Active = n
	}
	limit := p.conf.Active
	idleTooHigh := limit > 0 && p.maxIdleItemsLocked() > limit
	p.mu.Unlock()
	if !idleTooHigh {
		return
	}
	p.setIdle(n)
}

// startCleanerLocked makes sure a staleCleaner goroutine is running when d is
// positive. If none has been started yet, it creates cleanerCh and launches
// one with the currently configured IdleTimeout. If one already exists and d
// is shorter than the configured IdleTimeout, it wakes the cleaner with a
// non-blocking send. A d of zero or less is a no-op.
func (p *Slice) startCleanerLocked(d time.Duration) {
	if d <= 0 {
		// if set 0, staleCleaner() will return directly
		return
	}
	current := time.Duration(p.conf.IdleTimeout)
	if p.cleanerCh == nil {
		// run only one, clean stale items.
		p.cleanerCh = make(chan struct{}, 1)
		go p.staleCleaner(current)
		return
	}
	if d < current {
		select {
		case p.cleanerCh <- struct{}{}:
		default:
		}
	}
}

// staleCleaner runs in its own goroutine and periodically removes idle items
// whose timestamp is older than the configured IdleTimeout, closing them
// after p.mu has been released. d is the initial wait; both it and the
// re-armed interval are clamped to at least 100ms. The goroutine returns when
// the pool is closed or IdleTimeout is no longer positive.
func (p *Slice) staleCleaner(d time.Duration) {
	const minInterval = 100 * time.Millisecond

	if d < minInterval {
		d = minInterval
	}
	t := time.NewTimer(d)

	for {
		// Wake up on the timer, or early when cleanerCh is signalled
		// (startCleanerLocked sends on it, Close closes it).
		select {
		case <-t.C:
		case <-p.cleanerCh: // IdleTimeout was changed or the pool was closed.
		}
		p.mu.Lock()
		// Re-read the timeout on every pass so a changed config takes effect.
		d = time.Duration(p.conf.IdleTimeout)
		if p.closed || d <= 0 {
			p.mu.Unlock()
			return
		}

		expiredSince := nowFunc().Add(-d)
		var closing []*item
		for i := 0; i < len(p.freeItem); i++ {
			c := p.freeItem[i]
			if c.createdAt.Before(expiredSince) {
				closing = append(closing, c)
				p.active--
				// Swap-remove: move the last item into slot i, shrink the
				// slice, and revisit slot i on the next iteration.
				last := len(p.freeItem) - 1
				p.freeItem[i] = p.freeItem[last]
				p.freeItem[last] = nil
				p.freeItem = p.freeItem[:last]
				i--
			}
		}
		p.mu.Unlock()

		// Close the stale items without holding the lock.
		for _, c := range closing {
			c.close()
		}

		if d < minInterval {
			d = minInterval
		}
		t.Reset(d)
	}
}

// nextRequestKeyLocked hands out the current request key and advances the
// counter by one. It is assumed that nextRequest will not overflow.
func (p *Slice) nextRequestKeyLocked() uint64 {
	key := p.nextRequest
	p.nextRequest = key + 1
	return key
}

// defaultIdleItems is the idle limit used when Config.Idle is zero.
const defaultIdleItems = 2

// maxIdleItemsLocked translates p.conf.Idle into the effective idle limit:
// a positive value is used as is, zero selects defaultIdleItems, and a
// negative value means no idle items are kept.
func (p *Slice) maxIdleItemsLocked() int {
	idle := p.conf.Idle
	if idle > 0 {
		return idle
	}
	if idle == 0 {
		return defaultIdleItems
	}
	return 0
}

// release gives back one active slot and then lets maybeOpenNewItems start
// openers for pending requests. Every caller in this file holds p.mu.
func (p *Slice) release() {
	p.active -= 1
	p.maybeOpenNewItems()
}

// Close shuts the pool down. It marks the pool closed, wakes the stale
// cleaner by closing cleanerCh, closes every pending request channel so
// waiting Get calls return ErrPoolClosed, cancels the item opener, and closes
// all idle items.
//
// Calling Close again is a no-op that returns nil. The returned error is the
// first error reported while closing an idle item, or nil.
func (p *Slice) Close() error {
	p.mu.Lock()
	if p.closed {
		p.mu.Unlock()
		return nil
	}
	if p.cleanerCh != nil {
		close(p.cleanerCh)
	}
	// Detach the idle items so they can be closed without holding p.mu.
	idle := p.freeItem
	p.freeItem = nil
	p.closed = true
	for _, req := range p.itemRequests {
		close(req)
	}
	p.mu.Unlock()
	p.stop()

	var err error
	for _, i := range idle {
		// Keep closing the remaining items even if one fails; report the
		// first failure.
		if cerr := i.close(); cerr != nil && err == nil {
			err = cerr
		}
	}
	return err
}

测试用例

package pool

import (
	"context"
	"io"
	"testing"
	"time"

	xtime "common/library/time"

	"github.com/stretchr/testify/assert"
)

// closer is a no-op io.Closer used as the pooled resource in the tests.
type closer struct{}

// Close always succeeds.
func (*closer) Close() error { return nil }

// connection is a test helper that couples an item obtained from a pool with
// the pool it has to be returned to.
type connection struct {
	c    io.Closer
	pool Pool
}

// HandleQuick simulates work that takes no time at all.
func (c *connection) HandleQuick() {}

// HandleNormal simulates 20ms of work.
func (c *connection) HandleNormal() {
	const work = 20 * time.Millisecond
	time.Sleep(work)
}

// HandleSlow simulates 500ms of work.
func (c *connection) HandleSlow() {
	const work = 500 * time.Millisecond
	time.Sleep(work)
}

// Close returns the item to its pool without forcing it closed; the error
// from Put is not inspected.
func (c *connection) Close() {
	ctx := context.Background()
	c.pool.Put(ctx, c.c, false)
}

// TestSliceGetPut checks that an item can be fetched from a Slice pool, used,
// and handed back through connection.Close.
func TestSliceGetPut(t *testing.T) {
	// new pool
	config := &Config{
		Active:      1,
		Idle:        1,
		IdleTimeout: xtime.Duration(90 * time.Second),
		WaitTimeout: xtime.Duration(10 * time.Millisecond),
		Wait:        false,
	}
	pool := NewSlice(config)
	pool.New = func(ctx context.Context) (io.Closer, error) {
		return &closer{}, nil
	}
	// Close the pool so its background goroutines do not outlive the test.
	defer pool.Close()

	// test Get Put
	conn, err := pool.Get(context.TODO())
	assert.Nil(t, err)
	c1 := connection{pool: pool, c: conn}
	c1.HandleNormal()
	c1.Close()
}

// TestSlicePut checks that Put with forceClose=true drops the item, so the
// next Get has to create a new one (observable through a different id).
func TestSlicePut(t *testing.T) {
	var id = 0
	type connID struct {
		io.Closer
		id int
	}
	config := &Config{
		Active:      1,
		Idle:        1,
		IdleTimeout: xtime.Duration(1 * time.Second),
		//		WaitTimeout: xtime.Duration(10 * time.Millisecond),
		Wait: false,
	}
	pool := NewSlice(config)
	pool.New = func(ctx context.Context) (io.Closer, error) {
		id = id + 1
		return &connID{id: id, Closer: &closer{}}, nil
	}
	// Close the pool so its background goroutines do not outlive the test.
	defer pool.Close()

	// test Put(ctx, conn, true)
	conn, err := pool.Get(context.TODO())
	assert.Nil(t, err)
	conn1 := conn.(*connID)
	// Put(ctx, conn, true) drops the connection.
	pool.Put(context.TODO(), conn, true)
	conn, err = pool.Get(context.TODO())
	assert.Nil(t, err)
	conn2 := conn.(*connID)
	assert.NotEqual(t, conn1.id, conn2.id)
}

// TestSliceIdleTimeout checks that an idle item older than IdleTimeout is not
// reused: after sleeping past the timeout, Get must return a new item.
func TestSliceIdleTimeout(t *testing.T) {
	var id = 0
	type connID struct {
		io.Closer
		id int
	}
	config := &Config{
		Active: 1,
		Idle:   1,
		// conn timeout
		IdleTimeout: xtime.Duration(1 * time.Millisecond),
	}
	pool := NewSlice(config)
	pool.New = func(ctx context.Context) (io.Closer, error) {
		id = id + 1
		return &connID{id: id, Closer: &closer{}}, nil
	}
	// Close the pool so its background goroutines do not outlive the test.
	defer pool.Close()

	conn, err := pool.Get(context.TODO())
	assert.Nil(t, err)
	conn1 := conn.(*connID)
	// Put(ctx, conn, false) keeps the connection as an idle item.
	pool.Put(context.TODO(), conn, false)
	time.Sleep(5 * time.Millisecond)
	// idle timeout has passed, so Get returns a new conn
	conn, err = pool.Get(context.TODO())
	assert.Nil(t, err)
	conn2 := conn.(*connID)
	assert.NotEqual(t, conn1.id, conn2.id)
}

// TestSliceContextTimeout checks that a Get which has to wait for an item
// fails once the wait times out, and succeeds again after the item in use
// has been returned.
func TestSliceContextTimeout(t *testing.T) {
	// new pool
	config := &Config{
		Active:      1,
		Idle:        1,
		IdleTimeout: xtime.Duration(90 * time.Second),
		WaitTimeout: xtime.Duration(10 * time.Millisecond),
		Wait:        false,
	}
	pool := NewSlice(config)
	pool.New = func(ctx context.Context) (io.Closer, error) {
		return &closer{}, nil
	}
	// Close the pool so its background goroutines do not outlive the test.
	defer pool.Close()

	// test context timeout
	ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)
	defer cancel()
	conn, err := pool.Get(ctx)
	assert.Nil(t, err)
	_, err = pool.Get(ctx)
	// context timeout error
	assert.NotNil(t, err)
	pool.Put(context.TODO(), conn, false)
	_, err = pool.Get(ctx)
	assert.Nil(t, err)
}

// TestSlicePoolExhausted checks that, with Wait=false and no WaitTimeout, Get
// fails while the single allowed item is in use and succeeds again once it
// has been returned.
func TestSlicePoolExhausted(t *testing.T) {
	// test pool exhausted
	config := &Config{
		Active:      1,
		Idle:        1,
		IdleTimeout: xtime.Duration(90 * time.Second),
		//		WaitTimeout: xtime.Duration(10 * time.Millisecond),
		Wait: false,
	}
	pool := NewSlice(config)
	pool.New = func(ctx context.Context) (io.Closer, error) {
		return &closer{}, nil
	}
	// Close the pool so its background goroutines do not outlive the test.
	defer pool.Close()

	ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)
	defer cancel()
	conn, err := pool.Get(context.TODO())
	assert.Nil(t, err)
	_, err = pool.Get(ctx)
	// config active == 1, so with no available conns the pool is exhausted.
	assert.NotNil(t, err)
	pool.Put(context.TODO(), conn, false)
	_, err = pool.Get(ctx)
	assert.Nil(t, err)
}

// TestSliceStaleClean checks that a Slice pool reuses an idle item before the
// idle timeout and replaces it with a new one after the timeout has passed.
func TestSliceStaleClean(t *testing.T) {
	var id = 0
	type connID struct {
		io.Closer
		id int
	}
	config := &Config{
		Active:      1,
		Idle:        1,
		IdleTimeout: xtime.Duration(1 * time.Second),
		//		WaitTimeout: xtime.Duration(10 * time.Millisecond),
		Wait: false,
	}
	// This is the Slice variant of the test, so it must build a Slice pool.
	pool := NewSlice(config)
	pool.New = func(ctx context.Context) (io.Closer, error) {
		id = id + 1
		return &connID{id: id, Closer: &closer{}}, nil
	}
	// Close the pool so its background goroutines do not outlive the test.
	defer pool.Close()

	conn, err := pool.Get(context.TODO())
	assert.Nil(t, err)
	conn1 := conn.(*connID)
	pool.Put(context.TODO(), conn, false)
	conn, err = pool.Get(context.TODO())
	assert.Nil(t, err)
	conn2 := conn.(*connID)
	assert.Equal(t, conn1.id, conn2.id)
	pool.Put(context.TODO(), conn, false)
	// sleep more than idleTimeout
	time.Sleep(2 * time.Second)
	conn, err = pool.Get(context.TODO())
	assert.Nil(t, err)
	conn3 := conn.(*connID)
	assert.NotEqual(t, conn1.id, conn3.id)
}

// BenchmarkSlice1 measures parallel Get/Put on a Slice pool (Active=30,
// Idle=30, WaitTimeout=10ms) with no simulated work (HandleQuick).
func BenchmarkSlice1(b *testing.B) {
	config := &Config{
		Active:      30,
		Idle:        30,
		IdleTimeout: xtime.Duration(90 * time.Second),
		WaitTimeout: xtime.Duration(10 * time.Millisecond),
		Wait:        false,
	}
	pool := NewSlice(config)
	pool.New = func(ctx context.Context) (io.Closer, error) {
		return &closer{}, nil
	}
	// The benchmark function runs several times; close each pool so its
	// background goroutines do not pile up.
	defer pool.Close()

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			conn, err := pool.Get(context.TODO())
			if err != nil {
				b.Error(err)
				continue
			}
			c1 := connection{pool: pool, c: conn}
			c1.HandleQuick()
			c1.Close()
		}
	})
}

// BenchmarkSlice2 measures parallel Get/Put on a Slice pool (Active=30,
// Idle=30, WaitTimeout=10ms) with 20ms of simulated work (HandleNormal).
func BenchmarkSlice2(b *testing.B) {
	config := &Config{
		Active:      30,
		Idle:        30,
		IdleTimeout: xtime.Duration(90 * time.Second),
		WaitTimeout: xtime.Duration(10 * time.Millisecond),
		Wait:        false,
	}
	pool := NewSlice(config)
	pool.New = func(ctx context.Context) (io.Closer, error) {
		return &closer{}, nil
	}
	// The benchmark function runs several times; close each pool so its
	// background goroutines do not pile up.
	defer pool.Close()

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			conn, err := pool.Get(context.TODO())
			if err != nil {
				b.Error(err)
				continue
			}
			c1 := connection{pool: pool, c: conn}
			c1.HandleNormal()
			c1.Close()
		}
	})
}

// BenchmarkSlice3 measures parallel Get/Put on a Slice pool (Active=30,
// Idle=30, WaitTimeout=10ms) with 500ms of simulated work (HandleSlow).
func BenchmarkSlice3(b *testing.B) {
	config := &Config{
		Active:      30,
		Idle:        30,
		IdleTimeout: xtime.Duration(90 * time.Second),
		WaitTimeout: xtime.Duration(10 * time.Millisecond),
		Wait:        false,
	}
	pool := NewSlice(config)
	pool.New = func(ctx context.Context) (io.Closer, error) {
		return &closer{}, nil
	}
	// The benchmark function runs several times; close each pool so its
	// background goroutines do not pile up.
	defer pool.Close()

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			conn, err := pool.Get(context.TODO())
			if err != nil {
				b.Error(err)
				continue
			}
			c1 := connection{pool: pool, c: conn}
			c1.HandleSlow()
			c1.Close()
		}
	})
}

// BenchmarkSlice4 measures parallel Get/Put on a Slice pool (Active=30,
// Idle=30) with 500ms of simulated work (HandleSlow). Wait is false and no
// WaitTimeout is set, so Get does not wait when the pool is exhausted.
func BenchmarkSlice4(b *testing.B) {
	config := &Config{
		Active:      30,
		Idle:        30,
		IdleTimeout: xtime.Duration(90 * time.Second),
		//		WaitTimeout: xtime.Duration(10 * time.Millisecond),
		Wait: false,
	}
	pool := NewSlice(config)
	pool.New = func(ctx context.Context) (io.Closer, error) {
		return &closer{}, nil
	}
	// The benchmark function runs several times; close each pool so its
	// background goroutines do not pile up.
	defer pool.Close()

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			conn, err := pool.Get(context.TODO())
			if err != nil {
				b.Error(err)
				continue
			}
			c1 := connection{pool: pool, c: conn}
			c1.HandleSlow()
			c1.Close()
		}
	})
}

// BenchmarkSlice5 measures parallel Get/Put on a Slice pool (Active=30,
// Idle=30) with 500ms of simulated work (HandleSlow). Wait is true and no
// WaitTimeout is set, so an exhausted pool makes Get wait for an item.
func BenchmarkSlice5(b *testing.B) {
	config := &Config{
		Active:      30,
		Idle:        30,
		IdleTimeout: xtime.Duration(90 * time.Second),
		//		WaitTimeout: xtime.Duration(10 * time.Millisecond),
		Wait: true,
	}
	pool := NewSlice(config)
	pool.New = func(ctx context.Context) (io.Closer, error) {
		return &closer{}, nil
	}
	// The benchmark function runs several times; close each pool so its
	// background goroutines do not pile up.
	defer pool.Close()

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			conn, err := pool.Get(context.TODO())
			if err != nil {
				b.Error(err)
				continue
			}
			c1 := connection{pool: pool, c: conn}
			c1.HandleSlow()
			c1.Close()
		}
	})
}

list

package pool

import (
	"container/list"
	"context"
	"io"
	"sync"
	"time"
)

// Compile-time check that *List implements Pool.
var _ Pool = &List{}

// List is a Pool implementation that keeps its idle items in a
// container/list, pushing returned items to the front.
type List struct {
	// New is an application supplied function for creating and configuring a
	// item.
	//
	// The item returned from new must not be in a special state
	// (subscribed to pubsub channel, transaction started, ...).
	New func(ctx context.Context) (io.Closer, error)

	// mu protects fields defined below.
	mu     sync.Mutex
	// cond is an unbuffered channel; signal performs a non-blocking send on
	// it to wake one Get that is waiting for an item.
	cond   chan struct{}
	// closed is set by Close.
	closed bool
	// active is incremented when Get opens a new item and decremented by
	// release and Close.
	active int
	// clean stale items
	cleanerCh chan struct{}

	// Stack of item with most recently used at the front.
	idles list.List

	// Config pool configuration
	conf *Config
}

// NewList creates a new list-backed pool from c and, when c.IdleTimeout > 0,
// starts its stale cleaner goroutine.
//
// It panics when c is nil or when a positive Active limit is smaller than
// Idle. Active == 0 means "no limit" (Get opens new items freely in that
// case), so any Idle value is accepted then.
//
// NewList does not set the New field; the caller must assign it before the
// first Get.
func NewList(c *Config) *List {
	// check Config: only a real (positive) Active limit can conflict with Idle.
	if c == nil || (c.Active > 0 && c.Active < c.Idle) {
		panic("config nil or Idle Must <= Active")
	}
	// new pool
	p := &List{conf: c}
	p.cond = make(chan struct{})
	// The pool is not shared yet, so no locking is needed here.
	p.startCleanerLocked(time.Duration(c.IdleTimeout))
	return p
}

// Reload swaps in a new configuration. Under p.mu it first wakes or starts
// the stale cleaner for the new IdleTimeout and then installs c as the pool
// config. It always returns nil.
func (p *List) Reload(c *Config) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.startCleanerLocked(time.Duration(c.IdleTimeout))
	p.conf = c
	return nil
}

// startCleanerLocked makes sure a staleCleaner goroutine is running when d is
// positive. If none has been started yet, it creates cleanerCh and launches
// one. If one already exists and d is shorter than the configured
// IdleTimeout, it wakes the cleaner with a non-blocking send. A d of zero or
// less is a no-op.
func (p *List) startCleanerLocked(d time.Duration) {
	if d <= 0 {
		// if set 0, staleCleaner() will return directly
		return
	}
	if p.cleanerCh == nil {
		// run only one, clean stale items.
		p.cleanerCh = make(chan struct{}, 1)
		go p.staleCleaner()
		return
	}
	if d < time.Duration(p.conf.IdleTimeout) {
		select {
		case p.cleanerCh <- struct{}{}:
		default:
		}
	}
}

// staleCleaner runs in its own goroutine. Every 100ms, or earlier when
// cleanerCh is signalled, it removes expired items from the back of the idle
// list (the oldest end, since Put pushes to the front) and closes them with
// p.mu released. It returns when the pool is closed or IdleTimeout is no
// longer positive.
func (p *List) staleCleaner() {
	ticker := time.NewTicker(100 * time.Millisecond)
	// Stop the ticker when the goroutine exits so its resources are released.
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
		case <-p.cleanerCh: // IdleTimeout was changed.
		}
		p.mu.Lock()
		if p.closed || p.conf.IdleTimeout <= 0 {
			p.mu.Unlock()
			return
		}
		for i, n := 0, p.idles.Len(); i < n; i++ {
			e := p.idles.Back()
			if e == nil {
				// not possible
				break
			}
			ic := e.Value.(item)
			if !ic.expired(time.Duration(p.conf.IdleTimeout)) {
				// Items towards the front are newer; no need to continue.
				break
			}
			p.idles.Remove(e)
			p.release()
			// Close the item without holding the lock.
			p.mu.Unlock()
			ic.c.Close()
			p.mu.Lock()
		}
		p.mu.Unlock()
	}
}

// Get returns an item from the idle list or opens a new one.
//
// Idle items are taken from the front; expired ones are closed and skipped.
// If no idle item is usable and the Active limit allows it (or Active is 0),
// a new item is created with p.New, whose error is returned unchanged. When
// the limit is reached, Get either fails immediately with ErrPoolExhausted
// (WaitTimeout == 0 and Wait == false) or waits for a signal from Put or
// release and retries, returning the context error if the wait context is
// done first. ErrPoolClosed is returned when the pool has been closed.
func (p *List) Get(ctx context.Context) (io.Closer, error) {
	p.mu.Lock()
	if p.closed {
		p.mu.Unlock()
		return nil, ErrPoolClosed
	}
	for {
		// get idles item.
		for i, n := 0, p.idles.Len(); i < n; i++ {
			e := p.idles.Front()
			if e == nil {
				break
			}
			ic := e.Value.(item)
			p.idles.Remove(e)
			// Snapshot the timeout while p.mu is still held: Reload replaces
			// p.conf under the lock, so reading it after Unlock is a data race.
			idleTimeout := time.Duration(p.conf.IdleTimeout)
			p.mu.Unlock()
			if !ic.expired(idleTimeout) {
				return ic.c, nil
			}
			// Expired: close it (best effort) and give its slot back.
			ic.c.Close()
			p.mu.Lock()
			p.release()
		}
		// Check for pool closed before dialing a new item.
		if p.closed {
			p.mu.Unlock()
			return nil, ErrPoolClosed
		}
		// new item if under limit.
		if p.conf.Active == 0 || p.active < p.conf.Active {
			newItem := p.New
			p.active++
			p.mu.Unlock()
			c, err := newItem(ctx)
			if err != nil {
				// Opening failed: undo the increment and wake a waiter.
				p.mu.Lock()
				p.release()
				p.mu.Unlock()
				c = nil
			}
			return c, err
		}
		if p.conf.WaitTimeout == 0 && !p.conf.Wait {
			p.mu.Unlock()
			return nil, ErrPoolExhausted
		}
		wt := p.conf.WaitTimeout
		p.mu.Unlock()

		// slowpath: reset context timeout
		// NOTE(review): Shrink presumably caps ctx's deadline at WaitTimeout;
		// confirm against the xtime package.
		nctx := ctx
		cancel := func() {}
		if wt > 0 {
			_, nctx, cancel = wt.Shrink(ctx)
		}
		select {
		case <-nctx.Done():
			cancel()
			return nil, nctx.Err()
		case <-p.cond:
		}
		cancel()
		p.mu.Lock()
	}
}

// Put hands c back to the pool.
//
// Unless the pool is closed or forceClose is set, c is pushed to the front of
// the idle list; if that makes the list longer than conf.Idle, the item at
// the back is evicted. Whatever ends up surplus (c itself, or the evicted
// item) has its active slot released and is closed after p.mu is dropped,
// and the result of that Close is returned. When nothing is surplus, one
// waiter is signalled and nil is returned.
func (p *List) Put(ctx context.Context, c io.Closer, forceClose bool) error {
	p.mu.Lock()
	surplus := c
	if !(p.closed || forceClose) {
		p.idles.PushFront(item{createdAt: nowFunc(), c: c})
		surplus = nil
		if p.idles.Len() > p.conf.Idle {
			oldest := p.idles.Back()
			surplus = p.idles.Remove(oldest).(item).c
		}
	}
	if surplus != nil {
		p.release()
		p.mu.Unlock()
		return surplus.Close()
	}
	p.signal()
	p.mu.Unlock()
	return nil
}

// Close releases the resources used by the pool: it marks the pool closed,
// empties the idle list, subtracts the idle items from the active count, and
// closes every idle item after p.mu has been released.
//
// The returned error is the first error reported while closing an idle item,
// or nil.
func (p *List) Close() error {
	p.mu.Lock()
	// Collect the idle items into a slice rather than copying the list.List
	// value, so iteration does not depend on container/list internals.
	closers := make([]io.Closer, 0, p.idles.Len())
	for e := p.idles.Front(); e != nil; e = e.Next() {
		closers = append(closers, e.Value.(item).c)
	}
	p.idles.Init()
	p.closed = true
	p.active -= len(closers)
	p.mu.Unlock()

	var err error
	for _, c := range closers {
		// Keep closing the remaining items even if one fails; report the
		// first failure.
		if cerr := c.Close(); cerr != nil && err == nil {
			err = cerr
		}
	}
	return err
}

// release gives back one active slot and wakes at most one waiting Get with a
// non-blocking send on p.cond. The caller must hold p.mu during the call.
func (p *List) release() {
	p.active -= 1
	select {
	case p.cond <- struct{}{}:
	default:
	}
}

// signal wakes one Get that is currently blocked on p.cond. The send is
// non-blocking, so the signal is dropped when nobody is waiting.
func (p *List) signal() {
	select {
	case p.cond <- struct{}{}:
	default:
	}
}

测试用例

package pool

import (
	"context"
	"io"
	"testing"
	"time"

	xtime "common/library/time"

	"github.com/stretchr/testify/assert"
)

// TestListGetPut checks that an item can be fetched from a List pool, used,
// and handed back through connection.Close.
func TestListGetPut(t *testing.T) {
	// new pool
	config := &Config{
		Active:      1,
		Idle:        1,
		IdleTimeout: xtime.Duration(90 * time.Second),
		WaitTimeout: xtime.Duration(10 * time.Millisecond),
		Wait:        false,
	}
	pool := NewList(config)
	pool.New = func(ctx context.Context) (io.Closer, error) {
		return &closer{}, nil
	}
	// Close the pool so its cleaner goroutine does not outlive the test.
	defer pool.Close()

	// test Get Put
	conn, err := pool.Get(context.TODO())
	assert.Nil(t, err)
	c1 := connection{pool: pool, c: conn}
	c1.HandleNormal()
	c1.Close()
}

// TestListPut checks that Put with forceClose=true drops the item, so the
// next Get has to create a new one (observable through a different id).
func TestListPut(t *testing.T) {
	var id = 0
	type connID struct {
		io.Closer
		id int
	}
	config := &Config{
		Active:      1,
		Idle:        1,
		IdleTimeout: xtime.Duration(1 * time.Second),
		//		WaitTimeout: xtime.Duration(10 * time.Millisecond),
		Wait: false,
	}
	pool := NewList(config)
	pool.New = func(ctx context.Context) (io.Closer, error) {
		id = id + 1
		return &connID{id: id, Closer: &closer{}}, nil
	}
	// Close the pool so its cleaner goroutine does not outlive the test.
	defer pool.Close()

	// test Put(ctx, conn, true)
	conn, err := pool.Get(context.TODO())
	assert.Nil(t, err)
	conn1 := conn.(*connID)
	// Put(ctx, conn, true) drops the connection.
	pool.Put(context.TODO(), conn, true)
	conn, err = pool.Get(context.TODO())
	assert.Nil(t, err)
	conn2 := conn.(*connID)
	assert.NotEqual(t, conn1.id, conn2.id)
}

// TestListIdleTimeout checks that an idle item older than IdleTimeout is not
// reused: after sleeping past the timeout, Get must return a new item.
func TestListIdleTimeout(t *testing.T) {
	var id = 0
	type connID struct {
		io.Closer
		id int
	}
	config := &Config{
		Active: 1,
		Idle:   1,
		// conn timeout
		IdleTimeout: xtime.Duration(1 * time.Millisecond),
	}
	pool := NewList(config)
	pool.New = func(ctx context.Context) (io.Closer, error) {
		id = id + 1
		return &connID{id: id, Closer: &closer{}}, nil
	}
	// Close the pool so its cleaner goroutine does not outlive the test.
	defer pool.Close()

	conn, err := pool.Get(context.TODO())
	assert.Nil(t, err)
	conn1 := conn.(*connID)
	// Put(ctx, conn, false) keeps the connection as an idle item.
	pool.Put(context.TODO(), conn, false)
	time.Sleep(5 * time.Millisecond)
	// idle timeout has passed, so Get returns a new conn
	conn, err = pool.Get(context.TODO())
	assert.Nil(t, err)
	conn2 := conn.(*connID)
	assert.NotEqual(t, conn1.id, conn2.id)
}

// TestListContextTimeout checks that a Get which has to wait for an item
// fails once the wait times out, and succeeds again after the item in use
// has been returned.
func TestListContextTimeout(t *testing.T) {
	// new pool
	config := &Config{
		Active:      1,
		Idle:        1,
		IdleTimeout: xtime.Duration(90 * time.Second),
		WaitTimeout: xtime.Duration(10 * time.Millisecond),
		Wait:        false,
	}
	pool := NewList(config)
	pool.New = func(ctx context.Context) (io.Closer, error) {
		return &closer{}, nil
	}
	// Close the pool so its cleaner goroutine does not outlive the test.
	defer pool.Close()

	// test context timeout
	ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)
	defer cancel()
	conn, err := pool.Get(ctx)
	assert.Nil(t, err)
	_, err = pool.Get(ctx)
	// context timeout error
	assert.NotNil(t, err)
	pool.Put(context.TODO(), conn, false)
	_, err = pool.Get(ctx)
	assert.Nil(t, err)
}

// TestListPoolExhausted checks that, with Wait=false and no WaitTimeout, Get
// fails while the single allowed item is in use and succeeds again once it
// has been returned.
func TestListPoolExhausted(t *testing.T) {
	// test pool exhausted
	config := &Config{
		Active:      1,
		Idle:        1,
		IdleTimeout: xtime.Duration(90 * time.Second),
		//		WaitTimeout: xtime.Duration(10 * time.Millisecond),
		Wait: false,
	}
	pool := NewList(config)
	pool.New = func(ctx context.Context) (io.Closer, error) {
		return &closer{}, nil
	}
	// Close the pool so its cleaner goroutine does not outlive the test.
	defer pool.Close()

	ctx, cancel := context.WithTimeout(context.TODO(), 100*time.Millisecond)
	defer cancel()
	conn, err := pool.Get(context.TODO())
	assert.Nil(t, err)
	_, err = pool.Get(ctx)
	// config active == 1, so with no available conns the pool is exhausted.
	assert.NotNil(t, err)
	pool.Put(context.TODO(), conn, false)
	_, err = pool.Get(ctx)
	assert.Nil(t, err)
}

// TestListStaleClean checks that a List pool reuses an idle item before the
// idle timeout and replaces it with a new one after the timeout has passed.
func TestListStaleClean(t *testing.T) {
	var id = 0
	type connID struct {
		io.Closer
		id int
	}
	config := &Config{
		Active:      1,
		Idle:        1,
		IdleTimeout: xtime.Duration(1 * time.Second),
		//		WaitTimeout: xtime.Duration(10 * time.Millisecond),
		Wait: false,
	}
	pool := NewList(config)
	pool.New = func(ctx context.Context) (io.Closer, error) {
		id = id + 1
		return &connID{id: id, Closer: &closer{}}, nil
	}
	// Close the pool so its cleaner goroutine does not outlive the test.
	defer pool.Close()

	conn, err := pool.Get(context.TODO())
	assert.Nil(t, err)
	conn1 := conn.(*connID)
	pool.Put(context.TODO(), conn, false)
	conn, err = pool.Get(context.TODO())
	assert.Nil(t, err)
	conn2 := conn.(*connID)
	assert.Equal(t, conn1.id, conn2.id)
	pool.Put(context.TODO(), conn, false)
	// sleep more than idleTimeout
	time.Sleep(2 * time.Second)
	conn, err = pool.Get(context.TODO())
	assert.Nil(t, err)
	conn3 := conn.(*connID)
	assert.NotEqual(t, conn1.id, conn3.id)
}

// BenchmarkList1 measures parallel Get/Put on a List pool (Active=30,
// Idle=30, WaitTimeout=10ms) with no simulated work (HandleQuick).
func BenchmarkList1(b *testing.B) {
	config := &Config{
		Active:      30,
		Idle:        30,
		IdleTimeout: xtime.Duration(90 * time.Second),
		WaitTimeout: xtime.Duration(10 * time.Millisecond),
		Wait:        false,
	}
	pool := NewList(config)
	pool.New = func(ctx context.Context) (io.Closer, error) {
		return &closer{}, nil
	}
	// The benchmark function runs several times; close each pool so its
	// cleaner goroutine does not pile up.
	defer pool.Close()

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			conn, err := pool.Get(context.TODO())
			if err != nil {
				b.Error(err)
				continue
			}
			c1 := connection{pool: pool, c: conn}
			c1.HandleQuick()
			c1.Close()
		}
	})
}

// BenchmarkList2 measures parallel Get/Put on a List pool (Active=30,
// Idle=30, WaitTimeout=10ms) with 20ms of simulated work (HandleNormal).
func BenchmarkList2(b *testing.B) {
	config := &Config{
		Active:      30,
		Idle:        30,
		IdleTimeout: xtime.Duration(90 * time.Second),
		WaitTimeout: xtime.Duration(10 * time.Millisecond),
		Wait:        false,
	}
	pool := NewList(config)
	pool.New = func(ctx context.Context) (io.Closer, error) {
		return &closer{}, nil
	}
	// The benchmark function runs several times; close each pool so its
	// cleaner goroutine does not pile up.
	defer pool.Close()

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			conn, err := pool.Get(context.TODO())
			if err != nil {
				b.Error(err)
				continue
			}
			c1 := connection{pool: pool, c: conn}
			c1.HandleNormal()
			c1.Close()
		}
	})
}

// BenchmarkPool3 measures parallel Get/Put on a List pool (Active=30,
// Idle=30, WaitTimeout=10ms) with 500ms of simulated work (HandleSlow).
// It is the third of the List benchmarks; its name does not follow the
// BenchmarkListN pattern of its siblings and is kept so existing -bench
// filters continue to match.
func BenchmarkPool3(b *testing.B) {
	config := &Config{
		Active:      30,
		Idle:        30,
		IdleTimeout: xtime.Duration(90 * time.Second),
		WaitTimeout: xtime.Duration(10 * time.Millisecond),
		Wait:        false,
	}
	pool := NewList(config)
	pool.New = func(ctx context.Context) (io.Closer, error) {
		return &closer{}, nil
	}
	// The benchmark function runs several times; close each pool so its
	// cleaner goroutine does not pile up.
	defer pool.Close()

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			conn, err := pool.Get(context.TODO())
			if err != nil {
				b.Error(err)
				continue
			}
			c1 := connection{pool: pool, c: conn}
			c1.HandleSlow()
			c1.Close()
		}
	})
}

// BenchmarkList4 measures parallel Get/Put on a List pool (Active=30,
// Idle=30) with 500ms of simulated work (HandleSlow). Wait is false and no
// WaitTimeout is set, so Get returns ErrPoolExhausted instead of waiting.
func BenchmarkList4(b *testing.B) {
	config := &Config{
		Active:      30,
		Idle:        30,
		IdleTimeout: xtime.Duration(90 * time.Second),
		//		WaitTimeout: xtime.Duration(10 * time.Millisecond),
		Wait: false,
	}
	pool := NewList(config)
	pool.New = func(ctx context.Context) (io.Closer, error) {
		return &closer{}, nil
	}
	// The benchmark function runs several times; close each pool so its
	// cleaner goroutine does not pile up.
	defer pool.Close()

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			conn, err := pool.Get(context.TODO())
			if err != nil {
				b.Error(err)
				continue
			}
			c1 := connection{pool: pool, c: conn}
			c1.HandleSlow()
			c1.Close()
		}
	})
}

// BenchmarkList5 measures parallel Get/Put on a List pool (Active=30,
// Idle=30) with 500ms of simulated work (HandleSlow). Wait is true and no
// WaitTimeout is set, so an exhausted pool makes Get wait for a signal.
func BenchmarkList5(b *testing.B) {
	config := &Config{
		Active:      30,
		Idle:        30,
		IdleTimeout: xtime.Duration(90 * time.Second),
		//		WaitTimeout: xtime.Duration(10 * time.Millisecond),
		Wait: true,
	}
	pool := NewList(config)
	pool.New = func(ctx context.Context) (io.Closer, error) {
		return &closer{}, nil
	}
	// The benchmark function runs several times; close each pool so its
	// cleaner goroutine does not pile up.
	defer pool.Close()

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			conn, err := pool.Get(context.TODO())
			if err != nil {
				b.Error(err)
				continue
			}
			c1 := connection{pool: pool, c: conn}
			c1.HandleSlow()
			c1.Close()
		}
	})
}


标题:代码积累 --通用连接池实现-pool
作者:疲惫的怪神明