@@ -13,7 +13,7 @@ import Bilge
1313import Bilge.Assert
1414import Control.Arrow ((&&&) )
1515import Control.Concurrent.Async (Async , async , wait , forConcurrently_ )
16- import Control.Lens ((.~) , (^.) , (^?) , view , (<&>) )
16+ import Control.Lens ((.~) , (^.) , (^?) , view , (<&>) , _2 , (%~) )
1717import Control.Retry (retrying , constantDelay , limitRetries )
1818import Data.Aeson hiding (json )
1919import Data.Aeson.Lens
@@ -112,6 +112,10 @@ tests s = testGroup "Gundeck integration tests" [
112112 [ test s " register a push token" $ testRegisterPushToken
113113 , test s " unregister a push token" $ testUnregisterPushToken
114114 ],
115+ testGroup " Websocket pingpong"
116+ [ test s " pings produce pongs" $ testPingPong
117+ , test s " non-pings are ignored" $ testNoPingNoPong
118+ ],
115119 -- TODO: The following tests require (at the moment), the usage real AWS
116120 -- services so they are kept in a separate group to simplify testing
117121 testGroup " RealAWS"
@@ -711,6 +715,28 @@ testUnregisterPushToken g _ b _ = do
711715 void $ retryWhileN 12 (not . null ) (listPushTokens uid g)
712716 unregisterPushToken uid (tkn^. token) g !!! const 404 === statusCode
713717
718+ testPingPong :: TestSignature ()
719+ testPingPong gu ca _ _ = do
720+ uid :: UserId <- randomId
721+ connid :: ConnId <- randomConnId
722+ [(_, [(chread, chwrite)] :: [(TChan ByteString , TChan ByteString )])]
723+ <- connectUsersAndDevicesWithSendingClients gu ca [(uid, [connid])]
724+ liftIO $ do
725+ atomically $ writeTChan chwrite " ping"
726+ msg <- waitForMessage chread
727+ assertBool " no pong" $ msg == Just " pong"
728+
729+ testNoPingNoPong :: TestSignature ()
730+ testNoPingNoPong gu ca _ _ = do
731+ uid :: UserId <- randomId
732+ connid :: ConnId <- randomConnId
733+ [(_, [(chread, chwrite)] :: [(TChan ByteString , TChan ByteString )])]
734+ <- connectUsersAndDevicesWithSendingClients gu ca [(uid, [connid])]
735+ liftIO $ do
736+ atomically $ writeTChan chwrite " Wire is so much nicer with internet!"
737+ msg <- waitForMessage chread
738+ assertBool " unexpected response on non-ping" $ isNothing msg
739+
714740testSharePushToken :: TestSignature ()
715741testSharePushToken g _ b _ = do
716742 gcmTok <- Token . T. decodeUtf8 . toByteString' <$> randomId
@@ -808,13 +834,25 @@ connectUser gu ca uid con = do
808834 [(_, [ch])] <- connectUsersAndDevices gu ca [(uid, [con])]
809835 return ch
810836
811- connectUsersAndDevices :: HasCallStack => Gundeck -> Cannon -> [(UserId , [ConnId ])] -> Http [(UserId , [TChan ByteString ])]
812- connectUsersAndDevices gu ca uidsAndConnIds = do
837+ connectUsersAndDevices
838+ :: HasCallStack
839+ => Gundeck -> Cannon -> [(UserId , [ConnId ])]
840+ -> Http [(UserId , [TChan ByteString ])]
841+ connectUsersAndDevices gu ca uidsAndConnIds =
842+ strip <$> connectUsersAndDevicesWithSendingClients gu ca uidsAndConnIds
843+ where strip = fmap (_2 %~ fmap fst )
844+
845+ connectUsersAndDevicesWithSendingClients
846+ :: HasCallStack
847+ => Gundeck -> Cannon -> [(UserId , [ConnId ])]
848+ -> Http [(UserId , [(TChan ByteString , TChan ByteString )])]
849+ connectUsersAndDevicesWithSendingClients gu ca uidsAndConnIds = do
813850 chs <- forM uidsAndConnIds $ \ (uid, conns) -> (uid,) <$> do
814851 forM conns $ \ conn -> do
815- ch <- liftIO $ atomically newTChan
816- _ <- wsRun ca uid conn (wsReader ch)
817- pure ch
852+ chread <- liftIO $ atomically newTChan
853+ chwrite <- liftIO $ atomically newTChan
854+ _ <- wsRun ca uid conn (wsReaderWriter chread chwrite)
855+ pure (chread, chwrite)
818856 (\ (uid, conns) -> wsAssertPresences gu uid (length conns)) `mapM_` uidsAndConnIds
819857 pure chs
820858
@@ -841,8 +879,13 @@ wsAssertPresences gu uid numPres = do
841879wsCloser :: MVar () -> WS. ClientApp ()
842880wsCloser m conn = takeMVar m >> WS. sendClose conn C. empty >> putMVar m ()
843881
844- wsReader :: TChan ByteString -> WS. ClientApp ()
845- wsReader ch conn = forever $ WS. receiveData conn >>= atomically . writeTChan ch
882+ wsReaderWriter :: TChan ByteString -> TChan ByteString -> WS. ClientApp ()
883+ wsReaderWriter chread chwrite conn = do
884+ h1 <- async . forever $ do
885+ WS. receiveData conn >>= atomically . writeTChan chread
886+ h2 <- async . forever $ do
887+ WS. sendTextData conn =<< atomically (readTChan chwrite)
888+ wait h1 >> wait h2 -- one way of saying "don't die!"
846889
847890retryWhile :: (MonadIO m ) => (a -> Bool ) -> m a -> m a
848891retryWhile = retryWhileN 10
0 commit comments