Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release/1.0.1 #224

Merged
merged 8 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Messaging/Common/ExportRequestType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace Monai.Deploy.Messaging.Common
{
public enum ExportRequestType
Expand Down
6 changes: 6 additions & 0 deletions src/Messaging/Events/WorkflowRequestEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,28 +67,34 @@ public override bool Equals(object? obj)
DataService.Equals(dataOrigin.DataService);
}
}

public enum DataService
{
/// <summary>
/// Unknown data service
/// </summary>
Unknown,

/// <summary>
/// Data received via DIMSE services
/// </summary>
DIMSE,

/// <summary>
/// Data received via DICOMWeb services
/// </summary>
DicomWeb,

/// <summary>
/// Data received via FHIR services
/// </summary>
FHIR,

/// <summary>
/// Data received via HL7 services
/// </summary>
HL7,

/// <summary>
/// Data received via ACR API call
/// </summary>
Expand Down
3 changes: 3 additions & 0 deletions src/Messaging/Tests/IServiceCollectionExtensionsTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,11 @@ internal class GoodSubscriberService : IMessageBrokerSubscriberService
public string Name => throw new NotImplementedException();

#pragma warning disable CS0067 // The event 'GoodSubscriberService.OnConnectionError' is never used

// event used by users of this library
public event ConnectionErrorHandler? OnConnectionError;
#pragma warning restore CS0067 // The event 'GoodSubscriberService.OnConnectionError' is never used

#pragma warning restore CS0067 // The event 'GoodSubscriberService.OnConnectionError' is never used

public void Acknowledge(MessageBase message) => throw new NotImplementedException();
Expand Down
1 change: 0 additions & 1 deletion src/Messaging/Tests/TaskCallbackEventTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

namespace Monai.Deploy.Messaging.Tests
{

public class TaskCallbackEventTest
{
[Fact(DisplayName = "Validation throws on error")]
Expand Down
6 changes: 1 addition & 5 deletions src/Messaging/Tests/WorkflowRequestMessageTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,29 +48,25 @@ public void ConvertsJsonMessageToMessage()
{
DataService = DataService.DicomWeb,
Source = Guid.NewGuid().ToString(),
Destination = Guid.NewGuid().ToString(),

Destination = Guid.NewGuid().ToString(),
});
input.DataOrigins.Add(new DataOrigin
{
DataService = DataService.FHIR,
Source = Guid.NewGuid().ToString(),
Destination = Guid.NewGuid().ToString(),

});
input.DataOrigins.Add(new DataOrigin
{
DataService = DataService.DIMSE,
Source = Guid.NewGuid().ToString(),
Destination = Guid.NewGuid().ToString(),

});
input.DataOrigins.Add(new DataOrigin
{
DataService = DataService.HL7,
Source = Guid.NewGuid().ToString(),
Destination = Guid.NewGuid().ToString(),

});

var files = new List<BlockStorageInfo>()
Expand Down
20 changes: 10 additions & 10 deletions src/Plugins/RabbitMQ/Logger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public static partial class Logger
{
internal static readonly string LoggingScopeMessageApplication = "Message ID={0}. Application ID={1}.";

[LoggerMessage(EventId = 10000, Level = LogLevel.Information, Message = "Publishing message to {endpoint}/{virtualHost}. Exchange={exchange}, Routing Key={topic}.")]
[LoggerMessage(EventId = 10000, Level = LogLevel.Information, Message = "Publishing message to {endpoint}/{virtualHost}. Exchange: {exchange}, Topic: {topic}.")]
public static partial void PublshingRabbitMQ(this ILogger logger, string endpoint, string virtualHost, string exchange, string topic);

[LoggerMessage(EventId = 10001, Level = LogLevel.Debug, Message = "{ServiceName} connecting to {endpoint}/{virtualHost}.")]
Expand All @@ -32,32 +32,32 @@ public static partial class Logger
[LoggerMessage(EventId = 10002, Level = LogLevel.Information, Message = "Message received from queue {queue} for {topic}.")]
public static partial void MessageReceivedFromQueue(this ILogger logger, string queue, string topic);

[LoggerMessage(EventId = 10003, Level = LogLevel.Information, Message = "Listening for messages from {endpoint}/{virtualHost}. Exchange={exchange}, Queue={queue}, Routing Key={topic}.")]
[LoggerMessage(EventId = 10003, Level = LogLevel.Information, Message = "Listening for messages from {endpoint}/{virtualHost}. Exchange: {exchange}, Queue: {queue}, Topic: {topic}.")]
public static partial void SubscribeToRabbitMQQueue(this ILogger logger, string endpoint, string virtualHost, string exchange, string queue, string topic);

[LoggerMessage(EventId = 10004, Level = LogLevel.Information, Message = "Sending message acknowledgement for message {messageId}.")]
[LoggerMessage(EventId = 10004, Level = LogLevel.Information, Message = "Sending message acknowledgment. MessageId: {messageId}.")]
public static partial void SendingAcknowledgement(this ILogger logger, string messageId);

[LoggerMessage(EventId = 10005, Level = LogLevel.Information, Message = "Ackowledge sent for message {messageId}. Event Duration {durationMilliseconds}")]
[LoggerMessage(EventId = 10005, Level = LogLevel.Information, Message = "Acknowledgment sent. Message ID: {messageId}. Event Duration: {durationMilliseconds}")]
public static partial void AcknowledgementSent(this ILogger logger, string messageId, double durationMilliseconds);

[LoggerMessage(EventId = 10006, Level = LogLevel.Information, Message = "Sending nack message {messageId} and requeuing.")]
[LoggerMessage(EventId = 10006, Level = LogLevel.Information, Message = "Sending nack message. Message ID: {messageId} and requeuing.")]
public static partial void SendingNAcknowledgement(this ILogger logger, string messageId);

[LoggerMessage(EventId = 10007, Level = LogLevel.Information, Message = "Nack message sent for message {messageId}, requeue={requeue}.")]
[LoggerMessage(EventId = 10007, Level = LogLevel.Information, Message = "Nack message sent. Message ID: {messageId}. Requeue: @{requeue}.")]
public static partial void NAcknowledgementSent(this ILogger logger, string messageId, bool requeue);

[LoggerMessage(EventId = 10008, Level = LogLevel.Information, Message = "Closing connections.")]
public static partial void ClosingConnections(this ILogger logger);

[LoggerMessage(EventId = 10009, Level = LogLevel.Error, Message = "Invalid or corrupted message received: Queue={queueName}, Topic={topic}, Message ID={messageId}.")]
[LoggerMessage(EventId = 10009, Level = LogLevel.Error, Message = "Invalid or corrupted message received: Queue name: {queueName}. Topic: {topic}. Message ID: {messageId}.")]
public static partial void InvalidMessage(this ILogger logger, string queueName, string topic, string messageId, Exception ex);

[LoggerMessage(EventId = 10010, Level = LogLevel.Error, Message = "Exception not handled by the subscriber's callback function: Queue={queueName}, Topic={topic}, Message ID={messageId}.")]
[LoggerMessage(EventId = 10010, Level = LogLevel.Error, Message = "Exception not handled by the subscriber's callback function: Queue name: {queueName}. Topic: {topic}. Message ID: {messageId}.")]
public static partial void ErrorNotHandledByCallback(this ILogger logger, string queueName, string topic, string messageId, Exception ex);

[LoggerMessage(EventId = 10011, Level = LogLevel.Error, Message = "Exception thrown: Message ID={messageId}.")]
public static partial void Exception(this ILogger logger, string messageId, Exception ex);
[LoggerMessage(EventId = 10011, Level = LogLevel.Error, Message = "Error requeuing. Message ID: {messageId}.")]
public static partial void ErrorRequeue(this ILogger logger, string messageId, Exception ex);

[LoggerMessage(EventId = 10012, Level = LogLevel.Error, Message = "Health check failure.")]
public static partial void HealthCheckError(this ILogger logger, Exception ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,9 @@ public Task Publish(string topic, Message message)

using var loggingScope = _logger.BeginScope(new LoggingDataDictionary<string, object>
{
["MessageId"] = message.MessageId,
["ApplicationId"] = message.ApplicationId,
["CorrelationId"] = message.CorrelationId
["@messageId"] = message.MessageId,
["@applicationId"] = message.ApplicationId,
["@correlationId"] = message.CorrelationId
});

_logger.PublshingRabbitMQ(_endpoint, _virtualHost, _exchange, topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,10 @@ private EventingBasicConsumer CreateConsumer(Func<MessageReceivedEventArgs, Task
{
using var loggingScope = _logger.BeginScope(new LoggingDataDictionary<string, object>
{
["MessageId"] = eventArgs.BasicProperties.MessageId,
["ApplicationId"] = eventArgs.BasicProperties.AppId,
["CorrelationId"] = eventArgs.BasicProperties.CorrelationId,
["RecievedTime"] = DateTime.UtcNow
["@messageId"] = eventArgs.BasicProperties.MessageId,
["@applicationId"] = eventArgs.BasicProperties.AppId,
["@correlationId"] = eventArgs.BasicProperties.CorrelationId,
["@recievedTime"] = DateTime.UtcNow
});

_logger.MessageReceivedFromQueue(queueDeclareResult.QueueName, eventArgs.RoutingKey);
Expand Down Expand Up @@ -274,7 +274,7 @@ public void Acknowledge(MessageBase message)

using var loggingScope = _logger.BeginScope(new LoggingDataDictionary<string, object>
{
["EventDuration"] = eventDuration
["@eventDuration"] = eventDuration
});
_logger.AcknowledgementSent(message.MessageId, eventDuration);
}
Expand All @@ -291,7 +291,7 @@ public async Task RequeueWithDelay(MessageBase message)
}
catch (Exception e)
{
_logger.Exception($"Requeue delay failed.", e);
_logger.ErrorRequeue($"Requeue delay failed.", e);
Reject(message, true);
}
}
Expand Down Expand Up @@ -379,6 +379,7 @@ private static MessageReceivedEventArgs CreateMessage(string topic, BasicDeliver
deliveryTag: eventArgs.DeliveryTag.ToString(CultureInfo.InvariantCulture)),
CancellationToken.None);
}

private (bool exists, bool accessable) QueueExists(string queueName)
{
var testChannel = _rabbitMqConnectionFactory.MakeTempChannel(ChannelType.Subscriber, _endpoint, _username, _password, _virtualHost, _useSSL, _portNumber);
Expand Down
1 change: 0 additions & 1 deletion src/Plugins/RabbitMQ/Tests/Integration/ReliabilityTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ public void GivenMessages_WhenPublished_SubscribeShallReceiveAndAckMessages()
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
}


Parallel.For(0, MessageCount, new ParallelOptions { MaxDegreeOfParallelism = MaxDegreeOfParallelism }, i =>
{
var guid = Guid.NewGuid().ToString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,6 @@ await Task.Run(() =>
s_messageReceived = args.Message;
service.Acknowledge(args.Message);
}).ConfigureAwait(false);

});
};
Assert.Throws<OperationInterruptedException>(asyncAct);
Expand Down
Loading