From c90121485fcfc44a3cee62a0c638e1982d13d812 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 19 May 2016 08:24:27 +0200 Subject: [PATCH] give up sending after a while, #20317 --- .../artery/AeronStreamConcistencySpec.scala | 7 +- .../artery/AeronStreamLatencySpec.scala | 7 +- .../artery/AeronStreamMaxThroughputSpec.scala | 3 +- .../scala/akka/remote/artery/AeronSink.scala | 51 +++++++---- .../akka/remote/artery/ArteryTransport.scala | 28 ++++--- .../akka/remote/artery/Association.scala | 23 +++-- .../scala/akka/remote/artery/Control.scala | 7 +- .../akka/remote/artery/StageLogging.scala | 34 ++++++++ .../remote/artery/SystemMessageDelivery.scala | 1 + .../akka/remote/artery/AeronSinkSpec.scala | 84 +++++++++++++++++++ .../akka/remote/artery/AeronStreamsApp.scala | 9 +- 11 files changed, 203 insertions(+), 51 deletions(-) create mode 100644 akka-remote/src/main/scala/akka/remote/artery/StageLogging.scala create mode 100644 akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala index 526f321bd0..880dafd727 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala @@ -81,6 +81,7 @@ abstract class AeronStreamConsistencySpec } val streamId = 1 + val giveUpSendAfter = 30.seconds override def afterAll(): Unit = { taskRunner.stop() @@ -96,7 +97,7 @@ abstract class AeronStreamConsistencySpec runOn(second) { // just echo back Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool)) - .runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool)) + .runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpSendAfter)) } enterBarrier("echo-started") } @@ -137,7 +138,7 @@ abstract class AeronStreamConsistencySpec envelope } .throttle(1, 200.milliseconds, 1, ThrottleMode.Shaping) - .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool)) + .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter)) started.expectMsg(Done) } @@ -149,7 +150,7 @@ abstract class AeronStreamConsistencySpec envelope.byteBuffer.flip() envelope } - .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool)) + .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter)) Await.ready(done, 20.seconds) killSwitch.shutdown() diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala index 6ce14f0b3b..27159795b1 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala @@ -108,6 +108,7 @@ abstract class AeronStreamLatencySpec } val streamId = 1 + val giveUpSendAfter = 30.seconds lazy val reporterExecutor = Executors.newFixedThreadPool(1) def reporter(name: String): TestRateReporter = { @@ -227,7 +228,7 @@ abstract class AeronStreamLatencySpec envelope } .throttle(1, 200.milliseconds, 1, ThrottleMode.Shaping) - .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool)) + .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter)) started.expectMsg(Done) } @@ -245,7 +246,7 @@ abstract class AeronStreamLatencySpec sendTimes.set(n - 1, System.nanoTime()) envelope } - .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool)) + .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter)) barrier.await((totalMessages / messageRate) + 10, SECONDS) } @@ -264,7 +265,7 @@ abstract class AeronStreamLatencySpec runOn(second) { // just echo back Source.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool)) - .runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool)) + .runWith(new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpSendAfter)) } enterBarrier("echo-started") } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala index e4db9dca62..110ce66554 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala @@ -114,6 +114,7 @@ abstract class AeronStreamMaxThroughputSpec } val streamId = 1 + val giveUpSendAfter = 30.seconds lazy val reporterExecutor = Executors.newFixedThreadPool(1) def reporter(name: String): TestRateReporter = { @@ -211,7 +212,7 @@ abstract class AeronStreamMaxThroughputSpec envelope.byteBuffer.flip() envelope } - .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool)) + .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpSendAfter)) printStats("sender") enterBarrier(testName + "-done") diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala index 611809a449..a9a490579d 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala @@ -13,6 +13,7 @@ import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success import scala.util.Try +import scala.util.control.NoStackTrace import akka.Done import akka.stream.Attributes @@ -28,26 +29,38 @@ import org.agrona.concurrent.UnsafeBuffer object AeronSink { - class OfferTask(pub: Publication, var buffer: UnsafeBuffer, msgSize: AtomicInteger, onOfferSuccess: AsyncCallback[Unit]) - extends (() ⇒ Boolean) { + final class GaveUpSendingException(msg: String) extends RuntimeException(msg) with NoStackTrace + private val TimerCheckPeriod = 1 << 13 // 8192 + private val TimerCheckMask = TimerCheckPeriod - 1 + + private final class OfferTask(pub: Publication, var buffer: UnsafeBuffer, var msgSize: Int, onOfferSuccess: AsyncCallback[Unit], + giveUpAfter: Duration, onGiveUp: AsyncCallback[Unit]) + extends (() ⇒ Boolean) { + val giveUpAfterNanos = giveUpAfter match { + case f: FiniteDuration ⇒ f.toNanos + case _ ⇒ -1L + } var n = 0L - var localMsgSize = -1 + var startTime = 0L override def apply(): Boolean = { + if (n == 0L) { + // first invocation for this message + startTime = if (giveUpAfterNanos >= 0) System.nanoTime() else 0L + } n += 1 - if (localMsgSize == -1) - localMsgSize = msgSize.get - val result = pub.offer(buffer, 0, localMsgSize) + val result = pub.offer(buffer, 0, msgSize) if (result >= 0) { - n = 0 - localMsgSize = -1 + n = 0L onOfferSuccess.invoke(()) true + } else if (giveUpAfterNanos >= 0 && (n & TimerCheckMask) == 0 && (System.nanoTime() - startTime) > giveUpAfterNanos) { + // the task is invoked by the spinning thread, only check nanoTime each 8192th invocation + n = 0L + onGiveUp.invoke(()) + true } else { - // FIXME drop after too many attempts? - if (n > 1000000 && n % 100000 == 0) - println(s"# offer not accepted after $n") // FIXME false } } @@ -57,7 +70,7 @@ object AeronSink { /** * @param channel eg. "aeron:udp?endpoint=localhost:40123" */ -class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRunner, pool: EnvelopeBufferPool) +class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRunner, pool: EnvelopeBufferPool, giveUpSendAfter: Duration) extends GraphStageWithMaterializedValue[SinkShape[EnvelopeBuffer], Future[Done]] { import AeronSink._ import TaskRunner._ @@ -77,8 +90,8 @@ class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRu private val spinning = 1000 private var backoffCount = spinning private var lastMsgSize = 0 - private val lastMsgSizeRef = new AtomicInteger // used in the external backoff task - private val offerTask = new OfferTask(pub, null, lastMsgSizeRef, getAsyncCallback(_ ⇒ onOfferSuccess())) + private val offerTask = new OfferTask(pub, null, lastMsgSize, getAsyncCallback(_ ⇒ onOfferSuccess()), + giveUpSendAfter, getAsyncCallback(_ ⇒ onGiveUp())) private val addOfferTask: Add = Add(offerTask) private var offerTaskInProgress = false @@ -112,9 +125,10 @@ class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRu publish() // recursive } else { // delegate backoff to shared TaskRunner - lastMsgSizeRef.set(lastMsgSize) offerTaskInProgress = true + // visibility of these assignments are ensured by adding the task to the command queue offerTask.buffer = envelopeInFlight.aeronBuffer + offerTask.msgSize = lastMsgSize taskRunner.command(addOfferTask) } } else { @@ -134,6 +148,13 @@ class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRu pull(in) } + private def onGiveUp(): Unit = { + offerTaskInProgress = false + val cause = new GaveUpSendingException(s"Gave up sending message to $channel after $giveUpSendAfter.") + completedValue = Failure(cause) + failStage(cause) + } + override def onUpstreamFinish(): Unit = { // flush outstanding offer before completing stage if (!offerTaskInProgress) diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 7a35e2913e..ece54fa867 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -236,6 +236,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private val handshakeTimeout: FiniteDuration = system.settings.config.getMillisDuration("akka.remote.handshake-timeout").requiring(_ > Duration.Zero, "handshake-timeout must be > 0") + private val giveUpSendAfter: FiniteDuration = 60.seconds private val largeMessageDestinations = system.settings.config.getStringList("akka.remote.artery.large-message-destinations").asScala.foldLeft(WildcardTree[NotUsed]()) { (tree, entry) ⇒ @@ -416,17 +417,17 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private def attachStreamRestart(streamName: String, streamCompleted: Future[Done], restart: () ⇒ Unit): Unit = { implicit val ec = materializer.executionContext streamCompleted.onFailure { + case _ if isShutdown ⇒ // don't restart after shutdown case _: AbruptTerminationException ⇒ // ActorSystem shutdown case cause ⇒ - if (!isShutdown) - if (restartCounter.restart()) { - log.error(cause, "{} failed. Restarting it.", streamName) - restart() - } else { - log.error(cause, "{} failed and restarted {} times within {} seconds. Terminating system.", - streamName, maxRestarts, restartTimeout.toSeconds) - system.terminate() - } + if (restartCounter.restart()) { + log.error(cause, "{} failed. Restarting it. {}", streamName, cause.getMessage) + restart() + } else { + log.error(cause, "{} failed and restarted {} times within {} seconds. Terminating system. {}", + streamName, maxRestarts, restartTimeout.toSeconds, cause.getMessage) + system.terminate() + } } } @@ -485,7 +486,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R Flow.fromGraph(killSwitch.flow[Send]) .via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval)) .via(encoder) - .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner, envelopePool))(Keep.right) + .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), ordinaryStreamId, aeron, taskRunner, + envelopePool, giveUpSendAfter))(Keep.right) } def outboundLarge(outboundContext: OutboundContext): Sink[Send, Future[Done]] = { @@ -494,7 +496,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R Flow.fromGraph(killSwitch.flow[Send]) .via(new OutboundHandshake(outboundContext, handshakeTimeout, handshakeRetryInterval)) .via(createEncoder(pool)) - .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), largeStreamId, aeron, taskRunner, envelopePool))(Keep.right) + .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), largeStreamId, aeron, taskRunner, + envelopePool, giveUpSendAfter))(Keep.right) case None ⇒ throw new IllegalArgumentException("Trying to create outbound stream but outbound stream not configured") } } @@ -505,7 +508,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R .via(new SystemMessageDelivery(outboundContext, systemMessageResendInterval, remoteSettings.SysMsgBufferSize)) .viaMat(new OutboundControlJunction(outboundContext))(Keep.right) .via(encoder) - .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner, envelopePool))(Keep.both) + .toMat(new AeronSink(outboundChannel(outboundContext.remoteAddress), controlStreamId, aeron, taskRunner, + envelopePool, Duration.Inf))(Keep.both) // FIXME we can also add scrubbing stage that would collapse sys msg acks/nacks and remove duplicate Quarantine messages } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index fe6b9e800e..cbc3abc118 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -21,6 +21,7 @@ import akka.dispatch.sysmsg.SystemMessage import akka.event.Logging import akka.remote.EndpointManager.Send import akka.remote.{ LargeDestination, RegularDestination, RemoteActorRef, UniqueAddress } +import akka.remote.artery.AeronSink.GaveUpSendingException import akka.remote.artery.InboundControlJunction.ControlMessageSubject import akka.remote.artery.OutboundControlJunction.OutboundControlIngress import akka.remote.artery.OutboundHandshake.HandshakeTimeoutException @@ -261,17 +262,21 @@ private[akka] class Association( private def attachStreamRestart(streamName: String, streamCompleted: Future[Done], restart: Throwable ⇒ Unit): Unit = { implicit val ec = materializer.executionContext streamCompleted.onFailure { + case _ if transport.isShutdown ⇒ // don't restart after shutdown case _: AbruptTerminationException ⇒ // ActorSystem shutdown + case cause: GaveUpSendingException ⇒ + log.error(cause, "{} failed. Restarting it. {}", streamName, cause.getMessage) + // restart unconditionally, without counting restarts + restart(cause) case cause ⇒ - if (!transport.isShutdown) - if (restartCounter.restart()) { - log.error(cause, "{} failed. Restarting it.", streamName) - restart(cause) - } else { - log.error(cause, "{} failed and restarted {} times within {} seconds. Terminating system.", - streamName, maxRestarts, restartTimeout.toSeconds) - transport.system.terminate() - } + if (restartCounter.restart()) { + log.error(cause, "{} failed. Restarting it. {}", streamName, cause.getMessage) + restart(cause) + } else { + log.error(cause, "{} failed and restarted {} times within {} seconds. Terminating system. {}", + streamName, maxRestarts, restartTimeout.toSeconds, cause.getMessage) + transport.system.terminate() + } } } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Control.scala b/akka-remote/src/main/scala/akka/remote/artery/Control.scala index b371ae2650..7d32d0889b 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Control.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Control.scala @@ -141,7 +141,7 @@ private[akka] class InboundControlJunction * INTERNAL API */ private[akka] object OutboundControlJunction { - trait OutboundControlIngress { + private[akka] trait OutboundControlIngress { def sendControlMessage(message: ControlMessage): Unit } } @@ -158,7 +158,7 @@ private[akka] class OutboundControlJunction(outboundContext: OutboundContext) override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { // FIXME see issue #20503 related to CallbackWrapper, we might implement this in a better way - val logic = new GraphStageLogic(shape) with CallbackWrapper[ControlMessage] with InHandler with OutHandler { + val logic = new GraphStageLogic(shape) with CallbackWrapper[ControlMessage] with InHandler with OutHandler with StageLogging { import OutboundControlJunction._ private val sendControlMessageCallback = getAsyncCallback[ControlMessage](internalSendControlMessage) @@ -192,8 +192,7 @@ private[akka] class OutboundControlJunction(outboundContext: OutboundContext) buffer.offer(wrap(message)) else { // it's alright to drop control messages - // FIXME we need that stage logging support - println(s"dropping control message ${message.getClass.getName} due to full buffer") + log.debug("Dropping control message [{}] due to full buffer.", message.getClass.getName) } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/StageLogging.scala b/akka-remote/src/main/scala/akka/remote/artery/StageLogging.scala new file mode 100644 index 0000000000..8f9e768299 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/StageLogging.scala @@ -0,0 +1,34 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import akka.stream.stage.GraphStageLogic +import akka.event.LoggingAdapter +import akka.stream.ActorMaterializer +import akka.event.NoLogging + +// TODO this can be removed when https://github.com/akka/akka/issues/18793 has been implemented +/** + * INTERNAL API + */ +private[akka] trait StageLogging { self: GraphStageLogic ⇒ + + private var _log: LoggingAdapter = _ + + protected def logSource: Class[_] = this.getClass + + def log: LoggingAdapter = { + // only used in StageLogic, i.e. thread safe + if (_log eq null) { + materializer match { + case a: ActorMaterializer ⇒ + _log = akka.event.Logging(a.system, logSource) + case _ ⇒ + _log = NoLogging + } + } + _log + } + +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index 9fa61c25dd..747fcad084 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -107,6 +107,7 @@ private[akka] class SystemMessageDelivery( } if (!unacknowledged.isEmpty) scheduleOnce(ResendTick, resendInterval) + // FIXME give up resending after a long while, i.e. config property quarantine-after-silence } // ControlMessageObserver, external call diff --git a/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala new file mode 100644 index 0000000000..313ac9e979 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala @@ -0,0 +1,84 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.artery + +import java.io.File + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace + +import akka.actor.ExtendedActorSystem +import akka.remote.artery.AeronSink.GaveUpSendingException +import akka.stream.ActorMaterializer +import akka.stream.ActorMaterializerSettings +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender +import akka.testkit.SocketUtil +import io.aeron.Aeron +import io.aeron.driver.MediaDriver +import org.agrona.IoUtil + +class AeronSinkSpec extends AkkaSpec with ImplicitSender { + + val driver = MediaDriver.launchEmbedded() + + val aeron = { + val ctx = new Aeron.Context + ctx.aeronDirectoryName(driver.aeronDirectoryName) + Aeron.connect(ctx) + } + + val taskRunner = { + val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem]) + r.start() + r + } + + val pool = new EnvelopeBufferPool(ArteryTransport.MaximumFrameSize, ArteryTransport.MaximumPooledBuffers) + + val matSettings = ActorMaterializerSettings(system).withFuzzing(true) + implicit val mat = ActorMaterializer(matSettings)(system) + + override def afterTermination(): Unit = { + taskRunner.stop() + aeron.close() + driver.close() + IoUtil.delete(new File(driver.aeronDirectoryName), true) + super.afterTermination() + } + + "AeronSink" must { + + "give up sending after given duration" in { + val port = SocketUtil.temporaryServerAddress("localhost", udp = true).getPort + val channel = s"aeron:udp?endpoint=localhost:$port" + + Source.fromGraph(new AeronSource(channel, 1, aeron, taskRunner, pool)) + // fail receiver stream on first message + .map(_ ⇒ throw new RuntimeException("stop") with NoStackTrace) + .runWith(Sink.ignore) + + // use large enough messages to fill up buffers + val payload = Array.ofDim[Byte](100000) + val done = Source(1 to 1000).map(_ ⇒ payload) + .map { n ⇒ + val envelope = pool.acquire() + envelope.byteBuffer.put(payload) + envelope.byteBuffer.flip() + envelope + } + .runWith(new AeronSink(channel, 1, aeron, taskRunner, pool, 500.millis)) + + // without the give up timeout the stream would not complete/fail + intercept[GaveUpSendingException] { + Await.result(done, 5.seconds) + } + } + + } + +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/AeronStreamsApp.scala b/akka-remote/src/test/scala/akka/remote/artery/AeronStreamsApp.scala index 26fcdb904d..12e664d732 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/AeronStreamsApp.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/AeronStreamsApp.scala @@ -38,6 +38,7 @@ object AeronStreamsApp { val latencyRate = 10000 // per second val latencyN = 10 * latencyRate val payload = ("0" * 100).getBytes("utf-8") + val giveUpSendAfter = 60.seconds lazy val sendTimes = new AtomicLongArray(latencyN) lazy val driver = { @@ -201,7 +202,7 @@ object AeronStreamsApp { envelope.byteBuffer.flip() envelope } - .runWith(new AeronSink(channel1, streamId, aeron, taskRunner, pool)) + .runWith(new AeronSink(channel1, streamId, aeron, taskRunner, pool, giveUpSendAfter)) } def runEchoReceiver(): Unit = { @@ -213,7 +214,7 @@ object AeronStreamsApp { r.onMessage(1, envelope.byteBuffer.limit) envelope } - .runWith(new AeronSink(channel2, streamId, aeron, taskRunner, pool)) + .runWith(new AeronSink(channel2, streamId, aeron, taskRunner, pool, giveUpSendAfter)) } def runEchoSender(): Unit = { @@ -264,7 +265,7 @@ object AeronStreamsApp { envelope.byteBuffer.flip() envelope } - .runWith(new AeronSink(channel1, streamId, aeron, taskRunner, pool)) + .runWith(new AeronSink(channel1, streamId, aeron, taskRunner, pool, giveUpSendAfter)) barrier.await() } @@ -303,7 +304,7 @@ object AeronStreamsApp { envelope.byteBuffer.flip() envelope } - .runWith(new AeronSink(channel1, streamId, aeron, taskRunner, pool)) + .runWith(new AeronSink(channel1, streamId, aeron, taskRunner, pool, giveUpSendAfter)) } def runStats(): Unit = {