=str #15740 hookup filter and collect

* copy tests from old DSL
* adapt ChainSetup to the new DSL
This commit is contained in:
Martynas Mickevicius 2014-09-09 12:22:14 +03:00
parent bf1e264028
commit 2a29fc0e63
5 changed files with 332 additions and 0 deletions

View file

@ -36,11 +36,55 @@ trait FlowOps[-In, +Out] extends HasNoSink[Out] {
// Storing ops in reverse order
protected def andThen[U](op: AstNode): Repr[In, U]
/**
* Transform this stream by applying the given function to each of the elements
* as they pass through this processing step.
*/
def map[T](f: Out T): Repr[In, T] =
transform("map", () new Transformer[Out, T] {
override def onNext(in: Out) = List(f(in))
})
/**
* Only pass on those elements that satisfy the given predicate.
*/
def filter(p: Out Boolean): Repr[In, Out] =
transform("filter", () new Transformer[Out, Out] {
override def onNext(in: Out) = if (p(in)) List(in) else Nil
})
/**
* Transform this stream by applying the given partial function to each of the elements
* on which the function is defined as they pass through this processing step.
* Non-matching elements are filtered out.
*/
def collect[T](pf: PartialFunction[Out, T]): Repr[In, T] =
transform("collect", () new Transformer[Out, T] {
override def onNext(in: Out) = if (pf.isDefinedAt(in)) List(pf(in)) else Nil
})
/**
* Generic transformation of a stream: for each element the [[akka.stream.Transformer#onNext]]
* function is invoked, expecting a (possibly empty) sequence of output elements
* to be produced.
* After handing off the elements produced from one input element to the downstream
* subscribers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end
* stream processing at this point; in that case the upstream subscription is
* canceled. Before signaling normal completion to the downstream subscribers,
* the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty)
* sequence of elements in response to the end-of-stream event.
*
* [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream.
*
* After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called.
*
* It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with
* ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and
* therefore you do not have to add any additional thread safety or memory
* visibility constructs to access the state from the callback methods.
*
* Note that you can use [[#timerTransform]] if you need support for scheduled events in the transformer.
*/
def transform[T](name: String, mkTransformer: () Transformer[Out, T]): Repr[In, T] = {
andThen(Transform(name, mkTransformer.asInstanceOf[() Transformer[Any, Any]]))
}

View file

@ -0,0 +1,29 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl2
import akka.stream.MaterializerSettings
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit2.ScriptedTest
import scala.concurrent.forkjoin.ThreadLocalRandom.{ current random }
class FlowCollectSpec extends AkkaSpec with ScriptedTest {
val settings = MaterializerSettings(system)
"A Collect" must {
"collect" in {
val range = 1 to 50
def script = Script(range map { _
val x = random.nextInt(0, 10000)
Seq(x) -> (if ((x & 1) == 0) Seq((x * x).toString) else Seq.empty[String])
}: _*)
range foreach (_ runScript(script, settings)(_.collect { case x if x % 2 == 0 (x * x).toString }))
}
}
}

View file

@ -0,0 +1,47 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl2
import akka.stream.MaterializerSettings
import akka.stream.testkit.{ AkkaSpec, StreamTestKit }
import akka.stream.testkit2.ScriptedTest
import scala.concurrent.forkjoin.ThreadLocalRandom.{ current random }
class FlowFilterSpec extends AkkaSpec with ScriptedTest {
val settings = MaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 16)
.withFanOutBuffer(initialSize = 1, maxSize = 16)
"A Filter" must {
"filter" in {
def script = Script((1 to 50) map { _ val x = random.nextInt(); Seq(x) -> (if ((x & 1) == 0) Seq(x) else Seq()) }: _*)
(1 to 50) foreach (_ runScript(script, settings)(_.filter(_ % 2 == 0)))
}
"not blow up with high request counts" in {
val settings = MaterializerSettings(system)
.withInputBuffer(initialSize = 1, maxSize = 1)
.withFanOutBuffer(initialSize = 1, maxSize = 1)
implicit val materializer = FlowMaterializer(settings)
val probe = StreamTestKit.SubscriberProbe[Int]()
FlowFrom(Iterator.fill(1000)(0) ++ List(1)).filter(_ != 0).
toPublisher().subscribe(probe)
val subscription = probe.expectSubscription()
for (_ 1 to 10000) {
subscription.request(Int.MaxValue)
}
probe.expectNext(1)
probe.expectComplete()
}
}
}

View file

@ -0,0 +1,17 @@
package akka.stream.testkit2
import akka.actor.ActorSystem
import akka.stream.MaterializerSettings
import akka.stream.scaladsl2.{ FlowFrom, FlowMaterializer, ProcessorFlow, PublisherSource }
import akka.stream.testkit.StreamTestKit
class ChainSetup[In, Out](stream: ProcessorFlow[In, In] ProcessorFlow[In, Out], val settings: MaterializerSettings)(implicit val system: ActorSystem) {
val upstream = StreamTestKit.PublisherProbe[In]()
val downstream = StreamTestKit.SubscriberProbe[Out]()
private val s = stream(FlowFrom[In]).withSource(PublisherSource(upstream))
val publisher = s.toPublisher()(FlowMaterializer(settings))
val upstreamSubscription = upstream.expectSubscription()
publisher.subscribe(downstream)
val downstreamSubscription = downstream.expectSubscription()
}

View file

@ -0,0 +1,195 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.testkit2
import akka.actor.ActorSystem
import akka.stream.MaterializerSettings
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl2.ProcessorFlow
import akka.stream.testkit.StreamTestKit._
import org.scalatest.Matchers
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
trait ScriptedTest extends Matchers {
class ScriptException(msg: String) extends RuntimeException(msg)
object Script {
def apply[In, Out](phases: (Seq[In], Seq[Out])*): Script[In, Out] = {
var providedInputs = Vector[In]()
var expectedOutputs = Vector[Out]()
var jumps = Vector[Int]()
for ((ins, outs) phases) {
providedInputs ++= ins
expectedOutputs ++= outs
jumps ++= Vector.fill(ins.size - 1)(0) ++ Vector(outs.size)
}
Script(providedInputs, expectedOutputs, jumps, inputCursor = 0, outputCursor = 0, outputEndCursor = 0, completed = false)
}
}
case class Script[In, Out](
providedInputs: Vector[In],
expectedOutputs: Vector[Out],
jumps: Vector[Int],
inputCursor: Int,
outputCursor: Int,
outputEndCursor: Int,
completed: Boolean) {
require(jumps.size == providedInputs.size)
def provideInput: (In, Script[In, Out]) =
if (noInsPending)
throw new ScriptException("Script cannot provide more input.")
else
(providedInputs(inputCursor), this.copy(inputCursor = inputCursor + 1, outputEndCursor = outputEndCursor + jumps(inputCursor)))
def consumeOutput(out: Out): Script[In, Out] = {
if (noOutsPending)
throw new ScriptException(s"Tried to produce element ${out} but no elements should be produced right now.")
out should be(expectedOutputs(outputCursor))
this.copy(outputCursor = outputCursor + 1)
}
def complete(): Script[In, Out] = {
if (finished) copy(completed = true)
else fail("received onComplete prematurely")
}
def finished: Boolean = outputCursor == expectedOutputs.size
def error(e: Throwable): Script[In, Out] = throw e
def pendingOuts: Int = outputEndCursor - outputCursor
def noOutsPending: Boolean = pendingOuts == 0
def someOutsPending: Boolean = !noOutsPending
def pendingIns: Int = providedInputs.size - inputCursor
def noInsPending: Boolean = pendingIns == 0
def someInsPending: Boolean = !noInsPending
def debug: String = s"Script(pending=($pendingIns in, $pendingOuts out), remainingIns=${providedInputs.drop(inputCursor).mkString("/")}, remainingOuts=${expectedOutputs.drop(outputCursor).mkString("/")})"
}
class ScriptRunner[In, Out](
op: ProcessorFlow[In, In] ProcessorFlow[In, Out],
settings: MaterializerSettings,
script: Script[In, Out],
maximumOverrun: Int,
maximumRequest: Int,
maximumBuffer: Int)(implicit _system: ActorSystem)
extends ChainSetup(op, settings) {
var _debugLog = Vector.empty[String]
var currentScript = script
var remainingDemand = script.expectedOutputs.size + ThreadLocalRandom.current().nextInt(1, maximumOverrun)
debugLog(s"starting with remainingDemand=$remainingDemand")
var pendingRequests = 0
var outstandingDemand = 0
var completed = false
def getNextDemand(): Int = {
val max = Math.min(remainingDemand, maximumRequest)
if (max == 1) {
remainingDemand = 0
1
} else {
val demand = ThreadLocalRandom.current().nextInt(1, max)
remainingDemand -= demand
demand
}
}
def debugLog(msg: String): Unit = _debugLog :+= msg
def request(demand: Int): Unit = {
debugLog(s"test environment requests $demand")
downstreamSubscription.request(demand)
outstandingDemand += demand
}
def mayProvideInput: Boolean = currentScript.someInsPending && (pendingRequests > 0) && (currentScript.pendingOuts <= maximumBuffer)
def mayRequestMore: Boolean = remainingDemand > 0
def shakeIt(): Boolean = {
val u = upstream.probe.receiveWhile(1.milliseconds) {
case RequestMore(_, n)
debugLog(s"operation requests $n")
pendingRequests += n
true
case _ false // Ignore
}
val d = downstream.probe.receiveWhile(1.milliseconds) {
case OnNext(elem: Out)
debugLog(s"operation produces [$elem]")
if (outstandingDemand == 0) fail("operation produced while there was no demand")
outstandingDemand -= 1
currentScript = currentScript.consumeOutput(elem)
true
case OnComplete
currentScript = currentScript.complete()
true
case OnError(e)
currentScript = currentScript.error(e)
true
case _ false // Ignore
}
(u ++ d) exists (x x)
}
def run(): Unit = {
@tailrec def doRun(idleRounds: Int): Unit = {
if (idleRounds > 250) fail("too many idle rounds")
if (!currentScript.completed) {
val nextIdle = if (shakeIt()) 0 else idleRounds + 1
val tieBreak = ThreadLocalRandom.current().nextBoolean()
if (mayProvideInput && (!mayRequestMore || tieBreak)) {
val (input, nextScript) = currentScript.provideInput
debugLog(s"test environment produces [${input}]")
pendingRequests -= 1
currentScript = nextScript
upstreamSubscription.sendNext(input)
doRun(nextIdle)
} else if (mayRequestMore && (!mayProvideInput || !tieBreak)) {
request(getNextDemand())
doRun(nextIdle)
} else {
if (currentScript.noInsPending && !completed) {
debugLog("test environment completes")
upstreamSubscription.sendComplete()
completed = true
}
doRun(nextIdle)
}
}
}
try {
debugLog(s"running $script")
request(getNextDemand())
doRun(0)
} catch {
case e: Throwable
println(_debugLog.mkString("Steps leading to failure:\n", "\n", "\nCurrentScript: " + currentScript.debug))
throw e
}
}
}
def runScript[In, Out](script: Script[In, Out], settings: MaterializerSettings, maximumOverrun: Int = 3, maximumRequest: Int = 3, maximumBuffer: Int = 3)(
op: ProcessorFlow[In, In] ProcessorFlow[In, Out])(implicit system: ActorSystem): Unit = {
new ScriptRunner(op, settings, script, maximumOverrun, maximumRequest, maximumBuffer).run()
}
}