diff --git a/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala b/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala index 5a6a37e1a0..2d4b678689 100644 --- a/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/http/HttpServerExampleSpec.scala @@ -22,7 +22,7 @@ class HttpServerExampleSpec implicit val materializer = FlowMaterializer() val serverBinding = Http(system).bind(interface = "localhost", port = 8080) - serverBinding.connections.foreach { connection => // foreach materializes the source + serverBinding.connections.runForeach { connection => // foreach materializes the source println("Accepted new connection from " + connection.remoteAddress) } //#bind-example @@ -52,7 +52,7 @@ class HttpServerExampleSpec case _: HttpRequest => HttpResponse(404, entity = "Unknown resource!") } - serverBinding.connections foreach { connection => + serverBinding.connections runForeach { connection => println("Accepted new connection from " + connection.remoteAddress) connection handleWithSyncHandler requestHandler diff --git a/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala index f5a602f57e..361be23ddb 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala @@ -307,7 +307,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J")) .map(elem => { println(s"before: $elem"); elem }) .mapAsync(service.convert) - .foreach(elem => println(s"after: $elem")) + .runForeach(elem => println(s"after: $elem")) //#sometimes-slow-mapAsync probe.expectMsg("after: A") @@ -339,7 +339,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J")) .map(elem => { println(s"before: $elem"); elem }) .mapAsyncUnordered(service.convert) - .foreach(elem => println(s"after: $elem")) + .runForeach(elem => println(s"after: $elem")) //#sometimes-slow-mapAsyncUnordered probe.receiveN(10).toSet should be(Set( diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamTcpDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamTcpDocSpec.scala index dea4949424..f51739c8f9 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/StreamTcpDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamTcpDocSpec.scala @@ -40,7 +40,7 @@ class StreamTcpDocSpec extends AkkaSpec { //#echo-server-simple-handle val connections: Source[IncomingConnection] = binding.connections - connections foreach { connection => + connections runForeach { connection => println(s"New connection from: ${connection.remoteAddress}") val echo = Flow[ByteString] @@ -77,7 +77,7 @@ class StreamTcpDocSpec extends AkkaSpec { val binding = StreamTcp().bind(localhost) //#welcome-banner-chat-server - binding.connections foreach { connection => + binding.connections runForeach { connection => val serverLogic = Flow() { implicit b => import FlowGraphImplicits._ diff --git a/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala index 1e8f346c61..827abde491 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala @@ -92,7 +92,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { //#authors-foreachsink-println //#authors-foreach-println - authors.foreach(println) + authors.runForeach(println) //#authors-foreach-println } @@ -149,7 +149,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { val completion: Future[Unit] = Source(1 to 10) .map(i => { println(s"map => $i"); i }) - .foreach { i => readLine(s"Element = $i; continue reading? [press enter]\n") } + .runForeach { i => readLine(s"Element = $i; continue reading? [press enter]\n") } Await.ready(completion, 1.minute) //#backpressure-by-readline diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala index 671c5c7625..30f654b6c7 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala @@ -23,7 +23,7 @@ class RecipeReduceByKey extends RecipeSpec { // add counting logic to the streams val countedWords: Source[Future[(String, Int)]] = wordStreams.map { case (word, wordStream) => - wordStream.fold((word, 0)) { + wordStream.runFold((word, 0)) { case ((w, count), _) => (w, count + 1) } } @@ -57,7 +57,7 @@ class RecipeReduceByKey extends RecipeSpec { val groupStreams = Flow[In].groupBy(groupKey) val reducedValues = groupStreams.map { case (key, groupStream) => - groupStream.fold((key, foldZero(key))) { + groupStream.runFold((key, foldZero(key))) { case ((key, aggregated), elem) => (key, fold(aggregated, elem)) } } diff --git a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst index 9f3d10f81a..c32b402d45 100644 --- a/akka-docs-dev/rst/scala/stream-flows-and-basics.rst +++ b/akka-docs-dev/rst/scala/stream-flows-and-basics.rst @@ -185,7 +185,7 @@ but is not restricted to that - it could also mean opening files or socket conne Materialization is triggered at so called "terminal operations". Most notably this includes the various forms of the ``run()`` and ``runWith()`` methods defined on flow elements as well as a small number of special syntactic sugars for running with -well-known sinks, such as ``foreach(el => )`` (being an alias to ``runWith(Sink.foreach(el => ))``. +well-known sinks, such as ``runForeach(el => )`` (being an alias to ``runWith(Sink.foreach(el => ))``. Materialization is currently performed synchronously on the materializing thread. Tha actual stream processing is handled by :ref:`Actors actor-scala` started up during the streams materialization, diff --git a/akka-http-core/src/main/scala/akka/http/model/Multipart.scala b/akka-http-core/src/main/scala/akka/http/model/Multipart.scala index 84d0641915..38451b72a2 100644 --- a/akka-http-core/src/main/scala/akka/http/model/Multipart.scala +++ b/akka-http-core/src/main/scala/akka/http/model/Multipart.scala @@ -58,7 +58,7 @@ object Multipart { private def strictify[BP <: Multipart.BodyPart, BPS <: Multipart.BodyPart.Strict](parts: Source[BP])(f: BP ⇒ Future[BPS])(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[Vector[BPS]] = // TODO: move to Vector `:+` when https://issues.scala-lang.org/browse/SI-8930 is fixed - parts.fold(new VectorBuilder[Future[BPS]]) { + parts.runFold(new VectorBuilder[Future[BPS]]) { case (builder, part) ⇒ builder += f(part) }.fast.flatMap(builder ⇒ FastFuture.sequence(builder.result())) diff --git a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala index c3c9b8e4cb..9f960e59f3 100644 --- a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala @@ -270,7 +270,7 @@ private[http] object StreamUtils { */ private[http] class EnhancedByteStringSource(val byteStringStream: Source[ByteString]) extends AnyVal { def join(implicit materializer: FlowMaterializer): Future[ByteString] = - byteStringStream.fold(ByteString.empty)(_ ++ _) + byteStringStream.runFold(ByteString.empty)(_ ++ _) def utf8String(implicit materializer: FlowMaterializer, ec: ExecutionContext): Future[String] = join.map(_.utf8String) } diff --git a/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala b/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala index fed5fcbd1a..2ad0124efe 100644 --- a/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala +++ b/akka-http/src/main/scala/akka/http/unmarshalling/PredefinedFromEntityUnmarshallers.scala @@ -15,7 +15,7 @@ trait PredefinedFromEntityUnmarshallers extends MultipartUnmarshallers { implicit def byteStringUnmarshaller(implicit fm: FlowMaterializer): FromEntityUnmarshaller[ByteString] = Unmarshaller { case HttpEntity.Strict(_, data) ⇒ FastFuture.successful(data) - case entity ⇒ entity.dataBytes.fold(ByteString.empty)(_ ++ _) + case entity ⇒ entity.dataBytes.runFold(ByteString.empty)(_ ++ _) } implicit def byteArrayUnmarshaller(implicit fm: FlowMaterializer, diff --git a/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java b/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java index 9682d539b9..df4e100f54 100644 --- a/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java @@ -43,8 +43,9 @@ public class ActorPublisherTest extends StreamTest { .actorOf(Props.create(TestPublisher.class).withDispatcher("akka.test.stream-dispatcher")); final Publisher publisher = UntypedActorPublisher.create(ref); Source.from(publisher) - .foreach(new akka.stream.javadsl.japi.Procedure(){ - @Override public void apply(Integer elem) throws Exception { + .runForeach(new akka.stream.javadsl.japi.Procedure() { + @Override + public void apply(Integer elem) throws Exception { probe.getRef().tell(elem, ActorRef.noSender()); } }, materializer); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 3659f82762..b63c53ea3c 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -65,7 +65,7 @@ public class FlowTest extends StreamTest { } }); - ints.via(flow1.via(flow2)).fold("", new Function2() { + ints.via(flow1.via(flow2)).runFold("", new Function2() { public String apply(String acc, String elem) { return acc + elem; } @@ -116,7 +116,7 @@ public class FlowTest extends StreamTest { }; } }); - Source.from(input).via(flow).foreach(new Procedure() { + Source.from(input).via(flow).runForeach(new Procedure() { public void apply(Integer elem) { probe.getRef().tell(elem, ActorRef.noSender()); } @@ -142,10 +142,10 @@ public class FlowTest extends StreamTest { return elem.substring(0, 1); } }); - Source.from(input).via(slsFlow).foreach(new Procedure>>() { + Source.from(input).via(slsFlow).runForeach(new Procedure>>() { @Override public void apply(final Pair> pair) throws Exception { - pair.second().foreach(new Procedure() { + pair.second().runForeach(new Procedure() { @Override public void apply(String elem) throws Exception { probe.getRef().tell(new Pair(pair.first(), elem), ActorRef.noSender()); @@ -179,7 +179,7 @@ public class FlowTest extends StreamTest { return elem.equals("."); } }); - Source.from(input).via(flow).foreach(new Procedure>() { + Source.from(input).via(flow).runForeach(new Procedure>() { @Override public void apply(Source subStream) throws Exception { subStream.filter(new Predicate() { @@ -187,7 +187,7 @@ public class FlowTest extends StreamTest { public boolean test(String elem) { return !elem.equals("."); } - }).grouped(10).foreach(new Procedure>() { + }).grouped(10).runForeach(new Procedure>() { @Override public void apply(List chunk) throws Exception { probe.getRef().tell(chunk, ActorRef.noSender()); @@ -342,7 +342,7 @@ public class FlowTest extends StreamTest { final Source in1 = Source.from(input1); final Source in2 = Source.from(input2); final Flow flow = Flow.of(String.class); - in1.via(flow.concat(in2)).foreach(new Procedure() { + in1.via(flow.concat(in2)).runForeach(new Procedure() { public void apply(String elem) { probe.getRef().tell(elem, ActorRef.noSender()); } @@ -413,7 +413,7 @@ public class FlowTest extends StreamTest { return aggr + in; } }); - Future future = Source.from(input).via(flow).fold("", new Function2() { + Future future = Source.from(input).via(flow).runFold("", new Function2() { @Override public String apply(String aggr, String in) throws Exception { return aggr + in; @@ -454,7 +454,7 @@ public class FlowTest extends StreamTest { return Futures.successful(elem.toUpperCase()); } }); - Source.from(input).via(flow).foreach(new Procedure() { + Source.from(input).via(flow).runForeach(new Procedure() { public void apply(String elem) { probe.getRef().tell(elem, ActorRef.noSender()); } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index c8677119ef..4f24a6d366 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -63,7 +63,7 @@ public class SourceTest extends StreamTest { public java.util.List apply(java.util.List elem) { return elem; } - }).fold("", new Function2() { + }).runFold("", new Function2() { public String apply(String acc, String elem) { return acc + elem; } @@ -82,7 +82,7 @@ public class SourceTest extends StreamTest { final java.lang.Iterable input = Arrays.asList("a", "b", "c"); Source ints = Source.from(input); - Future completion = ints.foreach(new Procedure() { + Future completion = ints.runForeach(new Procedure() { public void apply(String elem) { probe.getRef().tell(elem, ActorRef.noSender()); } @@ -137,7 +137,7 @@ public class SourceTest extends StreamTest { }; } - }).foreach(new Procedure() { + }).runForeach(new Procedure() { public void apply(Integer elem) { probe.getRef().tell(elem, ActorRef.noSender()); } @@ -162,10 +162,10 @@ public class SourceTest extends StreamTest { public String apply(String elem) { return elem.substring(0, 1); } - }).foreach(new Procedure>>() { + }).runForeach(new Procedure>>() { @Override public void apply(final Pair> pair) throws Exception { - pair.second().foreach(new Procedure() { + pair.second().runForeach(new Procedure() { @Override public void apply(String elem) throws Exception { probe.getRef().tell(new Pair(pair.first(), elem), ActorRef.noSender()); @@ -198,7 +198,7 @@ public class SourceTest extends StreamTest { public boolean test(String elem) { return elem.equals("."); } - }).foreach(new Procedure>() { + }).runForeach(new Procedure>() { @Override public void apply(Source subStream) throws Exception { subStream.filter(new Predicate() { @@ -206,7 +206,7 @@ public class SourceTest extends StreamTest { public boolean test(String elem) { return !elem.equals("."); } - }).grouped(10).foreach(new Procedure>() { + }).grouped(10).runForeach(new Procedure>() { @Override public void apply(List chunk) throws Exception { probe.getRef().tell(chunk, ActorRef.noSender()); @@ -240,7 +240,7 @@ public class SourceTest extends StreamTest { final Source in1 = Source.from(input1); final Source in2 = Source.from(input2); - in1.concat(in2).foreach(new Procedure() { + in1.concat(in2).runForeach(new Procedure() { public void apply(String elem) { probe.getRef().tell(elem, ActorRef.noSender()); } @@ -260,7 +260,7 @@ public class SourceTest extends StreamTest { return input1.iterator(); } }; - Source.from(input).foreach(new Procedure() { + Source.from(input).runForeach(new Procedure() { public void apply(Integer elem) { probe.getRef().tell(elem, ActorRef.noSender()); } @@ -375,7 +375,7 @@ public class SourceTest extends StreamTest { public String apply(String aggr, String in) throws Exception { return aggr + in; } - }).fold("", new Function2() { + }).runFold("", new Function2() { @Override public String apply(String aggr, String in) throws Exception { return aggr + in; @@ -431,7 +431,7 @@ public class SourceTest extends StreamTest { public Future apply(String elem) { return Futures.successful(elem.toUpperCase()); } - }).foreach(new Procedure() { + }).runForeach(new Procedure() { public void apply(String elem) { probe.getRef().tell(elem, ActorRef.noSender()); } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/StreamTcpTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/StreamTcpTest.java index f2932186b5..292a04f14c 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/StreamTcpTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/StreamTcpTest.java @@ -63,7 +63,7 @@ public class StreamTcpTest extends StreamTest { final Source responseStream = Source.from(testInput).via(StreamTcp.get(system).outgoingConnection(serverAddress).flow()); - final Future resultFuture = responseStream.fold( + final Future resultFuture = responseStream.runFold( ByteString.empty(), new Function2() { public ByteString apply(ByteString acc, ByteString elem) { return acc.concat(elem); diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala index 6bffa63ad5..42b5b56ecd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/StreamTcpSpec.scala @@ -56,7 +56,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { val resultFuture = Source(idle.publisherProbe) .via(StreamTcp().outgoingConnection(server.address).flow) - .fold(ByteString.empty)((acc, in) ⇒ acc ++ in) + .runFold(ByteString.empty)((acc, in) ⇒ acc ++ in) val serverConnection = server.waitAccept() for (in ← testInput) { @@ -198,7 +198,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { val testInput = (0 to 255).map(ByteString(_)) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) val resultFuture = - Source(testInput).via(StreamTcp().outgoingConnection(serverAddress).flow).fold(ByteString.empty)((acc, in) ⇒ acc ++ in) + Source(testInput).via(StreamTcp().outgoingConnection(serverAddress).flow).runFold(ByteString.empty)((acc, in) ⇒ acc ++ in) Await.result(resultFuture, 3.seconds) should be(expectedOutput) Await.result(binding.unbind(echoServerMM), 3.seconds) @@ -226,7 +226,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper { .via(echoConnection) .via(echoConnection) .via(echoConnection) - .fold(ByteString.empty)((acc, in) ⇒ acc ++ in) + .runFold(ByteString.empty)((acc, in) ⇒ acc ++ in) Await.result(resultFuture, 5.seconds) should be(expectedOutput) Await.result(binding.unbind(echoServerMM), 3.seconds) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala index 484be8ab1f..16c131ed69 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala @@ -58,7 +58,7 @@ class FlowConflateSpec extends AkkaSpec { val future = Source(1 to 1000) .conflate(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i) .map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i } - .fold(0)(_ + _) + .runFold(0)(_ + _) Await.result(future, 10.seconds) should be(500500) } @@ -95,7 +95,7 @@ class FlowConflateSpec extends AkkaSpec { val future = Source(1 to 50) .conflate(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i) .buffer(50, OverflowStrategy.backpressure) - .fold(0)(_ + _) + .runFold(0)(_ + _) Await.result(future, 3.seconds) should be((1 to 50).sum) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala index fb7dde14e9..bae944dd8c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala @@ -91,7 +91,7 @@ class FlowExpandSpec extends AkkaSpec { val future = Source(1 to 100) .map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i } .expand(seed = i ⇒ i)(extrapolate = i ⇒ (i, i)) - .fold(Set.empty[Int])(_ + _) + .runFold(Set.empty[Int])(_ + _) Await.result(future, 10.seconds) should contain theSameElementsAs ((1 to 100).toSet) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala index f4699229a3..3054a249ce 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFoldSpec.scala @@ -17,14 +17,14 @@ class FlowFoldSpec extends AkkaSpec with DefaultTimeout { "fold" in { val input = 1 to 100 - val future = Source(input).fold(0)(_ + _) + val future = Source(input).runFold(0)(_ + _) val expected = input.fold(0)(_ + _) Await.result(future, timeout.duration) should be(expected) } "propagate an error" in { val error = new Exception with NoStackTrace - val future = Source[Unit](() ⇒ throw error).fold(())((_, _) ⇒ ()) + val future = Source[Unit](() ⇒ throw error).runFold(())((_, _) ⇒ ()) the[Exception] thrownBy Await.result(future, timeout.duration) should be(error) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala index 35654b5198..e92ff3fcc6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowForeachSpec.scala @@ -16,7 +16,7 @@ class FlowForeachSpec extends AkkaSpec { "A Foreach" must { "call the procedure for each element" in { - Source(1 to 3).foreach(testActor ! _) onSuccess { + Source(1 to 3).runForeach(testActor ! _) onSuccess { case _ ⇒ testActor ! "done" } expectMsg(1) @@ -26,7 +26,7 @@ class FlowForeachSpec extends AkkaSpec { } "complete the future for an empty stream" in { - Source.empty.foreach(testActor ! _) onSuccess { + Source.empty.runForeach(testActor ! _) onSuccess { case _ ⇒ testActor ! "done" } expectMsg("done") @@ -34,7 +34,7 @@ class FlowForeachSpec extends AkkaSpec { "yield the first error" in { val p = StreamTestKit.PublisherProbe[Int]() - Source(p).foreach(testActor ! _) onFailure { + Source(p).runForeach(testActor ! _) onFailure { case ex ⇒ testActor ! ex } val proc = p.expectSubscription diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala index 0429c61efc..6050b23025 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala @@ -23,7 +23,7 @@ class FlowScanSpec extends AkkaSpec { "A Scan" must { def scan(s: Source[Int], duration: Duration = 5.seconds): immutable.Seq[Int] = - Await.result(s.scan(0)(_ + _).fold(immutable.Seq.empty[Int])(_ :+ _), duration) + Await.result(s.scan(0)(_ + _).runFold(immutable.Seq.empty[Int])(_ :+ _), duration) "Scan" in { val v = Vector.fill(random.nextInt(100, 1000))(random.nextInt()) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ImplicitFlowMaterializerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ImplicitFlowMaterializerSpec.scala index 504c8b790d..2cb0adae1f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ImplicitFlowMaterializerSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ImplicitFlowMaterializerSpec.scala @@ -21,7 +21,7 @@ object ImplicitFlowMaterializerSpec { case "run" ⇒ // run takes an implicit FlowMaterializer parameter, which is provided by ImplicitFlowMaterializer import context.dispatcher - flow.fold("")(_ + _) pipeTo sender() + flow.runFold("")(_ + _) pipeTo sender() } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/OptimizingActorBasedFlowMaterializerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/OptimizingActorBasedFlowMaterializerSpec.scala index 12660a6343..6068654501 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/OptimizingActorBasedFlowMaterializerSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/OptimizingActorBasedFlowMaterializerSpec.scala @@ -28,7 +28,7 @@ class OptimizingActorBasedFlowMaterializerSpec extends AkkaSpec with ImplicitSen take(20). take(10). drop(5). - fold(0)(_ + _) + runFold(0)(_ + _) val expected = (1 to 100). drop(9). @@ -44,7 +44,7 @@ class OptimizingActorBasedFlowMaterializerSpec extends AkkaSpec with ImplicitSen "optimize map + map" in { implicit val mat = FlowMaterializer().asInstanceOf[ActorBasedFlowMaterializer].copy(optimizations = Optimizations.all) - val fl = Source(1 to 100).map(_ + 2).map(_ * 2).fold(0)(_ + _) + val fl = Source(1 to 100).map(_ + 2).map(_ * 2).runFold(0)(_ + _) val expected = (1 to 100).map(_ + 2).map(_ * 2).fold(0)(_ + _) Await.result(fl, 5.seconds) should be(expected) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala index 957848316e..f954d6dec6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/PublisherSinkSpec.scala @@ -29,7 +29,7 @@ class PublisherSinkSpec extends AkkaSpec { }.run() Seq(p1, p2) map { sink ⇒ - Source(m.get(sink)).map(identity).fold(0)(_ + _) + Source(m.get(sink)).map(identity).runFold(0)(_ + _) } zip Seq(30, 15) foreach { case (future, result) ⇒ whenReady(future)(_ shouldBe result) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 2171909475..4176956e64 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -205,7 +205,7 @@ class Source[+Out](delegate: scaladsl.Source[Out]) { * function evaluation when the input stream ends, or completed with `Failure` * if there is an error is signaled in the stream. */ - def fold[U](zero: U, f: japi.Function2[U, Out, U], materializer: FlowMaterializer): Future[U] = + def runFold[U](zero: U, f: japi.Function2[U, Out, U], materializer: FlowMaterializer): Future[U] = runWith(Sink.fold(zero, f), materializer) /** @@ -223,7 +223,7 @@ class Source[+Out](delegate: scaladsl.Source[Out]) { * normal end of the stream, or completed with `Failure` if there is an error is signaled in * the stream. */ - def foreach(f: japi.Procedure[Out], materializer: FlowMaterializer): Future[Unit] = + def runForeach(f: japi.Procedure[Out], materializer: FlowMaterializer): Future[Unit] = runWith(Sink.foreach(f), materializer) // COMMON OPS // diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 39ea49e74a..99112e3518 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -45,7 +45,7 @@ trait Source[+Out] extends FlowOps[Out] with Materializable { * function evaluation when the input stream ends, or completed with `Failure` * if there is an error is signaled in the stream. */ - def fold[U](zero: U)(f: (U, Out) ⇒ U)(implicit materializer: FlowMaterializer): Future[U] = runWith(FoldSink(zero)(f)) // FIXME why is fold always an end step? + def runFold[U](zero: U)(f: (U, Out) ⇒ U)(implicit materializer: FlowMaterializer): Future[U] = runWith(FoldSink(zero)(f)) // FIXME why is fold always an end step? /** * Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked @@ -54,7 +54,7 @@ trait Source[+Out] extends FlowOps[Out] with Materializable { * normal end of the stream, or completed with `Failure` if there is an error is signaled in * the stream. */ - def foreach(f: Out ⇒ Unit)(implicit materializer: FlowMaterializer): Future[Unit] = runWith(ForeachSink(f)) + def runForeach(f: Out ⇒ Unit)(implicit materializer: FlowMaterializer): Future[Unit] = runWith(ForeachSink(f)) /** * Concatenates a second source so that the first element