nadimpallipavan/secmon


# SecMon — Security Monitoring Platform

[![CI](https://static.pigsec.cn/wp-content/uploads/repos/2026/06/a6a2dbf4dd172514.svg)](https://github.com/nadimpallipavan/secmon/actions/workflows/ci.yml)

A complete, runnable security monitoring platform that **collects, parses, analyzes, and visualizes** logs from multiple sources, **detects suspicious activity** with an editable rules engine, and sends **real-time alerts**, all orchestrated locally with Docker and **no external cloud credentials required**.

                             ┌─────────────┐
     syslog / auth / web ───▶│  Logstash   │──┐  (grok-parse → normalize → forward)
     (files, TCP/UDP 5514)   └─────────────┘  │
                                              ▼
     log generator / apps ───────────────▶ ┌──────────────────┐   bulk    ┌───────────────┐
            POST /ingest                   │  FastAPI (api)   │──────────▶│ Elasticsearch │
                                           │  normalize +     │  logs-*   │  logs-*       │
                                           │  detection engine│           │  alerts-*     │
                                           └────────┬─────────┘           └──────┬────────┘
                                              alert │ queue (RPUSH)              │
                                                    ▼                            ▼
                                           ┌──────────────┐ BLPOP         ┌───────────────┐
                                           │    Redis     │◀────┐         │    Kibana     │
                                           │ queue+windows│     │         │  dashboards   │
                                           └──────────────┘ ┌───┴──────┐  └───────────────┘
                                                            │  worker  │  dedup/rate-limit
                                                            │ (alerts) │  → file / webhook / Pub/Sub
                                                            └──────────┘

## Why these design choices

- **FastAPI is the single writer to Elasticsearch.** Logstash parses multi-source logs and *forwards* them to `POST /ingest` rather than writing to ES itself. This guarantees every event, no matter the source, flows through the same normalize → detect → store path, so detection never depends on where a log came from. No double-indexing.
- **Redis sliding windows for rate-based rules.** Brute-force and traffic-spike detection use Redis sorted sets (score = timestamp): trim old scores, add the new one, read cardinality. O(log n), self-expiring, and shared across API workers.
- **Decoupled alert worker.** The API enqueues fired alerts to Redis and returns immediately; a separate worker dispatches them. A slow webhook can never back-pressure log ingestion.
- **Local-first, cloud-optional.** Webhook and GCP Pub/Sub are both optional and import-guarded. Alerts are *always* written to a local file, stored in ES, and queryable via `/alerts`.
- **In-memory window store for tests.** The rules engine accepts a pluggable `WindowStore`, so the full detection suite runs in `pytest` with no Redis.

## Tech stack

| Component      | Role                                                    |
|----------------|---------------------------------------------------------|
| FastAPI        | Ingestion API, normalization, detection engine, metrics |
| Elasticsearch  | Storage & search (`logs-*`, `alerts-*`)                 |
| Logstash       | Multi-source ingestion + grok parsing                   |
| Kibana         | Dashboards & index patterns                             |
| Redis          | Alert queue, sliding-window counters, rate-limiting     |
| Worker         | Alert dispatch (file / webhook / Pub/Sub)               |
| Docker Compose | Orchestration                                           |

## Quick start

    # from the project root
    cp .env.example .env            # (bin/demo.sh does this for you)
    docker compose up -d --build    # boots ES, Kibana, Logstash, Redis, api, worker, setup

    # wait ~60-120s on first run (ES boot + Kibana), then seed data with embedded attacks:
    python scripts/generate_logs.py --api http://localhost:8000 --duration 20 --rate 40

Or one command (Linux/macOS/Git-Bash/WSL):

    bash bin/demo.sh    # or: make demo

Then explore:

- **Metrics:** `curl -s http://localhost:8000/status | python -m json.tool`
- **Alerts:** `curl -s http://localhost:8000/alerts | python -m json.tool`
- **API docs:** http://localhost:8000/docs
- **Kibana:** http://localhost:5601 → **Dashboard → “SecMon — Threat Overview”**

The `setup` container automatically installs the ES index templates and imports the Kibana data views + dashboard on startup.

## Normalized event schema

Every log line is coerced into this common shape before storage / detection (`app/schema.py`). In Elasticsearch, `timestamp` is stored as `@timestamp`.

| Field         | Type    | Description                                         |
|---------------|---------|-----------------------------------------------------|
| `timestamp`   | date    | ISO-8601 UTC (`@timestamp` in ES)                   |
| `source`      | keyword | `syslog` \| `auth` \| `web` \| `app` \| …           |
| `host`        | keyword | Originating host                                    |
| `event_type`  | keyword | `ssh_login_failed`, `http_request`, `sudo`, …       |
| `severity`    | keyword | `info` \| `low` \| `medium` \| `high` \| `critical` |
| `src_ip`      | ip      | Source IP (CIDR/range-queryable)                    |
| `dest_ip`     | ip      | Destination IP                                      |
| `dest_port`   | integer | Destination port                                    |
| `user`        | keyword | Username                                            |
| `http_method` | keyword | HTTP method                                         |
| `url`         | keyword | Request path                                        |
| `status_code` | integer | HTTP status                                         |
| `bytes`       | long    | Response bytes                                      |
| `country`     | keyword | Geo (best-effort)                                   |
| `message`     | text    | Human-readable summary                              |
| `raw`         | text    | Original raw log line                               |

### Alert schema (`alerts-*`)

`@timestamp`, `rule`, `rule_type`, `severity`, `description`, `src_ip`, `user`, `host`, `count`, `window_seconds`, `dedup_key`, `message`, `matched_events`.

## Ingestion

`POST /ingest` accepts any of:

    // 1) batch wrapper (what the generator sends)
    { "events": [ { "source": "web", "src_ip": "1.2.3.4", "url": "/admin", "status_code": 403 } ] }

    // 2) a bare array (what Logstash json_batch sends)
    [ { "...": "..." } ]

    // 3) raw line + format hint -> server-side parser expands it
    { "events": [ { "format": "auth", "raw": "Jan 10 12:34:56 h sshd[1]: Failed password for root from 9.9.9.9 port 22 ssh2" } ] }

Supported `format` hints: `syslog`, `auth`/`ssh`, `nginx`/`apache`/`web`, `json`/`app`.

### Logstash path

`logstash/pipeline/logstash.conf` ingests from files (`data/logs/{syslog,auth,access,app}.log`) and TCP/UDP `5514`, grok-parses each source into the normalized schema, then POSTs batches to `/ingest`.

To exercise it:

    python scripts/generate_logs.py --write-files   # writes raw lines into data/logs/*.log
    # Logstash tails them, parses, and forwards to the API → detection runs as usual.

## Detection rules

Rules live in **`app/rules/rules.yaml`** and are loaded when the API starts. Four rule types ship out of the box:

| Rule                   | Type                   | Fires when …                                               |
|------------------------|------------------------|------------------------------------------------------------|
| `ssh_brute_force`      | `brute_force`          | ≥5 failed logins from one `src_ip` within 60s              |
| `unauthorized_access`  | `unauthorized_access`  | request to a restricted path or a 401/403 (200 ⇒ critical) |
| `anomalous_traffic`    | `traffic_spike`        | >60 req/30s **or** >15 distinct ports/30s (port scan)      |
| `privilege_escalation` | `privilege_escalation` | suspicious sudo/su (`NOT in sudoers`, auth failure, …)     |

Each rule has a `cooldown_seconds` that collapses a single attack into one alert, so a burst doesn’t flood the alerts index. Notification rate-limiting in the worker is a second layer.

### Adding a new rule

Append to `app/rules/rules.yaml` and restart the API (`docker compose restart api`):

    - name: my_rule
      type: brute_force          # brute_force | unauthorized_access | traffic_spike | privilege_escalation
      description: "What it detects"
      severity: high
      enabled: true
      match:                     # AND of field -> allowed values, applied to each event
        event_type: ["ssh_login_failed"]
      group_by: src_ip           # window key for rate rules
      threshold: 10
      window_seconds: 120
      cooldown_seconds: 120

For a brand-new *type*, add a `_rule_` handler in `app/rules_engine.py`.

## Real-time alerting

When a rule fires, the API writes the alert to `alerts-*` and enqueues it on Redis. The **worker** (`worker/alert_worker.py`) consumes the queue and dispatches across sinks:

1. **Local file + stdout:** always (`data/alerts.log`); the guaranteed fallback.
2. **REST webhook:** if `ALERT_WEBHOOK_URL` is set.
3. **GCP Pub/Sub:** if `GCP_PROJECT` + `GCP_PUBSUB_TOPIC` are set and the client is available.

**Dedup / rate-limiting:** external delivery is suppressed for `ALERT_RATE_LIMIT_SECONDS` per `dedup_key` (`rule:src_ip`) via a Redis `SET NX EX` lock.

**Severity routing:** only alerts at or above `ALERT_MIN_SEVERITY` are sent externally (the file sink still records everything).

## Dashboards (Kibana)

Saved objects are committed at `kibana/export.ndjson` and auto-imported by the `setup` container. They’re generated from `scripts/build_kibana_export.py` (re-run it to regenerate).

- **Data views:** `logs-*`, `alerts-*`
- **Dashboard “SecMon — Threat Overview”:** event volume over time, alerts by severity over time, top source IPs, failed vs successful logins, alerts by severity.

If you ever need to re-import manually:

    curl -s -X POST "http://localhost:5601/api/saved_objects/_import?overwrite=true" \
      -H "kbn-xsrf: true" --form file=@kibana/export.ndjson

## Environment variables (`.env`)

| Variable                   | Default                     | Purpose                            |
|----------------------------|-----------------------------|------------------------------------|
| `ES_HOST`                  | `http://elasticsearch:9200` | Elasticsearch URL                  |
| `LOGS_INDEX_PREFIX`        | `logs`                      | Daily logs index prefix            |
| `ALERTS_INDEX_PREFIX`      | `alerts`                    | Daily alerts index prefix          |
| `REDIS_URL`                | `redis://redis:6379/0`      | Redis connection                   |
| `ALERT_QUEUE`              | `alerts:queue`              | Redis list the worker consumes     |
| `RULES_PATH`               | `app/rules/rules.yaml`      | Detection rules file               |
| `ALERT_WEBHOOK_URL`        | *(empty)*                   | Optional REST webhook for alerts   |
| `ALERT_RATE_LIMIT_SECONDS` | `120`                       | Dedup window per `rule:src_ip`     |
| `ALERT_MIN_SEVERITY`       | `medium`                    | Min severity for external delivery |
| `ALERT_LOG_FILE`           | `data/alerts.log`           | Local fallback alert log           |
| `GCP_PROJECT`              | *(empty)*                   | Optional GCP project for Pub/Sub   |
| `GCP_PUBSUB_TOPIC`         | *(empty)*                   | Optional Pub/Sub topic             |

**Everything cloud/webhook-related is optional.** With an empty `.env` the
platform is fully functional locally.

### Optional: webhook & GCP

    ALERT_WEBHOOK_URL=https://hooks.example.com/secmon   # e.g. a Slack/Discord/HTTP receiver
    GCP_PROJECT=my-project
    GCP_PUBSUB_TOPIC=secmon-alerts
    # GOOGLE_APPLICATION_CREDENTIALS=/path/key.json  (mount into the worker container)

## API reference

| Method | Path      | Description                                            |
|--------|-----------|--------------------------------------------------------|
| POST   | `/ingest` | Ingest a batch; normalize, index, detect, queue alerts |
| GET    | `/alerts` | Recent alerts (`?severity=high&limit=50`)              |
| GET    | `/status` | Event/alert counts, alerts by severity & rule          |
| GET    | `/health` | Liveness + ES/Redis status                             |
| GET    | `/docs`   | Swagger UI                                             |

## Tests

    pip install -r requirements.txt
    pytest -q

`tests/test_parsers.py` covers each parser; `tests/test_rules.py` covers all four rule types using the in-memory window store (no Redis needed).

## Project layout

    .
    ├── docker-compose.yml          # all services + one-shot `setup` provisioner
    ├── Dockerfile                  # api + worker + setup image
    ├── app/
    │   ├── main.py                 # FastAPI: /ingest /alerts /status /health
    │   ├── schema.py               # normalized Event schema
    │   ├── normalize.py            # source parsers (mirror the grok patterns)
    │   ├── rules_engine.py         # YAML-driven detection engine
    │   ├── redis_client.py         # Redis + pluggable sliding-window store
    │   ├── es_client.py            # ES connection, daily indices, bulk writes
    │   ├── alerting.py             # dedup/rate-limit/route/dispatch
    │   ├── gcp.py                  # optional Pub/Sub (guarded) + fallback
    │   ├── config.py               # env-driven settings
    │   └── rules/rules.yaml        # ← editable detection rules
    ├── worker/alert_worker.py      # consumes the alert queue
    ├── logstash/                   # pipeline + grok patterns + config
    ├── elasticsearch/templates/    # logs-* / alerts-* index templates
    ├── kibana/export.ndjson        # data views + dashboard (auto-imported)
    ├── scripts/
    │   ├── generate_logs.py        # realistic traffic + embedded attacks
    │   ├── provision.py            # install templates + import Kibana objects
    │   └── build_kibana_export.py  # regenerate the saved objects
    ├── tests/                      # pytest: parsers + rules
    ├── bin/                        # setup.sh, seed.sh, demo.sh, wait_for_ready.sh
    └── Makefile

## Troubleshooting

- **Elasticsearch exits / `vm.max_map_count`** (Linux): `sudo sysctl -w vm.max_map_count=262144`.
- **First boot is slow:** ES + Kibana take 60–120s. `bin/wait_for_ready.sh` polls for you.
- **No alerts?** Make sure you seeded data (`make seed`) and that the `api`/`worker` containers are healthy: `docker compose ps`, `docker compose logs -f worker`.
- **Kibana dashboard empty:** set the time picker to *Last 24 hours* and confirm data exists via `/status`. Re-import with the curl command above if needed.
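
## Appendix: sliding-window counting sketch

The sliding-window counting described under "Why these design choices" (trim old scores, add the new one, read cardinality) can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the project's code: `InMemoryWindowStore`, `hit`, and `brute_force_fired` are hypothetical names, and the sorted list only mimics what a Redis sorted set would do with `ZREMRANGEBYSCORE`, `ZADD`, and `ZCARD`. The default threshold and window mirror the `ssh_brute_force` row in the rules table (5 failures within 60s).

```python
import bisect
import time
from collections import defaultdict


class InMemoryWindowStore:
    """Hypothetical in-memory stand-in for a Redis sorted-set window."""

    def __init__(self):
        # key -> ascending list of event timestamps (the "scores")
        self._events = defaultdict(list)

    def hit(self, key, window_seconds, now=None):
        """Record one event for `key`; return the count inside the window."""
        now = time.time() if now is None else now
        stamps = self._events[key]

        # 1) Trim everything at or before the window start.
        #    Redis analogue: ZREMRANGEBYSCORE key -inf <now - window>
        cutoff = now - window_seconds
        del stamps[:bisect.bisect_right(stamps, cutoff)]

        # 2) Add the new event, keeping the list sorted.
        #    Redis analogue: ZADD key <now> <member>; a real sorted set
        #    needs a unique member per event, which this list does not.
        bisect.insort(stamps, now)

        # 3) Read cardinality. Redis analogue: ZCARD key
        return len(stamps)


def brute_force_fired(store, src_ip, threshold=5, window_seconds=60, now=None):
    """Return True once `threshold` events from `src_ip` fall in the window."""
    key = f"ssh_brute_force:{src_ip}"  # hypothetical key layout
    return store.hit(key, window_seconds, now) >= threshold
```

Passing an explicit `now` keeps the sketch deterministic, which is the same property that lets a pluggable window store run under `pytest` without Redis.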