chanadinh/threat-hunting-pipeline

GitHub: chanadinh/threat-hunting-pipeline


# Threat Hunting Pipeline

End-to-end agentic threat hunting pipeline. Pulls threat feeds → normalizes → maps to MITRE ATT&CK → generates Sigma rules via LLM with RAG → hunts against OpenSearch logs → produces analyst reports with recommendations.

## Architecture

Maps 1:1 to the AWS architecture in the reference diagrams. Each pipeline stage is a Python module that doubles as a Lambda handler. Orchestration is EventBridge + Step Functions in prod and a single Python function locally.

    feeds (CISA KEV, NVD, OTX, MISP)
        │
        ▼
    [1]  feed-ingest    → S3 raw-feeds             (Lambda)
    [2]  normalize      → Finding[] (unified)      (Lambda)
    [3]  mitre-map      → ATT&CK technique IDs     (Lambda; STIX lookup + LLM fallback)
    [4]  sigma-convert  → Sigma YAML rules         (Lambda → Fargate LLM gateway → SageMaker qwen3-coder)
                            ↑ Qdrant RAG: top-5 Sigma exemplars
    [5]  es-query       → hunt hits                (Lambda → OpenSearch)
    [6]  scoring        → hunt_rank                (Lambda)
    [7]  report-gen     → markdown / docx → S3     (Lambda → LLM)
    [7b] story-agent    → multi-finding narrative  (Lambda → LLM)

## Layout

    pipeline/
      schemas.py          Finding, SigmaRule, HuntHit, Severity, FeedSource
      config.py           env-driven config (single source of truth)
      stages/             ingest, normalize, mitre_map, sigma_gen, hunt, score, report
      llm/                LLM gateway (anthropic | sagemaker | mock) + prompts
      rag/                Qdrant retriever + in-memory cosine fallback
      storage/            s3, opensearch, docdb helpers
      orchestrator.py     local Step Functions equivalent
    lambdas/              AWS handler shims (each calls a pipeline.stages function)
    infra/                Terraform: VPC, S3, OpenSearch, DocDB, Redis, Lambda, Step Functions
    data/
      sigma_examples/     seed RAG corpus (8 hand-picked rules)
      samples/            offline test payloads per feed
      mitre/              cache slot for STIX bundle
    scripts/run_pipeline.py
    tests/test_end_to_end.py
    docker-compose.yml    local OpenSearch + Qdrant + Mongo

## Quick start (local, no AWS)

    cd /home/linux/threat-hunting-pipeline
    pip install -r requirements.txt
    python scripts/run_pipeline.py --offline --source cisa_kev,nvd,otx,misp -v

Offline mode uses the bundled sample feeds,
the in-memory RAG retriever (seeded from `data/sigma_examples/`), and the mock LLM. Outputs are written to `/tmp/thp-*`, and the MITRE STIX bundle is read from `/home/linux/attack/enterprise-attack.json`.

## Live local (real LLM, real OpenSearch)

    docker compose up -d                  # opensearch, qdrant, mongo
    export LLM_BACKEND=anthropic
    export ANTHROPIC_API_KEY=...          # uses claude-opus-4-7 by default
    python scripts/run_pipeline.py --source cisa_kev -v

## Production (the AWS diagram)

    cd infra/
    terraform init && terraform apply

This sets up:

- VPC across 2 AZs
- S3 buckets (raw-feeds, reports, sigma-rules, t3000-corpus, dashboard, access-logs)
- OpenSearch (3 data + 3 master nodes, multi-AZ, KMS-encrypted)
- DocumentDB
- ElastiCache (Redis)
- 8 Lambdas behind a Step Functions state machine
- EventBridge daily cron at 06:00 UTC
- IAM roles scoped per stage

The following are deliberately not provisioned by this Terraform, because each needs a separate decision:

- **SageMaker endpoint for qwen3-coder 30B:** provision it via SageMaker JumpStart or a custom container, then set the `SAGEMAKER_ENDPOINT` env var on the Lambdas.
- **Fargate LLM gateway:** a thin service that wraps the SageMaker endpoint with request batching, retries, and cost accounting.
- **Fargate Qdrant:** persistent vector DB for the Sigma exemplar corpus.
- **Fargate syslog receiver:** UDP/TCP 514 listener (Lambda cannot bind raw UDP).
- **Cognito user pool + ALB + WAF + CloudFront** for the analyst dashboard.

## How the agentic RAG step works (stage 4)

For each `(finding, technique)` pair:

1. Build a query string: `f"{finding.title} {finding.description[:300]} {technique}"`.
2. The retriever returns the top-`k` (default 5) most similar Sigma rules from the corpus. Rules already tagged with this technique get a `+0.25` score boost.
3. Prompt the LLM with:
   - the threat context (title, CVEs, products, observables)
   - the target ATT&CK technique
   - the 5 exemplar rules as full YAML (a style guide, not a template)
4. The LLM emits a single Sigma YAML rule.
   Code fences are stripped, then the document is parsed and validated against the minimum schema (`title`, `logsource`, and `detection` with a `condition`). Invalid rules are dropped and an error is logged on the `Finding` instead of raising, because partial output is more useful than no output.
5. Each accepted rule is written to S3 under `{finding_id}/{rule_id}.yml` with provenance (RAG exemplar IDs, confidence score, `generated_by`).

## Swapping components

- **Different LLM:** add a class to `pipeline/llm/client.py` and select it with `LLM_BACKEND=...`.
- **Different vector DB:** add a class to `pipeline/rag/retriever.py` that implements the same `index()` / `search()` interface.
- **Different log store:** replace `pipeline/storage/opensearch.py` (e.g. Splunk, Elastic, ClickHouse; Sigma backends exist for all of them).

## Cost guardrails

- `orchestrator.run(max_findings=5)` caps LLM calls per run.
- `sigma_gen.run` generates at most 3 rules per finding.
- In prod, the Step Functions Map state runs LLM-heavy stages under concurrency limits. Set these limits in `infra/stepfunctions.tf` once a baseline has been profiled.

## Status

- [x] All 7 stages implemented and runnable offline
- [x] Lambda handlers + Terraform skeleton
- [ ] Dashboard (Vite static → S3 → CloudFront)
- [ ] Real Sigma backend in `hunt.py` once pysigma is pinned to a tested version
- [ ] Cost telemetry per LLM call
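
The retrieval in steps 1 and 2 of the stage-4 RAG flow can be illustrated with a small in-memory retriever. This is a sketch under stated assumptions, not the code in `pipeline/rag/retriever.py`:

- Bag-of-words term-frequency vectors stand in for whatever embedding model the real retriever uses.
- The names `Exemplar`, `InMemoryRetriever`, and `build_query`, and the `search(query, technique, k)` signature, are hypothetical.
- Tags are matched using Sigma's `attack.tXXXX` tag convention.

It does follow the behaviour described above: cosine similarity, top-`k` with `k=5` by default, and a `+0.25` boost for exemplars already tagged with the target technique.

```python
# Sketch of an in-memory cosine retriever with a technique boost.
# Hypothetical names: Exemplar, InMemoryRetriever, build_query. Bag-of-words
# vectors stand in for real embeddings (an assumption, not the repo's code).
import math
import re
from collections import Counter
from dataclasses import dataclass, field


@dataclass
class Exemplar:
    rule_id: str
    text: str  # full Sigma YAML, used as the retrieval document
    tags: list[str] = field(default_factory=list)  # Sigma tags, e.g. "attack.t1059.001"


def vectorize(text: str) -> Counter:
    """Lowercased alphanumeric token counts (stand-in for an embedding)."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a.keys() & b.keys())
    norm_a = math.sqrt(sum(v * v for v in a.values()))
    norm_b = math.sqrt(sum(v * v for v in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def build_query(title: str, description: str, technique: str) -> str:
    # Same shape as step 1: title, first 300 chars of description, technique ID.
    return f"{title} {description[:300]} {technique}"


class InMemoryRetriever:
    def __init__(self, boost: float = 0.25) -> None:
        self.boost = boost
        self._docs: list[tuple[Exemplar, Counter]] = []

    def index(self, exemplars: list[Exemplar]) -> None:
        # Precompute one vector per exemplar rule.
        self._docs.extend((ex, vectorize(ex.text)) for ex in exemplars)

    def search(self, query: str, technique: str, k: int = 5) -> list[tuple[float, Exemplar]]:
        q = vectorize(query)
        wanted = f"attack.{technique.lower()}"  # Sigma ATT&CK tag convention
        scored = []
        for ex, vec in self._docs:
            score = cosine(q, vec)
            if wanted in (t.lower() for t in ex.tags):
                score += self.boost  # favour rules already tagged with the technique
            scored.append((score, ex))
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return scored[:k]
```

Because the boost is additive, a tagged exemplar with moderate textual overlap can outrank an untagged one with slightly higher cosine similarity. That is the point of nudging the LLM toward rules for the same technique.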
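
The post-processing in step 4 of the RAG flow can be sketched as:

1. Strip code fences.
2. Parse the rule.
3. Check the minimum schema.
4. Return errors instead of raising.

The function names here are hypothetical. The YAML loader is injected rather than imported: in the pipeline it would presumably be something like PyYAML's `yaml.safe_load`, but that is an assumption about the real implementation.

```python
# Hypothetical sketch of stage-4 post-processing: strip fences, parse, validate.
# The loader is injected; in practice it would likely be a YAML loader such as
# yaml.safe_load (an assumption about the real implementation).
import re
from typing import Any, Callable, Optional

# First fenced block anywhere in the reply, with or without a language tag.
_FENCE = re.compile(r"```[^\n]*\n(.*?)\n```", re.DOTALL)
REQUIRED_KEYS = ("title", "logsource", "detection")


def strip_fences(text: str) -> str:
    match = _FENCE.search(text)
    return match.group(1) if match else text.strip()


def validate(doc: Any) -> list[str]:
    """Return schema errors; an empty list means the minimum schema is met."""
    if not isinstance(doc, dict):
        return ["rule is not a mapping"]
    errors = [f"missing '{key}'" for key in REQUIRED_KEYS if key not in doc]
    detection = doc.get("detection")
    if detection is not None:
        if not isinstance(detection, dict):
            errors.append("'detection' is not a mapping")
        elif "condition" not in detection:
            errors.append("'detection' has no 'condition'")
    return errors


def parse_rule(
    llm_output: str, load: Callable[[str], Any]
) -> tuple[Optional[dict], list[str]]:
    """Return (rule, []) on success or (None, errors) instead of raising."""
    try:
        doc = load(strip_fences(llm_output))
    except Exception as exc:  # loader-specific parse errors vary by library
        return None, [f"parse error: {exc}"]
    errors = validate(doc)
    return (None, errors) if errors else (doc, [])
```

Returning errors as data lets the caller attach them to the `Finding` and continue with the next pair. This matches the partial-output policy described above.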
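
Selecting an LLM class via `LLM_BACKEND` (see "Swapping components") could be wired with a small registry. This is a hypothetical sketch: `register`, `make_client`, `MockClient`, and the `complete()` method are illustrative names, not the repo's API. Falling back to `mock` when `LLM_BACKEND` is unset is also an assumption.

```python
# Hypothetical sketch of selecting an LLM client via LLM_BACKEND.
# Class and function names are illustrative, not the repo's actual API.
import os
from typing import Callable, Mapping, Optional, Protocol


class LLMClient(Protocol):
    def complete(self, prompt: str) -> str: ...


_BACKENDS: dict[str, Callable[[], LLMClient]] = {}


def register(name: str):
    """Class decorator that makes a client selectable as LLM_BACKEND=<name>."""
    def decorator(cls):
        _BACKENDS[name] = cls
        return cls
    return decorator


@register("mock")
class MockClient:
    def complete(self, prompt: str) -> str:
        # Deterministic canned rule, useful for offline runs and tests.
        return (
            "title: Mock rule\n"
            "logsource: {product: windows, category: process_creation}\n"
            "detection: {selection: {Image: powershell.exe}, condition: selection}\n"
        )


def make_client(env: Optional[Mapping[str, str]] = None) -> LLMClient:
    env = os.environ if env is None else env
    name = env.get("LLM_BACKEND", "mock")  # default backend is an assumption
    try:
        factory = _BACKENDS[name]
    except KeyError:
        raise ValueError(f"unknown LLM_BACKEND={name!r}; known: {sorted(_BACKENDS)}") from None
    return factory()
```

With this shape, adding a backend means decorating one new class with `@register("<name>")`. Callers keep calling `make_client()` unchanged.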
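
The two caps under "Cost guardrails" bound stage-4 LLM usage multiplicatively. With the defaults, a run makes at most 5 × 3 = 15 rule-generation calls, assuming one LLM call per generated rule. The sketch below makes two further assumptions: the caps take the first findings and techniques in order, and `generate_rules` and the minimal `Finding` are hypothetical stand-ins for `sigma_gen.run` and `pipeline.schemas.Finding`.

```python
# Hypothetical sketch of the two cost caps: max_findings per run and at most
# 3 rules per finding. Assumes one LLM call per (finding, technique) rule and
# that findings arrive already ordered by priority.
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class Finding:  # minimal stand-in for pipeline.schemas.Finding
    finding_id: str
    techniques: list[str]
    rules: list[str] = field(default_factory=list)


def generate_rules(
    findings: list[Finding],
    generate: Callable[[Finding, str], str],  # e.g. one RAG-prompted LLM call
    max_findings: int = 5,
    max_rules_per_finding: int = 3,
) -> int:
    """Generate rules under both caps; return the number of LLM calls made."""
    calls = 0
    for finding in findings[:max_findings]:
        for technique in finding.techniques[:max_rules_per_finding]:
            finding.rules.append(generate(finding, technique))
            calls += 1
    return calls
```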