SOFTNETWORK-APP/generic-persistence-api
GitHub: SOFTNETWORK-APP/generic-persistence-api
基于 Scala 和 Akka Persistence 的 CQRS/事件溯源框架,用于简化分布式系统中的读写分离与状态管理。
Stars: 1 | Forks: 0

[](https://codecov.io/gh/SOFTNETWORK-APP/generic-persistence-api/)
[](https://www.codacy.com/gh/SOFTNETWORK-APP/generic-persistence-api/dashboard?utm_source=github.com&utm_medium=referral&utm_content=SOFTNETWORK-APP/generic-persistence-api&utm_campaign=Badge_Grade)
[](https://opensource.org/licenses/MIT)
# 通用持久化 API
一个使用 akka persistence 和 Scala 编写的 **CQRS/ES** 框架

“CQRS 只是将原本的一个对象拆分为两个对象。这种分离是基于方法是命令还是查询(这与 Meyer 在命令查询分离中的定义相同:命令是任何改变状态的方法,而查询是任何返回值的方法)。”
—Greg Young,CQRS,基于任务的用户界面,事件溯源 天哪!
## CQRS 写端
**generic-persistence-api** 依赖 [Akka Persistence](https://doc.akka.io/docs/akka/current/typed/persistence.html) 来提供一种**可扩展**且**有弹性**的方式,通过命令和事件实现 CQRS 的写端,从而简化事件溯源系统的实现。
### 集群分片
集群分片是 Akka 中的一种分布式系统机制,它允许有状态 actor 分布在集群节点上,同时确保消息被路由到正确的 actor 实例,而无论该 actor 运行在哪个节点上,这为扩展有状态 actor 提供了强大的机制。
以下是集群分片工作原理的概述:
1. **Actor 实体**:每个需要使用集群分片进行分布的有状态 actor 被称为“actor 实体”。
2. **分片区域**:“分片区域”(节点)负责管理一组 actor 实体。每个分片区域负责系统中 actor 实体的一个子集。
3. **分片标识符**:每个 actor 实体被分配一个唯一的“分片标识符”,用于确定哪个分片区域负责管理该实体。
4. **消息路由**:当向 actor 实体发送消息时,消息首先发送到负责管理该实体的分片区域。然后,分片区域根据实体标识符(标识该分片内的特定实体)将消息路由到相应的 actor 实例。
5. **节点感知**:分片机制感知集群的状态,因此可以确保 actor 实例运行在当前可用且健康的节点上。
6. **持久化**:Akka Persistence 用于持久化每个 actor 实体的状态,确保状态是持久的,并且可以在节点故障或其他类型的系统故障中幸存。
默认情况下,分片标识符是实体标识符的 hashCode 的绝对值对分片总数取模的结果。分片数量由 `akka.cluster.sharding.number-of-shards` 配置。其值对于集群中的所有节点必须相同。
默认实现 _LeastShardAllocationStrategy_ 将新的分片分配给之前分配分片数量最少的分片区域(节点)。
集群分片的主要好处之一是,它允许系统通过将工作负载分布到多个节点来实现水平扩展。分片机制确保实体均匀分布在节点之间,从而防止任何单个节点成为瓶颈。
另一个好处是它为系统提供了容错能力和弹性。如果节点发生故障,由该节点管理的分片会自动移动到集群中的其他节点。这确保了即使发生节点故障,系统也能继续运行。
### 实体行为
在 Akka 中,“EventSourceBehavior”是一个旨在与 Akka Persistence 配合工作的实体 actor,用于确保其状态更改是持久的,并且可以在发生故障或重启时恢复。
使用 Akka **EventSourcedBehavior** 的写端通常涉及以下步骤:
1. **命令处理**:当接收到**命令**时,通常由代表该 actor 实体的 EventSourcedBehavior actor 实例处理。actor 使用实体的当前**状态**来确定如何处理该命令,并可能生成一个或多个**事件**,然后最终响应发送者。
2. **事件持久化**:生成事件后,它们会被追加到**日记**(journal)(使用 Akka Persistence 的事件溯源机制的事件日志)中,并附带可选的**标签**。这允许根据标签轻松过滤和查询事件,从而提高读端**投影**(projections)的效率。
3. **事件重放**:当创建或重启 actor 时,Akka Persistence 会自动从事件日志中重放所有事件,允许 actor 基于这些事件重建其当前状态。
4. **快照**:为了避免在每次创建或重启 actor 时都重放所有事件,Akka EventSourcedBehavior 支持快照。每 N 个事件或在满足给定状态谓词时,actor 可以保存其当前状态的**快照**,包括自上次快照以来累积的所有事件及其标签。此快照与事件日志一起持久化,可用于在稍后的时间点恢复 actor 的状态,而不是从头开始重放所有事件。
“实体行为”是一个**专用工厂**,它依赖集群分片来创建“EventSourceBehavior”actor 的实例,每个实例对应于由分片区域管理的单个 actor 实体。
它必须定义该行为将处理的命令和事件类的类型,以及处理其内存状态的类。
```
trait EntityBehavior[C <: Command, S <: State, E <: Event, R <: CommandResult]
```
必须定义 `persistenceId` 属性,以便定义底层实体 actor 的实体类型键,该键唯一标识集群中的实体类型。
```
final def TypeKey(implicit c: ClassTag[C]): EntityTypeKey[C] = EntityTypeKey[C](s"$persistenceId-$environment")
/** @return
* the key used to define the Entity Type Key of this actor
*/
def persistenceId: String
```
它定义了几个可能被覆盖的参数。
```
/** @return
* number of events before saving a snapshot of the current actor entity state. If multiple
* events are persisted with a single Effect the snapshot will happen after all of the events
* are persisted rather than precisely every `snapshotInterval`
*/
def snapshotInterval: Int = 10
/** @return
* number of snapshots to keep
*/
def numberOfSnapshots: Int = 2
/** @return
* node role required to start the entity actor
*/
def role: String = ""
```
实体 actor 的行为由一组命令和事件处理程序决定,这些处理程序由用户定义。
```
/** @param entityId
* - entity identity
* @param state
* - current state
* @param command
* - command to handle
* @param replyTo
* - optional actor to reply to
* @param timers
* - scheduled messages associated with this entity behavior
* @return
* effect
*/
def handleCommand(
entityId: String,
state: Option[S],
command: C,
replyTo: Option[ActorRef[R]],
timers: TimerScheduler[C]
)(implicit context: ActorContext[C]): Effect[E, Option[S]] =
command match {
case _ => Effect.unhandled
}
/** This method is invoked whenever an event has been persisted successfully or when the entity is
* started up to recover its state from the stored events
*
* @param state
* - current state
* @param event
* - event to hanlde
* @return
* new in-memory state after applying the event to the previous state
*/
def handleEvent(state: Option[S], event: E)(implicit context: ActorContext[_]): Option[S] =
event match {
case _ => state
}
/** associate a set of tags to an event before the latter will be appended to the event log
*
* This allows events to be easily filtered and queried based on their tags, improving the
* efficiency of read-side projections
*
* @param entityId
* - entity id
* @param event
* - the event to tag
* @return
* set of tags to associate to this event
*/
protected def tagEvent(entityId: String, event: E): Set[String] = Set.empty
/** This method is called just after the state of the corresponding entity has been successfully
* recovered
*
* @param state
* - current state
* @param context
* - actor context
*/
def postRecoveryCompleted(state: Option[S])(implicit context: ActorContext[C]): Unit = {}
```
在其初始化期间,工厂使用集群分片为 actor 实体创建一个分片区域,该区域将负责管理实体的生命周期,包括创建新实体、停止实体以及将命令路由到适当的实体。
```
def init(system: ActorSystem[_], maybeRole: Option[String] = None)(implicit
c: ClassTag[C]
): ActorRef[ShardingEnvelope[C]] = {
ClusterSharding(system) init Entity(TypeKey) { entityContext =>
this(
entityContext.entityId,
PersistenceId(entityContext.entityTypeKey.name, entityContext.entityId)
)
}.withRole(maybeRole.getOrElse(role))
}
```
该框架处理事件持久化、重放和快照的复杂性,使开发人员能够专注于定义领域实体对命令的响应行为。
生成的系统非常高效,能够从**快照**快速重建其状态并仅重放部分事件,同时仍然确保系统状态的准确性。
此外,提供**标记事件**的能力使得**读端投影**可以轻松实施和维护,从而提高系统的整体性能和可扩展性。

### 实体模式
该框架定义了“实体模式”,以允许发送者与分布式系统内的 actor 实体轻松通信。
要向 actor 实体发送命令,需要后者的引用。可以通过将用于定义该 actor 的**实体类型键**及其**实体标识符**的键传递给集群分片来获取此引用。
以下是“实体模式”在集群分片上下文中如何工作的概述:
1. **接收命令**:发送者将命令连同要交互的 actor 的实体标识符一起发送给实体模式。
2. **检索 EntityRef**:实体模式通过将其实体类型键和实体标识符传递给集群分片,检索与该实体标识符对应的实体引用。
3. **询问实体**:实体模式在实体引用上调用 ask 方法以发送消息并等待响应。
4. **消息路由**:消息被路由到分片协调器,然后根据实体的分片标识符路由到 actor 实例运行所在的正确节点。
5. **消息处理**:实体 actor 接收并处理消息,可能会修改其状态并发回响应消息。
6. **Future 完成**:当接收到响应消息时,代表响应消息的 Future 将以接收到的响应消息完成。
7. **超时处理**:如果在指定的超时时间内未收到响应消息,Future 将以 TimeoutException 完成。
“实体模式”必须定义它将处理的命令类以及相应的实体类型键。
```
trait EntityPattern[C <: Command, R <: CommandResult] extends Patterns[C, R] with Entity {
_: CommandTypeKey[C] =>
type Recipient = String
//...
}
```
它公开了多种方法,允许发送者与 actor 实体进行通信。
```
def ?(recipient: Recipient, command: C)(implicit
tTag: ClassTag[C],
system: ActorSystem[_]
): Future[R]
def !(recipient: Recipient, command: C)(implicit
tTag: ClassTag[C],
system: ActorSystem[_]
): Unit
def ??[T](key: T, command: C)(implicit tTag: ClassTag[C], system: ActorSystem[_]): Future[R]
def ?(implicit tTag: ClassTag[C], system: ActorSystem[_]): Unit
```

### 命令
在 CQRS 中,命令在定义系统的写端方面起着关键作用。命令是一条消息,它封装了用户执行特定操作的意图,例如创建新帐户、更新现有记录或删除数据。命令的主要目的是启动系统状态的更改。
命令响应在向发起命令的用户提供反馈方面也起着重要作用。当用户向系统发送命令时,他们期望收到确认命令已成功执行或发生错误的响应。
该框架将 trait **_Command_** 定义为发送给系统中接收者的任何命令的根接口,并将 trait **_CommandResult_** 定义为任何命令响应的根接口。

### 事件
事件为同步写端(由处理命令产生)的更改与读端提供了基础。
如果写端在应用程序状态更改时引发事件,读端应响应该事件并更新其查询和视图所使用的数据。
该框架将 trait **_Event_** 定义为系统中任何事件的根接口。

### 状态
实体 actor 的状态可以分为两部分:内存状态和持久化状态。
内存状态表示 actor 正在处理时的当前状态。当 actor 接收命令并据此生成事件时,此状态会被修改。内存状态可以是任何类型的数据结构,例如 case class 或集合。
持久化状态表示 actor 存储在日记中的状态。此状态包括由 actor 生成的所有事件,可用于在发生故障或重启时重建 actor 的内存状态。持久化状态由 Akka Persistence 日记维护,actor 无法直接访问。
当创建实体 actor 时,它以空的内存状态和持久化状态开始。随着事件被持久化到日记,持久化状态会更新,并通过重放日记中的事件来重建内存状态。
该框架将 trait **_State_** 定义为任何实体 actor 的内存状态的根接口。

### 序列化
Akka Persistence 提供了一种内置的序列化机制,使用 Akka Serialization 库。该库允许您为自定义数据类型定义序列化器,以便在持久化到事件存储时自动进行序列化和反序列化。
当 Akka actor 接收到命令时,它会创建一个或多个领域事件并将其发送到事件日记以进行持久化。在持久化事件之前,会使用配置的序列化机制对其进行序列化。当从事件日记读取事件时,会使用相同的机制将其反序列化回领域事件。
最后,在持久化快照之前,actor 的当前状态(其内存状态)会使用配置的序列化机制进行序列化。
该框架原生支持三种类型的**序列化器**(Serializers):
+ **proto**
+ **jackson-cbor**
+ **chill**
```
akka {
actor {
allow-java-serialization = off
serializers {
proto = "akka.remote.serialization.ProtobufSerializer"
jackson-cbor = "akka.serialization.jackson.JacksonCborSerializer"
chill = "com.twitter.chill.akka.AkkaSerializer"
}
...
}
}
```
默认[配置](core/src/main/resources/softnetwork-persistence.conf)定义了以下序列化绑定:
```
akka {
actor {
...
enable-additional-serialization-bindings = on
serialization-bindings {
"app.softnetwork.persistence.model.package$Timestamped" = proto
"app.softnetwork.persistence.message.package$ProtobufEvent" = proto # protobuf events
"app.softnetwork.persistence.model.package$ProtobufDomainObject" = proto # protobuf domain objects
"app.softnetwork.persistence.model.package$ProtobufStateObject" = proto # protobuf state objects
"app.softnetwork.persistence.message.package$CborEvent" = jackson-cbor # cbor events
"app.softnetwork.persistence.model.package$CborDomainObject" = jackson-cbor # cbor domain objects
"app.softnetwork.persistence.message.package$Command" = chill
"app.softnetwork.persistence.message.package$CommandResult" = chill
"app.softnetwork.persistence.message.package$Event" = chill
"app.softnetwork.persistence.model.package$State" = chill
}
}
}
```

#### 版本控制
您将来可能会发现有必要更改特定事件类型或聚合的定义。
您必须考虑系统将如何处理事件类型和聚合的多个版本。
### 事件溯源

## CQRS 读端
虽然通过重放事件流(或其过去某个时间点的状态)来加载对象的当前状态很容易,但执行诸如“查找我所有总值超过 250 美元的订单”之类的查询是困难或昂贵的。
通过实施 CQRS 模式,此类查询通常将在读端执行,在那里您可以确保构建专门用于回答此类问题的数据投影。
CQRS 的读端负责提供对写端写入的数据的快速高效的读访问。
它可以通过多种方式实现,但一种常见的模式是使用事件溯源。在这种方法中,读端订阅由写端发布的事件流,并处理事件以构建和更新读模型。
这种方法通过框架中的“事件处理器流”来实现。
### 事件处理器流
CQRS 读端的“事件处理器流”实现根据**标签**订阅特定事件,这些事件由写端通过事件溯源发布。通过订阅这些事件,读端可以接收和处理仅与其领域相关的事件。
从流中读取事件时,重要的是要跟踪流中读取上一个事件的位置。此位置通常称为“偏移量”(offset)。偏移量可用于确保流中的每个事件仅被读取一次,即使多次读取该流也是如此。
为了确保每个事件仅被读取一次,“事件处理器流”维护一个**偏移量**,这是一个指向它**成功**处理的最后一个事件的指针。通过存储其偏移量,处理器可以在发生故障或重启时从停止的地方继续处理事件。通过跟踪偏移量,“事件流处理器”_消费者_可以确保流中的每个事件仅被读取一次。如果应用程序崩溃或重启,它可以从最后已知的偏移量恢复读取,确保不会遗漏或重复事件。
以下是“事件流处理器”处理偏移量的方式:
1. **初始化偏移量**:当处理器(重新)启动时,它查询“偏移量提供者”以检索其在事件流中成功处理的最后一个事件所对应的当前偏移量。
2. **读取事件**:处理器查询“日记提供者”以获取具有特定标签的事件,从当前偏移量开始。
3. **处理事件**:处理器处理它从事件流中读取的每个事件。
4. **写入移量**:一旦处理器成功处理了事件,它就会使用后者的偏移量值更新其偏移量。
由于不同的处理器可能会读取同一事件流,因此必须能够唯一区分它们的偏移量。
因此,“事件处理器流”由处理器 ID 和特定标签唯一标识。

总体而言,CQRS 读端的“事件处理器流”实现提供了一种高效且可扩展的方式来处理大量事件,同时确保每个事件对于每个流处理器仅处理一次,并且读模型与写端保持最新和一致。

## 与其他子系统的集成
事件提供了一种与其他子系统通信的有用方式。您的事件存储可以发布事件,以通知其他感兴趣的子系统有关应用程序状态的更改。同样,事件存储提供了它向其他系统发布的所有事件的完整记录。
## 快速开始 - 人员示例
标签:Akka, Akka Persistence, CQRS, DNS解析, ES, Scala, 事件溯源, 分布式系统, 后端开发, 命令查询职责分离, 响应大小分析, 响应式编程, 开源项目, 微服务架构, 数据一致性, 状态管理, 集群分片