A Comprehensive Introduction to RxJS

In this series, Rob, a senior machine-learning solutions architect from our advanced analytics team, will introduce you to RxJS (a TypeScript library that he loves), the problem space it addresses, and its key features. Once the basics are out of the way, he’ll dig deeper into the usage of its primary components and explore its utility operators in future posts, with the ultimate goal of covering all of the core functionality.
Take it away, Rob.
I’m excited to share the world of RxJS with you. If you do much asynchronous programming, you don’t need me to tell you that it’s challenging; I’m sure you have the battle scars to prove it. It can often be hard to reason about the interaction between pieces of async code that share state. If, like me, you have needed to poll, monitor a server API, or work with a WebSocket, and you’ve used setTimeout(), Promise, or callbacks to do so, read on: the future is bright(er).
Races in real life are exciting. Races in your code, however, are infuriating.
// Classic example of a race condition with promises.
import * as https from "https";
// Hypothetical endpoint; substitute your own API.
const API_BASE = "https://api.example.com/data";
let theData: any = {};
function getDataForId(id: number): Promise<any> {
  return new Promise((resolve, reject) => {
    https
      .get(`${API_BASE}/${id}`, (resp) => {
        let body = "";
        resp.on("data", (chunk) => (body += chunk));
        resp.on("end", () => resolve(JSON.parse(body)));
      })
      .on("error", reject);
  });
}
// What if this one returns in 5 seconds, ...
getDataForId(123).then((serverData) => (theData = serverData));
// ... and this one returns immediately? ...
getDataForId(456).then((serverData) => (theData = serverData));
// ...A few moments later...
// ... theData will contain the result from 123, when what we actually wanted was the data from 456.
setTimeout(() => console.log(theData), 6000);
// {id: 123, payload: "hello world"}
// Undesired, and possibly unexpected!
In this (admittedly contrived) example, we see an undesired result. We initiated the call with 456 after the call with 123, but something happened server-side and the calls returned out of sequence: the response for 123 arrived last and overwrote the one for 456. We could work around this with some extra code, but the more code we add, the more can go wrong, and the more we need to test and maintain.
Now, let’s look at the same scenario solved with RxJS.
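// Example of a race condition avoided with switchMap.
import { Subject } from "rxjs";
import { switchMap, tap } from "rxjs/operators";
import { HttpClient } from "@angular/common/http";
// Assumes an Angular HttpClient instance (normally injected by the framework)
// and a hypothetical endpoint.
declare const http: HttpClient;
let theData: any = {};
const getDataForId = new Subject<number>();
getDataForId
  .pipe(
    // switchMap unsubscribes from (and so cancels) the previous request.
    switchMap((id) =>
      http.get(`https://api.example.com/data/${id}`).pipe(
        tap((serverData) => (theData = serverData))
      )
    )
  )
  .subscribe();
// Starts, would return in 5 seconds, but is cancelled when...
getDataForId.next(123);
// ... this is called, because switchMap is protecting us. ...
getDataForId.next(456);
// ...A few moments later...
// ... theData will always contain the result of the call to 456.
setTimeout(() => console.log(theData), 6000);
// {id: 456, payload: "goodbye world"}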
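In roughly the same number of lines, and with roughly the same calling interface, I have solved the race condition. The switchMap() operator automatically unsubscribes from the current request when a new value arrives and switches to the new one, avoiding the race altogether (with Angular’s HttpClient, unsubscribing also aborts the underlying HTTP request).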
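To see the cancellation without a real server, here is a minimal sketch that replaces the two HTTP calls with hand-driven Subject instances (fakeResponses and requestedIds are hypothetical names used only for this illustration). Because we decide when each "response" arrives, we can deliver 456’s reply first and 123’s late reply afterwards:

```typescript
import { Subject } from "rxjs";
import { switchMap } from "rxjs/operators";

// Hypothetical stand-ins for two in-flight HTTP calls; we push their
// "responses" by hand to control the order in which they arrive.
const fakeResponses = new Map<number, Subject<string>>([
  [123, new Subject<string>()],
  [456, new Subject<string>()],
]);

const requestedIds = new Subject<number>();
const received: string[] = [];

requestedIds
  // switchMap unsubscribes from the previous inner observable
  // whenever a new id arrives.
  .pipe(switchMap((id) => fakeResponses.get(id)!))
  .subscribe((data) => received.push(data));

requestedIds.next(123); // "request" for 123 starts
requestedIds.next(456); // 123 is dropped, 456 starts

fakeResponses.get(456)!.next("data for 456");
fakeResponses.get(123)!.next("data for 123"); // late reply: nobody is listening

console.log(received); // received holds only "data for 456"
```

Everything here runs synchronously, which keeps the ordering easy to follow; with real requests the same unsubscribe happens, just spread out over time.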
This, of course, is not the only place that RxJS shines. I also use it quite a bit for another common source of async data: user input. Look at how succinctly a typeahead can be managed with RxJS.
import { concat, Observable, of, Subject } from "rxjs";
import { catchError, debounceTime, distinctUntilChanged, filter, switchMap, tap } from "rxjs/operators";
// Assumes a TypeaheadSuggestions type and a lookupService whose getByTerm(term)
// returns Observable<TypeaheadSuggestions[]> are defined elsewhere in the app.
//
// The list of suggestions to display. Here I'm using a complex object,
// but this could just as easily have been Observable<string[]>.
//
let typeahead$: Observable<TypeaheadSuggestions[]>;
// Display a spinner?
//
let isSpinnerActive = false;
// The input from the user. See `onInput()` below.
//
const typeaheadInput$: Subject<string> = new Subject<string>();
typeahead$ = concat(
  // Start off with an empty list of results...
  of<TypeaheadSuggestions[]>([]),
  // ... and concatenate it to a Subject that will receive user input (see `onInput()`) ...
  typeaheadInput$.pipe(
    // ... do not search until we have at least 3 characters ...
    filter((searchTerm) => searchTerm?.length >= 3),
    // ... and there has been a 200 ms pause in input ...
    debounceTime(200),
    // ... and the value has changed since we last saw it ...
    distinctUntilChanged(),
    // ... start a spinner to show the user we are searching ...
    tap(() => (isSpinnerActive = true)),
    // ... cancel the last search if it's not done yet ...
    switchMap((searchTerm) =>
      // Call the service to search (it returns Observable<TypeaheadSuggestions[]>)...
      lookupService.getByTerm(searchTerm).pipe(
        // ... if there was an error, show no typeahead results ...
        catchError(() => of([])),
        // ... turn off the spinner ...
        tap(() => (isSpinnerActive = false))
        // ... when we're here we have either the results or an empty result set from catchError.
      )
    )
  )
);
function onInput(inputTerm: string) {
  typeaheadInput$.next(inputTerm);
}
Here I have written a fairly feature-rich typeahead in about 20 lines of code (not counting comments) that is clear to my colleagues and to my future self.
I have shown a couple of simple cases where I find RxJS to be a superior solution. While I admit that these cases are straightforward, what I continue to find is that even as the complexity of the problem increases, RxJS remains the better choice. This is especially true when it is paired with frameworks and libraries built on it, such as Angular and NgRx.
Let’s now take a step back and examine what RxJS is.
RxJS is an open-source library, written in TypeScript, for working in an event-driven, functional programming paradigm.
With an entity called Observable, RxJS fills a hole that Promise leaves in the asynchronous problem space: getting more than one value asynchronously. (If you’re wondering, yes, an Observable can work with a single value too.) I like to think of the values that an Observable produces as an “array of values over time,” but I am getting a bit ahead of myself. I’ll get into these details, and many more, soon.
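A minimal sketch of that difference, using nothing beyond RxJS itself: a Promise settles once, while an Observable can keep delivering values until it completes. Collecting the Observable’s emissions with the toArray() operator makes the “array of values over time” idea concrete.

```typescript
import { Observable } from "rxjs";
import { toArray } from "rxjs/operators";

// A Promise settles only once; later resolve() calls are ignored.
const singleValue = new Promise<number>((resolve) => {
  resolve(1);
  resolve(2); // no effect
});

// An Observable can emit any number of values before it completes.
const manyValues = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

let collected: number[] = [];
// toArray() gathers every emission and emits them as one array on completion.
manyValues.pipe(toArray()).subscribe((values) => (collected = values));

singleValue.then((value) => console.log("Promise resolved with", value)); // logs 1
console.log("Observable emitted", collected); // collected holds 1, 2 and 3
```

This Observable emits synchronously for simplicity; an Observable fed by a timer or a socket would deliver the same kind of sequence spread out over time.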
RxJS is Event-Driven
If you write modern applications, you undoubtedly work in all kinds of asynchronous situations. Two common examples are taking streaming updates from WebSockets and capturing user input. As a programmer, you cannot always force these events to happen precisely when you want them. While async capabilities unlock some of the essential features of modern web applications, the issues they introduce can be some of the most challenging to reason about and debug.
In the same way that Promise makes getting a single value (or a single set of values) asynchronously a first-class citizen, Observable makes working with streaming asynchronous data a first-class citizen. We “hook” our observable to the source of the asynchronous data, declaratively define what to do with the data once it arrives, and RxJS handles the rest. There are even ready-made functions that make this “hooking” trivial, for instance webSocket() and fromEvent().
import { webSocket } from "rxjs/webSocket";
// The application's own message and error handlers (assumed to exist elsewhere).
declare function handleWebsocketMessage(message: unknown): void;
declare function handleWebsocketError(error: unknown): void;
// Create a WebSocketSubject. The connection is opened when we first subscribe.
//
const webSocketHandler = webSocket("ws://localhost:8081");
// We get data from the server by registering callbacks in the subscribe call.
//
webSocketHandler.subscribe({
  // Called each time we get a message from the server.
  next: (message) => handleWebsocketMessage(message),
  // Called if at any point the WebSocket API signals some kind of error.
  error: (error) => handleWebsocketError(error),
  // Called when the connection is closed (for whatever reason).
  complete: () => console.log("complete"),
});
// We send data to the server with the built-in methods: next, complete, and error.
//
webSocketHandler.next({ message: "This is a message back to the server." });
// If we would like to gracefully close the connection to the server, we use complete.
//
webSocketHandler.complete();
// Alternatively, to close the connection because of an error, we use error instead.
// It expects an object with a close code (1000, or 3000-4999 for applications) and an optional reason.
//
// webSocketHandler.error({ code: 4000, reason: "An error occurred!" });
Above, I have a bidirectional data source in fewer than 10 lines of code that can feed my application with continuous updates. More important than the simple structure, though, is that I can now incorporate these updates into the rest of my RxJS code. This matters because RxJS is highly composable.
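The fromEvent() function mentioned earlier hooks up a source just as easily, and its output composes with any other operator. Here is a small sketch that runs in Node.js, assuming a plain EventEmitter in place of a DOM element (the "message" event name and the uppercase transformation are arbitrary choices for illustration):

```typescript
import { EventEmitter } from "events";
import { fromEvent } from "rxjs";
import { filter, map } from "rxjs/operators";

// fromEvent accepts DOM EventTargets as well as Node-style emitters
// (objects with addListener/removeListener).
const emitter = new EventEmitter();
const shouted: string[] = [];

const subscription = fromEvent(emitter, "message")
  .pipe(
    map((payload) => String(payload)),
    filter((text) => text.length > 0), // ignore empty messages
    map((text) => text.toUpperCase())
  )
  .subscribe((text) => shouted.push(text));

emitter.emit("message", "hello");
emitter.emit("message", "");
emitter.emit("message", "world");

// Unsubscribing removes the underlying listener from the emitter.
subscription.unsubscribe();
emitter.emit("message", "ignored");

console.log(shouted); // shouted holds "HELLO" and "WORLD"
```

The same pipeline would work unchanged on messages coming from a webSocket() subject, which is the composability payoff.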
RxJS uses Declarative, Composable Programming
Unlike procedural code written in an imperative style, which describes control flow and maintains state step by step, RxJS employs a reactive, functional programming paradigm. This style of programming focuses more on what should be done to solve a problem, and less on exactly how it’s done.
RxJS primarily does this in two ways. First, it is reactive: code is typically set up ahead of time and waits to be invoked by an external event. Second, its library of operators (read: functions) can transform values in a pipeline as needed, in a functional manner.
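To make the contrast concrete, here is a small sketch (the task itself, summing the squares of the even numbers in a list, is just an illustration). The imperative version manages a loop and a running total by hand, while the RxJS version only declares the steps:

```typescript
import { from } from "rxjs";
import { filter, map, reduce } from "rxjs/operators";

const numbers = [1, 2, 3, 4, 5, 6];

// Imperative: we spell out the control flow and mutate state ourselves.
let imperativeTotal = 0;
for (const n of numbers) {
  if (n % 2 === 0) {
    imperativeTotal += n * n;
  }
}

// Declarative: we describe what happens to each value; RxJS drives the iteration.
let declarativeTotal = 0;
from(numbers)
  .pipe(
    filter((n) => n % 2 === 0),
    map((n) => n * n),
    reduce((sum, square) => sum + square, 0)
  )
  .subscribe((total) => (declarativeTotal = total));

console.log(imperativeTotal, declarativeTotal); // 56 56
```

For a fixed array the two are equivalent; the declarative form pays off when the values arrive over time from an event source, because the pipeline does not change.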
The functional structure of RxJS’s pipe() method helps me compartmentalize units of work. pipe takes the source Observable and passes it through each operator given as an argument, feeding the output of each operator in as the input of the next. In this manner, we can create a pipeline of operations to perform on input data. You can focus on one task or transformation at a time while keeping the ultimate goal in mind, since it is what comes out at the end of the pipe.
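Because each operator is just a function from one Observable to another, source.pipe(a, b, c) behaves like calling c(b(a(source))). The sketch below uses plain RxJS with arbitrary sample operators and checks that both forms produce the same values; pipe simply lets us read the steps left to right.

```typescript
import { Observable, of } from "rxjs";
import { filter, map, take } from "rxjs/operators";

const source = of(1, 2, 3, 4, 5);

// Each operator call returns a function: Observable in, Observable out.
const double = map((x: number) => x * 2);
const overFour = filter((x: number) => x > 4);
const firstTwo = take<number>(2);

// With pipe: operators are applied left to right.
const viaPipe: Observable<number> = source.pipe(double, overFour, firstTwo);

// Nesting the same functions by hand builds the same pipeline, inside out.
const viaNesting: Observable<number> = firstTwo(overFour(double(source)));

const fromPipe: number[] = [];
const fromNesting: number[] = [];
viaPipe.subscribe((x) => fromPipe.push(x));
viaNesting.subscribe((x) => fromNesting.push(x));

console.log(fromPipe, fromNesting); // both hold 6 and 8
```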
Let us take a moment to look at one of the TypeScript overload signatures of pipe:
pipe<A, B, C>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>): Observable<C>;
Here I picked the overload of pipe that takes three operators (T is the type of value emitted by the source observable). If we look closely, we see that each operator’s output type is the same as the input type of the next operator, and that pipe returns an Observable of the last operator’s output type. I hope this makes it clearer how the pipe method works: it is merely there to move the observable through the pipeline of operators. We will get into much more detail in a future article when we dissect Observable.
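The same type rules apply to operators we write ourselves. As a sketch, here are two hypothetical custom operators (multiplyBy and toLabel are illustrative names, not part of RxJS) built from map. The compiler only accepts them in a pipe when each output type matches the next input type.

```typescript
import { MonoTypeOperatorFunction, of, OperatorFunction } from "rxjs";
import { map } from "rxjs/operators";

// Hypothetical operator: number in, number out.
function multiplyBy(factor: number): MonoTypeOperatorFunction<number> {
  return map((x: number) => x * factor);
}

// Hypothetical operator that changes the type: number in, string out.
function toLabel(prefix: string): OperatorFunction<number, string> {
  return map((x: number) => `${prefix}${x}`);
}

const labels: string[] = [];
of(1, 2, 3)
  .pipe(
    multiplyBy(10), // number -> number
    toLabel("item-") // number -> string, so pipe returns Observable<string>
  )
  .subscribe((label) => labels.push(label));

console.log(labels); // labels holds "item-10", "item-20", "item-30"

// Reversing the order would not compile: toLabel emits strings,
// but multiplyBy expects numbers.
// of(1, 2, 3).pipe(toLabel("item-"), multiplyBy(10));
```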
The use of the pipe method was demonstrated more thoroughly in the real-world typeahead example above, so let’s look at a simplified example.
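import { interval, Observable } from "rxjs";
import { filter, map, switchMap, tap, withLatestFrom } from "rxjs/operators";
// Hypothetical stand-ins: the user's latest input, and a service whose
// updateServer() returns an Observable of the server's response.
declare const userInput$: Observable<string>;
declare const myService: { updateServer(x: number, input: string): Observable<{ response: string }> };
interval(1000)
  .pipe(
    tap(console.log), // Outputs 0, 1, 2, 3 and so on, every second.
    map((x) => x * 3),
    tap(console.log), // Outputs 0, 3, 6, 9 and so on, every second.
    filter((x) => x % 2 === 0),
    tap(console.log), // Outputs 0, 6, 12, 18 and so on, every two seconds.
    withLatestFrom(userInput$),
    tap(console.log), // Outputs e.g. [6, "Hello world!"], [12, "Hello world!"], [18, "World, hello?"] (once userInput$ has emitted).
    switchMap(([x, userInput]) => myService.updateServer(x, userInput)),
    tap(console.log) // Outputs {response: 'success'}, {response: 'success'} and so on.
  )
  .subscribe();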
First, note that the tap operator’s job is to let us perform side effects. That is to say, it passes along whatever it receives, unchanged, regardless of what the side effect does.
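A quick sketch of that pass-through behavior: whatever the callback given to tap returns is discarded, so the values downstream are unchanged. The seen array exists only to record the side effect.

```typescript
import { of } from "rxjs";
import { tap } from "rxjs/operators";

const seen: number[] = [];
const output: number[] = [];

of(1, 2, 3)
  .pipe(
    // push() returns the array's new length; tap ignores that return value.
    tap((x) => seen.push(x))
  )
  .subscribe((x) => output.push(x));

console.log(seen, output); // both hold 1, 2, 3
```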
To start our process, we need a simple data source, so we use the interval creation function. interval creates an Observable that emits an incrementing integer, starting at 0, every n milliseconds. Then we use the map operator to transform the data. Many of us will be familiar with the map function from mapping over arrays, and this map works in the same way. The story is the same with filter: it’s analogous to filter from Array.
The next operator, however, has no analog in the array world. The withLatestFrom operator takes the most recent value from another observable and emits an array consisting of the input value (in this case our integer, multiplied by 3 and filtered to only evens) and the latest value from the other observable (in this case the string “Hello world!”, and in the last emission, “World, hello?”). This shows that the whole process reacts to external inputs: between processing these messages, the user has changed the input value.
Finally, we make an external call to a service with the switchMap operator. This operator has two jobs. Its primary job is to flatten the result of the service call: the service returns an observable, and we want the value it emits, not the observable itself. switchMap also cancels the pending service call if a newer value arrives before that call is complete.
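Because withLatestFrom is the least familiar operator in this pipeline, a small synchronous sketch may help. Two hand-driven Subject instances (ticks$ and userInput$, hypothetical names) stand in for interval and the user’s input. Note that a value arriving before the other stream has emitted anything is dropped, and a new input on its own does not produce an emission.

```typescript
import { Subject } from "rxjs";
import { withLatestFrom } from "rxjs/operators";

const ticks$ = new Subject<number>();
const userInput$ = new Subject<string>();
const pairs: Array<[number, string]> = [];

ticks$
  .pipe(withLatestFrom(userInput$))
  .subscribe((pair) => pairs.push(pair));

ticks$.next(0); // dropped: userInput$ has not emitted yet
userInput$.next("Hello world!");
ticks$.next(6); // [6, "Hello world!"]
ticks$.next(12); // [12, "Hello world!"]
userInput$.next("World, hello?"); // does not emit by itself
ticks$.next(18); // [18, "World, hello?"]

console.log(pairs.length); // 3
```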
Drawbacks
While I enjoy using RxJS and am pleased with the succinct, grokable code I end up with, there are some drawbacks. I will try to be as forthcoming about them as I can. There’s little in the programming world worse than converting to a new library, only to discover its shortcomings too late and be stuck in limbo. I’d rather you be able to make a fully informed decision upfront.
Learning curve
The learning curve for RxJS is steep in my experience, but the payoff in async capability is worth it. Mostly because of the functional nature of RxJS, there is a bit of mental friction to just “picking it up and running with it.” In the beginning, RxJS can be challenging to reason about because it’s not what we’re used to, but this can be somewhat mitigated if you have used the functional programming paradigm before.
Another big mental hurdle is understanding the so-called “flattening operators.” Flattening operators take a higher-order observable (that is, an observable of observables) and “flatten” it into a single observable. Common examples include mergeMap(), switchMap(), and exhaustMap(). In the beginning, these are hard to wrap our minds around because there is not only the flattening aspect (many of us have flattened arrays) but also a time component: when a value arrives plays a part in what we do with it. Do we want to interrupt our current operation when a new value arrives? Or perhaps we want the new value to “wait” until we finish the current operation? Or should it simply be ignored while we are busy?
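One way to build intuition is to feed the same outer stream into each flattening operator and compare what comes out. In this sketch, each outer value selects a hand-driven inner Subject standing in for a request, and we push values and completions in a fixed order. concatMap is not named above, but it is included because it is the operator that makes a new value “wait” for the current one. The run helper and the A/B streams are hypothetical, for illustration only.

```typescript
import { Observable, OperatorFunction, Subject } from "rxjs";
import { concatMap, exhaustMap, mergeMap, switchMap } from "rxjs/operators";

// Builds a flattening operator from a project function (key -> inner stream).
type MakeFlattener = (
  project: (key: string) => Observable<string>
) => OperatorFunction<string, string>;

function run(makeFlattener: MakeFlattener): string[] {
  const outer = new Subject<string>();
  // One hand-driven inner stream per outer value, like a pending request.
  const a = new Subject<string>();
  const b = new Subject<string>();
  const inner: Record<string, Subject<string>> = { A: a, B: b };
  const out: string[] = [];
  outer.pipe(makeFlattener((key) => inner[key])).subscribe((v) => out.push(v));

  outer.next("A"); // start "request" A
  outer.next("B"); // B arrives while A is still pending
  a.next("A1");
  b.next("B1");
  a.complete(); // A finishes
  b.next("B2");
  b.complete();
  return out;
}

// Both inner streams run at once: A1, B1, B2.
const merged = run((project) => mergeMap(project));
// A was cancelled when B arrived: B1, B2.
const switched = run((project) => switchMap(project));
// B was ignored because A was still busy: A1.
const exhausted = run((project) => exhaustMap(project));
// B waited for A to complete; B1 was emitted before concatMap subscribed to B: A1, B2.
const concatenated = run((project) => concatMap(project));

console.log({ merged, switched, exhausted, concatenated });
```

The concatMap result shows a subtlety worth remembering: a "hot" inner source such as a Subject can emit values before the flattening operator gets around to subscribing to it.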
Debugging & Testing
Similarly, debugging can be challenging at times. You’ll quickly be tempted to fall back on sprinkling tap(console.log) between each of the steps in your pipeline. Understanding the broad array of operators at your disposal, and all of their return structures, can be a bit overwhelming in the beginning.
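If tap(console.log) does end up everywhere, one option is to wrap it in a small labeled logging operator. The sketch below is a hypothetical helper (debug is not an RxJS export). It uses tap’s observer form to log next, error, and complete notifications, and it takes the logging function as a parameter, defaulting to console.log, so the output can be silenced or captured as in this usage.

```typescript
import { MonoTypeOperatorFunction, of } from "rxjs";
import { map, tap } from "rxjs/operators";

// Hypothetical helper: logs every notification with a label and passes values through.
function debug<T>(
  label: string,
  log: (...args: unknown[]) => void = console.log
): MonoTypeOperatorFunction<T> {
  return tap<T>({
    next: (value) => log(`[${label}] next:`, value),
    error: (err) => log(`[${label}] error:`, err),
    complete: () => log(`[${label}] complete`),
  });
}

// Capture log lines instead of printing them.
const lines: string[] = [];
const capture = (...args: unknown[]) => lines.push(args.map(String).join(" "));

of(1, 2)
  .pipe(
    debug<number>("source", capture),
    map((x) => x * 10),
    debug<number>("after map", capture)
  )
  .subscribe();

console.log(lines.join("\n"));
```

Labeling each step makes it easy to see where a value changes or disappears, and the helper can be removed, or pointed at a no-op logger, without touching the rest of the pipeline.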
Testing has its own DSL (domain-specific language), commonly known as marble testing. While interesting and novel, it feels very finicky and has some unexpected or unnatural nuances. Even after you are sure that your expected results are correct, tweaking the timing can be a fight.
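For a taste of that DSL, here is a sketch that uses the TestScheduler from rxjs/testing to check the input half of a typeahead-style pipeline in virtual time, with no real waiting. The searchTerms function, the marble strings, and the 3 ms debounce are illustrative choices, and the run() API shown is available in RxJS 6 and later. To stay independent of any particular test runner, the comparison callback reduces each recorded message to its frame, kind, and value.

```typescript
import { Observable } from "rxjs";
import { debounceTime, distinctUntilChanged, filter } from "rxjs/operators";
import { TestScheduler } from "rxjs/testing";

// Hypothetical: the input half of a typeahead, with a configurable debounce.
function searchTerms(input$: Observable<string>, debounceMs: number): Observable<string> {
  return input$.pipe(
    filter((term) => term.length >= 3),
    debounceTime(debounceMs),
    distinctUntilChanged()
  );
}

// Reduce each recorded message to [frame, kind, value] for comparison.
const summarize = (messages: any[]) =>
  JSON.stringify(messages.map((m) => [m.frame, m.notification.kind, m.notification.value]));

let comparisons = 0;
let failures = 0;
const scheduler = new TestScheduler((actual, expected) => {
  comparisons++;
  if (summarize(actual) !== summarize(expected)) {
    failures++;
    console.error("expected", summarize(expected), "but got", summarize(actual));
  }
});

scheduler.run(({ cold, expectObservable }) => {
  // One character per virtual millisecond: "-" is idle time, "|" is completion.
  const input = cold("abc---d---|", { a: "rx", b: "rxj", c: "rxjs", d: "rxjs" });
  // "rx" is too short, "rxj" is debounced away by "rxjs",
  // and the second "rxjs" is dropped as a duplicate.
  expectObservable(searchTerms(input, 3)).toBe("-----c----|", { c: "rxjs" });
});

console.log(failures === 0 ? "marble test passed" : "marble test failed");
```

Getting the frame positions right (for example, that "c" lands at frame 5 because the debounce restarts at frame 2) is exactly the kind of timing fiddling described above.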
The good news is that I intend to address all of these drawbacks throughout this series and hope to mitigate them by showing you how I handle them.
Closing
Thanks for spending your time with Rob. We hope you’ve enjoyed this high-level overview of RxJS. Stay tuned for future articles in the series, which will be much more detail-oriented as we dive into the RxJS code itself to see what mysteries we can illuminate.