ipal-ids/ipal_transcriber
GitHub: ipal-ids/ipal_transcriber
工业协议流量转录器,实现协议无关的入侵检测。
Stars: 22 | Forks: 17
# IPAL - 转录器
本存储库是 [IPAL](https://github.com/ipal-ids/ipal) 的一部分 - 工业协议抽象层。IPAL 的目标是建立工业网络流量的抽象表示,以便进行后续的统一和协议无关的工业入侵检测。IPAL 由一个 [转录器](https://github.com/ipal-ids/ipal_transcriber) 组成,用于自动将工业流量转换为 IPAL 表示,一个实现各种工业入侵检测系统 (IIDSs) 的 [IDS 框架](https://github.com/ipal-ids/ipal_ids_framework),以及一系列评估 [数据集](https://github.com/ipal-ids/ipal_datasets)。有关 IPAL 的详细信息,请参阅以下列出的出版物。
随着网络物理系统日益受到复杂攻击者的威胁,他们也在攻击系统的物理方面。补充保护措施,工业入侵检测系统承诺检测此类攻击。然而,由于工业协议多样性和缺乏标准接口,需要付出巨大努力才能将这些技术适应大量不同的协议。为了解决这个问题,我们提出了工业协议抽象层 (IPAL) - 作为工业入侵检测系统输入的工业通信的共同表示。
此软件 (`ipal-transcriber`) 实现了将工业网络流量自动转换为 IPAL 的功能,适用于各种工业协议。如图所示,转录器读取实时网络捕获或 pcap 文件,并将它们转换为 IPAL 表示。
###### 概述图

###### 实现的协议
| 实现的协议 | 状态 | 支持的消息类型 |
|-----------------------|-------------|-------------------------------------------------------------------------------------------------------------------------------|
| CIP | 初级 | 代码:76,77 |
| Goose | 中等 | |
| IEC 60870-5-104 | 良好 | U\_Format
I\_Format:1-21,30-40,45-51,58-64,70,100-106 | | IEC 61162-450 | 中等 | UdPbC,无标签 | | Modbus TCP | 中等 | 功能代码:1,2,3,4,5,6,8,15,16,43 | | MQTT | 初级 | Basic MQTT 3.1 | | NMEA0183 | 良好 | DBT,DPT,GGA,GLL,GNS,GSA,GSV,HDM,HDT,RMC,ROT,RPM,TLL,TTM,VBW,VHW,VLW,VTG,ZDA,RMB,APB,RSA,DTM,Q,AIVDM | | S7 | 初级 | 任务:1,3
功能代码:4,5 | | DNP3 | 初级 | 功能代码:0-2,7,8,13,14,20,24,129,130
对象(组 ID:var):1:2,2:1,20:{0,2},50:3,52:2,60:{1-4},80:1 | | EtherCAT | 初级 | TODO:添加匹配数据 | | MavLink 2.0 | 初级 | 依赖于解析的消息定义 | | Navico BR24 | 中等 | REP,REG,IMG | | CAN Bus | 初级 | 依赖于帧结构 | | Publications | | | - Konrad Wolsing, Eric Wagner, Antoine Saillard, and Martin Henze. 2022. IPAL: Breaking up Silos of Protocol-dependent and Domain-specific Industrial Intrusion Detection Systems. In 25th International Symposium on Research in Attacks, Intrusions and Defenses (RAID 2022), October 26–28, 2022, Limassol, Cyprus. ACM, New York, NY, USA, 17 pages. [https://doi.org/10.1145/3545948.3545968 ](https://doi.org/10.1145/3545948.3545968) | - Wolsing, Konrad, Eric Wagner, and Martin Henze. "Poster: Facilitating Protocol-independent Industrial Intrusion Detection Systems." *Proceedings of the 2020 ACM SIGSAC Conference on Computer and Communications Security*. 2020 [https://doi.org/10.1145/3372297.3420019](https://doi.org/10.1145/3372297.3420019) ## 入门 如果您是 IPAL 的新用户,想了解一般想法或尝试我们的教程,请参阅 IPAL 的主存储库:[https://github.com/ipal-ids/ipal](https://github.com/ipal-ids/ipal). ###### 先决条件 `ipal-trascriber` 需要 `tshark` 已安装。有关您操作系统的安装说明,请参阅 [https://tshark.dev/setup/install/](https://tshark.dev/setup/install/). ###### 安装(pip) 使用 `python3 -m pip install .` 在系统范围内使用 `pip` [python 包安装程序](https://pip.pypa.io/en/stable/installation/) 安装脚本和依赖项。这将安装依赖项并将 `transcriber` 模块安装到本地站点包中,并将 `ipal-transcriber`、`ipal-state-extractor`、`ipal-minimize` 和 `ipal-join` 脚本添加到 `PATH`。然后可以在系统范围内调用这些脚本(例如 `ipal-transcriber -h`)。 ###### 安装(venv) 使用 `misc/install.sh` 或手动使用以下方法安装: ``` python3 -m venv venv source venv/bin/activate python3 -m pip install -r requirements.txt ``` 然后可以从项目存储库的根目录激活虚拟环境后调用这些脚本,例如: ``` source venv/bin/activate ./ipal-transcriber -h deactivate ``` ###### 安装(docker) 使用 `docker build -t ipal-ids-transcriber:latest .` 构建一个包含项目和使用开发依赖项的 `pip` 安装的 Docker 镜像。然后可以使用构建的镜像在容器中使用这些脚本,例如: ``` docker run -it ipal-ids-transcriber:latest /bin/bash ipal-transcriber -h ``` #### 使用转录器 `ipal-transcriber` 可以用于网络接口(```--interface```)或流量捕获(```--pcap```)。将 ```--ipal.output``` 设置为文件以将输出写入,或使用 '-' 写入 *stdout*。如果文件名以 '.gz' 结尾,则自动压缩。 可选的规则文件(```--rules```)允许通过重命名、删除或修改进程名称和值来修改输出。对于某些协议,例如 Modbus,需要缓存请求包以正确解析响应。```--timeout``` 定义了包缓存的上限时间。每个消息都可以标记为恶意或良性,这可以用于后续入侵检测方法的评估。默认值取自 ```--malicious.default```。使用 ```--malicious``` 提供包含特定于包或时间间隔标签的文件。 ``` ./ipal-transcriber -h usage: ipal-transcriber [-h] [--interface INTERFACE] [--pcap FILE] [--protocols STR [STR ...]] [--rules FILE] [--timeout INT] [--malicious FILE] [--malicious.default BOOL] [--crc STR] [--ipal.output FILE] [--log STR] [--logfile FILE] [--compresslevel INT] [--version] [--state.output FILE] [--filter LIST] [--complete-only BOOL] [--state-in-message BOOL] {default,timeslice} ... options: -h, --help show this help message and exit --interface INTERFACE traffic input interface (Use either this or --pcap) --pcap FILE path to pcap file (Use either this or --interface) --protocols STR [STR ...] specify a subset of the available transcribers ['cip', 'dnp3', 'goose', 'iec104', 'iec450', 'modbus', 'mqtt', 'nmea0183udp', 's7', 'ethercat']. (Default: all) --rules FILE file containing rules to transform transcribed messages. --timeout INT number of milliseconds a packet can be responded to. Used for response matching (Default: 250ms) --malicious FILE Attack json file for labeling the packets according to the attacks in a dataset. --malicious.default BOOL set this option to 'true' or 'false' to define default malicious annotation. (Default: None). Can be used in combination with --malicious --crc STR options for CRC calculations are at 'transport', 'application', combined with 'or', or 'and'. (Default: and) --ipal.output FILE output location for ipal messages ('-' stdout, '*.gz' compress). --log STR define logging level as one of DEBUG, INFO, WARNING, ERROR, or CRITICAL. (Default: WARNING) --logfile FILE define file to log to. (Default: stderr) --compresslevel INT set the gzip compress level. 0 no compress, 1 fast/large, ..., 9 slow/tiny. (Default: 6) --version show program's version number and exit --state.output FILE output location for state information. ('-' stdout, '*.gz' compress) --filter LIST semicolon separated list of state names to filter for. (Default: no filter) --complete-only BOOL output complete states after filtering only. (Default: True) --state-in-message BOOL embed state inside the messages. (Default: False) State Extractors: {default,timeslice} These are available state extractor methods. Use -h for further options on each method. default Simple last value buffer of all variables timeslice Outputs complete state in regular time slices. ``` ###### 消息格式 转录器解析每个工业协议数据包,并将每个数据包写入一个 JSON 行到输出。'id' 对每个消息是唯一的。源 ('src') 和目标 ('dest') 是由 ":" 分隔的不同地址级别的字符串。例如,Modbus 可以在理论上通过一个连接地址子设备,IP:端口:设备。活动是以下之一: - 询问:主动请求数据 - 通知:对请求的数据或未请求的消息的响应 - 命令:设置新值或命令动作 - 行动:对命令或(未请求的)执行动作的响应 - 确认:仅作为先前消息的 Layer-5 确认的包 - 不限于命令或询问 字段 'data' 包含一个字典,其中包含所有传输的工业过程值名称和值。如果请求此值,则值设置为 'null'。 ``` { "id": 0, "timestamp": 1445465436.995232, "protocol": "s7", "malicious": null, "src": "10.10.10.20:49156", "dest": "10.10.10.10:102", "length": 82, "crc": true, "type": 1, "activity": "interrogate", "responds to": [], "data": { "16": null, "17": null, "18": null, "19": null, "20": null } } ``` ###### 规则 规则应在通过 `--rules` 传递给转录器的文件中指定。规则允许重命名、删除或修改进程名称和值。更具体地说,它们允许添加和删除与 IPAL 消息匹配的某些数据字段,并重命名消息源和目标字段。 规则文件是一个包含一个名为 `JS` 的变量的 python 模块,该变量指向一个字典,描述所需的后期处理步骤。 规则文件可以可选地声明包含后期处理逻辑的方法。一个示例规则文件可以在 `misc/rules/nmea.py` 下找到,以下是其摘录: ``` def position_sign(vars): if vars[1] in ["N", "E"]: return +vars[0] elif vars[1] in ["S", "W"]: return -vars[0] JS = { "protocols": ["nmea0183udp", "iec450"], "rules": [ { # Position North-South "type": "RMC", "var": ["RMC2", "RMC3"], "method": position_sign, "name": "latitude", "remove": True, }, ], "rename": { ".*:GG": "GNSS", }, } ``` `JS` 可以包含三个键值对: - `protocols`:包含应应用转换规则和重命名操作的协议包的列表(必需) - `rules`:包含描述转换规则的字典列表(可选)。每个字典可以包含以下键: - `var`:包含应用规则的 data 字段列表(必需) - `type`、`src`、`dst`:分别匹配消息类型、源和目标字段的正则表达式(所有可选,省略时默认匹配任何值)。 - `method`:应用于指定在 `var` 中的字段的函数,需要 `name` 也存在(可选) - `name`:新数据字段的新名称,它将包含 `method` 的结果,需要 `method` 也存在(可选) - `flatten`:如果 `flatten` 设置为 true,我们期望 `method` 返回一个字典。然后 `name` 是可选的,我们将 `method` 返回的字典复制到状态中。(可选) - `remove`:是否删除指定在 `var` 中的字段(可选,省略时默认为 false) - `rename`:描述重命名规则的字典键值对(可选)。每个字典键值对应采用以下形式: - `key`:匹配消息源或目标的正则表达式 - `value`:应用于匹配的字符串的新值 在上面的示例中,指定的规则应用于 `type` 为 `RMC` 的消息:从消息中删除数据字段 `RMC2` 和 `RMC3`,并添加一个新字段 `latitude`,其中包含 `position_sign([x,y])` 的返回值,其中 `x` 和 `y` 是字段 `RMC2` 和 `RMC3` 的值。`rename` 键值对 `".*:GG": "GNSS"` 指定应将包含 `:GG` 的所有源和目标字段更新为 `GNSS`。 #### 使用状态提取器 `ipal-state-extractor` 将按包的消息格式转换为许多基于进程的 IDS 使用的状态格式。它可以作为独立程序使用,也可以直接在转录器工具中使用,通过传递相同的参数。状态写入由 ```--state.output``` 提供的文件或到 stdout。从数据包中导出状态的方法有很多。每个状态提取器都有自己的选项,可以通过 ```./state_extractor.py [extractor method] -h``` 获取。 ``` ./ipal-state-extractor -h usage: ipal-state-extractor [-h] [--ipal.input FILE] [--state.output FILE] [--filter LIST] [--complete-only BOOL] [--state-in-message BOOL] [--compresslevel INT] [--log STR] [--logfile FILE] [--version] {default,timeslice} ... options: -h, --help show this help message and exit --ipal.input FILE input location for message information. ('-' stdin, '*gz' compressed) --state.output FILE output location for state information. ('-' stdout, '*.gz' compress) --filter LIST semicolon separated list of state names to filter for. (Default: no filter) --complete-only BOOL output complete states after filtering only. (Default: True) --state-in-message BOOL embed state inside the messages. (Default: False) --compresslevel INT set the gzip compress level. 0 no compress, 1 fast/large, ..., 9 slow/tiny. (Default: 6) --log STR define logging level as one of DEBUG, INFO, WARNING, ERROR, or CRITICAL. (Default: WARNING) --logfile FILE define file to log to. (Default: stderr) --version show program's version number and exit State Extractors: {default,timeslice} These are available state extractor methods. Use -h for further options on each method. default Simple last value buffer of all variables timeslice Outputs complete state in regular time slices. ``` ###### 实现的状态提取器 目前,以下状态提取方法已实现: | 状态提取器 | 描述 | |-----------------|--------------------------------------------------------------------------------------------------| | default | 为每个消息输出一个状态,并保留每个变量的值。 | | timeslice | 保留每个变量的最后一个值,并在常规间隔(例如每秒)输出一个状态。 | ###### 状态格式 状态格式表示给定时间点的整个状态,包括物理过程的传感器和执行器的所有值。'state' 包含所有观察到的变量和值。每个变量的名称是其设备和变量名称的冒号分隔列表。如果自上次状态输出以来至少有一个包是恶意的,则状态标记为 'malicious'。 ``` { "timestamp": 1445465437.00792, "state": { "10.10.10.10:102:16": 0, "10.10.10.10:102:17": 0, "10.10.10.10:102:18": 0, "10.10.10.10:102:19": 0, "10.10.10.10:102:20": 0 }, "malicious": null } ``` #### 最小化 IPAL 消息 `ipal-minimize` 工具从 IPAL 消息或状态文件中清除进程信息(`data` 和 `state`)。这可以在实际进程数据不需要的情况下节省磁盘空间。 ``` ipal-minimize -h usage: ipal-minimize [-h] [--jobs INT] [--all] [--log STR] [--logfile FILE] [--version] FILE [FILE ...] positional arguments: FILE files to minimize ('*.gz' compressed). options: -h, --help show this help message and exit --jobs INT Number of parallel workers (Default: 4). --all Removes all data except those required for evaluation. --log STR define logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL). Default is WARNING. --logfile FILE File to log to. Default is stderr. --version show program's version number and exit ``` #### 合并 IPAL 消息 `ipal-join` 工具可以用于合并来自不同 IIDS 的不同 IPAL 数据集文件。目前,IDS 输出是 ORed。有关更复杂的方法,特别是结合不同的 IDS 输出,请参阅 IPAL [IDS 框架](https://github.com/ipal-ids/ipal_ids_framework) 的 `combiner` 功能。 ``` ipal-join -h usage: ipal-join [-h] --dataset FILE --output FILE [--force-rename] [--log STR] [--logfile FILE] [--version] FILE [FILE ...] positional arguments: FILE files to join ('*.gz' compressed). options: -h, --help show this help message and exit --dataset FILE original dataset ('*.gz' compressed). --output FILE path to store joined output to ('*.gz' compressed). --force-rename Forces renaming dict entries, e.g., scores, metrics (Default: False). --log STR define logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL). Default is WARNING. --logfile FILE File to log to. Default is stderr. --version show program's version number and exit ``` ## 开发 ##### 工具 用于开发、代码格式化、样式检查和测试的工具集可以使用以下命令安装: ``` python3 -m pip install -r requirements-dev.txt ``` 所有工具都可以手动使用以下命令执行,并在遇到错误时报告: ``` black . flake8 python3 -m pytest ``` 在提交之前,可以使用 Git 的 pre-commit 钩子功能强制执行对修改的文件进行 `black` 和 `flake8` 检查: ``` pre-commit install ``` 有关 `black` 和 `flake8` 设置的更多信息,请参阅 https://ljvmiranda921.github.io/notebook/2018/06/21/precommits-using-black-and-flake8/ ##### 添加协议 添加对新协议支持的过程如下: 1. 在 ```transcribers/``` 中添加一个新的模块 2. 创建一个新的解析器类,继承自 Transcriber 类(见 ```transcribers/transcriber.py```)。解析器类可以实现: - `matches_protocol`:给定一个数据包,如果解析器可以处理它,则返回 `True`,否则返回 `False`(必需) - `parse_packet`:给定一个解析器可以处理的数据包,返回一个有效 IPAL 消息的列表(必需) - `state_identifier`:给定一个数据包和数据字段的名称,返回一个字符串,标识相应的字段(可选,由状态提取器使用) - `matches_response`:给定请求消息列表和响应消息,修改响应消息的 `responds_to` 字段,通过添加匹配的请求 `id`,可能返回要从中删除的请求消息列表(可选) 3. 将新的转录器添加到 ```transcri/utils.py``` 中的列表 4. 将新的协议添加到上面的 [实现的协议](#implemented-protocols) 表中 5. 添加覆盖添加协议的测试用例 ##### 添加状态提取器 添加新状态提取方法的过程如下: 1. 在 ```state_extractors/``` 中添加一个新的文件 2. 创建一个新的状态提取器类,继承自 StateExtractor 类(见 ```state_extractors/state_extractor.py```)。状态提取器类可以实现: - `update_state`:给定一个 IPAL 消息,更新当前进程状态,如果需要,可以调用 ``_write_state`` 将状态写入输出(必需) - `finalize`:在所有消息处理完成后由主状态提取器脚本调用,实现 `finalize` 允许在完成时执行逻辑,例如输出一个最终状态(必需) - `add_arguments_to_parser`:向主状态提取器脚本添加参数,`args` 命名空间在初始化时传递给类,允许读取额外的用户配置和标志(可选) 3. 将新的状态提取器添加到 ```state_extractors/utils.py``` 中的列表 4. 将新的状态提取器添加到上面的 [实现的状态提取器](#implemented-state-extractors) 表中 ##### 添加测试 添加测试的过程取决于要添加的测试类型。 ###### 添加输出验证 在添加对新协议的支持时,添加对原始转录器输出、状态提取器输出以及结合转录器和状态提取器输出的验证输出进行检查。这些检查应添加到 `tests/test_transcriber.py`、`tests/test_state_extractor.py` 和 `tests/test_combined.py` 模块中,分别通过向 `RAW_FILES` 列表中添加条目。 例如,将三元组 `("misc/pcaps/s7.pcap", "s7.ipal", "s7")` 添加到 `tests/test_transcriber.py` 中的 `RAW_FILES`,表示将对在协议标志设置为 `s7` 的情况下对数据包捕获文件 `misc/pcaps/s7.pcap` 执行的转录器输出的验证进行检查,该验证与 `tests/snapshots/validation/test_transcriber_raw_s7.ipal` 中的参考文件进行比较。 请注意,在添加新测试后,在第一次测试运行后,将在 `tests/snapshots/validation/` 下创建一个验证 `IPAL` 文件。只需编辑这些新创建的验证文件,删除第一行的 `== new file ==` 标记,然后文件将用作未来测试的验证参考。 ###### 添加转录器测试 为了确保转录器的协议实现保持兼容,可以在 `tests/transcribers/` 中添加一个转录器测试模块。它可以包含测试添加的转录器的单个功能和属性的测试方法。请注意,为了使 pytest 收集它,模块和测试方法必须以 `test_` 前缀,并且测试
本存储库是 [IPAL](https://github.com/ipal-ids/ipal) 的一部分 - 工业协议抽象层。IPAL 的目标是建立工业网络流量的抽象表示,以便进行后续的统一和协议无关的工业入侵检测。IPAL 由一个 [转录器](https://github.com/ipal-ids/ipal_transcriber) 组成,用于自动将工业流量转换为 IPAL 表示,一个实现各种工业入侵检测系统 (IIDSs) 的 [IDS 框架](https://github.com/ipal-ids/ipal_ids_framework),以及一系列评估 [数据集](https://github.com/ipal-ids/ipal_datasets)。有关 IPAL 的详细信息,请参阅以下列出的出版物。
随着网络物理系统日益受到复杂攻击者的威胁,他们也在攻击系统的物理方面。补充保护措施,工业入侵检测系统承诺检测此类攻击。然而,由于工业协议多样性和缺乏标准接口,需要付出巨大努力才能将这些技术适应大量不同的协议。为了解决这个问题,我们提出了工业协议抽象层 (IPAL) - 作为工业入侵检测系统输入的工业通信的共同表示。
此软件 (`ipal-transcriber`) 实现了将工业网络流量自动转换为 IPAL 的功能,适用于各种工业协议。如图所示,转录器读取实时网络捕获或 pcap 文件,并将它们转换为 IPAL 表示。
###### 概述图

###### 实现的协议
| 实现的协议 | 状态 | 支持的消息类型 |
|-----------------------|-------------|-------------------------------------------------------------------------------------------------------------------------------|
| CIP | 初级 | 代码:76,77 |
| Goose | 中等 | |
| IEC 60870-5-104 | 良好 | U\_Format I\_Format:1-21,30-40,45-51,58-64,70,100-106 | | IEC 61162-450 | 中等 | UdPbC,无标签 | | Modbus TCP | 中等 | 功能代码:1,2,3,4,5,6,8,15,16,43 | | MQTT | 初级 | Basic MQTT 3.1 | | NMEA0183 | 良好 | DBT,DPT,GGA,GLL,GNS,GSA,GSV,HDM,HDT,RMC,ROT,RPM,TLL,TTM,VBW,VHW,VLW,VTG,ZDA,RMB,APB,RSA,DTM,Q,AIVDM | | S7 | 初级 | 任务:1,3
功能代码:4,5 | | DNP3 | 初级 | 功能代码:0-2,7,8,13,14,20,24,129,130
对象(组 ID:var):1:2,2:1,20:{0,2},50:3,52:2,60:{1-4},80:1 | | EtherCAT | 初级 | TODO:添加匹配数据 | | MavLink 2.0 | 初级 | 依赖于解析的消息定义 | | Navico BR24 | 中等 | REP,REG,IMG | | CAN Bus | 初级 | 依赖于帧结构 | | Publications | | | - Konrad Wolsing, Eric Wagner, Antoine Saillard, and Martin Henze. 2022. IPAL: Breaking up Silos of Protocol-dependent and Domain-specific Industrial Intrusion Detection Systems. In 25th International Symposium on Research in Attacks, Intrusions and Defenses (RAID 2022), October 26–28, 2022, Limassol, Cyprus. ACM, New York, NY, USA, 17 pages. [https://doi.org/10.1145/3545948.3545968 ](https://doi.org/10.1145/3545948.3545968) | - Wolsing, Konrad, Eric Wagner, and Martin Henze. "Poster: Facilitating Protocol-independent Industrial Intrusion Detection Systems." *Proceedings of the 2020 ACM SIGSAC Conference on Computer and Communications Security*. 2020 [https://doi.org/10.1145/3372297.3420019](https://doi.org/10.1145/3372297.3420019) ## 入门 如果您是 IPAL 的新用户,想了解一般想法或尝试我们的教程,请参阅 IPAL 的主存储库:[https://github.com/ipal-ids/ipal](https://github.com/ipal-ids/ipal). ###### 先决条件 `ipal-trascriber` 需要 `tshark` 已安装。有关您操作系统的安装说明,请参阅 [https://tshark.dev/setup/install/](https://tshark.dev/setup/install/). ###### 安装(pip) 使用 `python3 -m pip install .` 在系统范围内使用 `pip` [python 包安装程序](https://pip.pypa.io/en/stable/installation/) 安装脚本和依赖项。这将安装依赖项并将 `transcriber` 模块安装到本地站点包中,并将 `ipal-transcriber`、`ipal-state-extractor`、`ipal-minimize` 和 `ipal-join` 脚本添加到 `PATH`。然后可以在系统范围内调用这些脚本(例如 `ipal-transcriber -h`)。 ###### 安装(venv) 使用 `misc/install.sh` 或手动使用以下方法安装: ``` python3 -m venv venv source venv/bin/activate python3 -m pip install -r requirements.txt ``` 然后可以从项目存储库的根目录激活虚拟环境后调用这些脚本,例如: ``` source venv/bin/activate ./ipal-transcriber -h deactivate ``` ###### 安装(docker) 使用 `docker build -t ipal-ids-transcriber:latest .` 构建一个包含项目和使用开发依赖项的 `pip` 安装的 Docker 镜像。然后可以使用构建的镜像在容器中使用这些脚本,例如: ``` docker run -it ipal-ids-transcriber:latest /bin/bash ipal-transcriber -h ``` #### 使用转录器 `ipal-transcriber` 可以用于网络接口(```--interface```)或流量捕获(```--pcap```)。将 ```--ipal.output``` 设置为文件以将输出写入,或使用 '-' 写入 *stdout*。如果文件名以 '.gz' 结尾,则自动压缩。 可选的规则文件(```--rules```)允许通过重命名、删除或修改进程名称和值来修改输出。对于某些协议,例如 Modbus,需要缓存请求包以正确解析响应。```--timeout``` 定义了包缓存的上限时间。每个消息都可以标记为恶意或良性,这可以用于后续入侵检测方法的评估。默认值取自 ```--malicious.default```。使用 ```--malicious``` 提供包含特定于包或时间间隔标签的文件。 ``` ./ipal-transcriber -h usage: ipal-transcriber [-h] [--interface INTERFACE] [--pcap FILE] [--protocols STR [STR ...]] [--rules FILE] [--timeout INT] [--malicious FILE] [--malicious.default BOOL] [--crc STR] [--ipal.output FILE] [--log STR] [--logfile FILE] [--compresslevel INT] [--version] [--state.output FILE] [--filter LIST] [--complete-only BOOL] [--state-in-message BOOL] {default,timeslice} ... options: -h, --help show this help message and exit --interface INTERFACE traffic input interface (Use either this or --pcap) --pcap FILE path to pcap file (Use either this or --interface) --protocols STR [STR ...] specify a subset of the available transcribers ['cip', 'dnp3', 'goose', 'iec104', 'iec450', 'modbus', 'mqtt', 'nmea0183udp', 's7', 'ethercat']. (Default: all) --rules FILE file containing rules to transform transcribed messages. --timeout INT number of milliseconds a packet can be responded to. Used for response matching (Default: 250ms) --malicious FILE Attack json file for labeling the packets according to the attacks in a dataset. --malicious.default BOOL set this option to 'true' or 'false' to define default malicious annotation. (Default: None). Can be used in combination with --malicious --crc STR options for CRC calculations are at 'transport', 'application', combined with 'or', or 'and'. (Default: and) --ipal.output FILE output location for ipal messages ('-' stdout, '*.gz' compress). --log STR define logging level as one of DEBUG, INFO, WARNING, ERROR, or CRITICAL. (Default: WARNING) --logfile FILE define file to log to. (Default: stderr) --compresslevel INT set the gzip compress level. 0 no compress, 1 fast/large, ..., 9 slow/tiny. (Default: 6) --version show program's version number and exit --state.output FILE output location for state information. ('-' stdout, '*.gz' compress) --filter LIST semicolon separated list of state names to filter for. (Default: no filter) --complete-only BOOL output complete states after filtering only. (Default: True) --state-in-message BOOL embed state inside the messages. (Default: False) State Extractors: {default,timeslice} These are available state extractor methods. Use -h for further options on each method. default Simple last value buffer of all variables timeslice Outputs complete state in regular time slices. ``` ###### 消息格式 转录器解析每个工业协议数据包,并将每个数据包写入一个 JSON 行到输出。'id' 对每个消息是唯一的。源 ('src') 和目标 ('dest') 是由 ":" 分隔的不同地址级别的字符串。例如,Modbus 可以在理论上通过一个连接地址子设备,IP:端口:设备。活动是以下之一: - 询问:主动请求数据 - 通知:对请求的数据或未请求的消息的响应 - 命令:设置新值或命令动作 - 行动:对命令或(未请求的)执行动作的响应 - 确认:仅作为先前消息的 Layer-5 确认的包 - 不限于命令或询问 字段 'data' 包含一个字典,其中包含所有传输的工业过程值名称和值。如果请求此值,则值设置为 'null'。 ``` { "id": 0, "timestamp": 1445465436.995232, "protocol": "s7", "malicious": null, "src": "10.10.10.20:49156", "dest": "10.10.10.10:102", "length": 82, "crc": true, "type": 1, "activity": "interrogate", "responds to": [], "data": { "16": null, "17": null, "18": null, "19": null, "20": null } } ``` ###### 规则 规则应在通过 `--rules` 传递给转录器的文件中指定。规则允许重命名、删除或修改进程名称和值。更具体地说,它们允许添加和删除与 IPAL 消息匹配的某些数据字段,并重命名消息源和目标字段。 规则文件是一个包含一个名为 `JS` 的变量的 python 模块,该变量指向一个字典,描述所需的后期处理步骤。 规则文件可以可选地声明包含后期处理逻辑的方法。一个示例规则文件可以在 `misc/rules/nmea.py` 下找到,以下是其摘录: ``` def position_sign(vars): if vars[1] in ["N", "E"]: return +vars[0] elif vars[1] in ["S", "W"]: return -vars[0] JS = { "protocols": ["nmea0183udp", "iec450"], "rules": [ { # Position North-South "type": "RMC", "var": ["RMC2", "RMC3"], "method": position_sign, "name": "latitude", "remove": True, }, ], "rename": { ".*:GG": "GNSS", }, } ``` `JS` 可以包含三个键值对: - `protocols`:包含应应用转换规则和重命名操作的协议包的列表(必需) - `rules`:包含描述转换规则的字典列表(可选)。每个字典可以包含以下键: - `var`:包含应用规则的 data 字段列表(必需) - `type`、`src`、`dst`:分别匹配消息类型、源和目标字段的正则表达式(所有可选,省略时默认匹配任何值)。 - `method`:应用于指定在 `var` 中的字段的函数,需要 `name` 也存在(可选) - `name`:新数据字段的新名称,它将包含 `method` 的结果,需要 `method` 也存在(可选) - `flatten`:如果 `flatten` 设置为 true,我们期望 `method` 返回一个字典。然后 `name` 是可选的,我们将 `method` 返回的字典复制到状态中。(可选) - `remove`:是否删除指定在 `var` 中的字段(可选,省略时默认为 false) - `rename`:描述重命名规则的字典键值对(可选)。每个字典键值对应采用以下形式: - `key`:匹配消息源或目标的正则表达式 - `value`:应用于匹配的字符串的新值 在上面的示例中,指定的规则应用于 `type` 为 `RMC` 的消息:从消息中删除数据字段 `RMC2` 和 `RMC3`,并添加一个新字段 `latitude`,其中包含 `position_sign([x,y])` 的返回值,其中 `x` 和 `y` 是字段 `RMC2` 和 `RMC3` 的值。`rename` 键值对 `".*:GG": "GNSS"` 指定应将包含 `:GG` 的所有源和目标字段更新为 `GNSS`。 #### 使用状态提取器 `ipal-state-extractor` 将按包的消息格式转换为许多基于进程的 IDS 使用的状态格式。它可以作为独立程序使用,也可以直接在转录器工具中使用,通过传递相同的参数。状态写入由 ```--state.output``` 提供的文件或到 stdout。从数据包中导出状态的方法有很多。每个状态提取器都有自己的选项,可以通过 ```./state_extractor.py [extractor method] -h``` 获取。 ``` ./ipal-state-extractor -h usage: ipal-state-extractor [-h] [--ipal.input FILE] [--state.output FILE] [--filter LIST] [--complete-only BOOL] [--state-in-message BOOL] [--compresslevel INT] [--log STR] [--logfile FILE] [--version] {default,timeslice} ... options: -h, --help show this help message and exit --ipal.input FILE input location for message information. ('-' stdin, '*gz' compressed) --state.output FILE output location for state information. ('-' stdout, '*.gz' compress) --filter LIST semicolon separated list of state names to filter for. (Default: no filter) --complete-only BOOL output complete states after filtering only. (Default: True) --state-in-message BOOL embed state inside the messages. (Default: False) --compresslevel INT set the gzip compress level. 0 no compress, 1 fast/large, ..., 9 slow/tiny. (Default: 6) --log STR define logging level as one of DEBUG, INFO, WARNING, ERROR, or CRITICAL. (Default: WARNING) --logfile FILE define file to log to. (Default: stderr) --version show program's version number and exit State Extractors: {default,timeslice} These are available state extractor methods. Use -h for further options on each method. default Simple last value buffer of all variables timeslice Outputs complete state in regular time slices. ``` ###### 实现的状态提取器 目前,以下状态提取方法已实现: | 状态提取器 | 描述 | |-----------------|--------------------------------------------------------------------------------------------------| | default | 为每个消息输出一个状态,并保留每个变量的值。 | | timeslice | 保留每个变量的最后一个值,并在常规间隔(例如每秒)输出一个状态。 | ###### 状态格式 状态格式表示给定时间点的整个状态,包括物理过程的传感器和执行器的所有值。'state' 包含所有观察到的变量和值。每个变量的名称是其设备和变量名称的冒号分隔列表。如果自上次状态输出以来至少有一个包是恶意的,则状态标记为 'malicious'。 ``` { "timestamp": 1445465437.00792, "state": { "10.10.10.10:102:16": 0, "10.10.10.10:102:17": 0, "10.10.10.10:102:18": 0, "10.10.10.10:102:19": 0, "10.10.10.10:102:20": 0 }, "malicious": null } ``` #### 最小化 IPAL 消息 `ipal-minimize` 工具从 IPAL 消息或状态文件中清除进程信息(`data` 和 `state`)。这可以在实际进程数据不需要的情况下节省磁盘空间。 ``` ipal-minimize -h usage: ipal-minimize [-h] [--jobs INT] [--all] [--log STR] [--logfile FILE] [--version] FILE [FILE ...] positional arguments: FILE files to minimize ('*.gz' compressed). options: -h, --help show this help message and exit --jobs INT Number of parallel workers (Default: 4). --all Removes all data except those required for evaluation. --log STR define logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL). Default is WARNING. --logfile FILE File to log to. Default is stderr. --version show program's version number and exit ``` #### 合并 IPAL 消息 `ipal-join` 工具可以用于合并来自不同 IIDS 的不同 IPAL 数据集文件。目前,IDS 输出是 ORed。有关更复杂的方法,特别是结合不同的 IDS 输出,请参阅 IPAL [IDS 框架](https://github.com/ipal-ids/ipal_ids_framework) 的 `combiner` 功能。 ``` ipal-join -h usage: ipal-join [-h] --dataset FILE --output FILE [--force-rename] [--log STR] [--logfile FILE] [--version] FILE [FILE ...] positional arguments: FILE files to join ('*.gz' compressed). options: -h, --help show this help message and exit --dataset FILE original dataset ('*.gz' compressed). --output FILE path to store joined output to ('*.gz' compressed). --force-rename Forces renaming dict entries, e.g., scores, metrics (Default: False). --log STR define logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL). Default is WARNING. --logfile FILE File to log to. Default is stderr. --version show program's version number and exit ``` ## 开发 ##### 工具 用于开发、代码格式化、样式检查和测试的工具集可以使用以下命令安装: ``` python3 -m pip install -r requirements-dev.txt ``` 所有工具都可以手动使用以下命令执行,并在遇到错误时报告: ``` black . flake8 python3 -m pytest ``` 在提交之前,可以使用 Git 的 pre-commit 钩子功能强制执行对修改的文件进行 `black` 和 `flake8` 检查: ``` pre-commit install ``` 有关 `black` 和 `flake8` 设置的更多信息,请参阅 https://ljvmiranda921.github.io/notebook/2018/06/21/precommits-using-black-and-flake8/ ##### 添加协议 添加对新协议支持的过程如下: 1. 在 ```transcribers/``` 中添加一个新的模块 2. 创建一个新的解析器类,继承自 Transcriber 类(见 ```transcribers/transcriber.py```)。解析器类可以实现: - `matches_protocol`:给定一个数据包,如果解析器可以处理它,则返回 `True`,否则返回 `False`(必需) - `parse_packet`:给定一个解析器可以处理的数据包,返回一个有效 IPAL 消息的列表(必需) - `state_identifier`:给定一个数据包和数据字段的名称,返回一个字符串,标识相应的字段(可选,由状态提取器使用) - `matches_response`:给定请求消息列表和响应消息,修改响应消息的 `responds_to` 字段,通过添加匹配的请求 `id`,可能返回要从中删除的请求消息列表(可选) 3. 将新的转录器添加到 ```transcri/utils.py``` 中的列表 4. 将新的协议添加到上面的 [实现的协议](#implemented-protocols) 表中 5. 添加覆盖添加协议的测试用例 ##### 添加状态提取器 添加新状态提取方法的过程如下: 1. 在 ```state_extractors/``` 中添加一个新的文件 2. 创建一个新的状态提取器类,继承自 StateExtractor 类(见 ```state_extractors/state_extractor.py```)。状态提取器类可以实现: - `update_state`:给定一个 IPAL 消息,更新当前进程状态,如果需要,可以调用 ``_write_state`` 将状态写入输出(必需) - `finalize`:在所有消息处理完成后由主状态提取器脚本调用,实现 `finalize` 允许在完成时执行逻辑,例如输出一个最终状态(必需) - `add_arguments_to_parser`:向主状态提取器脚本添加参数,`args` 命名空间在初始化时传递给类,允许读取额外的用户配置和标志(可选) 3. 将新的状态提取器添加到 ```state_extractors/utils.py``` 中的列表 4. 将新的状态提取器添加到上面的 [实现的状态提取器](#implemented-state-extractors) 表中 ##### 添加测试 添加测试的过程取决于要添加的测试类型。 ###### 添加输出验证 在添加对新协议的支持时,添加对原始转录器输出、状态提取器输出以及结合转录器和状态提取器输出的验证输出进行检查。这些检查应添加到 `tests/test_transcriber.py`、`tests/test_state_extractor.py` 和 `tests/test_combined.py` 模块中,分别通过向 `RAW_FILES` 列表中添加条目。 例如,将三元组 `("misc/pcaps/s7.pcap", "s7.ipal", "s7")` 添加到 `tests/test_transcriber.py` 中的 `RAW_FILES`,表示将对在协议标志设置为 `s7` 的情况下对数据包捕获文件 `misc/pcaps/s7.pcap` 执行的转录器输出的验证进行检查,该验证与 `tests/snapshots/validation/test_transcriber_raw_s7.ipal` 中的参考文件进行比较。 请注意,在添加新测试后,在第一次测试运行后,将在 `tests/snapshots/validation/` 下创建一个验证 `IPAL` 文件。只需编辑这些新创建的验证文件,删除第一行的 `== new file ==` 标记,然后文件将用作未来测试的验证参考。 ###### 添加转录器测试 为了确保转录器的协议实现保持兼容,可以在 `tests/transcribers/` 中添加一个转录器测试模块。它可以包含测试添加的转录器的单个功能和属性的测试方法。请注意,为了使 pytest 收集它,模块和测试方法必须以 `test_` 前缀,并且测试
标签:AMSI绕过, IPAL, pcap文件, 入侵检测框架, 入侵检测系统, 协议抽象层, 威胁检测, 安全数据湖, 工业协议, 工业协议多样性, 工业协议解析, 工业协议转换, 工业控制系统, 工业网络安全, 工业通信, 数据转换, 标准化接口, 物理网络安全, 网络流量分析, 请求拦截, 逆向工具