Skip to content

Commit

Permalink
AOF Replay using a Background Task at the Replica (#783)
Browse files Browse the repository at this point in the history
* remove unused code

* nit

* cleanup replication tests

* expose replica max lag parameter

* add guarded initialization API for TsavoriteLog to avoid disrupting iterator consumer operations when skipping addresses at replica

* background replay consume implementation

* rename replay max lag parameter and add stats info

* update aof stream checks

* fix LRANGE bug

* ensure replicaof resets replay iterator

* add async replay tests

* remap ReplicationOffsetMaxLag parameter to make infinite lag default

* streamline spinwait for initialization using ProtectAndDrain

* add check to detect AOF divergence at the replica

* update unit tests for ClusterReplayAsync

* delete stale comment

* pageSizeBits instance variable for use with process AOF stream

* update sanity check

* add comment for sanity check

* release new version
  • Loading branch information
vazois authored Nov 21, 2024
1 parent b0e269b commit b2856c3
Show file tree
Hide file tree
Showing 21 changed files with 352 additions and 151 deletions.
2 changes: 1 addition & 1 deletion .azure/pipelines/azure-pipelines-external-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# 2) update \libs\host\GarnetServer.cs readonly string version (~line 32) -- NOTE - these two values need to be the same
# 3) update the version in GarnetServer.csproj (~line 8)
######################################
name: 1.0.40
name: 1.0.41
trigger:
branches:
include:
Expand Down
3 changes: 3 additions & 0 deletions libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ public MetricsItem[] GetReplicationInfo()
{
var (address, port) = config.GetLocalNodePrimaryAddress();
var primaryLinkStatus = clusterManager.GetPrimaryLinkStatus(config);
var replicationOffsetLag = storeWrapper.appendOnlyFile.TailAddress - replicationManager.ReplicationOffset;
replicationInfo.Add(new("master_host", address));
replicationInfo.Add(new("master_port", port.ToString()));
replicationInfo.Add(primaryLinkStatus[0]);
Expand All @@ -241,6 +242,8 @@ public MetricsItem[] GetReplicationInfo()
replicationInfo.Add(new("slave_read_only", "1"));
replicationInfo.Add(new("replica_announced", "1"));
replicationInfo.Add(new("master_sync_last_io_seconds_ago", replicationManager.LastPrimarySyncSeconds.ToString()));
replicationInfo.Add(new("replication_offset_lag", replicationOffsetLag.ToString()));
replicationInfo.Add(new("replication_offset_max_lag", storeWrapper.serverOptions.ReplicationOffsetMaxLag.ToString()));
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void Dispose()
cts?.Dispose();
}

public unsafe void Consume(byte* payloadPtr, int payloadLength, long currentAddress, long nextAddress)
public unsafe void Consume(byte* payloadPtr, int payloadLength, long currentAddress, long nextAddress, bool isProtected)
{
try
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ public bool TryReplicateFromPrimary(out ReadOnlySpan<byte> errorMessage, bool ba
storeWrapper.appendOnlyFile?.WaitForCommit();
}

// Reset background replay iterator
ResetReplayIterator();

// Reset replication offset
ReplicationOffset = 0;

Expand Down
113 changes: 113 additions & 0 deletions libs/cluster/Server/Replication/ReplicaOps/ReplicaReplayTask.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Threading;
using Garnet.common;
using Microsoft.Extensions.Logging;
using Tsavorite.core;

namespace Garnet.cluster
{
    /// <summary>
    /// Replica-side AOF replay: consumes AOF entries either synchronously (direct call to
    /// <see cref="Consume"/>) or from a background task that scans the local AOF through
    /// <see cref="replayIterator"/>.
    /// </summary>
    internal sealed partial class ReplicationManager : IBulkLogEntryConsumer, IDisposable
    {
        // Iterator the background replay task reads from; null when no background replay is set up.
        TsavoriteLogScanSingleIterator replayIterator = null;

        // Cancellation source for the replay task; linked to ctsRepManager so cancelling the manager cancels replay as well.
        CancellationTokenSource replicaReplayTaskCts;

        // Held for read by a running replay task and for write by a reset, so the
        // cancellation source is only swapped once the running task has exited.
        SingleWriterMultiReaderLock activeReplay;

        /// <summary>
        /// Reset background replay iterator
        /// </summary>
        /// <remarks>
        /// Cancels the current replay token, waits (via the write lock) for a running replay task
        /// to release its read lock, replaces the cancellation source and finally disposes and
        /// clears the iterator.
        /// </remarks>
        public void ResetReplayIterator()
        {
            ResetReplayCts();
            replayIterator?.Dispose();
            replayIterator = null;

            void ResetReplayCts()
            {
                if (replicaReplayTaskCts == null)
                {
                    replicaReplayTaskCts = CancellationTokenSource.CreateLinkedTokenSource(ctsRepManager.Token);
                    return;
                }

                // Signal the running replay task (if any) to stop
                replicaReplayTaskCts.Cancel();

                // Acquire before entering try so that the finally block can never release
                // a lock that was not successfully acquired.
                activeReplay.WriteLock();
                try
                {
                    replicaReplayTaskCts.Dispose();
                    replicaReplayTaskCts = CancellationTokenSource.CreateLinkedTokenSource(ctsRepManager.Token);
                }
                finally
                {
                    activeReplay.WriteUnlock();
                }
            }
        }

        /// <summary>
        /// Intentionally empty: this consumer does not throttle.
        /// </summary>
        public void Throttle() { }

        /// <summary>
        /// Replay a contiguous chunk of AOF entries and advance ReplicationOffset accordingly.
        /// </summary>
        /// <param name="record">Pointer to the first entry header of the chunk</param>
        /// <param name="recordLength">Total length of the chunk in bytes</param>
        /// <param name="currentAddress">Log address of the first entry; ReplicationOffset is set to this value before replay</param>
        /// <param name="nextAddress">Log address expected after the whole chunk has been replayed</param>
        /// <param name="isProtected">Forwarded unchanged to UnsafeCommitMetadataOnly (presumably epoch-protection state of the caller - TODO confirm)</param>
        /// <exception cref="OperationCanceledException">The replay token was cancelled while processing the chunk</exception>
        /// <exception cref="GarnetException">
        /// A commit-metadata entry was received while FastCommit is disabled, or the offset reached
        /// after replay does not match <paramref name="nextAddress"/>
        /// </exception>
        public unsafe void Consume(byte* record, int recordLength, long currentAddress, long nextAddress, bool isProtected)
        {
            var aof = storeWrapper.appendOnlyFile;

            ReplicationOffset = currentAddress;
            var ptr = record;
            var end = record + recordLength;
            while (ptr < end)
            {
                replicaReplayTaskCts.Token.ThrowIfCancellationRequested();
                var entryLength = aof.HeaderSize;
                var payloadLength = aof.UnsafeGetLength(ptr);
                if (payloadLength > 0)
                {
                    // Regular AOF record: replay the payload that follows the header
                    aofProcessor.ProcessAofRecordInternal(ptr + entryLength, payloadLength, true);
                    entryLength += TsavoriteLog.UnsafeAlign(payloadLength);
                }
                else if (payloadLength < 0)
                {
                    // Negative length marks a commit-metadata entry; its size is the magnitude of the length
                    if (!clusterProvider.serverOptions.EnableFastCommit)
                    {
                        throw new GarnetException("Received FastCommit request at replica AOF processor, but FastCommit is not enabled", clientResponse: false);
                    }
                    TsavoriteLogRecoveryInfo info = new();
                    info.Initialize(new ReadOnlySpan<byte>(ptr + entryLength, -payloadLength));
                    // aof was already dereferenced above, so no null-conditional is needed here
                    aof.UnsafeCommitMetadataOnly(info, isProtected);
                    entryLength += TsavoriteLog.UnsafeAlign(-payloadLength);
                }
                // payloadLength == 0: header-only entry, advance by the header size alone

                ptr += entryLength;
                ReplicationOffset += entryLength;
            }

            // Sanity check: walking the entries must land exactly on the expected next address
            if (ReplicationOffset != nextAddress)
            {
                logger?.LogError("ReplicaReplayTask.Consume NextAddress Mismatch recordLength:{recordLength}; currentAddress:{currentAddress}; nextAddress:{nextAddress}; replicationOffset:{ReplicationOffset}", recordLength, currentAddress, nextAddress, ReplicationOffset);
                throw new GarnetException($"ReplicaReplayTask.Consume NextAddress Mismatch recordLength:{recordLength}; currentAddress:{currentAddress}; nextAddress:{nextAddress}; replicationOffset:{ReplicationOffset}", LogLevel.Warning, clientResponse: false);
            }
        }

        /// <summary>
        /// Background replay loop: repeatedly drains <see cref="replayIterator"/> into this consumer
        /// until the replay token is cancelled or an error occurs.
        /// </summary>
        /// <remarks>
        /// The method is async void, so no exception may escape it; every failure inside the loop
        /// is caught and logged. The read lock is held for the lifetime of the loop, which is what
        /// makes <see cref="ResetReplayIterator"/> wait for this task to finish.
        /// </remarks>
        public async void ReplicaReplayTask()
        {
            // Acquire before entering try so that the finally block only releases a lock that is held
            activeReplay.ReadLock();
            try
            {
                // The cancellation source is only replaced under the write lock, so the token
                // stays the same for as long as this task holds the read lock.
                var token = replicaReplayTaskCts.Token;
                while (true)
                {
                    token.ThrowIfCancellationRequested();
                    await replayIterator.BulkConsumeAllAsync(
                        this,
                        clusterProvider.serverOptions.ReplicaSyncDelayMs,
                        maxChunkSize: 1 << 20,
                        token);
                }
            }
            catch (OperationCanceledException) when (replicaReplayTaskCts.Token.IsCancellationRequested)
            {
                // Cancellation is the expected way to stop this task; do not report it as a fault
                logger?.LogInformation("ReplicationManager.ReplicaReplayTask cancelled - terminating");
            }
            catch (Exception ex)
            {
                logger?.LogWarning(ex, "An exception occurred at ReplicationManager.ReplicaReplayTask - terminating");
            }
            finally
            {
                activeReplay.ReadUnlock();
            }
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,25 @@
// Licensed under the MIT license.

using System;
using System.Runtime.CompilerServices;
using System.Threading;
using Garnet.common;
using Microsoft.Extensions.Logging;
using Tsavorite.core;

namespace Garnet.cluster
{
internal sealed partial class ReplicationManager : IDisposable
{
/// <summary>
/// Spin (yielding the thread) while background replay is active and the distance between the
/// AOF tail address and ReplicationOffset exceeds ReplicationOffsetMaxLag.
/// Returns immediately when the max lag is -1 or when no replay iterator exists.
/// </summary>
/// <exception cref="OperationCanceledException">The replay token was cancelled while waiting</exception>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
void ThrottlePrimary()
{
    while (true)
    {
        // -1 disables throttling altogether
        if (storeWrapper.serverOptions.ReplicationOffsetMaxLag == -1)
        {
            return;
        }

        // Nothing to wait for when background replay has not been set up
        if (replayIterator == null)
        {
            return;
        }

        var currentLag = storeWrapper.appendOnlyFile.TailAddress - ReplicationOffset;
        if (currentLag <= storeWrapper.serverOptions.ReplicationOffsetMaxLag)
        {
            return;
        }

        // Still lagging: bail out if replay was cancelled, otherwise let other threads run
        replicaReplayTaskCts.Token.ThrowIfCancellationRequested();
        Thread.Yield();
    }
}

/// <summary>
/// Apply primary AOF records.
/// </summary>
Expand Down Expand Up @@ -43,19 +54,32 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre
if (currentAddress > previousAddress)
{
if (
(currentAddress % (1 << storeWrapper.appendOnlyFile.UnsafeGetLogPageSizeBits()) != 0) || // the skip was to a non-page-boundary
(currentAddress % (1 << pageSizeBits) != 0) || // the skip was to a non-page-boundary
(currentAddress >= previousAddress + recordLength) // the skip will not be auto-handled by the AOF enqueue
)
{
logger?.LogWarning("MainMemoryReplication: Skipping from {ReplicaReplicationOffset} to {currentAddress}", ReplicationOffset, currentAddress);
storeWrapper.appendOnlyFile.Initialize(currentAddress, currentAddress);
storeWrapper.appendOnlyFile.SafeInitialize(currentAddress, currentAddress);
ReplicationOffset = currentAddress;
}
}
}

// Address check
if (ReplicationOffset != storeWrapper.appendOnlyFile.TailAddress)
var tail = storeWrapper.appendOnlyFile.TailAddress;
var nextPageBeginAddress = ((tail >> pageSizeBits) + 1) << pageSizeBits;
// Check to ensure that:
// 1. if the record fits in the current page, the tail address of this local node (replica) equals the incoming currentAddress (address of the chunk sent from the primary node)
// 2. if the record does not fit in the current page, the start address of the next page equals the incoming currentAddress (address of the chunk sent from the primary node)
// Otherwise fail and break the connection.
if ((tail + recordLength <= nextPageBeginAddress && tail != currentAddress) ||
(tail + recordLength > nextPageBeginAddress && nextPageBeginAddress != currentAddress))
{
logger?.LogError("Divergent AOF Stream recordLength:{recordLength}; previousAddress:{previousAddress}; currentAddress:{currentAddress}; nextAddress:{nextAddress}; tailAddress:{tail}", recordLength, previousAddress, currentAddress, nextAddress, tail);
throw new GarnetException($"Divergent AOF Stream recordLength:{recordLength}; previousAddress:{previousAddress}; currentAddress:{currentAddress}; nextAddress:{nextAddress}; tailAddress:{tail}", LogLevel.Warning, clientResponse: false);
}

// Address check only if synchronous replication is enabled
if (storeWrapper.serverOptions.ReplicationOffsetMaxLag == 0 && ReplicationOffset != storeWrapper.appendOnlyFile.TailAddress)
{
logger?.LogInformation("Processing {recordLength} bytes; previousAddress {previousAddress}, currentAddress {currentAddress}, nextAddress {nextAddress}, current AOF tail {tail}", recordLength, previousAddress, currentAddress, nextAddress, storeWrapper.appendOnlyFile.TailAddress);
logger?.LogError("Before ProcessPrimaryStream: Replication offset mismatch: ReplicaReplicationOffset {ReplicaReplicationOffset}, aof.TailAddress {tailAddress}", ReplicationOffset, storeWrapper.appendOnlyFile.TailAddress);
Expand All @@ -65,66 +89,37 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre
// Enqueue to AOF
_ = clusterProvider.storeWrapper.appendOnlyFile?.UnsafeEnqueueRaw(new Span<byte>(record, recordLength), noCommit: clusterProvider.serverOptions.EnableFastCommit);

// TODO: rest of the processing can be moved off the critical path

ReplicationOffset = currentAddress;
var ptr = record;
while (ptr < record + recordLength)
if (storeWrapper.serverOptions.ReplicationOffsetMaxLag == 0)
{
var entryLength = storeWrapper.appendOnlyFile.HeaderSize;
var payloadLength = storeWrapper.appendOnlyFile.UnsafeGetLength(ptr);
if (payloadLength > 0)
{
aofProcessor.ProcessAofRecordInternal(ptr + entryLength, payloadLength, true);
entryLength += TsavoriteLog.UnsafeAlign(payloadLength);
}
else if (payloadLength < 0)
{
if (!clusterProvider.serverOptions.EnableFastCommit)
{
throw new GarnetException("Received FastCommit request at replica AOF processor, but FastCommit is not enabled", clientResponse: false);
}
TsavoriteLogRecoveryInfo info = new();
info.Initialize(new ReadOnlySpan<byte>(ptr + entryLength, -payloadLength));
storeWrapper.appendOnlyFile?.UnsafeCommitMetadataOnly(info);
entryLength += TsavoriteLog.UnsafeAlign(-payloadLength);
}
ptr += entryLength;
ReplicationOffset += entryLength;
// Synchronous replay
Consume(record, recordLength, currentAddress, nextAddress, isProtected: false);
}

if (ReplicationOffset != nextAddress)
else
{
logger?.LogWarning("Replication offset mismatch: ReplicaReplicationOffset {ReplicaReplicationOffset}, nextAddress {nextAddress}", ReplicationOffset, nextAddress);
throw new GarnetException($"Replication offset mismatch: ReplicaReplicationOffset {ReplicationOffset}, nextAddress {nextAddress}", LogLevel.Warning, clientResponse: false);
}
// Throttle to give the opportunity to the background replay task to catch up
ThrottlePrimary();

if (ReplicationOffset != storeWrapper.appendOnlyFile.TailAddress)
{
logger?.LogWarning("After ProcessPrimaryStream: Replication offset mismatch: ReplicaReplicationOffset {ReplicaReplicationOffset}, aof.TailAddress {tailAddress}", ReplicationOffset, storeWrapper.appendOnlyFile.TailAddress);
throw new GarnetException($"After ProcessPrimaryStream: Replication offset mismatch: ReplicaReplicationOffset {ReplicationOffset}, aof.TailAddress {storeWrapper.appendOnlyFile.TailAddress}", LogLevel.Warning, clientResponse: false);
// If background task has not been initialized
// initialize it here and start background replay task
if (replayIterator == null)
{
replayIterator = clusterProvider.storeWrapper.appendOnlyFile.ScanSingle(
previousAddress,
long.MaxValue,
scanUncommitted: true,
recover: false,
logger: logger);

System.Threading.Tasks.Task.Run(ReplicaReplayTask);
}
}
}
catch (Exception ex)
{
logger?.LogWarning(ex, "An exception occurred at ReplicationManager.ProcessPrimaryStream");
ResetReplayIterator();
throw new GarnetException(ex.Message, ex, LogLevel.Warning, clientResponse: false);
}
}

/// <summary>
/// Compute the full length of the AOF entry that starts at <paramref name="ptr"/>:
/// the header size plus the aligned payload size. A negative stored length is sized
/// by its magnitude; a zero length yields the header size alone.
/// </summary>
/// <param name="ptr">Pointer to the entry header</param>
/// <returns>Entry length in bytes, header included</returns>
unsafe int GetFirstAofEntryLength(byte* ptr)
{
    int totalLength = storeWrapper.appendOnlyFile.HeaderSize;
    int storedLength = storeWrapper.appendOnlyFile.UnsafeGetLength(ptr);

    // Header-only entry
    if (storedLength == 0)
    {
        return totalLength;
    }

    // The sign only distinguishes the entry kind; the size is the magnitude
    int payloadSize = storedLength > 0 ? storedLength : -storedLength;
    totalLength += TsavoriteLog.UnsafeAlign(payloadSize);
    return totalLength;
}
}
}
7 changes: 7 additions & 0 deletions libs/cluster/Server/Replication/ReplicationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ internal sealed partial class ReplicationManager : IDisposable

readonly CancellationTokenSource ctsRepManager = new();

readonly int pageSizeBits;

readonly ILogger logger;
bool _disposed;

Expand Down Expand Up @@ -92,6 +94,7 @@ public ReplicationManager(ClusterProvider clusterProvider, ILogger logger = null
this.logger = logger;
this.clusterProvider = clusterProvider;
this.storeWrapper = clusterProvider.storeWrapper;
this.pageSizeBits = storeWrapper.appendOnlyFile == null ? 0 : storeWrapper.appendOnlyFile.UnsafeGetLogPageSizeBits();

this.networkPool = networkBufferSettings.CreateBufferPool(logger: logger);
ValidateNetworkBufferSettings();
Expand Down Expand Up @@ -137,6 +140,7 @@ public ReplicationManager(ClusterProvider clusterProvider, ILogger logger = null

// After initializing replication history propagate replicationId to ReplicationLogCheckpointManager
SetPrimaryReplicationId();
replicaReplayTaskCts = CancellationTokenSource.CreateLinkedTokenSource(ctsRepManager.Token);
}

/// <summary>
Expand Down Expand Up @@ -189,6 +193,9 @@ public void Dispose()

checkpointStore.WaitForReplicas();
replicaSyncSessionTaskStore.Dispose();
replicaReplayTaskCts.Cancel();
activeReplay.WriteLock();
replicaReplayTaskCts.Dispose();
ctsRepManager.Cancel();
ctsRepManager.Dispose();
aofTaskStore.Dispose();
Expand Down
2 changes: 1 addition & 1 deletion libs/cluster/Session/ReplicaOfCommand.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Text;
using Garnet.common;
using Garnet.server;
Expand Down Expand Up @@ -40,6 +39,7 @@ private bool TryREPLICAOF(out bool invalidParameters)
}
clusterProvider.clusterManager.TryResetReplica();
clusterProvider.replicationManager.TryUpdateForFailover();
clusterProvider.replicationManager.ResetReplayIterator();
UnsafeBumpAndWaitForEpochTransition();
}
finally
Expand Down
5 changes: 5 additions & 0 deletions libs/host/Configuration/Options.cs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,10 @@ internal sealed class Options
[Option("replica-sync-delay", Required = false, HelpText = "Whether and by how much (milliseconds) should we throttle the replica sync: 0 - disable throttling")]
public int ReplicaSyncDelayMs { get; set; }

[IntRangeValidation(-1, int.MaxValue)]
[Option("replica-offset-max-lag", Required = false, HelpText = "Throttle ClusterAppendLog when replica.AOFTailAddress - ReplicationOffset > ReplicationOffsetMaxLag. 0: Synchronous replay, >=1: background replay with specified lag, -1: infinite lag")]
public int ReplicationOffsetMaxLag { get; set; }

[OptionValidation]
[Option("main-memory-replication", Required = false, HelpText = "Use main-memory replication model.")]
public bool? MainMemoryReplication { get; set; }
Expand Down Expand Up @@ -678,6 +682,7 @@ public GarnetServerOptions GetServerOptions(ILogger logger = null)
CheckpointThrottleFlushDelayMs = CheckpointThrottleFlushDelayMs,
EnableScatterGatherGet = EnableScatterGatherGet.GetValueOrDefault(),
ReplicaSyncDelayMs = ReplicaSyncDelayMs,
ReplicationOffsetMaxLag = ReplicationOffsetMaxLag,
MainMemoryReplication = MainMemoryReplication.GetValueOrDefault(),
OnDemandCheckpoint = OnDemandCheckpoint.GetValueOrDefault(),
UseAofNullDevice = UseAofNullDevice.GetValueOrDefault(),
Expand Down
2 changes: 1 addition & 1 deletion libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace Garnet
public class GarnetServer : IDisposable
{
// IMPORTANT: Keep the version in sync with .azure\pipelines\azure-pipelines-external-release.yml line ~7 and GarnetServer.csproj line ~8.
readonly string version = "1.0.40";
readonly string version = "1.0.41";

internal GarnetProvider Provider;

Expand Down
Loading

0 comments on commit b2856c3

Please sign in to comment.