WebSocketSubject

WebSocket

The WebSocket protocol enables a single connection with full-duplex communication. WebSocket connections are ideal for streaming data in real-time.

Some example use cases for a WebSocket may include:

  • Chat application
  • Financial application
  • News application
  • Sports game application
  • Game application

At this point you may be thinking:

WebSockets + RxJS sound like a perfect match

RxJS is indeed well suited for streaming data from a WebSocket into an application.

webSocket() Function

To create a new WebSocketSubject we will use the webSocket() function. The function signature is as follows:

webSocket<T>(urlConfigOrSource: string | WebSocketSubjectConfig<T>): WebSocketSubject<T>

Let's break this down:

  • First, we can specify the generic type T for the next notification values.
  • The function accepts either the string URL for connecting to a WebSocket or a WebSocketSubjectConfig object.
  • Finally, the function returns an instance of the WebSocketSubject.

WebSocketSubject

The WebSocketSubject enables us to communicate with the WebSocket:

  • An attempt to open a connection occurs upon subscribing to the WebSocketSubject unless a connection is already open. Additional Observers to the WebSocketSubject instance will result in a multicast of notifications from the single connection.
  • The connection is closed when the number of Observers goes from 1 to 0.
  • All Observers receive a next notification for messages received from the server. By default, messages are deserialized via JSON.parse().
  • All Observers receive a complete notification when the connection is closed.
  • All Observers receive an error notification when an error occurs.
  • Emitting a next notification using the next() method on the WebSocketSubject instance sends a message to the server.
  • Emitting a complete notification using the complete() method on the WebSocketSubject instance will close the connection.
  • Emitting an error notification using the error() method on the WebSocketSubject instance will close the connection and send the status code to the server. The error value must be an object with a numeric code property and, optionally, a reason.
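
As a hedged illustration of the last point, the sketch below builds the kind of object that error() expects: a numeric close code plus an optional reason. Browsers only accept 1000 or a code in the 3000-4999 range from application code. The closeInfo helper and the 4000 code are hypothetical names chosen for illustration and are not part of RxJS.

```typescript
/** Shape that WebSocketSubject.error() expects: a close code and an optional reason. */
interface CloseInfo {
  code: number;
  reason?: string;
}

/**
 * Hypothetical helper (not part of RxJS) that validates a client-side close code.
 * Browsers accept only 1000 or a code in the range 3000-4999 from application code.
 */
function closeInfo(code: number, reason?: string): CloseInfo {
  const allowed = code === 1000 || (code >= 3000 && code <= 4999);
  if (!allowed) {
    throw new RangeError(`Invalid client close code: ${code}`);
  }
  return reason === undefined ? { code } : { code, reason };
}

// Assumed usage with a WebSocketSubject instance named webSocketSubject:
// webSocketSubject.error(closeInfo(4000, 'client shutting down'));
```

Validating the code before calling error() keeps an invalid value from surfacing later as an exception thrown by the underlying socket's close() call.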

The WebSocketSubject also provides the ability for multiplexing. This involves emulating multiple separate WebSocket connections through a single connection to improve performance.
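
To make multiplexing more concrete, the sketch below defines the three callbacks that the multiplex() method takes: one that produces the message subscribing to a logical channel, one that produces the message unsubscribing from it, and a filter that selects that channel's messages. The ChannelMessage shape and the 'prices' channel name are assumptions for illustration; a real server defines its own protocol.

```typescript
/** Hypothetical message envelope; the real shape depends on the server's protocol. */
interface ChannelMessage {
  channel: string;
  type: 'subscribe' | 'unsubscribe' | 'data';
  payload?: unknown;
}

/** Builds the three arguments for multiplex() for one logical channel. */
function channelArgs(channel: string) {
  return {
    subMsg: (): ChannelMessage => ({ channel, type: 'subscribe' }),
    unsubMsg: (): ChannelMessage => ({ channel, type: 'unsubscribe' }),
    messageFilter: (message: ChannelMessage): boolean =>
      message.channel === channel && message.type === 'data',
  };
}

const prices = channelArgs('prices');

// Assumed usage with a WebSocketSubject<ChannelMessage> named webSocketSubject:
// const prices$ = webSocketSubject.multiplex(
//   prices.subMsg,
//   prices.unsubMsg,
//   prices.messageFilter
// );
// prices$.subscribe(console.log); // sends the subscribe message, then emits only 'prices' data
```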

Example

Let's look at an example to get started:

import { webSocket } from 'rxjs/webSocket';

/** Create a new WebSocketSubject using the webSocket() function. */
const webSocketSubject = webSocket<string>('wss://echo.websocket.org');

/** Send a message prior to opening the connection (subscribing). */
webSocketSubject.next('first');

/** Subscribe to the WebSocketSubject. */
webSocketSubject.subscribe({
  error: e => console.error(e),
  next: console.log,
  complete: () => console.log('complete')
});

/** Send additional messages after opening the connection. */
webSocketSubject.next('second');
webSocketSubject.next('third');

/** After a few seconds close the WebSocket connection. */
window.setTimeout(() => {
  webSocketSubject.complete();
}, 2000);

Let's review the code above:

  • First, we import the webSocket() function from the "rxjs/webSocket" module.
  • We then create a new WebSocketSubject using the webSocket() function specifying the generic type of string for the next notification values, and the connection URL.
  • The "first" message is emitted using the next() method before we open the connection by subscribing to the WebSocketSubject. Messages sent while the connection is not yet open are buffered until it opens.
  • Next, we subscribe to the WebSocketSubject, which will trigger an attempt to connect to the WebSocket server.
  • Then, we send some additional messages, which are sent to the WebSocket server as soon as the connection is open.
  • Finally, after 2000 milliseconds we invoke the complete() method to close the connection to the server.
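
The buffering behavior described above can be pictured with a small model. This is a simplified sketch under stated assumptions, not RxJS's actual implementation: BufferedSender and its methods are hypothetical names, and the transport is just a callback that records what was sent.

```typescript
/** Simplified model of "buffer until open"; not the RxJS implementation. */
class BufferedSender<T> {
  private queue: T[] = [];
  private isOpen = false;
  private readonly send: (message: T) => void;

  constructor(send: (message: T) => void) {
    this.send = send;
  }

  /** Queue the message while closed, or send it right away once open. */
  next(message: T): void {
    if (this.isOpen) {
      this.send(message);
    } else {
      this.queue.push(message);
    }
  }

  /** Mark the connection open and flush queued messages in their original order. */
  open(): void {
    this.isOpen = true;
    for (const message of this.queue) {
      this.send(message);
    }
    this.queue = [];
  }
}

const sent: string[] = [];
const sender = new BufferedSender<string>((message) => {
  sent.push(message);
});
sender.next('first');  // buffered: nothing is sent yet
sender.open();         // flushes 'first'
sender.next('second'); // sent right away
```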

WebSocketSubjectConfig

The WebSocketSubjectConfig object enables us to provide additional configuration to the WebSocketSubject that is created using the webSocket() function. Let's look at a few of the configuration properties:

  • url is the only required property, which specifies the WebSocket connection URL.
  • deserializer enables us to override the default behavior of parsing messages received from the server with JSON.parse().
  • serializer enables us to override the default behavior of serializing messages with JSON.stringify() before they are sent to the server.
  • openObserver enables us to provide an Observer whose next() method is invoked when a connection is opened.
  • closeObserver enables us to provide an Observer whose next() method is invoked when a connection is closed.
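
As a minimal sketch of custom serializer and deserializer functions, the example below swaps JSON for a pipe-delimited text format. The ChatMessage type and the "user|text" wire format are assumptions made for illustration, and the MessageLike type stands in for the browser's MessageEvent so that the snippet runs on its own.

```typescript
/** Hypothetical application message. */
interface ChatMessage {
  user: string;
  text: string;
}

/** Minimal stand-in for the part of MessageEvent that a deserializer reads. */
interface MessageLike {
  data: string;
}

/** Encode as "user|text" instead of the default JSON.stringify(). */
const serializer = (message: ChatMessage): string =>
  `${message.user}|${message.text}`;

/** Decode "user|text" instead of the default JSON.parse(); the text may contain "|". */
const deserializer = (event: MessageLike): ChatMessage => {
  const separator = event.data.indexOf('|');
  if (separator === -1) {
    throw new Error(`Malformed message: ${event.data}`);
  }
  return {
    user: event.data.slice(0, separator),
    text: event.data.slice(separator + 1),
  };
};

// Assumed wiring into the config object described above:
// const chat = webSocket<ChatMessage>({
//   url: 'wss://echo.websocket.org',
//   serializer,
//   deserializer,
// });
```

Because the split happens at the first "|" only, this format assumes that user names never contain that character.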

Let's take a quick look at an example of using the openObserver and closeObserver configuration properties:

import { Subject } from 'rxjs';
import { webSocket } from 'rxjs/webSocket';

/** Observes the open event on the websocket. */
const open = new Subject();

/** Observes the close event on the websocket. */
const close = new Subject();

/** The websocket subject. */
const webSocketSubject = webSocket<string>({
  url: 'wss://echo.websocket.org',
  openObserver: open,
  closeObserver: close
});

/** Subscribe to open and close Observers. */
close.subscribe(console.log);
open.subscribe(console.log);

/** Subscribe to WebSocketSubject to open the connection. */
webSocketSubject.subscribe({
  next: console.log,
  error: (e) => console.error(e),
  complete: () => console.log('complete')
});

/** Send message. */
webSocketSubject.next('first');

/** After a few seconds close the WebSocket connection. */
window.setTimeout(() => {
  webSocketSubject.complete();
}, 2000);

In the example above we provide a WebSocketSubjectConfig object to the webSocket() function instead of a URL string. Let's review the code:

  • First, we create two new Subject instances called open and close. We'll use these as Observers to the open and close events for the WebSocket connection.
  • We then create a new WebSocketSubject instance using the webSocket() function, providing the WebSocketSubjectConfig object. In the config object we set the url along with the openObserver and closeObserver properties, whose values are the open and close Subjects respectively.
  • Then, we subscribe to both the close and open Subjects and pass the console.log() function as the next notification callback function.
  • Then, we subscribe to the webSocketSubject and provide an Observer with properties for each notification type.
  • Next, we send a message using the next() method on the webSocketSubject. We should expect an echo message back from the server.
  • Finally, after 2000 milliseconds we invoke the complete() method to close the connection to the server.

Copyright

Site © by LiveLoveApp, LLC
