The multicast() Operator
In the previous section we learned how to manually use a source Observable with a Subject
in order to multicast to multiple Observers.
In most cases when we need to multicast, we do not necessarily need a Subject.
What we really need is a multicasted Observable.
We'll use the multicast() or publish() operators to create multicasted Observables.
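To make the motivation concrete, here is a minimal, dependency-free sketch of the difference between subscribing to a source directly and sharing a single subscription. It does not use RxJS: `createColdSource`, `Listener`, and the variable names are hypothetical stand-ins invented for illustration, and the source emits synchronously to keep the example short.

```typescript
// Simplified stand-ins for illustration only; these are NOT RxJS types.
type Listener<T> = (value: T) => void;

// A "cold" source: its producer logic runs once for every subscriber.
function createColdSource(onStart: () => void) {
  return {
    subscribe(listener: Listener<number>): void {
      onStart();
      [1, 2, 3].forEach((value) => listener(value));
    },
  };
}

let producerRuns = 0;
const cold = createColdSource(() => {
  producerRuns++;
});

// Subscribing directly: each subscriber triggers its own producer run.
const directA: number[] = [];
const directB: number[] = [];
cold.subscribe((value) => directA.push(value));
cold.subscribe((value) => directB.push(value));
const runsWhenSubscribingDirectly = producerRuns;

// Sharing: register the listeners first, then subscribe to the source once
// and forward every value to all of them.
producerRuns = 0;
const sharedA: number[] = [];
const sharedB: number[] = [];
const listeners: Listener<number>[] = [
  (value) => sharedA.push(value),
  (value) => sharedB.push(value),
];
cold.subscribe((value) => listeners.forEach((listener) => listener(value)));
const runsWhenSharing = producerRuns;
```

In this sketch the direct subscriptions start the producer twice, while the shared version starts it once and still delivers the same values to both listeners. A multicasted Observable gives us that second behavior without wiring it up by hand.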
Example
Let's look at an example of using the multicast() operator to create a new ConnectableObservable:
import { ConnectableObservable, Observable, Subject } from 'rxjs';
import { multicast, takeUntil } from 'rxjs/operators';

/* Emits once to signal that the source observable should complete. */
const completeSubject = new Subject<void>();

/* Create a new observable, providing the observer. */
const observable = new Observable<number>((observer) => {
  console.log('%cNew subscription created', 'background: #222; color: #bada55');
  let i = 0;
  const interval = setInterval(() => {
    observer.next(i++);
  }, 1000);

  /* Teardown logic: stop the timer when the subscription ends. */
  return () => {
    clearInterval(interval);
  };
}).pipe(takeUntil(completeSubject));

const subject = new Subject<number>();
const multicasted = observable.pipe(
  multicast(subject)
) as ConnectableObservable<number>;

/* Each subscriber is registered with the shared subject; nothing is emitted yet. */
multicasted.subscribe((value) => console.log('First subscription', value));
multicasted.subscribe((value) => console.log('Second subscription', value));

/* Connect the subject to the observable. */
multicasted.connect();

/* Complete the observable after 5 seconds. */
setTimeout(() => {
  completeSubject.next();
  completeSubject.complete();
}, 5000);
Let's break this down:
- First, we define completeSubject, which we will use to trigger the completion of the observable using the takeUntil() operator.
- We create a new Observable that emits a number value every 1000 milliseconds.
- We then create a new Subject and pass it to the multicast() operator. We can also provide the multicast() operator with a factory function that returns a Subject.
- We subscribe multiple times to the ConnectableObservable that was created by the multicast() operator.
- We invoke the connect() method on the ConnectableObservable instance in order to subscribe the Subject to the source Observable. The connect() method returns an instance of a Subscription, which we can then use to unsubscribe the Subject from the source Observable.
- Finally, after 5000 milliseconds we complete the source Observable.
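The steps above can be modeled with a small, dependency-free sketch. This is a simplified illustration of the idea and not how RxJS implements multicast(): `SketchSubject`, `multicastSketch`, `Handler`, and `Producer` are hypothetical names invented here, the producer emits synchronously instead of on a timer, and completion and error handling are left out.

```typescript
// Simplified stand-ins for illustration only; these are NOT the RxJS classes.
type Handler<T> = (value: T) => void;
type Teardown = () => void;
type Producer<T> = (emit: Handler<T>) => Teardown;

// A bare-bones subject: it remembers handlers and forwards values to all of them.
class SketchSubject<T> {
  private handlers: Handler<T>[] = [];

  subscribe(handler: Handler<T>): void {
    this.handlers.push(handler);
  }

  next(value: T): void {
    this.handlers.forEach((handler) => handler(value));
  }
}

// Hypothetical helper that mimics the shape of a connectable observable.
function multicastSketch<T>(
  producer: Producer<T>,
  subjectFactory: () => SketchSubject<T>,
) {
  const subject = subjectFactory();
  return {
    // Subscribing only registers with the subject; the producer is not started.
    subscribe: (handler: Handler<T>): void => subject.subscribe(handler),
    // connect() starts the producer once and returns a handle that stops it.
    connect: (): { unsubscribe: Teardown } => {
      const teardown = producer((value) => subject.next(value));
      return { unsubscribe: teardown };
    },
  };
}

const log: string[] = [];
let producerStarts = 0;
let teardownCalls = 0;

const multicastedSketch = multicastSketch<number>(
  (emit) => {
    producerStarts++;
    [0, 1].forEach((value) => emit(value));
    return () => {
      teardownCalls++;
    };
  },
  () => new SketchSubject<number>(),
);

multicastedSketch.subscribe((value) => log.push(`First subscription ${value}`));
multicastedSketch.subscribe((value) => log.push(`Second subscription ${value}`));
const logBeforeConnect = log.slice(); // still empty: nothing runs before connect()

const connection = multicastedSketch.connect();
connection.unsubscribe();
```

In this model nothing is logged until connect() is called. After that, the single producer run delivers each value to both handlers in registration order, and the returned handle runs the producer's teardown.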