diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala index ee5d298d3d..ec76249bce 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefBackpressureSinkSpec.scala @@ -8,6 +8,9 @@ import akka.stream.ActorMaterializer import akka.stream.testkit.Utils._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl._ +import org.scalatest.concurrent.ScalaFutures +import org.scalactic.ConversionCheckedTripleEquals +import scala.concurrent.duration._ object ActorRefBackpressureSinkSpec { val initMessage = "start" @@ -43,9 +46,10 @@ object ActorRefBackpressureSinkSpec { } -class ActorRefBackpressureSinkSpec extends AkkaSpec { +class ActorRefBackpressureSinkSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTripleEquals { import ActorRefBackpressureSinkSpec._ implicit val mat = ActorMaterializer() + implicit val patience = PatienceConfig(2.second) def createActor[T](c: Class[T]) = system.actorOf(Props(c, testActor).withDispatcher("akka.test.stream-dispatcher")) @@ -110,6 +114,22 @@ class ActorRefBackpressureSinkSpec extends AkkaSpec { expectMsg(completeMessage) } + "keep on sending even after the buffer has been full" in assertAllStagesStopped { + val fw = createActor(classOf[Fw2]) + val probe = Source(1 to 20) + .alsoToMat(Flow[Int].take(16).watchTermination()(Keep.right).to(Sink.ignore))(Keep.right) + .to(Sink.actorRefWithAck(fw, initMessage, ackMessage, completeMessage)) + .run() + probe.futureValue should ===(akka.Done) + expectMsg(initMessage) + fw ! TriggerAckMessage + for (i ← 1 to 20) { + expectMsg(i) + fw ! TriggerAckMessage + } + expectMsg(completeMessage) + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala index 92b74da6a0..4bc54b9c9c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala @@ -52,10 +52,9 @@ private[akka] class ActorRefBackpressureSinkStage[In](ref: ActorRef, onInitMessa } private def sendData(): Unit = { - if (!buffer.isEmpty) { - ref ! buffer.poll() - acknowledgementReceived = false - } + if (buffer.size() == maxBuffer) tryPull(in) + ref ! buffer.poll() + acknowledgementReceived = false if (buffer.isEmpty && completeReceived) finish() }