RemoteWatcherSpec interference with real quarantining, see #3257

This commit is contained in:
Patrik Nordwall 2013-04-23 08:38:45 +02:00
parent 7094daf313
commit f5af7f861a

View file

@ -33,6 +33,40 @@ object RemoteWatcherSpec {
case class WrappedAddressTerminated(msg: AddressTerminated)
// turn off all periodic activity
val TurnOff = 5.minutes
def createFailureDetector(): FailureDetectorRegistry[Address] = {
def createFailureDetector(): FailureDetector =
new PhiAccrualFailureDetector(
threshold = 8.0,
maxSampleSize = 200,
minStdDeviation = 100.millis,
acceptableHeartbeatPause = 3.seconds,
firstHeartbeatEstimate = 1.second)
new DefaultFailureDetectorRegistry(() createFailureDetector())
}
object TestRemoteWatcher {
case class Quarantined(address: Address, uid: Int)
}
class TestRemoteWatcher(heartbeatExpectedResponseAfter: FiniteDuration) extends RemoteWatcher(createFailureDetector,
heartbeatInterval = TurnOff,
unreachableReaperInterval = TurnOff,
heartbeatExpectedResponseAfter = heartbeatExpectedResponseAfter,
numberOfEndHeartbeatRequests = 3) {
def this() = this(heartbeatExpectedResponseAfter = TurnOff)
override def quarantine(address: Address, uid: Int): Unit = {
// don't quarantine in remoting, but publish a testable message
context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid))
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -40,8 +74,6 @@ class RemoteWatcherSpec extends AkkaSpec(
"""akka {
# loglevel = DEBUG
actor.provider = "akka.remote.RemoteActorRefProvider"
# don't use quarantine for this "unit test"
remote.quarantine-systems-for = off
remote.netty.tcp {
hostname = localhost
port = 0
@ -55,12 +87,13 @@ class RemoteWatcherSpec extends AkkaSpec(
val remoteSystem = ActorSystem("RemoteSystem", system.settings.config)
val remoteAddress = remoteSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
def remoteAddressUid = AddressUidExtension(remoteSystem).addressUid
override def afterTermination() {
remoteSystem.shutdown()
}
val heartbeatMsgB = Heartbeat(AddressUidExtension(remoteSystem).addressUid)
val heartbeatMsgB = Heartbeat(remoteAddressUid)
def createRemoteActor(props: Props, name: String): ActorRef = {
remoteSystem.actorOf(props, name)
@ -68,31 +101,6 @@ class RemoteWatcherSpec extends AkkaSpec(
expectMsgType[ActorIdentity].ref.get
}
// turn off all periodic activity
val TurnOff = 5.minutes
def nonScheduledRemoteWatcherProps(): Props = {
val fd = createFailureDetector()
RemoteWatcher.props(
fd,
heartbeatInterval = TurnOff,
unreachableReaperInterval = TurnOff,
heartbeatExpectedResponseAfter = TurnOff,
numberOfEndHeartbeatRequests = 3)
}
def createFailureDetector(): FailureDetectorRegistry[Address] = {
def createFailureDetector(): FailureDetector =
new PhiAccrualFailureDetector(
threshold = 8.0,
maxSampleSize = 200,
minStdDeviation = 100.millis,
acceptableHeartbeatPause = 3.seconds,
firstHeartbeatEstimate = 1.second)
new DefaultFailureDetectorRegistry(() createFailureDetector())
}
// AddressTerminated is AutoReceiveMessage
def addressTerminatedSubscriber(fwTo: ActorRef) = new MinimalActorRef {
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = message match {
@ -107,7 +115,7 @@ class RemoteWatcherSpec extends AkkaSpec(
"have correct interaction when watching" in {
val fd = createFailureDetector()
val monitorA = system.actorOf(nonScheduledRemoteWatcherProps(), "monitor1")
val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor1")
val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor1")
val a1 = system.actorOf(Props[MyActor], "a1")
@ -174,7 +182,7 @@ class RemoteWatcherSpec extends AkkaSpec(
"have correct interaction when beeing watched" in {
val monitorA = system.actorOf(Props(classOf[TestActorProxy], testActor), "monitor2")
val monitorB = createRemoteActor(nonScheduledRemoteWatcherProps(), "monitor2")
val monitorB = createRemoteActor(Props[TestRemoteWatcher], "monitor2")
val b3 = createRemoteActor(Props[MyActor], "b3")
@ -224,9 +232,11 @@ class RemoteWatcherSpec extends AkkaSpec(
"generate AddressTerminated when missing heartbeats" in {
val p = TestProbe()
val q = TestProbe()
system.eventStream.subscribe(addressTerminatedSubscriber(p.ref), classOf[AddressTerminated])
system.eventStream.subscribe(q.ref, classOf[TestRemoteWatcher.Quarantined])
val monitorA = system.actorOf(nonScheduledRemoteWatcherProps(), "monitor4")
val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor4")
val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor4")
val a = system.actorOf(Props[MyActor], "a4")
@ -245,6 +255,7 @@ class RemoteWatcherSpec extends AkkaSpec(
monitorA ! HeartbeatTick
monitorA ! ReapUnreachableTick
p.expectMsg(1 second, WrappedAddressTerminated(AddressTerminated(b.path.address)))
q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid))
}
}
@ -254,15 +265,13 @@ class RemoteWatcherSpec extends AkkaSpec(
"generate AddressTerminated when missing first heartbeat" in {
val p = TestProbe()
val q = TestProbe()
system.eventStream.subscribe(addressTerminatedSubscriber(p.ref), classOf[AddressTerminated])
system.eventStream.subscribe(q.ref, classOf[TestRemoteWatcher.Quarantined])
val fd = createFailureDetector()
val monitorA = system.actorOf(RemoteWatcher.props(
fd,
heartbeatInterval = TurnOff,
unreachableReaperInterval = TurnOff,
heartbeatExpectedResponseAfter = 2.seconds,
numberOfEndHeartbeatRequests = 3), "monitor5")
val heartbeatExpectedResponseAfter = 2.seconds
val monitorA = system.actorOf(Props(classOf[TestRemoteWatcher], heartbeatExpectedResponseAfter), "monitor5")
val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor5")
val a = system.actorOf(Props[MyActor], "a5")
@ -279,6 +288,8 @@ class RemoteWatcherSpec extends AkkaSpec(
monitorA ! HeartbeatTick
monitorA ! ReapUnreachableTick
p.expectMsg(1 second, WrappedAddressTerminated(AddressTerminated(b.path.address)))
// no quarantine when missing first heartbeat, uid unknown
q.expectNoMsg(1 second)
}
}
@ -294,9 +305,11 @@ class RemoteWatcherSpec extends AkkaSpec(
"generate AddressTerminated for new watch after broken connection that was re-established and broken again" in {
val p = TestProbe()
val q = TestProbe()
system.eventStream.subscribe(addressTerminatedSubscriber(p.ref), classOf[AddressTerminated])
system.eventStream.subscribe(q.ref, classOf[TestRemoteWatcher.Quarantined])
val monitorA = system.actorOf(nonScheduledRemoteWatcherProps(), "monitor6")
val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor6")
val monitorB = createRemoteActor(Props(classOf[TestActorProxy], testActor), "monitor6")
val a = system.actorOf(Props[MyActor], "a6")
@ -315,6 +328,7 @@ class RemoteWatcherSpec extends AkkaSpec(
monitorA ! HeartbeatTick
monitorA ! ReapUnreachableTick
p.expectMsg(1 second, WrappedAddressTerminated(AddressTerminated(b.path.address)))
q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid))
}
}
@ -335,6 +349,7 @@ class RemoteWatcherSpec extends AkkaSpec(
monitorA ! HeartbeatTick
monitorA ! ReapUnreachableTick
p.expectNoMsg(1 second)
q.expectNoMsg(1 second)
// then stop heartbeating again, should generate new AddressTerminated
within(10 seconds) {
@ -342,6 +357,7 @@ class RemoteWatcherSpec extends AkkaSpec(
monitorA ! HeartbeatTick
monitorA ! ReapUnreachableTick
p.expectMsg(1 second, WrappedAddressTerminated(AddressTerminated(b.path.address)))
q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid))
}
}