Skip to content

Commit

Permalink
add recycle label, unify Addr
Browse files Browse the repository at this point in the history
  • Loading branch information
blind-oracle committed Aug 28, 2024
1 parent b9dc2ef commit 685e00d
Showing 1 changed file with 54 additions and 53 deletions.
107 changes: 54 additions & 53 deletions src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tokio::{
use tokio_rustls::TlsAcceptor;
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tower_service::Service;
use tracing::{debug, warn};
use tracing::{debug, info, warn};
use uuid::Uuid;

use super::{AsyncCounter, Error, Stats, ALPN_ACME};
Expand All @@ -47,17 +47,17 @@ impl<T: AsyncRead + AsyncWrite + Send + Sync + Unpin> AsyncReadWrite for T {}

#[derive(Clone)]
pub struct Metrics {
pub conns_open: IntGaugeVec,
pub requests: IntCounterVec,
pub bytes_sent: IntCounterVec,
pub bytes_rcvd: IntCounterVec,
pub conn_duration: HistogramVec,
pub requests_per_conn: HistogramVec,
conns_open: IntGaugeVec,
requests: IntCounterVec,
bytes_sent: IntCounterVec,
bytes_rcvd: IntCounterVec,
conn_duration: HistogramVec,
requests_per_conn: HistogramVec,
}

impl Metrics {
pub fn new(registry: &Registry) -> Self {
const LABELS: &[&str] = &["addr", "tls", "family", "forced_close"];
const LABELS: &[&str] = &["addr", "tls", "family", "forced_close", "recycled"];

Self {
conns_open: register_int_gauge_vec_with_registry!(
Expand Down Expand Up @@ -121,6 +121,7 @@ pub struct Options {
pub http2_keepalive_interval: Duration,
pub http2_keepalive_timeout: Duration,
pub grace_period: Duration,
pub max_requests_per_conn: Option<u64>,
}

// TLS information about the connection
Expand Down Expand Up @@ -158,7 +159,7 @@ impl TryFrom<&ServerConnection> for TlsInfo {
pub struct ConnInfo {
pub id: Uuid,
pub accepted_at: Instant,
pub remote_addr: RemoteAddr,
pub remote_addr: Addr,
pub traffic: Arc<Stats>,
pub req_count: AtomicU64,
pub close: CancellationToken,
Expand All @@ -180,55 +181,32 @@ pub enum Listener {
}

impl Listener {
async fn accept(&self) -> Result<(Box<dyn AsyncReadWrite>, RemoteAddr), io::Error> {
async fn accept(&self) -> Result<(Box<dyn AsyncReadWrite>, Addr), io::Error> {
Ok(match self {
Self::Tcp(v) => {
let x = v.accept().await?;
// Disable Nagle's algo
x.0.set_nodelay(true)?;
(Box::new(x.0), RemoteAddr::Tcp(x.1))
(Box::new(x.0), Addr::Tcp(x.1))
}
Self::Unix(v) => {
let x = v.accept().await?;
(
Box::new(x.0),
RemoteAddr::Unix(
x.1.as_pathname()
.map(|x| x.to_string_lossy().to_string())
.unwrap_or_default(),
),
Addr::Unix(x.1.as_pathname().map(|x| x.into()).unwrap_or_default()),
)
}
})
}
}

#[derive(Debug, Clone)]
pub enum LocalAddr {
pub enum Addr {
Tcp(SocketAddr),
Unix(PathBuf),
}

impl Display for LocalAddr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self {
Self::Tcp(v) => v.to_string(),
Self::Unix(v) => v.to_string_lossy().to_string(),
}
)
}
}

#[derive(Debug, Clone)]
pub enum RemoteAddr {
Tcp(SocketAddr),
Unix(String),
}

impl RemoteAddr {
impl Addr {
pub const fn family(&self) -> &str {
match self {
Self::Tcp(v) => {
Expand All @@ -250,18 +228,22 @@ impl RemoteAddr {
}
}

impl Display for RemoteAddr {
impl Display for Addr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Tcp(v) => write!(f, "{v}"),
Self::Unix(v) => write!(f, "{v}"),
}
write!(
f,
"{}",
match self {
Self::Tcp(v) => v.to_string(),
Self::Unix(v) => v.to_string_lossy().to_string(),
}
)
}
}

struct Conn {
addr: LocalAddr,
remote_addr: RemoteAddr,
addr: Addr,
remote_addr: Addr,
router: Router,
builder: Builder<TokioExecutor>,
token: CancellationToken,
Expand Down Expand Up @@ -328,6 +310,7 @@ impl Conn {
}, // Is TLS
self.remote_addr.family(),
"no",
"no",
];

self.metrics
Expand All @@ -353,9 +336,15 @@ impl Conn {
let (sent, rcvd) = (stats.sent(), stats.rcvd());
let dur = accepted_at.elapsed().as_secs_f64();
let reqs = conn_info.req_count.load(Ordering::SeqCst);

// force-closed
if self.token_close.is_cancelled() {
labels[3] = "yes";
}
// recycled
if self.token.is_cancelled() {
labels[4] = "yes";
}

self.metrics
.conns_open
Expand All @@ -380,7 +369,8 @@ impl Conn {
.observe(reqs as f64);

debug!(
"{self}: connection closed (rcvd: {rcvd}, sent: {sent}, reqs: {reqs}, duration: {dur}, forced close: {})",
"{self}: connection closed (rcvd: {rcvd}, sent: {sent}, reqs: {reqs}, duration: {dur}, graceful: {}, forced close: {})",
self.token.is_cancelled(),
self.token_close.is_cancelled(),
);

Expand Down Expand Up @@ -422,14 +412,21 @@ impl Conn {

// Convert router to Hyper service
let service = hyper::service::service_fn(move |mut request: Request<Incoming>| {
conn_info.req_count.fetch_add(1, Ordering::SeqCst);
let conn_count = conn_info.req_count.fetch_add(1, Ordering::SeqCst);

// Inject connection information
request.extensions_mut().insert(conn_info.clone());
if let Some(v) = &tls_info {
request.extensions_mut().insert(v.clone());
}

// Check if we need to gracefully shutdown this connection
if let Some(v) = self.options.max_requests_per_conn {
if conn_count + 1 >= v {
self.token.cancel();
}
}

// Serve the request
self.router.clone().call(request)
});
Expand Down Expand Up @@ -475,7 +472,7 @@ impl Conn {

// Listens for new connections on addr with an optional TLS and serves provided Router
pub struct Server {
addr: LocalAddr,
addr: Addr,
router: Router,
tracker: TaskTracker,
options: Options,
Expand All @@ -485,7 +482,7 @@ pub struct Server {

impl Server {
pub fn new(
addr: LocalAddr,
addr: Addr,
router: Router,
options: Options,
metrics: Metrics,
Expand All @@ -503,10 +500,8 @@ impl Server {

fn listen(&self) -> Result<Listener, Error> {
Ok(match &self.addr {
LocalAddr::Tcp(v) => Listener::Tcp(listen_tcp_backlog(*v, self.options.backlog)?),
LocalAddr::Unix(v) => {
Listener::Unix(listen_unix_backlog(v.clone(), self.options.backlog)?)
}
Addr::Tcp(v) => Listener::Tcp(listen_tcp_backlog(*v, self.options.backlog)?),
Addr::Unix(v) => Listener::Unix(listen_unix_backlog(v.clone(), self.options.backlog)?),
})
}

Expand Down Expand Up @@ -553,6 +548,12 @@ impl Server {
}

warn!("Server {}: shut down", self.addr);

// Remove the socket
if let Addr::Unix(v) = &self.addr {
let _ = std::fs::remove_file(v);
}

return Ok(());
},

Expand Down Expand Up @@ -587,7 +588,7 @@ impl Server {
// Spawn a task to handle connection & track it
self.tracker.spawn(async move {
if let Err(e) = conn.handle(stream).await {
warn!("Server {}: {}: failed to handle connection: {e:#}", conn.addr, remote_addr);
info!("Server {}: {}: failed to handle connection: {e:#}", conn.addr, remote_addr);
}

debug!(
Expand Down

0 comments on commit 685e00d

Please sign in to comment.