Skip to content

Commit

Permalink
Replicated consumer reconnect logic in producer #238
Browse files Browse the repository at this point in the history
  • Loading branch information
Lanayx committed Jul 31, 2024
1 parent aebde9c commit f55aff4
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 6 deletions.
9 changes: 6 additions & 3 deletions src/Pulsar.Client/Internal/ProducerImpl.fs
Original file line number Diff line number Diff line change
Expand Up @@ -580,11 +580,14 @@ type internal ProducerImpl<'T> private (producerConfig: ProducerConfiguration, c
connectionHandler.ConnectionClosed clientCnx
clientCnx.RemoveProducer(producerId)

| ProducerMessage.ConnectionFailed _ ->
| ProducerMessage.ConnectionFailed ex ->

Log.Logger.LogDebug("{0} ConnectionFailed", prefix)
if Stopwatch.GetElapsedTime(createProducerStartTime) > clientConfig.OperationTimeout then
Log.Logger.LogInformation("{0} creation failed", prefix)
let nonRetriableError = ex |> PulsarClientException.isRetriableError |> not
let timeout = Stopwatch.GetElapsedTime(createProducerStartTime) > clientConfig.OperationTimeout
if ((nonRetriableError || timeout) && producerCreatedTsc.TrySetException(ex)) then
Log.Logger.LogInformation("{0} creation failed {1}", prefix,
if nonRetriableError then "with unretriableError" else "after timeout")
connectionHandler.Failed()
stopProducer()
continueLoop <- false
Expand Down
6 changes: 3 additions & 3 deletions src/Pulsar.Client/Pulsar.Client.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@
<Title>Pulsar.Client</Title>
<RootNamespace>Pulsar.Client</RootNamespace>
<AssemblyName>Pulsar.Client</AssemblyName>
<Version>3.5.2</Version>
<Version>3.5.3</Version>
<Company>F# community</Company>
<Description>.NET client library for Apache Pulsar</Description>
<RepositoryUrl>https://github.com/fsprojects/pulsar-client-dotnet</RepositoryUrl>
<PackageReleaseNotes>Fix for missing key in reconsumed message</PackageReleaseNotes>
<PackageReleaseNotes>Replicated consumer reconnect logic in producer</PackageReleaseNotes>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<PackageProjectUrl>https://github.com/fsprojects/pulsar-client-dotnet</PackageProjectUrl>
<RepositoryType>git</RepositoryType>
<PackageTags>pulsar</PackageTags>
<Authors>F# community</Authors>
<PackageVersion>3.5.2</PackageVersion>
<PackageVersion>3.5.3</PackageVersion>
<DebugType>portable</DebugType>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<PackageReadmeFile>README.md</PackageReadmeFile>
Expand Down

0 comments on commit f55aff4

Please sign in to comment.