awelzel/zeekpy
GitHub: awelzel/zeekpy
zeekpy是一个用于通过Zeek的WebSocket API进行事件消费和发布的Python库。
Stars: 0 | Forks: 0
# zeekpy
一个简单的纯Python和异步无库,用于通过Zeek的WebSocket API消费和发布Zeek事件,利用Python类型注解进行转换。
## 使用方法
您构建一个zeekpy.Zeek对象,将其传递给要连接的Zeek集群的WebSocket URI以及可选的订阅主题。
WebSocket连接和Zeek握手在进入Zeek对象的上下文管理器时完成:
```
zeek = Zeek("ws://127.0.0.1:27759/v1/messages/json", topics=["/test/"])
with zeek:
# Now connected to Zeek and subscribed to /test/test
...
```
为了处理事件,您实现具有适当类型注解的处理函数。
您使用zeekpy模块中列出的EventArg联合体中的类型。
例如,为了为接收主题字符串、pubsub_id(计数)和规则记录向量的NetControl::pubsub_add_rules事件注册处理函数,Python代码如下:
```
import dataclasses
from zeekpy import Zeek, addr, count
@dataclasses.dataclass
class Rule:
a: addr
c: count
comment: None | str = None
zeek = Zeek("ws://127.0.0.1:27759/v1/messages/json", topics=["/test/"])
@zeek.on("NetControl::pubsub_add_rules")
def handle_pubsub_add_rules(topic: str, pubsub_id: count, rules: list[Rule]):
print(topic, pubsub_id, rules)
with zeek:
zeek.consume()
```
## 发布事件
使用Zeek.publish()将Zeek事件发布到主题。如果事件参数是EventArg中类型的一种,则将其正确序列化(例如计数、枚举或端口)。
如果参数是字典并且只有"@data-type"和"data"键,则它将直接用于JSON有效负载。要将Python int作为Zeek计数发布,您需要将int包装在count实例中:count(42)。否则,42将被编码为整数。这与ZeekJS BigInt的情况类似。同样,对于枚举,库将枚举视为str。要发布字符串作为枚举,请包装它:enum("NetControl::DROP")。
```
# Zeek 事件声明:
# 全局 ev: 事件(c: count);
from zeekpy import Zeek, count
with Zeek("ws://127.0.0.1:27759/v1/messages/json") as zeek:
zeek.publish("/the/topic/", "ev", [count(42)])
```
在事件处理程序中发布是完全可以的,例如,回复每个ping事件以pong事件如下:
```
from zeekpy import Zeek, count
zeek = Zeek("ws://127.0.0.1:27759/v1/messages/json", topics=["/pings/"])
@zeek.on("ping")
def handle_ping(c: count):
print("got ping, sending pong", c)
zeek.publish("/pongs/", "pong", [c])
with zeek:
zeek.consume()
```
事件处理程序由在consume()中阻塞的线程执行,因此publish()是在该线程的上下文中完成的。您也可以从不同的线程调用zeek.publish()。
## 类型转换
如果事件处理程序的参数没有类型注解,则处理函数接收从解析的WebSocket JSON有效负载中得到的相应字典。您也可以使用RawArg注解此类参数以使此行为明确。
当您使用addr注解参数时,处理函数将接收由ipaddress.ip_address()生成的ipaddress.IPv4Address或ipaddress.IPv6Address。对于子网也是如此。当使用port注解参数时,结果将是一个具有两个字段的对象:port(int)和proto(str)。
对于记录,使用dataclasses.dataclass和types。实现知道如何根据列出的字段实例化和填充实例。对于&optional,使用typing.Optional或类型 | None表示法。以下是一个包含具有可选字段的记录向量的相当复杂示例:
```
# Zeek 类型与事件声明:
# 类型 R: {
# c: count;
# a: addr;
# oa: addr &optional;
# f: double &optional;
# }
# # 全局 ev: 事件(rvec: vector of R);
from zeekpy import Zeek, addr, count
# Python 类型声明 R。
@dataclasses.dataclass
class R:
c: count
a: addr
oa: addr | None = None
f: float | None = None
# 用法
zeek = Zeek(...)
@zeek.on("ev")
def ev(rvec: list[R]):
pass
with zeek:
zeek.consume()
```
## 关于类型的说明
如果您查看EventArg联合体中列出的类型,您会发现有原生Zeek和原生Python类型的混合。这是故意的。粗略的规则是,当存在直接映射时(bool、datetime、list、float、str、dataclasses)使用原生Python类型。否则,当没有直接映射(addr、subnet)时,它只是一个标记的Python类型(count、enum),或者实际上是复合类型的port类型。
## 集合和表
集合和表尚未实现。作者认为不使用它们进行远程事件是个好主意。使用RawArg注解它们并自行从字典转换。Zeek对复合键的支持使得这很繁琐,并且用例并不明确。技术上允许使用元组作为复合键可能是有意义的,例如,set[tuple[int, str]]。
## 说明
上下文管理器、consume()和stop()方法可能有点笨拙和有风险,如果您有更好的想法,请随意修复它或分叉一个使用async的版本。没有计划支持async。
标签:API 接口, Python 库, Rootkit, WebSocket, Zeek, 事件发布, 事件处理, 事件订阅, 云计算, 依赖分析, 异步编程, 数据转换, 类型注解, 规则引擎, 逆向工具