From cc347db22858f7a5d51573f3b605d592ae952d4d Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 10 May 2016 18:23:21 +0200 Subject: [PATCH] flush outstanding task in AeronSink before completing --- .../scala/akka/remote/artery/AeronSink.scala | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala index f3427d7c92..590035ea01 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala @@ -70,7 +70,12 @@ class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRu private var lastMsgSizeRef = new AtomicInteger // used in the external backoff task private val addOfferTask: Add = Add(offerTask(pub, buffer, lastMsgSizeRef, getAsyncCallback(_ ⇒ onOfferSuccess()))) - override def preStart(): Unit = pull(in) + private var offerTaskInProgress = false + + override def preStart(): Unit = { + setKeepGoing(true) + pull(in) + } override def postStop(): Unit = { taskRunner.command(Remove(addOfferTask.task)) @@ -97,6 +102,7 @@ class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRu } else { // delegate backoff to shared TaskRunner lastMsgSizeRef.set(lastMsgSize) + offerTaskInProgress = true taskRunner.command(addOfferTask) } } else { @@ -105,7 +111,17 @@ class AeronSink(channel: String, streamId: Int, aeron: Aeron, taskRunner: TaskRu } private def onOfferSuccess(): Unit = { - pull(in) + offerTaskInProgress = false + if (isClosed(in)) + completeStage() + else + pull(in) + } + + override def onUpstreamFinish(): Unit = { + // flush outstanding offer before completing stage + if (!offerTaskInProgress) + super.onUpstreamFinish() } setHandler(in, this)