diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/state/scaladsl/DurableStateBehaviorInterceptorSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/state/scaladsl/DurableStateBehaviorInterceptorSpec.scala new file mode 100644 index 0000000000..dc6369fef6 --- /dev/null +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/state/scaladsl/DurableStateBehaviorInterceptorSpec.scala @@ -0,0 +1,109 @@ +/* + * Copyright (C) 2019-2021 Lightbend Inc. + */ + +package akka.persistence.typed.state.scaladsl + +import akka.actor.testkit.typed.scaladsl._ +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.BehaviorInterceptor +import akka.actor.typed.TypedActorContext +import akka.actor.typed.scaladsl.Behaviors +import akka.persistence.typed.PersistenceId +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.wordspec.AnyWordSpecLike + +import java.util.concurrent.atomic.AtomicInteger + +object DurableStateBehaviorInterceptorSpec { + + def conf: Config = ConfigFactory.parseString(s""" + akka.loglevel = INFO + akka.persistence.state.plugin = "akka.persistence.state.inmem" + akka.persistence.state.inmem { + class = "akka.persistence.state.inmem.InmemDurableStateStoreProvider" + recovery-timeout = 30s + } + """) + + def testBehavior(persistenceId: PersistenceId, probe: ActorRef[String]): Behavior[String] = + Behaviors.setup { _ => + DurableStateBehavior[String, String]( + persistenceId, + emptyState = "", + commandHandler = (_, command) => + command match { + case _ => + Effect.persist(command).thenRun(newState => probe ! newState) + }).withDurableStateStorePluginId("akka.persistence.state.inmem") + } + +} + +class DurableStateBehaviorInterceptorSpec + extends ScalaTestWithActorTestKit(DurableStateBehaviorInterceptorSpec.conf) + with AnyWordSpecLike + with LogCapturing { + + import DurableStateBehaviorInterceptorSpec._ + + val pidCounter = new AtomicInteger(0) + private def nextPid(): PersistenceId = PersistenceId.ofUniqueId(s"c${pidCounter.incrementAndGet()})") + + "DurableStateBehavior interceptor" must { + + "be possible to combine with another interceptor" in { + val probe = createTestProbe[String]() + val pid = nextPid() + + val toUpper = new BehaviorInterceptor[String, String] { + override def aroundReceive( + ctx: TypedActorContext[String], + msg: String, + target: BehaviorInterceptor.ReceiveTarget[String]): Behavior[String] = { + target(ctx, msg.toUpperCase()) + } + } + + val ref = spawn(Behaviors.intercept(() => toUpper)(testBehavior(pid, probe.ref))) + + ref ! "a" + ref ! "bc" + probe.expectMessage("A") + probe.expectMessage("BC") + } + + "be possible to combine with transformMessages" in { + val probe = createTestProbe[String]() + val pid = nextPid() + val ref = spawn(testBehavior(pid, probe.ref).transformMessages[String] { + case s => s.toUpperCase() + }) + + ref ! "a" + ref ! "bc" + probe.expectMessage("A") + probe.expectMessage("BC") + } + + "be possible to combine with MDC" in { + val probe = createTestProbe[String]() + val pid = nextPid() + val ref = spawn(Behaviors.setup[String] { _ => + Behaviors.withMdc( + staticMdc = Map("pid" -> pid.toString), + mdcForMessage = (msg: String) => Map("msg" -> msg.toUpperCase())) { + testBehavior(pid, probe.ref) + } + }) + + ref ! "a" + ref ! "bc" + probe.expectMessage("a") + probe.expectMessage("bc") + + } + } +} diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/state/scaladsl/DurableStateBehaviorTimersSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/state/scaladsl/DurableStateBehaviorTimersSpec.scala new file mode 100644 index 0000000000..557d4c285d --- /dev/null +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/state/scaladsl/DurableStateBehaviorTimersSpec.scala @@ -0,0 +1,125 @@ +/* + * Copyright (C) 2021 Lightbend Inc. + */ + +package akka.persistence.typed.state.scaladsl + +/* + * Copyright (C) 2017-2021 Lightbend Inc. + */ + +import akka.actor.testkit.typed.scaladsl._ +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors +import akka.persistence.typed.PersistenceId +import org.scalatest.wordspec.AnyWordSpecLike +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory + +import java.util.concurrent.atomic.AtomicInteger +import scala.concurrent.duration._ + +object DurableStateBehaviorTimersSpec { + + def conf: Config = ConfigFactory.parseString(s""" + akka.loglevel = INFO + akka.persistence.state.plugin = "akka.persistence.state.inmem" + akka.persistence.state.inmem { + class = "akka.persistence.state.inmem.InmemDurableStateStoreProvider" + recovery-timeout = 30s + } + """) + + def testBehavior(persistenceId: PersistenceId, probe: ActorRef[String]): Behavior[String] = + Behaviors.setup { _ => + Behaviors.withTimers { timers => + DurableStateBehavior[String, String]( + persistenceId, + emptyState = "", + commandHandler = (_, command) => + command match { + case "scheduled" => + probe ! "scheduled" + Effect.none + case "cmd-0" => + timers.startSingleTimer("key", "scheduled", Duration.Zero) + Effect.none + case _ => + timers.startSingleTimer("key", "scheduled", Duration.Zero) + Effect.persist(command).thenRun(_ => probe ! command) + }).withDurableStateStorePluginId("akka.persistence.state.inmem") + } + } + + def testTimerFromSetupBehavior(persistenceId: PersistenceId, probe: ActorRef[String]): Behavior[String] = + Behaviors.setup { _ => + Behaviors.withTimers { timers => + timers.startSingleTimer("key", "scheduled", Duration.Zero) + + DurableStateBehavior[String, String]( + persistenceId, + emptyState = "", + commandHandler = (_, command) => + command match { + case "scheduled" => + probe ! "scheduled" + Effect.none + case _ => + Effect.persist(command).thenRun(_ => probe ! command) + }).withDurableStateStorePluginId("akka.persistence.state.inmem") + } + } + +} + +class DurableStateBehaviorTimersSpec + extends ScalaTestWithActorTestKit(DurableStateBehaviorTimersSpec.conf) + with AnyWordSpecLike + with LogCapturing { + + import DurableStateBehaviorTimersSpec._ + + val pidCounter = new AtomicInteger(0) + private def nextPid(): PersistenceId = PersistenceId.ofUniqueId(s"c${pidCounter.incrementAndGet()})") + + "DurableStateBehavior withTimers" must { + + "be able to schedule message" in { + val probe = createTestProbe[String]() + val pid = nextPid() + val ref = spawn(testBehavior(pid, probe.ref)) + + ref ! "cmd-0" + probe.expectMessage("scheduled") + } + + "not discard timer msg due to stashing" in { + val probe = createTestProbe[String]() + val pid = nextPid() + val ref = spawn(testBehavior(pid, probe.ref)) + + ref ! "cmd-1" + probe.expectMessage("cmd-1") + probe.expectMessage("scheduled") + } + + "be able to schedule message from setup" in { + val probe = createTestProbe[String]() + val pid = nextPid() + val ref = spawn(testTimerFromSetupBehavior(pid, probe.ref)) + + probe.expectMessage("scheduled") + + (1 to 20).foreach { n => + ref ! s"cmd-$n" + } + probe.receiveMessages(20) + + // start new instance that is likely to stash the timer message while replaying + spawn(testTimerFromSetupBehavior(pid, probe.ref)) + probe.expectMessage("scheduled") + } + + } +} diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/state/scaladsl/NullEmptyStateSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/state/scaladsl/NullEmptyStateSpec.scala new file mode 100644 index 0000000000..f3ea072318 --- /dev/null +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/state/scaladsl/NullEmptyStateSpec.scala @@ -0,0 +1,64 @@ +/* + * Copyright (C) 2021 Lightbend Inc. + */ + +package akka.persistence.typed.state.scaladsl + +/* + * Copyright (C) 2018-2021 Lightbend Inc. + */ + +import akka.actor.testkit.typed.TestKitSettings +import akka.actor.testkit.typed.scaladsl._ +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.persistence.typed.PersistenceId +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.wordspec.AnyWordSpecLike + +object NullEmptyStateSpec { + + private def conf: Config = ConfigFactory.parseString(s""" + akka.loglevel = INFO + akka.persistence.state.plugin = "akka.persistence.state.inmem" + akka.persistence.state.inmem { + class = "akka.persistence.state.inmem.InmemDurableStateStoreProvider" + recovery-timeout = 30s + } + """) +} + +class NullEmptyStateSpec + extends ScalaTestWithActorTestKit(NullEmptyStateSpec.conf) + with AnyWordSpecLike + with LogCapturing { + + implicit val testSettings: TestKitSettings = TestKitSettings(system) + + def primitiveState(persistenceId: PersistenceId, probe: ActorRef[String]): Behavior[String] = + DurableStateBehavior[String, String](persistenceId, emptyState = null, commandHandler = (_, command) => { + if (command == "stop") + Effect.stop() + else + Effect.persist(command).thenReply(probe)(_ => command) + }).withDurableStateStorePluginId("akka.persistence.state.inmem") + + "A typed persistent actor with primitive state" must { + "persist events and update state" in { + val probe = TestProbe[String]() + val b = primitiveState(PersistenceId.ofUniqueId("a"), probe.ref) + val ref1 = spawn(b) + ref1 ! "one" + probe.expectMessage("one") + ref1 ! "two" + probe.expectMessage("two") + ref1 ! "stop" + probe.expectTerminated(ref1) + + val _ = spawn(b) + // no events, no replay and hence no messages + probe.expectNoMessage() + } + } +} diff --git a/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/state/scaladsl/PrimitiveStateSpec.scala b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/state/scaladsl/PrimitiveStateSpec.scala new file mode 100644 index 0000000000..4610b2887c --- /dev/null +++ b/akka-persistence-typed-tests/src/test/scala/akka/persistence/typed/state/scaladsl/PrimitiveStateSpec.scala @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2021 Lightbend Inc. + */ + +package akka.persistence.typed.state.scaladsl + +/* + * Copyright (C) 2018-2021 Lightbend Inc. + */ + +import akka.actor.testkit.typed.scaladsl._ +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.persistence.typed.PersistenceId +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.wordspec.AnyWordSpecLike + +object PrimitiveStateSpec { + + private def conf: Config = ConfigFactory.parseString(s""" + akka.loglevel = INFO + akka.persistence.state.plugin = "akka.persistence.state.inmem" + akka.persistence.state.inmem { + class = "akka.persistence.state.inmem.InmemDurableStateStoreProvider" + recovery-timeout = 30s + } + """) +} + +class PrimitiveStateSpec + extends ScalaTestWithActorTestKit(PrimitiveStateSpec.conf) + with AnyWordSpecLike + with LogCapturing { + + def primitiveState(persistenceId: PersistenceId, probe: ActorRef[String]): Behavior[Int] = + DurableStateBehavior[Int, Int](persistenceId, emptyState = 0, commandHandler = (_, command) => { + if (command < 0) + Effect.stop() + else + Effect.persist(command).thenReply(probe)(_ => command.toString) + }).withDurableStateStorePluginId("akka.persistence.state.inmem") + + "A typed persistent actor with primitive state" must { + "persist primitive events and update state" in { + val probe = TestProbe[String]() + val b = primitiveState(PersistenceId.ofUniqueId("a"), probe.ref) + val ref1 = spawn(b) + ref1 ! 1 + probe.expectMessage("1") + ref1 ! 2 + probe.expectMessage("2") + + ref1 ! -1 + probe.expectTerminated(ref1) + + val ref2 = spawn(b) + // no events, no replay and hence no messages + probe.expectNoMessage() + ref2 ! 3 + probe.expectMessage("3") + } + + } +}