
· 5 min read
Lucas Weis Polesello

Some weeks ago, we had an incident caused mainly by how we delay job execution. One queue showed abnormal behavior during the weekend, which our monitoring systems caught, but we expected the system to be self-healing and able to kick through this small hiccup. On Tuesday we noticed something was off with our integration systems - code that had been running for more than a year stopped working right when we rolled it out for a big new customer.

I got involved early in the incident because of how often I touch that component of the backend, and we started an investigation so we could get back to the customer as fast as possible with some good news.

After some hours of investigation we concluded that our long-polling mechanism had a hot loop on a certain queue, which kept re-delaying the same jobs over and over. This edge case was first triggered during the weekend, when we saw the abnormal behavior. Because the jobs were always re-delayed, the backlog of postponed jobs for that queue never actually drained, so this component got stuck forever trying to drain it - delaying every other queue on the platform.

As a quick half-hour fix, we shipped code that stops re-delaying a job after N retries, unblocking the hot loop.

What about the future?

All that to say what? What does it have to do with Redis?

As described in the previous post of this series, the job-delaying mechanism lives in Redis, where we use a Sorted Set to pull the next job. We fan out to a certain number of queues in parallel, but we only restart this processing after all possible queue targets have been drained up to some upper-bound timestamp (the score in the Sorted Set).

With this fragility in mind and the upcoming code freeze of December, I suggested rewriting this design completely.

Current Design

Whenever a service requests a postpone - either intentionally, by calling the method to postpone a job, or unintentionally, due to some IO failure, protocol error or ephemeral channel churn - the job flows through this DelayedJobsStorage design.

The current production design is a simple Redis Sorted Set where the timestamp is the score and the member is the job serialized into a string.
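
As a rough sketch of what storing a postponed job looks like with this structure - hypothetical key and function names, not the production code - it boils down to a single ZADD where the score is the retry timestamp:

const redis = require('redis');

//Sketch only: the member is the serialized job, the score is the timestamp
//at which it should be re-enqueued.
async function postponeJob(client, queueName, job, delayMs) {
  const retryAt = Date.now() + delayMs;
  await client.zAdd(`delayed:${queueName}`, [
    { score: retryAt, value: JSON.stringify(job) }
  ]);
}

(async () => {
  const client = redis.createClient();
  await client.connect();
  await postponeJob(client, 'some-queue', { type: 'send-reminder' }, 60_000);
})();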

On the infrastructure side, we use a cache.r7g.12xlarge ElastiCache instance (~300GB 🚨) to sustain our load SPIKES - caused by instabilities or big customers' data.

On the other end, there's a separate backend component, Scheduler, that is responsible for triggering individual jobs at a certain interval. This component triggers a service that runs a long-polling logic up to the current timestamp.

delayed-jobs-old-design

New Design

I've created some abstractions to make it easier to implement new storage backends like S3, VFS and many more. Redis now runs mainly as a reference-to-data component and only stores entire jobs if they are due within the next ~2 seconds or some failure happens.

The storage decision is made with a couple of considerations in mind (a rough sketch of this decision follows the list):

  • How long are we planning to postpone?
  • How big (byte size) is this job?
  • Is this storage backend rolled out for a certain queue OR tenant _id?
  • Is this storage available? If not, fall back to Redis
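
A hedged sketch of how that decision could look - the backends, thresholds and the isRolledOutFor flag check below are illustrative assumptions, not the real implementation:

//Illustrative backends and thresholds - the real rules differ.
const redisBackend = { name: 'redis', async isAvailable() { return true; } };
const s3Backend = { name: 's3', async isAvailable() { return true; } };
const isRolledOutFor = async (queue, tenantId) => true; //feature-flag check (stub)

async function pickStorageBackend({ delayMs, jobSizeBytes, queue, tenantId }) {
  const dueSoon = delayMs < 2_000;        //due within the next ~2 seconds
  const smallJob = jobSizeBytes < 1_024;  //assumed size threshold
  if (dueSoon || smallJob) return redisBackend;           //keep the whole job in Redis
  if (!(await isRolledOutFor(queue, tenantId))) return redisBackend;
  if (await s3Backend.isAvailable()) return s3Backend;    //job body in S3, reference in Redis
  return redisBackend;                                    //storage unavailable: fall back to Redis
}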

We also changed the core logic of the backend service that re-enqueues delayed jobs (a sketch of the new drain loop follows the list):

  • Removed the coupling with Scheduler:
    • Instead of waiting for a scheduled job to trigger the backend service, it triggers itself at a certain interval.
  • Circuit Breaking:
    • Jobs keep being re-enqueued only up to a certain timeout, preventing one cycle from starving resources OR locking the entire service in a hot loop.
  • Concurrent Processing with Semaphores:
    • We always process up to N target queues concurrently, each decoupled from the other executions.
    • Speeds up draining time.
  • Better visibility:
    • We implemented proper monitoring such as latencies for each re-enqueue, amount of re-enqueues and failure/successful re-enqueues.
  • Horizontal Scaling by Controlling ZPOPMIN:
    • Pop the lowest score up to an upper-bound timestamp.
    • This enables us to run multiple instances, which we couldn't do in the past due to the Singleton design we had.
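
Here's a minimal sketch of what that re-enqueue loop could look like - the concurrency limit, timeout and key names are assumptions for illustration, not the actual service code:

//Hypothetical sketch of the new drain loop - names, limits and timeouts are illustrative.
const CONCURRENCY = 5;            //process up to N target queues in parallel
const CYCLE_TIMEOUT_MS = 30_000;  //circuit breaker: stop re-enqueueing after this

async function drainQueue(client, publish, queue, startedAt) {
  const key = `delayed:${queue}`;
  const upperBound = Date.now();
  for (;;) {
    if (Date.now() - startedAt > CYCLE_TIMEOUT_MS) return;  //circuit break: give up this cycle
    const popped = await client.zPopMin(key);               //lowest score (oldest timestamp) first
    if (!popped) return;
    if (popped.score > upperBound) {                        //not due yet: put it back and stop
      await client.zAdd(key, [popped]);
      return;
    }
    await publish(queue, JSON.parse(popped.value));         //re-enqueue into RabbitMQ
  }
}

async function drainCycle(client, publish, queues) {
  const startedAt = Date.now();
  const pending = new Set();
  for (const queue of queues) {
    const task = drainQueue(client, publish, queue, startedAt)
      .finally(() => pending.delete(task));
    pending.add(task);
    if (pending.size >= CONCURRENCY) await Promise.race(pending);  //simple semaphore
  }
  await Promise.allSettled(pending);
  setTimeout(() => drainCycle(client, publish, queues), 1_000);    //self-triggering, no Scheduler
}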

new-design

redis-is-more-than-cache-data-reference-new-design-read_

Example

Let's take as an example storing a job for 10 minutes that has a medium size due to the context it needs to carry. We would store the job data in S3 and then insert a key into Redis formatted as s3::some-queue::384192393192::<some-generated-uuid>. When trying to re-enqueue the job, the abstraction easily recognizes the protocol as s3. It then uses the s3 implementation, which knows where to fetch the job by this UUID.
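
A small sketch of that reference-key handling - the exact format, field order and fetchJob method are illustrative assumptions:

//Sketch only: build and resolve a reference key like s3::some-queue::384192393192::<uuid>
function buildReference(protocol, queue, retryAtMs, uuid) {
  return `${protocol}::${queue}::${retryAtMs}::${uuid}`;
}

async function resolveJob(reference, backends) {
  const [protocol, queue, , uuid] = reference.split('::');
  const backend = backends[protocol];     //e.g. { s3: s3Backend, redis: redisBackend }
  return backend.fetchJob(queue, uuid);   //the s3 backend knows which bucket/key to read
}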

What to expect from this Design

  • Infrastructure downsizing: from 300GB down to roughly 10GB
    • Currently we have ~4 million keys stored, consuming up to 200GB of RAM.
    • In the new design this would be ~64MB - 4 million UUIDs at 128 bits (16 bytes) each comes to roughly 64MB - omitting the other small appended data.
    • Roughly, we expect to downsize to 10GB, since brief postpones can still occur and overload it. Experimentation will lead us to the proper size.
  • Independence: decoupling backend components prevents service A from stopping service B from working properly
  • Speed: N replicas will drain queues much faster under abnormal load
  • Horizontal Scalability/Reliability: multiple consuming replicas.
  • Tail-Latency Aware: latency-sensitive components - WebSocket notifications, chat messages and all the real-time communication heavily used in the platform - will be decoupled

Concerns

  • RPS - How many jobs are being delayed per sec and their latencies? p75, p90, p99
  • S3 speed for large objects -> Will it impact our re-enqueueing speed?
  • Proper Redis sizing -> How much lower can we push it?
  • Redis is still a single point of failure -> What if Redis just dies?

· 4 min read
Lucas Weis Polesello

My current company - Luma Health Inc - has an Event-Driven Architecture where all of our backend systems interact via async messaging/jobs. Thus our backbone is sustained by an AMQP broker - RabbitMQ - which routes the jobs to interested services.

Since our jobs are very critical, we cannot tolerate failures AND should design the system to be more resilient - because, well, we don't want a patient not being notified of their appointment, appointments not being created when they should be, or patients showing up at facilities for appointments they were never notified about.

Besides infra and product reliability, some use cases need postponing - maybe reaching out to an external system that's offline or not responding, maybe some error that needs a retry - who knows?

The fact is, delaying/retrying is a very frequent requirement in Event-Driven Architectures. With this in mind, a service responsible for it was created - and it worked fine.

But as the company sold bigger contracts and grew in scale, this system became stressed and unreliable.

The Unreliable Design

Before going over the symptoms, let's talk about the organism itself - the service's old design.

The design was really straightforward: if our service handlers asked for a postpone OR we failed to send the message to RabbitMQ, we just inserted the job's JSON object into a Redis Sorted Set, using the score as the timestamp at which it was meant to be retried/published again.

To publish the postponed messages back into RabbitMQ, a job was triggered every 5 seconds, doing the following (a rough sketch follows the list):

  1. Read from a set key containing all the existing sorted set keys - basically the queue names
  2. Fetch jobs using zrangebyscore from 0 to current timestamp BUT limit to 5K jobs.
  3. Publish the job to RabbitMQ and remove it from sorted set
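
For illustration, a rough sketch of that 5-second job - names are hypothetical and the real code differed:

//Rough sketch of the old polling job (hypothetical names).
async function republishDelayedJobs(client, publish) {
  const queues = await client.sMembers('delayed-queues');    //1. the set of sorted set keys
  for (const queue of queues) {
    const dueJobs = await client.zRangeByScore(               //2. everything due, capped at 5K
      `delayed:${queue}`, 0, Date.now(),
      { LIMIT: { offset: 0, count: 5000 } }
    );
    for (const raw of dueJobs) {
      await publish(queue, JSON.parse(raw));                  //3. publish back to RabbitMQ...
      await client.zRem(`delayed:${queue}`, raw);             //...and remove from the sorted set
    }
  }
}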

The Issues

This solution actually scaled until 1-2 years ago, when we started having issues with it - the main ones being:

  1. It could not catch up to a huge backlog of delayed messages
  2. It would eventually OOM or spike up to 40GB of memory
    1. Due to things being fetched into memory AND some instability OR even some internal logic, we could end up shoveling too much data into Redis - and the service just died 💀
  3. We could not scale horizontally, due to consuming and fetching objects into memory before deleting them.

The Solution

The solution was very simple: we implemented something I like to call the streaming approach.

Using the same data structure, we are now:

  1. Running a zcount from 0 to the current timestamp
    • Counting the number of jobs due -> returning N
  2. Creating an Async Iterator that runs N times, using the zpopmin method from Redis
    • zpopmin returns AND removes the member with the lowest score - i.e. the oldest timestamp

The processor for the SortedSet process-delayed-jobs-code

The Async Iterator job-scan-iterator
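
The screenshots above show the actual code; a minimal sketch of the same idea, with hypothetical names, could look like this:

//Sketch of the streaming approach: count what's due, then pop one job at a time.
async function* dueJobs(client, key, upperBound) {
  const total = await client.zCount(key, 0, upperBound);  //how many jobs are due right now
  for (let i = 0; i < total; i++) {
    const popped = await client.zPopMin(key);              //returns AND removes the lowest score
    if (!popped) return;
    if (popped.score > upperBound) {                       //popped too early: put it back
      await client.zAdd(key, [popped]);
      return;
    }
    yield JSON.parse(popped.value);
  }
}

async function processDelayedJobs(client, queue, publish) {
  for await (const job of dueJobs(client, `delayed:${queue}`, Date.now())) {
    await publish(queue, job);                             //only one job in memory at a time
  }
}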

And that's all!

This simple algorithm change eliminated the need for:

  1. Big in-memory fetches - which made our memory allocation huge
  2. The 5K fetch limit - which capped our throughput

Results - which I think the screenshots can speak for themselves

We processed the entire backlog of 40GB of pending jobs pretty quickly. aws-memory-consumption

From a constant usage of ~8GB, we dropped down to ~200MB. memory-delayed-jobs We are now - playing it safe and still oversizing - allocating 1/4 of the resources. The git diff below was our first resource reduction - we went even further after that.

git-diff-memory-delayed-jobs

Money-wise: we are talking about at least 1K USD/month - AND more in the future if we can downsize our ElastiCache instance.

Takeaway Points

  • Redis is a distributed data-structure database - more than just a simple key-value store.
  • You can achieve great designs using Redis
  • Be careful, because the way you design a solution with Redis may be costly in the future

Final Thoughts

There could be a lot of discussion about whether this is a great way of doing job postponing, whether Redis is the right storage, whether we should really postpone jobs for small network hiccups, whether we should leverage RabbitMQ's DelayedExchange - etc. But at the end of the day, to succeed as a company we need to solve the problems in our daily routine. Some problems are worth it, some are not. It's always one step at a time.

· 4 min read
Lucas Weis Polesello

The Zalgo Effect is a term used to describe the unexpected outcomes of mixing sync and async JavaScript code.

It means - if you mix these two approaches SOMETHING weird will happen.

It's one of those things you kinda don't understand until you see it in real production systems.

So what does it have to do with resource leakage?

One day, our SRE team received a couple of PagerDuty alerts claiming our services were restarting and not able to work properly due to Error: No channels left to allocate - i.e. RabbitMQ connections were maxing out their channel allocation. (For reference, see RabbitMQ's documentation on Channels and Connections.)

It was clear some code was leaking channel creations. No one knew what it could be - but I had studied the Zalgo Effect in the Node.js Design Patterns book, and something clicked.

How was I so sure the Zalgo was the culprit?

The service throwing that error was only responsible for fanning out a couple of messages to a lot of other services - so it was as easy as creating a Queue object and running N promises concurrently to publish a message. Checking the RabbitMQ Management UI showed me that we created N channels for that connection.

But why did it only happen in some scenarios?

That's where the Zalgo Effect pops in.

Our code was built back in ~2015 - Node 4 - when the callback style was mainstream. Our engineers created the Queue abstraction, which by itself handled almost 50% of our Event-Driven Architecture, and had to make the class style work with async initialization - not so easy with callbacks.

So the code assumed the following:

  1. Assert exchanges, queues and other necessary resources - via something we could call the consumeChannel.
    1. The consume channel is created whenever the connection is made.
  2. Our confirmChannel - i.e. the channel we use to publish events - was lazily created, mixing async and sync code.

So the problem lives in 2).

Imagine the following:

  • We assert the confirmChannel (old-assert-confirm, old-get-instance)
  • It checks whether the channel EXISTS or NOT.
  • If not, it creates one via a PROMISE and returns control to the event loop.
  • If it does, it returns it.

What happens if two concurrent promises reach the same if before the first one resolves? We create the channel twice and overwrite the reference - keeping both channels open but only ever using the last one.

This is where the code was leaking channels.
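
The screenshots above show the real code; here is a hypothetical reconstruction of the leaky pattern to make the race visible:

//Hypothetical reconstruction of the leaky pattern - not the actual code.
class Queue {
  constructor(connection) {
    this.connection = connection;       //an amqplib connection
    this.confirmChannel = undefined;
  }

  async getConfirmChannel() {
    if (!this.confirmChannel) {
      //Two concurrent callers can both pass this check before either await
      //resolves: both create a channel, the first reference is overwritten
      //and that channel stays open unused - it leaks.
      this.confirmChannel = await this.connection.createConfirmChannel();
    }
    return this.confirmChannel;
  }
}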

Fixing the problem

Well, the fix we actually shipped was simply awaiting one promise first and then fanning out the other promises.

We kept it simple because of the risk involved and because the code is being refactored into a new style.

How can I fix it if I see something like that?

If you want a real solution, here's what the V2 would look like - the idea is to create the promises and assign them to variables, instead of awaiting them right away. Example below:

assert-zalgo

This easily fixes the problem - by storing the promise in a variable and checking for its existence.
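
A sketch of that idea, reusing the same hypothetical Queue class as above - cache the promise itself, so the check-and-assign happens synchronously:

//Sketch of the fix: store the Promise, not the resolved channel.
class Queue {
  constructor(connection) {
    this.connection = connection;
    this.confirmChannelPromise = null;
  }

  getConfirmChannel() {
    if (!this.confirmChannelPromise) {
      //The assignment happens synchronously, before control returns to the
      //event loop, so every concurrent caller shares the same Promise.
      this.confirmChannelPromise = this.connection.createConfirmChannel();
    }
    return this.confirmChannelPromise;
  }
}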

In a more robust style, where you actually need to initialize a couple of resources, you could do something like the below:

get-or-create-client-print

  1. Create a function that executes the entire initialization Promise.
  2. Keep a reference to it.
  3. If the same resource is requested again, just reuse the same Promise.
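
As a hedged sketch of that get-or-create style - the amqp URL, exchange name and resource set below are assumptions, not our real setup:

const amqplib = require('amqplib');

//Sketch only: the whole multi-step initialization runs behind a single cached Promise.
let clientPromise = null;

function getOrCreateClient() {
  if (!clientPromise) {
    clientPromise = (async () => {
      const connection = await amqplib.connect('amqp://localhost');        //assumed URL
      const channel = await connection.createConfirmChannel();
      await channel.assertExchange('events', 'topic', { durable: true });  //assumed exchange
      return { connection, channel };
    })();
  }
  return clientPromise;                 //every caller awaits the same reference
}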

OK - but why does this fix the problem?

The idea is to make sure the checks run synchronously and let the promises settle in their own time. We need to think in terms of synchronous execution blocks to reason about how our promises are used.

· 2 min read
Lucas Weis Polesello

One of the main challenges when dealing with the async nature of Node.js is initializing classes/clients that require some sort of side effect - such as a database connection, disk reads and the like. Even the simple idea of waiting for the first use to connect/initialize a resource.

Besides Dependency Injection - I like to use two approaches for this:

1) Leaving it up to the client to call connect or any other synonym - as easy as creating an async function, as in the example below

const redis = require('redis');
const crypto = require('crypto');

//PROS: Damn easy, simple and straight-forward
//CONS: This leaves the entire responsibility to the client
class DistributedDataStructure {
  constructor() {
    this.client = redis.createClient();
  }

  async connect() {
    return this.client.connect();
  }

  async add(staffName, reviewId) {
    //Do some business here - idk
    const accountName = await this.client.get(staffName);
    return this.client.sAdd(`v1:${accountName}:pending-reviews`, reviewId);
  }
}

(async () => {
  const ds = new DistributedDataStructure();
  await ds.connect();
  await ds.add('Jerome', crypto.randomBytes(12).toString('hex'));
})();

2) Proxying the access

In the real and wild world we know we have to deal with legacy code, legacy initialization methods and much more unexpected stuff - for this we have a second approach, which leverages the [Proxy API for JS](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Proxy)

Using Proxy, it would look roughly like this:

const redis = require('redis');
const { once } = require('events');
const crypto = require('crypto');

//PROS: No client responsibility - makes it easy for the client
//CONS: More complex and error prone
class ProxiedDistributedDataStructure {
  constructor() {
    this.client = redis.createClient();
    this.client.connect();
    return new Proxy(this, {
      get(target, property) {
        const descriptor = target[property];
        if (!descriptor) {
          return;
        }
        //If the client is already connected (or the property is not a method),
        //hand it back as-is.
        if (typeof descriptor !== 'function' || target.client.isReady) {
          return descriptor;
        }
        //Otherwise wrap the method so it waits for the 'ready' event first.
        return async function () {
          await once(target.client, 'ready');
          return descriptor.apply(target, arguments);
        };
      }
    });
  }

  async add(staffName, reviewId) {
    //Do some business here - idk - like below
    const accountName = await this.client.get(staffName);
    return this.client.sAdd(`v1:${accountName}:pending-reviews`, reviewId);
  }
}

const client = new ProxiedDistributedDataStructure();
client.add('Jerome', crypto.randomBytes(12).toString('hex'));

The main benefit of the second approach is that we can instantiate objects in sync contexts and only treat method calls as async - instead of having to play around with dirty gimmicks to call connect and chain promises, or even worse, callbackify.

NOTE: As far as I know, from Redis client v3 onwards there's a legacyMode option when creating the client that lets us keep this lazy nature of Redis - buffering client calls.