Skip to content

ABS Platform Messaging & Queue Architecture

1. Overview

This document defines the messaging architecture for the ABS Platform, focusing on how Z-Agents, customers, and IoT assets communicate through a hybrid MQTT + embedded queue system.

2. Architecture Intent

2.1 Core Principles

  • MQTT for Transport: All external communications use MQTT with GATT topic naming
  • Embedded Queues: Simple FIFO buffers within each Service_Plan object
  • Alternating Processing: Fair distribution between external and internal messages
  • Service_Plan Isolation: Each plan manages its own message queues independently

2.2 High-Level Flow

External Entities (IoT/ovAPP) → MQTT Broker → Message Handler → Service_Plan Queues → Event Tick Processor → Z-Agents

3. GATT Topic Naming Convention

3.1 Topic Structure

gatt/{domain}/{service_plan_id}/{entity_type}/{entity_id}/{intent}/{action}

3.2 Domain Examples

  • gatt/abs/ - Asset-Based Subscription (general)
  • gatt/bss/ - Battery Swap Service
  • gatt/crs/ - Charger Rental Service

3.3 Entity Types

  • customer - Customer interactions (ovAPP)
  • asset - IoT asset communications
  • agent - Z-Agent communications
  • service_plan - Service plan events
  • system - System-level events

3.4 Intent Categories

  • request - Request-response pattern
  • response - Response to requests
  • signal - State change signals
  • notification - Informational messages
  • command - Direct commands

3.5 Example Topics

# Customer requests battery swap
gatt/abs/bss-plan-001/customer/cust-123/request/battery_swap

# Asset reports ready status
gatt/abs/bss-plan-001/asset/battery-456/signal/ready_for_swap

# Payment agent signals quota exceeded
gatt/abs/bss-plan-001/agent/payment-agent-001/signal/quota_exceeded

# Internal agent communication
gatt/abs/bss-plan-001/agent/payment-agent-001/agent/quota-agent-002/request/check_quota

4. Service_Plan Queue Structure

4.1 Queue Types

Each Service_Plan contains two FIFO message buffers:

External Queue: - Purpose: Messages from external entities (customers, IoT assets) - Source: MQTT messages filtered by Service_Plan ID - Processing: Handled by event tick processor

Internal Queue: - Purpose: Messages between Z-Agents within the same Service_Plan - Source: Agent-generated messages, system events - Processing: Handled by event tick processor

4.2 Queue Characteristics

  • Type: Simple FIFO buffers (arrays)
  • Persistence: Embedded within Service_Plan object
  • Size Limits: Configurable per Service_Plan
  • Overflow Behavior: Drop oldest messages (FIFO)
  • Processing: Alternating read (external → internal → external...)

4.3 Suggested Data Structure

// Conceptual structure - implementation details left to developers
interface ServicePlanMessaging {
  // Queue Buffers
  external_queue: GATTMessage[];
  internal_queue: GATTMessage[];

  // Configuration
  max_buffer_size: number;
  priority: 'low' | 'median' | 'high'; // DEFAULT: 'median'

  // Processing State
  processing_round: 'external' | 'internal';
  last_processed_tick: number;
  ticks_since_last_process: number;
  interrupt_pending: boolean;

  // Statistics
  external_messages_processed: number;
  internal_messages_processed: number;
  external_messages_dropped: number;
  internal_messages_dropped: number;
  processing_time_ms: number;
}

5. Message Handler Architecture

5.1 Responsibilities

  • MQTT Subscription: Listen to relevant GATT topics
  • Message Filtering: Route messages to appropriate Service_Plan
  • Queue Management: Push messages to external or internal queues
  • Topic Parsing: Parse GATT topics to extract routing information

5.2 Message Routing Logic

1. Parse GATT topic to extract service_plan_id
2. Determine if message is external or internal
3. Find corresponding Service_Plan
4. Push to appropriate queue (external or internal)
5. Handle queue overflow if necessary

5.3 External vs Internal Classification

External Messages: - Topics containing customer/ or asset/ - Messages from IoT devices - Messages from ovAPP

Internal Messages: - Topics containing agent/ or service_plan/ - Messages between Z-Agents - System-generated messages

6. Event Tick Processing

6.1 Processing Pattern

Clock Tick (100ms) → Check Service_Plan Queues → Process Messages → Throttle → Next Tick

Processing Logic: - Clock-Driven: System clock provides base 100ms tick interval - Queue-Driven: Only process Service_Plans with unprocessed messages - Throttled: Each Service_Plan gets processing opportunity per tick - Priority-Based: Different tick frequencies based on Service_Plan priority

6.2 Processing Steps

  1. Clock Tick: System clock triggers every 100ms
  2. Queue Check: Identify Service_Plans with unprocessed messages
  3. Priority Filter: Apply priority-based processing frequency
  4. Message Processing: Process messages from both queues (alternating)
  5. Throttling: Ensure fair distribution across all Service_Plans
  6. Cleanup: Update statistics and state

6.3 Priority-Based Processing

// Conceptual priority configuration - implementation details flexible
enum ServicePlanPriority {
  LOW = 'low',      // Process every 3rd tick (300ms)
  MEDIAN = 'median', // Process every tick (100ms) - DEFAULT
  HIGH = 'high'     // Process every tick (100ms) + interrupt capability
}

interface ServicePlanTickConfig {
  priority: ServicePlanPriority;
  last_processed_tick: number;
  ticks_since_last_process: number;
  interrupt_pending: boolean;
}

6.4 Error Handling

  • Processing Failures: Log error, continue with next message
  • Queue Empty: Skip processing for that round
  • Agent Errors: Handle gracefully, don't block queue processing
  • Clock Interrupts: Handle forced processing for high-priority Service_Plans

7. Z-Agent Integration

7.1 Message Publishing

Z-Agents can publish messages to internal queue:

// Conceptual example - implementation details flexible
class PaymentAgent {
  async signalQuotaExceeded(): Promise<void> {
    const message = {
      topic: `gatt/abs/${this.servicePlanId}/agent/${this.id}/signal/quota_exceeded`,
      payload: { current_usage: this.state.credits_used },
      priority: 'high'
    };

    // Publish to internal queue
    await this.publishToInternalQueue(message);
  }
}

7.2 Message Consumption

Z-Agents receive messages through event tick processing:

// Conceptual example - implementation details flexible
class QuotaAgent {
  async handleMessage(message: GATTMessage): Promise<void> {
    if (message.topic.includes('/payment/')) {
      await this.handlePaymentSignal(message);
    }
  }
}

8. Event Tick System

8.1 Clock-Driven Processing

The event tick system uses a clock-driven approach with intelligent queue management:

// Conceptual tick system - implementation details flexible
class EventTickProcessor {
  private clockInterval: number = 100; // 100ms base tick
  private globalTickCounter: number = 0;
  private servicePlans: Map<string, ServicePlan> = new Map();

  startClock(): void {
    setInterval(() => {
      this.globalTickCounter++;
      this.processAllServicePlans();
    }, this.clockInterval);
  }

  private processAllServicePlans(): void {
    for (const [planId, servicePlan] of this.servicePlans) {
      if (this.shouldProcessServicePlan(servicePlan)) {
        this.processServicePlanMessages(servicePlan);
      }
    }
  }

  private shouldProcessServicePlan(servicePlan: ServicePlan): boolean {
    // Check if Service_Plan has unprocessed messages
    const hasMessages = servicePlan.messaging.external_queue.length > 0 || 
                       servicePlan.messaging.internal_queue.length > 0;

    if (!hasMessages) return false;

    // Apply priority-based processing frequency
    const ticksSinceLastProcess = this.globalTickCounter - servicePlan.messaging.last_processed_tick;

    switch (servicePlan.messaging.priority) {
      case 'low':
        return ticksSinceLastProcess >= 3; // Every 300ms
      case 'median':
        return ticksSinceLastProcess >= 1; // Every 100ms (default)
      case 'high':
        return ticksSinceLastProcess >= 1 || servicePlan.messaging.interrupt_pending;
      default:
        return ticksSinceLastProcess >= 1;
    }
  }
}

8.2 Interrupt-Driven Processing

High-priority Service_Plans can trigger immediate processing:

// Conceptual interrupt system - implementation details flexible
class ServicePlanInterruptManager {
  triggerInterrupt(servicePlanId: string): void {
    const servicePlan = this.getServicePlan(servicePlanId);
    if (servicePlan && servicePlan.messaging.priority === 'high') {
      servicePlan.messaging.interrupt_pending = true;
      // Force immediate processing
      this.processServicePlanImmediately(servicePlan);
    }
  }

  private processServicePlanImmediately(servicePlan: ServicePlan): void {
    // Process messages immediately, bypassing clock tick
    this.processServicePlanMessages(servicePlan);
    servicePlan.messaging.interrupt_pending = false;
  }
}

8.3 Throttling and Fair Distribution

The system ensures fair processing across all Service_Plans:

// Conceptual throttling logic - implementation details flexible
class ServicePlanThrottler {
  private maxProcessingTimePerTick: number = 50; // 50ms max per Service_Plan per tick
  private totalProcessingTime: number = 0;

  private processServicePlanMessages(servicePlan: ServicePlan): void {
    const startTime = Date.now();

    // Process messages with time limit
    while (this.hasUnprocessedMessages(servicePlan) && 
           (Date.now() - startTime) < this.maxProcessingTimePerTick) {
      this.processNextMessage(servicePlan);
    }

    // Update processing statistics
    servicePlan.messaging.last_processed_tick = this.globalTickCounter;
    servicePlan.messaging.processing_time_ms = Date.now() - startTime;
  }
}

9. Configuration Guidelines

9.1 Queue Size Limits

Suggested Defaults: - External Queue: 1000 messages per Service_Plan - Internal Queue: 500 messages per Service_Plan - Configurable: Per Service_Plan based on expected traffic

9.2 Event Tick Configuration

Suggested Defaults: - Base Clock Frequency: Every 100ms - Priority Processing Frequencies: - LOW: Every 300ms (3 ticks) - MEDIAN: Every 100ms (1 tick) - DEFAULT - HIGH: Every 100ms (1 tick) + interrupt capability - Max Processing Time: 50ms per Service_Plan per tick - Configurable: Per Service_Plan based on business requirements

9.3 MQTT Configuration

  • QoS Levels: Use QoS 1 for reliable delivery
  • Retain Messages: Disable for real-time messaging
  • Clean Sessions: Enable for fresh state management

10. Monitoring & Observability

10.1 Key Metrics

  • Queue Sizes: Current messages in external/internal queues
  • Processing Rates: Messages processed per second
  • Drop Rates: Messages dropped due to overflow
  • Processing Latency: Time from queue entry to processing

10.2 Health Checks

  • Queue Health: Monitor for stuck or overflowing queues
  • Processing Health: Monitor for failed message processing
  • MQTT Health: Monitor connection and subscription status

11. Implementation Considerations

11.1 Technology Choices

  • MQTT Broker: Existing infrastructure (Mosquitto/AWS IoT)
  • Queue Storage: In-memory arrays within Service_Plan objects
  • Persistence: Service_Plan serialization includes queue state
  • Processing: Synchronous or asynchronous based on requirements

11.2 Scalability Considerations

  • Horizontal Scaling: Each Service_Plan is independent
  • Vertical Scaling: Adjust queue sizes and processing frequency
  • Load Distribution: MQTT topics naturally distribute load

11.3 Security Considerations

  • MQTT Authentication: Use existing authentication mechanisms
  • Topic Authorization: Validate topic access per Service_Plan
  • Message Validation: Validate message structure and content

12. Migration Strategy

12.1 Phase 1: Foundation

  • Implement basic queue structure in Service_Plan
  • Create MQTT message handler
  • Implement event tick processor

12.2 Phase 2: Integration

  • Integrate with existing Z-Agents
  • Add message publishing capabilities
  • Implement monitoring and metrics

12.3 Phase 3: Optimization

  • Tune queue sizes and processing frequency
  • Add advanced error handling
  • Implement performance optimizations

13. Success Criteria

13.1 Functional Requirements

  • ✅ Messages are routed to correct Service_Plan queues
  • ✅ Alternating processing between external and internal queues
  • ✅ Queue overflow handled gracefully
  • ✅ Z-Agents can publish and consume messages

13.2 Performance Requirements

  • ✅ Message processing latency < 500ms
  • ✅ Queue overflow rate < 1%
  • ✅ System handles expected message volume
  • ✅ No message loss under normal conditions

13.3 Operational Requirements

  • ✅ Monitoring and alerting in place
  • ✅ Error handling and recovery mechanisms
  • ✅ Configuration management for queue parameters
  • ✅ Documentation for operations team

Note: This document provides architectural intent and design guidance. Implementation details, specific code patterns, and technology choices are left to the development team based on their expertise and project requirements.