Observers
An Observer consumes the values that are emitted by an Observerable
.
The Observer is an object with three properties for each notification type.
interface NextObserver<T> {
closed?: boolean;
next: (value: T) => void;
error?: (err: any) => void;
complete?: () => void;
}
interface ErrorObserver<T> {
closed?: boolean;
next?: (value: T) => void;
error: (err: any) => void;
complete?: () => void;
}
interface CompletionObserver<T> {
closed?: boolean;
next?: (value: T) => void;
error?: (err: any) => void;
complete: () => void;
}
declare type PartialObserver<T> = NextObserver<T> | ErrorObserver<T> | CompletionObserver<T>;
The Observer can optionally specify one or more properties corresponding to each notification type:
next
: the next value notification that is emitted 0 to n number of times.error
: the error notification is emitted 0 or 1 times when an exception occurs, along with the immediate completion of the stream.complete
: the completion notification is emitted upon completion.
When the Observable is complete the Observer will not receive any further notification (of any type).
Finally, we should note that when subscribing to an Observable the PartialObserver
is optional:
subscribe(observer?: PartialObserver<T>): Subscription;
We can also specify the callback functions for each notification type as arguments to the subscribe()
method rather than the PartialObservable
object:
subscribe(next?: (value: T) => void, error?: (error: any) => void, complete?: () => void): Subscription;
Creation of an Observable
It helps us to understand the Observer
by looking at the creation of a new Observable
.
import { Observable } from 'rxjs';
/* create a new observable, providing the observer. */
const observable: Observable<string> = new Observable(observer => {
const interval = setInterval(() => {
observer.next('Hello from Observableland!');
}, 2000);
// teardown
return () => {
clearInterval(interval);
};
});
/* Subscribe to Notifications. */
observable.subscribe(value => console.log(value));
Before we break this down it may be helpful to look at the constructor
function signature:
constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic);
Let's break this down:
- First, we import the
Observable
class. - We create a new
Observable
specifying the generic type ofstring
. - The constructor accepts a
subscribe
function whosethis
execution context is anObservable
. The function requires a newObserver
. - The
next()
method on theObserver
emits a next notification to Observers. - The
error()
method on theObserver
emits the error notification when an exception occurs. - The
complete()
method on theObserver
emits the completion notification when the Observable stream is complete and will no longer emit an error or next notification. - Finally, the
subscribe
function returns a teardown function. The teardown function is invoked when the Observable is complete, errors, or the Observer unsubscribes. This is an opportunity for the observable to clean anything up.
Key Takeaways
- Observers have three methods:
next()
,error()
, andcomplete()
. - While not common when using RxJS, you can new-up an
Observable
using itsconstructor()
function.