Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,8 @@ public bool AllowDataLoss
=> serverOptions.AllowDataLoss;

/// <inheritdoc />
public void Recover()
{
replicationManager.Recover();
}
public ValueTask RecoverAsync()
=> replicationManager.RecoverAsync();

/// <inheritdoc />
public bool PreventRoleChange()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ async Task<string> ReplicaSyncAttachTaskAsync(bool downgradeLock, bool forceAsyn
cEntry = GetLatestCheckpointEntryFromDisk();
logger?.LogCheckpointEntry(LogLevel.Information, nameof(ReplicaSyncAttachTaskAsync), cEntry);

storeWrapper.RecoverAOF();
await storeWrapper.RecoverAOFAsync().ConfigureAwait(false);
logger?.LogInformation("InitiateReplicaSync: AOF BeginAddress:{beginAddress} AOF TailAddress:{tailAddress}", storeWrapper.appendOnlyFile.Log.BeginAddress, storeWrapper.appendOnlyFile.Log.TailAddress);

var beginAddress = storeWrapper.appendOnlyFile.Log.BeginAddress;
Expand Down Expand Up @@ -301,10 +301,12 @@ public AofAddress TryReplicaDiskbasedRecovery(
remoteCheckpoint.metadata.storeIndexToken,
remoteCheckpoint.metadata.storeHlogToken);

storeWrapper.RecoverCheckpoint(
#pragma warning disable VSTHRD002 // The replica-recovery RESP path is synchronous and must complete before sending a response.
storeWrapper.RecoverCheckpointAsync(
replicaRecover: true,
recoverStoreFromToken,
remoteCheckpoint.metadata);
remoteCheckpoint.metadata).AsTask().GetAwaiter().GetResult();
#pragma warning restore VSTHRD002

if (replayAOFMap > 0)
{
Expand Down
14 changes: 7 additions & 7 deletions libs/cluster/Server/Replication/ReplicationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -509,20 +509,20 @@ public void Dispose()
/// <summary>
/// Main recover method for replication
/// </summary>
public void Recover()
public async ValueTask RecoverAsync()
{
var nodeRole = clusterProvider.clusterManager.CurrentConfig.LocalNodeRole;

switch (nodeRole)
{
case NodeRole.PRIMARY:
RecoverCheckpointAndAOF();
await RecoverCheckpointAndAOFAsync().ConfigureAwait(false);
break;
case NodeRole.REPLICA:
// If configured, load from disk - otherwise wait to connect with a Primary
if (clusterProvider.serverOptions.ClusterReplicaResumeWithData)
{
RecoverCheckpointAndAOF();
await RecoverCheckpointAndAOFAsync().ConfigureAwait(false);
}

break;
Expand All @@ -535,10 +535,10 @@ public void Recover()
/// <summary>
/// Recover whatever is available from <see cref="storeWrapper"/>.
/// </summary>
private void RecoverCheckpointAndAOF()
private async ValueTask RecoverCheckpointAndAOFAsync()
{
storeWrapper.RecoverCheckpoint();
storeWrapper.RecoverAOF();
await storeWrapper.RecoverCheckpointAsync().ConfigureAwait(false);
await storeWrapper.RecoverAOFAsync().ConfigureAwait(false);
if (clusterProvider.serverOptions.EnableAOF)
{
// If recovered checkpoint corresponds to an unavailable AOF address, we initialize AOF to that address
Expand All @@ -555,7 +555,7 @@ private void RecoverCheckpointAndAOF()

// First recover and then load latest checkpoint info in-memory
if (!InitializeCheckpointStore())
logger?.LogWarning("Failed acquiring latest memory checkpoint metadata at {method}", nameof(RecoverCheckpointAndAOF));
logger?.LogWarning("Failed acquiring latest memory checkpoint metadata at {method}", nameof(RecoverCheckpointAndAOFAsync));
}

/// <summary>
Expand Down
4 changes: 3 additions & 1 deletion libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,9 @@ private GarnetAppendOnlyFile CreateAOF(int dbId)
/// </summary>
public void Start()
{
Provider.Recover();
#pragma warning disable VSTHRD002 // Server startup is synchronous and must complete recovery before accepting connections.
Provider.RecoverAsync().AsTask().GetAwaiter().GetResult();
#pragma warning restore VSTHRD002
for (var i = 0; i < servers.Length; i++)
servers[i].Start();
Provider.Start();
Expand Down
9 changes: 2 additions & 7 deletions libs/server/AOF/GarnetLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,8 @@ public AofAddress MemorySizeBytes
}
}

public void Recover()
{
if (singleLog != null)
singleLog.Recover();
else
shardedLog.Recover();
}
public ValueTask RecoverAsync()
=> singleLog != null ? singleLog.RecoverAsync() : shardedLog.RecoverAsync();

public bool RecoverLatestSequenceNumber(out long recoverUntilSequenceNumber)
{
Expand Down
5 changes: 3 additions & 2 deletions libs/server/AOF/ShardedLog.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Garnet.common;
using Microsoft.Extensions.Logging;
using Tsavorite.core;
Expand Down Expand Up @@ -166,10 +167,10 @@ public AofAddress MemorySizeBytes
}
}

public void Recover()
public async ValueTask RecoverAsync()
{
foreach (var log in sublog)
log.Recover();
await log.RecoverAsync().ConfigureAwait(false);
}

public void Reset()
Expand Down
3 changes: 2 additions & 1 deletion libs/server/AOF/SingleLog.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Tsavorite.core;

Expand Down Expand Up @@ -39,7 +40,7 @@ public class SingleLog(TsavoriteLogSettings logSettings, ILogger logger = null)

public AofAddress MemorySizeBytes => AofAddress.Create(1, value: log.MemorySizeBytes);

public void Recover() => log.Recover();
public ValueTask RecoverAsync() => log.RecoverAsync();
public void Reset() => log.Reset();

public void Dispose()
Expand Down
2 changes: 1 addition & 1 deletion libs/server/Cluster/IClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public interface IClusterProvider : IDisposable
/// <summary>
/// Recover the cluster
/// </summary>
void Recover();
ValueTask RecoverAsync();

/// <summary>
/// Reset gossip stats
Expand Down
19 changes: 8 additions & 11 deletions libs/server/Databases/DatabaseManagerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ internal abstract class DatabaseManagerBase : IDatabaseManager
public abstract void ResumeCheckpoints(int dbId);

/// <inheritdoc/>
public abstract void RecoverCheckpoint(bool replicaRecover = false, bool recoverFromToken = false, CheckpointMetadata metadata = null);
public abstract ValueTask RecoverCheckpointAsync(bool replicaRecover = false, bool recoverFromToken = false, CheckpointMetadata metadata = null);

/// <inheritdoc/>
public abstract Task<bool> TakeCheckpointAsync(bool background, int dbId = -1, CancellationToken token = default, ILogger logger = null);
Expand All @@ -57,7 +57,7 @@ public abstract Task TaskCheckpointBasedOnAofSizeLimitAsync(long aofSizeLimit,
public abstract Task WaitForCommitToAofAsync(CancellationToken token = default, ILogger logger = null);

/// <inheritdoc/>
public abstract void RecoverAOF();
public abstract ValueTask RecoverAOFAsync();

/// <inheritdoc/>
public abstract AofAddress ReplayAOF(AofAddress untilAddress);
Expand Down Expand Up @@ -164,18 +164,15 @@ protected DatabaseManagerBase(StoreWrapper.DatabaseCreatorDelegate createDatabas
/// Recover single database from checkpoint
/// </summary>
/// <param name="db">Database to recover</param>
/// <param name="storeVersion">Store version</param>
protected void RecoverDatabaseCheckpoint(GarnetDatabase db, out long storeVersion)
protected async ValueTask<long> RecoverDatabaseCheckpointAsync(GarnetDatabase db)
{
storeVersion = 0;

storeVersion = db.Store.Recover();
var storeVersion = await db.Store.RecoverAsync().ConfigureAwait(false);
Logger?.LogInformation("Recovered store to version {storeVersion}", storeVersion);

if (storeVersion > 0)
{
db.LastSaveTime = DateTimeOffset.UtcNow;
}

return storeVersion;
}

/// <summary>
Expand Down Expand Up @@ -227,11 +224,11 @@ protected static void ResumeCheckpoints(GarnetDatabase db)
/// Recover a single database from AOF
/// </summary>
/// <param name="db">Database to recover</param>
protected void RecoverDatabaseAOF(GarnetDatabase db)
protected async ValueTask RecoverDatabaseAOFAsync(GarnetDatabase db)
{
if (db.AppendOnlyFile == null) return;

db.AppendOnlyFile.Log.Recover();
await db.AppendOnlyFile.Log.RecoverAsync().ConfigureAwait(false);
Logger?.LogInformation("Recovered AOF: begin address = {beginAddress}, tail address = {tailAddress}, DB ID: {id}",
db.AppendOnlyFile.Log.BeginAddress, db.AppendOnlyFile.Log.TailAddress, db.Id);
}
Expand Down
4 changes: 2 additions & 2 deletions libs/server/Databases/IDatabaseManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public interface IDatabaseManager : IDisposable
/// </summary>
/// <param name="replicaRecover"></param>
/// <param name="recoverFromToken"></param>
public void RecoverCheckpoint(bool replicaRecover = false, bool recoverFromToken = false, CheckpointMetadata metadata = null);
public ValueTask RecoverCheckpointAsync(bool replicaRecover = false, bool recoverFromToken = false, CheckpointMetadata metadata = null);

/// <summary>
/// Take checkpoint of all active databases (or a specified database) if checkpointing is not in progress
Expand Down Expand Up @@ -140,7 +140,7 @@ public Task TaskCheckpointBasedOnAofSizeLimitAsync(long aofSizeLimit, Cancellati
/// <summary>
/// Recover AOF
/// </summary>
public void RecoverAOF();
public ValueTask RecoverAOFAsync();

/// <summary>
/// When replaying AOF we do not want to write AOF records again.
Expand Down
10 changes: 5 additions & 5 deletions libs/server/Databases/MultiDatabaseManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,11 @@ public MultiDatabaseManager(SingleDatabaseManager src) :
}

/// <inheritdoc/>
public override void RecoverCheckpoint(bool replicaRecover = false, bool recoverFromToken = false, CheckpointMetadata metadata = null)
public override async ValueTask RecoverCheckpointAsync(bool replicaRecover = false, bool recoverFromToken = false, CheckpointMetadata metadata = null)
{
if (replicaRecover)
throw new GarnetException(
$"Unexpected call to {nameof(MultiDatabaseManager)}.{nameof(RecoverCheckpoint)} with {nameof(replicaRecover)} == true.");
$"Unexpected call to {nameof(MultiDatabaseManager)}.{nameof(RecoverCheckpointAsync)} with {nameof(replicaRecover)} == true.");

var checkpointParentDir = StoreWrapper.serverOptions.StoreCheckpointBaseDirectory;
var checkpointDirBaseName = GarnetServerOptions.GetCheckpointDirectoryName(0);
Expand Down Expand Up @@ -116,7 +116,7 @@ public override void RecoverCheckpoint(bool replicaRecover = false, bool recover

try
{
RecoverDatabaseCheckpoint(db, out storeVersion);
storeVersion = await RecoverDatabaseCheckpointAsync(db).ConfigureAwait(false);
}
catch (TsavoriteNoHybridLogException ex)
{
Expand Down Expand Up @@ -416,7 +416,7 @@ public override async Task WaitForCommitToAofAsync(CancellationToken token = def
}

/// <inheritdoc/>
public override void RecoverAOF()
public override async ValueTask RecoverAOFAsync()
{
var aofParentDir = StoreWrapper.serverOptions.AppendOnlyFileBaseDirectory;
var aofDirBaseName = GarnetServerOptions.GetAppendOnlyFileDirectoryName(0);
Expand All @@ -442,7 +442,7 @@ public override void RecoverAOF()
if (!success)
throw new GarnetException($"Failed to retrieve or create database for AOF recovery (DB ID = {dbId}).");

RecoverDatabaseAOF(db);
await RecoverDatabaseAOFAsync(db).ConfigureAwait(false);
}
}

Expand Down
10 changes: 6 additions & 4 deletions libs/server/Databases/SingleDatabaseManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public override GarnetDatabase TryGetOrAddDatabase(int dbId, out bool success, o
}

/// <inheritdoc/>
public override void RecoverCheckpoint(bool replicaRecover = false, bool recoverFromToken = false, CheckpointMetadata metadata = null)
public override async ValueTask RecoverCheckpointAsync(bool replicaRecover = false, bool recoverFromToken = false, CheckpointMetadata metadata = null)
{
long storeVersion = 0;
try
Expand All @@ -64,15 +64,17 @@ public override void RecoverCheckpoint(bool replicaRecover = false, bool recover
// Note: Since replicaRecover only pertains to cluster-mode, we can use the default store pointers (since multi-db mode is disabled in cluster-mode)
if (metadata!.storeIndexToken != default && metadata.storeHlogToken != default)
{
storeVersion = !recoverFromToken ? Store.Recover() : Store.Recover(metadata.storeIndexToken, metadata.storeHlogToken);
storeVersion = !recoverFromToken
? await Store.RecoverAsync().ConfigureAwait(false)
: await Store.RecoverAsync(metadata.storeIndexToken, metadata.storeHlogToken).ConfigureAwait(false);
}

if (storeVersion > 0)
defaultDatabase.LastSaveTime = DateTimeOffset.UtcNow;
}
else
{
RecoverDatabaseCheckpoint(defaultDatabase, out storeVersion);
storeVersion = await RecoverDatabaseCheckpointAsync(defaultDatabase).ConfigureAwait(false);
}
}
catch (TsavoriteNoHybridLogException ex)
Expand Down Expand Up @@ -239,7 +241,7 @@ public override async Task WaitForCommitToAofAsync(CancellationToken token = def
}

/// <inheritdoc/>
public override void RecoverAOF() => RecoverDatabaseAOF(defaultDatabase);
public override ValueTask RecoverAOFAsync() => RecoverDatabaseAOFAsync(defaultDatabase);

/// <inheritdoc/>
public override AofAddress ReplayAOF(AofAddress untilAddress)
Expand Down
5 changes: 3 additions & 2 deletions libs/server/Providers/GarnetProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT license.

using System.Threading;
using System.Threading.Tasks;
using Garnet.common;
using Garnet.networking;
using Tsavorite.core;
Expand Down Expand Up @@ -43,8 +44,8 @@ public void Start()
/// <summary>
/// Recover
/// </summary>
public void Recover()
=> storeWrapper.Recover();
public ValueTask RecoverAsync()
=> storeWrapper.RecoverAsync();

/// <summary>
/// Dispose
Expand Down
14 changes: 7 additions & 7 deletions libs/server/StoreWrapper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -360,19 +360,19 @@ public IPEndPoint GetClusterEndpoint()
return localEndPoint;
}

internal void Recover()
internal async ValueTask RecoverAsync()
{
if (serverOptions.EnableCluster)
{
if (serverOptions.Recover)
clusterProvider.Recover();
await clusterProvider.RecoverAsync().ConfigureAwait(false);
}
else
{
if (serverOptions.Recover)
{
RecoverCheckpoint();
RecoverAOF();
await RecoverCheckpointAsync().ConfigureAwait(false);
await RecoverAOFAsync().ConfigureAwait(false);
ReplayAOF(AofAddress.Create(length: serverOptions.AofPhysicalSublogCount, value: -1));
}
}
Expand Down Expand Up @@ -413,10 +413,10 @@ public async Task TakeOnDemandCheckpointAsync(DateTimeOffset entryTime, int dbId
/// <summary>
/// Recover checkpoint
/// </summary>
public void RecoverCheckpoint(bool replicaRecover = false, bool recoverFromToken = false, CheckpointMetadata metadata = null)
public async ValueTask RecoverCheckpointAsync(bool replicaRecover = false, bool recoverFromToken = false, CheckpointMetadata metadata = null)
{
StartSizeTrackers(); // We need to start this before recovery to have size tracking during the recovery process.
databaseManager.RecoverCheckpoint(replicaRecover, recoverFromToken, metadata);
await databaseManager.RecoverCheckpointAsync(replicaRecover, recoverFromToken, metadata).ConfigureAwait(false);
}

/// <summary>
Expand Down Expand Up @@ -447,7 +447,7 @@ public void ResumeCheckpoints(int dbId = 0)
/// <summary>
/// Recover AOF
/// </summary>
public void RecoverAOF() => databaseManager.RecoverAOF();
public ValueTask RecoverAOFAsync() => databaseManager.RecoverAOFAsync();

/// <summary>
/// When replaying AOF we do not want to write AOF records again.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ internal bool MaybeRecoverStore<SF, A>(TsavoriteKV<SF, A> store)
try
{
var sw = Stopwatch.StartNew();
store.Recover();
store.RecoverAsync().AsTask().GetAwaiter().GetResult();
sw.Stop();
Console.WriteLine($" Completed recovery in {(double)sw.ElapsedMilliseconds / 1000:N3} seconds");
return true;
Expand Down
Loading
Loading