package pq
// Package pq is a pure Go Postgres driver for the database/sql package.
// This module contains support for Postgres LISTEN/NOTIFY.

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)
// Notification represents a single notification from the database.
type Notification struct {
	// Process ID (PID) of the notifying postgres backend.
	BePid int
	// Name of the channel the notification was sent on.
	Channel string
	// Payload, or the empty string if unspecified.
	Extra string
}

func ( *readBuf) *Notification {
	 := .int32()
	 := .string()
	 := .string()

	return &Notification{, , }
}

// Protocol states stored in ListenerConn.connState. See setState for the
// transitions that are permitted between them.
const (
	connStateIdle                int32 = 0
	connStateExpectResponse      int32 = 1
	connStateExpectReadyForQuery int32 = 2
)

// message is the value passed over ListenerConn.replyChan from the receiving
// goroutine to the goroutine waiting for a query reply.
type message struct {
	// typ is the type byte of the message received from the server.
	typ byte
	// err is the error carried by the message, or nil if there is none.
	err error
}

// errListenerConnClosed is the error value describing a ListenerConn that has
// already been closed.
var errListenerConnClosed = errors.New("pq: ListenerConn has been closed")
ListenerConn is a low-level interface for waiting for notifications. You should use Listener instead.
the sending goroutine will be holding this lock
NewListenerConn creates a new ListenerConn. Use NewListener instead.
func ( string,  chan<- *Notification) (*ListenerConn, error) {
	return newDialListenerConn(defaultDialer{}, , )
}

func ( Dialer,  string,  chan<- *Notification) (*ListenerConn, error) {
	,  := DialOpen(, )
	if  != nil {
		return nil, 
	}

	 := &ListenerConn{
		cn:               .(*conn),
		notificationChan: ,
		connState:        connStateIdle,
		replyChan:        make(chan message, 2),
	}

	go .listenerConnMain()

	return , nil
}
We can only allow one goroutine at a time to be running a query on the connection for various reasons, so the goroutine sending on the connection must be holding senderLock. Returns an error if an unrecoverable error has occurred and the ListenerConn should be abandoned.
we must acquire senderLock first to avoid deadlocks; see ExecSimpleQuery
	.senderLock.Lock()

	.connectionLock.Lock()
	 := .err
	.connectionLock.Unlock()
	if  != nil {
		.senderLock.Unlock()
		return 
	}
	return nil
}

func ( *ListenerConn) () {
	.senderLock.Unlock()
}
setState advances the protocol state to newState. Returns false if moving to that state from the current state is not allowed.
func ( *ListenerConn) ( int32) bool {
	var  int32

	switch  {
	case connStateIdle:
		 = connStateExpectReadyForQuery
	case connStateExpectResponse:
		 = connStateIdle
	case connStateExpectReadyForQuery:
		 = connStateExpectResponse
	default:
		panic(fmt.Sprintf("unexpected listenerConnState %d", ))
	}

	return atomic.CompareAndSwapInt32(&.connState, , )
}
Main logic is here: receive messages from the postgres backend, forward notifications and query replies and keep the internal state in sync with the protocol state. Returns when the connection has been lost, is about to go away or should be discarded because we couldn't agree on the state with the server backend.
func ( *ListenerConn) () ( error) {
	defer errRecoverNoErrBadConn(&)

	 := &readBuf{}
	for {
		,  := .cn.recvMessage()
		if  != nil {
			return 
		}

		switch  {
recvNotification copies all the data so we don't need to worry about the scratch buffer being overwritten.
only used by tests; ignore

We might receive an ErrorResponse even when not in a query; it is expected that the server will close the connection after that, but we should make sure that the error we display is the one from the stray ErrorResponse, not io.ErrUnexpectedEOF.
			if !.setState(connStateExpectReadyForQuery) {
				return parseError()
			}
			.replyChan <- message{, parseError()}

		case 'C', 'I':
protocol out of sync
				return fmt.Errorf("unexpected CommandComplete")
ExecSimpleQuery doesn't need to know about this message

		case 'Z':
protocol out of sync
				return fmt.Errorf("unexpected ReadyForQuery")
			}
			.replyChan <- message{, nil}

ignore
		default:
			return fmt.Errorf("unexpected message %q from server in listenerConnLoop", )
		}
	}
}
This is the main routine for the goroutine receiving on the database connection. Most of the main logic is in listenerConnLoop.
listenerConnLoop terminated; we're done, but we still have to clean up. Make sure nobody tries to start any new queries by making sure the err pointer is set. It is important that we do not overwrite its value; a connection could be closed by either this goroutine or one sending on the connection -- whoever closes the connection is assumed to have the more meaningful error message (as the other one will probably get net.errClosed), so that goroutine sets the error we expose while the other error is discarded. If the connection is lost while two goroutines are operating on the socket, it probably doesn't matter which error we expose so we don't try to do anything more complex.
	.connectionLock.Lock()
	if .err == nil {
		.err = 
	}
	.cn.Close()
	.connectionLock.Unlock()
There might be a query in-flight; make sure nobody's waiting for a response to it, since there's not going to be one.
let the listener know we're done
this ListenerConn is done
}
Listen sends a LISTEN query to the server. See ExecSimpleQuery.
func ( *ListenerConn) ( string) (bool, error) {
	return .ExecSimpleQuery("LISTEN " + QuoteIdentifier())
}
Unlisten sends an UNLISTEN query to the server. See ExecSimpleQuery.
func ( *ListenerConn) ( string) (bool, error) {
	return .ExecSimpleQuery("UNLISTEN " + QuoteIdentifier())
}
UnlistenAll sends an `UNLISTEN *` query to the server. See ExecSimpleQuery.
func ( *ListenerConn) () (bool, error) {
	return .ExecSimpleQuery("UNLISTEN *")
}
Ping the remote server to make sure it's alive. Non-nil error means the connection has failed and should be abandoned.
func ( *ListenerConn) () error {
	,  := .ExecSimpleQuery("")
	if ! {
		return 
	}
shouldn't happen
		panic()
	}
	return nil
}
Attempt to send a query on the connection. Returns an error if sending the query failed, and the caller should initiate closure of this connection. The caller must be holding senderLock (see acquireSenderLock and releaseSenderLock).
must set connection state before sending the query
	if !.setState(connStateExpectResponse) {
		panic("two queries running at the same time")
	}
Can't use l.cn.writeBuf here because it uses the scratch buffer which might get overwritten by listenerConnLoop.
	 := &writeBuf{
		buf: []byte("Q\x00\x00\x00\x00"),
		pos: 1,
	}
	.string()
	.cn.send()

	return nil
}
ExecSimpleQuery executes a "simple query" (i.e. one with no bindable parameters) on the connection. The possible return values are: 1) "executed" is true; the query was executed to completion on the database server. If the query failed, err will be set to the error returned by the database, otherwise err will be nil. 2) If "executed" is false, the query could not be executed on the remote server. err will be non-nil. After a call to ExecSimpleQuery has returned an executed=false value, the connection has either been closed or will be closed shortly thereafter, and all subsequently executed queries will return an error.
func ( *ListenerConn) ( string) ( bool,  error) {
	if  = .acquireSenderLock();  != nil {
		return false, 
	}
	defer .releaseSenderLock()

	 = .sendSimpleQuery()
We can't know what state the protocol is in, so we need to abandon this connection.
Set the error pointer if it hasn't been set already; see listenerConnMain.
		if .err == nil {
			.err = 
		}
		.connectionLock.Unlock()
		.cn.c.Close()
		return false, 
	}
now we just wait for a reply..
	for {
		,  := <-.replyChan
We lost the connection to server, don't bother waiting for a a response. err should have been set already.
			.connectionLock.Lock()
			 := .err
			.connectionLock.Unlock()
			return false, 
		}
		switch .typ {
sanity check
			if .err != nil {
				panic("m.err != nil")
done; err might or might not be set
			return true, 

sanity check
			if .err == nil {
				panic("m.err == nil")
server responded with an error; ReadyForQuery to follow
			 = .err

		default:
			return false, fmt.Errorf("unknown response for simple query: %q", .typ)
		}
	}
}
Close closes the connection.
We can't send anything on the connection without holding senderLock. Simply close the net.Conn to wake up everyone operating on it.
	return .cn.c.Close()
}
Err returns the reason the connection was closed. It is not safe to call this function until l.Notify has been closed.
func ( *ListenerConn) () error {
	return .err
}

var (
	// errListenerClosed is returned from Listener methods once Close has
	// been called on the Listener.
	errListenerClosed = errors.New("pq: Listener has been closed")

	// ErrChannelAlreadyOpen is returned from Listen when a channel is already
	// open.
	ErrChannelAlreadyOpen = errors.New("pq: channel is already open")

	// ErrChannelNotOpen is returned from Unlisten when a channel is not open.
	ErrChannelNotOpen = errors.New("pq: channel is not open")
)
// ListenerEventType is an enumeration of listener event types.
type ListenerEventType int

const (
	// ListenerEventConnected is emitted only when the database connection
	// has been initially initialized. The err argument of the callback
	// will always be nil.
	ListenerEventConnected ListenerEventType = iota

	// ListenerEventDisconnected is emitted after a database connection has
	// been lost, either because of an error or because Close has been
	// called. The err argument will be set to the reason the database
	// connection was lost.
	ListenerEventDisconnected

	// ListenerEventReconnected is emitted after a database connection has
	// been re-established after connection loss. The err argument of the
	// callback will always be nil. After this event has been emitted, a
	// nil pq.Notification is sent on the Listener.Notify channel.
	ListenerEventReconnected

	// ListenerEventConnectionAttemptFailed is emitted after a connection
	// to the database was attempted, but failed. The err argument will be
	// set to an error describing why the connection attempt did not
	// succeed.
	ListenerEventConnectionAttemptFailed
)

// EventCallbackType is the event callback type. See also ListenerEventType
// constants' documentation.
type EventCallbackType func(event ListenerEventType, err error)
Listener provides an interface for listening to notifications from a PostgreSQL database. For general usage information, see section "Notifications". Listener can safely be used from concurrently running goroutines.
Channel for receiving notifications from the database. In some cases a nil value will be sent. See section "Notifications" above.
NewListener creates a new database connection dedicated to LISTEN / NOTIFY. name should be set to a connection string to be used to establish the database connection (see section "Connection String Parameters" above). minReconnectInterval controls the duration to wait before trying to re-establish the database connection after connection loss. After each consecutive failure this interval is doubled, until maxReconnectInterval is reached. Successfully completing the connection establishment procedure resets the interval back to minReconnectInterval. The last parameter eventCallback can be set to a function which will be called by the Listener when the state of the underlying database connection changes. This callback will be called by the goroutine which dispatches the notifications over the Notify channel, so you should try to avoid doing potentially time-consuming operations from the callback.
func ( string,
	 time.Duration,
	 time.Duration,
	 EventCallbackType) *Listener {
	return NewDialListener(defaultDialer{}, , , , )
}
NewDialListener is like NewListener but it takes a Dialer.
func ( Dialer,
	 string,
	 time.Duration,
	 time.Duration,
	 EventCallbackType) *Listener {

	 := &Listener{
		name:                 ,
		minReconnectInterval: ,
		maxReconnectInterval: ,
		dialer:               ,
		eventCallback:        ,

		channels: make(map[string]struct{}),

		Notify: make(chan *Notification, 32),
	}
	.reconnectCond = sync.NewCond(&.lock)

	go .listenerMain()

	return 
}
NotificationChannel returns the notification channel for this listener. This is the same channel as Notify, and will not be recreated during the life time of the Listener.
func ( *Listener) () <-chan *Notification {
	return .Notify
}
Listen starts listening for notifications on a channel. Calls to this function will block until an acknowledgement has been received from the server. Note that Listener automatically re-establishes the connection after connection loss, so this function may block indefinitely if the connection can not be re-established. Listen will only fail in three conditions: 1) The channel is already open. The returned error will be ErrChannelAlreadyOpen. 2) The query was executed on the remote server, but PostgreSQL returned an error message in response to the query. The returned error will be a pq.Error containing the information the server supplied. 3) Close is called on the Listener before the request could be completed. The channel name is case-sensitive.
func ( *Listener) ( string) error {
	.lock.Lock()
	defer .lock.Unlock()

	if .isClosed {
		return errListenerClosed
	}
The server allows you to issue a LISTEN on a channel which is already open, but it seems useful to be able to detect this case to spot for mistakes in application logic. If the application genuinely does't care, it can check the exported error and ignore it.
	,  := .channels[]
	if  {
		return ErrChannelAlreadyOpen
	}

If gotResponse is true but error is set, the query was executed on the remote server, but resulted in an error. This should be relatively rare, so it's fine if we just pass the error to our caller. However, if gotResponse is false, we could not complete the query on the remote server and our underlying connection is about to go away, so we only add relname to l.channels, and wait for resync() to take care of the rest.
		,  := .cn.Listen()
		if  &&  != nil {
			return 
		}
	}

	.channels[] = struct{}{}
	for .cn == nil {
we let go of the mutex for a while
		if .isClosed {
			return errListenerClosed
		}
	}

	return nil
}
Unlisten removes a channel from the Listener's channel list. Returns ErrChannelNotOpen if the Listener is not listening on the specified channel. Returns immediately with no error if there is no connection. Note that you might still get notifications for this channel even after Unlisten has returned. The channel name is case-sensitive.
func ( *Listener) ( string) error {
	.lock.Lock()
	defer .lock.Unlock()

	if .isClosed {
		return errListenerClosed
	}
Similarly to LISTEN, this is not an error in Postgres, but it seems useful to distinguish from the normal conditions.
	,  := .channels[]
	if ! {
		return ErrChannelNotOpen
	}

Similarly to Listen (see comment in that function), the caller should only be bothered with an error if it came from the backend as a response to our query.
		,  := .cn.Unlisten()
		if  &&  != nil {
			return 
		}
	}
Don't bother waiting for resync if there's no connection.
	delete(.channels, )
	return nil
}
UnlistenAll removes all channels from the Listener's channel list. Returns immediately with no error if there is no connection. Note that you might still get notifications for any of the deleted channels even after UnlistenAll has returned.
func ( *Listener) () error {
	.lock.Lock()
	defer .lock.Unlock()

	if .isClosed {
		return errListenerClosed
	}

Similarly to Listen (see comment in that function), the caller should only be bothered with an error if it came from the backend as a response to our query.
		,  := .cn.UnlistenAll()
		if  &&  != nil {
			return 
		}
	}
Don't bother waiting for resync if there's no connection.
	.channels = make(map[string]struct{})
	return nil
}
Ping the remote server to make sure it's alive. Non-nil return value means that there is no active connection.
func ( *Listener) () error {
	.lock.Lock()
	defer .lock.Unlock()

	if .isClosed {
		return errListenerClosed
	}
	if .cn == nil {
		return errors.New("no connection")
	}

	return .cn.Ping()
}
Clean up after losing the server connection. Returns l.cn.Err(), which should have the reason the connection was lost.
func ( *Listener) () error {
	.lock.Lock()
	defer .lock.Unlock()
sanity check; can't look at Err() until the channel has been closed
	select {
	case ,  := <-.connNotificationChan:
		if  {
			panic("connNotificationChan not closed")
		}
	default:
		panic("connNotificationChan not closed")
	}

	 := .cn.Err()
	.cn.Close()
	.cn = nil
	return 
}
Synchronize the list of channels we want to be listening on with the server after the connection has been established.
func ( *Listener) ( *ListenerConn,  <-chan *Notification) error {
	 := make(chan error)
	go func( <-chan *Notification) {
If we got a response, return that error to our caller as it's going to be more descriptive than cn.Err().
			,  := .Listen()
			if  &&  != nil {
				 <- 
				return
			}
If we couldn't reach the server, wait for notificationChan to close and then return the error message from the connection, as per ListenerConn's interface.
			if  != nil {
				for range  {
				}
				 <- .Err()
				return
			}
		}
		 <- nil
	}()
Ignore notifications while synchronization is going on to avoid deadlocks. We have to send a nil notification over Notify anyway as we can't possibly know which notifications (if any) were lost while the connection was down, so there's no reason to try and process these messages at all.
	for {
		select {
		case ,  := <-:
			if ! {
				 = nil
			}

		case  := <-:
			return 
		}
	}
}
caller should NOT be holding l.lock
func ( *Listener) () bool {
	.lock.Lock()
	defer .lock.Unlock()

	return .isClosed
}

func ( *Listener) () error {
	 := make(chan *Notification, 32)
	,  := newDialListenerConn(.dialer, .name, )
	if  != nil {
		return 
	}

	.lock.Lock()
	defer .lock.Unlock()

	 = .resync(, )
	if  != nil {
		.Close()
		return 
	}

	.cn = 
	.connNotificationChan = 
	.reconnectCond.Broadcast()

	return nil
}
Close disconnects the Listener from the database and shuts it down. Subsequent calls to its methods will return an error. Close returns an error if the connection has already been closed.
func ( *Listener) () error {
	.lock.Lock()
	defer .lock.Unlock()

	if .isClosed {
		return errListenerClosed
	}

	if .cn != nil {
		.cn.Close()
	}
	.isClosed = true
Unblock calls to Listen()
	.reconnectCond.Broadcast()

	return nil
}

func ( *Listener) ( ListenerEventType,  error) {
	if .eventCallback != nil {
		.eventCallback(, )
	}
}
Main logic here: maintain a connection to the server when possible, wait for notifications and emit events.
func ( *Listener) () {
	var  time.Time

	 := .minReconnectInterval
	for {
		for {
			 := .connect()
			if  == nil {
				break
			}

			if .closed() {
				return
			}
			.emitEvent(ListenerEventConnectionAttemptFailed, )

			time.Sleep()
			 *= 2
			if  > .maxReconnectInterval {
				 = .maxReconnectInterval
			}
		}

		if .IsZero() {
			.emitEvent(ListenerEventConnected, nil)
		} else {
			.emitEvent(ListenerEventReconnected, nil)
			.Notify <- nil
		}

		 = .minReconnectInterval
		 = time.Now().Add()

		for {
			,  := <-.connNotificationChan
lost connection, loop again
				break
			}
			.Notify <- 
		}

		 := .disconnectCleanup()
		if .closed() {
			return
		}
		.emitEvent(ListenerEventDisconnected, )

		time.Sleep(time.Until())
	}
}

func ( *Listener) () {
	.listenerConnLoop()
	close(.Notify)