SlavchoVlakeskiGit/threat-intelligence-aggregator

GitHub: SlavchoVlakeskiGit/threat-intelligence-aggregator

一个基于 FastAPI 和 MySQL 的威胁情报聚合与日志关联服务,用于检测恶意指标并生成安全警报。

Stars: 0 | Forks: 0

# 威胁情报聚合器 用于摄取威胁情报源、处理日志并在检测到恶意指标时生成警报的后端服务。 本项目演示了一个简化的 **SOC (Security Operations Center) 自动化流水线**,使用 Python、FastAPI 和 MySQL 实现。 ## API 预览 Swagger API 界面: ![Swagger](https://raw.githubusercontent.com/SlavchoVlakeskiGit/threat-intelligence-aggregator/main/docs/swagger_api.png) 日志摄取后生成的示例警报: ![Alerts](https://static.pigsec.cn/wp-content/uploads/repos/2026/03/d700a5ca85124324.png) ## 架构概览 威胁情报源 JSON → 指标数据库 → 日志摄取 → IOC 匹配 → 警报 API 工作流程: 1. 导入威胁指标(恶意 IP 和域名) 2. 将指标存储在 MySQL 数据库中 3. 通过 API 或示例文件摄取日志条目 4. 将日志与已知指标进行比对 5. 检测到匹配时生成警报 ## 功能特性 - 从 JSON 源摄取威胁情报 - 通过 REST API 摄取日志 - 自动 IOC 匹配 - 针对恶意指标的警报生成 - 基于置信度的严重性评分 - MySQL 数据库持久化 - 使用 FastAPI 构建的 REST API - Swagger API 文档 - 用于测试的示例数据导入 - 基础自动化测试 ## 技术栈 - Python - FastAPI - MySQL - SQLAlchemy - PyMySQL - Pydantic - Uvicorn - Pytest ## 项目结构 ``` threat-intelligence-aggregator │ ├── app │ ├── api │ │ ├── alerts.py │ │ ├── health.py │ │ ├── indicators.py │ │ └── logs.py │ │ │ ├── core │ │ └── config.py │ │ │ ├── db │ │ ├── database.py │ │ └── init_db.py │ │ │ ├── models │ │ ├── alert.py │ │ ├── indicator.py │ │ └── log_entry.py │ │ │ ├── schemas │ │ ├── alert.py │ │ ├── indicator.py │ │ └── log_entry.py │ │ │ ├── services │ │ ├── alert_service.py │ │ ├── feed_service.py │ │ ├── log_service.py │ │ └── matching_service.py │ │ │ └── main.py │ ├── sample_data │ ├── threat_feed.json │ └── sample_logs.json │ ├── tests │ ├── test_health.py │ ├── test_indicators.py │ ├── test_logs.py │ └── test_alerts.py │ ├── docs │ └── architecture.md │ ├── requirements.txt ├── pytest.ini ├── .gitignore ├── LICENSE └── README.md ``` ## 设置 克隆仓库: ``` git clone https://github.com/SlavchoVlakeskiGit/Threat-Intelligence-Aggregator.git cd threat-intelligence-aggregator ``` 创建虚拟环境: ``` python -m venv venv ``` 激活环境: **Windows** ``` venv\Scripts\activate ``` **Linux / macOS** ``` source venv/bin/activate ``` 安装依赖: ``` pip install -r requirements.txt ``` ## 数据库设置 创建数据库和用户: ``` CREATE DATABASE threat_intel_db; CREATE USER 'threat_user'@'localhost' IDENTIFIED BY 'yourpassword'; GRANT ALL PRIVILEGES ON threat_intel_db.* TO 'threat_user'@'localhost'; FLUSH PRIVILEGES; ``` 在项目根目录下创建 `.env` 文件: ``` DATABASE_URL=mysql+pymysql://threat_user:yourpassword@127.0.0.1:3306/threat_intel_db APP_NAME=Threat Intelligence Aggregator APP_VERSION=1.0.0 ``` 初始化数据库表: ``` python -m app.db.init_db ``` ## 运行 API 启动服务器: ``` uvicorn app.main:app --reload ``` 打开 Swagger 文档: ``` http://127.0.0.1:8000/docs ``` ## 示例工作流程 导入威胁指标: ``` POST /api/indicators/import ``` 导入示例日志: ``` POST /api/logs/import ``` 检查生成的警报: ``` GET /api/alerts/ ``` 如果日志与威胁源中的恶意 IP 或域名匹配,系统会自动生成警报。 ## 示例警报 ``` { "matched_value": "185.220.101.1", "indicator_type": "ip", "severity": "high", "description": "Log entry matched malicious IP indicator: 185.220.101.1 (confidence: 90)" } ``` ## 限制 - 使用本地示例 JSON 源而非实时威胁情报来源 - 匹配目前仅支持精确匹配 - 警报评分基于规则且较为简化 - API 尚未包含身份验证或速率限制 ## 运行测试 ``` python -m pytest ``` ## 未来改进 - 集成真实威胁情报源 - 定时情报源摄取 - SIEM 日志摄取流水线 - 受保护端点的身份验证 - 指标过期和来源加权 - Docker 部署 - 用于警报和指标的 Web 仪表板 ## 许可证 MIT License
标签:AMSI绕过, AV绕过, FastAPI, HTTP/HTTPS抓包, IP 地址批量处理, Pydantic, Pytest, Python, REST API, SOC 自动化, SQLAlchemy, Swagger, Uvicorn, 入侵指标, 后端服务, 威胁情报, 威胁检测, 安全告警, 安全运营中心, 开发者工具, 态势感知, 恶意 IP 检测, 无后门, 无线安全, 日志关联分析, 日志摄取, 网络安全, 网络映射, 网络测绘, 脚本检测, 逆向工具, 隐私保护