Manual Subscription
Previously we learned about the multicast() operator.
You may have noticed that if you omit the call to the connect() method, the Observers never receive a notification.
This is because the connect() method determines when the ConnectableObservable is "connected" (or subscribed) to the source Observable.
It can be verbose to always call connect() and later call unsubscribe() on the Subscription that it returns.
The solution to this is the refCount() operator.
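To make the manual steps concrete, here is a minimal sketch of connecting and disconnecting by hand. It assumes an RxJS version that still ships multicast() (versions 6 and 7); the `log` array and the synchronous two-value source are made up for illustration.

```typescript
import { ConnectableObservable, Observable, Subject } from 'rxjs';
import { multicast } from 'rxjs/operators';

// Hypothetical log used to record the order of events.
const log: string[] = [];

// A source that records when it is subscribed to and when it is torn down.
const source = new Observable<number>((observer) => {
  log.push('source subscribed');
  observer.next(1);
  observer.next(2);
  return () => {
    log.push('source torn down');
  };
});

// pipe() is typed as returning an Observable, hence the cast.
const connectable = source.pipe(
  multicast(new Subject<number>())
) as ConnectableObservable<number>;

// Subscribing alone does not start the source.
const observerSubscription = connectable.subscribe((value) =>
  log.push(`value ${value}`)
);
log.push('before connect');

// connect() subscribes the Subject to the source and returns that Subscription.
const connection = connectable.connect();

// We are responsible for tearing the connection down ourselves.
connection.unsubscribe();
observerSubscription.unsubscribe();

console.log(log);
```

The log should read: before connect, source subscribed, value 1, value 2, source torn down. Nothing is emitted until connect() is called, and the source is only released once we unsubscribe from the connection ourselves.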
Automatic Subscription
The refCount() operator tracks the number of Observers subscribed to a ConnectableObservable.
When the number of Observers goes from 0 to 1, the refCount() operator automatically invokes the connect() method for us.
And when the number of Observers goes from 1 to 0, the refCount() operator automatically calls unsubscribe() on the Subscription that is returned (internally) from the connect() method.
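The counting rule can be sketched without RxJS at all. The `RefCountedSource` class below is a hypothetical stand-in, not the library's implementation; it only models when a connect and a disconnect would happen as Observers come and go.

```typescript
// A hypothetical stand-in for a ConnectableObservable paired with refCount().
// It is not RxJS's actual implementation; it only models the counting rule.
class RefCountedSource {
  private observers = 0;
  readonly events: string[] = [];

  // Adds one Observer and returns a function that removes it again.
  subscribe(): () => void {
    this.observers += 1;
    if (this.observers === 1) {
      // 0 -> 1: this is where refCount() would call connect().
      this.events.push('connect');
    }
    let active = true;
    return () => {
      if (!active) {
        return; // ignore repeated unsubscribe calls
      }
      active = false;
      this.observers -= 1;
      if (this.observers === 0) {
        // 1 -> 0: this is where refCount() would unsubscribe the connection.
        this.events.push('disconnect');
      }
    };
  }
}

const counted = new RefCountedSource();
const unsubscribeFirst = counted.subscribe(); // 0 -> 1, connects
const unsubscribeSecond = counted.subscribe(); // 1 -> 2, nothing happens
unsubscribeFirst(); // 2 -> 1, nothing happens
unsubscribeSecond(); // 1 -> 0, disconnects

console.log(counted.events); // ['connect', 'disconnect']
```

Only the first subscribe and the last unsubscribe change the connection state; everything in between leaves it alone.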
Example
import { Observable, Subject } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';

/* Create a new observable, providing the observer. */
const observable = new Observable<number>((observer) => {
  console.log('%cNew subscription created', 'background: #222; color: #bada55');
  let i = 0;
  const interval = setInterval(() => {
    observer.next(i++);
  }, 1000);

  /* Teardown logic: runs when the source is unsubscribed. */
  return () => {
    clearInterval(interval);
  };
});

/* Multicast through a Subject and let refCount() manage the connection. */
const subject = new Subject<number>();
const multicasted = observable.pipe(
  multicast(subject),
  refCount()
);

/* Both Observers share a single subscription to the source. */
const subscription = multicasted.subscribe((value) =>
  console.log('First subscription', value)
);
subscription.add(
  multicasted.subscribe((value) => console.log('Second subscription', value))
);

/* Unsubscribe both Observers after 5 seconds; refCount() then disconnects from the source. */
setTimeout(() => subscription.unsubscribe(), 5000);
Let's review the code above.
- This is pretty similar to the code in the previous section on the multicast() operator. The main difference is that instead of unsubscribing after 5000 milliseconds using the takeUntil() operator, we are using the refCount() operator to manage the Subscription for us.
- Within the pipe() for the observable we first use the multicast() operator, which accepts an Observable as the input and returns a new ConnectableObservable. Then, we use the refCount() operator, which returns a new Observable. As a side note, this is why we no longer need to cast multicasted as a ConnectableObservable, since it is now an Observable.
- We then subscribe multiple times to the multicasted and refcounted Observable. We keep track of the Subscription instance that is returned, and use the add() method to add child Subscriptions.
- Finally, after 5000 milliseconds we invoke the unsubscribe() method on the parent subscription. Because this also unsubscribes the child, the number of Observers drops to 0 and refCount() disconnects from the source.
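The parent and child relationship created by add() can be seen in isolation. This is a small sketch using two stand-alone Subscription instances; the `tornDown` array and the teardown callbacks are made up for illustration.

```typescript
import { Subscription } from 'rxjs';

// Hypothetical record of which teardown callbacks have run.
const tornDown: string[] = [];

// Two stand-alone Subscriptions with teardown callbacks, used only for illustration.
const parent = new Subscription(() => {
  tornDown.push('parent');
});
const child = new Subscription(() => {
  tornDown.push('child');
});

// Register the child so that it is unsubscribed together with the parent.
parent.add(child);

parent.unsubscribe();

console.log(child.closed); // true
console.log(tornDown.length); // 2
```

Unsubscribing the parent is enough to close the child as well, which is why the example above needs only a single unsubscribe() call.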