diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala index 90ea63d3ce..a9196f063f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala @@ -43,9 +43,9 @@ private[akka] class ActorConsumer[T]( final val impl: ActorRef) extends ActorCon private[akka] object ActorConsumer { import Ast._ - def props(gen: MaterializerSettings, op: AstNode) = op match { - case t: Transform ⇒ Props(new TransformActorConsumer(gen, t)) - case r: Recover ⇒ Props(new RecoverActorConsumer(gen, r)) + def props(settings: MaterializerSettings, op: AstNode) = op match { + case t: Transform ⇒ Props(new TransformActorConsumer(settings, t)) + case r: Recover ⇒ Props(new RecoverActorConsumer(settings, r)) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 9c40e2eb28..f33047e4f9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -58,10 +58,15 @@ private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettin def waitingExposedPublisher: Receive = { case ExposedPublisher(publisher) ⇒ exposedPublisher = publisher + publisherExposed() context.become(waitingForUpstream) case _ ⇒ throw new IllegalStateException("The first message must be ExposedPublisher") } + // WARNING: DO NOT SEND messages from the constructor (that includes subscribing to other streams) since their reply + // might arrive earlier than ExposedPublisher. Override this method to schedule such events. + protected def publisherExposed(): Unit = () + def waitingForUpstream: Receive = downstreamManagement orElse { case OnComplete ⇒ // Instead of introducing an edge case, handle it in the general way diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index 8be2106302..7af5d00690 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -131,7 +131,8 @@ private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSett var secondaryInputs: Inputs = _ - other.getPublisher.subscribe(new OtherActorSubscriber(self)) + override def publisherExposed(): Unit = + other.getPublisher.subscribe(new OtherActorSubscriber(self)) override def waitingForUpstream: Receive = super.waitingForUpstream orElse { case OtherStreamOnComplete ⇒ diff --git a/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala index d86020755f..61e518d9e4 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala @@ -7,22 +7,20 @@ import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer } import akka.stream.testkit.StreamTestKit import akka.testkit.AkkaSpec import akka.stream.scaladsl.Flow +import org.reactivestreams.api.Producer -class FlowConcatSpec extends AkkaSpec { +class FlowConcatSpec extends TwoStreamsSetup { - val gen = new ActorBasedFlowMaterializer(MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 2, - initialFanOutBufferSize = 2, - maxFanOutBufferSize = 2), system) + type Outputs = Int + override def operationUnderTest(in1: Flow[Int], in2: Producer[Int]) = in1.concat(in2) "Concat" must { "work in the happy case" in { - val source0 = Flow(List.empty[Int].iterator).toProducer(gen) - val source1 = Flow((1 to 4).iterator).toProducer(gen) - val source2 = Flow((5 to 10).iterator).toProducer(gen) - val p = Flow(source0).concat(source1).concat(source2).toProducer(gen) + val source0 = Flow(List.empty[Int].iterator).toProducer(materializer) + val source1 = Flow((1 to 4).iterator).toProducer(materializer) + val source2 = Flow((5 to 10).iterator).toProducer(materializer) + val p = Flow(source0).concat(source1).concat(source2).toProducer(materializer) val probe = StreamTestKit.consumerProbe[Int] p.produceTo(probe) @@ -36,5 +34,69 @@ class FlowConcatSpec extends AkkaSpec { probe.expectComplete() } + commonTests() + + "work with one immediately completed and one nonempty producer" in { + val consumer1 = setup(completedPublisher, nonemptyPublisher((1 to 4).iterator)) + val subscription1 = consumer1.expectSubscription() + subscription1.requestMore(5) + consumer1.expectNext(1) + consumer1.expectNext(2) + consumer1.expectNext(3) + consumer1.expectNext(4) + consumer1.expectComplete() + + val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), completedPublisher) + val subscription2 = consumer2.expectSubscription() + subscription2.requestMore(5) + consumer2.expectNext(1) + consumer2.expectNext(2) + consumer2.expectNext(3) + consumer2.expectNext(4) + consumer2.expectComplete() + } + + "work with one delayed completed and one nonempty producer" in { + val consumer1 = setup(soonToCompletePublisher, nonemptyPublisher((1 to 4).iterator)) + val subscription1 = consumer1.expectSubscription() + subscription1.requestMore(5) + consumer1.expectNext(1) + consumer1.expectNext(2) + consumer1.expectNext(3) + consumer1.expectNext(4) + consumer1.expectComplete() + + val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), soonToCompletePublisher) + val subscription2 = consumer2.expectSubscription() + subscription2.requestMore(5) + consumer2.expectNext(1) + consumer2.expectNext(2) + consumer2.expectNext(3) + consumer2.expectNext(4) + consumer2.expectComplete() + } + + "work with one immediately failed and one nonempty producer" in { + val consumer1 = setup(failedPublisher, nonemptyPublisher((1 to 4).iterator)) + consumer1.expectError(TestException) + + val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), failedPublisher) + val subscription2 = consumer2.expectSubscription() + subscription2.requestMore(5) + consumer2.expectError(TestException) + } + + "work with one delayed failed and one nonempty producer" in { + val consumer1 = setup(soonToFailPublisher, nonemptyPublisher((1 to 4).iterator)) + val subscription1 = consumer1.expectSubscription() + subscription1.requestMore(5) + consumer1.expectError(TestException) + + val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), soonToFailPublisher) + val subscription2 = consumer2.expectSubscription() + subscription2.requestMore(5) + consumer2.expectError(TestException) + } + } } diff --git a/akka-stream/src/test/scala/akka/stream/FlowDropSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowDropSpec.scala index 0af23bd7ad..eb0b54957a 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowDropSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowDropSpec.scala @@ -9,7 +9,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } class StreamDropSpec extends AkkaSpec with ScriptedTest { - val genSettings = MaterializerSettings( + val settings = MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, @@ -21,7 +21,7 @@ class StreamDropSpec extends AkkaSpec with ScriptedTest { def script(d: Int) = Script((1 to 50) map { n ⇒ Seq(n) -> (if (n <= d) Nil else Seq(n)) }: _*) (1 to 50) foreach { _ ⇒ val d = Math.min(Math.max(random.nextInt(-10, 60), 0), 50) - runScript(script(d), genSettings)(_.drop(d)) + runScript(script(d), settings)(_.drop(d)) } } diff --git a/akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala index a8d84ee7e8..4f8feabd7e 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala @@ -9,7 +9,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } class StreamFilterSpec extends AkkaSpec with ScriptedTest { - val genSettings = MaterializerSettings( + val settings = MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, @@ -19,7 +19,7 @@ class StreamFilterSpec extends AkkaSpec with ScriptedTest { "filter" in { def script = Script((1 to 50) map { _ ⇒ val x = random.nextInt(); Seq(x) -> (if ((x & 1) == 0) Seq(x) else Seq()) }: _*) - (1 to 50) foreach (_ ⇒ runScript(script, genSettings)(_.filter(_ % 2 == 0))) + (1 to 50) foreach (_ ⇒ runScript(script, settings)(_.filter(_ % 2 == 0))) } } diff --git a/akka-stream/src/test/scala/akka/stream/FlowFoldSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowFoldSpec.scala index 5a0ad97820..abc77de2a8 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowFoldSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowFoldSpec.scala @@ -9,7 +9,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } class StreamFoldSpec extends AkkaSpec with ScriptedTest { - val genSettings = MaterializerSettings( + val settings = MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, @@ -19,7 +19,7 @@ class StreamFoldSpec extends AkkaSpec with ScriptedTest { "fold" in { def script = Script((1 to 50).toSeq -> Seq(25 * 51)) - (1 to 50) foreach (_ ⇒ runScript(script, genSettings)(_.fold(0)(_ + _))) + (1 to 50) foreach (_ ⇒ runScript(script, settings)(_.fold(0)(_ + _))) } } diff --git a/akka-stream/src/test/scala/akka/stream/FlowForeachTest.scala b/akka-stream/src/test/scala/akka/stream/FlowForeachTest.scala index ecd8a855ee..2ec67662bd 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowForeachTest.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowForeachTest.scala @@ -9,7 +9,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } class FlowForeachSpec extends AkkaSpec with ScriptedTest { - val genSettings = MaterializerSettings( + val settings = MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, @@ -24,7 +24,7 @@ class FlowForeachSpec extends AkkaSpec with ScriptedTest { Script((1 to 50).toSeq -> Seq(())) } (1 to 50) foreach { _ ⇒ - runScript(script, genSettings)(_.foreach(x ⇒ count += x)) + runScript(script, settings)(_.foreach(x ⇒ count += x)) count should be(25 * 51) } } diff --git a/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala index e9cbde1933..954580aeda 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala @@ -9,11 +9,12 @@ import akka.testkit.AkkaSpec import org.reactivestreams.api.Producer import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer } import akka.stream.scaladsl.Flow +import scala.util.control.NoStackTrace @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class StreamGroupBySpec extends AkkaSpec { +class FlowGroupBySpec extends AkkaSpec { - val gen = new ActorBasedFlowMaterializer(MaterializerSettings( + val materializer = new ActorBasedFlowMaterializer(MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 2, initialFanOutBufferSize = 2, @@ -28,12 +29,13 @@ class StreamGroupBySpec extends AkkaSpec { def expectNext(elem: Int): Unit = probe.expectNext(elem) def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max) def expectComplete(): Unit = probe.expectComplete() + def expectError(e: Throwable) = probe.expectError(e) def cancel(): Unit = subscription.cancel() } class SubstreamsSupport(groupCount: Int = 2, elementCount: Int = 6) { - val source = Flow((1 to elementCount).iterator).toProducer(gen) - val groupStream = Flow(source).groupBy(_ % groupCount).toProducer(gen) + val source = Flow((1 to elementCount).iterator).toProducer(materializer) + val groupStream = Flow(source).groupBy(_ % groupCount).toProducer(materializer) val masterConsumer = StreamTestKit.consumerProbe[(Int, Producer[Int])] groupStream.produceTo(masterConsumer) @@ -52,6 +54,8 @@ class StreamGroupBySpec extends AkkaSpec { } + case class TE(message: String) extends RuntimeException(message) with NoStackTrace + "groupBy" must { "work in the happy case" in new SubstreamsSupport(groupCount = 2) { val s1 = StreamPuppet(getSubproducer(1)) @@ -105,13 +109,21 @@ class StreamGroupBySpec extends AkkaSpec { } - "accept cancellation of master stream when not consumed anything" in new SubstreamsSupport(groupCount = 2) { - masterSubscription.cancel() - masterConsumer.expectNoMsg(100.millis) + "accept cancellation of master stream when not consumed anything" in { + val producerProbe = StreamTestKit.producerProbe[Int] + val producer = Flow(producerProbe).groupBy(_ % 2).toProducer(materializer) + val consumer = StreamTestKit.consumerProbe[(Int, Producer[Int])] + producer.produceTo(consumer) + + val upstreamSubscription = producerProbe.expectSubscription() + val downstreamSubscription = consumer.expectSubscription() + downstreamSubscription.cancel() + upstreamSubscription.expectCancellation() } "accept cancellation of master stream when substreams are open" in new SubstreamsSupport(groupCount = 3, elementCount = 13) { pending + // FIXME: Needs handling of loose substreams that no one refers to anymore. // val substream = StreamPuppet(getSubproducer(1)) // // substream.requestMore(1) @@ -148,15 +160,100 @@ class StreamGroupBySpec extends AkkaSpec { } "work with fanout on master stream" in { - pending + val source = Flow((1 to 4).iterator).toProducer(materializer) + val groupStream = Flow(source).groupBy(_ % 2).toProducer(materializer) + val masterConsumer1 = StreamTestKit.consumerProbe[(Int, Producer[Int])] + val masterConsumer2 = StreamTestKit.consumerProbe[(Int, Producer[Int])] + + groupStream.produceTo(masterConsumer1) + groupStream.produceTo(masterConsumer2) + + val masterSubscription1 = masterConsumer1.expectSubscription() + val masterSubscription2 = masterConsumer2.expectSubscription() + + masterSubscription1.requestMore(2) + masterSubscription2.requestMore(1) + + val (key11, substream11) = masterConsumer1.expectNext() + key11 should be(1) + val (key21, substream21) = masterConsumer2.expectNext() + key21 should be(1) + + val puppet11 = StreamPuppet(substream11) + val puppet21 = StreamPuppet(substream21) + + puppet11.requestMore(2) + puppet11.expectNext(1) + puppet11.expectNext(3) + + puppet21.requestMore(1) + puppet21.expectNext(1) + puppet21.cancel() + + masterSubscription2.cancel() + + val (key12, substream12) = masterConsumer1.expectNext() + key12 should be(0) + + val puppet12 = StreamPuppet(substream12) + puppet12.requestMore(1) + puppet12.expectNext(2) + puppet12.cancel() + masterSubscription1.cancel() } - "work with fanout on substreams and master stream" in { - pending + "work with empty input stream" in { + val producer = Flow(List.empty[Int]).groupBy(_ % 2).toProducer(materializer) + val consumer = StreamTestKit.consumerProbe[(Int, Producer[Int])] + producer.produceTo(consumer) + + val subscription = consumer.expectSubscription() + subscription.requestMore(100) + consumer.expectComplete() } "abort on onError from upstream" in { - pending + val producerProbe = StreamTestKit.producerProbe[Int] + val producer = Flow(producerProbe).groupBy(_ % 2).toProducer(materializer) + val consumer = StreamTestKit.consumerProbe[(Int, Producer[Int])] + producer.produceTo(consumer) + + val upstreamSubscription = producerProbe.expectSubscription() + + val downstreamSubscription = consumer.expectSubscription() + downstreamSubscription.requestMore(100) + + val e = TE("test") + upstreamSubscription.sendError(e) + + consumer.expectError(e) + } + + "abort on onError from upstream when substreams are running" in { + val producerProbe = StreamTestKit.producerProbe[Int] + val producer = Flow(producerProbe).groupBy(_ % 2).toProducer(materializer) + val consumer = StreamTestKit.consumerProbe[(Int, Producer[Int])] + producer.produceTo(consumer) + + val upstreamSubscription = producerProbe.expectSubscription() + + val downstreamSubscription = consumer.expectSubscription() + downstreamSubscription.requestMore(100) + + upstreamSubscription.sendNext(1) + + val (_, substream) = consumer.expectNext() + val substreamPuppet = StreamPuppet(substream) + + substreamPuppet.requestMore(1) + substreamPuppet.expectNext(1) + + val e = TE("test") + upstreamSubscription.sendError(e) + + substreamPuppet.expectError(e) + consumer.expectError(e) + } } diff --git a/akka-stream/src/test/scala/akka/stream/FlowGroupedSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowGroupedSpec.scala index 052eee5dd1..3f87d63188 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowGroupedSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowGroupedSpec.scala @@ -10,7 +10,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } class StreamGroupedSpec extends AkkaSpec with ScriptedTest { - val genSettings = MaterializerSettings( + val settings = MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, @@ -20,13 +20,13 @@ class StreamGroupedSpec extends AkkaSpec with ScriptedTest { "group evenly" in { def script = Script((1 to 20) map { _ ⇒ val x, y, z = random.nextInt(); Seq(x, y, z) -> Seq(immutable.Seq(x, y, z)) }: _*) - (1 to 30) foreach (_ ⇒ runScript(script, genSettings)(_.grouped(3))) + (1 to 30) foreach (_ ⇒ runScript(script, settings)(_.grouped(3))) } "group with rest" in { def script = Script(((1 to 20).map { _ ⇒ val x, y, z = random.nextInt(); Seq(x, y, z) -> Seq(immutable.Seq(x, y, z)) } :+ { val x = random.nextInt(); Seq(x) -> Seq(immutable.Seq(x)) }): _*) - (1 to 30) foreach (_ ⇒ runScript(script, genSettings)(_.grouped(3))) + (1 to 30) foreach (_ ⇒ runScript(script, settings)(_.grouped(3))) } } diff --git a/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala index 3f38b9d329..4b0955ee1c 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala @@ -16,12 +16,12 @@ import akka.stream.scaladsl.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowIterableSpec extends AkkaSpec { - val gen = FlowMaterializer(MaterializerSettings( + val materializer = FlowMaterializer(MaterializerSettings( maximumInputBufferSize = 512)) "A Flow based on an iterable" must { "produce elements" in { - val p = Flow(List(1, 2, 3)).toProducer(gen) + val p = Flow(List(1, 2, 3)).toProducer(materializer) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() @@ -35,7 +35,7 @@ class FlowIterableSpec extends AkkaSpec { } "complete empty" in { - val p = Flow(List.empty[Int]).toProducer(gen) + val p = Flow(List.empty[Int]).toProducer(materializer) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) c.expectComplete() @@ -47,7 +47,7 @@ class FlowIterableSpec extends AkkaSpec { } "produce elements with multiple subscribers" in { - val p = Flow(List(1, 2, 3)).toProducer(gen) + val p = Flow(List(1, 2, 3)).toProducer(materializer) val c1 = StreamTestKit.consumerProbe[Int] val c2 = StreamTestKit.consumerProbe[Int] p.produceTo(c1) @@ -71,7 +71,7 @@ class FlowIterableSpec extends AkkaSpec { } "produce elements to later subscriber" in { - val p = Flow(List(1, 2, 3)).toProducer(gen) + val p = Flow(List(1, 2, 3)).toProducer(materializer) val c1 = StreamTestKit.consumerProbe[Int] val c2 = StreamTestKit.consumerProbe[Int] p.produceTo(c1) @@ -97,7 +97,7 @@ class FlowIterableSpec extends AkkaSpec { } "produce elements with one transformation step" in { - val p = Flow(List(1, 2, 3)).map(_ * 2).toProducer(gen) + val p = Flow(List(1, 2, 3)).map(_ * 2).toProducer(materializer) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() @@ -109,7 +109,7 @@ class FlowIterableSpec extends AkkaSpec { } "produce elements with two transformation steps" in { - val p = Flow(List(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 2).toProducer(gen) + val p = Flow(List(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 2).toProducer(materializer) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() @@ -121,7 +121,7 @@ class FlowIterableSpec extends AkkaSpec { "allow cancel before receiving all elements" in { val count = 100000 - val p = Flow(1 to count).toProducer(gen) + val p = Flow(1 to count).toProducer(materializer) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() diff --git a/akka-stream/src/test/scala/akka/stream/FlowIteratorSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowIteratorSpec.scala index 2132e2f05a..74d7d48505 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowIteratorSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowIteratorSpec.scala @@ -14,7 +14,7 @@ import akka.stream.scaladsl.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowIteratorSpec extends AkkaSpec { - val gen = FlowMaterializer(MaterializerSettings( + val materializer = FlowMaterializer(MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 2, initialFanOutBufferSize = 4, @@ -22,7 +22,7 @@ class FlowIteratorSpec extends AkkaSpec { "A Flow based on an iterator" must { "produce elements" in { - val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p = Flow(List(1, 2, 3).iterator).toProducer(materializer) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() @@ -36,7 +36,7 @@ class FlowIteratorSpec extends AkkaSpec { } "complete empty" in { - val p = Flow(List.empty[Int].iterator).toProducer(gen) + val p = Flow(List.empty[Int].iterator).toProducer(materializer) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) c.expectComplete() @@ -48,7 +48,7 @@ class FlowIteratorSpec extends AkkaSpec { } "produce elements with multiple subscribers" in { - val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p = Flow(List(1, 2, 3).iterator).toProducer(materializer) val c1 = StreamTestKit.consumerProbe[Int] val c2 = StreamTestKit.consumerProbe[Int] p.produceTo(c1) @@ -72,7 +72,7 @@ class FlowIteratorSpec extends AkkaSpec { } "produce elements to later subscriber" in { - val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p = Flow(List(1, 2, 3).iterator).toProducer(materializer) val c1 = StreamTestKit.consumerProbe[Int] val c2 = StreamTestKit.consumerProbe[Int] p.produceTo(c1) @@ -95,7 +95,7 @@ class FlowIteratorSpec extends AkkaSpec { } "produce elements with one transformation step" in { - val p = Flow(List(1, 2, 3).iterator).map(_ * 2).toProducer(gen) + val p = Flow(List(1, 2, 3).iterator).map(_ * 2).toProducer(materializer) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() @@ -107,7 +107,7 @@ class FlowIteratorSpec extends AkkaSpec { } "produce elements with two transformation steps" in { - val p = Flow(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).toProducer(gen) + val p = Flow(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).toProducer(materializer) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() @@ -119,7 +119,7 @@ class FlowIteratorSpec extends AkkaSpec { "allow cancel before receiving all elements" in { val count = 100000 - val p = Flow((1 to count).iterator).toProducer(gen) + val p = Flow((1 to count).iterator).toProducer(materializer) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() diff --git a/akka-stream/src/test/scala/akka/stream/FlowMapConcatSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowMapConcatSpec.scala index 2b00514900..90736b47e1 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowMapConcatSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowMapConcatSpec.scala @@ -8,7 +8,7 @@ import akka.stream.testkit.ScriptedTest class FlowMapConcatSpec extends AkkaSpec with ScriptedTest { - val genSettings = MaterializerSettings( + val settings = MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, @@ -24,7 +24,7 @@ class FlowMapConcatSpec extends AkkaSpec with ScriptedTest { Seq(3) -> Seq(3, 3, 3), Seq(2) -> Seq(2, 2), Seq(1) -> Seq(1)) - (1 to 100) foreach (_ ⇒ runScript(script, genSettings)(_.mapConcat(x ⇒ (1 to x) map (_ ⇒ x)))) + (1 to 100) foreach (_ ⇒ runScript(script, settings)(_.mapConcat(x ⇒ (1 to x) map (_ ⇒ x)))) } } diff --git a/akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala index 3a1cc1fda2..94523f393a 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala @@ -9,7 +9,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } class StreamMapSpec extends AkkaSpec with ScriptedTest { - val genSettings = MaterializerSettings( + val settings = MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, @@ -19,7 +19,7 @@ class StreamMapSpec extends AkkaSpec with ScriptedTest { "map" in { def script = Script((1 to 50) map { _ ⇒ val x = random.nextInt(); Seq(x) -> Seq(x.toString) }: _*) - (1 to 50) foreach (_ ⇒ runScript(script, genSettings)(_.map(_.toString))) + (1 to 50) foreach (_ ⇒ runScript(script, settings)(_.map(_.toString))) } } diff --git a/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala index e6731dd999..b03a2db285 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala @@ -10,22 +10,19 @@ import org.reactivestreams.api.Producer import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer } import akka.stream.scaladsl.Flow -class FlowMergeSpec extends AkkaSpec { +class FlowMergeSpec extends TwoStreamsSetup { - val gen = new ActorBasedFlowMaterializer(MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 2, - initialFanOutBufferSize = 2, - maxFanOutBufferSize = 2), system) + type Outputs = Int + override def operationUnderTest(in1: Flow[Int], in2: Producer[Int]) = in1.merge(in2) "merge" must { "work in the happy case" in { // Different input sizes (4 and 6) - val source1 = Flow((1 to 4).iterator).toProducer(gen) - val source2 = Flow((5 to 10).iterator).toProducer(gen) - val source3 = Flow(List.empty[Int].iterator).toProducer(gen) - val p = Flow(source1).merge(source2).merge(source3).toProducer(gen) + val source1 = Flow((1 to 4).iterator).toProducer(materializer) + val source2 = Flow((5 to 10).iterator).toProducer(materializer) + val source3 = Flow(List.empty[Int].iterator).toProducer(materializer) + val p = Flow(source1).merge(source2).merge(source3).toProducer(materializer) val probe = StreamTestKit.consumerProbe[Int] p.produceTo(probe) @@ -41,6 +38,58 @@ class FlowMergeSpec extends AkkaSpec { probe.expectComplete() } + commonTests() + + "work with one immediately completed and one nonempty producer" in { + val consumer1 = setup(completedPublisher, nonemptyPublisher((1 to 4).iterator)) + val subscription1 = consumer1.expectSubscription() + subscription1.requestMore(4) + consumer1.expectNext(1) + consumer1.expectNext(2) + consumer1.expectNext(3) + consumer1.expectNext(4) + consumer1.expectComplete() + + val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), completedPublisher) + val subscription2 = consumer2.expectSubscription() + subscription2.requestMore(4) + consumer2.expectNext(1) + consumer2.expectNext(2) + consumer2.expectNext(3) + consumer2.expectNext(4) + consumer2.expectComplete() + } + + "work with one delayed completed and one nonempty producer" in { + val consumer1 = setup(soonToCompletePublisher, nonemptyPublisher((1 to 4).iterator)) + val subscription1 = consumer1.expectSubscription() + subscription1.requestMore(4) + consumer1.expectNext(1) + consumer1.expectNext(2) + consumer1.expectNext(3) + consumer1.expectNext(4) + consumer1.expectComplete() + + val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), soonToCompletePublisher) + val subscription2 = consumer2.expectSubscription() + subscription2.requestMore(4) + consumer2.expectNext(1) + consumer2.expectNext(2) + consumer2.expectNext(3) + consumer2.expectNext(4) + consumer2.expectComplete() + } + + "work with one immediately failed and one nonempty producer" in { + // This is nondeterministic, multiple scenarios can happen + pending + } + + "work with one delayed failed and one nonempty producer" in { + // This is nondeterministic, multiple scenarios can happen + pending + } + } } diff --git a/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala index 121d4a3d1f..59502cad95 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala @@ -18,7 +18,7 @@ import scala.util.Success @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { - val gen = FlowMaterializer(MaterializerSettings( + val materializer = FlowMaterializer(MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, @@ -29,7 +29,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { "invoke callback on normal completion" in { val onCompleteProbe = TestProbe() val p = StreamTestKit.producerProbe[Int] - Flow(p).onComplete(gen) { onCompleteProbe.ref ! _ } + Flow(p).onComplete(materializer) { onCompleteProbe.ref ! _ } val proc = p.expectSubscription proc.expectRequestMore() proc.sendNext(42) @@ -41,7 +41,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { "yield the first error" in { val onCompleteProbe = TestProbe() val p = StreamTestKit.producerProbe[Int] - Flow(p).onComplete(gen) { onCompleteProbe.ref ! _ } + Flow(p).onComplete(materializer) { onCompleteProbe.ref ! _ } val proc = p.expectSubscription proc.expectRequestMore() val ex = new RuntimeException("ex") @@ -53,7 +53,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { "invoke callback for an empty stream" in { val onCompleteProbe = TestProbe() val p = StreamTestKit.producerProbe[Int] - Flow(p).onComplete(gen) { onCompleteProbe.ref ! _ } + Flow(p).onComplete(materializer) { onCompleteProbe.ref ! _ } val proc = p.expectSubscription proc.expectRequestMore() proc.sendComplete() @@ -69,7 +69,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { x }.foreach { x ⇒ onCompleteProbe.ref ! ("foreach-" + x) - }.onComplete(gen) { onCompleteProbe.ref ! _ } + }.onComplete(materializer) { onCompleteProbe.ref ! _ } val proc = p.expectSubscription proc.expectRequestMore() proc.sendNext(42) diff --git a/akka-stream/src/test/scala/akka/stream/FlowSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowSpec.scala index cd42c961d6..df0e97fac4 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowSpec.scala @@ -16,7 +16,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece import system.dispatcher - val genSettings = MaterializerSettings( + val settings = MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, @@ -29,14 +29,14 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece for ((name, op) ← List("identity" -> identity, "identity2" -> identity2); n ← List(1, 2, 4)) { s"requests initial elements from upstream ($name, $n)" in { - new ChainSetup(op, genSettings.copy(initialInputBufferSize = n)) { + new ChainSetup(op, settings.copy(initialInputBufferSize = n)) { upstream.expectRequestMore(upstreamSubscription, settings.initialInputBufferSize) } } } "requests more elements from upstream when downstream requests more elements" in { - new ChainSetup(identity, genSettings) { + new ChainSetup(identity, settings) { upstream.expectRequestMore(upstreamSubscription, settings.initialInputBufferSize) downstreamSubscription.requestMore(1) upstream.expectNoMsg(100.millis) @@ -55,7 +55,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "deliver events when publisher sends elements and then completes" in { - new ChainSetup(identity, genSettings) { + new ChainSetup(identity, settings) { downstreamSubscription.requestMore(1) upstreamSubscription.sendNext("test") upstreamSubscription.sendComplete() @@ -65,14 +65,14 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "deliver complete signal when publisher immediately completes" in { - new ChainSetup(identity, genSettings) { + new ChainSetup(identity, settings) { upstreamSubscription.sendComplete() downstream.expectComplete() } } "deliver error signal when publisher immediately fails" in { - new ChainSetup(identity, genSettings) { + new ChainSetup(identity, settings) { object WeirdError extends RuntimeException("weird test exception") EventFilter[WeirdError.type](occurrences = 1) intercept { upstreamSubscription.sendError(WeirdError) @@ -82,7 +82,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "single subscriber cancels subscription while receiving data" in { - new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1)) { + new ChainSetup(identity, settings.copy(initialInputBufferSize = 1)) { downstreamSubscription.requestMore(5) upstreamSubscription.expectRequestMore(1) upstreamSubscription.sendNext("test") @@ -102,7 +102,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "A Flow with multiple subscribers (FanOutBox)" must { "adapt speed to the currently slowest consumer" in { - new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) { + new ChainSetup(identity, settings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) { val downstream2 = StreamTestKit.consumerProbe[Any]() producer.produceTo(downstream2) val downstream2Subscription = downstream2.expectSubscription() @@ -128,7 +128,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "support slow consumer with fan-out 2" in { - new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1, initialFanOutBufferSize = 2, maxFanOutBufferSize = 2)) { + new ChainSetup(identity, settings.copy(initialInputBufferSize = 1, initialFanOutBufferSize = 2, maxFanOutBufferSize = 2)) { val downstream2 = StreamTestKit.consumerProbe[Any]() producer.produceTo(downstream2) val downstream2Subscription = downstream2.expectSubscription() @@ -167,7 +167,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "incoming subscriber while elements were requested before" in { - new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) { + new ChainSetup(identity, settings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) { downstreamSubscription.requestMore(5) upstream.expectRequestMore(upstreamSubscription, 1) upstreamSubscription.sendNext("a1") @@ -204,7 +204,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "blocking subscriber cancels subscription" in { - new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) { + new ChainSetup(identity, settings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) { val downstream2 = StreamTestKit.consumerProbe[Any]() producer.produceTo(downstream2) val downstream2Subscription = downstream2.expectSubscription() @@ -239,7 +239,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "after initial upstream was completed future subscribers' onComplete should be called instead of onSubscribed" in { - new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) { + new ChainSetup(identity, settings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) { val downstream2 = StreamTestKit.consumerProbe[Any]() // don't link it just yet @@ -278,7 +278,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "after initial upstream reported an error future subscribers' onError should be called instead of onSubscribed" in { - new ChainSetup[Int, String](_.map(_ ⇒ throw TestException), genSettings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) { + new ChainSetup[Int, String](_.map(_ ⇒ throw TestException), settings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) { downstreamSubscription.requestMore(1) upstreamSubscription.expectRequestMore(1) @@ -296,7 +296,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "when all subscriptions were cancelled future subscribers' onError should be called" in { - new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1)) { + new ChainSetup(identity, settings.copy(initialInputBufferSize = 1)) { upstreamSubscription.expectRequestMore(1) downstreamSubscription.cancel() upstreamSubscription.expectCancellation() diff --git a/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala index 6d88657a05..9f74cf682d 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala @@ -11,9 +11,9 @@ import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer } import akka.stream.scaladsl.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class StreamSplitWhenSpec extends AkkaSpec { +class FlowSplitWhenSpec extends AkkaSpec { - val gen = new ActorBasedFlowMaterializer(MaterializerSettings( + val materializer = new ActorBasedFlowMaterializer(MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 2, initialFanOutBufferSize = 2, @@ -32,8 +32,8 @@ class StreamSplitWhenSpec extends AkkaSpec { } class SubstreamsSupport(splitWhen: Int = 3, elementCount: Int = 6) { - val source = Flow((1 to elementCount).iterator).toProducer(gen) - val groupStream = Flow(source).splitWhen(_ == splitWhen).toProducer(gen) + val source = Flow((1 to elementCount).iterator).toProducer(materializer) + val groupStream = Flow(source).splitWhen(_ == splitWhen).toProducer(materializer) val masterConsumer = StreamTestKit.consumerProbe[Producer[Int]] groupStream.produceTo(masterConsumer) diff --git a/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala index 6a93254ef5..eb71a6ea6c 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala @@ -12,7 +12,7 @@ import akka.stream.impl.RequestMore class StreamTakeSpec extends AkkaSpec with ScriptedTest { - val genSettings = MaterializerSettings( + val settings = MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, @@ -26,7 +26,7 @@ class StreamTakeSpec extends AkkaSpec with ScriptedTest { def script(d: Int) = Script((1 to 50) map { n ⇒ Seq(n) -> (if (n > d) Nil else Seq(n)) }: _*) (1 to 50) foreach { _ ⇒ val d = Math.min(Math.max(random.nextInt(-10, 60), 0), 50) - runScript(script(d), genSettings)(_.take(d)) + runScript(script(d), settings)(_.take(d)) } } diff --git a/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala index 9ecd802570..ea8f281561 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala @@ -14,7 +14,7 @@ import akka.stream.scaladsl.Flow class FlowToFutureSpec extends AkkaSpec with ScriptedTest { - val gen = FlowMaterializer(MaterializerSettings( + val materializer = FlowMaterializer(MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, @@ -24,7 +24,7 @@ class FlowToFutureSpec extends AkkaSpec with ScriptedTest { "yield the first value" in { val p = StreamTestKit.producerProbe[Int] - val f = Flow(p).toFuture(gen) + val f = Flow(p).toFuture(materializer) val proc = p.expectSubscription proc.expectRequestMore() proc.sendNext(42) @@ -34,7 +34,7 @@ class FlowToFutureSpec extends AkkaSpec with ScriptedTest { "yield the first error" in { val p = StreamTestKit.producerProbe[Int] - val f = Flow(p).toFuture(gen) + val f = Flow(p).toFuture(materializer) val proc = p.expectSubscription proc.expectRequestMore() val ex = new RuntimeException("ex") @@ -45,7 +45,7 @@ class FlowToFutureSpec extends AkkaSpec with ScriptedTest { "yield NoSuchElementExcption for empty stream" in { val p = StreamTestKit.producerProbe[Int] - val f = Flow(p).toFuture(gen) + val f = Flow(p).toFuture(materializer) val proc = p.expectSubscription proc.expectRequestMore() proc.sendComplete() diff --git a/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala index 0490274732..b7bb62194e 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala @@ -14,7 +14,7 @@ import akka.stream.scaladsl.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowTransformRecoverSpec extends AkkaSpec { - val gen = FlowMaterializer(MaterializerSettings( + val materializer = FlowMaterializer(MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 2, initialFanOutBufferSize = 2, @@ -22,10 +22,10 @@ class FlowTransformRecoverSpec extends AkkaSpec { "A Flow with transformRecover operations" must { "produce one-to-one transformation as expected" in { - val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p = Flow(List(1, 2, 3).iterator).toProducer(materializer) val p2 = Flow(p). transformRecover(0)((tot, elem) ⇒ (tot + elem.get, List(tot + elem.get))). - toProducer(gen) + toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] p2.produceTo(consumer) val subscription = consumer.expectSubscription() @@ -39,10 +39,10 @@ class FlowTransformRecoverSpec extends AkkaSpec { } "produce one-to-several transformation as expected" in { - val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p = Flow(List(1, 2, 3).iterator).toProducer(materializer) val p2 = Flow(p). transformRecover(0)((tot, elem) ⇒ (tot + elem.get, Vector.fill(elem.get)(tot + elem.get))). - toProducer(gen) + toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] p2.produceTo(consumer) val subscription = consumer.expectSubscription() @@ -59,10 +59,10 @@ class FlowTransformRecoverSpec extends AkkaSpec { } "produce dropping transformation as expected" in { - val p = Flow(List(1, 2, 3, 4).iterator).toProducer(gen) + val p = Flow(List(1, 2, 3, 4).iterator).toProducer(materializer) val p2 = Flow(p). transformRecover(0)((tot, elem) ⇒ (tot + elem.get, if (elem.get % 2 == 0) Nil else List(tot + elem.get))). - toProducer(gen) + toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] p2.produceTo(consumer) val subscription = consumer.expectSubscription() @@ -76,14 +76,14 @@ class FlowTransformRecoverSpec extends AkkaSpec { } "produce multi-step transformation as expected" in { - val p = Flow(List("a", "bc", "def").iterator).toProducer(gen) + val p = Flow(List("a", "bc", "def").iterator).toProducer(materializer) val p2 = Flow(p). transformRecover("") { (str, elem) ⇒ val concat = str + elem (concat, List(concat.length)) }. transformRecover(0)((tot, length) ⇒ (tot + length.get, List(tot + length.get))). - toProducer(gen) + toProducer(materializer) val c1 = StreamTestKit.consumerProbe[Int] p2.produceTo(c1) val sub1 = c1.expectSubscription() @@ -106,8 +106,8 @@ class FlowTransformRecoverSpec extends AkkaSpec { } "invoke onComplete when done" in { - val p = Flow(List("a").iterator).toProducer(gen) - val p2 = Flow(p).transformRecover("")((s, in) ⇒ (s + in, Nil), x ⇒ List(x + "B")).toProducer(gen) + val p = Flow(List("a").iterator).toProducer(materializer) + val p2 = Flow(p).transformRecover("")((s, in) ⇒ (s + in, Nil), x ⇒ List(x + "B")).toProducer(materializer) val c = StreamTestKit.consumerProbe[String] p2.produceTo(c) val s = c.expectSubscription() @@ -118,7 +118,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { "allow cancellation using isComplete" in { val p = StreamTestKit.producerProbe[Int] - val p2 = Flow(p).transformRecover("")((s, in) ⇒ (s + in, List(in.get)), isComplete = _ == "Success(1)").toProducer(gen) + val p2 = Flow(p).transformRecover("")((s, in) ⇒ (s + in, List(in.get)), isComplete = _ == "Success(1)").toProducer(materializer) val proc = p.expectSubscription val c = StreamTestKit.consumerProbe[Int] p2.produceTo(c) @@ -137,7 +137,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { (s, in) ⇒ (s + in, List(in.get)), onComplete = x ⇒ List(x.size + 10), isComplete = _ == "Success(1)") - .toProducer(gen) + .toProducer(materializer) val proc = p.expectSubscription val c = StreamTestKit.consumerProbe[Int] p2.produceTo(c) @@ -152,12 +152,12 @@ class FlowTransformRecoverSpec extends AkkaSpec { } "report error when exception is thrown" in { - val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p = Flow(List(1, 2, 3).iterator).toProducer(materializer) val p2 = Flow(p). transformRecover(0) { (_, elem) ⇒ if (elem.get == 2) throw new IllegalArgumentException("two not allowed") else (0, List(elem.get, elem.get)) }. - toProducer(gen) + toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] p2.produceTo(consumer) val subscription = consumer.expectSubscription() @@ -177,7 +177,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { val p2 = Flow(p).transformRecover("")( { case (s, Failure(ex)) ⇒ (s + ex.getMessage, List(ex)) }, onComplete = x ⇒ List(TE(x.size + "10"))) - .toProducer(gen) + .toProducer(materializer) val proc = p.expectSubscription() val c = StreamTestKit.consumerProbe[Throwable] p2.produceTo(c) @@ -191,7 +191,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { "forward errors when received and thrown" in { val p = StreamTestKit.producerProbe[Int] - val p2 = Flow(p).transformRecover("")((_, in) ⇒ "" -> List(in.get)).toProducer(gen) + val p2 = Flow(p).transformRecover("")((_, in) ⇒ "" -> List(in.get)).toProducer(materializer) val proc = p.expectSubscription() val c = StreamTestKit.consumerProbe[Int] p2.produceTo(c) @@ -204,10 +204,10 @@ class FlowTransformRecoverSpec extends AkkaSpec { } "support cancel as expected" in { - val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p = Flow(List(1, 2, 3).iterator).toProducer(materializer) val p2 = Flow(p). transformRecover(0) { (_, elem) ⇒ (0, List(elem.get, elem.get)) }. - toProducer(gen) + toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] p2.produceTo(consumer) val subscription = consumer.expectSubscription() diff --git a/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala index 859e357a4e..44541ad5c7 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala @@ -16,7 +16,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d import system.dispatcher - val gen = FlowMaterializer(MaterializerSettings( + val materializer = FlowMaterializer(MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 2, initialFanOutBufferSize = 2, @@ -24,10 +24,10 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "A Flow with transform operations" must { "produce one-to-one transformation as expected" in { - val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p = Flow(List(1, 2, 3).iterator).toProducer(materializer) val p2 = Flow(p). transform(0)((tot, elem) ⇒ (tot + elem, List(tot + elem))). - toProducer(gen) + toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] p2.produceTo(consumer) val subscription = consumer.expectSubscription() @@ -41,10 +41,10 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "produce one-to-several transformation as expected" in { - val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p = Flow(List(1, 2, 3).iterator).toProducer(materializer) val p2 = Flow(p). transform(0)((tot, elem) ⇒ (tot + elem, Vector.fill(elem)(tot + elem))). - toProducer(gen) + toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] p2.produceTo(consumer) val subscription = consumer.expectSubscription() @@ -61,10 +61,10 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "produce dropping transformation as expected" in { - val p = Flow(List(1, 2, 3, 4).iterator).toProducer(gen) + val p = Flow(List(1, 2, 3, 4).iterator).toProducer(materializer) val p2 = Flow(p). transform(0)((tot, elem) ⇒ (tot + elem, if (elem % 2 == 0) Nil else List(tot + elem))). - toProducer(gen) + toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] p2.produceTo(consumer) val subscription = consumer.expectSubscription() @@ -78,14 +78,14 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "produce multi-step transformation as expected" in { - val p = Flow(List("a", "bc", "def").iterator).toProducer(gen) + val p = Flow(List("a", "bc", "def").iterator).toProducer(materializer) val p2 = Flow(p). transform("") { (str, elem) ⇒ val concat = str + elem (concat, List(concat.length)) }. transform(0)((tot, length) ⇒ (tot + length, List(tot + length))). - toProducer(gen) + toProducer(materializer) val c1 = StreamTestKit.consumerProbe[Int] p2.produceTo(c1) val sub1 = c1.expectSubscription() @@ -108,8 +108,8 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "invoke onComplete when done" in { - val p = Flow(List("a").iterator).toProducer(gen) - val p2 = Flow(p).transform("")((s, in) ⇒ (s + in, Nil), x ⇒ List(x + "B")).toProducer(gen) + val p = Flow(List("a").iterator).toProducer(materializer) + val p2 = Flow(p).transform("")((s, in) ⇒ (s + in, Nil), x ⇒ List(x + "B")).toProducer(materializer) val c = StreamTestKit.consumerProbe[String] p2.produceTo(c) val s = c.expectSubscription() @@ -120,9 +120,9 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "invoke cleanup when done" in { val cleanupProbe = TestProbe() - val p = Flow(List("a").iterator).toProducer(gen) + val p = Flow(List("a").iterator).toProducer(materializer) val p2 = Flow(p).transform("")((s, in) ⇒ (s + in, Nil), x ⇒ List(x + "B"), - cleanup = s ⇒ cleanupProbe.ref ! s).toProducer(gen) + cleanup = s ⇒ cleanupProbe.ref ! s).toProducer(materializer) val c = StreamTestKit.consumerProbe[String] p2.produceTo(c) val s = c.expectSubscription() @@ -134,10 +134,10 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "invoke cleanup when done after error" in { val cleanupProbe = TestProbe() - val p = Flow(List("a", "b", "c").iterator).toProducer(gen) + val p = Flow(List("a", "b", "c").iterator).toProducer(materializer) val p2 = Flow(p).transform("")( f = (s, in) ⇒ if (in == "b") throw new IllegalArgumentException("Not b") else (s + in.toUpperCase, List(s + in)), - cleanup = s ⇒ cleanupProbe.ref ! s).toProducer(gen) + cleanup = s ⇒ cleanupProbe.ref ! s).toProducer(materializer) val c = StreamTestKit.consumerProbe[String] p2.produceTo(c) val s = c.expectSubscription() @@ -150,7 +150,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d "allow cancellation using isComplete" in { val p = StreamTestKit.producerProbe[Int] - val p2 = Flow(p).transform("")((s, in) ⇒ (s + in, List(in)), isComplete = _ == "1").toProducer(gen) + val p2 = Flow(p).transform("")((s, in) ⇒ (s + in, List(in)), isComplete = _ == "1").toProducer(materializer) val proc = p.expectSubscription val c = StreamTestKit.consumerProbe[Int] p2.produceTo(c) @@ -167,7 +167,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d val cleanupProbe = TestProbe() val p = StreamTestKit.producerProbe[Int] val p2 = Flow(p).transform("")((s, in) ⇒ (s + in, List(in)), onComplete = x ⇒ List(x.size + 10), - isComplete = _ == "1", cleanup = s ⇒ cleanupProbe.ref ! s).toProducer(gen) + isComplete = _ == "1", cleanup = s ⇒ cleanupProbe.ref ! s).toProducer(materializer) val proc = p.expectSubscription val c = StreamTestKit.consumerProbe[Int] p2.produceTo(c) @@ -183,12 +183,12 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "report error when exception is thrown" in { - val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p = Flow(List(1, 2, 3).iterator).toProducer(materializer) val p2 = Flow(p). transform(0) { (_, elem) ⇒ if (elem == 2) throw new IllegalArgumentException("two not allowed") else (0, List(elem, elem)) }. - toProducer(gen) + toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] p2.produceTo(consumer) val subscription = consumer.expectSubscription() @@ -202,10 +202,10 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "support cancel as expected" in { - val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p = Flow(List(1, 2, 3).iterator).toProducer(materializer) val p2 = Flow(p). transform(0) { (_, elem) ⇒ (0, List(elem, elem)) }. - toProducer(gen) + toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] p2.produceTo(consumer) val subscription = consumer.expectSubscription() @@ -219,9 +219,9 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d } "support producing elements from empty inputs" in { - val p = Flow(List.empty[Int].iterator).toProducer(gen) + val p = Flow(List.empty[Int].iterator).toProducer(materializer) val p2 = Flow(p).transform(List(1, 2, 3))((s, _) ⇒ (s, Nil), onComplete = s ⇒ s). - toProducer(gen) + toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] p2.produceTo(consumer) val subscription = consumer.expectSubscription() diff --git a/akka-stream/src/test/scala/akka/stream/FlowZipSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowZipSpec.scala index c41c4e7865..75e974634b 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowZipSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowZipSpec.scala @@ -7,22 +7,20 @@ import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer } import akka.stream.testkit.StreamTestKit import akka.testkit.AkkaSpec import akka.stream.scaladsl.Flow +import org.reactivestreams.api.Producer -class FlowZipSpec extends AkkaSpec { +class FlowZipSpec extends TwoStreamsSetup { - val gen = new ActorBasedFlowMaterializer(MaterializerSettings( - initialInputBufferSize = 2, - maximumInputBufferSize = 2, - initialFanOutBufferSize = 2, - maxFanOutBufferSize = 2), system) + type Outputs = (Int, Int) + override def operationUnderTest(in1: Flow[Int], in2: Producer[Int]) = in1.zip(in2) "Zip" must { "work in the happy case" in { // Different input sizes (4 and 6) - val source1 = Flow((1 to 4).iterator).toProducer(gen) - val source2 = Flow(List("A", "B", "C", "D", "E", "F").iterator).toProducer(gen) - val p = Flow(source1).zip(source2).toProducer(gen) + val source1 = Flow((1 to 4).iterator).toProducer(materializer) + val source2 = Flow(List("A", "B", "C", "D", "E", "F").iterator).toProducer(materializer) + val p = Flow(source1).zip(source2).toProducer(materializer) val probe = StreamTestKit.consumerProbe[(Int, String)] p.produceTo(probe) @@ -40,6 +38,54 @@ class FlowZipSpec extends AkkaSpec { probe.expectComplete() } + commonTests() + + "work with one immediately completed and one nonempty producer" in { + val consumer1 = setup(completedPublisher, nonemptyPublisher((1 to 4).iterator)) + val subscription1 = consumer1.expectSubscription() + subscription1.requestMore(4) + consumer1.expectComplete() + + val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), completedPublisher) + val subscription2 = consumer2.expectSubscription() + subscription2.requestMore(4) + consumer2.expectComplete() + } + + "work with one delayed completed and one nonempty producer" in { + val consumer1 = setup(soonToCompletePublisher, nonemptyPublisher((1 to 4).iterator)) + val subscription1 = consumer1.expectSubscription() + subscription1.requestMore(4) + consumer1.expectComplete() + + val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), soonToCompletePublisher) + val subscription2 = consumer2.expectSubscription() + subscription2.requestMore(4) + consumer2.expectComplete() + } + + "work with one immediately failed and one nonempty producer" in { + val consumer1 = setup(failedPublisher, nonemptyPublisher((1 to 4).iterator)) + consumer1.expectError(TestException) + + val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), failedPublisher) + val subscription2 = consumer2.expectSubscription() + subscription2.requestMore(4) + consumer2.expectError(TestException) + } + + "work with one delayed failed and one nonempty producer" in { + val consumer1 = setup(soonToFailPublisher, nonemptyPublisher((1 to 4).iterator)) + val subscription1 = consumer1.expectSubscription() + subscription1.requestMore(4) + consumer1.expectError(TestException) + + val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), soonToFailPublisher) + val subscription2 = consumer2.expectSubscription() + subscription2.requestMore(4) + consumer2.expectError(TestException) + } + } } diff --git a/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala b/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala index 59932587fb..d70d14e8c9 100644 --- a/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala +++ b/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala @@ -38,10 +38,10 @@ class IdentityProcessorTest extends IdentityProcessorVerification[Int] with With } def createHelperPublisher(elements: Int): Publisher[Int] = { - val gen = FlowMaterializer(MaterializerSettings( + val materializer = FlowMaterializer(MaterializerSettings( maximumInputBufferSize = 512))(system) val iter = Iterator from 1000 - Flow(if (elements > 0) iter take elements else iter).toProducer(gen).getPublisher + Flow(if (elements > 0) iter take elements else iter).toProducer(materializer).getPublisher } } diff --git a/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala b/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala index 751714def2..8e4414fd17 100644 --- a/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala +++ b/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala @@ -11,7 +11,7 @@ import akka.stream.scaladsl.Flow class IterableProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike { - val gen = FlowMaterializer(MaterializerSettings( + val materializer = FlowMaterializer(MaterializerSettings( maximumInputBufferSize = 512))(system) def createPublisher(elements: Int): Publisher[Int] = { @@ -20,10 +20,10 @@ class IterableProducerTest extends PublisherVerification[Int] with WithActorSyst new immutable.Iterable[Int] { override def iterator = Iterator from 0 } else 0 until elements - Flow(iterable).toProducer(gen).getPublisher + Flow(iterable).toProducer(materializer).getPublisher } override def createCompletedStatePublisher(): Publisher[Int] = - Flow[Int](Nil).toProducer(gen).getPublisher + Flow[Int](Nil).toProducer(materializer).getPublisher } \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala b/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala index cab1922c5e..372486df51 100644 --- a/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala +++ b/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala @@ -10,7 +10,7 @@ import akka.stream.scaladsl.Flow class IteratorProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike { - val gen = FlowMaterializer(MaterializerSettings( + val materializer = FlowMaterializer(MaterializerSettings( maximumInputBufferSize = 512))(system) def createPublisher(elements: Int): Publisher[Int] = { @@ -19,10 +19,10 @@ class IteratorProducerTest extends PublisherVerification[Int] with WithActorSyst Iterator from 0 else (Iterator from 0).take(elements) - Flow(iter).toProducer(gen).getPublisher + Flow(iter).toProducer(materializer).getPublisher } override def createCompletedStatePublisher(): Publisher[Int] = - Flow(List.empty[Int].iterator).toProducer(gen).getPublisher + Flow(List.empty[Int].iterator).toProducer(materializer).getPublisher } \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala b/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala index 44cd3a7530..cee99f1620 100644 --- a/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala +++ b/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala @@ -15,7 +15,7 @@ import scala.util.control.NonFatal class ProcessorHierarchySpec extends AkkaSpec("akka.actor.debug.lifecycle=off\nakka.loglevel=INFO") { - val gen = FlowMaterializer(MaterializerSettings()) + val materializer = FlowMaterializer(MaterializerSettings()) def self = ActorBasedFlowMaterializer.ctx.get().asInstanceOf[ActorContext].self @@ -24,11 +24,11 @@ class ProcessorHierarchySpec extends AkkaSpec("akka.actor.debug.lifecycle=off\na "generate the right level of descendants" in { val f = Flow(() ⇒ { testActor ! self - Flow(List(1)).map(x ⇒ { testActor ! self; x }).toProducer(gen) + Flow(List(1)).map(x ⇒ { testActor ! self; x }).toProducer(materializer) }).take(3).foreach(x ⇒ { testActor ! self - Flow(x).foreach(_ ⇒ testActor ! self).consume(gen) - }).toFuture(gen) + Flow(x).foreach(_ ⇒ testActor ! self).consume(materializer) + }).toFuture(materializer) Await.result(f, 3.seconds) val refs = receiveWhile(idle = 250.millis) { case r: ActorRef ⇒ r diff --git a/akka-stream/src/test/scala/akka/stream/TwoStreamsSetup.scala b/akka-stream/src/test/scala/akka/stream/TwoStreamsSetup.scala new file mode 100644 index 0000000000..9ee9e2563d --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/TwoStreamsSetup.scala @@ -0,0 +1,123 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream + +import scala.util.control.NoStackTrace +import org.reactivestreams.api.{ Consumer, Producer } +import org.reactivestreams.spi.{ Subscriber, Publisher, Subscription } +import akka.testkit.AkkaSpec +import akka.stream.impl.ActorBasedFlowMaterializer +import akka.stream.testkit.StreamTestKit +import akka.stream.scaladsl.Flow + +abstract class TwoStreamsSetup extends AkkaSpec { + + val materializer = new ActorBasedFlowMaterializer(MaterializerSettings( + initialInputBufferSize = 2, + maximumInputBufferSize = 2, + initialFanOutBufferSize = 2, + maxFanOutBufferSize = 2), system) + + case class TE(message: String) extends RuntimeException(message) with NoStackTrace + + val TestException = TE("test") + + type Outputs + + def operationUnderTest(in1: Flow[Int], in2: Producer[Int]): Flow[Outputs] + + def setup(p1: Publisher[Int], p2: Publisher[Int]) = { + val consumer = StreamTestKit.consumerProbe[Outputs] + operationUnderTest(Flow(producerFromPublisher(p1)), producerFromPublisher(p2)).toProducer(materializer).produceTo(consumer) + consumer + } + + def producerFromPublisher[T](publisher: Publisher[T]): Producer[T] = new Producer[T] { + private val pub = publisher + override def produceTo(consumer: Consumer[T]): Unit = pub.subscribe(consumer.getSubscriber) + override def getPublisher: Publisher[T] = pub + } + + def failedPublisher[T]: Publisher[T] = new Publisher[T] { + override def subscribe(subscriber: Subscriber[T]): Unit = { + subscriber.onError(TestException) + } + } + + def completedPublisher[T]: Publisher[T] = new Publisher[T] { + override def subscribe(subscriber: Subscriber[T]): Unit = { + subscriber.onComplete() + } + } + + def nonemptyPublisher[T](elems: Iterator[T]): Publisher[T] = Flow(elems).toProducer(materializer).getPublisher + + def soonToFailPublisher[T]: Publisher[T] = new Publisher[T] { + override def subscribe(subscriber: Subscriber[T]): Unit = subscriber.onSubscribe(FailedSubscription(subscriber)) + } + + def soonToCompletePublisher[T]: Publisher[T] = new Publisher[T] { + override def subscribe(subscriber: Subscriber[T]): Unit = subscriber.onSubscribe(CompletedSubscription(subscriber)) + } + + case class FailedSubscription(subscriber: Subscriber[_]) extends Subscription { + override def requestMore(elements: Int): Unit = subscriber.onError(TestException) + override def cancel(): Unit = () + } + + case class CompletedSubscription(subscriber: Subscriber[_]) extends Subscription { + override def requestMore(elements: Int): Unit = subscriber.onComplete() + override def cancel(): Unit = () + } + + def commonTests() = { + "work with two immediately completed producers" in { + val consumer = setup(completedPublisher, completedPublisher) + val subscription = consumer.expectSubscription() + subscription.requestMore(1) + consumer.expectComplete() + } + + "work with two delayed completed producers" in { + val consumer = setup(soonToCompletePublisher, soonToCompletePublisher) + val subscription = consumer.expectSubscription() + subscription.requestMore(1) + consumer.expectComplete() + } + + "work with one immediately completed and one delayed completed producer" in { + val consumer = setup(completedPublisher, soonToCompletePublisher) + val subscription = consumer.expectSubscription() + subscription.requestMore(1) + consumer.expectComplete() + } + + "work with two immediately failed producers" in { + val consumer = setup(failedPublisher, failedPublisher) + consumer.expectError(TestException) + } + + "work with two delayed failed producers" in { + val consumer = setup(soonToFailPublisher, soonToFailPublisher) + val subscription = consumer.expectSubscription() + subscription.requestMore(1) + consumer.expectError(TestException) + } + + // Warning: The two test cases below are somewhat implementation specific and might fail if the implementation + // is changed. They are here to be an early warning though. + "work with one immediately failed and one delayed failed producer (case 1)" in { + val consumer = setup(soonToFailPublisher, failedPublisher) + val subscription = consumer.expectSubscription() + subscription.requestMore(1) + consumer.expectError(TestException) + } + + "work with one immediately failed and one delayed failed producer (case 2)" in { + val consumer = setup(failedPublisher, soonToFailPublisher) + consumer.expectError(TestException) + } + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala b/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala index 8ff3d85426..61aa2f2fc1 100644 --- a/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala +++ b/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala @@ -78,12 +78,12 @@ trait ScriptedTest extends ShouldMatchers { class ScriptRunner[In, Out]( op: Flow[In] ⇒ Flow[Out], - gen: MaterializerSettings, + settings: MaterializerSettings, script: Script[In, Out], maximumOverrun: Int, maximumRequest: Int, maximumBuffer: Int)(implicit _system: ActorSystem) - extends ChainSetup(op, gen) { + extends ChainSetup(op, settings) { var _debugLog = Vector.empty[String] var currentScript = script @@ -186,9 +186,9 @@ trait ScriptedTest extends ShouldMatchers { } - def runScript[In, Out](script: Script[In, Out], gen: MaterializerSettings, maximumOverrun: Int = 3, maximumRequest: Int = 3, maximumBuffer: Int = 3)( + def runScript[In, Out](script: Script[In, Out], settings: MaterializerSettings, maximumOverrun: Int = 3, maximumRequest: Int = 3, maximumBuffer: Int = 3)( op: Flow[In] ⇒ Flow[Out])(implicit system: ActorSystem): Unit = { - new ScriptRunner(op, gen, script, maximumOverrun, maximumRequest, maximumBuffer).run() + new ScriptRunner(op, settings, script, maximumOverrun, maximumRequest, maximumBuffer).run() } }