From bed17cc1723c3e4ef475325b04ed4f1638954060 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 18 Oct 2018 11:38:27 +0200 Subject: [PATCH] null as empty state, in javadsl, #25768 --- .../EventsourcedJournalInteractions.scala | 6 +- .../typed/javadsl/CommandHandler.scala | 47 +++-- .../typed/javadsl/EventHandler.scala | 19 +- .../typed/javadsl/NullEmptyStateTest.java | 108 +++++++++++ .../typed/javadsl/PrimitiveStateTest.java | 85 ++++++++ .../akka/persistence/typed/NullBlogState.java | 181 ++++++++++++++++++ .../persistence/typed/OptionalBlogState.java | 4 +- .../typed/scaladsl/NullEmptyStateSpec.scala | 67 +++++++ .../typed/scaladsl/PrimitiveStateSpec.scala | 67 +++++++ 9 files changed, 568 insertions(+), 16 deletions(-) create mode 100644 akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/NullEmptyStateTest.java create mode 100644 akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PrimitiveStateTest.java create mode 100644 akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/NullBlogState.java create mode 100644 akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/NullEmptyStateSpec.scala create mode 100644 akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PrimitiveStateSpec.scala diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala index 91bb60f0b6..6371d673e0 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventsourcedJournalInteractions.scala @@ -108,7 +108,11 @@ private[akka] trait EventsourcedJournalInteractions[C, E, S] { } protected def internalSaveSnapshot(state: EventsourcedRunning.EventsourcedState[S]): Unit = { - setup.snapshotStore.tell(SnapshotProtocol.SaveSnapshot(SnapshotMetadata(setup.persistenceId.id, state.seqNr), state.state), setup.selfUntyped) + // don't store null state + if (state.state != null) + setup.snapshotStore.tell(SnapshotProtocol.SaveSnapshot( + SnapshotMetadata(setup.persistenceId.id, state.seqNr), + state.state), setup.selfUntyped) } } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/CommandHandler.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/CommandHandler.scala index 5d900532b7..71c9d49966 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/CommandHandler.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/CommandHandler.scala @@ -6,6 +6,7 @@ package akka.persistence.typed.javadsl import java.util.function.BiFunction import java.util.function.Predicate +import java.util.function.{ Function ⇒ JFunction } import akka.annotation.InternalApi import akka.persistence.typed.internal._ @@ -23,18 +24,14 @@ trait CommandHandler[Command, Event, State] { object CommandHandlerBuilder { - private val _trueStatePredicate: Predicate[Any] = new Predicate[Any] { - override def test(t: Any): Boolean = true - } - - private def trueStatePredicate[S]: Predicate[S] = _trueStatePredicate.asInstanceOf[Predicate[S]] - /** * @param stateClass The handlers defined by this builder are used when the state is an instance of the `stateClass` * @return A new, mutable, command handler builder */ def builder[Command, Event, S <: State, State](stateClass: Class[S]): CommandHandlerBuilder[Command, Event, S, State] = - new CommandHandlerBuilder(stateClass, statePredicate = trueStatePredicate) + new CommandHandlerBuilder(statePredicate = new Predicate[S] { + override def test(state: S): Boolean = state != null && stateClass.isAssignableFrom(state.getClass) + }) /** * @param statePredicate The handlers defined by this builder are used when the `statePredicate` is `true`, @@ -42,7 +39,7 @@ object CommandHandlerBuilder { * @return A new, mutable, command handler builder */ def builder[Command, Event, State](statePredicate: Predicate[State]): CommandHandlerBuilder[Command, Event, State, State] = - new CommandHandlerBuilder(classOf[Any].asInstanceOf[Class[State]], statePredicate) + new CommandHandlerBuilder(statePredicate) /** * INTERNAL API @@ -54,7 +51,7 @@ object CommandHandlerBuilder { } final class CommandHandlerBuilder[Command, Event, S <: State, State] @InternalApi private[persistence] ( - val stateClass: Class[S], val statePredicate: Predicate[S]) { + val statePredicate: Predicate[S]) { import CommandHandlerBuilder.CommandHandlerCase private var cases: List[CommandHandlerCase[Command, Event, State]] = Nil @@ -62,7 +59,7 @@ final class CommandHandlerBuilder[Command, Event, S <: State, State] @InternalAp private def addCase(predicate: Command ⇒ Boolean, handler: BiFunction[S, Command, Effect[Event, State]]): Unit = { cases = CommandHandlerCase[Command, Event, State]( commandPredicate = predicate, - statePredicate = state ⇒ stateClass.isAssignableFrom(state.getClass) && statePredicate.test(state.asInstanceOf[S]), + statePredicate = state ⇒ statePredicate.test(state.asInstanceOf[S]), handler.asInstanceOf[BiFunction[State, Command, Effect[Event, State]]]) :: cases } @@ -74,17 +71,45 @@ final class CommandHandlerBuilder[Command, Event, S <: State, State] @InternalAp this } + /** + * Match any command which the given `predicate` returns true for. + * + * Use this when then `State` is not needed in the `handler`, otherwise there is an overloaded method that pass + * the state in a `BiFunction`. + */ + def matchCommand(predicate: Predicate[Command], handler: JFunction[Command, Effect[Event, State]]): CommandHandlerBuilder[Command, Event, S, State] = { + addCase(cmd ⇒ predicate.test(cmd), new BiFunction[S, Command, Effect[Event, State]] { + override def apply(state: S, cmd: Command): Effect[Event, State] = handler(cmd) + }) + this + } + + /** + * Match commands that are of the given `commandClass` or subclass thereof + */ def matchCommand[C <: Command](commandClass: Class[C], handler: BiFunction[S, C, Effect[Event, State]]): CommandHandlerBuilder[Command, Event, S, State] = { addCase(cmd ⇒ commandClass.isAssignableFrom(cmd.getClass), handler.asInstanceOf[BiFunction[S, Command, Effect[Event, State]]]) this } + /** + * Match commands that are of the given `commandClass` or subclass thereof. + * + * Use this when then `State` is not needed in the `handler`, otherwise there is an overloaded method that pass + * the state in a `BiFunction`. + */ + def matchCommand[C <: Command](commandClass: Class[C], handler: JFunction[C, Effect[Event, State]]): CommandHandlerBuilder[Command, Event, S, State] = { + matchCommand[C](commandClass, new BiFunction[S, C, Effect[Event, State]] { + override def apply(state: S, cmd: C): Effect[Event, State] = handler(cmd) + }) + } + /** * Compose this builder with another builder. The handlers in this builder will be tried first followed * by the handlers in `other`. */ def orElse[S2 <: State](other: CommandHandlerBuilder[Command, Event, S2, State]): CommandHandlerBuilder[Command, Event, S2, State] = { - val newBuilder = new CommandHandlerBuilder[Command, Event, S2, State](other.stateClass, other.statePredicate) + val newBuilder = new CommandHandlerBuilder[Command, Event, S2, State](other.statePredicate) // problem with overloaded constructor with `cases` as parameter newBuilder.cases = other.cases ::: cases newBuilder diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventHandler.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventHandler.scala index 7df154f93d..743798acc9 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventHandler.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/javadsl/EventHandler.scala @@ -5,6 +5,7 @@ package akka.persistence.typed.javadsl import java.util.function.BiFunction +import java.util.function.{ Function ⇒ JFunction } import akka.annotation.InternalApi import akka.util.OptionVal @@ -49,11 +50,23 @@ final class EventHandlerBuilder[State >: Null, Event]() { this } + /** + * Match any event which is an instance of `E` or a subtype of `E`. + * + * Use this when then `State` is not needed in the `handler`, otherwise there is an overloaded method that pass + * the state in a `BiFunction`. + */ + def matchEvent[E <: Event](eventClass: Class[E], f: JFunction[E, State]): EventHandlerBuilder[State, Event] = { + matchEvent[E](eventClass, new BiFunction[State, E, State] { + override def apply(state: State, event: E): State = f(event) + }) + } + def matchEvent[E <: Event, S <: State](eventClass: Class[E], stateClass: Class[S], biFunction: BiFunction[S, E, State]): EventHandlerBuilder[State, Event] = { cases = EventHandlerCase[State, Event]( - statePredicate = s ⇒ stateClass.isAssignableFrom(s.getClass), + statePredicate = s ⇒ s != null && stateClass.isAssignableFrom(s.getClass), eventPredicate = e ⇒ eventClass.isAssignableFrom(e.getClass), biFunction.asInstanceOf[BiFunction[State, Event, State]]) :: cases this @@ -102,7 +115,9 @@ final class EventHandlerBuilder[State >: Null, Event]() { } result match { - case OptionVal.None ⇒ throw new MatchError(s"No match found for event [${event.getClass}] and state [${state.getClass}]. Has this event been stored using an EventAdapter?") + case OptionVal.None ⇒ + val stateClass = if (state == null) "null" else state.getClass.getName + throw new MatchError(s"No match found for event [${event.getClass}] and state [$stateClass]. Has this event been stored using an EventAdapter?") case OptionVal.Some(s) ⇒ s } } diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/NullEmptyStateTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/NullEmptyStateTest.java new file mode 100644 index 0000000000..58e1fcdc56 --- /dev/null +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/NullEmptyStateTest.java @@ -0,0 +1,108 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.persistence.typed.javadsl; + +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; +import akka.actor.testkit.typed.javadsl.TestProbe; +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.Behaviors; +import akka.persistence.typed.PersistenceId; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.junit.ClassRule; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; + +import java.util.Objects; + +public class NullEmptyStateTest extends JUnitSuite { + + private static final Config config = ConfigFactory.parseString( + "akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n"); + + @ClassRule + public static final TestKitJunitResource testKit = new TestKitJunitResource(config); + + static class NullEmptyState extends PersistentBehavior { + + private final ActorRef probe; + + NullEmptyState(PersistenceId persistenceId, ActorRef probe) { + super(persistenceId); + this.probe = probe; + } + + @Override + public String emptyState() { + return null; + } + + @Override + public void onRecoveryCompleted(String s) { + probe.tell("onRecoveryCompleted:" + s); + } + + @Override + public CommandHandler commandHandler() { + CommandHandlerBuilder b1 = + commandHandlerBuilder(Objects::isNull) + .matchCommand("stop"::equals, command -> Effect().stop()) + .matchCommand(String.class, this::persistCommand); + + CommandHandlerBuilder b2 = + commandHandlerBuilder(String.class) + .matchCommand("stop"::equals, command -> Effect().stop()) + .matchCommand(String.class, this::persistCommand); + + return b1.orElse(b2).build(); + } + + private Effect persistCommand(String command) { + return Effect().persist(command); + } + + @Override + public EventHandler eventHandler() { + return eventHandlerBuilder() + .matchEvent(String.class, this::applyEvent) + .build(); + } + + private String applyEvent(String state, String event) { + probe.tell("eventHandler:" + state + ":" + event); + if (state == null) + return event; + else + return state + event; + } + } + + @Test + public void handleNullState() throws Exception { + TestProbe probe = testKit.createTestProbe(); + Behavior b = Behaviors.setup(ctx -> new NullEmptyState(new PersistenceId("a"), probe.ref())); + + ActorRef ref1 = testKit.spawn(b); + probe.expectMessage("onRecoveryCompleted:null"); + ref1.tell("stop"); + + ActorRef ref2 = testKit.spawn(b); + probe.expectMessage("onRecoveryCompleted:null"); + ref2.tell("one"); + probe.expectMessage("eventHandler:null:one"); + ref2.tell("two"); + probe.expectMessage("eventHandler:one:two"); + + ref2.tell("stop"); + ActorRef ref3 = testKit.spawn(b); + // eventHandler from reply + probe.expectMessage("eventHandler:null:one"); + probe.expectMessage("eventHandler:one:two"); + probe.expectMessage("onRecoveryCompleted:onetwo"); + ref3.tell("three"); + probe.expectMessage("eventHandler:onetwo:three"); + } +} diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PrimitiveStateTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PrimitiveStateTest.java new file mode 100644 index 0000000000..4cb8896541 --- /dev/null +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PrimitiveStateTest.java @@ -0,0 +1,85 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.persistence.typed.javadsl; + +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; +import akka.actor.testkit.typed.javadsl.TestProbe; +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.Behaviors; +import akka.persistence.typed.PersistenceId; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.junit.ClassRule; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; + +public class PrimitiveStateTest extends JUnitSuite { + + private static final Config config = ConfigFactory.parseString( + "akka.persistence.journal.plugin = \"akka.persistence.journal.inmem\" \n"); + + @ClassRule + public static final TestKitJunitResource testKit = new TestKitJunitResource(config); + + static class PrimitiveState extends PersistentBehavior { + + private final ActorRef probe; + + PrimitiveState(PersistenceId persistenceId, ActorRef probe) { + super(persistenceId); + this.probe = probe; + } + + @Override + public Integer emptyState() { + return 0; + } + + @Override + public void onRecoveryCompleted(Integer n) { + probe.tell("onRecoveryCompleted:" + n); + } + + @Override + public CommandHandler commandHandler() { + return (state, command) -> { + if (command < 0) + return Effect().stop(); + else + return Effect().persist(command); + }; + } + + @Override + public EventHandler eventHandler() { + return (state, event) -> { + probe.tell("eventHandler:" + state + ":" + event); + return state + event; + }; + } + } + + @Test + public void handleIntegerState() throws Exception { + TestProbe probe = testKit.createTestProbe(); + Behavior b = Behaviors.setup(ctx -> new PrimitiveState(new PersistenceId("a"), probe.ref())); + ActorRef ref1 = testKit.spawn(b); + probe.expectMessage("onRecoveryCompleted:0"); + ref1.tell(1); + probe.expectMessage("eventHandler:0:1"); + ref1.tell(2); + probe.expectMessage("eventHandler:1:2"); + + ref1.tell(-1); + ActorRef ref2 = testKit.spawn(b); + // eventHandler from reply + probe.expectMessage("eventHandler:0:1"); + probe.expectMessage("eventHandler:1:2"); + probe.expectMessage("onRecoveryCompleted:3"); + ref2.tell(3); + probe.expectMessage("eventHandler:3:3"); + } +} diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/NullBlogState.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/NullBlogState.java new file mode 100644 index 0000000000..60de48801b --- /dev/null +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/NullBlogState.java @@ -0,0 +1,181 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package jdocs.akka.persistence.typed; + +import akka.Done; +import akka.actor.typed.ActorRef; +import akka.persistence.typed.PersistenceId; +import akka.persistence.typed.javadsl.CommandHandler; +import akka.persistence.typed.javadsl.CommandHandlerBuilder; +import akka.persistence.typed.javadsl.EventHandler; +import akka.persistence.typed.javadsl.PersistentBehavior; + +import java.util.Objects; + +public class NullBlogState { + + interface BlogEvent { + } + public static class PostAdded implements BlogEvent { + private final String postId; + private final PostContent content; + + public PostAdded(String postId, PostContent content) { + this.postId = postId; + this.content = content; + } + } + + public static class BodyChanged implements BlogEvent { + private final String postId; + private final String newBody; + + public BodyChanged(String postId, String newBody) { + this.postId = postId; + this.newBody = newBody; + } + } + + public static class Published implements BlogEvent { + private final String postId; + + public Published(String postId) { + this.postId = postId; + } + } + + public static class BlogState { + final PostContent postContent; + final boolean published; + + BlogState(PostContent postContent, boolean published) { + this.postContent = postContent; + this.published = published; + } + + public BlogState withContent(PostContent newContent) { + return new BlogState(newContent, this.published); + } + + public String postId() { + return postContent.postId; + } + } + + public interface BlogCommand { + } + public static class AddPost implements BlogCommand { + final PostContent content; + final ActorRef replyTo; + + public AddPost(PostContent content, ActorRef replyTo) { + this.content = content; + this.replyTo = replyTo; + } + } + public static class AddPostDone implements BlogCommand { + final String postId; + + public AddPostDone(String postId) { + this.postId = postId; + } + } + public static class GetPost implements BlogCommand { + final ActorRef replyTo; + + public GetPost(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + public static class ChangeBody implements BlogCommand { + final String newBody; + final ActorRef replyTo; + + public ChangeBody(String newBody, ActorRef replyTo) { + this.newBody = newBody; + this.replyTo = replyTo; + } + } + public static class Publish implements BlogCommand { + final ActorRef replyTo; + + public Publish(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + public static class PassivatePost implements BlogCommand { + + } + public static class PostContent implements BlogCommand { + final String postId; + final String title; + final String body; + + public PostContent(String postId, String title, String body) { + this.postId = postId; + this.title = title; + this.body = body; + } + } + + public static class BlogBehavior extends PersistentBehavior { + + private CommandHandlerBuilder initialCommandHandler() { + return commandHandlerBuilder(Objects::isNull) + .matchCommand(AddPost.class, cmd -> { + PostAdded event = new PostAdded(cmd.content.postId, cmd.content); + return Effect().persist(event) + .andThen(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId))); + }) + .matchCommand(PassivatePost.class, cmd -> Effect().stop()); + } + + private CommandHandlerBuilder postCommandHandler() { + return commandHandlerBuilder(Objects::nonNull) + .matchCommand(ChangeBody.class, (state, cmd) -> { + BodyChanged event = new BodyChanged(state.postId(), cmd.newBody); + return Effect().persist(event).andThen(() -> cmd.replyTo.tell(Done.getInstance())); + }) + .matchCommand(Publish.class, (state, cmd) -> Effect() + .persist(new Published(state.postId())).andThen(() -> { + System.out.println("Blog post published: " + state.postId()); + cmd.replyTo.tell(Done.getInstance()); + })) + .matchCommand(GetPost.class, (state, cmd) -> { + cmd.replyTo.tell(state.postContent); + return Effect().none(); + }) + .matchCommand(AddPost.class, (state, cmd) -> Effect().unhandled()) + .matchCommand(PassivatePost.class, cmd -> Effect().stop()); + } + + public BlogBehavior(PersistenceId persistenceId) { + super(persistenceId); + } + + @Override + public BlogState emptyState() { + return null; + } + + @Override + public CommandHandler commandHandler() { + return initialCommandHandler().orElse(postCommandHandler()).build(); + } + + @Override + public EventHandler eventHandler() { + return eventHandlerBuilder() + .matchEvent(PostAdded.class, event -> + new BlogState(event.content, false)) + .matchEvent(BodyChanged.class, (state, chg) -> + state.withContent( + new PostContent(state.postId(), state.postContent.title, chg.newBody))) + .matchEvent(Published.class, (state, event) -> + new BlogState(state.postContent, true)) + .build(); + } + } +} diff --git a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/OptionalBlogState.java b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/OptionalBlogState.java index 48795ded06..ecd2d3c297 100644 --- a/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/OptionalBlogState.java +++ b/akka-persistence-typed/src/test/java/jdocs/akka/persistence/typed/OptionalBlogState.java @@ -129,7 +129,7 @@ public class OptionalBlogState { return Effect().persist(event) .andThen(() -> cmd.replyTo.tell(new AddPostDone(cmd.content.postId))); }) - .matchCommand(PassivatePost.class, (state, cmd) -> Effect().stop()); + .matchCommand(PassivatePost.class, cmd -> Effect().stop()); } private CommandHandlerBuilder, Optional> postCommandHandler() { @@ -148,7 +148,7 @@ public class OptionalBlogState { return Effect().none(); }) .matchCommand(AddPost.class, (state, cmd) -> Effect().unhandled()) - .matchCommand(PassivatePost.class, (state, cmd) -> Effect().stop()); + .matchCommand(PassivatePost.class, cmd -> Effect().stop()); } public BlogBehavior(PersistenceId persistenceId) { diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/NullEmptyStateSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/NullEmptyStateSpec.scala new file mode 100644 index 0000000000..e27a90a9c2 --- /dev/null +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/NullEmptyStateSpec.scala @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.persistence.typed.scaladsl + +import akka.actor.testkit.typed.TestKitSettings +import akka.actor.testkit.typed.scaladsl._ +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.persistence.typed.PersistenceId +import com.typesafe.config.ConfigFactory +import org.scalatest.WordSpecLike + +object NullEmptyStateSpec { + + private val conf = ConfigFactory.parseString( + s""" + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + """) +} + +class NullEmptyStateSpec extends ScalaTestWithActorTestKit(NullEmptyStateSpec.conf) with WordSpecLike { + + implicit val testSettings = TestKitSettings(system) + + def primitiveState(persistenceId: PersistenceId, probe: ActorRef[String]): Behavior[String] = + PersistentBehavior[String, String, String]( + persistenceId, + emptyState = null, + commandHandler = (_, command) ⇒ { + if (command == "stop") + Effect.stop + else + Effect.persist(command) + }, + eventHandler = (state, event) ⇒ { + probe.tell("eventHandler:" + state + ":" + event) + if (state == null) event else state + event + } + ).onRecoveryCompleted { s ⇒ + probe.tell("onRecoveryCompleted:" + s) + } + + "A typed persistent actor with primitive state" must { + "persist events and update state" in { + val probe = TestProbe[String]() + val b = primitiveState(PersistenceId("a"), probe.ref) + val ref1 = spawn(b) + probe.expectMessage("onRecoveryCompleted:null") + ref1 ! "one" + probe.expectMessage("eventHandler:null:one") + ref1 ! "two" + probe.expectMessage("eventHandler:one:two") + + ref1 ! "stop" + val ref2 = testKit.spawn(b) + // eventHandler from reply + probe.expectMessage("eventHandler:null:one") + probe.expectMessage("eventHandler:one:two") + probe.expectMessage("onRecoveryCompleted:onetwo") + ref2 ! "three" + probe.expectMessage("eventHandler:onetwo:three") + } + + } +} diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PrimitiveStateSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PrimitiveStateSpec.scala new file mode 100644 index 0000000000..8b5c4d6c45 --- /dev/null +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PrimitiveStateSpec.scala @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.persistence.typed.scaladsl + +import akka.actor.testkit.typed.TestKitSettings +import akka.actor.testkit.typed.scaladsl._ +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.persistence.typed.PersistenceId +import com.typesafe.config.ConfigFactory +import org.scalatest.WordSpecLike + +object PrimitiveStateSpec { + + private val conf = ConfigFactory.parseString( + s""" + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + """) +} + +class PrimitiveStateSpec extends ScalaTestWithActorTestKit(PrimitiveStateSpec.conf) with WordSpecLike { + + implicit val testSettings = TestKitSettings(system) + + def primitiveState(persistenceId: PersistenceId, probe: ActorRef[String]): Behavior[Int] = + PersistentBehavior[Int, Int, Int]( + persistenceId, + emptyState = 0, + commandHandler = (_, command) ⇒ { + if (command < 0) + Effect.stop + else + Effect.persist(command) + }, + eventHandler = (state, event) ⇒ { + probe.tell("eventHandler:" + state + ":" + event) + state + event + } + ).onRecoveryCompleted { n ⇒ + probe.tell("onRecoveryCompleted:" + n) + } + + "A typed persistent actor with primitive state" must { + "persist events and update state" in { + val probe = TestProbe[String]() + val b = primitiveState(PersistenceId("a"), probe.ref) + val ref1 = spawn(b) + probe.expectMessage("onRecoveryCompleted:0") + ref1 ! 1 + probe.expectMessage("eventHandler:0:1") + ref1 ! 2 + probe.expectMessage("eventHandler:1:2") + + ref1 ! -1 + val ref2 = testKit.spawn(b) + // eventHandler from reply + probe.expectMessage("eventHandler:0:1") + probe.expectMessage("eventHandler:1:2") + probe.expectMessage("onRecoveryCompleted:3") + ref2 ! 3 + probe.expectMessage("eventHandler:3:3") + } + + } +}