z3r0-c001995/Big-Data-Analytics-Framework-for-Predictive-Cyber-Threat-Intelligence

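A big data analytics framework for predictive cyber threat intelligence, built on Spark, Kafka, and HDFS. It covers the full pipeline: multi-source data collection, stream processing, and ML model training, through to the SOC dashboard and SOAR automated response.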

# System Architecture and Development Guide

This repository implements a predictive CTI (cyber threat intelligence) data pipeline using big data analytics. The system design and development approach are based on the project report and proposal in `docs/`, and the runtime is centered on Spark, Kafka, and HDFS.

## Reference Docs (Repository-Verified)

Present in this repository snapshot:

- `docs/FEATURE_RECORD.md` - change log of implemented ingestion, processing, runtime, and hardening features.
- `docs/REPO_AUDIT_REPORT.md` - grounded implementation audit of the current repository state.
- `docs/IMPLEMENTATION_PLAN.md` - phased roadmap for closing the remaining architecture gaps.
- `docs/FRONTEND_AUDIT_REPORT.md` - audit of the original dashboard frontend and the upgrade path into the SOC application shell.
- `docs/SOC_UI_ARCHITECTURE.md` - route map, component inventory, and UI integration design for the SOC interface.
- `img/Enterprise Big Data Analytics Framework for Predictive CTI (SOC-Grade).jpg` - architecture image currently shipped with the repository.

Referenced by the README but not present in this repository snapshot:

- `docs/Final Year Project.pdf`
- `docs/final year project proposal.docx`
- `docs/BDA Framework for CTI Prediction-2025-11-06-120442.png`
- `docs/BDA Framework for CTI Prediction-2025-11-06-125406.png`
- `docs/Untitled diagram-2025-11-06-115607.png`

Treat the missing proposal/FYP assets as external references unless they are added back into `docs/`.

## Architecture Summary (Attached Diagram Alignment)

The architecture in the attached diagram is a closed-loop predictive CTI system. This repository now documents and implements the same flow while keeping Docker as the runtime backbone.

1. Data sources
   - Threat intel, OSINT/dark web, IDS/SIEM, network/app/firewall/host logs.
   - Standards: STIX/TAXII and CTI feed APIs.
   - Current files: `datasets/`, `ingestion/online_sources.yaml`, `ingestion/online_ingestion_to_kafka.py`.
2. Ingestion and streaming layer
   - Kafka is the streaming backbone; collectors publish events and CTI objects.
   - Collection tools:
     - Filebeat for host/app logs and Suricata/Zeek file outputs.
     - Python API collectors for TAXII/STIX/OpenCTI/MISP/abuse.ch/threat feeds.
     - Fluent Bit for lightweight firewall/network file forwarding.
     - Logstash only for heavy parsing (network/firewall syslog).
   - Current files: `streaming_ingest.py`, `batch_ingest.py`, `ingestion/config.yaml`, `ingestion/online_sources.yaml`, `pipeline.py`.
   - Docker services: `zookeeper`, `kafka`, `filebeat`, `fluent-bit`, `logstash`, `spark-streaming`, `kafka-ui`, `kafka-monitor-consumer`.
3. Big data processing layer
   - Spark Streaming + PySpark ETL for normalization, windowing, and cleaning.
   - Standardized schema + label resolution applied for cross-source consistency.
   - Current files: `schema_mapper.py`, `batch_ingest.py`, `streaming_ingest.py`.
   - Docker services: `spark-master`, `spark-worker`, `spark-streaming`.
4. Detection, feature extraction, and enrichment
   - Feature engineering and anomaly-oriented analysis feed training datasets.
   - Current implementation path: `training/prepare_training_data.py`, `evaluation/verify_dataset_schema.py`.
   - Planned additions: unsupervised detection jobs and enrichment modules.
5. Distributed storage layer
   - Raw/feature data in HDFS + Parquet plus operational search/entity stores.
   - Current storage: local `data/raw/cti_raw`, HDFS `/cti/raw`, Elasticsearch for events/alerts/predictions, MongoDB for entities/relationships.
   - Operational export path: `storage/export_operational_data.py`.
   - Docker services: `namenode`, `datanode`, `elasticsearch`, `mongodb`.
6. Model training, registry, and serving
   - Training pipeline and model registry are implemented.
   - The operational export path now supports generic joblib-backed scikit-learn scorers, so non-linear models such as gradient boosted trees can be promoted as the active production scorer.
   - Model-serving is active and exposes SOC-ready hybrid inference APIs:
     - `POST /predict` (model_kind=`network|malware`)
     - `POST /anomaly`
     - `POST /forecast`
     - versioned routes: `/v1/predict/network`, `/v1/predict/malware`, `/v1/anomaly`, `/v1/forecast`
   - Current files: `training/train_baseline_model.py`, `registry/`, `models/`, `storage/`, `serving/app.py`.
   - Current services: `postgres`, `mlflow`.
   - Current runtime behavior: `make model-train` logs the selected export model into MLflow and registers it under `cti-active-export-model`.
   - Real-time inference path: Spark enriched events -> operational export -> model-serving -> Elasticsearch/MongoDB -> dashboard/email/SOAR.
7. Alerting, SOAR, analyst feedback loop
   - Diagram flow: alerts + dashboards -> SOAR -> analyst console -> retraining feedback into feature store/training.
   - Current state: SOC UI, SOAR sync backend, and dashboard SOAR visibility are implemented.
   - Current files: `soar/`, `docker/soar/`, `dashboards/index.html`, `dashboards/app.js`, `dashboards/control_server.py`, `dashboards/collect_metrics.py`.
   - Current behavior:
     - `make soar-up`, `make soar-down`, `make soar-status`, `make soar-sync`
     - `make soar-bootstrap` provisions TheHive/Cortex service accounts and writes the resulting API keys back into `.env`
     - `make soar-configure-analyzers` curates and enables the Cortex auto-run worker set for the `bda-cti` org
     - TheHive/Cortex health is surfaced in the dashboard snapshot
     - synced alert / case / analyzer job summaries are exposed to the SOAR page
     - manual `Run analyzers` / `Create case` actions are exposed from Alerts, Incidents, and Analyst Console via the dashboard control API
     - automatic analyzer execution now covers both newly synced and already-synced alerts, while still avoiding duplicate auto-runs for the same alert/analyzer/observable tuple
     - default local binds use `19000` for TheHive and `19001` for Cortex to avoid clashes with unrelated services that commonly occupy `9000`/`9001`
     - the default curated analyzer set is: `IP-API`, `GoogleDNS_resolve`, `Urlscan.io_Search`, `Crt_sh_Transparency_Logs`, `CIRCLHashlookup`, `CIRCLVulnerabilityLookup`
     - Cortex Docker jobs now use the host-mapped job directory rather than the in-container path, which is required for analyzer containers to receive their input payloads correctly
   - Operator flow:
     1. `make soar-up`
     2. `make soar-bootstrap`
     3. `make soar-sync`
8. Real-Time Host Logging, Agent and Email Alerting Framework
   - Real-time log monitoring: The host-level agent `agent/bda_cti_agent.py` tails `/var/log/ufw.log` in real time, extracts key fields (such as `SRC`, `DST`, `PROTO`, `SPT`, `DPT`) from `[UFW BLOCK]` events, and publishes them directly into Kafka.
   - Network interface automation: The script `automation/update_network_config.py` auto-detects the host's primary active network interface (e.g., `wlo1`) and writes that interface's IP as the platform public IP into the container `.env` files and `agent/agent_config.json`, which prevents loopback (`127.0.0.1`) connection issues.
   - Real-time gateway bridge (`ingestion/agent_bridge.py`): Connects Kafka telemetry topics directly to Elasticsearch and MongoDB. It aggregates and flushes telemetry (from `alerts` and `logs.host`), applies inline machine-learning threat scoring to critical security alarms (e.g., reverse shells, malware), and evaluates Playbook Engine triggers in the background.
   - Resilient email alerting pipeline: An asynchronous `NotificationService` runs inside the FastAPI backend lifecycle (`backend/main.py`) and consumes the `alerts` and `audit` Kafka topics to send immediate alerts via SMTP (STARTTLS on port 587) to the configured analyst email addresses.
   - Complete system auditing (`backend/core/audit.py`): Hooks system actions (logins, host isolation, training triggers) directly into the Kafka pipeline and dispatches them synchronously to the alerting system and analyst emails.

## Docker Deployment Model (Keep As-Is)

This project intentionally keeps Docker Compose as the operational core.

- The existing compose file remains the source of truth: `docker/docker-compose.yaml`.
- Current always-on stack: Kafka/Zookeeper, HDFS (NameNode/DataNode), Spark cluster (master/worker), streaming job container, Kafka UI, Elasticsearch, MongoDB, and a persistent monitoring consumer group container.
- `spark-streaming` uses a container-local Ivy cache (`/tmp/.ivy2`) to avoid host bind-mount permission issues when downloading the Spark Kafka connector packages.
- The default `spark-streaming` Compose environment keeps the raw HDFS sink disabled (`RAW_SINK_ENABLED=false`) so feature enrichment stays online even if HDFS is temporarily unhealthy. To re-enable raw HDFS writes, set `RAW_SINK_ENABLED=true` together with `RAW_OUTPUT_PATH` and `RAW_CHECKPOINT_PATH`.
- The architecture blocks that were still pending after the initial Docker tranche (model serving, real-time inference, SOAR, analyst workflow) are now covered by items 6 and 7 of the Architecture Summary.
- Runtime rule:
  - Host scripts use `KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092`
  - Container-to-container traffic uses `kafka:29092`

## Development Guide (Derived from the Proposal and Report)

The docs recommend a hybrid workflow: CRISP-DM for data science and Agile-Scrum for iterative system delivery. Use this as the default development path.

1. Business understanding
   - Target proactive detection and early warning, not just reactive alerts.
   - Keep objectives aligned with the project report (`docs/Final Year Project.pdf`): predictive CTI and incident response.
2. Data understanding
   - Inventory data sources (datasets, logs, CTI feeds).
   - Validate schema consistency before scaling ingestion.
3. Data preparation
   - Normalize columns with `schema_mapper.py`.
   - Build repeatable ingestion configs in `ingestion/config.yaml`.
4. Modeling
   - Start with supervised baselines (Random Forest, XGBoost), then LSTM for temporal signals.
   - Track experiments with MLflow and store artifacts under `models/`.
5. Evaluation
   - Use the metrics called out in the report: accuracy, precision, recall, and mean time to detect (MTTD).
   - Keep evaluation outputs under `evaluation/`.
6. Deployment
   - Integrate batch and streaming pipelines into the Docker stack.
   - Plan for real-time inference in streaming paths before alerting.

## Key Components and Files

- Kafka topics (required architecture): `logs.suricata`, `logs.host`, `logs.firewall`, `cti.taxii`, `cti.misp`, `osint.darkweb`, `enriched.features`.
- Kafka topics config/bootstrap: `ingestion/required_topics.yaml`, `ingestion/ensure_kafka_topics.py`
- Batch ingestion: `batch_ingest.py`
- Streaming ingestion: `streaming_ingest.py`
- OSINT ingestion to Kafka: `ingestion/online_ingestion_to_kafka.py`
- Network scan collector to Kafka: `ingestion/network_scan_to_kafka.py`
- Local dataset-to-Kafka publisher: `ingestion/local_sources_to_kafka.py`
- Host system log-to-Kafka publisher: `ingestion/system_logs_to_kafka.py`
- Ingestion coverage verification: `ingestion/verify_ingestion_stack.py`
- Online sources config: `ingestion/online_sources.yaml`
- Dataset registry: `ingestion/config.yaml`
- Schema normalization: `schema_mapper.py`
- Pipeline orchestration: `pipeline.py`
- Pipeline config runner: `run_pipeline.py`, `pipeline_config.yaml`
- Training data preparation: `training/prepare_training_data.py`
- Kafka feature-store materialization: `training/materialize_feature_store.py`
- Data-processing + anomaly scoring: `training/data_processing_pipeline.py`
- Feature extraction + enrichment: `training/feature_extraction_enrichment.py`
- Baseline model training: `training/train_baseline_model.py`
- Environment template: `.env.example`
- One-command wrapper: `Makefile` (`make collect`, `make nvd-daily`, `make feature-store`, `make data-processing`, `make feature-enrichment`, `make model-train`, `make operational-export`, `make smoke`, `make start`, `make stop`, `make status`)
- Filebeat config: `docker/filebeat/filebeat.yml`
- Logstash pipeline: `docker/logstash/pipeline/logstash.conf`
- Fluent Bit config: `docker/fluent-bit/fluent-bit.conf`
- Sample collector input directories: `logs/app`, `logs/suricata`, `logs/zeek`, `logs/firewall`, `logs/network`
- Feature record log: `docs/FEATURE_RECORD.md`
- Containerized runtime: `docker/docker-compose.yaml`
- Optional local MISP stack: `docker/misp/docker-compose.yaml`
- Kafka web UI (Kafka UI): `http://localhost:8085`
- Permanent Kafka monitoring consumer group: `MONITOR_CONSUMER_GROUP` (default `cti-monitor-group`)
- Feature-store dashboard page: `dashboards/feature_store.html`
- Feature-store Spark metrics helper: `dashboards/feature_store_metrics_job.py`
- Dashboard control server: `dashboards/control_server.py`
- One-switch runtime loop: `automation/system_runtime.py`
- Real-time host agent: `agent/bda_cti_agent.py` (tails local log files, UFW blocks, system actions)
- Real-time gateway bridge: `ingestion/agent_bridge.py` (indexes log events, applies inline ML scoring, triggers playbooks)
- Network IP auto-configuration: `automation/update_network_config.py` (automatically discovers the public interface IP)
- Asynchronous notification service: `backend/services/notification_service.py` (dispatches real-time email alerts)
- Enterprise SOC UI shell: `frontend/` / `endguardui` (modern dashboard with views for threat models, hunting playbooks, audits)
- Operational storage exporter: `storage/export_operational_data.py`
- Elasticsearch mappings: `storage/elasticsearch/cti-events.mapping.json`, `storage/elasticsearch/cti-alerts.mapping.json`, `storage/elasticsearch/cti-predictions.mapping.json`

## Big Data Analytics Workflow (Docs Alignment)

The proposal outlines a full analytics workflow that should guide future builds: data acquisition, ingestion and preprocessing, storage and management, exploratory analysis, visualization and alerting, feedback and continuous improvement, and governance and security.

## Environment + One-Command Collection

1. Create local environment file

```bash
cp .env.example .env
```

2. Set the required keys in `.env` for private integrations (TAXII/OpenCTI/MISP/threat feeds). Public feeds run without these keys.
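
Only credential-gated sources depend on the keys set in step 2. The sketch below shows one way such gating could work and is hypothetical, not code from `ingestion/online_ingestion_to_kafka.py`: the source keys and their required variables are assumptions (only `OPENCTI_URL`, `OPENCTI_TOKEN`, and `MALWAREBAZAAR_AUTH_KEY` are named in this guide), and the `skipped:missing_env:` strings only imitate the `missing_env:*` marker that the collector report is described as using.

```python
"""Hypothetical sketch: decide which CTI collectors can run given .env values."""
from __future__ import annotations

import os
from typing import Mapping, Sequence

# Illustrative mapping only (an assumption); the real source definitions live
# in ingestion/online_sources.yaml and may be structured differently.
REQUIRED_ENV: dict[str, Sequence[str]] = {
    "opencti": ("OPENCTI_URL", "OPENCTI_TOKEN"),
    "malwarebazaar_recent": ("MALWAREBAZAAR_AUTH_KEY",),
    "public_feed_example": (),  # public feeds need no credentials
}


def plan_sources(
    required: Mapping[str, Sequence[str]], env: Mapping[str, str]
) -> dict[str, str]:
    """Return "run" or "skipped:missing_env:VAR[,VAR...]" for every source."""
    plan: dict[str, str] = {}
    for source, names in required.items():
        # Treat unset and whitespace-only values as missing.
        missing = [name for name in names if not env.get(name, "").strip()]
        plan[source] = "run" if not missing else "skipped:missing_env:" + ",".join(missing)
    return plan


if __name__ == "__main__":
    for source, status in plan_sources(REQUIRED_ENV, os.environ).items():
        print(f"{source}: {status}")
```

Reporting a skip per source, instead of failing the whole run, is what lets public feeds keep flowing when private keys are absent.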

Monitoring consumer defaults (optional override):

- `MONITOR_CONSUMER_TOPIC=enriched.features`
- `MONITOR_CONSUMER_GROUP=cti-monitor-group`

Local MISP Docker defaults (optional):

- `MISP_HOST_PORT=8081`
- `MISP_BASEURL=http://127.0.0.1:8081`
- `MISP_DATA_ROOT=/home/mr_robot/misp-data` (outside the repo)

OpenCTI-specific note:

- `OPENCTI_URL` must be the GraphQL endpoint, not just the UI base URL.
- If your UI is `http://localhost:8080`, set `OPENCTI_URL=http://localhost:8080/graphql`.
- If a collector runs inside Docker on the same network as OpenCTI, use `OPENCTI_URL=http://opencti:8080/graphql`.
- Set `OPENCTI_TOKEN` to the API key shown in your OpenCTI user profile.
- Quick check:

```bash
curl -sS "$OPENCTI_URL" \
  -H "Authorization: Bearer $OPENCTI_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"query":"{ me { id name } }"}'
```

Expected:

- JSON response with `data.me` present (token + URL are valid).

3. Run end-to-end ingestion and streaming collection in one command

```bash
make collect
```

Notes:

- `make collect` loads `.env` automatically if present.
- Default behavior skips heavy batch ingestion (`SKIP_BATCH=1`).
- To include batch ingestion:

```bash
make collect SKIP_BATCH=0
```

4. Run incremental NVD ingestion (daily window)

```bash
make nvd-daily
```

Notes:

- The default `NVD_SINCE` is the current UTC time minus 24 hours.
- Override window and volume as needed:

```bash
make nvd-daily NVD_SINCE=2026-02-15T00:00:00Z NVD_MAX_ITEMS=5000
```

5. Materialize Kafka enriched events into a Parquet feature store

```bash
make feature-store FEATURE_LIMIT=5000 FEATURE_MODE=overwrite
```

Expected:

- Feature rows are written to `data/processed/feature_store/enriched_features`, partitioned by `event_date` and `source_type`.

6. Run the data-processing pipeline (feature engineering + anomaly scoring)

```bash
make data-processing
```

Expected:

- Curated processed features are written to `data/processed/data_processing/curated_features`.
- High-anomaly slices are written to `data/processed/data_processing/anomalies`.
- Run report is written to `training/last_data_processing_report.json`.

7. Run feature extraction and enrichment for model-ready features

```bash
make feature-enrichment
```

Expected:

- Enriched event-level features: `data/processed/feature_enrichment/event_features`
- Session-level feature aggregates: `data/processed/feature_enrichment/session_features`
- Run report: `training/last_feature_enrichment_report.json`

8. Train baseline model from enriched event features

```bash
make model-train
```

Expected:

- Active export model metadata is written to `models/baseline_logistic_regression_model/metadata.json`.
- The active production scorer may now be a non-linear sklearn model such as `baseline_gradient_boosted_tree_model`; the output directory name is retained for backward compatibility with the existing runtime and Makefile wiring.
- Logistic regression is retained as the rollback scorer and is recorded under `fallback_model` inside the active metadata/report payloads.
- Candidate model artifacts are written under `models/baseline_logistic_regression_model/candidates/`.
- Training report is written to `training/last_model_training_report.json`.
- Markdown comparison report is written to `training/last_model_training_report.md`.
- MLflow run metadata is embedded into `training/last_model_training_report.json["mlflow"]`.
- The export model is registered in MLflow as `cti-active-export-model`.
- Dashboard `Model Training` card shows rows, active model, recommended model, and active-model metrics.
- Default active-model promotion is now `governed_auto`, not a hard pin to gradient boosted trees:
  - candidate models are evaluated on source-aware validation and out-of-source test splits
  - the promoted active scorer must satisfy minimum validation F1, test F1, and test recall gates
  - logistic regression remains the explicit rollback scorer

Optional registry-only startup:

```bash
make registry-up
```

Registry endpoints:

- MLflow UI/API: `http://127.0.0.1:5001`
- PostgreSQL backend: internal Compose service `postgres`

9. Export operational events, alerts, predictions, entities, and relationships

```bash
make operational-export
```

Expected:

- Elasticsearch event index receives searchable processed events.
- Elasticsearch alert index receives anomaly-driven alert documents.
- Elasticsearch prediction index receives model-scored prediction documents.
- The prediction exporter loads the active production scorer from `models/baseline_logistic_regression_model/metadata.json` and `model.joblib`, with automatic fallback to the tuned logistic model if the promoted scorer artifact is missing or invalid.
- Batch Spark jobs now execute inside `spark-worker` by default, while `spark-streaming` stays dedicated to the long-running stream ingestion job.
- Batch executors receive `PYTHONPATH=/opt/app` so repo modules such as `storage.model_scoring` are importable during runtime scoring UDF execution.
- MongoDB receives entity and relationship documents for CTI browsing and downstream graph-style workflows.
- Run report is written to `storage/last_operational_export_report.json`.
- The export report now includes per-index `document_stats` showing:
  - input rows
  - unique documents
  - duplicates collapsed before upsert
- Elasticsearch index counts are cumulative over time; use the export report to inspect the current run's unique document totals.

Additional notes on model training (step 8) and batch execution:

- `make model-train` now trains a stratified candidate suite (`logistic_regression`, `random_forest`, `gradient_boosted_tree`, `svm`) with cross-validation, random oversampling, PR-AUC reporting, and out-of-fold threshold optimization.
- The supervised label is now `is_malicious`, derived upstream from `source_name` + `raw_label`. `is_anomaly` remains in the operational feature set, but it is no longer the default predictor target.
- Leakage-prone anomaly-derived inputs were removed from the supervised training path; the default model feature set no longer trains directly on `anomaly_score` or `anomaly_density_1h`.
- Default benchmark holdouts are source-aware:
  - train: `CSE_CIC_IDS2018_Kaggle`
  - validation: `UNSW_NB15_Kaggle`, `UNSW_NB15_HF`
  - test: `TON_IOT_Network_Kaggle`
- Active scorer promotion is controlled explicitly through:
  - `MODEL_TRAIN_ACTIVE_MODEL` (default: `auto`)
  - `MODEL_TRAIN_FALLBACK_MODEL` (default: `logistic_regression`)
  - `MODEL_TRAIN_PROMOTION_MIN_VALIDATION_F1` (default: `0.6`)
  - `MODEL_TRAIN_PROMOTION_MIN_TEST_F1` (default: `0.55`)
  - `MODEL_TRAIN_PROMOTION_MIN_TEST_RECALL` (default: `0.45`)
- Batch execution container defaults:
  - `SPARK_BATCH_CONTAINER=spark-worker`
  - `SPARK_BATCH_PYTHONPATH=/opt/app`
- The generic operational scorer currently supports:
  - logistic regression
  - random forest
  - gradient boosted tree
  - support vector machine
- `training/last_model_training_report.json` now includes a `data_diagnosis` block with positive prevalence and class-imbalance ratio, so misleading accuracy is easier to spot during review.
- `training/last_model_training_report.json` also includes `split_details` describing row counts, label balance, and source composition for the source-aware train / validation / test holdouts.
- `training/train_baseline_model.py` already contains optional XGBoost and LightGBM hooks; enable them through `MODEL_TRAIN_MODELS=...` once those dependencies are present in the runtime image.
- The latest governed source-aware run promotes `baseline_random_forest_model` as the active runtime scorer and keeps `baseline_logistic_regression_model` as fallback, because random forest passes the automatic promotion gates while logistic regression fails the out-of-source generalization gates.

Separate malware branch:

- `make malware-intel-snapshot` pulls a MalwareBazaar metadata snapshot into `data/processed/malware_branch/`.
- `make malware-prepare` prepares EMBER + MalwareBazaar data into a dedicated malware parquet output without mixing it into the network predictor path.
- MalwareBazaar now requires `MALWAREBAZAAR_AUTH_KEY`; without it, snapshot collection is skipped cleanly instead of failing with a 401.
- `ingestion/fetch_malwarebazaar_snapshot.py` and `ingestion/online_ingestion_to_kafka.py` now auto-load `.env` when run directly, so local credential-backed collectors no longer depend on `make` for environment export.

Online ingestion defaults are now bounded for operational safety:

- NVD is capped to a single 500-item page by default instead of trying to pull the full CVE corpus in one run.
- MISP event collection now uses `metadata=1` with `limit=100`, so the local collector completes quickly instead of timing out on oversized event payloads.
- Large text feeds such as FireHOL, OpenPhish, Emerging Threats, and Abuse.ch are capped to practical default volumes to keep Kafka and local storage under control.

Training prep default behavior:

- `training/prepare_training_data.py` now uses `data/processed/data_processing/curated_features` as the default model-training input when available, and writes the default training dataset to `data/processed/cti_training/default`.
- Use `--skip-processed` if you need raw batch-only training prep.

10. Run a smoke validation over the live pipeline state

```bash
make smoke
```

Expected:

- Kafka socket readiness passes.
- Critical Compose services report `running`.
- Required Kafka topics exist.
- Latest ingestion/process/training reports are present and parseable.
- Feature-store, processed, enriched, and model artifact paths exist and are non-empty.

Strict topic offset check:

```bash
make smoke SMOKE_STRICT_TOPIC_OFFSETS=1
```

Strict operational storage check:

```bash
make smoke SMOKE_REQUIRE_OPERATIONAL_STORAGE=1
```

This stricter mode also requires:

- `storage/last_operational_export_report.json` to exist
- Elasticsearch to be reachable and reporting index counts
- MongoDB to be reachable and reporting entity/relationship counts
- the operational export report to show exported documents

## One-Switch Autopilot (System ON/OFF)

Use the dashboard control server to get a single ON/OFF switch for the full runtime.

```bash
make dashboard
```

Then open:

- `http://localhost:8000` (Overview)
- `http://localhost:8000/feature_store.html` (Feature Store)
- `http://localhost:8085` (Kafka UI)

If port `8000` is already in use:

```bash
make dashboard DASHBOARD_PORT=8010
```

From the `System Switch` card:

- `Turn System On` starts a background automation loop that:
  - brings Docker services up,
  - auto-reconciles stopped services every 60s,
  - ensures Kafka topics exist,
  - runs online CTI ingestion on an interval,
  - publishes local dataset records on an interval,
  - publishes host system logs on an interval,
  - runs network scans on an interval,
  - materializes feature-store snapshots from `enriched.features`,
  - runs data-processing (unsupervised/anomaly stage),
  - runs the feature extraction + enrichment stage,
  - runs the baseline model training stage,
  - refreshes dashboard metrics on an interval.
- If Docker Compose v1 hits the known `ContainerConfig` recreate bug, the runtime removes stale non-running stack containers and retries startup.
- `Turn System Off` stops that loop.
- Runtime scan defaults come from `.env`: `NETWORK_SCAN_TARGETS`, `NETWORK_SCAN_PORTS`, `NETWORK_SCAN_TIMEOUT`, `NETWORK_SCAN_MAX_HOSTS`.
- Optional API hardening: set `CTI_CONTROL_API_KEY` in `.env`. The dashboard/API will then require `X-CTI-API-Key` (or `Authorization: Bearer ...`) for ON/OFF calls.

API equivalents:

```bash
curl -H "X-CTI-API-Key: $CTI_CONTROL_API_KEY" -X POST http://127.0.0.1:8000/api/system/on
curl -H "X-CTI-API-Key: $CTI_CONTROL_API_KEY" -X POST http://127.0.0.1:8000/api/system/off
curl http://127.0.0.1:8000/api/system/status
```

CLI equivalents (without dashboard API calls):

```bash
make start
make status
make stop
# stop the runtime loop only (keep Docker services up)
make stop STOP_STACK=0
```

DB host note:

- For the bundled Docker MISP DB, use `MISP_DB_HOST=misp-db` (not `localhost`).
- `localhost` only works if MISP and its DB share the same container/host network namespace.
- For the bundled DB, use a non-root app user (default `MISP_DB_USER=misp`); keep `MISP_DB_ROOT_PASSWORD` only for DB administration.

One-switch startup integration:

```bash
make start
make stop
```

Notes:

- `make start` launches `automation/system_runtime.py` in the background and writes runtime logs to `runtime/system_runtime.log`.
- `make start` and `make stop` manage the core ingestion, streaming, Spark, and operational storage export pipeline.
- `make stop STOP_STACK=0` stops only the runtime loop and leaves the currently running Docker services untouched.

By default, Docker service ports are bound to `127.0.0.1` for safer local operation. To expose them externally, set `BIND_ADDR=0.0.0.0` in `.env`.

## Dependency Hardening (Pinned + Lock + CVE + SBOM)

This repository now uses pinned runtime dependencies and a hash-verified lock file.
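
What "hash-verified" implies for each lock entry can be checked with a few lines of Python. This is a hypothetical helper, not part of the `make deps-lock` or `make ci-security-gate` targets; it assumes `requirements.lock.txt` follows the pip-tools `--generate-hashes` layout (`name==version \` followed by `--hash=sha256:...` continuation lines). pip already refuses unhashed entries at install time in hash-checking mode (`pip install --require-hashes`), so treat this as an early lint rather than a replacement.

```python
"""Hypothetical lint: every lock-file requirement must be ==-pinned and hashed."""
from __future__ import annotations

import re
from pathlib import Path

# name, optional [extras], then an exact "==" pin (e.g. "requests[socks]==2.31.0").
PIN_RE = re.compile(r"^[A-Za-z0-9][A-Za-z0-9._-]*(\[[^\]]*\])?==[^\s;]+")


def logical_lines(text: str) -> list[str]:
    """Join backslash-continued lines and drop blank lines and comments."""
    out: list[str] = []
    buf: list[str] = []
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        if line.endswith("\\"):
            buf.append(line[:-1].strip())
            continue
        buf.append(line)
        out.append(" ".join(buf))
        buf = []
    if buf:  # file ended on a dangling continuation
        out.append(" ".join(buf))
    return out


def lock_problems(text: str) -> list[str]:
    """List requirements that are not exactly pinned or carry no --hash."""
    problems: list[str] = []
    for line in logical_lines(text):
        if line.startswith("-"):  # global options such as --index-url
            continue
        requirement = line.split()[0]
        if not PIN_RE.match(requirement):
            problems.append(f"not pinned: {requirement}")
        elif "--hash=" not in line:
            problems.append(f"no hash: {requirement}")
    return problems


if __name__ == "__main__":
    lock_path = Path("requirements.lock.txt")
    if lock_path.exists():
        for issue in lock_problems(lock_path.read_text()):
            print(issue)
```

Run from the repository root, it prints one line per offending entry and nothing when every entry is pinned and hashed.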

- Direct pins: `requirements.in`
- Install entrypoint: `requirements.txt` (delegates to lock)
- Reproducible lock: `requirements.lock.txt`
- Security toolchain pins: `requirements-security.txt`

Run the hardening workflow:

```bash
make hardening-scan
```

Or run each step:

```bash
make deps-lock
make cve-scan
make sbom
```

Generated artifacts:

- CVE report: `security/pip_audit_report.json`
- CycloneDX SBOM: `security/sbom.cdx.json`
- Baseline summary: `security/DEPENDENCY_HARDENING.md`

CI gate:

- GitHub Actions workflow: `.github/workflows/security-gate.yml`
- Fails when:
  - `requirements.lock.txt` drifts from `requirements.in`
  - any vulnerability is present in `security/pip_audit_report.json`
- Local equivalent:

```bash
make ci-security-gate
```

## Workflow Checklist

- Define objectives and scope from the docs in `docs/`.
- Inventory data sources (local datasets, CTI feeds, OSINT, logs).
- Configure batch ingestion in `ingestion/config.yaml`.
- Run batch ingestion and validate output in `data/raw/cti_raw`.
- Start streaming ingestion and validate HDFS output in `/cti/raw`.
- Normalize schemas and labels with `schema_mapper.py` (batch path).
- Build preprocessing/feature pipelines under `preprocessing/` (planned).
- Train and evaluate models under `training/` and `evaluation/` (planned).
- Deploy inference into streaming/batch paths (planned).
- Add dashboards/alerts under `dashboards/` (planned).

## Lightweight Runbook (Commands + Expected Outputs)

1. Start the full system (background runtime + stack + collectors + processing)

```bash
make start
make status
```

Expected:

- Runtime loop is active in background (`runtime/system_runtime.pid` exists).
- Docker services show `running` via `make status`.
- Continuous ingestion and processing stages are scheduled automatically.

2. (Optional/manual) Start only the stack

```bash
docker-compose -f docker/docker-compose.yaml up -d
docker-compose -f docker/docker-compose.yaml ps
```

Expected:

- All services show `Up`, including `kafka`, `zookeeper`, `filebeat`, `fluent-bit`, `logstash`, `spark-master`, `spark-worker`, `spark-streaming`, `namenode`, `datanode`, `kafka-ui`, `kafka-monitor-consumer`.
- File-based collectors read from:
  - `logs/app`, `logs/suricata`, `logs/zeek` (Filebeat)
  - `logs/firewall`, `logs/network` (Fluent Bit)

3. Create Kafka topics (idempotent)

```bash
KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 \
  python ingestion/ensure_kafka_topics.py --config ingestion/required_topics.yaml
```

Expected:

- Output indicates each topic is created or already exists (`logs.suricata`, `logs.host`, `logs.firewall`, `cti.taxii`, `cti.misp`, `osint.darkweb`, `enriched.features`).

4. Verify topics

```bash
docker exec kafka kafka-topics --bootstrap-server localhost:9092 --list
```

Expected:

- Topic list includes all required architecture topics.

5. Run Python API collectors (TAXII/STIX/OpenCTI/MISP/abuse.ch/OSINT)

```bash
KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 \
  python ingestion/online_ingestion_to_kafka.py --max-items 500
```

Expected:

- Public feeds publish to `cti.taxii`, `cti.misp`, and `osint.darkweb`.
- Credential-gated feeds are explicitly marked as `skipped` with `missing_env:*` in `ingestion/last_online_ingestion_report.json`.

6. Publish host system logs from Python fallback collector

```bash
KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 \
  python ingestion/system_logs_to_kafka.py --max-lines 500
```

Expected:

- Host logs are published to `logs.host`.
- Report is written to `ingestion/last_system_log_report.json`.

7. (Optional) Publish local configured datasets

```bash
KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 \
  python ingestion/local_sources_to_kafka.py --max-rows-per-dataset 200
```

Expected:

- Dataset-derived records are published to configured topics (defaults include `logs.host` for log-like datasets and `local-datasets` for others).
- Rows are randomized before `--max-rows-per-dataset` is applied, so dataset ordering does not bias the published benchmark sample.
- Report is written to `ingestion/last_local_stream_report.json`.

Recommended external dataset bootstrap:

```bash
make datasets-configure
make datasets-list
make datasets-download EXTERNAL_DATASET_DEFAULT_SET=1
```

Notes:

- Kaggle credentials are sourced from `kaggle.json` and copied into `~/.kaggle/kaggle.json` and `~/.config/kaggle/kaggle.json`.
- Public Hugging Face datasets work without a token; set `HF_TOKEN` only for gated/private repos.
- The external dataset manifest lives in `ingestion/external_datasets.yaml`.
- Newly downloaded benchmark datasets are wired into `ingestion/config.yaml`, so the existing `make start` / `make collect` / `ingestion/local_sources_to_kafka.py` path will pick them up automatically once the files exist under `datasets/external/`.
- The current recommended predictor-training stack is: `cse_cic_ids2018_kaggle`, `unsw_nb15_kaggle`, `ton_iot_network_kaggle`, and `unsw_nb15_hf`.
- `LOCAL_STREAM_DATASETS` and `FEATURE_SOURCE_NAMES` default to that same allowlist, so the automated runtime focuses on the structured benchmark datasets that the current predictor can actually learn from.
- `cyber_threat_intelligence_hf` remains available for future NLP/IOC enrichment work, but it is not part of the default predictor-training set.
- `ember_2018_features_kaggle` remains optional because it is large and belongs to the malware-analysis module, not the current network-focused predictor.
- `malwarebazaar_recent` is part of `ingestion/online_sources.yaml` and flows through the normal online collector loop into `malware-intelligence`.
- The development host for this project is nearly full, so the manifest points `cse_cic_ids2018_kaggle` at a smaller cleaned mirror instead of the larger raw Kaggle dump.
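
Randomizing before the row cap matters because, if a file happens to be ordered by label or capture time, taking its first N rows would over-represent whatever comes first. One way to draw an unbiased fixed-size sample without loading the whole file into memory is reservoir sampling (Algorithm R). This is a hypothetical sketch: the guide does not say whether `ingestion/local_sources_to_kafka.py` shuffles in memory or samples this way, and the `seed` parameter is an assumption added for reproducible runs.

```python
"""Hypothetical sketch: uniform fixed-size row sample via reservoir sampling."""
from __future__ import annotations

import random
from typing import Iterable, List, Optional, TypeVar

T = TypeVar("T")


def reservoir_sample(rows: Iterable[T], k: int, seed: Optional[int] = None) -> List[T]:
    """Return up to k rows chosen uniformly at random from a stream.

    Algorithm R: keep the first k rows, then let row i (0-based) replace a
    uniformly chosen slot with probability k / (i + 1).
    """
    if k < 0:
        raise ValueError("k must be non-negative")
    rng = random.Random(seed)
    sample: List[T] = []
    for i, row in enumerate(rows):
        if i < k:
            sample.append(row)
        else:
            j = rng.randint(0, i)  # inclusive on both ends
            if j < k:
                sample[j] = row
    rng.shuffle(sample)  # also randomize the publish order
    return sample
```

To mirror `--max-rows-per-dataset 200` on a CSV file, pass `csv.DictReader(handle)` as `rows` and `k=200`; memory then stays proportional to `k` rather than to the file size.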

Storage controls for low-disk hosts:

```bash
make datasets-cleanup EXTERNAL_DATASET_DEFAULT_SET=1
make space-report
make space-clean
```

Notes:
- `make datasets-cleanup` removes only dataset archives that have already been extracted. The extracted files used by the pipeline are kept.
- `make space-clean` compacts `.git`, prunes unused Docker images, removes regenerable runtime artifacts, and clears Spark temp/checkpoint paths.
- `make space-clean-aggressive` also removes `data/processed/`, so the next pipeline run rebuilds processed features from scratch.
- The largest remaining local footprint is usually `venv/`. The Docker Compose runtime does not require it, but some host-side Make targets still reference it indirectly. Remove it only if you are intentionally standardizing on a Docker-only workflow.

8. Smoke-test the heavy parsing path (Logstash -> Kafka)

```bash
echo '<13>Jan 30 10:10:10 firewall-1 kernel: Deny TCP 1.2.3.4:443 -> 10.0.0.5:51515' | \
  nc -w 1 127.0.0.1 5514
```

Expected:
- The message is parsed by Logstash and published to `logs.firewall`.

9. Validate the streaming + enrichment flow (Spark)

```bash
docker exec kafka kafka-run-class kafka.tools.GetOffsetShell \
  --broker-list localhost:9092 --topic enriched.features --time -1
```

Expected:
- `enriched.features` offsets increase as Spark enriches incoming events.

10. Validate the permanent monitoring consumer group and downstream handoff

```bash
docker exec kafka kafka-consumer-groups --bootstrap-server localhost:9092 \
  --describe --group ${MONITOR_CONSUMER_GROUP:-cti-monitor-group}
```

Expected:
- The group exists and stays visible in Kafka UI (`Consumers` page).
- `CURRENT-OFFSET`/`LOG-END-OFFSET` values update as new events arrive.
- The runtime loop keeps pushing data to the next stages. Runs of `feature-store`, `data-processing`, `feature-enrichment`, and `model-training` update in `runtime/system_runtime_state.json` and on the dashboard cards.

Optional (if the raw sink is enabled):

```bash
docker exec namenode hdfs dfs -ls /cti/raw
```

Expected:
- Raw streaming records are written under `/cti/raw`.

11.
Run full collection + verification

```bash
KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 \
  python pipeline.py collect \
  --start-collectors \
  --max-items 500 \
  --local-stream-max-rows 200 \
  --verify-coverage
```

Expected:
- Required topics are initialized.
- Collector services are started.
- Online API collectors, Python host logs, and optional local datasets publish to Kafka.
- The coverage report confirms ingestion health.

12. Generate the dataset schema + label report

```bash
python evaluation/verify_dataset_schema.py --datasets \
  CICIDS_Enriched_2022_06_08 Attack_Dataset CloudWatch_Web_Attack \
  Cybersecurity_Intrusion_Data Cybersecurity_Extraction Dataset_Cybersecurity_Michele
```

Expected:
- A report is written to `evaluation/label_schema_report.md`.

13. Generate dashboard metrics and open the mini dashboard

```bash
python dashboards/collect_metrics.py
python dashboards/control_server.py --port 8000
```

Expected:
- `dashboards/metrics.json` is created.
- Open `http://localhost:8000` to view the overview dashboard.
- Open `http://localhost:8000/feature_store.html` to view feature-store partitions, row counts, and source-type distribution.
- The dashboard includes Docker Compose service health (running/stopped per service), collection runs, Kafka topic offsets, and feature-store snapshot metrics.
- The `Streaming` card uses `enriched.features` Kafka offsets by default, so it works even when the raw HDFS sink is disabled.

Optional (include raw streaming path metrics; requires HDFS to be reachable):

```bash
python dashboards/collect_metrics.py --include-streaming
```

14. Stop the full system cleanly

```bash
make stop
```

Expected:
- The runtime background loop is stopped.
- Docker stack services are stopped.

## Common Runtime Issues

- Kafka UI `Consumers` page is empty:
  - Kafka UI lists only the consumer groups the broker currently knows about. If no long-running consumer is connected, the list can be empty.
  - This stack includes `kafka-monitor-consumer` with `cti-monitor-group` to keep a permanently visible group.
  - Verify the service and group:
    - `docker-compose -f docker/docker-compose.yaml ps kafka-monitor-consumer`
    - `docker exec kafka kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group ${MONITOR_CONSUMER_GROUP:-cti-monitor-group}`
- The local dataset streaming step reports all datasets as `path_not_found`:
  - This is expected when the `datasets/` files are not present on the host.
  - In that case, `Local Streaming Publish` can legitimately remain `0`.
  - Check `ingestion/last_local_stream_report.json` for per-dataset status.
- Some API collectors show `missing_env:*` in the online report:
  - This is expected for TAXII/OpenCTI/MISP/threatfeeds endpoints until credentials are configured.
  - Set the required environment variables, then rerun `ingestion/online_ingestion_to_kafka.py`.
- `spark-streaming` restarts every ~10s with `0 datanode(s) running`:
  - This indicates NameNode/DataNode registration drift in the current Docker state.
  - Validate with:
    - `docker logs --tail 200 spark-streaming`
    - `docker logs --tail 200 datanode`
  - If needed, recreate the HDFS services/volumes and then restart streaming.
- `make up` fails with `KeyError: 'ContainerConfig'` on `namenode`:
  - This is a known `docker-compose` v1 recreate issue with stale HDFS containers.
  - The `Makefile` `up` target auto-detects this error, removes the stale `namenode`/`datanode` containers, and retries.
- `make metrics` fails with NumPy C-extension import errors:
  - The cause is a Python version mismatch: an incompatible `venv/lib/pythonX.Y/site-packages` has been forced onto `PYTHONPATH`.
  - The `Makefile` resolves `PYTHONPATH` dynamically from the active Python minor version. If no compatible venv exists, it falls back to a repo-only path.
- Feature-store materialization fails with `Mkdirs failed` under `/opt/app`:
  - Container user permissions on bind-mounted host paths can block Spark Parquet writes.
  - `make feature-store` runs `spark-submit` as root inside `spark-streaming` to ensure write access.
- The dashboard shows the switch as `Unavailable`:
  - The ON/OFF switch requires the API endpoints served by `dashboards/control_server.py`.
  - Start it with `make dashboard` (or `python dashboards/control_server.py --port 8000`).
- The agent hits `KafkaConnectionError` (Errno 111, connection refused) when connecting to Kafka:
  - If the agent reaches Kafka through a non-loopback network interface, make sure the Kafka container advertises the host's primary IP instead of `127.0.0.1`.
  - Run the dynamic IP config updater, either `python automation/update_network_config.py` or `make start` (which triggers it automatically). It detects the active interface and updates `.env` and `agent/agent_config.json`.
  - Restart the agent with `pkill -f agent/bda_cti_agent.py; nohup ./venv/bin/python agent/bda_cti_agent.py > runtime/bda-cti-agent.log 2>&1 &`. Use `;` rather than `&&`, because `pkill` exits non-zero when no agent process is running.
- The Agent Bridge shows `NodeNotReadyError` or `Network is unreachable`:
  - This usually happens when a stale PID file blocks the ingestion bridge service, or when the Kafka broker is down.
  - Run `rm -f runtime/*.pid && make soc-up` to clear stale state and trigger a clean startup.

## Fixes Log (Keep Updated)

- 2026-01-26: Batch ingestion now writes datasets incrementally with `--datasets`/`--limit` flags to prevent memory exhaustion (`batch_ingest.py`).
- 2026-01-26: Online ingestion supports multiple source types and configurable sources via `ingestion/online_sources.yaml`.
- 2026-01-26: Clarified Kafka bootstrap guidance: set `KAFKA_BOOTSTRAP_SERVERS` to `127.0.0.1:9092` on the host or `kafka:29092` inside containers.
- 2026-01-26: Added missing batch-ingestion dependencies to `requirements.txt` (`pyspark`, `pyyaml`).
- 2026-01-26: Added pipeline orchestration (`pipeline.py`) and training data preparation (`training/prepare_training_data.py`).
- 2026-01-26: Standardized batch schemas (snake_case, common field mapping), improved label resolution, and corrected dataset label config mismatches.
- 2026-01-26: Added a dataset schema + label verification report (`evaluation/verify_dataset_schema.py`).
- 2026-01-26: Updated architecture docs to match the attached closed-loop CTI diagram while preserving the Docker Compose deployment model.
- 2026-01-26: Added a full-source collection flow into the Kafka streaming layer via `ingestion/local_sources_to_kafka.py`, expanded online feeds, and added ingestion run reports for dashboard visibility.
- 2026-02-15: Added a host system log publisher (`ingestion/system_logs_to_kafka.py`) and integrated it into `pipeline.py`, so `logs.host` can be populated even when local dataset files are unavailable.
- 2026-02-15: `ingestion/local_sources_to_kafka.py` now exits cleanly with a report when configured local dataset paths do not exist, instead of failing Spark startup.
- 2026-02-15: Fixed the spark-streaming dependency cache setup in `docker/docker-compose.yaml` by using container-local Ivy cache settings.
- 2026-02-15: Added stable HDFS container hostnames (`namenode`, `datanode`) in `docker/docker-compose.yaml` to reduce NameNode/DataNode registration drift across restarts.
- 2026-02-15: Added required Kafka topic bootstrap with `ingestion/required_topics.yaml` and `ingestion/ensure_kafka_topics.py`.
- 2026-02-15: Added Filebeat, Logstash, and Fluent Bit configurations and Compose services for ingestion-layer collection/parsing: `docker/filebeat/filebeat.yml`, `docker/logstash/pipeline/logstash.conf`, `docker/fluent-bit/fluent-bit.conf`.
- 2026-02-15: Updated online collector coverage to include STIX/TAXII/OpenCTI, MISP, abuse.ch, and threatfeeds-style sources in `ingestion/online_sources.yaml`. Missing credentials are now reported as explicit skips in `ingestion/online_ingestion_to_kafka.py`.
- 2026-02-15: The streaming job in `streaming_ingest.py` now consumes the required architecture topics and writes enriched feature events to `enriched.features`.
- 2026-02-15: Added raw sink toggles to `streaming_ingest.py` (`RAW_SINK_ENABLED`, `RAW_OUTPUT_PATH`, `RAW_CHECKPOINT_PATH`) and set the Compose runtime defaults to prioritize continuous enrichment output.
- 2026-02-15: Added an ingestion coverage checker (`ingestion/verify_ingestion_stack.py`) and pipeline hooks for collector startup, topic initialization, and post-run verification.
- 2026-02-15: Added `.env.example` with all required collector and streaming environment keys.
- 2026-02-15: Added `Makefile` one-command orchestration (`make collect`) and helper targets (`make topics`, `make verify`, `make metrics`).
- 2026-02-15: Hardened `make up` to auto-recover from docker-compose v1 `ContainerConfig` errors by removing stale HDFS containers and retrying startup.
- 2026-02-15: Updated `Makefile` Python path resolution to detect compatible `venv` site-packages by Python minor version, preventing NumPy ABI mismatch errors in metrics and verification commands.
- 2026-02-15: Added an incremental NVD collection target (`make nvd-daily`) with configurable `NVD_SINCE`/`NVD_MAX_ITEMS` for daily updates.
- 2026-02-15: Added a PySpark feature-store materialization job (`training/materialize_feature_store.py`) and `make feature-store` to convert `enriched.features` Kafka events into partitioned Parquet datasets.
- 2026-02-15: Added a feature-store dashboard page (`dashboards/feature_store.html`) and metrics collection support in `dashboards/collect_metrics.py`, including a Spark-in-container fallback via `dashboards/feature_store_metrics_job.py`.
- 2026-02-15: Added a one-switch dashboard control API (`dashboards/control_server.py`) with a background runtime loop (`automation/system_runtime.py`) for interval-based online CTI ingestion and network scan collection.
- 2026-02-15: Added a consolidated feature record in `docs/FEATURE_RECORD.md`.
- 2026-02-17: Added a data-processing stage script (`training/data_processing_pipeline.py`) and a `make data-processing` target. Together they produce curated feature datasets plus anomaly-scored slices from the feature store.
- 2026-02-17: Made data-processing anomaly sensitivity tunable via `PROCESS_ANOMALY_QUANTILE`, and added dashboard cards/metrics for processed feature volume and anomaly slices.
- 2026-02-17: Updated training-prep defaults so processed curated features are the primary model-training input (`data/processed/cti_training/default`).
- 2026-02-17: Added a feature extraction and enrichment stage (`training/feature_extraction_enrichment.py`) with `make feature-enrichment`.
- 2026-02-17: Added `make start` / `make stop` / `make status` for one-command background runtime control from the CLI.
- 2026-02-17: Extended the one-switch runtime loop to continuously run the feature-store materialization, data-processing, and feature-enrichment stages in addition to ingestion and scanning.
- 2026-02-18: Added a persistent Kafka monitoring consumer service (`kafka-monitor-consumer`) with a configurable group/topic (`MONITOR_CONSUMER_GROUP`, `MONITOR_CONSUMER_TOPIC`). A stable consumer group therefore stays visible in Kafka UI while downstream processing stages continue.
- 2026-05-18: Added real-time log monitoring for the UFW host firewall (`/var/log/ufw.log`). The agent extracts key telemetry fields (SRC, DST, PROTO, SPT, DPT) from blocked packets and publishes them directly to Kafka.
- 2026-05-18: Dynamic host IP auto-discovery: `automation/update_network_config.py` now detects the host's primary network interface (e.g. `wlo1`) and its IP address. It writes that IP into `agent/agent_config.json` and `.env`, which resolves connection failures caused by loopback-bound addresses.
- 2026-05-18: Embedded an asynchronous `NotificationService` in the FastAPI backend lifespan. It provides a STARTTLS-secured SMTP alerting pipeline (Gmail-backed) that notifies analysts directly of incoming alarms and operations.
- 2026-05-18: Integrated audit log hooking: every system action (logins, isolation triggers, training invocations) is published immediately to the `audit` topic and triggers a real-time email audit notification.
- 2026-05-18: Integrated the Real-Time Ingestion Bridge (`ingestion/agent_bridge.py`), the operational gateway that feeds Kafka streams into Elasticsearch indices (`cti-alerts`, `logs.host`) and MongoDB. It also runs inline ML-based threat scoring so tactical alerts are raised immediately.
- 2026-05-18: Modernized the SOC UI dashboard shell (`frontend/` / `endguardui` React app). It now includes live agent heartbeat displays, real-time alert grids, manual playbooks, audit views, and ML metrics (F1, precision, recall, accuracy, speed, drift, registry).
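
The 2026-05-18 UFW entry describes pulling `SRC`, `DST`, `PROTO`, `SPT`, and `DPT` out of blocked-packet lines in `/var/log/ufw.log`. UFW rules log through the netfilter `LOG` target with a `[UFW BLOCK]` prefix, and that target writes space-separated `KEY=VALUE` tokens. A regular expression over those tokens is therefore enough for a minimal sketch. This is an illustration under assumptions, not the agent's actual code. The function name `parse_ufw_block`, the lower-case output keys, and the integer port conversion are hypothetical. Publishing to Kafka is left out.

```python
import re
from typing import Dict, Optional, Union

# netfilter LOG lines carry space-separated KEY=VALUE tokens (IN=, SRC=, DPT=, ...).
_KV_TOKEN = re.compile(r"\b([A-Z]+)=(\S*)")
_WANTED = ("SRC", "DST", "PROTO", "SPT", "DPT")

Event = Dict[str, Union[str, int, None]]


def parse_ufw_block(line: str) -> Optional[Event]:
    """Extract SRC/DST/PROTO/SPT/DPT from a '[UFW BLOCK]' log line.

    Hypothetical helper; returns None for lines that are not UFW blocks.
    SPT/DPT are absent for port-less protocols such as ICMP and map to None.
    """
    if "[UFW BLOCK]" not in line:
        return None
    tokens = dict(_KV_TOKEN.findall(line))
    event: Event = {key.lower(): tokens.get(key) for key in _WANTED}
    # Convert numeric port strings to int so downstream features can use them directly.
    for port in ("spt", "dpt"):
        value = event[port]
        if isinstance(value, str) and value.isdigit():
            event[port] = int(value)
    return event


if __name__ == "__main__":
    sample = (
        "May 18 10:00:00 host kernel: [12345.678901] [UFW BLOCK] IN=wlo1 OUT= "
        "MAC=aa:bb:cc:dd:ee:ff:11:22:33:44:55:66:08:00 SRC=1.2.3.4 DST=10.0.0.5 "
        "LEN=60 TOS=0x00 PREC=0x00 TTL=52 ID=0 DF PROTO=TCP SPT=443 DPT=51515 "
        "WINDOW=64240 RES=0x00 SYN URGP=0"
    )
    print(parse_ufw_block(sample))
    # -> {'src': '1.2.3.4', 'dst': '10.0.0.5', 'proto': 'TCP', 'spt': 443, 'dpt': 51515}
```

Matching generic `KEY=VALUE` tokens, rather than a fixed positional pattern, tolerates the optional fields that vary by protocol (for example `TYPE`/`CODE` for ICMP instead of ports).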