package redis

import (
	
	
	
	
	
	
	
	
	

	
	
	

	
	
	
)

// errRingShardsDown is returned when no live shard can be selected for a
// key, and when command info cannot be fetched because there are no shards
// to ask.
var errRingShardsDown = errors.New("redis: all ring shards are down")
//------------------------------------------------------------------------------

// ConsistentHash maps a key to the name of the shard that owns it.
//
// The ring treats an empty result as "no shard available".
type ConsistentHash interface {
	// Get returns the shard name selected for key.
	Get(key string) string
}

// rendezvousWrapper embeds a *rendezvous.Rendezvous so that it can be
// handed out wherever a ConsistentHash is expected.
type rendezvousWrapper struct {
	*rendezvous.Rendezvous
}

func ( rendezvousWrapper) ( string) string {
	return .Lookup()
}

func ( []string) ConsistentHash {
	return rendezvousWrapper{rendezvous.New(, xxhash.Sum64String)}
}
//------------------------------------------------------------------------------
RingOptions are used to configure a ring client and should be passed to NewRing.
Map of name => host:port addresses of ring shards.
NewClient creates a shard client with provided name and options.
	NewClient func(name string, opt *Options) *Client
Frequency of PING commands sent to check shards availability. Shard is considered down after 3 subsequent failed checks.
NewConsistentHash returns a consistent hash that is used to distribute keys across the shards. See https://medium.com/@dgryski/consistent-hashing-algorithmic-tradeoffs-ef6b8e2fcae8 for consistent hashing algorithmic tradeoffs.
Following options are copied from Options struct.

	Dialer    func(ctx context.Context, network, addr string) (net.Conn, error)
	OnConnect func(ctx context.Context, cn *Conn) error

	Username string
	Password string
	DB       int

	MaxRetries      int
	MinRetryBackoff time.Duration
	MaxRetryBackoff time.Duration

	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration

	PoolSize           int
	MinIdleConns       int
	MaxConnAge         time.Duration
	PoolTimeout        time.Duration
	IdleTimeout        time.Duration
	IdleCheckFrequency time.Duration

	TLSConfig *tls.Config
	Limiter   Limiter
}

func ( *RingOptions) () {
	if .NewClient == nil {
		.NewClient = func( string,  *Options) *Client {
			return NewClient()
		}
	}

	if .HeartbeatFrequency == 0 {
		.HeartbeatFrequency = 500 * time.Millisecond
	}

	if .NewConsistentHash == nil {
		.NewConsistentHash = newRendezvous
	}

	if .MaxRetries == -1 {
		.MaxRetries = 0
	} else if .MaxRetries == 0 {
		.MaxRetries = 3
	}
	switch .MinRetryBackoff {
	case -1:
		.MinRetryBackoff = 0
	case 0:
		.MinRetryBackoff = 8 * time.Millisecond
	}
	switch .MaxRetryBackoff {
	case -1:
		.MaxRetryBackoff = 0
	case 0:
		.MaxRetryBackoff = 512 * time.Millisecond
	}
}

func ( *RingOptions) () *Options {
	return &Options{
		Dialer:    .Dialer,
		OnConnect: .OnConnect,

		Username: .Username,
		Password: .Password,
		DB:       .DB,

		MaxRetries: -1,

		DialTimeout:  .DialTimeout,
		ReadTimeout:  .ReadTimeout,
		WriteTimeout: .WriteTimeout,

		PoolSize:           .PoolSize,
		MinIdleConns:       .MinIdleConns,
		MaxConnAge:         .MaxConnAge,
		PoolTimeout:        .PoolTimeout,
		IdleTimeout:        .IdleTimeout,
		IdleCheckFrequency: .IdleCheckFrequency,

		TLSConfig: .TLSConfig,
		Limiter:   .Limiter,
	}
}
//------------------------------------------------------------------------------

// ringShard is a single shard of the ring: a client plus a health counter.
type ringShard struct {
	// Client is the client used to talk to this shard.
	Client *Client
	// down counts failed health votes since the last successful one. It is
	// read and written with sync/atomic; the shard is treated as down once
	// the counter reaches 3.
	down   int32
}

func ( *RingOptions, ,  string) *ringShard {
	 := .clientOptions()
	.Addr = 

	return &ringShard{
		Client: .NewClient(, ),
	}
}

func ( *ringShard) () string {
	var  string
	if .IsUp() {
		 = "up"
	} else {
		 = "down"
	}
	return fmt.Sprintf("%s is %s", .Client, )
}

func ( *ringShard) () bool {
	const  = 3
	return atomic.LoadInt32(&.down) >= 
}

func ( *ringShard) () bool {
	return !.IsDown()
}
Vote votes to set shard state and returns true if state was changed.
func ( *ringShard) ( bool) bool {
	if  {
		 := .IsDown()
		atomic.StoreInt32(&.down, 0)
		return 
	}

	if .IsDown() {
		return false
	}

	atomic.AddInt32(&.down, 1)
	return .IsDown()
}
//------------------------------------------------------------------------------

// ringShards holds every shard of a ring together with the consistent hash
// built over the shards that are currently up.
type ringShards struct {
	// opt is the ring configuration; its NewConsistentHash is used when
	// the hash is rebuilt.
	opt *RingOptions

	// mu guards the fields below.
	mu       sync.RWMutex
	// hash maps a key to a shard name; it is replaced on every rebalance
	// and set to nil on Close.
	hash     ConsistentHash
	shards   map[string]*ringShard // read only
	list     []*ringShard          // read only
	// numShard is the number of shards that were up at the last rebalance.
	numShard int
	// closed is set once Close has run.
	closed   bool
}

func ( *RingOptions) *ringShards {
	 := make(map[string]*ringShard, len(.Addrs))
	 := make([]*ringShard, 0, len())

	for ,  := range .Addrs {
		 := newRingShard(, , )
		[] = 

		 = append(, )
	}

	 := &ringShards{
		opt: ,

		shards: ,
		list:   ,
	}
	.rebalance()

	return 
}

func ( *ringShards) () []*ringShard {
	var  []*ringShard

	.mu.RLock()
	if !.closed {
		 = .list
	}
	.mu.RUnlock()

	return 
}

func ( *ringShards) ( string) string {
	 = hashtag.Key()

	var  string

	.mu.RLock()
	if .numShard > 0 {
		 = .hash.Get()
	}
	.mu.RUnlock()

	return 
}

func ( *ringShards) ( string) (*ringShard, error) {
	 = hashtag.Key()

	.mu.RLock()

	if .closed {
		.mu.RUnlock()
		return nil, pool.ErrClosed
	}

	if .numShard == 0 {
		.mu.RUnlock()
		return nil, errRingShardsDown
	}

	 := .hash.Get()
	if  == "" {
		.mu.RUnlock()
		return nil, errRingShardsDown
	}

	 := .shards[]
	.mu.RUnlock()

	return , nil
}

func ( *ringShards) ( string) (*ringShard, error) {
	if  == "" {
		return .Random()
	}

	.mu.RLock()
	 := .shards[]
	.mu.RUnlock()
	return , nil
}

func ( *ringShards) () (*ringShard, error) {
	return .GetByKey(strconv.Itoa(rand.Int()))
}
heartbeat monitors state of each shard in the ring.
func ( *ringShards) ( time.Duration) {
	 := time.NewTicker()
	defer .Stop()

	 := context.Background()
	for range .C {
		var  bool

		for ,  := range .List() {
			 := .Client.Ping().Err()
			 :=  == nil ||  == pool.ErrPoolTimeout
			if .Vote() {
				internal.Logger.Printf(context.Background(), "ring shard state changed: %s", )
				 = true
			}
		}

		if  {
			.rebalance()
		}
	}
}
rebalance removes dead shards from the Ring.
func ( *ringShards) () {
	.mu.RLock()
	 := .shards
	.mu.RUnlock()

	 := make([]string, 0, len())

	for ,  := range  {
		if .IsUp() {
			 = append(, )
		}
	}

	 := .opt.NewConsistentHash()

	.mu.Lock()
	.hash = 
	.numShard = len()
	.mu.Unlock()
}

func ( *ringShards) () int {
	.mu.RLock()
	 := .numShard
	.mu.RUnlock()
	return 
}

func ( *ringShards) () error {
	.mu.Lock()
	defer .mu.Unlock()

	if .closed {
		return nil
	}
	.closed = true

	var  error
	for ,  := range .shards {
		if  := .Client.Close();  != nil &&  == nil {
			 = 
		}
	}
	.hash = nil
	.shards = nil
	.list = nil

	return 
}
//------------------------------------------------------------------------------

// ring holds the state behind a Ring. Ring embeds it by pointer, so copies
// of a Ring share it.
type ring struct {
	// opt is the configuration the ring was created with.
	opt           *RingOptions
	// shards is the shard set and its consistent hash.
	shards        *ringShards
	// cmdsInfoCache caches command info used to find a command's first key.
	cmdsInfoCache *cmdsInfoCache //nolint:structcheck
}
Ring is a Redis client that uses consistent hashing to distribute keys across multiple Redis servers (shards). It's safe for concurrent use by multiple goroutines. Ring monitors the state of each shard and removes dead shards from the ring. When a shard comes online it is added back to the ring. This gives you maximum availability and partition tolerance, but no consistency between different shards or even clients. Each client uses shards that are available to the client and does not do any coordination when shard state is changed. Ring should be used when you need multiple Redis servers for caching and can tolerate losing data when one of the servers dies. Otherwise you should use Redis Cluster.
type Ring struct {
	*ring
	cmdable
	hooks
	ctx context.Context
}

func ( *RingOptions) *Ring {
	.init()

	 := Ring{
		ring: &ring{
			opt:    ,
			shards: newRingShards(),
		},
		ctx: context.Background(),
	}

	.cmdsInfoCache = newCmdsInfoCache(.cmdsInfo)
	.cmdable = .Process

	go .shards.Heartbeat(.HeartbeatFrequency)

	return &
}

func ( *Ring) () context.Context {
	return .ctx
}

func ( *Ring) ( context.Context) *Ring {
	if  == nil {
		panic("nil context")
	}
	 := *
	.cmdable = .Process
	.hooks.lock()
	.ctx = 
	return &
}
Do creates a Cmd from the args and processes the cmd.
func ( *Ring) ( context.Context,  ...interface{}) *Cmd {
	 := NewCmd(, ...)
	_ = .Process(, )
	return 
}

func ( *Ring) ( context.Context,  Cmder) error {
	return .hooks.process(, , .process)
}
Options returns read-only Options that were used to create the client.
func ( *Ring) () *RingOptions {
	return .opt
}

func ( *Ring) ( int) time.Duration {
	return internal.RetryBackoff(, .opt.MinRetryBackoff, .opt.MaxRetryBackoff)
}
PoolStats returns accumulated connection pool stats.
func ( *Ring) () *PoolStats {
	 := .shards.List()
	var  PoolStats
	for ,  := range  {
		 := .Client.connPool.Stats()
		.Hits += .Hits
		.Misses += .Misses
		.Timeouts += .Timeouts
		.TotalConns += .TotalConns
		.IdleConns += .IdleConns
	}
	return &
}
Len returns the current number of shards in the ring.
func ( *Ring) () int {
	return .shards.Len()
}
Subscribe subscribes the client to the specified channels.
func ( *Ring) ( context.Context,  ...string) *PubSub {
	if len() == 0 {
		panic("at least one channel is required")
	}

	,  := .shards.GetByKey([0])
TODO: return PubSub with sticky error
		panic()
	}
	return .Client.Subscribe(, ...)
}
PSubscribe subscribes the client to the given patterns.
func ( *Ring) ( context.Context,  ...string) *PubSub {
	if len() == 0 {
		panic("at least one channel is required")
	}

	,  := .shards.GetByKey([0])
TODO: return PubSub with sticky error
		panic()
	}
	return .Client.PSubscribe(, ...)
}
ForEachShard concurrently calls the fn on each live shard in the ring. It returns the first error if any.
func ( *Ring) (
	 context.Context,
	 func( context.Context,  *Client) error,
) error {
	 := .shards.List()
	var  sync.WaitGroup
	 := make(chan error, 1)
	for ,  := range  {
		if .IsDown() {
			continue
		}

		.Add(1)
		go func( *ringShard) {
			defer .Done()
			 := (, .Client)
			if  != nil {
				select {
				case  <- :
				default:
				}
			}
		}()
	}
	.Wait()

	select {
	case  := <-:
		return 
	default:
		return nil
	}
}

func ( *Ring) () (map[string]*CommandInfo, error) {
	 := .shards.List()
	var  error
	for ,  := range  {
		,  := .Client.Command(context.TODO()).Result()
		if  == nil {
			return , nil
		}
		if  == nil {
			 = 
		}
	}
	if  == nil {
		return nil, errRingShardsDown
	}
	return nil, 
}

func ( *Ring) ( string) *CommandInfo {
	,  := .cmdsInfoCache.Get()
	if  != nil {
		return nil
	}
	 := []
	if  == nil {
		internal.Logger.Printf(.Context(), "info for cmd=%s not found", )
	}
	return 
}

func ( *Ring) ( Cmder) (*ringShard, error) {
	 := .cmdInfo(.Name())
	 := cmdFirstKeyPos(, )
	if  == 0 {
		return .shards.Random()
	}
	 := .stringArg()
	return .shards.GetByKey()
}

func ( *Ring) ( context.Context,  Cmder) error {
	 := ._process(, )
	if  != nil {
		.SetErr()
		return 
	}
	return nil
}

func ( *Ring) ( context.Context,  Cmder) error {
	var  error
	for  := 0;  <= .opt.MaxRetries; ++ {
		if  > 0 {
			if  := internal.Sleep(, .retryBackoff());  != nil {
				return 
			}
		}

		,  := .cmdShard()
		if  != nil {
			return 
		}

		 = .Client.Process(, )
		if  == nil || !shouldRetry(, .readTimeout() == nil) {
			return 
		}
	}
	return 
}

func ( *Ring) ( context.Context,  func(Pipeliner) error) ([]Cmder, error) {
	return .Pipeline().Pipelined(, )
}

func ( *Ring) () Pipeliner {
	 := Pipeline{
		ctx:  .ctx,
		exec: .processPipeline,
	}
	.init()
	return &
}

func ( *Ring) ( context.Context,  []Cmder) error {
	return .hooks.processPipeline(, , func( context.Context,  []Cmder) error {
		return .generalProcessPipeline(, , false)
	})
}

func ( *Ring) ( context.Context,  func(Pipeliner) error) ([]Cmder, error) {
	return .TxPipeline().Pipelined(, )
}

func ( *Ring) () Pipeliner {
	 := Pipeline{
		ctx:  .ctx,
		exec: .processTxPipeline,
	}
	.init()
	return &
}

func ( *Ring) ( context.Context,  []Cmder) error {
	return .hooks.processPipeline(, , func( context.Context,  []Cmder) error {
		return .generalProcessPipeline(, , true)
	})
}

func ( *Ring) (
	 context.Context,  []Cmder,  bool,
) error {
	 := make(map[string][]Cmder)
	for ,  := range  {
		 := .cmdInfo(.Name())
		 := .stringArg(cmdFirstKeyPos(, ))
		if  != "" {
			 = .shards.Hash()
		}
		[] = append([], )
	}

	var  sync.WaitGroup
	for ,  := range  {
		.Add(1)
		go func( string,  []Cmder) {
			defer .Done()

			_ = .processShardPipeline(, , , )
		}(, )
	}

	.Wait()
	return cmdsFirstErr()
}

func ( *Ring) (
	 context.Context,  string,  []Cmder,  bool,
TODO: retry?
	,  := .shards.GetByName()
	if  != nil {
		setCmdsErr(, )
		return 
	}

	if  {
		 = .Client.processTxPipeline(, )
	} else {
		 = .Client.processPipeline(, )
	}
	return 
}

func ( *Ring) ( context.Context,  func(*Tx) error,  ...string) error {
	if len() == 0 {
		return fmt.Errorf("redis: Watch requires at least one key")
	}

	var  []*ringShard
	for ,  := range  {
		if  != "" {
			,  := .shards.GetByKey(hashtag.Key())
			if  != nil {
				return 
			}

			 = append(, )
		}
	}

	if len() == 0 {
		return fmt.Errorf("redis: Watch requires at least one shard")
	}

	if len() > 1 {
		for ,  := range [1:] {
			if .Client != [0].Client {
				 := fmt.Errorf("redis: Watch requires all keys to be in the same shard")
				return 
			}
		}
	}

	return [0].Client.Watch(, , ...)
}
// Close closes the ring client, releasing any open resources.
//
// It is rare to Close a Ring, as the Ring is meant to be long-lived
// and shared between many goroutines.
func ( *Ring) () error {
	return .shards.Close()