Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

brig: make rabbitmq field optional again #4340

Merged
merged 4 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 10 additions & 26 deletions services/brig/src/Brig/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ module Brig.App
rabbitmqChannelLens,
disabledVersionsLens,
enableSFTFederationLens,
readChannel,

-- * App Monad
AppT (..),
Expand Down Expand Up @@ -141,31 +140,23 @@ import OpenSSL.EVP.Digest (Digest, getDigestByName)
import OpenSSL.Session (SSLOption (..))
import OpenSSL.Session qualified as SSL
import Polysemy
import Polysemy.Error (throw)
import Polysemy.Error qualified as Polysemy
import Polysemy.Fail
import Polysemy.Final
import Polysemy.Input (Input, input)
import Polysemy.TinyLog (TinyLog)
import Polysemy.TinyLog qualified as P
import Prometheus
import Ssl.Util
import System.FSNotify qualified as FS
import System.Logger.Class hiding (Settings, settings)
import System.Logger.Class qualified as LC
import System.Logger.Extended qualified as Log
import System.Timeout (timeout)
import Util.Options
import Util.SuffixNamer
import Wire.API.Error (errorToWai)
import Wire.API.Error.Brig qualified as E
import Wire.API.Federation.Error (federationNotImplemented)
import Wire.API.Locale (Locale)
import Wire.API.Routes.Version
import Wire.API.User.Identity
import Wire.EmailSending.SMTP qualified as SMTP
import Wire.EmailSubsystem.Template (TemplateBranding, forLocale)
import Wire.Error (HttpError (StdError))
import Wire.SessionStore
import Wire.SessionStore.Cassandra
import Wire.UserKeyStore
Expand Down Expand Up @@ -211,15 +202,23 @@ data Env = Env
indexEnv :: IndexEnv,
randomPrekeyLocalLock :: Maybe (MVar ()),
keyPackageLocalLock :: MVar (),
rabbitmqChannel :: MVar Q.Channel,
rabbitmqChannel :: Maybe (MVar Q.Channel),
disabledVersions :: Set Version,
enableSFTFederation :: Maybe Bool
}

makeLensesWith (lensRules & lensField .~ suffixNamer) ''Env

validateOptions :: Opts -> IO ()
validateOptions o =
case (o.federatorInternal, o.rabbitmq) of
(Nothing, Just _) -> error "RabbitMQ config is specified and federator is not, please specify both or none"
(Just _, Nothing) -> error "Federator is specified and RabbitMQ config is not, please specify both or none"
_ -> pure ()

newEnv :: Opts -> IO Env
newEnv opts = do
validateOptions opts
Just md5 <- getDigestByName "MD5"
Just sha256 <- getDigestByName "SHA256"
Just sha512 <- getDigestByName "SHA512"
Expand Down Expand Up @@ -262,7 +261,7 @@ newEnv opts = do
Log.info lgr $ Log.msg (Log.val "randomPrekeys: not active; using dynamoDB instead.")
pure Nothing
kpLock <- newMVar ()
rabbitChan <- Q.mkRabbitMqChannelMVar lgr opts.rabbitmq
rabbitChan <- traverse (Q.mkRabbitMqChannelMVar lgr) opts.rabbitmq
let allDisabledVersions = foldMap expandVersionExp opts.settings.disabledAPIVersions
idxEnv <- mkIndexEnv opts.elasticsearch lgr (Opt.galley opts) mgr
pure $!
Expand Down Expand Up @@ -629,21 +628,6 @@ instance (MonadIndexIO (AppT r)) => MonadIndexIO (ExceptT err (AppT r)) where
instance HasRequestId (AppT r) where
getRequestId = asks (.requestId)

readChannel ::
( Member (Embed IO) r,
Member (Polysemy.Error HttpError) r,
Member TinyLog r
) =>
MVar Q.Channel ->
Sem r Q.Channel
readChannel chanMVar = do
mChan <- liftIO $ timeout 1_000_000 $ readMVar chanMVar
maybe onNothing pure mChan
where
onNothing = do
P.err $ Log.msg @Text "failed to connect to RabbitMQ"
throw $ StdError $ errorToWai @'E.NotificationQueueConnectionError

-------------------------------------------------------------------------------
-- Ad hoc interpreters

Expand Down
3 changes: 0 additions & 3 deletions services/brig/src/Brig/CanonicalInterpreter.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import Control.Monad.Catch (throwM)
import Data.Qualified (Local, toLocalUnsafe)
import Data.Time.Clock (UTCTime, getCurrentTime)
import Imports
import Network.AMQP
import Polysemy
import Polysemy.Async
import Polysemy.Conc
Expand Down Expand Up @@ -116,7 +115,6 @@ type BrigLowerLevelEffects =
DeleteQueue,
Wire.Events.Events,
NotificationSubsystem,
Input Channel,
Error UserSubsystemError,
Error TeamInvitationSubsystemError,
Error AuthenticationSubsystemError,
Expand Down Expand Up @@ -278,7 +276,6 @@ runBrigToIO e (AppT ma) = do
. mapError authenticationSubsystemErrorToHttpError
. mapError teamInvitationErrorToHttpError
. mapError userSubsystemErrorToHttpError
. runInputSem (readChannel e.rabbitmqChannel)
. runNotificationSubsystemGundeck (defaultNotificationSubsystemConfig e.requestId)
. runEvents
. runDeleteQueue e.internalEvents
Expand Down
14 changes: 11 additions & 3 deletions services/brig/src/Brig/Federation/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,17 @@ notifyUserDeleted self remotes = do
let remoteConnections = tUnqualified remotes
let notif = UserDeletedConnectionsNotification (tUnqualified self) remoteConnections
remoteDomain = tDomain remotes
chanVar <- asks (.rabbitmqChannel)
enqueueNotification (tDomain self) remoteDomain Q.Persistent chanVar $
fedQueueClient @'OnUserDeletedConnectionsTag notif

asks (.rabbitmqChannel) >>= \case
Just chanVar -> do
enqueueNotification (tDomain self) remoteDomain Q.Persistent chanVar $
fedQueueClient @'OnUserDeletedConnectionsTag notif
Nothing ->
Log.err $
Log.msg ("Federation error while notifying remote backends of a user deletion." :: ByteString)
. Log.field "user_id" (show self)
. Log.field "domain" (domainText remoteDomain)
. Log.field "error" (show FederationNotConfigured)

-- | Enqueues notifications in RabbitMQ. Retries 3 times with a delay of 1s.
enqueueNotification :: (MonadIO m, MonadMask m, Log.MonadLogger m, MonadReader Env m) => Domain -> Domain -> Q.DeliveryMode -> MVar Q.Channel -> FedQueueClient c () -> m ()
Expand Down
2 changes: 1 addition & 1 deletion services/brig/src/Brig/Options.hs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ data Opts = Opts
-- | SFT Federation
multiSFT :: !(Maybe Bool),
-- | RabbitMQ settings, required when federation is enabled.
rabbitmq :: !AmqpEndpoint,
rabbitmq :: !(Maybe AmqpEndpoint),
-- | AWS settings
aws :: !AWSOpts,
-- | Enable Random Prekey Strategy
Expand Down
Loading