Abhidalbanjan/pulse-trace

GitHub: Abhidalbanjan/pulse-trace

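PulseTrace is an AI-driven distributed observability platform built in Go. It unifies logs, traces, and metrics, and adds intelligent anomaly detection and incident correlation to make system monitoring more efficient.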

Stars: 0 | Forks: 0

# PulseTrace: Distributed Observability and Incident Monitoring Platform

A production-grade observability platform for microservices, built entirely in Go.

## Architecture

```
          ┌──────────────────────────────┐
          │      API Gateway :8080       │
          │  (reverse proxy + tracing)   │
          └──┬──────────┬─────────┬──────┘
             │          │         │
      ┌──────▼──┐  ┌────▼────┐  ┌─▼──────────────┐
      │   Log   │  │  Alert  │  │  Correlation   │
      │ Service │  │ Service │  │    Service     │
      │  :8081  │  │  :8082  │  │     :8083      │
      └────┬────┘  └────┬────┘  └────────┬───────┘
           │            │                │
     Kafka "logs" Kafka "alerts"     RabbitMQ
           │            │                │
           └─────┬──────┘                │
                 │                       ▼
            ┌────▼────┐        ┌──────────────────┐
            │  Kafka  │        │   Notification   │
            └────┬────┘        │     Service      │
                 │             └──────────────────┘
            ┌────▼────┐
            │Postgres │  ← logs, alerts, incidents
            └─────────┘

┌────────────────────────────────────────────────────────────┐
│                    Observability Stack                     │
│  OTel Collector → Jaeger (traces) · Prometheus → Grafana   │
└────────────────────────────────────────────────────────────┘
```

## Components

| Component              | Responsibility                                                                                       |
|------------------------|------------------------------------------------------------------------------------------------------|
| `gateway-service`      | Reverse proxy, W3C trace context propagation, routes all API traffic                                 |
| `log-service`          | Ingests and queries structured log events, publishes to the Kafka `logs` topic                       |
| `alert-service`        | Consumes the `logs` topic, creates alerts for ERROR/FATAL levels, publishes to the `alerts` topic    |
| `correlation-service`  | Consumes the `alerts` topic, groups alerts into incidents, infers root cause, publishes to RabbitMQ  |
| `notification-service` | Consumes from RabbitMQ, dispatches to Slack / email / log                                            |
| `shared`               | Models, database connection pool, Kafka producer/consumer, RabbitMQ client, OTel middleware          |
| PostgreSQL             | Persistent storage for log entries, alerts, and incidents                                            |
| Kafka                  | Event bus: `logs` and `alerts` topics                                                                |
| RabbitMQ               | Notification pipeline with a dead-letter queue                                                       |
| OTel Collector         | Receives OTLP spans from all services, forwards them to Jaeger                                       |
| Jaeger                 | Distributed trace visualization                                                                      |
| Prometheus             | Scrapes metrics from the OTel Collector                                                              |
| Grafana                | Pre-provisioned dashboards for traces and metrics                                                    |

## Quick Start

```
# 1. Clone
git clone pulsetrace && cd pulsetrace

# 2. Build and start the full stack
docker compose up --build

# 3. Ingest an INFO log (no alert)
curl -X POST http://localhost:8080/api/v1/logs \
  -H "Content-Type: application/json" \
  -d '{"service": "auth-service", "level": "INFO", "message": "user login successful"}'

# 4. Ingest an ERROR log (triggers alert → incident → notification)
curl -X POST http://localhost:8080/api/v1/logs \
  -H "Content-Type: application/json" \
  -d '{"service": "payment-service", "level": "ERROR", "message": "DB connection pool exhausted"}'

# 5. Query logs
curl "http://localhost:8080/api/v1/logs?service=payment-service&level=ERROR"

# 6. Query alerts
curl "http://localhost:8080/api/v1/alerts"

# 7. Query incidents (grouping + root cause)
curl "http://localhost:8080/api/v1/incidents"

# 8. Get an incident timeline
curl "http://localhost:8080/api/v1/incidents//timeline"
```

## Observability UIs

| UI            | URL                    | Credentials                    |
|---------------|------------------------|--------------------------------|
| Jaeger        | http://localhost:16686 | none                           |
| Grafana       | http://localhost:3000  | admin / admin                  |
| Prometheus    | http://localhost:9090  | none                           |
| RabbitMQ Mgmt | http://localhost:15672 | pulsetrace / pulsetrace_secret |

## Event Flow

```
POST /api/v1/logs
      │
      ▼
gateway-service
  └─ injects W3C traceparent header
      │
      ▼
log-service
  ├─ validates & persists to PostgreSQL
  ├─ starts OTel span (child of gateway span)
  └─ publishes to Kafka "logs" topic (with trace headers)
      │
      ▼
alert-service consumer
  ├─ extracts trace context from Kafka headers
  ├─ level == ERROR or FATAL?
  │     YES → insert alert into PostgreSQL
  └─ publishes to Kafka "alerts" topic (trace propagated)
      │
      ▼
correlation-service consumer
  ├─ extracts trace context
  ├─ finds open incident in 5-min window for service
  │     found     → add alert to existing incident
  │     not found → create new incident with root-cause inference
  └─ publishes NotificationEvent to RabbitMQ
      │
      ▼
notification-service consumer
  ├─ logs structured notification (always)
  ├─ posts to Slack (if SLACK_WEBHOOK_URL set)
  └─ sends email (if SMTP_HOST set)
```

## API Reference

All endpoints are proxied through the gateway at `http://localhost:8080`.

### Logs

| Method | Path                | Description                       |
|--------|---------------------|-----------------------------------|
| POST   | `/api/v1/logs`      | Ingest a structured log event     |
| GET    | `/api/v1/logs`      | List logs (filterable)            |
| GET    | `/api/v1/logs/{id}` | Get a single log entry by ID      |

**POST request body:**

```
{
  "service": "payment-service",
  "level": "ERROR",
  "message": "database timeout",
  "trace_id": "abc-123",
  "span_id": "def-456",
  "metadata": { "region": "us-east-1" }
}
```

**GET query parameters:** `service`, `level`, `trace_id`, `from` (RFC3339), `to` (RFC3339), `page`, `page_size`

### Alerts

| Method | Path                  | Description              |
|--------|-----------------------|--------------------------|
| GET    | `/api/v1/alerts`      | List alerts (filterable) |
| GET    | `/api/v1/alerts/{id}` | Get a single alert       |

**GET query parameters:** `service`, `level`, `from`, `to`, `page`, `page_size`

### Incidents

| Method | Path                              | Description                            |
|--------|-----------------------------------|----------------------------------------|
| GET    | `/api/v1/incidents`               | List incidents with their root cause   |
| GET    | `/api/v1/incidents/{id}`          | Get a single incident                  |
| GET    | `/api/v1/incidents/{id}/timeline` | Ordered event timeline for an incident |

**GET query parameters:** `status` (OPEN/RESOLVED), `severity`, `service`, `from`, `to`, `page`, `page_size`

**Incident response example:**

```
{
  "id": "e4661798-...",
  "title": "[ERROR] payment-service degradation detected",
  "root_cause": "Database or network connectivity issue",
  "status": "OPEN",
  "severity": "ERROR",
  "services": ["payment-service"],
  "alert_count": 3,
  "started_at": "2026-05-16T13:45:41Z"
}
```

**Timeline response example:**

```
[
  {
    "at": "13:45:41",
    "event_type": "incident_opened",
    "description": "Incident opened: [ERROR] payment-service degradation detected"
  },
  {
    "at": "13:45:41",
    "event_type": "alert_triggered",
    "service": "payment-service",
    "level": "ERROR",
    "description": "[ERROR] payment-service: DB connection pool exhausted (attempt 1)"
  },
  {
    "at": "13:45:42",
    "event_type": "alert_triggered",
    "service": "payment-service",
    "level": "ERROR",
    "description": "[ERROR] payment-service: DB connection pool exhausted (attempt 2)"
  },
  {
    "at": "13:45:43",
    "event_type": "alert_triggered",
    "service": "payment-service",
    "level": "ERROR",
    "description": "[ERROR] payment-service: DB connection pool exhausted (attempt 3)"
  }
]
```

## Root-Cause Inference

The correlation engine scans alert messages for known patterns and maps them to a probable root cause:

| Detected pattern | Inferred root cause                                 |
|------------------|-----------------------------------------------------|
| `connection`     | Database or network connectivity issue              |
| `timeout`        | Downstream service latency or resource exhaustion   |
| `memory`         | Memory pressure, possible OOM condition             |
| `kafka`          | Kafka broker unavailable or consumer lag            |
| `auth`           | Authentication service degradation                  |
| `permission`     | Authorization failure or misconfigured credentials  |
| `crash`          | Application panic or unhandled exception            |
| `unavailable`    | Upstream service down or unreachable                |

Alerts from the same service that arrive within a **5-minute sliding window** are grouped into a single incident. The incident's `alert_count` increments with each new alert, and its `severity` is automatically raised to the highest level seen.

## Distributed Tracing

Every request carries a W3C `traceparent` header through the entire call chain:

```
gateway-service: POST /api/v1/logs                          ← root span
  └── log-service: POST /api/v1/logs                        ← child span (HTTP propagation)
        ├── log.ingest                                      ← handler span
        ├── db.insert_log                                   ← DB span
        └── kafka.publish_log                               ← Kafka publish span (headers injected)
              └── alert-service: alert.process_log_event    ← consumer span (headers extracted)
                    ├── db.insert_alert                     ← DB span
                    └── kafka.publish_alert                 ← Kafka publish span
                          └── correlation-service: correlation.process_alert
                                ├── db.upsert_incident
                                └── rabbitmq.publish_notification
```

View traces at **http://localhost:16686**: select any service and click "Find Traces".

## Running Locally (without Docker)

```
# Start the dependencies (Postgres, Kafka, RabbitMQ) however you prefer, then:
export DATABASE_URL="postgres://pulsetrace:pulsetrace_secret@localhost:5432/pulsetrace?sslmode=disable"
export KAFKA_BROKERS="localhost:9092"
export RABBITMQ_URL="amqp://guest:guest@localhost:5672/"
export OTEL_EXPORTER_OTLP_ENDPOINT="localhost:4317"

# Apply migrations
psql "$DATABASE_URL" -f log-service/migrations/001_create_log_entries.sql
psql "$DATABASE_URL" -f alert-service/migrations/001_create_alerts.sql
psql "$DATABASE_URL" -f correlation-service/migrations/001_create_incidents.sql

# Run each service in a separate terminal
cd log-service && go run ./cmd
cd alert-service && go run ./cmd
cd correlation-service && go run ./cmd
cd notification-service && go run ./cmd
cd gateway-service && LOG_SERVICE_URL=http://localhost:8081 \
  ALERT_SERVICE_URL=http://localhost:8082 \
  CORRELATION_SERVICE_URL=http://localhost:8083 \
  go run ./cmd
```

## Kubernetes Deployment

Manifests live in `k8s/`. A running cluster with the nginx ingress controller is required.

```
# Apply in order
kubectl apply -f k8s/namespace.yaml
kubectl apply -f k8s/configmap.yaml
kubectl apply -f k8s/secret.yaml        # update values before applying
kubectl apply -f k8s/log-service.yaml
kubectl apply -f k8s/alert-service.yaml
kubectl apply -f k8s/correlation-service.yaml
kubectl apply -f k8s/notification-service.yaml
kubectl apply -f k8s/gateway.yaml

# Check the rollout
kubectl rollout status deployment -n pulsetrace

# Access through the ingress (add pulsetrace.local to /etc/hosts)
curl http://pulsetrace.local/api/v1/logs
```

Each service has:

- A `livenessProbe` and a `readinessProbe` on `/healthz`
- A `HorizontalPodAutoscaler` (CPU target 70%, 2–10 replicas)
- Resource requests and limits
- Configuration from ConfigMap / Secrets (no hard-coded values)

## Project Structure

```
pulsetrace/
├── gateway-service/          # Reverse proxy + OTel trace propagation
│   ├── cmd/main.go
│   ├── internal/proxy/
│   └── Dockerfile
├── log-service/              # Log ingestion, query, Kafka publish
│   ├── cmd/main.go
│   ├── internal/handler/
│   ├── internal/repository/
│   ├── migrations/
│   └── Dockerfile
├── alert-service/            # Kafka consumer → alerts → re-publish
│   ├── cmd/main.go
│   ├── internal/consumer/
│   ├── internal/handler/
│   ├── internal/repository/
│   ├── migrations/
│   └── Dockerfile
├── correlation-service/      # Incident grouping + root-cause engine
│   ├── cmd/main.go
│   ├── internal/engine/      # Correlator (sliding window, root cause)
│   ├── internal/handler/     # Incident + timeline HTTP API
│   ├── internal/repository/
│   ├── migrations/
│   └── Dockerfile
├── notification-service/     # RabbitMQ consumer → Slack / email / log
│   ├── cmd/main.go
│   ├── internal/worker/
│   └── Dockerfile
├── shared/                   # Shared packages
│   ├── db/                   # PostgreSQL pool
│   ├── kafka/                # Producer + ConsumerGroup (OTel-aware)
│   ├── rabbitmq/             # Publisher + Consumer with DLQ
│   ├── middleware/           # CORS, RequestLogger, Tracing
│   ├── models/               # LogEntry, Alert, Incident, Notification
│   └── telemetry/            # OTel tracer init, Kafka header propagation
├── otel-collector/           # OTel Collector config
├── prometheus/               # Prometheus scrape config
├── grafana/                  # Pre-provisioned datasources + dashboard
├── k8s/                      # Kubernetes manifests
│   ├── namespace.yaml
│   ├── configmap.yaml
│   ├── secret.yaml
│   ├── log-service.yaml      # Deployment + Service + HPA
│   ├── alert-service.yaml
│   ├── correlation-service.yaml
│   ├── notification-service.yaml
│   └── gateway.yaml          # Deployment + Service + Ingress + HPA
├── docker-compose.yml
├── go.work
└── README.md
```

## Tech Stack

| Area                | Technology                                            |
|---------------------|-------------------------------------------------------|
| Language            | Go 1.22                                               |
| API                 | `net/http` (standard library)                         |
| Database            | PostgreSQL 16                                         |
| Message broker      | Apache Kafka (Sarama)                                 |
| Notification queue  | RabbitMQ 3.13 (amqp091-go) with a dead-letter queue   |
| Distributed tracing | OpenTelemetry SDK + Jaeger                            |
| Metrics             | Prometheus + Grafana                                  |
| Containerization    | Docker + Compose                                      |
| Orchestration       | Kubernetes (Deployments, HPA, Ingress)                |

## Roadmap

- [x] **Phase 1**: Log ingestion, PostgreSQL, REST API
- [x] **Phase 2**: Kafka event pipeline, alert service (event-driven)
- [x] **Phase 3**: Distributed tracing, OpenTelemetry, Jaeger, Prometheus, Grafana
- [x] **Phase 4**: Incident correlation engine, RabbitMQ notifications, Kubernetes manifests