More tests on DurableStateBehavior (#30285)

* More tests on DurableStateBehavior

* ScalafmtCheck

* Removed unused variable

* Removed unused variable
This commit is contained in:
Debasish Ghosh 2021-06-04 19:12:12 +05:30 committed by GitHub
parent f93a01cf32
commit 6fffbe5a27
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 363 additions and 0 deletions

View file

@ -0,0 +1,109 @@
/*
* Copyright (C) 2019-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.state.scaladsl
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.BehaviorInterceptor
import akka.actor.typed.TypedActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.PersistenceId
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
import java.util.concurrent.atomic.AtomicInteger
object DurableStateBehaviorInterceptorSpec {
def conf: Config = ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.persistence.state.plugin = "akka.persistence.state.inmem"
akka.persistence.state.inmem {
class = "akka.persistence.state.inmem.InmemDurableStateStoreProvider"
recovery-timeout = 30s
}
""")
def testBehavior(persistenceId: PersistenceId, probe: ActorRef[String]): Behavior[String] =
Behaviors.setup { _ =>
DurableStateBehavior[String, String](
persistenceId,
emptyState = "",
commandHandler = (_, command) =>
command match {
case _ =>
Effect.persist(command).thenRun(newState => probe ! newState)
}).withDurableStateStorePluginId("akka.persistence.state.inmem")
}
}
class DurableStateBehaviorInterceptorSpec
extends ScalaTestWithActorTestKit(DurableStateBehaviorInterceptorSpec.conf)
with AnyWordSpecLike
with LogCapturing {
import DurableStateBehaviorInterceptorSpec._
val pidCounter = new AtomicInteger(0)
private def nextPid(): PersistenceId = PersistenceId.ofUniqueId(s"c${pidCounter.incrementAndGet()})")
"DurableStateBehavior interceptor" must {
"be possible to combine with another interceptor" in {
val probe = createTestProbe[String]()
val pid = nextPid()
val toUpper = new BehaviorInterceptor[String, String] {
override def aroundReceive(
ctx: TypedActorContext[String],
msg: String,
target: BehaviorInterceptor.ReceiveTarget[String]): Behavior[String] = {
target(ctx, msg.toUpperCase())
}
}
val ref = spawn(Behaviors.intercept(() => toUpper)(testBehavior(pid, probe.ref)))
ref ! "a"
ref ! "bc"
probe.expectMessage("A")
probe.expectMessage("BC")
}
"be possible to combine with transformMessages" in {
val probe = createTestProbe[String]()
val pid = nextPid()
val ref = spawn(testBehavior(pid, probe.ref).transformMessages[String] {
case s => s.toUpperCase()
})
ref ! "a"
ref ! "bc"
probe.expectMessage("A")
probe.expectMessage("BC")
}
"be possible to combine with MDC" in {
val probe = createTestProbe[String]()
val pid = nextPid()
val ref = spawn(Behaviors.setup[String] { _ =>
Behaviors.withMdc(
staticMdc = Map("pid" -> pid.toString),
mdcForMessage = (msg: String) => Map("msg" -> msg.toUpperCase())) {
testBehavior(pid, probe.ref)
}
})
ref ! "a"
ref ! "bc"
probe.expectMessage("a")
probe.expectMessage("bc")
}
}
}

View file

@ -0,0 +1,125 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.state.scaladsl
/*
* Copyright (C) 2017-2021 Lightbend Inc. <https://www.lightbend.com>
*/
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.PersistenceId
import org.scalatest.wordspec.AnyWordSpecLike
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
object DurableStateBehaviorTimersSpec {
def conf: Config = ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.persistence.state.plugin = "akka.persistence.state.inmem"
akka.persistence.state.inmem {
class = "akka.persistence.state.inmem.InmemDurableStateStoreProvider"
recovery-timeout = 30s
}
""")
def testBehavior(persistenceId: PersistenceId, probe: ActorRef[String]): Behavior[String] =
Behaviors.setup { _ =>
Behaviors.withTimers { timers =>
DurableStateBehavior[String, String](
persistenceId,
emptyState = "",
commandHandler = (_, command) =>
command match {
case "scheduled" =>
probe ! "scheduled"
Effect.none
case "cmd-0" =>
timers.startSingleTimer("key", "scheduled", Duration.Zero)
Effect.none
case _ =>
timers.startSingleTimer("key", "scheduled", Duration.Zero)
Effect.persist(command).thenRun(_ => probe ! command)
}).withDurableStateStorePluginId("akka.persistence.state.inmem")
}
}
def testTimerFromSetupBehavior(persistenceId: PersistenceId, probe: ActorRef[String]): Behavior[String] =
Behaviors.setup { _ =>
Behaviors.withTimers { timers =>
timers.startSingleTimer("key", "scheduled", Duration.Zero)
DurableStateBehavior[String, String](
persistenceId,
emptyState = "",
commandHandler = (_, command) =>
command match {
case "scheduled" =>
probe ! "scheduled"
Effect.none
case _ =>
Effect.persist(command).thenRun(_ => probe ! command)
}).withDurableStateStorePluginId("akka.persistence.state.inmem")
}
}
}
class DurableStateBehaviorTimersSpec
extends ScalaTestWithActorTestKit(DurableStateBehaviorTimersSpec.conf)
with AnyWordSpecLike
with LogCapturing {
import DurableStateBehaviorTimersSpec._
val pidCounter = new AtomicInteger(0)
private def nextPid(): PersistenceId = PersistenceId.ofUniqueId(s"c${pidCounter.incrementAndGet()})")
"DurableStateBehavior withTimers" must {
"be able to schedule message" in {
val probe = createTestProbe[String]()
val pid = nextPid()
val ref = spawn(testBehavior(pid, probe.ref))
ref ! "cmd-0"
probe.expectMessage("scheduled")
}
"not discard timer msg due to stashing" in {
val probe = createTestProbe[String]()
val pid = nextPid()
val ref = spawn(testBehavior(pid, probe.ref))
ref ! "cmd-1"
probe.expectMessage("cmd-1")
probe.expectMessage("scheduled")
}
"be able to schedule message from setup" in {
val probe = createTestProbe[String]()
val pid = nextPid()
val ref = spawn(testTimerFromSetupBehavior(pid, probe.ref))
probe.expectMessage("scheduled")
(1 to 20).foreach { n =>
ref ! s"cmd-$n"
}
probe.receiveMessages(20)
// start new instance that is likely to stash the timer message while replaying
spawn(testTimerFromSetupBehavior(pid, probe.ref))
probe.expectMessage("scheduled")
}
}
}

View file

@ -0,0 +1,64 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.state.scaladsl
/*
* Copyright (C) 2018-2021 Lightbend Inc. <https://www.lightbend.com>
*/
import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.persistence.typed.PersistenceId
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
object NullEmptyStateSpec {
private def conf: Config = ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.persistence.state.plugin = "akka.persistence.state.inmem"
akka.persistence.state.inmem {
class = "akka.persistence.state.inmem.InmemDurableStateStoreProvider"
recovery-timeout = 30s
}
""")
}
class NullEmptyStateSpec
extends ScalaTestWithActorTestKit(NullEmptyStateSpec.conf)
with AnyWordSpecLike
with LogCapturing {
implicit val testSettings: TestKitSettings = TestKitSettings(system)
def primitiveState(persistenceId: PersistenceId, probe: ActorRef[String]): Behavior[String] =
DurableStateBehavior[String, String](persistenceId, emptyState = null, commandHandler = (_, command) => {
if (command == "stop")
Effect.stop()
else
Effect.persist(command).thenReply(probe)(_ => command)
}).withDurableStateStorePluginId("akka.persistence.state.inmem")
"A typed persistent actor with primitive state" must {
"persist events and update state" in {
val probe = TestProbe[String]()
val b = primitiveState(PersistenceId.ofUniqueId("a"), probe.ref)
val ref1 = spawn(b)
ref1 ! "one"
probe.expectMessage("one")
ref1 ! "two"
probe.expectMessage("two")
ref1 ! "stop"
probe.expectTerminated(ref1)
val _ = spawn(b)
// no events, no replay and hence no messages
probe.expectNoMessage()
}
}
}

View file

@ -0,0 +1,65 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.state.scaladsl
/*
* Copyright (C) 2018-2021 Lightbend Inc. <https://www.lightbend.com>
*/
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.persistence.typed.PersistenceId
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
object PrimitiveStateSpec {
private def conf: Config = ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.persistence.state.plugin = "akka.persistence.state.inmem"
akka.persistence.state.inmem {
class = "akka.persistence.state.inmem.InmemDurableStateStoreProvider"
recovery-timeout = 30s
}
""")
}
class PrimitiveStateSpec
extends ScalaTestWithActorTestKit(PrimitiveStateSpec.conf)
with AnyWordSpecLike
with LogCapturing {
def primitiveState(persistenceId: PersistenceId, probe: ActorRef[String]): Behavior[Int] =
DurableStateBehavior[Int, Int](persistenceId, emptyState = 0, commandHandler = (_, command) => {
if (command < 0)
Effect.stop()
else
Effect.persist(command).thenReply(probe)(_ => command.toString)
}).withDurableStateStorePluginId("akka.persistence.state.inmem")
"A typed persistent actor with primitive state" must {
"persist primitive events and update state" in {
val probe = TestProbe[String]()
val b = primitiveState(PersistenceId.ofUniqueId("a"), probe.ref)
val ref1 = spawn(b)
ref1 ! 1
probe.expectMessage("1")
ref1 ! 2
probe.expectMessage("2")
ref1 ! -1
probe.expectTerminated(ref1)
val ref2 = spawn(b)
// no events, no replay and hence no messages
probe.expectNoMessage()
ref2 ! 3
probe.expectMessage("3")
}
}
}