Merge pull request #1392 from akka/wip-3290-RemoteWatcherSpec

AddressTerminated interference in RemoteWatcherSpec, see #3290
This commit is contained in:
Patrik Nordwall 2013-05-02 07:29:18 -07:00
commit 6b51b4d824

View file

@ -15,8 +15,6 @@ import akka.actor.RootActorPath
import akka.actor.Identify
import akka.actor.ActorIdentity
import akka.actor.PoisonPill
import akka.actor.AddressTerminated
import akka.actor.MinimalActorRef
import akka.actor.Address
object RemoteWatcherSpec {
@ -31,8 +29,6 @@ object RemoteWatcherSpec {
def receive = Actor.emptyBehavior
}
case class WrappedAddressTerminated(msg: AddressTerminated)
// turn off all periodic activity
val TurnOff = 5.minutes
@ -49,6 +45,7 @@ object RemoteWatcherSpec {
}
object TestRemoteWatcher {
case class AddressTerm(address: Address)
case class Quarantined(address: Address, uid: Int)
}
@ -60,6 +57,11 @@ object RemoteWatcherSpec {
def this() = this(heartbeatExpectedResponseAfter = TurnOff)
override def publishAddressTerminated(address: Address): Unit =
// don't publish the real AddressTerminated, but a testable message,
// that doesn't interfere with the real watch that is going on in the background
context.system.eventStream.publish(TestRemoteWatcher.AddressTerm(address))
override def quarantine(address: Address, uid: Int): Unit = {
// don't quarantine in remoting, but publish a testable message
context.system.eventStream.publish(TestRemoteWatcher.Quarantined(address, uid))
@ -72,7 +74,7 @@ object RemoteWatcherSpec {
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RemoteWatcherSpec extends AkkaSpec(
"""akka {
# loglevel = DEBUG
loglevel = INFO
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.tcp {
hostname = localhost
@ -103,15 +105,6 @@ class RemoteWatcherSpec extends AkkaSpec(
expectMsgType[ActorIdentity].ref.get
}
// AddressTerminated is AutoReceiveMessage
def addressTerminatedSubscriber(fwTo: ActorRef) = new MinimalActorRef {
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = message match {
case msg: AddressTerminated fwTo.tell(WrappedAddressTerminated(msg), sender)
}
override val path = system / "testSubscriber" / fwTo.path.name
override def provider = throw new UnsupportedOperationException("UndefinedUidActorRef does not provide")
}
"A RemoteWatcher" must {
"have correct interaction when watching" in {
@ -245,7 +238,7 @@ class RemoteWatcherSpec extends AkkaSpec(
"generate AddressTerminated when missing heartbeats" in {
val p = TestProbe()
val q = TestProbe()
system.eventStream.subscribe(addressTerminatedSubscriber(p.ref), classOf[AddressTerminated])
system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm])
system.eventStream.subscribe(q.ref, classOf[TestRemoteWatcher.Quarantined])
val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor4")
@ -266,7 +259,7 @@ class RemoteWatcherSpec extends AkkaSpec(
awaitAssert {
monitorA ! HeartbeatTick
monitorA ! ReapUnreachableTick
p.expectMsg(1 second, WrappedAddressTerminated(AddressTerminated(b.path.address)))
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address))
q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid))
}
}
@ -278,7 +271,7 @@ class RemoteWatcherSpec extends AkkaSpec(
"generate AddressTerminated when missing first heartbeat" in {
val p = TestProbe()
val q = TestProbe()
system.eventStream.subscribe(addressTerminatedSubscriber(p.ref), classOf[AddressTerminated])
system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm])
system.eventStream.subscribe(q.ref, classOf[TestRemoteWatcher.Quarantined])
val fd = createFailureDetector()
@ -299,7 +292,7 @@ class RemoteWatcherSpec extends AkkaSpec(
awaitAssert {
monitorA ! HeartbeatTick
monitorA ! ReapUnreachableTick
p.expectMsg(1 second, WrappedAddressTerminated(AddressTerminated(b.path.address)))
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address))
// no quarantine when missing first heartbeat, uid unknown
q.expectNoMsg(1 second)
}
@ -318,7 +311,7 @@ class RemoteWatcherSpec extends AkkaSpec(
"generate AddressTerminated for new watch after broken connection that was re-established and broken again" in {
val p = TestProbe()
val q = TestProbe()
system.eventStream.subscribe(addressTerminatedSubscriber(p.ref), classOf[AddressTerminated])
system.eventStream.subscribe(p.ref, classOf[TestRemoteWatcher.AddressTerm])
system.eventStream.subscribe(q.ref, classOf[TestRemoteWatcher.Quarantined])
val monitorA = system.actorOf(Props[TestRemoteWatcher], "monitor6")
@ -339,15 +332,23 @@ class RemoteWatcherSpec extends AkkaSpec(
awaitAssert {
monitorA ! HeartbeatTick
monitorA ! ReapUnreachableTick
p.expectMsg(1 second, WrappedAddressTerminated(AddressTerminated(b.path.address)))
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(b.path.address))
q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid))
}
}
// real AddressTerminated would trigger Terminated for b6, simulate that here
remoteSystem.stop(b)
awaitAssert {
monitorA ! Stats
expectMsg(Stats.empty)
}
expectNoMsg(2 seconds)
// assume that connection comes up again, or remote system is restarted
val c = createRemoteActor(Props[MyActor], "c6")
monitorA ! WatchRemote(b, a)
monitorA ! WatchRemote(c, a)
monitorA ! HeartbeatTick
expectMsg(HeartbeatRequest)
@ -368,8 +369,8 @@ class RemoteWatcherSpec extends AkkaSpec(
awaitAssert {
monitorA ! HeartbeatTick
monitorA ! ReapUnreachableTick
p.expectMsg(1 second, WrappedAddressTerminated(AddressTerminated(b.path.address)))
q.expectMsg(1 second, TestRemoteWatcher.Quarantined(b.path.address, remoteAddressUid))
p.expectMsg(1 second, TestRemoteWatcher.AddressTerm(c.path.address))
q.expectMsg(1 second, TestRemoteWatcher.Quarantined(c.path.address, remoteAddressUid))
}
}