wrhalpin/GNAT
GitHub: wrhalpin/GNAT
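GNAT is a unified threat intelligence abstraction layer covering 159 security platforms. Through bidirectional STIX 2.1 translation and a uniform ORM interface, it lets security teams ingest, correlate, and disseminate intelligence across any platform and automate the whole operational workflow without rewriting code.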
Stars: 0 | Forks: 0
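GNAT provides a single, consistent abstraction layer over 159 platforms, including threat intelligence platforms, SIEMs, EDRs, vulnerability scanners, SOAR tools, network sensors, AI assistants, and cloud security posture products. Every connector implements the same interface and bidirectional STIX 2.1 translation, which makes automation portable: you can switch platforms, add sources, or swap tools without rewriting pipelines, schedules, or reports.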
```
[ 159 Platforms ] → GNATClient → STIX 2.1 ORM → Ingest / Export / Report / Schedule / Research
```
## Table of Contents
- [Key Capabilities](#key-capabilities)
- [Supported Platforms](#supported-platforms)
- [Installation](#installation)
- [Quick Start](#quick-start)
- [Core Concepts](#core-concepts)
- [Ingest Pipelines](#ingest-pipelines)
- [Export Pipelines](#export-pipelines)
- [Scheduling](#scheduling)
- [AI Agents & Research Library](#ai-agents--research-library)
- [Natural Language Queries](#natural-language-queries)
- [Sector Targeting Intelligence](#sector-targeting-intelligence)
- [Automated Reports](#automated-reports)
- [Solr Search Sidecar](#solr-search-sidecar)
- [TAXII 2.1 Server](#taxii-21-server)
- [STIX Pattern Validation](#stix-pattern-validation)
- [Multi-Tenant Deployments](#multi-tenant-deployments)
- [Terminal UI & Web Dashboard](#terminal-ui--web-dashboard)
- [Docker & Containerization](#docker--containerization)
- [Connector Capabilities & Code Generation](#connector-capabilities--code-generation)
- [Quality & Security](#quality--security)
- [Project Structure](#project-structure)
- [Development](#development)
- [Architecture](#architecture)
- [License](#license)
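## Key Capabilities
| Layer | Capability |
|-------|-------------|
| **159 connectors** | Uniform CRUD + bidirectional STIX 2.1 translation for every supported platform |
| **STIX 2.1 ORM** | Indicator, ThreatActor, Vulnerability, Malware, AttackPattern, Relationship, Observables |
| **Ingest pipelines** | 15 source readers × 13 mappers; pull from any platform, file, feed, database, or Kafka topic |
| **Export pipelines** | EDL files, Netskope CE, STIX bundles, CSV; configurable filters + transforms + delivery |
| **FeedScheduler** | Drift-corrected cron scheduling for all job types; APScheduler/Celery adapters |
| **AI agents** | ResearchAgent (Claude), ParsingAgent (extracts STIX from text), CopilotReader (M365); quality/, security/, and repo_maintenance/ sub-agent packages |
| **NLP queries** | Natural language query engine with a built-in rule-based backend or Claude-backed structured extraction |
| **Research library** | Team knowledge base with a staging/curation workflow, TTL management, and deduplication |
| **Automated reports** | PDF, HTML, DOCX, Markdown; daily/weekly/annual; AI-assisted synthesis; delivery via email + SharePoint |
| **Sector intelligence** | `x_target_sectors` implemented uniformly across all connectors, with alias expansion and composable filters |
| **Analysis layer** | NATO Admiralty Scale confidence scoring, TLP 2.0, analyst investigations, cross-platform correlation, timeline reconstruction, evidence graph queries, infrastructure role classification, and LLM-based blind-spot detection + report drafting |
| **Attribution & campaigns** | Campaign lifecycle (SUSPECTED → ACTIVE → DORMANT → CONCLUDED), Diamond Model (ACIV), kill-chain progression tracking, competing attribution hypotheses with Admiralty Scale scoring, adversary profiles with capability matrices, cluster-to-campaign promotion |
| **Investigation builder** | Five-step cross-platform evidence graph pipeline (seed → incident expansion → normalise → correlate → materialise) over any combination of connected platforms |
| **HuntGNAT** | STIX pattern → detection rule translation (Sigma, YARA, Suricata, Snort), hunt packages with lifecycle management, ATT&CK coverage matrix, deployment tracking with drift detection, validation scoring |
| **Telemetry ingestion** | High-volume sensor data from Kafka topics (honeypots, netflow, IDS alerts, DNS logs); Redis-backed deduplication; automatic linking of ingested Indicators to campaigns |
| **Rule engine** | Three-engine hypothesis evaluation (Hy/Lisp, YAML declarative DSL, Prolog logic); 26 analyst-writable helper predicates; priority-based first-match with audit trail; AI-60 confidence ceiling; feature flag defaults to OFF |
| **Intelligence reports** | Structured finished-intelligence products with a five-state lifecycle (DRAFT → PUBLISHED), STIX 2.1 SDO export on publish, versioning, and attribution |
| **Dissemination** | ExportService (STIX/JSON/PDF), webhook fan-out with HMAC signatures, TAXII 2.1 server, Bearer-token REST gateway |
| **Solr search** | Full-text search sidecar; automatic indexing on write; Grafana SimpleJSON dashboards |
| **TAXII 2.1 server** | Each GNAT workspace is exposed as a TAXII 2.1 collection; fully protocol-compliant |
| **STIX validator** | Two-tier pattern validator (built-in + ANTLR `stix2-patterns`); opt-in via ORM `validate=True` |
| **Multi-tenancy** | Transparent workspace namespace isolation for MSP deployments |
| **Terminal UI** | Textual TUI that works over SSH; NLP queries, library browser, scheduler, report viewer |
| **Web dashboard** | FastAPI SPA with API key auth, rate limiting, research library + reports + scheduler |
| **Docker** | 3-service Compose stack; DevContainer for VS Code/Codespaces; Docker integration test harness |
| **Capability reflection** | Introspect any connector's methods at runtime; safety-guarded dispatch via `call()` |
| **Health monitoring** | `ConnectorHealthJob`: periodic ping + schema drift detection with Slack/email alerts |
| **XSOAR pack generator** | Generate an XSOAR 6 content pack zip from any connector via `gnat codegen xsoar` |
| **Contribution pipeline** | 7-step compliance gate → draft GitHub PR via `gnat contribute` |
| **Rust extension** | Optional `gnat._core` for hot-path IOC classify / defang / refang / extract_pattern_value |
| **5,100+ tests** | 70% minimum coverage enforced; Docker integration test harness for Elasticsearch + Solr |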
## Supported Platforms
### Threat Intelligence Platforms
| Key | Platform | Auth |
|-----|----------|------|
| `threatq` | ThreatQ Threat Intelligence Platform | OAuth2 |
| `crowdstrike` | CrowdStrike Falcon | OAuth2 |
| `recordedfuture` | Recorded Future Connect API | API key |
| `alienvault` | AlienVault OTX | API key |
| `virustotal` | VirusTotal | API key |
| `shadowserver` | Shadowserver Foundation | API key |
| `feedly` | Feedly Threat Intelligence | Bearer / API key |
| `threatconnect` | ThreatConnect | OAuth2 / API token |
| `mandiant` | Mandiant Advantage | OAuth2 |
| `defenderti` | Microsoft Defender Threat Intelligence | OAuth2 (Azure AD) |
| `threatstream` | Anomali ThreatStream (OPTIC) | API key + username |
| `socradar` | SOCRadar Extended Threat Intelligence | API key |
| `pulsedive` | Pulsedive | API key |
| `flare` | Flare (Darknet/Threat Exposure Monitoring) | API key |
| `yeti` | YETI (Your Everyday Threat Intelligence) | API key |
| `cloudsek` | CloudSEK Digital Risk Protection | API key |
| `zerofox` | ZeroFox Digital Risk Protection | Bearer |
| `group_ib` | Group-IB Threat Intelligence | API key |
| `cyble_vision` | Cyble Vision | API key |
| `flashpoint` | Flashpoint Underground / Dark Web CTI | Bearer |
| `hudsonrock` | Hudson Rock Breach Intelligence | API key |
| `intel471` | Intel 471 Cybercrime Intelligence | Bearer |
| `misp` | MISP Threat Sharing Platform | API key |
| `opencti` | OpenCTI | API key |
| `hibp` | Have I Been Pwned (HIBP) | API key |
| `synapse` | Vertex Project Synapse | API key / Bearer |
| `osint_feed` | Generic OSINT Feed (TAXII 2.x / STIX-JSON) | None / Basic / API key / Bearer / OAuth2 |
| `mitre_attack` | MITRE ATT&CK (TAXII 2.1) | None (public) |
| `abusech` | Abuse.ch (URLhaus / MalwareBazaar / ThreatFox / Feodo / SSLBL) | Optional Auth-Key |
| `cloudflare_intel` | Cloudflare Threat Intelligence | Bearer token + account_id |
| `cofense_intel` | Cofense Intelligence (human-verified phishing) | HTTP Basic |
| `trm_labs` | TRM Labs (blockchain / crypto intel) | API key (Basic, empty password) |
| `talos` | Cisco Talos Intelligence | None (public reputation) |
| `fortiguard` | Fortinet FortiGuard Labs | Optional Bearer (IOC service) |
| `kaspersky_opentip` | Kaspersky OpenTIP | Optional `x-api-key` |
| `eset_ti` | ESET Threat Intelligence | Bearer token |
| `bitdefender_iz` | Bitdefender IntelliZone | `X-API-Key` header |
| `abuseipdb` | AbuseIPDB community IP reputation | `Key` header |
| `project_honey_pot` | Project Honey Pot http:BL | http:BL access key (DNS-based) |
### SIEMs & Log Analytics
| Key | Platform | Auth |
|-----|----------|------|
| `splunk` | Splunk Enterprise / Splunk ES | Token / Basic |
| `elastic` | Elastic SIEM / Security | API key / Basic |
| `qradar` | IBM QRadar | API token |
| `sentinel` | Microsoft Sentinel | OAuth2 (Azure AD) |
| `graylog` | Graylog | API key / Basic |
| `ossim` | OSSIM / AlienVault SIEM | API key |
| `security_onion` | Security Onion | Bearer |
| `wazuh` | Wazuh SIEM/XDR | API key / Basic |
| `google_chronicle` | Google Chronicle (SecOps SIEM) | Service account / API key |
| `logrhythm` | LogRhythm NextGen SIEM | Bearer / OAuth2 |
| `datadog` | Datadog Cloud SIEM / Security Monitoring | API key + App key |
| `cribl` | Cribl Stream / Edge (Data Pipeline) | Bearer / username+password |
### SOAR & Incident Response
| Key | Platform | Auth |
|-----|----------|------|
| `xsoar` | Palo Alto XSOAR 6 | API key |
| `thehive` | TheHive Security Incident Response | API key |
| `greymatter` | GreyMatter | OAuth2 |
| `servicenow` | ServiceNow ITSM / SecOps | Basic / Bearer |
| `servicenow_secops` | ServiceNow SecOps (SIR + VR + TIARA) | Basic / Bearer |
| `jira` | Atlassian Jira | Basic / Bearer |
| `fortisoar` | Fortinet FortiSOAR | JWT / Basic |
### Network Detection
| Key | Platform | Auth |
|-----|----------|------|
| `snort` | Snort IDS | File / Syslog |
| `suricata` | Suricata IDS/IPS | File / Syslog |
| `zeek` | Zeek Network Monitor | File / Syslog |
| `vectra` | Vectra AI NDR (Network Detection & Response) | API token |
| `extrahop` | ExtraHop Reveal(x) NDR | API key / OAuth2 |
| `darktrace` | Darktrace Enterprise Immune System | HMAC public/private key |
| `nozomi` | Nozomi Networks Guardian / Vantage (OT/IoT) | API token / Basic |
| `dragos` | Dragos Platform (OT/ICS Threat Intelligence) | Basic (API key + secret) |
| `cisco_umbrella` | Cisco Umbrella (Investigate / Enforcement / Management) | Multiple API keys (Investigate, Enforcement, Management) |
### Vulnerability Management
| Key | Platform | Auth |
|-----|----------|------|
| `rapid7` | Rapid7 InsightVM / InsightIDR | API key |
| `nucleus` | Nucleus Security | API key |
| `tenable_one` | Tenable One Exposure Management | X-ApiKeys |
| `qualys` | Qualys VMDR | Basic |
| `greenbone` | Greenbone / OpenVAS | GMP username/password |
| `defectdojo` | DefectDojo Vulnerability Management | API token |
| `osv` | OSV.dev (open-source vulnerabilities) | None (public) |
| `vulncheck` | VulnCheck (exploit intelligence) | Bearer token |
### Cloud Security & ASM
| Key | Platform | Auth |
|-----|----------|------|
| `orca` | Orca Security (Agentless CNAPP) | Bearer |
| `wiz` | Wiz CNAPP | OAuth2 |
| `cortex_xpanse` | Cortex Xpanse (External ASM) | API key |
| `cortex_xdr` | Palo Alto Cortex XDR / XSIAM | API key pair (HMAC-signed) |
| `prisma_cloud` | Palo Alto Prisma Cloud (CSPM/CNAPP) | Access key + secret (JWT) |
| `cycognito` | CyCognito ASM | Bearer |
| `riskrecon` | RiskRecon | OAuth2 |
| `censys` | Censys Internet Intelligence / ASM | API ID + secret |
| `bitsight` | BitSight Security Ratings & Vendor Risk | API token |
| `upguard` | UpGuard Vendor Risk + CAASM + DRP | API key |
| `aws_security` | AWS Security Hub / GuardDuty | AWS SigV4 (access key + secret) |
| `securityscorecard` | SecurityScorecard Security Ratings | API token |
| `jupiterone` | JupiterOne (CAASM / Cyber Asset Graph) | Bearer (API key) |
| `runzero` | runZero (CAASM asset inventory) | Organization Export token (Bearer) |
| `securitytrails` | SecurityTrails (passive DNS / WHOIS history) | API key (`APIKEY` header) |
| `domaintools` | DomainTools Iris (WHOIS / hosting history / pivoting) | API username + API key |
| `silent_push` | Silent Push (future-attack infrastructure) | API key (`X-API-KEY` header) |
| `ip_api` | ip-api.com IP geolocation / enrichment | None (public) |
### Asset & Endpoint Management
| Key | Platform | Auth |
|-----|----------|------|
| `netskope` | Netskope SASE / SSE | API token |
| `controlup` | ControlUp DEX | Bearer |
| `sentinelone` | SentinelOne Singularity XDR | API token |
| `carbon_black` | VMware Carbon Black Cloud | API key + connector ID |
| `armis` | Armis Centrix (IT/OT/IoT) | API secret key |
| `axonius` | Axonius Cybersecurity Asset Management | API key + secret |
| `claroty` | Claroty Platform (OT/IoT) | Username / password |
| `stellarcyber` | Stellar Cyber Open XDR | API key |
| `whistic` | Whistic (Vendor Risk) | API key |
| `proofpoint` | Proofpoint TAP | Basic |
| `trellix` | Trellix XDR / ePolicy Orchestrator (ePO) | OAuth2 |
| `sophos` | Sophos Central (Endpoint + Threat Intelligence) | OAuth2 |
| `lansweeper` | Lansweeper IT Asset Management | OAuth2 / Bearer |
| `fortiedr` | Fortinet FortiEDR | Username / password |
| `fortisiem` | Fortinet FortiSIEM | Username / password |
| `shodan` | Shodan | API key |
| `greynoise` | GreyNoise | API key |
| `cisa` | CISA KEV Catalog (public feed) | None (public) |
| `tanium` | Tanium Endpoint Management & Security | API token / session |
| `trendmicro_visionone` | Trend Micro Vision One XDR | Bearer token |
| `dynatrace` | Dynatrace Observability + App Security | API token |
### Malware Sandboxes & Dynamic Analysis
| Key | Platform | Auth |
|-----|----------|------|
| `joe_sandbox` | Joe Sandbox Cloud (dynamic malware analysis) | API key (form field) |
| `any_run` | ANY.RUN (interactive sandbox) | API key (`API-Key` header) |
| `hybrid_analysis` | Hybrid Analysis / Falcon Sandbox | API key + User-Agent header |
| `vmray` | VMRay (hypervisor-level analysis) | API key (`api_key` header) |
| `intezer` | Intezer Analyze (binary DNA attribution) | API key → JWT Bearer |
| `cuckoo` | Cuckoo Sandbox / CAPEv2 (dynamic malware analysis) | Bearer token |
### Managed Detection & Response (MDR)
| Key | Platform | Auth |
|-----|----------|------|
| `huntress` | Huntress Managed EDR / ITDR | HTTP Basic (key id + secret) |
| `arctic_wolf` | Arctic Wolf MDR | Bearer token (+ optional customer id) |
| `red_canary` | Red Canary MDR | API key (`X-Api-Key` header) |
### Breach & Attack Simulation / Security Validation
| Key | Platform | Auth |
|-----|----------|------|
| `safebreach` | SafeBreach BAS | `x-apitoken` + `x-accountid` headers |
| `attackiq` | AttackIQ Security Optimization | Token header |
| `cymulate` | Cymulate BAS | `x-token` header |
| `picus` | Picus Security Validation | Refresh token → Bearer |
| `pentera` | Pentera automated validation | Bearer (tenant JWT) |
| `xm_cyber` | XM Cyber Attack Path Management | API key → session Bearer |
### Identity Providers & ITDR
| Key | Platform | Auth |
|-----|----------|------|
| `okta` | Okta Identity Cloud | `Authorization: SSWS` |
| `entra_id` | Microsoft Entra ID (Azure AD) | OAuth2 (Microsoft Graph) |
| `ping_identity` | Ping Identity (PingOne) | OAuth2 client credentials |
| `silverfort` | Silverfort (ITDR runtime identity telemetry) | OAuth2 client credentials |
| `semperis` | Semperis DSP (AD / Entra posture + IoE/IoC) | Bearer token |
### Email Security
| Key | Platform | Auth |
|-----|----------|------|
| `mimecast` | Mimecast API 2.0 email security | OAuth2 client credentials |
| `ironscales` | IRONSCALES AI email security | Bearer + `X-Company-Id` header |
| `abnormal` | Abnormal Security (BEC / vendor impersonation) | Bearer token |
### Insider Risk & User Behavior Analytics (UEBA)
| Key | Platform | Auth |
|-----|----------|------|
| `code42` | Code42 Incydr (file exfiltration / insider risk) | OAuth2 client credentials |
| `dtex` | DTEX InTERCEPT (behavioral insider threat) | Bearer token |
| `gurucul` | Gurucul UEBA | Bearer token |
| `exabeam` | Exabeam Security Operations Platform | OAuth2 client credentials |
| `securonix` | Securonix cloud SIEM / UEBA | Username/password → session token |
### DevSecOps & Secrets Detection
| Key | Platform | Auth |
|-----|----------|------|
| `gitguardian` | GitGuardian (secret incidents) | API key (`Authorization: Token`) |
### Real-Time Event Intelligence & Crisis Feeds
| Key | Platform | Auth |
|-----|----------|------|
| `dataminr` | Dataminr Pulse (real-time event intelligence) | OAuth2 → `Dmauth` token |
| `factal` | Factal verified breaking-news intelligence | Bearer token |
| `samdesk` | Samdesk global crisis detection | `X-Api-Key` header |
| `human_security` | HUMAN Security (bot defense / ATO / credential stuffing) | OAuth2 client credentials |
### Certificate Transparency
| Key | Platform | Auth |
|-----|----------|------|
| `crtsh` | crt.sh public Certificate Transparency search | None (public) |
| `google_ct` | Google Certificate Transparency log API | None (public; per-log path) |
### Digital Forensics & Incident Response (DFIR)
| Key | Platform | Auth |
|-----|----------|------|
| `velociraptor` | Velociraptor open-source DFIR | Bearer token or mTLS cert/key |
| `magnet_axiom` | Magnet AXIOM Cyber (remote forensics) | `X-API-Key` header |
### Bug Bounty & Vulnerability Disclosure
| Key | Platform | Auth |
|-----|----------|------|
| `hackerone` | HackerOne (bug bounty / VDP) | HTTP Basic (username + token) |
| `bugcrowd` | Bugcrowd (managed bug bounty / pentest) | `Authorization: Token` header |
### GNAT Federation
| Key | Platform | Auth |
|-----|----------|------|
| `gnat_remote` | Remote GNAT instance (federation / workspace sync) | Bearer token |
### AI Assistants & Collaboration
| Key | Platform | Auth |
|-----|----------|------|
| `copilot` | Microsoft Copilot for Security | DirectLine / Bearer |
| `chatgpt` | OpenAI ChatGPT | API key |
| `gemini` | Google Gemini | API key |
| `grok` | Grok AI | API key |
| `discord` | Discord (IOC extraction / CTI notifications / GNAT command channel) | Bot token |
## Installation
```
pip install gnat # Core — urllib3 transport only
pip install "gnat[yaml]" # YAML support (pyyaml)
pip install "gnat[taxii]" # TAXII 2.x reading (taxii2-client)
pip install "gnat[ingest]" # Full ingest pipeline (taxii2-client + feedparser)
pip install "gnat[async]" # Async client (httpx)
pip install "gnat[persist]" # DB persistence (sqlalchemy)
pip install "gnat[schedule]" # Cron scheduling (croniter)
pip install "gnat[reports]" # PDF/DOCX reports (reportlab + python-docx)
pip install "gnat[viz]" # Visualization (plotly, networkx, openpyxl)
pip install "gnat[serve]" # Web dashboard + TAXII 2.1 server (fastapi, uvicorn)
pip install "gnat[tui]" # Interactive terminal UI (textual)
pip install "gnat[nlp]" # NLP query engine (zero deps for builtin; Claude backend requires [agents])
pip install "gnat[stix-validate]" # Tier-2 STIX pattern validation (stix2-patterns / ANTLR)
pip install "gnat[telemetry]" # High-volume sensor ingestion (kafka-python-ng + redis)
pip install "gnat[rules]" # Hy + YAML rule engines for hypothesis evaluation
pip install "gnat[rules-prolog]" # Prolog rule engine (requires SWI-Prolog)
pip install "gnat[analysis]" # Attribution & campaign tracking (sqlalchemy)
pip install "gnat[fast]" # Rust IOC hot-path extension (maturin wheel)
pip install "gnat[all]" # Core extras (yaml, taxii, ingest, async, persist, schedule, reports, viz, serve)
pip install "gnat[dev]" # All + ruff, mypy, pytest, httpx, sqlalchemy
```
## Quick Start
### 1. Configure
Copy `config/config.ini.example` to `~/.gnat/config.ini`:
```
[DEFAULT]
timeout = 30
verify_ssl = true
[threatq]
host = https://threatq.example.com
client_id = my-client-id
client_secret = s3cr3t
auth_type = oauth2
[crowdstrike]
host = https://api.crowdstrike.com
client_id = cs-cid
client_secret = cs-secret
auth_type = oauth2
```
### 2. Connect and Query
```
import gnat
cli = gnat.GNATClient()
cli.connect(target="threatq")
# Ping: verify connectivity
print(cli.ping())  # True
# List indicators
indicators = cli.list_objects("indicator", limit=50)
for ind in indicators:
    print(ind.name, ind.pattern)
```
### 3. Use the STIX ORM
```
# Create
ind = gnat.Indicator(
    client=cli,
    name="Malicious IP",
    pattern="[ipv4-addr:value = '198.51.100.99']",
    indicator_types=["malicious-activity"],
    confidence=85,
)
ind.save()
print(ind.id)  # stix-id assigned by platform after save
# Update
ind.description = "Seen in phishing campaign Q1-2025"
ind.save()
# Delete
ind.delete()
# Other ORM types
actor = gnat.ThreatActor(client=cli, name="APT-XYZ").save()
malware = gnat.Malware(client=cli, name="BlackCat", is_family=True).save()
vuln = gnat.Vulnerability(client=cli, name="CVE-2024-12345").save()
rel = gnat.Relationship(client=cli, relationship_type="uses",
                        source_ref=actor.id, target_ref=malware.id).save()
```
### 4. Switch Platforms with Zero Code Changes
```
cli.connect(target="crowdstrike") # same ORM calls, different platform
ind = gnat.Indicator(client=cli, name="Evil Hash").save()
```
### 5. Natural Language Queries
```
results = cli.natural_language_query(
    "Get all malicious IPs related to Lazarus Group since January"
)
# → list[STIXBase], dispatched to all configured connectors
```
## Core Concepts
### STIX 2.1 ORM
Every object from every connector is normalized to the same STIX 2.1 types:
| ORM Class | STIX Type | Typical Use |
|-----------|-----------|-------------|
| `Indicator` | `indicator` | IOCs with a STIX pattern |
| `ThreatActor` | `threat-actor` | Adversary profiles, aliases, attribution |
| `Vulnerability` | `vulnerability` | CVEs, CVSS scores, exploit flags |
| `AttackPattern` | `attack-pattern` | MITRE ATT&CK TTPs |
| `Malware` | `malware` | Malware families, capabilities, kill-chain phases |
| `Relationship` | `relationship` | Links between objects |
| `ObservedData` | `observed-data` | Raw SIEM/sensor observations |
All types inherit from `STIXBase` (`gnat/orm/base.py`) and support:
- `to_dict()` / `from_dict()` / `to_stix_bundle()`: serialization
- `save()` / `select()` / `delete()` / `refresh()`: platform I/O
- The `validate=True` kwarg on `Indicator`, which validates the pattern before saving
### Connector Contract
Every connector implements `BaseClient + ConnectorMixin`:
```
authenticate() # set up auth headers/tokens
health_check() # lightweight connectivity test → bool
get_object(id) # fetch single object → STIXBase
list_objects(type, ...) # paginated fetch → list[STIXBase]
upsert_object(stix_obj) # create or update → id string
delete_object(id) # delete by id
to_stix(raw_obj) # platform dict → STIXBase
from_stix(stix_obj) # STIXBase → platform dict
```
Capabilities can be introspected at runtime:
```
from gnat.clients import get_client
client = get_client("threatq")
caps = client.capabilities() # all public methods with signatures and docs
result = client.call("list_objects", stix_type="indicator", allow_write=False)
```
## Ingest Pipelines
Pull threat intelligence from any source into STIX 2.1 objects:
```
import gnat
cli = gnat.GNATClient().connect("threatq")
result = (
    gnat.IngestPipeline("daily-blocklist")
    .read_from(gnat.PlainTextReader("blocklist.txt"))
    .map_with(gnat.FlatIOCMapper(tlp_marking="amber", confidence=75))
    .write_to(cli)
    .deduplicate(key_fields=["name"])
    .filter(lambda o: getattr(o, "confidence", 0) >= 50)
    .run()
)
# IngestResult: 1204 records → 1198 STIX objects, 1102 written
```
**STIX/TAXII Feed → CrowdStrike:**
```
from taxii2client.v21 import Server
server = Server("https://limo.anomali.com/api/v1/taxii2/", user="guest", password="guest")
collection = server.api_roots[0].collections[0]
result = (
    gnat.IngestPipeline("taxii-feed")
    .read_from(gnat.TAXIICollectionReader(collection, stix_types=["indicator"]))
    .map_with(gnat.STIXPassthroughMapper(client=cli))
    .write_to(cli)
    .deduplicate()
    .run()
)
```
**NVD CVE Feed → Vulnerability Tracking:**
```
result = (
    gnat.IngestPipeline("nvd-daily")
    .read_from(gnat.JSONReader("nvdcve-1.1-recent.json", records_key="CVE_Items"))
    .map_with(gnat.NVDCVEMapper(confidence=95))
    .filter(lambda v: getattr(v, "x_cvss_score", 0) >= 7.0)  # HIGH+ only
    .run()
)
```
### Source Readers (14)
| Reader | Source |
|--------|--------|
| `PlainTextReader` | One IOC per line, auto-classified |
| `CSVReader` | Delimited files with column mapping |
| `JSONReader` / `JSONLReader` | JSON arrays or NDJSON |
| `STIXBundleReader` | STIX 2.x bundle files |
| `TAXIICollectionReader` | TAXII 2.x collections |
| `SQLReader` | Any DB-API 2.0 database |
| `MISPReader` | MISP event export JSON |
| `SyslogReader` | Syslog / CEF / LEEF logs |
| `RSSReader` | RSS 2.0 / Atom 1.0 feeds |
| `EmailReader` | RFC 2822 `.eml` files |
| `OpenIOCReader` | OpenIOC 1.1 XML |
| `SplunkReader` | Splunk REST Search API |
| `ElasticReader` | Elasticsearch scroll API |
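To make the "one IOC per line, auto-classified" idea concrete, here is a minimal standard-library sketch of classifying a line and wrapping it in a single-comparison STIX 2.1 pattern. The function names and the classification rules are assumptions for illustration; the README does not show how `PlainTextReader` or `FlatIOCMapper` actually classify values.

```python
import ipaddress
import re

# Hash algorithm inferred purely from hex length (a common heuristic, assumed here).
_HASH_LENGTHS = {32: "MD5", 40: "SHA-1", 64: "SHA-256"}
_DOMAIN_RE = re.compile(r"^(?=.{1,253}$)([a-z0-9-]{1,63}\.)+[a-z]{2,63}$", re.IGNORECASE)


def classify_ioc(value):
    """Return a coarse STIX object type for one line of text, or None if unrecognised."""
    value = value.strip()
    if not value or value.startswith("#"):
        return None  # blank line or comment
    try:
        return "ipv4-addr" if ipaddress.ip_address(value).version == 4 else "ipv6-addr"
    except ValueError:
        pass
    if re.fullmatch(r"[0-9a-fA-F]+", value) and len(value) in _HASH_LENGTHS:
        return "file"
    if re.match(r"^https?://", value, re.IGNORECASE):
        return "url"
    if _DOMAIN_RE.match(value):
        return "domain-name"
    return None


def to_stix_pattern(value):
    """Build a single-comparison STIX 2.1 pattern for a classified IOC, or None."""
    value = value.strip()
    kind = classify_ioc(value)
    if kind is None:
        return None
    # Escape backslashes and single quotes inside the pattern string literal.
    escaped = value.replace("\\", "\\\\").replace("'", "\\'")
    if kind == "file":
        algo = _HASH_LENGTHS[len(value)]
        return f"[file:hashes.'{algo}' = '{escaped}']"
    return f"[{kind}:value = '{escaped}']"
```

For example, `to_stix_pattern("198.51.100.99")` yields `[ipv4-addr:value = '198.51.100.99']`, the same pattern shape used in the Quick Start.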
### Mappers (12)
| Mapper | Produces |
|--------|---------|
| `FlatIOCMapper` | `Indicator` |
| `STIXPassthroughMapper` | Any STIX type |
| `MISPAttributeMapper` | `Indicator`, `Vulnerability`, `Malware` |
| `CEFMapper` | `Indicator` |
| `SQLRowMapper` | Configurable |
| `CSVIndicatorMapper` | `Indicator` |
| `RSSEntryMapper` | `Indicator`, `Vulnerability` |
| `EmailIOCMapper` | `Indicator` |
| `OpenIOCMapper` | `Indicator` |
| `SplunkResultMapper` / `ElasticResultMapper` | `Indicator` |
| `NVDCVEMapper` | `Vulnerability` |
## Export Pipelines
Push STIX objects to their destinations through composable filters, transforms, and delivery targets:
```
result = (
    gnat.ExportPipeline("tq-to-netskope")
    .read_from(workspace)
    .filter_with(gnat.TypeFilter("indicator"))
    .filter_with(gnat.ConfidenceFilter(min=70))
    .filter_with(gnat.SectorFilter(["healthcare", "financial"]))
    .transform_with(gnat.NetskopeCETransform(
        source_label="ThreatQ",
        ioc_types=["domain", "url", "sha256"],
    ))
    .deliver_to(gnat.PlatformDelivery(netskope_client))
    .run()
)
```
### Filters
| Filter | Purpose |
|--------|---------|
| `TypeFilter` | Filter by STIX object type |
| `ConfidenceFilter` | Minimum confidence threshold |
| `TLPFilter` | Filter by TLP marking |
| `SectorFilter` | Filter by `x_target_sectors`, with alias expansion |
| `IOCTypeFilter` | Filter by IOC type |
| `LimitFilter` | Hard cap on the number of objects |
### Delivery Targets
| Target | Description |
|--------|-------------|
| `FileDelivery` | Writes to a local file |
| `EDLServer` | Serves an EDL over HTTP (a live-updating Indicator list for firewalls) |
| `PlatformDelivery` | Pushes to any registered GNAT connector |
| `MultiDelivery` | Fans out to multiple targets simultaneously |
## Scheduling
All job types run in a single `FeedScheduler` with drift-corrected cron scheduling:
```
import gnat
from gnat.schedule import FeedScheduler, FeedJob
scheduler = FeedScheduler()
class DailyIngestJob(FeedJob):
    name = "daily-threatq-ingest"
    cron = "0 2 * * *"  # 02:00 UTC daily
    def run(self, ctx):
        # ctx.last_success_iso available for incremental ingestion
        pipeline = gnat.IngestPipeline("threatq").read_from(...).run()
scheduler.register(DailyIngestJob())
scheduler.start()
# Export and Report jobs use the same scheduler
scheduler.register(ExportJob(...))
scheduler.register(ReportJob(...))
```
Features: overlap protection (skip or queue policy), `on_success`/`on_failure` callbacks,
`ctx.last_success_iso` for incremental reads, `to_cron_lines()` for static crontab export,
and APScheduler and Celery adapters for existing infrastructure.
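Drift correction generally means computing each run from the schedule's fixed grid rather than from the moment the previous run finished. A minimal sketch of that idea follows. It assumes a fixed interval instead of a parsed cron expression so that it stays standard-library only, and the function names are illustrative rather than part of `FeedScheduler`.

```python
from datetime import datetime, timedelta, timezone


def next_fire_naive(finished_at, interval):
    """Naive scheduling: wait one interval after the job finishes, so delay accumulates."""
    return finished_at + interval


def next_fire_corrected(anchor, interval, now):
    """Drift-corrected: first slot on the grid anchor + k * interval strictly after now."""
    slots_passed = (now - anchor) // interval  # timedelta // timedelta gives an int
    return anchor + (slots_passed + 1) * interval


anchor = datetime(2025, 1, 1, 2, 0, tzinfo=timezone.utc)  # nominal 02:00 UTC slot
finished = anchor + timedelta(minutes=7, seconds=30)      # the job took 7.5 minutes
naive = next_fire_naive(finished, timedelta(days=1))      # 2025-01-02 02:07:30 UTC
corrected = next_fire_corrected(anchor, timedelta(days=1), finished)  # 2025-01-02 02:00:00 UTC
```

The naive variant slips by the job's runtime on every cycle, while the corrected variant always lands back on the 02:00 grid, including after a missed slot.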
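## AI Agents & Research Library
### Core AI Agents
| Agent | Role | Backend |
|-------|------|---------|
| `ResearchAgent` | Topic-driven synthesis; feed-based monitoring | Claude API (`web_search` tool) |
| `ParsingAgent` | Extracts STIX objects from unstructured text | Claude API |
| `CopilotReader` | Queries M365 content via DirectLine | Microsoft Bot Framework |
| `ConnectorHealthJob` | Periodic connector health checks + schema drift detection | Built-in |
| `LLMClient` | Unified LLM facade for Claude, OpenAI, Grok, and Gemini, with automatic fallback | Multiple |
All AI-extracted objects are capped at `confidence_ceiling = 60` (configurable) and tagged
`x_source_type = "ai_extracted"`. The default export pipeline uses `ConfidenceFilter(min=70)`,
so AI-derived intelligence needs analyst promotion before it reaches a production EDL.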
```
[claude]
api_key = sk-...
model = claude-sonnet-4-6
ai_confidence_ceiling = 60
```
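The interaction between the AI confidence ceiling and the export threshold can be sketched as follows. This is an illustration over plain dictionaries: `cap_ai_confidence` and `passes_export` are hypothetical helper names, and only the two numbers (60 and 70) and the `x_source_type` tag come from the text above.

```python
AI_CONFIDENCE_CEILING = 60  # mirrors ai_confidence_ceiling in the [claude] section
EXPORT_MIN_CONFIDENCE = 70  # mirrors ConfidenceFilter(min=70)


def cap_ai_confidence(obj, ceiling=AI_CONFIDENCE_CEILING):
    """Return a copy of an AI-extracted object with confidence clamped and the source tagged."""
    capped = dict(obj)
    capped["confidence"] = min(int(obj.get("confidence", 0)), ceiling)
    capped["x_source_type"] = "ai_extracted"
    return capped


def passes_export(obj, minimum=EXPORT_MIN_CONFIDENCE):
    """Return True when the object meets the export confidence threshold."""
    return obj.get("confidence", 0) >= minimum


ai_obj = cap_ai_confidence({"name": "Suspicious domain", "confidence": 95})
promoted = {**ai_obj, "confidence": 80}  # an analyst reviews it and raises the score
```

Because the ceiling sits below the export minimum, `passes_export(ai_obj)` is false no matter how confident the model claimed to be, and only the analyst-promoted copy gets through.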
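### Quality Agents (`gnat/agents/quality/`)
An automated connector assurance pipeline that runs in CI and on demand:
| Agent | Role |
|-------|------|
| `FixtureCoverageAgent` | Identifies connectors that lack test fixtures; produces a coverage gap report |
| `NormalizationRegressionAgent` | Runs golden-fixture regression tests to detect STIX normalization drift |
| `ContractAgent` | Verifies that all 8 required `ConnectorMixin` methods exist and are correctly typed |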
```
from gnat.agents.quality import NormalizationRegressionAgent, ContractAgent
agent = NormalizationRegressionAgent(policy=RegressionPolicy(fail_on_drift=True))
result = agent.run_all() # compare against golden fixtures
contract = ContractAgent()
profile = contract.check("crowdstrike") # ContractCheckResult
```
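### Security Agents (`gnat/agents/security/`)
Two sub-packages for runtime secrets management and code hygiene:
**Code hygiene** (`gnat/agents/security/hygiene/`):
| Module | Role |
|--------|------|
| `leak_scanner` | Scans connector output for accidentally leaked credentials/PII |
| `unsafe_patterns` | Detects unsafe coding patterns (hard-coded secrets, bare `except`, etc.) |
| `duplicate_detector` | Flags duplicate connector registrations and conflicting key aliases |
**Secrets management** (`gnat/agents/security/secrets/`):
| Component | Role |
|-----------|------|
| `SecretsBroker` | Central resolver that dispatches to the configured provider |
| `providers/` | Pluggable backends: `AzureKeyVaultProvider`, `CyberArkProvider`, `MemoryProvider` |
| `SecretResolver` | Resolves `${secret:key}` interpolation tokens in INI config values |
| `SecretsAuditLog` | Immutable append-only log of every secret access, for compliance audits |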
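The `${secret:key}` interpolation can be pictured as a regular-expression substitution that asks a provider for each key and records the access. The sketch below is an assumption about the general shape only: `DictProvider`, `resolve`, and the list-based audit log are hypothetical stand-ins, not the real `SecretsBroker`, `SecretResolver`, or `SecretsAuditLog`.

```python
import re

# Matches tokens of the form ${secret:some/key}
_TOKEN = re.compile(r"\$\{secret:([^}]+)\}")


class DictProvider:
    """Hypothetical in-memory backend standing in for a real secrets provider."""

    def __init__(self, secrets):
        self._secrets = dict(secrets)

    def get(self, key):
        return self._secrets[key]  # raises KeyError for unknown keys


def resolve(value, provider, audit_log):
    """Replace every ${secret:key} token in value and append each key to audit_log."""

    def _substitute(match):
        key = match.group(1)
        audit_log.append(key)  # record the access; the secret value itself is never logged
        return provider.get(key)

    return _TOKEN.sub(_substitute, value)


provider = DictProvider({"threatq/client_secret": "s3cr3t"})
log = []
resolved = resolve("${secret:threatq/client_secret}", provider, log)
```

A config value without tokens passes through unchanged, so plain and interpolated settings can live side by side in the same INI file.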
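### Repository Maintenance Agents (`gnat/agents/repo_maintenance/`)
Automated connector lifecycle management:
| Component | Role |
|-----------|------|
| `DiscoveryEngine` | Scans the connector directory; detects new, modified, or stale connectors |
| `RepairPlanner` | Generates a diff-based `RepairPlan` for connectors that deviate from the `ConnectorMixin` contract |
| `VerificationEngine` | Runs post-repair verification checks and produces a `VerificationResult` |
| `MaintenanceExecutor` | Orchestrates discover → repair → verify → PR creation end to end |
| `ConnectorRegistry` | Queryable in-memory registry of all `ConnectorSpec` entries, with metadata |
### Research Library
A three-tier team knowledge base with controlled promotion: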
```
Personal Workspaces → Staging (_gnat_staging) → Library (_gnat_library)
analyst-owned anyone can write curated, read-only
nothing auto-promotes CurationJob every 4h
```
```
from gnat.research import ResearchLibrary
lib = ResearchLibrary.default()
# Check freshness before running expensive AI research
if not lib.is_fresh("APT29", max_age_hours=72):
    agent = ResearchAgent(config)
    results = agent.research("APT29 latest TTPs")
    lib.promote(workspace, topic="APT29", researcher="analyst1", note="Q1 2025 update")
# Query the library
entries = lib.search("Cobalt Strike", tlp="amber")
```
`CurationJob` deduplicates entries (keeping the newest entry per topic) and enforces TTLs:
`indicator` → 24 hours, `vulnerability` → 72 hours, `campaign` → 14 days, `threat_actor` → 30 days.
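The curation rule can be sketched as two passes: keep the newest entry per topic, then drop anything older than its type's TTL. The entry layout (`topic`, `type`, `created`) and the `curate` function are assumptions for illustration; only the TTL values come from the text above.

```python
from datetime import datetime, timedelta, timezone

# TTL per entry type, as listed above.
TTL = {
    "indicator": timedelta(hours=24),
    "vulnerability": timedelta(hours=72),
    "campaign": timedelta(days=14),
    "threat_actor": timedelta(days=30),
}


def curate(entries, now):
    """Keep the newest entry per topic, then drop entries that have outlived their TTL."""
    newest = {}
    for entry in entries:
        current = newest.get(entry["topic"])
        if current is None or entry["created"] > current["created"]:
            newest[entry["topic"]] = entry
    return [e for e in newest.values() if now - e["created"] <= TTL[e["type"]]]


now = datetime(2025, 3, 1, 12, 0, tzinfo=timezone.utc)
entries = [
    {"topic": "APT29", "type": "threat_actor", "created": now - timedelta(days=10)},
    {"topic": "APT29", "type": "threat_actor", "created": now - timedelta(days=2)},
    {"topic": "198.51.100.99", "type": "indicator", "created": now - timedelta(hours=30)},
]
kept = curate(entries, now)  # only the 2-day-old APT29 entry survives
```

The older APT29 entry is removed as a duplicate, and the 30-hour-old indicator is removed because it exceeds the 24-hour TTL.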
## Natural Language Queries
```
cli = gnat.GNATClient()
results = cli.natural_language_query(
    "Get all malicious IPs associated with Lazarus Group in the past 30 days"
)
# → list[STIXBase], dispatched to all configured connectors
```
Two backends, configured through the `[nlp]` section:
```
[nlp]
backend = builtin # builtin (zero deps) | claude (structured extraction)
model = claude-sonnet-4-6
```
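The **builtin** backend uses regular expressions and keyword rules, with zero extra dependencies.
The **claude** backend uses the Claude API for structured `QuerySpec` extraction, supporting
complex entities, IOC types, time ranges, and platform filters.
CLI: `gnat nlq "Show me all CVEs exploited in the wild this week"`
## Sector Targeting Intelligence
GNAT normalizes industry/sector data from all connectors into a single canonical field,
`x_target_sectors` (a list of strings on any STIX object), with platform-specific extraction logic:
| Platform | Native Field | Extraction |
|----------|-------------|------------|
| ThreatQ | `attributes[].name` ∈ sector variants | `_extract_sectors()` |
| Recorded Future | `relatedEntities[type=Industry].entity.name` | JSON path |
| CrowdStrike | `target_industries[]` (adversary objects) | JSON path |
| VirusTotal | `popular_threat_category{}.value` | JSON path |
| ShadowServer | `sector` (top-level report field) | JSON path |
| Nucleus | `asset.industry` + `asset.tags[]` | Combined |
Sector aliases are configured in `[sector_aliases]` and expanded automatically by `SectorFilter`: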
```
[sector_aliases]
healthcare = Healthcare, Health, Medical, H-ISAC, Hospitals and Health Centers
financial = Financial Services, Finance, Banking, FS-ISAC
energy = Energy, Electric, Oil and Gas, E-ISAC
```
`SectorFilter` is a composable `ExportFilter` subclass available as `gnat.export.SectorFilter`
and re-exported from `gnat.reports.base` for backward compatibility.
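Alias expansion of this kind can be approximated with `configparser` and case-insensitive set membership. The sketch below is only an illustration of the idea: `load_aliases` and `matches_sectors` are hypothetical names, and the real `SectorFilter` may normalize or match differently.

```python
import configparser

CONFIG = """
[sector_aliases]
healthcare = Healthcare, Health, Medical, H-ISAC, Hospitals and Health Centers
financial = Financial Services, Finance, Banking, FS-ISAC
"""


def load_aliases(text):
    """Parse [sector_aliases] into {canonical: set of lower-cased accepted names}."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    return {
        canonical: {canonical.lower()} | {alias.strip().lower() for alias in raw.split(",")}
        for canonical, raw in parser["sector_aliases"].items()
    }


def matches_sectors(obj, wanted, aliases):
    """Return True if any x_target_sectors value matches a wanted sector or one of its aliases."""
    accepted = set()
    for name in wanted:
        accepted |= aliases.get(name.lower(), {name.lower()})
    return any(s.lower() in accepted for s in obj.get("x_target_sectors", []))


aliases = load_aliases(CONFIG)
hit = matches_sectors({"x_target_sectors": ["H-ISAC"]}, ["healthcare"], aliases)
```

Here an object tagged `H-ISAC` matches a filter for `healthcare`, while an object with no sector data never matches.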
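## Automated Reports
Three report types, all fully schedulable:
| Type | Schedule | AI | Formats | Audience |
|------|----------|----|---------|----------|
| Daily intelligence | `0 6 * * *` | Assisted | PDF, HTML, Markdown | SOC / shift handover |
| Trend | `0 6 * * 1` | Assisted | PDF, HTML | Team leads |
| Annual | `0 6 1 1 *` | Full | PDF, DOCX | Management / compliance |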
```
from gnat.reports import ReportGenerator
gen = ReportGenerator.from_config(config)
gen.run("daily_healthcare", formats=["pdf", "html"], output_dir="/var/reports/")
```
Generation pipeline: `DataAggregator` → `ReportSynthesizer` (one Claude call per section) →
`Renderers` (MD/HTML/PDF/DOCX) → `Delivery` (HTML email body + SharePoint).
CLI: `gnat report list` / `gnat report run --config report.daily_healthcare --formats pdf html`
## Solr Search Sidecar
Full-text search across all STIX objects from all connectors at once:
```
[search]
solr_url = http://localhost:8983/solr/gnat
enabled = true
batch_size = 100
```
| Component | File | Function |
|-----------|------|----------|
| `GNATIndexer` | `gnat/search/index.py` | Adds/updates/deletes Solr documents |
| `SearchMixin` | `gnat/search/mixin.py` | Auto-indexes on `upsert_object()` with zero connector code changes |
| ORM integration | `gnat/search/orm_with_mixin.py` | Search-enabled STIX objects |
| `PipelinePatch` | `gnat/search/pipeline_patch.py` | Routes ingest records to Solr after mapping |
| `LibraryPatch` | `gnat/search/library_patch.py` | ResearchLibrary cross-source correlation |
| Solr schema | `gnat/search/solr_schema_gnat.xml` | Solr 9.x schema for GNAT fields |
Generate Grafana dashboards via `gnat viz serve --with-solr` or `gnat viz solr-dashboard --file dashboard.json`.
## TAXII 2.1 Server
Each GNAT workspace is exposed as a TAXII 2.1 collection:
```
gnat taxii --port 8090 --api-key s3cr3t
gnat taxii --title "Acme TAXII" --contact admin@acme.com --port 9000
```
Full endpoint coverage: Discovery, API Root, Collections (list/detail), Objects (GET/POST/pagination),
Manifest, individual objects, and version history. Requires `gnat[serve]`.
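For orientation, the endpoints listed above correspond to the following URL paths in the TAXII 2.1 specification. This helper is a sketch for client authors: the function name is illustrative, and the API root name GNAT mounts, and how workspace names map to collection IDs, are not stated in this README, so both are passed in as parameters.

```python
from urllib.parse import quote

# Media type defined by the TAXII 2.1 specification for requests and responses.
TAXII_MEDIA_TYPE = "application/taxii+json;version=2.1"


def taxii_paths(api_root, collection_id, object_id):
    """Return the TAXII 2.1 endpoint paths for one collection and one object."""
    root = "/" + api_root.strip("/") + "/"
    coll = f"{root}collections/{quote(collection_id, safe='')}/"
    obj = f"{coll}objects/{quote(object_id, safe='')}/"
    return {
        "discovery": "/taxii2/",
        "api_root": root,
        "collections": f"{root}collections/",
        "collection": coll,
        "objects": f"{coll}objects/",
        "manifest": f"{coll}manifest/",
        "object": obj,
        "versions": f"{obj}versions/",
    }


# Placeholder identifiers, chosen only to show the path shapes.
paths = taxii_paths("api1", "91a7b528-80eb-42ed-a74d-c6fbd5a26116", "indicator--1")
```

A client would send `Accept: application/taxii+json;version=2.1` with each request to these paths.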
## STIX Pattern Validation
A two-tier validator; the first tier has no dependencies:
```
from gnat.stix import validate_pattern, PatternValidationError
result = validate_pattern("[ipv4-addr:value = '1.2.3.4']")
assert result.valid
validate_pattern("[bad", raise_on_error=True) # raises PatternValidationError
```
ORM integration (non-breaking, opt-in per object):
```
from gnat.orm.indicator import Indicator
ind = Indicator(pattern="[ipv4-addr:value = '1.2.3.4']", validate=True)
```
CLI: `gnat validate pattern "[domain-name:value = 'evil.com']"`
`gnat validate bundle indicators.json --strict` (uses the `stix2-patterns` ANTLR parser if installed)
## Multi-Tenant Deployments
Transparent workspace namespace isolation, with no schema migration required:
```
from gnat.context import WorkspaceManager
manager = WorkspaceManager.default()
acme = manager.for_tenant("acme")
ws = acme.create("apt28-investigation")
# Stored as "acme::apt28-investigation"
beta = manager.for_tenant("beta")
ws2 = beta.create("apt28-investigation")
# Stored as "beta::apt28-investigation"; no collision
```
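The `tenant::workspace` naming shown in the comments suggests a simple prefixing scheme over a shared store. The sketch below illustrates that idea with a plain dictionary; `TenantScope` and its methods are hypothetical and are not the `WorkspaceManager` API.

```python
SEPARATOR = "::"


class TenantScope:
    """Hypothetical tenant-scoped view over a shared workspace store."""

    def __init__(self, tenant, store):
        if not tenant or SEPARATOR in tenant:
            raise ValueError("tenant id must be non-empty and must not contain '::'")
        self._prefix = tenant + SEPARATOR
        self._store = store  # shared dict standing in for the real backend

    def create(self, name):
        """Create a workspace under this tenant and return its namespaced key."""
        key = self._prefix + name
        if key in self._store:
            raise KeyError(f"workspace already exists: {key}")
        self._store[key] = {}
        return key

    def names(self):
        """List this tenant's workspace names without the tenant prefix."""
        return sorted(k[len(self._prefix):] for k in self._store if k.startswith(self._prefix))


store = {}
acme = TenantScope("acme", store)
beta = TenantScope("beta", store)
acme_key = acme.create("apt28-investigation")
beta_key = beta.create("apt28-investigation")
```

Because the prefix is applied at the key level, the backing store needs no tenant column, which is consistent with the "no schema migration" claim above.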
CLI: `gnat tenant list` / `gnat tenant create acme --display-name "Acme Corp"` /
`gnat tenant workspaces acme` / `gnat tenant delete acme --yes`
## Terminal UI & Web Dashboard
### Terminal UI (Textual)
Works over SSH, with no browser and no display server required:
```
gnat tui # launch on dashboard screen
gnat tui query # start directly on NLP query screen
```
Four screens (F1–F4):
- **Query**: NLP search bar + scrollable STIX results table
- **Library**: research library browser; promote (Ctrl+P) / reject (Ctrl+X)
- **Scheduler**: live job status with manual trigger (Ctrl+T)
- **Reports**: PDF/HTML/DOCX browser; open in the system browser (Ctrl+O)
Install: `pip install "gnat[tui]"`
### Web Dashboard