Skip to content

Commit

Permalink
Use pool of MemoryStream to reduce memory pressure (#520)
Browse files Browse the repository at this point in the history
  • Loading branch information
DarkWanderer authored Aug 27, 2024
1 parent 1d4043b commit 29b1db9
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 6 deletions.
1 change: 1 addition & 0 deletions ClickHouse.Client/ClickHouse.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Http" Version="8.0.0" />
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="3.0.1" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" PrivateAssets="All" />
<PackageReference Include="NodaTime" Version="3.1.11" />
<PackageReference Include="StyleCop.Analyzers" Version="1.1.118">
Expand Down
13 changes: 7 additions & 6 deletions ClickHouse.Client/Copy/ClickHouseBulkCopy.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
using ClickHouse.Client.Formats;
using ClickHouse.Client.Types;
using ClickHouse.Client.Utility;
using Microsoft.IO;

namespace ClickHouse.Client.Copy;

public class ClickHouseBulkCopy : IDisposable
{
private static readonly RecyclableMemoryStreamManager MemoryStreamManager = new();
private readonly ClickHouseConnection connection;
private readonly bool ownsConnection;
private long rowsWritten;
Expand Down Expand Up @@ -143,10 +145,8 @@ public async Task WriteToServerAsync(IEnumerable<object[]> rows, CancellationTok
await Task.WhenAll(tasks).ConfigureAwait(false);
}

private Stream SerializeBatch(Batch batch)
private void SerializeBatch(Batch batch, Stream stream)
{
var stream = new MemoryStream() { Capacity = 8 * 1024 };

using (var gzipStream = new BufferedStream(new GZipStream(stream, CompressionLevel.Fastest, true), 256 * 1024))
{
using (var textWriter = new StreamWriter(gzipStream, Encoding.UTF8, 4 * 1024, true))
Expand Down Expand Up @@ -179,16 +179,17 @@ private Stream SerializeBatch(Batch batch)
throw new ClickHouseBulkCopySerializationException(row, col, e);
}
}
stream.Seek(0, SeekOrigin.Begin);
return stream;
}

private async Task SendBatchAsync(Batch batch, CancellationToken token)
{
using (batch) // Dispose object regardless whether sending succeeds
{
using var stream = MemoryStreamManager.GetStream(nameof(SendBatchAsync));
// Async serialization
using var stream = await Task.Run(() => SerializeBatch(batch)).ConfigureAwait(false);
await Task.Run(() => SerializeBatch(batch, stream)).ConfigureAwait(false);

Check warning on line 190 in ClickHouse.Client/Copy/ClickHouseBulkCopy.cs

View workflow job for this annotation

GitHub Actions / Short

Forward the 'token' parameter to the 'Run' method or pass in 'CancellationToken.None' explicitly to indicate intentionally not propagating the token (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/quality-rules/ca2016)
// Seek to beginning as after writing it's at end
stream.Seek(0, SeekOrigin.Begin);
// Async sending
await connection.PostStreamAsync(null, stream, true, token).ConfigureAwait(false);
// Increase counter
Expand Down

0 comments on commit 29b1db9

Please sign in to comment.