chanadinh/threat-hunting-pipeline
# Threat Hunting Pipeline
End-to-end agentic threat hunting pipeline. Pulls threat feeds → normalizes →
maps to MITRE ATT&CK → generates Sigma rules via LLM with RAG → hunts against
OpenSearch logs → produces analyst reports with recommendations.
## Architecture
Maps 1:1 to the AWS architecture in the reference diagrams. Each pipeline stage
is a Python module that doubles as a Lambda handler; orchestration is
EventBridge + Step Functions in prod, a single Python function locally.
```
feeds (CISA KEV, NVD, OTX, MISP)
      │
      ▼
[1]  feed-ingest    → S3 raw-feeds           (Lambda)
[2]  normalize      → Finding[] (unified)    (Lambda)
[3]  mitre-map      → ATT&CK technique IDs   (Lambda; STIX lookup + LLM fallback)
[4]  sigma-convert  → Sigma YAML rules       (Lambda → Fargate LLM gateway → SageMaker qwen3-coder)
                      ↑ Qdrant RAG: top-5 Sigma exemplars
[5]  es-query       → hunt hits              (Lambda → OpenSearch)
[6]  scoring        → hunt_rank              (Lambda)
[7]  report-gen     → markdown / docx → S3   (Lambda → LLM)
[7b] story-agent    → multi-finding narrative (Lambda → LLM)
```
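Because each stage is a plain Python function, the local "single Python function" orchestration amounts to threading shared state through the stages in order, while each Lambda shim only calls the same function with the event payload. A minimal sketch of that idea follows; the stage bodies, the dict-shaped `state`, `run_local`, and `make_lambda_handler` are hypothetical placeholders, not the repository's actual `orchestrator.py` or `lambdas/` code.

```python
from typing import Any, Callable, Dict, List

# Hypothetical shared state passed between stages (the real pipeline uses
# typed schemas such as Finding; a dict keeps this sketch self-contained).
State = Dict[str, Any]
Stage = Callable[[State], State]


def ingest(state: State) -> State:
    # Placeholder: the real stage pulls CISA KEV / NVD / OTX / MISP feeds.
    return {**state, "raw": [{"title": "Example advisory"}]}


def normalize(state: State) -> State:
    # Placeholder: turn raw feed records into unified findings with IDs.
    findings = [dict(rec, id=f"f{i}") for i, rec in enumerate(state["raw"])]
    return {**state, "findings": findings}


def run_local(stages: List[Stage], state: State) -> State:
    """Local stand-in for the Step Functions state machine: run stages in order."""
    for stage in stages:
        state = stage(state)
    return state


def make_lambda_handler(stage: Stage) -> Callable[[State, Any], State]:
    """Wrap a stage so it matches the Lambda handler signature (event, context)."""
    def handler(event: State, context: Any = None) -> State:
        return stage(event)
    return handler


if __name__ == "__main__":
    result = run_local([ingest, normalize], {})
    print(result["findings"])  # [{'title': 'Example advisory', 'id': 'f0'}]
```

Keeping the stage logic free of AWS types is what lets the same function serve both the local loop and the Lambda shim.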
## Layout
```
pipeline/
  schemas.py        Finding, SigmaRule, HuntHit, Severity, FeedSource
  config.py         env-driven config (single source of truth)
  stages/           ingest, normalize, mitre_map, sigma_gen, hunt, score, report
  llm/              LLM gateway (anthropic | sagemaker | mock) + prompts
  rag/              Qdrant retriever + in-memory cosine fallback
  storage/          s3, opensearch, docdb helpers
  orchestrator.py   local Step Functions equivalent
lambdas/            AWS handler shims (each calls a pipeline.stages function)
infra/              Terraform: VPC, S3, OpenSearch, DocDB, Redis, Lambda, Step Functions
data/
  sigma_examples/   seed RAG corpus (8 hand-picked rules)
  samples/          offline test payloads per feed
  mitre/            cache slot for STIX bundle
scripts/run_pipeline.py
tests/test_end_to_end.py
docker-compose.yml  local OpenSearch + Qdrant + Mongo
```
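`schemas.py` is described only by the type names it exports. To illustrate how a unified `Finding` could carry what later stages need (the stage 4 prompt uses the title, CVEs, products, and observables, and rejected Sigma output is recorded on the `Finding`), here is a hypothetical dataclass version. Every field name and enum value below is an assumption, not the repository's actual schema.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import List


class FeedSource(str, Enum):
    # Values mirror the --source names accepted by scripts/run_pipeline.py.
    CISA_KEV = "cisa_kev"
    NVD = "nvd"
    OTX = "otx"
    MISP = "misp"


class Severity(str, Enum):
    # Hypothetical severity scale.
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


@dataclass
class Finding:
    id: str
    source: FeedSource
    title: str
    description: str = ""
    severity: Severity = Severity.MEDIUM
    cves: List[str] = field(default_factory=list)
    products: List[str] = field(default_factory=list)
    observables: List[str] = field(default_factory=list)
    techniques: List[str] = field(default_factory=list)  # ATT&CK IDs from stage 3
    errors: List[str] = field(default_factory=list)      # e.g. rejected Sigma output


# Example: a normalized KEV entry that stage 3 has mapped to T1190.
f = Finding(id="kev-example", source=FeedSource("cisa_kev"), title="Example advisory")
f.techniques.append("T1190")
```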
## Quick start (local, no AWS)
```bash
cd threat-hunting-pipeline
pip install -r requirements.txt
python scripts/run_pipeline.py --offline --source cisa_kev,nvd,otx,misp -v
```
Offline mode uses the bundled sample feeds, the in-memory RAG retriever
(seeded from `data/sigma_examples/`), and the mock LLM, and writes outputs to
`/tmp/thp-*`. The MITRE STIX bundle is read from `/home/linux/attack/enterprise-attack.json`.
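Stage 3 tries a STIX lookup before falling back to the LLM. The sketch below builds an ATT&CK ID index from a bundle shaped like `enterprise-attack.json`: in MITRE's public STIX 2.x data, techniques are `attack-pattern` objects whose `external_references` entry with `source_name` `mitre-attack` holds the ID (for example `T1059`). The function name and the choice to skip revoked or deprecated entries are assumptions for illustration, and the tiny inline bundle stands in for the real file.

```python
import json
from typing import Dict


def technique_index(bundle: dict) -> Dict[str, str]:
    """Map ATT&CK technique IDs (e.g. 'T1059') to technique names."""
    index: Dict[str, str] = {}
    for obj in bundle.get("objects", []):
        if obj.get("type") != "attack-pattern":
            continue
        # Assumed policy: ignore techniques MITRE has revoked or deprecated.
        if obj.get("revoked") or obj.get("x_mitre_deprecated"):
            continue
        for ref in obj.get("external_references", []):
            if ref.get("source_name") == "mitre-attack" and "external_id" in ref:
                index[ref["external_id"]] = obj.get("name", "")
    return index


# Inline stand-in for the real bundle; with the file you would instead do:
#   with open(path, encoding="utf-8") as fh: bundle = json.load(fh)
SAMPLE_BUNDLE = json.loads("""
{"type": "bundle", "objects": [
  {"type": "attack-pattern", "name": "Command and Scripting Interpreter",
   "external_references": [{"source_name": "mitre-attack", "external_id": "T1059"}]},
  {"type": "attack-pattern", "name": "Example revoked technique", "revoked": true,
   "external_references": [{"source_name": "mitre-attack", "external_id": "T9999"}]},
  {"type": "malware", "name": "Not a technique"}
]}
""")

print(technique_index(SAMPLE_BUNDLE))  # {'T1059': 'Command and Scripting Interpreter'}
```

IDs that the index cannot resolve (or findings with no obvious technique) are the cases the LLM fallback would handle.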
## Live local (real LLM, real OpenSearch)
```bash
docker compose up -d            # opensearch, qdrant, mongo
export LLM_BACKEND=anthropic
export ANTHROPIC_API_KEY=...    # uses claude-opus-4-7 by default
python scripts/run_pipeline.py --source cisa_kev -v
```
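Switching to `LLM_BACKEND=anthropic` relies on env-driven backend selection in `config.py` and `pipeline/llm/client.py`. One way to express that pattern is a registry keyed by the `LLM_BACKEND` value, sketched below. The class names, the `complete()` method, the `mock` default, and `get_client` are hypothetical, and the real Anthropic and SageMaker classes are left out so the sketch runs without credentials.

```python
import os
from typing import Dict, Optional, Type


class LLMClient:
    """Hypothetical minimal interface that every backend implements."""

    def complete(self, prompt: str) -> str:
        raise NotImplementedError


class MockClient(LLMClient):
    def complete(self, prompt: str) -> str:
        # Deterministic canned answer, as an offline mock backend would return.
        return "title: mock rule"


# Backends register under the string used in LLM_BACKEND. Real entries such as
# "anthropic" or "sagemaker" would map to classes that call those services.
BACKENDS: Dict[str, Type[LLMClient]] = {"mock": MockClient}


def get_client(env: Optional[Dict[str, str]] = None) -> LLMClient:
    """Instantiate the backend named by LLM_BACKEND (assumed default: mock)."""
    env = dict(os.environ) if env is None else env
    name = env.get("LLM_BACKEND", "mock")
    try:
        return BACKENDS[name]()
    except KeyError:
        raise ValueError(f"unknown LLM_BACKEND={name!r}; known: {sorted(BACKENDS)}") from None
```

With this shape, adding a backend means writing one class and one registry entry, which matches the "add a class and wire it via `LLM_BACKEND`" approach described under Swapping components.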
## Production (the AWS diagram)
```bash
cd infra/
terraform init && terraform apply
```
Sets up:
- VPC across 2 AZs
- S3 buckets: raw-feeds, reports, sigma-rules, t3000-corpus, dashboard, access-logs
- OpenSearch (3 data + 3 master nodes, multi-AZ, KMS-encrypted)
- DocumentDB
- ElastiCache (Redis)
- 8 Lambdas behind a Step Functions state machine
- EventBridge daily cron at 06:00 UTC
- IAM roles scoped per stage
Not provisioned by this Terraform (deliberately, since each one needs a separate decision):
- **SageMaker endpoint for qwen3-coder 30B** — provision via SageMaker JumpStart
or a custom container; set `SAGEMAKER_ENDPOINT` env var on the Lambdas.
- **Fargate LLM gateway** — thin service that bundles request batching, retries,
and cost accounting around the SageMaker endpoint.
- **Fargate Qdrant** — persistent vector DB for the Sigma exemplar corpus.
- **Fargate syslog receiver** — UDP/TCP 514 listener (Lambda cannot bind raw UDP).
- **Cognito user pool + ALB + WAF + CloudFront** for the analyst dashboard.
## How the agentic RAG step works (stage 4)
For each `(finding, technique)` pair:
1. Build a query string: `f"{finding.title} {finding.description[:300]} {technique}"`.
2. Retriever returns top-`k` (default 5) most similar Sigma rules from the
corpus. Rules already tagged with this technique get a `+0.25` score boost.
3. Prompt the LLM with:
- the threat context (title, CVEs, products, observables)
- the target ATT&CK technique
- the 5 exemplar rules as full YAML (style guide, not template)
4. The LLM emits a single Sigma YAML rule. Code fences are stripped, and the
   document is parsed and validated against the minimum schema (`title`,
   `logsource`, and `detection` with a `condition`). Invalid rules are dropped
   and an error is recorded on the `Finding` instead of raising an exception;
   partial output is more useful than no output.
5. Each accepted rule is written to S3 under `{finding_id}/{rule_id}.yml`
with provenance (RAG exemplar IDs, confidence score, generated_by).
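Steps 4 and 5 can be sketched as follows. Assumptions: the LLM reply may wrap the rule in a Markdown code fence; parsing is done with a YAML library (PyYAML's `yaml.safe_load` is one option), so the sketch takes the already-parsed mapping and runs on the standard library alone; and `strip_fences`, `schema_errors`, `accept_rule`, and the returned record's keys are hypothetical names.

```python
import re
from typing import Any, Dict, List, Optional

TICKS = "`" * 3  # a Markdown code-fence marker (three backticks)

# Body of the first fenced block, allowing an optional language tag like "yaml".
FENCE_RE = re.compile(TICKS + r"[A-Za-z]*[ \t]*\n(.*?)" + TICKS, re.DOTALL)

REQUIRED_KEYS = ("title", "logsource", "detection")


def strip_fences(reply: str) -> str:
    """Return the fenced body if the reply contains one, else the reply itself."""
    m = FENCE_RE.search(reply)
    return (m.group(1) if m else reply).strip()


def schema_errors(doc: Any) -> List[str]:
    """Minimum Sigma shape: title, logsource, and detection with a condition."""
    if not isinstance(doc, dict):
        return ["rule is not a mapping"]
    errors = [f"missing '{key}'" for key in REQUIRED_KEYS if key not in doc]
    detection = doc.get("detection")
    if isinstance(detection, dict):
        if "condition" not in detection:
            errors.append("detection has no 'condition'")
    elif "detection" in doc:
        errors.append("'detection' is not a mapping")
    return errors


def accept_rule(doc: Any, finding_errors: List[str], finding_id: str,
                rule_id: str, provenance: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the S3 object to write, or record the problem and drop the rule."""
    problems = schema_errors(doc)
    if problems:
        # Record on the finding instead of raising: partial output beats none.
        finding_errors.append(f"{rule_id}: " + "; ".join(problems))
        return None
    return {"key": f"{finding_id}/{rule_id}.yml", "rule": doc, "provenance": provenance}
```

In a full run, the output of `strip_fences` would be parsed into a mapping before `accept_rule`, and `provenance` would hold the RAG exemplar IDs, confidence score, and `generated_by` listed in step 5.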
## Swapping components
- **Different LLM:** add a class to `pipeline/llm/client.py` and wire it via
`LLM_BACKEND=...`.
- **Different vector DB:** add a class to `pipeline/rag/retriever.py` with the
same `index() / search()` interface.
- **Different log store:** replace `pipeline/storage/opensearch.py` (Splunk,
Elastic, ClickHouse — Sigma backends exist for all of them).
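A retriever following the `index() / search()` contract, including the in-memory cosine fallback and the `+0.25` technique boost from stage 4 step 2, could look like the sketch below. Assumptions: embeddings are supplied by the caller (tiny hand-written vectors stand in for a real embedding model), each exemplar carries a list of technique tags, and the boost is added to the cosine score before the top-`k` cut. The class, dataclass, and method signatures are illustrative, not the actual `pipeline/rag/retriever.py`.

```python
import math
from dataclasses import dataclass, field
from typing import List, Sequence, Tuple


def build_query(title: str, description: str, technique: str) -> str:
    # Same shape as stage 4 step 1; this text would then be embedded.
    return f"{title} {description[:300]} {technique}"


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0


@dataclass
class Exemplar:
    rule_id: str
    vector: List[float]
    techniques: List[str] = field(default_factory=list)


class InMemoryRetriever:
    TECHNIQUE_BOOST = 0.25  # bonus for exemplars already tagged with the target technique

    def __init__(self) -> None:
        self._items: List[Exemplar] = []

    def index(self, items: Sequence[Exemplar]) -> None:
        self._items.extend(items)

    def search(self, query_vec: Sequence[float], technique: str,
               k: int = 5) -> List[Tuple[str, float]]:
        scored = []
        for ex in self._items:
            score = cosine(query_vec, ex.vector)
            if technique in ex.techniques:
                score += self.TECHNIQUE_BOOST
            scored.append((ex.rule_id, score))
        scored.sort(key=lambda pair: pair[1], reverse=True)
        return scored[:k]


r = InMemoryRetriever()
r.index([
    Exemplar("powershell_encoded", [1.0, 0.0], ["T1059.001"]),
    Exemplar("webshell_spawn", [0.9, 0.1], ["T1505.003"]),
])
# webshell_spawn is the closer vector, but the technique boost puts
# powershell_encoded first when the target technique is T1059.001.
print(r.search([0.9, 0.1], technique="T1059.001", k=1))
```

A Qdrant-backed class would keep the same two methods and delegate the similarity search to the vector DB, applying the same boost when re-ranking.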
## Cost guardrails
- `orchestrator.run(max_findings=5)` caps LLM calls per run.
- `sigma_gen.run` generates at most 3 rules per finding.
- Step Functions Map state in prod runs LLM-heavy stages with concurrency
  limits, to be set in `infra/stepfunctions.tf` once a baseline has been profiled.
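The two caps above bound how many rule-generation calls a run can make. A sketch of applying them, assuming one LLM call per generated rule and that both caps are simple truncations; the helper name and the input shape (finding ID mapped to its technique list) are hypothetical, and stage 3's LLM fallback and report generation add calls not counted here.

```python
from typing import Dict, List, Tuple


def plan_sigma_calls(findings: Dict[str, List[str]], max_findings: int = 5,
                     max_rules_per_finding: int = 3) -> List[Tuple[str, str]]:
    """Return the (finding_id, technique) pairs that will be sent to the LLM."""
    plan: List[Tuple[str, str]] = []
    for finding_id in list(findings)[:max_findings]:
        for technique in findings[finding_id][:max_rules_per_finding]:
            plan.append((finding_id, technique))
    return plan


# 8 findings with 4 mapped techniques each: uncapped, that would be 32 calls.
demo = {f"f{i}": ["T1059", "T1190", "T1566", "T1078"] for i in range(8)}
calls = plan_sigma_calls(demo)
print(len(calls))  # 15 = 5 findings x 3 rules
```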
## Status
- [x] All 7 stages implemented and runnable offline
- [x] Lambda handlers + Terraform skeleton
- [ ] Dashboard (Vite static → S3 → CloudFront)
- [ ] Real Sigma backend in `hunt.py` once pysigma is pinned to a tested version
- [ ] Cost telemetry per LLM call