From 100ab9b458543d19a13d92aa3e10fa01edd406ee Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Sat, 29 Mar 2014 09:32:15 +0100 Subject: [PATCH] !str add Drop --- .../src/main/scala/akka/stream/Stream.scala | 1 + .../scala/akka/stream/impl/StreamImpl.scala | 2 ++ .../scala/akka/stream/StreamDropSpec.scala | 30 +++++++++++++++++++ .../akka/stream/testkit/ScriptedTest.scala | 2 ++ 4 files changed, 35 insertions(+) create mode 100644 akka-stream/src/test/scala/akka/stream/StreamDropSpec.scala diff --git a/akka-stream/src/main/scala/akka/stream/Stream.scala b/akka-stream/src/main/scala/akka/stream/Stream.scala index e7a272740f..d58fb29af4 100644 --- a/akka-stream/src/main/scala/akka/stream/Stream.scala +++ b/akka-stream/src/main/scala/akka/stream/Stream.scala @@ -19,6 +19,7 @@ object Stream { trait Stream[T] { def map[U](f: T ⇒ U): Stream[U] def filter(p: T ⇒ Boolean): Stream[T] + def drop(n: Int): Stream[T] def grouped(n: Int): Stream[immutable.Seq[T]] def mapConcat[U](f: T ⇒ immutable.Seq[U]): Stream[U] def transform[S, U](zero: S)(f: (S, T) ⇒ (S, immutable.Seq[U])): Stream[U] diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala index 71e9656ab2..e6d466a37c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala @@ -33,6 +33,8 @@ private[akka] case class StreamImpl[I, O](producer: Producer[I], ops: List[Ast.A def filter(p: O ⇒ Boolean): Stream[O] = transform(())((_, in) ⇒ if (p(in)) ((), List(in)) else ((), Nil)) + def drop(n: Int): Stream[O] = transform(n)((x, in) ⇒ if (x == 0) 0 -> List(in) else (x - 1) -> Nil) + def grouped(n: Int): Stream[immutable.Seq[O]] = transform[immutable.Seq[O], immutable.Seq[O]](Nil, (x: immutable.Seq[O]) ⇒ List(x)) { (buf: immutable.Seq[O], in: O) ⇒ val group = buf :+ in diff --git a/akka-stream/src/test/scala/akka/stream/StreamDropSpec.scala b/akka-stream/src/test/scala/akka/stream/StreamDropSpec.scala new file mode 100644 index 0000000000..e7e3a1f8e6 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/StreamDropSpec.scala @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import akka.testkit.AkkaSpec +import akka.stream.testkit.ScriptedTest +import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } + +class StreamDropSpec extends AkkaSpec with ScriptedTest { + + val genSettings = GeneratorSettings( + initialInputBufferSize = 2, + maximumInputBufferSize = 16, + initialFanOutBufferSize = 1, + maxFanOutBufferSize = 16) + + "A Map" must { + + "map" in { + def script(d: Int) = Script((1 to 50) map { n ⇒ Seq(n) -> (if (n <= d) Nil else Seq(n)) }: _*) + (1 to 50) foreach { _ ⇒ + val d = Math.min(Math.max(random.nextInt(-10, 60), 0), 50) + runScript(script(d), genSettings)(_.drop(d)) + } + } + + } + +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala b/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala index 3d403243cd..4af5eeaf23 100644 --- a/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala +++ b/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala @@ -85,6 +85,7 @@ trait ScriptedTest extends ShouldMatchers { var _debugLog = Vector.empty[String] var currentScript = script var remainingDemand = script.expectedOutputs.size + ThreadLocalRandom.current().nextInt(maximumOverrun) + remainingDemand = Math.max(1, remainingDemand) debugLog(s"starting with remainingDemand=$remainingDemand") var pendingRequests = 0 var outstandingDemand = 0 @@ -167,6 +168,7 @@ trait ScriptedTest extends ShouldMatchers { } try { + debugLog(s"running $script") requestMore(getNextDemand()) doRun(0) } catch {