Skip to content

Commit

Permalink
Expose download/upload rate per peer
Browse files Browse the repository at this point in the history
  • Loading branch information
neilalexander committed Nov 18, 2024
1 parent c22a746 commit fa39806
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 12 deletions.
12 changes: 10 additions & 2 deletions cmd/yggdrasilctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,9 @@ func run() int {
if err := json.Unmarshal(recv.Response, &resp); err != nil {
panic(err)
}
table.SetHeader([]string{"URI", "State", "Dir", "IP Address", "Uptime", "RTT", "RX", "TX", "Pr", "Cost", "Last Error"})
table.SetHeader([]string{"URI", "State", "Dir", "IP Address", "Uptime", "RTT", "RX", "TX", "Down", "Up", "Pr", "Cost", "Last Error"})
for _, peer := range resp.Peers {
state, lasterr, dir, rtt := "Up", "-", "Out", "-"
state, lasterr, dir, rtt, rxr, txr := "Up", "-", "Out", "-", "-", "-"
if !peer.Up {
state, lasterr = "Down", fmt.Sprintf("%s ago: %s", peer.LastErrorTime.Round(time.Second), peer.LastError)
} else if rttms := float64(peer.Latency.Microseconds()) / 1000; rttms > 0 {
Expand All @@ -190,6 +190,12 @@ func run() int {
uri.RawQuery = ""
uristring = uri.String()
}
if peer.RXRate > 0 {
rxr = peer.RXRate.String() + "/s"
}
if peer.TXRate > 0 {
txr = peer.TXRate.String() + "/s"
}
table.Append([]string{
uristring,
state,
Expand All @@ -199,6 +205,8 @@ func run() int {
rtt,
peer.RXBytes.String(),
peer.TXBytes.String(),
rxr,
txr,
fmt.Sprintf("%d", peer.Priority),
fmt.Sprintf("%d", peer.Cost),
lasterr,
Expand Down
16 changes: 9 additions & 7 deletions src/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,13 +356,15 @@ type DataUnit uint64

func (d DataUnit) String() string {
switch {
case d > 1024*1024*1024*1024:
return fmt.Sprintf("%2.ftb", float64(d)/1024/1024/1024/1024)
case d > 1024*1024*1024:
return fmt.Sprintf("%2.fgb", float64(d)/1024/1024/1024)
case d > 1024*1024:
return fmt.Sprintf("%2.fmb", float64(d)/1024/1024)
case d >= 1024*1024*1024*1024:
return fmt.Sprintf("%2.1fTB", float64(d)/1024/1024/1024/1024)
case d >= 1024*1024*1024:
return fmt.Sprintf("%2.1fGB", float64(d)/1024/1024/1024)
case d >= 1024*1024:
return fmt.Sprintf("%2.1fMB", float64(d)/1024/1024)
case d >= 100:
return fmt.Sprintf("%2.1fKB", float64(d)/1024)
default:
return fmt.Sprintf("%2.fkb", float64(d)/1024)
return fmt.Sprintf("%dB", d)
}
}
4 changes: 4 additions & 0 deletions src/admin/getpeers.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type PeerEntry struct {
Cost uint64 `json:"cost"`
RXBytes DataUnit `json:"bytes_recvd,omitempty"`
TXBytes DataUnit `json:"bytes_sent,omitempty"`
RXRate DataUnit `json:"rate_recvd,omitempty"`
TXRate DataUnit `json:"rate_sent,omitempty"`
Uptime float64 `json:"uptime,omitempty"`
Latency time.Duration `json:"latency_ms,omitempty"`
LastErrorTime time.Duration `json:"last_error_time,omitempty"`
Expand All @@ -47,6 +49,8 @@ func (a *AdminSocket) getPeersHandler(_ *GetPeersRequest, res *GetPeersResponse)
URI: p.URI,
RXBytes: DataUnit(p.RXBytes),
TXBytes: DataUnit(p.TXBytes),
RXRate: DataUnit(p.RXRate),
TXRate: DataUnit(p.TXRate),
Uptime: p.Uptime.Seconds(),
}
if p.Latency > 0 {
Expand Down
4 changes: 4 additions & 0 deletions src/core/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ type PeerInfo struct {
Cost uint64
RXBytes uint64
TXBytes uint64
RXRate uint64
TXRate uint64
Uptime time.Duration
Latency time.Duration
}
Expand Down Expand Up @@ -87,6 +89,8 @@ func (c *Core) GetPeers() []PeerInfo {
peerinfo.Inbound = state.linkType == linkTypeIncoming
peerinfo.RXBytes = atomic.LoadUint64(&c.rx)
peerinfo.TXBytes = atomic.LoadUint64(&c.tx)
peerinfo.RXRate = atomic.LoadUint64(&c.rxrate)
peerinfo.TXRate = atomic.LoadUint64(&c.txrate)
peerinfo.Uptime = time.Since(c.up)
}
if p, ok := conns[conn]; ok {
Expand Down
37 changes: 34 additions & 3 deletions src/core/link.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,36 @@ func (l *links) init(c *Core) error {
l._links = make(map[linkInfo]*link)
l._listeners = make(map[*Listener]context.CancelFunc)

l.Act(nil, l._updateAverages)
return nil
}

func (l *links) _updateAverages() {
select {
case <-l.core.ctx.Done():
return
default:
}

for _, l := range l._links {
if l._conn == nil {
continue
}
rx := atomic.LoadUint64(&l._conn.rx)
tx := atomic.LoadUint64(&l._conn.tx)
lastrx := atomic.LoadUint64(&l._conn.lastrx)
lasttx := atomic.LoadUint64(&l._conn.lasttx)
atomic.StoreUint64(&l._conn.rxrate, rx-lastrx)
atomic.StoreUint64(&l._conn.txrate, tx-lasttx)
atomic.StoreUint64(&l._conn.lastrx, rx)
atomic.StoreUint64(&l._conn.lasttx, tx)
}

time.AfterFunc(time.Second, func() {
l.Act(nil, l._updateAverages)
})
}

func (l *links) shutdown() {
phony.Block(l, func() {
for listener := range l._listeners {
Expand Down Expand Up @@ -699,9 +726,13 @@ func urlForLinkInfo(u url.URL) url.URL {
type linkConn struct {
// tx and rx are at the beginning of the struct to ensure 64-bit alignment
// on 32-bit platforms, see https://pkg.go.dev/sync/atomic#pkg-note-BUG
rx uint64
tx uint64
up time.Time
rx uint64
tx uint64
rxrate uint64
txrate uint64
lastrx uint64
lasttx uint64
up time.Time
net.Conn
}

Expand Down

0 comments on commit fa39806

Please sign in to comment.