-
Notifications
You must be signed in to change notification settings - Fork 99
/
metric_consolidator.go
119 lines (107 loc) · 3.63 KB
/
metric_consolidator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package gostatsd
import (
"context"
"time"
"github.com/tilinna/clock"
)
// MetricConsolidator consolidates incoming metrics into a fixed set of MetricMaps. Each incoming batch
// is merged into whichever MetricMap is next available, so the distribution across maps is effectively
// random. The accumulated MetricMaps are either sent as a slice to the provided channel, or made available
// synchronously through Drain[WithContext]/Fill. Run can be started in a long running goroutine to flush
// periodically, or Flush can be called externally to trigger the channel send.
//
// Used to consolidate metrics such as:
// - counter[name=x, value=1]
// - counter[name=x, value=1]
// - counter[name=x, value=1]
// - counter[name=x, value=1]
// - counter[name=x, value=1]
//
// in to:
// - counter[name=x, value=5]
//
// Similar consolidation is performed for other metric types.
type MetricConsolidator struct {
// maps is a buffered channel holding the MetricMaps that metrics are consolidated in to. A receiver
// takes a MetricMap out, updates it, and puts it back, so the channel also serialises access to each map.
maps chan *MetricMap
// sink receives the slice of drained MetricMaps each time Flush is called.
sink chan<- []*MetricMap
// flushInterval is the period of the ticker that triggers Flush inside Run.
flushInterval time.Duration
// forwarded is the value passed to NewMetricMap for every MetricMap created by Fill.
forwarded bool
}
// NewMetricConsolidator returns a new MetricConsolidator that is already filled with empty MetricMaps
// and ready to receive metrics.
//
// spots is the number of MetricMaps metrics are spread across; values below 1 are treated as 1.
// forwarded is propagated to every MetricMap generated by the consolidator, regardless of the value
// carried by any source MetricMap. flushInterval is the period used by Run between calls to Flush.
// sink is the channel Flush sends the drained MetricMaps to.
func NewMetricConsolidator(spots int, forwarded bool, flushInterval time.Duration, sink chan<- []*MetricMap) *MetricConsolidator {
	// With a capacity of zero Fill would add no MetricMaps, so every ReceiveMetrics/ReceiveMetricMap
	// call would block forever waiting for one; a negative capacity would make the channel
	// allocation panic. Guarantee at least one spot so the consolidator is always usable.
	if spots < 1 {
		spots = 1
	}
	mc := &MetricConsolidator{
		maps:          make(chan *MetricMap, spots),
		sink:          sink,
		flushInterval: flushInterval,
		forwarded:     forwarded,
	}
	mc.Fill()
	return mc
}
// Run flushes the MetricConsolidator once per flushInterval, using a ticker created from the clock
// carried by ctx. When ctx is done it performs one final Flush and returns. It blocks until then, so
// it is intended to be started in its own goroutine.
func (mc *MetricConsolidator) Run(ctx context.Context) {
	ticker := clock.FromContext(ctx).NewTicker(mc.flushInterval)
	defer ticker.Stop()

	for {
		// Every wake-up results in a Flush; only cancellation ends the loop afterwards.
		finished := false
		select {
		case <-ctx.Done():
			finished = true
		case <-ticker.C:
		}

		mc.Flush()
		if finished {
			return
		}
	}
}
// Drain removes every MetricMap from the MetricConsolidator and returns them as a slice. It is
// DrainWithContext with a context that is never canceled, so it waits until all of them are collected.
func (mc *MetricConsolidator) Drain() []*MetricMap {
	neverCanceled := context.Background()
	return mc.DrainWithContext(neverCanceled)
}
// DrainWithContext removes every MetricMap from the MetricConsolidator and returns them as a slice.
// If ctx is canceled before all of them have been collected, the ones already taken are handed back
// to the MetricConsolidator and nil is returned.
func (mc *MetricConsolidator) DrainWithContext(ctx context.Context) []*MetricMap {
	spots := cap(mc.maps)
	collected := make([]*MetricMap, 0, spots)

	for len(collected) < spots {
		select {
		case <-ctx.Done():
			// Restore what was taken so the consolidator stays consistent. These sends cannot
			// overflow the channel: everything being returned came out of it in the first place.
			for idx := range collected {
				mc.maps <- collected[idx]
			}
			return nil
		case next := <-mc.maps:
			collected = append(collected, next)
		}
	}

	return collected
}
// Flush drains all the MetricMaps in to a slice, sends that slice to the sink channel, and then
// refills the MetricConsolidator with fresh MetricMaps for new metrics to land in. Not thread-safe.
func (mc *MetricConsolidator) Flush() {
	drained := mc.Drain()

	// The send happens before the refill on purpose: while the sink is busy there are no MetricMaps
	// to receive in to, so back-pressure propagates to producers when the sink can't keep up.
	mc.sink <- drained

	mc.Fill()
}
// Fill puts one empty MetricMap in to the MetricConsolidator for every spot it has. It is the
// counterpart of Drain[WithContext]: it must be called after a successful drain, and must not be
// called after a failed DrainWithContext (which has already put the MetricMaps back).
func (mc *MetricConsolidator) Fill() {
	for remaining := cap(mc.maps); remaining > 0; remaining-- {
		mc.maps <- NewMetricMap(mc.forwarded)
	}
}
// ReceiveMetrics takes one MetricMap out of the MetricConsolidator, adds every Metric in the slice
// to it, and then puts the MetricMap back.
func (mc *MetricConsolidator) ReceiveMetrics(metrics []*Metric) {
	target := <-mc.maps
	for idx := range metrics {
		target.Receive(metrics[idx])
	}
	mc.maps <- target
}
// ReceiveMetricMap takes one MetricMap out of the MetricConsolidator, merges mm in to it, and then
// puts the MetricMap back.
func (mc *MetricConsolidator) ReceiveMetricMap(mm *MetricMap) {
	target := <-mc.maps
	target.Merge(mm)
	mc.maps <- target
}