From 9b873b728b2e02139e688bbc3b8ea48d6c46cdd8 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 8 May 2014 09:32:38 +0200 Subject: [PATCH] +str #15106 Add Flow collect operator --- .../scala/akka/stream/impl/FlowImpl.scala | 5 ++++ .../scala/akka/stream/scaladsl/Flow.scala | 7 +++++ .../scala/akka/stream/FlowCollectSpec.scala | 27 +++++++++++++++++++ 3 files changed, 39 insertions(+) create mode 100644 akka-stream/src/test/scala/akka/stream/FlowCollectSpec.scala diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index 5d4e3d0a05..f2ed53b37b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -52,6 +52,11 @@ private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops: override def onNext(in: O) = if (p(in)) List(in) else Nil }) + override def collect[U](pf: PartialFunction[O, U]): Flow[U] = + transform(new Transformer[O, U] { + override def onNext(in: O) = if (pf.isDefinedAt(in)) List(pf(in)) else Nil + }) + override def foreach(c: O ⇒ Unit): Flow[Unit] = transform(new Transformer[O, Unit] { override def onNext(in: O) = { c(in); Nil } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index c2e4b8688c..76171da5ce 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -92,6 +92,13 @@ trait Flow[+T] { */ def filter(p: T ⇒ Boolean): Flow[T] + /** + * Transform this stream by applying the given partial function to each of the elements + * on which the function is defined as they pass through this processing step. + * Non-matching elements are filtered out. + */ + def collect[U](pf: PartialFunction[T, U]): Flow[U] + /** * Invoke the given procedure for each received element and produce a Unit value * upon reaching the normal end of the stream. Please note that also in this case diff --git a/akka-stream/src/test/scala/akka/stream/FlowCollectSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowCollectSpec.scala new file mode 100644 index 0000000000..54a0d253eb --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/FlowCollectSpec.scala @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.{ StreamTestKit, ScriptedTest } +import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } + +class FlowCollectSpec extends AkkaSpec with ScriptedTest { + + val settings = MaterializerSettings() + + "A Collect" must { + + "collect" in { + val range = 1 to 50 + def script = Script(range map { _ ⇒ + val x = random.nextInt(0, 10000) + Seq(x) -> (if ((x & 1) == 0) Seq((x * x).toString) else Seq.empty[String]) + }: _*) + range foreach (_ ⇒ runScript(script, settings)(_.collect { case x if x % 2 == 0 ⇒ (x * x).toString })) + } + + } + +} \ No newline at end of file