LwwTime example

* show event and command handlers of the blog example in docs
* add blog example for Java
This commit is contained in:
Patrik Nordwall 2020-08-11 12:50:09 +02:00 committed by Christopher Batey
parent ac469e1a56
commit d078a6b65f
3 changed files with 436 additions and 110 deletions

View file

@ -156,6 +156,21 @@ replica that persisted it. When comparing two @apidoc[LwwTime] the greatest time
identifier is used if the two timestamps are equal, and then the one from the `replicaId` sorted first in
alphanumeric order wins.
Scala
: @@snip [blog](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala) { #event-handler }
Java
: @@snip [blog](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java) { #event-handler }
When creating the `LwwTime` it is good to have a monotonically increasing timestamp, and for that the `increase`
method in `LwwTime` can be used:
Scala
: @@snip [blog](/akka-persistence-typed-tests/src/test/scala/docs/akka/persistence/typed/ReplicatedBlogExampleSpec.scala) { #command-handler }
Java
: @@snip [blog](/akka-persistence-typed-tests/src/test/java/jdocs/akka/persistence/typed/ReplicatedBlogExample.java) { #command-handler }
The nature of last writer wins means that if you only have one timestamp for the state the events must represent an
update of the full state. Otherwise, there is a risk that the state in different replicas will be different and
not eventually converge.
@ -188,10 +203,11 @@ Side effects from the event handler are generally discouraged because the event
result in undesired re-execution of the side effects.
Uses cases for doing side effects in the event handler:
* Doing a side effect only in a single replica
* Doing a side effect once all replicas have seen an event
* A side effect for a replicated event
* A side effect when a conflict has occured
* A side effect when a conflict has occurred
There is no built in support for knowing an event has been replicated to all replicas but it can be modelled in your state.
For some use cases you may need to trigger side effects after consuming replicated events. For example when an auction has been closed in

View file

@ -0,0 +1,292 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.persistence.typed;
import akka.Done;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.persistence.testkit.query.javadsl.PersistenceTestKitReadJournal;
import akka.persistence.typed.ReplicaId;
import akka.persistence.typed.crdt.LwwTime;
import akka.persistence.typed.javadsl.CommandHandler;
import akka.persistence.typed.javadsl.Effect;
import akka.persistence.typed.javadsl.EventHandler;
import akka.persistence.typed.javadsl.ReplicatedEventSourcedBehavior;
import akka.persistence.typed.javadsl.ReplicatedEventSourcing;
import akka.persistence.typed.javadsl.ReplicationContext;
import java.util.Optional;
import java.util.Set;
interface ReplicatedBlogExample {
public final class BlogEntity
extends ReplicatedEventSourcedBehavior<
BlogEntity.Command, BlogEntity.Event, BlogEntity.BlogState> {
private final ActorContext<Command> context;
interface Command {
String getPostId();
}
static final class AddPost implements Command {
final String postId;
final PostContent content;
final ActorRef<AddPostDone> replyTo;
public AddPost(String postId, PostContent content, ActorRef<AddPostDone> replyTo) {
this.postId = postId;
this.content = content;
this.replyTo = replyTo;
}
public String getPostId() {
return postId;
}
}
static final class AddPostDone {
final String postId;
AddPostDone(String postId) {
this.postId = postId;
}
public String getPostId() {
return postId;
}
}
static final class GetPost implements Command {
final String postId;
final ActorRef<PostContent> replyTo;
public GetPost(String postId, ActorRef<PostContent> replyTo) {
this.postId = postId;
this.replyTo = replyTo;
}
public String getPostId() {
return postId;
}
}
static final class ChangeBody implements Command {
final String postId;
final PostContent newContent;
final ActorRef<Done> replyTo;
public ChangeBody(String postId, PostContent newContent, ActorRef<Done> replyTo) {
this.postId = postId;
this.newContent = newContent;
this.replyTo = replyTo;
}
public String getPostId() {
return postId;
}
}
static final class Publish implements Command {
final String postId;
final ActorRef<Done> replyTo;
public Publish(String postId, ActorRef<Done> replyTo) {
this.postId = postId;
this.replyTo = replyTo;
}
public String getPostId() {
return postId;
}
}
interface Event {}
static final class PostAdded implements Event {
final String postId;
final PostContent content;
final LwwTime timestamp;
public PostAdded(String postId, PostContent content, LwwTime timestamp) {
this.postId = postId;
this.content = content;
this.timestamp = timestamp;
}
}
static final class BodyChanged implements Event {
final String postId;
final PostContent content;
final LwwTime timestamp;
public BodyChanged(String postId, PostContent content, LwwTime timestamp) {
this.postId = postId;
this.content = content;
this.timestamp = timestamp;
}
}
static final class Published implements Event {
final String postId;
public Published(String postId) {
this.postId = postId;
}
}
public static final class PostContent {
final String title;
final String body;
public PostContent(String title, String body) {
this.title = title;
this.body = body;
}
}
public static class BlogState {
public static final BlogState EMPTY =
new BlogState(Optional.empty(), new LwwTime(Long.MIN_VALUE, new ReplicaId("")), false);
final Optional<PostContent> content;
final LwwTime contentTimestamp;
final boolean published;
public BlogState(Optional<PostContent> content, LwwTime contentTimestamp, boolean published) {
this.content = content;
this.contentTimestamp = contentTimestamp;
this.published = published;
}
BlogState withContent(PostContent newContent, LwwTime timestamp) {
return new BlogState(Optional.of(newContent), timestamp, this.published);
}
BlogState publish() {
if (published) {
return this;
} else {
return new BlogState(content, contentTimestamp, true);
}
}
boolean isEmpty() {
return !content.isPresent();
}
}
public static Behavior<Command> create(
String entityId, ReplicaId replicaId, Set<ReplicaId> allReplicas) {
return Behaviors.setup(
context ->
ReplicatedEventSourcing.withSharedJournal(
"StringSet",
entityId,
replicaId,
allReplicas,
PersistenceTestKitReadJournal.Identifier(),
replicationContext -> new BlogEntity(context, replicationContext)));
}
private BlogEntity(ActorContext<Command> context, ReplicationContext replicationContext) {
super(replicationContext);
this.context = context;
}
@Override
public BlogState emptyState() {
return BlogState.EMPTY;
}
// #command-handler
@Override
public CommandHandler<Command, Event, BlogState> commandHandler() {
return newCommandHandlerBuilder()
.forAnyState()
.onCommand(AddPost.class, this::onAddPost)
.onCommand(ChangeBody.class, this::onChangeBody)
.onCommand(Publish.class, this::onPublish)
.onCommand(GetPost.class, this::onGetPost)
.build();
}
private Effect<Event, BlogState> onAddPost(BlogState state, AddPost command) {
PostAdded evt =
new PostAdded(
getReplicationContext().entityId(),
command.content,
state.contentTimestamp.increase(
getReplicationContext().currentTimeMillis(),
getReplicationContext().replicaId()));
return Effect()
.persist(evt)
.thenRun(() -> command.replyTo.tell(new AddPostDone(getReplicationContext().entityId())));
}
private Effect<Event, BlogState> onChangeBody(BlogState state, ChangeBody command) {
BodyChanged evt =
new BodyChanged(
getReplicationContext().entityId(),
command.newContent,
state.contentTimestamp.increase(
getReplicationContext().currentTimeMillis(),
getReplicationContext().replicaId()));
return Effect().persist(evt).thenRun(() -> command.replyTo.tell(Done.getInstance()));
}
private Effect<Event, BlogState> onPublish(BlogState state, Publish command) {
Published evt = new Published(getReplicationContext().entityId());
return Effect().persist(evt).thenRun(() -> command.replyTo.tell(Done.getInstance()));
}
private Effect<Event, BlogState> onGetPost(BlogState state, GetPost command) {
context.getLog().info("GetPost {}", state.content);
if (state.content.isPresent()) command.replyTo.tell(state.content.get());
return Effect().none();
}
// #command-handler
// #event-handler
@Override
public EventHandler<BlogState, Event> eventHandler() {
return newEventHandlerBuilder()
.forAnyState()
.onEvent(PostAdded.class, this::onPostAdded)
.onEvent(BodyChanged.class, this::onBodyChanged)
.onEvent(Published.class, this::onPublished)
.build();
}
private BlogState onPostAdded(BlogState state, PostAdded event) {
if (event.timestamp.isAfter(state.contentTimestamp)) {
BlogState s = state.withContent(event.content, event.timestamp);
context.getLog().info("Updating content. New content is {}", s);
return s;
} else {
context.getLog().info("Ignoring event as timestamp is older");
return state;
}
}
private BlogState onBodyChanged(BlogState state, BodyChanged event) {
if (event.timestamp.isAfter(state.contentTimestamp)) {
return state.withContent(event.content, event.timestamp);
} else {
return state;
}
}
private BlogState onPublished(BlogState state, Published event) {
return state.publish();
}
// #event-handler
}
}

View file

@ -4,67 +4,84 @@
package docs.akka.persistence.typed
import org.scalatest.concurrent.Eventually
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.Millis
import org.scalatest.time.Span
import org.scalatest.wordspec.AnyWordSpecLike
import akka.Done
import akka.actor.testkit.typed.scaladsl.{ LogCapturing, ScalaTestWithActorTestKit }
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.testkit.PersistenceTestKitPlugin
import akka.persistence.testkit.query.scaladsl.PersistenceTestKitReadJournal
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.crdt.LwwTime
import akka.persistence.typed.scaladsl._
import akka.serialization.jackson.CborSerializable
import org.scalatest.concurrent.{ Eventually, ScalaFutures }
import org.scalatest.matchers.should.Matchers
import org.scalatest.time.{ Millis, Span }
import org.scalatest.wordspec.AnyWordSpecLike
object ReplicatedBlogExampleSpec {
final case class BlogState(content: Option[PostContent], contentTimestamp: LwwTime, published: Boolean) {
object BlogEntity {
object BlogState {
val empty: BlogState = BlogState(None, LwwTime(Long.MinValue, ReplicaId("")), published = false)
}
final case class BlogState(content: Option[PostContent], contentTimestamp: LwwTime, published: Boolean)
extends CborSerializable {
def withContent(newContent: PostContent, timestamp: LwwTime): BlogState =
copy(content = Some(newContent), contentTimestamp = timestamp)
def isEmpty: Boolean = content.isEmpty
}
val emptyState: BlogState = BlogState(None, LwwTime(Long.MinValue, ReplicaId("")), published = false)
final case class PostContent(title: String, body: String)
final case class PostSummary(postId: String, title: String)
final case class Published(postId: String) extends BlogEvent
final case class PostContent(title: String, body: String) extends CborSerializable
final case class Published(postId: String) extends Event
sealed trait BlogCommand
final case class AddPost(postId: String, content: PostContent, replyTo: ActorRef[AddPostDone]) extends BlogCommand
sealed trait Command extends CborSerializable
final case class AddPost(postId: String, content: PostContent, replyTo: ActorRef[AddPostDone]) extends Command
final case class AddPostDone(postId: String)
final case class GetPost(postId: String, replyTo: ActorRef[PostContent]) extends BlogCommand
final case class ChangeBody(postId: String, newContent: PostContent, replyTo: ActorRef[Done]) extends BlogCommand
final case class Publish(postId: String, replyTo: ActorRef[Done]) extends BlogCommand
final case class GetPost(postId: String, replyTo: ActorRef[PostContent]) extends Command
final case class ChangeBody(postId: String, newContent: PostContent, replyTo: ActorRef[Done]) extends Command
final case class Publish(postId: String, replyTo: ActorRef[Done]) extends Command
sealed trait BlogEvent extends CborSerializable
final case class PostAdded(postId: String, content: PostContent, timestamp: LwwTime) extends BlogEvent
final case class BodyChanged(postId: String, newContent: PostContent, timestamp: LwwTime) extends BlogEvent
sealed trait Event extends CborSerializable
final case class PostAdded(postId: String, content: PostContent, timestamp: LwwTime) extends Event
final case class BodyChanged(postId: String, newContent: PostContent, timestamp: LwwTime) extends Event
def apply(entityId: String, replicaId: ReplicaId, allReplicaIds: Set[ReplicaId]): Behavior[Command] = {
Behaviors.setup[Command] { ctx =>
ReplicatedEventSourcing.withSharedJournal(
"blog",
entityId,
replicaId,
allReplicaIds,
PersistenceTestKitReadJournal.Identifier) { replicationContext =>
EventSourcedBehavior[Command, Event, BlogState](
replicationContext.persistenceId,
BlogState.empty,
(state, cmd) => commandHandler(ctx, replicationContext, state, cmd),
(state, event) => eventHandler(ctx, replicationContext, state, event))
}
}
}
class ReplicatedBlogExampleSpec
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config)
with AnyWordSpecLike
with Matchers
with LogCapturing
with ScalaFutures
with Eventually {
import ReplicatedBlogExampleSpec._
implicit val config: PatienceConfig = PatienceConfig(timeout = Span(timeout.duration.toMillis, Millis))
def behavior(replicationContext: ReplicationContext, ctx: ActorContext[BlogCommand]) =
EventSourcedBehavior[BlogCommand, BlogEvent, BlogState](
replicationContext.persistenceId,
emptyState,
(state, cmd) =>
//#command-handler
private def commandHandler(
ctx: ActorContext[Command],
replicationContext: ReplicationContext,
state: BlogState,
cmd: Command): Effect[Event, BlogState] = {
cmd match {
case AddPost(_, content, replyTo) =>
val evt =
PostAdded(
replicationContext.persistenceId.id,
replicationContext.entityId,
content,
state.contentTimestamp.increase(replicationContext.currentTimeMillis(), replicationContext.replicaId))
Effect.persist(evt).thenRun { _ =>
@ -73,7 +90,7 @@ class ReplicatedBlogExampleSpec
case ChangeBody(_, newContent, replyTo) =>
val evt =
BodyChanged(
replicationContext.persistenceId.id,
replicationContext.entityId,
newContent,
state.contentTimestamp.increase(replicationContext.currentTimeMillis(), replicationContext.replicaId))
Effect.persist(evt).thenRun { _ =>
@ -87,8 +104,16 @@ class ReplicatedBlogExampleSpec
ctx.log.info("GetPost {}", state.content)
state.content.foreach(content => gp.replyTo ! content)
Effect.none
},
(state, event) => {
}
}
//#command-handler
//#event-handler
private def eventHandler(
ctx: ActorContext[Command],
replicationContext: ReplicationContext,
state: BlogState,
event: Event): BlogState = {
ctx.log.info(s"${replicationContext.entityId}:${replicationContext.replicaId} Received event $event")
event match {
case PostAdded(_, content, timestamp) =>
@ -107,42 +132,35 @@ class ReplicatedBlogExampleSpec
case Published(_) =>
state.copy(published = true)
}
})
}
//#event-handler
}
}
class ReplicatedBlogExampleSpec
extends ScalaTestWithActorTestKit(PersistenceTestKitPlugin.config)
with AnyWordSpecLike
with Matchers
with LogCapturing
with ScalaFutures
with Eventually {
import ReplicatedBlogExampleSpec.BlogEntity
import ReplicatedBlogExampleSpec.BlogEntity._
implicit val config: PatienceConfig = PatienceConfig(timeout = Span(timeout.duration.toMillis, Millis))
"Blog Example" should {
"work" in {
val refDcA: ActorRef[BlogCommand] =
spawn(
Behaviors.setup[BlogCommand] { ctx =>
ReplicatedEventSourcing.withSharedJournal(
"blog",
"cat",
ReplicaId("DC-A"),
Set(ReplicaId("DC-A"), ReplicaId("DC-B")),
PersistenceTestKitReadJournal.Identifier) { replicationContext =>
behavior(replicationContext, ctx)
}
},
"dc-a")
val refDcA: ActorRef[Command] =
spawn(BlogEntity("cat", ReplicaId("DC-A"), Set(ReplicaId("DC-A"), ReplicaId("DC-B"))))
val refDcB: ActorRef[BlogCommand] =
spawn(
Behaviors.setup[BlogCommand] { ctx =>
ReplicatedEventSourcing.withSharedJournal(
"blog",
"cat",
ReplicaId("DC-B"),
Set(ReplicaId("DC-A"), ReplicaId("DC-B")),
PersistenceTestKitReadJournal.Identifier) { replicationContext =>
behavior(replicationContext, ctx)
}
},
"dc-b")
val refDcB: ActorRef[Command] =
spawn(BlogEntity("cat", ReplicaId("DC-B"), Set(ReplicaId("DC-A"), ReplicaId("DC-B"))))
import scala.concurrent.duration._
import akka.actor.typed.scaladsl.AskPattern._
import akka.util.Timeout
import scala.concurrent.duration._
implicit val timeout: Timeout = 3.seconds
val content = PostContent("cats are the bets", "yep")