Skip to content

Conversation

@epoberezkin
Copy link
Member

No description provided.

@epoberezkin epoberezkin requested a review from spaced4ndy as a code owner May 22, 2025 09:02
atomicModifyIORef'_ (qSubEndB stats) (+ (len `div` 255 + 1)) -- up to 255 ENDs or DELDs in the batch
enqueueEvts evts (AClient _ _ Client {connected, sndQ}) =
whenM (readTVarIO connected) $
nonBlockingWriteTBQueue sndQ ts >> updateEndStats
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the change here is that updateEndStats may be called before transmissions are written to the queue

TM.lookupDelete qId ss >>= mapM readTVar
-- This case catches Just Nothing - it cannot happen here.
-- Nothing is there only before client thread is started.
_ -> TM.lookup qId ss >>= mapM readTVar -- do not insert client if it is already disconnected, but send END to any other client
Copy link
Member Author

@epoberezkin epoberezkin May 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2a. previously subscribed client was not removed here, even though END was sent to it - this seems to be incorrect

removeWhenNoSubs c
lookupDeleteSubscribedClient qId queueSubscribers
-- do not insert client if it is already disconnected, but send END to any other client
updateSubDisconnected = lookupDeleteSubscribedClient qId queueSubscribers
Copy link
Member Author

@epoberezkin epoberezkin May 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2b. now another subscribed client is removed if the one that subscribed got disconnected

updateSubConnected c
| subscribed = do
modifyTVar' subClients $ IS.insert clntId -- add client to server's subscribed cients
upsertSubscribedClient qId c queueSubscribers
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

upsertSubscribedClient won't do swap now in case the client is the same and would return Nothing, but it doesn't change as we check that the client is different in clientToBeNotified anyway.

-- Client lookup by ID is in the same STM transaction.
-- In case client disconnects during the transaction,
-- it will be re-evaluated, and the client won't be stored as subscribed.
(readTVar cls >>= updateSub . IM.lookup clntId)
Copy link
Member Author

@epoberezkin epoberezkin May 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1a. previously the client var was read inside STM transaction creating contention for the map of all connected clients.

-- In case client disconnects during the transaction (its `connected` property is read),
-- the transaction will still be re-evaluated, and the client won't be stored as subscribed.
sub@(_, clntId, _) <- atomically $ readTQueue subQ
c_ <- getServerClient clntId srv
Copy link
Member Author

@epoberezkin epoberezkin May 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1b. now the client var is read without STM transaction reducing contention

threadDelay ntfInt
readTVarIO ntfSubClients >>= mapM_ (deliverNtfs ns stats)
cIds <- IS.toList <$> readTVarIO (subClients ntfSubscribers)
forM_ cIds $ \cId -> getServerClient cId srv >>= mapM_ (deliverNtfs ns stats)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the cost of not storing subscribed clients in the Map is extra client lookup here (getServerClient), but it does not use STM, and it's in a single thread that delivers notifications.

-- in case no client is subscribed.
whenM (TM.memberIO rId subscribers) $
atomically deliverToSub >>= mapM_ forkDeliver
getSubscribedClient rId (queueSubscribers subscribers)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this change takes client lookup outside STM, reducing contention. This is compensated by storing (Maybe AClient) in TVar so in case during delivery the client disconnects and reconnects quickly it "in theory" may re-evaluate delivery twice, here or in a forked thread later.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is now changed that while the TVar stills stores Maybe AClient to track disconnections in the same var, the next client connected would still be in another var, as it's removed from map on disconnection/unsubscription.

Nothing -> Nothing <$ TM.insertM entId (newTVar $ Just ac) cs
Just cv ->
readTVar cv >>= \case
Just c' | sameClientId c c' -> pure Nothing
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see comment in serverThread - this check avoids extra writeTVar if client is the same

-- as long as the client is read in the same transaction -
-- it prevents removing the next subscribed client and also avoids STM contention for the Map.
TM.lookupIO entId cs >>=
mapM_ (\c' -> atomically $ whenM (maybe False (sameClientId c) <$> readTVar c') $ writeTVar c' Nothing)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, the subscription was removed from Map.

This change reduces STM contention and makes subscriptions more "continuous" for intermittently connected clients.

@epoberezkin epoberezkin force-pushed the ep/optimize-server branch from 3530aaa to 32da2f7 Compare May 22, 2025 15:00
@epoberezkin epoberezkin force-pushed the ep/optimize-server branch from 9baf432 to ac7ad15 Compare May 22, 2025 20:34
@epoberezkin epoberezkin merged commit af9ca59 into master May 23, 2025
4 checks passed
@epoberezkin epoberezkin deleted the ep/optimize-server branch May 23, 2025 11:52
@epoberezkin epoberezkin restored the ep/optimize-server branch May 23, 2025 11:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant