@@ -139,6 +139,13 @@ public sealed partial class NpgsqlConnector : IDisposable
139139 /// </summary>
140140 internal int PendingPrependedResponses { get ; set ; }
141141
142+ /// <summary>
143+ /// A ManualResetEventSlim used to make sure a cancellation request doesn't run
144+ /// while we're reading responses for the prepended query
145+ /// as we can't gracefully handle their cancellation.
146+ /// </summary>
147+ readonly ManualResetEventSlim ReadingPrependedMessagesMRE = new ( initialState : true ) ;
148+
142149 internal NpgsqlDataReader ? CurrentReader ;
143150
144151 internal PreparedStatementManager PreparedStatementManager { get ; }
@@ -221,7 +228,20 @@ internal void FlagAsWritableForMultiplexing()
221228 /// cancellation is delivered. This reduces the chance that a cancellation meant for a previous
222229 /// command will accidentally cancel a later one, see #615.
223230 /// </summary>
224- internal object CancelLock { get ; }
231+ object CancelLock { get ; } = new ( ) ;
232+
233+ /// <summary>
234+ /// A lock that's taken to make sure no other concurrent operation is running.
235+ /// Break takes it to set the state of the connector.
236+ /// Anyone else should immediately check the state and exit
237+ /// if the connector is closed.
238+ /// </summary>
239+ object SyncObj { get ; } = new ( ) ;
240+
241+ /// <summary>
242+ /// A lock that's used to wait for the Cleanup to complete while breaking the connection.
243+ /// </summary>
244+ object CleanupLock { get ; } = new ( ) ;
225245
226246 readonly bool _isKeepAliveEnabled ;
227247 readonly Timer ? _keepAliveTimer ;
@@ -341,8 +361,6 @@ internal NpgsqlConnector(ConnectorSource connectorSource, NpgsqlConnection conn)
341361 Settings = connectorSource . Settings ;
342362 PostgresParameters = new Dictionary < string , string > ( ) ;
343363
344- CancelLock = new object ( ) ;
345-
346364 _isKeepAliveEnabled = Settings . KeepAlive > 0 ;
347365 if ( _isKeepAliveEnabled )
348366 {
@@ -504,7 +522,7 @@ internal async Task Open(NpgsqlTimeout timeout, bool async, CancellationToken ca
504522 // Start the keep alive mechanism to work by scheduling the timer.
505523 // Otherwise, it doesn't work for cases when no query executed during
506524 // the connection lifetime in case of a new connector.
507- lock ( this )
525+ lock ( SyncObj )
508526 {
509527 var keepAlive = Settings . KeepAlive * 1000 ;
510528 _keepAliveTimer ! . Change ( keepAlive , keepAlive ) ;
@@ -1298,9 +1316,14 @@ internal ValueTask<IBackendMessage> ReadMessage(bool async, DataRowLoadingMode d
12981316 connector . ReadBuffer . Timeout = TimeSpan . FromMilliseconds ( connector . InternalCommandTimeout ) ;
12991317 for ( ; connector . PendingPrependedResponses > 0 ; connector . PendingPrependedResponses -- )
13001318 await ReadMessageLong ( connector , async , DataRowLoadingMode . Skip , readingNotifications : false, isReadingPrependedMessage : true) ;
1319+ // We've read all the prepended response.
1320+ // Allow cancellation to proceed.
1321+ connector . ReadingPrependedMessagesMRE. Set( ) ;
13011322 }
1302- catch ( PostgresException e )
1323+ catch ( Exception e)
13031324 {
1325+ // Prepended queries should never fail.
1326+ // If they do, we're not even going to attempt to salvage the connector.
13041327 throw connector. Break( e) ;
13051328 }
13061329 }
@@ -1663,18 +1686,39 @@ static RemoteCertificateValidationCallback SslRootValidation(string certRootPath
16631686
16641687 #region Cancel
16651688
1689+ internal void ResetCancellation( )
1690+ {
1691+ // If a cancellation is in progress, wait for it to "complete" before proceeding (#615)
1692+ lock ( CancelLock )
1693+ {
1694+ if ( PendingPrependedResponses > 0 )
1695+ ReadingPrependedMessagesMRE. Reset ( ) ;
1696+ Debug. Assert ( ReadingPrependedMessagesMRE . IsSet || PendingPrependedResponses > 0 ) ;
1697+ }
1698+ }
1699+
16661700 internal void PerformUserCancellation( )
16671701 {
16681702 var connection = Connection;
16691703 if ( connection is null || connection . ConnectorBindingScope == ConnectorBindingScope . Reader )
16701704 return;
16711705
1672- // There's a subtle race condition where cancellation may be happening just as Break is called. Break takes the connector lock, and
1673- // then ends the user action; this disposes the cancellation token registration, which waits until the cancellation callback
1674- // completes. But the callback needs to take the connector lock below, which led to a deadlock (#4654).
1675- // As a result, Break takes CancelLock, and we abort the cancellation attempt immediately if we can't get it here.
1676- if ( ! Monitor . TryEnter ( CancelLock ) )
1677- return;
1706+ // Take the lock first to make sure there is no concurrent Break.
1707+ // We should be safe to take it as Break only take it to set the state.
1708+ lock ( SyncObj )
1709+ {
1710+ // The connector is dead, exit gracefully.
1711+ if ( ! IsConnected )
1712+ return;
1713+ // The connector is still alive, take the CancelLock before exiting SingleUseLock.
1714+ // If a break will happen after, it's going to wait for the cancellation to complete.
1715+ Monitor. Enter ( CancelLock ) ;
1716+ }
1717+
1718+ // Wait before we've read all responses for the prepended queries
1719+ // as we can't gracefully handle their cancellation.
1720+ // Break makes sure that it's going to be set even if we fail while reading them.
1721+ ReadingPrependedMessagesMRE. Wait ( ) ;
16781722
16791723 try
16801724 {
@@ -1687,28 +1731,18 @@ internal void PerformUserCancellation()
16871731 {
16881732 if ( cancellationTimeout > 0 )
16891733 {
1690- lock ( this )
1691- {
1692- if ( ! IsConnected )
1693- return;
1694- UserTimeout = cancellationTimeout;
1695- ReadBuffer. Timeout = TimeSpan . FromMilliseconds ( cancellationTimeout ) ;
1696- ReadBuffer. Cts . CancelAfter ( cancellationTimeout ) ;
1697- }
1734+ UserTimeout = cancellationTimeout;
1735+ ReadBuffer. Timeout = TimeSpan . FromMilliseconds ( cancellationTimeout ) ;
1736+ ReadBuffer. Cts . CancelAfter ( cancellationTimeout ) ;
16981737 }
16991738
17001739 return ;
17011740 }
17021741 }
17031742
1704- lock ( this )
1705- {
1706- if ( ! IsConnected )
1707- return;
1708- UserTimeout = - 1 ;
1709- ReadBuffer. Timeout = _cancelImmediatelyTimeout ;
1710- ReadBuffer. Cts . Cancel ( ) ;
1711- }
1743+ UserTimeout = - 1 ;
1744+ ReadBuffer . Timeout = _cancelImmediatelyTimeout ;
1745+ ReadBuffer . Cts . Cancel ( ) ;
17121746 }
17131747 finally
17141748 {
@@ -1783,8 +1817,7 @@ void DoCancelRequest(int backendProcessId, int backendSecretKey)
17831817 }
17841818 finally
17851819 {
1786- lock ( this )
1787- FullCleanup( ) ;
1820+ FullCleanup( ) ;
17881821 }
17891822 }
17901823
@@ -1890,7 +1923,7 @@ copyOperation is NpgsqlCopyTextWriter ||
18901923 // very unlikely to block (plus locking would need to be worked out)
18911924 internal void Close( )
18921925 {
1893- lock ( this )
1926+ lock ( SyncObj )
18941927 {
18951928 Log. Trace ( "Closing connector" , Id ) ;
18961929
@@ -1920,8 +1953,9 @@ internal void Close()
19201953 }
19211954
19221955 State = ConnectorState . Closed ;
1923- FullCleanup( ) ;
19241956 }
1957+
1958+ FullCleanup( ) ;
19251959 }
19261960
19271961 internal bool TryRemovePendingEnlistedConnector( Transaction transaction )
@@ -1950,11 +1984,43 @@ internal Exception Break(Exception reason)
19501984 {
19511985 Debug. Assert ( ! IsClosed ) ;
19521986
1953- // See PerformUserCancellation on why we take CancelLock
1954- lock ( CancelLock )
1955- lock ( this )
1987+ Monitor . Enter ( SyncObj ) ;
1988+
1989+ if ( State == ConnectorState . Broken )
19561990 {
1957- if ( State != ConnectorState . Broken )
1991+ // We're already broken.
1992+ // Exit SingleUseLock to unblock other threads (like cancellation).
1993+ Monitor. Exit ( SyncObj ) ;
1994+ // Wait for the break to complete before going forward.
1995+ lock ( CleanupLock ) { }
1996+ return reason;
1997+ }
1998+
1999+ try
2000+ {
2001+ // If we're broken while reading prepended messages
2002+ // the cancellation request might still be waiting on the MRE.
2003+ // Unblock it.
2004+ ReadingPrependedMessagesMRE. Set ( ) ;
2005+
2006+ Log. Error ( "Breaking connector" , reason , Id ) ;
2007+
2008+ // Note that we may be reading and writing from the same connector concurrently, so safely set
2009+ // the original reason for the break before actually closing the socket etc.
2010+ Interlocked. CompareExchange ( ref _breakReason , reason , null ) ;
2011+ State = ConnectorState. Broken ;
2012+ // Take the CleanupLock while in SingleUseLock to make sure concurrent Break doesn't take it first.
2013+ Monitor. Enter ( CleanupLock ) ;
2014+ }
2015+ finally
2016+ {
2017+ // Unblock other threads (like cancellation) to proceed and exit gracefully.
2018+ Monitor. Exit ( SyncObj ) ;
2019+ }
2020+
2021+ try
2022+ {
2023+ lock ( CancelLock )
19582024 {
19592025 // Note we only set the cluster to offline and clear the pool if the connection is being broken (we're in this method),
19602026 // *and* the exception indicates that the PG cluster really is down; the latter includes any IO/timeout issue,
@@ -1968,13 +2034,6 @@ internal Exception Break(Exception reason)
19682034 _connectorSource. Clear ( ) ;
19692035 }
19702036
1971- Log. Error ( "Breaking connector" , reason , Id ) ;
1972-
1973- // Note that we may be reading and writing from the same connector concurrently, so safely set
1974- // the original reason for the break before actually closing the socket etc.
1975- Interlocked. CompareExchange ( ref _breakReason , reason , null ) ;
1976- State = ConnectorState. Broken ;
1977-
19782037 var connection = Connection;
19792038
19802039 FullCleanup( ) ;
@@ -1992,41 +2051,47 @@ internal Exception Break(Exception reason)
19922051 connection. Connector = null ;
19932052 connection. ConnectorBindingScope = ConnectorBindingScope . None ;
19942053 }
2054+
19952055 connection. FullState = ConnectionState . Broken ;
19962056 connection. ReleaseCloseLock ( ) ;
19972057 }
19982058 }
19992059
20002060 return reason;
20012061 }
2062+ finally
2063+ {
2064+ Monitor . Exit ( CleanupLock ) ;
2065+ }
20022066 }
20032067
20042068 void FullCleanup( )
20052069 {
2006- Debug. Assert ( Monitor . IsEntered ( this ) ) ;
2007-
2008- if ( Settings . Multiplexing )
2070+ lock ( CleanupLock )
20092071 {
2010- FlagAsNotWritableForMultiplexing( ) ;
2072+ if ( Settings . Multiplexing )
2073+ {
2074+ FlagAsNotWritableForMultiplexing( ) ;
20112075
2012- // Note that in multiplexing, this could be called from the read loop, while the write loop is
2013- // writing into the channel. To make sure this race condition isn't a problem, the channel currently
2014- // isn't set up with SingleWriter (since at this point it doesn't do anything).
2015- CommandsInFlightWriter! . Complete ( ) ;
2076+ // Note that in multiplexing, this could be called from the read loop, while the write loop is
2077+ // writing into the channel. To make sure this race condition isn't a problem, the channel currently
2078+ // isn't set up with SingleWriter (since at this point it doesn't do anything).
2079+ CommandsInFlightWriter! . Complete ( ) ;
20162080
2017- // The connector's read loop has a continuation to observe and log any exception coming out
2018- // (see Open)
2019- }
2081+ // The connector's read loop has a continuation to observe and log any exception coming out
2082+ // (see Open)
2083+ }
20202084
2021- Log. Trace ( "Cleaning up connector" , Id ) ;
2022- Cleanup( ) ;
2085+ Log. Trace ( "Cleaning up connector" , Id ) ;
2086+ Cleanup( ) ;
20232087
2024- if ( _isKeepAliveEnabled )
2025- {
2026- _userLock! . Dispose ( ) ;
2027- _userLock = null ;
2028- _keepAliveTimer! . Change ( Timeout . Infinite , Timeout . Infinite ) ;
2029- _keepAliveTimer. Dispose ( ) ;
2088+ if ( _isKeepAliveEnabled )
2089+ {
2090+ _userLock! . Dispose ( ) ;
2091+ _userLock = null ;
2092+ _keepAliveTimer! . Change ( Timeout . Infinite , Timeout . Infinite ) ;
2093+ _keepAliveTimer. Dispose ( ) ;
2094+ }
20302095 }
20312096 }
20322097
@@ -2270,7 +2335,7 @@ internal UserAction StartUserAction(
22702335 if ( ! _isKeepAliveEnabled )
22712336 return DoStartUserAction( ) ;
22722337
2273- lock ( this )
2338+ lock ( SyncObj )
22742339 {
22752340 if ( ! IsConnected )
22762341 {
@@ -2353,7 +2418,7 @@ internal void EndUserAction()
23532418
23542419 if ( _isKeepAliveEnabled )
23552420 {
2356- lock ( this )
2421+ lock ( SyncObj )
23572422 {
23582423 if ( IsReady || ! IsConnected )
23592424 return;
@@ -2399,7 +2464,7 @@ void PerformKeepAlive(object? state)
23992464
24002465 // SemaphoreSlim.Dispose() isn't thread-safe - it may be in progress so we shouldn't try to wait on it;
24012466 // we need a standard lock to protect it.
2402- if ( ! Monitor . TryEnter ( this ) )
2467+ if ( ! Monitor . TryEnter ( SyncObj ) )
24032468 return;
24042469
24052470 try
@@ -2432,7 +2497,7 @@ void PerformKeepAlive(object? state)
24322497 }
24332498 finally
24342499 {
2435- Monitor. Exit ( this ) ;
2500+ Monitor. Exit ( SyncObj ) ;
24362501 }
24372502 }
24382503#pragma warning restore CA1801 // Review unused parameters
0 commit comments