diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala index 5968ae7881..9f9fee4710 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala @@ -99,6 +99,19 @@ class ActorRefSourceSpec extends StreamSpec { s.expectComplete() } + "signal buffered elements and complete the stream after receiving a Status.Success companion" in assertAllStagesStopped { + val s = TestSubscriber.manualProbe[Int]() + val ref = Source.actorRef(3, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() + val sub = s.expectSubscription + ref ! 1 + ref ! 2 + ref ! 3 + ref ! Status.Success + sub.request(10) + s.expectNext(1, 2, 3) + s.expectComplete() + } + "not buffer elements after receiving Status.Success" in assertAllStagesStopped { val s = TestSubscriber.manualProbe[Int]() val ref = Source.actorRef(3, OverflowStrategy.dropBuffer).to(Sink.fromSubscriber(s)).run() diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 9f672ff62b..b3cf02bfd1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -529,7 +529,10 @@ object Source { */ def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = actorRef( - { case akka.actor.Status.Success(_) ⇒ }, + { + case akka.actor.Status.Success ⇒ + case akka.actor.Status.Success(_) ⇒ + }, { case akka.actor.Status.Failure(cause) ⇒ cause }, bufferSize, overflowStrategy)