Skip to content

Commit 05a32e6

Browse files
committed
concurrent read messages
1 parent f4d0096 commit 05a32e6

File tree

2 files changed

+29
-8
lines changed

2 files changed

+29
-8
lines changed

src/Simplex/Messaging/Server.hs

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ import System.Environment (lookupEnv)
126126
import System.Exit (exitFailure, exitSuccess)
127127
import System.IO (hPrint, hPutStrLn, hSetNewlineMode, universalNewlineMode)
128128
import System.Mem.Weak (deRefWeak)
129-
import UnliftIO (timeout)
129+
import UnliftIO (mapConcurrently, timeout)
130130
import UnliftIO.Concurrent
131131
import UnliftIO.Directory (doesFileExist, renameFile)
132132
import UnliftIO.Exception
@@ -2114,10 +2114,32 @@ exportMessages :: forall s. MsgStoreClass s => Bool -> MsgStore s -> FilePath ->
21142114
exportMessages tty st f drainMsgs = do
21152115
logNote $ "saving messages to file " <> T.pack f
21162116
run $ case st of
2117-
StoreMemory ms -> exportMessages_ ms $ getMsgs ms
2118-
StoreJournal ms -> exportMessages_ ms $ getJournalMsgs ms
2117+
StoreMemory ms -> exportMessages_ getMsgs ms
2118+
StoreJournal ms -> exportMessages_' getJournalMsgs ms
21192119
where
2120-
exportMessages_ ms get = fmap (\(Sum n) -> n) . unsafeWithAllMsgQueues tty ms . saveQueueMsgs get
2120+
exportMessages_ get ms h =
2121+
fmap (\(Sum n) -> n) $ unsafeWithAllMsgQueues tty ms $ \q ->
2122+
get ms q >>= saveQueueMsgs h q
2123+
exportMessages_' get ms h = do
2124+
qv <- newIORef []
2125+
Sum n <- unsafeWithAllMsgQueues tty ms $ \q -> do
2126+
qs <- (q :) <$> readIORef qv
2127+
if length qs < 8
2128+
then writeIORef qv qs $> Sum 0
2129+
else do
2130+
writeIORef qv []
2131+
saveQueues qs
2132+
qs <- readIORef qv
2133+
if null qs
2134+
then pure n
2135+
else do
2136+
Sum n' <- saveQueues qs
2137+
pure $ n + n'
2138+
where
2139+
saveQueues qs = do
2140+
let qs' = reverse qs
2141+
qMsgs <- zip qs' <$> mapConcurrently (get ms) qs'
2142+
mconcat <$> mapM (uncurry $ saveQueueMsgs h) qMsgs
21212143
run :: (Handle -> IO Int) -> IO ()
21222144
run a = liftIO $ withFile f WriteMode $ tryAny . a >=> \case
21232145
Right n -> logNote $ "messages saved: " <> tshow n
@@ -2130,9 +2152,8 @@ exportMessages tty st f drainMsgs = do
21302152
Nothing -> getJournalQueueMessages ms q
21312153
getMsgs :: MsgStoreClass s' => s' -> StoreQueue s' -> IO [Message]
21322154
getMsgs ms q = unsafeRunStore q "saveQueueMsgs" $ getQueueMessages_ drainMsgs q =<< getMsgQueue ms q False
2133-
saveQueueMsgs :: (StoreQueue s -> IO [Message]) -> Handle -> StoreQueue s -> IO (Sum Int)
2134-
saveQueueMsgs get h q = do
2135-
msgs <- get q
2155+
saveQueueMsgs :: Handle -> StoreQueue s -> [Message] -> IO (Sum Int)
2156+
saveQueueMsgs h q msgs = do
21362157
unless (null msgs) $ BLD.hPutBuilder h $ encodeMessages (recipientId q) msgs
21372158
pure $ Sum $ length msgs
21382159
encodeMessages rId = mconcat . map (\msg -> BLD.byteString (strEncode $ MLRv3 rId msg) <> BLD.char8 '\n')

tests/CoreTests/MsgStoreTests.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ msgStoreTests = do
5454
around (withMsgStore testSMTStoreConfig) $ describe "STM message store" someMsgStoreTests
5555
around (withMsgStore $ testJournalStoreCfg MQStoreCfg) $ describe "Journal message store" $ do
5656
someMsgStoreTests
57-
it "should export and import journal store" testExportImportStore
57+
fit "should export and import journal store" testExportImportStore
5858
describe "queue state" $ do
5959
it "should restore queue state from the last line" testQueueState
6060
it "should recover when message is written and state is not" testMessageState

0 commit comments

Comments
 (0)