!str add ScriptTest and apply it to mapConcat
This commit is contained in:
parent
606ec5fae8
commit
e623dcb560
4 changed files with 239 additions and 13 deletions
|
|
@ -0,0 +1,31 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream
|
||||
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.stream.testkit.ScriptedTest
|
||||
|
||||
class StreamMapConcatSpec extends AkkaSpec with ScriptedTest {
|
||||
|
||||
val genSettings = GeneratorSettings(
|
||||
initialInputBufferSize = 2,
|
||||
maximumInputBufferSize = 16,
|
||||
initialFanOutBufferSize = 1,
|
||||
maxFanOutBufferSize = 16)
|
||||
|
||||
"A MapConcat" must {
|
||||
|
||||
"map and concat" in {
|
||||
val script = Script(
|
||||
Seq(1) -> Seq(1),
|
||||
Seq(2) -> Seq(2, 2),
|
||||
Seq(3) -> Seq(3, 3, 3),
|
||||
Seq(2) -> Seq(2, 2),
|
||||
Seq(1) -> Seq(1))
|
||||
(1 to 100) foreach (_ ⇒ runScript(script, genSettings)(_.mapConcat(x ⇒ (1 to x) map (_ ⇒ x))))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -4,7 +4,7 @@
|
|||
package akka.stream
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import akka.stream.testkit.StreamTestKit
|
||||
import akka.stream.testkit.{ ChainSetup, StreamTestKit }
|
||||
import akka.testkit._
|
||||
import org.reactivestreams.api.Producer
|
||||
import org.scalatest.FreeSpecLike
|
||||
|
|
@ -24,8 +24,8 @@ class StreamSpec extends AkkaSpec {
|
|||
val identity2: Stream[Any] ⇒ Stream[Any] = in ⇒ identity(in)
|
||||
|
||||
"A Stream" must {
|
||||
"requests initial elements from upstream" in {
|
||||
for (op ← List(identity, identity2); n ← List(1, 2, 4)) {
|
||||
for ((name, op) ← List("identity" -> identity, "identity2" -> identity2); n ← List(1, 2, 4)) {
|
||||
s"requests initial elements from upstream ($name, $n)" in {
|
||||
new ChainSetup(op, genSettings.copy(initialInputBufferSize = n)) {
|
||||
upstream.expectRequestMore(upstreamSubscription, settings.initialInputBufferSize)
|
||||
}
|
||||
|
|
@ -374,14 +374,4 @@ class StreamSpec extends AkkaSpec {
|
|||
|
||||
object TestException extends RuntimeException
|
||||
|
||||
class ChainSetup[I, O](stream: Stream[I] ⇒ Stream[O], val settings: GeneratorSettings) {
|
||||
val upstream = StreamTestKit.producerProbe[I]()
|
||||
val downstream = StreamTestKit.consumerProbe[O]()
|
||||
|
||||
private val s = stream(Stream(upstream))
|
||||
val producer = s.toProducer(ProcessorGenerator(settings))
|
||||
val upstreamSubscription = upstream.expectSubscription()
|
||||
producer.produceTo(downstream)
|
||||
val downstreamSubscription = downstream.expectSubscription()
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,18 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.testkit
|
||||
|
||||
import akka.stream.{ GeneratorSettings, Stream, ProcessorGenerator }
|
||||
import akka.actor.ActorSystem
|
||||
|
||||
class ChainSetup[I, O](stream: Stream[I] ⇒ Stream[O], val settings: GeneratorSettings)(implicit val system: ActorSystem) {
|
||||
val upstream = StreamTestKit.producerProbe[I]()
|
||||
val downstream = StreamTestKit.consumerProbe[O]()
|
||||
|
||||
private val s = stream(Stream(upstream))
|
||||
val producer = s.toProducer(ProcessorGenerator(settings))
|
||||
val upstreamSubscription = upstream.expectSubscription()
|
||||
producer.produceTo(downstream)
|
||||
val downstreamSubscription = downstream.expectSubscription()
|
||||
}
|
||||
|
|
@ -0,0 +1,187 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.testkit
|
||||
|
||||
import org.scalatest._
|
||||
import akka.actor.ActorSystem
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.forkjoin.ThreadLocalRandom
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.control.NonFatal
|
||||
import akka.stream._
|
||||
|
||||
trait ScriptedTest extends ShouldMatchers {
|
||||
|
||||
class ScriptException(msg: String) extends RuntimeException(msg)
|
||||
|
||||
object Script {
|
||||
def apply[In, Out](phases: (Seq[In], Seq[Out])*): Script[In, Out] = {
|
||||
var providedInputs = Vector[In]()
|
||||
var expectedOutputs = Vector[Out]()
|
||||
var jumps = Vector[Int]()
|
||||
|
||||
for ((ins, outs) ← phases) {
|
||||
providedInputs ++= ins
|
||||
expectedOutputs ++= outs
|
||||
jumps ++= Vector.fill(ins.size - 1)(0) ++ Vector(outs.size)
|
||||
}
|
||||
|
||||
Script(providedInputs, expectedOutputs, jumps, inputCursor = 0, outputCursor = 0, outputEndCursor = 0, completed = false)
|
||||
}
|
||||
}
|
||||
|
||||
case class Script[In, Out](
|
||||
providedInputs: Vector[In],
|
||||
expectedOutputs: Vector[Out],
|
||||
jumps: Vector[Int],
|
||||
inputCursor: Int,
|
||||
outputCursor: Int,
|
||||
outputEndCursor: Int,
|
||||
completed: Boolean) {
|
||||
require(jumps.size == providedInputs.size)
|
||||
|
||||
def provideInput: (In, Script[In, Out]) =
|
||||
if (noInsPending)
|
||||
throw new ScriptException("Script cannot provide more input.")
|
||||
else
|
||||
(providedInputs(inputCursor), this.copy(inputCursor = inputCursor + 1, outputEndCursor = outputEndCursor + jumps(inputCursor)))
|
||||
|
||||
def consumeOutput(out: Out): Script[In, Out] = {
|
||||
if (noOutsPending)
|
||||
throw new ScriptException(s"Tried to produce element ${out} but no elements should be produced right now.")
|
||||
expectedOutputs(outputCursor) should be(out)
|
||||
this.copy(outputCursor = outputCursor + 1)
|
||||
}
|
||||
|
||||
def complete(): Script[In, Out] = {
|
||||
if (finished) copy(completed = true)
|
||||
else fail("received onComplete prematurely")
|
||||
}
|
||||
|
||||
def finished: Boolean = outputCursor == expectedOutputs.size
|
||||
|
||||
def pendingOuts: Int = outputEndCursor - outputCursor
|
||||
def noOutsPending: Boolean = pendingOuts == 0
|
||||
def someOutsPending: Boolean = !noOutsPending
|
||||
|
||||
def pendingIns: Int = providedInputs.size - inputCursor
|
||||
def noInsPending: Boolean = pendingIns == 0
|
||||
def someInsPending: Boolean = !noInsPending
|
||||
|
||||
def debug: String = s"Script(pending=($pendingIns in, $pendingOuts out), remainingIns=${providedInputs.drop(inputCursor).mkString("/")}, remainingOuts=${expectedOutputs.drop(outputCursor).mkString("/")})"
|
||||
}
|
||||
|
||||
class ScriptRunner[In, Out](
|
||||
op: Stream[In] ⇒ Stream[Out],
|
||||
gen: GeneratorSettings,
|
||||
script: Script[In, Out],
|
||||
maximumOverrun: Int,
|
||||
maximumRequest: Int,
|
||||
maximumBuffer: Int)(implicit _system: ActorSystem)
|
||||
extends ChainSetup(op, gen) {
|
||||
|
||||
var _debugLog = Vector.empty[String]
|
||||
var currentScript = script
|
||||
var remainingDemand = script.expectedOutputs.size + ThreadLocalRandom.current().nextInt(maximumOverrun)
|
||||
debugLog(s"starting with remainingDemand=$remainingDemand")
|
||||
var pendingRequests = 0
|
||||
var outstandingDemand = 0
|
||||
var completed = false
|
||||
|
||||
def getNextDemand(): Int = {
|
||||
val max = Math.min(remainingDemand, maximumRequest)
|
||||
if (max == 1) {
|
||||
remainingDemand = 0
|
||||
1
|
||||
} else {
|
||||
val demand = ThreadLocalRandom.current().nextInt(1, max)
|
||||
remainingDemand -= demand
|
||||
demand
|
||||
}
|
||||
}
|
||||
|
||||
def debugLog(msg: String): Unit = _debugLog :+= msg
|
||||
|
||||
def requestMore(demand: Int): Unit = {
|
||||
debugLog(s"test environment requests $demand")
|
||||
downstreamSubscription.requestMore(demand)
|
||||
outstandingDemand += demand
|
||||
}
|
||||
|
||||
def mayProvideInput: Boolean = currentScript.someInsPending && (pendingRequests > 0) && (currentScript.pendingOuts <= maximumBuffer)
|
||||
def mayRequestMore: Boolean = remainingDemand > 0
|
||||
|
||||
def shakeIt(): Boolean = {
|
||||
val u = upstream.probe.receiveWhile(1.milliseconds) {
|
||||
case RequestMore(_, n) ⇒
|
||||
debugLog(s"operation requests $n")
|
||||
pendingRequests += n
|
||||
true
|
||||
case _ ⇒ false // Ignore
|
||||
}
|
||||
val d = downstream.probe.receiveWhile(1.milliseconds) {
|
||||
case OnNext(elem: Out) ⇒
|
||||
debugLog(s"operation produces [$elem]")
|
||||
if (outstandingDemand == 0) fail("operation produced while there was no demand")
|
||||
outstandingDemand -= 1
|
||||
currentScript = currentScript.consumeOutput(elem)
|
||||
true
|
||||
case OnComplete ⇒
|
||||
currentScript = currentScript.complete()
|
||||
true
|
||||
case _ ⇒ false // Ignore
|
||||
}
|
||||
(u ++ d) exists (x ⇒ x)
|
||||
}
|
||||
|
||||
def run(): Unit = {
|
||||
|
||||
@tailrec def doRun(idleRounds: Int): Unit = {
|
||||
if (idleRounds > 10) fail("too many idle rounds")
|
||||
if (!currentScript.completed) {
|
||||
val nextIdle = if (shakeIt()) 0 else idleRounds + 1
|
||||
|
||||
val tieBreak = ThreadLocalRandom.current().nextBoolean()
|
||||
if (mayProvideInput && (!mayRequestMore || tieBreak)) {
|
||||
val (input, nextScript) = currentScript.provideInput
|
||||
debugLog(s"test environment produces [${input}]")
|
||||
pendingRequests -= 1
|
||||
currentScript = nextScript
|
||||
upstreamSubscription.sendNext(input)
|
||||
doRun(nextIdle)
|
||||
} else if (mayRequestMore && (!mayProvideInput || !tieBreak)) {
|
||||
requestMore(getNextDemand())
|
||||
doRun(nextIdle)
|
||||
} else {
|
||||
if (currentScript.noInsPending && !completed) {
|
||||
debugLog("test environment completes")
|
||||
upstreamSubscription.sendComplete()
|
||||
completed = true
|
||||
}
|
||||
doRun(nextIdle)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
requestMore(getNextDemand())
|
||||
doRun(0)
|
||||
} catch {
|
||||
case e: Throwable ⇒
|
||||
println(_debugLog.mkString("Steps leading to failure:\n", "\n", "\nCurrentScript: " + currentScript.debug))
|
||||
throw e
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
def runScript[In, Out](script: Script[In, Out], gen: GeneratorSettings, maximumOverrun: Int = 3, maximumRequest: Int = 3, maximumBuffer: Int = 3)(
|
||||
op: Stream[In] ⇒ Stream[Out])(implicit system: ActorSystem): Unit = {
|
||||
new ScriptRunner(op, gen, script, maximumOverrun, maximumRequest, maximumBuffer).run()
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue