This repository has been archived by the owner on May 19, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
collector.go
160 lines (133 loc) · 3.6 KB
/
collector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package bgpls
import (
"errors"
"net"
"sync"
)
// ErrCollectorStopped is returned when an operation is not valid because the collector has been stopped.
var ErrCollectorStopped = errors.New("collector is stopped")
// Collector is a BGP Link-State collector.
//
// Events() returns the events channel, or ErrCollectorStopped if the collector has been stopped.
// The events channel is buffered; its size is configurable via CollectorConfig.EventBufferSize.
// The events channel should be read from in a loop. It will close only when Stop() is called.
//
// Config() returns the configuration of the Collector.
//
// AddNeighbor() initializes a new bgp-ls neighbor.
// An error is returned if the collector is stopped or the neighbor already exists.
//
// DeleteNeighbor() shuts down and removes a neighbor from the collector.
// An error is returned if the collector is stopped or the neighbor does not exist.
//
// Neighbors() returns the configuration of all neighbors.
// An error is returned if the collector is stopped.
//
// Stop() stops the collector and all neighbors.
type Collector interface {
Events() (<-chan Event, error)
Config() *CollectorConfig
AddNeighbor(c *NeighborConfig) error
DeleteNeighbor(address net.IP) error
Neighbors() ([]*NeighborConfig, error)
Stop()
}
// standardCollector satisfies the Collector interface.
// Its methods take the embedded RWMutex before reading or modifying the fields below.
type standardCollector struct {
// running is set to true by NewCollector and to false by Stop.
running bool
// events is the channel returned by Events and handed to each new neighbor; Stop closes it.
events chan Event
// config is the pointer that was passed to NewCollector.
config *CollectorConfig
// neighbors holds the registered neighbors keyed by the string form of their address.
neighbors map[string]neighbor
*sync.RWMutex
}
// CollectorConfig is the configuration for the Collector.
//
// EventBufferSize is the size of the buffered events channel returned from the Events() Collector method.
// It should be set to a value appropriate from a memory consumption perspective.
// Setting this value too low can inhibit bgp io.
type CollectorConfig struct {
// ASN is handed to every neighbor created by AddNeighbor; presumably the collector's local AS number — TODO confirm.
ASN uint32
// RouterID is handed to every neighbor created by AddNeighbor; presumably the local BGP router ID — TODO confirm.
RouterID net.IP
// EventBufferSize is the capacity of the events channel allocated by NewCollector.
EventBufferSize uint64
}
// NewCollector creates a running Collector from config.
//
// The events channel is allocated with a capacity of config.EventBufferSize and
// the collector starts with no neighbors. The config pointer is retained (not
// copied) and is the value later returned by Config().
//
// An error is returned if config is nil.
func NewCollector(config *CollectorConfig) (Collector, error) {
	// Reading config.EventBufferSize below would panic on a nil config;
	// report it through the error return instead.
	if config == nil {
		return nil, errors.New("collector config is nil")
	}

	c := &standardCollector{
		running:   true,
		events:    make(chan Event, config.EventBufferSize),
		config:    config,
		neighbors: make(map[string]neighbor),
		RWMutex:   &sync.RWMutex{},
	}
	return c, nil
}
// Events returns the receive-only events channel while the collector is running.
// Once the collector has been stopped it returns ErrCollectorStopped instead.
func (c *standardCollector) Events() (<-chan Event, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.running {
		return nil, ErrCollectorStopped
	}
	return c.events, nil
}
// Config returns the configuration pointer held by the collector.
// The field is read while holding the read lock.
func (c *standardCollector) Config() *CollectorConfig {
	c.RLock()
	cfg := c.config
	c.RUnlock()
	return cfg
}
// AddNeighbor creates a neighbor from config and registers it with the collector.
//
// Neighbors are keyed by the string form of config.Address. The new neighbor is
// built with the collector's RouterID and ASN and is given the collector's
// events channel.
//
// It returns ErrCollectorStopped if the collector has been stopped, and an
// error if config is nil or a neighbor with the same address is already
// registered.
func (c *standardCollector) AddNeighbor(config *NeighborConfig) error {
	c.Lock()
	defer c.Unlock()

	if !c.running {
		return ErrCollectorStopped
	}

	// Checked after the running test so a stopped collector keeps reporting
	// ErrCollectorStopped; without this guard config.Address would panic.
	if config == nil {
		return errors.New("neighbor config is nil")
	}

	// Compute the map key once and reuse it for lookup and insert.
	key := config.Address.String()
	if _, exists := c.neighbors[key]; exists {
		return errors.New("neighbor exists")
	}

	c.neighbors[key] = newNeighbor(c.config.RouterID, c.config.ASN, config, c.events)
	return nil
}
// Neighbors returns the configuration of every registered neighbor.
//
// The result is a non-nil slice (empty when there are no neighbors) built by
// calling config() on each neighbor; its order follows map iteration and is
// therefore unspecified. It returns ErrCollectorStopped if the collector has
// been stopped.
func (c *standardCollector) Neighbors() ([]*NeighborConfig, error) {
	c.RLock()
	defer c.RUnlock()

	if !c.running {
		return nil, ErrCollectorStopped
	}

	// The final length is known up front, so size the slice once instead of
	// letting append grow it repeatedly.
	configs := make([]*NeighborConfig, 0, len(c.neighbors))
	for _, n := range c.neighbors {
		configs = append(configs, n.config())
	}
	return configs, nil
}
// DeleteNeighbor terminates the neighbor registered under address and removes
// it from the collector.
//
// It returns ErrCollectorStopped if the collector has been stopped, and an
// error if no neighbor is registered under that address.
func (c *standardCollector) DeleteNeighbor(address net.IP) error {
	c.Lock()
	defer c.Unlock()

	if !c.running {
		return ErrCollectorStopped
	}

	key := address.String()
	peer, found := c.neighbors[key]
	if !found {
		return errors.New("neighbor does not exist")
	}

	// Terminate first, then drop the map entry, all under the write lock.
	peer.terminate()
	delete(c.neighbors, key)
	return nil
}
// Stop terminates every neighbor, marks the collector as stopped and closes
// the events channel. Calling Stop on an already stopped collector is a no-op.
//
// Neighbors are terminated concurrently, one goroutine each, and Stop waits
// for all of them before closing the channel. The write lock is held for the
// whole call.
func (c *standardCollector) Stop() {
	c.Lock()
	defer c.Unlock()

	if !c.running {
		return
	}

	var pending sync.WaitGroup
	pending.Add(len(c.neighbors))
	for _, peer := range c.neighbors {
		// Pass the neighbor as an argument so each goroutine gets its own copy.
		go func(p neighbor) {
			defer pending.Done()
			p.terminate()
		}(peer)
	}
	pending.Wait()

	// Only after every neighbor has terminated is the channel closed.
	c.running = false
	close(c.events)
}