diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala index 9484fb0e60..2b5d1bde94 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/CodecBenchmark.scala @@ -127,7 +127,7 @@ class CodecBenchmark { } else null val envelope = new EnvelopeBuffer(envelopeTemplateBuffer) val outboundEnvelope = OutboundEnvelope(OptionVal.None, payload, OptionVal.None) - headerIn setVersion 1 + headerIn setVersion ArteryTransport.HighestVersion headerIn setUid 42 headerIn setSenderActorRef actorOnSystemA headerIn setRecipientActorRef remoteRefB @@ -138,13 +138,14 @@ class CodecBenchmark { // Now build up the graphs val encoder: Flow[OutboundEnvelope, EnvelopeBuffer, Encoder.OutboundCompressionAccess] = - Flow.fromGraph(new Encoder(uniqueLocalAddress, system.asInstanceOf[ExtendedActorSystem], outboundEnvelopePool, envelopePool, false)) + Flow.fromGraph(new Encoder(uniqueLocalAddress, system.asInstanceOf[ExtendedActorSystem], outboundEnvelopePool, + envelopePool, streamId = 1, debugLogSend = false, version = ArteryTransport.HighestVersion)) val encoderInput: Flow[String, OutboundEnvelope, NotUsed] = Flow[String].map(msg ⇒ outboundEnvelopePool.acquire().init(OptionVal.None, payload, OptionVal.Some(remoteRefB))) val compressions = new InboundCompressionsImpl(system, inboundContext, inboundContext.settings.Advanced.Compression) val decoder: Flow[EnvelopeBuffer, InboundEnvelope, InboundCompressionAccess] = Flow.fromGraph(new Decoder(inboundContext, system.asInstanceOf[ExtendedActorSystem], - uniqueLocalAddress, inboundContext.settings, envelopePool, compressions, inboundEnvelopePool)) + uniqueLocalAddress, inboundContext.settings, compressions, inboundEnvelopePool)) val deserializer: Flow[InboundEnvelope, InboundEnvelope, NotUsed] = Flow.fromGraph(new Deserializer(inboundContext, system.asInstanceOf[ExtendedActorSystem], envelopePool)) val decoderInput: Flow[String, EnvelopeBuffer, NotUsed] = Flow[String] diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/SendQueueBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/SendQueueBenchmark.scala index 503592e957..5af52d0ea1 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/SendQueueBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/SendQueueBenchmark.scala @@ -115,7 +115,7 @@ class SendQueueBenchmark { val burstSize = 1000 val queue = new ManyToOneConcurrentArrayQueue[Int](1024) - val source = Source.fromGraph(new SendQueue[Int]) + val source = Source.fromGraph(new SendQueue[Int](system.deadLetters)) val (sendQueue, killSwitch) = source.viaMat(KillSwitches.single)(Keep.both) .toMat(new BarrierSink(N, latch, burstSize, barrier))(Keep.left).run()(materializer) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SharedMediaDriverSupport.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SharedMediaDriverSupport.scala index 223fac7651..31ea301431 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SharedMediaDriverSupport.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SharedMediaDriverSupport.scala @@ -12,7 +12,7 @@ import scala.util.control.NonFatal import akka.remote.RemoteSettings import akka.remote.artery.ArterySettings -import akka.remote.artery.TaskRunner +import akka.remote.artery.aeron.TaskRunner import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import com.typesafe.config.ConfigFactory diff --git a/akka-docs/src/main/paradox/remoting-artery.md b/akka-docs/src/main/paradox/remoting-artery.md index b90091ace4..4915a5434c 100644 --- a/akka-docs/src/main/paradox/remoting-artery.md +++ b/akka-docs/src/main/paradox/remoting-artery.md @@ -478,7 +478,7 @@ phi = -log10(1 - F(timeSinceLastHeartbeat)) where F is the cumulative distribution function of a normal distribution with mean and standard deviation estimated from historical heartbeat inter-arrival times. -In the [Remote Configuration](#remote-configuration-artery) you can adjust the `akka.remote.watch-failure-detector.threshold` +In the @ref:[Remote Configuration](#remote-configuration-artery) you can adjust the `akka.remote.watch-failure-detector.threshold` to define when a *phi* value is considered to be a failure. A low `threshold` is prone to generate many false positives but ensures @@ -504,7 +504,7 @@ a standard deviation of 100 ms. To be able to survive sudden abnormalities, such as garbage collection pauses and transient network failures the failure detector is configured with a margin, `akka.remote.watch-failure-detector.acceptable-heartbeat-pause`. You may want to -adjust the [Remote Configuration](#remote-configuration-artery) of this depending on you environment. +adjust the @ref:[Remote Configuration](#remote-configuration-artery) of this depending on you environment. This is how the curve looks like for `acceptable-heartbeat-pause` configured to 3 seconds. diff --git a/akka-docs/src/main/paradox/remoting.md b/akka-docs/src/main/paradox/remoting.md index 37dcf0d26b..846b448611 100644 --- a/akka-docs/src/main/paradox/remoting.md +++ b/akka-docs/src/main/paradox/remoting.md @@ -512,7 +512,7 @@ To intercept generic remoting related errors, listen to `RemotingErrorEvent` whi ## Remote Security -An `ActorSystem` should not be exposed via Akka Remote over plain TCP to an untrusted network (e.g. internet). +An `ActorSystem` should not be exposed via Akka Remote over plain TCP to an untrusted network (e.g. Internet). It should be protected by network security, such as a firewall. If that is not considered as enough protection [TLS with mutual authentication](#remote-tls) should be enabled. diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamConcistencySpec.scala similarity index 99% rename from akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala rename to akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamConcistencySpec.scala index 3148ce7d7b..fe882dd1e4 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamConcistencySpec.scala @@ -2,6 +2,7 @@ * Copyright (C) 2016-2018 Lightbend Inc. */ package akka.remote.artery +package aeron import java.io.File import java.util.concurrent.atomic.AtomicInteger diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamLatencySpec.scala similarity index 99% rename from akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala rename to akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamLatencySpec.scala index 7e83240674..7c7e045b3c 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamLatencySpec.scala @@ -2,6 +2,7 @@ * Copyright (C) 2016-2018 Lightbend Inc. */ package akka.remote.artery +package aeron import java.io.File import java.util.concurrent.CyclicBarrier @@ -258,7 +259,7 @@ abstract class AeronStreamLatencySpec envelope } - val queueValue = Source.fromGraph(new SendQueue[Unit]) + val queueValue = Source.fromGraph(new SendQueue[Unit](system.deadLetters)) .via(sendFlow) .to(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) .run() diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamMaxThroughputSpec.scala similarity index 99% rename from akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala rename to akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamMaxThroughputSpec.scala index 02aa5f7a9f..40c447d5dd 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamMaxThroughputSpec.scala @@ -2,6 +2,7 @@ * Copyright (C) 2016-2018 Lightbend Inc. */ package akka.remote.artery +package aeron import java.util.concurrent.Executors diff --git a/akka-remote/src/main/java/akka/remote/artery/AeronErrorLog.java b/akka-remote/src/main/java/akka/remote/artery/aeron/AeronErrorLog.java similarity index 98% rename from akka-remote/src/main/java/akka/remote/artery/AeronErrorLog.java rename to akka-remote/src/main/java/akka/remote/artery/aeron/AeronErrorLog.java index 6e9266f655..1100f3523a 100644 --- a/akka-remote/src/main/java/akka/remote/artery/AeronErrorLog.java +++ b/akka-remote/src/main/java/akka/remote/artery/aeron/AeronErrorLog.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package akka.remote.artery; +package akka.remote.artery.aeron; import akka.event.NoLogging; import io.aeron.CncFileDescriptor; diff --git a/akka-remote/src/main/mima-filters/2.5.9.backwards.excludes b/akka-remote/src/main/mima-filters/2.5.9.backwards.excludes index d296d2911a..2e3a0d951b 100644 --- a/akka-remote/src/main/mima-filters/2.5.9.backwards.excludes +++ b/akka-remote/src/main/mima-filters/2.5.9.backwards.excludes @@ -7,3 +7,25 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteActorRefPr ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteActorRefProvider#Internals.copy") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteActorRefProvider#Internals.copy$default$3") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.RemoteActorRefProvider#Internals.this") + +# #24390 Artery TCP/TLS transport +ProblemFilters.exclude[Problem]("akka.remote.artery.ArteryTransport*") +ProblemFilters.exclude[Problem]("akka.remote.artery.HeaderBuilder*") +ProblemFilters.exclude[Problem]("akka.remote.artery.SendQueue*") +ProblemFilters.exclude[Problem]("akka.remote.artery.InboundEnvelope*") +ProblemFilters.exclude[Problem]("akka.remote.artery.Encoder*") +ProblemFilters.exclude[Problem]("akka.remote.artery.RemoteInstruments*") +ProblemFilters.exclude[Problem]("akka.remote.artery.ReusableInboundEnvelope*") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.artery.ArterySettings#Bind.BindTimeout") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.artery.EventSink.loFreq") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.artery.EventSink.alert") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.FlightRecorderEvents.Transport_AeronStarted") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.Decoder.this") +ProblemFilters.exclude[Problem]("akka.remote.artery.AeronSink*") +ProblemFilters.exclude[Problem]("akka.remote.artery.AeronSource*") +ProblemFilters.exclude[Problem]("akka.remote.artery.TaskRunner*") +ProblemFilters.exclude[Problem]("akka.remote.artery.AeronErrorLog*") + + + + diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 49f8ca27a6..ac7f383875 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -611,7 +611,7 @@ akka { # DEPRECATED, since 2.5.0 # The netty.udp transport is deprecated, please use Artery instead. - # See: http://doc.akka.io/docs/akka/2.4/scala/remoting-artery.html + # See: https://doc.akka.io/docs/akka/current/remoting-artery.html netty.udp = ${akka.remote.netty.tcp} netty.udp { transport-protocol = udp @@ -724,7 +724,7 @@ akka { remote { #//#artery - ### Configuration for Artery, the reimplementation of remoting + ### Configuration for Artery, the new implementation of remoting artery { # Enable the new remoting with this flag @@ -770,7 +770,7 @@ akka { # hostname = "" - # Time to wait for Aeron to bind + # Time to wait for Aeron/TCP to bind bind-timeout = 3s } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index c077421e22..5fdd2d5a0b 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -11,12 +11,12 @@ import akka.event.{ EventStream, Logging, LoggingAdapter } import akka.event.Logging.Error import akka.serialization.{ Serialization, SerializationExtension } import akka.pattern.pipe - import scala.util.control.NonFatal -import akka.actor.SystemGuardian.{ RegisterTerminationHook, TerminationHook, TerminationHookDone } +import akka.actor.SystemGuardian.{ RegisterTerminationHook, TerminationHook, TerminationHookDone } import scala.util.control.Exception.Catcher import scala.concurrent.Future + import akka.ConfigurationException import akka.annotation.InternalApi import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } @@ -24,6 +24,7 @@ import akka.remote.artery.ArteryTransport import akka.util.OptionVal import akka.remote.artery.OutboundEnvelope import akka.remote.artery.SystemMessageDelivery.SystemMessageEnvelope +import akka.remote.artery.aeron.ArteryAeronUdpTransport import akka.remote.serialization.ActorRefResolveCache import akka.remote.serialization.ActorRefResolveThreadLocalCache @@ -203,7 +204,7 @@ private[akka] class RemoteActorRefProvider( local.registerExtraNames(Map(("remote", d))) d }, - transport = if (remoteSettings.Artery.Enabled) new ArteryTransport(system, this) else new Remoting(system, this)) + transport = if (remoteSettings.Artery.Enabled) new ArteryAeronUdpTransport(system, this) else new Remoting(system, this)) _internals = internals remotingTerminator ! internals diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala index c8fbf60798..2abdde9435 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala @@ -3,26 +3,25 @@ */ package akka.remote.artery -import akka.japi.Util.immutableSeq -import akka.ConfigurationException -import akka.event.Logging -import akka.event.Logging.LogLevel -import akka.stream.ActorMaterializerSettings -import akka.util.Helpers.{ ConfigOps, Requiring, toRootLowerCase } -import akka.util.WildcardIndex -import akka.NotUsed -import com.typesafe.config.{ Config, ConfigFactory } +import java.net.InetAddress import scala.collection.JavaConverters._ import scala.concurrent.duration._ -import java.net.InetAddress -import java.nio.file.Path -import java.util.concurrent.TimeUnit + +import akka.NotUsed +import akka.japi.Util.immutableSeq +import akka.stream.ActorMaterializerSettings +import akka.util.Helpers.ConfigOps +import akka.util.Helpers.Requiring +import akka.util.Helpers.toRootLowerCase +import akka.util.WildcardIndex +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory /** INTERNAL API */ private[akka] final class ArterySettings private (config: Config) { - import config._ import ArterySettings._ + import config._ def withDisabledCompression(): ArterySettings = ArterySettings(ConfigFactory.parseString( @@ -34,7 +33,7 @@ private[akka] final class ArterySettings private (config: Config) { val Enabled: Boolean = getBoolean("enabled") object Canonical { - val config = getConfig("canonical") + val config: Config = getConfig("canonical") import config._ val Port: Int = getInt("port").requiring(port ⇒ @@ -43,22 +42,23 @@ private[akka] final class ArterySettings private (config: Config) { } object Bind { - val config = getConfig("bind") + val config: Config = getConfig("bind") import config._ val Port: Int = getString("port") match { - case "" ⇒ Canonical.Port - case other ⇒ getInt("port").requiring(port ⇒ 0 to 65535 contains port, "bind.port must be 0 through 65535") + case "" ⇒ Canonical.Port + case _ ⇒ getInt("port").requiring(port ⇒ 0 to 65535 contains port, "bind.port must be 0 through 65535") } val Hostname: String = getHostname("hostname", config) match { case "" ⇒ Canonical.Hostname case other ⇒ other } - val BindTimeout = getDuration("bind-timeout").requiring(!_.isNegative, "bind-timeout can not be negative") + val BindTimeout: FiniteDuration = config.getMillisDuration("bind-timeout").requiring( + _ > Duration.Zero, "bind-timeout can not be negative") } - val LargeMessageDestinations = + val LargeMessageDestinations: WildcardIndex[NotUsed] = config.getStringList("large-message-destinations").asScala.foldLeft(WildcardIndex[NotUsed]()) { (tree, entry) ⇒ val segments = entry.split('/').tail tree.insert(segments, NotUsed) @@ -71,34 +71,41 @@ private[akka] final class ArterySettings private (config: Config) { val LogSend: Boolean = getBoolean("log-sent-messages") val LogAeronCounters: Boolean = config.getBoolean("log-aeron-counters") + /** + * Used version of the header format for outbound messages. + * To support rolling upgrades this may be a lower version than `ArteryTransport.HighestVersion`, + * which is the highest supported version on receiving (decoding) side. + */ + val Version: Byte = ArteryTransport.HighestVersion + object Advanced { - val config = getConfig("advanced") + val config: Config = getConfig("advanced") import config._ val TestMode: Boolean = getBoolean("test-mode") - val Dispatcher = getString("use-dispatcher") - val ControlStreamDispatcher = getString("use-control-stream-dispatcher") - val MaterializerSettings = { + val Dispatcher: String = getString("use-dispatcher") + val ControlStreamDispatcher: String = getString("use-control-stream-dispatcher") + val MaterializerSettings: ActorMaterializerSettings = { val settings = ActorMaterializerSettings(config.getConfig("materializer")) if (Dispatcher.isEmpty) settings else settings.withDispatcher(Dispatcher) } - val ControlStreamMaterializerSettings = { + val ControlStreamMaterializerSettings: ActorMaterializerSettings = { val settings = ActorMaterializerSettings(config.getConfig("materializer")) if (ControlStreamDispatcher.isEmpty) settings else settings.withDispatcher(ControlStreamDispatcher) } - val EmbeddedMediaDriver = getBoolean("embedded-media-driver") - val AeronDirectoryName = getString("aeron-dir") requiring (dir ⇒ + val EmbeddedMediaDriver: Boolean = getBoolean("embedded-media-driver") + val AeronDirectoryName: String = getString("aeron-dir") requiring (dir ⇒ EmbeddedMediaDriver || dir.nonEmpty, "aeron-dir must be defined when using external media driver") - val DeleteAeronDirectory = getBoolean("delete-aeron-dir") + val DeleteAeronDirectory: Boolean = getBoolean("delete-aeron-dir") val IdleCpuLevel: Int = getInt("idle-cpu-level").requiring(level ⇒ 1 <= level && level <= 10, "idle-cpu-level must be between 1 and 10") - val OutboundLanes = getInt("outbound-lanes").requiring(n ⇒ + val OutboundLanes: Int = getInt("outbound-lanes").requiring(n ⇒ n > 0, "outbound-lanes must be greater than zero") - val InboundLanes = getInt("inbound-lanes").requiring(n ⇒ + val InboundLanes: Int = getInt("inbound-lanes").requiring(n ⇒ n > 0, "inbound-lanes must be greater than zero") val SysMsgBufferSize: Int = getInt("system-message-buffer-size").requiring( _ > 0, "system-message-buffer-size must be more than zero") @@ -108,34 +115,43 @@ private[akka] final class ArterySettings private (config: Config) { _ > 0, "outbound-control-queue-size must be more than zero") val OutboundLargeMessageQueueSize: Int = getInt("outbound-large-message-queue-size").requiring( _ > 0, "outbound-large-message-queue-size must be more than zero") - val SystemMessageResendInterval = config.getMillisDuration("system-message-resend-interval").requiring(interval ⇒ - interval > Duration.Zero, "system-message-resend-interval must be more than zero") - val HandshakeTimeout = config.getMillisDuration("handshake-timeout").requiring(interval ⇒ + val SystemMessageResendInterval: FiniteDuration = + config.getMillisDuration("system-message-resend-interval").requiring(interval ⇒ + interval > Duration.Zero, "system-message-resend-interval must be more than zero") + val HandshakeTimeout: FiniteDuration = config.getMillisDuration("handshake-timeout").requiring(interval ⇒ interval > Duration.Zero, "handshake-timeout must be more than zero") - val HandshakeRetryInterval = config.getMillisDuration("handshake-retry-interval").requiring(interval ⇒ - interval > Duration.Zero, "handshake-retry-interval must be more than zero") - val InjectHandshakeInterval = config.getMillisDuration("inject-handshake-interval").requiring(interval ⇒ - interval > Duration.Zero, "inject-handshake-interval must be more than zero") - val GiveUpMessageAfter = config.getMillisDuration("give-up-message-after").requiring(interval ⇒ + val HandshakeRetryInterval: FiniteDuration = + config.getMillisDuration("handshake-retry-interval").requiring(interval ⇒ + interval > Duration.Zero, "handshake-retry-interval must be more than zero") + val InjectHandshakeInterval: FiniteDuration = + config.getMillisDuration("inject-handshake-interval").requiring(interval ⇒ + interval > Duration.Zero, "inject-handshake-interval must be more than zero") + val GiveUpMessageAfter: FiniteDuration = config.getMillisDuration("give-up-message-after").requiring(interval ⇒ interval > Duration.Zero, "give-up-message-after must be more than zero") - val GiveUpSystemMessageAfter = config.getMillisDuration("give-up-system-message-after").requiring(interval ⇒ - interval > Duration.Zero, "give-up-system-message-after must be more than zero") - val ShutdownFlushTimeout = config.getMillisDuration("shutdown-flush-timeout").requiring(interval ⇒ - interval > Duration.Zero, "shutdown-flush-timeout must be more than zero") - val InboundRestartTimeout = config.getMillisDuration("inbound-restart-timeout").requiring(interval ⇒ - interval > Duration.Zero, "inbound-restart-timeout must be more than zero") - val InboundMaxRestarts = getInt("inbound-max-restarts") - val OutboundRestartTimeout = config.getMillisDuration("outbound-restart-timeout").requiring(interval ⇒ - interval > Duration.Zero, "outbound-restart-timeout must be more than zero") - val OutboundMaxRestarts = getInt("outbound-max-restarts") - val StopQuarantinedAfterIdle = config.getMillisDuration("stop-quarantined-after-idle").requiring(interval ⇒ - interval > Duration.Zero, "stop-quarantined-after-idle must be more than zero") - val ClientLivenessTimeout = config.getMillisDuration("client-liveness-timeout").requiring(interval ⇒ - interval > Duration.Zero, "client-liveness-timeout must be more than zero") - val ImageLivenessTimeout = config.getMillisDuration("image-liveness-timeout").requiring(interval ⇒ + val GiveUpSystemMessageAfter: FiniteDuration = + config.getMillisDuration("give-up-system-message-after").requiring(interval ⇒ + interval > Duration.Zero, "give-up-system-message-after must be more than zero") + val ShutdownFlushTimeout: FiniteDuration = + config.getMillisDuration("shutdown-flush-timeout").requiring(interval ⇒ + interval > Duration.Zero, "shutdown-flush-timeout must be more than zero") + val InboundRestartTimeout: FiniteDuration = + config.getMillisDuration("inbound-restart-timeout").requiring(interval ⇒ + interval > Duration.Zero, "inbound-restart-timeout must be more than zero") + val InboundMaxRestarts: Int = getInt("inbound-max-restarts") + val OutboundRestartTimeout: FiniteDuration = + config.getMillisDuration("outbound-restart-timeout").requiring(interval ⇒ + interval > Duration.Zero, "outbound-restart-timeout must be more than zero") + val OutboundMaxRestarts: Int = getInt("outbound-max-restarts") + val StopQuarantinedAfterIdle: FiniteDuration = + config.getMillisDuration("stop-quarantined-after-idle").requiring(interval ⇒ + interval > Duration.Zero, "stop-quarantined-after-idle must be more than zero") + val ClientLivenessTimeout: FiniteDuration = + config.getMillisDuration("client-liveness-timeout").requiring(interval ⇒ + interval > Duration.Zero, "client-liveness-timeout must be more than zero") + val ImageLivenessTimeout: FiniteDuration = config.getMillisDuration("image-liveness-timeout").requiring(interval ⇒ interval > Duration.Zero, "image-liveness-timeout must be more than zero") require(ImageLivenessTimeout < HandshakeTimeout, "image-liveness-timeout must be less than handshake-timeout") - val DriverTimeout = config.getMillisDuration("driver-timeout").requiring(interval ⇒ + val DriverTimeout: FiniteDuration = config.getMillisDuration("driver-timeout").requiring(interval ⇒ interval > Duration.Zero, "driver-timeout must be more than zero") val FlightRecorderEnabled: Boolean = getBoolean("flight-recorder.enabled") val FlightRecorderDestination: String = getString("flight-recorder.destination") @@ -164,18 +180,18 @@ private[akka] object ArterySettings { private[akka] final val Enabled = ActorRefs.Max > 0 || Manifests.Max > 0 object ActorRefs { - val config = getConfig("actor-refs") + val config: Config = getConfig("actor-refs") import config._ - val AdvertisementInterval = config.getMillisDuration("advertisement-interval") - val Max = getInt("max") + val AdvertisementInterval: FiniteDuration = config.getMillisDuration("advertisement-interval") + val Max: Int = getInt("max") } object Manifests { - val config = getConfig("manifests") + val config: Config = getConfig("manifests") import config._ - val AdvertisementInterval = config.getMillisDuration("advertisement-interval") - val Max = getInt("max") + val AdvertisementInterval: FiniteDuration = config.getMillisDuration("advertisement-interval") + val Max: Int = getInt("max") } } object Compression { @@ -183,9 +199,10 @@ private[akka] object ArterySettings { final val Debug = false // unlocks additional very verbose debug logging of compression events (to stdout) } - def getHostname(key: String, config: Config) = config.getString(key) match { + def getHostname(key: String, config: Config): String = config.getString(key) match { case "" ⇒ InetAddress.getLocalHost.getHostAddress case "" ⇒ InetAddress.getLocalHost.getHostName case other ⇒ other } + } diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index b7482c46d6..5414a3f4c2 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -3,17 +3,19 @@ */ package akka.remote.artery -import java.io.File import java.net.InetSocketAddress -import java.nio.channels.{ DatagramChannel, FileChannel } +import java.nio.channels.DatagramChannel +import java.nio.channels.FileChannel +import java.nio.channels.ServerSocketChannel import java.nio.file.Path -import java.util.UUID import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.{ AtomicLong, AtomicReference } import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec -import scala.concurrent.{ Await, Future, Promise } +import scala.concurrent.Await +import scala.concurrent.Future +import scala.concurrent.Promise import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success @@ -23,21 +25,26 @@ import scala.util.control.NonFatal import akka.Done import akka.NotUsed -import akka.actor._ import akka.actor.Actor -import akka.actor.Cancellable import akka.actor.Props +import akka.actor._ import akka.event.Logging import akka.event.LoggingAdapter -import akka.remote._ -import akka.remote.artery.AeronSource.ResourceLifecycle +import akka.remote.AddressUidExtension +import akka.remote.RemoteActorRef +import akka.remote.RemoteActorRefProvider +import akka.remote.RemoteTransport +import akka.remote.ThisActorSystemQuarantinedEvent +import akka.remote.UniqueAddress import akka.remote.artery.ArteryTransport.ShuttingDown +import akka.remote.artery.Decoder.InboundCompressionAccess import akka.remote.artery.Encoder.OutboundCompressionAccess import akka.remote.artery.InboundControlJunction.ControlMessageObserver import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.remote.artery.OutboundControlJunction.OutboundControlIngress -import akka.remote.artery.compress._ import akka.remote.artery.compress.CompressionProtocol.CompressionMessage +import akka.remote.artery.compress._ +import akka.remote.artery.aeron.AeronSource import akka.remote.transport.ThrottlerTransportAdapter.Blackhole import akka.remote.transport.ThrottlerTransportAdapter.SetThrottle import akka.remote.transport.ThrottlerTransportAdapter.Unthrottled @@ -49,20 +56,8 @@ import akka.stream.SharedKillSwitch import akka.stream.scaladsl.Flow import akka.stream.scaladsl.Keep import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source import akka.util.OptionVal import akka.util.WildcardIndex -import io.aeron._ -import io.aeron.driver.MediaDriver -import io.aeron.driver.ThreadingMode -import io.aeron.exceptions.ConductorServiceTimeoutException -import io.aeron.exceptions.DriverTimeoutException -import org.agrona.{ DirectBuffer, ErrorHandler, IoUtil } -import org.agrona.concurrent.BackoffIdleStrategy -import akka.remote.artery.Decoder.InboundCompressionAccess -import io.aeron.status.ChannelEndpointStatus -import org.agrona.collections.IntObjConsumer -import org.agrona.concurrent.status.CountersReader.MetaData /** * INTERNAL API @@ -172,7 +167,7 @@ private[remote] final class AssociationState( override def toString(): String = { val a = uniqueRemoteAddressPromise.future.value match { case Some(Success(a)) ⇒ a - case Some(Failure(e)) ⇒ s"Failure(${e.getMessage})" + case Some(Failure(e)) ⇒ s"Failure($e)" case None ⇒ "unknown" } s"AssociationState($incarnation, $a)" @@ -285,26 +280,19 @@ private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDurati /** * INTERNAL API */ -private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) +private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) with InboundContext { - import ArteryTransport.AeronTerminated - import ArteryTransport.ShutdownSignal - import ArteryTransport.InboundStreamMatValues + import ArteryTransport._ import FlightRecorderEvents._ // these vars are initialized once in the start method @volatile private[this] var _localAddress: UniqueAddress = _ @volatile private[this] var _bindAddress: UniqueAddress = _ @volatile private[this] var _addresses: Set[Address] = _ - @volatile private[this] var materializer: Materializer = _ - @volatile private[this] var controlMaterializer: Materializer = _ + @volatile protected var materializer: Materializer = _ + @volatile protected var controlMaterializer: Materializer = _ @volatile private[this] var controlSubject: ControlMessageSubject = _ @volatile private[this] var messageDispatcher: MessageDispatcher = _ - private[this] val mediaDriver = new AtomicReference[Option[MediaDriver]](None) - @volatile private[this] var aeron: Aeron = _ - @volatile private[this] var aeronErrorLogTask: Cancellable = _ - @volatile private[this] var aeronCounterTask: Cancellable = _ - @volatile private[this] var areonErrorLog: AeronErrorLog = _ override val log: LoggingAdapter = Logging(system, getClass.getName) @@ -319,7 +307,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R * * Use `inboundCompressionAccess` (provided by the materialized `Decoder`) to call into the compression infrastructure. */ - private[this] val _inboundCompressions = { + protected val _inboundCompressions = { if (settings.Advanced.Compression.Enabled) { val eventSink = createFlightRecorderEventSink(synchr = false) new InboundCompressionsImpl(system, this, settings.Advanced.Compression, eventSink) @@ -329,6 +317,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R @volatile private[this] var _inboundCompressionAccess: OptionVal[InboundCompressionAccess] = OptionVal.None /** Only access compression tables via the CompressionAccess */ def inboundCompressionAccess: OptionVal[InboundCompressionAccess] = _inboundCompressionAccess + protected def setInboundCompressionAccess(a: InboundCompressionAccess): Unit = + _inboundCompressionAccess = OptionVal(a) def bindAddress: UniqueAddress = _bindAddress override def localAddress: UniqueAddress = _localAddress @@ -336,17 +326,16 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R override def addresses: Set[Address] = _addresses override def localAddressForRemote(remote: Address): Address = defaultAddress - private val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch") + protected val killSwitch: SharedKillSwitch = KillSwitches.shared("transportKillSwitch") // keyed by the streamId - private[this] val streamMatValues = new AtomicReference(Map.empty[Int, InboundStreamMatValues]) + protected val streamMatValues = new AtomicReference(Map.empty[Int, InboundStreamMatValues]) private[this] val hasBeenShutdown = new AtomicBoolean(false) private val testState = new SharedTestState - private val inboundLanes = settings.Advanced.InboundLanes + protected val inboundLanes = settings.Advanced.InboundLanes - // TODO use WildcardIndex.isEmpty when merged from master val largeMessageChannelEnabled: Boolean = !settings.LargeMessageDestinations.wildcardTree.isEmpty || !settings.LargeMessageDestinations.doubleWildcardTree.isEmpty @@ -361,26 +350,21 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R .insert(Array("system", "cluster", "core", "daemon", "crossDcHeartbeatSender"), NotUsed) .insert(Array("system", "cluster", "heartbeatReceiver"), NotUsed) - private def inboundChannel = s"aeron:udp?endpoint=${_bindAddress.address.host.get}:${_bindAddress.address.port.get}" - private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}" - - private val controlStreamId = 1 - private val ordinaryStreamId = 2 - private val largeStreamId = 3 - - private val taskRunner = new TaskRunner(system, settings.Advanced.IdleCpuLevel) - private val restartCounter = new RestartCounter(settings.Advanced.InboundMaxRestarts, settings.Advanced.InboundRestartTimeout) - private val envelopeBufferPool = new EnvelopeBufferPool(settings.Advanced.MaximumFrameSize, settings.Advanced.BufferPoolSize) - private val largeEnvelopeBufferPool = new EnvelopeBufferPool(settings.Advanced.MaximumLargeFrameSize, settings.Advanced.LargeBufferPoolSize) + protected val envelopeBufferPool = new EnvelopeBufferPool(settings.Advanced.MaximumFrameSize, settings.Advanced.BufferPoolSize) + protected val largeEnvelopeBufferPool = + if (largeMessageChannelEnabled) + new EnvelopeBufferPool(settings.Advanced.MaximumLargeFrameSize, settings.Advanced.LargeBufferPoolSize) + else // not used + new EnvelopeBufferPool(0, 2) private val inboundEnvelopePool = ReusableInboundEnvelope.createObjectPool(capacity = 16) // The outboundEnvelopePool is shared among all outbound associations private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = settings.Advanced.OutboundMessageQueueSize * settings.Advanced.OutboundLanes * 3) - private val topLevelFREvents = + protected val topLevelFREvents = createFlightRecorderEventSink(synchr = true) def createFlightRecorderEventSink(synchr: Boolean = false): EventSink = { @@ -411,16 +395,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R if (system.settings.JvmShutdownHooks) Runtime.getRuntime.addShutdownHook(shutdownHook) - startMediaDriver() - startAeron() - topLevelFREvents.loFreq(Transport_AeronStarted, NoMetaData) - startAeronErrorLog() - topLevelFREvents.loFreq(Transport_AeronErrorLogStarted, NoMetaData) - if (settings.LogAeronCounters) { - startAeronCounterLog() - } - taskRunner.start() - topLevelFREvents.loFreq(Transport_TaskRunnerStarted, NoMetaData) + startTransport() + topLevelFREvents.loFreq(Transport_Started, NoMetaData) val port = if (settings.Canonical.Port == 0) { @@ -443,7 +419,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R AddressUidExtension(system).longAddressUid) // TODO: This probably needs to be a global value instead of an event as events might rotate out of the log - topLevelFREvents.loFreq(Transport_UniqueAddressSet, _localAddress.toString().getBytes("US-ASCII")) + topLevelFREvents.loFreq(Transport_UniqueAddressSet, _localAddress.toString()) materializer = ActorMaterializer.systemMaterializer(settings.Advanced.MaterializerSettings, "remote", system) controlMaterializer = ActorMaterializer.systemMaterializer( @@ -454,12 +430,32 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R topLevelFREvents.loFreq(Transport_MaterializerStarted, NoMetaData) runInboundStreams() - blockUntilChannelActive() topLevelFREvents.loFreq(Transport_StartupFinished, NoMetaData) log.info("Remoting started; listening on address: [{}] with UID [{}]", localAddress.address, localAddress.uid) } + protected def startTransport(): Unit + + protected def runInboundStreams(): Unit + + // Select inbound lane based on destination to preserve message order, + // Also include the uid of the sending system in the hash to spread + // "hot" destinations, e.g. ActorSelection anchor. + protected val inboundLanePartitioner: InboundEnvelope ⇒ Int = env ⇒ { + env.recipient match { + case OptionVal.Some(r) ⇒ + val a = r.path.uid + val b = env.originUid + val hashA = 23 + a + val hash: Int = 23 * hashA + java.lang.Long.hashCode(b) + math.abs(hash) % inboundLanes + case OptionVal.None ⇒ + // the lane is set by the DuplicateHandshakeReq stage, otherwise 0 + env.lane + } + } + private lazy val shutdownHook = new Thread { override def run(): Unit = { if (!hasBeenShutdown.get) { @@ -479,229 +475,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } } - private def startMediaDriver(): Unit = { - if (settings.Advanced.EmbeddedMediaDriver) { - val driverContext = new MediaDriver.Context - if (settings.Advanced.AeronDirectoryName.nonEmpty) { - driverContext.aeronDirectoryName(settings.Advanced.AeronDirectoryName) - } else { - // create a random name but include the actor system name for easier debugging - val uniquePart = UUID.randomUUID().toString - val randomName = s"${CommonContext.AERON_DIR_PROP_DEFAULT}-${system.name}-$uniquePart" - driverContext.aeronDirectoryName(randomName) - } - driverContext.clientLivenessTimeoutNs(settings.Advanced.ClientLivenessTimeout.toNanos) - driverContext.imageLivenessTimeoutNs(settings.Advanced.ImageLivenessTimeout.toNanos) - driverContext.driverTimeoutMs(settings.Advanced.DriverTimeout.toMillis) - - val idleCpuLevel = settings.Advanced.IdleCpuLevel - if (idleCpuLevel == 10) { - driverContext - .threadingMode(ThreadingMode.DEDICATED) - .conductorIdleStrategy(new BackoffIdleStrategy(1, 1, 1, 1)) - .receiverIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel)) - .senderIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel)) - } else if (idleCpuLevel == 1) { - driverContext - .threadingMode(ThreadingMode.SHARED) - .sharedIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel)) - } else if (idleCpuLevel <= 7) { - driverContext - .threadingMode(ThreadingMode.SHARED_NETWORK) - .sharedNetworkIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel)) - } else { - driverContext - .threadingMode(ThreadingMode.DEDICATED) - .receiverIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel)) - .senderIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel)) - } - - val driver = MediaDriver.launchEmbedded(driverContext) - log.info("Started embedded media driver in directory [{}]", driver.aeronDirectoryName) - topLevelFREvents.loFreq(Transport_MediaDriverStarted, driver.aeronDirectoryName().getBytes("US-ASCII")) - if (!mediaDriver.compareAndSet(None, Some(driver))) { - throw new IllegalStateException("media driver started more than once") - } - } - } - - private def aeronDir: String = mediaDriver.get match { - case Some(driver) ⇒ driver.aeronDirectoryName - case None ⇒ settings.Advanced.AeronDirectoryName - } - - private def stopMediaDriver(): Unit = { - // make sure we only close the driver once or we will crash the JVM - val maybeDriver = mediaDriver.getAndSet(None) - maybeDriver.foreach { driver ⇒ - // this is only for embedded media driver - try driver.close() catch { - case NonFatal(e) ⇒ - // don't think driver.close will ever throw, but just in case - log.warning("Couldn't close Aeron embedded media driver due to [{}]", e.getMessage) - } - - try { - if (settings.Advanced.DeleteAeronDirectory) { - IoUtil.delete(new File(driver.aeronDirectoryName), false) - topLevelFREvents.loFreq(Transport_MediaFileDeleted, NoMetaData) - } - } catch { - case NonFatal(e) ⇒ - log.warning( - "Couldn't delete Aeron embedded media driver files in [{}] due to [{}]", - driver.aeronDirectoryName, e.getMessage) - } - } - } - - // TODO: Add FR events - private def startAeron(): Unit = { - val ctx = new Aeron.Context - - ctx.driverTimeoutMs(settings.Advanced.DriverTimeout.toMillis) - - ctx.availableImageHandler(new AvailableImageHandler { - override def onAvailableImage(img: Image): Unit = { - if (log.isDebugEnabled) - log.debug(s"onAvailableImage from ${img.sourceIdentity} session ${img.sessionId}") - } - }) - ctx.unavailableImageHandler(new UnavailableImageHandler { - override def onUnavailableImage(img: Image): Unit = { - if (log.isDebugEnabled) - log.debug(s"onUnavailableImage from ${img.sourceIdentity} session ${img.sessionId}") - - // freeSessionBuffer in AeronSource FragmentAssembler - streamMatValues.get.valuesIterator.foreach { - case InboundStreamMatValues(resourceLife, _) ⇒ resourceLife.onUnavailableImage(img.sessionId) - } - } - }) - - ctx.errorHandler(new ErrorHandler { - private val fatalErrorOccured = new AtomicBoolean - - override def onError(cause: Throwable): Unit = { - cause match { - case e: ConductorServiceTimeoutException ⇒ handleFatalError(e) - case e: DriverTimeoutException ⇒ handleFatalError(e) - case _: AeronTerminated ⇒ // already handled, via handleFatalError - case _ ⇒ - log.error(cause, s"Aeron error, ${cause.getMessage}") - } - } - - private def handleFatalError(cause: Throwable): Unit = { - if (fatalErrorOccured.compareAndSet(false, true)) { - if (!isShutdown) { - log.error(cause, "Fatal Aeron error {}. Have to terminate ActorSystem because it lost contact with the " + - "{} Aeron media driver. Possible configuration properties to mitigate the problem are " + - "'client-liveness-timeout' or 'driver-timeout'. {}", - Logging.simpleName(cause), - if (settings.Advanced.EmbeddedMediaDriver) "embedded" else "external", - cause.getMessage) - taskRunner.stop() - aeronErrorLogTask.cancel() - if (settings.LogAeronCounters) aeronCounterTask.cancel() - system.terminate() - throw new AeronTerminated(cause) - } - } else - throw new AeronTerminated(cause) - } - }) - - ctx.aeronDirectoryName(aeronDir) - aeron = Aeron.connect(ctx) - } - - private def blockUntilChannelActive(): Unit = { - val counterIdForInboundChannel = findCounterId(s"rcv-channel: $inboundChannel") - val waitInterval = 200 - val retries = math.max(1, settings.Bind.BindTimeout.toMillis / waitInterval) - retry(retries) - - @tailrec def retry(retries: Long): Unit = { - val status = aeron.countersReader().getCounterValue(counterIdForInboundChannel) - if (status == ChannelEndpointStatus.ACTIVE) { - log.debug("Inbound channel is now active") - } else if (status == ChannelEndpointStatus.ERRORED) { - areonErrorLog.logErrors(log, 0L) - stopMediaDriver() - throw new RemoteTransportException("Inbound Aeron channel is in errored state. See Aeron logs for details.") - } else if (status == ChannelEndpointStatus.INITIALIZING && retries > 0) { - Thread.sleep(waitInterval) - retry(retries - 1) - } else { - areonErrorLog.logErrors(log, 0L) - stopMediaDriver() - throw new RemoteTransportException("Timed out waiting for Aeron transport to bind. See Aeoron logs.") - } - } - } - - private def findCounterId(label: String): Int = { - var counterId = -1 - aeron.countersReader().forEach(new IntObjConsumer[String] { - def accept(i: Int, l: String): Unit = { - if (label == l) - counterId = i - } - }) - if (counterId == -1) { - throw new RuntimeException(s"Unable to found counterId for label: $label") - } else { - counterId - } - } - - // TODO Add FR Events - private def startAeronErrorLog(): Unit = { - areonErrorLog = new AeronErrorLog(new File(aeronDir, CncFileDescriptor.CNC_FILE), log) - val lastTimestamp = new AtomicLong(0L) - import system.dispatcher - aeronErrorLogTask = system.scheduler.schedule(3.seconds, 5.seconds) { - if (!isShutdown) { - val newLastTimestamp = areonErrorLog.logErrors(log, lastTimestamp.get) - lastTimestamp.set(newLastTimestamp + 1) - } - } - } - - private def startAeronCounterLog(): Unit = { - import system.dispatcher - aeronCounterTask = system.scheduler.schedule(5.seconds, 5.seconds) { - if (!isShutdown && log.isDebugEnabled) { - aeron.countersReader.forEach(new MetaData() { - def accept(counterId: Int, typeId: Int, keyBuffer: DirectBuffer, label: String): Unit = { - val value = aeron.countersReader().getCounterValue(counterId) - log.debug("Aeron Counter {}: {} {}]", counterId, value, label) - } - }) - } - } - } - - private def runInboundStreams(): Unit = { - runInboundControlStream() - runInboundOrdinaryMessagesStream() - - if (largeMessageChannelEnabled) { - runInboundLargeMessagesStream() - } - } - - private def runInboundControlStream(): Unit = { - if (isShutdown) throw ShuttingDown - val (resourceLife, ctrl, completed) = - aeronSource(controlStreamId, envelopeBufferPool) - .via(inboundFlow(settings, NoInboundCompressions)) - .toMat(inboundControlSink)({ case (a, (c, d)) ⇒ (a, c, d) }) - .run()(controlMaterializer) - + protected def attachControlMessageObserver(ctrl: ControlMessageSubject): Unit = { controlSubject = ctrl - controlSubject.attach(new ControlMessageObserver { override def notify(inboundEnvelope: InboundEnvelope): Unit = { try { @@ -776,87 +551,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } }) - updateStreamMatValues(controlStreamId, resourceLife, completed) - attachStreamRestart("Inbound control stream", completed, () ⇒ runInboundControlStream()) } - private def runInboundOrdinaryMessagesStream(): Unit = { - if (isShutdown) throw ShuttingDown - - val (resourceLife, inboundCompressionAccesses, completed) = - if (inboundLanes == 1) { - aeronSource(ordinaryStreamId, envelopeBufferPool) - .viaMat(inboundFlow(settings, _inboundCompressions))(Keep.both) - .toMat(inboundSink(envelopeBufferPool))({ case ((a, b), c) ⇒ (a, b, c) }) - .run()(materializer) - - } else { - val hubKillSwitch = KillSwitches.shared("hubKillSwitch") - val source: Source[InboundEnvelope, (ResourceLifecycle, InboundCompressionAccess)] = - aeronSource(ordinaryStreamId, envelopeBufferPool) - .via(hubKillSwitch.flow) - .viaMat(inboundFlow(settings, _inboundCompressions))(Keep.both) - .via(Flow.fromGraph(new DuplicateHandshakeReq(inboundLanes, this, system, envelopeBufferPool))) - - // Select lane based on destination to preserve message order, - // Also include the uid of the sending system in the hash to spread - // "hot" destinations, e.g. ActorSelection anchor. - val partitioner: InboundEnvelope ⇒ Int = env ⇒ { - env.recipient match { - case OptionVal.Some(r) ⇒ - val a = r.path.uid - val b = env.originUid - val hashA = 23 + a - val hash: Int = 23 * hashA + java.lang.Long.hashCode(b) - math.abs(hash) % inboundLanes - case OptionVal.None ⇒ - // the lane is set by the DuplicateHandshakeReq stage, otherwise 0 - env.lane - } - } - - val (resourceLife, compressionAccess, hub) = - source - .toMat(Sink.fromGraph(new FixedSizePartitionHub[InboundEnvelope](partitioner, inboundLanes, - settings.Advanced.InboundHubBufferSize)))({ case ((a, b), c) ⇒ (a, b, c) }) - .run()(materializer) - - val lane = inboundSink(envelopeBufferPool) - val completedValues: Vector[Future[Done]] = - (0 until inboundLanes).map { _ ⇒ - hub.toMat(lane)(Keep.right).run()(materializer) - }(collection.breakOut) - - import system.dispatcher - - // tear down the upstream hub part if downstream lane fails - // lanes are not completed with success by themselves so we don't have to care about onSuccess - Future.firstCompletedOf(completedValues).failed.foreach { reason ⇒ hubKillSwitch.abort(reason) } - - val allCompleted = Future.sequence(completedValues).map(_ ⇒ Done) - - (resourceLife, compressionAccess, allCompleted) - } - - _inboundCompressionAccess = OptionVal(inboundCompressionAccesses) - - updateStreamMatValues(ordinaryStreamId, resourceLife, completed) - attachStreamRestart("Inbound message stream", completed, () ⇒ runInboundOrdinaryMessagesStream()) - } - - private def runInboundLargeMessagesStream(): Unit = { - if (isShutdown) throw ShuttingDown - - val (resourceLife, completed) = aeronSource(largeStreamId, largeEnvelopeBufferPool) - .via(inboundLargeFlow(settings)) - .toMat(inboundSink(largeEnvelopeBufferPool))(Keep.both) - .run()(materializer) - - updateStreamMatValues(largeStreamId, resourceLife, completed) - attachStreamRestart("Inbound large message stream", completed, () ⇒ runInboundLargeMessagesStream()) - } - - private def attachStreamRestart(streamName: String, streamCompleted: Future[Done], restart: () ⇒ Unit): Unit = { + protected def attachInboundStreamRestart(streamName: String, streamCompleted: Future[Done], restart: () ⇒ Unit): Unit = { implicit val ec = materializer.executionContext streamCompleted.failed.foreach { case ShutdownSignal ⇒ // shutdown as expected @@ -904,26 +601,13 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R killSwitch.abort(ShutdownSignal) topLevelFREvents.loFreq(Transport_KillSwitchPulled, NoMetaData) for { - _ ← streamsCompleted - _ ← taskRunner.stop() + _ ← streamsCompleted.recover { case _ ⇒ Done } + _ ← shutdownTransport().recover { case _ ⇒ Done } } yield { - topLevelFREvents.loFreq(Transport_Stopped, NoMetaData) - // no need to explicitly shut down the contained access since it's lifecycle is bound to the Decoder _inboundCompressionAccess = OptionVal.None - if (aeronErrorLogTask != null) { - aeronErrorLogTask.cancel() - topLevelFREvents.loFreq(Transport_AeronErrorLogTaskStopped, NoMetaData) - } - if (aeron != null) aeron.close() - if (areonErrorLog != null) areonErrorLog.close() - if (mediaDriver.get.isDefined) { - stopMediaDriver() - - } topLevelFREvents.loFreq(Transport_FlightRecorderClose, NoMetaData) - flightRecorder.foreach(_.close()) afrFileChannel.foreach(_.force(true)) afrFileChannel.foreach(_.close()) @@ -931,12 +615,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } } - private def updateStreamMatValues(streamId: Int, aeronSourceLifecycle: AeronSource.ResourceLifecycle, completed: Future[Done]): Unit = { - implicit val ec = materializer.executionContext - updateStreamMatValues(streamId, InboundStreamMatValues(aeronSourceLifecycle, completed.recover { case _ ⇒ Done })) - } + protected def shutdownTransport(): Future[Done] - @tailrec private def updateStreamMatValues(streamId: Int, values: InboundStreamMatValues): Unit = { + @tailrec final protected def updateStreamMatValues(streamId: Int, values: InboundStreamMatValues): Unit = { val prev = streamMatValues.get() if (!streamMatValues.compareAndSet(prev, prev + (streamId → values))) { updateStreamMatValues(streamId, values) @@ -1024,43 +705,36 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } def outboundLarge(outboundContext: OutboundContext): Sink[OutboundEnvelope, Future[Done]] = - createOutboundSink(largeStreamId, outboundContext, largeEnvelopeBufferPool) + createOutboundSink(LargeStreamId, outboundContext, largeEnvelopeBufferPool) .mapMaterializedValue { case (_, d) ⇒ d } def outbound(outboundContext: OutboundContext): Sink[OutboundEnvelope, (OutboundCompressionAccess, Future[Done])] = - createOutboundSink(ordinaryStreamId, outboundContext, envelopeBufferPool) + createOutboundSink(OrdinaryStreamId, outboundContext, envelopeBufferPool) private def createOutboundSink(streamId: Int, outboundContext: OutboundContext, bufferPool: EnvelopeBufferPool): Sink[OutboundEnvelope, (OutboundCompressionAccess, Future[Done])] = { - outboundLane(outboundContext, bufferPool) - .toMat(aeronSink(outboundContext, streamId, bufferPool))(Keep.both) + outboundLane(outboundContext, bufferPool, streamId) + .toMat(outboundTransportSink(outboundContext, streamId, bufferPool))(Keep.both) } - def aeronSink(outboundContext: OutboundContext): Sink[EnvelopeBuffer, Future[Done]] = - aeronSink(outboundContext, ordinaryStreamId, envelopeBufferPool) + def outboundTransportSink(outboundContext: OutboundContext): Sink[EnvelopeBuffer, Future[Done]] = + outboundTransportSink(outboundContext, OrdinaryStreamId, envelopeBufferPool) - private def aeronSink(outboundContext: OutboundContext, streamId: Int, - bufferPool: EnvelopeBufferPool): Sink[EnvelopeBuffer, Future[Done]] = { - - val giveUpAfter = - if (streamId == controlStreamId) settings.Advanced.GiveUpSystemMessageAfter - else settings.Advanced.GiveUpMessageAfter - Sink.fromGraph(new AeronSink(outboundChannel(outboundContext.remoteAddress), streamId, aeron, taskRunner, - bufferPool, giveUpAfter, createFlightRecorderEventSink())) - } + protected def outboundTransportSink(outboundContext: OutboundContext, streamId: Int, + bufferPool: EnvelopeBufferPool): Sink[EnvelopeBuffer, Future[Done]] def outboundLane(outboundContext: OutboundContext): Flow[OutboundEnvelope, EnvelopeBuffer, OutboundCompressionAccess] = - outboundLane(outboundContext, envelopeBufferPool) + outboundLane(outboundContext, envelopeBufferPool, OrdinaryStreamId) private def outboundLane( outboundContext: OutboundContext, - bufferPool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, OutboundCompressionAccess] = { + bufferPool: EnvelopeBufferPool, streamId: Int): Flow[OutboundEnvelope, EnvelopeBuffer, OutboundCompressionAccess] = { Flow.fromGraph(killSwitch.flow[OutboundEnvelope]) .via(new OutboundHandshake(system, outboundContext, outboundEnvelopePool, settings.Advanced.HandshakeTimeout, settings.Advanced.HandshakeRetryInterval, settings.Advanced.InjectHandshakeInterval)) - .viaMat(createEncoder(bufferPool))(Keep.right) + .viaMat(createEncoder(bufferPool, streamId))(Keep.right) } def outboundControl(outboundContext: OutboundContext): Sink[OutboundEnvelope, (OutboundControlIngress, Future[Done])] = { @@ -1073,24 +747,21 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R // note that System messages must not be dropped before the SystemMessageDelivery stage .via(outboundTestFlow(outboundContext)) .viaMat(new OutboundControlJunction(outboundContext, outboundEnvelopePool))(Keep.right) - .via(createEncoder(envelopeBufferPool)) - .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner, - envelopeBufferPool, Duration.Inf, createFlightRecorderEventSink()))(Keep.both) + .via(createEncoder(envelopeBufferPool, ControlStreamId)) + .toMat(outboundTransportSink(outboundContext, ControlStreamId, envelopeBufferPool))(Keep.both) // TODO we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages } - def createEncoder(pool: EnvelopeBufferPool): Flow[OutboundEnvelope, EnvelopeBuffer, OutboundCompressionAccess] = - Flow.fromGraph(new Encoder(localAddress, system, outboundEnvelopePool, pool, settings.LogSend)) + def createEncoder(pool: EnvelopeBufferPool, streamId: Int): Flow[OutboundEnvelope, EnvelopeBuffer, OutboundCompressionAccess] = + Flow.fromGraph(new Encoder(localAddress, system, outboundEnvelopePool, pool, streamId, settings.LogSend, + settings.Version)) - def aeronSource(streamId: Int, pool: EnvelopeBufferPool): Source[EnvelopeBuffer, AeronSource.ResourceLifecycle] = - Source.fromGraph(new AeronSource(inboundChannel, streamId, aeron, taskRunner, pool, - createFlightRecorderEventSink(), aeronSourceSpinningStrategy)) + def createDecoder(settings: ArterySettings, compressions: InboundCompressions): Flow[EnvelopeBuffer, InboundEnvelope, InboundCompressionAccess] = + Flow.fromGraph(new Decoder(this, system, localAddress, settings, compressions, inboundEnvelopePool)) - private def aeronSourceSpinningStrategy: Int = - if (settings.Advanced.InboundLanes > 1 || // spinning was identified to be the cause of massive slowdowns with multiple lanes, see #21365 - settings.Advanced.IdleCpuLevel < 5) 0 // also don't spin for small IdleCpuLevels - else 50 * settings.Advanced.IdleCpuLevel - 240 + def createDeserializer(bufferPool: EnvelopeBufferPool): Flow[InboundEnvelope, InboundEnvelope, NotUsed] = + Flow.fromGraph(new Deserializer(this, system, bufferPool)) val messageDispatcherSink: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m ⇒ messageDispatcher.dispatch(m) @@ -1100,12 +771,6 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } } - def createDecoder(settings: ArterySettings, compressions: InboundCompressions, bufferPool: EnvelopeBufferPool): Flow[EnvelopeBuffer, InboundEnvelope, InboundCompressionAccess] = - Flow.fromGraph(new Decoder(this, system, localAddress, settings, bufferPool, compressions, inboundEnvelopePool)) - - def createDeserializer(bufferPool: EnvelopeBufferPool): Flow[InboundEnvelope, InboundEnvelope, NotUsed] = - Flow.fromGraph(new Deserializer(this, system, bufferPool)) - // Checks for termination hint messages and sends an ACK for those (not processing them further) // Purpose of this stage is flushing, the sender can wait for the ACKs up to try flushing // pending messages. @@ -1135,15 +800,12 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R def inboundFlow(settings: ArterySettings, compressions: InboundCompressions): Flow[EnvelopeBuffer, InboundEnvelope, InboundCompressionAccess] = { Flow[EnvelopeBuffer] .via(killSwitch.flow) - .viaMat(createDecoder(settings, compressions, envelopeBufferPool))(Keep.right) + .viaMat(createDecoder(settings, compressions))(Keep.right) } // large messages flow does not use compressions, since the message size dominates the size anyway - def inboundLargeFlow(settings: ArterySettings): Flow[EnvelopeBuffer, InboundEnvelope, NotUsed] = { - Flow[EnvelopeBuffer] - .via(killSwitch.flow) - .via(createDecoder(settings, NoInboundCompressions, largeEnvelopeBufferPool)) - } + def inboundLargeFlow(settings: ArterySettings): Flow[EnvelopeBuffer, InboundEnvelope, Any] = + inboundFlow(settings, NoInboundCompressions) def inboundControlSink: Sink[InboundEnvelope, (ControlMessageSubject, Future[Done])] = { Flow[InboundEnvelope] @@ -1191,7 +853,11 @@ private[remote] object ArteryTransport { val ProtocolName = "akka" - val Version: Byte = 0 + // Note that the used version of the header format for outbound messages is defined in + // `ArterySettings.Version` because that may depend on configuration settings. + // This is the highest supported version on receiving (decoding) side. + // ArterySettings.Version can be lower than this HighestVersion to support rolling upgrades. + val HighestVersion: Byte = 0 class AeronTerminated(e: Throwable) extends RuntimeException(e) @@ -1201,7 +867,7 @@ private[remote] object ArteryTransport { object ShuttingDown extends RuntimeException with NoStackTrace final case class InboundStreamMatValues( - aeronSourceLifecycle: AeronSource.ResourceLifecycle, + aeronSourceLifecycle: Option[AeronSource.ResourceLifecycle], completed: Future[Done]) def autoSelectPort(hostname: String): Int = { @@ -1212,4 +878,15 @@ private[remote] object ArteryTransport { port } + val ControlStreamId = 1 + val OrdinaryStreamId = 2 + val LargeStreamId = 3 + + def streamName(streamId: Int): String = + streamId match { + case ControlStreamId ⇒ "control" + case LargeStreamId ⇒ "large message" + case _ ⇒ "message" + } + } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index 99ded3a261..366addb2aa 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -24,7 +24,7 @@ import akka.pattern.after import akka.remote._ import akka.remote.DaemonMsgCreate import akka.remote.QuarantinedEvent -import akka.remote.artery.AeronSink.GaveUpMessageException +import akka.remote.artery.aeron.AeronSink.GaveUpMessageException import akka.remote.artery.ArteryTransport.{ AeronTerminated, ShuttingDown } import akka.remote.artery.Encoder.OutboundCompressionAccess import akka.remote.artery.Encoder.AccessOutboundCompressionFailed @@ -516,7 +516,7 @@ private[remote] class Association( val streamKillSwitch = KillSwitches.shared("outboundControlStreamKillSwitch") val (queueValue, (control, completed)) = - Source.fromGraph(new SendQueue[OutboundEnvelope]) + Source.fromGraph(new SendQueue[OutboundEnvelope](transport.system.deadLetters)) .via(streamKillSwitch.flow) .toMat(transport.outboundControl(this))(Keep.both) .run()(materializer) @@ -529,7 +529,7 @@ private[remote] class Association( materializing.countDown() updateStreamMatValues(ControlQueueIndex, streamKillSwitch, completed) - attachStreamRestart("Outbound control stream", ControlQueueIndex, controlQueueSize, + attachOutboundStreamRestart("Outbound control stream", ControlQueueIndex, controlQueueSize, completed, () ⇒ runOutboundControlStream()) } @@ -555,7 +555,7 @@ private[remote] class Association( val streamKillSwitch = KillSwitches.shared("outboundMessagesKillSwitch") val (queueValue, testMgmt, changeCompression, completed) = - Source.fromGraph(new SendQueue[OutboundEnvelope]) + Source.fromGraph(new SendQueue[OutboundEnvelope](transport.system.deadLetters)) .via(streamKillSwitch.flow) .viaMat(transport.outboundTestFlow(this))(Keep.both) .toMat(transport.outbound(this))({ case ((a, b), (c, d)) ⇒ (a, b, c, d) }) // "keep all, exploded" @@ -568,7 +568,7 @@ private[remote] class Association( outboundCompressionAccess = Vector(changeCompression) updateStreamMatValues(OrdinaryQueueIndex, streamKillSwitch, completed) - attachStreamRestart("Outbound message stream", OrdinaryQueueIndex, queueSize, + attachOutboundStreamRestart("Outbound message stream", OrdinaryQueueIndex, queueSize, completed, () ⇒ runOutboundOrdinaryMessagesStream()) } else { @@ -582,7 +582,7 @@ private[remote] class Association( val streamKillSwitch = KillSwitches.shared("outboundMessagesKillSwitch") - val lane = Source.fromGraph(new SendQueue[OutboundEnvelope]) + val lane = Source.fromGraph(new SendQueue[OutboundEnvelope](transport.system.deadLetters)) .via(streamKillSwitch.flow) .via(transport.outboundTestFlow(this)) .viaMat(transport.outboundLane(this))(Keep.both) @@ -593,9 +593,9 @@ private[remote] class Association( case ((q, c), w) ⇒ (q, c, w) } - val (mergeHub, aeronSinkCompleted) = MergeHub.source[EnvelopeBuffer] + val (mergeHub, transportSinkCompleted) = MergeHub.source[EnvelopeBuffer] .via(streamKillSwitch.flow) - .toMat(transport.aeronSink(this))(Keep.both).run()(materializer) + .toMat(transport.outboundTransportSink(this))(Keep.both).run()(materializer) val values: Vector[(SendQueue.QueueValue[OutboundEnvelope], Encoder.OutboundCompressionAccess, Future[Done])] = (0 until outboundLanes).map { _ ⇒ @@ -610,9 +610,9 @@ private[remote] class Association( Future.firstCompletedOf(laneCompletedValues).failed.foreach { reason ⇒ streamKillSwitch.abort(reason) } - (laneCompletedValues :+ aeronSinkCompleted).foreach(_.foreach { _ ⇒ streamKillSwitch.shutdown() }) + (laneCompletedValues :+ transportSinkCompleted).foreach(_.foreach { _ ⇒ streamKillSwitch.shutdown() }) - val allCompleted = Future.sequence(laneCompletedValues).flatMap(_ ⇒ aeronSinkCompleted) + val allCompleted = Future.sequence(laneCompletedValues).flatMap(_ ⇒ transportSinkCompleted) queueValues.zip(wrappers).zipWithIndex.foreach { case ((q, w), i) ⇒ @@ -623,7 +623,7 @@ private[remote] class Association( outboundCompressionAccess = compressionAccessValues - attachStreamRestart("Outbound message stream", OrdinaryQueueIndex, queueSize, + attachOutboundStreamRestart("Outbound message stream", OrdinaryQueueIndex, queueSize, allCompleted, () ⇒ runOutboundOrdinaryMessagesStream()) } } @@ -637,7 +637,7 @@ private[remote] class Association( val streamKillSwitch = KillSwitches.shared("outboundLargeMessagesKillSwitch") - val (queueValue, completed) = Source.fromGraph(new SendQueue[OutboundEnvelope]) + val (queueValue, completed) = Source.fromGraph(new SendQueue[OutboundEnvelope](transport.system.deadLetters)) .via(streamKillSwitch.flow) .via(transport.outboundTestFlow(this)) .toMat(transport.outboundLarge(this))(Keep.both) @@ -649,12 +649,12 @@ private[remote] class Association( queuesVisibility = true // volatile write for visibility of the queues array updateStreamMatValues(LargeQueueIndex, streamKillSwitch, completed) - attachStreamRestart("Outbound large message stream", LargeQueueIndex, largeQueueSize, + attachOutboundStreamRestart("Outbound large message stream", LargeQueueIndex, largeQueueSize, completed, () ⇒ runOutboundLargeMessagesStream()) } - private def attachStreamRestart(streamName: String, queueIndex: Int, queueCapacity: Int, - streamCompleted: Future[Done], restart: () ⇒ Unit): Unit = { + private def attachOutboundStreamRestart(streamName: String, queueIndex: Int, queueCapacity: Int, + streamCompleted: Future[Done], restart: () ⇒ Unit): Unit = { def lazyRestart(): Unit = { outboundCompressionAccess = Vector.empty @@ -668,6 +668,11 @@ private[remote] class Association( } implicit val ec = materializer.executionContext + streamCompleted.foreach { _ ⇒ + // shutdown as expected + // countDown the latch in case threads are waiting on the latch in outboundControlIngress method + materializing.countDown() + } streamCompleted.failed.foreach { case ArteryTransport.ShutdownSignal ⇒ // shutdown as expected @@ -692,7 +697,9 @@ private[remote] class Association( if (queueIndex == ControlQueueIndex) { cause match { case _: HandshakeTimeoutException ⇒ // ok, quarantine not possible without UID - case _ ⇒ quarantine("Outbound control stream restarted") + case _ ⇒ + // FIXME can we avoid quarantine if all system messages have been delivered? + quarantine("Outbound control stream restarted") } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index e019d20c36..2029ba46e7 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -21,6 +21,8 @@ import akka.util.{ OptionVal, Unsafe } import scala.concurrent.duration._ import scala.concurrent.{ Future, Promise } import scala.util.control.NonFatal +import akka.util.ByteStringBuilder +import java.nio.ByteOrder import akka.remote.artery.OutboundHandshake.HandshakeReq import akka.serialization.SerializerWithStringManifest @@ -47,7 +49,9 @@ private[remote] class Encoder( system: ExtendedActorSystem, outboundEnvelopePool: ObjectPool[ReusableOutboundEnvelope], bufferPool: EnvelopeBufferPool, - debugLogSend: Boolean) + streamId: Int, + debugLogSend: Boolean, + version: Byte) extends GraphStageWithMaterializedValue[FlowShape[OutboundEnvelope, EnvelopeBuffer], Encoder.OutboundCompressionAccess] { import Encoder._ @@ -59,7 +63,7 @@ private[remote] class Encoder( val logic = new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging with OutboundCompressionAccess { private val headerBuilder = HeaderBuilder.out() - headerBuilder setVersion ArteryTransport.Version + headerBuilder setVersion version headerBuilder setUid uniqueLocalAddress.uid private val localAddress = uniqueLocalAddress.address private val serialization = SerializationExtension(system) @@ -342,7 +346,6 @@ private[remote] class Decoder( system: ExtendedActorSystem, uniqueLocalAddress: UniqueAddress, settings: ArterySettings, - bufferPool: EnvelopeBufferPool, inboundCompressions: InboundCompressions, inEnvelopePool: ObjectPool[ReusableInboundEnvelope]) extends GraphStageWithMaterializedValue[FlowShape[EnvelopeBuffer, InboundEnvelope], InboundCompressionAccess] { @@ -390,7 +393,7 @@ private[remote] class Decoder( } } } - override def onPush(): Unit = { + override def onPush(): Unit = try { messageCount += 1 val envelope = grab(in) headerBuilder.resetMessageFields() @@ -409,7 +412,7 @@ private[remote] class Decoder( } catch { case NonFatal(e) ⇒ // probably version mismatch due to restarted system - log.warning("Couldn't decompress sender from originUid [{}]. {}", originUid, e.getMessage) + log.warning("Couldn't decompress sender from originUid [{}]. {}", originUid, e) OptionVal.None } @@ -423,14 +426,14 @@ private[remote] class Decoder( } catch { case NonFatal(e) ⇒ // probably version mismatch due to restarted system - log.warning("Couldn't decompress sender from originUid [{}]. {}", originUid, e.getMessage) + log.warning("Couldn't decompress sender from originUid [{}]. {}", originUid, e) OptionVal.None } val classManifestOpt = try headerBuilder.manifest(originUid) catch { case NonFatal(e) ⇒ // probably version mismatch due to restarted system - log.warning("Couldn't decompress manifest from originUid [{}]. {}", originUid, e.getMessage) + log.warning("Couldn't decompress manifest from originUid [{}]. {}", originUid, e) OptionVal.None } @@ -520,6 +523,10 @@ private[remote] class Decoder( push(out, decoded) } } + } catch { + case NonFatal(e) ⇒ + log.warning("Dropping message due to: {}", e) + pull(in) } private def resolveRecipient(path: String): OptionVal[InternalActorRef] = { @@ -638,7 +645,7 @@ private[remote] class Deserializer( } log.warning( "Failed to deserialize message from [{}] with serializer id [{}] and manifest [{}]. {}", - from, envelope.serializer, envelope.classManifest, e.getMessage) + from, envelope.serializer, envelope.classManifest, e) pull(in) } finally { val buf = envelope.envelopeBuffer diff --git a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala b/akka-remote/src/main/scala/akka/remote/artery/EnvelopeBufferPool.scala similarity index 89% rename from akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala rename to akka-remote/src/main/scala/akka/remote/artery/EnvelopeBufferPool.scala index 5957ab684f..db766a225d 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/BufferPool.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/EnvelopeBufferPool.scala @@ -37,8 +37,10 @@ private[remote] class EnvelopeBufferPool(maximumPayload: Int, maximumBuffers: In } } - def release(buffer: EnvelopeBuffer) = + def release(buffer: EnvelopeBuffer) = { + // only reuse direct buffers, e.g. not those wrapping ByteString if (buffer.byteBuffer.isDirect && !availableBuffers.offer(buffer)) buffer.tryCleanDirectByteBuffer() + } } @@ -60,6 +62,22 @@ private[remote] object ByteFlag { /** * INTERNAL API + * + * The strategy if the header format must be changed in an incompatible way is: + * - In the end we only want to support one header format, the latest, but during + * a rolling upgrade period we must support two versions in at least one Akka patch + * release. + * - When supporting two version the outbound messages must still be encoded with old + * version. The Decoder on the receiving side must understand both versions. + * - Create a new copy of the header encoding/decoding logic (issue #24553: we should refactor to make that easier). + * - Bump `ArteryTransport.HighestVersion` and keep `ArterySettings.Version` as the old version. + * - Make sure `Decoder` picks the right parsing logic based on the version field in the incoming frame. + * - Release Akka, e.g. 2.5.13 + * - Later, remove the old header parsing logic and bump the `ArterySettings.Version` to the same as + * `ArteryTransport.HighestVersion` again. + * - Release Akka, e.g. 2.5.14, and announce that all nodes in the cluster must first be on version + * 2.5.13 before upgrading to 2.5.14. That means that it is not supported to do a rolling upgrade + * from 2.5.12 directly to 2.5.14. */ private[remote] object EnvelopeBuffer { @@ -84,6 +102,7 @@ private[remote] object EnvelopeBuffer { // EITHER metadata followed by literals directly OR literals directly in this spot. // Mode depends on the `MetadataPresentFlag`. val MetadataContainerAndLiteralSectionOffset = 28 // Int + } /** INTERNAL API */ @@ -111,7 +130,8 @@ private[remote] sealed trait HeaderBuilder { def setFlags(v: Byte): Unit def flags: Byte def flag(byteFlag: ByteFlag): Boolean - def setFlag(byteFlag: ByteFlag, value: Boolean): Unit + def setFlag(byteFlag: ByteFlag): Unit + def clearFlag(byteFlag: ByteFlag): Unit def inboundActorRefCompressionTableVersion: Byte def inboundClassManifestCompressionTableVersion: Byte @@ -218,6 +238,10 @@ private[remote] final class HeaderBuilderImpl( var _remoteInstruments: OptionVal[RemoteInstruments] = OptionVal.None override def resetMessageFields(): Unit = { + // some fields must not be reset because they are set only once from the Encoder, + // which owns the HeaderBuilder instance. Those are never changed. + // version, uid, streamId + _flags = 0 _senderActorRef = null _senderActorRefIdx = -1 @@ -237,9 +261,10 @@ private[remote] final class HeaderBuilderImpl( override def setFlags(v: Byte) = _flags = v override def flags = _flags override def flag(byteFlag: ByteFlag): Boolean = (_flags.toInt & byteFlag.mask) != 0 - override def setFlag(byteFlag: ByteFlag, value: Boolean): Unit = - if (value) _flags = (flags | byteFlag.mask).toByte - else _flags = (flags & ~byteFlag.mask).toByte + override def setFlag(byteFlag: ByteFlag): Unit = + _flags = (flags | byteFlag.mask).toByte + override def clearFlag(byteFlag: ByteFlag): Unit = + _flags = (flags & ~byteFlag.mask).toByte override def setUid(uid: Long) = _uid = uid override def uid: Long = _uid @@ -366,6 +391,7 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { // Write fixed length parts byteBuffer.put(VersionOffset, header.version) + byteBuffer.put(FlagsOffset, header.flags) // compression table version numbers byteBuffer.put(ActorRefCompressionTableVersionOffset, header.outboundActorRefCompression.version) @@ -380,7 +406,7 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { header._remoteInstruments.get.serialize(OptionVal(oe), byteBuffer) if (byteBuffer.position() != MetadataContainerAndLiteralSectionOffset) { // we actually wrote some metadata so update the flag field to reflect that - header.setFlag(MetadataPresentFlag, true) + header.setFlag(MetadataPresentFlag) byteBuffer.put(FlagsOffset, header.flags) } } @@ -409,6 +435,12 @@ private[remote] final class EnvelopeBuffer(val byteBuffer: ByteBuffer) { // Read fixed length parts header.setVersion(byteBuffer.get(VersionOffset)) + + if (header.version > ArteryTransport.HighestVersion) + throw new IllegalArgumentException( + s"Incompatible protocol version [${header.version}], " + + s"highest known version for this node is [${ArteryTransport.HighestVersion}]") + header.setFlags(byteBuffer.get(FlagsOffset)) // compression table versions (stored in the Tag) header._inboundActorRefCompressionTableVersion = byteBuffer.get(ActorRefCompressionTableVersionOffset) diff --git a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorder.scala b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorder.scala index 823e877ff7..fa19d74905 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorder.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorder.scala @@ -20,7 +20,9 @@ import scala.annotation.tailrec */ private[remote] trait EventSink { def alert(code: Int, metadata: Array[Byte]): Unit + def alert(code: Int, metadata: String): Unit def loFreq(code: Int, metadata: Array[Byte]): Unit + def loFreq(code: Int, metadata: String): Unit def hiFreq(code: Long, param: Long): Unit def flushHiFreqBatch(): Unit @@ -31,7 +33,9 @@ private[remote] trait EventSink { */ private[remote] object IgnoreEventSink extends EventSink { override def alert(code: Int, metadata: Array[Byte]): Unit = () + override def alert(code: Int, metadata: String): Unit = () override def loFreq(code: Int, metadata: Array[Byte]): Unit = () + override def loFreq(code: Int, metadata: String): Unit = () override def flushHiFreqBatch(): Unit = () override def hiFreq(code: Long, param: Long): Unit = () } @@ -44,10 +48,18 @@ private[remote] class SynchronizedEventSink(delegate: EventSink) extends EventSi delegate.alert(code, metadata) } + override def alert(code: Int, metadata: String): Unit = { + alert(code, metadata.getBytes("US-ASCII")) + } + override def loFreq(code: Int, metadata: Array[Byte]): Unit = synchronized { delegate.loFreq(code, metadata) } + override def loFreq(code: Int, metadata: String): Unit = { + loFreq(code, metadata.getBytes("US-ASCII")) + } + override def flushHiFreqBatch(): Unit = synchronized { delegate.flushHiFreqBatch() } @@ -371,6 +383,10 @@ private[remote] class FlightRecorder(val fileChannel: FileChannel) extends Atomi } } + override def alert(code: Int, metadata: String): Unit = { + alert(code, metadata.getBytes("US-ASCII")) + } + override def loFreq(code: Int, metadata: Array[Byte]): Unit = { val status = FlightRecorder.this.get if (status eq Running) { @@ -380,6 +396,10 @@ private[remote] class FlightRecorder(val fileChannel: FileChannel) extends Atomi } } + override def loFreq(code: Int, metadata: String): Unit = { + loFreq(code, metadata.getBytes("US-ASCII")) + } + private def prepareRichRecord(recordBuffer: ByteBuffer, code: Int, metadata: Array[Byte]): Unit = { recordBuffer.clear() // TODO: This is a bit overkill, needs some smarter scheme later, no need to always store the wallclock diff --git a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala index c1d585fd3c..2475d64ba6 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala @@ -11,7 +11,7 @@ private[remote] object FlightRecorderEvents { // Top level remoting events val Transport_MediaDriverStarted = 0 - val Transport_AeronStarted = 1 + val Transport_Started = 1 val Transport_AeronErrorLogStarted = 2 val Transport_TaskRunnerStarted = 3 val Transport_UniqueAddressSet = 4 @@ -54,7 +54,7 @@ private[remote] object FlightRecorderEvents { // Used for presentation of the entries in the flight recorder lazy val eventDictionary = Map( Transport_MediaDriverStarted → "Transport: Media driver started", - Transport_AeronStarted → "Transport: Aeron started", + Transport_Started → "Transport: started", Transport_AeronErrorLogStarted → "Transport: Aeron error log started", Transport_TaskRunnerStarted → "Transport: Task runner started", Transport_UniqueAddressSet → "Transport: Unique address set", @@ -92,7 +92,8 @@ private[remote] object FlightRecorderEvents { Compression_CompressedManifest → "Compression: Compressed manifest", Compression_AllocatedManifestCompressionId → "Compression: Allocated manifest compression id", Compression_Inbound_RunActorRefAdvertisement → "InboundCompression: Run class manifest compression advertisement", - Compression_Inbound_RunClassManifestAdvertisement → "InboundCompression: Run class manifest compression advertisement") - .map { case (int, str) ⇒ int.toLong → str } + Compression_Inbound_RunClassManifestAdvertisement → "InboundCompression: Run class manifest compression advertisement" + + ).map { case (int, str) ⇒ int.toLong → str } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala index 87eba26ed1..94f9687375 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala @@ -67,6 +67,10 @@ private[remote] class OutboundHandshake( private var pendingMessage: OutboundEnvelope = null private var injectHandshakeTickScheduled = false + override def preStart(): Unit = { + scheduleOnce(HandshakeTimeout, timeout) + } + // InHandler override def onPush(): Unit = { if (handshakeState != Completed) @@ -96,11 +100,10 @@ private[remote] class OutboundHandshake( case Start ⇒ val uniqueRemoteAddress = outboundContext.associationState.uniqueRemoteAddress if (uniqueRemoteAddress.isCompleted) { - handshakeState = Completed + handshakeCompleted() } else { // will pull when handshake reply is received (uniqueRemoteAddress completed) handshakeState = ReqInProgress - scheduleOnce(HandshakeTimeout, timeout) schedulePeriodically(HandshakeRetryTick, retryInterval) // The InboundHandshake stage will complete the uniqueRemoteAddress future diff --git a/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala b/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala index 0ff81be065..6ffe3472a5 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SendQueue.scala @@ -4,6 +4,7 @@ package akka.remote.artery import java.util.Queue + import akka.stream.stage.GraphStage import akka.stream.stage.OutHandler import akka.stream.Attributes @@ -15,12 +16,15 @@ import akka.stream.stage.GraphStageWithMaterializedValue import org.agrona.concurrent.ManyToOneConcurrentLinkedQueueTail import org.agrona.concurrent.ManyToOneConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicInteger + import scala.annotation.tailrec import scala.concurrent.Promise import scala.util.Try import scala.util.Success import scala.util.Failure +import akka.actor.ActorRef + /** * INTERNAL API */ @@ -43,7 +47,7 @@ private[remote] object SendQueue { /** * INTERNAL API */ -private[remote] final class SendQueue[T] extends GraphStageWithMaterializedValue[SourceShape[T], SendQueue.QueueValue[T]] { +private[remote] final class SendQueue[T](deadLetters: ActorRef) extends GraphStageWithMaterializedValue[SourceShape[T], SendQueue.QueueValue[T]] { import SendQueue._ val out: Outlet[T] = Outlet("SendQueue.out") @@ -102,8 +106,14 @@ private[remote] final class SendQueue[T] extends GraphStageWithMaterializedValue override def postStop(): Unit = { // TODO quarantine will currently always be done when control stream is terminated, see issue #21359 - if (consumerQueue ne null) + if (consumerQueue ne null) { + var msg = consumerQueue.poll() + while (msg != null) { + deadLetters ! msg + msg = consumerQueue.poll() + } consumerQueue.clear() + } super.postStop() } diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala b/akka-remote/src/main/scala/akka/remote/artery/aeron/AeronSink.scala similarity index 98% rename from akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala rename to akka-remote/src/main/scala/akka/remote/artery/aeron/AeronSink.scala index 3c71a516f2..2aab5af006 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/aeron/AeronSink.scala @@ -2,6 +2,7 @@ * Copyright (C) 2016-2018 Lightbend Inc. */ package akka.remote.artery +package aeron import akka.util.PrettyDuration.PrettyPrintableDuration import java.nio.ByteBuffer @@ -200,7 +201,7 @@ private[remote] class AeronSink( private def onGiveUp(): Unit = { offerTaskInProgress = false val cause = new GaveUpMessageException(s"Gave up sending message to $channel after ${giveUpAfter.pretty}.") - flightRecorder.alert(AeronSink_GaveUpEnvelope, cause.getMessage.getBytes("US-ASCII")) + flightRecorder.alert(AeronSink_GaveUpEnvelope, cause.toString.getBytes("US-ASCII")) completedValue = Failure(cause) failStage(cause) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala b/akka-remote/src/main/scala/akka/remote/artery/aeron/AeronSource.scala similarity index 99% rename from akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala rename to akka-remote/src/main/scala/akka/remote/artery/aeron/AeronSource.scala index 8011d0f2e2..3a91d9d286 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/aeron/AeronSource.scala @@ -2,6 +2,7 @@ * Copyright (C) 2016-2018 Lightbend Inc. */ package akka.remote.artery +package aeron import scala.annotation.tailrec import akka.stream.Attributes @@ -117,7 +118,7 @@ private[remote] class AeronSource( try sub.close() catch { case e: DriverTimeoutException ⇒ // media driver was shutdown - log.debug("DriverTimeout when closing subscription. {}", e.getMessage) + log.debug("DriverTimeout when closing subscription. {}", e) } finally flightRecorder.loFreq(AeronSource_Stopped, channelMetadata) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala new file mode 100644 index 0000000000..0621229f6d --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala @@ -0,0 +1,411 @@ +/** + * Copyright (C) 2016-2017 Lightbend Inc. + */ +package akka.remote.artery +package aeron + +import java.io.File +import java.util.UUID +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.AtomicReference + +import scala.annotation.tailrec +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.control.NonFatal + +import akka.Done +import akka.actor.Address +import akka.actor.Cancellable +import akka.actor.ExtendedActorSystem +import akka.event.Logging +import akka.remote.RemoteActorRefProvider +import akka.remote.RemoteTransportException +import akka.remote.artery.compress._ +import akka.stream.KillSwitches +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.util.OptionVal +import io.aeron.Aeron +import io.aeron.AvailableImageHandler +import io.aeron.CncFileDescriptor +import io.aeron.CommonContext +import io.aeron.Image +import io.aeron.UnavailableImageHandler +import io.aeron.driver.MediaDriver +import io.aeron.driver.ThreadingMode +import io.aeron.exceptions.ConductorServiceTimeoutException +import io.aeron.exceptions.DriverTimeoutException +import io.aeron.status.ChannelEndpointStatus +import org.agrona.DirectBuffer +import org.agrona.ErrorHandler +import org.agrona.IoUtil +import org.agrona.collections.IntObjConsumer +import org.agrona.concurrent.BackoffIdleStrategy +import org.agrona.concurrent.status.CountersReader.MetaData + +/** + * INTERNAL API + */ +private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) + extends ArteryTransport(_system, _provider) { + import AeronSource.ResourceLifecycle + import ArteryTransport._ + import Decoder.InboundCompressionAccess + import FlightRecorderEvents._ + + private[this] val mediaDriver = new AtomicReference[Option[MediaDriver]](None) + @volatile private[this] var aeron: Aeron = _ + @volatile private[this] var aeronCounterTask: Cancellable = _ + @volatile private[this] var aeronErrorLogTask: Cancellable = _ + @volatile private[this] var aeronErrorLog: AeronErrorLog = _ + + private val taskRunner = new TaskRunner(system, settings.Advanced.IdleCpuLevel) + + private def inboundChannel = s"aeron:udp?endpoint=${bindAddress.address.host.get}:${bindAddress.address.port.get}" + private def outboundChannel(a: Address) = s"aeron:udp?endpoint=${a.host.get}:${a.port.get}" + + override protected def startTransport(): Unit = { + startMediaDriver() + startAeron() + startAeronErrorLog() + topLevelFREvents.loFreq(Transport_AeronErrorLogStarted, NoMetaData) + if (settings.LogAeronCounters) { + startAeronCounterLog() + } + taskRunner.start() + topLevelFREvents.loFreq(Transport_TaskRunnerStarted, NoMetaData) + } + + private def startMediaDriver(): Unit = { + if (settings.Advanced.EmbeddedMediaDriver) { + val driverContext = new MediaDriver.Context + if (settings.Advanced.AeronDirectoryName.nonEmpty) { + driverContext.aeronDirectoryName(settings.Advanced.AeronDirectoryName) + } else { + // create a random name but include the actor system name for easier debugging + val uniquePart = UUID.randomUUID().toString + val randomName = s"${CommonContext.AERON_DIR_PROP_DEFAULT}-${system.name}-$uniquePart" + driverContext.aeronDirectoryName(randomName) + } + driverContext.clientLivenessTimeoutNs(settings.Advanced.ClientLivenessTimeout.toNanos) + driverContext.imageLivenessTimeoutNs(settings.Advanced.ImageLivenessTimeout.toNanos) + driverContext.driverTimeoutMs(settings.Advanced.DriverTimeout.toMillis) + + val idleCpuLevel = settings.Advanced.IdleCpuLevel + if (idleCpuLevel == 10) { + driverContext + .threadingMode(ThreadingMode.DEDICATED) + .conductorIdleStrategy(new BackoffIdleStrategy(1, 1, 1, 1)) + .receiverIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel)) + .senderIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel)) + } else if (idleCpuLevel == 1) { + driverContext + .threadingMode(ThreadingMode.SHARED) + .sharedIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel)) + } else if (idleCpuLevel <= 7) { + driverContext + .threadingMode(ThreadingMode.SHARED_NETWORK) + .sharedNetworkIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel)) + } else { + driverContext + .threadingMode(ThreadingMode.DEDICATED) + .receiverIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel)) + .senderIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel)) + } + + val driver = MediaDriver.launchEmbedded(driverContext) + log.info("Started embedded media driver in directory [{}]", driver.aeronDirectoryName) + topLevelFREvents.loFreq(Transport_MediaDriverStarted, driver.aeronDirectoryName()) + if (!mediaDriver.compareAndSet(None, Some(driver))) { + throw new IllegalStateException("media driver started more than once") + } + } + } + + private def aeronDir: String = mediaDriver.get match { + case Some(driver) ⇒ driver.aeronDirectoryName + case None ⇒ settings.Advanced.AeronDirectoryName + } + + private def stopMediaDriver(): Unit = { + // make sure we only close the driver once or we will crash the JVM + val maybeDriver = mediaDriver.getAndSet(None) + maybeDriver.foreach { driver ⇒ + // this is only for embedded media driver + try driver.close() catch { + case NonFatal(e) ⇒ + // don't think driver.close will ever throw, but just in case + log.warning("Couldn't close Aeron embedded media driver due to [{}]", e) + } + + try { + if (settings.Advanced.DeleteAeronDirectory) { + IoUtil.delete(new File(driver.aeronDirectoryName), false) + topLevelFREvents.loFreq(Transport_MediaFileDeleted, NoMetaData) + } + } catch { + case NonFatal(e) ⇒ + log.warning( + "Couldn't delete Aeron embedded media driver files in [{}] due to [{}]", + driver.aeronDirectoryName, e) + } + } + } + + // TODO: Add FR events + private def startAeron(): Unit = { + val ctx = new Aeron.Context + + ctx.driverTimeoutMs(settings.Advanced.DriverTimeout.toMillis) + + ctx.availableImageHandler(new AvailableImageHandler { + override def onAvailableImage(img: Image): Unit = { + if (log.isDebugEnabled) + log.debug(s"onAvailableImage from ${img.sourceIdentity} session ${img.sessionId}") + } + }) + ctx.unavailableImageHandler(new UnavailableImageHandler { + override def onUnavailableImage(img: Image): Unit = { + if (log.isDebugEnabled) + log.debug(s"onUnavailableImage from ${img.sourceIdentity} session ${img.sessionId}") + + // freeSessionBuffer in AeronSource FragmentAssembler + streamMatValues.get.valuesIterator.foreach { + case InboundStreamMatValues(resourceLife, _) ⇒ + resourceLife.foreach(_.onUnavailableImage(img.sessionId)) + } + } + }) + + ctx.errorHandler(new ErrorHandler { + private val fatalErrorOccured = new AtomicBoolean + + override def onError(cause: Throwable): Unit = { + cause match { + case e: ConductorServiceTimeoutException ⇒ handleFatalError(e) + case e: DriverTimeoutException ⇒ handleFatalError(e) + case _: AeronTerminated ⇒ // already handled, via handleFatalError + case _ ⇒ + log.error(cause, s"Aeron error, $cause") + } + } + + private def handleFatalError(cause: Throwable): Unit = { + if (fatalErrorOccured.compareAndSet(false, true)) { + if (!isShutdown) { + log.error(cause, "Fatal Aeron error {}. Have to terminate ActorSystem because it lost contact with the " + + "{} Aeron media driver. Possible configuration properties to mitigate the problem are " + + "'client-liveness-timeout' or 'driver-timeout'. {}", + Logging.simpleName(cause), + if (settings.Advanced.EmbeddedMediaDriver) "embedded" else "external", + cause) + taskRunner.stop() + aeronErrorLogTask.cancel() + if (settings.LogAeronCounters) aeronCounterTask.cancel() + system.terminate() + throw new AeronTerminated(cause) + } + } else + throw new AeronTerminated(cause) + } + }) + + ctx.aeronDirectoryName(aeronDir) + aeron = Aeron.connect(ctx) + } + + private def blockUntilChannelActive(): Unit = { + val counterIdForInboundChannel = findCounterId(s"rcv-channel: $inboundChannel") + val waitInterval = 200 + val retries = math.max(1, settings.Bind.BindTimeout.toMillis / waitInterval) + retry(retries) + + @tailrec def retry(retries: Long): Unit = { + val status = aeron.countersReader().getCounterValue(counterIdForInboundChannel) + if (status == ChannelEndpointStatus.ACTIVE) { + log.debug("Inbound channel is now active") + } else if (status == ChannelEndpointStatus.ERRORED) { + aeronErrorLog.logErrors(log, 0L) + stopMediaDriver() + throw new RemoteTransportException("Inbound Aeron channel is in errored state. See Aeron logs for details.") + } else if (status == ChannelEndpointStatus.INITIALIZING && retries > 0) { + Thread.sleep(waitInterval) + retry(retries - 1) + } else { + aeronErrorLog.logErrors(log, 0L) + stopMediaDriver() + throw new RemoteTransportException("Timed out waiting for Aeron transport to bind. See Aeoron logs.") + } + } + } + + private def findCounterId(label: String): Int = { + var counterId = -1 + aeron.countersReader().forEach(new IntObjConsumer[String] { + def accept(i: Int, l: String): Unit = { + if (label == l) + counterId = i + } + }) + if (counterId == -1) { + throw new RuntimeException(s"Unable to found counterId for label: $label") + } else { + counterId + } + } + + // TODO Add FR Events + private def startAeronErrorLog(): Unit = { + aeronErrorLog = new AeronErrorLog(new File(aeronDir, CncFileDescriptor.CNC_FILE), log) + val lastTimestamp = new AtomicLong(0L) + import system.dispatcher + aeronErrorLogTask = system.scheduler.schedule(3.seconds, 5.seconds) { + if (!isShutdown) { + val newLastTimestamp = aeronErrorLog.logErrors(log, lastTimestamp.get) + lastTimestamp.set(newLastTimestamp + 1) + } + } + } + + private def startAeronCounterLog(): Unit = { + import system.dispatcher + aeronCounterTask = system.scheduler.schedule(5.seconds, 5.seconds) { + if (!isShutdown && log.isDebugEnabled) { + aeron.countersReader.forEach(new MetaData() { + def accept(counterId: Int, typeId: Int, keyBuffer: DirectBuffer, label: String): Unit = { + val value = aeron.countersReader().getCounterValue(counterId) + log.debug("Aeron Counter {}: {} {}]", counterId, value, label) + } + }) + } + } + } + + override protected def outboundTransportSink(outboundContext: OutboundContext, streamId: Int, + bufferPool: EnvelopeBufferPool): Sink[EnvelopeBuffer, Future[Done]] = { + val giveUpAfter = + if (streamId == ControlStreamId) settings.Advanced.GiveUpSystemMessageAfter + else settings.Advanced.GiveUpMessageAfter + Sink.fromGraph(new AeronSink(outboundChannel(outboundContext.remoteAddress), streamId, aeron, taskRunner, + bufferPool, giveUpAfter, createFlightRecorderEventSink())) + } + + private def aeronSource(streamId: Int, pool: EnvelopeBufferPool): Source[EnvelopeBuffer, AeronSource.ResourceLifecycle] = + Source.fromGraph(new AeronSource(inboundChannel, streamId, aeron, taskRunner, pool, + createFlightRecorderEventSink(), aeronSourceSpinningStrategy)) + + private def aeronSourceSpinningStrategy: Int = + if (settings.Advanced.InboundLanes > 1 || // spinning was identified to be the cause of massive slowdowns with multiple lanes, see #21365 + settings.Advanced.IdleCpuLevel < 5) 0 // also don't spin for small IdleCpuLevels + else 50 * settings.Advanced.IdleCpuLevel - 240 + + override protected def runInboundStreams(): Unit = { + runInboundControlStream() + runInboundOrdinaryMessagesStream() + + if (largeMessageChannelEnabled) { + runInboundLargeMessagesStream() + } + blockUntilChannelActive() + } + + private def runInboundControlStream(): Unit = { + if (isShutdown) throw ShuttingDown + val (resourceLife, ctrl, completed) = + aeronSource(ControlStreamId, envelopeBufferPool) + .via(inboundFlow(settings, NoInboundCompressions)) + .toMat(inboundControlSink)({ case (a, (c, d)) ⇒ (a, c, d) }) + .run()(controlMaterializer) + + attachControlMessageObserver(ctrl) + + updateStreamMatValues(ControlStreamId, resourceLife, completed) + attachInboundStreamRestart("Inbound control stream", completed, () ⇒ runInboundControlStream()) + } + + private def runInboundOrdinaryMessagesStream(): Unit = { + if (isShutdown) throw ShuttingDown + + val (resourceLife, inboundCompressionAccess, completed) = + if (inboundLanes == 1) { + aeronSource(OrdinaryStreamId, envelopeBufferPool) + .viaMat(inboundFlow(settings, _inboundCompressions))(Keep.both) + .toMat(inboundSink(envelopeBufferPool))({ case ((a, b), c) ⇒ (a, b, c) }) + .run()(materializer) + + } else { + val laneKillSwitch = KillSwitches.shared("laneKillSwitch") + val laneSource: Source[InboundEnvelope, (ResourceLifecycle, InboundCompressionAccess)] = + aeronSource(OrdinaryStreamId, envelopeBufferPool) + .via(laneKillSwitch.flow) + .viaMat(inboundFlow(settings, _inboundCompressions))(Keep.both) + .via(Flow.fromGraph(new DuplicateHandshakeReq(inboundLanes, this, system, envelopeBufferPool))) + + val (resourceLife, compressionAccess, laneHub) = + laneSource + .toMat(Sink.fromGraph(new FixedSizePartitionHub[InboundEnvelope](inboundLanePartitioner, inboundLanes, + settings.Advanced.InboundHubBufferSize)))({ case ((a, b), c) ⇒ (a, b, c) }) + .run()(materializer) + + val lane = inboundSink(envelopeBufferPool) + val completedValues: Vector[Future[Done]] = + (0 until inboundLanes).map { _ ⇒ + laneHub.toMat(lane)(Keep.right).run()(materializer) + }(collection.breakOut) + + import system.dispatcher + + // tear down the upstream hub part if downstream lane fails + // lanes are not completed with success by themselves so we don't have to care about onSuccess + Future.firstCompletedOf(completedValues).failed.foreach { reason ⇒ laneKillSwitch.abort(reason) } + val allCompleted = Future.sequence(completedValues).map(_ ⇒ Done) + + (resourceLife, compressionAccess, allCompleted) + } + + setInboundCompressionAccess(inboundCompressionAccess) + + updateStreamMatValues(OrdinaryStreamId, resourceLife, completed) + attachInboundStreamRestart("Inbound message stream", completed, () ⇒ runInboundOrdinaryMessagesStream()) + } + + private def runInboundLargeMessagesStream(): Unit = { + if (isShutdown) throw ShuttingDown + + val (resourceLife, completed) = aeronSource(LargeStreamId, largeEnvelopeBufferPool) + .via(inboundLargeFlow(settings)) + .toMat(inboundSink(largeEnvelopeBufferPool))(Keep.both) + .run()(materializer) + + updateStreamMatValues(LargeStreamId, resourceLife, completed) + attachInboundStreamRestart("Inbound large message stream", completed, () ⇒ runInboundLargeMessagesStream()) + } + + private def updateStreamMatValues(streamId: Int, aeronSourceLifecycle: AeronSource.ResourceLifecycle, completed: Future[Done]): Unit = { + implicit val ec = materializer.executionContext + updateStreamMatValues(streamId, InboundStreamMatValues( + Some(aeronSourceLifecycle), + completed.recover { case _ ⇒ Done })) + } + + override protected def shutdownTransport(): Future[Done] = { + import system.dispatcher + taskRunner.stop().map { _ ⇒ + topLevelFREvents.loFreq(Transport_Stopped, NoMetaData) + if (aeronErrorLogTask != null) { + aeronErrorLogTask.cancel() + topLevelFREvents.loFreq(Transport_AeronErrorLogTaskStopped, NoMetaData) + } + if (aeron != null) aeron.close() + if (aeronErrorLog != null) aeronErrorLog.close() + if (mediaDriver.get.isDefined) stopMediaDriver() + + Done + } + } + +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/TaskRunner.scala b/akka-remote/src/main/scala/akka/remote/artery/aeron/TaskRunner.scala similarity index 99% rename from akka-remote/src/main/scala/akka/remote/artery/TaskRunner.scala rename to akka-remote/src/main/scala/akka/remote/artery/aeron/TaskRunner.scala index cb735d077b..ed4139f478 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/TaskRunner.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/aeron/TaskRunner.scala @@ -2,6 +2,7 @@ * Copyright (C) 2016-2018 Lightbend Inc. */ package akka.remote.artery +package aeron import java.util.concurrent.TimeUnit.{ MICROSECONDS, MILLISECONDS } diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala index c935435e14..461f6c54e1 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala @@ -396,7 +396,10 @@ private[remote] abstract class InboundCompression[T >: Null]( case None ⇒ inboundContext.association(originUid) match { case OptionVal.Some(association) ⇒ - if (alive) { + if (association.associationState.isQuarantined(originUid)) { + // FIXME cleanup compresssion for quarantined associations, see #23967 + log.debug("Ignoring {} for quarantined originUid [{}].", Logging.simpleName(tables.activeTable), originUid) + } else if (alive) { val table = prepareCompressionAdvertisement(tables.nextTable.version) // TODO expensive, check if building the other way wouldn't be faster? val nextState = tables.copy(nextTable = table.invert, advertisementInProgress = Some(table)) @@ -404,8 +407,9 @@ private[remote] abstract class InboundCompression[T >: Null]( alive = false // will be set to true on first incoming message resendCount = 0 advertiseCompressionTable(association, table) - } else + } else { log.debug("{} for originUid [{}] not changed, no need to advertise same.", Logging.simpleName(tables.activeTable), originUid) + } case OptionVal.None ⇒ // otherwise it's too early, association not ready yet. @@ -417,18 +421,25 @@ private[remote] abstract class InboundCompression[T >: Null]( resendCount += 1 if (resendCount <= 5) { // The ActorRefCompressionAdvertisement message is resent because it can be lost - log.debug( - "Advertisment in progress for originUid [{}] version {}, resending", - originUid, inProgress.version) + inboundContext.association(originUid) match { case OptionVal.Some(association) ⇒ - advertiseCompressionTable(association, inProgress) // resend + if (association.associationState.isQuarantined(originUid)) { + // give up + log.debug("Skipping advertisement in progress for quarantined originUid [{}].", originUid) + confirmAdvertisement(inProgress.version) + } else { + log.debug( + "Advertisement in progress for originUid [{}] version {}, resending", + originUid, inProgress.version) + advertiseCompressionTable(association, inProgress) // resend + } case OptionVal.None ⇒ } } else { // give up, it might be dead log.debug( - "Advertisment in progress for originUid [{}] version {} but no confirmation after retries.", + "Advertisement in progress for originUid [{}] version {} but no confirmation after retries.", originUid, inProgress.version) confirmAdvertisement(inProgress.version) } diff --git a/akka-remote/src/test/java/akka/remote/artery/AeronStat.java b/akka-remote/src/test/java/akka/remote/artery/aeron/AeronStat.java similarity index 99% rename from akka-remote/src/test/java/akka/remote/artery/AeronStat.java rename to akka-remote/src/test/java/akka/remote/artery/aeron/AeronStat.java index e4654e1d22..df1cdc3a29 100644 --- a/akka-remote/src/test/java/akka/remote/artery/AeronStat.java +++ b/akka-remote/src/test/java/akka/remote/artery/aeron/AeronStat.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package akka.remote.artery; +package akka.remote.artery.aeron; import java.io.File; import java.io.PrintStream; diff --git a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala index 5bbfb04e75..ce248ee8b6 100644 --- a/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/Ticket1978CommunicationSpec.scala @@ -66,7 +66,8 @@ object Configuration { rng.nextInt() // Has to work val sRng = settings.SSLRandomNumberGenerator - rng.getAlgorithm == sRng || (throw new NoSuchAlgorithmException(sRng)) + if (rng.getAlgorithm != sRng && sRng != "") + throw new NoSuchAlgorithmException(sRng) val engine = NettySSLSupport(settings, NoMarkerLogging, isClient = true).getEngine val gotAllSupported = enabled.toSet diff engine.getSupportedCipherSuites.toSet diff --git a/akka-remote/src/test/scala/akka/remote/artery/ArterySpecSupport.scala b/akka-remote/src/test/scala/akka/remote/artery/ArterySpecSupport.scala index 5e6e57c50e..04ffd32a4c 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/ArterySpecSupport.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/ArterySpecSupport.scala @@ -9,6 +9,7 @@ import java.util.UUID import akka.actor.ActorSystem import akka.remote.RARP import akka.testkit.AkkaSpec +import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import org.scalatest.Outcome @@ -46,7 +47,8 @@ object ArterySpecSupport { * Artery enabled, flight recorder enabled, dynamic selection of port on localhost. * Combine with [[FlightRecorderSpecIntegration]] or remember to delete flight recorder file if using manually */ - def defaultConfig = newFlightRecorderConfig.withFallback(staticArteryRemotingConfig) + def defaultConfig = newFlightRecorderConfig + .withFallback(staticArteryRemotingConfig) } diff --git a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala index c9501d7c57..f9da3146f6 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/EnvelopeBufferSpec.scala @@ -51,6 +51,8 @@ class EnvelopeBufferSpec extends AkkaSpec { override def runNextClassManifestAdvertisement(): Unit = ??? } + val version = ArteryTransport.HighestVersion + "EnvelopeBuffer" must { val headerOut = HeaderBuilder.in(TestCompressor) val headerIn = HeaderBuilder.out() @@ -64,7 +66,7 @@ class EnvelopeBufferSpec extends AkkaSpec { val originUid = 1L "be able to encode and decode headers with compressed literals" in { - headerIn setVersion 1 + headerIn setVersion version headerIn setUid 42 headerIn setSerializer 4 headerIn setRecipientActorRef minimalRef("compressable1") @@ -78,7 +80,7 @@ class EnvelopeBufferSpec extends AkkaSpec { envelope.byteBuffer.flip() envelope.parseHeader(headerOut) - headerOut.version should ===(1) + headerOut.version should ===(version) headerOut.uid should ===(42) headerOut.inboundActorRefCompressionTableVersion should ===(28.toByte) headerOut.inboundClassManifestCompressionTableVersion should ===(35.toByte) @@ -94,7 +96,7 @@ class EnvelopeBufferSpec extends AkkaSpec { val senderRef = minimalRef("uncompressable0") val recipientRef = minimalRef("uncompressable11") - headerIn setVersion 1 + headerIn setVersion version headerIn setUid 42 headerIn setSerializer 4 headerIn setSenderActorRef senderRef @@ -113,7 +115,7 @@ class EnvelopeBufferSpec extends AkkaSpec { envelope.byteBuffer.flip() envelope.parseHeader(headerOut) - headerOut.version should ===(1) + headerOut.version should ===(version) headerOut.uid should ===(42) headerOut.serializer should ===(4) headerOut.senderActorRefPath should ===(OptionVal.Some("akka://EnvelopeBufferSpec/uncompressable0")) @@ -126,7 +128,7 @@ class EnvelopeBufferSpec extends AkkaSpec { "be able to encode and decode headers with mixed literals" in { val recipientRef = minimalRef("uncompressable1") - headerIn setVersion 1 + headerIn setVersion version headerIn setUid 42 headerIn setSerializer 4 headerIn setSenderActorRef minimalRef("reallylongcompressablestring") @@ -141,7 +143,7 @@ class EnvelopeBufferSpec extends AkkaSpec { envelope.byteBuffer.flip() envelope.parseHeader(headerOut) - headerOut.version should ===(1) + headerOut.version should ===(version) headerOut.uid should ===(42) headerOut.serializer should ===(4) headerOut.senderActorRef(originUid).get.path.toSerializationFormat should ===("akka://EnvelopeBufferSpec/reallylongcompressablestring") @@ -152,7 +154,7 @@ class EnvelopeBufferSpec extends AkkaSpec { val senderRef = minimalRef("uncompressable0") - headerIn setVersion 3 + headerIn setVersion version headerIn setUid Long.MinValue headerIn setSerializer -1 headerIn setSenderActorRef senderRef @@ -168,7 +170,7 @@ class EnvelopeBufferSpec extends AkkaSpec { envelope.byteBuffer.flip() envelope.parseHeader(headerOut) - headerOut.version should ===(3) + headerOut.version should ===(version) headerOut.uid should ===(Long.MinValue) headerOut.serializer should ===(-1) headerOut.senderActorRefPath should ===(OptionVal.Some("akka://EnvelopeBufferSpec/uncompressable0")) @@ -181,7 +183,7 @@ class EnvelopeBufferSpec extends AkkaSpec { "be able to encode and decode headers with mixed literals and payload" in { val payload = ByteString("Hello Artery!") - headerIn setVersion 1 + headerIn setVersion version headerIn setUid 42 headerIn setSerializer 4 headerIn setSenderActorRef minimalRef("reallylongcompressablestring") @@ -194,7 +196,7 @@ class EnvelopeBufferSpec extends AkkaSpec { envelope.parseHeader(headerOut) - headerOut.version should ===(1) + headerOut.version should ===(version) headerOut.uid should ===(42) headerOut.serializer should ===(4) headerOut.senderActorRef(originUid).get.path.toSerializationFormat should ===("akka://EnvelopeBufferSpec/reallylongcompressablestring") diff --git a/akka-remote/src/test/scala/akka/remote/artery/HandshakeDenySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/HandshakeDenySpec.scala index 347710241b..bf0c7329c1 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/HandshakeDenySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/HandshakeDenySpec.scala @@ -33,8 +33,10 @@ class HandshakeDenySpec extends ArteryMultiNodeSpec(HandshakeDenySpec.commonConf systemB.actorOf(TestActors.echoActorProps, "echo") EventFilter.warning(start = "Dropping Handshake Request from").intercept { - sel ! Identify(None) - expectNoMsg(3.seconds) + sel ! Identify("hi echo") + // handshake timeout and Identify message in SendQueue is sent to deadLetters, + // which generates the ActorIdentity(None) + expectMsg(5.seconds, ActorIdentity("hi echo", None)) }(systemB) } diff --git a/akka-remote/src/test/scala/akka/remote/artery/LargeMessagesStreamSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/LargeMessagesStreamSpec.scala index 801c7efdf9..2de55c0e20 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/LargeMessagesStreamSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/LargeMessagesStreamSpec.scala @@ -26,7 +26,6 @@ object LargeMessagesStreamSpec { class LargeMessagesStreamSpec extends ArteryMultiNodeSpec( """ akka { - loglevel = ERROR remote.artery.large-message-destinations = [ "/user/large" ] } """.stripMargin) { @@ -102,16 +101,23 @@ class LargeMessagesStreamSpec extends ArteryMultiNodeSpec( val largeRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "large")) val regularRemote = awaitResolve(systemA.actorSelection(rootB / "user" / "regular")) - // send a large message, as well as regular one - val remoteProbe = TestProbe()(systemA) + // send a large message, as well as some regular ones + val probeSmall = TestProbe()(systemA) + val probeLarge = TestProbe()(systemA) val largeBytes = 2000000 - largeRemote.tell(Ping(ByteString.fromArray(new Array[Byte](largeBytes))), remoteProbe.ref) - regularRemote.tell(Ping(), remoteProbe.ref) + largeRemote.tell(Ping(ByteString.fromArray(new Array[Byte](largeBytes))), probeLarge.ref) + regularRemote.tell(Ping(), probeSmall.ref) + Thread.sleep(50) + regularRemote.tell(Ping(), probeSmall.ref) + Thread.sleep(50) + regularRemote.tell(Ping(), probeSmall.ref) // should be no problems sending regular small messages while large messages are being sent - remoteProbe.expectMsg(Pong(0)) - remoteProbe.expectMsg(10.seconds, Pong(largeBytes)) + probeSmall.expectMsg(Pong(0)) + probeSmall.expectMsg(Pong(0)) + probeSmall.expectMsg(Pong(0)) + probeLarge.expectMsg(10.seconds, Pong(largeBytes)) // cached flags should be set now largeRemote.asInstanceOf[RemoteActorRef].cachedSendQueueIndex should ===(Association.LargeQueueIndex) diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteFailureSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteFailureSpec.scala index f719dfb6b8..4c799a7d42 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteFailureSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteFailureSpec.scala @@ -2,10 +2,19 @@ package akka.remote.artery import akka.remote.EndpointDisassociatedException import akka.testkit.{ EventFilter, ImplicitSender, TestActors, TestEvent } - import scala.concurrent.duration._ +import akka.testkit.DeadLettersFilter +import akka.testkit.TestEvent.Mute + +object RemoteFailureSpec { + final case class Ping(s: String) +} + class RemoteFailureSpec extends ArteryMultiNodeSpec with ImplicitSender { + import RemoteFailureSpec._ + + system.eventStream.publish(Mute(DeadLettersFilter(classOf[Ping])(occurrences = Int.MaxValue))) "Remoting" should { @@ -29,12 +38,12 @@ class RemoteFailureSpec extends ArteryMultiNodeSpec with ImplicitSender { // first everything is up and running 1 to n foreach { x ⇒ - localSelection ! "ping" - remoteSelections(x % remoteSystems.size) ! "ping" + localSelection ! Ping("1") + remoteSelections(x % remoteSystems.size) ! Ping("1") } within(5.seconds) { - receiveN(n * 2) foreach { reply ⇒ reply should ===("ping") } + receiveN(n * 2) foreach { reply ⇒ reply should ===(Ping("1")) } } // then we shutdown remote systems to simulate broken connections @@ -43,13 +52,13 @@ class RemoteFailureSpec extends ArteryMultiNodeSpec with ImplicitSender { } 1 to n foreach { x ⇒ - localSelection ! "ping" - remoteSelections(x % remoteSystems.size) ! "ping" + localSelection ! Ping("2") + remoteSelections(x % remoteSystems.size) ! Ping("2") } // ping messages to localEcho should go through even though we use many different broken connections within(5.seconds) { - receiveN(n) foreach { reply ⇒ reply should ===("ping") } + receiveN(n) foreach { reply ⇒ reply should ===(Ping("2")) } } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala index 62d6718df5..730ae550a0 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteSendConsistencySpec.scala @@ -11,12 +11,12 @@ import com.typesafe.config.{ Config, ConfigFactory } import scala.concurrent.duration._ import akka.actor.ActorSelection -class RemoteSendConsistencyWithOneLaneSpec extends AbstractRemoteSendConsistencySpec(ConfigFactory.parseString(""" +class ArteryUpdSendConsistencyWithOneLaneSpec extends AbstractRemoteSendConsistencySpec(ConfigFactory.parseString(""" akka.remote.artery.advanced.outbound-lanes = 1 akka.remote.artery.advanced.inbound-lanes = 1 """).withFallback(ArterySpecSupport.defaultConfig)) -class RemoteSendConsistencyWithThreeLanesSpec extends AbstractRemoteSendConsistencySpec( +class ArteryUpdSendConsistencyWithThreeLanesSpec extends AbstractRemoteSendConsistencySpec( ConfigFactory.parseString(""" akka.remote.artery.advanced.outbound-lanes = 3 akka.remote.artery.advanced.inbound-lanes = 3 diff --git a/akka-remote/src/test/scala/akka/remote/artery/SendQueueSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/SendQueueSpec.scala index fff1547938..2780edf691 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/SendQueueSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/SendQueueSpec.scala @@ -59,7 +59,7 @@ class SendQueueSpec extends AkkaSpec("akka.actor.serialize-messages = off") with "deliver all messages" in { val queue = new ManyToOneConcurrentArrayQueue[String](128) - val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String]) + val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String](system.deadLetters)) .toMat(TestSink.probe)(Keep.both).run() downstream.request(10) @@ -78,7 +78,7 @@ class SendQueueSpec extends AkkaSpec("akka.actor.serialize-messages = off") with queue.offer("a") queue.offer("b") - val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String]) + val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String](system.deadLetters)) .toMat(TestSink.probe)(Keep.both).run() downstream.request(10) @@ -96,7 +96,7 @@ class SendQueueSpec extends AkkaSpec("akka.actor.serialize-messages = off") with // this test verifies that the wakeup signal is triggered correctly val queue = new ManyToOneConcurrentArrayQueue[Int](128) val burstSize = 100 - val (sendQueue, downstream) = Source.fromGraph(new SendQueue[Int]) + val (sendQueue, downstream) = Source.fromGraph(new SendQueue[Int](system.deadLetters)) .grouped(burstSize) .async .toMat(TestSink.probe)(Keep.both).run() @@ -124,7 +124,7 @@ class SendQueueSpec extends AkkaSpec("akka.actor.serialize-messages = off") with // send 100 per producer before materializing producers.foreach(_ ! ProduceToQueue(0, 100, queue)) - val (sendQueue, downstream) = Source.fromGraph(new SendQueue[Msg]) + val (sendQueue, downstream) = Source.fromGraph(new SendQueue[Msg](system.deadLetters)) .toMat(TestSink.probe)(Keep.both).run() sendQueue.inject(queue) @@ -154,7 +154,7 @@ class SendQueueSpec extends AkkaSpec("akka.actor.serialize-messages = off") with (1 to 100).foreach { n ⇒ val queue = new ManyToOneConcurrentArrayQueue[String](16) - val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String]) + val (sendQueue, downstream) = Source.fromGraph(new SendQueue[String](system.deadLetters)) .toMat(TestSink.probe)(Keep.both).run() f(queue, sendQueue, downstream) diff --git a/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/aeron/AeronSinkSpec.scala similarity index 96% rename from akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala rename to akka-remote/src/test/scala/akka/remote/artery/aeron/AeronSinkSpec.scala index ba92d718b7..33bc78b545 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/aeron/AeronSinkSpec.scala @@ -2,6 +2,7 @@ * Copyright (C) 2016-2018 Lightbend Inc. */ package akka.remote.artery +package aeron import java.io.File @@ -10,7 +11,7 @@ import scala.concurrent.duration._ import scala.util.control.NoStackTrace import akka.actor.ExtendedActorSystem -import akka.remote.artery.AeronSink.GaveUpMessageException +import akka.remote.artery.aeron.AeronSink.GaveUpMessageException import akka.stream.ActorMaterializer import akka.stream.ActorMaterializerSettings import akka.stream.scaladsl.Sink diff --git a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala index 67a54ee5e4..4ba34ba83b 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/compress/CompressionIntegrationSpec.scala @@ -263,10 +263,10 @@ class CompressionIntegrationSpec extends ArteryMultiNodeSpec(CompressionIntegrat "wrap around" in { val extraConfig = """ - akka.loglevel = INFO + akka.loglevel = DEBUG akka.remote.artery.advanced.compression { - actor-refs.advertisement-interval = 10 millis + actor-refs.advertisement-interval = 100 millis manifests.advertisement-interval = 10 minutes } """ diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index d5fbae163f..f1e84b2ed1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -16,6 +16,8 @@ import akka.util.OptionVal import scala.concurrent.Promise import scala.util.control.NonFatal +import akka.stream.Attributes.LogLevels + /** * INTERNAL API * @@ -347,7 +349,12 @@ import scala.util.control.NonFatal def reportStageError(e: Throwable): Unit = { if (activeStage == null) throw e else { - log.error(e, "Error in stage [{}]: {}", activeStage.originalStage.getOrElse(activeStage), e.getMessage) + val loggingEnabled = activeStage.attributes.get[LogLevels] match { + case Some(levels) ⇒ levels.onFailure != LogLevels.Off + case None ⇒ true + } + if (loggingEnabled) + log.error(e, "Error in stage [{}]: {}", activeStage.originalStage.getOrElse(activeStage), e.getMessage) activeStage.failStage(e) // Abort chasing diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala index 4ceac8c983..4e6a582891 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TLSActor.scala @@ -456,7 +456,7 @@ import scala.util.{ Failure, Success, Try } /** * INTERNAL API */ -@InternalApi private[stream] object TlsUtils { +@InternalApi private[akka] object TlsUtils { def applySessionParameters(engine: SSLEngine, sessionParameters: NegotiateNewSession): Unit = { sessionParameters.enabledCipherSuites foreach (cs ⇒ engine.setEnabledCipherSuites(cs.toArray)) sessionParameters.enabledProtocols foreach (p ⇒ engine.setEnabledProtocols(p.toArray)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala index 3822736478..14ad016308 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala @@ -343,8 +343,8 @@ private[stream] object ConnectionSourceStage { if (interpreter.log.isDebugEnabled) { val msg = "Aborting tcp connection to {} because of upstream failure: {}" - if (ex.getStackTrace.isEmpty) interpreter.log.debug(msg, remoteAddress, ex.getMessage) - else interpreter.log.debug(msg + "\n{}", remoteAddress, ex.getMessage, ex.getStackTrace.mkString("\n")) + if (ex.getStackTrace.isEmpty) interpreter.log.debug(msg, remoteAddress, ex) + else interpreter.log.debug(msg + "\n{}", remoteAddress, ex, ex.getStackTrace.mkString("\n")) } connection ! Abort } else failStage(ex) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala index 24e778b10e..392da05e6e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Hub.scala @@ -10,7 +10,6 @@ import akka.NotUsed import akka.dispatch.{ AbstractNodeQueue, ExecutionContexts } import akka.stream._ import akka.stream.stage._ - import scala.annotation.tailrec import scala.concurrent.{ Future, Promise } import scala.util.{ Failure, Success, Try } @@ -21,9 +20,11 @@ import java.util.concurrent.atomic.AtomicReferenceArray import scala.collection.immutable import scala.collection.mutable.LongMap import scala.collection.immutable.Queue + import akka.annotation.InternalApi import akka.annotation.DoNotInherit import akka.annotation.ApiMayChange +import akka.stream.Attributes.LogLevels /** * A MergeHub is a special streaming hub that is able to collect streamed elements from a dynamic set of @@ -284,7 +285,13 @@ private[akka] class MergeHub[T](perProducerBufferSize: Int) extends GraphStageWi } - (logic, Sink.fromGraph(sink)) + // propagate LogLevels attribute so that MergeHub can be used with onFailure = LogLevels.Off + val sinkWithAttributes = inheritedAttributes.get[LogLevels] match { + case Some(a) ⇒ Sink.fromGraph(sink).addAttributes(Attributes(a)) + case None ⇒ Sink.fromGraph(sink) + } + + (logic, sinkWithAttributes) } } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index ff9ee860af..a4c3f1270e 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -801,7 +801,7 @@ trait TestKitBase { */ def shutdown( actorSystem: ActorSystem = system, - duration: Duration = 5.seconds.dilated.min(10.seconds), + duration: Duration = 10.seconds.dilated.min(10.seconds), verifySystemShutdown: Boolean = false) { TestKit.shutdownActorSystem(actorSystem, duration, verifySystemShutdown) }