!str Increased test coverage

This commit is contained in:
Endre Sándor Varga 2014-04-01 16:00:04 +02:00 committed by Roland Kuhn
parent a318676f4a
commit 14ef65262f
29 changed files with 545 additions and 162 deletions

View file

@ -43,9 +43,9 @@ private[akka] class ActorConsumer[T]( final val impl: ActorRef) extends ActorCon
private[akka] object ActorConsumer {
import Ast._
def props(gen: MaterializerSettings, op: AstNode) = op match {
case t: Transform Props(new TransformActorConsumer(gen, t))
case r: Recover Props(new RecoverActorConsumer(gen, r))
def props(settings: MaterializerSettings, op: AstNode) = op match {
case t: Transform Props(new TransformActorConsumer(settings, t))
case r: Recover Props(new RecoverActorConsumer(settings, r))
}
}

View file

@ -58,10 +58,15 @@ private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettin
def waitingExposedPublisher: Receive = {
case ExposedPublisher(publisher)
exposedPublisher = publisher
publisherExposed()
context.become(waitingForUpstream)
case _ throw new IllegalStateException("The first message must be ExposedPublisher")
}
// WARNING: DO NOT SEND messages from the constructor (that includes subscribing to other streams) since their reply
// might arrive earlier than ExposedPublisher. Override this method to schedule such events.
protected def publisherExposed(): Unit = ()
def waitingForUpstream: Receive = downstreamManagement orElse {
case OnComplete
// Instead of introducing an edge case, handle it in the general way

View file

@ -131,7 +131,8 @@ private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSett
var secondaryInputs: Inputs = _
other.getPublisher.subscribe(new OtherActorSubscriber(self))
override def publisherExposed(): Unit =
other.getPublisher.subscribe(new OtherActorSubscriber(self))
override def waitingForUpstream: Receive = super.waitingForUpstream orElse {
case OtherStreamOnComplete

View file

@ -7,22 +7,20 @@ import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer }
import akka.stream.testkit.StreamTestKit
import akka.testkit.AkkaSpec
import akka.stream.scaladsl.Flow
import org.reactivestreams.api.Producer
class FlowConcatSpec extends AkkaSpec {
class FlowConcatSpec extends TwoStreamsSetup {
val gen = new ActorBasedFlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,
maxFanOutBufferSize = 2), system)
type Outputs = Int
override def operationUnderTest(in1: Flow[Int], in2: Producer[Int]) = in1.concat(in2)
"Concat" must {
"work in the happy case" in {
val source0 = Flow(List.empty[Int].iterator).toProducer(gen)
val source1 = Flow((1 to 4).iterator).toProducer(gen)
val source2 = Flow((5 to 10).iterator).toProducer(gen)
val p = Flow(source0).concat(source1).concat(source2).toProducer(gen)
val source0 = Flow(List.empty[Int].iterator).toProducer(materializer)
val source1 = Flow((1 to 4).iterator).toProducer(materializer)
val source2 = Flow((5 to 10).iterator).toProducer(materializer)
val p = Flow(source0).concat(source1).concat(source2).toProducer(materializer)
val probe = StreamTestKit.consumerProbe[Int]
p.produceTo(probe)
@ -36,5 +34,69 @@ class FlowConcatSpec extends AkkaSpec {
probe.expectComplete()
}
commonTests()
"work with one immediately completed and one nonempty producer" in {
val consumer1 = setup(completedPublisher, nonemptyPublisher((1 to 4).iterator))
val subscription1 = consumer1.expectSubscription()
subscription1.requestMore(5)
consumer1.expectNext(1)
consumer1.expectNext(2)
consumer1.expectNext(3)
consumer1.expectNext(4)
consumer1.expectComplete()
val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), completedPublisher)
val subscription2 = consumer2.expectSubscription()
subscription2.requestMore(5)
consumer2.expectNext(1)
consumer2.expectNext(2)
consumer2.expectNext(3)
consumer2.expectNext(4)
consumer2.expectComplete()
}
"work with one delayed completed and one nonempty producer" in {
val consumer1 = setup(soonToCompletePublisher, nonemptyPublisher((1 to 4).iterator))
val subscription1 = consumer1.expectSubscription()
subscription1.requestMore(5)
consumer1.expectNext(1)
consumer1.expectNext(2)
consumer1.expectNext(3)
consumer1.expectNext(4)
consumer1.expectComplete()
val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), soonToCompletePublisher)
val subscription2 = consumer2.expectSubscription()
subscription2.requestMore(5)
consumer2.expectNext(1)
consumer2.expectNext(2)
consumer2.expectNext(3)
consumer2.expectNext(4)
consumer2.expectComplete()
}
"work with one immediately failed and one nonempty producer" in {
val consumer1 = setup(failedPublisher, nonemptyPublisher((1 to 4).iterator))
consumer1.expectError(TestException)
val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), failedPublisher)
val subscription2 = consumer2.expectSubscription()
subscription2.requestMore(5)
consumer2.expectError(TestException)
}
"work with one delayed failed and one nonempty producer" in {
val consumer1 = setup(soonToFailPublisher, nonemptyPublisher((1 to 4).iterator))
val subscription1 = consumer1.expectSubscription()
subscription1.requestMore(5)
consumer1.expectError(TestException)
val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), soonToFailPublisher)
val subscription2 = consumer2.expectSubscription()
subscription2.requestMore(5)
consumer2.expectError(TestException)
}
}
}

View file

@ -9,7 +9,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random }
class StreamDropSpec extends AkkaSpec with ScriptedTest {
val genSettings = MaterializerSettings(
val settings = MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,
@ -21,7 +21,7 @@ class StreamDropSpec extends AkkaSpec with ScriptedTest {
def script(d: Int) = Script((1 to 50) map { n Seq(n) -> (if (n <= d) Nil else Seq(n)) }: _*)
(1 to 50) foreach { _
val d = Math.min(Math.max(random.nextInt(-10, 60), 0), 50)
runScript(script(d), genSettings)(_.drop(d))
runScript(script(d), settings)(_.drop(d))
}
}

View file

@ -9,7 +9,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random }
class StreamFilterSpec extends AkkaSpec with ScriptedTest {
val genSettings = MaterializerSettings(
val settings = MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,
@ -19,7 +19,7 @@ class StreamFilterSpec extends AkkaSpec with ScriptedTest {
"filter" in {
def script = Script((1 to 50) map { _ val x = random.nextInt(); Seq(x) -> (if ((x & 1) == 0) Seq(x) else Seq()) }: _*)
(1 to 50) foreach (_ runScript(script, genSettings)(_.filter(_ % 2 == 0)))
(1 to 50) foreach (_ runScript(script, settings)(_.filter(_ % 2 == 0)))
}
}

View file

@ -9,7 +9,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random }
class StreamFoldSpec extends AkkaSpec with ScriptedTest {
val genSettings = MaterializerSettings(
val settings = MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,
@ -19,7 +19,7 @@ class StreamFoldSpec extends AkkaSpec with ScriptedTest {
"fold" in {
def script = Script((1 to 50).toSeq -> Seq(25 * 51))
(1 to 50) foreach (_ runScript(script, genSettings)(_.fold(0)(_ + _)))
(1 to 50) foreach (_ runScript(script, settings)(_.fold(0)(_ + _)))
}
}

View file

@ -9,7 +9,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random }
class FlowForeachSpec extends AkkaSpec with ScriptedTest {
val genSettings = MaterializerSettings(
val settings = MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,
@ -24,7 +24,7 @@ class FlowForeachSpec extends AkkaSpec with ScriptedTest {
Script((1 to 50).toSeq -> Seq(()))
}
(1 to 50) foreach { _
runScript(script, genSettings)(_.foreach(x count += x))
runScript(script, settings)(_.foreach(x count += x))
count should be(25 * 51)
}
}

View file

@ -9,11 +9,12 @@ import akka.testkit.AkkaSpec
import org.reactivestreams.api.Producer
import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer }
import akka.stream.scaladsl.Flow
import scala.util.control.NoStackTrace
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class StreamGroupBySpec extends AkkaSpec {
class FlowGroupBySpec extends AkkaSpec {
val gen = new ActorBasedFlowMaterializer(MaterializerSettings(
val materializer = new ActorBasedFlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,
@ -28,12 +29,13 @@ class StreamGroupBySpec extends AkkaSpec {
def expectNext(elem: Int): Unit = probe.expectNext(elem)
def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max)
def expectComplete(): Unit = probe.expectComplete()
def expectError(e: Throwable) = probe.expectError(e)
def cancel(): Unit = subscription.cancel()
}
class SubstreamsSupport(groupCount: Int = 2, elementCount: Int = 6) {
val source = Flow((1 to elementCount).iterator).toProducer(gen)
val groupStream = Flow(source).groupBy(_ % groupCount).toProducer(gen)
val source = Flow((1 to elementCount).iterator).toProducer(materializer)
val groupStream = Flow(source).groupBy(_ % groupCount).toProducer(materializer)
val masterConsumer = StreamTestKit.consumerProbe[(Int, Producer[Int])]
groupStream.produceTo(masterConsumer)
@ -52,6 +54,8 @@ class StreamGroupBySpec extends AkkaSpec {
}
case class TE(message: String) extends RuntimeException(message) with NoStackTrace
"groupBy" must {
"work in the happy case" in new SubstreamsSupport(groupCount = 2) {
val s1 = StreamPuppet(getSubproducer(1))
@ -105,13 +109,21 @@ class StreamGroupBySpec extends AkkaSpec {
}
"accept cancellation of master stream when not consumed anything" in new SubstreamsSupport(groupCount = 2) {
masterSubscription.cancel()
masterConsumer.expectNoMsg(100.millis)
"accept cancellation of master stream when not consumed anything" in {
val producerProbe = StreamTestKit.producerProbe[Int]
val producer = Flow(producerProbe).groupBy(_ % 2).toProducer(materializer)
val consumer = StreamTestKit.consumerProbe[(Int, Producer[Int])]
producer.produceTo(consumer)
val upstreamSubscription = producerProbe.expectSubscription()
val downstreamSubscription = consumer.expectSubscription()
downstreamSubscription.cancel()
upstreamSubscription.expectCancellation()
}
"accept cancellation of master stream when substreams are open" in new SubstreamsSupport(groupCount = 3, elementCount = 13) {
pending
// FIXME: Needs handling of loose substreams that no one refers to anymore.
// val substream = StreamPuppet(getSubproducer(1))
//
// substream.requestMore(1)
@ -148,15 +160,100 @@ class StreamGroupBySpec extends AkkaSpec {
}
"work with fanout on master stream" in {
pending
val source = Flow((1 to 4).iterator).toProducer(materializer)
val groupStream = Flow(source).groupBy(_ % 2).toProducer(materializer)
val masterConsumer1 = StreamTestKit.consumerProbe[(Int, Producer[Int])]
val masterConsumer2 = StreamTestKit.consumerProbe[(Int, Producer[Int])]
groupStream.produceTo(masterConsumer1)
groupStream.produceTo(masterConsumer2)
val masterSubscription1 = masterConsumer1.expectSubscription()
val masterSubscription2 = masterConsumer2.expectSubscription()
masterSubscription1.requestMore(2)
masterSubscription2.requestMore(1)
val (key11, substream11) = masterConsumer1.expectNext()
key11 should be(1)
val (key21, substream21) = masterConsumer2.expectNext()
key21 should be(1)
val puppet11 = StreamPuppet(substream11)
val puppet21 = StreamPuppet(substream21)
puppet11.requestMore(2)
puppet11.expectNext(1)
puppet11.expectNext(3)
puppet21.requestMore(1)
puppet21.expectNext(1)
puppet21.cancel()
masterSubscription2.cancel()
val (key12, substream12) = masterConsumer1.expectNext()
key12 should be(0)
val puppet12 = StreamPuppet(substream12)
puppet12.requestMore(1)
puppet12.expectNext(2)
puppet12.cancel()
masterSubscription1.cancel()
}
"work with fanout on substreams and master stream" in {
pending
"work with empty input stream" in {
val producer = Flow(List.empty[Int]).groupBy(_ % 2).toProducer(materializer)
val consumer = StreamTestKit.consumerProbe[(Int, Producer[Int])]
producer.produceTo(consumer)
val subscription = consumer.expectSubscription()
subscription.requestMore(100)
consumer.expectComplete()
}
"abort on onError from upstream" in {
pending
val producerProbe = StreamTestKit.producerProbe[Int]
val producer = Flow(producerProbe).groupBy(_ % 2).toProducer(materializer)
val consumer = StreamTestKit.consumerProbe[(Int, Producer[Int])]
producer.produceTo(consumer)
val upstreamSubscription = producerProbe.expectSubscription()
val downstreamSubscription = consumer.expectSubscription()
downstreamSubscription.requestMore(100)
val e = TE("test")
upstreamSubscription.sendError(e)
consumer.expectError(e)
}
"abort on onError from upstream when substreams are running" in {
val producerProbe = StreamTestKit.producerProbe[Int]
val producer = Flow(producerProbe).groupBy(_ % 2).toProducer(materializer)
val consumer = StreamTestKit.consumerProbe[(Int, Producer[Int])]
producer.produceTo(consumer)
val upstreamSubscription = producerProbe.expectSubscription()
val downstreamSubscription = consumer.expectSubscription()
downstreamSubscription.requestMore(100)
upstreamSubscription.sendNext(1)
val (_, substream) = consumer.expectNext()
val substreamPuppet = StreamPuppet(substream)
substreamPuppet.requestMore(1)
substreamPuppet.expectNext(1)
val e = TE("test")
upstreamSubscription.sendError(e)
substreamPuppet.expectError(e)
consumer.expectError(e)
}
}

View file

@ -10,7 +10,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random }
class StreamGroupedSpec extends AkkaSpec with ScriptedTest {
val genSettings = MaterializerSettings(
val settings = MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,
@ -20,13 +20,13 @@ class StreamGroupedSpec extends AkkaSpec with ScriptedTest {
"group evenly" in {
def script = Script((1 to 20) map { _ val x, y, z = random.nextInt(); Seq(x, y, z) -> Seq(immutable.Seq(x, y, z)) }: _*)
(1 to 30) foreach (_ runScript(script, genSettings)(_.grouped(3)))
(1 to 30) foreach (_ runScript(script, settings)(_.grouped(3)))
}
"group with rest" in {
def script = Script(((1 to 20).map { _ val x, y, z = random.nextInt(); Seq(x, y, z) -> Seq(immutable.Seq(x, y, z)) }
:+ { val x = random.nextInt(); Seq(x) -> Seq(immutable.Seq(x)) }): _*)
(1 to 30) foreach (_ runScript(script, genSettings)(_.grouped(3)))
(1 to 30) foreach (_ runScript(script, settings)(_.grouped(3)))
}
}

View file

@ -16,12 +16,12 @@ import akka.stream.scaladsl.Flow
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FlowIterableSpec extends AkkaSpec {
val gen = FlowMaterializer(MaterializerSettings(
val materializer = FlowMaterializer(MaterializerSettings(
maximumInputBufferSize = 512))
"A Flow based on an iterable" must {
"produce elements" in {
val p = Flow(List(1, 2, 3)).toProducer(gen)
val p = Flow(List(1, 2, 3)).toProducer(materializer)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
val sub = c.expectSubscription()
@ -35,7 +35,7 @@ class FlowIterableSpec extends AkkaSpec {
}
"complete empty" in {
val p = Flow(List.empty[Int]).toProducer(gen)
val p = Flow(List.empty[Int]).toProducer(materializer)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
c.expectComplete()
@ -47,7 +47,7 @@ class FlowIterableSpec extends AkkaSpec {
}
"produce elements with multiple subscribers" in {
val p = Flow(List(1, 2, 3)).toProducer(gen)
val p = Flow(List(1, 2, 3)).toProducer(materializer)
val c1 = StreamTestKit.consumerProbe[Int]
val c2 = StreamTestKit.consumerProbe[Int]
p.produceTo(c1)
@ -71,7 +71,7 @@ class FlowIterableSpec extends AkkaSpec {
}
"produce elements to later subscriber" in {
val p = Flow(List(1, 2, 3)).toProducer(gen)
val p = Flow(List(1, 2, 3)).toProducer(materializer)
val c1 = StreamTestKit.consumerProbe[Int]
val c2 = StreamTestKit.consumerProbe[Int]
p.produceTo(c1)
@ -97,7 +97,7 @@ class FlowIterableSpec extends AkkaSpec {
}
"produce elements with one transformation step" in {
val p = Flow(List(1, 2, 3)).map(_ * 2).toProducer(gen)
val p = Flow(List(1, 2, 3)).map(_ * 2).toProducer(materializer)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
val sub = c.expectSubscription()
@ -109,7 +109,7 @@ class FlowIterableSpec extends AkkaSpec {
}
"produce elements with two transformation steps" in {
val p = Flow(List(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 2).toProducer(gen)
val p = Flow(List(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 2).toProducer(materializer)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
val sub = c.expectSubscription()
@ -121,7 +121,7 @@ class FlowIterableSpec extends AkkaSpec {
"allow cancel before receiving all elements" in {
val count = 100000
val p = Flow(1 to count).toProducer(gen)
val p = Flow(1 to count).toProducer(materializer)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
val sub = c.expectSubscription()

View file

@ -14,7 +14,7 @@ import akka.stream.scaladsl.Flow
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FlowIteratorSpec extends AkkaSpec {
val gen = FlowMaterializer(MaterializerSettings(
val materializer = FlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 4,
@ -22,7 +22,7 @@ class FlowIteratorSpec extends AkkaSpec {
"A Flow based on an iterator" must {
"produce elements" in {
val p = Flow(List(1, 2, 3).iterator).toProducer(gen)
val p = Flow(List(1, 2, 3).iterator).toProducer(materializer)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
val sub = c.expectSubscription()
@ -36,7 +36,7 @@ class FlowIteratorSpec extends AkkaSpec {
}
"complete empty" in {
val p = Flow(List.empty[Int].iterator).toProducer(gen)
val p = Flow(List.empty[Int].iterator).toProducer(materializer)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
c.expectComplete()
@ -48,7 +48,7 @@ class FlowIteratorSpec extends AkkaSpec {
}
"produce elements with multiple subscribers" in {
val p = Flow(List(1, 2, 3).iterator).toProducer(gen)
val p = Flow(List(1, 2, 3).iterator).toProducer(materializer)
val c1 = StreamTestKit.consumerProbe[Int]
val c2 = StreamTestKit.consumerProbe[Int]
p.produceTo(c1)
@ -72,7 +72,7 @@ class FlowIteratorSpec extends AkkaSpec {
}
"produce elements to later subscriber" in {
val p = Flow(List(1, 2, 3).iterator).toProducer(gen)
val p = Flow(List(1, 2, 3).iterator).toProducer(materializer)
val c1 = StreamTestKit.consumerProbe[Int]
val c2 = StreamTestKit.consumerProbe[Int]
p.produceTo(c1)
@ -95,7 +95,7 @@ class FlowIteratorSpec extends AkkaSpec {
}
"produce elements with one transformation step" in {
val p = Flow(List(1, 2, 3).iterator).map(_ * 2).toProducer(gen)
val p = Flow(List(1, 2, 3).iterator).map(_ * 2).toProducer(materializer)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
val sub = c.expectSubscription()
@ -107,7 +107,7 @@ class FlowIteratorSpec extends AkkaSpec {
}
"produce elements with two transformation steps" in {
val p = Flow(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).toProducer(gen)
val p = Flow(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).toProducer(materializer)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
val sub = c.expectSubscription()
@ -119,7 +119,7 @@ class FlowIteratorSpec extends AkkaSpec {
"allow cancel before receiving all elements" in {
val count = 100000
val p = Flow((1 to count).iterator).toProducer(gen)
val p = Flow((1 to count).iterator).toProducer(materializer)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
val sub = c.expectSubscription()

View file

@ -8,7 +8,7 @@ import akka.stream.testkit.ScriptedTest
class FlowMapConcatSpec extends AkkaSpec with ScriptedTest {
val genSettings = MaterializerSettings(
val settings = MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,
@ -24,7 +24,7 @@ class FlowMapConcatSpec extends AkkaSpec with ScriptedTest {
Seq(3) -> Seq(3, 3, 3),
Seq(2) -> Seq(2, 2),
Seq(1) -> Seq(1))
(1 to 100) foreach (_ runScript(script, genSettings)(_.mapConcat(x (1 to x) map (_ x))))
(1 to 100) foreach (_ runScript(script, settings)(_.mapConcat(x (1 to x) map (_ x))))
}
}

View file

@ -9,7 +9,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random }
class StreamMapSpec extends AkkaSpec with ScriptedTest {
val genSettings = MaterializerSettings(
val settings = MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,
@ -19,7 +19,7 @@ class StreamMapSpec extends AkkaSpec with ScriptedTest {
"map" in {
def script = Script((1 to 50) map { _ val x = random.nextInt(); Seq(x) -> Seq(x.toString) }: _*)
(1 to 50) foreach (_ runScript(script, genSettings)(_.map(_.toString)))
(1 to 50) foreach (_ runScript(script, settings)(_.map(_.toString)))
}
}

View file

@ -10,22 +10,19 @@ import org.reactivestreams.api.Producer
import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer }
import akka.stream.scaladsl.Flow
class FlowMergeSpec extends AkkaSpec {
class FlowMergeSpec extends TwoStreamsSetup {
val gen = new ActorBasedFlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,
maxFanOutBufferSize = 2), system)
type Outputs = Int
override def operationUnderTest(in1: Flow[Int], in2: Producer[Int]) = in1.merge(in2)
"merge" must {
"work in the happy case" in {
// Different input sizes (4 and 6)
val source1 = Flow((1 to 4).iterator).toProducer(gen)
val source2 = Flow((5 to 10).iterator).toProducer(gen)
val source3 = Flow(List.empty[Int].iterator).toProducer(gen)
val p = Flow(source1).merge(source2).merge(source3).toProducer(gen)
val source1 = Flow((1 to 4).iterator).toProducer(materializer)
val source2 = Flow((5 to 10).iterator).toProducer(materializer)
val source3 = Flow(List.empty[Int].iterator).toProducer(materializer)
val p = Flow(source1).merge(source2).merge(source3).toProducer(materializer)
val probe = StreamTestKit.consumerProbe[Int]
p.produceTo(probe)
@ -41,6 +38,58 @@ class FlowMergeSpec extends AkkaSpec {
probe.expectComplete()
}
commonTests()
"work with one immediately completed and one nonempty producer" in {
val consumer1 = setup(completedPublisher, nonemptyPublisher((1 to 4).iterator))
val subscription1 = consumer1.expectSubscription()
subscription1.requestMore(4)
consumer1.expectNext(1)
consumer1.expectNext(2)
consumer1.expectNext(3)
consumer1.expectNext(4)
consumer1.expectComplete()
val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), completedPublisher)
val subscription2 = consumer2.expectSubscription()
subscription2.requestMore(4)
consumer2.expectNext(1)
consumer2.expectNext(2)
consumer2.expectNext(3)
consumer2.expectNext(4)
consumer2.expectComplete()
}
"work with one delayed completed and one nonempty producer" in {
val consumer1 = setup(soonToCompletePublisher, nonemptyPublisher((1 to 4).iterator))
val subscription1 = consumer1.expectSubscription()
subscription1.requestMore(4)
consumer1.expectNext(1)
consumer1.expectNext(2)
consumer1.expectNext(3)
consumer1.expectNext(4)
consumer1.expectComplete()
val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), soonToCompletePublisher)
val subscription2 = consumer2.expectSubscription()
subscription2.requestMore(4)
consumer2.expectNext(1)
consumer2.expectNext(2)
consumer2.expectNext(3)
consumer2.expectNext(4)
consumer2.expectComplete()
}
"work with one immediately failed and one nonempty producer" in {
// This is nondeterministic, multiple scenarios can happen
pending
}
"work with one delayed failed and one nonempty producer" in {
// This is nondeterministic, multiple scenarios can happen
pending
}
}
}

View file

@ -18,7 +18,7 @@ import scala.util.Success
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest {
val gen = FlowMaterializer(MaterializerSettings(
val materializer = FlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,
@ -29,7 +29,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest {
"invoke callback on normal completion" in {
val onCompleteProbe = TestProbe()
val p = StreamTestKit.producerProbe[Int]
Flow(p).onComplete(gen) { onCompleteProbe.ref ! _ }
Flow(p).onComplete(materializer) { onCompleteProbe.ref ! _ }
val proc = p.expectSubscription
proc.expectRequestMore()
proc.sendNext(42)
@ -41,7 +41,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest {
"yield the first error" in {
val onCompleteProbe = TestProbe()
val p = StreamTestKit.producerProbe[Int]
Flow(p).onComplete(gen) { onCompleteProbe.ref ! _ }
Flow(p).onComplete(materializer) { onCompleteProbe.ref ! _ }
val proc = p.expectSubscription
proc.expectRequestMore()
val ex = new RuntimeException("ex")
@ -53,7 +53,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest {
"invoke callback for an empty stream" in {
val onCompleteProbe = TestProbe()
val p = StreamTestKit.producerProbe[Int]
Flow(p).onComplete(gen) { onCompleteProbe.ref ! _ }
Flow(p).onComplete(materializer) { onCompleteProbe.ref ! _ }
val proc = p.expectSubscription
proc.expectRequestMore()
proc.sendComplete()
@ -69,7 +69,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest {
x
}.foreach {
x onCompleteProbe.ref ! ("foreach-" + x)
}.onComplete(gen) { onCompleteProbe.ref ! _ }
}.onComplete(materializer) { onCompleteProbe.ref ! _ }
val proc = p.expectSubscription
proc.expectRequestMore()
proc.sendNext(42)

View file

@ -16,7 +16,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
import system.dispatcher
val genSettings = MaterializerSettings(
val settings = MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,
@ -29,14 +29,14 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
for ((name, op) List("identity" -> identity, "identity2" -> identity2); n List(1, 2, 4)) {
s"requests initial elements from upstream ($name, $n)" in {
new ChainSetup(op, genSettings.copy(initialInputBufferSize = n)) {
new ChainSetup(op, settings.copy(initialInputBufferSize = n)) {
upstream.expectRequestMore(upstreamSubscription, settings.initialInputBufferSize)
}
}
}
"requests more elements from upstream when downstream requests more elements" in {
new ChainSetup(identity, genSettings) {
new ChainSetup(identity, settings) {
upstream.expectRequestMore(upstreamSubscription, settings.initialInputBufferSize)
downstreamSubscription.requestMore(1)
upstream.expectNoMsg(100.millis)
@ -55,7 +55,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
}
"deliver events when publisher sends elements and then completes" in {
new ChainSetup(identity, genSettings) {
new ChainSetup(identity, settings) {
downstreamSubscription.requestMore(1)
upstreamSubscription.sendNext("test")
upstreamSubscription.sendComplete()
@ -65,14 +65,14 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
}
"deliver complete signal when publisher immediately completes" in {
new ChainSetup(identity, genSettings) {
new ChainSetup(identity, settings) {
upstreamSubscription.sendComplete()
downstream.expectComplete()
}
}
"deliver error signal when publisher immediately fails" in {
new ChainSetup(identity, genSettings) {
new ChainSetup(identity, settings) {
object WeirdError extends RuntimeException("weird test exception")
EventFilter[WeirdError.type](occurrences = 1) intercept {
upstreamSubscription.sendError(WeirdError)
@ -82,7 +82,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
}
"single subscriber cancels subscription while receiving data" in {
new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1)) {
new ChainSetup(identity, settings.copy(initialInputBufferSize = 1)) {
downstreamSubscription.requestMore(5)
upstreamSubscription.expectRequestMore(1)
upstreamSubscription.sendNext("test")
@ -102,7 +102,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
"A Flow with multiple subscribers (FanOutBox)" must {
"adapt speed to the currently slowest consumer" in {
new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) {
new ChainSetup(identity, settings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) {
val downstream2 = StreamTestKit.consumerProbe[Any]()
producer.produceTo(downstream2)
val downstream2Subscription = downstream2.expectSubscription()
@ -128,7 +128,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
}
"support slow consumer with fan-out 2" in {
new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1, initialFanOutBufferSize = 2, maxFanOutBufferSize = 2)) {
new ChainSetup(identity, settings.copy(initialInputBufferSize = 1, initialFanOutBufferSize = 2, maxFanOutBufferSize = 2)) {
val downstream2 = StreamTestKit.consumerProbe[Any]()
producer.produceTo(downstream2)
val downstream2Subscription = downstream2.expectSubscription()
@ -167,7 +167,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
}
"incoming subscriber while elements were requested before" in {
new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) {
new ChainSetup(identity, settings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) {
downstreamSubscription.requestMore(5)
upstream.expectRequestMore(upstreamSubscription, 1)
upstreamSubscription.sendNext("a1")
@ -204,7 +204,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
}
"blocking subscriber cancels subscription" in {
new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) {
new ChainSetup(identity, settings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) {
val downstream2 = StreamTestKit.consumerProbe[Any]()
producer.produceTo(downstream2)
val downstream2Subscription = downstream2.expectSubscription()
@ -239,7 +239,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
}
"after initial upstream was completed future subscribers' onComplete should be called instead of onSubscribed" in {
new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) {
new ChainSetup(identity, settings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) {
val downstream2 = StreamTestKit.consumerProbe[Any]()
// don't link it just yet
@ -278,7 +278,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
}
"after initial upstream reported an error future subscribers' onError should be called instead of onSubscribed" in {
new ChainSetup[Int, String](_.map(_ throw TestException), genSettings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) {
new ChainSetup[Int, String](_.map(_ throw TestException), settings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) {
downstreamSubscription.requestMore(1)
upstreamSubscription.expectRequestMore(1)
@ -296,7 +296,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
}
"when all subscriptions were cancelled future subscribers' onError should be called" in {
new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1)) {
new ChainSetup(identity, settings.copy(initialInputBufferSize = 1)) {
upstreamSubscription.expectRequestMore(1)
downstreamSubscription.cancel()
upstreamSubscription.expectCancellation()

View file

@ -11,9 +11,9 @@ import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer }
import akka.stream.scaladsl.Flow
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class StreamSplitWhenSpec extends AkkaSpec {
class FlowSplitWhenSpec extends AkkaSpec {
val gen = new ActorBasedFlowMaterializer(MaterializerSettings(
val materializer = new ActorBasedFlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,
@ -32,8 +32,8 @@ class StreamSplitWhenSpec extends AkkaSpec {
}
class SubstreamsSupport(splitWhen: Int = 3, elementCount: Int = 6) {
val source = Flow((1 to elementCount).iterator).toProducer(gen)
val groupStream = Flow(source).splitWhen(_ == splitWhen).toProducer(gen)
val source = Flow((1 to elementCount).iterator).toProducer(materializer)
val groupStream = Flow(source).splitWhen(_ == splitWhen).toProducer(materializer)
val masterConsumer = StreamTestKit.consumerProbe[Producer[Int]]
groupStream.produceTo(masterConsumer)

View file

@ -12,7 +12,7 @@ import akka.stream.impl.RequestMore
class StreamTakeSpec extends AkkaSpec with ScriptedTest {
val genSettings = MaterializerSettings(
val settings = MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,
@ -26,7 +26,7 @@ class StreamTakeSpec extends AkkaSpec with ScriptedTest {
def script(d: Int) = Script((1 to 50) map { n Seq(n) -> (if (n > d) Nil else Seq(n)) }: _*)
(1 to 50) foreach { _
val d = Math.min(Math.max(random.nextInt(-10, 60), 0), 50)
runScript(script(d), genSettings)(_.take(d))
runScript(script(d), settings)(_.take(d))
}
}

View file

@ -14,7 +14,7 @@ import akka.stream.scaladsl.Flow
class FlowToFutureSpec extends AkkaSpec with ScriptedTest {
val gen = FlowMaterializer(MaterializerSettings(
val materializer = FlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,
@ -24,7 +24,7 @@ class FlowToFutureSpec extends AkkaSpec with ScriptedTest {
"yield the first value" in {
val p = StreamTestKit.producerProbe[Int]
val f = Flow(p).toFuture(gen)
val f = Flow(p).toFuture(materializer)
val proc = p.expectSubscription
proc.expectRequestMore()
proc.sendNext(42)
@ -34,7 +34,7 @@ class FlowToFutureSpec extends AkkaSpec with ScriptedTest {
"yield the first error" in {
val p = StreamTestKit.producerProbe[Int]
val f = Flow(p).toFuture(gen)
val f = Flow(p).toFuture(materializer)
val proc = p.expectSubscription
proc.expectRequestMore()
val ex = new RuntimeException("ex")
@ -45,7 +45,7 @@ class FlowToFutureSpec extends AkkaSpec with ScriptedTest {
"yield NoSuchElementExcption for empty stream" in {
val p = StreamTestKit.producerProbe[Int]
val f = Flow(p).toFuture(gen)
val f = Flow(p).toFuture(materializer)
val proc = p.expectSubscription
proc.expectRequestMore()
proc.sendComplete()

View file

@ -14,7 +14,7 @@ import akka.stream.scaladsl.Flow
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FlowTransformRecoverSpec extends AkkaSpec {
val gen = FlowMaterializer(MaterializerSettings(
val materializer = FlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,
@ -22,10 +22,10 @@ class FlowTransformRecoverSpec extends AkkaSpec {
"A Flow with transformRecover operations" must {
"produce one-to-one transformation as expected" in {
val p = Flow(List(1, 2, 3).iterator).toProducer(gen)
val p = Flow(List(1, 2, 3).iterator).toProducer(materializer)
val p2 = Flow(p).
transformRecover(0)((tot, elem) (tot + elem.get, List(tot + elem.get))).
toProducer(gen)
toProducer(materializer)
val consumer = StreamTestKit.consumerProbe[Int]
p2.produceTo(consumer)
val subscription = consumer.expectSubscription()
@ -39,10 +39,10 @@ class FlowTransformRecoverSpec extends AkkaSpec {
}
"produce one-to-several transformation as expected" in {
val p = Flow(List(1, 2, 3).iterator).toProducer(gen)
val p = Flow(List(1, 2, 3).iterator).toProducer(materializer)
val p2 = Flow(p).
transformRecover(0)((tot, elem) (tot + elem.get, Vector.fill(elem.get)(tot + elem.get))).
toProducer(gen)
toProducer(materializer)
val consumer = StreamTestKit.consumerProbe[Int]
p2.produceTo(consumer)
val subscription = consumer.expectSubscription()
@ -59,10 +59,10 @@ class FlowTransformRecoverSpec extends AkkaSpec {
}
"produce dropping transformation as expected" in {
val p = Flow(List(1, 2, 3, 4).iterator).toProducer(gen)
val p = Flow(List(1, 2, 3, 4).iterator).toProducer(materializer)
val p2 = Flow(p).
transformRecover(0)((tot, elem) (tot + elem.get, if (elem.get % 2 == 0) Nil else List(tot + elem.get))).
toProducer(gen)
toProducer(materializer)
val consumer = StreamTestKit.consumerProbe[Int]
p2.produceTo(consumer)
val subscription = consumer.expectSubscription()
@ -76,14 +76,14 @@ class FlowTransformRecoverSpec extends AkkaSpec {
}
"produce multi-step transformation as expected" in {
val p = Flow(List("a", "bc", "def").iterator).toProducer(gen)
val p = Flow(List("a", "bc", "def").iterator).toProducer(materializer)
val p2 = Flow(p).
transformRecover("") { (str, elem)
val concat = str + elem
(concat, List(concat.length))
}.
transformRecover(0)((tot, length) (tot + length.get, List(tot + length.get))).
toProducer(gen)
toProducer(materializer)
val c1 = StreamTestKit.consumerProbe[Int]
p2.produceTo(c1)
val sub1 = c1.expectSubscription()
@ -106,8 +106,8 @@ class FlowTransformRecoverSpec extends AkkaSpec {
}
"invoke onComplete when done" in {
val p = Flow(List("a").iterator).toProducer(gen)
val p2 = Flow(p).transformRecover("")((s, in) (s + in, Nil), x List(x + "B")).toProducer(gen)
val p = Flow(List("a").iterator).toProducer(materializer)
val p2 = Flow(p).transformRecover("")((s, in) (s + in, Nil), x List(x + "B")).toProducer(materializer)
val c = StreamTestKit.consumerProbe[String]
p2.produceTo(c)
val s = c.expectSubscription()
@ -118,7 +118,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
"allow cancellation using isComplete" in {
val p = StreamTestKit.producerProbe[Int]
val p2 = Flow(p).transformRecover("")((s, in) (s + in, List(in.get)), isComplete = _ == "Success(1)").toProducer(gen)
val p2 = Flow(p).transformRecover("")((s, in) (s + in, List(in.get)), isComplete = _ == "Success(1)").toProducer(materializer)
val proc = p.expectSubscription
val c = StreamTestKit.consumerProbe[Int]
p2.produceTo(c)
@ -137,7 +137,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
(s, in) (s + in, List(in.get)),
onComplete = x List(x.size + 10),
isComplete = _ == "Success(1)")
.toProducer(gen)
.toProducer(materializer)
val proc = p.expectSubscription
val c = StreamTestKit.consumerProbe[Int]
p2.produceTo(c)
@ -152,12 +152,12 @@ class FlowTransformRecoverSpec extends AkkaSpec {
}
"report error when exception is thrown" in {
val p = Flow(List(1, 2, 3).iterator).toProducer(gen)
val p = Flow(List(1, 2, 3).iterator).toProducer(materializer)
val p2 = Flow(p).
transformRecover(0) { (_, elem)
if (elem.get == 2) throw new IllegalArgumentException("two not allowed") else (0, List(elem.get, elem.get))
}.
toProducer(gen)
toProducer(materializer)
val consumer = StreamTestKit.consumerProbe[Int]
p2.produceTo(consumer)
val subscription = consumer.expectSubscription()
@ -177,7 +177,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
val p2 = Flow(p).transformRecover("")(
{ case (s, Failure(ex)) (s + ex.getMessage, List(ex)) },
onComplete = x List(TE(x.size + "10")))
.toProducer(gen)
.toProducer(materializer)
val proc = p.expectSubscription()
val c = StreamTestKit.consumerProbe[Throwable]
p2.produceTo(c)
@ -191,7 +191,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
"forward errors when received and thrown" in {
val p = StreamTestKit.producerProbe[Int]
val p2 = Flow(p).transformRecover("")((_, in) "" -> List(in.get)).toProducer(gen)
val p2 = Flow(p).transformRecover("")((_, in) "" -> List(in.get)).toProducer(materializer)
val proc = p.expectSubscription()
val c = StreamTestKit.consumerProbe[Int]
p2.produceTo(c)
@ -204,10 +204,10 @@ class FlowTransformRecoverSpec extends AkkaSpec {
}
"support cancel as expected" in {
val p = Flow(List(1, 2, 3).iterator).toProducer(gen)
val p = Flow(List(1, 2, 3).iterator).toProducer(materializer)
val p2 = Flow(p).
transformRecover(0) { (_, elem) (0, List(elem.get, elem.get)) }.
toProducer(gen)
toProducer(materializer)
val consumer = StreamTestKit.consumerProbe[Int]
p2.produceTo(consumer)
val subscription = consumer.expectSubscription()

View file

@ -16,7 +16,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
import system.dispatcher
val gen = FlowMaterializer(MaterializerSettings(
val materializer = FlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,
@ -24,10 +24,10 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
"A Flow with transform operations" must {
"produce one-to-one transformation as expected" in {
val p = Flow(List(1, 2, 3).iterator).toProducer(gen)
val p = Flow(List(1, 2, 3).iterator).toProducer(materializer)
val p2 = Flow(p).
transform(0)((tot, elem) (tot + elem, List(tot + elem))).
toProducer(gen)
toProducer(materializer)
val consumer = StreamTestKit.consumerProbe[Int]
p2.produceTo(consumer)
val subscription = consumer.expectSubscription()
@ -41,10 +41,10 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
}
"produce one-to-several transformation as expected" in {
val p = Flow(List(1, 2, 3).iterator).toProducer(gen)
val p = Flow(List(1, 2, 3).iterator).toProducer(materializer)
val p2 = Flow(p).
transform(0)((tot, elem) (tot + elem, Vector.fill(elem)(tot + elem))).
toProducer(gen)
toProducer(materializer)
val consumer = StreamTestKit.consumerProbe[Int]
p2.produceTo(consumer)
val subscription = consumer.expectSubscription()
@ -61,10 +61,10 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
}
"produce dropping transformation as expected" in {
val p = Flow(List(1, 2, 3, 4).iterator).toProducer(gen)
val p = Flow(List(1, 2, 3, 4).iterator).toProducer(materializer)
val p2 = Flow(p).
transform(0)((tot, elem) (tot + elem, if (elem % 2 == 0) Nil else List(tot + elem))).
toProducer(gen)
toProducer(materializer)
val consumer = StreamTestKit.consumerProbe[Int]
p2.produceTo(consumer)
val subscription = consumer.expectSubscription()
@ -78,14 +78,14 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
}
"produce multi-step transformation as expected" in {
val p = Flow(List("a", "bc", "def").iterator).toProducer(gen)
val p = Flow(List("a", "bc", "def").iterator).toProducer(materializer)
val p2 = Flow(p).
transform("") { (str, elem)
val concat = str + elem
(concat, List(concat.length))
}.
transform(0)((tot, length) (tot + length, List(tot + length))).
toProducer(gen)
toProducer(materializer)
val c1 = StreamTestKit.consumerProbe[Int]
p2.produceTo(c1)
val sub1 = c1.expectSubscription()
@ -108,8 +108,8 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
}
"invoke onComplete when done" in {
val p = Flow(List("a").iterator).toProducer(gen)
val p2 = Flow(p).transform("")((s, in) (s + in, Nil), x List(x + "B")).toProducer(gen)
val p = Flow(List("a").iterator).toProducer(materializer)
val p2 = Flow(p).transform("")((s, in) (s + in, Nil), x List(x + "B")).toProducer(materializer)
val c = StreamTestKit.consumerProbe[String]
p2.produceTo(c)
val s = c.expectSubscription()
@ -120,9 +120,9 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
"invoke cleanup when done" in {
val cleanupProbe = TestProbe()
val p = Flow(List("a").iterator).toProducer(gen)
val p = Flow(List("a").iterator).toProducer(materializer)
val p2 = Flow(p).transform("")((s, in) (s + in, Nil), x List(x + "B"),
cleanup = s cleanupProbe.ref ! s).toProducer(gen)
cleanup = s cleanupProbe.ref ! s).toProducer(materializer)
val c = StreamTestKit.consumerProbe[String]
p2.produceTo(c)
val s = c.expectSubscription()
@ -134,10 +134,10 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
"invoke cleanup when done after error" in {
val cleanupProbe = TestProbe()
val p = Flow(List("a", "b", "c").iterator).toProducer(gen)
val p = Flow(List("a", "b", "c").iterator).toProducer(materializer)
val p2 = Flow(p).transform("")(
f = (s, in) if (in == "b") throw new IllegalArgumentException("Not b") else (s + in.toUpperCase, List(s + in)),
cleanup = s cleanupProbe.ref ! s).toProducer(gen)
cleanup = s cleanupProbe.ref ! s).toProducer(materializer)
val c = StreamTestKit.consumerProbe[String]
p2.produceTo(c)
val s = c.expectSubscription()
@ -150,7 +150,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
"allow cancellation using isComplete" in {
val p = StreamTestKit.producerProbe[Int]
val p2 = Flow(p).transform("")((s, in) (s + in, List(in)), isComplete = _ == "1").toProducer(gen)
val p2 = Flow(p).transform("")((s, in) (s + in, List(in)), isComplete = _ == "1").toProducer(materializer)
val proc = p.expectSubscription
val c = StreamTestKit.consumerProbe[Int]
p2.produceTo(c)
@ -167,7 +167,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
val cleanupProbe = TestProbe()
val p = StreamTestKit.producerProbe[Int]
val p2 = Flow(p).transform("")((s, in) (s + in, List(in)), onComplete = x List(x.size + 10),
isComplete = _ == "1", cleanup = s cleanupProbe.ref ! s).toProducer(gen)
isComplete = _ == "1", cleanup = s cleanupProbe.ref ! s).toProducer(materializer)
val proc = p.expectSubscription
val c = StreamTestKit.consumerProbe[Int]
p2.produceTo(c)
@ -183,12 +183,12 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
}
"report error when exception is thrown" in {
val p = Flow(List(1, 2, 3).iterator).toProducer(gen)
val p = Flow(List(1, 2, 3).iterator).toProducer(materializer)
val p2 = Flow(p).
transform(0) { (_, elem)
if (elem == 2) throw new IllegalArgumentException("two not allowed") else (0, List(elem, elem))
}.
toProducer(gen)
toProducer(materializer)
val consumer = StreamTestKit.consumerProbe[Int]
p2.produceTo(consumer)
val subscription = consumer.expectSubscription()
@ -202,10 +202,10 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
}
"support cancel as expected" in {
val p = Flow(List(1, 2, 3).iterator).toProducer(gen)
val p = Flow(List(1, 2, 3).iterator).toProducer(materializer)
val p2 = Flow(p).
transform(0) { (_, elem) (0, List(elem, elem)) }.
toProducer(gen)
toProducer(materializer)
val consumer = StreamTestKit.consumerProbe[Int]
p2.produceTo(consumer)
val subscription = consumer.expectSubscription()
@ -219,9 +219,9 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
}
"support producing elements from empty inputs" in {
val p = Flow(List.empty[Int].iterator).toProducer(gen)
val p = Flow(List.empty[Int].iterator).toProducer(materializer)
val p2 = Flow(p).transform(List(1, 2, 3))((s, _) (s, Nil), onComplete = s s).
toProducer(gen)
toProducer(materializer)
val consumer = StreamTestKit.consumerProbe[Int]
p2.produceTo(consumer)
val subscription = consumer.expectSubscription()

View file

@ -7,22 +7,20 @@ import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer }
import akka.stream.testkit.StreamTestKit
import akka.testkit.AkkaSpec
import akka.stream.scaladsl.Flow
import org.reactivestreams.api.Producer
class FlowZipSpec extends AkkaSpec {
class FlowZipSpec extends TwoStreamsSetup {
val gen = new ActorBasedFlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,
maxFanOutBufferSize = 2), system)
type Outputs = (Int, Int)
override def operationUnderTest(in1: Flow[Int], in2: Producer[Int]) = in1.zip(in2)
"Zip" must {
"work in the happy case" in {
// Different input sizes (4 and 6)
val source1 = Flow((1 to 4).iterator).toProducer(gen)
val source2 = Flow(List("A", "B", "C", "D", "E", "F").iterator).toProducer(gen)
val p = Flow(source1).zip(source2).toProducer(gen)
val source1 = Flow((1 to 4).iterator).toProducer(materializer)
val source2 = Flow(List("A", "B", "C", "D", "E", "F").iterator).toProducer(materializer)
val p = Flow(source1).zip(source2).toProducer(materializer)
val probe = StreamTestKit.consumerProbe[(Int, String)]
p.produceTo(probe)
@ -40,6 +38,54 @@ class FlowZipSpec extends AkkaSpec {
probe.expectComplete()
}
commonTests()
"work with one immediately completed and one nonempty producer" in {
val consumer1 = setup(completedPublisher, nonemptyPublisher((1 to 4).iterator))
val subscription1 = consumer1.expectSubscription()
subscription1.requestMore(4)
consumer1.expectComplete()
val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), completedPublisher)
val subscription2 = consumer2.expectSubscription()
subscription2.requestMore(4)
consumer2.expectComplete()
}
"work with one delayed completed and one nonempty producer" in {
val consumer1 = setup(soonToCompletePublisher, nonemptyPublisher((1 to 4).iterator))
val subscription1 = consumer1.expectSubscription()
subscription1.requestMore(4)
consumer1.expectComplete()
val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), soonToCompletePublisher)
val subscription2 = consumer2.expectSubscription()
subscription2.requestMore(4)
consumer2.expectComplete()
}
"work with one immediately failed and one nonempty producer" in {
val consumer1 = setup(failedPublisher, nonemptyPublisher((1 to 4).iterator))
consumer1.expectError(TestException)
val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), failedPublisher)
val subscription2 = consumer2.expectSubscription()
subscription2.requestMore(4)
consumer2.expectError(TestException)
}
"work with one delayed failed and one nonempty producer" in {
val consumer1 = setup(soonToFailPublisher, nonemptyPublisher((1 to 4).iterator))
val subscription1 = consumer1.expectSubscription()
subscription1.requestMore(4)
consumer1.expectError(TestException)
val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), soonToFailPublisher)
val subscription2 = consumer2.expectSubscription()
subscription2.requestMore(4)
consumer2.expectError(TestException)
}
}
}

View file

@ -38,10 +38,10 @@ class IdentityProcessorTest extends IdentityProcessorVerification[Int] with With
}
def createHelperPublisher(elements: Int): Publisher[Int] = {
val gen = FlowMaterializer(MaterializerSettings(
val materializer = FlowMaterializer(MaterializerSettings(
maximumInputBufferSize = 512))(system)
val iter = Iterator from 1000
Flow(if (elements > 0) iter take elements else iter).toProducer(gen).getPublisher
Flow(if (elements > 0) iter take elements else iter).toProducer(materializer).getPublisher
}
}

View file

@ -11,7 +11,7 @@ import akka.stream.scaladsl.Flow
class IterableProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike {
val gen = FlowMaterializer(MaterializerSettings(
val materializer = FlowMaterializer(MaterializerSettings(
maximumInputBufferSize = 512))(system)
def createPublisher(elements: Int): Publisher[Int] = {
@ -20,10 +20,10 @@ class IterableProducerTest extends PublisherVerification[Int] with WithActorSyst
new immutable.Iterable[Int] { override def iterator = Iterator from 0 }
else
0 until elements
Flow(iterable).toProducer(gen).getPublisher
Flow(iterable).toProducer(materializer).getPublisher
}
override def createCompletedStatePublisher(): Publisher[Int] =
Flow[Int](Nil).toProducer(gen).getPublisher
Flow[Int](Nil).toProducer(materializer).getPublisher
}

View file

@ -10,7 +10,7 @@ import akka.stream.scaladsl.Flow
class IteratorProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike {
val gen = FlowMaterializer(MaterializerSettings(
val materializer = FlowMaterializer(MaterializerSettings(
maximumInputBufferSize = 512))(system)
def createPublisher(elements: Int): Publisher[Int] = {
@ -19,10 +19,10 @@ class IteratorProducerTest extends PublisherVerification[Int] with WithActorSyst
Iterator from 0
else
(Iterator from 0).take(elements)
Flow(iter).toProducer(gen).getPublisher
Flow(iter).toProducer(materializer).getPublisher
}
override def createCompletedStatePublisher(): Publisher[Int] =
Flow(List.empty[Int].iterator).toProducer(gen).getPublisher
Flow(List.empty[Int].iterator).toProducer(materializer).getPublisher
}

View file

@ -15,7 +15,7 @@ import scala.util.control.NonFatal
class ProcessorHierarchySpec extends AkkaSpec("akka.actor.debug.lifecycle=off\nakka.loglevel=INFO") {
val gen = FlowMaterializer(MaterializerSettings())
val materializer = FlowMaterializer(MaterializerSettings())
def self = ActorBasedFlowMaterializer.ctx.get().asInstanceOf[ActorContext].self
@ -24,11 +24,11 @@ class ProcessorHierarchySpec extends AkkaSpec("akka.actor.debug.lifecycle=off\na
"generate the right level of descendants" in {
val f = Flow(() {
testActor ! self
Flow(List(1)).map(x { testActor ! self; x }).toProducer(gen)
Flow(List(1)).map(x { testActor ! self; x }).toProducer(materializer)
}).take(3).foreach(x {
testActor ! self
Flow(x).foreach(_ testActor ! self).consume(gen)
}).toFuture(gen)
Flow(x).foreach(_ testActor ! self).consume(materializer)
}).toFuture(materializer)
Await.result(f, 3.seconds)
val refs = receiveWhile(idle = 250.millis) {
case r: ActorRef r

View file

@ -0,0 +1,123 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import scala.util.control.NoStackTrace
import org.reactivestreams.api.{ Consumer, Producer }
import org.reactivestreams.spi.{ Subscriber, Publisher, Subscription }
import akka.testkit.AkkaSpec
import akka.stream.impl.ActorBasedFlowMaterializer
import akka.stream.testkit.StreamTestKit
import akka.stream.scaladsl.Flow
abstract class TwoStreamsSetup extends AkkaSpec {
val materializer = new ActorBasedFlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,
maxFanOutBufferSize = 2), system)
case class TE(message: String) extends RuntimeException(message) with NoStackTrace
val TestException = TE("test")
type Outputs
def operationUnderTest(in1: Flow[Int], in2: Producer[Int]): Flow[Outputs]
def setup(p1: Publisher[Int], p2: Publisher[Int]) = {
val consumer = StreamTestKit.consumerProbe[Outputs]
operationUnderTest(Flow(producerFromPublisher(p1)), producerFromPublisher(p2)).toProducer(materializer).produceTo(consumer)
consumer
}
def producerFromPublisher[T](publisher: Publisher[T]): Producer[T] = new Producer[T] {
private val pub = publisher
override def produceTo(consumer: Consumer[T]): Unit = pub.subscribe(consumer.getSubscriber)
override def getPublisher: Publisher[T] = pub
}
def failedPublisher[T]: Publisher[T] = new Publisher[T] {
override def subscribe(subscriber: Subscriber[T]): Unit = {
subscriber.onError(TestException)
}
}
def completedPublisher[T]: Publisher[T] = new Publisher[T] {
override def subscribe(subscriber: Subscriber[T]): Unit = {
subscriber.onComplete()
}
}
def nonemptyPublisher[T](elems: Iterator[T]): Publisher[T] = Flow(elems).toProducer(materializer).getPublisher
def soonToFailPublisher[T]: Publisher[T] = new Publisher[T] {
override def subscribe(subscriber: Subscriber[T]): Unit = subscriber.onSubscribe(FailedSubscription(subscriber))
}
def soonToCompletePublisher[T]: Publisher[T] = new Publisher[T] {
override def subscribe(subscriber: Subscriber[T]): Unit = subscriber.onSubscribe(CompletedSubscription(subscriber))
}
case class FailedSubscription(subscriber: Subscriber[_]) extends Subscription {
override def requestMore(elements: Int): Unit = subscriber.onError(TestException)
override def cancel(): Unit = ()
}
case class CompletedSubscription(subscriber: Subscriber[_]) extends Subscription {
override def requestMore(elements: Int): Unit = subscriber.onComplete()
override def cancel(): Unit = ()
}
def commonTests() = {
"work with two immediately completed producers" in {
val consumer = setup(completedPublisher, completedPublisher)
val subscription = consumer.expectSubscription()
subscription.requestMore(1)
consumer.expectComplete()
}
"work with two delayed completed producers" in {
val consumer = setup(soonToCompletePublisher, soonToCompletePublisher)
val subscription = consumer.expectSubscription()
subscription.requestMore(1)
consumer.expectComplete()
}
"work with one immediately completed and one delayed completed producer" in {
val consumer = setup(completedPublisher, soonToCompletePublisher)
val subscription = consumer.expectSubscription()
subscription.requestMore(1)
consumer.expectComplete()
}
"work with two immediately failed producers" in {
val consumer = setup(failedPublisher, failedPublisher)
consumer.expectError(TestException)
}
"work with two delayed failed producers" in {
val consumer = setup(soonToFailPublisher, soonToFailPublisher)
val subscription = consumer.expectSubscription()
subscription.requestMore(1)
consumer.expectError(TestException)
}
// Warning: The two test cases below are somewhat implementation specific and might fail if the implementation
// is changed. They are here to be an early warning though.
"work with one immediately failed and one delayed failed producer (case 1)" in {
val consumer = setup(soonToFailPublisher, failedPublisher)
val subscription = consumer.expectSubscription()
subscription.requestMore(1)
consumer.expectError(TestException)
}
"work with one immediately failed and one delayed failed producer (case 2)" in {
val consumer = setup(failedPublisher, soonToFailPublisher)
consumer.expectError(TestException)
}
}
}

View file

@ -78,12 +78,12 @@ trait ScriptedTest extends ShouldMatchers {
class ScriptRunner[In, Out](
op: Flow[In] Flow[Out],
gen: MaterializerSettings,
settings: MaterializerSettings,
script: Script[In, Out],
maximumOverrun: Int,
maximumRequest: Int,
maximumBuffer: Int)(implicit _system: ActorSystem)
extends ChainSetup(op, gen) {
extends ChainSetup(op, settings) {
var _debugLog = Vector.empty[String]
var currentScript = script
@ -186,9 +186,9 @@ trait ScriptedTest extends ShouldMatchers {
}
def runScript[In, Out](script: Script[In, Out], gen: MaterializerSettings, maximumOverrun: Int = 3, maximumRequest: Int = 3, maximumBuffer: Int = 3)(
def runScript[In, Out](script: Script[In, Out], settings: MaterializerSettings, maximumOverrun: Int = 3, maximumRequest: Int = 3, maximumBuffer: Int = 3)(
op: Flow[In] Flow[Out])(implicit system: ActorSystem): Unit = {
new ScriptRunner(op, gen, script, maximumOverrun, maximumRequest, maximumBuffer).run()
new ScriptRunner(op, settings, script, maximumOverrun, maximumRequest, maximumBuffer).run()
}
}