# Message Queue System
A flexible and scalable message queue system for the backend that supports both local testing (in-memory) and distributed deployments (Redis).
## Overview
The message queue system provides:
- Flexible Backend: Switch between in-memory (local testing) and Redis (distributed) providers
- Priority Queuing: Messages processed by priority (1-10, higher = more important)
- Delayed Messages: Schedule messages for future processing
- Retry Logic: Automatic retry with exponential backoff
- Dead Letter Queue: Handle permanently failed messages
- Real-time Stats: Monitor queue performance and health
- Type Safety: Full TypeScript support with typed payloads
## Architecture

### Queue Topology
```mermaid
graph TB
    %% Define classes for consistent styling
    classDef queue fill:#f3e5f5,stroke:#6a1b9a,stroke-width:2px
    classDef service fill:#e1f5fe,stroke:#01579b,stroke-width:2px
    classDef data fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px
    classDef external fill:#fff3e0,stroke:#e65100,stroke-width:2px

    %% Input sources
    API[API Endpoints]:::service
    Socket[Socket Events]:::service
    Goal[Goal System]:::service

    %% Queue service
    QS[Queue Service]:::service

    %% Message queues by priority
    PQ10[Priority 10: Critical]:::queue
    PQ9[Priority 9: High Proactive]:::queue
    PQ8[Priority 8: Stream Chunks]:::queue
    PQ7[Priority 7: Proactive Actions]:::queue
    PQ6[Priority 6: Agent Responses]:::queue
    PQ5[Priority 5: Chat Messages]:::queue
    PQ1[Priority 1-4: Background]:::queue

    %% Processing layer
    Proc[Message Processors]:::service

    %% Retry and failure handling
    Retry[Retry Queue]:::queue
    DLQ[Dead Letter Queue]:::queue

    %% Storage backends
    Memory[(In-Memory)]:::data
    Redis[(Redis)]:::external

    %% Outputs
    Agents[AI Agents]:::service
    Frontend[Frontend via Socket]:::service
    Metrics["Metrics & Logs"]:::data

    %% Input flow
    API --> QS
    Socket --> QS
    Goal --> QS

    %% Priority routing
    QS --> PQ10
    QS --> PQ9
    QS --> PQ8
    QS --> PQ7
    QS --> PQ6
    QS --> PQ5
    QS --> PQ1

    %% Processing flow
    PQ10 --> Proc
    PQ9 --> Proc
    PQ8 --> Proc
    PQ7 --> Proc
    PQ6 --> Proc
    PQ5 --> Proc
    PQ1 --> Proc

    %% Failure handling
    Proc --> Retry
    Retry --> Proc
    Retry --> DLQ

    %% Storage
    QS --> Memory
    QS --> Redis

    %% Output flow
    Proc --> Agents
    Proc --> Frontend
    Proc --> Metrics
```
### Message Lifecycle
```mermaid
sequenceDiagram
    participant C as Client
    participant Q as Queue Service
    participant P as Processor
    participant R as Retry Queue
    participant D as Dead Letter Queue
    participant A as Agent/Handler

    C->>Q: Enqueue Message (priority: 7)
    Q->>Q: Validate & prioritize
    Q->>P: Process by priority

    alt Processing Success
        P->>A: Handle message
        A-->>P: Success response
        P->>Q: Mark completed
        Q-->>C: Success callback
    else Processing Failure
        P->>R: Send to retry queue
        R->>R: Wait (exponential backoff)
        R->>P: Retry processing

        alt Retry Success
            P->>A: Handle message
            A-->>P: Success response
            P->>Q: Mark completed
        else Max Retries Exceeded
            P->>D: Send to dead letter queue
            D->>Q: Log dead letter event
            Q-->>C: Failure callback
        end
    end
```
### Priority Processing Flow
```mermaid
graph LR
    classDef high fill:#ffcdd2,stroke:#c62828,stroke-width:2px
    classDef med fill:#fff3e0,stroke:#e65100,stroke-width:2px
    classDef low fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px

    Input[Incoming Messages]

    %% Priority classification
    P10[Priority 10: Critical System]:::high
    P9[Priority 9: High Proactive]:::high
    P8[Priority 8: Stream Chunks]:::high
    P7[Priority 7: Proactive Actions]:::med
    P6[Priority 6: Agent Responses]:::med
    P5[Priority 5: Chat Messages]:::med
    P4[Priority 1-4: Background]:::low

    Output[Processed Messages]

    Input --> P10
    Input --> P9
    Input --> P8
    Input --> P7
    Input --> P6
    Input --> P5
    Input --> P4

    P10 --> Output
    P9 --> Output
    P8 --> Output
    P7 --> Output
    P6 --> Output
    P5 --> Output
    P4 --> Output
```
## Quick Start

### Environment Configuration
Set the message queue provider in your `.env` file:
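For example (the `MESSAGE_QUEUE_PROVIDER` key below is an assumed name; check the backend's config loader for the exact key):

```bash
# .env - select the queue backend (hypothetical variable name)
MESSAGE_QUEUE_PROVIDER=memory   # or "redis"
```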
### Basic Usage
The system is automatically initialized when the server starts. You can interact with it through:
- Programmatically - Using the `QueueService` in your code
- REST API - Via HTTP endpoints for monitoring and testing
- Socket Events - Automatic processing of queued messages
## API Endpoints

### Health & Monitoring

### Message Operations

### Management Operations
## Queue Types
The system supports these pre-defined queues:
- `chat_messages` - User chat messages awaiting processing
- `agent_responses` - AI agent responses ready to send
- `proactive_actions` - Proactive actions triggered by goal-seeking
- `stream_chunks` - Real-time streaming message chunks
- `status_updates` - System status and agent state updates
- `validation_requests` - Message validation requests
- `goal_seeking_updates` - Goal-seeking system state changes
- `conversation_events` - Conversation lifecycle events
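As a sketch, these queue names can be captured in a union type so handlers stay type-safe (the `QueueName` alias below is illustrative, not necessarily what the codebase defines):

```typescript
// Illustrative union of the pre-defined queue names
type QueueName =
  | 'chat_messages'
  | 'agent_responses'
  | 'proactive_actions'
  | 'stream_chunks'
  | 'status_updates'
  | 'validation_requests'
  | 'goal_seeking_updates'
  | 'conversation_events';
```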
## Message Priority System
Messages are processed by priority (1-10):
- Priority 10: Critical system messages
- Priority 9: High-priority proactive actions
- Priority 8: Stream chunks (real-time feel)
- Priority 7: Proactive actions
- Priority 6: Agent responses
- Priority 5: Normal chat messages (default)
- Priority 1-4: Low priority background tasks
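A small sketch of how these levels might be expressed as named constants (the `MessagePriority` object is illustrative; the codebase may define its own constants):

```typescript
// Illustrative priority constants matching the list above
const MessagePriority = {
  CRITICAL: 10,        // Critical system messages
  HIGH_PROACTIVE: 9,   // High-priority proactive actions
  STREAM_CHUNK: 8,     // Stream chunks (real-time feel)
  PROACTIVE: 7,        // Proactive actions
  AGENT_RESPONSE: 6,   // Agent responses
  CHAT_MESSAGE: 5,     // Normal chat messages (default)
  BACKGROUND: 1,       // Low-priority background tasks (1-4)
} as const;
```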
## Programmatic Usage
### Using QueueService
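A minimal sketch of enqueuing a message and subscribing a handler. The import path and the `enqueue`/`subscribe` method names are assumptions and may differ from the actual `QueueService` API:

```typescript
// Hypothetical import path and method names - adjust to the actual QueueService API
import { queueService } from './services/queueService';

// Enqueue a chat message at the default priority (5)
await queueService.enqueue('chat_messages', {
  payload: { conversationId: 'conv-123', text: 'Hello!' },
  priority: 5,
  maxRetries: 3,
});

// Subscribe a handler that processes messages from the queue
queueService.subscribe('chat_messages', async (message) => {
  console.log('Processing message', message.id);
  // ... handle the message; throw to trigger the retry logic
});
```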
### Creating Custom Queue Messages
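A sketch of a typed payload and a delayed, prioritized message. The field names (`payload`, `priority`, `delay`, `maxRetries`) are assumptions based on the features described above:

```typescript
// Illustrative payload type for a proactive action message
interface ProactiveActionPayload {
  goalId: string;
  action: 'nudge' | 'follow_up';
  reason: string;
}

// Hypothetical enqueue call: priority 7 (proactive actions), delayed by 5 seconds
await queueService.enqueue<ProactiveActionPayload>('proactive_actions', {
  payload: { goalId: 'goal-42', action: 'follow_up', reason: 'No reply in 10 minutes' },
  priority: 7,
  delay: 5_000,     // process roughly 5 seconds from now
  maxRetries: 3,
});
```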
## Provider Comparison

### In-Memory Provider
- Best for: Local development, testing, single-instance deployments
- Pros: No external dependencies, fast, simple setup
- Cons: Messages lost on restart, no persistence, single-instance only
### Redis Provider
- Best for: Production, distributed systems, high availability
- Pros: Persistent, scalable, supports multiple instances
- Cons: Requires Redis server, additional complexity
## Configuration Examples
### Local Development Setup
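A minimal local setup using the in-memory provider, so no external services are required (provider variable name assumed, as above):

```bash
# .env - local testing, no Redis needed
MESSAGE_QUEUE_PROVIDER=memory
```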
### Production Redis Setup
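A production-oriented example pointing the backend at a Redis instance. `REDIS_URL` is referenced elsewhere in this document; the provider variable name and the URL value are illustrative:

```bash
# .env - distributed deployment backed by Redis
MESSAGE_QUEUE_PROVIDER=redis
REDIS_URL=redis://:your-password@redis.internal:6379
```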
### Docker Compose Example
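A sketch of a Docker Compose setup that runs Redis alongside the backend; the service names, image tag, and build context are assumptions:

```yaml
# docker-compose.yml (illustrative)
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data

  backend:
    build: .
    environment:
      MESSAGE_QUEUE_PROVIDER: redis   # hypothetical variable name
      REDIS_URL: redis://redis:6379
    depends_on:
      - redis

volumes:
  redis-data:
```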
## Monitoring & Observability

### Health Checks
### Queue Statistics
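The `/api/queue/stats` endpoint mentioned under Troubleshooting can be polled for queue sizes and processing metrics. The response shape shown in the comment is an assumption:

```typescript
// Poll queue statistics from the monitoring endpoint (base URL assumed)
const res = await fetch('http://localhost:3000/api/queue/stats');
const stats = await res.json();

// Hypothetical response shape - adjust to the actual payload:
// { queues: { chat_messages: { size: 12, processed: 480, failed: 3, avgProcessingMs: 42 }, ... } }
console.log(stats);
```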
### Real-time Socket Events
The system emits processed messages via Socket.IO:
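A client-side sketch using `socket.io-client`; the event names and backend URL below are illustrative and may not match the events the backend actually emits:

```typescript
import { io } from 'socket.io-client';

const socket = io('http://localhost:3000'); // backend URL assumed

// Hypothetical event names for processed queue messages
socket.on('agent_response', (message) => {
  console.log('Agent response delivered from the queue:', message);
});

socket.on('stream_chunk', (chunk) => {
  // Stream chunks are queued at priority 8 for a real-time feel
  process.stdout.write(chunk.text);
});
```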
## Error Handling

### Retry Logic
Messages automatically retry with exponential backoff:
- Attempt 1: Immediate
- Attempt 2: 1 second delay
- Attempt 3: 2 second delay
- Attempt 4: 4 second delay (capped at 30 seconds)
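The delays above correspond to an exponential backoff of the form shown in this sketch (the 30-second cap follows the schedule described above):

```typescript
// Delay before retry attempt n (attempt 1 runs immediately)
function retryDelayMs(attempt: number): number {
  if (attempt <= 1) return 0;
  return Math.min(1000 * 2 ** (attempt - 2), 30_000); // 1s, 2s, 4s, ... capped at 30s
}
```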
### Dead Letter Queue
Failed messages emit a `deadLetter` event:
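A sketch of a handler for that event; the subscription API (`on('deadLetter', ...)`) and the fields on the message are assumptions:

```typescript
// Hypothetical dead-letter handler - always handle these events (see Best Practices)
queueService.on('deadLetter', (message, error) => {
  console.error(
    `Message ${message.id} from ${message.queue} failed permanently after ${message.retries} retries:`,
    error,
  );
  // e.g. persist for manual inspection, trigger an alert, or notify the user
});
```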
## Testing
### Unit Tests
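A sketch of a unit test against the in-memory provider, written for a Vitest-style runner; the import path and the `enqueue`/`subscribe` method names are assumptions:

```typescript
// Hypothetical test - adjust imports and method names to the actual QueueService API
import { describe, expect, it } from 'vitest';
import { queueService } from '../src/services/queueService';

describe('QueueService (in-memory provider)', () => {
  it('processes higher-priority messages first', async () => {
    const processed: string[] = [];

    queueService.subscribe('chat_messages', async (message) => {
      processed.push(message.payload.text);
    });

    await queueService.enqueue('chat_messages', { payload: { text: 'low' }, priority: 5 });
    await queueService.enqueue('chat_messages', { payload: { text: 'high' }, priority: 10 });

    // Give the processor a moment to drain the queue
    await new Promise((resolve) => setTimeout(resolve, 50));

    expect(processed[0]).toBe('high');
  });
});
```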
### Integration Tests
## Best Practices

### Queue Design
- Use appropriate priorities (don't overuse high priority)
- Keep message payloads small and focused
- Use delayed messages for scheduled actions
- Set appropriate retry limits
### Error Handling
- Always handle dead letter queue events
- Log queue statistics regularly
- Monitor queue sizes to prevent backlog
- Set up alerts for queue health
### Performance
- Use Redis for high-throughput scenarios
- Monitor average processing times
- Scale consumers horizontally when needed
- Use message batching for bulk operations
### Security
- Validate message payloads
- Sanitize user data before queueing
- Use authentication for queue management endpoints
- Monitor for message queue flooding
## Troubleshooting

### Common Issues
Queue not processing messages:
- Check if queue service is initialized
- Verify subscribers are set up
- Check for errors in message handlers
High memory usage with in-memory provider:
- Check queue sizes with `/api/queue/stats`
- Purge unnecessary queues
- Consider switching to Redis
Redis connection issues:
- Verify Redis server is running
- Check `REDIS_URL` configuration
- Ensure network connectivity
Messages failing repeatedly:
- Check dead letter queue events
- Verify message handler logic
- Review retry configuration
### Debugging Commands
## Future Enhancements
Planned features:
- AWS SQS provider
- Message encryption
- Queue persistence for in-memory provider
- Message deduplication
- Priority queue partitioning
- Advanced routing rules
- Graphical queue monitoring dashboard