z3r0-c001995/Big-Data-Analytics-Framework-for-Predictive-Cyber-Threat-Intelligence
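A big data analytics framework for predictive cyber threat intelligence, built on Spark, Kafka, and HDFS. It covers the full chain: multi-source data collection, stream processing, ML model training, the SOC dashboard, and SOAR automated response.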
# System Architecture and Development Guide
This repository implements a predictive CTI (cyber threat intelligence) data
pipeline using big data analytics. The system design and development approach
are based on the project report and proposal in `docs/`, and the runtime is
centered on Spark, Kafka, and HDFS.
## Reference Docs (Repository-Verified)
Present in this repository snapshot:
- `docs/FEATURE_RECORD.md` - change log of implemented ingestion, processing,
runtime, and hardening features.
- `docs/REPO_AUDIT_REPORT.md` - grounded implementation audit of the current
repository state.
- `docs/IMPLEMENTATION_PLAN.md` - phased roadmap for closing the remaining
architecture gaps.
- `docs/FRONTEND_AUDIT_REPORT.md` - audit of the original dashboard frontend
and the upgrade path into the SOC application shell.
- `docs/SOC_UI_ARCHITECTURE.md` - route map, component inventory, and UI
integration design for the SOC interface.
- `img/Enterprise Big Data Analytics Framework for Predictive CTI (SOC-Grade).jpg` -
architecture image currently shipped with the repository.
Referenced by the README but not present in this repository snapshot:
- `docs/Final Year Project.pdf`
- `docs/final year project proposal.docx`
- `docs/BDA Framework for CTI Prediction-2025-11-06-120442.png`
- `docs/BDA Framework for CTI Prediction-2025-11-06-125406.png`
- `docs/Untitled diagram-2025-11-06-115607.png`
Treat the missing proposal/FYP assets as external references unless they are
added back into `docs/`.
## Architecture Summary (Attached Diagram Alignment)
The architecture in the attached diagram is a closed-loop predictive CTI system.
This repository now documents and implements the same flow while keeping Docker
as the runtime backbone.
1. Data sources
- Threat intel, OSINT/dark web, IDS/SIEM, network/app/firewall/host logs.
- Standards: STIX/TAXII and CTI feed APIs.
- Current files: `datasets/`, `ingestion/online_sources.yaml`,
`ingestion/online_ingestion_to_kafka.py`.
2. Ingestion and streaming layer
- Kafka is the streaming backbone; collectors publish events and CTI objects.
- Collection tools:
- Filebeat for host/app logs and Suricata/Zeek file outputs.
- Python API collectors for TAXII/STIX/OpenCTI/MISP/abuse.ch/threat feeds.
- Fluent Bit for lightweight firewall/network file forwarding.
- Logstash only for heavy parsing (network/firewall syslog).
- Current files: `streaming_ingest.py`, `batch_ingest.py`,
`ingestion/config.yaml`, `ingestion/online_sources.yaml`, `pipeline.py`.
- Docker services: `zookeeper`, `kafka`, `filebeat`, `fluent-bit`,
`logstash`, `spark-streaming`, `kafka-ui`, `kafka-monitor-consumer`.
3. Big data processing layer
- Spark Streaming + PySpark ETL for normalization, windowing, and cleaning.
- Standardized schema + label resolution applied for cross-source consistency.
- Current files: `schema_mapper.py`, `batch_ingest.py`, `streaming_ingest.py`.
- Docker services: `spark-master`, `spark-worker`, `spark-streaming`.
4. Detection, feature extraction, and enrichment
- Feature engineering and anomaly-oriented analysis feed training datasets.
- Current implementation path: `training/prepare_training_data.py`,
`evaluation/verify_dataset_schema.py`.
- Planned additions: unsupervised detection jobs and enrichment modules.
5. Distributed storage layer
- Raw/feature data in HDFS + Parquet plus operational search/entity stores.
- Current storage: local `data/raw/cti_raw`, HDFS `/cti/raw`, Elasticsearch
for events/alerts/predictions, MongoDB for entities/relationships.
- Operational export path: `storage/export_operational_data.py`.
- Docker services: `namenode`, `datanode`, `elasticsearch`, `mongodb`.
6. Model training, registry, and serving
- Training pipeline and model registry are implemented.
- The operational export path now supports generic joblib-backed scikit-learn
scorers, so non-linear models such as gradient boosted trees can be
promoted as the active production scorer.
- Model-serving is active and exposes SOC-ready hybrid inference APIs:
- `POST /predict` (model_kind=`network|malware`)
- `POST /anomaly`
- `POST /forecast`
- versioned routes: `/v1/predict/network`, `/v1/predict/malware`,
`/v1/anomaly`, `/v1/forecast`
- Current files: `training/train_baseline_model.py`, `registry/`,
`models/`, `storage/`, `serving/app.py`.
- Current services: `postgres`, `mlflow`.
- Current runtime behavior: `make model-train` logs the selected export model
into MLflow and registers it under `cti-active-export-model`.
- Real-time inference path:
Spark enriched events -> operational export -> model-serving -> Elasticsearch/MongoDB -> dashboard/email/SOAR.
7. Alerting, SOAR, analyst feedback loop
- Diagram flow: alerts + dashboards -> SOAR -> analyst console ->
retraining feedback into feature store/training.
- Current state: SOC UI, SOAR sync backend, and dashboard SOAR visibility are
implemented.
- Current files: `soar/`, `docker/soar/`, `dashboards/index.html`,
`dashboards/app.js`, `dashboards/control_server.py`,
`dashboards/collect_metrics.py`.
- Current behavior:
- `make soar-up`, `make soar-down`, `make soar-status`, `make soar-sync`
- `make soar-bootstrap` provisions TheHive/Cortex service accounts and
writes the resulting API keys back into `.env`
- `make soar-configure-analyzers` curates and enables the Cortex
auto-run worker set for the `bda-cti` org
- TheHive/Cortex health is surfaced in the dashboard snapshot
- synced alert / case / analyzer job summaries are exposed to the SOAR page
- manual `Run analyzers` / `Create case` actions are exposed from Alerts,
Incidents, and Analyst Console via the dashboard control API
- automatic analyzer execution now covers both newly-synced alerts and
already-synced alerts, while still avoiding duplicate auto-runs for the
same alert/analyzer/observable tuple
- default local binds use `19000` for TheHive and `19001` for Cortex to
avoid clashes with unrelated services already common on `9000/9001`
- the default curated analyzer set is:
`IP-API`, `GoogleDNS_resolve`, `Urlscan.io_Search`,
`Crt_sh_Transparency_Logs`, `CIRCLHashlookup`,
`CIRCLVulnerabilityLookup`
- Cortex Docker jobs now use the host-mapped job directory rather than
the in-container path, which is required for analyzer containers to
receive their input payloads correctly
- Operator flow:
1. `make soar-up`
2. `make soar-bootstrap`
3. `make soar-sync`
8. Real-Time Host Logging, Agent and Email Alerting Framework
- Real-time Log Monitoring: The host-level agent `agent/bda_cti_agent.py` tails `/var/log/ufw.log` in real-time, extracts key fields from `[UFW BLOCK]` events (such as SRC, DST, PROTO, SPT, DPT), and publishes them directly into Kafka.
- Network Interface Automation: The script `automation/update_network_config.py` dynamically auto-detects the host's primary active network interface (e.g., `wlo1`), updating the platform public IP globally across container `.env` files and `agent/agent_config.json` to prevent loopback (`127.0.0.1`) connection issues.
- Real-Time Gateway Bridge (`ingestion/agent_bridge.py`): Connects Kafka telemetry topics directly to Elasticsearch and MongoDB. It aggregates and flushes telemetry (from `alerts`, `logs.host`), applies real-time inline Machine Learning threat scoring for critical security alarms (e.g. reverse shells, malware), and evaluates Playbook Engine triggers in the background.
- Resilient Email Alerting Pipeline: An asynchronous `NotificationService` is embedded in the FastAPI backend lifecycle (`backend/main.py`), consuming `alerts` and `audit` topics from Kafka to send immediate alerts via SMTP (with STARTTLS on port 587) to configured analyst emails.
- Complete System Auditing (`backend/core/audit.py`): Hooks system actions (logins, isolated hosts, training triggers) directly into the Kafka pipeline, dispatching them synchronously to the alerting system and analyst emails.
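The `[UFW BLOCK]` field extraction described above can be sketched in a few lines of Python. This is a minimal illustration that assumes UFW's usual kernel-log layout of space-separated `KEY=VALUE` tokens. `parse_ufw_block` is a hypothetical name, not the function used in `agent/bda_cti_agent.py`, and the sample line is made up.

```python
import re
from typing import Optional

# Fields the agent is described as extracting from [UFW BLOCK] events.
UFW_FIELDS = ("SRC", "DST", "PROTO", "SPT", "DPT")

# UFW logs netfilter data as KEY=VALUE tokens; some keys (e.g. OUT=) may have empty values.
_KV = re.compile(r"\b([A-Z]+)=(\S*)")


def parse_ufw_block(line: str) -> Optional[dict]:
    """Return the selected fields from a [UFW BLOCK] line, or None for any other line (hypothetical helper)."""
    if "[UFW BLOCK]" not in line:
        return None
    pairs = dict(_KV.findall(line))
    event = {key: pairs.get(key) for key in UFW_FIELDS}
    # Ports are numeric when present; ICMP blocks carry no SPT/DPT.
    for port in ("SPT", "DPT"):
        if event[port] is not None and event[port].isdigit():
            event[port] = int(event[port])
    return event


sample = (
    "Jan 30 10:10:10 host kernel: [12345.678] [UFW BLOCK] IN=wlo1 OUT= "
    "MAC=aa:bb SRC=203.0.113.7 DST=192.168.1.20 LEN=60 PROTO=TCP SPT=51515 DPT=22 SYN"
)
print(parse_ufw_block(sample))
```

The resulting dictionary is the kind of compact event the agent would serialize and publish to Kafka.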
## Docker Deployment Model (Keep As-Is)
This project intentionally keeps Docker Compose as the operational core.
- Existing compose file remains the source of truth: `docker/docker-compose.yaml`.
- Current always-on stack: Kafka/Zookeeper, HDFS (NameNode/DataNode), Spark
cluster (master/worker), streaming job container, Kafka UI, Elasticsearch,
MongoDB, and a persistent monitoring consumer group container.
- `spark-streaming` now uses container-local Ivy cache (`/tmp/.ivy2`) to avoid
host bind-permission issues when downloading Spark Kafka connector packages.
- Default `spark-streaming` Compose env keeps raw HDFS sink disabled
(`RAW_SINK_ENABLED=false`) so feature enrichment stays online even if HDFS is
temporarily unhealthy. Re-enable raw HDFS writes by setting
`RAW_SINK_ENABLED=true` and `RAW_OUTPUT_PATH`/`RAW_CHECKPOINT_PATH`.
- New architecture blocks still pending after this tranche: model serving,
real-time inference, SOAR, analyst workflow.
- Runtime rule:
- Host scripts use `KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092`
- Container-to-container traffic uses `kafka:29092`
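The runtime rule above can be made explicit in a helper that scripts share. This is a minimal sketch with two assumptions: an explicitly exported `KAFKA_BOOTSTRAP_SERVERS` always wins, and the presence of `/.dockerenv` is a good-enough container heuristic. `resolve_bootstrap_servers` is hypothetical and not part of the repository.

```python
import os
from pathlib import Path
from typing import Mapping, Optional

HOST_BOOTSTRAP = "127.0.0.1:9092"   # host scripts -> published Kafka listener
CONTAINER_BOOTSTRAP = "kafka:29092"  # container-to-container listener on the Compose network


def running_in_container() -> bool:
    """Heuristic (assumption): Docker creates /.dockerenv inside containers."""
    return Path("/.dockerenv").exists()


def resolve_bootstrap_servers(env: Optional[Mapping[str, str]] = None,
                              in_container: Optional[bool] = None) -> str:
    """Pick the Kafka bootstrap address following the runtime rule (hypothetical helper)."""
    env = os.environ if env is None else env
    explicit = env.get("KAFKA_BOOTSTRAP_SERVERS")
    if explicit:
        return explicit
    if in_container is None:
        in_container = running_in_container()
    return CONTAINER_BOOTSTRAP if in_container else HOST_BOOTSTRAP


print(resolve_bootstrap_servers({}, in_container=False))
```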
## Development Guide (Derived from the Proposal and Report)
The docs recommend a hybrid workflow: CRISP-DM for data science and Agile-Scrum
for iterative system delivery. Use this as the default development path.
1. Business understanding
- Target proactive detection and early warning, not just reactive alerts.
- Keep objectives aligned with the project report (`docs/Final Year Project.pdf`):
predictive CTI and incident response.
2. Data understanding
- Inventory data sources (datasets, logs, CTI feeds).
- Validate schema consistency before scaling ingestion.
3. Data preparation
- Normalize columns with `schema_mapper.py`.
- Build repeatable ingestion configs in `ingestion/config.yaml`.
4. Modeling
- Start with supervised baselines (Random Forest, XGBoost), then LSTM for
temporal signals.
- Track experiments with MLflow and store artifacts under `models/`.
5. Evaluation
- Use metrics called out in the report: accuracy, precision, recall, and
mean time to detect (MTTD).
- Keep evaluation outputs under `evaluation/`.
6. Deployment
- Integrate batch and streaming pipelines into the Docker stack.
- Plan for real-time inference in streaming paths before alerting.
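The evaluation metrics listed in step 5 can be computed without extra dependencies. This sketch rests on two assumptions. First, labels are binary (`1` = malicious). Second, MTTD is the mean gap between an incident's start and its first detection, computed over detected incidents only. The report may define MTTD differently.

```python
from datetime import datetime, timedelta
from typing import Iterable, Sequence, Tuple


def classification_metrics(y_true: Sequence[int], y_pred: Sequence[int]) -> dict:
    """Accuracy, precision, and recall for binary labels (1 = malicious)."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 1)
    tn = sum(1 for t, p in zip(y_true, y_pred) if t == 0 and p == 0)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t == 0 and p == 1)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 0)
    return {
        "accuracy": (tp + tn) / len(y_true) if y_true else 0.0,
        "precision": tp / (tp + fp) if tp + fp else 0.0,
        "recall": tp / (tp + fn) if tp + fn else 0.0,
    }


def mean_time_to_detect(incidents: Iterable[Tuple[datetime, datetime]]) -> timedelta:
    """MTTD over (attack_start, first_detection) pairs; undetected incidents are excluded upstream."""
    gaps = [detected - started for started, detected in incidents]
    if not gaps:
        raise ValueError("no detected incidents")
    return sum(gaps, timedelta()) / len(gaps)


print(classification_metrics([1, 1, 0, 0, 1], [1, 0, 0, 1, 1]))
start = datetime(2026, 1, 1, 8, 0)
print(mean_time_to_detect([(start, start + timedelta(minutes=10)),
                           (start, start + timedelta(minutes=20))]))  # 0:15:00
```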
## Key Components and Files
- Kafka topics (required architecture):
`logs.suricata`, `logs.host`, `logs.firewall`, `cti.taxii`, `cti.misp`,
`osint.darkweb`, `enriched.features`.
- Kafka topics config/bootstrap: `ingestion/required_topics.yaml`,
`ingestion/ensure_kafka_topics.py`
- Batch ingestion: `batch_ingest.py`
- Streaming ingestion: `streaming_ingest.py`
- OSINT ingestion to Kafka: `ingestion/online_ingestion_to_kafka.py`
- Network scan collector to Kafka: `ingestion/network_scan_to_kafka.py`
- Local dataset-to-Kafka publisher: `ingestion/local_sources_to_kafka.py`
- Host system log-to-Kafka publisher: `ingestion/system_logs_to_kafka.py`
- Ingestion coverage verification: `ingestion/verify_ingestion_stack.py`
- Online sources config: `ingestion/online_sources.yaml`
- Dataset registry: `ingestion/config.yaml`
- Schema normalization: `schema_mapper.py`
- Pipeline orchestration: `pipeline.py`
- Pipeline config runner: `run_pipeline.py`, `pipeline_config.yaml`
- Training data preparation: `training/prepare_training_data.py`
- Kafka feature-store materialization: `training/materialize_feature_store.py`
- Data-processing + anomaly scoring: `training/data_processing_pipeline.py`
- Feature extraction + enrichment: `training/feature_extraction_enrichment.py`
- Baseline model training: `training/train_baseline_model.py`
- Environment template: `.env.example`
- One-command wrapper: `Makefile` (`make collect`, `make nvd-daily`,
`make feature-store`, `make data-processing`, `make feature-enrichment`,
`make model-train`, `make operational-export`, `make smoke`,
`make start`, `make stop`, `make status`)
- Filebeat config: `docker/filebeat/filebeat.yml`
- Logstash pipeline: `docker/logstash/pipeline/logstash.conf`
- Fluent Bit config: `docker/fluent-bit/fluent-bit.conf`
- Sample collector input directories: `logs/app`, `logs/suricata`, `logs/zeek`,
`logs/firewall`, `logs/network`
- Feature record log: `docs/FEATURE_RECORD.md`
- Containerized runtime: `docker/docker-compose.yaml`
- Optional local MISP stack: `docker/misp/docker-compose.yaml`
- Kafka web UI (Kafka UI): `http://localhost:8085`
- Permanent Kafka monitoring consumer group:
`MONITOR_CONSUMER_GROUP` (default `cti-monitor-group`)
- Feature-store dashboard page: `dashboards/feature_store.html`
- Feature-store Spark metrics helper: `dashboards/feature_store_metrics_job.py`
- Dashboard control server: `dashboards/control_server.py`
- One-switch runtime loop: `automation/system_runtime.py`
- Real-time Host Agent: `agent/bda_cti_agent.py` (tails local log files, UFW blocks, system actions)
- Real-time Gateway Bridge: `ingestion/agent_bridge.py` (indexes log events, applies inline ML scoring, triggers playbooks)
- Network IP Auto-configuration: `automation/update_network_config.py` (automatically discovers public interface IP)
- Asynchronous Notification service: `backend/services/notification_service.py` (dispatches real-time email alerts)
- Enterprise SOC UI Shell: `frontend/` / `endguardui` (modern dashboard with views for threat models, hunting playbooks, audits)
- Operational storage exporter: `storage/export_operational_data.py`
- Elasticsearch mappings:
`storage/elasticsearch/cti-events.mapping.json`,
`storage/elasticsearch/cti-alerts.mapping.json`,
`storage/elasticsearch/cti-predictions.mapping.json`
## Big Data Analytics Workflow (Docs Alignment)
The proposal outlines a full analytics workflow that should guide future builds:
data acquisition, ingestion and preprocessing, storage and management, exploratory
analysis, visualization and alerting, feedback and continuous improvement, and
governance and security.
## Environment + One-Command Collection
1. Create local environment file
```bash
cp .env.example .env
```
2. Set required keys in `.env` for private integrations (TAXII/OpenCTI/MISP/
threatfeeds). Public feeds run without these keys.
Monitoring consumer defaults (optional override):
- `MONITOR_CONSUMER_TOPIC=enriched.features`
- `MONITOR_CONSUMER_GROUP=cti-monitor-group`
Local MISP Docker defaults (optional):
- `MISP_HOST_PORT=8081`
- `MISP_BASEURL=http://127.0.0.1:8081`
- `MISP_DATA_ROOT=/home/mr_robot/misp-data` (outside repo)
OpenCTI-specific note:
- `OPENCTI_URL` must be the GraphQL endpoint, not just the UI base URL.
- If your UI is `http://localhost:8080`, set
`OPENCTI_URL=http://localhost:8080/graphql`.
- If a collector runs inside Docker on the same network as OpenCTI, use
`OPENCTI_URL=http://opencti:8080/graphql`.
- Get `OPENCTI_TOKEN` from your OpenCTI user profile API key.
- Quick check:
```bash
curl -sS "$OPENCTI_URL" \
  -H "Authorization: Bearer $OPENCTI_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"query":"{ me { id name } }"}'
```
Expected:
- JSON response with `data.me` present (token + URL are valid).
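The same `me` query can be run from Python, for example as a startup check in a collector. This is a standard-library sketch. `build_me_request` and `credentials_valid` are hypothetical helpers. The request follows the generic GraphQL-over-HTTP convention of a JSON body with a `query` field, exactly as in the curl check.

```python
import json
import os
import urllib.request


def build_me_request(url: str, token: str) -> urllib.request.Request:
    """POST the `{ me { id name } }` query to the OpenCTI GraphQL endpoint (hypothetical helper)."""
    body = json.dumps({"query": "{ me { id name } }"}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
    )


def credentials_valid(payload: dict) -> bool:
    """True when the response carries data.me, i.e. both the URL and the token were accepted."""
    return bool((payload.get("data") or {}).get("me"))


if __name__ == "__main__" and os.environ.get("OPENCTI_URL"):
    req = build_me_request(os.environ["OPENCTI_URL"], os.environ.get("OPENCTI_TOKEN", ""))
    with urllib.request.urlopen(req, timeout=10) as resp:
        print("valid" if credentials_valid(json.load(resp)) else "invalid")
```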
3. Run end-to-end ingestion and streaming collection in one command
```bash
make collect
```
Notes:
- `make collect` loads `.env` automatically if present.
- Default behavior skips heavy batch ingestion (`SKIP_BATCH=1`).
- To include batch ingestion:
```bash
make collect SKIP_BATCH=0
```
4. Run incremental NVD ingestion (daily window)
```bash
make nvd-daily
```
Notes:
- Default `NVD_SINCE` is UTC now minus 24 hours.
- Override window and volume as needed:
```bash
make nvd-daily NVD_SINCE=2026-02-15T00:00:00Z NVD_MAX_ITEMS=5000
```
5. Materialize Kafka enriched events into a Parquet feature store
```bash
make feature-store FEATURE_LIMIT=5000 FEATURE_MODE=overwrite
```
Expected:
- Feature rows are written to
`data/processed/feature_store/enriched_features` partitioned by
`event_date` and `source_type`.
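Spark writes partitioned Parquet as Hive-style `column=value` directories, so the feature-store layout can be checked without starting Spark. The sketch below counts Parquet part files per `(event_date, source_type)` partition. It assumes the default output path above, and `partition_counts` is a hypothetical helper.

```python
from collections import Counter
from pathlib import Path

FEATURE_STORE = Path("data/processed/feature_store/enriched_features")


def partition_counts(root: Path = FEATURE_STORE) -> Counter:
    """Count Parquet part files per (event_date, source_type) partition directory."""
    counts: Counter = Counter()
    for part in root.glob("event_date=*/source_type=*/*.parquet"):
        event_date = part.parent.parent.name.split("=", 1)[1]
        source_type = part.parent.name.split("=", 1)[1]
        counts[(event_date, source_type)] += 1
    return counts


if __name__ == "__main__":
    for (day, source), files in sorted(partition_counts().items()):
        print(f"{day}  {source:<24} {files} file(s)")
```

An empty result after `make feature-store` usually means nothing was consumed from `enriched.features`, or the store was written under a different root.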
6. Run the data-processing pipeline (feature engineering + anomaly scoring)
```bash
make data-processing
```
Expected:
- Curated processed features are written to
`data/processed/data_processing/curated_features`.
- High-anomaly slices are written to
`data/processed/data_processing/anomalies`.
- Run report is written to `training/last_data_processing_report.json`.
7. Run feature extraction and enrichment for model-ready features
```bash
make feature-enrichment
```
Expected:
- Enriched event-level features:
`data/processed/feature_enrichment/event_features`
- Session-level feature aggregates:
`data/processed/feature_enrichment/session_features`
- Run report:
`training/last_feature_enrichment_report.json`
8. Train baseline model from enriched event features
```bash
make model-train
```
Expected:
- Active export model metadata is written to
`models/baseline_logistic_regression_model/metadata.json`.
- The active production scorer may now be a non-linear sklearn model such as
`baseline_gradient_boosted_tree_model`; the output directory name is retained
for backward compatibility with the existing runtime and Makefile wiring.
- Logistic regression is retained as the rollback scorer and is recorded under
`fallback_model` inside the active metadata/report payloads.
- Candidate model artifacts are written under
`models/baseline_logistic_regression_model/candidates/`.
- Training report is written to `training/last_model_training_report.json`.
- Markdown comparison report is written to
`training/last_model_training_report.md`.
- MLflow run metadata is embedded into
`training/last_model_training_report.json["mlflow"]`.
- The export model is registered in MLflow as `cti-active-export-model`.
- Dashboard `Model Training` card shows rows, active model, recommended model,
and active-model metrics.
- Default active-model promotion is now `governed_auto`, not a hard pin to
gradient boosted trees:
- candidate models are evaluated on source-aware validation and out-of-source
test splits
- the promoted active scorer must satisfy minimum validation F1, test F1, and
test recall gates
- logistic regression remains the explicit rollback scorer
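The `governed_auto` gate can be read as "keep only candidates that clear every threshold, otherwise fall back". The sketch uses the documented defaults for `MODEL_TRAIN_PROMOTION_MIN_VALIDATION_F1` (0.6), `MODEL_TRAIN_PROMOTION_MIN_TEST_F1` (0.55), and `MODEL_TRAIN_PROMOTION_MIN_TEST_RECALL` (0.45). Ranking the passing candidates by validation F1 is an assumption and not necessarily the rule in `training/train_baseline_model.py`. The metric values in the usage lines are illustrative, not results from this repository.

```python
import os
from typing import Mapping


def promotion_gates(env: Mapping[str, str] = os.environ) -> dict:
    """Read the gate thresholds, falling back to the documented defaults."""
    return {
        "validation_f1": float(env.get("MODEL_TRAIN_PROMOTION_MIN_VALIDATION_F1", "0.6")),
        "test_f1": float(env.get("MODEL_TRAIN_PROMOTION_MIN_TEST_F1", "0.55")),
        "test_recall": float(env.get("MODEL_TRAIN_PROMOTION_MIN_TEST_RECALL", "0.45")),
    }


def select_active_model(candidates: Mapping[str, Mapping[str, float]],
                        gates: Mapping[str, float],
                        fallback: str = "logistic_regression") -> str:
    """Promote the best gate-passing candidate; otherwise keep the rollback scorer."""
    passing = {
        name: metrics for name, metrics in candidates.items()
        if all(metrics.get(metric, 0.0) >= threshold for metric, threshold in gates.items())
    }
    if not passing:
        return fallback
    # Assumption: rank passing candidates by validation F1.
    return max(passing, key=lambda name: passing[name]["validation_f1"])


# Illustrative numbers only.
candidates = {
    "logistic_regression": {"validation_f1": 0.62, "test_f1": 0.41, "test_recall": 0.30},
    "random_forest": {"validation_f1": 0.81, "test_f1": 0.66, "test_recall": 0.58},
}
print(select_active_model(candidates, promotion_gates()))
```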
Optional registry-only startup:
```bash
make registry-up
```
Registry endpoints:
- MLflow UI/API: `http://127.0.0.1:5001`
- PostgreSQL backend: internal Compose service `postgres`
9. Export operational events, alerts, predictions, entities, and relationships
```bash
make operational-export
```
Expected:
- Elasticsearch event index receives searchable processed events.
- Elasticsearch alert index receives anomaly-driven alert documents.
- Elasticsearch prediction index receives model-scored prediction documents.
- The prediction exporter loads the active production scorer from
`models/baseline_logistic_regression_model/metadata.json` and
`model.joblib`, with automatic fallback to the tuned logistic model if the
promoted scorer artifact is missing or invalid.
- Batch Spark jobs now execute inside `spark-worker` by default, while
`spark-streaming` stays dedicated to the long-running stream ingestion job.
- Batch executors receive `PYTHONPATH=/opt/app` so repo modules such as
`storage.model_scoring` are importable during runtime scoring UDF execution.
- MongoDB receives entity and relationship documents for CTI browsing and
downstream graph-style workflows.
- Run report is written to `storage/last_operational_export_report.json`.
- The export report now includes per-index `document_stats` showing:
- input rows
- unique documents
- duplicates collapsed before upsert
- Elasticsearch index counts are cumulative over time; use the export report to
inspect the current run's unique document totals.
- `make model-train` now trains a stratified candidate suite
(`logistic_regression`, `random_forest`, `gradient_boosted_tree`, `svm`)
with cross-validation, random oversampling, PR-AUC reporting, and
out-of-fold threshold optimization.
- The supervised label is now `is_malicious`, derived upstream from
`source_name` + `raw_label`. `is_anomaly` remains in the operational feature
set, but it is no longer the default predictor target.
- Leakage-prone anomaly-derived inputs were removed from the supervised
training path; the default model feature set no longer trains directly on
`anomaly_score` or `anomaly_density_1h`.
- Default benchmark holdouts are source-aware:
- train: `CSE_CIC_IDS2018_Kaggle`
- validation: `UNSW_NB15_Kaggle`, `UNSW_NB15_HF`
- test: `TON_IOT_Network_Kaggle`
- Active scorer promotion is controlled explicitly through:
- `MODEL_TRAIN_ACTIVE_MODEL` (default: `auto`)
- `MODEL_TRAIN_FALLBACK_MODEL` (default: `logistic_regression`)
- `MODEL_TRAIN_PROMOTION_MIN_VALIDATION_F1` (default: `0.6`)
- `MODEL_TRAIN_PROMOTION_MIN_TEST_F1` (default: `0.55`)
- `MODEL_TRAIN_PROMOTION_MIN_TEST_RECALL` (default: `0.45`)
- Batch execution container defaults:
- `SPARK_BATCH_CONTAINER=spark-worker`
- `SPARK_BATCH_PYTHONPATH=/opt/app`
- The generic operational scorer currently supports:
- logistic regression
- random forest
- gradient boosted tree
- support vector machine
- `training/last_model_training_report.json` now includes a `data_diagnosis`
block with positive prevalence and class-imbalance ratio so misleading
accuracy is easier to spot during review.
- `training/last_model_training_report.json` also includes `split_details`
describing row counts, label balance, and source composition for the
source-aware train / validation / test holdouts.
- `training/train_baseline_model.py` already contains optional XGBoost and
LightGBM hooks; enable them through `MODEL_TRAIN_MODELS=...` once those
dependencies are present in the runtime image.
- The latest governed source-aware run promotes `baseline_random_forest_model`
as the active runtime scorer and keeps
`baseline_logistic_regression_model` as fallback because random forest passes
the automatic promotion gates while logistic regression fails the
out-of-source generalization gates.
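When reviewing a report, the `data_diagnosis` figures are easy to recompute from raw labels. This sketch assumes that positive prevalence is the share of `is_malicious == 1` rows and that the imbalance ratio is the majority-class count divided by the minority-class count. The training script may define them slightly differently. The `majority_baseline_accuracy` field is an extra added here for illustration.

```python
from collections import Counter
from typing import Iterable


def data_diagnosis(labels: Iterable[int]) -> dict:
    """Positive prevalence and class-imbalance ratio for binary is_malicious labels."""
    counts = Counter(labels)
    total = counts[0] + counts[1]
    if total == 0:
        raise ValueError("no labelled rows")
    minority = min(counts[0], counts[1])
    majority = max(counts[0], counts[1])
    return {
        "rows": total,
        "positive_prevalence": counts[1] / total,
        "imbalance_ratio": majority / minority if minority else float("inf"),
        # Accuracy of always predicting the majority class: a floor any useful model must beat.
        "majority_baseline_accuracy": majority / total,
    }


print(data_diagnosis([1] * 5 + [0] * 95))
```

With 5% positives the imbalance ratio is 19. A model that never flags anything already reaches 95% accuracy, which is why prevalence belongs next to accuracy in the report.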
Separate malware branch:
- `make malware-intel-snapshot` pulls a MalwareBazaar metadata snapshot into
`data/processed/malware_branch/`.
- `make malware-prepare` prepares EMBER + MalwareBazaar data into a dedicated
malware parquet output without mixing it into the network predictor path.
- MalwareBazaar now requires `MALWAREBAZAAR_AUTH_KEY`; without it, snapshot
collection is skipped cleanly instead of failing with a 401.
- `ingestion/fetch_malwarebazaar_snapshot.py` and
`ingestion/online_ingestion_to_kafka.py` now auto-load `.env` when run
directly, so local credential-backed collectors no longer depend on `make`
for environment export.
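A directly executed collector can auto-load `.env` without extra dependencies. This minimal sketch skips the quoting and variable-expansion rules that `python-dotenv` implements. `load_dotenv_file` is hypothetical. It deliberately leaves already-exported variables untouched, so values passed by `make` or the shell still take precedence.

```python
import os
from pathlib import Path
from typing import MutableMapping


def load_dotenv_file(path: Path = Path(".env"),
                     environ: MutableMapping[str, str] = os.environ) -> int:
    """Load KEY=VALUE lines into environ without overriding existing keys; returns how many were set."""
    if not path.is_file():
        return 0
    loaded = 0
    for raw in path.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        if line.startswith("export "):
            line = line[len("export "):]
        key, value = line.split("=", 1)
        key, value = key.strip(), value.strip().strip('"').strip("'")
        if key and key not in environ:
            environ[key] = value
            loaded += 1
    return loaded
```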
Online ingestion defaults are now bounded for operational safety:
- NVD is capped to a single 500-item page by default instead of trying to pull
the full CVE corpus in one run.
- MISP event collection now uses `metadata=1` with `limit=100` so the local
collector completes quickly instead of timing out on oversized event payloads.
- Large text feeds such as FireHOL, OpenPhish, Emerging Threats, and Abuse.ch
are capped to practical default volumes to keep Kafka and local storage under
control.
Training prep default behavior:
- `training/prepare_training_data.py` now uses
`data/processed/data_processing/curated_features` as the default model-training
input when available, and writes the default training dataset to
`data/processed/cti_training/default`.
- Use `--skip-processed` if you need raw batch-only training prep.
10. Run a smoke validation over the live pipeline state
```bash
make smoke
```
Expected:
- Kafka socket readiness passes.
- Critical Compose services report `running`.
- Required Kafka topics exist.
- Latest ingestion/process/training reports are present and parseable.
- Feature-store, processed, enriched, and model artifact paths exist and are
non-empty.
Strict topic offset check:
```bash
make smoke SMOKE_STRICT_TOPIC_OFFSETS=1
```
Strict operational storage check:
```bash
make smoke SMOKE_REQUIRE_OPERATIONAL_STORAGE=1
```
This stricter mode also requires:
- `storage/last_operational_export_report.json` to exist
- Elasticsearch to be reachable and reporting index counts
- MongoDB to be reachable and reporting entity/relationship counts
- the operational export report to show exported documents
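The Kafka socket readiness check amounts to a TCP connect with a timeout. Below is a standard-library sketch with a hypothetical `port_ready` helper. A successful connect only proves that the listener accepts connections, not that the broker is healthy or that topics exist.

```python
import socket


def port_ready(host: str = "127.0.0.1", port: int = 9092, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    print("kafka socket:", "ready" if port_ready() else "unreachable")
```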
## One-Switch Autopilot (System ON/OFF)
Use the dashboard control server to get a single ON/OFF switch for the full
runtime.
```bash
make dashboard
```
Then open:
- `http://localhost:8000` (Overview)
- `http://localhost:8000/feature_store.html` (Feature Store)
- `http://localhost:8085` (Kafka UI)
If port `8000` is already in use:
```bash
make dashboard DASHBOARD_PORT=8010
```
From the `System Switch` card:
- `Turn System On` starts a background automation loop that:
- brings Docker services up,
- auto-reconciles stopped services every 60s,
- ensures Kafka topics exist,
- runs online CTI ingestion on an interval,
- publishes local dataset records on an interval,
- publishes host system logs on an interval,
- runs network scans on an interval,
- materializes feature-store snapshots from `enriched.features`,
- runs data-processing (unsupervised/anomaly stage),
- runs feature extraction + enrichment stage,
- runs baseline model training stage,
- refreshes dashboard metrics on an interval.
- If Docker Compose v1 hits the known `ContainerConfig` recreate bug, the
runtime removes stale non-running stack containers and retries startup.
- `Turn System Off` stops that loop.
- Runtime scan defaults come from `.env`:
`NETWORK_SCAN_TARGETS`, `NETWORK_SCAN_PORTS`,
`NETWORK_SCAN_TIMEOUT`, `NETWORK_SCAN_MAX_HOSTS`.
- Optional API hardening: set `CTI_CONTROL_API_KEY` in `.env`.
The dashboard/API will then require `X-CTI-API-Key` (or `Authorization: Bearer ...`)
for ON/OFF calls.
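The header rule can be expressed as a small check on request headers. This sketch is illustrative and is not the code in `dashboards/control_server.py`. It assumes that an unset key leaves the API open, since the hardening is described as optional. It also treats header names case-insensitively and uses `hmac.compare_digest` for a constant-time comparison, as a suggested practice.

```python
import hmac
from typing import Mapping, Optional


def presented_key(headers: Mapping[str, str]) -> Optional[str]:
    """Extract the API key from X-CTI-API-Key or an Authorization: Bearer header."""
    lowered = {name.lower(): value for name, value in headers.items()}
    if "x-cti-api-key" in lowered:
        return lowered["x-cti-api-key"].strip()
    scheme, _, token = lowered.get("authorization", "").partition(" ")
    if scheme.lower() == "bearer" and token:
        return token.strip()
    return None


def is_authorized(headers: Mapping[str, str], expected: Optional[str]) -> bool:
    """No key configured -> open (assumption); otherwise compare in constant time."""
    if not expected:
        return True
    key = presented_key(headers)
    return key is not None and hmac.compare_digest(key.encode(), expected.encode())
```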
API equivalents:
```bash
curl -H "X-CTI-API-Key: $CTI_CONTROL_API_KEY" -X POST http://127.0.0.1:8000/api/system/on
curl -H "X-CTI-API-Key: $CTI_CONTROL_API_KEY" -X POST http://127.0.0.1:8000/api/system/off
curl http://127.0.0.1:8000/api/system/status
```
CLI equivalents (without dashboard API calls):
```bash
make start
make status
make stop
# stop runtime loop only (keep Docker services up)
make stop STOP_STACK=0
```
DB host note:
- For the bundled Docker MISP DB, use `MISP_DB_HOST=misp-db` (not `localhost`).
- `localhost` only works if MISP and DB run on the same container/host namespace.
- For the bundled DB, use a non-root app user (default `MISP_DB_USER=misp`);
keep `MISP_DB_ROOT_PASSWORD` only for DB administration.
One-switch startup integration:
```bash
make start
make stop
```
Notes:
- `make start` launches `automation/system_runtime.py` in the background and
writes runtime logs to `runtime/system_runtime.log`.
- `make start` and `make stop` manage the core ingestion, streaming, Spark,
and operational storage export pipeline.
- `make stop STOP_STACK=0` stops only runtime and leaves the currently running
Docker services untouched.
By default Docker service ports are bound to `127.0.0.1` for safer local operation.
To expose externally, set `BIND_ADDR=0.0.0.0` in `.env`.
## Dependency Hardening (Pinned + Lock + CVE + SBOM)
This repository now uses pinned runtime dependencies and a hash-verified lock file.
- Direct pins: `requirements.in`
- Install entrypoint: `requirements.txt` (delegates to lock)
- Reproducible lock: `requirements.lock.txt`
- Security toolchain pins: `requirements-security.txt`
Run the hardening workflow:
```bash
make hardening-scan
```
Or run each step:
```bash
make deps-lock
make cve-scan
make sbom
```
Generated artifacts:
- CVE report: `security/pip_audit_report.json`
- CycloneDX SBOM: `security/sbom.cdx.json`
- Baseline summary: `security/DEPENDENCY_HARDENING.md`
CI gate:
- GitHub Actions workflow: `.github/workflows/security-gate.yml`
- Fails when:
- `requirements.lock.txt` drifts from `requirements.in`
- any vulnerability is present in `security/pip_audit_report.json`
- Local equivalent:
```bash
make ci-security-gate
```
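The vulnerability half of the CI gate boils down to failing when the audit report lists any vulnerability. This sketch assumes the report follows pip-audit's JSON layout: a `dependencies` list whose entries carry a `vulns` list. It is hypothetical and does not reproduce the logic in `.github/workflows/security-gate.yml`. A missing report is treated as a failure, so the gate cannot pass silently.

```python
import json
import sys
from pathlib import Path

REPORT = Path("security/pip_audit_report.json")


def find_vulnerabilities(report: dict) -> list:
    """Return 'name==version: ID' strings for every vulnerable dependency in a pip-audit JSON report."""
    findings = []
    for dep in report.get("dependencies", []):
        for vuln in dep.get("vulns", []):
            findings.append(f"{dep.get('name')}=={dep.get('version')}: {vuln.get('id')}")
    return findings


def main(path: Path = REPORT) -> int:
    """Exit code for CI: 0 when clean, 1 when vulnerable or when the report is missing."""
    if not path.is_file():
        print(f"missing audit report: {path}", file=sys.stderr)
        return 1
    findings = find_vulnerabilities(json.loads(path.read_text(encoding="utf-8")))
    for finding in findings:
        print(f"VULNERABLE {finding}", file=sys.stderr)
    return 1 if findings else 0
```

A CI step would call `sys.exit(main())` so that a non-zero return fails the job.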
## Workflow Checklist
- Define objectives and scope from the docs in `docs/`.
- Inventory data sources (local datasets, CTI feeds, OSINT, logs).
- Configure batch ingestion in `ingestion/config.yaml`.
- Run batch ingestion and validate output in `data/raw/cti_raw`.
- Start streaming ingestion and validate HDFS output in `/cti/raw`.
- Normalize schemas and labels with `schema_mapper.py` (batch path).
- Build preprocessing/feature pipelines under `preprocessing/` (planned).
- Train and evaluate models under `training/` and `evaluation/` (planned).
- Deploy inference into streaming/batch paths (planned).
- Add dashboards/alerts under `dashboards/` (planned).
## Lightweight Runbook (Commands + Expected Outputs)
1. Start the full system (background runtime + stack + collectors + processing)
```bash
make start
make status
```
Expected:
- Runtime loop is active in background (`runtime/system_runtime.pid` exists).
- Docker services show `running` via `make status`.
- Continuous ingestion and processing stages are scheduled automatically.
2. (Optional/manual) Start only the stack
```bash
docker-compose -f docker/docker-compose.yaml up -d
docker-compose -f docker/docker-compose.yaml ps
```
Expected:
- All services show `Up` including `kafka`, `zookeeper`, `filebeat`,
`fluent-bit`, `logstash`, `spark-master`, `spark-worker`, `spark-streaming`,
`namenode`, `datanode`, `kafka-ui`, `kafka-monitor-consumer`.
- File-based collectors read from:
- `logs/app`, `logs/suricata`, `logs/zeek` (Filebeat)
- `logs/firewall`, `logs/network` (Fluent Bit)
3. Create Kafka topics (idempotent)
```bash
KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 \
  python ingestion/ensure_kafka_topics.py --config ingestion/required_topics.yaml
```
Expected:
- Output indicates each topic is created or already exists (`logs.suricata`,
`logs.host`, `logs.firewall`, `cti.taxii`, `cti.misp`, `osint.darkweb`,
`enriched.features`).
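Idempotent topic creation reduces to creating only the names that are missing. The sketch below shows that planning step, with the required topics copied from the list above. It does not reproduce how `ensure_kafka_topics.py` reads `required_topics.yaml` or calls the Kafka admin API, and `plan_topics` is hypothetical.

```python
from typing import Iterable, List, Tuple

REQUIRED_TOPICS = (
    "logs.suricata", "logs.host", "logs.firewall",
    "cti.taxii", "cti.misp", "osint.darkweb", "enriched.features",
)


def plan_topics(existing: Iterable[str],
                required: Iterable[str] = REQUIRED_TOPICS) -> Tuple[List[str], List[str]]:
    """Split required topics into (to_create, already_present); a rerun after creation yields no work."""
    present = set(existing)
    required = list(required)
    to_create = [topic for topic in required if topic not in present]
    already = [topic for topic in required if topic in present]
    return to_create, already


print(plan_topics(["logs.host", "__consumer_offsets"]))
```

With a real admin client, a topic can be created concurrently between the listing and the create call. Kafka then reports a topic-already-exists error, which an idempotent script should treat as success.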
4. Verify topics
```bash
docker exec kafka kafka-topics --bootstrap-server localhost:9092 --list
```
Expected:
- Topic list includes all required architecture topics.
5. Run Python API collectors (TAXII/STIX/OpenCTI/MISP/abuse.ch/OSINT)
```bash
KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 \
  python ingestion/online_ingestion_to_kafka.py --max-items 500
```
Expected:
- Public feeds publish to `cti.taxii`, `cti.misp`, and `osint.darkweb`.
- Credential-gated feeds are explicitly marked as `skipped` with
`missing_env:*` in `ingestion/last_online_ingestion_report.json`.
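The credential gate can be sketched as a per-source check that records missing variables instead of raising. The source table is hypothetical. `OPENCTI_URL`, `OPENCTI_TOKEN`, and `MALWAREBAZAAR_AUTH_KEY` come from this guide, but the real mapping lives in `ingestion/online_sources.yaml`. Only the `skipped` / `missing_env:*` convention mirrors the actual report, and the `ready` status is an assumption.

```python
import os
from typing import Dict, List, Mapping, Sequence

# Hypothetical subset: source name -> environment variables it needs.
SOURCES: Dict[str, Sequence[str]] = {
    "opencti": ("OPENCTI_URL", "OPENCTI_TOKEN"),
    "malwarebazaar_recent": ("MALWAREBAZAAR_AUTH_KEY",),
    "openphish": (),  # public feed, no credentials
}


def gate_sources(sources: Mapping[str, Sequence[str]],
                 env: Mapping[str, str] = os.environ) -> List[dict]:
    """Mark each source ready or skipped, naming every missing variable."""
    results = []
    for name, required in sources.items():
        missing = [var for var in required if not env.get(var)]
        if missing:
            reason = ",".join(f"missing_env:{var}" for var in missing)
            results.append({"source": name, "status": "skipped", "reason": reason})
        else:
            results.append({"source": name, "status": "ready"})
    return results
```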
6. Publish host system logs from Python fallback collector
```bash
KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 \
  python ingestion/system_logs_to_kafka.py --max-lines 500
```
Expected:
- Host logs are published to `logs.host`.
- Report is written to `ingestion/last_system_log_report.json`.
7. (Optional) Publish local configured datasets
```bash
KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 \
  python ingestion/local_sources_to_kafka.py --max-rows-per-dataset 200
```
Expected:
- Dataset-derived records are published to configured topics (defaults include
`logs.host` for log-like datasets and `local-datasets` for others).
- Rows are randomized before `--max-rows-per-dataset` is applied so dataset
ordering does not bias the published benchmark sample.
- Report is written to `ingestion/last_local_stream_report.json`.
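Shuffling before truncation matters because benchmark CSVs are often ordered by capture time or label, so the first N rows can be all-benign or all-attack. For files too large to shuffle in memory, reservoir sampling (Algorithm R) draws a uniform sample in a single pass. The sketch uses that technique, which is not necessarily what `ingestion/local_sources_to_kafka.py` implements.

```python
import random
from typing import Iterable, List, Optional, TypeVar

T = TypeVar("T")


def reservoir_sample(rows: Iterable[T], k: int, seed: Optional[int] = None) -> List[T]:
    """Uniformly sample k rows from a stream of unknown length in one pass (Algorithm R)."""
    rng = random.Random(seed)
    reservoir: List[T] = []
    for i, row in enumerate(rows):
        if i < k:
            reservoir.append(row)
        else:
            j = rng.randint(0, i)  # inclusive bounds: row i is kept with probability k / (i + 1)
            if j < k:
                reservoir[j] = row
    rng.shuffle(reservoir)  # also randomize publish order
    return reservoir


print(reservoir_sample(range(1_000_000), 5, seed=42))
```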
Recommended external dataset bootstrap:
```bash
make datasets-configure
make datasets-list
make datasets-download EXTERNAL_DATASET_DEFAULT_SET=1
```
Notes:
- Kaggle credentials are sourced from `kaggle.json` and copied into
`~/.kaggle/kaggle.json` and `~/.config/kaggle/kaggle.json`.
- Public Hugging Face datasets work without a token; set `HF_TOKEN` only for
gated/private repos.
- The external dataset manifest lives in `ingestion/external_datasets.yaml`.
- Newly downloaded benchmark datasets are wired into
`ingestion/config.yaml`, so the existing `make start` / `make collect` /
`ingestion/local_sources_to_kafka.py` path will pick them up automatically
once the files exist under `datasets/external/`.
- Current recommended predictor-training stack is:
`cse_cic_ids2018_kaggle`, `unsw_nb15_kaggle`, `ton_iot_network_kaggle`, and
`unsw_nb15_hf`.
- `LOCAL_STREAM_DATASETS` and `FEATURE_SOURCE_NAMES` default to that same
allowlist so the automated runtime focuses on the structured benchmark
datasets that the current predictor can actually learn from.
- `cyber_threat_intelligence_hf` remains available for future NLP/IOC
enrichment work, but it is not part of the default predictor-training set.
- `ember_2018_features_kaggle` remains optional because it is large and belongs
to the malware-analysis module, not the current network-focused predictor.
- `malwarebazaar_recent` is part of `ingestion/online_sources.yaml` and flows
through the normal online collector loop into `malware-intelligence`.
- Because the development host is nearly out of disk space, the manifest points
`cse_cic_ids2018_kaggle` at a smaller cleaned mirror instead of the larger raw
Kaggle dump.
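The notes above say that `LOCAL_STREAM_DATASETS` and `FEATURE_SOURCE_NAMES` default to the recommended allowlist. Assuming these variables hold comma-separated dataset names (their exact format is not documented here), the resolution logic might look like this. The helper names are hypothetical.

```python
import os
from typing import Dict, List, Mapping, Optional

# Default allowlist: the recommended predictor-training stack listed above.
DEFAULT_DATASETS = (
    "cse_cic_ids2018_kaggle",
    "unsw_nb15_kaggle",
    "ton_iot_network_kaggle",
    "unsw_nb15_hf",
)


def resolve_allowlist(env_var: str, environ: Optional[Mapping[str, str]] = None) -> List[str]:
    """Read a comma-separated allowlist (assumed format); fall back to the defaults."""
    environ = os.environ if environ is None else environ
    raw = environ.get(env_var, "")
    names = [name.strip() for name in raw.split(",") if name.strip()]
    return names or list(DEFAULT_DATASETS)


def filter_datasets(configured: Mapping[str, dict], allowlist: List[str]) -> Dict[str, dict]:
    """Keep only configured datasets whose names appear in the allowlist."""
    allowed = set(allowlist)
    return {name: cfg for name, cfg in configured.items() if name in allowed}
```

With this shape, optional datasets such as `ember_2018_features_kaggle` stay in the config but are only streamed when explicitly listed.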
Storage controls for low-disk hosts:
make datasets-cleanup EXTERNAL_DATASET_DEFAULT_SET=1
make space-report
make space-clean
Notes:
- `make datasets-cleanup` removes only the downloaded dataset archives that have
already been extracted; it keeps the extracted files used by the pipeline.
- `make space-clean` compacts `.git`, prunes unused Docker images, removes
regenerable runtime artifacts, and clears Spark temp/checkpoint paths.
- `make space-clean-aggressive` also removes `data/processed/` so the next
pipeline run rebuilds processed features from scratch.
- The largest remaining local footprint is usually `venv/`. The Docker Compose
runtime does not require it, but some host-side Make targets still reference
it indirectly, so remove it only if you are intentionally standardizing on a
Docker-only workflow.
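This guide does not spell out exactly what `make space-report` measures. A rough Python equivalent of a per-directory footprint report, assuming the paths named in these notes (`venv/`, `data/processed/`, `datasets/external/`) are the ones worth checking, could look like this. `space_report` is hypothetical.

```python
import os
from pathlib import Path
from typing import List, Tuple


def dir_size_bytes(path: Path) -> int:
    """Sum file sizes under `path`, skipping symlinks and unreadable entries."""
    if path.is_file():
        return path.stat().st_size
    total = 0
    # os.walk does not follow symlinked directories by default.
    for root, _dirs, files in os.walk(path, onerror=lambda err: None):
        for name in files:
            file_path = Path(root) / name
            try:
                if not file_path.is_symlink():
                    total += file_path.stat().st_size
            except OSError:
                continue
    return total


def space_report(repo_root: Path, candidates: List[str]) -> List[Tuple[str, int]]:
    """Return (relative path, size in bytes) for existing candidates, largest first."""
    sizes = [
        (rel, dir_size_bytes(repo_root / rel))
        for rel in candidates
        if (repo_root / rel).exists()
    ]
    return sorted(sizes, key=lambda item: item[1], reverse=True)
```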
8. Smoke-test heavy parsing path (Logstash -> Kafka)
echo '<13>Jan 30 10:10:10 firewall-1 kernel: Deny TCP 1.2.3.4:443 -> 10.0.0.5:51515' | \
nc -w 1 127.0.0.1 5514
Expected:
- Message is parsed by Logstash and published to `logs.firewall`.
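The same smoke test can be scripted without `nc`. The `nc` command above uses TCP (no `-u`), and `echo` appends a trailing newline. This sketch assumes Logstash reads newline-delimited messages on TCP port 5514. `<13>` is the syslog PRI for facility 1 (user-level) and severity 5 (notice). The helper names are hypothetical.

```python
import socket


def syslog_pri(facility: int, severity: int) -> int:
    """Syslog PRI value (RFC 3164 / RFC 5424): facility * 8 + severity."""
    if not (0 <= facility <= 23 and 0 <= severity <= 7):
        raise ValueError("facility must be 0-23 and severity 0-7")
    return facility * 8 + severity


def send_syslog_tcp(message: str, host: str = "127.0.0.1", port: int = 5514,
                    facility: int = 1, severity: int = 5,
                    timeout: float = 1.0) -> bytes:
    """Send one newline-terminated syslog line over TCP, like the `nc` command above."""
    payload = f"<{syslog_pri(facility, severity)}>{message}\n".encode("utf-8")
    with socket.create_connection((host, port), timeout=timeout) as conn:
        conn.sendall(payload)
    return payload
```

For example, `send_syslog_tcp("Jan 30 10:10:10 firewall-1 kernel: Deny TCP 1.2.3.4:443 -> 10.0.0.5:51515")` sends the same bytes as the `echo ... | nc` pipeline.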
9. Validate streaming + enrichment flow (Spark)
docker exec kafka kafka-run-class kafka.tools.GetOffsetShell \
--broker-list localhost:9092 --topic enriched.features --time -1
Expected:
- `enriched.features` offsets increase as Spark enriches incoming events.
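To confirm that the offsets actually increase, take two snapshots and compare them. The sketch below assumes `GetOffsetShell` prints one `topic:partition:offset` line per partition and skips any other output such as warnings. `snapshot` simply wraps the command above. The helper names are hypothetical.

```python
import subprocess
from typing import Dict


def parse_offsets(output: str) -> Dict[int, int]:
    """Parse `topic:partition:offset` lines into {partition: offset}."""
    offsets: Dict[int, int] = {}
    for line in output.splitlines():
        parts = line.strip().rsplit(":", 2)
        if len(parts) != 3:
            continue  # not an offset line (e.g. a log warning)
        try:
            offsets[int(parts[1])] = int(parts[2])
        except ValueError:
            continue
    return offsets


def offsets_advanced(before: Dict[int, int], after: Dict[int, int]) -> bool:
    """True if the summed end offsets grew between two snapshots."""
    return sum(after.values()) > sum(before.values())


def snapshot(topic: str = "enriched.features") -> Dict[int, int]:
    """Run the GetOffsetShell command from step 9 and parse its output."""
    cmd = ["docker", "exec", "kafka", "kafka-run-class", "kafka.tools.GetOffsetShell",
           "--broker-list", "localhost:9092", "--topic", topic, "--time", "-1"]
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return parse_offsets(result.stdout)
```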
10. Validate the permanent monitoring consumer group + downstream handoff
docker exec kafka kafka-consumer-groups --bootstrap-server localhost:9092 \
--describe --group ${MONITOR_CONSUMER_GROUP:-cti-monitor-group}
Expected:
- Group exists and stays visible in Kafka UI (`Consumers` page).
- `CURRENT-OFFSET`/`LOG-END-OFFSET` values update as new events arrive.
- The runtime loop keeps handing data to the next stages: `feature-store`,
`data-processing`, `feature-enrichment`, and `model-training` runs are
recorded in `runtime/system_runtime_state.json` and on the dashboard cards.
Optional (if the raw sink is enabled via `RAW_SINK_ENABLED`):
docker exec namenode hdfs dfs -ls /cti/raw
Expected:
- Raw streaming records are written under `/cti/raw`.
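To track the consumer-group check from step 10 over time, the `--describe` table can be parsed and the lag summed. The sketch below assumes the default whitespace-aligned columns printed by `kafka-consumer-groups` (`GROUP`, `TOPIC`, `PARTITION`, `CURRENT-OFFSET`, `LOG-END-OFFSET`, `LAG`, ...) and that no column value contains spaces. Partitions with no committed offset (`-`) are ignored. The helper names are hypothetical.

```python
from typing import Dict, List, Optional


def parse_group_describe(output: str) -> List[Dict[str, str]]:
    """Turn the `kafka-consumer-groups --describe` table into a list of row dicts."""
    rows: List[Dict[str, str]] = []
    header: Optional[List[str]] = None
    for line in output.splitlines():
        fields = line.split()
        if not fields:
            continue
        if fields[0] == "GROUP":
            header = fields
            continue
        # Only accept rows that line up with the header (skips status messages).
        if header is not None and len(fields) == len(header):
            rows.append(dict(zip(header, fields)))
    return rows


def total_lag(rows: List[Dict[str, str]]) -> int:
    """Sum LOG-END-OFFSET - CURRENT-OFFSET over partitions with committed offsets."""
    lag = 0
    for row in rows:
        current = row.get("CURRENT-OFFSET", "-")
        end = row.get("LOG-END-OFFSET", "-")
        if current.isdigit() and end.isdigit():
            lag += int(end) - int(current)
    return lag
```

A lag that stays bounded while `LOG-END-OFFSET` keeps rising is a reasonable sign that the monitor consumer is keeping up.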
11. Run full collection + verification
KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 \
python pipeline.py collect \
--start-collectors \
--max-items 500 \
--local-stream-max-rows 200 \
--verify-coverage
Expected:
- Required topics are initialized.
- Collector services are started.
- Online API collectors, Python host logs, and optional local datasets publish.
- Coverage report confirms ingestion health.
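`pipeline.py collect` already initializes topics and verifies coverage. For a quick manual cross-check of the "required topics are initialized" expectation, a sketch like the following compares `kafka-topics --list` output against a set of required names. The set contains only topics named elsewhere in this guide. It is an illustrative assumption, not the contents of `ingestion/required_topics.yaml`.

```python
import subprocess
from typing import Iterable, List

# Illustrative subset: topic names mentioned in this guide, not the real manifest.
EXAMPLE_REQUIRED_TOPICS = {
    "logs.host",
    "logs.firewall",
    "local-datasets",
    "malware-intelligence",
    "enriched.features",
}


def missing_topics(list_output: str, required: Iterable[str]) -> List[str]:
    """Return required topic names absent from `kafka-topics --list` output."""
    existing = {line.strip() for line in list_output.splitlines() if line.strip()}
    return sorted(set(required) - existing)


def list_topics() -> str:
    """List topics from inside the `kafka` container (same container as above)."""
    cmd = ["docker", "exec", "kafka", "kafka-topics",
           "--bootstrap-server", "localhost:9092", "--list"]
    return subprocess.run(cmd, capture_output=True, text=True, check=True).stdout
```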
12. Generate the dataset schema + label report
python evaluation/verify_dataset_schema.py --datasets \
CICIDS_Enriched_2022_06_08 Attack_Dataset CloudWatch_Web_Attack \
Cybersecurity_Intrusion_Data Cybersecurity_Extraction Dataset_Cybersecurity_Michele
Expected:
- Report written to `evaluation/label_schema_report.md`.
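The report's actual checks live in `evaluation/verify_dataset_schema.py`. To illustrate the kind of label check such a report summarizes, the sketch below finds a label-like column in a CSV header and counts class values. The candidate column names are assumptions. The header is normalized because some public IDS CSVs pad column names with spaces. `label_distribution` is hypothetical.

```python
import csv
from collections import Counter
from typing import Iterable, Optional, TextIO, Tuple


def label_distribution(
    handle: TextIO,
    candidates: Iterable[str] = ("label", "attack_cat", "class"),
) -> Tuple[Optional[str], Counter]:
    """Find the first label-like column (case/whitespace-insensitive) and count values."""
    reader = csv.DictReader(handle)
    # Map normalized header names back to the raw names DictReader uses as keys.
    by_normalized = {name.strip().lower(): name for name in (reader.fieldnames or [])}
    column = next((by_normalized[c] for c in candidates if c in by_normalized), None)
    if column is None:
        return None, Counter()
    counts = Counter((row.get(column) or "").strip() for row in reader)
    return column, counts
```

A dataset that yields `None` here, or a single class, is a candidate for a label config fix before it is used for training.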
13. Generate dashboard metrics + open the mini dashboard
python dashboards/collect_metrics.py
python dashboards/control_server.py --port 8000
Expected:
- `dashboards/metrics.json` is created.
- Open `http://localhost:8000` to view the overview dashboard.
- Open `http://localhost:8000/feature_store.html` to view feature-store
partitions, rows, and source-type distribution.
- The dashboard also shows Docker Compose service health (running/stopped per
service), collection runs, Kafka topic offsets, and feature-store snapshot
metrics.
- The `Streaming` card uses `enriched.features` Kafka offsets by default
(works even when raw HDFS sink is disabled).
Optional (include raw streaming path metrics; requires HDFS to be reachable):
python dashboards/collect_metrics.py --include-streaming
14. Stop the full system cleanly
make stop
Expected:
- Runtime background loop is stopped.
- Docker stack services are stopped.
## Common Runtime Issues
- Kafka UI `Consumers` page is empty:
- Kafka only shows active groups. If no long-running consumer is connected,
the list can be empty.
- This stack now includes `kafka-monitor-consumer` with
`cti-monitor-group` to keep a permanent visible group.
- Verify service + group:
- `docker-compose -f docker/docker-compose.yaml ps kafka-monitor-consumer`
- `docker exec kafka kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group ${MONITOR_CONSUMER_GROUP:-cti-monitor-group}`
- Local dataset streaming step reports all datasets as `path_not_found`:
- Expected when `datasets/` files are not present on the host.
- In that case `Local Streaming Publish` can legitimately remain `0`.
- Check `ingestion/last_local_stream_report.json` for per-dataset status.
- Some API collectors show `missing_env:*` in the online report:
- Expected for TAXII/OpenCTI/MISP/threatfeeds endpoints until credentials are
configured.
- Set required environment variables, then rerun
`ingestion/online_ingestion_to_kafka.py`.
- `spark-streaming` restarts every ~10s with `0 datanode(s) running`:
- This indicates NameNode/DataNode registration drift in the current Docker
state.
- Validate with:
- `docker logs --tail 200 spark-streaming`
- `docker logs --tail 200 datanode`
- If needed, recreate the HDFS services/volumes and then restart streaming.
- `make up` fails with `KeyError: 'ContainerConfig'` on `namenode`:
- This is a known `docker-compose` v1 recreate issue with stale HDFS
containers.
- The `Makefile` `up` target now auto-detects this error, removes stale
`namenode`/`datanode` containers, and retries automatically.
- `make metrics` fails with NumPy C-extension import errors:
- The cause is a Python-version mismatch when an incompatible
`venv/lib/pythonX.Y/site-packages` is forced onto `PYTHONPATH`.
- `Makefile` now resolves `PYTHONPATH` dynamically using the active Python
minor version and falls back to repo-only path if no compatible venv exists.
- Feature-store materialization fails with `Mkdirs failed` under `/opt/app`:
- Container user permissions on bind-mounted host paths can block Spark
parquet writes.
- `make feature-store` runs `spark-submit` as root inside
`spark-streaming` to ensure write access.
- The dashboard shows the switch as `Unavailable`:
- The ON/OFF switch requires API endpoints from
`dashboards/control_server.py`.
- Start it with `make dashboard` (or
`python dashboards/control_server.py --port 8000`).
- The agent raises `KafkaConnectionError` (Errno 111, connection refused) when
connecting to Kafka:
- If the agent connects over a non-loopback network interface, make sure the
Kafka container's advertised listener resolves to the host's primary IP
rather than `127.0.0.1`.
- Run the dynamic IP config updater (`python automation/update_network_config.py`,
or `make start`, which triggers it automatically) to detect the active
interface and update `.env` and `agent/agent_config.json`.
- Restart the agent (`;` rather than `&&`, so the agent still starts when no
old process is running):
`pkill -f agent/bda_cti_agent.py; nohup ./venv/bin/python agent/bda_cti_agent.py > runtime/bda-cti-agent.log 2>&1 &`
- The Agent Bridge shows `NodeNotReadyError` or `Network is unreachable`:
- This usually occurs when a stale PID file blocks the ingestion bridge
service or the Kafka broker is down.
- Run `rm -f runtime/*.pid && make soc-up` to clear stale state and trigger a
clean startup.
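For the `make metrics` NumPy import issue above, the `Makefile` fix is implemented in shell. The same rule, written as a hypothetical Python helper for scripts that need to reproduce it, is this: add `venv/lib/pythonX.Y/site-packages` only when `X.Y` matches the running interpreter, and otherwise fall back to the repository path alone. The entry order here is an assumption.

```python
import os
import sys
from pathlib import Path
from typing import Optional, Tuple


def venv_site_packages(repo_root: Path, version: Optional[Tuple[int, int]] = None) -> Optional[Path]:
    """Return venv site-packages for this interpreter's major.minor, if it exists."""
    major, minor = version or sys.version_info[:2]
    candidate = repo_root / "venv" / "lib" / f"python{major}.{minor}" / "site-packages"
    return candidate if candidate.is_dir() else None


def resolve_pythonpath(repo_root: Path, version: Optional[Tuple[int, int]] = None) -> str:
    """Repo root always; venv site-packages only when the minor version matches."""
    entries = [str(repo_root)]
    site = venv_site_packages(repo_root, version)
    if site is not None:
        entries.append(str(site))
    return os.pathsep.join(entries)
```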
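For the `KafkaConnectionError` issue above, `automation/update_network_config.py` already detects the host IP automatically. As a reference, one common way to find the host's outbound IP without parsing interface tables is to "connect" a UDP socket and read its local address. This sends no packets and only selects a route. The technique may differ from what the script does, and the `.env` key used in the usage example is only an illustration.

```python
import re
import socket


def detect_primary_ip(probe_host: str = "8.8.8.8", probe_port: int = 80) -> str:
    """Return the source IP the OS would use for outbound traffic.

    Connecting a UDP socket sends no packets; it only picks a route.
    Falls back to loopback when no route exists.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            sock.connect((probe_host, probe_port))
            return sock.getsockname()[0]
    except OSError:
        return "127.0.0.1"


def upsert_env_var(env_text: str, key: str, value: str) -> str:
    """Set KEY=value in .env-style text, replacing existing lines or appending one."""
    line = f"{key}={value}"
    pattern = re.compile(rf"^{re.escape(key)}=.*$", re.MULTILINE)
    if pattern.search(env_text):
        # A function replacement avoids backslash escapes in `value`.
        return pattern.sub(lambda _match: line, env_text)
    if env_text and not env_text.endswith("\n"):
        env_text += "\n"
    return env_text + line + "\n"
```

Usage: `upsert_env_var(text, "KAFKA_BOOTSTRAP_SERVERS", f"{detect_primary_ip()}:9092")` rewrites the bootstrap address in place. Which keys the real script updates is not documented here.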
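The `rm -f runtime/*.pid` step for the Agent Bridge issue removes every PID file, including files that belong to services that are still running. A more selective alternative is sketched below. It assumes each PID file holds a single integer PID and a POSIX host, and it deletes only the files whose process no longer exists. The helper names are hypothetical.

```python
import os
from pathlib import Path
from typing import List


def pid_is_running(pid: int) -> bool:
    """Probe a PID with signal 0 (POSIX): nothing is delivered, existence is checked."""
    if pid <= 0:
        return False  # 0 and negatives address process groups, not one process
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        return True  # the process exists but belongs to another user
    return True


def remove_stale_pid_files(runtime_dir: Path) -> List[str]:
    """Delete *.pid files whose process is gone; keep files for live processes."""
    removed: List[str] = []
    for pid_file in sorted(runtime_dir.glob("*.pid")):
        try:
            pid = int(pid_file.read_text().strip())
        except (OSError, ValueError):
            pid = -1  # unreadable or malformed PID files count as stale
        if not pid_is_running(pid):
            pid_file.unlink(missing_ok=True)
            removed.append(pid_file.name)
    return removed
```

PIDs can be reused, so a surviving file may point at an unrelated process. The check errs toward keeping files rather than deleting a live service's PID file.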
## Fixes Log (Keep Updated)
- 2026-01-26: Batch ingestion now writes datasets incrementally with
`--datasets/--limit` flags to prevent memory exhaustion (`batch_ingest.py`).
- 2026-01-26: Online ingestion supports multiple source types and configurable
sources via `ingestion/online_sources.yaml`.
- 2026-01-26: Kafka bootstrap guidance clarified; use `KAFKA_BOOTSTRAP_SERVERS`
with `127.0.0.1:9092` on host or `kafka:29092` in containers.
- 2026-01-26: Added missing dependencies to `requirements.txt` (`pyspark`,
`pyyaml`) to support batch ingestion.
- 2026-01-26: Added pipeline orchestration (`pipeline.py`) and training data
preparation (`training/prepare_training_data.py`).
- 2026-01-26: Standardized batch schemas (snake_case, common field mapping),
improved label resolution, and corrected dataset label config mismatches.
- 2026-01-26: Added dataset schema + label verification report
(`evaluation/verify_dataset_schema.py`).
- 2026-01-26: Updated architecture docs to match the attached closed-loop CTI
diagram and preserved the Docker Compose deployment model.
- 2026-01-26: Added full-source collection flow into Kafka streaming layer via
`ingestion/local_sources_to_kafka.py`, expanded online feeds, and added
ingestion run reports for dashboard visibility.
- 2026-02-15: Added host system log publisher
(`ingestion/system_logs_to_kafka.py`) and integrated it into `pipeline.py`
so `logs.host` can be populated even when local dataset files are
unavailable.
- 2026-02-15: `ingestion/local_sources_to_kafka.py` now exits cleanly with a
report when configured local dataset paths do not exist, instead of failing
Spark startup.
- 2026-02-15: Fixed spark-streaming dependency cache setup in
`docker/docker-compose.yaml` by using container-local Ivy cache settings.
- 2026-02-15: Added stable HDFS container hostnames (`namenode`, `datanode`)
in `docker/docker-compose.yaml` to reduce NameNode/DataNode registration
drift across restarts.
- 2026-02-15: Added required Kafka topic bootstrap with
`ingestion/required_topics.yaml` and `ingestion/ensure_kafka_topics.py`.
- 2026-02-15: Added Filebeat, Logstash, and Fluent Bit configurations and
Compose services for ingestion-layer collection/parsing:
`docker/filebeat/filebeat.yml`,
`docker/logstash/pipeline/logstash.conf`,
`docker/fluent-bit/fluent-bit.conf`.
- 2026-02-15: Updated online collector coverage to include STIX/TAXII/OpenCTI,
MISP, abuse.ch, and threatfeeds-style sources in
`ingestion/online_sources.yaml`; missing credentials are now reported as
explicit skips in `ingestion/online_ingestion_to_kafka.py`.
- 2026-02-15: Streaming job now consumes required architecture topics and writes
enriched feature events to `enriched.features` in `streaming_ingest.py`.
- 2026-02-15: Added raw sink toggles to `streaming_ingest.py`
(`RAW_SINK_ENABLED`, `RAW_OUTPUT_PATH`, `RAW_CHECKPOINT_PATH`) and defaulted
Compose runtime to prioritize continuous enrichment output.
- 2026-02-15: Added ingestion coverage checker
(`ingestion/verify_ingestion_stack.py`) and pipeline hooks for
collector startup, topic initialization, and post-run verification.
- 2026-02-15: Added `.env.example` with all required collector and streaming
environment keys.
- 2026-02-15: Added `Makefile` one-command orchestration (`make collect`) and
helper targets (`make topics`, `make verify`, `make metrics`).
- 2026-02-15: Hardened `make up` to auto-recover from docker-compose v1
`ContainerConfig` errors by removing stale HDFS containers and retrying
startup.
- 2026-02-15: Updated `Makefile` Python path resolution to detect compatible
`venv` site-packages by Python minor version, preventing NumPy ABI mismatch
errors in metrics and verification commands.
- 2026-02-15: Added incremental NVD collection target (`make nvd-daily`) with
configurable `NVD_SINCE`/`NVD_MAX_ITEMS` for daily updates.
- 2026-02-15: Added PySpark feature-store materialization job
(`training/materialize_feature_store.py`) and `make feature-store` to convert
`enriched.features` Kafka events into partitioned Parquet datasets.
- 2026-02-15: Added feature-store dashboard page
(`dashboards/feature_store.html`) and metrics collection support in
`dashboards/collect_metrics.py` (including Spark-in-container fallback via
`dashboards/feature_store_metrics_job.py`).
- 2026-02-15: Added one-switch dashboard control API
(`dashboards/control_server.py`) with background runtime loop
(`automation/system_runtime.py`) for interval-based online CTI ingestion and
network scan collection.
- 2026-02-15: Added consolidated feature record in `docs/FEATURE_RECORD.md`.
- 2026-02-17: Added data-processing stage script
(`training/data_processing_pipeline.py`) and `make data-processing` target to
produce curated feature datasets plus anomaly-scored slices from the feature
store.
- 2026-02-17: Made data-processing anomaly sensitivity tunable via
`PROCESS_ANOMALY_QUANTILE` and added dashboard cards/metrics for processed
feature volume and anomaly slices.
- 2026-02-17: Updated training prep defaults so processed curated features are
the primary model-training input (`data/processed/cti_training/default`).
- 2026-02-17: Added feature extraction and enrichment stage
(`training/feature_extraction_enrichment.py`) with `make feature-enrichment`.
- 2026-02-17: Added `make start` / `make stop` / `make status` for one-command
background runtime control from CLI.
- 2026-02-17: Extended the one-switch runtime loop to continuously run
feature-store materialization, data-processing, and feature-enrichment stages
in addition to ingestion and scanning.
- 2026-02-18: Added a persistent Kafka monitoring consumer service
(`kafka-monitor-consumer`) with configurable group/topic
(`MONITOR_CONSUMER_GROUP`, `MONITOR_CONSUMER_TOPIC`) so a stable consumer
group stays visible in Kafka UI while downstream processing stages continue.
- 2026-05-18: Added real-time log monitoring for the UFW host firewall (`/var/log/ufw.log`). The agent extracts key telemetry fields (SRC, DST, PROTO, SPT, DPT) from blocked packets and publishes them directly to Kafka.
- 2026-05-18: Added dynamic host IP auto-discovery:
`automation/update_network_config.py` detects the host's primary network
interface (e.g. `wlo1`) and its IP address, then writes that IP into
`agent/agent_config.json` and `.env` to resolve connection failures caused by
loopback-bound addresses.
- 2026-05-18: Embedded an asynchronous `NotificationService` in the FastAPI
backend lifespan, providing a STARTTLS-secured, Gmail-backed SMTP alerting
pipeline that notifies analysts directly of incoming alarms and operations.
- 2026-05-18: Added audit-log hooks: every system action (logins, isolation
triggers, training invocations) is published immediately to the `audit` topic
and triggers a real-time email audit notice.
- 2026-05-18: Integrated the real-time ingestion bridge
(`ingestion/agent_bridge.py`), the operational gateway that forwards Kafka
streams into Elasticsearch indices (`cti-alerts`, `logs.host`) and MongoDB
and runs inline ML-based threat scoring to raise tactical alerts immediately.
- 2026-05-18: Modernized the SOC UI dashboard shell (`frontend/` / `endguardui`
React app) with live agent heartbeat displays, real-time alert grids, manual
playbooks, audit views, and ML metrics (F1, precision, recall, accuracy,
speed, drift, registry).
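The 2026-05-18 UFW entry describes extracting SRC, DST, PROTO, SPT, and DPT from blocked packets. A minimal parser for that step could look like the sketch below. It assumes the standard `[UFW BLOCK]` kernel log format in `/var/log/ufw.log`, since the agent's real implementation is not shown here. `parse_ufw_line` is hypothetical.

```python
import re
from typing import Optional

UFW_FIELDS = ("SRC", "DST", "PROTO", "SPT", "DPT")
# KEY=VALUE tokens as printed by the kernel's netfilter LOG output (e.g. SRC=1.2.3.4).
_TOKEN = re.compile(r"\b([A-Z]+)=(\S*)")


def parse_ufw_line(line: str) -> Optional[dict]:
    """Extract key fields from a `[UFW BLOCK]` log line; return None for other lines."""
    if "[UFW BLOCK]" not in line:
        return None
    tokens = dict(_TOKEN.findall(line))
    record = {field: tokens.get(field) or None for field in UFW_FIELDS}
    # Ports are absent for protocols such as ICMP; keep them as None there.
    for port_field in ("SPT", "DPT"):
        value = record[port_field]
        record[port_field] = int(value) if value and value.isdigit() else None
    return record
```

Each parsed record could then be serialized and published to Kafka. The topic used by the agent is not specified in this entry.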