Java auction example (#29443)

This commit is contained in:
Christopher Batey 2020-07-30 17:03:46 +01:00
parent 742352caba
commit c945fbd7a1
4 changed files with 425 additions and 4 deletions

View file

@ -18,11 +18,17 @@ We model those operations as commands to be sent to the auction actor:
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #commands }
Java
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #commands }
The events:
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #events }
Java
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #events }
The winner does not have to pay the highest bid but only enough to beat the second highest so the `highestCounterOffer` is in the `AuctionFinished` event.
Let's have a look at the auction entity that will handle incoming commands:
@ -30,6 +36,9 @@ Let's have a look at the auction entity that will handle incoming commands:
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #command-handler }
Java
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #command-handler }
There is nothing specific to Replicated Event Sourcing about the command handler. It is the same as a command handler for a standard `EventSourcedBehavior`.
For `OfferBid` and `AuctionFinished` we do nothing more than to emit
events corresponding to the command. For `GetHighestBid` we respond with details from the state. Note, that we overwrite the actual
@ -43,21 +52,29 @@ an `initialBid`.
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #setup }
Java
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #setup }
@@@ div { .group-scala }
The auction moves through the following phases:
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #phase }
@@@
The closing and closed states are to model waiting for all replicas to see the result of the auction before
actually closing the action.
Let's have a look at our state class, `AuctionState` which also represents the CRDT in our example.
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #state }
Java
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #state }
The state consists of a flag that keeps track of whether the auction is still active, the currently highest bid,
and the highest counter offer so far.
@ -97,6 +114,9 @@ In the event handler above, when recovery is not running, it calls `eventTrigger
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala) { #event-triggers }
Java
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedAuctionExampleTest.java) { #event-triggers }
The event trigger uses the `ReplicationContext` to decide when to trigger the Finish of the action.
When a replica saves the `AuctionFinished` event it checks whether it should close the auction.
For the close to happen the replica must be the one designated to close and all replicas must have

View file

@ -0,0 +1,400 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.persistence.typed;
import akka.actor.testkit.typed.javadsl.LogCapturing;
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
import akka.actor.testkit.typed.javadsl.TestProbe;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.persistence.testkit.PersistenceTestKitPlugin;
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.javadsl.*;
import akka.serialization.jackson.CborSerializable;
import com.fasterxml.jackson.annotation.JsonCreator;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.scalatestplus.junit.JUnitSuite;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import static jdocs.akka.persistence.typed.ReplicatedAuctionExample.*;
import static org.junit.Assert.assertEquals;
public class ReplicatedAuctionExampleTest extends JUnitSuite {
@ClassRule
public static final TestKitJunitResource testKit =
new TestKitJunitResource(PersistenceTestKitPlugin.getInstance().config());
@Rule public final LogCapturing logCapturing = new LogCapturing();
@Test
public void auctionExample() {
AuctionSetup setupA =
new AuctionSetup(
"old-skis",
new Bid("chbatey", 12, Instant.now(), R1),
Instant.now().plusSeconds(10),
true);
AuctionSetup setupB =
new AuctionSetup(
"old-skis",
new Bid("chbatey", 12, Instant.now(), R1),
Instant.now().plusSeconds(10),
false);
ActorRef<Command> replicaA = testKit.spawn(create(setupA, R1));
ActorRef<Command> replicaB = testKit.spawn(create(setupA, R2));
replicaA.tell(new OfferBid("me", 100));
replicaA.tell(new OfferBid("me", 99));
replicaA.tell(new OfferBid("me", 202));
TestProbe<Bid> replyProbe = testKit.createTestProbe();
replyProbe.awaitAssert(
() -> {
replicaA.tell(new GetHighestBid(replyProbe.ref()));
Bid bid = replyProbe.expectMessageClass(Bid.class);
assertEquals(bid.offer, 202);
return bid;
});
replicaA.tell(Finish.INSTANCE);
TestProbe<Boolean> finishProbe = testKit.createTestProbe();
finishProbe.awaitAssert(
() -> {
replicaA.tell(new IsClosed(finishProbe.ref()));
return finishProbe.expectMessage(true);
});
finishProbe.awaitAssert(
() -> {
replicaB.tell(new IsClosed(finishProbe.ref()));
return finishProbe.expectMessage(true);
});
}
}
class ReplicatedAuctionExample
extends ReplicatedEventSourcedBehavior<Command, Event, AuctionState> {
public static ReplicaId R1 = new ReplicaId("R1");
public static ReplicaId R2 = new ReplicaId("R2");
public static Set<ReplicaId> ALL_REPLICAS = new HashSet<>(Arrays.asList(R1, R2));
private final ActorContext<Command> context;
private final AuctionSetup setup;
public static Behavior<Command> create(AuctionSetup setup, ReplicaId replica) {
return Behaviors.setup(
ctx ->
ReplicatedEventSourcing.withSharedJournal(
setup.name,
replica,
ALL_REPLICAS,
PersistenceTestKitReadJournal.Identifier(),
replicationCtx -> new ReplicatedAuctionExample(replicationCtx, ctx, setup)));
}
public ReplicatedAuctionExample(
ReplicationContext replicationContext, ActorContext<Command> context, AuctionSetup setup) {
super(replicationContext);
this.context = context;
this.setup = setup;
}
//#setup
static class AuctionSetup {
final String name;
final Bid initialBid; // the initial bid is the minimum price bidden at start time by the owner
final Instant closingAt;
final boolean responsibleForClosing;
public AuctionSetup(
String name, Bid initialBid, Instant closingAt, boolean responsibleForClosing) {
this.name = name;
this.initialBid = initialBid;
this.closingAt = closingAt;
this.responsibleForClosing = responsibleForClosing;
}
}
//#setup
public static final class Bid implements CborSerializable {
public final String bidder;
public final int offer;
public final Instant timestamp;
public final ReplicaId originReplica;
public Bid(String bidder, int offer, Instant timestamp, ReplicaId originReplica) {
this.bidder = bidder;
this.offer = offer;
this.timestamp = timestamp;
this.originReplica = originReplica;
}
}
//#commands
interface Command extends CborSerializable {}
public enum Finish implements Command {
INSTANCE
}
public static final class OfferBid implements Command {
public final String bidder;
public final int offer;
public OfferBid(String bidder, int offer) {
this.bidder = bidder;
this.offer = offer;
}
}
public static final class GetHighestBid implements Command {
public final ActorRef<Bid> replyTo;
public GetHighestBid(ActorRef<Bid> replyTo) {
this.replyTo = replyTo;
}
}
public static final class IsClosed implements Command {
public final ActorRef<Boolean> replyTo;
public IsClosed(ActorRef<Boolean> replyTo) {
this.replyTo = replyTo;
}
}
private enum Close implements Command {
INSTANCE
}
//#commands
//#events
interface Event extends CborSerializable {}
public static final class BidRegistered implements Event {
public final Bid bid;
@JsonCreator
public BidRegistered(Bid bid) {
this.bid = bid;
}
}
public static final class AuctionFinished implements Event {
public final ReplicaId atReplica;
@JsonCreator
public AuctionFinished(ReplicaId atReplica) {
this.atReplica = atReplica;
}
}
public static final class WinnerDecided implements Event {
public final ReplicaId atReplica;
public final Bid winningBid;
public final int amount;
public WinnerDecided(ReplicaId atReplica, Bid winningBid, int amount) {
this.atReplica = atReplica;
this.winningBid = winningBid;
this.amount = amount;
}
}
//#events
//#state
static class AuctionState implements CborSerializable {
final boolean stillRunning;
final Bid highestBid;
final int highestCounterOffer;
final Set<String> finishedAtDc;
AuctionState(
boolean stillRunning, Bid highestBid, int highestCounterOffer, Set<String> finishedAtDc) {
this.stillRunning = stillRunning;
this.highestBid = highestBid;
this.highestCounterOffer = highestCounterOffer;
this.finishedAtDc = finishedAtDc;
}
AuctionState withNewHighestBid(Bid bid) {
assert (stillRunning);
assert (isHigherBid(bid, highestBid));
return new AuctionState(
stillRunning, bid, highestBid.offer, finishedAtDc); // keep last highest bid around
}
AuctionState withTooLowBid(Bid bid) {
assert (stillRunning);
assert (isHigherBid(highestBid, bid));
return new AuctionState(
stillRunning, highestBid, Math.max(highestCounterOffer, bid.offer), finishedAtDc);
}
static Boolean isHigherBid(Bid first, Bid second) {
return first.offer > second.offer
|| (first.offer == second.offer && first.timestamp.isBefore(second.timestamp))
|| // if equal, first one wins
// If timestamps are equal, choose by dc where the offer was submitted
// In real auctions, this last comparison should be deterministic but unpredictable, so
// that submitting to a
// particular DC would not be an advantage.
(first.offer == second.offer
&& first.timestamp.equals(second.timestamp)
&& first.originReplica.id().compareTo(second.originReplica.id()) < 0);
}
AuctionState addFinishedAtReplica(String replica) {
Set<String> s = new HashSet<>(finishedAtDc);
s.add(replica);
return new AuctionState(
false, highestBid, highestCounterOffer, Collections.unmodifiableSet(s));
}
public AuctionState close() {
return new AuctionState(false, highestBid, highestCounterOffer, Collections.emptySet());
}
public boolean isClosed() {
return !stillRunning && finishedAtDc.isEmpty();
}
}
//#state
@Override
public AuctionState emptyState() {
return new AuctionState(true, setup.initialBid, setup.initialBid.offer, Collections.emptySet());
}
//#command-handler
@Override
public CommandHandler<Command, Event, AuctionState> commandHandler() {
CommandHandlerBuilder<Command, Event, AuctionState> builder = newCommandHandlerBuilder();
// running
builder
.forState(state -> state.stillRunning)
.onCommand(
OfferBid.class,
(state, bid) ->
Effect()
.persist(
new BidRegistered(
new Bid(
bid.bidder,
bid.offer,
Instant.ofEpochMilli(
this.getReplicationContext().currentTimeMillis()),
this.getReplicationContext().replicaId()))))
.onCommand(
GetHighestBid.class,
(state, get) -> {
get.replyTo.tell(state.highestBid);
return Effect().none();
})
.onCommand(
Finish.class,
(state, finish) ->
Effect().persist(new AuctionFinished(getReplicationContext().replicaId())))
.onCommand(Close.class, (state, close) -> Effect().unhandled())
.onCommand(
IsClosed.class,
(state, get) -> {
get.replyTo.tell(false);
return Effect().none();
});
// finished
builder
.forAnyState()
.onCommand(OfferBid.class, (state, bid) -> Effect().unhandled())
.onCommand(
GetHighestBid.class,
(state, get) -> {
get.replyTo.tell(state.highestBid);
return Effect().none();
})
.onCommand(
Finish.class,
(state, finish) ->
Effect().persist(new AuctionFinished(getReplicationContext().replicaId())))
.onCommand(
Close.class,
(state, close) ->
Effect()
.persist(
new WinnerDecided(
getReplicationContext().replicaId(),
state.highestBid,
state.highestCounterOffer)))
.onCommand(
IsClosed.class,
(state, get) -> {
get.replyTo.tell(state.isClosed());
return Effect().none();
});
return builder.build();
}
//#command-handler
@Override
public EventHandler<AuctionState, Event> eventHandler() {
return newEventHandlerBuilder()
.forAnyState()
.onEvent(
BidRegistered.class,
(state, event) -> {
if (AuctionState.isHigherBid(event.bid, state.highestBid)) {
return state.withNewHighestBid(event.bid);
} else {
return state.withTooLowBid(event.bid);
}
})
.onEvent(
AuctionFinished.class,
(state, event) -> {
AuctionState newState = state.addFinishedAtReplica(event.atReplica.id());
if (state.isClosed()) return state; // already closed
else if (!getReplicationContext().recoveryRunning()) {
eventTriggers(event, newState);
}
return newState;
})
.onEvent(WinnerDecided.class, (state, event) -> state.close())
.build();
}
//#event-triggers
private void eventTriggers(AuctionFinished event, AuctionState newState) {
if (newState.finishedAtDc.contains(getReplicationContext().replicaId().id())) {
if (shouldClose(newState)) {
context.getSelf().tell(Close.INSTANCE);
}
} else {
context.getSelf().tell(Finish.INSTANCE);
}
}
//#event-triggers
private boolean shouldClose(AuctionState state) {
return setup.responsibleForClosing
&& !state.isClosed()
&& getReplicationContext().getAllReplicas().stream()
.map(ReplicaId::id)
.collect(Collectors.toSet())
.equals(state.finishedAtDc);
}
}

View file

@ -22,7 +22,7 @@ object ReplicatedAuctionExampleSpec {
type MoneyAmount = Int
case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originDc: ReplicaId)
case class Bid(bidder: String, offer: MoneyAmount, timestamp: Instant, originReplica: ReplicaId)
//#commands
sealed trait AuctionCommand
@ -89,8 +89,8 @@ object ReplicatedAuctionExampleSpec {
// If timestamps are equal, choose by dc where the offer was submitted
// In real auctions, this last comparison should be deterministic but unpredictable, so that submitting to a
// particular DC would not be an advantage.
(first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originDc.id
.compareTo(second.originDc.id) < 0)
(first.offer == second.offer && first.timestamp.equals(second.timestamp) && first.originReplica.id
.compareTo(second.originReplica.id) < 0)
}
//#state

View file

@ -318,6 +318,7 @@ lazy val persistenceTypedTests = akkaModule("akka-persistence-typed-tests")
.dependsOn(persistenceTyped, persistenceTestkit % "test", actorTestkitTyped % "test", jackson % "test->test")
.settings(AkkaBuild.mayChangeSettings)
.settings(Dependencies.persistenceTypedTests)
.settings(javacOptions += "-parameters") // for Jackson
.disablePlugins(MimaPlugin)
.enablePlugins(NoPublish)