diff --git a/akka-docs/src/main/paradox/typed/replicated-eventsourcing-examples.md b/akka-docs/src/main/paradox/typed/replicated-eventsourcing-examples.md index d7b49e92eb..e74119b2f2 100644 --- a/akka-docs/src/main/paradox/typed/replicated-eventsourcing-examples.md +++ b/akka-docs/src/main/paradox/typed/replicated-eventsourcing-examples.md @@ -18,11 +18,17 @@ We model those operations as commands to be sent to the auction actor: Scala : @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #commands } +Java +: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #commands } + The events: Scala : @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #events } +Java +: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #events } + The winner does not have to pay the highest bid but only enough to beat the second highest so the `highestCounterOffer` is in the `AuctionFinished` event. Let's have a look at the auction entity that will handle incoming commands: @@ -30,6 +36,9 @@ Let's have a look at the auction entity that will handle incoming commands: Scala : @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #command-handler } +Java +: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #command-handler } + There is nothing specific to Replicated Event Sourcing about the command handler. It is the same as a command handler for a standard `EventSourcedBehavior`. For `OfferBid` and `AuctionFinished` we do nothing more than to emit events corresponding to the command. For `GetHighestBid` we respond with details from the state. Note, that we overwrite the actual @@ -43,21 +52,29 @@ an `initialBid`. Scala : @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #setup } +Java +: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #setup } + +@@@ div { .group-scala } The auction moves through the following phases: Scala : @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #phase } +@@@ + The closing and closed states are to model waiting for all replicas to see the result of the auction before actually closing the action. Let's have a look at our state class, `AuctionState` which also represents the CRDT in our example. - Scala : @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #state } +Java +: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #state } + The state consists of a flag that keeps track of whether the auction is still active, the currently highest bid, and the highest counter offer so far. @@ -97,6 +114,9 @@ In the event handler above, when recovery is not running, it calls `eventTrigger Scala : @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #event-triggers } +Java +: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #event-triggers } + The event trigger uses the `ReplicationContext` to decide when to trigger the Finish of the action. When a replica saves the `AuctionFinished` event it checks whether it should close the auction. For the close to happen the replica must be the one designated to close and all replicas must have diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java new file mode 100644 index 0000000000..70d38cf566 --- /dev/null +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java @@ -0,0 +1,400 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package jdocs.akka.persistence.typed; + +import akka.actor.testkit.typed.javadsl.LogCapturing; +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; +import akka.actor.testkit.typed.javadsl.TestProbe; +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.persistence.testkit.PersistenceTestKitPlugin; +import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal; +import akka.persistence.typed.ReplicaId; +import akka.persistence.typed.javadsl.*; +import akka.serialization.jackson.CborSerializable; +import com.fasterxml.jackson.annotation.JsonCreator; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.scalatestplus.junit.JUnitSuite; + +import java.time.Instant; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + +import static jdocs.akka.persistence.typed.ReplicatedAuctionExample.*; +import static org.junit.Assert.assertEquals; + +public class ReplicatedAuctionExampleTest extends JUnitSuite { + @ClassRule + public static final TestKitJunitResource testKit = + new TestKitJunitResource(PersistenceTestKitPlugin.getInstance().config()); + + @Rule public final LogCapturing logCapturing = new LogCapturing(); + + @Test + public void auctionExample() { + AuctionSetup setupA = + new AuctionSetup( + "old-skis", + new Bid("chbatey", 12, Instant.now(), R1), + Instant.now().plusSeconds(10), + true); + + AuctionSetup setupB = + new AuctionSetup( + "old-skis", + new Bid("chbatey", 12, Instant.now(), R1), + Instant.now().plusSeconds(10), + false); + + ActorRef replicaA = testKit.spawn(create(setupA, R1)); + ActorRef replicaB = testKit.spawn(create(setupA, R2)); + + replicaA.tell(new OfferBid("me", 100)); + replicaA.tell(new OfferBid("me", 99)); + replicaA.tell(new OfferBid("me", 202)); + + TestProbe replyProbe = testKit.createTestProbe(); + replyProbe.awaitAssert( + () -> { + replicaA.tell(new GetHighestBid(replyProbe.ref())); + Bid bid = replyProbe.expectMessageClass(Bid.class); + assertEquals(bid.offer, 202); + return bid; + }); + + replicaA.tell(Finish.INSTANCE); + + TestProbe finishProbe = testKit.createTestProbe(); + finishProbe.awaitAssert( + () -> { + replicaA.tell(new IsClosed(finishProbe.ref())); + return finishProbe.expectMessage(true); + }); + finishProbe.awaitAssert( + () -> { + replicaB.tell(new IsClosed(finishProbe.ref())); + return finishProbe.expectMessage(true); + }); + } +} + +class ReplicatedAuctionExample + extends ReplicatedEventSourcedBehavior { + + public static ReplicaId R1 = new ReplicaId("R1"); + public static ReplicaId R2 = new ReplicaId("R2"); + + public static Set ALL_REPLICAS = new HashSet<>(Arrays.asList(R1, R2)); + private final ActorContext context; + private final AuctionSetup setup; + + public static Behavior create(AuctionSetup setup, ReplicaId replica) { + return Behaviors.setup( + ctx -> + ReplicatedEventSourcing.withSharedJournal( + setup.name, + replica, + ALL_REPLICAS, + PersistenceTestKitReadJournal.Identifier(), + replicationCtx -> new ReplicatedAuctionExample(replicationCtx, ctx, setup))); + } + + public ReplicatedAuctionExample( + ReplicationContext replicationContext, ActorContext context, AuctionSetup setup) { + super(replicationContext); + this.context = context; + this.setup = setup; + } + + //#setup + static class AuctionSetup { + final String name; + final Bid initialBid; // the initial bid is the minimum price bidden at start time by the owner + final Instant closingAt; + final boolean responsibleForClosing; + + public AuctionSetup( + String name, Bid initialBid, Instant closingAt, boolean responsibleForClosing) { + this.name = name; + this.initialBid = initialBid; + this.closingAt = closingAt; + this.responsibleForClosing = responsibleForClosing; + } + } + //#setup + + public static final class Bid implements CborSerializable { + public final String bidder; + public final int offer; + public final Instant timestamp; + public final ReplicaId originReplica; + + public Bid(String bidder, int offer, Instant timestamp, ReplicaId originReplica) { + this.bidder = bidder; + this.offer = offer; + this.timestamp = timestamp; + this.originReplica = originReplica; + } + } + + //#commands + interface Command extends CborSerializable {} + public enum Finish implements Command { + INSTANCE + } + public static final class OfferBid implements Command { + public final String bidder; + public final int offer; + + public OfferBid(String bidder, int offer) { + this.bidder = bidder; + this.offer = offer; + } + } + public static final class GetHighestBid implements Command { + public final ActorRef replyTo; + + public GetHighestBid(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + public static final class IsClosed implements Command { + public final ActorRef replyTo; + public IsClosed(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + private enum Close implements Command { + INSTANCE + } + //#commands + + //#events + interface Event extends CborSerializable {} + + public static final class BidRegistered implements Event { + public final Bid bid; + + @JsonCreator + public BidRegistered(Bid bid) { + this.bid = bid; + } + } + + public static final class AuctionFinished implements Event { + public final ReplicaId atReplica; + + @JsonCreator + public AuctionFinished(ReplicaId atReplica) { + this.atReplica = atReplica; + } + } + + public static final class WinnerDecided implements Event { + public final ReplicaId atReplica; + public final Bid winningBid; + public final int amount; + + public WinnerDecided(ReplicaId atReplica, Bid winningBid, int amount) { + this.atReplica = atReplica; + this.winningBid = winningBid; + this.amount = amount; + } + } + //#events + + //#state + static class AuctionState implements CborSerializable { + + final boolean stillRunning; + final Bid highestBid; + final int highestCounterOffer; + final Set finishedAtDc; + + AuctionState( + boolean stillRunning, Bid highestBid, int highestCounterOffer, Set finishedAtDc) { + this.stillRunning = stillRunning; + this.highestBid = highestBid; + this.highestCounterOffer = highestCounterOffer; + this.finishedAtDc = finishedAtDc; + } + + AuctionState withNewHighestBid(Bid bid) { + assert (stillRunning); + assert (isHigherBid(bid, highestBid)); + return new AuctionState( + stillRunning, bid, highestBid.offer, finishedAtDc); // keep last highest bid around + } + + AuctionState withTooLowBid(Bid bid) { + assert (stillRunning); + assert (isHigherBid(highestBid, bid)); + return new AuctionState( + stillRunning, highestBid, Math.max(highestCounterOffer, bid.offer), finishedAtDc); + } + + static Boolean isHigherBid(Bid first, Bid second) { + return first.offer > second.offer + || (first.offer == second.offer && first.timestamp.isBefore(second.timestamp)) + || // if equal, first one wins + // If timestamps are equal, choose by dc where the offer was submitted + // In real auctions, this last comparison should be deterministic but unpredictable, so + // that submitting to a + // particular DC would not be an advantage. + (first.offer == second.offer + && first.timestamp.equals(second.timestamp) + && first.originReplica.id().compareTo(second.originReplica.id()) < 0); + } + + AuctionState addFinishedAtReplica(String replica) { + Set s = new HashSet<>(finishedAtDc); + s.add(replica); + return new AuctionState( + false, highestBid, highestCounterOffer, Collections.unmodifiableSet(s)); + } + + public AuctionState close() { + return new AuctionState(false, highestBid, highestCounterOffer, Collections.emptySet()); + } + + public boolean isClosed() { + return !stillRunning && finishedAtDc.isEmpty(); + } + } + //#state + + @Override + public AuctionState emptyState() { + return new AuctionState(true, setup.initialBid, setup.initialBid.offer, Collections.emptySet()); + } + + //#command-handler + @Override + public CommandHandler commandHandler() { + + CommandHandlerBuilder builder = newCommandHandlerBuilder(); + + // running + builder + .forState(state -> state.stillRunning) + .onCommand( + OfferBid.class, + (state, bid) -> + Effect() + .persist( + new BidRegistered( + new Bid( + bid.bidder, + bid.offer, + Instant.ofEpochMilli( + this.getReplicationContext().currentTimeMillis()), + this.getReplicationContext().replicaId())))) + .onCommand( + GetHighestBid.class, + (state, get) -> { + get.replyTo.tell(state.highestBid); + return Effect().none(); + }) + .onCommand( + Finish.class, + (state, finish) -> + Effect().persist(new AuctionFinished(getReplicationContext().replicaId()))) + .onCommand(Close.class, (state, close) -> Effect().unhandled()) + .onCommand( + IsClosed.class, + (state, get) -> { + get.replyTo.tell(false); + return Effect().none(); + }); + + // finished + builder + .forAnyState() + .onCommand(OfferBid.class, (state, bid) -> Effect().unhandled()) + .onCommand( + GetHighestBid.class, + (state, get) -> { + get.replyTo.tell(state.highestBid); + return Effect().none(); + }) + .onCommand( + Finish.class, + (state, finish) -> + Effect().persist(new AuctionFinished(getReplicationContext().replicaId()))) + .onCommand( + Close.class, + (state, close) -> + Effect() + .persist( + new WinnerDecided( + getReplicationContext().replicaId(), + state.highestBid, + state.highestCounterOffer))) + .onCommand( + IsClosed.class, + (state, get) -> { + get.replyTo.tell(state.isClosed()); + return Effect().none(); + }); + + return builder.build(); + } + //#command-handler + + @Override + public EventHandler eventHandler() { + return newEventHandlerBuilder() + .forAnyState() + .onEvent( + BidRegistered.class, + (state, event) -> { + if (AuctionState.isHigherBid(event.bid, state.highestBid)) { + return state.withNewHighestBid(event.bid); + } else { + return state.withTooLowBid(event.bid); + } + }) + .onEvent( + AuctionFinished.class, + (state, event) -> { + AuctionState newState = state.addFinishedAtReplica(event.atReplica.id()); + if (state.isClosed()) return state; // already closed + else if (!getReplicationContext().recoveryRunning()) { + eventTriggers(event, newState); + } + return newState; + }) + .onEvent(WinnerDecided.class, (state, event) -> state.close()) + .build(); + } + + //#event-triggers + private void eventTriggers(AuctionFinished event, AuctionState newState) { + if (newState.finishedAtDc.contains(getReplicationContext().replicaId().id())) { + if (shouldClose(newState)) { + context.getSelf().tell(Close.INSTANCE); + } + } else { + context.getSelf().tell(Finish.INSTANCE); + } + } + //#event-triggers + + private boolean shouldClose(AuctionState state) { + return setup.responsibleForClosing + && !state.isClosed() + && getReplicationContext().getAllReplicas().stream() + .map(ReplicaId::id) + .collect(Collectors.toSet()) + .equals(state.finishedAtDc); + } +} diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala index e0c4d408e2..f5aaa3f9b2 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala @@ -22,7 +22,7 @@ object ReplicatedAuctionExampleSpec { type MoneyAmount = Int - case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originDc: ReplicaId) + case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originReplica: ReplicaId) //#commands sealed trait AuctionCommand @@ -89,8 +89,8 @@ object ReplicatedAuctionExampleSpec { // If timestamps are equal, choose by dc where the offer was submitted // In real auctions, this last comparison should be deterministic but unpredictable, so that submitting to a // particular DC would not be an advantage. - (first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originDc.id - .compareTo(second.originDc.id) < 0) + (first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originReplica.id + .compareTo(second.originReplica.id) < 0) } //#state diff --git a/build.sbt b/build.sbt index 87c6ce871a..0794926692 100644 --- a/build.sbt +++ b/build.sbt @@ -318,6 +318,7 @@ lazy val persistenceTypedTests = akkaModule("akka-persistence-typed-tests") .dependsOn(persistenceTyped, persistenceTestkit % "test", actorTestkitTyped % "test", jackson % "test->test") .settings(AkkaBuild.mayChangeSettings) .settings(Dependencies.persistenceTypedTests) + .settings(javacOptions += "-parameters") // for Jackson .disablePlugins(MimaPlugin) .enablePlugins(NoPublish)