Queue Implementation Guide¶
1. Overview¶
This document provides architectural guidance for implementing the embedded FIFO queue system within Service_Plan objects. The queues handle both external messages (from IoT devices and ovAPP) and internal messages (between Z-Agents).
2. Queue Architecture¶
2.1 Service_Plan Integration¶
Each Service_Plan object contains embedded queue structures:
// Conceptual structure - implementation details flexible
interface ServicePlanMessaging {
  // Queue Buffers
  external_queue: GATTMessage[];
  internal_queue: GATTMessage[];

  // Configuration
  max_buffer_size: number;
  priority: 'low' | 'median' | 'high'; // DEFAULT: 'median'

  // Processing State
  processing_round: 'external' | 'internal';
  last_processed_tick: number;
  ticks_since_last_process: number;
  interrupt_pending: boolean;

  // Statistics
  external_messages_processed: number;
  internal_messages_processed: number;
  external_messages_dropped: number;
  internal_messages_dropped: number;
  processing_errors?: number; // incremented by the error-handling path (Section 6)
  processing_time_ms: number;
}
2.2 Queue Characteristics¶
- Type: Simple FIFO buffers (JavaScript arrays)
- Persistence: Embedded within Service_Plan serialization
- Size Limits: Configurable per Service_Plan
- Overflow Behavior: Drop oldest messages (FIFO)
- Processing: Alternating read pattern
3. Queue Operations¶
3.1 Message Enqueue¶
// Conceptual enqueue logic - implementation details flexible
class ServicePlanQueueManager {
  pushToExternalQueue(servicePlan: ServicePlan, message: GATTMessage): void {
    // Check buffer size limit
    if (servicePlan.messaging.external_queue.length >= servicePlan.messaging.max_buffer_size) {
      // Drop oldest message (FIFO behavior)
      servicePlan.messaging.external_queue.shift();
      servicePlan.messaging.external_messages_dropped++;
    }
    // Add new message to end of queue
    servicePlan.messaging.external_queue.push(message);
  }

  pushToInternalQueue(servicePlan: ServicePlan, message: GATTMessage): void {
    // Same logic for the internal queue
    if (servicePlan.messaging.internal_queue.length >= servicePlan.messaging.max_buffer_size) {
      servicePlan.messaging.internal_queue.shift();
      servicePlan.messaging.internal_messages_dropped++;
    }
    servicePlan.messaging.internal_queue.push(message);
  }
}
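The enqueue path can be exercised standalone. The sketch below uses simplified local stand-ins for the real `ServicePlan`/`GATTMessage` types (the stand-in shapes are assumptions, not the production types) to demonstrate the drop-oldest overflow behavior:

```typescript
// Minimal local stand-ins for the real types (assumptions for illustration).
type GATTMessage = { id: string; topic: string };

interface Messaging {
  external_queue: GATTMessage[];
  max_buffer_size: number;
  external_messages_dropped: number;
}

function pushToExternalQueue(messaging: Messaging, message: GATTMessage): void {
  // Drop the oldest message when the buffer is full (FIFO overflow behavior)
  if (messaging.external_queue.length >= messaging.max_buffer_size) {
    messaging.external_queue.shift();
    messaging.external_messages_dropped++;
  }
  messaging.external_queue.push(message);
}

// Usage: a buffer of size 2 receiving three messages drops the oldest (m1).
const messaging: Messaging = { external_queue: [], max_buffer_size: 2, external_messages_dropped: 0 };
pushToExternalQueue(messaging, { id: 'm1', topic: 't' });
pushToExternalQueue(messaging, { id: 'm2', topic: 't' });
pushToExternalQueue(messaging, { id: 'm3', topic: 't' });
```

After the third push the queue holds `m2` and `m3`, and the drop counter reads 1.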
3.2 Message Dequeue¶
// Conceptual dequeue logic - implementation details flexible
class EventTickProcessor {
  private clockInterval: number = 100; // 100ms base tick
  private globalTickCounter: number = 0;
  private servicePlans: Map<string, ServicePlan> = new Map();

  startClock(): void {
    setInterval(() => {
      this.globalTickCounter++;
      this.processAllServicePlans();
    }, this.clockInterval);
  }

  private processAllServicePlans(): void {
    for (const [planId, servicePlan] of this.servicePlans) {
      if (this.shouldProcessServicePlan(servicePlan)) {
        this.processServicePlanMessages(servicePlan);
      }
    }
  }

  private hasUnprocessedMessages(servicePlan: ServicePlan): boolean {
    return servicePlan.messaging.external_queue.length > 0 ||
           servicePlan.messaging.internal_queue.length > 0;
  }

  private shouldProcessServicePlan(servicePlan: ServicePlan): boolean {
    // Skip Service_Plans with no unprocessed messages
    if (!this.hasUnprocessedMessages(servicePlan)) return false;

    // Apply priority-based processing frequency
    const ticksSinceLastProcess = this.globalTickCounter - servicePlan.messaging.last_processed_tick;
    switch (servicePlan.messaging.priority) {
      case 'low':
        return ticksSinceLastProcess >= 3; // Every 300ms
      case 'median':
        return ticksSinceLastProcess >= 1; // Every 100ms (default)
      case 'high':
        return ticksSinceLastProcess >= 1 || servicePlan.messaging.interrupt_pending;
      default:
        return ticksSinceLastProcess >= 1;
    }
  }

  private processServicePlanMessages(servicePlan: ServicePlan): void {
    const startTime = Date.now();
    const maxProcessingTime = 50; // 50ms max per Service_Plan per tick

    while (this.hasUnprocessedMessages(servicePlan) &&
           (Date.now() - startTime) < maxProcessingTime) {
      // Alternate between external and internal queues
      if (servicePlan.messaging.processing_round === 'external') {
        this.processExternalMessage(servicePlan);
        servicePlan.messaging.processing_round = 'internal';
      } else {
        this.processInternalMessage(servicePlan);
        servicePlan.messaging.processing_round = 'external';
      }
    }

    // Update processing statistics
    servicePlan.messaging.last_processed_tick = this.globalTickCounter;
    servicePlan.messaging.processing_time_ms = Date.now() - startTime;
    servicePlan.messaging.interrupt_pending = false;
  }

  private processExternalMessage(servicePlan: ServicePlan): void {
    const message = servicePlan.messaging.external_queue.shift();
    if (message) {
      this.routeToHandler(message, servicePlan); // delegates to the MessageRouter (Section 5)
      servicePlan.messaging.external_messages_processed++;
    }
  }

  private processInternalMessage(servicePlan: ServicePlan): void {
    const message = servicePlan.messaging.internal_queue.shift();
    if (message) {
      this.routeToHandler(message, servicePlan);
      servicePlan.messaging.internal_messages_processed++;
    }
  }
}
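The alternating read pattern can be illustrated without the clock or the full processor. The sketch below (simplified types and a hypothetical `drainAlternating` helper, both assumptions for illustration) shows the interleaved drain order:

```typescript
// Simplified queue pair with the same 'processing_round' alternation idea.
type Msg = string;
interface Queues { external: Msg[]; internal: Msg[]; round: 'external' | 'internal'; }

function drainAlternating(q: Queues, maxMessages: number): Msg[] {
  const processed: Msg[] = [];
  while ((q.external.length > 0 || q.internal.length > 0) && processed.length < maxMessages) {
    if (q.round === 'external') {
      const m = q.external.shift();
      if (m !== undefined) processed.push(m); // empty round is skipped, not counted
      q.round = 'internal';
    } else {
      const m = q.internal.shift();
      if (m !== undefined) processed.push(m);
      q.round = 'external';
    }
  }
  return processed;
}

const q: Queues = { external: ['e1', 'e2'], internal: ['i1', 'i2'], round: 'external' };
const order = drainAlternating(q, 10); // ['e1', 'i1', 'e2', 'i2']
```

Starting on the external round, messages interleave one-for-one until both queues drain; if one queue empties first, the loop keeps draining the other.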
4. Configuration Guidelines¶
4.1 Default Configuration¶
// Suggested default configuration - adjustable per Service_Plan
const DEFAULT_QUEUE_CONFIG = {
  max_buffer_size: 1000,        // Messages per queue
  priority: 'median',           // Processing priority (low/median/high)
  clock_interval_ms: 100,       // Base clock tick interval
  max_processing_time_ms: 50,   // Max processing time per Service_Plan per tick
  retry_attempts: 3,            // Failed message retries
  retry_delay_ms: 1000,         // Delay between retries
};
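One way to apply per-Service_Plan overrides on top of these defaults is an object-spread merge, so a plan only states the keys it changes. The `QueueConfig` type and `resolveConfig` helper below are illustrative names, not an existing API:

```typescript
// Defaults mirror the suggested configuration above.
const DEFAULT_QUEUE_CONFIG = {
  max_buffer_size: 1000,
  priority: 'median' as 'low' | 'median' | 'high',
  clock_interval_ms: 100,
  max_processing_time_ms: 50,
  retry_attempts: 3,
  retry_delay_ms: 1000,
};

type QueueConfig = typeof DEFAULT_QUEUE_CONFIG;

// Later keys in the spread win, so overrides replace defaults key-by-key.
function resolveConfig(overrides: Partial<QueueConfig>): QueueConfig {
  return { ...DEFAULT_QUEUE_CONFIG, ...overrides };
}

// A busy plan overrides two keys; everything else keeps the default value.
const busyPlan = resolveConfig({ max_buffer_size: 2000, priority: 'high' });
```

`busyPlan.clock_interval_ms` remains 100; only the overridden keys change.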
4.2 Service_Plan Specific Configuration¶
// High-traffic Service_Plan (e.g., busy swap station)
const HIGH_TRAFFIC_CONFIG = {
  max_buffer_size: 2000,
  priority: 'high',             // Process every tick + interrupt capability
  max_processing_time_ms: 100,  // More processing time for busy plans
};

// Low-traffic Service_Plan (e.g., new customer)
const LOW_TRAFFIC_CONFIG = {
  max_buffer_size: 500,
  priority: 'low',              // Process every 3rd tick (300ms)
  max_processing_time_ms: 25,   // Less processing time for quiet plans
};

// Standard Service_Plan (default)
const STANDARD_CONFIG = {
  max_buffer_size: 1000,
  priority: 'median',           // Process every tick (100ms) - DEFAULT
  max_processing_time_ms: 50,   // Standard processing time
};
5. Message Routing¶
5.1 Handler Registration¶
// Conceptual handler registration - implementation details flexible
interface MessageHandler {
  id: string;
  name: string;
  canHandle(message: GATTMessage): boolean;
  handle(message: GATTMessage, servicePlan: ServicePlan): Promise<void>;
  priority: number;
}

class MessageRouter {
  private handlers: MessageHandler[] = [];

  registerHandler(handler: MessageHandler): void {
    this.handlers.push(handler);
    // Sort by priority (higher priority first)
    this.handlers.sort((a, b) => b.priority - a.priority);
  }

  async routeMessage(message: GATTMessage, servicePlan: ServicePlan): Promise<void> {
    for (const handler of this.handlers) {
      if (handler.canHandle(message)) {
        await handler.handle(message, servicePlan);
        return;
      }
    }
    // No handler found - log warning
    console.warn(`No handler found for message: ${message.topic}`);
  }
}
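Registration order should not matter, only priority. The self-contained sketch below (handler shape simplified to drop the `servicePlan` argument) shows a low-priority catch-all registered first still losing to a higher-priority topic match:

```typescript
// Simplified handler shape for illustration (no servicePlan argument).
type GATTMessage = { topic: string };

interface MessageHandler {
  id: string;
  priority: number;
  canHandle(message: GATTMessage): boolean;
  handle(message: GATTMessage): Promise<void>;
}

class MessageRouter {
  private handlers: MessageHandler[] = [];

  registerHandler(handler: MessageHandler): void {
    this.handlers.push(handler);
    this.handlers.sort((a, b) => b.priority - a.priority); // highest priority first
  }

  async routeMessage(message: GATTMessage): Promise<void> {
    for (const handler of this.handlers) {
      if (handler.canHandle(message)) {
        await handler.handle(message); // first matching handler wins
        return;
      }
    }
  }
}

const router = new MessageRouter();
const handled: string[] = [];

// Registered first, but lowest priority: only fires when nothing else matches.
router.registerHandler({
  id: 'catch-all', priority: 1,
  canHandle: () => true,
  handle: async () => { handled.push('catch-all'); },
});
router.registerHandler({
  id: 'payment', priority: 100,
  canHandle: m => m.topic.includes('/payment/'),
  handle: async () => { handled.push('payment'); },
});

void router.routeMessage({ topic: 'dev/payment/charge' });   // matched by 'payment'
void router.routeMessage({ topic: 'dev/telemetry/battery' }); // falls through to 'catch-all'
```

The payment message goes to the high-priority handler despite the catch-all being registered first; the telemetry message falls through to the catch-all.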
5.2 Agent-Specific Handlers¶
// Conceptual agent handler - implementation details flexible
class PaymentAgentMessageHandler implements MessageHandler {
  id = 'payment-agent-handler';
  name = 'Payment Agent Message Handler';
  priority = 100;

  canHandle(message: GATTMessage): boolean {
    return message.topic.includes('/payment/') ||
           message.intent.business_context === 'payment';
  }

  async handle(message: GATTMessage, servicePlan: ServicePlan): Promise<void> {
    // Find PaymentAgent in service plan
    const paymentAgent = servicePlan.agent_pool.find(
      agent => agent.agent_type === 'payment'
    ) as PaymentAgent;
    if (paymentAgent) {
      await paymentAgent.processEvent(message.payload);
    }
  }
}
6. Error Handling¶
6.1 Processing Failures¶
// Conceptual error handling - implementation details flexible
class EventTickProcessor {
  private async processMessageSafely(message: GATTMessage, servicePlan: ServicePlan): Promise<void> {
    try {
      await this.routeMessage(message, servicePlan);
    } catch (error) {
      console.error(`Failed to process message ${message.id}:`, error);

      // Increment error count
      servicePlan.messaging.processing_errors =
        (servicePlan.messaging.processing_errors || 0) + 1;

      // Optionally move to dead letter queue
      if (this.shouldMoveToDeadLetter(error)) {
        await this.moveToDeadLetter(message, error);
      }
    }
  }

  private shouldMoveToDeadLetter(error: any): boolean {
    // Move to dead letter for certain error types
    return error.code === 'VALIDATION_ERROR' ||
           error.code === 'AGENT_NOT_FOUND';
  }
}
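The dead letter queue itself is left unspecified above. One minimal in-memory shape is sketched below; the `DeadLetterQueue` class and its fields are assumptions for illustration, and a production system would likely persist entries rather than hold them in memory:

```typescript
// Assumed message shape for illustration.
type GATTMessage = { id: string; topic: string };

interface DeadLetterEntry {
  message: GATTMessage;
  errorCode: string;
  failedAt: Date;
}

class DeadLetterQueue {
  private entries: DeadLetterEntry[] = [];

  // Only non-retryable errors belong here; transient failures should be retried
  // via the retry_attempts / retry_delay_ms settings instead.
  static isNonRetryable(code: string): boolean {
    return code === 'VALIDATION_ERROR' || code === 'AGENT_NOT_FOUND';
  }

  add(message: GATTMessage, errorCode: string): void {
    this.entries.push({ message, errorCode, failedAt: new Date() });
  }

  size(): number {
    return this.entries.length;
  }
}

// Usage: a validation failure is parked rather than retried.
const dlq = new DeadLetterQueue();
const failed: GATTMessage = { id: 'm42', topic: 'dev/payment/charge' };
if (DeadLetterQueue.isNonRetryable('VALIDATION_ERROR')) {
  dlq.add(failed, 'VALIDATION_ERROR');
}
```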
6.2 Queue Health Monitoring¶
// Conceptual health monitoring - implementation details flexible
interface QueueHealthMetrics {
  external_queue_size: number;
  internal_queue_size: number;
  external_processing_rate: number;
  internal_processing_rate: number;
  external_drop_rate: number;
  internal_drop_rate: number;
  processing_errors: number;
  last_processing_time: Date;
}

class QueueHealthMonitor {
  getQueueHealth(servicePlan: ServicePlan): QueueHealthMetrics {
    return {
      external_queue_size: servicePlan.messaging.external_queue.length,
      internal_queue_size: servicePlan.messaging.internal_queue.length,
      external_processing_rate: this.calculateProcessingRate(servicePlan, 'external'),
      internal_processing_rate: this.calculateProcessingRate(servicePlan, 'internal'),
      external_drop_rate: this.calculateDropRate(servicePlan, 'external'),
      internal_drop_rate: this.calculateDropRate(servicePlan, 'internal'),
      processing_errors: servicePlan.messaging.processing_errors || 0,
      last_processing_time: new Date()
    };
  }
}
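The `calculateDropRate` helper above is left undefined. One plausible definition (an assumption, not prescribed by this document) is dropped messages over total messages seen, using the counters already tracked in `ServicePlanMessaging`:

```typescript
// Counter names mirror the per-queue statistics fields (processed/dropped).
interface QueueCounters {
  messages_processed: number;
  messages_dropped: number;
}

// Drop rate = dropped / (processed + dropped), guarding against division by zero
// before any traffic has been seen.
function calculateDropRate(counters: QueueCounters): number {
  const total = counters.messages_processed + counters.messages_dropped;
  return total === 0 ? 0 : counters.messages_dropped / total;
}

const rate = calculateDropRate({ messages_processed: 990, messages_dropped: 10 }); // 0.01
```

A rate of 0.01 corresponds to the 1% overflow threshold used in the success criteria (Section 11.2).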
7. Performance Considerations¶
7.1 Memory Management¶
- Queue Size Limits: Prevent memory exhaustion
- Message Cleanup: Remove processed messages immediately
- Object Pooling: Reuse message objects when possible
- Garbage Collection: Monitor GC impact of queue operations
7.2 Processing Optimization¶
- Batch Processing: Process multiple messages per tick if possible
- Async Processing: Use async/await for non-blocking operations
- Priority Queues: Consider priority-based processing for urgent messages
- Caching: Cache frequently accessed Service_Plan data
7.3 Scalability¶
- Horizontal Scaling: Each Service_Plan is independent
- Load Distribution: MQTT topics naturally distribute load
- Resource Limits: Monitor CPU and memory usage per Service_Plan
8. Monitoring and Observability¶
8.1 Key Metrics¶
// Metrics to track - implementation details flexible
interface QueueMetrics {
  // Queue Sizes
  external_queue_size: number;
  internal_queue_size: number;

  // Processing Rates
  external_messages_per_second: number;
  internal_messages_per_second: number;

  // Error Rates
  external_drop_rate: number;
  internal_drop_rate: number;
  processing_error_rate: number;

  // Latency
  average_processing_latency_ms: number;
  max_processing_latency_ms: number;
}
8.2 Health Checks¶
// Health check logic - implementation details flexible
class QueueHealthChecker {
  isHealthy(servicePlan: ServicePlan): boolean {
    const metrics = this.getQueueMetrics(servicePlan);
    return (
      metrics.external_queue_size < servicePlan.messaging.max_buffer_size * 0.9 &&
      metrics.internal_queue_size < servicePlan.messaging.max_buffer_size * 0.9 &&
      metrics.processing_error_rate < 0.01 && // Less than 1% error rate
      metrics.average_processing_latency_ms < 500
    );
  }
}
9. Testing Guidelines¶
9.1 Unit Testing¶
- Queue Operations: Test enqueue/dequeue operations
- Overflow Handling: Test buffer size limits
- Error Scenarios: Test processing failures
- Statistics: Test metric collection
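As a concrete starting point, the priority gating from Section 3.2 is easy to unit-test in isolation. The sketch below uses plain assertions with no test framework assumed (adapt to whatever runner the project uses); `shouldProcess` is a free-function extraction of the switch in `shouldProcessServicePlan`:

```typescript
// Free-function extraction of the priority-based tick gating for testability.
function shouldProcess(priority: 'low' | 'median' | 'high',
                       ticksSinceLast: number,
                       interruptPending: boolean): boolean {
  switch (priority) {
    case 'low':  return ticksSinceLast >= 3;                    // every 3rd tick (300ms)
    case 'high': return ticksSinceLast >= 1 || interruptPending; // every tick + interrupt
    default:     return ticksSinceLast >= 1;                     // 'median': every tick
  }
}

// Low-priority plans wait 3 ticks; high-priority plans can jump the schedule
// via a pending interrupt even at 0 elapsed ticks.
const lowTooSoon = shouldProcess('low', 2, false);    // false
const lowDue = shouldProcess('low', 3, false);        // true
const highInterrupt = shouldProcess('high', 0, true); // true
```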
9.2 Integration Testing¶
- MQTT Integration: Test message flow from MQTT to queues
- Agent Integration: Test message routing to agents
- Persistence: Test queue state persistence
- Performance: Test under load conditions
9.3 Load Testing¶
- Message Volume: Test with high message volumes
- Concurrent Processing: Test multiple Service_Plans
- Memory Usage: Monitor memory consumption
- Processing Latency: Measure end-to-end latency
10. Migration Strategy¶
10.1 Phase 1: Basic Implementation¶
- Implement queue structure in Service_Plan
- Add basic enqueue/dequeue operations
- Implement alternating processing
10.2 Phase 2: Integration¶
- Integrate with MQTT message handler
- Add message routing to agents
- Implement error handling
10.3 Phase 3: Optimization¶
- Add monitoring and metrics
- Optimize performance
- Add advanced features (priority, batching)
11. Success Criteria¶
11.1 Functional Requirements¶
- ✅ Messages are correctly enqueued and dequeued
- ✅ Alternating processing works correctly
- ✅ Queue overflow is handled gracefully
- ✅ Messages are routed to correct handlers
11.2 Performance Requirements¶
- ✅ Processing latency < 500ms
- ✅ Queue overflow rate < 1%
- ✅ Memory usage stays within limits
- ✅ System handles expected message volume
11.3 Operational Requirements¶
- ✅ Monitoring and alerting in place
- ✅ Error handling and recovery mechanisms
- ✅ Configuration management
- ✅ Documentation for operations team
Note: This document provides architectural guidance for queue implementation. Specific implementation details, code patterns, and technology choices are left to the development team based on their expertise and project requirements.