From 16cf8d4ab6c666c948dd8a8b2056e5ededca5191 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 9 May 2016 07:31:41 +0200 Subject: [PATCH] first stab at handshake, #20313 * UID exchange with handshake stages * second iteration of reply side-channel, observable * InboundContext and OutboundContext to facilitate testing without real transport * collapse ArterySubsystem and Transport into ArteryTransport * incomplete HandshakeRestartReceiverSpec (origin address missing to be able to implement that part * remove embedded aeron media driver directory on shutdown --- .../src/main/scala/akka/actor/Address.scala | 12 + .../artery/AeronStreamConcistencySpec.scala | 3 + .../artery/AeronStreamMaxThroughputSpec.scala | 2 + .../artery/HandshakeRestartReceiverSpec.scala | 135 +++++++ .../akka/remote/RemoteActorRefProvider.scala | 4 +- .../scala/akka/remote/UniqueAddress.scala | 17 + .../akka/remote/artery/ArterySubsystem.scala | 133 ------- .../akka/remote/artery/ArteryTransport.scala | 353 ++++++++++++++++++ .../akka/remote/artery/Association.scala | 80 ++++ .../scala/akka/remote/artery/Handshake.scala | 182 +++++++++ .../remote/artery/MessageDispatcher.scala | 84 +++++ .../main/scala/akka/remote/artery/Reply.scala | 97 +++++ .../remote/artery/SystemMessageDelivery.scala | 169 +++------ .../scala/akka/remote/artery/Transport.scala | 281 -------------- .../artery/SystemMessageDeliverySpec.scala | 204 +++++----- .../akka/remote/artery/TestContext.scala | 79 ++++ 16 files changed, 1194 insertions(+), 641 deletions(-) create mode 100644 akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/HandshakeRestartReceiverSpec.scala create mode 100644 akka-remote/src/main/scala/akka/remote/UniqueAddress.scala delete mode 100644 akka-remote/src/main/scala/akka/remote/artery/ArterySubsystem.scala create mode 100644 akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala create mode 100644 akka-remote/src/main/scala/akka/remote/artery/Association.scala create mode 100644 akka-remote/src/main/scala/akka/remote/artery/Handshake.scala create mode 100644 akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala create mode 100644 akka-remote/src/main/scala/akka/remote/artery/Reply.scala delete mode 100644 akka-remote/src/main/scala/akka/remote/artery/Transport.scala create mode 100644 akka-remote/src/test/scala/akka/remote/artery/TestContext.scala diff --git a/akka-actor/src/main/scala/akka/actor/Address.scala b/akka-actor/src/main/scala/akka/actor/Address.scala index f5ba1fb781..ac5f90d32d 100644 --- a/akka-actor/src/main/scala/akka/actor/Address.scala +++ b/akka-actor/src/main/scala/akka/actor/Address.scala @@ -76,6 +76,18 @@ object Address { * Constructs a new Address with the specified protocol, system name, host and port */ def apply(protocol: String, system: String, host: String, port: Int) = new Address(protocol, system, Some(host), Some(port)) + + /** + * `Address` ordering type class, sorts addresses by protocol, name, host and port. + */ + implicit val addressOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒ + if (a eq b) false + else if (a.protocol != b.protocol) a.system.compareTo(b.protocol) < 0 + else if (a.system != b.system) a.system.compareTo(b.system) < 0 + else if (a.host != b.host) a.host.getOrElse("").compareTo(b.host.getOrElse("")) < 0 + else if (a.port != b.port) a.port.getOrElse(0) < b.port.getOrElse(0) + else false + } } private[akka] trait PathUtils { diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala index 28572c8418..429a3569cd 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala @@ -20,6 +20,8 @@ import com.typesafe.config.ConfigFactory import io.aeron.Aeron import io.aeron.driver.MediaDriver import akka.actor.ExtendedActorSystem +import org.agrona.IoUtil +import java.io.File object AeronStreamConsistencySpec extends MultiNodeConfig { val first = role("first") @@ -85,6 +87,7 @@ abstract class AeronStreamConsistencySpec taskRunner.stop() aeron.close() driver.close() + IoUtil.delete(new File(driver.aeronDirectoryName), true) super.afterAll() } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala index f374461700..042ed8fc90 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala @@ -22,6 +22,7 @@ import io.aeron.driver.MediaDriver import akka.stream.KillSwitches import java.io.File import io.aeron.CncFileDescriptor +import org.agrona.IoUtil object AeronStreamMaxThroughputSpec extends MultiNodeConfig { val first = role("first") @@ -127,6 +128,7 @@ abstract class AeronStreamMaxThroughputSpec taskRunner.stop() aeron.close() driver.close() + IoUtil.delete(new File(driver.aeronDirectoryName), true) runOn(second) { println(plot.csv(system.name)) } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/HandshakeRestartReceiverSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/HandshakeRestartReceiverSpec.scala new file mode 100644 index 0000000000..0ea2f76dfb --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/HandshakeRestartReceiverSpec.scala @@ -0,0 +1,135 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit.NANOSECONDS +import scala.concurrent.duration._ +import akka.actor._ +import akka.remote.RemoteActorRefProvider +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit._ +import com.typesafe.config.ConfigFactory +import java.net.InetAddress +import scala.concurrent.Await +import akka.remote.RARP +import akka.remote.AddressUidExtension + +object HandshakeRestartReceiverSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(s""" + akka { + loglevel = INFO + actor.provider = "akka.remote.RemoteActorRefProvider" + remote.artery { + enabled = on + } + } + """))) + + def aeronPort(roleName: RoleName): Int = + roleName match { + case `first` ⇒ 20531 // TODO yeah, we should have support for dynamic port assignment + case `second` ⇒ 20532 + } + + nodeConfig(first) { + ConfigFactory.parseString(s""" + akka.remote.artery.port = ${aeronPort(first)} + """) + } + + nodeConfig(second) { + ConfigFactory.parseString(s""" + akka.remote.artery.port = ${aeronPort(second)} + """) + } + + class Subject extends Actor { + def receive = { + case "shutdown" ⇒ context.system.terminate() + case "identify" ⇒ sender() ! (AddressUidExtension(context.system).addressUid -> self) + } + } + +} + +class HandshakeRestartReceiverSpecMultiJvmNode1 extends HandshakeRestartReceiverSpec +class HandshakeRestartReceiverSpecMultiJvmNode2 extends HandshakeRestartReceiverSpec + +abstract class HandshakeRestartReceiverSpec + extends MultiNodeSpec(HandshakeRestartReceiverSpec) + with STMultiNodeSpec with ImplicitSender { + + import HandshakeRestartReceiverSpec._ + + override def initialParticipants = roles.size + + override def afterAll(): Unit = { + super.afterAll() + } + + def identifyWithUid(rootPath: ActorPath, actorName: String): (Int, ActorRef) = { + system.actorSelection(rootPath / "user" / actorName) ! "identify" + expectMsgType[(Int, ActorRef)] + } + + "Artery Handshake" must { + + "detect restarted receiver and initiate new handshake" in { + runOn(second) { + system.actorOf(Props[Subject], "subject") + } + enterBarrier("subject-started") + + runOn(first) { + val secondRootPath = node(second) + val (secondUid, _) = identifyWithUid(secondRootPath, "subject") + + val secondAddress = node(second).address + val secondAssociation = RARP(system).provider.transport.asInstanceOf[ArteryTransport].association(secondAddress) + val secondUniqueRemoteAddress = Await.result(secondAssociation.uniqueRemoteAddress, 3.seconds) + secondUniqueRemoteAddress.address should ===(secondAddress) + secondUniqueRemoteAddress.uid should ===(secondUid) + + enterBarrier("before-shutdown") + testConductor.shutdown(second).await + + within(30.seconds) { + awaitAssert { + within(1.second) { + identifyWithUid(secondRootPath, "subject2") + } + } + } + val (secondUid2, subject2) = identifyWithUid(secondRootPath, "subject2") + secondUid2 should !==(secondUid) + // FIXME verify that UID in association was replaced (not implemented yet) + + subject2 ! "shutdown" + } + + runOn(second) { + val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + enterBarrier("before-shutdown") + + Await.result(system.whenTerminated, 10.seconds) + + val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s""" + akka.remote.artery.port = ${addr.port.get} + """).withFallback(system.settings.config)) + freshSystem.actorOf(Props[Subject], "subject2") + + Await.result(freshSystem.whenTerminated, 45.seconds) + } + } + + } +} diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 0d6065da83..52fe8f2ba8 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -19,7 +19,7 @@ import scala.util.control.Exception.Catcher import scala.concurrent.Future import akka.ConfigurationException import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } -import akka.remote.artery.ArterySubsystem +import akka.remote.artery.ArteryTransport /** * INTERNAL API @@ -182,7 +182,7 @@ private[akka] class RemoteActorRefProvider( d }, serialization = SerializationExtension(system), - transport = if (remoteSettings.EnableArtery) new ArterySubsystem(system, this) else new Remoting(system, this)) + transport = if (remoteSettings.EnableArtery) new ArteryTransport(system, this) else new Remoting(system, this)) _internals = internals remotingTerminator ! internals diff --git a/akka-remote/src/main/scala/akka/remote/UniqueAddress.scala b/akka-remote/src/main/scala/akka/remote/UniqueAddress.scala new file mode 100644 index 0000000000..7bb76716c5 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/UniqueAddress.scala @@ -0,0 +1,17 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote + +import akka.actor.Address + +@SerialVersionUID(1L) +final case class UniqueAddress(address: Address, uid: Int) extends Ordered[UniqueAddress] { + override def hashCode = uid + + def compare(that: UniqueAddress): Int = { + val result = Address.addressOrdering.compare(this.address, that.address) + if (result == 0) if (this.uid < that.uid) -1 else if (this.uid == that.uid) 0 else 1 + else result + } +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySubsystem.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySubsystem.scala deleted file mode 100644 index 49bbd62e4e..0000000000 --- a/akka-remote/src/main/scala/akka/remote/artery/ArterySubsystem.scala +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Copyright (C) 2009-2016 Lightbend Inc. - */ - -package akka.remote.artery - -import java.util.concurrent.ConcurrentHashMap -import akka.actor.{ ActorRef, Address, ExtendedActorSystem } -import akka.event.{ Logging, LoggingAdapter } -import akka.remote.EndpointManager.Send -import akka.remote.transport.AkkaPduProtobufCodec -import akka.remote.{ DefaultMessageDispatcher, RemoteActorRef, RemoteActorRefProvider, RemoteTransport } -import akka.stream.scaladsl.{ Sink, Source, SourceQueueWithComplete, Tcp } -import akka.stream.{ ActorMaterializer, Materializer, OverflowStrategy } -import akka.{ Done, NotUsed } -import scala.concurrent.duration._ -import scala.concurrent.{ Await, Future } -import akka.dispatch.sysmsg.SystemMessage - -/** - * INTERNAL API - */ -private[remote] class ArterySubsystem(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) { - import provider.remoteSettings - - @volatile private[this] var address: Address = _ - @volatile private[this] var transport: Transport = _ - @volatile private[this] var tcpBinding: Option[Tcp.ServerBinding] = None - @volatile private[this] var materializer: Materializer = _ - override val log: LoggingAdapter = Logging(system.eventStream, getClass.getName) - - override def defaultAddress: Address = address - override def addresses: Set[Address] = Set(address) - override def localAddressForRemote(remote: Address): Address = defaultAddress - - // FIXME: This does locking on putIfAbsent, we need something smarter - private[this] val associations = new ConcurrentHashMap[Address, Association]() - - override def start(): Unit = { - // TODO: Configure materializer properly - // TODO: Have a supervisor actor - address = Address("akka.artery", system.name, remoteSettings.ArteryHostname, remoteSettings.ArteryPort) - materializer = ActorMaterializer()(system) - - transport = - new Transport( - address, - system, - materializer, - provider, - AkkaPduProtobufCodec) - transport.start() - } - - override def shutdown(): Future[Done] = { - if (transport != null) transport.shutdown() - else Future.successful(Done) - } - - override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { - val cached = recipient.cachedAssociation - val remoteAddress = recipient.path.address - - val association = - if (cached ne null) cached - else associate(remoteAddress) - - association.send(message, senderOption, recipient) - } - - private def associate(remoteAddress: Address): Association = { - val current = associations.get(remoteAddress) - if (current ne null) current - else { - associations.computeIfAbsent(remoteAddress, new java.util.function.Function[Address, Association] { - override def apply(remoteAddress: Address): Association = { - val newAssociation = new Association(materializer, remoteAddress, transport) - newAssociation.associate() // This is a bit costly for this blocking method :( - newAssociation - } - }) - } - } - - override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit = { - ??? - } - -} - -/** - * INTERNAL API - * - * Thread-safe, mutable holder for association state. Main entry point for remote destined message to a specific - * remote address. - */ -private[akka] class Association( - val materializer: Materializer, - val remoteAddress: Address, - val transport: Transport) { - - @volatile private[this] var queue: SourceQueueWithComplete[Send] = _ - @volatile private[this] var systemMessageQueue: SourceQueueWithComplete[Send] = _ - - def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { - // TODO: lookup subchannel - // FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly - message match { - case _: SystemMessage | _: SystemMessageDelivery.SystemMessageReply ⇒ - implicit val ec = materializer.executionContext - systemMessageQueue.offer(Send(message, senderOption, recipient, None)).onFailure { - case e ⇒ - // FIXME proper error handling, and quarantining - println(s"# System message dropped, due to $e") // FIXME - } - case _ ⇒ - queue.offer(Send(message, senderOption, recipient, None)) - } - } - - def quarantine(uid: Option[Int]): Unit = () - - // Idempotent - def associate(): Unit = { - if (queue eq null) - queue = Source.queue(256, OverflowStrategy.dropBuffer) - .to(transport.outbound(remoteAddress)).run()(materializer) - if (systemMessageQueue eq null) - systemMessageQueue = Source.queue(256, OverflowStrategy.dropBuffer) - .to(transport.outboundSystemMessage(remoteAddress)).run()(materializer) - } -} - diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala new file mode 100644 index 0000000000..6b822f9521 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -0,0 +1,353 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import java.nio.ByteOrder +import java.util.concurrent.ConcurrentHashMap +import java.util.function.{ Function ⇒ JFunction } +import scala.concurrent.Future +import scala.concurrent.duration._ +import akka.Done +import akka.NotUsed +import akka.actor.ActorRef +import akka.actor.Address +import akka.actor.ExtendedActorSystem +import akka.actor.InternalActorRef +import akka.event.Logging +import akka.event.LoggingAdapter +import akka.remote.AddressUidExtension +import akka.remote.EndpointManager.Send +import akka.remote.MessageSerializer +import akka.remote.RemoteActorRef +import akka.remote.RemoteActorRefProvider +import akka.remote.RemoteTransport +import akka.remote.UniqueAddress +import akka.remote.artery.ReplyJunction.ReplySubject +import akka.remote.transport.AkkaPduCodec +import akka.remote.transport.AkkaPduProtobufCodec +import akka.serialization.Serialization +import akka.stream.ActorMaterializer +import akka.stream.KillSwitches +import akka.stream.Materializer +import akka.stream.SharedKillSwitch +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.Framing +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.util.ByteString +import akka.util.ByteStringBuilder +import io.aeron.Aeron +import io.aeron.AvailableImageHandler +import io.aeron.Image +import io.aeron.UnavailableImageHandler +import io.aeron.driver.MediaDriver +import io.aeron.exceptions.ConductorServiceTimeoutException +import org.agrona.ErrorHandler +import org.agrona.IoUtil +import java.io.File + +/** + * INTERNAL API + */ +private[akka] final case class InboundEnvelope( + recipient: InternalActorRef, + recipientAddress: Address, + message: AnyRef, + senderOption: Option[ActorRef]) + +/** + * INTERNAL API + * Inbound API that is used by the stream stages. + * Separate trait to facilitate testing without real transport. + */ +private[akka] trait InboundContext { + /** + * The local inbound address. + */ + def localAddress: UniqueAddress + + /** + * An inbound stage can send reply message to the origin + * address with this method. + */ + def sendReply(to: Address, message: ControlMessage): Unit + + /** + * Lookup the outbound association for a given address. + */ + def association(remoteAddress: Address): OutboundContext +} + +/** + * INTERNAL API + * Outbound association API that is used by the stream stages. + * Separate trait to facilitate testing without real transport. + */ +private[akka] trait OutboundContext { + /** + * The local inbound address. + */ + def localAddress: UniqueAddress + + /** + * The outbound address for this association. + */ + def remoteAddress: Address + + /** + * Full outbound address with UID for this association. + * Completed when by the handshake. + */ + def uniqueRemoteAddress: Future[UniqueAddress] + + /** + * Set the outbound address with UID when the + * handshake is completed. + */ + def completeRemoteAddress(a: UniqueAddress): Unit + + /** + * An outbound stage can listen to reply messages + * via this observer subject. + */ + def replySubject: ReplySubject + + // FIXME we should be able to Send without a recipient ActorRef + def dummyRecipient: RemoteActorRef +} + +/** + * INTERNAL API + */ +private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) + extends RemoteTransport(_system, _provider) with InboundContext { + import provider.remoteSettings + + // these vars are initialized once in the start method + @volatile private[this] var _localAddress: UniqueAddress = _ + override def localAddress: UniqueAddress = _localAddress + @volatile private[this] var materializer: Materializer = _ + @volatile private[this] var replySubject: ReplySubject = _ + @volatile private[this] var messageDispatcher: MessageDispatcher = _ + @volatile private[this] var driver: MediaDriver = _ + @volatile private[this] var aeron: Aeron = _ + + override val log: LoggingAdapter = Logging(system.eventStream, getClass.getName) + override def defaultAddress: Address = localAddress.address + override def addresses: Set[Address] = Set(defaultAddress) + override def localAddressForRemote(remote: Address): Address = defaultAddress + + private val codec: AkkaPduCodec = AkkaPduProtobufCodec + private val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch") + private val systemMessageResendInterval: FiniteDuration = 1.second // FIXME config + + // TODO support port 0 + private def inboundChannel = s"aeron:udp?endpoint=${localAddress.address.host.get}:${localAddress.address.port.get}" + private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}" + private val systemMessageStreamId = 1 + private val ordinaryStreamId = 3 + private val taskRunner = new TaskRunner(system) + + // FIXME: This does locking on putIfAbsent, we need something smarter + private[this] val associations = new ConcurrentHashMap[Address, Association]() + + override def start(): Unit = { + startMediaDriver() + startAeron() + taskRunner.start() + + // TODO: Configure materializer properly + // TODO: Have a supervisor actor + _localAddress = UniqueAddress( + Address("akka.artery", system.name, remoteSettings.ArteryHostname, remoteSettings.ArteryPort), + AddressUidExtension(system).addressUid) + materializer = ActorMaterializer()(system) + + messageDispatcher = new MessageDispatcher(system, provider) + + runInboundFlows() + } + + private def startMediaDriver(): Unit = { + // TODO also support external media driver + val driverContext = new MediaDriver.Context + // FIXME settings from config + driverContext.clientLivenessTimeoutNs(SECONDS.toNanos(10)) + driverContext.imageLivenessTimeoutNs(SECONDS.toNanos(10)) + driverContext.driverTimeoutMs(SECONDS.toNanos(10)) + driver = MediaDriver.launchEmbedded(driverContext) + } + + private def startAeron(): Unit = { + val ctx = new Aeron.Context + + ctx.availableImageHandler(new AvailableImageHandler { + override def onAvailableImage(img: Image): Unit = { + if (log.isDebugEnabled) + log.debug(s"onAvailableImage from ${img.sourceIdentity} session ${img.sessionId}") + } + }) + ctx.unavailableImageHandler(new UnavailableImageHandler { + override def onUnavailableImage(img: Image): Unit = { + if (log.isDebugEnabled) + log.debug(s"onUnavailableImage from ${img.sourceIdentity} session ${img.sessionId}") + // FIXME we should call FragmentAssembler.freeSessionBuffer when image is unavailable + } + }) + ctx.errorHandler(new ErrorHandler { + override def onError(cause: Throwable): Unit = { + cause match { + case e: ConductorServiceTimeoutException ⇒ + // Timeout between service calls + log.error(cause, s"Aeron ServiceTimeoutException, ${cause.getMessage}") + + case _ ⇒ + log.error(cause, s"Aeron error, ${cause.getMessage}") + } + } + }) + + ctx.aeronDirectoryName(driver.aeronDirectoryName) + aeron = Aeron.connect(ctx) + } + + private def runInboundFlows(): Unit = { + replySubject = Source.fromGraph(new AeronSource(inboundChannel, systemMessageStreamId, aeron, taskRunner)) + .async // FIXME measure + .map(ByteString.apply) // TODO we should use ByteString all the way + .viaMat(inboundSystemMessageFlow)(Keep.right) + .to(Sink.ignore) + .run()(materializer) + + Source.fromGraph(new AeronSource(inboundChannel, ordinaryStreamId, aeron, taskRunner)) + .async // FIXME measure + .map(ByteString.apply) // TODO we should use ByteString all the way + .via(inboundFlow) + .runWith(Sink.ignore)(materializer) + } + + override def shutdown(): Future[Done] = { + killSwitch.shutdown() + if (taskRunner != null) taskRunner.stop() + if (aeron != null) aeron.close() + if (driver != null) { + driver.close() + // FIXME only delete files for embedded media driver, and it should also be configurable + IoUtil.delete(new File(driver.aeronDirectoryName), true) + } + Future.successful(Done) + } + + // InboundContext + override def sendReply(to: Address, message: ControlMessage) = { + send(message, None, association(to).dummyRecipient) + } + + override def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { + val cached = recipient.cachedAssociation + val remoteAddress = recipient.path.address + + val a = + if (cached ne null) cached + else association(remoteAddress) + + a.send(message, senderOption, recipient) + } + + override def association(remoteAddress: Address): Association = { + val current = associations.get(remoteAddress) + if (current ne null) current + else { + associations.computeIfAbsent(remoteAddress, new JFunction[Address, Association] { + override def apply(remoteAddress: Address): Association = { + val newAssociation = new Association(ArteryTransport.this, materializer, remoteAddress, replySubject) + newAssociation.associate() // This is a bit costly for this blocking method :( + newAssociation + } + }) + } + } + + override def quarantine(remoteAddress: Address, uid: Option[Int]): Unit = { + ??? + } + + def outbound(outboundContext: OutboundContext): Sink[Send, Any] = { + Flow.fromGraph(killSwitch.flow[Send]) + .via(new OutboundHandshake(outboundContext)) + .via(encoder) + .map(_.toArray) // TODO we should use ByteString all the way + .to(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner)) + } + + def outboundSystemMessage(outboundContext: OutboundContext): Sink[Send, Any] = { + Flow.fromGraph(killSwitch.flow[Send]) + .via(new OutboundHandshake(outboundContext)) + .via(new SystemMessageDelivery(outboundContext, systemMessageResendInterval)) + .via(encoder) + .map(_.toArray) // TODO we should use ByteString all the way + .to(new AeronSink(outboundChannel(outboundContext.remoteAddress), systemMessageStreamId, aeron, taskRunner)) + } + + // TODO: Try out parallelized serialization (mapAsync) for performance + val encoder: Flow[Send, ByteString, NotUsed] = Flow[Send].map { sendEnvelope ⇒ + val pdu: ByteString = codec.constructMessage( + sendEnvelope.recipient.localAddressToUse, + sendEnvelope.recipient, + Serialization.currentTransportInformation.withValue(Serialization.Information(localAddress.address, system)) { + MessageSerializer.serialize(system, sendEnvelope.message.asInstanceOf[AnyRef]) + }, + sendEnvelope.senderOption, + seqOption = None, // FIXME: Acknowledgements will be handled differently I just reused the old codec + ackOption = None) + + // TODO: Drop unserializable messages + // TODO: Drop oversized messages + (new ByteStringBuilder).putInt(pdu.size)(ByteOrder.LITTLE_ENDIAN).result() ++ pdu + } + + val decoder: Flow[ByteString, AkkaPduCodec.Message, NotUsed] = + Framing.lengthField(4, maximumFrameLength = 256000) + .map { frame ⇒ + // TODO: Drop unserializable messages + val pdu = codec.decodeMessage(frame.drop(4), provider, localAddress.address)._2.get + pdu + } + + val messageDispatcherSink: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m ⇒ + messageDispatcher.dispatch(m.recipient, m.recipientAddress, m.message, m.senderOption) + } + + val deserializer: Flow[AkkaPduCodec.Message, InboundEnvelope, NotUsed] = + Flow[AkkaPduCodec.Message].map { m ⇒ + InboundEnvelope( + m.recipient, + m.recipientAddress, + MessageSerializer.deserialize(system, m.serializedMessage), + m.senderOption) + } + + val inboundFlow: Flow[ByteString, ByteString, NotUsed] = { + Flow.fromSinkAndSource( + decoder + .via(deserializer) + .via(new InboundHandshake(this)) + .to(messageDispatcherSink), + Source.maybe[ByteString].via(killSwitch.flow)) + } + + val inboundSystemMessageFlow: Flow[ByteString, ByteString, ReplySubject] = { + Flow.fromSinkAndSourceMat( + decoder + .via(deserializer) + .via(new InboundHandshake(this)) + .via(new SystemMessageAcker(this)) + .viaMat(new ReplyJunction)(Keep.right) + .to(messageDispatcherSink), + Source.maybe[ByteString].via(killSwitch.flow))((a, b) ⇒ a) + } + +} + diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala new file mode 100644 index 0000000000..1e6b8df019 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import scala.concurrent.Future +import scala.concurrent.Promise + +import akka.actor.ActorRef +import akka.actor.Address + +import akka.actor.RootActorPath +import akka.dispatch.sysmsg.SystemMessage +import akka.remote.EndpointManager.Send +import akka.remote.RemoteActorRef +import akka.remote.UniqueAddress +import akka.remote.artery.ReplyJunction.ReplySubject +import akka.stream.Materializer +import akka.stream.OverflowStrategy +import akka.stream.scaladsl.Source +import akka.stream.scaladsl.SourceQueueWithComplete + +/** + * INTERNAL API + * + * Thread-safe, mutable holder for association state. Main entry point for remote destined message to a specific + * remote address. + */ +private[akka] class Association( + val transport: ArteryTransport, + val materializer: Materializer, + override val remoteAddress: Address, + override val replySubject: ReplySubject) extends OutboundContext { + + @volatile private[this] var queue: SourceQueueWithComplete[Send] = _ + @volatile private[this] var systemMessageQueue: SourceQueueWithComplete[Send] = _ + + override def localAddress: UniqueAddress = transport.localAddress + + // FIXME we also need to be able to switch to new uid + private val _uniqueRemoteAddress = Promise[UniqueAddress]() + override def uniqueRemoteAddress: Future[UniqueAddress] = _uniqueRemoteAddress.future + override def completeRemoteAddress(a: UniqueAddress): Unit = { + require(a.address == remoteAddress, s"Wrong UniqueAddress got [$a.address], expected [$remoteAddress]") + _uniqueRemoteAddress.trySuccess(a) + } + + def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit = { + // TODO: lookup subchannel + // FIXME: Use a different envelope than the old Send, but make sure the new is handled by deadLetters properly + message match { + case _: SystemMessage | _: Reply ⇒ + implicit val ec = materializer.executionContext + systemMessageQueue.offer(Send(message, senderOption, recipient, None)).onFailure { + case e ⇒ + // FIXME proper error handling, and quarantining + println(s"# System message dropped, due to $e") // FIXME + } + case _ ⇒ + queue.offer(Send(message, senderOption, recipient, None)) + } + } + + // FIXME we should be able to Send without a recipient ActorRef + override val dummyRecipient: RemoteActorRef = + transport.provider.resolveActorRef(RootActorPath(remoteAddress) / "system" / "dummy").asInstanceOf[RemoteActorRef] + + def quarantine(uid: Option[Int]): Unit = () + + // Idempotent + def associate(): Unit = { + // FIXME detect and handle stream failure, e.g. handshake timeout + if (queue eq null) + queue = Source.queue(256, OverflowStrategy.dropBuffer) + .to(transport.outbound(this)).run()(materializer) + if (systemMessageQueue eq null) + systemMessageQueue = Source.queue(256, OverflowStrategy.dropBuffer) + .to(transport.outboundSystemMessage(this)).run()(materializer) + } +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala new file mode 100644 index 0000000000..6590f82967 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala @@ -0,0 +1,182 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import java.util.concurrent.TimeoutException +import scala.concurrent.duration._ +import akka.Done +import akka.remote.EndpointManager.Send +import akka.remote.UniqueAddress +import akka.remote.artery.ReplyJunction.ReplyObserver +import akka.stream.Attributes +import akka.stream.FlowShape +import akka.stream.Inlet +import akka.stream.Outlet +import akka.stream.stage.GraphStage +import akka.stream.stage.GraphStageLogic +import akka.stream.stage.InHandler +import akka.stream.stage.OutHandler +import akka.stream.stage.TimerGraphStageLogic + +/** + * INTERNAL API + */ +private[akka] object OutboundHandshake { + // FIXME serialization for these messages + final case class HandshakeReq(from: UniqueAddress) extends ControlMessage + final case class HandshakeRsp(from: UniqueAddress) extends Reply + + private sealed trait HandshakeState + private case object Start extends HandshakeState + private case object ReplyObserverAttached extends HandshakeState + private case object ReqInProgress extends HandshakeState + private case object Completed extends HandshakeState + + private case object HandshakeTimeout + +} + +/** + * INTERNAL API + */ +private[akka] class OutboundHandshake(outboundContext: OutboundContext) extends GraphStage[FlowShape[Send, Send]] { + val in: Inlet[Send] = Inlet("OutboundHandshake.in") + val out: Outlet[Send] = Outlet("OutboundHandshake.out") + override val shape: FlowShape[Send, Send] = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new TimerGraphStageLogic(shape) with InHandler with OutHandler with ReplyObserver { + import OutboundHandshake._ + + private val timeout: FiniteDuration = 10.seconds // FIXME config + private var handshakeState: HandshakeState = Start + + private def remoteAddress = outboundContext.remoteAddress + + override def preStart(): Unit = { + if (outboundContext.uniqueRemoteAddress.isCompleted) { + handshakeState = Completed + } else { + implicit val ec = materializer.executionContext + outboundContext.replySubject.attach(this).foreach { + getAsyncCallback[Done] { _ ⇒ + if (handshakeState != Completed) { + if (isAvailable(out)) + pushHandshakeReq() + else + handshakeState = ReplyObserverAttached + } + }.invoke + } + + outboundContext.uniqueRemoteAddress.foreach { + getAsyncCallback[UniqueAddress] { a ⇒ + if (handshakeState != Completed) { + handshakeCompleted() + if (isAvailable(out) && !hasBeenPulled(in)) + pull(in) + } + }.invoke + } + + scheduleOnce(HandshakeTimeout, timeout) + } + } + + override def postStop(): Unit = { + outboundContext.replySubject.detach(this) + } + + // InHandler + override def onPush(): Unit = { + if (handshakeState != Completed) + throw new IllegalStateException(s"onPush before handshake completed, was [$handshakeState]") + push(out, grab(in)) + } + + // OutHandler + override def onPull(): Unit = { + handshakeState match { + case Completed ⇒ pull(in) + case ReplyObserverAttached ⇒ + pushHandshakeReq() + case Start ⇒ // will push HandshakeReq when ReplyObserver is attached + case ReqInProgress ⇒ // will pull when handshake reply is received + } + } + + private def pushHandshakeReq(): Unit = { + handshakeState = ReqInProgress + // FIXME we should be able to Send without recipient ActorRef + push(out, Send(HandshakeReq(outboundContext.localAddress), None, outboundContext.dummyRecipient, None)) + } + + private def handshakeCompleted(): Unit = { + handshakeState = Completed + cancelTimer(HandshakeTimeout) + outboundContext.replySubject.detach(this) + } + + override protected def onTimer(timerKey: Any): Unit = + timerKey match { + case HandshakeTimeout ⇒ + failStage(new TimeoutException( + s"Handshake with [$remoteAddress] did not complete within ${timeout.toMillis} ms")) + } + + // ReplyObserver, external call + override def reply(inboundEnvelope: InboundEnvelope): Unit = { + inboundEnvelope.message match { + case rsp: HandshakeRsp ⇒ + if (rsp.from.address == remoteAddress) { + getAsyncCallback[HandshakeRsp] { reply ⇒ + if (handshakeState != Completed) { + handshakeCompleted() + outboundContext.completeRemoteAddress(reply.from) + if (isAvailable(out) && !hasBeenPulled(in)) + pull(in) + } + }.invoke(rsp) + } + case _ ⇒ // not interested + } + } + + setHandlers(in, out, this) + } + +} + +/** + * INTERNAL API + */ +private[akka] class InboundHandshake(inboundContext: InboundContext) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] { + val in: Inlet[InboundEnvelope] = Inlet("InboundHandshake.in") + val out: Outlet[InboundEnvelope] = Outlet("InboundHandshake.out") + override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new TimerGraphStageLogic(shape) with InHandler with OutHandler { + import OutboundHandshake._ + + // InHandler + override def onPush(): Unit = { + grab(in) match { + case InboundEnvelope(_, _, HandshakeReq(from), _) ⇒ + inboundContext.association(from.address).completeRemoteAddress(from) + inboundContext.sendReply(from.address, HandshakeRsp(inboundContext.localAddress)) + pull(in) + case other ⇒ + push(out, other) + } + } + + // OutHandler + override def onPull(): Unit = pull(in) + + setHandlers(in, out, this) + + } + +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala b/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala new file mode 100644 index 0000000000..3286729521 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/MessageDispatcher.scala @@ -0,0 +1,84 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import akka.actor.ActorRef +import akka.actor.ActorSelection +import akka.actor.ActorSelectionMessage +import akka.actor.Address +import akka.actor.ExtendedActorSystem +import akka.actor.InternalActorRef +import akka.actor.LocalRef +import akka.actor.PossiblyHarmful +import akka.actor.RepointableRef +import akka.dispatch.sysmsg.SystemMessage +import akka.event.Logging +import akka.remote.RemoteActorRefProvider +import akka.remote.RemoteRef + +/** + * INTERNAL API + */ +private[akka] class MessageDispatcher( + system: ExtendedActorSystem, + provider: RemoteActorRefProvider) { + + private val remoteDaemon = provider.remoteDaemon + private val log = Logging(system.eventStream, getClass.getName) + + def dispatch(recipient: InternalActorRef, + recipientAddress: Address, + message: AnyRef, + senderOption: Option[ActorRef]): Unit = { + + import provider.remoteSettings._ + + val sender: ActorRef = senderOption.getOrElse(system.deadLetters) + val originalReceiver = recipient.path + + def msgLog = s"RemoteMessage: [$message] to [$recipient]<+[$originalReceiver] from [$sender()]" + + recipient match { + + case `remoteDaemon` ⇒ + if (UntrustedMode) log.debug("dropping daemon message in untrusted mode") + else { + if (LogReceive) log.debug("received daemon message {}", msgLog) + remoteDaemon ! message + } + + case l @ (_: LocalRef | _: RepointableRef) if l.isLocal ⇒ + if (LogReceive) log.debug("received local message {}", msgLog) + message match { + case sel: ActorSelectionMessage ⇒ + if (UntrustedMode && (!TrustedSelectionPaths.contains(sel.elements.mkString("/", "/", "")) || + sel.msg.isInstanceOf[PossiblyHarmful] || l != provider.rootGuardian)) + log.debug("operating in UntrustedMode, dropping inbound actor selection to [{}], " + + "allow it by adding the path to 'akka.remote.trusted-selection-paths' configuration", + sel.elements.mkString("/", "/", "")) + else + // run the receive logic for ActorSelectionMessage here to make sure it is not stuck on busy user actor + ActorSelection.deliverSelection(l, sender, sel) + case msg: PossiblyHarmful if UntrustedMode ⇒ + log.debug("operating in UntrustedMode, dropping inbound PossiblyHarmful message of type [{}]", msg.getClass.getName) + case msg: SystemMessage ⇒ l.sendSystemMessage(msg) + case msg ⇒ l.!(msg)(sender) + } + + case r @ (_: RemoteRef | _: RepointableRef) if !r.isLocal && !UntrustedMode ⇒ + if (LogReceive) log.debug("received remote-destined message {}", msgLog) + if (provider.transport.addresses(recipientAddress)) + // if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed) + r.!(message)(sender) + else + log.error("dropping message [{}] for non-local recipient [{}] arriving at [{}] inbound addresses are [{}]", + message.getClass, r, recipientAddress, provider.transport.addresses.mkString(", ")) + + case r ⇒ log.error("dropping message [{}] for unknown recipient [{}] arriving at [{}] inbound addresses are [{}]", + message.getClass, r, recipientAddress, provider.transport.addresses.mkString(", ")) + + } + } + +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/Reply.scala b/akka-remote/src/main/scala/akka/remote/artery/Reply.scala new file mode 100644 index 0000000000..676425894e --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/Reply.scala @@ -0,0 +1,97 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import scala.concurrent.Future +import scala.concurrent.Promise +import akka.Done +import akka.stream.Attributes +import akka.stream.FlowShape +import akka.stream.Inlet +import akka.stream.Outlet +import akka.stream.stage.GraphStageLogic +import akka.stream.stage.GraphStageWithMaterializedValue +import akka.stream.stage.InHandler +import akka.stream.stage.OutHandler + +/** + * Marker trait for reply messages + */ +trait Reply extends ControlMessage + +/** + * Marker trait for control messages that can be sent via the system message sub-channel + * but don't need full reliable delivery. E.g. `HandshakeReq` and `Reply`. + */ +trait ControlMessage + +/** + * INTERNAL API + */ +private[akka] object ReplyJunction { + + private[akka] trait ReplySubject { + def attach(observer: ReplyObserver): Future[Done] + def detach(observer: ReplyObserver): Unit + def stopped: Future[Done] + } + + private[akka] trait ReplyObserver { + def reply(inboundEnvelope: InboundEnvelope): Unit + } +} + +/** + * INTERNAL API + */ +private[akka] class ReplyJunction + extends GraphStageWithMaterializedValue[FlowShape[InboundEnvelope, InboundEnvelope], ReplyJunction.ReplySubject] { + import ReplyJunction._ + + val in: Inlet[InboundEnvelope] = Inlet("ReplyJunction.in") + val out: Outlet[InboundEnvelope] = Outlet("ReplyJunction.out") + override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { + val logic = new GraphStageLogic(shape) with InHandler with OutHandler with ReplySubject { + + private var replyObservers: Vector[ReplyObserver] = Vector.empty + private val stoppedPromise = Promise[Done]() + + override def postStop(): Unit = stoppedPromise.success(Done) + + // InHandler + override def onPush(): Unit = { + grab(in) match { + case env @ InboundEnvelope(_, _, reply: Reply, _) ⇒ + replyObservers.foreach(_.reply(env)) + pull(in) + case env ⇒ + push(out, env) + } + } + + // OutHandler + override def onPull(): Unit = pull(in) + + override def attach(observer: ReplyObserver): Future[Done] = { + val p = Promise[Done]() + getAsyncCallback[Unit](_ ⇒ { + replyObservers :+= observer + p.success(Done) + }).invoke(()) + p.future + } + + override def detach(observer: ReplyObserver): Unit = { + replyObservers = replyObservers.filterNot(_ == observer) + } + + override def stopped: Future[Done] = stoppedPromise.future + + setHandlers(in, out, this) + } + (logic, logic) + } +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index c8e3036e10..271f4a629a 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -6,26 +6,21 @@ package akka.remote.artery import java.util.ArrayDeque import scala.annotation.tailrec -import scala.concurrent.Future -import scala.concurrent.Promise import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success import scala.util.Try import akka.Done -import akka.actor.ActorRef -import akka.actor.Address import akka.remote.EndpointManager.Send -import akka.remote.artery.Transport.InboundEnvelope +import akka.remote.UniqueAddress +import akka.remote.artery.ReplyJunction.ReplyObserver import akka.stream.Attributes import akka.stream.FlowShape import akka.stream.Inlet import akka.stream.Outlet -import akka.stream.stage.AsyncCallback import akka.stream.stage.GraphStage import akka.stream.stage.GraphStageLogic -import akka.stream.stage.GraphStageWithMaterializedValue import akka.stream.stage.InHandler import akka.stream.stage.OutHandler import akka.stream.stage.TimerGraphStageLogic @@ -35,10 +30,10 @@ import akka.stream.stage.TimerGraphStageLogic */ private[akka] object SystemMessageDelivery { // FIXME serialization of these messages - final case class SystemMessageEnvelope(message: AnyRef, seqNo: Long, ackReplyTo: ActorRef) - sealed trait SystemMessageReply - final case class Ack(seq: Long, from: Address) extends SystemMessageReply - final case class Nack(seq: Long, from: Address) extends SystemMessageReply + // FIXME ackReplyTo should not be needed + final case class SystemMessageEnvelope(message: AnyRef, seqNo: Long, ackReplyTo: UniqueAddress) + final case class Ack(seqNo: Long, from: UniqueAddress) extends Reply + final case class Nack(seqNo: Long, from: UniqueAddress) extends Reply private case object ResendTick } @@ -47,49 +42,42 @@ private[akka] object SystemMessageDelivery { * INTERNAL API */ private[akka] class SystemMessageDelivery( - replyJunction: SystemMessageReplyJunction.Junction, - resendInterval: FiniteDuration, - localAddress: Address, - remoteAddress: Address, - ackRecipient: ActorRef) + outboundContext: OutboundContext, + resendInterval: FiniteDuration) extends GraphStage[FlowShape[Send, Send]] { import SystemMessageDelivery._ - import SystemMessageReplyJunction._ val in: Inlet[Send] = Inlet("SystemMessageDelivery.in") val out: Outlet[Send] = Outlet("SystemMessageDelivery.out") override val shape: FlowShape[Send, Send] = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new TimerGraphStageLogic(shape) with InHandler with OutHandler { + new TimerGraphStageLogic(shape) with InHandler with OutHandler with ReplyObserver { - var registered = false - var seqNo = 0L // sequence number for the first message will be 1 - val unacknowledged = new ArrayDeque[Send] - var resending = new ArrayDeque[Send] - var resendingFromSeqNo = -1L - var stopping = false + private var replyObserverAttached = false + private var seqNo = 0L // sequence number for the first message will be 1 + private val unacknowledged = new ArrayDeque[Send] + private var resending = new ArrayDeque[Send] + private var resendingFromSeqNo = -1L + private var stopping = false + + private def localAddress = outboundContext.localAddress + private def remoteAddress = outboundContext.remoteAddress override def preStart(): Unit = { this.schedulePeriodically(ResendTick, resendInterval) - def filter(env: InboundEnvelope): Boolean = - env.message match { - case Ack(_, from) if from == remoteAddress ⇒ true - case Nack(_, from) if from == remoteAddress ⇒ true - case _ ⇒ false - } implicit val ec = materializer.executionContext - replyJunction.addReplyInterest(filter, ackCallback).foreach { + outboundContext.replySubject.attach(this).foreach { getAsyncCallback[Done] { _ ⇒ - registered = true + replyObserverAttached = true if (isAvailable(out)) pull(in) // onPull from downstream already called }.invoke } - replyJunction.stopped.onComplete { + outboundContext.replySubject.stopped.onComplete { getAsyncCallback[Try[Done]] { // FIXME quarantine case Success(_) ⇒ completeStage() @@ -99,7 +87,7 @@ private[akka] class SystemMessageDelivery( } override def postStop(): Unit = { - replyJunction.removeReplyInterest(ackCallback) + outboundContext.replySubject.detach(this) } override def onUpstreamFinish(): Unit = { @@ -118,18 +106,26 @@ private[akka] class SystemMessageDelivery( } } - val ackCallback = getAsyncCallback[SystemMessageReply] { reply ⇒ - reply match { - case Ack(n, _) ⇒ - ack(n) - case Nack(n, _) ⇒ - ack(n) - if (n > resendingFromSeqNo) - resending = unacknowledged.clone() - tryResend() + // ReplyObserver, external call + override def reply(inboundEnvelope: InboundEnvelope): Unit = { + inboundEnvelope.message match { + case ack: Ack ⇒ if (ack.from.address == remoteAddress) ackCallback.invoke(ack) + case nack: Nack ⇒ if (nack.from.address == remoteAddress) nackCallback.invoke(nack) + case _ ⇒ // not interested } } + private val ackCallback = getAsyncCallback[Ack] { reply ⇒ + ack(reply.seqNo) + } + + private val nackCallback = getAsyncCallback[Nack] { reply ⇒ + ack(reply.seqNo) + if (reply.seqNo > resendingFromSeqNo) + resending = unacknowledged.clone() + tryResend() + } + private def ack(n: Long): Unit = { if (n > seqNo) throw new IllegalArgumentException(s"Unexpected ack $n, when highest sent seqNo is $seqNo") @@ -155,7 +151,7 @@ private[akka] class SystemMessageDelivery( // InHandler override def onPush(): Unit = { grab(in) match { - case s @ Send(reply: SystemMessageReply, _, _, _) ⇒ + case s @ Send(reply: ControlMessage, _, _, _) ⇒ // pass through if (isAvailable(out)) push(out, s) @@ -166,7 +162,7 @@ private[akka] class SystemMessageDelivery( case s @ Send(msg: AnyRef, _, _, _) ⇒ seqNo += 1 - val sendMsg = s.copy(message = SystemMessageEnvelope(msg, seqNo, ackRecipient)) + val sendMsg = s.copy(message = SystemMessageEnvelope(msg, seqNo, localAddress)) // FIXME quarantine if unacknowledged is full unacknowledged.offer(sendMsg) if (resending.isEmpty && isAvailable(out)) @@ -180,7 +176,7 @@ private[akka] class SystemMessageDelivery( // OutHandler override def onPull(): Unit = { - if (registered) { // otherwise it will be pulled after replyJunction.addReplyInterest + if (replyObserverAttached) { // otherwise it will be pulled after attached if (resending.isEmpty && !hasBeenPulled(in) && !stopping) pull(in) else @@ -195,7 +191,7 @@ private[akka] class SystemMessageDelivery( /** * INTERNAL API */ -private[akka] class SystemMessageAcker(localAddress: Address) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] { +private[akka] class SystemMessageAcker(inboundContext: InboundContext) extends GraphStage[FlowShape[InboundEnvelope, InboundEnvelope]] { import SystemMessageDelivery._ val in: Inlet[InboundEnvelope] = Inlet("SystemMessageAcker.in") @@ -207,20 +203,22 @@ private[akka] class SystemMessageAcker(localAddress: Address) extends GraphStage var seqNo = 1L + def localAddress = inboundContext.localAddress + // InHandler override def onPush(): Unit = { grab(in) match { case env @ InboundEnvelope(_, _, sysEnv @ SystemMessageEnvelope(_, n, ackReplyTo), _) ⇒ if (n == seqNo) { - ackReplyTo.tell(Ack(n, localAddress), ActorRef.noSender) + inboundContext.sendReply(ackReplyTo.address, Ack(n, localAddress)) seqNo += 1 val unwrapped = env.copy(message = sysEnv.message) push(out, unwrapped) } else if (n < seqNo) { - ackReplyTo.tell(Ack(n, localAddress), ActorRef.noSender) + inboundContext.sendReply(ackReplyTo.address, Ack(n, localAddress)) pull(in) } else { - ackReplyTo.tell(Nack(seqNo - 1, localAddress), ActorRef.noSender) + inboundContext.sendReply(ackReplyTo.address, Nack(seqNo - 1, localAddress)) pull(in) } case env ⇒ @@ -237,74 +235,3 @@ private[akka] class SystemMessageAcker(localAddress: Address) extends GraphStage } } -/** - * INTERNAL API - */ -private[akka] object SystemMessageReplyJunction { - import SystemMessageDelivery._ - - trait Junction { - def addReplyInterest(filter: InboundEnvelope ⇒ Boolean, replyCallback: AsyncCallback[SystemMessageReply]): Future[Done] - def removeReplyInterest(callback: AsyncCallback[SystemMessageReply]): Unit - def stopped: Future[Done] - } -} - -/** - * INTERNAL API - */ -private[akka] class SystemMessageReplyJunction - extends GraphStageWithMaterializedValue[FlowShape[InboundEnvelope, InboundEnvelope], SystemMessageReplyJunction.Junction] { - import SystemMessageReplyJunction._ - import SystemMessageDelivery._ - - val in: Inlet[InboundEnvelope] = Inlet("SystemMessageReplyJunction.in") - val out: Outlet[InboundEnvelope] = Outlet("SystemMessageReplyJunction.out") - override val shape: FlowShape[InboundEnvelope, InboundEnvelope] = FlowShape(in, out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { - val logic = new GraphStageLogic(shape) with InHandler with OutHandler with Junction { - - private var replyHandlers: Vector[(InboundEnvelope ⇒ Boolean, AsyncCallback[SystemMessageReply])] = Vector.empty - private val stoppedPromise = Promise[Done]() - - override def postStop(): Unit = stoppedPromise.success(Done) - - // InHandler - override def onPush(): Unit = { - grab(in) match { - case env @ InboundEnvelope(_, _, reply: SystemMessageReply, _) ⇒ - replyHandlers.foreach { - case (f, callback) ⇒ - if (f(env)) - callback.invoke(reply) - } - pull(in) - case env ⇒ - push(out, env) - } - } - - // OutHandler - override def onPull(): Unit = pull(in) - - override def addReplyInterest(filter: InboundEnvelope ⇒ Boolean, replyCallback: AsyncCallback[SystemMessageReply]): Future[Done] = { - val p = Promise[Done]() - getAsyncCallback[Unit](_ ⇒ { - replyHandlers :+= (filter -> replyCallback) - p.success(Done) - }).invoke(()) - p.future - } - - override def removeReplyInterest(callback: AsyncCallback[SystemMessageReply]): Unit = { - replyHandlers = replyHandlers.filterNot { case (_, c) ⇒ c == callback } - } - - override def stopped: Future[Done] = stoppedPromise.future - - setHandlers(in, out, this) - } - (logic, logic) - } -} diff --git a/akka-remote/src/main/scala/akka/remote/artery/Transport.scala b/akka-remote/src/main/scala/akka/remote/artery/Transport.scala deleted file mode 100644 index fb9563def7..0000000000 --- a/akka-remote/src/main/scala/akka/remote/artery/Transport.scala +++ /dev/null @@ -1,281 +0,0 @@ -/** - * Copyright (C) 2009-2016 Lightbend Inc. - */ - -package akka.remote.artery - -import scala.concurrent.duration._ -import akka.actor.Props -import scala.concurrent.duration._ -import java.net.InetSocketAddress -import java.nio.ByteOrder -import akka.NotUsed -import akka.actor.{ Address, ExtendedActorSystem } -import akka.remote.EndpointManager.Send -import akka.remote.{ InboundMessageDispatcher, MessageSerializer, RemoteActorRefProvider } -import akka.remote.transport.AkkaPduCodec -import akka.serialization.Serialization -import akka.stream.{ KillSwitches, SharedKillSwitch } -import akka.stream.scaladsl.{ Flow, Framing, Sink, Source, Tcp } -import akka.util.{ ByteString, ByteStringBuilder } -import scala.concurrent.Future -import akka.Done -import akka.stream.Materializer -import scala.concurrent.Await -import akka.event.LoggingAdapter -import akka.event.Logging -import io.aeron.driver.MediaDriver -import io.aeron.Aeron -import org.agrona.ErrorHandler -import io.aeron.AvailableImageHandler -import io.aeron.Image -import io.aeron.UnavailableImageHandler -import io.aeron.exceptions.ConductorServiceTimeoutException -import akka.actor.LocalRef -import akka.actor.InternalActorRef -import akka.dispatch.sysmsg.SystemMessage -import akka.actor.PossiblyHarmful -import akka.actor.RepointableRef -import akka.actor.ActorSelectionMessage -import akka.remote.RemoteRef -import akka.actor.ActorSelection -import akka.actor.ActorRef -import akka.stream.scaladsl.Keep - -/** - * INTERNAL API - */ -private[akka] object Transport { - // FIXME avoid allocating this envelope? - final case class InboundEnvelope( - recipient: InternalActorRef, - recipientAddress: Address, - message: AnyRef, - senderOption: Option[ActorRef]) -} - -/** - * INTERNAL API - */ -// FIXME: Replace the codec with a custom made, hi-perf one -private[akka] class Transport( - val localAddress: Address, - val system: ExtendedActorSystem, - val materializer: Materializer, - val provider: RemoteActorRefProvider, - val codec: AkkaPduCodec) { - import Transport._ - - private val log: LoggingAdapter = Logging(system.eventStream, getClass.getName) - private val remoteDaemon = provider.remoteDaemon - - private implicit val mat = materializer - // TODO support port 0 - private val inboundChannel = s"aeron:udp?endpoint=${localAddress.host.get}:${localAddress.port.get}" - private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}" - private val systemMessageStreamId = 1 - private val ordinaryStreamId = 3 - - private val systemMessageResendInterval: FiniteDuration = 1.second // FIXME config - - private var systemMessageReplyJunction: SystemMessageReplyJunction.Junction = _ - - // Need an ActorRef that is passed in the `SystemMessageEnvelope.ackReplyTo`. - // Those messages are not actually handled by this actor, but intercepted by the - // SystemMessageReplyJunction stage. - private val systemMessageReplyRecepient = system.systemActorOf(Props.empty, "systemMessageReplyTo") - - private val driver = { - // TODO also support external media driver - val driverContext = new MediaDriver.Context - // FIXME settings from config - driverContext.clientLivenessTimeoutNs(SECONDS.toNanos(10)) - driverContext.imageLivenessTimeoutNs(SECONDS.toNanos(10)) - driverContext.driverTimeoutMs(SECONDS.toNanos(10)) - MediaDriver.launchEmbedded(driverContext) - } - - private val aeron = { - val ctx = new Aeron.Context - - ctx.availableImageHandler(new AvailableImageHandler { - override def onAvailableImage(img: Image): Unit = { - if (log.isDebugEnabled) - log.debug(s"onAvailableImage from ${img.sourceIdentity} session ${img.sessionId}") - } - }) - ctx.unavailableImageHandler(new UnavailableImageHandler { - override def onUnavailableImage(img: Image): Unit = { - if (log.isDebugEnabled) - log.debug(s"onUnavailableImage from ${img.sourceIdentity} session ${img.sessionId}") - // FIXME we should call FragmentAssembler.freeSessionBuffer when image is unavailable - } - }) - ctx.errorHandler(new ErrorHandler { - override def onError(cause: Throwable): Unit = { - cause match { - case e: ConductorServiceTimeoutException ⇒ - // Timeout between service calls - log.error(cause, s"Aeron ServiceTimeoutException, ${cause.getMessage}") - - case _ ⇒ - log.error(cause, s"Aeron error, ${cause.getMessage}") - } - } - }) - - ctx.aeronDirectoryName(driver.aeronDirectoryName) - Aeron.connect(ctx) - } - - private val taskRunner = new TaskRunner(system) - - def start(): Unit = { - taskRunner.start() - systemMessageReplyJunction = Source.fromGraph(new AeronSource(inboundChannel, systemMessageStreamId, aeron, taskRunner)) - .async // FIXME use dedicated dispatcher for AeronSource - .map(ByteString.apply) // TODO we should use ByteString all the way - .viaMat(inboundSystemMessageFlow)(Keep.right) - .to(Sink.ignore) - .run() - Source.fromGraph(new AeronSource(inboundChannel, ordinaryStreamId, aeron, taskRunner)) - .async // FIXME use dedicated dispatcher for AeronSource - .map(ByteString.apply) // TODO we should use ByteString all the way - .via(inboundFlow) - .runWith(Sink.ignore) - } - - def shutdown(): Future[Done] = { - // FIXME stop the AeronSource first? - taskRunner.stop() - aeron.close() - driver.close() - Future.successful(Done) - } - - val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch") - - def outbound(remoteAddress: Address): Sink[Send, Any] = { - Flow.fromGraph(killSwitch.flow[Send]) - .via(encoder) - .map(_.toArray) // TODO we should use ByteString all the way - .to(new AeronSink(outboundChannel(remoteAddress), ordinaryStreamId, aeron, taskRunner)) - } - - def outboundSystemMessage(remoteAddress: Address): Sink[Send, Any] = { - Flow.fromGraph(killSwitch.flow[Send]) - .via(new SystemMessageDelivery(systemMessageReplyJunction, systemMessageResendInterval, - localAddress, remoteAddress, systemMessageReplyRecepient)) - .via(encoder) - .map(_.toArray) // TODO we should use ByteString all the way - .to(new AeronSink(outboundChannel(remoteAddress), systemMessageStreamId, aeron, taskRunner)) - } - - // TODO: Try out parallelized serialization (mapAsync) for performance - val encoder: Flow[Send, ByteString, NotUsed] = Flow[Send].map { sendEnvelope ⇒ - val pdu: ByteString = codec.constructMessage( - sendEnvelope.recipient.localAddressToUse, - sendEnvelope.recipient, - Serialization.currentTransportInformation.withValue(Serialization.Information(localAddress, system)) { - MessageSerializer.serialize(system, sendEnvelope.message.asInstanceOf[AnyRef]) - }, - sendEnvelope.senderOption, - seqOption = None, // FIXME: Acknowledgements will be handled differently I just reused the old codec - ackOption = None) - - // TODO: Drop unserializable messages - // TODO: Drop oversized messages - (new ByteStringBuilder).putInt(pdu.size)(ByteOrder.LITTLE_ENDIAN).result() ++ pdu - } - - val decoder: Flow[ByteString, AkkaPduCodec.Message, NotUsed] = - Framing.lengthField(4, maximumFrameLength = 256000) - .map { frame ⇒ - // TODO: Drop unserializable messages - val pdu = codec.decodeMessage(frame.drop(4), provider, localAddress)._2.get - pdu - } - - val messageDispatcher: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m ⇒ - dispatchInboundMessage(m.recipient, m.recipientAddress, m.message, m.senderOption) - } - - val deserializer: Flow[AkkaPduCodec.Message, InboundEnvelope, NotUsed] = - Flow[AkkaPduCodec.Message].map { m ⇒ - InboundEnvelope( - m.recipient, - m.recipientAddress, - MessageSerializer.deserialize(system, m.serializedMessage), - m.senderOption) - } - - val inboundFlow: Flow[ByteString, ByteString, NotUsed] = { - Flow.fromSinkAndSource( - decoder.via(deserializer).to(messageDispatcher), - Source.maybe[ByteString].via(killSwitch.flow)) - } - - val inboundSystemMessageFlow: Flow[ByteString, ByteString, SystemMessageReplyJunction.Junction] = { - Flow.fromSinkAndSourceMat( - decoder.via(deserializer) - .via(new SystemMessageAcker(localAddress)) - .viaMat(new SystemMessageReplyJunction)(Keep.right) - .to(messageDispatcher), - Source.maybe[ByteString].via(killSwitch.flow))((a, b) ⇒ a) - } - - private def dispatchInboundMessage(recipient: InternalActorRef, - recipientAddress: Address, - message: AnyRef, - senderOption: Option[ActorRef]): Unit = { - - import provider.remoteSettings._ - - val sender: ActorRef = senderOption.getOrElse(system.deadLetters) - val originalReceiver = recipient.path - - def msgLog = s"RemoteMessage: [$message] to [$recipient]<+[$originalReceiver] from [$sender()]" - - recipient match { - - case `remoteDaemon` ⇒ - if (UntrustedMode) log.debug("dropping daemon message in untrusted mode") - else { - if (LogReceive) log.debug("received daemon message {}", msgLog) - remoteDaemon ! message - } - - case l @ (_: LocalRef | _: RepointableRef) if l.isLocal ⇒ - if (LogReceive) log.debug("received local message {}", msgLog) - message match { - case sel: ActorSelectionMessage ⇒ - if (UntrustedMode && (!TrustedSelectionPaths.contains(sel.elements.mkString("/", "/", "")) || - sel.msg.isInstanceOf[PossiblyHarmful] || l != provider.rootGuardian)) - log.debug("operating in UntrustedMode, dropping inbound actor selection to [{}], " + - "allow it by adding the path to 'akka.remote.trusted-selection-paths' configuration", - sel.elements.mkString("/", "/", "")) - else - // run the receive logic for ActorSelectionMessage here to make sure it is not stuck on busy user actor - ActorSelection.deliverSelection(l, sender, sel) - case msg: PossiblyHarmful if UntrustedMode ⇒ - log.debug("operating in UntrustedMode, dropping inbound PossiblyHarmful message of type [{}]", msg.getClass.getName) - case msg: SystemMessage ⇒ l.sendSystemMessage(msg) - case msg ⇒ l.!(msg)(sender) - } - - case r @ (_: RemoteRef | _: RepointableRef) if !r.isLocal && !UntrustedMode ⇒ - if (LogReceive) log.debug("received remote-destined message {}", msgLog) - if (provider.transport.addresses(recipientAddress)) - // if it was originally addressed to us but is in fact remote from our point of view (i.e. remote-deployed) - r.!(message)(sender) - else - log.error("dropping message [{}] for non-local recipient [{}] arriving at [{}] inbound addresses are [{}]", - message.getClass, r, recipientAddress, provider.transport.addresses.mkString(", ")) - - case r ⇒ log.error("dropping message [{}] for unknown recipient [{}] arriving at [{}] inbound addresses are [{}]", - message.getClass, r, recipientAddress, provider.transport.addresses.mkString(", ")) - - } - } - -} diff --git a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala index a95802d69b..8614aaf2b8 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SystemMessageDeliverySpec.scala @@ -3,37 +3,32 @@ */ package akka.remote.artery -import scala.concurrent.Await -import scala.concurrent.Future -import scala.concurrent.Promise -import scala.concurrent.duration._ -import scala.concurrent.forkjoin.ThreadLocalRandom +import java.util.concurrent.ThreadLocalRandom + +import scala.concurrent.Await +import scala.concurrent.duration._ -import akka.Done import akka.NotUsed -import akka.actor.Actor import akka.actor.ActorIdentity import akka.actor.ActorRef import akka.actor.ActorSystem +import akka.actor.Address import akka.actor.ExtendedActorSystem import akka.actor.Identify import akka.actor.InternalActorRef import akka.actor.PoisonPill -import akka.actor.Props import akka.actor.RootActorPath -import akka.actor.Stash +import akka.remote.AddressUidExtension import akka.remote.EndpointManager.Send import akka.remote.RemoteActorRef +import akka.remote.UniqueAddress import akka.remote.artery.SystemMessageDelivery._ -import akka.remote.artery.Transport.InboundEnvelope import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings import akka.stream.ThrottleMode import akka.stream.scaladsl.Flow import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source -import akka.stream.stage.AsyncCallback -import akka.stream.testkit.TestSubscriber import akka.stream.testkit.scaladsl.TestSink import akka.testkit.AkkaSpec import akka.testkit.ImplicitSender @@ -60,86 +55,60 @@ object SystemMessageDeliverySpec { val configB = ConfigFactory.parseString(s"akka.remote.artery.port = $portB") .withFallback(commonConfig) - class TestReplyJunction(sendCallbackTo: ActorRef) extends SystemMessageReplyJunction.Junction { + class ManualReplyInboundContext( + replyProbe: ActorRef, + localAddress: UniqueAddress, + replySubject: TestReplySubject) extends TestInboundContext(localAddress, replySubject) { - def addReplyInterest(filter: InboundEnvelope ⇒ Boolean, replyCallback: AsyncCallback[SystemMessageReply]): Future[Done] = { - sendCallbackTo ! replyCallback - Future.successful(Done) + private var lastReply: Option[(Address, ControlMessage)] = None + + override def sendReply(to: Address, message: ControlMessage) = { + lastReply = Some((to, message)) + replyProbe ! message } - override def removeReplyInterest(callback: AsyncCallback[SystemMessageReply]): Unit = () - - override def stopped: Future[Done] = Promise[Done]().future - } - - def replyConnectorProps(dropRate: Double): Props = - Props(new ReplyConnector(dropRate)) - - class ReplyConnector(dropRate: Double) extends Actor with Stash { - override def receive = { - case callback: AsyncCallback[SystemMessageReply] @unchecked ⇒ - context.become(active(callback)) - unstashAll() - case _ ⇒ stash() - } - - def active(callback: AsyncCallback[SystemMessageReply]): Receive = { - case reply: SystemMessageReply ⇒ - if (ThreadLocalRandom.current().nextDouble() >= dropRate) - callback.invoke(reply) + def deliverLastReply(): Unit = { + lastReply.foreach { case (to, message) ⇒ super.sendReply(to, message) } + lastReply = None } } - } class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commonConfig) with ImplicitSender { import SystemMessageDeliverySpec._ - val addressA = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + val addressA = UniqueAddress( + system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress, + AddressUidExtension(system).addressUid) val systemB = ActorSystem("systemB", configB) - val addressB = systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress - val rootB = RootActorPath(addressB) + val addressB = UniqueAddress( + systemB.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress, + AddressUidExtension(systemB).addressUid) + val rootB = RootActorPath(addressB.address) val matSettings = ActorMaterializerSettings(system).withFuzzing(true) implicit val mat = ActorMaterializer(matSettings)(system) override def afterTermination(): Unit = shutdown(systemB) - def setupManualCallback(ackRecipient: ActorRef, resendInterval: FiniteDuration, - dropSeqNumbers: Vector[Long], sendCount: Int): (TestSubscriber.Probe[String], AsyncCallback[SystemMessageReply]) = { - val callbackProbe = TestProbe() - val replyJunction = new TestReplyJunction(callbackProbe.ref) - - val sink = - send(sendCount, resendInterval, replyJunction, ackRecipient) - .via(drop(dropSeqNumbers)) - .via(inbound) - .map(_.message.asInstanceOf[String]) - .runWith(TestSink.probe) - - val callback = callbackProbe.expectMsgType[AsyncCallback[SystemMessageReply]] - (sink, callback) - } - - def send(sendCount: Int, resendInterval: FiniteDuration, replyJunction: SystemMessageReplyJunction.Junction, - ackRecipient: ActorRef): Source[Send, NotUsed] = { + private def send(sendCount: Int, resendInterval: FiniteDuration, outboundContext: OutboundContext): Source[Send, NotUsed] = { val remoteRef = null.asInstanceOf[RemoteActorRef] // not used Source(1 to sendCount) .map(n ⇒ Send("msg-" + n, None, remoteRef, None)) - .via(new SystemMessageDelivery(replyJunction, resendInterval, addressA, addressB, ackRecipient)) + .via(new SystemMessageDelivery(outboundContext, resendInterval)) } - def inbound: Flow[Send, InboundEnvelope, NotUsed] = { + private def inbound(inboundContext: InboundContext): Flow[Send, InboundEnvelope, NotUsed] = { val recipient = null.asInstanceOf[InternalActorRef] // not used Flow[Send] .map { case Send(sysEnv: SystemMessageEnvelope, _, _, _) ⇒ - InboundEnvelope(recipient, addressB, sysEnv, None) + InboundEnvelope(recipient, addressB.address, sysEnv, None) } .async - .via(new SystemMessageAcker(addressB)) + .via(new SystemMessageAcker(inboundContext)) } - def drop(dropSeqNumbers: Vector[Long]): Flow[Send, Send, NotUsed] = { + private def drop(dropSeqNumbers: Vector[Long]): Flow[Send, Send, NotUsed] = { Flow[Send] .statefulMapConcat(() ⇒ { var dropping = dropSeqNumbers @@ -156,7 +125,7 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo }) } - def randomDrop[T](dropRate: Double): Flow[T, T, NotUsed] = Flow[T].mapConcat { elem ⇒ + private def randomDrop[T](dropRate: Double): Flow[T, T, NotUsed] = Flow[T].mapConcat { elem ⇒ if (ThreadLocalRandom.current().nextDouble() < dropRate) Nil else List(elem) } @@ -177,83 +146,108 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo } "be resent when some in the middle are lost" in { - val ackRecipient = TestProbe() - val (sink, replyCallback) = - setupManualCallback(ackRecipient.ref, resendInterval = 60.seconds, dropSeqNumbers = Vector(3L, 4L), sendCount = 5) + val replyProbe = TestProbe() + val replySubject = new TestReplySubject + val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, replySubject) + val inboundContextA = new TestInboundContext(addressB, replySubject) + val outboundContextA = inboundContextA.association(addressB.address) + + val sink = send(sendCount = 5, resendInterval = 60.seconds, outboundContextA) + .via(drop(dropSeqNumbers = Vector(3L, 4L))) + .via(inbound(inboundContextB)) + .map(_.message.asInstanceOf[String]) + .runWith(TestSink.probe) sink.request(100) sink.expectNext("msg-1") sink.expectNext("msg-2") - ackRecipient.expectMsg(Ack(1L, addressB)) - ackRecipient.expectMsg(Ack(2L, addressB)) + replyProbe.expectMsg(Ack(1L, addressB)) + replyProbe.expectMsg(Ack(2L, addressB)) // 3 and 4 was dropped - ackRecipient.expectMsg(Nack(2L, addressB)) + replyProbe.expectMsg(Nack(2L, addressB)) sink.expectNoMsg(100.millis) // 3 was dropped - replyCallback.invoke(Nack(2L, addressB)) + inboundContextB.deliverLastReply() // resending 3, 4, 5 sink.expectNext("msg-3") - ackRecipient.expectMsg(Ack(3L, addressB)) + replyProbe.expectMsg(Ack(3L, addressB)) sink.expectNext("msg-4") - ackRecipient.expectMsg(Ack(4L, addressB)) + replyProbe.expectMsg(Ack(4L, addressB)) sink.expectNext("msg-5") - ackRecipient.expectMsg(Ack(5L, addressB)) - ackRecipient.expectNoMsg(100.millis) - replyCallback.invoke(Ack(5L, addressB)) + replyProbe.expectMsg(Ack(5L, addressB)) + replyProbe.expectNoMsg(100.millis) + inboundContextB.deliverLastReply() sink.expectComplete() } "be resent when first is lost" in { - val ackRecipient = TestProbe() - val (sink, replyCallback) = - setupManualCallback(ackRecipient.ref, resendInterval = 60.seconds, dropSeqNumbers = Vector(1L), sendCount = 3) + val replyProbe = TestProbe() + val replySubject = new TestReplySubject + val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, replySubject) + val inboundContextA = new TestInboundContext(addressB, replySubject) + val outboundContextA = inboundContextA.association(addressB.address) + + val sink = send(sendCount = 3, resendInterval = 60.seconds, outboundContextA) + .via(drop(dropSeqNumbers = Vector(1L))) + .via(inbound(inboundContextB)) + .map(_.message.asInstanceOf[String]) + .runWith(TestSink.probe) sink.request(100) - ackRecipient.expectMsg(Nack(0L, addressB)) // from receiving 2 - ackRecipient.expectMsg(Nack(0L, addressB)) // from receiving 3 + replyProbe.expectMsg(Nack(0L, addressB)) // from receiving 2 + replyProbe.expectMsg(Nack(0L, addressB)) // from receiving 3 sink.expectNoMsg(100.millis) // 1 was dropped - replyCallback.invoke(Nack(0L, addressB)) - replyCallback.invoke(Nack(0L, addressB)) + inboundContextB.deliverLastReply() // it's ok to not delivery all nacks // resending 1, 2, 3 sink.expectNext("msg-1") - ackRecipient.expectMsg(Ack(1L, addressB)) + replyProbe.expectMsg(Ack(1L, addressB)) sink.expectNext("msg-2") - ackRecipient.expectMsg(Ack(2L, addressB)) + replyProbe.expectMsg(Ack(2L, addressB)) sink.expectNext("msg-3") - ackRecipient.expectMsg(Ack(3L, addressB)) - replyCallback.invoke(Ack(3L, addressB)) + replyProbe.expectMsg(Ack(3L, addressB)) + inboundContextB.deliverLastReply() sink.expectComplete() } "be resent when last is lost" in { - val ackRecipient = TestProbe() - val (sink, replyCallback) = - setupManualCallback(ackRecipient.ref, resendInterval = 1.second, dropSeqNumbers = Vector(3L), sendCount = 3) + val replyProbe = TestProbe() + val replySubject = new TestReplySubject + val inboundContextB = new ManualReplyInboundContext(replyProbe.ref, addressB, replySubject) + val inboundContextA = new TestInboundContext(addressB, replySubject) + val outboundContextA = inboundContextA.association(addressB.address) + + val sink = send(sendCount = 3, resendInterval = 1.seconds, outboundContextA) + .via(drop(dropSeqNumbers = Vector(3L))) + .via(inbound(inboundContextB)) + .map(_.message.asInstanceOf[String]) + .runWith(TestSink.probe) sink.request(100) sink.expectNext("msg-1") - ackRecipient.expectMsg(Ack(1L, addressB)) - replyCallback.invoke(Ack(1L, addressB)) + replyProbe.expectMsg(Ack(1L, addressB)) + inboundContextB.deliverLastReply() sink.expectNext("msg-2") - ackRecipient.expectMsg(Ack(2L, addressB)) - replyCallback.invoke(Ack(2L, addressB)) + replyProbe.expectMsg(Ack(2L, addressB)) + inboundContextB.deliverLastReply() sink.expectNoMsg(200.millis) // 3 was dropped // resending 3 due to timeout sink.expectNext("msg-3") - ackRecipient.expectMsg(Ack(3L, addressB)) - replyCallback.invoke(Ack(3L, addressB)) + replyProbe.expectMsg(Ack(3L, addressB)) + inboundContextB.deliverLastReply() sink.expectComplete() } "deliver all during stress and random dropping" in { val N = 10000 val dropRate = 0.1 - val replyConnector = system.actorOf(replyConnectorProps(dropRate)) - val replyJunction = new TestReplyJunction(replyConnector) + val replySubject = new TestReplySubject + val inboundContextB = new TestInboundContext(addressB, replySubject, replyDropRate = dropRate) + val inboundContextA = new TestInboundContext(addressB, replySubject) + val outboundContextA = inboundContextA.association(addressB.address) val output = - send(N, 1.second, replyJunction, replyConnector) + send(N, 1.second, outboundContextA) .via(randomDrop(dropRate)) - .via(inbound) + .via(inbound(inboundContextB)) .map(_.message.asInstanceOf[String]) .runWith(Sink.seq) @@ -263,14 +257,16 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.commo "deliver all during throttling and random dropping" in { val N = 500 val dropRate = 0.1 - val replyConnector = system.actorOf(replyConnectorProps(dropRate)) - val replyJunction = new TestReplyJunction(replyConnector) + val replySubject = new TestReplySubject + val inboundContextB = new TestInboundContext(addressB, replySubject, replyDropRate = dropRate) + val inboundContextA = new TestInboundContext(addressB, replySubject) + val outboundContextA = inboundContextA.association(addressB.address) val output = - send(N, 1.second, replyJunction, replyConnector) + send(N, 1.second, outboundContextA) .throttle(200, 1.second, 10, ThrottleMode.shaping) .via(randomDrop(dropRate)) - .via(inbound) + .via(inbound(inboundContextB)) .map(_.message.asInstanceOf[String]) .runWith(Sink.seq) diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala new file mode 100644 index 0000000000..d54cff909e --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -0,0 +1,79 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import akka.remote.UniqueAddress +import akka.actor.Address +import scala.concurrent.Future +import akka.remote.artery.ReplyJunction.ReplySubject +import akka.remote.RemoteActorRef +import scala.concurrent.Promise +import akka.Done +import akka.remote.artery.ReplyJunction.ReplyObserver +import java.util.concurrent.CopyOnWriteArrayList +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ThreadLocalRandom + +private[akka] class TestInboundContext( + override val localAddress: UniqueAddress, + val replySubject: TestReplySubject = new TestReplySubject, + replyDropRate: Double = 0.0) extends InboundContext { + + private val associations = new ConcurrentHashMap[Address, OutboundContext] + + def sendReply(to: Address, message: ControlMessage) = { + if (ThreadLocalRandom.current().nextDouble() >= replyDropRate) + replySubject.sendReply(InboundEnvelope(null, to, message, None)) + } + + def association(remoteAddress: Address): OutboundContext = + associations.get(remoteAddress) match { + case null ⇒ + val a = new TestOutboundContext(localAddress, remoteAddress, replySubject) + associations.putIfAbsent(remoteAddress, a) match { + case null ⇒ a + case existing ⇒ existing + } + case existing ⇒ existing + } + + protected def createAssociation(remoteAddress: Address): OutboundContext = + new TestOutboundContext(localAddress, remoteAddress, replySubject) +} + +private[akka] class TestOutboundContext( + override val localAddress: UniqueAddress, + override val remoteAddress: Address, + override val replySubject: TestReplySubject) extends OutboundContext { + + private val _uniqueRemoteAddress = Promise[UniqueAddress]() + def uniqueRemoteAddress: Future[UniqueAddress] = _uniqueRemoteAddress.future + def completeRemoteAddress(a: UniqueAddress): Unit = _uniqueRemoteAddress.trySuccess(a) + + // FIXME we should be able to Send without a recipient ActorRef + def dummyRecipient: RemoteActorRef = null + +} + +private[akka] class TestReplySubject extends ReplySubject { + + private var replyObservers = new CopyOnWriteArrayList[ReplyObserver] + + override def attach(observer: ReplyObserver): Future[Done] = { + replyObservers.add(observer) + Future.successful(Done) + } + + override def detach(observer: ReplyObserver): Unit = { + replyObservers.remove(observer) + } + + override def stopped: Future[Done] = Promise[Done]().future + + def sendReply(env: InboundEnvelope): Unit = { + val iter = replyObservers.iterator() + while (iter.hasNext()) + iter.next().reply(env) + } +}