Capsoide/LogTrace
GitHub: Capsoide/LogTrace
一个基于 ELK 栈和 immudb 的 Windows 审计日志自动化采集、实时分析与不可变存储系统,用于满足安全合规和法医取证需求。
Stars: 18 | Forks: 0
# LogTrace:Windows 审计日志监控系统
## 简介
该系统负责自动采集与 Windows 系统管理员访问相关的审计日志。日志存储在不可变数据库中以确保其完整性,并通过交互式仪表板提供查询功能,便于分析和监控活动。
该流程通过以下组件组成的管道实现:
- ```Winlogbeat``` 安装在 Windows Server 实例上,负责从事件查看器收集并发送审计日志事件。
- ```Logstash``` 运行在 Debian 系统上,接收来自 Winlogbeat 的日志,将其处理后分发到两个不同的 Redis 队列。
- ```Redis``` 运行在 Debian 系统上,作为队列管理系统,实现日志流的分离:
- **队列 0**(`redis-queue-elastic`)将日志发送到 **Elasticsearch** 进行索引,并通过前端界面进行可视化。
- **队列 1**(`redis-queue-immudb`)用于将日志持久化到不可变数据库(immudb),该数据库设计用于确保完整性、不可否认性和长期保存。在此配置中,保留时间为 24 小时。
此架构确保数据复制用于不同目的(如分析和法医保存),以确保数据在时间过程中的完整性和不可篡改性。
各组件的职责如下:
- `Winlogbeat`:从事件查看器采集日志。
- `Logstash`:复制日志流并发送到相应的 Redis 队列。
- `Redis`:管理数据缓冲区。
- `Immudb`:安全且不可变地归档日志。
- `Elasticsearch`:索引和保存日志。为数据的交互分析提供后端支持。
- `Kibana`:用于搜索、可视化和监控索引日志的图形界面。
整个系统设计用于满足 **ACN**、**ISO/IEC 27001** 和 **NIS2** 指令规定的监管要求,这些指令要求对安全日志进行跟踪、保存和完整性验证:
- [**ACN**](https://www.acn.gov.it/portale/nis/aggiornamento-informazioni)(国家网络安全局)为意大利关键基础设施的安全制定了标准。
- [**ISO/IEC 27001**](https://edirama.org/wp-content/uploads/2023/10/document-1.pdf) 是信息安全管理(ISMS)的国际标准,要求记录和分析访问事件。
- [**NIS2**](https://www.acn.gov.it/portale/nis) 是欧盟关于网络和信息系统安全的指令,对基本服务运营商提出了日志记录、保存和事件响应义务。
## 基础设施架构图
## LogTrace 服务与网络接口映射表
| **模块** | **IP** | **端口** | **协议** | **备注** |
|---------------------------------|------------------|---------------------------------|---------------------|--------------------------------------------------------------------------|
| **Winlogbeat** | 192.168.56.2 | 5044 | TCP | 通过 Beats 模块向 Logstash 发送日志 |
| **Logstash** | 192.168.56.10 | 5044(输入),6379(输出) | TCP, Beats | 接收来自 Winlogbeat 的日志并将其复制到两个不同的 Redis 队列 |
| **Redis(Elasticsearch 队列)** | 192.168.56.10 | 6379 | TCP, RESP | Logstash 读取的队列,用于将日志发送到 Elasticsearch |
| **Redis(immudb 队列)** | 192.168.56.10 | 6379 | TCP, RESP | immudb 的复制队列(单独的键或数据库) |
| **Elasticsearch** | 192.168.56.10 | 9200(REST API),9300(传输) | HTTPS/TCP/TLS | 暴露 REST API,节点间通过内部协议通信 |
| **Kibana** | 192.168.56.10 | 5601 | HTTPS/TCP/TLS | 用于查询 Elasticsearch 的图形界面 |
| **immudb** | 192.168.56.10 | 3322(默认),9497(gRPC API) | TCP/gRPC | 从 Redis 队列读取日志以进行不可变历史化存储 |
## Windows Server → Debian 通信
本节介绍 VirtualBox Host-Only 网络的配置,用于在两台虚拟机之间建立直接通信:Windows Server(日志发送方)和 Debian(日志接收和处理方)。
## Host-Only 适配器配置(VirtualBox)
1. 打开 **VirtualBox** → `文件` → `主机网络管理器`
2. 点击 **创建** 添加新适配器
3. 配置:
- **IP**:`192.168.56.1`
- **子网掩码**:`255.255.255.0`
- **DHCP**:禁用
4. 将适配器分配给虚拟机作为 `适配器 2`:
- 模式:`Host-Only`
- 名称:例如 `vboxnet0`
## 静态 IP 配置(Windows Server)
1. 打开 `网络连接中心` > 修改 `适配器设置`
2. 选择连接到 `vboxnet0` 的接口("以太网 2")
3. 点击 `属性` > `TCP/IPv4` 并设置:
- **IP**:`192.168.56.2`
- **子网掩码**:`255.255.255.0`
- **网关**:留空
4. 使用 ```ipconfig``` 验证
```
C:\Users\vboxuser> ipconfig
Windows IP Configuration
Ethernet adapter Ethernet:
Connection-specific DNS Suffix . : xyz.lan
IPv6 Address. . . . . . . . . . . : fd00::be82:30db:2cc8:18ab
Link-local IPv6 Address . . . . . : fe80::b789:33f2:febd:1d7%14
IPv4 Address. . . . . . . . . . . : 10.0.2.15
Subnet Mask . . . . . . . . . . . : 255.255.255.0
Default Gateway . . . . . . . . . : fe80::2%14
10.0.2.2
Ethernet adapter Ethernet 2:
Connection-specific DNS Suffix . :
Link-local IPv6 Address . . . . . : fe80::6894:81ba:3678:5341%13
IPv4 Address. . . . . . . . . . . : 192.168.56.2 <---
Subnet Mask . . . . . . . . . . . : 255.255.255.0 <---
Default Gateway . . . . . . . . . :
```
## 网络接口配置(Debian)
直接修改 ```/etc/network/``` 目录下的 ```interfaces``` 文件来配置网络接口。
```
# 包含额外配置(如有)
source /etc/network/interfaces.d/*
# Loopback 接口
auto lo
iface lo inet loopback
# NAT 接口(互联网)
auto enp0s3
iface enp0s3 inet dhcp
# VirtualBox 主机专用网络(内部网络)接口
auto enp0s8
iface enp0s8 inet static
address 192.168.56.10 <---
netmask 255.255.255.0 <---
```
## 防火墙规则:允许从 Debian 到 Windows Server 的 ping
需要为 Windows Server 创建规则,因为 Windows 防火墙默认阻止传入的 ICMP Echo Request(ping)数据包。
### 分步操作
#### 1. 打开 Windows Defender 高级安全防火墙
```
Premere il tasto Windows, digitare "Windows Defender Firewall with Advanced Security", successivamente aprire l’app.
```
#### 2. 选择"入站规则"
```
Nel pannello a sinistra, cliccare su Inbound Rules.
```
#### 3. 创建新规则
```
Nel pannello a destra, cliccare su New Rule... (Nuova regola).
```
#### 4. 选择规则类型
```
Selezionare Custom (Personalizzata), poi cliccare su Avanti.
```
#### 5. 选择协议
```
Alla voce “Protocol and Ports” (Protocollo e porte), scegliere ICMPv4 dal menu a tendina “Protocol type”.
```
#### 6. 指定 ICMP 数据包类型
```
Cliccare sul pulsante Customize accanto a ICMP Settings.
Selezionare Echo Request (il tipo usato dal ping).
Confermare con OK.
```
#### 7. IP 地址
```
Nella schermata “Scope” lasciare l’opzione “Any IP address” (qualsiasi indirizzo) sia per origine sia per destinazione (, o limita all’IP del Debian se si vuole maggiore sicurezza).
```
#### 8. 规则操作
```
Selezionare Allow the connection (Consenti la connessione).
```
#### 9. 规则应用时机
```
Spuntare tutte le caselle: Domain, Private, Public.
```
#### 10. 为规则命名
```
Scrivere un nome tipo "Consenti ping ICMP Echo Request" e confermare.
```
### 验证
```
vboxuser@vbox:~$ ip link
1: lo: mtu 65536 qdisc noqueue state UNKNOWN mode DEFAULT group default qlen 1000
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
2: enp0s3: mtu 1500 qdisc fq_codel state UP mode DEFAULT group default qlen 1000
link/ether 08:00:27:e0:87:cc brd ff:ff:ff:ff:ff:ff
3: enp0s8: mtu 1500 qdisc fq_codel state UP mode DEFAULT group default qlen 1000 <---
link/ether 08:00:27:9d:3a:10 brd ff:ff:ff:ff:ff:ff
#Applicazione delle modifiche
vboxuser@vbox:~$ sudo systemctl restart networking
#Se necessario utilizzare il seguente comando (o inalternativa riaviare la vm)
vboxuser@vbox:~$ sudo ifdown enp0s8 && sudo ifup enp0s8
```
```
#Verifica che l'indirizzo sia stato applicato correttamente
vboxuser@vbox:~$ ip a
1: lo: mtu 65536 qdisc noqueue state UNKNOWN group default qlen 1000
link/loopback 00:00:00:00:00:00 brd 00:00:00:00:00:00
inet 127.0.0.1/8 scope host lo
valid_lft forever preferred_lft forever
inet6 ::1/128 scope host noprefixroute
valid_lft forever preferred_lft forever
2: enp0s3: mtu 1500 qdisc fq_codel state UP group default qlen 1000
link/ether 08:00:27:e0:87:cc brd ff:ff:ff:ff:ff:ff
inet 10.0.2.15/24 brd 10.0.2.255 scope global dynamic enp0s3
valid_lft 84997sec preferred_lft 84997sec
inet6 fd00::a00:27ff:fee0:87cc/64 scope global dynamic mngtmpaddr
valid_lft 86245sec preferred_lft 14245sec
inet6 fe80::a00:27ff:fee0:87cc/64 scope link
valid_lft forever preferred_lft forever
3: enp0s8: mtu 1500 qdisc fq_codel state UP group default qlen 1000
link/ether 08:00:27:9d:3a:10 brd ff:ff:ff:ff:ff:ff
inet 192.168.56.10/24 brd 192.168.56.255 scope global enp0s8 <---
valid_lft forever preferred_lft forever
inet6 fe80::a00:27ff:fe9d:3a10/64 scope link
valid_lft forever preferred_lft forever
```
### 从 Debian 到 Windows Server 的 ping
```
vboxuser@vbox:~$ ping -c 192.168.56.2
PING 192.168.56.2 (192.168.56.2) 56(84) bytes of data.
64 bytes from 192.168.56.2: icmp_seq=1 ttl=128 time=6.43 ms
64 bytes from 192.168.56.2: icmp_seq=2 ttl=128 time=1.18 ms
64 bytes from 192.168.56.2: icmp_seq=3 ttl=128 time=1.16 ms
--- 192.168.56.2 ping statistics ---
3 packets transmitted, 3 received, 0% packet loss, time 2003ms <---
rtt min/avg/max/mdev = 1.160/2.923/6.434/2.482 ms
```
### 从 Windows Server 到 Debian 的 ping
```
C:\Users\vboxuser>ping 192.168.56.10
Pinging 192.168.56.10 with 32 bytes of data:
Reply from 192.168.56.10: bytes=32 time=1ms TTL=64
Reply from 192.168.56.10: bytes=32 time=1ms TTL=64
Reply from 192.168.56.10: bytes=32 time=1ms TTL=64
Reply from 192.168.56.10: bytes=32 time=1ms TTL=64
Ping statistics for 192.168.56.10:
Packets: Sent = 4, Received = 4, Lost = 0 (0% loss), <---
Approximate round trip times in milli-seconds:
Minimum = 1ms, Maximum = 1ms, Average = 1ms
```
## Winlogbeat
Winlogbeat 是一个软件代理,用于收集并发送 Windows 事件日志到 **Redis**、**Logstash** 和 **Elasticsearch** 端点。
### Winlogbeat 功能
1. **事件日志监控**:实时读取来自 `Security`、`System`、`Application`、`ForwardedEvents` 等日志的事件。
2. **智能过滤**:仅收集特定的 `event_id`、提供程序或级别,减少噪音。
3. **日志发送**:将数据转发到 Redis、Logstash 和 Elasticsearch。
4. **ECS 支持**:根据 Elastic Common Schema 标准化数据。
5. **内置 Kibana 仪表板**:提供开箱即用的可视化功能。
## 目录层次结构:Winlogbeat
```
Winlogbeat/
├── winlogbeat.exe
├── winlogbeat.yml
├── winlogbeat.reference.yml
├── install-service-winlogbeat.ps1
├── uninstall-service-winlogbeat.ps1
├── fields.yml
├── LICENSE.txt, NOTICE.txt, README.md
├── .build_hash.txt
├── winlogbeat.yml_bk
├── data/
│ ├── winlogbeat.yml
│ └── meta.json
└── module/
├── powershell/
│ └── config/
│ └── winlogbeat-powershell.js
├── security/
│ ├── dashboards.yml
│ └── config/
│ └── winlogbeat-security.js
└── sysmon/
└── config/
└── winlogbeat-sysmon.js
```
## winlogbeat.yml
路径:```/Winlogbeat/data/winlogbeat.yml```
## 安装为 Windows 服务
```
cd C:\\Winlogbeat
.\install-service-winlogbeat.ps1
Start-Service winlogbeat
Set-Service -Name winlogbeat -StartupType Automatic
```
## 卸载
```
Stop-Service winlogbeat
.\uninstall-service-winlogbeat.ps1
```
## 调试与验证
- **本地日志**:`C:\\ProgramData\\winlogbeat\\Logs\\winlogbeat.log`
- **验证 Redis 输出**:
redis-cli -h 192.168.56.10 -p 6379
LRANGE winlogbeat 0 0
- **手动测试**:
.\winlogbeat.exe -c winlogbeat.yml -e -v
## Logstash
Logstash 是一个开源管道,用于实时管理、处理和转发来自不同来源的数据到一个或多个目标。
在本系统中,Logstash 接收来自 Winlogbeat 的 JSON 格式事件,对其进行处理,然后将数据输出到两个不同的 Redis 队列,实现流复制:
- 第一个队列用于 Elasticsearch 的数据摄取和分析;
- 第二个队列用于 immuDB 的历史化存储。
## 目录层次结构:Logstash
```
/etc/logstash/
├── conf.d/
│ ├── logstash.conf
│ └── logstash1.conf
├── jvm.options
├── log4j2.properties
├── logstash.yml
├── pipelines.yml
└── startup.options
```
## logstash.yml
路径:```/etc/logstash/logstash.yml```
Logstash 的主配置文件,用于定义内部数据的存档目录。
```
# ------------ 数据路径 ------------
# Logstash 及其插件使用的目录
#for any persistent needs. Defaults to LOGSTASH_HOME/data
# path.data: /var/lib/logstash
# ```
## logstash.conf
路径:```/etc/logstash/conf.d/logstash.conf```
为 Logstash 管道定义的配置文件,用于从 ``Winlogbeat``(通过 Beats 协议)接收并将其复制到两个不同的 Redis 队列。
```
# 输入:通过 Beats 协议从 Winlogbeat 接收数据,端口 5044
input {
beats {
port => 5044
}
}
filter {
# Inserire eventuali filtri o parsing (ad esempio, grok, mutate, ecc.)
}
# 输出:将处理后的数据发送到两个不同的 Redis 队列(复制)
output {
# 输出 (1):将数据发送到 Redis 队列 "redis-queue-elastic"
redis {
host => "192.168.56.10"
port => 6379
password => ""
key => "redis-queue-elastic"
data_type => "list"
db => 0
}
# output (2): manda gli stessi dati alla coda Redis "redis-queue-immudb"
redis {
host => "192.168.56.10"
port => 6379
password => ""
key => "redis-queue-immudb"
data_type => "list"
db => 0
}
}
```
## logstash1.conf
路径:```/etc/logstash/conf.d/logstash1.conf```
配置文件用于从 ```Redis``` 读取日志并发送到 ```Elasticsearch```。
```
input {
redis {
host => "192.168.56.10" # Indirizzo del server Redis dove leggere la coda
data_type => "list" # Tipo struttura dati usata in Redis: lista
port => 6379 # Porta del server Redis: 6379 è quella di default
key => "redis-queue-elastic" # Key Redis: nome lista Redis da cui Logstash legge i dati
password => "" # PSW key Redis
codec => json # Codec usato per decodificare i dati ricevuti da Redis: formato JSON, quindi Logstash li trasforma automaticamente in oggetti leggibili e filtrabili
}
}
filter {
#Qui è possibile inserire eventuali filtri per elaborare o arricchire i dati ricevuti prima di inviarli ad Elastic
}
output {
elasticsearch {
hosts => ["http://192.168.56.10:9200"] # Indirizzo del cluster Elasticsearch (modifica in base all'ambiente che si utilizza)
index => "from-redis-%{+YYYY.MM.dd}" # Nome dell'indice su Elasticsearch. Viene usata una data dinamica per indicizzazione giornaliera
# Autenticazione Elasticsearch e certificato ssl
user => ""
password => ""
ssl => true
cacert => "/etc/elasticsearch/certs/ca.crt"
}
stdout{
codec => rubydebug
}
}
```
## pipelines.yml
路径:```/etc/logstash/pipelines.yml```
Logstash 独立管道的定义
• ```main```:用于 immudb 的管道,
• ```elastic-pipeline```:用于 elasticsearch 的管道。
```
# 这是定义管道的地方。可以定义多个管道。
# 有关多管道的更多信息,请参阅文档:
# https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html
# immudb 管道
- pipeline.id: main
path.config: "/etc/logstash/conf.d/logstash.conf"
# elasticsearch 管道
- pipeline.id: elastic-pipeline
path.config: "/etc/logstash/conf.d/logstash1.conf"
```
## Redis
Redis 是一个开源的高性能内存键值存储,用作**分布式队列**系统,实现日志流的分离。凭借对列表(`LPUSH`、`RPUSH`、`LPOP`、`BRPOP`、`LLEN` 等)的原生支持,Redis 可以实现简单快速的 `FIFO` 队列,用于在多个组件之间异步管理日志、事件或消息。它是分布式架构中理想的临时缓冲区或轻量级代理。
Redis 配置文件:```/etc/redis/redis.conf```
```
/etc/redis/
├── redis.conf
└── redis.conf.save
```
## 访问和验证 Redis 队列
访问远程 Redis 服务器并列出可用的键,以验证两个队列的存在:
```
vboxuser@vbox:/$ redis-cli -h 192.168.56.10
192.168.56.10> auth inserisci_la_tua_password
OK
192.168.56.10> keys *
1) "redis-queue-immudb"
2) "redis-queue-elastic
```
验证日志是否已正确插入到两个 Redis 队列中:
```
192.168.56.10> LLEN redis-queue-immudb
(integer) 144
192.168.56.10> LLEN redis-queue-elastic
(integer) 144
```
## Immudb
日志归档通过 immuDB 处理,这是一个设计用于确保数据完整性的不可变数据库。
日志以键值结构保存,其中:
• **键**:日志的唯一标识符。
• **值**:日志本身的 JSON 内容。
此结构可以实现:
• 确保数据的完整性和不可篡改性;
• 通过日志键前缀进行快速搜索和检索。
### 使用的数据库
immuDB 中存在两个独立的数据库:
• **defaultdb**:用于测试的默认数据库
• **logs_immudb**:专用于审计日志
## 目录层次结构(immudb 配置文件)
```
/etc/immudb/
└── immudb.toml
```
## immudb.toml
路径:```/etc/immudb/immudb.toml```
immudb 服务的主配置文件。
```
# 端口、数据目录、认证
address = "0.0.0.0"
admin-password = ''
auth = true
certificate = ''
clientcas = ''
dbname = 'logs_immudb'
detached = 'false'
devmode = true
dir = '/var/lib/immudb'
network = 'tcp'
port 3322
# 带文件名的日志路径
logfile = 'immudb.log'
mtls = false
pidfile = '/var/lib/immudb/immudb.pid'
PKEY = ''
log-level = "DEBUG"
[database]
gc-enabled = 'true'
retention-period = "24h"
```
在 ``immudb.toml`` 配置文件中,指定了数据库运行的路径:``/var/lib/immudb`` 是主数据目录,包含:
• 配置和使用的数据库(``defaultdb`` 和 ``logs_immudb``)。
• 数据的不可变结构(Merkle 树、索引、事务日志)。
```
/var/lib/immudb
├── defaultdb
├── logs_immudb
├── immudb.identifier
├── immudb.pid
├── immulog
| └──immudb.log
└── systemdb
```
## 登录
```
vboxuser@vbox:~$ immuadmin login inerisci_tuo_username
Password: inserisci_la_tua_password
logged in
```
## 创建带有保留期的新数据库
```
vboxuser@vbox:~$ immuadmin database create nome_database --retention-period=24h
```
## 列出现有数据库
```
vboxuser@vbox:~$ immuadmin database list
2 database(s)
- -------------- ---------- ---------- ------ ---------- --------- ------------
# 数据库名称 创建时间 创建者 状态 是否副本 磁盘大小 交易数
- -------------- ---------- ---------- ------ ---------- --------- ------------
1 defaultdb 2025-06-18 systemdb LOADED FALSE 21.3 MB 6045
2 logs_immudb 2025-06-17 nome_utente LOADED FALSE 3.9 MB 5371
- -------------- ---------- ---------- ------ ---------- --------- ------------
```
## queue_consumer.py
路径:``/var/consumer-immudb/queue_consumer.py``
## 描述
`queue_consumer.py` 脚本消费 Redis 队列 `redis-queue-immudb`,提取 JSON 格式的日志并通过键值(KV)模式将其插入 immudb 数据库。
## 工作原理
• 日志通过 `BLPOP` 命令以阻塞模式从 Redis 队列中消费。
• 每个提取的日志都是 JSON 字符串,被反序列化为数据对象。
• JSON 内容经过键排序重组以确保序列化的一致性。
• 计算日志的 `SHA-256` 哈希值,用作插入数据库的`键`。
• 键通过连接前缀、当前时间戳和计算的`哈希值`构建。
• 日志以键值(KV)模式存储在 immudb 数据库中,键和值都编码为字节。
## 主要函数
### SHA-256 哈希计算
```
def hash_key(data: str) -> str:
return hashlib.sha256(data.encode()).hexdigest()
```
接收字符串 `data` 作为输入,将其编码为字节并计算 SHA-256 哈希。返回十六进制格式的哈希值,用于生成唯一键。
### 从 Redis 阻塞读取和 JSON 反序列化
```
item = r.blpop(REDIS_QUEUE_NAME, timeout=5)
if item:
_, raw_log = item
try:
log_data = json.loads(raw_log)
except json.JSONDecodeError:
logging.warning(f"Log non valido JSON: {raw_log}")
continue
```
`blpop` 命令以阻塞模式从 Redis 队列中提取元素,超时时间为 5 秒。如果收到元素,获取内容(`raw_log`)并尝试将其反序列化为 `log_data` 对象。如果 JSON 无效,则忽略并继续。
### 有序序列化和键生成
```
log_str = json.dumps(log_data, sort_keys=True)
ts = int(time.time())
key = f"log:{ts}:{hash_key(log_str)}"
```
日志以键排序的 JSON 字符串序列化(`sort_keys=True`)以确保一致性。获取当前时间戳(秒),并通过连接前缀(`log:`)、`时间戳`和序列化日志的 `SHA-256` 哈希值来构建唯一键。
### 以 KV 模式插入 immudb 数据库
```
immu.set(key.encode(), log_str.encode())
```
键值对使用 `set` 方法插入 immudb 数据库。键和值都按要求编码为字节。
## 调试
### 插入 immudb
```
Jul 15 08:55:50 vbox python[2449]: 2025-07-14 08:55:50,220 - INFO - [KV] Log inserito in immudb con chiave: log:1752735391:fccfd23ab0d1853197e28b5cb3378d848df207f1f0255a2c0336ddf296c023cc
```
### 获取键
```
immuclient>get log:1752735391:fccfd23ab0d1853197e28b5cb3378d848df207f1f0255a2c0336ddf296c023cc
tx: 4871
rev: 1
key: log:1752735391:fccfd23ab0d1853197e28b5cb3378d848df207f1f0255a2c0336ddf296c023cc
value: {"@timestamp": "2025-07-15T08:07:13.681Z", "@version": "1", "agent": {"ephemeral_id": "60973edc-97e2-4e08-9d4d-b8e8b7d89d60", "hostname": "WIN-S", "id": "c156a342-40dc-47ca-977a-f100ebd8e89f",
"name": "WIN-S", "type": "winlogbeat", "version": "7.17.7"}, "ecs": {"version": "1.12.0"}, "event": {"action": "None", "code": "7036", "created": "2025-07-17T06:56:26.240Z", "kind": "event", "provider":
"Service Control Manager"}, "host": {"name": "xxxxx"}, "log": {"file": {"path": "C:\\Users\\vboxuser\\Desktop\\System_log_DC33.evtx"}, "level": "information"}, "message": "The Windows Modules
Installer service entered the running state.", "tags": ["beats_input_codec_plain_applied"], "winlog": {"activity_id": "{811a6918-1b41-4753-815e-979e5d5b2bc7}", "api": "wineventlog", "channel": "System",
"computer_name": "XXX.YYY.lan", "event_data": {"Binary": "540072007500730074006500640049006E007300740061006C006C00650072002F0034000000", "param1": "Windows Modules Installer", "param2": "running"},
"event_id": "7036", "keywords": ["Classic"], "opcode": "Info", "process": {"pid": 672, "thread": {"id": 10596}}, "provider_guid": "{555908d1-a6d7-4695-8e1e-26931d2012f4}", "provider_name": "Service Control
Manager", "record_id": 795414, "task": "None"}}
```
## Redis 验证:队列消费
```
vboxuser@vbox:/$ redis-cli -h 192.168.56.10
192.168.56.10> auth inserisci_la_tua_password
OK
192.168.56.10> keys *
1) "redis-queue-elastic"
192.168.56.10> LLEN "redis-queue-immudb"
(integer) 0
```
队列已正确消费,日志已保存到 immuDB。
## 保留期
已在数据库 `logs_immudb` 上配置了 24 小时的`保留期`,以自动从值日志中删除旧数据,减少磁盘空间占用,确保日志的高效和安全管理。配置是在创建数据库时通过以下命令完成的:
```
immuadmin database create logs_immudb --retention-period=24h
```
要验证 `logs_immudb` 数据库中 `logs` 表的`保留期`是否正常工作,可以对之前插入的键执行 `get` 操作。如果该键因保留期到期而被 immudb 自动删除,命令将返回不存在的消息。
```
immuclient get log:1752735391:fccfd23ab0d1853197e28b5cb3378d848df207f1f0255a2c0336ddf296c023cc
immuclient tbtree: key not found
```
这确认该键已不存在,因为它已根据配置的保留期被系统自动删除。
## Elasticsearch 中 from-redis-* 索引的管理
在 Elasticsearch 中,日志数据通过 Logstash 从 Redis 收集。为了更好地处理大量且持续的日志数据,数据按收集日期组织到独立的 Elasticsearch 索引中。
每个索引都是一个隔离的"容器",仅包含在特定日期生成的文档,命名遵循 `from-redis-YYYY.MM.DD` 模式。
此方法可以实现:
• 数据的**时间组织**,便于在特定时间范围内进行搜索和分析。
• **更好的性能**,因为索引更小且更容易管理。
• **简化的保留期管理**,可以删除整个过时索引。
• **水平扩展**,避免单个索引随时间过度增长。
• **清晰的命名**,标识数据来源(Redis),便于与分析、自动化和可视化工具集成。
### 每日索引 `from-redis-YYYY.MM.DD`
从 Redis 收集的日志通过每日索引在 Elasticsearch 中索引,名称格式为 `from-redis-2025.07.11`、`from-redis-2025.07.10` 等。
这意味着每天都会创建一个新索引,包含当天所有的日志。
可以通过删除整个索引来清除历史数据,而无需逐个删除文档。
这种方法确保了对数据保留期的精细控制。
命令:
```
curl -u user:password --cacert /etc/elasticsearch/certs/ca.crt -X GET "https://192.168.56.10:9200/_cat/indices?v"
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
yellow open from-redis-2025.07.01 ISqshOdJQTynHlVYOAB8xw 1 1 9451 0 4.4mb 4.4mb
yellow open from-redis-2025.07.02 LQUa1aatTnS2RpjMZnzqKA 1 1 49649 0 24.1mb 24.1mb
yellow open from-redis-2025.07.03 U1PF-URXSdeD98aso_3bGw 1 1 719 0 1000.5kb 1000.5kb
yellow open from-redis-2025.07.10 3pJqrCOpTtaCj91inCiTow 1 1 66863 0 32.9mb 32.9mb
yellow open from-redis-2025.07.11 SuODhIU2TPiLxHH2Rmd1nA 1 1 12992 0 7.1mb 7.1mb
```
可以列出所有索引。
### 自动轮换和删除
为了管理存储并自动删除过时索引,实现了一个 Bash 脚本,删除所有超过 72 小时的 `from-redis-*` 索引。脚本计算截止日期,将其与索引名称中的日期进行比较,并通过 HTTPS 认证使用 Elasticsearch 的 `REST API` 进行删除。
脚本路径 `/usr/local/bin/delete_old_from_redis_indices.sh`
```
#!/bin/bash
ES_HOST="https://192.168.56.10:9200"
ES_USER="user"
ES_PASS="password"
CUTOFF_DATE=$(date -d '3 days ago' +%Y-%m-%d)
echo "Rimuovo gli indici from-redis-* più vecchi di $CUTOFF_DATE"
INDICES=$(curl -s -u $ES_USER:$ES_PASS -k "$ES_HOST/_cat/indices/from-redis-*?h=index" | sort)
for INDEX in $INDICES; do
#Conversione formato data da YYYY.MM.DD a YYYY-MM-DD
IDX_DATE=$(echo $INDEX | sed -E 's/from-redis-([0-9]{4})\.([0-9]{2})\.([0-9]{2})/\1-\2-\3/')
IDX_TS=$(date -d "$IDX_DATE" +%s 2>/dev/null)
CUTOFF_TS=$(date -d "$CUTOFF_DATE" +%s)
if [ -z "$IDX_TS" ]; then
echo "Formato data non valido per indice $INDEX, salto."
continue
fi
if [ $IDX_TS -lt $CUTOFF_TS ]; then
echo "Elimino indice $INDEX (data: $IDX_DATE)"
curl -u $ES_USER:$ES_PASS -X DELETE "$ES_HOST/$INDEX" -k
else
echo "Mantengo indice $INDEX (data: $IDX_DATE)"
fi
done
```
### 测试
已执行 24 小时保留期的测试,因此在本例中,早于 2025-07-10 的索引将被删除:
```
root@vbox:~# sudo /usr/local/bin/delete_old_from_redis_indices.sh
Rimuovo gli indici from-redis-* più vecchi di 2025-07-10
{"acknowledged":true}Elimino indice from-redis-2025.07.01 (data: 2025-07-01)
{"acknowledged":true}Elimino indice from-redis-2025.07.02 (data: 2025-07-02)
{"acknowledged":true}Elimino indice from-redis-2025.07.03 (data: 2025-07-03)
{"acknowledged":true}Elimino indice from-redis-2025.07.10 (data: 2025-07-10)
{"acknowledged":false}Mantengo indice from-redis-2025.07.11 (data: 2025-07-11)
```
可以看到,索引 `from-redis-2025.07.11` 未被删除,因为它是当前索引,不在截止日期(2025-07.10)之前的索引范围内。
可以通过列出删除后所有可用的索引进行进一步验证:
```
root@vbox:~# curl -u username:password -k "https://192.168.56.10:9200/_cat/indices?v&s=index"
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
yellow open from-redis-2025.07.11 SuODhIU2TPiLxHH2Rmd1nA 1 1 12992 0 7.1mb 7.1mb
```
从输出可以看到,索引已正确删除,除了 `from-redis-2025.07.11`,因为它不符合删除条件(属于当前日期)而被保留。
## 使用 Elasticsearch 和 Kibana 进行日志分析和图形化用户体验
## Elasticsearch
### 目录层次结构(Elasticsearch 配置文件)
```
/etc/elasticsearch/
├── certs/
│ ├── ca.crt # Certificato pubblico della Certificate Authority (CA) usata per firmare gli altri certificati.
│ ├── ca.key # Chiave privata della CA (va tenuta segreta).
│ ├── ca.srl # Seriale CA, tiene traccia dei certificati già emessi.
│ ├── elasticsearch.crt # Certificato pubblico di Elasticsearch, firmato dalla CA.
│ ├── elasticsearch.csr # Richiesta di firma del certificato per Elasticsearch.
│ ├── elasticsearch.key # Chiave privata di Elasticsearch, usata per TLS.
│ ├── kibana.crt # Certificato pubblico di Kibana, firmato dalla CA.
│ ├── kibana.csr # Richiesta di firma del certificato per Kibana.
│ └── kibana.key # Chiave privata di Kibana (usata da Kibana, ma conservata qui).
|
├── elasticsearch.keystore # File keystore sicuro con segreti (es. password, token).
├── elasticsearch-plugins.example.yml
├── elasticsearch.yml # File principale di configurazione di Elasticsearch.
├── jvm.options # Opzioni JVM (heap size, GC, ecc.).
├── jvm.options.d/ # Directory per opzioni JVM aggiuntive.
├── log4j2.properties # Configurazione logging di Elasticsearch.
├── role_mapping.yml # Mappatura ruoli utenti.
├── roles.yml # Definizione dei ruoli RBAC.
├── users # File contenente gli utenti locali (realm `file`).
└── users_roles # Associazione tra utenti e ruoli.
```
Elasticsearch 是一个分布式搜索和分析引擎,设计用于存储大量数据并实现非常快速和灵活的搜索。在这种情况下,Elasticsearch 收集和索引日志,以便通过 Kibana 进行深入分析和实时可视化。来自其他组件的数据以 JSON 格式到达,被索引以便可以快速灵活地查询,例如按 event.code、host.name、@timestamp 和其他字段查询。
如前所述,Redis 队列 ``redis-queue-elastic`` 中的日志由 Logstash 消费,Logstash 对其进行处理并将其发送到 Elasticsearch 进行日志的存储和搜索。
## Kibana
## 目录层次结构(Kibana 配置文件)
```
/etc/kibana/
├── certs/
│ ├── ca.crt # Certificato della CA usato da Kibana per validare Elasticsearch.
│ ├── kibana.crt # Certificato pubblico usato da Kibana per TLS.
│ └── kibana.key # Chiave privata associata al certificato di Kibana.
|
├── kibana.keystore # File keystore per password e token sensibili.
├── kibana.yml # File principale di configurazione di Kibana.
└── node.options # Opzioni del nodo Kibana (es. parametri Node.js).
```
Kibana 是 Elasticsearch 的图形界面。允许通过仪表板、图表和交互式工具(如 Discover、Visualize、Dashboard、Alerting)可视化和分析存储在 Elasticsearch 中的数据。