diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala index 6bad81eb4b..1cf3192726 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala @@ -53,7 +53,17 @@ class ActorRefSourceSpec extends AkkaSpec { expectTerminated(ref) } - "complete the stream when receiving PoisonPill" in assertAllStagesStopped { + "not fail when 0 buffer space and demand is signalled" in assertAllStagesStopped { + val s = TestSubscriber.manualProbe[Int]() + val ref = Source.actorRef(0, OverflowStrategy.dropHead).to(Sink(s)).run() + watch(ref) + val sub = s.expectSubscription + sub.request(100) + sub.cancel() + expectTerminated(ref) + } + + "complete the stream immediatly when receiving PoisonPill" in assertAllStagesStopped { val s = TestSubscriber.manualProbe[Int]() val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink(s)).run() val sub = s.expectSubscription @@ -61,14 +71,49 @@ class ActorRefSourceSpec extends AkkaSpec { s.expectComplete() } - "complete the stream when receiving Status.Success" in assertAllStagesStopped { + "signal buffered elements and complete the stream after receiving Status.Success" in assertAllStagesStopped { val s = TestSubscriber.manualProbe[Int]() - val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink(s)).run() + val ref = Source.actorRef(3, OverflowStrategy.fail).to(Sink(s)).run() val sub = s.expectSubscription + ref ! 1 + ref ! 2 + ref ! 3 ref ! Status.Success("ok") + sub.request(10) + s.expectNext(1, 2, 3) s.expectComplete() } + "not buffer elements after receiving Status.Success" in assertAllStagesStopped { + val s = TestSubscriber.manualProbe[Int]() + val ref = Source.actorRef(3, OverflowStrategy.dropBuffer).to(Sink(s)).run() + val sub = s.expectSubscription + ref ! 1 + ref ! 2 + ref ! 3 + ref ! Status.Success("ok") + ref ! 100 + ref ! 100 + ref ! 100 + sub.request(10) + s.expectNext(1, 2, 3) + s.expectComplete() + } + + "after receiving Status.Success, allow for earlier completion with PoisonPill" in assertAllStagesStopped { + val s = TestSubscriber.manualProbe[Int]() + val ref = Source.actorRef(3, OverflowStrategy.dropBuffer).to(Sink(s)).run() + val sub = s.expectSubscription + ref ! 1 + ref ! 2 + ref ! 3 + ref ! Status.Success("ok") + sub.request(2) // not all elements drained yet + s.expectNext(1, 2) + ref ! PoisonPill + s.expectComplete() // element `3` not signalled + } + "fail the stream when receiving Status.Failure" in assertAllStagesStopped { val s = TestSubscriber.manualProbe[Int]() val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink(s)).run() diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala index 809d449a01..5789602d47 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala @@ -32,18 +32,19 @@ private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: Overf def receive = { case _: Request ⇒ // totalDemand is tracked by super - while (totalDemand > 0L && !buffer.isEmpty) - onNext(buffer.dequeue()) + if (bufferSize != 0) + while (totalDemand > 0L && !buffer.isEmpty) + onNext(buffer.dequeue()) case Cancel ⇒ context.stop(self) case _: Status.Success ⇒ - context.stop(self) // will complete the stream successfully + if (bufferSize == 0 || buffer.isEmpty) context.stop(self) // will complete the stream successfully + else context.become(drainBufferThenComplete) case Status.Failure(cause) if isActive ⇒ - onError(cause) - context.stop(self) + onErrorThenStop(cause) case elem if isActive ⇒ if (totalDemand > 0L) @@ -63,12 +64,31 @@ private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: Overf buffer.clear() buffer.enqueue(elem) case Fail ⇒ - onError(new Fail.BufferOverflowException(s"Buffer overflow (max capacity was: $bufferSize)!")) - context.stop(self) + onErrorThenStop(new Fail.BufferOverflowException(s"Buffer overflow (max capacity was: $bufferSize)!")) case Backpressure ⇒ // there is a precondition check in Source.actorRefSource factory method } + } + def drainBufferThenComplete: Receive = { + case Cancel ⇒ + context.stop(self) + + case Status.Failure(cause) if isActive ⇒ + // errors must be signalled as soon as possible, + // even if previously valid completion was requested via Status.Success + onErrorThenStop(cause) + + case _: Request ⇒ + // totalDemand is tracked by super + while (totalDemand > 0L && !buffer.isEmpty) + onNext(buffer.dequeue()) + + if (buffer.isEmpty) context.stop(self) // will complete the stream successfully + + case elem if isActive ⇒ + log.debug("Dropping element because Status.Success received already, " + + "only draining already buffered elements: [{}] (pending: [{}])", elem, buffer.used) } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 5316259b35..cdfd3e0c36 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -165,8 +165,6 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) object Source extends SourceApply { - import OperationAttributes.none - private[stream] def apply[Out, Mat](module: SourceModule[Out, Mat]): Source[Out, Mat] = new Source(module) @@ -355,11 +353,14 @@ object Source extends SourceApply { * if there is no demand from downstream. When `bufferSize` is 0 the `overflowStrategy` does * not matter. * - * The stream can be completed successfully by sending [[akka.actor.PoisonPill]] or - * [[akka.actor.Status.Success]] to the actor reference. + * The stream can be completed successfully by sending the actor referende an [[akka.actor.Status.Success]] + * messagein which case already buffered elements will be signalled before signalling completion, + * or by sending a [[akka.actor.PoisonPill]] in which case completion will be signalled immediatly. * * The stream can be completed with failure by sending [[akka.actor.Status.Failure]] to the - * actor reference. + * actor reference. In case the Actor is still draining its internal buffer (after having received + * an [[akka.actor.Status.Success]]) before signalling completion and it receives a [[akka.actor.Status.Failure]], + * the failure will be signalled downstream immediatly (instead of the completion signal). * * The actor will be stopped when the stream is completed, failed or cancelled from downstream, * i.e. you can watch it to get notified when that happens.