Sign in
Log inSign up
Streaming Redux state as an Observable with RxJS

Streaming Redux state as an Observable with RxJS

Fahad Ibnay Heylaal's photo
Fahad Ibnay Heylaal
·Jan 21, 2018

This article was first published on Medium.

Redux is a great library for managing state in your applications. Combining it with React.js also gives your application a nice structure allowing you to benefit from various other tools built and supported by the community.

I also enjoy RxJS a lot. And the journey of building FrintJS has helped me embrace reactive programming even further. In this post, I will explore how we can stream the state from a Redux store using Observables.

Redux store

Let’s say we have a simple Redux store, that increments and decrements a counter value.

Reducer:

We can start by creating our reducer first:

const INITIAL_STATE = { value: 0 };

function counterReducer(state = INITIAL_STATE, action) {
  switch (action.type) {
    case 'INCREMENT_COUNTER':
      return Object.assign({}, {
        value: state.value + 1
      });
    case 'DECREMENT_COUNTER':
      return Object.assign({}, {
        value: state.value - 1
      });
    default:
      return state;
  }
}

Store:

Now we can create a Store out of it:

import { createStore } from 'redux';

const store = createStore(counterReducer);

Since your store is ready, you can start dispatching actions to it:

store.dispatch({ type: 'INCREMENT_COUNTER' }); // 1 (+1)
store.dispatch({ type: 'INCREMENT_COUNTER' }); // 2 (+1)
store.dispatch({ type: 'DECREMENT_COUNTER' }); // 1 (-1)

Listening to state changes:

You can start listening to your state changes with simple callback:

const unsubscribe = store.subscribe(function () {
  const currentState = store.getState(); // { value: 1 }
});

// cancel listener when you don't need it
unsubscribe();

State as an Observable

Listening to state changes with a simple callback can fit most applications’ needs. But if you are working with Observables already, it would make it easier for you to access the Redux state as a stream, which you can then connect with other Observables as you see fit.

But how can we convert the store to a state$ stream?

It is common convention in the community to end your variable or function name with a $ sign, if it is returning an Observable.

We will first do it the hard way, and then I will show you a lesser known API from Redux that works well with reactive libraries too.

Store’s state as an Observable:

Let’s create a function that accepts Redux store, and returns an Observable of its state.

import { Observable } from 'rxjs/Observable';

function getState$(store) {
  return new Observable(function (observer) {
    // more to follow...
  });
}

const state$ = getState$(store);

const subscription = state$.subscribe(function (state) { 
  console.log(state);
});

Emit on new state changes:

We want the state$ to emit new values as the Redux store changes over time. So let’s add that logic to the function:

function getState$(store) {
  return new Observable(function (observer) {
    const unsubscribe = store.subscribe(function () {
      observer.next(store.getState());
    });
  });
}

What we did above is start listening to the Redux store for changes, and whenever there is any change, we are emitting a new next event with the current state of store.

Emit an initial value:

But we cannot stop just here. Irrespective of when a state change occurs (via dispatching of actions), we want our state$ subscribers to be able to receive an initial value right after their subscription:

function getState$(store) {
  return new Observable(function (observer) {
    observer.next(store.getState());

    const unsubscribe = store.subscribe(function () {
      observer.next(store.getState());
    });
  });
}

Now the subscribers will get an initial value right away, and as more state changes happen, they will keep receiving the new values over time.

Taking care of memory leak

We just need to make one more addition to our function. We have to make sure that as soon as our Observables are unsubscribed, the store listener is also cancelled.

We can do this by returning a function, which will be treated as an unsubscribe callback:

function getState$(store) {
  return new Observable(function (observer) {
    observer.next(store.getState());

    const unsubscribe = store.subscribe(function () {
      observer.next(store.getState());
    });

    return unsubscribe;
  });
}

Unsubscribing in RxJS will be done like this:

const subscription = getState$(store);
subscription.unsubscribe();

Final working function:

Here’s a fully working function with comments, that receives the Redux store as an argument, and returns the state as an Observable:

import { Observable } from 'rxjs/Observable';

function getState$(store) {
  return new Observable(function (observer) {
    // emit the current state as first value:
    observer.next(store.getState());

    const unsubscribe = store.subscribe(function () {
      // emit on every new state changes
      observer.next(store.getState());
    });

    // let's return the function that will be called
    // when the Observable is unsubscribed
    return unsubscribe;
  });
}

Short cut with Observable.from()

While we did everything manually in this post to go through the process of creating an Observable out of Redux store, you can also just use Observable.from():

import { from } from 'rxjs/observable/from';

const state$ = from(store);

Applying RxJS operators on your Redux state

Operators in RxJS will allow to process your state further with ease.

Mapping:

You could get only the counter value (integer) out of your state as a stream:

import { map } from 'rxjs/operators/map';

const state$ = getState$(store);

const counter$ = state$.pipe(
  map(state => state.value)
);

The pipe method is introduced since RxJS v5.5, and you can read further about pipeable (previously called "lettable") operators here.

You can then subscribe to counter$ only:

counter$.subscribe(n => console.log(n));

Filtering:

You can decide which values are emitted with filter. Let’s say, you only want to emit values, if the numbers are even:

import { map } from 'rxjs/operators/map';
import { filter } from 'rxjs/operators/filter';

const state$ = getState$(store);

const evenCounter$ = state$.pipe(
  map(state => state.value),
  filter(n => n % 2 === 0)
);

You can learn about more operators in their documentation here.

Closing thoughts

Redux is awesome, and has helped a big chunk of the JavaScript community to think in a functional way. RxJS is great, and is continuously evolving and helping developers embrace reactive programming with ease.

The two make a great pair. And hope you can benefit from both!

You may also want to check out redux-observable, which has a concept of “Epics”, which allows you to access actions as a stream.

This post was fully focused on state as a stream.