diff --git a/akka-docs/src/main/paradox/remoting-artery.md b/akka-docs/src/main/paradox/remoting-artery.md index 4915a5434c..0053df1871 100644 --- a/akka-docs/src/main/paradox/remoting-artery.md +++ b/akka-docs/src/main/paradox/remoting-artery.md @@ -2,8 +2,11 @@ @@@ note -This page describes the @ref:[may change](common/may-change.md) remoting subsystem, codenamed *Artery* that will eventually replace the -old remoting implementation. For the current stable remoting system please refer to @ref:[Remoting](remoting.md). +This page describes the remoting subsystem, codenamed *Artery* that will eventually replace the +@ref:[old remoting implementation](remoting.md). Artery with the Aeron transport is ready +to use in production. The TCP based transport is not ready for use in production yet. The module is +marked @ref:[may change](common/may-change.md) because some configuration will be changed when the API +becomes stable. @@@ @@ -25,7 +28,7 @@ Artery is a reimplementation of the old remoting module aimed at improving perfo source compatible with the old implementation and it is a drop-in replacement in many cases. Main features of Artery compared to the previous implementation: - * Based on [Aeron](https://github.com/real-logic/Aeron) (UDP) instead of TCP + * Based on [Aeron](https://github.com/real-logic/Aeron) (UDP) and Akka Streams TCP/TLS instead of Netty TCP * Focused on high-throughput, low-latency communication * Isolation of internal control messages from user messages improving stability and reducing false failure detection in case of heavy traffic by using a dedicated subchannel. @@ -75,6 +78,7 @@ akka { remote { artery { enabled = on + transport = aeron-udp canonical.hostname = "127.0.0.1" canonical.port = 25520 } @@ -102,7 +106,28 @@ listening for connections and handling messages as not to interfere with other a @@@ The example above only illustrates the bare minimum of properties you have to add to enable remoting. 
-All settings are described in [Remote Configuration](#remote-configuration-artery). +All settings are described in @ref:[Remote Configuration](#remote-configuration-artery). + +### Selecting transport + +There are three alternatives of which underlying transport to use. It is configured by property +`akka.remote.artery.transport` with the possible values: + +* `aeron-udp` - Based on [Aeron (UDP)](https://github.com/real-logic/aeron) +* `tcp` - Based on @ref:[Akka Streams TCP](stream/stream-io.md#streaming-tcp) +* `tls-tcp` - Same as `tcp` with encryption using @ref:[Akka Streams TLS](stream/stream-io.md#tls) + +The Aeron (UDP) transport is a high performance transport and should be used for systems +that require high throughput and low latency. It is using more CPU than TCP when the system +is idle or at low message rates. There is no encryption for Aeron. + +The TCP and TLS transport is implemented using Akka Streams TCP/TLS. This is the choice +when encryption is needed, but it can also be used with plain TCP without TLS. It's also +the obvious choice when UDP can't be used. +It has very good performance (high throughput and low latency) but not as good as the Aeron transport. +It is using less CPU than Aeron when the system is idle or at low message rates. +This has not been verified yet, but it might scale better for many connections than the Aeron +transport, which can be of importance for large clusters with 100s or even 1000s of nodes. @@@ note @@ -764,8 +789,8 @@ See Aeron documentation about [Performance Testing](https://github.com/real-logi ### Fine-tuning CPU usage latency tradeoff Artery has been designed for low latency and as a result it can be CPU hungry when the system is mostly idle. -This is not always desirable. It is possible to tune the tradeoff between CPU usage and latency with -the following configuration: +This is not always desirable. 
When using the Aeron transport it is possible to tune the tradeoff between CPU +usage and latency with the following configuration: ``` # Values can be from 1 to 10, where 10 strongly prefers low latency diff --git a/akka-remote-tests/src/test/scala/akka/remote/artery/ArteryFailedToBindSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/artery/ArteryFailedToBindSpec.scala index 00069e3a22..eae1d91d5d 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/artery/ArteryFailedToBindSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/artery/ArteryFailedToBindSpec.scala @@ -1,8 +1,10 @@ package akka.remote.artery import akka.actor.ActorSystem +import akka.remote.RARP import akka.remote.RemoteTransportException import akka.testkit.SocketUtil +import akka.testkit.TestKit import com.typesafe.config.ConfigFactory import org.scalatest.{ Matchers, WordSpec } @@ -32,9 +34,15 @@ class ArteryFailedToBindSpec extends WordSpec with Matchers { val ex = intercept[RemoteTransportException] { ActorSystem("BindTest2", config) } - ex.getMessage should equal("Inbound Aeron channel is in errored state. See Aeron logs for details.") + RARP(as).provider.transport.asInstanceOf[ArteryTransport].settings.Transport match { + case ArterySettings.AeronUpd ⇒ + ex.getMessage should ===("Inbound Aeron channel is in errored state. See Aeron logs for details.") + case ArterySettings.Tcp ⇒ + ex.getMessage should startWith("Failed to bind TCP") + } + } finally { - as.terminate() + TestKit.shutdownActorSystem(as) } } } diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index ac7f383875..dabf1562ca 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -730,6 +730,22 @@ akka { # Enable the new remoting with this flag enabled = off + # Select the underlying transport implementation. 
+ # + # Possible values: aeron-udp, tcp + # + # The Aeron (UDP) transport is a high performance transport and should be used for systems + # that require high throughput and low latency. It is using more CPU than TCP when the + # system is idle or at low message rates. There is no encryption for Aeron. + # https://github.com/real-logic/aeron + # + # The TCP and TLS transport is implemented using Akka Streams TCP/TLS. This is the choice + # when encryption is needed, but it can also be used with plain TCP without TLS. It's also + # the obvious choice when UDP can't be used. + # It has very good performance (high throughput and low latency) but not as good as the Aeron transport. + # It is using less CPU than Aeron when the system is idle or at low message rates. + transport = aeron-udp + # Canonical address is the address other clients should connect to. # Artery transport will expect messages to this address. canonical { @@ -775,6 +791,7 @@ akka { } # Periodically log out all Aeron counters. See https://github.com/real-logic/aeron/wiki/Monitoring-and-Debugging#counters + # Only used when transport is aeron-udp. log-aeron-counters = false # Actor paths to use the large message stream for when a message @@ -859,15 +876,18 @@ akka { # Controls whether to start the Aeron media driver in the same JVM or use external # process. Set to 'off' when using external media driver, and then also set the # 'aeron-dir'. + # Only used when transport is aeron-udp. embedded-media-driver = on # Directory used by the Aeron media driver. It's mandatory to define the 'aeron-dir' # if using external media driver, i.e. when 'embedded-media-driver = off'. # Embedded media driver will use a this directory, or a temporary directory if this # property is not defined (empty). + # Only used when transport is aeron-udp. aeron-dir = "" # Whether to delete aeron embeded driver directory upon driver stop. + # Only used when transport is aeron-udp. 
delete-aeron-dir = yes # Level of CPU time used, on a scale between 1 and 10, during backoff/idle. @@ -876,6 +896,7 @@ akka { # backoff backpressure. # Level 1 strongly prefer low CPU consumption over low latency. # Level 10 strongly prefer low latency over low CPU consumption. + # Only used when transport is aeron-udp. idle-cpu-level = 5 # Total number of inbound lanes, shared among all inbound associations. A value @@ -926,9 +947,13 @@ akka { # unacknowledged system messages are re-delivered with this interval system-message-resend-interval = 1 second + # Timeout of establishing outbound connections. + # Only used when transport is tcp + connection-timeout = 5 seconds + # The timeout for outbound associations to perform the handshake. # This timeout must be greater than the 'image-liveness-timeout'. - handshake-timeout = 20 s + handshake-timeout = 20 seconds # incomplete handshake attempt is retried with this interval handshake-retry-interval = 1 second @@ -939,6 +964,7 @@ akka { inject-handshake-interval = 1 second # messages that are not accepted by Aeron are dropped after retrying for this period + # Only used when transport is aeron-udp. give-up-message-after = 60 seconds # System messages that are not acknowledged after re-sending for this period are @@ -958,6 +984,10 @@ akka { # If more restarts occurs the ActorSystem will be terminated. inbound-max-restarts = 5 + # Retry outbound connection after this backoff. + # Only used when transport is tcp + outbound-restart-backoff = 1 second + # See 'outbound-max-restarts' outbound-restart-timeout = 5 seconds @@ -971,15 +1001,18 @@ akka { # Timeout after which aeron driver has not had keepalive messages # from a client before it considers the client dead. + # Only used when transport is aeron-udp. client-liveness-timeout = 20 seconds # Timeout for each the INACTIVE and LINGER stages an aeron image # will be retained for when it is no longer referenced. # This timeout must be less than the 'handshake-timeout'. 
+ # Only used when transport is aeron-udp. image-liveness-timeout = 10 seconds # Timeout after which the aeron driver is considered dead # if it does not update its C'n'C timestamp. + # Only used when transport is aeron-udp. driver-timeout = 20 seconds flight-recorder { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 5fdd2d5a0b..bbf7bea5a7 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -9,11 +9,12 @@ import akka.actor._ import akka.dispatch.sysmsg._ import akka.event.{ EventStream, Logging, LoggingAdapter } import akka.event.Logging.Error -import akka.serialization.{ Serialization, SerializationExtension } import akka.pattern.pipe + import scala.util.control.NonFatal import akka.actor.SystemGuardian.{ RegisterTerminationHook, TerminationHook, TerminationHookDone } + import scala.util.control.Exception.Catcher import scala.concurrent.Future @@ -21,12 +22,15 @@ import akka.ConfigurationException import akka.annotation.InternalApi import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.remote.artery.ArteryTransport +import akka.remote.artery.aeron.ArteryAeronUdpTransport +import akka.remote.artery.ArterySettings +import akka.remote.artery.ArterySettings.AeronUpd import akka.util.OptionVal import akka.remote.artery.OutboundEnvelope import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope -import akka.remote.artery.aeron.ArteryAeronUdpTransport import akka.remote.serialization.ActorRefResolveCache import akka.remote.serialization.ActorRefResolveThreadLocalCache +import akka.remote.artery.tcp.ArteryTcpTransport /** * INTERNAL API @@ -204,7 +208,12 @@ private[akka] class RemoteActorRefProvider( local.registerExtraNames(Map(("remote", d))) d }, - transport = if (remoteSettings.Artery.Enabled) new 
ArteryAeronUdpTransport(system, this) else new Remoting(system, this)) + transport = + if (remoteSettings.Artery.Enabled) remoteSettings.Artery.Transport match { + case ArterySettings.AeronUpd ⇒ new ArteryAeronUdpTransport(system, this) + case ArterySettings.Tcp ⇒ new ArteryTcpTransport(system, this) + } + else new Remoting(system, this)) _internals = internals remotingTerminator ! internals diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala index 2abdde9435..7e11ffdcaf 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala @@ -71,6 +71,13 @@ private[akka] final class ArterySettings private (config: Config) { val LogSend: Boolean = getBoolean("log-sent-messages") val LogAeronCounters: Boolean = config.getBoolean("log-aeron-counters") + val Transport: Transport = toRootLowerCase(getString("transport")) match { + case AeronUpd.configName ⇒ AeronUpd + case Tcp.configName ⇒ Tcp + case other ⇒ throw new IllegalArgumentException(s"Unknown transport [$other], possible values: " + + s""""${AeronUpd.configName}", "${Tcp.configName}"""") + } + /** * Used version of the header format for outbound messages. 
* To support rolling upgrades this may be a lower version than `ArteryTransport.HighestVersion`, @@ -126,6 +133,8 @@ private[akka] final class ArterySettings private (config: Config) { val InjectHandshakeInterval: FiniteDuration = config.getMillisDuration("inject-handshake-interval").requiring(interval ⇒ interval > Duration.Zero, "inject-handshake-interval must be more than zero") + val ConnectionTimeout: FiniteDuration = config.getMillisDuration("connection-timeout").requiring(interval ⇒ + interval > Duration.Zero, "connection-timeout must be more than zero") val GiveUpMessageAfter: FiniteDuration = config.getMillisDuration("give-up-message-after").requiring(interval ⇒ interval > Duration.Zero, "give-up-message-after must be more than zero") val GiveUpSystemMessageAfter: FiniteDuration = @@ -138,6 +147,9 @@ private[akka] final class ArterySettings private (config: Config) { config.getMillisDuration("inbound-restart-timeout").requiring(interval ⇒ interval > Duration.Zero, "inbound-restart-timeout must be more than zero") val InboundMaxRestarts: Int = getInt("inbound-max-restarts") + val OutboundRestartBackoff: FiniteDuration = + config.getMillisDuration("outbound-restart-backoff").requiring(interval ⇒ + interval > Duration.Zero, "outbound-restart-backoff must be more than zero") val OutboundRestartTimeout: FiniteDuration = config.getMillisDuration("outbound-restart-timeout").requiring(interval ⇒ interval > Duration.Zero, "outbound-restart-timeout must be more than zero") @@ -205,4 +217,15 @@ private[akka] object ArterySettings { case other ⇒ other } + sealed trait Transport { + val configName: String + } + object AeronUpd extends Transport { + override val configName: String = "aeron-udp" + override def toString: String = configName + } + object Tcp extends Transport { + override val configName: String = "tcp" + override def toString: String = configName + } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala 
b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 5414a3f4c2..78f8dade93 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -398,15 +398,16 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr startTransport() topLevelFREvents.loFreq(Transport_Started, NoMetaData) + val udp = settings.Transport == ArterySettings.AeronUpd val port = if (settings.Canonical.Port == 0) { if (settings.Bind.Port != 0) settings.Bind.Port // if bind port is set, use bind port instead of random - else ArteryTransport.autoSelectPort(settings.Canonical.Hostname) + else ArteryTransport.autoSelectPort(settings.Canonical.Hostname, udp) } else settings.Canonical.Port val bindPort = if (settings.Bind.Port == 0) { if (settings.Canonical.Port == 0) port // canonical and bind ports are zero. Use random port for both - else ArteryTransport.autoSelectPort(settings.Bind.Hostname) + else ArteryTransport.autoSelectPort(settings.Bind.Hostname, udp) } else settings.Bind.Port _localAddress = UniqueAddress( @@ -432,7 +433,9 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr runInboundStreams() topLevelFREvents.loFreq(Transport_StartupFinished, NoMetaData) - log.info("Remoting started; listening on address: [{}] with UID [{}]", localAddress.address, localAddress.uid) + log.info( + "Remoting started with transport [Artery {}]; listening on address [{}] with UID [{}]", + settings.Transport, localAddress.address, localAddress.uid) } protected def startTransport(): Unit @@ -870,12 +873,20 @@ private[remote] object ArteryTransport { aeronSourceLifecycle: Option[AeronSource.ResourceLifecycle], completed: Future[Done]) - def autoSelectPort(hostname: String): Int = { - val socket = DatagramChannel.open().socket() - socket.bind(new InetSocketAddress(hostname, 0)) - val port = socket.getLocalPort - socket.close() - port + def 
autoSelectPort(hostname: String, udp: Boolean): Int = { + if (udp) { + val socket = DatagramChannel.open().socket() + socket.bind(new InetSocketAddress(hostname, 0)) + val port = socket.getLocalPort + socket.close() + port + } else { + val socket = ServerSocketChannel.open().socket() + socket.bind(new InetSocketAddress(hostname, 0)) + val port = socket.getLocalPort + socket.close() + port + } } val ControlStreamId = 1 diff --git a/akka-remote/src/main/scala/akka/remote/artery/EnvelopeBufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/EnvelopeBufferPool.scala index db766a225d..5df4ae49f8 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/EnvelopeBufferPool.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/EnvelopeBufferPool.scala @@ -383,6 +383,14 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { private var literalChars = new Array[Char](64) private var literalBytes = new Array[Byte](64) + // The streamId is only used for TCP transport. It is not part of the ordinary envelope header, but included in the + // frame header that is parsed by the TcpFraming stage. 
+ private var _streamId: Int = -1 + def streamId: Int = + if (_streamId != -1) _streamId + else throw new IllegalStateException("StreamId was not set") + def setStreamId(newStreamId: Int): Unit = _streamId = newStreamId + def writeHeader(h: HeaderBuilder): Unit = writeHeader(h, null) def writeHeader(h: HeaderBuilder, oe: OutboundEnvelope): Unit = { diff --git a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala index 2475d64ba6..a32953a432 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala @@ -51,6 +51,14 @@ private[remote] object FlightRecorderEvents { val Compression_Inbound_RunActorRefAdvertisement = 94 val Compression_Inbound_RunClassManifestAdvertisement = 95 + val TcpOutbound_Connected = 150 + val TcpOutbound_Sent = 151 + + val TcpInbound_Bound = 170 + val TcpInbound_Unbound = 171 + val TcpInbound_Connected = 172 + val TcpInbound_Received = 173 + // Used for presentation of the entries in the flight recorder lazy val eventDictionary = Map( Transport_MediaDriverStarted → "Transport: Media driver started", @@ -92,7 +100,17 @@ private[remote] object FlightRecorderEvents { Compression_CompressedManifest → "Compression: Compressed manifest", Compression_AllocatedManifestCompressionId → "Compression: Allocated manifest compression id", Compression_Inbound_RunActorRefAdvertisement → "InboundCompression: Run class manifest compression advertisement", - Compression_Inbound_RunClassManifestAdvertisement → "InboundCompression: Run class manifest compression advertisement" + Compression_Inbound_RunClassManifestAdvertisement → "InboundCompression: Run class manifest compression advertisement", + + // TCP outbound events + TcpOutbound_Connected -> "TCP out: Connected", + TcpOutbound_Sent -> "TCP out: Sent message", + + // TCP inbound events + TcpInbound_Bound -> 
"TCP in: Bound", + TcpInbound_Unbound -> "TCP in: Unbound", + TcpInbound_Connected -> "TCP in: New connection", + TcpInbound_Received -> "TCP in: Received message" ).map { case (int, str) ⇒ int.toLong → str } diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala new file mode 100644 index 0000000000..985ab45e8f --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala @@ -0,0 +1,391 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.remote.artery +package tcp + +import java.net.InetSocketAddress + +import scala.concurrent.Await +import scala.concurrent.ExecutionContext +import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.Future +import scala.concurrent.Promise +import scala.concurrent.duration._ +import scala.util.Failure +import scala.util.Success +import scala.util.Try +import akka.ConfigurationException +import akka.Done +import akka.NotUsed +import akka.actor.ActorSystem +import akka.actor.ExtendedActorSystem +import akka.dispatch.ExecutionContexts +import akka.event.Logging +import akka.remote.RemoteActorRefProvider +import akka.remote.RemoteTransportException +import akka.remote.artery.Decoder.InboundCompressionAccess +import akka.remote.artery.compress._ +import akka.stream.Attributes +import akka.stream.Attributes.LogLevels +import akka.stream.FlowShape +import akka.stream.Graph +import akka.stream.KillSwitches +import akka.stream.Materializer +import akka.stream.SharedKillSwitch +import akka.stream.SinkShape +import akka.stream.scaladsl.Broadcast +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.GraphDSL +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.MergeHub +import akka.stream.scaladsl.Partition +import akka.stream.scaladsl.RestartFlow +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.stream.scaladsl.Tcp +import 
akka.stream.scaladsl.Tcp.IncomingConnection +import akka.stream.scaladsl.Tcp.ServerBinding +import akka.util.ByteString +import akka.util.OptionVal + +/** + * INTERNAL API + */ +private[remote] object ArteryTcpTransport { + + private val successUnit = Success(()) + + def optionToTry(opt: Option[Throwable]): Try[Unit] = opt match { + case None ⇒ successUnit + case Some(t) ⇒ Failure(t) + } +} + +/** + * INTERNAL API + */ +private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) + extends ArteryTransport(_system, _provider) { + import ArteryTransport._ + import ArteryTcpTransport._ + import FlightRecorderEvents._ + + // may change when inbound streams are restarted + @volatile private var inboundKillSwitch: SharedKillSwitch = KillSwitches.shared("inboundKillSwitch") + // may change when inbound streams are restarted + @volatile private var inboundStream: OptionVal[Sink[EnvelopeBuffer, NotUsed]] = OptionVal.None + @volatile private var serverBinding: Option[Future[ServerBinding]] = None + + override protected def startTransport(): Unit = { + // nothing specific here + } + + override protected def outboundTransportSink( + outboundContext: OutboundContext, + streamId: Int, + bufferPool: EnvelopeBufferPool): Sink[EnvelopeBuffer, Future[Done]] = { + implicit val sys: ActorSystem = system + + val afr = createFlightRecorderEventSink() + + val host = outboundContext.remoteAddress.host.get + val port = outboundContext.remoteAddress.port.get + val remoteAddress = InetSocketAddress.createUnresolved(host, port) + + def connectionFlow: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] = + Tcp() + .outgoingConnection( + remoteAddress, + halfClose = true, // issue https://github.com/akka/akka/issues/24392 if set to false + connectTimeout = settings.Advanced.ConnectionTimeout) + + def connectionFlowWithRestart: Flow[ByteString, ByteString, NotUsed] = { + val flowFactory = () ⇒ { + + afr.loFreq( + TcpOutbound_Connected, + 
s"${outboundContext.remoteAddress.host.get}:${outboundContext.remoteAddress.port.get} " + + s"/ ${streamName(streamId)}") + Flow[ByteString] + .prepend(Source.single(TcpFraming.encodeConnectionHeader(streamId))) + .via(connectionFlow) + .mapMaterializedValue(_ ⇒ NotUsed) + .recoverWithRetries(1, { case ArteryTransport.ShutdownSignal ⇒ Source.empty }) + .log(name = s"outbound connection to [${outboundContext.remoteAddress}], ${streamName(streamId)} stream") + .addAttributes(Attributes.logLevels(onElement = LogLevels.Off, onFailure = Logging.WarningLevel)) + } + + if (streamId == ControlStreamId) { + // restart of inner connection part important in control flow, since system messages + // are buffered and resent from the outer SystemMessageDelivery stage. + RestartFlow.withBackoff[ByteString, ByteString]( + settings.Advanced.OutboundRestartBackoff, + settings.Advanced.GiveUpSystemMessageAfter, 0.1)(flowFactory) + } else { + // Best effort retry a few times + // FIXME only restart on failures?, but missing in RestartFlow, see https://github.com/akka/akka/pull/23911 + RestartFlow.withBackoff[ByteString, ByteString]( + settings.Advanced.OutboundRestartBackoff, + settings.Advanced.OutboundRestartBackoff * 5, 0.1, maxRestarts = 3)(flowFactory) + } + + } + + Flow[EnvelopeBuffer] + .map { env ⇒ + // TODO Possible performance improvement, could we reduce the copying of bytes? 
+ afr.hiFreq(TcpOutbound_Sent, env.byteBuffer.limit) + val size = env.byteBuffer.limit + + val bytes = ByteString(env.byteBuffer) + bufferPool.release(env) + + TcpFraming.encodeFrameHeader(size) ++ bytes + } + .via(connectionFlowWithRestart) + .map(_ ⇒ throw new IllegalStateException(s"Unexpected incoming bytes in outbound connection to [${outboundContext.remoteAddress}]")) + .toMat(Sink.ignore)(Keep.right) + } + + override protected def runInboundStreams(): Unit = { + + // Design note: The design of how to run the inbound streams is influenced by the original design + // for the Aeron streams, and there we can only have one single inbound since everything comes in + // via the single AeronSource. + // + // For TCP we could materialize the inbound streams for each inbound connection, i.e. running many + // completely separate inbound streams. Each would still have to include all 3 control, ordinary, + // large parts for each connection even though only one is used by a specific connection. Unless + // we can dynamically choose what to materialize based on the `streamId` (as a first byte, or + // in first frame of the connection), which is complicated. + // + // However, this would make the design for Aeron and TCP more different and more things might + // have to be changed, such as compression advertisements and materialized values. Number of + // inbound streams would be dynamic, and so on. + + implicit val mat: Materializer = materializer + implicit val sys: ActorSystem = system + + // These streams are always running, only one instance of each inbound stream, and then the inbound connections + // are attached to these via a MergeHub. 
+ val (controlStream, controlStreamCompleted) = runInboundControlStream() + val (ordinaryMessagesStream, ordinaryMessagesStreamCompleted) = runInboundOrdinaryMessagesStream() + val (largeMessagesStream, largeMessagesStreamCompleted) = { + if (largeMessageChannelEnabled) + runInboundLargeMessagesStream() + else + ( + Flow[EnvelopeBuffer] + .map(_ ⇒ log.warning("Dropping large message, missing large-message-destinations configuration.")) + .to(Sink.ignore), + Promise[Done]().future) // never completed, not enabled + } + + // An inbound connection will only use one of the control, ordinary or large streams, but we have to + // attach it to all and select via Partition and the streamId in the frame header. Conceptually it + // would have been better to send the streamId as one single first byte for a new connection and + // decide where to attach it based on that byte. Then the streamId wouldn't have to be sent in each + // frame. That was not chosen because it is more complicated to implement and might have more runtime + // overhead. + inboundStream = + OptionVal.Some(Sink.fromGraph(GraphDSL.create() { implicit b ⇒ + import GraphDSL.Implicits._ + val partition = b.add(Partition[EnvelopeBuffer](3, env ⇒ { + env.streamId match { + case OrdinaryStreamId ⇒ 1 + case ControlStreamId ⇒ 0 + case LargeStreamId ⇒ 2 + case other ⇒ throw new IllegalArgumentException(s"Unexpected streamId [$other]") + } + })) + partition.out(0) ~> controlStream + partition.out(1) ~> ordinaryMessagesStream + partition.out(2) ~> largeMessagesStream + SinkShape(partition.in) + })) + + // If something in the inboundConnectionFlow fails, e.g. framing, the connection will be torn down, + // but other parts of the inbound streams don't have to be restarted. 
+ def inboundConnectionFlow(inboundConnection: IncomingConnection): Flow[ByteString, ByteString, NotUsed] = { + // must create new Flow for each connection because of the FlightRecorder that can't be shared + val afr = createFlightRecorderEventSink() + Flow[ByteString] + .via(inboundKillSwitch.flow) + .via(new TcpFraming(afr)) + .alsoTo(inboundStream.get) + .filter(_ ⇒ false) // don't send back anything in this TCP socket + .map(_ ⇒ ByteString.empty) // make it a Flow[ByteString] again + } + + val host = localAddress.address.host.get + val port = localAddress.address.port.get + + val connectionSource: Source[Tcp.IncomingConnection, Future[ServerBinding]] = + Tcp().bind( + interface = host, + port = port, + halfClose = false) + + serverBinding = serverBinding match { + case None ⇒ + val afr = createFlightRecorderEventSink() + val binding = connectionSource + .to(Sink.foreach { connection ⇒ + afr.loFreq( + TcpInbound_Connected, + s"${connection.remoteAddress.getHostString}:${connection.remoteAddress.getPort}") + connection.handleWith(inboundConnectionFlow(connection)) + }) + .run() + .recoverWith { + case e ⇒ Future.failed(new RemoteTransportException( + s"Failed to bind TCP to [${localAddress.address.host.get}:${localAddress.address.port.get}] due to: " + + e.getMessage, e)) + }(ExecutionContexts.sameThreadExecutionContext) + + // only on initial startup, when ActorSystem is starting + Await.result(binding, settings.Bind.BindTimeout) + afr.loFreq(TcpInbound_Bound, s"$host:$port") + Some(binding) + case s @ Some(_) ⇒ + // already bound, when restarting + s + } + + // Failures in any of the inbound streams should be extremely rare, probably an unforeseen accident. + // Tear down everything and start over again. Inbound streams are "stateless" so that should be fine. 
+    // Tested in SurviveInboundStreamRestartWithCompressionInFlightSpec
+    implicit val ec: ExecutionContext = materializer.executionContext
+    val completed = Future.firstCompletedOf(
+      List(controlStreamCompleted, ordinaryMessagesStreamCompleted, largeMessagesStreamCompleted))
+    val restart = () ⇒ {
+      inboundKillSwitch.shutdown()
+      inboundKillSwitch = KillSwitches.shared("inboundKillSwitch")
+
+      val allStopped: Future[Done] = for {
+        _ ← controlStreamCompleted.recover { case _ ⇒ Done }
+        _ ← ordinaryMessagesStreamCompleted.recover { case _ ⇒ Done }
+        _ ← if (largeMessageChannelEnabled)
+          largeMessagesStreamCompleted.recover { case _ ⇒ Done } else Future.successful(Done)
+      } yield Done
+      allStopped.foreach(_ ⇒ runInboundStreams())
+    }
+
+    attachInboundStreamRestart("Inbound streams", completed, restart)
+  }
+
+  /**
+   * Materializes the inbound control stream on `controlMaterializer`: a `MergeHub` source routed
+   * through the shared inbound kill switch and `inboundFlow` (with `NoInboundCompressions`)
+   * into `inboundControlSink`.
+   *
+   * @return the hub sink that feeds the stream, and the stream's completion future
+   * @throws ArteryTransport.ShuttingDown if the transport is already shut down
+   */
+  private def runInboundControlStream(): (Sink[EnvelopeBuffer, NotUsed], Future[Done]) = {
+    if (isShutdown) throw ArteryTransport.ShuttingDown
+
+    val (hub, ctrl, completed) =
+      MergeHub.source[EnvelopeBuffer].addAttributes(Attributes.logLevels(onFailure = LogLevels.Off))
+        .via(inboundKillSwitch.flow)
+        .via(inboundFlow(settings, NoInboundCompressions))
+        .toMat(inboundControlSink)({ case (a, (c, d)) ⇒ (a, c, d) })
+        .run()(controlMaterializer)
+    attachControlMessageObserver(ctrl)
+    // NOTE(review): this implicit looks unused in this method - confirm before removing
+    implicit val ec: ExecutionContext = materializer.executionContext
+    updateStreamMatValues(ControlStreamId, completed)
+
+    (hub, completed)
+  }
+
+  /**
+   * Materializes the inbound stream for ordinary messages.
+   *
+   * With a single inbound lane the hub is connected straight to `inboundSink`. With several lanes
+   * the envelopes pass `DuplicateHandshakeReq` and a `FixedSizePartitionHub`, and one `inboundSink`
+   * is materialized per lane; the returned future then completes when all lanes have completed.
+   *
+   * @return the hub sink that feeds the stream, and the stream's completion future
+   * @throws ArteryTransport.ShuttingDown if the transport is already shut down
+   */
+  private def runInboundOrdinaryMessagesStream(): (Sink[EnvelopeBuffer, NotUsed], Future[Done]) = {
+    if (isShutdown) throw ArteryTransport.ShuttingDown
+
+    val (inboundHub: Sink[EnvelopeBuffer, NotUsed], inboundCompressionAccess, completed) =
+      if (inboundLanes == 1) {
+        MergeHub.source[EnvelopeBuffer].addAttributes(Attributes.logLevels(onFailure = LogLevels.Off))
+          .via(inboundKillSwitch.flow)
+          .viaMat(inboundFlow(settings, _inboundCompressions))(Keep.both)
+          .toMat(inboundSink(envelopeBufferPool))({ case ((a, b), c) ⇒ (a, b, c) })
+          .run()(materializer)
+
+      } else {
+        // TODO perhaps a few more things can be extracted and DRY with AeronUpdTransport.runInboundOrdinaryMessagesStream
+        val laneKillSwitch = KillSwitches.shared("laneKillSwitch")
+        val laneSource: Source[InboundEnvelope, (Sink[EnvelopeBuffer, NotUsed], InboundCompressionAccess)] =
+          MergeHub.source[EnvelopeBuffer].addAttributes(Attributes.logLevels(onFailure = LogLevels.Off))
+            .via(inboundKillSwitch.flow)
+            .via(laneKillSwitch.flow)
+            .viaMat(inboundFlow(settings, _inboundCompressions))(Keep.both)
+            .via(Flow.fromGraph(new DuplicateHandshakeReq(inboundLanes, this, system, envelopeBufferPool)))
+
+        val (inboundHub, compressionAccess, laneHub) =
+          laneSource
+            .toMat(Sink.fromGraph(new FixedSizePartitionHub[InboundEnvelope](inboundLanePartitioner, inboundLanes,
+              settings.Advanced.InboundHubBufferSize)))({ case ((a, b), c) ⇒ (a, b, c) })
+            .run()(materializer)
+
+        val lane = inboundSink(envelopeBufferPool)
+        val completedValues: Vector[Future[Done]] =
+          (0 until inboundLanes).map { _ ⇒
+            laneHub.toMat(lane)(Keep.right).run()(materializer)
+          }(collection.breakOut)
+
+        import system.dispatcher
+
+        // tear down the upstream hub part if downstream lane fails
+        // lanes are not completed with success by themselves so we don't have to care about onSuccess
+        Future.firstCompletedOf(completedValues).failed.foreach { reason ⇒ laneKillSwitch.abort(reason) }
+        val allCompleted = Future.sequence(completedValues).map(_ ⇒ Done)
+
+        (inboundHub, compressionAccess, allCompleted)
+      }
+
+    setInboundCompressionAccess(inboundCompressionAccess)
+
+    updateStreamMatValues(OrdinaryStreamId, completed)
+
+    (inboundHub, completed)
+  }
+
+  /**
+   * Materializes the inbound stream for large messages: a `MergeHub` source routed through the
+   * shared inbound kill switch and `inboundLargeFlow` into an `inboundSink` backed by
+   * `largeEnvelopeBufferPool`.
+   *
+   * @return the hub sink that feeds the stream, and the stream's completion future
+   * @throws ArteryTransport.ShuttingDown if the transport is already shut down
+   */
+  private def runInboundLargeMessagesStream(): (Sink[EnvelopeBuffer, NotUsed], Future[Done]) = {
+    if (isShutdown) throw ArteryTransport.ShuttingDown
+
+    val (hub, completed) =
+      MergeHub.source[EnvelopeBuffer].addAttributes(Attributes.logLevels(onFailure = LogLevels.Off))
+        .via(inboundKillSwitch.flow)
+        .via(inboundLargeFlow(settings))
+        .toMat(inboundSink(largeEnvelopeBufferPool))(Keep.both)
+        .run()(materializer)
+
+    updateStreamMatValues(LargeStreamId, completed)
+
+    (hub, completed)
+  }
+
+  /**
+   * Registers the completion future of the inbound stream identified by `streamId`.
+   * A failed completion is mapped to `Done` before it is registered.
+   *
+   * @param streamId  id of the stream the future belongs to (control, ordinary or large)
+   * @param completed completion future of that stream
+   */
+  private def updateStreamMatValues(streamId: Int, completed: Future[Done]): Unit = {
+    implicit val ec: ExecutionContext = materializer.executionContext
+    // register under the given stream id; it was previously always ControlStreamId, so the
+    // ordinary and large streams overwrote the entry of the control stream
+    updateStreamMatValues(streamId, InboundStreamMatValues(
+      None,
+      completed.recover { case _ ⇒ Done }))
+  }
+
+  /**
+   * Shuts down the shared inbound kill switch, unbinds the server socket and records
+   * `Transport_Stopped` in the flight recorder once the unbind has completed.
+   */
+  override protected def shutdownTransport(): Future[Done] = {
+    implicit val ec: ExecutionContext = materializer.executionContext
+    inboundKillSwitch.shutdown()
+    unbind().map { _ ⇒
+      topLevelFREvents.loFreq(Transport_Stopped, NoMetaData)
+      Done
+    }
+  }
+
+  /**
+   * Unbinds the server binding, if there is one, and records the local `host:port` in the
+   * flight recorder. Completes immediately when nothing is bound.
+   */
+  private def unbind(): Future[Done] = {
+    implicit val ec: ExecutionContext = materializer.executionContext
+    serverBinding match {
+      case Some(binding) ⇒
+        for {
+          b ← binding
+          _ ← b.unbind()
+        } yield {
+          // NOTE(review): this records the TcpInbound_Bound event code after unbinding; if a
+          // dedicated "unbound" event code exists it should probably be used here - confirm
+          // port is an Option, unwrap it so the entry reads host:port instead of host:Some(port)
+          topLevelFREvents.loFreq(TcpInbound_Bound, s"${localAddress.address.host.get}:${localAddress.address.port.get}")
+          Done
+        }
+      case None ⇒
+        Future.successful(Done)
+    }
+  }
+
+}
diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/TcpFraming.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/TcpFraming.scala
new file mode 100644
index 0000000000..3332226976
--- /dev/null
+++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/TcpFraming.scala
@@ -0,0 +1,102 @@
+/**
+ * Copyright (C) 2018 Lightbend Inc.
+ */
+package akka.remote.artery
+package tcp
+
+import java.nio.ByteBuffer
+import java.nio.ByteOrder
+
+import akka.annotation.InternalApi
+import akka.remote.artery.FlightRecorderEvents.TcpInbound_Received
+import akka.stream.Attributes
+import akka.stream.impl.io.ByteStringParser
+import akka.stream.impl.io.ByteStringParser.ByteReader
+import akka.stream.impl.io.ByteStringParser.ParseResult
+import akka.stream.impl.io.ByteStringParser.ParseStep
+import akka.stream.scaladsl.Framing.FramingException
+import akka.stream.stage.GraphStageLogic
+import akka.util.ByteString
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[akka] object TcpFraming {
+  val Undefined = Int.MinValue
+
+  /**
+   * The first 4 bytes of a new connection must be these `0x41 0x4b 0x4b 0x41` (AKKA).
+   * The purpose of the "magic" is to detect and reject weird (accidental) accesses.
+   */
+  val Magic = ByteString('A'.toByte, 'K'.toByte, 'K'.toByte, 'A'.toByte)
+
+  /**
+   * When establishing the connection this header is sent first.
+   * It contains a "magic" and the stream identifier for selecting control, ordinary, large
+   * inbound streams.
+   *
+   * The purpose of the "magic" is to detect and reject weird (accidental) accesses.
+   * The magic 4 bytes are `0x41 0x4b 0x4b 0x41` (AKKA).
+   *
+   * The `streamId` is encoded as 1 byte.
+   */
+  def encodeConnectionHeader(streamId: Int): ByteString =
+    Magic ++ ByteString(streamId.toByte)
+
+  /**
+   * Each frame starts with the frame header that contains the length
+   * of the frame. The `frameLength` is encoded as 4 bytes (little endian).
+   */
+  def encodeFrameHeader(frameLength: Int): ByteString =
+    ByteString(
+      (frameLength & 0xff).toByte,
+      ((frameLength & 0xff00) >> 8).toByte,
+      ((frameLength & 0xff0000) >> 16).toByte,
+      ((frameLength & 0xff000000) >> 24).toByte
+    )
+}
+
+/**
+ * INTERNAL API
+ *
+ * Parses the bytes of one inbound connection into [[EnvelopeBuffer]]s. The expected layout is
+ * the magic bytes, one stream id byte and then any number of frames, each prefixed with its
+ * length as a 4 byte little endian integer. Every emitted buffer is tagged with the stream id
+ * that was read from the connection header.
+ */
+@InternalApi private[akka] class TcpFraming(flightRecorder: EventSink) extends ByteStringParser[EnvelopeBuffer] {
+  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new ParsingLogic {
+    abstract class Step extends ParseStep[EnvelopeBuffer]
+    startWith(ReadMagic)
+
+    // first step: the connection must start with TcpFraming.Magic, otherwise it is rejected
+    case object ReadMagic extends Step {
+      override def parse(reader: ByteReader): ParseResult[EnvelopeBuffer] = {
+        val magic = reader.take(TcpFraming.Magic.size)
+        if (magic == TcpFraming.Magic)
+          ParseResult(None, ReadStreamId)
+        else
+          throw new FramingException("Stream didn't start with expected magic bytes, " +
+            s"got [${(magic ++ reader.remainingData).take(10).map(_ formatted "%02x").mkString(" ")}] " +
+            "Connection is rejected. Probably invalid accidental access.")
+      }
+    }
+    // second step: one byte that selects the stream id used for all frames of this connection
+    case object ReadStreamId extends Step {
+      override def parse(reader: ByteReader): ParseResult[EnvelopeBuffer] =
+        ParseResult(None, ReadFrame(reader.readByte()))
+    }
+    // remaining steps: length prefixed frames, repeated until the connection is closed
+    case class ReadFrame(streamId: Int) extends Step {
+      override def onTruncation(): Unit =
+        failStage(new FramingException("Stream finished but there was a truncated final frame in the buffer"))
+
+      override def parse(reader: ByteReader): ParseResult[EnvelopeBuffer] = {
+        val frameLength = reader.readIntLE()
+        // the length comes from the remote side; a negative value can never be a valid frame
+        if (frameLength < 0)
+          throw new FramingException(s"Invalid frame length [$frameLength], must not be negative")
+        val buffer = createBuffer(reader.take(frameLength))
+        ParseResult(Some(buffer), this)
+      }
+
+      // copies the frame bytes into a little endian ByteBuffer and tags it with the stream id
+      private def createBuffer(bs: ByteString): EnvelopeBuffer = {
+        val buffer = ByteBuffer.wrap(bs.toArray)
+        buffer.order(ByteOrder.LITTLE_ENDIAN)
+        flightRecorder.hiFreq(TcpInbound_Received, buffer.limit)
+        val res = new EnvelopeBuffer(buffer)
+        res.setStreamId(streamId)
+        res
+      }
+    }
+  }
+}
diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala index 21711fe3ef..77acae3781 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala @@ -25,7 +25,15 @@ object RemoteDeathWatchSpec { /watchers.remote = "akka://other@localhost:$otherPort" } } - remote.watch-failure-detector.acceptable-heartbeat-pause = 3s + test.filter-leeway = 10s + remote.watch-failure-detector.acceptable-heartbeat-pause = 2s + + # reduce handshake timeout for quicker test of unknownhost, but + # must still be longer than failure detection + remote.artery.advanced { + handshake-timeout = 10 s + image-liveness-timeout = 9 seconds + } } """).withFallback(ArterySpecSupport.defaultConfig) } @@ -55,6 +63,7 @@ class RemoteDeathWatchSpec extends ArteryMultiNodeSpec(RemoteDeathWatchSpec.conf system.actorOf(Props(new Actor { context.watch(ref) + def receive = { case Terminated(r) ⇒ testActor ! 
r } @@ -78,6 +87,10 @@ class RemoteDeathWatchSpec extends ArteryMultiNodeSpec(RemoteDeathWatchSpec.conf } "receive ActorIdentity(None) when identified node is unknown host" in { + // TODO There is a timing difference between Aeron and TCP. AeronSink will throw exception + // immediately in constructor from aeron.addPublication when UnknownHostException. That will trigger + // this immediately. With TCP it will trigger after handshake timeout. Can we see the UnknownHostException + // reason somehow and fail the stream immediately for that case? val path = RootActorPath(Address("akka", system.name, "unknownhost2", 2552)) / "user" / "subject" system.actorSelection(path) ! Identify(path) expectMsg(60.seconds, ActorIdentity(path, None)) diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala index 730ae550a0..43b82e29a9 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala @@ -22,6 +22,20 @@ class ArteryUpdSendConsistencyWithThreeLanesSpec extends AbstractRemoteSendConsi akka.remote.artery.advanced.inbound-lanes = 3 """).withFallback(ArterySpecSupport.defaultConfig))
+// Runs the tests of AbstractRemoteSendConsistencySpec with `akka.remote.artery.transport = tcp`,
+// one outbound lane and one inbound lane.
+class ArteryTcpSendConsistencyWithOneLaneSpec extends AbstractRemoteSendConsistencySpec(
+  ConfigFactory.parseString("""
+      akka.remote.artery.transport = tcp
+      akka.remote.artery.advanced.outbound-lanes = 1
+      akka.remote.artery.advanced.inbound-lanes = 1
+    """).withFallback(ArterySpecSupport.defaultConfig))
+
+// Runs the tests of AbstractRemoteSendConsistencySpec with `akka.remote.artery.transport = tcp`,
+// three outbound lanes and three inbound lanes.
+class ArteryTcpSendConsistencyWithThreeLanesSpec extends AbstractRemoteSendConsistencySpec(
+  ConfigFactory.parseString("""
+      akka.remote.artery.transport = tcp
+      akka.remote.artery.advanced.outbound-lanes = 3
+      akka.remote.artery.advanced.inbound-lanes = 3
+    """).withFallback(ArterySpecSupport.defaultConfig))
+
 abstract class AbstractRemoteSendConsistencySpec(config: 
Config) extends ArteryMultiNodeSpec(config) with ImplicitSender { val systemB = newRemoteSystem(name = Some("systemB"))
diff --git a/akka-remote/src/test/scala/akka/remote/artery/tcp/TcpFramingSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/tcp/TcpFramingSpec.scala
new file mode 100644
index 0000000000..5b63d7ea32
--- /dev/null
+++ b/akka-remote/src/test/scala/akka/remote/artery/tcp/TcpFramingSpec.scala
@@ -0,0 +1,112 @@
+/**
+ * Copyright (C) 2016-2018 Lightbend Inc.
+ */
+package akka.remote.artery
+package tcp
+
+import scala.util.Random
+
+import akka.stream.ActorMaterializer
+import akka.stream.ActorMaterializerSettings
+import akka.stream.impl.io.ByteStringParser.ParsingException
+import akka.stream.scaladsl.Flow
+import akka.stream.scaladsl.Framing.FramingException
+import akka.stream.scaladsl.Sink
+import akka.stream.scaladsl.Source
+import akka.testkit.AkkaSpec
+import akka.testkit.ImplicitSender
+import akka.util.ByteString
+
+/**
+ * Tests the [[TcpFraming]] stage: connection header handling (magic and stream id),
+ * splitting of the byte stream into frames, and failure on invalid or truncated input.
+ */
+class TcpFramingSpec extends AkkaSpec with ImplicitSender {
+  import TcpFraming.encodeFrameHeader
+
+  private val matSettings = ActorMaterializerSettings(system).withFuzzing(true)
+  private implicit val mat = ActorMaterializer(matSettings)(system)
+
+  private val afr = IgnoreEventSink
+
+  private val framingFlow = Flow[ByteString].via(new TcpFraming(afr))
+
+  // payload used for every frame in these tests: the bytes 1, 2, 3, 4, 5
+  private val payload5 = ByteString((1 to 5).map(_.toByte).toArray)
+
+  // `numberOfFrames` frames back to back, each one a frame header followed by `payload5`
+  private def frameBytes(numberOfFrames: Int): ByteString =
+    (1 to numberOfFrames).foldLeft(ByteString.empty)((acc, _) ⇒ acc ++ encodeFrameHeader(payload5.size) ++ payload5)
+
+  // the seed is reported in the clue of the random chunk test so that a failure can be reproduced
+  private val rndSeed = System.currentTimeMillis()
+  private val rnd = new Random(rndSeed)
+
+  // splits `bytes` into consecutive chunks of random size, at least 1 byte each
+  private def rechunk(bytes: ByteString): Iterator[ByteString] = {
+    var remaining = bytes
+    new Iterator[ByteString] {
+      override def hasNext: Boolean = remaining.nonEmpty
+
+      override def next(): ByteString = {
+        val chunkSize = rnd.nextInt(remaining.size) + 1 // no 0 length chunks
+        val chunk = remaining.take(chunkSize)
+        remaining = remaining.drop(chunkSize)
+        chunk
+      }
+    }
+  }
+
+  "TcpFraming stage" must {
+
+    "grab streamId from connection header" in {
+      // connection header and frame arrive together in one ByteString
+      val bytes = TcpFraming.encodeConnectionHeader(2) ++ frameBytes(1)
+      val frames = Source(List(bytes)).via(framingFlow).runWith(Sink.seq).futureValue
+      frames.head.streamId should ===(2)
+    }
+
+    "grab streamId from connection header in single chunk" in {
+      // the connection header arrives as a chunk of its own, the frame in the next chunk
+      val frames = Source(List(TcpFraming.encodeConnectionHeader(1), frameBytes(1))).via(framingFlow)
+        .runWith(Sink.seq).futureValue
+      frames.head.streamId should ===(1)
+    }
+
+    "reject invalid magic" in {
+      // no connection header at all, the stream starts directly with a frame header
+      val bytes = frameBytes(2)
+      val fail = Source(List(bytes)).via(framingFlow).runWith(Sink.seq)
+        .failed.futureValue
+      fail shouldBe a[ParsingException]
+      fail.getCause shouldBe a[FramingException]
+    }
+
+    "include streamId in each frame" in {
+      val bytes = TcpFraming.encodeConnectionHeader(3) ++ frameBytes(3)
+      val frames = Source(List(bytes)).via(framingFlow).runWith(Sink.seq).futureValue
+      frames(0).streamId should ===(3)
+      frames(1).streamId should ===(3)
+      frames(2).streamId should ===(3)
+    }
+
+    "parse frames from random chunks" in {
+      val numberOfFrames = 100
+      val bytes = TcpFraming.encodeConnectionHeader(3) ++ frameBytes(numberOfFrames)
+      withClue(s"Random chunks seed: $rndSeed") {
+        val frames = Source.fromIterator(() ⇒ rechunk(bytes)).via(framingFlow).runWith(Sink.seq).futureValue
+        frames.size should ===(numberOfFrames)
+        frames.foreach { frame ⇒
+          frame.byteBuffer.limit should ===(payload5.size)
+          val payload = new Array[Byte](frame.byteBuffer.limit)
+          frame.byteBuffer.get(payload)
+          ByteString(payload) should ===(payload5)
+          frame.streamId should ===(3)
+        }
+      }
+    }
+
+    "report truncated frames" in {
+      // dropping the first byte of the first frame header shifts the stream, so the length
+      // prefix is read as 0x01000000, far more than the bytes that follow, and the stream
+      // ends in the middle of that frame
+      val bytes = TcpFraming.encodeConnectionHeader(3) ++ frameBytes(3).drop(1)
+      Source(List(bytes)).via(framingFlow).runWith(Sink.seq)
+        .failed.futureValue shouldBe a[FramingException]
+    }
+
+    "work with empty stream" in {
+      val frames = Source.empty.via(framingFlow).runWith(Sink.seq).futureValue
+      frames.size should ===(0)
+    }
+
+  }
+
+}