=str #17323 Source.actorRef now emits buffered els before completion

This commit is contained in:
Konrad Malawski 2015-04-28 09:48:46 +02:00
parent 8714d556d5
commit 4ba11177ba
3 changed files with 81 additions and 15 deletions

View file

@ -53,7 +53,17 @@ class ActorRefSourceSpec extends AkkaSpec {
expectTerminated(ref)
}
"complete the stream when receiving PoisonPill" in assertAllStagesStopped {
"not fail when 0 buffer space and demand is signalled" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val ref = Source.actorRef(0, OverflowStrategy.dropHead).to(Sink(s)).run()
watch(ref)
val sub = s.expectSubscription
sub.request(100)
sub.cancel()
expectTerminated(ref)
}
"complete the stream immediatly when receiving PoisonPill" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink(s)).run()
val sub = s.expectSubscription
@ -61,14 +71,49 @@ class ActorRefSourceSpec extends AkkaSpec {
s.expectComplete()
}
"complete the stream when receiving Status.Success" in assertAllStagesStopped {
"signal buffered elements and complete the stream after receiving Status.Success" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink(s)).run()
val ref = Source.actorRef(3, OverflowStrategy.fail).to(Sink(s)).run()
val sub = s.expectSubscription
ref ! 1
ref ! 2
ref ! 3
ref ! Status.Success("ok")
sub.request(10)
s.expectNext(1, 2, 3)
s.expectComplete()
}
"not buffer elements after receiving Status.Success" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val ref = Source.actorRef(3, OverflowStrategy.dropBuffer).to(Sink(s)).run()
val sub = s.expectSubscription
ref ! 1
ref ! 2
ref ! 3
ref ! Status.Success("ok")
ref ! 100
ref ! 100
ref ! 100
sub.request(10)
s.expectNext(1, 2, 3)
s.expectComplete()
}
"after receiving Status.Success, allow for earlier completion with PoisonPill" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val ref = Source.actorRef(3, OverflowStrategy.dropBuffer).to(Sink(s)).run()
val sub = s.expectSubscription
ref ! 1
ref ! 2
ref ! 3
ref ! Status.Success("ok")
sub.request(2) // not all elements drained yet
s.expectNext(1, 2)
ref ! PoisonPill
s.expectComplete() // element `3` not signalled
}
"fail the stream when receiving Status.Failure" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink(s)).run()

View file

@ -32,18 +32,19 @@ private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: Overf
def receive = {
case _: Request
// totalDemand is tracked by super
while (totalDemand > 0L && !buffer.isEmpty)
onNext(buffer.dequeue())
if (bufferSize != 0)
while (totalDemand > 0L && !buffer.isEmpty)
onNext(buffer.dequeue())
case Cancel
context.stop(self)
case _: Status.Success
context.stop(self) // will complete the stream successfully
if (bufferSize == 0 || buffer.isEmpty) context.stop(self) // will complete the stream successfully
else context.become(drainBufferThenComplete)
case Status.Failure(cause) if isActive
onError(cause)
context.stop(self)
onErrorThenStop(cause)
case elem if isActive
if (totalDemand > 0L)
@ -63,12 +64,31 @@ private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: Overf
buffer.clear()
buffer.enqueue(elem)
case Fail
onError(new Fail.BufferOverflowException(s"Buffer overflow (max capacity was: $bufferSize)!"))
context.stop(self)
onErrorThenStop(new Fail.BufferOverflowException(s"Buffer overflow (max capacity was: $bufferSize)!"))
case Backpressure
// there is a precondition check in Source.actorRefSource factory method
}
}
def drainBufferThenComplete: Receive = {
case Cancel
context.stop(self)
case Status.Failure(cause) if isActive
// errors must be signalled as soon as possible,
// even if previously valid completion was requested via Status.Success
onErrorThenStop(cause)
case _: Request
// totalDemand is tracked by super
while (totalDemand > 0L && !buffer.isEmpty)
onNext(buffer.dequeue())
if (buffer.isEmpty) context.stop(self) // will complete the stream successfully
case elem if isActive
log.debug("Dropping element because Status.Success received already, " +
"only draining already buffered elements: [{}] (pending: [{}])", elem, buffer.used)
}
}

View file

@ -165,8 +165,6 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
object Source extends SourceApply {
import OperationAttributes.none
private[stream] def apply[Out, Mat](module: SourceModule[Out, Mat]): Source[Out, Mat] =
new Source(module)
@ -355,11 +353,14 @@ object Source extends SourceApply {
* if there is no demand from downstream. When `bufferSize` is 0 the `overflowStrategy` does
* not matter.
*
* The stream can be completed successfully by sending [[akka.actor.PoisonPill]] or
* [[akka.actor.Status.Success]] to the actor reference.
* The stream can be completed successfully by sending the actor referende an [[akka.actor.Status.Success]]
* messagein which case already buffered elements will be signalled before signalling completion,
* or by sending a [[akka.actor.PoisonPill]] in which case completion will be signalled immediatly.
*
* The stream can be completed with failure by sending [[akka.actor.Status.Failure]] to the
* actor reference.
* actor reference. In case the Actor is still draining its internal buffer (after having received
* an [[akka.actor.Status.Success]]) before signalling completion and it receives a [[akka.actor.Status.Failure]],
* the failure will be signalled downstream immediatly (instead of the completion signal).
*
* The actor will be stopped when the stream is completed, failed or cancelled from downstream,
* i.e. you can watch it to get notified when that happens.