Remove ExpectingReply (#27781)
* remove ExpectingReply * call method instead of private field * re-format comments * formatting * remove obsolete comment * remove replyTo() * remove getters in commands, use immutable fields instead * Added migration note
This commit is contained in:
parent
b94f3c3bcc
commit
2bab0d8dcd
24 changed files with 283 additions and 458 deletions
|
|
@ -85,18 +85,13 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
|
|||
INSTANCE
|
||||
}
|
||||
|
||||
public static class IncrementWithConfirmation implements Command, ExpectingReply<Done> {
|
||||
private final ActorRef<Done> replyTo;
|
||||
public static class IncrementWithConfirmation implements Command {
|
||||
public final ActorRef<Done> replyTo;
|
||||
|
||||
@JsonCreator
|
||||
public IncrementWithConfirmation(ActorRef<Done> replyTo) {
|
||||
this.replyTo = replyTo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActorRef<Done> replyTo() {
|
||||
return replyTo;
|
||||
}
|
||||
}
|
||||
|
||||
public enum StopThenLog implements Command {
|
||||
|
|
@ -107,18 +102,13 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
|
|||
INSTANCE
|
||||
}
|
||||
|
||||
public static class GetValue implements Command, ExpectingReply<State> {
|
||||
private final ActorRef<State> replyTo;
|
||||
public static class GetValue implements Command {
|
||||
public final ActorRef<State> replyTo;
|
||||
|
||||
@JsonCreator
|
||||
public GetValue(ActorRef<State> replyTo) {
|
||||
this.replyTo = replyTo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActorRef<State> replyTo() {
|
||||
return replyTo;
|
||||
}
|
||||
}
|
||||
|
||||
// need the JsonTypeInfo because of the Wrapper
|
||||
|
|
@ -233,11 +223,11 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
|
|||
|
||||
private ReplyEffect<Event, State> incrementWithConfirmation(
|
||||
State state, IncrementWithConfirmation command) {
|
||||
return Effect().persist(new Incremented(1)).thenReply(command, newState -> done());
|
||||
return Effect().persist(new Incremented(1)).thenReply(command.replyTo, newState -> done());
|
||||
}
|
||||
|
||||
private ReplyEffect<Event, State> getValue(State state, GetValue command) {
|
||||
return Effect().reply(command, state);
|
||||
return Effect().reply(command.replyTo, state);
|
||||
}
|
||||
|
||||
private Effect<Event, State> incrementLater(State state, IncrementLater command) {
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ package jdocs.akka.persistence.typed.auction;
|
|||
|
||||
import akka.Done;
|
||||
import akka.actor.typed.ActorRef;
|
||||
import akka.persistence.typed.ExpectingReply;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
|
|
@ -14,68 +13,40 @@ import java.util.UUID;
|
|||
public interface AuctionCommand {
|
||||
|
||||
/** Start the auction. */
|
||||
final class StartAuction implements AuctionCommand, ExpectingReply<Done> {
|
||||
final class StartAuction implements AuctionCommand {
|
||||
|
||||
/** The auction to start. */
|
||||
private final Auction auction;
|
||||
public final Auction auction;
|
||||
|
||||
private final ActorRef<Done> replyTo;
|
||||
public final ActorRef<Done> replyTo;
|
||||
|
||||
public StartAuction(Auction auction, ActorRef<Done> replyTo) {
|
||||
this.auction = auction;
|
||||
this.replyTo = replyTo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActorRef<Done> replyTo() {
|
||||
return replyTo;
|
||||
}
|
||||
|
||||
public Auction getAuction() {
|
||||
return auction;
|
||||
}
|
||||
}
|
||||
|
||||
/** Cancel the auction. */
|
||||
final class CancelAuction implements AuctionCommand, ExpectingReply<Done> {
|
||||
private final ActorRef<Done> replyTo;
|
||||
final class CancelAuction implements AuctionCommand {
|
||||
public final ActorRef<Done> replyTo;
|
||||
|
||||
public CancelAuction(ActorRef<Done> replyTo) {
|
||||
this.replyTo = replyTo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActorRef<Done> replyTo() {
|
||||
return replyTo;
|
||||
}
|
||||
}
|
||||
|
||||
/** Place a bid on the auction. */
|
||||
final class PlaceBid implements AuctionCommand, ExpectingReply<PlaceBidReply> {
|
||||
final class PlaceBid implements AuctionCommand {
|
||||
|
||||
private final int bidPrice;
|
||||
private final UUID bidder;
|
||||
|
||||
private final ActorRef<PlaceBidReply> replyTo;
|
||||
public final int bidPrice;
|
||||
public final UUID bidder;
|
||||
public final ActorRef<PlaceBidReply> replyTo;
|
||||
|
||||
public PlaceBid(int bidPrice, UUID bidder, ActorRef<PlaceBidReply> replyTo) {
|
||||
this.bidPrice = bidPrice;
|
||||
this.bidder = bidder;
|
||||
this.replyTo = replyTo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActorRef<PlaceBidReply> replyTo() {
|
||||
return replyTo;
|
||||
}
|
||||
|
||||
public int getBidPrice() {
|
||||
return bidPrice;
|
||||
}
|
||||
|
||||
public UUID getBidder() {
|
||||
return bidder;
|
||||
}
|
||||
}
|
||||
|
||||
interface PlaceBidReply {}
|
||||
|
|
@ -167,31 +138,21 @@ public interface AuctionCommand {
|
|||
}
|
||||
|
||||
/** Finish bidding. */
|
||||
final class FinishBidding implements AuctionCommand, ExpectingReply<Done> {
|
||||
final class FinishBidding implements AuctionCommand {
|
||||
|
||||
private final ActorRef<Done> replyTo;
|
||||
public final ActorRef<Done> replyTo;
|
||||
|
||||
FinishBidding(ActorRef<Done> replyTo) {
|
||||
this.replyTo = replyTo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActorRef<Done> replyTo() {
|
||||
return replyTo;
|
||||
}
|
||||
}
|
||||
|
||||
/** Get the auction. */
|
||||
final class GetAuction implements AuctionCommand, ExpectingReply<AuctionState> {
|
||||
private final ActorRef<AuctionState> replyTo;
|
||||
final class GetAuction implements AuctionCommand {
|
||||
public final ActorRef<AuctionState> replyTo;
|
||||
|
||||
public GetAuction(ActorRef<AuctionState> replyTo) {
|
||||
this.replyTo = replyTo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ActorRef<AuctionState> replyTo() {
|
||||
return replyTo;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@
|
|||
package jdocs.akka.persistence.typed.auction;
|
||||
|
||||
import akka.Done;
|
||||
import akka.persistence.typed.ExpectingReply;
|
||||
import akka.persistence.typed.PersistenceId;
|
||||
import akka.persistence.typed.javadsl.*;
|
||||
|
||||
|
|
@ -16,6 +15,7 @@ import java.time.Instant;
|
|||
import java.util.Arrays;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import akka.actor.typed.ActorRef;
|
||||
|
||||
/**
|
||||
* Based on
|
||||
|
|
@ -27,7 +27,6 @@ public class AuctionEntity
|
|||
private final UUID entityUUID;
|
||||
|
||||
public AuctionEntity(String entityId) {
|
||||
// when used with Cluster Sharding this should use EntityTypeKey, or PersistentEntity
|
||||
super(new PersistenceId("Auction|" + entityId));
|
||||
this.entityUUID = UUID.fromString(entityId);
|
||||
}
|
||||
|
|
@ -42,7 +41,7 @@ public class AuctionEntity
|
|||
.onCommand(
|
||||
PlaceBid.class,
|
||||
(state, cmd) ->
|
||||
Effect().reply(cmd, createResult(state, PlaceBidStatus.NOT_STARTED)));
|
||||
Effect().reply(cmd.replyTo, createResult(state, PlaceBidStatus.NOT_STARTED)));
|
||||
|
||||
// Command handler for the under auction state.
|
||||
private CommandHandlerWithReplyBuilderByState<
|
||||
|
|
@ -50,7 +49,7 @@ public class AuctionEntity
|
|||
underAuctionHandler =
|
||||
newCommandHandlerWithReplyBuilder()
|
||||
.forState(state -> state.getStatus() == AuctionStatus.UNDER_AUCTION)
|
||||
.onCommand(StartAuction.class, (state, cmd) -> alreadyDone(cmd))
|
||||
.onCommand(StartAuction.class, (state, cmd) -> alreadyDone(cmd.replyTo))
|
||||
.onCommand(PlaceBid.class, this::placeBid)
|
||||
.onCommand(FinishBidding.class, this::finishBidding);
|
||||
|
||||
|
|
@ -60,12 +59,12 @@ public class AuctionEntity
|
|||
completedHandler =
|
||||
newCommandHandlerWithReplyBuilder()
|
||||
.forState(state -> state.getStatus() == AuctionStatus.COMPLETE)
|
||||
.onCommand(StartAuction.class, (state, cmd) -> alreadyDone(cmd))
|
||||
.onCommand(FinishBidding.class, (state, cmd) -> alreadyDone(cmd))
|
||||
.onCommand(StartAuction.class, (state, cmd) -> alreadyDone(cmd.replyTo))
|
||||
.onCommand(FinishBidding.class, (state, cmd) -> alreadyDone(cmd.replyTo))
|
||||
.onCommand(
|
||||
PlaceBid.class,
|
||||
(state, cmd) ->
|
||||
Effect().reply(cmd, createResult(state, PlaceBidStatus.FINISHED)));
|
||||
Effect().reply(cmd.replyTo, createResult(state, PlaceBidStatus.FINISHED)));
|
||||
|
||||
// Command handler for the cancelled state.
|
||||
private CommandHandlerWithReplyBuilderByState<
|
||||
|
|
@ -73,20 +72,20 @@ public class AuctionEntity
|
|||
cancelledHandler =
|
||||
newCommandHandlerWithReplyBuilder()
|
||||
.forState(state -> state.getStatus() == AuctionStatus.CANCELLED)
|
||||
.onCommand(StartAuction.class, (state, cmd) -> alreadyDone(cmd))
|
||||
.onCommand(FinishBidding.class, (state, cmd) -> alreadyDone(cmd))
|
||||
.onCommand(CancelAuction.class, (state, cmd) -> alreadyDone(cmd))
|
||||
.onCommand(StartAuction.class, (state, cmd) -> alreadyDone(cmd.replyTo))
|
||||
.onCommand(FinishBidding.class, (state, cmd) -> alreadyDone(cmd.replyTo))
|
||||
.onCommand(CancelAuction.class, (state, cmd) -> alreadyDone(cmd.replyTo))
|
||||
.onCommand(
|
||||
PlaceBid.class,
|
||||
(state, cmd) ->
|
||||
Effect().reply(cmd, createResult(state, PlaceBidStatus.CANCELLED)));
|
||||
Effect().reply(cmd.replyTo, createResult(state, PlaceBidStatus.CANCELLED)));
|
||||
|
||||
private CommandHandlerWithReplyBuilderByState<
|
||||
AuctionCommand, AuctionEvent, AuctionState, AuctionState>
|
||||
getAuctionHandler =
|
||||
newCommandHandlerWithReplyBuilder()
|
||||
.forStateType(AuctionState.class)
|
||||
.onCommand(GetAuction.class, (state, cmd) -> Effect().reply(cmd, state));
|
||||
.onCommand(GetAuction.class, (state, cmd) -> Effect().reply(cmd.replyTo, state));
|
||||
|
||||
private CommandHandlerBuilderByState<AuctionCommand, AuctionEvent, AuctionState, AuctionState>
|
||||
cancelHandler =
|
||||
|
|
@ -95,28 +94,28 @@ public class AuctionEntity
|
|||
.onCommand(CancelAuction.class, this::cancelAuction);
|
||||
// Note, an item can go from completed to cancelled, since it is the item service that controls
|
||||
// whether an auction is cancelled or not. If it cancels before it receives a bidding finished
|
||||
// event from us, it will ignore the bidding finished event, so we need to update our state
|
||||
// to reflect that.
|
||||
// event from us, it will ignore the bidding finished event, so we need to update our state to
|
||||
// reflect that.
|
||||
|
||||
private ReplyEffect<AuctionEvent, AuctionState> startAuction(
|
||||
AuctionState state, StartAuction cmd) {
|
||||
return Effect()
|
||||
.persist(new AuctionStarted(entityUUID, cmd.getAuction()))
|
||||
.thenReply(cmd, notUsed -> Done.getInstance());
|
||||
.persist(new AuctionStarted(entityUUID, cmd.auction))
|
||||
.thenReply(cmd.replyTo, notUsed -> Done.getInstance());
|
||||
}
|
||||
|
||||
private ReplyEffect<AuctionEvent, AuctionState> finishBidding(
|
||||
AuctionState state, FinishBidding cmd) {
|
||||
return Effect()
|
||||
.persist(new BiddingFinished(entityUUID))
|
||||
.thenReply(cmd, notUsed -> Done.getInstance());
|
||||
.thenReply(cmd.replyTo, notUsed -> Done.getInstance());
|
||||
}
|
||||
|
||||
private ReplyEffect<AuctionEvent, AuctionState> cancelAuction(
|
||||
AuctionState state, CancelAuction cmd) {
|
||||
return Effect()
|
||||
.persist(new AuctionCancelled(entityUUID))
|
||||
.thenReply(cmd, notUsed -> Done.getInstance());
|
||||
.thenReply(cmd.replyTo, notUsed -> Done.getInstance());
|
||||
}
|
||||
|
||||
/** The main logic for handling of bids. */
|
||||
|
|
@ -127,12 +126,14 @@ public class AuctionEntity
|
|||
|
||||
// Even though we're not in the finished state yet, we should check
|
||||
if (auction.getEndTime().isBefore(now)) {
|
||||
return Effect().reply(bid, createResult(state, PlaceBidStatus.FINISHED));
|
||||
return Effect().reply(bid.replyTo, createResult(state, PlaceBidStatus.FINISHED));
|
||||
}
|
||||
|
||||
if (auction.getCreator().equals(bid.getBidder())) {
|
||||
if (auction.getCreator().equals(bid.bidder)) {
|
||||
return Effect()
|
||||
.reply(bid, new PlaceBidRejected("An auctions creator cannot bid in their own auction."));
|
||||
.reply(
|
||||
bid.replyTo,
|
||||
new PlaceBidRejected("An auctions creator cannot bid in their own auction."));
|
||||
}
|
||||
|
||||
Optional<Bid> currentBid = state.lastBid();
|
||||
|
|
@ -147,13 +148,13 @@ public class AuctionEntity
|
|||
}
|
||||
|
||||
boolean bidderIsCurrentBidder =
|
||||
currentBid.filter(b -> b.getBidder().equals(bid.getBidder())).isPresent();
|
||||
currentBid.filter(b -> b.getBidder().equals(bid.bidder)).isPresent();
|
||||
|
||||
if (bidderIsCurrentBidder && bid.getBidPrice() >= currentBidPrice) {
|
||||
if (bidderIsCurrentBidder && bid.bidPrice >= currentBidPrice) {
|
||||
// Allow the current bidder to update their bid
|
||||
if (auction.getReservePrice() > currentBidPrice) {
|
||||
|
||||
int newBidPrice = Math.min(auction.getReservePrice(), bid.getBidPrice());
|
||||
int newBidPrice = Math.min(auction.getReservePrice(), bid.bidPrice);
|
||||
PlaceBidStatus placeBidStatus;
|
||||
|
||||
if (newBidPrice == auction.getReservePrice()) {
|
||||
|
|
@ -162,25 +163,22 @@ public class AuctionEntity
|
|||
placeBidStatus = PlaceBidStatus.ACCEPTED_BELOW_RESERVE;
|
||||
}
|
||||
return Effect()
|
||||
.persist(
|
||||
new BidPlaced(
|
||||
entityUUID, new Bid(bid.getBidder(), now, newBidPrice, bid.getBidPrice())))
|
||||
.persist(new BidPlaced(entityUUID, new Bid(bid.bidder, now, newBidPrice, bid.bidPrice)))
|
||||
.thenReply(
|
||||
bid, newState -> new PlaceBidResult(placeBidStatus, newBidPrice, bid.getBidder()));
|
||||
bid.replyTo,
|
||||
newState -> new PlaceBidResult(placeBidStatus, newBidPrice, bid.bidder));
|
||||
}
|
||||
return Effect()
|
||||
.persist(
|
||||
new BidPlaced(
|
||||
entityUUID, new Bid(bid.getBidder(), now, currentBidPrice, bid.getBidPrice())))
|
||||
new BidPlaced(entityUUID, new Bid(bid.bidder, now, currentBidPrice, bid.bidPrice)))
|
||||
.thenReply(
|
||||
bid,
|
||||
newState ->
|
||||
new PlaceBidResult(PlaceBidStatus.ACCEPTED, currentBidPrice, bid.getBidder()));
|
||||
bid.replyTo,
|
||||
newState -> new PlaceBidResult(PlaceBidStatus.ACCEPTED, currentBidPrice, bid.bidder));
|
||||
}
|
||||
|
||||
if (bid.getBidPrice() < currentBidPrice + auction.getIncrement()) {
|
||||
return Effect().reply(bid, createResult(state, PlaceBidStatus.TOO_LOW));
|
||||
} else if (bid.getBidPrice() <= currentBidMaximum) {
|
||||
if (bid.bidPrice < currentBidPrice + auction.getIncrement()) {
|
||||
return Effect().reply(bid.replyTo, createResult(state, PlaceBidStatus.TOO_LOW));
|
||||
} else if (bid.bidPrice <= currentBidMaximum) {
|
||||
return handleAutomaticOutbid(
|
||||
bid, auction, now, currentBid, currentBidPrice, currentBidMaximum);
|
||||
} else {
|
||||
|
|
@ -202,21 +200,20 @@ public class AuctionEntity
|
|||
Optional<Bid> currentBid,
|
||||
int currentBidPrice,
|
||||
int currentBidMaximum) {
|
||||
// Adjust the bid so that the increment for the current maximum makes the current maximum a
|
||||
// valid bid
|
||||
int adjustedBidPrice = Math.min(bid.getBidPrice(), currentBidMaximum - auction.getIncrement());
|
||||
// Adjust the bid so that the increment for the current maximum makes the
|
||||
// current maximum a valid bid
|
||||
int adjustedBidPrice = Math.min(bid.bidPrice, currentBidMaximum - auction.getIncrement());
|
||||
int newBidPrice = adjustedBidPrice + auction.getIncrement();
|
||||
|
||||
return Effect()
|
||||
.persist(
|
||||
Arrays.asList(
|
||||
new BidPlaced(
|
||||
entityUUID, new Bid(bid.getBidder(), now, adjustedBidPrice, bid.getBidPrice())),
|
||||
new BidPlaced(entityUUID, new Bid(bid.bidder, now, adjustedBidPrice, bid.bidPrice)),
|
||||
new BidPlaced(
|
||||
entityUUID,
|
||||
new Bid(currentBid.get().getBidder(), now, newBidPrice, currentBidMaximum))))
|
||||
.thenReply(
|
||||
bid,
|
||||
bid.replyTo,
|
||||
newState ->
|
||||
new PlaceBidResult(
|
||||
PlaceBidStatus.ACCEPTED_OUTBID, newBidPrice, currentBid.get().getBidder()));
|
||||
|
|
@ -225,19 +222,17 @@ public class AuctionEntity
|
|||
/** Handle the situation where a bid will be accepted as the new winning bidder. */
|
||||
private ReplyEffect<AuctionEvent, AuctionState> handleNewWinningBidder(
|
||||
PlaceBid bid, Auction auction, Instant now, int currentBidMaximum) {
|
||||
int nextIncrement = Math.min(currentBidMaximum + auction.getIncrement(), bid.getBidPrice());
|
||||
int nextIncrement = Math.min(currentBidMaximum + auction.getIncrement(), bid.bidPrice);
|
||||
int newBidPrice;
|
||||
if (nextIncrement < auction.getReservePrice()) {
|
||||
newBidPrice = Math.min(auction.getReservePrice(), bid.getBidPrice());
|
||||
newBidPrice = Math.min(auction.getReservePrice(), bid.bidPrice);
|
||||
} else {
|
||||
newBidPrice = nextIncrement;
|
||||
}
|
||||
return Effect()
|
||||
.persist(
|
||||
new BidPlaced(
|
||||
entityUUID, new Bid(bid.getBidder(), now, newBidPrice, bid.getBidPrice())))
|
||||
.persist(new BidPlaced(entityUUID, new Bid(bid.bidder, now, newBidPrice, bid.bidPrice)))
|
||||
.thenReply(
|
||||
bid,
|
||||
bid.replyTo,
|
||||
newState -> {
|
||||
PlaceBidStatus status;
|
||||
if (newBidPrice < auction.getReservePrice()) {
|
||||
|
|
@ -245,7 +240,7 @@ public class AuctionEntity
|
|||
} else {
|
||||
status = PlaceBidStatus.ACCEPTED;
|
||||
}
|
||||
return new PlaceBidResult(status, newBidPrice, bid.getBidder());
|
||||
return new PlaceBidResult(status, newBidPrice, bid.bidder);
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -292,7 +287,7 @@ public class AuctionEntity
|
|||
}
|
||||
}
|
||||
|
||||
private ReplyEffect<AuctionEvent, AuctionState> alreadyDone(ExpectingReply<Done> cmd) {
|
||||
return Effect().reply(cmd, Done.getInstance());
|
||||
private ReplyEffect<AuctionEvent, AuctionState> alreadyDone(ActorRef<Done> replyTo) {
|
||||
return Effect().reply(replyTo, Done.getInstance());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -13,7 +13,6 @@ import akka.actor.typed.ActorRef
|
|||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.scaladsl.ActorContext
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.persistence.typed.ExpectingReply
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.serialization.jackson.CborSerializable
|
||||
import com.typesafe.config.Config
|
||||
|
|
@ -30,10 +29,10 @@ object EventSourcedBehaviorReplySpec {
|
|||
akka.persistence.snapshot-store.local.dir = "target/typed-persistence-${UUID.randomUUID().toString}"
|
||||
""")
|
||||
|
||||
sealed trait Command[ReplyMessage] extends ExpectingReply[ReplyMessage] with CborSerializable
|
||||
final case class IncrementWithConfirmation(override val replyTo: ActorRef[Done]) extends Command[Done]
|
||||
final case class IncrementReplyLater(override val replyTo: ActorRef[Done]) extends Command[Done]
|
||||
final case class ReplyNow(override val replyTo: ActorRef[Done]) extends Command[Done]
|
||||
sealed trait Command[ReplyMessage] extends CborSerializable
|
||||
final case class IncrementWithConfirmation(replyTo: ActorRef[Done]) extends Command[Done]
|
||||
final case class IncrementReplyLater(replyTo: ActorRef[Done]) extends Command[Done]
|
||||
final case class ReplyNow(replyTo: ActorRef[Done]) extends Command[Done]
|
||||
final case class GetValue(replyTo: ActorRef[State]) extends Command[State]
|
||||
|
||||
sealed trait Event extends CborSerializable
|
||||
|
|
@ -53,17 +52,17 @@ object EventSourcedBehaviorReplySpec {
|
|||
commandHandler = (state, command) =>
|
||||
command match {
|
||||
|
||||
case cmd: IncrementWithConfirmation =>
|
||||
Effect.persist(Incremented(1)).thenReply(cmd)(_ => Done)
|
||||
case IncrementWithConfirmation(replyTo) =>
|
||||
Effect.persist(Incremented(1)).thenReply(replyTo)(_ => Done)
|
||||
|
||||
case cmd: IncrementReplyLater =>
|
||||
Effect.persist(Incremented(1)).thenRun((_: State) => ctx.self ! ReplyNow(cmd.replyTo)).thenNoReply()
|
||||
case IncrementReplyLater(replyTo) =>
|
||||
Effect.persist(Incremented(1)).thenRun((_: State) => ctx.self ! ReplyNow(replyTo)).thenNoReply()
|
||||
|
||||
case cmd: ReplyNow =>
|
||||
Effect.reply(cmd)(Done)
|
||||
case ReplyNow(replyTo) =>
|
||||
Effect.reply(replyTo)(Done)
|
||||
|
||||
case query: GetValue =>
|
||||
Effect.reply(query)(state)
|
||||
case GetValue(replyTo) =>
|
||||
Effect.reply(replyTo)(state)
|
||||
|
||||
},
|
||||
eventHandler = (state, evt) =>
|
||||
|
|
|
|||
|
|
@ -32,7 +32,6 @@ import akka.persistence.query.PersistenceQuery
|
|||
import akka.persistence.query.Sequence
|
||||
import akka.persistence.query.journal.leveldb.scaladsl.LeveldbReadJournal
|
||||
import akka.persistence.snapshot.SnapshotStore
|
||||
import akka.persistence.typed.ExpectingReply
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.RecoveryCompleted
|
||||
import akka.persistence.typed.SnapshotCompleted
|
||||
|
|
@ -99,9 +98,7 @@ object EventSourcedBehaviorSpec {
|
|||
final case object IncrementLater extends Command
|
||||
final case object IncrementAfterReceiveTimeout extends Command
|
||||
final case object IncrementTwiceAndThenLog extends Command
|
||||
final case class IncrementWithConfirmation(override val replyTo: ActorRef[Done])
|
||||
extends Command
|
||||
with ExpectingReply[Done]
|
||||
final case class IncrementWithConfirmation(replyTo: ActorRef[Done]) extends Command
|
||||
final case object DoNothingAndThenLog extends Command
|
||||
final case object EmptyEventsListAndThenLog extends Command
|
||||
final case class GetValue(replyTo: ActorRef[State]) extends Command
|
||||
|
|
@ -198,8 +195,8 @@ object EventSourcedBehaviorSpec {
|
|||
case IncrementWithPersistAll(n) =>
|
||||
Effect.persist((0 until n).map(_ => Incremented(1)))
|
||||
|
||||
case cmd: IncrementWithConfirmation =>
|
||||
Effect.persist(Incremented(1)).thenReply(cmd)(_ => Done)
|
||||
case IncrementWithConfirmation(replyTo) =>
|
||||
Effect.persist(Incremented(1)).thenReply(replyTo)(_ => Done)
|
||||
|
||||
case GetValue(replyTo) =>
|
||||
replyTo ! state
|
||||
|
|
|
|||
|
|
@ -24,7 +24,6 @@ import akka.actor.typed.internal.PoisonPill
|
|||
import akka.actor.typed.javadsl.StashOverflowException
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
import akka.persistence.typed.ExpectingReply
|
||||
import akka.persistence.typed.PersistenceId
|
||||
import akka.persistence.typed.RecoveryCompleted
|
||||
import com.typesafe.config.Config
|
||||
|
|
@ -45,22 +44,21 @@ object EventSourcedBehaviorStashSpec {
|
|||
}
|
||||
""").withFallback(ConfigFactory.defaultReference()).resolve()
|
||||
|
||||
sealed trait Command[ReplyMessage] extends ExpectingReply[ReplyMessage]
|
||||
sealed trait Command[ReplyMessage]
|
||||
// Unstash and change to active mode
|
||||
final case class Activate(id: String, override val replyTo: ActorRef[Ack]) extends Command[Ack]
|
||||
final case class Activate(id: String, val replyTo: ActorRef[Ack]) extends Command[Ack]
|
||||
// Change to active mode, stash incoming Increment
|
||||
final case class Deactivate(id: String, override val replyTo: ActorRef[Ack]) extends Command[Ack]
|
||||
final case class Deactivate(id: String, val replyTo: ActorRef[Ack]) extends Command[Ack]
|
||||
// Persist Incremented if in active mode, otherwise stashed
|
||||
final case class Increment(id: String, override val replyTo: ActorRef[Ack]) extends Command[Ack]
|
||||
final case class Increment(id: String, val replyTo: ActorRef[Ack]) extends Command[Ack]
|
||||
// Persist ValueUpdated, independent of active/inactive
|
||||
final case class UpdateValue(id: String, value: Int, override val replyTo: ActorRef[Ack]) extends Command[Ack]
|
||||
final case class UpdateValue(id: String, value: Int, val replyTo: ActorRef[Ack]) extends Command[Ack]
|
||||
// Retrieve current state, independent of active/inactive
|
||||
final case class GetValue(replyTo: ActorRef[State]) extends Command[State]
|
||||
final case class Unhandled(replyTo: ActorRef[NotUsed]) extends Command[NotUsed]
|
||||
final case class Throw(id: String, t: Throwable, override val replyTo: ActorRef[Ack]) extends Command[Ack]
|
||||
final case class IncrementThenThrow(id: String, t: Throwable, override val replyTo: ActorRef[Ack])
|
||||
extends Command[Ack]
|
||||
final case class Slow(id: String, latch: CountDownLatch, override val replyTo: ActorRef[Ack]) extends Command[Ack]
|
||||
final case class Throw(id: String, t: Throwable, val replyTo: ActorRef[Ack]) extends Command[Ack]
|
||||
final case class IncrementThenThrow(id: String, t: Throwable) extends Command[Ack]
|
||||
final case class Slow(id: String, latch: CountDownLatch, val replyTo: ActorRef[Ack]) extends Command[Ack]
|
||||
|
||||
final case class Ack(id: String)
|
||||
|
||||
|
|
@ -115,27 +113,27 @@ object EventSourcedBehaviorStashSpec {
|
|||
|
||||
private def active(state: State, command: Command[_]): ReplyEffect[Event, State] = {
|
||||
command match {
|
||||
case cmd: Increment =>
|
||||
Effect.persist(Incremented(1)).thenReply(cmd)(_ => Ack(cmd.id))
|
||||
case cmd @ UpdateValue(_, value, _) =>
|
||||
Effect.persist(ValueUpdated(value)).thenReply(cmd)(_ => Ack(cmd.id))
|
||||
case query: GetValue =>
|
||||
Effect.reply(query)(state)
|
||||
case cmd: Deactivate =>
|
||||
Effect.persist(Deactivated).thenReply(cmd)(_ => Ack(cmd.id))
|
||||
case cmd: Activate =>
|
||||
case Increment(id, replyTo) =>
|
||||
Effect.persist(Incremented(1)).thenReply(replyTo)(_ => Ack(id))
|
||||
case UpdateValue(id, value, replyTo) =>
|
||||
Effect.persist(ValueUpdated(value)).thenReply(replyTo)(_ => Ack(id))
|
||||
case GetValue(replyTo) =>
|
||||
Effect.reply(replyTo)(state)
|
||||
case Deactivate(id, replyTo) =>
|
||||
Effect.persist(Deactivated).thenReply(replyTo)(_ => Ack(id))
|
||||
case Activate(id, replyTo) =>
|
||||
// already active
|
||||
Effect.reply(cmd)(Ack(cmd.id))
|
||||
Effect.reply(replyTo)(Ack(id))
|
||||
case _: Unhandled =>
|
||||
Effect.unhandled.thenNoReply()
|
||||
case Throw(id, t, replyTo) =>
|
||||
replyTo ! Ack(id)
|
||||
throw t
|
||||
case cmd: IncrementThenThrow =>
|
||||
Effect.persist(Incremented(1)).thenRun((_: State) => throw cmd.t).thenNoReply()
|
||||
case cmd: Slow =>
|
||||
cmd.latch.await(30, TimeUnit.SECONDS)
|
||||
Effect.reply(cmd)(Ack(cmd.id))
|
||||
case IncrementThenThrow(_, throwable) =>
|
||||
Effect.persist(Incremented(1)).thenRun((_: State) => throw throwable).thenNoReply()
|
||||
case Slow(id, latch, replyTo) =>
|
||||
latch.await(30, TimeUnit.SECONDS)
|
||||
Effect.reply(replyTo)(Ack(id))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -143,15 +141,15 @@ object EventSourcedBehaviorStashSpec {
|
|||
command match {
|
||||
case _: Increment =>
|
||||
Effect.stash()
|
||||
case cmd @ UpdateValue(_, value, _) =>
|
||||
Effect.persist(ValueUpdated(value)).thenReply(cmd)(_ => Ack(cmd.id))
|
||||
case query: GetValue =>
|
||||
Effect.reply(query)(state)
|
||||
case cmd: Deactivate =>
|
||||
case UpdateValue(id, value, replyTo) =>
|
||||
Effect.persist(ValueUpdated(value)).thenReply(replyTo)(_ => Ack(id))
|
||||
case GetValue(replyTo) =>
|
||||
Effect.reply(replyTo)(state)
|
||||
case Deactivate(id, replyTo) =>
|
||||
// already inactive
|
||||
Effect.reply(cmd)(Ack(cmd.id))
|
||||
case cmd: Activate =>
|
||||
Effect.persist(Activated).thenReply(cmd)((_: State) => Ack(cmd.id)).thenUnstashAll()
|
||||
Effect.reply(replyTo)(Ack(id))
|
||||
case Activate(id, replyTo) =>
|
||||
Effect.persist(Activated).thenReply(replyTo)((_: State) => Ack(id)).thenUnstashAll()
|
||||
case _: Unhandled =>
|
||||
Effect.unhandled.thenNoReply()
|
||||
case Throw(id, t, replyTo) =>
|
||||
|
|
@ -429,7 +427,7 @@ class EventSourcedBehaviorStashSpec
|
|||
|
||||
(1 to 10).foreach { n =>
|
||||
if (n == 3)
|
||||
c ! IncrementThenThrow(s"inc-$n", new TestException("test"), ackProbe.ref)
|
||||
c ! IncrementThenThrow(s"inc-$n", new TestException("test"))
|
||||
else
|
||||
c ! Increment(s"inc-$n", ackProbe.ref)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue