Multicast Operator

Multicasting Operators

The multicast() Operator

In the previous section we learned how to manually use a source Observable with a Subject in order to multicast to multiple Observers. In most cases when we need to multicast we do not necessarily need a Subject. What we really need is a multicasted Observable.

We'll use the multicast() or publish() operators to create multicasted Observables.

Example

Let's look at an example of using the multicast() operator to create a new ConnectableObservable:

import { ConnectableObservable, Observable, Subject } from 'rxjs';
import { multicast, takeUntil, } from 'rxjs/operators';

const completeSubject = new Subject<void>();

/* Create a new observable, providing the observer. */
const observable = new Observable<number>((observer) => {
  console.log('%cNew subscription created', 'background: #222; color: #bada55');

  let i = 0;
  const interval = setInterval(() => {
    observer.next(i++);
  }, 1000);

  return () => {
    clearInterval(interval);
  };
}).pipe(takeUntil(completeSubject);

const subject = new Subject<number>();
const multicasted = observable.pipe(
  multicast(subject)
) as ConnectableObservable<number>;

/* Each subscription receives a copy of Observer. */
multicasted.subscribe((value) => console.log('First subscription', value));
multicasted.subscribe((value) => console.log('Second subscription', value));

/* Connect the subject to the observabe. */
multicasted.connect();

/* Complete the observable after 5 seconds. */
setTimeout(() => {
  completeSubject.next();
  completeSubject.complete();
}, 5000);

See example on codesandbox

Let's break this down:

  • First, we define completeSubject, which we will use to trigger the completion of the observable using the takeUntil() operator.
  • We are creating a new Observable that emits a number value every 1000 milliseconds.
  • We then create a new Subject and pass this to the multicast() operator. We can also provide the multicast() operator with a factory function that returns a Subject.
  • We subscribe multiple times to the ConnectableObservable that was created by the multicast() operator.
  • We invoke the connect() method on the ConnectableObservable instance in order to subscribe to the Subject. The connect() method returns an instance of a Subcription, which we can then use to unsubscribe from the multicasted Observable.
  • Finally, after 5000 milliseconds we complete the source Observable.

Multicast Operator

Multicasting Operators

The multicast() Operator

In the previous section we learned how to manually use a source Observable with a Subject in order to multicast to multiple Observers. In most cases when we need to multicast we do not necessarily need a Subject. What we really need is a multicasted Observable.

We'll use the multicast() or publish() operators to create multicasted Observables.

Example

Let's look at an example of using the multicast() operator to create a new ConnectableObservable:

import { ConnectableObservable, Observable, Subject } from 'rxjs';
import { multicast, takeUntil, } from 'rxjs/operators';

const completeSubject = new Subject<void>();

/* Create a new observable, providing the observer. */
const observable = new Observable<number>((observer) => {
  console.log('%cNew subscription created', 'background: #222; color: #bada55');

  let i = 0;
  const interval = setInterval(() => {
    observer.next(i++);
  }, 1000);

  return () => {
    clearInterval(interval);
  };
}).pipe(takeUntil(completeSubject);

const subject = new Subject<number>();
const multicasted = observable.pipe(
  multicast(subject)
) as ConnectableObservable<number>;

/* Each subscription receives a copy of Observer. */
multicasted.subscribe((value) => console.log('First subscription', value));
multicasted.subscribe((value) => console.log('Second subscription', value));

/* Connect the subject to the observabe. */
multicasted.connect();

/* Complete the observable after 5 seconds. */
setTimeout(() => {
  completeSubject.next();
  completeSubject.complete();
}, 5000);

See example on codesandbox

Let's break this down:

  • First, we define completeSubject, which we will use to trigger the completion of the observable using the takeUntil() operator.
  • We are creating a new Observable that emits a number value every 1000 milliseconds.
  • We then create a new Subject and pass this to the multicast() operator. We can also provide the multicast() operator with a factory function that returns a Subject.
  • We subscribe multiple times to the ConnectableObservable that was created by the multicast() operator.
  • We invoke the connect() method on the ConnectableObservable instance in order to subscribe to the Subject. The connect() method returns an instance of a Subcription, which we can then use to unsubscribe from the multicasted Observable.
  • Finally, after 5000 milliseconds we complete the source Observable.

Copyright

Site © by LiveLoveApp, LLC

We Can Help!

Need help with RxJS in your project?

Hire Us