Not too long ago I was working on a project where, even with careful dependency management, a circular dependency arose. A needed B, which needed A. Of course the distinction is never that clear; it usually runs through several other layers of necessities. A needs B needs C needs D, which needs A. Oof. This was a major frustration because untangling this dependency chain, in my scenario, was non-trivial and, more importantly, would have broken some very logical segmentation and API contracts.
So what was I supposed to do? There are a few ways to handle this: one is to use a lazy dependency initializer and another is to use a pub sub model. In this post I’ll talk about a strongly typed in-memory pub sub.
Pub sub
Pub(lish) and sub(scribe) is a pattern that removes the need to interact with a component directly. Instead of A calling B directly, let B subscribe to the events it needs to react to. This can be really nice when you have lifecycle type things: account created, user deleted, stuff like that. When a user is deleted, you might need to delete recurring jobs, delete accounts, notify someone, etc. A direct call pattern might look like
class Users {
  ...
  async userDeleted(id: UserId): Promise<void> {
    await Promise.all([
      accounts.delete(id),
      jobs.unregister(id),
      notifications.notifyUserDeleted(id)
    ])
  }
}
But this has a problem: if accounts, jobs, or notifications needs users, then you have a circular dependency.
A simple solution is to use an in memory (or swappable) pubsub abstraction
class Users {
  ...
  async userDeleted(id: UserId): Promise<void> {
    await pubsub.notify({ type: 'user-deleted', id: id })
  }
}
And let anyone subscribe to this event. Building a simple version of this is trivial: register hooks based on type and invoke them based on registration. Then processing events is as easy as
pubsub.register('user-deleted', async event => { ... })
Pubsub happens all the time; it’s just another form of event processing. In fact, using a pattern like this can be quite nice because you can even swap out the internals and allow for over-the-wire queueing as well. When you have a generic PubSub<T> abstraction, creating decoupled interactions is quite nice.
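The simple, not yet strongly typed version described above could be sketched roughly as follows. This is only an illustration: the names SimplePubSub, Hook, and unregisteredJobs are hypothetical, and the register/notify method names simply mirror the snippets above.

```typescript
// A hook is any callback that reacts to an event, synchronously or asynchronously.
type Hook<T> = (event: T) => void | Promise<void>;

// Hypothetical minimal pub sub: hooks are stored per event type and invoked on notify.
class SimplePubSub<T extends { type: string }> {
  private hooks = new Map<string, Hook<T>[]>();

  // Register a hook for a given event type.
  register(type: string, hook: Hook<T>): void {
    const existing = this.hooks.get(type) ?? [];
    existing.push(hook);
    this.hooks.set(type, existing);
  }

  // Invoke every hook registered for the event's type and wait for all of them.
  async notify(event: T): Promise<void> {
    const hooks = this.hooks.get(event.type) ?? [];
    await Promise.all(hooks.map(hook => hook(event)));
  }
}

// Usage: a jobs component reacts to user deletion without Users knowing about it.
type UserDeleted = { type: 'user-deleted'; id: string };
const simplePubSub = new SimplePubSub<UserDeleted>();
const unregisteredJobs: string[] = [];
simplePubSub.register('user-deleted', async event => {
  unregisteredJobs.push(event.id);
});
```

Note that nothing here stops a caller from registering a hook under a misspelled type string, which is the gap the typed version below is meant to close.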
Building on this simple idea, what I really want is to
Have the compiler validate that registered type callbacks are actually correct, and ensure that the handler of the event is typed
Ensure that we can only notify the right types of events and that event types are properly typed
Let’s do some TypeScript magic.
Type magic
If we have some sample events
interface A {
type: 'a';
data: number;
}
interface B {
type: 'b';
other: number;
}
type E = A | B;
Then to get the set of all types “a” and “b” we can create a conditional type
type Types<Obj> = Obj extends { type: infer Key } ? (Key extends string ? Key : never) : never;
Where if we do Types<E> it should return to us the set of “a” and “b”
We can even make a reverse lookup, so if we know E and the type of “a” we can resolve the type of A
type ByType<Obj, Key extends string> = Key extends Types<Obj> ? (Obj extends { type: Key } ? Obj : never) : never;
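To make the two helper types concrete, here is a small sketch of what they resolve to for the sample events. The variable names (eventTypes, onlyA, onlyB) are made up for illustration; the types themselves are copied from above.

```typescript
interface A {
  type: 'a';
  data: number;
}
interface B {
  type: 'b';
  other: number;
}
type E = A | B;

type Types<Obj> = Obj extends { type: infer Key } ? (Key extends string ? Key : never) : never;
type ByType<Obj, Key extends string> = Key extends Types<Obj> ? (Obj extends { type: Key } ? Obj : never) : never;

// Types<E> distributes over the union, so only 'a' and 'b' are accepted here.
const eventTypes: Types<E>[] = ['a', 'b'];

// ByType<E, 'a'> narrows the union down to A, and ByType<E, 'b'> narrows it to B.
const onlyA: ByType<E, 'a'> = { type: 'a', data: 42 };
const onlyB: ByType<E, 'b'> = { type: 'b', other: 7 };

// Uncommenting either line should be rejected by the compiler:
// const wrongKey: Types<E> = 'c';
// const wrongShape: ByType<E, 'a'> = { type: 'a', other: 1 };
```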
We can now make a class with a signature like this
export class PubSub<T extends { type: Types<T> }, Keys extends string = Types<T>>
Which requires a shape of
{
type: "...",
...
}
For all events, and gives us a generic called Keys with the values of “a” and “b”
Subscribing
We can build a subscribe method really easily now that allows consumers to register a callback based on the event type
protected callbacks = new Map<Keys, Array<(data: T) => void | Promise<void>>>();
/**
 * Subscribe to an event
 * @param key the type of event to subscribe to
 * @param onEvent A callback with that type of event
 */
subscribe<Key extends Types<T>>(key: Key, onEvent: (data: ByType<T, Key>) => void) {
  // Create the listener list on first use, then append to it
  if (!this.callbacks.has(key)) {
    this.callbacks.set(key, []);
  }
  this.callbacks.get(key)!.push(onEvent as (data: T) => void);
}
In an example usage, we can see that by providing the key of “a” the callback is automatically typed to the type of A. Very cool.
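A usage along these lines could be sketched as follows. MiniPubSub is a hypothetical, trimmed-down stand-in for the class above: it only has subscribe, and its constraint is simplified to `{ type: string }` with a string-keyed map, which is an assumption made to keep the example short.

```typescript
interface A {
  type: 'a';
  data: number;
}
interface B {
  type: 'b';
  other: number;
}
type E = A | B;

type Types<Obj> = Obj extends { type: infer Key } ? (Key extends string ? Key : never) : never;
type ByType<Obj, Key extends string> = Key extends Types<Obj> ? (Obj extends { type: Key } ? Obj : never) : never;

// Trimmed-down stand-in for the PubSub class; only subscribe is shown.
class MiniPubSub<T extends { type: string }> {
  readonly callbacks = new Map<string, Array<(data: T) => void>>();

  subscribe<Key extends Types<T>>(key: Key, onEvent: (data: ByType<T, Key>) => void): void {
    const list = this.callbacks.get(key) ?? [];
    list.push(onEvent as (data: T) => void);
    this.callbacks.set(key, list);
  }
}

const pubsub = new MiniPubSub<E>();
const seen: number[] = [];

pubsub.subscribe('a', event => {
  // event is typed as A here, so `data` is available and `other` is not.
  seen.push(event.data);
});
pubsub.subscribe('b', event => {
  // event is typed as B here.
  seen.push(event.other);
});

// Both of these should be compile errors:
// pubsub.subscribe('c', () => {});
// pubsub.subscribe('a', event => event.other);

// No publish in this stand-in, so invoke the stored 'a' callbacks by hand to show they are wired up.
pubsub.callbacks.get('a')?.forEach(callback => callback({ type: 'a', data: 5 }));
```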
Publishing
How about publishing?
/**
 * Waits on all consumers to finish.
 * @param event
 * @param onError if set, proxies errors to this and nothing is thrown
 */
async publish(event: T, onError?: (e: unknown) => void): Promise<void> {
  await Promise.all(
    this.callbacks.get(event.type as Keys)?.map(async callback => {
      try {
        const result = callback(event);
        if (result instanceof Promise) {
          if (onError) {
            await result.catch(onError);
          } else {
            await result;
          }
        }
      } catch (e) {
        if (onError) {
          onError(e);
        } else {
          throw e;
        }
      }
    }) ?? []
  );
}
To publish, all we have to do is take the type of the message, look up the callbacks for that type, invoke their handlers, do some exception handling, and await the result.
I don’t have unsubscribe, but it can very easily be added by having subscribe return a function that removes that listener from the callbacks list.
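One way the unsubscribe idea could look is sketched below. This is not the author's implementation: UnsubscribablePubSub and demo are hypothetical names, the constraint is simplified to `{ type: string }`, and publish is reduced to the bare minimum (no onError handling).

```typescript
type Types<Obj> = Obj extends { type: infer Key } ? (Key extends string ? Key : never) : never;
type ByType<Obj, Key extends string> = Key extends Types<Obj> ? (Obj extends { type: Key } ? Obj : never) : never;
type Callback<T> = (data: T) => void | Promise<void>;

class UnsubscribablePubSub<T extends { type: string }> {
  protected callbacks = new Map<string, Array<Callback<T>>>();

  // Same idea as subscribe above, but returns a function that removes this listener.
  subscribe<Key extends Types<T>>(key: Key, onEvent: (data: ByType<T, Key>) => void | Promise<void>): () => void {
    const callback = onEvent as Callback<T>;
    const list = this.callbacks.get(key) ?? [];
    list.push(callback);
    this.callbacks.set(key, list);
    return () => {
      const index = list.indexOf(callback);
      if (index >= 0) {
        list.splice(index, 1);
      }
    };
  }

  // Simplified publish, just enough to demonstrate unsubscribing.
  async publish(event: T): Promise<void> {
    // Copy the list so a handler that unsubscribes mid-publish does not skip its neighbours.
    const list = [...(this.callbacks.get(event.type) ?? [])];
    await Promise.all(list.map(callback => callback(event)));
  }
}

interface A {
  type: 'a';
  data: number;
}
interface B {
  type: 'b';
  other: number;
}
type E = A | B;

async function demo(): Promise<number[]> {
  const bus = new UnsubscribablePubSub<E>();
  const received: number[] = [];
  const unsubscribe = bus.subscribe('a', event => {
    received.push(event.data);
  });
  await bus.publish({ type: 'a', data: 1 }); // handler is registered, so it runs
  unsubscribe();
  await bus.publish({ type: 'a', data: 2 }); // handler was removed, so nothing happens
  return received;
}
```

Returning a closure keeps the caller from needing a reference to the pubsub or the original callback in order to clean up.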
Now we have a fully strongly typed pubsub that type checks the publishers and subscribers!
Pitfalls
Pub sub models are highly flexible and can really help in a lot of situations. However, I would warn against abusing them, as understanding the implications of who does what when can get extremely complicated. If you’ve ever built UI forms that are all async, knowing who reacts to a “click” can be mind-bogglingly complex.
To make that simpler, I would suggest that event registrations are all centralized: have one place in code where you register all your listeners so that you can quickly audit who is reacting to what and when. For example, do this
pubsub.subscribe('a', handler1)
pubsub.subscribe('a', handler2)
...
pubsub.subscribe('a', handler20)
And avoid this temptation
class Handler1 {
  constructor(pubsub: PubSub<E>) {
    pubsub.subscribe('a', this.handler)
  }
}
Because this side effect is not clear from the API contract. You aren’t sure if you are publishing OR subscribing to the pubsub just by the constructor argument list.
In fact, I might even suggest that registration and publication are separated into different interfaces so you can create stronger contracts. This would allow only certain areas to register and only certain areas to publish, which you could use to control access to the eventing pipeline.
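As a rough sketch of that separation, assuming hypothetical Publisher and Subscriber interfaces and a simplified InMemoryPubSub (none of these names come from the original source):

```typescript
type Types<Obj> = Obj extends { type: infer Key } ? (Key extends string ? Key : never) : never;
type ByType<Obj, Key extends string> = Key extends Types<Obj> ? (Obj extends { type: Key } ? Obj : never) : never;

// Hypothetical split of the pubsub surface into two narrower contracts.
interface Publisher<T extends { type: string }> {
  publish(event: T): Promise<void>;
}
interface Subscriber<T extends { type: string }> {
  subscribe<Key extends Types<T>>(key: Key, onEvent: (data: ByType<T, Key>) => void): void;
}

// One in-memory implementation satisfies both contracts.
class InMemoryPubSub<T extends { type: string }> implements Publisher<T>, Subscriber<T> {
  private callbacks = new Map<string, Array<(data: T) => void | Promise<void>>>();

  subscribe<Key extends Types<T>>(key: Key, onEvent: (data: ByType<T, Key>) => void): void {
    const list = this.callbacks.get(key) ?? [];
    list.push(onEvent as (data: T) => void);
    this.callbacks.set(key, list);
  }

  async publish(event: T): Promise<void> {
    const list = this.callbacks.get(event.type) ?? [];
    await Promise.all(list.map(callback => callback(event)));
  }
}

type UserDeleted = { type: 'user-deleted'; id: string };

// Users only receives the publishing half, so its constructor says it emits events and nothing more.
class Users {
  private readonly events: Publisher<UserDeleted>;

  constructor(events: Publisher<UserDeleted>) {
    this.events = events;
  }

  async userDeleted(id: string): Promise<void> {
    await this.events.publish({ type: 'user-deleted', id });
  }
}

// Centralized wiring: the only place that is handed the subscribing half.
function wireUp(events: Subscriber<UserDeleted>, deletedIds: string[]): void {
  events.subscribe('user-deleted', event => {
    deletedIds.push(event.id);
  });
}

const sharedPubSub = new InMemoryPubSub<UserDeleted>();
const deletedIds: string[] = [];
wireUp(sharedPubSub, deletedIds);
const users = new Users(sharedPubSub);
```

With this shape, a class that takes a Publisher cannot quietly register listeners, and the wiring function cannot publish.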
As with all things, there are a lot of options and considerations to be had.
Full source available on my GitHub.