Polish Auction example

* adjust the style in the Scala example
  * AuctionEntity class instead of Setup class that is passed around
* add timer in recovery completed
This commit is contained in:
Patrik Nordwall 2020-08-11 09:49:02 +02:00 committed by Christopher Batey
parent 5e9e490d88
commit ac469e1a56
8 changed files with 507 additions and 396 deletions

View file

@ -0,0 +1,121 @@
# Auction example
In this example we want to show that real-world applications can be implemented by designing events in a way that they
don't conflict. In the end, you will end up with a solution based on a custom CRDT.
We are building a small auction service. It has the following operations:
* Place a bid
* Get the highest bid
* Finish the auction
We model those operations as commands to be sent to the auction actor:
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #commands }
Java
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #commands }
The events:
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #events }
Java
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #events }
The winner does not have to pay the highest bid but only enough to beat the second highest so the `highestCounterOffer` is in the `AuctionFinished` event.
Let's have a look at the auction entity that will handle incoming commands:
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #command-handler }
Java
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #command-handler }
There is nothing specific to Replicated Event Sourcing about the command handler. It is the same as a command handler for a standard `EventSourcedBehavior`.
For `OfferBid` and `AuctionFinished` we do nothing more than to emit
events corresponding to the command. For `GetHighestBid` we respond with details from the state. Note, that we overwrite the actual
offer of the highest bid here with the amount of the `highestCounterOffer`. This is done to follow the popular auction style where
the actual highest bid is never publicly revealed.
The auction entity is started with the initial parameters for the auction.
The minimum bid is modelled as an `initialBid`.
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #setup }
Java
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #setup }
@@@ div { .group-scala }
The auction moves through the following phases:
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #phase }
@@@
The closing and closed states are to model waiting for all replicas to see the result of the auction before
actually closing the action.
Let's have a look at our state class, `AuctionState` which also represents the CRDT in our example.
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #state }
Java
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #state }
The state consists of a flag that keeps track of whether the auction is still active, the currently highest bid,
and the highest counter offer so far.
In the `eventHandler`, we handle persisted events to drive the state change. When a new bid is registered,
* it needs to be decided whether the new bid is the winning bid or not
* the state needs to be updated accordingly
The point of CRDTs is that the state must be end up being the same regardless of the order the events have been processed.
We can see how this works in the auction example: we are only interested in the highest bid, so, if we can define an
ordering on all bids, it should suffice to compare the new bid with currently highest to eventually end up with the globally
highest regardless of the order in which the events come in.
The ordering between bids is crucial, therefore. We need to ensure that it is deterministic and does not depend on local state
outside of our state class so that all replicas come to the same result. We define the ordering as this:
* A higher bid wins.
* If there's a tie between the two highest bids, the bid that was registered earlier wins. For that we keep track of the
(local) timestamp the bid was registered.
* We need to make sure that no timestamp is used twice in the same replica (missing in this example).
* If there's a tie between the timestamp, we define an arbitrary but deterministic ordering on the replicas, in our case
we just compare the name strings of the replicas. That's why we need to keep the identifier of the replica where a bid was registered
for every `Bid`.
If the new bid was higher, we keep this one as the new highest and keep the amount of the former highest as the `highestCounterOffer`.
If the new bid was lower, we just update the `highestCounterOffer` if necessary.
Using those rules, the order of incoming does not matter. Replicas will eventually converge to the same result.
## Triggering closing
In the auction we want to ensure that all bids are seen before declaring a winner. That means that an auction can only be closed once
all replicas have seen all bids.
In the event handler above, when recovery is not running, it calls `eventTriggers`.
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #event-triggers }
Java
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #event-triggers }
The event trigger uses the `ReplicationContext` to decide when to trigger the Finish of the action.
When a replica saves the `AuctionFinished` event it checks whether it should close the auction.
For the close to happen the replica must be the one designated to close and all replicas must have
reported that they have finished.

View file

@ -2,125 +2,11 @@
The following are more realistic examples of building systems with Replicated Event Sourcing.
## Auction
@@toc { depth=1 }
In this example we want to show that real-world applications can be implemented by designing events in a way that they
don't conflict. In the end, you will end up with a solution based on a custom CRDT.
@@@ index
We are building a small auction service. It has the following operations:
* Place a bid
* Get the highest bid
* Finish the auction
We model those operations as commands to be sent to the auction actor:
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #commands }
Java
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #commands }
The events:
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #events }
Java
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #events }
The winner does not have to pay the highest bid but only enough to beat the second highest so the `highestCounterOffer` is in the `AuctionFinished` event.
Let's have a look at the auction entity that will handle incoming commands:
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #command-handler }
Java
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #command-handler }
There is nothing specific to Replicated Event Sourcing about the command handler. It is the same as a command handler for a standard `EventSourcedBehavior`.
For `OfferBid` and `AuctionFinished` we do nothing more than to emit
events corresponding to the command. For `GetHighestBid` we respond with details from the state. Note, that we overwrite the actual
offer of the highest bid here with the amount of the `highestCounterOffer`. This is done to follow the popular auction style where
the actual highest bid is never publicly revealed.
The auction entity is started with the initial parameters for the auction.
The initial state is taken from a `AuctionSetup` instance. The minimum bid is modelled as
an `initialBid`.
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #setup }
Java
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #setup }
@@@ div { .group-scala }
The auction moves through the following phases:
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #phase }
* [auction](replicated-eventsourcing-auction.md)
@@@
The closing and closed states are to model waiting for all replicas to see the result of the auction before
actually closing the action.
Let's have a look at our state class, `AuctionState` which also represents the CRDT in our example.
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #state }
Java
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #state }
The state consists of a flag that keeps track of whether the auction is still active, the currently highest bid,
and the highest counter offer so far.
In the `eventHandler`, we handle persisted events to drive the state change. When a new bid is registered,
* it needs to be decided whether the new bid is the winning bid or not
* the state needs to be updated accordingly
The point of CRDTs is that the state must be end up being the same regardless of the order the events have been processed.
We can see how this works in the auction example: we are only interested in the highest bid, so, if we can define an
ordering on all bids, it should suffice to compare the new bid with currently highest to eventually end up with the globally
highest regardless of the order in which the events come in.
The ordering between bids is crucial, therefore. We need to ensure that it is deterministic and does not depend on local state
outside of our state class so that all replicas come to the same result. We define the ordering as this:
* A higher bid wins.
* If there's a tie between the two highest bids, the bid that was registered earlier wins. For that we keep track of the
(local) timestamp the bid was registered.
* We need to make sure that no timestamp is used twice in the same replica (missing in this example).
* If there's a tie between the timestamp, we define an arbitrary but deterministic ordering on the replicas, in our case
we just compare the name strings of the replicas. That's why we need to keep the identifier of the replica where a bid was registered
for every `Bid`.
If the new bid was higher, we keep this one as the new highest and keep the amount of the former highest as the `highestCounterOffer`.
If the new bid was lower, we just update the `highestCounterOffer` if necessary.
Using those rules, the order of incoming does not matter. Replicas will eventually converge to the same result.
## Triggering closing
In the auction we want to ensure that all bids are seen before declaring a winner. That means that an auction can only be closed once
all replicas have seen all bids.
In the event handler above, when recovery is not running, it calls `eventTriggers`.
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #event-triggers }
Java
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #event-triggers }
The event trigger uses the `ReplicationContext` to decide when to trigger the Finish of the action.
When a replica saves the `AuctionFinished` event it checks whether it should close the auction.
For the close to happen the replica must be the one designated to close and all replicas must have
reported that they have finished.

View file

@ -199,17 +199,17 @@ all data centers and all bids have been replicated.
The @api[ReplicationContext] contains the current replica, the origin replica for the event processes, and if a recovery is running. These can be used to
implement side effects that take place once events are fully replicated. If the side effect should happen only once then a particular replica can be
designated to do it. The @ref[Auction example](./replicated-eventsourcing-examples.md#auction) uses these techniques.
designated to do it. The @ref[Auction example](replicated-eventsourcing-auction.md) uses these techniques.
## How it works
You dont have to read this section to be able to use the feature, but to use the abstraction efficiently and for the right type of use cases it can be good to understand how its implemented. For example, it should give you the right expectations of the overhead that the solution introduces compared to using just `EventSourcedBehavior`s.
### Causal deliver order
### Causal delivery order
Causal delivery order means that events persisted in one replica are read in the same order in other replicas. The order of concurrent events is undefined, which should be no problem
when using [CRDT's](#conflict-free-replicated-data-types)
when using @ref:[CRDT's](#conflict-free-replicated-data-types)
and otherwise will be detected via the `ReplicationContext` concurrent method.
For example:

View file

@ -80,7 +80,8 @@ public class ReplicatedEventSourcingTest extends JUnitSuite {
public static Behavior<Command> create(
String entityId, ReplicaId replicaId, Set<ReplicaId> allReplicas) {
return ReplicatedEventSourcing.withSharedJournal("ReplicatedEventSourcingTest",
return ReplicatedEventSourcing.withSharedJournal(
"ReplicatedEventSourcingTest",
entityId,
replicaId,
allReplicas,

View file

@ -33,7 +33,12 @@ public class MyReplicatedBehavior
public static Behavior<Command> create(
String entityId, ReplicaId replicaId, String queryPluginId) {
return ReplicatedEventSourcing.withSharedJournal(
"MyReplicatedEntity", entityId, replicaId, ALL_REPLICAS, queryPluginId, MyReplicatedBehavior::new);
"MyReplicatedEntity",
entityId,
replicaId,
ALL_REPLICAS,
queryPluginId,
MyReplicatedBehavior::new);
}
// #factory-shared
@ -44,7 +49,11 @@ public class MyReplicatedBehavior
allReplicasAndQueryPlugins.put(DCB, "journalForDCB");
return ReplicatedEventSourcing.create(
"MyReplicatedEntity", entityId, replicaId, allReplicasAndQueryPlugins, MyReplicatedBehavior::new);
"MyReplicatedEntity",
entityId,
replicaId,
allReplicasAndQueryPlugins,
MyReplicatedBehavior::new);
}
private MyReplicatedBehavior(ReplicationContext replicationContext) {

View file

@ -11,10 +11,18 @@ import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.TimerScheduler;
import akka.persistence.testkit.PersistenceTestKitPlugin;
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal;
import akka.persistence.typed.RecoveryCompleted;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.javadsl.*;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.CommandHandlerBuilder;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.ReplicatedEventSourcedBehavior;
import akka.persistence.typed.javadsl.ReplicatedEventSourcing;
import akka.persistence.typed.javadsl.ReplicationContext;
import akka.persistence.typed.javadsl.SignalHandler;
import akka.serialization.jackson.CborSerializable;
import com.fasterxml.jackson.annotation.JsonCreator;
import org.junit.ClassRule;
@ -22,6 +30,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
@ -29,7 +38,7 @@ import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import static jdocs.akka.persistence.typed.ReplicatedAuctionExample.*;
import static jdocs.akka.persistence.typed.AuctionEntity.*;
import static org.junit.Assert.assertEquals;
public class ReplicatedAuctionExampleTest extends JUnitSuite {
@ -41,22 +50,14 @@ public class ReplicatedAuctionExampleTest extends JUnitSuite {
@Test
public void auctionExample() {
AuctionSetup setupA =
new AuctionSetup(
"old-skis",
new Bid("chbatey", 12, Instant.now(), R1),
Instant.now().plusSeconds(10),
true);
String auctionName = "old-skis";
Bid initialBid = new Bid("chbatey", 12, Instant.now(), R1);
Instant closeAt = Instant.now().plusSeconds(10);
AuctionSetup setupB =
new AuctionSetup(
"old-skis",
new Bid("chbatey", 12, Instant.now(), R1),
Instant.now().plusSeconds(10),
false);
ActorRef<Command> replicaA = testKit.spawn(create(setupA, R1));
ActorRef<Command> replicaB = testKit.spawn(create(setupA, R2));
ActorRef<Command> replicaA =
testKit.spawn(AuctionEntity.create(R1, auctionName, initialBid, closeAt, true));
ActorRef<Command> replicaB =
testKit.spawn(AuctionEntity.create(R2, auctionName, initialBid, closeAt, false));
replicaA.tell(new OfferBid("me", 100));
replicaA.tell(new OfferBid("me", 99));
@ -87,52 +88,24 @@ public class ReplicatedAuctionExampleTest extends JUnitSuite {
}
}
class ReplicatedAuctionExample
extends ReplicatedEventSourcedBehavior<Command, Event, AuctionState> {
// #setup
class AuctionEntity extends ReplicatedEventSourcedBehavior<Command, Event, AuctionState> {
public static ReplicaId R1 = new ReplicaId("R1");
public static ReplicaId R2 = new ReplicaId("R2");
public static Set<ReplicaId> ALL_REPLICAS = new HashSet<>(Arrays.asList(R1, R2));
private final ActorContext<Command> context;
private final AuctionSetup setup;
public static Behavior<Command> create(AuctionSetup setup, ReplicaId replica) {
return Behaviors.setup(
ctx ->
ReplicatedEventSourcing.withSharedJournal("Auction",
setup.name,
replica,
ALL_REPLICAS,
PersistenceTestKitReadJournal.Identifier(),
replicationCtx -> new ReplicatedAuctionExample(replicationCtx, ctx, setup)));
}
public ReplicatedAuctionExample(
ReplicationContext replicationContext, ActorContext<Command> context, AuctionSetup setup) {
super(replicationContext);
this.context = context;
this.setup = setup;
}
private final TimerScheduler<Command> timers;
private final Bid initialBid;
private final Instant closingAt;
private final boolean responsibleForClosing;
// #setup
static class AuctionSetup {
final String name;
final Bid initialBid; // the initial bid is the minimum price bidden at start time by the owner
final Instant closingAt;
final boolean responsibleForClosing;
public AuctionSetup(
String name, Bid initialBid, Instant closingAt, boolean responsibleForClosing) {
this.name = name;
this.initialBid = initialBid;
this.closingAt = closingAt;
this.responsibleForClosing = responsibleForClosing;
}
}
// #setup
public static final class Bid implements CborSerializable {
// #commands
public static final class Bid {
public final String bidder;
public final int offer;
public final Instant timestamp;
@ -146,7 +119,6 @@ class ReplicatedAuctionExample
}
}
// #commands
interface Command extends CborSerializable {}
public enum Finish implements Command {
@ -278,11 +250,72 @@ class ReplicatedAuctionExample
}
// #state
// #setup
public static Behavior<Command> create(
ReplicaId replica,
String name,
Bid initialBid,
Instant closingAt,
boolean responsibleForClosing) {
return Behaviors.setup(
ctx ->
Behaviors.withTimers(
timers ->
ReplicatedEventSourcing.withSharedJournal(
"Auction",
name,
replica,
ALL_REPLICAS,
PersistenceTestKitReadJournal.Identifier(),
replicationCtx ->
new AuctionEntity(
ctx,
replicationCtx,
timers,
initialBid,
closingAt,
responsibleForClosing))));
}
private AuctionEntity(
ActorContext<Command> context,
ReplicationContext replicationContext,
TimerScheduler<Command> timers,
Bid initialBid,
Instant closingAt,
boolean responsibleForClosing) {
super(replicationContext);
this.context = context;
this.timers = timers;
this.initialBid = initialBid;
this.closingAt = closingAt;
this.responsibleForClosing = responsibleForClosing;
}
@Override
public AuctionState emptyState() {
return new AuctionState(true, setup.initialBid, setup.initialBid.offer, Collections.emptySet());
return new AuctionState(true, initialBid, initialBid.offer, Collections.emptySet());
}
@Override
public SignalHandler<AuctionState> signalHandler() {
return newSignalHandlerBuilder()
.onSignal(RecoveryCompleted.instance(), this::onRecoveryCompleted)
.build();
}
private void onRecoveryCompleted(AuctionState state) {
if (shouldClose(state)) {
context.getSelf().tell(Close.INSTANCE);
}
long millisUntilClosing =
closingAt.toEpochMilli() - getReplicationContext().currentTimeMillis();
timers.startSingleTimer(Finish.INSTANCE, Duration.ofMillis(millisUntilClosing));
}
// #setup
// #command-handler
@Override
public CommandHandler<Command, Event, AuctionState> commandHandler() {
@ -356,6 +389,7 @@ class ReplicatedAuctionExample
}
// #command-handler
// #event-handler
@Override
public EventHandler<AuctionState, Event> eventHandler() {
return newEventHandlerBuilder()
@ -382,6 +416,7 @@ class ReplicatedAuctionExample
.onEvent(WinnerDecided.class, (state, event) -> state.close())
.build();
}
// #event-handler
// #event-triggers
private void eventTriggers(AuctionFinished event, AuctionState newState) {
@ -393,14 +428,17 @@ class ReplicatedAuctionExample
context.getSelf().tell(Finish.INSTANCE);
}
}
// #event-triggers
private boolean shouldClose(AuctionState state) {
return setup.responsibleForClosing
return responsibleForClosing
&& !state.isClosed()
&& getReplicationContext().getAllReplicas().stream()
.map(ReplicaId::id)
.collect(Collectors.toSet())
.equals(state.finishedAtDc);
}
// #event-triggers
// #setup
}
// #setup

View file

@ -26,7 +26,8 @@ public final class ReplicatedStringSet
public static Behavior<Command> create(
String entityId, ReplicaId replicaId, Set<ReplicaId> allReplicas) {
return ReplicatedEventSourcing.withSharedJournal("StringSet",
return ReplicatedEventSourcing.withSharedJournal(
"StringSet",
entityId,
replicaId,
allReplicas,

View file

@ -6,231 +6,290 @@ package docs.akka.persistence.typed
import java.time.Instant
import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit }
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors, _ }
import akka.actor.typed.{ ActorRef, Behavior }
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.scaladsl.{ Effect, EventSourcedBehavior, ReplicatedEventSourcing, ReplicationContext }
import akka.serialization.jackson.CborSerializable
import org.scalatest.concurrent.{ Eventually, ScalaFutures }
import scala.concurrent.duration._
import docs.akka.persistence.typed.ReplicatedAuctionExampleSpec.AuctionEntity
import org.scalatest.concurrent.Eventually
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.LoggerOps
import akka.actor.typed.scaladsl.TimerScheduler
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.RecoveryCompleted
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.scaladsl.ReplicatedEventSourcing
import akka.persistence.typed.scaladsl.ReplicationContext
import akka.serialization.jackson.CborSerializable
object ReplicatedAuctionExampleSpec {
type MoneyAmount = Int
//#setup
object AuctionEntity {
case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originReplica: ReplicaId)
//#setup
//#commands
sealed trait AuctionCommand
case object Finish extends AuctionCommand // A timer needs to schedule this event at each replica
final case class OfferBid(bidder: String, offer: MoneyAmount) extends AuctionCommand
final case class GetHighestBid(replyTo: ActorRef[Bid]) extends AuctionCommand
final case class IsClosed(replyTo: ActorRef[Boolean]) extends AuctionCommand
private final case object Close extends AuctionCommand // Internal, should not be sent from the outside
//#commands
//#commands
type MoneyAmount = Int
//#events
sealed trait AuctionEvent extends CborSerializable
final case class BidRegistered(bid: Bid) extends AuctionEvent
final case class AuctionFinished(atReplica: ReplicaId) extends AuctionEvent
final case class WinnerDecided(atReplica: ReplicaId, winningBid: Bid, highestCounterOffer: MoneyAmount)
extends AuctionEvent
//#events
case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originReplica: ReplicaId)
//#phase
sealed trait AuctionPhase
case object Running extends AuctionPhase
final case class Closing(finishedAtReplica: Set[ReplicaId]) extends AuctionPhase
case object Closed extends AuctionPhase
//#phase
sealed trait Command extends CborSerializable
case object Finish extends Command // A timer needs to schedule this event at each replica
final case class OfferBid(bidder: String, offer: MoneyAmount) extends Command
final case class GetHighestBid(replyTo: ActorRef[Bid]) extends Command
final case class IsClosed(replyTo: ActorRef[Boolean]) extends Command
private final case object Close extends Command // Internal, should not be sent from the outside
//#commands
//#state
case class AuctionState(phase: AuctionPhase, highestBid: Bid, highestCounterOffer: MoneyAmount) {
//#events
sealed trait Event extends CborSerializable
final case class BidRegistered(bid: Bid) extends Event
final case class AuctionFinished(atReplica: ReplicaId) extends Event
final case class WinnerDecided(atReplica: ReplicaId, winningBid: Bid, highestCounterOffer: MoneyAmount)
extends Event
//#events
def applyEvent(event: AuctionEvent): AuctionState =
event match {
case BidRegistered(b) =>
if (isHigherBid(b, highestBid))
withNewHighestBid(b)
else
withTooLowBid(b)
case AuctionFinished(atDc) =>
phase match {
case Running =>
copy(phase = Closing(Set(atDc)))
case Closing(alreadyFinishedDcs) =>
copy(phase = Closing(alreadyFinishedDcs + atDc))
case _ =>
this
}
case _: WinnerDecided =>
copy(phase = Closed)
//#phase
/**
* The auction passes through several workflow phases.
* First, in `Running` `OfferBid` commands are accepted.
*
* `AuctionEntity` instances in all DCs schedule a `Finish` command
* at a given time. That persists the `AuctionFinished` event and the
* phase is in `Closing` until the auction is finished in all DCs.
*
* When the auction has been finished no more `OfferBid` commands are accepted.
*
* The auction is also finished immediately if `AuctionFinished` event from another
* DC is seen before the scheduled `Finish` command. In that way the auction is finished
* as quickly as possible in all DCs even though there might be some clock skew.
*
* One DC is responsible for finally deciding the winner and publishing the result.
* All events must be collected from all DC before that can happen.
* When the responsible DC has seen all `AuctionFinished` events from other DCs
* all other events have also been propagated and it can persist `WinnerDecided` and
* the auction is finally `Closed`.
*
*/
sealed trait AuctionPhase
case object Running extends AuctionPhase
final case class Closing(finishedAtReplica: Set[ReplicaId]) extends AuctionPhase
case object Closed extends AuctionPhase
//#phase
//#state
case class AuctionState(phase: AuctionPhase, highestBid: Bid, highestCounterOffer: MoneyAmount)
extends CborSerializable {
def applyEvent(event: Event): AuctionState =
event match {
case BidRegistered(b) =>
if (isHigherBid(b, highestBid))
withNewHighestBid(b)
else
withTooLowBid(b)
case AuctionFinished(atDc) =>
phase match {
case Running =>
copy(phase = Closing(Set(atDc)))
case Closing(alreadyFinishedDcs) =>
copy(phase = Closing(alreadyFinishedDcs + atDc))
case _ =>
this
}
case _: WinnerDecided =>
copy(phase = Closed)
}
def withNewHighestBid(bid: Bid): AuctionState = {
require(phase != Closed)
require(isHigherBid(bid, highestBid))
copy(highestBid = bid, highestCounterOffer = highestBid.offer // keep last highest bid around
)
}
def withNewHighestBid(bid: Bid): AuctionState = {
require(phase != Closed)
require(isHigherBid(bid, highestBid))
copy(highestBid = bid, highestCounterOffer = highestBid.offer // keep last highest bid around
)
def withTooLowBid(bid: Bid): AuctionState = {
require(phase != Closed)
require(isHigherBid(highestBid, bid))
copy(highestCounterOffer = highestCounterOffer.max(bid.offer)) // update highest counter offer
}
def isHigherBid(first: Bid, second: Bid): Boolean =
first.offer > second.offer ||
(first.offer == second.offer && first.timestamp.isBefore(second.timestamp)) || // if equal, first one wins
// If timestamps are equal, choose by dc where the offer was submitted
// In real auctions, this last comparison should be deterministic but unpredictable, so that submitting to a
// particular DC would not be an advantage.
(first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originReplica.id
.compareTo(second.originReplica.id) < 0)
}
def withTooLowBid(bid: Bid): AuctionState = {
require(phase != Closed)
require(isHigherBid(highestBid, bid))
copy(highestCounterOffer = highestCounterOffer.max(bid.offer)) // update highest counter offer
//#state
//#setup
def apply(
replica: ReplicaId,
name: String,
initialBid: AuctionEntity.Bid, // the initial bid is basically the minimum price bidden at start time by the owner
closingAt: Instant,
responsibleForClosing: Boolean,
allReplicas: Set[ReplicaId]): Behavior[Command] = Behaviors.setup[Command] { ctx =>
Behaviors.withTimers { timers =>
ReplicatedEventSourcing
.withSharedJournal("auction", name, replica, allReplicas, PersistenceTestKitReadJournal.Identifier) {
replicationCtx =>
new AuctionEntity(ctx, replicationCtx, timers, closingAt, responsibleForClosing, allReplicas)
.behavior(initialBid)
}
}
}
def isHigherBid(first: Bid, second: Bid): Boolean =
first.offer > second.offer ||
(first.offer == second.offer && first.timestamp.isBefore(second.timestamp)) || // if equal, first one wins
// If timestamps are equal, choose by dc where the offer was submitted
// In real auctions, this last comparison should be deterministic but unpredictable, so that submitting to a
// particular DC would not be an advantage.
(first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originReplica.id
.compareTo(second.originReplica.id) < 0)
}
//#state
//#setup
case class AuctionSetup(
name: String,
initialBid: Bid, // the initial bid is basically the minimum price bidden at start time by the owner
class AuctionEntity(
context: ActorContext[AuctionEntity.Command],
replicationContext: ReplicationContext,
timers: TimerScheduler[AuctionEntity.Command],
closingAt: Instant,
responsibleForClosing: Boolean,
allReplicas: Set[ReplicaId])
//#setup
allReplicas: Set[ReplicaId]) {
import AuctionEntity._
//#command-handler
def commandHandler(setup: AuctionSetup, ctx: ActorContext[AuctionCommand], replicationContext: ReplicationContext)(
state: AuctionState,
command: AuctionCommand): Effect[AuctionEvent, AuctionState] = {
state.phase match {
case Closing(_) | Closed =>
command match {
case GetHighestBid(replyTo) =>
replyTo ! state.highestBid.copy(offer = state.highestCounterOffer) // TODO this is not as described
Effect.none
case IsClosed(replyTo) =>
replyTo ! (state.phase == Closed)
Effect.none
case Finish =>
ctx.log.info("Finish")
Effect.persist(AuctionFinished(replicationContext.replicaId))
case Close =>
ctx.log.info("Close")
require(shouldClose(setup, state))
// TODO send email (before or after persisting)
Effect.persist(WinnerDecided(replicationContext.replicaId, state.highestBid, state.highestCounterOffer))
case _: OfferBid =>
// auction finished, no more bids accepted
Effect.unhandled
}
case Running =>
command match {
case OfferBid(bidder, offer) =>
Effect.persist(
BidRegistered(
Bid(
bidder,
offer,
Instant.ofEpochMilli(replicationContext.currentTimeMillis()),
replicationContext.replicaId)))
case GetHighestBid(replyTo) =>
replyTo ! state.highestBid
Effect.none
case Finish =>
Effect.persist(AuctionFinished(replicationContext.replicaId))
case Close =>
ctx.log.warn("Premature close")
// Close should only be triggered when we have already finished
Effect.unhandled
case IsClosed(replyTo) =>
replyTo ! false
Effect.none
}
}
}
//#command-handler
private def shouldClose(auctionSetup: AuctionSetup, state: AuctionState): Boolean = {
auctionSetup.responsibleForClosing && (state.phase match {
case Closing(alreadyFinishedAtDc) =>
val allDone = auctionSetup.allReplicas.diff(alreadyFinishedAtDc).isEmpty
if (!allDone) {
println(
s"Not closing auction as not all DCs have reported finished. All DCs: ${auctionSetup.allReplicas}. Reported finished ${alreadyFinishedAtDc}")
}
allDone
case _ =>
false
})
}
//#event-handler
def eventHandler(ctx: ActorContext[AuctionCommand], replicationCtx: ReplicationContext, setup: AuctionSetup)(
state: AuctionState,
event: AuctionEvent): AuctionState = {
val newState = state.applyEvent(event)
ctx.log.infoN("Applying event {}. New start {}", event, newState)
if (!replicationCtx.recoveryRunning) {
eventTriggers(setup, ctx, replicationCtx, event, newState)
}
newState
}
//#event-handler
//#event-triggers
private def eventTriggers(
setup: AuctionSetup,
ctx: ActorContext[AuctionCommand],
replicationCtx: ReplicationContext,
event: AuctionEvent,
newState: AuctionState) = {
event match {
case finished: AuctionFinished =>
newState.phase match {
case Closing(alreadyFinishedAtDc) =>
ctx.log.infoN(
"AuctionFinished at {}, already finished at [{}]",
finished.atReplica,
alreadyFinishedAtDc.mkString(", "))
if (alreadyFinishedAtDc(replicationCtx.replicaId)) {
if (shouldClose(setup, newState)) ctx.self ! Close
} else {
ctx.log.info("Sending finish to self")
ctx.self ! Finish
}
case _ => // no trigger for this state
}
case _ => // no trigger for this event
}
}
//#event-triggers
def initialState(setup: AuctionSetup) =
AuctionState(phase = Running, highestBid = setup.initialBid, highestCounterOffer = setup.initialBid.offer)
def behavior(replica: ReplicaId, setup: AuctionSetup): Behavior[AuctionCommand] = Behaviors.setup[AuctionCommand] {
ctx =>
ReplicatedEventSourcing.withSharedJournal(
"auction",
setup.name,
replica,
setup.allReplicas,
PersistenceTestKitReadJournal.Identifier) { replicationCtx =>
EventSourcedBehavior(
replicationCtx.persistenceId,
initialState(setup),
commandHandler(setup, ctx, replicationCtx),
eventHandler(ctx, replicationCtx, setup))
private def behavior(initialBid: AuctionEntity.Bid): EventSourcedBehavior[Command, Event, AuctionState] =
EventSourcedBehavior(
replicationContext.persistenceId,
AuctionState(phase = Running, highestBid = initialBid, highestCounterOffer = initialBid.offer),
commandHandler,
eventHandler).receiveSignal {
case (state, RecoveryCompleted) => recoveryCompleted(state)
}
private def recoveryCompleted(state: AuctionState): Unit = {
if (shouldClose(state))
context.self ! Close
val millisUntilClosing = closingAt.toEpochMilli - replicationContext.currentTimeMillis()
timers.startSingleTimer(Finish, millisUntilClosing.millis)
}
//#setup
//#command-handler
def commandHandler(state: AuctionState, command: Command): Effect[Event, AuctionState] = {
state.phase match {
case Closing(_) | Closed =>
command match {
case GetHighestBid(replyTo) =>
replyTo ! state.highestBid.copy(offer = state.highestCounterOffer) // TODO this is not as described
Effect.none
case IsClosed(replyTo) =>
replyTo ! (state.phase == Closed)
Effect.none
case Finish =>
context.log.info("Finish")
Effect.persist(AuctionFinished(replicationContext.replicaId))
case Close =>
context.log.info("Close")
require(shouldClose(state))
// TODO send email (before or after persisting)
Effect.persist(WinnerDecided(replicationContext.replicaId, state.highestBid, state.highestCounterOffer))
case _: OfferBid =>
// auction finished, no more bids accepted
Effect.unhandled
}
case Running =>
command match {
case OfferBid(bidder, offer) =>
Effect.persist(
BidRegistered(
Bid(
bidder,
offer,
Instant.ofEpochMilli(replicationContext.currentTimeMillis()),
replicationContext.replicaId)))
case GetHighestBid(replyTo) =>
replyTo ! state.highestBid
Effect.none
case Finish =>
Effect.persist(AuctionFinished(replicationContext.replicaId))
case Close =>
context.log.warn("Premature close")
// Close should only be triggered when we have already finished
Effect.unhandled
case IsClosed(replyTo) =>
replyTo ! false
Effect.none
}
}
}
//#command-handler
//#event-handler
def eventHandler(state: AuctionState, event: Event): AuctionState = {
val newState = state.applyEvent(event)
context.log.infoN("Applying event {}. New start {}", event, newState)
if (!replicationContext.recoveryRunning) {
eventTriggers(event, newState)
}
newState
}
//#event-handler
//#event-triggers
private def eventTriggers(event: Event, newState: AuctionState): Unit = {
event match {
case finished: AuctionFinished =>
newState.phase match {
case Closing(alreadyFinishedAtDc) =>
context.log.infoN(
"AuctionFinished at {}, already finished at [{}]",
finished.atReplica,
alreadyFinishedAtDc.mkString(", "))
if (alreadyFinishedAtDc(replicationContext.replicaId)) {
if (shouldClose(newState)) context.self ! Close
} else {
context.log.info("Sending finish to self")
context.self ! Finish
}
case _ => // no trigger for this state
}
case _ => // no trigger for this event
}
}
private def shouldClose(state: AuctionState): Boolean = {
responsibleForClosing && (state.phase match {
case Closing(alreadyFinishedAtDc) =>
val allDone = allReplicas.diff(alreadyFinishedAtDc).isEmpty
if (!allDone) {
context.log.info2(
s"Not closing auction as not all DCs have reported finished. All DCs: {}. Reported finished {}",
allReplicas,
alreadyFinishedAtDc)
}
allDone
case _ =>
false
})
}
//#event-triggers
//#setup
}
//#setup
}
class ReplicatedAuctionExampleSpec
@ -240,24 +299,20 @@ class ReplicatedAuctionExampleSpec
with LogCapturing
with ScalaFutures
with Eventually {
import ReplicatedAuctionExampleSpec._
import ReplicatedAuctionExampleSpec.AuctionEntity._
"Auction example" should {
"work" in {
val Replicas = Set(ReplicaId("DC-A"), ReplicaId("DC-B"))
val setupA =
AuctionSetup(
"old-skis",
Bid("chbatey", 12, Instant.now(), ReplicaId("DC-A")),
Instant.now().plusSeconds(10),
responsibleForClosing = true,
Replicas)
val auctionName = "old-skis"
val initialBid = Bid("chbatey", 12, Instant.now(), ReplicaId("DC-A"))
val closingAt = Instant.now().plusSeconds(10)
val setupB = setupA.copy(responsibleForClosing = false)
val dcAReplica: ActorRef[AuctionCommand] = spawn(behavior(ReplicaId("DC-A"), setupA))
val dcBReplica: ActorRef[AuctionCommand] = spawn(behavior(ReplicaId("DC-B"), setupB))
val dcAReplica: ActorRef[Command] = spawn(
AuctionEntity(ReplicaId("DC-A"), auctionName, initialBid, closingAt, responsibleForClosing = true, Replicas))
val dcBReplica: ActorRef[Command] = spawn(
AuctionEntity(ReplicaId("DC-B"), auctionName, initialBid, closingAt, responsibleForClosing = false, Replicas))
dcAReplica ! OfferBid("me", 100)
dcAReplica ! OfferBid("me", 99)