🚀 Executive Summary
TL;DR: MQTT, a pub/sub protocol, struggles with request/response (RPC) patterns, leading to complex, brittle code for correlation and timeouts. MQTT+ is an open-source TypeScript API for MQTT.js that provides a robust abstraction layer, simplifying RPC and streaming with higher-level communication patterns.
🎯 Key Takeaways
- MQTT is fundamentally a publish-subscribe protocol, making direct implementation of request/response (RPC) patterns complex and error-prone.
- Manually implementing RPC over MQTT using correlation IDs and unique response topics leads to significant technical debt, requiring developers to manage subscriptions, timeouts, and error handling.
- MQTT+ provides a purpose-built abstraction layer over MQTT.js, offering a clean `async/await` API for RPC and streaming, automatically handling underlying complexities like topic schemes and correlation IDs.
- For architectures heavily reliant on RPC, consider dedicated RPC protocols like gRPC/Twirp or messaging systems with built-in request/reply like NATS, or use MQTT+ for hybrid systems.
MQTT is a fantastic pub/sub protocol, but forcing it to handle request/response patterns is a recipe for disaster. Let’s explore why you need a proper abstraction layer like MQTT+ for RPC and streams in your IoT stack.
Beyond Pub/Sub: Bringing Sanity (and RPC) to Your MQTT Architecture
I remember one late night, staring at a Grafana dashboard that was just a sea of red. We had a fleet of thousands of IoT gateways that needed to pull down new configurations. The brilliant idea was to use MQTT. Simple, right? The device publishes to `gateways/req/config/{gatewayId}` and our backend would reply on `gateways/res/config/{gatewayId}`. What followed was a multi-week nightmare of untraceable timeouts, duplicate requests from flaky network connections, and a rat’s nest of code trying to match a response to the original request using a `correlationId` buried in the JSON payload. We were trying to hammer in a screw, and it was a mess. That’s the exact pain a library like MQTT+ is built to solve.
The “Why”: You’re Fighting The Protocol
Let’s be clear: MQTT is fundamentally a publish-subscribe protocol. It’s designed for decoupled, fire-and-forget messaging. A publisher fires a message to a topic, and it has no idea if zero, one, or a thousand subscribers received it. This is a feature, not a bug! It’s what makes it so resilient and scalable for telemetry and event-driven systems.
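To make the fire-and-forget point concrete, here's a minimal sketch of a broker's fan-out. `TinyBroker` is a hypothetical in-memory stand-in, not a real MQTT broker and not the MQTT.js API: exact-match topics only, no QoS, no retained messages. The part worth looking at is the signature of `publish`.

```typescript
type Listener = (payload: string) => void;

// Hypothetical in-memory broker: exact-match topics, no QoS, no wildcards.
class TinyBroker {
  private readonly listeners = new Map<string, Listener[]>();

  subscribe(topic: string, listener: Listener): void {
    const existing = this.listeners.get(topic) ?? [];
    this.listeners.set(topic, [...existing, listener]);
  }

  // Returns nothing: the publisher never learns how many subscribers,
  // if any, received the message.
  publish(topic: string, payload: string): void {
    for (const listener of this.listeners.get(topic) ?? []) {
      listener(payload);
    }
  }
}
```

Publishing to a topic with zero subscribers and publishing to one with a thousand look identical to the caller, which is why any kind of reply has to be engineered on top.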
The problem arises when you need a direct request/response interaction, also known as Remote Procedure Call (RPC). You send a command and you need a specific answer from a specific service, and you need it now. When you try to force this pattern onto a pub/sub system, you end up manually rebuilding all the things a real RPC framework gives you for free: request/response correlation, timeouts, error handling, and discovery. You’re fighting against the very nature of the protocol, and that’s a battle you’ll eventually lose in complexity.
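Here's roughly what the correlation-and-timeout bookkeeping looks like once you write it yourself. This is a minimal sketch, assuming string correlation IDs. `PendingRequests` and its method names are made up for illustration and don't come from any library.

```typescript
type Pending<T> = {
  resolve: (value: T) => void;
  reject: (reason: Error) => void;
  timer: ReturnType<typeof setTimeout>;
};

// Hypothetical helper: tracks in-flight requests by correlation ID.
class PendingRequests<T> {
  private readonly inFlight = new Map<string, Pending<T>>();

  // Register a request. The returned promise rejects if no response
  // is settled within timeoutMs.
  register(correlationId: string, timeoutMs: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.inFlight.delete(correlationId);
        reject(new Error(`request ${correlationId} timed out after ${timeoutMs} ms`));
      }, timeoutMs);
      this.inFlight.set(correlationId, { resolve, reject, timer });
    });
  }

  // Settle a request when its response arrives. Returns false for unknown
  // IDs (late or duplicate responses), which are simply dropped.
  settle(correlationId: string, value: T): boolean {
    const entry = this.inFlight.get(correlationId);
    if (entry === undefined) return false;
    clearTimeout(entry.timer);
    this.inFlight.delete(correlationId);
    entry.resolve(value);
    return true;
  }

  get size(): number {
    return this.inFlight.size;
  }
}
```

The idea is that your message handler calls `settle` with the correlation ID it finds in each incoming response. Note that this still leaves subscription management, error payloads, and retries for you to handle.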
The Fixes: From Duct Tape to a Proper Toolkit
So, you’re stuck. You need RPC over MQTT. Here’s how we’ve tackled this on my teams, from the quick-and-dirty to the architecturally sound.
Solution 1: The “Roll-Your-Own” RPC Hack
This is the path most people stumble down first. It’s the manual, duct-tape approach I mentioned from my war story. You invent a topic scheme and use a correlation ID to stitch things together.
The Pattern:
- The client generates a unique ID (e.g., a UUID) and a unique response topic.
- The client subscribes to its unique response topic: `responses/my-client/123-abc`.
- The client publishes a message to a generic request topic like `rpc/get-device-status`. The payload includes the response topic and the correlation ID.
- The server, listening on `rpc/get-device-status`, picks up the message, does its work, and publishes the result to the response topic specified in the payload.
- The client receives the message, checks the correlation ID, and continues its work.
// --- Client Side (The Hacky Way) ---
const correlationId = 'unique-id-12345'; // In practice, generate a fresh UUID per request
const responseTopic = `responses/client-abc/${correlationId}`;
// Listen for the specific response
mqttClient.subscribe(responseTopic);
mqttClient.on('message', (topic, payload) => {
  if (topic === responseTopic) {
    // payload arrives as a Buffer, so decode it before parsing
    console.log('Got my response!', JSON.parse(payload.toString()));
    // Don't forget to unsubscribe and handle timeouts!
    mqttClient.unsubscribe(responseTopic);
  }
});
// Send the request
mqttClient.publish('rpc/get-device-status', JSON.stringify({
  correlationId: correlationId,
  replyTo: responseTopic,
  params: { deviceId: 'thermostat-99' }
}));
Warning: This “works,” but it’s incredibly brittle. You are now responsible for managing thousands of unique subscriptions, handling timeouts if a response never arrives, and cleaning everything up. It’s a massive source of bugs and technical debt. Use it only if you have a single, critical use case and no time for a better solution.
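The client snippet above is only half the story. For the server half, plus the duplicate requests that flaky connections tend to produce, a rough sketch might look like this. Everything here is hypothetical: `createResponder`, `parseRequest`, the `lookup` and `publish` callbacks (stand-ins for your real work and for `mqttClient.publish`), and the reply shape. The reply cache is unbounded, which a real service would need to fix with some form of eviction.

```typescript
// Message shape follows the client payload: correlationId, replyTo, params.
type RpcRequest = { correlationId: string; replyTo: string; params: { deviceId: string } };
type RpcReply = { correlationId: string; result?: unknown; error?: string };
type Publish = (topic: string, payload: string) => void;

// Returns undefined for malformed JSON or a payload missing the routing fields.
function parseRequest(raw: string): RpcRequest | undefined {
  try {
    const parsed = JSON.parse(raw) as Partial<RpcRequest> | null;
    if (typeof parsed?.correlationId === "string" && typeof parsed?.replyTo === "string") {
      return parsed as RpcRequest;
    }
  } catch {
    // Malformed JSON: fall through.
  }
  return undefined;
}

function createResponder(lookup: (deviceId: string) => unknown, publish: Publish) {
  // Replies already sent, keyed by correlation ID, so a redelivered
  // request is answered from cache instead of being executed twice.
  const answered = new Map<string, string>();

  return function onRequest(rawPayload: string): void {
    const request = parseRequest(rawPayload);
    if (request === undefined) return; // Nowhere to reply to, so drop it.

    const cached = answered.get(request.correlationId);
    if (cached !== undefined) {
      publish(request.replyTo, cached);
      return;
    }

    let reply: RpcReply;
    try {
      reply = { correlationId: request.correlationId, result: lookup(request.params.deviceId) };
    } catch (err) {
      // Report failures to the caller rather than letting it time out.
      reply = { correlationId: request.correlationId, error: String(err) };
    }
    const encoded = JSON.stringify(reply);
    answered.set(request.correlationId, encoded);
    publish(request.replyTo, encoded);
  };
}
```

You would wire `onRequest` into the message handler for the request topic. Even in this small form, the server now owns parsing, validation, deduplication, and error reporting, which is the technical debt the warning is talking about.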
Solution 2: The Abstraction Layer (Enter MQTT+)
This is the permanent fix. Instead of reinventing the wheel, you use a library that’s purpose-built to solve this exact problem. A tool like MQTT+ sits on top of the excellent MQTT.js library and provides a clean, modern `async/await` API for RPC and even streaming responses. It manages the ugly topic schemes, correlation IDs, and timeouts for you, so you can focus on your application logic.
The Pattern (with MQTT+):
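The library handles all the underlying complexity. You just define a procedure and call it. Treat the method names in the snippets below as a sketch of the shape, and check the MQTT+ documentation for the exact API.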
// --- Server Side (The Clean Way) ---
// Define an RPC endpoint
mqttPlus.rpc.handle('get-device-status', async (payload) => {
  const { deviceId } = payload;
  const status = await db.getStatusFor(deviceId);
  // Just return the result. The library handles the rest.
  return { status: status, ts: Date.now() };
});
// --- Client Side (The Clean Way) ---
// Just call the remote procedure!
async function checkStatus() {
  try {
    const response = await mqttPlus.rpc.call('get-device-status', {
      deviceId: 'thermostat-99'
    });
    console.log('Server responded with:', response);
    // { status: 'online', ts: 167... }
  } catch (error) {
    console.error('RPC call failed:', error.message);
  }
}
Look at that. It’s clean, it’s testable, and it reads like any other modern asynchronous code. This is the approach a senior engineer should advocate for. You’re not fighting the tool; you’re using a higher-level tool that leverages the underlying protocol correctly.
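If you're curious what such a layer has to do under the hood, here's a toy version. To be clear, this is not how MQTT+ is implemented and it is not its API. `Transport`, `LoopbackTransport`, and `TinyRpc` are hypothetical names, the `rpc/...` and `responses/...` topic scheme is borrowed from the hack in Solution 1, and the transport is an in-memory stand-in for a broker so the sketch runs without one.

```typescript
type MessageHandler = (payload: string) => void;

// Minimal transport surface the sketch needs; a real one would wrap an MQTT client.
interface Transport {
  publish(topic: string, payload: string): void;
  subscribe(topic: string, handler: MessageHandler): void;
  unsubscribe(topic: string): void;
}

// In-memory stand-in for a broker (exact-match topics, one handler per topic).
class LoopbackTransport implements Transport {
  private readonly handlers = new Map<string, MessageHandler>();
  publish(topic: string, payload: string): void {
    this.handlers.get(topic)?.(payload);
  }
  subscribe(topic: string, handler: MessageHandler): void {
    this.handlers.set(topic, handler);
  }
  unsubscribe(topic: string): void {
    this.handlers.delete(topic);
  }
}

class TinyRpc {
  private nextId = 0;
  private readonly transport: Transport;
  private readonly clientId: string;

  constructor(transport: Transport, clientId: string) {
    this.transport = transport;
    this.clientId = clientId;
  }

  // Server side: run `fn` for each request and publish its result or error.
  // (No payload validation here; a malformed request would throw.)
  handle<P, R>(name: string, fn: (params: P) => Promise<R> | R): void {
    this.transport.subscribe(`rpc/${name}`, (raw) => {
      const { replyTo, params } = JSON.parse(raw) as { replyTo: string; params: P };
      Promise.resolve()
        .then(() => fn(params))
        .then(
          (result) => this.transport.publish(replyTo, JSON.stringify({ result })),
          (err) => {
            const message = err instanceof Error ? err.message : String(err);
            this.transport.publish(replyTo, JSON.stringify({ error: message }));
          },
        );
    });
  }

  // Client side: a unique reply topic per call doubles as the correlation ID.
  call<P, R>(name: string, params: P, timeoutMs = 5000): Promise<R> {
    const replyTo = `responses/${this.clientId}/${this.nextId++}`;
    return new Promise<R>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.transport.unsubscribe(replyTo);
        reject(new Error(`${name} timed out after ${timeoutMs} ms`));
      }, timeoutMs);
      this.transport.subscribe(replyTo, (raw) => {
        clearTimeout(timer);
        this.transport.unsubscribe(replyTo);
        const reply = JSON.parse(raw) as { result?: R; error?: string };
        if (reply.error !== undefined) reject(new Error(reply.error));
        else resolve(reply.result as R);
      });
      this.transport.publish(`rpc/${name}`, JSON.stringify({ replyTo, params }));
    });
  }
}
```

With a shared `LoopbackTransport`, one `TinyRpc` instance can `handle('get-device-status', ...)` while another does `await call('get-device-status', { deviceId: 'thermostat-99' })`. Even this toy skips reconnects, QoS, wildcard subscriptions, and streaming, which is a decent argument for using a maintained library instead.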
Solution 3: The “Wrong Tool” Reality Check
Sometimes, the right move is to take a step back and ask: are we using the right tool for the job? If your entire application is 90% RPC and only 10% pub/sub, you might be forcing MQTT into a role it was never designed for. In this scenario, the “nuclear option” is to use a different primary protocol.
Pro Tip: Don’t be a technology zealot. A good architect knows when to pivot. Being married to a tool, even a great one like MQTT, can be a huge mistake if the application’s requirements have changed.
Here’s a quick breakdown of when to consider alternatives:
| Protocol / Stack | Best For… | Use When… |
|---|---|---|
| MQTT | Decoupled Pub/Sub, Telemetry, Event Notifications | You need high resilience, massive fan-out, and can tolerate eventual consistency. IoT sensor data is the classic use case. |
| MQTT + MQTT+ | Hybrid systems; mostly Pub/Sub with some required RPC | Your architecture is event-driven at its core, but you need to add command-and-control, device configuration, or other request/response features. |
| gRPC / Twirp | High-performance, contract-first, internal service-to-service RPC | You need low-latency, strongly-typed APIs between microservices and performance is critical. You’re building a distributed backend, not necessarily an IoT event bus. |
| NATS | High-performance messaging with built-in Request/Reply | You want a single, simpler solution for both pub/sub and performant RPC without the complexity of gRPC’s Protobuf definitions. It’s a strong MQTT alternative. |
Choosing this option requires courage and honesty. It often means admitting an early architectural choice was wrong. But making that hard call will save you from the “late night dashboard of red” I experienced. Use MQTT for what it’s amazing at, and when you need to extend its capabilities, use a proper abstraction like MQTT+ to do it right.
🤖 Frequently Asked Questions
❓ Why is implementing RPC directly over MQTT problematic?
Implementing RPC directly over MQTT forces developers to manually manage request/response correlation, unique response topics, timeouts, and error handling, which are complexities inherent to a pub/sub protocol, leading to brittle and bug-prone systems.
❓ How does MQTT+ compare to other RPC solutions or protocols?
MQTT+ extends MQTT.js for hybrid systems needing some RPC alongside pub/sub, simplifying command-and-control. For pure, high-performance RPC, gRPC/Twirp offer contract-first, low-latency solutions, while NATS provides a simpler, built-in request/reply mechanism as a strong MQTT alternative.
❓ What is a common implementation pitfall when trying to implement RPC with MQTT, and how does MQTT+ address it?
A common pitfall is manually managing unique response topics and correlation IDs for each request, leading to subscription management overhead and unhandled timeouts. MQTT+ addresses this by providing a higher-level `async/await` API that automatically handles topic schemes, correlation, and timeouts.