GitHub: Abhidalbanjan/pulse-trace
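PulseTrace is an AI-driven distributed observability platform built in Go. It brings together logs, traces, and metrics to provide intelligent anomaly detection and incident correlation, making system monitoring more efficient.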
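# PulseTrace: Distributed Observability and Incident Monitoring Platform
A production-grade observability platform for microservices, built entirely in Go.
## Architecture Diagram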
```
    ┌──────────────────────────────┐
    │      API Gateway :8080       │
    │  (reverse proxy + tracing)   │
    └──┬──────────┬─────────┬──────┘
       │          │         │
┌──────▼──┐  ┌────▼────┐  ┌─▼──────────────┐
│   Log   │  │  Alert  │  │  Correlation   │
│ Service │  │ Service │  │    Service     │
│  :8081  │  │  :8082  │  │     :8083      │
└────┬────┘  └────┬────┘  └────────┬───────┘
     │            │                │
Kafka "logs" Kafka "alerts"    RabbitMQ
     │            │                │
     └──────┬─────┘                │
            │                      ▼
       ┌────▼────┐        ┌──────────────────┐
       │  Kafka  │        │   Notification   │
       └────┬────┘        │     Service      │
            │             └──────────────────┘
       ┌────▼────┐
       │Postgres │ ← logs, alerts, incidents
       └─────────┘
┌───────────────────────────────────────────────────────────┐
│                    Observability Stack                    │
│  OTel Collector → Jaeger (traces) · Prometheus → Grafana  │
└───────────────────────────────────────────────────────────┘
```
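## What's Built
| Component              | Responsibility                                                                                      |
|------------------------|-----------------------------------------------------------------------------------------------------|
| `gateway-service`      | Reverse proxy, W3C trace context propagation, routes all API traffic                                |
| `log-service`          | Ingests and queries structured log events, publishes to the Kafka `logs` topic                      |
| `alert-service`        | Consumes the `logs` topic, creates alerts for ERROR/FATAL levels, publishes to the `alerts` topic   |
| `correlation-service`  | Consumes the `alerts` topic, groups alerts into incidents, infers root cause, publishes to RabbitMQ |
| `notification-service` | Consumes from RabbitMQ, dispatches to Slack / email / log                                           |
| `shared`               | Models, database connection pool, Kafka producer/consumer, RabbitMQ client, OTel middleware         |
| PostgreSQL             | Persistent storage for log entries, alerts, and incidents                                           |
| Kafka                  | Event bus: `logs` and `alerts` topics                                                               |
| RabbitMQ               | Notification pipeline with a dead-letter queue                                                      |
| OTel Collector         | Receives OTLP spans from all services and forwards them to Jaeger                                   |
| Jaeger                 | Distributed trace visualization                                                                     |
| Prometheus             | Scrapes metrics from the OTel Collector                                                             |
| Grafana                | Pre-provisioned dashboards for traces and metrics                                                   |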
## Quick Start
```
# 1. Clone (replace <repo-url> with the repository URL)
git clone <repo-url> pulsetrace && cd pulsetrace
# 2. Build and start the full stack
docker compose up --build
# 3. Ingest an INFO log (no alert)
curl -X POST http://localhost:8080/api/v1/logs \
  -H "Content-Type: application/json" \
  -d '{"service": "auth-service", "level": "INFO", "message": "user login successful"}'
# 4. Ingest an ERROR log (triggers alert → incident → notification)
curl -X POST http://localhost:8080/api/v1/logs \
  -H "Content-Type: application/json" \
  -d '{"service": "payment-service", "level": "ERROR", "message": "DB connection pool exhausted"}'
# 5. Query logs
curl "http://localhost:8080/api/v1/logs?service=payment-service&level=ERROR"
# 6. Query alerts
curl "http://localhost:8080/api/v1/alerts"
# 7. Query incidents (grouped, with root cause)
curl "http://localhost:8080/api/v1/incidents"
# 8. Get an incident timeline (replace <incident-id> with an ID returned in step 7)
curl "http://localhost:8080/api/v1/incidents/<incident-id>/timeline"
```
## Observability UIs
| UI | URL | Credentials |
|-----------------|----------------------------|-------------------|
| Jaeger | http://localhost:16686 | — |
| Grafana | http://localhost:3000 | admin / admin |
| Prometheus | http://localhost:9090 | — |
| RabbitMQ Mgmt | http://localhost:15672 | pulsetrace / pulsetrace_secret |
## Event Flow
```
POST /api/v1/logs
        │
        ▼
  gateway-service
    └─ injects W3C traceparent header
        │
        ▼
  log-service
    ├─ validates & persists to PostgreSQL
    ├─ starts OTel span (child of gateway span)
    └─ publishes to Kafka "logs" topic (with trace headers)
        │
        ▼
  alert-service consumer
    ├─ extracts trace context from Kafka headers
    ├─ level == ERROR or FATAL?
    │     YES → insert alert into PostgreSQL
    └─ publishes to Kafka "alerts" topic (trace propagated)
        │
        ▼
  correlation-service consumer
    ├─ extracts trace context
    ├─ finds open incident in 5-min window for service
    │     found     → add alert to existing incident
    │     not found → create new incident with root-cause inference
    └─ publishes NotificationEvent to RabbitMQ
        │
        ▼
  notification-service consumer
    ├─ logs structured notification (always)
    ├─ posts to Slack (if SLACK_WEBHOOK_URL set)
    └─ sends email (if SMTP_HOST set)
```
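The alert-service step in the flow above comes down to a level check. The following is a minimal sketch of that decision, not the project's code: the `LogEvent` and `Alert` types and the `AlertFor` function are hypothetical names, and the case-insensitive comparison is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// LogEvent and Alert are simplified, hypothetical stand-ins for the
// project's shared models.
type LogEvent struct {
	Service string
	Level   string
	Message string
}

type Alert struct {
	Service string
	Level   string
	Message string
}

// AlertFor returns an alert and true when the event's level is ERROR or
// FATAL, and a zero Alert and false for every other level.
func AlertFor(ev LogEvent) (Alert, bool) {
	switch level := strings.ToUpper(ev.Level); level {
	case "ERROR", "FATAL":
		return Alert{Service: ev.Service, Level: level, Message: ev.Message}, true
	default:
		return Alert{}, false
	}
}

func main() {
	events := []LogEvent{
		{Service: "auth-service", Level: "INFO", Message: "user login successful"},
		{Service: "payment-service", Level: "ERROR", Message: "DB connection pool exhausted"},
	}
	for _, ev := range events {
		if a, ok := AlertFor(ev); ok {
			fmt.Printf("alert: [%s] %s: %s\n", a.Level, a.Service, a.Message)
		} else {
			fmt.Printf("no alert for %s log from %s\n", ev.Level, ev.Service)
		}
	}
}
```

In the real pipeline the result would be persisted and published to the `alerts` topic; the sketch only shows the filtering rule.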
## API Reference
All endpoints are proxied through the gateway at `http://localhost:8080`.
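To illustrate prefix-based proxying of this kind, here is a minimal sketch built on the standard library's `httputil.NewSingleHostReverseProxy`. It is an assumption about the general approach, not the gateway's actual code: `NewGateway`, `stubBackend`, and `get` are hypothetical helpers, the backends are in-process test servers, and trace header injection is left out.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/http/httputil"
	"net/url"
)

// NewGateway builds a handler that forwards each path prefix to its backend.
func NewGateway(routes map[string]string) (http.Handler, error) {
	mux := http.NewServeMux()
	for prefix, target := range routes {
		u, err := url.Parse(target)
		if err != nil {
			return nil, fmt.Errorf("parse backend %q: %w", target, err)
		}
		proxy := httputil.NewSingleHostReverseProxy(u)
		mux.Handle(prefix, proxy)     // exact match, e.g. /api/v1/logs
		mux.Handle(prefix+"/", proxy) // subtree, e.g. /api/v1/logs/{id}
	}
	return mux, nil
}

// stubBackend starts a throwaway server that reports its name and the path it saw.
func stubBackend(name string) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "%s handled %s", name, r.URL.Path)
	}))
}

// get sends an in-memory GET through h and returns the status code and body.
func get(h http.Handler, path string) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	logs, alerts := stubBackend("log-service"), stubBackend("alert-service")
	defer logs.Close()
	defer alerts.Close()

	gw, err := NewGateway(map[string]string{
		"/api/v1/logs":   logs.URL,
		"/api/v1/alerts": alerts.URL,
	})
	if err != nil {
		panic(err)
	}
	for _, p := range []string{"/api/v1/logs", "/api/v1/alerts/42", "/api/v1/unknown"} {
		code, body := get(gw, p)
		fmt.Println(p, "->", code, body)
	}
}
```

Paths that match no prefix fall through to the mux's default 404 response.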
### Logs
| Method | Path                    | Description                   |
|--------|-------------------------|-------------------------------|
| POST   | `/api/v1/logs`          | Ingest a structured log event |
| GET    | `/api/v1/logs`          | List logs (filterable)        |
| GET    | `/api/v1/logs/{id}`     | Get a single log by ID        |
**POST request body:**
```
{
"service": "payment-service",
"level": "ERROR",
"message": "database timeout",
"trace_id": "abc-123",
"span_id": "def-456",
"metadata": { "region": "us-east-1" }
}
```
**GET query parameters:** `service`, `level`, `trace_id`, `from` (RFC3339), `to` (RFC3339), `page`, `page_size`
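For clients or handlers written in Go, the POST body above could be decoded into a struct along these lines. This is a sketch under stated assumptions: the `LogEntry` type, the `ParseLogEntry` function, and the rule that `service`, `level`, and `message` are required are all hypothetical, since the README does not state which fields are mandatory.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// LogEntry mirrors the JSON keys of the POST body. The Go field names are
// illustrative and may differ from the project's shared model.
type LogEntry struct {
	Service  string         `json:"service"`
	Level    string         `json:"level"`
	Message  string         `json:"message"`
	TraceID  string         `json:"trace_id,omitempty"`
	SpanID   string         `json:"span_id,omitempty"`
	Metadata map[string]any `json:"metadata,omitempty"`
}

// ParseLogEntry decodes a request body, upper-cases the level, and rejects
// entries missing service, level, or message (an assumed validation rule).
func ParseLogEntry(body []byte) (LogEntry, error) {
	var e LogEntry
	if err := json.Unmarshal(body, &e); err != nil {
		return LogEntry{}, fmt.Errorf("decode log entry: %w", err)
	}
	e.Level = strings.ToUpper(strings.TrimSpace(e.Level))
	if e.Service == "" || e.Level == "" || e.Message == "" {
		return LogEntry{}, errors.New("service, level and message are required")
	}
	return e, nil
}

func main() {
	body := []byte(`{
		"service": "payment-service",
		"level": "error",
		"message": "database timeout",
		"metadata": {"region": "us-east-1"}
	}`)
	e, err := ParseLogEntry(body)
	if err != nil {
		fmt.Println("invalid:", err)
		return
	}
	fmt.Println(e.Service, e.Level, e.Metadata["region"])
}
```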
### Alerts
| Method | Path                   | Description              |
|--------|------------------------|--------------------------|
| GET    | `/api/v1/alerts`       | List alerts (filterable) |
| GET    | `/api/v1/alerts/{id}`  | Get a single alert       |
**GET query parameters:** `service`, `level`, `from`, `to`, `page`, `page_size`
### Incidents
| Method | Path                              | Description                            |
|--------|-----------------------------------|----------------------------------------|
| GET    | `/api/v1/incidents`               | List incidents with their root causes  |
| GET    | `/api/v1/incidents/{id}`          | Get a single incident                  |
| GET    | `/api/v1/incidents/{id}/timeline` | Ordered event timeline for an incident |
**GET query parameters:** `status` (OPEN/RESOLVED), `severity`, `service`, `from`, `to`, `page`, `page_size`
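A Go client could assemble these query parameters with `net/url` rather than by string concatenation. The sketch below is illustrative only: `IncidentFilter` and `IncidentsURL` are hypothetical names, zero values are treated as "omit this parameter", and the timestamps are formatted as RFC3339 on the assumption that incidents accept the same format as the logs endpoint.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
	"time"
)

// IncidentFilter holds the documented query parameters. Zero values are
// left out of the generated URL.
type IncidentFilter struct {
	Status   string // OPEN or RESOLVED
	Severity string
	Service  string
	From, To time.Time
	Page     int
	PageSize int
}

// IncidentsURL returns the list-incidents URL for the given gateway base URL.
func IncidentsURL(base string, f IncidentFilter) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", fmt.Errorf("parse base URL: %w", err)
	}
	u.Path = "/api/v1/incidents"

	q := url.Values{}
	setIf := func(key, value string) {
		if value != "" {
			q.Set(key, value)
		}
	}
	setIf("status", f.Status)
	setIf("severity", f.Severity)
	setIf("service", f.Service)
	if !f.From.IsZero() {
		q.Set("from", f.From.UTC().Format(time.RFC3339))
	}
	if !f.To.IsZero() {
		q.Set("to", f.To.UTC().Format(time.RFC3339))
	}
	if f.Page > 0 {
		q.Set("page", strconv.Itoa(f.Page))
	}
	if f.PageSize > 0 {
		q.Set("page_size", strconv.Itoa(f.PageSize))
	}
	u.RawQuery = q.Encode() // Encode sorts keys and escapes values
	return u.String(), nil
}

func main() {
	u, err := IncidentsURL("http://localhost:8080", IncidentFilter{
		Status:   "OPEN",
		Service:  "payment-service",
		From:     time.Date(2026, 5, 16, 13, 0, 0, 0, time.UTC),
		Page:     1,
		PageSize: 20,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(u)
}
```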
**Example incident response:**
```
{
"id": "e4661798-...",
"title": "[ERROR] payment-service degradation detected",
"root_cause": "Database or network connectivity issue",
"status": "OPEN",
"severity": "ERROR",
"services": ["payment-service"],
"alert_count": 3,
"started_at": "2026-05-16T13:45:41Z"
}
```
**Example timeline response:**
```
[
{ "at": "13:45:41", "event_type": "incident_opened", "description": "Incident opened: [ERROR] payment-service degradation detected" },
{ "at": "13:45:41", "event_type": "alert_triggered", "service": "payment-service", "level": "ERROR", "description": "[ERROR] payment-service: DB connection pool exhausted (attempt 1)" },
{ "at": "13:45:42", "event_type": "alert_triggered", "service": "payment-service", "level": "ERROR", "description": "[ERROR] payment-service: DB connection pool exhausted (attempt 2)" },
{ "at": "13:45:43", "event_type": "alert_triggered", "service": "payment-service", "level": "ERROR", "description": "[ERROR] payment-service: DB connection pool exhausted (attempt 3)" }
]
```
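## Root Cause Inference
The correlation engine scans alert messages for known patterns and maps each one to a probable root cause:
| Detected pattern | Inferred root cause                                |
|------------------|----------------------------------------------------|
| `connection`     | Database or network connectivity issue             |
| `timeout`        | Downstream service latency or resource exhaustion  |
| `memory`         | Memory pressure, possibly an OOM condition         |
| `kafka`          | Kafka broker unavailable or consumer lag           |
| `auth`           | Authentication service degradation                 |
| `permission`     | Authorization failure or misconfigured credentials |
| `crash`          | Application panic or unhandled exception           |
| `unavailable`    | Upstream service down or unreachable               |
Alerts from the same service that arrive within a **5-minute sliding window** are grouped into a single incident. The incident's `alert_count` increases with each new alert, and its `severity` is automatically raised to the highest level seen.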
## Distributed Tracing
Every request carries a W3C `traceparent` header through the entire call chain:
```
gateway-service: POST /api/v1/logs                          ← root span
└── log-service: POST /api/v1/logs                          ← child span (HTTP propagation)
    ├── log.ingest                                          ← handler span
    ├── db.insert_log                                       ← DB span
    └── kafka.publish_log                                   ← Kafka publish span (headers injected)
        └── alert-service: alert.process_log_event          ← consumer span (headers extracted)
            ├── db.insert_alert                             ← DB span
            └── kafka.publish_alert                         ← Kafka publish span
                └── correlation-service: correlation.process_alert
                    ├── db.upsert_incident
                    └── rabbitmq.publish_notification
```
View traces at **http://localhost:16686**: select any service and click "Find Traces".
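To make the header format concrete, the sketch below parses a version `00` `traceparent` value and derives a child context that keeps the trace ID while generating a new span ID. It uses only the standard library for illustration; the services themselves rely on the OpenTelemetry SDK for propagation, and `TraceContext`, `ParseTraceparent`, and `Child` are hypothetical names. Handling of future header versions is deliberately omitted.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"errors"
	"fmt"
	"strings"
)

// TraceContext holds the fields of a version-00 traceparent header:
// 00-<trace-id: 32 hex>-<parent-id: 16 hex>-<flags: 2 hex>.
type TraceContext struct {
	TraceID string
	SpanID  string
	Flags   string // "01" means sampled
}

// isLowerHex reports whether s is exactly n lowercase hex characters.
func isLowerHex(s string, n int) bool {
	if len(s) != n {
		return false
	}
	for _, c := range s {
		if !(c >= '0' && c <= '9' || c >= 'a' && c <= 'f') {
			return false
		}
	}
	return true
}

// ParseTraceparent validates and splits a version-00 header value.
func ParseTraceparent(h string) (TraceContext, error) {
	parts := strings.Split(h, "-")
	if len(parts) != 4 || parts[0] != "00" {
		return TraceContext{}, errors.New("traceparent: expected version 00 with four fields")
	}
	tc := TraceContext{TraceID: parts[1], SpanID: parts[2], Flags: parts[3]}
	if !isLowerHex(tc.TraceID, 32) || !isLowerHex(tc.SpanID, 16) || !isLowerHex(tc.Flags, 2) {
		return TraceContext{}, errors.New("traceparent: malformed field")
	}
	// All-zero trace and span IDs are invalid in the W3C format.
	if strings.Trim(tc.TraceID, "0") == "" || strings.Trim(tc.SpanID, "0") == "" {
		return TraceContext{}, errors.New("traceparent: all-zero ID")
	}
	return tc, nil
}

// Child keeps the trace ID and flags and generates a random span ID.
func (tc TraceContext) Child() (TraceContext, error) {
	b := make([]byte, 8)
	if _, err := rand.Read(b); err != nil {
		return TraceContext{}, err
	}
	return TraceContext{TraceID: tc.TraceID, SpanID: hex.EncodeToString(b), Flags: tc.Flags}, nil
}

// String renders the context as a traceparent header value.
func (tc TraceContext) String() string {
	return "00-" + tc.TraceID + "-" + tc.SpanID + "-" + tc.Flags
}

func main() {
	parent, err := ParseTraceparent("00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
	if err != nil {
		panic(err)
	}
	child, err := parent.Child()
	if err != nil {
		panic(err)
	}
	fmt.Println("incoming:", parent)
	fmt.Println("outgoing:", child) // same trace ID, new span ID
}
```

The same value can travel as an HTTP header or as a Kafka message header, which is how a trace can continue across the asynchronous hops shown in the span tree.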
## Running Locally (without Docker)
```
# Start the dependencies (Postgres, Kafka, RabbitMQ) however you prefer, then:
export DATABASE_URL="postgres://pulsetrace:pulsetrace_secret@localhost:5432/pulsetrace?sslmode=disable"
export KAFKA_BROKERS="localhost:9092"
export RABBITMQ_URL="amqp://guest:guest@localhost:5672/"
export OTEL_EXPORTER_OTLP_ENDPOINT="localhost:4317"
# Apply migrations (the URL is quoted so the shell does not interpret the "?")
psql "$DATABASE_URL" -f log-service/migrations/001_create_log_entries.sql
psql "$DATABASE_URL" -f alert-service/migrations/001_create_alerts.sql
psql "$DATABASE_URL" -f correlation-service/migrations/001_create_incidents.sql
# Run each service in its own terminal
cd log-service && go run ./cmd
cd alert-service && go run ./cmd
cd correlation-service && go run ./cmd
cd notification-service && go run ./cmd
cd gateway-service && LOG_SERVICE_URL=http://localhost:8081 \
ALERT_SERVICE_URL=http://localhost:8082 \
CORRELATION_SERVICE_URL=http://localhost:8083 \
go run ./cmd
```
## Kubernetes Deployment
The manifests are in `k8s/`. They require a running cluster with the nginx ingress controller installed.
```
# Apply in order
kubectl apply -f k8s/namespace.yaml
kubectl apply -f k8s/configmap.yaml
kubectl apply -f k8s/secret.yaml # update values before applying
kubectl apply -f k8s/log-service.yaml
kubectl apply -f k8s/alert-service.yaml
kubectl apply -f k8s/correlation-service.yaml
kubectl apply -f k8s/notification-service.yaml
kubectl apply -f k8s/gateway.yaml
# Check the rollout
kubectl rollout status deployment -n pulsetrace
# Access through the ingress (add pulsetrace.local to /etc/hosts)
curl http://pulsetrace.local/api/v1/logs
```
Each service has:
- A `livenessProbe` and a `readinessProbe` on `/healthz`
- A `HorizontalPodAutoscaler` (CPU target 70%, 2–10 replicas)
- Resource requests and limits
- Configuration from ConfigMap / Secrets (no hard-coded values)
## Project Structure
```
pulsetrace/
├── gateway-service/ # Reverse proxy + OTel trace propagation
│ ├── cmd/main.go
│ ├── internal/proxy/
│ └── Dockerfile
├── log-service/ # Log ingestion, query, Kafka publish
│ ├── cmd/main.go
│ ├── internal/handler/
│ ├── internal/repository/
│ ├── migrations/
│ └── Dockerfile
├── alert-service/ # Kafka consumer → alerts → re-publish
│ ├── cmd/main.go
│ ├── internal/consumer/
│ ├── internal/handler/
│ ├── internal/repository/
│ ├── migrations/
│ └── Dockerfile
├── correlation-service/ # Incident grouping + root-cause engine
│ ├── cmd/main.go
│ ├── internal/engine/ # Correlator (sliding window, root cause)
│ ├── internal/handler/ # Incident + timeline HTTP API
│ ├── internal/repository/
│ ├── migrations/
│ └── Dockerfile
├── notification-service/ # RabbitMQ consumer → Slack / email / log
│ ├── cmd/main.go
│ ├── internal/worker/
│ └── Dockerfile
├── shared/ # Shared packages
│ ├── db/ # PostgreSQL pool
│ ├── kafka/ # Producer + ConsumerGroup (OTel-aware)
│ ├── rabbitmq/ # Publisher + Consumer with DLQ
│ ├── middleware/ # CORS, RequestLogger, Tracing
│ ├── models/ # LogEntry, Alert, Incident, Notification
│ └── telemetry/ # OTel tracer init, Kafka header propagation
├── otel-collector/ # OTel Collector config
├── prometheus/ # Prometheus scrape config
├── grafana/ # Pre-provisioned datasources + dashboard
├── k8s/ # Kubernetes manifests
│ ├── namespace.yaml
│ ├── configmap.yaml
│ ├── secret.yaml
│ ├── log-service.yaml # Deployment + Service + HPA
│ ├── alert-service.yaml
│ ├── correlation-service.yaml
│ ├── notification-service.yaml
│ └── gateway.yaml # Deployment + Service + Ingress + HPA
├── docker-compose.yml
├── go.work
└── README.md
```
## Tech Stack
| Area                | Technology                                          |
|---------------------|-----------------------------------------------------|
| Language            | Go 1.22                                             |
| API                 | `net/http` (standard library)                       |
| Database            | PostgreSQL 16                                       |
| Message broker      | Apache Kafka (Sarama)                               |
| Notification queue  | RabbitMQ 3.13 (amqp091-go) with a dead-letter queue |
| Distributed tracing | OpenTelemetry SDK + Jaeger                          |
| Metrics             | Prometheus + Grafana                                |
| Containerization    | Docker + Compose                                    |
| Orchestration       | Kubernetes (Deployments, HPA, Ingress)              |
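## Roadmap
- [x] **Phase 1**: Log ingestion, PostgreSQL, REST API
- [x] **Phase 2**: Kafka event pipeline, alert service (event-driven)
- [x] **Phase 3**: Distributed tracing, OpenTelemetry, Jaeger, Prometheus, Grafana
- [x] **Phase 4**: Incident correlation engine, RabbitMQ notifications, Kubernetes manifests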