Docs and example for triggers (#29396)
This commit is contained in:
parent
21713bbc30
commit
f531d1e57d
4 changed files with 152 additions and 16 deletions
|
|
@ -20,5 +20,6 @@ project.description: Event Sourcing with Akka Persistence enables actors to pers
|
||||||
* [persistence-query-leveldb](../persistence-query-leveldb.md)
|
* [persistence-query-leveldb](../persistence-query-leveldb.md)
|
||||||
* [persistence-plugins](../persistence-plugins.md)
|
* [persistence-plugins](../persistence-plugins.md)
|
||||||
* [persistence-journals](../persistence-journals.md)
|
* [persistence-journals](../persistence-journals.md)
|
||||||
|
* [active-active-examples](persistence-active-active-examples.md)
|
||||||
|
|
||||||
@@@
|
@@@
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,106 @@
|
||||||
|
# Active-Active Examples
|
||||||
|
|
||||||
|
The following are more realistic examples of building systems with active-active event sourcing.
|
||||||
|
|
||||||
|
## Auction
|
||||||
|
|
||||||
|
In this example we want to show that real-world applications can be implemented by designing events in a way that they
|
||||||
|
don't conflict. In the end, you will end up with a solution based on a custom CRDT.
|
||||||
|
|
||||||
|
We are building a small auction service. It has the following operations:
|
||||||
|
|
||||||
|
* Place a bid
|
||||||
|
* Get the highest bid
|
||||||
|
* Finish the auction
|
||||||
|
|
||||||
|
We model those operations as commands to be sent to the auction actor:
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala) { #commands }
|
||||||
|
|
||||||
|
The events:
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala) { #events }
|
||||||
|
|
||||||
|
The winner does not have to pay the highest bid but only enough to beat the second highest so the `highestCounterOffer` is in the `AuctionFinished` event.
|
||||||
|
|
||||||
|
Let's have a look at the auction entity that will handle incoming commands:
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala) { #command-handler }
|
||||||
|
|
||||||
|
There is nothing specific to active-active about the command handler. It is the same as a command handler for a standard `EventSourcedBehavior`.
|
||||||
|
For `OfferBid` and `AuctionFinished` we do nothing more than to emit
|
||||||
|
events corresponding to the command. For `GetHighestBid` we respond with details from the state. Note, that we overwrite the actual
|
||||||
|
offer of the highest bid here with the amount of the `highestCounterOffer`. This is done to follow the popular auction style where
|
||||||
|
the actual highest bid is never publicly revealed.
|
||||||
|
|
||||||
|
The auction entity is started with the initial parameters for the auction.
|
||||||
|
The initial state is taken from a `AuctionSetup` instance. The minimum bid is modelled as
|
||||||
|
an `initialBid`.
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala) { #setup }
|
||||||
|
|
||||||
|
|
||||||
|
The auction moves through the following phases:
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala) { #phase }
|
||||||
|
|
||||||
|
The closing and closed states are to model waiting for all replicas to see the result of the auction before
|
||||||
|
actually closing the action.
|
||||||
|
|
||||||
|
Let's have a look at our state class, `AuctionState` which also represents the CRDT in our example.
|
||||||
|
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala) { #state }
|
||||||
|
|
||||||
|
The state consists of a flag that keeps track of whether the auction is still active, the currently highest bid,
|
||||||
|
and the highest counter offer so far.
|
||||||
|
|
||||||
|
In the `eventHandler`, we handle persisted events to drive the state change. When a new bid is registered,
|
||||||
|
|
||||||
|
* it needs to be decided whether the new bid is the winning bid or not
|
||||||
|
* the state needs to be updated accordingly
|
||||||
|
|
||||||
|
The point of CRDTs is that the state must be end up being the same regardless of the order the events have been processed.
|
||||||
|
We can see how this works in the auction example: we are only interested in the highest bid, so, if we can define an
|
||||||
|
ordering on all bids, it should suffice to compare the new bid with currently highest to eventually end up with the globally
|
||||||
|
highest regardless of the order in which the events come in.
|
||||||
|
|
||||||
|
The ordering between bids is crucial, therefore. We need to ensure that it is deterministic and does not depend on local state
|
||||||
|
outside of our state class so that all replicas come to the same result. We define the ordering as this:
|
||||||
|
|
||||||
|
* A higher bid wins.
|
||||||
|
* If there's a tie between the two highest bids, the bid that was registered earlier wins. For that we keep track of the
|
||||||
|
(local) timestamp the bid was registered.
|
||||||
|
* We need to make sure that no timestamp is used twice in the same replica (missing in this example).
|
||||||
|
* If there's a tie between the timestamp, we define an arbitrary but deterministic ordering on the replicas, in our case
|
||||||
|
we just compare the name strings of the replicas. That's why we need to keep the identifier of the replica where a bid was registered
|
||||||
|
for every `Bid`.
|
||||||
|
|
||||||
|
If the new bid was higher, we keep this one as the new highest and keep the amount of the former highest as the `highestCounterOffer`.
|
||||||
|
If the new bid was lower, we just update the `highestCounterOffer` if necessary.
|
||||||
|
|
||||||
|
Using those rules, the order of incoming does not matter. Replicas will eventually converge to the same result.
|
||||||
|
|
||||||
|
## Triggering closing
|
||||||
|
|
||||||
|
In the auction we want to ensure that all bids are seen before declaring a winner. That means that an auction can only be closed once
|
||||||
|
all replicas have seen all bids.
|
||||||
|
|
||||||
|
In the event handler above, when recovery is not running, it calls `eventTriggers`.
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala) { #event-triggers }
|
||||||
|
|
||||||
|
The event trigger uses the `ActiveActiveContext` to decide when to trigger the Finish of the action.
|
||||||
|
When a replica saves the `AuctionFinished` event it checks whether it should close the auction.
|
||||||
|
For the close to happen the replica must be the one designated to close and all replicas must have
|
||||||
|
reported that they have finished.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -173,7 +173,25 @@ Including the full state in each event is often not desired. An event typically
|
||||||
|
|
||||||
## Side effects
|
## Side effects
|
||||||
|
|
||||||
TODO https://github.com/akka/akka/issues/29318
|
In most cases it is recommended to do side effects as @ref[described for `EventSourcedBehavior`s](./persistence.md#effects-and-side-effects).
|
||||||
|
|
||||||
|
Side effects from the event handler are generally discouraged because the event handlers are also used during replay and when consuming replicated events and that would
|
||||||
|
result in undesired re-execution of the side effects.
|
||||||
|
|
||||||
|
Uses cases for doing side effects in the event handler:
|
||||||
|
* Doing a side effect only in a single replica
|
||||||
|
* Doing a side effect once all replicas have seen an event
|
||||||
|
* A side effect for a replicated event
|
||||||
|
* A side effect when a conflict has occured
|
||||||
|
|
||||||
|
There is no built in support for knowing an event has been replicated to all replicas but it can be modelled in your state.
|
||||||
|
For some use cases you may need to trigger side effects after consuming replicated events. For example when an auction has been closed in
|
||||||
|
all data centers and all bids have been replicated.
|
||||||
|
|
||||||
|
The @api[ActiveActiveContext] contains the current replica, the origin replica for the event processes, and if a recovery is running. These can be used to
|
||||||
|
implement side effects that take place once events are fully replicated. If the side effect should happen only once then a particular replica can be
|
||||||
|
designated to do it. The @ref[Auction example](./persistence-active-active-examples.md#auction) uses these techniques.
|
||||||
|
|
||||||
|
|
||||||
## How it works
|
## How it works
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -24,30 +24,32 @@ object AAAuctionExampleSpec {
|
||||||
|
|
||||||
case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originDc: ReplicaId)
|
case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originDc: ReplicaId)
|
||||||
|
|
||||||
// commands
|
//#commands
|
||||||
sealed trait AuctionCommand
|
sealed trait AuctionCommand
|
||||||
case object Finish extends AuctionCommand // A timer needs to schedule this event at each replica
|
case object Finish extends AuctionCommand // A timer needs to schedule this event at each replica
|
||||||
final case class OfferBid(bidder: String, offer: MoneyAmount) extends AuctionCommand
|
final case class OfferBid(bidder: String, offer: MoneyAmount) extends AuctionCommand
|
||||||
final case class GetHighestBid(replyTo: ActorRef[Bid]) extends AuctionCommand
|
final case class GetHighestBid(replyTo: ActorRef[Bid]) extends AuctionCommand
|
||||||
final case class IsClosed(replyTo: ActorRef[Boolean]) extends AuctionCommand
|
final case class IsClosed(replyTo: ActorRef[Boolean]) extends AuctionCommand
|
||||||
private final case object Close extends AuctionCommand // Internal, should not be sent from the outside
|
private final case object Close extends AuctionCommand // Internal, should not be sent from the outside
|
||||||
|
//#commands
|
||||||
|
|
||||||
|
//#events
|
||||||
sealed trait AuctionEvent extends CborSerializable
|
sealed trait AuctionEvent extends CborSerializable
|
||||||
final case class BidRegistered(bid: Bid) extends AuctionEvent
|
final case class BidRegistered(bid: Bid) extends AuctionEvent
|
||||||
final case class AuctionFinished(atDc: ReplicaId) extends AuctionEvent
|
final case class AuctionFinished(atReplica: ReplicaId) extends AuctionEvent
|
||||||
final case class WinnerDecided(atDc: ReplicaId, winningBid: Bid, highestCounterOffer: MoneyAmount)
|
final case class WinnerDecided(atReplica: ReplicaId, winningBid: Bid, highestCounterOffer: MoneyAmount)
|
||||||
extends AuctionEvent
|
extends AuctionEvent
|
||||||
|
//#events
|
||||||
|
|
||||||
|
//#phase
|
||||||
sealed trait AuctionPhase
|
sealed trait AuctionPhase
|
||||||
case object Running extends AuctionPhase
|
case object Running extends AuctionPhase
|
||||||
final case class Closing(finishedAtDc: Set[ReplicaId]) extends AuctionPhase
|
final case class Closing(finishedAtReplica: Set[ReplicaId]) extends AuctionPhase
|
||||||
case object Closed extends AuctionPhase
|
case object Closed extends AuctionPhase
|
||||||
|
//#phase
|
||||||
|
|
||||||
case class AuctionState(
|
//#state
|
||||||
phase: AuctionPhase,
|
case class AuctionState(phase: AuctionPhase, highestBid: Bid, highestCounterOffer: MoneyAmount) {
|
||||||
highestBid: Bid,
|
|
||||||
highestCounterOffer: MoneyAmount // in ebay style auctions, we need to keep track of current highest counter offer
|
|
||||||
) {
|
|
||||||
|
|
||||||
def applyEvent(event: AuctionEvent): AuctionState =
|
def applyEvent(event: AuctionEvent): AuctionState =
|
||||||
event match {
|
event match {
|
||||||
|
|
@ -90,14 +92,18 @@ object AAAuctionExampleSpec {
|
||||||
(first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originDc.id
|
(first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originDc.id
|
||||||
.compareTo(second.originDc.id) < 0)
|
.compareTo(second.originDc.id) < 0)
|
||||||
}
|
}
|
||||||
|
//#state
|
||||||
|
|
||||||
|
//#setup
|
||||||
case class AuctionSetup(
|
case class AuctionSetup(
|
||||||
name: String,
|
name: String,
|
||||||
initialBid: Bid, // the initial bid is basically the minimum price bidden at start time by the owner
|
initialBid: Bid, // the initial bid is basically the minimum price bidden at start time by the owner
|
||||||
closingAt: Instant,
|
closingAt: Instant,
|
||||||
responsibleForClosing: Boolean,
|
responsibleForClosing: Boolean,
|
||||||
allDcs: Set[ReplicaId])
|
allReplicas: Set[ReplicaId])
|
||||||
|
//#setup
|
||||||
|
|
||||||
|
//#command-handler
|
||||||
def commandHandler(setup: AuctionSetup, ctx: ActorContext[AuctionCommand], aaContext: ActiveActiveContext)(
|
def commandHandler(setup: AuctionSetup, ctx: ActorContext[AuctionCommand], aaContext: ActiveActiveContext)(
|
||||||
state: AuctionState,
|
state: AuctionState,
|
||||||
command: AuctionCommand): Effect[AuctionEvent, AuctionState] = {
|
command: AuctionCommand): Effect[AuctionEvent, AuctionState] = {
|
||||||
|
|
@ -105,7 +111,7 @@ object AAAuctionExampleSpec {
|
||||||
case Closing(_) | Closed =>
|
case Closing(_) | Closed =>
|
||||||
command match {
|
command match {
|
||||||
case GetHighestBid(replyTo) =>
|
case GetHighestBid(replyTo) =>
|
||||||
replyTo ! state.highestBid
|
replyTo ! state.highestBid.copy(offer = state.highestCounterOffer) // TODO this is not as described
|
||||||
Effect.none
|
Effect.none
|
||||||
case IsClosed(replyTo) =>
|
case IsClosed(replyTo) =>
|
||||||
replyTo ! (state.phase == Closed)
|
replyTo ! (state.phase == Closed)
|
||||||
|
|
@ -143,14 +149,15 @@ object AAAuctionExampleSpec {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
//#command-handler
|
||||||
|
|
||||||
private def shouldClose(auctionSetup: AuctionSetup, state: AuctionState): Boolean = {
|
private def shouldClose(auctionSetup: AuctionSetup, state: AuctionState): Boolean = {
|
||||||
auctionSetup.responsibleForClosing && (state.phase match {
|
auctionSetup.responsibleForClosing && (state.phase match {
|
||||||
case Closing(alreadyFinishedAtDc) =>
|
case Closing(alreadyFinishedAtDc) =>
|
||||||
val allDone = auctionSetup.allDcs.diff(alreadyFinishedAtDc).isEmpty
|
val allDone = auctionSetup.allReplicas.diff(alreadyFinishedAtDc).isEmpty
|
||||||
if (!allDone) {
|
if (!allDone) {
|
||||||
println(
|
println(
|
||||||
s"Not closing auction as not all DCs have reported finished. All DCs: ${auctionSetup.allDcs}. Reported finished ${alreadyFinishedAtDc}")
|
s"Not closing auction as not all DCs have reported finished. All DCs: ${auctionSetup.allReplicas}. Reported finished ${alreadyFinishedAtDc}")
|
||||||
}
|
}
|
||||||
allDone
|
allDone
|
||||||
case _ =>
|
case _ =>
|
||||||
|
|
@ -158,6 +165,7 @@ object AAAuctionExampleSpec {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//#event-handler
|
||||||
def eventHandler(ctx: ActorContext[AuctionCommand], aaCtx: ActiveActiveContext, setup: AuctionSetup)(
|
def eventHandler(ctx: ActorContext[AuctionCommand], aaCtx: ActiveActiveContext, setup: AuctionSetup)(
|
||||||
state: AuctionState,
|
state: AuctionState,
|
||||||
event: AuctionEvent): AuctionState = {
|
event: AuctionEvent): AuctionState = {
|
||||||
|
|
@ -170,7 +178,9 @@ object AAAuctionExampleSpec {
|
||||||
newState
|
newState
|
||||||
|
|
||||||
}
|
}
|
||||||
|
//#event-handler
|
||||||
|
|
||||||
|
//#event-triggers
|
||||||
private def eventTriggers(
|
private def eventTriggers(
|
||||||
setup: AuctionSetup,
|
setup: AuctionSetup,
|
||||||
ctx: ActorContext[AuctionCommand],
|
ctx: ActorContext[AuctionCommand],
|
||||||
|
|
@ -183,7 +193,7 @@ object AAAuctionExampleSpec {
|
||||||
case Closing(alreadyFinishedAtDc) =>
|
case Closing(alreadyFinishedAtDc) =>
|
||||||
ctx.log.infoN(
|
ctx.log.infoN(
|
||||||
"AuctionFinished at {}, already finished at [{}]",
|
"AuctionFinished at {}, already finished at [{}]",
|
||||||
finished.atDc,
|
finished.atReplica,
|
||||||
alreadyFinishedAtDc.mkString(", "))
|
alreadyFinishedAtDc.mkString(", "))
|
||||||
if (alreadyFinishedAtDc(aaCtx.replicaId)) {
|
if (alreadyFinishedAtDc(aaCtx.replicaId)) {
|
||||||
if (shouldClose(setup, newState)) ctx.self ! Close
|
if (shouldClose(setup, newState)) ctx.self ! Close
|
||||||
|
|
@ -197,6 +207,7 @@ object AAAuctionExampleSpec {
|
||||||
case _ => // no trigger for this event
|
case _ => // no trigger for this event
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
//#event-triggers
|
||||||
|
|
||||||
def initialState(setup: AuctionSetup) =
|
def initialState(setup: AuctionSetup) =
|
||||||
AuctionState(phase = Running, highestBid = setup.initialBid, highestCounterOffer = setup.initialBid.offer)
|
AuctionState(phase = Running, highestBid = setup.initialBid, highestCounterOffer = setup.initialBid.offer)
|
||||||
|
|
@ -204,7 +215,7 @@ object AAAuctionExampleSpec {
|
||||||
def behavior(replica: ReplicaId, setup: AuctionSetup): Behavior[AuctionCommand] = Behaviors.setup[AuctionCommand] {
|
def behavior(replica: ReplicaId, setup: AuctionSetup): Behavior[AuctionCommand] = Behaviors.setup[AuctionCommand] {
|
||||||
ctx =>
|
ctx =>
|
||||||
ActiveActiveEventSourcing
|
ActiveActiveEventSourcing
|
||||||
.withSharedJournal(setup.name, replica, setup.allDcs, PersistenceTestKitReadJournal.Identifier) { aaCtx =>
|
.withSharedJournal(setup.name, replica, setup.allReplicas, PersistenceTestKitReadJournal.Identifier) { aaCtx =>
|
||||||
EventSourcedBehavior(
|
EventSourcedBehavior(
|
||||||
aaCtx.persistenceId,
|
aaCtx.persistenceId,
|
||||||
initialState(setup),
|
initialState(setup),
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue