!str #17090 add AsyncStage

This commit is contained in:
Roland Kuhn 2015-04-09 22:28:16 +02:00
parent ad3829cd74
commit 4c623fade7
66 changed files with 981 additions and 787 deletions

View file

@ -81,7 +81,7 @@ object BidiFlowDocSpec {
if (stash.isEmpty) ctx.finish()
else ctx.absorbTermination() // we still have bytes to emit
private def run(ctx: Context[ByteString]): Directive =
private def run(ctx: Context[ByteString]): SyncDirective =
if (needed == -1) {
// are we at a boundary? then figure out next length
if (stash.length < 4) pullOrFinish(ctx)

View file

@ -225,7 +225,7 @@ class FlowGraphDocSpec extends AkkaSpec {
val foldFlow: Flow[Int, Int, Future[Int]] = Flow(Sink.fold[Int, Int](0)(_ + _)) {
implicit builder
fold
(fold.inlet, builder.matValue.mapAsync(identity).outlet)
(fold.inlet, builder.matValue.mapAsync(4, identity).outlet)
}
//#flow-graph-matvalue
@ -242,8 +242,8 @@ class FlowGraphDocSpec extends AkkaSpec {
// fold completes
// As a result this Source will never emit anything, and its materialited
// Future will never complete
builder.matValue.mapAsync(identity) ~> fold
builder.matValue.mapAsync(identity).outlet
builder.matValue.mapAsync(4, identity) ~> fold
builder.matValue.mapAsync(4, identity).outlet
}
//#flow-graph-matvalue-cycle
}

View file

@ -23,21 +23,21 @@ class FlowStagesSpec extends AkkaSpec with ScalaFutures {
//#one-to-one
class Map[A, B](f: A => B) extends PushPullStage[A, B] {
override def onPush(elem: A, ctx: Context[B]): Directive =
override def onPush(elem: A, ctx: Context[B]): SyncDirective =
ctx.push(f(elem))
override def onPull(ctx: Context[B]): Directive =
override def onPull(ctx: Context[B]): SyncDirective =
ctx.pull()
}
//#one-to-one
//#many-to-one
class Filter[A](p: A => Boolean) extends PushPullStage[A, A] {
override def onPush(elem: A, ctx: Context[A]): Directive =
override def onPush(elem: A, ctx: Context[A]): SyncDirective =
if (p(elem)) ctx.push(elem)
else ctx.pull()
override def onPull(ctx: Context[A]): Directive =
override def onPull(ctx: Context[A]): SyncDirective =
ctx.pull()
}
//#many-to-one
@ -47,13 +47,13 @@ class FlowStagesSpec extends AkkaSpec with ScalaFutures {
private var lastElem: A = _
private var oneLeft = false
override def onPush(elem: A, ctx: Context[A]): Directive = {
override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
lastElem = elem
oneLeft = true
ctx.push(elem)
}
override def onPull(ctx: Context[A]): Directive =
override def onPull(ctx: Context[A]): SyncDirective =
if (!ctx.isFinishing) {
// the main pulling logic is below as it is demonstrated on the illustration
if (oneLeft) {
@ -95,12 +95,12 @@ class FlowStagesSpec extends AkkaSpec with ScalaFutures {
//#pushstage
class Map[A, B](f: A => B) extends PushStage[A, B] {
override def onPush(elem: A, ctx: Context[B]): Directive =
override def onPush(elem: A, ctx: Context[B]): SyncDirective =
ctx.push(f(elem))
}
class Filter[A](p: A => Boolean) extends PushStage[A, A] {
override def onPush(elem: A, ctx: Context[A]): Directive =
override def onPush(elem: A, ctx: Context[A]): SyncDirective =
if (p(elem)) ctx.push(elem)
else ctx.pull()
}
@ -112,7 +112,7 @@ class FlowStagesSpec extends AkkaSpec with ScalaFutures {
//#doubler-stateful
class Duplicator[A]() extends StatefulStage[A, A] {
override def initial: StageState[A, A] = new StageState[A, A] {
override def onPush(elem: A, ctx: Context[A]): Directive =
override def onPush(elem: A, ctx: Context[A]): SyncDirective =
emit(List(elem, elem).iterator, ctx)
}
}

View file

@ -142,17 +142,17 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
//#email-addresses-mapAsync
val emailAddresses: Source[String, Unit] =
authors
.mapAsync(author => addressSystem.lookupEmail(author.handle))
.mapAsync(4, author => addressSystem.lookupEmail(author.handle))
.collect { case Some(emailAddress) => emailAddress }
//#email-addresses-mapAsync
//#send-emails
val sendEmails: RunnableFlow[Unit] =
emailAddresses
.mapAsync { address =>
.mapAsync(4, address => {
emailServer.send(
Email(to = address, title = "Akka", body = "I like your tweet"))
}
})
.to(Sink.ignore)
sendEmails.run()
@ -178,7 +178,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val emailAddresses: Source[String, Unit] =
authors.section(supervisionStrategy(resumingDecider)) {
_.mapAsync(author => addressSystem.lookupEmail(author.handle))
_.mapAsync(4, author => addressSystem.lookupEmail(author.handle))
}
//#email-addresses-mapAsync-supervision
}
@ -194,15 +194,15 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val emailAddresses: Source[String, Unit] =
authors
.mapAsyncUnordered(author => addressSystem.lookupEmail(author.handle))
.mapAsyncUnordered(4, author => addressSystem.lookupEmail(author.handle))
.collect { case Some(emailAddress) => emailAddress }
val sendEmails: RunnableFlow[Unit] =
emailAddresses
.mapAsyncUnordered { address =>
.mapAsyncUnordered(4, address => {
emailServer.send(
Email(to = address, title = "Akka", body = "I like your tweet"))
}
})
.to(Sink.ignore)
sendEmails.run()
@ -226,7 +226,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val authors = tweets.filter(_.hashtags.contains(akka)).map(_.author)
val phoneNumbers =
authors.mapAsync(author => addressSystem.lookupPhoneNumber(author.handle))
authors.mapAsync(4, author => addressSystem.lookupPhoneNumber(author.handle))
.collect { case Some(phoneNo) => phoneNo }
//#blocking-mapAsync
@ -234,12 +234,12 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val sendTextMessages: RunnableFlow[Unit] =
phoneNumbers
.mapAsync { phoneNo =>
.mapAsync(4, phoneNo => {
Future {
smsServer.send(
TextMessage(to = phoneNo, body = "I like your tweet"))
}(blockingExecutionContext)
}
})
.to(Sink.ignore)
sendTextMessages.run()
@ -263,7 +263,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val authors = tweets.filter(_.hashtags.contains(akka)).map(_.author)
val phoneNumbers =
authors.mapAsync(author => addressSystem.lookupPhoneNumber(author.handle))
authors.mapAsync(4, author => addressSystem.lookupPhoneNumber(author.handle))
.collect { case Some(phoneNo) => phoneNo }
//#blocking-map
@ -299,7 +299,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
implicit val timeout = Timeout(3.seconds)
val saveTweets: RunnableFlow[Unit] =
akkaTweets
.mapAsync(tweet => database ? Save(tweet))
.mapAsync(4, tweet => database ? Save(tweet))
.to(Sink.ignore)
//#save-tweets
@ -330,7 +330,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
.map(elem => { println(s"before: $elem"); elem })
.mapAsync(service.convert)
.mapAsync(4, service.convert)
.runForeach(elem => println(s"after: $elem"))
//#sometimes-slow-mapAsync
@ -362,7 +362,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
.map(elem => { println(s"before: $elem"); elem })
.mapAsyncUnordered(service.convert)
.mapAsyncUnordered(4, service.convert)
.runForeach(elem => println(s"after: $elem"))
//#sometimes-slow-mapAsyncUnordered

View file

@ -8,7 +8,7 @@ import java.util.concurrent.atomic.AtomicReference
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{ PushStage, Directive, Context }
import akka.stream.stage.{ PushStage, SyncDirective, Context }
import akka.stream.testkit.AkkaSpec
import akka.testkit.TestProbe
import akka.util.ByteString
@ -89,7 +89,7 @@ class StreamTcpDocSpec extends AkkaSpec {
// server logic, parses incoming commands
val commandParser = new PushStage[String, String] {
override def onPush(elem: String, ctx: Context[String]): Directive = {
override def onPush(elem: String, ctx: Context[String]): SyncDirective = {
elem match {
case "BYE" ctx.finish()
case _ ctx.push(elem + "!")
@ -136,7 +136,7 @@ class StreamTcpDocSpec extends AkkaSpec {
val connection = StreamTcp().outgoingConnection(localhost)
val replParser = new PushStage[String, ByteString] {
override def onPush(elem: String, ctx: Context[ByteString]): Directive = {
override def onPush(elem: String, ctx: Context[ByteString]): SyncDirective = {
elem match {
case "q" ctx.pushAndFinish(ByteString("BYE\n"))
case _ ctx.push(ByteString(s"$elem\n"))

View file

@ -20,14 +20,14 @@ class RecipeByteStrings extends RecipeSpec {
class Chunker(val chunkSize: Int) extends PushPullStage[ByteString, ByteString] {
private var buffer = ByteString.empty
override def onPush(elem: ByteString, ctx: Context[ByteString]): Directive = {
override def onPush(elem: ByteString, ctx: Context[ByteString]): SyncDirective = {
buffer ++= elem
emitChunkOrPull(ctx)
}
override def onPull(ctx: Context[ByteString]): Directive = emitChunkOrPull(ctx)
override def onPull(ctx: Context[ByteString]): SyncDirective = emitChunkOrPull(ctx)
private def emitChunkOrPull(ctx: Context[ByteString]): Directive = {
private def emitChunkOrPull(ctx: Context[ByteString]): SyncDirective = {
if (buffer.isEmpty) ctx.pull()
else {
val (emit, nextBuffer) = buffer.splitAt(chunkSize)
@ -57,7 +57,7 @@ class RecipeByteStrings extends RecipeSpec {
class ByteLimiter(val maximumBytes: Long) extends PushStage[ByteString, ByteString] {
private var count = 0
override def onPush(chunk: ByteString, ctx: Context[ByteString]): Directive = {
override def onPush(chunk: ByteString, ctx: Context[ByteString]): SyncDirective = {
count += chunk.size
if (count > maximumBytes) ctx.fail(new IllegalStateException("Too much bytes"))
else ctx.push(chunk)

View file

@ -23,12 +23,12 @@ class RecipeDigest extends RecipeSpec {
def digestCalculator(algorithm: String) = new PushPullStage[ByteString, ByteString] {
val digest = MessageDigest.getInstance(algorithm)
override def onPush(chunk: ByteString, ctx: Context[ByteString]): Directive = {
override def onPush(chunk: ByteString, ctx: Context[ByteString]): SyncDirective = {
digest.update(chunk.toArray)
ctx.pull()
}
override def onPull(ctx: Context[ByteString]): Directive = {
override def onPull(ctx: Context[ByteString]): SyncDirective = {
if (ctx.isFinishing) ctx.pushAndFinish(ByteString(digest.digest()))
else ctx.pull()
}

View file

@ -79,12 +79,12 @@ class RecipeGlobalRateLimit extends RecipeSpec {
def limitGlobal[T](limiter: ActorRef, maxAllowedWait: FiniteDuration): Flow[T, T, Unit] = {
import akka.pattern.ask
import akka.util.Timeout
Flow[T].mapAsync { (element: T) =>
Flow[T].mapAsync(4, (element: T) => {
import system.dispatcher
implicit val triggerTimeout = Timeout(maxAllowedWait)
val limiterTriggerFuture = limiter ? Limiter.WantToPass
limiterTriggerFuture.map((_) => element)
}
})
}
//#global-limiter-flow

View file

@ -33,12 +33,12 @@ object HoldOps {
override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective = {
currentValue = elem
waitingFirstValue = false
if (ctx.isHolding) ctx.pushAndPull(currentValue)
if (ctx.isHoldingDownstream) ctx.pushAndPull(currentValue)
else ctx.pull()
}
override def onPull(ctx: DetachedContext[T]): DownstreamDirective = {
if (waitingFirstValue) ctx.hold()
if (waitingFirstValue) ctx.holdDownstream()
else ctx.push(currentValue)
}

View file

@ -30,7 +30,7 @@ class RecipeLoggingElements extends RecipeSpec {
class LoggingStage[T] extends PushStage[T, T] {
private val log = Logging(system, "loggingName")
override def onPush(elem: T, ctx: Context[T]): Directive = {
override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
log.debug("Element flowing through: {}", elem)
ctx.push(elem)
}

View file

@ -44,7 +44,7 @@ class RecipeMultiGroupBy extends RecipeSpec {
val result = multiGroups.map {
case (topic, topicMessages) => topicMessages.grouped(10).map(topic.name + _.mkString("[", ", ", "]")).runWith(Sink.head)
}.mapAsync(identity).grouped(10).runWith(Sink.head)
}.mapAsync(4, identity).grouped(10).runWith(Sink.head)
Await.result(result, 3.seconds).toSet should be(Set(
"1[1: a, 1: b, all: c, all: d, 1: e]",

View file

@ -48,7 +48,7 @@ object RecipeParseLines {
private var nextPossibleMatch = 0
def initial = new State {
override def onPush(chunk: ByteString, ctx: Context[String]): Directive = {
override def onPush(chunk: ByteString, ctx: Context[String]): SyncDirective = {
buffer ++= chunk
if (buffer.size > maximumLineBytes)
ctx.fail(new IllegalStateException(s"Read ${buffer.size} bytes " +

View file

@ -32,7 +32,7 @@ class RecipeReduceByKey extends RecipeSpec {
val counts: Source[(String, Int), Unit] =
countedWords
.buffer(MaximumDistinctWords, OverflowStrategy.fail)
.mapAsync(identity)
.mapAsync(4, identity)
//#word-count
Await.result(counts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set(
@ -62,7 +62,7 @@ class RecipeReduceByKey extends RecipeSpec {
}
}
reducedValues.buffer(maximumGroupSize, OverflowStrategy.fail).mapAsync(identity)
reducedValues.buffer(maximumGroupSize, OverflowStrategy.fail).mapAsync(4, identity)
}
val wordCounts = words.via(reduceByKey(

View file

@ -124,9 +124,10 @@ Finally, sending the emails:
``mapAsync`` is applying the given function that is calling out to the external service to
each of the elements as they pass through this processing step. The function returns a :class:`Future`
and the value of that future will be emitted downstreams. As many futures as requested elements by
downstream may run in parallel and may complete in any order, but the elements that
are emitted downstream are in the same order as received from upstream.
and the value of that future will be emitted downstreams. The number of Futures
that shall run in parallel is given as the first argument to ``mapAsync``.
These Futures may complete in any order, but the elements that are emitted
downstream are in the same order as received from upstream.
That means that back-pressure works as expected. For example if the ``emailServer.send``
is the bottleneck it will limit the rate at which incoming tweets are retrieved and