SOFTNETWORK-APP/generic-persistence-api

GitHub: SOFTNETWORK-APP/generic-persistence-api

基于 Scala 和 Akka Persistence 的 CQRS/事件溯源框架,用于简化分布式系统中的读写分离与状态管理。

Stars: 1 | Forks: 0

![构建状态](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/a467428fea052254.svg) [![codecov](https://codecov.io/github/SOFTNETWORK-APP/generic-persistence-api/branch/main/graph/badge.svg?token=M9KVXR8KGS)](https://codecov.io/gh/SOFTNETWORK-APP/generic-persistence-api/) [![Codacy 徽章](https://app.codacy.com/project/badge/Grade/122252a6bdfb41c3af16d31f8cefaecc)](https://www.codacy.com/gh/SOFTNETWORK-APP/generic-persistence-api/dashboard?utm_source=github.com&utm_medium=referral&utm_content=SOFTNETWORK-APP/generic-persistence-api&utm_campaign=Badge_Grade) [![许可证: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) # 通用持久化 API 一个使用 akka persistence 和 Scala 编写的 **CQRS/ES** 框架 ![CQRS](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/1fbd685908052255.svg) “CQRS 只是将原本的一个对象拆分为两个对象。这种分离是基于方法是命令还是查询(这与 Meyer 在命令查询分离中的定义相同:命令是任何改变状态的方法,而查询是任何返回值的方法)。” —Greg Young,CQRS,基于任务的用户界面,事件溯源 天哪! ## CQRS 写端 **generic-persistence-api** 依赖 [Akka Persistence](https://doc.akka.io/docs/akka/current/typed/persistence.html) 来提供一种**可扩展**且**有弹性**的方式,通过命令和事件实现 CQRS 的写端,从而简化事件溯源系统的实现。 ### 集群分片 集群分片是 Akka 中的一种分布式系统机制,它允许有状态 actor 分布在集群节点上,同时确保消息被路由到正确的 actor 实例,而无论该 actor 运行在哪个节点上,这为扩展有状态 actor 提供了强大的机制。 以下是集群分片工作原理的概述: 1. **Actor 实体**:每个需要使用集群分片进行分布的有状态 actor 被称为“actor 实体”。 2. **分片区域**:“分片区域”(节点)负责管理一组 actor 实体。每个分片区域负责系统中 actor 实体的一个子集。 3. **分片标识符**:每个 actor 实体被分配一个唯一的“分片标识符”,用于确定哪个分片区域负责管理该实体。 4. **消息路由**:当向 actor 实体发送消息时,消息首先发送到负责管理该实体的分片区域。然后,分片区域根据实体标识符(标识该分片内的特定实体)将消息路由到相应的 actor 实例。 5. **节点感知**:分片机制感知集群的状态,因此可以确保 actor 实例运行在当前可用且健康的节点上。 6. **持久化**:Akka Persistence 用于持久化每个 actor 实体的状态,确保状态是持久的,并且可以在节点故障或其他类型的系统故障中幸存。 默认情况下,分片标识符是实体标识符的 hashCode 的绝对值对分片总数取模的结果。分片数量由 `akka.cluster.sharding.number-of-shards` 配置。其值对于集群中的所有节点必须相同。 默认实现 _LeastShardAllocationStrategy_ 将新的分片分配给之前分配分片数量最少的分片区域(节点)。 集群分片的主要好处之一是,它允许系统通过将工作负载分布到多个节点来实现水平扩展。分片机制确保实体均匀分布在节点之间,从而防止任何单个节点成为瓶颈。 另一个好处是它为系统提供了容错能力和弹性。如果节点发生故障,由该节点管理的分片会自动移动到集群中的其他节点。这确保了即使发生节点故障,系统也能继续运行。 ### 实体行为 在 Akka 中,“EventSourceBehavior”是一个旨在与 Akka Persistence 配合工作的实体 actor,用于确保其状态更改是持久的,并且可以在发生故障或重启时恢复。 使用 Akka **EventSourcedBehavior** 的写端通常涉及以下步骤: 1. **命令处理**:当接收到**命令**时,通常由代表该 actor 实体的 EventSourcedBehavior actor 实例处理。actor 使用实体的当前**状态**来确定如何处理该命令,并可能生成一个或多个**事件**,然后最终响应发送者。 2. **事件持久化**:生成事件后,它们会被追加到**日记**(journal)(使用 Akka Persistence 的事件溯源机制的事件日志)中,并附带可选的**标签**。这允许根据标签轻松过滤和查询事件,从而提高读端**投影**(projections)的效率。 3. **事件重放**:当创建或重启 actor 时,Akka Persistence 会自动从事件日志中重放所有事件,允许 actor 基于这些事件重建其当前状态。 4. **快照**:为了避免在每次创建或重启 actor 时都重放所有事件,Akka EventSourcedBehavior 支持快照。每 N 个事件或在满足给定状态谓词时,actor 可以保存其当前状态的**快照**,包括自上次快照以来累积的所有事件及其标签。此快照与事件日志一起持久化,可用于在稍后的时间点恢复 actor 的状态,而不是从头开始重放所有事件。 “实体行为”是一个**专用工厂**,它依赖集群分片来创建“EventSourceBehavior”actor 的实例,每个实例对应于由分片区域管理的单个 actor 实体。 它必须定义该行为将处理的命令和事件类的类型,以及处理其内存状态的类。 ``` trait EntityBehavior[C <: Command, S <: State, E <: Event, R <: CommandResult] ``` 必须定义 `persistenceId` 属性,以便定义底层实体 actor 的实体类型键,该键唯一标识集群中的实体类型。 ``` final def TypeKey(implicit c: ClassTag[C]): EntityTypeKey[C] = EntityTypeKey[C](s"$persistenceId-$environment") /** @return * the key used to define the Entity Type Key of this actor */ def persistenceId: String ``` 它定义了几个可能被覆盖的参数。 ``` /** @return * number of events before saving a snapshot of the current actor entity state. If multiple * events are persisted with a single Effect the snapshot will happen after all of the events * are persisted rather than precisely every `snapshotInterval` */ def snapshotInterval: Int = 10 /** @return * number of snapshots to keep */ def numberOfSnapshots: Int = 2 /** @return * node role required to start the entity actor */ def role: String = "" ``` 实体 actor 的行为由一组命令和事件处理程序决定,这些处理程序由用户定义。 ``` /** @param entityId * - entity identity * @param state * - current state * @param command * - command to handle * @param replyTo * - optional actor to reply to * @param timers * - scheduled messages associated with this entity behavior * @return * effect */ def handleCommand( entityId: String, state: Option[S], command: C, replyTo: Option[ActorRef[R]], timers: TimerScheduler[C] )(implicit context: ActorContext[C]): Effect[E, Option[S]] = command match { case _ => Effect.unhandled } /** This method is invoked whenever an event has been persisted successfully or when the entity is * started up to recover its state from the stored events * * @param state * - current state * @param event * - event to hanlde * @return * new in-memory state after applying the event to the previous state */ def handleEvent(state: Option[S], event: E)(implicit context: ActorContext[_]): Option[S] = event match { case _ => state } /** associate a set of tags to an event before the latter will be appended to the event log * * This allows events to be easily filtered and queried based on their tags, improving the * efficiency of read-side projections * * @param entityId * - entity id * @param event * - the event to tag * @return * set of tags to associate to this event */ protected def tagEvent(entityId: String, event: E): Set[String] = Set.empty /** This method is called just after the state of the corresponding entity has been successfully * recovered * * @param state * - current state * @param context * - actor context */ def postRecoveryCompleted(state: Option[S])(implicit context: ActorContext[C]): Unit = {} ``` 在其初始化期间,工厂使用集群分片为 actor 实体创建一个分片区域,该区域将负责管理实体的生命周期,包括创建新实体、停止实体以及将命令路由到适当的实体。 ``` def init(system: ActorSystem[_], maybeRole: Option[String] = None)(implicit c: ClassTag[C] ): ActorRef[ShardingEnvelope[C]] = { ClusterSharding(system) init Entity(TypeKey) { entityContext => this( entityContext.entityId, PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId) ) }.withRole(maybeRole.getOrElse(role)) } ``` 该框架处理事件持久化、重放和快照的复杂性,使开发人员能够专注于定义领域实体对命令的响应行为。 生成的系统非常高效,能够从**快照**快速重建其状态并仅重放部分事件,同时仍然确保系统状态的准确性。 此外,提供**标记事件**的能力使得**读端投影**可以轻松实施和维护,从而提高系统的整体性能和可扩展性。 ![EntityBehavior](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/00b6e365b1052256.svg) ### 实体模式 该框架定义了“实体模式”,以允许发送者与分布式系统内的 actor 实体轻松通信。 要向 actor 实体发送命令,需要后者的引用。可以通过将用于定义该 actor 的**实体类型键**及其**实体标识符**的键传递给集群分片来获取此引用。 以下是“实体模式”在集群分片上下文中如何工作的概述: 1. **接收命令**:发送者将命令连同要交互的 actor 的实体标识符一起发送给实体模式。 2. **检索 EntityRef**:实体模式通过将其实体类型键和实体标识符传递给集群分片,检索与该实体标识符对应的实体引用。 3. **询问实体**:实体模式在实体引用上调用 ask 方法以发送消息并等待响应。 4. **消息路由**:消息被路由到分片协调器,然后根据实体的分片标识符路由到 actor 实例运行所在的正确节点。 5. **消息处理**:实体 actor 接收并处理消息,可能会修改其状态并发回响应消息。 6. **Future 完成**:当接收到响应消息时,代表响应消息的 Future 将以接收到的响应消息完成。 7. **超时处理**:如果在指定的超时时间内未收到响应消息,Future 将以 TimeoutException 完成。 “实体模式”必须定义它将处理的命令类以及相应的实体类型键。 ``` trait EntityPattern[C <: Command, R <: CommandResult] extends Patterns[C, R] with Entity { _: CommandTypeKey[C] => type Recipient = String //... } ``` 它公开了多种方法,允许发送者与 actor 实体进行通信。 ``` def ?(recipient: Recipient, command: C)(implicit tTag: ClassTag[C], system: ActorSystem[_] ): Future[R] def !(recipient: Recipient, command: C)(implicit tTag: ClassTag[C], system: ActorSystem[_] ): Unit def ??[T](key: T, command: C)(implicit tTag: ClassTag[C], system: ActorSystem[_]): Future[R] def ?![T](key: T, command: C)(implicit tTag: ClassTag[C], system: ActorSystem[_]): Unit ``` ![Patterns](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/7f731ac75d052257.svg) ### 命令 在 CQRS 中,命令在定义系统的写端方面起着关键作用。命令是一条消息,它封装了用户执行特定操作的意图,例如创建新帐户、更新现有记录或删除数据。命令的主要目的是启动系统状态的更改。 命令响应在向发起命令的用户提供反馈方面也起着重要作用。当用户向系统发送命令时,他们期望收到确认命令已成功执行或发生错误的响应。 该框架将 trait **_Command_** 定义为发送给系统中接收者的任何命令的根接口,并将 trait **_CommandResult_** 定义为任何命令响应的根接口。 ![Command](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/fb510ebe47052258.svg) ### 事件 事件为同步写端(由处理命令产生)的更改与读端提供了基础。 如果写端在应用程序状态更改时引发事件,读端应响应该事件并更新其查询和视图所使用的数据。 该框架将 trait **_Event_** 定义为系统中任何事件的根接口。 ![Event](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/c3a833356b052259.svg) ### 状态 实体 actor 的状态可以分为两部分:内存状态和持久化状态。 内存状态表示 actor 正在处理时的当前状态。当 actor 接收命令并据此生成事件时,此状态会被修改。内存状态可以是任何类型的数据结构,例如 case class 或集合。 持久化状态表示 actor 存储在日记中的状态。此状态包括由 actor 生成的所有事件,可用于在发生故障或重启时重建 actor 的内存状态。持久化状态由 Akka Persistence 日记维护,actor 无法直接访问。 当创建实体 actor 时,它以空的内存状态和持久化状态开始。随着事件被持久化到日记,持久化状态会更新,并通过重放日记中的事件来重建内存状态。 该框架将 trait **_State_** 定义为任何实体 actor 的内存状态的根接口。 ![State](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/23c2f28648052301.svg) ### 序列化 Akka Persistence 提供了一种内置的序列化机制,使用 Akka Serialization 库。该库允许您为自定义数据类型定义序列化器,以便在持久化到事件存储时自动进行序列化和反序列化。 当 Akka actor 接收到命令时,它会创建一个或多个领域事件并将其发送到事件日记以进行持久化。在持久化事件之前,会使用配置的序列化机制对其进行序列化。当从事件日记读取事件时,会使用相同的机制将其反序列化回领域事件。 最后,在持久化快照之前,actor 的当前状态(其内存状态)会使用配置的序列化机制进行序列化。 该框架原生支持三种类型的**序列化器**(Serializers): + **proto** + **jackson-cbor** + **chill** ``` akka { actor { allow-java-serialization = off serializers { proto = "akka.remote.serialization.ProtobufSerializer" jackson-cbor = "akka.serialization.jackson.JacksonCborSerializer" chill = "com.twitter.chill.akka.AkkaSerializer" } ... } } ``` 默认[配置](core/src/main/resources/softnetwork-persistence.conf)定义了以下序列化绑定: ``` akka { actor { ... enable-additional-serialization-bindings = on serialization-bindings { "app.softnetwork.persistence.model.package$Timestamped" = proto "app.softnetwork.persistence.message.package$ProtobufEvent" = proto # protobuf events "app.softnetwork.persistence.model.package$ProtobufDomainObject" = proto # protobuf domain objects "app.softnetwork.persistence.model.package$ProtobufStateObject" = proto # protobuf state objects "app.softnetwork.persistence.message.package$CborEvent" = jackson-cbor # cbor events "app.softnetwork.persistence.model.package$CborDomainObject" = jackson-cbor # cbor domain objects "app.softnetwork.persistence.message.package$Command" = chill "app.softnetwork.persistence.message.package$CommandResult" = chill "app.softnetwork.persistence.message.package$Event" = chill "app.softnetwork.persistence.model.package$State" = chill } } } ``` ![Serialization](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/747583153e052302.svg) #### 版本控制 您将来可能会发现有必要更改特定事件类型或聚合的定义。 您必须考虑系统将如何处理事件类型和聚合的多个版本。 ### 事件溯源 ![EventSourcing](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/8f46d17bb9052303.svg) ## CQRS 读端 虽然通过重放事件流(或其过去某个时间点的状态)来加载对象的当前状态很容易,但执行诸如“查找我所有总值超过 250 美元的订单”之类的查询是困难或昂贵的。 通过实施 CQRS 模式,此类查询通常将在读端执行,在那里您可以确保构建专门用于回答此类问题的数据投影。 CQRS 的读端负责提供对写端写入的数据的快速高效的读访问。 它可以通过多种方式实现,但一种常见的模式是使用事件溯源。在这种方法中,读端订阅由写端发布的事件流,并处理事件以构建和更新读模型。 这种方法通过框架中的“事件处理器流”来实现。 ### 事件处理器流 CQRS 读端的“事件处理器流”实现根据**标签**订阅特定事件,这些事件由写端通过事件溯源发布。通过订阅这些事件,读端可以接收和处理仅与其领域相关的事件。 从流中读取事件时,重要的是要跟踪流中读取上一个事件的位置。此位置通常称为“偏移量”(offset)。偏移量可用于确保流中的每个事件仅被读取一次,即使多次读取该流也是如此。 为了确保每个事件仅被读取一次,“事件处理器流”维护一个**偏移量**,这是一个指向它**成功**处理的最后一个事件的指针。通过存储其偏移量,处理器可以在发生故障或重启时从停止的地方继续处理事件。通过跟踪偏移量,“事件流处理器”_消费者_可以确保流中的每个事件仅被读取一次。如果应用程序崩溃或重启,它可以从最后已知的偏移量恢复读取,确保不会遗漏或重复事件。 以下是“事件流处理器”处理偏移量的方式: 1. **初始化偏移量**:当处理器(重新)启动时,它查询“偏移量提供者”以检索其在事件流中成功处理的最后一个事件所对应的当前偏移量。 2. **读取事件**:处理器查询“日记提供者”以获取具有特定标签的事件,从当前偏移量开始。 3. **处理事件**:处理器处理它从事件流中读取的每个事件。 4. **写入移量**:一旦处理器成功处理了事件,它就会使用后者的偏移量值更新其偏移量。 由于不同的处理器可能会读取同一事件流,因此必须能够唯一区分它们的偏移量。 因此,“事件处理器流”由处理器 ID 和特定标签唯一标识。 ![EventProcessorStream](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/d0f22c4496052304.svg) 总体而言,CQRS 读端的“事件处理器流”实现提供了一种高效且可扩展的方式来处理大量事件,同时确保每个事件对于每个流处理器仅处理一次,并且读模型与写端保持最新和一致。 ![CQRS-read-side](https://static.pigsec.cn/wp-content/uploads/repos/2026/04/5ca028fdba052305.svg) ## 与其他子系统的集成 事件提供了一种与其他子系统通信的有用方式。您的事件存储可以发布事件,以通知其他感兴趣的子系统有关应用程序状态的更改。同样,事件存储提供了它向其他系统发布的所有事件的完整记录。 ## 快速开始 - 人员示例
标签:Akka, Akka Persistence, CQRS, DNS解析, ES, Scala, 事件溯源, 分布式系统, 后端开发, 命令查询职责分离, 响应大小分析, 响应式编程, 开源项目, 微服务架构, 数据一致性, 状态管理, 集群分片