Albuqr/Transaction_Monitor
GitHub: Albuqr/Transaction_Monitor
基于规则引擎的实时金融异常检测流水线,在零标签数据条件下通过滚动基线比较和人工审核实现可解释的交易欺诈预警。
Stars: 0 | Forks: 0
[🇺🇸 English](#transaction-monitor) | [🇧🇷 Português](#monitor-de-transações)
# Transaction Monitor






为巴西一家糖果制造商打造的实时金融异常检测流水线:通过 Kafka 流式传输成本中心交易,将每笔交易与按中心计算的滚动平均值进行比较,并将偏差超过 ±20% 的异常通过 Streamlit 仪表板呈现,供人工审核。
## 问题背景
一家拥有 28 台生产机器的巴西糖果制造商以手动方式记录所有金融交易。该工厂需要一种方法来实时检测异常的金融交易——即与历史成本中心基线发生显著偏差的付款——以便员工能够在资金流失前进行调查。
源数据中不包含供应商级别的描述;工厂从未记录过每笔付款的供应商归属。系统以成本中心为粒度运行,这是数据实际支持的最细粒度。编造供应商映射的方案被否决:因为这会使异常检测产生误导,而失去其实用价值。
## 系统架构
```
Producer ──► Kafka (KRaft) ──► Consumer ──► Redis ──► SQLite ──► FastAPI ──► Streamlit
```
- **Producer** — 从 BigQuery 读取成本中心预算数据,并将交易事件发布到 `transactions` Kafka 主题
- **Kafka (KRaft)** — 持久化事件流;消费者组偏移量确保每笔交易只被处理和提交一次
- **Consumer** — 读取事件,将每笔交易与 Redis 中的基线进行比对,将异常写入 SQLite,并更新正常交易的基线
- **Redis** — 存储按成本中心计算的滚动统计信息(计数、总和、平均值),用于 O(1) 复杂度的基线查询
- **SQLite** — 持久化标记的告警及其审核状态;因其在此规模下的运维简便性而被选用
- **FastAPI** — 暴露 `/alerts` 和 `/resolutions` 接口的 REST 层;仪表板不直接与数据库通信
- **Streamlit** — 审核仪表板,操作员在此将每个告警分类为合法或欺诈,审核结果会反馈到 Redis 基线中
## 关键设计决策
**1. 基于规则的异常检测,而非机器学习**
零标记的异常历史使得监督式训练根本不可能——没有任何可用于训练的数据。使用模型只会是披着科学外衣的猜测。采用统计阈值来标记偏离各成本中心滚动平均值 ±20% 以上的交易,这种做法具有确定性、可审计性,并且无需历史异常标签即可在首日直接部署。
**2. 成本中心粒度**
源数据不包含供应商描述;工厂从未记录过每笔付款的供应商归属。编造映射的方案被否决,因为这会使异常检测产生误导。成本中心是数据实际支持的最精细粒度,系统围绕这一约束进行设计,而不是去掩盖它。
**3. KRaft 模式的 Kafka**
彻底消除了 ZooKeeper,减少了技术栈的运维表面积。KRaft 模式在不引入 ZooKeeper 协调开销的情况下,展示了基于偏移量的持久化消费和消费者组重平衡——这些概念在生产环境的流处理场景中至关重要。相较于 Redis Streams 增加的复杂性是有意为之:它暴露了分布式流处理的原语,而这些是更简单的替代方案所抽象掉的。
## 技术栈
| 技术 | 作用 |
|---|---|
| Python 3.11 | 核心语言 |
| Apache Kafka (KRaft) | 事件流——无 ZooKeeper 依赖 |
| Redis | 按成本中心存储内存级滚动基线 |
| SQLite | 轻量级告警持久化 |
| FastAPI | REST API——`/alerts`、`/resolutions` |
| Streamlit | 异常审核仪表板 |
| BigQuery | 历史数据仓库 |
| dbt | Bronze/silver/gold 转换 |
## 工作原理
1. Producer 查询 BigQuery 获取成本中心预算数据,并将交易事件发布到 `transactions` Kafka 主题。
2. Consumer 读取每个事件,并在 Redis 中查找该成本中心的滚动基线。
3. 如果基线存在且交易金额在平均值 ±20% 的范围内,则增量更新基线(计数、总和、平均值)并提交消息。
4. 如果交易偏差超过 ±20%,则会连同其偏差百分比作为告警写入 SQLite,随后进行提交。
5. 如果某个成本中心尚无基线,则直接从该交易初始化一个基线。
6. FastAPI 暴露 `/alerts` 接口以列出标记的交易,暴露 `/resolutions` 接口以将告警标记为已审核;一次合法的审核判定会更新 Redis 基线,以便未来的交易能与扩展后的历史记录进行比较。
7. Streamlit 仪表板轮询 `/alerts` 接口,通过可视化阈值图表展示每个异常,并允许操作员将每笔交易分类为合法或欺诈。
## 本地运行
**前置条件**
- Docker 和 Docker Compose
- 具备 BigQuery 读取权限的 GCP 服务账号密钥,位于 `./gcp-credentials.json`
- 根据 `.env.example` 填写的 `.env` 文件
```
# 1. 配置环境
cp .env.example .env
# 填写 KAFKA_TOPIC, BIGQUERY_PROJECT, BIGQUERY_DATASET, GOOGLE_APPLICATION_CREDENTIALS
# 2. 启动所有服务
docker compose up -d
# 3. 使用来自 BigQuery 的历史成本中心基线填充 Redis
# REDIS_HOST=localhost 覆盖 Docker 服务名称,以便在本地执行脚本
REDIS_HOST=localhost python processing/seed_redis.py
# 4. 向 Kafka 发布一批 transactions
docker compose run --rm producer
# 5. 在 http://localhost:8501 打开 dashboard
```
## 在线演示
[transaction-monitor.albuqr.com](https://transaction-monitor.albuqr.com)
## 更大平台的一部分
此仓库是为同一客户构建的三个互联系统中的第二个。[Factory Lakehouse](https://github.com/Albuqr/Factory_Lakehouse)(仓库 1)通过由 Airflow 编排的 dbt bronze/silver/gold 转换,将所有 28 台机器的原始 Excel 导出数据摄取到 BigQuery 中——此流水线读取的成本中心表正是在那里生成的。
[🇺🇸 English](#transaction-monitor) | [🇧🇷 Português](#monitor-de-transações)
# Transaction Monitor






Pipeline de detecção de anomalias financeiras em tempo real para um fabricante brasileiro de confeitos: transmite transações por centro de custo através do Kafka, compara cada uma com uma média móvel por centro, e expõe desvios acima de ±20% para revisão humana via dashboard Streamlit.
## 问题
Um fabricante brasileiro de confeitos com 28 máquinas de produção registra todas as transações financeiras manualmente. A fábrica precisava de uma forma de detectar transações financeiras anômalas em tempo real — pagamentos que desviam significativamente das linhas de base históricas por centro de custo — para que a equipe pudesse investigar antes que o dinheiro fosse perdido.
Os dados de origem não continham descrições em nível de fornecedor; a fábrica nunca registrou qual fornecedor recebeu cada pagamento. O sistema opera na granularidade de centro de custo, o nível mais detalhado que os dados realmente suportam. Fabricar mapeamentos de fornecedores foi rejeitado: isso tornaria a detecção de anomalias enganosa em vez de útil.
## 架构
```
Produtor ──► Kafka (KRaft) ──► Consumidor ──► Redis ──► SQLite ──► FastAPI ──► Streamlit
```
- **Produtor** — lê dados de orçamento por centro de custo do BigQuery e publica eventos de transação no tópico `transactions` do Kafka
- **Kafka (KRaft)** — stream de eventos durável; offsets do grupo consumidor garantem que cada transação seja processada e confirmada exatamente uma vez
- **Consumidor** — lê eventos, compara cada transação com a linha de base no Redis, grava anomalias no SQLite e atualiza a linha de base para transações normais
- **Redis** — armazena estatísticas móveis por centro de custo (contagem, soma, média) para consultas de linha de base em O(1)
- **SQLite** — persiste alertas sinalizados e seus status de revisão; escolhido pela simplicidade operacional nessa escala
- **FastAPI** — camada REST expondo `/alerts` e `/resolutions`; o dashboard nunca acessa o banco de dados diretamente
- **Streamlit** — dashboard de revisão onde operadores classificam cada alerta como legítimo ou fraude, com resoluções realimentadas na linha de base do Redis
## 设计决策
**1. Detecção de anomalias por regras, não por ML**
Zero histórico de anomalias rotuladas torna o treinamento supervisionado impossível — não há nada com que treinar. Um modelo seria uma suposição disfarçada de ciência. Um limiar estatístico que sinaliza transações além de ±20% da média móvel por centro de custo é determinístico, auditável e implantável desde o primeiro dia, sem necessidade de rótulos históricos de anomalias.
**2. Granularidade de centro de custo**
Os dados de origem não contêm descrições de fornecedores; a fábrica nunca registrou qual fornecedor recebeu cada pagamento. Fabricar mapeamentos foi rejeitado porque tornaria a detecção de anomalias enganosa em vez de útil. Centros de custo são o nível mais granular que os dados realmente suportam, e o sistema é projetado em torno dessa restrição em vez de encobri-la.
**3. Kafka no modo KRaft**
Elimina o ZooKeeper completamente, reduzindo a superfície operacional da stack. O modo KRaft demonstra consumo durável baseado em offsets e rebalanceamento de grupos consumidores sem a sobrecarga de coordenação que o ZooKeeper introduz — conceitos que importam em ambientes de streaming em produção. A complexidade adicional em relação ao Redis Streams é deliberada: ela expõe as primitivas de streaming distribuído que alternativas mais simples abstraem.
## 技术栈
| Tecnologia | Função |
|---|---|
| Python 3.11 | Linguagem principal |
| Apache Kafka (KRaft) | Streaming de eventos — sem dependência de ZooKeeper |
| Redis | Armazenamento em memória de linhas de base móveis por centro de custo |
| SQLite | Persistência leve de alertas |
| FastAPI | API REST — `/alerts`, `/resolutions` |
| Streamlit | Dashboard de revisão de anomalias |
| BigQuery | Data warehouse histórico |
| dbt | Transformações bronze/silver/gold |
## 工作原理
1. O produtor consulta o BigQuery para obter dados de orçamento por centro de custo e publica eventos de transação no tópico `transactions` do Kafka.
2. O consumidor lê cada evento e busca a linha de base móvel do centro de custo no Redis.
3. Se a linha de base existir e a transação estiver dentro de ±20% da média, a linha de base é atualizada incrementalmente (contagem, soma, média) e a mensagem é confirmada.
4. Se a transação desviar além de ±20%, ela é gravada no SQLite como um alerta com seu percentual de desvio e então confirmada.
5. Se ainda não existir linha de base para um centro de custo, uma é inicializada a partir da própria transação.
6. O FastAPI expõe `/alerts` para listar transações sinalizadas e `/resolutions` para marcar um alerta como revisado; uma resolução legítima atualiza a linha de base no Redis para que transações futuras sejam comparadas com um histórico expandido.
7. O dashboard Streamlit consulta `/alerts`, renderiza cada anomalia com um gráfico de limiar visual e permite que operadores classifiquem cada transação como legítima ou fraude.
## 本地运行
**Pré-requisitos**
- Docker e Docker Compose
- Uma chave de conta de serviço GCP com acesso de leitura ao BigQuery em `./gcp-credentials.json`
- `.env` preenchido a partir de `.env.example`
```
# 1. 配置环境
cp .env.example .env
# 设置 KAFKA_TOPIC, BIGQUERY_PROJECT, BIGQUERY_DATASET, GOOGLE_APPLICATION_CREDENTIALS
# 2. 启动所有服务
docker compose up -d
# 3. 使用 BigQuery 的历史基线填充 Redis
# REDIS_HOST=localhost 覆盖 Docker 服务名称,以便在本地执行脚本
REDIS_HOST=localhost python processing/seed_redis.py
# 4. 向 Kafka 发布一批 transactions
docker compose run --rm producer
# 5. 在 http://localhost:8501 打开 dashboard
```
## 现场演示
[transaction-monitor.albuqr.com](https://transaction-monitor.albuqr.com)
## 更大平台的一部分
Este repositório é o segundo de três sistemas interconectados desenvolvidos para o mesmo cliente. O [Factory Lakehouse](https://github.com/Albuqr/Factory_Lakehouse) (Repositório 1) ingere exportações brutas em Excel de todas as 28 máquinas no BigQuery via transformações dbt bronze/s/gold orquestradas pelo Airflow — as tabelas de centro de custo que este pipeline lê são produzidas lá.
标签:AV绕过, FastAPI, Kafka, KRaft, Kubernetes, Python, Redis, SonarQube插件, SQLite, Streamlit, 业务风控, 事件驱动架构, 云计算, 制造业数字化转型, 大数据, 实时仪表盘, 实时异常检测, 巴西糖果制造商, 异常交易预警, 成本中心管理, 搜索引擎查询, 数据流水线, 无后门, 无机器学习, 流数据处理, 滚动基线, 目录扫描, 规则引擎, 访问控制, 请求拦截, 财务合规, 金融交易监控, 零样本检测