RefCount Operator

Multicasting Operators

Manual Subscription

Previously we learned about the multicast() operator. You may have noticed that if you omitted invoking the connect() method that the Observers never received a notification. This is because the connect() method determines when the ConnectableObservable is "connected" (or subscribed) to the source Observable.

It can be a bit verbose to always call connect() and then subsequently unsubscribe() from the Subscription that is returned.

The solution to this is the refCount() operator.

Automatic Subscription

The refCount() operator tracks the number of Observers to a ConnectableObservable. When the number of Observers goes from 0 to 1 the refCount() operator will automatically invoke the connect() method for us. And, when the number of Observers goes from 1 to 0 then the refCount() operator will automatically unsubscribe() from the Subscription that is returned (internally) from the connect() method.

Example

import { ConnectableObservable, Observable, Subject } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';

/* Create a new observable, providing the observer. */
const observable = new Observable<number>((observer) => {
  console.log('%cNew subscription created', 'background: #222; color: #bada55');

  let i = 0;
  const interval = setInterval(() => {
    observer.next(i++);
  }, 1000);

  return () => {
    clearInterval(interval);
  };
});

/** Create the ConnectableObservable. */
const subject = new Subject<number>();
const multicasted = observable.pipe(
  multicast(subject),
  refCount()
);

/* Each subscription receives a copy of Observer. */
const subscription = multicasted.subscribe((value) =>
  console.log('First subscription', value)
);
subscription.add(
  multicasted.subscribe((value) => console.log('Second subscription', value))
);

/* Complete the observable after 5 seconds. */
setTimeout(() => subscription.unsubscribe(), 5000);

See example on codesandbox

Let's review the code above.

  • This is pretty similar to the code in the previous section on the multicast() operator. The main difference is that instead of unsubscribing after 5000 milliseconds using the takeUntil() operator we are using the refCount() operator to manage the Subscription for us.
  • Within the pipe() for the observable we first use the multicast() operator, which accepts an Observable as the input and returns a new ConnectableObservable. Then, we use the refCount() operator, which returns a new Observable. As a side note, this is why we no longer need to cast multicasted as a ConnectableObserver since it is now an Observable.
  • We then subscribe multiple times to the multicasted and refcounted Observable. We keep track of the Subscription instance that is returned, and use the add() method to add child Subscriptions.
  • Finally, after 5000 milliseconds we invoke the unsubscribe() method on the parent subscription.

RefCount Operator

Multicasting Operators

Manual Subscription

Previously we learned about the multicast() operator. You may have noticed that if you omitted invoking the connect() method that the Observers never received a notification. This is because the connect() method determines when the ConnectableObservable is "connected" (or subscribed) to the source Observable.

It can be a bit verbose to always call connect() and then subsequently unsubscribe() from the Subscription that is returned.

The solution to this is the refCount() operator.

Automatic Subscription

The refCount() operator tracks the number of Observers to a ConnectableObservable. When the number of Observers goes from 0 to 1 the refCount() operator will automatically invoke the connect() method for us. And, when the number of Observers goes from 1 to 0 then the refCount() operator will automatically unsubscribe() from the Subscription that is returned (internally) from the connect() method.

Example

import { ConnectableObservable, Observable, Subject } from 'rxjs';
import { multicast, refCount } from 'rxjs/operators';

/* Create a new observable, providing the observer. */
const observable = new Observable<number>((observer) => {
  console.log('%cNew subscription created', 'background: #222; color: #bada55');

  let i = 0;
  const interval = setInterval(() => {
    observer.next(i++);
  }, 1000);

  return () => {
    clearInterval(interval);
  };
});

/** Create the ConnectableObservable. */
const subject = new Subject<number>();
const multicasted = observable.pipe(
  multicast(subject),
  refCount()
);

/* Each subscription receives a copy of Observer. */
const subscription = multicasted.subscribe((value) =>
  console.log('First subscription', value)
);
subscription.add(
  multicasted.subscribe((value) => console.log('Second subscription', value))
);

/* Complete the observable after 5 seconds. */
setTimeout(() => subscription.unsubscribe(), 5000);

See example on codesandbox

Let's review the code above.

  • This is pretty similar to the code in the previous section on the multicast() operator. The main difference is that instead of unsubscribing after 5000 milliseconds using the takeUntil() operator we are using the refCount() operator to manage the Subscription for us.
  • Within the pipe() for the observable we first use the multicast() operator, which accepts an Observable as the input and returns a new ConnectableObservable. Then, we use the refCount() operator, which returns a new Observable. As a side note, this is why we no longer need to cast multicasted as a ConnectableObserver since it is now an Observable.
  • We then subscribe multiple times to the multicasted and refcounted Observable. We keep track of the Subscription instance that is returned, and use the add() method to add child Subscriptions.
  • Finally, after 5000 milliseconds we invoke the unsubscribe() method on the parent subscription.

Copyright

Site © by LiveLoveApp, LLC

We Can Help!

Need help with RxJS in your project?

Hire Us