diff --git a/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md b/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md index 34d126a619..2cd709c6d6 100644 --- a/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md +++ b/akka-docs/src/main/paradox/typed/replicated-eventsourcing.md @@ -156,6 +156,21 @@ replica that persisted it. When comparing two @apidoc[LwwTime] the greatest time identifier is used if the two timestamps are equal, and then the one from the `replicaId` sorted first in alphanumeric order wins. +Scala +: @@snip [blog](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala) { #event-handler } + +Java +: @@snip [blog](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java) { #event-handler } + +When creating the `LwwTime` it is good to have a monotonically increasing timestamp, and for that the `increase` +method in `LwwTime` can be used: + +Scala +: @@snip [blog](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala) { #command-handler } + +Java +: @@snip [blog](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java) { #command-handler } + The nature of last writer wins means that if you only have one timestamp for the state the events must represent an update of the full state. Otherwise, there is a risk that the state in different replicas will be different and not eventually converge. @@ -188,10 +203,11 @@ Side effects from the event handler are generally discouraged because the event result in undesired re-execution of the side effects. Uses cases for doing side effects in the event handler: + * Doing a side effect only in a single replica * Doing a side effect once all replicas have seen an event * A side effect for a replicated event -* A side effect when a conflict has occured +* A side effect when a conflict has occurred There is no built in support for knowing an event has been replicated to all replicas but it can be modelled in your state. For some use cases you may need to trigger side effects after consuming replicated events. For example when an auction has been closed in diff --git a/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java new file mode 100644 index 0000000000..53cea01ed4 --- /dev/null +++ b/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java @@ -0,0 +1,292 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package jdocs.akka.persistence.typed; + +import akka.Done; +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.persistence.testkit.query.javadsl.PersistenceTestKitReadJournal; +import akka.persistence.typed.ReplicaId; +import akka.persistence.typed.crdt.LwwTime; +import akka.persistence.typed.javadsl.CommandHandler; +import akka.persistence.typed.javadsl.Effect; +import akka.persistence.typed.javadsl.EventHandler; +import akka.persistence.typed.javadsl.ReplicatedEventSourcedBehavior; +import akka.persistence.typed.javadsl.ReplicatedEventSourcing; +import akka.persistence.typed.javadsl.ReplicationContext; + +import java.util.Optional; +import java.util.Set; + +interface ReplicatedBlogExample { + + public final class BlogEntity + extends ReplicatedEventSourcedBehavior< + BlogEntity.Command, BlogEntity.Event, BlogEntity.BlogState> { + + private final ActorContext context; + + interface Command { + String getPostId(); + } + + static final class AddPost implements Command { + final String postId; + final PostContent content; + final ActorRef replyTo; + + public AddPost(String postId, PostContent content, ActorRef replyTo) { + this.postId = postId; + this.content = content; + this.replyTo = replyTo; + } + + public String getPostId() { + return postId; + } + } + + static final class AddPostDone { + final String postId; + + AddPostDone(String postId) { + this.postId = postId; + } + + public String getPostId() { + return postId; + } + } + + static final class GetPost implements Command { + final String postId; + final ActorRef replyTo; + + public GetPost(String postId, ActorRef replyTo) { + this.postId = postId; + this.replyTo = replyTo; + } + + public String getPostId() { + return postId; + } + } + + static final class ChangeBody implements Command { + final String postId; + final PostContent newContent; + final ActorRef replyTo; + + public ChangeBody(String postId, PostContent newContent, ActorRef replyTo) { + this.postId = postId; + this.newContent = newContent; + this.replyTo = replyTo; + } + + public String getPostId() { + return postId; + } + } + + static final class Publish implements Command { + final String postId; + final ActorRef replyTo; + + public Publish(String postId, ActorRef replyTo) { + this.postId = postId; + this.replyTo = replyTo; + } + + public String getPostId() { + return postId; + } + } + + interface Event {} + + static final class PostAdded implements Event { + final String postId; + final PostContent content; + final LwwTime timestamp; + + public PostAdded(String postId, PostContent content, LwwTime timestamp) { + this.postId = postId; + this.content = content; + this.timestamp = timestamp; + } + } + + static final class BodyChanged implements Event { + final String postId; + final PostContent content; + final LwwTime timestamp; + + public BodyChanged(String postId, PostContent content, LwwTime timestamp) { + this.postId = postId; + this.content = content; + this.timestamp = timestamp; + } + } + + static final class Published implements Event { + final String postId; + + public Published(String postId) { + this.postId = postId; + } + } + + public static final class PostContent { + final String title; + final String body; + + public PostContent(String title, String body) { + this.title = title; + this.body = body; + } + } + + public static class BlogState { + + public static final BlogState EMPTY = + new BlogState(Optional.empty(), new LwwTime(Long.MIN_VALUE, new ReplicaId("")), false); + + final Optional content; + final LwwTime contentTimestamp; + final boolean published; + + public BlogState(Optional content, LwwTime contentTimestamp, boolean published) { + this.content = content; + this.contentTimestamp = contentTimestamp; + this.published = published; + } + + BlogState withContent(PostContent newContent, LwwTime timestamp) { + return new BlogState(Optional.of(newContent), timestamp, this.published); + } + + BlogState publish() { + if (published) { + return this; + } else { + return new BlogState(content, contentTimestamp, true); + } + } + + boolean isEmpty() { + return !content.isPresent(); + } + } + + public static Behavior create( + String entityId, ReplicaId replicaId, Set allReplicas) { + return Behaviors.setup( + context -> + ReplicatedEventSourcing.withSharedJournal( + "StringSet", + entityId, + replicaId, + allReplicas, + PersistenceTestKitReadJournal.Identifier(), + replicationContext -> new BlogEntity(context, replicationContext))); + } + + private BlogEntity(ActorContext context, ReplicationContext replicationContext) { + super(replicationContext); + this.context = context; + } + + @Override + public BlogState emptyState() { + return BlogState.EMPTY; + } + + // #command-handler + @Override + public CommandHandler commandHandler() { + return newCommandHandlerBuilder() + .forAnyState() + .onCommand(AddPost.class, this::onAddPost) + .onCommand(ChangeBody.class, this::onChangeBody) + .onCommand(Publish.class, this::onPublish) + .onCommand(GetPost.class, this::onGetPost) + .build(); + } + + private Effect onAddPost(BlogState state, AddPost command) { + PostAdded evt = + new PostAdded( + getReplicationContext().entityId(), + command.content, + state.contentTimestamp.increase( + getReplicationContext().currentTimeMillis(), + getReplicationContext().replicaId())); + return Effect() + .persist(evt) + .thenRun(() -> command.replyTo.tell(new AddPostDone(getReplicationContext().entityId()))); + } + + private Effect onChangeBody(BlogState state, ChangeBody command) { + BodyChanged evt = + new BodyChanged( + getReplicationContext().entityId(), + command.newContent, + state.contentTimestamp.increase( + getReplicationContext().currentTimeMillis(), + getReplicationContext().replicaId())); + return Effect().persist(evt).thenRun(() -> command.replyTo.tell(Done.getInstance())); + } + + private Effect onPublish(BlogState state, Publish command) { + Published evt = new Published(getReplicationContext().entityId()); + return Effect().persist(evt).thenRun(() -> command.replyTo.tell(Done.getInstance())); + } + + private Effect onGetPost(BlogState state, GetPost command) { + context.getLog().info("GetPost {}", state.content); + if (state.content.isPresent()) command.replyTo.tell(state.content.get()); + return Effect().none(); + } + // #command-handler + + // #event-handler + @Override + public EventHandler eventHandler() { + return newEventHandlerBuilder() + .forAnyState() + .onEvent(PostAdded.class, this::onPostAdded) + .onEvent(BodyChanged.class, this::onBodyChanged) + .onEvent(Published.class, this::onPublished) + .build(); + } + + private BlogState onPostAdded(BlogState state, PostAdded event) { + if (event.timestamp.isAfter(state.contentTimestamp)) { + BlogState s = state.withContent(event.content, event.timestamp); + context.getLog().info("Updating content. New content is {}", s); + return s; + } else { + context.getLog().info("Ignoring event as timestamp is older"); + return state; + } + } + + private BlogState onBodyChanged(BlogState state, BodyChanged event) { + if (event.timestamp.isAfter(state.contentTimestamp)) { + return state.withContent(event.content, event.timestamp); + } else { + return state; + } + } + + private BlogState onPublished(BlogState state, Published event) { + return state.publish(); + } + // #event-handler + + } +} diff --git a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala index e9a86ab8bf..afde7d0de5 100644 --- a/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala +++ b/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala @@ -4,44 +4,137 @@ package docs.akka.persistence.typed +import org.scalatest.concurrent.Eventually +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.matchers.should.Matchers +import org.scalatest.time.Millis +import org.scalatest.time.Span +import org.scalatest.wordspec.AnyWordSpecLike + import akka.Done -import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit } +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.typed.ActorRef -import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.ActorContext +import akka.actor.typed.scaladsl.Behaviors import akka.persistence.testkit.PersistenceTestKitPlugin import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal import akka.persistence.typed.ReplicaId import akka.persistence.typed.crdt.LwwTime import akka.persistence.typed.scaladsl._ import akka.serialization.jackson.CborSerializable -import org.scalatest.concurrent.{ Eventually, ScalaFutures } -import org.scalatest.matchers.should.Matchers -import org.scalatest.time.{ Millis, Span } -import org.scalatest.wordspec.AnyWordSpecLike object ReplicatedBlogExampleSpec { - final case class BlogState(content: Option[PostContent], contentTimestamp: LwwTime, published: Boolean) { - def withContent(newContent: PostContent, timestamp: LwwTime): BlogState = - copy(content = Some(newContent), contentTimestamp = timestamp) - def isEmpty: Boolean = content.isEmpty + object BlogEntity { + + object BlogState { + val empty: BlogState = BlogState(None, LwwTime(Long.MinValue, ReplicaId("")), published = false) + } + final case class BlogState(content: Option[PostContent], contentTimestamp: LwwTime, published: Boolean) + extends CborSerializable { + def withContent(newContent: PostContent, timestamp: LwwTime): BlogState = + copy(content = Some(newContent), contentTimestamp = timestamp) + + def isEmpty: Boolean = content.isEmpty + } + + final case class PostContent(title: String, body: String) extends CborSerializable + final case class Published(postId: String) extends Event + + sealed trait Command extends CborSerializable + final case class AddPost(postId: String, content: PostContent, replyTo: ActorRef[AddPostDone]) extends Command + final case class AddPostDone(postId: String) + final case class GetPost(postId: String, replyTo: ActorRef[PostContent]) extends Command + final case class ChangeBody(postId: String, newContent: PostContent, replyTo: ActorRef[Done]) extends Command + final case class Publish(postId: String, replyTo: ActorRef[Done]) extends Command + + sealed trait Event extends CborSerializable + final case class PostAdded(postId: String, content: PostContent, timestamp: LwwTime) extends Event + final case class BodyChanged(postId: String, newContent: PostContent, timestamp: LwwTime) extends Event + + def apply(entityId: String, replicaId: ReplicaId, allReplicaIds: Set[ReplicaId]): Behavior[Command] = { + Behaviors.setup[Command] { ctx => + ReplicatedEventSourcing.withSharedJournal( + "blog", + entityId, + replicaId, + allReplicaIds, + PersistenceTestKitReadJournal.Identifier) { replicationContext => + EventSourcedBehavior[Command, Event, BlogState]( + replicationContext.persistenceId, + BlogState.empty, + (state, cmd) => commandHandler(ctx, replicationContext, state, cmd), + (state, event) => eventHandler(ctx, replicationContext, state, event)) + } + } + } + + //#command-handler + private def commandHandler( + ctx: ActorContext[Command], + replicationContext: ReplicationContext, + state: BlogState, + cmd: Command): Effect[Event, BlogState] = { + cmd match { + case AddPost(_, content, replyTo) => + val evt = + PostAdded( + replicationContext.entityId, + content, + state.contentTimestamp.increase(replicationContext.currentTimeMillis(), replicationContext.replicaId)) + Effect.persist(evt).thenRun { _ => + replyTo ! AddPostDone(replicationContext.entityId) + } + case ChangeBody(_, newContent, replyTo) => + val evt = + BodyChanged( + replicationContext.entityId, + newContent, + state.contentTimestamp.increase(replicationContext.currentTimeMillis(), replicationContext.replicaId)) + Effect.persist(evt).thenRun { _ => + replyTo ! Done + } + case p: Publish => + Effect.persist(Published("id")).thenRun { _ => + p.replyTo ! Done + } + case gp: GetPost => + ctx.log.info("GetPost {}", state.content) + state.content.foreach(content => gp.replyTo ! content) + Effect.none + } + } + //#command-handler + + //#event-handler + private def eventHandler( + ctx: ActorContext[Command], + replicationContext: ReplicationContext, + state: BlogState, + event: Event): BlogState = { + ctx.log.info(s"${replicationContext.entityId}:${replicationContext.replicaId} Received event $event") + event match { + case PostAdded(_, content, timestamp) => + if (timestamp.isAfter(state.contentTimestamp)) { + val s = state.withContent(content, timestamp) + ctx.log.info("Updating content. New content is {}", s) + s + } else { + ctx.log.info("Ignoring event as timestamp is older") + state + } + case BodyChanged(_, newContent, timestamp) => + if (timestamp.isAfter(state.contentTimestamp)) + state.withContent(newContent, timestamp) + else state + case Published(_) => + state.copy(published = true) + } + } + //#event-handler } - val emptyState: BlogState = BlogState(None, LwwTime(Long.MinValue, ReplicaId("")), published = false) - - final case class PostContent(title: String, body: String) - final case class PostSummary(postId: String, title: String) - final case class Published(postId: String) extends BlogEvent - - sealed trait BlogCommand - final case class AddPost(postId: String, content: PostContent, replyTo: ActorRef[AddPostDone]) extends BlogCommand - final case class AddPostDone(postId: String) - final case class GetPost(postId: String, replyTo: ActorRef[PostContent]) extends BlogCommand - final case class ChangeBody(postId: String, newContent: PostContent, replyTo: ActorRef[Done]) extends BlogCommand - final case class Publish(postId: String, replyTo: ActorRef[Done]) extends BlogCommand - - sealed trait BlogEvent extends CborSerializable - final case class PostAdded(postId: String, content: PostContent, timestamp: LwwTime) extends BlogEvent - final case class BodyChanged(postId: String, newContent: PostContent, timestamp: LwwTime) extends BlogEvent } class ReplicatedBlogExampleSpec @@ -51,98 +144,23 @@ class ReplicatedBlogExampleSpec with LogCapturing with ScalaFutures with Eventually { - import ReplicatedBlogExampleSpec._ + import ReplicatedBlogExampleSpec.BlogEntity + import ReplicatedBlogExampleSpec.BlogEntity._ implicit val config: PatienceConfig = PatienceConfig(timeout = Span(timeout.duration.toMillis, Millis)) - def behavior(replicationContext: ReplicationContext, ctx: ActorContext[BlogCommand]) = - EventSourcedBehavior[BlogCommand, BlogEvent, BlogState]( - replicationContext.persistenceId, - emptyState, - (state, cmd) => - cmd match { - case AddPost(_, content, replyTo) => - val evt = - PostAdded( - replicationContext.persistenceId.id, - content, - state.contentTimestamp.increase(replicationContext.currentTimeMillis(), replicationContext.replicaId)) - Effect.persist(evt).thenRun { _ => - replyTo ! AddPostDone(replicationContext.entityId) - } - case ChangeBody(_, newContent, replyTo) => - val evt = - BodyChanged( - replicationContext.persistenceId.id, - newContent, - state.contentTimestamp.increase(replicationContext.currentTimeMillis(), replicationContext.replicaId)) - Effect.persist(evt).thenRun { _ => - replyTo ! Done - } - case p: Publish => - Effect.persist(Published("id")).thenRun { _ => - p.replyTo ! Done - } - case gp: GetPost => - ctx.log.info("GetPost {}", state.content) - state.content.foreach(content => gp.replyTo ! content) - Effect.none - }, - (state, event) => { - ctx.log.info(s"${replicationContext.entityId}:${replicationContext.replicaId} Received event $event") - event match { - case PostAdded(_, content, timestamp) => - if (timestamp.isAfter(state.contentTimestamp)) { - val s = state.withContent(content, timestamp) - ctx.log.info("Updating content. New content is {}", s) - s - } else { - ctx.log.info("Ignoring event as timestamp is older") - state - } - case BodyChanged(_, newContent, timestamp) => - if (timestamp.isAfter(state.contentTimestamp)) - state.withContent(newContent, timestamp) - else state - case Published(_) => - state.copy(published = true) - } - }) - "Blog Example" should { "work" in { - val refDcA: ActorRef[BlogCommand] = - spawn( - Behaviors.setup[BlogCommand] { ctx => - ReplicatedEventSourcing.withSharedJournal( - "blog", - "cat", - ReplicaId("DC-A"), - Set(ReplicaId("DC-A"), ReplicaId("DC-B")), - PersistenceTestKitReadJournal.Identifier) { replicationContext => - behavior(replicationContext, ctx) - } - }, - "dc-a") + val refDcA: ActorRef[Command] = + spawn(BlogEntity("cat", ReplicaId("DC-A"), Set(ReplicaId("DC-A"), ReplicaId("DC-B")))) - val refDcB: ActorRef[BlogCommand] = - spawn( - Behaviors.setup[BlogCommand] { ctx => - ReplicatedEventSourcing.withSharedJournal( - "blog", - "cat", - ReplicaId("DC-B"), - Set(ReplicaId("DC-A"), ReplicaId("DC-B")), - PersistenceTestKitReadJournal.Identifier) { replicationContext => - behavior(replicationContext, ctx) - } - }, - "dc-b") + val refDcB: ActorRef[Command] = + spawn(BlogEntity("cat", ReplicaId("DC-B"), Set(ReplicaId("DC-A"), ReplicaId("DC-B")))) + + import scala.concurrent.duration._ import akka.actor.typed.scaladsl.AskPattern._ import akka.util.Timeout - - import scala.concurrent.duration._ implicit val timeout: Timeout = 3.seconds val content = PostContent("cats are the bets", "yep")