Merge pull request #17105 from akka/wip-async-stage-∂π

add async stage
Roland Kuhn 2015-04-10 10:52:01 +02:00
commit 8f47b6dfcc
66 changed files with 981 additions and 787 deletions
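
For orientation: the new AsyncStage allows a stage to suspend its upstream or
downstream side while waiting for an external event and to re-enter the stage
safely through a callback. The following is a condensed sketch of that contract,
distilled from the MapAsyncOne helper this commit adds to FlowMapAsyncSpec
(visible further down in this diff); it illustrates the shape of the API, not a
shipped operator:

import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }
import akka.stream.stage.{ AsyncContext, AsyncStage }

// Applies f to one element at a time, holding upstream while the Future runs.
class MapAsyncOne[In, Out](f: In ⇒ Future[Out])(implicit ec: ExecutionContext)
  extends AsyncStage[In, Out, Try[Out]] {
  private var elemInFlight: Out = _

  override def onPush(elem: In, ctx: AsyncContext[Out, Try[Out]]) = {
    f(elem).onComplete(ctx.getAsyncCallback().invoke) // re-enters via onAsyncInput
    ctx.holdUpstream()                                // no new element until this one is done
  }

  override def onPull(ctx: AsyncContext[Out, Try[Out]]) =
    if (elemInFlight != null) {
      val e = elemInFlight
      elemInFlight = null.asInstanceOf[Out]
      pushIt(e, ctx)
    } else ctx.holdDownstream()                       // downstream is now waiting

  override def onAsyncInput(input: Try[Out], ctx: AsyncContext[Out, Try[Out]]) =
    input match {
      case Failure(ex)                           ⇒ ctx.fail(ex)
      case Success(e) if ctx.isHoldingDownstream ⇒ pushIt(e, ctx)
      case Success(e)                            ⇒ { elemInFlight = e; ctx.ignore() }
    }

  override def onUpstreamFinish(ctx: AsyncContext[Out, Try[Out]]) =
    if (ctx.isHoldingUpstream) ctx.absorbTermination() // an element is still in flight
    else ctx.finish()

  private def pushIt(elem: Out, ctx: AsyncContext[Out, Try[Out]]) =
    if (ctx.isFinishing) ctx.pushAndFinish(elem)       // upstream already completed
    else ctx.pushAndPull(elem)
}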

View file

@ -130,9 +130,10 @@ Finally, sending the emails:
``mapAsync`` applies the given function, which calls out to the external service, to
each of the elements as they pass through this processing step. The function returns a :class:`Future`
and the value of that future will be emitted downstream. As many futures as requested elements by
downstream may run in parallel and may complete in any order, but the elements that
are emitted downstream are in the same order as received from upstream.
and the value of that future will be emitted downstream. The number of Futures
that shall run in parallel is given as the first argument to ``mapAsync``.
These Futures may complete in any order, but the elements that are emitted
downstream are in the same order as received from upstream.
That means that back-pressure works as expected. For example, if ``emailServer.send``
is the bottleneck it will limit the rate at which incoming tweets are retrieved and
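
A minimal runnable sketch of the changed signature, with the parallelism factor
as the new first argument (the ActorSystem setup mirrors what this commit uses
elsewhere; the ``lookup`` service is an assumed stand-in, not part of the docs):

import scala.concurrent.Future
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.Source

implicit val system = ActorSystem("mapAsync-sketch")
implicit val materializer = ActorFlowMaterializer()
import system.dispatcher

def lookup(n: Int): Future[Int] = Future(n * 2) // assumed external service call

Source(1 to 100)
  .mapAsync(4, lookup)   // at most 4 Futures in flight at any time
  .runForeach(println)   // emitted in upstream order despite the concurrency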

View file

@ -81,7 +81,7 @@ object BidiFlowDocSpec {
if (stash.isEmpty) ctx.finish()
else ctx.absorbTermination() // we still have bytes to emit
private def run(ctx: Context[ByteString]): Directive =
private def run(ctx: Context[ByteString]): SyncDirective =
if (needed == -1) {
// are we at a boundary? then figure out next length
if (stash.length < 4) pullOrFinish(ctx)

View file

@ -225,7 +225,7 @@ class FlowGraphDocSpec extends AkkaSpec {
val foldFlow: Flow[Int, Int, Future[Int]] = Flow(Sink.fold[Int, Int](0)(_ + _)) {
implicit builder ⇒
fold ⇒
(fold.inlet, builder.matValue.mapAsync(identity).outlet)
(fold.inlet, builder.matValue.mapAsync(4, identity).outlet)
}
//#flow-graph-matvalue
@ -242,8 +242,8 @@ class FlowGraphDocSpec extends AkkaSpec {
// fold completes
// As a result this Source will never emit anything, and its materialized
// Future will never complete
builder.matValue.mapAsync(identity) ~> fold
builder.matValue.mapAsync(identity).outlet
builder.matValue.mapAsync(4, identity) ~> fold
builder.matValue.mapAsync(4, identity).outlet
}
//#flow-graph-matvalue-cycle
}

View file

@ -23,21 +23,21 @@ class FlowStagesSpec extends AkkaSpec with ScalaFutures {
//#one-to-one
class Map[A, B](f: A => B) extends PushPullStage[A, B] {
override def onPush(elem: A, ctx: Context[B]): Directive =
override def onPush(elem: A, ctx: Context[B]): SyncDirective =
ctx.push(f(elem))
override def onPull(ctx: Context[B]): Directive =
override def onPull(ctx: Context[B]): SyncDirective =
ctx.pull()
}
//#one-to-one
//#many-to-one
class Filter[A](p: A => Boolean) extends PushPullStage[A, A] {
override def onPush(elem: A, ctx: Context[A]): Directive =
override def onPush(elem: A, ctx: Context[A]): SyncDirective =
if (p(elem)) ctx.push(elem)
else ctx.pull()
override def onPull(ctx: Context[A]): Directive =
override def onPull(ctx: Context[A]): SyncDirective =
ctx.pull()
}
//#many-to-one
@ -47,13 +47,13 @@ class FlowStagesSpec extends AkkaSpec with ScalaFutures {
private var lastElem: A = _
private var oneLeft = false
override def onPush(elem: A, ctx: Context[A]): Directive = {
override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
lastElem = elem
oneLeft = true
ctx.push(elem)
}
override def onPull(ctx: Context[A]): Directive =
override def onPull(ctx: Context[A]): SyncDirective =
if (!ctx.isFinishing) {
// the main pulling logic is below as it is demonstrated on the illustration
if (oneLeft) {
@ -95,12 +95,12 @@ class FlowStagesSpec extends AkkaSpec with ScalaFutures {
//#pushstage
class Map[A, B](f: A => B) extends PushStage[A, B] {
override def onPush(elem: A, ctx: Context[B]): Directive =
override def onPush(elem: A, ctx: Context[B]): SyncDirective =
ctx.push(f(elem))
}
class Filter[A](p: A => Boolean) extends PushStage[A, A] {
override def onPush(elem: A, ctx: Context[A]): Directive =
override def onPush(elem: A, ctx: Context[A]): SyncDirective =
if (p(elem)) ctx.push(elem)
else ctx.pull()
}
@ -112,7 +112,7 @@ class FlowStagesSpec extends AkkaSpec with ScalaFutures {
//#doubler-stateful
class Duplicator[A]() extends StatefulStage[A, A] {
override def initial: StageState[A, A] = new StageState[A, A] {
override def onPush(elem: A, ctx: Context[A]): Directive =
override def onPush(elem: A, ctx: Context[A]): SyncDirective =
emit(List(elem, elem).iterator, ctx)
}
}

View file

@ -142,17 +142,17 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
//#email-addresses-mapAsync
val emailAddresses: Source[String, Unit] =
authors
.mapAsync(author => addressSystem.lookupEmail(author.handle))
.mapAsync(4, author => addressSystem.lookupEmail(author.handle))
.collect { case Some(emailAddress) => emailAddress }
//#email-addresses-mapAsync
//#send-emails
val sendEmails: RunnableFlow[Unit] =
emailAddresses
.mapAsync { address =>
.mapAsync(4, address => {
emailServer.send(
Email(to = address, title = "Akka", body = "I like your tweet"))
}
})
.to(Sink.ignore)
sendEmails.run()
@ -178,7 +178,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val emailAddresses: Source[String, Unit] =
authors.section(supervisionStrategy(resumingDecider)) {
_.mapAsync(author => addressSystem.lookupEmail(author.handle))
_.mapAsync(4, author => addressSystem.lookupEmail(author.handle))
}
//#email-addresses-mapAsync-supervision
}
@ -194,15 +194,15 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val emailAddresses: Source[String, Unit] =
authors
.mapAsyncUnordered(author => addressSystem.lookupEmail(author.handle))
.mapAsyncUnordered(4, author => addressSystem.lookupEmail(author.handle))
.collect { case Some(emailAddress) => emailAddress }
val sendEmails: RunnableFlow[Unit] =
emailAddresses
.mapAsyncUnordered { address =>
.mapAsyncUnordered(4, address => {
emailServer.send(
Email(to = address, title = "Akka", body = "I like your tweet"))
}
})
.to(Sink.ignore)
sendEmails.run()
@ -226,7 +226,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val authors = tweets.filter(_.hashtags.contains(akka)).map(_.author)
val phoneNumbers =
authors.mapAsync(author => addressSystem.lookupPhoneNumber(author.handle))
authors.mapAsync(4, author => addressSystem.lookupPhoneNumber(author.handle))
.collect { case Some(phoneNo) => phoneNo }
//#blocking-mapAsync
@ -234,12 +234,12 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val sendTextMessages: RunnableFlow[Unit] =
phoneNumbers
.mapAsync { phoneNo =>
.mapAsync(4, phoneNo => {
Future {
smsServer.send(
TextMessage(to = phoneNo, body = "I like your tweet"))
}(blockingExecutionContext)
}
})
.to(Sink.ignore)
sendTextMessages.run()
@ -263,7 +263,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val authors = tweets.filter(_.hashtags.contains(akka)).map(_.author)
val phoneNumbers =
authors.mapAsync(author => addressSystem.lookupPhoneNumber(author.handle))
authors.mapAsync(4, author => addressSystem.lookupPhoneNumber(author.handle))
.collect { case Some(phoneNo) => phoneNo }
//#blocking-map
@ -299,7 +299,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
implicit val timeout = Timeout(3.seconds)
val saveTweets: RunnableFlow[Unit] =
akkaTweets
.mapAsync(tweet => database ? Save(tweet))
.mapAsync(4, tweet => database ? Save(tweet))
.to(Sink.ignore)
//#save-tweets
@ -330,7 +330,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
.map(elem => { println(s"before: $elem"); elem })
.mapAsync(service.convert)
.mapAsync(4, service.convert)
.runForeach(elem => println(s"after: $elem"))
//#sometimes-slow-mapAsync
@ -362,7 +362,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
.map(elem => { println(s"before: $elem"); elem })
.mapAsyncUnordered(service.convert)
.mapAsyncUnordered(4, service.convert)
.runForeach(elem => println(s"after: $elem"))
//#sometimes-slow-mapAsyncUnordered

View file

@ -8,7 +8,7 @@ import java.util.concurrent.atomic.AtomicReference
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.stage.{ PushStage, Directive, Context }
import akka.stream.stage.{ PushStage, SyncDirective, Context }
import akka.stream.testkit.AkkaSpec
import akka.testkit.TestProbe
import akka.util.ByteString
@ -67,7 +67,7 @@ class StreamTcpDocSpec extends AkkaSpec {
// server logic, parses incoming commands
val commandParser = new PushStage[String, String] {
override def onPush(elem: String, ctx: Context[String]): Directive = {
override def onPush(elem: String, ctx: Context[String]): SyncDirective = {
elem match {
case "BYE" ⇒ ctx.finish()
case _ ⇒ ctx.push(elem + "!")
@ -114,7 +114,7 @@ class StreamTcpDocSpec extends AkkaSpec {
val connection = StreamTcp().outgoingConnection(localhost)
val replParser = new PushStage[String, ByteString] {
override def onPush(elem: String, ctx: Context[ByteString]): Directive = {
override def onPush(elem: String, ctx: Context[ByteString]): SyncDirective = {
elem match {
case "q" ⇒ ctx.pushAndFinish(ByteString("BYE\n"))
case _ ⇒ ctx.push(ByteString(s"$elem\n"))

View file

@ -20,14 +20,14 @@ class RecipeByteStrings extends RecipeSpec {
class Chunker(val chunkSize: Int) extends PushPullStage[ByteString, ByteString] {
private var buffer = ByteString.empty
override def onPush(elem: ByteString, ctx: Context[ByteString]): Directive = {
override def onPush(elem: ByteString, ctx: Context[ByteString]): SyncDirective = {
buffer ++= elem
emitChunkOrPull(ctx)
}
override def onPull(ctx: Context[ByteString]): Directive = emitChunkOrPull(ctx)
override def onPull(ctx: Context[ByteString]): SyncDirective = emitChunkOrPull(ctx)
private def emitChunkOrPull(ctx: Context[ByteString]): Directive = {
private def emitChunkOrPull(ctx: Context[ByteString]): SyncDirective = {
if (buffer.isEmpty) ctx.pull()
else {
val (emit, nextBuffer) = buffer.splitAt(chunkSize)
@ -57,7 +57,7 @@ class RecipeByteStrings extends RecipeSpec {
class ByteLimiter(val maximumBytes: Long) extends PushStage[ByteString, ByteString] {
private var count = 0
override def onPush(chunk: ByteString, ctx: Context[ByteString]): Directive = {
override def onPush(chunk: ByteString, ctx: Context[ByteString]): SyncDirective = {
count += chunk.size
if (count > maximumBytes) ctx.fail(new IllegalStateException("Too much bytes"))
else ctx.push(chunk)

View file

@ -23,12 +23,12 @@ class RecipeDigest extends RecipeSpec {
def digestCalculator(algorithm: String) = new PushPullStage[ByteString, ByteString] {
val digest = MessageDigest.getInstance(algorithm)
override def onPush(chunk: ByteString, ctx: Context[ByteString]): Directive = {
override def onPush(chunk: ByteString, ctx: Context[ByteString]): SyncDirective = {
digest.update(chunk.toArray)
ctx.pull()
}
override def onPull(ctx: Context[ByteString]): Directive = {
override def onPull(ctx: Context[ByteString]): SyncDirective = {
if (ctx.isFinishing) ctx.pushAndFinish(ByteString(digest.digest()))
else ctx.pull()
}

View file

@ -79,12 +79,12 @@ class RecipeGlobalRateLimit extends RecipeSpec {
def limitGlobal[T](limiter: ActorRef, maxAllowedWait: FiniteDuration): Flow[T, T, Unit] = {
import akka.pattern.ask
import akka.util.Timeout
Flow[T].mapAsync { (element: T) =>
Flow[T].mapAsync(4, (element: T) => {
import system.dispatcher
implicit val triggerTimeout = Timeout(maxAllowedWait)
val limiterTriggerFuture = limiter ? Limiter.WantToPass
limiterTriggerFuture.map((_) => element)
}
})
}
//#global-limiter-flow

View file

@ -33,12 +33,12 @@ object HoldOps {
override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective = {
currentValue = elem
waitingFirstValue = false
if (ctx.isHolding) ctx.pushAndPull(currentValue)
if (ctx.isHoldingDownstream) ctx.pushAndPull(currentValue)
else ctx.pull()
}
override def onPull(ctx: DetachedContext[T]): DownstreamDirective = {
if (waitingFirstValue) ctx.hold()
if (waitingFirstValue) ctx.holdDownstream()
else ctx.push(currentValue)
}

View file

@ -30,7 +30,7 @@ class RecipeLoggingElements extends RecipeSpec {
class LoggingStage[T] extends PushStage[T, T] {
private val log = Logging(system, "loggingName")
override def onPush(elem: T, ctx: Context[T]): Directive = {
override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
log.debug("Element flowing through: {}", elem)
ctx.push(elem)
}

View file

@ -44,7 +44,7 @@ class RecipeMultiGroupBy extends RecipeSpec {
val result = multiGroups.map {
case (topic, topicMessages) => topicMessages.grouped(10).map(topic.name + _.mkString("[", ", ", "]")).runWith(Sink.head)
}.mapAsync(identity).grouped(10).runWith(Sink.head)
}.mapAsync(4, identity).grouped(10).runWith(Sink.head)
Await.result(result, 3.seconds).toSet should be(Set(
"1[1: a, 1: b, all: c, all: d, 1: e]",

View file

@ -48,7 +48,7 @@ object RecipeParseLines {
private var nextPossibleMatch = 0
def initial = new State {
override def onPush(chunk: ByteString, ctx: Context[String]): Directive = {
override def onPush(chunk: ByteString, ctx: Context[String]): SyncDirective = {
buffer ++= chunk
if (buffer.size > maximumLineBytes)
ctx.fail(new IllegalStateException(s"Read ${buffer.size} bytes " +

View file

@ -32,7 +32,7 @@ class RecipeReduceByKey extends RecipeSpec {
val counts: Source[(String, Int), Unit] =
countedWords
.buffer(MaximumDistinctWords, OverflowStrategy.fail)
.mapAsync(identity)
.mapAsync(4, identity)
//#word-count
Await.result(counts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set(
@ -62,7 +62,7 @@ class RecipeReduceByKey extends RecipeSpec {
}
}
reducedValues.buffer(maximumGroupSize, OverflowStrategy.fail).mapAsync(identity)
reducedValues.buffer(maximumGroupSize, OverflowStrategy.fail).mapAsync(4, identity)
}
val wordCounts = words.via(reduceByKey(

View file

@ -124,9 +124,10 @@ Finally, sending the emails:
``mapAsync`` applies the given function, which calls out to the external service, to
each of the elements as they pass through this processing step. The function returns a :class:`Future`
and the value of that future will be emitted downstream. As many futures as requested elements by
downstream may run in parallel and may complete in any order, but the elements that
are emitted downstream are in the same order as received from upstream.
and the value of that future will be emitted downstream. The number of Futures
that shall run in parallel is given as the first argument to ``mapAsync``.
These Futures may complete in any order, but the elements that are emitted
downstream are in the same order as received from upstream.
That means that back-pressure works as expected. For example, if ``emailServer.send``
is the bottleneck it will limit the rate at which incoming tweets are retrieved and
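
For contrast, ``mapAsyncUnordered`` (also changed in this commit) takes the same
parallelism argument but emits results as soon as their Futures complete; a short
sketch reusing the assumed ``lookup`` service from the earlier example:

Source(1 to 100)
  .mapAsyncUnordered(4, lookup) // same cap of 4 Futures in flight,
  .runForeach(println)          // but emission order follows completion order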

View file

@ -96,7 +96,7 @@ class HttpExt(config: Config)(implicit system: ActorSystem) extends akka.actor.E
options: immutable.Traversable[Inet.SocketOption] = Nil,
settings: Option[ServerSettings] = None,
log: LoggingAdapter = system.log)(implicit fm: FlowMaterializer): Future[ServerBinding] =
bindAndHandle(Flow[HttpRequest].mapAsync(handler), interface, port, backlog, options, settings, log)
bindAndHandle(Flow[HttpRequest].mapAsync(1, handler), interface, port, backlog, options, settings, log)
/**
* Transforms a given HTTP-level server [[Flow]] into a lower-level TCP transport flow.
@ -215,7 +215,7 @@ object Http extends ExtensionId[HttpExt] with ExtensionIdProvider {
* Returns the materialization result of the underlying flow materialization.
*/
def handleWithAsyncHandler(handler: HttpRequest ⇒ Future[HttpResponse])(implicit fm: FlowMaterializer): Unit =
handleWith(Flow[HttpRequest].mapAsync(handler))
handleWith(Flow[HttpRequest].mapAsync(1, handler))
}
/**

View file

@ -253,8 +253,8 @@ private[http] object HttpClient {
def recover[A, B >: A](pf: PartialFunction[Throwable, B]): () ⇒ PushPullStage[A, B] = {
val stage = new PushPullStage[A, B] {
var recovery: Option[B] = None
def onPush(elem: A, ctx: Context[B]): Directive = ctx.push(elem)
def onPull(ctx: Context[B]): Directive = recovery match {
def onPush(elem: A, ctx: Context[B]): SyncDirective = ctx.push(elem)
def onPull(ctx: Context[B]): SyncDirective = recovery match {
case None ⇒ ctx.pull()
case Some(x) ⇒ { recovery = null; ctx.push(x) }
case null ⇒ ctx.finish()

View file

@ -61,7 +61,7 @@ private[http] final class BodyPartParser(defaultContentType: ContentType,
def warnOnIllegalHeader(errorInfo: ErrorInfo): Unit =
if (illegalHeaderWarnings) log.warning(errorInfo.withSummaryPrepended("Illegal multipart header").formatPretty)
override def onPush(input: ByteString, ctx: Context[Output]): Directive = {
override def onPush(input: ByteString, ctx: Context[Output]): SyncDirective = {
try state(input)
catch {
case e: ParsingException ⇒ fail(e.info)
@ -74,7 +74,7 @@ private[http] final class BodyPartParser(defaultContentType: ContentType,
else ctx.finish()
}
override def onPull(ctx: Context[Output]): Directive = {
override def onPull(ctx: Context[Output]): SyncDirective = {
if (output.nonEmpty)
ctx.push(dequeue())
else if (ctx.isFinishing) {
@ -274,4 +274,3 @@ private[http] object BodyPartParser {
illegalHeaderWarnings = true,
headerValueCacheLimit = 8)
}

View file

@ -37,7 +37,7 @@ private[http] abstract class HttpMessageParser[Output >: MessageOutput <: Parser
new PushPullStage[ByteString, Output] {
def onPush(elem: ByteString, ctx: Context[Output]) = handleParserOutput(self.onPush(elem), ctx)
def onPull(ctx: Context[Output]) = handleParserOutput(self.onPull(), ctx)
private def handleParserOutput(output: Output, ctx: Context[Output]): Directive =
private def handleParserOutput(output: Output, ctx: Context[Output]): SyncDirective =
output match {
case StreamEnd ⇒ ctx.finish()
case NeedMoreData ⇒ ctx.pull()

View file

@ -28,7 +28,7 @@ private[http] object BodyPartRenderer {
new PushPullStage[Multipart.BodyPart, Source[ChunkStreamPart, Unit]] {
var firstBoundaryRendered = false
override def onPush(bodyPart: Multipart.BodyPart, ctx: Context[Source[ChunkStreamPart, Unit]]): Directive = {
override def onPush(bodyPart: Multipart.BodyPart, ctx: Context[Source[ChunkStreamPart, Unit]]): SyncDirective = {
val r = new CustomCharsetByteStringRendering(nioCharset, partHeadersSizeHint)
def bodyPartChunks(data: Source[ByteString, Unit]): Source[ChunkStreamPart, Unit] = {
@ -51,7 +51,7 @@ private[http] object BodyPartRenderer {
ctx.push(completePartRendering())
}
override def onPull(ctx: Context[Source[ChunkStreamPart, Unit]]): Directive = {
override def onPull(ctx: Context[Source[ChunkStreamPart, Unit]]): SyncDirective = {
val finishing = ctx.isFinishing
if (finishing && firstBoundaryRendered) {
val r = new ByteStringRendering(boundary.length + 4)

View file

@ -27,7 +27,7 @@ private[http] class HttpRequestRendererFactory(userAgentHeader: Option[headers.`
final class HttpRequestRenderer extends PushStage[RequestRenderingContext, Source[ByteString, Unit]] {
override def onPush(ctx: RequestRenderingContext, opCtx: Context[Source[ByteString, Unit]]): Directive = {
override def onPush(ctx: RequestRenderingContext, opCtx: Context[Source[ByteString, Unit]]): SyncDirective = {
val r = new ByteStringRendering(requestHeaderSizeHint)
import ctx.request._

View file

@ -58,7 +58,7 @@ private[http] class HttpResponseRendererFactory(serverHeader: Option[headers.Ser
// need this for testing
private[http] def isComplete = close
override def onPush(ctx: ResponseRenderingContext, opCtx: Context[Source[ByteString, Unit]]): Directive = {
override def onPush(ctx: ResponseRenderingContext, opCtx: Context[Source[ByteString, Unit]]): SyncDirective = {
val r = new ByteStringRendering(responseHeaderSizeHint)
import ctx.response._

View file

@ -55,7 +55,7 @@ private object RenderSupport {
var lastChunkSeen = false
override def initial = new State {
override def onPush(chunk: HttpEntity.ChunkStreamPart, ctx: Context[ByteString]): Directive = {
override def onPush(chunk: HttpEntity.ChunkStreamPart, ctx: Context[ByteString]): SyncDirective = {
if (chunk.isLastChunk)
lastChunkSeen = true
ctx.push(renderChunk(chunk))
@ -70,7 +70,7 @@ private object RenderSupport {
class CheckContentLengthTransformer(length: Long) extends PushStage[ByteString, ByteString] {
var sent = 0L
override def onPush(elem: ByteString, ctx: Context[ByteString]): Directive = {
override def onPush(elem: ByteString, ctx: Context[ByteString]): SyncDirective = {
sent += elem.length
if (sent > length)
throw InvalidContentLengthException(s"HTTP message had declared Content-Length $length but entity data stream amounts to more bytes")

View file

@ -6,6 +6,7 @@ package akka.http.util
import akka.stream.stage.{ Directive, Context, StatefulStage }
import akka.util.ByteString
import akka.stream.stage.SyncDirective
/**
* A helper class for writing parsers from ByteStrings.
@ -15,15 +16,15 @@ import akka.util.ByteString
* INTERNAL API
*/
private[akka] abstract class ByteStringParserStage[Out] extends StatefulStage[ByteString, Out] {
protected def onTruncation(ctx: Context[Out]): Directive
protected def onTruncation(ctx: Context[Out]): SyncDirective
/**
* Derive a stage from [[IntermediateState]] and then call `pull(ctx)` instead of
* `ctx.pull()` to have truncation errors reported.
*/
abstract class IntermediateState extends State {
override def onPull(ctx: Context[Out]): Directive = pull(ctx)
def pull(ctx: Context[Out]): Directive =
override def onPull(ctx: Context[Out]): SyncDirective = pull(ctx)
def pull(ctx: Context[Out]): SyncDirective =
if (ctx.isFinishing) onTruncation(ctx)
else ctx.pull()
}
@ -37,9 +38,9 @@ private[akka] abstract class ByteStringParserStage[Out] extends StatefulStage[By
* manipulate any state during reading from the ByteReader.
*/
trait ByteReadingState extends IntermediateState {
def read(reader: ByteReader, ctx: Context[Out]): Directive
def read(reader: ByteReader, ctx: Context[Out]): SyncDirective
def onPush(data: ByteString, ctx: Context[Out]): Directive =
def onPush(data: ByteString, ctx: Context[Out]): SyncDirective =
try {
val reader = new ByteReader(data)
read(reader, ctx)
@ -50,7 +51,7 @@ private[akka] abstract class ByteStringParserStage[Out] extends StatefulStage[By
}
}
case class TryAgain(previousData: ByteString, byteReadingState: ByteReadingState) extends IntermediateState {
def onPush(data: ByteString, ctx: Context[Out]): Directive = {
def onPush(data: ByteString, ctx: Context[Out]): SyncDirective = {
become(byteReadingState)
byteReadingState.onPush(previousData ++ data, ctx)
}

View file

@ -27,10 +27,10 @@ private[http] object StreamUtils {
*/
def byteStringTransformer(f: ByteString ⇒ ByteString, finish: () ⇒ ByteString): Stage[ByteString, ByteString] = {
new PushPullStage[ByteString, ByteString] {
override def onPush(element: ByteString, ctx: Context[ByteString]): Directive =
override def onPush(element: ByteString, ctx: Context[ByteString]): SyncDirective =
ctx.push(f(element))
override def onPull(ctx: Context[ByteString]): Directive =
override def onPull(ctx: Context[ByteString]): SyncDirective =
if (ctx.isFinishing) ctx.pushAndFinish(finish())
else ctx.pull()
@ -43,7 +43,7 @@ private[http] object StreamUtils {
def mapErrorTransformer(f: Throwable ⇒ Throwable): Flow[ByteString, ByteString, Unit] = {
val transformer = new PushStage[ByteString, ByteString] {
override def onPush(element: ByteString, ctx: Context[ByteString]): Directive =
override def onPush(element: ByteString, ctx: Context[ByteString]): SyncDirective =
ctx.push(element)
override def onUpstreamFailure(cause: Throwable, ctx: Context[ByteString]): TerminationDirective =
@ -59,7 +59,7 @@ private[http] object StreamUtils {
def skipping = new State {
var toSkip = start
override def onPush(element: ByteString, ctx: Context[ByteString]): Directive =
override def onPush(element: ByteString, ctx: Context[ByteString]): SyncDirective =
if (element.length < toSkip) {
// keep skipping
toSkip -= element.length
@ -74,7 +74,7 @@ private[http] object StreamUtils {
def taking(initiallyRemaining: Long) = new State {
var remaining: Long = initiallyRemaining
override def onPush(element: ByteString, ctx: Context[ByteString]): Directive = {
override def onPush(element: ByteString, ctx: Context[ByteString]): SyncDirective = {
val data = element.take(math.min(remaining, Int.MaxValue).toInt)
remaining -= data.size
if (remaining <= 0) ctx.pushAndFinish(data)
@ -92,7 +92,7 @@ private[http] object StreamUtils {
def initial = WaitingForData
case object WaitingForData extends State {
def onPush(elem: ByteString, ctx: Context[ByteString]): Directive =
def onPush(elem: ByteString, ctx: Context[ByteString]): SyncDirective =
if (elem.size <= maxBytesPerChunk) ctx.push(elem)
else {
become(DeliveringData(elem.drop(maxBytesPerChunk)))
@ -101,10 +101,10 @@ private[http] object StreamUtils {
}
case class DeliveringData(remaining: ByteString) extends State {
def onPush(elem: ByteString, ctx: Context[ByteString]): Directive =
def onPush(elem: ByteString, ctx: Context[ByteString]): SyncDirective =
throw new IllegalStateException("Not expecting data")
override def onPull(ctx: Context[ByteString]): Directive = {
override def onPull(ctx: Context[ByteString]): SyncDirective = {
val toPush = remaining.take(maxBytesPerChunk)
val toKeep = remaining.drop(maxBytesPerChunk)

View file

@ -55,7 +55,7 @@ package object util {
def printEvent[T](marker: String): Flow[T, T, Unit] =
Flow[T].transform(() ⇒ new PushStage[T, T] {
override def onPush(element: T, ctx: Context[T]): Directive = {
override def onPush(element: T, ctx: Context[T]): SyncDirective = {
println(s"$marker: $element")
ctx.push(element)
}
@ -86,7 +86,7 @@ package object util {
private[http] def errorLogger(log: LoggingAdapter, msg: String): PushStage[ByteString, ByteString] =
new PushStage[ByteString, ByteString] {
override def onPush(element: ByteString, ctx: Context[ByteString]): Directive = ctx.push(element)
override def onPush(element: ByteString, ctx: Context[ByteString]): SyncDirective = ctx.push(element)
override def onUpstreamFailure(cause: Throwable, ctx: Context[ByteString]): TerminationDirective = {
log.error(cause, msg)
super.onUpstreamFailure(cause, ctx)
@ -106,4 +106,3 @@ package object util {
} else bytes.toString + " B"
}
}

View file

@ -4,7 +4,7 @@
package akka.http.coding
import akka.stream.stage.{ Directive, Context, PushStage, Stage }
import akka.stream.stage.{ SyncDirective, Context, PushStage, Stage }
import akka.util.ByteString
import org.scalatest.WordSpec
import akka.http.model._
@ -36,7 +36,7 @@ class DecoderSpec extends WordSpec with CodecSpecSupport {
def newDecompressorStage(maxBytesPerChunk: Int): () ⇒ Stage[ByteString, ByteString] =
() ⇒ new PushStage[ByteString, ByteString] {
def onPush(elem: ByteString, ctx: Context[ByteString]): Directive =
def onPush(elem: ByteString, ctx: Context[ByteString]): SyncDirective =
ctx.push(elem ++ ByteString("compressed"))
}
}

View file

@ -89,7 +89,7 @@ class DeflateDecompressor(maxBytesPerChunk: Int = Decoder.MaxBytesPerChunkDefaul
def afterInflate: State = StartInflate
protected def afterBytesRead(buffer: Array[Byte], offset: Int, length: Int): Unit = {}
protected def onTruncation(ctx: Context[ByteString]): Directive = ctx.finish()
protected def onTruncation(ctx: Context[ByteString]): SyncDirective = ctx.finish()
}
abstract class DeflateDecompressorBase(maxBytesPerChunk: Int = Decoder.MaxBytesPerChunkDefault) extends ByteStringParserStage[ByteString] {
@ -101,7 +101,7 @@ abstract class DeflateDecompressorBase(maxBytesPerChunk: Int = Decoder.MaxBytesP
/** Start inflating */
case object StartInflate extends State {
def onPush(data: ByteString, ctx: Context[ByteString]): Directive = {
def onPush(data: ByteString, ctx: Context[ByteString]): SyncDirective = {
require(inflater.needsInput())
inflater.setInput(data.toArray)
@ -111,7 +111,7 @@ abstract class DeflateDecompressorBase(maxBytesPerChunk: Int = Decoder.MaxBytesP
/** Inflate */
case class Inflate()(data: ByteString) extends IntermediateState {
override def onPull(ctx: Context[ByteString]): Directive = {
override def onPull(ctx: Context[ByteString]): SyncDirective = {
val buffer = new Array[Byte](maxBytesPerChunk)
val read = inflater.inflate(buffer)
if (read > 0) {
@ -126,7 +126,7 @@ abstract class DeflateDecompressorBase(maxBytesPerChunk: Int = Decoder.MaxBytesP
becomeWithRemaining(next, remaining, ctx)
}
}
def onPush(elem: ByteString, ctx: Context[ByteString]): Directive =
def onPush(elem: ByteString, ctx: Context[ByteString]): SyncDirective =
throw new IllegalStateException("Don't expect a new Element")
}

View file

@ -66,11 +66,11 @@ class GzipDecompressor(maxBytesPerChunk: Int = Decoder.MaxBytesPerChunkDefault)
/** No bytes were received yet */
case object Initial extends State {
def onPush(data: ByteString, ctx: Context[ByteString]): Directive =
def onPush(data: ByteString, ctx: Context[ByteString]): SyncDirective =
if (data.isEmpty) ctx.pull()
else becomeWithRemaining(ReadHeaders, data, ctx)
override def onPull(ctx: Context[ByteString]): Directive =
override def onPull(ctx: Context[ByteString]): SyncDirective =
if (ctx.isFinishing) {
ctx.finish()
} else super.onPull(ctx)
@ -81,7 +81,7 @@ class GzipDecompressor(maxBytesPerChunk: Int = Decoder.MaxBytesPerChunkDefault)
/** Reading the header bytes */
case object ReadHeaders extends ByteReadingState {
def read(reader: ByteReader, ctx: Context[ByteString]): Directive = {
def read(reader: ByteReader, ctx: Context[ByteString]): SyncDirective = {
import reader._
if (readByte() != 0x1F || readByte() != 0x8B) fail("Not in GZIP format") // check magic header
@ -104,7 +104,7 @@ class GzipDecompressor(maxBytesPerChunk: Int = Decoder.MaxBytesPerChunkDefault)
/** Reading the trailer */
case object ReadTrailer extends ByteReadingState {
def read(reader: ByteReader, ctx: Context[ByteString]): Directive = {
def read(reader: ByteReader, ctx: Context[ByteString]): SyncDirective = {
import reader._
if (readInt() != crc32.getValue.toInt) fail("Corrupt data (CRC32 checksum error)")
@ -122,7 +122,7 @@ class GzipDecompressor(maxBytesPerChunk: Int = Decoder.MaxBytesPerChunkDefault)
crc.getValue.toInt & 0xFFFF
}
override protected def onTruncation(ctx: Context[ByteString]): Directive = ctx.fail(new ZipException("Truncated GZIP stream"))
override protected def onTruncation(ctx: Context[ByteString]): SyncDirective = ctx.fail(new ZipException("Truncated GZIP stream"))
private def fail(msg: String) = throw new ZipException(msg)
}

View file

@ -33,7 +33,7 @@ object Route {
* Turns a `Route` into a server flow.
*/
def handlerFlow(route: Route)(implicit setup: RoutingSetup): Flow[HttpRequest, HttpResponse, Unit] =
Flow[HttpRequest].mapAsync(asyncHandler(route))
Flow[HttpRequest].mapAsync(1, asyncHandler(route))
/**
* Turns a `Route` into an async handler function.

View file

@ -1,26 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.stream.scaladsl.{ Flow, OperationAttributes }
import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings }
import org.reactivestreams.{ Processor, Publisher }
import scala.concurrent.Future
class MapAsyncTest extends AkkaIdentityProcessorVerification[Int] {
override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = {
val settings = ActorFlowMaterializerSettings(system)
.withInputBuffer(initialSize = maxBufferSize / 2, maxSize = maxBufferSize)
implicit val materializer = ActorFlowMaterializer(settings)(system)
processorFromFlow(
Flow[Int].mapAsync(Future.successful).withAttributes(OperationAttributes.name("identity")))
}
override def createElement(element: Int): Int = element
}

View file

@ -1,26 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.stream.scaladsl.{ Flow, OperationAttributes }
import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings }
import org.reactivestreams.{ Processor, Publisher }
import scala.concurrent.Future
class MapAsyncUnorderedTest extends AkkaIdentityProcessorVerification[Int] {
override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = {
val settings = ActorFlowMaterializerSettings(system)
.withInputBuffer(initialSize = maxBufferSize / 2, maxSize = maxBufferSize)
implicit val materializer = ActorFlowMaterializer(settings)(system)
processorFromFlow(
Flow[Int].mapAsyncUnordered(Future.successful).withAttributes(OperationAttributes.name("identity")))
}
override def createElement(element: Int): Int = element
}

View file

@ -47,12 +47,12 @@ public class FlowGraphTest extends StreamTest {
public PushPullStage<T, T> create() throws Exception {
return new PushPullStage<T, T>() {
@Override
public Directive onPush(T element, Context<T> ctx) {
public SyncDirective onPush(T element, Context<T> ctx) {
return ctx.push(element);
}
@Override
public Directive onPull(Context<T> ctx) {
public SyncDirective onPull(Context<T> ctx) {
return ctx.pull();
}
};

View file

@ -102,7 +102,7 @@ public class FlowTest extends StreamTest {
public StageState<Integer, Integer> initial() {
return new StageState<Integer, Integer>() {
@Override
public Directive onPush(Integer element, Context<Integer> ctx) {
public SyncDirective onPush(Integer element, Context<Integer> ctx) {
sum += element;
count += 1;
if (count == 4) {
@ -226,12 +226,12 @@ public class FlowTest extends StreamTest {
public PushPullStage<T, T> create() throws Exception {
return new PushPullStage<T, T>() {
@Override
public Directive onPush(T element, Context<T> ctx) {
public SyncDirective onPush(T element, Context<T> ctx) {
return ctx.push(element);
}
@Override
public Directive onPull(Context<T> ctx) {
public SyncDirective onPull(Context<T> ctx) {
return ctx.pull();
}
};
@ -446,7 +446,7 @@ public class FlowTest extends StreamTest {
public void mustBeAbleToUseMapAsync() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final Iterable<String> input = Arrays.asList("a", "b", "c");
final Flow<String, String, BoxedUnit> flow = Flow.of(String.class).mapAsync(new Function<String, Future<String>>() {
final Flow<String, String, BoxedUnit> flow = Flow.of(String.class).mapAsync(4, new Function<String, Future<String>>() {
public Future<String> apply(String elem) {
return Futures.successful(elem.toUpperCase());
}

View file

@ -117,7 +117,7 @@ public class SourceTest extends StreamTest {
public StageState<Integer, Integer> initial() {
return new StageState<Integer, Integer>() {
@Override
public Directive onPush(Integer element, Context<Integer> ctx) {
public SyncDirective onPush(Integer element, Context<Integer> ctx) {
sum += element;
count += 1;
if (count == 4) {
@ -428,7 +428,7 @@ public class SourceTest extends StreamTest {
public void mustBeAbleToUseMapFuture() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final Iterable<String> input = Arrays.asList("a", "b", "c");
Source.from(input).mapAsync(new Function<String, Future<String>>() {
Source.from(input).mapAsync(4, new Function<String, Future<String>>() {
public Future<String> apply(String elem) {
return Futures.successful(elem.toUpperCase());
}

View file

@ -24,7 +24,7 @@ class ActorInterpreterSpec extends AkkaSpec {
class Setup(ops: List[Stage[_, _]] = List(fusing.Map({ x: Any ⇒ x }, stoppingDecider))) {
val up = PublisherProbe[Int]
val down = SubscriberProbe[Int]
private val props = ActorInterpreter.props(mat.settings, ops).withDispatcher("akka.test.stream-dispatcher")
private val props = ActorInterpreter.props(mat.settings, ops, mat).withDispatcher("akka.test.stream-dispatcher")
val actor = system.actorOf(props)
val processor = ActorProcessorFactory[Int, Int](actor)
}

View file

@ -18,21 +18,21 @@ class FixedBufferSpec extends AkkaSpec {
}
"become nonempty after enqueueing" in {
val buf = FixedSizeBuffer(size)
val buf = FixedSizeBuffer[String](size)
buf.enqueue("test")
buf.isEmpty should be(false)
buf.isFull should be(size == 1)
}
"become full after size elements are enqueued" in {
val buf = FixedSizeBuffer(size)
val buf = FixedSizeBuffer[String](size)
for (_ ← 1 to size) buf.enqueue("test")
buf.isEmpty should be(false)
buf.isFull should be(true)
}
"become empty after enqueueing and tail drop" in {
val buf = FixedSizeBuffer(size)
val buf = FixedSizeBuffer[String](size)
buf.enqueue("test")
buf.dropTail()
buf.isEmpty should be(true)
@ -40,7 +40,7 @@ class FixedBufferSpec extends AkkaSpec {
}
"become empty after enqueueing and head drop" in {
val buf = FixedSizeBuffer(size)
val buf = FixedSizeBuffer[String](size)
buf.enqueue("test")
buf.dropHead()
buf.isEmpty should be(true)
@ -48,21 +48,21 @@ class FixedBufferSpec extends AkkaSpec {
}
"drop head properly" in {
val buf = FixedSizeBuffer(size)
val buf = FixedSizeBuffer[Int](size)
for (elem ← 1 to size) buf.enqueue(elem)
buf.dropHead()
for (elem ← 2 to size) buf.dequeue() should be(elem)
}
"drop tail properly" in {
val buf = FixedSizeBuffer(size)
val buf = FixedSizeBuffer[Int](size)
for (elem ← 1 to size) buf.enqueue(elem)
buf.dropTail()
for (elem ← 1 to size - 1) buf.dequeue() should be(elem)
}
"become non-full after tail dropped from full buffer" in {
val buf = FixedSizeBuffer(size)
val buf = FixedSizeBuffer[String](size)
for (_ ← 1 to size) buf.enqueue("test")
buf.dropTail()
buf.isEmpty should be(size == 1)
@ -70,7 +70,7 @@ class FixedBufferSpec extends AkkaSpec {
}
"become non-full after head dropped from full buffer" in {
val buf = FixedSizeBuffer(size)
val buf = FixedSizeBuffer[String](size)
for (_ ← 1 to size) buf.enqueue("test")
buf.dropTail()
buf.isEmpty should be(size == 1)
@ -78,7 +78,7 @@ class FixedBufferSpec extends AkkaSpec {
}
"work properly with full-range filling/draining cycles" in {
val buf = FixedSizeBuffer(size)
val buf = FixedSizeBuffer[Int](size)
for (_ ← 1 to 10) {
buf.isEmpty should be(true)

View file

@ -5,6 +5,8 @@ package akka.stream.impl.fusing
import akka.stream.testkit.AkkaSpec
import akka.stream.stage._
import akka.testkit.TestProbe
import akka.stream.ActorFlowMaterializer
trait InterpreterSpecKit extends AkkaSpec {
@ -19,13 +21,13 @@ trait InterpreterSpecKit extends AkkaSpec {
var oneMore: Boolean = false
var lastElem: T = _
override def onPush(elem: T, ctx: Context[T]): Directive = {
override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
lastElem = elem
oneMore = true
ctx.push(elem)
}
override def onPull(ctx: Context[T]): Directive = {
override def onPull(ctx: Context[T]): SyncDirective = {
if (oneMore) {
oneMore = false
ctx.push(lastElem)
@ -36,12 +38,12 @@ trait InterpreterSpecKit extends AkkaSpec {
private[akka] case class KeepGoing[T]() extends PushPullStage[T, T] {
var lastElem: T = _
override def onPush(elem: T, ctx: Context[T]): Directive = {
override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
lastElem = elem
ctx.push(elem)
}
override def onPull(ctx: Context[T]): Directive = {
override def onPull(ctx: Context[T]): SyncDirective = {
if (ctx.isFinishing) {
ctx.push(lastElem)
} else ctx.pull()
@ -55,7 +57,11 @@ trait InterpreterSpecKit extends AkkaSpec {
val upstream = new UpstreamProbe
val downstream = new DownstreamProbe
val interpreter = new OneBoundedInterpreter(upstream +: ops :+ downstream, forkLimit, overflowToHeap)
val sidechannel = TestProbe()
val interpreter = new OneBoundedInterpreter(upstream +: ops :+ downstream,
(op, ctx, event) ⇒ sidechannel.ref ! ActorInterpreter.AsyncInput(op, ctx, event),
ActorFlowMaterializer(),
forkLimit, overflowToHeap)
interpreter.init()
def lastEvents(): Set[Any] = {
@ -64,7 +70,7 @@ trait InterpreterSpecKit extends AkkaSpec {
result
}
class UpstreamProbe extends BoundaryStage {
private[akka] class UpstreamProbe extends BoundaryStage {
override def onDownstreamFinish(ctx: BoundaryContext): TerminationDirective = {
lastEvent += Cancel
@ -82,13 +88,13 @@ trait InterpreterSpecKit extends AkkaSpec {
override def onPush(elem: Any, ctx: BoundaryContext): Directive =
throw new UnsupportedOperationException("Cannot push the boundary")
def onNext(elem: Any): Unit = enter().push(elem)
def onComplete(): Unit = enter().finish()
def onError(cause: Throwable): Unit = enter().fail(cause)
def onNext(elem: Any): Unit = enterAndPush(elem)
def onComplete(): Unit = enterAndFinish()
def onError(cause: Throwable): Unit = enterAndFail(cause)
}
class DownstreamProbe extends BoundaryStage {
private[akka] class DownstreamProbe extends BoundaryStage {
override def onPush(elem: Any, ctx: BoundaryContext): Directive = {
lastEvent += OnNext(elem)
ctx.exit()
@ -107,9 +113,9 @@ trait InterpreterSpecKit extends AkkaSpec {
override def onPull(ctx: BoundaryContext): Directive =
throw new UnsupportedOperationException("Cannot pull the boundary")
def requestOne(): Unit = enter().pull()
def requestOne(): Unit = enterAndPull()
def cancel(): Unit = enter().finish()
def cancel(): Unit = enterAndFinish()
}
}

View file

@ -10,6 +10,7 @@ import akka.stream.stage.Directive
import akka.stream.stage.PushPullStage
import akka.stream.stage.Stage
import akka.stream.stage.TerminationDirective
import akka.stream.stage.SyncDirective
object InterpreterSupervisionSpec {
val TE = new Exception("TEST") with NoStackTrace {
@ -18,12 +19,12 @@ object InterpreterSupervisionSpec {
class RestartTestStage extends PushPullStage[Int, Int] {
var sum = 0
def onPush(elem: Int, ctx: Context[Int]): Directive = {
def onPush(elem: Int, ctx: Context[Int]): SyncDirective = {
sum += elem
ctx.push(sum)
}
override def onPull(ctx: Context[Int]): Directive = {
override def onPull(ctx: Context[Int]): SyncDirective = {
ctx.pull()
}
@ -37,12 +38,12 @@ object InterpreterSupervisionSpec {
case class OneToManyTestStage(decider: Supervision.Decider, absorbTermination: Boolean = false) extends PushPullStage[Int, Int] {
var buf: List[Int] = Nil
def onPush(elem: Int, ctx: Context[Int]): Directive = {
def onPush(elem: Int, ctx: Context[Int]): SyncDirective = {
buf = List(elem + 1, elem + 2, elem + 3)
ctx.push(elem)
}
override def onPull(ctx: Context[Int]): Directive = {
override def onPull(ctx: Context[Int]): SyncDirective = {
if (buf.isEmpty && ctx.isFinishing)
ctx.finish()
else if (buf.isEmpty)
@ -198,7 +199,7 @@ class InterpreterSupervisionSpec extends InterpreterSpecKit {
"restart when onPush throws" in {
val stage = new RestartTestStage {
override def onPush(elem: Int, ctx: Context[Int]): Directive = {
override def onPush(elem: Int, ctx: Context[Int]): SyncDirective = {
if (elem <= 0) throw TE
else super.onPush(elem, ctx)
}
@ -226,7 +227,7 @@ class InterpreterSupervisionSpec extends InterpreterSpecKit {
"restart when onPush throws after ctx.push" in {
val stage = new RestartTestStage {
override def onPush(elem: Int, ctx: Context[Int]): Directive = {
override def onPush(elem: Int, ctx: Context[Int]): SyncDirective = {
val ret = ctx.push(sum)
super.onPush(elem, ctx)
if (elem <= 0) throw TE
@ -256,7 +257,7 @@ class InterpreterSupervisionSpec extends InterpreterSpecKit {
"fail when onPull throws" in {
val stage = new RestartTestStage {
override def onPull(ctx: Context[Int]): Directive = {
override def onPull(ctx: Context[Int]): SyncDirective = {
if (sum < 0) throw TE
super.onPull(ctx)
}

View file

@ -51,7 +51,7 @@ class IteratorInterpreterSpec extends AkkaSpec {
"throw exceptions when chain fails" in {
val itr = new IteratorInterpreter[Int, Int](List(1, 2, 3).iterator, Seq(
new PushStage[Int, Int] {
override def onPush(elem: Int, ctx: Context[Int]): Directive = {
override def onPush(elem: Int, ctx: Context[Int]): SyncDirective = {
if (elem == 2) ctx.fail(new ArithmeticException())
else ctx.push(elem)
}
@ -66,7 +66,7 @@ class IteratorInterpreterSpec extends AkkaSpec {
"throw exceptions when op in chain throws" in {
val itr = new IteratorInterpreter[Int, Int](List(1, 2, 3).iterator, Seq(
new PushStage[Int, Int] {
override def onPush(elem: Int, ctx: Context[Int]): Directive = {
override def onPush(elem: Int, ctx: Context[Int]): SyncDirective = {
if (elem == 2) throw new ArithmeticException()
else ctx.push(elem)
}
@ -120,12 +120,12 @@ class IteratorInterpreterSpec extends AkkaSpec {
case class NaiveTake[T](count: Int) extends PushPullStage[T, T] {
private var left: Int = count
override def onPush(elem: T, ctx: Context[T]): Directive = {
override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
left -= 1
ctx.push(elem)
}
override def onPull(ctx: Context[T]): Directive = {
override def onPull(ctx: Context[T]): SyncDirective = {
if (left == 0) ctx.finish()
else ctx.pull()
}
@ -137,7 +137,7 @@ class IteratorInterpreterSpec extends AkkaSpec {
private var buf = ByteString.empty
private var passthrough = false
override def onPush(elem: ByteString, ctx: Context[ByteString]): Directive = {
override def onPush(elem: ByteString, ctx: Context[ByteString]): SyncDirective = {
if (passthrough) ctx.push(elem)
else {
buf = buf ++ elem
@ -150,7 +150,7 @@ class IteratorInterpreterSpec extends AkkaSpec {
}
}
override def onPull(ctx: Context[ByteString]): Directive = {
override def onPull(ctx: Context[ByteString]): SyncDirective = {
if (ctx.isFinishing) ctx.pushAndFinish(buf)
else ctx.pull()
}

View file

@ -22,7 +22,7 @@ class FlowGraphCompileSpec extends AkkaSpec {
def op[In, Out]: () ⇒ PushStage[In, Out] = { () ⇒
new PushStage[In, Out] {
override def onPush(elem: In, ctx: Context[Out]): Directive =
override def onPush(elem: In, ctx: Context[Out]): SyncDirective =
ctx.push(elem.asInstanceOf[Out])
}
}

View file

@ -8,8 +8,8 @@ import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.control.NoStackTrace
import akka.stream.ActorFlowMaterializer
import akka.stream.stage._
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import akka.testkit.TestLatch
@ -17,8 +17,50 @@ import akka.testkit.TestProbe
import akka.stream.scaladsl.OperationAttributes.supervisionStrategy
import akka.stream.Supervision.resumingDecider
import akka.stream.impl.ReactiveStreamsCompliance
import scala.util.Try
import scala.concurrent.ExecutionContext
import scala.util.Failure
import scala.util.Success
object FlowMapAsyncSpec {
class MapAsyncOne[In, Out](f: In ⇒ Future[Out])(implicit ec: ExecutionContext) extends AsyncStage[In, Out, Try[Out]] {
private var elemInFlight: Out = _
override def onPush(elem: In, ctx: AsyncContext[Out, Try[Out]]) = {
val future = f(elem)
val cb = ctx.getAsyncCallback()
future.onComplete(cb.invoke)
ctx.holdUpstream()
}
override def onPull(ctx: AsyncContext[Out, Try[Out]]) =
if (elemInFlight != null) {
val e = elemInFlight
elemInFlight = null.asInstanceOf[Out]
pushIt(e, ctx)
} else ctx.holdDownstream()
override def onAsyncInput(input: Try[Out], ctx: AsyncContext[Out, Try[Out]]) =
input match {
case Failure(ex) ⇒ ctx.fail(ex)
case Success(e) if ctx.isHoldingDownstream ⇒ pushIt(e, ctx)
case Success(e) ⇒
elemInFlight = e
ctx.ignore()
}
override def onUpstreamFinish(ctx: AsyncContext[Out, Try[Out]]) =
if (ctx.isHoldingUpstream) ctx.absorbTermination()
else ctx.finish()
private def pushIt(elem: Out, ctx: AsyncContext[Out, Try[Out]]) =
if (ctx.isFinishing) ctx.pushAndFinish(elem)
else ctx.pushAndPull(elem)
}
}
class FlowMapAsyncSpec extends AkkaSpec {
import FlowMapAsyncSpec._
implicit val materializer = ActorFlowMaterializer()
@ -27,7 +69,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
"produce future elements" in {
val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher
val p = Source(1 to 3).mapAsync(n ⇒ Future(n)).runWith(Sink(c))
val p = Source(1 to 3).mapAsync(4, n ⇒ Future(n)).runWith(Sink(c))
val sub = c.expectSubscription()
sub.request(2)
c.expectNext(1)
@ -41,7 +83,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
"produce future elements in order" in {
val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher
val p = Source(1 to 50).mapAsync(n ⇒ Future {
val p = Source(1 to 50).mapAsync(4, n ⇒ Future {
Thread.sleep(ThreadLocalRandom.current().nextInt(1, 10))
n
}).to(Sink(c)).run()
@ -51,25 +93,26 @@ class FlowMapAsyncSpec extends AkkaSpec {
c.expectComplete()
}
"not run more futures than requested elements" in {
"not run more futures than requested parallelism" in {
val probe = TestProbe()
val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher
val p = Source(1 to 20).mapAsync(n ⇒ Future {
val p = Source(1 to 20).mapAsync(8, n ⇒ Future {
probe.ref ! n
n
}).to(Sink(c)).run()
val sub = c.expectSubscription()
// nothing before requested
// running 8 in parallel
probe.receiveN(8).toSet should be((1 to 8).toSet)
probe.expectNoMsg(500.millis)
sub.request(1)
probe.expectMsg(1)
probe.expectMsg(9)
probe.expectNoMsg(500.millis)
sub.request(2)
probe.receiveN(2).toSet should be(Set(2, 3))
probe.receiveN(2).toSet should be(Set(10, 11))
probe.expectNoMsg(500.millis)
sub.request(10)
probe.receiveN(10).toSet should be((4 to 13).toSet)
probe.receiveN(9).toSet should be((12 to 20).toSet)
probe.expectNoMsg(200.millis)
for (n ← 1 to 13) c.expectNext(n)
@ -80,7 +123,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
val latch = TestLatch(1)
val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher
val p = Source(1 to 5).mapAsync(n ⇒ Future {
val p = Source(1 to 5).mapAsync(4, n ⇒ Future {
if (n == 3) throw new RuntimeException("err1") with NoStackTrace
else {
Await.ready(latch, 10.seconds)
@ -97,7 +140,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
val latch = TestLatch(1)
val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher
val p = Source(1 to 5).mapAsync(n ⇒
val p = Source(1 to 5).mapAsync(4, n ⇒
if (n == 3) throw new RuntimeException("err2") with NoStackTrace
else {
Future {
@ -115,7 +158,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
"resume after future failure" in {
val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher
val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsync(n ⇒ Future {
val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsync(4, n ⇒ Future {
if (n == 3) throw new RuntimeException("err3") with NoStackTrace
else n
})).to(Sink(c)).run()
@ -128,7 +171,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
"resume when mapAsync throws" in {
val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher
val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsync(n ⇒
val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsync(4, n ⇒
if (n == 3) throw new RuntimeException("err4") with NoStackTrace
else Future(n))).
to(Sink(c)).run()
@ -140,7 +183,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
"signal NPE when future is completed with null" in {
val c = StreamTestKit.SubscriberProbe[String]()
val p = Source(List("a", "b")).mapAsync(elem ⇒ Future.successful(null)).to(Sink(c)).run()
val p = Source(List("a", "b")).mapAsync(4, elem ⇒ Future.successful(null)).to(Sink(c)).run()
val sub = c.expectSubscription()
sub.request(10)
c.expectError.getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg)
@ -149,7 +192,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
"resume when future is completed with null" in {
val c = StreamTestKit.SubscriberProbe[String]()
val p = Source(List("a", "b", "c")).section(supervisionStrategy(resumingDecider))(
_.mapAsync(elem ⇒ if (elem == "b") Future.successful(null) else Future.successful(elem)))
_.mapAsync(4, elem ⇒ if (elem == "b") Future.successful(null) else Future.successful(elem)))
.to(Sink(c)).run()
val sub = c.expectSubscription()
sub.request(10)
@ -161,7 +204,7 @@ class FlowMapAsyncSpec extends AkkaSpec {
val pub = StreamTestKit.PublisherProbe[Int]()
val sub = StreamTestKit.SubscriberProbe[Int]()
Source(pub).mapAsync(Future.successful).runWith(Sink(sub))
Source(pub).mapAsync(4, Future.successful).runWith(Sink(sub))
val upstream = pub.expectSubscription()
upstream.expectRequest()
@ -173,4 +216,45 @@ class FlowMapAsyncSpec extends AkkaSpec {
}
}
"A MapAsyncOne" must {
import system.dispatcher
"work in the happy case" in {
val probe = TestProbe()
val N = 100
val f = Source(1 to N).transform(() ⇒ new MapAsyncOne(i ⇒ {
probe.ref ! i
Future { Thread.sleep(10); probe.ref ! (i + 10); i * 2 }
})).grouped(N + 10).runWith(Sink.head)
Await.result(f, 2.seconds) should ===((1 to N).map(_ * 2))
probe.receiveN(2 * N) should ===((1 to N).flatMap(x ⇒ List(x, x + 10)))
probe.expectNoMsg(100.millis)
}
"work when futures fail" in {
val probe = StreamTestKit.SubscriberProbe[Int]
val ex = new Exception("KABOOM")
Source.single(1)
.transform(() ⇒ new MapAsyncOne(_ ⇒ Future.failed(ex)))
.runWith(Sink(probe))
val sub = probe.expectSubscription()
sub.request(1)
probe.expectError(ex)
}
"work when futures fail later" in {
val probe = StreamTestKit.SubscriberProbe[Int]
val ex = new Exception("KABOOM")
Source(List(1, 2))
.transform(() ⇒ new MapAsyncOne(x ⇒ if (x == 1) Future.successful(1) else Future.failed(ex)))
.runWith(Sink(probe))
val sub = probe.expectSubscription()
sub.request(1)
probe.expectNext(1)
probe.expectError(ex)
}
}
}

View file

@ -29,7 +29,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher
val latch = (1 to 4).map(_ -> TestLatch(1)).toMap
val p = Source(1 to 4).mapAsyncUnordered(n ⇒ Future {
val p = Source(1 to 4).mapAsyncUnordered(4, n ⇒ Future {
Await.ready(latch(n), 5.seconds)
n
}).to(Sink(c)).run()
@ -50,25 +50,26 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
val probe = TestProbe()
val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher
val p = Source(1 to 20).mapAsyncUnordered(n ⇒ Future {
val p = Source(1 to 20).mapAsyncUnordered(4, n ⇒ Future {
probe.ref ! n
n
}).to(Sink(c)).run()
val sub = c.expectSubscription()
// nothing before requested
probe.expectNoMsg(500.millis)
// first four run immediately
probe.expectMsgAllOf(1, 2, 3, 4)
c.expectNoMsg(200.millis)
probe.expectNoMsg(Duration.Zero)
sub.request(1)
val elem1 = probe.expectMsgType[Int]
var got = Set(c.expectNext())
probe.expectMsg(5)
probe.expectNoMsg(500.millis)
sub.request(2)
val elem2 = probe.expectMsgType[Int]
val elem3 = probe.expectMsgType[Int]
probe.expectNoMsg(500.millis)
sub.request(100)
(probe.receiveN(17).toSet + elem1 + elem2 + elem3) should be((1 to 20).toSet)
probe.expectNoMsg(200.millis)
sub.request(25)
probe.expectMsgAllOf(6 to 20: _*)
c.probe.within(3.seconds) {
for (_ ← 2 to 20) got += c.expectNext()
}
c.probe.receiveN(20).toSet should be((1 to 20).map(StreamTestKit.OnNext.apply).toSet)
got should be((1 to 20).toSet)
c.expectComplete()
}
@ -76,7 +77,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
val latch = TestLatch(1)
val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher
val p = Source(1 to 5).mapAsyncUnordered(n ⇒ Future {
val p = Source(1 to 5).mapAsyncUnordered(4, n ⇒ Future {
if (n == 3) throw new RuntimeException("err1") with NoStackTrace
else {
Await.ready(latch, 10.seconds)
@ -93,7 +94,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
val latch = TestLatch(1)
val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher
val p = Source(1 to 5).mapAsyncUnordered(n ⇒
val p = Source(1 to 5).mapAsyncUnordered(4, n ⇒
if (n == 3) throw new RuntimeException("err2") with NoStackTrace
else {
Future {
@ -111,7 +112,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
"resume after future failure" in {
val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher
val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsyncUnordered(n ⇒ Future {
val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsyncUnordered(4, n ⇒ Future {
if (n == 3) throw new RuntimeException("err3") with NoStackTrace
else n
})).to(Sink(c)).run()
@ -124,19 +125,19 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
"resume when mapAsyncUnordered throws" in {
val c = StreamTestKit.SubscriberProbe[Int]()
implicit val ec = system.dispatcher
val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsyncUnordered(n ⇒
val p = Source(1 to 5).section(supervisionStrategy(resumingDecider))(_.mapAsyncUnordered(4, n ⇒
if (n == 3) throw new RuntimeException("err4") with NoStackTrace
else Future(n))).
to(Sink(c)).run()
val sub = c.expectSubscription()
sub.request(10)
val expected = (OnComplete :: List(1, 2, 4, 5).map(OnNext.apply)).toSet
c.probe.receiveN(5).toSet should be(expected)
c.probe.receiveWhile(3.seconds, messages = 5) { case x ⇒ x }.toSet should be(expected)
}
"signal NPE when future is completed with null" in {
val c = StreamTestKit.SubscriberProbe[String]()
val p = Source(List("a", "b")).mapAsyncUnordered(elem ⇒ Future.successful(null)).to(Sink(c)).run()
val p = Source(List("a", "b")).mapAsyncUnordered(4, elem ⇒ Future.successful(null)).to(Sink(c)).run()
val sub = c.expectSubscription()
sub.request(10)
c.expectError.getMessage should be(ReactiveStreamsCompliance.ElementMustNotBeNullMsg)
@ -145,7 +146,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
"resume when future is completed with null" in {
val c = StreamTestKit.SubscriberProbe[String]()
val p = Source(List("a", "b", "c")).section(supervisionStrategy(resumingDecider))(
_.mapAsyncUnordered(elem ⇒ if (elem == "b") Future.successful(null) else Future.successful(elem)))
_.mapAsyncUnordered(4, elem ⇒ if (elem == "b") Future.successful(null) else Future.successful(elem)))
.to(Sink(c)).run()
val sub = c.expectSubscription()
sub.request(10)
@ -157,7 +158,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
val pub = StreamTestKit.PublisherProbe[Int]()
val sub = StreamTestKit.SubscriberProbe[Int]()
Source(pub).mapAsyncUnordered(Future.successful).runWith(Sink(sub))
Source(pub).mapAsyncUnordered(4, Future.successful).runWith(Sink(sub))
val upstream = pub.expectSubscription()
upstream.expectRequest()

View file

@ -45,7 +45,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
_settings: ActorFlowMaterializerSettings,
_ops: Seq[Stage[_, _]],
brokenMessage: Any)
extends ActorInterpreter(_settings, _ops) {
extends ActorInterpreter(_settings, _ops, mat) {
import akka.stream.actor.ActorSubscriberMessage._

View file

@ -96,7 +96,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
}
}
lazy val echo: State = new State {
def onPush(elem: Int, ctx: Context[Int]): Directive =
def onPush(elem: Int, ctx: Context[Int]): SyncDirective =
if (elem == 0) {
become(inflate)
ctx.pull()

View file

@ -29,7 +29,7 @@ class GraphMatValueSpec extends AkkaSpec {
val f = FlowGraph.closed(foldSink) { implicit b ⇒
fold ⇒
Source(1 to 10) ~> fold
b.matValue.mapAsync(identity) ~> Sink(sub)
b.matValue.mapAsync(4, identity) ~> Sink(sub)
}.run()
val r1 = Await.result(f, 3.seconds)
@ -46,8 +46,8 @@ class GraphMatValueSpec extends AkkaSpec {
fold ⇒
val zip = b.add(ZipWith[Int, Int, Int](_ + _))
Source(1 to 10) ~> fold
b.matValue.mapAsync(identity) ~> zip.in0
b.matValue.mapAsync(identity) ~> zip.in1
b.matValue.mapAsync(4, identity) ~> zip.in0
b.matValue.mapAsync(4, identity) ~> zip.in1
zip.out ~> Sink(sub)
}.run()
@ -67,13 +67,13 @@ class GraphMatValueSpec extends AkkaSpec {
}
"allow exposing the materialized value as port" in {
val (f1, f2) = foldFeedbackSource.mapAsync(identity).map(_ + 100).toMat(Sink.head)(Keep.both).run()
val (f1, f2) = foldFeedbackSource.mapAsync(4, identity).map(_ + 100).toMat(Sink.head)(Keep.both).run()
Await.result(f1, 3.seconds) should ===(55)
Await.result(f2, 3.seconds) should ===(155)
}
"allow exposing the materialized value as port even if wrapped and the final materialized value is Unit" in {
val noMatSource: Source[Int, Unit] = foldFeedbackSource.mapAsync(identity).map(_ + 100).mapMaterialized((_) ⇒ ())
val noMatSource: Source[Int, Unit] = foldFeedbackSource.mapAsync(4, identity).map(_ + 100).mapMaterialized((_) ⇒ ())
Await.result(noMatSource.runWith(Sink.head), 3.seconds) should ===(155)
}
@ -82,8 +82,8 @@ class GraphMatValueSpec extends AkkaSpec {
(s1, s2) ⇒
val zip = b.add(ZipWith[Int, Int, Int](_ + _))
s1.outlet.mapAsync(identity) ~> zip.in0
s2.outlet.mapAsync(identity).map(_ * 100) ~> zip.in1
s1.outlet.mapAsync(4, identity) ~> zip.in0
s2.outlet.mapAsync(4, identity).map(_ * 100) ~> zip.in1
zip.out
}

View file

@ -9,7 +9,7 @@ import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit.{ OnNext, SubscriberProbe }
import akka.util.ByteString
import akka.stream.{ Inlet, Outlet, Shape, Graph }
import org.scalautils.ConversionCheckedTripleEquals
import org.scalactic.ConversionCheckedTripleEquals
object GraphOpsIntegrationSpec {
import FlowGraph.Implicits._
@ -23,10 +23,12 @@ object GraphOpsIntegrationSpec {
override def deepCopy() = ShufflePorts(
new Inlet[In](in1.toString), new Inlet[In](in2.toString),
new Outlet[Out](out1.toString), new Outlet[Out](out2.toString))
override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]) = {
override def copyFromPorts(inlets: immutable.Seq[Inlet[_]], outlets: immutable.Seq[Outlet[_]]): ShufflePorts[In, Out] = {
assert(inlets.size == this.inlets.size)
assert(outlets.size == this.outlets.size)
ShufflePorts(inlets(0), inlets(1), outlets(0), outlets(1))
val i = inlets.asInstanceOf[Seq[Inlet[In]]]
val o = outlets.asInstanceOf[Seq[Outlet[Out]]]
ShufflePorts(i(0), i(1), o(0), o(1))
}
}

View file

@ -4,7 +4,7 @@ import akka.stream.testkit.AkkaSpec
import akka.stream._
import scala.concurrent.Await
import scala.concurrent.duration._
import org.scalautils.ConversionCheckedTripleEquals
import org.scalactic.ConversionCheckedTripleEquals
import akka.stream.testkit.StreamTestKit._
class ReverseArrowSpec extends AkkaSpec with ConversionCheckedTripleEquals {

View file

@ -32,3 +32,15 @@ abstract class FlowMaterializer {
def executionContext: ExecutionContextExecutor
}
/**
* INTERNAL API
*/
private[akka] object NoFlowMaterializer extends FlowMaterializer {
override def withNamePrefix(name: String): FlowMaterializer =
throw new UnsupportedOperationException("NoFlowMaterializer cannot be named")
override def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat =
throw new UnsupportedOperationException("NoFlowMaterializer cannot materialize")
override def executionContext: ExecutionContextExecutor =
throw new UnsupportedOperationException("NoFlowMaterializer does not provide an ExecutionContext")
}

View file

@ -113,7 +113,7 @@ object Timed extends TimedOps with TimedIntervalBetweenOps {
final class StartTimedFlow[T](timedContext: TimedFlowContext) extends PushStage[T, T] {
private var started = false
override def onPush(elem: T, ctx: Context[T]): Directive = {
override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
if (!started) {
timedContext.start()
started = true
@ -124,7 +124,7 @@ object Timed extends TimedOps with TimedIntervalBetweenOps {
final class StopTimed[T](timedContext: TimedFlowContext, _onComplete: FiniteDuration ⇒ Unit) extends PushStage[T, T] {
override def onPush(elem: T, ctx: Context[T]): Directive = ctx.push(elem)
override def onPush(elem: T, ctx: Context[T]): SyncDirective = ctx.push(elem)
override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = {
stopTime()
@ -145,7 +145,7 @@ object Timed extends TimedOps with TimedIntervalBetweenOps {
private var prevNanos = 0L
private var matched = 0L
override def onPush(elem: T, ctx: Context[T]): Directive = {
override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
if (matching(elem)) {
val d = updateInterval(elem)

View file

@ -235,36 +235,38 @@ private[akka] object ActorProcessorFactory {
import akka.stream.impl.Stages._
import ActorFlowMaterializerImpl._
private val _identity = (x: Any) ⇒ x
def props(materializer: ActorFlowMaterializerImpl, op: StageModule, parentAttributes: OperationAttributes): (Props, Any) = {
val att = parentAttributes and op.attributes
// USE THIS TO AVOID CLOSING OVER THE MATERIALIZER BELOW
// Also, otherwise the attributes will not affect the settings properly!
val settings = calcSettings(att)(materializer.settings)
op match {
case Identity(_) ⇒ (ActorInterpreter.props(settings, List(fusing.Map({ x: Any ⇒ x }, settings.supervisionDecider))), ())
case Fused(ops, _) ⇒ (ActorInterpreter.props(settings, ops), ())
case Map(f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Map(f, settings.supervisionDecider))), ())
case Filter(p, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Filter(p, settings.supervisionDecider))), ())
case Drop(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Drop(n))), ())
case Take(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Take(n))), ())
case Collect(pf, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Collect(settings.supervisionDecider)(pf))), ())
case Scan(z, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Scan(z, f, settings.supervisionDecider))), ())
case Expand(s, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Expand(s, f))), ())
case Conflate(s, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Conflate(s, f, settings.supervisionDecider))), ())
case Buffer(n, s, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Buffer(n, s))), ())
case MapConcat(f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapConcat(f, settings.supervisionDecider))), ())
case MapAsync(f, _) ⇒ (MapAsyncProcessorImpl.props(settings, f), ())
case MapAsyncUnordered(f, _) ⇒ (MapAsyncUnorderedProcessorImpl.props(settings, f), ())
case Grouped(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Grouped(n))), ())
case Identity(_) ⇒ (ActorInterpreter.props(settings, List(fusing.Map(_identity, settings.supervisionDecider)), materializer), ())
case Fused(ops, _) ⇒ (ActorInterpreter.props(settings, ops, materializer), ())
case Map(f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Map(f, settings.supervisionDecider)), materializer), ())
case Filter(p, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Filter(p, settings.supervisionDecider)), materializer), ())
case Drop(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Drop(n)), materializer), ())
case Take(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Take(n)), materializer), ())
case Collect(pf, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Collect(settings.supervisionDecider)(pf)), materializer), ())
case Scan(z, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Scan(z, f, settings.supervisionDecider)), materializer), ())
case Expand(s, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Expand(s, f)), materializer), ())
case Conflate(s, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Conflate(s, f, settings.supervisionDecider)), materializer), ())
case Buffer(n, s, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Buffer(n, s)), materializer), ())
case MapConcat(f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapConcat(f, settings.supervisionDecider)), materializer), ())
case MapAsync(p, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapAsync(p, f, settings.supervisionDecider)), materializer), ())
case MapAsyncUnordered(p, f, _) ⇒ (ActorInterpreter.props(settings, List(fusing.MapAsyncUnordered(p, f, settings.supervisionDecider)), materializer), ())
case Grouped(n, _) ⇒ (ActorInterpreter.props(settings, List(fusing.Grouped(n)), materializer), ())
case GroupBy(f, _) ⇒ (GroupByProcessorImpl.props(settings, f), ())
case PrefixAndTail(n, _) ⇒ (PrefixAndTailImpl.props(settings, n), ())
case SplitWhen(p, _) ⇒ (SplitWhenProcessorImpl.props(settings, p), ())
case ConcatAll(_) ⇒ (ConcatAllImpl.props(materializer), ()) //FIXME closes over the materializer, is this good?
case StageFactory(mkStage, _) ⇒ (ActorInterpreter.props(settings, List(mkStage())), ())
case StageFactory(mkStage, _) ⇒ (ActorInterpreter.props(settings, List(mkStage()), materializer), ())
case TimerTransform(mkStage, _) ⇒ (TimerTransformerProcessorsImpl.props(settings, mkStage()), ())
case MaterializingStageFactory(mkStageAndMat, _) ⇒
val (stage, mat) = mkStageAndMat()
(ActorInterpreter.props(settings, List(stage)), mat)
val sm = mkStageAndMat()
(ActorInterpreter.props(settings, List(sm._1), materializer), sm._2)
case DirectProcessor(p, m) ⇒ throw new AssertionError("DirectProcessor cannot end up in ActorProcessorFactory")
}
}

View file

@ -3,6 +3,9 @@
*/
package akka.stream.impl
import scala.reflect.classTag
import scala.reflect.ClassTag
/**
* INTERNAL API
*/
@ -17,67 +20,65 @@ private[akka] object FixedSizeBuffer {
*
* Returns a specialized instance for power-of-two sized buffers.
*/
def apply(size: Int): FixedSizeBuffer =
if (((size - 1) & size) == 0) new PowerOfTwoFixedSizeBuffer(size)
def apply[T](size: Int): FixedSizeBuffer[T] =
if (size < 1) throw new IllegalArgumentException("size must be positive")
else if (((size - 1) & size) == 0) new PowerOfTwoFixedSizeBuffer(size)
else new ModuloFixedSizeBuffer(size)
sealed abstract class FixedSizeBuffer(val size: Int) {
sealed abstract class FixedSizeBuffer[T](val size: Int) {
private val buffer = new Array[AnyRef](size)
protected var readIdx = 0
protected var writeIdx = 0
private var remainingCapacity = size
private val buffer = Array.ofDim[Any](size)
def used: Int = writeIdx - readIdx
protected def incWriteIdx(): Unit
protected def decWriteIdx(): Unit
protected def incReadIdx(): Unit
def isFull: Boolean = used == size
def isEmpty: Boolean = used == 0
def isFull: Boolean = remainingCapacity == 0
def isEmpty: Boolean = remainingCapacity == size
def enqueue(elem: Any): Unit = {
buffer(writeIdx) = elem
incWriteIdx()
remainingCapacity -= 1
def enqueue(elem: T): Int = {
put(writeIdx, elem)
val ret = writeIdx
writeIdx += 1
ret
}
def dequeue(): Any = {
val result = buffer(readIdx)
protected def toOffset(idx: Int): Int
def put(idx: Int, elem: T): Unit = buffer(toOffset(idx)) = elem.asInstanceOf[AnyRef]
def get(idx: Int): T = buffer(toOffset(idx)).asInstanceOf[T]
def peek(): T = get(readIdx)
def dequeue(): T = {
val result = get(readIdx)
dropHead()
result
}
def clear(): Unit = {
java.util.Arrays.fill(buffer.asInstanceOf[Array[Object]], null)
java.util.Arrays.fill(buffer, null)
readIdx = 0
writeIdx = 0
remainingCapacity = size
}
def dropHead(): Unit = {
buffer(readIdx) = null
incReadIdx()
remainingCapacity += 1
put(readIdx, null.asInstanceOf[T])
readIdx += 1
}
def dropTail(): Unit = {
decWriteIdx()
//buffer(writeIdx) = null
remainingCapacity += 1
writeIdx -= 1
put(writeIdx, null.asInstanceOf[T])
}
}
private final class ModuloFixedSizeBuffer(_size: Int) extends FixedSizeBuffer(_size) {
override protected def incReadIdx(): Unit = readIdx = (readIdx + 1) % size
override protected def decWriteIdx(): Unit = writeIdx = (writeIdx + size - 1) % size
override protected def incWriteIdx(): Unit = writeIdx = (writeIdx + 1) % size
private final class ModuloFixedSizeBuffer[T](_size: Int) extends FixedSizeBuffer[T](_size) {
override protected def toOffset(idx: Int): Int = idx % size
}
private final class PowerOfTwoFixedSizeBuffer(_size: Int) extends FixedSizeBuffer(_size) {
private final class PowerOfTwoFixedSizeBuffer[T](_size: Int) extends FixedSizeBuffer[T](_size) {
private val Mask = size - 1
override protected def incReadIdx(): Unit = readIdx = (readIdx + 1) & Mask
override protected def decWriteIdx(): Unit = writeIdx = (writeIdx - 1) & Mask
override protected def incWriteIdx(): Unit = writeIdx = (writeIdx + 1) & Mask
override protected def toOffset(idx: Int): Int = idx & Mask
}
}
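The rewritten buffer is generic and exposes slot-level access: enqueue now returns the index it wrote to, and put/get address slots directly, which is what lets the MapAsync stage further down reserve a slot with NotYetThere and fill it in once the future completes. A self-contained sketch of that addressing contract (illustrative only; the real class is the private[akka] one shown above):

// Minimal ring buffer with the same index-returning enqueue contract.
// readIdx/writeIdx grow monotonically; toOffset wraps them into the array.
final class RingSketch[T](size: Int) {
  private val buffer = new Array[AnyRef](size)
  private var readIdx, writeIdx = 0
  private def toOffset(idx: Int): Int = idx % size // modulo variant
  def used: Int = writeIdx - readIdx
  def isFull: Boolean = used == size
  def put(idx: Int, elem: T): Unit = buffer(toOffset(idx)) = elem.asInstanceOf[AnyRef]
  def get(idx: Int): T = buffer(toOffset(idx)).asInstanceOf[T]
  def enqueue(elem: T): Int = { put(writeIdx, elem); val i = writeIdx; writeIdx += 1; i }
  def dequeue(): T = { val r = get(readIdx); put(readIdx, null.asInstanceOf[T]); readIdx += 1; r }
}

val buf = new RingSketch[String](4)
val slot = buf.enqueue("placeholder") // reserve a slot and remember its index
buf.put(slot, "result")               // complete it later, e.g. from a callback
assert(buf.dequeue() == "result")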

View file

@ -1,170 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import scala.collection.mutable
import scala.collection.immutable
import scala.collection.immutable.TreeSet
import scala.concurrent.Future
import scala.util.control.NonFatal
import akka.stream.ActorFlowMaterializerSettings
import akka.pattern.pipe
import scala.annotation.tailrec
import akka.actor.Props
import akka.actor.DeadLetterSuppression
import akka.stream.Supervision
/**
* INTERNAL API
*/
private[akka] object MapAsyncProcessorImpl {
def props(settings: ActorFlowMaterializerSettings, f: Any ⇒ Future[Any]): Props =
Props(new MapAsyncProcessorImpl(settings, f))
object FutureElement {
implicit val ordering: Ordering[FutureElement] = new Ordering[FutureElement] {
def compare(a: FutureElement, b: FutureElement): Int = {
a.seqNo compare b.seqNo
}
}
}
final case class FutureElement(seqNo: Long, element: Any) extends DeadLetterSuppression
final case class FutureFailure(cause: Throwable) extends DeadLetterSuppression
final case class RecoveredError(in: Any, cause: Throwable)
}
/**
* INTERNAL API
*/
private[akka] class MapAsyncProcessorImpl(_settings: ActorFlowMaterializerSettings, f: Any ⇒ Future[Any])
extends ActorProcessorImpl(_settings) {
import MapAsyncProcessorImpl._
// Execution context for pipeTo and friends
import context.dispatcher
val decider = settings.supervisionDecider
var submittedSeqNo = 0L
var doneSeqNo = 0L
def gap: Long = submittedSeqNo - doneSeqNo
// TODO performance improvement: explore Endre's proposal of using an array based ring buffer addressed by
// seqNo & Mask and explicitly storing a Gap object to denote missing pieces instead of the sorted set
// keep future results arriving too early in a buffer sorted by seqNo
var orderedBuffer = TreeSet.empty[FutureElement]
override def activeReceive = futureReceive.orElse[Any, Unit](super.activeReceive)
def drainBuffer(): List[Any] = {
// this is mutable for speed
var n = 0
var elements = mutable.ListBuffer.empty[Any]
var failure: Option[Throwable] = None
val iter = orderedBuffer.iterator
@tailrec def split(): Unit =
if (iter.hasNext) {
val next = iter.next()
val inOrder = next.seqNo == (doneSeqNo + 1)
// stop at first missing seqNo
if (inOrder) {
n += 1
doneSeqNo = next.seqNo
elements += next.element
split()
}
}
split()
orderedBuffer = orderedBuffer.drop(n)
elements.toList
}
def futureReceive: Receive = {
case fe @ FutureElement(seqNo, element) ⇒
if (seqNo == (doneSeqNo + 1)) {
// successful element for the next sequence number
// emit that element and all elements from the buffer that are in order
// until next missing sequence number
doneSeqNo = seqNo
// Futures are spawned based on downstream demand and therefore we know at this point
// that the elements can be emitted immediately to downstream
if (!primaryOutputs.demandAvailable) throw new IllegalStateException
if (orderedBuffer.isEmpty) {
emit(element)
} else {
emit(element)
drainBuffer() foreach emit
}
pump()
} else {
assert(seqNo > doneSeqNo, s"Unexpected sequence number [$seqNo], expected seqNo > $doneSeqNo")
// out of order, buffer until missing elements arrive
orderedBuffer += fe
}
case FutureFailure(cause) ⇒
fail(cause)
}
def emit(element: Any): Unit = element match {
case RecoveredError(in, err) ⇒
if (settings.debugLogging)
log.debug("Dropped element [{}] due to mapAsync future was completed with exception: {}", in, err.getMessage)
case elem ⇒
primaryOutputs.enqueueOutputElement(element)
}
override def onError(e: Throwable): Unit = {
// propagate upstream failure immediately
fail(e)
}
object RunningPhaseCondition extends TransferState {
def isReady = (primaryInputs.inputsAvailable && primaryOutputs.demandCount - gap > 0) ||
(primaryInputs.inputsDepleted && gap == 0)
def isCompleted = primaryOutputs.isClosed
}
val running: TransferPhase = TransferPhase(RunningPhaseCondition) { () ⇒
if (primaryInputs.inputsDepleted) {
nextPhase(completedPhase)
} else if (primaryInputs.inputsAvailable && primaryOutputs.demandCount - gap > 0) {
val elem = primaryInputs.dequeueInputElement()
try {
val future = f(elem)
submittedSeqNo += 1
val seqNo = submittedSeqNo
future.map { elem ⇒
ReactiveStreamsCompliance.requireNonNullElement(elem)
FutureElement(seqNo, elem)
}.recover {
case err: Throwable if decider(err) != Supervision.Stop ⇒
FutureElement(seqNo, RecoveredError(elem, err))
case err ⇒ FutureFailure(err)
}.pipeTo(self)
} catch {
case NonFatal(err) ⇒
// f threw, handle failure immediately
decider(err) match {
case Supervision.Stop ⇒
fail(err)
case Supervision.Resume | Supervision.Restart ⇒
// submittedSeqNo was not increased, just continue
if (settings.debugLogging)
log.debug("Dropped element [{}] due to exception from mapAsync factory function: {}", elem, err.getMessage)
}
}
}
}
nextPhase(running)
}

View file

@ -1,106 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import scala.concurrent.Future
import scala.util.control.NonFatal
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.ActorFlowMaterializerSettings
import akka.pattern.pipe
import akka.actor.Props
import akka.actor.DeadLetterSuppression
import akka.stream.Supervision
/**
* INTERNAL API
*/
private[akka] object MapAsyncUnorderedProcessorImpl {
def props(settings: ActorFlowMaterializerSettings, f: Any ⇒ Future[Any]): Props =
Props(new MapAsyncUnorderedProcessorImpl(settings, f))
final case class FutureElement(element: Any) extends DeadLetterSuppression
final case class FutureFailure(in: Any, cause: Throwable) extends DeadLetterSuppression
}
/**
* INTERNAL API
*/
private[akka] class MapAsyncUnorderedProcessorImpl(_settings: ActorFlowMaterializerSettings, f: Any ⇒ Future[Any])
extends ActorProcessorImpl(_settings) {
import MapAsyncUnorderedProcessorImpl._
// Execution context for pipeTo and friends
import context.dispatcher
val decider = settings.supervisionDecider
var inProgressCount = 0
override def activeReceive = futureReceive.orElse[Any, Unit](super.activeReceive)
def futureReceive: Receive = {
case FutureElement(element) ⇒
// Futures are spawned based on downstream demand and therefore we know at this point
// that the element can be emitted immediately to downstream
if (!primaryOutputs.demandAvailable) throw new IllegalStateException
inProgressCount -= 1
primaryOutputs.enqueueOutputElement(element)
pump()
case FutureFailure(in, err) ⇒
decider(err) match {
case Supervision.Stop ⇒
fail(err)
case Supervision.Resume | Supervision.Restart ⇒
inProgressCount -= 1
if (settings.debugLogging)
log.debug("Dropped element [{}] due to mapAsyncUnordered future was completed with exception: {}",
in, err.getMessage)
pump()
}
}
override def onError(e: Throwable): Unit = {
// propagate upstream failure immediately
fail(e)
}
object RunningPhaseCondition extends TransferState {
def isReady = (primaryInputs.inputsAvailable && primaryOutputs.demandCount - inProgressCount > 0) ||
(primaryInputs.inputsDepleted && inProgressCount == 0)
def isCompleted = primaryOutputs.isClosed
}
val running: TransferPhase = TransferPhase(RunningPhaseCondition) { () ⇒
if (primaryInputs.inputsDepleted) {
nextPhase(completedPhase)
} else if (primaryInputs.inputsAvailable && primaryOutputs.demandCount - inProgressCount > 0) {
val elem = primaryInputs.dequeueInputElement()
try {
val future = f(elem)
inProgressCount += 1
future.map { elem ⇒
ReactiveStreamsCompliance.requireNonNullElement(elem)
FutureElement(elem)
}.recover {
case err ⇒ FutureFailure(elem, err)
}.pipeTo(self)
} catch {
case NonFatal(err) ⇒
// f threw, propagate failure immediately
decider(err) match {
case Supervision.Stop ⇒
fail(err)
case Supervision.Resume | Supervision.Restart ⇒
// inProgressCount was not increased, just continue
if (settings.debugLogging)
log.debug("Dropped element [{}] due to exception from mapAsyncUnordered factory function: {}", elem, err.getMessage)
}
}
}
}
nextPhase(running)
}

View file

@ -114,12 +114,12 @@ private[stream] object Stages {
override protected def newInstance: StageModule = this.copy()
}
final case class MapAsync(f: Any ⇒ Future[Any], attributes: OperationAttributes = mapAsync) extends StageModule {
final case class MapAsync(parallelism: Int, f: Any ⇒ Future[Any], attributes: OperationAttributes = mapAsync) extends StageModule {
def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes)
override protected def newInstance: StageModule = this.copy()
}
final case class MapAsyncUnordered(f: Any ⇒ Future[Any], attributes: OperationAttributes = mapAsyncUnordered) extends StageModule {
final case class MapAsyncUnordered(parallelism: Int, f: Any ⇒ Future[Any], attributes: OperationAttributes = mapAsyncUnordered) extends StageModule {
def withAttributes(attributes: OperationAttributes) = copy(attributes = attributes)
override protected def newInstance: StageModule = this.copy()
}

View file

@ -66,7 +66,7 @@ private[akka] class TimerTransformerProcessorsImpl(
override def inputsAvailable: Boolean = !queue.isEmpty
}
override def activeReceive = super.activeReceive orElse schedulerInputs.subreceive
override def activeReceive = super.activeReceive.orElse[Any, Unit](schedulerInputs.subreceive)
object RunningCondition extends TransferState {
def isReady = {

View file

@ -17,11 +17,12 @@ import akka.actor.Props
import akka.actor.ActorLogging
import akka.event.LoggingAdapter
import akka.actor.DeadLetterSuppression
import akka.stream.ActorFlowMaterializer
/**
* INTERNAL API
*/
private[akka] class BatchingActorInputBoundary(val size: Int)
private[akka] class BatchingActorInputBoundary(val size: Int, val name: String)
extends BoundaryStage {
require(size > 0, "buffer size cannot be zero")
@ -60,6 +61,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int)
}
private def enqueue(elem: Any): Unit = {
if (OneBoundedInterpreter.Debug) println(f" enq $elem%-19s $name")
if (!upstreamCompleted) {
if (inputBufferElements == size) throw new IllegalStateException("Input buffer overrun")
inputBuffer((nextInputElementCursor + inputBufferElements) & IndexMask) = elem.asInstanceOf[AnyRef]
@ -106,7 +108,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int)
if (!upstreamCompleted) {
upstreamCompleted = true
// onUpstreamFinish is not back-pressured, stages need to deal with this
if (inputBufferElements == 0) enter().finish()
if (inputBufferElements == 0) enterAndFinish()
}
private def onSubscribe(subscription: Subscription): Unit = {
@ -122,7 +124,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int)
private def onError(e: Throwable): Unit = {
upstreamCompleted = true
enter().fail(e)
enterAndFail(e)
}
private def waitingForUpstream: Actor.Receive = {
@ -136,7 +138,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int)
enqueue(element)
if (downstreamWaiting) {
downstreamWaiting = false
enter().push(dequeue())
enterAndPush(dequeue())
}
case OnComplete ⇒ onComplete()
@ -194,7 +196,7 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef,
if (upstreamWaiting) {
burstRemaining = outputBurstLimit
upstreamWaiting = false
enter().pull()
enterAndPull()
}
val subreceive = new SubReceive(waitingExposedPublisher)
@ -264,12 +266,16 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef,
subscribePending(exposedPublisher.takePendingSubscribers())
case RequestMore(subscription, elements) ⇒
if (elements < 1) {
enter().finish()
enterAndFinish()
fail(ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException)
} else {
downstreamDemand += elements
if (downstreamDemand < 0)
downstreamDemand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded
if (OneBoundedInterpreter.Debug) {
val s = s"$downstreamDemand (+$elements)"
println(f" dem $s%-19s ${actor.path}")
}
tryPutBallIn()
}
@ -280,7 +286,7 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef,
downstreamCompleted = true
subscriber = null
exposedPublisher.shutdown(Some(new ActorPublisher.NormalShutdownException))
enter().finish()
enterAndFinish()
}
}
@ -289,22 +295,34 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef,
* INTERNAL API
*/
private[akka] object ActorInterpreter {
def props(settings: ActorFlowMaterializerSettings, ops: Seq[Stage[_, _]]): Props =
Props(new ActorInterpreter(settings, ops))
def props(settings: ActorFlowMaterializerSettings, ops: Seq[Stage[_, _]], materializer: ActorFlowMaterializer): Props =
Props(new ActorInterpreter(settings, ops, materializer))
case class AsyncInput(op: AsyncStage[Any, Any, Any], ctx: AsyncContext[Any, Any], event: Any)
}
/**
* INTERNAL API
*/
private[akka] class ActorInterpreter(val settings: ActorFlowMaterializerSettings, val ops: Seq[Stage[_, _]])
private[akka] class ActorInterpreter(val settings: ActorFlowMaterializerSettings, val ops: Seq[Stage[_, _]], val materializer: ActorFlowMaterializer)
extends Actor with ActorLogging {
import ActorInterpreter._
private val upstream = new BatchingActorInputBoundary(settings.initialInputBufferSize)
private val upstream = new BatchingActorInputBoundary(settings.initialInputBufferSize, context.self.path.toString)
private val downstream = new ActorOutputBoundary(self, settings.debugLogging, log, settings.outputBurstLimit)
private val interpreter = new OneBoundedInterpreter(upstream +: ops :+ downstream)
private val interpreter =
new OneBoundedInterpreter(upstream +: ops :+ downstream,
(op, ctx, event) ⇒ self ! AsyncInput(op, ctx, event),
materializer,
name = context.self.path.toString)
interpreter.init()
def receive: Receive = upstream.subreceive.orElse[Any, Unit](downstream.subreceive)
def receive: Receive = upstream.subreceive.orElse[Any, Unit](downstream.subreceive).orElse[Any, Unit] {
case AsyncInput(op, ctx, event) ⇒
ctx.enter()
op.onAsyncInput(event, ctx)
ctx.execute()
}
override protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
super.aroundReceive(receive, msg)

View file

@ -3,42 +3,19 @@
*/
package akka.stream.impl.fusing
import scala.annotation.tailrec
import scala.annotation.{ tailrec, switch }
import scala.collection.breakOut
import scala.util.control.NonFatal
import akka.stream.stage._
import akka.stream.Supervision
import akka.stream.impl.ReactiveStreamsCompliance
import akka.stream.FlowMaterializer
// TODO:
// fix jumpback table with keep-going-on-complete ops (we might jump between otherwise isolated execution regions)
// implement grouped, buffer
// add recover
/**
* INTERNAL API
*
* `BoundaryStage` implementations are meant to communicate with the external world. These stages do not have most of the
* safety properties enforced and should be used carefully. One important ability of BoundaryStages is that they can take
* an execution signal off by calling `ctx.exit()`. This is typically used immediately after an external signal has
* been produced (for example an actor message). BoundaryStages can also kickstart execution by calling `enter()`, which
* returns a context they can use to inject signals into the interpreter. There are no checks in place to enforce that
* the number of signals taken out by exit() and the number of signals returned via enter() are the same -- using this
* stage type needs extra care from the implementer.
*
* BoundaryStages are the elements that make the interpreter *tick*; there is no other way to start the interpreter
* than using a BoundaryStage.
*/
private[akka] abstract class BoundaryStage extends AbstractStage[Any, Any, Directive, Directive, BoundaryContext] {
private[fusing] var bctx: BoundaryContext = _
def enter(): BoundaryContext = bctx
final override def decide(t: Throwable): Supervision.Directive = Supervision.Stop
final override def restart(): BoundaryStage =
throw new UnsupportedOperationException("BoundaryStage doesn't support restart")
}
/**
* INTERNAL API
*/
@ -144,8 +121,15 @@ private[akka] object OneBoundedInterpreter {
* testing and finding callstack wasting bugs), in the other case the forked call is scheduled via a list -- i.e. instead
* of the stack the heap is used.
*/
private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit: Int = 100, val overflowToHeap: Boolean = true) {
private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]],
onAsyncInput: (AsyncStage[Any, Any, Any], AsyncContext[Any, Any], Any) ⇒ Unit,
materializer: FlowMaterializer,
val forkLimit: Int = 100,
val overflowToHeap: Boolean = true,
val name: String = "") {
import OneBoundedInterpreter._
import AbstractStage._
type UntypedOp = AbstractStage[Any, Any, Directive, Directive, Context[Any]]
require(ops.nonEmpty, "OneBoundedInterpreter cannot be created without at least one Op")
@ -169,6 +153,8 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
private var elementInFlight: Any = _
// Points to the current point of execution inside the pipeline
private var activeOpIndex = -1
// Points to the last point of exit
private var lastExitedIndex = Downstream
// The current interpreter state that decides what happens at the next round
private var state: State = _
@ -189,12 +175,12 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
case _: PushPullStage[_, _] ⇒ "pushpull"
case _: DetachedStage[_, _] ⇒ "detached"
case _ ⇒ "other"
}) + s"(${o.allowedToPush},${o.holding},${o.terminationPending})"
}) + f"(${o.bits}%04X)"
}
override def toString =
s"""|OneBoundedInterpreter
s"""|OneBoundedInterpreter($name)
| pipeline = ${pipeline map pipeName mkString ":"}
| activeOp=$activeOpIndex state=$state elem=$elementInFlight forks=$forkCount""".stripMargin
| lastExit=$lastExitedIndex activeOp=$activeOpIndex state=$state elem=$elementInFlight forks=$forkCount""".stripMargin
@inline private def currentOp: UntypedOp = pipeline(activeOpIndex)
@ -218,7 +204,11 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
}
}
private sealed trait State extends DetachedContext[Any] with BoundaryContext {
private sealed trait State extends DetachedContext[Any] with BoundaryContext with AsyncContext[Any, Any] {
def enter(): Unit = throw new IllegalStateException("cannot enter an ordinary Context")
final def execute(): Unit = OneBoundedInterpreter.this.execute()
final def progress(): Unit = {
advance()
if (inside) run()
@ -238,32 +228,79 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
*/
def run(): Unit
/**
* This method shall return the bit set representing the incoming ball (if any).
*/
def incomingBall: Int
protected def hasBits(b: Int): Boolean = ((currentOp.bits | incomingBall) & b) == b
protected def addBits(b: Int): Unit = currentOp.bits |= b
protected def removeBits(b: Int): Unit = currentOp.bits &= ~b
protected def mustHave(b: Int): Unit =
if (!hasBits(b)) {
def format(b: Int) =
(b & BothBalls: @switch) match {
case 0 ⇒ "no balls"
case UpstreamBall ⇒ "upstream ball"
case DownstreamBall ⇒ "downstream ball"
case BothBalls ⇒ "upstream & downstream balls"
}
throw new IllegalStateException(s"operation requires ${format(b)} while holding ${format(currentOp.bits)} and receiving ${format(incomingBall)}")
}
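The bits field replaces the three booleans (allowedToPush, holding, terminationPending) that the old debug output printed. The flag constants are imported from AbstractStage and are not part of this hunk; a plausible layout is shown below for orientation only — the concrete values are an assumption, except that BothBalls must be the union implied by the format helper above:

// Assumed flag layout — the real constants live in AbstractStage (not in this diff).
final val UpstreamBall = 0x1
final val DownstreamBall = 0x2
final val BothBalls = UpstreamBall | DownstreamBall // union used by format above
final val PrecedingWasPull = 0x4
final val TerminationPending = 0x8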
override def push(elem: Any): DownstreamDirective = {
ReactiveStreamsCompliance.requireNonNullElement(elem)
if (currentOp.holding) throw new IllegalStateException("Cannot push while holding, only pushAndPull")
currentOp.allowedToPush = false
if (currentOp.isDetached) {
if (incomingBall == UpstreamBall)
throw new IllegalStateException("Cannot push during onPush, only pull, pushAndPull or holdUpstreamAndPush")
mustHave(DownstreamBall)
}
removeBits(PrecedingWasPull | DownstreamBall)
elementInFlight = elem
state = Pushing
null
}
override def pull(): UpstreamDirective = {
if (currentOp.holding) throw new IllegalStateException("Cannot pull while holding, only pushAndPull")
currentOp.allowedToPush = !currentOp.isInstanceOf[DetachedStage[_, _]]
if (currentOp.isDetached) {
if (incomingBall == DownstreamBall)
throw new IllegalStateException("Cannot pull during onPull, only push, pushAndPull or holdDownstreamAndPull")
mustHave(UpstreamBall)
}
removeBits(UpstreamBall)
addBits(PrecedingWasPull)
state = Pulling
null
}
override def getAsyncCallback(): AsyncCallback[Any] = {
val current = currentOp.asInstanceOf[AsyncStage[Any, Any, Any]]
val context = current.context // avoid concurrent access (to avoid @volatile)
new AsyncCallback[Any] {
override def invoke(evt: Any): Unit = onAsyncInput(current, context, evt)
}
}
override def ignore(): AsyncDirective = {
if (incomingBall != 0) throw new IllegalStateException("Can only ignore from onAsyncInput")
exit()
}
override def finish(): FreeDirective = {
fork(Completing)
state = Cancelling
null
}
def isFinishing: Boolean = currentOp.terminationPending
def isFinishing: Boolean = hasBits(TerminationPending)
override def pushAndFinish(elem: Any): DownstreamDirective = {
ReactiveStreamsCompliance.requireNonNullElement(elem)
if (currentOp.isDetached) {
mustHave(DownstreamBall)
}
removeBits(DownstreamBall | PrecedingWasPull)
pipeline(activeOpIndex) = Finished.asInstanceOf[UntypedOp]
// This MUST be an unsafeFork because the execution of PushFinish MUST strictly come before the finish execution
// path. Other forks are not order dependent because they execute on isolated execution domains which cannot
@ -282,18 +319,48 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
null
}
override def hold(): FreeDirective = {
if (currentOp.holding) throw new IllegalStateException("Cannot hold while already holding")
currentOp.holding = true
override def holdUpstream(): UpstreamDirective = {
removeBits(PrecedingWasPull)
addBits(UpstreamBall)
exit()
}
override def isHolding: Boolean = currentOp.holding
override def holdUpstreamAndPush(elem: Any): UpstreamDirective = {
ReactiveStreamsCompliance.requireNonNullElement(elem)
if (incomingBall != UpstreamBall)
throw new IllegalStateException("can only holdUpstreamAndPush from onPush")
mustHave(BothBalls)
removeBits(PrecedingWasPull | DownstreamBall)
addBits(UpstreamBall)
elementInFlight = elem
state = Pushing
null
}
override def isHoldingUpstream: Boolean = (currentOp.bits & UpstreamBall) != 0
override def holdDownstream(): DownstreamDirective = {
addBits(DownstreamBall)
exit()
}
override def holdDownstreamAndPull(): DownstreamDirective = {
if (incomingBall != DownstreamBall)
throw new IllegalStateException("can only holdDownstreamAndPull from onPull")
mustHave(BothBalls)
addBits(PrecedingWasPull | DownstreamBall)
removeBits(UpstreamBall)
state = Pulling
null
}
override def isHoldingDownstream: Boolean = (currentOp.bits & DownstreamBall) != 0
override def pushAndPull(elem: Any): FreeDirective = {
ReactiveStreamsCompliance.requireNonNullElement(elem)
if (!currentOp.holding) throw new IllegalStateException("Cannot pushAndPull without holding first")
currentOp.holding = false
mustHave(BothBalls)
addBits(PrecedingWasPull)
removeBits(BothBalls)
fork(Pushing, elem)
state = Pulling
null
@ -301,21 +368,24 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
override def absorbTermination(): TerminationDirective = {
updateJumpBacks(activeOpIndex)
currentOp.holding = false
removeBits(BothBalls)
finish()
}
override def exit(): FreeDirective = {
elementInFlight = null
lastExitedIndex = activeOpIndex
activeOpIndex = -1
null
}
override def materializer: FlowMaterializer = OneBoundedInterpreter.this.materializer
}
private final val Pushing: State = new State {
override def advance(): Unit = activeOpIndex += 1
override def run(): Unit = currentOp.onPush(elementInFlight, ctx = this)
override def incomingBall = UpstreamBall
override def toString = "Pushing"
}
@ -325,6 +395,10 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
override def pushAndFinish(elem: Any): DownstreamDirective = {
ReactiveStreamsCompliance.requireNonNullElement(elem)
/*
* FIXME (RK) please someone explain why this works: the stage already
* terminated, but eventually it will see another onPull because nobody noticed.
*/
elementInFlight = elem
state = PushFinish
null
@ -335,6 +409,8 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
null
}
override def incomingBall = UpstreamBall
override def toString = "PushFinish"
}
@ -346,10 +422,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
override def run(): Unit = currentOp.onPull(ctx = this)
override def hold(): FreeDirective = {
currentOp.allowedToPush = true
super.hold()
}
override def incomingBall = DownstreamBall
override def toString = "Pulling"
}
@ -362,8 +435,8 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
}
override def run(): Unit = {
if (!currentOp.terminationPending) currentOp.onUpstreamFinish(ctx = this)
else exit()
if (hasBits(TerminationPending)) exit()
else currentOp.onUpstreamFinish(ctx = this)
}
override def finish(): FreeDirective = {
@ -372,14 +445,18 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
}
override def absorbTermination(): TerminationDirective = {
currentOp.terminationPending = true
currentOp.holding = false
addBits(TerminationPending)
removeBits(UpstreamBall)
updateJumpBacks(activeOpIndex)
if (currentOp.allowedToPush) currentOp.onPull(ctx = Pulling)
else exit()
if (hasBits(DownstreamBall) || (!currentOp.isDetached && hasBits(PrecedingWasPull))) {
removeBits(DownstreamBall)
currentOp.onPull(ctx = Pulling)
} else exit()
null
}
override def incomingBall = UpstreamBall
override def toString = "Completing"
}
@ -391,8 +468,8 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
}
def run(): Unit = {
if (!currentOp.terminationPending) currentOp.onDownstreamFinish(ctx = this)
else exit()
if (hasBits(TerminationPending)) exit()
else currentOp.onDownstreamFinish(ctx = this)
}
override def finish(): FreeDirective = {
@ -400,6 +477,8 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
null
}
override def incomingBall = DownstreamBall
override def toString = "Cancelling"
}
@ -413,13 +492,17 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
def run(): Unit = currentOp.onUpstreamFailure(cause, ctx = this)
override def absorbTermination(): TerminationDirective = {
currentOp.terminationPending = true
currentOp.holding = false
addBits(TerminationPending)
removeBits(UpstreamBall)
updateJumpBacks(activeOpIndex)
if (currentOp.allowedToPush) currentOp.onPull(ctx = Pulling)
else exit()
if (hasBits(DownstreamBall) || (!currentOp.isDetached && hasBits(PrecedingWasPull))) {
removeBits(DownstreamBall)
currentOp.onPull(ctx = Pulling)
} else exit()
null
}
override def incomingBall = UpstreamBall
}
private def inside: Boolean = activeOpIndex > -1 && activeOpIndex < pipeline.length
@ -437,7 +520,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
case Failing(e) ⇒ padding + s"---X ${e.getMessage} => ${decide(e)}"
case other ⇒ padding + s"---? $state"
}
println(icon)
println(f"$icon%-24s $name")
}
@tailrec private def execute(): Unit = {
@ -453,16 +536,22 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
case Supervision.Resume ⇒
// reset, purpose of lastOpFailing is to avoid infinite loops when fail fails -- double fault
lastOpFailing = -1
state.pull()
afterRecovery()
case Supervision.Restart ⇒
// reset, purpose of lastOpFailing is to avoid infinite loops when fail fails -- double fault
lastOpFailing = -1
pipeline(activeOpIndex) = pipeline(activeOpIndex).restart().asInstanceOf[UntypedOp]
state.pull()
afterRecovery()
}
}
}
// FIXME push this into AbstractStage so it can be customized
def afterRecovery(): Unit = state match {
case _: EntryState ⇒ // no ball to be juggled with
case _ ⇒ state.pull()
}
// Execute all delayed forks that were put on the heap if the fork limit has been reached
if (overflowStack.nonEmpty) {
val memo = overflowStack.head
@ -475,7 +564,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
}
def decide(e: Throwable): Supervision.Directive =
if (state == Pulling || state.isHolding) Supervision.Stop
if (state == Pulling || state == Cancelling) Supervision.Stop
else currentOp.decide(e)
/**
@ -513,67 +602,39 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
def isFinished: Boolean = pipeline(Upstream) == Finished && pipeline(Downstream) == Finished
private class EntryState(name: String, position: Int) extends State {
val entryPoint = position
final override def enter(): Unit = {
activeOpIndex = entryPoint
if (Debug) {
val s = " " * entryPoint + "ENTR"
println(f"$s%-24s ${OneBoundedInterpreter.this.name}")
}
}
override def run(): Unit = ()
override def advance(): Unit = ()
override def incomingBall = 0
override def toString = s"$name($entryPoint)"
}
/**
* This method injects a Context into each of the BoundaryStages. This will be the context returned by enter().
* This method injects a Context into each of the BoundaryStages and AsyncStages. This will be the context returned by enter().
*/
private def initBoundaries(): Unit = {
var op = 0
while (op < pipeline.length) {
// FIXME try to change this to a pattern match `case boundary: BoundaryStage`
// but that doesn't work with current Context types
if (pipeline(op).isInstanceOf[BoundaryStage]) {
pipeline(op).asInstanceOf[BoundaryStage].bctx = new State {
val entryPoint = op
override def run(): Unit = ()
override def advance(): Unit = ()
override def push(elem: Any): DownstreamDirective = {
ReactiveStreamsCompliance.requireNonNullElement(elem)
activeOpIndex = entryPoint
super.push(elem)
execute()
null
}
override def pull(): UpstreamDirective = {
activeOpIndex = entryPoint
super.pull()
execute()
null
}
override def finish(): FreeDirective = {
activeOpIndex = entryPoint
super.finish()
execute()
null
}
override def fail(cause: Throwable): FreeDirective = {
activeOpIndex = entryPoint
super.fail(cause)
execute()
null
}
override def hold(): FreeDirective = {
activeOpIndex = entryPoint
super.hold()
execute()
null
}
override def pushAndPull(elem: Any): FreeDirective = {
ReactiveStreamsCompliance.requireNonNullElement(elem)
activeOpIndex = entryPoint
super.pushAndPull(elem)
execute()
null
}
override def toString = s"boundary($op)"
}
(pipeline(op): Any) match {
case b: BoundaryStage ⇒
b.context = new EntryState("boundary", op)
case a: AsyncStage[Any, Any, Any] @unchecked ⇒
a.context = new EntryState("async", op)
activeOpIndex = op
a.initAsyncInput(a.context)
case _ ⇒
}
op += 1
}
@ -588,7 +649,7 @@ private[akka] class OneBoundedInterpreter(ops: Seq[Stage[_, _]], val forkLimit:
private def runDetached(): Unit = {
var op = pipeline.length - 1
while (op >= 0) {
if (pipeline(op).isInstanceOf[DetachedStage[_, _]]) {
if (pipeline(op).isDetached) {
activeOpIndex = op
state = Pulling
execute()

View file

@ -4,6 +4,7 @@
package akka.stream.impl.fusing
import akka.stream.stage._
import akka.stream._
/**
* INTERNAL API
@ -12,10 +13,10 @@ private[akka] object IteratorInterpreter {
final case class IteratorUpstream[T](input: Iterator[T]) extends PushPullStage[T, T] {
private var hasNext = input.hasNext
override def onPush(elem: T, ctx: Context[T]): Directive =
override def onPush(elem: T, ctx: Context[T]): SyncDirective =
throw new UnsupportedOperationException("IteratorUpstream operates as a source, it cannot be pushed")
override def onPull(ctx: Context[T]): Directive = {
override def onPull(ctx: Context[T]): SyncDirective = {
if (!hasNext) ctx.finish()
else {
val elem = input.next()
@ -58,7 +59,7 @@ private[akka] object IteratorInterpreter {
private def pullIfNeeded(): Unit = {
if (needsPull) {
enter().pull() // will eventually result in a finish, or an onPush which exits
enterAndPull() // will eventually result in a finish, or an onPush which exits
}
}
@ -94,7 +95,9 @@ private[akka] class IteratorInterpreter[I, O](val input: Iterator[I], val ops: S
private val upstream = IteratorUpstream(input)
private val downstream = IteratorDownstream[O]()
private val interpreter = new OneBoundedInterpreter(upstream +: ops.asInstanceOf[Seq[Stage[_, _]]] :+ downstream)
private val interpreter = new OneBoundedInterpreter(upstream +: ops.asInstanceOf[Seq[Stage[_, _]]] :+ downstream,
(op, ctx, evt) ⇒ throw new UnsupportedOperationException("IteratorInterpreter is fully synchronous"),
NoFlowMaterializer)
interpreter.init()
def iterator: Iterator[O] = downstream
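Because the iterator variant is fully synchronous, it satisfies the two new constructor parameters with a callback that must never fire and with NoFlowMaterializer. A hypothetical internal usage, sketched only to show the wiring (this is private[akka] API; names and the stoppingDecider default are assumptions based on the surrounding code):

import akka.stream.Supervision
import akka.stream.impl.fusing.{ IteratorInterpreter, Map ⇒ FusingMap }

// Drive a small synchronous chain: no actor and no materializer involved.
val doubled: Iterator[Int] =
  new IteratorInterpreter[Int, Int](
    Iterator(1, 2, 3),
    Seq(FusingMap((x: Int) ⇒ x * 2, Supervision.stoppingDecider))).iterator

doubled.toList // List(2, 4, 6)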

View file

@ -4,16 +4,21 @@
package akka.stream.impl.fusing
import scala.collection.immutable
import akka.stream.OverflowStrategy
import akka.stream.impl.FixedSizeBuffer
import akka.stream.stage._
import akka.stream._
import akka.stream.Supervision
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Try, Success, Failure }
import scala.annotation.tailrec
import scala.util.control.NonFatal
import akka.stream.impl.ReactiveStreamsCompliance
/**
* INTERNAL API
*/
private[akka] final case class Map[In, Out](f: In ⇒ Out, decider: Supervision.Decider) extends PushStage[In, Out] {
override def onPush(elem: In, ctx: Context[Out]): Directive = ctx.push(f(elem))
override def onPush(elem: In, ctx: Context[Out]): SyncDirective = ctx.push(f(elem))
override def decide(t: Throwable): Supervision.Directive = decider(t)
}
@ -22,7 +27,7 @@ private[akka] final case class Map[In, Out](f: In ⇒ Out, decider: Supervision.
* INTERNAL API
*/
private[akka] final case class Filter[T](p: T ⇒ Boolean, decider: Supervision.Decider) extends PushStage[T, T] {
override def onPush(elem: T, ctx: Context[T]): Directive =
override def onPush(elem: T, ctx: Context[T]): SyncDirective =
if (p(elem)) ctx.push(elem)
else ctx.pull()
@ -38,7 +43,7 @@ private[akka] final object Collect {
private[akka] final case class Collect[In, Out](decider: Supervision.Decider)(pf: PartialFunction[In, Out]) extends PushStage[In, Out] {
import Collect.NotApplied
override def onPush(elem: In, ctx: Context[Out]): Directive =
override def onPush(elem: In, ctx: Context[Out]): SyncDirective =
pf.applyOrElse(elem, NotApplied) match {
case NotApplied ⇒ ctx.pull()
case result: Out @unchecked ⇒ ctx.push(result)
@ -53,13 +58,13 @@ private[akka] final case class Collect[In, Out](decider: Supervision.Decider)(pf
private[akka] final case class MapConcat[In, Out](f: In ⇒ immutable.Seq[Out], decider: Supervision.Decider) extends PushPullStage[In, Out] {
private var currentIterator: Iterator[Out] = Iterator.empty
override def onPush(elem: In, ctx: Context[Out]): Directive = {
override def onPush(elem: In, ctx: Context[Out]): SyncDirective = {
currentIterator = f(elem).iterator
if (currentIterator.isEmpty) ctx.pull()
else ctx.push(currentIterator.next())
}
override def onPull(ctx: Context[Out]): Directive =
override def onPull(ctx: Context[Out]): SyncDirective =
if (currentIterator.hasNext) ctx.push(currentIterator.next())
else if (ctx.isFinishing) ctx.finish()
else ctx.pull()
@ -78,7 +83,7 @@ private[akka] final case class MapConcat[In, Out](f: In ⇒ immutable.Seq[Out],
private[akka] final case class Take[T](count: Long) extends PushStage[T, T] {
private var left: Long = count
override def onPush(elem: T, ctx: Context[T]): Directive = {
override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
left -= 1
if (left > 0) ctx.push(elem)
else if (left == 0) ctx.pushAndFinish(elem)
@ -91,7 +96,7 @@ private[akka] final case class Take[T](count: Long) extends PushStage[T, T] {
*/
private[akka] final case class Drop[T](count: Long) extends PushStage[T, T] {
private var left: Long = count
override def onPush(elem: T, ctx: Context[T]): Directive =
override def onPush(elem: T, ctx: Context[T]): SyncDirective =
if (left > 0) {
left -= 1
ctx.pull()
@ -104,13 +109,13 @@ private[akka] final case class Drop[T](count: Long) extends PushStage[T, T] {
private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out, decider: Supervision.Decider) extends PushPullStage[In, Out] {
private var aggregator = zero
override def onPush(elem: In, ctx: Context[Out]): Directive = {
override def onPush(elem: In, ctx: Context[Out]): SyncDirective = {
val old = aggregator
aggregator = f(old, elem)
ctx.push(old)
}
override def onPull(ctx: Context[Out]): Directive =
override def onPull(ctx: Context[Out]): SyncDirective =
if (ctx.isFinishing) ctx.pushAndFinish(aggregator)
else ctx.pull()
@ -127,12 +132,12 @@ private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out, de
private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out, decider: Supervision.Decider) extends PushPullStage[In, Out] {
private var aggregator = zero
override def onPush(elem: In, ctx: Context[Out]): Directive = {
override def onPush(elem: In, ctx: Context[Out]): SyncDirective = {
aggregator = f(aggregator, elem)
ctx.pull()
}
override def onPull(ctx: Context[Out]): Directive =
override def onPull(ctx: Context[Out]): SyncDirective =
if (ctx.isFinishing) ctx.pushAndFinish(aggregator)
else ctx.pull()
@ -154,7 +159,7 @@ private[akka] final case class Grouped[T](n: Int) extends PushPullStage[T, immut
}
private var left = n
override def onPush(elem: T, ctx: Context[immutable.Seq[T]]): Directive = {
override def onPush(elem: T, ctx: Context[immutable.Seq[T]]): SyncDirective = {
buf += elem
left -= 1
if (left == 0) {
@ -165,7 +170,7 @@ private[akka] final case class Grouped[T](n: Int) extends PushPullStage[T, immut
} else ctx.pull()
}
override def onPull(ctx: Context[immutable.Seq[T]]): Directive =
override def onPull(ctx: Context[immutable.Seq[T]]): SyncDirective =
if (ctx.isFinishing) {
val elem = buf.result()
buf.clear() //FIXME null out the reference to the `buf`?
@ -184,10 +189,10 @@ private[akka] final case class Grouped[T](n: Int) extends PushPullStage[T, immut
private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends DetachedStage[T, T] {
import OverflowStrategy._
private val buffer = FixedSizeBuffer(size)
private val buffer = FixedSizeBuffer[T](size)
override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective =
if (ctx.isHolding) ctx.pushAndPull(elem)
if (ctx.isHoldingDownstream) ctx.pushAndPull(elem)
else enqueueAction(ctx, elem)
override def onPull(ctx: DetachedContext[T]): DownstreamDirective = {
@ -195,8 +200,8 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt
val elem = buffer.dequeue().asInstanceOf[T]
if (buffer.isEmpty) ctx.pushAndFinish(elem)
else ctx.push(elem)
} else if (ctx.isHolding) ctx.pushAndPull(buffer.dequeue().asInstanceOf[T])
else if (buffer.isEmpty) ctx.hold()
} else if (ctx.isHoldingUpstream) ctx.pushAndPull(buffer.dequeue().asInstanceOf[T])
else if (buffer.isEmpty) ctx.holdDownstream()
else ctx.push(buffer.dequeue().asInstanceOf[T])
}
@ -223,7 +228,7 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt
}
case Backpressure ⇒ { (ctx, elem) ⇒
buffer.enqueue(elem)
if (buffer.isFull) ctx.hold()
if (buffer.isFull) ctx.holdUpstream()
else ctx.pull()
}
case Fail ⇒ { (ctx, elem) ⇒
@ -241,8 +246,8 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt
* INTERNAL API
*/
private[akka] final case class Completed[T]() extends PushPullStage[T, T] {
override def onPush(elem: T, ctx: Context[T]): Directive = ctx.finish()
override def onPull(ctx: Context[T]): Directive = ctx.finish()
override def onPush(elem: T, ctx: Context[T]): SyncDirective = ctx.finish()
override def onPull(ctx: Context[T]): SyncDirective = ctx.finish()
}
/**
@ -257,7 +262,7 @@ private[akka] final case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (O
if (agg == null) seed(elem)
else aggregate(agg.asInstanceOf[Out], elem)
if (!ctx.isHolding) ctx.pull()
if (!ctx.isHoldingDownstream) ctx.pull()
else {
val result = agg.asInstanceOf[Out]
agg = null
@ -273,7 +278,7 @@ private[akka] final case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (O
agg = null
ctx.pushAndFinish(result)
}
} else if (agg == null) ctx.hold()
} else if (agg == null) ctx.holdDownstream()
else {
val result = agg.asInstanceOf[Out]
if (result == null) throw new NullPointerException
@ -301,24 +306,24 @@ private[akka] final case class Expand[In, Out, Seed](seed: In ⇒ Seed, extrapol
s = seed(elem)
started = true
expanded = false
if (ctx.isHolding) {
if (ctx.isHoldingDownstream) {
val (emit, newS) = extrapolate(s)
s = newS
expanded = true
ctx.pushAndPull(emit)
} else ctx.hold()
} else ctx.holdUpstream()
}
override def onPull(ctx: DetachedContext[Out]): DownstreamDirective = {
if (ctx.isFinishing) {
if (!started) ctx.finish()
else ctx.pushAndFinish(extrapolate(s)._1)
} else if (!started) ctx.hold()
} else if (!started) ctx.holdDownstream()
else {
val (emit, newS) = extrapolate(s)
s = newS
expanded = true
if (ctx.isHolding) ctx.pushAndPull(emit)
if (ctx.isHoldingUpstream) ctx.pushAndPull(emit)
else ctx.push(emit)
}
@ -334,3 +339,155 @@ private[akka] final case class Expand[In, Out, Seed](seed: In ⇒ Seed, extrapol
final override def restart(): Expand[In, Out, Seed] =
throw new UnsupportedOperationException("Expand doesn't support restart")
}
/**
* INTERNAL API
*/
private[akka] object MapAsync {
val NotYetThere = Failure(new Exception)
}
/**
* INTERNAL API
*/
private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Future[Out], decider: Supervision.Decider)
extends AsyncStage[In, Out, (Int, Try[Out])] {
import MapAsync._
type Notification = (Int, Try[Out])
private var callback: AsyncCallback[Notification] = _
private val elemsInFlight = FixedSizeBuffer[Try[Out]](parallelism)
override def initAsyncInput(ctx: AsyncContext[Out, Notification]): Unit = {
callback = ctx.getAsyncCallback()
}
override def decide(ex: Throwable) = decider(ex)
override def onPush(elem: In, ctx: AsyncContext[Out, Notification]) = {
val future = f(elem)
val idx = elemsInFlight.enqueue(NotYetThere)
future.onComplete(t ⇒ callback.invoke((idx, t)))(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
if (elemsInFlight.isFull) ctx.holdUpstream()
else ctx.pull()
}
override def onPull(ctx: AsyncContext[Out, (Int, Try[Out])]) = {
@tailrec def rec(hasFreedUpSpace: Boolean): DownstreamDirective =
if (elemsInFlight.isEmpty && ctx.isFinishing) ctx.finish()
else if (elemsInFlight.isEmpty || elemsInFlight.peek == NotYetThere) {
if (hasFreedUpSpace && ctx.isHoldingUpstream) ctx.holdDownstreamAndPull()
else ctx.holdDownstream()
} else elemsInFlight.dequeue() match {
case Failure(ex) ⇒ rec(true)
case Success(elem) ⇒
if (ctx.isHoldingUpstream) ctx.pushAndPull(elem)
else ctx.push(elem)
}
rec(false)
}
override def onAsyncInput(input: (Int, Try[Out]), ctx: AsyncContext[Out, Notification]) = {
@tailrec def rec(): Directive =
if (elemsInFlight.isEmpty && ctx.isFinishing) ctx.finish()
else if (elemsInFlight.isEmpty || elemsInFlight.peek == NotYetThere) ctx.ignore()
else elemsInFlight.dequeue() match {
case Failure(ex) ⇒ rec()
case Success(elem) ⇒
if (ctx.isHoldingUpstream) ctx.pushAndPull(elem)
else ctx.push(elem)
}
input match {
case (idx, f @ Failure(ex)) ⇒
if (decider(ex) != Supervision.Stop) {
elemsInFlight.put(idx, f)
if (ctx.isHoldingDownstream) rec()
else ctx.ignore()
} else ctx.fail(ex)
case (idx, s: Success[_]) ⇒
val ex = try {
ReactiveStreamsCompliance.requireNonNullElement(s.value)
elemsInFlight.put(idx, s)
null: Exception
} catch {
case NonFatal(ex) ⇒
if (decider(ex) != Supervision.Stop) {
elemsInFlight.put(idx, Failure(ex))
null: Exception
} else ex
}
if (ex != null) ctx.fail(ex)
else if (ctx.isHoldingDownstream) rec()
else ctx.ignore()
}
}
override def onUpstreamFinish(ctx: AsyncContext[Out, Notification]) =
if (ctx.isHoldingUpstream || !elemsInFlight.isEmpty) ctx.absorbTermination()
else ctx.finish()
}
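A note on the ordering trick above: it relies on the `FixedSizeBuffer` contract that `enqueue` returns the index of the slot it filled, `put` overwrites that slot in place once the `Future` completes, and `peek`/`dequeue` always address the oldest slot. A minimal sketch of that assumed contract (illustrative, not the actual internal class):

  class RingBufferSketch[T <: AnyRef](capacity: Int) {
    private val buf = new Array[AnyRef](capacity)
    private var readIdx, writeIdx = 0
    def used: Int = writeIdx - readIdx
    def isFull: Boolean = used == capacity
    def isEmpty: Boolean = used == 0
    // returns the slot index so the caller can overwrite the placeholder later
    def enqueue(elem: T): Int = {
      val idx = writeIdx % capacity
      buf(idx) = elem
      writeIdx += 1
      idx
    }
    def put(idx: Int, elem: T): Unit = buf(idx) = elem // complete a placeholder in place
    def peek(): T = buf(readIdx % capacity).asInstanceOf[T]
    def dequeue(): T = {
      val elem = peek()
      readIdx += 1
      elem
    }
  }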
/**
* INTERNAL API
*/
private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: In ⇒ Future[Out], decider: Supervision.Decider)
extends AsyncStage[In, Out, Try[Out]] {
private var callback: AsyncCallback[Try[Out]] = _
private var inFlight = 0
private val buffer = FixedSizeBuffer[Out](parallelism)
private def todo = inFlight + buffer.used
override def initAsyncInput(ctx: AsyncContext[Out, Try[Out]]): Unit = {
callback = ctx.getAsyncCallback()
}
override def decide(ex: Throwable) = decider(ex)
override def onPush(elem: In, ctx: AsyncContext[Out, Try[Out]]) = {
val future = f(elem)
inFlight += 1
future.onComplete(callback.invoke)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
if (todo == parallelism) ctx.holdUpstream()
else ctx.pull()
}
override def onPull(ctx: AsyncContext[Out, Try[Out]]) =
if (buffer.isEmpty) {
if (ctx.isFinishing && inFlight == 0) ctx.finish() else ctx.holdDownstream()
} else {
val elem = buffer.dequeue()
if (ctx.isHoldingUpstream) ctx.pushAndPull(elem)
else ctx.push(elem)
}
override def onAsyncInput(input: Try[Out], ctx: AsyncContext[Out, Try[Out]]) = {
def ignoreOrFail(ex: Throwable) =
if (decider(ex) == Supervision.Stop) ctx.fail(ex)
else if (ctx.isHoldingUpstream) ctx.pull()
else ctx.ignore()
inFlight -= 1
input match {
case Failure(ex) ⇒ ignoreOrFail(ex)
case Success(elem) ⇒
if (elem == null) {
val ex = ReactiveStreamsCompliance.elementMustNotBeNullException
ignoreOrFail(ex)
} else if (ctx.isHoldingDownstream) {
if (ctx.isHoldingUpstream) ctx.pushAndPull(elem)
else ctx.push(elem)
} else {
buffer.enqueue(elem)
ctx.ignore()
}
}
}
override def onUpstreamFinish(ctx: AsyncContext[Out, Try[Out]]) =
if (todo > 0) ctx.absorbTermination()
else ctx.finish()
}
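The backpressure accounting in `MapAsyncUnordered` is worth spelling out: `todo = inFlight + buffer.used` counts futures still running plus results waiting to be emitted, so `onPush` holds upstream exactly when the stage already owns `parallelism` elements. A small worked check with hypothetical numbers:

  val parallelism = 4
  val inFlight = 2 // futures still running
  val buffered = 2 // completed results not yet emitted
  val todo = inFlight + buffered
  assert(todo == parallelism) // onPush would answer ctx.holdUpstream() here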
View file
@ -175,8 +175,8 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
*
* @see [[#mapAsyncUnordered]]
*/
def mapAsync[T](f: japi.Function[Out, Future[T]]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.mapAsync(f.apply))
def mapAsync[T](parallelism: Int, f: japi.Function[Out, Future[T]]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.mapAsync(parallelism, f.apply))
/**
* Transform this stream by applying the given function to each of the elements
@ -196,8 +196,8 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
*
* @see [[#mapAsync]]
*/
def mapAsyncUnordered[T](f: japi.Function[Out, Future[T]]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.mapAsyncUnordered(f.apply))
def mapAsyncUnordered[T](parallelism: Int, f: japi.Function[Out, Future[T]]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.mapAsyncUnordered(parallelism, f.apply))
/**
* Only pass on those elements that satisfy the given predicate.
View file
@ -278,8 +278,8 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
*
* @see [[#mapAsyncUnordered]]
*/
def mapAsync[T](f: japi.Function[Out, Future[T]]): javadsl.Source[T, Mat] =
new Source(delegate.mapAsync(f.apply))
def mapAsync[T](parallelism: Int, f: japi.Function[Out, Future[T]]): javadsl.Source[T, Mat] =
new Source(delegate.mapAsync(parallelism, f.apply))
/**
* Transform this stream by applying the given function to each of the elements
@ -291,8 +291,8 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
*
* @see [[#mapAsync]]
*/
def mapAsyncUnordered[T](f: japi.Function[Out, Future[T]]): javadsl.Source[T, Mat] =
new Source(delegate.mapAsyncUnordered(f.apply))
def mapAsyncUnordered[T](parallelism: Int, f: japi.Function[Out, Future[T]]): javadsl.Source[T, Mat] =
new Source(delegate.mapAsyncUnordered(parallelism, f.apply))
/**
* Only pass on those elements that satisfy the given predicate.
View file
@ -351,8 +351,9 @@ trait FlowOps[+Out, +Mat] {
/**
* Transform this stream by applying the given function to each of the elements
* as they pass through this processing step. The function returns a `Future` and the
* value of that future will be emitted downstreams. As many futures as requested elements by
* downstream may run in parallel and may complete in any order, but the elements that
* value of that future will be emitted downstream. The number of Futures
* that shall run in parallel is given as the first argument to ``mapAsync``.
* These Futures may complete in any order, but the elements that
* are emitted downstream are in the same order as received from upstream.
*
* If the function `f` throws an exception or if the `Future` is completed
@ -365,8 +366,8 @@ trait FlowOps[+Out, +Mat] {
*
* @see [[#mapAsyncUnordered]]
*/
def mapAsync[T](f: Out ⇒ Future[T]): Repr[T, Mat] =
andThen(MapAsync(f.asInstanceOf[Any ⇒ Future[Any]]))
def mapAsync[T](parallelism: Int, f: Out ⇒ Future[T]): Repr[T, Mat] =
andThen(MapAsync(parallelism, f.asInstanceOf[Any ⇒ Future[Any]]))
/**
* Transform this stream by applying the given function to each of the elements
@ -386,8 +387,8 @@ trait FlowOps[+Out, +Mat] {
*
* @see [[#mapAsync]]
*/
def mapAsyncUnordered[T](f: Out ⇒ Future[T]): Repr[T, Mat] =
andThen(MapAsyncUnordered(f.asInstanceOf[Any ⇒ Future[Any]]))
def mapAsyncUnordered[T](parallelism: Int, f: Out ⇒ Future[T]): Repr[T, Mat] =
andThen(MapAsyncUnordered(parallelism, f.asInstanceOf[Any ⇒ Future[Any]]))
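For callers the visible change is the new leading `parallelism` argument. A hedged usage sketch of the updated Scala API (the service call and the surrounding setup are illustrative only):

  import scala.concurrent.Future
  import akka.actor.ActorSystem
  import akka.stream.ActorFlowMaterializer
  import akka.stream.scaladsl.{ Sink, Source }

  implicit val system = ActorSystem("demo")
  implicit val mat = ActorFlowMaterializer()
  import system.dispatcher

  def lookup(n: Int): Future[String] = Future(n.toString) // stand-in for a real service

  Source(1 to 100)
    .mapAsync(4, lookup) // at most 4 futures in flight, output order preserved
    .mapAsyncUnordered(4, s => Future(s.length)) // output order follows completion
    .runWith(Sink.foreach[Int](println))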
/**
* Only pass on those elements that satisfy the given predicate.
View file
@ -17,6 +17,7 @@ import akka.stream.FlowMaterializer
import akka.stream.impl.StreamLayout.Module
import scala.util.control.NonFatal
import akka.stream.Supervision
import akka.stream.stage.SyncDirective
/**
* A `Sink` is a set of stream processing steps that has one open input and an attached output.
@ -113,7 +114,7 @@ object Sink extends SinkApply {
val promise = Promise[Unit]()
val stage = new PushStage[T, Unit] {
override def onPush(elem: T, ctx: Context[Unit]): Directive = {
override def onPush(elem: T, ctx: Context[Unit]): SyncDirective = {
f(elem)
ctx.pull()
}
@ -154,7 +155,7 @@ object Sink extends SinkApply {
val stage = new PushStage[T, U] {
private var aggregator = zero
override def onPush(elem: T, ctx: Context[U]): Directive = {
override def onPush(elem: T, ctx: Context[U]): SyncDirective = {
aggregator = f(aggregator, elem)
ctx.pull()
}
@ -191,7 +192,7 @@ object Sink extends SinkApply {
def newOnCompleteStage(): PushStage[T, Unit] = {
new PushStage[T, Unit] {
override def onPush(elem: T, ctx: Context[Unit]): Directive = ctx.pull()
override def onPush(elem: T, ctx: Context[Unit]): SyncDirective = ctx.pull()
override def onUpstreamFailure(cause: Throwable, ctx: Context[Unit]): TerminationDirective = {
callback(Failure(cause))
ctx.fail(cause)
View file
@ -8,7 +8,6 @@ import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule }
import akka.stream.{ SourceShape, Inlet, Outlet }
import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
import akka.stream.stage.{ TerminationDirective, Directive, Context, PushPullStage }
import scala.annotation.unchecked.uncheckedVariance
import scala.language.higherKinds
import akka.actor.Props
@ -23,6 +22,7 @@ import akka.actor.Cancellable
import akka.actor.ActorRef
import scala.concurrent.Promise
import org.reactivestreams.Subscriber
import akka.stream.stage.SyncDirective
/**
* A `Source` is a set of stream processing steps that has one open output. It can comprise
@ -198,6 +198,7 @@ object Source extends SourceApply {
* A graph with the shape of a source logically is a source, this method makes
* it so also in type.
*/
// TODO optimize if no wrapping needed
def wrap[T, M](g: Graph[SourceShape[T], M]): Source[T, M] = new Source(g.module)
/**
@ -218,7 +219,7 @@ object Source extends SourceApply {
def initIterator(): Unit = if (iterator eq null) iterator = iterable.iterator
// Upstream is guaranteed to be empty
override def onPush(elem: Nothing, ctx: Context[T]): Directive =
override def onPush(elem: Nothing, ctx: Context[T]): SyncDirective =
throw new UnsupportedOperationException("The IterableSource stage cannot be pushed")
override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = {
@ -227,7 +228,7 @@ object Source extends SourceApply {
else ctx.finish()
}
override def onPull(ctx: Context[T]): Directive = {
override def onPull(ctx: Context[T]): SyncDirective = {
if (!ctx.isFinishing) {
initIterator()
ctx.pull()
View file
@ -4,6 +4,7 @@
package akka.stream.stage
import akka.stream.Supervision
import akka.stream.FlowMaterializer
/**
* General interface for stream transformation.
@ -27,10 +28,68 @@ import akka.stream.Supervision
*/
sealed trait Stage[-In, Out]
private[stream] abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, Ctx <: Context[Out]] extends Stage[In, Out] {
private[stream] var holding = false
private[stream] var allowedToPush = false
private[stream] var terminationPending = false
/**
* INTERNAL API
*/
private[stream] object AbstractStage {
final val UpstreamBall = 1
final val DownstreamBall = 2
final val BothBalls = UpstreamBall | DownstreamBall
final val PrecedingWasPull = 0x4000
final val TerminationPending = 0x8000
}
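The `bits` field below replaces the former boolean flags (`holding`, `allowedToPush`, `terminationPending`) with masked bit operations over these constants. A hedged illustration of the intended bookkeeping (not the interpreter's actual code):

  import AbstractStage._
  var bits = 0
  bits |= UpstreamBall // this stage now holds the upstream ball
  val holdingBoth = (bits & BothBalls) == BothBalls
  bits &= ~UpstreamBall // ball handed back to the interpreter
  bits |= TerminationPending // remember an absorbed termination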
abstract class AbstractStage[-In, Out, PushD <: Directive, PullD <: Directive, Ctx <: Context[Out]] extends Stage[In, Out] {
/**
* INTERNAL API
*/
private[stream] var bits = 0
/**
* INTERNAL API
*/
private[stream] var context: Ctx = _
/**
* INTERNAL API
*/
private[stream] def isDetached: Boolean = false
/**
* INTERNAL API
*/
private[stream] def enterAndPush(elem: Out): Unit = {
context.enter()
context.push(elem)
context.execute()
}
/**
* INTERNAL API
*/
private[stream] def enterAndPull(): Unit = {
context.enter()
context.pull()
context.execute()
}
/**
* INTERNAL API
*/
private[stream] def enterAndFinish(): Unit = {
context.enter()
context.finish()
context.execute()
}
/**
* INTERNAL API
*/
private[stream] def enterAndFail(e: Throwable): Unit = {
context.enter()
context.fail(e)
context.execute()
}
/**
* `onPush` is called when an element from upstream is available and there is demand from downstream, i.e.
@ -154,7 +213,7 @@ private[stream] abstract class AbstractStage[-In, Out, PushD <: Directive, PullD
* @see [[StatefulStage]]
* @see [[PushStage]]
*/
abstract class PushPullStage[In, Out] extends AbstractStage[In, Out, Directive, Directive, Context[Out]]
abstract class PushPullStage[In, Out] extends AbstractStage[In, Out, SyncDirective, SyncDirective, Context[Out]]
/**
* `PushStage` is a [[PushPullStage]] that always performs the transitive pull by calling `ctx.pull` from `onPull`.
@ -163,7 +222,7 @@ abstract class PushStage[In, Out] extends PushPullStage[In, Out] {
/**
* Always pulls from upstream.
*/
final override def onPull(ctx: Context[Out]): Directive = ctx.pull()
final override def onPull(ctx: Context[Out]): SyncDirective = ctx.pull()
}
/**
@ -188,7 +247,9 @@ abstract class PushStage[In, Out] extends PushPullStage[In, Out] {
*
* @see [[PushPullStage]]
*/
abstract class DetachedStage[In, Out] extends AbstractStage[In, Out, UpstreamDirective, DownstreamDirective, DetachedContext[Out]] {
abstract class DetachedStage[In, Out]
extends AbstractStage[In, Out, UpstreamDirective, DownstreamDirective, DetachedContext[Out]] {
private[stream] override def isDetached = true
/**
* If an exception is thrown from [[#onPush]] this method is invoked to decide how
@ -203,13 +264,40 @@ abstract class DetachedStage[In, Out] extends AbstractStage[In, Out, UpstreamDir
override def decide(t: Throwable): Supervision.Directive = super.decide(t)
}
/**
* This is a variant of [[DetachedStage]] that can receive asynchronous input
* from external sources, for example timers or Future results. In order to
* do this, obtain an [[AsyncCallback]] from the [[AsyncContext]] and attach
* it to the asynchronous event. When the event fires, an asynchronous notification
* will be dispatched that will eventually lead to `onAsyncInput` being invoked
* with the provided data item.
*/
abstract class AsyncStage[In, Out, Ext]
extends AbstractStage[In, Out, UpstreamDirective, DownstreamDirective, AsyncContext[Out, Ext]] {
private[stream] override def isDetached = true
/**
* Initialization hook for the asynchronous side of this Stage. It can be
* overridden to set initial asynchronous requests in motion or to schedule
* asynchronous events.
*/
def initAsyncInput(ctx: AsyncContext[Out, Ext]): Unit = ()
/**
* Implement this method to define the action to be taken in response to an
* asynchronous notification dispatched via a callback that was previously
* obtained using [[AsyncContext#getAsyncCallback]].
*/
def onAsyncInput(event: Ext, ctx: AsyncContext[Out, Ext]): Directive
}
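To make the contract concrete, here is a hedged sketch of a minimal `AsyncStage` that emits each element only after an external timer fires; the stage and its `scheduler` parameter are illustrative and not part of this commit:

  import scala.concurrent.ExecutionContext
  import scala.concurrent.duration.FiniteDuration

  class DelayOne[T](delay: FiniteDuration, scheduler: akka.actor.Scheduler)(
    implicit ec: ExecutionContext) extends AsyncStage[T, T, Unit] {
    private var callback: AsyncCallback[Unit] = _
    private var pending: T = _
    private var ready = false

    override def initAsyncInput(ctx: AsyncContext[T, Unit]): Unit =
      callback = ctx.getAsyncCallback()

    override def onPush(elem: T, ctx: AsyncContext[T, Unit]) = {
      pending = elem
      ready = false
      scheduler.scheduleOnce(delay)(callback.invoke(())) // wakes the stage up later
      ctx.holdUpstream() // no new element until this one has been emitted
    }

    override def onPull(ctx: AsyncContext[T, Unit]) =
      if (ready) { ready = false; ctx.pushAndPull(pending) } // emit, resume upstream
      else ctx.holdDownstream() // nothing ready yet; onAsyncInput will deliver

    override def onAsyncInput(event: Unit, ctx: AsyncContext[T, Unit]) = {
      ready = true
      if (ctx.isHoldingDownstream) { ready = false; ctx.pushAndPull(pending) }
      else ctx.ignore() // downstream has not pulled yet
    }
  }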
/**
* The behavior of [[StatefulStage]] is defined by these two methods, which
* have the same semantics as the corresponding methods in [[PushPullStage]].
*/
abstract class StageState[In, Out] {
def onPush(elem: In, ctx: Context[Out]): Directive
def onPull(ctx: Context[Out]): Directive = ctx.pull()
def onPush(elem: In, ctx: Context[Out]): SyncDirective
def onPull(ctx: Context[Out]): SyncDirective = ctx.pull()
}
/**
@ -268,11 +356,11 @@ abstract class StatefulStage[In, Out] extends PushPullStage[In, Out] {
/**
* Invokes current state.
*/
final override def onPush(elem: In, ctx: Context[Out]): Directive = _current.onPush(elem, ctx)
final override def onPush(elem: In, ctx: Context[Out]): SyncDirective = _current.onPush(elem, ctx)
/**
* Invokes current state.
*/
final override def onPull(ctx: Context[Out]): Directive = _current.onPull(ctx)
final override def onPull(ctx: Context[Out]): SyncDirective = _current.onPull(ctx)
override def onUpstreamFinish(ctx: Context[Out]): TerminationDirective =
if (emitting) ctx.absorbTermination()
@ -282,13 +370,13 @@ abstract class StatefulStage[In, Out] extends PushPullStage[In, Out] {
* Scala API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one
* element downstream.
*/
final def emit(iter: Iterator[Out], ctx: Context[Out]): Directive = emit(iter, ctx, _current)
final def emit(iter: Iterator[Out], ctx: Context[Out]): SyncDirective = emit(iter, ctx, _current)
/**
* Java API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one
* element downstream.
*/
final def emit(iter: java.util.Iterator[Out], ctx: Context[Out]): Directive = {
final def emit(iter: java.util.Iterator[Out], ctx: Context[Out]): SyncDirective = {
import scala.collection.JavaConverters._
emit(iter.asScala, ctx)
}
@ -297,7 +385,7 @@ abstract class StatefulStage[In, Out] extends PushPullStage[In, Out] {
* Scala API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one
* element downstream and after that change behavior.
*/
final def emit(iter: Iterator[Out], ctx: Context[Out], nextState: StageState[In, Out]): Directive = {
final def emit(iter: Iterator[Out], ctx: Context[Out], nextState: StageState[In, Out]): SyncDirective = {
if (emitting) throw new IllegalStateException("already in emitting state")
if (iter.isEmpty) {
become(nextState)
@ -317,7 +405,7 @@ abstract class StatefulStage[In, Out] extends PushPullStage[In, Out] {
* Java API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one
* element downstream and after that change behavior.
*/
final def emit(iter: java.util.Iterator[Out], ctx: Context[Out], nextState: StageState[In, Out]): Directive = {
final def emit(iter: java.util.Iterator[Out], ctx: Context[Out], nextState: StageState[In, Out]): SyncDirective = {
import scala.collection.JavaConverters._
emit(iter.asScala, ctx, nextState)
}
@ -326,7 +414,7 @@ abstract class StatefulStage[In, Out] extends PushPullStage[In, Out] {
* Scala API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one
* element downstream and after that finish (complete downstream, cancel upstream).
*/
final def emitAndFinish(iter: Iterator[Out], ctx: Context[Out]): Directive = {
final def emitAndFinish(iter: Iterator[Out], ctx: Context[Out]): SyncDirective = {
if (emitting) throw new IllegalStateException("already in emitting state")
if (iter.isEmpty)
ctx.finish()
@ -345,7 +433,7 @@ abstract class StatefulStage[In, Out] extends PushPullStage[In, Out] {
* Java API: Can be used from [[StageState#onPush]] or [[StageState#onPull]] to push more than one
* element downstream and after that finish (complete downstream, cancel upstream).
*/
final def emitAndFinish(iter: java.util.Iterator[Out], ctx: Context[Out]): Directive = {
final def emitAndFinish(iter: java.util.Iterator[Out], ctx: Context[Out]): SyncDirective = {
import scala.collection.JavaConverters._
emitAndFinish(iter.asScala, ctx)
}
@ -399,20 +487,53 @@ abstract class StatefulStage[In, Out] extends PushPullStage[In, Out] {
}
/**
* INTERNAL API
*
* `BoundaryStage` implementations are meant to communicate with the external world. These stages do not have most of the
* safety properties enforced and should be used carefully. One important ability of BoundaryStages is that they can take
* an execution signal off by calling `ctx.exit()`. This is typically used immediately after an external signal has
* been produced (for example an actor message). BoundaryStages can also kickstart execution by calling `enter()`, which
* returns a context they can use to inject signals into the interpreter. There are no checks in place to enforce that
* the number of signals taken out by exit() and the number of signals returned via enter() are the same -- using this
* stage type needs extra care from the implementer.
*
* BoundaryStages are the elements that make the interpreter *tick*; there is no other way to start the interpreter
* than using a BoundaryStage.
*/
private[akka] abstract class BoundaryStage extends AbstractStage[Any, Any, Directive, Directive, BoundaryContext] {
final override def decide(t: Throwable): Supervision.Directive = Supervision.Stop
final override def restart(): BoundaryStage =
throw new UnsupportedOperationException("BoundaryStage doesn't support restart")
}
/**
* Return type from [[Context]] methods.
*/
sealed trait Directive
sealed trait UpstreamDirective extends Directive
sealed trait DownstreamDirective extends Directive
sealed trait TerminationDirective extends Directive
sealed trait AsyncDirective extends Directive
sealed trait SyncDirective extends Directive
sealed trait UpstreamDirective extends SyncDirective
sealed trait DownstreamDirective extends SyncDirective
sealed trait TerminationDirective extends SyncDirective
// never instantiated
sealed abstract class FreeDirective private () extends UpstreamDirective with DownstreamDirective with TerminationDirective
sealed abstract class FreeDirective private () extends UpstreamDirective with DownstreamDirective with TerminationDirective with AsyncDirective
/**
* Passed to the callback methods of [[PushPullStage]] and [[StatefulStage]].
*/
sealed trait Context[Out] {
/**
* INTERNAL API
*/
private[stream] def enter(): Unit
/**
* INTERNAL API
*/
private[stream] def execute(): Unit
/**
* Push one element downstream.
*/
@ -444,6 +565,12 @@ sealed trait Context[Out] {
* This returns `true` after [[#absorbTermination]] has been used.
*/
def isFinishing: Boolean
/**
* Returns the FlowMaterializer that was used to materialize this [[Stage]].
* It can be used to materialize sub-flows.
*/
def materializer: FlowMaterializer
}
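The newly added `materializer` accessor lets a stage run sub-flows of its own. A hedged sketch (the side-channel stage is illustrative only):

  import akka.stream.scaladsl.{ Sink, Source }

  class AuditTap[T] extends PushStage[T, T] {
    override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
      // run a one-element sub-stream with this stage's own materializer
      Source.single(elem).runWith(Sink.ignore)(ctx.materializer)
      ctx.push(elem)
    }
  }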
/**
@ -455,18 +582,61 @@ sealed trait Context[Out] {
* events making the balance right again: 1 hold + 1 external event = 2 external events
*/
trait DetachedContext[Out] extends Context[Out] {
def hold(): FreeDirective
def holdUpstream(): UpstreamDirective
def holdUpstreamAndPush(elem: Out): UpstreamDirective
def holdDownstream(): DownstreamDirective
def holdDownstreamAndPull(): DownstreamDirective
/**
* This returns `true` when [[#holdUpstream]] or [[#holdDownstream]] has been used
* and it is reset to `false` after [[#pushAndPull]].
*/
def isHolding: Boolean
def isHoldingBoth: Boolean = isHoldingUpstream && isHoldingDownstream
def isHoldingUpstream: Boolean
def isHoldingDownstream: Boolean
def pushAndPull(elem: Out): FreeDirective
}
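The directional split of the former `hold()` is easiest to see in a small detached stage; a hedged sketch of a one-slot buffer (illustrative, not part of this commit):

  class BufferOne[T] extends DetachedStage[T, T] {
    private var slot: Option[T] = None

    override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective =
      if (ctx.isHoldingDownstream) ctx.pushAndPull(elem) // downstream already waiting
      else { slot = Some(elem); ctx.holdUpstream() } // park upstream until pulled

    override def onPull(ctx: DetachedContext[T]): DownstreamDirective =
      slot match {
        case Some(elem) => slot = None; ctx.pushAndPull(elem) // emit, release upstream
        case None => ctx.holdDownstream() // wait for the next element
      }
  }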
/**
* An asynchronous callback holder that is attached to an [[AsyncContext]].
* Invoking [[AsyncCallback#invoke]] will eventually lead to [[AsyncStage#onAsyncInput]]
* being called.
*/
trait AsyncCallback[T] {
/**
* Dispatch an asynchronous notification. This method is thread-safe and
* may be invoked from external execution contexts.
*/
def invoke(t: T): Unit
}
/**
* This kind of context is available to [[AsyncStage]]. It implements the same
* interface as for [[DetachedStage]] with the addition of being able to obtain
* [[AsyncCallback]] objects that allow the registration of asynchronous
* notifications.
*/
trait AsyncContext[Out, Ext] extends DetachedContext[Out] {
/**
* Obtain a callback object that can be used asynchronously to re-enter the
* current [[AsyncStage]] with an asynchronous notification. After the
* callback has been invoked, [[AsyncStage#onAsyncInput]] will eventually
* be called with the given data item.
*
* This object can be cached and reused within the same [[AsyncStage]].
*/
def getAsyncCallback(): AsyncCallback[Ext]
/**
* In response to an asynchronous notification an [[AsyncStage]] may choose
* to neither push nor pull nor terminate, which is represented as this
* directive.
*/
def ignore(): AsyncDirective
}
/**
* INTERNAL API
*/