diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala new file mode 100644 index 0000000000..45f7eabfed --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala @@ -0,0 +1,81 @@ +package akka.remote + +import language.postfixOps +import scala.concurrent.duration._ +import com.typesafe.config.ConfigFactory +import akka.actor._ +import akka.testkit._ +import akka.remote.AddressUidExtension +import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } +import akka.remote.testconductor.RoleName + +object PiercingShouldKeepQuarantineSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(""" + #akka.loglevel = INFO + #akka.remote.log-remote-lifecycle-events = INFO + akka.remote.retry-gate-closed-for = 0.5s + """))) + + class Subject extends Actor { + def receive = { + case "getuid" ⇒ sender() ! AddressUidExtension(context.system).addressUid + } + } + +} + +class PiercingShouldKeepQuarantineSpecMultiJvmNode1 extends PiercingShouldKeepQuarantineSpec +class PiercingShouldKeepQuarantineSpecMultiJvmNode2 extends PiercingShouldKeepQuarantineSpec + +abstract class PiercingShouldKeepQuarantineSpec extends MultiNodeSpec(PiercingShouldKeepQuarantineSpec) + with STMultiNodeSpec + with ImplicitSender { + + import PiercingShouldKeepQuarantineSpec._ + + override def initialParticipants = roles.size + + def identify(role: RoleName, actorName: String): ActorRef = { + system.actorSelection(node(role) / "user" / actorName) ! Identify(1) + expectMsgType[ActorIdentity].ref.get + } + + "While probing through the quarantine remoting" must { + + "not lose existing quarantine marker" taggedAs LongRunningTest in { + runOn(first) { + enterBarrier("actors-started") + + // Communicate with second system + system.actorSelection(node(second) / "user" / "subject") ! "getuid" + val uid = expectMsgType[Int](10.seconds) + enterBarrier("actor-identified") + + // Manually Quarantine the other system + RARP(system).provider.transport.quarantine(node(second).address, Some(uid)) + + // Quarantine is up -- Should not be able to communicate with remote system any more + for (_ ← 1 to 4) { + system.actorSelection(node(second) / "user" / "subject") ! "getuid" + expectNoMsg(2.seconds) + } + + enterBarrier("quarantine-intact") + + } + + runOn(second) { + system.actorOf(Props[Subject], "subject") + enterBarrier("actors-started") + enterBarrier("actor-identified") + enterBarrier("quarantine-intact") + } + + } + + } +} diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 56734befa2..8b867e82e9 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -332,8 +332,18 @@ private[remote] object EndpointManager { def isReadOnly(endpoint: ActorRef): Boolean = readonlyToAddress contains endpoint def isQuarantined(address: Address, uid: Int): Boolean = writableEndpointWithPolicyFor(address) match { - case Some(Quarantined(`uid`, timeOfRelease)) ⇒ timeOfRelease.hasTimeLeft() - case _ ⇒ false + // timeOfRelease is only used for garbage collection. If an address is still probed, we should report the + // known fact that it is quarantined. + case Some(Quarantined(`uid`, _)) ⇒ true + case _ ⇒ false + } + + def refuseUid(address: Address): Option[Int] = writableEndpointWithPolicyFor(address) match { + // timeOfRelease is only used for garbage collection. If an address is still probed, we should report the + // known fact that it is quarantined. + case Some(Quarantined(uid, _)) ⇒ Some(uid) + case Some(Pass(_, uidOption)) ⇒ uidOption + case _ ⇒ None } /** @@ -397,21 +407,33 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends var pendingReadHandoffs = Map[ActorRef, AkkaProtocolHandle]() var stashedInbound = Map[ActorRef, Vector[InboundAssociation]]() + def keepQuarantinedOr(remoteAddress: Address)(body: ⇒ Unit): Unit = endpoints.refuseUid(remoteAddress) match { + case Some(uid) ⇒ + log.info("Quarantined address [{}] is still unreachable or has not been restarted. Keeping it quarantined.", remoteAddress) + // Restoring Quarantine marker overwritten by a Pass(endpoint, refuseUid) pair while probing remote system. + endpoints.markAsQuarantined(remoteAddress, uid, Deadline.now + settings.QuarantineDuration) + case None ⇒ body + } + override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) { case e @ InvalidAssociation(localAddress, remoteAddress, reason) ⇒ - log.warning("Tried to associate with unreachable remote address [{}]. " + - "Address is now gated for {} ms, all messages to this address will be delivered to dead letters. Reason: {}", - remoteAddress, settings.RetryGateClosedFor.toMillis, reason.getMessage) - endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor) + keepQuarantinedOr(remoteAddress) { + log.warning("Tried to associate with unreachable remote address [{}]. " + + "Address is now gated for {} ms, all messages to this address will be delivered to dead letters. Reason: {}", + remoteAddress, settings.RetryGateClosedFor.toMillis, reason.getMessage) + endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor) + } AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress)) Stop case ShutDownAssociation(localAddress, remoteAddress, _) ⇒ - log.debug("Remote system with address [{}] has shut down. " + - "Address is now gated for {} ms, all messages to this address will be delivered to dead letters.", - remoteAddress, settings.RetryGateClosedFor.toMillis) - endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor) + keepQuarantinedOr(remoteAddress) { + log.debug("Remote system with address [{}] has shut down. " + + "Address is now gated for {} ms, all messages to this address will be delivered to dead letters.", + remoteAddress, settings.RetryGateClosedFor.toMillis) + endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor) + } AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress)) Stop @@ -426,10 +448,12 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends Stop case HopelessAssociation(localAddress, remoteAddress, None, _) ⇒ - log.warning("Association to [{}] with unknown UID is irrecoverably failed. " + - "Address cannot be quarantined without knowing the UID, gating instead for {} ms.", - remoteAddress, settings.RetryGateClosedFor.toMillis) - endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor) + keepQuarantinedOr(remoteAddress) { + log.warning("Association to [{}] with unknown UID is irrecoverably failed. " + + "Address cannot be quarantined without knowing the UID, gating instead for {} ms.", + remoteAddress, settings.RetryGateClosedFor.toMillis) + endpoints.markAsFailed(sender(), Deadline.now + settings.RetryGateClosedFor) + } AddressTerminatedTopic(context.system).publish(AddressTerminated(remoteAddress)) Stop @@ -514,7 +538,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends def createAndRegisterWritingEndpoint(refuseUid: Option[Int]): ActorRef = endpoints.registerWritableEndpoint( recipientAddress, - None, + refuseUid, createEndpoint( recipientAddress, recipientRef.localAddressToUse,