!str,htc replace and remove OneBoundedInterpreter
main work by @drewhk with contributions from @2m and @rkuhn This work uncovered many well-hidden bugs in existing Stages, in particular StatefulStage. These were hidden by the behavior of OneBoundedInterpreter that normally behaves more orderly than it guarantees in general, especially with respect to the timeliness of delivery of upstream termination signals; the bugs were then that internal state was not flushed when onComplete arrived “too early”.
This commit is contained in:
parent
20f54435f1
commit
556012b7ee
107 changed files with 2456 additions and 3061 deletions
|
|
@ -10,7 +10,6 @@ import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
|
|||
import akka.stream.impl.{ FixedSizeBuffer, ReactiveStreamsCompliance }
|
||||
import akka.stream.stage._
|
||||
import akka.stream.{ Supervision, _ }
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.immutable
|
||||
import scala.collection.immutable.VectorBuilder
|
||||
|
|
@ -18,6 +17,7 @@ import scala.concurrent.Future
|
|||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.util.control.NonFatal
|
||||
import scala.util.{ Failure, Success, Try }
|
||||
import akka.stream.ActorAttributes.SupervisionStrategy
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -537,149 +537,150 @@ private[akka] object MapAsync {
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Future[Out], decider: Supervision.Decider)
|
||||
extends AsyncStage[In, Out, (Int, Try[Out])] {
|
||||
private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Future[Out])
|
||||
extends GraphStage[FlowShape[In, Out]] {
|
||||
|
||||
import MapAsync._
|
||||
|
||||
type Notification = (Int, Try[Out])
|
||||
private val in = Inlet[In]("in")
|
||||
private val out = Outlet[Out]("out")
|
||||
|
||||
private var callback: AsyncCallback[Notification] = _
|
||||
private val elemsInFlight = FixedSizeBuffer[Try[Out]](parallelism)
|
||||
override val shape = FlowShape(in, out)
|
||||
|
||||
override def preStart(ctx: AsyncContext[Out, Notification]): Unit = {
|
||||
callback = ctx.getAsyncCallback
|
||||
}
|
||||
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
|
||||
override def toString = s"MapAsync.Logic(buffer=$buffer)"
|
||||
|
||||
override def decide(ex: Throwable) = decider(ex)
|
||||
val decider =
|
||||
inheritedAttributes.getAttribute(classOf[SupervisionStrategy])
|
||||
.map(_.decider).getOrElse(Supervision.stoppingDecider)
|
||||
|
||||
override def onPush(elem: In, ctx: AsyncContext[Out, Notification]) = {
|
||||
val future = f(elem)
|
||||
val idx = elemsInFlight.enqueue(NotYetThere)
|
||||
future.onComplete(t ⇒ callback.invoke((idx, t)))(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
|
||||
if (elemsInFlight.isFull) ctx.holdUpstream()
|
||||
else ctx.pull()
|
||||
}
|
||||
val buffer = FixedSizeBuffer[Try[Out]](parallelism)
|
||||
def todo = buffer.used
|
||||
|
||||
override def onPull(ctx: AsyncContext[Out, (Int, Try[Out])]) = {
|
||||
@tailrec def rec(): DownstreamDirective =
|
||||
if (elemsInFlight.isEmpty && ctx.isFinishing) ctx.finish()
|
||||
else if (elemsInFlight.isEmpty || elemsInFlight.peek == NotYetThere) {
|
||||
if (!elemsInFlight.isFull && ctx.isHoldingUpstream) ctx.holdDownstreamAndPull()
|
||||
else ctx.holdDownstream()
|
||||
} else elemsInFlight.dequeue() match {
|
||||
case Failure(ex) ⇒ rec()
|
||||
@tailrec private def pushOne(): Unit =
|
||||
if (buffer.isEmpty) {
|
||||
if (isClosed(in)) completeStage()
|
||||
else if (!hasBeenPulled(in)) pull(in)
|
||||
} else if (buffer.peek == NotYetThere) {
|
||||
if (todo < parallelism && !hasBeenPulled(in)) tryPull(in)
|
||||
} else buffer.dequeue() match {
|
||||
case Failure(ex) ⇒ pushOne()
|
||||
case Success(elem) ⇒
|
||||
if (ctx.isHoldingUpstream) ctx.pushAndPull(elem)
|
||||
else ctx.push(elem)
|
||||
}
|
||||
rec()
|
||||
}
|
||||
|
||||
override def onAsyncInput(input: (Int, Try[Out]), ctx: AsyncContext[Out, Notification]) = {
|
||||
@tailrec def rec(): Directive =
|
||||
if (elemsInFlight.isEmpty && ctx.isFinishing) ctx.finish()
|
||||
else if (elemsInFlight.isEmpty || (elemsInFlight.peek eq NotYetThere)) {
|
||||
if (!elemsInFlight.isFull && ctx.isHoldingUpstream) ctx.pull()
|
||||
else ctx.ignore()
|
||||
} else elemsInFlight.dequeue() match {
|
||||
case Failure(ex) ⇒ rec()
|
||||
case Success(elem) ⇒
|
||||
if (ctx.isHoldingUpstream) ctx.pushAndPull(elem)
|
||||
else ctx.push(elem)
|
||||
push(out, elem)
|
||||
if (todo < parallelism && !hasBeenPulled(in)) tryPull(in)
|
||||
}
|
||||
|
||||
input match {
|
||||
case (idx, f @ Failure(ex)) ⇒
|
||||
if (decider(ex) != Supervision.Stop) {
|
||||
elemsInFlight.put(idx, f)
|
||||
if (ctx.isHoldingDownstream) rec()
|
||||
else ctx.ignore()
|
||||
} else ctx.fail(ex)
|
||||
case (idx, s: Success[_]) ⇒
|
||||
val exception = try {
|
||||
ReactiveStreamsCompliance.requireNonNullElement(s.value)
|
||||
elemsInFlight.put(idx, s)
|
||||
null: Exception
|
||||
def failOrPull(idx: Int, f: Failure[Out]) =
|
||||
if (decider(f.exception) == Supervision.Stop) failStage(f.exception)
|
||||
else {
|
||||
buffer.put(idx, f)
|
||||
if (isAvailable(out)) pushOne()
|
||||
}
|
||||
|
||||
val futureCB =
|
||||
getAsyncCallback[(Int, Try[Out])]({
|
||||
case (idx, f: Failure[_]) ⇒ failOrPull(idx, f)
|
||||
case (idx, s @ Success(elem)) ⇒
|
||||
if (elem == null) {
|
||||
val ex = ReactiveStreamsCompliance.elementMustNotBeNullException
|
||||
failOrPull(idx, Failure(ex))
|
||||
} else {
|
||||
buffer.put(idx, s)
|
||||
if (isAvailable(out)) pushOne()
|
||||
}
|
||||
})
|
||||
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
try {
|
||||
val future = f(grab(in))
|
||||
val idx = buffer.enqueue(NotYetThere)
|
||||
future.onComplete(result ⇒ futureCB.invoke(idx -> result))(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒
|
||||
if (decider(ex) != Supervision.Stop) {
|
||||
elemsInFlight.put(idx, Failure(ex))
|
||||
null: Exception
|
||||
} else ex
|
||||
if (decider(ex) == Supervision.Stop) failStage(ex)
|
||||
}
|
||||
if (exception != null) ctx.fail(exception)
|
||||
else if (ctx.isHoldingDownstream) rec()
|
||||
else ctx.ignore()
|
||||
}
|
||||
}
|
||||
if (todo < parallelism) tryPull(in)
|
||||
}
|
||||
override def onUpstreamFinish(): Unit = {
|
||||
if (todo == 0) completeStage()
|
||||
}
|
||||
})
|
||||
|
||||
override def onUpstreamFinish(ctx: AsyncContext[Out, Notification]) =
|
||||
if (ctx.isHoldingUpstream || !elemsInFlight.isEmpty) ctx.absorbTermination()
|
||||
else ctx.finish()
|
||||
setHandler(out, new OutHandler {
|
||||
override def onPull(): Unit = pushOne()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: In ⇒ Future[Out], decider: Supervision.Decider)
|
||||
extends AsyncStage[In, Out, Try[Out]] {
|
||||
private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: In ⇒ Future[Out])
|
||||
extends GraphStage[FlowShape[In, Out]] {
|
||||
|
||||
private var callback: AsyncCallback[Try[Out]] = _
|
||||
private var inFlight = 0
|
||||
private val buffer = FixedSizeBuffer[Out](parallelism)
|
||||
private val in = Inlet[In]("in")
|
||||
private val out = Outlet[Out]("out")
|
||||
|
||||
private def todo = inFlight + buffer.used
|
||||
override val shape = FlowShape(in, out)
|
||||
|
||||
override def preStart(ctx: AsyncContext[Out, Try[Out]]): Unit =
|
||||
callback = ctx.getAsyncCallback
|
||||
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
|
||||
override def toString = s"MapAsyncUnordered.Logic(inFlight=$inFlight, buffer=$buffer)"
|
||||
|
||||
override def decide(ex: Throwable) = decider(ex)
|
||||
val decider =
|
||||
inheritedAttributes.getAttribute(classOf[SupervisionStrategy])
|
||||
.map(_.decider).getOrElse(Supervision.stoppingDecider)
|
||||
|
||||
override def onPush(elem: In, ctx: AsyncContext[Out, Try[Out]]) = {
|
||||
val future = f(elem)
|
||||
inFlight += 1
|
||||
future.onComplete(callback.invoke)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
|
||||
if (todo == parallelism) ctx.holdUpstream()
|
||||
else ctx.pull()
|
||||
}
|
||||
var inFlight = 0
|
||||
val buffer = FixedSizeBuffer[Out](parallelism)
|
||||
def todo = inFlight + buffer.used
|
||||
|
||||
override def onPull(ctx: AsyncContext[Out, Try[Out]]) =
|
||||
if (buffer.isEmpty) {
|
||||
if (ctx.isFinishing && inFlight == 0) ctx.finish() else ctx.holdDownstream()
|
||||
} else {
|
||||
val elem = buffer.dequeue()
|
||||
if (ctx.isHoldingUpstream) ctx.pushAndPull(elem)
|
||||
else ctx.push(elem)
|
||||
}
|
||||
def failOrPull(ex: Throwable) =
|
||||
if (decider(ex) == Supervision.Stop) failStage(ex)
|
||||
else if (isClosed(in) && todo == 0) completeStage()
|
||||
else if (!hasBeenPulled(in)) tryPull(in)
|
||||
|
||||
override def onAsyncInput(input: Try[Out], ctx: AsyncContext[Out, Try[Out]]) = {
|
||||
def ignoreOrFail(ex: Throwable) =
|
||||
if (decider(ex) == Supervision.Stop) ctx.fail(ex)
|
||||
else if (ctx.isHoldingUpstream) ctx.pull()
|
||||
else if (ctx.isFinishing && todo == 0) ctx.finish()
|
||||
else ctx.ignore()
|
||||
|
||||
inFlight -= 1
|
||||
input match {
|
||||
case Failure(ex) ⇒ ignoreOrFail(ex)
|
||||
case Success(elem) ⇒
|
||||
if (elem == null) {
|
||||
val ex = ReactiveStreamsCompliance.elementMustNotBeNullException
|
||||
ignoreOrFail(ex)
|
||||
} else if (ctx.isHoldingDownstream) {
|
||||
if (ctx.isHoldingUpstream) ctx.pushAndPull(elem)
|
||||
else ctx.push(elem)
|
||||
} else {
|
||||
buffer.enqueue(elem)
|
||||
ctx.ignore()
|
||||
val futureCB =
|
||||
getAsyncCallback((result: Try[Out]) ⇒ {
|
||||
inFlight -= 1
|
||||
result match {
|
||||
case Failure(ex) ⇒ failOrPull(ex)
|
||||
case Success(elem) ⇒
|
||||
if (elem == null) {
|
||||
val ex = ReactiveStreamsCompliance.elementMustNotBeNullException
|
||||
failOrPull(ex)
|
||||
} else if (isAvailable(out)) {
|
||||
if (!hasBeenPulled(in)) tryPull(in)
|
||||
push(out, elem)
|
||||
} else buffer.enqueue(elem)
|
||||
}
|
||||
}
|
||||
}
|
||||
}).invoke _
|
||||
|
||||
override def onUpstreamFinish(ctx: AsyncContext[Out, Try[Out]]) =
|
||||
if (todo > 0) ctx.absorbTermination()
|
||||
else ctx.finish()
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
try {
|
||||
val future = f(grab(in))
|
||||
inFlight += 1
|
||||
future.onComplete(futureCB)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒
|
||||
if (decider(ex) == Supervision.Stop) failStage(ex)
|
||||
}
|
||||
if (todo < parallelism) tryPull(in)
|
||||
}
|
||||
override def onUpstreamFinish(): Unit = {
|
||||
if (todo == 0) completeStage()
|
||||
}
|
||||
})
|
||||
|
||||
setHandler(out, new OutHandler {
|
||||
override def onPull(): Unit = {
|
||||
if (!buffer.isEmpty) push(out, buffer.dequeue())
|
||||
else if (isClosed(in) && todo == 0) completeStage()
|
||||
if (todo < parallelism && !hasBeenPulled(in)) tryPull(in)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -695,7 +696,7 @@ private[akka] final case class Log[T](name: String, extract: T ⇒ Any, logAdapt
|
|||
// TODO more optimisations can be done here - prepare logOnPush function etc
|
||||
|
||||
override def preStart(ctx: LifecycleContext): Unit = {
|
||||
logLevels = ctx.attributes.logLevels.getOrElse(DefaultLogLevels)
|
||||
logLevels = ctx.attributes.get[LogLevels](DefaultLogLevels)
|
||||
log = logAdapter match {
|
||||
case Some(l) ⇒ l
|
||||
case _ ⇒
|
||||
|
|
@ -787,7 +788,7 @@ private[stream] class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphS
|
|||
val out = Outlet[immutable.Seq[T]]("out")
|
||||
val shape = FlowShape(in, out)
|
||||
|
||||
override def createLogic: GraphStageLogic = new GraphStageLogic(shape) {
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
|
||||
private val buf: VectorBuilder[T] = new VectorBuilder
|
||||
// True if:
|
||||
// - buf is nonEmpty
|
||||
|
|
@ -855,13 +856,17 @@ private[stream] class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphS
|
|||
|
||||
private[stream] class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
|
||||
|
||||
override def createLogic: GraphStageLogic = new SimpleLinearStageLogic {
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = push(out, grab(in))
|
||||
override def onUpstreamFinish(): Unit = completeStage()
|
||||
override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
|
||||
})
|
||||
|
||||
setHandler(out, new OutHandler {
|
||||
override def onPull(): Unit = pull(in)
|
||||
})
|
||||
|
||||
final override protected def onTimer(key: Any): Unit =
|
||||
completeStage()
|
||||
|
||||
|
|
@ -874,7 +879,7 @@ private[stream] class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinea
|
|||
private[stream] class DropWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
|
||||
private var allow = false
|
||||
|
||||
override def createLogic: GraphStageLogic = new SimpleLinearStageLogic {
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit =
|
||||
if (allow) push(out, grab(in))
|
||||
|
|
@ -883,6 +888,10 @@ private[stream] class DropWithin[T](timeout: FiniteDuration) extends SimpleLinea
|
|||
override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
|
||||
})
|
||||
|
||||
setHandler(out, new OutHandler {
|
||||
override def onPull(): Unit = pull(in)
|
||||
})
|
||||
|
||||
final override protected def onTimer(key: Any): Unit = allow = true
|
||||
|
||||
override def preStart(): Unit = scheduleOnce("DropWithinTimer", timeout)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue