fix ActorRefWithAck when buffer has run full #19653
This commit is contained in:
parent
a6aee310ba
commit
15580e767f
2 changed files with 24 additions and 5 deletions
|
|
@ -8,6 +8,9 @@ import akka.stream.ActorMaterializer
|
|||
import akka.stream.testkit.Utils._
|
||||
import akka.stream.testkit._
|
||||
import akka.stream.testkit.scaladsl._
|
||||
import org.scalatest.concurrent.ScalaFutures
|
||||
import org.scalactic.ConversionCheckedTripleEquals
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object ActorRefBackpressureSinkSpec {
|
||||
val initMessage = "start"
|
||||
|
|
@ -43,9 +46,10 @@ object ActorRefBackpressureSinkSpec {
|
|||
|
||||
}
|
||||
|
||||
class ActorRefBackpressureSinkSpec extends AkkaSpec {
|
||||
class ActorRefBackpressureSinkSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTripleEquals {
|
||||
import ActorRefBackpressureSinkSpec._
|
||||
implicit val mat = ActorMaterializer()
|
||||
implicit val patience = PatienceConfig(2.second)
|
||||
|
||||
def createActor[T](c: Class[T]) =
|
||||
system.actorOf(Props(c, testActor).withDispatcher("akka.test.stream-dispatcher"))
|
||||
|
|
@ -110,6 +114,22 @@ class ActorRefBackpressureSinkSpec extends AkkaSpec {
|
|||
expectMsg(completeMessage)
|
||||
}
|
||||
|
||||
"keep on sending even after the buffer has been full" in assertAllStagesStopped {
|
||||
val fw = createActor(classOf[Fw2])
|
||||
val probe = Source(1 to 20)
|
||||
.alsoToMat(Flow[Int].take(16).watchTermination()(Keep.right).to(Sink.ignore))(Keep.right)
|
||||
.to(Sink.actorRefWithAck(fw, initMessage, ackMessage, completeMessage))
|
||||
.run()
|
||||
probe.futureValue should ===(akka.Done)
|
||||
expectMsg(initMessage)
|
||||
fw ! TriggerAckMessage
|
||||
for (i ← 1 to 20) {
|
||||
expectMsg(i)
|
||||
fw ! TriggerAckMessage
|
||||
}
|
||||
expectMsg(completeMessage)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -52,10 +52,9 @@ private[akka] class ActorRefBackpressureSinkStage[In](ref: ActorRef, onInitMessa
|
|||
}
|
||||
|
||||
private def sendData(): Unit = {
|
||||
if (!buffer.isEmpty) {
|
||||
ref ! buffer.poll()
|
||||
acknowledgementReceived = false
|
||||
}
|
||||
if (buffer.size() == maxBuffer) tryPull(in)
|
||||
ref ! buffer.poll()
|
||||
acknowledgementReceived = false
|
||||
if (buffer.isEmpty && completeReceived) finish()
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue