Skip to content

Commit

Permalink
Use updated Service trait
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Nov 4, 2024
1 parent 8d5bf36 commit ddf87ef
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 42 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [4.3.0] - 2024-11-04

* Use updated Service trait

## [4.2.1] - 2024-11-01

* Better rediness error handling
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-mqtt"
version = "4.2.1"
version = "4.3.0"
authors = ["ntex contributors <[email protected]>"]
description = "Client and Server framework for MQTT v5 and v3.1.1 protocols"
documentation = "https://docs.rs/ntex-mqtt"
Expand All @@ -17,8 +17,8 @@ features = ["ntex/tokio"]
[dependencies]
ntex-io = "2"
ntex-net = "2"
ntex-util = "2"
ntex-service = "3.2.1"
ntex-util = "2.5"
ntex-service = "3.3"
ntex-bytes = "0.1"
ntex-codec = "0.6"
ntex-router = "0.5"
Expand Down
48 changes: 37 additions & 11 deletions src/inflight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use std::{cell::Cell, future::poll_fn, rc::Rc, task::Context, task::Poll};

use ntex_service::{Middleware, Service, ServiceCtx};
use ntex_util::task::LocalWaker;
use ntex_util::{future::join, future::select, task::LocalWaker};

/// Trait for types that could be sized
pub trait SizedRequest {
Expand Down Expand Up @@ -73,15 +73,21 @@ where
type Response = S::Response;
type Error = S::Error;

ntex_service::forward_shutdown!(service);

#[inline]
async fn ready(&self, ctx: ServiceCtx<'_, Self>) -> Result<(), S::Error> {
ctx.ready(&self.service).await?;
if !self.count.is_available() {
let (_, res) = join(self.count.available(), ctx.ready(&self.service)).await;
res
} else {
ctx.ready(&self.service).await
}
}

// check if we have capacity
self.count.available().await;
Ok(())
#[inline]
async fn not_ready(&self) {
if self.count.is_available() {
select(self.count.unavailable(), self.service.not_ready()).await;
}
}

#[inline]
Expand All @@ -92,6 +98,8 @@ where
drop(task_guard);
result
}

ntex_service::forward_shutdown!(service);
}

struct Counter(Rc<CounterInner>);
Expand Down Expand Up @@ -119,9 +127,18 @@ impl Counter {
CounterGuard::new(size, self.0.clone())
}

fn is_available(&self) -> bool {
(self.0.max_cap == 0 || self.0.cur_cap.get() < self.0.max_cap)
&& (self.0.max_size == 0 || self.0.cur_size.get() <= self.0.max_size)
}

async fn available(&self) {
poll_fn(|cx| if self.0.available(cx) { Poll::Ready(()) } else { Poll::Pending }).await
}

async fn unavailable(&self) {
poll_fn(|cx| if self.0.available(cx) { Poll::Pending } else { Poll::Ready(()) }).await
}
}

struct CounterGuard(u32, Rc<CounterInner>);
Expand All @@ -143,8 +160,14 @@ impl Drop for CounterGuard {

impl CounterInner {
fn inc(&self, size: u32) {
self.cur_cap.set(self.cur_cap.get() + 1);
self.cur_size.set(self.cur_size.get() + size as usize);
let cur_cap = self.cur_cap.get() + 1;
self.cur_cap.set(cur_cap);
let cur_size = self.cur_size.get() + size as usize;
self.cur_size.set(cur_size);

if cur_cap == self.max_cap || cur_size >= self.max_size {
self.task.wake();
}
}

fn dec(&self, size: u32) {
Expand All @@ -161,12 +184,12 @@ impl CounterInner {
}

fn available(&self, cx: &Context<'_>) -> bool {
self.task.register(cx.waker());
if (self.max_cap == 0 || self.cur_cap.get() < self.max_cap)
&& (self.max_size == 0 || self.cur_size.get() <= self.max_size)
{
true
} else {
self.task.register(cx.waker());
false
}

Check failure on line 194 in src/inflight.rs

View workflow job for this annotation

GitHub Actions / clippy

this if-then-else expression returns a bool literal

error: this if-then-else expression returns a bool literal --> src/inflight.rs:188:9 | 188 | / if (self.max_cap == 0 || self.cur_cap.get() < self.max_cap) 189 | | && (self.max_size == 0 || self.cur_size.get() <= self.max_size) 190 | | { 191 | | true 192 | | } else { 193 | | false 194 | | } | |_________^ help: you can reduce it to: `(self.max_cap == 0 || self.cur_cap.get() < self.max_cap) && (self.max_size == 0 || self.cur_size.get() <= self.max_size)` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_bool note: the lint level is defined here --> src/lib.rs:1:27 | 1 | #![deny(rust_2018_idioms, warnings, unreachable_pub)] | ^^^^^^^^ = note: `#[deny(clippy::needless_bool)]` implied by `#[deny(warnings)]`
}
Expand Down Expand Up @@ -261,13 +284,14 @@ mod tests {
.await
}

async fn call(&self, _: (), _: ServiceCtx<'_, Self>) -> Result<(), ()> {
async fn call(&self, _: (), ctx: ServiceCtx<'_, Self>) -> Result<(), ()> {

Check failure on line 287 in src/inflight.rs

View workflow job for this annotation

GitHub Actions / stable - aarch64-apple-darwin

unused variable: `ctx`

Check failure on line 287 in src/inflight.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused variable: `ctx`

Check failure on line 287 in src/inflight.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused variable: `ctx`

Check failure on line 287 in src/inflight.rs

View workflow job for this annotation

GitHub Actions / 1.75.0 - x86_64-unknown-linux-gnu

unused variable: `ctx`

Check failure on line 287 in src/inflight.rs

View workflow job for this annotation

GitHub Actions / nightly - aarch64-apple-darwin

unused variable: `ctx`

Check failure on line 287 in src/inflight.rs

View workflow job for this annotation

GitHub Actions / stable - aarch64-apple-darwin

unused variable: `ctx`

Check failure on line 287 in src/inflight.rs

View workflow job for this annotation

GitHub Actions / nightly - aarch64-apple-darwin

unused variable: `ctx`

Check failure on line 287 in src/inflight.rs

View workflow job for this annotation

GitHub Actions / 1.75.0 - x86_64-unknown-linux-gnu

unused variable: `ctx`

Check failure on line 287 in src/inflight.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-unknown-linux-gnu

unused variable: `ctx`

Check failure on line 287 in src/inflight.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-unknown-linux-gnu

unused variable: `ctx`

Check failure on line 287 in src/inflight.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-pc-windows-msvc

unused variable: `ctx`

Check failure on line 287 in src/inflight.rs

View workflow job for this annotation

GitHub Actions / stable - x86_64-pc-windows-msvc

unused variable: `ctx`

Check failure on line 287 in src/inflight.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-pc-windows-msvc

unused variable: `ctx`

Check failure on line 287 in src/inflight.rs

View workflow job for this annotation

GitHub Actions / nightly - x86_64-pc-windows-msvc

unused variable: `ctx`
let fut = sleep(self.dur);
self.cnt.set(true);
self.waker.wake();

let _ = fut.await;
self.cnt.set(false);
self.waker.wake();
Ok::<_, ()>(())
}
}
Expand All @@ -286,6 +310,7 @@ mod tests {
}))
.bind();
assert_eq!(lazy(|cx| srv.poll_ready(cx)).await, Poll::Ready(Ok(())));
assert_eq!(lazy(|cx| srv.poll_not_ready(cx)).await, Poll::Pending);

let srv2 = srv.clone();
ntex_util::spawn(async move {
Expand All @@ -300,6 +325,7 @@ mod tests {
let _ = poll_fn(|cx| srv2.poll_ready(cx)).await;
let _ = tx.send(());
});
assert_eq!(poll_fn(|cx| srv.poll_ready(cx)).await, Ok(()));

let _ = rx.await;
}
Expand Down
58 changes: 36 additions & 22 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ pin_project_lite::pin_project! {
bitflags::bitflags! {
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
struct Flags: u8 {
const READY_ERR = 0b00001;
const IO_ERR = 0b00010;
const KA_ENABLED = 0b00100;
const KA_TIMEOUT = 0b01000;
const READ_TIMEOUT = 0b10000;
const READY_ERR = 0b000001;
const IO_ERR = 0b000010;
const KA_ENABLED = 0b000100;
const KA_TIMEOUT = 0b001000;
const READ_TIMEOUT = 0b010000;
const READY = 0b100000;
}
}

Expand Down Expand Up @@ -426,26 +427,39 @@ where
}
}

fn check_error(&mut self) -> PollService<U> {
// check for errors
let mut state = self.state.borrow_mut();
if let Some(err) = state.error.take() {
log::trace!("{}: Error occured, stopping dispatcher", self.io.tag());
self.st = IoDispatcherState::Stop;
match err {
IoDispatcherError::Encoder(err) => {
PollService::Item(DispatchItem::EncoderError(err))
}
IoDispatcherError::Service(err) => {
state.error = Some(IoDispatcherError::Service(err));
PollService::Continue
}
}
} else {
PollService::Ready
}
}

fn poll_service(&mut self, cx: &mut Context<'_>) -> Poll<PollService<U>> {
// check service readiness
if self.flags.contains(Flags::READY) {
if self.service.poll_not_ready(cx).is_pending() {
return Poll::Ready(self.check_error());
}
self.flags.remove(Flags::READY);
}

match self.service.poll_ready(cx) {
Poll::Ready(Ok(_)) => {
// check for errors
let mut state = self.state.borrow_mut();
Poll::Ready(if let Some(err) = state.error.take() {
log::trace!("{}: Error occured, stopping dispatcher", self.io.tag());
self.st = IoDispatcherState::Stop;
match err {
IoDispatcherError::Encoder(err) => {
PollService::Item(DispatchItem::EncoderError(err))
}
IoDispatcherError::Service(err) => {
state.error = Some(IoDispatcherError::Service(err));
PollService::Continue
}
}
} else {
PollService::Ready
})
self.flags.insert(Flags::READY);
Poll::Ready(self.check_error())
}
// pause io read task
Poll::Pending => {
Expand Down
10 changes: 10 additions & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ where
ready2
}

#[inline]
async fn not_ready(&self) {
select(self.handlers.0.not_ready(), self.handlers.1.not_ready()).await;
}

#[inline]
async fn shutdown(&self) {
self.handlers.0.shutdown().await;
Expand Down Expand Up @@ -296,6 +301,11 @@ where
Service::<IoBoxed>::ready(self, ctx).await
}

#[inline]
async fn not_ready(&self) {
Service::<IoBoxed>::not_ready(self).await
}

#[inline]
async fn shutdown(&self) {
Service::<IoBoxed>::shutdown(self).await
Expand Down
7 changes: 6 additions & 1 deletion src/v3/client/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{cell::RefCell, marker::PhantomData, num::NonZeroU16, rc::Rc};

use ntex_io::DispatchItem;
use ntex_service::{Pipeline, Service, ServiceCtx};
use ntex_util::future::{join, Either};
use ntex_util::future::{join, select, Either};
use ntex_util::{services::inflight::InFlightService, HashSet};

use crate::error::{HandshakeError, MqttError, ProtocolError};
Expand Down Expand Up @@ -86,6 +86,11 @@ where
}
}

#[inline]
async fn not_ready(&self) {
select(self.publish.not_ready(), self.inner.control.not_ready()).await;
}

async fn shutdown(&self) {
self.inner.sink.close();
let _ = Pipeline::new(&self.inner.control).call(Control::closed()).await;
Expand Down
7 changes: 6 additions & 1 deletion src/v3/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use ntex_io::DispatchItem;
use ntex_service::{Pipeline, Service, ServiceCtx, ServiceFactory};
use ntex_util::services::buffer::{BufferService, BufferServiceError};
use ntex_util::services::inflight::InFlightService;
use ntex_util::{future::join, HashSet};
use ntex_util::{future::join, future::select, HashSet};

use crate::error::{HandshakeError, MqttError, ProtocolError};
use crate::types::QoS;
Expand Down Expand Up @@ -145,6 +145,11 @@ where
}
}

#[inline]
async fn not_ready(&self) {
select(self.publish.not_ready(), self.inner.control.not_ready()).await;
}

async fn shutdown(&self) {
self.inner.sink.close();
let _ = Pipeline::new(&self.inner.control).call(Control::closed()).await;
Expand Down
21 changes: 20 additions & 1 deletion src/v3/router.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::rc::Rc;
use std::{future::poll_fn, future::Future, pin::Pin, rc::Rc, task::Poll};

use ntex_router::{IntoPattern, RouterBuilder};
use ntex_service::boxed::{self, BoxService, BoxServiceFactory};
Expand Down Expand Up @@ -116,6 +116,25 @@ impl<Err> Service<Publish> for RouterService<Err> {
ctx.ready(&self.default).await
}

#[inline]
async fn not_ready(&self) {
let mut futs = Vec::with_capacity(self.handlers.len() + 1);
for hnd in &self.handlers {
futs.push(Box::pin(hnd.not_ready()));
}
futs.push(Box::pin(self.default.not_ready()));

poll_fn(|cx| {
for hnd in &mut futs {
if Pin::new(hnd).poll(cx).is_ready() {
return Poll::Ready(());
}
}
Poll::Pending
})
.await;
}

async fn call(
&self,
mut req: Publish,
Expand Down
7 changes: 6 additions & 1 deletion src/v5/client/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{cell::RefCell, marker::PhantomData, num::NonZeroU16, rc::Rc};
use ntex_bytes::ByteString;
use ntex_io::DispatchItem;
use ntex_service::{Pipeline, Service, ServiceCtx};
use ntex_util::{future::join, future::Either, HashMap, HashSet};
use ntex_util::{future::join, future::select, future::Either, HashMap, HashSet};

use crate::error::{HandshakeError, MqttError, ProtocolError};
use crate::types::packet_type;
Expand Down Expand Up @@ -114,6 +114,11 @@ where
}
}

#[inline]
async fn not_ready(&self) {
select(self.publish.not_ready(), self.inner.control.not_ready()).await;
}

async fn shutdown(&self) {
self.inner.sink.drop_sink();
let _ = Pipeline::new(&self.inner.control).call(Control::closed()).await;
Expand Down
7 changes: 6 additions & 1 deletion src/v5/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ntex_io::DispatchItem;
use ntex_service::{self as service, Pipeline, Service, ServiceCtx, ServiceFactory};
use ntex_util::services::inflight::InFlightService;
use ntex_util::services::{buffer::BufferService, buffer::BufferServiceError};
use ntex_util::{future::join, HashMap, HashSet};
use ntex_util::{future::join, future::select, HashMap, HashSet};

use crate::error::{HandshakeError, MqttError, ProtocolError};
use crate::types::QoS;
Expand Down Expand Up @@ -155,6 +155,11 @@ where
}
}

#[inline]
async fn not_ready(&self) {
select(self.publish.not_ready(), self.inner.control.not_ready()).await;
}

async fn shutdown(&self) {
self.inner.sink.drop_sink();
let _ = Pipeline::new(&self.inner.control).call(Control::closed()).await;
Expand Down
Loading

0 comments on commit ddf87ef

Please sign in to comment.