op7ic/amphunt

GitHub: op7ic/amphunt

为 Cisco Secure Endpoint 提供高级威胁狩猎与自动化分析的 Python 工具集合。

Stars: 14 | Forks: 5

# amphunt 本仓库包含用于 [Cisco Secure Endpoint](https://www.cisco.com/site/us/en/products/security/endpoint-security/secure-endpoint/index.html) API 的高级威胁狩猎脚本。这些脚本利用 AMP API 搜索威胁、分析终端行为,并使用 API 版本 0 和 1 检测整个环境中的潜在妥协。 ## 概述 这些脚本基于一个 Python 库(`amp_client`),它提供以下功能: - **速率限制**:自动 API 速率限制管理 - **连接池**:通过连接复用提升性能 - **响应缓存**:减少冗余 API 调用 - **错误处理**:优雅的降级和自动重试 已知的 Windows SHA256 哈希值取自 [WINFINGER](https://github.com/op7ic/WINFINGER) 仓库,可用于搜索潜在的恶意命令,例如 `net user admin /add` 这类依赖内置 Windows 工具的命令。多个包含已知黑客工具包的 GitHub 仓库的哈希值也已提供,以增强检测能力。 ## 安装 1. **克隆仓库:** ``` git clone https://github.com/op7ic/amphunt.git cd amphunt ``` 2. **安装 AMP 客户端库:** ``` pip install -e . ``` 3. **配置 API 凭证:** 创建 `config.txt` 文件(参见[示例](config.txt)): ``` [settings] client_id = YOUR_CLIENT_ID api_key = YOUR_API_KEY region = nam # nam, eu, or apjc ``` 或使用环境变量: ``` export AMP_CLIENT_ID="your_client_id" export AMP_API_KEY="your_api_key" export AMP_REGION="nam" # nam, eu, or apjc ``` ## AMP API 端点 选择适合您区域的端点: - `nam` - 北美(`api.amp.cisco.com`) - `eu` - 欧洲(`api.eu.amp.cisco.com`) - `apjc` - 亚太、日本和中国(`api.apjc.amp.cisco.com`) ## 脚本文档 ### 时间线分析 #### timeliner.py 提取所有端点的完整事件时间线,并为每个端点写入独立文件。 **用法:** ``` python3 timeliner.py -c -o # 或 python3 timeliner.py -c config.txt -o ./timelines/ ``` **分析提示:** ``` # 查找新创建的文件 grep 'Created by' *.txt # 查找已执行文件 grep 'Executed by' *.txt # 查找威胁 grep 'Threat' *.txt # 查找网络连接 grep 'NFM' *.txt # 查找文档活动 grep -E '\.doc|\.xls|\.pdf|\.ppt' *.txt # 可疑进程周围的环境 grep -A 10 -B 10 -i "cmd\.exe\|rundll32\.exe\|powershell\.exe" *.txt ``` #### surround.py 通过计算机的 UUID 生成时间线。 **用法:** ``` python3 surround.py -c config.txt -o ./output/ -u ``` ### 哈希分析 #### hash2processarg.py 搜索匹配特定 SHA256 哈希的进程并获取其命令行参数。 **用法:** ``` python3 hash2processarg.py -c config.txt hashset/windows-binaries/cmd.exe.txt python3 hash2processarg.py -c config.txt hashes.txt --csv output.csv ``` **示例输出:** ``` [+] Process SHA256 : abc123... Child SHA256: def456... [+] 2024-01-01 10:00:00 : hostname Process name: cmd.exe args: /c whoami ``` #### hash2connection.py 查找与特定文件哈希关联的网络连接。 **用法:** ``` python3 hash2connection.py -c config.txt hashset/hacking-tools/mimikatz.txt python3 hash2connection.py -c config.txt suspicious_hashes.txt --csv connections.csv ``` ### 网络分析 #### allconnections.py 转储所有端点的所有网络连接。 **用法:** ``` python3 allconnections.py -c config.txt python3 allconnections.py -c config.txt --csv all_connections.csv python3 allconnections.py -c config.txt --no-sanitize # Don't sanitize IPs/URLs python3 allconnections.py -c config.txt --limit 100 # Process only 100 computers ``` **示例输出:** ``` [+] Outbound network event at hostname : workstation01 2024-01-01 10:00:00 : outbound : workstation01 : TCP 10.0.0.100:51234 -> 93.184.216.34:443 ``` #### dumpallURL.py 提取所有端点的所有 URL 请求。 **用法:** ``` python3 dumpallURL.py -c config.txt python3 dumpallURL.py -c config.txt --csv urls.csv python3 dumpallURL.py -c config.txt --summary # Show domain statistics ``` #### lateral_movement.py 通过监控特定端口(SMB、RDP、WinRM、RPC)检测潜在横向移动。 **监控端口:** - 139, 445(SMB) - 3389(RDP) - 5985, 5986(WinRM) - 135(RPC/WMIC) ### 威胁狩猎 #### multikeyword_search.py 在所有端点中搜索多个关键字、IP 地址或 SHA256 哈希。 **用法:** ``` python3 multikeyword_search.py -c config.txt keywords.txt python3 multikeyword_search.py -c config.txt keywords.txt --csv results.csv ``` **示例关键字文件:** ``` mimikatz.exe 192.168.1.100 powershell.exe -enc e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 net user /add ``` #### fresh_vulnerabilities.py 识别具有 CVE 详细信息的易受攻击的应用程序。 **用法:** ``` python3 fresh_vulnerabilities.py -c config.txt python3 fresh_vulnerabilities.py -c config.txt --csv vulnerabilities.csv --summary ``` **输出包括:** - CVE 标识符 - CVSS 分数(单个和平均值) - 受影响的产品和版本 - 参考 URL ### 统计与报告 #### amp_generic_stats.py 生成用于异常检测的全面统计信息。 **用法:** ``` python3 amp_generic_stats.py -c config.txt python3 amp_generic_stats.py -c config.txt --csv stats.csv ``` **收集的指标:** - 易受攻击的应用程序 - 网络事件(NFM) - 文件执行、创建、移动 - 威胁检测和隔离 - 恶意活动检测 #### getSpecificEvent.py 按事件 ID 提取所有事件。 **用法:** ``` python3 getSpecificEvent.py -c config.txt output.csv # 示例: python3 getSpecificEvent.py -c config.txt 1090519054 threats.csv # Threat Detected python3 getSpecificEvent.py -c config.txt 1107296274 cloud_ioc.csv # Cloud IOC python3 getSpecificEvent.py -c config.txt 1107296279 vulnerable.csv # Vulnerable Application ``` **常见事件 ID:** - `1090519054` - 威胁检测 - `553648143` - 威胁隔离 - `1107296272` - 执行恶意软件 - `1107296274` - 云 IOC - `1107296279` - 检测到易受攻击的应用程序 - `1107296284` - 潜在勒索软件 - `553648147` - 网络文件移动(NFM) ## 哈希集合 本仓库包含预计算的 SHA256 哈希值: ### Windows 二进制文件(LOLBINS) 位于 `hashset/windows-binaries/`: - 可被滥用的系统实用工具(certutil.exe、bitsadmin.exe 等) - PowerShell 和脚本主机 - 网络工具(net.exe、netsh.exe) ### 黑客工具 位于 `hashset/hacking-tools/`: - [LaZagne](https://github.com/AlessandroZ/LaZagne) - 密码恢复 - [Mimikatz](https://github.com/gentilkiwi/mimikatz) - 凭据转储 - [Impacket](https://github.com/SecureAuthCorp/impacket) - 网络协议 - [PowerSploit](https://github.com/PowerShellMafia/PowerSploit) - PowerShell 后渗透 - [GhostPack](https://github.com/GhostPack) - .NET 后渗透 - [Metasploit](https://github.com/rapid7/metasploit-framework) 模块 ### 漏洞利用 位于 `hashset/exploits/`: - Windows 内核漏洞 - Linux 内核漏洞 - Exploit-DB 存档 ## 狩猎示例 ### 检测 PowerShell Empire/编码命令 ``` echo "powershell.exe -enc" > keywords.txt echo "powershell.exe -encoded" >> keywords.txt echo "iex(" >> keywords.txt python3 multikeyword_search.py -c config.txt keywords.txt --csv powershell_suspicious.csv ``` ### 搜索凭据转储 ``` cat hashset/hacking-tools/mimikatz.txt > cred_tools.txt cat hashset/windows-binaries/procdump.exe.txt >> cred_tools.txt echo "lsass" >> cred_tools.txt python3 multikeyword_search.py -c config.txt cred_tools.txt --csv credential_dumping.csv ``` ### 识别横向移动 ``` # 检查 PsExec 和类似工具 python3 hash2connection.py -c config.txt hashset/psexec/psexec.exe.txt --csv psexec_connections.csv # 监控横向移动协议 python3 lateral_movement.py -c config.txt --csv lateral_movement.csv --summary # 检查可疑服务名称 echo "psexesvc" > keywords.txt echo "paexec" >> keywords.txt python3 multikeyword_search.py -c config.txt keywords.txt ``` ### 检测持久化机制 ``` echo "schtasks /create" > persistence.txt echo "reg add.*CurrentVersion\\Run" >> persistence.txt echo "sc create" >> persistence.txt echo "wmic.*startup" >> persistence.txt python3 multikeyword_search.py -c config.txt persistence.txt --csv persistence.csv ``` ## AI 辅助威胁狩猎 amphunt 包含内置的 AI 技能,支持三大主流 LLM 平台。当您在任何受支持的 CLI 中打开此项目时,AI 代理会自动发现狩猎技能并代表您执行。 ### 支持的平台 | 平台 | 技能源(SKILLS/) | 部署位置 | 用法 | |------|------------------|----------|------| | Claude Code | `.claude/skills/` | `.claude/skills/` | 输入 `claude` 然后提问 | | Codex CLI | `.codex/` | `AGENTS.md` + `instructions/` | 输入 `codex` 然后提问 | | Gemini CLI | `.gemini/` | `GEMINI.md` + `playbooks/` | 输入 `gemini` 然后提问 | ### 安装设置 技能存储在 `SKILLS/` 中,并部署到平台原生位置: ``` bash SKILLS/install.sh # Deploy all platforms bash SKILLS/install.sh claude # Claude Code only bash SKILLS/install.sh codex # Codex CLI only bash SKILLS/install.sh gemini # Gemini CLI only ``` ### 快速开始 ``` # Claude Code cd amphunt && claude # “在所有端点中查找凭据转储工具” # Codex CLI cd amphunt && codex # “运行横向移动检测扫描” # Gemini CLI cd amphunt && gemini # “检查环境中的持久性机制” ``` ### AI 能做什么 AI 代理可以处理上述手动示例中的所有十个狩猎场景,并额外提供: - 基于自然语言请求的自动脚本选择 - 狩猎链(一个狩猎的发现作为下一个狩猎的输入) - 使用 MITRE ATT&CK 映射进行结果解释 - 误报分类指导 - 范围管理(`--limit` 用于大型环境) 每个平台拥有 10 个模块化狩猎技能,涵盖:时间线分析、基于哈希的 IOC 狩猎、网络连接、横向移动、漏洞、关键字/IOC 搜索、凭据窃取、持久化机制、统计异常分析和事件提取。 请参阅 `SKILLS/README.md` 获取完整文档、平台对比和贡献指南。 ## 最佳实践 1. **速率限制**:库会自动处理速率限制 2. **缓存**:对重复查询启用缓存: export AMP_CACHE_ENABLED=true export AMP_CACHE_TTL=300 # 5 分钟 3. **输出管理**:处理大型数据集时始终使用 `--csv` 标志以避免控制台溢出。 4. **定期狩猎**:定期执行以下狩猎: - 新的易受攻击的应用程序 - 可疑的网络连接 - 已知的恶意哈希 - 横向移动模式 ## 限制 - AMP 轨迹 API 每个端点最多返回 500 个事件 - 历史数据可能受保留策略限制 - 如果超出速率限制,某些事件可能会被遗漏 ## 编写新脚本 本节介绍如何使用 amp_client 库创建新脚本。 ### 基本脚本模板 ``` #!/usr/bin/env python3 """ Script Name: your_script.py Author: Your Name Copyright: See LICENSE file Github: https://github.com/op7ic/amphunt your_script.py - Brief description of what your script does This script performs [detailed description of functionality]. Usage: python your_script.py -c [options] """ import sys import argparse from amp_client import AMPClient, Config # 根据需要添加其他导入 def main(): parser = argparse.ArgumentParser(description='Your script description') parser.add_argument('-c', '--config', required=True, help='Configuration file path') # Add other arguments as needed parser.add_argument('--csv', help='Export results to CSV file') parser.add_argument('--limit', type=int, help='Limit number of computers to process') args = parser.parse_args() # Load configuration config = Config.from_file(args.config) # Create client and perform operations with AMPClient(config) as client: # Your code here pass if __name__ == '__main__': main() ``` ### 必需导入 #### 所有脚本的核心导入: ``` from amp_client import AMPClient, Config ``` #### 基于功能的其他导入: **事件处理:** ``` from amp_client.models import EventType, Event ``` **计算机/轨迹数据:** ``` from amp_client.models import Computer, ComputerTrajectory, TrajectoryEvent ``` **网络事件:** ``` from amp_client.models import NetworkEvent from amp_client.utils import NetworkEventFormatter ``` **文件事件:** ``` from amp_client.utils import FileEventFormatter ``` **CSV/JSON 导出:** ``` from amp_client.utils import CSVExporter, JSONExporter ``` **横向移动检测:** ``` from amp_client.utils import LateralMovementDetector ``` **哈希验证:** ``` from amp_client.utils import HashIdentifier, Validators ``` **错误处理:** ``` from amp_client import AMPError, AMPAuthenticationError, AMPRateLimitError ``` ### 通用模式 #### 1. 处理所有计算机: ``` with AMPClient(config) as client: computers = list(client.computers.list(limit=args.limit)) for computer in computers: print(f"Processing {computer.hostname} ({computer.connector_guid})") # Process each computer ``` #### 2. 获取计算机的事件: ``` events = list(client.events.list( connector_guid=computer.connector_guid, limit=500 # Max 500 events )) # 或针对特定事件类型 events = list(client.events.list( connector_guid=computer.connector_guid, event_type=[EventType.THREAT_DETECTED, EventType.THREAT_QUARANTINED] )) ``` #### 3. 获取计算机轨迹: ``` trajectory = client.computers.get_trajectory( connector_guid=computer.connector_guid, limit=500 # Max 500 events ) for event in trajectory.events: if event.file and event.file.identity: print(f"{event.timestamp}: {event.file.file_name} - {event.file.identity.sha256}") ``` #### 4. 导出为 CSV: ``` # 创建导出器 csv_exporter = CSVExporter(args.csv) # 定义列 csv_exporter.write_header(['timestamp', 'hostname', 'event_type', 'description']) # 写入数据 for event in events: csv_exporter.write_row([ event.timestamp, computer.hostname, event.event_type_name, event.description ]) csv_exporter.close() ``` #### 5. 错误处理: ``` try: with AMPClient(config) as client: # Your code pass except AMPAuthenticationError: print("[-] Authentication failed. Check your credentials.") sys.exit(1) except AMPRateLimitError: print("[-] Rate limit exceeded. Try again later.") sys.exit(1) except AMPError as e: print(f"[-] AMP API error: {e}") sys.exit(1) ``` ### 配置选项 Config 类接受以下参数: - `client_id`(必需):您的 AMP API 客户端 ID - `api_key`(必需):您的 AMP API 密钥 - `region`:`'nam'`(默认)、`'eu'` 或 `'apjc'` - `domain`:自定义 API 域(由区域自动设置) - `rate_limit_buffer`:在速率限制前等待的秒数(默认:5) - `retry_attempts`:重试次数(默认:3) - `retry_backoff`:重试退避倍数(默认:2) - `cache_enabled`:启用响应缓存(默认:False) - `cache_ttl`:缓存生存时间(秒),默认 300 ### 最佳实践 1. **始终使用上下文管理器**:使用 `with AMPClient(config) as client:` 确保正确清理 2. **处理分页**:库会自动处理使用 `.list()` 方法时的分页 3. **添加进度指示器**:对于长时间运行的操作,显示进度给用户 4. **验证输入**:使用 `Validators` 类验证哈希和输入 5. **遵循命名约定**:使用描述性变量名并遵循 PEP 8 6. **添加详细帮助**:清晰记录所有命令行参数 7. **使用限制进行测试**:在处理所有计算机之前,始终先使用 `--limit` 标志测试 ### 示例:创建自定义威胁狩猎器 ``` #!/usr/bin/env python3 """ custom_hunter.py - Hunt for specific IOCs across all endpoints """ import sys import argparse from collections import defaultdict from amp_client import AMPClient, Config from amp_client.models import EventType from amp_client.utils import CSVExporter, HashIdentifier def main(): parser = argparse.ArgumentParser(description='Hunt for custom IOCs') parser.add_argument('-c', '--config', required=True, help='Configuration file path') parser.add_argument('-i', '--iocs', required=True, help='File containing IOCs (one per line)') parser.add_argument('--csv', help='Export results to CSV file') parser.add_argument('--limit', type=int, help='Limit number of computers to process') args = parser.parse_args() # Load IOCs with open(args.iocs, 'r') as f: iocs = {line.strip().lower() for line in f if line.strip()} # Categorize IOCs hash_identifier = HashIdentifier() sha256_hashes = {ioc for ioc in iocs if hash_identifier.is_sha256(ioc)} other_iocs = iocs - sha256_hashes print(f"[+] Loaded {len(sha256_hashes)} SHA256 hashes and {len(other_iocs)} other IOCs") # Load configuration and create client config = Config.from_file(args.config) with AMPClient(config) as client: # Get computers computers = list(client.computers.list(limit=args.limit)) print(f"[+] Processing {len(computers)} computers") results = [] for computer in computers: print(f"[+] Checking {computer.hostname}") # Get trajectory trajectory = client.computers.get_trajectory( connector_guid=computer.connector_guid, limit=500 ) # Check each event for event in trajectory.events: # Check file hashes if event.file and event.file.identity: if event.file.identity.sha256 in sha256_hashes: results.append({ 'timestamp': event.timestamp, 'hostname': computer.hostname, 'ioc_type': 'SHA256', 'ioc_value': event.file.identity.sha256, 'file_name': event.file.file_name, 'file_path': event.file.file_path }) # Check other IOCs in file paths and names if event.file: for ioc in other_iocs: if (event.file.file_name and ioc in event.file.file_name.lower() or event.file.file_path and ioc in event.file.file_path.lower()): results.append({ 'timestamp': event.timestamp, 'hostname': computer.hostname, 'ioc_type': 'String', 'ioc_value': ioc, 'file_name': event.file.file_name, 'file_path': event.file.file_path }) # Display results print(f"\n[+] Found {len(results)} matches") for result in results: print(f"[!] {result['timestamp']} - {result['hostname']} - " f"{result['ioc_type']}: {result['ioc_value']} - " f"{result['file_name']}") # Export to CSV if requested if args.csv: csv_exporter = CSVExporter(args.csv) csv_exporter.write_header(list(results[0].keys()) if results else []) for result in results: csv_exporter.write_row(list(result.values())) csv_exporter.close() print(f"[+] Results exported to {args.csv}") if __name__ == '__main__': main() ``` ## 贡献 欢迎提交问题与拉取请求到 GitHub。 ## 许可证 请参阅 LICENSE 文件。 ## 免责声明 **本软件按“原样”提供,不附带任何形式的明示或暗示的保证,包括但不限于适销性、特定用途适用性和非侵权保证。在任何情况下,作者或版权持有人均不对任何索赔、损害或其他责任负责,无论是在合同行为、侵权行为或其他行为中,因使用或与本软件相关而引起的、从本软件中产生的或与本软件相关的软件使用或其他交易。
标签:AI SKILLs, AMP API, AMSI绕过, API v0, API v1, Cisco Secure Endpoint, IOC检测, PB级数据处理, Python, Windows 调试器, WINFINGER, 哈希匹配, 威胁情报, 威胁检测, 安全运维, 开发者工具, 数字取证, 无后门, 爬虫与收集, 端点取证, 端点安全, 端点监测, 网络安全, 自动化脚本, 补丁管理, 逆向工具, 隐私保护