Source file: redis.go
Belonging package: github.com/go-redis/redis/v8
package redis
import (
)
// Hook is a set of callbacks around command execution: one Before/After pair
// for a single Cmder and one pair for a slice of Cmders. Each Before* callback
// returns a context.Context together with an error; each After* callback
// returns only an error.
type Hook interface {
BeforeProcess(ctx context.Context, cmd Cmder) (context.Context, error)
AfterProcess(ctx context.Context, cmd Cmder) error
BeforeProcessPipeline(ctx context.Context, cmds []Cmder) (context.Context, error)
AfterProcessPipeline(ctx context.Context, cmds []Cmder) error
}
// hooks holds the list of Hook values that the methods on this type iterate
// over.
type hooks struct {
hooks []Hook
}
// NOTE(review): identifiers were stripped from this extract; this is presumably
// the `lock` method invoked as `.lock()` elsewhere in the file — confirm.
//
// Re-slices the hooks field with a full (three-index) slice expression so that
// its capacity equals its length; a later append therefore has to allocate a
// new backing array instead of writing into the current one.
func ( *hooks) () {
.hooks = .hooks[:len(.hooks):len(.hooks)]
}
// NOTE(review): identifiers were stripped from this extract; presumably a
// clone-style helper on hooks — confirm the name against upstream.
//
// Has a value receiver and returns a hooks value. It declares a local, calls
// lock() and returns; which value lock() is applied to and which value is
// returned is not recoverable from this extract.
func ( hooks) () hooks {
:=
.lock()
return
}
// NOTE(review): the method name is not visible in this extract.
//
// Appends to the hooks slice and stores the result back in the hooks field;
// the appended value is presumably the Hook parameter.
func ( *hooks) ( Hook) {
.hooks = append(.hooks, )
}
// NOTE(review): identifiers were stripped from this extract; this appears to be
// the `process` method called as `.hooks.process(...)` later in the file —
// confirm against the upstream source.
//
// Parameters: a context.Context, a Cmder and a function of type
// func(context.Context, Cmder) error.
//
// With no hooks registered it returns the result of a single two-argument
// call (presumably to the supplied function). Otherwise:
//  1. BeforeProcess is called on the hooks in slice order until one returns
//     an error or all have run; on error SetErr is called (presumably on the
//     command).
//  2. If no error occurred, a two-argument call is made and its result is
//     stored (presumably the supplied function and the returned error).
//  3. AfterProcess is called in reverse order, beginning with a decrement of
//     the counter left by the first loop; when it returns a non-nil error an
//     assignment is made (presumably to the returned error) and SetErr is
//     called again.
func ( hooks) (
context.Context, Cmder, func(context.Context, Cmder) error,
) error {
if len(.hooks) == 0 {
return (, )
}
var int
var error
for ; < len(.hooks) && == nil; ++ {
, = .hooks[].BeforeProcess(, )
if != nil {
.SetErr()
}
}
if == nil {
= (, )
}
for --; >= 0; -- {
if := .hooks[].AfterProcess(, ); != nil {
=
.SetErr()
}
}
return
}
// NOTE(review): identifiers were stripped from this extract; this appears to be
// the `processPipeline` method called as `.hooks.processPipeline(...)` and
// `.processPipeline(...)` elsewhere in the file — confirm.
//
// Parameters: a context.Context, a []Cmder and a function of type
// func(context.Context, []Cmder) error.
//
// Same shape as the single-command variant above it in the file: with no hooks
// it returns the result of one two-argument call (presumably the supplied
// function). Otherwise BeforeProcessPipeline runs on the hooks in slice order
// until an error occurs, the two-argument call is made only if no error
// occurred, and AfterProcessPipeline then runs in reverse order starting with
// a decrement of the counter left by the first loop. On a hook error
// setCmdsErr is called (presumably to record it on the commands).
func ( hooks) (
context.Context, []Cmder, func(context.Context, []Cmder) error,
) error {
if len(.hooks) == 0 {
return (, )
}
var int
var error
for ; < len(.hooks) && == nil; ++ {
, = .hooks[].BeforeProcessPipeline(, )
if != nil {
setCmdsErr(, )
}
}
if == nil {
= (, )
}
for --; >= 0; -- {
if := .hooks[].AfterProcessPipeline(, ); != nil {
=
setCmdsErr(, )
}
}
return
}
// NOTE(review): identifiers were stripped from this extract; this appears to be
// the `processTxPipeline` method called as `.hooks.processTxPipeline(...)`
// later in the file — confirm.
//
// Stores the result of wrapMultiExec (presumably replacing the command slice)
// and then delegates to processPipeline.
func ( hooks) (
context.Context, []Cmder, func(context.Context, []Cmder) error,
) error {
= wrapMultiExec(, )
return .processPipeline(, , )
}
// baseClient bundles the client options, the connection pool used to obtain
// and return connections, and an optional callback run on close.
type baseClient struct {
opt *Options
connPool pool.Pooler
onClose func() error // hook called when client is closed
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// `newBaseClient` function called later in the file — confirm.
//
// Returns a pointer to a baseClient whose opt and connPool fields are set
// (presumably from the two parameters); onClose is left at its zero value.
func ( *Options, pool.Pooler) *baseClient {
return &baseClient{
opt: ,
connPool: ,
}
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// baseClient `clone` method called as `.clone()` below — confirm.
//
// Dereferences a pointer into a local value and returns an address; presumably
// a shallow copy of the receiver, in which case pointer and interface fields
// would be shared with the original.
func ( *baseClient) () *baseClient {
:= *
return &
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// `withTimeout` method called as `.baseClient.withTimeout(...)` later in the
// file — confirm.
//
// Clones the options, assigns ReadTimeout and WriteTimeout (presumably from the
// time.Duration parameter), clones the client, assigns an opt field
// (presumably attaching the cloned options to the clone) and returns.
func ( *baseClient) ( time.Duration) *baseClient {
:= .opt.clone()
.ReadTimeout =
.WriteTimeout =
:= .clone()
.opt =
return
}
// NOTE(review): the method name is not visible in this extract.
//
// Returns a description of the form "Redis<ADDR db:N>", built from getAddr()
// and the opt.DB setting.
func ( *baseClient) () string {
return fmt.Sprintf("Redis<%s db:%d>", .getAddr(), .opt.DB)
}
// NOTE(review): the method name is not visible in this extract.
//
// Obtains a connection via connPool.NewConn and runs initConn. If NewConn
// fails, nil and an error are returned. If initConn fails, connPool.CloseConn
// is called with its error deliberately discarded, and nil plus an error is
// returned.
func ( *baseClient) ( context.Context) (*pool.Conn, error) {
, := .connPool.NewConn()
if != nil {
return nil,
}
= .initConn(, )
if != nil {
_ = .connPool.CloseConn()
return nil,
}
return , nil
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// `getConn` method called as `.getConn()` later in the file — confirm.
//
// Limiter-aware wrapper around _getConn. When opt.Limiter is set, Allow() is
// consulted first and a non-nil result aborts the call. If _getConn then fails,
// ReportResult is called on the limiter before nil and an error are returned.
func ( *baseClient) ( context.Context) (*pool.Conn, error) {
if .opt.Limiter != nil {
:= .opt.Limiter.Allow()
if != nil {
return nil,
}
}
, := ._getConn()
if != nil {
if .opt.Limiter != nil {
.opt.Limiter.ReportResult()
}
return nil,
}
return , nil
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// `_getConn` method called as `._getConn()` above — confirm.
//
// Calls connPool.Get and returns immediately when an Inited flag is already
// set (presumably on the obtained connection). Otherwise initConn is run inside
// an internal.WithSpan named "init_conn". On failure connPool.Remove is
// called; if internal.Unwrap yields a non-nil value the function returns from
// that branch (presumably with the unwrapped error), otherwise it returns from
// the fall-through branch.
func ( *baseClient) ( context.Context) (*pool.Conn, error) {
, := .connPool.Get()
if != nil {
return nil,
}
if .Inited {
return , nil
}
= internal.WithSpan(, "init_conn", func( context.Context) error {
return .initConn(, )
})
if != nil {
.connPool.Remove(, , )
if := internal.Unwrap(); != nil {
return nil,
}
return nil,
}
return , nil
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// `initConn` method called as `.initConn(...)` above — confirm.
//
// One-time setup guarded by an Inited flag (presumably on the connection):
// returns nil if it is already set, otherwise sets it before doing any work.
//
// Nothing more is done when no password is configured, DB is 0, readOnly is
// false and OnConnect is nil. Otherwise a single-connection pool is built with
// pool.NewSingleConnPool, a Conn is built on it with newConn, and one pipeline
// queues:
//   - AuthACL when both Password and Username are set, or Auth when only
//     Password is set;
//   - Select when DB > 0;
//   - ReadOnly when opt.readOnly is set.
//
// A non-nil result after the pipeline call returns early. After that,
// opt.OnConnect is invoked if set and its result is returned.
func ( *baseClient) ( context.Context, *pool.Conn) error {
if .Inited {
return nil
}
.Inited = true
if .opt.Password == "" &&
.opt.DB == 0 &&
!.opt.readOnly &&
.opt.OnConnect == nil {
return nil
}
:= pool.NewSingleConnPool(.connPool, )
:= newConn(, .opt, )
, := .Pipelined(, func( Pipeliner) error {
if .opt.Password != "" {
if .opt.Username != "" {
.AuthACL(, .opt.Username, .opt.Password)
} else {
.Auth(, .opt.Password)
}
}
if .opt.DB > 0 {
.Select(, .opt.DB)
}
if .opt.readOnly {
.ReadOnly()
}
return nil
})
if != nil {
return
}
if .opt.OnConnect != nil {
return .opt.OnConnect(, )
}
return nil
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// `releaseConn` method called as `.releaseConn(...)` below — confirm.
//
// Calls ReportResult on the limiter when one is configured, then either calls
// connPool.Remove (when isBadConn(..., false) is true) or connPool.Put.
func ( *baseClient) ( context.Context, *pool.Conn, error) {
if .opt.Limiter != nil {
.opt.Limiter.ReportResult()
}
if isBadConn(, false) {
.connPool.Remove(, , )
} else {
.connPool.Put(, )
}
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// `withConn` method called as `.withConn(...)` below — confirm.
//
// Inside an internal.WithSpan named "with_conn": calls getConn, defers
// releaseConn, then makes a two-argument call (presumably the callback
// parameter with the obtained connection) and returns.
func ( *baseClient) (
context.Context, func(context.Context, *pool.Conn) error,
) error {
return internal.WithSpan(, "with_conn", func( context.Context) error {
, := .getConn()
if != nil {
return
}
// Deferred as a closure, so releaseConn's arguments are evaluated when the
// span function returns rather than at the defer statement.
defer func() {
.releaseConn(, , )
}()
= (, )
return
})
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// `process` method referenced as `.baseClient.process` later in the file —
// confirm.
//
// Calls _process. On a non-nil result it calls SetErr (presumably on the
// command) and returns from that branch; otherwise it returns nil.
func ( *baseClient) ( context.Context, Cmder) error {
:= ._process(, )
if != nil {
.SetErr()
return
}
return nil
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// `_process` method called as `._process(...)` above — confirm.
//
// Retry loop: the counter starts at 0 and the loop runs while it is
// <= opt.MaxRetries. Each iteration runs inside an internal.WithSpan named
// "process":
//   - when a value is > 0 (presumably the attempt counter), internal.Sleep is
//     called with retryBackoff(...) and a non-nil result returns from the span;
//   - withConn is called; inside it writeCmd runs under WithWriter with
//     opt.WriteTimeout, then readReply runs under WithReader with
//     cmdTimeout(...);
//   - if the read fails, a variable is set to `readTimeout() == nil` before
//     returning; it is declared as `true` just before withConn;
//   - a nil result from withConn returns nil from the span; otherwise the
//     result of shouldRetry is stored before returning.
//
// After the span, the loop returns when a value is nil or a negated value is
// true (presumably: no error, or the retry flag is false); otherwise an
// assignment is made and the loop continues.
func ( *baseClient) ( context.Context, Cmder) error {
var error
for := 0; <= .opt.MaxRetries; ++ {
:=
var bool
:= internal.WithSpan(, "process", func( context.Context) error {
if > 0 {
if := internal.Sleep(, .retryBackoff()); != nil {
return
}
}
:= true
:= .withConn(, func( context.Context, *pool.Conn) error {
:= .WithWriter(, .opt.WriteTimeout, func( *proto.Writer) error {
return writeCmd(, )
})
if != nil {
return
}
= .WithReader(, .cmdTimeout(), .readReply)
if != nil {
= .readTimeout() == nil
return
}
return nil
})
if == nil {
return nil
}
= shouldRetry(, )
return
})
if == nil || ! {
return
}
=
}
return
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// `retryBackoff` method called as `.retryBackoff(...)` elsewhere in the file —
// confirm.
//
// Returns internal.RetryBackoff called with a first argument (presumably the
// int parameter) and the opt.MinRetryBackoff / opt.MaxRetryBackoff settings.
func ( *baseClient) ( int) time.Duration {
return internal.RetryBackoff(, .opt.MinRetryBackoff, .opt.MaxRetryBackoff)
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// `cmdTimeout` method called as `.cmdTimeout(...)` above — confirm.
//
// If readTimeout() returns a non-nil pointer, the pointed-to value is read:
// 0 is returned unchanged, any other value is returned with 10 seconds added.
// When readTimeout() returns nil, opt.ReadTimeout is returned.
func ( *baseClient) ( Cmder) time.Duration {
if := .readTimeout(); != nil {
:= *
if == 0 {
return 0
}
return + 10*time.Second
}
return .opt.ReadTimeout
}
// NOTE(review): the method name is not visible in this extract.
//
// Calls onClose when it is set, then always calls connPool.Close. The
// `!= nil && == nil` guard on the second assignment presumably keeps the
// first error encountered rather than overwriting it.
func ( *baseClient) () error {
var error
if .onClose != nil {
if := .onClose(); != nil {
=
}
}
if := .connPool.Close(); != nil && == nil {
=
}
return
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// `getAddr` method called as `.getAddr()` above — confirm.
//
// Returns the Addr option.
func ( *baseClient) () string {
return .opt.Addr
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// `processPipeline` method referenced as `.baseClient.processPipeline` later
// in the file — confirm.
//
// Delegates to generalProcessPipeline, passing pipelineProcessCmds as the
// processor.
func ( *baseClient) ( context.Context, []Cmder) error {
return .generalProcessPipeline(, , .pipelineProcessCmds)
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// `processTxPipeline` method referenced as `.baseClient.processTxPipeline`
// later in the file — confirm.
//
// Delegates to generalProcessPipeline, passing txPipelineProcessCmds as the
// processor.
func ( *baseClient) ( context.Context, []Cmder) error {
return .generalProcessPipeline(, , .txPipelineProcessCmds)
}
// pipelineProcessor is the signature of a function that processes a slice of
// commands on a given connection. It returns a bool and an error; the retry
// loop that consumes it only retries when the bool is true.
type pipelineProcessor func(context.Context, *pool.Conn, []Cmder) (bool, error)
// NOTE(review): identifiers were stripped from this extract; presumably the
// `generalProcessPipeline` method called above — confirm.
//
// Runs _generalProcessPipeline. On a non-nil result it calls setCmdsErr and
// returns from that branch; otherwise it returns the result of cmdsFirstErr.
func ( *baseClient) (
context.Context, []Cmder, pipelineProcessor,
) error {
:= ._generalProcessPipeline(, , )
if != nil {
setCmdsErr(, )
return
}
return cmdsFirstErr()
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// `_generalProcessPipeline` method called above — confirm.
//
// Retry loop: the counter starts at 0 and the loop runs while it is
// <= opt.MaxRetries. When a value is > 0 (presumably the attempt counter),
// internal.Sleep is called with retryBackoff(...) and a non-nil result returns
// immediately. Each attempt calls withConn, inside which a three-argument call
// (presumably the pipelineProcessor parameter) yields two results. The loop
// returns when the first operand is nil, the second negated operand is true,
// or shouldRetry(..., true) is false (presumably: no error, the processor's
// bool is false, or the error is not retryable).
func ( *baseClient) (
context.Context, []Cmder, pipelineProcessor,
) error {
var error
for := 0; <= .opt.MaxRetries; ++ {
if > 0 {
if := internal.Sleep(, .retryBackoff()); != nil {
return
}
}
var bool
= .withConn(, func( context.Context, *pool.Conn) error {
var error
, = (, , )
return
})
if == nil || ! || !shouldRetry(, true) {
return
}
}
return
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// `pipelineProcessCmds` method referenced above — confirm.
//
// Calls writeCmds under WithWriter with opt.WriteTimeout, then
// pipelineReadCmds under WithReader with opt.ReadTimeout. The bool result is
// true on both return paths; a failed write returns before any read is
// attempted.
func ( *baseClient) (
context.Context, *pool.Conn, []Cmder,
) (bool, error) {
:= .WithWriter(, .opt.WriteTimeout, func( *proto.Writer) error {
return writeCmds(, )
})
if != nil {
return true,
}
= .WithReader(, .opt.ReadTimeout, func( *proto.Reader) error {
return pipelineReadCmds(, )
})
return true,
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// `pipelineReadCmds` function called elsewhere in the file — confirm.
//
// Ranges over a collection (presumably the []Cmder parameter) and calls
// readReply once per element. A non-nil result for which isRedisError is false
// aborts the loop and is returned; results for which isRedisError is true do
// not stop the loop. Returns nil once the loop completes.
func ( *proto.Reader, []Cmder) error {
for , := range {
:= .readReply()
if != nil && !isRedisError() {
return
}
}
return nil
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// `txPipelineProcessCmds` method referenced above — confirm.
//
// Calls writeCmds under WithWriter with opt.WriteTimeout; a failed write
// returns true with the error. The read phase runs under WithReader with
// opt.ReadTimeout: a slice is first narrowed with [1 : len()-1] (dropping its
// first and last elements), then txPipelineReadQueued is called and, if that
// succeeds, pipelineReadCmds. The bool result after the read phase is false.
func ( *baseClient) (
context.Context, *pool.Conn, []Cmder,
) (bool, error) {
:= .WithWriter(, .opt.WriteTimeout, func( *proto.Writer) error {
return writeCmds(, )
})
if != nil {
return true,
}
= .WithReader(, .opt.ReadTimeout, func( *proto.Reader) error {
= [1 : len()-1]
:= txPipelineReadQueued(, , )
if != nil {
return
}
return pipelineReadCmds(, )
})
return false,
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// `wrapMultiExec` function called earlier in the file — confirm.
//
// Panics with "not reached" when a length is 0 (presumably the []Cmder
// parameter). Otherwise it appends two empty slots, uses copy to shift
// elements one position towards the end, stores a NewStatusCmd(..., "multi")
// at index 0 and a NewSliceCmd(..., "exec") at the last index, and returns.
// Because it uses append, the returned slice may share its backing array with
// the argument.
func ( context.Context, []Cmder) []Cmder {
if len() == 0 {
panic("not reached")
}
= append(, make([]Cmder, 2)...)
copy([1:], [:len()-2])
[0] = NewStatusCmd(, "multi")
[len()-1] = NewSliceCmd(, "exec")
return
}
if := .readReply(); != nil {
return
}
for range {
if := .readReply(); != nil && !isRedisError() {
return
}
}
, := .ReadLine()
if != nil {
if == Nil {
= TxFailedErr
}
return
}
switch [0] {
case proto.ErrorReply:
return proto.ParseErrorReply()
// NOTE(review): the function name is not visible in this extract.
//
// Constructs a Client: calls init() (presumably on the *Options parameter),
// builds the baseClient with newBaseClient and newConnPool, sets ctx to
// context.Background(), assigns a Process method value to the cmdable field
// and returns a pointer.
func ( *Options) *Client {
.init()
:= Client{
baseClient: newBaseClient(, newConnPool()),
ctx: context.Background(),
}
.cmdable = .Process
return &
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// Client `clone` method called as `.clone()` below — confirm.
//
// Dereferences a pointer into a local value (presumably copying the receiver),
// assigns a Process method value to a cmdable field, calls hooks.lock() and
// returns an address.
func ( *Client) () *Client {
:= *
.cmdable = .Process
.hooks.lock()
return &
}
// NOTE(review): the method name is not visible in this extract.
//
// Calls clone(), assigns the result of baseClient.withTimeout(...) to a
// baseClient field (presumably on the clone) and returns.
func ( *Client) ( time.Duration) *Client {
:= .clone()
.baseClient = .baseClient.withTimeout()
return
}
// NOTE(review): the method name is not visible in this extract.
//
// Returns the ctx field.
func ( *Client) () context.Context {
return .ctx
}
// NOTE(review): the method name is not visible in this extract.
//
// Panics with "nil context" when a value is nil (presumably the context
// parameter). Otherwise calls clone(), assigns a ctx field and returns.
func ( *Client) ( context.Context) *Client {
if == nil {
panic("nil context")
}
:= .clone()
.ctx =
return
}
// NOTE(review): the method name is not visible in this extract.
//
// Returns a *Conn built by newConn from the opt field and a
// pool.NewStickyConnPool wrapper around the connPool field.
func ( *Client) ( context.Context) *Conn {
return newConn(, .opt, pool.NewStickyConnPool(.connPool))
}
// NOTE(review): the method name is not visible in this extract.
//
// Creates a command with NewCmd from the variadic arguments, calls Process and
// returns. The error returned by Process is deliberately discarded here; the
// caller only receives the *Cmd.
func ( *Client) ( context.Context, ...interface{}) *Cmd {
:= NewCmd(, ...)
_ = .Process(, )
return
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// `Process` method referenced as `.Process` elsewhere in the file — confirm.
//
// Calls hooks.process, passing baseClient.process as the function argument.
func ( *Client) ( context.Context, Cmder) error {
return .hooks.process(, , .baseClient.process)
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// Client `processPipeline` method used as the pipeline `exec` function below —
// confirm.
//
// Calls hooks.processPipeline, passing baseClient.processPipeline as the
// function argument.
func ( *Client) ( context.Context, []Cmder) error {
return .hooks.processPipeline(, , .baseClient.processPipeline)
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// Client `processTxPipeline` method — confirm.
//
// Calls hooks.processTxPipeline, passing baseClient.processTxPipeline as the
// function argument.
func ( *Client) ( context.Context, []Cmder) error {
return .hooks.processTxPipeline(, , .baseClient.processTxPipeline)
}
// NOTE(review): the method name is not visible in this extract.
//
// Calls connPool.Stats() and returns a value converted to *PoolStats
// (presumably the stats just obtained).
func ( *Client) () *PoolStats {
:= .connPool.Stats()
return (*PoolStats)()
}
// NOTE(review): the method name is not visible in this extract.
//
// Creates a pipeline with Pipeline() and calls Pipelined on it, presumably
// forwarding the context and callback parameters; returns its results.
func ( *Client) ( context.Context, func(Pipeliner) error) ([]Cmder, error) {
return .Pipeline().Pipelined(, )
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// `Pipeline` method called as `.Pipeline()` above — confirm.
//
// Builds a Pipeline whose ctx comes from a ctx field and whose exec function
// is a processPipeline method value, calls init() and returns an address as a
// Pipeliner.
func ( *Client) () Pipeliner {
:= Pipeline{
ctx: .ctx,
exec: .processPipeline,
}
.init()
return &
}
// NOTE(review): the method name is not visible in this extract, and the
// TxPipeline() method it calls is not defined in the visible part of the file.
//
// Calls TxPipeline() and then Pipelined on the result, presumably forwarding
// the context and callback parameters; returns its results.
func ( *Client) ( context.Context, func(Pipeliner) error) ([]Cmder, error) {
return .TxPipeline().Pipelined(, )
}
// conn embeds baseClient together with the cmdable and statefulCmdable values,
// so their fields and methods are promoted onto conn.
type conn struct {
baseClient
cmdable
statefulCmdable
}
// Conn embeds a *conn and additionally carries a context.Context in ctx.
type Conn struct {
*conn
ctx context.Context
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// `newConn` function called with three arguments elsewhere in the file —
// confirm.
//
// Builds a Conn around a fresh conn whose baseClient has opt and connPool set
// (presumably from the parameters), sets ctx, assigns a Process method value
// to both the cmdable and statefulCmdable fields and returns an address.
func ( context.Context, *Options, pool.Pooler) *Conn {
:= Conn{
conn: &conn{
baseClient: baseClient{
opt: ,
connPool: ,
},
},
ctx: ,
}
.cmdable = .Process
.statefulCmdable = .Process
return &
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// Conn `Process` method referenced as `.Process` in the constructor above —
// confirm.
//
// Calls baseClient.process directly; no hooks call appears in this method.
func ( *Conn) ( context.Context, Cmder) error {
return .baseClient.process(, )
}
// NOTE(review): the method name is not visible in this extract.
//
// Creates a pipeline with Pipeline() and calls Pipelined on it, presumably
// forwarding the context and callback parameters; returns its results.
func ( *Conn) ( context.Context, func(Pipeliner) error) ([]Cmder, error) {
return .Pipeline().Pipelined(, )
}
// NOTE(review): identifiers were stripped from this extract; presumably the
// Conn `Pipeline` method called as `.Pipeline()` above — confirm.
//
// Builds a Pipeline whose ctx comes from a ctx field and whose exec function
// is a processPipeline method value, calls init() and returns an address as a
// Pipeliner.
func ( *Conn) () Pipeliner {
:= Pipeline{
ctx: .ctx,
exec: .processPipeline,
}
.init()
return &
}
// NOTE(review): the method name is not visible in this extract, and the
// TxPipeline() method it calls is not defined in the visible part of the file.
//
// Calls TxPipeline() and then Pipelined on the result, presumably forwarding
// the context and callback parameters; returns its results.
func ( *Conn) ( context.Context, func(Pipeliner) error) ([]Cmder, error) {
return .TxPipeline().Pipelined(, )
}
![QR code]()
The pages are generated with Golds v0.3.2-preview (GOOS=darwin GOARCH=amd64). Golds is a Go 101 project developed by Tapir Liu. PRs and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable via the QR code on the left) to get the latest news about Golds.