From 18868842d37a5c793fd6753d8079c5644ecbb582 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 6 Oct 2014 14:46:52 +0200 Subject: [PATCH] +str #15950 Add fold and foreach shortcuts --- .../stream/scaladsl2/FlowConflateSpec.scala | 3 +-- .../stream/scaladsl2/FlowExpandSpec.scala | 2 +- .../akka/stream/scaladsl2/FlowFoldSpec.scala | 4 ++-- .../stream/scaladsl2/FlowForeachSpec.scala | 8 +++---- .../ImplicitFlowMaterializerSpec.scala | 2 +- .../scala/akka/stream/scaladsl2/Source.scala | 21 +++++++++++++++++++ 6 files changed, 29 insertions(+), 11 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowConflateSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowConflateSpec.scala index bfa487b7e8..ca86176ba1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowConflateSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowConflateSpec.scala @@ -56,11 +56,10 @@ class FlowConflateSpec extends AkkaSpec { } "work on a variable rate chain" in { - val foldDrain = FoldDrain[Int, Int](0)(_ + _) val future = Source((1 to 1000).iterator) .conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i) .map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i } - .runWith(FoldDrain[Int, Int](0)(_ + _)) + .fold(0)(_ + _) Await.result(future, 10.seconds) should be(500500) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowExpandSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowExpandSpec.scala index 3624caede3..8635e5819c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowExpandSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowExpandSpec.scala @@ -67,7 +67,7 @@ class FlowExpandSpec extends AkkaSpec { val future = Source((1 to 100).iterator) .map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i } .expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)) - .runWith(FoldDrain[Set[Int], Int](Set.empty[Int])(_ + _)) + .fold(Set.empty[Int])(_ + _) Await.result(future, 10.seconds) should be(Set.empty[Int] ++ (1 to 100)) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowFoldSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowFoldSpec.scala index d9e64360d4..70df9730d7 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowFoldSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowFoldSpec.scala @@ -15,14 +15,14 @@ class FlowFoldSpec extends AkkaSpec with DefaultTimeout { "fold" in { val input = 1 to 100 - val future = Source(input).runWith(FoldDrain[Int, Int](0)(_ + _)) + val future = Source(input).fold(0)(_ + _) val expected = input.fold(0)(_ + _) Await.result(future, timeout.duration) should be(expected) } "propagate an error" in { val error = new Exception with NoStackTrace - val future = Source[Unit](() ⇒ throw error).runWith(FoldDrain[Unit, Unit](())((_, _) ⇒ ())) + val future = Source[Unit](() ⇒ throw error).fold(())((_, _) ⇒ ()) the[Exception] thrownBy Await.result(future, timeout.duration) should be(error) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowForeachSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowForeachSpec.scala index 9c899831c7..96fb3e8d43 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowForeachSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowForeachSpec.scala @@ -16,7 +16,7 @@ class FlowForeachSpec extends AkkaSpec { "A Foreach" must { "call the procedure for each element" in { - Source(1 to 3).runWith(ForeachDrain[Int](testActor ! _)) onSuccess { + Source(1 to 3).foreach(testActor ! _) onSuccess { case _ ⇒ testActor ! "done" } expectMsg(1) @@ -26,8 +26,7 @@ class FlowForeachSpec extends AkkaSpec { } "complete the future for an empty stream" in { - val foreachDrain = ForeachDrain[Int](testActor ! _) - val mf = Source(Nil).runWith(ForeachDrain[Int](testActor ! _)) onSuccess { + val mf = Source(Nil).foreach(testActor ! _) onSuccess { case _ ⇒ testActor ! "done" } expectMsg("done") @@ -35,8 +34,7 @@ class FlowForeachSpec extends AkkaSpec { "yield the first error" in { val p = StreamTestKit.PublisherProbe[Int]() - val foreachDrain = ForeachDrain[Int](testActor ! _) - val mf = Source(p).runWith(ForeachDrain[Int](testActor ! _)) onFailure { + val mf = Source(p).foreach(testActor ! _) onFailure { case ex ⇒ testActor ! ex } val proc = p.expectSubscription diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/ImplicitFlowMaterializerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/ImplicitFlowMaterializerSpec.scala index f78478c8e6..c1014097ae 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/ImplicitFlowMaterializerSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/ImplicitFlowMaterializerSpec.scala @@ -23,7 +23,7 @@ object ImplicitFlowMaterializerSpec { // run takes an implicit FlowMaterializer parameter, which is provided by ImplicitFlowMaterializer import context.dispatcher val foldDrain = FoldDrain[String, String]("")(_ + _) - flow.runWith(foldDrain) pipeTo sender() + flow.fold("")(_ + _) pipeTo sender() } } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala index 4b3223c681..df072ef750 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala @@ -36,6 +36,27 @@ trait Source[+Out] extends FlowOps[Out] { */ def runWith(drain: DrainWithKey[Out])(implicit materializer: FlowMaterializer): drain.MaterializedType + /** + * Shortcut for running this `Source` with a fold function. + * The given function is invoked for every received element, giving it its previous + * output (or the given `zero` value) and the element as input. + * The returned [[scala.concurrent.Future]] will be completed with value of the final + * function evaluation when the input stream ends, or completed with `Failure` + * if there is an error is signaled in the stream. + */ + def fold[U](zero: U)(f: (U, Out) ⇒ U)(implicit materializer: FlowMaterializer): Future[U] = + runWith(FoldDrain(zero)(f)) + + /** + * Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked + * for each received element. + * The returned [[scala.concurrent.Future]] will be completed with `Success` when reaching the + * normal end of the stream, or completed with `Failure` if there is an error is signaled in + * the stream. + */ + def foreach(f: Out ⇒ Unit)(implicit materializer: FlowMaterializer): Future[Unit] = + runWith(ForeachDrain(f)) + } object Source {