Source File
notify.go
Belonging Package
github.com/lib/pq
package pq
import (
)
Extra string
}
func ( *readBuf) *Notification {
:= .int32()
:= .string()
:= .string()
return &Notification{, , }
}
const (
connStateIdle int32 = iota
connStateExpectResponse
connStateExpectReadyForQuery
)
type message struct {
typ byte
err error
}
var errListenerConnClosed = errors.New("pq: ListenerConn has been closed")
senderLock sync.Mutex
notificationChan chan<- *Notification
replyChan chan message
}
func ( string, chan<- *Notification) (*ListenerConn, error) {
return newDialListenerConn(defaultDialer{}, , )
}
func ( Dialer, string, chan<- *Notification) (*ListenerConn, error) {
, := DialOpen(, )
if != nil {
return nil,
}
:= &ListenerConn{
cn: .(*conn),
notificationChan: ,
connState: connStateIdle,
replyChan: make(chan message, 2),
}
go .listenerConnMain()
return , nil
}
.senderLock.Lock()
.connectionLock.Lock()
:= .err
.connectionLock.Unlock()
if != nil {
.senderLock.Unlock()
return
}
return nil
}
func ( *ListenerConn) () {
.senderLock.Unlock()
}
func ( *ListenerConn) ( int32) bool {
var int32
switch {
case connStateIdle:
= connStateExpectReadyForQuery
case connStateExpectResponse:
= connStateIdle
case connStateExpectReadyForQuery:
= connStateExpectResponse
default:
panic(fmt.Sprintf("unexpected listenerConnState %d", ))
}
return atomic.CompareAndSwapInt32(&.connState, , )
}
func ( *ListenerConn) () ( error) {
defer errRecoverNoErrBadConn(&)
:= &readBuf{}
for {
, := .cn.recvMessage()
if != nil {
return
}
switch {
if !.setState(connStateExpectReadyForQuery) {
return parseError()
}
.replyChan <- message{, parseError()}
case 'C', 'I':
case 'Z':
func ( *ListenerConn) () {
:= .listenerConnLoop()
.connectionLock.Lock()
if .err == nil {
.err =
}
.cn.Close()
.connectionLock.Unlock()
}
func ( *ListenerConn) ( string) (bool, error) {
return .ExecSimpleQuery("LISTEN " + QuoteIdentifier())
}
func ( *ListenerConn) ( string) (bool, error) {
return .ExecSimpleQuery("UNLISTEN " + QuoteIdentifier())
}
func ( *ListenerConn) () (bool, error) {
return .ExecSimpleQuery("UNLISTEN *")
}
func ( *ListenerConn) () error {
, := .ExecSimpleQuery("")
if ! {
return
}
func ( *ListenerConn) ( string) ( error) {
defer errRecoverNoErrBadConn(&)
if !.setState(connStateExpectResponse) {
panic("two queries running at the same time")
}
func ( *ListenerConn) ( string) ( bool, error) {
if = .acquireSenderLock(); != nil {
return false,
}
defer .releaseSenderLock()
= .sendSimpleQuery()
for {
, := <-.replyChan
.connectionLock.Lock()
:= .err
.connectionLock.Unlock()
return false,
}
switch .typ {
return true,
func ( *ListenerConn) () error {
.connectionLock.Lock()
if .err != nil {
.connectionLock.Unlock()
return errListenerConnClosed
}
.err = errListenerConnClosed
func ( *ListenerConn) () error {
return .err
}
var errListenerClosed = errors.New("pq: Listener has been closed")
var ErrChannelAlreadyOpen = errors.New("pq: channel is already open")
var ErrChannelNotOpen = errors.New("pq: channel is not open")
type ListenerEventType int
type EventCallbackType func(event ListenerEventType, err error)
func ( string,
time.Duration,
time.Duration,
EventCallbackType) *Listener {
return NewDialListener(defaultDialer{}, , , , )
}
func ( Dialer,
string,
time.Duration,
time.Duration,
EventCallbackType) *Listener {
:= &Listener{
name: ,
minReconnectInterval: ,
maxReconnectInterval: ,
dialer: ,
eventCallback: ,
channels: make(map[string]struct{}),
Notify: make(chan *Notification, 32),
}
.reconnectCond = sync.NewCond(&.lock)
go .listenerMain()
return
}
func ( *Listener) () <-chan *Notification {
return .Notify
}
, := .channels[]
if {
return ErrChannelAlreadyOpen
}
if .isClosed {
return errListenerClosed
}
}
return nil
}
, := .channels[]
if ! {
return ErrChannelNotOpen
}
, := .cn.UnlistenAll()
if && != nil {
return
}
}
func ( *Listener) ( *ListenerConn, <-chan *Notification) error {
:= make(chan error)
go func( <-chan *Notification) {
func ( *Listener) () bool {
.lock.Lock()
defer .lock.Unlock()
return .isClosed
}
func ( *Listener) () error {
:= make(chan *Notification, 32)
, := newDialListenerConn(.dialer, .name, )
if != nil {
return
}
.lock.Lock()
defer .lock.Unlock()
= .resync(, )
if != nil {
.Close()
return
}
.cn =
.connNotificationChan =
.reconnectCond.Broadcast()
return nil
}
.reconnectCond.Broadcast()
return nil
}
func ( *Listener) ( ListenerEventType, error) {
if .eventCallback != nil {
.eventCallback(, )
}
}
func ( *Listener) () {
var time.Time
:= .minReconnectInterval
for {
for {
:= .connect()
if == nil {
break
}
if .closed() {
return
}
.emitEvent(ListenerEventConnectionAttemptFailed, )
time.Sleep()
*= 2
if > .maxReconnectInterval {
= .maxReconnectInterval
}
}
if .IsZero() {
.emitEvent(ListenerEventConnected, nil)
} else {
.emitEvent(ListenerEventReconnected, nil)
.Notify <- nil
}
= .minReconnectInterval
= time.Now().Add()
for {
, := <-.connNotificationChan
break
}
.Notify <-
}
:= .disconnectCleanup()
if .closed() {
return
}
.emitEvent(ListenerEventDisconnected, )
time.Sleep(time.Until())
}
}
func ( *Listener) () {
.listenerConnLoop()
close(.Notify)
![]() |
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds. |