godaralokesh29/PulseGuard-AI
# PulseGuard - Project Overview
## 🎯 What This Project Does
**RAG of Fire** is an **AI-powered Incident Response System** that automatically analyzes production incidents and recommends mitigation actions using historical data and machine learning.
### Real-World Problem It Solves
When a production system experiences an incident (like a database timeout, memory leak, or API rate limiting), your on-call engineer needs to:
1. Understand what's happening (diagnosis)
2. Find similar past incidents (search historical knowledge)
3. Know what worked before (recommendations)
4. Act quickly to minimize downtime
**RAG of Fire automates the first three steps, so the engineer can go straight to acting on a recommendation.**
## 🏗️ Architecture Overview
### Three Main Components
#### 1. **Backend (Python + FastAPI)**
- REST API that handles incident analysis
- Connected to a vector database (ChromaDB) for semantic search
- Has 10 pre-loaded historical incidents with real metrics
- Can match new incidents to past ones and recommend solutions
#### 2. **Frontend (Next.js + React)**
- Dashboard to visualize incidents
- Forms to search historical data
- Displays recommendations and confidence scores
#### 3. **Database**
- PostgreSQL: Stores structured incident data
- ChromaDB: Vector database for semantic search (finds similar incidents)
## 📊 What "RAG" Means
**RAG = Retrieval-Augmented Generation**
- **Retrieval**: Find similar past incidents from vector database
- **Augmented**: Combine with current incident data
- **Generation**: Use LLM to create personalized recommendations
Example:
```text
Current incident: Database timeout, 450% spike
        ↓
Search vector DB for similar incidents
        ↓
Found: INC-2025-001 (database timeout, 450% spike, fixed by throttling to 30%)
        ↓
LLM generates: "Recommend: Throttle database connections to 30%"
        ↓
Confidence: 95% (because there is an exact match in history)
```
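The retrieval step can be illustrated with a toy example. This is a minimal sketch, not the project's implementation: it assumes incidents are already embedded as vectors (the three-dimensional vectors and every incident ID except INC-2025-001 are made up), ranks them by cosine similarity, and reuses the best score as a rough confidence. In the real system, ChromaDB computes the embeddings and distances.

```python
import math

# Hypothetical pre-computed embeddings for stored incidents (toy 3-d vectors).
HISTORY = {
    "INC-2025-001": ([0.9, 0.1, 0.0], "Throttle database connections to 30%"),
    "INC-EXAMPLE-A": ([0.1, 0.9, 0.1], "Throttle consumers to 45%"),
    "INC-EXAMPLE-B": ([0.0, 0.2, 0.9], "Throttle traffic to 20%"),
}


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def retrieve(query: list[float]) -> tuple[str, str, float]:
    """Return (incident_id, past_action, similarity) for the closest past incident."""
    best_id = max(HISTORY, key=lambda inc: cosine(query, HISTORY[inc][0]))
    vector, action = HISTORY[best_id]
    return best_id, action, cosine(query, vector)


# A new "database timeout" incident whose embedding lies close to INC-2025-001.
incident_id, action, score = retrieve([0.88, 0.12, 0.01])
print(incident_id, action, f"confidence={score:.2f}")
```

Treating the top similarity score as the confidence is only a heuristic; the project's own confidence calculation may weigh other signals.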
## 🧪 What The Test Cases Do
The `test_system.py` file runs 7 tests simulating real production incidents:
### Test 1: Database Timeout
- `error_type`: `"database_timeout"`
- `spike_percentage`: 450% (database response time increased 4.5x)
- Expected: "Throttle connections to 30%"
**What it tests**: Can the system detect a database timeout and recommend connection throttling?
### Test 2: Kafka Consumer Lag
- `error_type`: `"kafka_consumer_lag"`
- `spike_percentage`: 320% (message queue backed up)
- Expected: "Throttle consumers to 45%"
**What it tests**: Can the system detect message queue buildup and recommend consumer throttling?
### Test 3: Memory Leak
- `error_type`: `"memory_leak"`
- `spike_percentage`: 280% (memory usage increased 2.8x)
- Expected: "Throttle traffic to 20%"
**What it tests**: Can the system detect memory pressure and recommend traffic reduction?
### Test 4: Stream Anomaly
Sends anomaly events to the backend in real time, as they are detected.
**What it tests**: Can the system accept live anomaly data?
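Accepting live anomaly data means rejecting malformed events before they reach the decision engine. Below is a minimal, framework-free sketch of that validation. It assumes the two fields shown in the `stream-anomaly` request in the code path section. The `Anomaly` class is hypothetical; a FastAPI backend would more likely declare a Pydantic model instead.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class Anomaly:
    """Hypothetical shape of a streamed anomaly event."""
    error_type: str
    spike_percentage: float

    @classmethod
    def from_json(cls, raw: str) -> "Anomaly":
        data = json.loads(raw)
        error_type = data.get("error_type")
        spike = data.get("spike_percentage")
        # Reject payloads with missing fields or wrong types.
        if not isinstance(error_type, str) or not error_type:
            raise ValueError("error_type must be a non-empty string")
        if isinstance(spike, bool) or not isinstance(spike, (int, float)) or spike < 0:
            raise ValueError("spike_percentage must be a non-negative number")
        return cls(error_type=error_type, spike_percentage=float(spike))


anomaly = Anomaly.from_json('{"error_type": "database_timeout", "spike_percentage": 450}')
print(anomaly)
```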
### Tests 5-6: Search & Statistics
- Search historical RCA (root cause analysis) documents
- Get statistics about stored incidents
**What it tests**: Can the system retrieve and summarize historical data?
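On the statistics side, summarizing the stored incidents could be as simple as aggregating a list of records. This is a sketch only: the record fields and every ID except INC-2025-001 are hypothetical, and the project's actual statistics endpoint may report different figures.

```python
from collections import Counter
from statistics import mean

# Hypothetical stored incidents (the real project pre-loads 10 into ChromaDB).
INCIDENTS = [
    {"id": "INC-2025-001", "error_type": "database_timeout", "spike_percentage": 450},
    {"id": "INC-EXAMPLE-2", "error_type": "kafka_consumer_lag", "spike_percentage": 320},
    {"id": "INC-EXAMPLE-3", "error_type": "memory_leak", "spike_percentage": 280},
    {"id": "INC-EXAMPLE-4", "error_type": "database_timeout", "spike_percentage": 390},
]


def incident_stats(incidents: list[dict]) -> dict:
    """Summarize stored incidents: total count, count per error type, mean spike."""
    return {
        "total": len(incidents),
        "by_error_type": dict(Counter(i["error_type"] for i in incidents)),
        # Guard against an empty store, where mean() would raise.
        "mean_spike_percentage": mean(i["spike_percentage"] for i in incidents) if incidents else 0.0,
    }


print(incident_stats(INCIDENTS))
```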
## 🔗 How Everything Integrates
```text
┌─────────────────┐
│    Frontend     │
│    (Next.js)    │
└────────┬────────┘
         │ HTTP requests
         ↓
┌─────────────────────────────────────┐
│          Backend (FastAPI)          │
│  - Decision Engine                  │
│  - Document Search                  │
│  - Streaming Anomalies              │
└────────┬────────────────────────────┘
         │
    ┌────┴─────┬────────────────┐
    ↓          ↓                ↓
┌──────────┐ ┌──────────┐ ┌─────────────┐
│PostgreSQL│ │ ChromaDB │ │ LLM Service │
│(DB Data) │ │ (vector  │ │ (OpenAI or  │
│          │ │  search) │ │    Mock)    │
└──────────┘ └──────────┘ └─────────────┘
```
## 📝 Integration Points
### What needs to be integrated:
1. **LLM Service**
   - Currently uses a mock LLM (deterministic responses)
   - Can be switched to a real OpenAI or Anthropic API
   - Location: `backend/services/llm_engine.py`
2. **Vector Database**
   - Currently uses ChromaDB (in-memory, convenient for demos)
   - Can be replaced with Pinecone, Weaviate, etc.
   - Location: `backend/services/vector_db.py`
3. **Real Database**
   - PostgreSQL models are already defined
   - Needs a connection to a real PostgreSQL instance
   - Location: `backend/database/db.py`
4. **Streaming Pipeline**
   - Currently uses `asyncio.Queue` as a stand-in for Kafka/Flink
   - Can be replaced with a real Apache Kafka cluster
   - Location: `backend/services/stream_processor.py`
5. **Frontend Features**
   - A basic Next.js UI exists
   - Real-time dashboards and advanced filtering can be added
   - Location: `app/` and `components/`
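Items 1 and 4 describe in-process stand-ins: a deterministic mock LLM and an `asyncio.Queue` in place of Kafka/Flink. The sketch below shows one way those two pieces can fit together. It is not the code in `llm_engine.py` or `stream_processor.py`; the action table (copied from the expected results of Tests 1-3), the fallback message, and all function names are assumptions.

```python
import asyncio

# Hypothetical deterministic "mock LLM": same input, same answer, no network calls.
MOCK_ACTIONS = {
    "database_timeout": "Throttle connections to 30%",
    "kafka_consumer_lag": "Throttle consumers to 45%",
    "memory_leak": "Throttle traffic to 20%",
}


def mock_llm_recommend(error_type: str) -> str:
    # Hypothetical fallback for error types the mock does not know.
    return MOCK_ACTIONS.get(error_type, "Escalate to on-call engineer")


async def producer(queue: asyncio.Queue, anomalies: list[dict]) -> None:
    """Stands in for a Kafka producer: push anomalies onto the in-process queue."""
    for anomaly in anomalies:
        await queue.put(anomaly)
    await queue.put(None)  # sentinel: no more messages


async def consumer(queue: asyncio.Queue) -> list[str]:
    """Stands in for a Kafka consumer: turn each anomaly into a recommendation."""
    decisions = []
    while (anomaly := await queue.get()) is not None:
        decisions.append(f"{anomaly['error_type']}: {mock_llm_recommend(anomaly['error_type'])}")
    return decisions


async def run_pipeline(anomalies: list[dict]) -> list[str]:
    # Bounded queue: a fast producer waits for the consumer instead of growing memory.
    queue: asyncio.Queue = asyncio.Queue(maxsize=100)
    _, decisions = await asyncio.gather(producer(queue, anomalies), consumer(queue))
    return decisions


results = asyncio.run(run_pipeline([
    {"error_type": "database_timeout", "spike_percentage": 450},
    {"error_type": "memory_leak", "spike_percentage": 280},
]))
print(results)
```

Swapping in real Kafka would replace `producer`/`consumer` with a Kafka client while keeping the recommendation step unchanged.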
## ✅ Current Status
- ✅ Core logic working
- ✅ All 7 tests passing
- ✅ API endpoints functioning
- ✅ Backend starting successfully
## 🔌 WebSocket Real-Time Streaming
### What is WebSocket?
A persistent connection between browser and server that allows **live, two-way** communication: the server can push messages to the browser without the browser polling for them.
### How It Works in This Project
```text
Browser                               Backend
   |                                     |
   |-------- Connect via WebSocket ----->|
   |                                     |
   |<---- Real-time Decision Updates ----|
   |<---- Anomaly Alerts ----------------|
   |<---- System Events -----------------|
```
### Example Flow: Real-Time Incident Notification
```javascript
// Frontend (JavaScript)
const ws = new WebSocket('ws://localhost:8000/ws/incidents');

ws.onmessage = (event) => {
  const message = JSON.parse(event.data);

  if (message.type === 'decision') {
    // Update dashboard in real-time
    console.log('New Decision:', message.data);
    // Display: "Database Timeout - Recommend: Throttle to 30%"
  }

  if (message.type === 'anomaly') {
    // Show alert in UI (showAlert is an app-defined helper, not a browser API)
    showAlert('Anomaly Detected: ' + message.data.error_type);
  }
};

// Send ping to keep connection alive (only while the socket is open)
setInterval(() => {
  if (ws.readyState === WebSocket.OPEN) {
    ws.send(JSON.stringify({ type: 'ping' }));
  }
}, 30000);
```
### What Gets Sent Over WebSocket
```python
# Backend generates a decision
decision = {
    "id": "dec_12345",
    "matched_incident": "INC-2025-001",
    "symptom": "Database response time increased 450%",
    "recommended_action": "Throttle connections to 30%",
    "confidence_score": 0.95,
    "latency_ms": 245,
}

# Broadcast to ALL connected browsers (this runs inside an async function)
await notification_service.ws_manager.broadcast({
    "type": "decision",
    "data": decision,
})
```
**Result**: every browser connected to the WebSocket endpoint receives the same notification as soon as it is broadcast. ⚡
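The `ws_manager.broadcast` call above implies a connection manager that remembers every open socket. Here is a minimal sketch of one, assuming each socket object exposes an async `send_json` method (Starlette's `WebSocket`, which FastAPI uses, does). The class and method names are hypothetical and may not match `notification.py`.

```python
import asyncio
from typing import Any, Protocol


class JSONSocket(Protocol):
    """Anything with an async send_json method (e.g. a Starlette/FastAPI WebSocket)."""
    async def send_json(self, data: Any) -> None: ...


class ConnectionManager:
    """Hypothetical sketch of ws_manager: tracks open sockets and fans out messages."""

    def __init__(self) -> None:
        self.active: set = set()

    def connect(self, ws: JSONSocket) -> None:
        # Assumes the socket has already been accepted by the endpoint.
        self.active.add(ws)

    def disconnect(self, ws: JSONSocket) -> None:
        self.active.discard(ws)

    async def broadcast(self, message: dict) -> int:
        """Send message to every socket concurrently; drop sockets whose send fails.

        Returns the number of sockets that received the message.
        """
        sockets = list(self.active)
        results = await asyncio.gather(
            *(ws.send_json(message) for ws in sockets), return_exceptions=True
        )
        for ws, result in zip(sockets, results):
            if isinstance(result, Exception):
                self.disconnect(ws)  # closed tab or network error
        return len(sockets) - sum(isinstance(r, Exception) for r in results)
```

In a FastAPI endpoint, the socket would be accepted with `await websocket.accept()` before `connect` is called. Dropping sockets whose send fails keeps one closed tab from breaking the broadcast for everyone else.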
## 💬 Slack Integration
### What is Slack?
A team messaging platform. Slack integration means **the system sends alerts to your Slack channel automatically**.
### How It Works
```text
Backend detects incident
        ↓
Generates decision/recommendation
        ↓
Sends formatted message to Slack webhook
        ↓
Slack channel receives alert
        ↓
Team members see: "🚨 Database Timeout - Throttle to 30%"
```
### Setup Slack (Step-by-Step)
**Step 1**: Go to https://api.slack.com/apps and create an app
- Name: "Incident Response Bot"

**Step 2**: Enable "Incoming Webhooks" and add a webhook for the channel that should receive alerts
- Copy the webhook URL, which looks like:

```text
https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX
```

**Step 3**: Add it to the `.env` file

```text
SLACK_WEBHOOK_URL=https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXX
```

**Step 4**: Restart the backend

```bash
uvicorn backend.main:app --reload
```
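Because Slack is optional, the backend has to cope with the variable being absent. The sketch below reads the URL and treats a missing value as "Slack disabled". It assumes the `.env` values have already been loaded into the process environment (for example by python-dotenv or a settings module, which this document does not show); `load_slack_webhook` is a hypothetical name.

```python
import os
from typing import Optional

SLACK_PREFIX = "https://hooks.slack.com/services/"


def load_slack_webhook() -> Optional[str]:
    """Return the webhook URL from the environment, or None if Slack is not configured."""
    url = os.environ.get("SLACK_WEBHOOK_URL", "").strip()
    if not url:
        return None  # Slack alerts stay disabled; WebSocket streaming still works
    if not url.startswith(SLACK_PREFIX):
        # Fail fast on typos instead of silently posting to the wrong host.
        raise ValueError("SLACK_WEBHOOK_URL does not look like a Slack incoming-webhook URL")
    return url
```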
### Slack Message Example
When an incident is detected, Slack receives:
```text
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
🚨 Incident Decision: INC-2025-001
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Symptom:
Database response time increased 450%
Confidence:
95%
Recommended Action:
Throttle connections to 30%
⏱️ 245ms | ID: dec_12345
```
### Code: How Slack Alert Gets Sent
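```python
# backend/services/notification.py
import aiohttp


class SlackNotifier:
    def __init__(self, webhook_url: str):
        # Incoming-webhook URL, e.g. the SLACK_WEBHOOK_URL value from .env
        self.webhook_url = webhook_url

    async def send_decision_alert(self, decision):
        # Build a Block Kit message (abridged: only the header and the
        # recommended action are included here)
        payload = {
            # Plain-text fallback shown in notifications
            "text": "🚨 Incident Mitigation Decision Generated",
            "blocks": [
                {
                    "type": "header",
                    "text": {
                        "type": "plain_text",
                        "text": f"Incident Decision: {decision.matched_incident}",
                    },
                },
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": f"*Recommended Action:*\n```{decision.recommended_action}```",
                    },
                },
            ],
        }

        # Send HTTP POST to Slack webhook; the context manager releases the response
        async with aiohttp.ClientSession() as session:
            async with session.post(self.webhook_url, json=payload) as resp:
                return resp.status  # Slack returns 200 on success
```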
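The notifier code sends only the header and the recommended action. To reproduce every field of the example alert (symptom, confidence, latency, decision ID), the payload can be assembled by a pure function, which is also easy to unit-test without calling Slack. This is a sketch: `Decision` and `build_slack_payload` are hypothetical names, with field names taken from the WebSocket decision example.

```python
from dataclasses import dataclass


@dataclass
class Decision:
    """Hypothetical decision shape, using the fields from the WebSocket example."""
    id: str
    matched_incident: str
    symptom: str
    recommended_action: str
    confidence_score: float
    latency_ms: int


def build_slack_payload(decision: Decision) -> dict:
    """Build a Block Kit message carrying every field shown in the example alert."""
    title = f"🚨 Incident Decision: {decision.matched_incident}"
    return {
        "text": title,  # plain-text fallback for notifications
        "blocks": [
            {"type": "header", "text": {"type": "plain_text", "text": title}},
            {
                "type": "section",
                "fields": [  # rendered side by side in two columns
                    {"type": "mrkdwn", "text": f"*Symptom:*\n{decision.symptom}"},
                    {"type": "mrkdwn", "text": f"*Confidence:*\n{decision.confidence_score:.0%}"},
                ],
            },
            {
                "type": "section",
                "text": {"type": "mrkdwn", "text": f"*Recommended Action:*\n{decision.recommended_action}"},
            },
            {
                "type": "context",
                "elements": [
                    {"type": "mrkdwn", "text": f"⏱️ {decision.latency_ms}ms | ID: {decision.id}"}
                ],
            },
        ],
    }
```

`SlackNotifier.send_decision_alert` could then post `build_slack_payload(decision)` instead of building the dict inline.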
## 🔄 Complete Workflow Example
### Real Production Incident Flow
**Scenario**: Production database connection pool exhausted
```text
1. MONITORING SYSTEM
   ├─ Detects: Database response time 450% spike
   └─ Sends to: POST /api/v1/decisions/stream-anomaly

2. BACKEND (Decision Engine)
   ├─ Searches vector DB for similar incidents
   ├─ Finds: INC-2025-001 (exact match!)
   ├─ Generates: "Throttle connections to 30%"
   ├─ Calculates: Confidence 95% (based on exact match)
   └─ Creates Decision object

3. NOTIFICATION SERVICE
   ├─ WebSocket Broadcast:
   │  ├─ Sends to 42 browsers viewing dashboard
   │  └─ Each browser shows alert immediately
   │
   └─ Slack Notification:
      ├─ Sends formatted alert to #incidents channel
      └─ On-call engineer sees alert in Slack

4. TEAM RESPONSE
   ├─ Engineer reads: "Throttle to 30%"
   ├─ Executes: kubectl patch deployment...
   ├─ Database recovers: Response time back to normal
   └─ Incident resolved ✅
```
### The Code Path
**Step 1**: Incoming anomaly

```http
POST /api/v1/decisions/stream-anomaly
Content-Type: application/json

{
  "error_type": "database_timeout",
  "spike_percentage": 450
}
```

**Step 2**: Decision generated (`backend/routes/decisions.py`)

```python
decision = await engine.generate_decision(...)
```

**Step 3**: Notifications sent (`backend/services/notification.py`)

```python
await notification_service.notify_decision(
    decision=decision,
    channels=["websocket", "slack"],  # Send to BOTH
)
```

**Step 4**: Results
- 42 browsers: see the live alert (WebSocket)
- Slack #incidents channel: formatted message with the recommendation
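Step 3 fans one decision out to several channels. Below is a minimal sketch of how a `notify_decision` method could do that with `asyncio.gather`, so that a failing Slack webhook does not stop the WebSocket broadcast. The sender registry, the `register` method, and the per-channel return value are assumptions, not the project's API.

```python
import asyncio
from typing import Awaitable, Callable

# A sender takes a decision dict and delivers it to one channel.
Sender = Callable[[dict], Awaitable[None]]


class NotificationService:
    """Hypothetical sketch: dispatch one decision to several channels concurrently."""

    def __init__(self) -> None:
        self.senders: dict[str, Sender] = {}

    def register(self, channel: str, sender: Sender) -> None:
        self.senders[channel] = sender

    async def notify_decision(self, decision: dict, channels: list[str]) -> dict[str, bool]:
        """Return {channel: delivered?}; one failing channel does not block the others."""
        names = [c for c in channels if c in self.senders]  # unregistered channels are skipped
        results = await asyncio.gather(
            *(self.senders[c](decision) for c in names), return_exceptions=True
        )
        return {c: not isinstance(r, Exception) for c, r in zip(names, results)}
```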
## 🎯 Real Connections vs Mock
Currently you have:
| Component | Status | What It Does |
|-----------|--------|-------------|
| **WebSocket** | ✅ Working | Streams live incidents to dashboards |
| **Slack** | ✅ Ready to connect | When configured, sends alerts to Slack |
| **LLM** | ✅ Mock (can be real) | Generates recommendations |
| **Vector DB** | ✅ Working | Stores & searches 10 historical incidents |
| **PostgreSQL** | ⏳ Optional | Can store permanent incident records |
## 🚀 Next Steps
1. **Start the backend**: `uvicorn backend.main:app --reload`
2. **Run tests**: `python test_system.py`
3. **Check the API**: visit `http://localhost:8000/docs`
4. **Optional - Connect Slack**:
   - Get a webhook URL from the Slack API site
   - Add it to `.env`
   - Restart the backend
   - Trigger an incident analysis; the alert should appear in Slack
5. **Develop the frontend** or **integrate with real services**