GitHub: nadimpallipavan/secmon
# SecMon — Security Monitoring Platform
[![CI](https://github.com/nadimpallipavan/secmon/actions/workflows/ci.yml/badge.svg)](https://github.com/nadimpallipavan/secmon/actions/workflows/ci.yml)
A complete, runnable security monitoring platform that **collects, parses, analyzes, and
visualizes** logs from multiple sources, **detects suspicious activity** with an editable
rules engine, and sends **real-time alerts** — all orchestrated locally with Docker and
**no external cloud credentials required**.
```text
                        ┌─────────────┐
syslog / auth / web ───▶│  Logstash   │──┐  (grok-parse → normalize → forward)
(files, TCP/UDP 5514)   └─────────────┘  │
                                         ▼
log generator / apps ──────────────▶ ┌──────────────────┐  bulk   ┌───────────────┐
     POST /ingest                    │  FastAPI (api)   │────────▶│ Elasticsearch │
                                     │  normalize +     │ logs-*  │ logs-*        │
                                     │  detection engine│         │ alerts-*      │
                                     └────────┬─────────┘         └───────┬───────┘
                                              │ alert queue (RPUSH)       │
                                              ▼                           ▼
                                       ┌──────────────┐  BLPOP    ┌───────────────┐
                                       │    Redis     │◀────┐     │    Kibana     │
                                       │ queue+windows│     │     │  dashboards   │
                                       └──────────────┘     │     └───────────────┘
                                                       ┌────┴─────┐
                                                       │  worker  │ dedup/rate-limit
                                                       │ (alerts) │ → file / webhook / Pub/Sub
                                                       └──────────┘
```
## Why these design choices
- **FastAPI is the single writer to Elasticsearch.** Logstash parses multi-source logs and
*forwards* them to `POST /ingest` rather than writing to ES itself. This guarantees every
event — no matter the source — flows through the same normalize → detect → store path, so
detection never depends on where a log came from. No double-indexing.
- **Redis sliding windows for rate-based rules.** Brute-force and traffic-spike detection use
Redis sorted sets (score = timestamp): trim old scores, add the new one, read cardinality.
O(log n), self-expiring, and shared across API workers.
- **Decoupled alert worker.** The API enqueues fired alerts to Redis and returns immediately; a
separate worker dispatches them. A slow webhook can never back-pressure log ingestion.
- **Local-first, cloud-optional.** Webhook and GCP Pub/Sub are both optional and import-guarded.
Alerts are *always* written to a local file, stored in ES, and queryable via `/alerts`.
- **In-memory window store for tests.** The rules engine accepts a pluggable `WindowStore`, so
the full detection suite runs in `pytest` with no Redis.
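The sliding-window counting described above (trim old scores, add the new one, read the cardinality) can be sketched without Redis. This is a minimal illustration, not the project's code: `WindowStore` is modeled here as a hypothetical `Protocol` with a single `hit` method, and `InMemoryWindowStore` keeps a sorted list per key in place of a Redis sorted set.

```python
import bisect
from collections import defaultdict
from typing import Dict, List, Protocol


class WindowStore(Protocol):
    """Hypothetical interface: record one event, return the count inside the window."""

    def hit(self, key: str, ts: float, window_seconds: float) -> int: ...


class InMemoryWindowStore:
    """Sorted-list stand-in for a Redis sorted set (score = timestamp)."""

    def __init__(self) -> None:
        self._windows: Dict[str, List[float]] = defaultdict(list)

    def hit(self, key: str, ts: float, window_seconds: float) -> int:
        scores = self._windows[key]
        # 1) trim scores that fell out of the window (cf. ZREMRANGEBYSCORE -inf cutoff)
        cutoff = ts - window_seconds
        del scores[: bisect.bisect_right(scores, cutoff)]
        # 2) add the new score, keeping the list sorted (cf. ZADD)
        bisect.insort(scores, ts)
        # 3) read the cardinality (cf. ZCARD)
        return len(scores)


store = InMemoryWindowStore()
# five failed logins from one IP, 10s apart, inside a 60s window
counts = [store.hit("ssh_brute_force:9.9.9.9", t, 60) for t in (0, 10, 20, 30, 40)]
# counts == [1, 2, 3, 4, 5]
```

Against Redis, the same three steps map to `ZREMRANGEBYSCORE`, `ZADD`, and `ZCARD`. Sorted-set members must be unique (re-adding a member only updates its score), so a Redis-backed store needs a distinct member per event, with the timestamp as the score.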
## Tech stack
| Component | Role |
|----------------|---------------------------------------------------|
| FastAPI | Ingestion API, normalization, detection engine, metrics |
| Elasticsearch | Storage & search (`logs-*`, `alerts-*`) |
| Logstash | Multi-source ingestion + grok parsing |
| Kibana | Dashboards & index patterns |
| Redis | Alert queue, sliding-window counters, rate-limiting |
| Worker | Alert dispatch (file / webhook / Pub/Sub) |
| Docker Compose | Orchestration |
## Quick start
```bash
# from the project root
cp .env.example .env           # (bin/demo.sh does this for you)
docker compose up -d --build   # boots ES, Kibana, Logstash, Redis, api, worker, setup
# wait ~60-120s on first run (ES boot + Kibana), then seed data with embedded attacks:
python scripts/generate_logs.py --api http://localhost:8000 --duration 20 --rate 40
```
Or one command (Linux/macOS/Git-Bash/WSL):
```bash
bash bin/demo.sh
# or: make demo
```
Then explore:
- **Metrics:** `curl -s http://localhost:8000/status | python -m json.tool`
- **Alerts:** `curl -s http://localhost:8000/alerts | python -m json.tool`
- **API docs:** http://localhost:8000/docs
- **Kibana:** http://localhost:5601 → **Dashboard → “SecMon — Threat Overview”**
The `setup` container automatically installs the ES index templates and imports the Kibana
data views + dashboard on startup.
## Normalized event schema
Every log line is coerced into this common shape before storage / detection
(`app/schema.py`). In Elasticsearch, `timestamp` is stored as `@timestamp`.
| Field | Type | Description |
|---------------|---------|----------------------------------------------------|
| `timestamp` | date | ISO-8601 UTC (`@timestamp` in ES) |
| `source` | keyword | `syslog` \| `auth` \| `web` \| `app` \| … |
| `host` | keyword | Originating host |
| `event_type` | keyword | `ssh_login_failed`, `http_request`, `sudo`, … |
| `severity` | keyword | `info` \| `low` \| `medium` \| `high` \| `critical`|
| `src_ip` | ip | Source IP (CIDR/range-queryable) |
| `dest_ip` | ip | Destination IP |
| `dest_port` | integer | Destination port |
| `user` | keyword | Username |
| `http_method` | keyword | HTTP method |
| `url` | keyword | Request path |
| `status_code` | integer | HTTP status |
| `bytes` | long | Response bytes |
| `country` | keyword | Geo (best-effort) |
| `message` | text | Human-readable summary |
| `raw` | text | Original raw log line |
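As a rough sketch, the schema above could be modeled with a stdlib dataclass. This is an assumption for illustration; the real `app/schema.py` may use a different library, and `Event`, `to_es_doc`, and the `info` default severity are hypothetical. It shows the one storage-time transformation the table mentions (`timestamp` is written as `@timestamp`), plus dropping unset fields, which is also an assumption.

```python
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional


@dataclass
class Event:
    """Hypothetical mirror of the normalized event schema."""
    source: str
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    host: Optional[str] = None
    event_type: Optional[str] = None
    severity: str = "info"  # assumed default
    src_ip: Optional[str] = None
    dest_ip: Optional[str] = None
    dest_port: Optional[int] = None
    user: Optional[str] = None
    http_method: Optional[str] = None
    url: Optional[str] = None
    status_code: Optional[int] = None
    bytes: Optional[int] = None
    country: Optional[str] = None
    message: Optional[str] = None
    raw: Optional[str] = None

    def to_es_doc(self) -> Dict[str, Any]:
        # Drop unset fields (assumption), then store the time as ISO-8601 @timestamp
        doc = {k: v for k, v in asdict(self).items() if v is not None}
        doc["@timestamp"] = doc.pop("timestamp").isoformat()
        return doc
```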
### Alert schema (`alerts-*`)
`@timestamp`, `rule`, `rule_type`, `severity`, `description`, `src_ip`, `user`, `host`,
`count`, `window_seconds`, `dedup_key`, `message`, `matched_events`.
## Ingestion
`POST /ingest` accepts any of:
```jsonc
// 1) batch wrapper (what the generator sends)
{ "events": [ { "source": "web", "src_ip": "1.2.3.4", "url": "/admin", "status_code": 403 } ] }

// 2) a bare array (what Logstash json_batch sends)
[ { "...": "..." } ]

// 3) raw line + format hint -> server-side parser expands it
{ "events": [ { "format": "auth", "raw": "Jan 10 12:34:56 h sshd[1]: Failed password for root from 9.9.9.9 port 22 ssh2" } ] }
```
Supported `format` hints: `syslog`, `auth`/`ssh`, `nginx`/`apache`/`web`, `json`/`app`.
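A minimal sketch of how a handler might accept the payload shapes above and expand a raw `auth` line. `extract_events` and `parse_auth` are hypothetical helpers, and the regex only covers the sshd `Failed password` message from example 3; it is not the project's parser or grok pattern, and other `format` hints are left out.

```python
import re
from typing import Any, Dict, List

# Hypothetical regex for sshd "Failed password" lines; NOT the project's grok pattern.
FAILED_PASSWORD = re.compile(
    r"^\w{3}\s+\d+\s[\d:]{8}\s(?P<host>\S+)\s+sshd\[\d+\]:\s+"
    r"Failed password for (?:invalid user )?(?P<user>\S+) from (?P<src_ip>\S+)"
)


def parse_auth(raw: str) -> Dict[str, Any]:
    """Expand one raw auth line into normalized fields (sketch)."""
    m = FAILED_PASSWORD.match(raw)
    if m is None:
        # Assumption: unrecognized lines are kept with only the raw text
        return {"source": "auth", "message": raw, "raw": raw}
    return {
        "source": "auth",
        "event_type": "ssh_login_failed",
        "host": m["host"],
        "user": m["user"],
        "src_ip": m["src_ip"],
        "raw": raw,
    }


def extract_events(payload: Any) -> List[Dict[str, Any]]:
    """Accept a bare array or an {"events": [...]} wrapper (hypothetical helper)."""
    if isinstance(payload, list):  # shape 2: Logstash json_batch
        items = payload
    elif isinstance(payload, dict) and isinstance(payload.get("events"), list):  # shapes 1 and 3
        items = payload["events"]
    else:
        raise ValueError("expected a JSON array or an object with an 'events' array")
    events = []
    for item in items:
        # Shape 3: raw line + format hint; only auth/ssh is handled in this sketch
        if item.get("format") in ("auth", "ssh") and "raw" in item:
            events.append(parse_auth(item["raw"]))
        else:
            events.append(item)
    return events
```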
### Logstash path
`logstash/pipeline/logstash.conf` ingests from files (`data/logs/{syslog,auth,access,app}.log`)
and TCP/UDP `5514`, grok-parses each source into the normalized schema, then POSTs batches to
`/ingest`. To exercise it:
```bash
python scripts/generate_logs.py --write-files   # writes raw lines into data/logs/*.log
# Logstash tails them, parses, and forwards to the API → detection runs as usual.
```
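`app/normalize.py` is described as mirroring the grok patterns. For the `web` source, a Python equivalent for the nginx/Apache combined log format might look like the sketch below; the regex and the `parse_access` name are illustrative assumptions, not copied from `logstash/pipeline/logstash.conf`.

```python
import re
from typing import Any, Dict, Optional

# Hypothetical combined-log regex:
# ip ident user [time] "METHOD path PROTO" status bytes "referer" "agent"
COMBINED = re.compile(
    r'^(?P<src_ip>\S+) \S+ (?P<user>\S+) \[(?P<ts>[^\]]+)\] '
    r'"(?P<method>[A-Z]+) (?P<url>\S+) [^"]*" (?P<status>\d{3}) (?P<bytes>\d+|-)'
)


def parse_access(raw: str) -> Optional[Dict[str, Any]]:
    """Map one access-log line onto normalized fields (sketch)."""
    m = COMBINED.match(raw)
    if m is None:
        return None  # caller decides what to do with unparseable lines
    return {
        "source": "web",
        "event_type": "http_request",
        "src_ip": m["src_ip"],
        "user": None if m["user"] == "-" else m["user"],
        "http_method": m["method"],
        "url": m["url"],
        "status_code": int(m["status"]),
        "bytes": 0 if m["bytes"] == "-" else int(m["bytes"]),
        "raw": raw,
    }
```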
## Detection rules
Rules live in **`app/rules/rules.yaml`** and are loaded once, when the API starts. Four rule types
ship out of the box:
| Rule | Type | Fires when … |
|-------------------------|------------------------|-----------------------------------------------------------|
| `ssh_brute_force` | `brute_force` | ≥5 failed logins from one `src_ip` within 60s |
| `unauthorized_access`   | `unauthorized_access`  | request to a restricted path, or any 401/403 response (critical if a restricted path returns 200) |
| `anomalous_traffic` | `traffic_spike` | >60 req/30s **or** >15 distinct ports/30s (port scan) |
| `privilege_escalation` | `privilege_escalation` | suspicious sudo/su (`NOT in sudoers`, auth failure, …) |
Each rule has a `cooldown_seconds` setting that collapses a single attack into one alert, so a burst
doesn’t flood the alerts index. Notification rate-limiting in the worker adds a second layer.
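A sketch of how a `brute_force` rule and its cooldown could interact, using the `ssh_brute_force` numbers from the table (5 failures per `src_ip` within 60s). `BruteForceRule` and the 300s cooldown are hypothetical (the real value lives in `rules.yaml`), a `deque` per key replaces the Redis window, and events are assumed to arrive in time order.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Optional


class BruteForceRule:
    """Hypothetical brute_force evaluator: threshold within a window, then a cooldown."""

    def __init__(self, threshold: int = 5, window_seconds: float = 60,
                 cooldown_seconds: float = 300) -> None:
        self.threshold = threshold
        self.window = window_seconds
        self.cooldown = cooldown_seconds
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)
        self._last_fired: Dict[str, float] = {}

    def observe(self, src_ip: str, ts: float) -> Optional[dict]:
        hits = self._hits[src_ip]
        hits.append(ts)
        while hits and hits[0] <= ts - self.window:  # drop events outside the window
            hits.popleft()
        if len(hits) < self.threshold:
            return None
        last = self._last_fired.get(src_ip)
        if last is not None and ts - last < self.cooldown:
            return None  # same attack, still cooling down: no new alert
        self._last_fired[src_ip] = ts
        return {"rule": "ssh_brute_force", "src_ip": src_ip,
                "count": len(hits), "window_seconds": self.window}
```

With these settings, 20 failed logins from one IP at one-second intervals produce exactly one alert, on the fifth attempt.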
### Adding a new rule
Append to `app/rules/rules.yaml` and restart the API (`docker compose restart api`):
```yaml
- name: my_rule
  type: brute_force          # brute_force | unauthorized_access | traffic_spike | privilege_escalation
  description: "What it detects"
  severity: high
  enabled: true
  match:                     # AND of field -> allowed values, applied to each event
    event_type: ["ssh_login_failed"]
  group_by: src_ip           # window key for rate rules
  threshold: 10
  window_seconds: 120
  cooldown_seconds: 120
```
For a brand-new *type*, add a `_rule_` handler in `app/rules_engine.py`.
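The `match` block is described as an AND of field to allowed values. A minimal sketch of that check, plus a registry from rule `type` to handler, is below; `matches`, `HANDLERS`, `evaluate`, and the handler signature are hypothetical and only illustrate where a new type could plug in. The example handler fires on any matching event, which is simpler than the shipped `privilege_escalation` rule.

```python
from typing import Any, Callable, Dict, List, Optional

Event = Dict[str, Any]
Rule = Dict[str, Any]


def matches(event: Event, match: Dict[str, List[Any]]) -> bool:
    """True only if every listed field holds one of its allowed values (AND of ORs)."""
    return all(event.get(name) in allowed for name, allowed in match.items())


def _rule_privilege_escalation(rule: Rule, event: Event) -> Optional[dict]:
    # Hypothetical stateless handler: any matching event fires
    return {"rule": rule["name"], "severity": rule["severity"], "user": event.get("user")}


# Hypothetical registry: rule `type` -> handler; a new type adds one entry here
HANDLERS: Dict[str, Callable[[Rule, Event], Optional[dict]]] = {
    "privilege_escalation": _rule_privilege_escalation,
}


def evaluate(rules: List[Rule], event: Event) -> List[dict]:
    alerts = []
    for rule in rules:
        if not rule.get("enabled", True) or not matches(event, rule.get("match", {})):
            continue
        handler = HANDLERS.get(rule["type"])
        if handler is None:
            continue  # unknown type: skipped here (a real engine might reject it at load)
        alert = handler(rule, event)
        if alert:
            alerts.append(alert)
    return alerts
```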
## Real-time alerting
When a rule fires, the API writes the alert to `alerts-*` and enqueues it on Redis. The
**worker** (`worker/alert_worker.py`) consumes the queue and dispatches across sinks:
1. **Local file + stdout** — always (`data/alerts.log`); the guaranteed fallback.
2. **REST webhook** — if `ALERT_WEBHOOK_URL` is set.
3. **GCP Pub/Sub** — if `GCP_PROJECT` + `GCP_PUBSUB_TOPIC` are set and the client is available.
**Dedup / rate-limiting:** external delivery is gated by a Redis `SET NX EX` lock per `dedup_key`
(`rule:src_ip`) that lives for `ALERT_RATE_LIMIT_SECONDS`: only the alert that acquires the lock is
delivered, and repeats are suppressed until it expires. **Severity routing:** only alerts at or
above `ALERT_MIN_SEVERITY` are sent externally (the file sink still records everything).
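The delivery decision can be sketched without Redis. Below, a dict of expiry times plays the role of `SET key value NX EX ttl` (set only if absent, with a TTL), and a severity ranking turns `ALERT_MIN_SEVERITY` into a floor. `ExpiringLocks`, `should_deliver`, `SEVERITY_RANK`, and the `dedup:` key prefix are hypothetical; the file sink is assumed to record every alert regardless of this check.

```python
import time
from typing import Callable, Dict

SEVERITY_RANK = {"info": 0, "low": 1, "medium": 2, "high": 3, "critical": 4}


class ExpiringLocks:
    """In-memory stand-in for Redis `SET key value NX EX ttl` (hypothetical)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._expires: Dict[str, float] = {}
        self._clock = clock

    def set_nx_ex(self, key: str, ttl: float) -> bool:
        now = self._clock()
        if self._expires.get(key, 0.0) > now:
            return False  # lock still alive: a recent alert was already delivered
        self._expires[key] = now + ttl
        return True


def should_deliver(alert: dict, locks: ExpiringLocks,
                   min_severity: str = "medium", rate_limit_seconds: float = 120) -> bool:
    # Severity routing first, so low-severity alerts never consume the lock
    if SEVERITY_RANK[alert["severity"]] < SEVERITY_RANK[min_severity]:
        return False
    # Dedup: the first alert per rule:src_ip in the window wins
    dedup_key = alert.get("dedup_key") or f"{alert['rule']}:{alert.get('src_ip')}"
    return locks.set_nx_ex(f"dedup:{dedup_key}", rate_limit_seconds)
```

The defaults (`medium`, `120`) match the documented defaults for `ALERT_MIN_SEVERITY` and `ALERT_RATE_LIMIT_SECONDS`.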
## Dashboards (Kibana)
Saved objects are committed at `kibana/export.ndjson` and auto-imported by the `setup`
container. They’re generated by `scripts/build_kibana_export.py`; re-run it to regenerate them.
- **Data views:** `logs-*`, `alerts-*`
- **Dashboard “SecMon — Threat Overview”:** event volume over time, alerts by severity over
time, top source IPs, failed vs successful logins, alerts by severity.
If you ever need to re-import manually:
```bash
curl -s -X POST "http://localhost:5601/api/saved_objects/_import?overwrite=true" \
  -H "kbn-xsrf: true" --form file=@kibana/export.ndjson
```
## Environment variables (`.env`)
| Variable | Default | Purpose |
|----------------------------|--------------------------------|------------------------------------------|
| `ES_HOST` | `http://elasticsearch:9200` | Elasticsearch URL |
| `LOGS_INDEX_PREFIX` | `logs` | Daily logs index prefix |
| `ALERTS_INDEX_PREFIX` | `alerts` | Daily alerts index prefix |
| `REDIS_URL` | `redis://redis:6379/0` | Redis connection |
| `ALERT_QUEUE` | `alerts:queue` | Redis list the worker consumes |
| `RULES_PATH` | `app/rules/rules.yaml` | Detection rules file |
| `ALERT_WEBHOOK_URL` | *(empty)* | Optional REST webhook for alerts |
| `ALERT_RATE_LIMIT_SECONDS` | `120` | Dedup window per `rule:src_ip` |
| `ALERT_MIN_SEVERITY` | `medium` | Min severity for external delivery |
| `ALERT_LOG_FILE` | `data/alerts.log` | Local fallback alert log |
| `GCP_PROJECT` | *(empty)* | Optional GCP project for Pub/Sub |
| `GCP_PUBSUB_TOPIC` | *(empty)* | Optional Pub/Sub topic |
**Everything cloud/webhook-related is optional.** With an empty `.env` the platform is fully
functional locally.
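`app/config.py` is described as env-driven. A stdlib-only sketch of reading the variables above with their documented defaults follows; `Settings`, `from_env`, and `pubsub_enabled` are hypothetical names, and treating empty optional values as "disabled" is an assumption consistent with an empty `.env` being fully functional.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


def _get(env: Mapping[str, str], name: str, default: str) -> str:
    return env.get(name) or default  # unset or empty -> documented default


def _opt(env: Mapping[str, str], name: str) -> Optional[str]:
    return (env.get(name) or "").strip() or None  # unset or empty -> integration disabled


@dataclass(frozen=True)
class Settings:
    """Hypothetical settings object mirroring the table above."""
    es_host: str
    logs_index_prefix: str
    alerts_index_prefix: str
    redis_url: str
    alert_queue: str
    rules_path: str
    alert_webhook_url: Optional[str]
    alert_rate_limit_seconds: int
    alert_min_severity: str
    alert_log_file: str
    gcp_project: Optional[str]
    gcp_pubsub_topic: Optional[str]

    @classmethod
    def from_env(cls, env: Mapping[str, str] = os.environ) -> "Settings":
        return cls(
            es_host=_get(env, "ES_HOST", "http://elasticsearch:9200"),
            logs_index_prefix=_get(env, "LOGS_INDEX_PREFIX", "logs"),
            alerts_index_prefix=_get(env, "ALERTS_INDEX_PREFIX", "alerts"),
            redis_url=_get(env, "REDIS_URL", "redis://redis:6379/0"),
            alert_queue=_get(env, "ALERT_QUEUE", "alerts:queue"),
            rules_path=_get(env, "RULES_PATH", "app/rules/rules.yaml"),
            alert_webhook_url=_opt(env, "ALERT_WEBHOOK_URL"),
            alert_rate_limit_seconds=int(_get(env, "ALERT_RATE_LIMIT_SECONDS", "120")),
            alert_min_severity=_get(env, "ALERT_MIN_SEVERITY", "medium"),
            alert_log_file=_get(env, "ALERT_LOG_FILE", "data/alerts.log"),
            gcp_project=_opt(env, "GCP_PROJECT"),
            gcp_pubsub_topic=_opt(env, "GCP_PUBSUB_TOPIC"),
        )

    @property
    def pubsub_enabled(self) -> bool:
        # Pub/Sub needs both the project and the topic
        return bool(self.gcp_project and self.gcp_pubsub_topic)
```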
### Optional: webhook & GCP
```bash
ALERT_WEBHOOK_URL=https://hooks.example.com/secmon   # e.g. a Slack/Discord/HTTP receiver
GCP_PROJECT=my-project
GCP_PUBSUB_TOPIC=secmon-alerts
# GOOGLE_APPLICATION_CREDENTIALS=/path/key.json  (mount into the worker container)
```
## API reference
| Method | Path | Description |
|--------|------------|--------------------------------------------------------|
| POST | `/ingest` | Ingest a batch; normalize, index, detect, queue alerts |
| GET | `/alerts` | Recent alerts (`?severity=high&limit=50`) |
| GET | `/status` | Event/alert counts, alerts by severity & rule |
| GET | `/health` | Liveness + ES/Redis status |
| GET | `/docs` | Swagger UI |
## Tests
```bash
pip install -r requirements.txt
pytest -q
```
`tests/test_parsers.py` covers each parser; `tests/test_rules.py` covers all four rule types
using the in-memory window store (no Redis needed).
## Project layout
```text
.
├── docker-compose.yml          # all services + one-shot `setup` provisioner
├── Dockerfile                  # api + worker + setup image
├── app/
│   ├── main.py                 # FastAPI: /ingest /alerts /status /health
│   ├── schema.py               # normalized Event schema
│   ├── normalize.py            # source parsers (mirror the grok patterns)
│   ├── rules_engine.py         # YAML-driven detection engine
│   ├── redis_client.py         # Redis + pluggable sliding-window store
│   ├── es_client.py            # ES connection, daily indices, bulk writes
│   ├── alerting.py             # dedup/rate-limit/route/dispatch
│   ├── gcp.py                  # optional Pub/Sub (guarded) + fallback
│   ├── config.py               # env-driven settings
│   └── rules/rules.yaml        # ← editable detection rules
├── worker/alert_worker.py      # consumes the alert queue
├── logstash/                   # pipeline + grok patterns + config
├── elasticsearch/templates/    # logs-* / alerts-* index templates
├── kibana/export.ndjson        # data views + dashboard (auto-imported)
├── scripts/
│   ├── generate_logs.py        # realistic traffic + embedded attacks
│   ├── provision.py            # install templates + import Kibana objects
│   └── build_kibana_export.py  # regenerate the saved objects
├── tests/                      # pytest: parsers + rules
├── bin/                        # setup.sh, seed.sh, demo.sh, wait_for_ready.sh
└── Makefile
```
## Troubleshooting
- **Elasticsearch exits / `vm.max_map_count`** (Linux): `sudo sysctl -w vm.max_map_count=262144`.
- **First boot is slow:** ES + Kibana take 60–120s. `bin/wait_for_ready.sh` polls for you.
- **No alerts?** Make sure you seeded data (`make seed`) and that the `api`/`worker` containers
are healthy: `docker compose ps`, `docker compose logs -f worker`.
- **Kibana dashboard empty:** set the time picker to *Last 24 hours* and confirm data exists via
`/status`. Re-import with the curl command above if needed.