// mqtt.tsx
import React, { createContext, useContext } from 'react';
import mqtt from 'mqtt';
import useMqttConnection, { MqttError, MqttStatus } from 'hooks/useMqttConnection';
import { useAppSelector } from 'store';
/** Shape of the value exposed through `MqttContext` and returned by `useMqtt`. */
interface MqttContextValue {
/** Client returned by `useMqttConnection`; `null` while no client has been created. */
mqttClient: mqtt.MqttClient | null;
/** Latest connection status reported by `useMqttConnection`. */
mqttStatus: MqttStatus;
/** Latest recorded error, or `null` when none has been recorded. */
mqttError: MqttError | null;
/** Subscribes the current client to `topic`; `ops` are the optional subscribe options. */
subscribeToTopic: (topic: string, ops?: mqtt.IClientSubscribeOptions) => void;
}
/**
 * Context carrying the MQTT client, its status and the subscribe helper.
 *
 * The default value is only seen by components rendered outside of `MqttProvider`.
 * It is a complete, type-correct "not connected" value: no client, status
 * 'Disconnected', no error, and a subscribe helper that does nothing. This replaces
 * the previous `{ mqttClient: {} }` default, which needed `@ts-ignore` and handed
 * consumers a truthy object without any client methods.
 */
const MqttContext = createContext<MqttContextValue>({
  mqttClient: null,
  mqttStatus: 'Disconnected',
  mqttError: null,
  subscribeToTopic: () => undefined,
});
/**
 * Provides the MQTT connection to the component tree.
 *
 * Reads the authentication flag from the store, hands it to `useMqttConnection`
 * and exposes the resulting client, status, error and a `subscribeToTopic`
 * helper through `MqttContext`.
 *
 * `subscribeToTopic` and the context value are memoized so that consumers only
 * re-render (and effects depending on `subscribeToTopic` only re-run) when the
 * client, status or error actually change.
 */
export const MqttProvider = ({ children }: React.PropsWithChildren) => {
  const isAuthenticated = useAppSelector(state => state.auth.authenticated);
  const { mqttClient, mqttStatus, mqttError, setMqttError, setMqttStatus } =
    useMqttConnection(isAuthenticated);

  // Subscribes to `topic` (QoS 1 unless `ops` says otherwise). Does nothing while
  // there is no client; a subscribe failure is reported through status and error.
  const subscribeToTopic = React.useCallback(
    (topic: string, ops: mqtt.IClientSubscribeOptions = { qos: 1 }) => {
      if (!mqttClient) return;
      mqttClient.subscribe(topic, ops, error => {
        if (error) {
          setMqttStatus('Error');
          setMqttError({
            type: 'MqttTopic',
            msg: error.message,
          });
        }
      });
    },
    [mqttClient, setMqttError, setMqttStatus],
  );

  // Keep the value identity stable between renders that change nothing.
  const contextValue = React.useMemo<MqttContextValue>(
    () => ({
      mqttClient,
      mqttStatus,
      mqttError,
      subscribeToTopic,
    }),
    [mqttClient, mqttStatus, mqttError, subscribeToTopic],
  );

  return <MqttContext.Provider value={contextValue}>{children}</MqttContext.Provider>;
};
/** Returns the current `MqttContext` value (client, status, error and subscribe helper). */
export const useMqtt = () => {
  const contextValue = useContext(MqttContext);
  return contextValue;
};
// useMqttConnection.ts
import { useState, useEffect, useRef, useCallback } from 'react';
import mqtt, { Timer } from 'mqtt';
import BackgroundTimer from 'react-native-background-timer';
import { instance } from 'api';
/**
 * Connection status tracked by `useMqttConnection`.
 * The hook sets 'Connected', 'Disconnected', 'Offline' and 'Reconnecting' from the
 * client's `connect`, `disconnect`/`close`, `offline` and `reconnect` events;
 * 'Error' is set by callers through the exposed `setMqttStatus`.
 */
export type MqttStatus = 'Connected' | 'Disconnected' | 'Offline' | 'Reconnecting' | 'Error';
/** Error record: `type` names the error category (e.g. 'MqttGeneral'), `msg` is the error message. */
export type MqttError = { type: string; msg: string };
/** Response body of `GET /app/wss`: the details used to open the MQTT websocket connection. */
interface WssDetails {
/** URL passed to `mqtt.connect` and returned from `transformWsUrl`. */
signedUrl: string;
/** Client id used for the MQTT connection. */
clientId: string;
// NOTE(review): presumably how long `signedUrl` stays valid, in seconds — not read in this file; confirm.
validitySeconds: number;
}
// Timer implementation passed to MQTT.js through the `timerVariant` connect option,
// backed by react-native-background-timer's interval functions.
// Don't assign BackgroundTimer's methods directly to the timer object (wrap them in
// arrow functions as below), otherwise this throws: Cannot set property 'NaN' of undefined
const timer: Timer = {
clear: id => BackgroundTimer.clearInterval(id),
// NOTE(review): the directive below presumably covers a signature mismatch between
// BackgroundTimer.setInterval and Timer's `set` — confirm before removing it.
// @ts-expect-error
set: (func, time) => BackgroundTimer.setInterval(func, time),
};
/** Requests the websocket connection details (`WssDetails`) from `/app/wss`. */
const getWssDetails = () => {
  const request = instance.get<WssDetails>('/app/wss');
  return request;
};
// Delay before retrying the WSS details request when no details have been fetched yet.
const WSS_DETAILS_RETRY_DELAY_MS = 25000;

/**
 * Manages the MQTT-over-websocket connection for an authenticated user.
 *
 * While `isAuthenticated` is true the hook fetches the signed websocket details,
 * opens an MQTT client with them and mirrors the client's events into
 * `mqttStatus` / `mqttError`. On every (re)connect attempt `transformWsUrl`
 * triggers a refresh of the details and hands the most recent ones to the client.
 *
 * When `isAuthenticated` turns false, or the component unmounts, the hook:
 * - cancels the pending retry timer and ignores any in-flight details request,
 * - drops the cached details so a later login never connects with the previous
 *   session's signed URL / client id,
 * - ends the client and clears it from state so consumers do not keep using an
 *   ended client.
 *
 * @param isAuthenticated whether a connection should be maintained
 * @returns the current client (or `null`), the status and error state, and the
 *          setters for status and error
 */
function useMqttConnection(isAuthenticated: boolean) {
  const [mqttStatus, setMqttStatus] = useState<MqttStatus>('Disconnected');
  const [mqttError, setMqttError] = useState<MqttError | null>(null);
  const [mqttClient, setMqttClient] = useState<mqtt.MqttClient | null>(null);
  const [wssDetails, setWssDetails] = useState<WssDetails | null>(null);

  // Mirror of `wssDetails` readable from callbacks that outlive a render.
  const wssDetailsRef = useRef<WssDetails | null>(wssDetails);
  const isFetchingWssDetails = useRef(false);
  // Handle of the pending retry timer, if any.
  const retryTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
  // Incremented whenever fetching is torn down; requests and timers started under
  // an older value are stale and must not touch state.
  const fetchSessionRef = useRef(0);

  wssDetailsRef.current = wssDetails;
  const hasWssDetails = wssDetails !== null;
  const doMqttConnection = isAuthenticated;

  const fetchWssDetails = useCallback(() => {
    const session = fetchSessionRef.current;
    isFetchingWssDetails.current = true;
    return getWssDetails()
      .then(r => {
        if (session !== fetchSessionRef.current) return;
        setWssDetails(r.data);
      })
      .catch(() => {
        if (session !== fetchSessionRef.current) return;
        // Never keep more than one retry timer alive.
        if (retryTimeoutRef.current !== null) {
          clearTimeout(retryTimeoutRef.current);
        }
        // in case we could not fetch initial details, retry every 25 seconds
        retryTimeoutRef.current = setTimeout(() => {
          retryTimeoutRef.current = null;
          if (session === fetchSessionRef.current && !wssDetailsRef.current) {
            fetchWssDetails();
          }
        }, WSS_DETAILS_RETRY_DELAY_MS);
      })
      .finally(() => {
        // A stale request must not clear the flag of a newer one.
        if (session === fetchSessionRef.current) {
          isFetchingWssDetails.current = false;
        }
      });
  }, []);

  useEffect(() => {
    if (!doMqttConnection) return undefined;
    fetchWssDetails();
    return () => {
      // Invalidate in-flight requests and pending retries of this session.
      fetchSessionRef.current += 1;
      isFetchingWssDetails.current = false;
      if (retryTimeoutRef.current !== null) {
        clearTimeout(retryTimeoutRef.current);
        retryTimeoutRef.current = null;
      }
      // Signed details belong to the session that fetched them.
      setWssDetails(null);
    };
  }, [doMqttConnection, fetchWssDetails]);

  useEffect(() => {
    const initialDetails = wssDetailsRef.current;
    if (!doMqttConnection || !hasWssDetails || !initialDetails) return undefined;

    const transformWsUrl = (
      url: string,
      options: mqtt.IClientOptions,
      currentClient: mqtt.MqttClient,
    ) => {
      // Refresh the details for the next attempt; this attempt uses the latest known ones.
      if (!isFetchingWssDetails.current) {
        fetchWssDetails();
      }
      const latestDetails = wssDetailsRef.current ?? initialDetails;
      currentClient.options.clientId = latestDetails.clientId;
      return latestDetails.signedUrl;
    };

    const client = mqtt
      .connect(initialDetails.signedUrl, {
        clientId: initialDetails.clientId,
        reconnectPeriod: 5000,
        queueQoSZero: true,
        resubscribe: true,
        clean: true,
        keepalive: 60,
        protocolVersion: 5,
        properties: {
          sessionExpiryInterval: 600,
        },
        timerVariant: timer,
        transformWsUrl,
      })
      .on('connect', () => {
        setMqttStatus('Connected');
      })
      .on('error', error => {
        setMqttError({ type: 'MqttGeneral', msg: error.message });
      })
      .on('disconnect', () => {
        setMqttStatus('Disconnected');
      })
      .on('offline', () => {
        setMqttStatus('Offline');
      })
      .on('reconnect', () => {
        setMqttStatus('Reconnecting');
      })
      .on('close', () => {
        setMqttStatus('Disconnected');
      });
    setMqttClient(client);

    return () => {
      client.end();
      // Do not leave the ended client in state; keep a newer client if one was already set.
      setMqttClient(current => (current === client ? null : current));
    };
  }, [doMqttConnection, fetchWssDetails, hasWssDetails]);

  return {
    mqttClient,
    mqttStatus,
    mqttError,
    setMqttStatus,
    setMqttError,
  };
}
export default useMqttConnection;
MQTTjs Version
5.8.0
Broker
AWS
Environment
React Native (engine: Hermes)
Description
After releasing a new version of our React Native app with the MQTT feature, we started receiving crashes in Firebase Crashlytics.

This is the only issue I found related to this problem:
nodejs/readable-stream#207
Code
Minimal Reproduction
Unfortunately, we cannot reproduce this locally; we only see the crashes recorded in Firebase Crashlytics.
Debug logs