Skip to content

Commit

Permalink
Rename and move database methods
Browse files Browse the repository at this point in the history
  • Loading branch information
corny committed Apr 18, 2017
1 parent c47214f commit 8b8b144
Show file tree
Hide file tree
Showing 11 changed files with 79 additions and 73 deletions.
2 changes: 1 addition & 1 deletion cmd/yanic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func main() {
func importRRD(path string) {
log.Println("importing RRD from", path)
for ds := range rrd.Read(path) {
connections.AddStatistics(
connections.InsertGlobals(
&runtime.GlobalStats{
Nodes: uint32(ds.Nodes),
Clients: uint32(ds.Clients),
Expand Down
9 changes: 5 additions & 4 deletions database/all/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,16 @@ func Connect(configuration interface{}) (database.Connection, error) {
}
return &Connection{list: list}, nil
}
func (conn *Connection) AddNode(nodeID string, node *runtime.Node) {

func (conn *Connection) InsertNode(node *runtime.Node) {
for _, item := range conn.list {
item.AddNode(nodeID, node)
item.InsertNode(node)
}
}

func (conn *Connection) AddStatistics(stats *runtime.GlobalStats, time time.Time) {
func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time) {
for _, item := range conn.list {
item.AddStatistics(stats, time)
item.InsertGlobals(stats, time)
}
}

Expand Down
8 changes: 4 additions & 4 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (

// Connection interface to use for implementation in e.g. influxdb
type Connection interface {
// AddNode stores data of a single node
AddNode(nodeID string, node *runtime.Node)
// InsertNode stores statistics per node
InsertNode(node *runtime.Node)

// AddStatistics stores global statistics
AddStatistics(stats *runtime.GlobalStats, time time.Time)
// InsertGlobals stores global statistics
InsertGlobals(stats *runtime.GlobalStats, time time.Time)

// PruneNodes prunes historical per-node data
PruneNodes(deleteAfter time.Duration)
Expand Down
37 changes: 0 additions & 37 deletions database/influxdb/database.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package influxdb

import (
"fmt"
"log"
"sync"
"time"
Expand All @@ -10,7 +9,6 @@ import (
"github.com/influxdata/influxdb/models"

"github.com/FreifunkBremen/yanic/database"
"github.com/FreifunkBremen/yanic/runtime"
)

const (
Expand Down Expand Up @@ -80,11 +78,6 @@ func Connect(configuration interface{}) (database.Connection, error) {
return db, nil
}

func (conn *Connection) PruneNodes(deleteAfter time.Duration) {
query := fmt.Sprintf("delete from %s where time < now() - %ds", MeasurementNode, deleteAfter/time.Second)
conn.client.Query(client.NewQuery(query, conn.config.Database(), "m"))
}

func (conn *Connection) addPoint(name string, tags models.Tags, fields models.Fields, time time.Time) {
point, err := client.NewPoint(name, tags.Map(), fields, time)
if err != nil {
Expand All @@ -93,36 +86,6 @@ func (conn *Connection) addPoint(name string, tags models.Tags, fields models.Fi
conn.points <- point
}

// Saves the values of a CounterMap in the database.
// The key are used as 'value' tag.
// The value is used as 'counter' field.
func (conn *Connection) addCounterMap(name string, m runtime.CounterMap) {
now := time.Now()
for key, count := range m {
conn.addPoint(
name,
models.Tags{
models.Tag{Key: []byte("value"), Value: []byte(key)},
},
models.Fields{"count": count},
now,
)
}
}

// AddStatistics implementation of database
func (conn *Connection) AddStatistics(stats *runtime.GlobalStats, time time.Time) {
conn.addPoint(MeasurementGlobal, nil, GlobalStatsFields(stats), time)
conn.addCounterMap(CounterMeasurementModel, stats.Models)
conn.addCounterMap(CounterMeasurementFirmware, stats.Firmwares)
}

// AddNode implementation of database
func (conn *Connection) AddNode(nodeID string, node *runtime.Node) {
tags, fields := nodeToInflux(node)
conn.addPoint(MeasurementNode, tags, fields, time.Now())
}

// Close all connection and clean up
func (conn *Connection) Close() {
close(conn.points)
Expand Down
44 changes: 44 additions & 0 deletions database/influxdb/global.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package influxdb

import (
"time"

"github.com/FreifunkBremen/yanic/runtime"
"github.com/influxdata/influxdb/models"
)

// InsertGlobals implementation of database
func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time) {
conn.addPoint(MeasurementGlobal, nil, GlobalStatsFields(stats), time)
conn.addCounterMap(CounterMeasurementModel, stats.Models)
conn.addCounterMap(CounterMeasurementFirmware, stats.Firmwares)
}

// GlobalStatsFields returns fields for InfluxDB
func GlobalStatsFields(stats *runtime.GlobalStats) map[string]interface{} {
return map[string]interface{}{
"nodes": stats.Nodes,
"gateways": stats.Gateways,
"clients.total": stats.Clients,
"clients.wifi": stats.ClientsWifi,
"clients.wifi24": stats.ClientsWifi24,
"clients.wifi5": stats.ClientsWifi5,
}
}

// Saves the values of a CounterMap in the database.
// The key are used as 'value' tag.
// The value is used as 'counter' field.
func (conn *Connection) addCounterMap(name string, m runtime.CounterMap) {
now := time.Now()
for key, count := range m {
conn.addPoint(
name,
models.Tags{
models.Tag{Key: []byte("value"), Value: []byte(key)},
},
models.Fields{"count": count},
now,
)
}
}
File renamed without changes.
18 changes: 16 additions & 2 deletions database/influxdb/node.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,29 @@
package influxdb

import (
"fmt"
"strconv"
"time"

client "github.com/influxdata/influxdb/client/v2"
models "github.com/influxdata/influxdb/models"

"github.com/FreifunkBremen/yanic/runtime"
)

// NodeToInflux Returns tags and fields for InfluxDB
func nodeToInflux(node *runtime.Node) (tags models.Tags, fields models.Fields) {
// InsertNode implementation of database
func (conn *Connection) InsertNode(node *runtime.Node) {
tags, fields := buildNodeStats(node)
conn.addPoint(MeasurementNode, tags, fields, time.Now())
}

func (conn *Connection) PruneNodes(deleteAfter time.Duration) {
query := fmt.Sprintf("delete from %s where time < now() - %ds", MeasurementNode, deleteAfter/time.Second)
conn.client.Query(client.NewQuery(query, conn.config.Database(), "m"))
}

// returns tags and fields for InfluxDB
func buildNodeStats(node *runtime.Node) (tags models.Tags, fields models.Fields) {
stats := node.Statistics

tags.SetString("nodeid", stats.NodeID)
Expand Down
2 changes: 1 addition & 1 deletion database/influxdb/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestToInflux(t *testing.T) {
},
}

tags, fields := nodeToInflux(node)
tags, fields := buildNodeStats(node)

assert.Equal("foobar", tags.GetString("nodeid"))
assert.Equal("nobody", tags.GetString("owner"))
Expand Down
17 changes: 0 additions & 17 deletions database/influxdb/stats.go

This file was deleted.

8 changes: 4 additions & 4 deletions database/logging/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ func Connect(configuration interface{}) (database.Connection, error) {
return &Connection{config: config, file: file}, nil
}

func (conn *Connection) AddNode(nodeID string, node *runtime.Node) {
conn.log("AddNode: [", nodeID, "] clients: ", node.Statistics.Clients.Total)
func (conn *Connection) InsertNode(node *runtime.Node) {
conn.log("InsertNode: [", node.Statistics.NodeID, "] clients: ", node.Statistics.Clients.Total)
}

func (conn *Connection) AddStatistics(stats *runtime.GlobalStats, time time.Time) {
conn.log("AddStatistics: [", time.String(), "] nodes: ", stats.Nodes, ", clients: ", stats.Clients, " models: ", len(stats.Models))
func (conn *Connection) InsertGlobals(stats *runtime.GlobalStats, time time.Time) {
conn.log("InsertGlobals: [", time.String(), "] nodes: ", stats.Nodes, ", clients: ", stats.Clients, " models: ", len(stats.Models))
}

func (conn *Connection) PruneNodes(deleteAfter time.Duration) {
Expand Down
7 changes: 4 additions & 3 deletions respond/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,10 @@ func (coll *Collector) saveResponse(addr net.UDPAddr, res *data.ResponseData) {
node := coll.nodes.Update(nodeID, res)
node.Address = addr.IP

// Store statistics in InfluxDB
// Store statistics in database
if coll.db != nil && node.Statistics != nil {
coll.db.AddNode(nodeID, node)
node.Statistics.NodeID = nodeID
coll.db.InsertNode(node)
}
}

Expand Down Expand Up @@ -251,5 +252,5 @@ func (coll *Collector) globalStatsWorker() {
func (coll *Collector) saveGlobalStats() {
stats := runtime.NewGlobalStats(coll.nodes)

coll.db.AddStatistics(stats, time.Now())
coll.db.InsertGlobals(stats, time.Now())
}

0 comments on commit 8b8b144

Please sign in to comment.