Merge pull request #17828 from ktoso/wip-nested-persist-ktoso

~per #15640 support nested persist calls
This commit is contained in:
Konrad Malawski 2015-06-26 12:53:16 +02:00
commit 83e931ea7e
10 changed files with 752 additions and 46 deletions

View file

@ -9,11 +9,13 @@ import java.util.concurrent.atomic.AtomicInteger
import akka.actor._
import akka.testkit.{ AkkaSpec, ImplicitSender, TestLatch, TestProbe }
import com.typesafe.config.Config
import org.scalatest.matchers.{ MatchResult, Matcher }
import scala.collection.immutable.Seq
import scala.concurrent.Await
import scala.collection.immutable
import scala.concurrent.{ Future, Await }
import scala.concurrent.duration._
import scala.util.Random
import scala.util.{ Try, Random }
import scala.util.control.NoStackTrace
object PersistentActorSpec {
@ -426,6 +428,130 @@ object PersistentActorSpec {
}
}
class MultipleAndNestedPersists(name: String, probe: ActorRef) extends ExamplePersistentActor(name) {
val receiveCommand: Receive = {
case s: String
probe ! s
persist(s + "-outer-1") { outer
probe ! outer
persist(s + "-inner-1") { inner probe ! inner }
}
persist(s + "-outer-2") { outer
probe ! outer
persist(s + "-inner-2") { inner probe ! inner }
}
}
}
class MultipleAndNestedPersistAsyncs(name: String, probe: ActorRef) extends ExamplePersistentActor(name) {
val receiveCommand: Receive = {
case s: String
probe ! s
persistAsync(s + "-outer-1") { outer
probe ! outer
persistAsync(s + "-inner-1") { inner probe ! inner }
}
persistAsync(s + "-outer-2") { outer
probe ! outer
persistAsync(s + "-inner-2") { inner probe ! inner }
}
}
}
class DeeplyNestedPersistAsyncs(name: String, maxDepth: Int, probe: ActorRef) extends ExamplePersistentActor(name) {
var currentDepths = Map.empty[String, Int].withDefaultValue(1)
def weMustGoDeeper: String Unit = { dWithDepth
val d = dWithDepth.split("-").head
probe ! dWithDepth
if (currentDepths(d) < maxDepth) {
currentDepths = currentDepths.updated(d, currentDepths(d) + 1)
persistAsync(d + "-" + currentDepths(d))(weMustGoDeeper)
} else {
// reset depth counter before next command
currentDepths = currentDepths.updated(d, 1)
}
}
val receiveCommand: Receive = {
case s: String
probe ! s
persistAsync(s + "-" + 1)(weMustGoDeeper)
}
}
class NestedPersistNormalAndAsyncs(name: String, probe: ActorRef) extends ExamplePersistentActor(name) {
val receiveCommand: Receive = {
case s: String
probe ! s
persist(s + "-outer-1") { outer
probe ! outer
persistAsync(s + "-inner-async-1") { inner
probe ! inner
}
}
persist(s + "-outer-2") { outer
probe ! outer
persistAsync(s + "-inner-async-2") { inner
probe ! inner
}
}
}
}
class NestedPersistAsyncsAndNormal(name: String, probe: ActorRef) extends ExamplePersistentActor(name) {
val receiveCommand: Receive = {
case s: String
probe ! s
persistAsync(s + "-outer-async-1") { outer
probe ! outer
persist(s + "-inner-1") { inner
probe ! inner
}
}
persistAsync(s + "-outer-async-2") { outer
probe ! outer
persist(s + "-inner-2") { inner
probe ! inner
}
}
}
}
class NestedPersistInAsyncEnforcesStashing(name: String, probe: ActorRef) extends ExamplePersistentActor(name) {
val receiveCommand: Receive = {
case s: String
probe ! s
persistAsync(s + "-outer-async") { outer
probe ! outer
persist(s + "-inner") { inner
probe ! inner
Thread.sleep(1000) // really long wait here...
// the next incoming command must be handled by the following function
context.become({ case _ sender() ! "done" })
}
}
}
}
class DeeplyNestedPersists(name: String, maxDepth: Int, probe: ActorRef) extends ExamplePersistentActor(name) {
var currentDepths = Map.empty[String, Int].withDefaultValue(1)
def weMustGoDeeper: String Unit = { dWithDepth
val d = dWithDepth.split("-").head
probe ! dWithDepth
if (currentDepths(d) < maxDepth) {
currentDepths = currentDepths.updated(d, currentDepths(d) + 1)
persist(d + "-" + currentDepths(d))(weMustGoDeeper)
} else {
// reset depth counter before next command
currentDepths = currentDepths.updated(d, 1)
}
}
val receiveCommand: Receive = {
case s: String
probe ! s
persist(s + "-" + 1)(weMustGoDeeper)
}
}
class StackableTestPersistentActor(val probe: ActorRef) extends StackableTestPersistentActor.BaseActor with PersistentActor with StackableTestPersistentActor.MixinActor {
override def persistenceId: String = "StackableTestPersistentActor"
@ -893,6 +1019,88 @@ abstract class PersistentActorSpec(config: Config) extends PersistenceSpec(confi
expectNoMsg(100.millis)
}
"allow multiple persists with nested persist calls" in {
val persistentActor = system.actorOf(Props(classOf[MultipleAndNestedPersists], name, testActor))
persistentActor ! "a"
persistentActor ! "b"
expectMsg("a")
expectMsg("a-outer-1")
expectMsg("a-outer-2")
expectMsg("a-inner-1")
expectMsg("a-inner-2")
// and only then process "b"
expectMsg("b")
expectMsg("b-outer-1")
expectMsg("b-outer-2")
expectMsg("b-inner-1")
expectMsg("b-inner-2")
}
"allow multiple persistAsyncs with nested persistAsync calls" in {
val persistentActor = system.actorOf(Props(classOf[MultipleAndNestedPersistAsyncs], name, testActor))
persistentActor ! "a"
persistentActor ! "b"
val msgs = receiveN(10).map(_.toString)
val as = msgs.filter(_ startsWith "a")
val bs = msgs.filter(_ startsWith "b")
as should equal(List("a", "a-outer-1", "a-outer-2", "a-inner-1", "a-inner-2"))
bs should equal(List("b", "b-outer-1", "b-outer-2", "b-inner-1", "b-inner-2"))
}
"allow deeply nested persist calls" in {
val nestedPersists = 6
val persistentActor = system.actorOf(Props(classOf[DeeplyNestedPersists], name, nestedPersists, testActor))
persistentActor ! "a"
persistentActor ! "b"
expectMsg("a")
receiveN(6) should ===((1 to nestedPersists).map("a-" + _))
// and only then process "b"
expectMsg("b")
receiveN(6) should ===((1 to nestedPersists).map("b-" + _))
}
"allow deeply nested persistAsync calls" in {
val nestedPersistAsyncs = 6
val persistentActor = system.actorOf(Props(classOf[DeeplyNestedPersistAsyncs], name, nestedPersistAsyncs, testActor))
persistentActor ! "a"
expectMsg("a")
val got = receiveN(nestedPersistAsyncs)
got should beIndependentlyOrdered("a-")
persistentActor ! "b"
persistentActor ! "c"
val expectedReplies = 2 + (nestedPersistAsyncs * 2)
receiveN(expectedReplies).map(_.toString) should beIndependentlyOrdered("b-", "c-")
}
"allow mixed nesting of persistAsync in persist calls" in {
val persistentActor = system.actorOf(Props(classOf[NestedPersistNormalAndAsyncs], name, testActor))
persistentActor ! "a"
expectMsg("a")
receiveN(4) should equal(List("a-outer-1", "a-outer-2", "a-inner-async-1", "a-inner-async-2"))
}
"allow mixed nesting of persist in persistAsync calls" in {
val persistentActor = system.actorOf(Props(classOf[NestedPersistAsyncsAndNormal], name, testActor))
persistentActor ! "a"
expectMsg("a")
receiveN(4) should equal(List("a-outer-async-1", "a-outer-async-2", "a-inner-1", "a-inner-2"))
}
"make sure persist retains promised semantics when nested in persistAsync callback xoxo" in {
val persistentActor = system.actorOf(Props(classOf[NestedPersistInAsyncEnforcesStashing], name, testActor))
persistentActor ! "a"
expectMsg("a")
expectMsg("a-outer-async")
expectMsg("a-inner")
persistentActor ! "b"
expectMsg("done")
// which means that b only got applied after the inner persist() handler finished
// so it keeps the persist() semantics, even though we should not recommend this style it can come in handy I guess
}
"be able to delete events" in {
val persistentActor = namedPersistentActor[Behavior1PersistentActor]