Source File
funcs.go
Belonging Package
google.golang.org/grpc/internal/channelz
package channelz
import (
	"fmt"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"google.golang.org/grpc/grpclog"
)
const (
// defaultMaxTraceEntry is the trace-entry limit that is stored back into
// maxTraceEntry when the limit is reset to its default.
defaultMaxTraceEntry int32 = 30
)
var (
db dbWrapper
func () {
if !IsOn() {
NewChannelzStorage()
atomic.StoreInt32(&curState, 1)
}
}
func () bool {
return atomic.CompareAndSwapInt32(&curState, 1, 1)
}
func ( int32) {
atomic.StoreInt32(&maxTraceEntry, )
}
func () {
atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry)
}
func () int {
:= atomic.LoadInt32(&maxTraceEntry)
return int()
}
func () ( func() error) {
db.set(&channelMap{
topLevelChannels: make(map[int64]struct{}),
channels: make(map[int64]*channel),
listenSockets: make(map[int64]*listenSocket),
normalSockets: make(map[int64]*normalSocket),
servers: make(map[int64]*server),
subChannels: make(map[int64]*subChannel),
})
idGen.reset()
return func() error {
var error
:= db.get()
if == nil {
return nil
}
for := 0; < 1000; ++ {
.mu.Lock()
if len(.topLevelChannels) == 0 && len(.servers) == 0 && len(.channels) == 0 && len(.subChannels) == 0 && len(.listenSockets) == 0 && len(.normalSockets) == 0 {
return nil
}
.mu.Unlock()
time.Sleep(10 * time.Millisecond)
}
.mu.Lock()
= fmt.Errorf("after 10s the channelz map has not been cleaned up yet, topchannels: %d, servers: %d, channels: %d, subchannels: %d, listen sockets: %d, normal sockets: %d", len(.topLevelChannels), len(.servers), len(.channels), len(.subChannels), len(.listenSockets), len(.normalSockets))
.mu.Unlock()
return
}
}
func ( int64, int64) ([]*ChannelMetric, bool) {
return db.get().GetTopChannels(, )
}
func ( int64, int64) ([]*ServerMetric, bool) {
return db.get().GetServers(, )
}
func ( int64, int64, int64) ([]*SocketMetric, bool) {
return db.get().GetServerSockets(, , )
}
func ( int64) *ChannelMetric {
return db.get().GetChannel()
}
func ( int64) *SubChannelMetric {
return db.get().GetSubChannel()
}
func ( int64) *SocketMetric {
return db.get().GetSocket()
}
func ( int64) *ServerMetric {
return db.get().GetServer()
}
func ( Channel, int64, string) int64 {
:= idGen.genID()
:= &channel{
refName: ,
c: ,
subChans: make(map[int64]string),
nestedChans: make(map[int64]string),
id: ,
pid: ,
trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
}
if == 0 {
db.get().addChannel(, , true, , )
} else {
db.get().addChannel(, , false, , )
}
return
}
func ( Channel, int64, string) int64 {
if == 0 {
logger.Error("a SubChannel's parent id cannot be 0")
return 0
}
:= idGen.genID()
:= &subChannel{
refName: ,
c: ,
sockets: make(map[int64]string),
id: ,
pid: ,
trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
}
db.get().addSubChannel(, , , )
return
}
func ( int64) {
db.get().removeEntry()
}
// TraceEventDesc describes a trace event to be logged and recorded.
type TraceEventDesc struct {
// Desc is the event text; it is the message that gets logged and is copied
// into the recorded TraceEvent.
Desc string
// Severity selects the log level used for the event and is copied into the
// recorded TraceEvent.
Severity Severity
// Parent, when non-nil, is a further description that is logged as well and
// whose Desc and Severity are recorded on the parent entry's trace.
Parent *TraceEventDesc
}
func ( grpclog.DepthLoggerV2, int64, int, *TraceEventDesc) {
for := ; != nil; = .Parent {
switch .Severity {
case CtUNKNOWN:
.InfoDepth(+1, .Desc)
case CtINFO:
.InfoDepth(+1, .Desc)
case CtWarning:
.WarningDepth(+1, .Desc)
case CtError:
.ErrorDepth(+1, .Desc)
}
}
if getMaxTraceEntry() == 0 {
return
}
db.get().traceEvent(, )
}
// channelMap is the channelz storage: one map per kind of entry, keyed by
// entry id.
type channelMap struct {
// mu is locked by the methods of channelMap around their accesses to the
// maps below.
mu sync.RWMutex
// topLevelChannels is the set of ids of channels that were added as
// top-level channels.
topLevelChannels map[int64]struct{}
servers map[int64]*server
channels map[int64]*channel
subChannels map[int64]*subChannel
listenSockets map[int64]*listenSocket
normalSockets map[int64]*normalSocket
}
func ( *channelMap) ( int64, *server) {
.mu.Lock()
.cm =
.servers[] =
.mu.Unlock()
}
func ( *channelMap) ( int64, *channel, bool, int64, string) {
.mu.Lock()
.cm =
.trace.cm =
.channels[] =
if {
.topLevelChannels[] = struct{}{}
} else {
.findEntry().addChild(, )
}
.mu.Unlock()
}
func ( *channelMap) ( int64, *subChannel, int64, string) {
.mu.Lock()
.cm =
.trace.cm =
.subChannels[] =
.findEntry().addChild(, )
.mu.Unlock()
}
func ( *channelMap) ( int64, *listenSocket, int64, string) {
.mu.Lock()
.cm =
.listenSockets[] =
.findEntry().addChild(, )
.mu.Unlock()
}
func ( *channelMap) ( int64, *normalSocket, int64, string) {
.mu.Lock()
.cm =
.normalSockets[] =
.findEntry().addChild(, )
.mu.Unlock()
}
func ( *channelMap) ( int64) {
.mu.Lock()
.findEntry().triggerDelete()
.mu.Unlock()
}
func ( *channelMap) ( int64) {
:= .findEntry()
if , := .(tracedChannel); {
.decrTraceRefCount()
.deleteSelfIfReady()
}
}
func ( *channelMap) ( int64) entry {
var entry
var bool
if , = .channels[]; {
return
}
if , = .subChannels[]; {
return
}
if , = .servers[]; {
return
}
if , = .listenSockets[]; {
return
}
if , = .normalSockets[]; {
return
}
return &dummyEntry{idNotFound: }
}
func ( *channelMap) ( int64) {
var bool
if _, = .normalSockets[]; {
delete(.normalSockets, )
return
}
if _, = .subChannels[]; {
delete(.subChannels, )
return
}
if _, = .channels[]; {
delete(.channels, )
delete(.topLevelChannels, )
return
}
if _, = .listenSockets[]; {
delete(.listenSockets, )
return
}
if _, = .servers[]; {
delete(.servers, )
return
}
}
func ( *channelMap) ( int64, *TraceEventDesc) {
.mu.Lock()
:= .findEntry()
, := .(tracedChannel)
if ! {
.mu.Unlock()
return
}
.getChannelTrace().append(&TraceEvent{Desc: .Desc, Severity: .Severity, Timestamp: time.Now()})
if .Parent != nil {
:= .findEntry(.getParentID())
var RefChannelType
switch .(type) {
case *channel:
= RefChannel
case *subChannel:
= RefSubChannel
}
if , := .(tracedChannel); {
.getChannelTrace().append(&TraceEvent{
Desc: .Parent.Desc,
Severity: .Parent.Severity,
Timestamp: time.Now(),
RefID: ,
RefName: .getRefName(),
RefType: ,
})
.incrTraceRefCount()
}
}
.mu.Unlock()
}
// int64Slice is a []int64 with Len, Swap and Less methods so that it can be
// passed to sort.Sort; Less orders elements in increasing order.
type int64Slice []int64

// Len returns the number of elements in s.
func (s int64Slice) Len() int { return len(s) }

// Swap exchanges the elements at indexes i and j.
func (s int64Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

// Less reports whether the element at index i is smaller than the one at j.
func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] }
// copyMap returns a new map holding the same key/value pairs as m. The result
// is never nil, even when m is nil or empty, and does not alias m.
func copyMap(m map[int64]string) map[int64]string {
	// Size the copy up front so it does not have to grow while being filled.
	n := make(map[int64]string, len(m))
	for k, v := range m {
		n[k] = v
	}
	return n
}
// min returns the smaller of a and b.
func min(a, b int64) int64 {
	if a < b {
		return a
	}
	return b
}
func ( *channelMap) ( int64, int64) ([]*ChannelMetric, bool) {
if <= 0 {
= EntryPerPage
}
.mu.RLock()
:= int64(len(.topLevelChannels))
:= make([]int64, 0, )
:= make([]*channel, 0, min(, ))
for := range .topLevelChannels {
= append(, )
}
sort.Sort(int64Slice())
:= sort.Search(len(), func( int) bool { return [] >= })
:= int64(0)
var bool
var []*ChannelMetric
for , := range [:] {
if == {
break
}
if , := .channels[]; {
= append(, )
= append(, &ChannelMetric{
NestedChans: copyMap(.nestedChans),
SubChans: copyMap(.subChans),
})
++
}
if == len([:])-1 {
= true
break
}
}
.mu.RUnlock()
if == 0 {
= true
}
for , := range {
[].ChannelData = .c.ChannelzMetric()
[].ID = .id
[].RefName = .refName
[].Trace = .trace.dumpData()
}
return ,
}
func ( *channelMap) (, int64) ([]*ServerMetric, bool) {
if <= 0 {
= EntryPerPage
}
.mu.RLock()
:= int64(len(.servers))
:= make([]int64, 0, )
:= make([]*server, 0, min(, ))
for := range .servers {
= append(, )
}
sort.Sort(int64Slice())
:= sort.Search(len(), func( int) bool { return [] >= })
:= int64(0)
var bool
var []*ServerMetric
for , := range [:] {
if == {
break
}
if , := .servers[]; {
= append(, )
= append(, &ServerMetric{
ListenSockets: copyMap(.listenSockets),
})
++
}
if == len([:])-1 {
= true
break
}
}
.mu.RUnlock()
if == 0 {
= true
}
for , := range {
[].ServerData = .s.ChannelzMetric()
[].ID = .id
[].RefName = .refName
}
return ,
}
func ( *channelMap) ( int64, int64, int64) ([]*SocketMetric, bool) {
if <= 0 {
= EntryPerPage
}
var *server
var bool
.mu.RLock()
.mu.RUnlock()
return nil, true
}
:= .sockets
:= int64(len())
:= make([]int64, 0, )
:= make([]*normalSocket, 0, min(, ))
for := range {
= append(, )
}
sort.Sort(int64Slice())
:= sort.Search(len(), func( int) bool { return [] >= })
:= int64(0)
var bool
for , := range [:] {
if == {
break
}
if , := .normalSockets[]; {
= append(, )
++
}
if == len([:])-1 {
= true
break
}
}
.mu.RUnlock()
if == 0 {
= true
}
var []*SocketMetric
for , := range {
:= &SocketMetric{}
.SocketData = .s.ChannelzMetric()
.ID = .id
.RefName = .refName
= append(, )
}
return ,
}
func ( *channelMap) ( int64) *ChannelMetric {
:= &ChannelMetric{}
var *channel
var bool
.mu.RLock()
.mu.RUnlock()
return nil
}
.NestedChans = copyMap(.nestedChans)
:= .c
.mu.RUnlock()
.ChannelData = .ChannelzMetric()
.ID = .id
.RefName = .refName
.Trace = .trace.dumpData()
return
}
func ( *channelMap) ( int64) *SubChannelMetric {
:= &SubChannelMetric{}
var *subChannel
var bool
.mu.RLock()
:= .c
.mu.RUnlock()
.ChannelData = .ChannelzMetric()
.ID = .id
.RefName = .refName
.Trace = .trace.dumpData()
return
}
func ( *channelMap) ( int64) *SocketMetric {
:= &SocketMetric{}
.mu.RLock()
if , := .listenSockets[]; {
.mu.RUnlock()
.SocketData = .s.ChannelzMetric()
.ID = .id
.RefName = .refName
return
}
if , := .normalSockets[]; {
.mu.RUnlock()
.SocketData = .s.ChannelzMetric()
.ID = .id
.RefName = .refName
return
}
.mu.RUnlock()
return nil
}
func ( *channelMap) ( int64) *ServerMetric {
:= &ServerMetric{}
var *server
var bool
.mu.RLock()
if , = .servers[]; ! {
.mu.RUnlock()
return nil
}
.ListenSockets = copyMap(.listenSockets)
.mu.RUnlock()
.ID = .id
.RefName = .refName
.ServerData = .s.ChannelzMetric()
return
}
// idGenerator produces increasing int64 ids. Its counter is only accessed
// through sync/atomic.
type idGenerator struct {
	// id is the most recently generated id; 0 means none since the last reset.
	id int64
}

// reset sets the counter back to 0, so the next generated id is 1.
func (i *idGenerator) reset() {
	atomic.StoreInt64(&i.id, 0)
}

// genID atomically increments the counter and returns the new value.
func (i *idGenerator) genID() int64 {
	return atomic.AddInt64(&i.id, 1)
}
The pages are generated with Golds v0.3.2-preview. (GOOS=darwin GOARCH=amd64) Golds is a Go 101 project developed by Tapir Liu. PR and bug reports are welcome and can be submitted to the issue list. Please follow @Go100and1 (reachable from the left QR code) to get the latest news of Golds.