diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala index 48ba42b8e4..61caaefa88 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala @@ -116,21 +116,49 @@ class ActorRefBackpressureSinkSpec extends AkkaSpec with ScalaFutures with Conve } "keep on sending even after the buffer has been full" in assertAllStagesStopped { + val bufferSize = 16 + val streamElementCount = bufferSize + 4 val fw = createActor(classOf[Fw2]) - val probe = Source(1 to 20) - .alsoToMat(Flow[Int].take(16).watchTermination()(Keep.right).to(Sink.ignore))(Keep.right) - .to(Sink.actorRefWithAck(fw, initMessage, ackMessage, completeMessage)) + val sink = Sink.actorRefWithAck(fw, initMessage, ackMessage, completeMessage) + .withAttributes(inputBuffer(bufferSize, bufferSize)) + val probe = Source(1 to streamElementCount) + .alsoToMat(Flow[Int].take(bufferSize).watchTermination()(Keep.right).to(Sink.ignore))(Keep.right) + .to(sink) .run() probe.futureValue should ===(akka.Done) expectMsg(initMessage) fw ! TriggerAckMessage - for (i ← 1 to 20) { + for (i ← 1 to streamElementCount) { expectMsg(i) fw ! TriggerAckMessage } expectMsg(completeMessage) } + "work with one element buffer" in assertAllStagesStopped { + val fw = createActor(classOf[Fw2]) + val publisher = + TestSource.probe[Int].to(Sink.actorRefWithAck(fw, + initMessage, ackMessage, completeMessage) + .withAttributes(inputBuffer(1, 1))).run() + + expectMsg(initMessage) + fw ! TriggerAckMessage + + publisher.sendNext(1) + expectMsg(1) + + fw ! TriggerAckMessage + expectNoMsg() // Ack received but buffer empty + + publisher.sendNext(2) // Buffer this value + fw ! TriggerAckMessage + expectMsg(2) + + publisher.sendComplete() + expectMsg(completeMessage) + } + "fail to materialize with zero sized input buffer" in { val fw = createActor(classOf[Fw]) an[IllegalArgumentException] shouldBe thrownBy { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala index 4ee4ce1498..791cbfd8a3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/QueueSinkSpec.scala @@ -116,12 +116,16 @@ class QueueSinkSpec extends AkkaSpec with ScalaFutures { } "keep on sending even after the buffer has been full" in assertAllStagesStopped { - val (probe, queue) = Source(1 to 20) - .alsoToMat(Flow[Int].take(15).watchTermination()(Keep.right).to(Sink.ignore))(Keep.right) - .toMat(Sink.queue())(Keep.both) + val bufferSize = 16 + val streamElementCount = bufferSize + 4 + val sink = Sink.queue[Int]() + .withAttributes(inputBuffer(bufferSize, bufferSize)) + val (probe, queue) = Source(1 to streamElementCount) + .alsoToMat(Flow[Int].take(bufferSize).watchTermination()(Keep.right).to(Sink.ignore))(Keep.right) + .toMat(sink)(Keep.both) .run() probe.futureValue should ===(akka.Done) - for (i ← 1 to 20) { + for (i ← 1 to streamElementCount) { queue.pull() pipeTo testActor expectMsg(Some(i)) } @@ -130,6 +134,25 @@ class QueueSinkSpec extends AkkaSpec with ScalaFutures { } + "work with one element buffer" in assertAllStagesStopped { + val sink = Sink.queue[Int]().withAttributes(inputBuffer(1, 1)) + val probe = TestPublisher.manualProbe[Int]() + val queue = Source.fromPublisher(probe).runWith(sink) + val sub = probe.expectSubscription() + + queue.pull().pipeTo(testActor) + sub.sendNext(1) // should pull next element + expectMsg(Some(1)) + + queue.pull().pipeTo(testActor) + expectNoMsg() // element requested but buffer empty + sub.sendNext(2) + expectMsg(Some(2)) + + sub.sendComplete() + Await.result(queue.pull(), noMsgTimeout) should be(None) + } + "fail to materialize with zero sized input buffer" in { an[IllegalArgumentException] shouldBe thrownBy { Source.single(()).runWith(Sink.queue().withAttributes(inputBuffer(0, 0))) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala index e6403eca08..b7238f9604 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala @@ -38,9 +38,15 @@ private[akka] class ActorRefBackpressureSinkStage[In](ref: ActorRef, onInitMessa private def receive(evt: (ActorRef, Any)): Unit = { evt._2 match { - case `ackMessage` ⇒ - if (!buffer.isEmpty) sendData() - else acknowledgementReceived = true + case `ackMessage` ⇒ { + if (buffer.isEmpty) acknowledgementReceived = true + else { + // onPush might have filled the buffer up and + // stopped pulling, so we pull here + if (buffer.size() == maxBuffer) tryPull(in) + dequeueAndSend() + } + } case Terminated(`ref`) ⇒ completeStage() case _ ⇒ //ignore all other messages } @@ -53,10 +59,8 @@ private[akka] class ActorRefBackpressureSinkStage[In](ref: ActorRef, onInitMessa pull(in) } - private def sendData(): Unit = { - if (buffer.size() == maxBuffer) tryPull(in) + private def dequeueAndSend(): Unit = { ref ! buffer.poll() - acknowledgementReceived = false if (buffer.isEmpty && completeReceived) finish() } @@ -68,7 +72,10 @@ private[akka] class ActorRefBackpressureSinkStage[In](ref: ActorRef, onInitMessa setHandler(in, new InHandler { override def onPush(): Unit = { buffer offer grab(in) - if (acknowledgementReceived) sendData() + if (acknowledgementReceived) { + dequeueAndSend() + acknowledgementReceived = false + } if (buffer.size() < maxBuffer) pull(in) } override def onUpstreamFinish(): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index d5dab8e51e..bfd37bbf43 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -303,6 +303,8 @@ final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedVal var currentRequest: Option[Requested[T]] = None override def preStart(): Unit = { + // Allocates one additional element to hold stream + // closed/failure indicators buffer = Buffer(maxBuffer + 1, materializer) setKeepGoing(true) initCallback(callback.invoke) @@ -319,7 +321,7 @@ final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedVal case None ⇒ if (buffer.isEmpty) currentRequest = Some(promise) else { - if (buffer.used == maxBuffer - 1) tryPull(in) + if (buffer.used == maxBuffer) tryPull(in) sendDownstream(promise) } }) @@ -347,7 +349,7 @@ final private[stream] class QueueSink[T]() extends GraphStageWithMaterializedVal setHandler(in, new InHandler { override def onPush(): Unit = { enqueueAndNotify(Success(Some(grab(in)))) - if (buffer.used < maxBuffer - 1) pull(in) + if (buffer.used < maxBuffer) pull(in) } override def onUpstreamFinish(): Unit = enqueueAndNotify(Success(None)) override def onUpstreamFailure(ex: Throwable): Unit = enqueueAndNotify(Failure(ex))