Jack-star-dot/Sentinel-
GitHub: Jack-star-dot/Sentinel-
一款集成系统信息采集、端口扫描与 Claude AI 威胁分析的 Python 命令行安全助手,帮助用户快速评估主机安全态势并获得红蓝双视角的修复建议。
Stars: 0 | Forks: 0
# Sentinel-
# AI 驱动的个人安全助手,用于系统监控、日志分析和网络调查。
#!/usr/bin/env python3
"""
╔═══════════════════════════════════════════╗
║ SENTINEL SYSINFO v1.0 ║
║ Red Team + Blue Team AI Security Tool ║
╚═══════════════════════════════════════════╝
"""
import os
import sys
import socket
import platform
import subprocess
import json
import time
import datetime
import urllib.request
import urllib.error
# ─── ANSI Colors ────────────────────────────────────────────────────────────
R = "\033[91m" # red
G = "\033[92m" # green
Y = "\033[93m" # yellow
B = "\033[94m" # blue
M = "\033[95m" # magenta
C = "\033[96m" # cyan
W = "\033[97m" # white
DIM = "\033[2m"
BOLD = "\033[1m"
RESET = "\033[0m"
# ─── Progress Spinner ────────────────────────────────────────────────────────
def spin(msg, duration=1.5):
frames = ["⠋","⠙","⠹","⠸","⠼","⠴","⠦","⠧","⠇","⠏"]
end = time.time() + duration
i = 0
while time.time() < end:
print(f"\r{C}{frames[i % len(frames)]}{RESET} {msg}", end="", flush=True)
time.sleep(0.08)
i += 1
print(f"\r{G}✔{RESET} {msg}{' ' * 10}")
def section(title):
print(f"\n{B}{BOLD}{'─' * 50}{RESET}")
print(f"{B}{BOLD} {title}{RESET}")
print(f"{B}{'─' * 50}{RESET}")
# ─── System Info ─────────────────────────────────────────────────────────────
def get_system_info():
spin("正在收集系统信息...", 1.0)
info = {}
```
# 主机名和 IP
info["hostname"] = socket.gethostname()
try:
info["local_ip"] = socket.gethostbyname(info["hostname"])
except:
info["local_ip"] = "127.0.0.1"
# 尝试获取公网 IP
try:
with urllib.request.urlopen("https://api.ipify.org", timeout=3) as r:
info["public_ip"] = r.read().decode()
except:
info["public_ip"] = "Unavailable (offline)"
# OS 信息
info["os"] = platform.system()
info["os_release"] = platform.release()
info["os_version"] = platform.version()
info["architecture"] = platform.machine()
# CPU
try:
cpu_info = subprocess.check_output(
["grep", "-m1", "model name", "/proc/cpuinfo"],
stderr=subprocess.DEVNULL
).decode().split(":")[1].strip()
info["cpu_model"] = cpu_info
except:
info["cpu_model"] = platform.processor() or "Unknown"
try:
cpu_usage = subprocess.check_output(
["top", "-bn1"],
stderr=subprocess.DEVNULL
).decode()
for line in cpu_usage.split("\n"):
if "Cpu(s)" in line or "%Cpu" in line:
info["cpu_usage"] = line.strip()
break
else:
info["cpu_usage"] = "Unable to read"
except:
info["cpu_usage"] = "Unable to read"
# RAM
try:
mem = {}
with open("/proc/meminfo") as f:
for line in f:
parts = line.split()
if parts[0] in ("MemTotal:", "MemAvailable:", "MemFree:"):
mem[parts[0].rstrip(":")] = int(parts[1]) // 1024 # MB
total_gb = round(mem.get("MemTotal", 0) / 1024, 1)
avail_gb = round(mem.get("MemAvailable", 0) / 1024, 1)
used_gb = round(total_gb - avail_gb, 1)
pct = round((used_gb / total_gb) * 100) if total_gb else 0
info["ram_total"] = f"{total_gb} GB"
info["ram_used"] = f"{used_gb} GB"
info["ram_available"] = f"{avail_gb} GB"
info["ram_percent"] = f"{pct}%"
except:
info["ram_total"] = info["ram_used"] = info["ram_available"] = "Unknown"
info["ram_percent"] = "?"
# 运行时间
try:
with open("/proc/uptime") as f:
secs = float(f.read().split()[0])
h = int(secs // 3600)
m = int((secs % 3600) // 60)
info["uptime"] = f"{h}h {m}m"
except:
info["uptime"] = "Unknown"
# 已登录用户
try:
users = subprocess.check_output(["who"], stderr=subprocess.DEVNULL).decode().strip()
info["logged_users"] = users if users else "None"
except:
info["logged_users"] = "Unknown"
# 开放网络连接数
try:
conns = subprocess.check_output(
["ss", "-tunp"],
stderr=subprocess.DEVNULL
).decode().strip().split("\n")
info["open_connections"] = str(max(0, len(conns) - 1))
except:
info["open_connections"] = "Unknown"
return info
```
# ─── Port Scan ───────────────────────────────────────────────────────────────
def run_port_scan(target_ip):
spin(f"正在对 {target_ip} 运行端口扫描...", 1.0)
```
# 首先尝试 python-nmap
try:
import nmap
nm = nmap.PortScanner()
nm.scan(target_ip, "1-1024", arguments="-sV --open -T4")
results = []
for host in nm.all_hosts():
for proto in nm[host].all_protocols():
ports = nm[host][proto].keys()
for port in sorted(ports):
s = nm[host][proto][port]
if s["state"] == "open":
results.append({
"port": port,
"protocol": proto,
"service": s.get("name", "unknown"),
"version": s.get("version", ""),
"product": s.get("product", ""),
})
return results, "nmap"
except ImportError:
pass
except Exception as e:
pass
# 备用方案:raw socket 扫描
spin(f"nmap unavailable — using socket scanner on {target_ip}...", 0.5)
results = []
common_ports = {
21: "FTP", 22: "SSH", 23: "Telnet", 25: "SMTP",
53: "DNS", 80: "HTTP", 110: "POP3", 143: "IMAP",
443: "HTTPS", 445: "SMB", 993: "IMAPS", 995: "POP3S",
3306: "MySQL", 3389: "RDP", 5432: "PostgreSQL",
6379: "Redis", 8080: "HTTP-Alt", 8443: "HTTPS-Alt",
27017: "MongoDB", 5900: "VNC",
}
for port, service in common_ports.items():
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(0.5)
result = s.connect_ex((target_ip, port))
s.close()
if result == 0:
results.append({
"port": port,
"protocol": "tcp",
"service": service,
"version": "",
"product": "",
})
except:
pass
return results, "socket"
```
# ─── Display System Info ─────────────────────────────────────────────────────
def display_sysinfo(info):
section("📊 系统信息")
rows = [
("主机名", info["hostname"]),
("本地 IP", info["local_ip"]),
("公网 IP", info["public_ip"]),
("操作系统", f"{info['os']} {info['os_release']}"),
("系统架构", info["architecture"]),
("CPU", info["cpu_model"]),
("CPU 使用率", info["cpu_usage"]),
("内存总量", info["ram_total"]),
("已用内存", f"{info['ram_used']} ({info['ram_percent']})"),
("可用内存", info["ram_available"]),
("运行时间", info["uptime"]),
("已登录用户", info["logged_users"]),
("开放连接数", info["open_connections"]),
]
for label, value in rows:
print(f" {DIM}{label:<20}{RESET} {W}{value}{RESET}")
# ─── Display Port Scan ───────────────────────────────────────────────────────
def display_ports(ports, scan_method):
section(f"🔍 端口扫描结果 {DIM}(方法: {scan_method}){RESET}")
if not ports:
print(f" {G}✔ 在扫描范围内未检测到开放端口。{RESET}")
return
print(f" {Y}{'PORT':<8}{'PROTO':<8}{'SERVICE':<16}{'PRODUCT/VERSION'}{RESET}")
print(f" {'─'*54}")
for p in ports:
ver = f"{p['product']} {p['version']}".strip()
color = R if p["service"] in ("telnet","ftp","smb","rdp","vnc") else W
print(f" {color}{p['port']:<8}{p['protocol']:<8}{p['service']:<16}{ver}{RESET}")
# ─── AI Analysis ─────────────────────────────────────────────────────────────
def ai_analysis(system_info, open_ports, api_key):
section("🤖 AI 安全分析 (Red Team + Blue Team)")
print(f" {DIM}正在发送数据至 Claude 进行威胁分析...{RESET}\n")
```
ports_summary = []
for p in open_ports:
ver = f"{p['product']} {p['version']}".strip()
ports_summary.append(
f"Port {p['port']}/{p['protocol']} - {p['service']}" +
(f" ({ver})" if ver else "")
)
prompt = f"""You are SENTINEL, an elite AI security analyst who thinks like both a red team attacker and a blue team defender simultaneously.
```
系统扫描数据:
- 主机名: {system_info['hostname']}
- 本地 IP: {system_info['local_ip']}
- 公网 IP: {system_info['public_ip']}
- 操作系统: {system_info['os']} {system_info['os_release']}
- 架构: {system_info['architecture']}
- 内存: {system_info['ram_total']} 总计, {system_info['ram_used']} 已使用 ({system_info['ram_percent']})
- 运行时间: {system_info['uptime']}
- 已登录用户: {system_info['logged_users']}
- 开放的网络连接: {system_info['open_connections']}
- 发现的开放端口: {chr(10).join(ports_summary) if ports_summary else "未检测到"}
请在以下确切的章节中提供你的分析:
🔴 RED TEAM 视角
像攻击者一样思考。你会首先攻击什么?根据你所看到的信息列出 3-5 个具体的攻击向量。要具体 —— 提及端口号、服务和真实的攻击技术(例如 SSH 暴力破解、SMB 中继、FTP 匿名登录)。如果没有开放端口,请对操作系统级别和网络级别的攻击面进行评估。
🔵 BLUE TEAM 视角
像防御者一样思考。针对 Red Team 识别出的每个风险,给出一个具体的修复方案。要具体 —— 提供在 Linux 上使用的实际命令、设置或工具。不要给出模糊的建议。
⚡ 优先行动清单
列出当前需要修复的前 3 项,按严重程度排序。每项一行。直接明了。
🧠 SENTINEL 裁决
用一段话总结该系统的整体安全态势。给出诚实客观的评估。
语气要犀利、专业、直接。不要废话。这是一份真实的安全报告。"""
```
payload = json.dumps({
"model": "claude-sonnet-4-20250514",
"max_tokens": 1500,
"messages": [{"role": "user", "content": prompt}]
}).encode()
req = urllib.request.Request(
"https://api.anthropic.com/v1/messages",
data=payload,
headers={
"Content-Type": "application/json",
"x-api-key": api_key,
"anthropic-version": "2023-06-01",
},
method="POST"
)
try:
with urllib.request.urlopen(req, timeout=30) as resp:
data = json.loads(resp.read().decode())
analysis = data["content"][0]["text"]
print(analysis)
except urllib.error.HTTPError as e:
body = e.read().decode()
print(f"{R}API Error {e.code}: {body}{RESET}")
except Exception as e:
print(f"{R}Failed to reach AI: {e}{RESET}")
print(f"\n{Y}Tip: Make sure your ANTHROPIC_API_KEY environment variable is set.{RESET}")
print(f"{DIM} export ANTHROPIC_API_KEY='your-key-here'{RESET}")
```
# ─── Save Report ──────────────────────────────────────────────────────────────
def save_report(system_info, open_ports):
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"sentinel_report_{ts}.json"
report = {
"scan_time": ts,
"system_info": system_info,
"open_ports": open_ports,
}
try:
with open(filename, "w") as f:
json.dump(report, f, indent=2)
print(f"\n {G}✔ 报告已保存:{RESET} {filename}")
except Exception as e:
print(f"\n {Y}⚠ 无法保存报告: {e}{RESET}")
# ─── Main ─────────────────────────────────────────────────────────────────────
def main():
banner()
```
# 获取 API 密钥
api_key = os.environ.get("ANTHROPIC_API_KEY", "")
if not api_key:
print(f"{Y}⚠ ANTHROPIC_API_KEY not set in environment.{RESET}")
print(f"{DIM} AI analysis will be skipped. Set it with:{RESET}")
print(f"{DIM} export ANTHROPIC_API_KEY='your-key-here'{RESET}\n")
# 目标选择
print(f"{W}Scan target:{RESET}")
print(f" {DIM}[1]{RESET} Localhost (127.0.0.1) {DIM}— safe, scans your own machine{RESET}")
print(f" {DIM}[2]{RESET} Local IP {DIM}— scans your LAN IP{RESET}")
print(f" {DIM}[3]{RESET} Custom IP {DIM}— enter manually{RESET}")
print()
choice = input(f"{C}Choose [1/2/3]: {RESET}").strip()
if choice == "1":
target = "127.0.0.1"
elif choice == "2":
try:
target = socket.gethostbyname(socket.gethostname())
except:
target = "127.0.0.1"
elif choice == "3":
target = input(f"{C}Enter IP address: {RESET}").strip()
if not target:
target = "127.0.0.1"
else:
target = "127.0.0.1"
print(f"\n{G} Target: {BOLD}{target}{RESET}\n")
# ── 运行扫描
system_info = get_system_info()
open_ports, scan_method = run_port_scan(target)
# ── 显示结果
display_sysinfo(system_info)
display_ports(open_ports, scan_method)
# ── AI Analysis
if api_key:
ai_analysis(system_info, open_ports, api_key)
else:
section("🤖 AI ANALYSIS")
print(f" {DIM}Skipped — set ANTHROPIC_API_KEY to enable.{RESET}")
# ── Save report
section("💾 SAVING REPORT")
save_report(system_info, open_ports)
print(f"\n{C}{BOLD} SENTINEL scan complete.{RESET}\n")
```
if __name__ == "__main__":
main()
标签:AMSI绕过, HTTP/HTTPS抓包, Python, 人工智能, 后渗透, 威胁检测, 安全运营, 态势感知, 扫描框架, 数据展示, 无后门, 无线安全, 流量嗅探, 用户模式Hook绕过, 系统信息收集, 红队, 结构化查询, 网络安全, 网络安全审计, 网络调查, 自动化安全, 隐私保护