diff --git a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala index ec08d593ec..953a094d69 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/FlowGraphDocSpec.scala @@ -226,7 +226,7 @@ class FlowGraphDocSpec extends AkkaSpec { val foldFlow: Flow[Int, Int, Future[Int]] = Flow(Sink.fold[Int, Int](0)(_ + _)) { implicit builder ⇒ fold ⇒ - (fold.inlet, builder.matValue.mapAsync(4, identity).outlet) + (fold.inlet, builder.matValue.mapAsync(4)(identity).outlet) } //#flow-graph-matvalue @@ -243,8 +243,8 @@ class FlowGraphDocSpec extends AkkaSpec { // fold completes // As a result this Source will never emit anything, and its materialited // Future will never complete - builder.matValue.mapAsync(4, identity) ~> fold - builder.matValue.mapAsync(4, identity).outlet + builder.matValue.mapAsync(4)(identity) ~> fold + builder.matValue.mapAsync(4)(identity).outlet } //#flow-graph-matvalue-cycle } diff --git a/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala index a27a20c1f0..0fb750d322 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/IntegrationDocSpec.scala @@ -141,14 +141,14 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { //#email-addresses-mapAsync val emailAddresses: Source[String, Unit] = authors - .mapAsync(4, author => addressSystem.lookupEmail(author.handle)) + .mapAsync(4)(author => addressSystem.lookupEmail(author.handle)) .collect { case Some(emailAddress) => emailAddress } //#email-addresses-mapAsync //#send-emails val sendEmails: RunnableFlow[Unit] = emailAddresses - .mapAsync(4, address => { + .mapAsync(4)(address => { emailServer.send( Email(to = address, title = "Akka", body = "I like your tweet")) }) @@ -177,7 +177,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { val emailAddresses: Source[String, Unit] = authors.via( - Flow[Author].mapAsync(4, author => addressSystem.lookupEmail(author.handle)) + Flow[Author].mapAsync(4)(author => addressSystem.lookupEmail(author.handle)) .withAttributes(supervisionStrategy(resumingDecider))) //#email-addresses-mapAsync-supervision } @@ -193,12 +193,12 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { val emailAddresses: Source[String, Unit] = authors - .mapAsyncUnordered(4, author => addressSystem.lookupEmail(author.handle)) + .mapAsyncUnordered(4)(author => addressSystem.lookupEmail(author.handle)) .collect { case Some(emailAddress) => emailAddress } val sendEmails: RunnableFlow[Unit] = emailAddresses - .mapAsyncUnordered(4, address => { + .mapAsyncUnordered(4)(address => { emailServer.send( Email(to = address, title = "Akka", body = "I like your tweet")) }) @@ -225,7 +225,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { val authors = tweets.filter(_.hashtags.contains(akka)).map(_.author) val phoneNumbers = - authors.mapAsync(4, author => addressSystem.lookupPhoneNumber(author.handle)) + authors.mapAsync(4)(author => addressSystem.lookupPhoneNumber(author.handle)) .collect { case Some(phoneNo) => phoneNo } //#blocking-mapAsync @@ -233,7 +233,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { val sendTextMessages: RunnableFlow[Unit] = phoneNumbers - .mapAsync(4, phoneNo => { + .mapAsync(4)(phoneNo => { Future { smsServer.send( TextMessage(to = phoneNo, body = "I like your tweet")) @@ -262,7 +262,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { val authors = tweets.filter(_.hashtags.contains(akka)).map(_.author) val phoneNumbers = - authors.mapAsync(4, author => addressSystem.lookupPhoneNumber(author.handle)) + authors.mapAsync(4)(author => addressSystem.lookupPhoneNumber(author.handle)) .collect { case Some(phoneNo) => phoneNo } //#blocking-map @@ -296,7 +296,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { implicit val timeout = Timeout(3.seconds) val saveTweets: RunnableFlow[Unit] = akkaTweets - .mapAsync(4, tweet => database ? Save(tweet)) + .mapAsync(4)(tweet => database ? Save(tweet)) .to(Sink.ignore) //#save-tweets @@ -327,7 +327,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J")) .map(elem => { println(s"before: $elem"); elem }) - .mapAsync(4, service.convert) + .mapAsync(4)(service.convert) .runForeach(elem => println(s"after: $elem")) //#sometimes-slow-mapAsync @@ -359,7 +359,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J")) .map(elem => { println(s"before: $elem"); elem }) - .mapAsyncUnordered(4, service.convert) + .mapAsyncUnordered(4)(service.convert) .runForeach(elem => println(s"after: $elem")) //#sometimes-slow-mapAsyncUnordered diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeGlobalRateLimit.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeGlobalRateLimit.scala index 19d2a4475c..6f7dfcfa28 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeGlobalRateLimit.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeGlobalRateLimit.scala @@ -79,7 +79,7 @@ class RecipeGlobalRateLimit extends RecipeSpec { def limitGlobal[T](limiter: ActorRef, maxAllowedWait: FiniteDuration): Flow[T, T, Unit] = { import akka.pattern.ask import akka.util.Timeout - Flow[T].mapAsync(4, (element: T) => { + Flow[T].mapAsync(4)((element: T) => { import system.dispatcher implicit val triggerTimeout = Timeout(maxAllowedWait) val limiterTriggerFuture = limiter ? Limiter.WantToPass diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeMultiGroupBy.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeMultiGroupBy.scala index c5782645f6..9a1ce6dfa9 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeMultiGroupBy.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeMultiGroupBy.scala @@ -44,7 +44,7 @@ class RecipeMultiGroupBy extends RecipeSpec { val result = multiGroups.map { case (topic, topicMessages) => topicMessages.grouped(10).map(topic.name + _.mkString("[", ", ", "]")).runWith(Sink.head) - }.mapAsync(4, identity).grouped(10).runWith(Sink.head) + }.mapAsync(4)(identity).grouped(10).runWith(Sink.head) Await.result(result, 3.seconds).toSet should be(Set( "1[1: a, 1: b, all: c, all: d, 1: e]", diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala index 25ad5dde2c..ec0e49c8fa 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala @@ -32,7 +32,7 @@ class RecipeReduceByKey extends RecipeSpec { val counts: Source[(String, Int), Unit] = countedWords .buffer(MaximumDistinctWords, OverflowStrategy.fail) - .mapAsync(4, identity) + .mapAsync(4)(identity) //#word-count Await.result(counts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set( @@ -62,7 +62,7 @@ class RecipeReduceByKey extends RecipeSpec { } } - reducedValues.buffer(maximumGroupSize, OverflowStrategy.fail).mapAsync(4, identity) + reducedValues.buffer(maximumGroupSize, OverflowStrategy.fail).mapAsync(4)(identity) } val wordCounts = words.via(reduceByKey( diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index d38aa46981..307c023eeb 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -103,7 +103,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E options: immutable.Traversable[Inet.SocketOption] = Nil, settings: ServerSettings = ServerSettings(system), log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] = - bindAndHandle(Flow[HttpRequest].mapAsync(1, handler), interface, port, backlog, options, settings, log) + bindAndHandle(Flow[HttpRequest].mapAsync(1)(handler), interface, port, backlog, options, settings, log) /** * The type of the server-side HTTP layer as a stand-alone BidiStage @@ -374,7 +374,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E implicit system: ActorSystem, fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), Unit] = { // a connection pool can never have more than pipeliningLimit * maxConnections requests in flight at any point val parallelism = settings.pipeliningLimit * settings.maxConnections - Flow[(HttpRequest, T)].mapAsyncUnordered(parallelism, { + Flow[(HttpRequest, T)].mapAsyncUnordered(parallelism) { case (request, userContext) ⇒ val (effectiveRequest, gatewayFuture) = f(request) val result = Promise[(Try[HttpResponse], T)]() // TODO: simplify to `transformWith` when on Scala 2.12 @@ -382,7 +382,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E .flatMap(_(effectiveRequest))(fm.executionContext) .onComplete(responseTry ⇒ result.success(responseTry -> userContext))(fm.executionContext) result.future - }) + } } private def hostHeader(host: String, port: Int, scheme: String) = headers.Host(host, Uri.normalizePort(port, scheme)) @@ -434,7 +434,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider { * Returns the materialization result of the underlying flow materialization. */ def handleWithAsyncHandler(handler: HttpRequest ⇒ Future[HttpResponse])(implicit fm: FlowMaterializer): Unit = - handleWith(Flow[HttpRequest].mapAsync(1, handler)) + handleWith(Flow[HttpRequest].mapAsync(1)(handler)) } /** diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala index ed98173171..209c3d9270 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/client/HighLevelOutgoingConnectionSpec.scala @@ -28,7 +28,7 @@ class HighLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka .take(N) .map(id ⇒ HttpRequest(uri = s"/r$id")) .via(Http().outgoingConnection(serverHostName, serverPort)) - .mapAsync(4, _.entity.toStrict(1.second)) + .mapAsync(4)(_.entity.toStrict(1.second)) .map { r ⇒ val s = r.data.utf8String; log.debug(s); s.toInt } .runFold(0)(_ + _) @@ -60,11 +60,11 @@ class HighLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka .take(N) .map(id ⇒ HttpRequest(uri = s"/r$id")) .via(doubleConnection) - .mapAsync(4, _.entity.toStrict(1.second)) + .mapAsync(4)(_.entity.toStrict(1.second)) .map { r ⇒ val s = r.data.utf8String; log.debug(s); s.toInt } .runFold(0)(_ + _) Await.result(result, 10.seconds) shouldEqual C * N * (N + 1) / 2 } } -} \ No newline at end of file +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala index aaca97a2c3..fa5c52f6e3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TlsSpec.scala @@ -262,7 +262,7 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off case SessionBytes(s, bytes) ⇒ bytes.map(b ⇒ SessionBytes(s, ByteString(b))) } .take(5) - .mapAsync(5, x ⇒ later(500.millis, system.scheduler)(Future.successful(x))) + .mapAsync(5)(x ⇒ later(500.millis, system.scheduler)(Future.successful(x))) .via(super.flow) override def rightClosing = IgnoreCancel @@ -279,7 +279,7 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off case SessionBytes(s, bytes) ⇒ bytes.map(b ⇒ SessionBytes(s, ByteString(b))) } .take(5) - .mapAsync(5, x ⇒ later(500.millis, system.scheduler)(Future.successful(x))) + .mapAsync(5)(x ⇒ later(500.millis, system.scheduler)(Future.successful(x))) .via(super.flow) override def rightClosing = IgnoreBoth diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala index b262b3db2d..98290ff466 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala @@ -69,7 +69,7 @@ class FlowMapAsyncSpec extends AkkaSpec { "produce future elements" in assertAllStagesStopped { val c = TestSubscriber.manualProbe[Int]() implicit val ec = system.dispatcher - val p = Source(1 to 3).mapAsync(4, n ⇒ Future(n)).runWith(Sink(c)) + val p = Source(1 to 3).mapAsync(4)(n ⇒ Future(n)).runWith(Sink(c)) val sub = c.expectSubscription() sub.request(2) c.expectNext(1) @@ -83,7 +83,7 @@ class FlowMapAsyncSpec extends AkkaSpec { "produce future elements in order" in { val c = TestSubscriber.manualProbe[Int]() implicit val ec = system.dispatcher - val p = Source(1 to 50).mapAsync(4, n ⇒ Future { + val p = Source(1 to 50).mapAsync(4)(n ⇒ Future { Thread.sleep(ThreadLocalRandom.current().nextInt(1, 10)) n }).to(Sink(c)).run() @@ -97,7 +97,7 @@ class FlowMapAsyncSpec extends AkkaSpec { val probe = TestProbe() val c = TestSubscriber.manualProbe[Int]() implicit val ec = system.dispatcher - val p = Source(1 to 20).mapAsync(8, n ⇒ Future { + val p = Source(1 to 20).mapAsync(8)(n ⇒ Future { probe.ref ! n n }).to(Sink(c)).run() @@ -123,7 +123,7 @@ class FlowMapAsyncSpec extends AkkaSpec { val latch = TestLatch(1) val c = TestSubscriber.manualProbe[Int]() implicit val ec = system.dispatcher - val p = Source(1 to 5).mapAsync(4, n ⇒ Future { + val p = Source(1 to 5).mapAsync(4)(n ⇒ Future { if (n == 3) throw new RuntimeException("err1") with NoStackTrace else { Await.ready(latch, 10.seconds) @@ -140,7 +140,7 @@ class FlowMapAsyncSpec extends AkkaSpec { val latch = TestLatch(1) val c = TestSubscriber.manualProbe[Int]() implicit val ec = system.dispatcher - val p = Source(1 to 5).mapAsync(4, n ⇒ + val p = Source(1 to 5).mapAsync(4)(n ⇒ if (n == 3) throw new RuntimeException("err2") with NoStackTrace else { Future { @@ -159,7 +159,7 @@ class FlowMapAsyncSpec extends AkkaSpec { val c = TestSubscriber.manualProbe[Int]() implicit val ec = system.dispatcher val p = Source(1 to 5) - .mapAsync(4, n ⇒ Future { + .mapAsync(4)(n ⇒ Future { if (n == 3) throw new RuntimeException("err3") with NoStackTrace else n }) @@ -173,7 +173,7 @@ class FlowMapAsyncSpec extends AkkaSpec { "finish after future failure" in assertAllStagesStopped { import system.dispatcher - Await.result(Source(1 to 3).mapAsync(1, n ⇒ Future { + Await.result(Source(1 to 3).mapAsync(1)(n ⇒ Future { if (n == 3) throw new RuntimeException("err3b") with NoStackTrace else n }).withAttributes(supervisionStrategy(resumingDecider)) @@ -185,7 +185,7 @@ class FlowMapAsyncSpec extends AkkaSpec { val c = TestSubscriber.manualProbe[Int]() implicit val ec = system.dispatcher val p = Source(1 to 5) - .mapAsync(4, n ⇒ + .mapAsync(4)(n ⇒ if (n == 3) throw new RuntimeException("err4") with NoStackTrace else Future(n)) .withAttributes(supervisionStrategy(resumingDecider)) @@ -198,7 +198,7 @@ class FlowMapAsyncSpec extends AkkaSpec { "signal NPE when future is completed with null" in { val c = TestSubscriber.manualProbe[String]() - val p = Source(List("a", "b")).mapAsync(4, elem ⇒ Future.successful(null)).to(Sink(c)).run() + val p = Source(List("a", "b")).mapAsync(4)(elem ⇒ Future.successful(null)).to(Sink(c)).run() val sub = c.expectSubscription() sub.request(10) c.expectError.getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg) @@ -207,7 +207,7 @@ class FlowMapAsyncSpec extends AkkaSpec { "resume when future is completed with null" in { val c = TestSubscriber.manualProbe[String]() val p = Source(List("a", "b", "c")) - .mapAsync(4, elem ⇒ if (elem == "b") Future.successful(null) else Future.successful(elem)) + .mapAsync(4)(elem ⇒ if (elem == "b") Future.successful(null) else Future.successful(elem)) .withAttributes(supervisionStrategy(resumingDecider)) .to(Sink(c)).run() val sub = c.expectSubscription() @@ -220,7 +220,7 @@ class FlowMapAsyncSpec extends AkkaSpec { val pub = TestPublisher.manualProbe[Int]() val sub = TestSubscriber.manualProbe[Int]() - Source(pub).mapAsync(4, Future.successful).runWith(Sink(sub)) + Source(pub).mapAsync(4)(Future.successful).runWith(Sink(sub)) val upstream = pub.expectSubscription() upstream.expectRequest() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala index 08d63abc51..d0faed508a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala @@ -28,7 +28,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { val c = TestSubscriber.manualProbe[Int]() implicit val ec = system.dispatcher val latch = (1 to 4).map(_ -> TestLatch(1)).toMap - val p = Source(1 to 4).mapAsyncUnordered(4, n ⇒ Future { + val p = Source(1 to 4).mapAsyncUnordered(4)(n ⇒ Future { Await.ready(latch(n), 5.seconds) n }).to(Sink(c)).run() @@ -49,7 +49,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { val probe = TestProbe() val c = TestSubscriber.manualProbe[Int]() implicit val ec = system.dispatcher - val p = Source(1 to 20).mapAsyncUnordered(4, n ⇒ Future { + val p = Source(1 to 20).mapAsyncUnordered(4)(n ⇒ Future { probe.ref ! n n }).to(Sink(c)).run() @@ -76,7 +76,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { val latch = TestLatch(1) val c = TestSubscriber.manualProbe[Int]() implicit val ec = system.dispatcher - val p = Source(1 to 5).mapAsyncUnordered(4, n ⇒ Future { + val p = Source(1 to 5).mapAsyncUnordered(4)(n ⇒ Future { if (n == 3) throw new RuntimeException("err1") with NoStackTrace else { Await.ready(latch, 10.seconds) @@ -93,7 +93,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { val latch = TestLatch(1) val c = TestSubscriber.manualProbe[Int]() implicit val ec = system.dispatcher - val p = Source(1 to 5).mapAsyncUnordered(4, n ⇒ + val p = Source(1 to 5).mapAsyncUnordered(4)(n ⇒ if (n == 3) throw new RuntimeException("err2") with NoStackTrace else { Future { @@ -111,7 +111,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { "resume after future failure" in { implicit val ec = system.dispatcher Source(1 to 5) - .mapAsyncUnordered(4, n ⇒ Future { + .mapAsyncUnordered(4)(n ⇒ Future { if (n == 3) throw new RuntimeException("err3") with NoStackTrace else n }) @@ -124,7 +124,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { "finish after future failure" in assertAllStagesStopped { import system.dispatcher - Await.result(Source(1 to 3).mapAsyncUnordered(1, n ⇒ Future { + Await.result(Source(1 to 3).mapAsyncUnordered(1)(n ⇒ Future { if (n == 3) throw new RuntimeException("err3b") with NoStackTrace else n }).withAttributes(supervisionStrategy(resumingDecider)) @@ -135,7 +135,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { "resume when mapAsyncUnordered throws" in { implicit val ec = system.dispatcher Source(1 to 5) - .mapAsyncUnordered(4, n ⇒ + .mapAsyncUnordered(4)(n ⇒ if (n == 3) throw new RuntimeException("err4") with NoStackTrace else Future(n)) .withAttributes(supervisionStrategy(resumingDecider)) @@ -147,7 +147,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { "signal NPE when future is completed with null" in { val c = TestSubscriber.manualProbe[String]() - val p = Source(List("a", "b")).mapAsyncUnordered(4, elem ⇒ Future.successful(null)).to(Sink(c)).run() + val p = Source(List("a", "b")).mapAsyncUnordered(4)(elem ⇒ Future.successful(null)).to(Sink(c)).run() val sub = c.expectSubscription() sub.request(10) c.expectError.getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg) @@ -156,7 +156,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { "resume when future is completed with null" in { val c = TestSubscriber.manualProbe[String]() val p = Source(List("a", "b", "c")) - .mapAsyncUnordered(4, elem ⇒ if (elem == "b") Future.successful(null) else Future.successful(elem)) + .mapAsyncUnordered(4)(elem ⇒ if (elem == "b") Future.successful(null) else Future.successful(elem)) .withAttributes(supervisionStrategy(resumingDecider)) .to(Sink(c)).run() val sub = c.expectSubscription() @@ -169,7 +169,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec { val pub = TestPublisher.manualProbe[Int]() val sub = TestSubscriber.manualProbe[Int]() - Source(pub).mapAsyncUnordered(4, Future.successful).runWith(Sink(sub)) + Source(pub).mapAsyncUnordered(4)(Future.successful).runWith(Sink(sub)) val upstream = pub.expectSubscription() upstream.expectRequest() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala index d0f691bb42..2474f523ef 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala @@ -28,7 +28,7 @@ class GraphMatValueSpec extends AkkaSpec { val f = FlowGraph.closed(foldSink) { implicit b ⇒ fold ⇒ Source(1 to 10) ~> fold - b.matValue.mapAsync(4, identity) ~> Sink(sub) + b.matValue.mapAsync(4)(identity) ~> Sink(sub) }.run() val r1 = Await.result(f, 3.seconds) @@ -45,8 +45,8 @@ class GraphMatValueSpec extends AkkaSpec { fold ⇒ val zip = b.add(ZipWith[Int, Int, Int](_ + _)) Source(1 to 10) ~> fold - b.matValue.mapAsync(4, identity) ~> zip.in0 - b.matValue.mapAsync(4, identity) ~> zip.in1 + b.matValue.mapAsync(4)(identity) ~> zip.in0 + b.matValue.mapAsync(4)(identity) ~> zip.in1 zip.out ~> Sink(sub) }.run() @@ -66,13 +66,13 @@ class GraphMatValueSpec extends AkkaSpec { } "allow exposing the materialized value as port" in { - val (f1, f2) = foldFeedbackSource.mapAsync(4, identity).map(_ + 100).toMat(Sink.head)(Keep.both).run() + val (f1, f2) = foldFeedbackSource.mapAsync(4)(identity).map(_ + 100).toMat(Sink.head)(Keep.both).run() Await.result(f1, 3.seconds) should ===(55) Await.result(f2, 3.seconds) should ===(155) } "allow exposing the materialized value as port even if wrapped and the final materialized value is Unit" in { - val noMatSource: Source[Int, Unit] = foldFeedbackSource.mapAsync(4, identity).map(_ + 100).mapMaterialized((_) ⇒ ()) + val noMatSource: Source[Int, Unit] = foldFeedbackSource.mapAsync(4)(identity).map(_ + 100).mapMaterialized((_) ⇒ ()) Await.result(noMatSource.runWith(Sink.head), 3.seconds) should ===(155) } @@ -81,8 +81,8 @@ class GraphMatValueSpec extends AkkaSpec { (s1, s2) ⇒ val zip = b.add(ZipWith[Int, Int, Int](_ + _)) - s1.outlet.mapAsync(4, identity) ~> zip.in0 - s2.outlet.mapAsync(4, identity).map(_ * 100) ~> zip.in1 + s1.outlet.mapAsync(4)(identity) ~> zip.in0 + s2.outlet.mapAsync(4)(identity).map(_ * 100) ~> zip.in1 zip.out } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index ca21076054..313d38c165 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -216,7 +216,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * @see [[#mapAsyncUnordered]] */ def mapAsync[T](parallelism: Int, f: function.Function[Out, Future[T]]): javadsl.Flow[In, T, Mat] = - new Flow(delegate.mapAsync(parallelism, f.apply)) + new Flow(delegate.mapAsync(parallelism)(f.apply)) /** * Transform this stream by applying the given function to each of the elements @@ -245,7 +245,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph * @see [[#mapAsync]] */ def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, Future[T]]): javadsl.Flow[In, T, Mat] = - new Flow(delegate.mapAsyncUnordered(parallelism, f.apply)) + new Flow(delegate.mapAsyncUnordered(parallelism)(f.apply)) /** * Only pass on those elements that satisfy the given predicate. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index cf3641166c..fde6b1e319 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -328,7 +328,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * @see [[#mapAsyncUnordered]] */ def mapAsync[T](parallelism: Int, f: function.Function[Out, Future[T]]): javadsl.Source[T, Mat] = - new Source(delegate.mapAsync(parallelism, f.apply)) + new Source(delegate.mapAsync(parallelism)(f.apply)) /** * Transform this stream by applying the given function to each of the elements @@ -341,7 +341,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour * @see [[#mapAsync]] */ def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, Future[T]]): javadsl.Source[T, Mat] = - new Source(delegate.mapAsyncUnordered(parallelism, f.apply)) + new Source(delegate.mapAsyncUnordered(parallelism)(f.apply)) /** * Only pass on those elements that satisfy the given predicate. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 306b6c162f..2a30d45c79 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -407,7 +407,7 @@ trait FlowOps[+Out, +Mat] { * * @see [[#mapAsyncUnordered]] */ - def mapAsync[T](parallelism: Int, f: Out ⇒ Future[T]): Repr[T, Mat] = + def mapAsync[T](parallelism: Int)(f: Out ⇒ Future[T]): Repr[T, Mat] = andThen(MapAsync(parallelism, f.asInstanceOf[Any ⇒ Future[Any]])) /** @@ -436,7 +436,7 @@ trait FlowOps[+Out, +Mat] { * * @see [[#mapAsync]] */ - def mapAsyncUnordered[T](parallelism: Int, f: Out ⇒ Future[T]): Repr[T, Mat] = + def mapAsyncUnordered[T](parallelism: Int)(f: Out ⇒ Future[T]): Repr[T, Mat] = andThen(MapAsyncUnordered(parallelism, f.asInstanceOf[Any ⇒ Future[Any]])) /**