RxJS by Example: Part 4

John Tucker
codeburst
Published in
5 min readNov 13, 2017

--

We end this series by writing the core functionality of Redux in three lines of RxJS code; A real testimony on RxJS. Wow!

This article is part of a series starting with RxJS by Example: Part 1.

Having been exposed to most of the core RxJS concepts, we explore how to implement a Redux-like solution with RxJS. The main reason is because Reactive Extensions (underlying RxJS) has been written in many other languages, e.g., Java and Swift, and Redux has proven to be a powerful JavaScript state management solution.

note: The remainder of this series assumes that you are already familiar with Redux.

If you are wondering, “Why Redux?”, you might find the following article that I wrote helpful (it also links to an introductory series on Redux that I also wrote).

It turns out that I am not the first person to think of this.

scan

The RxJS scan operator is the secret sauce of this implementation.

Applies an accumulator function over the source Observable, and returns each intermediate result, with an optional seed value.

RxJS Reference

I find these sorts of terse definitions incredibly hard to understand, so here is a simple example of a variable counter. The initial stored value is 0. Each event is a number (v) which is added to the previously stored value (o). The output of the following is:

1
3
6

src/scan.js

/* eslint no-console: "off" */
import { Subject } from 'rxjs/Subject';
import 'rxjs/add/operator/scan';
const mySubject = new Subject();
const myCounter = mySubject
.scan((o, v) => o + v, 0);
myCounter.subscribe({
next: o => console.log(o),
});
mySubject.next(1);
mySubject.next(2);
mySubject.next(3);

reducer

Interestingly enough, the function passed to scan is to have the same signature as a Redux reducer; the first parameter is the current state and the second the action.

In this example, the counter function is lifted from my series on Redux. The output is:

1
2
1

src/reducer.js

/* eslint no-console: "off" */
import { Subject } from 'rxjs/Subject';
import 'rxjs/add/operator/scan';
const reducer = (state = 0, action) => {
switch (action.type) {
case 'INCREMENT':
return state + 1;
case 'DECREMENT':
return state - 1;
default:
return state;
}
};
const myStore = (new Subject())
.scan(reducer, 0);
myStore.subscribe({
next: o => console.log(o),
});
myStore.next({
type: 'INCREMENT',
});
myStore.next({
type: 'INCREMENT',
});
myStore.next({
type: 'DECREMENT',
});

combine-reducers

The problem with our last example is that we need to support more complex states and at the same time we want keep our reducers simple; enter the function combineReducers.

note: I completely borrowed the combineReducers code from Redux.

The output of the following code is:

{ counter1: 1, counter2: 0 }
{ counter1: 2, counter2: 0 }
{ counter1: 1, counter2: 0 }
{ counter1: 1, counter2: 1 }

./src/combine-reducers.js

/* eslint no-console: "off" */
import { Subject } from 'rxjs/Subject';
import 'rxjs/add/operator/scan';
// BORROWED HEAVILY FROM REDUX
const combineReducers = (reducers) => {
const reducerKeys = Object.keys(reducers);
return (state, action) => {
const nextState = {};
let hasChanged = false;
for (let i = 0; i < reducerKeys.length; i += 1) {
const key = reducerKeys[i];
const reducer = reducers[key];
const previousStateForKey = state[key];
const nextStateForKey = reducer(previousStateForKey, action);
nextState[key] = nextStateForKey;
hasChanged = hasChanged || nextStateForKey !== previousStateForKey;
}
return hasChanged ? nextState : state;
};
};
const counter1 = (state = 0, action) => {
switch (action.type) {
case 'C1_INCREMENT':
return state + 1;
case 'C1_DECREMENT':
return state - 1;
default:
return state;
}
};
const counter2 = (state = 0, action) => {
switch (action.type) {
case 'C2_INCREMENT':
return state + 1;
case 'C2_DECREMENT':
return state - 1;
default:
return state;
}
};
const reducer = combineReducers({
counter1,
counter2,
});
const initialState = reducer({}, { TYPE: '_INIT' });
const myStore = (new Subject())
.scan(reducer, initialState);
myStore.subscribe({
next: o => console.log(o),
});
myStore.next({
type: 'C1_INCREMENT',
});
myStore.next({
type: 'C1_INCREMENT',
});
myStore.next({
type: 'C1_DECREMENT',
});
myStore.next({
type: 'C2_INCREMENT',
});

behavior-subject

There is actually two problems with the previous example:

  • First, If I setup a second subscriber I get a completely different re-initialized state; think back to the non-emitter discussion.
  • Second, when I first subscribe to the store, the observer’s next function is not initially called; looking at the earlier output the initial state is never logged.

The first problem reminds me of the work we did leading up to Multicast Observables in the previous article. Turns out that another RxJS feature, BehaviorSubject, solves both problems.

Because it is a Subject, each subscription simply adds itself to the list of callbacks (solves first problem). The bonus is that BehaviorSubject also will trigger the callback during the subscribe as the following example illustrates (bold entries).

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

src/behavior-subject.js

/* eslint no-console: "off" */
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
const mySubject = new BehaviorSubject(0);
mySubject.subscribe({
next: o => console.log(`observerA: ${o}`),
});
mySubject.next(1);
mySubject.next(2);
mySubject.subscribe({
next: o => console.log(`observerB: ${o}`),
});
mySubject.next(3);

redux

Working the BehaviorSubject into our earlier example, we get something that behaves much like Redux. The following code outputs the expected (bold output is generated when Observer is first subscribed:

A
{ counter1: 0, counter2: 0 }

A
{ counter1: 1, counter2: 0 }
B
{ counter1: 1, counter2: 0 }

A
{ counter1: 2, counter2: 0 }
B
{ counter1: 2, counter2: 0 }

src/redux.js

/* eslint no-console: "off" */
import { Subject } from 'rxjs/Subject';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import 'rxjs/add/operator/scan';
...
const initialState = reducer({}, { TYPE: '_INIT' });
const mySubject = (new Subject())
.scan(reducer, initialState);
const myStore = new BehaviorSubject(initialState);
mySubject.subscribe(myStore);
myStore.subscribe({
next: (o) => {
console.log('A');
console.log(o);
},
});
mySubject.next({
type: 'C1_INCREMENT',
});
myStore.subscribe({
next: (o) => {
console.log('B');
console.log(o);
},
});
mySubject.next({
type: 'C1_INCREMENT',
});

One oddity is that I used myStore to subscribe and mySubject to dispatch new events. Could have created a new object to hide the specifics; just did not want to do in this learning context.

Wrap Up

The fact that we were able to recreate the core functionality of Redux in three lines of code illustrates the power of using RxJS (or more specifically the theory of Reactive Extensions).

src/redux.js

...
const mySubject = (new Subject())
.scan(reducer, initialState);
const myStore = new BehaviorSubject(initialState);
mySubject.subscribe(myStore);
...

--

--