!str #16563 terminal operations start with run*; runFold runForeach

Konrad Malawski 2015-01-26 14:57:05 +01:00
parent af4555ce1f
commit 39038597b2
24 changed files with 58 additions and 57 deletions
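
In short: the shortcut methods that actually materialize and run a stream are renamed so that they all start with run — fold becomes runFold and foreach becomes runForeach, in line with run() and runWith(). A minimal before/after sketch of the scaladsl usage (names as in this commit; the ActorSystem/FlowMaterializer setup is assumed, as in the examples below):

import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.Future

implicit val system = ActorSystem()
implicit val materializer = FlowMaterializer()

// before this commit (terminal shortcuts named like ordinary combinators):
//   Source(1 to 10).fold(0)(_ + _)    // Future[Int], runs the stream
//   Source(1 to 10).foreach(println)  // Future[Unit], runs the stream

// after this commit (terminal shortcuts start with run*):
val sum: Future[Int]   = Source(1 to 10).runFold(0)(_ + _)
val done: Future[Unit] = Source(1 to 10).runForeach(println)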

View file

@ -22,7 +22,7 @@ class HttpServerExampleSpec
implicit val materializer = FlowMaterializer()
val serverBinding = Http(system).bind(interface = "localhost", port = 8080)
serverBinding.connections.foreach { connection => // foreach materializes the source
serverBinding.connections.runForeach { connection => // foreach materializes the source
println("Accepted new connection from " + connection.remoteAddress)
}
//#bind-example
@ -52,7 +52,7 @@ class HttpServerExampleSpec
case _: HttpRequest => HttpResponse(404, entity = "Unknown resource!")
}
serverBinding.connections foreach { connection =>
serverBinding.connections runForeach { connection =>
println("Accepted new connection from " + connection.remoteAddress)
connection handleWithSyncHandler requestHandler

View file

@ -307,7 +307,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
.map(elem => { println(s"before: $elem"); elem })
.mapAsync(service.convert)
.foreach(elem => println(s"after: $elem"))
.runForeach(elem => println(s"after: $elem"))
//#sometimes-slow-mapAsync
probe.expectMsg("after: A")
@ -339,7 +339,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
.map(elem => { println(s"before: $elem"); elem })
.mapAsyncUnordered(service.convert)
.foreach(elem => println(s"after: $elem"))
.runForeach(elem => println(s"after: $elem"))
//#sometimes-slow-mapAsyncUnordered
probe.receiveN(10).toSet should be(Set(

View file

@ -40,7 +40,7 @@ class StreamTcpDocSpec extends AkkaSpec {
//#echo-server-simple-handle
val connections: Source[IncomingConnection] = binding.connections
connections foreach { connection =>
connections runForeach { connection =>
println(s"New connection from: ${connection.remoteAddress}")
val echo = Flow[ByteString]
@ -77,7 +77,7 @@ class StreamTcpDocSpec extends AkkaSpec {
val binding = StreamTcp().bind(localhost)
//#welcome-banner-chat-server
binding.connections foreach { connection =>
binding.connections runForeach { connection =>
val serverLogic = Flow() { implicit b =>
import FlowGraphImplicits._

View file

@ -92,7 +92,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
//#authors-foreachsink-println
//#authors-foreach-println
authors.foreach(println)
authors.runForeach(println)
//#authors-foreach-println
}
@ -149,7 +149,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
val completion: Future[Unit] =
Source(1 to 10)
.map(i => { println(s"map => $i"); i })
.foreach { i => readLine(s"Element = $i; continue reading? [press enter]\n") }
.runForeach { i => readLine(s"Element = $i; continue reading? [press enter]\n") }
Await.ready(completion, 1.minute)
//#backpressure-by-readline

View file

@ -23,7 +23,7 @@ class RecipeReduceByKey extends RecipeSpec {
// add counting logic to the streams
val countedWords: Source[Future[(String, Int)]] = wordStreams.map {
case (word, wordStream) =>
wordStream.fold((word, 0)) {
wordStream.runFold((word, 0)) {
case ((w, count), _) => (w, count + 1)
}
}
@ -57,7 +57,7 @@ class RecipeReduceByKey extends RecipeSpec {
val groupStreams = Flow[In].groupBy(groupKey)
val reducedValues = groupStreams.map {
case (key, groupStream) =>
groupStream.fold((key, foldZero(key))) {
groupStream.runFold((key, foldZero(key))) {
case ((key, aggregated), elem) => (key, fold(aggregated, elem))
}
}

View file

@ -185,7 +185,7 @@ but is not restricted to that - it could also mean opening files or socket conne
Materialization is triggered at so-called "terminal operations". Most notably, this includes the various forms of the ``run()``
and ``runWith()`` methods defined on flow elements, as well as a small number of special syntactic sugars for running with
well-known sinks, such as ``foreach(el => )`` (being an alias to ``runWith(Sink.foreach(el => ))``).
well-known sinks, such as ``runForeach(el => )`` (being an alias to ``runWith(Sink.foreach(el => ))``).
Materialization is currently performed synchronously on the materializing thread.
The actual stream processing is handled by :ref:`Actors <actor-scala>` started up during the stream's materialization,
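
The paragraph above describes runForeach as plain sugar for running with a well-known sink. A rough sketch of that equivalence, assuming the 1.0-milestone scaladsl API shown in this diff and an implicit FlowMaterializer in scope:

import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future

// the sugared terminal operation...
val viaSugar: Future[Unit] = Source(1 to 3).runForeach(println)
// ...is an alias for materializing with the corresponding sink explicitly:
val viaRunWith: Future[Unit] = Source(1 to 3).runWith(Sink.foreach(println))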

View file

@ -58,7 +58,7 @@ object Multipart {
private def strictify[BP <: Multipart.BodyPart, BPS <: Multipart.BodyPart.Strict](parts: Source[BP])(f: BP ⇒ Future[BPS])(implicit ec: ExecutionContext, fm: FlowMaterializer): Future[Vector[BPS]] =
// TODO: move to Vector `:+` when https://issues.scala-lang.org/browse/SI-8930 is fixed
parts.fold(new VectorBuilder[Future[BPS]]) {
parts.runFold(new VectorBuilder[Future[BPS]]) {
case (builder, part) ⇒ builder += f(part)
}.fast.flatMap(builder ⇒ FastFuture.sequence(builder.result()))

View file

@ -270,7 +270,7 @@ private[http] object StreamUtils {
*/
private[http] class EnhancedByteStringSource(val byteStringStream: Source[ByteString]) extends AnyVal {
def join(implicit materializer: FlowMaterializer): Future[ByteString] =
byteStringStream.fold(ByteString.empty)(_ ++ _)
byteStringStream.runFold(ByteString.empty)(_ ++ _)
def utf8String(implicit materializer: FlowMaterializer, ec: ExecutionContext): Future[String] =
join.map(_.utf8String)
}

View file

@ -15,7 +15,7 @@ trait PredefinedFromEntityUnmarshallers extends MultipartUnmarshallers {
implicit def byteStringUnmarshaller(implicit fm: FlowMaterializer): FromEntityUnmarshaller[ByteString] =
Unmarshaller {
case HttpEntity.Strict(_, data) ⇒ FastFuture.successful(data)
case entity ⇒ entity.dataBytes.fold(ByteString.empty)(_ ++ _)
case entity ⇒ entity.dataBytes.runFold(ByteString.empty)(_ ++ _)
}
implicit def byteArrayUnmarshaller(implicit fm: FlowMaterializer,

View file

@ -43,8 +43,9 @@ public class ActorPublisherTest extends StreamTest {
.actorOf(Props.create(TestPublisher.class).withDispatcher("akka.test.stream-dispatcher"));
final Publisher<Integer> publisher = UntypedActorPublisher.create(ref);
Source.from(publisher)
.foreach(new akka.stream.javadsl.japi.Procedure<Integer>(){
@Override public void apply(Integer elem) throws Exception {
.runForeach(new akka.stream.javadsl.japi.Procedure<Integer>() {
@Override
public void apply(Integer elem) throws Exception {
probe.getRef().tell(elem, ActorRef.noSender());
}
}, materializer);

View file

@ -65,7 +65,7 @@ public class FlowTest extends StreamTest {
}
});
ints.via(flow1.via(flow2)).fold("", new Function2<String, String, String>() {
ints.via(flow1.via(flow2)).runFold("", new Function2<String, String, String>() {
public String apply(String acc, String elem) {
return acc + elem;
}
@ -116,7 +116,7 @@ public class FlowTest extends StreamTest {
};
}
});
Source.from(input).via(flow).foreach(new Procedure<Integer>() {
Source.from(input).via(flow).runForeach(new Procedure<Integer>() {
public void apply(Integer elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
@ -142,10 +142,10 @@ public class FlowTest extends StreamTest {
return elem.substring(0, 1);
}
});
Source.from(input).via(slsFlow).foreach(new Procedure<Pair<String, Source<String>>>() {
Source.from(input).via(slsFlow).runForeach(new Procedure<Pair<String, Source<String>>>() {
@Override
public void apply(final Pair<String, Source<String>> pair) throws Exception {
pair.second().foreach(new Procedure<String>() {
pair.second().runForeach(new Procedure<String>() {
@Override
public void apply(String elem) throws Exception {
probe.getRef().tell(new Pair<String, String>(pair.first(), elem), ActorRef.noSender());
@ -179,7 +179,7 @@ public class FlowTest extends StreamTest {
return elem.equals(".");
}
});
Source.from(input).via(flow).foreach(new Procedure<Source<String>>() {
Source.from(input).via(flow).runForeach(new Procedure<Source<String>>() {
@Override
public void apply(Source<String> subStream) throws Exception {
subStream.filter(new Predicate<String>() {
@ -187,7 +187,7 @@ public class FlowTest extends StreamTest {
public boolean test(String elem) {
return !elem.equals(".");
}
}).grouped(10).foreach(new Procedure<List<String>>() {
}).grouped(10).runForeach(new Procedure<List<String>>() {
@Override
public void apply(List<String> chunk) throws Exception {
probe.getRef().tell(chunk, ActorRef.noSender());
@ -342,7 +342,7 @@ public class FlowTest extends StreamTest {
final Source<String> in1 = Source.from(input1);
final Source<String> in2 = Source.from(input2);
final Flow<String, String> flow = Flow.of(String.class);
in1.via(flow.concat(in2)).foreach(new Procedure<String>() {
in1.via(flow.concat(in2)).runForeach(new Procedure<String>() {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
@ -413,7 +413,7 @@ public class FlowTest extends StreamTest {
return aggr + in;
}
});
Future <String> future = Source.from(input).via(flow).fold("", new Function2<String, String, String>() {
Future <String> future = Source.from(input).via(flow).runFold("", new Function2<String, String, String>() {
@Override
public String apply(String aggr, String in) throws Exception {
return aggr + in;
@ -454,7 +454,7 @@ public class FlowTest extends StreamTest {
return Futures.successful(elem.toUpperCase());
}
});
Source.from(input).via(flow).foreach(new Procedure<String>() {
Source.from(input).via(flow).runForeach(new Procedure<String>() {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}

View file

@ -63,7 +63,7 @@ public class SourceTest extends StreamTest {
public java.util.List<String> apply(java.util.List<String> elem) {
return elem;
}
}).fold("", new Function2<String, String, String>() {
}).runFold("", new Function2<String, String, String>() {
public String apply(String acc, String elem) {
return acc + elem;
}
@ -82,7 +82,7 @@ public class SourceTest extends StreamTest {
final java.lang.Iterable<String> input = Arrays.asList("a", "b", "c");
Source<String> ints = Source.from(input);
Future<BoxedUnit> completion = ints.foreach(new Procedure<String>() {
Future<BoxedUnit> completion = ints.runForeach(new Procedure<String>() {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
@ -137,7 +137,7 @@ public class SourceTest extends StreamTest {
};
}
}).foreach(new Procedure<Integer>() {
}).runForeach(new Procedure<Integer>() {
public void apply(Integer elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
@ -162,10 +162,10 @@ public class SourceTest extends StreamTest {
public String apply(String elem) {
return elem.substring(0, 1);
}
}).foreach(new Procedure<Pair<String, Source<String>>>() {
}).runForeach(new Procedure<Pair<String, Source<String>>>() {
@Override
public void apply(final Pair<String, Source<String>> pair) throws Exception {
pair.second().foreach(new Procedure<String>() {
pair.second().runForeach(new Procedure<String>() {
@Override
public void apply(String elem) throws Exception {
probe.getRef().tell(new Pair<String, String>(pair.first(), elem), ActorRef.noSender());
@ -198,7 +198,7 @@ public class SourceTest extends StreamTest {
public boolean test(String elem) {
return elem.equals(".");
}
}).foreach(new Procedure<Source<String>>() {
}).runForeach(new Procedure<Source<String>>() {
@Override
public void apply(Source<String> subStream) throws Exception {
subStream.filter(new Predicate<String>() {
@ -206,7 +206,7 @@ public class SourceTest extends StreamTest {
public boolean test(String elem) {
return !elem.equals(".");
}
}).grouped(10).foreach(new Procedure<List<String>>() {
}).grouped(10).runForeach(new Procedure<List<String>>() {
@Override
public void apply(List<String> chunk) throws Exception {
probe.getRef().tell(chunk, ActorRef.noSender());
@ -240,7 +240,7 @@ public class SourceTest extends StreamTest {
final Source<String> in1 = Source.from(input1);
final Source<String> in2 = Source.from(input2);
in1.concat(in2).foreach(new Procedure<String>() {
in1.concat(in2).runForeach(new Procedure<String>() {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
@ -260,7 +260,7 @@ public class SourceTest extends StreamTest {
return input1.iterator();
}
};
Source.from(input).foreach(new Procedure<Integer>() {
Source.from(input).runForeach(new Procedure<Integer>() {
public void apply(Integer elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
@ -375,7 +375,7 @@ public class SourceTest extends StreamTest {
public String apply(String aggr, String in) throws Exception {
return aggr + in;
}
}).fold("", new Function2<String, String, String>() {
}).runFold("", new Function2<String, String, String>() {
@Override
public String apply(String aggr, String in) throws Exception {
return aggr + in;
@ -438,7 +438,7 @@ public class SourceTest extends StreamTest {
public Future<String> apply(String elem) {
return Futures.successful(elem.toUpperCase());
}
}).foreach(new Procedure<String>() {
}).runForeach(new Procedure<String>() {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}

View file

@ -63,7 +63,7 @@ public class StreamTcpTest extends StreamTest {
final Source<ByteString> responseStream =
Source.from(testInput).via(StreamTcp.get(system).outgoingConnection(serverAddress).flow());
final Future<ByteString> resultFuture = responseStream.fold(
final Future<ByteString> resultFuture = responseStream.runFold(
ByteString.empty(), new Function2<ByteString, ByteString, ByteString>() {
public ByteString apply(ByteString acc, ByteString elem) {
return acc.concat(elem);

View file

@ -56,7 +56,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper {
val resultFuture =
Source(idle.publisherProbe)
.via(StreamTcp().outgoingConnection(server.address).flow)
.fold(ByteString.empty)((acc, in) ⇒ acc ++ in)
.runFold(ByteString.empty)((acc, in) ⇒ acc ++ in)
val serverConnection = server.waitAccept()
for (in ← testInput) {
@ -198,7 +198,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper {
val testInput = (0 to 255).map(ByteString(_))
val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte]))
val resultFuture =
Source(testInput).via(StreamTcp().outgoingConnection(serverAddress).flow).fold(ByteString.empty)((acc, in) ⇒ acc ++ in)
Source(testInput).via(StreamTcp().outgoingConnection(serverAddress).flow).runFold(ByteString.empty)((acc, in) ⇒ acc ++ in)
Await.result(resultFuture, 3.seconds) should be(expectedOutput)
Await.result(binding.unbind(echoServerMM), 3.seconds)
@ -226,7 +226,7 @@ class StreamTcpSpec extends AkkaSpec with TcpHelper {
.via(echoConnection)
.via(echoConnection)
.via(echoConnection)
.fold(ByteString.empty)((acc, in) ⇒ acc ++ in)
.runFold(ByteString.empty)((acc, in) ⇒ acc ++ in)
Await.result(resultFuture, 5.seconds) should be(expectedOutput)
Await.result(binding.unbind(echoServerMM), 3.seconds)

View file

@ -58,7 +58,7 @@ class FlowConflateSpec extends AkkaSpec {
val future = Source(1 to 1000)
.conflate(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i)
.map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i }
.fold(0)(_ + _)
.runFold(0)(_ + _)
Await.result(future, 10.seconds) should be(500500)
}
@ -95,7 +95,7 @@ class FlowConflateSpec extends AkkaSpec {
val future = Source(1 to 50)
.conflate(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i)
.buffer(50, OverflowStrategy.backpressure)
.fold(0)(_ + _)
.runFold(0)(_ + _)
Await.result(future, 3.seconds) should be((1 to 50).sum)
}

View file

@ -91,7 +91,7 @@ class FlowExpandSpec extends AkkaSpec {
val future = Source(1 to 100)
.map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i }
.expand(seed = i ⇒ i)(extrapolate = i ⇒ (i, i))
.fold(Set.empty[Int])(_ + _)
.runFold(Set.empty[Int])(_ + _)
Await.result(future, 10.seconds) should contain theSameElementsAs ((1 to 100).toSet)
}

View file

@ -17,14 +17,14 @@ class FlowFoldSpec extends AkkaSpec with DefaultTimeout {
"fold" in {
val input = 1 to 100
val future = Source(input).fold(0)(_ + _)
val future = Source(input).runFold(0)(_ + _)
val expected = input.fold(0)(_ + _)
Await.result(future, timeout.duration) should be(expected)
}
"propagate an error" in {
val error = new Exception with NoStackTrace
val future = Source[Unit](() ⇒ throw error).fold(())((_, _) ⇒ ())
val future = Source[Unit](() ⇒ throw error).runFold(())((_, _) ⇒ ())
the[Exception] thrownBy Await.result(future, timeout.duration) should be(error)
}

View file

@ -16,7 +16,7 @@ class FlowForeachSpec extends AkkaSpec {
"A Foreach" must {
"call the procedure for each element" in {
Source(1 to 3).foreach(testActor ! _) onSuccess {
Source(1 to 3).runForeach(testActor ! _) onSuccess {
case _ ⇒ testActor ! "done"
}
expectMsg(1)
@ -26,7 +26,7 @@ class FlowForeachSpec extends AkkaSpec {
}
"complete the future for an empty stream" in {
Source.empty.foreach(testActor ! _) onSuccess {
Source.empty.runForeach(testActor ! _) onSuccess {
case _ ⇒ testActor ! "done"
}
expectMsg("done")
@ -34,7 +34,7 @@ class FlowForeachSpec extends AkkaSpec {
"yield the first error" in {
val p = StreamTestKit.PublisherProbe[Int]()
Source(p).foreach(testActor ! _) onFailure {
Source(p).runForeach(testActor ! _) onFailure {
case ex ⇒ testActor ! ex
}
val proc = p.expectSubscription

View file

@ -23,7 +23,7 @@ class FlowScanSpec extends AkkaSpec {
"A Scan" must {
def scan(s: Source[Int], duration: Duration = 5.seconds): immutable.Seq[Int] =
Await.result(s.scan(0)(_ + _).fold(immutable.Seq.empty[Int])(_ :+ _), duration)
Await.result(s.scan(0)(_ + _).runFold(immutable.Seq.empty[Int])(_ :+ _), duration)
"Scan" in {
val v = Vector.fill(random.nextInt(100, 1000))(random.nextInt())

View file

@ -21,7 +21,7 @@ object ImplicitFlowMaterializerSpec {
case "run"
// run takes an implicit FlowMaterializer parameter, which is provided by ImplicitFlowMaterializer
import context.dispatcher
flow.fold("")(_ + _) pipeTo sender()
flow.runFold("")(_ + _) pipeTo sender()
}
}
}

View file

@ -28,7 +28,7 @@ class OptimizingActorBasedFlowMaterializerSpec extends AkkaSpec with ImplicitSen
take(20).
take(10).
drop(5).
fold(0)(_ + _)
runFold(0)(_ + _)
val expected = (1 to 100).
drop(9).
@ -44,7 +44,7 @@ class OptimizingActorBasedFlowMaterializerSpec extends AkkaSpec with ImplicitSen
"optimize map + map" in {
implicit val mat = FlowMaterializer().asInstanceOf[ActorBasedFlowMaterializer].copy(optimizations = Optimizations.all)
val fl = Source(1 to 100).map(_ + 2).map(_ * 2).fold(0)(_ + _)
val fl = Source(1 to 100).map(_ + 2).map(_ * 2).runFold(0)(_ + _)
val expected = (1 to 100).map(_ + 2).map(_ * 2).fold(0)(_ + _)
Await.result(fl, 5.seconds) should be(expected)

View file

@ -29,7 +29,7 @@ class PublisherSinkSpec extends AkkaSpec {
}.run()
Seq(p1, p2) map { sink ⇒
Source(m.get(sink)).map(identity).fold(0)(_ + _)
Source(m.get(sink)).map(identity).runFold(0)(_ + _)
} zip Seq(30, 15) foreach {
case (future, result) ⇒ whenReady(future)(_ shouldBe result)
}

View file

@ -205,7 +205,7 @@ class Source[+Out](delegate: scaladsl.Source[Out]) {
* function evaluation when the input stream ends, or completed with `Failure`
if an error is signaled in the stream.
*/
def fold[U](zero: U, f: japi.Function2[U, Out, U], materializer: FlowMaterializer): Future[U] =
def runFold[U](zero: U, f: japi.Function2[U, Out, U], materializer: FlowMaterializer): Future[U] =
runWith(Sink.fold(zero, f), materializer)
/**
@ -223,7 +223,7 @@ class Source[+Out](delegate: scaladsl.Source[Out]) {
normal end of the stream, or completed with `Failure` if an error is signaled in
* the stream.
*/
def foreach(f: japi.Procedure[Out], materializer: FlowMaterializer): Future[Unit] =
def runForeach(f: japi.Procedure[Out], materializer: FlowMaterializer): Future[Unit] =
runWith(Sink.foreach(f), materializer)
// COMMON OPS //

View file

@ -45,7 +45,7 @@ trait Source[+Out] extends FlowOps[Out] with Materializable {
* function evaluation when the input stream ends, or completed with `Failure`
if an error is signaled in the stream.
*/
def fold[U](zero: U)(f: (U, Out) ⇒ U)(implicit materializer: FlowMaterializer): Future[U] = runWith(FoldSink(zero)(f)) // FIXME why is fold always an end step?
def runFold[U](zero: U)(f: (U, Out) ⇒ U)(implicit materializer: FlowMaterializer): Future[U] = runWith(FoldSink(zero)(f)) // FIXME why is fold always an end step?
/**
* Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked
@ -54,7 +54,7 @@ trait Source[+Out] extends FlowOps[Out] with Materializable {
normal end of the stream, or completed with `Failure` if an error is signaled in
* the stream.
*/
def foreach(f: Out ⇒ Unit)(implicit materializer: FlowMaterializer): Future[Unit] = runWith(ForeachSink(f))
def runForeach(f: Out ⇒ Unit)(implicit materializer: FlowMaterializer): Future[Unit] = runWith(ForeachSink(f))
/**
* Concatenates a second source so that the first element