@@ -126,7 +126,7 @@ import System.Environment (lookupEnv)
126126import System.Exit (exitFailure , exitSuccess )
127127import System.IO (hPrint , hPutStrLn , hSetNewlineMode , universalNewlineMode )
128128import System.Mem.Weak (deRefWeak )
129- import UnliftIO (timeout )
129+ import UnliftIO (mapConcurrently , timeout )
130130import UnliftIO.Concurrent
131131import UnliftIO.Directory (doesFileExist , renameFile )
132132import UnliftIO.Exception
@@ -2114,10 +2114,32 @@ exportMessages :: forall s. MsgStoreClass s => Bool -> MsgStore s -> FilePath ->
21142114exportMessages tty st f drainMsgs = do
21152115 logNote $ " saving messages to file " <> T. pack f
21162116 run $ case st of
2117- StoreMemory ms -> exportMessages_ ms $ getMsgs ms
2118- StoreJournal ms -> exportMessages_ ms $ getJournalMsgs ms
2117+ StoreMemory ms -> exportMessages_ getMsgs ms
2118+ StoreJournal ms -> exportMessages_' getJournalMsgs ms
21192119 where
2120- exportMessages_ ms get = fmap (\ (Sum n) -> n) . unsafeWithAllMsgQueues tty ms . saveQueueMsgs get
2120+ exportMessages_ get ms h =
2121+ fmap (\ (Sum n) -> n) $ unsafeWithAllMsgQueues tty ms $ \ q ->
2122+ get ms q >>= saveQueueMsgs h q
2123+ exportMessages_' get ms h = do
2124+ qv <- newIORef []
2125+ Sum n <- unsafeWithAllMsgQueues tty ms $ \ q -> do
2126+ qs <- (q : ) <$> readIORef qv
2127+ if length qs < 8
2128+ then writeIORef qv qs $> Sum 0
2129+ else do
2130+ writeIORef qv []
2131+ saveQueues qs
2132+ qs <- readIORef qv
2133+ if null qs
2134+ then pure n
2135+ else do
2136+ Sum n' <- saveQueues qs
2137+ pure $ n + n'
2138+ where
2139+ saveQueues qs = do
2140+ let qs' = reverse qs
2141+ qMsgs <- zip qs' <$> mapConcurrently (get ms) qs'
2142+ mconcat <$> mapM (uncurry $ saveQueueMsgs h) qMsgs
21212143 run :: (Handle -> IO Int ) -> IO ()
21222144 run a = liftIO $ withFile f WriteMode $ tryAny . a >=> \ case
21232145 Right n -> logNote $ " messages saved: " <> tshow n
@@ -2130,9 +2152,8 @@ exportMessages tty st f drainMsgs = do
21302152 Nothing -> getJournalQueueMessages ms q
21312153 getMsgs :: MsgStoreClass s' => s' -> StoreQueue s' -> IO [Message ]
21322154 getMsgs ms q = unsafeRunStore q " saveQueueMsgs" $ getQueueMessages_ drainMsgs q =<< getMsgQueue ms q False
2133- saveQueueMsgs :: (StoreQueue s -> IO [Message ]) -> Handle -> StoreQueue s -> IO (Sum Int )
2134- saveQueueMsgs get h q = do
2135- msgs <- get q
2155+ saveQueueMsgs :: Handle -> StoreQueue s -> [Message ] -> IO (Sum Int )
2156+ saveQueueMsgs h q msgs = do
21362157 unless (null msgs) $ BLD. hPutBuilder h $ encodeMessages (recipientId q) msgs
21372158 pure $ Sum $ length msgs
21382159 encodeMessages rId = mconcat . map (\ msg -> BLD. byteString (strEncode $ MLRv3 rId msg) <> BLD. char8 ' \n ' )
0 commit comments