awelzel/zeekpy

GitHub: awelzel/zeekpy

zeekpy是一个用于通过Zeek的WebSocket API进行事件消费和发布的Python库。

Stars: 0 | Forks: 0

# zeekpy 一个简单的纯Python和异步无库,用于通过Zeek的WebSocket API消费和发布Zeek事件,利用Python类型注解进行转换。 ## 使用方法 您构建一个zeekpy.Zeek对象,将其传递给要连接的Zeek集群的WebSocket URI以及可选的订阅主题。 WebSocket连接和Zeek握手在进入Zeek对象的上下文管理器时完成: ``` zeek = Zeek("ws://127.0.0.1:27759/v1/messages/json", topics=["/test/"]) with zeek: # Now connected to Zeek and subscribed to /test/test ... ``` 为了处理事件,您实现具有适当类型注解的处理函数。 您使用zeekpy模块中列出的EventArg联合体中的类型。 例如,为了为接收主题字符串、pubsub_id(计数)和规则记录向量的NetControl::pubsub_add_rules事件注册处理函数,Python代码如下: ``` import dataclasses from zeekpy import Zeek, addr, count @dataclasses.dataclass class Rule: a: addr c: count comment: None | str = None zeek = Zeek("ws://127.0.0.1:27759/v1/messages/json", topics=["/test/"]) @zeek.on("NetControl::pubsub_add_rules") def handle_pubsub_add_rules(topic: str, pubsub_id: count, rules: list[Rule]): print(topic, pubsub_id, rules) with zeek: zeek.consume() ``` ## 发布事件 使用Zeek.publish()将Zeek事件发布到主题。如果事件参数是EventArg中类型的一种,则将其正确序列化(例如计数、枚举或端口)。 如果参数是字典并且只有"@data-type"和"data"键,则它将直接用于JSON有效负载。要将Python int作为Zeek计数发布,您需要将int包装在count实例中:count(42)。否则,42将被编码为整数。这与ZeekJS BigInt的情况类似。同样,对于枚举,库将枚举视为str。要发布字符串作为枚举,请包装它:enum("NetControl::DROP")。 ``` # Zeek 事件声明: # 全局 ev: 事件(c: count); from zeekpy import Zeek, count with Zeek("ws://127.0.0.1:27759/v1/messages/json") as zeek: zeek.publish("/the/topic/", "ev", [count(42)]) ``` 在事件处理程序中发布是完全可以的,例如,回复每个ping事件以pong事件如下: ``` from zeekpy import Zeek, count zeek = Zeek("ws://127.0.0.1:27759/v1/messages/json", topics=["/pings/"]) @zeek.on("ping") def handle_ping(c: count): print("got ping, sending pong", c) zeek.publish("/pongs/", "pong", [c]) with zeek: zeek.consume() ``` 事件处理程序由在consume()中阻塞的线程执行,因此publish()是在该线程的上下文中完成的。您也可以从不同的线程调用zeek.publish()。 ## 类型转换 如果事件处理程序的参数没有类型注解,则处理函数接收从解析的WebSocket JSON有效负载中得到的相应字典。您也可以使用RawArg注解此类参数以使此行为明确。 当您使用addr注解参数时,处理函数将接收由ipaddress.ip_address()生成的ipaddress.IPv4Address或ipaddress.IPv6Address。对于子网也是如此。当使用port注解参数时,结果将是一个具有两个字段的对象:port(int)和proto(str)。 对于记录,使用dataclasses.dataclass和types。实现知道如何根据列出的字段实例化和填充实例。对于&optional,使用typing.Optional或类型 | None表示法。以下是一个包含具有可选字段的记录向量的相当复杂示例: ``` # Zeek 类型与事件声明: # 类型 R: { # c: count; # a: addr; # oa: addr &optional; # f: double &optional; # } # # 全局 ev: 事件(rvec: vector of R); from zeekpy import Zeek, addr, count # Python 类型声明 R。 @dataclasses.dataclass class R: c: count a: addr oa: addr | None = None f: float | None = None # 用法 zeek = Zeek(...) @zeek.on("ev") def ev(rvec: list[R]): pass with zeek: zeek.consume() ``` ## 关于类型的说明 如果您查看EventArg联合体中列出的类型,您会发现有原生Zeek和原生Python类型的混合。这是故意的。粗略的规则是,当存在直接映射时(bool、datetime、list、float、str、dataclasses)使用原生Python类型。否则,当没有直接映射(addr、subnet)时,它只是一个标记的Python类型(count、enum),或者实际上是复合类型的port类型。 ## 集合和表 集合和表尚未实现。作者认为不使用它们进行远程事件是个好主意。使用RawArg注解它们并自行从字典转换。Zeek对复合键的支持使得这很繁琐,并且用例并不明确。技术上允许使用元组作为复合键可能是有意义的,例如,set[tuple[int, str]]。 ## 说明 上下文管理器、consume()和stop()方法可能有点笨拙和有风险,如果您有更好的想法,请随意修复它或分叉一个使用async的版本。没有计划支持async。
标签:API 接口, Python 库, Rootkit, WebSocket, Zeek, 事件发布, 事件处理, 事件订阅, 云计算, 依赖分析, 异步编程, 数据转换, 类型注解, 规则引擎, 逆向工具