Merge pull request #16721 from ktoso/runOps-ktoso
!str #16563 terminal operations start with run*; runFold runForeach
This commit is contained in:
commit
9c9984aaf9
24 changed files with 58 additions and 57 deletions
|
|
@ -307,7 +307,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
|
|||
Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
|
||||
.map(elem => { println(s"before: $elem"); elem })
|
||||
.mapAsync(service.convert)
|
||||
.foreach(elem => println(s"after: $elem"))
|
||||
.runForeach(elem => println(s"after: $elem"))
|
||||
//#sometimes-slow-mapAsync
|
||||
|
||||
probe.expectMsg("after: A")
|
||||
|
|
@ -339,7 +339,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
|
|||
Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
|
||||
.map(elem => { println(s"before: $elem"); elem })
|
||||
.mapAsyncUnordered(service.convert)
|
||||
.foreach(elem => println(s"after: $elem"))
|
||||
.runForeach(elem => println(s"after: $elem"))
|
||||
//#sometimes-slow-mapAsyncUnordered
|
||||
|
||||
probe.receiveN(10).toSet should be(Set(
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ class StreamTcpDocSpec extends AkkaSpec {
|
|||
//#echo-server-simple-handle
|
||||
val connections: Source[IncomingConnection] = binding.connections
|
||||
|
||||
connections foreach { connection =>
|
||||
connections runForeach { connection =>
|
||||
println(s"New connection from: ${connection.remoteAddress}")
|
||||
|
||||
val echo = Flow[ByteString]
|
||||
|
|
@ -77,7 +77,7 @@ class StreamTcpDocSpec extends AkkaSpec {
|
|||
val binding = StreamTcp().bind(localhost)
|
||||
|
||||
//#welcome-banner-chat-server
|
||||
binding.connections foreach { connection =>
|
||||
binding.connections runForeach { connection =>
|
||||
|
||||
val serverLogic = Flow() { implicit b =>
|
||||
import FlowGraphImplicits._
|
||||
|
|
|
|||
|
|
@ -92,7 +92,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
|
|||
//#authors-foreachsink-println
|
||||
|
||||
//#authors-foreach-println
|
||||
authors.foreach(println)
|
||||
authors.runForeach(println)
|
||||
//#authors-foreach-println
|
||||
}
|
||||
|
||||
|
|
@ -149,7 +149,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
|
|||
val completion: Future[Unit] =
|
||||
Source(1 to 10)
|
||||
.map(i => { println(s"map => $i"); i })
|
||||
.foreach { i => readLine(s"Element = $i; continue reading? [press enter]\n") }
|
||||
.runForeach { i => readLine(s"Element = $i; continue reading? [press enter]\n") }
|
||||
|
||||
Await.ready(completion, 1.minute)
|
||||
//#backpressure-by-readline
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ class RecipeReduceByKey extends RecipeSpec {
|
|||
// add counting logic to the streams
|
||||
val countedWords: Source[Future[(String, Int)]] = wordStreams.map {
|
||||
case (word, wordStream) =>
|
||||
wordStream.fold((word, 0)) {
|
||||
wordStream.runFold((word, 0)) {
|
||||
case ((w, count), _) => (w, count + 1)
|
||||
}
|
||||
}
|
||||
|
|
@ -57,7 +57,7 @@ class RecipeReduceByKey extends RecipeSpec {
|
|||
val groupStreams = Flow[In].groupBy(groupKey)
|
||||
val reducedValues = groupStreams.map {
|
||||
case (key, groupStream) =>
|
||||
groupStream.fold((key, foldZero(key))) {
|
||||
groupStream.runFold((key, foldZero(key))) {
|
||||
case ((key, aggregated), elem) => (key, fold(aggregated, elem))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue