From e744e583e58d4585b43491f75adf7651dc05fed0 Mon Sep 17 00:00:00 2001 From: Christopher Hunt Date: Mon, 19 Mar 2018 13:06:53 +1100 Subject: [PATCH] Capture the Success companion object as well as its class (#24747) The absence of this is something that caught me out, and others. --- .../akka/stream/scaladsl/ActorRefSourceSpec.scala | 13 +++++++++++++ .../main/scala/akka/stream/scaladsl/Source.scala | 5 ++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala index 5968ae7881..9f9fee4710 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSourceSpec.scala @@ -99,6 +99,19 @@ class ActorRefSourceSpec extends StreamSpec { s.expectComplete() } + "signal buffered elements and complete the stream after receiving a Status.Success companion" in assertAllStagesStopped { + val s = TestSubscriber.manualProbe[Int]() + val ref = Source.actorRef(3, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run() + val sub = s.expectSubscription + ref ! 1 + ref ! 2 + ref ! 3 + ref ! Status.Success + sub.request(10) + s.expectNext(1, 2, 3) + s.expectComplete() + } + "not buffer elements after receiving Status.Success" in assertAllStagesStopped { val s = TestSubscriber.manualProbe[Int]() val ref = Source.actorRef(3, OverflowStrategy.dropBuffer).to(Sink.fromSubscriber(s)).run() diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 9f672ff62b..b3cf02bfd1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -529,7 +529,10 @@ object Source { */ def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] = actorRef( - { case akka.actor.Status.Success(_) ⇒ }, + { + case akka.actor.Status.Success ⇒ + case akka.actor.Status.Success(_) ⇒ + }, { case akka.actor.Status.Failure(cause) ⇒ cause }, bufferSize, overflowStrategy)