diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index 2c4847bdd5..b40e9c356a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -53,6 +53,11 @@ private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops: override def onNext(in: O) = if (p(in)) List(in) else Nil }) + override def collect[U](pf: PartialFunction[O, U]): Flow[U] = + transform(new Transformer[O, U] { + override def onNext(in: O) = if (pf.isDefinedAt(in)) List(pf(in)) else Nil + }) + override def foreach(c: O ⇒ Unit): Flow[Unit] = transform(new Transformer[O, Unit] { override def onNext(in: O) = { c(in); Nil } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 4ad4339e54..3c56e463e2 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -101,6 +101,13 @@ trait Flow[+T] { */ def filter(p: T ⇒ Boolean): Flow[T] + /** + * Transform this stream by applying the given partial function to each of the elements + * on which the function is defined as they pass through this processing step. + * Non-matching elements are filtered out. + */ + def collect[U](pf: PartialFunction[T, U]): Flow[U] + /** * Invoke the given procedure for each received element and produce a Unit value * upon reaching the normal end of the stream. Please note that also in this case diff --git a/akka-stream/src/test/scala/akka/stream/FlowCollectSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowCollectSpec.scala new file mode 100644 index 0000000000..54a0d253eb --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/FlowCollectSpec.scala @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.{ StreamTestKit, ScriptedTest } +import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } + +class FlowCollectSpec extends AkkaSpec with ScriptedTest { + + val settings = MaterializerSettings() + + "A Collect" must { + + "collect" in { + val range = 1 to 50 + def script = Script(range map { _ ⇒ + val x = random.nextInt(0, 10000) + Seq(x) -> (if ((x & 1) == 0) Seq((x * x).toString) else Seq.empty[String]) + }: _*) + range foreach (_ ⇒ runScript(script, settings)(_.collect { case x if x % 2 == 0 ⇒ (x * x).toString })) + } + + } + +} \ No newline at end of file