diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowAppendSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowAppendSpec.scala new file mode 100644 index 0000000000..4a467d02bf --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowAppendSpec.scala @@ -0,0 +1,65 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import akka.actor.ActorSystem +import akka.stream.MaterializerSettings +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import org.reactivestreams.Subscriber +import org.scalatest.Matchers + +class FlowAppendSpec extends AkkaSpec with River { + + val settings = MaterializerSettings(system) + implicit val materializer = FlowMaterializer(settings) + + "ProcessorFlow" should { + "append ProcessorFlow" in riverOf[String] { subscriber ⇒ + FlowFrom[Int] + .append(otherFlow) + .withSource(IterableSource(elements)) + .publishTo(subscriber) + } + + "append FlowWithSink" in riverOf[String] { subscriber ⇒ + FlowFrom[Int] + .append(otherFlow.withSink(SubscriberSink(subscriber))) + .withSource(IterableSource(elements)) + .run() + } + } + + "FlowWithSource" should { + "append ProcessorFlow" in riverOf[String] { subscriber ⇒ + FlowFrom(elements) + .append(otherFlow) + .publishTo(subscriber) + } + + "append FlowWithSink" in riverOf[String] { subscriber ⇒ + FlowFrom(elements) + .append(otherFlow.withSink(SubscriberSink(subscriber))) + .run() + } + } + +} + +trait River { self: Matchers ⇒ + + val elements = (1 to 10) + val otherFlow = FlowFrom[Int].map(_.toString) + + def riverOf[T](flowConstructor: Subscriber[T] ⇒ Unit)(implicit system: ActorSystem) = { + val subscriber = StreamTestKit.SubscriberProbe[T]() + + flowConstructor(subscriber) + + val subscription = subscriber.expectSubscription() + subscription.request(elements.size) + subscriber.probe.receiveN(elements.size) should be(elements.map(_.toString).map(StreamTestKit.OnNext(_))) + subscription.request(1) + subscriber.expectComplete() + } +} diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowPrependSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowPrependSpec.scala new file mode 100644 index 0000000000..7cb25dbf2a --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowPrependSpec.scala @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import akka.stream.MaterializerSettings +import akka.stream.testkit.AkkaSpec + +class FlowPrependSpec extends AkkaSpec with River { + + val settings = MaterializerSettings(system) + implicit val materializer = FlowMaterializer(settings) + + "ProcessorFlow" should { + "prepend ProcessorFlow" in riverOf[String] { subscriber ⇒ + FlowFrom[String] + .prepend(otherFlow) + .withSource(IterableSource(elements)) + .publishTo(subscriber) + } + + "prepend FlowWithSource" in riverOf[String] { subscriber ⇒ + FlowFrom[String] + .prepend(otherFlow.withSource(IterableSource(elements))) + .publishTo(subscriber) + } + } + + "FlowWithSink" should { + "prepend ProcessorFlow" in riverOf[String] { subscriber ⇒ + FlowFrom[String] + .withSink(SubscriberSink(subscriber)) + .prepend(otherFlow) + .withSource(IterableSource(elements)) + .run() + } + + "prepend FlowWithSource" in riverOf[String] { subscriber ⇒ + FlowFrom[String] + .withSink(SubscriberSink(subscriber)) + .prepend(otherFlow.withSource(IterableSource(elements))) + .run() + } + } + +}