Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use slab instead of hashmap + atomic counter #1606

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ serde = { version = "1.0.210", default-features = false, features = [
] } # Default features are disabled due to usage in no_std crates
serde_json = "1.0.128"
serde_yaml = "0.9.34"
slab = "0.4.9"
static_init = "1.0.3"
stabby = "36.1.1"
sha3 = "0.10.8"
Expand Down
1 change: 1 addition & 0 deletions zenoh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ phf = { workspace = true }
rand = { workspace = true, features = ["default"] }
serde = { workspace = true, features = ["default"] }
serde_json = { workspace = true }
slab = { workspace = true }
socket2 = { workspace = true }
uhlc = { workspace = true, features = ["default"] }
vec_map = { workspace = true }
Expand Down
9 changes: 4 additions & 5 deletions zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ use std::{
time::Duration,
};

use slab::Slab;
use tokio_util::sync::CancellationToken;
use zenoh_protocol::{
core::{ExprId, Reliability, WhatAmI, ZenohIdProto},
network::{
interest::{InterestId, InterestMode, InterestOptions},
Mapping, Push, Request, RequestId, Response, ResponseFinal,
Mapping, Push, Request, Response, ResponseFinal,
},
zenoh::RequestBody,
};
Expand Down Expand Up @@ -70,8 +71,7 @@ pub struct FaceState {
HashMap<InterestId, (Arc<CurrentInterest>, CancellationToken)>,
pub(crate) local_mappings: HashMap<ExprId, Arc<Resource>>,
pub(crate) remote_mappings: HashMap<ExprId, Arc<Resource>>,
pub(crate) next_qid: RequestId,
pub(crate) pending_queries: HashMap<RequestId, (Arc<Query>, CancellationToken)>,
pub(crate) pending_queries: Slab<(Arc<Query>, CancellationToken)>,
pub(crate) mcast_group: Option<TransportMulticast>,
pub(crate) in_interceptors: Option<Arc<InterceptorsChain>>,
pub(crate) hat: Box<dyn Any + Send + Sync>,
Expand Down Expand Up @@ -102,8 +102,7 @@ impl FaceState {
pending_current_interests: HashMap::new(),
local_mappings: HashMap::new(),
remote_mappings: HashMap::new(),
next_qid: 0,
pending_queries: HashMap::new(),
pending_queries: Slab::new(),
mcast_group,
in_interceptors,
hat,
Expand Down
25 changes: 13 additions & 12 deletions zenoh/src/net/routing/dispatcher/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,13 +290,11 @@ pub(crate) fn update_matches_query_routes(tables: &Tables, res: &Arc<Resource>)
#[inline]
fn insert_pending_query(outface: &mut Arc<FaceState>, query: Arc<Query>) -> RequestId {
let outface_mut = get_mut_unchecked(outface);
outface_mut.next_qid += 1;
let qid = outface_mut.next_qid;
outface_mut.pending_queries.insert(
qid,
(query, outface_mut.task_controller.get_cancellation_token()),
);
qid
outface_mut
.pending_queries
.insert((query, outface_mut.task_controller.get_cancellation_token()))
.try_into()
.expect("too many pending queries")
}

#[inline]
Expand Down Expand Up @@ -381,7 +379,7 @@ impl QueryCleanup {
qid,
timeout,
};
if let Some((_, cancellation_token)) = face.pending_queries.get(&qid) {
if let Some((_, cancellation_token)) = face.pending_queries.get(qid as usize) {
let c_cancellation_token = cancellation_token.clone();
face.task_controller
.spawn_with_rt(zenoh_runtime::ZRuntime::Net, async move {
Expand Down Expand Up @@ -422,7 +420,7 @@ impl Timed for QueryCleanup {
let queries_lock = zwrite!(self.tables.queries_lock);
if let Some(query) = get_mut_unchecked(&mut face)
.pending_queries
.remove(&self.qid)
.try_remove(self.qid as usize)
{
drop(queries_lock);
tracing::warn!(
Expand Down Expand Up @@ -682,7 +680,7 @@ pub(crate) fn route_send_response(
inc_res_stats!(face, rx, admin, body)
}

match face.pending_queries.get(&qid) {
match face.pending_queries.get(qid as usize) {
Some((query, _)) => {
drop(queries_lock);

Expand Down Expand Up @@ -717,7 +715,10 @@ pub(crate) fn route_send_response_final(
qid: RequestId,
) {
let queries_lock = zwrite!(tables_ref.queries_lock);
match get_mut_unchecked(face).pending_queries.remove(&qid) {
match get_mut_unchecked(face)
.pending_queries
.try_remove(qid as usize)
{
Some(query) => {
drop(queries_lock);
tracing::debug!(
Expand All @@ -735,7 +736,7 @@ pub(crate) fn route_send_response_final(

pub(crate) fn finalize_pending_queries(tables_ref: &TablesLock, face: &mut Arc<FaceState>) {
let queries_lock = zwrite!(tables_ref.queries_lock);
for (_, query) in get_mut_unchecked(face).pending_queries.drain() {
for query in get_mut_unchecked(face).pending_queries.drain() {
finalize_pending_query(query);
}
drop(queries_lock);
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ use crate::{
pub(crate) struct RuntimeState {
zid: ZenohId,
whatami: WhatAmI,
next_id: AtomicU32,
next_id: AtomicU32, // @TODO: manage rollover and uniqueness
router: Arc<Router>,
config: Notifier<Config>,
manager: TransportManager,
Expand Down