2018-10-29 17:19:37 +08:00
|
|
|
/*
|
2019-01-02 18:55:26 +08:00
|
|
|
* Copyright (C) 2015-2019 Lightbend Inc. <https://www.lightbend.com>
|
2015-03-31 15:13:57 +02:00
|
|
|
*/
|
2018-03-13 23:45:55 +09:00
|
|
|
|
2015-03-31 15:13:57 +02:00
|
|
|
package akka.stream.scaladsl
|
|
|
|
|
|
|
|
|
|
import scala.concurrent.duration._
|
2019-03-11 10:38:24 +01:00
|
|
|
import akka.stream.{ ActorMaterializer, Attributes, OverflowStrategy }
|
2015-04-24 11:45:03 +03:00
|
|
|
import akka.stream.testkit._
|
2015-06-01 18:08:13 +03:00
|
|
|
import akka.stream.testkit.scaladsl._
|
2015-04-24 11:45:03 +03:00
|
|
|
import akka.stream.testkit.Utils._
|
2018-06-15 18:16:58 +03:00
|
|
|
import akka.stream.testkit.scaladsl.StreamTestKit._
|
2015-03-31 15:13:57 +02:00
|
|
|
import akka.actor.PoisonPill
|
|
|
|
|
import akka.actor.Status
|
2018-07-01 11:18:34 +01:00
|
|
|
import akka.Done
|
2015-03-31 15:13:57 +02:00
|
|
|
|
2016-07-28 16:43:08 +08:00
|
|
|
/**
 * Exercises the source created by `Source.actorRef`: messages sent to the materialized
 * `ActorRef` are expected to show up as stream elements, subject to the configured buffer
 * size and [[akka.stream.OverflowStrategy]], and `Status.Success` / `Status.Failure`
 * messages are expected to complete or fail the stream.
 */
class ActorRefSourceSpec extends StreamSpec {
  // Explicit type: implicit definitions without an annotation make implicit resolution
  // fragile and are rejected by newer compilers.
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  "A ActorRefSource" must {

    "emit received messages to the stream" in {
      val s = TestSubscriber.manualProbe[Int]()
      val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run()
      val sub = s.expectSubscription()
      sub.request(2)
      ref ! 1
      s.expectNext(1)
      ref ! 2
      s.expectNext(2)
      // Only two elements were requested, so the third one must not be delivered.
      ref ! 3
      // `expectNoMsg` is deprecated since Akka 2.5.5 in favour of `expectNoMessage`.
      s.expectNoMessage(500.millis)
    }

    "buffer when needed" in {
      val s = TestSubscriber.manualProbe[Int]()
      val ref = Source.actorRef(100, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run()
      val sub = s.expectSubscription()

      // 20 elements fit into the buffer of 100 and are handed out in order on demand.
      for (n <- 1 to 20) ref ! n
      sub.request(10)
      for (n <- 1 to 10) s.expectNext(n)
      sub.request(10)
      for (n <- 11 to 20) s.expectNext(n)

      // 200 elements overflow the buffer of 100: with dropHead the test expects the
      // oldest 100 (200..299) to be gone and the newest 100 (300..399) to be emitted.
      for (n <- 200 to 399) ref ! n
      sub.request(100)
      for (n <- 300 to 399) s.expectNext(n)
    }

    "drop new when full and with dropNew strategy" in {
      val (ref, probe) = Source.actorRef(100, OverflowStrategy.dropNew).toMat(TestSink.probe[Int])(Keep.both).run()

      for (n <- 1 to 20) ref ! n
      probe.request(10)
      for (n <- 1 to 10) probe.expectNext(n)
      probe.request(10)
      for (n <- 11 to 20) probe.expectNext(n)

      // 200 elements overflow the buffer of 100: with dropNew the test expects the
      // first 100 (200..299) to be kept and the later ones to be discarded.
      for (n <- 200 to 399) ref ! n
      probe.request(100)
      for (n <- 200 to 299) probe.expectNext(n)
    }

    "terminate when the stream is cancelled" in assertAllStagesStopped {
      val s = TestSubscriber.manualProbe[Int]()
      val ref = Source.actorRef(0, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run()
      watch(ref)
      val sub = s.expectSubscription()
      sub.cancel()
      // Cancelling downstream must stop the actor behind the materialized ref.
      expectTerminated(ref)
    }

    "not fail when 0 buffer space and demand is signalled" in assertAllStagesStopped {
      val s = TestSubscriber.manualProbe[Int]()
      val ref = Source.actorRef(0, OverflowStrategy.dropHead).to(Sink.fromSubscriber(s)).run()
      watch(ref)
      val sub = s.expectSubscription()
      // Demand is signalled against a zero-sized buffer before cancelling.
      sub.request(100)
      sub.cancel()
      expectTerminated(ref)
    }

    "signal buffered elements and complete the stream after receiving Status.Success" in assertAllStagesStopped {
      val s = TestSubscriber.manualProbe[Int]()
      val ref = Source.actorRef(3, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run()
      val sub = s.expectSubscription()
      ref ! 1
      ref ! 2
      ref ! 3
      // Completion is requested while three elements are still buffered (no demand yet).
      ref ! Status.Success("ok")
      sub.request(10)
      s.expectNext(1, 2, 3)
      s.expectComplete()
    }

    "signal buffered elements and complete the stream after receiving a Status.Success companion" in assertAllStagesStopped {
      val s = TestSubscriber.manualProbe[Int]()
      val ref = Source.actorRef(3, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run()
      val sub = s.expectSubscription()
      ref ! 1
      ref ! 2
      ref ! 3
      // Here the companion object itself is sent, not a `Status.Success(...)` instance.
      ref ! Status.Success
      sub.request(10)
      s.expectNext(1, 2, 3)
      s.expectComplete()
    }

    "not buffer elements after receiving Status.Success" in assertAllStagesStopped {
      val s = TestSubscriber.manualProbe[Int]()
      val ref = Source.actorRef(3, OverflowStrategy.dropBuffer).to(Sink.fromSubscriber(s)).run()
      val sub = s.expectSubscription()
      ref ! 1
      ref ! 2
      ref ! 3
      ref ! Status.Success("ok")
      // Elements sent after Status.Success must neither be emitted nor trigger the
      // dropBuffer overflow strategy on the three elements already buffered.
      ref ! 100
      ref ! 100
      ref ! 100
      sub.request(10)
      s.expectNext(1, 2, 3)
      s.expectComplete()
    }

    "complete and materialize the stream after receiving Status.Success" in assertAllStagesStopped {
      val (ref, done) =
        Source.actorRef(3, OverflowStrategy.dropBuffer).toMat(Sink.ignore)(Keep.both).run()
      ref ! Status.Success("ok")
      done.futureValue should be(Done)
    }

    "fail the stream when receiving Status.Failure" in assertAllStagesStopped {
      val s = TestSubscriber.manualProbe[Int]()
      val ref = Source.actorRef(10, OverflowStrategy.fail).to(Sink.fromSubscriber(s)).run()
      // The subscription handle itself is not needed; only wait for it to arrive.
      s.expectSubscription()
      val exc = TE("testfailure")
      ref ! Status.Failure(exc)
      s.expectError(exc)
    }

    "set actor name equal to stage name" in assertAllStagesStopped {
      val s = TestSubscriber.manualProbe[Int]()
      val name = "SomeCustomName"
      val ref = Source
        .actorRef(10, OverflowStrategy.fail)
        .withAttributes(Attributes.name(name))
        .to(Sink.fromSubscriber(s))
        .run()
      // `include` reports the actual actor name on failure, unlike `contains(...) === true`.
      ref.path.name should include(name)
      // Stop the actor explicitly so that assertAllStagesStopped can succeed.
      ref ! PoisonPill
    }
  }
}
|