Counter example, shopping cart

This commit is contained in:
Patrik Nordwall 2020-08-11 14:40:47 +02:00 committed by Christopher Batey
parent 2e0821c2f0
commit 779e827495
6 changed files with 306 additions and 5 deletions

View file

@ -0,0 +1,22 @@
# Shopping cart example
The provided CRDT data structures can be used as the root state of a replicated `EventSourcedBehavior` but they can
also be nested inside another data structure. This requires a bit more careful thinking about the eventual consistency.
In this sample we model a shopping cart as a map of product ids and the number of that product added or removed in the
shopping cart. By using the @apidoc[Counter] CRDT and persisting its `Update` in our events we can be sure that an
add or remove of items in any data center will eventually lead to all data centers ending up with the same number of
each product.
Scala
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedShoppingCartExampleSpec.scala) { #shopping-cart }
Java
: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedShoppingCartExample.java) { #shopping-cart }
With this model we cannot have a `ClearCart` command as that could give different states in different data centers.
It is quite easy to imagine such a scenario: commands arriving in the order `ClearCart`, `AddItem('a', 5)` in one
data center and the order `AddItem('a', 5), ClearCart` in another.
To clear a cart a client would instead have to remove as many items of each product as it sees in the cart at the time
of removal.

View file

@ -7,6 +7,7 @@ The following are more realistic examples of building systems with Replicated Ev
@@@ index
* [auction](replicated-eventsourcing-auction.md)
* [shopping cart](replicated-eventsourcing-cart.md)
@@@

View file

@ -187,7 +187,7 @@ interface ReplicatedBlogExample {
return Behaviors.setup(
context ->
ReplicatedEventSourcing.withSharedJournal(
"StringSet",
"blog",
entityId,
replicaId,
allReplicas,

View file

@ -0,0 +1,159 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.persistence.typed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.persistence.testkit.query.javadsl.PersistenceTestKitReadJournal;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.crdt.Counter;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.Effect;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.ReplicatedEventSourcedBehavior;
import akka.persistence.typed.javadsl.ReplicatedEventSourcing;
import akka.persistence.typed.javadsl.ReplicationContext;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
interface ReplicatedShoppingCartExample {
// #shopping-cart
public final class ShoppingCart
extends ReplicatedEventSourcedBehavior<
ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> {
public interface Event {}
public static final class ItemUpdated implements Event {
public final String productId;
public final Counter.Updated update;
public ItemUpdated(String productId, Counter.Updated update) {
this.productId = productId;
this.update = update;
}
}
public interface Command {}
public static final class AddItem implements Command {
public final String productId;
public final int count;
public AddItem(String productId, int count) {
this.productId = productId;
this.count = count;
}
}
public static final class RemoveItem implements Command {
public final String productId;
public final int count;
public RemoveItem(String productId, int count) {
this.productId = productId;
this.count = count;
}
}
public static class GetCartItems implements Command {
public final ActorRef<CartItems> replyTo;
public GetCartItems(ActorRef<CartItems> replyTo) {
this.replyTo = replyTo;
}
}
public static final class CartItems {
public final Map<String, Integer> items;
public CartItems(Map<String, Integer> items) {
this.items = items;
}
}
public static final class State {
public final Map<String, Counter> items = new HashMap<>();
}
public static Behavior<Command> create(
String entityId, ReplicaId replicaId, Set<ReplicaId> allReplicas) {
return ReplicatedEventSourcing.withSharedJournal(
"blog",
entityId,
replicaId,
allReplicas,
PersistenceTestKitReadJournal.Identifier(),
ShoppingCart::new);
}
private ShoppingCart(ReplicationContext replicationContext) {
super(replicationContext);
}
@Override
public State emptyState() {
return new State();
}
@Override
public CommandHandler<Command, Event, State> commandHandler() {
return newCommandHandlerBuilder()
.forAnyState()
.onCommand(AddItem.class, this::onAddItem)
.onCommand(RemoveItem.class, this::onRemoveItem)
.onCommand(GetCartItems.class, this::onGetCartItems)
.build();
}
private Effect<Event, State> onAddItem(State state, AddItem command) {
return Effect()
.persist(new ItemUpdated(command.productId, new Counter.Updated(command.count)));
}
private Effect<Event, State> onRemoveItem(State state, RemoveItem command) {
return Effect()
.persist(new ItemUpdated(command.productId, new Counter.Updated(-command.count)));
}
private Effect<Event, State> onGetCartItems(State state, GetCartItems command) {
command.replyTo.tell(new CartItems(filterEmptyAndNegative(state.items)));
return Effect().none();
}
private Map<String, Integer> filterEmptyAndNegative(Map<String, Counter> cart) {
Map<String, Integer> result = new HashMap<>();
for (Map.Entry<String, Counter> entry : cart.entrySet()) {
int count = entry.getValue().value().intValue();
if (count > 0) result.put(entry.getKey(), count);
}
return Collections.unmodifiableMap(result);
}
@Override
public EventHandler<State, Event> eventHandler() {
return newEventHandlerBuilder()
.forAnyState()
.onEvent(ItemUpdated.class, this::onItemUpdated)
.build();
}
private State onItemUpdated(State state, ItemUpdated event) {
final Counter counterForProduct;
if (state.items.containsKey(event.productId)) {
counterForProduct = state.items.get(event.productId);
} else {
counterForProduct = Counter.empty();
}
state.items.put(event.productId, counterForProduct.applyOperation(event.update));
return state;
}
}
// #shopping-cart
}

View file

@ -4,8 +4,6 @@
package docs.akka.persistence.typed
import org.scalatest.time.Millis
import org.scalatest.time.Span
import org.scalatest.wordspec.AnyWordSpecLike
import akka.Done
@ -141,8 +139,6 @@ class ReplicatedBlogExampleSpec
import ReplicatedBlogExampleSpec.BlogEntity
import ReplicatedBlogExampleSpec.BlogEntity._
implicit val config: PatienceConfig = PatienceConfig(timeout = Span(timeout.duration.toMillis, Millis))
"Blog Example" should {
"work" in {
val refDcA: ActorRef[Command] =

View file

@ -0,0 +1,123 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.persistence.typed
import java.util.UUID
import docs.akka.persistence.typed.ReplicatedShoppingCartExampleSpec.ShoppingCart.CartItems
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.crdt.Counter
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.scaladsl.ReplicatedEventSourcing
import akka.serialization.jackson.CborSerializable
object ReplicatedShoppingCartExampleSpec {
//#shopping-cart
object ShoppingCart {
type ProductId = String
sealed trait Command extends CborSerializable
final case class AddItem(id: ProductId, count: Int) extends Command
final case class RemoveItem(id: ProductId, count: Int) extends Command
final case class GetCartItems(replyTo: ActorRef[CartItems]) extends Command
final case class CartItems(items: Map[ProductId, Int]) extends CborSerializable
sealed trait Event extends CborSerializable
final case class ItemUpdated(id: ProductId, update: Counter.Updated) extends Event
final case class State(items: Map[ProductId, Counter])
def apply(entityId: String, replicaId: ReplicaId, allReplicaIds: Set[ReplicaId]): Behavior[Command] = {
ReplicatedEventSourcing.withSharedJournal(
"blog",
entityId,
replicaId,
allReplicaIds,
PersistenceTestKitReadJournal.Identifier) { replicationContext =>
EventSourcedBehavior[Command, Event, State](
replicationContext.persistenceId,
State(Map.empty),
(state, cmd) => commandHandler(state, cmd),
(state, event) => eventHandler(state, event))
}
}
private def commandHandler(state: State, cmd: Command): Effect[Event, State] = {
cmd match {
case AddItem(productId, count) =>
Effect.persist(ItemUpdated(productId, Counter.Updated(count)))
case RemoveItem(productId, count) =>
Effect.persist(ItemUpdated(productId, Counter.Updated(-count)))
case GetCartItems(replyTo) =>
val items = state.items.collect {
case (id, counter) if counter.value > 0 => id -> counter.value.toInt
}
replyTo ! CartItems(items)
Effect.none
}
}
private def eventHandler(state: State, event: Event): State = {
event match {
case ItemUpdated(id, update) =>
val newItems = state.items.get(id) match {
case Some(counter) => state.items + (id -> counter.applyOperation(update))
case None => state.items + (id -> Counter.empty.applyOperation(update))
}
State(newItems)
}
}
}
//#shopping-cart
}
class ReplicatedShoppingCartExampleSpec
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config)
with AnyWordSpecLike
with LogCapturing {
import ReplicatedShoppingCartExampleSpec.ShoppingCart
"Replicated shopping cart" should {
"work" in {
val cartId = UUID.randomUUID().toString
val refDcA: ActorRef[ShoppingCart.Command] =
spawn(ShoppingCart(cartId, ReplicaId("DC-A"), Set(ReplicaId("DC-A"), ReplicaId("DC-B"))))
val refDcB: ActorRef[ShoppingCart.Command] =
spawn(ShoppingCart(cartId, ReplicaId("DC-B"), Set(ReplicaId("DC-A"), ReplicaId("DC-B"))))
val fidgetSpinnerId = "T2912"
val rubicsCubeId = "T1302"
refDcA ! ShoppingCart.AddItem(fidgetSpinnerId, 10)
refDcB ! ShoppingCart.AddItem(rubicsCubeId, 10)
refDcA ! ShoppingCart.AddItem(rubicsCubeId, 10)
refDcA ! ShoppingCart.AddItem(fidgetSpinnerId, 10)
refDcB ! ShoppingCart.AddItem(fidgetSpinnerId, 10)
refDcA ! ShoppingCart.RemoveItem(fidgetSpinnerId, 10)
refDcA ! ShoppingCart.AddItem(rubicsCubeId, 10)
refDcB ! ShoppingCart.RemoveItem(rubicsCubeId, 10)
val replyProbe = createTestProbe[CartItems]()
eventually {
refDcA ! ShoppingCart.GetCartItems(replyProbe.ref)
replyProbe.expectMessage(CartItems(Map(fidgetSpinnerId -> 20, rubicsCubeId -> 20)))
}
}
}
}