diff --git a/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md b/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md index 2cd709c6d6..ae05d76266 100644 --- a/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md +++ b/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md @@ -134,13 +134,41 @@ The factory returns a `Behavior` that can be spawned like any other behavior. ### Conflict free replicated data types -The following CRDTs are included that can be used to build your own data model: +Writing code to resolve conflicts can be complicated to get right. +One well-understood technique to create eventually-consistent systems is to +model your state as a Conflict Free Replicated Data Type, a CRDT. There are two types of CRDTs; +operation-based and state-based. For Replicated Event Sourcing the operation-based is a good fit, +since the events represent the operations. Note that this is distinct from the CRDT's implemented +in @ref:[Akka Distributed Data](distributed-data.md), which are state-based rather than operation-based. + +The rule for operation-based CRDT's is that the operations must be commutative — in other words, applying the same events +(which represent the operations) in any order should always produce the same final state. You may assume each event +is applied only once, with @ref:[causal delivery order](#causal-delivery-order). + +The following CRDTs are included that can you can use as the state or part of the state in the entity: * @apidoc[LwwTime] * @apidoc[Counter] * @apidoc[akka.persistence.typed.crdt.ORSet] -Akka serializers are included for all these types and can be used to serialize when @ref[embedded in Jackson](../serialization-jackson.md#using-akka-serialization-for-embedded-types). +Akka serializers are included for all these types and can be used to serialize when +@ref[embedded in Jackson](../serialization-jackson.md#using-akka-serialization-for-embedded-types). + +An example would be a movies watch list that is represented by the general purpose +@apidoc[akka.persistence.typed.crdt.ORSet] CRDT. `ORSet` is short for Observed Remove Set. Elements can be added and +removed any number of times. Concurrent add wins over remove. It is an operation based CRDT where the delta of an +operation (add/remove) can be represented as an event. + +Such movies watch list example: + +Scala +: @@snip [movie](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedMovieWatchListExampleSpec.scala) { #movie-entity } + +Java +: @@snip [movie](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedMovieExample.java) { #movie-entity } + +The @ref[Auction example](replicated-eventsourcing-auction.md) is a more comprehensive example that illustrates how application-specific +rules can be used to implement an entity with CRDT semantics. ### Last writer wins diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java index 53cea01ed4..4af9d46f74 100644 --- a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java @@ -26,7 +26,7 @@ interface ReplicatedBlogExample { public final class BlogEntity extends ReplicatedEventSourcedBehavior< - BlogEntity.Command, BlogEntity.Event, BlogEntity.BlogState> { + BlogEntity.Command, BlogEntity.Event, BlogEntity.BlogState> { private final ActorContext context; diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedMovieExample.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedMovieExample.java new file mode 100644 index 0000000000..16ae730de1 --- /dev/null +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedMovieExample.java @@ -0,0 +1,105 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package jdocs.akka.persistence.typed; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.persistence.testkit.query.javadsl.PersistenceTestKitReadJournal; +import akka.persistence.typed.ReplicaId; +import akka.persistence.typed.crdt.ORSet; +import akka.persistence.typed.javadsl.CommandHandler; +import akka.persistence.typed.javadsl.EventHandler; +import akka.persistence.typed.javadsl.ReplicatedEventSourcedBehavior; +import akka.persistence.typed.javadsl.ReplicatedEventSourcing; +import akka.persistence.typed.javadsl.ReplicationContext; + +import java.util.Collections; +import java.util.Set; + +interface ReplicatedMovieExample { + + // #movie-entity + public final class MovieWatchList + extends ReplicatedEventSourcedBehavior> { + + interface Command {} + + public static class AddMovie implements Command { + public final String movieId; + + public AddMovie(String movieId) { + this.movieId = movieId; + } + } + + public static class RemoveMovie implements Command { + public final String movieId; + + public RemoveMovie(String movieId) { + this.movieId = movieId; + } + } + + public static class GetMovieList implements Command { + public final ActorRef replyTo; + + public GetMovieList(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + + public static class MovieList { + public final Set movieIds; + + public MovieList(Set movieIds) { + this.movieIds = Collections.unmodifiableSet(movieIds); + } + } + + public static Behavior create( + String entityId, ReplicaId replicaId, Set allReplicas) { + return ReplicatedEventSourcing.withSharedJournal( + "movies", + entityId, + replicaId, + allReplicas, + PersistenceTestKitReadJournal.Identifier(), + MovieWatchList::new); + } + + private MovieWatchList(ReplicationContext replicationContext) { + super(replicationContext); + } + + @Override + public ORSet emptyState() { + return ORSet.empty(getReplicationContext().replicaId()); + } + + @Override + public CommandHandler> commandHandler() { + return newCommandHandlerBuilder() + .forAnyState() + .onCommand( + AddMovie.class, (state, command) -> Effect().persist(state.add(command.movieId))) + .onCommand( + RemoveMovie.class, + (state, command) -> Effect().persist(state.remove(command.movieId))) + .onCommand( + GetMovieList.class, + (state, command) -> { + command.replyTo.tell(new MovieList(state.getElements())); + return Effect().none(); + }) + .build(); + } + + @Override + public EventHandler, ORSet.DeltaOp> eventHandler() { + return newEventHandlerBuilder().forAnyState().onAnyEvent(ORSet::applyOperation); + } + } + // #movie-entity +} diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala index 724ad79af9..2d59b5ce97 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedAuctionExampleSpec.scala @@ -9,9 +9,6 @@ import java.time.Instant import scala.concurrent.duration._ import docs.akka.persistence.typed.ReplicatedAuctionExampleSpec.AuctionEntity -import org.scalatest.concurrent.Eventually -import org.scalatest.concurrent.ScalaFutures -import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike import akka.actor.testkit.typed.scaladsl.LogCapturing @@ -295,10 +292,7 @@ object ReplicatedAuctionExampleSpec { class ReplicatedAuctionExampleSpec extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config) with AnyWordSpecLike - with Matchers - with LogCapturing - with ScalaFutures - with Eventually { + with LogCapturing { import ReplicatedAuctionExampleSpec.AuctionEntity._ "Auction example" should { diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala index afde7d0de5..79d6543a85 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala @@ -4,9 +4,6 @@ package docs.akka.persistence.typed -import org.scalatest.concurrent.Eventually -import org.scalatest.concurrent.ScalaFutures -import org.scalatest.matchers.should.Matchers import org.scalatest.time.Millis import org.scalatest.time.Span import org.scalatest.wordspec.AnyWordSpecLike @@ -140,10 +137,7 @@ object ReplicatedBlogExampleSpec { class ReplicatedBlogExampleSpec extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config) with AnyWordSpecLike - with Matchers - with LogCapturing - with ScalaFutures - with Eventually { + with LogCapturing { import ReplicatedBlogExampleSpec.BlogEntity import ReplicatedBlogExampleSpec.BlogEntity._ diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedMovieWatchListExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedMovieWatchListExampleSpec.scala new file mode 100644 index 0000000000..ff9c14e8dd --- /dev/null +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedMovieWatchListExampleSpec.scala @@ -0,0 +1,106 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package docs.akka.persistence.typed + +import org.scalatest.wordspec.AnyWordSpecLike + +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.persistence.testkit.PersistenceTestKitPlugin +import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal +import akka.persistence.typed.ReplicaId +import akka.persistence.typed.crdt.ORSet +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import akka.persistence.typed.scaladsl.ReplicatedEventSourcing + +object ReplicatedMovieWatchListExampleSpec { + //#movie-entity + object MovieWatchList { + sealed trait Command + final case class AddMovie(movieId: String) extends Command + final case class RemoveMovie(movieId: String) extends Command + final case class GetMovieList(replyTo: ActorRef[MovieList]) extends Command + final case class MovieList(movieIds: Set[String]) + + def apply(entityId: String, replicaId: ReplicaId, allReplicaIds: Set[ReplicaId]): Behavior[Command] = { + ReplicatedEventSourcing.withSharedJournal( + "movies", + entityId, + replicaId, + allReplicaIds, + PersistenceTestKitReadJournal.Identifier) { replicationContext => + EventSourcedBehavior[Command, ORSet.DeltaOp, ORSet[String]]( + replicationContext.persistenceId, + ORSet.empty(replicationContext.replicaId), + (state, cmd) => commandHandler(state, cmd), + (state, event) => eventHandler(state, event)) + } + } + + private def commandHandler(state: ORSet[String], cmd: Command): Effect[ORSet.DeltaOp, ORSet[String]] = { + cmd match { + case AddMovie(movieId) => + Effect.persist(state + movieId) + case RemoveMovie(movieId) => + Effect.persist(state - movieId) + case GetMovieList(replyTo) => + replyTo ! MovieList(state.elements) + Effect.none + } + } + + private def eventHandler(state: ORSet[String], event: ORSet.DeltaOp): ORSet[String] = { + state.applyOperation(event) + } + + } + //#movie-entity + +} + +class ReplicatedMovieWatchListExampleSpec + extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config) + with AnyWordSpecLike + with LogCapturing { + import ReplicatedMovieWatchListExampleSpec._ + + "MovieWatchList" must { + "demonstrate ORSet" in { + import MovieWatchList._ + + val Replicas = Set(ReplicaId("DC-A"), ReplicaId("DC-B")) + + val dcAReplica: ActorRef[Command] = spawn(MovieWatchList("mylist", ReplicaId("DC-A"), Replicas)) + val dcBReplica: ActorRef[Command] = spawn(MovieWatchList("mylist", ReplicaId("DC-B"), Replicas)) + + val probeA = createTestProbe[MovieList]() + val probeB = createTestProbe[MovieList]() + + dcAReplica ! AddMovie("movie-15") + dcAReplica ! AddMovie("movie-17") + dcBReplica ! AddMovie("movie-20") + + eventually { + dcAReplica ! GetMovieList(probeA.ref) + probeA.expectMessage(MovieList(Set("movie-15", "movie-17", "movie-20"))) + dcBReplica ! GetMovieList(probeB.ref) + probeB.expectMessage(MovieList(Set("movie-15", "movie-17", "movie-20"))) + } + + dcBReplica ! RemoveMovie("movie-17") + eventually { + dcAReplica ! GetMovieList(probeA.ref) + probeA.expectMessage(MovieList(Set("movie-15", "movie-20"))) + dcBReplica ! GetMovieList(probeB.ref) + probeB.expectMessage(MovieList(Set("movie-15", "movie-20"))) + } + } + + } + +}