pekko/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
Konrad Malawski 4714f16dcf +str #18142 ask pattern integration for akka streams
progressed with cleanup, removing the same thread exec context is
weird... causes issues :-/ Need to debug more, could be that some race
also exists in mapAsync then :\

WIP

finish ask impl via watch stage

mima

consistency spec

fix paradox, and fix adding ask/watch to javadsl source

follow up review
2018-02-22 19:50:46 +09:00

2110 lines
63 KiB
Scala

/**
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl.fusing
import java.util.concurrent.TimeUnit.NANOSECONDS
import akka.actor.{ ActorRef, Terminated }
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.dispatch.ExecutionContexts
import akka.event.Logging.LogLevel
import akka.event.{ LogSource, Logging, LoggingAdapter }
import akka.pattern.AskSupport
import akka.stream.Attributes.{ InputBuffer, LogLevels }
import akka.stream.OverflowStrategies._
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.impl.{ ConstantFun, ReactiveStreamsCompliance, Stages, Buffer BufferImpl }
import akka.stream.scaladsl.{ Flow, Keep, Source, SourceQueue }
import akka.stream.stage._
import akka.stream.{ Supervision, _ }
import scala.annotation.tailrec
import scala.collection.immutable
import scala.collection.immutable.VectorBuilder
import scala.concurrent.{ Future, Promise }
import scala.util.control.{ NoStackTrace, NonFatal }
import scala.util.{ Failure, Success, Try }
import akka.stream.ActorAttributes.SupervisionStrategy
import scala.concurrent.duration.{ FiniteDuration, _ }
import akka.stream.impl.Stages.DefaultAttributes
import akka.util.{ OptionVal, Timeout }
import scala.reflect.ClassTag
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class Map[In, Out](f: In Out) extends GraphStage[FlowShape[In, Out]] {
val in = Inlet[In]("Map.in")
val out = Outlet[Out]("Map.out")
override val shape = FlowShape(in, out)
override def initialAttributes: Attributes = DefaultAttributes.map
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
private def decider =
inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
override def onPush(): Unit = {
try {
push(out, f(grab(in)))
} catch {
case NonFatal(ex) decider(ex) match {
case Supervision.Stop failStage(ex)
case _ pull(in)
}
}
}
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class Filter[T](p: T Boolean) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.filter
override def toString: String = "Filter"
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler with InHandler {
def decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
override def onPush(): Unit = {
try {
val elem = grab(in)
if (p(elem)) {
push(out, elem)
} else {
pull(in)
}
} catch {
case NonFatal(ex) decider(ex) match {
case Supervision.Stop failStage(ex)
case _ pull(in)
}
}
}
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class TakeWhile[T](p: T Boolean, inclusive: Boolean = false) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.takeWhile
override def toString: String = "TakeWhile"
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler with InHandler {
override def toString = "TakeWhileLogic"
def decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
override def onPush(): Unit = {
try {
val elem = grab(in)
if (p(elem)) {
push(out, elem)
} else {
if (inclusive) push(out, elem)
completeStage()
}
} catch {
case NonFatal(ex) decider(ex) match {
case Supervision.Stop failStage(ex)
case _ pull(in)
}
}
}
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class DropWhile[T](p: T Boolean) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.dropWhile
def createLogic(inheritedAttributes: Attributes) = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler {
override def onPush(): Unit = {
val elem = grab(in)
withSupervision(() p(elem)) match {
case Some(flag) if flag pull(in)
case Some(flag) if !flag
push(out, elem)
setHandler(in, rest)
case None // do nothing
}
}
def rest = new InHandler {
def onPush() = push(out, grab(in))
}
override def onResume(t: Throwable): Unit = if (!hasBeenPulled(in)) pull(in)
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
override def toString = "DropWhile"
}
/**
* INTERNAL API
*/
@DoNotInherit private[akka] abstract class SupervisedGraphStageLogic(inheritedAttributes: Attributes, shape: Shape) extends GraphStageLogic(shape) {
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
def withSupervision[T](f: () T): Option[T] =
try {
Some(f())
} catch {
case NonFatal(ex)
decider(ex) match {
case Supervision.Stop onStop(ex)
case Supervision.Resume onResume(ex)
case Supervision.Restart onRestart(ex)
}
None
}
def onResume(t: Throwable): Unit
def onStop(t: Throwable): Unit = failStage(t)
def onRestart(t: Throwable): Unit = onResume(t)
}
private[stream] object Collect {
// Cached function that can be used with PartialFunction.applyOrElse to ensure that A) the guard is only applied once,
// and the caller can check the returned value with Collect.notApplied to query whether the PF was applied or not.
// Prior art: https://github.com/scala/scala/blob/v2.11.4/src/library/scala/collection/immutable/List.scala#L458
final val NotApplied: Any Any = _ Collect.NotApplied
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class Collect[In, Out](pf: PartialFunction[In, Out]) extends GraphStage[FlowShape[In, Out]] {
val in = Inlet[In]("Collect.in")
val out = Outlet[Out]("Collect.out")
override val shape = FlowShape(in, out)
override def initialAttributes: Attributes = DefaultAttributes.collect
def createLogic(inheritedAttributes: Attributes) = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler {
import Collect.NotApplied
val wrappedPf = () pf.applyOrElse(grab(in), NotApplied)
override def onPush(): Unit = withSupervision(wrappedPf) match {
case Some(result) result match {
case NotApplied pull(in)
case result: Out @unchecked push(out, result)
}
case None //do nothing
}
override def onResume(t: Throwable): Unit = if (!hasBeenPulled(in)) pull(in)
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
override def toString = "Collect"
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class Recover[T](pf: PartialFunction[Throwable, T]) extends SimpleLinearGraphStage[T] {
override protected def initialAttributes: Attributes = DefaultAttributes.recover
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
import Collect.NotApplied
var recovered: Option[T] = None
override def onPush(): Unit = {
push(out, grab(in))
}
override def onPull(): Unit = {
recovered match {
case Some(elem)
push(out, elem)
completeStage()
case None
pull(in)
}
}
override def onUpstreamFailure(ex: Throwable): Unit = {
pf.applyOrElse(ex, NotApplied) match {
case NotApplied failStage(ex)
case result: T @unchecked {
if (isAvailable(out)) {
push(out, result)
completeStage()
} else {
recovered = Some(result)
}
}
}
}
setHandlers(in, out, this)
}
}
/**
* Maps error with the provided function if it is defined for an error or, otherwise, passes it on unchanged.
*
* While similar to [[Recover]] this stage can be used to transform an error signal to a different one *without* logging
* it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover
* would log the `t2` error.
*/
@InternalApi private[akka] final case class MapError[T](f: PartialFunction[Throwable, Throwable]) extends SimpleLinearGraphStage[T] {
override def createLogic(attr: Attributes) =
new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = push(out, grab(in))
override def onUpstreamFailure(ex: Throwable): Unit =
if (f.isDefinedAt(ex)) super.onUpstreamFailure(f(ex))
else super.onUpstreamFailure(ex)
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class Take[T](count: Long) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.take
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
private var left: Long = count
override def onPush(): Unit = {
if (left > 0) {
push(out, grab(in))
left -= 1
}
if (left <= 0) completeStage()
}
override def onPull(): Unit = {
if (left > 0) pull(in)
else completeStage()
}
setHandlers(in, out, this)
}
override def toString: String = "Take"
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class Drop[T](count: Long) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.drop
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
private var left: Long = count
override def onPush(): Unit = {
if (left > 0) {
left -= 1
pull(in)
} else push(out, grab(in))
}
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
override def toString: String = "Drop"
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) Out) extends GraphStage[FlowShape[In, Out]] {
override val shape = FlowShape[In, Out](Inlet("Scan.in"), Outlet("Scan.out"))
override def initialAttributes: Attributes = DefaultAttributes.scan
override def toString: String = "Scan"
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
self
private var aggregator = zero
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
import Supervision.{ Stop, Resume, Restart }
import shape.{ in, out }
// Initial behavior makes sure that the zero gets flushed if upstream is empty
setHandler(out, new OutHandler {
override def onPull(): Unit = {
push(out, aggregator)
setHandlers(in, out, self)
}
})
setHandler(in, new InHandler {
override def onPush(): Unit = ()
override def onUpstreamFinish(): Unit = setHandler(out, new OutHandler {
override def onPull(): Unit = {
push(out, aggregator)
completeStage()
}
})
})
override def onPull(): Unit = pull(in)
override def onPush(): Unit = {
try {
aggregator = f(aggregator, grab(in))
push(out, aggregator)
} catch {
case NonFatal(ex) decider(ex) match {
case Resume if (!hasBeenPulled(in)) pull(in)
case Stop failStage(ex)
case Restart
aggregator = zero
push(out, aggregator)
}
}
}
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class ScanAsync[In, Out](zero: Out, f: (Out, In) Future[Out]) extends GraphStage[FlowShape[In, Out]] {
import akka.dispatch.ExecutionContexts
val in = Inlet[In]("ScanAsync.in")
val out = Outlet[Out]("ScanAsync.out")
override val shape: FlowShape[In, Out] = FlowShape[In, Out](in, out)
override val initialAttributes: Attributes = Attributes.name("scanAsync")
override val toString: String = "ScanAsync"
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
self
private var current: Out = zero
private var eventualCurrent: Future[Out] = Future.successful(current)
private def ec = ExecutionContexts.sameThreadExecutionContext
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
private val ZeroHandler: OutHandler with InHandler = new OutHandler with InHandler {
override def onPush(): Unit =
throw new IllegalStateException("No push should happen before zero value has been consumed")
override def onPull(): Unit = {
push(out, current)
setHandlers(in, out, self)
}
override def onUpstreamFinish(): Unit = setHandler(out, new OutHandler {
override def onPull(): Unit = {
push(out, current)
completeStage()
}
})
}
private def onRestart(t: Throwable): Unit = {
current = zero
}
private def safePull(): Unit = {
if (!hasBeenPulled(in)) {
tryPull(in)
}
}
private def pushAndPullOrFinish(update: Out): Unit = {
push(out, update)
if (isClosed(in)) {
completeStage()
} else if (isAvailable(out)) {
safePull()
}
}
private def doSupervision(t: Throwable): Unit = {
decider(t) match {
case Supervision.Stop failStage(t)
case Supervision.Resume safePull()
case Supervision.Restart
onRestart(t)
safePull()
}
}
private val futureCB = getAsyncCallback[Try[Out]] {
case Success(next) if next != null
current = next
pushAndPullOrFinish(next)
case Success(null) doSupervision(ReactiveStreamsCompliance.elementMustNotBeNullException)
case Failure(t) doSupervision(t)
}.invoke _
setHandlers(in, out, ZeroHandler)
def onPull(): Unit = safePull()
def onPush(): Unit = {
try {
eventualCurrent = f(current, grab(in))
eventualCurrent.value match {
case Some(result) futureCB(result)
case _ eventualCurrent.onComplete(futureCB)(ec)
}
} catch {
case NonFatal(ex)
decider(ex) match {
case Supervision.Stop failStage(ex)
case Supervision.Restart onRestart(ex)
case Supervision.Resume ()
}
tryPull(in)
}
}
override def onUpstreamFinish(): Unit = {
if (current == zero) {
eventualCurrent.value match {
case Some(Success(`zero`))
// #24036 upstream completed without emitting anything but after zero was emitted downstream
completeStage()
case _ // in all other cases we will get a complete when the future completes
}
}
}
override val toString: String = s"ScanAsync.Logic(completed=${eventualCurrent.isCompleted})"
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) Out) extends GraphStage[FlowShape[In, Out]] {
val in = Inlet[In]("Fold.in")
val out = Outlet[Out]("Fold.out")
override val shape: FlowShape[In, Out] = FlowShape(in, out)
override def toString: String = "Fold"
override val initialAttributes = DefaultAttributes.fold
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
private var aggregator: Out = zero
private def decider =
inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
override def onPush(): Unit = {
val elem = grab(in)
try {
aggregator = f(aggregator, elem)
} catch {
case NonFatal(ex) decider(ex) match {
case Supervision.Stop failStage(ex)
case Supervision.Restart aggregator = zero
case _ ()
}
} finally {
if (!isClosed(in)) pull(in)
}
}
override def onPull(): Unit = {
if (isClosed(in)) {
push(out, aggregator)
completeStage()
} else {
pull(in)
}
}
override def onUpstreamFinish(): Unit = {
if (isAvailable(out)) {
push(out, aggregator)
completeStage()
}
}
setHandlers(in, out, this)
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class FoldAsync[In, Out](zero: Out, f: (Out, In) Future[Out]) extends GraphStage[FlowShape[In, Out]] {
import akka.dispatch.ExecutionContexts
val in = Inlet[In]("FoldAsync.in")
val out = Outlet[Out]("FoldAsync.out")
val shape = FlowShape.of(in, out)
override def toString: String = "FoldAsync"
override val initialAttributes = DefaultAttributes.foldAsync
def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
private var aggregator: Out = zero
private var aggregating: Future[Out] = Future.successful(aggregator)
private def onRestart(t: Throwable): Unit = {
aggregator = zero
}
private def ec = ExecutionContexts.sameThreadExecutionContext
private val futureCB = getAsyncCallback[Try[Out]] {
case Success(update) if update != null
aggregator = update
if (isClosed(in)) {
push(out, update)
completeStage()
} else if (isAvailable(out) && !hasBeenPulled(in)) tryPull(in)
case other
val ex = other match {
case Failure(t) t
case Success(s) if s == null
ReactiveStreamsCompliance.elementMustNotBeNullException
}
val supervision = decider(ex)
if (supervision == Supervision.Stop) failStage(ex)
else {
if (supervision == Supervision.Restart) onRestart(ex)
if (isClosed(in)) {
push(out, aggregator)
completeStage()
} else if (isAvailable(out) && !hasBeenPulled(in)) tryPull(in)
}
}.invoke _
def onPush(): Unit = {
try {
aggregating = f(aggregator, grab(in))
handleAggregatingValue()
} catch {
case NonFatal(ex) decider(ex) match {
case Supervision.Stop failStage(ex)
case supervision {
supervision match {
case Supervision.Restart onRestart(ex)
case _ () // just ignore on Resume
}
tryPull(in)
}
}
}
}
override def onUpstreamFinish(): Unit = {
handleAggregatingValue()
}
def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)
private def handleAggregatingValue(): Unit = {
aggregating.value match {
case Some(result) futureCB(result) // already completed
case _ aggregating.onComplete(futureCB)(ec)
}
}
setHandlers(in, out, this)
override def toString =
s"FoldAsync.Logic(completed=${aggregating.isCompleted})"
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) extends SimpleLinearGraphStage[T] {
ReactiveStreamsCompliance.requireNonNullElement(inject)
if (start.isDefined) ReactiveStreamsCompliance.requireNonNullElement(start.get)
if (end.isDefined) ReactiveStreamsCompliance.requireNonNullElement(end.get)
override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {
val startInHandler = new InHandler {
override def onPush(): Unit = {
// if else (to avoid using Iterator[T].flatten in hot code)
if (start.isDefined) emitMultiple(out, Iterator(start.get, grab(in)))
else emit(out, grab(in))
setHandler(in, restInHandler) // switch handler
}
override def onUpstreamFinish(): Unit = {
emitMultiple(out, Iterator(start, end).flatten)
completeStage()
}
}
val restInHandler = new InHandler {
override def onPush(): Unit = emitMultiple(out, Iterator(inject, grab(in)))
override def onUpstreamFinish(): Unit = {
if (end.isDefined) emit(out, end.get)
completeStage()
}
}
def onPull(): Unit = pull(in)
setHandler(in, startInHandler)
setHandler(out, this)
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class Grouped[T](n: Int) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
require(n > 0, "n must be greater than 0")
val in = Inlet[T]("Grouped.in")
val out = Outlet[immutable.Seq[T]]("Grouped.out")
override val shape: FlowShape[T, immutable.Seq[T]] = FlowShape(in, out)
override protected val initialAttributes: Attributes = DefaultAttributes.grouped
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
private val buf = {
val b = Vector.newBuilder[T]
b.sizeHint(n)
b
}
var left = n
override def onPush(): Unit = {
buf += grab(in)
left -= 1
if (left == 0) {
val elements = buf.result()
buf.clear()
left = n
push(out, elements)
} else {
pull(in)
}
}
override def onPull(): Unit = {
pull(in)
}
override def onUpstreamFinish(): Unit = {
// This means the buf is filled with some elements but not enough (left < n) to group together.
// Since the upstream has finished we have to push them to downstream though.
if (left < n) {
val elements = buf.result()
buf.clear()
left = n
push(out, elements)
}
completeStage()
}
setHandlers(in, out, this)
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class LimitWeighted[T](val n: Long, val costFn: T Long) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.limitWeighted
def createLogic(inheritedAttributes: Attributes) = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler {
private var left = n
override def onPush(): Unit = {
val elem = grab(in)
withSupervision(() costFn(elem)) match {
case Some(weight)
left -= weight
if (left >= 0) push(out, elem) else failStage(new StreamLimitReachedException(n))
case None //do nothing
}
}
override def onResume(t: Throwable): Unit = if (!hasBeenPulled(in)) pull(in)
override def onRestart(t: Throwable): Unit = {
left = n
if (!hasBeenPulled(in)) pull(in)
}
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
override def toString = "LimitWeighted"
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class Sliding[T](val n: Int, val step: Int) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
require(n > 0, "n must be greater than 0")
require(step > 0, "step must be greater than 0")
val in = Inlet[T]("Sliding.in")
val out = Outlet[immutable.Seq[T]]("Sliding.out")
override val shape: FlowShape[T, immutable.Seq[T]] = FlowShape(in, out)
override protected val initialAttributes: Attributes = DefaultAttributes.sliding
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
private var buf = Vector.empty[T]
override def onPush(): Unit = {
buf :+= grab(in)
if (buf.size < n) {
pull(in)
} else if (buf.size == n) {
push(out, buf)
} else if (step <= n) {
buf = buf.drop(step)
if (buf.size == n) {
push(out, buf)
} else pull(in)
} else if (step > n) {
if (buf.size == step) {
buf = buf.drop(step)
}
pull(in)
}
}
override def onPull(): Unit = {
pull(in)
}
override def onUpstreamFinish(): Unit = {
// We can finish current stage directly if:
// 1. the buf is empty or
// 2. when the step size is greater than the sliding size (step > n) and current stage is in between
// two sliding (ie. buf.size >= n && buf.size < step).
//
// Otherwise it means there is still a not finished sliding so we have to push them before finish current stage.
if (buf.size < n && buf.size > 0) {
push(out, buf)
}
completeStage()
}
this.setHandlers(in, out, this)
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
private var buffer: BufferImpl[T] = _
val enqueueAction: T Unit =
overflowStrategy match {
case DropHead elem
if (buffer.isFull) buffer.dropHead()
buffer.enqueue(elem)
pull(in)
case DropTail elem
if (buffer.isFull) buffer.dropTail()
buffer.enqueue(elem)
pull(in)
case DropBuffer elem
if (buffer.isFull) buffer.clear()
buffer.enqueue(elem)
pull(in)
case DropNew elem
if (!buffer.isFull) buffer.enqueue(elem)
pull(in)
case Backpressure elem
buffer.enqueue(elem)
if (!buffer.isFull) pull(in)
case Fail elem
if (buffer.isFull) failStage(new BufferOverflowException(s"Buffer overflow (max capacity was: $size)!"))
else {
buffer.enqueue(elem)
pull(in)
}
}
override def preStart(): Unit = {
buffer = BufferImpl(size, materializer)
pull(in)
}
override def onPush(): Unit = {
val elem = grab(in)
// If out is available, then it has been pulled but no dequeued element has been delivered.
// It means the buffer at this moment is definitely empty,
// so we just push the current element to out, then pull.
if (isAvailable(out)) {
push(out, elem)
pull(in)
} else {
enqueueAction(elem)
}
}
override def onPull(): Unit = {
if (buffer.nonEmpty) push(out, buffer.dequeue())
if (isClosed(in)) {
if (buffer.isEmpty) completeStage()
} else if (!hasBeenPulled(in)) {
pull(in)
}
}
override def onUpstreamFinish(): Unit = {
if (buffer.isEmpty) completeStage()
}
setHandlers(in, out, this)
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class Batch[In, Out](val max: Long, val costFn: In Long, val seed: In Out, val aggregate: (Out, In) Out)
extends GraphStage[FlowShape[In, Out]] {
val in = Inlet[In]("Batch.in")
val out = Outlet[Out]("Batch.out")
override val shape: FlowShape[In, Out] = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
private var agg: Out = null.asInstanceOf[Out]
private var left: Long = max
private var pending: In = null.asInstanceOf[In]
private def flush(): Unit = {
if (agg != null) {
push(out, agg)
left = max
}
if (pending != null) {
try {
agg = seed(pending)
left -= costFn(pending)
pending = null.asInstanceOf[In]
} catch {
case NonFatal(ex) decider(ex) match {
case Supervision.Stop failStage(ex)
case Supervision.Restart restartState()
case Supervision.Resume
pending = null.asInstanceOf[In]
}
}
} else {
agg = null.asInstanceOf[Out]
}
}
override def preStart() = pull(in)
def onPush(): Unit = {
val elem = grab(in)
val cost = costFn(elem)
if (agg == null) {
try {
agg = seed(elem)
left -= cost
} catch {
case NonFatal(ex) decider(ex) match {
case Supervision.Stop failStage(ex)
case Supervision.Restart
restartState()
case Supervision.Resume
}
}
} else if (left < cost) {
pending = elem
} else {
try {
agg = aggregate(agg, elem)
left -= cost
} catch {
case NonFatal(ex) decider(ex) match {
case Supervision.Stop failStage(ex)
case Supervision.Restart
restartState()
case Supervision.Resume
}
}
}
if (isAvailable(out)) flush()
if (pending == null) pull(in)
}
override def onUpstreamFinish(): Unit = {
if (agg == null) completeStage()
}
def onPull(): Unit = {
if (agg == null) {
if (isClosed(in)) completeStage()
else if (!hasBeenPulled(in)) pull(in)
} else if (isClosed(in)) {
push(out, agg)
if (pending == null) completeStage()
else {
try {
agg = seed(pending)
} catch {
case NonFatal(ex) decider(ex) match {
case Supervision.Stop failStage(ex)
case Supervision.Resume
case Supervision.Restart
restartState()
if (!hasBeenPulled(in)) pull(in)
}
}
pending = null.asInstanceOf[In]
}
} else {
flush()
if (!hasBeenPulled(in)) pull(in)
}
}
private def restartState(): Unit = {
agg = null.asInstanceOf[Out]
left = max
pending = null.asInstanceOf[In]
}
setHandlers(in, out, this)
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class Expand[In, Out](val extrapolate: In Iterator[Out]) extends GraphStage[FlowShape[In, Out]] {
private val in = Inlet[In]("expand.in")
private val out = Outlet[Out]("expand.out")
override def initialAttributes = DefaultAttributes.expand
override val shape = FlowShape(in, out)
override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {
private var iterator: Iterator[Out] = Iterator.empty
private var expanded = false
override def preStart(): Unit = pull(in)
def onPush(): Unit = {
iterator = extrapolate(grab(in))
if (iterator.hasNext) {
if (isAvailable(out)) {
expanded = true
pull(in)
push(out, iterator.next())
} else expanded = false
} else pull(in)
}
override def onUpstreamFinish(): Unit = {
if (iterator.hasNext && !expanded) () // need to wait
else completeStage()
}
def onPull(): Unit = {
if (iterator.hasNext) {
if (!expanded) {
expanded = true
if (isClosed(in)) {
push(out, iterator.next())
completeStage()
} else {
// expand needs to pull first to be “fair” when upstream is not actually slow
pull(in)
push(out, iterator.next())
}
} else push(out, iterator.next())
}
}
setHandler(in, this)
setHandler(out, this)
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] object MapAsync {
final class Holder[T](
var elem: Try[T],
val cb: AsyncCallback[Holder[T]]
) extends (Try[T] Unit) {
// To support both fail-fast when the supervision directive is Stop
// and not calling the decider multiple times (#23888) we need to cache the decider result and re-use that
private var cachedSupervisionDirective: OptionVal[Supervision.Directive] = OptionVal.None
def supervisionDirectiveFor(decider: Supervision.Decider, ex: Throwable): Supervision.Directive = {
cachedSupervisionDirective match {
case OptionVal.Some(d) d
case OptionVal.None
val d = decider(ex)
cachedSupervisionDirective = OptionVal.Some(d)
d
}
}
def setElem(t: Try[T]): Unit = {
elem = t match {
case Success(null) Failure[T](ReactiveStreamsCompliance.elementMustNotBeNullException)
case other other
}
}
override def apply(t: Try[T]): Unit = {
setElem(t)
cb.invoke(this)
}
}
val NotYetThere = Failure(new Exception with NoStackTrace)
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In Future[Out])
extends GraphStage[FlowShape[In, Out]] {
import MapAsync._
private val in = Inlet[In]("MapAsync.in")
private val out = Outlet[Out]("MapAsync.out")
override def initialAttributes = DefaultAttributes.mapAsync
override val shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
var buffer: BufferImpl[Holder[Out]] = _
private val futureCB = getAsyncCallback[Holder[Out]](holder
holder.elem match {
case Success(_) pushNextIfPossible()
case Failure(ex)
holder.supervisionDirectiveFor(decider, ex) match {
// fail fast as if supervision says so
case Supervision.Stop failStage(ex)
case _ pushNextIfPossible()
}
})
override def preStart(): Unit = buffer = BufferImpl(parallelism, materializer)
override def onPull(): Unit = pushNextIfPossible()
override def onPush(): Unit = {
try {
val future = f(grab(in))
val holder = new Holder[Out](NotYetThere, futureCB)
buffer.enqueue(holder)
future.value match {
case None future.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
case Some(v)
// #20217 the future is already here, optimization: avoid scheduling it on the dispatcher and
// run the logic directly on this thread
holder.setElem(v)
v match {
// this optimization also requires us to stop the stage to fail fast if the decider says so:
case Failure(ex) if holder.supervisionDirectiveFor(decider, ex) == Supervision.Stop failStage(ex)
case _ pushNextIfPossible()
}
}
} catch {
// this logic must only be executed if f throws, not if the future is failed
case NonFatal(ex) if (decider(ex) == Supervision.Stop) failStage(ex)
}
pullIfNeeded()
}
override def onUpstreamFinish(): Unit = if (buffer.isEmpty) completeStage()
@tailrec
private def pushNextIfPossible(): Unit =
if (buffer.isEmpty) {
if (isClosed(in)) completeStage()
else pullIfNeeded()
} else if (buffer.peek().elem eq NotYetThere) pullIfNeeded() // ahead of line blocking to keep order
else if (isAvailable(out)) {
val holder = buffer.dequeue()
holder.elem match {
case Success(elem)
push(out, elem)
pullIfNeeded()
case Failure(NonFatal(ex))
holder.supervisionDirectiveFor(decider, ex) match {
// this could happen if we are looping in pushNextIfPossible and end up on a failed future before the
// onComplete callback has run
case Supervision.Stop failStage(ex)
case _
// try next element
pushNextIfPossible()
}
}
}
private def pullIfNeeded(): Unit = {
if (buffer.used < parallelism && !hasBeenPulled(in)) tryPull(in)
}
setHandlers(in, out, this)
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: In Future[Out])
extends GraphStage[FlowShape[In, Out]] {
private val in = Inlet[In]("MapAsyncUnordered.in")
private val out = Outlet[Out]("MapAsyncUnordered.out")
override def initialAttributes = DefaultAttributes.mapAsyncUnordered
override val shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
override def toString = s"MapAsyncUnordered.Logic(inFlight=$inFlight, buffer=$buffer)"
val decider =
inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
private var inFlight = 0
private var buffer: BufferImpl[Out] = _
private[this] def todo = inFlight + buffer.used
override def preStart(): Unit = buffer = BufferImpl(parallelism, materializer)
def futureCompleted(result: Try[Out]): Unit = {
inFlight -= 1
result match {
case Success(elem) if elem != null
if (isAvailable(out)) {
if (!hasBeenPulled(in)) tryPull(in)
push(out, elem)
} else buffer.enqueue(elem)
case other
val ex = other match {
case Failure(t) t
case Success(s) if s == null ReactiveStreamsCompliance.elementMustNotBeNullException
}
if (decider(ex) == Supervision.Stop) failStage(ex)
else if (isClosed(in) && todo == 0) completeStage()
else if (!hasBeenPulled(in)) tryPull(in)
}
}
private val futureCB = getAsyncCallback(futureCompleted)
private val invokeFutureCB: Try[Out] Unit = futureCB.invoke
override def onPush(): Unit = {
try {
val future = f(grab(in))
inFlight += 1
future.value match {
case None future.onComplete(invokeFutureCB)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
case Some(v) futureCompleted(v)
}
} catch {
case NonFatal(ex) if (decider(ex) == Supervision.Stop) failStage(ex)
}
if (todo < parallelism && !hasBeenPulled(in)) tryPull(in)
}
override def onUpstreamFinish(): Unit = {
if (todo == 0) completeStage()
}
override def onPull(): Unit = {
if (!buffer.isEmpty) push(out, buffer.dequeue())
else if (isClosed(in) && todo == 0) completeStage()
if (todo < parallelism && !hasBeenPulled(in)) tryPull(in)
}
setHandlers(in, out, this)
}
}
@InternalApi private[akka] final case class Watch[T](targetRef: ActorRef) extends SimpleLinearGraphStage[T] {
override def initialAttributes = DefaultAttributes.watch
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging {
private lazy val self = getStageActor {
case (_, Terminated(`targetRef`))
failStage(new WatchedActorTerminatedException("Watch", targetRef))
}
override def preStart(): Unit = {
// initialize self, and watch the target
self.watch(targetRef)
}
override def onPull(): Unit =
pull(in)
override def onPush(): Unit =
push(out, grab(in))
setHandlers(in, out, this)
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final case class Log[T](
name: String,
extract: T Any,
logAdapter: Option[LoggingAdapter]) extends SimpleLinearGraphStage[T] {
override def toString = "Log"
// TODO more optimisations can be done here - prepare logOnPush function etc
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler with InHandler {
import Log._
private var logLevels: LogLevels = _
private var log: LoggingAdapter = _
def decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
override def preStart(): Unit = {
logLevels = inheritedAttributes.get[LogLevels](DefaultLogLevels)
log = logAdapter match {
case Some(l) l
case _
val mat = try ActorMaterializerHelper.downcast(materializer)
catch {
case ex: Exception
throw new RuntimeException("Log stage can only provide LoggingAdapter when used with ActorMaterializer! " +
"Provide a LoggingAdapter explicitly or use the actor based flow materializer.", ex)
}
Logging(mat.system, mat)(fromMaterializer)
}
}
override def onPush(): Unit = {
try {
val elem = grab(in)
if (isEnabled(logLevels.onElement))
log.log(logLevels.onElement, "[{}] Element: {}", name, extract(elem))
push(out, elem)
} catch {
case NonFatal(ex) decider(ex) match {
case Supervision.Stop failStage(ex)
case _ pull(in)
}
}
}
override def onPull(): Unit = pull(in)
override def onUpstreamFailure(cause: Throwable): Unit = {
if (isEnabled(logLevels.onFailure))
logLevels.onFailure match {
case Logging.ErrorLevel log.error(cause, "[{}] Upstream failed.", name)
case level log.log(level, "[{}] Upstream failed, cause: {}: {}", name, Logging.simpleName(cause.getClass), cause.getMessage)
}
super.onUpstreamFailure(cause)
}
override def onUpstreamFinish(): Unit = {
if (isEnabled(logLevels.onFinish))
log.log(logLevels.onFinish, "[{}] Upstream finished.", name)
super.onUpstreamFinish()
}
override def onDownstreamFinish(): Unit = {
if (isEnabled(logLevels.onFinish))
log.log(logLevels.onFinish, "[{}] Downstream finished.", name)
super.onDownstreamFinish()
}
private def isEnabled(l: LogLevel): Boolean = l.asInt != OffInt
setHandlers(in, out, this)
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] object Log {
/**
* Must be located here to be visible for implicit resolution, when [[Materializer]] is passed to [[Logging]]
* More specific LogSource than `fromString`, which would add the ActorSystem name in addition to the supervision to the log source.
*/
final val fromMaterializer = new LogSource[Materializer] {
// do not expose private context classes (of OneBoundedInterpreter)
override def getClazz(t: Materializer): Class[_] = classOf[Materializer]
override def genString(t: Materializer): String = {
try s"$DefaultLoggerName(${ActorMaterializerHelper.downcast(t).supervisor.path})"
catch {
case ex: Exception LogSource.fromString.genString(DefaultLoggerName)
}
}
}
private final val DefaultLoggerName = "akka.stream.Log"
private final val OffInt = LogLevels.Off.asInt
private final val DefaultLogLevels = LogLevels(onElement = Logging.DebugLevel, onFinish = Logging.DebugLevel, onFailure = Logging.ErrorLevel)
}
/**
* INTERNAL API
*/
@InternalApi private[stream] object TimerKeys {
case object TakeWithinTimerKey
case object DropWithinTimerKey
case object GroupedWithinTimerKey
}
@InternalApi private[akka] object GroupedWeightedWithin {
val groupedWeightedWithinTimer = "GroupedWeightedWithinTimer"
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class GroupedWeightedWithin[T](val maxWeight: Long, costFn: T Long, val interval: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
require(maxWeight > 0, "maxWeight must be greater than 0")
require(interval > Duration.Zero)
val in = Inlet[T]("in")
val out = Outlet[immutable.Seq[T]]("out")
override def initialAttributes = DefaultAttributes.groupedWeightedWithin
val shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler {
private val buf: VectorBuilder[T] = new VectorBuilder
private var pending: T = null.asInstanceOf[T]
private var pendingWeight: Long = 0L
// True if:
// - buf is nonEmpty
// AND
// - (timer fired
// OR
// totalWeight >= maxWeight
// OR
// pending != null
// OR
// upstream completed)
private var pushEagerly = false
private var groupEmitted = true
private var finished = false
private var totalWeight = 0L
private var hasElements = false
override def preStart() = {
schedulePeriodically(GroupedWeightedWithin.groupedWeightedWithinTimer, interval)
pull(in)
}
private def nextElement(elem: T): Unit = {
groupEmitted = false
val cost = costFn(elem)
if (cost < 0L) failStage(new IllegalArgumentException(s"Negative weight [$cost] for element [$elem] is not allowed"))
else {
hasElements = true
if (totalWeight + cost <= maxWeight) {
buf += elem
totalWeight += cost
if (totalWeight < maxWeight) pull(in)
else {
// `totalWeight >= maxWeight` which means that downstream can get the next group.
if (!isAvailable(out)) {
// We should emit group when downstream becomes available
pushEagerly = true
// we want to pull anyway, since we allow for zero weight elements
// but since `emitGroup()` will pull internally (by calling `startNewGroup()`)
// we also have to pull if downstream hasn't yet requested an element.
pull(in)
} else {
schedulePeriodically(GroupedWeightedWithin.groupedWeightedWithinTimer, interval)
emitGroup()
}
}
} else {
//we have a single heavy element that weighs more than the limit
if (totalWeight == 0L) {
buf += elem
totalWeight += cost
pushEagerly = true
} else {
pending = elem
pendingWeight = cost
}
schedulePeriodically(GroupedWeightedWithin.groupedWeightedWithinTimer, interval)
tryCloseGroup()
}
}
}
private def tryCloseGroup(): Unit = {
if (isAvailable(out)) emitGroup()
else if (pending != null || finished) pushEagerly = true
}
private def emitGroup(): Unit = {
groupEmitted = true
push(out, buf.result())
buf.clear()
if (!finished) startNewGroup()
else if (pending != null) emit(out, Vector(pending), () completeStage())
else completeStage()
}
private def startNewGroup(): Unit = {
if (pending != null) {
totalWeight = pendingWeight
pendingWeight = 0L
buf += pending
pending = null.asInstanceOf[T]
groupEmitted = false
} else {
totalWeight = 0L
hasElements = false
}
pushEagerly = false
if (isAvailable(in)) nextElement(grab(in))
else if (!hasBeenPulled(in)) pull(in)
}
override def onPush(): Unit = {
if (pending == null) nextElement(grab(in)) // otherwise keep the element for next round
}
override def onPull(): Unit = if (pushEagerly) emitGroup()
override def onUpstreamFinish(): Unit = {
finished = true
if (groupEmitted) completeStage()
else tryCloseGroup()
}
override protected def onTimer(timerKey: Any) = if (hasElements) {
if (isAvailable(out)) emitGroup()
else pushEagerly = true
}
setHandlers(in, out, this)
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] {
private[this] def timerName = "DelayedTimer"
final val DelayPrecisionMS = 10
override def initialAttributes: Attributes = DefaultAttributes.delay
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler {
val size = inheritedAttributes.mandatoryAttribute[InputBuffer].max
val delayMillis = d.toMillis
var buffer: BufferImpl[(Long, T)] = _ // buffer has pairs timestamp with upstream element
override def preStart(): Unit = buffer = BufferImpl(size, materializer)
val onPushWhenBufferFull: () Unit = strategy match {
case EmitEarly
() {
if (!isTimerActive(timerName))
push(out, buffer.dequeue()._2)
else {
cancelTimer(timerName)
onTimer(timerName)
}
}
case DropHead
() {
buffer.dropHead()
grabAndPull()
}
case DropTail
() {
buffer.dropTail()
grabAndPull()
}
case DropNew
() {
grab(in)
if (!isTimerActive(timerName)) scheduleOnce(timerName, d)
}
case DropBuffer
() {
buffer.clear()
grabAndPull()
}
case Fail
() {
failStage(new BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $size)!"))
}
case Backpressure
() {
throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode")
}
}
def onPush(): Unit = {
if (buffer.isFull)
onPushWhenBufferFull()
else {
grabAndPull()
if (!isTimerActive(timerName)) {
scheduleOnce(timerName, d)
}
}
}
def pullCondition: Boolean =
strategy != Backpressure || buffer.used < size
def grabAndPull(): Unit = {
buffer.enqueue((System.nanoTime(), grab(in)))
if (pullCondition) pull(in)
}
override def onUpstreamFinish(): Unit =
completeIfReady()
def onPull(): Unit = {
if (!isTimerActive(timerName) && !buffer.isEmpty) {
val waitTime = nextElementWaitTime()
if (waitTime < 0) {
push(out, buffer.dequeue()._2)
} else {
scheduleOnce(timerName, Math.max(DelayPrecisionMS, waitTime).millis)
}
}
if (!isClosed(in) && !hasBeenPulled(in) && pullCondition)
pull(in)
completeIfReady()
}
setHandler(in, this)
setHandler(out, this)
def completeIfReady(): Unit = if (isClosed(in) && buffer.isEmpty) completeStage()
def nextElementWaitTime(): Long = {
delayMillis - NANOSECONDS.toMillis(System.nanoTime() - buffer.peek()._1)
}
final override protected def onTimer(key: Any): Unit = {
if (isAvailable(out))
push(out, buffer.dequeue()._2)
if (!buffer.isEmpty) {
val waitTime = nextElementWaitTime()
if (waitTime > DelayPrecisionMS)
scheduleOnce(timerName, waitTime.millis)
}
completeIfReady()
}
}
override def toString = "Delay"
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class TakeWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler {
def onPush(): Unit = push(out, grab(in))
def onPull(): Unit = pull(in)
setHandlers(in, out, this)
final override protected def onTimer(key: Any): Unit = completeStage()
override def preStart(): Unit = scheduleOnce("TakeWithinTimer", timeout)
}
override def toString = "TakeWithin"
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class DropWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
private val startNanoTime = System.nanoTime()
private val timeoutInNano = timeout.toNanos
def onPush(): Unit = {
if (System.nanoTime() - startNanoTime <= timeoutInNano) {
pull(in)
} else {
push(out, grab(in))
// change the in handler to avoid System.nanoTime call after timeout
setHandler(in, new InHandler {
def onPush() = push(out, grab(in))
})
}
}
def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
override def toString = "DropWithin"
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class Reduce[T](val f: (T, T) T) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.reduce
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
self
override def toString = s"Reduce.Logic(aggregator=$aggregator)"
var aggregator: T = _
private def decider =
inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
def setInitialInHandler(): Unit = {
// Initial input handler
setHandler(in, new InHandler {
override def onPush(): Unit = {
aggregator = grab(in)
pull(in)
setHandler(in, self)
}
override def onUpstreamFinish(): Unit =
failStage(new NoSuchElementException("reduce over empty stream"))
})
}
override def onPush(): Unit = {
val elem = grab(in)
try {
aggregator = f(aggregator, elem)
} catch {
case NonFatal(ex) decider(ex) match {
case Supervision.Stop failStage(ex)
case Supervision.Restart
aggregator = _: T
setInitialInHandler()
case _ ()
}
} finally {
if (!isClosed(in)) pull(in)
}
}
override def onPull(): Unit = pull(in)
override def onUpstreamFinish(): Unit = {
push(out, aggregator)
completeStage()
}
setInitialInHandler()
setHandler(out, self)
}
override def toString = "Reduce"
}
/**
* INTERNAL API
*/
@InternalApi private[stream] object RecoverWith
@InternalApi private[akka] final class RecoverWith[T, M](val maximumRetries: Int, val pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] {
override def initialAttributes = DefaultAttributes.recoverWith
override def createLogic(attr: Attributes) = new GraphStageLogic(shape) {
var attempt = 0
setHandler(in, new InHandler {
override def onPush(): Unit = push(out, grab(in))
override def onUpstreamFailure(ex: Throwable) = onFailure(ex)
})
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
})
def onFailure(ex: Throwable) =
if ((maximumRetries < 0 || attempt < maximumRetries) && pf.isDefinedAt(ex)) {
switchTo(pf(ex))
attempt += 1
} else
failStage(ex)
def switchTo(source: Graph[SourceShape[T], M]): Unit = {
val sinkIn = new SubSinkInlet[T]("RecoverWithSink")
sinkIn.setHandler(new InHandler {
override def onPush(): Unit = push(out, sinkIn.grab())
override def onUpstreamFinish(): Unit = completeStage()
override def onUpstreamFailure(ex: Throwable) = onFailure(ex)
})
val outHandler = new OutHandler {
override def onPull(): Unit = sinkIn.pull()
override def onDownstreamFinish(): Unit = sinkIn.cancel()
}
Source.fromGraph(source).runWith(sinkIn.sink)(interpreter.subFusingMaterializer)
setHandler(out, outHandler)
if (isAvailable(out)) sinkIn.pull()
}
}
override def toString: String = "RecoverWith"
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class StatefulMapConcat[In, Out](val f: () In immutable.Iterable[Out]) extends GraphStage[FlowShape[In, Out]] {
val in = Inlet[In]("StatefulMapConcat.in")
val out = Outlet[Out]("StatefulMapConcat.out")
override val shape = FlowShape(in, out)
override def initialAttributes: Attributes = DefaultAttributes.statefulMapConcat
def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
var currentIterator: Iterator[Out] = _
var plainFun = f()
def hasNext = if (currentIterator != null) currentIterator.hasNext else false
setHandlers(in, out, this)
def pushPull(): Unit =
if (hasNext) {
push(out, currentIterator.next())
if (!hasNext && isClosed(in)) completeStage()
} else if (!isClosed(in))
pull(in)
else completeStage()
def onFinish(): Unit = if (!hasNext) completeStage()
override def onPush(): Unit =
try {
currentIterator = plainFun(grab(in)).iterator
pushPull()
} catch {
case NonFatal(ex) decider(ex) match {
case Supervision.Stop failStage(ex)
case Supervision.Resume if (!hasBeenPulled(in)) pull(in)
case Supervision.Restart
restartState()
if (!hasBeenPulled(in)) pull(in)
}
}
override def onUpstreamFinish(): Unit = onFinish()
override def onPull(): Unit = pushPull()
private def restartState(): Unit = {
plainFun = f()
currentIterator = null
}
}
override def toString = "StatefulMapConcat"
}
/**
* INTERNAL API
*/
@InternalApi final private[akka] class LazyFlow[I, O, M](flowFactory: I Future[Flow[I, O, M]], zeroMat: () M)
extends GraphStageWithMaterializedValue[FlowShape[I, O], M] {
val in = Inlet[I]("lazyFlow.in")
val out = Outlet[O]("lazyFlow.out")
override def initialAttributes = DefaultAttributes.lazyFlow
override val shape: FlowShape[I, O] = FlowShape.of(in, out)
override def toString: String = "LazyFlow"
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
var completed = false
var matVal: Option[M] = None
val stageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
val subSink = new SubSinkInlet[O]("LazyFlowSubSink")
override def onPush(): Unit = {
try {
val element = grab(in)
val cb: AsyncCallback[Try[Flow[I, O, M]]] =
getAsyncCallback {
case Success(flow) initInternalSource(flow, element)
case Failure(e) failure(e)
}
flowFactory(element).onComplete { cb.invoke }(ExecutionContexts.sameThreadExecutionContext)
setHandler(in, new InHandler {
override def onPush(): Unit = throw new IllegalStateException("LazyFlow received push while waiting for flowFactory to complete.")
override def onUpstreamFinish(): Unit = gotCompletionEvent()
override def onUpstreamFailure(ex: Throwable): Unit = failure(ex)
})
} catch {
case NonFatal(e) decider(e) match {
case Supervision.Stop failure(e)
case _ pull(in)
}
}
}
override def onPull(): Unit = {
pull(in)
subSink.pull()
setHandler(out, new OutHandler {
override def onPull(): Unit = {
subSink.pull()
}
override def onDownstreamFinish(): Unit = {
subSink.cancel()
completeStage()
}
})
subSink.setHandler(new InHandler {
override def onPush(): Unit = {
val elem = subSink.grab()
push(out, elem)
}
override def onUpstreamFinish(): Unit = {
completeStage()
}
})
}
setHandler(out, this)
private def failure(ex: Throwable): Unit = {
matVal = Some(zeroMat())
failStage(ex)
}
override def onUpstreamFinish(): Unit = {
matVal = Some(zeroMat())
completeStage()
}
override def onUpstreamFailure(ex: Throwable): Unit = failure(ex)
setHandler(in, this)
private def gotCompletionEvent(): Unit = {
setKeepGoing(true)
completed = true
}
private def initInternalSource(flow: Flow[I, O, M], firstElement: I): Unit = {
val sourceOut = new SubSourceOutlet[I]("LazyFlowSubSource")
def switchToFirstElementHandlers(): Unit = {
sourceOut.setHandler(new OutHandler {
override def onPull(): Unit = {
sourceOut.push(firstElement)
if (completed) internalSourceComplete() else switchToFinalHandlers()
}
override def onDownstreamFinish(): Unit = internalSourceComplete()
})
setHandler(in, new InHandler {
override def onPush(): Unit = sourceOut.push(grab(in))
override def onUpstreamFinish(): Unit = gotCompletionEvent()
override def onUpstreamFailure(ex: Throwable): Unit = internalSourceFailure(ex)
})
}
def switchToFinalHandlers(): Unit = {
sourceOut.setHandler(new OutHandler {
override def onPull(): Unit = pull(in)
override def onDownstreamFinish(): Unit = internalSourceComplete()
})
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
sourceOut.push(elem)
}
override def onUpstreamFinish(): Unit = internalSourceComplete()
override def onUpstreamFailure(ex: Throwable): Unit = internalSourceFailure(ex)
})
}
def internalSourceComplete(): Unit = {
sourceOut.complete()
// normal completion, subSink.onUpstreamFinish will complete the stage
}
def internalSourceFailure(ex: Throwable): Unit = {
sourceOut.fail(ex)
failStage(ex)
}
switchToFirstElementHandlers()
try {
matVal = Some(Source.fromGraph(sourceOut.source)
.viaMat(flow)(Keep.right).toMat(subSink.sink)(Keep.left).run()(interpreter.subFusingMaterializer))
} catch {
case NonFatal(ex)
subSink.cancel()
matVal = Some(zeroMat())
failStage(ex)
}
}
}
(stageLogic, matVal.getOrElse(zeroMat()))
}
}