What's with the Subjects in RxJS 5

Posted on

What's with the Subjects in RxJS 5

RxJS is a great library, it is used more and more in different places & frameworks across the web. Soon it will dominate the asynchronous part of web apps, so that’s why it’s important to know how subjects work.Subjects combine the best of both an Observable and Observer. So you can think of a Subject being an Observable and Observer at the same time.

A Subject is an Observable, but an Observable is unicast. Subjects on the other hand are multicast, which means that  they act like EventEmitters that can handle a set of multiple listeners or subscribers. To facilitate this behaviour, new observers are registered in a list of Observers that all get notified. The subscribers are agnostic about what is sending the updates, as it can be a plain Observable or a Subject emitting the values.

A Subject is an Observer, which means it can also send new updates on next(val), emit errors on error(err) and complete() the stream.

Subjects come in different flavours, like a normal Subject, a BehaviourSubject, a ReplaySubject and an AsyncSubject. We’ll try to explain the differences between all of them and support the explanation with a real-world and readable example.

What is a Subject?

A Subject can be used to subscribe all the observers of that Subject, which is also an Observable. The Subject can subscribe to another async operation or source like an API backend, serving as a proxy for a collection of subscribers and a source.

Example

In the example below, we have two Observers attached to one Subject, and we feed some values to the Subject. Notice that there are different ways for subscribing. You can use an object that defines a function for the next value, or use the 3 different callback functions for next, error and complete:

import {Subject} from 'rxjs/Subject';
let subject = new Subject<number>();

subject.subscribe(
   (val: number) => { console.log('observer I: ' + val) },
   () => {},
   () => {}
);
subject.subscribe({
  next: (val: number) => console.log('observer II: ' + val)
});

subject.next('test')  // ! compile time error !

subject.next(11);
// log: observer I: 11
// log: observer II: 11

subject.next(22);
// log: observer I: 22
// log: observer II: 22

We are using TypeScript in this example so we can provide a type to the Subject. This way we get type-checking on compile-time if we want to send a next value that does not comply with the type!

See this example in action here: https://plnkr.co/edit/nrXf41PNgUdJTMCjR9gG?p=preview

What is a BehaviorSubject?

The BehaviorSubject is different from a normal Subject in the way that it stores the latest emitted value and requires an initial value. When a new subscriber subscribes to the BehaviorSubject, the latest value is immediately send to the new subscriber. If there is no value emitted yet, the default value will be returned. Once a BehaviorSubject is completed it will free up the memory. Newly subscribed Observers will not receive the latest value.

Example

We give the BehaviorSubject an initial value of 11. The first Observer that subscribes to the subject gets immediately notified of the initial value. The same is true for the next Observer.

import {BehaviorSubject} from 'rxjs/BehaviorSubject';
let behaviorSubject = new BehaviorSubject<number>(11);

behaviorSubject.subscribe(
   (val: number) => console.log('observer I: ' + val)
);
// log: observer I: 11

behaviorSubject.subscribe({
   next: (val: number) => console.log('observer II: ' + val)
});
// log: observer II: 11

From this point on the behavior of the BehaviorSubject is the same as a normal Subject. But if we subscribe a new Observer to the BehaviorSubject it will immediately receive the last emitted value!

behaviorSubject.next(22);
// log: observer I: 22
// log: observer II: 22

behaviorSubject.next(33);
// log: observer I: 33
// log: observer II: 33

behaviorSubject.subscribe({
   next: (val: number) => console.log('observer III: ' + val)
});
// observer III: 33

See this example in action here: https://plnkr.co/edit/HPNSAzFmLipvHQ96ZkFt?p=preview

Real world examples cases

You’ll want to use a BehaviorSubject if you always need an initial value. This initial value can serve as a temporary placeholder while you wait for the first, of a possible sequence, real value. For example:

  • You are fetching the contents of a file and you want to show an initial placeholder, stating that the file is loading.
  • Your chatbox shows a welcome message before you effectively start receiving real values.

What is a ReplaySubject?

A ReplaySubject is kind of similar to a BehaviorSubject. It can send previous values to new attached Subscribers, but it can also record a part of the Observable execution. The ReplaySubject records multiple values from the Observable execution and replays them to any new Subscribers attached.

The first parameter in the constructor when creating a ReplaySubject defines how many values to replay. The second parameter specifies the window time in miliseconds that, besides the buffer size, determines the maximum lifetime of recorded values.

Example

In the following example we use a rather big buffer size of 25, but the window time parameter invalidates the recorded items in the buffer after 1000ms or 1 second. So if we emit some values using a specific timeline of 3 points every 800ms, only the values emitted in the last timeframe are replayed, as all other values were invalidated after 2000ms. The first batch after 1000ms and the second batch after 2000ms in the complete timeframe.

import {ReplaySubject} from 'rxjs/ReplaySubject';
let replaySubject = new ReplaySubject<number>(25, 1000);

setTimeout(() => {
  replaySubject.next(1);
  replaySubject.next(2);
  replaySubject.next(3);
}, 0);

setTimeout(() => {
  replaySubject.next(4);
  replaySubject.next(5);
  replaySubject.next(6);
}, 800);

setTimeout(() => {
  replaySubject.next(7);
  replaySubject.next(8);
  replaySubject.next(9);
}, 1600);

setTimeout(() => {

  replaySubject.subscribe(
    (val: number) => console.log('observer: ' + val)
  );

}, 2000);
// log: observer: 7
// log: observer: 8
// log: observer: 9

See this example in action here: https://plnkr.co/edit/dfCLm3Eh2c39JKOlGLZy?p=preview

Real world examples cases

You’ll mostly need a ReplaySubject if you need to replay a set of actions or values every time you invoke a specific part of your application. For example:

  • You could use a ReplaySubject in your application to show the last 5 notifications, every time the user goes back to his dashboard.
  • You could use a ReplaySubject in your chat application to show the last 5 messages of a specific user, every time you open up his chatbox.

What is an AsyncSubject?

The AsyncSubject is an alternate implementation of a Subject where only the last value is sent to its observers, and only when the execution completes. This last value is cached forever and every new Observer subscribing to the AsyncSubject will immediately receive the cached value.

AsyncSubjects can be used a replacement for Promises. The only difference is that a Promise handles a single value, and AsyncSubjects handle multiple values, only caching and emitting the last value. Remember to complete the AsyncSubject, as this is the caching-enabler!

In the following example the only emitted value to the next function of the observer will be 3, as we complete the subject after the last call to next.

import {AsyncSubject} from 'rxjs/Subject';
let asyncSubject = new AsyncSubject<number>();

asyncSubject.subscribe(
  (val: number) => console.log('observer: ' + val)
);

asyncSubject.next(1);
asyncSubject.next(2);
asyncSubject.next(3);

asyncSubject.complete();
// log: observer: 3

See this example in action here: https://plnkr.co/edit/3QAbza00V36EQv7N5seO?p=preview

Real world examples case

An AsyncSubject can be used in situations where you do multiple requests to a resource. If your remote service/function uses and returns a singleton implementation of the AsyncSubject, subsequent calls to this resource will only invoke one call to the backend. All the subsequent calls (and subscribers) will be notified with the same result of the singleton AsyncSubject.

More details and examples?

If you know of any other related examples for any type of Subject, let me know! I’ll be happy to add them in the list of examples. If you want to read more about Subjects or RxJS in general you can read trough the extensive readme files on the project’s Github repository: https://github.com/ReactiveX/rxjs/tree/5.0.0-beta.10/doc or on the official website’s documentation: http://reactivex.io/rxjs/identifiers.html.


Leave some feedback please!

Do you have anything to add? Or have some other remarks? Please let me know in the comment section below! Looking forward to the discusion!