From 162a1f80a0afc0bc1652043ddd3e59b32635037b Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 16 Feb 2018 09:26:29 +0100 Subject: [PATCH] Add Artery TCP transport, #24390 * transport config * TCP specific classes in akka.remote.artery.tcp package * TcpFraming stage that handle the additional streamId field and length based framing. Credit to jrudolph for this clean solution, which made it possible to use same envolope header for Aeron and TCP. * magic first bytes to detect invalid access * drain SendQueue to deadLetters in postStop * error handling, restart, inbound and outbound streams * udp vs tcp in autoSelectPort * TCP specific flight recorder events * update reference documentation --- akka-docs/src/main/paradox/remoting-artery.md | 37 +- .../artery/ArteryFailedToBindSpec.scala | 12 +- akka-remote/src/main/resources/reference.conf | 35 +- .../akka/remote/RemoteActorRefProvider.scala | 15 +- .../akka/remote/artery/ArterySettings.scala | 23 ++ .../akka/remote/artery/ArteryTransport.scala | 29 +- .../remote/artery/EnvelopeBufferPool.scala | 8 + .../remote/artery/FlightRecorderEvents.scala | 20 +- .../artery/tcp/ArteryTcpTransport.scala | 391 ++++++++++++++++++ .../akka/remote/artery/tcp/TcpFraming.scala | 102 +++++ .../remote/artery/RemoteDeathWatchSpec.scala | 15 +- .../artery/RemoteSendConsistencySpec.scala | 14 + .../remote/artery/tcp/TcpFramingSpec.scala | 112 +++++ 13 files changed, 790 insertions(+), 23 deletions(-) create mode 100644 akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala create mode 100644 akka-remote/src/main/scala/akka/remote/artery/tcp/TcpFraming.scala create mode 100644 akka-remote/src/test/scala/akka/remote/artery/tcp/TcpFramingSpec.scala diff --git a/akka-docs/src/main/paradox/remoting-artery.md b/akka-docs/src/main/paradox/remoting-artery.md index 4915a5434c..0053df1871 100644 --- a/akka-docs/src/main/paradox/remoting-artery.md +++ b/akka-docs/src/main/paradox/remoting-artery.md @@ -2,8 +2,11 @@ @@@ note -This page describes the @ref:[may change](common/may-change.md) remoting subsystem, codenamed *Artery* that will eventually replace the -old remoting implementation. For the current stable remoting system please refer to @ref:[Remoting](remoting.md). +This page describes the remoting subsystem, codenamed *Artery* that will eventually replace the +@ref:[old remoting implementation](remoting.md). Artery with the Aeron transport is ready +to use in production. The TCP based transport is not ready for use in production yet. The module is +marked @ref:[may change](common/may-change.md) because some configuration will be changed when the API +becomes stable. @@@ @@ -25,7 +28,7 @@ Artery is a reimplementation of the old remoting module aimed at improving perfo source compatible with the old implementation and it is a drop-in replacement in many cases. Main features of Artery compared to the previous implementation: - * Based on [Aeron](https://github.com/real-logic/Aeron) (UDP) instead of TCP + * Based on [Aeron](https://github.com/real-logic/Aeron) (UDP) and Akka Streams TCP/TLS instead of Netty TCP * Focused on high-throughput, low-latency communication * Isolation of internal control messages from user messages improving stability and reducing false failure detection in case of heavy traffic by using a dedicated subchannel. @@ -75,6 +78,7 @@ akka { remote { artery { enabled = on + transport = aeron-udp canonical.hostname = "127.0.0.1" canonical.port = 25520 } @@ -102,7 +106,28 @@ listening for connections and handling messages as not to interfere with other a @@@ The example above only illustrates the bare minimum of properties you have to add to enable remoting. -All settings are described in [Remote Configuration](#remote-configuration-artery). +All settings are described in @ref:[Remote Configuration](#remote-configuration-artery). + +### Selecting transport + +There are three alternatives of which underlying transport to use. It is configured by property +`akka.remote.artery.transport` with the possible values: + +* `aeron-udp` - Based on [Aeron (UDP)](https://github.com/real-logic/aeron) +* `tcp` - Based on @ref:[Akka Streams TCP](stream/stream-io.md#streaming-tcp) +* `tls-tcp` - Same as `tcp` with encryption using @ref:[Akka Streams TLS](stream/stream-io.md#tls) + +The Aeron (UDP) transport is a high performance transport and should be used for systems +that require high throughput and low latency. It is using more CPU than TCP when the system +is idle or at low message rates. There is no encryption for Aeron. + +The TCP and TLS transport is implemented using Akka Streams TCP/TLS. This is the choice +when encryption is needed, but it can also be used with plain TCP without TLS. It's also +the obvious choice when UDP can't be used. +It has very good performance (high throughput and low latency) but not as good as the Aeron transport. +It is using less CPU than Aeron when the system is idle or at low message rates. +This has not been verified yet, but it might scale better for many connections than the Aereon +transport, which can be of importance for large clusters with 100s or even 1000s of nodes. @@@ note @@ -764,8 +789,8 @@ See Aeron documentation about [Performance Testing](https://github.com/real-logi ### Fine-tuning CPU usage latency tradeoff Artery has been designed for low latency and as a result it can be CPU hungry when the system is mostly idle. -This is not always desirable. It is possible to tune the tradeoff between CPU usage and latency with -the following configuration: +This is not always desirable. When using the Aeron transport it is possible to tune the tradeoff between CPU +usage and latency with the following configuration: ``` # Values can be from 1 to 10, where 10 strongly prefers low latency diff --git a/akka-remote-tests/src/test/scala/akka/remote/artery/ArteryFailedToBindSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/artery/ArteryFailedToBindSpec.scala index 00069e3a22..eae1d91d5d 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/artery/ArteryFailedToBindSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/artery/ArteryFailedToBindSpec.scala @@ -1,8 +1,10 @@ package akka.remote.artery import akka.actor.ActorSystem +import akka.remote.RARP import akka.remote.RemoteTransportException import akka.testkit.SocketUtil +import akka.testkit.TestKit import com.typesafe.config.ConfigFactory import org.scalatest.{ Matchers, WordSpec } @@ -32,9 +34,15 @@ class ArteryFailedToBindSpec extends WordSpec with Matchers { val ex = intercept[RemoteTransportException] { ActorSystem("BindTest2", config) } - ex.getMessage should equal("Inbound Aeron channel is in errored state. See Aeron logs for details.") + RARP(as).provider.transport.asInstanceOf[ArteryTransport].settings.Transport match { + case ArterySettings.AeronUpd ⇒ + ex.getMessage should ===("Inbound Aeron channel is in errored state. See Aeron logs for details.") + case ArterySettings.Tcp ⇒ + ex.getMessage should startWith("Failed to bind TCP") + } + } finally { - as.terminate() + TestKit.shutdownActorSystem(as) } } } diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index ac7f383875..dabf1562ca 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -730,6 +730,22 @@ akka { # Enable the new remoting with this flag enabled = off + # Select the underlying transport implementation. + # + # Possible values: aeron-udp, tcp + # + # The Aeron (UDP) transport is a high performance transport and should be used for systems + # that require high throughput and low latency. It is using more CPU than TCP when the + # system is idle or at low message rates. There is no encryption for Aeron. + # https://github.com/real-logic/aeron + # + # The TCP and TLS transport is implemented using Akka Streams TCP/TLS. This is the choice + # when encryption is needed, but it can also be used with plain TCP without TLS. It's also + # the obvious choice when UDP can't be used. + # It has very good performance (high throughput and low latency) but not as good as the Aeron transport. + # It is using less CPU than Aeron when the system is idle or at low message rates. + transport = aeron-udp + # Canonical address is the address other clients should connect to. # Artery transport will expect messages to this address. canonical { @@ -775,6 +791,7 @@ akka { } # Periodically log out all Aeron counters. See https://github.com/real-logic/aeron/wiki/Monitoring-and-Debugging#counters + # Only used when transport is aeron-udp. log-aeron-counters = false # Actor paths to use the large message stream for when a message @@ -859,15 +876,18 @@ akka { # Controls whether to start the Aeron media driver in the same JVM or use external # process. Set to 'off' when using external media driver, and then also set the # 'aeron-dir'. + # Only used when transport is aeron-udp. embedded-media-driver = on # Directory used by the Aeron media driver. It's mandatory to define the 'aeron-dir' # if using external media driver, i.e. when 'embedded-media-driver = off'. # Embedded media driver will use a this directory, or a temporary directory if this # property is not defined (empty). + # Only used when transport is aeron-udp. aeron-dir = "" # Whether to delete aeron embeded driver directory upon driver stop. + # Only used when transport is aeron-udp. delete-aeron-dir = yes # Level of CPU time used, on a scale between 1 and 10, during backoff/idle. @@ -876,6 +896,7 @@ akka { # backoff backpressure. # Level 1 strongly prefer low CPU consumption over low latency. # Level 10 strongly prefer low latency over low CPU consumption. + # Only used when transport is aeron-udp. idle-cpu-level = 5 # Total number of inbound lanes, shared among all inbound associations. A value @@ -926,9 +947,13 @@ akka { # unacknowledged system messages are re-delivered with this interval system-message-resend-interval = 1 second + # Timeout of establishing outbound connections. + # Only used when transport is tcp + connection-timeout = 5 seconds + # The timeout for outbound associations to perform the handshake. # This timeout must be greater than the 'image-liveness-timeout'. - handshake-timeout = 20 s + handshake-timeout = 20 seconds # incomplete handshake attempt is retried with this interval handshake-retry-interval = 1 second @@ -939,6 +964,7 @@ akka { inject-handshake-interval = 1 second # messages that are not accepted by Aeron are dropped after retrying for this period + # Only used when transport is aeron-udp. give-up-message-after = 60 seconds # System messages that are not acknowledged after re-sending for this period are @@ -958,6 +984,10 @@ akka { # If more restarts occurs the ActorSystem will be terminated. inbound-max-restarts = 5 + # Retry outbound connection after this backoff. + # Only used when transport is tcp + outbound-restart-backoff = 1 second + # See 'outbound-max-restarts' outbound-restart-timeout = 5 seconds @@ -971,15 +1001,18 @@ akka { # Timeout after which aeron driver has not had keepalive messages # from a client before it considers the client dead. + # Only used when transport is aeron-udp. client-liveness-timeout = 20 seconds # Timeout for each the INACTIVE and LINGER stages an aeron image # will be retained for when it is no longer referenced. # This timeout must be less than the 'handshake-timeout'. + # Only used when transport is aeron-udp. image-liveness-timeout = 10 seconds # Timeout after which the aeron driver is considered dead # if it does not update its C'n'C timestamp. + # Only used when transport is aeron-udp. driver-timeout = 20 seconds flight-recorder { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 5fdd2d5a0b..bbf7bea5a7 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -9,11 +9,12 @@ import akka.actor._ import akka.dispatch.sysmsg._ import akka.event.{ EventStream, Logging, LoggingAdapter } import akka.event.Logging.Error -import akka.serialization.{ Serialization, SerializationExtension } import akka.pattern.pipe + import scala.util.control.NonFatal import akka.actor.SystemGuardian.{ RegisterTerminationHook, TerminationHook, TerminationHookDone } + import scala.util.control.Exception.Catcher import scala.concurrent.Future @@ -21,12 +22,15 @@ import akka.ConfigurationException import akka.annotation.InternalApi import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } import akka.remote.artery.ArteryTransport +import akka.remote.artery.aeron.ArteryAeronUdpTransport +import akka.remote.artery.ArterySettings +import akka.remote.artery.ArterySettings.AeronUpd import akka.util.OptionVal import akka.remote.artery.OutboundEnvelope import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope -import akka.remote.artery.aeron.ArteryAeronUdpTransport import akka.remote.serialization.ActorRefResolveCache import akka.remote.serialization.ActorRefResolveThreadLocalCache +import akka.remote.artery.tcp.ArteryTcpTransport /** * INTERNAL API @@ -204,7 +208,12 @@ private[akka] class RemoteActorRefProvider( local.registerExtraNames(Map(("remote", d))) d }, - transport = if (remoteSettings.Artery.Enabled) new ArteryAeronUdpTransport(system, this) else new Remoting(system, this)) + transport = + if (remoteSettings.Artery.Enabled) remoteSettings.Artery.Transport match { + case ArterySettings.AeronUpd ⇒ new ArteryAeronUdpTransport(system, this) + case ArterySettings.Tcp ⇒ new ArteryTcpTransport(system, this) + } + else new Remoting(system, this)) _internals = internals remotingTerminator ! internals diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala index 2abdde9435..7e11ffdcaf 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala @@ -71,6 +71,13 @@ private[akka] final class ArterySettings private (config: Config) { val LogSend: Boolean = getBoolean("log-sent-messages") val LogAeronCounters: Boolean = config.getBoolean("log-aeron-counters") + val Transport: Transport = toRootLowerCase(getString("transport")) match { + case AeronUpd.configName ⇒ AeronUpd + case Tcp.configName ⇒ Tcp + case other ⇒ throw new IllegalArgumentException(s"Unknown transport [$other], possible values: " + + s""""${AeronUpd.configName}", "${Tcp.configName}"""") + } + /** * Used version of the header format for outbound messages. * To support rolling upgrades this may be a lower version than `ArteryTransport.HighestVersion`, @@ -126,6 +133,8 @@ private[akka] final class ArterySettings private (config: Config) { val InjectHandshakeInterval: FiniteDuration = config.getMillisDuration("inject-handshake-interval").requiring(interval ⇒ interval > Duration.Zero, "inject-handshake-interval must be more than zero") + val ConnectionTimeout: FiniteDuration = config.getMillisDuration("connection-timeout").requiring(interval ⇒ + interval > Duration.Zero, "connection-timeout must be more than zero") val GiveUpMessageAfter: FiniteDuration = config.getMillisDuration("give-up-message-after").requiring(interval ⇒ interval > Duration.Zero, "give-up-message-after must be more than zero") val GiveUpSystemMessageAfter: FiniteDuration = @@ -138,6 +147,9 @@ private[akka] final class ArterySettings private (config: Config) { config.getMillisDuration("inbound-restart-timeout").requiring(interval ⇒ interval > Duration.Zero, "inbound-restart-timeout must be more than zero") val InboundMaxRestarts: Int = getInt("inbound-max-restarts") + val OutboundRestartBackoff: FiniteDuration = + config.getMillisDuration("outbound-restart-backoff").requiring(interval ⇒ + interval > Duration.Zero, "outbound-restart-backoff must be more than zero") val OutboundRestartTimeout: FiniteDuration = config.getMillisDuration("outbound-restart-timeout").requiring(interval ⇒ interval > Duration.Zero, "outbound-restart-timeout must be more than zero") @@ -205,4 +217,15 @@ private[akka] object ArterySettings { case other ⇒ other } + sealed trait Transport { + val configName: String + } + object AeronUpd extends Transport { + override val configName: String = "aeron-udp" + override def toString: String = configName + } + object Tcp extends Transport { + override val configName: String = "tcp" + override def toString: String = configName + } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 5414a3f4c2..78f8dade93 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -398,15 +398,16 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr startTransport() topLevelFREvents.loFreq(Transport_Started, NoMetaData) + val udp = settings.Transport == ArterySettings.AeronUpd val port = if (settings.Canonical.Port == 0) { if (settings.Bind.Port != 0) settings.Bind.Port // if bind port is set, use bind port instead of random - else ArteryTransport.autoSelectPort(settings.Canonical.Hostname) + else ArteryTransport.autoSelectPort(settings.Canonical.Hostname, udp) } else settings.Canonical.Port val bindPort = if (settings.Bind.Port == 0) { if (settings.Canonical.Port == 0) port // canonical and bind ports are zero. Use random port for both - else ArteryTransport.autoSelectPort(settings.Bind.Hostname) + else ArteryTransport.autoSelectPort(settings.Bind.Hostname, udp) } else settings.Bind.Port _localAddress = UniqueAddress( @@ -432,7 +433,9 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr runInboundStreams() topLevelFREvents.loFreq(Transport_StartupFinished, NoMetaData) - log.info("Remoting started; listening on address: [{}] with UID [{}]", localAddress.address, localAddress.uid) + log.info( + "Remoting started with transport [Artery {}]; listening on address [{}] with UID [{}]", + settings.Transport, localAddress.address, localAddress.uid) } protected def startTransport(): Unit @@ -870,12 +873,20 @@ private[remote] object ArteryTransport { aeronSourceLifecycle: Option[AeronSource.ResourceLifecycle], completed: Future[Done]) - def autoSelectPort(hostname: String): Int = { - val socket = DatagramChannel.open().socket() - socket.bind(new InetSocketAddress(hostname, 0)) - val port = socket.getLocalPort - socket.close() - port + def autoSelectPort(hostname: String, udp: Boolean): Int = { + if (udp) { + val socket = DatagramChannel.open().socket() + socket.bind(new InetSocketAddress(hostname, 0)) + val port = socket.getLocalPort + socket.close() + port + } else { + val socket = ServerSocketChannel.open().socket() + socket.bind(new InetSocketAddress(hostname, 0)) + val port = socket.getLocalPort + socket.close() + port + } } val ControlStreamId = 1 diff --git a/akka-remote/src/main/scala/akka/remote/artery/EnvelopeBufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/EnvelopeBufferPool.scala index db766a225d..5df4ae49f8 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/EnvelopeBufferPool.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/EnvelopeBufferPool.scala @@ -383,6 +383,14 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { private var literalChars = new Array[Char](64) private var literalBytes = new Array[Byte](64) + // The streamId is only used for TCP transport. It is not part of the ordinary envelope header, but included in the + // frame header that is parsed by the TcpFraming stage. + private var _streamId: Int = -1 + def streamId: Int = + if (_streamId != -1) _streamId + else throw new IllegalStateException("StreamId was not set") + def setStreamId(newStreamId: Int): Unit = _streamId = newStreamId + def writeHeader(h: HeaderBuilder): Unit = writeHeader(h, null) def writeHeader(h: HeaderBuilder, oe: OutboundEnvelope): Unit = { diff --git a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala index 2475d64ba6..a32953a432 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala @@ -51,6 +51,14 @@ private[remote] object FlightRecorderEvents { val Compression_Inbound_RunActorRefAdvertisement = 94 val Compression_Inbound_RunClassManifestAdvertisement = 95 + val TcpOutbound_Connected = 150 + val TcpOutbound_Sent = 151 + + val TcpInbound_Bound = 170 + val TcpInbound_Unbound = 171 + val TcpInbound_Connected = 172 + val TcpInbound_Received = 173 + // Used for presentation of the entries in the flight recorder lazy val eventDictionary = Map( Transport_MediaDriverStarted → "Transport: Media driver started", @@ -92,7 +100,17 @@ private[remote] object FlightRecorderEvents { Compression_CompressedManifest → "Compression: Compressed manifest", Compression_AllocatedManifestCompressionId → "Compression: Allocated manifest compression id", Compression_Inbound_RunActorRefAdvertisement → "InboundCompression: Run class manifest compression advertisement", - Compression_Inbound_RunClassManifestAdvertisement → "InboundCompression: Run class manifest compression advertisement" + Compression_Inbound_RunClassManifestAdvertisement → "InboundCompression: Run class manifest compression advertisement", + + // TCP outbound events + TcpOutbound_Connected -> "TCP out: Connected", + TcpOutbound_Sent -> "TCP out: Sent message", + + // TCP inbound events + TcpInbound_Bound -> "TCP in: Bound", + TcpInbound_Unbound -> "TCP in: Unbound", + TcpInbound_Connected -> "TCP in: New connection", + TcpInbound_Received -> "TCP in: Received message" ).map { case (int, str) ⇒ int.toLong → str } diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala new file mode 100644 index 0000000000..985ab45e8f --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala @@ -0,0 +1,391 @@ +/** + * Copyright (C) 2017 Lightbend Inc. + */ +package akka.remote.artery +package tcp + +import java.net.InetSocketAddress + +import scala.concurrent.Await +import scala.concurrent.ExecutionContext +import scala.concurrent.ExecutionContextExecutor +import scala.concurrent.Future +import scala.concurrent.Promise +import scala.concurrent.duration._ +import scala.util.Failure +import scala.util.Success +import scala.util.Try +import akka.ConfigurationException +import akka.Done +import akka.NotUsed +import akka.actor.ActorSystem +import akka.actor.ExtendedActorSystem +import akka.dispatch.ExecutionContexts +import akka.event.Logging +import akka.remote.RemoteActorRefProvider +import akka.remote.RemoteTransportException +import akka.remote.artery.Decoder.InboundCompressionAccess +import akka.remote.artery.compress._ +import akka.stream.Attributes +import akka.stream.Attributes.LogLevels +import akka.stream.FlowShape +import akka.stream.Graph +import akka.stream.KillSwitches +import akka.stream.Materializer +import akka.stream.SharedKillSwitch +import akka.stream.SinkShape +import akka.stream.scaladsl.Broadcast +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.GraphDSL +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.MergeHub +import akka.stream.scaladsl.Partition +import akka.stream.scaladsl.RestartFlow +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.stream.scaladsl.Tcp +import akka.stream.scaladsl.Tcp.IncomingConnection +import akka.stream.scaladsl.Tcp.ServerBinding +import akka.util.ByteString +import akka.util.OptionVal + +/** + * INTERNAL API + */ +private[remote] object ArteryTcpTransport { + + private val successUnit = Success(()) + + def optionToTry(opt: Option[Throwable]): Try[Unit] = opt match { + case None ⇒ successUnit + case Some(t) ⇒ Failure(t) + } +} + +/** + * INTERNAL API + */ +private[remote] class ArteryTcpTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) + extends ArteryTransport(_system, _provider) { + import ArteryTransport._ + import ArteryTcpTransport._ + import FlightRecorderEvents._ + + // may change when inbound streams are restarted + @volatile private var inboundKillSwitch: SharedKillSwitch = KillSwitches.shared("inboundKillSwitch") + // may change when inbound streams are restarted + @volatile private var inboundStream: OptionVal[Sink[EnvelopeBuffer, NotUsed]] = OptionVal.None + @volatile private var serverBinding: Option[Future[ServerBinding]] = None + + override protected def startTransport(): Unit = { + // nothing specific here + } + + override protected def outboundTransportSink( + outboundContext: OutboundContext, + streamId: Int, + bufferPool: EnvelopeBufferPool): Sink[EnvelopeBuffer, Future[Done]] = { + implicit val sys: ActorSystem = system + + val afr = createFlightRecorderEventSink() + + val host = outboundContext.remoteAddress.host.get + val port = outboundContext.remoteAddress.port.get + val remoteAddress = InetSocketAddress.createUnresolved(host, port) + + def connectionFlow: Flow[ByteString, ByteString, Future[Tcp.OutgoingConnection]] = + Tcp() + .outgoingConnection( + remoteAddress, + halfClose = true, // issue https://github.com/akka/akka/issues/24392 if set to false + connectTimeout = settings.Advanced.ConnectionTimeout) + + def connectionFlowWithRestart: Flow[ByteString, ByteString, NotUsed] = { + val flowFactory = () ⇒ { + + afr.loFreq( + TcpOutbound_Connected, + s"${outboundContext.remoteAddress.host.get}:${outboundContext.remoteAddress.port.get} " + + s"/ ${streamName(streamId)}") + Flow[ByteString] + .prepend(Source.single(TcpFraming.encodeConnectionHeader(streamId))) + .via(connectionFlow) + .mapMaterializedValue(_ ⇒ NotUsed) + .recoverWithRetries(1, { case ArteryTransport.ShutdownSignal ⇒ Source.empty }) + .log(name = s"outbound connection to [${outboundContext.remoteAddress}], ${streamName(streamId)} stream") + .addAttributes(Attributes.logLevels(onElement = LogLevels.Off, onFailure = Logging.WarningLevel)) + } + + if (streamId == ControlStreamId) { + // restart of inner connection part important in control flow, since system messages + // are buffered and resent from the outer SystemMessageDelivery stage. + RestartFlow.withBackoff[ByteString, ByteString]( + settings.Advanced.OutboundRestartBackoff, + settings.Advanced.GiveUpSystemMessageAfter, 0.1)(flowFactory) + } else { + // Best effort retry a few times + // FIXME only restart on failures?, but missing in RestartFlow, see https://github.com/akka/akka/pull/23911 + RestartFlow.withBackoff[ByteString, ByteString]( + settings.Advanced.OutboundRestartBackoff, + settings.Advanced.OutboundRestartBackoff * 5, 0.1, maxRestarts = 3)(flowFactory) + } + + } + + Flow[EnvelopeBuffer] + .map { env ⇒ + // TODO Possible performance improvement, could we reduce the copying of bytes? + afr.hiFreq(TcpOutbound_Sent, env.byteBuffer.limit) + val size = env.byteBuffer.limit + + val bytes = ByteString(env.byteBuffer) + bufferPool.release(env) + + TcpFraming.encodeFrameHeader(size) ++ bytes + } + .via(connectionFlowWithRestart) + .map(_ ⇒ throw new IllegalStateException(s"Unexpected incoming bytes in outbound connection to [${outboundContext.remoteAddress}]")) + .toMat(Sink.ignore)(Keep.right) + } + + override protected def runInboundStreams(): Unit = { + + // Design note: The design of how to run the inbound streams are influenced by the original design + // for the Aeron streams, and there we can only have one single inbound since everything comes in + // via the single AeronSource. + // + // For TCP we could materialize the inbound streams for each inbound connection, i.e. running many + // completely separate inbound streams. Each would still have to include all 3 control, ordinary, + // large parts for each connection even though only one is used by a specific connection. Unless + // we can dynamically choose what to materialize based on the `streamId` (as a first byte, or + // in first frame of the connection), which is complicated. + // + // However, this would be make the design for Aeron and TCP more different and more things might + // have to be changed, such as compression advertisements and materialized values. Number of + // inbound streams would be dynamic, and so on. + + implicit val mat: Materializer = materializer + implicit val sys: ActorSystem = system + + // These streams are always running, only one instance of each inbound stream, and then the inbound connections + // are attached to these via a MergeHub. + val (controlStream, controlStreamCompleted) = runInboundControlStream() + val (ordinaryMessagesStream, ordinaryMessagesStreamCompleted) = runInboundOrdinaryMessagesStream() + val (largeMessagesStream, largeMessagesStreamCompleted) = { + if (largeMessageChannelEnabled) + runInboundLargeMessagesStream() + else + ( + Flow[EnvelopeBuffer] + .map(_ ⇒ log.warning("Dropping large message, missing large-message-destinations configuration.")) + .to(Sink.ignore), + Promise[Done]().future) // never completed, not enabled + } + + // An inbound connection will only use one of the control, ordinary or large streams, but we have to + // attach it to all and select via Partition and the streamId in the frame header. Conceptually it + // would have been better to send the streamId as one single first byte for a new connection and + // decide where to attach it based on that byte. Then the streamId wouldn't have to be sent in each + // frame. That was not chosen because it is more complicated to implement and might have more runtime + // overhead. + inboundStream = + OptionVal.Some(Sink.fromGraph(GraphDSL.create() { implicit b ⇒ + import GraphDSL.Implicits._ + val partition = b.add(Partition[EnvelopeBuffer](3, env ⇒ { + env.streamId match { + case OrdinaryStreamId ⇒ 1 + case ControlStreamId ⇒ 0 + case LargeStreamId ⇒ 2 + case other ⇒ throw new IllegalArgumentException(s"Unexpected streamId [$other]") + } + })) + partition.out(0) ~> controlStream + partition.out(1) ~> ordinaryMessagesStream + partition.out(2) ~> largeMessagesStream + SinkShape(partition.in) + })) + + // If something in the inboundConnectionFlow fails, e.g. framing, the connection will be teared down, + // but other parts of the inbound streams don't have to restarted. + def inboundConnectionFlow(inboundConnection: IncomingConnection): Flow[ByteString, ByteString, NotUsed] = { + // must create new Flow for each connection because of the FlightRecorder that can't be shared + val afr = createFlightRecorderEventSink() + Flow[ByteString] + .via(inboundKillSwitch.flow) + .via(new TcpFraming(afr)) + .alsoTo(inboundStream.get) + .filter(_ ⇒ false) // don't send back anything in this TCP socket + .map(_ ⇒ ByteString.empty) // make it a Flow[ByteString] again + } + + val host = localAddress.address.host.get + val port = localAddress.address.port.get + + val connectionSource: Source[Tcp.IncomingConnection, Future[ServerBinding]] = + Tcp().bind( + interface = host, + port = port, + halfClose = false) + + serverBinding = serverBinding match { + case None ⇒ + val afr = createFlightRecorderEventSink() + val binding = connectionSource + .to(Sink.foreach { connection ⇒ + afr.loFreq( + TcpInbound_Connected, + s"${connection.remoteAddress.getHostString}:${connection.remoteAddress.getPort}") + connection.handleWith(inboundConnectionFlow(connection)) + }) + .run() + .recoverWith { + case e ⇒ Future.failed(new RemoteTransportException( + s"Failed to bind TCP to [${localAddress.address.host.get}:${localAddress.address.port.get}] due to: " + + e.getMessage, e)) + }(ExecutionContexts.sameThreadExecutionContext) + + // only on initial startup, when ActorSystem is starting + Await.result(binding, settings.Bind.BindTimeout) + afr.loFreq(TcpInbound_Bound, s"$host:$port") + Some(binding) + case s @ Some(_) ⇒ + // already bound, when restarting + s + } + + // Failures in any of the inbound streams should be extremely rare, probably an unforeseen accident. + // Tear down everything and start over again. Inbound streams are "stateless" so that should be fine. + // Tested in SurviveInboundStreamRestartWithCompressionInFlightSpec + implicit val ec: ExecutionContext = materializer.executionContext + val completed = Future.firstCompletedOf( + List(controlStreamCompleted, ordinaryMessagesStreamCompleted, largeMessagesStreamCompleted)) + val restart = () ⇒ { + inboundKillSwitch.shutdown() + inboundKillSwitch = KillSwitches.shared("inboundKillSwitch") + + val allStopped: Future[Done] = for { + _ ← controlStreamCompleted.recover { case _ ⇒ Done } + _ ← ordinaryMessagesStreamCompleted.recover { case _ ⇒ Done } + _ ← if (largeMessageChannelEnabled) + largeMessagesStreamCompleted.recover { case _ ⇒ Done } else Future.successful(Done) + } yield Done + allStopped.foreach(_ ⇒ runInboundStreams()) + } + + attachInboundStreamRestart("Inbound streams", completed, restart) + } + + private def runInboundControlStream(): (Sink[EnvelopeBuffer, NotUsed], Future[Done]) = { + if (isShutdown) throw ArteryTransport.ShuttingDown + + val (hub, ctrl, completed) = + MergeHub.source[EnvelopeBuffer].addAttributes(Attributes.logLevels(onFailure = LogLevels.Off)) + .via(inboundKillSwitch.flow) + .via(inboundFlow(settings, NoInboundCompressions)) + .toMat(inboundControlSink)({ case (a, (c, d)) ⇒ (a, c, d) }) + .run()(controlMaterializer) + attachControlMessageObserver(ctrl) + implicit val ec: ExecutionContext = materializer.executionContext + updateStreamMatValues(ControlStreamId, completed) + + (hub, completed) + } + + private def runInboundOrdinaryMessagesStream(): (Sink[EnvelopeBuffer, NotUsed], Future[Done]) = { + if (isShutdown) throw ArteryTransport.ShuttingDown + + val (inboundHub: Sink[EnvelopeBuffer, NotUsed], inboundCompressionAccess, completed) = + if (inboundLanes == 1) { + MergeHub.source[EnvelopeBuffer].addAttributes(Attributes.logLevels(onFailure = LogLevels.Off)) + .via(inboundKillSwitch.flow) + .viaMat(inboundFlow(settings, _inboundCompressions))(Keep.both) + .toMat(inboundSink(envelopeBufferPool))({ case ((a, b), c) ⇒ (a, b, c) }) + .run()(materializer) + + } else { + // TODO perhaps a few more things can be extracted and DRY with AeronUpdTransport.runInboundOrdinaryMessagesStream + val laneKillSwitch = KillSwitches.shared("laneKillSwitch") + val laneSource: Source[InboundEnvelope, (Sink[EnvelopeBuffer, NotUsed], InboundCompressionAccess)] = + MergeHub.source[EnvelopeBuffer].addAttributes(Attributes.logLevels(onFailure = LogLevels.Off)) + .via(inboundKillSwitch.flow) + .via(laneKillSwitch.flow) + .viaMat(inboundFlow(settings, _inboundCompressions))(Keep.both) + .via(Flow.fromGraph(new DuplicateHandshakeReq(inboundLanes, this, system, envelopeBufferPool))) + + val (inboundHub, compressionAccess, laneHub) = + laneSource + .toMat(Sink.fromGraph(new FixedSizePartitionHub[InboundEnvelope](inboundLanePartitioner, inboundLanes, + settings.Advanced.InboundHubBufferSize)))({ case ((a, b), c) ⇒ (a, b, c) }) + .run()(materializer) + + val lane = inboundSink(envelopeBufferPool) + val completedValues: Vector[Future[Done]] = + (0 until inboundLanes).map { _ ⇒ + laneHub.toMat(lane)(Keep.right).run()(materializer) + }(collection.breakOut) + + import system.dispatcher + + // tear down the upstream hub part if downstream lane fails + // lanes are not completed with success by themselves so we don't have to care about onSuccess + Future.firstCompletedOf(completedValues).failed.foreach { reason ⇒ laneKillSwitch.abort(reason) } + val allCompleted = Future.sequence(completedValues).map(_ ⇒ Done) + + (inboundHub, compressionAccess, allCompleted) + } + + setInboundCompressionAccess(inboundCompressionAccess) + + updateStreamMatValues(OrdinaryStreamId, completed) + + (inboundHub, completed) + } + + private def runInboundLargeMessagesStream(): (Sink[EnvelopeBuffer, NotUsed], Future[Done]) = { + if (isShutdown) throw ArteryTransport.ShuttingDown + + val (hub, completed) = + MergeHub.source[EnvelopeBuffer].addAttributes(Attributes.logLevels(onFailure = LogLevels.Off)) + .via(inboundKillSwitch.flow) + .via(inboundLargeFlow(settings)) + .toMat(inboundSink(largeEnvelopeBufferPool))(Keep.both) + .run()(materializer) + + updateStreamMatValues(LargeStreamId, completed) + + (hub, completed) + } + + private def updateStreamMatValues(streamId: Int, completed: Future[Done]): Unit = { + implicit val ec: ExecutionContext = materializer.executionContext + updateStreamMatValues(ControlStreamId, InboundStreamMatValues( + None, + completed.recover { case _ ⇒ Done })) + } + + override protected def shutdownTransport(): Future[Done] = { + implicit val ec: ExecutionContext = materializer.executionContext + inboundKillSwitch.shutdown() + unbind().map { _ ⇒ + topLevelFREvents.loFreq(Transport_Stopped, NoMetaData) + Done + } + } + + private def unbind(): Future[Done] = { + implicit val ec: ExecutionContext = materializer.executionContext + serverBinding match { + case Some(binding) ⇒ + for { + b ← binding + _ ← b.unbind() + } yield { + topLevelFREvents.loFreq(TcpInbound_Bound, s"${localAddress.address.host.get}:${localAddress.address.port}") + Done + } + case None ⇒ + Future.successful(Done) + } + } + +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/TcpFraming.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/TcpFraming.scala new file mode 100644 index 0000000000..3332226976 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/TcpFraming.scala @@ -0,0 +1,102 @@ +/** + * Copyright (C) 2018 Lightbend Inc. + */ +package akka.remote.artery +package tcp + +import java.nio.ByteBuffer +import java.nio.ByteOrder + +import akka.annotation.InternalApi +import akka.remote.artery.FlightRecorderEvents.TcpInbound_Received +import akka.stream.Attributes +import akka.stream.impl.io.ByteStringParser +import akka.stream.impl.io.ByteStringParser.ByteReader +import akka.stream.impl.io.ByteStringParser.ParseResult +import akka.stream.impl.io.ByteStringParser.ParseStep +import akka.stream.scaladsl.Framing.FramingException +import akka.stream.stage.GraphStageLogic +import akka.util.ByteString + +/** + * INTERNAL API + */ +@InternalApi private[akka] object TcpFraming { + val Undefined = Int.MinValue + + /** + * The first 4 bytes of a new connection must be these `0x64 0x75 0x75 0x64` (AKKA). + * The purpose of the "magic" is to detect and reject weird (accidental) accesses. + */ + val Magic = ByteString('A'.toByte, 'K'.toByte, 'K'.toByte, 'A'.toByte) + + /** + * When establishing the connection this header is sent first. + * It contains a "magic" and the stream identifier for selecting control, ordinary, large + * inbound streams. + * + * The purpose of the "magic" is to detect and reject weird (accidental) accesses. + * The magic 4 bytes are `0x64 0x75 0x75 0x64` (AKKA). + * + * The streamId` is encoded as 1 byte. + */ + def encodeConnectionHeader(streamId: Int): ByteString = + Magic ++ ByteString(streamId.toByte) + + /** + * Each frame starts with the frame header that contains the length + * of the frame. The `frameLength` is encoded as 4 bytes (little endian). + */ + def encodeFrameHeader(frameLength: Int): ByteString = + ByteString( + (frameLength & 0xff).toByte, + ((frameLength & 0xff00) >> 8).toByte, + ((frameLength & 0xff0000) >> 16).toByte, + ((frameLength & 0xff000000) >> 24).toByte + ) +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] class TcpFraming(flightRecorder: EventSink) extends ByteStringParser[EnvelopeBuffer] { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new ParsingLogic { + abstract class Step extends ParseStep[EnvelopeBuffer] + startWith(ReadMagic) + + case object ReadMagic extends Step { + override def parse(reader: ByteReader): ParseResult[EnvelopeBuffer] = { + val magic = reader.take(TcpFraming.Magic.size) + if (magic == TcpFraming.Magic) + ParseResult(None, ReadStreamId) + else + throw new FramingException("Stream didn't start with expected magic bytes, " + + s"got [${(magic ++ reader.remainingData).take(10).map(_ formatted "%02x").mkString(" ")}] " + + "Connection is rejected. Probably invalid accidental access.") + } + } + case object ReadStreamId extends Step { + override def parse(reader: ByteReader): ParseResult[EnvelopeBuffer] = + ParseResult(None, ReadFrame(reader.readByte())) + } + case class ReadFrame(streamId: Int) extends Step { + override def onTruncation(): Unit = + failStage(new FramingException("Stream finished but there was a truncated final frame in the buffer")) + + override def parse(reader: ByteReader): ParseResult[EnvelopeBuffer] = { + val frameLength = reader.readIntLE() + val buffer = createBuffer(reader.take(frameLength)) + ParseResult(Some(buffer), this) + } + + private def createBuffer(bs: ByteString): EnvelopeBuffer = { + val buffer = ByteBuffer.wrap(bs.toArray) + buffer.order(ByteOrder.LITTLE_ENDIAN) + flightRecorder.hiFreq(TcpInbound_Received, buffer.limit) + val res = new EnvelopeBuffer(buffer) + res.setStreamId(streamId) + res + } + } + } +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala index 21711fe3ef..77acae3781 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeathWatchSpec.scala @@ -25,7 +25,15 @@ object RemoteDeathWatchSpec { /watchers.remote = "akka://other@localhost:$otherPort" } } - remote.watch-failure-detector.acceptable-heartbeat-pause = 3s + test.filter-leeway = 10s + remote.watch-failure-detector.acceptable-heartbeat-pause = 2s + + # reduce handshake timeout for quicker test of unknownhost, but + # must still be longer than failure detection + remote.artery.advanced { + handshake-timeout = 10 s + image-liveness-timeout = 9 seconds + } } """).withFallback(ArterySpecSupport.defaultConfig) } @@ -55,6 +63,7 @@ class RemoteDeathWatchSpec extends ArteryMultiNodeSpec(RemoteDeathWatchSpec.conf system.actorOf(Props(new Actor { context.watch(ref) + def receive = { case Terminated(r) ⇒ testActor ! r } @@ -78,6 +87,10 @@ class RemoteDeathWatchSpec extends ArteryMultiNodeSpec(RemoteDeathWatchSpec.conf } "receive ActorIdentity(None) when identified node is unknown host" in { + // TODO There is a timing difference between Aeron and TCP. AeronSink will throw exception + // immediately in constructor from aeron.addPublication when UnknownHostException. That will trigger + // this immediately. With TCP it will trigger after handshake timeout. Can we see the UnknownHostException + // reason somehow and fail the stream immediately for that case? val path = RootActorPath(Address("akka", system.name, "unknownhost2", 2552)) / "user" / "subject" system.actorSelection(path) ! Identify(path) expectMsg(60.seconds, ActorIdentity(path, None)) diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala index 730ae550a0..43b82e29a9 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala @@ -22,6 +22,20 @@ class ArteryUpdSendConsistencyWithThreeLanesSpec extends AbstractRemoteSendConsi akka.remote.artery.advanced.inbound-lanes = 3 """).withFallback(ArterySpecSupport.defaultConfig)) +class ArteryTcpSendConsistencyWithOneLaneSpec extends AbstractRemoteSendConsistencySpec( + ConfigFactory.parseString(""" + akka.remote.artery.transport = tcp + akka.remote.artery.advanced.outbound-lanes = 1 + akka.remote.artery.advanced.inbound-lanes = 1 + """).withFallback(ArterySpecSupport.defaultConfig)) + +class ArteryTcpSendConsistencyWithThreeLanesSpec extends AbstractRemoteSendConsistencySpec( + ConfigFactory.parseString(""" + akka.remote.artery.transport = tcp + akka.remote.artery.advanced.outbound-lanes = 3 + akka.remote.artery.advanced.inbound-lanes = 3 + """).withFallback(ArterySpecSupport.defaultConfig)) + abstract class AbstractRemoteSendConsistencySpec(config: Config) extends ArteryMultiNodeSpec(config) with ImplicitSender { val systemB = newRemoteSystem(name = Some("systemB")) diff --git a/akka-remote/src/test/scala/akka/remote/artery/tcp/TcpFramingSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/tcp/TcpFramingSpec.scala new file mode 100644 index 0000000000..5b63d7ea32 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/tcp/TcpFramingSpec.scala @@ -0,0 +1,112 @@ +/** + * Copyright (C) 2016-2018 Lightbend Inc. + */ +package akka.remote.artery +package tcp + +import scala.util.Random + +import akka.stream.ActorMaterializer +import akka.stream.ActorMaterializerSettings +import akka.stream.impl.io.ByteStringParser.ParsingException +import akka.stream.impl.io.ByteStringParser.ParsingException +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.Framing.FramingException +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender +import akka.util.ByteString + +class TcpFramingSpec extends AkkaSpec with ImplicitSender { + import TcpFraming.encodeFrameHeader + + private val matSettings = ActorMaterializerSettings(system).withFuzzing(true) + private implicit val mat = ActorMaterializer(matSettings)(system) + + private val afr = IgnoreEventSink + + private val framingFlow = Flow[ByteString].via(new TcpFraming(afr)) + + private val payload5 = ByteString((1 to 5).map(_.toByte).toArray) + + private def frameBytes(numberOfFrames: Int): ByteString = + (1 to numberOfFrames).foldLeft(ByteString.empty)((acc, _) ⇒ acc ++ encodeFrameHeader(payload5.size) ++ payload5) + + private val rndSeed = System.currentTimeMillis() + private val rnd = new Random(rndSeed) + + private def rechunk(bytes: ByteString): Iterator[ByteString] = { + var remaining = bytes + new Iterator[ByteString] { + override def hasNext: Boolean = remaining.nonEmpty + + override def next(): ByteString = { + val chunkSize = rnd.nextInt(remaining.size) + 1 // no 0 length frames + val chunk = remaining.take(chunkSize) + remaining = remaining.drop(chunkSize) + chunk + } + } + } + + "TcpFraming stage" must { + + "grab streamId from connection header" in { + val bytes = TcpFraming.encodeConnectionHeader(2) ++ frameBytes(1) + val frames = Source(List(bytes)).via(framingFlow).runWith(Sink.seq).futureValue + frames.head.streamId should ===(2) + } + + "grab streamId from connection header in single chunk" in { + val frames = Source(List(TcpFraming.encodeConnectionHeader(1), frameBytes(1))).via(framingFlow) + .runWith(Sink.seq).futureValue + frames.head.streamId should ===(1) + } + + "reject invalid magic" in { + val bytes = frameBytes(2) + val fail = Source(List(bytes)).via(framingFlow).runWith(Sink.seq) + .failed.futureValue + fail shouldBe a[ParsingException] + fail.getCause shouldBe a[FramingException] + } + + "include streamId in each frame" in { + val bytes = TcpFraming.encodeConnectionHeader(3) ++ frameBytes(3) + val frames = Source(List(bytes)).via(framingFlow).runWith(Sink.seq).futureValue + frames(0).streamId should ===(3) + frames(1).streamId should ===(3) + frames(2).streamId should ===(3) + } + + "parse frames from random chunks" in { + val numberOfFrames = 100 + val bytes = TcpFraming.encodeConnectionHeader(3) ++ frameBytes(numberOfFrames) + withClue(s"Random chunks seed: $rndSeed") { + val frames = Source.fromIterator(() ⇒ rechunk(bytes)).via(framingFlow).runWith(Sink.seq).futureValue + frames.size should ===(numberOfFrames) + frames.foreach { frame ⇒ + frame.byteBuffer.limit should ===(payload5.size) + val payload = new Array[Byte](frame.byteBuffer.limit) + frame.byteBuffer.get(payload) + ByteString(payload) should ===(payload5) + frame.streamId should ===(3) + } + } + } + + "report truncated frames" in { + val bytes = TcpFraming.encodeConnectionHeader(3) ++ frameBytes(3).drop(1) + Source(List(bytes)).via(framingFlow).runWith(Sink.seq) + .failed.futureValue shouldBe a[FramingException] + } + + "work with empty stream" in { + val frames = Source.empty.via(framingFlow).runWith(Sink.seq).futureValue + frames.size should ===(0) + } + + } + +}