Source File
pool.go
Belonging Package
github.com/go-redis/redis/v8/internal/pool
type Stats struct {
Hits uint32 // number of times free connection was found in the pool
Misses uint32 // number of times free connection was NOT found in the pool
Timeouts uint32 // number of times a wait timeout occurred
TotalConns uint32 // number of total connections in the pool
IdleConns uint32 // number of idle connections in the pool
StaleConns uint32 // number of stale connections removed from the pool
}
type Pooler interface {
NewConn(context.Context) (*Conn, error)
CloseConn(*Conn) error
Get(context.Context) (*Conn, error)
Put(context.Context, *Conn)
Remove(context.Context, *Conn, error)
Len() int
IdleLen() int
Stats() *Stats
Close() error
}
type Options struct {
Dialer func(context.Context) (net.Conn, error)
OnClose func(*Conn) error
PoolSize int
MinIdleConns int
MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
}
type lastDialErrorWrap struct {
err error
}
type ConnPool struct {
opt *Options
dialErrorsNum uint32 // atomic
lastDialError atomic.Value
queue chan struct{}
connsMu sync.Mutex
conns []*Conn
idleConns []*Conn
poolSize int
idleConnsLen int
stats Stats
_closed uint32 // atomic
closedCh chan struct{}
}
var _ Pooler = (*ConnPool)(nil)
func ( *Options) *ConnPool {
:= &ConnPool{
opt: ,
queue: make(chan struct{}, .PoolSize),
conns: make([]*Conn, 0, .PoolSize),
idleConns: make([]*Conn, 0, .PoolSize),
closedCh: make(chan struct{}),
}
.connsMu.Lock()
.checkMinIdleConns()
.connsMu.Unlock()
if .IdleTimeout > 0 && .IdleCheckFrequency > 0 {
go .reaper(.IdleCheckFrequency)
}
return
}
func ( *ConnPool) () {
if .opt.MinIdleConns == 0 {
return
}
for .poolSize < .opt.PoolSize && .idleConnsLen < .opt.MinIdleConns {
.poolSize++
.idleConnsLen++
go func() {
:= .addIdleConn()
if != nil {
.connsMu.Lock()
.poolSize--
.idleConnsLen--
.connsMu.Unlock()
}
}()
}
}
func ( *ConnPool) () error {
, := .dialConn(context.TODO(), true)
if != nil {
return
}
.connsMu.Lock()
.conns = append(.conns, )
.idleConns = append(.idleConns, )
.connsMu.Unlock()
return nil
}
func ( *ConnPool) ( context.Context) (*Conn, error) {
return .newConn(, false)
}
func ( *ConnPool) ( context.Context, bool) (*Conn, error) {
, := .dialConn(, )
if != nil {
return nil,
}
.connsMu.Lock()
.conns = append(.conns, )
if .poolSize >= .opt.PoolSize {
.pooled = false
} else {
.poolSize++
}
}
.connsMu.Unlock()
return , nil
}
func ( *ConnPool) ( context.Context, bool) (*Conn, error) {
if .closed() {
return nil, ErrClosed
}
if atomic.LoadUint32(&.dialErrorsNum) >= uint32(.opt.PoolSize) {
return nil, .getLastDialError()
}
, := .opt.Dialer()
if != nil {
.setLastDialError()
if atomic.AddUint32(&.dialErrorsNum, 1) == uint32(.opt.PoolSize) {
go .tryDial()
}
return nil,
}
internal.NewConnectionsCounter.Add(, 1)
:= NewConn()
.pooled =
return , nil
}
func ( *ConnPool) () {
for {
if .closed() {
return
}
, := .opt.Dialer(context.Background())
if != nil {
.setLastDialError()
time.Sleep(time.Second)
continue
}
atomic.StoreUint32(&.dialErrorsNum, 0)
_ = .Close()
return
}
}
func ( *ConnPool) ( error) {
.lastDialError.Store(&lastDialErrorWrap{err: })
}
func ( *ConnPool) () error {
, := .lastDialError.Load().(*lastDialErrorWrap)
if != nil {
return .err
}
return nil
}
func ( *ConnPool) ( context.Context) (*Conn, error) {
if .closed() {
return nil, ErrClosed
}
:= .waitTurn()
if != nil {
return nil,
}
for {
.connsMu.Lock()
:= .popIdle()
.connsMu.Unlock()
if == nil {
break
}
if .isStaleConn() {
_ = .CloseConn()
continue
}
atomic.AddUint32(&.stats.Hits, 1)
return , nil
}
atomic.AddUint32(&.stats.Misses, 1)
, := .newConn(, true)
if != nil {
.freeTurn()
return nil,
}
return , nil
}
func ( *ConnPool) () {
.queue <- struct{}{}
}
func ( *ConnPool) ( context.Context) error {
select {
case <-.Done():
return .Err()
default:
}
select {
case .queue <- struct{}{}:
return nil
default:
}
:= timers.Get().(*time.Timer)
.Reset(.opt.PoolTimeout)
select {
case <-.Done():
if !.Stop() {
<-.C
}
timers.Put()
return .Err()
case .queue <- struct{}{}:
if !.Stop() {
<-.C
}
timers.Put()
return nil
case <-.C:
timers.Put()
atomic.AddUint32(&.stats.Timeouts, 1)
return ErrPoolTimeout
}
}
func ( *ConnPool) () {
<-.queue
}
func ( *ConnPool) () *Conn {
if len(.idleConns) == 0 {
return nil
}
:= len(.idleConns) - 1
:= .idleConns[]
.idleConns = .idleConns[:]
.idleConnsLen--
.checkMinIdleConns()
return
}
func ( *ConnPool) ( context.Context, *Conn) {
if .rd.Buffered() > 0 {
internal.Logger.Printf(, "Conn has unread data")
.Remove(, , BadConnError{})
return
}
if !.pooled {
.Remove(, , nil)
return
}
.connsMu.Lock()
.idleConns = append(.idleConns, )
.idleConnsLen++
.connsMu.Unlock()
.freeTurn()
}
func ( *ConnPool) ( context.Context, *Conn, error) {
.removeConnWithLock()
.freeTurn()
_ = .closeConn()
}
func ( *ConnPool) ( *Conn) error {
.removeConnWithLock()
return .closeConn()
}
func ( *ConnPool) ( *Conn) {
.connsMu.Lock()
.removeConn()
.connsMu.Unlock()
}
func ( *ConnPool) ( *Conn) {
for , := range .conns {
if == {
.conns = append(.conns[:], .conns[+1:]...)
if .pooled {
.poolSize--
.checkMinIdleConns()
}
return
}
}
}
func ( *ConnPool) ( *Conn) error {
if .opt.OnClose != nil {
_ = .opt.OnClose()
}
return .Close()
}
func ( *ConnPool) () int {
.connsMu.Lock()
:= .idleConnsLen
.connsMu.Unlock()
return
}
func ( *ConnPool) () *Stats {
:= .IdleLen()
return &Stats{
Hits: atomic.LoadUint32(&.stats.Hits),
Misses: atomic.LoadUint32(&.stats.Misses),
Timeouts: atomic.LoadUint32(&.stats.Timeouts),
TotalConns: uint32(.Len()),
IdleConns: uint32(),
StaleConns: atomic.LoadUint32(&.stats.StaleConns),
}
}
func ( *ConnPool) () bool {
return atomic.LoadUint32(&._closed) == 1
}
func ( *ConnPool) ( func(*Conn) bool) error {
.connsMu.Lock()
defer .connsMu.Unlock()
var error
for , := range .conns {
if () {
if := .closeConn(); != nil && == nil {
=
}
}
}
return
}
func ( *ConnPool) () error {
if !atomic.CompareAndSwapUint32(&._closed, 0, 1) {
return ErrClosed
}
close(.closedCh)
var error
.connsMu.Lock()
for , := range .conns {
if := .closeConn(); != nil && == nil {
=
}
}
.conns = nil
.poolSize = 0
.idleConns = nil
.idleConnsLen = 0
.connsMu.Unlock()
return
}
func ( *ConnPool) ( time.Duration) {
:= time.NewTicker()
defer .Stop()
for {
select {
if .closed() {
return
}
, := .ReapStaleConns()
if != nil {
internal.Logger.Printf(context.Background(), "ReapStaleConns failed: %s", )
continue
}
case <-.closedCh:
return
}
}
}
func ( *ConnPool) () (int, error) {
var int
for {
.getTurn()
.connsMu.Lock()
:= .reapStaleConn()
.connsMu.Unlock()
.freeTurn()
if != nil {
_ = .closeConn()
++
} else {
break
}
}
atomic.AddUint32(&.stats.StaleConns, uint32())
return , nil
}
func ( *ConnPool) () *Conn {
if len(.idleConns) == 0 {
return nil
}
:= .idleConns[0]
if !.isStaleConn() {
return nil
}
.idleConns = append(.idleConns[:0], .idleConns[1:]...)
.idleConnsLen--
.removeConn()
return
}
func ( *ConnPool) ( *Conn) bool {
if .opt.IdleTimeout == 0 && .opt.MaxConnAge == 0 {
return false
}
:= time.Now()
if .opt.IdleTimeout > 0 && .Sub(.UsedAt()) >= .opt.IdleTimeout {
return true
}
if .opt.MaxConnAge > 0 && .Sub(.createdAt) >= .opt.MaxConnAge {
return true
}
return false
![]() |
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds. |