Source File
sentinel.go
Belonging Package
github.com/go-redis/redis/v8
package redis
import (
)
Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
OnConnect func(ctx context.Context, cn *Conn) error
Username string
Password string
DB int
MaxRetries int
MinRetryBackoff time.Duration
MaxRetryBackoff time.Duration
DialTimeout time.Duration
ReadTimeout time.Duration
WriteTimeout time.Duration
PoolSize int
MinIdleConns int
MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
TLSConfig *tls.Config
}
func ( *FailoverOptions) () *Options {
return &Options{
Addr: "FailoverClient",
Dialer: .Dialer,
OnConnect: .OnConnect,
DB: .DB,
Username: .Username,
Password: .Password,
MaxRetries: .MaxRetries,
MinRetryBackoff: .MinRetryBackoff,
MaxRetryBackoff: .MaxRetryBackoff,
DialTimeout: .DialTimeout,
ReadTimeout: .ReadTimeout,
WriteTimeout: .WriteTimeout,
PoolSize: .PoolSize,
PoolTimeout: .PoolTimeout,
IdleTimeout: .IdleTimeout,
IdleCheckFrequency: .IdleCheckFrequency,
MinIdleConns: .MinIdleConns,
MaxConnAge: .MaxConnAge,
TLSConfig: .TLSConfig,
}
}
func ( *FailoverOptions) ( string) *Options {
return &Options{
Addr: ,
Dialer: .Dialer,
OnConnect: .OnConnect,
DB: 0,
Password: .SentinelPassword,
MaxRetries: .MaxRetries,
MinRetryBackoff: .MinRetryBackoff,
MaxRetryBackoff: .MaxRetryBackoff,
DialTimeout: .DialTimeout,
ReadTimeout: .ReadTimeout,
WriteTimeout: .WriteTimeout,
PoolSize: .PoolSize,
PoolTimeout: .PoolTimeout,
IdleTimeout: .IdleTimeout,
IdleCheckFrequency: .IdleCheckFrequency,
MinIdleConns: .MinIdleConns,
MaxConnAge: .MaxConnAge,
TLSConfig: .TLSConfig,
}
}
func ( *FailoverOptions) () *ClusterOptions {
return &ClusterOptions{
Dialer: .Dialer,
OnConnect: .OnConnect,
Username: .Username,
Password: .Password,
MaxRedirects: .MaxRetries,
RouteByLatency: .RouteByLatency,
RouteRandomly: .RouteRandomly,
MinRetryBackoff: .MinRetryBackoff,
MaxRetryBackoff: .MaxRetryBackoff,
DialTimeout: .DialTimeout,
ReadTimeout: .ReadTimeout,
WriteTimeout: .WriteTimeout,
PoolSize: .PoolSize,
PoolTimeout: .PoolTimeout,
IdleTimeout: .IdleTimeout,
IdleCheckFrequency: .IdleCheckFrequency,
MinIdleConns: .MinIdleConns,
MaxConnAge: .MaxConnAge,
TLSConfig: .TLSConfig,
}
}
func ( *FailoverOptions) *Client {
if .RouteByLatency {
panic("to route commands by latency, use NewFailoverClusterClient")
}
if .RouteRandomly {
panic("to route commands randomly, use NewFailoverClusterClient")
}
:= make([]string, len(.SentinelAddrs))
copy(, .SentinelAddrs)
:= &sentinelFailover{
opt: ,
sentinelAddrs: ,
}
:= .clientOptions()
.Dialer = masterSlaveDialer()
.init()
:= newConnPool()
.onFailover = func( context.Context, string) {
_ = .Filter(func( *pool.Conn) bool {
return .RemoteAddr().String() !=
})
}
:= Client{
baseClient: newBaseClient(, ),
ctx: context.Background(),
}
.cmdable = .Process
.onClose = .Close
return &
}
func (
*sentinelFailover,
) func( context.Context, , string) (net.Conn, error) {
return func( context.Context, , string) (net.Conn, error) {
var string
var error
if .opt.SlaveOnly {
, = .RandomSlaveAddr()
} else {
, = .MasterAddr()
if == nil {
.trySwitchMaster(, )
}
}
if != nil {
return nil,
}
if .opt.Dialer != nil {
return .opt.Dialer(, , )
}
return net.DialTimeout("tcp", , .opt.DialTimeout)
}
}
type SentinelClient struct {
*baseClient
ctx context.Context
}
func ( *Options) *SentinelClient {
.init()
:= &SentinelClient{
baseClient: &baseClient{
opt: ,
connPool: newConnPool(),
},
ctx: context.Background(),
}
return
}
func ( *SentinelClient) () context.Context {
return .ctx
}
func ( *SentinelClient) ( context.Context) *SentinelClient {
if == nil {
panic("nil context")
}
:= *
.ctx =
return &
}
func ( *SentinelClient) ( context.Context, Cmder) error {
return .baseClient.process(, )
}
func ( *SentinelClient) () *PubSub {
:= &PubSub{
opt: .opt,
newConn: func( context.Context, []string) (*pool.Conn, error) {
return .newConn()
},
closeConn: .connPool.CloseConn,
}
.init()
return
}
func ( *SentinelClient) ( context.Context) *StringCmd {
:= NewStringCmd(, "ping")
_ = .Process(, )
return
}
func ( *SentinelClient) ( context.Context, ...string) *PubSub {
:= .pubSub()
if len() > 0 {
_ = .PSubscribe(, ...)
}
return
}
func ( *SentinelClient) ( context.Context, string) *StringSliceCmd {
:= NewStringSliceCmd(, "sentinel", "get-master-addr-by-name", )
_ = .Process(, )
return
}
func ( *SentinelClient) ( context.Context, string) *SliceCmd {
:= NewSliceCmd(, "sentinel", "sentinels", )
_ = .Process(, )
return
}
func ( *SentinelClient) ( context.Context, string) *StatusCmd {
:= NewStatusCmd(, "sentinel", "failover", )
_ = .Process(, )
return
}
func ( *SentinelClient) ( context.Context) *StatusCmd {
:= NewStatusCmd(, "sentinel", "flushconfig")
_ = .Process(, )
return
}
func ( *SentinelClient) ( context.Context, string) *StringStringMapCmd {
:= NewStringStringMapCmd(, "sentinel", "master", )
_ = .Process(, )
return
}
func ( *SentinelClient) ( context.Context) *SliceCmd {
:= NewSliceCmd(, "sentinel", "masters")
_ = .Process(, )
return
}
func ( *SentinelClient) ( context.Context, string) *SliceCmd {
:= NewSliceCmd(, "sentinel", "slaves", )
_ = .Process(, )
return
}
func ( *SentinelClient) ( context.Context, string) *StringCmd {
:= NewStringCmd(, "sentinel", "ckquorum", )
_ = .Process(, )
return
}
func ( *SentinelClient) ( context.Context, , , , string) *StringCmd {
:= NewStringCmd(, "sentinel", "monitor", , , , )
_ = .Process(, )
return
}
func ( *SentinelClient) ( context.Context, , , string) *StringCmd {
:= NewStringCmd(, "sentinel", "set", , , )
_ = .Process(, )
return
}
func ( *SentinelClient) ( context.Context, string) *StringCmd {
:= NewStringCmd(, "sentinel", "remove", )
_ = .Process(, )
return
}
type sentinelFailover struct {
opt *FailoverOptions
sentinelAddrs []string
onFailover func(ctx context.Context, addr string)
onUpdate func(ctx context.Context)
mu sync.RWMutex
_masterAddr string
sentinel *SentinelClient
pubsub *PubSub
}
func ( *sentinelFailover) () error {
.mu.Lock()
defer .mu.Unlock()
if .sentinel != nil {
return .closeSentinel()
}
return nil
}
func ( *sentinelFailover) () error {
:= .pubsub.Close()
.pubsub = nil
:= .sentinel.Close()
if != nil && == nil {
=
}
.sentinel = nil
return
}
func ( *sentinelFailover) ( context.Context) (string, error) {
, := .slaveAddrs()
if != nil {
return "",
}
if len() == 0 {
return .MasterAddr()
}
return [rand.Intn(len())], nil
}
func ( *sentinelFailover) ( context.Context) (string, error) {
.mu.RLock()
:= .sentinel
.mu.RUnlock()
if != nil {
:= .getMasterAddr(, )
if != "" {
return , nil
}
}
.mu.Lock()
defer .mu.Unlock()
if .sentinel != nil {
:= .getMasterAddr(, .sentinel)
if != "" {
return , nil
}
_ = .closeSentinel()
}
for , := range .sentinelAddrs {
:= NewSentinelClient(.opt.sentinelOptions())
, := .GetMasterAddrByName(, .opt.MasterName).Result()
if != nil {
internal.Logger.Printf(, "sentinel: GetMasterAddrByName master=%q failed: %s",
.opt.MasterName, )
_ = .Close()
continue
}
.sentinelAddrs[0], .sentinelAddrs[] = .sentinelAddrs[], .sentinelAddrs[0]
.setSentinel(, )
:= net.JoinHostPort([0], [1])
return , nil
}
return "", errors.New("redis: all sentinels are unreachable")
}
func ( *sentinelFailover) ( context.Context) ([]string, error) {
.mu.RLock()
:= .sentinel
.mu.RUnlock()
if != nil {
:= .getSlaveAddrs(, )
if len() > 0 {
return , nil
}
}
.mu.Lock()
defer .mu.Unlock()
if .sentinel != nil {
:= .getSlaveAddrs(, .sentinel)
if len() > 0 {
return , nil
}
_ = .closeSentinel()
}
for , := range .sentinelAddrs {
:= NewSentinelClient(.opt.sentinelOptions())
, := .Slaves(, .opt.MasterName).Result()
if != nil {
internal.Logger.Printf(, "sentinel: Slaves master=%q failed: %s",
.opt.MasterName, )
_ = .Close()
continue
}
.sentinelAddrs[0], .sentinelAddrs[] = .sentinelAddrs[], .sentinelAddrs[0]
.setSentinel(, )
:= parseSlaveAddrs()
return , nil
}
return []string{}, errors.New("redis: all sentinels are unreachable")
}
func ( *sentinelFailover) ( context.Context, *SentinelClient) string {
, := .GetMasterAddrByName(, .opt.MasterName).Result()
if != nil {
internal.Logger.Printf(, "sentinel: GetMasterAddrByName name=%q failed: %s",
.opt.MasterName, )
return ""
}
return net.JoinHostPort([0], [1])
}
func ( *sentinelFailover) ( context.Context, *SentinelClient) []string {
, := .Slaves(, .opt.MasterName).Result()
if != nil {
internal.Logger.Printf(, "sentinel: Slaves name=%q failed: %s",
.opt.MasterName, )
return []string{}
}
return parseSlaveAddrs()
}
func ( []interface{}) []string {
:= make([]string, 0, len())
for , := range {
:= ""
:= ""
:= []string{}
:= ""
:= false
for , := range .([]interface{}) {
switch {
case "ip":
= .(string)
case "port":
= .(string)
case "flags":
= strings.Split(.(string), ",")
}
= .(string)
}
for , := range {
switch {
case "s_down", "o_down", "disconnected":
= true
}
}
if ! {
= append(, net.JoinHostPort(, ))
}
}
return
}
func ( *sentinelFailover) ( context.Context, string) {
.mu.RLock()
:= ._masterAddr
.mu.RUnlock()
if == {
return
}
.mu.Lock()
defer .mu.Unlock()
if == ._masterAddr {
return
}
._masterAddr =
internal.Logger.Printf(, "sentinel: new master=%q addr=%q",
.opt.MasterName, )
if .onFailover != nil {
.onFailover(, )
}
}
func ( *sentinelFailover) ( context.Context, *SentinelClient) {
if .sentinel != nil {
panic("not reached")
}
.sentinel =
.discoverSentinels()
.pubsub = .Subscribe(, "+switch-master", "+slave-reconf-done")
go .listen(.pubsub)
}
func ( *sentinelFailover) ( context.Context) {
, := .sentinel.Sentinels(, .opt.MasterName).Result()
if != nil {
internal.Logger.Printf(, "sentinel: Sentinels master=%q failed: %s", .opt.MasterName, )
return
}
for , := range {
:= .([]interface{})
for := 0; < len(); += 2 {
:= [].(string)
if == "name" {
:= [+1].(string)
if !contains(.sentinelAddrs, ) {
internal.Logger.Printf(, "sentinel: discovered new sentinel=%q for master=%q",
, .opt.MasterName)
.sentinelAddrs = append(.sentinelAddrs, )
}
}
}
}
}
func ( *sentinelFailover) ( *PubSub) {
:= context.TODO()
if .onUpdate != nil {
.onUpdate()
}
:= .Channel()
for := range {
if .Channel == "+switch-master" {
:= strings.Split(.Payload, " ")
if [0] != .opt.MasterName {
internal.Logger.Printf(.getContext(), "sentinel: ignore addr for master=%q", [0])
continue
}
:= net.JoinHostPort([3], [4])
.trySwitchMaster(.getContext(), )
}
if .onUpdate != nil {
.onUpdate()
}
}
}
func ( []string, string) bool {
for , := range {
if == {
return true
}
}
return false
}
func ( *FailoverOptions) *ClusterClient {
:= make([]string, len(.SentinelAddrs))
copy(, .SentinelAddrs)
:= &sentinelFailover{
opt: ,
sentinelAddrs: ,
}
:= .clusterOptions()
.ClusterSlots = func( context.Context) ([]ClusterSlot, error) {
, := .MasterAddr()
if != nil {
return nil,
}
:= []ClusterNode{{
Addr: ,
}}
, := .slaveAddrs()
if != nil {
return nil,
}
for , := range {
= append(, ClusterNode{
Addr: ,
})
}
:= []ClusterSlot{
{
Start: 0,
End: 16383,
Nodes: ,
},
}
return , nil
}
:= NewClusterClient()
.onUpdate = func( context.Context) {
.ReloadState()
}
return
![]() |
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds. |