Prepare Artery for alternative TCP transport, #24390

* Refactor to separate the Aeron-specific parts into ArteryAeronUdpTransport
* Move Aeron-specific classes to the akka.remote.artery.aeron package
* Move Version to ArterySettings and describe the strategy for envelope header changes
Patrik Nordwall 2017-10-21 13:35:05 +01:00
parent c83e4adfea
commit 0d222906f4
42 changed files with 860 additions and 597 deletions
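In outline, the refactoring splits the transport roughly as sketched below. This is a simplified illustration based only on the abstract methods introduced in this diff (startTransport, runInboundStreams, shutdownTransport); the real classes are much larger and their signatures differ.

    import scala.concurrent.Future
    import akka.Done

    // Simplified sketch: transport-agnostic logic stays in an abstract ArteryTransport,
    // while the Aeron/UDP specifics move to a subclass in akka.remote.artery.aeron.
    abstract class ArteryTransportSketch {
      protected def startTransport(): Unit      // e.g. start the Aeron media driver
      protected def runInboundStreams(): Unit   // run inbound control/message/large-message streams
      protected def shutdownTransport(): Future[Done]
    }

    final class ArteryAeronUdpTransportSketch extends ArteryTransportSketch {
      protected def startTransport(): Unit = ()        // Aeron-specific startup goes here
      protected def runInboundStreams(): Unit = ()     // Aeron sources feeding the shared inbound flows
      protected def shutdownTransport(): Future[Done] = Future.successful(Done)
    }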

View file

@ -127,7 +127,7 @@ class CodecBenchmark {
} else null
val envelope = new EnvelopeBuffer(envelopeTemplateBuffer)
val outboundEnvelope = OutboundEnvelope(OptionVal.None, payload, OptionVal.None)
headerIn setVersion 1
headerIn setVersion ArteryTransport.HighestVersion
headerIn setUid 42
headerIn setSenderActorRef actorOnSystemA
headerIn setRecipientActorRef remoteRefB
@ -138,13 +138,14 @@ class CodecBenchmark {
// Now build up the graphs
val encoder: Flow[OutboundEnvelope, EnvelopeBuffer, Encoder.OutboundCompressionAccess] =
Flow.fromGraph(new Encoder(uniqueLocalAddress, system.asInstanceOf[ExtendedActorSystem], outboundEnvelopePool, envelopePool, false))
Flow.fromGraph(new Encoder(uniqueLocalAddress, system.asInstanceOf[ExtendedActorSystem], outboundEnvelopePool,
envelopePool, streamId = 1, debugLogSend = false, version = ArteryTransport.HighestVersion))
val encoderInput: Flow[String, OutboundEnvelope, NotUsed] =
Flow[String].map(msg outboundEnvelopePool.acquire().init(OptionVal.None, payload, OptionVal.Some(remoteRefB)))
val compressions = new InboundCompressionsImpl(system, inboundContext, inboundContext.settings.Advanced.Compression)
val decoder: Flow[EnvelopeBuffer, InboundEnvelope, InboundCompressionAccess] =
Flow.fromGraph(new Decoder(inboundContext, system.asInstanceOf[ExtendedActorSystem],
uniqueLocalAddress, inboundContext.settings, envelopePool, compressions, inboundEnvelopePool))
uniqueLocalAddress, inboundContext.settings, compressions, inboundEnvelopePool))
val deserializer: Flow[InboundEnvelope, InboundEnvelope, NotUsed] =
Flow.fromGraph(new Deserializer(inboundContext, system.asInstanceOf[ExtendedActorSystem], envelopePool))
val decoderInput: Flow[String, EnvelopeBuffer, NotUsed] = Flow[String]

View file

@ -115,7 +115,7 @@ class SendQueueBenchmark {
val burstSize = 1000
val queue = new ManyToOneConcurrentArrayQueue[Int](1024)
val source = Source.fromGraph(new SendQueue[Int])
val source = Source.fromGraph(new SendQueue[Int](system.deadLetters))
val (sendQueue, killSwitch) = source.viaMat(KillSwitches.single)(Keep.both)
.toMat(new BarrierSink(N, latch, burstSize, barrier))(Keep.left).run()(materializer)
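Throughout this commit the SendQueue stage gains an ActorRef constructor argument, here the system's dead letters. The sketch below illustrates the assumed purpose, redirecting elements that cannot be delivered to dead letters instead of dropping them silently; the class and method names are the editor's own, not Akka's API.

    import akka.actor.ActorRef

    // Hypothetical "drain to dead letters" helper illustrating the assumed purpose of
    // the new constructor argument; not the actual SendQueue implementation.
    final class DrainToDeadLetters[T](deadLetters: ActorRef) {
      def drain(pending: java.util.Queue[T]): Unit = {
        var msg = pending.poll()
        while (msg != null) {
          deadLetters ! msg          // undelivered elements end up as dead letters
          msg = pending.poll()
        }
      }
    }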

View file

@ -12,7 +12,7 @@ import scala.util.control.NonFatal
import akka.remote.RemoteSettings
import akka.remote.artery.ArterySettings
import akka.remote.artery.TaskRunner
import akka.remote.artery.aeron.TaskRunner
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import com.typesafe.config.ConfigFactory

View file

@ -478,7 +478,7 @@ phi = -log10(1 - F(timeSinceLastHeartbeat))
where F is the cumulative distribution function of a normal distribution with mean
and standard deviation estimated from historical heartbeat inter-arrival times.
In the [Remote Configuration](#remote-configuration-artery) you can adjust the `akka.remote.watch-failure-detector.threshold`
In the @ref:[Remote Configuration](#remote-configuration-artery) you can adjust the `akka.remote.watch-failure-detector.threshold`
to define when a *phi* value is considered to be a failure.
A low `threshold` is prone to generate many false positives but ensures
@ -504,7 +504,7 @@ a standard deviation of 100 ms.
To be able to survive sudden abnormalities, such as garbage collection pauses and
transient network failures the failure detector is configured with a margin,
`akka.remote.watch-failure-detector.acceptable-heartbeat-pause`. You may want to
adjust the [Remote Configuration](#remote-configuration-artery) of this depending on your environment.
adjust the @ref:[Remote Configuration](#remote-configuration-artery) of this depending on your environment.
This is what the curve looks like for `acceptable-heartbeat-pause` configured to
3 seconds.
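To make the phi formula quoted above concrete, here is a minimal sketch of the calculation. It is not the actual PhiAccrualFailureDetector code; the erf approximation and the helper names are the editor's own.

    object PhiSketch {
      // Abramowitz-Stegun approximation of the error function, adequate for illustration.
      private def erf(z: Double): Double = {
        val t = 1.0 / (1.0 + 0.3275911 * math.abs(z))
        val poly = ((((1.061405429 * t - 1.453152027) * t + 1.421413741) * t
          - 0.284496736) * t + 0.254829592) * t
        val y = 1.0 - poly * math.exp(-z * z)
        if (z >= 0) y else -y
      }

      // F: cumulative distribution function of a normal distribution
      private def normalCdf(x: Double, mean: Double, stdDev: Double): Double =
        0.5 * (1.0 + erf((x - mean) / (stdDev * math.sqrt(2.0))))

      // phi = -log10(1 - F(timeSinceLastHeartbeat))
      def phi(timeSinceLastHeartbeatMs: Double, meanMs: Double, stdDevMs: Double): Double =
        -math.log10(1.0 - normalCdf(timeSinceLastHeartbeatMs, meanMs, stdDevMs))
    }

    // With a mean inter-arrival time of 1000 ms and a standard deviation of 100 ms,
    // PhiSketch.phi(1200.0, 1000.0, 100.0) is roughly 1.6 and rises steeply after that.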

View file

@ -512,7 +512,7 @@ To intercept generic remoting related errors, listen to `RemotingErrorEvent` whi
<a id="remote-security"></a>
## Remote Security
An `ActorSystem` should not be exposed via Akka Remote over plain TCP to an untrusted network (e.g. internet).
An `ActorSystem` should not be exposed via Akka Remote over plain TCP to an untrusted network (e.g. Internet).
It should be protected by network security, such as a firewall. If that is not considered enough protection,
[TLS with mutual authentication](#remote-tls) should be enabled.

View file

@ -2,6 +2,7 @@
* Copyright (C) 2016-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery
package aeron
import java.io.File
import java.util.concurrent.atomic.AtomicInteger

View file

@ -2,6 +2,7 @@
* Copyright (C) 2016-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery
package aeron
import java.io.File
import java.util.concurrent.CyclicBarrier
@ -258,7 +259,7 @@ abstract class AeronStreamLatencySpec
envelope
}
val queueValue = Source.fromGraph(new SendQueue[Unit])
val queueValue = Source.fromGraph(new SendQueue[Unit](system.deadLetters))
.via(sendFlow)
.to(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink))
.run()

View file

@ -2,6 +2,7 @@
* Copyright (C) 2016-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery
package aeron
import java.util.concurrent.Executors

View file

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package akka.remote.artery;
package akka.remote.artery.aeron;
import akka.event.NoLogging;
import io.aeron.CncFileDescriptor;

View file

@ -7,3 +7,25 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteActorRefPr
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteActorRefProvider#Internals.copy")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteActorRefProvider#Internals.copy$default$3")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteActorRefProvider#Internals.this")
# #24390 Artery TCP/TLS transport
ProblemFilters.exclude[Problem]("akka.remote.artery.ArteryTransport*")
ProblemFilters.exclude[Problem]("akka.remote.artery.HeaderBuilder*")
ProblemFilters.exclude[Problem]("akka.remote.artery.SendQueue*")
ProblemFilters.exclude[Problem]("akka.remote.artery.InboundEnvelope*")
ProblemFilters.exclude[Problem]("akka.remote.artery.Encoder*")
ProblemFilters.exclude[Problem]("akka.remote.artery.RemoteInstruments*")
ProblemFilters.exclude[Problem]("akka.remote.artery.ReusableInboundEnvelope*")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.artery.ArterySettings#Bind.BindTimeout")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.artery.EventSink.loFreq")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.artery.EventSink.alert")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.FlightRecorderEvents.Transport_AeronStarted")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.Decoder.this")
ProblemFilters.exclude[Problem]("akka.remote.artery.AeronSink*")
ProblemFilters.exclude[Problem]("akka.remote.artery.AeronSource*")
ProblemFilters.exclude[Problem]("akka.remote.artery.TaskRunner*")
ProblemFilters.exclude[Problem]("akka.remote.artery.AeronErrorLog*")

View file

@ -611,7 +611,7 @@ akka {
# DEPRECATED, since 2.5.0
# The netty.udp transport is deprecated, please use Artery instead.
# See: http://doc.akka.io/docs/akka/2.4/scala/remoting-artery.html
# See: https://doc.akka.io/docs/akka/current/remoting-artery.html
netty.udp = ${akka.remote.netty.tcp}
netty.udp {
transport-protocol = udp
@ -724,7 +724,7 @@ akka {
remote {
#//#artery
### Configuration for Artery, the reimplementation of remoting
### Configuration for Artery, the new implementation of remoting
artery {
# Enable the new remoting with this flag
@ -770,7 +770,7 @@ akka {
#
hostname = ""
# Time to wait for Aeron to bind
# Time to wait for Aeron/TCP to bind
bind-timeout = 3s
}
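A minimal sketch of enabling these settings from application code; the system name, hostname and port are placeholders, and it assumes akka-remote is on the classpath.

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    object ArteryConfigExample extends App {
      // Uses the keys from the reference.conf excerpt above.
      val config = ConfigFactory.parseString("""
        akka.actor.provider = remote
        akka.remote.artery {
          enabled = on
          canonical.hostname = "127.0.0.1"   # placeholder
          canonical.port = 25520             # placeholder
          bind.bind-timeout = 3s             # time to wait for Aeron/TCP to bind
        }
        """).withFallback(ConfigFactory.load())

      val system = ActorSystem("example", config)
    }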

View file

@ -11,12 +11,12 @@ import akka.event.{ EventStream, Logging, LoggingAdapter }
import akka.event.Logging.Error
import akka.serialization.{ Serialization, SerializationExtension }
import akka.pattern.pipe
import scala.util.control.NonFatal
import akka.actor.SystemGuardian.{ RegisterTerminationHook, TerminationHook, TerminationHookDone }
import akka.actor.SystemGuardian.{ RegisterTerminationHook, TerminationHook, TerminationHookDone }
import scala.util.control.Exception.Catcher
import scala.concurrent.Future
import akka.ConfigurationException
import akka.annotation.InternalApi
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
@ -24,6 +24,7 @@ import akka.remote.artery.ArteryTransport
import akka.util.OptionVal
import akka.remote.artery.OutboundEnvelope
import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope
import akka.remote.artery.aeron.ArteryAeronUdpTransport
import akka.remote.serialization.ActorRefResolveCache
import akka.remote.serialization.ActorRefResolveThreadLocalCache
@ -203,7 +204,7 @@ private[akka] class RemoteActorRefProvider(
local.registerExtraNames(Map(("remote", d)))
d
},
transport = if (remoteSettings.Artery.Enabled) new ArteryTransport(system, this) else new Remoting(system, this))
transport = if (remoteSettings.Artery.Enabled) new ArteryAeronUdpTransport(system, this) else new Remoting(system, this))
_internals = internals
remotingTerminator ! internals

View file

@ -3,26 +3,25 @@
*/
package akka.remote.artery
import akka.japi.Util.immutableSeq
import akka.ConfigurationException
import akka.event.Logging
import akka.event.Logging.LogLevel
import akka.stream.ActorMaterializerSettings
import akka.util.Helpers.{ ConfigOps, Requiring, toRootLowerCase }
import akka.util.WildcardIndex
import akka.NotUsed
import com.typesafe.config.{ Config, ConfigFactory }
import java.net.InetAddress
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import java.net.InetAddress
import java.nio.file.Path
import java.util.concurrent.TimeUnit
import akka.NotUsed
import akka.japi.Util.immutableSeq
import akka.stream.ActorMaterializerSettings
import akka.util.Helpers.ConfigOps
import akka.util.Helpers.Requiring
import akka.util.Helpers.toRootLowerCase
import akka.util.WildcardIndex
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
/** INTERNAL API */
private[akka] final class ArterySettings private (config: Config) {
import config._
import ArterySettings._
import config._
def withDisabledCompression(): ArterySettings =
ArterySettings(ConfigFactory.parseString(
@ -34,7 +33,7 @@ private[akka] final class ArterySettings private (config: Config) {
val Enabled: Boolean = getBoolean("enabled")
object Canonical {
val config = getConfig("canonical")
val config: Config = getConfig("canonical")
import config._
val Port: Int = getInt("port").requiring(port
@ -43,22 +42,23 @@ private[akka] final class ArterySettings private (config: Config) {
}
object Bind {
val config = getConfig("bind")
val config: Config = getConfig("bind")
import config._
val Port: Int = getString("port") match {
case "" Canonical.Port
case other getInt("port").requiring(port 0 to 65535 contains port, "bind.port must be 0 through 65535")
case "" Canonical.Port
case _ getInt("port").requiring(port 0 to 65535 contains port, "bind.port must be 0 through 65535")
}
val Hostname: String = getHostname("hostname", config) match {
case "" Canonical.Hostname
case other other
}
val BindTimeout = getDuration("bind-timeout").requiring(!_.isNegative, "bind-timeout can not be negative")
val BindTimeout: FiniteDuration = config.getMillisDuration("bind-timeout").requiring(
_ > Duration.Zero, "bind-timeout can not be negative")
}
val LargeMessageDestinations =
val LargeMessageDestinations: WildcardIndex[NotUsed] =
config.getStringList("large-message-destinations").asScala.foldLeft(WildcardIndex[NotUsed]()) { (tree, entry)
val segments = entry.split('/').tail
tree.insert(segments, NotUsed)
@ -71,34 +71,41 @@ private[akka] final class ArterySettings private (config: Config) {
val LogSend: Boolean = getBoolean("log-sent-messages")
val LogAeronCounters: Boolean = config.getBoolean("log-aeron-counters")
/**
* Used version of the header format for outbound messages.
* To support rolling upgrades this may be a lower version than `ArteryTransport.HighestVersion`,
* which is the highest supported version on receiving (decoding) side.
*/
val Version: Byte = ArteryTransport.HighestVersion
object Advanced {
val config = getConfig("advanced")
val config: Config = getConfig("advanced")
import config._
val TestMode: Boolean = getBoolean("test-mode")
val Dispatcher = getString("use-dispatcher")
val ControlStreamDispatcher = getString("use-control-stream-dispatcher")
val MaterializerSettings = {
val Dispatcher: String = getString("use-dispatcher")
val ControlStreamDispatcher: String = getString("use-control-stream-dispatcher")
val MaterializerSettings: ActorMaterializerSettings = {
val settings = ActorMaterializerSettings(config.getConfig("materializer"))
if (Dispatcher.isEmpty) settings
else settings.withDispatcher(Dispatcher)
}
val ControlStreamMaterializerSettings = {
val ControlStreamMaterializerSettings: ActorMaterializerSettings = {
val settings = ActorMaterializerSettings(config.getConfig("materializer"))
if (ControlStreamDispatcher.isEmpty) settings
else settings.withDispatcher(ControlStreamDispatcher)
}
val EmbeddedMediaDriver = getBoolean("embedded-media-driver")
val AeronDirectoryName = getString("aeron-dir") requiring (dir
val EmbeddedMediaDriver: Boolean = getBoolean("embedded-media-driver")
val AeronDirectoryName: String = getString("aeron-dir") requiring (dir
EmbeddedMediaDriver || dir.nonEmpty, "aeron-dir must be defined when using external media driver")
val DeleteAeronDirectory = getBoolean("delete-aeron-dir")
val DeleteAeronDirectory: Boolean = getBoolean("delete-aeron-dir")
val IdleCpuLevel: Int = getInt("idle-cpu-level").requiring(level
1 <= level && level <= 10, "idle-cpu-level must be between 1 and 10")
val OutboundLanes = getInt("outbound-lanes").requiring(n
val OutboundLanes: Int = getInt("outbound-lanes").requiring(n
n > 0, "outbound-lanes must be greater than zero")
val InboundLanes = getInt("inbound-lanes").requiring(n
val InboundLanes: Int = getInt("inbound-lanes").requiring(n
n > 0, "inbound-lanes must be greater than zero")
val SysMsgBufferSize: Int = getInt("system-message-buffer-size").requiring(
_ > 0, "system-message-buffer-size must be more than zero")
@ -108,34 +115,43 @@ private[akka] final class ArterySettings private (config: Config) {
_ > 0, "outbound-control-queue-size must be more than zero")
val OutboundLargeMessageQueueSize: Int = getInt("outbound-large-message-queue-size").requiring(
_ > 0, "outbound-large-message-queue-size must be more than zero")
val SystemMessageResendInterval = config.getMillisDuration("system-message-resend-interval").requiring(interval
interval > Duration.Zero, "system-message-resend-interval must be more than zero")
val HandshakeTimeout = config.getMillisDuration("handshake-timeout").requiring(interval
val SystemMessageResendInterval: FiniteDuration =
config.getMillisDuration("system-message-resend-interval").requiring(interval
interval > Duration.Zero, "system-message-resend-interval must be more than zero")
val HandshakeTimeout: FiniteDuration = config.getMillisDuration("handshake-timeout").requiring(interval
interval > Duration.Zero, "handshake-timeout must be more than zero")
val HandshakeRetryInterval = config.getMillisDuration("handshake-retry-interval").requiring(interval
interval > Duration.Zero, "handshake-retry-interval must be more than zero")
val InjectHandshakeInterval = config.getMillisDuration("inject-handshake-interval").requiring(interval
interval > Duration.Zero, "inject-handshake-interval must be more than zero")
val GiveUpMessageAfter = config.getMillisDuration("give-up-message-after").requiring(interval
val HandshakeRetryInterval: FiniteDuration =
config.getMillisDuration("handshake-retry-interval").requiring(interval
interval > Duration.Zero, "handshake-retry-interval must be more than zero")
val InjectHandshakeInterval: FiniteDuration =
config.getMillisDuration("inject-handshake-interval").requiring(interval
interval > Duration.Zero, "inject-handshake-interval must be more than zero")
val GiveUpMessageAfter: FiniteDuration = config.getMillisDuration("give-up-message-after").requiring(interval
interval > Duration.Zero, "give-up-message-after must be more than zero")
val GiveUpSystemMessageAfter = config.getMillisDuration("give-up-system-message-after").requiring(interval
interval > Duration.Zero, "give-up-system-message-after must be more than zero")
val ShutdownFlushTimeout = config.getMillisDuration("shutdown-flush-timeout").requiring(interval
interval > Duration.Zero, "shutdown-flush-timeout must be more than zero")
val InboundRestartTimeout = config.getMillisDuration("inbound-restart-timeout").requiring(interval
interval > Duration.Zero, "inbound-restart-timeout must be more than zero")
val InboundMaxRestarts = getInt("inbound-max-restarts")
val OutboundRestartTimeout = config.getMillisDuration("outbound-restart-timeout").requiring(interval
interval > Duration.Zero, "outbound-restart-timeout must be more than zero")
val OutboundMaxRestarts = getInt("outbound-max-restarts")
val StopQuarantinedAfterIdle = config.getMillisDuration("stop-quarantined-after-idle").requiring(interval
interval > Duration.Zero, "stop-quarantined-after-idle must be more than zero")
val ClientLivenessTimeout = config.getMillisDuration("client-liveness-timeout").requiring(interval
interval > Duration.Zero, "client-liveness-timeout must be more than zero")
val ImageLivenessTimeout = config.getMillisDuration("image-liveness-timeout").requiring(interval
val GiveUpSystemMessageAfter: FiniteDuration =
config.getMillisDuration("give-up-system-message-after").requiring(interval
interval > Duration.Zero, "give-up-system-message-after must be more than zero")
val ShutdownFlushTimeout: FiniteDuration =
config.getMillisDuration("shutdown-flush-timeout").requiring(interval
interval > Duration.Zero, "shutdown-flush-timeout must be more than zero")
val InboundRestartTimeout: FiniteDuration =
config.getMillisDuration("inbound-restart-timeout").requiring(interval
interval > Duration.Zero, "inbound-restart-timeout must be more than zero")
val InboundMaxRestarts: Int = getInt("inbound-max-restarts")
val OutboundRestartTimeout: FiniteDuration =
config.getMillisDuration("outbound-restart-timeout").requiring(interval
interval > Duration.Zero, "outbound-restart-timeout must be more than zero")
val OutboundMaxRestarts: Int = getInt("outbound-max-restarts")
val StopQuarantinedAfterIdle: FiniteDuration =
config.getMillisDuration("stop-quarantined-after-idle").requiring(interval
interval > Duration.Zero, "stop-quarantined-after-idle must be more than zero")
val ClientLivenessTimeout: FiniteDuration =
config.getMillisDuration("client-liveness-timeout").requiring(interval
interval > Duration.Zero, "client-liveness-timeout must be more than zero")
val ImageLivenessTimeout: FiniteDuration = config.getMillisDuration("image-liveness-timeout").requiring(interval
interval > Duration.Zero, "image-liveness-timeout must be more than zero")
require(ImageLivenessTimeout < HandshakeTimeout, "image-liveness-timeout must be less than handshake-timeout")
val DriverTimeout = config.getMillisDuration("driver-timeout").requiring(interval
val DriverTimeout: FiniteDuration = config.getMillisDuration("driver-timeout").requiring(interval
interval > Duration.Zero, "driver-timeout must be more than zero")
val FlightRecorderEnabled: Boolean = getBoolean("flight-recorder.enabled")
val FlightRecorderDestination: String = getString("flight-recorder.destination")
@ -164,18 +180,18 @@ private[akka] object ArterySettings {
private[akka] final val Enabled = ActorRefs.Max > 0 || Manifests.Max > 0
object ActorRefs {
val config = getConfig("actor-refs")
val config: Config = getConfig("actor-refs")
import config._
val AdvertisementInterval = config.getMillisDuration("advertisement-interval")
val Max = getInt("max")
val AdvertisementInterval: FiniteDuration = config.getMillisDuration("advertisement-interval")
val Max: Int = getInt("max")
}
object Manifests {
val config = getConfig("manifests")
val config: Config = getConfig("manifests")
import config._
val AdvertisementInterval = config.getMillisDuration("advertisement-interval")
val Max = getInt("max")
val AdvertisementInterval: FiniteDuration = config.getMillisDuration("advertisement-interval")
val Max: Int = getInt("max")
}
}
object Compression {
@ -183,9 +199,10 @@ private[akka] object ArterySettings {
final val Debug = false // unlocks additional very verbose debug logging of compression events (to stdout)
}
def getHostname(key: String, config: Config) = config.getString(key) match {
def getHostname(key: String, config: Config): String = config.getString(key) match {
case "<getHostAddress>" InetAddress.getLocalHost.getHostAddress
case "<getHostName>" InetAddress.getLocalHost.getHostName
case other other
}
}
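The pattern throughout these settings is fail-fast validation while the settings object is constructed. Below is a standalone sketch of the same idea using a plain require; the real code uses the requiring helper seen above.

    import scala.concurrent.duration._

    // Fail-fast settings validation: an out-of-range value throws
    // IllegalArgumentException as soon as the settings are built.
    final case class BindSettingsSketch(bindTimeout: FiniteDuration) {
      require(bindTimeout > Duration.Zero, "bind-timeout can not be negative")
    }

    // BindSettingsSketch(3.seconds)   // ok
    // BindSettingsSketch(-1.millis)   // throws IllegalArgumentException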

View file

@ -3,17 +3,19 @@
*/
package akka.remote.artery
import java.io.File
import java.net.InetSocketAddress
import java.nio.channels.{ DatagramChannel, FileChannel }
import java.nio.channels.DatagramChannel
import java.nio.channels.FileChannel
import java.nio.channels.ServerSocketChannel
import java.nio.file.Path
import java.util.UUID
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{ AtomicLong, AtomicReference }
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
@ -23,21 +25,26 @@ import scala.util.control.NonFatal
import akka.Done
import akka.NotUsed
import akka.actor._
import akka.actor.Actor
import akka.actor.Cancellable
import akka.actor.Props
import akka.actor._
import akka.event.Logging
import akka.event.LoggingAdapter
import akka.remote._
import akka.remote.artery.AeronSource.ResourceLifecycle
import akka.remote.AddressUidExtension
import akka.remote.RemoteActorRef
import akka.remote.RemoteActorRefProvider
import akka.remote.RemoteTransport
import akka.remote.ThisActorSystemQuarantinedEvent
import akka.remote.UniqueAddress
import akka.remote.artery.ArteryTransport.ShuttingDown
import akka.remote.artery.Decoder.InboundCompressionAccess
import akka.remote.artery.Encoder.OutboundCompressionAccess
import akka.remote.artery.InboundControlJunction.ControlMessageObserver
import akka.remote.artery.InboundControlJunction.ControlMessageSubject
import akka.remote.artery.OutboundControlJunction.OutboundControlIngress
import akka.remote.artery.compress._
import akka.remote.artery.compress.CompressionProtocol.CompressionMessage
import akka.remote.artery.compress._
import akka.remote.artery.aeron.AeronSource
import akka.remote.transport.ThrottlerTransportAdapter.Blackhole
import akka.remote.transport.ThrottlerTransportAdapter.SetThrottle
import akka.remote.transport.ThrottlerTransportAdapter.Unthrottled
@ -49,20 +56,8 @@ import akka.stream.SharedKillSwitch
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.OptionVal
import akka.util.WildcardIndex
import io.aeron._
import io.aeron.driver.MediaDriver
import io.aeron.driver.ThreadingMode
import io.aeron.exceptions.ConductorServiceTimeoutException
import io.aeron.exceptions.DriverTimeoutException
import org.agrona.{ DirectBuffer, ErrorHandler, IoUtil }
import org.agrona.concurrent.BackoffIdleStrategy
import akka.remote.artery.Decoder.InboundCompressionAccess
import io.aeron.status.ChannelEndpointStatus
import org.agrona.collections.IntObjConsumer
import org.agrona.concurrent.status.CountersReader.MetaData
/**
* INTERNAL API
@ -172,7 +167,7 @@ private[remote] final class AssociationState(
override def toString(): String = {
val a = uniqueRemoteAddressPromise.future.value match {
case Some(Success(a)) a
case Some(Failure(e)) s"Failure(${e.getMessage})"
case Some(Failure(e)) s"Failure($e)"
case None "unknown"
}
s"AssociationState($incarnation, $a)"
@ -285,26 +280,19 @@ private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDurati
/**
* INTERNAL API
*/
private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider)
private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider)
extends RemoteTransport(_system, _provider) with InboundContext {
import ArteryTransport.AeronTerminated
import ArteryTransport.ShutdownSignal
import ArteryTransport.InboundStreamMatValues
import ArteryTransport._
import FlightRecorderEvents._
// these vars are initialized once in the start method
@volatile private[this] var _localAddress: UniqueAddress = _
@volatile private[this] var _bindAddress: UniqueAddress = _
@volatile private[this] var _addresses: Set[Address] = _
@volatile private[this] var materializer: Materializer = _
@volatile private[this] var controlMaterializer: Materializer = _
@volatile protected var materializer: Materializer = _
@volatile protected var controlMaterializer: Materializer = _
@volatile private[this] var controlSubject: ControlMessageSubject = _
@volatile private[this] var messageDispatcher: MessageDispatcher = _
private[this] val mediaDriver = new AtomicReference[Option[MediaDriver]](None)
@volatile private[this] var aeron: Aeron = _
@volatile private[this] var aeronErrorLogTask: Cancellable = _
@volatile private[this] var aeronCounterTask: Cancellable = _
@volatile private[this] var areonErrorLog: AeronErrorLog = _
override val log: LoggingAdapter = Logging(system, getClass.getName)
@ -319,7 +307,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
*
* Use `inboundCompressionAccess` (provided by the materialized `Decoder`) to call into the compression infrastructure.
*/
private[this] val _inboundCompressions = {
protected val _inboundCompressions = {
if (settings.Advanced.Compression.Enabled) {
val eventSink = createFlightRecorderEventSink(synchr = false)
new InboundCompressionsImpl(system, this, settings.Advanced.Compression, eventSink)
@ -329,6 +317,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
@volatile private[this] var _inboundCompressionAccess: OptionVal[InboundCompressionAccess] = OptionVal.None
/** Only access compression tables via the CompressionAccess */
def inboundCompressionAccess: OptionVal[InboundCompressionAccess] = _inboundCompressionAccess
protected def setInboundCompressionAccess(a: InboundCompressionAccess): Unit =
_inboundCompressionAccess = OptionVal(a)
def bindAddress: UniqueAddress = _bindAddress
override def localAddress: UniqueAddress = _localAddress
@ -336,17 +326,16 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
override def addresses: Set[Address] = _addresses
override def localAddressForRemote(remote: Address): Address = defaultAddress
private val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch")
protected val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch")
// keyed by the streamId
private[this] val streamMatValues = new AtomicReference(Map.empty[Int, InboundStreamMatValues])
protected val streamMatValues = new AtomicReference(Map.empty[Int, InboundStreamMatValues])
private[this] val hasBeenShutdown = new AtomicBoolean(false)
private val testState = new SharedTestState
private val inboundLanes = settings.Advanced.InboundLanes
protected val inboundLanes = settings.Advanced.InboundLanes
// TODO use WildcardIndex.isEmpty when merged from master
val largeMessageChannelEnabled: Boolean =
!settings.LargeMessageDestinations.wildcardTree.isEmpty ||
!settings.LargeMessageDestinations.doubleWildcardTree.isEmpty
@ -361,26 +350,21 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
.insert(Array("system", "cluster", "core", "daemon", "crossDcHeartbeatSender"), NotUsed)
.insert(Array("system", "cluster", "heartbeatReceiver"), NotUsed)
private def inboundChannel = s"aeron:udp?endpoint=${_bindAddress.address.host.get}:${_bindAddress.address.port.get}"
private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}"
private val controlStreamId = 1
private val ordinaryStreamId = 2
private val largeStreamId = 3
private val taskRunner = new TaskRunner(system, settings.Advanced.IdleCpuLevel)
private val restartCounter = new RestartCounter(settings.Advanced.InboundMaxRestarts, settings.Advanced.InboundRestartTimeout)
private val envelopeBufferPool = new EnvelopeBufferPool(settings.Advanced.MaximumFrameSize, settings.Advanced.BufferPoolSize)
private val largeEnvelopeBufferPool = new EnvelopeBufferPool(settings.Advanced.MaximumLargeFrameSize, settings.Advanced.LargeBufferPoolSize)
protected val envelopeBufferPool = new EnvelopeBufferPool(settings.Advanced.MaximumFrameSize, settings.Advanced.BufferPoolSize)
protected val largeEnvelopeBufferPool =
if (largeMessageChannelEnabled)
new EnvelopeBufferPool(settings.Advanced.MaximumLargeFrameSize, settings.Advanced.LargeBufferPoolSize)
else // not used
new EnvelopeBufferPool(0, 2)
private val inboundEnvelopePool = ReusableInboundEnvelope.createObjectPool(capacity = 16)
// The outboundEnvelopePool is shared among all outbound associations
private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity =
settings.Advanced.OutboundMessageQueueSize * settings.Advanced.OutboundLanes * 3)
private val topLevelFREvents =
protected val topLevelFREvents =
createFlightRecorderEventSink(synchr = true)
def createFlightRecorderEventSink(synchr: Boolean = false): EventSink = {
@ -411,16 +395,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
if (system.settings.JvmShutdownHooks)
Runtime.getRuntime.addShutdownHook(shutdownHook)
startMediaDriver()
startAeron()
topLevelFREvents.loFreq(Transport_AeronStarted, NoMetaData)
startAeronErrorLog()
topLevelFREvents.loFreq(Transport_AeronErrorLogStarted, NoMetaData)
if (settings.LogAeronCounters) {
startAeronCounterLog()
}
taskRunner.start()
topLevelFREvents.loFreq(Transport_TaskRunnerStarted, NoMetaData)
startTransport()
topLevelFREvents.loFreq(Transport_Started, NoMetaData)
val port =
if (settings.Canonical.Port == 0) {
@ -443,7 +419,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
AddressUidExtension(system).longAddressUid)
// TODO: This probably needs to be a global value instead of an event as events might rotate out of the log
topLevelFREvents.loFreq(Transport_UniqueAddressSet, _localAddress.toString().getBytes("US-ASCII"))
topLevelFREvents.loFreq(Transport_UniqueAddressSet, _localAddress.toString())
materializer = ActorMaterializer.systemMaterializer(settings.Advanced.MaterializerSettings, "remote", system)
controlMaterializer = ActorMaterializer.systemMaterializer(
@ -454,12 +430,32 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
topLevelFREvents.loFreq(Transport_MaterializerStarted, NoMetaData)
runInboundStreams()
blockUntilChannelActive()
topLevelFREvents.loFreq(Transport_StartupFinished, NoMetaData)
log.info("Remoting started; listening on address: [{}] with UID [{}]", localAddress.address, localAddress.uid)
}
protected def startTransport(): Unit
protected def runInboundStreams(): Unit
// Select inbound lane based on destination to preserve message order,
// Also include the uid of the sending system in the hash to spread
// "hot" destinations, e.g. ActorSelection anchor.
protected val inboundLanePartitioner: InboundEnvelope Int = env {
env.recipient match {
case OptionVal.Some(r)
val a = r.path.uid
val b = env.originUid
val hashA = 23 + a
val hash: Int = 23 * hashA + java.lang.Long.hashCode(b)
math.abs(hash) % inboundLanes
case OptionVal.None
// the lane is set by the DuplicateHandshakeReq stage, otherwise 0
env.lane
}
}
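A worked example of the lane selection above, with values chosen only for illustration:

    // Same hash as inboundLanePartitioner, spelled out with example numbers.
    val recipientPathUid = 12345                 // uid of the destination actor's path
    val originUid = 987654321L                   // uid of the sending actor system
    val hash = 23 * (23 + recipientPathUid) + java.lang.Long.hashCode(originUid)
    val lane = math.abs(hash) % 4                // with inbound-lanes = 4, a value in 0..3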
private lazy val shutdownHook = new Thread {
override def run(): Unit = {
if (!hasBeenShutdown.get) {
@ -479,229 +475,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
}
}
private def startMediaDriver(): Unit = {
if (settings.Advanced.EmbeddedMediaDriver) {
val driverContext = new MediaDriver.Context
if (settings.Advanced.AeronDirectoryName.nonEmpty) {
driverContext.aeronDirectoryName(settings.Advanced.AeronDirectoryName)
} else {
// create a random name but include the actor system name for easier debugging
val uniquePart = UUID.randomUUID().toString
val randomName = s"${CommonContext.AERON_DIR_PROP_DEFAULT}-${system.name}-$uniquePart"
driverContext.aeronDirectoryName(randomName)
}
driverContext.clientLivenessTimeoutNs(settings.Advanced.ClientLivenessTimeout.toNanos)
driverContext.imageLivenessTimeoutNs(settings.Advanced.ImageLivenessTimeout.toNanos)
driverContext.driverTimeoutMs(settings.Advanced.DriverTimeout.toMillis)
val idleCpuLevel = settings.Advanced.IdleCpuLevel
if (idleCpuLevel == 10) {
driverContext
.threadingMode(ThreadingMode.DEDICATED)
.conductorIdleStrategy(new BackoffIdleStrategy(1, 1, 1, 1))
.receiverIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel))
.senderIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel))
} else if (idleCpuLevel == 1) {
driverContext
.threadingMode(ThreadingMode.SHARED)
.sharedIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel))
} else if (idleCpuLevel <= 7) {
driverContext
.threadingMode(ThreadingMode.SHARED_NETWORK)
.sharedNetworkIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel))
} else {
driverContext
.threadingMode(ThreadingMode.DEDICATED)
.receiverIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel))
.senderIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel))
}
val driver = MediaDriver.launchEmbedded(driverContext)
log.info("Started embedded media driver in directory [{}]", driver.aeronDirectoryName)
topLevelFREvents.loFreq(Transport_MediaDriverStarted, driver.aeronDirectoryName().getBytes("US-ASCII"))
if (!mediaDriver.compareAndSet(None, Some(driver))) {
throw new IllegalStateException("media driver started more than once")
}
}
}
private def aeronDir: String = mediaDriver.get match {
case Some(driver) driver.aeronDirectoryName
case None settings.Advanced.AeronDirectoryName
}
private def stopMediaDriver(): Unit = {
// make sure we only close the driver once or we will crash the JVM
val maybeDriver = mediaDriver.getAndSet(None)
maybeDriver.foreach { driver
// this is only for embedded media driver
try driver.close() catch {
case NonFatal(e)
// don't think driver.close will ever throw, but just in case
log.warning("Couldn't close Aeron embedded media driver due to [{}]", e.getMessage)
}
try {
if (settings.Advanced.DeleteAeronDirectory) {
IoUtil.delete(new File(driver.aeronDirectoryName), false)
topLevelFREvents.loFreq(Transport_MediaFileDeleted, NoMetaData)
}
} catch {
case NonFatal(e)
log.warning(
"Couldn't delete Aeron embedded media driver files in [{}] due to [{}]",
driver.aeronDirectoryName, e.getMessage)
}
}
}
// TODO: Add FR events
private def startAeron(): Unit = {
val ctx = new Aeron.Context
ctx.driverTimeoutMs(settings.Advanced.DriverTimeout.toMillis)
ctx.availableImageHandler(new AvailableImageHandler {
override def onAvailableImage(img: Image): Unit = {
if (log.isDebugEnabled)
log.debug(s"onAvailableImage from ${img.sourceIdentity} session ${img.sessionId}")
}
})
ctx.unavailableImageHandler(new UnavailableImageHandler {
override def onUnavailableImage(img: Image): Unit = {
if (log.isDebugEnabled)
log.debug(s"onUnavailableImage from ${img.sourceIdentity} session ${img.sessionId}")
// freeSessionBuffer in AeronSource FragmentAssembler
streamMatValues.get.valuesIterator.foreach {
case InboundStreamMatValues(resourceLife, _) resourceLife.onUnavailableImage(img.sessionId)
}
}
})
ctx.errorHandler(new ErrorHandler {
private val fatalErrorOccured = new AtomicBoolean
override def onError(cause: Throwable): Unit = {
cause match {
case e: ConductorServiceTimeoutException handleFatalError(e)
case e: DriverTimeoutException handleFatalError(e)
case _: AeronTerminated // already handled, via handleFatalError
case _
log.error(cause, s"Aeron error, ${cause.getMessage}")
}
}
private def handleFatalError(cause: Throwable): Unit = {
if (fatalErrorOccured.compareAndSet(false, true)) {
if (!isShutdown) {
log.error(cause, "Fatal Aeron error {}. Have to terminate ActorSystem because it lost contact with the " +
"{} Aeron media driver. Possible configuration properties to mitigate the problem are " +
"'client-liveness-timeout' or 'driver-timeout'. {}",
Logging.simpleName(cause),
if (settings.Advanced.EmbeddedMediaDriver) "embedded" else "external",
cause.getMessage)
taskRunner.stop()
aeronErrorLogTask.cancel()
if (settings.LogAeronCounters) aeronCounterTask.cancel()
system.terminate()
throw new AeronTerminated(cause)
}
} else
throw new AeronTerminated(cause)
}
})
ctx.aeronDirectoryName(aeronDir)
aeron = Aeron.connect(ctx)
}
private def blockUntilChannelActive(): Unit = {
val counterIdForInboundChannel = findCounterId(s"rcv-channel: $inboundChannel")
val waitInterval = 200
val retries = math.max(1, settings.Bind.BindTimeout.toMillis / waitInterval)
retry(retries)
@tailrec def retry(retries: Long): Unit = {
val status = aeron.countersReader().getCounterValue(counterIdForInboundChannel)
if (status == ChannelEndpointStatus.ACTIVE) {
log.debug("Inbound channel is now active")
} else if (status == ChannelEndpointStatus.ERRORED) {
areonErrorLog.logErrors(log, 0L)
stopMediaDriver()
throw new RemoteTransportException("Inbound Aeron channel is in errored state. See Aeron logs for details.")
} else if (status == ChannelEndpointStatus.INITIALIZING && retries > 0) {
Thread.sleep(waitInterval)
retry(retries - 1)
} else {
areonErrorLog.logErrors(log, 0L)
stopMediaDriver()
throw new RemoteTransportException("Timed out waiting for Aeron transport to bind. See Aeoron logs.")
}
}
}
private def findCounterId(label: String): Int = {
var counterId = -1
aeron.countersReader().forEach(new IntObjConsumer[String] {
def accept(i: Int, l: String): Unit = {
if (label == l)
counterId = i
}
})
if (counterId == -1) {
throw new RuntimeException(s"Unable to found counterId for label: $label")
} else {
counterId
}
}
// TODO Add FR Events
private def startAeronErrorLog(): Unit = {
areonErrorLog = new AeronErrorLog(new File(aeronDir, CncFileDescriptor.CNC_FILE), log)
val lastTimestamp = new AtomicLong(0L)
import system.dispatcher
aeronErrorLogTask = system.scheduler.schedule(3.seconds, 5.seconds) {
if (!isShutdown) {
val newLastTimestamp = areonErrorLog.logErrors(log, lastTimestamp.get)
lastTimestamp.set(newLastTimestamp + 1)
}
}
}
private def startAeronCounterLog(): Unit = {
import system.dispatcher
aeronCounterTask = system.scheduler.schedule(5.seconds, 5.seconds) {
if (!isShutdown && log.isDebugEnabled) {
aeron.countersReader.forEach(new MetaData() {
def accept(counterId: Int, typeId: Int, keyBuffer: DirectBuffer, label: String): Unit = {
val value = aeron.countersReader().getCounterValue(counterId)
log.debug("Aeron Counter {}: {} {}]", counterId, value, label)
}
})
}
}
}
private def runInboundStreams(): Unit = {
runInboundControlStream()
runInboundOrdinaryMessagesStream()
if (largeMessageChannelEnabled) {
runInboundLargeMessagesStream()
}
}
private def runInboundControlStream(): Unit = {
if (isShutdown) throw ShuttingDown
val (resourceLife, ctrl, completed) =
aeronSource(controlStreamId, envelopeBufferPool)
.via(inboundFlow(settings, NoInboundCompressions))
.toMat(inboundControlSink)({ case (a, (c, d)) (a, c, d) })
.run()(controlMaterializer)
protected def attachControlMessageObserver(ctrl: ControlMessageSubject): Unit = {
controlSubject = ctrl
controlSubject.attach(new ControlMessageObserver {
override def notify(inboundEnvelope: InboundEnvelope): Unit = {
try {
@ -776,87 +551,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
}
})
updateStreamMatValues(controlStreamId, resourceLife, completed)
attachStreamRestart("Inbound control stream", completed, () runInboundControlStream())
}
private def runInboundOrdinaryMessagesStream(): Unit = {
if (isShutdown) throw ShuttingDown
val (resourceLife, inboundCompressionAccesses, completed) =
if (inboundLanes == 1) {
aeronSource(ordinaryStreamId, envelopeBufferPool)
.viaMat(inboundFlow(settings, _inboundCompressions))(Keep.both)
.toMat(inboundSink(envelopeBufferPool))({ case ((a, b), c) (a, b, c) })
.run()(materializer)
} else {
val hubKillSwitch = KillSwitches.shared("hubKillSwitch")
val source: Source[InboundEnvelope, (ResourceLifecycle, InboundCompressionAccess)] =
aeronSource(ordinaryStreamId, envelopeBufferPool)
.via(hubKillSwitch.flow)
.viaMat(inboundFlow(settings, _inboundCompressions))(Keep.both)
.via(Flow.fromGraph(new DuplicateHandshakeReq(inboundLanes, this, system, envelopeBufferPool)))
// Select lane based on destination to preserve message order,
// Also include the uid of the sending system in the hash to spread
// "hot" destinations, e.g. ActorSelection anchor.
val partitioner: InboundEnvelope Int = env {
env.recipient match {
case OptionVal.Some(r)
val a = r.path.uid
val b = env.originUid
val hashA = 23 + a
val hash: Int = 23 * hashA + java.lang.Long.hashCode(b)
math.abs(hash) % inboundLanes
case OptionVal.None
// the lane is set by the DuplicateHandshakeReq stage, otherwise 0
env.lane
}
}
val (resourceLife, compressionAccess, hub) =
source
.toMat(Sink.fromGraph(new FixedSizePartitionHub[InboundEnvelope](partitioner, inboundLanes,
settings.Advanced.InboundHubBufferSize)))({ case ((a, b), c) (a, b, c) })
.run()(materializer)
val lane = inboundSink(envelopeBufferPool)
val completedValues: Vector[Future[Done]] =
(0 until inboundLanes).map { _
hub.toMat(lane)(Keep.right).run()(materializer)
}(collection.breakOut)
import system.dispatcher
// tear down the upstream hub part if downstream lane fails
// lanes are not completed with success by themselves so we don't have to care about onSuccess
Future.firstCompletedOf(completedValues).failed.foreach { reason hubKillSwitch.abort(reason) }
val allCompleted = Future.sequence(completedValues).map(_ Done)
(resourceLife, compressionAccess, allCompleted)
}
_inboundCompressionAccess = OptionVal(inboundCompressionAccesses)
updateStreamMatValues(ordinaryStreamId, resourceLife, completed)
attachStreamRestart("Inbound message stream", completed, () runInboundOrdinaryMessagesStream())
}
private def runInboundLargeMessagesStream(): Unit = {
if (isShutdown) throw ShuttingDown
val (resourceLife, completed) = aeronSource(largeStreamId, largeEnvelopeBufferPool)
.via(inboundLargeFlow(settings))
.toMat(inboundSink(largeEnvelopeBufferPool))(Keep.both)
.run()(materializer)
updateStreamMatValues(largeStreamId, resourceLife, completed)
attachStreamRestart("Inbound large message stream", completed, () runInboundLargeMessagesStream())
}
private def attachStreamRestart(streamName: String, streamCompleted: Future[Done], restart: () Unit): Unit = {
protected def attachInboundStreamRestart(streamName: String, streamCompleted: Future[Done], restart: () Unit): Unit = {
implicit val ec = materializer.executionContext
streamCompleted.failed.foreach {
case ShutdownSignal // shutdown as expected
@ -904,26 +601,13 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
killSwitch.abort(ShutdownSignal)
topLevelFREvents.loFreq(Transport_KillSwitchPulled, NoMetaData)
for {
_ streamsCompleted
_ taskRunner.stop()
_ streamsCompleted.recover { case _ Done }
_ shutdownTransport().recover { case _ Done }
} yield {
topLevelFREvents.loFreq(Transport_Stopped, NoMetaData)
// no need to explicitly shut down the contained access since it's lifecycle is bound to the Decoder
_inboundCompressionAccess = OptionVal.None
if (aeronErrorLogTask != null) {
aeronErrorLogTask.cancel()
topLevelFREvents.loFreq(Transport_AeronErrorLogTaskStopped, NoMetaData)
}
if (aeron != null) aeron.close()
if (areonErrorLog != null) areonErrorLog.close()
if (mediaDriver.get.isDefined) {
stopMediaDriver()
}
topLevelFREvents.loFreq(Transport_FlightRecorderClose, NoMetaData)
flightRecorder.foreach(_.close())
afrFileChannel.foreach(_.force(true))
afrFileChannel.foreach(_.close())
@ -931,12 +615,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
}
}
private def updateStreamMatValues(streamId: Int, aeronSourceLifecycle: AeronSource.ResourceLifecycle, completed: Future[Done]): Unit = {
implicit val ec = materializer.executionContext
updateStreamMatValues(streamId, InboundStreamMatValues(aeronSourceLifecycle, completed.recover { case _ Done }))
}
protected def shutdownTransport(): Future[Done]
@tailrec private def updateStreamMatValues(streamId: Int, values: InboundStreamMatValues): Unit = {
@tailrec final protected def updateStreamMatValues(streamId: Int, values: InboundStreamMatValues): Unit = {
val prev = streamMatValues.get()
if (!streamMatValues.compareAndSet(prev, prev + (streamId values))) {
updateStreamMatValues(streamId, values)
@ -1024,43 +705,36 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
}
def outboundLarge(outboundContext: OutboundContext): Sink[OutboundEnvelope, Future[Done]] =
createOutboundSink(largeStreamId, outboundContext, largeEnvelopeBufferPool)
createOutboundSink(LargeStreamId, outboundContext, largeEnvelopeBufferPool)
.mapMaterializedValue { case (_, d) d }
def outbound(outboundContext: OutboundContext): Sink[OutboundEnvelope, (OutboundCompressionAccess, Future[Done])] =
createOutboundSink(ordinaryStreamId, outboundContext, envelopeBufferPool)
createOutboundSink(OrdinaryStreamId, outboundContext, envelopeBufferPool)
private def createOutboundSink(streamId: Int, outboundContext: OutboundContext,
bufferPool: EnvelopeBufferPool): Sink[OutboundEnvelope, (OutboundCompressionAccess, Future[Done])] = {
outboundLane(outboundContext, bufferPool)
.toMat(aeronSink(outboundContext, streamId, bufferPool))(Keep.both)
outboundLane(outboundContext, bufferPool, streamId)
.toMat(outboundTransportSink(outboundContext, streamId, bufferPool))(Keep.both)
}
def aeronSink(outboundContext: OutboundContext): Sink[EnvelopeBuffer, Future[Done]] =
aeronSink(outboundContext, ordinaryStreamId, envelopeBufferPool)
def outboundTransportSink(outboundContext: OutboundContext): Sink[EnvelopeBuffer, Future[Done]] =
outboundTransportSink(outboundContext, OrdinaryStreamId, envelopeBufferPool)
private def aeronSink(outboundContext: OutboundContext, streamId: Int,
bufferPool: EnvelopeBufferPool): Sink[EnvelopeBuffer, Future[Done]] = {
val giveUpAfter =
if (streamId == controlStreamId) settings.Advanced.GiveUpSystemMessageAfter
else settings.Advanced.GiveUpMessageAfter
Sink.fromGraph(new AeronSink(outboundChannel(outboundContext.remoteAddress), streamId, aeron, taskRunner,
bufferPool, giveUpAfter, createFlightRecorderEventSink()))
}
protected def outboundTransportSink(outboundContext: OutboundContext, streamId: Int,
bufferPool: EnvelopeBufferPool): Sink[EnvelopeBuffer, Future[Done]]
def outboundLane(outboundContext: OutboundContext): Flow[OutboundEnvelope, EnvelopeBuffer, OutboundCompressionAccess] =
outboundLane(outboundContext, envelopeBufferPool)
outboundLane(outboundContext, envelopeBufferPool, OrdinaryStreamId)
private def outboundLane(
outboundContext: OutboundContext,
bufferPool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, OutboundCompressionAccess] = {
bufferPool: EnvelopeBufferPool, streamId: Int): Flow[OutboundEnvelope, EnvelopeBuffer, OutboundCompressionAccess] = {
Flow.fromGraph(killSwitch.flow[OutboundEnvelope])
.via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, settings.Advanced.HandshakeTimeout,
settings.Advanced.HandshakeRetryInterval, settings.Advanced.InjectHandshakeInterval))
.viaMat(createEncoder(bufferPool))(Keep.right)
.viaMat(createEncoder(bufferPool, streamId))(Keep.right)
}
def outboundControl(outboundContext: OutboundContext): Sink[OutboundEnvelope, (OutboundControlIngress, Future[Done])] = {
@ -1073,24 +747,21 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
// note that System messages must not be dropped before the SystemMessageDelivery stage
.via(outboundTestFlow(outboundContext))
.viaMat(new OutboundControlJunction(outboundContext, outboundEnvelopePool))(Keep.right)
.via(createEncoder(envelopeBufferPool))
.toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner,
envelopeBufferPool, Duration.Inf, createFlightRecorderEventSink()))(Keep.both)
.via(createEncoder(envelopeBufferPool, ControlStreamId))
.toMat(outboundTransportSink(outboundContext, ControlStreamId, envelopeBufferPool))(Keep.both)
// TODO we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages
}
def createEncoder(pool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, OutboundCompressionAccess] =
Flow.fromGraph(new Encoder(localAddress, system, outboundEnvelopePool, pool, settings.LogSend))
def createEncoder(pool: EnvelopeBufferPool, streamId: Int): Flow[OutboundEnvelope, EnvelopeBuffer, OutboundCompressionAccess] =
Flow.fromGraph(new Encoder(localAddress, system, outboundEnvelopePool, pool, streamId, settings.LogSend,
settings.Version))
def aeronSource(streamId: Int, pool: EnvelopeBufferPool): Source[EnvelopeBuffer, AeronSource.ResourceLifecycle] =
Source.fromGraph(new AeronSource(inboundChannel, streamId, aeron, taskRunner, pool,
createFlightRecorderEventSink(), aeronSourceSpinningStrategy))
def createDecoder(settings: ArterySettings, compressions: InboundCompressions): Flow[EnvelopeBuffer, InboundEnvelope, InboundCompressionAccess] =
Flow.fromGraph(new Decoder(this, system, localAddress, settings, compressions, inboundEnvelopePool))
private def aeronSourceSpinningStrategy: Int =
if (settings.Advanced.InboundLanes > 1 || // spinning was identified to be the cause of massive slowdowns with multiple lanes, see #21365
settings.Advanced.IdleCpuLevel < 5) 0 // also don't spin for small IdleCpuLevels
else 50 * settings.Advanced.IdleCpuLevel - 240
def createDeserializer(bufferPool: EnvelopeBufferPool): Flow[InboundEnvelope, InboundEnvelope, NotUsed] =
Flow.fromGraph(new Deserializer(this, system, bufferPool))
val messageDispatcherSink: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m
messageDispatcher.dispatch(m)
@ -1100,12 +771,6 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
}
}
def createDecoder(settings: ArterySettings, compressions: InboundCompressions, bufferPool: EnvelopeBufferPool): Flow[EnvelopeBuffer, InboundEnvelope, InboundCompressionAccess] =
Flow.fromGraph(new Decoder(this, system, localAddress, settings, bufferPool, compressions, inboundEnvelopePool))
def createDeserializer(bufferPool: EnvelopeBufferPool): Flow[InboundEnvelope, InboundEnvelope, NotUsed] =
Flow.fromGraph(new Deserializer(this, system, bufferPool))
// Checks for termination hint messages and sends an ACK for those (not processing them further)
// Purpose of this stage is flushing, the sender can wait for the ACKs up to try flushing
// pending messages.
@ -1135,15 +800,12 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
def inboundFlow(settings: ArterySettings, compressions: InboundCompressions): Flow[EnvelopeBuffer, InboundEnvelope, InboundCompressionAccess] = {
Flow[EnvelopeBuffer]
.via(killSwitch.flow)
.viaMat(createDecoder(settings, compressions, envelopeBufferPool))(Keep.right)
.viaMat(createDecoder(settings, compressions))(Keep.right)
}
// large messages flow does not use compressions, since the message size dominates the size anyway
def inboundLargeFlow(settings: ArterySettings): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = {
Flow[EnvelopeBuffer]
.via(killSwitch.flow)
.via(createDecoder(settings, NoInboundCompressions, largeEnvelopeBufferPool))
}
def inboundLargeFlow(settings: ArterySettings): Flow[EnvelopeBuffer, InboundEnvelope, Any] =
inboundFlow(settings, NoInboundCompressions)
def inboundControlSink: Sink[InboundEnvelope, (ControlMessageSubject, Future[Done])] = {
Flow[InboundEnvelope]
@ -1191,7 +853,11 @@ private[remote] object ArteryTransport {
val ProtocolName = "akka"
val Version: Byte = 0
// Note that the used version of the header format for outbound messages is defined in
// `ArterySettings.Version` because that may depend on configuration settings.
// This is the highest supported version on receiving (decoding) side.
// ArterySettings.Version can be lower than this HighestVersion to support rolling upgrades.
val HighestVersion: Byte = 0
class AeronTerminated(e: Throwable) extends RuntimeException(e)
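A small sketch of the version handling described in the comment above; the names and the check are illustrative, not the actual decoder logic.

    object VersionSketch {
      val HighestVersion: Byte = 0                // highest version this node can decode
      val outboundVersion: Byte = HighestVersion  // may be pinned lower during a rolling upgrade

      // Receiving side: accept anything up to HighestVersion, reject newer headers.
      def canDecode(incomingVersion: Byte): Boolean = incomingVersion <= HighestVersion
    }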
@ -1201,7 +867,7 @@ private[remote] object ArteryTransport {
object ShuttingDown extends RuntimeException with NoStackTrace
final case class InboundStreamMatValues(
aeronSourceLifecycle: AeronSource.ResourceLifecycle,
aeronSourceLifecycle: Option[AeronSource.ResourceLifecycle],
completed: Future[Done])
def autoSelectPort(hostname: String): Int = {
@ -1212,4 +878,15 @@ private[remote] object ArteryTransport {
port
}
val ControlStreamId = 1
val OrdinaryStreamId = 2
val LargeStreamId = 3
def streamName(streamId: Int): String =
streamId match {
case ControlStreamId "control"
case LargeStreamId "large message"
case _ "message"
}
}

View file

@ -24,7 +24,7 @@ import akka.pattern.after
import akka.remote._
import akka.remote.DaemonMsgCreate
import akka.remote.QuarantinedEvent
import akka.remote.artery.AeronSink.GaveUpMessageException
import akka.remote.artery.aeron.AeronSink.GaveUpMessageException
import akka.remote.artery.ArteryTransport.{ AeronTerminated, ShuttingDown }
import akka.remote.artery.Encoder.OutboundCompressionAccess
import akka.remote.artery.Encoder.AccessOutboundCompressionFailed
@ -516,7 +516,7 @@ private[remote] class Association(
val streamKillSwitch = KillSwitches.shared("outboundControlStreamKillSwitch")
val (queueValue, (control, completed)) =
Source.fromGraph(new SendQueue[OutboundEnvelope])
Source.fromGraph(new SendQueue[OutboundEnvelope](transport.system.deadLetters))
.via(streamKillSwitch.flow)
.toMat(transport.outboundControl(this))(Keep.both)
.run()(materializer)
@ -529,7 +529,7 @@ private[remote] class Association(
materializing.countDown()
updateStreamMatValues(ControlQueueIndex, streamKillSwitch, completed)
attachStreamRestart("Outbound control stream", ControlQueueIndex, controlQueueSize,
attachOutboundStreamRestart("Outbound control stream", ControlQueueIndex, controlQueueSize,
completed, () runOutboundControlStream())
}
@ -555,7 +555,7 @@ private[remote] class Association(
val streamKillSwitch = KillSwitches.shared("outboundMessagesKillSwitch")
val (queueValue, testMgmt, changeCompression, completed) =
Source.fromGraph(new SendQueue[OutboundEnvelope])
Source.fromGraph(new SendQueue[OutboundEnvelope](transport.system.deadLetters))
.via(streamKillSwitch.flow)
.viaMat(transport.outboundTestFlow(this))(Keep.both)
.toMat(transport.outbound(this))({ case ((a, b), (c, d)) (a, b, c, d) }) // "keep all, exploded"
@ -568,7 +568,7 @@ private[remote] class Association(
outboundCompressionAccess = Vector(changeCompression)
updateStreamMatValues(OrdinaryQueueIndex, streamKillSwitch, completed)
attachStreamRestart("Outbound message stream", OrdinaryQueueIndex, queueSize,
attachOutboundStreamRestart("Outbound message stream", OrdinaryQueueIndex, queueSize,
completed, () runOutboundOrdinaryMessagesStream())
} else {
@ -582,7 +582,7 @@ private[remote] class Association(
val streamKillSwitch = KillSwitches.shared("outboundMessagesKillSwitch")
val lane = Source.fromGraph(new SendQueue[OutboundEnvelope])
val lane = Source.fromGraph(new SendQueue[OutboundEnvelope](transport.system.deadLetters))
.via(streamKillSwitch.flow)
.via(transport.outboundTestFlow(this))
.viaMat(transport.outboundLane(this))(Keep.both)
@ -593,9 +593,9 @@ private[remote] class Association(
case ((q, c), w) (q, c, w)
}
val (mergeHub, aeronSinkCompleted) = MergeHub.source[EnvelopeBuffer]
val (mergeHub, transportSinkCompleted) = MergeHub.source[EnvelopeBuffer]
.via(streamKillSwitch.flow)
.toMat(transport.aeronSink(this))(Keep.both).run()(materializer)
.toMat(transport.outboundTransportSink(this))(Keep.both).run()(materializer)
val values: Vector[(SendQueue.QueueValue[OutboundEnvelope], Encoder.OutboundCompressionAccess, Future[Done])] =
(0 until outboundLanes).map { _
@ -610,9 +610,9 @@ private[remote] class Association(
Future.firstCompletedOf(laneCompletedValues).failed.foreach { reason
streamKillSwitch.abort(reason)
}
(laneCompletedValues :+ aeronSinkCompleted).foreach(_.foreach { _ streamKillSwitch.shutdown() })
(laneCompletedValues :+ transportSinkCompleted).foreach(_.foreach { _ streamKillSwitch.shutdown() })
val allCompleted = Future.sequence(laneCompletedValues).flatMap(_ aeronSinkCompleted)
val allCompleted = Future.sequence(laneCompletedValues).flatMap(_ transportSinkCompleted)
queueValues.zip(wrappers).zipWithIndex.foreach {
case ((q, w), i)
@ -623,7 +623,7 @@ private[remote] class Association(
outboundCompressionAccess = compressionAccessValues
attachStreamRestart("Outbound message stream", OrdinaryQueueIndex, queueSize,
attachOutboundStreamRestart("Outbound message stream", OrdinaryQueueIndex, queueSize,
allCompleted, () runOutboundOrdinaryMessagesStream())
}
}
@ -637,7 +637,7 @@ private[remote] class Association(
val streamKillSwitch = KillSwitches.shared("outboundLargeMessagesKillSwitch")
val (queueValue, completed) = Source.fromGraph(new SendQueue[OutboundEnvelope])
val (queueValue, completed) = Source.fromGraph(new SendQueue[OutboundEnvelope](transport.system.deadLetters))
.via(streamKillSwitch.flow)
.via(transport.outboundTestFlow(this))
.toMat(transport.outboundLarge(this))(Keep.both)
@ -649,12 +649,12 @@ private[remote] class Association(
queuesVisibility = true // volatile write for visibility of the queues array
updateStreamMatValues(LargeQueueIndex, streamKillSwitch, completed)
attachStreamRestart("Outbound large message stream", LargeQueueIndex, largeQueueSize,
attachOutboundStreamRestart("Outbound large message stream", LargeQueueIndex, largeQueueSize,
completed, () runOutboundLargeMessagesStream())
}
private def attachStreamRestart(streamName: String, queueIndex: Int, queueCapacity: Int,
streamCompleted: Future[Done], restart: () Unit): Unit = {
private def attachOutboundStreamRestart(streamName: String, queueIndex: Int, queueCapacity: Int,
streamCompleted: Future[Done], restart: () Unit): Unit = {
def lazyRestart(): Unit = {
outboundCompressionAccess = Vector.empty
@ -668,6 +668,11 @@ private[remote] class Association(
}
implicit val ec = materializer.executionContext
streamCompleted.foreach { _
// shutdown as expected
// countDown the latch in case threads are waiting on the latch in outboundControlIngress method
materializing.countDown()
}
streamCompleted.failed.foreach {
case ArteryTransport.ShutdownSignal
// shutdown as expected
@ -692,7 +697,9 @@ private[remote] class Association(
if (queueIndex == ControlQueueIndex) {
cause match {
case _: HandshakeTimeoutException // ok, quarantine not possible without UID
case _ quarantine("Outbound control stream restarted")
case _
// FIXME can we avoid quarantine if all system messages have been delivered?
quarantine("Outbound control stream restarted")
}
}

View file

@ -21,6 +21,8 @@ import akka.util.{ OptionVal, Unsafe }
import scala.concurrent.duration._
import scala.concurrent.{ Future, Promise }
import scala.util.control.NonFatal
import akka.util.ByteStringBuilder
import java.nio.ByteOrder
import akka.remote.artery.OutboundHandshake.HandshakeReq
import akka.serialization.SerializerWithStringManifest
@ -47,7 +49,9 @@ private[remote] class Encoder(
system: ExtendedActorSystem,
outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope],
bufferPool: EnvelopeBufferPool,
debugLogSend: Boolean)
streamId: Int,
debugLogSend: Boolean,
version: Byte)
extends GraphStageWithMaterializedValue[FlowShape[OutboundEnvelope, EnvelopeBuffer], Encoder.OutboundCompressionAccess] {
import Encoder._
@ -59,7 +63,7 @@ private[remote] class Encoder(
val logic = new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging with OutboundCompressionAccess {
private val headerBuilder = HeaderBuilder.out()
headerBuilder setVersion ArteryTransport.Version
headerBuilder setVersion version
headerBuilder setUid uniqueLocalAddress.uid
private val localAddress = uniqueLocalAddress.address
private val serialization = SerializationExtension(system)
@ -342,7 +346,6 @@ private[remote] class Decoder(
system: ExtendedActorSystem,
uniqueLocalAddress: UniqueAddress,
settings: ArterySettings,
bufferPool: EnvelopeBufferPool,
inboundCompressions: InboundCompressions,
inEnvelopePool: ObjectPool[ReusableInboundEnvelope])
extends GraphStageWithMaterializedValue[FlowShape[EnvelopeBuffer, InboundEnvelope], InboundCompressionAccess] {
@ -390,7 +393,7 @@ private[remote] class Decoder(
}
}
}
override def onPush(): Unit = {
override def onPush(): Unit = try {
messageCount += 1
val envelope = grab(in)
headerBuilder.resetMessageFields()
@ -409,7 +412,7 @@ private[remote] class Decoder(
} catch {
case NonFatal(e)
// probably version mismatch due to restarted system
log.warning("Couldn't decompress sender from originUid [{}]. {}", originUid, e.getMessage)
log.warning("Couldn't decompress sender from originUid [{}]. {}", originUid, e)
OptionVal.None
}
@ -423,14 +426,14 @@ private[remote] class Decoder(
} catch {
case NonFatal(e)
// probably version mismatch due to restarted system
log.warning("Couldn't decompress sender from originUid [{}]. {}", originUid, e.getMessage)
log.warning("Couldn't decompress sender from originUid [{}]. {}", originUid, e)
OptionVal.None
}
val classManifestOpt = try headerBuilder.manifest(originUid) catch {
case NonFatal(e)
// probably version mismatch due to restarted system
log.warning("Couldn't decompress manifest from originUid [{}]. {}", originUid, e.getMessage)
log.warning("Couldn't decompress manifest from originUid [{}]. {}", originUid, e)
OptionVal.None
}
@ -520,6 +523,10 @@ private[remote] class Decoder(
push(out, decoded)
}
}
} catch {
case NonFatal(e)
log.warning("Dropping message due to: {}", e)
pull(in)
}
private def resolveRecipient(path: String): OptionVal[InternalActorRef] = {
@ -638,7 +645,7 @@ private[remote] class Deserializer(
}
log.warning(
"Failed to deserialize message from [{}] with serializer id [{}] and manifest [{}]. {}",
from, envelope.serializer, envelope.classManifest, e.getMessage)
from, envelope.serializer, envelope.classManifest, e)
pull(in)
} finally {
val buf = envelope.envelopeBuffer

View file

@ -37,8 +37,10 @@ private[remote] class EnvelopeBufferPool(maximumPayload: Int, maximumBuffers: In
}
}
def release(buffer: EnvelopeBuffer) =
def release(buffer: EnvelopeBuffer) = {
// only reuse direct buffers, e.g. not those wrapping ByteString
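// when the pool is full (offer returns false) the direct buffer is not reused and its
// native memory is freed eagerly via tryCleanDirectByteBuffer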
if (buffer.byteBuffer.isDirect && !availableBuffers.offer(buffer)) buffer.tryCleanDirectByteBuffer()
}
}
@ -60,6 +62,22 @@ private[remote] object ByteFlag {
/**
* INTERNAL API
*
* The strategy, if the header format must be changed in an incompatible way, is as follows:
* - In the end we only want to support one header format, the latest, but during
* a rolling upgrade period we must support two versions in at least one Akka patch
* release.
* - When supporting two versions the outbound messages must still be encoded with the old
* version. The Decoder on the receiving side must understand both versions.
* - Create a new copy of the header encoding/decoding logic (issue #24553: we should refactor to make that easier).
* - Bump `ArteryTransport.HighestVersion` and keep `ArterySettings.Version` as the old version.
* - Make sure `Decoder` picks the right parsing logic based on the version field in the incoming frame.
* - Release Akka, e.g. 2.5.13
* - Later, remove the old header parsing logic and bump the `ArterySettings.Version` to the same as
* `ArteryTransport.HighestVersion` again.
* - Release Akka, e.g. 2.5.14, and announce that all nodes in the cluster must first be on version
* 2.5.13 before upgrading to 2.5.14. That means that it is not supported to do a rolling upgrade
* from 2.5.12 directly to 2.5.14.
*/
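// An illustrative sketch (not part of this change) of the version-gated dispatch the strategy
// above calls for; `parseHeaderOld` and `parseHeaderNew` are hypothetical helpers:
//
//   val version = byteBuffer.get(VersionOffset)
//   if (version > ArteryTransport.HighestVersion)
//     throw new IllegalArgumentException(s"Incompatible protocol version [$version]")
//   else if (version > ArterySettings.Version) parseHeaderNew(header) // new wire format
//   else parseHeaderOld(header)                                       // current (old) wire format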
private[remote] object EnvelopeBuffer {
@ -84,6 +102,7 @@ private[remote] object EnvelopeBuffer {
// EITHER metadata followed by literals directly OR literals directly in this spot.
// Mode depends on the `MetadataPresentFlag`.
val MetadataContainerAndLiteralSectionOffset = 28 // Int
}
/** INTERNAL API */
@ -111,7 +130,8 @@ private[remote] sealed trait HeaderBuilder {
def setFlags(v: Byte): Unit
def flags: Byte
def flag(byteFlag: ByteFlag): Boolean
def setFlag(byteFlag: ByteFlag, value: Boolean): Unit
def setFlag(byteFlag: ByteFlag): Unit
def clearFlag(byteFlag: ByteFlag): Unit
def inboundActorRefCompressionTableVersion: Byte
def inboundClassManifestCompressionTableVersion: Byte
@ -218,6 +238,10 @@ private[remote] final class HeaderBuilderImpl(
var _remoteInstruments: OptionVal[RemoteInstruments] = OptionVal.None
override def resetMessageFields(): Unit = {
// some fields must not be reset because they are set only once from the Encoder,
// which owns the HeaderBuilder instance. Those are never changed.
// version, uid, streamId
_flags = 0
_senderActorRef = null
_senderActorRefIdx = -1
@ -237,9 +261,10 @@ private[remote] final class HeaderBuilderImpl(
override def setFlags(v: Byte) = _flags = v
override def flags = _flags
override def flag(byteFlag: ByteFlag): Boolean = (_flags.toInt & byteFlag.mask) != 0
override def setFlag(byteFlag: ByteFlag, value: Boolean): Unit =
if (value) _flags = (flags | byteFlag.mask).toByte
else _flags = (flags & ~byteFlag.mask).toByte
override def setFlag(byteFlag: ByteFlag): Unit =
_flags = (flags | byteFlag.mask).toByte
override def clearFlag(byteFlag: ByteFlag): Unit =
_flags = (flags & ~byteFlag.mask).toByte
override def setUid(uid: Long) = _uid = uid
override def uid: Long = _uid
@ -366,6 +391,7 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) {
// Write fixed length parts
byteBuffer.put(VersionOffset, header.version)
byteBuffer.put(FlagsOffset, header.flags)
// compression table version numbers
byteBuffer.put(ActorRefCompressionTableVersionOffset, header.outboundActorRefCompression.version)
@ -380,7 +406,7 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) {
header._remoteInstruments.get.serialize(OptionVal(oe), byteBuffer)
if (byteBuffer.position() != MetadataContainerAndLiteralSectionOffset) {
// we actually wrote some metadata so update the flag field to reflect that
header.setFlag(MetadataPresentFlag, true)
header.setFlag(MetadataPresentFlag)
byteBuffer.put(FlagsOffset, header.flags)
}
}
@ -409,6 +435,12 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) {
// Read fixed length parts
header.setVersion(byteBuffer.get(VersionOffset))
if (header.version > ArteryTransport.HighestVersion)
throw new IllegalArgumentException(
s"Incompatible protocol version [${header.version}], " +
s"highest known version for this node is [${ArteryTransport.HighestVersion}]")
header.setFlags(byteBuffer.get(FlagsOffset))
// compression table versions (stored in the Tag)
header._inboundActorRefCompressionTableVersion = byteBuffer.get(ActorRefCompressionTableVersionOffset)

View file

@ -20,7 +20,9 @@ import scala.annotation.tailrec
*/
private[remote] trait EventSink {
def alert(code: Int, metadata: Array[Byte]): Unit
def alert(code: Int, metadata: String): Unit
def loFreq(code: Int, metadata: Array[Byte]): Unit
def loFreq(code: Int, metadata: String): Unit
def hiFreq(code: Long, param: Long): Unit
def flushHiFreqBatch(): Unit
@ -31,7 +33,9 @@ private[remote] trait EventSink {
*/
private[remote] object IgnoreEventSink extends EventSink {
override def alert(code: Int, metadata: Array[Byte]): Unit = ()
override def alert(code: Int, metadata: String): Unit = ()
override def loFreq(code: Int, metadata: Array[Byte]): Unit = ()
override def loFreq(code: Int, metadata: String): Unit = ()
override def flushHiFreqBatch(): Unit = ()
override def hiFreq(code: Long, param: Long): Unit = ()
}
@ -44,10 +48,18 @@ private[remote] class SynchronizedEventSink(delegate: EventSink) extends EventSi
delegate.alert(code, metadata)
}
override def alert(code: Int, metadata: String): Unit = {
alert(code, metadata.getBytes("US-ASCII"))
}
override def loFreq(code: Int, metadata: Array[Byte]): Unit = synchronized {
delegate.loFreq(code, metadata)
}
override def loFreq(code: Int, metadata: String): Unit = {
loFreq(code, metadata.getBytes("US-ASCII"))
}
override def flushHiFreqBatch(): Unit = synchronized {
delegate.flushHiFreqBatch()
}
@ -371,6 +383,10 @@ private[remote] class FlightRecorder(val fileChannel: FileChannel) extends Atomi
}
}
override def alert(code: Int, metadata: String): Unit = {
alert(code, metadata.getBytes("US-ASCII"))
}
override def loFreq(code: Int, metadata: Array[Byte]): Unit = {
val status = FlightRecorder.this.get
if (status eq Running) {
@ -380,6 +396,10 @@ private[remote] class FlightRecorder(val fileChannel: FileChannel) extends Atomi
}
}
override def loFreq(code: Int, metadata: String): Unit = {
loFreq(code, metadata.getBytes("US-ASCII"))
}
private def prepareRichRecord(recordBuffer: ByteBuffer, code: Int, metadata: Array[Byte]): Unit = {
recordBuffer.clear()
// TODO: This is a bit overkill, needs some smarter scheme later, no need to always store the wallclock

View file

@ -11,7 +11,7 @@ private[remote] object FlightRecorderEvents {
// Top level remoting events
val Transport_MediaDriverStarted = 0
val Transport_AeronStarted = 1
val Transport_Started = 1
val Transport_AeronErrorLogStarted = 2
val Transport_TaskRunnerStarted = 3
val Transport_UniqueAddressSet = 4
@ -54,7 +54,7 @@ private[remote] object FlightRecorderEvents {
// Used for presentation of the entries in the flight recorder
lazy val eventDictionary = Map(
Transport_MediaDriverStarted "Transport: Media driver started",
Transport_AeronStarted "Transport: Aeron started",
Transport_Started "Transport: started",
Transport_AeronErrorLogStarted "Transport: Aeron error log started",
Transport_TaskRunnerStarted "Transport: Task runner started",
Transport_UniqueAddressSet "Transport: Unique address set",
@ -92,7 +92,8 @@ private[remote] object FlightRecorderEvents {
Compression_CompressedManifest "Compression: Compressed manifest",
Compression_AllocatedManifestCompressionId "Compression: Allocated manifest compression id",
Compression_Inbound_RunActorRefAdvertisement "InboundCompression: Run class manifest compression advertisement",
Compression_Inbound_RunClassManifestAdvertisement "InboundCompression: Run class manifest compression advertisement")
.map { case (int, str) int.toLong str }
Compression_Inbound_RunClassManifestAdvertisement "InboundCompression: Run class manifest compression advertisement"
).map { case (int, str) int.toLong str }
}

View file

@ -67,6 +67,10 @@ private[remote] class OutboundHandshake(
private var pendingMessage: OutboundEnvelope = null
private var injectHandshakeTickScheduled = false
override def preStart(): Unit = {
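// the handshake timeout is now scheduled when the stage starts (it was previously scheduled
// from the Start case in onPush below), so it runs from stream materialization rather than
// from the first outbound message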
scheduleOnce(HandshakeTimeout, timeout)
}
// InHandler
override def onPush(): Unit = {
if (handshakeState != Completed)
@ -96,11 +100,10 @@ private[remote] class OutboundHandshake(
case Start
val uniqueRemoteAddress = outboundContext.associationState.uniqueRemoteAddress
if (uniqueRemoteAddress.isCompleted) {
handshakeState = Completed
handshakeCompleted()
} else {
// will pull when handshake reply is received (uniqueRemoteAddress completed)
handshakeState = ReqInProgress
scheduleOnce(HandshakeTimeout, timeout)
schedulePeriodically(HandshakeRetryTick, retryInterval)
// The InboundHandshake stage will complete the uniqueRemoteAddress future

View file

@ -4,6 +4,7 @@
package akka.remote.artery
import java.util.Queue
import akka.stream.stage.GraphStage
import akka.stream.stage.OutHandler
import akka.stream.Attributes
@ -15,12 +16,15 @@ import akka.stream.stage.GraphStageWithMaterializedValue
import org.agrona.concurrent.ManyToOneConcurrentLinkedQueueTail
import org.agrona.concurrent.ManyToOneConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger
import scala.annotation.tailrec
import scala.concurrent.Promise
import scala.util.Try
import scala.util.Success
import scala.util.Failure
import akka.actor.ActorRef
/**
* INTERNAL API
*/
@ -43,7 +47,7 @@ private[remote] object SendQueue {
/**
* INTERNAL API
*/
private[remote] final class SendQueue[T] extends GraphStageWithMaterializedValue[SourceShape[T], SendQueue.QueueValue[T]] {
private[remote] final class SendQueue[T](deadLetters: ActorRef) extends GraphStageWithMaterializedValue[SourceShape[T], SendQueue.QueueValue[T]] {
import SendQueue._
val out: Outlet[T] = Outlet("SendQueue.out")
@ -102,8 +106,14 @@ private[remote] final class SendQueue[T] extends GraphStageWithMaterializedValue
override def postStop(): Unit = {
// TODO quarantine will currently always be done when control stream is terminated, see issue #21359
if (consumerQueue ne null)
if (consumerQueue ne null) {
var msg = consumerQueue.poll()
while (msg != null) {
deadLetters ! msg
msg = consumerQueue.poll()
}
consumerQueue.clear()
}
super.postStop()
}
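A minimal usage sketch of the dead-letter draining added above (assumption: test-style code mirroring the SendQueueSpec changes further down, with an implicit ActorSystem and materializer in scope; not part of this commit):
import akka.stream.scaladsl.{ Keep, Source }
import akka.stream.testkit.scaladsl.TestSink
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue
val queue = new ManyToOneConcurrentArrayQueue[String](16)
val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String](system.deadLetters))
  .toMat(TestSink.probe[String])(Keep.both).run()
sendQueue.inject(queue)
sendQueue.offer("a")   // delivered downstream while the stream is running
downstream.cancel()    // anything still left in the queue is sent to system.deadLetters in postStop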

View file

@ -2,6 +2,7 @@
* Copyright (C) 2016-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery
package aeron
import akka.util.PrettyDuration.PrettyPrintableDuration
import java.nio.ByteBuffer
@ -200,7 +201,7 @@ private[remote] class AeronSink(
private def onGiveUp(): Unit = {
offerTaskInProgress = false
val cause = new GaveUpMessageException(s"Gave up sending message to $channel after ${giveUpAfter.pretty}.")
flightRecorder.alert(AeronSink_GaveUpEnvelope, cause.getMessage.getBytes("US-ASCII"))
flightRecorder.alert(AeronSink_GaveUpEnvelope, cause.toString.getBytes("US-ASCII"))
completedValue = Failure(cause)
failStage(cause)
}

View file

@ -2,6 +2,7 @@
* Copyright (C) 2016-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery
package aeron
import scala.annotation.tailrec
import akka.stream.Attributes
@ -117,7 +118,7 @@ private[remote] class AeronSource(
try sub.close() catch {
case e: DriverTimeoutException
// media driver was shutdown
log.debug("DriverTimeout when closing subscription. {}", e.getMessage)
log.debug("DriverTimeout when closing subscription. {}", e)
} finally
flightRecorder.loFreq(AeronSource_Stopped, channelMetadata)
}

View file

@ -0,0 +1,411 @@
/**
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
package aeron
import java.io.File
import java.util.UUID
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.control.NonFatal
import akka.Done
import akka.actor.Address
import akka.actor.Cancellable
import akka.actor.ExtendedActorSystem
import akka.event.Logging
import akka.remote.RemoteActorRefProvider
import akka.remote.RemoteTransportException
import akka.remote.artery.compress._
import akka.stream.KillSwitches
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.OptionVal
import io.aeron.Aeron
import io.aeron.AvailableImageHandler
import io.aeron.CncFileDescriptor
import io.aeron.CommonContext
import io.aeron.Image
import io.aeron.UnavailableImageHandler
import io.aeron.driver.MediaDriver
import io.aeron.driver.ThreadingMode
import io.aeron.exceptions.ConductorServiceTimeoutException
import io.aeron.exceptions.DriverTimeoutException
import io.aeron.status.ChannelEndpointStatus
import org.agrona.DirectBuffer
import org.agrona.ErrorHandler
import org.agrona.IoUtil
import org.agrona.collections.IntObjConsumer
import org.agrona.concurrent.BackoffIdleStrategy
import org.agrona.concurrent.status.CountersReader.MetaData
/**
* INTERNAL API
*/
private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider)
extends ArteryTransport(_system, _provider) {
import AeronSource.ResourceLifecycle
import ArteryTransport._
import Decoder.InboundCompressionAccess
import FlightRecorderEvents._
private[this] val mediaDriver = new AtomicReference[Option[MediaDriver]](None)
@volatile private[this] var aeron: Aeron = _
@volatile private[this] var aeronCounterTask: Cancellable = _
@volatile private[this] var aeronErrorLogTask: Cancellable = _
@volatile private[this] var aeronErrorLog: AeronErrorLog = _
private val taskRunner = new TaskRunner(system, settings.Advanced.IdleCpuLevel)
private def inboundChannel = s"aeron:udp?endpoint=${bindAddress.address.host.get}:${bindAddress.address.port.get}"
private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}"
override protected def startTransport(): Unit = {
startMediaDriver()
startAeron()
startAeronErrorLog()
topLevelFREvents.loFreq(Transport_AeronErrorLogStarted, NoMetaData)
if (settings.LogAeronCounters) {
startAeronCounterLog()
}
taskRunner.start()
topLevelFREvents.loFreq(Transport_TaskRunnerStarted, NoMetaData)
}
private def startMediaDriver(): Unit = {
if (settings.Advanced.EmbeddedMediaDriver) {
val driverContext = new MediaDriver.Context
if (settings.Advanced.AeronDirectoryName.nonEmpty) {
driverContext.aeronDirectoryName(settings.Advanced.AeronDirectoryName)
} else {
// create a random name but include the actor system name for easier debugging
val uniquePart = UUID.randomUUID().toString
val randomName = s"${CommonContext.AERON_DIR_PROP_DEFAULT}-${system.name}-$uniquePart"
driverContext.aeronDirectoryName(randomName)
}
driverContext.clientLivenessTimeoutNs(settings.Advanced.ClientLivenessTimeout.toNanos)
driverContext.imageLivenessTimeoutNs(settings.Advanced.ImageLivenessTimeout.toNanos)
driverContext.driverTimeoutMs(settings.Advanced.DriverTimeout.toMillis)
val idleCpuLevel = settings.Advanced.IdleCpuLevel
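// idle-cpu-level selects the Aeron threading mode configured below:
//   10    -> DEDICATED threads, minimal backoff for the conductor
//   8..9  -> DEDICATED threads
//   2..7  -> SHARED_NETWORK
//   1     -> SHARED (single thread)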
if (idleCpuLevel == 10) {
driverContext
.threadingMode(ThreadingMode.DEDICATED)
.conductorIdleStrategy(new BackoffIdleStrategy(1, 1, 1, 1))
.receiverIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel))
.senderIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel))
} else if (idleCpuLevel == 1) {
driverContext
.threadingMode(ThreadingMode.SHARED)
.sharedIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel))
} else if (idleCpuLevel <= 7) {
driverContext
.threadingMode(ThreadingMode.SHARED_NETWORK)
.sharedNetworkIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel))
} else {
driverContext
.threadingMode(ThreadingMode.DEDICATED)
.receiverIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel))
.senderIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel))
}
val driver = MediaDriver.launchEmbedded(driverContext)
log.info("Started embedded media driver in directory [{}]", driver.aeronDirectoryName)
topLevelFREvents.loFreq(Transport_MediaDriverStarted, driver.aeronDirectoryName())
if (!mediaDriver.compareAndSet(None, Some(driver))) {
throw new IllegalStateException("media driver started more than once")
}
}
}
private def aeronDir: String = mediaDriver.get match {
case Some(driver) driver.aeronDirectoryName
case None settings.Advanced.AeronDirectoryName
}
private def stopMediaDriver(): Unit = {
// make sure we only close the driver once or we will crash the JVM
val maybeDriver = mediaDriver.getAndSet(None)
maybeDriver.foreach { driver
// this is only for embedded media driver
try driver.close() catch {
case NonFatal(e)
// don't think driver.close will ever throw, but just in case
log.warning("Couldn't close Aeron embedded media driver due to [{}]", e)
}
try {
if (settings.Advanced.DeleteAeronDirectory) {
IoUtil.delete(new File(driver.aeronDirectoryName), false)
topLevelFREvents.loFreq(Transport_MediaFileDeleted, NoMetaData)
}
} catch {
case NonFatal(e)
log.warning(
"Couldn't delete Aeron embedded media driver files in [{}] due to [{}]",
driver.aeronDirectoryName, e)
}
}
}
// TODO: Add FR events
private def startAeron(): Unit = {
val ctx = new Aeron.Context
ctx.driverTimeoutMs(settings.Advanced.DriverTimeout.toMillis)
ctx.availableImageHandler(new AvailableImageHandler {
override def onAvailableImage(img: Image): Unit = {
if (log.isDebugEnabled)
log.debug(s"onAvailableImage from ${img.sourceIdentity} session ${img.sessionId}")
}
})
ctx.unavailableImageHandler(new UnavailableImageHandler {
override def onUnavailableImage(img: Image): Unit = {
if (log.isDebugEnabled)
log.debug(s"onUnavailableImage from ${img.sourceIdentity} session ${img.sessionId}")
// freeSessionBuffer in AeronSource FragmentAssembler
streamMatValues.get.valuesIterator.foreach {
case InboundStreamMatValues(resourceLife, _)
resourceLife.foreach(_.onUnavailableImage(img.sessionId))
}
}
})
ctx.errorHandler(new ErrorHandler {
private val fatalErrorOccured = new AtomicBoolean
override def onError(cause: Throwable): Unit = {
cause match {
case e: ConductorServiceTimeoutException handleFatalError(e)
case e: DriverTimeoutException handleFatalError(e)
case _: AeronTerminated // already handled, via handleFatalError
case _
log.error(cause, s"Aeron error, $cause")
}
}
private def handleFatalError(cause: Throwable): Unit = {
if (fatalErrorOccured.compareAndSet(false, true)) {
if (!isShutdown) {
log.error(cause, "Fatal Aeron error {}. Have to terminate ActorSystem because it lost contact with the " +
"{} Aeron media driver. Possible configuration properties to mitigate the problem are " +
"'client-liveness-timeout' or 'driver-timeout'. {}",
Logging.simpleName(cause),
if (settings.Advanced.EmbeddedMediaDriver) "embedded" else "external",
cause)
taskRunner.stop()
aeronErrorLogTask.cancel()
if (settings.LogAeronCounters) aeronCounterTask.cancel()
system.terminate()
throw new AeronTerminated(cause)
}
} else
throw new AeronTerminated(cause)
}
})
ctx.aeronDirectoryName(aeronDir)
aeron = Aeron.connect(ctx)
}
private def blockUntilChannelActive(): Unit = {
val counterIdForInboundChannel = findCounterId(s"rcv-channel: $inboundChannel")
val waitInterval = 200
val retries = math.max(1, settings.Bind.BindTimeout.toMillis / waitInterval)
retry(retries)
@tailrec def retry(retries: Long): Unit = {
val status = aeron.countersReader().getCounterValue(counterIdForInboundChannel)
if (status == ChannelEndpointStatus.ACTIVE) {
log.debug("Inbound channel is now active")
} else if (status == ChannelEndpointStatus.ERRORED) {
aeronErrorLog.logErrors(log, 0L)
stopMediaDriver()
throw new RemoteTransportException("Inbound Aeron channel is in errored state. See Aeron logs for details.")
} else if (status == ChannelEndpointStatus.INITIALIZING && retries > 0) {
Thread.sleep(waitInterval)
retry(retries - 1)
} else {
aeronErrorLog.logErrors(log, 0L)
stopMediaDriver()
throw new RemoteTransportException("Timed out waiting for Aeron transport to bind. See Aeoron logs.")
}
}
}
private def findCounterId(label: String): Int = {
var counterId = -1
aeron.countersReader().forEach(new IntObjConsumer[String] {
def accept(i: Int, l: String): Unit = {
if (label == l)
counterId = i
}
})
if (counterId == -1) {
throw new RuntimeException(s"Unable to found counterId for label: $label")
} else {
counterId
}
}
// TODO Add FR Events
private def startAeronErrorLog(): Unit = {
aeronErrorLog = new AeronErrorLog(new File(aeronDir, CncFileDescriptor.CNC_FILE), log)
val lastTimestamp = new AtomicLong(0L)
import system.dispatcher
aeronErrorLogTask = system.scheduler.schedule(3.seconds, 5.seconds) {
if (!isShutdown) {
val newLastTimestamp = aeronErrorLog.logErrors(log, lastTimestamp.get)
lastTimestamp.set(newLastTimestamp + 1)
}
}
}
private def startAeronCounterLog(): Unit = {
import system.dispatcher
aeronCounterTask = system.scheduler.schedule(5.seconds, 5.seconds) {
if (!isShutdown && log.isDebugEnabled) {
aeron.countersReader.forEach(new MetaData() {
def accept(counterId: Int, typeId: Int, keyBuffer: DirectBuffer, label: String): Unit = {
val value = aeron.countersReader().getCounterValue(counterId)
log.debug("Aeron Counter {}: {} {}]", counterId, value, label)
}
})
}
}
}
override protected def outboundTransportSink(outboundContext: OutboundContext, streamId: Int,
bufferPool: EnvelopeBufferPool): Sink[EnvelopeBuffer, Future[Done]] = {
val giveUpAfter =
if (streamId == ControlStreamId) settings.Advanced.GiveUpSystemMessageAfter
else settings.Advanced.GiveUpMessageAfter
Sink.fromGraph(new AeronSink(outboundChannel(outboundContext.remoteAddress), streamId, aeron, taskRunner,
bufferPool, giveUpAfter, createFlightRecorderEventSink()))
}
private def aeronSource(streamId: Int, pool: EnvelopeBufferPool): Source[EnvelopeBuffer, AeronSource.ResourceLifecycle] =
Source.fromGraph(new AeronSource(inboundChannel, streamId, aeron, taskRunner, pool,
createFlightRecorderEventSink(), aeronSourceSpinningStrategy))
private def aeronSourceSpinningStrategy: Int =
if (settings.Advanced.InboundLanes > 1 || // spinning was identified to be the cause of massive slowdowns with multiple lanes, see #21365
settings.Advanced.IdleCpuLevel < 5) 0 // also don't spin for small IdleCpuLevels
else 50 * settings.Advanced.IdleCpuLevel - 240
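// e.g. with a single inbound lane: idle-cpu-level 5 -> 10 spins, 7 -> 110, 10 -> 260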
override protected def runInboundStreams(): Unit = {
runInboundControlStream()
runInboundOrdinaryMessagesStream()
if (largeMessageChannelEnabled) {
runInboundLargeMessagesStream()
}
blockUntilChannelActive()
}
private def runInboundControlStream(): Unit = {
if (isShutdown) throw ShuttingDown
val (resourceLife, ctrl, completed) =
aeronSource(ControlStreamId, envelopeBufferPool)
.via(inboundFlow(settings, NoInboundCompressions))
.toMat(inboundControlSink)({ case (a, (c, d)) (a, c, d) })
.run()(controlMaterializer)
attachControlMessageObserver(ctrl)
updateStreamMatValues(ControlStreamId, resourceLife, completed)
attachInboundStreamRestart("Inbound control stream", completed, () runInboundControlStream())
}
private def runInboundOrdinaryMessagesStream(): Unit = {
if (isShutdown) throw ShuttingDown
val (resourceLife, inboundCompressionAccess, completed) =
if (inboundLanes == 1) {
aeronSource(OrdinaryStreamId, envelopeBufferPool)
.viaMat(inboundFlow(settings, _inboundCompressions))(Keep.both)
.toMat(inboundSink(envelopeBufferPool))({ case ((a, b), c) (a, b, c) })
.run()(materializer)
} else {
val laneKillSwitch = KillSwitches.shared("laneKillSwitch")
val laneSource: Source[InboundEnvelope, (ResourceLifecycle, InboundCompressionAccess)] =
aeronSource(OrdinaryStreamId, envelopeBufferPool)
.via(laneKillSwitch.flow)
.viaMat(inboundFlow(settings, _inboundCompressions))(Keep.both)
.via(Flow.fromGraph(new DuplicateHandshakeReq(inboundLanes, this, system, envelopeBufferPool)))
val (resourceLife, compressionAccess, laneHub) =
laneSource
.toMat(Sink.fromGraph(new FixedSizePartitionHub[InboundEnvelope](inboundLanePartitioner, inboundLanes,
settings.Advanced.InboundHubBufferSize)))({ case ((a, b), c) (a, b, c) })
.run()(materializer)
val lane = inboundSink(envelopeBufferPool)
val completedValues: Vector[Future[Done]] =
(0 until inboundLanes).map { _
laneHub.toMat(lane)(Keep.right).run()(materializer)
}(collection.breakOut)
import system.dispatcher
// tear down the upstream hub part if downstream lane fails
// lanes are not completed with success by themselves so we don't have to care about onSuccess
Future.firstCompletedOf(completedValues).failed.foreach { reason laneKillSwitch.abort(reason) }
val allCompleted = Future.sequence(completedValues).map(_ Done)
(resourceLife, compressionAccess, allCompleted)
}
setInboundCompressionAccess(inboundCompressionAccess)
updateStreamMatValues(OrdinaryStreamId, resourceLife, completed)
attachInboundStreamRestart("Inbound message stream", completed, () runInboundOrdinaryMessagesStream())
}
private def runInboundLargeMessagesStream(): Unit = {
if (isShutdown) throw ShuttingDown
val (resourceLife, completed) = aeronSource(LargeStreamId, largeEnvelopeBufferPool)
.via(inboundLargeFlow(settings))
.toMat(inboundSink(largeEnvelopeBufferPool))(Keep.both)
.run()(materializer)
updateStreamMatValues(LargeStreamId, resourceLife, completed)
attachInboundStreamRestart("Inbound large message stream", completed, () runInboundLargeMessagesStream())
}
private def updateStreamMatValues(streamId: Int, aeronSourceLifecycle: AeronSource.ResourceLifecycle, completed: Future[Done]): Unit = {
implicit val ec = materializer.executionContext
updateStreamMatValues(streamId, InboundStreamMatValues(
Some(aeronSourceLifecycle),
completed.recover { case _ Done }))
}
override protected def shutdownTransport(): Future[Done] = {
import system.dispatcher
taskRunner.stop().map { _
topLevelFREvents.loFreq(Transport_Stopped, NoMetaData)
if (aeronErrorLogTask != null) {
aeronErrorLogTask.cancel()
topLevelFREvents.loFreq(Transport_AeronErrorLogTaskStopped, NoMetaData)
}
if (aeron != null) aeron.close()
if (aeronErrorLog != null) aeronErrorLog.close()
if (mediaDriver.get.isDefined) stopMediaDriver()
Done
}
}
}

View file

@ -2,6 +2,7 @@
* Copyright (C) 2016-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery
package aeron
import java.util.concurrent.TimeUnit.{ MICROSECONDS, MILLISECONDS }

View file

@ -396,7 +396,10 @@ private[remote] abstract class InboundCompression[T >: Null](
case None
inboundContext.association(originUid) match {
case OptionVal.Some(association)
if (alive) {
if (association.associationState.isQuarantined(originUid)) {
// FIXME cleanup compression for quarantined associations, see #23967
log.debug("Ignoring {} for quarantined originUid [{}].", Logging.simpleName(tables.activeTable), originUid)
} else if (alive) {
val table = prepareCompressionAdvertisement(tables.nextTable.version)
// TODO expensive, check if building the other way wouldn't be faster?
val nextState = tables.copy(nextTable = table.invert, advertisementInProgress = Some(table))
@ -404,8 +407,9 @@ private[remote] abstract class InboundCompression[T >: Null](
alive = false // will be set to true on first incoming message
resendCount = 0
advertiseCompressionTable(association, table)
} else
} else {
log.debug("{} for originUid [{}] not changed, no need to advertise same.", Logging.simpleName(tables.activeTable), originUid)
}
case OptionVal.None
// otherwise it's too early, association not ready yet.
@ -417,18 +421,25 @@ private[remote] abstract class InboundCompression[T >: Null](
resendCount += 1
if (resendCount <= 5) {
// The ActorRefCompressionAdvertisement message is resent because it can be lost
log.debug(
"Advertisment in progress for originUid [{}] version {}, resending",
originUid, inProgress.version)
inboundContext.association(originUid) match {
case OptionVal.Some(association)
advertiseCompressionTable(association, inProgress) // resend
if (association.associationState.isQuarantined(originUid)) {
// give up
log.debug("Skipping advertisement in progress for quarantined originUid [{}].", originUid)
confirmAdvertisement(inProgress.version)
} else {
log.debug(
"Advertisement in progress for originUid [{}] version {}, resending",
originUid, inProgress.version)
advertiseCompressionTable(association, inProgress) // resend
}
case OptionVal.None
}
} else {
// give up, it might be dead
log.debug(
"Advertisment in progress for originUid [{}] version {} but no confirmation after retries.",
"Advertisement in progress for originUid [{}] version {} but no confirmation after retries.",
originUid, inProgress.version)
confirmAdvertisement(inProgress.version)
}

View file

@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package akka.remote.artery;
package akka.remote.artery.aeron;
import java.io.File;
import java.io.PrintStream;

View file

@ -66,7 +66,8 @@ object Configuration {
rng.nextInt() // Has to work
val sRng = settings.SSLRandomNumberGenerator
rng.getAlgorithm == sRng || (throw new NoSuchAlgorithmException(sRng))
if (rng.getAlgorithm != sRng && sRng != "")
throw new NoSuchAlgorithmException(sRng)
val engine = NettySSLSupport(settings, NoMarkerLogging, isClient = true).getEngine
val gotAllSupported = enabled.toSet diff engine.getSupportedCipherSuites.toSet

View file

@ -9,6 +9,7 @@ import java.util.UUID
import akka.actor.ActorSystem
import akka.remote.RARP
import akka.testkit.AkkaSpec
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.Outcome
@ -46,7 +47,8 @@ object ArterySpecSupport {
* Artery enabled, flight recorder enabled, dynamic selection of port on localhost.
* Combine with [[FlightRecorderSpecIntegration]] or remember to delete flight recorder file if using manually
*/
def defaultConfig = newFlightRecorderConfig.withFallback(staticArteryRemotingConfig)
def defaultConfig = newFlightRecorderConfig
.withFallback(staticArteryRemotingConfig)
}

View file

@ -51,6 +51,8 @@ class EnvelopeBufferSpec extends AkkaSpec {
override def runNextClassManifestAdvertisement(): Unit = ???
}
val version = ArteryTransport.HighestVersion
"EnvelopeBuffer" must {
val headerOut = HeaderBuilder.in(TestCompressor)
val headerIn = HeaderBuilder.out()
@ -64,7 +66,7 @@ class EnvelopeBufferSpec extends AkkaSpec {
val originUid = 1L
"be able to encode and decode headers with compressed literals" in {
headerIn setVersion 1
headerIn setVersion version
headerIn setUid 42
headerIn setSerializer 4
headerIn setRecipientActorRef minimalRef("compressable1")
@ -78,7 +80,7 @@ class EnvelopeBufferSpec extends AkkaSpec {
envelope.byteBuffer.flip()
envelope.parseHeader(headerOut)
headerOut.version should ===(1)
headerOut.version should ===(version)
headerOut.uid should ===(42)
headerOut.inboundActorRefCompressionTableVersion should ===(28.toByte)
headerOut.inboundClassManifestCompressionTableVersion should ===(35.toByte)
@ -94,7 +96,7 @@ class EnvelopeBufferSpec extends AkkaSpec {
val senderRef = minimalRef("uncompressable0")
val recipientRef = minimalRef("uncompressable11")
headerIn setVersion 1
headerIn setVersion version
headerIn setUid 42
headerIn setSerializer 4
headerIn setSenderActorRef senderRef
@ -113,7 +115,7 @@ class EnvelopeBufferSpec extends AkkaSpec {
envelope.byteBuffer.flip()
envelope.parseHeader(headerOut)
headerOut.version should ===(1)
headerOut.version should ===(version)
headerOut.uid should ===(42)
headerOut.serializer should ===(4)
headerOut.senderActorRefPath should ===(OptionVal.Some("akka://EnvelopeBufferSpec/uncompressable0"))
@ -126,7 +128,7 @@ class EnvelopeBufferSpec extends AkkaSpec {
"be able to encode and decode headers with mixed literals" in {
val recipientRef = minimalRef("uncompressable1")
headerIn setVersion 1
headerIn setVersion version
headerIn setUid 42
headerIn setSerializer 4
headerIn setSenderActorRef minimalRef("reallylongcompressablestring")
@ -141,7 +143,7 @@ class EnvelopeBufferSpec extends AkkaSpec {
envelope.byteBuffer.flip()
envelope.parseHeader(headerOut)
headerOut.version should ===(1)
headerOut.version should ===(version)
headerOut.uid should ===(42)
headerOut.serializer should ===(4)
headerOut.senderActorRef(originUid).get.path.toSerializationFormat should ===("akka://EnvelopeBufferSpec/reallylongcompressablestring")
@ -152,7 +154,7 @@ class EnvelopeBufferSpec extends AkkaSpec {
val senderRef = minimalRef("uncompressable0")
headerIn setVersion 3
headerIn setVersion version
headerIn setUid Long.MinValue
headerIn setSerializer -1
headerIn setSenderActorRef senderRef
@ -168,7 +170,7 @@ class EnvelopeBufferSpec extends AkkaSpec {
envelope.byteBuffer.flip()
envelope.parseHeader(headerOut)
headerOut.version should ===(3)
headerOut.version should ===(version)
headerOut.uid should ===(Long.MinValue)
headerOut.serializer should ===(-1)
headerOut.senderActorRefPath should ===(OptionVal.Some("akka://EnvelopeBufferSpec/uncompressable0"))
@ -181,7 +183,7 @@ class EnvelopeBufferSpec extends AkkaSpec {
"be able to encode and decode headers with mixed literals and payload" in {
val payload = ByteString("Hello Artery!")
headerIn setVersion 1
headerIn setVersion version
headerIn setUid 42
headerIn setSerializer 4
headerIn setSenderActorRef minimalRef("reallylongcompressablestring")
@ -194,7 +196,7 @@ class EnvelopeBufferSpec extends AkkaSpec {
envelope.parseHeader(headerOut)
headerOut.version should ===(1)
headerOut.version should ===(version)
headerOut.uid should ===(42)
headerOut.serializer should ===(4)
headerOut.senderActorRef(originUid).get.path.toSerializationFormat should ===("akka://EnvelopeBufferSpec/reallylongcompressablestring")

View file

@ -33,8 +33,10 @@ class HandshakeDenySpec extends ArteryMultiNodeSpec(HandshakeDenySpec.commonConf
systemB.actorOf(TestActors.echoActorProps, "echo")
EventFilter.warning(start = "Dropping Handshake Request from").intercept {
sel ! Identify(None)
expectNoMsg(3.seconds)
sel ! Identify("hi echo")
// after the handshake timeout the Identify message still sitting in the SendQueue is sent to deadLetters,
// which generates the ActorIdentity(None) reply
expectMsg(5.seconds, ActorIdentity("hi echo", None))
}(systemB)
}

View file

@ -26,7 +26,6 @@ object LargeMessagesStreamSpec {
class LargeMessagesStreamSpec extends ArteryMultiNodeSpec(
"""
akka {
loglevel = ERROR
remote.artery.large-message-destinations = [ "/user/large" ]
}
""".stripMargin) {
@ -102,16 +101,23 @@ class LargeMessagesStreamSpec extends ArteryMultiNodeSpec(
val largeRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "large"))
val regularRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "regular"))
// send a large message, as well as regular one
val remoteProbe = TestProbe()(systemA)
// send a large message, as well as some regular ones
val probeSmall = TestProbe()(systemA)
val probeLarge = TestProbe()(systemA)
val largeBytes = 2000000
largeRemote.tell(Ping(ByteString.fromArray(new Array[Byte](largeBytes))), remoteProbe.ref)
regularRemote.tell(Ping(), remoteProbe.ref)
largeRemote.tell(Ping(ByteString.fromArray(new Array[Byte](largeBytes))), probeLarge.ref)
regularRemote.tell(Ping(), probeSmall.ref)
Thread.sleep(50)
regularRemote.tell(Ping(), probeSmall.ref)
Thread.sleep(50)
regularRemote.tell(Ping(), probeSmall.ref)
// should be no problems sending regular small messages while large messages are being sent
remoteProbe.expectMsg(Pong(0))
remoteProbe.expectMsg(10.seconds, Pong(largeBytes))
probeSmall.expectMsg(Pong(0))
probeSmall.expectMsg(Pong(0))
probeSmall.expectMsg(Pong(0))
probeLarge.expectMsg(10.seconds, Pong(largeBytes))
// cached flags should be set now
largeRemote.asInstanceOf[RemoteActorRef].cachedSendQueueIndex should ===(Association.LargeQueueIndex)

View file

@ -2,10 +2,19 @@ package akka.remote.artery
import akka.remote.EndpointDisassociatedException
import akka.testkit.{ EventFilter, ImplicitSender, TestActors, TestEvent }
import scala.concurrent.duration._
import akka.testkit.DeadLettersFilter
import akka.testkit.TestEvent.Mute
object RemoteFailureSpec {
final case class Ping(s: String)
}
class RemoteFailureSpec extends ArteryMultiNodeSpec with ImplicitSender {
import RemoteFailureSpec._
system.eventStream.publish(Mute(DeadLettersFilter(classOf[Ping])(occurrences = Int.MaxValue)))
"Remoting" should {
@ -29,12 +38,12 @@ class RemoteFailureSpec extends ArteryMultiNodeSpec with ImplicitSender {
// first everything is up and running
1 to n foreach { x
localSelection ! "ping"
remoteSelections(x % remoteSystems.size) ! "ping"
localSelection ! Ping("1")
remoteSelections(x % remoteSystems.size) ! Ping("1")
}
within(5.seconds) {
receiveN(n * 2) foreach { reply reply should ===("ping") }
receiveN(n * 2) foreach { reply reply should ===(Ping("1")) }
}
// then we shutdown remote systems to simulate broken connections
@ -43,13 +52,13 @@ class RemoteFailureSpec extends ArteryMultiNodeSpec with ImplicitSender {
}
1 to n foreach { x
localSelection ! "ping"
remoteSelections(x % remoteSystems.size) ! "ping"
localSelection ! Ping("2")
remoteSelections(x % remoteSystems.size) ! Ping("2")
}
// ping messages to localEcho should go through even though we use many different broken connections
within(5.seconds) {
receiveN(n) foreach { reply reply should ===("ping") }
receiveN(n) foreach { reply reply should ===(Ping("2")) }
}
}

View file

@ -11,12 +11,12 @@ import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.duration._
import akka.actor.ActorSelection
class RemoteSendConsistencyWithOneLaneSpec extends AbstractRemoteSendConsistencySpec(ConfigFactory.parseString("""
class ArteryUpdSendConsistencyWithOneLaneSpec extends AbstractRemoteSendConsistencySpec(ConfigFactory.parseString("""
akka.remote.artery.advanced.outbound-lanes = 1
akka.remote.artery.advanced.inbound-lanes = 1
""").withFallback(ArterySpecSupport.defaultConfig))
class RemoteSendConsistencyWithThreeLanesSpec extends AbstractRemoteSendConsistencySpec(
class ArteryUpdSendConsistencyWithThreeLanesSpec extends AbstractRemoteSendConsistencySpec(
ConfigFactory.parseString("""
akka.remote.artery.advanced.outbound-lanes = 3
akka.remote.artery.advanced.inbound-lanes = 3

View file

@ -59,7 +59,7 @@ class SendQueueSpec extends AkkaSpec("akka.actor.serialize-messages = off") with
"deliver all messages" in {
val queue = new ManyToOneConcurrentArrayQueue[String](128)
val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String])
val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String](system.deadLetters))
.toMat(TestSink.probe)(Keep.both).run()
downstream.request(10)
@ -78,7 +78,7 @@ class SendQueueSpec extends AkkaSpec("akka.actor.serialize-messages = off") with
queue.offer("a")
queue.offer("b")
val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String])
val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String](system.deadLetters))
.toMat(TestSink.probe)(Keep.both).run()
downstream.request(10)
@ -96,7 +96,7 @@ class SendQueueSpec extends AkkaSpec("akka.actor.serialize-messages = off") with
// this test verifies that the wakeup signal is triggered correctly
val queue = new ManyToOneConcurrentArrayQueue[Int](128)
val burstSize = 100
val (sendQueue, downstream) = Source.fromGraph(new SendQueue[Int])
val (sendQueue, downstream) = Source.fromGraph(new SendQueue[Int](system.deadLetters))
.grouped(burstSize)
.async
.toMat(TestSink.probe)(Keep.both).run()
@ -124,7 +124,7 @@ class SendQueueSpec extends AkkaSpec("akka.actor.serialize-messages = off") with
// send 100 per producer before materializing
producers.foreach(_ ! ProduceToQueue(0, 100, queue))
val (sendQueue, downstream) = Source.fromGraph(new SendQueue[Msg])
val (sendQueue, downstream) = Source.fromGraph(new SendQueue[Msg](system.deadLetters))
.toMat(TestSink.probe)(Keep.both).run()
sendQueue.inject(queue)
@ -154,7 +154,7 @@ class SendQueueSpec extends AkkaSpec("akka.actor.serialize-messages = off") with
(1 to 100).foreach { n
val queue = new ManyToOneConcurrentArrayQueue[String](16)
val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String])
val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String](system.deadLetters))
.toMat(TestSink.probe)(Keep.both).run()
f(queue, sendQueue, downstream)

View file

@ -2,6 +2,7 @@
* Copyright (C) 2016-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery
package aeron
import java.io.File
@ -10,7 +11,7 @@ import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.actor.ExtendedActorSystem
import akka.remote.artery.AeronSink.GaveUpMessageException
import akka.remote.artery.aeron.AeronSink.GaveUpMessageException
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.scaladsl.Sink

View file

@ -263,10 +263,10 @@ class CompressionIntegrationSpec extends ArteryMultiNodeSpec(CompressionIntegrat
"wrap around" in {
val extraConfig =
"""
akka.loglevel = INFO
akka.loglevel = DEBUG
akka.remote.artery.advanced.compression {
actor-refs.advertisement-interval = 10 millis
actor-refs.advertisement-interval = 100 millis
manifests.advertisement-interval = 10 minutes
}
"""

View file

@ -16,6 +16,8 @@ import akka.util.OptionVal
import scala.concurrent.Promise
import scala.util.control.NonFatal
import akka.stream.Attributes.LogLevels
/**
* INTERNAL API
*
@ -347,7 +349,12 @@ import scala.util.control.NonFatal
def reportStageError(e: Throwable): Unit = {
if (activeStage == null) throw e
else {
log.error(e, "Error in stage [{}]: {}", activeStage.originalStage.getOrElse(activeStage), e.getMessage)
val loggingEnabled = activeStage.attributes.get[LogLevels] match {
case Some(levels) levels.onFailure != LogLevels.Off
case None true
}
if (loggingEnabled)
log.error(e, "Error in stage [{}]: {}", activeStage.originalStage.getOrElse(activeStage), e.getMessage)
activeStage.failStage(e)
// Abort chasing

View file

@ -456,7 +456,7 @@ import scala.util.{ Failure, Success, Try }
/**
* INTERNAL API
*/
@InternalApi private[stream] object TlsUtils {
@InternalApi private[akka] object TlsUtils {
def applySessionParameters(engine: SSLEngine, sessionParameters: NegotiateNewSession): Unit = {
sessionParameters.enabledCipherSuites foreach (cs engine.setEnabledCipherSuites(cs.toArray))
sessionParameters.enabledProtocols foreach (p engine.setEnabledProtocols(p.toArray))

View file

@ -343,8 +343,8 @@ private[stream] object ConnectionSourceStage {
if (interpreter.log.isDebugEnabled) {
val msg = "Aborting tcp connection to {} because of upstream failure: {}"
if (ex.getStackTrace.isEmpty) interpreter.log.debug(msg, remoteAddress, ex.getMessage)
else interpreter.log.debug(msg + "\n{}", remoteAddress, ex.getMessage, ex.getStackTrace.mkString("\n"))
if (ex.getStackTrace.isEmpty) interpreter.log.debug(msg, remoteAddress, ex)
else interpreter.log.debug(msg + "\n{}", remoteAddress, ex, ex.getStackTrace.mkString("\n"))
}
connection ! Abort
} else failStage(ex)

View file

@ -10,7 +10,6 @@ import akka.NotUsed
import akka.dispatch.{ AbstractNodeQueue, ExecutionContexts }
import akka.stream._
import akka.stream.stage._
import scala.annotation.tailrec
import scala.concurrent.{ Future, Promise }
import scala.util.{ Failure, Success, Try }
@ -21,9 +20,11 @@ import java.util.concurrent.atomic.AtomicReferenceArray
import scala.collection.immutable
import scala.collection.mutable.LongMap
import scala.collection.immutable.Queue
import akka.annotation.InternalApi
import akka.annotation.DoNotInherit
import akka.annotation.ApiMayChange
import akka.stream.Attributes.LogLevels
/**
* A MergeHub is a special streaming hub that is able to collect streamed elements from a dynamic set of
@ -284,7 +285,13 @@ private[akka] class MergeHub[T](perProducerBufferSize: Int) extends GraphStageWi
}
(logic, Sink.fromGraph(sink))
// propagate LogLevels attribute so that MergeHub can be used with onFailure = LogLevels.Off
val sinkWithAttributes = inheritedAttributes.get[LogLevels] match {
case Some(a) Sink.fromGraph(sink).addAttributes(Attributes(a))
case None Sink.fromGraph(sink)
}
(logic, sinkWithAttributes)
}
}
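With the LogLevels attribute propagated to the materialized Sink, a caller can now silence failure logging for a hub. A sketch under assumptions (user code, not the code in this commit; requires an implicit materializer):
import akka.NotUsed
import akka.stream.Attributes
import akka.stream.scaladsl.{ MergeHub, Sink, Source }
val producerSink: Sink[String, NotUsed] =
  MergeHub.source[String](perProducerBufferSize = 16)
    .addAttributes(Attributes.logLevels(onFailure = Attributes.LogLevels.Off))
    .to(Sink.ignore)
    .run()
// producers attach with Source.single("msg").runWith(producerSink); if the hub fails,
// the per-producer sinks inherit onFailure = Off and the interpreter skips the error log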

View file

@ -801,7 +801,7 @@ trait TestKitBase {
*/
def shutdown(
actorSystem: ActorSystem = system,
duration: Duration = 5.seconds.dilated.min(10.seconds),
duration: Duration = 10.seconds.dilated.min(10.seconds),
verifySystemShutdown: Boolean = false) {
TestKit.shutdownActorSystem(actorSystem, duration, verifySystemShutdown)
}