What's with the Subjects in RxJS 5
RxJS is a great library that is used more and more in different places and frameworks across the web. Soon it may well dominate the asynchronous part of web apps, which is why it’s important to know how subjects work. A Subject combines the best of both an Observable and an Observer, so you can think of a Subject as being an Observable and an Observer at the same time.
A Subject is an Observable, but a plain Observable is unicast. Subjects, on the other hand, are multicast, which means that they act like EventEmitters that can handle multiple listeners or subscribers. To facilitate this behaviour, new observers are registered in a list of Observers that all get notified. The subscribers are agnostic about what is sending the updates: it can be a plain Observable or a Subject emitting the values.
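To make the list-of-observers idea concrete, here is a minimal sketch. TinySubject and TinyObserver are hypothetical names made up for this illustration; this is a simplified stand-in, not how RxJS implements Subject internally.

```typescript
// Simplified stand-in for illustration only; not the RxJS implementation.
type TinyObserver<T> = (value: T) => void;

class TinySubject<T> {
  // Every subscriber is registered in this list.
  private observers: TinyObserver<T>[] = [];

  subscribe(observer: TinyObserver<T>): () => void {
    this.observers.push(observer);
    // Return an unsubscribe function that removes the observer again.
    return () => {
      this.observers = this.observers.filter(o => o !== observer);
    };
  }

  next(value: T): void {
    // Multicast: a single call notifies every registered observer.
    // Iterate over a copy so unsubscribing during notification is safe.
    this.observers.slice().forEach(observer => observer(value));
  }
}

const received: string[] = [];
const tinySubject = new TinySubject<number>();

tinySubject.subscribe(v => received.push('observer I: ' + v));
const unsubscribeSecond = tinySubject.subscribe(v => received.push('observer II: ' + v));

tinySubject.next(11); // both observers are notified
unsubscribeSecond();
tinySubject.next(22); // only observer I is still registered

console.log(received);
```

The point of the sketch is that the producer calls next once and does not need to know how many observers are listening.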
A Subject is also an Observer, which means it can send new values with next(val), emit errors with error(err) and complete() the stream.
Subjects come in different flavours: a normal Subject, a BehaviorSubject, a ReplaySubject and an AsyncSubject. We’ll try to explain the differences between all of them and support the explanation with real-world, readable examples.
What is a Subject?
Observers subscribe to a Subject just as they would to any other Observable. The Subject itself can in turn subscribe to another async operation or source, like an API backend, and so serve as a proxy between that source and a collection of subscribers.
Example
In the example below, we have two Observers attached to one Subject, and we feed some values to the Subject. Notice that there are different ways of subscribing. You can pass an object that defines a function for the next value, or pass up to three separate callback functions for next, error and complete:
subject.ts
import { Subject } from 'rxjs/Subject';

let subject = new Subject<number>();

// Subscribe with three separate callbacks: next, error and complete
subject.subscribe(
  (val: number) => { console.log('observer I: ' + val) },
  () => {},
  () => {}
);

// Subscribe with an observer object
subject.subscribe({
  next: (val: number) => console.log('observer II: ' + val)
});

// subject.next('test'); // ! compile time error: 'test' is not a number !
subject.next(11);
// log: observer I: 11
// log: observer II: 11
subject.next(22);
// log: observer I: 22
// log: observer II: 22
We are using TypeScript in this example so we can provide a type to the Subject. This way we get type-checking at compile time if we try to send a next value that does not comply with the type!
See this example in action here: https://plnkr.co/edit/nrXf41PNgUdJTMCjR9gG?p=preview
What is a BehaviorSubject?
The BehaviorSubject differs from a normal Subject in that it stores the latest emitted value and requires an initial value. When a new subscriber subscribes to the BehaviorSubject, the latest value is immediately sent to that subscriber. If no value has been emitted yet, the subscriber receives the initial value instead. Once a BehaviorSubject is completed it will free up the memory, and newly subscribed Observers will no longer receive the latest value.
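The rules above (mandatory initial value, immediate delivery of the latest value, nothing replayed after completion) can be sketched in a few lines. TinyBehaviorSubject is a hypothetical name used only for this illustration; it is a simplified stand-in and not the RxJS class, which also handles errors and unsubscription.

```typescript
// Simplified stand-in for illustration only; not the RxJS implementation.
interface TinyBehaviorObserver<T> {
  next: (value: T) => void;
  complete?: () => void;
}

class TinyBehaviorSubject<T> {
  private observers: TinyBehaviorObserver<T>[] = [];
  private completed = false;

  // An initial value is mandatory, so there is always a "latest" value.
  constructor(private latest: T) {}

  subscribe(observer: TinyBehaviorObserver<T>): void {
    if (this.completed) {
      // After completion the stored value is no longer handed out.
      if (observer.complete) { observer.complete(); }
      return;
    }
    this.observers.push(observer);
    // New subscribers immediately receive the latest (or initial) value.
    observer.next(this.latest);
  }

  next(value: T): void {
    if (this.completed) { return; }
    this.latest = value;
    this.observers.forEach(o => o.next(value));
  }

  complete(): void {
    if (this.completed) { return; }
    this.completed = true;
    this.observers.forEach(o => { if (o.complete) { o.complete(); } });
    this.observers = [];
  }
}

const seen: string[] = [];
const tinyBehavior = new TinyBehaviorSubject<number>(11);

tinyBehavior.subscribe({ next: v => seen.push('A: ' + v) }); // gets the initial value 11
tinyBehavior.next(22);
tinyBehavior.subscribe({ next: v => seen.push('B: ' + v) }); // gets the latest value 22
tinyBehavior.complete();
tinyBehavior.subscribe({
  next: v => seen.push('C: ' + v), // never called, the subject is already completed
  complete: () => seen.push('C: complete')
});

console.log(seen);
```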
Example
We give the BehaviorSubject an initial value of 11. The first Observer that subscribes to the subject is immediately notified of the initial value. The same is true for the next Observer.
behavior-subject.ts
import { BehaviorSubject } from 'rxjs/BehaviorSubject';

let behaviorSubject = new BehaviorSubject<number>(11);

behaviorSubject.subscribe(
  (val: number) => console.log('observer I: ' + val)
);
// log: observer I: 11

behaviorSubject.subscribe({
  next: (val: number) => console.log('observer II: ' + val)
});
// log: observer II: 11
From this point on, the behavior of the BehaviorSubject is the same as that of a normal Subject. But if we subscribe a new Observer to the BehaviorSubject, it will immediately receive the last emitted value!
behavior-subject-last-value.ts
behaviorSubject.next(22);
// log: observer I: 22
// log: observer II: 22
behaviorSubject.next(33);
// log: observer I: 33
// log: observer II: 33

behaviorSubject.subscribe({
  next: (val: number) => console.log('observer III: ' + val)
});
// log: observer III: 33
See this example in action here: https://plnkr.co/edit/HPNSAzFmLipvHQ96ZkFt?p=preview
Real-world example cases
You’ll want to use a BehaviorSubject if you always need an initial value. This initial value can serve as a temporary placeholder while you wait for the first real value of a possible sequence. For example:
- You are fetching the contents of a file and you want to show an initial placeholder, stating that the file is loading.
- Your chatbox shows a welcome message before you effectively start receiving real values.
What is a ReplaySubject?
A ReplaySubject is somewhat similar to a BehaviorSubject. It can send previous values to newly attached Subscribers, but instead of only the latest value it can record a part of the Observable execution. The ReplaySubject records multiple values from the Observable execution and replays them to any new Subscribers.
The first parameter in the constructor when creating a ReplaySubject defines how many values to replay. The second parameter specifies the window time in milliseconds that, besides the buffer size, determines the maximum lifetime of recorded values.
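To see how the two parameters work together, here is a small sketch of the selection logic. The replayable function and the TimedValue type are hypothetical helpers written for this article, not part of RxJS, and the exact handling of a value that is precisely as old as the window is an assumption.

```typescript
// Hypothetical helper for illustration; not part of RxJS.
interface TimedValue<T> {
  value: T;
  time: number; // emission time in milliseconds
}

// Returns the values a new subscriber would get replayed at time `now`.
function replayable<T>(
  buffer: TimedValue<T>[],
  bufferSize: number,
  windowTime: number,
  now: number
): T[] {
  // 1. Drop everything that is older than the time window.
  const fresh = buffer.filter(item => now - item.time < windowTime);
  // 2. Of what is left, keep at most the newest `bufferSize` values.
  const start = Math.max(0, fresh.length - bufferSize);
  return fresh.slice(start).map(item => item.value);
}

// Three batches of values, emitted at 0ms, 800ms and 1600ms.
const emitted: TimedValue<number>[] = [
  { value: 1, time: 0 }, { value: 2, time: 0 }, { value: 3, time: 0 },
  { value: 4, time: 800 }, { value: 5, time: 800 }, { value: 6, time: 800 },
  { value: 7, time: 1600 }, { value: 8, time: 1600 }, { value: 9, time: 1600 }
];

// Big buffer, 1 second window: only the last batch is young enough.
const windowLimited = replayable(emitted, 25, 1000, 2000);
// Small buffer, same window: the buffer size cuts the result down further.
const sizeLimited = replayable(emitted, 2, 1000, 2000);
// Big buffer, 5 second window: nothing has expired yet.
const nothingExpired = replayable(emitted, 25, 5000, 2000);

console.log(windowLimited, sizeLimited, nothingExpired);
```

Whichever limit is stricter wins: a value is only replayed if it is both recent enough and among the newest bufferSize values.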
Example
In the following example we use a rather big buffer size of 25, but the window time parameter invalidates recorded items in the buffer after 1000ms, or 1 second. We emit three batches of three values, 800ms apart (at 0ms, 800ms and 1600ms), and subscribe at 2000ms. Only the values from the last batch are replayed, because the other values are older than the window by then: the first batch expired at 1000ms and the second at 1800ms.
replay-subject.ts
import { ReplaySubject } from 'rxjs/ReplaySubject';

let replaySubject = new ReplaySubject<number>(25, 1000);

setTimeout(() => {
  replaySubject.next(1);
  replaySubject.next(2);
  replaySubject.next(3);
}, 0);

setTimeout(() => {
  replaySubject.next(4);
  replaySubject.next(5);
  replaySubject.next(6);
}, 800);

setTimeout(() => {
  replaySubject.next(7);
  replaySubject.next(8);
  replaySubject.next(9);
}, 1600);

setTimeout(() => {
  replaySubject.subscribe(
    (val: number) => console.log('observer: ' + val)
  );
}, 2000);
// log: observer: 7
// log: observer: 8
// log: observer: 9
See this example in action here: https://plnkr.co/edit/dfCLm3Eh2c39JKOlGLZy?p=preview
Real-world example cases
You’ll mostly need a ReplaySubject if you need to replay a set of actions or values every time you invoke a specific part of your application. For example:
- You could use a ReplaySubject in your application to show the last 5 notifications, every time the user goes back to their dashboard.
- You could use a ReplaySubject in your chat application to show the last 5 messages of a specific user, every time you open up their chatbox.
What is an AsyncSubject?
The AsyncSubject is an alternate implementation of a Subject where only the last value is sent to its observers, and only when the execution completes. This last value is cached forever, and every new Observer subscribing to the AsyncSubject will immediately receive the cached value.
AsyncSubjects can be used as a replacement for Promises. The only difference is that a Promise handles a single value, while an AsyncSubject can receive multiple values but only caches and emits the last one. Remember to complete the AsyncSubject, as this is the caching-enabler!
In the following example the only emitted value to the next function of the observer will be 3, as we complete the subject after the last call to next.
async-subject.ts
import { AsyncSubject } from 'rxjs/AsyncSubject';

let asyncSubject = new AsyncSubject<number>();

asyncSubject.subscribe(
  (val: number) => console.log('observer: ' + val)
);

asyncSubject.next(1);
asyncSubject.next(2);
asyncSubject.next(3);
asyncSubject.complete();
// log: observer: 3
See this example in action here: https://plnkr.co/edit/3QAbza00V36EQv7N5seO?p=preview
Real-world example case
An AsyncSubject can be used in situations where you make multiple requests to the same resource. If your remote service/function uses and returns a singleton instance of the AsyncSubject, repeated calls to this resource will result in only one call to the backend. All the subsequent callers (and subscribers) will be notified with the same result from the singleton AsyncSubject.
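The singleton idea can be sketched as follows. Everything here is hypothetical and only meant as an illustration: TinyAsyncSubject is a simplified stand-in for the RxJS AsyncSubject, fetchConfig pretends to be a backend call, and getConfig is the service function that hands out the shared instance.

```typescript
// Simplified stand-in for illustration only; not the RxJS implementation.
class TinyAsyncSubject<T> {
  private observers: Array<(value: T) => void> = [];
  private hasValue = false;
  private lastValue: T | undefined = undefined;
  private completed = false;

  subscribe(observer: (value: T) => void): void {
    if (this.completed) {
      // Late subscribers immediately receive the cached last value.
      if (this.hasValue) { observer(this.lastValue as T); }
      return;
    }
    this.observers.push(observer);
  }

  next(value: T): void {
    if (this.completed) { return; }
    // Values are only remembered, not emitted, until complete() is called.
    this.hasValue = true;
    this.lastValue = value;
  }

  complete(): void {
    if (this.completed) { return; }
    this.completed = true;
    if (this.hasValue) {
      this.observers.forEach(o => o(this.lastValue as T));
    }
    this.observers = [];
  }
}

// Hypothetical backend call; a real one would do an HTTP request here.
let backendCalls = 0;
function fetchConfig(done: (result: string) => void): void {
  backendCalls++;
  done('config-v1');
}

// The service keeps a single subject and returns it on every call.
let cachedConfig: TinyAsyncSubject<string> | undefined;
function getConfig(): TinyAsyncSubject<string> {
  if (cachedConfig) { return cachedConfig; }
  const subject = new TinyAsyncSubject<string>();
  cachedConfig = subject;
  fetchConfig(result => {
    subject.next(result);
    subject.complete(); // completing is what releases and caches the value
  });
  return subject;
}

const results: string[] = [];
getConfig().subscribe(r => results.push('first: ' + r));
getConfig().subscribe(r => results.push('second: ' + r));

console.log(results, backendCalls);
```

Both callers get the same result, while the fake backend is only hit once.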
More details and examples?
If you know of any other related examples for any type of Subject, let me know! I’ll be happy to add them to the list of examples. If you want to read more about Subjects or RxJS in general, you can read through the extensive readme files on the project’s GitHub repository: https://github.com/ReactiveX/rxjs/tree/5.0.0-beta.10/doc or the documentation on the official website: http://reactivex.io/rxjs/identifiers.html.
Further reading
I hope this article helps you find a solution for your problem. If things still seem a little unclear, you can hire me to help you solve your specific problem or use case. Sometimes even just a quick code review or second opinion can make a great difference.