=str #15743 tests for prepend and append

* these have no AST nodes, they work with AST List directly
This commit is contained in:
Martynas Mickevicius 2014-09-10 16:56:41 +03:00
parent 597ad076e4
commit 03b0364786
2 changed files with 111 additions and 0 deletions

View file

@ -0,0 +1,65 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl2
import akka.actor.ActorSystem
import akka.stream.MaterializerSettings
import akka.stream.testkit.{ AkkaSpec, StreamTestKit }
import org.reactivestreams.Subscriber
import org.scalatest.Matchers
class FlowAppendSpec extends AkkaSpec with River {
val settings = MaterializerSettings(system)
implicit val materializer = FlowMaterializer(settings)
"ProcessorFlow" should {
"append ProcessorFlow" in riverOf[String] { subscriber
FlowFrom[Int]
.append(otherFlow)
.withSource(IterableSource(elements))
.publishTo(subscriber)
}
"append FlowWithSink" in riverOf[String] { subscriber
FlowFrom[Int]
.append(otherFlow.withSink(SubscriberSink(subscriber)))
.withSource(IterableSource(elements))
.run()
}
}
"FlowWithSource" should {
"append ProcessorFlow" in riverOf[String] { subscriber
FlowFrom(elements)
.append(otherFlow)
.publishTo(subscriber)
}
"append FlowWithSink" in riverOf[String] { subscriber
FlowFrom(elements)
.append(otherFlow.withSink(SubscriberSink(subscriber)))
.run()
}
}
}
trait River { self: Matchers
val elements = (1 to 10)
val otherFlow = FlowFrom[Int].map(_.toString)
def riverOf[T](flowConstructor: Subscriber[T] Unit)(implicit system: ActorSystem) = {
val subscriber = StreamTestKit.SubscriberProbe[T]()
flowConstructor(subscriber)
val subscription = subscriber.expectSubscription()
subscription.request(elements.size)
subscriber.probe.receiveN(elements.size) should be(elements.map(_.toString).map(StreamTestKit.OnNext(_)))
subscription.request(1)
subscriber.expectComplete()
}
}

View file

@ -0,0 +1,46 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl2
import akka.stream.MaterializerSettings
import akka.stream.testkit.AkkaSpec
class FlowPrependSpec extends AkkaSpec with River {
val settings = MaterializerSettings(system)
implicit val materializer = FlowMaterializer(settings)
"ProcessorFlow" should {
"prepend ProcessorFlow" in riverOf[String] { subscriber
FlowFrom[String]
.prepend(otherFlow)
.withSource(IterableSource(elements))
.publishTo(subscriber)
}
"prepend FlowWithSource" in riverOf[String] { subscriber
FlowFrom[String]
.prepend(otherFlow.withSource(IterableSource(elements)))
.publishTo(subscriber)
}
}
"FlowWithSink" should {
"prepend ProcessorFlow" in riverOf[String] { subscriber
FlowFrom[String]
.withSink(SubscriberSink(subscriber))
.prepend(otherFlow)
.withSource(IterableSource(elements))
.run()
}
"prepend FlowWithSource" in riverOf[String] { subscriber
FlowFrom[String]
.withSink(SubscriberSink(subscriber))
.prepend(otherFlow.withSource(IterableSource(elements)))
.run()
}
}
}