Skip to content

Commit

Permalink
Fixes #109
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeyzimarev committed Jul 8, 2022
1 parent 82a514b commit e61c131
Show file tree
Hide file tree
Showing 13 changed files with 56 additions and 37 deletions.
11 changes: 3 additions & 8 deletions src/Core/src/Eventuous.Producers/BaseProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ namespace Eventuous.Producers;

public abstract class BaseProducer<TProduceOptions> : BaseProducer, IEventProducer<TProduceOptions>
where TProduceOptions : class {
protected BaseProducer(bool requiresInit, ProducerTracingOptions? tracingOptions = null)
: base(requiresInit, tracingOptions) { }
protected BaseProducer(ProducerTracingOptions? tracingOptions = null)
: base(tracingOptions) { }

protected abstract Task ProduceMessages(
StreamName stream,
Expand Down Expand Up @@ -47,10 +47,9 @@ protected override Task ProduceMessages(
public abstract class BaseProducer : IEventProducer {
protected KeyValuePair<string, object?>[] DefaultTags { get; }

protected BaseProducer(bool requiresInit, ProducerTracingOptions? tracingOptions = null) {
protected BaseProducer(ProducerTracingOptions? tracingOptions = null) {
var options = tracingOptions ?? new ProducerTracingOptions();
DefaultTags = options.AllTags.Concat(EventuousDiagnostics.Tags).ToArray();
if (!requiresInit) ReadyNow();
}

protected abstract Task ProduceMessages(
Expand Down Expand Up @@ -85,8 +84,4 @@ public async Task Produce(
return (act?.Start(), new[] { producedMessage });
}
}

public bool Ready { get; private set; }

protected void ReadyNow() => Ready = true;
}
8 changes: 6 additions & 2 deletions src/Core/src/Eventuous.Producers/IEventProducer.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
using Microsoft.Extensions.Hosting;

namespace Eventuous.Producers;

[PublicAPI]
Expand All @@ -15,8 +17,6 @@ Task Produce(
IEnumerable<ProducedMessage> messages,
CancellationToken cancellationToken = default
);

bool Ready { get; }
}

[PublicAPI]
Expand All @@ -36,4 +36,8 @@ Task Produce(
TProduceOptions? options,
CancellationToken cancellationToken = default
);
}

public interface IHostedProducer : IHostedService {
bool Ready { get; }
}
3 changes: 3 additions & 0 deletions src/EventStore/src/Eventuous.EventStore/EsdbEventStore.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (C) 2021-2022 Ubiquitous AS. All rights reserved
// Licensed under the Apache License, Version 2.0.

// ReSharper disable CoVariantArrayConversion

using System.Runtime.CompilerServices;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public EventStoreProducer(
EventStoreClient eventStoreClient,
IEventSerializer? serializer = null,
IMetadataSerializer? metaSerializer = null
) : base(false, TracingOptions) {
) : base(TracingOptions) {
_client = Ensure.NotNull(eventStoreClient);
_serializer = serializer ?? DefaultEventSerializer.Instance;
_metaSerializer = metaSerializer ?? DefaultMetadataSerializer.Instance;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (C) 2021-2022 Ubiquitous AS. All rights reserved
// Licensed under the Apache License, Version 2.0.

// ReSharper disable UnusedAutoPropertyAccessor.Global

namespace Eventuous.ElasticSearch.Index;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace Eventuous.ElasticSearch.Producers;
public class ElasticProducer : BaseProducer<ElasticProduceOptions> {
readonly IElasticClient _elasticClient;

public ElasticProducer(IElasticClient elasticClient) : base(false, TracingOptions) => _elasticClient = elasticClient;
public ElasticProducer(IElasticClient elasticClient) : base(TracingOptions) => _elasticClient = elasticClient;

static readonly ProducerTracingOptions TracingOptions = new() {
MessagingSystem = "elasticsearch",
Expand Down
18 changes: 10 additions & 8 deletions src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,7 @@ public async Task Produce(
T? options,
CancellationToken cancellationToken = default
) {
while (!_inner.Ready) {
EventuousEventSource.Log.Warn("Producer not ready, waiting...");
await Task.Delay(1000, cancellationToken);
}

await WaitForInner(_inner, cancellationToken);
await _inner.Produce(stream, messages, options, cancellationToken);
}
}
Expand All @@ -32,10 +28,16 @@ public async Task Produce(
IEnumerable<ProducedMessage> messages,
CancellationToken cancellationToken = default
) {
while (!_inner.Ready) await Task.Delay(10, cancellationToken);

await WaitForInner(_inner, cancellationToken);
await _inner.Produce(stream, messages, cancellationToken);
}

public bool Ready => true;
protected static async ValueTask WaitForInner(IEventProducer inner, CancellationToken cancellationToken) {
if (inner is not IHostedProducer hosted) return;

while (!hosted.Ready) {
EventuousEventSource.Log.Warn("Producer not ready, waiting...");
await Task.Delay(1000, cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,6 @@ protected override Task ProduceMessages(
return Task.CompletedTask;
}

public TestProducer() : base(false) { }
public TestProducer() : base() { }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ protected override Task ProduceMessages(
return Task.CompletedTask;
}

public TestProducer() : base(false) { }
public TestProducer() : base() { }
}

record TestProduceOptions { }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
using Eventuous.Diagnostics;
// Copyright (C) 2021-2022 Ubiquitous AS. All rights reserved
// Licensed under the Apache License, Version 2.0.

using Eventuous.Producers;
using Eventuous.Producers.Diagnostics;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using static Google.Cloud.PubSub.V1.PublisherClient;

Expand All @@ -13,7 +14,7 @@ namespace Eventuous.GooglePubSub.Producers;
/// Producer for Google PubSub
/// </summary>
[PublicAPI]
public class GooglePubSubProducer : BaseProducer<PubSubProduceOptions>, IHostedService {
public class GooglePubSubProducer : BaseProducer<PubSubProduceOptions>, IHostedProducer {
readonly IEventSerializer _serializer;
readonly ClientCache _clientCache;
readonly PubSubAttributes _attributes;
Expand Down Expand Up @@ -47,7 +48,7 @@ public GooglePubSubProducer(
public GooglePubSubProducer(
PubSubProducerOptions options,
IEventSerializer? serializer = null
) : base(true, TracingOptions) {
) : base(TracingOptions) {
Ensure.NotNull(options);

_serializer = serializer ?? DefaultEventSerializer.Instance;
Expand All @@ -66,7 +67,7 @@ public GooglePubSubProducer(
) : this(options.Value, serializer) { }

public Task StartAsync(CancellationToken cancellationToken = default) {
ReadyNow();
Ready = true;
return Task.CompletedTask;
}

Expand Down Expand Up @@ -108,9 +109,9 @@ PubsubMessage CreateMessage(ProducedMessage message, PubSubProduceOptions? optio
OrderingKey = options?.OrderingKey ?? "",
Attributes = {
{ _attributes.ContentType, contentType },
{ _attributes.EventType, eventType }
{ _attributes.EventType, eventType },
{ _attributes.MessageId, message.MessageId.ToString() }
},
MessageId = message.MessageId.ToString()
};

if (message.Metadata != null) {
Expand All @@ -131,4 +132,6 @@ PubsubMessage CreateMessage(ProducedMessage message, PubSubProduceOptions? optio

return psm;
}

public bool Ready { get; private set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ namespace Eventuous.GooglePubSub;
public class PubSubAttributes {
public string EventType { get; set; } = "eventType";
public string ContentType { get; set; } = "contentType";
public string MessageId { get; set; } = "messageId";
}
11 changes: 8 additions & 3 deletions src/Kafka/src/Eventuous.Kafka/Producers/KafkaBasicProducer.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (C) 2021-2022 Ubiquitous AS. All rights reserved
// Licensed under the Apache License, Version 2.0.

using Confluent.Kafka;
using Eventuous.Producers;
using Eventuous.Producers.Diagnostics;
Expand All @@ -9,13 +12,13 @@ namespace Eventuous.Kafka.Producers;
/// Produces messages with byte[] payload without using the schema registry. The message type is specified in the
/// headers, so the type mapping is required.
/// </summary>
public class KafkaBasicProducer : BaseProducer<KafkaProduceOptions>, IHostedService {
public class KafkaBasicProducer : BaseProducer<KafkaProduceOptions>, IHostedProducer {
readonly IProducer<string, byte[]> _producerWithKey;
readonly IProducer<Null, byte[]> _producerWithoutKey;
readonly IEventSerializer _serializer;

public KafkaBasicProducer(KafkaProducerOptions options, IEventSerializer? serializer = null) :
base(true, TracingOptions) {
base(TracingOptions) {
_producerWithKey = new ProducerBuilder<string, byte[]>(options.ProducerConfig).Build();
_producerWithoutKey = new DependentProducerBuilder<Null, byte[]>(_producerWithKey.Handle).Build();
_serializer = serializer ?? DefaultEventSerializer.Instance;
Expand Down Expand Up @@ -91,7 +94,7 @@ void Report(Error error) {
}

public Task StartAsync(CancellationToken cancellationToken) {
ReadyNow();
Ready = true;
return Task.CompletedTask;
}

Expand All @@ -103,4 +106,6 @@ public async Task StopAsync(CancellationToken cancellationToken) {
await Task.Delay(100, cancellationToken).NoContext();
}
}

public bool Ready { get; private set; }
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (C) 2021-2022 Ubiquitous AS. All rights reserved
// Licensed under the Apache License, Version 2.0.

using System.Diagnostics;
using Eventuous.Diagnostics;
using Eventuous.Producers;
Expand All @@ -11,7 +14,7 @@ namespace Eventuous.RabbitMq.Producers;
/// RabbitMQ producer
/// </summary>
[PublicAPI]
public class RabbitMqProducer : BaseProducer<RabbitMqProduceOptions>, IHostedService {
public class RabbitMqProducer : BaseProducer<RabbitMqProduceOptions>, IHostedProducer {
readonly RabbitMqExchangeOptions? _options;
readonly IEventSerializer _serializer;
readonly ConnectionFactory _connectionFactory;
Expand All @@ -29,7 +32,7 @@ public RabbitMqProducer(
ConnectionFactory connectionFactory,
IEventSerializer? serializer = null,
RabbitMqExchangeOptions? options = null
) : base(true, TracingOptions) {
) : base(TracingOptions) {
_options = options;
_serializer = serializer ?? DefaultEventSerializer.Instance;
_connectionFactory = Ensure.NotNull(connectionFactory);
Expand All @@ -39,9 +42,7 @@ public Task StartAsync(CancellationToken cancellationToken = default) {
_connection = _connectionFactory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ConfirmSelect();

ReadyNow();

Ready = true;
return Task.CompletedTask;
}

Expand Down Expand Up @@ -146,4 +147,6 @@ public Task StopAsync(CancellationToken cancellationToken = default) {

return Task.CompletedTask;
}

public bool Ready { get; private set; }
}

0 comments on commit e61c131

Please sign in to comment.