From 67325da7229ee625ff7ac6aff5e91de2c32cb254 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 13 May 2016 08:06:13 +0200 Subject: [PATCH] init new handshake for unknown origin, receiver restarted, #20313 * handle UID incarnations, shared association state that can be swapped for new handshakes * detect that message comes from unknown origin and then initiate new handshake (handled by InboundHandshake stage) * simplify the OutboundHandshake stage * doesn't have to listen for HandshakeRsp replies, it can just listen to when the uniqueRemoteAddress future is completed, InboundHandshake stage completes the handshake when it receives HandshakeRsp * send the HandshakeReq via the control message ingress, instead of pushing it downstreams, than also means that HandshakeReq is only sent on the control stream, which is good * materialization race condition --- .../artery/HandshakeRestartReceiverSpec.scala | 18 +-- .../remote/artery/AbstractAssociation.java | 18 +++ .../akka/remote/artery/ArteryTransport.scala | 70 +++++++--- .../akka/remote/artery/Association.scala | 109 +++++++++++++--- .../scala/akka/remote/artery/Control.scala | 10 +- .../scala/akka/remote/artery/Handshake.scala | 122 +++++++++--------- .../remote/artery/MessageDispatcher.scala | 2 +- .../remote/artery/SystemMessageDelivery.scala | 2 +- .../artery/InboundControlJunctionSpec.scala | 4 +- .../remote/artery/InboundHandshakeSpec.scala | 34 ++++- .../remote/artery/OutboundHandshakeSpec.scala | 37 ++---- .../artery/SystemMessageDeliverySpec.scala | 4 +- .../akka/remote/artery/TestContext.scala | 62 ++++++--- 13 files changed, 317 insertions(+), 175 deletions(-) create mode 100644 akka-remote/src/main/java/akka/remote/artery/AbstractAssociation.java diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/HandshakeRestartReceiverSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/HandshakeRestartReceiverSpec.scala index 7a377574b1..44d0278d39 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/HandshakeRestartReceiverSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/HandshakeRestartReceiverSpec.scala @@ -3,21 +3,18 @@ */ package akka.remote.artery -import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit.NANOSECONDS +import scala.concurrent.Await import scala.concurrent.duration._ + import akka.actor._ -import akka.remote.RemoteActorRefProvider +import akka.remote.AddressUidExtension +import akka.remote.RARP import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.STMultiNodeSpec import akka.testkit._ import com.typesafe.config.ConfigFactory -import java.net.InetAddress -import scala.concurrent.Await -import akka.remote.RARP -import akka.remote.AddressUidExtension object HandshakeRestartReceiverSpec extends MultiNodeConfig { val first = role("first") @@ -77,7 +74,7 @@ abstract class HandshakeRestartReceiverSpec val secondAddress = node(second).address val secondAssociation = RARP(system).provider.transport.asInstanceOf[ArteryTransport].association(secondAddress) - val secondUniqueRemoteAddress = Await.result(secondAssociation.uniqueRemoteAddress, 3.seconds) + val secondUniqueRemoteAddress = Await.result(secondAssociation.associationState.uniqueRemoteAddress, 3.seconds) secondUniqueRemoteAddress.address should ===(secondAddress) secondUniqueRemoteAddress.uid should ===(secondUid) @@ -93,7 +90,10 @@ abstract class HandshakeRestartReceiverSpec } val (secondUid2, subject2) = identifyWithUid(secondRootPath, "subject2") secondUid2 should !==(secondUid) - // FIXME verify that UID in association was replaced (not implemented yet) + val secondUniqueRemoteAddress2 = Await.result(secondAssociation.associationState.uniqueRemoteAddress, 3.seconds) + secondUniqueRemoteAddress2.uid should ===(secondUid2) + secondUniqueRemoteAddress2.address should ===(secondAddress) + secondUniqueRemoteAddress2 should !==(secondUniqueRemoteAddress) subject2 ! "shutdown" } diff --git a/akka-remote/src/main/java/akka/remote/artery/AbstractAssociation.java b/akka-remote/src/main/java/akka/remote/artery/AbstractAssociation.java new file mode 100644 index 0000000000..03407235e2 --- /dev/null +++ b/akka-remote/src/main/java/akka/remote/artery/AbstractAssociation.java @@ -0,0 +1,18 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery; + +import akka.util.Unsafe; + +class AbstractAssociation { + protected final static long sharedStateOffset; + + static { + try { + sharedStateOffset = Unsafe.instance.objectFieldOffset(Association.class.getDeclaredField("_sharedStateDoNotCallMeDirectly")); + } catch(Throwable t){ + throw new ExceptionInInitializerError(t); + } + } +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index c2236542a1..c8b694190a 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -3,18 +3,24 @@ */ package akka.remote.artery +import java.io.File import java.nio.ByteOrder import java.util.concurrent.ConcurrentHashMap import java.util.function.{ Function ⇒ JFunction } import scala.concurrent.Future +import scala.concurrent.Promise import scala.concurrent.duration._ +import scala.util.Failure +import scala.util.Success + import akka.Done import akka.NotUsed import akka.actor.ActorRef import akka.actor.Address import akka.actor.ExtendedActorSystem import akka.actor.InternalActorRef +import akka.actor.Props import akka.event.Logging import akka.event.LoggingAdapter import akka.remote.AddressUidExtension @@ -23,8 +29,10 @@ import akka.remote.MessageSerializer import akka.remote.RemoteActorRef import akka.remote.RemoteActorRefProvider import akka.remote.RemoteTransport +import akka.remote.SeqNo import akka.remote.UniqueAddress import akka.remote.artery.InboundControlJunction.ControlMessageSubject +import akka.remote.artery.OutboundControlJunction.OutboundControlIngress import akka.remote.transport.AkkaPduCodec import akka.remote.transport.AkkaPduProtobufCodec import akka.serialization.Serialization @@ -60,7 +68,8 @@ private[akka] final case class InboundEnvelope( recipient: InternalActorRef, recipientAddress: Address, message: AnyRef, - senderOption: Option[ActorRef]) + senderOption: Option[ActorRef], + originAddress: UniqueAddress) /** * INTERNAL API @@ -75,7 +84,7 @@ private[akka] trait InboundContext { /** * An inbound stage can send control message, e.g. a reply, to the origin - * address with this method. + * address with this method. It will be sent over the control sub-channel. */ def sendControl(to: Address, message: ControlMessage): Unit @@ -85,6 +94,26 @@ private[akka] trait InboundContext { def association(remoteAddress: Address): OutboundContext } +final class AssociationState( + val incarnation: Int, + val uniqueRemoteAddressPromise: Promise[UniqueAddress]) { + + /** + * Full outbound address with UID for this association. + * Completed when by the handshake. + */ + def uniqueRemoteAddress: Future[UniqueAddress] = uniqueRemoteAddressPromise.future + + override def toString(): String = { + val a = uniqueRemoteAddressPromise.future.value match { + case Some(Success(a)) ⇒ a + case Some(Failure(e)) ⇒ s"Failure(${e.getMessage})" + case None ⇒ "unknown" + } + s"AssociationState($incarnation, $a)" + } +} + /** * INTERNAL API * Outbound association API that is used by the stream stages. @@ -101,17 +130,15 @@ private[akka] trait OutboundContext { */ def remoteAddress: Address - /** - * Full outbound address with UID for this association. - * Completed when by the handshake. - */ - def uniqueRemoteAddress: Future[UniqueAddress] + def associationState: AssociationState + + def completeHandshake(peer: UniqueAddress): Unit /** - * Set the outbound address with UID when the - * handshake is completed. + * An inbound stage can send control message, e.g. a HandshakeReq, to the remote + * address of this association. It will be sent over the control sub-channel. */ - def completeRemoteAddress(a: UniqueAddress): Unit + def sendControl(message: ControlMessage): Unit /** * An outbound stage can listen to control messages @@ -139,7 +166,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R @volatile private[this] var driver: MediaDriver = _ @volatile private[this] var aeron: Aeron = _ - override val log: LoggingAdapter = Logging(system.eventStream, getClass.getName) + override val log: LoggingAdapter = Logging(system, getClass.getName) override def defaultAddress: Address = localAddress.address override def addresses: Set[Address] = Set(defaultAddress) override def localAddressForRemote(remote: Address): Address = defaultAddress @@ -280,9 +307,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } } - override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit = { - ??? - } + override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit = + association(remoteAddress).quarantine(uid) def outbound(outboundContext: OutboundContext): Sink[Send, Any] = { Flow.fromGraph(killSwitch.flow[Send]) @@ -302,6 +328,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R .to(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner)) } + // FIXME hack until real envelopes, encoding originAddress in sender :) + private val dummySender = system.systemActorOf(Props.empty, "dummy") + // TODO: Try out parallelized serialization (mapAsync) for performance val encoder: Flow[Send, ByteString, NotUsed] = Flow[Send].map { sendEnvelope ⇒ val pdu: ByteString = codec.constructMessage( @@ -310,8 +339,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R Serialization.currentTransportInformation.withValue(Serialization.Information(localAddress.address, system)) { MessageSerializer.serialize(system, sendEnvelope.message.asInstanceOf[AnyRef]) }, - sendEnvelope.senderOption, - seqOption = None, // FIXME: Acknowledgements will be handled differently I just reused the old codec + if (sendEnvelope.senderOption.isDefined) sendEnvelope.senderOption else Some(dummySender), // FIXME: hack until real envelopes + seqOption = Some(SeqNo(localAddress.uid)), // FIXME: hack until real envelopes ackOption = None) // TODO: Drop unserializable messages @@ -337,14 +366,15 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R m.recipient, m.recipientAddress, MessageSerializer.deserialize(system, m.serializedMessage), - m.senderOption) + if (m.senderOption.get.path.name == "dummy") None else m.senderOption, // FIXME hack until real envelopes + UniqueAddress(m.senderOption.get.path.address, m.seq.rawValue.toInt)) // FIXME hack until real envelopes } val inboundFlow: Flow[ByteString, ByteString, NotUsed] = { Flow.fromSinkAndSource( decoder .via(deserializer) - .via(new InboundHandshake(this)) + .via(new InboundHandshake(this, inControlStream = false)) .to(messageDispatcherSink), Source.maybe[ByteString].via(killSwitch.flow)) } @@ -353,9 +383,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R Flow.fromSinkAndSourceMat( decoder .via(deserializer) - .via(new InboundHandshake(this)) - .via(new SystemMessageAcker(this)) + .via(new InboundHandshake(this, inControlStream = true)) .viaMat(new InboundControlJunction)(Keep.right) + .via(new SystemMessageAcker(this)) .to(messageDispatcherSink), Source.maybe[ByteString].via(killSwitch.flow))((a, b) ⇒ a) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 323aac7266..55e9c3b004 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -3,22 +3,27 @@ */ package akka.remote.artery -import scala.concurrent.Future import scala.concurrent.Promise +import scala.util.Success import akka.actor.ActorRef import akka.actor.Address import akka.actor.RootActorPath import akka.dispatch.sysmsg.SystemMessage +import akka.event.Logging import akka.remote.EndpointManager.Send import akka.remote.RemoteActorRef import akka.remote.UniqueAddress import akka.remote.artery.InboundControlJunction.ControlMessageSubject +import akka.remote.artery.OutboundControlJunction.OutboundControlIngress import akka.stream.Materializer import akka.stream.OverflowStrategy +import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Source import akka.stream.scaladsl.SourceQueueWithComplete -import akka.remote.artery.OutboundControlJunction.OutboundControlIngress -import akka.stream.scaladsl.Keep +import akka.util.Unsafe +import java.util.concurrent.locks.ReentrantLock +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit /** * INTERNAL API @@ -30,35 +35,87 @@ private[akka] class Association( val transport: ArteryTransport, val materializer: Materializer, override val remoteAddress: Address, - override val controlSubject: ControlMessageSubject) extends OutboundContext { + override val controlSubject: ControlMessageSubject) + extends AbstractAssociation with OutboundContext { + + private val log = Logging(transport.system, getClass.getName) @volatile private[this] var queue: SourceQueueWithComplete[Send] = _ - @volatile private[this] var systemMessageQueue: SourceQueueWithComplete[Send] = _ + @volatile private[this] var controlQueue: SourceQueueWithComplete[Send] = _ @volatile private[this] var _outboundControlIngress: OutboundControlIngress = _ + private val materializing = new CountDownLatch(1) def outboundControlIngress: OutboundControlIngress = { - if (_outboundControlIngress eq null) - throw new IllegalStateException("outboundControlIngress not initialized yet") - _outboundControlIngress + if (_outboundControlIngress ne null) + _outboundControlIngress + else { + // materialization not completed yet + materializing.await(10, TimeUnit.SECONDS) + if (_outboundControlIngress eq null) + throw new IllegalStateException("outboundControlIngress not initialized yet") + _outboundControlIngress + } } override def localAddress: UniqueAddress = transport.localAddress - // FIXME we also need to be able to switch to new uid - private val _uniqueRemoteAddress = Promise[UniqueAddress]() - override def uniqueRemoteAddress: Future[UniqueAddress] = _uniqueRemoteAddress.future - override def completeRemoteAddress(a: UniqueAddress): Unit = { - require(a.address == remoteAddress, s"Wrong UniqueAddress got [$a.address], expected [$remoteAddress]") - _uniqueRemoteAddress.trySuccess(a) + /** + * Holds reference to shared state of Association - *access only via helper methods* + */ + @volatile + private[this] var _sharedStateDoNotCallMeDirectly: AssociationState = + new AssociationState(incarnation = 1, uniqueRemoteAddressPromise = Promise()) + + /** + * Helper method for access to underlying state via Unsafe + * + * @param oldState Previous state + * @param newState Next state on transition + * @return Whether the previous state matched correctly + */ + @inline + private[this] def swapState(oldState: AssociationState, newState: AssociationState): Boolean = + Unsafe.instance.compareAndSwapObject(this, AbstractAssociation.sharedStateOffset, oldState, newState) + + /** + * @return Reference to current shared state + */ + def associationState: AssociationState = + Unsafe.instance.getObjectVolatile(this, AbstractAssociation.sharedStateOffset).asInstanceOf[AssociationState] + + override def completeHandshake(peer: UniqueAddress): Unit = { + require(remoteAddress == peer.address, + s"wrong remote address in completeHandshake, got ${peer.address}, expected ${remoteAddress}") + val current = associationState + current.uniqueRemoteAddressPromise.trySuccess(peer) + current.uniqueRemoteAddress.value match { + case Some(Success(`peer`)) ⇒ // our value + case _ ⇒ + val newState = new AssociationState(incarnation = current.incarnation + 1, Promise.successful(peer)) + if (swapState(current, newState)) { + current.uniqueRemoteAddress.value match { + case Some(Success(old)) ⇒ + log.debug("Incarnation {} of association to [{}] with new UID [{}] (old UID [{}])", + newState.incarnation, peer.address, peer.uid, old.uid) + quarantine(Some(old.uid)) + case _ ⇒ // Failed, nothing to do + } + // if swap failed someone else completed before us, and that is fine + } + } } + // OutboundContext + override def sendControl(message: ControlMessage): Unit = + outboundControlIngress.sendControlMessage(message) + def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { // TODO: lookup subchannel // FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly message match { case _: SystemMessage ⇒ implicit val ec = materializer.executionContext - systemMessageQueue.offer(Send(message, senderOption, recipient, None)).onFailure { + controlQueue.offer(Send(message, senderOption, recipient, None)).onFailure { case e ⇒ // FIXME proper error handling, and quarantining println(s"# System message dropped, due to $e") // FIXME @@ -72,20 +129,30 @@ private[akka] class Association( override val dummyRecipient: RemoteActorRef = transport.provider.resolveActorRef(RootActorPath(remoteAddress) / "system" / "dummy").asInstanceOf[RemoteActorRef] - def quarantine(uid: Option[Int]): Unit = () + def quarantine(uid: Option[Int]): Unit = { + // FIXME implement + log.error("Association to [{}] with UID [{}] is irrecoverably failed. Quarantining address.", + remoteAddress, uid.getOrElse("unknown")) + } // Idempotent def associate(): Unit = { // FIXME detect and handle stream failure, e.g. handshake timeout - if (queue eq null) - queue = Source.queue(256, OverflowStrategy.dropBuffer) - .to(transport.outbound(this)).run()(materializer) - if (systemMessageQueue eq null) { + + // it's important to materialize the outboundControl stream first, + // so that outboundControlIngress is ready when stages for all streams start + if (controlQueue eq null) { val (q, control) = Source.queue(256, OverflowStrategy.dropBuffer) .toMat(transport.outboundControl(this))(Keep.both) .run()(materializer) - systemMessageQueue = q + controlQueue = q _outboundControlIngress = control + // stage in the control stream may access the outboundControlIngress before returned here + // using CountDownLatch to make sure that materialization is completed before accessing outboundControlIngress + materializing.countDown() + + queue = Source.queue(256, OverflowStrategy.dropBuffer) + .to(transport.outbound(this)).run()(materializer) } } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Control.scala b/akka-remote/src/main/scala/akka/remote/artery/Control.scala index a257060ce3..ae3b51c3bd 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Control.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Control.scala @@ -3,20 +3,22 @@ */ package akka.remote.artery +import java.util.ArrayDeque + import scala.concurrent.Future import scala.concurrent.Promise + import akka.Done +import akka.remote.EndpointManager.Send import akka.stream.Attributes import akka.stream.FlowShape import akka.stream.Inlet import akka.stream.Outlet +import akka.stream.stage.CallbackWrapper import akka.stream.stage.GraphStageLogic import akka.stream.stage.GraphStageWithMaterializedValue import akka.stream.stage.InHandler import akka.stream.stage.OutHandler -import akka.remote.EndpointManager.Send -import java.util.ArrayDeque -import akka.stream.stage.CallbackWrapper /** * Marker trait for reply messages @@ -97,7 +99,7 @@ private[akka] class InboundControlJunction // InHandler override def onPush(): Unit = { grab(in) match { - case env @ InboundEnvelope(_, _, _: ControlMessage, _) ⇒ + case env @ InboundEnvelope(_, _, _: ControlMessage, _, _) ⇒ observers.foreach(_.notify(env)) pull(in) case env ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala index 92793ff6e6..a838e133dd 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala @@ -4,11 +4,12 @@ package akka.remote.artery import java.util.concurrent.TimeoutException + import scala.concurrent.duration._ -import akka.Done +import scala.util.Success + import akka.remote.EndpointManager.Send import akka.remote.UniqueAddress -import akka.remote.artery.InboundControlJunction.ControlMessageObserver import akka.stream.Attributes import akka.stream.FlowShape import akka.stream.Inlet @@ -29,7 +30,6 @@ private[akka] object OutboundHandshake { private sealed trait HandshakeState private case object Start extends HandshakeState - private case object ControlMessageObserverAttached extends HandshakeState private case object ReqInProgress extends HandshakeState private case object Completed extends HandshakeState @@ -46,34 +46,24 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout: override val shape: FlowShape[Send, Send] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new TimerGraphStageLogic(shape) with InHandler with OutHandler with ControlMessageObserver { + new TimerGraphStageLogic(shape) with InHandler with OutHandler { import OutboundHandshake._ private var handshakeState: HandshakeState = Start - private def remoteAddress = outboundContext.remoteAddress - override def preStart(): Unit = { - if (outboundContext.uniqueRemoteAddress.isCompleted) { + val uniqueRemoteAddress = outboundContext.associationState.uniqueRemoteAddress + if (uniqueRemoteAddress.isCompleted) { handshakeState = Completed } else { + // The InboundHandshake stage will complete the uniqueRemoteAddress future + // when it receives the HandshakeRsp reply implicit val ec = materializer.executionContext - outboundContext.controlSubject.attach(this).foreach { - getAsyncCallback[Done] { _ ⇒ - if (handshakeState != Completed) { - if (isAvailable(out)) - pushHandshakeReq() - else - handshakeState = ControlMessageObserverAttached - } - }.invoke - } - - outboundContext.uniqueRemoteAddress.foreach { + uniqueRemoteAddress.foreach { getAsyncCallback[UniqueAddress] { a ⇒ if (handshakeState != Completed) { handshakeCompleted() - if (isAvailable(out) && !hasBeenPulled(in)) + if (isAvailable(out)) pull(in) } }.invoke @@ -83,10 +73,6 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout: } } - override def postStop(): Unit = { - outboundContext.controlSubject.detach(this) - } - // InHandler override def onPush(): Unit = { if (handshakeState != Completed) @@ -98,50 +84,27 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout: override def onPull(): Unit = { handshakeState match { case Completed ⇒ pull(in) - case ControlMessageObserverAttached ⇒ - pushHandshakeReq() - case Start ⇒ // will push HandshakeReq when ControlMessageObserver is attached + case Start ⇒ + // will pull when handshake reply is received (uniqueRemoteAddress completed) + handshakeState = ReqInProgress + outboundContext.sendControl(HandshakeReq(outboundContext.localAddress)) case ReqInProgress ⇒ // will pull when handshake reply is received } } - private def pushHandshakeReq(): Unit = { - handshakeState = ReqInProgress - // FIXME we should be able to Send without recipient ActorRef - push(out, Send(HandshakeReq(outboundContext.localAddress), None, outboundContext.dummyRecipient, None)) - } - private def handshakeCompleted(): Unit = { handshakeState = Completed cancelTimer(HandshakeTimeout) - outboundContext.controlSubject.detach(this) } override protected def onTimer(timerKey: Any): Unit = timerKey match { case HandshakeTimeout ⇒ + // FIXME would it make sense to retry a few times before failing? failStage(new TimeoutException( - s"Handshake with [$remoteAddress] did not complete within ${timeout.toMillis} ms")) + s"Handshake with [${outboundContext.remoteAddress}] did not complete within ${timeout.toMillis} ms")) } - // ControlMessageObserver, external call - override def notify(inboundEnvelope: InboundEnvelope): Unit = { - inboundEnvelope.message match { - case rsp: HandshakeRsp ⇒ - if (rsp.from.address == remoteAddress) { - getAsyncCallback[HandshakeRsp] { reply ⇒ - if (handshakeState != Completed) { - handshakeCompleted() - outboundContext.completeRemoteAddress(reply.from) - if (isAvailable(out) && !hasBeenPulled(in)) - pull(in) - } - }.invoke(rsp) - } - case _ ⇒ // not interested - } - } - setHandlers(in, out, this) } @@ -150,31 +113,62 @@ private[akka] class OutboundHandshake(outboundContext: OutboundContext, timeout: /** * INTERNAL API */ -private[akka] class InboundHandshake(inboundContext: InboundContext) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] { +private[akka] class InboundHandshake(inboundContext: InboundContext, inControlStream: Boolean) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] { val in: Inlet[InboundEnvelope] = Inlet("InboundHandshake.in") val out: Outlet[InboundEnvelope] = Outlet("InboundHandshake.out") override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new TimerGraphStageLogic(shape) with InHandler with OutHandler { + new TimerGraphStageLogic(shape) with OutHandler { import OutboundHandshake._ // InHandler - override def onPush(): Unit = { - grab(in) match { - case InboundEnvelope(_, _, HandshakeReq(from), _) ⇒ - inboundContext.association(from.address).completeRemoteAddress(from) - inboundContext.sendControl(from.address, HandshakeRsp(inboundContext.localAddress)) - pull(in) - case other ⇒ - push(out, other) + if (inControlStream) + setHandler(in, new InHandler { + override def onPush(): Unit = { + grab(in) match { + case InboundEnvelope(_, _, HandshakeReq(from), _, _) ⇒ + inboundContext.association(from.address).completeHandshake(from) + inboundContext.sendControl(from.address, HandshakeRsp(inboundContext.localAddress)) + pull(in) + case InboundEnvelope(_, _, HandshakeRsp(from), _, _) ⇒ + inboundContext.association(from.address).completeHandshake(from) + pull(in) + case other ⇒ onMessage(other) + } + } + }) + else + setHandler(in, new InHandler { + override def onPush(): Unit = onMessage(grab(in)) + }) + + private def onMessage(env: InboundEnvelope): Unit = { + if (isKnownOrigin(env.originAddress)) + push(out, env) + else { + inboundContext.sendControl(env.originAddress.address, HandshakeReq(inboundContext.localAddress)) + // FIXME Note that we have the originAddress that would be needed to complete the handshake + // but it is not done here because the handshake might exchange more information. + // Is that a valid thought? + // drop message from unknown, this system was probably restarted + pull(in) + } + } + + private def isKnownOrigin(originAddress: UniqueAddress): Boolean = { + // FIXME these association lookups are probably too costly for each message, need local cache or something + val associationState = inboundContext.association(originAddress.address).associationState + associationState.uniqueRemoteAddress.value match { + case Some(Success(a)) if a.uid == originAddress.uid ⇒ true + case x ⇒ false } } // OutHandler override def onPull(): Unit = pull(in) - setHandlers(in, out, this) + setHandler(out, this) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala b/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala index 3286729521..a76a4b478b 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala @@ -25,7 +25,7 @@ private[akka] class MessageDispatcher( provider: RemoteActorRefProvider) { private val remoteDaemon = provider.remoteDaemon - private val log = Logging(system.eventStream, getClass.getName) + private val log = Logging(system, getClass.getName) def dispatch(recipient: InternalActorRef, recipientAddress: Address, diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index 21e62ca917..4fa7a2d23f 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -199,7 +199,7 @@ private[akka] class SystemMessageAcker(inboundContext: InboundContext) extends G // InHandler override def onPush(): Unit = { grab(in) match { - case env @ InboundEnvelope(_, _, sysEnv @ SystemMessageEnvelope(_, n, ackReplyTo), _) ⇒ + case env @ InboundEnvelope(_, _, sysEnv @ SystemMessageEnvelope(_, n, ackReplyTo), _, _) ⇒ if (n == seqNo) { inboundContext.sendControl(ackReplyTo.address, Ack(n, localAddress)) seqNo += 1 diff --git a/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala index 4b67be4274..1ead987ae9 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/InboundControlJunctionSpec.scala @@ -42,9 +42,9 @@ class InboundControlJunctionSpec extends AkkaSpec with ImplicitSender { val recipient = null.asInstanceOf[InternalActorRef] // not used val ((upstream, controlSubject), downstream) = TestSource.probe[AnyRef] - .map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, None)) + .map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, None, addressA)) .viaMat(new InboundControlJunction)(Keep.both) - .map { case InboundEnvelope(_, _, msg, _) ⇒ msg } + .map { case InboundEnvelope(_, _, msg, _, _) ⇒ msg } .toMat(TestSink.probe[Any])(Keep.both) .run() diff --git a/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala index f2b378efc5..ea1b460711 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/InboundHandshakeSpec.scala @@ -3,6 +3,7 @@ */ package akka.remote.artery +import scala.concurrent.Await import scala.concurrent.duration._ import akka.actor.Address @@ -40,9 +41,9 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender { private def setupStream(inboundContext: InboundContext, timeout: FiniteDuration = 5.seconds): (TestPublisher.Probe[AnyRef], TestSubscriber.Probe[Any]) = { val recipient = null.asInstanceOf[InternalActorRef] // not used TestSource.probe[AnyRef] - .map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, None)) - .via(new InboundHandshake(inboundContext)) - .map { case InboundEnvelope(_, _, msg, _) ⇒ msg } + .map(msg ⇒ InboundEnvelope(recipient, addressB.address, msg, None, addressA)) + .via(new InboundHandshake(inboundContext, inControlStream = true)) + .map { case InboundEnvelope(_, _, msg, _, _) ⇒ msg } .toMat(TestSink.probe[Any])(Keep.both) .run() } @@ -62,6 +63,33 @@ class InboundHandshakeSpec extends AkkaSpec with ImplicitSender { downstream.cancel() } + "complete remoteUniqueAddress when receiving HandshakeReq" in { + val inboundContext = new TestInboundContext(addressB) + val (upstream, downstream) = setupStream(inboundContext) + + downstream.request(10) + upstream.sendNext(HandshakeReq(addressA)) + upstream.sendNext("msg1") + downstream.expectNext("msg1") + val uniqueRemoteAddress = Await.result( + inboundContext.association(addressA.address).associationState.uniqueRemoteAddress, remainingOrDefault) + uniqueRemoteAddress should ===(addressA) + downstream.cancel() + } + + "send HandshakeReq as when receiving message from unknown (receiving system restarted)" in { + val replyProbe = TestProbe() + val inboundContext = new ManualReplyInboundContext(replyProbe.ref, addressB, new TestControlMessageSubject) + val (upstream, downstream) = setupStream(inboundContext) + + downstream.request(10) + // no HandshakeReq + upstream.sendNext("msg17") + replyProbe.expectMsg(HandshakeReq(addressB)) + downstream.expectNoMsg(200.millis) // messages from unknown are dropped + downstream.cancel() + } + } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala index a75c8576bf..3e9d73e0f5 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/OutboundHandshakeSpec.scala @@ -8,12 +8,10 @@ import java.util.concurrent.TimeoutException import scala.concurrent.duration._ import akka.actor.Address -import akka.actor.InternalActorRef import akka.remote.EndpointManager.Send import akka.remote.RemoteActorRef import akka.remote.UniqueAddress import akka.remote.artery.OutboundHandshake.HandshakeReq -import akka.remote.artery.OutboundHandshake.HandshakeRsp import akka.remote.artery.SystemMessageDelivery._ import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings @@ -24,6 +22,7 @@ import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSource import akka.testkit.AkkaSpec import akka.testkit.ImplicitSender +import akka.testkit.TestProbe class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { @@ -45,53 +44,37 @@ class OutboundHandshakeSpec extends AkkaSpec with ImplicitSender { "OutboundHandshake stage" must { "send HandshakeReq when first pulled" in { - val inboundContext = new TestInboundContext(localAddress = addressA) + val controlProbe = TestProbe() + val inboundContext = new TestInboundContext(localAddress = addressA, controlProbe = Some(controlProbe.ref)) val outboundContext = inboundContext.association(addressB.address) val (upstream, downstream) = setupStream(outboundContext) downstream.request(10) - downstream.expectNext(HandshakeReq(addressA)) + controlProbe.expectMsg(HandshakeReq(addressA)) downstream.cancel() } - "timeout if not receiving HandshakeRsp" in { + "timeout if handshake not completed" in { val inboundContext = new TestInboundContext(localAddress = addressA) val outboundContext = inboundContext.association(addressB.address) val (upstream, downstream) = setupStream(outboundContext, timeout = 200.millis) downstream.request(1) - downstream.expectNext(HandshakeReq(addressA)) downstream.expectError().getClass should be(classOf[TimeoutException]) } "not deliver messages from upstream until handshake completed" in { - val controlSubject = new TestControlMessageSubject - val inboundContext = new TestInboundContext(localAddress = addressA, controlSubject) + val controlProbe = TestProbe() + val inboundContext = new TestInboundContext(localAddress = addressA, controlProbe = Some(controlProbe.ref)) val outboundContext = inboundContext.association(addressB.address) - val recipient = null.asInstanceOf[InternalActorRef] // not used val (upstream, downstream) = setupStream(outboundContext) downstream.request(10) - downstream.expectNext(HandshakeReq(addressA)) + controlProbe.expectMsg(HandshakeReq(addressA)) upstream.sendNext("msg1") downstream.expectNoMsg(200.millis) - controlSubject.sendControl(InboundEnvelope(recipient, addressA.address, HandshakeRsp(addressB), None)) - downstream.expectNext("msg1") - upstream.sendNext("msg2") - downstream.expectNext("msg2") - downstream.cancel() - } - - "complete handshake via another sub-channel" in { - val inboundContext = new TestInboundContext(localAddress = addressA) - val outboundContext = inboundContext.association(addressB.address) - val (upstream, downstream) = setupStream(outboundContext) - - downstream.request(10) - downstream.expectNext(HandshakeReq(addressA)) - upstream.sendNext("msg1") - // handshake completed first by another sub-channel - outboundContext.completeRemoteAddress(addressB) + // InboundHandshake stage will complete the handshake when receiving HandshakeRsp + inboundContext.association(addressB.address).completeHandshake(addressB) downstream.expectNext("msg1") upstream.sendNext("msg2") downstream.expectNext("msg2") diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala index 6e24432483..62f1ecd768 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala @@ -10,9 +10,7 @@ import scala.concurrent.duration._ import akka.NotUsed import akka.actor.ActorIdentity -import akka.actor.ActorRef import akka.actor.ActorSystem -import akka.actor.Address import akka.actor.ExtendedActorSystem import akka.actor.Identify import akka.actor.InternalActorRef @@ -84,7 +82,7 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo Flow[Send] .map { case Send(sysEnv: SystemMessageEnvelope, _, _, _) ⇒ - InboundEnvelope(recipient, addressB.address, sysEnv, None) + InboundEnvelope(recipient, addressB.address, sysEnv, None, addressA) } .async .via(new SystemMessageAcker(inboundContext)) diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index 0db4672139..d1af1446ea 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -3,35 +3,39 @@ */ package akka.remote.artery -import akka.remote.UniqueAddress -import akka.actor.Address -import scala.concurrent.Future -import akka.remote.artery.InboundControlJunction.ControlMessageSubject -import akka.remote.RemoteActorRef -import scala.concurrent.Promise -import akka.Done -import akka.remote.artery.InboundControlJunction.ControlMessageObserver -import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.ThreadLocalRandom + +import scala.concurrent.Future +import scala.concurrent.Promise +import scala.util.Success + +import akka.Done import akka.actor.ActorRef +import akka.actor.Address +import akka.remote.RemoteActorRef +import akka.remote.UniqueAddress +import akka.remote.artery.InboundControlJunction.ControlMessageObserver +import akka.remote.artery.InboundControlJunction.ControlMessageSubject private[akka] class TestInboundContext( override val localAddress: UniqueAddress, val controlSubject: TestControlMessageSubject = new TestControlMessageSubject, - replyDropRate: Double = 0.0) extends InboundContext { + val controlProbe: Option[ActorRef] = None, + val replyDropRate: Double = 0.0) extends InboundContext { private val associations = new ConcurrentHashMap[Address, OutboundContext] - def sendControl(to: Address, message: ControlMessage) = { + override def sendControl(to: Address, message: ControlMessage) = { if (ThreadLocalRandom.current().nextDouble() >= replyDropRate) - controlSubject.sendControl(InboundEnvelope(null, to, message, None)) + association(to).sendControl(message) } - def association(remoteAddress: Address): OutboundContext = + override def association(remoteAddress: Address): OutboundContext = associations.get(remoteAddress) match { case null ⇒ - val a = new TestOutboundContext(localAddress, remoteAddress, controlSubject) + val a = createAssociation(remoteAddress) associations.putIfAbsent(remoteAddress, a) match { case null ⇒ a case existing ⇒ existing @@ -40,20 +44,38 @@ private[akka] class TestInboundContext( } protected def createAssociation(remoteAddress: Address): OutboundContext = - new TestOutboundContext(localAddress, remoteAddress, controlSubject) + new TestOutboundContext(localAddress, remoteAddress, controlSubject, controlProbe) } private[akka] class TestOutboundContext( override val localAddress: UniqueAddress, override val remoteAddress: Address, - override val controlSubject: TestControlMessageSubject) extends OutboundContext { + override val controlSubject: TestControlMessageSubject, + val controlProbe: Option[ActorRef] = None) extends OutboundContext { - private val _uniqueRemoteAddress = Promise[UniqueAddress]() - def uniqueRemoteAddress: Future[UniqueAddress] = _uniqueRemoteAddress.future - def completeRemoteAddress(a: UniqueAddress): Unit = _uniqueRemoteAddress.trySuccess(a) + // access to this is synchronized (it's a test utility) + private var _associationState = new AssociationState(1, Promise()) + + override def associationState: AssociationState = synchronized { + _associationState + } + + override def completeHandshake(peer: UniqueAddress): Unit = synchronized { + _associationState.uniqueRemoteAddressPromise.trySuccess(peer) + _associationState.uniqueRemoteAddress.value match { + case Some(Success(`peer`)) ⇒ // our value + case _ ⇒ + _associationState = new AssociationState(incarnation = _associationState.incarnation + 1, Promise.successful(peer)) + } + } + + override def sendControl(message: ControlMessage) = { + controlProbe.foreach(_ ! message) + controlSubject.sendControl(InboundEnvelope(null, remoteAddress, message, None, localAddress)) + } // FIXME we should be able to Send without a recipient ActorRef - def dummyRecipient: RemoteActorRef = null + override def dummyRecipient: RemoteActorRef = null }