/*
 *
 * Copyright 2018 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
// Package channelz defines APIs for enabling the channelz service, entry
// registration/deletion, and accessing channelz data. It also defines channelz
// metric struct formats.
//
// All APIs in this package are experimental.
package channelz

import (
	
	
	
	
	

	
)

// defaultMaxTraceEntry is the package's built-in trace-entry limit (30).
const defaultMaxTraceEntry int32 = 30

var (
	db    dbWrapper
EntryPerPage defines the number of channelz entries to be shown on a web page.
TurnOn turns on channelz data collection.
IsOn returns whether channelz data collection is on.
// Reports whether curState currently holds 1.
//
// CompareAndSwapInt32(&curState, 1, 1) succeeds only when the stored value is
// already 1 and then writes that same value back, so the call works as an
// atomic "is it 1?" check that leaves the state unchanged.
func () bool {
	return atomic.CompareAndSwapInt32(&curState, 1, 1)
}
// SetMaxTraceEntry sets the maximum number of trace entries per entity (i.e. channel/subchannel).
// Setting it to 0 will disable channel tracing.
// ResetMaxTraceEntryToDefault resets the maximum number of trace entries per entity to the default.
// dbWrapper wraps around a reference to the internal channelz data storage, and
// provides synchronized functionality to set and get the reference.
type dbWrapper struct {
	// mu guards DB: the methods on *dbWrapper take the write lock to store
	// into DB and the read lock to load it.
	mu sync.RWMutex
	// DB points at the current channelMap; it is nil until one is stored.
	DB *channelMap
}

// Stores the supplied *channelMap in the DB field while holding the write
// lock on mu.
func ( *dbWrapper) ( *channelMap) {
	.mu.Lock()
	.DB = 
	.mu.Unlock()
}

// Returns the current DB pointer, read under the read lock on mu. The result
// is nil if nothing has been stored yet.
func ( *dbWrapper) () *channelMap {
	.mu.RLock()
	// The deferred RUnlock runs after the return value has been read.
	defer .mu.RUnlock()
	return .DB
}
// NewChannelzStorage initializes channelz data storage and id generator.
//
// This function returns a cleanup function to wait for all channelz state to be
// reset by the grpc goroutines when those entities get closed. By using this
// cleanup function, we make sure tests don't interfere with each other, e.g. a
// lingering goroutine from a previous test doing entity removal happens to remove
// an entity just registered by the new test, since the id space is the same.
//
// Note: This function is exported for testing purposes only. Users should not
// call it in most cases.
// Installs a brand-new channelMap (all six maps empty) through db.set, resets
// idGen, and returns a cleanup closure.
//
// The closure fetches the map once with db.get and then polls it up to 1000
// times, sleeping 10ms after each unsuccessful poll. It returns nil as soon as
// every map is empty (or immediately if db.get returned nil); after the last
// poll it returns an error that reports how many entries remain in each map.
func () ( func() error) {
	db.set(&channelMap{
		topLevelChannels: make(map[int64]struct{}),
		channels:         make(map[int64]*channel),
		listenSockets:    make(map[int64]*listenSocket),
		normalSockets:    make(map[int64]*normalSocket),
		servers:          make(map[int64]*server),
		subChannels:      make(map[int64]*subChannel),
	})
	idGen.reset()
	return func() error {
		var  error
		 := db.get()
		// Nothing to wait for when no storage is installed.
		if  == nil {
			return nil
		}
		// 1000 polls with a 10ms sleep each is the "10s" quoted in the error below.
		for  := 0;  < 1000; ++ {
			.mu.Lock()
			if len(.topLevelChannels) == 0 && len(.servers) == 0 && len(.channels) == 0 && len(.subChannels) == 0 && len(.listenSockets) == 0 && len(.normalSockets) == 0 {
				// NOTE(review): no Unlock is visible on this path before the
				// return, unlike every other path in this closure. Confirm the
				// lock is released here; otherwise it stays held after success.
all things stored in the channelz map have been cleared.
				return nil
			}
			.mu.Unlock()
			time.Sleep(10 * time.Millisecond)
		}

		// Take the lock once more so the counts in the message are read consistently.
		.mu.Lock()
		 = fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", len(.topLevelChannels), len(.servers), len(.channels), len(.subChannels), len(.listenSockets), len(.normalSockets))
		.mu.Unlock()
		return 
	}
}
// GetTopChannels returns a slice of top channels' ChannelMetric, along with a
// boolean indicating whether there are more top channels to be queried for.
//
// The arg id specifies that only top channels with an id at or above it will be
// included in the result. The returned slice has a length of up to the arg
// maxResults, or EntryPerPage if maxResults is zero, and is sorted in ascending
// id order.
// Thin wrapper: forwards both int64 arguments to the GetTopChannels method of
// the channelMap returned by db.get() and returns its results unchanged.
func ( int64,  int64) ([]*ChannelMetric, bool) {
	return db.get().GetTopChannels(, )
}
// GetServers returns a slice of servers' ServerMetric, along with a boolean
// indicating whether there are more servers to be queried for.
//
// The arg id specifies that only servers with an id at or above it will be
// included in the result. The returned slice has a length of up to the arg
// maxResults, or EntryPerPage if maxResults is zero, and is sorted in ascending
// id order.
// Thin wrapper: forwards both int64 arguments to the GetServers method of the
// channelMap returned by db.get() and returns its results unchanged.
func ( int64,  int64) ([]*ServerMetric, bool) {
	return db.get().GetServers(, )
}
// GetServerSockets returns a slice of the normal sockets' SocketMetric for the
// server identified by id, along with a boolean indicating whether there are
// more sockets to be queried for.
//
// The arg startID specifies that only sockets with an id at or above it will be
// included in the result. The returned slice has a length of up to the arg
// maxResults, or EntryPerPage if maxResults is zero, and is sorted in ascending
// id order.
// Thin wrapper: forwards its three int64 arguments to the GetServerSockets
// method of the channelMap returned by db.get() and returns its results
// unchanged.
func ( int64,  int64,  int64) ([]*SocketMetric, bool) {
	return db.get().GetServerSockets(, , )
}
GetChannel returns the ChannelMetric for the channel (identified by id).
// Thin wrapper: forwards the int64 argument to the GetChannel method of the
// channelMap returned by db.get() and returns its result unchanged.
func ( int64) *ChannelMetric {
	return db.get().GetChannel()
}
GetSubChannel returns the SubChannelMetric for the subchannel (identified by id).
// GetSocket returns the SocketMetric for the socket (identified by id).
// Thin wrapper: forwards the int64 argument to the GetSocket method of the
// channelMap returned by db.get() and returns its result unchanged.
func ( int64) *SocketMetric {
	return db.get().GetSocket()
}
GetServer returns the ServerMetric for the server (identified by id).
// Thin wrapper: forwards the int64 argument to the GetServer method of the
// channelMap returned by db.get() and returns its result unchanged.
func ( int64) *ServerMetric {
	return db.get().GetServer()
}
// RegisterChannel registers the given channel c in the channelz database with ref
// as its reference name, and adds it to the child list of its parent (identified
// by pid). pid = 0 means no parent. It returns the unique channelz tracking id
// assigned to this channel.
// Draws a fresh id from idGen, builds a *channel record, and stores it in the
// current channelMap via addChannel. Returns an int64 (presumably the
// generated id; the returned identifier is not visible here, so confirm).
func ( Channel,  int64,  string) int64 {
	 := idGen.genID()
	 := &channel{
		refName:     ,
		c:           ,
		subChans:    make(map[int64]string),
		nestedChans: make(map[int64]string),
		id:          ,
		pid:         ,
		// The trace starts empty, with capacity equal to the current max trace entry setting.
		trace:       &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
	}
	// The two calls differ only in the boolean: true when the compared value is
	// 0, false otherwise. addChannel uses that flag to choose between the
	// top-level set and attaching the record to another entry as a child.
	if  == 0 {
		db.get().addChannel(, , true, , )
	} else {
		db.get().addChannel(, , false, , )
	}
	return 
}
// RegisterSubChannel registers the given subchannel c in the channelz database
// with ref as its reference name, and adds it to the child list of its parent
// (identified by pid). It returns the unique channelz tracking id assigned to
// this subchannel.
// Draws a fresh id from idGen, builds a *subChannel record, and stores it in
// the current channelMap via addSubChannel. When the checked value is 0 it
// logs an error and returns 0 without registering anything.
func ( Channel,  int64,  string) int64 {
	// Per the log message, the value rejected here is the parent id.
	if  == 0 {
		logger.Error("a SubChannel's parent id cannot be 0")
		return 0
	}
	 := idGen.genID()
	 := &subChannel{
		refName: ,
		c:       ,
		sockets: make(map[int64]string),
		id:      ,
		pid:     ,
		// The trace starts empty, with capacity equal to the current max trace entry setting.
		trace:   &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
	}
	db.get().addSubChannel(, , , )
	return 
}
RegisterServer registers the given server s in channelz database. It returns the unique channelz tracking id assigned to this server.
// Draws a fresh id from idGen, builds a *server record with empty sockets and
// listenSockets maps, and stores it in the current channelMap via addServer.
// Unlike the other Register functions there is no zero-value check here.
func ( Server,  string) int64 {
	 := idGen.genID()
	 := &server{
		refName:       ,
		s:             ,
		sockets:       make(map[int64]string),
		listenSockets: make(map[int64]string),
		id:            ,
	}
	db.get().addServer(, )
	return 
}
// RegisterListenSocket registers the given listen socket s in the channelz
// database with ref as its reference name, and adds it to the child list of its
// parent (identified by pid). It returns the unique channelz tracking id
// assigned to this listen socket.
// Draws a fresh id from idGen, builds a *listenSocket record, and stores it in
// the current channelMap via addListenSocket. When the checked value is 0 it
// logs an error and returns 0 without registering anything.
func ( Socket,  int64,  string) int64 {
	// Per the log message, the value rejected here is the parent id.
	if  == 0 {
		logger.Error("a ListenSocket's parent id cannot be 0")
		return 0
	}
	 := idGen.genID()
	 := &listenSocket{refName: , s: , id: , pid: }
	db.get().addListenSocket(, , , )
	return 
}
// RegisterNormalSocket registers the given normal socket s in the channelz
// database with ref as its reference name, and adds it to the child list of its
// parent (identified by pid). It returns the unique channelz tracking id
// assigned to this normal socket.
// Draws a fresh id from idGen, builds a *normalSocket record, and stores it in
// the current channelMap via addNormalSocket. When the checked value is 0 it
// logs an error and returns 0 without registering anything.
func ( Socket,  int64,  string) int64 {
	// Per the log message, the value rejected here is the parent id.
	if  == 0 {
		logger.Error("a NormalSocket's parent id cannot be 0")
		return 0
	}
	 := idGen.genID()
	 := &normalSocket{refName: , s: , id: , pid: }
	db.get().addNormalSocket(, , , )
	return 
}
// RemoveEntry removes the entry with the unique channelz tracking id id from the
// channelz database.
// Thin wrapper: forwards the int64 argument to the removeEntry method of the
// channelMap returned by db.get().
func ( int64) {
	db.get().removeEntry()
}
// TraceEventDesc is what the caller of AddTraceEvent should provide to describe
// the event to be added to the channel trace.
//
// The Parent field is optional. It is used for events that will also be recorded
// in the entity's parent trace.
AddTraceEvent adds trace related to the entity with specified id, using the provided TraceEventDesc.
// Logs a trace event description (and every description reachable through its
// Parent links) with the supplied DepthLoggerV2, then records the event in the
// channelz database unless tracing is disabled.
func ( grpclog.DepthLoggerV2,  int64,  int,  *TraceEventDesc) {
	// Follow the Parent chain until it ends, logging each description at the
	// level matching its Severity. CtUNKNOWN is logged the same way as CtINFO;
	// a Severity outside these four cases is not logged at all.
	for  := ;  != nil;  = .Parent {
		switch .Severity {
		case CtUNKNOWN:
			.InfoDepth(+1, .Desc)
		case CtINFO:
			.InfoDepth(+1, .Desc)
		case CtWarning:
			.WarningDepth(+1, .Desc)
		case CtError:
			.ErrorDepth(+1, .Desc)
		}
	}
	// A max trace entry setting of 0 means nothing is stored; the logging
	// above has already happened by this point.
	if getMaxTraceEntry() == 0 {
		return
	}
	db.get().traceEvent(, )
}
// channelMap is the storage data structure for channelz.
// Methods of channelMap can be divided into two categories with respect to locking:
// 1. Methods that acquire the global lock.
// 2. Methods that can only be called when the global lock is held.
// A method of the second type must always be called inside a method of the first type.
type channelMap struct {
	// mu is taken by the methods in this file around reads and writes of the maps below.
	mu               sync.RWMutex
	// topLevelChannels is a set of int64 keys (empty-struct values); keys are
	// added alongside a channels entry when a channel is registered as top-level.
	topLevelChannels map[int64]struct{}
	// The remaining maps hold one record type each, keyed by int64.
	servers          map[int64]*server
	channels         map[int64]*channel
	subChannels      map[int64]*subChannel
	listenSockets    map[int64]*listenSocket
	normalSockets    map[int64]*normalSocket
}

// Under the write lock: assigns the record's cm field and stores the *server
// in the servers map under the given int64 key.
func ( *channelMap) ( int64,  *server) {
	.mu.Lock()
	.cm = 
	.servers[] = 
	.mu.Unlock()
}

// Under the write lock: assigns the cm fields of the record and of its trace,
// stores the *channel in the channels map, and then either marks it top-level
// or attaches it as a child of another entry, depending on the bool argument.
func ( *channelMap) ( int64,  *channel,  bool,  int64,  string) {
	.mu.Lock()
	.cm = 
	.trace.cm = 
	.channels[] = 
	if  {
		.topLevelChannels[] = struct{}{}
	} else {
		// findEntry never returns nil (it falls back to a dummy entry), so the
		// addChild call is safe even when the lookup misses.
		.findEntry().addChild(, )
	}
	.mu.Unlock()
}

// Under the write lock: assigns the cm fields of the record and of its trace,
// stores the *subChannel in the subChannels map, and attaches it as a child of
// the entry returned by findEntry.
func ( *channelMap) ( int64,  *subChannel,  int64,  string) {
	.mu.Lock()
	.cm = 
	.trace.cm = 
	.subChannels[] = 
	.findEntry().addChild(, )
	.mu.Unlock()
}

// Under the write lock: assigns the record's cm field, stores the
// *listenSocket in the listenSockets map, and attaches it as a child of the
// entry returned by findEntry.
func ( *channelMap) ( int64,  *listenSocket,  int64,  string) {
	.mu.Lock()
	.cm = 
	.listenSockets[] = 
	.findEntry().addChild(, )
	.mu.Unlock()
}

// Under the write lock: assigns the record's cm field, stores the
// *normalSocket in the normalSockets map, and attaches it as a child of the
// entry returned by findEntry.
func ( *channelMap) ( int64,  *normalSocket,  int64,  string) {
	.mu.Lock()
	.cm = 
	.normalSockets[] = 
	.findEntry().addChild(, )
	.mu.Unlock()
}
// removeEntry triggers the removal of an entry, which may not actually delete the
// entry if it has to wait on the deletion of its children and until no other
// entity's channel trace references it. It may lead to a chain of entry
// deletions. For example, deleting the last socket of a gracefully shutting down
// server will lead to the server also being deleted.
// Under the write lock: looks the entry up with findEntry and calls its
// triggerDelete method. A missing id resolves to the dummy entry that
// findEntry returns, so the call is always made on a non-nil value.
func ( *channelMap) ( int64) {
	.mu.Lock()
	.findEntry().triggerDelete()
	.mu.Unlock()
}
c.mu must be held by the caller
// Looks the entry up with findEntry and, only if it implements tracedChannel,
// decrements its trace reference count and then calls deleteSelfIfReady.
// This method takes no lock itself.
func ( *channelMap) ( int64) {
	 := .findEntry()
	// Entries that are not tracedChannel values are ignored.
	if ,  := .(tracedChannel);  {
		.decrTraceRefCount()
		.deleteSelfIfReady()
	}
}
c.mu must be held by the caller.
// Resolves an int64 key to an entry by probing the maps in a fixed order:
// channels, subChannels, servers, listenSockets, normalSockets. The first hit
// is returned. On a miss it returns a *dummyEntry carrying the key in its
// idNotFound field, so the result is never nil. This method takes no lock itself.
func ( *channelMap) ( int64) entry {
	var  entry
	var  bool
	if ,  = .channels[];  {
		return 
	}
	if ,  = .subChannels[];  {
		return 
	}
	if ,  = .servers[];  {
		return 
	}
	if ,  = .listenSockets[];  {
		return 
	}
	if ,  = .normalSockets[];  {
		return 
	}
	return &dummyEntry{idNotFound: }
}
// c.mu must be held by the caller.
//
// deleteEntry simply deletes an entry from the channelMap. Before calling this
// method, the caller must check that this entry is ready to be deleted, i.e.
// removeEntry() has been called on it, and no children still exist.
// Conditionals are ordered by the expected frequency of deletion of each entity
// type, in order to optimize performance.
// Deletes the key from the first map that contains it, probing in the order
// normalSockets, subChannels, channels, listenSockets, servers. A key found in
// channels is also removed from topLevelChannels. If no map holds the key the
// method does nothing. This method takes no lock itself.
func ( *channelMap) ( int64) {
	var  bool
	if _,  = .normalSockets[];  {
		delete(.normalSockets, )
		return
	}
	if _,  = .subChannels[];  {
		delete(.subChannels, )
		return
	}
	if _,  = .channels[];  {
		delete(.channels, )
		// delete on a missing key is a no-op, so this is safe for channels
		// that were never top-level.
		delete(.topLevelChannels, )
		return
	}
	if _,  = .listenSockets[];  {
		delete(.listenSockets, )
		return
	}
	if _,  = .servers[];  {
		delete(.servers, )
		return
	}
}

// Under the write lock: appends a TraceEvent built from the descriptor to the
// trace of the entry found for the given key and, when the descriptor has a
// Parent, appends a second event to the trace of the entry found via
// getParentID. Entries that do not implement tracedChannel are skipped.
func ( *channelMap) ( int64,  *TraceEventDesc) {
	.mu.Lock()
	 := .findEntry()
	,  := .(tracedChannel)
	// Not a traced entity (this includes the dummy entry for unknown ids):
	// release the lock and record nothing.
	if ! {
		.mu.Unlock()
		return
	}
	.getChannelTrace().append(&TraceEvent{Desc: .Desc, Severity: .Severity, Timestamp: time.Now()})
	if .Parent != nil {
		 := .findEntry(.getParentID())
		// The reference type stays at its zero value unless the switched
		// value is a *channel or a *subChannel.
		var  RefChannelType
		switch .(type) {
		case *channel:
			 = RefChannel
		case *subChannel:
			 = RefSubChannel
		}
		if ,  := .(tracedChannel);  {
			// This second event carries the Parent's Desc/Severity plus a
			// RefID/RefName/RefType reference to another entity.
			.getChannelTrace().append(&TraceEvent{
				Desc:      .Parent.Desc,
				Severity:  .Parent.Severity,
				Timestamp: time.Now(),
				RefID:     ,
				RefName:   .getRefName(),
				RefType:   ,
			})
			// NOTE(review): presumably this bumps the count on the entity the
			// new event refers to, so it is kept while the trace references it
			// — the receiver is not visible here; confirm.
			.incrTraceRefCount()
		}
	}
	.mu.Unlock()
}

// int64Slice is a []int64 with the three methods sort.Sort requires; the
// paging methods in this file pass it to sort.Sort to order ids.
type int64Slice []int64

// Length, element swap, and "<" comparison (ascending order), in that order.
func ( int64Slice) () int           { return len() }
func ( int64Slice) (,  int)      { [], [] = [], [] }
func ( int64Slice) (,  int) bool { return [] < [] }

// Returns a newly allocated map[int64]string holding the key/value pairs
// produced by the range loop. The result is always non-nil, even when nothing
// is copied.
func ( map[int64]string) map[int64]string {
	 := make(map[int64]string)
	for ,  := range  {
		[] = 
	}
	return 
}

// Two-argument int64 helper that returns one of its arguments based on a "<"
// comparison. NOTE(review): the callers in this file use it as min(...) to cap
// a slice capacity, so it presumably returns the smaller value — the operand
// names are not visible here; confirm.
func (,  int64) int64 {
	if  <  {
		return 
	}
	return 
}

// Builds one page of ChannelMetric values for top-level channels, plus a bool
// end-of-data flag.
//
// Under the read lock it sorts the keys of topLevelChannels in ascending
// order, binary-searches for the first key satisfying the ">=" predicate, and
// walks forward from there collecting channels until a count limit is reached
// or the last key has been visited. The remaining metric fields are filled in
// after the lock is released.
func ( *channelMap) ( int64,  int64) ([]*ChannelMetric, bool) {
	// A non-positive value is replaced by EntryPerPage.
	if  <= 0 {
		 = EntryPerPage
	}
	.mu.RLock()
	 := int64(len(.topLevelChannels))
	 := make([]int64, 0, )
	 := make([]*channel, 0, min(, ))

	for  := range .topLevelChannels {
		 = append(, )
	}
	// Map iteration order is random, so sort before searching.
	sort.Sort(int64Slice())
	 := sort.Search(len(), func( int) bool { return [] >=  })
	 := int64(0)
	var  bool
	var  []*ChannelMetric
	for ,  := range [:] {
		if  ==  {
			break
		}
		// Keys with no matching channels entry are skipped without counting.
		if ,  := .channels[];  {
			 = append(, )
			// The child maps are copied while the lock is still held.
			 = append(, &ChannelMetric{
				NestedChans: copyMap(.nestedChans),
				SubChans:    copyMap(.subChans),
			})
			++
		}
		// Reaching the last candidate key sets the bool flag.
		if  == len([:])-1 {
			 = true
			break
		}
	}
	.mu.RUnlock()
	// The flag is also set when the checked counter is 0.
	if  == 0 {
		 = true
	}

	// ChannelzMetric() and dumpData() are called only after the read lock
	// has been released.
	for ,  := range  {
		[].ChannelData = .c.ChannelzMetric()
		[].ID = .id
		[].RefName = .refName
		[].Trace = .trace.dumpData()
	}
	return , 
}

// Builds one page of ServerMetric values, plus a bool end-of-data flag.
//
// Under the read lock it sorts the keys of the servers map in ascending order,
// binary-searches for the first key satisfying the ">=" predicate, and walks
// forward from there collecting servers until a count limit is reached or the
// last key has been visited. The remaining metric fields are filled in after
// the lock is released.
func ( *channelMap) (,  int64) ([]*ServerMetric, bool) {
	// A non-positive value is replaced by EntryPerPage.
	if  <= 0 {
		 = EntryPerPage
	}
	.mu.RLock()
	 := int64(len(.servers))
	 := make([]int64, 0, )
	 := make([]*server, 0, min(, ))
	for  := range .servers {
		 = append(, )
	}
	// Map iteration order is random, so sort before searching.
	sort.Sort(int64Slice())
	 := sort.Search(len(), func( int) bool { return [] >=  })
	 := int64(0)
	var  bool
	var  []*ServerMetric
	for ,  := range [:] {
		if  ==  {
			break
		}
		if ,  := .servers[];  {
			 = append(, )
			// The listen-socket map is copied while the lock is still held.
			 = append(, &ServerMetric{
				ListenSockets: copyMap(.listenSockets),
			})
			++
		}
		// Reaching the last candidate key sets the bool flag.
		if  == len([:])-1 {
			 = true
			break
		}
	}
	.mu.RUnlock()
	// The flag is also set when the checked counter is 0.
	if  == 0 {
		 = true
	}

	// ChannelzMetric() is called only after the read lock has been released.
	for ,  := range  {
		[].ServerData = .s.ChannelzMetric()
		[].ID = .id
		[].RefName = .refName
	}
	return , 
}

// Builds one page of SocketMetric values from a server's sockets map, plus a
// bool end-of-data flag.
//
// Under the read lock it sorts the keys of that sockets map in ascending
// order, binary-searches for the first key satisfying the ">=" predicate, and
// collects the matching normalSockets entries until a count limit is reached
// or the last key has been visited. The metrics themselves are built after the
// lock is released.
func ( *channelMap) ( int64,  int64,  int64) ([]*SocketMetric, bool) {
	// A non-positive value is replaced by EntryPerPage.
	if  <= 0 {
		 = EntryPerPage
	}
	var  *server
	var  bool
	.mu.RLock()
	// NOTE(review): the conditional that opens the branch below is not present
	// in this text (the closing brace has no visible opener). It presumably
	// looks the server up in the servers map and takes this branch on a miss —
	// restore it from the upstream file.
server with id doesn't exist.
		.mu.RUnlock()
		return nil, true
	}
	 := .sockets
	 := int64(len())
	 := make([]int64, 0, )
	 := make([]*normalSocket, 0, min(, ))
	for  := range  {
		 = append(, )
	}
	// Map iteration order is random, so sort before searching.
	sort.Sort(int64Slice())
	 := sort.Search(len(), func( int) bool { return [] >=  })
	 := int64(0)
	var  bool
	for ,  := range [:] {
		if  ==  {
			break
		}
		// Keys with no matching normalSockets entry are skipped without counting.
		if ,  := .normalSockets[];  {
			 = append(, )
			++
		}
		// Reaching the last candidate key sets the bool flag.
		if  == len([:])-1 {
			 = true
			break
		}
	}
	.mu.RUnlock()
	// The flag is also set when the checked counter is 0.
	if  == 0 {
		 = true
	}
	// ChannelzMetric() is called only after the read lock has been released.
	var  []*SocketMetric
	for ,  := range  {
		 := &SocketMetric{}
		.SocketData = .s.ChannelzMetric()
		.ID = .id
		.RefName = .refName
		 = append(, )
	}
	return , 
}

// Builds a ChannelMetric for a single channel. The nested-channel map and the
// channel's c field are copied under the read lock; ChannelzMetric() and
// dumpData() are called after the lock is released.
func ( *channelMap) ( int64) *ChannelMetric {
	 := &ChannelMetric{}
	var  *channel
	var  bool
	.mu.RLock()
	// NOTE(review): the conditional that opens the branch below is not present
	// in this text (the closing brace has no visible opener). It presumably
	// looks the channel up in the channels map and returns nil on a miss —
	// restore it from the upstream file.
channel with id doesn't exist.
		.mu.RUnlock()
		return nil
	}
	.NestedChans = copyMap(.nestedChans)
	// NOTE(review): only NestedChans is filled here, whereas the paged listing
	// method also fills SubChans — check whether a SubChans copy is missing.
cn.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of cn.c when holding the lock to prevent potential data race.
	 := .c
	.mu.RUnlock()
	.ChannelData = .ChannelzMetric()
	.ID = .id
	.RefName = .refName
	.Trace = .trace.dumpData()
	return 
}

// Builds a SubChannelMetric for a single subchannel. The subchannel's c field
// is copied under the read lock; ChannelzMetric() and dumpData() are called
// after the lock is released.
func ( *channelMap) ( int64) *SubChannelMetric {
	 := &SubChannelMetric{}
	var  *subChannel
	var  bool
	.mu.RLock()
	// NOTE(review): the conditional that opens the branch below is not present
	// in this text (the closing brace has no visible opener). It presumably
	// looks the subchannel up in the subChannels map and returns nil on a
	// miss — restore it from the upstream file.
subchannel with id doesn't exist.
		.mu.RUnlock()
		return nil
	}
	// NOTE(review): no socket map is copied into the metric here, although the
	// subChannel record is created with a sockets map — check whether a copy
	// is missing.
sc.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of sc.c when holding the lock to prevent potential data race.
	 := .c
	.mu.RUnlock()
	.ChannelData = .ChannelzMetric()
	.ID = .id
	.RefName = .refName
	.Trace = .trace.dumpData()
	return 
}

// Builds a SocketMetric for a single socket, checking listenSockets first and
// normalSockets second. Returns nil when neither map contains the key.
// In every path the read lock is released before ChannelzMetric() is called.
func ( *channelMap) ( int64) *SocketMetric {
	 := &SocketMetric{}
	.mu.RLock()
	if ,  := .listenSockets[];  {
		.mu.RUnlock()
		.SocketData = .s.ChannelzMetric()
		.ID = .id
		.RefName = .refName
		return 
	}
	if ,  := .normalSockets[];  {
		.mu.RUnlock()
		.SocketData = .s.ChannelzMetric()
		.ID = .id
		.RefName = .refName
		return 
	}
	.mu.RUnlock()
	return nil
}

// Builds a ServerMetric for a single server. Returns nil when the servers map
// does not contain the key. The listen-socket map is copied under the read
// lock; ChannelzMetric() is called after the lock is released.
func ( *channelMap) ( int64) *ServerMetric {
	 := &ServerMetric{}
	var  *server
	var  bool
	.mu.RLock()
	if ,  = .servers[]; ! {
		.mu.RUnlock()
		return nil
	}
	.ListenSockets = copyMap(.listenSockets)
	.mu.RUnlock()
	.ID = .id
	.RefName = .refName
	.ServerData = .s.ChannelzMetric()
	return 
}

// idGenerator holds the int64 counter from which ids are drawn.
type idGenerator struct {
	// id is read and written only through sync/atomic functions in this file.
	id int64
}

// Atomically sets the counter back to 0.
func ( *idGenerator) () {
	atomic.StoreInt64(&.id, 0)
}

func ( *idGenerator) () int64 {
	return atomic.AddInt64(&.id, 1)