From 29b1db9b0e484ccfb08f79ce14b43396e86808d5 Mon Sep 17 00:00:00 2001 From: "Oleg V. Kozlyuk" Date: Tue, 27 Aug 2024 21:29:40 +0200 Subject: [PATCH] Use pool of MemoryStream to reduce memory pressure (#520) --- ClickHouse.Client/ClickHouse.Client.csproj | 1 + ClickHouse.Client/Copy/ClickHouseBulkCopy.cs | 13 +++++++------ 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/ClickHouse.Client/ClickHouse.Client.csproj b/ClickHouse.Client/ClickHouse.Client.csproj index 000608b4..51cac8a9 100644 --- a/ClickHouse.Client/ClickHouse.Client.csproj +++ b/ClickHouse.Client/ClickHouse.Client.csproj @@ -25,6 +25,7 @@ + diff --git a/ClickHouse.Client/Copy/ClickHouseBulkCopy.cs b/ClickHouse.Client/Copy/ClickHouseBulkCopy.cs index 404090c6..b93d07d7 100644 --- a/ClickHouse.Client/Copy/ClickHouseBulkCopy.cs +++ b/ClickHouse.Client/Copy/ClickHouseBulkCopy.cs @@ -13,11 +13,13 @@ using ClickHouse.Client.Formats; using ClickHouse.Client.Types; using ClickHouse.Client.Utility; +using Microsoft.IO; namespace ClickHouse.Client.Copy; public class ClickHouseBulkCopy : IDisposable { + private static readonly RecyclableMemoryStreamManager MemoryStreamManager = new(); private readonly ClickHouseConnection connection; private readonly bool ownsConnection; private long rowsWritten; @@ -143,10 +145,8 @@ public async Task WriteToServerAsync(IEnumerable rows, CancellationTok await Task.WhenAll(tasks).ConfigureAwait(false); } - private Stream SerializeBatch(Batch batch) + private void SerializeBatch(Batch batch, Stream stream) { - var stream = new MemoryStream() { Capacity = 8 * 1024 }; - using (var gzipStream = new BufferedStream(new GZipStream(stream, CompressionLevel.Fastest, true), 256 * 1024)) { using (var textWriter = new StreamWriter(gzipStream, Encoding.UTF8, 4 * 1024, true)) @@ -179,16 +179,17 @@ private Stream SerializeBatch(Batch batch) throw new ClickHouseBulkCopySerializationException(row, col, e); } } - stream.Seek(0, SeekOrigin.Begin); - return stream; } private async Task SendBatchAsync(Batch batch, CancellationToken token) { using (batch) // Dispose object regardless whether sending succeeds { + using var stream = MemoryStreamManager.GetStream(nameof(SendBatchAsync)); // Async serialization - using var stream = await Task.Run(() => SerializeBatch(batch)).ConfigureAwait(false); + await Task.Run(() => SerializeBatch(batch, stream)).ConfigureAwait(false); + // Seek to beginning as after writing it's at end + stream.Seek(0, SeekOrigin.Begin); // Async sending await connection.PostStreamAsync(null, stream, true, token).ConfigureAwait(false); // Increase counter