diff --git a/src/IO.Ably.Shared/Realtime/Workflows/RealtimeWorkflow.cs b/src/IO.Ably.Shared/Realtime/Workflows/RealtimeWorkflow.cs index fdeeb4601..13572e063 100644 --- a/src/IO.Ably.Shared/Realtime/Workflows/RealtimeWorkflow.cs +++ b/src/IO.Ably.Shared/Realtime/Workflows/RealtimeWorkflow.cs @@ -334,7 +334,7 @@ private async Task ProcessCommandInner(RealtimeCommand command) if (State.Connection.CurrentStateObject.CanSend || cmd.Force) { var sendResult = SendMessage(cmd.ProtocolMessage, cmd.Callback); - if (sendResult.IsFailure && State.Connection.CurrentStateObject.CanQueue) + if (sendResult.IsFailure && State.Connection.CurrentStateObject.CanQueue && Client.Options.QueueMessages) { Logger.Debug("Failed to send message. Queuing it."); State.PendingMessages.Add(new MessageAndCallback( @@ -343,7 +343,7 @@ private async Task ProcessCommandInner(RealtimeCommand command) Logger)); } } - else if (State.Connection.CurrentStateObject.CanQueue) + else if (State.Connection.CurrentStateObject.CanQueue && Client.Options.QueueMessages) { Logger.Debug("Queuing message"); State.PendingMessages.Add(new MessageAndCallback( @@ -554,6 +554,7 @@ ErrorInfo GetErrorInfoFromTransportException(Exception ex, ErrorInfo @default) return ErrorInfo.ReasonRefused; } + @default.InnerException = ex; return @default; } @@ -812,6 +813,17 @@ ErrorInfo TransformIfTokenErrorAndNotRetryable() SetState(disconnectedState, skipTimer: cmd.SkipAttach); + if (Client.Options.QueueMessages == false) + { + var failAckMessages = new ErrorInfo( + "Clearing message AckQueue(created at connected state) because Options.QueueMessages is false", + ErrorCodes.Disconnected, + HttpStatusCode.BadRequest, + null, + cmd.Error); + ClearAckQueueAndFailMessages(failAckMessages); + } + if (cmd.SkipAttach == false) { ConnectionManager.DestroyTransport(); diff --git a/src/IO.Ably.Shared/Transport/ConnectionManager.cs b/src/IO.Ably.Shared/Transport/ConnectionManager.cs index 0375716bf..3795ad799 100644 --- a/src/IO.Ably.Shared/Transport/ConnectionManager.cs +++ b/src/IO.Ably.Shared/Transport/ConnectionManager.cs @@ -226,17 +226,23 @@ public void Send( Logger.Error($"Failed to encode protocol message: {encodingResult.Error.Message}"); } - if (State.CanSend == false && State.CanQueue == false) + if (State.CanSend == false) { - throw new AblyException($"The current state [{State.State}] does not allow messages to be sent."); - } + if (Options.QueueMessages == false) + { + throw new AblyException( + $"Not queuing messages for [{State.State}] since Options.QueueMessages is set to False.", + ErrorInfo.ReasonUnknown.Code, + HttpStatusCode.BadRequest); + } - if (State.CanSend == false && State.CanQueue && Options.QueueMessages == false) - { - throw new AblyException( - $"Current state is [{State.State}] which supports queuing but Options.QueueMessages is set to False.", - Connection.ConnectionState.DefaultErrorInfo.Code, - HttpStatusCode.ServiceUnavailable); + if (State.CanQueue == false) + { + throw new AblyException( + $"The current connection state [{State.State}] does not allow messages to be sent.", + ErrorInfo.ReasonUnknown.Code, + HttpStatusCode.BadRequest); + } } ExecuteCommand(SendMessageCommand.Create(message, callback).TriggeredBy("ConnectionManager.Send()")); diff --git a/src/IO.Ably.Shared/Types/ErrorInfo.cs b/src/IO.Ably.Shared/Types/ErrorInfo.cs index 21891e7ac..83b58692e 100644 --- a/src/IO.Ably.Shared/Types/ErrorInfo.cs +++ b/src/IO.Ably.Shared/Types/ErrorInfo.cs @@ -13,7 +13,7 @@ namespace IO.Ably public class ErrorInfo { internal static readonly ErrorInfo ReasonClosed = new ErrorInfo("Connection closed by client", ErrorCodes.NoError); - internal static readonly ErrorInfo ReasonDisconnected = new ErrorInfo("Connection temporarily unavailable", 80003); + internal static readonly ErrorInfo ReasonDisconnected = new ErrorInfo("Connection temporarily unavailable", ErrorCodes.Disconnected); internal static readonly ErrorInfo ReasonSuspended = new ErrorInfo("Connection unavailable", ErrorCodes.ConnectionSuspended); internal static readonly ErrorInfo ReasonFailed = new ErrorInfo("Connection failed", ErrorCodes.ConnectionFailed); internal static readonly ErrorInfo ReasonRefused = new ErrorInfo("Access refused", ErrorCodes.Unauthorized); diff --git a/src/IO.Ably.Tests.Shared/Infrastructure/TestTransportFactory.cs b/src/IO.Ably.Tests.Shared/Infrastructure/TestTransportFactory.cs index 5f737db43..9485b5dcc 100644 --- a/src/IO.Ably.Tests.Shared/Infrastructure/TestTransportFactory.cs +++ b/src/IO.Ably.Tests.Shared/Infrastructure/TestTransportFactory.cs @@ -9,6 +9,7 @@ public class TestTransportFactory : ITransportFactory private readonly Action _onWrappedTransportCreated; internal Action OnTransportCreated = delegate { }; + internal Action BeforeMessageSent = delegate { }; internal Action OnMessageSent = delegate { }; internal Action BeforeDataProcessed; @@ -30,7 +31,8 @@ var transport transport.BeforeDataProcessed = BeforeDataProcessed; OnTransportCreated(transport); - transport.MessageSent = OnMessageSent; + transport.BeforeMessageSend = BeforeMessageSent; + transport.AfterMessageSent = OnMessageSent; _onWrappedTransportCreated?.Invoke(transport); return transport; } diff --git a/src/IO.Ably.Tests.Shared/Infrastructure/TestTransportWrapper.cs b/src/IO.Ably.Tests.Shared/Infrastructure/TestTransportWrapper.cs index 4205c65aa..7c55886b7 100644 --- a/src/IO.Ably.Tests.Shared/Infrastructure/TestTransportWrapper.cs +++ b/src/IO.Ably.Tests.Shared/Infrastructure/TestTransportWrapper.cs @@ -89,9 +89,11 @@ public void OnTransportEvent(Guid transportId, TransportState state, Exception e public List BlockReceiveActions { get; set; } = new List(); + public Action BeforeMessageSend = delegate { }; + public Action AfterMessageSent = delegate { }; + public Action BeforeDataProcessed; public Action AfterDataReceived; - public Action MessageSent = delegate { }; public TestTransportWrapper(ITransport wrappedTransport, Protocol protocol) { @@ -135,15 +137,16 @@ public void Close(bool suppressClosedEvent = true) public Result Send(RealtimeTransportData data) { + BeforeMessageSend(data.Original); + if (BlockSendActions.Contains(data.Original.Action)) { return Result.Ok(); } ProtocolMessagesSent.Add(data.Original); - MessageSent(data.Original); WrappedTransport.Send(data); - + AfterMessageSent(data.Original); return Result.Ok(); } diff --git a/src/IO.Ably.Tests.Shared/Realtime/PresenceSandboxSpecs.cs b/src/IO.Ably.Tests.Shared/Realtime/PresenceSandboxSpecs.cs index b75f9d1b9..7686d7cd2 100644 --- a/src/IO.Ably.Tests.Shared/Realtime/PresenceSandboxSpecs.cs +++ b/src/IO.Ably.Tests.Shared/Realtime/PresenceSandboxSpecs.cs @@ -1662,6 +1662,80 @@ public async Task ChannelStateCondition_WhenQueueMessagesIsFalse_WhenChannelIsIn client.Close(); } + [Theory] + [ProtocolData] + [Trait("spec", "RTP16b")] + [Trait("spec", "RTP19a")] + public async Task ChannelStateCondition_WhenQueueMessagesIsFalse_ShouldFailAckQueueMessages_WhenSendFails(Protocol protocol) + { + var transportFactory = new TestTransportFactory + { + BeforeMessageSent = message => + { + if (message.Action == ProtocolMessage.MessageAction.Presence) + { + throw new Exception("RTP16b : error while sending message"); + } + } + }; + + var client = await GetRealtimeClient(protocol, (options, settings) => + { + options.ClientId = "RTP16b"; + options.QueueMessages = false; + options.TransportFactory = transportFactory; + }); + + await client.WaitForState(ConnectionState.Connected); + + var channel = GetRandomChannel(client, "RTP16a"); + channel.Attach(); + await channel.WaitForAttachedState(); + + var tsc = new TaskCompletionAwaiter(); + ErrorInfo err = null; + bool? success = null; + channel.Presence.Enter(client.Connection.State.ToString(), (b, info) => + { + success = b; + err = info; + tsc.SetCompleted(); + }); + + // Ack Queue has one presence message + channel.RealtimeClient.State.WaitingForAck.Should().HaveCount(1); + + // No pending message queue, since QueueMessages is false + channel.RealtimeClient.State.PendingMessages.Should().HaveCount(0); + + Presence.QueuedPresenceMessage[] presenceMessages = channel.Presence.PendingPresenceQueue.ToArray(); + + presenceMessages.Should().HaveCount(0); + + await tsc.Task; + + // No pending message queue, since QueueMessages=false + channel.RealtimeClient.State.PendingMessages.Should().HaveCount(0); + + await WaitFor(500, done => + { + // Ack cleared after flushing the queue for transport disconnection, because QueueMessages=false + if (channel.RealtimeClient.State.WaitingForAck.Count == 0) + { + done(); + } + }); + + success.Should().HaveValue(); + success.Value.Should().BeFalse(); + err.Should().NotBeNull(); + err.Message.Should().Be("Clearing message AckQueue(created at connected state) because Options.QueueMessages is false"); + err.Cause.InnerException.Message.Should().Be("RTP16b : error while sending message"); + + // clean up + client.Close(); + } + [Theory] [ProtocolData] [Trait("spec", "RTP16b")] @@ -1767,32 +1841,6 @@ await WaitForMultiple(2, partialDone => client.Close(); } - [Theory] - [ProtocolData] - [Trait("spec", "RTP16b")] - public async Task ChannelStateCondition_WhenQueueMessagesIsFalse_WhenChannelIsInitializedOrAttaching_MessageAreNotPublished(Protocol protocol) - { - var client = await GetRealtimeClient(protocol, (options, settings) => - { - options.ClientId = "RTP16b"; - options.QueueMessages = false; - }); - var channel = GetRandomChannel(client, "RTP16a"); - - await client.WaitForState(ConnectionState.Connected); - client.Workflow.QueueCommand(SetDisconnectedStateCommand.Create(null)); - await client.WaitForState(ConnectionState.Disconnected); - - channel.Presence.Enter(client.Connection.State.ToString(), (b, info) => { }); - - Presence.QueuedPresenceMessage[] presenceMessages = channel.Presence.PendingPresenceQueue.ToArray(); - - presenceMessages.Should().HaveCount(0); - - // clean up - client.Close(); - } - [Theory] [ProtocolData] [Trait("spec", "RTP16c")] @@ -1815,7 +1863,7 @@ async Task TestWithConnectionState(ConnectionState state, RealtimeCommand change // capture all outbound protocol messages for later inspection List messageList = new List(); - client.GetTestTransport().MessageSent = messageList.Add; + client.GetTestTransport().AfterMessageSent = messageList.Add; // force state client.Workflow.QueueCommand(changeStateCommand); @@ -1870,7 +1918,7 @@ async Task TestWithChannelState(ChannelState state) // capture all outbound protocol messages for later inspection List messageList = new List(); - client.GetTestTransport().MessageSent = messageList.Add; + client.GetTestTransport().AfterMessageSent = messageList.Add; // force state channel.SetChannelState(state);