package redis

import (
	
	
	
	
	
	

	
	
	
)

// Timeouts used by the PubSub background goroutines.
const (
	// pingTimeout is the period of the connection health-check timer.
	pingTimeout time.Duration = time.Second
	// chanSendTimeout is how long a send to a full Go channel may block
	// before the message is dropped.
	chanSendTimeout time.Duration = 60 * time.Second
)

// errPingTimeout is used by the ping goroutine as the reconnect reason when
// the health check fails without a more specific error from the ping itself.
var errPingTimeout = errors.New("redis: ping timeout")
// PubSub implements Pub/Sub commands as described in
// http://redis.io/topics/pubsub. Message receiving is NOT safe
// for concurrent use by multiple goroutines.
//
// PubSub automatically reconnects to the Redis server and
// resubscribes to the channels in case of network errors.
type PubSub struct {
	// opt supplies the write timeout and the retry backoff bounds.
	opt *Options

	// newConn opens a connection; it is given the channel names that are
	// about to be subscribed. closeConn disposes of a connection.
	newConn   func(ctx context.Context, channels []string) (*pool.Conn, error)
	closeConn func(*pool.Conn) error

	// mu is held by the exported methods while they touch cn, channels,
	// patterns and closed.
	mu sync.Mutex
	// cn is the current connection, or nil when none is established.
	cn *pool.Conn
	// channels and patterns are the sets of names that are re-sent with
	// "subscribe" / "psubscribe" whenever a new connection is created.
	channels map[string]struct{}
	patterns map[string]struct{}

	// closed is set once on shutdown; exit is closed at the same time so
	// the ping goroutine can stop.
	closed bool
	exit   chan struct{}

	// cmd is a lazily created command reused for reading replies.
	cmd *Cmd

	// chOnce guards the one-time start of the channel goroutines. Only one
	// of msgCh / allCh is created, depending on which API was called first.
	chOnce sync.Once
	msgCh  chan *Message
	allCh  chan interface{}
	// ping receives a non-blocking signal for every message read, which
	// the ping goroutine treats as proof the connection is alive.
	ping chan struct{}
}

// Returns a description of the form "PubSub(a, b, ...)": the subscribed
// channel names followed by the pattern names, each group in map iteration
// order.
// NOTE(review): the method name is not visible here; presumably this is the
// fmt.Stringer implementation (the value is formatted with %s elsewhere).
func ( *PubSub) () string {
	 := mapKeys(.channels)
	 = append(, mapKeys(.patterns)...)
	return fmt.Sprintf("PubSub(%s)", strings.Join(, ", "))
}

// Allocates the exit channel. That channel is closed on shutdown and is what
// the ping goroutine selects on to know when to return.
func ( *PubSub) () {
	.exit = make(chan struct{})
}

// connWithLock is conn (with no extra channel names) executed while holding
// the mutex.
func ( *PubSub) ( context.Context) (*pool.Conn, error) {
	.mu.Lock()
	,  := .conn(, nil)
	.mu.Unlock()
	return , 
}

// conn returns the connection to use, creating and caching one when none is
// cached. It does not lock; connWithLock wraps it with the mutex.
//
// It returns pool.ErrClosed once the PubSub is closed, and otherwise any
// error from opening the connection or from resubscribing on it.
func ( *PubSub) ( context.Context,  []string) (*pool.Conn, error) {
	if .closed {
		return nil, pool.ErrClosed
	}
	// Reuse the cached connection when there is one.
	if .cn != nil {
		return .cn, nil
	}

	// The new connection is opened for the recorded channels plus the extra
	// names supplied by the caller.
	 := mapKeys(.channels)
	 = append(, ...)

	,  := .newConn(, )
	if  != nil {
		return nil, 
	}

	// Re-issue the recorded subscriptions; a connection on which that fails
	// is closed (close error deliberately ignored) and not cached.
	if  := .resubscribe(, );  != nil {
		_ = .closeConn()
		return nil, 
	}

	.cn = 
	return , nil
}

// writeCmd writes a single command to the connection through WithWriter,
// using opt.WriteTimeout as the timeout. No reply is read here.
func ( *PubSub) ( context.Context,  *pool.Conn,  Cmder) error {
	return .WithWriter(, .opt.WriteTimeout, func( *proto.Writer) error {
		return writeCmd(, )
	})
}

// resubscribe sends "subscribe" for every recorded channel and "psubscribe"
// for every recorded pattern on the given connection.
//
// The patterns are attempted even if the channel subscription failed; the
// first error encountered is the one returned.
func ( *PubSub) ( context.Context,  *pool.Conn) error {
	var  error

	if len(.channels) > 0 {
		 = ._subscribe(, , "subscribe", mapKeys(.channels))
	}

	if len(.patterns) > 0 {
		 := ._subscribe(, , "psubscribe", mapKeys(.patterns))
		// Keep the earlier (channel) error if there was one.
		if  != nil &&  == nil {
			 = 
		}
	}

	return 
}

// mapKeys returns the keys of the given set as a slice whose length equals
// the map size. The order is the map's iteration order, i.e. unspecified.
// A nil or empty map yields an empty, non-nil slice.
func ( map[string]struct{}) []string {
	 := make([]string, len())
	 := 0
	for  := range  {
		[] = 
		++
	}
	return 
}

// _subscribe builds one command whose first argument is the given command
// name ("subscribe", "psubscribe", ...) followed by every name in the slice,
// and writes it to the connection. It does not wait for or read the reply.
func ( *PubSub) (
	 context.Context,  *pool.Conn,  string,  []string,
) error {
	// Capacity is known up front: the command name plus one slot per name.
	 := make([]interface{}, 0, 1+len())
	 = append(, )
	for ,  := range  {
		 = append(, )
	}
	 := NewSliceCmd(, ...)
	return .writeCmd(, , )
}

// releaseConnWithLock is releaseConn executed while holding the mutex; all
// four arguments are forwarded unchanged.
func ( *PubSub) (
	 context.Context,
	 *pool.Conn,
	 error,
	 bool,
) {
	.mu.Lock()
	.releaseConn(, , , )
	.mu.Unlock()
}

// releaseConn is called after an operation on a connection with the error
// that operation produced. It does not lock; releaseConnWithLock wraps it.
//
// Nothing happens if the connection is no longer the cached one (it was
// already replaced). Otherwise, when isBadConn classifies the error as fatal
// for the connection, the connection is replaced via reconnect.
// NOTE(review): the bool is passed straight to isBadConn; callers pass
// `timeout > 0` or false — presumably "timeouts are acceptable"; confirm.
func ( *PubSub) ( context.Context,  *pool.Conn,  error,  bool) {
	if .cn !=  {
		return
	}
	if isBadConn(, ) {
		.reconnect(, )
	}
}

// reconnect discards the cached connection (logging the given reason) and
// immediately tries to establish a new one. Both steps are best effort:
// their errors are intentionally ignored, and a failed attempt simply leaves
// no cached connection so the next conn call retries.
func ( *PubSub) ( context.Context,  error) {
	_ = .closeTheCn()
	_, _ = .conn(, nil)
}

// closeTheCn closes the cached connection, if any, and clears the cache.
// The reason is logged unless the PubSub has already been marked closed.
// It returns the error from closing the connection, or nil when there was
// no connection.
func ( *PubSub) ( error) error {
	if .cn == nil {
		return nil
	}
	// Skip the log line during an orderly shutdown.
	if !.closed {
		internal.Logger.Printf(.getContext(), "redis: discarding bad PubSub connection: %s", )
	}
	 := .closeConn(.cn)
	.cn = nil
	return 
}

// Shuts the PubSub down: marks it closed, closes the exit channel so the
// ping goroutine stops, and closes the cached connection.
//
// A second call returns pool.ErrClosed without doing anything, which also
// protects against closing the exit channel twice.
// NOTE(review): the method name is not visible here; presumably Close.
func ( *PubSub) () error {
	.mu.Lock()
	defer .mu.Unlock()

	if .closed {
		return pool.ErrClosed
	}
	.closed = true
	close(.exit)

	return .closeTheCn(pool.ErrClosed)
}
// Subscribe the client to the specified channels. The channels are recorded
// even if sending the command fails, so they are subscribed again the next
// time a connection is established.
func ( *PubSub) ( context.Context,  ...string) error {
	.mu.Lock()
	defer .mu.Unlock()

	// Send first, then record. The names are recorded regardless of the
	// error so that resubscribe picks them up on the next connection.
	 := .subscribe(, "subscribe", ...)
	// The set is created lazily on first use.
	if .channels == nil {
		.channels = make(map[string]struct{})
	}
	for ,  := range  {
		.channels[] = struct{}{}
	}
	return 
}
// PSubscribe the client to the given patterns. The patterns are recorded
// even if sending the command fails, so they are subscribed again the next
// time a connection is established.
func ( *PubSub) ( context.Context,  ...string) error {
	.mu.Lock()
	defer .mu.Unlock()

	// Send first, then record. The patterns are recorded regardless of the
	// error so that resubscribe picks them up on the next connection.
	 := .subscribe(, "psubscribe", ...)
	// The set is created lazily on first use.
	if .patterns == nil {
		.patterns = make(map[string]struct{})
	}
	for ,  := range  {
		.patterns[] = struct{}{}
	}
	return 
}
Unsubscribe the client from the given channels, or from all of them if none is given.
func ( *PubSub) ( context.Context,  ...string) error {
	.mu.Lock()
	defer .mu.Unlock()

	for ,  := range  {
		delete(.channels, )
	}
	 := .subscribe(, "unsubscribe", ...)
	return 
}
PUnsubscribe the client from the given patterns, or from all of them if none is given.
func ( *PubSub) ( context.Context,  ...string) error {
	.mu.Lock()
	defer .mu.Unlock()

	for ,  := range  {
		delete(.patterns, )
	}
	 := .subscribe(, "punsubscribe", ...)
	return 
}

// subscribe obtains a connection (passing the names along so a newly opened
// connection knows about them), writes the given command with those names,
// and hands the write error to releaseConn so a broken connection is
// replaced. It does not lock; every caller in this file holds the mutex.
func ( *PubSub) ( context.Context,  string,  ...string) error {
	,  := .conn(, )
	if  != nil {
		return 
	}

	 = ._subscribe(, , , )
	.releaseConn(, , , false)
	return 
}

// Ping writes a "ping" command on the subscription connection. The reply is
// not read here; it arrives through the receive path.
//
// A payload is attached only when exactly one string is supplied; with zero
// or with more than one argument the command is sent without a payload.
func ( *PubSub) ( context.Context,  ...string) error {
	 := []interface{}{"ping"}
	if len() == 1 {
		 = append(, [0])
	}
	 := NewCmd(, ...)

	,  := .connWithLock()
	if  != nil {
		return 
	}

	// Report the write error so a broken connection gets replaced.
	 = .writeCmd(, , )
	.releaseConnWithLock(, , , false)
	return 
}
// Subscription received after a successful subscription to channel.
	// Can be "subscribe", "unsubscribe", "psubscribe" or "punsubscribe".
	// Channel name we have subscribed to.
	// Number of channels we are currently subscribed to.
	Count int
}

// Returns "<Kind>: <Channel>" for the subscription event.
// NOTE(review): the method name is not visible here; presumably String.
func ( *Subscription) () string {
	return fmt.Sprintf("%s: %s", .Kind, .Channel)
}
// Message received as result of a PUBLISH command issued by another client.
type Message struct {
	// Channel the message was published on.
	Channel string
	// Pattern is filled in only for "pmessage" replies.
	Pattern string
	// Payload holds the message body when it is a single string.
	Payload string
	// PayloadSlice holds the message body when a "message" reply carries
	// an array; Payload is left empty in that case.
	PayloadSlice []string
}

// Returns "Message<channel: payload>". Only Channel and Payload are shown;
// Pattern and PayloadSlice are not included.
// NOTE(review): the method name is not visible here; presumably String.
func ( *Message) () string {
	return fmt.Sprintf("Message<%s: %s>", .Channel, .Payload)
}
// Pong received as result of a PING command issued on this connection.
type Pong struct {
	// Payload is the text carried by the reply; it may be empty.
	Payload string
}

// Returns "Pong<payload>" when a payload is present and plain "Pong"
// otherwise.
// NOTE(review): the method name is not visible here; presumably String.
func ( *Pong) () string {
	if .Payload != "" {
		return fmt.Sprintf("Pong<%s>", .Payload)
	}
	return "Pong"
}

// newMessage converts a decoded reply into one of *Pong, *Subscription or
// *Message. A bare string becomes a Pong; an array is dispatched on its first
// element. Anything else yields an "unsupported pubsub message" error.
//
// Most element accesses use single-value type assertions, so a reply whose
// elements have unexpected types or count panics rather than returning an
// error.
func ( *PubSub) ( interface{}) (interface{}, error) {
	switch reply := .(type) {
	case string:
		return &Pong{
			Payload: ,
		}, nil
	case []interface{}:
		switch  := [0].(string);  {
		// NOTE(review): a `case` label (presumably the four subscription
		// kinds) and a comment marker appear to be missing around the next
		// line — restore from the original source.
Can be nil in case of "unsubscribe".
			,  := [1].(string)
			return &Subscription{
				Kind:    ,
				Channel: ,
				Count:   int([2].(int64)),
			}, nil
		case "message":
			// The payload is either a single string or an array of strings.
			switch payload := [2].(type) {
			case string:
				return &Message{
					Channel: [1].(string),
					Payload: ,
				}, nil
			case []interface{}:
				 := make([]string, len())
				for ,  := range  {
					[] = .(string)
				}
				return &Message{
					Channel:      [1].(string),
					PayloadSlice: ,
				}, nil
			default:
				return nil, fmt.Errorf("redis: unsupported pubsub message payload: %T", )
			}
		case "pmessage":
			// Element order is pattern, channel, payload.
			return &Message{
				Pattern: [1].(string),
				Channel: [2].(string),
				Payload: [3].(string),
			}, nil
		case "pong":
			return &Pong{
				Payload: [1].(string),
			}, nil
		default:
			return nil, fmt.Errorf("redis: unsupported pubsub message: %q", )
		}
	default:
		return nil, fmt.Errorf("redis: unsupported pubsub message: %#v", )
	}
}
// ReceiveTimeout acts like Receive but returns an error if a message
// is not received in time. This is a low-level API and in most cases
// Channel should be used instead.
func ( *PubSub) ( context.Context,  time.Duration) (interface{}, error) {
	// The command object is created once and reused for every read, which
	// is one reason receiving is not safe for concurrent use.
	if .cmd == nil {
		.cmd = NewCmd()
	}

	,  := .connWithLock()
	if  != nil {
		return nil, 
	}

	// Read one reply into the reusable command, bounded by the timeout.
	 = .WithReader(, , func( *proto.Reader) error {
		return .cmd.readReply()
	})

	// The last argument is true only when the caller asked for a positive
	// timeout; it is forwarded to isBadConn when deciding whether the read
	// error warrants a reconnect.
	.releaseConnWithLock(, , ,  > 0)
	if  != nil {
		return nil, 
	}

	return .newMessage(.cmd.Val())
}
// Receive returns a message as a Subscription, Message, Pong or error.
// See PubSub example for details. This is a low-level API and in most cases
// Channel should be used instead.
func ( *PubSub) ( context.Context) (interface{}, error) {
	// Delegates with a zero timeout, so ReceiveTimeout's `> 0` check is false.
	return .ReceiveTimeout(, 0)
}
// ReceiveMessage returns a Message or error, ignoring Subscription and Pong
// messages. This is a low-level API and in most cases Channel should be used
// instead.
func ( *PubSub) ( context.Context) (*Message, error) {
	// Keep receiving until a *Message arrives or an error occurs.
	for {
		,  := .Receive()
		if  != nil {
			return nil, 
		}

		switch msg := .(type) {
		// NOTE(review): two `case` labels (presumably *Subscription and
		// *Pong) and their comment markers appear to be missing around the
		// next two lines — restore from the original source.
Ignore.
Ignore.
		case *Message:
			return , nil
		default:
			 := fmt.Errorf("redis: unknown message: %T", )
			return nil, 
		}
	}
}
// Channel returns a Go channel for concurrently receiving messages.
// The channel is closed together with the PubSub. If the Go channel
// stays full for chanSendTimeout (one minute), the message is dropped.
// Receive* APIs can not be used after the channel is created.
//
// go-redis periodically sends ping messages to test connection health
// and reconnects (re-subscribing) if nothing is received from the server
// across consecutive pingTimeout (one second) intervals.
func ( *PubSub) () <-chan *Message {
	// 100 is the default buffer size of the returned channel.
	return .ChannelSize(100)
}
// ChannelSize is like Channel, but creates a Go channel
// with the specified buffer size.
func ( *PubSub) ( int) <-chan *Message {
	// The goroutines are started at most once, shared with
	// ChannelWithSubscriptions through the same sync.Once.
	// NOTE(review): initMsgChan is declared with an int parameter but is
	// called without an argument in this text — presumably the size; confirm.
	.chOnce.Do(func() {
		.initPing()
		.initMsgChan()
	})
	// msgCh is nil when the Once was consumed by ChannelWithSubscriptions.
	if .msgCh == nil {
		 := fmt.Errorf("redis: Channel can't be called after ChannelWithSubscriptions")
		panic()
	}
	// Later calls must ask for the same buffer size as the first one.
	if cap(.msgCh) !=  {
		 := fmt.Errorf("redis: PubSub.Channel size can not be changed once created")
		panic()
	}
	return .msgCh
}
// ChannelWithSubscriptions is like Channel, but the message type can be either
// *Subscription or *Message. Subscription messages can be used to detect
// reconnections.
//
// ChannelWithSubscriptions can not be used together with Channel or ChannelSize.
func ( *PubSub) ( context.Context,  int) <-chan interface{} {
	// The goroutines are started at most once, shared with ChannelSize
	// through the same sync.Once.
	// NOTE(review): initAllChan is declared with an int parameter but is
	// called without an argument in this text — presumably the size; confirm.
	.chOnce.Do(func() {
		.initPing()
		.initAllChan()
	})
	// allCh is nil when the Once was consumed by Channel/ChannelSize.
	if .allCh == nil {
		 := fmt.Errorf("redis: ChannelWithSubscriptions can't be called after Channel")
		panic()
	}
	// Later calls must ask for the same buffer size as the first one.
	if cap(.allCh) !=  {
		 := fmt.Errorf("redis: PubSub.Channel size can not be changed once created")
		panic()
	}
	return .allCh
}

// getContext returns the context stored on the reusable command when that
// command has been created, and context.Background() otherwise. In this file
// it is only used to supply a context to the logger.
func ( *PubSub) () context.Context {
	if .cmd != nil {
		return .cmd.ctx
	}
	return context.Background()
}

// initPing creates the ping signal channel and starts the health-check
// goroutine, which runs until the exit channel is closed.
//
// Each loop iteration arms a pingTimeout timer and waits for one of:
//   - a signal on the ping channel (some message was received): the
//     connection is considered alive and the timer is cancelled;
//   - the timer firing: a ping is sent. On the first quiet interval the
//     connection is only marked suspect; if the next interval is quiet too,
//     the connection is replaced via reconnect;
//   - exit being closed: the goroutine returns.
func ( *PubSub) () {
	 := context.TODO()
	// Buffered by one so senders can signal without blocking.
	.ping = make(chan struct{}, 1)
	go func() {
		// Created stopped; the initial duration is irrelevant because the
		// timer is Reset at the top of every iteration.
		 := time.NewTimer(time.Minute)
		.Stop()

		// NOTE(review): presumably a "healthy" flag — the variable name is
		// not visible here.
		 := true
		for {
			.Reset(pingTimeout)
			select {
			case <-.ping:
				 = true
				// Stop reports false if the timer already fired; drain the
				// channel so the next Reset starts clean.
				if !.Stop() {
					<-.C
				}
			case <-.C:
				 := .Ping()
				if  {
					 = false
				} else {
					// Fall back to a generic reason when the ping itself
					// was written without error.
					if  == nil {
						 = errPingTimeout
					}
					.mu.Lock()
					.reconnect(, )
					 = true
					.mu.Unlock()
				}
			case <-.exit:
				return
			}
		}
	}()
}
// initMsgChan must be in sync with initAllChan.
func ( *PubSub) ( int) {
	 := context.TODO()
	.msgCh = make(chan *Message, )
	// The goroutine below receives in a loop and forwards *Message values to
	// msgCh. It exits, closing msgCh, when Receive reports pool.ErrClosed.
	go func() {
		// Created stopped; it is Reset before each send attempt.
		 := time.NewTimer(time.Minute)
		.Stop()

		var  int
		for {
			,  := .Receive()
			if  != nil {
				if  == pool.ErrClosed {
					close(.msgCh)
					return
				}
				// The first failure retries immediately; consecutive
				// failures wait for retryBackoff before trying again.
				if  > 0 {
					time.Sleep(.retryBackoff())
				}
				++
				continue
			}

			 = 0
			// NOTE(review): the next line appears to be a comment that lost
			// its marker.
Any message is as good as a ping.
			// Non-blocking signal: skipped if one is already pending.
			select {
			case .ping <- struct{}{}:
			default:
			}

			switch msg := .(type) {
			// NOTE(review): two `case` labels (presumably *Subscription and
			// *Pong) and their comment markers appear to be missing around
			// the next two lines — restore from the original source.
Ignore.
Ignore.
			case *Message:
				// Wait up to chanSendTimeout for room in the channel, then
				// drop the message and log.
				.Reset(chanSendTimeout)
				select {
				case .msgCh <- :
					// Drain the timer if it fired concurrently with the send.
					if !.Stop() {
						<-.C
					}
				case <-.C:
					internal.Logger.Printf(
						.getContext(),
						"redis: %s channel is full for %s (message is dropped)",
						,
						chanSendTimeout,
					)
				}
			default:
				internal.Logger.Printf(.getContext(), "redis: unknown message type: %T", )
			}
		}
	}()
}
// initAllChan must be in sync with initMsgChan.
func ( *PubSub) ( int) {
	 := context.TODO()
	.allCh = make(chan interface{}, )
	// The goroutine below receives in a loop and forwards *Subscription and
	// *Message values to allCh through sendMessage. It exits, closing allCh,
	// when Receive reports pool.ErrClosed.
	go func() {
		// Created stopped; sendMessage Resets it before each send attempt.
		 := time.NewTimer(pingTimeout)
		.Stop()

		var  int
		for {
			,  := .Receive()
			if  != nil {
				if  == pool.ErrClosed {
					close(.allCh)
					return
				}
				// The first failure retries immediately; consecutive
				// failures wait for retryBackoff before trying again.
				if  > 0 {
					time.Sleep(.retryBackoff())
				}
				++
				continue
			}

			 = 0
			// NOTE(review): the next line appears to be a comment that lost
			// its marker.
Any message is as good as a ping.
			// Non-blocking signal: skipped if one is already pending.
			select {
			case .ping <- struct{}{}:
			default:
			}

			switch msg := .(type) {
			case *Subscription:
				.sendMessage(, )
			// NOTE(review): a `case` label (presumably *Pong) and a comment
			// marker appear to be missing around the next line — restore
			// from the original source.
Ignore.
			case *Message:
				.sendMessage(, )
			default:
				internal.Logger.Printf(.getContext(), "redis: unknown message type: %T", )
			}
		}
	}()
}

func ( *PubSub) ( interface{},  *time.Timer) {
	.Reset(pingTimeout)
	select {
	case .allCh <- :
		if !.Stop() {
			<-.C
		}
	case <-.C:
		internal.Logger.Printf(
			.getContext(),
			"redis: %s channel is full for %s (message is dropped)", , pingTimeout)
	}
}

func ( *PubSub) ( int) time.Duration {
	return internal.RetryBackoff(, .opt.MinRetryBackoff, .opt.MaxRetryBackoff)