Learn rxjs in an easy way

date
Oct 22, 2022
slug
learn-rxjs-easy-way
status
Published
tags
rxjs
summary
type
Post
Unlike Object-oriented programming (where states and behaviors are coupled together into an object), Reactive programming is a different programming mindset. It focuses on reacting to changes in a pure-function way.
If you have ever used excel you already know what reactive means. In excel, if a cell changes then other cells that are related to this cell also change.
Reactive programming is a programming paradigm that has been implemented in JavaScript through a library called RxJS. Other similar packages include Zen Observable and Bacon.js. This paradigm has also been implemented in other programming languages such as Java with RxJava and Reactor, and in .NET with Rx.NET. For more information on reactive programming implementations in different languages, you can refer to this page (ReactiveX — Languages).
There are two main concepts in rxjs:
  1. Observer: an object that has three key-value pairs. The key is one of error/complete/next, the value is just a function.
2. Observable: a function that acts like a bridge between an observer and data producers
3. Data producers: anything that produces data can be considered a data producer, whether it’s an HTTP request or a simple Math.random() function
It’s important to note that data producers can be either inside or outside of the observable when the data is observed using observables.
  1. If the data producer is inside the observable, then each time the data is observed by a different entity, the data will be different. This type of observable is called a cold observable.
  1. On the other hand, if the data producer is outside of the observable, then each time the data is observed by a different entity, the data will still be the same. This type of observable is called a hot observable.
Before we talk more about rxjs, I would like to list some of the common problems we encounter every day, and then I will show you how rxjs solved those issues with magic operators and observables.
Problem 1: If there is an input box, each time users entered something, it will trigger an api call to get data from the backend. How do you limit the rate of the api call?
Problem 2: In the same scenario, after we implemented the api rate limiting, we could still receive a bunch of api requests. How could we schedule these api calls? Do you want to
  1. perform these calls in parallel but may lose the orders of the API call?
  1. put the calls in a queue and call them one by one while still keeping the original order?
  1. cancel the first api call if it is not finished and immediately call the next one?
  1. call the first one but ignore all the incoming requests until the first api call finishes?
Problem 3: There is one api call that returns some data which is not updating very frequently. How could you cache the api result and how do you invalidate the cache?
Problem 4: How could I share states among components (without react context)?
The code example is here: rxjs-tutorial — CodeSandbox
notion image
rxjs is very popular in angular. But this code example is using rxjs with react. I just want to demonstrate that rxjs works well with UI layers other than angular. There are four pages. Please click the page you are interested in or just follow this simple guide.
The solution for problem 1:
Problem 1: If there is an input box, each time users entered something, it will trigger an api call to get data from the backend. How do you limit the rate of the api call?
In rxjs, everything is represented in a stream and manipulated by composable operators. When we talk about limiting the rate of call while users are typing, we would normally have requirements like this:
  1. after users stop typing for 1.5s, we think that users’ inputs are now stable. It is a good chance to call the api and show suggestions
  1. while users are typing (and did not stop typing), we still want to show suggestions every few seconds for a better user experience
There are three time-related operators that be helpful:
  1. auditTime: do you want to show the last related suggestions if users keep typing (non-stop) within a specified time?
  1. debounceTime: do you want to show suggestions if users stop typing for a few seconds (scenario 1)
  1. throttleTime: do you want to show only the first or last suggestion with a specified time?
rxjs is especially good when dealing with time. In this scenario, we would use auditTime. debounceTime would work only for the first criteria. If users keep typing. There is no chance to emit the last value because there is no last value until users stops for 1.5s. auditTime will enable and disable its internal timer as users constantly emit values. so auditTime can always emit the last available value in 1.5s.
notion image
input-example with auditTime operator
Here is the example code: rxjs-tutorial — CodeSandbox
If you constantly type things in the input box, it emits the last typed values and and calls the api every 1s. The suggestions are changing from one by one.
The solution for problem 2:
Problem 2: In the same scenario, after we implemented the api rate limiting, we could still receive a bunch of api requests. How could we schedule these api calls?
In the above example, you probably already noticed that in addition to auditTime operator. concatMap operator was also used. This is one of the strategy we can use when dealing with multiple http requests. concatMap means that we call these apis in an order. If the first one finishes we call the next one.
There are four strategies when dealing with multiple http calls. These operators are higher order observables. They return observables.
Please check the code example here: rxjs-tutorial — CodeSandbox
  1. mergeMap:perform these calls in parallel but we could potentially lose the orders of the API call:
notion image
mergeMap example
In this mergeMap example, if the button is clicked three times in a short time, the code tries to call three apis in parallel. There user ids are, 48, 88 and 16. But results return in different order. User id, 16, returns first and then 48 and 88.
The use cases could be:
a. delete shopping items in a shopping cart
2. concatMap: put the calls in a queue and call them one by one while still keeping the original order
notion image
concatMap example
In this concatMap example, if we quickly click the button three time, you will find out that the api results are always in order. In the network tab, user id,18, is called and wait for it to be finished. Then user id,151, is called and wait for it to be finished. User id,19, is the last to be called.
3. switchMap: cancel the first api call if it is not finished and immediately call the next one
notion image
switchMap example
In this switchMap example, if we quickly click the button three time, you will find out that the first two apis are called but then are cancelled. Only the last one is called and returns results.
4. exhaustMap: call the first one but ignore all the incoming requests until the first api call finishes
notion image
exhaustMap exam
In this exhaustMap example, if we quickly click the button three times, you will find out that only the first api is called. The rest are ignore. The reason is that the first api call was not finished while we click and fire the last two api calls. It chooses to ignore them.
The solution for problem 3:
Problem 3: There is one api call that returns some data which is not updating very frequently. How could you cache the api result and how do you invalidate the cache?
the shareReplay operator could help us to cache the http results depending on the configuration we passed to it. It uses another operator internally, ReplaySubject, to achieve the cache.
shareReplay is a variant of share. shareReplay accepts a config. The first is how many values we should cache. The second is how long we should cache them for.
Please check the code example here: rxjs-tutorial — CodeSandbox
In the following example, we want to cache the latest value for 3s, which means in the first 3s, any api calls returns the value from cache. After 3s, if the api is called again, it will call the api directly and cache it for another 3s.
notion image
cache http results for 3s
In this example, I clicked the ‘fetch’ button once and immediately clicked it twice in the 3s. After 3s, I clicked the button to fetch new data and clicked it twice after that. From the console and network tab, you should see that there are only two api calls. While the cache is still active, any api calls return values from cache.
The solution for problem 4:
Problem 4: How could I share states among components (without react context)?
The Subject observable allows us to create an observable that passes values to multiple listeners at the same time. It has some variants like BehaviourSubject, ReplaySubject and AsyncSubject. Depending on the requirements, I found that BehaviourSubject is suitable to help us share states among components.
You can think of Subject and its variants as a proxy between data producers to multiple data consumers.
notion image
a graph of Data producer, Subject and Data Consumer
In the context of a Subject in RxJS, data consumers only need to subscribe to the Subject, while data producers only need to send data to the Subject. This means that data producers and data consumers are kept separate from each other.
Example:
notion image
share states using BehaviourSubject
Let’s first check the Asynchronous use case example in here: rxjs-tutorial — CodeSandbox. Each time the ‘load button’ is clicked. It dispatched an action, calls an api and then updates the states. Both the parent and child subscribe to the state.
When we initialize the new instace of the store, the initialState is an BehaviourSubject. Once components start to listen to it, new updates to initialStates will push the values to subscribers.
notion image
It is the ame for Synchronous use case example in here: rxjs-tutorial — CodeSandbox
Components that subscribe to the states just need to dispatch an action. The action will update the state.
notion image

Schedulers:

the order of some operators could be controlled by predefined schedulers. Before talking about schedulers, let’s focus on this simple example and try to find out the order of console.log.
In JavaScript, setTimeout is considered a macro task, while promises are considered micro tasks. When there are both micro and macro tasks in the queue, the micro tasks have a higher priority and are executed before the macro tasks.
Therefore, in the scenario where there is a ‘task 1’, a resolved promise, and a setTimeout function in the queue, ‘task 1’ will be executed first, followed by the resolved promise, and then the setTimeout function.
It is the same in rxjs:
  1. asapScheduler: it is a micro task
  1. asyncScheduler: it is a macro task
  1. queueScheduler: every task is in a queue before micro tasks
  1. animationFrameScheduler: mainly used for css animation
Conclusion:
we talked about:
  1. time-related operators like auditTime and debounceTime
  1. higher order observables: switchMap, concatMap, mergeMap and exhaustMap
  1. caching operator and Observable like: shareReplay and ReplaySubject
  1. multicase observables: Subject, ReplaySubject, and BehaviourSubject
  1. schedulers like asap, async, queue

© ming 2021 - 2025