Capture the Success companion object as well as its class (#24747)

The absence of this is something that caught me out, and others.
This commit is contained in:
Christopher Hunt 2018-03-19 13:06:53 +11:00 committed by Konrad `ktoso` Malawski
parent c67a86ac99
commit e744e583e5
2 changed files with 17 additions and 1 deletions

View file

@ -99,6 +99,19 @@ class ActorRefSourceSpec extends StreamSpec {
s.expectComplete()
}
"signal buffered elements and complete the stream after receiving a Status.Success companion" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val ref = Source.actorRef(3, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run()
val sub = s.expectSubscription
ref ! 1
ref ! 2
ref ! 3
ref ! Status.Success
sub.request(10)
s.expectNext(1, 2, 3)
s.expectComplete()
}
"not buffer elements after receiving Status.Success" in assertAllStagesStopped {
val s = TestSubscriber.manualProbe[Int]()
val ref = Source.actorRef(3, OverflowStrategy.dropBuffer).to(Sink.fromSubscriber(s)).run()

View file

@ -529,7 +529,10 @@ object Source {
*/
def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] =
actorRef(
{ case akka.actor.Status.Success(_) },
{
case akka.actor.Status.Success
case akka.actor.Status.Success(_)
},
{ case akka.actor.Status.Failure(cause) cause },
bufferSize, overflowStrategy)