diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/HandshakeRestartReceiverSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/HandshakeRestartReceiverSpec.scala index 44d0278d39..5fea761616 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/HandshakeRestartReceiverSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/HandshakeRestartReceiverSpec.scala @@ -55,9 +55,11 @@ abstract class HandshakeRestartReceiverSpec super.afterAll() } - def identifyWithUid(rootPath: ActorPath, actorName: String): (Int, ActorRef) = { - system.actorSelection(rootPath / "user" / actorName) ! "identify" - expectMsgType[(Int, ActorRef)] + def identifyWithUid(rootPath: ActorPath, actorName: String, timeout: FiniteDuration = remainingOrDefault): (Int, ActorRef) = { + within(timeout) { + system.actorSelection(rootPath / "user" / actorName) ! "identify" + expectMsgType[(Int, ActorRef)] + } } "Artery Handshake" must { @@ -70,7 +72,7 @@ abstract class HandshakeRestartReceiverSpec runOn(first) { val secondRootPath = node(second) - val (secondUid, _) = identifyWithUid(secondRootPath, "subject") + val (secondUid, _) = identifyWithUid(secondRootPath, "subject", 5.seconds) val secondAddress = node(second).address val secondAssociation = RARP(system).provider.transport.asInstanceOf[ArteryTransport].association(secondAddress) @@ -83,14 +85,13 @@ abstract class HandshakeRestartReceiverSpec within(30.seconds) { awaitAssert { - within(1.second) { - identifyWithUid(secondRootPath, "subject2") - } + identifyWithUid(secondRootPath, "subject2", 1.second) } } val (secondUid2, subject2) = identifyWithUid(secondRootPath, "subject2") secondUid2 should !==(secondUid) val secondUniqueRemoteAddress2 = Await.result(secondAssociation.associationState.uniqueRemoteAddress, 3.seconds) + println(s"# ${secondAssociation.associationState} secondUid $secondUid $secondUid2") // FIXME secondUniqueRemoteAddress2.uid should ===(secondUid2) secondUniqueRemoteAddress2.address should ===(secondAddress) secondUniqueRemoteAddress2 should !==(secondUniqueRemoteAddress) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/PiercingShouldKeepQuarantineSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/PiercingShouldKeepQuarantineSpec.scala new file mode 100644 index 0000000000..fd292f172f --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/PiercingShouldKeepQuarantineSpec.scala @@ -0,0 +1,102 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ +package akka.remote.artery + +import scala.concurrent.duration._ +import com.typesafe.config.ConfigFactory +import akka.actor._ +import akka.testkit._ +import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } +import akka.remote.testconductor.RoleName +import akka.remote.AddressUidExtension +import akka.remote.RARP + +object PiercingShouldKeepQuarantineSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(""" + #akka.loglevel = INFO + #akka.remote.log-remote-lifecycle-events = INFO + akka.remote.retry-gate-closed-for = 0.5s + + akka.remote.artery.enabled = on + """))) + + def aeronPort(roleName: RoleName): Int = + roleName match { + case `first` ⇒ 20561 // TODO yeah, we should have support for dynamic port assignment + case `second` ⇒ 20562 + } + + nodeConfig(first) { + ConfigFactory.parseString(s""" + akka.remote.artery.port = ${aeronPort(first)} + """) + } + + nodeConfig(second) { + ConfigFactory.parseString(s""" + akka.remote.artery.port = ${aeronPort(second)} + """) + } + + class Subject extends Actor { + def receive = { + case "getuid" ⇒ sender() ! AddressUidExtension(context.system).addressUid + } + } + +} + +class PiercingShouldKeepQuarantineSpecMultiJvmNode1 extends PiercingShouldKeepQuarantineSpec +class PiercingShouldKeepQuarantineSpecMultiJvmNode2 extends PiercingShouldKeepQuarantineSpec + +abstract class PiercingShouldKeepQuarantineSpec extends MultiNodeSpec(PiercingShouldKeepQuarantineSpec) + with STMultiNodeSpec + with ImplicitSender { + + import PiercingShouldKeepQuarantineSpec._ + + override def initialParticipants = roles.size + + "While probing through the quarantine remoting" must { + + "not lose existing quarantine marker" taggedAs LongRunningTest in { + runOn(first) { + enterBarrier("actors-started") + + // Communicate with second system + system.actorSelection(node(second) / "user" / "subject") ! "getuid" + val uid = expectMsgType[Int](10.seconds) + enterBarrier("actor-identified") + + // Manually Quarantine the other system + RARP(system).provider.transport.quarantine(node(second).address, Some(uid)) + + // Quarantining is not immediate + Thread.sleep(1000) + + // Quarantine is up -- Should not be able to communicate with remote system any more + for (_ ← 1 to 4) { + system.actorSelection(node(second) / "user" / "subject") ! "getuid" + expectNoMsg(2.seconds) + } + + enterBarrier("quarantine-intact") + + } + + runOn(second) { + system.actorOf(Props[Subject], "subject") + enterBarrier("actors-started") + enterBarrier("actor-identified") + enterBarrier("quarantine-intact") + } + + } + + } +} diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteQuarantinePiercingSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteQuarantinePiercingSpec.scala new file mode 100644 index 0000000000..e546a293e6 --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteQuarantinePiercingSpec.scala @@ -0,0 +1,137 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ +package akka.remote.artery + +import language.postfixOps +import scala.concurrent.duration._ +import com.typesafe.config.ConfigFactory +import akka.actor._ +import akka.remote.testconductor.RoleName +import akka.remote.transport.ThrottlerTransportAdapter.{ ForceDisassociate, Direction } +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit._ +import akka.actor.ActorIdentity +import akka.remote.testconductor.RoleName +import akka.actor.Identify +import scala.concurrent.Await +import akka.remote.AddressUidExtension +import akka.remote.RARP + +object RemoteQuarantinePiercingSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.remote.log-remote-lifecycle-events = INFO + akka.remote.artery.enabled = on + """))) + + def aeronPort(roleName: RoleName): Int = + roleName match { + case `first` ⇒ 20551 // TODO yeah, we should have support for dynamic port assignment + case `second` ⇒ 20552 + } + + nodeConfig(first) { + ConfigFactory.parseString(s""" + akka.remote.artery.port = ${aeronPort(first)} + """) + } + + nodeConfig(second) { + ConfigFactory.parseString(s""" + akka.remote.artery.port = ${aeronPort(second)} + """) + } + + class Subject extends Actor { + def receive = { + case "shutdown" ⇒ context.system.terminate() + case "identify" ⇒ sender() ! (AddressUidExtension(context.system).addressUid -> self) + } + } + +} + +class RemoteQuarantinePiercingSpecMultiJvmNode1 extends RemoteQuarantinePiercingSpec +class RemoteQuarantinePiercingSpecMultiJvmNode2 extends RemoteQuarantinePiercingSpec + +abstract class RemoteQuarantinePiercingSpec extends MultiNodeSpec(RemoteQuarantinePiercingSpec) + with STMultiNodeSpec + with ImplicitSender { + + import RemoteQuarantinePiercingSpec._ + + override def initialParticipants = roles.size + + def identifyWithUid(role: RoleName, actorName: String, timeout: FiniteDuration = remainingOrDefault): (Int, ActorRef) = { + within(timeout) { + system.actorSelection(node(role) / "user" / actorName) ! "identify" + expectMsgType[(Int, ActorRef)] + } + } + + "RemoteNodeShutdownAndComesBack" must { + + "allow piercing through the quarantine when remote UID is new" taggedAs LongRunningTest in { + runOn(first) { + val secondAddress = node(second).address + enterBarrier("actors-started") + + // Acquire ActorRef from first system + val (uidFirst, subjectFirst) = identifyWithUid(second, "subject", 5.seconds) + enterBarrier("actor-identified") + + // Manually Quarantine the other system + RARP(system).provider.transport.quarantine(node(second).address, Some(uidFirst)) + + // Quarantine is up -- Cannot communicate with remote system any more + system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! "identify" + expectNoMsg(2.seconds) + + // Shut down the other system -- which results in restart (see runOn(second)) + Await.result(testConductor.shutdown(second), 30.seconds) + + // Now wait until second system becomes alive again + within(30.seconds) { + // retry because the Subject actor might not be started yet + awaitAssert { + system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! "identify" + val (uidSecond, subjectSecond) = expectMsgType[(Int, ActorRef)](1.second) + uidSecond should not be (uidFirst) + subjectSecond should not be (subjectFirst) + } + } + + // If we got here the Quarantine was successfully pierced since it is configured to last 1 day + + system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! "shutdown" + + } + + runOn(second) { + val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + system.actorOf(Props[Subject], "subject") + enterBarrier("actors-started") + + enterBarrier("actor-identified") + + Await.ready(system.whenTerminated, 30.seconds) + + val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s""" + akka.remote.artery.port = ${addr.port.get} + """).withFallback(system.settings.config)) + freshSystem.actorOf(Props[Subject], "subject") + + Await.ready(freshSystem.whenTerminated, 30.seconds) + } + + } + + } +} diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala new file mode 100644 index 0000000000..24402358bd --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala @@ -0,0 +1,169 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ +package akka.remote.artery + +import akka.remote.transport.AssociationHandle +import language.postfixOps +import scala.concurrent.duration._ +import com.typesafe.config.ConfigFactory +import akka.actor._ +import akka.remote.testconductor.RoleName +import akka.remote.transport.ThrottlerTransportAdapter.{ ForceDisassociateExplicitly, ForceDisassociate, Direction } +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit._ +import akka.actor.ActorIdentity +import akka.remote.testconductor.RoleName +import akka.actor.Identify +import scala.concurrent.Await +import akka.remote.AddressUidExtension +import akka.remote.RARP +import akka.remote.ThisActorSystemQuarantinedEvent + +object RemoteRestartedQuarantinedSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(""" + akka.loglevel = WARNING + akka.remote.log-remote-lifecycle-events = WARNING + + # Keep it long, we don't want reconnects + akka.remote.retry-gate-closed-for = 1 s + + # Important, otherwise it is very racy to get a non-writing endpoint: the only way to do it if the two nodes + # associate to each other at the same time. Setting this will ensure that the right scenario happens. + akka.remote.use-passive-connections = off + + # TODO should not be needed, but see TODO at the end of the test + akka.remote.transport-failure-detector.heartbeat-interval = 1 s + akka.remote.transport-failure-detector.acceptable-heartbeat-pause = 10 s + + akka.remote.artery.enabled = on + """))) + + def aeronPort(roleName: RoleName): Int = + roleName match { + case `first` ⇒ 20541 // TODO yeah, we should have support for dynamic port assignment + case `second` ⇒ 20542 + } + + nodeConfig(first) { + ConfigFactory.parseString(s""" + akka.remote.artery.port = ${aeronPort(first)} + """) + } + + nodeConfig(second) { + ConfigFactory.parseString(s""" + akka.remote.artery.port = ${aeronPort(second)} + """) + } + + class Subject extends Actor { + def receive = { + case "shutdown" ⇒ context.system.terminate() + case "identify" ⇒ sender() ! (AddressUidExtension(context.system).addressUid -> self) + } + } + +} + +class RemoteRestartedQuarantinedSpecMultiJvmNode1 extends RemoteRestartedQuarantinedSpec +class RemoteRestartedQuarantinedSpecMultiJvmNode2 extends RemoteRestartedQuarantinedSpec + +abstract class RemoteRestartedQuarantinedSpec + extends MultiNodeSpec(RemoteRestartedQuarantinedSpec) + with STMultiNodeSpec with ImplicitSender { + + import RemoteRestartedQuarantinedSpec._ + + override def initialParticipants = 2 + + def identifyWithUid(role: RoleName, actorName: String, timeout: FiniteDuration = remainingOrDefault): (Int, ActorRef) = { + within(timeout) { + system.actorSelection(node(role) / "user" / actorName) ! "identify" + expectMsgType[(Int, ActorRef)] + } + } + + "A restarted quarantined system" must { + + "should not crash the other system (#17213)" taggedAs LongRunningTest in { + + system.actorOf(Props[Subject], "subject") + enterBarrier("subject-started") + + runOn(first) { + val secondAddress = node(second).address + + val (uid, ref) = identifyWithUid(second, "subject", 5.seconds) + + enterBarrier("before-quarantined") + RARP(system).provider.transport.quarantine(node(second).address, Some(uid)) + + enterBarrier("quarantined") + enterBarrier("still-quarantined") + + testConductor.shutdown(second).await + + within(30.seconds) { + awaitAssert { + system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! Identify("subject") + expectMsgType[ActorIdentity](1.second).ref.get + } + } + + system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! "shutdown" + } + + runOn(second) { + val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + val firstAddress = node(first).address + system.eventStream.subscribe(testActor, classOf[ThisActorSystemQuarantinedEvent]) + + val (firstUid, ref) = identifyWithUid(first, "subject", 5.seconds) + + enterBarrier("before-quarantined") + enterBarrier("quarantined") + + expectMsgPF(10 seconds) { + case ThisActorSystemQuarantinedEvent(local, remote) ⇒ + } + + // check that we quarantine back + val firstAssociation = RARP(system).provider.transport.asInstanceOf[ArteryTransport].association(firstAddress) + awaitAssert { + firstAssociation.associationState.isQuarantined(firstUid) + firstAssociation.associationState.isQuarantined() + } + + enterBarrier("still-quarantined") + + Await.result(system.whenTerminated, 10.seconds) + + val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s""" + akka.remote.artery.port = ${addr.port.get} + """).withFallback(system.settings.config)) + + val probe = TestProbe()(freshSystem) + + freshSystem.actorSelection(RootActorPath(firstAddress) / "user" / "subject").tell(Identify("subject"), probe.ref) + // TODO sometimes it takes long time until the new connection is established, + // It seems like there must first be a transport failure detector timeout, that triggers + // "No response from remote. Handshake timed out or transport failure detector triggered". + probe.expectMsgType[ActorIdentity](30.second).ref should not be (None) + + // Now the other system will be able to pass, too + freshSystem.actorOf(Props[Subject], "subject") + + Await.ready(freshSystem.whenTerminated, 10.seconds) + } + + } + + } +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index c8b694190a..9952a82c3a 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -13,7 +13,6 @@ import scala.concurrent.Promise import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success - import akka.Done import akka.NotUsed import akka.actor.ActorRef @@ -25,12 +24,16 @@ import akka.event.Logging import akka.event.LoggingAdapter import akka.remote.AddressUidExtension import akka.remote.EndpointManager.Send +import akka.remote.EventPublisher import akka.remote.MessageSerializer import akka.remote.RemoteActorRef import akka.remote.RemoteActorRefProvider import akka.remote.RemoteTransport +import akka.remote.RemotingLifecycleEvent import akka.remote.SeqNo +import akka.remote.ThisActorSystemQuarantinedEvent import akka.remote.UniqueAddress +import akka.remote.artery.InboundControlJunction.ControlMessageObserver import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.remote.artery.OutboundControlJunction.OutboundControlIngress import akka.remote.transport.AkkaPduCodec @@ -55,6 +58,7 @@ import io.aeron.driver.MediaDriver import io.aeron.exceptions.ConductorServiceTimeoutException import org.agrona.ErrorHandler import org.agrona.IoUtil +import scala.util.Try import java.io.File import java.net.InetSocketAddress import java.nio.channels.DatagramChannel @@ -92,11 +96,24 @@ private[akka] trait InboundContext { * Lookup the outbound association for a given address. */ def association(remoteAddress: Address): OutboundContext + } -final class AssociationState( +/** + * INTERNAL API + */ +private[akka] object AssociationState { + def apply(): AssociationState = + new AssociationState(incarnation = 1, uniqueRemoteAddressPromise = Promise(), quarantined = Set.empty) +} + +/** + * INTERNAL API + */ +private[akka] final class AssociationState( val incarnation: Int, - val uniqueRemoteAddressPromise: Promise[UniqueAddress]) { + val uniqueRemoteAddressPromise: Promise[UniqueAddress], + val quarantined: Set[Long]) { /** * Full outbound address with UID for this association. @@ -104,6 +121,33 @@ final class AssociationState( */ def uniqueRemoteAddress: Future[UniqueAddress] = uniqueRemoteAddressPromise.future + def uniqueRemoteAddressValue(): Option[Try[UniqueAddress]] = { + // FIXME we should cache access to uniqueRemoteAddress.value (avoid allocations), used in many places + uniqueRemoteAddress.value + } + + def newIncarnation(remoteAddressPromise: Promise[UniqueAddress]): AssociationState = + new AssociationState(incarnation + 1, remoteAddressPromise, quarantined) + + def newQuarantined(): AssociationState = + uniqueRemoteAddressPromise.future.value match { + case Some(Success(a)) ⇒ + new AssociationState(incarnation, uniqueRemoteAddressPromise, quarantined = quarantined + a.uid) + case _ ⇒ this + } + + def isQuarantined(): Boolean = { + uniqueRemoteAddressValue match { + case Some(Success(a)) ⇒ isQuarantined(a.uid) + case _ ⇒ false // handshake not completed yet + } + } + + def isQuarantined(uid: Long): Boolean = { + // FIXME does this mean boxing (allocation) because of Set[Long]? Use specialized Set. LongMap? + quarantined(uid) + } + override def toString(): String = { val a = uniqueRemoteAddressPromise.future.value match { case Some(Success(a)) ⇒ a @@ -134,6 +178,8 @@ private[akka] trait OutboundContext { def completeHandshake(peer: UniqueAddress): Unit + def quarantine(reason: String): Unit + /** * An inbound stage can send control message, e.g. a HandshakeReq, to the remote * address of this association. It will be sent over the control sub-channel. @@ -166,10 +212,11 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R @volatile private[this] var driver: MediaDriver = _ @volatile private[this] var aeron: Aeron = _ - override val log: LoggingAdapter = Logging(system, getClass.getName) override def defaultAddress: Address = localAddress.address override def addresses: Set[Address] = Set(defaultAddress) override def localAddressForRemote(remote: Address): Address = defaultAddress + override val log: LoggingAdapter = Logging(system, getClass.getName) + private val eventPublisher = new EventPublisher(system, log, remoteSettings.RemoteLifecycleEventsLogLevel) private val codec: AkkaPduCodec = AkkaPduProtobufCodec private val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch") @@ -252,6 +299,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } private def runInboundFlows(): Unit = { + // control stream controlSubject = Source.fromGraph(new AeronSource(inboundChannel, controlStreamId, aeron, taskRunner)) .async // FIXME measure .map(ByteString.apply) // TODO we should use ByteString all the way @@ -259,6 +307,20 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R .to(Sink.ignore) .run()(materializer) + controlSubject.attach(new ControlMessageObserver { + override def notify(inboundEnvelope: InboundEnvelope): Unit = { + inboundEnvelope.message match { + case Quarantined(from, to) if to == localAddress ⇒ + val lifecycleEvent = ThisActorSystemQuarantinedEvent(localAddress.address, from.address) + publishLifecycleEvent(lifecycleEvent) + // quarantine the other system from here + association(from.address).quarantine(lifecycleEvent.toString, Some(from.uid)) + case _ ⇒ // not interesting + } + } + }) + + // ordinary messages stream Source.fromGraph(new AeronSource(inboundChannel, ordinaryStreamId, aeron, taskRunner)) .async // FIXME measure .map(ByteString.apply) // TODO we should use ByteString all the way @@ -307,8 +369,11 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } } + private def publishLifecycleEvent(event: RemotingLifecycleEvent): Unit = + eventPublisher.notifyListeners(event) + override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit = - association(remoteAddress).quarantine(uid) + association(remoteAddress).quarantine(reason = "", uid) // FIXME change the method signature (old remoting) to include reason? def outbound(outboundContext: OutboundContext): Sink[Send, Any] = { Flow.fromGraph(killSwitch.flow[Send]) @@ -321,11 +386,13 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R def outboundControl(outboundContext: OutboundContext): Sink[Send, OutboundControlIngress] = { Flow.fromGraph(killSwitch.flow[Send]) .via(new OutboundHandshake(outboundContext, handshakeTimeout)) - .via(new SystemMessageDelivery(outboundContext, systemMessageResendInterval)) + .via(new SystemMessageDelivery(outboundContext, systemMessageResendInterval, remoteSettings.SysMsgBufferSize)) .viaMat(new OutboundControlJunction(outboundContext))(Keep.right) .via(encoder) .map(_.toArray) // TODO we should use ByteString all the way .to(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner)) + + // FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages } // FIXME hack until real envelopes, encoding originAddress in sender :) @@ -375,6 +442,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R decoder .via(deserializer) .via(new InboundHandshake(this, inControlStream = false)) + .via(new InboundQuarantineCheck(this)) .to(messageDispatcherSink), Source.maybe[ByteString].via(killSwitch.flow)) } @@ -384,6 +452,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R decoder .via(deserializer) .via(new InboundHandshake(this, inControlStream = true)) + .via(new InboundQuarantineCheck(this)) .viaMat(new InboundControlJunction)(Keep.right) .via(new SystemMessageAcker(this)) .to(messageDispatcherSink), diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 55e9c3b004..4c0b427441 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -3,6 +3,7 @@ */ package akka.remote.artery +import scala.annotation.tailrec import scala.concurrent.Promise import scala.util.Success import akka.actor.ActorRef @@ -24,6 +25,8 @@ import akka.util.Unsafe import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit +import akka.actor.ActorSelectionMessage +import akka.remote.artery.SystemMessageDelivery.ClearSystemMessageDelivery /** * INTERNAL API @@ -39,6 +42,7 @@ private[akka] class Association( extends AbstractAssociation with OutboundContext { private val log = Logging(transport.system, getClass.getName) + private val controlQueueSize = transport.provider.remoteSettings.SysMsgBufferSize @volatile private[this] var queue: SourceQueueWithComplete[Send] = _ @volatile private[this] var controlQueue: SourceQueueWithComplete[Send] = _ @@ -63,8 +67,7 @@ private[akka] class Association( * Holds reference to shared state of Association - *access only via helper methods* */ @volatile - private[this] var _sharedStateDoNotCallMeDirectly: AssociationState = - new AssociationState(incarnation = 1, uniqueRemoteAddressPromise = Promise()) + private[this] var _sharedStateDoNotCallMeDirectly: AssociationState = AssociationState() /** * Helper method for access to underlying state via Unsafe @@ -88,17 +91,17 @@ private[akka] class Association( s"wrong remote address in completeHandshake, got ${peer.address}, expected ${remoteAddress}") val current = associationState current.uniqueRemoteAddressPromise.trySuccess(peer) - current.uniqueRemoteAddress.value match { + current.uniqueRemoteAddressValue() match { case Some(Success(`peer`)) ⇒ // our value case _ ⇒ - val newState = new AssociationState(incarnation = current.incarnation + 1, Promise.successful(peer)) + val newState = current.newIncarnation(Promise.successful(peer)) if (swapState(current, newState)) { - current.uniqueRemoteAddress.value match { + current.uniqueRemoteAddressValue() match { case Some(Success(old)) ⇒ log.debug("Incarnation {} of association to [{}] with new UID [{}] (old UID [{}])", newState.incarnation, peer.address, peer.uid, old.uid) - quarantine(Some(old.uid)) - case _ ⇒ // Failed, nothing to do + case _ ⇒ + // Failed, nothing to do } // if swap failed someone else completed before us, and that is fine } @@ -110,29 +113,68 @@ private[akka] class Association( outboundControlIngress.sendControlMessage(message) def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { - // TODO: lookup subchannel - // FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly - message match { - case _: SystemMessage ⇒ - implicit val ec = materializer.executionContext - controlQueue.offer(Send(message, senderOption, recipient, None)).onFailure { - case e ⇒ - // FIXME proper error handling, and quarantining - println(s"# System message dropped, due to $e") // FIXME - } - case _ ⇒ - queue.offer(Send(message, senderOption, recipient, None)) - } + // allow ActorSelectionMessage to pass through quarantine, to be able to establish interaction with new system + // FIXME where is that ActorSelectionMessage check in old remoting? + if (message.isInstanceOf[ActorSelectionMessage] || !associationState.isQuarantined() || message.isInstanceOf[ClearSystemMessageDelivery.type]) { + // FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly + message match { + case _: SystemMessage | ClearSystemMessageDelivery ⇒ + implicit val ec = materializer.executionContext + controlQueue.offer(Send(message, senderOption, recipient, None)).onFailure { + case e ⇒ + quarantine(reason = s"Due to overflow of control queue, size [$controlQueueSize]") + } + case _ ⇒ + queue.offer(Send(message, senderOption, recipient, None)) + } + } else if (log.isDebugEnabled) + log.debug("Dropping message to quarantined system {}", remoteAddress) } // FIXME we should be able to Send without a recipient ActorRef override val dummyRecipient: RemoteActorRef = transport.provider.resolveActorRef(RootActorPath(remoteAddress) / "system" / "dummy").asInstanceOf[RemoteActorRef] - def quarantine(uid: Option[Int]): Unit = { - // FIXME implement - log.error("Association to [{}] with UID [{}] is irrecoverably failed. Quarantining address.", - remoteAddress, uid.getOrElse("unknown")) + // OutboundContext + override def quarantine(reason: String): Unit = { + val uid = associationState.uniqueRemoteAddressValue() match { + case Some(Success(a)) ⇒ Some(a.uid) + case _ ⇒ None + } + quarantine(reason, uid) + } + + @tailrec final def quarantine(reason: String, uid: Option[Int]): Unit = { + uid match { + case Some(u) ⇒ + val current = associationState + current.uniqueRemoteAddressValue() match { + case Some(Success(peer)) if peer.uid == u ⇒ + if (!current.isQuarantined(u)) { + val newState = current.newQuarantined() + if (swapState(current, newState)) { + // quarantine state change was performed + log.warning("Association to [{}] with UID [{}] is irrecoverably failed. Quarantining address. {}", + remoteAddress, u, reason) + // end delivery of system messages to that incarnation after this point + send(ClearSystemMessageDelivery, None, dummyRecipient) + // try to tell the other system that we have quarantined it + sendControl(Quarantined(localAddress, peer)) + } else + quarantine(reason, uid) // recursive + } + case Some(Success(peer)) ⇒ + log.debug("Quarantine of [{}] ignored due to non-matching UID, quarantine requested for [{}] but current is [{}]. {}", + remoteAddress, u, peer.uid, reason) + case None ⇒ + log.debug("Quarantine of [{}] ignored because handshake not completed, quarantine request was for old incarnation. {}", + remoteAddress, reason) + } + case None ⇒ + // FIXME should we do something more, old impl used gating? + log.warning("Quarantine of [{}] ignored because unknown UID", remoteAddress) + } + } // Idempotent @@ -142,7 +184,7 @@ private[akka] class Association( // it's important to materialize the outboundControl stream first, // so that outboundControlIngress is ready when stages for all streams start if (controlQueue eq null) { - val (q, control) = Source.queue(256, OverflowStrategy.dropBuffer) + val (q, control) = Source.queue(controlQueueSize, OverflowStrategy.backpressure) .toMat(transport.outboundControl(this))(Keep.both) .run()(materializer) controlQueue = q diff --git a/akka-remote/src/main/scala/akka/remote/artery/Control.scala b/akka-remote/src/main/scala/akka/remote/artery/Control.scala index ae3b51c3bd..b371ae2650 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Control.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Control.scala @@ -4,10 +4,8 @@ package akka.remote.artery import java.util.ArrayDeque - import scala.concurrent.Future import scala.concurrent.Promise - import akka.Done import akka.remote.EndpointManager.Send import akka.stream.Attributes @@ -19,17 +17,24 @@ import akka.stream.stage.GraphStageLogic import akka.stream.stage.GraphStageWithMaterializedValue import akka.stream.stage.InHandler import akka.stream.stage.OutHandler +import akka.remote.UniqueAddress /** - * Marker trait for reply messages + * INTERNAL API: Marker trait for reply messages */ -trait Reply extends ControlMessage +private[akka] trait Reply extends ControlMessage /** + * INTERNAL API * Marker trait for control messages that can be sent via the system message sub-channel * but don't need full reliable delivery. E.g. `HandshakeReq` and `Reply`. */ -trait ControlMessage +private[akka] trait ControlMessage + +/** + * INTERNAL API + */ +private[akka] final case class Quarantined(from: UniqueAddress, to: UniqueAddress) extends ControlMessage // FIXME serialization /** * INTERNAL API diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala index a838e133dd..a889acac38 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala @@ -159,7 +159,7 @@ private[akka] class InboundHandshake(inboundContext: InboundContext, inControlSt private def isKnownOrigin(originAddress: UniqueAddress): Boolean = { // FIXME these association lookups are probably too costly for each message, need local cache or something val associationState = inboundContext.association(originAddress.address).associationState - associationState.uniqueRemoteAddress.value match { + associationState.uniqueRemoteAddressValue() match { case Some(Success(a)) if a.uid == originAddress.uid ⇒ true case x ⇒ false } diff --git a/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala b/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala new file mode 100644 index 0000000000..cf0a916ee9 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/InboundQuarantineCheck.scala @@ -0,0 +1,44 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import akka.stream.Attributes +import akka.stream.FlowShape +import akka.stream.Inlet +import akka.stream.Outlet + +import akka.stream.stage.GraphStage +import akka.stream.stage.GraphStageLogic +import akka.stream.stage.InHandler +import akka.stream.stage.OutHandler + +/** + * INTERNAL API + */ +private[akka] class InboundQuarantineCheck(inboundContext: InboundContext) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] { + val in: Inlet[InboundEnvelope] = Inlet("InboundQuarantineCheck.in") + val out: Outlet[InboundEnvelope] = Outlet("InboundQuarantineCheck.out") + override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + + // InHandler + override def onPush(): Unit = { + val env = grab(in) + val association = inboundContext.association(env.originAddress.address) + if (association.associationState.isQuarantined(env.originAddress.uid)) { + inboundContext.sendControl(env.originAddress.address, + Quarantined(inboundContext.localAddress, env.originAddress)) + pull(in) + } else + push(out, env) + } + + // OutHandler + override def onPull(): Unit = pull(in) + + setHandlers(in, out, this) + } +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index 4fa7a2d23f..32a31ee111 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -35,6 +35,8 @@ private[akka] object SystemMessageDelivery { final case class Ack(seqNo: Long, from: UniqueAddress) extends Reply final case class Nack(seqNo: Long, from: UniqueAddress) extends Reply + final case object ClearSystemMessageDelivery + private case object ResendTick } @@ -43,7 +45,8 @@ private[akka] object SystemMessageDelivery { */ private[akka] class SystemMessageDelivery( outboundContext: OutboundContext, - resendInterval: FiniteDuration) + resendInterval: FiniteDuration, + maxBufferSize: Int) extends GraphStage[FlowShape[Send, Send]] { import SystemMessageDelivery._ @@ -120,16 +123,17 @@ private[akka] class SystemMessageDelivery( } private val nackCallback = getAsyncCallback[Nack] { reply ⇒ - ack(reply.seqNo) - if (reply.seqNo > resendingFromSeqNo) - resending = unacknowledged.clone() - tryResend() + if (reply.seqNo <= seqNo) { + ack(reply.seqNo) + if (reply.seqNo > resendingFromSeqNo) + resending = unacknowledged.clone() + tryResend() + } } private def ack(n: Long): Unit = { - if (n > seqNo) - throw new IllegalArgumentException(s"Unexpected ack $n, when highest sent seqNo is $seqNo") - clearUnacknowledged(n) + if (n <= seqNo) + clearUnacknowledged(n) } @tailrec private def clearUnacknowledged(ackedSeqNo: Long): Unit = { @@ -151,20 +155,35 @@ private[akka] class SystemMessageDelivery( // InHandler override def onPush(): Unit = { grab(in) match { + case s @ Send(ClearSystemMessageDelivery, _, _, _) ⇒ + clear() + pull(in) case s @ Send(msg: AnyRef, _, _, _) ⇒ - seqNo += 1 - val sendMsg = s.copy(message = SystemMessageEnvelope(msg, seqNo, localAddress)) - // FIXME quarantine if unacknowledged is full - unacknowledged.offer(sendMsg) - if (resending.isEmpty && isAvailable(out)) - push(out, sendMsg) - else { - resending.offer(sendMsg) - tryResend() + if (unacknowledged.size < maxBufferSize) { + seqNo += 1 + val sendMsg = s.copy(message = SystemMessageEnvelope(msg, seqNo, localAddress)) + unacknowledged.offer(sendMsg) + if (resending.isEmpty && isAvailable(out)) + push(out, sendMsg) + else { + resending.offer(sendMsg) + tryResend() + } + } else { + // buffer overflow + outboundContext.quarantine(reason = s"System message delivery buffer overflow, size [$maxBufferSize]") + pull(in) } } } + private def clear(): Unit = { + seqNo = 0L // sequence number for the first message will be 1 + unacknowledged.clear() + resending.clear() + resendingFromSeqNo = -1L + } + // OutHandler override def onPull(): Unit = { if (replyObserverAttached) { // otherwise it will be pulled after attached diff --git a/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala index ea1b460711..bf7cfcf8bc 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala @@ -52,7 +52,7 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender { "send HandshakeRsp as reply to HandshakeReq" in { val replyProbe = TestProbe() - val inboundContext = new ManualReplyInboundContext(replyProbe.ref, addressB, new TestControlMessageSubject) + val inboundContext = new TestInboundContext(addressB, controlProbe = Some(replyProbe.ref)) val (upstream, downstream) = setupStream(inboundContext) downstream.request(10) @@ -77,9 +77,9 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender { downstream.cancel() } - "send HandshakeReq as when receiving message from unknown (receiving system restarted)" in { + "send HandshakeReq when receiving message from unknown (receiving system restarted)" in { val replyProbe = TestProbe() - val inboundContext = new ManualReplyInboundContext(replyProbe.ref, addressB, new TestControlMessageSubject) + val inboundContext = new TestInboundContext(addressB, controlProbe = Some(replyProbe.ref)) val (upstream, downstream) = setupStream(inboundContext) downstream.request(10) diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala index 62f1ecd768..940f2bd62f 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala @@ -74,7 +74,7 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo val remoteRef = null.asInstanceOf[RemoteActorRef] // not used Source(1 to sendCount) .map(n ⇒ Send("msg-" + n, None, remoteRef, None)) - .via(new SystemMessageDelivery(outboundContext, resendInterval)) + .via(new SystemMessageDelivery(outboundContext, resendInterval, maxBufferSize = 1000)) } private def inbound(inboundContext: InboundContext): Flow[Send, InboundEnvelope, NotUsed] = { diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index d1af1446ea..30ed97698f 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -54,7 +54,7 @@ private[akka] class TestOutboundContext( val controlProbe: Option[ActorRef] = None) extends OutboundContext { // access to this is synchronized (it's a test utility) - private var _associationState = new AssociationState(1, Promise()) + private var _associationState = AssociationState() override def associationState: AssociationState = synchronized { _associationState @@ -65,10 +65,14 @@ private[akka] class TestOutboundContext( _associationState.uniqueRemoteAddress.value match { case Some(Success(`peer`)) ⇒ // our value case _ ⇒ - _associationState = new AssociationState(incarnation = _associationState.incarnation + 1, Promise.successful(peer)) + _associationState = _associationState.newIncarnation(Promise.successful(peer)) } } + override def quarantine(reason: String): Unit = synchronized { + _associationState = _associationState.newQuarantined() + } + override def sendControl(message: ControlMessage) = { controlProbe.foreach(_ ! message) controlSubject.sendControl(InboundEnvelope(null, remoteAddress, message, None, localAddress))