Have you ever been on a phone call where you talked for a while, only to realize that the call had disconnected and you had been talking to no one? You probably felt somewhat embarrassed or frustrated. Or sometimes you are talking to someone and realize that they have clearly tuned you out. When that happens to me, I like to start talking about some absurd topic, like the lifestyle choices of rabid badgers, just to see if they are paying attention.

No one likes to waste their breath talking when no one is listening. Code should be the same way. I don’t want my program to spend time running some calculation or fetching data when the result isn’t going to be used.

With RxJs you can create streams of data that can be observed. Different components can subscribe to a stream to be notified when it has new data. In RxJs there are two types of streams: hot and cold. Hot streams are always on. They keep spitting out data regardless of whether anyone is listening. You could say that they just like to hear themselves talk. You might know some people like that. Cold streams only turn on when someone subscribes, and each subscriber gets its own run of the data. When everyone has unsubscribed they stop talking.
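To make the distinction concrete, here is a minimal sketch. The names (`cold$`, `hot$`, `producerRuns`) are made up for illustration, and a plain `Subject` stands in for a hot source; real hot streams are often built in other ways.

```typescript
import { Observable, Subject } from 'rxjs';

// Cold: the producer function runs only on subscribe, once per subscriber.
let producerRuns = 0;
const cold$ = new Observable<number>((observer) => {
  producerRuns++;
  observer.next(1);
  observer.next(2);
  observer.complete();
});

const coldA: number[] = [];
const coldB: number[] = [];
cold$.subscribe((v) => coldA.push(v));
cold$.subscribe((v) => coldB.push(v));
// Both subscribers received every value, and the producer ran twice.

// Hot: a Subject emits whether or not anyone is listening.
const hot$ = new Subject<number>();
const hotSeen: number[] = [];
hot$.next(1); // nobody is subscribed yet, so this value is lost
hot$.subscribe((v) => hotSeen.push(v));
hot$.next(2); // the late subscriber only sees this one
```

Nothing happens on the cold stream until `subscribe` is called, while the hot stream talks into the void before its first subscriber arrives.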

One of the principles of functional reactive programming, which RxJs facilitates, is that you should use pure functions to perform operations. A pure function is a function where the return value is determined solely based on its inputs, without side-effects. This means that you can run the function as many times as you want with the same inputs and you will get the same output every time. It won’t perform side-effects like logging or changing some internal state. This makes operations predictable, safe, and easy to test.
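As a small illustration (the function names here are hypothetical, not from any library), compare a pure function with one that depends on outside state:

```typescript
// Pure: the result depends only on the argument.
const double = (n: number): number => n * 2;

// Impure: the result depends on, and changes, state outside the function.
let total = 0;
const addToTotal = (n: number): number => {
  total += n;
  return total;
};

// Same input, same output, every time.
const pureResults = [double(2), double(2)];

// Same input, different output on each call.
const impureResults = [addToTotal(2), addToTotal(2)];
```

Calling `double(2)` twice gives the same answer both times, while `addToTotal(2)` gives a different answer on the second call because it remembers the first.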

But sometimes you just need a side-effect. For this RxJs provides the tap operator. This operator accepts the same three callbacks that subscribe accepts (next, error, complete) and lets you run code whenever each would be called at that point in the stream. A big difference between tap and other operators is that it can’t change the data flowing through the stream: whatever the callback returns is ignored. One caveat is that an exception thrown inside a tap callback is still passed down the stream as an error, so a side-effect that might explode should catch its own errors.
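Here is a minimal sketch of tap in a pipeline, written with the RxJs 6 style imports. The variable names (`seen`, `results`, `completed`) are made up for the example.

```typescript
import { of } from 'rxjs';
import { map, tap } from 'rxjs/operators';

const seen: number[] = [];
const results: number[] = [];
let completed = false;

of(1, 2, 3)
  .pipe(
    tap({
      // push() returns the new array length, but tap ignores return values
      next: (v) => seen.push(v),
      complete: () => {
        completed = true;
      },
    }),
    map((v) => v * 10)
  )
  .subscribe((v) => results.push(v));
```

The tap callbacks observe each value and the completion, but the values reaching map are exactly the ones that entered tap.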

Recently I needed to observe the state of the subscriptions on a stream. That information isn’t exposed by the tap operator, so I created a new operator named tapRefCount. I put “tap” in the name to indicate that it allows worry-free side-effects, and “refCount” because it is similar to the refCount operator, which tracks the number of subscriptions on the stream.

Here is the tapRefCount operator (using RxJs 6):

import { Observable, Observer } from 'rxjs';

/**
 * Enable side-effects to operate based off of the
 * number of subscriptions active on a stream.
 */
export const tapRefCount = <T>(
  onChange: (refCount: number, prevRefCount: number) => void
) => (source: Observable<T>): Observable<T> => {
  let refCount = 0;

  // mute the operator if it has nothing to do
  if (typeof onChange !== 'function') {
    return source;
  }

  // mute errors from side-effects
  const safeOnChange = (refCount: number, prevRefCount: number): void => {
    try {
      onChange(refCount, prevRefCount);
    } catch (e) {
      console.error(e);
    }
  };

  // spy on subscribe
  return new Observable<T>((observer: Observer<T>) => {
    const subscription = source.subscribe(observer);
    const prevRefCount = refCount;
    refCount++;
    safeOnChange(refCount, prevRefCount);

    // spy on unsubscribe
    return () => {
      subscription.unsubscribe();
      const prevRefCount = refCount;
      refCount--;
      safeOnChange(refCount, prevRefCount);
    };
  });
};

The basic idea is that it creates an inner observable for each subscription on the stream so it can increment a counter (refCount) on subscribe. It also wraps the unsubscribe for that observable so that it can decrement the counter. Each time it changes the counter it calls the callback function passed to it. We wrap the callback in a try/catch to avoid killing the stream in the event that the callback throws an error.

Here is a running example of its usage:
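As a minimal sketch of how the operator might be used: the block below carries a condensed copy of tapRefCount so that it runs on its own, and the names `source$`, `watched$`, `changes`, and `listening` are hypothetical, chosen only for illustration.

```typescript
import { Observable, Subject } from 'rxjs';

// Condensed copy of tapRefCount so this block is self-contained.
const tapRefCount = <T>(
  onChange: (refCount: number, prevRefCount: number) => void
) => (source: Observable<T>): Observable<T> => {
  let refCount = 0;
  const safeOnChange = (count: number, prev: number): void => {
    try {
      onChange(count, prev);
    } catch (e) {
      console.error(e);
    }
  };
  return new Observable<T>((observer) => {
    const subscription = source.subscribe(observer);
    refCount++;
    safeOnChange(refCount, refCount - 1);
    return () => {
      subscription.unsubscribe();
      refCount--;
      safeOnChange(refCount, refCount + 1);
    };
  });
};

// Hypothetical source and bookkeeping, for illustration only.
const source$ = new Subject<string>();
const changes: Array<[number, number]> = [];
let listening = false;

const watched$ = source$.pipe(
  tapRefCount<string>((count, prev) => {
    changes.push([count, prev]);
    if (prev === 0 && count === 1) listening = true; // first subscriber arrived
    if (prev === 1 && count === 0) listening = false; // last subscriber left
  })
);

const a = watched$.subscribe();
const b = watched$.subscribe();
const listeningWhileSubscribed = listening;
a.unsubscribe();
b.unsubscribe();
```

The callback fires once per subscribe and once per unsubscribe, so the 0 to 1 and 1 to 0 transitions are natural places to start and stop work that is only worth doing while someone is listening.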

Tags:
  1. RxJs
  2. Angular

Erik Murphy

Erik is an agile software developer in Charlotte, NC. He enjoys working full-stack (CSS, JS, C#, SQL) as each layer presents new challenges. His experience involves a variety of applications ranging from developing brochure sites to high-performance streaming applications. He has worked in many domains including military, healthcare, finance, and energy.

Copyright © 2023 Induro, LLC All Rights Reserved.