Merge pull request #15146 from akka/wip-15106-flow-collect-patriknw

+str #15106 Add Flow collect operator
This commit is contained in:
Patrik Nordwall 2014-05-12 11:10:37 +02:00
commit c62bbcdb68
3 changed files with 39 additions and 0 deletions

View file

@ -53,6 +53,11 @@ private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops:
override def onNext(in: O) = if (p(in)) List(in) else Nil
})
override def collect[U](pf: PartialFunction[O, U]): Flow[U] =
transform(new Transformer[O, U] {
override def onNext(in: O) = if (pf.isDefinedAt(in)) List(pf(in)) else Nil
})
override def foreach(c: O Unit): Flow[Unit] =
transform(new Transformer[O, Unit] {
override def onNext(in: O) = { c(in); Nil }

View file

@ -101,6 +101,13 @@ trait Flow[+T] {
*/
def filter(p: T Boolean): Flow[T]
/**
* Transform this stream by applying the given partial function to each of the elements
* on which the function is defined as they pass through this processing step.
* Non-matching elements are filtered out.
*/
def collect[U](pf: PartialFunction[T, U]): Flow[U]
/**
* Invoke the given procedure for each received element and produce a Unit value
* upon reaching the normal end of the stream. Please note that also in this case

View file

@ -0,0 +1,27 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.{ StreamTestKit, ScriptedTest }
import scala.concurrent.forkjoin.ThreadLocalRandom.{ current random }
class FlowCollectSpec extends AkkaSpec with ScriptedTest {
val settings = MaterializerSettings()
"A Collect" must {
"collect" in {
val range = 1 to 50
def script = Script(range map { _
val x = random.nextInt(0, 10000)
Seq(x) -> (if ((x & 1) == 0) Seq((x * x).toString) else Seq.empty[String])
}: _*)
range foreach (_ runScript(script, settings)(_.collect { case x if x % 2 == 0 (x * x).toString }))
}
}
}