diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala index 2df7079373..a19468bbb1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala @@ -120,6 +120,7 @@ class ActorRefBackpressureSinkSpec extends StreamSpec { expectMsg(2) fw ! TriggerAckMessage expectMsg(3) + fw ! TriggerAckMessage expectMsg(completeMessage) } @@ -144,6 +145,7 @@ class ActorRefBackpressureSinkSpec extends StreamSpec { expectMsg(2) fw ! TriggerAckMessage expectMsg(3) + fw ! TriggerAckMessage expectMsg(completeMessage) } @@ -229,6 +231,29 @@ class ActorRefBackpressureSinkSpec extends StreamSpec { probe.expectMsg(failMessage) } + "signal completion after last message has been acked" in { + val probe = TestProbe() + + val sink = Sink + .actorRefWithBackpressure[String]( + probe.ref, + initMessage, + ackMessage, + completeMessage, + (_: Throwable) => failMessage) + .withAttributes(inputBuffer(1, 1)) + + Source.single("hello world").runWith(sink) + + probe.expectMsg(initMessage) + probe.reply(ackMessage) + + probe.expectMsg("hello world") + probe.expectNoMessage(100.millis) + + probe.reply(ackMessage) + probe.expectMsg(completeMessage) + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala index 2e2b20aa82..102682bca5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala @@ -4,14 +4,15 @@ package akka.stream.impl -import java.util import akka.actor._ import akka.annotation.InternalApi -import akka.stream._ import akka.stream.Attributes.InputBuffer +import akka.stream._ import akka.stream.impl.Stages.DefaultAttributes import akka.stream.stage._ +import java.util + /** * INTERNAL API */ @@ -21,7 +22,7 @@ import akka.stream.stage._ onInitMessage: ActorRef => Any, ackMessage: Option[Any], onCompleteMessage: Any, - onFailureMessage: (Throwable) => Any) + onFailureMessage: Throwable => Any) extends GraphStage[SinkShape[In]] { val in: Inlet[In] = Inlet[In]("ActorRefBackpressureSink.in") override def initialAttributes: Attributes = DefaultAttributes.actorRefWithBackpressureSink @@ -43,8 +44,10 @@ import akka.stream.stage._ evt._2 match { case Terminated(`ref`) => completeStage() case ackMsg if ackMessage.isEmpty || ackMessage.contains(ackMsg) => - if (buffer.isEmpty) acknowledgementReceived = true - else { + if (buffer.isEmpty) { + acknowledgementReceived = true + if (completeReceived) finish() + } else { // onPush might have filled the buffer up and // stopped pulling, so we pull here if (buffer.size() == maxBuffer) tryPull(in) @@ -63,7 +66,6 @@ import akka.stream.stage._ private def dequeueAndSend(): Unit = { ref ! messageAdapter(self)(buffer.poll()) - if (buffer.isEmpty && completeReceived) finish() } private def finish(): Unit = {