!str add Drop

This commit is contained in:
Roland Kuhn 2014-03-29 09:32:15 +01:00
parent b2ddd12e28
commit 100ab9b458
4 changed files with 35 additions and 0 deletions

View file

@ -19,6 +19,7 @@ object Stream {
trait Stream[T] {
def map[U](f: T U): Stream[U]
def filter(p: T Boolean): Stream[T]
def drop(n: Int): Stream[T]
def grouped(n: Int): Stream[immutable.Seq[T]]
def mapConcat[U](f: T immutable.Seq[U]): Stream[U]
def transform[S, U](zero: S)(f: (S, T) (S, immutable.Seq[U])): Stream[U]

View file

@ -33,6 +33,8 @@ private[akka] case class StreamImpl[I, O](producer: Producer[I], ops: List[Ast.A
def filter(p: O Boolean): Stream[O] = transform(())((_, in) if (p(in)) ((), List(in)) else ((), Nil))
def drop(n: Int): Stream[O] = transform(n)((x, in) if (x == 0) 0 -> List(in) else (x - 1) -> Nil)
def grouped(n: Int): Stream[immutable.Seq[O]] =
transform[immutable.Seq[O], immutable.Seq[O]](Nil, (x: immutable.Seq[O]) List(x)) { (buf: immutable.Seq[O], in: O)
val group = buf :+ in

View file

@ -0,0 +1,30 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import akka.testkit.AkkaSpec
import akka.stream.testkit.ScriptedTest
import scala.concurrent.forkjoin.ThreadLocalRandom.{ current random }
class StreamDropSpec extends AkkaSpec with ScriptedTest {
val genSettings = GeneratorSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,
maxFanOutBufferSize = 16)
"A Map" must {
"map" in {
def script(d: Int) = Script((1 to 50) map { n Seq(n) -> (if (n <= d) Nil else Seq(n)) }: _*)
(1 to 50) foreach { _
val d = Math.min(Math.max(random.nextInt(-10, 60), 0), 50)
runScript(script(d), genSettings)(_.drop(d))
}
}
}
}

View file

@ -85,6 +85,7 @@ trait ScriptedTest extends ShouldMatchers {
var _debugLog = Vector.empty[String]
var currentScript = script
var remainingDemand = script.expectedOutputs.size + ThreadLocalRandom.current().nextInt(maximumOverrun)
remainingDemand = Math.max(1, remainingDemand)
debugLog(s"starting with remainingDemand=$remainingDemand")
var pendingRequests = 0
var outstandingDemand = 0
@ -167,6 +168,7 @@ trait ScriptedTest extends ShouldMatchers {
}
try {
debugLog(s"running $script")
requestMore(getNextDemand())
doRun(0)
} catch {