Jack-star-dot/Sentinel-

GitHub: Jack-star-dot/Sentinel-

一款集成系统信息采集、端口扫描与 Claude AI 威胁分析的 Python 命令行安全助手,帮助用户快速评估主机安全态势并获得红蓝双视角的修复建议。

Stars: 0 | Forks: 0

# Sentinel- # AI 驱动的个人安全助手,用于系统监控、日志分析和网络调查。 #!/usr/bin/env python3 """ ╔═══════════════════════════════════════════╗ ║ SENTINEL SYSINFO v1.0 ║ ║ Red Team + Blue Team AI Security Tool ║ ╚═══════════════════════════════════════════╝ """ import os import sys import socket import platform import subprocess import json import time import datetime import urllib.request import urllib.error # ─── ANSI Colors ──────────────────────────────────────────────────────────── R = "\033[91m" # red G = "\033[92m" # green Y = "\033[93m" # yellow B = "\033[94m" # blue M = "\033[95m" # magenta C = "\033[96m" # cyan W = "\033[97m" # white DIM = "\033[2m" BOLD = "\033[1m" RESET = "\033[0m" # ─── Progress Spinner ──────────────────────────────────────────────────────── def spin(msg, duration=1.5): frames = ["⠋","⠙","⠹","⠸","⠼","⠴","⠦","⠧","⠇","⠏"] end = time.time() + duration i = 0 while time.time() < end: print(f"\r{C}{frames[i % len(frames)]}{RESET} {msg}", end="", flush=True) time.sleep(0.08) i += 1 print(f"\r{G}✔{RESET} {msg}{' ' * 10}") def section(title): print(f"\n{B}{BOLD}{'─' * 50}{RESET}") print(f"{B}{BOLD} {title}{RESET}") print(f"{B}{'─' * 50}{RESET}") # ─── System Info ───────────────────────────────────────────────────────────── def get_system_info(): spin("正在收集系统信息...", 1.0) info = {} ``` # 主机名和 IP info["hostname"] = socket.gethostname() try: info["local_ip"] = socket.gethostbyname(info["hostname"]) except: info["local_ip"] = "127.0.0.1" # 尝试获取公网 IP try: with urllib.request.urlopen("https://api.ipify.org", timeout=3) as r: info["public_ip"] = r.read().decode() except: info["public_ip"] = "Unavailable (offline)" # OS 信息 info["os"] = platform.system() info["os_release"] = platform.release() info["os_version"] = platform.version() info["architecture"] = platform.machine() # CPU try: cpu_info = subprocess.check_output( ["grep", "-m1", "model name", "/proc/cpuinfo"], stderr=subprocess.DEVNULL ).decode().split(":")[1].strip() info["cpu_model"] = cpu_info except: info["cpu_model"] = platform.processor() or "Unknown" try: cpu_usage = subprocess.check_output( ["top", "-bn1"], stderr=subprocess.DEVNULL ).decode() for line in cpu_usage.split("\n"): if "Cpu(s)" in line or "%Cpu" in line: info["cpu_usage"] = line.strip() break else: info["cpu_usage"] = "Unable to read" except: info["cpu_usage"] = "Unable to read" # RAM try: mem = {} with open("/proc/meminfo") as f: for line in f: parts = line.split() if parts[0] in ("MemTotal:", "MemAvailable:", "MemFree:"): mem[parts[0].rstrip(":")] = int(parts[1]) // 1024 # MB total_gb = round(mem.get("MemTotal", 0) / 1024, 1) avail_gb = round(mem.get("MemAvailable", 0) / 1024, 1) used_gb = round(total_gb - avail_gb, 1) pct = round((used_gb / total_gb) * 100) if total_gb else 0 info["ram_total"] = f"{total_gb} GB" info["ram_used"] = f"{used_gb} GB" info["ram_available"] = f"{avail_gb} GB" info["ram_percent"] = f"{pct}%" except: info["ram_total"] = info["ram_used"] = info["ram_available"] = "Unknown" info["ram_percent"] = "?" # 运行时间 try: with open("/proc/uptime") as f: secs = float(f.read().split()[0]) h = int(secs // 3600) m = int((secs % 3600) // 60) info["uptime"] = f"{h}h {m}m" except: info["uptime"] = "Unknown" # 已登录用户 try: users = subprocess.check_output(["who"], stderr=subprocess.DEVNULL).decode().strip() info["logged_users"] = users if users else "None" except: info["logged_users"] = "Unknown" # 开放网络连接数 try: conns = subprocess.check_output( ["ss", "-tunp"], stderr=subprocess.DEVNULL ).decode().strip().split("\n") info["open_connections"] = str(max(0, len(conns) - 1)) except: info["open_connections"] = "Unknown" return info ``` # ─── Port Scan ─────────────────────────────────────────────────────────────── def run_port_scan(target_ip): spin(f"正在对 {target_ip} 运行端口扫描...", 1.0) ``` # 首先尝试 python-nmap try: import nmap nm = nmap.PortScanner() nm.scan(target_ip, "1-1024", arguments="-sV --open -T4") results = [] for host in nm.all_hosts(): for proto in nm[host].all_protocols(): ports = nm[host][proto].keys() for port in sorted(ports): s = nm[host][proto][port] if s["state"] == "open": results.append({ "port": port, "protocol": proto, "service": s.get("name", "unknown"), "version": s.get("version", ""), "product": s.get("product", ""), }) return results, "nmap" except ImportError: pass except Exception as e: pass # 备用方案:raw socket 扫描 spin(f"nmap unavailable — using socket scanner on {target_ip}...", 0.5) results = [] common_ports = { 21: "FTP", 22: "SSH", 23: "Telnet", 25: "SMTP", 53: "DNS", 80: "HTTP", 110: "POP3", 143: "IMAP", 443: "HTTPS", 445: "SMB", 993: "IMAPS", 995: "POP3S", 3306: "MySQL", 3389: "RDP", 5432: "PostgreSQL", 6379: "Redis", 8080: "HTTP-Alt", 8443: "HTTPS-Alt", 27017: "MongoDB", 5900: "VNC", } for port, service in common_ports.items(): try: s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(0.5) result = s.connect_ex((target_ip, port)) s.close() if result == 0: results.append({ "port": port, "protocol": "tcp", "service": service, "version": "", "product": "", }) except: pass return results, "socket" ``` # ─── Display System Info ───────────────────────────────────────────────────── def display_sysinfo(info): section("📊 系统信息") rows = [ ("主机名", info["hostname"]), ("本地 IP", info["local_ip"]), ("公网 IP", info["public_ip"]), ("操作系统", f"{info['os']} {info['os_release']}"), ("系统架构", info["architecture"]), ("CPU", info["cpu_model"]), ("CPU 使用率", info["cpu_usage"]), ("内存总量", info["ram_total"]), ("已用内存", f"{info['ram_used']} ({info['ram_percent']})"), ("可用内存", info["ram_available"]), ("运行时间", info["uptime"]), ("已登录用户", info["logged_users"]), ("开放连接数", info["open_connections"]), ] for label, value in rows: print(f" {DIM}{label:<20}{RESET} {W}{value}{RESET}") # ─── Display Port Scan ─────────────────────────────────────────────────────── def display_ports(ports, scan_method): section(f"🔍 端口扫描结果 {DIM}(方法: {scan_method}){RESET}") if not ports: print(f" {G}✔ 在扫描范围内未检测到开放端口。{RESET}") return print(f" {Y}{'PORT':<8}{'PROTO':<8}{'SERVICE':<16}{'PRODUCT/VERSION'}{RESET}") print(f" {'─'*54}") for p in ports: ver = f"{p['product']} {p['version']}".strip() color = R if p["service"] in ("telnet","ftp","smb","rdp","vnc") else W print(f" {color}{p['port']:<8}{p['protocol']:<8}{p['service']:<16}{ver}{RESET}") # ─── AI Analysis ───────────────────────────────────────────────────────────── def ai_analysis(system_info, open_ports, api_key): section("🤖 AI 安全分析 (Red Team + Blue Team)") print(f" {DIM}正在发送数据至 Claude 进行威胁分析...{RESET}\n") ``` ports_summary = [] for p in open_ports: ver = f"{p['product']} {p['version']}".strip() ports_summary.append( f"Port {p['port']}/{p['protocol']} - {p['service']}" + (f" ({ver})" if ver else "") ) prompt = f"""You are SENTINEL, an elite AI security analyst who thinks like both a red team attacker and a blue team defender simultaneously. ``` 系统扫描数据: - 主机名: {system_info['hostname']} - 本地 IP: {system_info['local_ip']} - 公网 IP: {system_info['public_ip']} - 操作系统: {system_info['os']} {system_info['os_release']} - 架构: {system_info['architecture']} - 内存: {system_info['ram_total']} 总计, {system_info['ram_used']} 已使用 ({system_info['ram_percent']}) - 运行时间: {system_info['uptime']} - 已登录用户: {system_info['logged_users']} - 开放的网络连接: {system_info['open_connections']} - 发现的开放端口: {chr(10).join(ports_summary) if ports_summary else "未检测到"} 请在以下确切的章节中提供你的分析: 🔴 RED TEAM 视角 像攻击者一样思考。你会首先攻击什么?根据你所看到的信息列出 3-5 个具体的攻击向量。要具体 —— 提及端口号、服务和真实的攻击技术(例如 SSH 暴力破解、SMB 中继、FTP 匿名登录)。如果没有开放端口,请对操作系统级别和网络级别的攻击面进行评估。 🔵 BLUE TEAM 视角 像防御者一样思考。针对 Red Team 识别出的每个风险,给出一个具体的修复方案。要具体 —— 提供在 Linux 上使用的实际命令、设置或工具。不要给出模糊的建议。 ⚡ 优先行动清单 列出当前需要修复的前 3 项,按严重程度排序。每项一行。直接明了。 🧠 SENTINEL 裁决 用一段话总结该系统的整体安全态势。给出诚实客观的评估。 语气要犀利、专业、直接。不要废话。这是一份真实的安全报告。""" ``` payload = json.dumps({ "model": "claude-sonnet-4-20250514", "max_tokens": 1500, "messages": [{"role": "user", "content": prompt}] }).encode() req = urllib.request.Request( "https://api.anthropic.com/v1/messages", data=payload, headers={ "Content-Type": "application/json", "x-api-key": api_key, "anthropic-version": "2023-06-01", }, method="POST" ) try: with urllib.request.urlopen(req, timeout=30) as resp: data = json.loads(resp.read().decode()) analysis = data["content"][0]["text"] print(analysis) except urllib.error.HTTPError as e: body = e.read().decode() print(f"{R}API Error {e.code}: {body}{RESET}") except Exception as e: print(f"{R}Failed to reach AI: {e}{RESET}") print(f"\n{Y}Tip: Make sure your ANTHROPIC_API_KEY environment variable is set.{RESET}") print(f"{DIM} export ANTHROPIC_API_KEY='your-key-here'{RESET}") ``` # ─── Save Report ────────────────────────────────────────────────────────────── def save_report(system_info, open_ports): ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"sentinel_report_{ts}.json" report = { "scan_time": ts, "system_info": system_info, "open_ports": open_ports, } try: with open(filename, "w") as f: json.dump(report, f, indent=2) print(f"\n {G}✔ 报告已保存:{RESET} {filename}") except Exception as e: print(f"\n {Y}⚠ 无法保存报告: {e}{RESET}") # ─── Main ───────────────────────────────────────────────────────────────────── def main(): banner() ``` # 获取 API 密钥 api_key = os.environ.get("ANTHROPIC_API_KEY", "") if not api_key: print(f"{Y}⚠ ANTHROPIC_API_KEY not set in environment.{RESET}") print(f"{DIM} AI analysis will be skipped. Set it with:{RESET}") print(f"{DIM} export ANTHROPIC_API_KEY='your-key-here'{RESET}\n") # 目标选择 print(f"{W}Scan target:{RESET}") print(f" {DIM}[1]{RESET} Localhost (127.0.0.1) {DIM}— safe, scans your own machine{RESET}") print(f" {DIM}[2]{RESET} Local IP {DIM}— scans your LAN IP{RESET}") print(f" {DIM}[3]{RESET} Custom IP {DIM}— enter manually{RESET}") print() choice = input(f"{C}Choose [1/2/3]: {RESET}").strip() if choice == "1": target = "127.0.0.1" elif choice == "2": try: target = socket.gethostbyname(socket.gethostname()) except: target = "127.0.0.1" elif choice == "3": target = input(f"{C}Enter IP address: {RESET}").strip() if not target: target = "127.0.0.1" else: target = "127.0.0.1" print(f"\n{G} Target: {BOLD}{target}{RESET}\n") # ── 运行扫描 system_info = get_system_info() open_ports, scan_method = run_port_scan(target) # ── 显示结果 display_sysinfo(system_info) display_ports(open_ports, scan_method) # ── AI Analysis if api_key: ai_analysis(system_info, open_ports, api_key) else: section("🤖 AI ANALYSIS") print(f" {DIM}Skipped — set ANTHROPIC_API_KEY to enable.{RESET}") # ── Save report section("💾 SAVING REPORT") save_report(system_info, open_ports) print(f"\n{C}{BOLD} SENTINEL scan complete.{RESET}\n") ``` if __name__ == "__main__": main()
标签:AMSI绕过, HTTP/HTTPS抓包, Python, 人工智能, 后渗透, 威胁检测, 安全运营, 态势感知, 扫描框架, 数据展示, 无后门, 无线安全, 流量嗅探, 用户模式Hook绕过, 系统信息收集, 红队, 结构化查询, 网络安全, 网络安全审计, 网络调查, 自动化安全, 隐私保护