Source File
cluster.go
Belonging Package
github.com/go-redis/redis/v8
package redis
import (
)
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
ClusterSlots func(context.Context) ([]ClusterSlot, error)
Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
OnConnect func(ctx context.Context, cn *Conn) error
Username string
Password string
MaxRetries int
MinRetryBackoff time.Duration
MaxRetryBackoff time.Duration
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
PoolSize int
MinIdleConns int
MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
TLSConfig *tls.Config
}
func ( *ClusterOptions) () {
if .MaxRedirects == -1 {
.MaxRedirects = 0
} else if .MaxRedirects == 0 {
.MaxRedirects = 3
}
if (.RouteByLatency || .RouteRandomly) && .ClusterSlots == nil {
.ReadOnly = true
}
if .PoolSize == 0 {
.PoolSize = 5 * runtime.NumCPU()
}
switch .ReadTimeout {
case -1:
.ReadTimeout = 0
case 0:
.ReadTimeout = 3 * time.Second
}
switch .WriteTimeout {
case -1:
.WriteTimeout = 0
case 0:
.WriteTimeout = .ReadTimeout
}
if .MaxRetries == 0 {
.MaxRetries = -1
}
switch .MinRetryBackoff {
case -1:
.MinRetryBackoff = 0
case 0:
.MinRetryBackoff = 8 * time.Millisecond
}
switch .MaxRetryBackoff {
case -1:
.MaxRetryBackoff = 0
case 0:
.MaxRetryBackoff = 512 * time.Millisecond
}
if .NewClient == nil {
.NewClient = NewClient
}
}
func ( *ClusterOptions) () *Options {
const = -1
return &Options{
Dialer: .Dialer,
OnConnect: .OnConnect,
Username: .Username,
Password: .Password,
MaxRetries: .MaxRetries,
MinRetryBackoff: .MinRetryBackoff,
MaxRetryBackoff: .MaxRetryBackoff,
DialTimeout: .DialTimeout,
ReadTimeout: .ReadTimeout,
WriteTimeout: .WriteTimeout,
PoolSize: .PoolSize,
MinIdleConns: .MinIdleConns,
MaxConnAge: .MaxConnAge,
PoolTimeout: .PoolTimeout,
IdleTimeout: .IdleTimeout,
IdleCheckFrequency: ,
readOnly: .ReadOnly,
TLSConfig: .TLSConfig,
}
}
type clusterNode struct {
Client *Client
latency uint32 // atomic
generation uint32 // atomic
failing uint32 // atomic
}
func ( *ClusterOptions, string) *clusterNode {
:= .clientOptions()
.Addr =
:= clusterNode{
Client: .NewClient(),
}
.latency = math.MaxUint32
if .RouteByLatency {
go .updateLatency()
}
return &
}
func ( *clusterNode) () string {
return .Client.String()
}
func ( *clusterNode) () error {
return .Client.Close()
}
func ( *clusterNode) () {
const = 10
var uint64
for := 0; < ; ++ {
time.Sleep(time.Duration(10+rand.Intn(10)) * time.Millisecond)
:= time.Now()
.Client.Ping(context.TODO())
+= uint64(time.Since() / time.Microsecond)
}
:= float64() / float64()
atomic.StoreUint32(&.latency, uint32(+0.5))
}
func ( *clusterNode) () time.Duration {
:= atomic.LoadUint32(&.latency)
return time.Duration() * time.Microsecond
}
func ( *clusterNode) () {
atomic.StoreUint32(&.failing, uint32(time.Now().Unix()))
}
func ( *clusterNode) () bool {
const = 15 // 15 seconds
:= atomic.LoadUint32(&.failing)
if == 0 {
return false
}
if time.Now().Unix()-int64() < {
return true
}
atomic.StoreUint32(&.failing, 0)
return false
}
func ( *clusterNode) () uint32 {
return atomic.LoadUint32(&.generation)
}
func ( *clusterNode) ( uint32) {
for {
:= atomic.LoadUint32(&.generation)
if < || atomic.CompareAndSwapUint32(&.generation, , ) {
break
}
}
}
type clusterNodes struct {
opt *ClusterOptions
mu sync.RWMutex
addrs []string
nodes map[string]*clusterNode
activeAddrs []string
closed bool
_generation uint32 // atomic
}
func ( *ClusterOptions) *clusterNodes {
return &clusterNodes{
opt: ,
addrs: .Addrs,
nodes: make(map[string]*clusterNode),
}
}
func ( *clusterNodes) () error {
.mu.Lock()
defer .mu.Unlock()
if .closed {
return nil
}
.closed = true
var error
for , := range .nodes {
if := .Client.Close(); != nil && == nil {
=
}
}
.nodes = nil
.activeAddrs = nil
return
}
func ( *clusterNodes) () ([]string, error) {
var []string
.mu.RLock()
:= .closed
if ! {
if len(.activeAddrs) > 0 {
= .activeAddrs
} else {
= .addrs
}
}
.mu.RUnlock()
if {
return nil, pool.ErrClosed
}
if len() == 0 {
return nil, errClusterNoNodes
}
return , nil
}
func ( *clusterNodes) () uint32 {
return atomic.AddUint32(&._generation, 1)
}
var []*clusterNode
.mu.Lock()
.activeAddrs = .activeAddrs[:0]
for , := range .nodes {
if .Generation() >= {
.activeAddrs = append(.activeAddrs, )
if .opt.RouteByLatency {
go .updateLatency()
}
continue
}
delete(.nodes, )
= append(, )
}
.mu.Unlock()
for , := range {
_ = .Client.Close()
}
}
func ( *clusterNodes) ( string) (*clusterNode, error) {
, := .get()
if != nil {
return nil,
}
if != nil {
return , nil
}
.mu.Lock()
defer .mu.Unlock()
if .closed {
return nil, pool.ErrClosed
}
, := .nodes[]
if {
return , nil
}
= newClusterNode(.opt, )
.addrs = appendIfNotExists(.addrs, )
.nodes[] =
return , nil
}
func ( *clusterNodes) ( string) (*clusterNode, error) {
var *clusterNode
var error
.mu.RLock()
if .closed {
= pool.ErrClosed
} else {
= .nodes[]
}
.mu.RUnlock()
return ,
}
func ( *clusterNodes) () ([]*clusterNode, error) {
.mu.RLock()
defer .mu.RUnlock()
if .closed {
return nil, pool.ErrClosed
}
:= make([]*clusterNode, 0, len(.nodes))
for , := range .nodes {
= append(, )
}
return , nil
}
func ( *clusterNodes) () (*clusterNode, error) {
, := .Addrs()
if != nil {
return nil,
}
:= rand.Intn(len())
return .Get([])
}
type clusterSlot struct {
start, end int
nodes []*clusterNode
}
type clusterSlotSlice []*clusterSlot
func ( clusterSlotSlice) () int {
return len()
}
func ( clusterSlotSlice) (, int) bool {
return [].start < [].start
}
func ( clusterSlotSlice) (, int) {
[], [] = [], []
}
type clusterState struct {
nodes *clusterNodes
Masters []*clusterNode
Slaves []*clusterNode
slots []*clusterSlot
generation uint32
createdAt time.Time
}
func (
*clusterNodes, []ClusterSlot, string,
) (*clusterState, error) {
:= clusterState{
nodes: ,
slots: make([]*clusterSlot, 0, len()),
generation: .NextGeneration(),
createdAt: time.Now(),
}
, , := net.SplitHostPort()
:= isLoopback()
for , := range {
var []*clusterNode
for , := range .Nodes {
:= .Addr
if ! {
= replaceLoopbackHost(, )
}
, := .nodes.Get()
if != nil {
return nil,
}
.SetGeneration(.generation)
= append(, )
if == 0 {
.Masters = appendUniqueNode(.Masters, )
} else {
.Slaves = appendUniqueNode(.Slaves, )
}
}
.slots = append(.slots, &clusterSlot{
start: .Start,
end: .End,
nodes: ,
})
}
sort.Sort(clusterSlotSlice(.slots))
time.AfterFunc(time.Minute, func() {
.GC(.generation)
})
return &, nil
}
func (, string) string {
, , := net.SplitHostPort()
if != nil {
return
}
:= net.ParseIP()
if == nil {
return
}
if !.IsLoopback() {
return
}
return net.JoinHostPort(, )
}
func ( string) bool {
:= net.ParseIP()
if == nil {
return true
}
return .IsLoopback()
}
func ( *clusterState) ( int) (*clusterNode, error) {
:= .slotNodes()
if len() > 0 {
return [0], nil
}
return .nodes.Random()
}
func ( *clusterState) ( int) (*clusterNode, error) {
:= .slotNodes()
switch len() {
case 0:
return .nodes.Random()
case 1:
return [0], nil
case 2:
if := [1]; !.Failing() {
return , nil
}
return [0], nil
default:
var *clusterNode
for := 0; < 10; ++ {
:= rand.Intn(len()-1) + 1
= []
if !.Failing() {
return , nil
}
}
return .nodes.Random()
}
func ( *clusterState) ( int) (*clusterNode, error) {
:= .slotNodes()
if len() == 0 {
return .nodes.Random()
}
:= rand.Intn(len())
return [], nil
}
func ( *clusterState) ( int) []*clusterNode {
:= sort.Search(len(.slots), func( int) bool {
return .slots[].end >=
})
if >= len(.slots) {
return nil
}
:= .slots[]
if >= .start && <= .end {
return .nodes
}
return nil
}
type clusterStateHolder struct {
load func(ctx context.Context) (*clusterState, error)
state atomic.Value
reloading uint32 // atomic
}
func ( func( context.Context) (*clusterState, error)) *clusterStateHolder {
return &clusterStateHolder{
load: ,
}
}
func ( *clusterStateHolder) ( context.Context) (*clusterState, error) {
, := .load()
if != nil {
return nil,
}
.state.Store()
return , nil
}
func ( *clusterStateHolder) ( context.Context) {
if !atomic.CompareAndSwapUint32(&.reloading, 0, 1) {
return
}
go func() {
defer atomic.StoreUint32(&.reloading, 0)
, := .Reload()
if != nil {
return
}
time.Sleep(200 * time.Millisecond)
}()
}
func ( *clusterStateHolder) ( context.Context) (*clusterState, error) {
:= .state.Load()
if != nil {
:= .(*clusterState)
if time.Since(.createdAt) > 10*time.Second {
.LazyReload()
}
return , nil
}
return .Reload()
}
func ( *clusterStateHolder) ( context.Context) (*clusterState, error) {
, := .Reload()
if == nil {
return , nil
}
return .Get()
}
type clusterClient struct {
opt *ClusterOptions
nodes *clusterNodes
state *clusterStateHolder //nolint:structcheck
cmdsInfoCache *cmdsInfoCache //nolint:structcheck
}
type ClusterClient struct {
*clusterClient
cmdable
hooks
ctx context.Context
}
func ( *ClusterOptions) *ClusterClient {
.init()
:= &ClusterClient{
clusterClient: &clusterClient{
opt: ,
nodes: newClusterNodes(),
},
ctx: context.Background(),
}
.state = newClusterStateHolder(.loadState)
.cmdsInfoCache = newCmdsInfoCache(.cmdsInfo)
.cmdable = .Process
if .IdleCheckFrequency > 0 {
go .reaper(.IdleCheckFrequency)
}
return
}
func ( *ClusterClient) () context.Context {
return .ctx
}
func ( *ClusterClient) ( context.Context) *ClusterClient {
if == nil {
panic("nil context")
}
:= *
.cmdable = .Process
.hooks.lock()
.ctx =
return &
}
func ( *ClusterClient) () *ClusterOptions {
return .opt
}
func ( *ClusterClient) ( context.Context) {
.state.LazyReload()
}
func ( *ClusterClient) () error {
return .nodes.Close()
}
func ( *ClusterClient) ( context.Context, ...interface{}) *Cmd {
:= NewCmd(, ...)
_ = .Process(, )
return
}
func ( *ClusterClient) ( context.Context, Cmder) error {
return .hooks.process(, , .process)
}
func ( *ClusterClient) ( context.Context, Cmder) error {
:= ._process(, )
if != nil {
.SetErr()
return
}
return nil
}
func ( *ClusterClient) ( context.Context, Cmder) error {
:= .cmdInfo(.Name())
:= .cmdSlot()
var *clusterNode
var bool
var error
for := 0; <= .opt.MaxRedirects; ++ {
if > 0 {
if := internal.Sleep(, .retryBackoff()); != nil {
return
}
}
if == nil {
var error
, = .cmdNode(, , )
if != nil {
return
}
}
if {
:= .Client.Pipeline()
_ = .Process(, NewCmd(, "asking"))
_ = .Process(, )
_, = .Exec()
_ = .Close()
= false
} else {
= .Client.Process(, )
}
if == nil {
return nil
}
if := isReadOnlyError(); || == pool.ErrClosed {
if {
.state.LazyReload()
}
= nil
continue
}
if .opt.ReadOnly && isLoadingError() {
.MarkAsFailing()
= nil
continue
}
var bool
var string
, , = isMovedError()
if || {
var error
, = .nodes.Get()
if != nil {
return
}
continue
}
if == 0 {
continue
}
.MarkAsFailing()
= nil
continue
}
return
}
return
}
func ( *ClusterClient) (
context.Context,
func( context.Context, *Client) error,
) error {
, := .state.ReloadOrGet()
if != nil {
return
}
var sync.WaitGroup
:= make(chan error, 1)
for , := range .Masters {
.Add(1)
go func( *clusterNode) {
defer .Done()
:= (, .Client)
if != nil {
select {
case <- :
default:
}
}
}()
}
.Wait()
select {
case := <-:
return
default:
return nil
}
}
func ( *ClusterClient) (
context.Context,
func( context.Context, *Client) error,
) error {
, := .state.ReloadOrGet()
if != nil {
return
}
var sync.WaitGroup
:= make(chan error, 1)
for , := range .Slaves {
.Add(1)
go func( *clusterNode) {
defer .Done()
:= (, .Client)
if != nil {
select {
case <- :
default:
}
}
}()
}
.Wait()
select {
case := <-:
return
default:
return nil
}
}
func ( *ClusterClient) (
context.Context,
func( context.Context, *Client) error,
) error {
, := .state.ReloadOrGet()
if != nil {
return
}
var sync.WaitGroup
:= make(chan error, 1)
:= func( *clusterNode) {
defer .Done()
:= (, .Client)
if != nil {
select {
case <- :
default:
}
}
}
for , := range .Masters {
.Add(1)
go ()
}
for , := range .Slaves {
.Add(1)
go ()
}
.Wait()
select {
case := <-:
return
default:
return nil
}
}
func ( *ClusterClient) () *PoolStats {
var PoolStats
, := .state.Get(context.TODO())
if == nil {
return &
}
for , := range .Masters {
:= .Client.connPool.Stats()
.Hits += .Hits
.Misses += .Misses
.Timeouts += .Timeouts
.TotalConns += .TotalConns
.IdleConns += .IdleConns
.StaleConns += .StaleConns
}
for , := range .Slaves {
:= .Client.connPool.Stats()
.Hits += .Hits
.Misses += .Misses
.Timeouts += .Timeouts
.TotalConns += .TotalConns
.IdleConns += .IdleConns
.StaleConns += .StaleConns
}
return &
}
func ( *ClusterClient) ( context.Context) (*clusterState, error) {
if .opt.ClusterSlots != nil {
, := .opt.ClusterSlots()
if != nil {
return nil,
}
return newClusterState(.nodes, , "")
}
, := .nodes.Addrs()
if != nil {
return nil,
}
var error
for , := range rand.Perm(len()) {
:= []
, := .nodes.Get()
if != nil {
if == nil {
=
}
continue
}
, := .Client.ClusterSlots().Result()
if != nil {
if == nil {
=
}
continue
}
return newClusterState(.nodes, , .Client.opt.Addr)
}
func ( *ClusterClient) ( time.Duration) {
:= time.NewTicker()
defer .Stop()
for range .C {
, := .nodes.All()
if != nil {
break
}
for , := range {
, := .Client.connPool.(*pool.ConnPool).ReapStaleConns()
if != nil {
internal.Logger.Printf(.Context(), "ReapStaleConns failed: %s", )
}
}
}
}
func ( *ClusterClient) () Pipeliner {
:= Pipeline{
ctx: .ctx,
exec: .processPipeline,
}
.init()
return &
}
func ( *ClusterClient) ( context.Context, func(Pipeliner) error) ([]Cmder, error) {
return .Pipeline().Pipelined(, )
}
func ( *ClusterClient) ( context.Context, []Cmder) error {
return .hooks.processPipeline(, , ._processPipeline)
}
func ( *ClusterClient) ( context.Context, []Cmder) error {
:= newCmdsMap()
:= .mapCmdsByNode(, , )
if != nil {
setCmdsErr(, )
return
}
for := 0; <= .opt.MaxRedirects; ++ {
if > 0 {
if := internal.Sleep(, .retryBackoff()); != nil {
setCmdsErr(, )
return
}
}
:= newCmdsMap()
var sync.WaitGroup
for , := range .m {
.Add(1)
go func( *clusterNode, []Cmder) {
defer .Done()
:= ._processPipelineNode(, , , )
if == nil {
return
}
if < .opt.MaxRedirects {
if := .mapCmdsByNode(, , ); != nil {
setCmdsErr(, )
}
} else {
setCmdsErr(, )
}
}(, )
}
.Wait()
if len(.m) == 0 {
break
}
=
}
return cmdsFirstErr()
}
func ( *ClusterClient) ( context.Context, *cmdsMap, []Cmder) error {
, := .state.Get()
if != nil {
return
}
if .opt.ReadOnly && .cmdsAreReadOnly() {
for , := range {
:= .cmdSlot()
, := .slotReadOnlyNode(, )
if != nil {
return
}
.Add(, )
}
return nil
}
for , := range {
:= .cmdSlot()
, := .slotMasterNode()
if != nil {
return
}
.Add(, )
}
return nil
}
func ( *ClusterClient) ( []Cmder) bool {
for , := range {
:= .cmdInfo(.Name())
if == nil || !.ReadOnly {
return false
}
}
return true
}
func ( *ClusterClient) (
context.Context, *clusterNode, []Cmder, *cmdsMap,
) error {
return .Client.hooks.processPipeline(, , func( context.Context, []Cmder) error {
return .Client.withConn(, func( context.Context, *pool.Conn) error {
:= .WithWriter(, .opt.WriteTimeout, func( *proto.Writer) error {
return writeCmds(, )
})
if != nil {
return
}
return .WithReader(, .opt.ReadTimeout, func( *proto.Reader) error {
return .pipelineReadCmds(, , , , )
})
})
})
}
func ( *ClusterClient) (
context.Context,
*clusterNode,
*proto.Reader,
[]Cmder,
*cmdsMap,
) error {
for , := range {
:= .readReply()
if == nil {
continue
}
if .checkMovedErr(, , , ) {
continue
}
if .opt.ReadOnly && isLoadingError() {
.MarkAsFailing()
return
}
if isRedisError() {
continue
}
return
}
return nil
}
func ( *ClusterClient) (
context.Context, Cmder, error, *cmdsMap,
) bool {
, , := isMovedError()
if ! && ! {
return false
}
, := .nodes.Get()
if != nil {
return false
}
if {
.state.LazyReload()
.Add(, )
return true
}
if {
.Add(, NewCmd(, "asking"), )
return true
}
panic("not reached")
}
func ( *ClusterClient) () Pipeliner {
:= Pipeline{
ctx: .ctx,
exec: .processTxPipeline,
}
.init()
return &
}
func ( *ClusterClient) ( context.Context, func(Pipeliner) error) ([]Cmder, error) {
return .TxPipeline().Pipelined(, )
}
func ( *ClusterClient) ( context.Context, []Cmder) error {
return .hooks.processPipeline(, , ._processTxPipeline)
}
func ( *ClusterClient) ( context.Context, []Cmder) error {
, := .state.Get()
if != nil {
setCmdsErr(, )
return
}
:= .mapCmdsBySlot()
for , := range {
, := .slotMasterNode()
if != nil {
setCmdsErr(, )
continue
}
:= map[*clusterNode][]Cmder{: }
for := 0; <= .opt.MaxRedirects; ++ {
if > 0 {
if := internal.Sleep(, .retryBackoff()); != nil {
setCmdsErr(, )
return
}
}
:= newCmdsMap()
var sync.WaitGroup
for , := range {
.Add(1)
go func( *clusterNode, []Cmder) {
defer .Done()
:= ._processTxPipelineNode(, , , )
if == nil {
return
}
if < .opt.MaxRedirects {
if := .mapCmdsByNode(, , ); != nil {
setCmdsErr(, )
}
} else {
setCmdsErr(, )
}
}(, )
}
.Wait()
if len(.m) == 0 {
break
}
= .m
}
}
return cmdsFirstErr()
}
func ( *ClusterClient) ( []Cmder) map[int][]Cmder {
:= make(map[int][]Cmder)
for , := range {
:= .cmdSlot()
[] = append([], )
}
return
}
func ( *ClusterClient) (
context.Context, *clusterNode, []Cmder, *cmdsMap,
) error {
return .Client.hooks.processTxPipeline(, , func( context.Context, []Cmder) error {
return .Client.withConn(, func( context.Context, *pool.Conn) error {
:= .WithWriter(, .opt.WriteTimeout, func( *proto.Writer) error {
return writeCmds(, )
})
if != nil {
return
}
return .WithReader(, .opt.ReadTimeout, func( *proto.Reader) error {
= [1 : len()-1]
:= .txPipelineReadQueued(, , , , )
if != nil {
, , := isMovedError()
if || {
return .cmdsMoved(, , , , , )
}
return
}
return pipelineReadCmds(, )
})
})
})
}
func ( *ClusterClient) (
context.Context,
*proto.Reader,
*StatusCmd,
[]Cmder,
*cmdsMap,
if := .readReply(); != nil {
return
}
for , := range {
:= .readReply()
if == nil || .checkMovedErr(, , , ) || isRedisError() {
continue
}
return
}
, := .ReadLine()
if != nil {
if == Nil {
= TxFailedErr
}
return
}
switch [0] {
case proto.ErrorReply:
return proto.ParseErrorReply()
default:
return fmt.Errorf("redis: expected '*', but got line %q", )
}
return nil
}
func ( *ClusterClient) (
context.Context, []Cmder,
, bool,
string,
*cmdsMap,
) error {
, := .nodes.Get()
if != nil {
return
}
if {
.state.LazyReload()
for , := range {
.Add(, )
}
return nil
}
if {
for , := range {
.Add(, NewCmd(, "asking"), )
}
return nil
}
return nil
}
func ( *ClusterClient) ( context.Context, func(*Tx) error, ...string) error {
if len() == 0 {
return fmt.Errorf("redis: Watch requires at least one key")
}
:= hashtag.Slot([0])
for , := range [1:] {
if hashtag.Slot() != {
:= fmt.Errorf("redis: Watch requires all keys to be in the same slot")
return
}
}
, := .slotMasterNode(, )
if != nil {
return
}
for := 0; <= .opt.MaxRedirects; ++ {
if > 0 {
if := internal.Sleep(, .retryBackoff()); != nil {
return
}
}
= .Client.Watch(, , ...)
if == nil {
break
}
, , := isMovedError()
if || {
, = .nodes.Get()
if != nil {
return
}
continue
}
if := isReadOnlyError(); || == pool.ErrClosed {
if {
.state.LazyReload()
}
, = .slotMasterNode(, )
if != nil {
return
}
continue
}
if shouldRetry(, true) {
continue
}
return
}
return
}
func ( *ClusterClient) () *PubSub {
var *clusterNode
:= &PubSub{
opt: .opt.clientOptions(),
newConn: func( context.Context, []string) (*pool.Conn, error) {
if != nil {
panic("node != nil")
}
var error
if len() > 0 {
:= hashtag.Slot([0])
, = .slotMasterNode(, )
} else {
, = .nodes.Random()
}
if != nil {
return nil,
}
, := .Client.newConn(context.TODO())
if != nil {
= nil
return nil,
}
return , nil
},
closeConn: func( *pool.Conn) error {
:= .Client.connPool.CloseConn()
= nil
return
},
}
.init()
return
}
func ( *ClusterClient) ( context.Context, ...string) *PubSub {
:= .pubSub()
if len() > 0 {
_ = .PSubscribe(, ...)
}
return
}
func ( *ClusterClient) ( int) time.Duration {
return internal.RetryBackoff(, .opt.MinRetryBackoff, .opt.MaxRetryBackoff)
}
const = 3
, := .nodes.Addrs()
if != nil {
return nil,
}
var error
:= rand.Perm(len())
if len() > {
= [:]
}
for , := range {
:= []
, := .nodes.Get()
if != nil {
if == nil {
=
}
continue
}
, := .Client.Command(.ctx).Result()
if == nil {
return , nil
}
if == nil {
=
}
}
if == nil {
panic("not reached")
}
return nil,
}
func ( *ClusterClient) ( string) *CommandInfo {
, := .cmdsInfoCache.Get()
if != nil {
return nil
}
:= []
if == nil {
internal.Logger.Printf(.Context(), "info for cmd=%s not found", )
}
return
}
func ( *ClusterClient) ( Cmder) int {
:= .Args()
if [0] == "cluster" && [1] == "getkeysinslot" {
return [2].(int)
}
:= .cmdInfo(.Name())
return cmdSlot(, cmdFirstKeyPos(, ))
}
func ( Cmder, int) int {
if == 0 {
return hashtag.RandomSlot()
}
:= .stringArg()
return hashtag.Slot()
}
func ( *ClusterClient) (
context.Context,
*CommandInfo,
int,
) (*clusterNode, error) {
, := .state.Get()
if != nil {
return nil,
}
if (.opt.RouteByLatency || .opt.RouteRandomly) && != nil && .ReadOnly {
return .slotReadOnlyNode(, )
}
return .slotMasterNode()
}
func ( *clusterClient) ( *clusterState, int) (*clusterNode, error) {
if .opt.RouteByLatency {
return .slotClosestNode()
}
if .opt.RouteRandomly {
return .slotRandomNode()
}
return .slotSlaveNode()
}
func ( *ClusterClient) ( context.Context, int) (*clusterNode, error) {
, := .state.Get()
if != nil {
return nil,
}
return .slotMasterNode()
}
func ( []*clusterNode, *clusterNode) []*clusterNode {
for , := range {
if == {
return
}
}
return append(, )
}
func ( []string, ...string) []string {
:
for , := range {
for , := range {
if == {
continue
}
}
= append(, )
}
return
}
![]() |
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds. |