From e3df0891de1ecdef80d4e39d0d7205dd67b38c9c Mon Sep 17 00:00:00 2001 From: Isabel Atkinson Date: Fri, 22 Nov 2024 16:35:27 -0500 Subject: [PATCH] RUST-1222 Cancel in-progress operations when SDAM heartbeats time out (#1249) --- src/client/auth/aws.rs | 4 +- src/client/auth/oidc.rs | 2 +- src/client/auth/plain.rs | 2 +- src/client/auth/scram.rs | 6 +- src/client/auth/x509.rs | 2 +- src/client/executor.rs | 7 +- src/cmap/conn.rs | 62 +++-- src/cmap/conn/pooled.rs | 65 ++++-- src/cmap/conn/wire/message.rs | 48 +++- src/cmap/establish.rs | 13 +- src/cmap/establish/handshake.rs | 4 +- src/cmap/test.rs | 89 ++++---- src/cmap/test/event.rs | 16 +- src/cmap/test/file.rs | 11 +- src/cmap/test/integration.rs | 2 +- src/cmap/worker.rs | 38 +++- src/event/cmap.rs | 34 ++- src/hello.rs | 15 +- src/operation.rs | 5 + src/sdam/monitor.rs | 4 +- src/test/spec/handshake.rs | 1 + .../README.md | 27 +++ .../cmap-format/README.md | 167 ++++++++++++++ .../cmap-format/README.rst | 215 ------------------ .../pool-checkin-make-available.json | 6 +- .../pool-checkin-make-available.yml | 2 + .../cmap-format/pool-checkout-connection.json | 6 +- .../cmap-format/pool-checkout-connection.yml | 2 + ...kout-custom-maxConnecting-is-enforced.json | 2 +- ...ckout-custom-maxConnecting-is-enforced.yml | 50 ++++ .../pool-checkout-error-closed.json | 4 +- .../pool-checkout-error-closed.yml | 2 + ...ol-checkout-maxConnecting-is-enforced.json | 4 +- ...ool-checkout-maxConnecting-is-enforced.yml | 4 +- .../pool-checkout-maxConnecting-timeout.json | 3 +- .../pool-checkout-maxConnecting-timeout.yml | 1 + ...-minPoolSize-connection-maxConnecting.json | 88 +++++++ ...t-minPoolSize-connection-maxConnecting.yml | 63 +++++ ...out-returned-connection-maxConnecting.json | 14 +- ...kout-returned-connection-maxConnecting.yml | 22 +- .../pool-clear-clears-waitqueue.json | 12 +- .../pool-clear-clears-waitqueue.yml | 4 + ...lear-interrupting-pending-connections.json | 77 +++++++ ...clear-interrupting-pending-connections.yml | 42 ++++ .../cmap-format/pool-clear-ready.json | 7 +- .../cmap-format/pool-clear-ready.yml | 3 + ...e-run-interruptInUseConnections-false.json | 81 +++++++ ...le-run-interruptInUseConnections-false.yml | 48 ++++ .../cmap-format/pool-clear.json | 67 ------ .../cmap-format/pool-clear.yml | 34 --- .../pool-create-min-size-error.json | 8 +- .../pool-create-min-size-error.yml | 4 +- .../cmap-format/pool-ready.json | 6 +- .../cmap-format/pool-ready.yml | 2 + .../cmap-format/wait-queue-timeout.json | 6 +- .../cmap-format/wait-queue-timeout.yml | 2 + src/trace/connection.rs | 22 +- 57 files changed, 1034 insertions(+), 503 deletions(-) create mode 100644 src/test/spec/json/connection-monitoring-and-pooling/README.md create mode 100644 src/test/spec/json/connection-monitoring-and-pooling/cmap-format/README.md delete mode 100644 src/test/spec/json/connection-monitoring-and-pooling/cmap-format/README.rst create mode 100644 src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-custom-maxConnecting-is-enforced.yml create mode 100644 src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-minPoolSize-connection-maxConnecting.json create mode 100644 src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-minPoolSize-connection-maxConnecting.yml create mode 100644 src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-interrupting-pending-connections.json create mode 100644 src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-interrupting-pending-connections.yml create mode 100644 src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-schedule-run-interruptInUseConnections-false.json create mode 100644 src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-schedule-run-interruptInUseConnections-false.yml delete mode 100644 src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear.json delete mode 100644 src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear.yml diff --git a/src/client/auth/aws.rs b/src/client/auth/aws.rs index 8e63b4c34..6ae302461 100644 --- a/src/client/auth/aws.rs +++ b/src/client/auth/aws.rs @@ -83,7 +83,7 @@ async fn authenticate_stream_inner( ); let client_first = sasl_start.into_command(); - let server_first_response = conn.send_command(client_first, None).await?; + let server_first_response = conn.send_message(client_first).await?; let server_first = ServerFirst::parse(server_first_response.auth_response_body(MECH_NAME)?)?; server_first.validate(&nonce)?; @@ -135,7 +135,7 @@ async fn authenticate_stream_inner( let client_second = sasl_continue.into_command(); - let server_second_response = conn.send_command(client_second, None).await?; + let server_second_response = conn.send_message(client_second).await?; let server_second = SaslResponse::parse( MECH_NAME, server_second_response.auth_response_body(MECH_NAME)?, diff --git a/src/client/auth/oidc.rs b/src/client/auth/oidc.rs index 720f4da47..b033c0f4c 100644 --- a/src/client/auth/oidc.rs +++ b/src/client/auth/oidc.rs @@ -880,7 +880,7 @@ async fn send_sasl_command( conn: &mut Connection, command: crate::cmap::Command, ) -> Result { - let response = conn.send_command(command, None).await?; + let response = conn.send_message(command).await?; SaslResponse::parse( MONGODB_OIDC_STR, response.auth_response_body(MONGODB_OIDC_STR)?, diff --git a/src/client/auth/plain.rs b/src/client/auth/plain.rs index 081659bbd..4b1ae5e2f 100644 --- a/src/client/auth/plain.rs +++ b/src/client/auth/plain.rs @@ -35,7 +35,7 @@ pub(crate) async fn authenticate_stream( ) .into_command(); - let response = conn.send_command(sasl_start, None).await?; + let response = conn.send_message(sasl_start).await?; let sasl_response = SaslResponse::parse("PLAIN", response.auth_response_body("PLAIN")?)?; if !sasl_response.done { diff --git a/src/client/auth/scram.rs b/src/client/auth/scram.rs index 00f7c7dcb..82a6a46ca 100644 --- a/src/client/auth/scram.rs +++ b/src/client/auth/scram.rs @@ -151,7 +151,7 @@ impl ScramVersion { let command = client_first.to_command(self); - let server_first = conn.send_command(command, None).await?; + let server_first = conn.send_message(command).await?; Ok(FirstRound { client_first, @@ -215,7 +215,7 @@ impl ScramVersion { let command = client_final.to_command(); - let server_final_response = conn.send_command(command, None).await?; + let server_final_response = conn.send_message(command).await?; let server_final = ServerFinal::parse(server_final_response.auth_response_body("SCRAM")?)?; server_final.validate(salted_password.as_slice(), &client_final, self)?; @@ -231,7 +231,7 @@ impl ScramVersion { ); let command = noop.into_command(); - let server_noop_response = conn.send_command(command, None).await?; + let server_noop_response = conn.send_message(command).await?; let server_noop_response_document: Document = server_noop_response.auth_response_body("SCRAM")?; diff --git a/src/client/auth/x509.rs b/src/client/auth/x509.rs index a0d596e47..9695070c5 100644 --- a/src/client/auth/x509.rs +++ b/src/client/auth/x509.rs @@ -43,7 +43,7 @@ pub(crate) async fn send_client_first( ) -> Result { let command = build_client_first(credential, server_api); - conn.send_command(command, None).await + conn.send_message(command).await } /// Performs X.509 authentication for a given stream. diff --git a/src/client/executor.rs b/src/client/executor.rs index 640084ee3..cd91769a1 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -614,13 +614,12 @@ impl Client { } let should_redact = cmd.should_redact(); - let should_compress = cmd.should_compress(); let cmd_name = cmd.name.clone(); let target_db = cmd.target_db.clone(); - #[allow(unused_mut)] - let mut message = Message::from_command(cmd, Some(request_id))?; + let mut message = Message::try_from(cmd)?; + message.request_id = Some(request_id); #[cfg(feature = "in-use-encryption")] { let guard = self.inner.csfle.read().await; @@ -652,7 +651,7 @@ impl Client { .await; let start_time = Instant::now(); - let command_result = match connection.send_message(message, should_compress).await { + let command_result = match connection.send_message(message).await { Ok(response) => { async fn handle_response( client: &Client, diff --git a/src/cmap/conn.rs b/src/cmap/conn.rs index 630c415b3..c5b9e11bf 100644 --- a/src/cmap/conn.rs +++ b/src/cmap/conn.rs @@ -9,7 +9,11 @@ use derive_where::derive_where; use serde::Serialize; use tokio::{ io::BufStream, - sync::{mpsc, Mutex}, + sync::{ + broadcast::{self, error::RecvError}, + mpsc, + Mutex, + }, }; use self::wire::{Message, MessageFlags}; @@ -171,12 +175,44 @@ impl Connection { self.error.is_some() } + pub(crate) async fn send_message_with_cancellation( + &mut self, + message: impl TryInto>, + cancellation_receiver: &mut broadcast::Receiver<()>, + ) -> Result { + tokio::select! { + biased; + + // A lagged error indicates that more heartbeats failed than the channel's capacity + // between checking out this connection and executing the operation. If this occurs, + // then proceed with cancelling the operation. RecvError::Closed can be ignored, as + // the sender (and by extension the connection pool) dropping does not indicate that + // the operation should be cancelled. + Ok(_) | Err(RecvError::Lagged(_)) = cancellation_receiver.recv() => { + let error: Error = ErrorKind::ConnectionPoolCleared { + message: format!( + "Connection to {} interrupted due to server monitor timeout", + self.address, + ) + }.into(); + self.error = Some(error.clone()); + Err(error) + } + // This future is not cancellation safe because it contains calls to methods that are + // not cancellation safe (e.g. AsyncReadExt::read_exact). However, in the case that + // this future is cancelled because a cancellation message was received, this + // connection will be closed upon being returned to the pool, so any data loss on its + // underlying stream is not an issue. + result = self.send_message(message) => result, + } + } + pub(crate) async fn send_message( &mut self, - message: Message, - // This value is only read if a compression feature flag is enabled. - #[allow(unused_variables)] can_compress: bool, + message: impl TryInto>, ) -> Result { + let message = message.try_into().map_err(Into::into)?; + if self.more_to_come { return Err(Error::internal(format!( "attempted to send a new message to {} but moreToCome bit was set", @@ -192,7 +228,7 @@ impl Connection { feature = "snappy-compression" ))] let write_result = match self.compressor { - Some(ref compressor) if can_compress => { + Some(ref compressor) if message.should_compress => { message .write_op_compressed_to(&mut self.stream, compressor) .await @@ -232,21 +268,6 @@ impl Connection { )) } - /// Executes a `Command` and returns a `CommandResponse` containing the result from the server. - /// - /// An `Ok(...)` result simply means the server received the command and that the driver - /// driver received the response; it does not imply anything about the success of the command - /// itself. - pub(crate) async fn send_command( - &mut self, - command: Command, - request_id: impl Into>, - ) -> Result { - let to_compress = command.should_compress(); - let message = Message::from_command(command, request_id.into())?; - self.send_message(message, to_compress).await - } - /// Receive the next message from the connection. /// This will return an error if the previous response on this connection did not include the /// moreToCome flag. @@ -378,6 +399,7 @@ pub(crate) struct PendingConnection { pub(crate) generation: PoolGeneration, pub(crate) event_emitter: CmapEventEmitter, pub(crate) time_created: Instant, + pub(crate) cancellation_receiver: Option>, } impl PendingConnection { diff --git a/src/cmap/conn/pooled.rs b/src/cmap/conn/pooled.rs index 31fd1ce63..c7ad1ec3e 100644 --- a/src/cmap/conn/pooled.rs +++ b/src/cmap/conn/pooled.rs @@ -5,16 +5,18 @@ use std::{ }; use derive_where::derive_where; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::{broadcast, mpsc, Mutex}; use super::{ CmapEventEmitter, Connection, ConnectionGeneration, ConnectionInfo, + Message, PendingConnection, PinnedConnectionHandle, PoolManager, + RawCommandResponse, }; use crate::{ bson::oid::ObjectId, @@ -50,7 +52,7 @@ pub(crate) struct PooledConnection { } /// The state of a pooled connection. -#[derive(Clone, Debug)] +#[derive(Debug)] enum PooledConnectionState { /// The state associated with a connection checked into the connection pool. CheckedIn { available_time: Instant }, @@ -59,6 +61,10 @@ enum PooledConnectionState { CheckedOut { /// The manager used to check this connection back into the pool. pool_manager: PoolManager, + + /// The receiver to receive a cancellation notice. Only present on non-load-balanced + /// connections. + cancellation_receiver: Option>, }, /// The state associated with a pinned connection. @@ -140,6 +146,24 @@ impl PooledConnection { .and_then(|sd| sd.service_id) } + /// Sends a message on this connection. + pub(crate) async fn send_message( + &mut self, + message: impl TryInto>, + ) -> Result { + match self.state { + PooledConnectionState::CheckedOut { + cancellation_receiver: Some(ref mut cancellation_receiver), + .. + } => { + self.connection + .send_message_with_cancellation(message, cancellation_receiver) + .await + } + _ => self.connection.send_message(message).await, + } + } + /// Updates the state of the connection to indicate that it is checked into the pool. pub(crate) fn mark_checked_in(&mut self) { if !matches!(self.state, PooledConnectionState::CheckedIn { .. }) { @@ -155,8 +179,15 @@ impl PooledConnection { } /// Updates the state of the connection to indicate that it is checked out of the pool. - pub(crate) fn mark_checked_out(&mut self, pool_manager: PoolManager) { - self.state = PooledConnectionState::CheckedOut { pool_manager }; + pub(crate) fn mark_checked_out( + &mut self, + pool_manager: PoolManager, + cancellation_receiver: Option>, + ) { + self.state = PooledConnectionState::CheckedOut { + pool_manager, + cancellation_receiver, + }; } /// Whether this connection is idle. @@ -175,15 +206,14 @@ impl PooledConnection { Instant::now().duration_since(available_time) >= max_idle_time } - /// Nullifies the internal state of this connection and returns it in a new [PooledConnection]. - /// If a state is provided, then the new connection will contain that state; otherwise, this - /// connection's state will be cloned. - fn take(&mut self, state: impl Into>) -> Self { + /// Nullifies the internal state of this connection and returns it in a new [PooledConnection] + /// with the given state. + fn take(&mut self, new_state: PooledConnectionState) -> Self { Self { connection: self.connection.take(), generation: self.generation, event_emitter: self.event_emitter.clone(), - state: state.into().unwrap_or_else(|| self.state.clone()), + state: new_state, } } @@ -196,7 +226,9 @@ impl PooledConnection { self.id ))) } - PooledConnectionState::CheckedOut { ref pool_manager } => { + PooledConnectionState::CheckedOut { + ref pool_manager, .. + } => { let (tx, rx) = mpsc::channel(1); self.state = PooledConnectionState::Pinned { // Mark the connection as in-use while the operation currently using the @@ -286,10 +318,11 @@ impl Drop for PooledConnection { // Nothing needs to be done when a checked-in connection is dropped. PooledConnectionState::CheckedIn { .. } => Ok(()), // A checked-out connection should be sent back to the connection pool. - PooledConnectionState::CheckedOut { pool_manager } => { + PooledConnectionState::CheckedOut { pool_manager, .. } => { let pool_manager = pool_manager.clone(); - let mut dropped_connection = self.take(None); - dropped_connection.mark_checked_in(); + let dropped_connection = self.take(PooledConnectionState::CheckedIn { + available_time: Instant::now(), + }); pool_manager.check_in(dropped_connection) } // A pinned connection should be returned to its pinner or to the connection pool. @@ -339,7 +372,11 @@ impl Drop for PooledConnection { } // The pinner of this connection has been dropped while the connection was // sitting in its channel, so the connection should be returned to the pool. - PinnedState::Returned { .. } => pool_manager.check_in(self.take(None)), + PinnedState::Returned { .. } => { + pool_manager.check_in(self.take(PooledConnectionState::CheckedIn { + available_time: Instant::now(), + })) + } } } }; diff --git a/src/cmap/conn/wire/message.rs b/src/cmap/conn/wire/message.rs index 2695f8f75..c746c8b95 100644 --- a/src/cmap/conn/wire/message.rs +++ b/src/cmap/conn/wire/message.rs @@ -28,14 +28,27 @@ use super::{ /// Represents an OP_MSG wire protocol operation. #[derive(Debug)] pub(crate) struct Message { - // OP_MSG payload type 0 + /// OP_MSG payload type 0. pub(crate) document_payload: RawDocumentBuf, - // OP_MSG payload type 1 + + /// OP_MSG payload type 1. pub(crate) document_sequences: Vec, + pub(crate) response_to: i32, + pub(crate) flags: MessageFlags, + pub(crate) checksum: Option, + pub(crate) request_id: Option, + + /// Whether the message should be compressed by the driver. + #[cfg(any( + feature = "zstd-compression", + feature = "zlib-compression", + feature = "snappy-compression" + ))] + pub(crate) should_compress: bool, } #[derive(Clone, Debug)] @@ -44,11 +57,18 @@ pub(crate) struct DocumentSequence { pub(crate) documents: Vec, } -impl Message { - /// Creates a `Message` from a given `Command`. Note that the `response_to` field must be set - /// manually. - pub(crate) fn from_command(command: Command, request_id: Option) -> Result { +/// Creates a Message from a Command. The response_to and request_id fields must be set manually. +impl TryFrom for Message { + type Error = Error; + + fn try_from(command: Command) -> Result { let document_payload = bson::to_raw_document_buf(&command)?; + #[cfg(any( + feature = "zstd-compression", + feature = "zlib-compression", + feature = "snappy-compression" + ))] + let should_compress = command.should_compress(); let mut flags = MessageFlags::empty(); if command.exhaust_allowed { @@ -61,10 +81,18 @@ impl Message { response_to: 0, flags, checksum: None, - request_id, + request_id: None, + #[cfg(any( + feature = "zstd-compression", + feature = "zlib-compression", + feature = "snappy-compression" + ))] + should_compress, }) } +} +impl Message { /// Gets this message's command as a Document. If serialization fails, returns a document /// containing the error. pub(crate) fn get_command_document(&self) -> Document { @@ -233,6 +261,12 @@ impl Message { document_sequences, checksum, request_id: None, + #[cfg(any( + feature = "zstd-compression", + feature = "zlib-compression", + feature = "snappy-compression" + ))] + should_compress: false, }) } diff --git a/src/cmap/establish.rs b/src/cmap/establish.rs index ed44160ed..c2600491a 100644 --- a/src/cmap/establish.rs +++ b/src/cmap/establish.rs @@ -108,11 +108,12 @@ impl ConnectionEstablisher { /// Establishes a connection. pub(crate) async fn establish_connection( &self, - pending_connection: PendingConnection, + mut pending_connection: PendingConnection, credential: Option<&Credential>, ) -> std::result::Result { let pool_gen = pending_connection.generation.clone(); let address = pending_connection.address.clone(); + let cancellation_receiver = pending_connection.cancellation_receiver.take(); let stream = self .make_stream(address) @@ -121,7 +122,10 @@ impl ConnectionEstablisher { let mut connection = PooledConnection::new(pending_connection, stream); #[allow(unused_mut)] - let mut handshake_result = self.handshaker.handshake(&mut connection, credential).await; + let mut handshake_result = self + .handshaker + .handshake(&mut connection, credential, cancellation_receiver) + .await; #[cfg(test)] if let Some(patch) = self.test_patch_reply { patch(&mut handshake_result); @@ -176,7 +180,10 @@ impl ConnectionEstablisher { let stream = self.make_stream(address.clone()).await?; let mut connection = Connection::new(address, stream, id, Instant::now()); - let hello_reply = self.handshaker.handshake(&mut connection, None).await?; + let hello_reply = self + .handshaker + .handshake(&mut connection, None, None) + .await?; Ok((connection, hello_reply)) } diff --git a/src/cmap/establish/handshake.rs b/src/cmap/establish/handshake.rs index 14698d52e..c9b75a538 100644 --- a/src/cmap/establish/handshake.rs +++ b/src/cmap/establish/handshake.rs @@ -5,6 +5,7 @@ use std::env; use bson::{rawdoc, RawBson, RawDocumentBuf}; use once_cell::sync::Lazy; +use tokio::sync::broadcast; #[cfg(any( feature = "zstd-compression", @@ -444,9 +445,10 @@ impl Handshaker { &self, conn: &mut Connection, credential: Option<&Credential>, + cancellation_receiver: Option>, ) -> Result { let (command, client_first) = self.build_command(credential).await?; - let mut hello_reply = run_hello(conn, command).await?; + let mut hello_reply = run_hello(conn, command, cancellation_receiver).await?; conn.stream_description = Some(StreamDescription::from_hello_reply(&hello_reply)); diff --git a/src/cmap/test.rs b/src/cmap/test.rs index 97131b556..fca3495a7 100644 --- a/src/cmap/test.rs +++ b/src/cmap/test.rs @@ -15,7 +15,12 @@ use crate::{ ConnectionPoolOptions, }, error::{Error, ErrorKind, Result}, - event::cmap::{CmapEvent, ConnectionPoolOptions as EventOptions}, + event::cmap::{ + CmapEvent, + ConnectionCheckoutFailedReason, + ConnectionClosedReason, + ConnectionPoolOptions as EventOptions, + }, options::TlsOptions, runtime::{self, AsyncJoinHandle}, sdam::{TopologyUpdater, UpdateMessage}, @@ -30,7 +35,6 @@ use crate::{ Matchable, }, }; -use bson::doc; use super::conn::pooled::PooledConnection; @@ -42,6 +46,11 @@ const TEST_DESCRIPTIONS_TO_SKIP: &[&str] = &[ "waiting on maxConnecting is limited by WaitQueueTimeoutMS", // TODO DRIVERS-1785 remove this skip when test event order is fixed "error during minPoolSize population clears pool", + // TODO RUST-2106: unskip this test + "Pool clear SHOULD schedule the next background thread run immediately \ + (interruptInUseConnections = false)", + // TODO RUST-1052: unskip this test and investigate flaky failure linked in ticket + "threads blocked by maxConnecting check out minPoolSize connections", ]; /// Many different types of CMAP events are emitted from tasks spawned in the drop @@ -288,16 +297,20 @@ impl Operation { ) }); } - Operation::Clear => { + Operation::Clear { + interrupt_in_use_connections, + } => { + let error = if interrupt_in_use_connections == Some(true) { + Error::network_timeout() + } else { + ErrorKind::Internal { + message: "test error".to_string(), + } + .into() + }; + if let Some(pool) = state.pool.read().await.as_ref() { - pool.clear( - ErrorKind::Internal { - message: "test error".to_string(), - } - .into(), - None, - ) - .await; + pool.clear(error, None).await; } } Operation::Ready => { @@ -386,11 +399,17 @@ impl Matchable for CmapEvent { actual.connection_id.matches(&expected.connection_id) } (CmapEvent::ConnectionClosed(actual), CmapEvent::ConnectionClosed(ref expected)) => { - eq_matches("reason", &actual.reason, &expected.reason)?; - actual - .connection_id - .matches(&expected.connection_id) - .prefix("connection_id")?; + if expected.reason != ConnectionClosedReason::Unset { + eq_matches("reason", &actual.reason, &expected.reason)?; + } + // 0 is used as a placeholder for test events that do not specify a value; the + // driver will never actually generate a connection ID with this value. + if expected.connection_id != 0 { + actual + .connection_id + .matches(&expected.connection_id) + .prefix("connection_id")?; + } Ok(()) } ( @@ -405,14 +424,10 @@ impl Matchable for CmapEvent { CmapEvent::ConnectionCheckoutFailed(actual), CmapEvent::ConnectionCheckoutFailed(ref expected), ) => { - if actual.reason == expected.reason { - Ok(()) - } else { - Err(format!( - "expected reason {:?}, got {:?}", - expected.reason, actual.reason - )) + if expected.reason != ConnectionCheckoutFailedReason::Unset { + eq_matches("reason", &actual.reason, &expected.reason)?; } + Ok(()) } (CmapEvent::ConnectionCheckoutStarted(_), CmapEvent::ConnectionCheckoutStarted(_)) => { Ok(()) @@ -425,9 +440,9 @@ impl Matchable for CmapEvent { } } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn cmap_spec_tests() { - async fn run_cmap_spec_tests(test_file: TestFile) { + async fn run_cmap_spec_tests(mut test_file: TestFile) { if TEST_DESCRIPTIONS_TO_SKIP.contains(&test_file.description.as_str()) { return; } @@ -451,28 +466,14 @@ async fn cmap_spec_tests() { } } - let should_disable_fp = test_file.fail_point.is_some(); - if let Some(ref fail_point) = test_file.fail_point { - client - .database("admin") - .run_command(fail_point.clone()) - .await - .unwrap(); - } + let _guard = if let Some(fail_point) = test_file.fail_point.take() { + Some(client.enable_fail_point(fail_point).await.unwrap()) + } else { + None + }; let executor = Executor::new(test_file).await; executor.execute_test().await; - - if should_disable_fp { - client - .database("admin") - .run_command(doc! { - "configureFailPoint": "failCommand", - "mode": "off" - }) - .await - .unwrap(); - } } run_spec_test( diff --git a/src/cmap/test/event.rs b/src/cmap/test/event.rs index 1328ed2bf..1e12167f9 100644 --- a/src/cmap/test/event.rs +++ b/src/cmap/test/event.rs @@ -154,7 +154,7 @@ where #[derive(Debug, Deserialize)] struct ConnectionCheckoutFailedHelper { - pub reason: CheckoutFailedReasonHelper, + reason: Option, } #[derive(Debug, Deserialize)] @@ -173,16 +173,16 @@ where { let helper = ConnectionCheckoutFailedHelper::deserialize(deserializer)?; - // The driver doesn't have a concept of a "closed pool", instead having the pool closed when the - // pool is dropped. Because of this, the driver doesn't implement the "poolClosed" reason for a - // connection checkout failure. While we skip over the corresponding tests in our spec test - // runner, we still need to be able to deserialize the "poolClosed" reason to avoid the test - // harness panicking, so we arbitrarily map the "poolClosed" to "connectionError". let reason = match helper.reason { - CheckoutFailedReasonHelper::PoolClosed | CheckoutFailedReasonHelper::ConnectionError => { + Some(CheckoutFailedReasonHelper::ConnectionError) => { ConnectionCheckoutFailedReason::ConnectionError } - CheckoutFailedReasonHelper::Timeout => ConnectionCheckoutFailedReason::Timeout, + Some(CheckoutFailedReasonHelper::Timeout) => ConnectionCheckoutFailedReason::Timeout, + // The driver does not implement the tests that use the PoolClosed reason, so we map the + // test value to unset to allow for deserialization. + Some(CheckoutFailedReasonHelper::PoolClosed) | None => { + ConnectionCheckoutFailedReason::Unset + } }; Ok(ConnectionCheckoutFailedEvent { diff --git a/src/cmap/test/file.rs b/src/cmap/test/file.rs index 307b8c0a0..01ff71920 100644 --- a/src/cmap/test/file.rs +++ b/src/cmap/test/file.rs @@ -8,9 +8,8 @@ use crate::{ error::Result, event::cmap::CmapEvent, serde_util, - test::RunOn, + test::{util::fail_point::FailPoint, RunOn}, }; -use bson::Document; #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase", deny_unknown_fields)] @@ -26,7 +25,7 @@ pub struct TestFile { pub(crate) events: Vec, #[serde(default)] pub ignore: Vec, - pub fail_point: Option, + pub fail_point: Option, pub(crate) run_on: Option>, } @@ -85,7 +84,11 @@ pub enum Operation { CheckIn { connection: String, }, - Clear, + #[serde(rename_all = "camelCase")] + Clear { + #[serde(default)] + interrupt_in_use_connections: Option, + }, Close, Ready, } diff --git a/src/cmap/test/integration.rs b/src/cmap/test/integration.rs index 26c5c56ed..30f08e73a 100644 --- a/src/cmap/test/integration.rs +++ b/src/cmap/test/integration.rs @@ -65,7 +65,7 @@ async fn acquire_connection_and_send_command() { cmd.set_server_api(server_api); } - let response = connection.send_command(cmd, None).await.unwrap(); + let response = connection.send_message(cmd).await.unwrap(); let doc_response: CommandResponse = response.body().unwrap(); assert!(doc_response.is_success()); diff --git a/src/cmap/worker.rs b/src/cmap/worker.rs index 0faa396aa..a56ffaccf 100644 --- a/src/cmap/worker.rs +++ b/src/cmap/worker.rs @@ -1,3 +1,5 @@ +use tokio::sync::broadcast; + #[cfg(test)] use super::options::BackgroundThreadInterval; use super::{ @@ -136,6 +138,9 @@ pub(crate) struct ConnectionPoolWorker { /// The maximum number of new connections that can be created concurrently. max_connecting: u32, + + /// Sender used to broadcast cancellation notices to checked-out connections. + cancellation_sender: Option>, } impl ConnectionPoolWorker { @@ -215,6 +220,16 @@ impl ConnectionPoolWorker { let credential = options.and_then(|o| o.credential); + let cancellation_sender = if !is_load_balanced { + // There's not necessarily an upper bound on the number of messages that could exist in + // this channel; however, connections use both successfully receiving a message in the + // channel and receiving a lagged error as an indication that cancellation should occur, + // so we use an artificial bound of one message. + Some(broadcast::channel(1).0) + } else { + None + }; + let worker = ConnectionPoolWorker { address, event_emitter, @@ -240,6 +255,7 @@ impl ConnectionPoolWorker { maintenance_frequency, server_updater, max_connecting, + cancellation_sender, }; runtime::spawn(async move { @@ -399,7 +415,7 @@ impl ConnectionPoolWorker { continue; } - conn.mark_checked_out(self.manager.clone()); + conn.mark_checked_out(self.manager.clone(), self.get_cancellation_receiver()); if let Err(request) = request.fulfill(ConnectionRequestResult::Pooled(Box::new(conn))) { @@ -422,6 +438,7 @@ impl ConnectionPoolWorker { let manager = self.manager.clone(); let server_updater = self.server_updater.clone(); let credential = self.credential.clone(); + let cancellation_receiver = self.get_cancellation_receiver(); let handle = runtime::spawn(async move { let mut establish_result = establish_connection( @@ -435,7 +452,7 @@ impl ConnectionPoolWorker { .await; if let Ok(ref mut c) = establish_result { - c.mark_checked_out(manager.clone()); + c.mark_checked_out(manager.clone(), cancellation_receiver); manager.handle_connection_succeeded(ConnectionSucceeded::Used { service_id: c.generation.service_id(), }); @@ -465,6 +482,7 @@ impl ConnectionPoolWorker { generation: self.generation.clone(), event_emitter: self.event_emitter.clone(), time_created: Instant::now(), + cancellation_receiver: self.get_cancellation_receiver(), }; self.next_connection_id += 1; self.event_emitter @@ -514,6 +532,13 @@ impl ConnectionPoolWorker { } fn clear(&mut self, cause: Error, service_id: Option) { + let interrupt_in_use_connections = cause.is_network_timeout(); + if interrupt_in_use_connections { + if let Some(ref cancellation_sender) = self.cancellation_sender { + let _ = cancellation_sender.send(()); + } + } + let was_ready = match (&mut self.generation, service_id) { (PoolGeneration::Normal(gen), None) => { *gen += 1; @@ -534,6 +559,7 @@ impl ConnectionPoolWorker { PoolClearedEvent { address: self.address.clone(), service_id, + interrupt_in_use_connections, } .into() }); @@ -646,6 +672,14 @@ impl ConnectionPoolWorker { } } } + + /// Returns a receiver for the pool's cancellation sender if this pool is not in load-balanced + /// mode. The returned receiver will only receive messages sent after this method is called. + fn get_cancellation_receiver(&self) -> Option> { + self.cancellation_sender + .as_ref() + .map(|sender| sender.subscribe()) + } } /// Helper covering the common connection establishment behavior between diff --git a/src/event/cmap.rs b/src/event/cmap.rs index 1b90d2a9f..b03b99a09 100644 --- a/src/event/cmap.rs +++ b/src/event/cmap.rs @@ -3,12 +3,12 @@ use std::time::Duration; -use serde::{Deserialize, Serialize}; - -use crate::{bson::oid::ObjectId, options::ServerAddress, serde_util}; use derive_more::From; #[cfg(feature = "tracing-unstable")] use derive_where::derive_where; +use serde::{Deserialize, Serialize}; + +use crate::{bson::oid::ObjectId, options::ServerAddress, serde_util}; #[cfg(feature = "tracing-unstable")] use crate::trace::{ @@ -95,6 +95,10 @@ pub struct PoolClearedEvent { /// If the connection is to a load balancer, the id of the selected backend. pub service_id: Option, + + /// Whether in-use connections were interrupted when the pool cleared. + #[serde(default)] + pub interrupt_in_use_connections: bool, } /// Event emitted when a connection pool is cleared. @@ -140,7 +144,7 @@ pub struct ConnectionReadyEvent { pub connection_id: u32, /// The time it took to establish the connection. - #[serde(default = "Duration::default")] + #[serde(skip_deserializing)] pub duration: Duration, } @@ -162,6 +166,7 @@ pub struct ConnectionClosedEvent { pub connection_id: u32, /// The reason that the connection was closed. + #[cfg_attr(test, serde(default = "unset_connection_closed_reason"))] pub reason: ConnectionClosedReason, /// If the `reason` connection checkout failed was `Error`,the associated @@ -192,6 +197,14 @@ pub enum ConnectionClosedReason { /// The pool that the connection belongs to has been closed. PoolClosed, + + #[cfg(test)] + Unset, +} + +#[cfg(test)] +fn unset_connection_closed_reason() -> ConnectionClosedReason { + ConnectionClosedReason::Unset } /// Event emitted when a thread begins checking out a connection to use for an operation. @@ -216,6 +229,7 @@ pub struct ConnectionCheckoutFailedEvent { pub address: ServerAddress, /// The reason a connection was unable to be checked out. + #[cfg_attr(test, serde(default = "unset_connection_checkout_failed_reason"))] pub reason: ConnectionCheckoutFailedReason, /// If the `reason` connection checkout failed was `ConnectionError`,the associated @@ -227,7 +241,7 @@ pub struct ConnectionCheckoutFailedEvent { pub(crate) error: Option, /// See [ConnectionCheckedOutEvent::duration]. - #[serde(default = "Duration::default")] + #[serde(skip_deserializing)] pub duration: Duration, } @@ -242,6 +256,14 @@ pub enum ConnectionCheckoutFailedReason { /// An error occurred while trying to establish a connection (e.g. during the handshake or /// authentication). ConnectionError, + + #[cfg(test)] + Unset, +} + +#[cfg(test)] +fn unset_connection_checkout_failed_reason() -> ConnectionCheckoutFailedReason { + ConnectionCheckoutFailedReason::Unset } /// Event emitted when a connection is successfully checked out. @@ -260,7 +282,7 @@ pub struct ConnectionCheckedOutEvent { pub connection_id: u32, /// The time it took to check out the connection. - #[serde(default = "Duration::default")] + #[serde(skip_deserializing)] pub duration: Duration, } diff --git a/src/hello.rs b/src/hello.rs index 44515fee8..c11bccf82 100644 --- a/src/hello.rs +++ b/src/hello.rs @@ -2,6 +2,7 @@ use std::time::Duration; use bson::{rawdoc, RawDocumentBuf}; use serde::{Deserialize, Serialize}; +use tokio::sync::broadcast; use crate::{ bson::{doc, oid::ObjectId, DateTime, Document, Timestamp}, @@ -72,8 +73,18 @@ pub(crate) fn hello_command( } /// Execute a hello or legacy hello command. -pub(crate) async fn run_hello(conn: &mut Connection, command: Command) -> Result { - let response_result = conn.send_command(command, None).await; +pub(crate) async fn run_hello( + conn: &mut Connection, + command: Command, + mut cancellation_receiver: Option>, +) -> Result { + let response_result = match cancellation_receiver { + Some(ref mut cancellation_receiver) => { + conn.send_message_with_cancellation(command, cancellation_receiver) + .await + } + None => conn.send_message(command).await, + }; response_result.and_then(|raw_response| raw_response.into_hello_reply()) } diff --git a/src/operation.rs b/src/operation.rs index 0062de425..f0af1b1f6 100644 --- a/src/operation.rs +++ b/src/operation.rs @@ -310,6 +310,11 @@ impl Command { REDACTED_COMMANDS.contains(name.as_str()) || should_redact_body(&self.body) } + #[cfg(any( + feature = "zstd-compression", + feature = "zlib-compression", + feature = "snappy-compression" + ))] pub(crate) fn should_compress(&self) -> bool { let name = self.name.to_lowercase(); !REDACTED_COMMANDS.contains(name.as_str()) && !HELLO_COMMAND_NAMES.contains(name.as_str()) diff --git a/src/sdam/monitor.rs b/src/sdam/monitor.rs index 524e29410..9a530cf1d 100644 --- a/src/sdam/monitor.rs +++ b/src/sdam/monitor.rs @@ -250,7 +250,7 @@ impl Monitor { opts, ); - run_hello(conn, command).await + run_hello(conn, command, None).await } } None => { @@ -448,7 +448,7 @@ impl RttMonitor { Some(conn.stream_description()?.hello_ok), None, ); - conn.send_command(command, None).await?; + conn.send_message(command).await?; } None => { let connection = self diff --git a/src/test/spec/handshake.rs b/src/test/spec/handshake.rs index 43212ad5b..28c44011f 100644 --- a/src/test/spec/handshake.rs +++ b/src/test/spec/handshake.rs @@ -32,6 +32,7 @@ async fn arbitrary_auth_mechanism() { generation: crate::cmap::PoolGeneration::normal(), event_emitter: CmapEventEmitter::new(None, ObjectId::new()), time_created: Instant::now(), + cancellation_receiver: None, }; establisher .establish_connection(pending, None) diff --git a/src/test/spec/json/connection-monitoring-and-pooling/README.md b/src/test/spec/json/connection-monitoring-and-pooling/README.md new file mode 100644 index 000000000..3d8aee40a --- /dev/null +++ b/src/test/spec/json/connection-monitoring-and-pooling/README.md @@ -0,0 +1,27 @@ +# Connection Monitoring and Pooling (CMAP) + +______________________________________________________________________ + +## Introduction + +Drivers MUST implement all of the following types of CMAP tests: + +- Pool unit and integration tests as described in [cmap-format/README](./cmap-format/README.md) +- Pool prose tests as described below in [Prose Tests](#prose-tests) +- Logging tests as described below in [Logging Tests](#logging-tests) + +## Prose Tests + +The following tests have not yet been automated, but MUST still be tested: + +1. All ConnectionPoolOptions MUST be specified at the MongoClient level +2. All ConnectionPoolOptions MUST be the same for all pools created by a MongoClient +3. A user MUST be able to specify all ConnectionPoolOptions via a URI string +4. A user MUST be able to subscribe to Connection Monitoring Events in a manner idiomatic to their language and driver +5. When a check out attempt fails because connection set up throws an error, assert that a ConnectionCheckOutFailedEvent + with reason="connectionError" is emitted. + +## Logging Tests + +Tests for connection pool logging can be found in the `/logging` subdirectory and are written in the +[Unified Test Format](../../unified-test-format/unified-test-format.md). diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/README.md b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/README.md new file mode 100644 index 000000000..ced96961f --- /dev/null +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/README.md @@ -0,0 +1,167 @@ +# Connection Monitoring and Pooling (CMAP) Unit and Integration Tests + +______________________________________________________________________ + +## Introduction + +The YAML and JSON files in this directory are platform-independent tests that drivers can use to prove their conformance +to the Connection Monitoring and Pooling (CMAP) Spec. + +## Common Test Format + +Each YAML file has the following keys: + +- `version`: A version number indicating the expected format of the spec tests (current version = 1) +- `style`: A string indicating what style of tests this file contains. Contains one of the following: + - `"unit"`: a test that may be run without connecting to a MongoDB deployment. + - `"integration"`: a test that MUST be run against a real MongoDB deployment. +- `description`: A text description of what the test is meant to assert + +## Unit Test Format: + +All Unit Tests have some of the following fields: + +- `poolOptions`: If present, connection pool options to use when creating a pool; both + [standard ConnectionPoolOptions](../../connection-monitoring-and-pooling.md#connection-pool-options) and the + following test-specific options are allowed: + - `backgroundThreadIntervalMS`: A time interval between the end of a + [Background Thread Run](../../connection-monitoring-and-pooling.md#background-thread) and the beginning of the + next Run. If a Connection Pool does not implement a Background Thread, the Test Runner MUST ignore the option. If + the option is not specified, an implementation is free to use any value it finds reasonable. + + Possible values (0 is not allowed): + + - A negative value: never begin a Run. + - A positive value: the interval between Runs in milliseconds. +- `operations`: A list of operations to perform. All operations support the following fields: + - `name`: A string describing which operation to issue. + - `thread`: The name of the thread in which to run this operation. If not specified, runs in the default thread +- `error`: Indicates that the main thread is expected to error during this test. An error may include of the following + fields: + - `type`: the type of error emitted + - `message`: the message associated with that error + - `address`: Address of pool emitting error +- `events`: An array of all connection monitoring events expected to occur while running `operations`. An event may + contain any of the following fields + - `type`: The type of event emitted + - `address`: The address of the pool emitting the event + - `connectionId`: The id of a connection associated with the event + - `duration`: The event duration + - `options`: Options used to create the pool + - `reason`: A reason giving more information on why the event was emitted +- `ignore`: An array of event names to ignore + +Valid Unit Test Operations are the following: + +- `start(target)`: Starts a new thread named `target` + - `target`: The name of the new thread to start +- `wait(ms)`: Sleep the current thread for `ms` milliseconds + - `ms`: The number of milliseconds to sleep the current thread for +- `waitForThread(target)`: wait for thread `target` to finish executing. Propagate any errors to the main thread. + - `target`: The name of the thread to wait for. +- `waitForEvent(event, count, timeout)`: block the current thread until `event` has occurred `count` times + - `event`: The name of the event + - `count`: The number of times the event must occur (counting from the start of the test) + - `timeout`: If specified, time out with an error after waiting for this many milliseconds without seeing the required + events +- `label = pool.checkOut()`: call `checkOut` on pool, returning the checked out connection + - `label`: If specified, associate this label with the returned connection, so that it may be referenced in later + operations +- `pool.checkIn(connection)`: call `checkIn` on pool + - `connection`: A string label identifying which connection to check in. Should be a label that was previously set + with `checkOut` +- `pool.clear()`: call `clear` on Pool + - `interruptInUseConnections`: Determines whether "in use" connections should be also interrupted +- `pool.close()`: call `close` on Pool +- `pool.ready()`: call `ready` on Pool + +## Integration Test Format + +The integration test format is identical to the unit test format with the addition of the following fields to each test: + +- `runOn` (optional): An array of server version and/or topology requirements for which the tests can be run. If the + test environment satisfies one or more of these requirements, the tests may be executed; otherwise, this test should + be skipped. If this field is omitted, the tests can be assumed to have no particular requirements and should be + executed. Each element will have some or all of the following fields: + - `minServerVersion` (optional): The minimum server version (inclusive) required to successfully run the tests. If + this field is omitted, it should be assumed that there is no lower bound on the required server version. + - `maxServerVersion` (optional): The maximum server version (inclusive) against which the tests can be run + successfully. If this field is omitted, it should be assumed that there is no upper bound on the required server + version. +- `failPoint`: optional, a document containing a `configureFailPoint` command to run against the endpoint being used for + the test. +- `poolOptions.appName` (optional): appName attribute to be set in connections, which will be affected by the fail + point. + +## Spec Test Match Function + +The definition of MATCH or MATCHES in the Spec Test Runner is as follows: + +- MATCH takes two values, `expected` and `actual` +- Notation is "Assert `actual` MATCHES `expected`" +- Assertion passes if `expected` is a subset of `actual`, with the values `42` and `"42"` acting as placeholders for + "any value" + +Pseudocode implementation of `actual` MATCHES `expected`: + +```text +If expected is "42" or 42: + Assert that actual exists (is not null or undefined) +Else: + Assert that actual is of the same JSON type as expected + If expected is a JSON array: + For every idx/value in expected: + Assert that actual[idx] MATCHES value + Else if expected is a JSON object: + For every key/value in expected + Assert that actual[key] MATCHES value + Else: + Assert that expected equals actual +``` + +## Unit Test Runner: + +For the unit tests, the behavior of a Connection is irrelevant beyond the need to asserting `connection.id`. Drivers MAY +use a mock connection class for testing the pool behavior in unit tests + +For each YAML file with `style: unit`: + +- Create a Pool `pool`, subscribe and capture any Connection Monitoring events emitted in order. + - If `poolOptions` is specified, use those options to initialize both pools + - The returned pool must have an `address` set as a string value. +- Process each `operation` in `operations` (on the main thread) + - If a `thread` is specified, the main thread MUST schedule the operation to execute in the corresponding thread. + Otherwise, execute the operation directly in the main thread. +- If `error` is presented + - Assert that an actual error `actualError` was thrown by the main thread + - Assert that `actualError` MATCHES `error` +- Else: + - Assert that no errors were thrown by the main thread +- calculate `actualEvents` as every Connection Event emitted whose `type` is not in `ignore` +- if `events` is not empty, then for every `idx`/`expectedEvent` in `events` + - Assert that `actualEvents[idx]` exists + - Assert that `actualEvents[idx]` MATCHES `expectedEvent` + +It is important to note that the `ignore` list is used for calculating `actualEvents`, but is NOT used for the +`waitForEvent` command + +## Integration Test Runner + +The steps to run the integration tests are the same as those used to run the unit tests with the following +modifications: + +- The integration tests MUST be run against an actual endpoint. If the deployment being tested contains multiple + endpoints, then the runner MUST only use one of them to run the tests against. + +- For each test, if `failPoint` is specified, its value is a `configureFailPoint` command. Run the command on the admin + database of the endpoint being tested to enable the fail point. + +- At the end of each test, any enabled fail point MUST be disabled to avoid spurious failures in subsequent tests. The + fail point may be disabled like so: + + ```javascript + db.adminCommand({ + configureFailPoint: "", + mode: "off" + }); + ``` diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/README.rst b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/README.rst deleted file mode 100644 index 5bb72dd0f..000000000 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/README.rst +++ /dev/null @@ -1,215 +0,0 @@ -.. role:: javascript(code) - :language: javascript - -=================================================================== -Connection Monitoring and Pooling (CMAP) Unit and Integration Tests -=================================================================== - -.. contents:: - --------- - -Introduction -============ - -The YAML and JSON files in this directory are platform-independent tests that -drivers can use to prove their conformance to the Connection Monitoring and Pooling (CMAP) Spec. - -Common Test Format -================== - -Each YAML file has the following keys: - -- ``version``: A version number indicating the expected format of the spec tests (current version = 1) -- ``style``: A string indicating what style of tests this file contains. Contains one of the following: - - - ``"unit"``: a test that may be run without connecting to a MongoDB deployment. - - ``"integration"``: a test that MUST be run against a real MongoDB deployment. - -- ``description``: A text description of what the test is meant to assert - -Unit Test Format: -================= - -All Unit Tests have some of the following fields: - -- ``poolOptions``: If present, connection pool options to use when creating a pool; - both `standard ConnectionPoolOptions `__ - and the following test-specific options are allowed: - - - ``backgroundThreadIntervalMS``: A time interval between the end of a - `Background Thread Run `__ - and the beginning of the next Run. If a Connection Pool does not implement a Background Thread, the Test Runner MUST ignore the option. - If the option is not specified, an implementation is free to use any value it finds reasonable. - - Possible values (0 is not allowed): - - - A negative value: never begin a Run. - - A positive value: the interval between Runs in milliseconds. - -- ``operations``: A list of operations to perform. All operations support the following fields: - - - ``name``: A string describing which operation to issue. - - ``thread``: The name of the thread in which to run this operation. If not specified, runs in the default thread - -- ``error``: Indicates that the main thread is expected to error during this test. An error may include of the following fields: - - - ``type``: the type of error emitted - - ``message``: the message associated with that error - - ``address``: Address of pool emitting error - -- ``events``: An array of all connection monitoring events expected to occur while running ``operations``. An event may contain any of the following fields - - - ``type``: The type of event emitted - - ``address``: The address of the pool emitting the event - - ``connectionId``: The id of a connection associated with the event - - ``options``: Options used to create the pool - - ``reason``: A reason giving mroe information on why the event was emitted - -- ``ignore``: An array of event names to ignore - -Valid Unit Test Operations are the following: - -- ``start(target)``: Starts a new thread named ``target`` - - - ``target``: The name of the new thread to start - -- ``wait(ms)``: Sleep the current thread for ``ms`` milliseconds - - - ``ms``: The number of milliseconds to sleep the current thread for - -- ``waitForThread(target)``: wait for thread ``target`` to finish executing. Propagate any errors to the main thread. - - - ``target``: The name of the thread to wait for. - -- ``waitForEvent(event, count, timeout)``: block the current thread until ``event`` has occurred ``count`` times - - - ``event``: The name of the event - - ``count``: The number of times the event must occur (counting from the start of the test) - - ``timeout``: If specified, time out with an error after waiting for this many milliseconds without seeing the required events - -- ``label = pool.checkOut()``: call ``checkOut`` on pool, returning the checked out connection - - - ``label``: If specified, associate this label with the returned connection, so that it may be referenced in later operations - -- ``pool.checkIn(connection)``: call ``checkIn`` on pool - - - ``connection``: A string label identifying which connection to check in. Should be a label that was previously set with ``checkOut`` - -- ``pool.clear()``: call ``clear`` on Pool - - - ``interruptInUseConnections``: Determines whether "in use" connections should be also interrupted - -- ``pool.close()``: call ``close`` on Pool -- ``pool.ready()``: call ``ready`` on Pool - - -Integration Test Format -======================= - -The integration test format is identical to the unit test format with -the addition of the following fields to each test: - -- ``runOn`` (optional): An array of server version and/or topology requirements - for which the tests can be run. If the test environment satisfies one or more - of these requirements, the tests may be executed; otherwise, this test should - be skipped. If this field is omitted, the tests can be assumed to have no - particular requirements and should be executed. Each element will have some or - all of the following fields: - - - ``minServerVersion`` (optional): The minimum server version (inclusive) - required to successfully run the tests. If this field is omitted, it should - be assumed that there is no lower bound on the required server version. - - - ``maxServerVersion`` (optional): The maximum server version (inclusive) - against which the tests can be run successfully. If this field is omitted, - it should be assumed that there is no upper bound on the required server - version. - -- ``failPoint``: optional, a document containing a ``configureFailPoint`` - command to run against the endpoint being used for the test. - -- ``poolOptions.appName`` (optional): appName attribute to be set in connections, which will be affected by the fail point. - -Spec Test Match Function -======================== - -The definition of MATCH or MATCHES in the Spec Test Runner is as follows: - -- MATCH takes two values, ``expected`` and ``actual`` -- Notation is "Assert [actual] MATCHES [expected] -- Assertion passes if ``expected`` is a subset of ``actual``, with the values ``42`` and ``"42"`` acting as placeholders for "any value" - -Pseudocode implementation of ``actual`` MATCHES ``expected``: - -:: - - If expected is "42" or 42: - Assert that actual exists (is not null or undefined) - Else: - Assert that actual is of the same JSON type as expected - If expected is a JSON array: - For every idx/value in expected: - Assert that actual[idx] MATCHES value - Else if expected is a JSON object: - For every key/value in expected - Assert that actual[key] MATCHES value - Else: - Assert that expected equals actual - -Unit Test Runner: -================= - -For the unit tests, the behavior of a Connection is irrelevant beyond the need to asserting ``connection.id``. Drivers MAY use a mock connection class for testing the pool behavior in unit tests - -For each YAML file with ``style: unit``: - -- Create a Pool ``pool``, subscribe and capture any Connection Monitoring events emitted in order. - - - If ``poolOptions`` is specified, use those options to initialize both pools - - The returned pool must have an ``address`` set as a string value. - -- Process each ``operation`` in ``operations`` (on the main thread) - - - If a ``thread`` is specified, the main thread MUST schedule the operation to execute in the corresponding thread. Otherwise, execute the operation directly in the main thread. - -- If ``error`` is presented - - - Assert that an actual error ``actualError`` was thrown by the main thread - - Assert that ``actualError`` MATCHES ``error`` - -- Else: - - - Assert that no errors were thrown by the main thread - -- calculate ``actualEvents`` as every Connection Event emitted whose ``type`` is not in ``ignore`` -- if ``events`` is not empty, then for every ``idx``/``expectedEvent`` in ``events`` - - - Assert that ``actualEvents[idx]`` exists - - Assert that ``actualEvents[idx]`` MATCHES ``expectedEvent`` - - -It is important to note that the ``ignore`` list is used for calculating ``actualEvents``, but is NOT used for the ``waitForEvent`` command - -Integration Test Runner -======================= - -The steps to run the integration tests are the same as those used to run the -unit tests with the following modifications: - -- The integration tests MUST be run against an actual endpoint. If the - deployment being tested contains multiple endpoints, then the runner MUST - only use one of them to run the tests against. - -- For each test, if `failPoint` is specified, its value is a - ``configureFailPoint`` command. Run the command on the admin database of the - endpoint being tested to enable the fail point. - -- At the end of each test, any enabled fail point MUST be disabled to avoid - spurious failures in subsequent tests. The fail point may be disabled like - so:: - - db.adminCommand({ - configureFailPoint: , - mode: "off" - }); diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin-make-available.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin-make-available.json index 41c522ae6..3f37f188c 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin-make-available.json +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin-make-available.json @@ -22,7 +22,8 @@ { "type": "ConnectionCheckedOut", "connectionId": 1, - "address": 42 + "address": 42, + "duration": 42 }, { "type": "ConnectionCheckedIn", @@ -32,7 +33,8 @@ { "type": "ConnectionCheckedOut", "connectionId": 1, - "address": 42 + "address": 42, + "duration": 42 } ], "ignore": [ diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin-make-available.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin-make-available.yml index 517943278..9dbd5aebe 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin-make-available.yml +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkin-make-available.yml @@ -12,12 +12,14 @@ events: - type: ConnectionCheckedOut connectionId: 1 address: 42 + duration: 42 - type: ConnectionCheckedIn connectionId: 1 address: 42 - type: ConnectionCheckedOut connectionId: 1 address: 42 + duration: 42 ignore: - ConnectionPoolCreated - ConnectionPoolReady diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-connection.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-connection.json index d89b34260..c7e8914d4 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-connection.json +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-connection.json @@ -23,12 +23,14 @@ { "type": "ConnectionReady", "connectionId": 1, - "address": 42 + "address": 42, + "duration": 42 }, { "type": "ConnectionCheckedOut", "connectionId": 1, - "address": 42 + "address": 42, + "duration": 42 } ], "ignore": [ diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-connection.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-connection.yml index bbbd03ff5..1d94778dd 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-connection.yml +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-connection.yml @@ -13,9 +13,11 @@ events: - type: ConnectionReady connectionId: 1 address: 42 + duration: 42 - type: ConnectionCheckedOut connectionId: 1 address: 42 + duration: 42 ignore: - ConnectionPoolReady - ConnectionPoolCreated diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-custom-maxConnecting-is-enforced.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-custom-maxConnecting-is-enforced.json index 931eb8659..6620f82fd 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-custom-maxConnecting-is-enforced.json +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-custom-maxConnecting-is-enforced.json @@ -78,4 +78,4 @@ "ConnectionPoolCreated", "ConnectionPoolReady" ] -} \ No newline at end of file +} diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-custom-maxConnecting-is-enforced.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-custom-maxConnecting-is-enforced.yml new file mode 100644 index 000000000..dc8852696 --- /dev/null +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-custom-maxConnecting-is-enforced.yml @@ -0,0 +1,50 @@ +version: 1 +style: integration +description: custom maxConnecting is enforced +runOn: + - + minServerVersion: "4.4.0" +failPoint: + configureFailPoint: failCommand + mode: "alwaysOn" + data: + failCommands: ["isMaster","hello"] + closeConnection: false + blockConnection: true + blockTimeMS: 500 +poolOptions: + maxConnecting: 1 + # gives opportunity for the checkout in thread2 to establish a new connection, which it must not do until thread1 establishes one + maxPoolSize: 2 + waitQueueTimeoutMS: 5000 +operations: + - name: ready + # thread1 exists to consume the single permit to open a connection, + # so that thread2 would be blocked acquiring a permit, which results in ordering its ConnectionCreated event after + # the ConnectionReady event from thread1. + - name: start + target: thread1 + - name: start + target: thread2 + - name: checkOut + thread: thread1 + - name: waitForEvent + event: ConnectionCreated + count: 1 + - name: checkOut + thread: thread2 + - name: waitForEvent + event: ConnectionReady + count: 2 +events: + - type: ConnectionCreated + - type: ConnectionReady + - type: ConnectionCreated + - type: ConnectionReady +ignore: + - ConnectionCheckOutStarted + - ConnectionCheckedIn + - ConnectionCheckedOut + - ConnectionClosed + - ConnectionPoolCreated + - ConnectionPoolReady diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-error-closed.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-error-closed.json index ee2926e1c..614403ef5 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-error-closed.json +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-error-closed.json @@ -38,7 +38,8 @@ { "type": "ConnectionCheckedOut", "address": 42, - "connectionId": 42 + "connectionId": 42, + "duration": 42 }, { "type": "ConnectionCheckedIn", @@ -56,6 +57,7 @@ { "type": "ConnectionCheckOutFailed", "address": 42, + "duration": 42, "reason": "poolClosed" } ], diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-error-closed.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-error-closed.yml index 4d1b0f3b2..2d0ce8d11 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-error-closed.yml +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-error-closed.yml @@ -21,6 +21,7 @@ events: - type: ConnectionCheckedOut address: 42 connectionId: 42 + duration: 42 - type: ConnectionCheckedIn address: 42 connectionId: 42 @@ -30,6 +31,7 @@ events: address: 42 - type: ConnectionCheckOutFailed address: 42 + duration: 42 reason: poolClosed ignore: - ConnectionPoolReady diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-is-enforced.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-is-enforced.json index 732478bf7..3a63818bf 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-is-enforced.json +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-is-enforced.json @@ -19,7 +19,7 @@ ], "closeConnection": false, "blockConnection": true, - "blockTimeMS": 750 + "blockTimeMS": 800 } }, "poolOptions": { @@ -53,7 +53,7 @@ }, { "name": "wait", - "ms": 100 + "ms": 400 }, { "name": "checkOut", diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-is-enforced.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-is-enforced.yml index 1b7c4bdee..2ea7333d8 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-is-enforced.yml +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-is-enforced.yml @@ -13,7 +13,7 @@ failPoint: failCommands: ["isMaster","hello"] closeConnection: false blockConnection: true - blockTimeMS: 750 + blockTimeMS: 800 poolOptions: maxPoolSize: 10 waitQueueTimeoutMS: 5000 @@ -36,7 +36,7 @@ operations: count: 1 # wait some more time to ensure thread1 has begun establishing a Connection - name: wait - ms: 100 + ms: 400 # start 2 check out requests. Only one thread should # start creating a Connection and the other one should be # waiting for pendingConnectionCount to be less than maxConnecting, diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-timeout.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-timeout.json index 84ddf8fdb..4d9fda1a6 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-timeout.json +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-timeout.json @@ -89,7 +89,8 @@ { "type": "ConnectionCheckOutFailed", "reason": "timeout", - "address": 42 + "address": 42, + "duration": 42 } ], "ignore": [ diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-timeout.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-timeout.yml index 383f666ad..3c6fb5da2 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-timeout.yml +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-maxConnecting-timeout.yml @@ -60,6 +60,7 @@ events: - type: ConnectionCheckOutFailed reason: timeout address: 42 + duration: 42 ignore: - ConnectionCreated - ConnectionCheckedIn diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-minPoolSize-connection-maxConnecting.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-minPoolSize-connection-maxConnecting.json new file mode 100644 index 000000000..3b0d43e87 --- /dev/null +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-minPoolSize-connection-maxConnecting.json @@ -0,0 +1,88 @@ +{ + "version": 1, + "style": "integration", + "description": "threads blocked by maxConnecting check out minPoolSize connections", + "runOn": [ + { + "minServerVersion": "4.4.0" + } + ], + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "isMaster", + "hello" + ], + "closeConnection": false, + "blockConnection": true, + "blockTimeMS": 500 + } + }, + "poolOptions": { + "minPoolSize": 2, + "maxPoolSize": 3, + "waitQueueTimeoutMS": 5000 + }, + "operations": [ + { + "name": "ready" + }, + { + "name": "start", + "target": "thread1" + }, + { + "name": "start", + "target": "thread2" + }, + { + "name": "wait", + "ms": 200 + }, + { + "name": "checkOut", + "thread": "thread1" + }, + { + "name": "waitForEvent", + "event": "ConnectionCreated", + "count": 2 + }, + { + "name": "checkOut", + "thread": "thread2" + }, + { + "name": "waitForEvent", + "event": "ConnectionCheckedOut", + "count": 2 + } + ], + "events": [ + { + "type": "ConnectionCreated", + "address": 42 + }, + { + "type": "ConnectionCreated", + "address": 42 + }, + { + "type": "ConnectionCheckedOut", + "address": 42 + }, + { + "type": "ConnectionCheckedOut", + "address": 42 + } + ], + "ignore": [ + "ConnectionPoolReady", + "ConnectionClosed", + "ConnectionReady", + "ConnectionPoolCreated", + "ConnectionCheckOutStarted" + ] +} diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-minPoolSize-connection-maxConnecting.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-minPoolSize-connection-maxConnecting.yml new file mode 100644 index 000000000..0491c5398 --- /dev/null +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-minPoolSize-connection-maxConnecting.yml @@ -0,0 +1,63 @@ +version: 1 +style: integration +description: threads blocked by maxConnecting check out minPoolSize connections +runOn: + - + # required for blockConnection in fail point + minServerVersion: "4.4.0" +failPoint: + configureFailPoint: failCommand + mode: "alwaysOn" + data: + failCommands: ["isMaster","hello"] + closeConnection: false + blockConnection: true + blockTimeMS: 500 +poolOptions: + # allows both thread1 and the background thread to start opening connections concurrently + minPoolSize: 2 + # gives opportunity for the checkout in thread2 to open a new connection, which it must not do nonetheless + maxPoolSize: 3 + waitQueueTimeoutMS: 5000 +operations: + - name: ready + # thread1 exists to hold on one of the two permits to open a connection (the other one is initially held by the background thread), + # so that thread2 would be blocked acquiring a permit, which opens an opportunity for it to grab the connection newly opened + # by the background thread instead of opening a third connection. + - name: start + target: thread1 + - name: start + target: thread2 + # Ideally, thread1 should be holding for its permit to open a connection till the end of the test, but we cannot express that. + # This delay emulates the above requirement: + # - it is long enough to make sure that the background thread opens a connection before thread1 releases its permit; + # - it is short enough to allow thread2 to become blocked acquiring a permit to open a connection, and then grab the connection + # opened by the background thread, before the background thread releases its permit. + - name: wait + ms: 200 + - name: checkOut + thread: thread1 + - name: waitForEvent + event: ConnectionCreated + count: 2 + - name: checkOut + thread: thread2 + - name: waitForEvent + event: ConnectionCheckedOut + count: 2 +events: + # exactly 2 connections must be created and checked out + - type: ConnectionCreated + address: 42 + - type: ConnectionCreated + address: 42 + - type: ConnectionCheckedOut + address: 42 + - type: ConnectionCheckedOut + address: 42 +ignore: + - ConnectionPoolReady + - ConnectionClosed + - ConnectionReady + - ConnectionPoolCreated + - ConnectionCheckOutStarted diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-returned-connection-maxConnecting.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-returned-connection-maxConnecting.json index 965d56f6d..10b526e0c 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-returned-connection-maxConnecting.json +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-returned-connection-maxConnecting.json @@ -23,6 +23,7 @@ } }, "poolOptions": { + "maxConnecting": 2, "maxPoolSize": 10, "waitQueueTimeoutMS": 5000 }, @@ -72,9 +73,8 @@ "connection": "conn0" }, { - "name": "waitForEvent", - "event": "ConnectionCheckedOut", - "count": 4 + "name": "wait", + "ms": 100 } ], "events": [ @@ -104,14 +104,6 @@ "type": "ConnectionCheckedOut", "connectionId": 1, "address": 42 - }, - { - "type": "ConnectionCheckedOut", - "address": 42 - }, - { - "type": "ConnectionCheckedOut", - "address": 42 } ], "ignore": [ diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-returned-connection-maxConnecting.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-returned-connection-maxConnecting.yml index dab6e557d..5e2b5890a 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-returned-connection-maxConnecting.yml +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-checkout-returned-connection-maxConnecting.yml @@ -15,6 +15,7 @@ failPoint: blockConnection: true blockTimeMS: 750 poolOptions: + maxConnecting: 2 maxPoolSize: 10 waitQueueTimeoutMS: 5000 operations: @@ -45,14 +46,13 @@ operations: count: 4 - name: wait ms: 100 - # check original connection back in, so the thread that isn't - # currently establishing will become unblocked. Then wait for - # all threads to complete. + # Check original connection back in, so one of the waiting threads can check + # out the idle connection before the two new connections are ready. - name: checkIn connection: conn0 - - name: waitForEvent - event: ConnectionCheckedOut - count: 4 + # Wait for 100ms to let one of the blocked checkOut operations complete. + - name: wait + ms: 100 events: # main thread checking out a Connection and holding it - type: ConnectionCreated @@ -69,15 +69,13 @@ events: - type: ConnectionCheckedIn connectionId: 1 address: 42 - # remaining thread checking out the returned Connection + # Another thread checks out the returned Connection before the two new + # connections are checked out. - type: ConnectionCheckedOut connectionId: 1 address: 42 - # first two threads finishing Connection establishment - - type: ConnectionCheckedOut - address: 42 - - type: ConnectionCheckedOut - address: 42 + # Events after this can come in different orders but still be valid. + # See DRIVERS-2223 for details. ignore: - ConnectionPoolReady - ConnectionClosed diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-clears-waitqueue.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-clears-waitqueue.json index d4aef928c..e6077f12a 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-clears-waitqueue.json +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-clears-waitqueue.json @@ -59,7 +59,8 @@ }, { "type": "ConnectionCheckedOut", - "address": 42 + "address": 42, + "duration": 42 }, { "type": "ConnectionCheckOutStarted", @@ -76,17 +77,20 @@ { "type": "ConnectionCheckOutFailed", "reason": "connectionError", - "address": 42 + "address": 42, + "duration": 42 }, { "type": "ConnectionCheckOutFailed", "reason": "connectionError", - "address": 42 + "address": 42, + "duration": 42 }, { "type": "ConnectionCheckOutFailed", "reason": "connectionError", - "address": 42 + "address": 42, + "duration": 42 } ], "ignore": [ diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-clears-waitqueue.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-clears-waitqueue.yml index 521f8ed24..388056f4f 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-clears-waitqueue.yml +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-clears-waitqueue.yml @@ -38,6 +38,7 @@ events: address: 42 - type: ConnectionCheckedOut address: 42 + duration: 42 - type: ConnectionCheckOutStarted address: 42 - type: ConnectionCheckOutStarted @@ -47,12 +48,15 @@ events: - type: ConnectionCheckOutFailed reason: connectionError address: 42 + duration: 42 - type: ConnectionCheckOutFailed reason: connectionError address: 42 + duration: 42 - type: ConnectionCheckOutFailed reason: connectionError address: 42 + duration: 42 ignore: - ConnectionPoolReady - ConnectionPoolCleared diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-interrupting-pending-connections.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-interrupting-pending-connections.json new file mode 100644 index 000000000..c1fd74632 --- /dev/null +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-interrupting-pending-connections.json @@ -0,0 +1,77 @@ +{ + "version": 1, + "style": "integration", + "description": "clear with interruptInUseConnections = true closes pending connections", + "runOn": [ + { + "minServerVersion": "4.9.0" + } + ], + "failPoint": { + "configureFailPoint": "failCommand", + "mode": "alwaysOn", + "data": { + "failCommands": [ + "isMaster", + "hello" + ], + "closeConnection": false, + "blockConnection": true, + "blockTimeMS": 10000 + } + }, + "poolOptions": { + "minPoolSize": 0 + }, + "operations": [ + { + "name": "ready" + }, + { + "name": "start", + "target": "thread1" + }, + { + "name": "checkOut", + "thread": "thread1" + }, + { + "name": "waitForEvent", + "event": "ConnectionCreated", + "count": 1 + }, + { + "name": "clear", + "interruptInUseConnections": true + }, + { + "name": "waitForEvent", + "event": "ConnectionCheckOutFailed", + "count": 1 + } + ], + "events": [ + { + "type": "ConnectionCheckOutStarted" + }, + { + "type": "ConnectionCreated" + }, + { + "type": "ConnectionPoolCleared", + "interruptInUseConnections": true + }, + { + "type": "ConnectionClosed" + }, + { + "type": "ConnectionCheckOutFailed" + } + ], + "ignore": [ + "ConnectionCheckedIn", + "ConnectionCheckedOut", + "ConnectionPoolCreated", + "ConnectionPoolReady" + ] +} diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-interrupting-pending-connections.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-interrupting-pending-connections.yml new file mode 100644 index 000000000..ea0bbc7d4 --- /dev/null +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-interrupting-pending-connections.yml @@ -0,0 +1,42 @@ +version: 1 +style: integration +description: clear with interruptInUseConnections = true closes pending connections +runOn: + - + minServerVersion: "4.9.0" +failPoint: + configureFailPoint: failCommand + mode: "alwaysOn" + data: + failCommands: ["isMaster","hello"] + closeConnection: false + blockConnection: true + blockTimeMS: 10000 +poolOptions: + minPoolSize: 0 +operations: + - name: ready + - name: start + target: thread1 + - name: checkOut + thread: thread1 + - name: waitForEvent + event: ConnectionCreated + count: 1 + - name: clear + interruptInUseConnections: true + - name: waitForEvent + event: ConnectionCheckOutFailed + count: 1 +events: + - type: ConnectionCheckOutStarted + - type: ConnectionCreated + - type: ConnectionPoolCleared + interruptInUseConnections: true + - type: ConnectionClosed + - type: ConnectionCheckOutFailed +ignore: + - ConnectionCheckedIn + - ConnectionCheckedOut + - ConnectionPoolCreated + - ConnectionPoolReady diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-ready.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-ready.json index 800c3545a..88c2988ac 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-ready.json +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-ready.json @@ -40,7 +40,8 @@ { "type": "ConnectionCheckedOut", "address": 42, - "connectionId": 42 + "connectionId": 42, + "duration": 42 }, { "type": "ConnectionPoolCleared", @@ -49,6 +50,7 @@ { "type": "ConnectionCheckOutFailed", "address": 42, + "duration": 42, "reason": "connectionError" }, { @@ -57,7 +59,8 @@ }, { "type": "ConnectionCheckedOut", - "address": 42 + "address": 42, + "duration": 42 } ], "ignore": [ diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-ready.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-ready.yml index c783d4d09..93c85bfbe 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-ready.yml +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-ready.yml @@ -20,15 +20,18 @@ events: - type: ConnectionCheckedOut address: 42 connectionId: 42 + duration: 42 - type: ConnectionPoolCleared address: 42 - type: ConnectionCheckOutFailed address: 42 + duration: 42 reason: connectionError - type: ConnectionPoolReady address: 42 - type: ConnectionCheckedOut address: 42 + duration: 42 ignore: - ConnectionPoolCreated - ConnectionReady diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-schedule-run-interruptInUseConnections-false.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-schedule-run-interruptInUseConnections-false.json new file mode 100644 index 000000000..3d7536951 --- /dev/null +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-schedule-run-interruptInUseConnections-false.json @@ -0,0 +1,81 @@ +{ + "version": 1, + "style": "unit", + "description": "Pool clear SHOULD schedule the next background thread run immediately (interruptInUseConnections = false)", + "poolOptions": { + "backgroundThreadIntervalMS": 10000 + }, + "operations": [ + { + "name": "ready" + }, + { + "name": "checkOut" + }, + { + "name": "checkOut", + "label": "conn" + }, + { + "name": "checkIn", + "connection": "conn" + }, + { + "name": "clear", + "interruptInUseConnections": false + }, + { + "name": "waitForEvent", + "event": "ConnectionPoolCleared", + "count": 1, + "timeout": 1000 + }, + { + "name": "waitForEvent", + "event": "ConnectionClosed", + "count": 1, + "timeout": 1000 + }, + { + "name": "close" + } + ], + "events": [ + { + "type": "ConnectionCheckedOut", + "connectionId": 1, + "address": 42 + }, + { + "type": "ConnectionCheckedOut", + "connectionId": 2, + "address": 42 + }, + { + "type": "ConnectionCheckedIn", + "connectionId": 2, + "address": 42 + }, + { + "type": "ConnectionPoolCleared", + "interruptInUseConnections": false + }, + { + "type": "ConnectionClosed", + "connectionId": 2, + "reason": "stale", + "address": 42 + }, + { + "type": "ConnectionPoolClosed", + "address": 42 + } + ], + "ignore": [ + "ConnectionCreated", + "ConnectionPoolReady", + "ConnectionReady", + "ConnectionCheckOutStarted", + "ConnectionPoolCreated" + ] +} diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-schedule-run-interruptInUseConnections-false.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-schedule-run-interruptInUseConnections-false.yml new file mode 100644 index 000000000..dcaafec8b --- /dev/null +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear-schedule-run-interruptInUseConnections-false.yml @@ -0,0 +1,48 @@ +version: 1 +style: unit +description: Pool clear SHOULD schedule the next background thread run immediately (interruptInUseConnections = false) +poolOptions: + # ensure it's not involved by default + backgroundThreadIntervalMS: 10000 +operations: + - name: ready + - name: checkOut + - name: checkOut + label: conn + - name: checkIn + connection: conn + - name: clear + interruptInUseConnections: false + - name: waitForEvent + event: ConnectionPoolCleared + count: 1 + timeout: 1000 + - name: waitForEvent + event: ConnectionClosed + count: 1 + timeout: 1000 + - name: close +events: + - type: ConnectionCheckedOut + connectionId: 1 + address: 42 + - type: ConnectionCheckedOut + connectionId: 2 + address: 42 + - type: ConnectionCheckedIn + connectionId: 2 + address: 42 + - type: ConnectionPoolCleared + interruptInUseConnections: false + - type: ConnectionClosed + connectionId: 2 + reason: stale + address: 42 + - type: ConnectionPoolClosed + address: 42 +ignore: + - ConnectionCreated + - ConnectionPoolReady + - ConnectionReady + - ConnectionCheckOutStarted + - ConnectionPoolCreated diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear.json deleted file mode 100644 index 89da40d83..000000000 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear.json +++ /dev/null @@ -1,67 +0,0 @@ -{ - "version": 1, - "style": "unit", - "description": "pool clear halts background minPoolSize establishments (new)", - "poolOptions": { - "minPoolSize": 1 - }, - "operations": [ - { - "name": "ready" - }, - { - "name": "waitForEvent", - "event": "ConnectionReady", - "count": 1 - }, - { - "name": "clear" - }, - { - "name": "wait", - "ms": 200 - }, - { - "name": "ready" - }, - { - "name": "waitForEvent", - "event": "ConnectionReady", - "count": 1 - } - ], - "events": [ - { - "type": "ConnectionPoolReady", - "address": 42 - }, - { - "type": "ConnectionCreated", - "address": 42 - }, - { - "type": "ConnectionReady", - "address": 42 - }, - { - "type": "ConnectionPoolCleared", - "address": 42 - }, - { - "type": "ConnectionPoolReady", - "address": 42 - }, - { - "type": "ConnectionCreated", - "address": 42 - }, - { - "type": "ConnectionReady", - "address": 42 - } - ], - "ignore": [ - "ConnectionPoolCreated", - "ConnectionClosed" - ] -} diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear.yml deleted file mode 100644 index 9e162618a..000000000 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-clear.yml +++ /dev/null @@ -1,34 +0,0 @@ -version: 1 -style: unit -description: pool clear halts background minPoolSize establishments (new) -poolOptions: - minPoolSize: 1 -operations: - - name: ready - - name: waitForEvent - event: ConnectionReady - count: 1 - - name: clear - # ensure no connections created after clear - - name: wait - ms: 200 - - name: ready - - name: waitForEvent - event: ConnectionReady -events: - - type: ConnectionPoolReady - address: 42 - - type: ConnectionCreated - address: 42 - - type: ConnectionReady - address: 42 - - type: ConnectionPoolCleared - address: 42 - - type: ConnectionPoolReady - address: 42 - - type: ConnectionCreated - address: 42 - - type: ConnectionReady - address: 42 -ignore: - - ConnectionPoolCreated diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-min-size-error.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-min-size-error.json index 1c744b850..509b2a235 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-min-size-error.json +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-min-size-error.json @@ -49,15 +49,15 @@ "type": "ConnectionCreated", "address": 42 }, + { + "type": "ConnectionPoolCleared", + "address": 42 + }, { "type": "ConnectionClosed", "address": 42, "connectionId": 42, "reason": "error" - }, - { - "type": "ConnectionPoolCleared", - "address": 42 } ], "ignore": [ diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-min-size-error.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-min-size-error.yml index dd5890b1d..f43c4ee15 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-min-size-error.yml +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-create-min-size-error.yml @@ -30,11 +30,11 @@ events: address: 42 - type: ConnectionCreated address: 42 + - type: ConnectionPoolCleared + address: 42 - type: ConnectionClosed address: 42 connectionId: 42 reason: error - - type: ConnectionPoolCleared - address: 42 ignore: - ConnectionPoolCreated diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-ready.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-ready.json index 29ce7326c..a90aed04d 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-ready.json +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-ready.json @@ -31,7 +31,8 @@ { "type": "ConnectionCheckOutFailed", "reason": "connectionError", - "address": 42 + "address": 42, + "duration": 42 }, { "type": "ConnectionPoolReady", @@ -47,7 +48,8 @@ }, { "type": "ConnectionCheckedOut", - "address": 42 + "address": 42, + "duration": 42 } ], "ignore": [ diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-ready.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-ready.yml index 730d4d27b..233209939 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-ready.yml +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/pool-ready.yml @@ -17,6 +17,7 @@ events: - type: ConnectionCheckOutFailed reason: connectionError address: 42 + duration: 42 - type: ConnectionPoolReady address: 42 - type: ConnectionCheckOutStarted @@ -25,6 +26,7 @@ events: address: 42 - type: ConnectionCheckedOut address: 42 + duration: 42 ignore: - ConnectionPoolCreated - ConnectionReady diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/wait-queue-timeout.json b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/wait-queue-timeout.json index fbcbdfb04..8bd7c4949 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/wait-queue-timeout.json +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/wait-queue-timeout.json @@ -48,7 +48,8 @@ { "type": "ConnectionCheckedOut", "connectionId": 42, - "address": 42 + "address": 42, + "duration": 42 }, { "type": "ConnectionCheckOutStarted", @@ -57,7 +58,8 @@ { "type": "ConnectionCheckOutFailed", "reason": "timeout", - "address": 42 + "address": 42, + "duration": 42 }, { "type": "ConnectionCheckedIn", diff --git a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/wait-queue-timeout.yml b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/wait-queue-timeout.yml index 5433c1489..fdb3b5862 100644 --- a/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/wait-queue-timeout.yml +++ b/src/test/spec/json/connection-monitoring-and-pooling/cmap-format/wait-queue-timeout.yml @@ -32,11 +32,13 @@ events: - type: ConnectionCheckedOut connectionId: 42 address: 42 + duration: 42 - type: ConnectionCheckOutStarted address: 42 - type: ConnectionCheckOutFailed reason: timeout address: 42 + duration: 42 - type: ConnectionCheckedIn connectionId: 42 address: 42 diff --git a/src/trace/connection.rs b/src/trace/connection.rs index 27734a2ba..03cd1f13e 100644 --- a/src/trace/connection.rs +++ b/src/trace/connection.rs @@ -143,14 +143,16 @@ impl TracingRepresentation for ConnectionClosedReason { fn tracing_representation(&self) -> &'static str { match self { - ConnectionClosedReason::Stale => "Connection became stale because the pool was cleared", - ConnectionClosedReason::Idle => { + Self::Stale => "Connection became stale because the pool was cleared", + Self::Idle => { "Connection has been available but unused for longer than the configured max idle \ time" } - ConnectionClosedReason::Error => "An error occurred while using the connection", - ConnectionClosedReason::Dropped => "Connection was dropped during an operation", - ConnectionClosedReason::PoolClosed => "Connection pool was closed", + Self::Error => "An error occurred while using the connection", + Self::Dropped => "Connection was dropped during an operation", + Self::PoolClosed => "Connection pool was closed", + #[cfg(test)] + Self::Unset => "Unset", } } } @@ -160,12 +162,10 @@ impl TracingRepresentation for ConnectionCheckoutFailedReason { fn tracing_representation(&self) -> &'static str { match self { - ConnectionCheckoutFailedReason::Timeout => { - "Failed to establish a new connection within connectTimeoutMS" - } - ConnectionCheckoutFailedReason::ConnectionError => { - "An error occurred while trying to establish a new connection" - } + Self::Timeout => "Failed to establish a new connection within connectTimeoutMS", + Self::ConnectionError => "An error occurred while trying to establish a new connection", + #[cfg(test)] + Self::Unset => "Unset", } } }