From 779e827495351d232a6dafc6591f03c6b585f34a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 11 Aug 2020 14:40:47 +0200 Subject: [PATCH] Counter example, shopping cart --- .../typed/replicated-eventsourcing-cart.md | 22 +++ .../replicated-eventsourcing-examples.md | 1 + .../typed/ReplicatedBlogExample.java | 2 +- .../typed/ReplicatedShoppingCartExample.java | 159 ++++++++++++++++++ .../typed/ReplicatedBlogExampleSpec.scala | 4 - .../ReplicatedShoppingCartExampleSpec.scala | 123 ++++++++++++++ 6 files changed, 306 insertions(+), 5 deletions(-) create mode 100644 akka-docs/src/main/paradox/typed/replicated-eventsourcing-cart.md create mode 100644 akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedShoppingCartExample.java create mode 100644 akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedShoppingCartExampleSpec.scala diff --git a/akka-docs/src/main/paradox/typed/replicated-eventsourcing-cart.md b/akka-docs/src/main/paradox/typed/replicated-eventsourcing-cart.md new file mode 100644 index 0000000000..09da74c292 --- /dev/null +++ b/akka-docs/src/main/paradox/typed/replicated-eventsourcing-cart.md @@ -0,0 +1,22 @@ +# Shopping cart example + +The provided CRDT data structures can be used as the root state of a replicated `EventSourcedBehavior` but they can +also be nested inside another data structure. This requires a bit more careful thinking about the eventual consistency. + +In this sample we model a shopping cart as a map of product ids and the number of that product added or removed in the +shopping cart. By using the @apidoc[Counter] CRDT and persisting its `Update` in our events we can be sure that an +add or remove of items in any data center will eventually lead to all data centers ending up with the same number of +each product. + +Scala +: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedShoppingCartExampleSpec.scala) { #shopping-cart } + +Java +: @@snip [AuctionExample](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedShoppingCartExample.java) { #shopping-cart } + +With this model we cannot have a `ClearCart` command as that could give different states in different data centers. +It is quite easy to imagine such a scenario: commands arriving in the order `ClearCart`, `AddItem('a', 5)` in one +data center and the order `AddItem('a', 5), ClearCart` in another. + +To clear a cart a client would instead have to remove as many items of each product as it sees in the cart at the time +of removal. diff --git a/akka-docs/src/main/paradox/typed/replicated-eventsourcing-examples.md b/akka-docs/src/main/paradox/typed/replicated-eventsourcing-examples.md index 0d53de9289..e0f17977d1 100644 --- a/akka-docs/src/main/paradox/typed/replicated-eventsourcing-examples.md +++ b/akka-docs/src/main/paradox/typed/replicated-eventsourcing-examples.md @@ -7,6 +7,7 @@ The following are more realistic examples of building systems with Replicated Ev @@@ index * [auction](replicated-eventsourcing-auction.md) +* [shopping cart](replicated-eventsourcing-cart.md) @@@ diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java index 4af9d46f74..09c14b60c4 100644 --- a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java @@ -187,7 +187,7 @@ interface ReplicatedBlogExample { return Behaviors.setup( context -> ReplicatedEventSourcing.withSharedJournal( - "StringSet", + "blog", entityId, replicaId, allReplicas, diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedShoppingCartExample.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedShoppingCartExample.java new file mode 100644 index 0000000000..2c49cd2fd9 --- /dev/null +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedShoppingCartExample.java @@ -0,0 +1,159 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package jdocs.akka.persistence.typed; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.persistence.testkit.query.javadsl.PersistenceTestKitReadJournal; +import akka.persistence.typed.ReplicaId; +import akka.persistence.typed.crdt.Counter; +import akka.persistence.typed.javadsl.CommandHandler; +import akka.persistence.typed.javadsl.Effect; +import akka.persistence.typed.javadsl.EventHandler; +import akka.persistence.typed.javadsl.ReplicatedEventSourcedBehavior; +import akka.persistence.typed.javadsl.ReplicatedEventSourcing; +import akka.persistence.typed.javadsl.ReplicationContext; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +interface ReplicatedShoppingCartExample { + + // #shopping-cart + public final class ShoppingCart + extends ReplicatedEventSourcedBehavior< + ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> { + + public interface Event {} + + public static final class ItemUpdated implements Event { + public final String productId; + public final Counter.Updated update; + + public ItemUpdated(String productId, Counter.Updated update) { + this.productId = productId; + this.update = update; + } + } + + public interface Command {} + + public static final class AddItem implements Command { + public final String productId; + public final int count; + + public AddItem(String productId, int count) { + this.productId = productId; + this.count = count; + } + } + + public static final class RemoveItem implements Command { + public final String productId; + public final int count; + + public RemoveItem(String productId, int count) { + this.productId = productId; + this.count = count; + } + } + + public static class GetCartItems implements Command { + public final ActorRef replyTo; + + public GetCartItems(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + + public static final class CartItems { + public final Map items; + + public CartItems(Map items) { + this.items = items; + } + } + + public static final class State { + public final Map items = new HashMap<>(); + } + + public static Behavior create( + String entityId, ReplicaId replicaId, Set allReplicas) { + return ReplicatedEventSourcing.withSharedJournal( + "blog", + entityId, + replicaId, + allReplicas, + PersistenceTestKitReadJournal.Identifier(), + ShoppingCart::new); + } + + private ShoppingCart(ReplicationContext replicationContext) { + super(replicationContext); + } + + @Override + public State emptyState() { + return new State(); + } + + @Override + public CommandHandler commandHandler() { + return newCommandHandlerBuilder() + .forAnyState() + .onCommand(AddItem.class, this::onAddItem) + .onCommand(RemoveItem.class, this::onRemoveItem) + .onCommand(GetCartItems.class, this::onGetCartItems) + .build(); + } + + private Effect onAddItem(State state, AddItem command) { + return Effect() + .persist(new ItemUpdated(command.productId, new Counter.Updated(command.count))); + } + + private Effect onRemoveItem(State state, RemoveItem command) { + return Effect() + .persist(new ItemUpdated(command.productId, new Counter.Updated(-command.count))); + } + + private Effect onGetCartItems(State state, GetCartItems command) { + command.replyTo.tell(new CartItems(filterEmptyAndNegative(state.items))); + return Effect().none(); + } + + private Map filterEmptyAndNegative(Map cart) { + Map result = new HashMap<>(); + for (Map.Entry entry : cart.entrySet()) { + int count = entry.getValue().value().intValue(); + if (count > 0) result.put(entry.getKey(), count); + } + return Collections.unmodifiableMap(result); + } + + @Override + public EventHandler eventHandler() { + return newEventHandlerBuilder() + .forAnyState() + .onEvent(ItemUpdated.class, this::onItemUpdated) + .build(); + } + + private State onItemUpdated(State state, ItemUpdated event) { + final Counter counterForProduct; + if (state.items.containsKey(event.productId)) { + counterForProduct = state.items.get(event.productId); + } else { + counterForProduct = Counter.empty(); + } + state.items.put(event.productId, counterForProduct.applyOperation(event.update)); + return state; + } + } + // #shopping-cart +} diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala index 79d6543a85..4d1d9592d6 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala @@ -4,8 +4,6 @@ package docs.akka.persistence.typed -import org.scalatest.time.Millis -import org.scalatest.time.Span import org.scalatest.wordspec.AnyWordSpecLike import akka.Done @@ -141,8 +139,6 @@ class ReplicatedBlogExampleSpec import ReplicatedBlogExampleSpec.BlogEntity import ReplicatedBlogExampleSpec.BlogEntity._ - implicit val config: PatienceConfig = PatienceConfig(timeout = Span(timeout.duration.toMillis, Millis)) - "Blog Example" should { "work" in { val refDcA: ActorRef[Command] = diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedShoppingCartExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedShoppingCartExampleSpec.scala new file mode 100644 index 0000000000..8a311df9a6 --- /dev/null +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedShoppingCartExampleSpec.scala @@ -0,0 +1,123 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package docs.akka.persistence.typed + +import java.util.UUID + +import docs.akka.persistence.typed.ReplicatedShoppingCartExampleSpec.ShoppingCart.CartItems +import org.scalatest.wordspec.AnyWordSpecLike + +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.persistence.testkit.PersistenceTestKitPlugin +import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal +import akka.persistence.typed.ReplicaId +import akka.persistence.typed.crdt.Counter +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.persistence.typed.scaladsl.ReplicatedEventSourcing +import akka.serialization.jackson.CborSerializable + +object ReplicatedShoppingCartExampleSpec { + + //#shopping-cart + object ShoppingCart { + + type ProductId = String + + sealed trait Command extends CborSerializable + final case class AddItem(id: ProductId, count: Int) extends Command + final case class RemoveItem(id: ProductId, count: Int) extends Command + final case class GetCartItems(replyTo: ActorRef[CartItems]) extends Command + final case class CartItems(items: Map[ProductId, Int]) extends CborSerializable + + sealed trait Event extends CborSerializable + final case class ItemUpdated(id: ProductId, update: Counter.Updated) extends Event + + final case class State(items: Map[ProductId, Counter]) + + def apply(entityId: String, replicaId: ReplicaId, allReplicaIds: Set[ReplicaId]): Behavior[Command] = { + ReplicatedEventSourcing.withSharedJournal( + "blog", + entityId, + replicaId, + allReplicaIds, + PersistenceTestKitReadJournal.Identifier) { replicationContext => + EventSourcedBehavior[Command, Event, State]( + replicationContext.persistenceId, + State(Map.empty), + (state, cmd) => commandHandler(state, cmd), + (state, event) => eventHandler(state, event)) + } + } + + private def commandHandler(state: State, cmd: Command): Effect[Event, State] = { + cmd match { + case AddItem(productId, count) => + Effect.persist(ItemUpdated(productId, Counter.Updated(count))) + case RemoveItem(productId, count) => + Effect.persist(ItemUpdated(productId, Counter.Updated(-count))) + case GetCartItems(replyTo) => + val items = state.items.collect { + case (id, counter) if counter.value > 0 => id -> counter.value.toInt + } + replyTo ! CartItems(items) + Effect.none + } + } + + private def eventHandler(state: State, event: Event): State = { + event match { + case ItemUpdated(id, update) => + val newItems = state.items.get(id) match { + case Some(counter) => state.items + (id -> counter.applyOperation(update)) + case None => state.items + (id -> Counter.empty.applyOperation(update)) + } + State(newItems) + } + } + } + //#shopping-cart +} + +class ReplicatedShoppingCartExampleSpec + extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config) + with AnyWordSpecLike + with LogCapturing { + import ReplicatedShoppingCartExampleSpec.ShoppingCart + + "Replicated shopping cart" should { + "work" in { + val cartId = UUID.randomUUID().toString + + val refDcA: ActorRef[ShoppingCart.Command] = + spawn(ShoppingCart(cartId, ReplicaId("DC-A"), Set(ReplicaId("DC-A"), ReplicaId("DC-B")))) + + val refDcB: ActorRef[ShoppingCart.Command] = + spawn(ShoppingCart(cartId, ReplicaId("DC-B"), Set(ReplicaId("DC-A"), ReplicaId("DC-B")))) + + val fidgetSpinnerId = "T2912" + val rubicsCubeId = "T1302" + + refDcA ! ShoppingCart.AddItem(fidgetSpinnerId, 10) + refDcB ! ShoppingCart.AddItem(rubicsCubeId, 10) + refDcA ! ShoppingCart.AddItem(rubicsCubeId, 10) + refDcA ! ShoppingCart.AddItem(fidgetSpinnerId, 10) + refDcB ! ShoppingCart.AddItem(fidgetSpinnerId, 10) + refDcA ! ShoppingCart.RemoveItem(fidgetSpinnerId, 10) + refDcA ! ShoppingCart.AddItem(rubicsCubeId, 10) + refDcB ! ShoppingCart.RemoveItem(rubicsCubeId, 10) + + val replyProbe = createTestProbe[CartItems]() + + eventually { + refDcA ! ShoppingCart.GetCartItems(replyProbe.ref) + replyProbe.expectMessage(CartItems(Map(fidgetSpinnerId -> 20, rubicsCubeId -> 20))) + } + } + } +}