diff --git a/akka-docs/src/main/paradox/typed/index-persistence.md b/akka-docs/src/main/paradox/typed/index-persistence.md index 5976589f98..a3da54c018 100644 --- a/akka-docs/src/main/paradox/typed/index-persistence.md +++ b/akka-docs/src/main/paradox/typed/index-persistence.md @@ -20,5 +20,6 @@ project.description: Event Sourcing with Akka Persistence enables actors to pers * [persistence-query-leveldb](../persistence-query-leveldb.md) * [persistence-plugins](../persistence-plugins.md) * [persistence-journals](../persistence-journals.md) +* [active-active-examples](persistence-active-active-examples.md) @@@ diff --git a/akka-docs/src/main/paradox/typed/persistence-active-active-examples.md b/akka-docs/src/main/paradox/typed/persistence-active-active-examples.md new file mode 100644 index 0000000000..782557c848 --- /dev/null +++ b/akka-docs/src/main/paradox/typed/persistence-active-active-examples.md @@ -0,0 +1,106 @@ +# Active-Active Examples + +The following are more realistic examples of building systems with active-active event sourcing. + +## Auction + +In this example we want to show that real-world applications can be implemented by designing events in a way that they +don't conflict. In the end, you will end up with a solution based on a custom CRDT. + +We are building a small auction service. It has the following operations: + + * Place a bid + * Get the highest bid + * Finish the auction + +We model those operations as commands to be sent to the auction actor: + +Scala +: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala) { #commands } + +The events: + +Scala +: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala) { #events } + +The winner does not have to pay the highest bid but only enough to beat the second highest so the `highestCounterOffer` is in the `AuctionFinished` event. + +Let's have a look at the auction entity that will handle incoming commands: + +Scala +: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala) { #command-handler } + +There is nothing specific to active-active about the command handler. It is the same as a command handler for a standard `EventSourcedBehavior`. +For `OfferBid` and `AuctionFinished` we do nothing more than to emit +events corresponding to the command. For `GetHighestBid` we respond with details from the state. Note, that we overwrite the actual +offer of the highest bid here with the amount of the `highestCounterOffer`. This is done to follow the popular auction style where +the actual highest bid is never publicly revealed. + +The auction entity is started with the initial parameters for the auction. +The initial state is taken from a `AuctionSetup` instance. The minimum bid is modelled as +an `initialBid`. + +Scala +: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala) { #setup } + + +The auction moves through the following phases: + +Scala +: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala) { #phase } + +The closing and closed states are to model waiting for all replicas to see the result of the auction before +actually closing the action. + +Let's have a look at our state class, `AuctionState` which also represents the CRDT in our example. + + +Scala +: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala) { #state } + +The state consists of a flag that keeps track of whether the auction is still active, the currently highest bid, +and the highest counter offer so far. + +In the `eventHandler`, we handle persisted events to drive the state change. When a new bid is registered, + + * it needs to be decided whether the new bid is the winning bid or not + * the state needs to be updated accordingly + +The point of CRDTs is that the state must be end up being the same regardless of the order the events have been processed. +We can see how this works in the auction example: we are only interested in the highest bid, so, if we can define an +ordering on all bids, it should suffice to compare the new bid with currently highest to eventually end up with the globally +highest regardless of the order in which the events come in. + +The ordering between bids is crucial, therefore. We need to ensure that it is deterministic and does not depend on local state +outside of our state class so that all replicas come to the same result. We define the ordering as this: + + * A higher bid wins. + * If there's a tie between the two highest bids, the bid that was registered earlier wins. For that we keep track of the + (local) timestamp the bid was registered. + * We need to make sure that no timestamp is used twice in the same replica (missing in this example). + * If there's a tie between the timestamp, we define an arbitrary but deterministic ordering on the replicas, in our case + we just compare the name strings of the replicas. That's why we need to keep the identifier of the replica where a bid was registered + for every `Bid`. + +If the new bid was higher, we keep this one as the new highest and keep the amount of the former highest as the `highestCounterOffer`. +If the new bid was lower, we just update the `highestCounterOffer` if necessary. + +Using those rules, the order of incoming does not matter. Replicas will eventually converge to the same result. + +## Triggering closing + +In the auction we want to ensure that all bids are seen before declaring a winner. That means that an auction can only be closed once +all replicas have seen all bids. + +In the event handler above, when recovery is not running, it calls `eventTriggers`. + +Scala +: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala) { #event-triggers } + +The event trigger uses the `ActiveActiveContext` to decide when to trigger the Finish of the action. +When a replica saves the `AuctionFinished` event it checks whether it should close the auction. +For the close to happen the replica must be the one designated to close and all replicas must have +reported that they have finished. + + + diff --git a/akka-docs/src/main/paradox/typed/persistence-active-active.md b/akka-docs/src/main/paradox/typed/persistence-active-active.md index 4b3573f003..61a4bd88e0 100644 --- a/akka-docs/src/main/paradox/typed/persistence-active-active.md +++ b/akka-docs/src/main/paradox/typed/persistence-active-active.md @@ -173,7 +173,25 @@ Including the full state in each event is often not desired. An event typically ## Side effects -TODO https://github.com/akka/akka/issues/29318 +In most cases it is recommended to do side effects as @ref[described for `EventSourcedBehavior`s](./persistence.md#effects-and-side-effects). + +Side effects from the event handler are generally discouraged because the event handlers are also used during replay and when consuming replicated events and that would +result in undesired re-execution of the side effects. + +Uses cases for doing side effects in the event handler: +* Doing a side effect only in a single replica +* Doing a side effect once all replicas have seen an event +* A side effect for a replicated event +* A side effect when a conflict has occured + +There is no built in support for knowing an event has been replicated to all replicas but it can be modelled in your state. +For some use cases you may need to trigger side effects after consuming replicated events. For example when an auction has been closed in +all data centers and all bids have been replicated. + +The @api[ActiveActiveContext] contains the current replica, the origin replica for the event processes, and if a recovery is running. These can be used to +implement side effects that take place once events are fully replicated. If the side effect should happen only once then a particular replica can be +designated to do it. The @ref[Auction example](./persistence-active-active-examples.md#auction) uses these techniques. + ## How it works diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala index 65dcbf9a14..b0b553bc42 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/AAAuctionExampleSpec.scala @@ -24,30 +24,32 @@ object AAAuctionExampleSpec { case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originDc: ReplicaId) - // commands + //#commands sealed trait AuctionCommand case object Finish extends AuctionCommand // A timer needs to schedule this event at each replica final case class OfferBid(bidder: String, offer: MoneyAmount) extends AuctionCommand final case class GetHighestBid(replyTo: ActorRef[Bid]) extends AuctionCommand final case class IsClosed(replyTo: ActorRef[Boolean]) extends AuctionCommand private final case object Close extends AuctionCommand // Internal, should not be sent from the outside + //#commands + //#events sealed trait AuctionEvent extends CborSerializable final case class BidRegistered(bid: Bid) extends AuctionEvent - final case class AuctionFinished(atDc: ReplicaId) extends AuctionEvent - final case class WinnerDecided(atDc: ReplicaId, winningBid: Bid, highestCounterOffer: MoneyAmount) + final case class AuctionFinished(atReplica: ReplicaId) extends AuctionEvent + final case class WinnerDecided(atReplica: ReplicaId, winningBid: Bid, highestCounterOffer: MoneyAmount) extends AuctionEvent + //#events + //#phase sealed trait AuctionPhase case object Running extends AuctionPhase - final case class Closing(finishedAtDc: Set[ReplicaId]) extends AuctionPhase + final case class Closing(finishedAtReplica: Set[ReplicaId]) extends AuctionPhase case object Closed extends AuctionPhase + //#phase - case class AuctionState( - phase: AuctionPhase, - highestBid: Bid, - highestCounterOffer: MoneyAmount // in ebay style auctions, we need to keep track of current highest counter offer - ) { + //#state + case class AuctionState(phase: AuctionPhase, highestBid: Bid, highestCounterOffer: MoneyAmount) { def applyEvent(event: AuctionEvent): AuctionState = event match { @@ -90,14 +92,18 @@ object AAAuctionExampleSpec { (first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originDc.id .compareTo(second.originDc.id) < 0) } + //#state + //#setup case class AuctionSetup( name: String, initialBid: Bid, // the initial bid is basically the minimum price bidden at start time by the owner closingAt: Instant, responsibleForClosing: Boolean, - allDcs: Set[ReplicaId]) + allReplicas: Set[ReplicaId]) + //#setup + //#command-handler def commandHandler(setup: AuctionSetup, ctx: ActorContext[AuctionCommand], aaContext: ActiveActiveContext)( state: AuctionState, command: AuctionCommand): Effect[AuctionEvent, AuctionState] = { @@ -105,7 +111,7 @@ object AAAuctionExampleSpec { case Closing(_) | Closed => command match { case GetHighestBid(replyTo) => - replyTo ! state.highestBid + replyTo ! state.highestBid.copy(offer = state.highestCounterOffer) // TODO this is not as described Effect.none case IsClosed(replyTo) => replyTo ! (state.phase == Closed) @@ -143,14 +149,15 @@ object AAAuctionExampleSpec { } } } + //#command-handler private def shouldClose(auctionSetup: AuctionSetup, state: AuctionState): Boolean = { auctionSetup.responsibleForClosing && (state.phase match { case Closing(alreadyFinishedAtDc) => - val allDone = auctionSetup.allDcs.diff(alreadyFinishedAtDc).isEmpty + val allDone = auctionSetup.allReplicas.diff(alreadyFinishedAtDc).isEmpty if (!allDone) { println( - s"Not closing auction as not all DCs have reported finished. All DCs: ${auctionSetup.allDcs}. Reported finished ${alreadyFinishedAtDc}") + s"Not closing auction as not all DCs have reported finished. All DCs: ${auctionSetup.allReplicas}. Reported finished ${alreadyFinishedAtDc}") } allDone case _ => @@ -158,6 +165,7 @@ object AAAuctionExampleSpec { }) } + //#event-handler def eventHandler(ctx: ActorContext[AuctionCommand], aaCtx: ActiveActiveContext, setup: AuctionSetup)( state: AuctionState, event: AuctionEvent): AuctionState = { @@ -170,7 +178,9 @@ object AAAuctionExampleSpec { newState } + //#event-handler + //#event-triggers private def eventTriggers( setup: AuctionSetup, ctx: ActorContext[AuctionCommand], @@ -183,7 +193,7 @@ object AAAuctionExampleSpec { case Closing(alreadyFinishedAtDc) => ctx.log.infoN( "AuctionFinished at {}, already finished at [{}]", - finished.atDc, + finished.atReplica, alreadyFinishedAtDc.mkString(", ")) if (alreadyFinishedAtDc(aaCtx.replicaId)) { if (shouldClose(setup, newState)) ctx.self ! Close @@ -197,6 +207,7 @@ object AAAuctionExampleSpec { case _ => // no trigger for this event } } + //#event-triggers def initialState(setup: AuctionSetup) = AuctionState(phase = Running, highestBid = setup.initialBid, highestCounterOffer = setup.initialBid.offer) @@ -204,7 +215,7 @@ object AAAuctionExampleSpec { def behavior(replica: ReplicaId, setup: AuctionSetup): Behavior[AuctionCommand] = Behaviors.setup[AuctionCommand] { ctx => ActiveActiveEventSourcing - .withSharedJournal(setup.name, replica, setup.allDcs, PersistenceTestKitReadJournal.Identifier) { aaCtx => + .withSharedJournal(setup.name, replica, setup.allReplicas, PersistenceTestKitReadJournal.Identifier) { aaCtx => EventSourcedBehavior( aaCtx.persistenceId, initialState(setup),