This commit is contained in:
parent
2909d4cce5
commit
8db5daa064
2 changed files with 33 additions and 6 deletions
|
|
@ -120,6 +120,7 @@ class ActorRefBackpressureSinkSpec extends StreamSpec {
|
|||
expectMsg(2)
|
||||
fw ! TriggerAckMessage
|
||||
expectMsg(3)
|
||||
fw ! TriggerAckMessage
|
||||
|
||||
expectMsg(completeMessage)
|
||||
}
|
||||
|
|
@ -144,6 +145,7 @@ class ActorRefBackpressureSinkSpec extends StreamSpec {
|
|||
expectMsg(2)
|
||||
fw ! TriggerAckMessage
|
||||
expectMsg(3)
|
||||
fw ! TriggerAckMessage
|
||||
|
||||
expectMsg(completeMessage)
|
||||
}
|
||||
|
|
@ -229,6 +231,29 @@ class ActorRefBackpressureSinkSpec extends StreamSpec {
|
|||
probe.expectMsg(failMessage)
|
||||
}
|
||||
|
||||
"signal completion after last message has been acked" in {
|
||||
val probe = TestProbe()
|
||||
|
||||
val sink = Sink
|
||||
.actorRefWithBackpressure[String](
|
||||
probe.ref,
|
||||
initMessage,
|
||||
ackMessage,
|
||||
completeMessage,
|
||||
(_: Throwable) => failMessage)
|
||||
.withAttributes(inputBuffer(1, 1))
|
||||
|
||||
Source.single("hello world").runWith(sink)
|
||||
|
||||
probe.expectMsg(initMessage)
|
||||
probe.reply(ackMessage)
|
||||
|
||||
probe.expectMsg("hello world")
|
||||
probe.expectNoMessage(100.millis)
|
||||
|
||||
probe.reply(ackMessage)
|
||||
probe.expectMsg(completeMessage)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,14 +4,15 @@
|
|||
|
||||
package akka.stream.impl
|
||||
|
||||
import java.util
|
||||
import akka.actor._
|
||||
import akka.annotation.InternalApi
|
||||
import akka.stream._
|
||||
import akka.stream.Attributes.InputBuffer
|
||||
import akka.stream._
|
||||
import akka.stream.impl.Stages.DefaultAttributes
|
||||
import akka.stream.stage._
|
||||
|
||||
import java.util
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
|
|
@ -21,7 +22,7 @@ import akka.stream.stage._
|
|||
onInitMessage: ActorRef => Any,
|
||||
ackMessage: Option[Any],
|
||||
onCompleteMessage: Any,
|
||||
onFailureMessage: (Throwable) => Any)
|
||||
onFailureMessage: Throwable => Any)
|
||||
extends GraphStage[SinkShape[In]] {
|
||||
val in: Inlet[In] = Inlet[In]("ActorRefBackpressureSink.in")
|
||||
override def initialAttributes: Attributes = DefaultAttributes.actorRefWithBackpressureSink
|
||||
|
|
@ -43,8 +44,10 @@ import akka.stream.stage._
|
|||
evt._2 match {
|
||||
case Terminated(`ref`) => completeStage()
|
||||
case ackMsg if ackMessage.isEmpty || ackMessage.contains(ackMsg) =>
|
||||
if (buffer.isEmpty) acknowledgementReceived = true
|
||||
else {
|
||||
if (buffer.isEmpty) {
|
||||
acknowledgementReceived = true
|
||||
if (completeReceived) finish()
|
||||
} else {
|
||||
// onPush might have filled the buffer up and
|
||||
// stopped pulling, so we pull here
|
||||
if (buffer.size() == maxBuffer) tryPull(in)
|
||||
|
|
@ -63,7 +66,6 @@ import akka.stream.stage._
|
|||
|
||||
private def dequeueAndSend(): Unit = {
|
||||
ref ! messageAdapter(self)(buffer.poll())
|
||||
if (buffer.isEmpty && completeReceived) finish()
|
||||
}
|
||||
|
||||
private def finish(): Unit = {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue