Skip to content

Commit

Permalink
Merge pull request #1216 from ably/bugfix/clientoptions-queuemessages
Browse files Browse the repository at this point in the history
Fix QueueMessages when set to false
  • Loading branch information
sacOO7 authored May 22, 2023
2 parents c2e368f + d4bddff commit f387e1d
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 44 deletions.
16 changes: 14 additions & 2 deletions src/IO.Ably.Shared/Realtime/Workflows/RealtimeWorkflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ private async Task<RealtimeCommand> ProcessCommandInner(RealtimeCommand command)
if (State.Connection.CurrentStateObject.CanSend || cmd.Force)
{
var sendResult = SendMessage(cmd.ProtocolMessage, cmd.Callback);
if (sendResult.IsFailure && State.Connection.CurrentStateObject.CanQueue)
if (sendResult.IsFailure && State.Connection.CurrentStateObject.CanQueue && Client.Options.QueueMessages)
{
Logger.Debug("Failed to send message. Queuing it.");
State.PendingMessages.Add(new MessageAndCallback(
Expand All @@ -343,7 +343,7 @@ private async Task<RealtimeCommand> ProcessCommandInner(RealtimeCommand command)
Logger));
}
}
else if (State.Connection.CurrentStateObject.CanQueue)
else if (State.Connection.CurrentStateObject.CanQueue && Client.Options.QueueMessages)
{
Logger.Debug("Queuing message");
State.PendingMessages.Add(new MessageAndCallback(
Expand Down Expand Up @@ -554,6 +554,7 @@ ErrorInfo GetErrorInfoFromTransportException(Exception ex, ErrorInfo @default)
return ErrorInfo.ReasonRefused;
}

@default.InnerException = ex;
return @default;
}

Expand Down Expand Up @@ -812,6 +813,17 @@ ErrorInfo TransformIfTokenErrorAndNotRetryable()

SetState(disconnectedState, skipTimer: cmd.SkipAttach);

if (Client.Options.QueueMessages == false)
{
var failAckMessages = new ErrorInfo(
"Clearing message AckQueue(created at connected state) because Options.QueueMessages is false",
ErrorCodes.Disconnected,
HttpStatusCode.BadRequest,
null,
cmd.Error);
ClearAckQueueAndFailMessages(failAckMessages);
}

if (cmd.SkipAttach == false)
{
ConnectionManager.DestroyTransport();
Expand Down
24 changes: 15 additions & 9 deletions src/IO.Ably.Shared/Transport/ConnectionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,17 +226,23 @@ public void Send(
Logger.Error($"Failed to encode protocol message: {encodingResult.Error.Message}");
}

if (State.CanSend == false && State.CanQueue == false)
if (State.CanSend == false)
{
throw new AblyException($"The current state [{State.State}] does not allow messages to be sent.");
}
if (Options.QueueMessages == false)
{
throw new AblyException(
$"Not queuing messages for [{State.State}] since Options.QueueMessages is set to False.",
ErrorInfo.ReasonUnknown.Code,
HttpStatusCode.BadRequest);
}

if (State.CanSend == false && State.CanQueue && Options.QueueMessages == false)
{
throw new AblyException(
$"Current state is [{State.State}] which supports queuing but Options.QueueMessages is set to False.",
Connection.ConnectionState.DefaultErrorInfo.Code,
HttpStatusCode.ServiceUnavailable);
if (State.CanQueue == false)
{
throw new AblyException(
$"The current connection state [{State.State}] does not allow messages to be sent.",
ErrorInfo.ReasonUnknown.Code,
HttpStatusCode.BadRequest);
}
}

ExecuteCommand(SendMessageCommand.Create(message, callback).TriggeredBy("ConnectionManager.Send()"));
Expand Down
2 changes: 1 addition & 1 deletion src/IO.Ably.Shared/Types/ErrorInfo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace IO.Ably
public class ErrorInfo
{
internal static readonly ErrorInfo ReasonClosed = new ErrorInfo("Connection closed by client", ErrorCodes.NoError);
internal static readonly ErrorInfo ReasonDisconnected = new ErrorInfo("Connection temporarily unavailable", 80003);
internal static readonly ErrorInfo ReasonDisconnected = new ErrorInfo("Connection temporarily unavailable", ErrorCodes.Disconnected);
internal static readonly ErrorInfo ReasonSuspended = new ErrorInfo("Connection unavailable", ErrorCodes.ConnectionSuspended);
internal static readonly ErrorInfo ReasonFailed = new ErrorInfo("Connection failed", ErrorCodes.ConnectionFailed);
internal static readonly ErrorInfo ReasonRefused = new ErrorInfo("Access refused", ErrorCodes.Unauthorized);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public class TestTransportFactory : ITransportFactory
private readonly Action<TestTransportWrapper> _onWrappedTransportCreated;
internal Action<TestTransportWrapper> OnTransportCreated = delegate { };

internal Action<ProtocolMessage> BeforeMessageSent = delegate { };
internal Action<ProtocolMessage> OnMessageSent = delegate { };

internal Action<ProtocolMessage> BeforeDataProcessed;
Expand All @@ -30,7 +31,8 @@ var transport

transport.BeforeDataProcessed = BeforeDataProcessed;
OnTransportCreated(transport);
transport.MessageSent = OnMessageSent;
transport.BeforeMessageSend = BeforeMessageSent;
transport.AfterMessageSent = OnMessageSent;
_onWrappedTransportCreated?.Invoke(transport);
return transport;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,11 @@ public void OnTransportEvent(Guid transportId, TransportState state, Exception e

public List<ProtocolMessage.MessageAction> BlockReceiveActions { get; set; } = new List<ProtocolMessage.MessageAction>();

public Action<ProtocolMessage> BeforeMessageSend = delegate { };
public Action<ProtocolMessage> AfterMessageSent = delegate { };

public Action<ProtocolMessage> BeforeDataProcessed;
public Action<ProtocolMessage> AfterDataReceived;
public Action<ProtocolMessage> MessageSent = delegate { };

public TestTransportWrapper(ITransport wrappedTransport, Protocol protocol)
{
Expand Down Expand Up @@ -135,15 +137,16 @@ public void Close(bool suppressClosedEvent = true)

public Result Send(RealtimeTransportData data)
{
BeforeMessageSend(data.Original);

if (BlockSendActions.Contains(data.Original.Action))
{
return Result.Ok();
}

ProtocolMessagesSent.Add(data.Original);
MessageSent(data.Original);
WrappedTransport.Send(data);

AfterMessageSent(data.Original);
return Result.Ok();
}

Expand Down
104 changes: 76 additions & 28 deletions src/IO.Ably.Tests.Shared/Realtime/PresenceSandboxSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1662,6 +1662,80 @@ public async Task ChannelStateCondition_WhenQueueMessagesIsFalse_WhenChannelIsIn
client.Close();
}

[Theory]
[ProtocolData]
[Trait("spec", "RTP16b")]
[Trait("spec", "RTP19a")]
public async Task ChannelStateCondition_WhenQueueMessagesIsFalse_ShouldFailAckQueueMessages_WhenSendFails(Protocol protocol)
{
var transportFactory = new TestTransportFactory
{
BeforeMessageSent = message =>
{
if (message.Action == ProtocolMessage.MessageAction.Presence)
{
throw new Exception("RTP16b : error while sending message");
}
}
};

var client = await GetRealtimeClient(protocol, (options, settings) =>
{
options.ClientId = "RTP16b";
options.QueueMessages = false;
options.TransportFactory = transportFactory;
});

await client.WaitForState(ConnectionState.Connected);

var channel = GetRandomChannel(client, "RTP16a");
channel.Attach();
await channel.WaitForAttachedState();

var tsc = new TaskCompletionAwaiter();
ErrorInfo err = null;
bool? success = null;
channel.Presence.Enter(client.Connection.State.ToString(), (b, info) =>
{
success = b;
err = info;
tsc.SetCompleted();
});

// Ack Queue has one presence message
channel.RealtimeClient.State.WaitingForAck.Should().HaveCount(1);

// No pending message queue, since QueueMessages is false
channel.RealtimeClient.State.PendingMessages.Should().HaveCount(0);

Presence.QueuedPresenceMessage[] presenceMessages = channel.Presence.PendingPresenceQueue.ToArray();

presenceMessages.Should().HaveCount(0);

await tsc.Task;

// No pending message queue, since QueueMessages=false
channel.RealtimeClient.State.PendingMessages.Should().HaveCount(0);

await WaitFor(500, done =>
{
// Ack cleared after flushing the queue for transport disconnection, because QueueMessages=false
if (channel.RealtimeClient.State.WaitingForAck.Count == 0)
{
done();
}
});

success.Should().HaveValue();
success.Value.Should().BeFalse();
err.Should().NotBeNull();
err.Message.Should().Be("Clearing message AckQueue(created at connected state) because Options.QueueMessages is false");
err.Cause.InnerException.Message.Should().Be("RTP16b : error while sending message");

// clean up
client.Close();
}

[Theory]
[ProtocolData]
[Trait("spec", "RTP16b")]
Expand Down Expand Up @@ -1767,32 +1841,6 @@ await WaitForMultiple(2, partialDone =>
client.Close();
}

[Theory]
[ProtocolData]
[Trait("spec", "RTP16b")]
public async Task ChannelStateCondition_WhenQueueMessagesIsFalse_WhenChannelIsInitializedOrAttaching_MessageAreNotPublished(Protocol protocol)
{
var client = await GetRealtimeClient(protocol, (options, settings) =>
{
options.ClientId = "RTP16b";
options.QueueMessages = false;
});
var channel = GetRandomChannel(client, "RTP16a");

await client.WaitForState(ConnectionState.Connected);
client.Workflow.QueueCommand(SetDisconnectedStateCommand.Create(null));
await client.WaitForState(ConnectionState.Disconnected);

channel.Presence.Enter(client.Connection.State.ToString(), (b, info) => { });

Presence.QueuedPresenceMessage[] presenceMessages = channel.Presence.PendingPresenceQueue.ToArray();

presenceMessages.Should().HaveCount(0);

// clean up
client.Close();
}

[Theory]
[ProtocolData]
[Trait("spec", "RTP16c")]
Expand All @@ -1815,7 +1863,7 @@ async Task TestWithConnectionState(ConnectionState state, RealtimeCommand change

// capture all outbound protocol messages for later inspection
List<ProtocolMessage> messageList = new List<ProtocolMessage>();
client.GetTestTransport().MessageSent = messageList.Add;
client.GetTestTransport().AfterMessageSent = messageList.Add;

// force state
client.Workflow.QueueCommand(changeStateCommand);
Expand Down Expand Up @@ -1870,7 +1918,7 @@ async Task TestWithChannelState(ChannelState state)

// capture all outbound protocol messages for later inspection
List<ProtocolMessage> messageList = new List<ProtocolMessage>();
client.GetTestTransport().MessageSent = messageList.Add;
client.GetTestTransport().AfterMessageSent = messageList.Add;

// force state
channel.SetChannelState(state);
Expand Down

0 comments on commit f387e1d

Please sign in to comment.