From 5fd19552553a205500434985b943b6e9082ff673 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 30 Apr 2013 14:57:33 +0200 Subject: [PATCH] AddressTerminated interference in RemoteWatcherSpec, see #3290 * The problem was that published AddressTerminated caused real notification of "Other side terminated" --- .../scala/akka/remote/RemoteWatcherSpec.scala | 47 ++++++++++--------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala index d6527e0026..2c52df1c45 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala @@ -15,8 +15,6 @@ import akka.actor.RootActorPath import akka.actor.Identify import akka.actor.ActorIdentity import akka.actor.PoisonPill -import akka.actor.AddressTerminated -import akka.actor.MinimalActorRef import akka.actor.Address object RemoteWatcherSpec { @@ -31,8 +29,6 @@ object RemoteWatcherSpec { def receive = Actor.emptyBehavior } - case class WrappedAddressTerminated(msg: AddressTerminated) - // turn off all periodic activity val TurnOff = 5.minutes @@ -49,6 +45,7 @@ object RemoteWatcherSpec { } object TestRemoteWatcher { + case class AddressTerm(address: Address) case class Quarantined(address: Address, uid: Int) } @@ -60,6 +57,11 @@ object RemoteWatcherSpec { def this() = this(heartbeatExpectedResponseAfter = TurnOff) + override def publishAddressTerminated(address: Address): Unit = + // don't publish the real AddressTerminated, but a testable message, + // that doesn't interfere with the real watch that is going on in the background + context.system.eventStream.publish(TestRemoteWatcher.AddressTerm(address)) + override def quarantine(address: Address, uid: Int): Unit = { // don't quarantine in remoting, but publish a testable message context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid)) @@ -72,7 +74,7 @@ object RemoteWatcherSpec { @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class RemoteWatcherSpec extends AkkaSpec( """akka { - # loglevel = DEBUG + loglevel = INFO actor.provider = "akka.remote.RemoteActorRefProvider" remote.netty.tcp { hostname = localhost @@ -103,15 +105,6 @@ class RemoteWatcherSpec extends AkkaSpec( expectMsgType[ActorIdentity].ref.get } - // AddressTerminated is AutoReceiveMessage - def addressTerminatedSubscriber(fwTo: ActorRef) = new MinimalActorRef { - override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = message match { - case msg: AddressTerminated ⇒ fwTo.tell(WrappedAddressTerminated(msg), sender) - } - override val path = system / "testSubscriber" / fwTo.path.name - override def provider = throw new UnsupportedOperationException("UndefinedUidActorRef does not provide") - } - "A RemoteWatcher" must { "have correct interaction when watching" in { @@ -245,7 +238,7 @@ class RemoteWatcherSpec extends AkkaSpec( "generate AddressTerminated when missing heartbeats" in { val p = TestProbe() val q = TestProbe() - system.eventStream.subscribe(addressTerminatedSubscriber(p.ref), classOf[AddressTerminated]) + system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm]) system.eventStream.subscribe(q.ref, classOf[TestRemoteWatcher.Quarantined]) val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor4") @@ -266,7 +259,7 @@ class RemoteWatcherSpec extends AkkaSpec( awaitAssert { monitorA ! HeartbeatTick monitorA ! ReapUnreachableTick - p.expectMsg(1 second, WrappedAddressTerminated(AddressTerminated(b.path.address))) + p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address)) q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid)) } } @@ -278,7 +271,7 @@ class RemoteWatcherSpec extends AkkaSpec( "generate AddressTerminated when missing first heartbeat" in { val p = TestProbe() val q = TestProbe() - system.eventStream.subscribe(addressTerminatedSubscriber(p.ref), classOf[AddressTerminated]) + system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm]) system.eventStream.subscribe(q.ref, classOf[TestRemoteWatcher.Quarantined]) val fd = createFailureDetector() @@ -299,7 +292,7 @@ class RemoteWatcherSpec extends AkkaSpec( awaitAssert { monitorA ! HeartbeatTick monitorA ! ReapUnreachableTick - p.expectMsg(1 second, WrappedAddressTerminated(AddressTerminated(b.path.address))) + p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address)) // no quarantine when missing first heartbeat, uid unknown q.expectNoMsg(1 second) } @@ -318,7 +311,7 @@ class RemoteWatcherSpec extends AkkaSpec( "generate AddressTerminated for new watch after broken connection that was re-established and broken again" in { val p = TestProbe() val q = TestProbe() - system.eventStream.subscribe(addressTerminatedSubscriber(p.ref), classOf[AddressTerminated]) + system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm]) system.eventStream.subscribe(q.ref, classOf[TestRemoteWatcher.Quarantined]) val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor6") @@ -339,15 +332,23 @@ class RemoteWatcherSpec extends AkkaSpec( awaitAssert { monitorA ! HeartbeatTick monitorA ! ReapUnreachableTick - p.expectMsg(1 second, WrappedAddressTerminated(AddressTerminated(b.path.address))) + p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address)) q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid)) } } + // real AddressTerminated would trigger Terminated for b6, simulate that here + remoteSystem.stop(b) + awaitAssert { + monitorA ! Stats + expectMsg(Stats.empty) + } + expectNoMsg(2 seconds) + // assume that connection comes up again, or remote system is restarted val c = createRemoteActor(Props[MyActor], "c6") - monitorA ! WatchRemote(b, a) + monitorA ! WatchRemote(c, a) monitorA ! HeartbeatTick expectMsg(HeartbeatRequest) @@ -368,8 +369,8 @@ class RemoteWatcherSpec extends AkkaSpec( awaitAssert { monitorA ! HeartbeatTick monitorA ! ReapUnreachableTick - p.expectMsg(1 second, WrappedAddressTerminated(AddressTerminated(b.path.address))) - q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid)) + p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(c.path.address)) + q.expectMsg(1 second, TestRemoteWatcher.Quarantined(c.path.address, remoteAddressUid)) } }