WebSocket Connection & Reconnection with RxJS & Redux Observables
Connecting web socket
First, we need to create a WebSocket subject. When creating it, we provide the WebSocket server URL and two subjects: one for openObserver and the other for closeObserver.
let webSocketSubject: WebSocketSubject<{}>;
let onOpenSubject = new Subject();
let onCloseSubject = new Subject();

// Create the websocket subject (fresh open/close subjects for every connection)
const connectSocket = websocketUrl => {
  onOpenSubject = new Subject();
  onCloseSubject = new Subject();
  webSocketSubject = webSocket({
    url: websocketUrl,
    openObserver: onOpenSubject,
    closeObserver: onCloseSubject
  });
  return webSocketSubject;
};

// Connect to web socket
const connectEpic = (action$, state$: StateObservable<IState>) =>
  action$
    .ofType(WebsocketActionTypes.CONNECT)
    .switchMap((action: IConnectWebsocketAction) =>
      connectSocket(action.payload.websocketUrl)
    );
Receiving Messages
webSocketSubject emits whenever it receives a new message from the server. We can map the emitted data to our Redux actions and update the store based on that data. We can update our connectEpic to do so.
const connectEpic = (action$, state$: StateObservable<IState>) =>
  action$
    .ofType(WebsocketActionTypes.CONNECT)
    .switchMap((action: IConnectWebsocketAction) =>
      connectSocket(state$.value.config.webSocketURL)
        .map(data => receiveMessageFromWebSocket(data)) // <-- Map and fire receive action
    );
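By default, the RxJS webSocket subject parses each incoming frame with JSON.parse, so the data reaching the map above is untyped. A minimal sketch of narrowing it before it becomes an action could look like this. The wire format, the ServerMessage type, the helper names, and the action type strings are all assumptions for illustration; the post does not describe the server's messages.

```typescript
// Hypothetical wire format: the post does not describe the server's messages.
interface ServerMessage {
  kind: string;
  body: unknown;
}

// Type guard: true only for non-null objects that carry a string `kind` field.
const isServerMessage = (data: unknown): data is ServerMessage =>
  typeof data === 'object' &&
  data !== null &&
  typeof (data as { kind?: unknown }).kind === 'string';

// Stand-in for the post's receiveMessageFromWebSocket action creator.
const receiveMessageFromWebSocket = (message: ServerMessage) => ({
  type: 'websocket/RECEIVE_MESSAGE' as const,
  payload: message
});

// Hypothetical action for frames that do not match the expected shape.
const receivedInvalidMessage = (raw: unknown) => ({
  type: 'websocket/RECEIVE_INVALID' as const,
  payload: raw
});

// Map one parsed frame to exactly one Redux action.
const toAction = (data: unknown) =>
  isServerMessage(data)
    ? receiveMessageFromWebSocket(data)
    : receivedInvalidMessage(data);
```

With a helper like this, the epic's mapping step could become .map(toAction), and malformed frames would show up in the store instead of being passed to reducers unchecked.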
Sending messages
Sending a message is simple: we just call the next method on the webSocketSubject.
const sendMessageEpic = action$ =>
  action$.ofType(WebsocketActionTypes.SEND_MESSAGE).map(action => {
    webSocketSubject.next(action.payload);
    return sentMessage();
  });
Disconnection
You might want to close the connection when your component unmounts. To do that, we can call the complete method on the webSocketSubject, which closes the underlying WebSocket connection.
const disconnectEpic = action$ =>
  action$.ofType(WebsocketActionTypes.DISCONNECT).map(() => {
    // Complete the close subject first so this manual close is not reported as a failure
    onCloseSubject.complete();
    webSocketSubject.complete();
    return disconnected();
  });
Later on, the disconnect action will carry a boolean flag that tells this epic whether it should attempt to reconnect.
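The action creators are imported from '../actions/websocketActions' but never shown in this post. A minimal sketch of what the connect and disconnect creators might look like follows. The payload field names (websocketUrl, retry) are the ones the epics read; the action type strings, the CONNECTED and DISCONNECTED members, and the default value for retry are assumptions.

```typescript
// Hypothetical type strings; the post only shows the member names
// CONNECT, DISCONNECT and SEND_MESSAGE.
const WebsocketActionTypes = {
  CONNECT: 'websocket/CONNECT',
  CONNECTED: 'websocket/CONNECTED',
  DISCONNECT: 'websocket/DISCONNECT',
  DISCONNECTED: 'websocket/DISCONNECTED',
  SEND_MESSAGE: 'websocket/SEND_MESSAGE'
} as const;

interface IConnectWebsocketAction {
  type: typeof WebsocketActionTypes.CONNECT;
  payload: { websocketUrl: string };
}

interface IDisconnectFromWebsocketAction {
  type: typeof WebsocketActionTypes.DISCONNECT;
  payload: { retry: boolean };
}

// Ask the epics to open a connection to the given URL.
const connect = (websocketUrl: string): IConnectWebsocketAction => ({
  type: WebsocketActionTypes.CONNECT,
  payload: { websocketUrl }
});

// Ask the epics to disconnect; `retry` says whether to reconnect afterwards.
// Defaulting to false (an assumption) makes a plain disconnect() a final close.
const disconnect = (retry: boolean = false): IDisconnectFromWebsocketAction => ({
  type: WebsocketActionTypes.DISCONNECT,
  payload: { retry }
});
```

A component would then dispatch disconnect() on unmount, while the failure handlers dispatch disconnect(true).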
Detecting Successful Connection
The openObserver subject emits when the connection is successfully established. We can listen to it and fire an action to update our Redux store.
// Detect successful connection by listening to onOpenSubject
const connectedEpic = action$ =>
  action$.ofType(WebsocketActionTypes.CONNECT).switchMap(() =>
    onOpenSubject.map(() => {
      return connected();
    })
  );
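The reducer that consumes these actions is not part of this post. As a rough sketch, a reducer that tracks the connection status might look like the following. The state shape, the status values, and the action type strings are assumptions made for illustration.

```typescript
// Hypothetical state shape for the websocket slice of the store.
type ConnectionStatus = 'disconnected' | 'connecting' | 'connected';

interface IWebsocketState {
  connection: ConnectionStatus;
}

// Hypothetical type strings for the connect, connected and disconnected actions.
type WebsocketStatusAction =
  | { type: 'websocket/CONNECT' }
  | { type: 'websocket/CONNECTED' }
  | { type: 'websocket/DISCONNECTED' };

const initialWebsocketState: IWebsocketState = { connection: 'disconnected' };

// Pure reducer: each lifecycle action moves the status one step.
const websocketReducer = (
  state: IWebsocketState = initialWebsocketState,
  action: WebsocketStatusAction
): IWebsocketState => {
  switch (action.type) {
    case 'websocket/CONNECT':
      return { ...state, connection: 'connecting' };
    case 'websocket/CONNECTED':
      return { ...state, connection: 'connected' };
    case 'websocket/DISCONNECTED':
      return { ...state, connection: 'disconnected' };
    default:
      return state;
  }
};
```

A component can then read the connection field to show a "connecting" or "offline" indicator.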
Detecting Connection Failure
There are two types of connection failures that we need to handle.
- Unable to establish the connection.
- Connection lost after establishing due to connectivity issues, WebSocket server crashed, etc.
We can use the catch operator on the webSocketSubject to detect the first case. For the second case, we can listen to the closeObserver subject that we provided when creating the WebSocket subject. We can update our connectEpic and connectedEpic as shown below to detect both kinds of failure.
const connectEpic = (action$, state$: StateObservable<IState>) =>
  action$
    .ofType(WebsocketActionTypes.CONNECT)
    .switchMap((action: IConnectWebsocketAction) =>
      connectSocket(state$.value.config.webSocketURL)
        .map(data => receiveMessageFromWebSocket(data))
        .catch(e => of(disconnect(true))) // <-- Handle 1st case by using catch operator
    );

// merge is imported from 'rxjs'. Both streams must be returned from the epic:
// an observable created inside a map callback is never subscribed to.
const connectedEpic = action$ =>
  action$.ofType(WebsocketActionTypes.CONNECT).switchMap(() =>
    merge(
      onOpenSubject.map(() => connected()),
      onCloseSubject.map(() => disconnect(true)) // <-- Handle 2nd case by listening to onCloseSubject
    )
  );
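The closeObserver receives the browser's CloseEvent, so we could also inspect it before deciding whether to retry. The sketch below is one possible policy, not something the code above does: it skips the reconnect only for a clean close with status code 1000 (normal closure in the WebSocket protocol). The CloseInfo interface and the shouldReconnect helper are hypothetical names.

```typescript
// Subset of the DOM CloseEvent fields that this sketch reads.
interface CloseInfo {
  code: number;
  wasClean: boolean;
}

// 1000 is the "normal closure" status code defined by the WebSocket protocol.
const NORMAL_CLOSURE = 1000;

// Assumed policy: retry after anything other than a clean, normal closure.
const shouldReconnect = (info: CloseInfo): boolean =>
  !(info.wasClean && info.code === NORMAL_CLOSURE);
```

The close handler could then map each event to disconnect(shouldReconnect(event)) instead of always passing true.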
Retrying connection
We need to reconnect regardless of the reason for the failure. To do that, we dispatch the connect action again whenever we get disconnected. We can optionally delay the reconnect attempt using the delay operator. We will update our disconnectEpic to perform the reconnect based on the boolean flag that we passed.
const disconnectEpic = (action$, state$: StateObservable<IState>) =>
  action$
    .ofType(WebsocketActionTypes.DISCONNECT)
    .mergeMap((action: IDisconnectFromWebsocketAction) => {
      if (action.payload.retry) {
        return of(connect(state$.value.config.webSocketURL)) // <-- reconnect
          .delay(5000)
          .startWith(disconnected());
      }
      onCloseSubject.complete();
      webSocketSubject.complete();
      return [disconnected()];
    });
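The epic above waits a fixed 5000 ms before every attempt. If the server stays down for a while, we may prefer to space the attempts out. The following is a sketch of a capped exponential backoff, which is a variation and not what the code above does. The reconnectDelayMs helper, its default values, and the idea of tracking an attempt counter are all assumptions.

```typescript
// Delay before reconnect attempt number `attempt` (0-based):
// baseMs, 2 * baseMs, 4 * baseMs, ... capped at maxMs.
const reconnectDelayMs = (
  attempt: number,
  baseMs: number = 1000,
  maxMs: number = 30000
): number => Math.min(maxMs, baseMs * Math.pow(2, Math.max(0, attempt)));
```

To use it, the attempt count would have to live somewhere, for example in the store, incremented on each retry and reset when the connected action fires. The value could then be passed to delay in place of 5000.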
Full Implementation
import { combineEpics, StateObservable } from 'redux-observable';
import { merge, of, Subject } from 'rxjs';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';
import {
  connected,
  connect,
  disconnected,
  disconnect,
  IConnectWebsocketAction,
  receiveMessageFromWebSocket,
  sentMessage,
  WebsocketActionTypes,
  IDisconnectFromWebsocketAction
} from '../actions/websocketActions';
import { IState } from '../reducers';

let webSocketSubject: WebSocketSubject<{}>;
let onOpenSubject = new Subject();
let onCloseSubject = new Subject();

// Create the websocket subject (fresh open/close subjects for every connection)
const connectSocket = websocketUrl => {
  onOpenSubject = new Subject();
  onCloseSubject = new Subject();
  webSocketSubject = webSocket({
    url: websocketUrl,
    openObserver: onOpenSubject,
    closeObserver: onCloseSubject
  });
  return webSocketSubject;
};

// Connect, map incoming messages to actions, and retry if the socket errors
const connectEpic = (action$, state$: StateObservable<IState>) =>
  action$
    .ofType(WebsocketActionTypes.CONNECT)
    .switchMap((action: IConnectWebsocketAction) =>
      connectSocket(state$.value.config.webSocketURL)
        .map(data => receiveMessageFromWebSocket(data))
        .catch(e => of(disconnect(true)))
    );

// Report a successful connection, and request a retry when the socket closes
const connectedEpic = action$ =>
  action$.ofType(WebsocketActionTypes.CONNECT).switchMap(() =>
    merge(
      onOpenSubject.map(() => connected()),
      onCloseSubject.map(() => disconnect(true))
    )
  );

const sendMessageEpic = action$ =>
  action$.ofType(WebsocketActionTypes.SEND_MESSAGE).map(action => {
    webSocketSubject.next(action.payload);
    return sentMessage();
  });

// Reconnect after a delay when retry is set, otherwise close the socket for good
const disconnectEpic = (action$, state$: StateObservable<IState>) =>
  action$
    .ofType(WebsocketActionTypes.DISCONNECT)
    .mergeMap((action: IDisconnectFromWebsocketAction) => {
      if (action.payload.retry) {
        return of(connect(state$.value.config.webSocketURL))
          .delay(5000)
          .startWith(disconnected());
      }
      onCloseSubject.complete();
      webSocketSubject.complete();
      return [disconnected()];
    });

export default combineEpics(
  connectEpic,
  connectedEpic,
  sendMessageEpic,
  disconnectEpic
);
If you have a better solution, especially one that uses retryWhen and takeUntil, please do share it.