diff --git a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala index 17bc4dd7d2..013ec0ba9a 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala @@ -33,6 +33,40 @@ object RemoteWatcherSpec { case class WrappedAddressTerminated(msg: AddressTerminated) + // turn off all periodic activity + val TurnOff = 5.minutes + + def createFailureDetector(): FailureDetectorRegistry[Address] = { + def createFailureDetector(): FailureDetector = + new PhiAccrualFailureDetector( + threshold = 8.0, + maxSampleSize = 200, + minStdDeviation = 100.millis, + acceptableHeartbeatPause = 3.seconds, + firstHeartbeatEstimate = 1.second) + + new DefaultFailureDetectorRegistry(() ⇒ createFailureDetector()) + } + + object TestRemoteWatcher { + case class Quarantined(address: Address, uid: Int) + } + + class TestRemoteWatcher(heartbeatExpectedResponseAfter: FiniteDuration) extends RemoteWatcher(createFailureDetector, + heartbeatInterval = TurnOff, + unreachableReaperInterval = TurnOff, + heartbeatExpectedResponseAfter = heartbeatExpectedResponseAfter, + numberOfEndHeartbeatRequests = 3) { + + def this() = this(heartbeatExpectedResponseAfter = TurnOff) + + override def quarantine(address: Address, uid: Int): Unit = { + // don't quarantine in remoting, but publish a testable message + context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid)) + } + + } + } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -40,8 +74,6 @@ class RemoteWatcherSpec extends AkkaSpec( """akka { # loglevel = DEBUG actor.provider = "akka.remote.RemoteActorRefProvider" - # don't use quarantine for this "unit test" - remote.quarantine-systems-for = off remote.netty.tcp { hostname = localhost port = 0 @@ -55,12 +87,13 @@ class RemoteWatcherSpec extends AkkaSpec( val remoteSystem = ActorSystem("RemoteSystem", system.settings.config) val remoteAddress = remoteSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + def remoteAddressUid = AddressUidExtension(remoteSystem).addressUid override def afterTermination() { remoteSystem.shutdown() } - val heartbeatMsgB = Heartbeat(AddressUidExtension(remoteSystem).addressUid) + val heartbeatMsgB = Heartbeat(remoteAddressUid) def createRemoteActor(props: Props, name: String): ActorRef = { remoteSystem.actorOf(props, name) @@ -68,31 +101,6 @@ class RemoteWatcherSpec extends AkkaSpec( expectMsgType[ActorIdentity].ref.get } - // turn off all periodic activity - val TurnOff = 5.minutes - - def nonScheduledRemoteWatcherProps(): Props = { - val fd = createFailureDetector() - RemoteWatcher.props( - fd, - heartbeatInterval = TurnOff, - unreachableReaperInterval = TurnOff, - heartbeatExpectedResponseAfter = TurnOff, - numberOfEndHeartbeatRequests = 3) - } - - def createFailureDetector(): FailureDetectorRegistry[Address] = { - def createFailureDetector(): FailureDetector = - new PhiAccrualFailureDetector( - threshold = 8.0, - maxSampleSize = 200, - minStdDeviation = 100.millis, - acceptableHeartbeatPause = 3.seconds, - firstHeartbeatEstimate = 1.second) - - new DefaultFailureDetectorRegistry(() ⇒ createFailureDetector()) - } - // AddressTerminated is AutoReceiveMessage def addressTerminatedSubscriber(fwTo: ActorRef) = new MinimalActorRef { override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = message match { @@ -107,7 +115,7 @@ class RemoteWatcherSpec extends AkkaSpec( "have correct interaction when watching" in { val fd = createFailureDetector() - val monitorA = system.actorOf(nonScheduledRemoteWatcherProps(), "monitor1") + val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor1") val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor1") val a1 = system.actorOf(Props[MyActor], "a1") @@ -174,7 +182,7 @@ class RemoteWatcherSpec extends AkkaSpec( "have correct interaction when beeing watched" in { val monitorA = system.actorOf(Props(classOf[TestActorProxy], testActor), "monitor2") - val monitorB = createRemoteActor(nonScheduledRemoteWatcherProps(), "monitor2") + val monitorB = createRemoteActor(Props[TestRemoteWatcher], "monitor2") val b3 = createRemoteActor(Props[MyActor], "b3") @@ -224,9 +232,11 @@ class RemoteWatcherSpec extends AkkaSpec( "generate AddressTerminated when missing heartbeats" in { val p = TestProbe() + val q = TestProbe() system.eventStream.subscribe(addressTerminatedSubscriber(p.ref), classOf[AddressTerminated]) + system.eventStream.subscribe(q.ref, classOf[TestRemoteWatcher.Quarantined]) - val monitorA = system.actorOf(nonScheduledRemoteWatcherProps(), "monitor4") + val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor4") val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor4") val a = system.actorOf(Props[MyActor], "a4") @@ -245,6 +255,7 @@ class RemoteWatcherSpec extends AkkaSpec( monitorA ! HeartbeatTick monitorA ! ReapUnreachableTick p.expectMsg(1 second, WrappedAddressTerminated(AddressTerminated(b.path.address))) + q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid)) } } @@ -254,15 +265,13 @@ class RemoteWatcherSpec extends AkkaSpec( "generate AddressTerminated when missing first heartbeat" in { val p = TestProbe() + val q = TestProbe() system.eventStream.subscribe(addressTerminatedSubscriber(p.ref), classOf[AddressTerminated]) + system.eventStream.subscribe(q.ref, classOf[TestRemoteWatcher.Quarantined]) val fd = createFailureDetector() - val monitorA = system.actorOf(RemoteWatcher.props( - fd, - heartbeatInterval = TurnOff, - unreachableReaperInterval = TurnOff, - heartbeatExpectedResponseAfter = 2.seconds, - numberOfEndHeartbeatRequests = 3), "monitor5") + val heartbeatExpectedResponseAfter = 2.seconds + val monitorA = system.actorOf(Props(classOf[TestRemoteWatcher], heartbeatExpectedResponseAfter), "monitor5") val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor5") val a = system.actorOf(Props[MyActor], "a5") @@ -279,6 +288,8 @@ class RemoteWatcherSpec extends AkkaSpec( monitorA ! HeartbeatTick monitorA ! ReapUnreachableTick p.expectMsg(1 second, WrappedAddressTerminated(AddressTerminated(b.path.address))) + // no quarantine when missing first heartbeat, uid unknown + q.expectNoMsg(1 second) } } @@ -294,9 +305,11 @@ class RemoteWatcherSpec extends AkkaSpec( "generate AddressTerminated for new watch after broken connection that was re-established and broken again" in { val p = TestProbe() + val q = TestProbe() system.eventStream.subscribe(addressTerminatedSubscriber(p.ref), classOf[AddressTerminated]) + system.eventStream.subscribe(q.ref, classOf[TestRemoteWatcher.Quarantined]) - val monitorA = system.actorOf(nonScheduledRemoteWatcherProps(), "monitor6") + val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor6") val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor6") val a = system.actorOf(Props[MyActor], "a6") @@ -315,6 +328,7 @@ class RemoteWatcherSpec extends AkkaSpec( monitorA ! HeartbeatTick monitorA ! ReapUnreachableTick p.expectMsg(1 second, WrappedAddressTerminated(AddressTerminated(b.path.address))) + q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid)) } } @@ -335,6 +349,7 @@ class RemoteWatcherSpec extends AkkaSpec( monitorA ! HeartbeatTick monitorA ! ReapUnreachableTick p.expectNoMsg(1 second) + q.expectNoMsg(1 second) // then stop heartbeating again, should generate new AddressTerminated within(10 seconds) { @@ -342,6 +357,7 @@ class RemoteWatcherSpec extends AkkaSpec( monitorA ! HeartbeatTick monitorA ! ReapUnreachableTick p.expectMsg(1 second, WrappedAddressTerminated(AddressTerminated(b.path.address))) + q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid)) } }