Source File
ring.go
Belonging Package
github.com/go-redis/redis/v8
package redis
import (
)
var errRingShardsDown = errors.New("redis: all ring shards are down")
type ConsistentHash interface {
Get(string) string
}
type rendezvousWrapper struct {
*rendezvous.Rendezvous
}
func ( rendezvousWrapper) ( string) string {
return .Lookup()
}
func ( []string) ConsistentHash {
return rendezvousWrapper{rendezvous.New(, xxhash.Sum64String)}
}
NewConsistentHash func(shards []string) ConsistentHash
Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
OnConnect func(ctx context.Context, cn *Conn) error
Username string
Password string
DB int
MaxRetries int
MinRetryBackoff time.Duration
MaxRetryBackoff time.Duration
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
PoolSize int
MinIdleConns int
MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
TLSConfig *tls.Config
Limiter Limiter
}
func ( *RingOptions) () {
if .NewClient == nil {
.NewClient = func( string, *Options) *Client {
return NewClient()
}
}
if .HeartbeatFrequency == 0 {
.HeartbeatFrequency = 500 * time.Millisecond
}
if .NewConsistentHash == nil {
.NewConsistentHash = newRendezvous
}
if .MaxRetries == -1 {
.MaxRetries = 0
} else if .MaxRetries == 0 {
.MaxRetries = 3
}
switch .MinRetryBackoff {
case -1:
.MinRetryBackoff = 0
case 0:
.MinRetryBackoff = 8 * time.Millisecond
}
switch .MaxRetryBackoff {
case -1:
.MaxRetryBackoff = 0
case 0:
.MaxRetryBackoff = 512 * time.Millisecond
}
}
func ( *RingOptions) () *Options {
return &Options{
Dialer: .Dialer,
OnConnect: .OnConnect,
Username: .Username,
Password: .Password,
DB: .DB,
MaxRetries: -1,
DialTimeout: .DialTimeout,
ReadTimeout: .ReadTimeout,
WriteTimeout: .WriteTimeout,
PoolSize: .PoolSize,
MinIdleConns: .MinIdleConns,
MaxConnAge: .MaxConnAge,
PoolTimeout: .PoolTimeout,
IdleTimeout: .IdleTimeout,
IdleCheckFrequency: .IdleCheckFrequency,
TLSConfig: .TLSConfig,
Limiter: .Limiter,
}
}
type ringShard struct {
Client *Client
down int32
}
func ( *RingOptions, , string) *ringShard {
:= .clientOptions()
.Addr =
return &ringShard{
Client: .NewClient(, ),
}
}
func ( *ringShard) () string {
var string
if .IsUp() {
= "up"
} else {
= "down"
}
return fmt.Sprintf("%s is %s", .Client, )
}
func ( *ringShard) () bool {
const = 3
return atomic.LoadInt32(&.down) >=
}
func ( *ringShard) () bool {
return !.IsDown()
}
type ringShards struct {
opt *RingOptions
mu sync.RWMutex
hash ConsistentHash
shards map[string]*ringShard // read only
list []*ringShard // read only
numShard int
closed bool
}
func ( *RingOptions) *ringShards {
:= make(map[string]*ringShard, len(.Addrs))
:= make([]*ringShard, 0, len())
for , := range .Addrs {
:= newRingShard(, , )
[] =
= append(, )
}
:= &ringShards{
opt: ,
shards: ,
list: ,
}
.rebalance()
return
}
func ( *ringShards) () []*ringShard {
var []*ringShard
.mu.RLock()
if !.closed {
= .list
}
.mu.RUnlock()
return
}
func ( *ringShards) ( string) string {
= hashtag.Key()
var string
.mu.RLock()
if .numShard > 0 {
= .hash.Get()
}
.mu.RUnlock()
return
}
func ( *ringShards) ( string) (*ringShard, error) {
= hashtag.Key()
.mu.RLock()
if .closed {
.mu.RUnlock()
return nil, pool.ErrClosed
}
if .numShard == 0 {
.mu.RUnlock()
return nil, errRingShardsDown
}
:= .hash.Get()
if == "" {
.mu.RUnlock()
return nil, errRingShardsDown
}
:= .shards[]
.mu.RUnlock()
return , nil
}
func ( *ringShards) ( string) (*ringShard, error) {
if == "" {
return .Random()
}
.mu.RLock()
:= .shards[]
.mu.RUnlock()
return , nil
}
func ( *ringShards) () (*ringShard, error) {
return .GetByKey(strconv.Itoa(rand.Int()))
}
func ( *ringShards) ( time.Duration) {
:= time.NewTicker()
defer .Stop()
:= context.Background()
for range .C {
var bool
for , := range .List() {
:= .Client.Ping().Err()
:= == nil || == pool.ErrPoolTimeout
if .Vote() {
internal.Logger.Printf(context.Background(), "ring shard state changed: %s", )
= true
}
}
if {
.rebalance()
}
}
}
func ( *ringShards) () {
.mu.RLock()
:= .shards
.mu.RUnlock()
:= make([]string, 0, len())
for , := range {
if .IsUp() {
= append(, )
}
}
:= .opt.NewConsistentHash()
.mu.Lock()
.hash =
.numShard = len()
.mu.Unlock()
}
func ( *ringShards) () int {
.mu.RLock()
:= .numShard
.mu.RUnlock()
return
}
func ( *ringShards) () error {
.mu.Lock()
defer .mu.Unlock()
if .closed {
return nil
}
.closed = true
var error
for , := range .shards {
if := .Client.Close(); != nil && == nil {
=
}
}
.hash = nil
.shards = nil
.list = nil
return
}
type ring struct {
opt *RingOptions
shards *ringShards
cmdsInfoCache *cmdsInfoCache //nolint:structcheck
}
type Ring struct {
*ring
cmdable
hooks
ctx context.Context
}
func ( *RingOptions) *Ring {
.init()
:= Ring{
ring: &ring{
opt: ,
shards: newRingShards(),
},
ctx: context.Background(),
}
.cmdsInfoCache = newCmdsInfoCache(.cmdsInfo)
.cmdable = .Process
go .shards.Heartbeat(.HeartbeatFrequency)
return &
}
func ( *Ring) () context.Context {
return .ctx
}
func ( *Ring) ( context.Context) *Ring {
if == nil {
panic("nil context")
}
:= *
.cmdable = .Process
.hooks.lock()
.ctx =
return &
}
func ( *Ring) () *RingOptions {
return .opt
}
func ( *Ring) ( int) time.Duration {
return internal.RetryBackoff(, .opt.MinRetryBackoff, .opt.MaxRetryBackoff)
}
panic()
}
return .Client.PSubscribe(, ...)
}
func ( *Ring) (
context.Context,
func( context.Context, *Client) error,
) error {
:= .shards.List()
var sync.WaitGroup
:= make(chan error, 1)
for , := range {
if .IsDown() {
continue
}
.Add(1)
go func( *ringShard) {
defer .Done()
:= (, .Client)
if != nil {
select {
case <- :
default:
}
}
}()
}
.Wait()
select {
case := <-:
return
default:
return nil
}
}
func ( *Ring) () (map[string]*CommandInfo, error) {
:= .shards.List()
var error
for , := range {
, := .Client.Command(context.TODO()).Result()
if == nil {
return , nil
}
if == nil {
=
}
}
if == nil {
return nil, errRingShardsDown
}
return nil,
}
func ( *Ring) ( string) *CommandInfo {
, := .cmdsInfoCache.Get()
if != nil {
return nil
}
:= []
if == nil {
internal.Logger.Printf(.Context(), "info for cmd=%s not found", )
}
return
}
func ( *Ring) ( Cmder) (*ringShard, error) {
:= .cmdInfo(.Name())
:= cmdFirstKeyPos(, )
if == 0 {
return .shards.Random()
}
:= .stringArg()
return .shards.GetByKey()
}
func ( *Ring) ( context.Context, Cmder) error {
:= ._process(, )
if != nil {
.SetErr()
return
}
return nil
}
func ( *Ring) ( context.Context, Cmder) error {
var error
for := 0; <= .opt.MaxRetries; ++ {
if > 0 {
if := internal.Sleep(, .retryBackoff()); != nil {
return
}
}
, := .cmdShard()
if != nil {
return
}
= .Client.Process(, )
if == nil || !shouldRetry(, .readTimeout() == nil) {
return
}
}
return
}
func ( *Ring) ( context.Context, func(Pipeliner) error) ([]Cmder, error) {
return .Pipeline().Pipelined(, )
}
func ( *Ring) () Pipeliner {
:= Pipeline{
ctx: .ctx,
exec: .processPipeline,
}
.init()
return &
}
func ( *Ring) ( context.Context, []Cmder) error {
return .hooks.processPipeline(, , func( context.Context, []Cmder) error {
return .generalProcessPipeline(, , false)
})
}
func ( *Ring) ( context.Context, func(Pipeliner) error) ([]Cmder, error) {
return .TxPipeline().Pipelined(, )
}
func ( *Ring) () Pipeliner {
:= Pipeline{
ctx: .ctx,
exec: .processTxPipeline,
}
.init()
return &
}
func ( *Ring) ( context.Context, []Cmder) error {
return .hooks.processPipeline(, , func( context.Context, []Cmder) error {
return .generalProcessPipeline(, , true)
})
}
func ( *Ring) (
context.Context, []Cmder, bool,
) error {
:= make(map[string][]Cmder)
for , := range {
:= .cmdInfo(.Name())
:= .stringArg(cmdFirstKeyPos(, ))
if != "" {
= .shards.Hash()
}
[] = append([], )
}
var sync.WaitGroup
for , := range {
.Add(1)
go func( string, []Cmder) {
defer .Done()
_ = .processShardPipeline(, , , )
}(, )
}
.Wait()
return cmdsFirstErr()
}
func ( *Ring) (
context.Context, string, []Cmder, bool,
, := .shards.GetByName()
if != nil {
setCmdsErr(, )
return
}
if {
= .Client.processTxPipeline(, )
} else {
= .Client.processPipeline(, )
}
return
}
func ( *Ring) ( context.Context, func(*Tx) error, ...string) error {
if len() == 0 {
return fmt.Errorf("redis: Watch requires at least one key")
}
var []*ringShard
for , := range {
if != "" {
, := .shards.GetByKey(hashtag.Key())
if != nil {
return
}
= append(, )
}
}
if len() == 0 {
return fmt.Errorf("redis: Watch requires at least one shard")
}
if len() > 1 {
for , := range [1:] {
if .Client != [0].Client {
:= fmt.Errorf("redis: Watch requires all keys to be in the same shard")
return
}
}
}
return [0].Client.Watch(, , ...)
}
![]() |
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds. |