Skip to content

Commit

Permalink
Fix clippy warnings (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
rcelha authored Oct 6, 2022
1 parent 279dc47 commit fdb6fe8
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 33 deletions.
18 changes: 8 additions & 10 deletions src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ where
match &self.active_servers {
None => return Err(ClientError::NoServersAvailable),
Some(active_servers) => {
if active_servers.len() == 0 {
if active_servers.is_empty() {
return Err(ClientError::NoServersAvailable);
}

Expand All @@ -180,14 +180,12 @@ where

// If there are no stream for the address, create a new one
// This is on a nested block so it controlls the guards in `self.stream`
{
if let None = self.streams.get(address) {
let stream = TcpStream::connect(&address).await.map_err(|x| {
ClientError::Unknown(format!("(Address {}) - {:?}", address, x))
})?;
let stream = Framed::new(stream, LengthDelimitedCodec::new());
self.streams.insert(address.to_string(), stream);
}
if self.streams.get(address).is_none() {
let stream = TcpStream::connect(&address)
.await
.map_err(|x| ClientError::Unknown(format!("(Address {}) - {:?}", address, x)))?;
let stream = Framed::new(stream, LengthDelimitedCodec::new());
self.streams.insert(address.to_string(), stream);
};

self.streams
Expand Down Expand Up @@ -454,7 +452,7 @@ mod test {
#[tokio::test]
async fn test_service_clone() {
let client = client_with_members().await;
let _ = client.clone();
let _ = client;
}

// TODO integration tests
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/membership_protocol/peer_to_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ where
local_storage.push(member.clone()).await?;
let mut client = Client::new(local_storage);

if let Err(_) = client.ping().await {
if (client.ping().await).is_err() {
self.members_storage()
.notify_failure(member.ip(), member.port())
.await?;
Expand Down
14 changes: 7 additions & 7 deletions src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use thiserror::Error;

use crate::protocol::ClientError;

#[derive(Error, Debug, PartialEq)]
#[derive(Error, Debug, PartialEq, Eq)]
pub enum HandlerError {
#[error("object not found")]
ObjectNotFound,
Expand All @@ -22,19 +22,19 @@ pub enum HandlerError {
Unknown,
}

#[derive(Error, Debug, PartialEq)]
#[derive(Error, Debug, PartialEq, Eq)]
pub enum ServiceObjectLifeCycleError {
#[error("unknown error")]
Unknown,
}

#[derive(Error, Debug, PartialEq)]
#[derive(Error, Debug, PartialEq, Eq)]
pub enum ClientBuilderError {
#[error("no MembersStorage provided")]
NoMembersStorage,
}

#[derive(Error, Debug, PartialEq)]
#[derive(Error, Debug, PartialEq, Eq)]
pub enum ServerBuilderError {
#[error("no MembersStorage provided")]
NoMembersStorage,
Expand All @@ -46,7 +46,7 @@ pub enum ServerBuilderError {
Unknown(String),
}

#[derive(Error, Debug, PartialEq)]
#[derive(Error, Debug, PartialEq, Eq)]
pub enum ServerError {
#[error("bind")]
Bind(String),
Expand All @@ -61,7 +61,7 @@ pub enum ServerError {
Run,
}

#[derive(Error, Debug, PartialEq)]
#[derive(Error, Debug, PartialEq, Eq)]
pub enum MembershipError {
#[error("upstream error")]
Upstream(String),
Expand All @@ -76,7 +76,7 @@ impl From<sqlx::Error> for MembershipError {
}
}

#[derive(Error, Debug, PartialEq)]
#[derive(Error, Debug, PartialEq, Eq)]
pub enum ClusterProviderServeError {
#[error("can't communicate with membership provider's storage")]
MembershipProviderError(String),
Expand Down
4 changes: 2 additions & 2 deletions src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl From<HandlerError> for ResponseEnvelope {
}
}

#[derive(Debug, PartialEq, Error, Serialize, Deserialize)]
#[derive(Debug, PartialEq, Eq, Error, Serialize, Deserialize)]
pub enum ResponseError {
#[error("ServiceObject is in another server")]
Redirect(String),
Expand Down Expand Up @@ -78,7 +78,7 @@ impl From<HandlerError> for ResponseError {
}
}

#[derive(Error, Debug, PartialEq)]
#[derive(Error, Debug, PartialEq, Eq)]
pub enum ClientError {
#[error("server response error")]
ResponseError(ResponseError),
Expand Down
22 changes: 11 additions & 11 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ where

pub fn build(self) -> Result<Server<S, C, P>, ServerBuilderError> {
let address = self.address;
let registry = self.registry.unwrap_or_else(|| Registry::new());
let registry = self.registry.unwrap_or_else(Registry::new);
let cluster_provider = self
.cluster_provider
.ok_or(ServerBuilderError::NoMembersStorage)?;
Expand Down Expand Up @@ -172,7 +172,7 @@ where
.max_size(self.client_pool_size)
.build(pool_manager)
.await
.map_err(|err| ServerError::ClientBuilder(err))?;
.map_err(ServerError::ClientBuilder)?;
self.app_data(client_pool);

let (admin_sender, admin_receiver) = mpsc::unbounded_channel::<AdminCommands>();
Expand All @@ -183,7 +183,7 @@ where
accept_result?;
}
cluster_provider_serve_result = self.cluster_provider.serve(&self.address) => {
cluster_provider_serve_result.map_err(|e| ServerError::ClusterProviderServe(e))?;
cluster_provider_serve_result.map_err(ServerError::ClusterProviderServe)?;
}
_ = self.consume_admin_commands(admin_receiver) => {
println!("admin command serve finished first");
Expand Down Expand Up @@ -228,15 +228,15 @@ where
}
}

impl<S: MembersStorage, C: ClusterProvider<S>, P: ObjectPlacementProvider> Into<Service<S, P>>
for &Server<S, C, P>
impl<S: MembersStorage, C: ClusterProvider<S>, P: ObjectPlacementProvider> From<&Server<S, C, P>>
for Service<S, P>
{
fn into(self) -> Service<S, P> {
let address = self.address.clone();
let registry = self.registry.clone();
let object_placement_provider = self.object_placement_provider.clone();
let app_data = self.app_data.clone();
let members_storage = self.cluster_provider.members_storage().clone();
fn from(server: &Server<S, C, P>) -> Self {
let address = server.address.clone();
let registry = server.registry.clone();
let object_placement_provider = server.object_placement_provider.clone();
let app_data = server.app_data.clone();
let members_storage = server.cluster_provider.members_storage().clone();

Service {
address,
Expand Down
2 changes: 1 addition & 1 deletion src/service_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub trait ServiceObject:
}

async fn shutdown(&mut self, app_data: &AppData) -> Result<(), ServiceObjectLifeCycleError> {
self.before_shutdown(&app_data).await?;
self.before_shutdown(app_data).await?;
let admin_sender = app_data.get::<AdminSender>().clone();
admin_sender
.send(AdminCommands::Shutdown(
Expand Down
2 changes: 1 addition & 1 deletion src/state/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use serde::Serialize;
/// `LocalState` is a state provider for testing purposes
///
/// It stores all the serialized states into a single `DashMap`
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct LocalState {
data: DashMap<(String, String, String), String>,
}
Expand Down

0 comments on commit fdb6fe8

Please sign in to comment.