diff --git a/akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala index f6ed598f11..5776517b9d 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ChannelSpec.scala @@ -13,12 +13,6 @@ import akka.actor._ import akka.testkit._ object ChannelSpec { - class TestDestination extends Actor { - def receive = { - case m: ConfirmablePersistent ⇒ sender ! m - } - } - class TestDestinationProcessor(name: String) extends NamedProcessor(name) { def receive = { case cp @ ConfirmablePersistent("a", _, _) ⇒ cp.confirm() @@ -84,15 +78,16 @@ abstract class ChannelSpec(config: Config) extends AkkaSpec(config) with Persist "A channel" must { "must resolve destination references and preserve message order" in { val empty = actorRefFor("testDestination") // will be an EmptyLocalActorRef - val destination = system.actorOf(Props(classOf[TestReceiver], testActor), "testDestination") + val probe = TestProbe() + val destination = system.actorOf(Props(classOf[TestReceiver], probe.ref), "testDestination") defaultTestChannel ! Deliver(PersistentRepr("a"), empty.path) defaultTestChannel ! Deliver(Persistent("b"), destination.path) defaultTestChannel ! Deliver(Persistent("c"), destination.path) - expectMsg("a") - expectMsg("b") - expectMsg("c") + probe.expectMsg("a") + probe.expectMsg("b") + probe.expectMsg("c") } "support processors as destination" in { val destination = system.actorOf(Props(classOf[TestDestinationProcessor], name)) @@ -118,14 +113,14 @@ abstract class ChannelSpec(config: Config) extends AkkaSpec(config) with Persist awaitConfirmation(confirmProbe) } "accept confirmable persistent messages for delivery" in { - val destination = system.actorOf(Props[TestDestination]) val confirmProbe = TestProbe() + val destinationProbe = TestProbe() subscribeToConfirmation(confirmProbe) - defaultTestChannel ! Deliver(PersistentRepr("a", confirmable = true), destination.path) + defaultTestChannel ! Deliver(PersistentRepr("a", confirmable = true), destinationProbe.ref.path) - expectMsgPF() { case m @ ConfirmablePersistent("a", _, _) ⇒ m.confirm() } + destinationProbe.expectMsgPF() { case m @ ConfirmablePersistent("a", _, _) ⇒ m.confirm() } awaitConfirmation(confirmProbe) } "redeliver on missing confirmation" in { diff --git a/akka-persistence/src/test/scala/akka/persistence/PersistentChannelSpec.scala b/akka-persistence/src/test/scala/akka/persistence/PersistentChannelSpec.scala index 9f7ef036dc..4307b7f5fa 100644 --- a/akka-persistence/src/test/scala/akka/persistence/PersistentChannelSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/PersistentChannelSpec.scala @@ -37,10 +37,10 @@ abstract class PersistentChannelSpec(config: Config) extends ChannelSpec(config) import PersistentChannelSpec._ private def redeliverChannelSettings(listener: Option[ActorRef]): PersistentChannelSettings = - PersistentChannelSettings(redeliverMax = 2, redeliverInterval = 100 milliseconds, redeliverFailureListener = listener) + PersistentChannelSettings(redeliverMax = 2, redeliverInterval = 100 milliseconds, redeliverFailureListener = listener, idleTimeout = 5.seconds) private def createDefaultTestChannel(name: String): ActorRef = - system.actorOf(PersistentChannel.props(s"${name}-default", PersistentChannelSettings())) + system.actorOf(PersistentChannel.props(s"${name}-default", PersistentChannelSettings(idleTimeout = 5.seconds))) override def createDefaultTestChannel(): ActorRef = createDefaultTestChannel(name)