Source File
pubsub.go
Belonging Package
github.com/go-redis/redis/v8
package redis
import (
)
const (
pingTimeout = time.Second
chanSendTimeout = time.Minute
)
var errPingTimeout = errors.New("redis: ping timeout")
type PubSub struct {
opt *Options
newConn func(ctx context.Context, channels []string) (*pool.Conn, error)
closeConn func(*pool.Conn) error
mu sync.Mutex
cn *pool.Conn
channels map[string]struct{}
patterns map[string]struct{}
closed bool
exit chan struct{}
cmd *Cmd
chOnce sync.Once
msgCh chan *Message
allCh chan interface{}
ping chan struct{}
}
func ( *PubSub) () string {
:= mapKeys(.channels)
= append(, mapKeys(.patterns)...)
return fmt.Sprintf("PubSub(%s)", strings.Join(, ", "))
}
func ( *PubSub) () {
.exit = make(chan struct{})
}
func ( *PubSub) ( context.Context) (*pool.Conn, error) {
.mu.Lock()
, := .conn(, nil)
.mu.Unlock()
return ,
}
func ( *PubSub) ( context.Context, []string) (*pool.Conn, error) {
if .closed {
return nil, pool.ErrClosed
}
if .cn != nil {
return .cn, nil
}
:= mapKeys(.channels)
= append(, ...)
, := .newConn(, )
if != nil {
return nil,
}
if := .resubscribe(, ); != nil {
_ = .closeConn()
return nil,
}
.cn =
return , nil
}
func ( *PubSub) ( context.Context, *pool.Conn, Cmder) error {
return .WithWriter(, .opt.WriteTimeout, func( *proto.Writer) error {
return writeCmd(, )
})
}
func ( *PubSub) ( context.Context, *pool.Conn) error {
var error
if len(.channels) > 0 {
= ._subscribe(, , "subscribe", mapKeys(.channels))
}
if len(.patterns) > 0 {
:= ._subscribe(, , "psubscribe", mapKeys(.patterns))
if != nil && == nil {
=
}
}
return
}
func ( map[string]struct{}) []string {
:= make([]string, len())
:= 0
for := range {
[] =
++
}
return
}
func ( *PubSub) (
context.Context, *pool.Conn, string, []string,
) error {
:= make([]interface{}, 0, 1+len())
= append(, )
for , := range {
= append(, )
}
:= NewSliceCmd(, ...)
return .writeCmd(, , )
}
func ( *PubSub) (
context.Context,
*pool.Conn,
error,
bool,
) {
.mu.Lock()
.releaseConn(, , , )
.mu.Unlock()
}
func ( *PubSub) ( context.Context, *pool.Conn, error, bool) {
if .cn != {
return
}
if isBadConn(, ) {
.reconnect(, )
}
}
func ( *PubSub) ( context.Context, error) {
_ = .closeTheCn()
_, _ = .conn(, nil)
}
func ( *PubSub) ( error) error {
if .cn == nil {
return nil
}
if !.closed {
internal.Logger.Printf(.getContext(), "redis: discarding bad PubSub connection: %s", )
}
:= .closeConn(.cn)
.cn = nil
return
}
func ( *PubSub) () error {
.mu.Lock()
defer .mu.Unlock()
if .closed {
return pool.ErrClosed
}
.closed = true
close(.exit)
return .closeTheCn(pool.ErrClosed)
}
func ( *PubSub) ( context.Context, ...string) error {
.mu.Lock()
defer .mu.Unlock()
for , := range {
delete(.patterns, )
}
:= .subscribe(, "punsubscribe", ...)
return
}
func ( *PubSub) ( context.Context, string, ...string) error {
, := .conn(, )
if != nil {
return
}
= ._subscribe(, , , )
.releaseConn(, , , false)
return
}
func ( *PubSub) ( context.Context, ...string) error {
:= []interface{}{"ping"}
if len() == 1 {
= append(, [0])
}
:= NewCmd(, ...)
, := .connWithLock()
if != nil {
return
}
= .writeCmd(, , )
.releaseConnWithLock(, , , false)
return
}
type Pong struct {
Payload string
}
func ( *Pong) () string {
if .Payload != "" {
return fmt.Sprintf("Pong<%s>", .Payload)
}
return "Pong"
}
func ( *PubSub) ( interface{}) (interface{}, error) {
switch reply := .(type) {
case string:
return &Pong{
Payload: ,
}, nil
case []interface{}:
switch := [0].(string); {
, := [1].(string)
return &Subscription{
Kind: ,
Channel: ,
Count: int([2].(int64)),
}, nil
case "message":
switch payload := [2].(type) {
case string:
return &Message{
Channel: [1].(string),
Payload: ,
}, nil
case []interface{}:
:= make([]string, len())
for , := range {
[] = .(string)
}
return &Message{
Channel: [1].(string),
PayloadSlice: ,
}, nil
default:
return nil, fmt.Errorf("redis: unsupported pubsub message payload: %T", )
}
case "pmessage":
return &Message{
Pattern: [1].(string),
Channel: [2].(string),
Payload: [3].(string),
}, nil
case "pong":
return &Pong{
Payload: [1].(string),
}, nil
default:
return nil, fmt.Errorf("redis: unsupported pubsub message: %q", )
}
default:
return nil, fmt.Errorf("redis: unsupported pubsub message: %#v", )
}
}
func ( *PubSub) ( context.Context, time.Duration) (interface{}, error) {
if .cmd == nil {
.cmd = NewCmd()
}
, := .connWithLock()
if != nil {
return nil,
}
= .WithReader(, , func( *proto.Reader) error {
return .cmd.readReply()
})
.releaseConnWithLock(, , , > 0)
if != nil {
return nil,
}
return .newMessage(.cmd.Val())
}
func ( *PubSub) ( context.Context) (interface{}, error) {
return .ReceiveTimeout(, 0)
}
func ( *PubSub) () <-chan *Message {
return .ChannelSize(100)
}
func ( *PubSub) ( int) <-chan *Message {
.chOnce.Do(func() {
.initPing()
.initMsgChan()
})
if .msgCh == nil {
:= fmt.Errorf("redis: Channel can't be called after ChannelWithSubscriptions")
panic()
}
if cap(.msgCh) != {
:= fmt.Errorf("redis: PubSub.Channel size can not be changed once created")
panic()
}
return .msgCh
}
func ( *PubSub) ( context.Context, int) <-chan interface{} {
.chOnce.Do(func() {
.initPing()
.initAllChan()
})
if .allCh == nil {
:= fmt.Errorf("redis: ChannelWithSubscriptions can't be called after Channel")
panic()
}
if cap(.allCh) != {
:= fmt.Errorf("redis: PubSub.Channel size can not be changed once created")
panic()
}
return .allCh
}
func ( *PubSub) () context.Context {
if .cmd != nil {
return .cmd.ctx
}
return context.Background()
}
func ( *PubSub) () {
:= context.TODO()
.ping = make(chan struct{}, 1)
go func() {
:= time.NewTimer(time.Minute)
.Stop()
:= true
for {
.Reset(pingTimeout)
select {
case <-.ping:
= true
if !.Stop() {
<-.C
}
case <-.C:
:= .Ping()
if {
= false
} else {
if == nil {
= errPingTimeout
}
.mu.Lock()
.reconnect(, )
= true
.mu.Unlock()
}
case <-.exit:
return
}
}
}()
}
case *Message:
.Reset(chanSendTimeout)
select {
case .msgCh <- :
if !.Stop() {
<-.C
}
case <-.C:
internal.Logger.Printf(
.getContext(),
"redis: %s channel is full for %s (message is dropped)",
,
chanSendTimeout,
)
}
default:
internal.Logger.Printf(.getContext(), "redis: unknown message type: %T", )
}
}
}()
}
select {
case .ping <- struct{}{}:
default:
}
switch msg := .(type) {
case *Subscription:
.sendMessage(, )
case *Message:
.sendMessage(, )
default:
internal.Logger.Printf(.getContext(), "redis: unknown message type: %T", )
}
}
}()
}
func ( *PubSub) ( interface{}, *time.Timer) {
.Reset(pingTimeout)
select {
case .allCh <- :
if !.Stop() {
<-.C
}
case <-.C:
internal.Logger.Printf(
.getContext(),
"redis: %s channel is full for %s (message is dropped)", , pingTimeout)
}
}
func ( *PubSub) ( int) time.Duration {
return internal.RetryBackoff(, .opt.MinRetryBackoff, .opt.MaxRetryBackoff)
![]() |
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds. |