diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala index 45f7eabfed..2da78d76cc 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala @@ -58,6 +58,9 @@ abstract class PiercingShouldKeepQuarantineSpec extends MultiNodeSpec(PiercingSh // Manually Quarantine the other system RARP(system).provider.transport.quarantine(node(second).address, Some(uid)) + // Quarantining is not immediate + Thread.sleep(1000) + // Quarantine is up -- Should not be able to communicate with remote system any more for (_ ← 1 to 4) { system.actorSelection(node(second) / "user" / "subject") ! "getuid" diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/Ticket15109Spec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/Ticket15109Spec.scala new file mode 100644 index 0000000000..6b3130e04b --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/Ticket15109Spec.scala @@ -0,0 +1,109 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.remote + +import language.postfixOps +import scala.concurrent.duration._ +import com.typesafe.config.ConfigFactory +import akka.actor._ +import akka.remote.testconductor.RoleName +import akka.remote.transport.ThrottlerTransportAdapter.ForceDisassociateExplicitly +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit._ +import akka.actor.ActorIdentity +import akka.remote.testconductor.RoleName +import akka.actor.Identify +import scala.concurrent.Await +import akka.remote.transport.Transport.InvalidAssociationException +import akka.remote.transport.AssociationHandle + +object Ticket15109Spec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.remote.log-remote-lifecycle-events = INFO + ## Keep it tight, otherwise reestablishing a connection takes too much time + akka.remote.transport-failure-detector.heartbeat-interval = 1 s + akka.remote.transport-failure-detector.acceptable-heartbeat-pause = 3 s + akka.remote.quarantine-systems-for = 1 d + akka.remote.retry-gate-closed-for = 0.5 s + """))) + + testTransport(on = true) + + class Subject extends Actor { + def receive = { + case "ping" ⇒ sender() ! "pong" + } + } + +} + +class Ticket15109SpecMultiJvmNode1 extends Ticket15109Spec +class Ticket15109SpecMultiJvmNode2 extends Ticket15109Spec + +abstract class Ticket15109Spec extends MultiNodeSpec(Ticket15109Spec) + with STMultiNodeSpec + with ImplicitSender { + + import Ticket15109Spec._ + + override def initialParticipants = roles.size + + def identify(role: RoleName, actorName: String): ActorRef = { + system.actorSelection(node(role) / "user" / actorName) ! Identify(0) + expectMsgType[ActorIdentity](5.seconds).getRef + } + + def ping(ref: ActorRef) = { + within(30.seconds) { + awaitAssert { + ref ! "ping" + expectMsg(1.second, "pong") + } + } + } + + "Quarantining" must { + + "not be introduced during normal errors (regression #15109)" taggedAs LongRunningTest in { + var subject: ActorRef = system.deadLetters + + runOn(second) { + system.actorOf(Props[Subject], "subject") + } + + enterBarrier("actors-started") + + runOn(first) { + // Acquire ActorRef from first system + subject = identify(second, "subject") + } + + enterBarrier("actor-identified") + + runOn(second) { + // Force a dissassociation. Using the message Shutdown, which is suboptimal here, but this is the only + // DisassoicateInfo that triggers the code-path we want to test + Await.result(RARP(system).provider.transport.managementCommand( + ForceDisassociateExplicitly(node(first).address, AssociationHandle.Shutdown)), 3.seconds) + } + + enterBarrier("disassociated") + + runOn(first) { + ping(subject) + } + + enterBarrier("done") + + } + + } +} diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 7b5875b5d0..e96122900d 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -164,7 +164,7 @@ private[remote] class OversizedPayloadException(msg: String) extends EndpointExc private[remote] object ReliableDeliverySupervisor { case object Ungate case object AttemptSysMsgRedelivery - final case class GotUid(uid: Int) + final case class GotUid(uid: Int, remoteAddres: Address) def props( handleOrActive: Option[AkkaProtocolHandle], @@ -309,7 +309,7 @@ private[remote] class ReliableDeliverySupervisor( if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) context.system.scheduler.scheduleOnce(settings.SysResendTimeout, self, AttemptSysMsgRedelivery) context.become(idle) - case g @ GotUid(receivedUid) ⇒ + case g @ GotUid(receivedUid, _) ⇒ context.parent ! g // New system that has the same address as the old - need to start from fresh state uidConfirmed = true @@ -574,7 +574,7 @@ private[remote] class EndpointWriter( publishAndThrow(new EndpointAssociationException(s"Association failed with [$remoteAddress]", e), Logging.DebugLevel) case Handle(inboundHandle) ⇒ // Assert handle == None? - context.parent ! ReliableDeliverySupervisor.GotUid(inboundHandle.handshakeInfo.uid) + context.parent ! ReliableDeliverySupervisor.GotUid(inboundHandle.handshakeInfo.uid, remoteAddress) handle = Some(inboundHandle) reader = startReadEndpoint(inboundHandle) eventPublisher.notifyListeners(AssociatedEvent(localAddress, remoteAddress, inbound)) diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index bbd892cfda..6bd0315fe3 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -273,7 +273,7 @@ private[remote] object EndpointManager { */ def isTombstone: Boolean } - final case class Pass(endpoint: ActorRef, uid: Option[Int]) extends EndpointPolicy { + final case class Pass(endpoint: ActorRef, uid: Option[Int], refuseUid: Option[Int]) extends EndpointPolicy { override def isTombstone: Boolean = false } final case class Gated(timeOfRelease: Deadline) extends EndpointPolicy { @@ -290,20 +290,20 @@ private[remote] object EndpointManager { private var addressToReadonly = HashMap[Address, ActorRef]() private var readonlyToAddress = HashMap[ActorRef, Address]() - def registerWritableEndpoint(address: Address, uid: Option[Int], endpoint: ActorRef): ActorRef = addressToWritable.get(address) match { - case Some(Pass(e, _)) ⇒ - throw new IllegalArgumentException(s"Attempting to overwrite existing endpoint [$e] with [$endpoint]") - case _ ⇒ - addressToWritable += address -> Pass(endpoint, uid) - writableToAddress += endpoint -> address - endpoint - } - - def registerWritableEndpointUid(writer: ActorRef, uid: Int): Unit = { - val address = writableToAddress(writer) + def registerWritableEndpoint(address: Address, uid: Option[Int], refuseUid: Option[Int], endpoint: ActorRef): ActorRef = addressToWritable.get(address) match { - case Some(Pass(ep, _)) ⇒ addressToWritable += address -> Pass(ep, Some(uid)) - case other ⇒ // the GotUid might have lost the race with some failure + case Some(Pass(e, _, _)) ⇒ + throw new IllegalArgumentException(s"Attempting to overwrite existing endpoint [$e] with [$endpoint]") + case _ ⇒ + addressToWritable += address -> Pass(endpoint, uid, refuseUid) + writableToAddress += endpoint -> address + endpoint + } + + def registerWritableEndpointUid(remoteAddress: Address, uid: Int): Unit = { + addressToWritable.get(remoteAddress) match { + case Some(Pass(ep, _, refuseUid)) ⇒ addressToWritable += remoteAddress -> Pass(ep, Some(uid), refuseUid) + case other ⇒ // the GotUid might have lost the race with some failure } } @@ -329,8 +329,8 @@ private[remote] object EndpointManager { def writableEndpointWithPolicyFor(address: Address): Option[EndpointPolicy] = addressToWritable.get(address) def hasWritableEndpointFor(address: Address): Boolean = writableEndpointWithPolicyFor(address) match { - case Some(Pass(_, _)) ⇒ true - case _ ⇒ false + case Some(Pass(_, _, _)) ⇒ true + case _ ⇒ false } def readOnlyEndpointFor(address: Address): Option[ActorRef] = addressToReadonly.get(address) @@ -349,9 +349,9 @@ private[remote] object EndpointManager { def refuseUid(address: Address): Option[Int] = writableEndpointWithPolicyFor(address) match { // timeOfRelease is only used for garbage collection. If an address is still probed, we should report the // known fact that it is quarantined. - case Some(Quarantined(uid, _)) ⇒ Some(uid) - case Some(Pass(_, uidOption)) ⇒ uidOption - case _ ⇒ None + case Some(Quarantined(uid, _)) ⇒ Some(uid) + case Some(Pass(_, _, refuseUid)) ⇒ refuseUid + case _ ⇒ None } /** @@ -526,7 +526,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case Quarantine(address, uidOption) ⇒ // Stop writers endpoints.writableEndpointWithPolicyFor(address) match { - case Some(Pass(endpoint, _)) ⇒ + case Some(Pass(endpoint, _, _)) ⇒ context.stop(endpoint) if (uidOption.isEmpty) { log.warning("Association to [{}] with unknown UID is reported as quarantined, but " + @@ -552,6 +552,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends def createAndRegisterWritingEndpoint(refuseUid: Option[Int]): ActorRef = endpoints.registerWritableEndpoint( recipientAddress, + uid = None, refuseUid, createEndpoint( recipientAddress, @@ -563,7 +564,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends refuseUid)) endpoints.writableEndpointWithPolicyFor(recipientAddress) match { - case Some(Pass(endpoint, _)) ⇒ + case Some(Pass(endpoint, _, _)) ⇒ endpoint ! s case Some(Gated(timeOfRelease)) ⇒ if (timeOfRelease.isOverdue()) createAndRegisterWritingEndpoint(refuseUid = None) ! s @@ -587,8 +588,8 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends handleStashedInbound(endpoint) case EndpointWriter.TookOver(endpoint, handle) ⇒ removePendingReader(takingOverFrom = endpoint, withHandle = handle) - case ReliableDeliverySupervisor.GotUid(uid) ⇒ - endpoints.registerWritableEndpointUid(sender, uid) + case ReliableDeliverySupervisor.GotUid(uid, remoteAddress) ⇒ + endpoints.registerWritableEndpointUid(remoteAddress, uid) handleStashedInbound(sender) case Prune ⇒ endpoints.prune() @@ -630,9 +631,9 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends if (endpoints.isQuarantined(handle.remoteAddress, handle.handshakeInfo.uid)) handle.disassociate(AssociationHandle.Quarantined) else endpoints.writableEndpointWithPolicyFor(handle.remoteAddress) match { - case Some(Pass(ep, None)) ⇒ + case Some(Pass(ep, None, _)) ⇒ stashedInbound += ep -> (stashedInbound.getOrElse(ep, Vector.empty) :+ ia) - case Some(Pass(ep, Some(uid))) ⇒ + case Some(Pass(ep, Some(uid), _)) ⇒ if (handle.handshakeInfo.uid == uid) { pendingReadHandoffs.get(ep) foreach (_.disassociate()) pendingReadHandoffs += ep -> handle @@ -641,10 +642,10 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends context.stop(ep) endpoints.unregisterEndpoint(ep) pendingReadHandoffs -= ep - createAndRegisterEndpoint(handle, Some(uid)) + createAndRegisterEndpoint(handle, refuseUid = Some(uid)) } case state ⇒ - createAndRegisterEndpoint(handle, None) + createAndRegisterEndpoint(handle, refuseUid = endpoints.refuseUid(handle.remoteAddress)) } } } @@ -661,7 +662,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends writing, refuseUid = refuseUid) if (writing) - endpoints.registerWritableEndpoint(handle.remoteAddress, Some(handle.handshakeInfo.uid), endpoint) + endpoints.registerWritableEndpoint(handle.remoteAddress, Some(handle.handshakeInfo.uid), refuseUid, endpoint) else { endpoints.registerReadOnlyEndpoint(handle.remoteAddress, endpoint) endpoints.removePolicy(handle.remoteAddress) diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index f0a6cdfdac..02ad18ff28 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -7,7 +7,7 @@ import akka.actor._ import akka.pattern.{ PromiseActorRef, ask, pipe } import akka.remote.transport.ActorTransportAdapter.AssociateUnderlying import akka.remote.transport.AkkaPduCodec.Associate -import akka.remote.transport.AssociationHandle.{ ActorHandleEventListener, Disassociated, InboundPayload, HandleEventListener } +import akka.remote.transport.AssociationHandle.{ DisassociateInfo, ActorHandleEventListener, Disassociated, InboundPayload, HandleEventListener } import akka.remote.transport.ThrottlerManager.{ Listener, Handle, ListenerAndMode, Checkin } import akka.remote.transport.ThrottlerTransportAdapter._ import akka.remote.transport.Transport._ @@ -150,6 +150,12 @@ object ThrottlerTransportAdapter { @SerialVersionUID(1L) final case class ForceDisassociate(address: Address) + /** + * Management Command to force dissocation of an address with an explicit error. + */ + @SerialVersionUID(1L) + final case class ForceDisassociateExplicitly(address: Address, reason: DisassociateInfo) + @SerialVersionUID(1L) case object ForceDisassociateAck { /** @@ -172,9 +178,10 @@ class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedA override def managementCommand(cmd: Any): Future[Boolean] = { import ActorTransportAdapter.AskTimeout cmd match { - case s: SetThrottle ⇒ manager ? s map { case SetThrottleAck ⇒ true } - case f: ForceDisassociate ⇒ manager ? f map { case ForceDisassociateAck ⇒ true } - case _ ⇒ wrappedTransport.managementCommand(cmd) + case s: SetThrottle ⇒ manager ? s map { case SetThrottleAck ⇒ true } + case f: ForceDisassociate ⇒ manager ? f map { case ForceDisassociateAck ⇒ true } + case f: ForceDisassociateExplicitly ⇒ manager ? f map { case ForceDisassociateAck ⇒ true } + case _ ⇒ wrappedTransport.managementCommand(cmd) } } } @@ -242,6 +249,13 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A case _ ⇒ } sender() ! ForceDisassociateAck + case ForceDisassociateExplicitly(address, reason) ⇒ + val naked = nakedAddress(address) + handleTable foreach { + case (`naked`, handle) ⇒ handle.disassociateWithFailure(reason) + case _ ⇒ + } + sender() ! ForceDisassociateAck case Checkin(origin, handle) ⇒ val naked: Address = nakedAddress(origin) @@ -338,6 +352,8 @@ private[transport] object ThrottledAssociation { sealed trait ThrottlerData case object Uninitialized extends ThrottlerData final case class ExposedHandle(handle: ThrottlerHandle) extends ThrottlerData + + final case class FailWith(reason: DisassociateInfo) } /** @@ -454,6 +470,9 @@ private[transport] class ThrottledAssociation( stay() case Event(Disassociated(info), _) ⇒ stop() // not notifying the upstream handler is intentional: we are relying on heartbeating + case Event(FailWith(reason), _) ⇒ + upstreamListener notify Disassociated(reason) + stop() } // This method captures ASSOCIATE packets and extracts the origin address @@ -534,4 +553,8 @@ private[transport] final case class ThrottlerHandle(_wrappedHandle: AssociationH throttlerActor ! PoisonPill } + def disassociateWithFailure(reason: DisassociateInfo): Unit = { + throttlerActor ! ThrottledAssociation.FailWith(reason) + } + } diff --git a/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala b/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala index 8cbae48cc4..d38fc9729c 100644 --- a/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/EndpointRegistrySpec.scala @@ -20,9 +20,9 @@ class EndpointRegistrySpec extends AkkaSpec { reg.writableEndpointWithPolicyFor(address1) should be(None) - reg.registerWritableEndpoint(address1, None, actorA) should be(actorA) + reg.registerWritableEndpoint(address1, None, None, actorA) should be(actorA) - reg.writableEndpointWithPolicyFor(address1) should be(Some(Pass(actorA, None))) + reg.writableEndpointWithPolicyFor(address1) should be(Some(Pass(actorA, None, None))) reg.readOnlyEndpointFor(address1) should be(None) reg.isWritable(actorA) should be(true) reg.isReadOnly(actorA) should be(false) @@ -49,10 +49,10 @@ class EndpointRegistrySpec extends AkkaSpec { reg.writableEndpointWithPolicyFor(address1) should be(None) reg.registerReadOnlyEndpoint(address1, actorA) should be(actorA) - reg.registerWritableEndpoint(address1, None, actorB) should be(actorB) + reg.registerWritableEndpoint(address1, None, None, actorB) should be(actorB) reg.readOnlyEndpointFor(address1) should be(Some(actorA)) - reg.writableEndpointWithPolicyFor(address1) should be(Some(Pass(actorB, None))) + reg.writableEndpointWithPolicyFor(address1) should be(Some(Pass(actorB, None, None))) reg.isWritable(actorA) should be(false) reg.isWritable(actorB) should be(true) @@ -66,7 +66,7 @@ class EndpointRegistrySpec extends AkkaSpec { val reg = new EndpointRegistry reg.writableEndpointWithPolicyFor(address1) should be(None) - reg.registerWritableEndpoint(address1, None, actorA) + reg.registerWritableEndpoint(address1, None, None, actorA) val deadline = Deadline.now reg.markAsFailed(actorA, deadline) reg.writableEndpointWithPolicyFor(address1) should be(Some(Gated(deadline))) @@ -85,8 +85,8 @@ class EndpointRegistrySpec extends AkkaSpec { "keep tombstones when removing an endpoint" in { val reg = new EndpointRegistry - reg.registerWritableEndpoint(address1, None, actorA) - reg.registerWritableEndpoint(address2, None, actorB) + reg.registerWritableEndpoint(address1, None, None, actorA) + reg.registerWritableEndpoint(address2, None, None, actorB) val deadline = Deadline.now reg.markAsFailed(actorA, deadline) reg.markAsQuarantined(address2, 42, deadline) @@ -102,8 +102,8 @@ class EndpointRegistrySpec extends AkkaSpec { "prune outdated Gated directives properly" in { val reg = new EndpointRegistry - reg.registerWritableEndpoint(address1, None, actorA) - reg.registerWritableEndpoint(address2, None, actorB) + reg.registerWritableEndpoint(address1, None, None, actorA) + reg.registerWritableEndpoint(address2, None, None, actorB) reg.markAsFailed(actorA, Deadline.now) val farInTheFuture = Deadline.now + Duration(60, SECONDS) reg.markAsFailed(actorB, farInTheFuture)