WebSocket Connection & Reconnection with RxJS & Redux Observables

Connecting web socket

First, we create a WebSocket subject. When creating it, we pass the WebSocket server URL and two subjects: one for openObserver and one for closeObserver.

let webSocketSubject: WebSocketSubject<{}>;
let onOpenSubject = new Subject();
let onCloseSubject = new Subject();

// Create the websocket subject
const connectSocket = websocketUrl => {
  // Use fresh subjects for every connection attempt
  onOpenSubject = new Subject();
  onCloseSubject = new Subject();
  webSocketSubject = webSocket({
    url: websocketUrl,
    openObserver: onOpenSubject,
    closeObserver: onCloseSubject
  });
  return webSocketSubject;
};

// Connect to web socket
const connectEpic = (action$, state$: StateObservable<IState>) =>
  action$
    .ofType(WebsocketActionTypes.CONNECT)
    .switchMap((action: IConnectWebsocketAction) =>
      connectSocket(action.payload.websocketUrl)
    );
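
The epics in this post rely on action types and action creators imported from '../actions/websocketActions', which the post never shows. Below is a minimal sketch of what that module might look like. The names come from the imports in the full implementation, but the type strings, the CONNECTED/DISCONNECTED member names, and the default value of retry are assumptions made for illustration.

```typescript
// Hypothetical sketch of '../actions/websocketActions'.
// In a real module each of these would be exported.
enum WebsocketActionTypes {
  CONNECT = 'WEBSOCKET_CONNECT',
  CONNECTED = 'WEBSOCKET_CONNECTED', // assumed member name
  DISCONNECT = 'WEBSOCKET_DISCONNECT',
  DISCONNECTED = 'WEBSOCKET_DISCONNECTED', // assumed member name
  SEND_MESSAGE = 'WEBSOCKET_SEND_MESSAGE'
}

interface IConnectWebsocketAction {
  type: WebsocketActionTypes.CONNECT;
  payload: { websocketUrl: string };
}

interface IDisconnectFromWebsocketAction {
  type: WebsocketActionTypes.DISCONNECT;
  payload: { retry: boolean };
}

// Ask the epics to open a socket to the given URL
const connect = (websocketUrl: string): IConnectWebsocketAction => ({
  type: WebsocketActionTypes.CONNECT,
  payload: { websocketUrl }
});

// Ask the epics to close the socket; retry tells them whether to reconnect
const disconnect = (retry: boolean = false): IDisconnectFromWebsocketAction => ({
  type: WebsocketActionTypes.DISCONNECT,
  payload: { retry }
});

// Plain notifications for the reducers
const connected = () => ({ type: WebsocketActionTypes.CONNECTED });
const disconnected = () => ({ type: WebsocketActionTypes.DISCONNECTED });
```

With creators shaped like this, a component would dispatch connect(url) on mount and disconnect() on unmount, while the epics themselves dispatch disconnect(true) when the connection fails.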

Receiving Messages

webSocketSubject will emit whenever it receives a new message from the server. We can simply map the emitted data to our Redux actions and update the store based on the data.

We can update our connectEpic to do it.

const connectEpic = (action$, state$: StateObservable<IState>) =>
  action$
    .ofType(WebsocketActionTypes.CONNECT)
    .switchMap((action: IConnectWebsocketAction) =>
      connectSocket(state$.value.config.webSocketURL)
        .map(data => receiveMessageFromWebSocket(data)) // <-- Map and fire receive action
    );

Sending messages

Sending a message is simple: we just call the next method on the webSocketSubject.

const sendMessageEpic = action$ =>
  action$.ofType(WebsocketActionTypes.SEND_MESSAGE).map(action => {
    webSocketSubject.next(action.payload);
    return sentMessage();
  });

Disconnection

You might want to close the connection when your component unmounts. To do that, call the complete method on the webSocketSubject, which closes the underlying WebSocket connection.

const disconnectEpic = action$ =>
  action$.ofType(WebsocketActionTypes.DISCONNECT).map(() => {
    // Complete the close subject first so an intentional close is not reported as a lost connection
    onCloseSubject.complete();
    webSocketSubject.complete();
    return disconnected();
  });

Later on, the disconnect action will carry a boolean flag that tells the epic whether it should attempt to reconnect.

Detecting Successful Connection

The openObserver subject emits when the connection is successfully established. We can listen to it and fire an action to update our Redux store.

// Detect successful connection by listening to onOpenSubject
const connectedEpic = action$ =>
  action$.ofType(WebsocketActionTypes.CONNECT).switchMap(() =>
    onOpenSubject.map(() => {
      return connected();
    })
  );
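
To make "update our Redux store" concrete, here is a minimal sketch of a reducer that tracks the connection status. The state shape, the reducer name and the action type strings are assumptions for illustration; only the connected and disconnected action creators are named in the post.

```typescript
// Assumption: these type strings are illustrative; the post does not show them.
const CONNECT = 'WEBSOCKET_CONNECT';
const CONNECTED = 'WEBSOCKET_CONNECTED';
const DISCONNECTED = 'WEBSOCKET_DISCONNECTED';

type ConnectionStatus = 'disconnected' | 'connecting' | 'connected';

interface IWebsocketState {
  status: ConnectionStatus;
}

const initialState: IWebsocketState = { status: 'disconnected' };

// Hypothetical reducer: derives the connection status from the actions the epics emit
const websocketReducer = (
  state: IWebsocketState = initialState,
  action: { type: string }
): IWebsocketState => {
  switch (action.type) {
    case CONNECT:
      return { ...state, status: 'connecting' };
    case CONNECTED:
      return { ...state, status: 'connected' };
    case DISCONNECTED:
      return { ...state, status: 'disconnected' };
    default:
      // Unrelated actions leave the state untouched
      return state;
  }
};
```

A component can then read status from the store to show a "reconnecting" banner or to disable the send button while the socket is down.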

Detecting Connection Failure

There are two types of connection failures that we need to handle.

  1. Unable to establish the connection.
  2. Connection lost after establishing due to connectivity issues, WebSocket server crashed, etc.

We can use the catch operator on the webSocketSubject to detect the first case. For the second case, we can listen to the closeObserver subject that we provided when creating the WebSocket subject.

We can update our connectEpic and connectedEpic like below to detect failures.

const connectEpic = (action$, state$: StateObservable<IState>) =>
  action$
    .ofType(WebsocketActionTypes.CONNECT)
    .switchMap((action: IConnectWebsocketAction) =>
      connectSocket(state$.value.config.webSocketURL)
        .map(data => receiveMessageFromWebSocket(data))
        .catch(e => of(disconnect(true))) // <-- Handle 1st case by using catch operator
    );

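const connectedEpic = action$ =>
  action$.ofType(WebsocketActionTypes.CONNECT).switchMap(() =>
    // Return the close stream from switchMap so that it actually gets subscribed
    onOpenSubject.switchMap(() =>
      onCloseSubject
        .map(() => disconnect(true)) // <-- Handle 2nd case by listening to onCloseSubject
        .startWith(connected()) // Fire connected() as soon as the socket opens
    )
  );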
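
The epics above always pass true as the retry flag. The closeObserver subject receives the browser's CloseEvent, so the flag could instead be derived from it. The helper below is a sketch of that idea, not part of the original solution: shouldRetry and ICloseInfo are hypothetical names, and treating only a clean close with status code 1000 (the "normal closure" code from RFC 6455) as final is an assumption you may want to change.

```typescript
// Minimal shape of the fields we read from a browser CloseEvent
interface ICloseInfo {
  wasClean: boolean;
  code: number;
}

// 1000 is the "normal closure" status code defined by the WebSocket protocol
const NORMAL_CLOSURE = 1000;

// Hypothetical helper: retry unless the socket was closed cleanly and normally
const shouldRetry = (event: ICloseInfo): boolean =>
  !(event.wasClean && event.code === NORMAL_CLOSURE);
```

Under these assumptions, the close handler could map each event to disconnect(shouldRetry(event)) instead of disconnect(true).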

Retrying connection

We need to reconnect irrespective of the reason for failure. To do that, we dispatch the connect action again whenever we get disconnected. We can optionally delay the reconnect attempt using the delay operator.

We will update our disconnectEpic to perform the reconnect based on the boolean flag carried by the disconnect action.

const disconnectEpic = (action$, state$: StateObservable<IState>) =>
  action$
    .ofType(WebsocketActionTypes.DISCONNECT)
    .mergeMap((action: IDisconnectFromWebsocketAction) => {
      if (action.payload.retry) {
        return of(connect(state$.value.config.webSocketURL)) // <-- reconnect
          .delay(5000)
          .startWith(disconnected());
      }
      onCloseSubject.complete();
      webSocketSubject.complete();
      return [disconnected()];
    });
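
The epic above waits a fixed 5000 ms before every reconnect attempt. If the server stays down for a while, a growing delay is often kinder to it. The helper below is a sketch of a capped exponential backoff; reconnectDelay, baseMs and maxMs are hypothetical names and defaults, and the attempt counter would have to be tracked somewhere (for example in the store, reset when connected() fires), which the original code does not do.

```typescript
// Hypothetical helper: delay in ms before the nth reconnect attempt (attempt starts at 0).
// Doubles on every attempt and never exceeds maxMs.
const reconnectDelay = (
  attempt: number,
  baseMs: number = 1000,
  maxMs: number = 30000
): number => {
  const safeAttempt = Math.max(0, Math.floor(attempt));
  return Math.min(maxMs, baseMs * Math.pow(2, safeAttempt));
};
```

With an attempt counter available, .delay(5000) could become .delay(reconnectDelay(attempt)).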

Full Implementation

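// The chained operator style used below (.switchMap, .map, .catch, ...) is the RxJS 5 API;
// on RxJS 6 it is only available through the rxjs-compat package.
import { combineEpics, StateObservable } from 'redux-observable';
import { of, Subject } from 'rxjs';
import { webSocket, WebSocketSubject } from 'rxjs/webSocket';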
import {
  connected,
  connect,
  disconnected,
  disconnect,
  IConnectWebsocketAction,
  receiveMessageFromWebSocket,
  sentMessage,
  WebsocketActionTypes,
  IDisconnectFromWebsocketAction
} from '../actions/websocketActions';
import { IState } from '../reducers';

let webSocketSubject: WebSocketSubject<{}>;
let onOpenSubject = new Subject();
let onCloseSubject = new Subject();

const connectSocket = websocketUrl => {
  onOpenSubject = new Subject();
  onCloseSubject = new Subject();
  webSocketSubject = webSocket({
    url: websocketUrl,
    openObserver: onOpenSubject,
    closeObserver: onCloseSubject
  });
  return webSocketSubject;
};

const connectEpic = (action$, state$: StateObservable<IState>) =>
  action$
    .ofType(WebsocketActionTypes.CONNECT)
    .switchMap((action: IConnectWebsocketAction) =>
      connectSocket(state$.value.config.webSocketURL)
        .map(data => receiveMessageFromWebSocket(data))
        .catch(e => of(disconnect(true)))
    );

const connectedEpic = action$ =>
  action$.ofType(WebsocketActionTypes.CONNECT).switchMap(() =>
    // Return the close stream from switchMap so that it actually gets subscribed
    onOpenSubject.switchMap(() =>
      onCloseSubject
        .map(() => disconnect(true))
        .startWith(connected())
    )
  );

const sendMessageEpic = action$ =>
  action$.ofType(WebsocketActionTypes.SEND_MESSAGE).map(action => {
    webSocketSubject.next(action.payload);
    return sentMessage();
  });

const disconnectEpic = (action$, state$: StateObservable<IState>) =>
  action$
    .ofType(WebsocketActionTypes.DISCONNECT)
    .mergeMap((action: IDisconnectFromWebsocketAction) => {
      if (action.payload.retry) {
        return of(connect(state$.value.config.webSocketURL))
          .delay(5000)
          .startWith(disconnected());
      }
      onCloseSubject.complete();
      webSocketSubject.complete();
      return [disconnected()];
    });

export default combineEpics(
  connectEpic,
  connectedEpic,
  sendMessageEpic,
  disconnectEpic
);

If you have a better solution, especially one that uses retryWhen and takeUntil, please do share it.
