Skip to content

Commit 23b61f5

Browse files
committed
Fix cancellation being able to cancel prepended queries (#4907)
Fixes #4906 (cherry picked from commit 218a160)
1 parent 415c02a commit 23b61f5

File tree

3 files changed

+205
-68
lines changed

3 files changed

+205
-68
lines changed

src/Npgsql/Internal/NpgsqlConnector.cs

Lines changed: 130 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,13 @@ public sealed partial class NpgsqlConnector : IDisposable
139139
/// </summary>
140140
internal int PendingPrependedResponses { get; set; }
141141

142+
/// <summary>
143+
/// A ManualResetEventSlim used to make sure a cancellation request doesn't run
144+
/// while we're reading responses for the prepended query
145+
/// as we can't gracefully handle their cancellation.
146+
/// </summary>
147+
readonly ManualResetEventSlim ReadingPrependedMessagesMRE = new(initialState: true);
148+
142149
internal NpgsqlDataReader? CurrentReader;
143150

144151
internal PreparedStatementManager PreparedStatementManager { get; }
@@ -221,7 +228,20 @@ internal void FlagAsWritableForMultiplexing()
221228
/// cancellation is delivered. This reduces the chance that a cancellation meant for a previous
222229
/// command will accidentally cancel a later one, see #615.
223230
/// </summary>
224-
internal object CancelLock { get; }
231+
object CancelLock { get; } = new();
232+
233+
/// <summary>
234+
/// A lock that's taken to make sure no other concurrent operation is running.
235+
/// Break takes it to set the state of the connector.
236+
/// Anyone else should immediately check the state and exit
237+
/// if the connector is closed.
238+
/// </summary>
239+
object SyncObj { get; } = new();
240+
241+
/// <summary>
242+
/// A lock that's used to wait for the Cleanup to complete while breaking the connection.
243+
/// </summary>
244+
object CleanupLock { get; } = new();
225245

226246
readonly bool _isKeepAliveEnabled;
227247
readonly Timer? _keepAliveTimer;
@@ -341,8 +361,6 @@ internal NpgsqlConnector(ConnectorSource connectorSource, NpgsqlConnection conn)
341361
Settings = connectorSource.Settings;
342362
PostgresParameters = new Dictionary<string, string>();
343363

344-
CancelLock = new object();
345-
346364
_isKeepAliveEnabled = Settings.KeepAlive > 0;
347365
if (_isKeepAliveEnabled)
348366
{
@@ -504,7 +522,7 @@ internal async Task Open(NpgsqlTimeout timeout, bool async, CancellationToken ca
504522
// Start the keep alive mechanism to work by scheduling the timer.
505523
// Otherwise, it doesn't work for cases when no query executed during
506524
// the connection lifetime in case of a new connector.
507-
lock (this)
525+
lock (SyncObj)
508526
{
509527
var keepAlive = Settings.KeepAlive * 1000;
510528
_keepAliveTimer!.Change(keepAlive, keepAlive);
@@ -1298,9 +1316,14 @@ internal ValueTask<IBackendMessage> ReadMessage(bool async, DataRowLoadingMode d
12981316
connector.ReadBuffer.Timeout = TimeSpan.FromMilliseconds(connector.InternalCommandTimeout);
12991317
for (; connector.PendingPrependedResponses > 0; connector.PendingPrependedResponses--)
13001318
await ReadMessageLong(connector, async, DataRowLoadingMode.Skip, readingNotifications: false, isReadingPrependedMessage: true);
1319+
// We've read all the prepended response.
1320+
// Allow cancellation to proceed.
1321+
connector.ReadingPrependedMessagesMRE.Set();
13011322
}
1302-
catch (PostgresException e)
1323+
catch (Exception e)
13031324
{
1325+
// Prepended queries should never fail.
1326+
// If they do, we're not even going to attempt to salvage the connector.
13041327
throw connector.Break(e);
13051328
}
13061329
}
@@ -1663,18 +1686,39 @@ static RemoteCertificateValidationCallback SslRootValidation(string certRootPath
16631686

16641687
#region Cancel
16651688

1689+
internal void ResetCancellation()
1690+
{
1691+
// If a cancellation is in progress, wait for it to "complete" before proceeding (#615)
1692+
lock (CancelLock)
1693+
{
1694+
if (PendingPrependedResponses > 0)
1695+
ReadingPrependedMessagesMRE.Reset();
1696+
Debug.Assert(ReadingPrependedMessagesMRE.IsSet || PendingPrependedResponses > 0);
1697+
}
1698+
}
1699+
16661700
internal void PerformUserCancellation()
16671701
{
16681702
var connection = Connection;
16691703
if (connection is null || connection.ConnectorBindingScope == ConnectorBindingScope.Reader)
16701704
return;
16711705

1672-
// There's a subtle race condition where cancellation may be happening just as Break is called. Break takes the connector lock, and
1673-
// then ends the user action; this disposes the cancellation token registration, which waits until the cancellation callback
1674-
// completes. But the callback needs to take the connector lock below, which led to a deadlock (#4654).
1675-
// As a result, Break takes CancelLock, and we abort the cancellation attempt immediately if we can't get it here.
1676-
if (!Monitor.TryEnter(CancelLock))
1677-
return;
1706+
// Take the lock first to make sure there is no concurrent Break.
1707+
// We should be safe to take it as Break only take it to set the state.
1708+
lock (SyncObj)
1709+
{
1710+
// The connector is dead, exit gracefully.
1711+
if (!IsConnected)
1712+
return;
1713+
// The connector is still alive, take the CancelLock before exiting SingleUseLock.
1714+
// If a break will happen after, it's going to wait for the cancellation to complete.
1715+
Monitor.Enter(CancelLock);
1716+
}
1717+
1718+
// Wait before we've read all responses for the prepended queries
1719+
// as we can't gracefully handle their cancellation.
1720+
// Break makes sure that it's going to be set even if we fail while reading them.
1721+
ReadingPrependedMessagesMRE.Wait();
16781722

16791723
try
16801724
{
@@ -1687,28 +1731,18 @@ internal void PerformUserCancellation()
16871731
{
16881732
if (cancellationTimeout > 0)
16891733
{
1690-
lock (this)
1691-
{
1692-
if (!IsConnected)
1693-
return;
1694-
UserTimeout = cancellationTimeout;
1695-
ReadBuffer.Timeout = TimeSpan.FromMilliseconds(cancellationTimeout);
1696-
ReadBuffer.Cts.CancelAfter(cancellationTimeout);
1697-
}
1734+
UserTimeout = cancellationTimeout;
1735+
ReadBuffer.Timeout = TimeSpan.FromMilliseconds(cancellationTimeout);
1736+
ReadBuffer.Cts.CancelAfter(cancellationTimeout);
16981737
}
16991738

17001739
return;
17011740
}
17021741
}
17031742

1704-
lock (this)
1705-
{
1706-
if (!IsConnected)
1707-
return;
1708-
UserTimeout = -1;
1709-
ReadBuffer.Timeout = _cancelImmediatelyTimeout;
1710-
ReadBuffer.Cts.Cancel();
1711-
}
1743+
UserTimeout = -1;
1744+
ReadBuffer.Timeout = _cancelImmediatelyTimeout;
1745+
ReadBuffer.Cts.Cancel();
17121746
}
17131747
finally
17141748
{
@@ -1783,8 +1817,7 @@ void DoCancelRequest(int backendProcessId, int backendSecretKey)
17831817
}
17841818
finally
17851819
{
1786-
lock (this)
1787-
FullCleanup();
1820+
FullCleanup();
17881821
}
17891822
}
17901823

@@ -1890,7 +1923,7 @@ copyOperation is NpgsqlCopyTextWriter ||
18901923
// very unlikely to block (plus locking would need to be worked out)
18911924
internal void Close()
18921925
{
1893-
lock (this)
1926+
lock (SyncObj)
18941927
{
18951928
Log.Trace("Closing connector", Id);
18961929

@@ -1920,8 +1953,9 @@ internal void Close()
19201953
}
19211954

19221955
State = ConnectorState.Closed;
1923-
FullCleanup();
19241956
}
1957+
1958+
FullCleanup();
19251959
}
19261960

19271961
internal bool TryRemovePendingEnlistedConnector(Transaction transaction)
@@ -1950,11 +1984,43 @@ internal Exception Break(Exception reason)
19501984
{
19511985
Debug.Assert(!IsClosed);
19521986

1953-
// See PerformUserCancellation on why we take CancelLock
1954-
lock (CancelLock)
1955-
lock (this)
1987+
Monitor.Enter(SyncObj);
1988+
1989+
if (State == ConnectorState.Broken)
19561990
{
1957-
if (State != ConnectorState.Broken)
1991+
// We're already broken.
1992+
// Exit SingleUseLock to unblock other threads (like cancellation).
1993+
Monitor.Exit(SyncObj);
1994+
// Wait for the break to complete before going forward.
1995+
lock (CleanupLock) { }
1996+
return reason;
1997+
}
1998+
1999+
try
2000+
{
2001+
// If we're broken while reading prepended messages
2002+
// the cancellation request might still be waiting on the MRE.
2003+
// Unblock it.
2004+
ReadingPrependedMessagesMRE.Set();
2005+
2006+
Log.Error("Breaking connector", reason, Id);
2007+
2008+
// Note that we may be reading and writing from the same connector concurrently, so safely set
2009+
// the original reason for the break before actually closing the socket etc.
2010+
Interlocked.CompareExchange(ref _breakReason, reason, null);
2011+
State = ConnectorState.Broken;
2012+
// Take the CleanupLock while in SingleUseLock to make sure concurrent Break doesn't take it first.
2013+
Monitor.Enter(CleanupLock);
2014+
}
2015+
finally
2016+
{
2017+
// Unblock other threads (like cancellation) to proceed and exit gracefully.
2018+
Monitor.Exit(SyncObj);
2019+
}
2020+
2021+
try
2022+
{
2023+
lock (CancelLock)
19582024
{
19592025
// Note we only set the cluster to offline and clear the pool if the connection is being broken (we're in this method),
19602026
// *and* the exception indicates that the PG cluster really is down; the latter includes any IO/timeout issue,
@@ -1968,13 +2034,6 @@ internal Exception Break(Exception reason)
19682034
_connectorSource.Clear();
19692035
}
19702036

1971-
Log.Error("Breaking connector", reason, Id);
1972-
1973-
// Note that we may be reading and writing from the same connector concurrently, so safely set
1974-
// the original reason for the break before actually closing the socket etc.
1975-
Interlocked.CompareExchange(ref _breakReason, reason, null);
1976-
State = ConnectorState.Broken;
1977-
19782037
var connection = Connection;
19792038

19802039
FullCleanup();
@@ -1992,41 +2051,47 @@ internal Exception Break(Exception reason)
19922051
connection.Connector = null;
19932052
connection.ConnectorBindingScope = ConnectorBindingScope.None;
19942053
}
2054+
19952055
connection.FullState = ConnectionState.Broken;
19962056
connection.ReleaseCloseLock();
19972057
}
19982058
}
19992059

20002060
return reason;
20012061
}
2062+
finally
2063+
{
2064+
Monitor.Exit(CleanupLock);
2065+
}
20022066
}
20032067

20042068
void FullCleanup()
20052069
{
2006-
Debug.Assert(Monitor.IsEntered(this));
2007-
2008-
if (Settings.Multiplexing)
2070+
lock (CleanupLock)
20092071
{
2010-
FlagAsNotWritableForMultiplexing();
2072+
if (Settings.Multiplexing)
2073+
{
2074+
FlagAsNotWritableForMultiplexing();
20112075

2012-
// Note that in multiplexing, this could be called from the read loop, while the write loop is
2013-
// writing into the channel. To make sure this race condition isn't a problem, the channel currently
2014-
// isn't set up with SingleWriter (since at this point it doesn't do anything).
2015-
CommandsInFlightWriter!.Complete();
2076+
// Note that in multiplexing, this could be called from the read loop, while the write loop is
2077+
// writing into the channel. To make sure this race condition isn't a problem, the channel currently
2078+
// isn't set up with SingleWriter (since at this point it doesn't do anything).
2079+
CommandsInFlightWriter!.Complete();
20162080

2017-
// The connector's read loop has a continuation to observe and log any exception coming out
2018-
// (see Open)
2019-
}
2081+
// The connector's read loop has a continuation to observe and log any exception coming out
2082+
// (see Open)
2083+
}
20202084

2021-
Log.Trace("Cleaning up connector", Id);
2022-
Cleanup();
2085+
Log.Trace("Cleaning up connector", Id);
2086+
Cleanup();
20232087

2024-
if (_isKeepAliveEnabled)
2025-
{
2026-
_userLock!.Dispose();
2027-
_userLock = null;
2028-
_keepAliveTimer!.Change(Timeout.Infinite, Timeout.Infinite);
2029-
_keepAliveTimer.Dispose();
2088+
if (_isKeepAliveEnabled)
2089+
{
2090+
_userLock!.Dispose();
2091+
_userLock = null;
2092+
_keepAliveTimer!.Change(Timeout.Infinite, Timeout.Infinite);
2093+
_keepAliveTimer.Dispose();
2094+
}
20302095
}
20312096
}
20322097

@@ -2270,7 +2335,7 @@ internal UserAction StartUserAction(
22702335
if (!_isKeepAliveEnabled)
22712336
return DoStartUserAction();
22722337

2273-
lock (this)
2338+
lock (SyncObj)
22742339
{
22752340
if (!IsConnected)
22762341
{
@@ -2353,7 +2418,7 @@ internal void EndUserAction()
23532418

23542419
if (_isKeepAliveEnabled)
23552420
{
2356-
lock (this)
2421+
lock (SyncObj)
23572422
{
23582423
if (IsReady || !IsConnected)
23592424
return;
@@ -2399,7 +2464,7 @@ void PerformKeepAlive(object? state)
23992464

24002465
// SemaphoreSlim.Dispose() isn't thread-safe - it may be in progress so we shouldn't try to wait on it;
24012466
// we need a standard lock to protect it.
2402-
if (!Monitor.TryEnter(this))
2467+
if (!Monitor.TryEnter(SyncObj))
24032468
return;
24042469

24052470
try
@@ -2432,7 +2497,7 @@ void PerformKeepAlive(object? state)
24322497
}
24332498
finally
24342499
{
2435-
Monitor.Exit(this);
2500+
Monitor.Exit(SyncObj);
24362501
}
24372502
}
24382503
#pragma warning restore CA1801 // Review unused parameters

src/Npgsql/NpgsqlCommand.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1400,9 +1400,7 @@ internal async ValueTask<NpgsqlDataReader> ExecuteReader(CommandBehavior behavio
14001400
TraceCommandStart(connector);
14011401

14021402
// If a cancellation is in progress, wait for it to "complete" before proceeding (#615)
1403-
lock (connector.CancelLock)
1404-
{
1405-
}
1403+
connector.ResetCancellation();
14061404

14071405
// We do not wait for the entire send to complete before proceeding to reading -
14081406
// the sending continues in parallel with the user's reading. Waiting for the

0 commit comments

Comments
 (0)