BSS Agent Pub/Sub Pattern Implementation
Overview
The BSS Agent v2 selects a pub/sub pattern for each operation according to its criticality and response requirements, following the ABS Platform's MQTT topic conventions.
Pub/Sub Pattern Selection by Criticality
🔴 Critical Sequencing: call/rtrn Pattern
Used for: Operations requiring guaranteed responses and strict sequencing
- Payment status verification (financial transactions)
- Service activation/deactivation (safety-critical operations)
- Asset allocation confirmation (resource management)
🟡 Weak Coupling: emit/echo Pattern
Used for: Asynchronous operations with eventual consistency
- Service intent notifications (customer journey)
- Usage reporting to billing systems (accounting)
- Location action triggers (physical operations)
🔵 Informational: meta/stat Pattern
Used for: Configuration, monitoring, and telemetry data
- Agent health monitoring (stat/)
- Service configuration updates (meta/)
- Performance metrics reporting (stat/)
⚡ Real-time Commands: cmd/dt Pattern
Used for: Direct device/system control and data transfer
- IoT device commands (cmd/)
- Sensor data ingestion (dt/)
- Fleet coordination signals (cmd/)
External Requests by Pub/Sub Pattern
🔴 Critical Operations: call/rtrn Pattern
1. Payment Status Verification
Function: checkOdooPaymentStatus() (invoked when payment data is stale)
- Pattern: call/rtrn - guaranteed response required
- Call Topic: call/abs/payment_query/{planId}/status_check
- Return Topic: rtrn/abs/payment_query/{planId}/status_response
- Timeout: 5 seconds (financial transaction)
- Retries: 3 attempts
- Criticality: Financial verification requires guaranteed response
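The timeout and retry settings above could be applied roughly as follows. This is a minimal sketch, not the agent's actual code: `callWithRetry` and the injected `sendCall` transport are hypothetical names, and the sketch assumes that "3 attempts" means three tries in total, each bounded by the 5-second timeout.

```javascript
// Hypothetical helper. `sendCall(callTopic, returnTopic, attempt)` is an injected
// transport (an assumption) that publishes on the call topic and resolves with
// the payload received on the return topic.
async function callWithRetry(sendCall, planId, { timeoutMs = 5000, retries = 3 } = {}) {
  const callTopic = `call/abs/payment_query/${planId}/status_check`;
  const returnTopic = `rtrn/abs/payment_query/${planId}/status_response`;
  let lastError;

  for (let attempt = 1; attempt <= retries; attempt++) {
    let timer;
    // Rejects if no return payload arrives within timeoutMs.
    const timeout = new Promise((_, reject) => {
      timer = setTimeout(() => reject(new Error(`timeout after ${timeoutMs} ms`)), timeoutMs);
    });
    try {
      return await Promise.race([sendCall(callTopic, returnTopic, attempt), timeout]);
    } catch (err) {
      lastError = err; // remember the failure and try again
    } finally {
      clearTimeout(timer); // never leave a stray timer behind
    }
  }
  throw new Error(`payment status check failed after ${retries} attempts: ${lastError?.message}`);
}
```

Unlike emit/echo, the caller here deliberately waits for the return payload, which is why the wait is bounded by both a timeout and an attempt limit.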
🟡 Medium Operations: emit/echo Pattern
2. Service Intent Signal Emission
Function: emitServiceIntentSignal() (W2 Workflow)
- Pattern: emit/echo - eventual consistency acceptable
- Emit Topic: emit/abs/service_intent/{customerId}/{targetLocationId}
- Echo Topic: echo/abs/service_intent/{customerId}_{targetLocationId}/confirmed
- Timeout: 30 seconds
- Purpose: Customer journey notifications
3. Fleet Allocation Signal
Function: sendAssetAllocationSignal() (W4 Workflow)
- Pattern: emit/echo - asynchronous ARM coordination
- Emit Topic: emit/abs/fleet_allocation/{targetFleet}/{sequenceType}
- Echo Topic: echo/abs/fleet_allocation/{targetFleet}/completed
- Timeout: 300 seconds (ARM processing time)
- Purpose: Asset allocation with ARM
4. Odoo Usage Reporting
Function: reportServiceUsageToOdoo()
- Pattern: emit/echo - billing system integration
- Emit Topic: emit/abs/service_usage/{servicePlanId}/{usageType}
- Echo Topic: echo/abs/service_usage/{servicePlanId}/{usageType}/billing_processed
- Timeout: 60 seconds
- Purpose: Usage-based billing triggers
5. Location Action Triggers
Function: triggerLocationActions()
- Pattern: emit/echo - physical operations coordination
- Emit Topic: emit/abs/location_actions/{locationId}/request
- Echo Topic: echo/abs/location_actions/{locationId}/completed
- Timeout: 45 seconds
- Purpose: Physical location operations
🔵 Informational Operations: meta/stat Pattern
6. Agent Health Monitoring
Function: reportAgentHealth()
- Pattern: stat/ - no response expected
- Stat Topic: stat/abs/agent_health/{planId}/metrics
- Purpose: Performance monitoring and telemetry
- Data: Execution metrics, sync status, health score
Implementation Details
Critical Design Principle: NO BLOCKING WAITS
The emit/echo pattern MUST NOT block while waiting for an echo response. Instead, the agent emits, returns, and resumes through an event-driven continuation when the echo arrives:
// ❌ WRONG: Blocking wait for echo
// const response = await mqttClient.emitAndWaitForEcho(...);
// ✅ CORRECT: Emit and continue, handle echo separately
mqttClient.emit(emitPayload);
// Agent continues immediately with current state
return { signals: ['REQUEST_EMITTED'], metadata: { /* ... */ } };
// Echo handled separately when received:
// Topic: echo/abs/fleet_allocation/{fleetId}/completed
// → Triggers new agent execution with echo data
Event-Driven State Management
- Emit Phase: Agent emits request and returns immediately
- State Persistence: Current state saved with correlation tracking
- Echo Phase: When echo arrives, triggers new agent execution
- State Continuation: Agent resumes from saved state with echo data
Correlation ID State Tracking
Correlation IDs enable stateless echo handling:
// Emit phase - save pending operation state
const correlationId = `fleet_${targetFleet}_${customerId}_${Date.now()}`;
const pendingState = {
  correlation_id: correlationId,
  operation: 'FLEET_ALLOCATION',
  original_request: requestData,
  timestamp: new Date().toISOString(),
  timeout_at: new Date(Date.now() + 300000).toISOString() // 300-second ARM timeout
};
// Store in agent_state.pending_operations[correlationId]
// Return immediately without blocking
Echo Processing Pattern
// When echo received via MQTT listener:
// Topic: echo/abs/fleet_allocation/{fleetId}/completed
const handleFleetAllocationEcho = async (echoPayload) => {
  const correlationId = echoPayload.correlation_id;
  const pendingOp = agent_state.pending_operations[correlationId];
  if (pendingOp && !isExpired(pendingOp)) {
    // Process echo and update state
    const result = processFleetAllocationResponse(echoPayload);
    // Trigger new agent execution with echo results
    await executeAgent({
      action: 'PROCESS_FLEET_ALLOCATION_ECHO',
      echo_data: result,
      original_request: pendingOp.original_request
    });
    // Clean up pending operation
    delete agent_state.pending_operations[correlationId];
  }
};
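The echo handler relies on an `isExpired` helper that is not shown. One possible definition, assuming each pending operation carries the ISO-8601 `timeout_at` field set during the emit phase, is sketched below; the optional `now` parameter is an addition to make the check easy to test.

```javascript
// Hypothetical helper assumed by the echo handler: a pending operation is
// expired once the current time reaches its `timeout_at` timestamp.
function isExpired(pendingOp, now = Date.now()) {
  const deadline = Date.parse(pendingOp.timeout_at);
  // Treat a missing or unparseable deadline as expired rather than pending forever.
  return Number.isNaN(deadline) || now >= deadline;
}
```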
Weak Coupling Benefits
- Fault Tolerance: External system failures don't block agent execution
- Eventual Consistency: Systems synchronize when available
- Non-Blocking: Agent continues processing while awaiting responses
- Scalability: Supports asynchronous processing patterns
Correlation ID Pattern
All emit/echo pairs use correlation IDs for request tracking:
const correlationId = `${operation}_${planId}_${timestamp}`;
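With a millisecond timestamp alone, two requests for the same operation and plan issued within the same millisecond would share a correlation ID. The sketch below shows one way to guard against that. Both the `makeCorrelationId` name and the random suffix are assumptions for illustration, not part of the documented format.

```javascript
const { randomBytes } = require('crypto');

// Hypothetical helper: builds `${operation}_${planId}_${timestamp}` and appends
// an 8-character random hex suffix (an addition to the documented format) so
// that IDs generated in the same millisecond stay distinct.
function makeCorrelationId(operation, planId, timestamp = Date.now()) {
  const suffix = randomBytes(4).toString('hex');
  return `${operation}_${planId}_${timestamp}_${suffix}`;
}
```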
Timeout Strategy
- Service Intent: 30 seconds (customer journey notifications)
- Location Actions: 45 seconds (physical operations)
- ARM Fleet Allocation: 300 seconds (asset resolution complexity)
- Odoo Billing: 60 seconds (billing processing)
- Odoo Payment Queries: 5 seconds, up to 3 attempts (quick status lookup via call/rtrn)
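The emit/echo timeouts can be kept in a single lookup so that every pending operation gets its `timeout_at` from the same place. The sketch below uses the values from the per-operation request sections earlier in this document. The operation keys and the `timeoutAt` function are illustrative names only.

```javascript
// Echo timeouts in milliseconds, taken from the per-operation request sections.
// The operation keys are illustrative, not identifiers from the agent.
const ECHO_TIMEOUTS_MS = new Map([
  ['SERVICE_INTENT', 30_000],
  ['LOCATION_ACTIONS', 45_000],
  ['SERVICE_USAGE', 60_000],
  ['FLEET_ALLOCATION', 300_000],
]);

// Computes the ISO-8601 `timeout_at` value stored with a pending operation.
function timeoutAt(operation, now = Date.now()) {
  const timeoutMs = ECHO_TIMEOUTS_MS.get(operation);
  if (timeoutMs === undefined) {
    throw new Error(`No timeout configured for operation: ${operation}`);
  }
  return new Date(now + timeoutMs).toISOString();
}
```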
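Echo Topic Structure
Echo topics follow a consistent pattern: echo/{application}/{context}/{identifier}/{result_type}. The identifier may span more than one topic level, as in echo/abs/service_usage/{servicePlanId}/{usageType}/billing_processed.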
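A small parser makes this structure concrete. The sketch below is hypothetical: `parseEchoTopic` is not part of the agent, and the two `extract*` functions are only one plausible implementation of the helpers that the MQTT listener example later in this document calls.

```javascript
// Hypothetical parser for echo/{application}/{context}/{identifier}/{result_type}.
// The identifier can span several levels (for example {planId}/{usageType}), so
// it is returned as an array of the segments between context and result type.
function parseEchoTopic(topic) {
  const parts = topic.split('/');
  if (parts.length < 5 || parts[0] !== 'echo') {
    throw new Error(`Not a valid echo topic: ${topic}`);
  }
  return {
    application: parts[1],
    context: parts[2],
    identifier: parts.slice(3, -1),
    resultType: parts[parts.length - 1],
  };
}

// Possible implementations of the extract* helpers (assumed, not confirmed).
const extractFleetIdFromTopic = (topic) => parseEchoTopic(topic).identifier[0];
const extractPlanIdFromTopic = (topic) => parseEchoTopic(topic).identifier[0];
```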
MQTT Topic Summary by Pattern
🔴 Critical: call/rtrn Topics
# Call Topics (ABS → External Systems)
call/abs/payment_query/{planId}/status_check
# Return Topics (External Systems → ABS)
rtrn/abs/payment_query/{planId}/status_response
🟡 Medium: emit/echo Topics
# Emit Topics (ABS → External Systems)
emit/abs/service_intent/{customerId}/{locationId}
emit/abs/fleet_allocation/{fleetId}/{sequenceType}
emit/abs/service_usage/{planId}/{usageType}
emit/abs/location_actions/{locationId}/request
# Echo Topics (External Systems → ABS)
echo/abs/service_intent/{customerId}_{locationId}/confirmed
echo/abs/fleet_allocation/{fleetId}/completed
echo/abs/service_usage/{planId}/{usageType}/billing_processed
echo/abs/location_actions/{locationId}/completed
🔵 Informational: stat Topics
# Stat Topics (ABS → Monitoring Systems)
stat/abs/agent_health/{planId}/metrics
Pattern Selection Criteria
| Pattern | Criticality | Response Required | Timeout | Use Cases |
|---|---|---|---|---|
| call/rtrn | 🔴 Critical | Guaranteed | 3-15s | Payment verification, safety operations |
| emit/echo | 🟡 Medium | Expected | 30-300s | Customer journey, billing, coordination |
| stat | 🔵 Info | No | N/A | Health monitoring, telemetry |
| meta | 🔵 Info | No | N/A | Configuration updates |
Mock Implementation Notes
The current implementation includes mock responses for development and testing:
- All external requests simulate successful delivery
- Echo responses are mocked for immediate testing
- Comments mark where a real MQTT client should be integrated for production deployment
Production Implementation
MQTT Listener Integration
For production deployment, the MQTT listener must be configured to route echo responses to agent execution:
// MQTT Listener Configuration Example
const echoTopicHandlers = {
  'echo/abs/fleet_allocation/+/completed': (topic, payload) => {
    const fleetId = extractFleetIdFromTopic(topic);
    const correlationId = payload.correlation_id;
    // Trigger new agent execution for echo processing
    executeAgent({
      plan_id: payload.service_plan_id,
      action: 'PROCESS_FLEET_ALLOCATION_ECHO',
      echo_data: payload
    });
  },
  'echo/abs/service_usage/+/+/billing_processed': (topic, payload) => {
    executeAgent({
      plan_id: extractPlanIdFromTopic(topic),
      action: 'PROCESS_ODOO_BILLING_ECHO',
      echo_data: payload
    });
  }
};
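The handler keys above are MQTT topic filters, so an incoming topic has to be matched against them before a handler can be called. The sketch below shows one way to do that. `topicMatches` and `dispatchEcho` are hypothetical names, and the matcher is simplified: it handles the `+` and `#` wildcards but ignores the special rules for topics beginning with `$`.

```javascript
// Returns true when `topic` matches an MQTT topic `filter`.
// `+` matches exactly one level; `#` matches all remaining levels.
function topicMatches(filter, topic) {
  const filterParts = filter.split('/');
  const topicParts = topic.split('/');
  for (let i = 0; i < filterParts.length; i++) {
    if (filterParts[i] === '#') return true;
    if (i >= topicParts.length) return false;
    if (filterParts[i] !== '+' && filterParts[i] !== topicParts[i]) return false;
  }
  // Without a trailing '#', both must have the same number of levels.
  return filterParts.length === topicParts.length;
}

// Hypothetical dispatcher: calls the first handler whose filter matches.
function dispatchEcho(handlers, topic, payload) {
  for (const [filter, handler] of Object.entries(handlers)) {
    if (topicMatches(filter, topic)) {
      handler(topic, payload);
      return true;
    }
  }
  return false; // no handler registered for this topic
}
```

A listener's message callback could then call `dispatchEcho(echoTopicHandlers, topic, payload)` once the payload has been parsed.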
State Persistence Requirements
- Pending Operations Storage: Agent state must persist pending_operations between executions
- Correlation ID Tracking: Enable echo responses to find original requests
- Timeout Management: Clean up expired pending operations
- State Consistency: Ensure agent state updates are atomic
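These requirements can be illustrated with a small store for pending operations. The sketch below is an in-memory stand-in under stated assumptions: the `PendingOperationStore` class and its method names are hypothetical, and a real deployment would need durable storage behind the same interface.

```javascript
// In-memory stand-in for agent_state.pending_operations (an assumption: a real
// deployment would back this with durable storage).
class PendingOperationStore {
  constructor() {
    this.operations = new Map();
  }

  // Records a pending operation under its correlation ID.
  add(op) {
    this.operations.set(op.correlation_id, op);
  }

  // Removes and returns the operation in one step, so a duplicate echo for the
  // same correlation ID finds nothing and can be ignored.
  take(correlationId) {
    const op = this.operations.get(correlationId);
    this.operations.delete(correlationId);
    return op;
  }

  // Removes and returns every operation whose timeout_at has passed.
  takeExpired(now = Date.now()) {
    const expired = [];
    for (const [id, op] of this.operations) {
      if (Date.parse(op.timeout_at) <= now) {
        expired.push(op);
        this.operations.delete(id);
      }
    }
    return expired;
  }
}
```

Combining lookup and removal in `take` is a design choice: it means an echo and the timeout cleanup cannot both claim the same operation.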
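Timeout and Cleanup Strategy
// Timeout cleanup handler (separate process)
const cleanupExpiredOperations = async () => {
  const expiredOps = findExpiredPendingOperations();
  for (const op of expiredOps) {
    // The HANDLE_ECHO_TIMEOUT execution is expected to remove the operation from
    // pending_operations; otherwise it is reported again on the next run.
    await executeAgent({
      plan_id: op.service_plan_id,
      action: 'HANDLE_ECHO_TIMEOUT',
      timeout_data: op
    });
  }
};
// Run cleanup periodically; log failures so a rejected run does not go unhandled
setInterval(() => {
  cleanupExpiredOperations().catch((err) => console.error('Cleanup failed:', err));
}, 60000); // Every minute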
Architecture Compliance
- ✅ Weak Coupling: External systems communicate via MQTT only
- ✅ Eventual Consistency: No blocking external calls
- ✅ Fault Tolerance: Agent continues operation during external system failures
- ✅ Scalability: Supports asynchronous, distributed processing
- ✅ Federation Principles: Clean separation between ABS and external systems
This implementation keeps the BSS Agent highly available while it integrates with external systems through a loosely coupled, event-driven architecture.