ORSet example, MovieWatchList

This commit is contained in:
Patrik Nordwall 2020-08-11 13:39:25 +02:00 committed by Christopher Batey
parent d078a6b65f
commit 2e0821c2f0
6 changed files with 244 additions and 17 deletions

View file

@ -134,13 +134,41 @@ The factory returns a `Behavior` that can be spawned like any other behavior.
### Conflict free replicated data types
The following CRDTs are included that can be used to build your own data model:
Writing code to resolve conflicts can be complicated to get right.
One well-understood technique to create eventually-consistent systems is to
model your state as a Conflict Free Replicated Data Type, a CRDT. There are two types of CRDTs;
operation-based and state-based. For Replicated Event Sourcing the operation-based is a good fit,
since the events represent the operations. Note that this is distinct from the CRDT's implemented
in @ref:[Akka Distributed Data](distributed-data.md), which are state-based rather than operation-based.
The rule for operation-based CRDT's is that the operations must be commutative — in other words, applying the same events
(which represent the operations) in any order should always produce the same final state. You may assume each event
is applied only once, with @ref:[causal delivery order](#causal-delivery-order).
The following CRDTs are included that can you can use as the state or part of the state in the entity:
* @apidoc[LwwTime]
* @apidoc[Counter]
* @apidoc[akka.persistence.typed.crdt.ORSet]
Akka serializers are included for all these types and can be used to serialize when @ref[embedded in Jackson](../serialization-jackson.md#using-akka-serialization-for-embedded-types).
Akka serializers are included for all these types and can be used to serialize when
@ref[embedded in Jackson](../serialization-jackson.md#using-akka-serialization-for-embedded-types).
An example would be a movies watch list that is represented by the general purpose
@apidoc[akka.persistence.typed.crdt.ORSet] CRDT. `ORSet` is short for Observed Remove Set. Elements can be added and
removed any number of times. Concurrent add wins over remove. It is an operation based CRDT where the delta of an
operation (add/remove) can be represented as an event.
Such movies watch list example:
Scala
: @@snip [movie](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedMovieWatchListExampleSpec.scala) { #movie-entity }
Java
: @@snip [movie](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedMovieExample.java) { #movie-entity }
The @ref[Auction example](replicated-eventsourcing-auction.md) is a more comprehensive example that illustrates how application-specific
rules can be used to implement an entity with CRDT semantics.
### Last writer wins

View file

@ -26,7 +26,7 @@ interface ReplicatedBlogExample {
public final class BlogEntity
extends ReplicatedEventSourcedBehavior<
BlogEntity.Command, BlogEntity.Event, BlogEntity.BlogState> {
BlogEntity.Command, BlogEntity.Event, BlogEntity.BlogState> {
private final ActorContext<Command> context;

View file

@ -0,0 +1,105 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.persistence.typed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.persistence.testkit.query.javadsl.PersistenceTestKitReadJournal;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.crdt.ORSet;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.ReplicatedEventSourcedBehavior;
import akka.persistence.typed.javadsl.ReplicatedEventSourcing;
import akka.persistence.typed.javadsl.ReplicationContext;
import java.util.Collections;
import java.util.Set;
interface ReplicatedMovieExample {
// #movie-entity
public final class MovieWatchList
extends ReplicatedEventSourcedBehavior<MovieWatchList.Command, ORSet.DeltaOp, ORSet<String>> {
interface Command {}
public static class AddMovie implements Command {
public final String movieId;
public AddMovie(String movieId) {
this.movieId = movieId;
}
}
public static class RemoveMovie implements Command {
public final String movieId;
public RemoveMovie(String movieId) {
this.movieId = movieId;
}
}
public static class GetMovieList implements Command {
public final ActorRef<MovieList> replyTo;
public GetMovieList(ActorRef<MovieList> replyTo) {
this.replyTo = replyTo;
}
}
public static class MovieList {
public final Set<String> movieIds;
public MovieList(Set<String> movieIds) {
this.movieIds = Collections.unmodifiableSet(movieIds);
}
}
public static Behavior<Command> create(
String entityId, ReplicaId replicaId, Set<ReplicaId> allReplicas) {
return ReplicatedEventSourcing.withSharedJournal(
"movies",
entityId,
replicaId,
allReplicas,
PersistenceTestKitReadJournal.Identifier(),
MovieWatchList::new);
}
private MovieWatchList(ReplicationContext replicationContext) {
super(replicationContext);
}
@Override
public ORSet<String> emptyState() {
return ORSet.empty(getReplicationContext().replicaId());
}
@Override
public CommandHandler<Command, ORSet.DeltaOp, ORSet<String>> commandHandler() {
return newCommandHandlerBuilder()
.forAnyState()
.onCommand(
AddMovie.class, (state, command) -> Effect().persist(state.add(command.movieId)))
.onCommand(
RemoveMovie.class,
(state, command) -> Effect().persist(state.remove(command.movieId)))
.onCommand(
GetMovieList.class,
(state, command) -> {
command.replyTo.tell(new MovieList(state.getElements()));
return Effect().none();
})
.build();
}
@Override
public EventHandler<ORSet<String>, ORSet.DeltaOp> eventHandler() {
return newEventHandlerBuilder().forAnyState().onAnyEvent(ORSet::applyOperation);
}
}
// #movie-entity
}

View file

@ -9,9 +9,6 @@ import java.time.Instant
import scala.concurrent.duration._
import docs.akka.persistence.typed.ReplicatedAuctionExampleSpec.AuctionEntity
import org.scalatest.concurrent.Eventually
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl.LogCapturing
@ -295,10 +292,7 @@ object ReplicatedAuctionExampleSpec {
class ReplicatedAuctionExampleSpec
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config)
with AnyWordSpecLike
with Matchers
with LogCapturing
with ScalaFutures
with Eventually {
with LogCapturing {
import ReplicatedAuctionExampleSpec.AuctionEntity._
"Auction example" should {

View file

@ -4,9 +4,6 @@
package docs.akka.persistence.typed
import org.scalatest.concurrent.Eventually
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.Millis
import org.scalatest.time.Span
import org.scalatest.wordspec.AnyWordSpecLike
@ -140,10 +137,7 @@ object ReplicatedBlogExampleSpec {
class ReplicatedBlogExampleSpec
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config)
with AnyWordSpecLike
with Matchers
with LogCapturing
with ScalaFutures
with Eventually {
with LogCapturing {
import ReplicatedBlogExampleSpec.BlogEntity
import ReplicatedBlogExampleSpec.BlogEntity._

View file

@ -0,0 +1,106 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.persistence.typed
import org.scalatest.wordspec.AnyWordSpecLike
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.crdt.ORSet
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.scaladsl.ReplicatedEventSourcing
object ReplicatedMovieWatchListExampleSpec {
//#movie-entity
object MovieWatchList {
sealed trait Command
final case class AddMovie(movieId: String) extends Command
final case class RemoveMovie(movieId: String) extends Command
final case class GetMovieList(replyTo: ActorRef[MovieList]) extends Command
final case class MovieList(movieIds: Set[String])
def apply(entityId: String, replicaId: ReplicaId, allReplicaIds: Set[ReplicaId]): Behavior[Command] = {
ReplicatedEventSourcing.withSharedJournal(
"movies",
entityId,
replicaId,
allReplicaIds,
PersistenceTestKitReadJournal.Identifier) { replicationContext =>
EventSourcedBehavior[Command, ORSet.DeltaOp, ORSet[String]](
replicationContext.persistenceId,
ORSet.empty(replicationContext.replicaId),
(state, cmd) => commandHandler(state, cmd),
(state, event) => eventHandler(state, event))
}
}
private def commandHandler(state: ORSet[String], cmd: Command): Effect[ORSet.DeltaOp, ORSet[String]] = {
cmd match {
case AddMovie(movieId) =>
Effect.persist(state + movieId)
case RemoveMovie(movieId) =>
Effect.persist(state - movieId)
case GetMovieList(replyTo) =>
replyTo ! MovieList(state.elements)
Effect.none
}
}
private def eventHandler(state: ORSet[String], event: ORSet.DeltaOp): ORSet[String] = {
state.applyOperation(event)
}
}
//#movie-entity
}
class ReplicatedMovieWatchListExampleSpec
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config)
with AnyWordSpecLike
with LogCapturing {
import ReplicatedMovieWatchListExampleSpec._
"MovieWatchList" must {
"demonstrate ORSet" in {
import MovieWatchList._
val Replicas = Set(ReplicaId("DC-A"), ReplicaId("DC-B"))
val dcAReplica: ActorRef[Command] = spawn(MovieWatchList("mylist", ReplicaId("DC-A"), Replicas))
val dcBReplica: ActorRef[Command] = spawn(MovieWatchList("mylist", ReplicaId("DC-B"), Replicas))
val probeA = createTestProbe[MovieList]()
val probeB = createTestProbe[MovieList]()
dcAReplica ! AddMovie("movie-15")
dcAReplica ! AddMovie("movie-17")
dcBReplica ! AddMovie("movie-20")
eventually {
dcAReplica ! GetMovieList(probeA.ref)
probeA.expectMessage(MovieList(Set("movie-15", "movie-17", "movie-20")))
dcBReplica ! GetMovieList(probeB.ref)
probeB.expectMessage(MovieList(Set("movie-15", "movie-17", "movie-20")))
}
dcBReplica ! RemoveMovie("movie-17")
eventually {
dcAReplica ! GetMovieList(probeA.ref)
probeA.expectMessage(MovieList(Set("movie-15", "movie-20")))
dcBReplica ! GetMovieList(probeB.ref)
probeB.expectMessage(MovieList(Set("movie-15", "movie-20")))
}
}
}
}