!str #17327 use curried methods for mapAsync

This commit is contained in:
Roland Kuhn 2015-04-28 14:37:58 +02:00
parent 8714d556d5
commit 55e1d71eea
14 changed files with 61 additions and 61 deletions

View file

@ -226,7 +226,7 @@ class FlowGraphDocSpec extends AkkaSpec {
val foldFlow: Flow[Int, Int, Future[Int]] = Flow(Sink.fold[Int, Int](0)(_ + _)) { val foldFlow: Flow[Int, Int, Future[Int]] = Flow(Sink.fold[Int, Int](0)(_ + _)) {
implicit builder implicit builder
fold fold
(fold.inlet, builder.matValue.mapAsync(4, identity).outlet) (fold.inlet, builder.matValue.mapAsync(4)(identity).outlet)
} }
//#flow-graph-matvalue //#flow-graph-matvalue
@ -243,8 +243,8 @@ class FlowGraphDocSpec extends AkkaSpec {
// fold completes // fold completes
// As a result this Source will never emit anything, and its materialited // As a result this Source will never emit anything, and its materialited
// Future will never complete // Future will never complete
builder.matValue.mapAsync(4, identity) ~> fold builder.matValue.mapAsync(4)(identity) ~> fold
builder.matValue.mapAsync(4, identity).outlet builder.matValue.mapAsync(4)(identity).outlet
} }
//#flow-graph-matvalue-cycle //#flow-graph-matvalue-cycle
} }

View file

@ -141,14 +141,14 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
//#email-addresses-mapAsync //#email-addresses-mapAsync
val emailAddresses: Source[String, Unit] = val emailAddresses: Source[String, Unit] =
authors authors
.mapAsync(4, author => addressSystem.lookupEmail(author.handle)) .mapAsync(4)(author => addressSystem.lookupEmail(author.handle))
.collect { case Some(emailAddress) => emailAddress } .collect { case Some(emailAddress) => emailAddress }
//#email-addresses-mapAsync //#email-addresses-mapAsync
//#send-emails //#send-emails
val sendEmails: RunnableFlow[Unit] = val sendEmails: RunnableFlow[Unit] =
emailAddresses emailAddresses
.mapAsync(4, address => { .mapAsync(4)(address => {
emailServer.send( emailServer.send(
Email(to = address, title = "Akka", body = "I like your tweet")) Email(to = address, title = "Akka", body = "I like your tweet"))
}) })
@ -177,7 +177,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val emailAddresses: Source[String, Unit] = val emailAddresses: Source[String, Unit] =
authors.via( authors.via(
Flow[Author].mapAsync(4, author => addressSystem.lookupEmail(author.handle)) Flow[Author].mapAsync(4)(author => addressSystem.lookupEmail(author.handle))
.withAttributes(supervisionStrategy(resumingDecider))) .withAttributes(supervisionStrategy(resumingDecider)))
//#email-addresses-mapAsync-supervision //#email-addresses-mapAsync-supervision
} }
@ -193,12 +193,12 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val emailAddresses: Source[String, Unit] = val emailAddresses: Source[String, Unit] =
authors authors
.mapAsyncUnordered(4, author => addressSystem.lookupEmail(author.handle)) .mapAsyncUnordered(4)(author => addressSystem.lookupEmail(author.handle))
.collect { case Some(emailAddress) => emailAddress } .collect { case Some(emailAddress) => emailAddress }
val sendEmails: RunnableFlow[Unit] = val sendEmails: RunnableFlow[Unit] =
emailAddresses emailAddresses
.mapAsyncUnordered(4, address => { .mapAsyncUnordered(4)(address => {
emailServer.send( emailServer.send(
Email(to = address, title = "Akka", body = "I like your tweet")) Email(to = address, title = "Akka", body = "I like your tweet"))
}) })
@ -225,7 +225,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val authors = tweets.filter(_.hashtags.contains(akka)).map(_.author) val authors = tweets.filter(_.hashtags.contains(akka)).map(_.author)
val phoneNumbers = val phoneNumbers =
authors.mapAsync(4, author => addressSystem.lookupPhoneNumber(author.handle)) authors.mapAsync(4)(author => addressSystem.lookupPhoneNumber(author.handle))
.collect { case Some(phoneNo) => phoneNo } .collect { case Some(phoneNo) => phoneNo }
//#blocking-mapAsync //#blocking-mapAsync
@ -233,7 +233,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val sendTextMessages: RunnableFlow[Unit] = val sendTextMessages: RunnableFlow[Unit] =
phoneNumbers phoneNumbers
.mapAsync(4, phoneNo => { .mapAsync(4)(phoneNo => {
Future { Future {
smsServer.send( smsServer.send(
TextMessage(to = phoneNo, body = "I like your tweet")) TextMessage(to = phoneNo, body = "I like your tweet"))
@ -262,7 +262,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val authors = tweets.filter(_.hashtags.contains(akka)).map(_.author) val authors = tweets.filter(_.hashtags.contains(akka)).map(_.author)
val phoneNumbers = val phoneNumbers =
authors.mapAsync(4, author => addressSystem.lookupPhoneNumber(author.handle)) authors.mapAsync(4)(author => addressSystem.lookupPhoneNumber(author.handle))
.collect { case Some(phoneNo) => phoneNo } .collect { case Some(phoneNo) => phoneNo }
//#blocking-map //#blocking-map
@ -296,7 +296,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
implicit val timeout = Timeout(3.seconds) implicit val timeout = Timeout(3.seconds)
val saveTweets: RunnableFlow[Unit] = val saveTweets: RunnableFlow[Unit] =
akkaTweets akkaTweets
.mapAsync(4, tweet => database ? Save(tweet)) .mapAsync(4)(tweet => database ? Save(tweet))
.to(Sink.ignore) .to(Sink.ignore)
//#save-tweets //#save-tweets
@ -327,7 +327,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J")) Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
.map(elem => { println(s"before: $elem"); elem }) .map(elem => { println(s"before: $elem"); elem })
.mapAsync(4, service.convert) .mapAsync(4)(service.convert)
.runForeach(elem => println(s"after: $elem")) .runForeach(elem => println(s"after: $elem"))
//#sometimes-slow-mapAsync //#sometimes-slow-mapAsync
@ -359,7 +359,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J")) Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
.map(elem => { println(s"before: $elem"); elem }) .map(elem => { println(s"before: $elem"); elem })
.mapAsyncUnordered(4, service.convert) .mapAsyncUnordered(4)(service.convert)
.runForeach(elem => println(s"after: $elem")) .runForeach(elem => println(s"after: $elem"))
//#sometimes-slow-mapAsyncUnordered //#sometimes-slow-mapAsyncUnordered

View file

@ -79,7 +79,7 @@ class RecipeGlobalRateLimit extends RecipeSpec {
def limitGlobal[T](limiter: ActorRef, maxAllowedWait: FiniteDuration): Flow[T, T, Unit] = { def limitGlobal[T](limiter: ActorRef, maxAllowedWait: FiniteDuration): Flow[T, T, Unit] = {
import akka.pattern.ask import akka.pattern.ask
import akka.util.Timeout import akka.util.Timeout
Flow[T].mapAsync(4, (element: T) => { Flow[T].mapAsync(4)((element: T) => {
import system.dispatcher import system.dispatcher
implicit val triggerTimeout = Timeout(maxAllowedWait) implicit val triggerTimeout = Timeout(maxAllowedWait)
val limiterTriggerFuture = limiter ? Limiter.WantToPass val limiterTriggerFuture = limiter ? Limiter.WantToPass

View file

@ -44,7 +44,7 @@ class RecipeMultiGroupBy extends RecipeSpec {
val result = multiGroups.map { val result = multiGroups.map {
case (topic, topicMessages) => topicMessages.grouped(10).map(topic.name + _.mkString("[", ", ", "]")).runWith(Sink.head) case (topic, topicMessages) => topicMessages.grouped(10).map(topic.name + _.mkString("[", ", ", "]")).runWith(Sink.head)
}.mapAsync(4, identity).grouped(10).runWith(Sink.head) }.mapAsync(4)(identity).grouped(10).runWith(Sink.head)
Await.result(result, 3.seconds).toSet should be(Set( Await.result(result, 3.seconds).toSet should be(Set(
"1[1: a, 1: b, all: c, all: d, 1: e]", "1[1: a, 1: b, all: c, all: d, 1: e]",

View file

@ -32,7 +32,7 @@ class RecipeReduceByKey extends RecipeSpec {
val counts: Source[(String, Int), Unit] = val counts: Source[(String, Int), Unit] =
countedWords countedWords
.buffer(MaximumDistinctWords, OverflowStrategy.fail) .buffer(MaximumDistinctWords, OverflowStrategy.fail)
.mapAsync(4, identity) .mapAsync(4)(identity)
//#word-count //#word-count
Await.result(counts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set( Await.result(counts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set(
@ -62,7 +62,7 @@ class RecipeReduceByKey extends RecipeSpec {
} }
} }
reducedValues.buffer(maximumGroupSize, OverflowStrategy.fail).mapAsync(4, identity) reducedValues.buffer(maximumGroupSize, OverflowStrategy.fail).mapAsync(4)(identity)
} }
val wordCounts = words.via(reduceByKey( val wordCounts = words.via(reduceByKey(

View file

@ -103,7 +103,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
options: immutable.Traversable[Inet.SocketOption] = Nil, options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: ServerSettings = ServerSettings(system), settings: ServerSettings = ServerSettings(system),
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] = log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] =
bindAndHandle(Flow[HttpRequest].mapAsync(1, handler), interface, port, backlog, options, settings, log) bindAndHandle(Flow[HttpRequest].mapAsync(1)(handler), interface, port, backlog, options, settings, log)
/** /**
* The type of the server-side HTTP layer as a stand-alone BidiStage * The type of the server-side HTTP layer as a stand-alone BidiStage
@ -374,7 +374,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
implicit system: ActorSystem, fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), Unit] = { implicit system: ActorSystem, fm: FlowMaterializer): Flow[(HttpRequest, T), (Try[HttpResponse], T), Unit] = {
// a connection pool can never have more than pipeliningLimit * maxConnections requests in flight at any point // a connection pool can never have more than pipeliningLimit * maxConnections requests in flight at any point
val parallelism = settings.pipeliningLimit * settings.maxConnections val parallelism = settings.pipeliningLimit * settings.maxConnections
Flow[(HttpRequest, T)].mapAsyncUnordered(parallelism, { Flow[(HttpRequest, T)].mapAsyncUnordered(parallelism) {
case (request, userContext) case (request, userContext)
val (effectiveRequest, gatewayFuture) = f(request) val (effectiveRequest, gatewayFuture) = f(request)
val result = Promise[(Try[HttpResponse], T)]() // TODO: simplify to `transformWith` when on Scala 2.12 val result = Promise[(Try[HttpResponse], T)]() // TODO: simplify to `transformWith` when on Scala 2.12
@ -382,7 +382,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
.flatMap(_(effectiveRequest))(fm.executionContext) .flatMap(_(effectiveRequest))(fm.executionContext)
.onComplete(responseTry result.success(responseTry -> userContext))(fm.executionContext) .onComplete(responseTry result.success(responseTry -> userContext))(fm.executionContext)
result.future result.future
}) }
} }
private def hostHeader(host: String, port: Int, scheme: String) = headers.Host(host, Uri.normalizePort(port, scheme)) private def hostHeader(host: String, port: Int, scheme: String) = headers.Host(host, Uri.normalizePort(port, scheme))
@ -434,7 +434,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider {
* Returns the materialization result of the underlying flow materialization. * Returns the materialization result of the underlying flow materialization.
*/ */
def handleWithAsyncHandler(handler: HttpRequest Future[HttpResponse])(implicit fm: FlowMaterializer): Unit = def handleWithAsyncHandler(handler: HttpRequest Future[HttpResponse])(implicit fm: FlowMaterializer): Unit =
handleWith(Flow[HttpRequest].mapAsync(1, handler)) handleWith(Flow[HttpRequest].mapAsync(1)(handler))
} }
/** /**

View file

@ -28,7 +28,7 @@ class HighLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka
.take(N) .take(N)
.map(id HttpRequest(uri = s"/r$id")) .map(id HttpRequest(uri = s"/r$id"))
.via(Http().outgoingConnection(serverHostName, serverPort)) .via(Http().outgoingConnection(serverHostName, serverPort))
.mapAsync(4, _.entity.toStrict(1.second)) .mapAsync(4)(_.entity.toStrict(1.second))
.map { r val s = r.data.utf8String; log.debug(s); s.toInt } .map { r val s = r.data.utf8String; log.debug(s); s.toInt }
.runFold(0)(_ + _) .runFold(0)(_ + _)
@ -60,11 +60,11 @@ class HighLevelOutgoingConnectionSpec extends AkkaSpec("akka.loggers = []\n akka
.take(N) .take(N)
.map(id HttpRequest(uri = s"/r$id")) .map(id HttpRequest(uri = s"/r$id"))
.via(doubleConnection) .via(doubleConnection)
.mapAsync(4, _.entity.toStrict(1.second)) .mapAsync(4)(_.entity.toStrict(1.second))
.map { r val s = r.data.utf8String; log.debug(s); s.toInt } .map { r val s = r.data.utf8String; log.debug(s); s.toInt }
.runFold(0)(_ + _) .runFold(0)(_ + _)
Await.result(result, 10.seconds) shouldEqual C * N * (N + 1) / 2 Await.result(result, 10.seconds) shouldEqual C * N * (N + 1) / 2
} }
} }
} }

View file

@ -262,7 +262,7 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off
case SessionBytes(s, bytes) bytes.map(b SessionBytes(s, ByteString(b))) case SessionBytes(s, bytes) bytes.map(b SessionBytes(s, ByteString(b)))
} }
.take(5) .take(5)
.mapAsync(5, x later(500.millis, system.scheduler)(Future.successful(x))) .mapAsync(5)(x later(500.millis, system.scheduler)(Future.successful(x)))
.via(super.flow) .via(super.flow)
override def rightClosing = IgnoreCancel override def rightClosing = IgnoreCancel
@ -279,7 +279,7 @@ class TlsSpec extends AkkaSpec("akka.loglevel=INFO\nakka.actor.debug.receive=off
case SessionBytes(s, bytes) bytes.map(b SessionBytes(s, ByteString(b))) case SessionBytes(s, bytes) bytes.map(b SessionBytes(s, ByteString(b)))
} }
.take(5) .take(5)
.mapAsync(5, x later(500.millis, system.scheduler)(Future.successful(x))) .mapAsync(5)(x later(500.millis, system.scheduler)(Future.successful(x)))
.via(super.flow) .via(super.flow)
override def rightClosing = IgnoreBoth override def rightClosing = IgnoreBoth

View file

@ -69,7 +69,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
"produce future elements" in assertAllStagesStopped { "produce future elements" in assertAllStagesStopped {
val c = TestSubscriber.manualProbe[Int]() val c = TestSubscriber.manualProbe[Int]()
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
val p = Source(1 to 3).mapAsync(4, n Future(n)).runWith(Sink(c)) val p = Source(1 to 3).mapAsync(4)(n Future(n)).runWith(Sink(c))
val sub = c.expectSubscription() val sub = c.expectSubscription()
sub.request(2) sub.request(2)
c.expectNext(1) c.expectNext(1)
@ -83,7 +83,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
"produce future elements in order" in { "produce future elements in order" in {
val c = TestSubscriber.manualProbe[Int]() val c = TestSubscriber.manualProbe[Int]()
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
val p = Source(1 to 50).mapAsync(4, n Future { val p = Source(1 to 50).mapAsync(4)(n Future {
Thread.sleep(ThreadLocalRandom.current().nextInt(1, 10)) Thread.sleep(ThreadLocalRandom.current().nextInt(1, 10))
n n
}).to(Sink(c)).run() }).to(Sink(c)).run()
@ -97,7 +97,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
val probe = TestProbe() val probe = TestProbe()
val c = TestSubscriber.manualProbe[Int]() val c = TestSubscriber.manualProbe[Int]()
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
val p = Source(1 to 20).mapAsync(8, n Future { val p = Source(1 to 20).mapAsync(8)(n Future {
probe.ref ! n probe.ref ! n
n n
}).to(Sink(c)).run() }).to(Sink(c)).run()
@ -123,7 +123,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
val latch = TestLatch(1) val latch = TestLatch(1)
val c = TestSubscriber.manualProbe[Int]() val c = TestSubscriber.manualProbe[Int]()
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
val p = Source(1 to 5).mapAsync(4, n Future { val p = Source(1 to 5).mapAsync(4)(n Future {
if (n == 3) throw new RuntimeException("err1") with NoStackTrace if (n == 3) throw new RuntimeException("err1") with NoStackTrace
else { else {
Await.ready(latch, 10.seconds) Await.ready(latch, 10.seconds)
@ -140,7 +140,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
val latch = TestLatch(1) val latch = TestLatch(1)
val c = TestSubscriber.manualProbe[Int]() val c = TestSubscriber.manualProbe[Int]()
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
val p = Source(1 to 5).mapAsync(4, n val p = Source(1 to 5).mapAsync(4)(n
if (n == 3) throw new RuntimeException("err2") with NoStackTrace if (n == 3) throw new RuntimeException("err2") with NoStackTrace
else { else {
Future { Future {
@ -159,7 +159,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
val c = TestSubscriber.manualProbe[Int]() val c = TestSubscriber.manualProbe[Int]()
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
val p = Source(1 to 5) val p = Source(1 to 5)
.mapAsync(4, n Future { .mapAsync(4)(n Future {
if (n == 3) throw new RuntimeException("err3") with NoStackTrace if (n == 3) throw new RuntimeException("err3") with NoStackTrace
else n else n
}) })
@ -173,7 +173,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
"finish after future failure" in assertAllStagesStopped { "finish after future failure" in assertAllStagesStopped {
import system.dispatcher import system.dispatcher
Await.result(Source(1 to 3).mapAsync(1, n Future { Await.result(Source(1 to 3).mapAsync(1)(n Future {
if (n == 3) throw new RuntimeException("err3b") with NoStackTrace if (n == 3) throw new RuntimeException("err3b") with NoStackTrace
else n else n
}).withAttributes(supervisionStrategy(resumingDecider)) }).withAttributes(supervisionStrategy(resumingDecider))
@ -185,7 +185,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
val c = TestSubscriber.manualProbe[Int]() val c = TestSubscriber.manualProbe[Int]()
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
val p = Source(1 to 5) val p = Source(1 to 5)
.mapAsync(4, n .mapAsync(4)(n
if (n == 3) throw new RuntimeException("err4") with NoStackTrace if (n == 3) throw new RuntimeException("err4") with NoStackTrace
else Future(n)) else Future(n))
.withAttributes(supervisionStrategy(resumingDecider)) .withAttributes(supervisionStrategy(resumingDecider))
@ -198,7 +198,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
"signal NPE when future is completed with null" in { "signal NPE when future is completed with null" in {
val c = TestSubscriber.manualProbe[String]() val c = TestSubscriber.manualProbe[String]()
val p = Source(List("a", "b")).mapAsync(4, elem Future.successful(null)).to(Sink(c)).run() val p = Source(List("a", "b")).mapAsync(4)(elem Future.successful(null)).to(Sink(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
sub.request(10) sub.request(10)
c.expectError.getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg) c.expectError.getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg)
@ -207,7 +207,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
"resume when future is completed with null" in { "resume when future is completed with null" in {
val c = TestSubscriber.manualProbe[String]() val c = TestSubscriber.manualProbe[String]()
val p = Source(List("a", "b", "c")) val p = Source(List("a", "b", "c"))
.mapAsync(4, elem if (elem == "b") Future.successful(null) else Future.successful(elem)) .mapAsync(4)(elem if (elem == "b") Future.successful(null) else Future.successful(elem))
.withAttributes(supervisionStrategy(resumingDecider)) .withAttributes(supervisionStrategy(resumingDecider))
.to(Sink(c)).run() .to(Sink(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
@ -220,7 +220,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
val pub = TestPublisher.manualProbe[Int]() val pub = TestPublisher.manualProbe[Int]()
val sub = TestSubscriber.manualProbe[Int]() val sub = TestSubscriber.manualProbe[Int]()
Source(pub).mapAsync(4, Future.successful).runWith(Sink(sub)) Source(pub).mapAsync(4)(Future.successful).runWith(Sink(sub))
val upstream = pub.expectSubscription() val upstream = pub.expectSubscription()
upstream.expectRequest() upstream.expectRequest()

View file

@ -28,7 +28,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
val c = TestSubscriber.manualProbe[Int]() val c = TestSubscriber.manualProbe[Int]()
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
val latch = (1 to 4).map(_ -> TestLatch(1)).toMap val latch = (1 to 4).map(_ -> TestLatch(1)).toMap
val p = Source(1 to 4).mapAsyncUnordered(4, n Future { val p = Source(1 to 4).mapAsyncUnordered(4)(n Future {
Await.ready(latch(n), 5.seconds) Await.ready(latch(n), 5.seconds)
n n
}).to(Sink(c)).run() }).to(Sink(c)).run()
@ -49,7 +49,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
val probe = TestProbe() val probe = TestProbe()
val c = TestSubscriber.manualProbe[Int]() val c = TestSubscriber.manualProbe[Int]()
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
val p = Source(1 to 20).mapAsyncUnordered(4, n Future { val p = Source(1 to 20).mapAsyncUnordered(4)(n Future {
probe.ref ! n probe.ref ! n
n n
}).to(Sink(c)).run() }).to(Sink(c)).run()
@ -76,7 +76,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
val latch = TestLatch(1) val latch = TestLatch(1)
val c = TestSubscriber.manualProbe[Int]() val c = TestSubscriber.manualProbe[Int]()
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
val p = Source(1 to 5).mapAsyncUnordered(4, n Future { val p = Source(1 to 5).mapAsyncUnordered(4)(n Future {
if (n == 3) throw new RuntimeException("err1") with NoStackTrace if (n == 3) throw new RuntimeException("err1") with NoStackTrace
else { else {
Await.ready(latch, 10.seconds) Await.ready(latch, 10.seconds)
@ -93,7 +93,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
val latch = TestLatch(1) val latch = TestLatch(1)
val c = TestSubscriber.manualProbe[Int]() val c = TestSubscriber.manualProbe[Int]()
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
val p = Source(1 to 5).mapAsyncUnordered(4, n val p = Source(1 to 5).mapAsyncUnordered(4)(n
if (n == 3) throw new RuntimeException("err2") with NoStackTrace if (n == 3) throw new RuntimeException("err2") with NoStackTrace
else { else {
Future { Future {
@ -111,7 +111,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
"resume after future failure" in { "resume after future failure" in {
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
Source(1 to 5) Source(1 to 5)
.mapAsyncUnordered(4, n Future { .mapAsyncUnordered(4)(n Future {
if (n == 3) throw new RuntimeException("err3") with NoStackTrace if (n == 3) throw new RuntimeException("err3") with NoStackTrace
else n else n
}) })
@ -124,7 +124,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
"finish after future failure" in assertAllStagesStopped { "finish after future failure" in assertAllStagesStopped {
import system.dispatcher import system.dispatcher
Await.result(Source(1 to 3).mapAsyncUnordered(1, n Future { Await.result(Source(1 to 3).mapAsyncUnordered(1)(n Future {
if (n == 3) throw new RuntimeException("err3b") with NoStackTrace if (n == 3) throw new RuntimeException("err3b") with NoStackTrace
else n else n
}).withAttributes(supervisionStrategy(resumingDecider)) }).withAttributes(supervisionStrategy(resumingDecider))
@ -135,7 +135,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
"resume when mapAsyncUnordered throws" in { "resume when mapAsyncUnordered throws" in {
implicit val ec = system.dispatcher implicit val ec = system.dispatcher
Source(1 to 5) Source(1 to 5)
.mapAsyncUnordered(4, n .mapAsyncUnordered(4)(n
if (n == 3) throw new RuntimeException("err4") with NoStackTrace if (n == 3) throw new RuntimeException("err4") with NoStackTrace
else Future(n)) else Future(n))
.withAttributes(supervisionStrategy(resumingDecider)) .withAttributes(supervisionStrategy(resumingDecider))
@ -147,7 +147,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
"signal NPE when future is completed with null" in { "signal NPE when future is completed with null" in {
val c = TestSubscriber.manualProbe[String]() val c = TestSubscriber.manualProbe[String]()
val p = Source(List("a", "b")).mapAsyncUnordered(4, elem Future.successful(null)).to(Sink(c)).run() val p = Source(List("a", "b")).mapAsyncUnordered(4)(elem Future.successful(null)).to(Sink(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
sub.request(10) sub.request(10)
c.expectError.getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg) c.expectError.getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg)
@ -156,7 +156,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
"resume when future is completed with null" in { "resume when future is completed with null" in {
val c = TestSubscriber.manualProbe[String]() val c = TestSubscriber.manualProbe[String]()
val p = Source(List("a", "b", "c")) val p = Source(List("a", "b", "c"))
.mapAsyncUnordered(4, elem if (elem == "b") Future.successful(null) else Future.successful(elem)) .mapAsyncUnordered(4)(elem if (elem == "b") Future.successful(null) else Future.successful(elem))
.withAttributes(supervisionStrategy(resumingDecider)) .withAttributes(supervisionStrategy(resumingDecider))
.to(Sink(c)).run() .to(Sink(c)).run()
val sub = c.expectSubscription() val sub = c.expectSubscription()
@ -169,7 +169,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
val pub = TestPublisher.manualProbe[Int]() val pub = TestPublisher.manualProbe[Int]()
val sub = TestSubscriber.manualProbe[Int]() val sub = TestSubscriber.manualProbe[Int]()
Source(pub).mapAsyncUnordered(4, Future.successful).runWith(Sink(sub)) Source(pub).mapAsyncUnordered(4)(Future.successful).runWith(Sink(sub))
val upstream = pub.expectSubscription() val upstream = pub.expectSubscription()
upstream.expectRequest() upstream.expectRequest()

View file

@ -28,7 +28,7 @@ class GraphMatValueSpec extends AkkaSpec {
val f = FlowGraph.closed(foldSink) { implicit b val f = FlowGraph.closed(foldSink) { implicit b
fold fold
Source(1 to 10) ~> fold Source(1 to 10) ~> fold
b.matValue.mapAsync(4, identity) ~> Sink(sub) b.matValue.mapAsync(4)(identity) ~> Sink(sub)
}.run() }.run()
val r1 = Await.result(f, 3.seconds) val r1 = Await.result(f, 3.seconds)
@ -45,8 +45,8 @@ class GraphMatValueSpec extends AkkaSpec {
fold fold
val zip = b.add(ZipWith[Int, Int, Int](_ + _)) val zip = b.add(ZipWith[Int, Int, Int](_ + _))
Source(1 to 10) ~> fold Source(1 to 10) ~> fold
b.matValue.mapAsync(4, identity) ~> zip.in0 b.matValue.mapAsync(4)(identity) ~> zip.in0
b.matValue.mapAsync(4, identity) ~> zip.in1 b.matValue.mapAsync(4)(identity) ~> zip.in1
zip.out ~> Sink(sub) zip.out ~> Sink(sub)
}.run() }.run()
@ -66,13 +66,13 @@ class GraphMatValueSpec extends AkkaSpec {
} }
"allow exposing the materialized value as port" in { "allow exposing the materialized value as port" in {
val (f1, f2) = foldFeedbackSource.mapAsync(4, identity).map(_ + 100).toMat(Sink.head)(Keep.both).run() val (f1, f2) = foldFeedbackSource.mapAsync(4)(identity).map(_ + 100).toMat(Sink.head)(Keep.both).run()
Await.result(f1, 3.seconds) should ===(55) Await.result(f1, 3.seconds) should ===(55)
Await.result(f2, 3.seconds) should ===(155) Await.result(f2, 3.seconds) should ===(155)
} }
"allow exposing the materialized value as port even if wrapped and the final materialized value is Unit" in { "allow exposing the materialized value as port even if wrapped and the final materialized value is Unit" in {
val noMatSource: Source[Int, Unit] = foldFeedbackSource.mapAsync(4, identity).map(_ + 100).mapMaterialized((_) ()) val noMatSource: Source[Int, Unit] = foldFeedbackSource.mapAsync(4)(identity).map(_ + 100).mapMaterialized((_) ())
Await.result(noMatSource.runWith(Sink.head), 3.seconds) should ===(155) Await.result(noMatSource.runWith(Sink.head), 3.seconds) should ===(155)
} }
@ -81,8 +81,8 @@ class GraphMatValueSpec extends AkkaSpec {
(s1, s2) (s1, s2)
val zip = b.add(ZipWith[Int, Int, Int](_ + _)) val zip = b.add(ZipWith[Int, Int, Int](_ + _))
s1.outlet.mapAsync(4, identity) ~> zip.in0 s1.outlet.mapAsync(4)(identity) ~> zip.in0
s2.outlet.mapAsync(4, identity).map(_ * 100) ~> zip.in1 s2.outlet.mapAsync(4)(identity).map(_ * 100) ~> zip.in1
zip.out zip.out
} }

View file

@ -216,7 +216,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* @see [[#mapAsyncUnordered]] * @see [[#mapAsyncUnordered]]
*/ */
def mapAsync[T](parallelism: Int, f: function.Function[Out, Future[T]]): javadsl.Flow[In, T, Mat] = def mapAsync[T](parallelism: Int, f: function.Function[Out, Future[T]]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.mapAsync(parallelism, f.apply)) new Flow(delegate.mapAsync(parallelism)(f.apply))
/** /**
* Transform this stream by applying the given function to each of the elements * Transform this stream by applying the given function to each of the elements
@ -245,7 +245,7 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
* @see [[#mapAsync]] * @see [[#mapAsync]]
*/ */
def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, Future[T]]): javadsl.Flow[In, T, Mat] = def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, Future[T]]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.mapAsyncUnordered(parallelism, f.apply)) new Flow(delegate.mapAsyncUnordered(parallelism)(f.apply))
/** /**
* Only pass on those elements that satisfy the given predicate. * Only pass on those elements that satisfy the given predicate.

View file

@ -328,7 +328,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
* @see [[#mapAsyncUnordered]] * @see [[#mapAsyncUnordered]]
*/ */
def mapAsync[T](parallelism: Int, f: function.Function[Out, Future[T]]): javadsl.Source[T, Mat] = def mapAsync[T](parallelism: Int, f: function.Function[Out, Future[T]]): javadsl.Source[T, Mat] =
new Source(delegate.mapAsync(parallelism, f.apply)) new Source(delegate.mapAsync(parallelism)(f.apply))
/** /**
* Transform this stream by applying the given function to each of the elements * Transform this stream by applying the given function to each of the elements
@ -341,7 +341,7 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
* @see [[#mapAsync]] * @see [[#mapAsync]]
*/ */
def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, Future[T]]): javadsl.Source[T, Mat] = def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, Future[T]]): javadsl.Source[T, Mat] =
new Source(delegate.mapAsyncUnordered(parallelism, f.apply)) new Source(delegate.mapAsyncUnordered(parallelism)(f.apply))
/** /**
* Only pass on those elements that satisfy the given predicate. * Only pass on those elements that satisfy the given predicate.

View file

@ -407,7 +407,7 @@ trait FlowOps[+Out, +Mat] {
* *
* @see [[#mapAsyncUnordered]] * @see [[#mapAsyncUnordered]]
*/ */
def mapAsync[T](parallelism: Int, f: Out Future[T]): Repr[T, Mat] = def mapAsync[T](parallelism: Int)(f: Out Future[T]): Repr[T, Mat] =
andThen(MapAsync(parallelism, f.asInstanceOf[Any Future[Any]])) andThen(MapAsync(parallelism, f.asInstanceOf[Any Future[Any]]))
/** /**
@ -436,7 +436,7 @@ trait FlowOps[+Out, +Mat] {
* *
* @see [[#mapAsync]] * @see [[#mapAsync]]
*/ */
def mapAsyncUnordered[T](parallelism: Int, f: Out Future[T]): Repr[T, Mat] = def mapAsyncUnordered[T](parallelism: Int)(f: Out Future[T]): Repr[T, Mat] =
andThen(MapAsyncUnordered(parallelism, f.asInstanceOf[Any Future[Any]])) andThen(MapAsyncUnordered(parallelism, f.asInstanceOf[Any Future[Any]]))
/** /**