Skip to content

Commit

Permalink
Merge branch 'develop' into fix/5285
Browse files Browse the repository at this point in the history
  • Loading branch information
jcnelson authored Nov 21, 2024
2 parents 212914b + c79fa6c commit 6ddfd6c
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 40 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/p2p-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ jobs:
- net::tests::convergence::test_walk_star_15_org_biased
- net::tests::convergence::test_walk_inbound_line_15
- net::api::tests::postblock_proposal::test_try_make_response
- net::server::tests::test_http_10_threads_getinfo
- net::server::tests::test_http_10_threads_getblock
- net::server::tests::test_http_too_many_clients
- net::server::tests::test_http_slow_client
steps:
## Setup test environment
- name: Setup Test Environment
Expand Down
52 changes: 23 additions & 29 deletions stackslib/src/net/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ pub struct ConversationHttp {
pending_request: Option<ReplyHandleHttp>,
/// outstanding response
pending_response: Option<StacksHttpResponse>,
/// whether or not there's an error response pending
pending_error_response: bool,
/// how much data to buffer (i.e. the socket's send buffer size)
socket_send_buffer_size: u32,
}
Expand Down Expand Up @@ -166,7 +164,6 @@ impl ConversationHttp {
canonical_stacks_tip_height: None,
pending_request: None,
pending_response: None,
pending_error_response: false,
keep_alive: true,
total_request_count: 0,
total_reply_count: 0,
Expand Down Expand Up @@ -228,15 +225,6 @@ impl ConversationHttp {
);
return Err(net_error::InProgress);
}
if self.pending_error_response {
test_debug!(
"{:?},id={}: Error response is inflight",
&self.peer_host,
self.conn_id
);
return Err(net_error::InProgress);
}

let handle = self.start_request(req)?;

self.pending_request = Some(handle);
Expand All @@ -255,12 +243,12 @@ impl ConversationHttp {
);
return Err(net_error::InProgress);
}
if self.pending_error_response {
// error already in-flight
return Ok(());
}
let (mut preamble, body_contents) = res.try_into_contents()?;
preamble.content_length = body_contents.content_length();
preamble.keep_alive = false;

let (preamble, body_contents) = res.try_into_contents()?;
// account for the request
self.total_request_count += 1;

// make the relay handle. There may not have been a valid request in the first place, so
// we'll use a relay handle (not a reply handle) to push out the error.
Expand All @@ -269,7 +257,6 @@ impl ConversationHttp {
// queue up the HTTP headers, and then stream back the body.
preamble.consensus_serialize(&mut reply)?;
self.reply_streams.push_back((reply, body_contents, false));
self.pending_error_response = true;
Ok(())
}

Expand Down Expand Up @@ -388,11 +375,12 @@ impl ConversationHttp {
if broken || (drained_handle && drained_stream) {
// done with this stream
test_debug!(
"{:?}: done with stream (broken={}, drained_handle={}, drained_stream={})",
"{:?}: done with stream (broken={}, drained_handle={}, drained_stream={}, do_keep_alive={})",
&self,
broken,
drained_handle,
drained_stream
drained_stream,
do_keep_alive,
);
self.total_reply_count += 1;
self.reply_streams.pop_front();
Expand Down Expand Up @@ -482,6 +470,14 @@ impl ConversationHttp {

/// Is the connection idle?
pub fn is_idle(&self) -> bool {
test_debug!(
"{:?} is_idle? {},{},{},{}",
self,
self.pending_response.is_none(),
self.connection.inbox_len(),
self.connection.outbox_len(),
self.reply_streams.len()
);
self.pending_response.is_none()
&& self.connection.inbox_len() == 0
&& self.connection.outbox_len() == 0
Expand All @@ -491,9 +487,13 @@ impl ConversationHttp {
/// Is the conversation out of pending data?
/// Don't consider it drained if we haven't received anything yet
pub fn is_drained(&self) -> bool {
((self.total_request_count > 0 && self.total_reply_count > 0)
|| self.pending_error_response)
&& self.is_idle()
test_debug!(
"{:?} is_drained? {},{}",
self,
self.total_request_count,
self.total_reply_count
);
self.total_request_count > 0 && self.total_reply_count > 0 && self.is_idle()
}

/// Should the connection be kept alive even if drained?
Expand Down Expand Up @@ -523,11 +523,6 @@ impl ConversationHttp {
&mut self,
node: &mut StacksNodeState,
) -> Result<Vec<StacksMessageType>, net_error> {
// if we have an in-flight error, then don't take any more requests.
if self.pending_error_response {
return Ok(vec![]);
}

// handle in-bound HTTP request(s)
let num_inbound = self.connection.inbox_len();
let mut ret = vec![];
Expand Down Expand Up @@ -568,7 +563,6 @@ impl ConversationHttp {
}
StacksHttpMessage::Error(path, resp) => {
// new request, but resulted in an error when parsing it
self.total_request_count += 1;
self.last_request_timestamp = get_epoch_time_secs();
let start_time = Instant::now();
self.reply_error(resp)?;
Expand Down
15 changes: 4 additions & 11 deletions stackslib/src/net/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ impl HttpPeer {
/// Deregister a socket/event pair
#[cfg_attr(test, mutants::skip)]
pub fn deregister_http(&mut self, network_state: &mut NetworkState, event_id: usize) -> () {
test_debug!("Remove HTTP event {}", event_id);
self.peers.remove(&event_id);

match self.sockets.remove(&event_id) {
Expand Down Expand Up @@ -456,7 +457,7 @@ impl HttpPeer {
"Failed to flush HTTP 400 to socket {:?}: {:?}",
&client_sock, &e
);
convo_dead = true;
// convo_dead = true;
}
}
Err(e) => {
Expand Down Expand Up @@ -559,19 +560,11 @@ impl HttpPeer {
let mut to_remove = vec![];
let mut msgs = vec![];
for event_id in &poll_state.ready {
if !self.sockets.contains_key(&event_id) {
let Some(client_sock) = self.sockets.get_mut(&event_id) else {
debug!("Rogue socket event {}", event_id);
to_remove.push(*event_id);
continue;
}

let client_sock_opt = self.sockets.get_mut(&event_id);
if client_sock_opt.is_none() {
debug!("No such socket event {}", event_id);
to_remove.push(*event_id);
continue;
}
let client_sock = client_sock_opt.unwrap();
};

match self.peers.get_mut(event_id) {
Some(ref mut convo) => {
Expand Down

0 comments on commit 6ddfd6c

Please sign in to comment.