Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StatusCheckMessage does not use specified timeout #128

Open
tilmanginzel opened this issue Nov 23, 2017 · 3 comments
Open

StatusCheckMessage does not use specified timeout #128

tilmanginzel opened this issue Nov 23, 2017 · 3 comments

Comments

@tilmanginzel
Copy link

tilmanginzel commented Nov 23, 2017

Hi,

we use the StatusCheckMessage to verify if a connection to rabbitmq is established. It looks like the specified timeout is never used, so the status check never runs into the timeout. Apparently the private method withTimeout is unused.

This example code never fails, even if rabbitmq is not available:

def health(): Future[Done] = {
  val statusCheckMessage = new StatusCheckMessage(2 seconds)
  rabbitControl ! statusCheckMessage
  
  statusCheckMessage.okay map {
    _ => Done
  } recover {
    case t: Throwable => throw t
  }
}

Workaround: We created our own withTimeout function and wrapped it around the Future.

def withTimeout[T](duration: FiniteDuration)(f: => Future[T]): Future[T] = {
  val timer = akka.pattern.after(duration, using = system.scheduler) {
    Future.failed(new scala.concurrent.TimeoutException(s"Response not received after $duration."))
  }
  Future.firstCompletedOf(timer :: f :: Nil)
}

def health(): Future[Done] = {
  val statusCheckMessage = new StatusCheckMessage()
  rabbitControl ! statusCheckMessage

  withTimeout(2 seconds) {
    statusCheckMessage.okay map {
      _ => Done
    } recover {
      case t: Throwable => throw t
    }
  }
}

Cheers :)

@afrancoc2000
Copy link

This looks great! thanks I'm going to try it :)

@afrancoc2000
Copy link

afrancoc2000 commented Oct 1, 2018

Hi,

I tried it with a timers.startPeriodicTimer() method to check the health every minute, the weird thing is that even when I see all consumers went down and the producer gets a timeout with every message it sends I'm not getting any error and the future always returns "Done".

I'm thinking on sending a message instead of doing the status check, using ask and kill the rabbit connection if I get a timeout, hoping the supervisor will restart the connection.

What I don't like is that I will be getting a lot of messages I don't need in my queue.

Have you had a similar problem before?

Its like the connection is alive but is useless

@afrancoc2000
Copy link

I'm thinking on something like this:

def checkHealth(rabbitControl: ActorRef): Future[ConfirmResponse] = {
  val keyPublisher = Publisher.exchange(RABBIT_EXCHANGE)
  val received: Future[ConfirmResponse] = (
      rabbitControl ? Message(s"$address ${rabbitControl.path.name} is healthy", keyPublisher)
      ).mapTo[ConfirmResponse]

  received.onFailure{
    case e =>
      LogHelper.logger.info(s"$address ${rabbitControl.path.name} is not healthy: ${e.getMessage}")
      rabbitControl ! Kill
  }

  received
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants