Admin-or-Admin/rac-agents
GitHub: Admin-or-Admin/rac-agents
基于三个协作 AI 智能体的网络安全流水线,实现从原始日志到威胁修复方案的端到端自动化处理。
Stars: 0 | Forks: 0
# rac-agents
**识别. 分析. 分类.**
三个自主 AI 智能体(Agent)构成了一条网络安全流水线。原始日志从一端输入,分类后的威胁及修复方案从另一端输出。每个智能体都是一个独立的 Python 进程,通过 Apache Kafka 进行通信,这意味着你可以在不同的机器上运行它们,独立扩展它们,或者在不影响其他智能体的情况下替换其中一个。
本文档涵盖了每个智能体的功能、如何从零开始设置一切,以及各个部分是如何协同工作的。
## 目录
- [工作原理](#how-it-works)
- [智能体](#agents)
- [分类器 (Classifier)](#classifier)
- [分析师 (Analyst)](#analyst)
- [响应器 (Responder)](#responder)
- [Kafka 主题](#kafka-topics)
- [分析统计](#analytics)
- [知识库](#knowledge-base)
- [项目结构](#project-structure)
- [安装设置](#setup)
- [配置说明](#configuration)
- [运行智能体](#running-the-agents)
- [Pipeline JSON 回退机制](#pipeline-json-fallback)
- [添加新的知识文件](#adding-a-new-knowledge-file)
## 工作原理
每条进入系统的日志都会按顺序经历三个阶段。
分类器(Classifier)从 `logs.unfiltered` Kafka 主题读取原始日志,将每一条发送给 Gemini 进行分类,并将结果发布到 `logs.categories`。分析师(Analyst)从 `logs.categories` 读取数据,过滤掉置信度低于阈值或未被标记为安全相关的内容,并使用具备跨域上下文的 Gemini 对其余内容进行调查。它将调查结果发布到 `logs.solver_plan`。响应器(Responder)从 `logs.solver_plan` 读取数据,生成包含每步风险级别和审批要求的具体修复计划,并将最终输出发布到 `logs.solution`。
所有三个智能体还会向 `analytics` 主题发布操作事件,以便你能全面了解每个智能体正在做什么、耗时多少以及它们是否存活。
```
logs.unfiltered
|
classifier
|
logs.categories
|
analyst
|
logs.solver_plan
|
responder
|
logs.solution
|
analytics <-- all three agents publish here in parallel
```
## 智能体
### 分类器
**文件:** `classifier.py`
**读取自:** `logs.unfiltered`
**写入至:** `logs.categories`, `analytics`
分类器是流水线的入口点。它接收任意格式的原始日志消息,并请求 Gemini 对其进行解析。输出是一个结构化的 JSON 对象,所有下游智能体都依赖于此。
对于每条日志,它生成:
- `category` — `security`(安全)、`infrastructure`(基础设施)、`application`(应用)或 `deployment`(部署)之一
- `severity` — `critical`(严重)、`high`(高)、`medium`(中)、`low`(低)或 `info`(信息)之一
- `tags` — 两到五个小写描述性标签
- `isCybersecurity` — 此日志是否具有任何安全相关性
- `sendToInvestigationAgent` — 分析师是否应接手此日志。仅当 `isCybersecurity` 为真且严重程度不是 `info` 时才为真
- `classificationConfidence` — 0 到 100 的整数,表示模型对其分类的置信度
- `reasoning` — 解释为何如此分类的一句话
分类器在由 `CLASSIFIER_MODE` 环境变量控制的两种模式下运行。在 `manual` 模式下,它从 stdin 读取,以便你可以粘贴单条日志行进行测试。在 `kafka` 模式下,它持续监听 `logs.unfiltered`。
如果 `knowledge/` 文件夹中有文件,其内容会在每次分类调用前注入到系统提示词中。这就是你向分类器传授公司特定日志格式、GDPR 要求或任何自定义分类规则的方式。详情请参阅 [知识库](#knowledge-base) 部分。
### 分析师
**文件:** `analyst.py`
**读取自:** `logs.categories`
**写入至:** `logs.solver_plan`, `analytics`
分析师接收已分类的日志,并决定哪些值得调查。在花费 Token 进行分析之前,它会应用两个过滤器。
第一个过滤器是置信度。任何 `classificationConfidence` 低于 `MIN_CLASSIFICATION_CONFIDENCE` 的日志都会被丢弃,并向 `analytics` 发布跳过事件。该阈值可在 `.env` 中配置。将其设置为 `0` 可完全禁用过滤器并传递所有内容。提高阈值意味着调查的日志更少,但输入质量更高。降低阈值意味着覆盖范围更广,但数据噪声更多。
第二个过滤器是安全相关性。`sendToInvestigationAgent` 为假或 `isCybersecurity` 为假的日志会被跳过。这些是分类器已确定不属于安全问题的基础设施噪声、部署事件和应用程序错误。
对于通过这两个过滤器的日志,分析师将日志文本及其分类发送给 Gemini,并请求进行威胁调查。输出包括:
- `aiSuggestion` — 两到三句话描述正在发生的事情及原因
- `attackVector` — 正在使用的具体技术
- `complexity` — `simple` 或 `complex`,决定了响应器如何处理它
- `autoFixable` — 脚本是否可以在无人身风险的情况下安全地修复此问题
- `requiresHumanApproval` — 在执行任何操作之前是否需要人员签字批准
- `priority` — 1 到 5 的整数,其中 1 最紧急
- `recurrenceRate` — 24 小时内再次发生的概率(0 到 100)
- `notifyTeams` — 应通知哪些团队
- `proposedSteps` — 有序的修复步骤列表,每个步骤包含标题、描述、可选命令、风险级别、预计时间和回滚说明
### 响应器
**文件:** `responder.py`
**读取自:** `logs.solver_plan`
**写入至:** `logs.solution`, `analytics`
响应器获取分析师的调查结果,并将其转化为可执行的解决计划。它携带完整上下文(日志、分类和调查结果)再次调用 Gemini,并生成应用层可据此执行的结构化计划。
输出包括:
- `resolutionMode` — 如果问题可以在无人干预的情况下修复,则为 `autonomous`(自主);如果需要人员参与,则为 `guided`(引导)
- `executiveSummary` — 两到三句话描述事件和解决方法
- `immediateActions` — 立即采取的步骤列表。每个步骤都有一个 `approvalStatus` 字段,值为 `auto` 或 `pending`。标记为 `auto` 的步骤无需询问任何人即可安全执行。标记为 `pending` 的步骤在运行前需要人员在应用层批准
- `followUpActions` — 未来 24 到 48 小时内防止问题再次发生的任务,每个任务都有负责团队和截止日期
- `postIncidentSummary` — 用通俗易懂的语言解释发生了什么、影响了什么或可能影响什么、最可能的根本原因以及未来应做何改变
响应器不会在命令行请求人工输入。所有批准决策均通过读取每个步骤的 `approvalStatus` 字段在应用层进行。当人员在应用中批准或拒绝某个步骤时,该决策会被发布到 `actions` Kafka 主题,响应器消费该主题以执行相应操作或跳过。
## Kafka 主题
| 主题 | 生产者 | 消费者 | 内容 |
|---|---|---|---|
| `logs.unfiltered` | 你的接入服务 | 分类器 | 从源系统接收的原始日志文档 |
| `logs.categories` | 分类器 | 分析师 | 增加了类别、严重程度、标签、置信度和安全标记的日志 |
| `logs.solver_plan` | 分析师 | 响应器 | 包含完整威胁调查和建议修复步骤的安全日志 |
| `logs.solution` | 响应器 | 账本, 网关 | 包含每步批准状态的完整解决计划 |
| `analytics` | 所有三个智能体 | 账本, 网关 | 心跳、每次事件统计和累计计数器 |
如果主题尚不存在,所有主题都会由智能体使用 `ensure_topic()` 自动创建。你无需手动创建它们。
## 分析统计
每个智能体都会向 `analytics` 主题发布消息。每条消息都有一个 `agent` 字段,以便消费者按来源过滤,还有一个 `event` 字段,以便消费者知道消息的类型。
**分类器** 发布:
- `heartbeat` — 存活信号,包含运行时间、接收计数、已分类计数、转发给分析师的计数以及平均处理时间(毫秒)
- `classification_produced` — 每次分类后触发,包含结果详情以及按类别和严重程度的累计细分
**分析师** 发布:
- `heartbeat` — 存活信号,包含运行时间、接收计数、已调查计数、跳过计数和平均处理时间
- `investigation_produced` — 每次成功调查后触发,包含攻击向量、复杂度、优先级、复发率、按风险级别划分的步骤细分以及累计统计
- `log_skipped` — 当日志被丢弃时触发,包含原因(`low_confidence` 或 `not_security`)以及日志的严重程度和类别
**响应器** 发布:
- `heartbeat` — 存活信号,包含运行时间和快速统计
- `resolution_produced` — 每次生成解决计划后触发,包含解决模式、步骤细分以及自主与引导模式的累计比率
- `human_decision` — 当人员在应用中批准或拒绝某个步骤时触发,包含步骤 ID、结果、可选原因以及累计总数
每个智能体的心跳间隔可独立配置。每个事件中的计数器在进程生命周期内是累计的,这意味着它们会在智能体重启时重置。若要永久存储这些指标,请将账本服务指向 `analytics` 主题。
## 知识库
`knowledge/` 文件夹允许你将特定领域的上下文注入分类器的提示词中,而无需修改任何代码。放置在此文件夹中的任何文件都会在分类器启动时加载一次,并附加到每次分类调用的系统提示词中。
支持的文件类型:
- `.pdf` — 使用 PyMuPDF 提取
- `.docx` — 使用 python-docx 提取
- `.txt` — 作为纯文本读取
- `.md` — 作为纯文本读取
此处可放置的内容示例:
- 解释公司内部日志格式及每个字段含义的 PDF
- 影响日志分类方式的 GDPR 或 HIPAA 合规性要求
- OWASP Top 10 参考资料
- 描述团队认为哪些日志模式属于严重 vs 高严重级别的指南
- 内部服务名称及其功能的列表,以便分类器做出更好的决策
要添加新知识,只需将文件放入 `knowledge/` 文件夹并重启分类器。无需其他操作。
如果你有非常大的文档(100 页或更多),请注意全文会被注入到每一次 API 调用中。这会增加成本和延迟。对于当前设置,这是可以接受的。如果这成为问题,下一步是使用像 Chroma 这样的向量数据库实现 RAG,即每次只检索相关的知识块,而不是整个知识库。
## 项目结构
```
rac-agents/
├── knowledge/ Drop PDFs, DOCX, or TXT files here for classifier context
├── classifier.py Agent 1 — classifies raw logs
├── analyst.py Agent 2 — investigates security events
├── responder.py Agent 3 — generates remediation plans
├── kafka_client.py Shared Kafka producer and consumer wrappers
├── knowledge_loader.py Reads and extracts text from files in knowledge/
├── pipeline.json Shared state file used in manual mode
├── .env Environment variables (not committed)
└── .gitignore
```
## 安装设置
**要求:**
- Python 3.10 或更高版本。Python 3.9 可用,但 Google 的库会打印弃用警告。
- 可从你的机器访问的运行中的 Kafka broker
- 来自 Google AI Studio 的 Gemini API 密钥
**克隆并创建虚拟环境:**
```
cd rac-agents
python -m venv venv
source venv/bin/activate # Mac / Linux
venv\Scripts\activate # Windows
```
**安装依赖:**
```
pip install langchain langchain-google-genai kafka-python python-dotenv pymupdf python-docx
```
**创建 `.env` 文件:**
```
cp .env.example .env
```
然后填写你的值。有关所有可用变量,请参阅 [配置说明](#configuration) 部分。
**创建 knowledge 文件夹:**
```
mkdir knowledge
```
**创建 pipeline 文件:**
```
echo "{}" > pipeline.json
```
## 配置说明
所有配置均位于 `.env` 中。这些值均不会提交到代码库。
| 变量 | 默认值 | 描述 |
|---|---|---|
| `GEMINI_API_KEY` | 必填 | 来自 Google AI Studio 的 Google Gemini API 密钥 |
| `KAFKA_BROKERS` | `localhost:29092` | Kafka broker 的地址。如果 Kafka 运行在另一台机器上,请使用该机器的局域网 IP,而不是 localhost。只需定义一次 — 如果你有两个条目,最后一个生效,第一个会被静默忽略 |
| `KAFKA_GROUP_ID` | `classifier-group` | 分类器的 Kafka 消费者组 ID。更改此项会使 Kafka 将分类器视为全新的消费者,这将导致它从主题开头重新处理所有消息 |
| `BATCH_SIZE` | `20` | 分类器在单次轮询中从 Kafka 拉取的消息数量。这并不意味着它们会批量发送给 Gemini — 每条消息仍会单独分类。较高的值可减少轮询开销,但会增加每个周期的内存使用量 |
| `CLASSIFIER_MODE` | `manual` | 设置为 `kafka` 以自动从 Kafka 消费。保留为 `manual` 则从 stdin 读取用于测试 |
| `ANALYST_MODE` | `manual` | 设置为 `kafka` 以自动从 Kafka 消费 |
| `RESPONDER_MODE` | `manual` | 设置为 `kafka` 以自动从 Kafka 消费 |
| `MIN_CLASSIFICATION_CONFIDENCE` | `70` | 0 到 100 的百分比。分类器置信度此值的日志会被分析师跳过。设置为 `0` 可完全禁用过滤器并传递所有日志,无论置信度如何 |
| `CLASSIFIER_HEARTBEAT_INTERVAL` | `30` | 分类器向 analytics 主题发布心跳的频率(秒) |
| `ANALYST_HEARTBEAT_INTERVAL` | `30` | 分析师发布心跳的频率(秒) |
| `RESPONDER_HEARTBEAT_INTERVAL` | `30` | 响应器发布心跳的频率(秒) |
| `KNOWLEDGE_DIR` | `knowledge` | 包含分类器知识文件的文件夹路径。可以是绝对路径,也可以是相对于运行脚本位置的相对路径 |
供参考的完整 `.env` 示例:
```
# Gemini
GEMINI_API_KEY=your_key_here
# Kafka — 使用运行 Docker 机器的 LAN IP,而非 localhost
KAFKA_BROKERS=192.168.1.6:29092
KAFKA_GROUP_ID=classifier-group
BATCH_SIZE=20
# Agent modes — 手动模式用于测试,kafka 用于生产环境
CLASSIFIER_MODE=kafka
ANALYST_MODE=kafka
RESPONDER_MODE=kafka
# Confidence filter — 设置为 0 以禁用并让所有日志通过
MIN_CLASSIFICATION_CONFIDENCE=0
# Heartbeat intervals(以秒为单位)
CLASSIFIER_HEARTBEAT_INTERVAL=30
ANALYST_HEARTBEAT_INTERVAL=30
RESPONDER_HEARTBEAT_INTERVAL=30
```
## 运行智能体
打开三个单独的终端窗口。在每个窗口中激活虚拟环境。
```
source venv/bin/activate
```
按以下顺序启动智能体。每个智能体都依赖于前一个发布消息后才有内容可消费。
**终端 1 — 分类器:**
```
python classifier.py
```
**终端 2 — 分析师:**
```
python analyst.py
```
**终端 3 — 响应器:**
```
python responder.py
```
一旦这三个都运行起来,任何发布到 `logs.unfiltered` 的消息都会自动流经整个流水线。
要在不使用 Kafka 的情况下进行手动测试,请在 `.env` 中将所有三个 `_MODE` 变量设置为 `manual`。然后在任意单个终端中依次运行它们。分类器会提示你粘贴一条日志消息,将结果写入 `pipeline.json`,然后退出。接下来运行分析师以从 `pipeline.json` 读取、进行调查并将结果写回。最后运行响应器以读取调查结果并生成最终计划。
## Pipeline JSON 回退机制
`pipeline.json` 是一个扁平 JSON 文件,在手动模式下运行时充当智能体之间的共享状态。每个智能体在完成其阶段时都会读取并写回该文件。在 Kafka 模式下,该文件仍会在处理每条消息时被写入,作为最后处理项的本地记录,但它不是主要的通信通道。
不要将 `pipeline.json` 提交到代码库。它已在 `.gitignore` 中列出。
## 添加新的知识文件
1. 获取 PDF、DOCX、TXT 或 MD 格式的文件。
2. 将其复制到 `knowledge/` 文件夹中。
3. 重启分类器。
该文件将在启动时加载,你会看到确认信息:
```
[Knowledge] Loaded: gdpr-requirements.pdf (42381 chars)
```
如果文件加载失败,你将看到一条包含原因的错误消息。分类器将继续运行,只是不会加载该文件,而不是崩溃。
要移除知识文件,请将其从文件夹中删除并重启分类器。
标签:Apache Kafka, Gemini, PyRIT, Python, Recognise-Analyse-Counter, SOAR, 人工智能, 分布式系统, 响应大小分析, 多智能体系统, 威胁分析, 安全运营, 扫描框架, 无后门, 日志分类, 深度学习, 用户模式Hook绕过, 网络安全, 自动修复, 自动化侦查工具, 软件成分分析, 逆向工具, 隐私保护