I recently decided to refactor a small platform smoke test framework called Splinter that I wrote a while back. Aside from applying lessons learned since its inception and pulling in the latest Node LTS release, I wanted to clean up code resembling callback hell by using async/await. It’s practically 2020, I thought; promises aren’t exactly new, so this will be easy!
I soon discovered good intentions line the road to callback hell… Splinter simply executes minimalistic CRUD tests against MongoDB, MySQL, PostgreSQL, RabbitMQ and Redis. The idea is to quickly validate that shared services are consumable. Refactoring MongoDB was quick and easy, since most of my Node.js projects have used Mongo. Moving on to MySQL and PostgreSQL was also a delight, since the upstream client libraries on NPM have evolved over the years to make async/await a first-class citizen.
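To illustrate the shape those refactors took (a simulated query helper stands in for the real client libraries here, so this is a sketch of the pattern rather than actual Splinter code), each test collapses into a flat, awaitable sequence:

```javascript
// Sketch of the pattern the SQL refactors followed. The query helper is a
// stand-in simulated with a timer, not the real mysql/pg client API.
const query = sql =>
  new Promise(resolve => setTimeout(() => resolve([{ ok: 1, sql }]), 10))

const smokeTest = async () => {
  const start = Date.now()
  const rows = await query('SELECT 1') // flat, top-to-bottom, no nesting
  return { ok: rows[0].ok === 1, secondsElapsed: (Date.now() - start) / 1000 }
}

smokeTest().then(results => console.log(results.ok)) // → true
```

No pyramids, no error-first callbacks; a try/catch around the awaits handles failures.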
Once I got to RabbitMQ, I hit a bit of a roadblock. My first instinct was to refactor using similar patterns as the prior services, sprinkling async/await pixie dust throughout the code and instantly reaping magical benefits. As naive as it sounds, that mostly worked… mostly. The trouble came from channel.publish and channel.consume, which weren’t willing to let me await as expected. Who says the machines aren’t in control?
Stepping back a bit, this all started with the following callback hell (which also contained a number of other code smells we’ll address as we go along):
```javascript
middleware.testRabbit = (req, res, next) => {
  const svc = req.app.locals.conf.rabbitInstance
  const cfg = init(req, res, svc)
  const q = randName()

  // may be called before conn or ch are defined
  const cleanup = () => {
    const finish = () => {
      try {
        conn.close()
        return next()
      } catch (err) {
        return next()
      }
    }
    try {
      ch.deleteQueue(q, {}, (err, ok) => {
        finish()
      })
    } catch (err) {
      finish()
    }
  }

  rabbit.connect(cfg.creds.uri, { noDelay: true }, (err, conn) => {
    if (err) {
      handleErr(err, cfg, cleanup)
    } else {
      conn.createChannel((err, ch) => {
        if (err) {
          handleErr(err, cfg, cleanup)
        } else {
          ch.assertQueue(q, { exclusive: true, autoDelete: true }, (err, ok) => {
            if (err) {
              handleErr(err, cfg, cleanup)
            } else {
              // send the current timestamp through the default exchange
              ch.sendToQueue(q, Buffer.from(cfg.time.toString()))
              ch.consume(q, msg => {
                cfg.results.seconds_elapsed =
                  (Date.now() - Number(msg.content.toString())) / 1000
                req.app.locals.testResults[svc] = cfg.results
                ch.ackAll()
                cleanup()
              })
            }
          })
        }
      })
    }
  })
}
```
Pardon the first-pass code from a just-get-it-working hacking session, now a couple of years old… but even setting aside the obvious smells, the never-ending indentation makes my eyes glaze over. Worse, I wanted to make this test case (which simply used the default exchange) more realistic, adding even more callbacks!
How do we improve this? As stated above, the goal was refactoring these tests to use async/await. Let’s take a naive first pass, simply replacing nested callbacks with async/await goodness wrapped in try/catch:
```javascript
const testRabbit = async instance => {
  const testState = init(instance)
  const exchange = uuid().slice(0, 8)
  const queue = uuid().slice(-8)
  const key = 'test'
  const { connection, channel } = await dbConnect(getCreds(instance))
  try {
    await channel.assertExchange(exchange, 'direct', { autoDelete: true })
    await channel.assertQueue(queue, { exclusive: true, autoDelete: true })
    await channel.bindQueue(queue, exchange, key)
    await channel.publish(exchange, key, Buffer.from(testState.time.toString()))
    const time = await channel.consume(queue)
    if (time) {
      testState.results.secondsElapsed = (Date.now() - Number(time.toString())) / 1000
    }
  } catch (error) {
    console.log(`ERROR - ${error.stack}`)
    testState.results.message = error.message
  } finally {
    if (channel) {
      await channel.unbindQueue(queue, exchange, key)
      await channel.deleteQueue(queue)
      await channel.deleteExchange(exchange)
      await channel.close()
    }
    if (connection) await connection.close()
  }
  return testState.results
}
```
Better, if it worked… To keep this article from being even longer, I combined several steps. I don’t want to get too far into the weeds, but to follow along, note that we’ve:

- extracted the test from Express middleware into a standalone function
- switched from amqplib’s callback API to its promise-returning API
- swapped the default exchange for a dedicated direct exchange bound to a throwaway queue
- moved cleanup into a finally block so it runs on success or failure

Even with more functionality, the code is a lot easier to read.
There is one problem: we can’t actually await the calls to publish and consume. Even with amqplib’s promise-friendly version (simply doing require('amqplib') vs require('amqplib/callback_api')), support isn’t as complete as I expected. Perhaps that’s by design; I’m a lowly DevOps zealot who writes code as a hobby, not a RabbitMQ expert. It feels like I’m missing something, or perhaps the maintainers don’t want to tightly couple publishers and consumers, even though that would be convenient for my use case.
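For what it’s worth, my reading of amqplib’s promise API (worth verifying against the library’s docs): publish returns a plain boolean for stream backpressure, and consume resolves as soon as the subscription is registered, yielding a consumer tag rather than a message. A mock channel (emphatically not the real library) makes the distinction visible:

```javascript
// Mock of amqplib-style consume semantics (this is NOT the real library):
// awaiting consume() yields the subscription result; the message itself
// arrives later via the callback.
const mockChannel = {
  consume (queue, onMessage) {
    setTimeout(() => onMessage({ content: Buffer.from('ping') }), 10)
    return Promise.resolve({ consumerTag: 'amq.ctag-mock' })
  }
}

const demo = async () => {
  const subscription = await mockChannel.consume('q', msg => {
    console.log(`arrived later: ${msg.content.toString()}`)
  })
  console.log(`await gave us: ${subscription.consumerTag}`)
}

demo()
```

So awaiting consume never hands you a message; if you want “resolve when the first message shows up,” you have to build that yourself.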
“When you’re going through hell, keep on going.” —Winston Churchill
One answer might be simply finding an alternate AMQP client library. Since amqplib is the most popular option on NPM and used in the official tutorial (albeit with callbacks), I did a bit of research and decided to artistically borrow (read: steal) ideas found while Google Engineering…
Specifically, I wrapped the problematic functions in promises, forcing them to do my bidding:
```javascript
// promisify message publishing
const publishMessage = ({ channel, exchange, key, data }) => {
  return new Promise((resolve, reject) => {
    try {
      channel.publish(exchange, key, Buffer.from(data.toString()))
      resolve()
    } catch (error) {
      reject(error)
    }
  })
}

// promisify message consumption
const consumeMessage = ({ connection, channel, queue }) => {
  return new Promise((resolve, reject) => {
    channel.consume(queue, async message => {
      // https://github.com/squaremo/amqp.node/issues/176
      if (message) {
        const time = Number(message.content.toString())
        await channel.ack(message)
        resolve(time)
      }
    })
    // handle connection closed
    connection.on('close', error => {
      return reject(error)
    })
    // handle errors
    connection.on('error', error => {
      return reject(error)
    })
  })
}

const testRabbit = async instance => {
  const testState = init(instance)
  const exchange = uuid().slice(0, 8)
  const queue = uuid().slice(-8)
  const key = 'test'
  const { connection, channel } = await dbConnect(getCreds(instance))
  try {
    await channel.assertExchange(exchange, 'direct', { autoDelete: true })
    await channel.assertQueue(queue, { exclusive: true, autoDelete: true })
    await channel.bindQueue(queue, exchange, key)
    await publishMessage({ channel, exchange, key, data: testState.time })
    const time = await consumeMessage({ connection, channel, queue })
    if (time) {
      testState.results.secondsElapsed = (Date.now() - time) / 1000
    }
  } catch (error) {
    console.log(`ERROR - ${error.stack}`)
    testState.results.message = error.message
  } finally {
    if (channel) {
      await channel.unbindQueue(queue, exchange, key)
      await channel.deleteQueue(queue)
      await channel.deleteExchange(exchange)
      await channel.close()
    }
    if (connection) await connection.close()
  }
  return testState.results
}
```
This worked! I did a late-night dance around the living room and patted myself on the back for being so good at stealing (read: adopting) the work of others. But there are at least two problems here. First, it’s ugly: in all the other tests I refactored, moving to async/await reduced both indentation and lines of code, while this time the helper functions added complexity. Second, as a bit more scurrying back to Google revealed, rolling your own promises like this is a common anti-pattern.
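A minimal sketch of the anti-pattern in question (hypothetical code, not from Splinter): wrapping something that already yields a promise in new Promise adds noise and an extra place for errors to hide.

```javascript
// Explicit promise construction anti-pattern, in miniature (hypothetical
// example, not Splinter code). fetchTime already returns a promise…
const fetchTime = async () => Date.now()

// Anti-pattern: a redundant wrapper around an existing promise
const wrapped = () =>
  new Promise((resolve, reject) => {
    fetchTime().then(resolve).catch(reject)
  })

// Better: return the promise you already have
const direct = () => fetchTime()

direct().then(t => console.log(typeof t)) // → number
```

My wrappers aren’t quite this redundant (the underlying calls are callback- and event-based), but the shape is close enough to make me suspicious.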
It seemed clear after more reading that I needed a better solution if I wanted to fit in with the cool kids who really know how to code. Luckily, the same site discussing the anti-pattern also built a solution (Bluebird), which has >18k stars on GitHub and supportive, if aged, Stack Overflow threads.
Given my goal of building a lightweight test framework vs something more complex where Bluebird’s advanced features might be an obvious win, I had concerns about using a sledgehammer to drive a tack… Since my refactoring effort included moving to Node 12.13 (LTS as of writing) and native promises have seen a lot of improvements, I considered giving util.promisify a chance. After thinking that through, I would end up with an extra import (although of minimal native code), promisify wrappers, and some edge cases (not all functions follow the error-first callback convention). For those reasons, I decided to leave well enough alone… for now.
What would you have done here? Is there an obviously better way to approach this, or simply different options depending on context? Would reverting to the callback_api and promisifying everything have been a cleaner route? Is the fact that I’m even thinking about this for such a simple use case proof that premature optimization really is the root of all evil?