MQTT Topic Conventions - Developer Guide¶
Overview¶
This document provides comprehensive MQTT topic conventions for ABS Platform developers. It consolidates and standardizes all messaging patterns used across the platform, ensuring consistency with the existing EMQX deployment.
Quick Reference¶
Core Message Types¶
| Type | Purpose | Pattern | Example |
|---|---|---|---|
dt |
IoT Data Transfer | dt/{dom}/{app}/{rt}/{fl}/{it} |
dt/arm/bms/ph-mnl1/F7/DEV42 |
cmd |
IoT Commands | cmd/{dom}/{app}/{rt}/{fl}/{it} |
cmd/arm/bms/ph-mnl1/F7/DEV42 |
emit |
Service Events | emit/{dom}/{svc}/{res}/{id}/{evt} |
emit/abs/service/plan/P123/request |
echo |
Response Confirmation | echo/{dom}/{svc}/{res}/{id}/{evt} |
echo/abs/service/plan/P123/confirmed |
call |
RPC Commands | call/{dom}/{svc}/{res}/{id}/{verb} |
call/abs/payment/plan/P123/process |
meta |
Schema/Config | meta/{dom}/{app}/{rt}/{fl}/{it} |
meta/arm/bms/ph-mnl1/F7/DEV42 |
1. IoT Device Communication¶
1.1 Topic Structure¶
dt/{domain}/{app}/{region}/{fleet}/{item} # Data transfer
cmd/{domain}/{app}/{region}/{fleet}/{item} # Commands
meta/{domain}/{app}/{region}/{fleet}/{item} # Metadata (retained)
state/{domain}/{app}/{region}/{fleet}/{item} # State shadows (optional)
1.2 Component Definitions¶
| Component | Description | Examples | Notes |
|---|---|---|---|
domain |
Business domain | arm, abs, ems |
Asset Relations Management, ABS Platform, Energy Management |
app |
Firmware/app class | bms, ssc, gps |
Battery Management System, Swap Station Controller, GPS tracker |
region |
Geographic/network region | ph-mnl1, us-west1 |
Philippines Manila 1, US West 1 |
fleet |
Fleet identifier | F7, FLEET_A |
Short, fixed-length preferred |
item |
Device/asset ID | DEV42, BAT001 |
Unique within fleet, short preferred |
1.3 IoT Payload Structure¶
Data Transfer (dt/) Messages:
{
"sid": "bms.v1", // Schema identifier
"ts": 1730182103, // Unix timestamp (seconds)
"seq": 412, // Monotonic sequence number
"d": { // Data payload
"v": 51.72, // Voltage
"t": 32.1, // Temperature
"soc": 78 // State of charge
}
}
Command (cmd/) Messages:
{
"sid": "cmd.v1", // Command schema
"ts": 1730182103, // Timestamp
"req": "lock", // Command verb
"id": "c-9b7f", // Correlation ID
"arg": {"hold_s": 30} // Command arguments
}
Metadata (meta/) Messages (Retained):
{
"schema_id": "bms.v1",
"rev": 3,
"fields": {
"v": {"unit": "V", "desc": "Pack voltage", "type": "f32", "min": 0, "max": 120},
"t": {"unit": "C", "desc": "Pack temperature", "type": "f32", "min": -40, "max": 85},
"soc": {"unit": "%", "desc": "State of Charge", "type": "u8", "min": 0, "max": 100}
},
"wire": ["json", "cbor"]
}
1.4 IoT Examples¶
# Battery pack telemetry
Topic: dt/arm/bms/ph-mnl1/F7/BAT001
Payload: {"sid":"bms.v1","ts":1730182103,"seq":412,"d":{"v":51.72,"t":32.1,"soc":78}}
# Swap station command
Topic: cmd/arm/ssc/ph-mnl1/F7/STN001
Payload: {"sid":"cmd.v1","ts":1730182103,"req":"unlock","id":"c-9b7f","arg":{"door_id":1}}
# Schema definition (retained)
Topic: meta/arm/bms/ph-mnl1/F7/BAT001
Payload: {"schema_id":"bms.v1","rev":3,"fields":{...}}
2. Service-to-Service Communication¶
2.1 Topic Structure¶
emit/{domain}/{service}/{resource}/{id}/{event} # Events/requests
echo/{domain}/{service}/{resource}/{id}/{event} # Confirmations/responses
call/{domain}/{service}/{resource}/{id}/{verb} # RPC calls
2.2 Component Definitions¶
| Component | Description | Examples | Notes |
|---|---|---|---|
domain |
Business domain | abs, arm, odoo |
Platform/service area |
service |
Service name | payment, service, asset |
Business capability |
resource |
Resource type | plan, invoice, battery |
Entity being operated on |
id |
Resource identifier | P123, INV-456, BAT789 |
Specific instance |
event |
Event/action type | request, confirmed, failed |
What happened |
2.3 Service Payload Structure¶
Standard Service Message:
{
"sid": "evt.v1", // Schema version
"ts": 1730182103, // Unix timestamp
"trace": "abc-123", // Trace ID for distributed tracing
"correlation_id": "corr-456", // For request/response pairing
"d": { // Event-specific data
"amount": 199.0,
"currency": "USD",
"status": "completed"
}
}
2.4 Emit/Echo Pattern¶
The emit/echo pattern provides reliable request-response semantics:
Request (emit):
Topic: emit/abs/service/plan/P123/access_request
Payload: {
"sid": "evt.v1",
"ts": 1730182103,
"trace": "trace-123",
"correlation_id": "req-456",
"d": {
"customer_id": "CUST001",
"station_id": "STN001",
"requested_action": "battery_swap"
}
}
Response (echo):
Topic: echo/abs/service/plan/P123/access_granted
Payload: {
"sid": "evt.v1",
"ts": 1730182104,
"trace": "trace-123",
"correlation_id": "req-456",
"d": {
"access_granted": true,
"session_id": "sess-789",
"expires_at": "2025-01-01T13:00:00Z"
}
}
2.5 Service Examples¶
# Payment processing
Topic: emit/odoo/payment/invoice/INV-12345/process
Payload: {"sid":"evt.v1","ts":1730182103,"trace":"abc-123","d":{"amount":199.0,"ccy":"USD"}}
# Payment confirmation
Topic: echo/odoo/payment/invoice/INV-12345/completed
Payload: {"sid":"evt.v1","ts":1730182104,"trace":"abc-123","d":{"status":"paid","ref":"PAY-456"}}
# Service state change
Topic: emit/abs/fsm/plan/P123/state_transition
Payload: {"sid":"evt.v1","ts":1730182103,"d":{"from":"INITIAL","to":"ACTIVE","trigger":"CONTRACT_SIGNED"}}
3. ABS Platform Specific Conventions¶
3.1 ABS Message Envelope¶
All ABS platform messages MUST use this standardized envelope:
{
"timestamp": "2025-01-01T12:00:00Z", // ISO 8601 timestamp
"plan_id": "bss-plan-001", // Service plan ID
"tenant_id": "tenant-xyz", // Multi-tenant ID
"correlation_id": "corr-123", // Request correlation
"idempotency_key": "idem-abc", // Idempotency protection
"actor": { // Who initiated this
"type": "customer|agent|system",
"id": "cust-123"
},
"data": { // Command/event specific payload
// Event-specific data here
}
}
3.2 ABS Topic Patterns¶
3.2.1 Customer Interactions¶
# Customer service request
emit/abs/customer/plan/{plan_id}/service_request
echo/abs/customer/plan/{plan_id}/service_granted
# Customer payment
emit/abs/customer/plan/{plan_id}/payment_submitted
echo/abs/customer/plan/{plan_id}/payment_confirmed
3.2.2 Agent Communications¶
# Agent calculations
emit/abs/agent/{agent_type}/{plan_id}/calculation_request
echo/abs/agent/{agent_type}/{plan_id}/calculation_result
# Agent state changes
emit/abs/agent/{agent_type}/{plan_id}/state_changed
3.2.3 FSM Events¶
# FSM transitions
emit/abs/fsm/{cycle}/{plan_id}/transition
echo/abs/fsm/{cycle}/{plan_id}/transition_applied
# FSM outputs
emit/abs/fsm/{cycle}/{plan_id}/output_signal
3.3 ABS Domain Mapping¶
| ABS Domain | MQTT Domain | Description |
|---|---|---|
| Battery Swap Service | bss |
Battery swap operations |
| Charger Rental Service | crs |
Charger rental operations |
| Fleet Management | fms |
Fleet tracking and management |
| Energy Management | ems |
Energy trading and optimization |
| Asset Relations | arm |
IoT asset management |
4. MQTT v5 Properties Usage¶
4.1 Standard Properties¶
| Property | Usage | Example |
|---|---|---|
message-expiry-interval |
Message TTL | 300 (5 minutes) |
response-topic |
RPC response route | echo/abs/service/plan/P123/response |
correlation-data |
Request correlation | Binary correlation ID |
4.2 User Properties¶
| Property | Purpose | Example | Required |
|---|---|---|---|
trace_id |
Distributed tracing | trace-abc-123 |
Recommended |
tenant |
Multi-tenancy | tenant-xyz |
If applicable |
application |
Source application | abs-platform |
Recommended |
priority |
Message priority | high\|normal\|low |
Optional |
4.3 Example MQTT v5 Usage¶
// Publishing with properties
client.publish('emit/abs/service/plan/P123/request', payload, {
qos: 1,
properties: {
messageExpiryInterval: 300,
responseTopic: 'echo/abs/service/plan/P123/response',
correlationData: Buffer.from('corr-123'),
userProperties: {
trace_id: 'trace-abc-123',
tenant: 'tenant-xyz',
application: 'abs-platform'
}
}
});
5. Load Balancing and Scaling¶
5.1 Shared Subscriptions¶
Use MQTT v5 shared subscriptions for load balancing:
# Load balanced service processing
$share/abs-workers/emit/abs/+/+/+/+
# Load balanced agent processing
$share/agent-pool/emit/abs/agent/+/+/+
# Load balanced payment processing
$share/payment-workers/emit/+/payment/+/+/+
5.2 Subscription Patterns¶
| Pattern | Purpose | Example |
|---|---|---|
+ |
Single level wildcard | emit/abs/+/plan/P123/+ |
# |
Multi-level wildcard | emit/abs/service/# |
$share/{group}/ |
Load balancing | $share/workers/emit/abs/+/+/+/+ |
6. Quality of Service (QoS)¶
6.1 QoS Guidelines¶
| Message Type | QoS Level | Rationale |
|---|---|---|
IoT telemetry (dt/) |
0 or 1 | Frequent, loss tolerable |
IoT commands (cmd/) |
1 or 2 | Critical, must arrive |
Service events (emit/) |
1 | Important, delivery required |
Service responses (echo/) |
1 | Responses must arrive |
RPC calls (call/) |
1 or 2 | Critical operations |
Metadata (meta/) |
1 + retain | Configuration, must persist |
6.2 Retention Guidelines¶
| Topic Type | Retain | Reason |
|---|---|---|
meta/ |
Yes | Schema definitions needed by new subscribers |
state/ |
Yes | Last known state for new subscribers |
dt/ |
No | Streaming data, latest only |
emit/ |
No | Events, not state |
echo/ |
No | Responses, not state |
7. Error Handling and Dead Letter Queues¶
7.1 Error Topics¶
# Processing errors
ops/error/{original_domain}/{original_service}/{error_type}
# Dead letter queue
ops/dlq/{original_domain}/{original_service}
# System alerts
ops/alert/{severity}/{component}
7.2 Error Payload Structure¶
{
"error_id": "err-123",
"timestamp": "2025-01-01T12:00:00Z",
"original_topic": "emit/abs/service/plan/P123/request",
"original_payload": "...",
"error": {
"type": "ValidationError",
"message": "Invalid plan_id format",
"code": "INVALID_PLAN_ID"
},
"retry_count": 3,
"max_retries": 5
}
8. Developer Implementation Notes¶
8.1 Topic Builder Usage¶
DO NOT hand-craft topics. Use the provided topic builder:
import { topicForCommand } from '../core/messaging';
const topic = topicForCommand({
kind: 'emit',
application: 'ABS',
context: 'BSS',
recipientId: `plan-${planId}`,
command: 'VALIDATE_SERVICE_ACCESS'
});
// Result: emit/ABS/BSS/plan-123/service_access
8.2 Message Validation¶
Always validate messages using the envelope validator:
import { validateEnvelope } from '../core/messaging';
const validation = validateEnvelope(incomingMessage);
if (!validation.valid) {
throw new Error(`Invalid message: ${validation.errors.join(', ')}`);
}
8.3 Correlation Tracking¶
For request-response patterns, always include correlation IDs:
// Request
const correlationId = generateUUID();
await client.publish(emitTopic, {
...payload,
correlation_id: correlationId
}, {
properties: {
responseTopic: echoTopic,
correlationData: Buffer.from(correlationId)
}
});
// Response handler
client.on('message', (topic, payload, packet) => {
const correlationId = packet.properties?.correlationData?.toString();
// Match with pending requests
});
9. Migration Notes¶
9.1 Legacy GATT Format¶
OLD (deprecated):
gatt/abs/bss-plan-001/customer/cust-123/request/service_access
NEW (current):
emit/abs/customer/plan/P123/service_access
9.2 Compatibility Period¶
During migration, both formats may be supported temporarily. New development MUST use the new format.
10. Quick Reference Examples¶
10.1 Common Patterns¶
# Battery swap request
emit/abs/service/plan/P123/swap_request
echo/abs/service/plan/P123/swap_granted
# Payment processing
emit/odoo/payment/invoice/INV-456/process
echo/odoo/payment/invoice/INV-456/completed
# IoT battery data
dt/arm/bms/ph-mnl1/F7/BAT001
# IoT station command
cmd/arm/ssc/ph-mnl1/F7/STN001
# Agent calculation
emit/abs/agent/payment/P123/check_quota
echo/abs/agent/payment/P123/quota_result
10.2 Subscription Examples¶
# Monitor all ABS service events
emit/abs/service/+/+/+
# Monitor specific plan
emit/abs/+/plan/P123/+
# Monitor all payments
emit/+/payment/+/+/+
# Load balanced processing
$share/workers/emit/abs/+/+/+/+
This guide provides comprehensive topic conventions for the ABS Platform. Always refer to this document when implementing MQTT messaging to ensure consistency across the platform.