Skip to content

Commit

Permalink
benchmarks for metric increments
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Oct 19, 2023
1 parent 5029022 commit f95a835
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 55 deletions.
6 changes: 0 additions & 6 deletions _examples/chat_json/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,6 @@ func main() {
LogLevel: centrifuge.LogLevelInfo,
LogHandler: handleLog,
HistoryMetaTTL: 24 * time.Hour,

GetChannelNamespaceLabel: func(channel string) string {
return channel
},
ChannelNamespaceLabelForMessagesSent: true,
ChannelNamespaceLabelForMessagesReceived: true,
})

node.OnConnecting(func(ctx context.Context, e centrifuge.ConnectEvent) (centrifuge.ConnectReply, error) {
Expand Down
65 changes: 16 additions & 49 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,6 @@ var (
)

func (m *metrics) observeCommandDuration(method commandMethodType, d time.Duration) {
registryMu.RLock()
defer registryMu.RUnlock()

var observer prometheus.Observer

switch method {
Expand Down Expand Up @@ -217,79 +214,49 @@ type transportMessageLabels struct {
}

var (
transportMessagesSentCache map[transportMessageLabels]prometheus.Counter
transportMessagesSentSizeCache map[transportMessageLabels]prometheus.Counter
messagesSentCacheMu sync.RWMutex
transportMessagesSentCache sync.Map
transportMessagesSentSizeCache sync.Map

transportMessagesReceivedCache map[transportMessageLabels]prometheus.Counter
transportMessagesReceivedSizeCache map[transportMessageLabels]prometheus.Counter
messagesReceivedCacheMu sync.RWMutex
transportMessagesReceivedCache sync.Map
transportMessagesReceivedSizeCache sync.Map
)

func init() {
transportMessagesSentCache = make(map[transportMessageLabels]prometheus.Counter)
transportMessagesSentSizeCache = make(map[transportMessageLabels]prometheus.Counter)
transportMessagesReceivedCache = make(map[transportMessageLabels]prometheus.Counter)
transportMessagesReceivedSizeCache = make(map[transportMessageLabels]prometheus.Counter)
}

func (m *metrics) incTransportMessagesSent(transport string, channelGroup string, size int) {
labels := transportMessageLabels{
Transport: transport,
ChannelGroup: channelGroup,
}

messagesSentCacheMu.RLock()
counterSent, okSent := transportMessagesSentCache[labels]
counterSentSize, okSentSize := transportMessagesSentSizeCache[labels]
messagesSentCacheMu.RUnlock()

counterSent, okSent := transportMessagesSentCache.Load(labels)
if !okSent {
counterSent = m.transportMessagesSent.WithLabelValues(transport, channelGroup)
messagesSentCacheMu.Lock()
transportMessagesSentCache[labels] = counterSent
messagesSentCacheMu.Unlock()
transportMessagesSentCache.Store(labels, counterSent)
}

counterSentSize, okSentSize := transportMessagesSentSizeCache.Load(labels)
if !okSentSize {
counterSentSize = m.transportMessagesSentSize.WithLabelValues(transport, channelGroup)
messagesSentCacheMu.Lock()
transportMessagesSentSizeCache[labels] = counterSentSize
messagesSentCacheMu.Unlock()
transportMessagesSentSizeCache.Store(labels, counterSentSize)
}
counterSent.Inc()
counterSentSize.Add(float64(size))
counterSent.(prometheus.Counter).Inc()
counterSentSize.(prometheus.Counter).Add(float64(size))
}

func (m *metrics) incTransportMessagesReceived(transport string, channelGroup string, size int) {
registryMu.RLock()
defer registryMu.RUnlock()

labels := transportMessageLabels{
Transport: transport,
ChannelGroup: channelGroup,
}

messagesReceivedCacheMu.RLock()
counterReceived, okReceived := transportMessagesReceivedCache[labels]
counterReceivedSize, okReceivedSize := transportMessagesReceivedSizeCache[labels]
messagesReceivedCacheMu.RUnlock()

counterReceived, okReceived := transportMessagesReceivedCache.Load(labels)
if !okReceived {
counterReceived = m.transportMessagesReceived.WithLabelValues(transport, channelGroup)
messagesReceivedCacheMu.Lock()
transportMessagesReceivedCache[labels] = counterReceived
messagesReceivedCacheMu.Unlock()
transportMessagesReceivedCache.Store(labels, counterReceived)
}

counterReceivedSize, okReceivedSize := transportMessagesReceivedSizeCache.Load(labels)
if !okReceivedSize {
counterReceivedSize = m.transportMessagesReceivedSize.WithLabelValues(transport, channelGroup)
messagesReceivedCacheMu.Lock()
transportMessagesReceivedSizeCache[labels] = counterReceivedSize
messagesReceivedCacheMu.Unlock()
transportMessagesReceivedSizeCache.Store(labels, counterReceivedSize)
}
counterReceived.Inc()
counterReceivedSize.Add(float64(size))
counterReceived.(prometheus.Counter).Inc()
counterReceivedSize.(prometheus.Counter).Add(float64(size))
}

func (m *metrics) incServerDisconnect(code uint32) {
Expand Down
35 changes: 35 additions & 0 deletions metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package centrifuge

import (
"strconv"
"testing"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

func BenchmarkTransportMessagesSent(b *testing.B) {
m, err := initMetricsRegistry(prometheus.DefaultRegisterer, "test")
require.NoError(b, err)

b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
m.incTransportMessagesSent("test", "channel"+strconv.Itoa(i%10), 200)
}
})
}

func BenchmarkTransportMessagesReceived(b *testing.B) {
m, err := initMetricsRegistry(prometheus.DefaultRegisterer, "test")
require.NoError(b, err)

b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
i := 0
for pb.Next() {
m.incTransportMessagesReceived("test", "channel"+strconv.Itoa(i%10), 200)
}
})
}

0 comments on commit f95a835

Please sign in to comment.