diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala index 240607651d..6f9b8667b6 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala @@ -97,7 +97,7 @@ abstract class AeronStreamConsistencySpec "start echo" in { runOn(second) { // just echo back - Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink)) + Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink, 0)) .runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) } enterBarrier("echo-started") @@ -111,7 +111,7 @@ abstract class AeronStreamConsistencySpec val killSwitch = KillSwitches.shared("test") val started = TestProbe() val startMsg = "0".getBytes("utf-8") - Source.fromGraph(new AeronSource(channel(first), streamId, aeron, taskRunner, pool, IgnoreEventSink)) + Source.fromGraph(new AeronSource(channel(first), streamId, aeron, taskRunner, pool, IgnoreEventSink, 0)) .via(killSwitch.flow) .runForeach { envelope ⇒ val bytes = ByteString.fromByteBuffer(envelope.byteBuffer) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala index e4cda8e744..21e9e194a1 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala @@ -218,7 +218,7 @@ abstract class AeronStreamLatencySpec val killSwitch = KillSwitches.shared(testName) val started = TestProbe() val startMsg = "0".getBytes("utf-8") - Source.fromGraph(new AeronSource(channel(first), streamId, aeron, taskRunner, pool, IgnoreEventSink)) + Source.fromGraph(new AeronSource(channel(first), streamId, aeron, taskRunner, pool, IgnoreEventSink, 0)) .via(killSwitch.flow) .runForeach { envelope ⇒ val bytes = ByteString.fromByteBuffer(envelope.byteBuffer) @@ -315,7 +315,7 @@ abstract class AeronStreamLatencySpec "start echo" in { runOn(second) { // just echo back - Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink)) + Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink, 0)) .runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) } enterBarrier("echo-started") diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala index 6215080ce5..70802a08f8 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala @@ -177,7 +177,7 @@ abstract class AeronStreamMaxThroughputSpec var count = 0L val done = TestLatch(1) val killSwitch = KillSwitches.shared(testName) - Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink)) + Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink, 0)) .via(killSwitch.flow) .runForeach { envelope ⇒ val bytes = ByteString.fromByteBuffer(envelope.byteBuffer) diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala index befc797868..a7be652913 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala @@ -71,6 +71,8 @@ private[remote] object AeronSource { /** * INTERNAL API * @param channel eg. "aeron:udp?endpoint=localhost:40123" + * @param spinning the amount of busy spinning to be done synchronously before deferring to the TaskRunner + * when waiting for data */ private[remote] class AeronSource( channel: String, @@ -78,7 +80,8 @@ private[remote] class AeronSource( aeron: Aeron, taskRunner: TaskRunner, pool: EnvelopeBufferPool, - flightRecorder: EventSink) + flightRecorder: EventSink, + spinning: Int) extends GraphStageWithMaterializedValue[SourceShape[EnvelopeBuffer], AeronSource.ResourceLifecycle] { import AeronSource._ import TaskRunner._ @@ -91,8 +94,6 @@ private[remote] class AeronSource( val logic = new GraphStageLogic(shape) with OutHandler with ResourceLifecycle with StageLogging { private val sub = aeron.addSubscription(channel, streamId) - // spin between 100 to 10000 depending on idleCpuLevel - private val spinning = 1100 * taskRunner.idleCpuLevel - 1000 private var backoffCount = spinning private var delegateTaskStartTime = 0L private var countBeforeDelegate = 0L diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 16299b2cf1..4b749e57b5 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -977,7 +977,12 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R def aeronSource(streamId: Int, pool: EnvelopeBufferPool): Source[EnvelopeBuffer, AeronSource.ResourceLifecycle] = Source.fromGraph(new AeronSource(inboundChannel, streamId, aeron, taskRunner, pool, - createFlightRecorderEventSink())) + createFlightRecorderEventSink(), aeronSourceSpinningStrategy)) + + private def aeronSourceSpinningStrategy: Int = + if (settings.Advanced.InboundLanes > 1 || // spinning was identified to be the cause of massive slowdowns with multiple lanes, see #21365 + settings.Advanced.IdleCpuLevel < 5) 0 // also don't spin for small IdleCpuLevels + else 50 * settings.Advanced.IdleCpuLevel - 240 val messageDispatcherSink: Sink[InboundEnvelope, Future[Done]] = Sink.foreach[InboundEnvelope] { m ⇒ messageDispatcher.dispatch(m) diff --git a/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala index feeb58b2c4..7cd02447b0 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala @@ -58,7 +58,7 @@ class AeronSinkSpec extends AkkaSpec with ImplicitSender { val port = SocketUtil.temporaryServerAddress("localhost", udp = true).getPort val channel = s"aeron:udp?endpoint=localhost:$port" - Source.fromGraph(new AeronSource(channel, 1, aeron, taskRunner, pool, IgnoreEventSink)) + Source.fromGraph(new AeronSource(channel, 1, aeron, taskRunner, pool, IgnoreEventSink, 0)) // fail receiver stream on first message .map(_ ⇒ throw new RuntimeException("stop") with NoStackTrace) .runWith(Sink.ignore)