2014-09-11 10:32:35 +02:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
|
|
|
|
*/
|
2014-10-27 14:35:41 +01:00
|
|
|
package akka.stream.scaladsl
|
|
|
|
|
|
2015-09-08 23:57:17 -04:00
|
|
|
import akka.actor._
|
2015-01-28 14:19:50 +01:00
|
|
|
import akka.stream.Supervision._
|
2015-09-08 23:57:17 -04:00
|
|
|
import akka.stream.impl._
|
|
|
|
|
import akka.stream.impl.fusing.ActorInterpreter
|
2014-11-12 10:43:39 +01:00
|
|
|
import akka.stream.stage.Stage
|
2015-09-08 23:57:17 -04:00
|
|
|
import akka.stream.testkit.Utils._
|
|
|
|
|
import akka.stream.testkit._
|
|
|
|
|
import akka.stream.testkit.scaladsl.TestSink
|
|
|
|
|
import akka.stream.{ AbruptTerminationException, ActorMaterializer, ActorMaterializerSettings, Attributes }
|
|
|
|
|
import akka.testkit.TestEvent.{ Mute, UnMute }
|
|
|
|
|
import akka.testkit.{ EventFilter, TestDuration }
|
|
|
|
|
import com.typesafe.config.ConfigFactory
|
|
|
|
|
import org.reactivestreams.{ Publisher, Subscriber }
|
2014-10-27 14:35:41 +01:00
|
|
|
import scala.collection.immutable
|
2015-07-09 13:36:54 +02:00
|
|
|
import scala.concurrent.Await
|
2014-10-27 14:35:41 +01:00
|
|
|
import scala.concurrent.duration._
|
2014-11-12 10:43:39 +01:00
|
|
|
import scala.util.control.NoStackTrace
|
2014-09-11 10:32:35 +02:00
|
|
|
|
|
|
|
|
object FlowSpec {
|
|
|
|
|
class Fruit
|
|
|
|
|
class Apple extends Fruit
|
2014-11-09 21:09:50 +01:00
|
|
|
val apples = () ⇒ Iterator.continually(new Apple)
|
2014-09-11 10:32:35 +02:00
|
|
|
|
2015-01-28 14:19:50 +01:00
|
|
|
}
|
2014-09-11 10:32:35 +02:00
|
|
|
|
2015-01-28 14:19:50 +01:00
|
|
|
class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) {
|
|
|
|
|
import FlowSpec._
|
|
|
|
|
|
2015-06-23 18:28:53 +02:00
|
|
|
val settings = ActorMaterializerSettings(system)
|
2015-01-28 14:19:50 +01:00
|
|
|
.withInputBuffer(initialSize = 2, maxSize = 16)
|
|
|
|
|
|
2015-06-23 18:28:53 +02:00
|
|
|
implicit val mat = ActorMaterializer(settings)
|
2015-01-28 14:19:50 +01:00
|
|
|
|
|
|
|
|
val identity: Flow[Any, Any, _] ⇒ Flow[Any, Any, _] = in ⇒ in.map(e ⇒ e)
|
|
|
|
|
val identity2: Flow[Any, Any, _] ⇒ Flow[Any, Any, _] = in ⇒ identity(in)
|
2014-09-11 10:32:35 +02:00
|
|
|
|
2014-10-08 18:16:57 +02:00
|
|
|
class BrokenActorInterpreter(
|
2015-06-23 18:28:53 +02:00
|
|
|
_settings: ActorMaterializerSettings,
|
2014-11-12 10:43:39 +01:00
|
|
|
_ops: Seq[Stage[_, _]],
|
2014-10-08 18:16:57 +02:00
|
|
|
brokenMessage: Any)
|
2015-06-23 17:32:55 +02:00
|
|
|
extends ActorInterpreter(_settings, _ops, mat, Attributes.none) {
|
2014-10-08 18:16:57 +02:00
|
|
|
|
|
|
|
|
import akka.stream.actor.ActorSubscriberMessage._
|
|
|
|
|
|
|
|
|
|
override protected[akka] def aroundReceive(receive: Receive, msg: Any) = {
|
|
|
|
|
msg match {
|
|
|
|
|
case OnNext(m) if m == brokenMessage ⇒
|
|
|
|
|
throw new NullPointerException(s"I'm so broken [$m]")
|
|
|
|
|
case _ ⇒ super.aroundReceive(receive, msg)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2015-01-28 14:19:50 +01:00
|
|
|
val faultyFlow: Flow[Any, Any, _] ⇒ Flow[Any, Any, _] = in ⇒ in.andThenMat { () ⇒
|
|
|
|
|
val props = Props(new BrokenActorInterpreter(settings, List(fusing.Map({ x: Any ⇒ x }, stoppingDecider)), "a3"))
|
2015-05-29 16:43:02 +02:00
|
|
|
.withDispatcher("akka.test.stream-dispatcher").withDeploy(Deploy.local)
|
2015-01-28 14:19:50 +01:00
|
|
|
val processor = ActorProcessorFactory[Any, Any](system.actorOf(
|
|
|
|
|
props,
|
|
|
|
|
"borken-stage-actor"))
|
|
|
|
|
(processor, ())
|
2014-09-11 10:32:35 +02:00
|
|
|
}
|
|
|
|
|
|
2015-06-23 18:28:53 +02:00
|
|
|
val toPublisher: (Source[Any, _], ActorMaterializer) ⇒ Publisher[Any] =
|
2015-03-05 12:21:17 +01:00
|
|
|
(f, m) ⇒ f.runWith(Sink.publisher)(m)
|
2014-09-11 10:32:35 +02:00
|
|
|
|
2015-06-23 18:28:53 +02:00
|
|
|
def toFanoutPublisher[In, Out](initialBufferSize: Int, maximumBufferSize: Int): (Source[Out, _], ActorMaterializer) ⇒ Publisher[Out] =
|
2014-10-17 14:05:50 +02:00
|
|
|
(f, m) ⇒ f.runWith(Sink.fanoutPublisher(initialBufferSize, maximumBufferSize))(m)
|
2014-09-11 10:32:35 +02:00
|
|
|
|
2015-01-28 14:19:50 +01:00
|
|
|
def materializeIntoSubscriberAndPublisher[In, Out](flow: Flow[In, Out, _]): (Subscriber[In], Publisher[Out]) = {
|
2014-10-17 14:05:50 +02:00
|
|
|
val source = Source.subscriber[In]
|
|
|
|
|
val sink = Sink.publisher[Out]
|
|
|
|
|
flow.runWith(source, sink)
|
2014-09-11 10:32:35 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"A Flow" must {
|
|
|
|
|
|
|
|
|
|
for ((name, op) ← List("identity" -> identity, "identity2" -> identity2); n ← List(1, 2, 4)) {
|
|
|
|
|
s"request initial elements from upstream ($name, $n)" in {
|
|
|
|
|
new ChainSetup(op, settings.withInputBuffer(initialSize = n, maxSize = settings.maxInputBufferSize), toPublisher) {
|
|
|
|
|
upstream.expectRequest(upstreamSubscription, settings.initialInputBufferSize)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"request more elements from upstream when downstream requests more elements" in {
|
|
|
|
|
new ChainSetup(identity, settings, toPublisher) {
|
|
|
|
|
upstream.expectRequest(upstreamSubscription, settings.initialInputBufferSize)
|
|
|
|
|
downstreamSubscription.request(1)
|
|
|
|
|
upstream.expectNoMsg(100.millis)
|
|
|
|
|
downstreamSubscription.request(2)
|
|
|
|
|
upstream.expectNoMsg(100.millis)
|
|
|
|
|
upstreamSubscription.sendNext("a")
|
|
|
|
|
downstream.expectNext("a")
|
|
|
|
|
upstream.expectRequest(upstreamSubscription, 1)
|
|
|
|
|
upstream.expectNoMsg(100.millis)
|
|
|
|
|
upstreamSubscription.sendNext("b")
|
|
|
|
|
upstreamSubscription.sendNext("c")
|
|
|
|
|
upstreamSubscription.sendNext("d")
|
|
|
|
|
downstream.expectNext("b")
|
|
|
|
|
downstream.expectNext("c")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"deliver events when publisher sends elements and then completes" in {
|
|
|
|
|
new ChainSetup(identity, settings, toPublisher) {
|
|
|
|
|
downstreamSubscription.request(1)
|
|
|
|
|
upstreamSubscription.sendNext("test")
|
|
|
|
|
upstreamSubscription.sendComplete()
|
|
|
|
|
downstream.expectNext("test")
|
|
|
|
|
downstream.expectComplete()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"deliver complete signal when publisher immediately completes" in {
|
|
|
|
|
new ChainSetup(identity, settings, toPublisher) {
|
|
|
|
|
upstreamSubscription.sendComplete()
|
|
|
|
|
downstream.expectComplete()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"deliver error signal when publisher immediately fails" in {
|
|
|
|
|
new ChainSetup(identity, settings, toPublisher) {
|
|
|
|
|
object WeirdError extends RuntimeException("weird test exception")
|
2014-10-08 18:16:57 +02:00
|
|
|
upstreamSubscription.sendError(WeirdError)
|
|
|
|
|
downstream.expectError(WeirdError)
|
2014-09-11 10:32:35 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"cancel upstream when single subscriber cancels subscription while receiving data" in {
|
|
|
|
|
new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = settings.maxInputBufferSize), toPublisher) {
|
|
|
|
|
downstreamSubscription.request(5)
|
|
|
|
|
upstreamSubscription.expectRequest(1)
|
|
|
|
|
upstreamSubscription.sendNext("test")
|
|
|
|
|
upstreamSubscription.expectRequest(1)
|
|
|
|
|
upstreamSubscription.sendNext("test2")
|
|
|
|
|
upstreamSubscription.expectRequest(1)
|
|
|
|
|
downstream.expectNext("test")
|
|
|
|
|
downstream.expectNext("test2")
|
|
|
|
|
downstreamSubscription.cancel()
|
|
|
|
|
|
|
|
|
|
// because of the "must cancel its upstream Subscription if its last downstream Subscription has been cancelled" rule
|
|
|
|
|
upstreamSubscription.expectCancellation()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"materialize into Publisher/Subscriber" in {
|
2014-10-02 17:32:08 +02:00
|
|
|
val flow = Flow[String]
|
|
|
|
|
val (flowIn: Subscriber[String], flowOut: Publisher[String]) = materializeIntoSubscriberAndPublisher(flow)
|
2014-09-11 10:32:35 +02:00
|
|
|
|
2015-04-24 11:45:03 +03:00
|
|
|
val c1 = TestSubscriber.manualProbe[String]()
|
2014-09-11 10:32:35 +02:00
|
|
|
flowOut.subscribe(c1)
|
|
|
|
|
|
2015-03-05 12:21:17 +01:00
|
|
|
val source: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher)
|
2014-10-17 14:05:50 +02:00
|
|
|
source.subscribe(flowIn)
|
2014-09-11 10:32:35 +02:00
|
|
|
|
2015-08-01 00:13:14 +02:00
|
|
|
val sub1 = c1.expectSubscription()
|
2014-09-11 10:32:35 +02:00
|
|
|
sub1.request(3)
|
|
|
|
|
c1.expectNext("1")
|
|
|
|
|
c1.expectNext("2")
|
|
|
|
|
c1.expectNext("3")
|
2015-08-01 00:13:14 +02:00
|
|
|
c1.expectComplete()
|
2014-09-11 10:32:35 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"materialize into Publisher/Subscriber and transformation processor" in {
|
2014-10-02 17:32:08 +02:00
|
|
|
val flow = Flow[Int].map((i: Int) ⇒ i.toString)
|
|
|
|
|
val (flowIn: Subscriber[Int], flowOut: Publisher[String]) = materializeIntoSubscriberAndPublisher(flow)
|
2014-09-11 10:32:35 +02:00
|
|
|
|
2015-04-24 11:45:03 +03:00
|
|
|
val c1 = TestSubscriber.manualProbe[String]()
|
2014-09-11 10:32:35 +02:00
|
|
|
flowOut.subscribe(c1)
|
2015-08-01 00:13:14 +02:00
|
|
|
val sub1 = c1.expectSubscription()
|
2014-09-11 10:32:35 +02:00
|
|
|
sub1.request(3)
|
|
|
|
|
c1.expectNoMsg(200.millis)
|
|
|
|
|
|
2015-03-05 12:21:17 +01:00
|
|
|
val source: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher)
|
2014-10-17 14:05:50 +02:00
|
|
|
source.subscribe(flowIn)
|
2014-09-11 10:32:35 +02:00
|
|
|
|
|
|
|
|
c1.expectNext("1")
|
|
|
|
|
c1.expectNext("2")
|
|
|
|
|
c1.expectNext("3")
|
2015-08-01 00:13:14 +02:00
|
|
|
c1.expectComplete()
|
2014-09-11 10:32:35 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"materialize into Publisher/Subscriber and multiple transformation processors" in {
|
2014-10-02 17:32:08 +02:00
|
|
|
val flow = Flow[Int].map(_.toString).map("elem-" + _)
|
|
|
|
|
val (flowIn, flowOut) = materializeIntoSubscriberAndPublisher(flow)
|
2014-09-11 10:32:35 +02:00
|
|
|
|
2015-04-24 11:45:03 +03:00
|
|
|
val c1 = TestSubscriber.manualProbe[String]()
|
2014-09-11 10:32:35 +02:00
|
|
|
flowOut.subscribe(c1)
|
2015-08-01 00:13:14 +02:00
|
|
|
val sub1 = c1.expectSubscription()
|
2014-09-11 10:32:35 +02:00
|
|
|
sub1.request(3)
|
|
|
|
|
c1.expectNoMsg(200.millis)
|
|
|
|
|
|
2015-03-05 12:21:17 +01:00
|
|
|
val source: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher)
|
2014-10-17 14:05:50 +02:00
|
|
|
source.subscribe(flowIn)
|
2014-09-11 10:32:35 +02:00
|
|
|
|
|
|
|
|
c1.expectNext("elem-1")
|
|
|
|
|
c1.expectNext("elem-2")
|
|
|
|
|
c1.expectNext("elem-3")
|
2015-08-01 00:13:14 +02:00
|
|
|
c1.expectComplete()
|
2014-09-11 10:32:35 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"subscribe Subscriber" in {
|
2015-01-28 14:19:50 +01:00
|
|
|
val flow: Flow[String, String, _] = Flow[String]
|
2015-04-24 11:45:03 +03:00
|
|
|
val c1 = TestSubscriber.manualProbe[String]()
|
2015-01-28 14:19:50 +01:00
|
|
|
val sink: Sink[String, _] = flow.to(Sink(c1))
|
2015-03-05 12:21:17 +01:00
|
|
|
val publisher: Publisher[String] = Source(List("1", "2", "3")).runWith(Sink.publisher)
|
2014-10-31 10:43:42 +02:00
|
|
|
Source(publisher).to(sink).run()
|
2014-09-11 10:32:35 +02:00
|
|
|
|
2015-08-01 00:13:14 +02:00
|
|
|
val sub1 = c1.expectSubscription()
|
2014-09-11 10:32:35 +02:00
|
|
|
sub1.request(3)
|
|
|
|
|
c1.expectNext("1")
|
|
|
|
|
c1.expectNext("2")
|
|
|
|
|
c1.expectNext("3")
|
2015-08-01 00:13:14 +02:00
|
|
|
c1.expectComplete()
|
2014-09-11 10:32:35 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"perform transformation operation" in {
|
2014-10-02 17:32:08 +02:00
|
|
|
val flow = Flow[Int].map(i ⇒ { testActor ! i.toString; i.toString })
|
2014-09-11 10:32:35 +02:00
|
|
|
|
2015-03-05 12:21:17 +01:00
|
|
|
val publisher = Source(List(1, 2, 3)).runWith(Sink.publisher)
|
2014-10-31 10:43:42 +02:00
|
|
|
Source(publisher).via(flow).to(Sink.ignore).run()
|
2014-09-11 10:32:35 +02:00
|
|
|
|
|
|
|
|
expectMsg("1")
|
|
|
|
|
expectMsg("2")
|
|
|
|
|
expectMsg("3")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"perform transformation operation and subscribe Subscriber" in {
|
2014-10-02 17:32:08 +02:00
|
|
|
val flow = Flow[Int].map(_.toString)
|
2015-04-24 11:45:03 +03:00
|
|
|
val c1 = TestSubscriber.manualProbe[String]()
|
2015-01-28 14:19:50 +01:00
|
|
|
val sink: Sink[Int, _] = flow.to(Sink(c1))
|
2015-03-05 12:21:17 +01:00
|
|
|
val publisher: Publisher[Int] = Source(List(1, 2, 3)).runWith(Sink.publisher)
|
2014-10-31 10:43:42 +02:00
|
|
|
Source(publisher).to(sink).run()
|
2014-09-11 10:32:35 +02:00
|
|
|
|
2015-08-01 00:13:14 +02:00
|
|
|
val sub1 = c1.expectSubscription()
|
2014-09-11 10:32:35 +02:00
|
|
|
sub1.request(3)
|
|
|
|
|
c1.expectNext("1")
|
|
|
|
|
c1.expectNext("2")
|
|
|
|
|
c1.expectNext("3")
|
2015-08-01 00:13:14 +02:00
|
|
|
c1.expectComplete()
|
2014-09-11 10:32:35 +02:00
|
|
|
}
|
|
|
|
|
|
2015-04-16 20:13:43 +02:00
|
|
|
"be materializable several times with fanout publisher" in assertAllStagesStopped {
|
2014-10-02 17:32:08 +02:00
|
|
|
val flow = Source(List(1, 2, 3)).map(_.toString)
|
2014-10-17 14:05:50 +02:00
|
|
|
val p1 = flow.runWith(Sink.fanoutPublisher(2, 2))
|
|
|
|
|
val p2 = flow.runWith(Sink.fanoutPublisher(2, 2))
|
2015-04-24 11:45:03 +03:00
|
|
|
val s1 = TestSubscriber.manualProbe[String]()
|
|
|
|
|
val s2 = TestSubscriber.manualProbe[String]()
|
|
|
|
|
val s3 = TestSubscriber.manualProbe[String]()
|
2014-09-29 16:01:17 +02:00
|
|
|
p1.subscribe(s1)
|
|
|
|
|
p2.subscribe(s2)
|
|
|
|
|
p2.subscribe(s3)
|
|
|
|
|
|
2015-08-01 00:13:14 +02:00
|
|
|
val sub1 = s1.expectSubscription()
|
|
|
|
|
val sub2 = s2.expectSubscription()
|
|
|
|
|
val sub3 = s3.expectSubscription()
|
2014-09-29 16:01:17 +02:00
|
|
|
|
|
|
|
|
sub1.request(3)
|
|
|
|
|
s1.expectNext("1")
|
|
|
|
|
s1.expectNext("2")
|
|
|
|
|
s1.expectNext("3")
|
2015-08-01 00:13:14 +02:00
|
|
|
s1.expectComplete()
|
2014-09-29 16:01:17 +02:00
|
|
|
|
|
|
|
|
sub2.request(3)
|
|
|
|
|
sub3.request(3)
|
|
|
|
|
s2.expectNext("1")
|
|
|
|
|
s2.expectNext("2")
|
|
|
|
|
s2.expectNext("3")
|
2015-08-01 00:13:14 +02:00
|
|
|
s2.expectComplete()
|
2014-09-29 16:01:17 +02:00
|
|
|
s3.expectNext("1")
|
|
|
|
|
s3.expectNext("2")
|
|
|
|
|
s3.expectNext("3")
|
2015-08-01 00:13:14 +02:00
|
|
|
s3.expectComplete()
|
2014-09-29 16:01:17 +02:00
|
|
|
}
|
|
|
|
|
|
2014-09-11 10:32:35 +02:00
|
|
|
"be covariant" in {
|
2015-01-28 14:19:50 +01:00
|
|
|
val f1: Source[Fruit, _] = Source[Fruit](apples)
|
2015-03-05 12:21:17 +01:00
|
|
|
val p1: Publisher[Fruit] = Source[Fruit](apples).runWith(Sink.publisher)
|
2015-01-28 14:19:50 +01:00
|
|
|
val f2: Source[Source[Fruit, _], _] = Source[Fruit](apples).splitWhen(_ ⇒ true)
|
|
|
|
|
val f3: Source[(Boolean, Source[Fruit, _]), _] = Source[Fruit](apples).groupBy(_ ⇒ true)
|
|
|
|
|
val f4: Source[(immutable.Seq[Fruit], Source[Fruit, _]), _] = Source[Fruit](apples).prefixAndTail(1)
|
|
|
|
|
val d1: Flow[String, Source[Fruit, _], _] = Flow[String].map(_ ⇒ new Apple).splitWhen(_ ⇒ true)
|
|
|
|
|
val d2: Flow[String, (Boolean, Source[Fruit, _]), _] = Flow[String].map(_ ⇒ new Apple).groupBy(_ ⇒ true)
|
|
|
|
|
val d3: Flow[String, (immutable.Seq[Apple], Source[Fruit, _]), _] = Flow[String].map(_ ⇒ new Apple).prefixAndTail(1)
|
2014-09-11 10:32:35 +02:00
|
|
|
}
|
2014-11-03 12:59:05 +01:00
|
|
|
|
|
|
|
|
"be able to concat with a Source" in {
|
2015-01-28 14:19:50 +01:00
|
|
|
val f1: Flow[Int, String, _] = Flow[Int].map(_.toString + "-s")
|
|
|
|
|
val s1: Source[Int, _] = Source(List(1, 2, 3))
|
|
|
|
|
val s2: Source[String, _] = Source(List(4, 5, 6)).map(_.toString + "-s")
|
2014-11-03 12:59:05 +01:00
|
|
|
|
2015-04-24 11:45:03 +03:00
|
|
|
val subs = TestSubscriber.manualProbe[Any]()
|
2015-01-28 14:19:50 +01:00
|
|
|
val subSink = Sink.publisher[Any]
|
2014-11-03 12:59:05 +01:00
|
|
|
|
|
|
|
|
val (_, res) = f1.concat(s2).runWith(s1, subSink)
|
|
|
|
|
|
|
|
|
|
res.subscribe(subs)
|
|
|
|
|
val sub = subs.expectSubscription()
|
|
|
|
|
sub.request(9)
|
|
|
|
|
subs.expectNext("1-s")
|
|
|
|
|
subs.expectNext("2-s")
|
|
|
|
|
subs.expectNext("3-s")
|
|
|
|
|
subs.expectNext("4-s")
|
|
|
|
|
subs.expectNext("5-s")
|
|
|
|
|
subs.expectNext("6-s")
|
|
|
|
|
subs.expectComplete()
|
2015-09-08 23:57:17 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"be able to concat with empty source" in {
|
|
|
|
|
val probe = Source.single(1).concat(Source.empty)
|
|
|
|
|
.runWith(TestSink.probe[Int])
|
|
|
|
|
probe.request(1)
|
|
|
|
|
probe.expectNext(1)
|
|
|
|
|
probe.expectComplete()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"be able to concat empty source" in {
|
|
|
|
|
val probe = Source.empty.concat(Source.single(1))
|
|
|
|
|
.runWith(TestSink.probe[Int])
|
|
|
|
|
probe.request(1)
|
|
|
|
|
probe.expectNext(1)
|
|
|
|
|
probe.expectComplete()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"be able to concat two empty sources" in {
|
|
|
|
|
val probe = Source.empty.concat(Source.empty)
|
|
|
|
|
.runWith(TestSink.probe[Int])
|
|
|
|
|
probe.expectSubscription()
|
|
|
|
|
probe.expectComplete()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"be able to concat source with error" in {
|
|
|
|
|
val probe = Source.single(1).concat(Source.failed(TestException))
|
|
|
|
|
.runWith(TestSink.probe[Int])
|
|
|
|
|
probe.expectSubscription()
|
|
|
|
|
probe.expectError(TestException)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"subscribe at once to initial source and to one that it's concat to" in {
|
|
|
|
|
val publisher1 = TestPublisher.probe[Int]()
|
|
|
|
|
val publisher2 = TestPublisher.probe[Int]()
|
|
|
|
|
val probeSink = Source.apply(publisher1).concat(Source.apply(publisher2))
|
|
|
|
|
.runWith(TestSink.probe[Int])
|
|
|
|
|
|
|
|
|
|
val sub1 = publisher1.expectSubscription()
|
|
|
|
|
val sub2 = publisher2.expectSubscription()
|
|
|
|
|
val subSink = probeSink.expectSubscription()
|
|
|
|
|
|
|
|
|
|
sub1.sendNext(1)
|
|
|
|
|
subSink.request(1)
|
|
|
|
|
probeSink.expectNext(1)
|
|
|
|
|
sub1.sendComplete()
|
|
|
|
|
|
|
|
|
|
sub2.sendNext(2)
|
|
|
|
|
subSink.request(1)
|
|
|
|
|
probeSink.expectNext(2)
|
|
|
|
|
sub2.sendComplete()
|
|
|
|
|
|
|
|
|
|
probeSink.expectComplete()
|
2014-11-03 12:59:05 +01:00
|
|
|
}
|
2015-07-09 13:36:54 +02:00
|
|
|
|
|
|
|
|
"be possible to convert to a processor, and should be able to take a Processor" in {
|
|
|
|
|
val identity1 = Flow[Int].toProcessor
|
|
|
|
|
val identity2 = Flow(() ⇒ identity1.run())
|
|
|
|
|
Await.result(
|
|
|
|
|
Source(1 to 10).via(identity2).grouped(100).runWith(Sink.head),
|
|
|
|
|
3.seconds) should ===(1 to 10)
|
|
|
|
|
|
|
|
|
|
// Reusable:
|
|
|
|
|
Await.result(
|
|
|
|
|
Source(1 to 10).via(identity2).grouped(100).runWith(Sink.head),
|
|
|
|
|
3.seconds) should ===(1 to 10)
|
|
|
|
|
}
|
2014-09-11 10:32:35 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"A Flow with multiple subscribers (FanOutBox)" must {
|
|
|
|
|
"adapt speed to the currently slowest subscriber" in {
|
2015-05-05 12:32:49 +02:00
|
|
|
new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1),
|
2014-09-11 10:32:35 +02:00
|
|
|
toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) {
|
2015-04-24 11:45:03 +03:00
|
|
|
val downstream2 = TestSubscriber.manualProbe[Any]()
|
2014-09-11 10:32:35 +02:00
|
|
|
publisher.subscribe(downstream2)
|
|
|
|
|
val downstream2Subscription = downstream2.expectSubscription()
|
|
|
|
|
|
|
|
|
|
downstreamSubscription.request(5)
|
|
|
|
|
upstream.expectRequest(upstreamSubscription, 1) // because initialInputBufferSize=1
|
|
|
|
|
|
|
|
|
|
upstreamSubscription.sendNext("firstElement")
|
|
|
|
|
downstream.expectNext("firstElement")
|
|
|
|
|
|
|
|
|
|
upstream.expectRequest(upstreamSubscription, 1)
|
|
|
|
|
upstreamSubscription.sendNext("element2")
|
|
|
|
|
|
|
|
|
|
downstream.expectNoMsg(1.second)
|
|
|
|
|
downstream2Subscription.request(1)
|
|
|
|
|
downstream2.expectNext("firstElement")
|
|
|
|
|
|
|
|
|
|
downstream.expectNext("element2")
|
|
|
|
|
|
|
|
|
|
downstream2Subscription.request(1)
|
|
|
|
|
downstream2.expectNext("element2")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"support slow subscriber with fan-out 2" in {
|
2015-05-05 12:32:49 +02:00
|
|
|
new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1),
|
2014-09-11 10:32:35 +02:00
|
|
|
toFanoutPublisher(initialBufferSize = 2, maximumBufferSize = 2)) {
|
2015-04-24 11:45:03 +03:00
|
|
|
val downstream2 = TestSubscriber.manualProbe[Any]()
|
2014-09-11 10:32:35 +02:00
|
|
|
publisher.subscribe(downstream2)
|
|
|
|
|
val downstream2Subscription = downstream2.expectSubscription()
|
|
|
|
|
|
|
|
|
|
downstreamSubscription.request(5)
|
|
|
|
|
|
|
|
|
|
upstream.expectRequest(upstreamSubscription, 1) // because initialInputBufferSize=1
|
|
|
|
|
upstreamSubscription.sendNext("element1")
|
|
|
|
|
downstream.expectNext("element1")
|
|
|
|
|
upstreamSubscription.expectRequest(1)
|
|
|
|
|
upstreamSubscription.sendNext("element2")
|
|
|
|
|
downstream.expectNext("element2")
|
|
|
|
|
upstreamSubscription.expectRequest(1)
|
|
|
|
|
upstreamSubscription.sendNext("element3")
|
|
|
|
|
// downstream2 has not requested anything, fan-out buffer 2
|
|
|
|
|
downstream.expectNoMsg(100.millis.dilated)
|
|
|
|
|
|
|
|
|
|
downstream2Subscription.request(2)
|
|
|
|
|
downstream.expectNext("element3")
|
|
|
|
|
downstream2.expectNext("element1")
|
|
|
|
|
downstream2.expectNext("element2")
|
|
|
|
|
downstream2.expectNoMsg(100.millis.dilated)
|
|
|
|
|
|
|
|
|
|
upstreamSubscription.expectRequest(1)
|
|
|
|
|
upstreamSubscription.sendNext("element4")
|
|
|
|
|
downstream.expectNext("element4")
|
|
|
|
|
|
|
|
|
|
downstream2Subscription.request(2)
|
|
|
|
|
downstream2.expectNext("element3")
|
|
|
|
|
downstream2.expectNext("element4")
|
|
|
|
|
|
|
|
|
|
upstreamSubscription.sendComplete()
|
|
|
|
|
downstream.expectComplete()
|
|
|
|
|
downstream2.expectComplete()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"support incoming subscriber while elements were requested before" in {
|
2015-05-05 12:32:49 +02:00
|
|
|
new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1),
|
2014-09-11 10:32:35 +02:00
|
|
|
toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) {
|
|
|
|
|
downstreamSubscription.request(5)
|
|
|
|
|
upstream.expectRequest(upstreamSubscription, 1)
|
|
|
|
|
upstreamSubscription.sendNext("a1")
|
|
|
|
|
downstream.expectNext("a1")
|
|
|
|
|
|
|
|
|
|
upstream.expectRequest(upstreamSubscription, 1)
|
|
|
|
|
upstreamSubscription.sendNext("a2")
|
|
|
|
|
downstream.expectNext("a2")
|
|
|
|
|
|
|
|
|
|
upstream.expectRequest(upstreamSubscription, 1)
|
|
|
|
|
|
|
|
|
|
// link now while an upstream element is already requested
|
2015-04-24 11:45:03 +03:00
|
|
|
val downstream2 = TestSubscriber.manualProbe[Any]()
|
2014-09-11 10:32:35 +02:00
|
|
|
publisher.subscribe(downstream2)
|
|
|
|
|
val downstream2Subscription = downstream2.expectSubscription()
|
|
|
|
|
|
|
|
|
|
// situation here:
|
|
|
|
|
// downstream 1 now has 3 outstanding
|
|
|
|
|
// downstream 2 has 0 outstanding
|
|
|
|
|
|
|
|
|
|
upstreamSubscription.sendNext("a3")
|
|
|
|
|
downstream.expectNext("a3")
|
|
|
|
|
downstream2.expectNoMsg(100.millis.dilated) // as nothing was requested yet, fanOutBox needs to cache element in this case
|
|
|
|
|
|
|
|
|
|
downstream2Subscription.request(1)
|
|
|
|
|
downstream2.expectNext("a3")
|
|
|
|
|
|
|
|
|
|
// d1 now has 2 outstanding
|
|
|
|
|
// d2 now has 0 outstanding
|
|
|
|
|
// buffer should be empty so we should be requesting one new element
|
|
|
|
|
|
|
|
|
|
upstream.expectRequest(upstreamSubscription, 1) // because of buffer size 1
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"be unblocked when blocking subscriber cancels subscription" in {
|
2015-05-05 12:32:49 +02:00
|
|
|
new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1),
|
2014-09-11 10:32:35 +02:00
|
|
|
toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) {
|
2015-04-24 11:45:03 +03:00
|
|
|
val downstream2 = TestSubscriber.manualProbe[Any]()
|
2014-09-11 10:32:35 +02:00
|
|
|
publisher.subscribe(downstream2)
|
|
|
|
|
val downstream2Subscription = downstream2.expectSubscription()
|
|
|
|
|
|
|
|
|
|
downstreamSubscription.request(5)
|
|
|
|
|
upstreamSubscription.expectRequest(1)
|
|
|
|
|
upstreamSubscription.sendNext("firstElement")
|
|
|
|
|
downstream.expectNext("firstElement")
|
|
|
|
|
|
|
|
|
|
downstream2Subscription.request(1)
|
|
|
|
|
downstream2.expectNext("firstElement")
|
|
|
|
|
upstreamSubscription.expectRequest(1)
|
|
|
|
|
upstreamSubscription.sendNext("element2")
|
|
|
|
|
|
|
|
|
|
downstream.expectNext("element2")
|
|
|
|
|
upstreamSubscription.expectRequest(1)
|
|
|
|
|
upstreamSubscription.sendNext("element3")
|
|
|
|
|
upstreamSubscription.expectRequest(1)
|
|
|
|
|
|
|
|
|
|
downstream.expectNoMsg(200.millis.dilated)
|
|
|
|
|
downstream2.expectNoMsg(200.millis.dilated)
|
|
|
|
|
upstream.expectNoMsg(200.millis.dilated)
|
|
|
|
|
|
|
|
|
|
// should unblock fanoutbox
|
|
|
|
|
downstream2Subscription.cancel()
|
|
|
|
|
downstream.expectNext("element3")
|
|
|
|
|
upstreamSubscription.sendNext("element4")
|
|
|
|
|
downstream.expectNext("element4")
|
|
|
|
|
|
|
|
|
|
upstreamSubscription.sendComplete()
|
|
|
|
|
downstream.expectComplete()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2015-03-27 13:13:44 +01:00
|
|
|
"call future subscribers' onError after onSubscribe if initial upstream was completed" in {
|
2015-05-05 12:32:49 +02:00
|
|
|
new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1),
|
2014-09-11 10:32:35 +02:00
|
|
|
toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) {
|
2015-04-24 11:45:03 +03:00
|
|
|
val downstream2 = TestSubscriber.manualProbe[Any]()
|
2014-09-11 10:32:35 +02:00
|
|
|
// don't link it just yet
|
|
|
|
|
|
|
|
|
|
downstreamSubscription.request(5)
|
|
|
|
|
upstream.expectRequest(upstreamSubscription, 1)
|
|
|
|
|
upstreamSubscription.sendNext("a1")
|
|
|
|
|
downstream.expectNext("a1")
|
|
|
|
|
|
|
|
|
|
upstream.expectRequest(upstreamSubscription, 1)
|
|
|
|
|
upstreamSubscription.sendNext("a2")
|
|
|
|
|
downstream.expectNext("a2")
|
|
|
|
|
|
|
|
|
|
upstream.expectRequest(upstreamSubscription, 1)
|
|
|
|
|
|
|
|
|
|
// link now while an upstream element is already requested
|
|
|
|
|
publisher.subscribe(downstream2)
|
|
|
|
|
val downstream2Subscription = downstream2.expectSubscription()
|
|
|
|
|
|
|
|
|
|
upstreamSubscription.sendNext("a3")
|
|
|
|
|
upstreamSubscription.sendComplete()
|
|
|
|
|
downstream.expectNext("a3")
|
|
|
|
|
downstream.expectComplete()
|
|
|
|
|
|
|
|
|
|
downstream2.expectNoMsg(100.millis.dilated) // as nothing was requested yet, fanOutBox needs to cache element in this case
|
|
|
|
|
|
|
|
|
|
downstream2Subscription.request(1)
|
|
|
|
|
downstream2.expectNext("a3")
|
|
|
|
|
downstream2.expectComplete()
|
|
|
|
|
|
2015-04-24 11:45:03 +03:00
|
|
|
val downstream3 = TestSubscriber.manualProbe[Any]()
|
2015-03-27 13:13:44 +01:00
|
|
|
publisher.subscribe(downstream3)
|
|
|
|
|
downstream3.expectSubscription()
|
|
|
|
|
downstream3.expectError() should ===(ActorPublisher.NormalShutdownReason)
|
2014-09-11 10:32:35 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"call future subscribers' onError should be called instead of onSubscribed after initial upstream reported an error" in {
|
2015-05-05 12:32:49 +02:00
|
|
|
new ChainSetup[Int, String](_.map(_ ⇒ throw TestException), settings.withInputBuffer(initialSize = 1, maxSize = 1),
|
2014-09-11 10:32:35 +02:00
|
|
|
toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) {
|
|
|
|
|
downstreamSubscription.request(1)
|
|
|
|
|
upstreamSubscription.expectRequest(1)
|
|
|
|
|
|
2014-10-08 18:16:57 +02:00
|
|
|
upstreamSubscription.sendNext(5)
|
|
|
|
|
upstreamSubscription.expectRequest(1)
|
|
|
|
|
upstreamSubscription.expectCancellation()
|
|
|
|
|
downstream.expectError(TestException)
|
2014-09-11 10:32:35 +02:00
|
|
|
|
2015-04-24 11:45:03 +03:00
|
|
|
val downstream2 = TestSubscriber.manualProbe[String]()
|
2014-09-11 10:32:35 +02:00
|
|
|
publisher.subscribe(downstream2)
|
2015-03-03 10:57:25 +01:00
|
|
|
downstream2.expectSubscriptionAndError() should be(TestException)
|
2014-09-11 10:32:35 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"call future subscribers' onError when all subscriptions were cancelled" in {
|
2015-05-05 12:32:49 +02:00
|
|
|
new ChainSetup(identity, settings.withInputBuffer(initialSize = 1, maxSize = 1),
|
2014-09-11 10:32:35 +02:00
|
|
|
toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 16)) {
|
|
|
|
|
upstreamSubscription.expectRequest(1)
|
|
|
|
|
downstreamSubscription.cancel()
|
|
|
|
|
upstreamSubscription.expectCancellation()
|
|
|
|
|
|
2015-04-24 11:45:03 +03:00
|
|
|
val downstream2 = TestSubscriber.manualProbe[Any]()
|
2014-09-11 10:32:35 +02:00
|
|
|
publisher.subscribe(downstream2)
|
|
|
|
|
// IllegalStateException shut down
|
2015-03-03 10:57:25 +01:00
|
|
|
downstream2.expectSubscriptionAndError().isInstanceOf[IllegalStateException] should be(true)
|
2014-09-11 10:32:35 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
"A broken Flow" must {
|
|
|
|
|
"cancel upstream and call onError on current and future downstream subscribers if an internal error occurs" in {
|
2015-05-05 12:32:49 +02:00
|
|
|
new ChainSetup(faultyFlow, settings.withInputBuffer(initialSize = 1, maxSize = 1), toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 16)) {
|
2014-09-11 10:32:35 +02:00
|
|
|
|
2015-04-24 11:45:03 +03:00
|
|
|
def checkError(sprobe: TestSubscriber.ManualProbe[Any]): Unit = {
|
2014-09-11 10:32:35 +02:00
|
|
|
val error = sprobe.expectError()
|
2015-05-12 15:44:18 +02:00
|
|
|
error.isInstanceOf[AbruptTerminationException] should be(true)
|
|
|
|
|
error.getMessage should startWith("Processor actor")
|
2014-09-11 10:32:35 +02:00
|
|
|
}
|
|
|
|
|
|
2015-04-24 11:45:03 +03:00
|
|
|
val downstream2 = TestSubscriber.manualProbe[Any]()
|
2014-09-11 10:32:35 +02:00
|
|
|
publisher.subscribe(downstream2)
|
|
|
|
|
val downstream2Subscription = downstream2.expectSubscription()
|
|
|
|
|
|
|
|
|
|
downstreamSubscription.request(5)
|
|
|
|
|
downstream2Subscription.request(5)
|
|
|
|
|
upstream.expectRequest(upstreamSubscription, 1)
|
|
|
|
|
upstreamSubscription.sendNext("a1")
|
|
|
|
|
downstream.expectNext("a1")
|
|
|
|
|
downstream2.expectNext("a1")
|
|
|
|
|
|
|
|
|
|
upstream.expectRequest(upstreamSubscription, 1)
|
|
|
|
|
upstreamSubscription.sendNext("a2")
|
|
|
|
|
downstream.expectNext("a2")
|
|
|
|
|
downstream2.expectNext("a2")
|
|
|
|
|
|
2015-01-28 14:19:50 +01:00
|
|
|
val filters = immutable.Seq(
|
|
|
|
|
EventFilter[NullPointerException](),
|
|
|
|
|
EventFilter[IllegalStateException](),
|
|
|
|
|
EventFilter[PostRestartException]()) // This is thrown because we attach the dummy failing actor to toplevel
|
2014-09-11 10:32:35 +02:00
|
|
|
try {
|
|
|
|
|
system.eventStream.publish(Mute(filters))
|
|
|
|
|
|
2014-10-06 14:04:05 +02:00
|
|
|
upstream.expectRequest(upstreamSubscription, 1)
|
|
|
|
|
upstreamSubscription.sendNext("a3")
|
|
|
|
|
upstreamSubscription.expectCancellation()
|
2014-09-11 10:32:35 +02:00
|
|
|
|
2014-10-06 14:04:05 +02:00
|
|
|
// IllegalStateException terminated abruptly
|
|
|
|
|
checkError(downstream)
|
|
|
|
|
checkError(downstream2)
|
2014-09-11 10:32:35 +02:00
|
|
|
|
2015-04-24 11:45:03 +03:00
|
|
|
val downstream3 = TestSubscriber.manualProbe[Any]()
|
2014-10-06 14:04:05 +02:00
|
|
|
publisher.subscribe(downstream3)
|
2015-03-03 10:57:25 +01:00
|
|
|
downstream3.expectSubscription()
|
2014-10-06 14:04:05 +02:00
|
|
|
// IllegalStateException terminated abruptly
|
|
|
|
|
checkError(downstream3)
|
2014-09-11 10:32:35 +02:00
|
|
|
} finally {
|
|
|
|
|
system.eventStream.publish(UnMute(filters))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2014-11-12 10:43:39 +01:00
|
|
|
object TestException extends RuntimeException with NoStackTrace
|
2014-09-11 10:32:35 +02:00
|
|
|
|
|
|
|
|
}
|