From 2a29fc0e634374be84436d3680e34fc2ca18c37e Mon Sep 17 00:00:00 2001 From: Martynas Mickevicius Date: Tue, 9 Sep 2014 12:22:14 +0300 Subject: [PATCH] =str #15740 hookup filter and collect * copy tests from old DSL * adapt ChainSetup to the new DSL --- .../scala/akka/stream/scaladsl2/Flow.scala | 44 ++++ .../stream/scaladsl2/FlowCollectSpec.scala | 29 +++ .../stream/scaladsl2/FlowFilterSpec.scala | 47 +++++ .../akka/stream/testkit2/ChainSetup.scala | 17 ++ .../akka/stream/testkit2/ScriptedTest.scala | 195 ++++++++++++++++++ 5 files changed, 332 insertions(+) create mode 100644 akka-stream/src/test/scala/akka/stream/scaladsl2/FlowCollectSpec.scala create mode 100644 akka-stream/src/test/scala/akka/stream/scaladsl2/FlowFilterSpec.scala create mode 100644 akka-stream/src/test/scala/akka/stream/testkit2/ChainSetup.scala create mode 100644 akka-stream/src/test/scala/akka/stream/testkit2/ScriptedTest.scala diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala index a3d991025f..600d821fac 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala @@ -36,11 +36,55 @@ trait FlowOps[-In, +Out] extends HasNoSink[Out] { // Storing ops in reverse order protected def andThen[U](op: AstNode): Repr[In, U] + /** + * Transform this stream by applying the given function to each of the elements + * as they pass through this processing step. + */ def map[T](f: Out ⇒ T): Repr[In, T] = transform("map", () ⇒ new Transformer[Out, T] { override def onNext(in: Out) = List(f(in)) }) + /** + * Only pass on those elements that satisfy the given predicate. + */ + def filter(p: Out ⇒ Boolean): Repr[In, Out] = + transform("filter", () ⇒ new Transformer[Out, Out] { + override def onNext(in: Out) = if (p(in)) List(in) else Nil + }) + + /** + * Transform this stream by applying the given partial function to each of the elements + * on which the function is defined as they pass through this processing step. + * Non-matching elements are filtered out. + */ + def collect[T](pf: PartialFunction[Out, T]): Repr[In, T] = + transform("collect", () ⇒ new Transformer[Out, T] { + override def onNext(in: Out) = if (pf.isDefinedAt(in)) List(pf(in)) else Nil + }) + + /** + * Generic transformation of a stream: for each element the [[akka.stream.Transformer#onNext]] + * function is invoked, expecting a (possibly empty) sequence of output elements + * to be produced. + * After handing off the elements produced from one input element to the downstream + * subscribers, the [[akka.stream.Transformer#isComplete]] predicate determines whether to end + * stream processing at this point; in that case the upstream subscription is + * canceled. Before signaling normal completion to the downstream subscribers, + * the [[akka.stream.Transformer#onComplete]] function is invoked to produce a (possibly empty) + * sequence of elements in response to the end-of-stream event. + * + * [[akka.stream.Transformer#onError]] is called when failure is signaled from upstream. + * + * After normal completion or error the [[akka.stream.Transformer#cleanup]] function is called. + * + * It is possible to keep state in the concrete [[akka.stream.Transformer]] instance with + * ordinary instance variables. The [[akka.stream.Transformer]] is executed by an actor and + * therefore you do not have to add any additional thread safety or memory + * visibility constructs to access the state from the callback methods. + * + * Note that you can use [[#timerTransform]] if you need support for scheduled events in the transformer. + */ def transform[T](name: String, mkTransformer: () ⇒ Transformer[Out, T]): Repr[In, T] = { andThen(Transform(name, mkTransformer.asInstanceOf[() ⇒ Transformer[Any, Any]])) } diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowCollectSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowCollectSpec.scala new file mode 100644 index 0000000000..fdb90090d1 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowCollectSpec.scala @@ -0,0 +1,29 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import akka.stream.MaterializerSettings +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit2.ScriptedTest + +import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } + +class FlowCollectSpec extends AkkaSpec with ScriptedTest { + + val settings = MaterializerSettings(system) + + "A Collect" must { + + "collect" in { + val range = 1 to 50 + def script = Script(range map { _ ⇒ + val x = random.nextInt(0, 10000) + Seq(x) -> (if ((x & 1) == 0) Seq((x * x).toString) else Seq.empty[String]) + }: _*) + range foreach (_ ⇒ runScript(script, settings)(_.collect { case x if x % 2 == 0 ⇒ (x * x).toString })) + } + + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowFilterSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowFilterSpec.scala new file mode 100644 index 0000000000..f35e466ebe --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowFilterSpec.scala @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import akka.stream.MaterializerSettings +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import akka.stream.testkit2.ScriptedTest + +import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } + +class FlowFilterSpec extends AkkaSpec with ScriptedTest { + + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 1, maxSize = 16) + + "A Filter" must { + + "filter" in { + def script = Script((1 to 50) map { _ ⇒ val x = random.nextInt(); Seq(x) -> (if ((x & 1) == 0) Seq(x) else Seq()) }: _*) + (1 to 50) foreach (_ ⇒ runScript(script, settings)(_.filter(_ % 2 == 0))) + } + + "not blow up with high request counts" in { + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 1, maxSize = 1) + .withFanOutBuffer(initialSize = 1, maxSize = 1) + implicit val materializer = FlowMaterializer(settings) + + val probe = StreamTestKit.SubscriberProbe[Int]() + FlowFrom(Iterator.fill(1000)(0) ++ List(1)).filter(_ != 0). + toPublisher().subscribe(probe) + + val subscription = probe.expectSubscription() + for (_ ← 1 to 10000) { + subscription.request(Int.MaxValue) + } + + probe.expectNext(1) + probe.expectComplete() + + } + + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/testkit2/ChainSetup.scala b/akka-stream/src/test/scala/akka/stream/testkit2/ChainSetup.scala new file mode 100644 index 0000000000..7ea7e5a2db --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/testkit2/ChainSetup.scala @@ -0,0 +1,17 @@ +package akka.stream.testkit2 + +import akka.actor.ActorSystem +import akka.stream.MaterializerSettings +import akka.stream.scaladsl2.{ FlowFrom, FlowMaterializer, ProcessorFlow, PublisherSource } +import akka.stream.testkit.StreamTestKit + +class ChainSetup[In, Out](stream: ProcessorFlow[In, In] ⇒ ProcessorFlow[In, Out], val settings: MaterializerSettings)(implicit val system: ActorSystem) { + val upstream = StreamTestKit.PublisherProbe[In]() + val downstream = StreamTestKit.SubscriberProbe[Out]() + + private val s = stream(FlowFrom[In]).withSource(PublisherSource(upstream)) + val publisher = s.toPublisher()(FlowMaterializer(settings)) + val upstreamSubscription = upstream.expectSubscription() + publisher.subscribe(downstream) + val downstreamSubscription = downstream.expectSubscription() +} diff --git a/akka-stream/src/test/scala/akka/stream/testkit2/ScriptedTest.scala b/akka-stream/src/test/scala/akka/stream/testkit2/ScriptedTest.scala new file mode 100644 index 0000000000..b44c4188e2 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/testkit2/ScriptedTest.scala @@ -0,0 +1,195 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.testkit2 + +import akka.actor.ActorSystem +import akka.stream.MaterializerSettings +import akka.stream.scaladsl.Flow +import akka.stream.scaladsl2.ProcessorFlow +import akka.stream.testkit.StreamTestKit._ +import org.scalatest.Matchers + +import scala.annotation.tailrec +import scala.concurrent.duration._ +import scala.concurrent.forkjoin.ThreadLocalRandom + +trait ScriptedTest extends Matchers { + + class ScriptException(msg: String) extends RuntimeException(msg) + + object Script { + def apply[In, Out](phases: (Seq[In], Seq[Out])*): Script[In, Out] = { + var providedInputs = Vector[In]() + var expectedOutputs = Vector[Out]() + var jumps = Vector[Int]() + + for ((ins, outs) ← phases) { + providedInputs ++= ins + expectedOutputs ++= outs + jumps ++= Vector.fill(ins.size - 1)(0) ++ Vector(outs.size) + } + + Script(providedInputs, expectedOutputs, jumps, inputCursor = 0, outputCursor = 0, outputEndCursor = 0, completed = false) + } + } + + case class Script[In, Out]( + providedInputs: Vector[In], + expectedOutputs: Vector[Out], + jumps: Vector[Int], + inputCursor: Int, + outputCursor: Int, + outputEndCursor: Int, + completed: Boolean) { + require(jumps.size == providedInputs.size) + + def provideInput: (In, Script[In, Out]) = + if (noInsPending) + throw new ScriptException("Script cannot provide more input.") + else + (providedInputs(inputCursor), this.copy(inputCursor = inputCursor + 1, outputEndCursor = outputEndCursor + jumps(inputCursor))) + + def consumeOutput(out: Out): Script[In, Out] = { + if (noOutsPending) + throw new ScriptException(s"Tried to produce element ${out} but no elements should be produced right now.") + out should be(expectedOutputs(outputCursor)) + this.copy(outputCursor = outputCursor + 1) + } + + def complete(): Script[In, Out] = { + if (finished) copy(completed = true) + else fail("received onComplete prematurely") + } + + def finished: Boolean = outputCursor == expectedOutputs.size + + def error(e: Throwable): Script[In, Out] = throw e + + def pendingOuts: Int = outputEndCursor - outputCursor + def noOutsPending: Boolean = pendingOuts == 0 + def someOutsPending: Boolean = !noOutsPending + + def pendingIns: Int = providedInputs.size - inputCursor + def noInsPending: Boolean = pendingIns == 0 + def someInsPending: Boolean = !noInsPending + + def debug: String = s"Script(pending=($pendingIns in, $pendingOuts out), remainingIns=${providedInputs.drop(inputCursor).mkString("/")}, remainingOuts=${expectedOutputs.drop(outputCursor).mkString("/")})" + } + + class ScriptRunner[In, Out]( + op: ProcessorFlow[In, In] ⇒ ProcessorFlow[In, Out], + settings: MaterializerSettings, + script: Script[In, Out], + maximumOverrun: Int, + maximumRequest: Int, + maximumBuffer: Int)(implicit _system: ActorSystem) + extends ChainSetup(op, settings) { + + var _debugLog = Vector.empty[String] + var currentScript = script + var remainingDemand = script.expectedOutputs.size + ThreadLocalRandom.current().nextInt(1, maximumOverrun) + debugLog(s"starting with remainingDemand=$remainingDemand") + var pendingRequests = 0 + var outstandingDemand = 0 + var completed = false + + def getNextDemand(): Int = { + val max = Math.min(remainingDemand, maximumRequest) + if (max == 1) { + remainingDemand = 0 + 1 + } else { + val demand = ThreadLocalRandom.current().nextInt(1, max) + remainingDemand -= demand + demand + } + } + + def debugLog(msg: String): Unit = _debugLog :+= msg + + def request(demand: Int): Unit = { + debugLog(s"test environment requests $demand") + downstreamSubscription.request(demand) + outstandingDemand += demand + } + + def mayProvideInput: Boolean = currentScript.someInsPending && (pendingRequests > 0) && (currentScript.pendingOuts <= maximumBuffer) + def mayRequestMore: Boolean = remainingDemand > 0 + + def shakeIt(): Boolean = { + val u = upstream.probe.receiveWhile(1.milliseconds) { + case RequestMore(_, n) ⇒ + debugLog(s"operation requests $n") + pendingRequests += n + true + case _ ⇒ false // Ignore + } + val d = downstream.probe.receiveWhile(1.milliseconds) { + case OnNext(elem: Out) ⇒ + debugLog(s"operation produces [$elem]") + if (outstandingDemand == 0) fail("operation produced while there was no demand") + outstandingDemand -= 1 + currentScript = currentScript.consumeOutput(elem) + true + case OnComplete ⇒ + currentScript = currentScript.complete() + true + case OnError(e) ⇒ + currentScript = currentScript.error(e) + true + case _ ⇒ false // Ignore + } + (u ++ d) exists (x ⇒ x) + } + + def run(): Unit = { + + @tailrec def doRun(idleRounds: Int): Unit = { + if (idleRounds > 250) fail("too many idle rounds") + if (!currentScript.completed) { + val nextIdle = if (shakeIt()) 0 else idleRounds + 1 + + val tieBreak = ThreadLocalRandom.current().nextBoolean() + if (mayProvideInput && (!mayRequestMore || tieBreak)) { + val (input, nextScript) = currentScript.provideInput + debugLog(s"test environment produces [${input}]") + pendingRequests -= 1 + currentScript = nextScript + upstreamSubscription.sendNext(input) + doRun(nextIdle) + } else if (mayRequestMore && (!mayProvideInput || !tieBreak)) { + request(getNextDemand()) + doRun(nextIdle) + } else { + if (currentScript.noInsPending && !completed) { + debugLog("test environment completes") + upstreamSubscription.sendComplete() + completed = true + } + doRun(nextIdle) + } + + } + } + + try { + debugLog(s"running $script") + request(getNextDemand()) + doRun(0) + } catch { + case e: Throwable ⇒ + println(_debugLog.mkString("Steps leading to failure:\n", "\n", "\nCurrentScript: " + currentScript.debug)) + throw e + } + + } + + } + + def runScript[In, Out](script: Script[In, Out], settings: MaterializerSettings, maximumOverrun: Int = 3, maximumRequest: Int = 3, maximumBuffer: Int = 3)( + op: ProcessorFlow[In, In] ⇒ ProcessorFlow[In, Out])(implicit system: ActorSystem): Unit = { + new ScriptRunner(op, settings, script, maximumOverrun, maximumRequest, maximumBuffer).run() + } + +}