diff --git a/rpc/server.go b/rpc/server.go
index fb6137ad..e03cb5a0 100644
--- a/rpc/server.go
+++ b/rpc/server.go
@@ -13,6 +13,7 @@ import (
 	"os"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/edaniels/zeroconf"
@@ -102,6 +103,10 @@ type Server interface {
 	http.Handler
 
 	EnsureAuthed(ctx context.Context) (context.Context, error)
+
+	// Stats returns a structure containing numbers that can be interesting for graphing over time
+	// as a diagnostics tool.
+	Stats() any
 }
 
 type simpleServer struct {
@@ -146,6 +151,18 @@ type simpleServer struct {
 
 	// authIssuer is the JWT issuer (iss) that will be used for our service.
 	authIssuer string
+
+	// counters are for reporting FTDC metrics. A `simpleServer` sets up both a grpc server wrapping a
+	// standard http2-over-TCP connection and grpc services for webrtc PeerConnections. These
+	// counters are specifically for requests coming in over TCP.
+	counters struct {
+		TCPGrpcRequestsStarted      atomic.Int64
+		TCPGrpcWebRequestsStarted   atomic.Int64
+		TCPOtherRequestsStarted     atomic.Int64
+		TCPGrpcRequestsCompleted    atomic.Int64
+		TCPGrpcWebRequestsCompleted atomic.Int64
+		TCPOtherRequestsCompleted   atomic.Int64
+	}
 }
 
 var errMixedUnauthAndAuth = errors.New("cannot use unauthenticated and auth handlers at same time")
@@ -721,13 +738,19 @@ func (ss *simpleServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	r = requestWithHost(r)
 	switch ss.getRequestType(r) {
 	case requestTypeGRPC:
+		ss.counters.TCPGrpcRequestsStarted.Add(1)
 		ss.grpcServer.ServeHTTP(w, r)
+		ss.counters.TCPGrpcRequestsCompleted.Add(1) // not reached if the handler above panics
 	case requestTypeGRPCWeb:
+		ss.counters.TCPGrpcWebRequestsStarted.Add(1)
 		ss.grpcWebServer.ServeHTTP(w, r)
+		ss.counters.TCPGrpcWebRequestsCompleted.Add(1)
 	case requestTypeNone:
 		fallthrough
 	default:
+		ss.counters.TCPOtherRequestsStarted.Add(1)
 		ss.grpcGatewayHandler.ServeHTTP(w, r)
+		ss.counters.TCPOtherRequestsCompleted.Add(1)
 	}
 }
 
@@ -861,6 +884,37 @@ func (ss *simpleServer) Stop() error {
 	return err
 }
 
+// SimpleServerStats is what (*simpleServer).Stats returns: the TCP gRPC stats plus the WebRTC gRPC stats.
+type SimpleServerStats struct {
+	TCPGrpcStats    TCPGrpcStats
+	WebRTCGrpcStats WebRTCGrpcStats
+}
+
+// TCPGrpcStats are stats for the classic tcp/http2 webserver.
+type TCPGrpcStats struct {
+	RequestsStarted        int64 // gRPC requests handed to grpcServer
+	WebRequestsStarted     int64 // gRPC-Web requests handed to grpcWebServer
+	OtherRequestsStarted   int64 // all other requests, handed to grpcGatewayHandler
+	RequestsCompleted      int64 // gRPC requests whose handler returned
+	WebRequestsCompleted   int64 // gRPC-Web requests whose handler returned
+	OtherRequestsCompleted int64 // other requests whose handler returned
+}
+
+// Stats returns the TCP counters and the webrtc server's stats. The return value of `any` is to satisfy the FTDC interface.
+func (ss *simpleServer) Stats() any {
+	return SimpleServerStats{ // each counter is loaded separately, so the set is not one atomic snapshot
+		TCPGrpcStats: TCPGrpcStats{
+			RequestsStarted:        ss.counters.TCPGrpcRequestsStarted.Load(),
+			WebRequestsStarted:     ss.counters.TCPGrpcWebRequestsStarted.Load(),
+			OtherRequestsStarted:   ss.counters.TCPOtherRequestsStarted.Load(),
+			RequestsCompleted:      ss.counters.TCPGrpcRequestsCompleted.Load(),
+			WebRequestsCompleted:   ss.counters.TCPGrpcWebRequestsCompleted.Load(),
+			OtherRequestsCompleted: ss.counters.TCPOtherRequestsCompleted.Load(),
+		},
+		WebRTCGrpcStats: ss.webrtcServer.Stats(), // NOTE(review): confirm ss.webrtcServer is never nil here
+	}
+}
+
 // A RegisterServiceHandlerFromEndpointFunc is a means to have a service attach itself to a gRPC gateway mux.
 type RegisterServiceHandlerFromEndpointFunc func(
 	ctx context.Context,
diff --git a/rpc/wrtc_server.go b/rpc/wrtc_server.go
index 5feb2518..6c240deb 100644
--- a/rpc/wrtc_server.go
+++ b/rpc/wrtc_server.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"io"
 	"sync"
+	"sync/atomic"
 
 	"github.com/viamrobotics/webrtc/v3"
 	"google.golang.org/grpc"
@@ -43,6 +44,49 @@ type webrtcServer struct {
 
 	onPeerAdded   func(pc *webrtc.PeerConnection)
 	onPeerRemoved func(pc *webrtc.PeerConnection)
+
+	counters struct {
+		PeersConnected       atomic.Int64
+		PeersDisconnected    atomic.Int64
+		PeerConnectionErrors atomic.Int64
+		HeadersProcessed     atomic.Int64
+
+		// TotalTimeConnectingMillis just counts successful connection attempts.
+		TotalTimeConnectingMillis atomic.Int64
+	}
+}
+
+// WebRTCGrpcStats are stats of the webrtc variety.
+type WebRTCGrpcStats struct {
+	PeersConnected            int64 // incremented when a connect attempt's server channel becomes ready
+	PeersDisconnected         int64 // incremented by removePeer and, per peer, by Stop
+	PeerConnectionErrors      int64 // incremented on error returns in answerAttempt.connect
+	PeersActive               int64 // PeersConnected - PeersDisconnected
+	HeadersProcessed          int64 // incremented in processHeaders
+	CallTicketsAvailable      int32 // cap(callTickets) - len(callTickets)
+	TotalTimeConnectingMillis int64 // summed over successful connection attempts only
+
+	// When the FTDC frontend is more feature rich, we can remove this and let the frontend compute
+	// the value.
+	AverageTimeConnectingMillis float64
+}
+
+// Stats returns the current counter values plus the derived PeersActive and AverageTimeConnectingMillis.
+func (srv *webrtcServer) Stats() WebRTCGrpcStats {
+	ret := WebRTCGrpcStats{
+		PeersConnected:            srv.counters.PeersConnected.Load(),
+		PeersDisconnected:         srv.counters.PeersDisconnected.Load(),
+		PeerConnectionErrors:      srv.counters.PeerConnectionErrors.Load(),
+		HeadersProcessed:          srv.counters.HeadersProcessed.Load(),
+		CallTicketsAvailable:      int32(cap(srv.callTickets) - len(srv.callTickets)),
+		TotalTimeConnectingMillis: srv.counters.TotalTimeConnectingMillis.Load(),
+	}
+	ret.PeersActive = ret.PeersConnected - ret.PeersDisconnected // NOTE(review): can Stop and removePeer count one peer twice?
+	if ret.PeersConnected > 0 { // skip the division until a peer has connected
+		ret.AverageTimeConnectingMillis = float64(ret.TotalTimeConnectingMillis) / float64(ret.PeersConnected)
+	}
+
+	return ret
 }
 
 // from grpc.
@@ -112,6 +156,7 @@ func (srv *webrtcServer) Stop() { srv.peerConnsMu.Unlock() for _, pc := range peerConns { + srv.counters.PeersDisconnected.Add(1) if err := pc.GracefulClose(); err != nil { srv.logger.Errorw("error closing peer connection", "error", err) } @@ -206,6 +251,7 @@ func (srv *webrtcServer) removePeer(peerConn *webrtc.PeerConnection) { srv.peerConnsMu.Lock() delete(srv.peerConns, peerConn) srv.peerConnsMu.Unlock() + srv.counters.PeersDisconnected.Add(1) if srv.onPeerRemoved != nil { srv.onPeerRemoved(peerConn) } diff --git a/rpc/wrtc_server_stream.go b/rpc/wrtc_server_stream.go index ee0914f3..d40a66fe 100644 --- a/rpc/wrtc_server_stream.go +++ b/rpc/wrtc_server_stream.go @@ -277,6 +277,7 @@ func (s *webrtcServerStream) processHeaders(headers *webrtcpb.RequestHeaders) { } } + s.ch.server.counters.HeadersProcessed.Add(1) s.ch.server.processHeadersMu.RLock() s.ch.server.processHeadersWorkers.Add(1) s.ch.server.processHeadersMu.RUnlock() diff --git a/rpc/wrtc_signaling_answerer.go b/rpc/wrtc_signaling_answerer.go index 6f0c51c3..18c771ee 100644 --- a/rpc/wrtc_signaling_answerer.go +++ b/rpc/wrtc_signaling_answerer.go @@ -337,6 +337,8 @@ type answerAttempt struct { // the designated WebRTC data channel is passed off to the underlying Server which // is then used as the server end of a gRPC connection. func (aa *answerAttempt) connect(ctx context.Context) (err error) { + connectionStartTime := time.Now() + // If SOCKS proxy is indicated by environment, extend WebRTC config with an // `OptionalWebRTCConfig` call to the signaling server. The usage of a SOCKS // proxy indicates that the server may need a local TURN ICE candidate to @@ -361,8 +363,10 @@ func (aa *answerAttempt) connect(ctx context.Context) (err error) { // Any error below indicates the signaling server is not present. 
if s, ok := status.FromError(err); ok && (s.Code() == codes.Unimplemented || (s.Code() == codes.InvalidArgument && s.Message() == hostNotAllowedMsg)) { + aa.server.counters.PeerConnectionErrors.Add(1) return ErrNoWebRTCSignaler } + aa.server.counters.PeerConnectionErrors.Add(1) return err } webrtcConfig = extendWebRTCConfig(&webrtcConfig, configResp.Config, true) @@ -378,6 +382,7 @@ func (aa *answerAttempt) connect(ctx context.Context) (err error) { ) if err != nil { aa.sendError(err) + aa.server.counters.PeerConnectionErrors.Add(1) return err } @@ -413,6 +418,7 @@ func (aa *answerAttempt) connect(ctx context.Context) (err error) { if aa.trickleEnabled { answer, err := pc.CreateAnswer(nil) if err != nil { + aa.server.counters.PeerConnectionErrors.Add(1) return err } @@ -491,6 +497,7 @@ func (aa *answerAttempt) connect(ctx context.Context) (err error) { err = pc.SetLocalDescription(answer) if err != nil { + aa.server.counters.PeerConnectionErrors.Add(1) return err } @@ -500,12 +507,14 @@ func (aa *answerAttempt) connect(ctx context.Context) (err error) { // candidate information. This is a Nagle's algorithm-esque batching optimization. I // think. case <-ctx.Done(): + aa.server.counters.PeerConnectionErrors.Add(1) return ctx.Err() } } encodedSDP, err := EncodeSDP(pc.LocalDescription()) if err != nil { + aa.server.counters.PeerConnectionErrors.Add(1) aa.sendError(err) return err } @@ -518,6 +527,7 @@ func (aa *answerAttempt) connect(ctx context.Context) (err error) { }, }, }); err != nil { + aa.server.counters.PeerConnectionErrors.Add(1) return err } close(initSent) @@ -574,9 +584,12 @@ func (aa *answerAttempt) connect(ctx context.Context) (err error) { case <-serverChannel.Ready(): // Happy path successful = true + aa.server.counters.PeersConnected.Add(1) + aa.server.counters.TotalTimeConnectingMillis.Add(time.Since(connectionStartTime).Milliseconds()) case <-ctx.Done(): // Timed out or signaling server was closed. 
aa.sendError(multierr.Combine(ctx.Err(), serverChannel.Close())) + aa.server.counters.PeerConnectionErrors.Add(1) return ctx.Err() }