diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala index f3427d7c92..590035ea01 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala @@ -70,7 +70,12 @@ class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRu private var lastMsgSizeRef = new AtomicInteger // used in the external backoff task private val addOfferTask: Add = Add(offerTask(pub, buffer, lastMsgSizeRef, getAsyncCallback(_ ⇒ onOfferSuccess()))) - override def preStart(): Unit = pull(in) + private var offerTaskInProgress = false + + override def preStart(): Unit = { + setKeepGoing(true) + pull(in) + } override def postStop(): Unit = { taskRunner.command(Remove(addOfferTask.task)) @@ -97,6 +102,7 @@ class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRu } else { // delegate backoff to shared TaskRunner lastMsgSizeRef.set(lastMsgSize) + offerTaskInProgress = true taskRunner.command(addOfferTask) } } else { @@ -105,7 +111,17 @@ class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRu } private def onOfferSuccess(): Unit = { - pull(in) + offerTaskInProgress = false + if (isClosed(in)) + completeStage() + else + pull(in) + } + + override def onUpstreamFinish(): Unit = { + // flush outstanding offer before completing stage + if (!offerTaskInProgress) + super.onUpstreamFinish() } setHandler(in, this)