diff --git a/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala new file mode 100644 index 0000000000..a8382ccc41 --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala @@ -0,0 +1,360 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package docs.stream + +import scala.concurrent.duration._ +import akka.stream.testkit.AkkaSpec +import akka.stream.scaladsl.Source +import java.util.Date +import akka.stream.FlowMaterializer +import scala.concurrent.Future +import akka.stream.scaladsl.RunnableFlow +import akka.stream.scaladsl.Sink +import akka.testkit.TestProbe +import akka.actor.ActorRef +import com.typesafe.config.ConfigFactory +import akka.actor.Actor +import akka.actor.Props +import akka.pattern.ask +import akka.util.Timeout +import akka.stream.scaladsl.OperationAttributes +import scala.concurrent.ExecutionContext +import akka.stream.MaterializerSettings +import java.util.concurrent.atomic.AtomicInteger + +object IntegrationDocSpec { + import TwitterStreamQuickstartDocSpec._ + + val config = ConfigFactory.parseString(""" + #//#blocking-dispatcher-config + blocking-dispatcher { + executor = "thread-pool-executor" + thread-pool-executor { + core-pool-size-min = 10 + core-pool-size-max = 10 + } + } + #//#blocking-dispatcher-config + + akka.actor.default-mailbox.mailbox-type = akka.dispatch.UnboundedMailbox + """) + + class AddressSystem { + //#email-address-lookup + def lookupEmail(handle: String): Future[Option[String]] = + //#email-address-lookup + Future.successful(Some(handle + "@somewhere.com")) + + //#phone-lookup + def lookupPhoneNumber(handle: String): Future[Option[String]] = + //#phone-lookup + Future.successful(Some(handle.hashCode.toString)) + } + + final case class Email(to: String, title: String, body: String) + final case class TextMessage(to: String, body: String) + + class EmailServer(probe: ActorRef) { + //#email-server-send + def send(email: Email): Future[Unit] = { + // ... + //#email-server-send + probe ! email.to + Future.successful(()) + //#email-server-send + } + //#email-server-send + } + + class SmsServer(probe: ActorRef) { + //#sms-server-send + def send(text: TextMessage): Unit = { + // ... + //#sms-server-send + probe ! text.to + //#sms-server-send + } + //#sms-server-send + } + + final case class Save(tweet: Tweet) + final case object SaveDone + + class DatabaseService(probe: ActorRef) extends Actor { + override def receive = { + case Save(tweet: Tweet) => + probe ! tweet.author.handle + sender() ! SaveDone + } + } + + //#sometimes-slow-service + class SometimesSlowService(implicit ec: ExecutionContext) { + //#sometimes-slow-service + def println(s: String): Unit = () + //#sometimes-slow-service + + private val runningCount = new AtomicInteger + + def convert(s: String): Future[String] = { + println(s"running: $s (${runningCount.incrementAndGet()})") + Future { + if (s.nonEmpty && s.head.isLower) + Thread.sleep(500) + else + Thread.sleep(20) + println(s"completed: $s (${runningCount.decrementAndGet()})") + s.toUpperCase + } + } + } + //#sometimes-slow-service + +} + +class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { + import TwitterStreamQuickstartDocSpec._ + import IntegrationDocSpec._ + + implicit val mat = FlowMaterializer() + + "calling external service with mapAsync" in { + val probe = TestProbe() + val addressSystem = new AddressSystem + val emailServer = new EmailServer(probe.ref) + + //#tweet-authors + val authors: Source[Author] = + tweets + .filter(_.hashtags.contains(Akka)) + .map(_.author) + //#tweet-authors + + //#email-addresses-mapAsync + val emailAddresses: Source[String] = + authors + .mapAsync(author => addressSystem.lookupEmail(author.handle)) + .collect { case Some(emailAddress) => emailAddress } + //#email-addresses-mapAsync + + //#send-emails + val sendEmails: RunnableFlow = + emailAddresses + .mapAsync { address => + emailServer.send( + Email(to = address, title = "Akka", body = "I like your tweet")) + } + .to(Sink.ignore) + + sendEmails.run() + //#send-emails + + probe.expectMsg("rolandkuhn@somewhere.com") + probe.expectMsg("patriknw@somewhere.com") + probe.expectMsg("bantonsson@somewhere.com") + probe.expectMsg("drewhk@somewhere.com") + probe.expectMsg("ktosopl@somewhere.com") + probe.expectMsg("mmartynas@somewhere.com") + probe.expectMsg("akkateam@somewhere.com") + } + + "calling external service with mapAsyncUnordered" in { + val probe = TestProbe() + val addressSystem = new AddressSystem + val emailServer = new EmailServer(probe.ref) + + //#external-service-mapAsyncUnordered + val authors: Source[Author] = + tweets.filter(_.hashtags.contains(Akka)).map(_.author) + + val emailAddresses: Source[String] = + authors + .mapAsyncUnordered(author => addressSystem.lookupEmail(author.handle)) + .collect { case Some(emailAddress) => emailAddress } + + val sendEmails: RunnableFlow = + emailAddresses + .mapAsyncUnordered { address => + emailServer.send( + Email(to = address, title = "Akka", body = "I like your tweet")) + } + .to(Sink.ignore) + + sendEmails.run() + //#external-service-mapAsyncUnordered + + probe.receiveN(7).toSet should be(Set( + "rolandkuhn@somewhere.com", + "patriknw@somewhere.com", + "bantonsson@somewhere.com", + "drewhk@somewhere.com", + "ktosopl@somewhere.com", + "mmartynas@somewhere.com", + "akkateam@somewhere.com")) + } + + "careful managed blocking with mapAsync" in { + val probe = TestProbe() + val addressSystem = new AddressSystem + val smsServer = new SmsServer(probe.ref) + + val authors = tweets.filter(_.hashtags.contains(Akka)).map(_.author) + + val phoneNumbers = + authors.mapAsync(author => addressSystem.lookupPhoneNumber(author.handle)) + .collect { case Some(phoneNo) => phoneNo } + + //#blocking-mapAsync + val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher") + + val sendTextMessages: RunnableFlow = + phoneNumbers + .mapAsync { phoneNo => + Future { + smsServer.send( + TextMessage(to = phoneNo, body = "I like your tweet")) + }(blockingExecutionContext) + } + .to(Sink.ignore) + + sendTextMessages.run() + //#blocking-mapAsync + + probe.receiveN(7).toSet should be(Set( + "rolandkuhn".hashCode.toString, + "patriknw".hashCode.toString, + "bantonsson".hashCode.toString, + "drewhk".hashCode.toString, + "ktosopl".hashCode.toString, + "mmartynas".hashCode.toString, + "akkateam".hashCode.toString)) + } + + "careful managed blocking with map" in { + val probe = TestProbe() + val addressSystem = new AddressSystem + val smsServer = new SmsServer(probe.ref) + + val authors = tweets.filter(_.hashtags.contains(Akka)).map(_.author) + + val phoneNumbers = + authors.mapAsync(author => addressSystem.lookupPhoneNumber(author.handle)) + .collect { case Some(phoneNo) => phoneNo } + + //#blocking-map + val sendTextMessages: RunnableFlow = + phoneNumbers + .section(OperationAttributes.dispatcher("blocking-dispatcher")) { + _.map { phoneNo => + smsServer.send( + TextMessage(to = phoneNo, body = "I like your tweet")) + } + } + .to(Sink.ignore) + + sendTextMessages.run() + //#blocking-map + + probe.expectMsg("rolandkuhn".hashCode.toString) + probe.expectMsg("patriknw".hashCode.toString) + probe.expectMsg("bantonsson".hashCode.toString) + probe.expectMsg("drewhk".hashCode.toString) + probe.expectMsg("ktosopl".hashCode.toString) + probe.expectMsg("mmartynas".hashCode.toString) + probe.expectMsg("akkateam".hashCode.toString) + } + + "calling actor service with mapAsync" in { + val probe = TestProbe() + val database = system.actorOf(Props(classOf[DatabaseService], probe.ref), "db") + + //#save-tweets + import akka.pattern.ask + + val akkaTweets: Source[Tweet] = tweets.filter(_.hashtags.contains(Akka)) + + implicit val timeout = Timeout(3.seconds) + val saveTweets: RunnableFlow = + akkaTweets + .mapAsync(tweet => database ? Save(tweet)) + .to(Sink.ignore) + //#save-tweets + + saveTweets.run() + + probe.expectMsg("rolandkuhn") + probe.expectMsg("patriknw") + probe.expectMsg("bantonsson") + probe.expectMsg("drewhk") + probe.expectMsg("ktosopl") + probe.expectMsg("mmartynas") + probe.expectMsg("akkateam") + } + + "illustrate ordering and parallelism of mapAsync" in { + val probe = TestProbe() + def println(s: String): Unit = { + if (s.startsWith("after:")) + probe.ref ! s + } + + //#sometimes-slow-mapAsync + implicit val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher") + val service = new SometimesSlowService + + implicit val mat = FlowMaterializer( + MaterializerSettings(system).withInputBuffer(initialSize = 4, maxSize = 4)) + + Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J")) + .map(elem => { println(s"before: $elem"); elem }) + .mapAsync(service.convert) + .foreach(elem => println(s"after: $elem")) + //#sometimes-slow-mapAsync + + probe.expectMsg("after: A") + probe.expectMsg("after: B") + probe.expectMsg("after: C") + probe.expectMsg("after: D") + probe.expectMsg("after: E") + probe.expectMsg("after: F") + probe.expectMsg("after: G") + probe.expectMsg("after: H") + probe.expectMsg("after: I") + probe.expectMsg("after: J") + } + + "illustrate ordering and parallelism of mapAsyncUnordered" in { + val probe = TestProbe() + def println(s: String): Unit = { + if (s.startsWith("after:")) + probe.ref ! s + } + + //#sometimes-slow-mapAsyncUnordered + implicit val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher") + val service = new SometimesSlowService + + implicit val mat = FlowMaterializer( + MaterializerSettings(system).withInputBuffer(initialSize = 4, maxSize = 4)) + + Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J")) + .map(elem => { println(s"before: $elem"); elem }) + .mapAsyncUnordered(service.convert) + .foreach(elem => println(s"after: $elem")) + //#sometimes-slow-mapAsyncUnordered + + probe.receiveN(10).toSet should be(Set( + "after: A", + "after: B", + "after: C", + "after: D", + "after: E", + "after: F", + "after: G", + "after: H", + "after: I", + "after: J")) + } + +} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala index 330de1ac3a..fa00df3457 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/TwitterStreamQuickstartDocSpec.scala @@ -26,23 +26,40 @@ import concurrent.Future import akka.stream.testkit.AkkaSpec -// TODO replace ⇒ with => and disable this intellij setting -class TwitterStreamQuickstartDocSpec extends AkkaSpec { - - implicit val executionContext = system.dispatcher - +object TwitterStreamQuickstartDocSpec { //#model final case class Author(handle: String) val AkkaTeam = Author("akkateam") + val Akka = Hashtag("#akka") final case class Hashtag(name: String) final case class Tweet(author: Author, timestamp: Long, body: String) { - def hashtags: List[Hashtag] = - body.split(" ").toList.collect { case t if t.startsWith("#") ⇒ Hashtag(t) } + def hashtags: Set[Hashtag] = + body.split(" ").collect { case t if t.startsWith("#") ⇒ Hashtag(t) }.toSet } //#model + val tweets = Source( + Tweet(Author("rolandkuhn"), (new Date).getTime, "#akka rocks!") :: + Tweet(Author("patriknw"), (new Date).getTime, "#akka !") :: + Tweet(Author("bantonsson"), (new Date).getTime, "#akka !") :: + Tweet(Author("drewhk"), (new Date).getTime, "#akka !") :: + Tweet(Author("ktosopl"), (new Date).getTime, "#akka on the rocks!") :: + Tweet(Author("mmartynas"), (new Date).getTime, "wow #akka !") :: + Tweet(Author("akkateam"), (new Date).getTime, "#akka rocks!") :: + Tweet(Author("bananaman"), (new Date).getTime, "#bananas rock!") :: + Tweet(Author("appleman"), (new Date).getTime, "#apples rock!") :: + Tweet(Author("drama"), (new Date).getTime, "we compared #apples to #oranges!") :: + Nil) +} + +// TODO replace ⇒ with => and disable this intellij setting +class TwitterStreamQuickstartDocSpec extends AkkaSpec { + import TwitterStreamQuickstartDocSpec._ + + implicit val executionContext = system.dispatcher + trait Example0 { //#tweet-source val tweets: Source[Tweet] @@ -56,33 +73,20 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { //#materializer-setup } - val tweets = Source( - Tweet(Author("rolandkuhn"), (new Date).getTime, "#akka rocks!") :: - Tweet(Author("patriknw"), (new Date).getTime, "#akka!") :: - Tweet(Author("bantonsson"), (new Date).getTime, "#akka!") :: - Tweet(Author("drewhk"), (new Date).getTime, "#akka!") :: - Tweet(Author("ktosopl"), (new Date).getTime, "#akka on the rocks!") :: - Tweet(Author("mmartynas"), (new Date).getTime, "wow #akka!") :: - Tweet(Author("akkateam"), (new Date).getTime, "#akka rocks!") :: - Tweet(Author("bananaman"), (new Date).getTime, "#bananas rock!") :: - Tweet(Author("appleman"), (new Date).getTime, "#apples rock!") :: - Tweet(Author("drama"), (new Date).getTime, "we compared #apples to #oranges!") :: - Nil) - implicit val mat = FlowMaterializer() "filter and map" in { //#authors-filter-map val authors: Source[Author] = tweets - .filter(_.hashtags.contains("#akka")) + .filter(_.hashtags.contains(Akka)) .map(_.author) //#authors-filter-map trait Example3 { //#authors-collect val authors: Source[Author] = - tweets.collect { case t if t.hashtags.contains("#akka") ⇒ t.author } + tweets.collect { case t if t.hashtags.contains(Akka) ⇒ t.author } //#authors-collect } @@ -97,7 +101,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { "mapConcat hashtags" in { //#hashtags-mapConcat - val hashtags: Source[Hashtag] = tweets.mapConcat(_.hashtags) + val hashtags: Source[Hashtag] = tweets.mapConcat(_.hashtags.toList) //#hashtags-mapConcat } @@ -119,7 +123,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { val b = Broadcast[Tweet] tweets ~> b ~> Flow[Tweet].map(_.author) ~> writeAuthors - b ~> Flow[Tweet].mapConcat(_.hashtags) ~> writeHashtags + b ~> Flow[Tweet].mapConcat(_.hashtags.toList) ~> writeHashtags } g.run() //#flow-graph-broadcast @@ -181,13 +185,13 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec { val sumSink = Sink.fold[Int, Int](0)(_ + _) val counterRunnableFlow: RunnableFlow = tweetsInMinuteFromNow - .filter(_.hashtags contains "#akka") + .filter(_.hashtags contains Akka) .map(t ⇒ 1) .to(sumSink) // materialize the stream once in the morning val morningMaterialized = counterRunnableFlow.run() - // and once in the evening, reusing the + // and once in the evening, reusing the val eveningMaterialized = counterRunnableFlow.run() // the sumSink materialized two different futures diff --git a/akka-docs-dev/rst/scala/stream-integration-external.rst b/akka-docs-dev/rst/scala/stream-integration-external.rst new file mode 100644 index 0000000000..c6455a9b69 --- /dev/null +++ b/akka-docs-dev/rst/scala/stream-integration-external.rst @@ -0,0 +1,213 @@ +.. _stream-integration-external-scala: + +Integrating with External Services +================================== + +Stream transformations and side effects involving external non-stream based services can be +performed with ``mapAsync`` or ``mapAsyncUnordered``. + +For example, sending emails to the authors of selected tweets using an external +email service: + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#email-server-send + +We start with the tweet stream of authors: + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#tweet-authors + +Assume that we can lookup their email address using: + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#email-address-lookup + +Transforming the stream of authors to a stream of email addresses by using the ``lookupEmail`` +service can be done with ``mapAsync``: + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#email-addresses-mapAsync + +Finally, sending the emails: + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#send-emails + +``mapAsync`` is applying the given function that is calling out to the external service to +each of the elements as they pass through this processing step. The function returns a :class:`Future` +and the value of that future will be emitted downstreams. As many futures as requested elements by +downstream may run in parallel and may complete in any order, but the elements that +are emitted downstream are in the same order as received from upstream. + +That means that back-pressure works as expected. For example if the ``emailServer.send`` +is the bottleneck it will limit the rate at which incoming tweets are retrieved and +email addresses looked up. + +Note that ``mapAsync`` preserves the order of the stream elements. In this example the order +is not important and then we can use the more efficient ``mapAsyncUnordered``: + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#external-service-mapAsyncUnordered + +In the above example the services conveniently returned a :class:`Future` of the result. +If that is not the case you need to wrap the call in a :class:`Future`. If the service call +involves blocking you must also make sure that you run it on a dedicated execution context, to +avoid starvation and disturbance of other tasks in the system. + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#blocking-mapAsync + +The configuration of the ``"blocking-dispatcher"`` may look something like: + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#blocking-dispatcher-config + +An alternative for blocking calls is to perform them in a ``map`` operation, still using a +dedicated dispatcher for that operation. + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#blocking-map + +However, that is not exactly the same as ``mapAsync``, since the ``mapAsync`` may run +several calls concurrently, but ``map`` performs them one at a time. + +For a service that is exposed as an actor, or if an actor is used as a gateway in front of an +external service, you can use ``ask``: + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#save-tweets + +Note that if the ``ask`` is not completed within the given timeout the stream is completed with failure. +If that is not desired outcome you can use ``recover`` on the ``ask`` :class:`Future`. + +Illustrating ordering and parallelism +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Let us look at another example to get a better understanding of the ordering +and parallelism characteristics of ``mapAsync`` and ``mapAsyncUnordered``. + +Several ``mapAsync`` and ``mapAsyncUnordered`` futures may run concurrently. +The number of concurrent futures are limited by the downstream demand. +For example, if 5 elements have been requested by downstream there will be at most 5 +futures in progress. + +``mapAsync`` emits the future results in the same order as the input elements +were received. That means that completed results are only emitted downstreams +when earlier results have been completed and emitted. One slow call will thereby +delay the results of all successive calls, even though they are completed before +the slow call. + +``mapAsyncUnordered`` emits the future results as soon as they are completed, i.e. +it is possible that the elements are not emitted downstream in the same order as +received from upstream. One slow call will thereby not delay the results of faster +successive calls as long as there is downstream demand of several elements. + +Here is a fictive service that we can use to illustrate these aspects. + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#sometimes-slow-service + +Elements starting with a lower case character are simulated to take longer time +to process. + +Here is how we can use it with ``mapAsync``: + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#sometimes-slow-mapAsync + +The output may look like this: + +:: + + before: a + before: B + before: C + before: D + running: a (1) + running: B (2) + before: e + running: C (3) + before: F + running: D (4) + before: g + before: H + completed: C (3) + completed: B (2) + completed: D (1) + completed: a (0) + after: A + after: B + running: e (1) + after: C + after: D + running: F (2) + before: i + before: J + running: g (3) + running: H (4) + completed: H (2) + completed: F (3) + completed: e (1) + completed: g (0) + after: E + after: F + running: i (1) + after: G + after: H + running: J (2) + completed: J (1) + completed: i (0) + after: I + after: J + +Note that ``after`` lines are in the same order as the ``before`` lines even +though elements are ``completed`` in a different order. For example ``H`` +is ``completed`` before ``g``, but still emitted afterwards. + +The numbers in parenthesis illustrates how many calls that are in progress at +the same time. Here the downstream demand and thereby the number of concurrent +calls are limited by the buffer size (4) of the :class:`MaterializerSettings`. + +Here is how we can use the same service with ``mapAsyncUnordered``: + +.. includecode:: code/docs/stream/IntegrationDocSpec.scala#sometimes-slow-mapAsyncUnordered + +The output may look like this: + +:: + + before: a + before: B + before: C + before: D + running: a (1) + running: B (2) + before: e + running: C (3) + before: F + running: D (4) + before: g + before: H + completed: B (3) + completed: C (1) + completed: D (2) + after: B + after: D + running: e (2) + after: C + running: F (3) + before: i + before: J + completed: F (2) + after: F + running: g (3) + running: H (4) + completed: H (3) + after: H + completed: a (2) + after: A + running: i (3) + running: J (4) + completed: J (3) + after: J + completed: e (2) + after: E + completed: g (1) + after: G + completed: i (0) + after: I + +Note that ``after`` lines are not in the same order as the ``before`` lines. For example +``H`` overtakes the slow ``G``. + +The numbers in parenthesis illustrates how many calls that are in progress at +the same time. Here the downstream demand and thereby the number of concurrent +calls are limited by the buffer size (4) of the :class:`MaterializerSettings`. diff --git a/akka-docs-dev/rst/scala/stream.rst b/akka-docs-dev/rst/scala/stream.rst index 168fa884cb..8b077d725e 100644 --- a/akka-docs-dev/rst/scala/stream.rst +++ b/akka-docs-dev/rst/scala/stream.rst @@ -19,6 +19,11 @@ It should be roughly: **TODO - write me** +.. toctree:: + :maxdepth: 1 + + stream-integration-external + Motivation ========== @@ -432,18 +437,6 @@ Flexi Route ----------- **TODO - write me (feel free to move around as well)** -Actor based custom elements ---------------------------- - -ActorPublisher -^^^^^^^^^^^^^^ - -ActorSubscriber -^^^^^^^^^^^^^^^ - - -// TODO: Implementing Reactive Streams interfaces directly vs. extending ActorPublisher / ActorSubscriber??? - Integrating with Actors ======================= @@ -462,3 +455,13 @@ Integration with Reactive Streams enabled libraries // TODO: Simply runWith(Sink.publisher) and runWith(Source.subscriber) to get the corresponding reactive streams types. +// TODO: fanoutPublisher + +ActorPublisher +^^^^^^^^^^^^^^ + +ActorSubscriber +^^^^^^^^^^^^^^^ + +// TODO: Implementing Reactive Streams interfaces directly vs. extending ActorPublisher / ActoSubscriber??? + diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index ed568c6d57..b70884fc8d 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -147,10 +147,10 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) { /** * Transform this stream by applying the given function to each of the elements - * as they pass through this processing step. The function returns a `Future` of the - * element that will be emitted downstream. As many futures as requested elements by + * as they pass through this processing step. The function returns a `Future` and the + * value of that future will be emitted downstreams. As many futures as requested elements by * downstream may run in parallel and may complete in any order, but the elements that - * are emitted downstream are in the same order as from upstream. + * are emitted downstream are in the same order as received from upstream. * * @see [[#mapAsyncUnordered]] */ @@ -159,11 +159,11 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) { /** * Transform this stream by applying the given function to each of the elements - * as they pass through this processing step. The function returns a `Future` of the - * element that will be emitted downstream. As many futures as requested elements by + * as they pass through this processing step. The function returns a `Future` and the + * value of that future will be emitted downstreams. As many futures as requested elements by * downstream may run in parallel and each processed element will be emitted dowstream * as soon as it is ready, i.e. it is possible that the elements are not emitted downstream - * in the same order as from upstream. + * in the same order as received from upstream. * * @see [[#mapAsync]] */ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 22caf8fca5..fcc0ca8d7c 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -238,10 +238,10 @@ class Source[+Out](delegate: scaladsl.Source[Out]) { /** * Transform this stream by applying the given function to each of the elements - * as they pass through this processing step. The function returns a `Future` of the - * element that will be emitted downstream. As many futures as requested elements by + * as they pass through this processing step. The function returns a `Future` and the + * value of that future will be emitted downstreams. As many futures as requested elements by * downstream may run in parallel and may complete in any order, but the elements that - * are emitted downstream are in the same order as from upstream. + * are emitted downstream are in the same order as received from upstream. * * @see [[#mapAsyncUnordered]] */ @@ -250,11 +250,11 @@ class Source[+Out](delegate: scaladsl.Source[Out]) { /** * Transform this stream by applying the given function to each of the elements - * as they pass through this processing step. The function returns a `Future` of the - * element that will be emitted downstream. As many futures as requested elements by + * as they pass through this processing step. The function returns a `Future` and the + * value of that future will be emitted downstreams. As many futures as requested elements by * downstream may run in parallel and each processed element will be emitted dowstream * as soon as it is ready, i.e. it is possible that the elements are not emitted downstream - * in the same order as from upstream. + * in the same order as received from upstream. * * @see [[#mapAsync]] */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 7ddfd323c4..30dbae3ce6 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -155,10 +155,10 @@ trait FlowOps[+Out] { /** * Transform this stream by applying the given function to each of the elements - * as they pass through this processing step. The function returns a `Future` of the - * element that will be emitted downstream. As many futures as requested elements by + * as they pass through this processing step. The function returns a `Future` and the + * value of that future will be emitted downstreams. As many futures as requested elements by * downstream may run in parallel and may complete in any order, but the elements that - * are emitted downstream are in the same order as from upstream. + * are emitted downstream are in the same order as received from upstream. * * @see [[#mapAsyncUnordered]] */ @@ -167,11 +167,11 @@ trait FlowOps[+Out] { /** * Transform this stream by applying the given function to each of the elements - * as they pass through this processing step. The function returns a `Future` of the - * element that will be emitted downstream. As many futures as requested elements by + * as they pass through this processing step. The function returns a `Future` and the + * value of that future will be emitted downstreams. As many futures as requested elements by * downstream may run in parallel and each processed element will be emitted dowstream * as soon as it is ready, i.e. it is possible that the elements are not emitted downstream - * in the same order as from upstream. + * in the same order as received from upstream. * * @see [[#mapAsync]] */