SlavchoVlakeskiGit/threat-intelligence-aggregator
GitHub: SlavchoVlakeskiGit/threat-intelligence-aggregator
一个基于 FastAPI 和 MySQL 的威胁情报聚合与日志关联服务,用于检测恶意指标并生成安全警报。
Stars: 0 | Forks: 0
# 威胁情报聚合器
用于摄取威胁情报源、处理日志并在检测到恶意指标时生成警报的后端服务。
本项目演示了一个简化的 **SOC (Security Operations Center) 自动化流水线**,使用 Python、FastAPI 和 MySQL 实现。
## API 预览
Swagger API 界面:

日志摄取后生成的示例警报:

## 架构概览
威胁情报源 JSON → 指标数据库 → 日志摄取 → IOC 匹配 → 警报 API
工作流程:
1. 导入威胁指标(恶意 IP 和域名)
2. 将指标存储在 MySQL 数据库中
3. 通过 API 或示例文件摄取日志条目
4. 将日志与已知指标进行比对
5. 检测到匹配时生成警报
## 功能特性
- 从 JSON 源摄取威胁情报
- 通过 REST API 摄取日志
- 自动 IOC 匹配
- 针对恶意指标的警报生成
- 基于置信度的严重性评分
- MySQL 数据库持久化
- 使用 FastAPI 构建的 REST API
- Swagger API 文档
- 用于测试的示例数据导入
- 基础自动化测试
## 技术栈
- Python
- FastAPI
- MySQL
- SQLAlchemy
- PyMySQL
- Pydantic
- Uvicorn
- Pytest
## 项目结构
```
threat-intelligence-aggregator
│
├── app
│ ├── api
│ │ ├── alerts.py
│ │ ├── health.py
│ │ ├── indicators.py
│ │ └── logs.py
│ │
│ ├── core
│ │ └── config.py
│ │
│ ├── db
│ │ ├── database.py
│ │ └── init_db.py
│ │
│ ├── models
│ │ ├── alert.py
│ │ ├── indicator.py
│ │ └── log_entry.py
│ │
│ ├── schemas
│ │ ├── alert.py
│ │ ├── indicator.py
│ │ └── log_entry.py
│ │
│ ├── services
│ │ ├── alert_service.py
│ │ ├── feed_service.py
│ │ ├── log_service.py
│ │ └── matching_service.py
│ │
│ └── main.py
│
├── sample_data
│ ├── threat_feed.json
│ └── sample_logs.json
│
├── tests
│ ├── test_health.py
│ ├── test_indicators.py
│ ├── test_logs.py
│ └── test_alerts.py
│
├── docs
│ └── architecture.md
│
├── requirements.txt
├── pytest.ini
├── .gitignore
├── LICENSE
└── README.md
```
## 设置
克隆仓库:
```
git clone https://github.com/SlavchoVlakeskiGit/Threat-Intelligence-Aggregator.git
cd threat-intelligence-aggregator
```
创建虚拟环境:
```
python -m venv venv
```
激活环境:
**Windows**
```
venv\Scripts\activate
```
**Linux / macOS**
```
source venv/bin/activate
```
安装依赖:
```
pip install -r requirements.txt
```
## 数据库设置
创建数据库和用户:
```
CREATE DATABASE threat_intel_db;
CREATE USER 'threat_user'@'localhost' IDENTIFIED BY 'yourpassword';
GRANT ALL PRIVILEGES ON threat_intel_db.* TO 'threat_user'@'localhost';
FLUSH PRIVILEGES;
```
在项目根目录下创建 `.env` 文件:
```
DATABASE_URL=mysql+pymysql://threat_user:yourpassword@127.0.0.1:3306/threat_intel_db
APP_NAME=Threat Intelligence Aggregator
APP_VERSION=1.0.0
```
初始化数据库表:
```
python -m app.db.init_db
```
## 运行 API
启动服务器:
```
uvicorn app.main:app --reload
```
打开 Swagger 文档:
```
http://127.0.0.1:8000/docs
```
## 示例工作流程
导入威胁指标:
```
POST /api/indicators/import
```
导入示例日志:
```
POST /api/logs/import
```
检查生成的警报:
```
GET /api/alerts/
```
如果日志与威胁源中的恶意 IP 或域名匹配,系统会自动生成警报。
## 示例警报
```
{
"matched_value": "185.220.101.1",
"indicator_type": "ip",
"severity": "high",
"description": "Log entry matched malicious IP indicator: 185.220.101.1 (confidence: 90)"
}
```
## 限制
- 使用本地示例 JSON 源而非实时威胁情报来源
- 匹配目前仅支持精确匹配
- 警报评分基于规则且较为简化
- API 尚未包含身份验证或速率限制
## 运行测试
```
python -m pytest
```
## 未来改进
- 集成真实威胁情报源
- 定时情报源摄取
- SIEM 日志摄取流水线
- 受保护端点的身份验证
- 指标过期和来源加权
- Docker 部署
- 用于警报和指标的 Web 仪表板
## 许可证
MIT License
标签:AMSI绕过, AV绕过, FastAPI, HTTP/HTTPS抓包, IP 地址批量处理, Pydantic, Pytest, Python, REST API, SOC 自动化, SQLAlchemy, Swagger, Uvicorn, 入侵指标, 后端服务, 威胁情报, 威胁检测, 安全告警, 安全运营中心, 开发者工具, 态势感知, 恶意 IP 检测, 无后门, 无线安全, 日志关联分析, 日志摄取, 网络安全, 网络映射, 网络测绘, 脚本检测, 逆向工具, 隐私保护