From af377790b00142f9434d41c4a298530266395df0 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Fri, 30 Dec 2016 15:05:21 +0100 Subject: [PATCH] =rem #21365 less aggressive busy spinning in AeronSource Benchmarks revealed that busy spinning directly in the graph stage can lead to an excessive increase in latency when multiple inbound lanes are active (i.e. the inbound flow has an asynchronous boundary driving the multiple lanes). The new strategy is therefore: For inbound-lanes > 1 or idle-cpu-level < 5: no spinning in the graph stage For inbound-lanes = 1 and idle-cpu-level >= 6: 50 * settings.Advanced.IdleCpuLevel - 240 which means in general much less or no spinning at all. Fixes #21365. --- .../akka/remote/artery/AeronStreamConcistencySpec.scala | 4 ++-- .../scala/akka/remote/artery/AeronStreamLatencySpec.scala | 4 ++-- .../akka/remote/artery/AeronStreamMaxThroughputSpec.scala | 2 +- .../src/main/scala/akka/remote/artery/AeronSource.scala | 7 ++++--- .../main/scala/akka/remote/artery/ArteryTransport.scala | 7 ++++++- .../src/test/scala/akka/remote/artery/AeronSinkSpec.scala | 2 +- 6 files changed, 16 insertions(+), 10 deletions(-) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala index 240607651d..6f9b8667b6 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala @@ -97,7 +97,7 @@ abstract class AeronStreamConsistencySpec "start echo" in { runOn(second) { // just echo back - Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink)) + Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink, 0)) .runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) } enterBarrier("echo-started") @@ -111,7 +111,7 @@ abstract class AeronStreamConsistencySpec val killSwitch = KillSwitches.shared("test") val started = TestProbe() val startMsg = "0".getBytes("utf-8") - Source.fromGraph(new AeronSource(channel(first), streamId, aeron, taskRunner, pool, IgnoreEventSink)) + Source.fromGraph(new AeronSource(channel(first), streamId, aeron, taskRunner, pool, IgnoreEventSink, 0)) .via(killSwitch.flow) .runForeach { envelope ⇒ val bytes = ByteString.fromByteBuffer(envelope.byteBuffer) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala index e4cda8e744..21e9e194a1 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala @@ -218,7 +218,7 @@ abstract class AeronStreamLatencySpec val killSwitch = KillSwitches.shared(testName) val started = TestProbe() val startMsg = "0".getBytes("utf-8") - Source.fromGraph(new AeronSource(channel(first), streamId, aeron, taskRunner, pool, IgnoreEventSink)) + Source.fromGraph(new AeronSource(channel(first), streamId, aeron, taskRunner, pool, IgnoreEventSink, 0)) .via(killSwitch.flow) .runForeach { envelope ⇒ val bytes = ByteString.fromByteBuffer(envelope.byteBuffer) @@ -315,7 +315,7 @@ abstract class AeronStreamLatencySpec "start echo" in { runOn(second) { // just echo back - Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink)) + Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink, 0)) .runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) } enterBarrier("echo-started") diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala index 6215080ce5..70802a08f8 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala @@ -177,7 +177,7 @@ abstract class AeronStreamMaxThroughputSpec var count = 0L val done = TestLatch(1) val killSwitch = KillSwitches.shared(testName) - Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink)) + Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink, 0)) .via(killSwitch.flow) .runForeach { envelope ⇒ val bytes = ByteString.fromByteBuffer(envelope.byteBuffer) diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala index befc797868..a7be652913 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala @@ -71,6 +71,8 @@ private[remote] object AeronSource { /** * INTERNAL API * @param channel eg. "aeron:udp?endpoint=localhost:40123" + * @param spinning the amount of busy spinning to be done synchronously before deferring to the TaskRunner + * when waiting for data */ private[remote] class AeronSource( channel: String, @@ -78,7 +80,8 @@ private[remote] class AeronSource( aeron: Aeron, taskRunner: TaskRunner, pool: EnvelopeBufferPool, - flightRecorder: EventSink) + flightRecorder: EventSink, + spinning: Int) extends GraphStageWithMaterializedValue[SourceShape[EnvelopeBuffer], AeronSource.ResourceLifecycle] { import AeronSource._ import TaskRunner._ @@ -91,8 +94,6 @@ private[remote] class AeronSource( val logic = new GraphStageLogic(shape) with OutHandler with ResourceLifecycle with StageLogging { private val sub = aeron.addSubscription(channel, streamId) - // spin between 100 to 10000 depending on idleCpuLevel - private val spinning = 1100 * taskRunner.idleCpuLevel - 1000 private var backoffCount = spinning private var delegateTaskStartTime = 0L private var countBeforeDelegate = 0L diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 16299b2cf1..4b749e57b5 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -977,7 +977,12 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R def aeronSource(streamId: Int, pool: EnvelopeBufferPool): Source[EnvelopeBuffer, AeronSource.ResourceLifecycle] = Source.fromGraph(new AeronSource(inboundChannel, streamId, aeron, taskRunner, pool, - createFlightRecorderEventSink())) + createFlightRecorderEventSink(), aeronSourceSpinningStrategy)) + + private def aeronSourceSpinningStrategy: Int = + if (settings.Advanced.InboundLanes > 1 || // spinning was identified to be the cause of massive slowdowns with multiple lanes, see #21365 + settings.Advanced.IdleCpuLevel < 5) 0 // also don't spin for small IdleCpuLevels + else 50 * settings.Advanced.IdleCpuLevel - 240 val messageDispatcherSink: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m ⇒ messageDispatcher.dispatch(m) diff --git a/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala index feeb58b2c4..7cd02447b0 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala @@ -58,7 +58,7 @@ class AeronSinkSpec extends AkkaSpec with ImplicitSender { val port = SocketUtil.temporaryServerAddress("localhost", udp = true).getPort val channel = s"aeron:udp?endpoint=localhost:$port" - Source.fromGraph(new AeronSource(channel, 1, aeron, taskRunner, pool, IgnoreEventSink)) + Source.fromGraph(new AeronSource(channel, 1, aeron, taskRunner, pool, IgnoreEventSink, 0)) // fail receiver stream on first message .map(_ ⇒ throw new RuntimeException("stop") with NoStackTrace) .runWith(Sink.ignore)