Admin-or-Admin/rac-agents

GitHub: Admin-or-Admin/rac-agents

基于三个协作 AI 智能体的网络安全流水线,实现从原始日志到威胁修复方案的端到端自动化处理。

Stars: 0 | Forks: 0

# rac-agents **识别. 分析. 分类.** 三个自主 AI 智能体(Agent)构成了一条网络安全流水线。原始日志从一端输入,分类后的威胁及修复方案从另一端输出。每个智能体都是一个独立的 Python 进程,通过 Apache Kafka 进行通信,这意味着你可以在不同的机器上运行它们,独立扩展它们,或者在不影响其他智能体的情况下替换其中一个。 本文档涵盖了每个智能体的功能、如何从零开始设置一切,以及各个部分是如何协同工作的。 ## 目录 - [工作原理](#how-it-works) - [智能体](#agents) - [分类器 (Classifier)](#classifier) - [分析师 (Analyst)](#analyst) - [响应器 (Responder)](#responder) - [Kafka 主题](#kafka-topics) - [分析统计](#analytics) - [知识库](#knowledge-base) - [项目结构](#project-structure) - [安装设置](#setup) - [配置说明](#configuration) - [运行智能体](#running-the-agents) - [Pipeline JSON 回退机制](#pipeline-json-fallback) - [添加新的知识文件](#adding-a-new-knowledge-file) ## 工作原理 每条进入系统的日志都会按顺序经历三个阶段。 分类器(Classifier)从 `logs.unfiltered` Kafka 主题读取原始日志,将每一条发送给 Gemini 进行分类,并将结果发布到 `logs.categories`。分析师(Analyst)从 `logs.categories` 读取数据,过滤掉置信度低于阈值或未被标记为安全相关的内容,并使用具备跨域上下文的 Gemini 对其余内容进行调查。它将调查结果发布到 `logs.solver_plan`。响应器(Responder)从 `logs.solver_plan` 读取数据,生成包含每步风险级别和审批要求的具体修复计划,并将最终输出发布到 `logs.solution`。 所有三个智能体还会向 `analytics` 主题发布操作事件,以便你能全面了解每个智能体正在做什么、耗时多少以及它们是否存活。 ``` logs.unfiltered | classifier | logs.categories | analyst | logs.solver_plan | responder | logs.solution | analytics <-- all three agents publish here in parallel ``` ## 智能体 ### 分类器 **文件:** `classifier.py` **读取自:** `logs.unfiltered` **写入至:** `logs.categories`, `analytics` 分类器是流水线的入口点。它接收任意格式的原始日志消息,并请求 Gemini 对其进行解析。输出是一个结构化的 JSON 对象,所有下游智能体都依赖于此。 对于每条日志,它生成: - `category` — `security`(安全)、`infrastructure`(基础设施)、`application`(应用)或 `deployment`(部署)之一 - `severity` — `critical`(严重)、`high`(高)、`medium`(中)、`low`(低)或 `info`(信息)之一 - `tags` — 两到五个小写描述性标签 - `isCybersecurity` — 此日志是否具有任何安全相关性 - `sendToInvestigationAgent` — 分析师是否应接手此日志。仅当 `isCybersecurity` 为真且严重程度不是 `info` 时才为真 - `classificationConfidence` — 0 到 100 的整数,表示模型对其分类的置信度 - `reasoning` — 解释为何如此分类的一句话 分类器在由 `CLASSIFIER_MODE` 环境变量控制的两种模式下运行。在 `manual` 模式下,它从 stdin 读取,以便你可以粘贴单条日志行进行测试。在 `kafka` 模式下,它持续监听 `logs.unfiltered`。 如果 `knowledge/` 文件夹中有文件,其内容会在每次分类调用前注入到系统提示词中。这就是你向分类器传授公司特定日志格式、GDPR 要求或任何自定义分类规则的方式。详情请参阅 [知识库](#knowledge-base) 部分。 ### 分析师 **文件:** `analyst.py` **读取自:** `logs.categories` **写入至:** `logs.solver_plan`, `analytics` 分析师接收已分类的日志,并决定哪些值得调查。在花费 Token 进行分析之前,它会应用两个过滤器。 第一个过滤器是置信度。任何 `classificationConfidence` 低于 `MIN_CLASSIFICATION_CONFIDENCE` 的日志都会被丢弃,并向 `analytics` 发布跳过事件。该阈值可在 `.env` 中配置。将其设置为 `0` 可完全禁用过滤器并传递所有内容。提高阈值意味着调查的日志更少,但输入质量更高。降低阈值意味着覆盖范围更广,但数据噪声更多。 第二个过滤器是安全相关性。`sendToInvestigationAgent` 为假或 `isCybersecurity` 为假的日志会被跳过。这些是分类器已确定不属于安全问题的基础设施噪声、部署事件和应用程序错误。 对于通过这两个过滤器的日志,分析师将日志文本及其分类发送给 Gemini,并请求进行威胁调查。输出包括: - `aiSuggestion` — 两到三句话描述正在发生的事情及原因 - `attackVector` — 正在使用的具体技术 - `complexity` — `simple` 或 `complex`,决定了响应器如何处理它 - `autoFixable` — 脚本是否可以在无人身风险的情况下安全地修复此问题 - `requiresHumanApproval` — 在执行任何操作之前是否需要人员签字批准 - `priority` — 1 到 5 的整数,其中 1 最紧急 - `recurrenceRate` — 24 小时内再次发生的概率(0 到 100) - `notifyTeams` — 应通知哪些团队 - `proposedSteps` — 有序的修复步骤列表,每个步骤包含标题、描述、可选命令、风险级别、预计时间和回滚说明 ### 响应器 **文件:** `responder.py` **读取自:** `logs.solver_plan` **写入至:** `logs.solution`, `analytics` 响应器获取分析师的调查结果,并将其转化为可执行的解决计划。它携带完整上下文(日志、分类和调查结果)再次调用 Gemini,并生成应用层可据此执行的结构化计划。 输出包括: - `resolutionMode` — 如果问题可以在无人干预的情况下修复,则为 `autonomous`(自主);如果需要人员参与,则为 `guided`(引导) - `executiveSummary` — 两到三句话描述事件和解决方法 - `immediateActions` — 立即采取的步骤列表。每个步骤都有一个 `approvalStatus` 字段,值为 `auto` 或 `pending`。标记为 `auto` 的步骤无需询问任何人即可安全执行。标记为 `pending` 的步骤在运行前需要人员在应用层批准 - `followUpActions` — 未来 24 到 48 小时内防止问题再次发生的任务,每个任务都有负责团队和截止日期 - `postIncidentSummary` — 用通俗易懂的语言解释发生了什么、影响了什么或可能影响什么、最可能的根本原因以及未来应做何改变 响应器不会在命令行请求人工输入。所有批准决策均通过读取每个步骤的 `approvalStatus` 字段在应用层进行。当人员在应用中批准或拒绝某个步骤时,该决策会被发布到 `actions` Kafka 主题,响应器消费该主题以执行相应操作或跳过。 ## Kafka 主题 | 主题 | 生产者 | 消费者 | 内容 | |---|---|---|---| | `logs.unfiltered` | 你的接入服务 | 分类器 | 从源系统接收的原始日志文档 | | `logs.categories` | 分类器 | 分析师 | 增加了类别、严重程度、标签、置信度和安全标记的日志 | | `logs.solver_plan` | 分析师 | 响应器 | 包含完整威胁调查和建议修复步骤的安全日志 | | `logs.solution` | 响应器 | 账本, 网关 | 包含每步批准状态的完整解决计划 | | `analytics` | 所有三个智能体 | 账本, 网关 | 心跳、每次事件统计和累计计数器 | 如果主题尚不存在,所有主题都会由智能体使用 `ensure_topic()` 自动创建。你无需手动创建它们。 ## 分析统计 每个智能体都会向 `analytics` 主题发布消息。每条消息都有一个 `agent` 字段,以便消费者按来源过滤,还有一个 `event` 字段,以便消费者知道消息的类型。 **分类器** 发布: - `heartbeat` — 存活信号,包含运行时间、接收计数、已分类计数、转发给分析师的计数以及平均处理时间(毫秒) - `classification_produced` — 每次分类后触发,包含结果详情以及按类别和严重程度的累计细分 **分析师** 发布: - `heartbeat` — 存活信号,包含运行时间、接收计数、已调查计数、跳过计数和平均处理时间 - `investigation_produced` — 每次成功调查后触发,包含攻击向量、复杂度、优先级、复发率、按风险级别划分的步骤细分以及累计统计 - `log_skipped` — 当日志被丢弃时触发,包含原因(`low_confidence` 或 `not_security`)以及日志的严重程度和类别 **响应器** 发布: - `heartbeat` — 存活信号,包含运行时间和快速统计 - `resolution_produced` — 每次生成解决计划后触发,包含解决模式、步骤细分以及自主与引导模式的累计比率 - `human_decision` — 当人员在应用中批准或拒绝某个步骤时触发,包含步骤 ID、结果、可选原因以及累计总数 每个智能体的心跳间隔可独立配置。每个事件中的计数器在进程生命周期内是累计的,这意味着它们会在智能体重启时重置。若要永久存储这些指标,请将账本服务指向 `analytics` 主题。 ## 知识库 `knowledge/` 文件夹允许你将特定领域的上下文注入分类器的提示词中,而无需修改任何代码。放置在此文件夹中的任何文件都会在分类器启动时加载一次,并附加到每次分类调用的系统提示词中。 支持的文件类型: - `.pdf` — 使用 PyMuPDF 提取 - `.docx` — 使用 python-docx 提取 - `.txt` — 作为纯文本读取 - `.md` — 作为纯文本读取 此处可放置的内容示例: - 解释公司内部日志格式及每个字段含义的 PDF - 影响日志分类方式的 GDPR 或 HIPAA 合规性要求 - OWASP Top 10 参考资料 - 描述团队认为哪些日志模式属于严重 vs 高严重级别的指南 - 内部服务名称及其功能的列表,以便分类器做出更好的决策 要添加新知识,只需将文件放入 `knowledge/` 文件夹并重启分类器。无需其他操作。 如果你有非常大的文档(100 页或更多),请注意全文会被注入到每一次 API 调用中。这会增加成本和延迟。对于当前设置,这是可以接受的。如果这成为问题,下一步是使用像 Chroma 这样的向量数据库实现 RAG,即每次只检索相关的知识块,而不是整个知识库。 ## 项目结构 ``` rac-agents/ ├── knowledge/ Drop PDFs, DOCX, or TXT files here for classifier context ├── classifier.py Agent 1 — classifies raw logs ├── analyst.py Agent 2 — investigates security events ├── responder.py Agent 3 — generates remediation plans ├── kafka_client.py Shared Kafka producer and consumer wrappers ├── knowledge_loader.py Reads and extracts text from files in knowledge/ ├── pipeline.json Shared state file used in manual mode ├── .env Environment variables (not committed) └── .gitignore ``` ## 安装设置 **要求:** - Python 3.10 或更高版本。Python 3.9 可用,但 Google 的库会打印弃用警告。 - 可从你的机器访问的运行中的 Kafka broker - 来自 Google AI Studio 的 Gemini API 密钥 **克隆并创建虚拟环境:** ``` cd rac-agents python -m venv venv source venv/bin/activate # Mac / Linux venv\Scripts\activate # Windows ``` **安装依赖:** ``` pip install langchain langchain-google-genai kafka-python python-dotenv pymupdf python-docx ``` **创建 `.env` 文件:** ``` cp .env.example .env ``` 然后填写你的值。有关所有可用变量,请参阅 [配置说明](#configuration) 部分。 **创建 knowledge 文件夹:** ``` mkdir knowledge ``` **创建 pipeline 文件:** ``` echo "{}" > pipeline.json ``` ## 配置说明 所有配置均位于 `.env` 中。这些值均不会提交到代码库。 | 变量 | 默认值 | 描述 | |---|---|---| | `GEMINI_API_KEY` | 必填 | 来自 Google AI Studio 的 Google Gemini API 密钥 | | `KAFKA_BROKERS` | `localhost:29092` | Kafka broker 的地址。如果 Kafka 运行在另一台机器上,请使用该机器的局域网 IP,而不是 localhost。只需定义一次 — 如果你有两个条目,最后一个生效,第一个会被静默忽略 | | `KAFKA_GROUP_ID` | `classifier-group` | 分类器的 Kafka 消费者组 ID。更改此项会使 Kafka 将分类器视为全新的消费者,这将导致它从主题开头重新处理所有消息 | | `BATCH_SIZE` | `20` | 分类器在单次轮询中从 Kafka 拉取的消息数量。这并不意味着它们会批量发送给 Gemini — 每条消息仍会单独分类。较高的值可减少轮询开销,但会增加每个周期的内存使用量 | | `CLASSIFIER_MODE` | `manual` | 设置为 `kafka` 以自动从 Kafka 消费。保留为 `manual` 则从 stdin 读取用于测试 | | `ANALYST_MODE` | `manual` | 设置为 `kafka` 以自动从 Kafka 消费 | | `RESPONDER_MODE` | `manual` | 设置为 `kafka` 以自动从 Kafka 消费 | | `MIN_CLASSIFICATION_CONFIDENCE` | `70` | 0 到 100 的百分比。分类器置信度此值的日志会被分析师跳过。设置为 `0` 可完全禁用过滤器并传递所有日志,无论置信度如何 | | `CLASSIFIER_HEARTBEAT_INTERVAL` | `30` | 分类器向 analytics 主题发布心跳的频率(秒) | | `ANALYST_HEARTBEAT_INTERVAL` | `30` | 分析师发布心跳的频率(秒) | | `RESPONDER_HEARTBEAT_INTERVAL` | `30` | 响应器发布心跳的频率(秒) | | `KNOWLEDGE_DIR` | `knowledge` | 包含分类器知识文件的文件夹路径。可以是绝对路径,也可以是相对于运行脚本位置的相对路径 | 供参考的完整 `.env` 示例: ``` # Gemini GEMINI_API_KEY=your_key_here # Kafka — 使用运行 Docker 机器的 LAN IP,而非 localhost KAFKA_BROKERS=192.168.1.6:29092 KAFKA_GROUP_ID=classifier-group BATCH_SIZE=20 # Agent modes — 手动模式用于测试,kafka 用于生产环境 CLASSIFIER_MODE=kafka ANALYST_MODE=kafka RESPONDER_MODE=kafka # Confidence filter — 设置为 0 以禁用并让所有日志通过 MIN_CLASSIFICATION_CONFIDENCE=0 # Heartbeat intervals(以秒为单位) CLASSIFIER_HEARTBEAT_INTERVAL=30 ANALYST_HEARTBEAT_INTERVAL=30 RESPONDER_HEARTBEAT_INTERVAL=30 ``` ## 运行智能体 打开三个单独的终端窗口。在每个窗口中激活虚拟环境。 ``` source venv/bin/activate ``` 按以下顺序启动智能体。每个智能体都依赖于前一个发布消息后才有内容可消费。 **终端 1 — 分类器:** ``` python classifier.py ``` **终端 2 — 分析师:** ``` python analyst.py ``` **终端 3 — 响应器:** ``` python responder.py ``` 一旦这三个都运行起来,任何发布到 `logs.unfiltered` 的消息都会自动流经整个流水线。 要在不使用 Kafka 的情况下进行手动测试,请在 `.env` 中将所有三个 `_MODE` 变量设置为 `manual`。然后在任意单个终端中依次运行它们。分类器会提示你粘贴一条日志消息,将结果写入 `pipeline.json`,然后退出。接下来运行分析师以从 `pipeline.json` 读取、进行调查并将结果写回。最后运行响应器以读取调查结果并生成最终计划。 ## Pipeline JSON 回退机制 `pipeline.json` 是一个扁平 JSON 文件,在手动模式下运行时充当智能体之间的共享状态。每个智能体在完成其阶段时都会读取并写回该文件。在 Kafka 模式下,该文件仍会在处理每条消息时被写入,作为最后处理项的本地记录,但它不是主要的通信通道。 不要将 `pipeline.json` 提交到代码库。它已在 `.gitignore` 中列出。 ## 添加新的知识文件 1. 获取 PDF、DOCX、TXT 或 MD 格式的文件。 2. 将其复制到 `knowledge/` 文件夹中。 3. 重启分类器。 该文件将在启动时加载,你会看到确认信息: ``` [Knowledge] Loaded: gdpr-requirements.pdf (42381 chars) ``` 如果文件加载失败,你将看到一条包含原因的错误消息。分类器将继续运行,只是不会加载该文件,而不是崩溃。 要移除知识文件,请将其从文件夹中删除并重启分类器。
标签:Apache Kafka, Gemini, PyRIT, Python, Recognise-Analyse-Counter, SOAR, 人工智能, 分布式系统, 响应大小分析, 多智能体系统, 威胁分析, 安全运营, 扫描框架, 无后门, 日志分类, 深度学习, 用户模式Hook绕过, 网络安全, 自动修复, 自动化侦查工具, 软件成分分析, 逆向工具, 隐私保护