package redis

import (
	"context"
	"crypto/tls"
	"fmt"
	"math"
	"net"
	"runtime"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/go-redis/redis/v8/internal"
	"github.com/go-redis/redis/v8/internal/hashtag"
	"github.com/go-redis/redis/v8/internal/pool"
	"github.com/go-redis/redis/v8/internal/proto"
	"github.com/go-redis/redis/v8/internal/rand"
)

var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")

// ClusterOptions are used to configure a cluster client and should be
// passed to NewClusterClient.
type ClusterOptions struct {
	// A seed list of host:port addresses of cluster nodes.
	Addrs []string

	// NewClient creates a cluster node client with provided name and options.
	NewClient func(opt *Options) *Client

	// The maximum number of retries before giving up. Command is retried
	// on network errors and MOVED/ASK redirects.
	// Default is 3 retries.
	MaxRedirects int

	// Enables read-only commands on slave nodes.
	ReadOnly bool
	// Allows routing read-only commands to the closest master or slave node.
	// It automatically enables ReadOnly.
	RouteByLatency bool
	// Allows routing read-only commands to the random master or slave node.
	// It automatically enables ReadOnly.
	RouteRandomly bool

	// Optional function that returns cluster slots information.
	// It is useful to manually create cluster of standalone Redis servers
	// and load-balance read/write operations between master and slaves.
	// It can use service like ZooKeeper to maintain configuration information
	// and Cluster.ReloadState to manually trigger state reloading.
	ClusterSlots func(ctx context.Context) ([]ClusterSlot, error)

	// Following options are copied from Options struct.

	Dialer func(ctx context.Context, network, addr string) (net.Conn, error)

	OnConnect func(ctx context.Context, cn *Conn) error

	Username string
	Password string

	MaxRetries      int
	MinRetryBackoff time.Duration
	MaxRetryBackoff time.Duration

	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration

	// PoolSize applies per cluster node and not for the whole cluster.
	PoolSize           int
	MinIdleConns       int
	MaxConnAge         time.Duration
	PoolTimeout        time.Duration
	IdleTimeout        time.Duration
	IdleCheckFrequency time.Duration

	TLSConfig *tls.Config
}
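
// Illustrative sketch (not part of the original source): how a caller might
// fill ClusterOptions with a seed list. The addresses below are hypothetical
// placeholders; the package qualifier is omitted.
//
//	rdb := NewClusterClient(&ClusterOptions{
//		Addrs:          []string{":7000", ":7001", ":7002"},
//		RouteByLatency: true, // implies ReadOnly
//		PoolSize:       10,   // per cluster node, not per cluster
//	})
//	defer rdb.Close()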

func (opt *ClusterOptions) init() {
	if opt.MaxRedirects == -1 {
		opt.MaxRedirects = 0
	} else if opt.MaxRedirects == 0 {
		opt.MaxRedirects = 3
	}

	if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil {
		opt.ReadOnly = true
	}

	if opt.PoolSize == 0 {
		opt.PoolSize = 5 * runtime.NumCPU()
	}

	switch opt.ReadTimeout {
	case -1:
		opt.ReadTimeout = 0
	case 0:
		opt.ReadTimeout = 3 * time.Second
	}
	switch opt.WriteTimeout {
	case -1:
		opt.WriteTimeout = 0
	case 0:
		opt.WriteTimeout = opt.ReadTimeout
	}

	if opt.MaxRetries == 0 {
		opt.MaxRetries = -1
	}
	switch opt.MinRetryBackoff {
	case -1:
		opt.MinRetryBackoff = 0
	case 0:
		opt.MinRetryBackoff = 8 * time.Millisecond
	}
	switch opt.MaxRetryBackoff {
	case -1:
		opt.MaxRetryBackoff = 0
	case 0:
		opt.MaxRetryBackoff = 512 * time.Millisecond
	}

	if opt.NewClient == nil {
		opt.NewClient = NewClient
	}
}

func (opt *ClusterOptions) clientOptions() *Options {
	const disableIdleCheck = -1

	return &Options{
		Dialer:    opt.Dialer,
		OnConnect: opt.OnConnect,

		Username: opt.Username,
		Password: opt.Password,

		MaxRetries:      opt.MaxRetries,
		MinRetryBackoff: opt.MinRetryBackoff,
		MaxRetryBackoff: opt.MaxRetryBackoff,

		DialTimeout:  opt.DialTimeout,
		ReadTimeout:  opt.ReadTimeout,
		WriteTimeout: opt.WriteTimeout,

		PoolSize:           opt.PoolSize,
		MinIdleConns:       opt.MinIdleConns,
		MaxConnAge:         opt.MaxConnAge,
		PoolTimeout:        opt.PoolTimeout,
		IdleTimeout:        opt.IdleTimeout,
		IdleCheckFrequency: disableIdleCheck,

		readOnly: opt.ReadOnly,

		TLSConfig: opt.TLSConfig,
	}
}

//------------------------------------------------------------------------------

type clusterNode struct {
	Client *Client

	latency    uint32 // atomic
	generation uint32 // atomic
	failing    uint32 // atomic
}

func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
	opt := clOpt.clientOptions()
	opt.Addr = addr
	node := clusterNode{
		Client: clOpt.NewClient(opt),
	}

	node.latency = math.MaxUint32
	if clOpt.RouteByLatency {
		go node.updateLatency()
	}

	return &node
}

func (n *clusterNode) String() string {
	return n.Client.String()
}

func (n *clusterNode) Close() error {
	return n.Client.Close()
}

func (n *clusterNode) updateLatency() {
	const numProbe = 10
	var dur uint64

	for i := 0; i < numProbe; i++ {
		time.Sleep(time.Duration(10+rand.Intn(10)) * time.Millisecond)

		start := time.Now()
		n.Client.Ping(context.TODO())
		dur += uint64(time.Since(start) / time.Microsecond)
	}

	latency := float64(dur) / float64(numProbe)
	atomic.StoreUint32(&n.latency, uint32(latency+0.5))
}

func (n *clusterNode) Latency() time.Duration {
	latency := atomic.LoadUint32(&n.latency)
	return time.Duration(latency) * time.Microsecond
}

func (n *clusterNode) MarkAsFailing() {
	atomic.StoreUint32(&n.failing, uint32(time.Now().Unix()))
}

func (n *clusterNode) Failing() bool {
	const timeout = 15 // 15 seconds

	failing := atomic.LoadUint32(&n.failing)
	if failing == 0 {
		return false
	}
	if time.Now().Unix()-int64(failing) < timeout {
		return true
	}
	atomic.StoreUint32(&n.failing, 0)
	return false
}

func (n *clusterNode) Generation() uint32 {
	return atomic.LoadUint32(&n.generation)
}

func (n *clusterNode) SetGeneration(gen uint32) {
	for {
		v := atomic.LoadUint32(&n.generation)
		if gen < v || atomic.CompareAndSwapUint32(&n.generation, v, gen) {
			break
		}
	}
}

//------------------------------------------------------------------------------

type clusterNodes struct {
	opt *ClusterOptions

	mu          sync.RWMutex
	addrs       []string
	nodes       map[string]*clusterNode
	activeAddrs []string
	closed      bool

	_generation uint32 // atomic
}

func newClusterNodes(opt *ClusterOptions) *clusterNodes {
	return &clusterNodes{
		opt: opt,

		addrs: opt.Addrs,
		nodes: make(map[string]*clusterNode),
	}
}

func (c *clusterNodes) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil
	}
	c.closed = true

	var firstErr error
	for _, node := range c.nodes {
		if err := node.Client.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}

	c.nodes = nil
	c.activeAddrs = nil

	return firstErr
}

func (c *clusterNodes) Addrs() ([]string, error) {
	var addrs []string
	c.mu.RLock()
	closed := c.closed
	if !closed {
		if len(c.activeAddrs) > 0 {
			addrs = c.activeAddrs
		} else {
			addrs = c.addrs
		}
	}
	c.mu.RUnlock()

	if closed {
		return nil, pool.ErrClosed
	}
	if len(addrs) == 0 {
		return nil, errClusterNoNodes
	}
	return addrs, nil
}

func (c *clusterNodes) NextGeneration() uint32 {
	return atomic.AddUint32(&c._generation, 1)
}

// GC removes unused nodes.
func (c *clusterNodes) GC(generation uint32) {
	//nolint:prealloc
	var collected []*clusterNode

	c.mu.Lock()

	c.activeAddrs = c.activeAddrs[:0]
	for addr, node := range c.nodes {
		if node.Generation() >= generation {
			c.activeAddrs = append(c.activeAddrs, addr)
			if c.opt.RouteByLatency {
				go node.updateLatency()
			}
			continue
		}

		delete(c.nodes, addr)
		collected = append(collected, node)
	}

	c.mu.Unlock()

	for _, node := range collected {
		_ = node.Client.Close()
	}
}

func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
	node, err := c.get(addr)
	if err != nil {
		return nil, err
	}
	if node != nil {
		return node, nil
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	if c.closed {
		return nil, pool.ErrClosed
	}

	node, ok := c.nodes[addr]
	if ok {
		return node, nil
	}

	node = newClusterNode(c.opt, addr)

	c.addrs = appendIfNotExists(c.addrs, addr)
	c.nodes[addr] = node

	return node, nil
}

func (c *clusterNodes) get(addr string) (*clusterNode, error) {
	var node *clusterNode
	var err error
	c.mu.RLock()
	if c.closed {
		err = pool.ErrClosed
	} else {
		node = c.nodes[addr]
	}
	c.mu.RUnlock()
	return node, err
}

func (c *clusterNodes) All() ([]*clusterNode, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()

	if c.closed {
		return nil, pool.ErrClosed
	}

	cp := make([]*clusterNode, 0, len(c.nodes))
	for _, node := range c.nodes {
		cp = append(cp, node)
	}
	return cp, nil
}

func (c *clusterNodes) Random() (*clusterNode, error) {
	addrs, err := c.Addrs()
	if err != nil {
		return nil, err
	}

	n := rand.Intn(len(addrs))
	return c.Get(addrs[n])
}

//------------------------------------------------------------------------------

type clusterSlot struct {
	start, end int
	nodes      []*clusterNode
}

type clusterSlotSlice []*clusterSlot

func (p clusterSlotSlice) Len() int {
	return len(p)
}

func (p clusterSlotSlice) Less(i, j int) bool {
	return p[i].start < p[j].start
}

func (p clusterSlotSlice) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
}

type clusterState struct {
	nodes   *clusterNodes
	Masters []*clusterNode
	Slaves  []*clusterNode

	slots []*clusterSlot

	generation uint32
	createdAt  time.Time
}

func newClusterState(
	nodes *clusterNodes, slots []ClusterSlot, origin string,
) (*clusterState, error) {
	c := clusterState{
		nodes: nodes,

		slots: make([]*clusterSlot, 0, len(slots)),

		generation: nodes.NextGeneration(),
		createdAt:  time.Now(),
	}

	originHost, _, _ := net.SplitHostPort(origin)
	isLoopbackOrigin := isLoopback(originHost)

	for _, slot := range slots {
		var nodes []*clusterNode
		for i, slotNode := range slot.Nodes {
			addr := slotNode.Addr
			if !isLoopbackOrigin {
				addr = replaceLoopbackHost(addr, originHost)
			}

			node, err := c.nodes.Get(addr)
			if err != nil {
				return nil, err
			}

			node.SetGeneration(c.generation)
			nodes = append(nodes, node)

			if i == 0 {
				c.Masters = appendUniqueNode(c.Masters, node)
			} else {
				c.Slaves = appendUniqueNode(c.Slaves, node)
			}
		}

		c.slots = append(c.slots, &clusterSlot{
			start: slot.Start,
			end:   slot.End,
			nodes: nodes,
		})
	}

	sort.Sort(clusterSlotSlice(c.slots))

	time.AfterFunc(time.Minute, func() {
		nodes.GC(c.generation)
	})

	return &c, nil
}

func replaceLoopbackHost(nodeAddr, originHost string) string {
	nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
	if err != nil {
		return nodeAddr
	}

	nodeIP := net.ParseIP(nodeHost)
	if nodeIP == nil {
		return nodeAddr
	}

	if !nodeIP.IsLoopback() {
		return nodeAddr
	}

	// Use origin host which is not loopback and node port.
	return net.JoinHostPort(originHost, nodePort)
}

func isLoopback(host string) bool {
	ip := net.ParseIP(host)
	if ip == nil {
		return true
	}
	return ip.IsLoopback()
}

func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) > 0 {
		return nodes[0], nil
	}
	return c.nodes.Random()
}

func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	switch len(nodes) {
	case 0:
		return c.nodes.Random()
	case 1:
		return nodes[0], nil
	case 2:
		if slave := nodes[1]; !slave.Failing() {
			return slave, nil
		}
		return nodes[0], nil
	default:
		var slave *clusterNode
		for i := 0; i < 10; i++ {
			n := rand.Intn(len(nodes)-1) + 1
			slave = nodes[n]
			if !slave.Failing() {
				return slave, nil
			}
		}

		// All slaves are loading - use master.
		return nodes[0], nil
	}
}

func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) == 0 {
		return c.nodes.Random()
	}

	var node *clusterNode
	for _, n := range nodes {
		if n.Failing() {
			continue
		}
		if node == nil || n.Latency() < node.Latency() {
			node = n
		}
	}
	if node != nil {
		return node, nil
	}

	// If all nodes are failing - return random node
	return c.nodes.Random()
}

func (c *clusterState) slotRandomNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)
	if len(nodes) == 0 {
		return c.nodes.Random()
	}
	n := rand.Intn(len(nodes))
	return nodes[n], nil
}

func (c *clusterState) slotNodes(slot int) []*clusterNode {
	i := sort.Search(len(c.slots), func(i int) bool {
		return c.slots[i].end >= slot
	})
	if i >= len(c.slots) {
		return nil
	}
	x := c.slots[i]
	if slot >= x.start && slot <= x.end {
		return x.nodes
	}
	return nil
}

//------------------------------------------------------------------------------

type clusterStateHolder struct {
	load func(ctx context.Context) (*clusterState, error)

	state     atomic.Value
	reloading uint32 // atomic
}

func newClusterStateHolder(fn func(ctx context.Context) (*clusterState, error)) *clusterStateHolder {
	return &clusterStateHolder{
		load: fn,
	}
}

func (c *clusterStateHolder) Reload(ctx context.Context) (*clusterState, error) {
	state, err := c.load(ctx)
	if err != nil {
		return nil, err
	}
	c.state.Store(state)
	return state, nil
}

func (c *clusterStateHolder) LazyReload(ctx context.Context) {
	if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
		return
	}
	go func() {
		defer atomic.StoreUint32(&c.reloading, 0)

		_, err := c.Reload(ctx)
		if err != nil {
			return
		}
		time.Sleep(200 * time.Millisecond)
	}()
}

func (c *clusterStateHolder) Get(ctx context.Context) (*clusterState, error) {
	v := c.state.Load()
	if v != nil {
		state := v.(*clusterState)
		if time.Since(state.createdAt) > 10*time.Second {
			c.LazyReload(ctx)
		}
		return state, nil
	}
	return c.Reload(ctx)
}

func (c *clusterStateHolder) ReloadOrGet(ctx context.Context) (*clusterState, error) {
	state, err := c.Reload(ctx)
	if err == nil {
		return state, nil
	}
	return c.Get(ctx)
}

//------------------------------------------------------------------------------

type clusterClient struct {
	opt           *ClusterOptions
	nodes         *clusterNodes
	state         *clusterStateHolder //nolint:structcheck
	cmdsInfoCache *cmdsInfoCache      //nolint:structcheck
}

// ClusterClient is a Redis Cluster client representing a pool of zero
// or more underlying connections. It's safe for concurrent use by
// multiple goroutines.
type ClusterClient struct {
	*clusterClient
	cmdable
	hooks
	ctx context.Context
}

// NewClusterClient returns a Redis Cluster client as described in
// http://redis.io/topics/cluster-spec.
func NewClusterClient(opt *ClusterOptions) *ClusterClient {
	opt.init()

	c := &ClusterClient{
		clusterClient: &clusterClient{
			opt:   opt,
			nodes: newClusterNodes(opt),
		},
		ctx: context.Background(),
	}
	c.state = newClusterStateHolder(c.loadState)
	c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
	c.cmdable = c.Process

	if opt.IdleCheckFrequency > 0 {
		go c.reaper(opt.IdleCheckFrequency)
	}

	return c
}
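
// Usage sketch (illustrative, assuming a reachable cluster at the seed
// addresses below, which are hypothetical):
//
//	rdb := NewClusterClient(&ClusterOptions{
//		Addrs: []string{":7000", ":7001", ":7002"},
//	})
//	ctx := context.Background()
//	if err := rdb.Set(ctx, "key", "value", 0).Err(); err != nil {
//		// handle error
//	}
//	val, err := rdb.Get(ctx, "key").Result()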

func (c *ClusterClient) Context() context.Context {
	return c.ctx
}

func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient {
	if ctx == nil {
		panic("nil context")
	}
	clone := *c
	clone.cmdable = clone.Process
	clone.hooks.lock()
	clone.ctx = ctx
	return &clone
}

// Options returns read-only Options that were used to create the client.
func (c *ClusterClient) Options() *ClusterOptions {
	return c.opt
}

// ReloadState reloads cluster state. If available it calls ClusterSlots func
// to get cluster slots information.
func (c *ClusterClient) ReloadState(ctx context.Context) {
	c.state.LazyReload(ctx)
}

// Close closes the cluster client, releasing any open resources.
//
// It is rare to Close a ClusterClient, as the ClusterClient is meant
// to be long-lived and shared between many goroutines.
func (c *ClusterClient) Close() error {
	return c.nodes.Close()
}

// Do creates a Cmd from the args and processes the cmd.
func (c *ClusterClient) Do(ctx context.Context, args ...interface{}) *Cmd {
	cmd := NewCmd(ctx, args...)
	_ = c.Process(ctx, cmd)
	return cmd
}

func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
	return c.hooks.process(ctx, cmd, c.process)
}

func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
	err := c._process(ctx, cmd)
	if err != nil {
		cmd.SetErr(err)
		return err
	}
	return nil
}

func (c *ClusterClient) _process(ctx context.Context, cmd Cmder) error {
	cmdInfo := c.cmdInfo(cmd.Name())
	slot := c.cmdSlot(cmd)

	var node *clusterNode
	var ask bool
	var lastErr error
	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				return err
			}
		}

		if node == nil {
			var err error
			node, err = c.cmdNode(ctx, cmdInfo, slot)
			if err != nil {
				return err
			}
		}

		if ask {
			pipe := node.Client.Pipeline()
			_ = pipe.Process(ctx, NewCmd(ctx, "asking"))
			_ = pipe.Process(ctx, cmd)
			_, lastErr = pipe.Exec(ctx)
			_ = pipe.Close()
			ask = false
		} else {
			lastErr = node.Client.Process(ctx, cmd)
		}

		// If there is no error - we are done.
		if lastErr == nil {
			return nil
		}
		if isReadOnly := isReadOnlyError(lastErr); isReadOnly || lastErr == pool.ErrClosed {
			if isReadOnly {
				c.state.LazyReload(ctx)
			}
			node = nil
			continue
		}

		// If slave is loading - pick another node.
		if c.opt.ReadOnly && isLoadingError(lastErr) {
			node.MarkAsFailing()
			node = nil
			continue
		}

		var moved bool
		var addr string
		moved, ask, addr = isMovedError(lastErr)
		if moved || ask {
			var err error
			node, err = c.nodes.Get(addr)
			if err != nil {
				return err
			}
			continue
		}

		if shouldRetry(lastErr, cmd.readTimeout() == nil) {
			// First retry the same node.
			if attempt == 0 {
				continue
			}

			// Second try another node.
			node.MarkAsFailing()
			node = nil
			continue
		}

		return lastErr
	}
	return lastErr
}

// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachMaster(
	ctx context.Context,
	fn func(ctx context.Context, client *Client) error,
) error {
	state, err := c.state.ReloadOrGet(ctx)
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)

	for _, master := range state.Masters {
		wg.Add(1)
		go func(node *clusterNode) {
			defer wg.Done()
			err := fn(ctx, node.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(master)
	}

	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}
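
// Illustrative sketch: ForEachMaster suits cluster-wide maintenance commands
// that must run on every master, e.g. flushing databases in tests
// (hypothetical usage, not part of the original source):
//
//	err := rdb.ForEachMaster(ctx, func(ctx context.Context, master *Client) error {
//		return master.FlushDB(ctx).Err()
//	})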

// ForEachSlave concurrently calls the fn on each slave node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachSlave(
	ctx context.Context,
	fn func(ctx context.Context, client *Client) error,
) error {
	state, err := c.state.ReloadOrGet(ctx)
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)

	for _, slave := range state.Slaves {
		wg.Add(1)
		go func(node *clusterNode) {
			defer wg.Done()
			err := fn(ctx, node.Client)
			if err != nil {
				select {
				case errCh <- err:
				default:
				}
			}
		}(slave)
	}

	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// ForEachShard concurrently calls the fn on each known node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachShard(
	ctx context.Context,
	fn func(ctx context.Context, client *Client) error,
) error {
	state, err := c.state.ReloadOrGet(ctx)
	if err != nil {
		return err
	}

	var wg sync.WaitGroup
	errCh := make(chan error, 1)

	worker := func(node *clusterNode) {
		defer wg.Done()
		err := fn(ctx, node.Client)
		if err != nil {
			select {
			case errCh <- err:
			default:
			}
		}
	}

	for _, node := range state.Masters {
		wg.Add(1)
		go worker(node)
	}
	for _, node := range state.Slaves {
		wg.Add(1)
		go worker(node)
	}

	wg.Wait()

	select {
	case err := <-errCh:
		return err
	default:
		return nil
	}
}

// PoolStats returns accumulated connection pool stats.
func (c *ClusterClient) PoolStats() *PoolStats {
	var acc PoolStats

	state, _ := c.state.Get(context.TODO())
	if state == nil {
		return &acc
	}

	for _, node := range state.Masters {
		s := node.Client.connPool.Stats()
		acc.Hits += s.Hits
		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts

		acc.TotalConns += s.TotalConns
		acc.IdleConns += s.IdleConns
		acc.StaleConns += s.StaleConns
	}

	for _, node := range state.Slaves {
		s := node.Client.connPool.Stats()
		acc.Hits += s.Hits
		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts

		acc.TotalConns += s.TotalConns
		acc.IdleConns += s.IdleConns
		acc.StaleConns += s.StaleConns
	}

	return &acc
}

func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
	if c.opt.ClusterSlots != nil {
		slots, err := c.opt.ClusterSlots(ctx)
		if err != nil {
			return nil, err
		}
		return newClusterState(c.nodes, slots, "")
	}

	addrs, err := c.nodes.Addrs()
	if err != nil {
		return nil, err
	}

	var firstErr error

	for _, idx := range rand.Perm(len(addrs)) {
		addr := addrs[idx]

		node, err := c.nodes.Get(addr)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		slots, err := node.Client.ClusterSlots(ctx).Result()
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		return newClusterState(c.nodes, slots, node.Client.opt.Addr)
	}

	/*
	 * No node is connectable. It's possible that all nodes' IP has changed.
	 * Clear activeAddrs to let client be able to re-connect using the initial
	 * setting of the addresses (e.g. [redis-cluster-0:6379, redis-cluster-1:6379]),
	 * which might have chance to resolve domain name and get updated IP address.
	 */
	c.nodes.mu.Lock()
	c.nodes.activeAddrs = nil
	c.nodes.mu.Unlock()

	return nil, firstErr
}
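
// Illustrative sketch of the ClusterSlots option that loadState prefers when
// it is set: it lets the client treat standalone servers as a fixed two-shard
// "cluster". The addresses and slot split below are hypothetical:
//
//	opt := &ClusterOptions{
//		ClusterSlots: func(ctx context.Context) ([]ClusterSlot, error) {
//			return []ClusterSlot{
//				{Start: 0, End: 8191, Nodes: []ClusterNode{{Addr: ":6379"}}},
//				{Start: 8192, End: 16383, Nodes: []ClusterNode{{Addr: ":6380"}}},
//			}, nil
//		},
//	}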

// reaper closes idle connections to the cluster.
func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
	ticker := time.NewTicker(idleCheckFrequency)
	defer ticker.Stop()

	for range ticker.C {
		nodes, err := c.nodes.All()
		if err != nil {
			break
		}

		for _, node := range nodes {
			_, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
			if err != nil {
				internal.Logger.Printf(c.Context(), "ReapStaleConns failed: %s", err)
			}
		}
	}
}

func (c *ClusterClient) Pipeline() Pipeliner {
	pipe := Pipeline{
		ctx:  c.ctx,
		exec: c.processPipeline,
	}
	pipe.init()
	return &pipe
}

func (c *ClusterClient) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	return c.Pipeline().Pipelined(ctx, fn)
}
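
// Illustrative sketch: queued commands are grouped by node and sent in one
// round trip per node. The key names are hypothetical:
//
//	cmds, err := rdb.Pipelined(ctx, func(pipe Pipeliner) error {
//		pipe.Incr(ctx, "counter")
//		pipe.Expire(ctx, "counter", time.Hour)
//		return nil
//	})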

func (c *ClusterClient) processPipeline(ctx context.Context, cmds []Cmder) error {
	return c.hooks.processPipeline(ctx, cmds, c._processPipeline)
}

func (c *ClusterClient) _processPipeline(ctx context.Context, cmds []Cmder) error {
	cmdsMap := newCmdsMap()
	err := c.mapCmdsByNode(ctx, cmdsMap, cmds)
	if err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				setCmdsErr(cmds, err)
				return err
			}
		}

		failedCmds := newCmdsMap()
		var wg sync.WaitGroup

		for node, cmds := range cmdsMap.m {
			wg.Add(1)
			go func(node *clusterNode, cmds []Cmder) {
				defer wg.Done()

				err := c._processPipelineNode(ctx, node, cmds, failedCmds)
				if err == nil {
					return
				}
				if attempt < c.opt.MaxRedirects {
					if err := c.mapCmdsByNode(ctx, failedCmds, cmds); err != nil {
						setCmdsErr(cmds, err)
					}
				} else {
					setCmdsErr(cmds, err)
				}
			}(node, cmds)
		}

		wg.Wait()
		if len(failedCmds.m) == 0 {
			break
		}
		cmdsMap = failedCmds
	}

	return cmdsFirstErr(cmds)
}

func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmds []Cmder) error {
	state, err := c.state.Get(ctx)
	if err != nil {
		return err
	}

	if c.opt.ReadOnly && c.cmdsAreReadOnly(cmds) {
		for _, cmd := range cmds {
			slot := c.cmdSlot(cmd)
			node, err := c.slotReadOnlyNode(state, slot)
			if err != nil {
				return err
			}
			cmdsMap.Add(node, cmd)
		}
		return nil
	}

	for _, cmd := range cmds {
		slot := c.cmdSlot(cmd)
		node, err := state.slotMasterNode(slot)
		if err != nil {
			return err
		}
		cmdsMap.Add(node, cmd)
	}
	return nil
}

func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
	for _, cmd := range cmds {
		cmdInfo := c.cmdInfo(cmd.Name())
		if cmdInfo == nil || !cmdInfo.ReadOnly {
			return false
		}
	}
	return true
}

func (c *ClusterClient) _processPipelineNode(
	ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
) error {
	return node.Client.hooks.processPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
		return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
			err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
				return writeCmds(wr, cmds)
			})
			if err != nil {
				return err
			}

			return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
				return c.pipelineReadCmds(ctx, node, rd, cmds, failedCmds)
			})
		})
	})
}

func (c *ClusterClient) pipelineReadCmds(
	ctx context.Context,
	node *clusterNode,
	rd *proto.Reader,
	cmds []Cmder,
	failedCmds *cmdsMap,
) error {
	for _, cmd := range cmds {
		err := cmd.readReply(rd)
		if err == nil {
			continue
		}
		if c.checkMovedErr(ctx, cmd, err, failedCmds) {
			continue
		}

		if c.opt.ReadOnly && isLoadingError(err) {
			node.MarkAsFailing()
			return err
		}
		if isRedisError(err) {
			continue
		}
		return err
	}
	return nil
}

func (c *ClusterClient) checkMovedErr(
	ctx context.Context, cmd Cmder, err error, failedCmds *cmdsMap,
) bool {
	moved, ask, addr := isMovedError(err)
	if !moved && !ask {
		return false
	}

	node, err := c.nodes.Get(addr)
	if err != nil {
		return false
	}

	if moved {
		c.state.LazyReload(ctx)
		failedCmds.Add(node, cmd)
		return true
	}

	if ask {
		failedCmds.Add(node, NewCmd(ctx, "asking"), cmd)
		return true
	}

	panic("not reached")
}

// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *ClusterClient) TxPipeline() Pipeliner {
	pipe := Pipeline{
		ctx:  c.ctx,
		exec: c.processTxPipeline,
	}
	pipe.init()
	return &pipe
}

func (c *ClusterClient) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	return c.TxPipeline().Pipelined(ctx, fn)
}
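
// Illustrative sketch: commands queued here are grouped by hash slot and each
// group is wrapped in MULTI/EXEC on that slot's master. The keys are
// hypothetical; the shared hash tag keeps them in one slot:
//
//	cmds, err := rdb.TxPipelined(ctx, func(pipe Pipeliner) error {
//		pipe.Incr(ctx, "{user1000}.visits")
//		pipe.Incr(ctx, "{user1000}.clicks")
//		return nil
//	})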

func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
	return c.hooks.processPipeline(ctx, cmds, c._processTxPipeline)
}

func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) error {
	state, err := c.state.Get(ctx)
	if err != nil {
		setCmdsErr(cmds, err)
		return err
	}

	cmdsMap := c.mapCmdsBySlot(cmds)
	for slot, cmds := range cmdsMap {
		node, err := state.slotMasterNode(slot)
		if err != nil {
			setCmdsErr(cmds, err)
			continue
		}

		cmdsMap := map[*clusterNode][]Cmder{node: cmds}
		for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
			if attempt > 0 {
				if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
					setCmdsErr(cmds, err)
					return err
				}
			}

			failedCmds := newCmdsMap()
			var wg sync.WaitGroup

			for node, cmds := range cmdsMap {
				wg.Add(1)
				go func(node *clusterNode, cmds []Cmder) {
					defer wg.Done()

					err := c._processTxPipelineNode(ctx, node, cmds, failedCmds)
					if err == nil {
						return
					}
					if attempt < c.opt.MaxRedirects {
						if err := c.mapCmdsByNode(ctx, failedCmds, cmds); err != nil {
							setCmdsErr(cmds, err)
						}
					} else {
						setCmdsErr(cmds, err)
					}
				}(node, cmds)
			}

			wg.Wait()
			if len(failedCmds.m) == 0 {
				break
			}
			cmdsMap = failedCmds.m
		}
	}

	return cmdsFirstErr(cmds)
}

func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
	cmdsMap := make(map[int][]Cmder)
	for _, cmd := range cmds {
		slot := c.cmdSlot(cmd)
		cmdsMap[slot] = append(cmdsMap[slot], cmd)
	}
	return cmdsMap
}

func (c *ClusterClient) _processTxPipelineNode(
	ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
) error {
	return node.Client.hooks.processTxPipeline(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
		return node.Client.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
			err := cn.WithWriter(ctx, c.opt.WriteTimeout, func(wr *proto.Writer) error {
				return writeCmds(wr, cmds)
			})
			if err != nil {
				return err
			}

			return cn.WithReader(ctx, c.opt.ReadTimeout, func(rd *proto.Reader) error {
				statusCmd := cmds[0].(*StatusCmd)
				// Trim multi and exec.
				cmds = cmds[1 : len(cmds)-1]

				err := c.txPipelineReadQueued(ctx, rd, statusCmd, cmds, failedCmds)
				if err != nil {
					moved, ask, addr := isMovedError(err)
					if moved || ask {
						return c.cmdsMoved(ctx, cmds, moved, ask, addr, failedCmds)
					}
					return err
				}

				return pipelineReadCmds(rd, cmds)
			})
		})
	})
}

func (c *ClusterClient) txPipelineReadQueued(
	ctx context.Context,
	rd *proto.Reader,
	statusCmd *StatusCmd,
	cmds []Cmder,
	failedCmds *cmdsMap,
) error {
	// Parse queued replies.
	if err := statusCmd.readReply(rd); err != nil {
		return err
	}

	for _, cmd := range cmds {
		err := statusCmd.readReply(rd)
		if err == nil || c.checkMovedErr(ctx, cmd, err, failedCmds) || isRedisError(err) {
			continue
		}
		return err
	}

	// Parse number of replies.
	line, err := rd.ReadLine()
	if err != nil {
		if err == Nil {
			err = TxFailedErr
		}
		return err
	}

	switch line[0] {
	case proto.ErrorReply:
		return proto.ParseErrorReply(line)
	case proto.ArrayReply:
		// ok
	default:
		return fmt.Errorf("redis: expected '*', but got line %q", line)
	}

	return nil
}

func (c *ClusterClient) cmdsMoved(
	ctx context.Context, cmds []Cmder,
	moved, ask bool,
	addr string,
	failedCmds *cmdsMap,
) error {
	node, err := c.nodes.Get(addr)
	if err != nil {
		return err
	}

	if moved {
		c.state.LazyReload(ctx)
		for _, cmd := range cmds {
			failedCmds.Add(node, cmd)
		}
		return nil
	}

	if ask {
		for _, cmd := range cmds {
			failedCmds.Add(node, NewCmd(ctx, "asking"), cmd)
		}
		return nil
	}

	return nil
}

func (c *ClusterClient) Watch(ctx context.Context, fn func(*Tx) error, keys ...string) error {
	if len(keys) == 0 {
		return fmt.Errorf("redis: Watch requires at least one key")
	}

	slot := hashtag.Slot(keys[0])
	for _, key := range keys[1:] {
		if hashtag.Slot(key) != slot {
			err := fmt.Errorf("redis: Watch requires all keys to be in the same slot")
			return err
		}
	}

	node, err := c.slotMasterNode(ctx, slot)
	if err != nil {
		return err
	}

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
		if attempt > 0 {
			if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
				return err
			}
		}

		err = node.Client.Watch(ctx, fn, keys...)
		if err == nil {
			break
		}

		moved, ask, addr := isMovedError(err)
		if moved || ask {
			node, err = c.nodes.Get(addr)
			if err != nil {
				return err
			}
			continue
		}

		if isReadOnly := isReadOnlyError(err); isReadOnly || err == pool.ErrClosed {
			if isReadOnly {
				c.state.LazyReload(ctx)
			}
			node, err = c.slotMasterNode(ctx, slot)
			if err != nil {
				return err
			}
			continue
		}

		if shouldRetry(err, true) {
			continue
		}

		return err
	}

	return err
}
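
// Illustrative sketch of an optimistic transaction built on Watch; the key
// name is hypothetical:
//
//	err := rdb.Watch(ctx, func(tx *Tx) error {
//		n, err := tx.Get(ctx, "{user1000}.balance").Int()
//		if err != nil && err != Nil {
//			return err
//		}
//		_, err = tx.TxPipelined(ctx, func(pipe Pipeliner) error {
//			pipe.Set(ctx, "{user1000}.balance", n+1, 0)
//			return nil
//		})
//		return err
//	}, "{user1000}.balance")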

func (c *ClusterClient) pubSub() *PubSub {
	var node *clusterNode
	pubsub := &PubSub{
		opt: c.opt.clientOptions(),

		newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) {
			if node != nil {
				panic("node != nil")
			}

			var err error
			if len(channels) > 0 {
				slot := hashtag.Slot(channels[0])
				node, err = c.slotMasterNode(ctx, slot)
			} else {
				node, err = c.nodes.Random()
			}
			if err != nil {
				return nil, err
			}

			cn, err := node.Client.newConn(context.TODO())
			if err != nil {
				node = nil

				return nil, err
			}

			return cn, nil
		},
		closeConn: func(cn *pool.Conn) error {
			err := node.Client.connPool.CloseConn(cn)
			node = nil
			return err
		},
	}
	pubsub.init()

	return pubsub
}

// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create empty subscription.
func (c *ClusterClient) Subscribe(ctx context.Context, channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) > 0 {
		_ = pubsub.Subscribe(ctx, channels...)
	}
	return pubsub
}

// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create empty subscription.
func (c *ClusterClient) PSubscribe(ctx context.Context, channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) > 0 {
		_ = pubsub.PSubscribe(ctx, channels...)
	}
	return pubsub
}
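
// Illustrative sketch of consuming messages from a cluster subscription; the
// channel name is hypothetical:
//
//	pubsub := rdb.Subscribe(ctx, "events")
//	defer pubsub.Close()
//	for msg := range pubsub.Channel() {
//		_ = msg.Payload // handle the message
//	}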

func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
	return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}

func (c *ClusterClient) cmdsInfo() (map[string]*CommandInfo, error) {
	// Try 3 random nodes.
	const nodeLimit = 3

	addrs, err := c.nodes.Addrs()
	if err != nil {
		return nil, err
	}

	var firstErr error

	perm := rand.Perm(len(addrs))
	if len(perm) > nodeLimit {
		perm = perm[:nodeLimit]
	}

	for _, idx := range perm {
		addr := addrs[idx]

		node, err := c.nodes.Get(addr)
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		info, err := node.Client.Command(c.ctx).Result()
		if err == nil {
			return info, nil
		}
		if firstErr == nil {
			firstErr = err
		}
	}

	if firstErr == nil {
		panic("not reached")
	}
	return nil, firstErr
}

func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
	cmdsInfo, err := c.cmdsInfoCache.Get()
	if err != nil {
		return nil
	}

	info := cmdsInfo[name]
	if info == nil {
		internal.Logger.Printf(c.Context(), "info for cmd=%s not found", name)
	}
	return info
}

func (c *ClusterClient) cmdSlot(cmd Cmder) int {
	args := cmd.Args()
	if args[0] == "cluster" && args[1] == "getkeysinslot" {
		return args[2].(int)
	}

	cmdInfo := c.cmdInfo(cmd.Name())
	return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
}

func cmdSlot(cmd Cmder, pos int) int {
	if pos == 0 {
		return hashtag.RandomSlot()
	}
	firstKey := cmd.stringArg(pos)
	return hashtag.Slot(firstKey)
}
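
// Illustrative note: keys that share a Redis hash tag (the part between '{'
// and '}') hash to the same slot, which is what lets multi-key commands,
// Watch and TxPipelined operate on related keys. The key names are
// hypothetical:
//
//	slotA := hashtag.Slot("{user1000}.following")
//	slotB := hashtag.Slot("{user1000}.followers")
//	// slotA == slotB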

func (c *ClusterClient) cmdNode(
	ctx context.Context,
	cmdInfo *CommandInfo,
	slot int,
) (*clusterNode, error) {
	state, err := c.state.Get(ctx)
	if err != nil {
		return nil, err
	}

	if (c.opt.RouteByLatency || c.opt.RouteRandomly) && cmdInfo != nil && cmdInfo.ReadOnly {
		return c.slotReadOnlyNode(state, slot)
	}
	return state.slotMasterNode(slot)
}

func (c *clusterClient) slotReadOnlyNode(state *clusterState, slot int) (*clusterNode, error) {
	if c.opt.RouteByLatency {
		return state.slotClosestNode(slot)
	}
	if c.opt.RouteRandomly {
		return state.slotRandomNode(slot)
	}
	return state.slotSlaveNode(slot)
}

func (c *ClusterClient) slotMasterNode(ctx context.Context, slot int) (*clusterNode, error) {
	state, err := c.state.Get(ctx)
	if err != nil {
		return nil, err
	}
	return state.slotMasterNode(slot)
}

func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
	for _, n := range nodes {
		if n == node {
			return nodes
		}
	}
	return append(nodes, node)
}

func appendIfNotExists(ss []string, es ...string) []string {
loop:
	for _, e := range es {
		for _, s := range ss {
			if s == e {
				continue loop
			}
		}
		ss = append(ss, e)
	}
	return ss
}

//------------------------------------------------------------------------------

type cmdsMap struct {
	mu sync.Mutex
	m  map[*clusterNode][]Cmder
}

func newCmdsMap() *cmdsMap {
	return &cmdsMap{
		m: make(map[*clusterNode][]Cmder),
	}
}

func (m *cmdsMap) Add(node *clusterNode, cmds ...Cmder) {
	m.mu.Lock()
	m.m[node] = append(m.m[node], cmds...)
	m.mu.Unlock()
}