A Comprehensive Introduction to RxJS

// Classic example of a race condition with promises.

let theData: any = {};

function getDataForId(id: number): Promise {
return new Promise((resolve, reject) => {
https.get(/*http call here*/, (resp) => {
// data fetch impl...
})
});
}

// What if it returns in 5 seconds, ...
getDataForId(123).then(serverData => theData = serverData);

// ... and this returns immediately? ...
getDataForId(456).then(serverData => theData = serverData);

// ...A few moments later...

// ... theData will contain the result from 123, when what we actually wanted the data from 456.
console.log(theData);
// {id: 123, payload: "hello world"}
// Undesired, and possibly unexpected!
// Example of a Race condition avoided with switchMap.

let theData: any = {};

const getDataForId = Subject().pipe(
switchMap(
HttpClient().get(/*http call here*/).pipe(
tap(serverData => theData = serverData)
)
)
);

// Starts, would return in 5 seconds, but is cancelled when...
getDataForId.next(123);

// ... this is called. Because SwitchMap is protecting us. ...
getDataForId.next(456);

// ...A few moments later...

// ... theData will always contain the result of the call to 456.
console.log(theData);
// {id: 456, payload: "goodbye world"}
// The list of suggestions to display. Here I'm using a complex object,
// but this could just as easily been Observable<string[]>.
//
let typeahead$: Observable<TypeaheadSuggestions[]>;
// Display a spinner?
//
let isSpinnerActive = false;
// The input from the user. See `onInput()` below
//
const typeaheadInput$: Subject<string> = new Subject<string>();

typeahead$ = concat<TypeaheadSuggestions[]>(
// Start off with an empty list of results...
of([]),
// ... and concatenate it to a Subject that will receive user input (see `onInput()`) ...
typeaheadInput$.pipe(
// ... do not search until we have at least 3 characters ...
filter((searchTerm) => searchTerm?.length >= 3),
// ... and there has been a 200 ms pause in input ...
debounceTime(200),
// ... and the value has changed since we last saw it ...
distinctUntilChanged(),
// ... start a spinner to show the user we are searching ...
tap(() => (isSpinnerActive = true)),
// ... cancel the last search if it's not done yet ...
switchMap((searchTerm) =>
// Call the service to search (it returns Observable<TypeaheadSuggestions[]>)...
lookupService.getByTerm(searchTerm).pipe(
// ... if there was an error, show no typeahead results ...
catchError(() => of([])),
// ... turn off the spinner ...
tap(() => (isSpinnerActive = false))
// ... when we're here we have either the results or an empty result set from catchError.
)
)
)
);

function onInput(inputTerm: string) {
typeaheadInput$.next(inputTerm);
}
import { webSocket } from "rxjs/webSocket";

// Create a new Websocket and connect it.
//
const webSocketHandler = webSocket("ws://localhost:8081");

// We get data from the server by registering callbacks in the subscribe call.
//
webSocketHandler.subscribe(
// This will be called each time we get data from the server.
//
(message) => handleWebsocketMessage(message),
// This will be called if there was an error from the server.
//
(error) => handleWebsocketError(error), // Called if at any point WebSocket API signals some kind of error.
// This will be called when the connection closes.
//
() => console.log("complete") // Called when connection is closed (for whatever reason).
);

// We send data to the server by using built in methods, next, complete, error.
//
webSocketHandler.next({ message: "This is a message back to the server." });

// If we would like to gracefully close the connection to the server we use complete.
//
webSocketHandler.complete();

// If we need to close the connection because of an error we can use error.
//
websocketHandler.error({ errorCode: -1, errorMessage: "An error occurred!" });
pipe<A, B, C>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>): Observable<C>;
interval(1000).pipe(
tap(console.log), // Outputs 1, 2, 3 and so on, every second.
map((x) => x * 3),
tap(console.log), // Outputs 3, 6, 9 and so on, every second.
filter((x) => x % 2)
tap(console.log), // Outputs 6, 12, 18 and so on, every second.
withLatestFrom(userInput$),
tap(console.log), // Outputs [6, "Hello world!"], [12, "Hello world!"], [18, "World, hello?"] and so on, every second.
switchMap(([x, userInput]) => myService.updateServer(x, userInput))
tap(console.log) // Outputs "{response: 'success'}, {response: 'success'}, {response: 'success'} and so on, every second.
);

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store