ReplaySubject

The ReplaySubject class extends the Subject class, which extends the Observable class. When an Observer subscribes, a ReplaySubject first emits all, or a specified number, of its past next notifications, and then emits every future next notification until the Observer unsubscribes or the ReplaySubject completes or errors.

As the name suggests, the notifications are "replayed" to an Observer no matter when the Observer subscribes.

The first optional argument to the constructor() function is bufferSize. This specifies the maximum number of next notification values that are replayed immediately to an Observer upon subscribing. If it is omitted, every past value is buffered.
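To make the effect of bufferSize concrete, here is a minimal sketch of the trimming rule in plain TypeScript. The helper name trimToBufferSize is hypothetical and is not part of RxJS; it only models which buffered values a late Observer would receive, assuming the buffer keeps the most recent values and treats any size below 1 as 1.

```typescript
/*
 * Hypothetical helper (not an RxJS API): models which buffered values
 * would be replayed for a given bufferSize.
 */
function trimToBufferSize<T>(values: T[], bufferSize: number = Infinity): T[] {
  /* An unbounded buffer keeps every past value. */
  if (bufferSize === Infinity) {
    return [...values];
  }
  /* Assumption of this sketch: sizes below 1 are treated as 1. */
  const size = Math.max(1, bufferSize);
  /* Keep only the most recent `size` values. */
  return values.slice(-size);
}

const emitted = [1, 2, 3];
const replayAll = trimToBufferSize(emitted);        // [1, 2, 3]
const replayLastTwo = trimToBufferSize(emitted, 2); // [2, 3]
```

With no bufferSize the late Observer would see all three values; with a bufferSize of 2 it would see only the two most recent ones.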

The second optional argument to the constructor() function is windowTime. This specifies the maximum number of milliseconds for which each next notification value remains available to be replayed to an Observer upon subscribing.
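The windowTime rule can be illustrated with explicit timestamps. The sketch below is a simplified model, not the RxJS implementation: the TimestampedValue interface and the withinWindow helper are hypothetical names, and the sketch assumes a value expires once its age reaches windowTime.

```typescript
/* Hypothetical shape: a buffered value and the time it was emitted. */
interface TimestampedValue<T> {
  value: T;
  emittedAt: number; /* milliseconds */
}

/*
 * Hypothetical helper (not an RxJS API): keeps only the values that are
 * still inside the replay window at time `now`.
 */
function withinWindow<T>(
  entries: TimestampedValue<T>[],
  windowTime: number,
  now: number
): T[] {
  return entries
    .filter((entry) => now - entry.emittedAt < windowTime)
    .map((entry) => entry.value);
}

const timeline: TimestampedValue<string>[] = [
  { value: 'a', emittedAt: 0 },
  { value: 'b', emittedAt: 400 },
  { value: 'c', emittedAt: 900 }
];

/* An Observer subscribing at t = 1000 with a 500 ms window. */
const replayedShort = withinWindow(timeline, 500, 1000); // ['c']

/* The same Observer with a 700 ms window. */
const replayedLong = withinWindow(timeline, 700, 1000);  // ['b', 'c']
```

Passing the current time as an argument keeps the sketch deterministic; a real subject would read the clock itself.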

Example

Let's look at an example:

import { ReplaySubject } from 'rxjs';

/* Create an instance of ReplaySubject. */
const replaySubject = new ReplaySubject<number>();

/* Subscribe to subject. */
replaySubject.subscribe({
  next: (value) => console.log('before:', value),
  error: (error) => console.error('before:', error),
  complete: () => console.log('complete before')
});

/* Emit some values. */
replaySubject.next(1);
replaySubject.next(2);
replaySubject.next(3);

/* Subscribe late to subject. */
replaySubject.subscribe({
  next: (value) => console.log('after:', value),
  error: (error) => console.error('after:', error),
  complete: () => console.log('complete after')
});

/* Complete the observable stream. */
replaySubject.complete();

Let's review the code above:

  • First, we create a new ReplaySubject instance and specify the generic type.
  • Then, we subscribe to the replaySubject before any next notifications are emitted.
  • We emit three values using the next() method.
  • Then, we subscribe again to the replaySubject after the next notifications have been emitted.
  • Finally, we complete() the Observable.

In this example, the first Observer receives each next notification as it is emitted. The second Observer subscribes after all three values have been emitted, so it immediately receives all three past next notifications upon subscribing.

Finally, both Observers receive the completion notification when we invoke complete().
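The delivery order described above can be traced with a small stand-in written in plain TypeScript. TinyReplaySubject and TinyObserver are hypothetical names for this sketch, not RxJS classes; the model buffers every value, ignores errors and unsubscription, and does not handle subscribing after completion.

```typescript
/* Observer shape used by this sketch (a subset of the RxJS Observer). */
interface TinyObserver<T> {
  next: (value: T) => void;
  complete: () => void;
}

/*
 * Hypothetical stand-in for ReplaySubject (not the RxJS implementation):
 * replays the whole buffer on subscribe, then forwards later notifications.
 */
class TinyReplaySubject<T> {
  private buffer: T[] = [];
  private observers: TinyObserver<T>[] = [];

  subscribe(observer: TinyObserver<T>): void {
    /* Replay past values first, then register for future ones. */
    this.buffer.forEach((value) => observer.next(value));
    this.observers.push(observer);
  }

  next(value: T): void {
    this.buffer.push(value);
    this.observers.forEach((observer) => observer.next(value));
  }

  complete(): void {
    this.observers.forEach((observer) => observer.complete());
    this.observers = [];
  }
}

/* Record notifications in order instead of printing them. */
const notifications: string[] = [];
const tinySubject = new TinyReplaySubject<number>();

tinySubject.subscribe({
  next: (value) => notifications.push(`before: ${value}`),
  complete: () => notifications.push('complete before')
});

tinySubject.next(1);
tinySubject.next(2);
tinySubject.next(3);

tinySubject.subscribe({
  next: (value) => notifications.push(`after: ${value}`),
  complete: () => notifications.push('complete after')
});

tinySubject.complete();

/*
 * notifications now holds, in order:
 * before: 1, before: 2, before: 3,
 * after: 1, after: 2, after: 3,
 * complete before, complete after
 */
```

Collecting the notifications in an array rather than logging them makes the order easy to compare against the console output of the real example.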

Example with bufferSize Set

Now, let's take the previous example and specify a buffer size of 2.

import { ReplaySubject } from 'rxjs';

/* Create an instance of ReplaySubject. */
const replaySubject = new ReplaySubject<number>(2);

/* Subscribe to subject. */
replaySubject.subscribe({
  next: (value) => console.log('before:', value),
  error: (error) => console.error('before:', error),
  complete: () => console.log('complete before')
});

/* Emit some values. */
replaySubject.next(1);
replaySubject.next(2);
replaySubject.next(3);

/* Subscribe late to subject. */
replaySubject.subscribe({
  next: (value) => console.log('after:', value),
  error: (error) => console.error('after:', error),
  complete: () => console.log('complete after')
});

/* Complete the observable stream. */
replaySubject.complete();

In the example above, the late Observer receives only the last 2 next notifications (the values 2 and 3) upon subscribing.

Copyright

Site © by LiveLoveApp, LLC
