diff --git a/akka-stream/src/test/scala/akka/stream/StreamMapConcatSpec.scala b/akka-stream/src/test/scala/akka/stream/StreamMapConcatSpec.scala new file mode 100644 index 0000000000..6e62844fb7 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/StreamMapConcatSpec.scala @@ -0,0 +1,31 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import akka.testkit.AkkaSpec +import akka.stream.testkit.ScriptedTest + +class StreamMapConcatSpec extends AkkaSpec with ScriptedTest { + + val genSettings = GeneratorSettings( + initialInputBufferSize = 2, + maximumInputBufferSize = 16, + initialFanOutBufferSize = 1, + maxFanOutBufferSize = 16) + + "A MapConcat" must { + + "map and concat" in { + val script = Script( + Seq(1) -> Seq(1), + Seq(2) -> Seq(2, 2), + Seq(3) -> Seq(3, 3, 3), + Seq(2) -> Seq(2, 2), + Seq(1) -> Seq(1)) + (1 to 100) foreach (_ ⇒ runScript(script, genSettings)(_.mapConcat(x ⇒ (1 to x) map (_ ⇒ x)))) + } + + } + +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/StreamSpec.scala b/akka-stream/src/test/scala/akka/stream/StreamSpec.scala index d2cfcd18ea..7cad566224 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/StreamSpec.scala @@ -4,7 +4,7 @@ package akka.stream import scala.concurrent.duration._ -import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.{ ChainSetup, StreamTestKit } import akka.testkit._ import org.reactivestreams.api.Producer import org.scalatest.FreeSpecLike @@ -24,8 +24,8 @@ class StreamSpec extends AkkaSpec { val identity2: Stream[Any] ⇒ Stream[Any] = in ⇒ identity(in) "A Stream" must { - "requests initial elements from upstream" in { - for (op ← List(identity, identity2); n ← List(1, 2, 4)) { + for ((name, op) ← List("identity" -> identity, "identity2" -> identity2); n ← List(1, 2, 4)) { + s"requests initial elements from upstream ($name, $n)" in { new ChainSetup(op, genSettings.copy(initialInputBufferSize = n)) { upstream.expectRequestMore(upstreamSubscription, settings.initialInputBufferSize) } @@ -374,14 +374,4 @@ class StreamSpec extends AkkaSpec { object TestException extends RuntimeException - class ChainSetup[I, O](stream: Stream[I] ⇒ Stream[O], val settings: GeneratorSettings) { - val upstream = StreamTestKit.producerProbe[I]() - val downstream = StreamTestKit.consumerProbe[O]() - - private val s = stream(Stream(upstream)) - val producer = s.toProducer(ProcessorGenerator(settings)) - val upstreamSubscription = upstream.expectSubscription() - producer.produceTo(downstream) - val downstreamSubscription = downstream.expectSubscription() - } } \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/testkit/ChainSetup.scala b/akka-stream/src/test/scala/akka/stream/testkit/ChainSetup.scala new file mode 100644 index 0000000000..e011a2d4ca --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/testkit/ChainSetup.scala @@ -0,0 +1,18 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.testkit + +import akka.stream.{ GeneratorSettings, Stream, ProcessorGenerator } +import akka.actor.ActorSystem + +class ChainSetup[I, O](stream: Stream[I] ⇒ Stream[O], val settings: GeneratorSettings)(implicit val system: ActorSystem) { + val upstream = StreamTestKit.producerProbe[I]() + val downstream = StreamTestKit.consumerProbe[O]() + + private val s = stream(Stream(upstream)) + val producer = s.toProducer(ProcessorGenerator(settings)) + val upstreamSubscription = upstream.expectSubscription() + producer.produceTo(downstream) + val downstreamSubscription = downstream.expectSubscription() +} diff --git a/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala b/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala new file mode 100644 index 0000000000..f4a1e634e5 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala @@ -0,0 +1,187 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.testkit + +import org.scalatest._ +import akka.actor.ActorSystem +import scala.annotation.tailrec +import scala.collection.immutable +import scala.concurrent.forkjoin.ThreadLocalRandom +import scala.concurrent.duration._ +import scala.util.control.NonFatal +import akka.stream._ + +trait ScriptedTest extends ShouldMatchers { + + class ScriptException(msg: String) extends RuntimeException(msg) + + object Script { + def apply[In, Out](phases: (Seq[In], Seq[Out])*): Script[In, Out] = { + var providedInputs = Vector[In]() + var expectedOutputs = Vector[Out]() + var jumps = Vector[Int]() + + for ((ins, outs) ← phases) { + providedInputs ++= ins + expectedOutputs ++= outs + jumps ++= Vector.fill(ins.size - 1)(0) ++ Vector(outs.size) + } + + Script(providedInputs, expectedOutputs, jumps, inputCursor = 0, outputCursor = 0, outputEndCursor = 0, completed = false) + } + } + + case class Script[In, Out]( + providedInputs: Vector[In], + expectedOutputs: Vector[Out], + jumps: Vector[Int], + inputCursor: Int, + outputCursor: Int, + outputEndCursor: Int, + completed: Boolean) { + require(jumps.size == providedInputs.size) + + def provideInput: (In, Script[In, Out]) = + if (noInsPending) + throw new ScriptException("Script cannot provide more input.") + else + (providedInputs(inputCursor), this.copy(inputCursor = inputCursor + 1, outputEndCursor = outputEndCursor + jumps(inputCursor))) + + def consumeOutput(out: Out): Script[In, Out] = { + if (noOutsPending) + throw new ScriptException(s"Tried to produce element ${out} but no elements should be produced right now.") + expectedOutputs(outputCursor) should be(out) + this.copy(outputCursor = outputCursor + 1) + } + + def complete(): Script[In, Out] = { + if (finished) copy(completed = true) + else fail("received onComplete prematurely") + } + + def finished: Boolean = outputCursor == expectedOutputs.size + + def pendingOuts: Int = outputEndCursor - outputCursor + def noOutsPending: Boolean = pendingOuts == 0 + def someOutsPending: Boolean = !noOutsPending + + def pendingIns: Int = providedInputs.size - inputCursor + def noInsPending: Boolean = pendingIns == 0 + def someInsPending: Boolean = !noInsPending + + def debug: String = s"Script(pending=($pendingIns in, $pendingOuts out), remainingIns=${providedInputs.drop(inputCursor).mkString("/")}, remainingOuts=${expectedOutputs.drop(outputCursor).mkString("/")})" + } + + class ScriptRunner[In, Out]( + op: Stream[In] ⇒ Stream[Out], + gen: GeneratorSettings, + script: Script[In, Out], + maximumOverrun: Int, + maximumRequest: Int, + maximumBuffer: Int)(implicit _system: ActorSystem) + extends ChainSetup(op, gen) { + + var _debugLog = Vector.empty[String] + var currentScript = script + var remainingDemand = script.expectedOutputs.size + ThreadLocalRandom.current().nextInt(maximumOverrun) + debugLog(s"starting with remainingDemand=$remainingDemand") + var pendingRequests = 0 + var outstandingDemand = 0 + var completed = false + + def getNextDemand(): Int = { + val max = Math.min(remainingDemand, maximumRequest) + if (max == 1) { + remainingDemand = 0 + 1 + } else { + val demand = ThreadLocalRandom.current().nextInt(1, max) + remainingDemand -= demand + demand + } + } + + def debugLog(msg: String): Unit = _debugLog :+= msg + + def requestMore(demand: Int): Unit = { + debugLog(s"test environment requests $demand") + downstreamSubscription.requestMore(demand) + outstandingDemand += demand + } + + def mayProvideInput: Boolean = currentScript.someInsPending && (pendingRequests > 0) && (currentScript.pendingOuts <= maximumBuffer) + def mayRequestMore: Boolean = remainingDemand > 0 + + def shakeIt(): Boolean = { + val u = upstream.probe.receiveWhile(1.milliseconds) { + case RequestMore(_, n) ⇒ + debugLog(s"operation requests $n") + pendingRequests += n + true + case _ ⇒ false // Ignore + } + val d = downstream.probe.receiveWhile(1.milliseconds) { + case OnNext(elem: Out) ⇒ + debugLog(s"operation produces [$elem]") + if (outstandingDemand == 0) fail("operation produced while there was no demand") + outstandingDemand -= 1 + currentScript = currentScript.consumeOutput(elem) + true + case OnComplete ⇒ + currentScript = currentScript.complete() + true + case _ ⇒ false // Ignore + } + (u ++ d) exists (x ⇒ x) + } + + def run(): Unit = { + + @tailrec def doRun(idleRounds: Int): Unit = { + if (idleRounds > 10) fail("too many idle rounds") + if (!currentScript.completed) { + val nextIdle = if (shakeIt()) 0 else idleRounds + 1 + + val tieBreak = ThreadLocalRandom.current().nextBoolean() + if (mayProvideInput && (!mayRequestMore || tieBreak)) { + val (input, nextScript) = currentScript.provideInput + debugLog(s"test environment produces [${input}]") + pendingRequests -= 1 + currentScript = nextScript + upstreamSubscription.sendNext(input) + doRun(nextIdle) + } else if (mayRequestMore && (!mayProvideInput || !tieBreak)) { + requestMore(getNextDemand()) + doRun(nextIdle) + } else { + if (currentScript.noInsPending && !completed) { + debugLog("test environment completes") + upstreamSubscription.sendComplete() + completed = true + } + doRun(nextIdle) + } + + } + } + + try { + requestMore(getNextDemand()) + doRun(0) + } catch { + case e: Throwable ⇒ + println(_debugLog.mkString("Steps leading to failure:\n", "\n", "\nCurrentScript: " + currentScript.debug)) + throw e + } + + } + + } + + def runScript[In, Out](script: Script[In, Out], gen: GeneratorSettings, maximumOverrun: Int = 3, maximumRequest: Int = 3, maximumBuffer: Int = 3)( + op: Stream[In] ⇒ Stream[Out])(implicit system: ActorSystem): Unit = { + new ScriptRunner(op, gen, script, maximumOverrun, maximumRequest, maximumBuffer).run() + } + +}