=per #3615 Fix InvalidActorNameException in persistence tests

- use overridden processor and channel ids
- no need anymore to wait for processor instances to stop
- unrelated: fix wrong artifact names in documentation
This commit is contained in:
Martin Krasser 2013-09-18 11:55:29 +02:00
parent 6246099694
commit 5da888548b
6 changed files with 79 additions and 90 deletions

View file

@ -24,7 +24,7 @@ Akka persistence is a separate jar file. Make sure that you have the following d
<dependency> <dependency>
<groupId>com.typesafe.akka</groupId> <groupId>com.typesafe.akka</groupId>
<artifactId>akka-persistence_@binVersion@</artifactId> <artifactId>akka-persistence-experimental_@binVersion@</artifactId>
<version>@version@</version> <version>@version@</version>
</dependency> </dependency>

View file

@ -22,7 +22,7 @@ Dependencies
Akka persistence is a separate jar file. Make sure that you have the following dependency in your project:: Akka persistence is a separate jar file. Make sure that you have the following dependency in your project::
"com.typesafe.akka" %% "akka-persistence" % "@version@" @crossString@ "com.typesafe.akka" %% "akka-persistence-experimental" % "@version@" @crossString@
Architecture Architecture
============ ============

View file

@ -11,9 +11,9 @@ object ChannelSpec {
|akka.persistence.journal.leveldb.dir = "target/journal-channel-spec" |akka.persistence.journal.leveldb.dir = "target/journal-channel-spec"
""".stripMargin """.stripMargin
class TestProcessor extends Processor { class TestProcessor(name: String) extends NamedProcessor(name) {
val destination = context.actorOf(Props[TestDestination]) val destination = context.actorOf(Props[TestDestination])
val channel = context.actorOf(Channel.props(), "channel") val channel = context.actorOf(Channel.props("channel"))
def receive = { def receive = {
case m @ Persistent(s: String, _) if s.startsWith("a") { case m @ Persistent(s: String, _) if s.startsWith("a") {
@ -51,7 +51,7 @@ class ChannelSpec extends AkkaSpec(ChannelSpec.config) with PersistenceSpec with
val forwardProbe = TestProbe() val forwardProbe = TestProbe()
val replyProbe = TestProbe() val replyProbe = TestProbe()
val processor = system.actorOf(Props[TestProcessor], name) val processor = system.actorOf(Props(classOf[TestProcessor], name))
system.eventStream.subscribe(confirmProbe.ref, classOf[Confirm]) system.eventStream.subscribe(confirmProbe.ref, classOf[Confirm])
@ -65,61 +65,53 @@ class ChannelSpec extends AkkaSpec(ChannelSpec.config) with PersistenceSpec with
// for replay so that channels can drop confirmed messages) // for replay so that channels can drop confirmed messages)
confirmProbe.expectMsgType[Confirm] confirmProbe.expectMsgType[Confirm]
confirmProbe.expectMsgType[Confirm] confirmProbe.expectMsgType[Confirm]
}
stopAndAwaitTermination(processor) def actorRefFor(topLevelName: String) = {
extension.system.provider.resolveActorRef(RootActorPath(Address("akka", system.name)) / "user" / topLevelName)
} }
"A channel" must { "A channel" must {
"forward un-confirmed messages to destination" in { "forward un-confirmed messages to destination" in {
val processor = system.actorOf(Props[TestProcessor], name) val processor = system.actorOf(Props(classOf[TestProcessor], name))
processor ! Persistent("a2") processor ! Persistent("a2")
expectMsgPF() { case m @ Persistent("fw: a2", _) m.confirm() } expectMsgPF() { case m @ Persistent("fw: a2", _) m.confirm() }
} }
"reply un-confirmed messages to senders" in { "reply un-confirmed messages to senders" in {
val processor = system.actorOf(Props[TestProcessor], name) val processor = system.actorOf(Props(classOf[TestProcessor], name))
processor ! Persistent("b2") processor ! Persistent("b2")
expectMsgPF() { case m @ Persistent("re: b2", _) m.confirm() } expectMsgPF() { case m @ Persistent("re: b2", _) m.confirm() }
} }
"must resolve sender references and preserve message order" in { "must resolve sender references and preserve message order" in {
val channel = system.actorOf(Channel.props(), "testChannel1") val channel = system.actorOf(Channel.props())
val destination = system.actorOf(Props[TestDestination]) val destination = system.actorOf(Props[TestDestination])
val sender1 = system.actorOf(Props(classOf[TestReceiver], testActor), "testSender")
channel tell (Deliver(Persistent("a"), destination), sender1) val empty = actorRefFor("testSender") // will be an EmptyLocalActorRef
expectMsg("a") val sender = system.actorOf(Props(classOf[TestReceiver], testActor), "testSender")
stopAndAwaitTermination(sender1)
// create new incarnation of sender (with same actor path)
val sender2 = system.actorOf(Props(classOf[TestReceiver], testActor), "testSender")
// replayed message (resolved = false) and invalid sender reference // replayed message (resolved = false) and invalid sender reference
channel tell (Deliver(PersistentImpl("a", resolved = false), destination, Resolve.Sender), sender1) channel tell (Deliver(PersistentImpl("a", resolved = false), destination, Resolve.Sender), empty)
// new messages (resolved = true) and valid sender references // new messages (resolved = true) and valid sender references
channel tell (Deliver(Persistent("b"), destination), sender2) channel tell (Deliver(Persistent("b"), destination), sender)
channel tell (Deliver(Persistent("c"), destination), sender2) channel tell (Deliver(Persistent("c"), destination), sender)
expectMsg("a") expectMsg("a")
expectMsg("b") expectMsg("b")
expectMsg("c") expectMsg("c")
} }
"must resolve destination references and preserve message order" in { "must resolve destination references and preserve message order" in {
val channel = system.actorOf(Channel.props(), "testChannel2") val channel = system.actorOf(Channel.props())
val destination1 = system.actorOf(Props(classOf[TestReceiver], testActor), "testDestination")
channel ! Deliver(Persistent("a"), destination1) val empty = actorRefFor("testDestination") // will be an EmptyLocalActorRef
expectMsg("a") val destination = system.actorOf(Props(classOf[TestReceiver], testActor), "testDestination")
stopAndAwaitTermination(destination1)
// create new incarnation of destination (with same actor path)
val destination2 = system.actorOf(Props(classOf[TestReceiver], testActor), "testDestination")
// replayed message (resolved = false) and invalid destination reference // replayed message (resolved = false) and invalid destination reference
channel ! Deliver(PersistentImpl("a", resolved = false), destination1, Resolve.Destination) channel ! Deliver(PersistentImpl("a", resolved = false), empty, Resolve.Destination)
// new messages (resolved = true) and valid destination references // new messages (resolved = true) and valid destination references
channel ! Deliver(Persistent("b"), destination2) channel ! Deliver(Persistent("b"), destination)
channel ! Deliver(Persistent("c"), destination2) channel ! Deliver(Persistent("c"), destination)
expectMsg("a") expectMsg("a")
expectMsg("b") expectMsg("b")

View file

@ -3,12 +3,13 @@ package akka.persistence
import java.io.File import java.io.File
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import scala.reflect.ClassTag
import org.apache.commons.io.FileUtils import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfterEach import org.scalatest.BeforeAndAfterEach
import akka.actor.ActorRef import akka.actor.Props
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.testkit.TestActor.Watch
trait PersistenceSpec extends BeforeAndAfterEach { this: AkkaSpec trait PersistenceSpec extends BeforeAndAfterEach { this: AkkaSpec
private var _name: String = _ private var _name: String = _
@ -26,11 +27,11 @@ trait PersistenceSpec extends BeforeAndAfterEach { this: AkkaSpec ⇒
*/ */
def namePrefix: String = "processor" def namePrefix: String = "processor"
protected def stopAndAwaitTermination(ref: ActorRef) { /**
testActor ! Watch(ref) * Creates a processor with current name as constructor argument.
system.stop(ref) */
expectTerminated(ref) def namedProcessor[T <: NamedProcessor: ClassTag] =
} system.actorOf(Props(implicitly[ClassTag[T]].runtimeClass, name))
override protected def beforeEach() { override protected def beforeEach() {
_name = namePrefix + counter.incrementAndGet() _name = namePrefix + counter.incrementAndGet()
@ -41,6 +42,10 @@ trait PersistenceSpec extends BeforeAndAfterEach { this: AkkaSpec ⇒
} }
} }
abstract class NamedProcessor(name: String) extends Processor {
override def processorId: String = name
}
trait TurnOffRecoverOnStart { this: Processor trait TurnOffRecoverOnStart { this: Processor
override def preStartProcessor(): Unit = () override def preStartProcessor(): Unit = ()
} }

View file

@ -13,7 +13,7 @@ object ProcessorSpec {
case object GetState case object GetState
class RecoverTestProcessor extends Processor { class RecoverTestProcessor(name: String) extends NamedProcessor(name) {
var state = List.empty[String] var state = List.empty[String]
def receive = { def receive = {
case "boom" throw new Exception("boom") case "boom" throw new Exception("boom")
@ -31,22 +31,22 @@ object ProcessorSpec {
} }
} }
class RecoverOffTestProcessor extends RecoverTestProcessor with TurnOffRecoverOnStart class RecoverOffTestProcessor(name: String) extends RecoverTestProcessor(name) with TurnOffRecoverOnStart
class StoredSenderTestProcessor extends Processor { class StoredSenderTestProcessor(name: String) extends NamedProcessor(name) {
def receive = { def receive = {
case Persistent(payload, _) sender ! payload case Persistent(payload, _) sender ! payload
} }
} }
class RecoveryStatusTestProcessor extends Processor { class RecoveryStatusTestProcessor(name: String) extends NamedProcessor(name) {
def receive = { def receive = {
case Persistent("c", _) if !recoveryRunning sender ! "c" case Persistent("c", _) if !recoveryRunning sender ! "c"
case Persistent(payload, _) if recoveryRunning sender ! payload case Persistent(payload, _) if recoveryRunning sender ! payload
} }
} }
class BehaviorChangeTestProcessor extends Processor { class BehaviorChangeTestProcessor(name: String) extends NamedProcessor(name) {
val acceptA: Actor.Receive = { val acceptA: Actor.Receive = {
case Persistent("a", _) { case Persistent("a", _) {
sender ! "a" sender ! "a"
@ -64,7 +64,7 @@ object ProcessorSpec {
def receive = acceptA def receive = acceptA
} }
class FsmTestProcessor extends Processor with FSM[String, Int] { class FsmTestProcessor(name: String) extends NamedProcessor(name) with FSM[String, Int] {
startWith("closed", 0) startWith("closed", 0)
when("closed") { when("closed") {
@ -80,7 +80,7 @@ object ProcessorSpec {
} }
} }
class OutboundMessageTestProcessor extends Processor { class OutboundMessageTestProcessor(name: String) extends NamedProcessor(name) {
def receive = { def receive = {
case Persistent(payload, snr) sender ! Persistent(snr) case Persistent(payload, snr) sender ! Persistent(snr)
} }
@ -88,8 +88,8 @@ object ProcessorSpec {
class ResumeTestException extends Exception("test") class ResumeTestException extends Exception("test")
class ResumeTestSupervisor extends Actor { class ResumeTestSupervisor(name: String) extends Actor {
val processor = context.actorOf(Props[ResumeTestProcessor], "processor") val processor = context.actorOf(Props(classOf[ResumeTestProcessor], name))
override val supervisorStrategy = override val supervisorStrategy =
OneForOneStrategy() { OneForOneStrategy() {
@ -101,7 +101,7 @@ object ProcessorSpec {
} }
} }
class ResumeTestProcessor extends Processor { class ResumeTestProcessor(name: String) extends NamedProcessor(name) {
var state: List[String] = Nil var state: List[String] = Nil
def receive = { def receive = {
case "boom" throw new ResumeTestException case "boom" throw new ResumeTestException
@ -110,7 +110,7 @@ object ProcessorSpec {
} }
} }
class LastReplayedMsgFailsTestProcessor extends RecoverTestProcessor { class LastReplayedMsgFailsTestProcessor(name: String) extends RecoverTestProcessor(name) {
override def preRestartProcessor(reason: Throwable, message: Option[Any]) = { override def preRestartProcessor(reason: Throwable, message: Option[Any]) = {
message match { message match {
case Some(m: Persistent) if (recoveryRunning) delete(m) case Some(m: Persistent) if (recoveryRunning) delete(m)
@ -120,7 +120,7 @@ object ProcessorSpec {
} }
} }
class AnyReplayedMsgFailsTestProcessor extends RecoverTestProcessor { class AnyReplayedMsgFailsTestProcessor(name: String) extends RecoverTestProcessor(name) {
val failOnReplayedA: Actor.Receive = { val failOnReplayedA: Actor.Receive = {
case Persistent("a", _) if recoveryRunning throw new Exception("boom") case Persistent("a", _) if recoveryRunning throw new Exception("boom")
} }
@ -135,34 +135,33 @@ class ProcessorSpec extends AkkaSpec(ProcessorSpec.config) with PersistenceSpec
override protected def beforeEach() { override protected def beforeEach() {
super.beforeEach() super.beforeEach()
val processor = system.actorOf(Props[RecoverTestProcessor], name) val processor = namedProcessor[RecoverTestProcessor]
processor ! Persistent("a") processor ! Persistent("a")
processor ! Persistent("b") processor ! Persistent("b")
processor ! GetState processor ! GetState
expectMsg(List("a-1", "b-2")) expectMsg(List("a-1", "b-2"))
stopAndAwaitTermination(processor)
} }
"A processor" must { "A processor" must {
"recover state on explicit request" in { "recover state on explicit request" in {
val processor = system.actorOf(Props[RecoverOffTestProcessor], name) val processor = namedProcessor[RecoverOffTestProcessor]
processor ! Recover() processor ! Recover()
processor ! GetState processor ! GetState
expectMsg(List("a-1", "b-2")) expectMsg(List("a-1", "b-2"))
} }
"recover state automatically" in { "recover state automatically" in {
val processor = system.actorOf(Props[RecoverTestProcessor], name) val processor = namedProcessor[RecoverTestProcessor]
processor ! GetState processor ! GetState
expectMsg(List("a-1", "b-2")) expectMsg(List("a-1", "b-2"))
} }
"recover state automatically on restart" in { "recover state automatically on restart" in {
val processor = system.actorOf(Props[RecoverTestProcessor], name) val processor = namedProcessor[RecoverTestProcessor]
processor ! "boom" processor ! "boom"
processor ! GetState processor ! GetState
expectMsg(List("a-1", "b-2")) expectMsg(List("a-1", "b-2"))
} }
"buffer new messages until recovery completed" in { "buffer new messages until recovery completed" in {
val processor = system.actorOf(Props[RecoverOffTestProcessor], name) val processor = namedProcessor[RecoverOffTestProcessor]
processor ! Persistent("c") processor ! Persistent("c")
processor ! Recover() processor ! Recover()
processor ! Persistent("d") processor ! Persistent("d")
@ -170,7 +169,7 @@ class ProcessorSpec extends AkkaSpec(ProcessorSpec.config) with PersistenceSpec
expectMsg(List("a-1", "b-2", "c-3", "d-4")) expectMsg(List("a-1", "b-2", "c-3", "d-4"))
} }
"ignore redundant recovery requests" in { "ignore redundant recovery requests" in {
val processor = system.actorOf(Props[RecoverOffTestProcessor], name) val processor = namedProcessor[RecoverOffTestProcessor]
processor ! Persistent("c") processor ! Persistent("c")
processor ! Recover() processor ! Recover()
processor ! Persistent("d") processor ! Persistent("d")
@ -180,7 +179,7 @@ class ProcessorSpec extends AkkaSpec(ProcessorSpec.config) with PersistenceSpec
expectMsg(List("a-1", "b-2", "c-3", "d-4", "e-5")) expectMsg(List("a-1", "b-2", "c-3", "d-4", "e-5"))
} }
"buffer new messages until restart-recovery completed" in { "buffer new messages until restart-recovery completed" in {
val processor = system.actorOf(Props[RecoverTestProcessor], name) val processor = namedProcessor[RecoverTestProcessor]
processor ! "boom" processor ! "boom"
processor ! Persistent("c") processor ! Persistent("c")
processor ! Persistent("d") processor ! Persistent("d")
@ -188,13 +187,13 @@ class ProcessorSpec extends AkkaSpec(ProcessorSpec.config) with PersistenceSpec
expectMsg(List("a-1", "b-2", "c-3", "d-4")) expectMsg(List("a-1", "b-2", "c-3", "d-4"))
} }
"allow deletion of journaled messages on failure" in { "allow deletion of journaled messages on failure" in {
val processor = system.actorOf(Props[RecoverTestProcessor], name) val processor = namedProcessor[RecoverTestProcessor]
processor ! Persistent("boom") // journaled message causes failure and will be deleted processor ! Persistent("boom") // journaled message causes failure and will be deleted
processor ! GetState processor ! GetState
expectMsg(List("a-1", "b-2")) expectMsg(List("a-1", "b-2"))
} }
"allow deletion of journaled messages on failure and buffer new messages until restart-recovery completed" in { "allow deletion of journaled messages on failure and buffer new messages until restart-recovery completed" in {
val processor = system.actorOf(Props[RecoverTestProcessor], name) val processor = namedProcessor[RecoverTestProcessor]
processor ! Persistent("boom") // journaled message causes failure and will be deleted processor ! Persistent("boom") // journaled message causes failure and will be deleted
processor ! Persistent("c") processor ! Persistent("c")
processor ! Persistent("d") processor ! Persistent("d")
@ -202,66 +201,63 @@ class ProcessorSpec extends AkkaSpec(ProcessorSpec.config) with PersistenceSpec
expectMsg(List("a-1", "b-2", "c-4", "d-5")) // deleted message leaves gap in sequence expectMsg(List("a-1", "b-2", "c-4", "d-5")) // deleted message leaves gap in sequence
} }
"store sender references and restore them for replayed messages" in { "store sender references and restore them for replayed messages" in {
system.actorOf(Props[StoredSenderTestProcessor], name) namedProcessor[StoredSenderTestProcessor]
List("a", "b") foreach (expectMsg(_)) List("a", "b") foreach (expectMsg(_))
} }
"properly indicate its recovery status" in { "properly indicate its recovery status" in {
val processor = system.actorOf(Props[RecoveryStatusTestProcessor], name) val processor = namedProcessor[RecoveryStatusTestProcessor]
processor ! Persistent("c") processor ! Persistent("c")
List("a", "b", "c") foreach (expectMsg(_)) List("a", "b", "c") foreach (expectMsg(_))
} }
"continue journaling when changing behavior" in { "continue journaling when changing behavior" in {
val processor = system.actorOf(Props[BehaviorChangeTestProcessor], name) val processor = namedProcessor[BehaviorChangeTestProcessor]
processor ! Persistent("a") processor ! Persistent("a")
processor ! Persistent("b") processor ! Persistent("b")
List("a", "b", "a", "b") foreach (expectMsg(_)) List("a", "b", "a", "b") foreach (expectMsg(_))
} }
"derive outbound messages from the current message" in { "derive outbound messages from the current message" in {
val processor = system.actorOf(Props[OutboundMessageTestProcessor], name) val processor = namedProcessor[OutboundMessageTestProcessor]
processor ! Persistent("c") processor ! Persistent("c")
1 to 3 foreach { _ expectMsgPF() { case Persistent(payload, snr) payload must be(snr) } } 1 to 3 foreach { _ expectMsgPF() { case Persistent(payload, snr) payload must be(snr) } }
} }
"support recovery with upper sequence number bound" in { "support recovery with upper sequence number bound" in {
val processor = system.actorOf(Props[RecoverOffTestProcessor], name) val processor = namedProcessor[RecoverOffTestProcessor]
processor ! Recover(1L) processor ! Recover(1L)
processor ! GetState processor ! GetState
expectMsg(List("a-1")) expectMsg(List("a-1"))
} }
"never replace journaled messages" in { "never replace journaled messages" in {
val processor1 = system.actorOf(Props[RecoverOffTestProcessor], name) val processor1 = namedProcessor[RecoverOffTestProcessor]
processor1 ! Recover(1L) processor1 ! Recover(1L)
processor1 ! Persistent("c") processor1 ! Persistent("c")
processor1 ! GetState processor1 ! GetState
expectMsg(List("a-1", "c-3")) expectMsg(List("a-1", "c-3"))
stopAndAwaitTermination(processor1)
val processor2 = system.actorOf(Props[RecoverOffTestProcessor], name) val processor2 = namedProcessor[RecoverOffTestProcessor]
processor2 ! Recover() processor2 ! Recover()
processor2 ! GetState processor2 ! GetState
expectMsg(List("a-1", "b-2", "c-3")) expectMsg(List("a-1", "b-2", "c-3"))
} }
"be able to skip restart recovery when being resumed" in { "be able to skip restart recovery when being resumed" in {
val supervisor1 = system.actorOf(Props[ResumeTestSupervisor], name) val supervisor1 = system.actorOf(Props(classOf[ResumeTestSupervisor], "processor"))
supervisor1 ! Persistent("a") supervisor1 ! Persistent("a")
supervisor1 ! Persistent("b") supervisor1 ! Persistent("b")
supervisor1 ! GetState supervisor1 ! GetState
expectMsg(List("a-1", "b-2")) expectMsg(List("a-1", "b-2"))
stopAndAwaitTermination(supervisor1)
val supervisor2 = system.actorOf(Props[ResumeTestSupervisor], name) val supervisor2 = system.actorOf(Props(classOf[ResumeTestSupervisor], "processor"))
supervisor2 ! Persistent("c") supervisor2 ! Persistent("c")
supervisor2 ! "boom" supervisor2 ! "boom"
supervisor2 ! Persistent("d") supervisor2 ! Persistent("d")
supervisor2 ! GetState supervisor2 ! GetState
expectMsg(List("a-1", "b-2", "c-3", "d-4")) expectMsg(List("a-1", "b-2", "c-3", "d-4"))
stopAndAwaitTermination(supervisor2)
val supervisor3 = system.actorOf(Props[ResumeTestSupervisor], name) val supervisor3 = system.actorOf(Props(classOf[ResumeTestSupervisor], "processor"))
supervisor3 ! GetState supervisor3 ! GetState
expectMsg(List("a-1", "b-2", "c-3", "d-4")) expectMsg(List("a-1", "b-2", "c-3", "d-4"))
} }
"be able to re-run restart recovery when it fails with last replayed message" in { "be able to re-run restart recovery when it fails with last replayed message" in {
val processor = system.actorOf(Props[LastReplayedMsgFailsTestProcessor], name) val processor = namedProcessor[LastReplayedMsgFailsTestProcessor]
processor ! Persistent("c") processor ! Persistent("c")
processor ! Persistent("boom") processor ! Persistent("boom")
processor ! Persistent("d") processor ! Persistent("d")
@ -269,13 +265,13 @@ class ProcessorSpec extends AkkaSpec(ProcessorSpec.config) with PersistenceSpec
expectMsg(List("a-1", "b-2", "c-3", "d-5")) expectMsg(List("a-1", "b-2", "c-3", "d-5"))
} }
"be able to re-run initial recovery when it fails with a message that is not the last replayed message" in { "be able to re-run initial recovery when it fails with a message that is not the last replayed message" in {
val processor = system.actorOf(Props[AnyReplayedMsgFailsTestProcessor], name) val processor = namedProcessor[AnyReplayedMsgFailsTestProcessor]
processor ! Persistent("c") processor ! Persistent("c")
processor ! GetState processor ! GetState
expectMsg(List("b-2", "c-3")) expectMsg(List("b-2", "c-3"))
} }
"be able to re-run restart recovery when it fails with a message that is not the last replayed message" in { "be able to re-run restart recovery when it fails with a message that is not the last replayed message" in {
val processor = system.actorOf(Props[AnyReplayedMsgFailsTestProcessor], "other") // new processor, no initial replay val processor = system.actorOf(Props(classOf[AnyReplayedMsgFailsTestProcessor], "other")) // new processor, no initial replay
processor ! Persistent("b") processor ! Persistent("b")
processor ! Persistent("a") processor ! Persistent("a")
processor ! Persistent("c") processor ! Persistent("c")
@ -294,7 +290,7 @@ class ProcessorSpec extends AkkaSpec(ProcessorSpec.config) with PersistenceSpec
"A processor" can { "A processor" can {
"be a finite state machine" in { "be a finite state machine" in {
val processor = system.actorOf(Props[FsmTestProcessor], name) val processor = namedProcessor[FsmTestProcessor]
processor ! Persistent("a") processor ! Persistent("a")
processor ! Persistent("b") processor ! Persistent("b")
List(0, 1, 2, 3) foreach (expectMsg(_)) List(0, 1, 2, 3) foreach (expectMsg(_))

View file

@ -13,7 +13,7 @@ object ProcessorStashSpec {
case object GetState case object GetState
class StashingProcessor extends Processor { class StashingProcessor(name: String) extends NamedProcessor(name) {
var state: List[String] = Nil var state: List[String] = Nil
val behaviorA: Actor.Receive = { val behaviorA: Actor.Receive = {
@ -38,7 +38,7 @@ object ProcessorStashSpec {
} }
} }
class RecoveryFailureStashingProcessor extends StashingProcessor { class RecoveryFailureStashingProcessor(name: String) extends StashingProcessor(name) {
override def preRestartProcessor(reason: Throwable, message: Option[Any]) = { override def preRestartProcessor(reason: Throwable, message: Option[Any]) = {
message match { message match {
case Some(m: Persistent) if (recoveryRunning) delete(m) case Some(m: Persistent) if (recoveryRunning) delete(m)
@ -54,15 +54,14 @@ class ProcessorStashSpec extends AkkaSpec(ProcessorStashSpec.config) with Persis
"A processor" must { "A processor" must {
"support user stash and unstash operations for persistent messages" in { "support user stash and unstash operations for persistent messages" in {
val p1 = system.actorOf(Props[StashingProcessor], name) val p1 = namedProcessor[StashingProcessor]
p1 ! Persistent("a") p1 ! Persistent("a")
p1 ! Persistent("b") p1 ! Persistent("b")
p1 ! Persistent("c") p1 ! Persistent("c")
p1 ! GetState p1 ! GetState
expectMsg(List("a-1", "c-3", "b-2")) expectMsg(List("a-1", "c-3", "b-2"))
stopAndAwaitTermination(p1)
val p2 = system.actorOf(Props[StashingProcessor], name) val p2 = namedProcessor[StashingProcessor]
p2 ! Persistent("a") p2 ! Persistent("a")
p2 ! Persistent("b") p2 ! Persistent("b")
p2 ! Persistent("c") p2 ! Persistent("c")
@ -70,16 +69,15 @@ class ProcessorStashSpec extends AkkaSpec(ProcessorStashSpec.config) with Persis
expectMsg(List("a-1", "c-3", "b-2", "a-4", "c-6", "b-5")) expectMsg(List("a-1", "c-3", "b-2", "a-4", "c-6", "b-5"))
} }
"support user stash and unstash operations for persistent and transient messages" in { "support user stash and unstash operations for persistent and transient messages" in {
val p1 = system.actorOf(Props[StashingProcessor], name) val p1 = namedProcessor[StashingProcessor]
p1 ! Persistent("a") p1 ! Persistent("a")
p1 ! "x" p1 ! "x"
p1 ! Persistent("b") p1 ! Persistent("b")
p1 ! Persistent("c") p1 ! Persistent("c")
p1 ! GetState p1 ! GetState
expectMsg(List("a-1", "c-3", "x-0", "b-2")) expectMsg(List("a-1", "c-3", "x-0", "b-2"))
stopAndAwaitTermination(p1)
val p2 = system.actorOf(Props[StashingProcessor], name) val p2 = namedProcessor[StashingProcessor]
p2 ! Persistent("a") p2 ! Persistent("a")
p2 ! "x" p2 ! "x"
p2 ! Persistent("b") p2 ! Persistent("b")
@ -88,16 +86,15 @@ class ProcessorStashSpec extends AkkaSpec(ProcessorStashSpec.config) with Persis
expectMsg(List("a-1", "c-3", "b-2", "a-4", "c-6", "x-0", "b-5")) expectMsg(List("a-1", "c-3", "b-2", "a-4", "c-6", "x-0", "b-5"))
} }
"support restarts between user stash and unstash operations" in { "support restarts between user stash and unstash operations" in {
val p1 = system.actorOf(Props[StashingProcessor], name) val p1 = namedProcessor[StashingProcessor]
p1 ! Persistent("a") p1 ! Persistent("a")
p1 ! Persistent("b") p1 ! Persistent("b")
p1 ! "boom" p1 ! "boom"
p1 ! Persistent("c") p1 ! Persistent("c")
p1 ! GetState p1 ! GetState
expectMsg(List("a-1", "c-3", "b-2")) expectMsg(List("a-1", "c-3", "b-2"))
stopAndAwaitTermination(p1)
val p2 = system.actorOf(Props[StashingProcessor], name) val p2 = namedProcessor[StashingProcessor]
p2 ! Persistent("a") p2 ! Persistent("a")
p2 ! Persistent("b") p2 ! Persistent("b")
p2 ! "boom" p2 ! "boom"
@ -106,16 +103,15 @@ class ProcessorStashSpec extends AkkaSpec(ProcessorStashSpec.config) with Persis
expectMsg(List("a-1", "c-3", "b-2", "a-4", "c-6", "b-5")) expectMsg(List("a-1", "c-3", "b-2", "a-4", "c-6", "b-5"))
} }
"support multiple restarts between user stash and unstash operations" in { "support multiple restarts between user stash and unstash operations" in {
val p1 = system.actorOf(Props[RecoveryFailureStashingProcessor], name) val p1 = namedProcessor[RecoveryFailureStashingProcessor]
p1 ! Persistent("a") p1 ! Persistent("a")
p1 ! Persistent("b") p1 ! Persistent("b")
p1 ! Persistent("boom") p1 ! Persistent("boom")
p1 ! Persistent("c") p1 ! Persistent("c")
p1 ! GetState p1 ! GetState
expectMsg(List("a-1", "c-4", "b-2")) expectMsg(List("a-1", "c-4", "b-2"))
stopAndAwaitTermination(p1)
val p2 = system.actorOf(Props[RecoveryFailureStashingProcessor], name) val p2 = namedProcessor[RecoveryFailureStashingProcessor]
p2 ! Persistent("a") p2 ! Persistent("a")
p2 ! Persistent("b") p2 ! Persistent("b")
p2 ! Persistent("boom") p2 ! Persistent("boom")