2014-10-08 18:16:57 +02:00
|
|
|
/**
|
2017-01-04 17:37:10 +01:00
|
|
|
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
2014-10-08 18:16:57 +02:00
|
|
|
*/
|
|
|
|
|
package akka.stream.impl.fusing
|
|
|
|
|
|
2016-09-13 16:10:49 +02:00
|
|
|
import java.util.concurrent.TimeUnit.NANOSECONDS
|
2017-03-16 21:04:07 +02:00
|
|
|
|
|
|
|
|
import akka.annotation.{ DoNotInherit, InternalApi }
|
2015-04-09 12:21:12 +02:00
|
|
|
import akka.event.Logging.LogLevel
|
2015-05-27 00:27:05 +02:00
|
|
|
import akka.event.{ LogSource, Logging, LoggingAdapter }
|
2015-11-25 21:29:35 -05:00
|
|
|
import akka.stream.Attributes.{ InputBuffer, LogLevels }
|
2016-01-16 12:17:19 -05:00
|
|
|
import akka.stream.OverflowStrategies._
|
2015-09-11 15:50:17 +02:00
|
|
|
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
|
2017-09-11 14:17:13 +02:00
|
|
|
import akka.stream.impl.{ ConstantFun, ReactiveStreamsCompliance, Stages, Buffer ⇒ BufferImpl }
|
2017-03-16 21:04:07 +02:00
|
|
|
import akka.stream.scaladsl.{ Source, SourceQueue }
|
2014-11-12 10:43:39 +01:00
|
|
|
import akka.stream.stage._
|
2015-05-27 00:27:05 +02:00
|
|
|
import akka.stream.{ Supervision, _ }
|
2017-03-16 21:04:07 +02:00
|
|
|
|
2015-04-09 22:28:16 +02:00
|
|
|
import scala.annotation.tailrec
|
2015-04-09 12:21:12 +02:00
|
|
|
import scala.collection.immutable
|
2015-09-11 15:50:17 +02:00
|
|
|
import scala.collection.immutable.VectorBuilder
|
2015-04-09 12:21:12 +02:00
|
|
|
import scala.concurrent.Future
|
2017-12-05 18:51:58 +01:00
|
|
|
import scala.util.control.{ NoStackTrace, NonFatal }
|
2015-05-27 00:27:05 +02:00
|
|
|
import scala.util.{ Failure, Success, Try }
|
2015-10-31 14:46:10 +01:00
|
|
|
import akka.stream.ActorAttributes.SupervisionStrategy
|
2017-03-16 21:04:07 +02:00
|
|
|
|
2015-11-25 21:29:35 -05:00
|
|
|
import scala.concurrent.duration.{ FiniteDuration, _ }
|
2016-01-18 11:29:14 +01:00
|
|
|
import akka.stream.impl.Stages.DefaultAttributes
|
2017-12-05 18:51:58 +01:00
|
|
|
import akka.util.OptionVal
|
2014-10-08 18:16:57 +02:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final case class Map[In, Out](f: In ⇒ Out) extends GraphStage[FlowShape[In, Out]] {
|
2016-07-20 13:26:27 +02:00
|
|
|
val in = Inlet[In]("Map.in")
|
|
|
|
|
val out = Outlet[Out]("Map.out")
|
|
|
|
|
override val shape = FlowShape(in, out)
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-07-20 13:26:27 +02:00
|
|
|
override def initialAttributes: Attributes = DefaultAttributes.map
|
2015-02-04 09:26:32 +01:00
|
|
|
|
2016-07-20 13:26:27 +02:00
|
|
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
|
|
|
|
new GraphStageLogic(shape) with InHandler with OutHandler {
|
|
|
|
|
private def decider =
|
|
|
|
|
inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
|
|
|
|
|
|
|
|
|
|
override def onPush(): Unit = {
|
|
|
|
|
try {
|
|
|
|
|
push(out, f(grab(in)))
|
|
|
|
|
} catch {
|
|
|
|
|
case NonFatal(ex) ⇒ decider(ex) match {
|
|
|
|
|
case Supervision.Stop ⇒ failStage(ex)
|
|
|
|
|
case _ ⇒ pull(in)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onPull(): Unit = pull(in)
|
|
|
|
|
|
|
|
|
|
setHandlers(in, out, this)
|
|
|
|
|
}
|
2014-10-08 18:16:57 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final case class Filter[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] {
|
2016-04-14 11:04:08 +02:00
|
|
|
override def initialAttributes: Attributes = DefaultAttributes.filter
|
2015-02-04 09:26:32 +01:00
|
|
|
|
2016-04-14 11:04:08 +02:00
|
|
|
override def toString: String = "Filter"
|
|
|
|
|
|
|
|
|
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
|
|
|
|
new GraphStageLogic(shape) with OutHandler with InHandler {
|
|
|
|
|
def decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
|
|
|
|
|
|
|
|
|
|
override def onPush(): Unit = {
|
|
|
|
|
try {
|
|
|
|
|
val elem = grab(in)
|
|
|
|
|
if (p(elem)) {
|
|
|
|
|
push(out, elem)
|
|
|
|
|
} else {
|
|
|
|
|
pull(in)
|
|
|
|
|
}
|
|
|
|
|
} catch {
|
|
|
|
|
case NonFatal(ex) ⇒ decider(ex) match {
|
|
|
|
|
case Supervision.Stop ⇒ failStage(ex)
|
|
|
|
|
case _ ⇒ pull(in)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onPull(): Unit = pull(in)
|
|
|
|
|
|
|
|
|
|
setHandlers(in, out, this)
|
|
|
|
|
}
|
2015-06-12 23:22:36 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2016-04-04 13:11:22 +02:00
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final case class TakeWhile[T](p: T ⇒ Boolean, inclusive: Boolean = false) extends SimpleLinearGraphStage[T] {
|
2016-02-29 13:20:00 +03:00
|
|
|
override def initialAttributes: Attributes = DefaultAttributes.takeWhile
|
2015-06-12 23:22:36 -04:00
|
|
|
|
2016-02-29 13:20:00 +03:00
|
|
|
override def toString: String = "TakeWhile"
|
2015-06-12 23:22:36 -04:00
|
|
|
|
2016-02-29 13:20:00 +03:00
|
|
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
|
|
|
|
new GraphStageLogic(shape) with OutHandler with InHandler {
|
|
|
|
|
override def toString = "TakeWhileLogic"
|
|
|
|
|
|
|
|
|
|
def decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
|
|
|
|
|
|
|
|
|
|
override def onPush(): Unit = {
|
|
|
|
|
try {
|
|
|
|
|
val elem = grab(in)
|
|
|
|
|
if (p(elem)) {
|
|
|
|
|
push(out, elem)
|
|
|
|
|
} else {
|
2016-04-20 14:47:32 -07:00
|
|
|
if (inclusive) push(out, elem)
|
2016-02-29 13:20:00 +03:00
|
|
|
completeStage()
|
|
|
|
|
}
|
|
|
|
|
} catch {
|
|
|
|
|
case NonFatal(ex) ⇒ decider(ex) match {
|
|
|
|
|
case Supervision.Stop ⇒ failStage(ex)
|
2016-04-04 13:11:22 +02:00
|
|
|
case _ ⇒ pull(in)
|
2016-02-29 13:20:00 +03:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onPull(): Unit = pull(in)
|
|
|
|
|
|
|
|
|
|
setHandlers(in, out, this)
|
|
|
|
|
}
|
2015-06-12 23:22:36 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final case class DropWhile[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] {
|
2016-03-09 20:46:42 -05:00
|
|
|
override def initialAttributes: Attributes = DefaultAttributes.dropWhile
|
2015-06-12 23:22:36 -04:00
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
def createLogic(inheritedAttributes: Attributes) = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler {
|
|
|
|
|
override def onPush(): Unit = {
|
|
|
|
|
val elem = grab(in)
|
|
|
|
|
withSupervision(() ⇒ p(elem)) match {
|
|
|
|
|
case Some(flag) if flag ⇒ pull(in)
|
|
|
|
|
case Some(flag) if !flag ⇒
|
|
|
|
|
push(out, elem)
|
|
|
|
|
setHandler(in, rest)
|
|
|
|
|
case None ⇒ // do nothing
|
|
|
|
|
}
|
2015-06-12 23:22:36 -04:00
|
|
|
}
|
|
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
def rest = new InHandler {
|
|
|
|
|
def onPush() = push(out, grab(in))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onResume(t: Throwable): Unit = if (!hasBeenPulled(in)) pull(in)
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
override def onPull(): Unit = pull(in)
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
setHandlers(in, out, this)
|
|
|
|
|
}
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
override def toString = "DropWhile"
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@DoNotInherit private[akka] abstract class SupervisedGraphStageLogic(inheritedAttributes: Attributes, shape: Shape) extends GraphStageLogic(shape) {
|
2016-03-09 20:46:42 -05:00
|
|
|
private lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
|
2016-08-24 21:02:32 +02:00
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
def withSupervision[T](f: () ⇒ T): Option[T] =
|
2016-10-17 08:02:54 +01:00
|
|
|
try {
|
|
|
|
|
Some(f())
|
|
|
|
|
} catch {
|
2016-03-09 20:46:42 -05:00
|
|
|
case NonFatal(ex) ⇒
|
|
|
|
|
decider(ex) match {
|
|
|
|
|
case Supervision.Stop ⇒ onStop(ex)
|
|
|
|
|
case Supervision.Resume ⇒ onResume(ex)
|
|
|
|
|
case Supervision.Restart ⇒ onRestart(ex)
|
|
|
|
|
}
|
|
|
|
|
None
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def onResume(t: Throwable): Unit
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
def onStop(t: Throwable): Unit = failStage(t)
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
def onRestart(t: Throwable): Unit = onResume(t)
|
2014-10-08 18:16:57 +02:00
|
|
|
}
|
|
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
private[stream] object Collect {
|
2014-11-09 21:09:50 +01:00
|
|
|
// Cached function that can be used with PartialFunction.applyOrElse to ensure that A) the guard is only applied once,
|
|
|
|
|
// and the caller can check the returned value with Collect.notApplied to query whether the PF was applied or not.
|
|
|
|
|
// Prior art: https://github.com/scala/scala/blob/v2.11.4/src/library/scala/collection/immutable/List.scala#L458
|
|
|
|
|
final val NotApplied: Any ⇒ Any = _ ⇒ Collect.NotApplied
|
|
|
|
|
}
|
|
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final case class Collect[In, Out](pf: PartialFunction[In, Out]) extends GraphStage[FlowShape[In, Out]] {
|
2016-03-09 20:46:42 -05:00
|
|
|
val in = Inlet[In]("Collect.in")
|
|
|
|
|
val out = Outlet[Out]("Collect.out")
|
|
|
|
|
override val shape = FlowShape(in, out)
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
override def initialAttributes: Attributes = DefaultAttributes.collect
|
2015-04-09 12:21:12 +02:00
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
def createLogic(inheritedAttributes: Attributes) = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler {
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
import Collect.NotApplied
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
val wrappedPf = () ⇒ pf.applyOrElse(grab(in), NotApplied)
|
2015-04-09 12:21:12 +02:00
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
override def onPush(): Unit = withSupervision(wrappedPf) match {
|
|
|
|
|
case Some(result) ⇒ result match {
|
|
|
|
|
case NotApplied ⇒ pull(in)
|
|
|
|
|
case result: Out @unchecked ⇒ push(out, result)
|
|
|
|
|
}
|
|
|
|
|
case None ⇒ //do nothing
|
2014-11-09 21:09:50 +01:00
|
|
|
}
|
2015-02-04 09:26:32 +01:00
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
override def onResume(t: Throwable): Unit = if (!hasBeenPulled(in)) pull(in)
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
override def onPull(): Unit = pull(in)
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
setHandlers(in, out, this)
|
|
|
|
|
}
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
override def toString = "Collect"
|
2014-11-09 21:09:50 +01:00
|
|
|
}
|
|
|
|
|
|
2015-06-13 14:02:37 -04:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final case class Recover[T](pf: PartialFunction[Throwable, T]) extends SimpleLinearGraphStage[T] {
|
2016-11-18 17:13:15 +08:00
|
|
|
override protected def initialAttributes: Attributes = DefaultAttributes.recover
|
2016-04-18 15:20:32 +08:00
|
|
|
|
|
|
|
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-04-18 15:20:32 +08:00
|
|
|
import Collect.NotApplied
|
|
|
|
|
|
|
|
|
|
var recovered: Option[T] = None
|
2015-06-13 14:02:37 -04:00
|
|
|
|
2016-04-18 15:20:32 +08:00
|
|
|
override def onPush(): Unit = {
|
|
|
|
|
push(out, grab(in))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onPull(): Unit = {
|
|
|
|
|
recovered match {
|
2016-05-06 10:32:06 +02:00
|
|
|
case Some(elem) ⇒
|
2016-04-18 15:20:32 +08:00
|
|
|
push(out, elem)
|
|
|
|
|
completeStage()
|
2016-05-06 10:32:06 +02:00
|
|
|
case None ⇒
|
|
|
|
|
pull(in)
|
2016-04-18 15:20:32 +08:00
|
|
|
}
|
2015-06-13 14:02:37 -04:00
|
|
|
}
|
|
|
|
|
|
2016-04-18 15:20:32 +08:00
|
|
|
override def onUpstreamFailure(ex: Throwable): Unit = {
|
|
|
|
|
pf.applyOrElse(ex, NotApplied) match {
|
|
|
|
|
case NotApplied ⇒ failStage(ex)
|
|
|
|
|
case result: T @unchecked ⇒ {
|
|
|
|
|
if (isAvailable(out)) {
|
|
|
|
|
push(out, result)
|
|
|
|
|
completeStage()
|
|
|
|
|
} else {
|
|
|
|
|
recovered = Some(result)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2015-06-13 14:02:37 -04:00
|
|
|
}
|
|
|
|
|
|
2016-04-18 15:20:32 +08:00
|
|
|
setHandlers(in, out, this)
|
|
|
|
|
}
|
2015-06-13 14:02:37 -04:00
|
|
|
}
|
|
|
|
|
|
2016-12-12 17:57:14 +01:00
|
|
|
/**
|
|
|
|
|
* Maps error with the provided function if it is defined for an error or, otherwise, passes it on unchanged.
|
|
|
|
|
*
|
|
|
|
|
* While similar to [[Recover]] this stage can be used to transform an error signal to a different one *without* logging
|
|
|
|
|
* it as an error in the process. So in that sense it is NOT exactly equivalent to `recover(t => throw t2)` since recover
|
|
|
|
|
* would log the `t2` error.
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final case class MapError[T](f: PartialFunction[Throwable, Throwable]) extends SimpleLinearGraphStage[T] {
|
2016-12-12 17:57:14 +01:00
|
|
|
override def createLogic(attr: Attributes) =
|
|
|
|
|
new GraphStageLogic(shape) with InHandler with OutHandler {
|
|
|
|
|
override def onPush(): Unit = push(out, grab(in))
|
|
|
|
|
|
|
|
|
|
override def onUpstreamFailure(ex: Throwable): Unit =
|
|
|
|
|
if (f.isDefinedAt(ex)) super.onUpstreamFailure(f(ex))
|
|
|
|
|
else super.onUpstreamFailure(ex)
|
|
|
|
|
|
|
|
|
|
override def onPull(): Unit = pull(in)
|
|
|
|
|
|
|
|
|
|
setHandlers(in, out, this)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2014-10-08 18:16:57 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final case class Take[T](count: Long) extends SimpleLinearGraphStage[T] {
|
2016-02-28 23:14:29 +02:00
|
|
|
override def initialAttributes: Attributes = DefaultAttributes.take
|
2014-10-08 18:16:57 +02:00
|
|
|
|
2016-02-28 23:14:29 +02:00
|
|
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
|
|
|
|
|
private var left: Long = count
|
|
|
|
|
|
|
|
|
|
override def onPush(): Unit = {
|
2017-02-14 19:24:28 +08:00
|
|
|
if (left > 0) {
|
2016-02-28 23:14:29 +02:00
|
|
|
push(out, grab(in))
|
2017-02-14 19:24:28 +08:00
|
|
|
left -= 1
|
2016-02-28 23:14:29 +02:00
|
|
|
}
|
2017-02-14 19:24:28 +08:00
|
|
|
if (left <= 0) completeStage()
|
2016-02-28 23:14:29 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onPull(): Unit = {
|
|
|
|
|
if (left > 0) pull(in)
|
|
|
|
|
else completeStage()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
setHandlers(in, out, this)
|
2014-10-08 18:16:57 +02:00
|
|
|
}
|
2015-09-02 13:23:32 +02:00
|
|
|
|
2016-02-28 23:14:29 +02:00
|
|
|
override def toString: String = "Take"
|
2014-10-08 18:16:57 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final case class Drop[T](count: Long) extends SimpleLinearGraphStage[T] {
|
2016-02-26 02:51:07 +02:00
|
|
|
override def initialAttributes: Attributes = DefaultAttributes.drop
|
2015-04-09 12:21:12 +02:00
|
|
|
|
2016-02-26 02:51:07 +02:00
|
|
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
|
|
|
|
|
private var left: Long = count
|
|
|
|
|
|
|
|
|
|
override def onPush(): Unit = {
|
|
|
|
|
if (left > 0) {
|
|
|
|
|
left -= 1
|
|
|
|
|
pull(in)
|
|
|
|
|
} else push(out, grab(in))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onPull(): Unit = pull(in)
|
|
|
|
|
|
|
|
|
|
setHandlers(in, out, this)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def toString: String = "Drop"
|
2014-10-08 18:16:57 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphStage[FlowShape[In, Out]] {
|
2016-04-14 17:33:19 +02:00
|
|
|
override val shape = FlowShape[In, Out](Inlet("Scan.in"), Outlet("Scan.out"))
|
2014-11-09 21:09:50 +01:00
|
|
|
|
2016-04-14 17:33:19 +02:00
|
|
|
override def initialAttributes: Attributes = DefaultAttributes.scan
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-04-14 17:33:19 +02:00
|
|
|
override def toString: String = "Scan"
|
2014-11-09 21:09:50 +01:00
|
|
|
|
2016-04-14 17:33:19 +02:00
|
|
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
|
|
|
|
new GraphStageLogic(shape) with InHandler with OutHandler {
|
|
|
|
|
self ⇒
|
2014-11-09 21:09:50 +01:00
|
|
|
|
2016-04-14 17:33:19 +02:00
|
|
|
private var aggregator = zero
|
|
|
|
|
private lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
|
2015-02-04 09:26:32 +01:00
|
|
|
|
2016-04-14 17:33:19 +02:00
|
|
|
import Supervision.{ Stop, Resume, Restart }
|
|
|
|
|
import shape.{ in, out }
|
|
|
|
|
|
|
|
|
|
// Initial behavior makes sure that the zero gets flushed if upstream is empty
|
|
|
|
|
setHandler(out, new OutHandler {
|
|
|
|
|
override def onPull(): Unit = {
|
|
|
|
|
push(out, aggregator)
|
|
|
|
|
setHandlers(in, out, self)
|
|
|
|
|
}
|
|
|
|
|
})
|
2016-07-05 00:00:52 -04:00
|
|
|
|
|
|
|
|
setHandler(in, new InHandler {
|
|
|
|
|
override def onPush(): Unit = ()
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-07-05 00:00:52 -04:00
|
|
|
override def onUpstreamFinish(): Unit = setHandler(out, new OutHandler {
|
|
|
|
|
override def onPull(): Unit = {
|
|
|
|
|
push(out, aggregator)
|
|
|
|
|
completeStage()
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
})
|
2015-02-04 09:26:32 +01:00
|
|
|
|
2016-04-14 17:33:19 +02:00
|
|
|
override def onPull(): Unit = pull(in)
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-04-14 17:33:19 +02:00
|
|
|
override def onPush(): Unit = {
|
|
|
|
|
try {
|
|
|
|
|
aggregator = f(aggregator, grab(in))
|
|
|
|
|
push(out, aggregator)
|
|
|
|
|
} catch {
|
|
|
|
|
case NonFatal(ex) ⇒ decider(ex) match {
|
|
|
|
|
case Resume ⇒ if (!hasBeenPulled(in)) pull(in)
|
|
|
|
|
case Stop ⇒ failStage(ex)
|
|
|
|
|
case Restart ⇒
|
|
|
|
|
aggregator = zero
|
|
|
|
|
push(out, aggregator)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2014-11-09 21:09:50 +01:00
|
|
|
}
|
|
|
|
|
|
2016-10-17 12:43:11 -02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final case class ScanAsync[In, Out](zero: Out, f: (Out, In) ⇒ Future[Out]) extends GraphStage[FlowShape[In, Out]] {
|
2016-10-17 12:43:11 -02:00
|
|
|
|
|
|
|
|
import akka.dispatch.ExecutionContexts
|
|
|
|
|
|
|
|
|
|
val in = Inlet[In]("ScanAsync.in")
|
|
|
|
|
val out = Outlet[Out]("ScanAsync.out")
|
|
|
|
|
override val shape: FlowShape[In, Out] = FlowShape[In, Out](in, out)
|
|
|
|
|
|
|
|
|
|
override val initialAttributes: Attributes = Attributes.name("scanAsync")
|
|
|
|
|
|
|
|
|
|
override val toString: String = "ScanAsync"
|
|
|
|
|
|
|
|
|
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
|
|
|
|
new GraphStageLogic(shape) with InHandler with OutHandler {
|
|
|
|
|
self ⇒
|
|
|
|
|
|
|
|
|
|
private var current: Out = zero
|
|
|
|
|
private var eventualCurrent: Future[Out] = Future.successful(current)
|
|
|
|
|
|
|
|
|
|
private def ec = ExecutionContexts.sameThreadExecutionContext
|
|
|
|
|
|
|
|
|
|
private lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
|
|
|
|
|
|
|
|
|
|
private val ZeroHandler: OutHandler with InHandler = new OutHandler with InHandler {
|
2017-12-05 09:08:33 +01:00
|
|
|
override def onPush(): Unit =
|
|
|
|
|
throw new IllegalStateException("No push should happen before zero value has been consumed")
|
2016-10-17 12:43:11 -02:00
|
|
|
|
|
|
|
|
override def onPull(): Unit = {
|
|
|
|
|
push(out, current)
|
|
|
|
|
setHandlers(in, out, self)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onUpstreamFinish(): Unit = setHandler(out, new OutHandler {
|
|
|
|
|
override def onPull(): Unit = {
|
|
|
|
|
push(out, current)
|
|
|
|
|
completeStage()
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private def onRestart(t: Throwable): Unit = {
|
|
|
|
|
current = zero
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private def safePull(): Unit = {
|
|
|
|
|
if (!hasBeenPulled(in)) {
|
|
|
|
|
tryPull(in)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private def pushAndPullOrFinish(update: Out): Unit = {
|
|
|
|
|
push(out, update)
|
|
|
|
|
if (isClosed(in)) {
|
|
|
|
|
completeStage()
|
|
|
|
|
} else if (isAvailable(out)) {
|
|
|
|
|
safePull()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private def doSupervision(t: Throwable): Unit = {
|
|
|
|
|
decider(t) match {
|
|
|
|
|
case Supervision.Stop ⇒ failStage(t)
|
|
|
|
|
case Supervision.Resume ⇒ safePull()
|
|
|
|
|
case Supervision.Restart ⇒
|
|
|
|
|
onRestart(t)
|
|
|
|
|
safePull()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private val futureCB = getAsyncCallback[Try[Out]] {
|
|
|
|
|
case Success(next) if next != null ⇒
|
|
|
|
|
current = next
|
|
|
|
|
pushAndPullOrFinish(next)
|
|
|
|
|
case Success(null) ⇒ doSupervision(ReactiveStreamsCompliance.elementMustNotBeNullException)
|
|
|
|
|
case Failure(t) ⇒ doSupervision(t)
|
|
|
|
|
}.invoke _
|
|
|
|
|
|
|
|
|
|
setHandlers(in, out, ZeroHandler)
|
|
|
|
|
|
|
|
|
|
def onPull(): Unit = safePull()
|
|
|
|
|
|
|
|
|
|
def onPush(): Unit = {
|
|
|
|
|
try {
|
|
|
|
|
eventualCurrent = f(current, grab(in))
|
|
|
|
|
|
|
|
|
|
eventualCurrent.value match {
|
|
|
|
|
case Some(result) ⇒ futureCB(result)
|
|
|
|
|
case _ ⇒ eventualCurrent.onComplete(futureCB)(ec)
|
|
|
|
|
}
|
|
|
|
|
} catch {
|
|
|
|
|
case NonFatal(ex) ⇒
|
|
|
|
|
decider(ex) match {
|
|
|
|
|
case Supervision.Stop ⇒ failStage(ex)
|
|
|
|
|
case Supervision.Restart ⇒ onRestart(ex)
|
|
|
|
|
case Supervision.Resume ⇒ ()
|
|
|
|
|
}
|
|
|
|
|
tryPull(in)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2017-12-05 09:08:33 +01:00
|
|
|
override def onUpstreamFinish(): Unit = {
|
|
|
|
|
if (current == zero) {
|
|
|
|
|
eventualCurrent.value match {
|
|
|
|
|
case Some(Success(`zero`)) ⇒
|
|
|
|
|
// #24036 upstream completed without emitting anything but after zero was emitted downstream
|
|
|
|
|
completeStage()
|
|
|
|
|
case _ ⇒ // in all other cases we will get a complete when the future completes
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2016-10-17 12:43:11 -02:00
|
|
|
|
|
|
|
|
override val toString: String = s"ScanAsync.Logic(completed=${eventualCurrent.isCompleted})"
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2014-11-09 21:09:50 +01:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends GraphStage[FlowShape[In, Out]] {
|
2014-10-08 18:16:57 +02:00
|
|
|
|
2016-07-08 14:22:18 +02:00
|
|
|
val in = Inlet[In]("Fold.in")
|
|
|
|
|
val out = Outlet[Out]("Fold.out")
|
|
|
|
|
override val shape: FlowShape[In, Out] = FlowShape(in, out)
|
2014-10-08 18:16:57 +02:00
|
|
|
|
2016-08-24 21:02:32 +02:00
|
|
|
override def toString: String = "Fold"
|
|
|
|
|
|
2016-07-08 14:22:18 +02:00
|
|
|
override val initialAttributes = DefaultAttributes.fold
|
2014-10-08 18:16:57 +02:00
|
|
|
|
2016-07-08 14:22:18 +02:00
|
|
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
2016-07-20 13:39:23 +02:00
|
|
|
new GraphStageLogic(shape) with InHandler with OutHandler {
|
2016-07-08 14:22:18 +02:00
|
|
|
private var aggregator: Out = zero
|
2015-02-04 09:26:32 +01:00
|
|
|
|
2016-07-20 13:39:23 +02:00
|
|
|
private def decider =
|
|
|
|
|
inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
|
2016-07-08 14:22:18 +02:00
|
|
|
|
2016-07-20 13:39:23 +02:00
|
|
|
override def onPush(): Unit = {
|
2016-11-04 16:13:11 +00:00
|
|
|
val elem = grab(in)
|
2016-07-20 13:39:23 +02:00
|
|
|
try {
|
2016-11-04 16:13:11 +00:00
|
|
|
aggregator = f(aggregator, elem)
|
2016-07-20 13:39:23 +02:00
|
|
|
} catch {
|
|
|
|
|
case NonFatal(ex) ⇒ decider(ex) match {
|
2016-11-04 16:13:11 +00:00
|
|
|
case Supervision.Stop ⇒ failStage(ex)
|
|
|
|
|
case Supervision.Restart ⇒ aggregator = zero
|
|
|
|
|
case _ ⇒ ()
|
2016-07-20 13:39:23 +02:00
|
|
|
}
|
2016-11-04 16:13:11 +00:00
|
|
|
} finally {
|
|
|
|
|
if (!isClosed(in)) pull(in)
|
2016-07-08 14:22:18 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onPull(): Unit = {
|
|
|
|
|
if (isClosed(in)) {
|
|
|
|
|
push(out, aggregator)
|
|
|
|
|
completeStage()
|
|
|
|
|
} else {
|
|
|
|
|
pull(in)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onUpstreamFinish(): Unit = {
|
|
|
|
|
if (isAvailable(out)) {
|
|
|
|
|
push(out, aggregator)
|
|
|
|
|
completeStage()
|
|
|
|
|
}
|
|
|
|
|
}
|
2015-02-04 09:26:32 +01:00
|
|
|
|
2016-07-08 14:22:18 +02:00
|
|
|
setHandlers(in, out, this)
|
|
|
|
|
}
|
2014-10-08 18:16:57 +02:00
|
|
|
}
|
|
|
|
|
|
2016-08-24 21:02:32 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final class FoldAsync[In, Out](zero: Out, f: (Out, In) ⇒ Future[Out]) extends GraphStage[FlowShape[In, Out]] {
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-08-24 21:02:32 +02:00
|
|
|
import akka.dispatch.ExecutionContexts
|
|
|
|
|
|
|
|
|
|
val in = Inlet[In]("FoldAsync.in")
|
|
|
|
|
val out = Outlet[Out]("FoldAsync.out")
|
|
|
|
|
val shape = FlowShape.of(in, out)
|
|
|
|
|
|
|
|
|
|
override def toString: String = "FoldAsync"
|
|
|
|
|
|
|
|
|
|
override val initialAttributes = DefaultAttributes.foldAsync
|
|
|
|
|
|
2016-10-17 08:02:54 +01:00
|
|
|
def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
|
|
|
|
new GraphStageLogic(shape) with InHandler with OutHandler {
|
|
|
|
|
val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
|
2016-08-24 21:02:32 +02:00
|
|
|
|
2016-10-17 08:02:54 +01:00
|
|
|
private var aggregator: Out = zero
|
|
|
|
|
private var aggregating: Future[Out] = Future.successful(aggregator)
|
2016-08-24 21:02:32 +02:00
|
|
|
|
2016-10-17 08:02:54 +01:00
|
|
|
private def onRestart(t: Throwable): Unit = {
|
|
|
|
|
aggregator = zero
|
|
|
|
|
}
|
2016-08-24 21:02:32 +02:00
|
|
|
|
2016-10-17 08:02:54 +01:00
|
|
|
private def ec = ExecutionContexts.sameThreadExecutionContext
|
2016-08-24 21:02:32 +02:00
|
|
|
|
2016-10-17 08:02:54 +01:00
|
|
|
private val futureCB = getAsyncCallback[Try[Out]] {
|
|
|
|
|
case Success(update) if update != null ⇒
|
2016-08-24 21:02:32 +02:00
|
|
|
aggregator = update
|
|
|
|
|
|
|
|
|
|
if (isClosed(in)) {
|
|
|
|
|
push(out, update)
|
|
|
|
|
completeStage()
|
|
|
|
|
} else if (isAvailable(out) && !hasBeenPulled(in)) tryPull(in)
|
|
|
|
|
|
2016-10-17 08:02:54 +01:00
|
|
|
case other ⇒
|
2016-08-24 21:02:32 +02:00
|
|
|
val ex = other match {
|
|
|
|
|
case Failure(t) ⇒ t
|
|
|
|
|
case Success(s) if s == null ⇒
|
|
|
|
|
ReactiveStreamsCompliance.elementMustNotBeNullException
|
|
|
|
|
}
|
|
|
|
|
val supervision = decider(ex)
|
|
|
|
|
|
|
|
|
|
if (supervision == Supervision.Stop) failStage(ex)
|
|
|
|
|
else {
|
|
|
|
|
if (supervision == Supervision.Restart) onRestart(ex)
|
|
|
|
|
|
|
|
|
|
if (isClosed(in)) {
|
|
|
|
|
push(out, aggregator)
|
|
|
|
|
completeStage()
|
|
|
|
|
} else if (isAvailable(out) && !hasBeenPulled(in)) tryPull(in)
|
|
|
|
|
}
|
2016-10-17 08:02:54 +01:00
|
|
|
}.invoke _
|
|
|
|
|
|
|
|
|
|
def onPush(): Unit = {
|
|
|
|
|
try {
|
|
|
|
|
aggregating = f(aggregator, grab(in))
|
|
|
|
|
handleAggregatingValue()
|
|
|
|
|
} catch {
|
|
|
|
|
case NonFatal(ex) ⇒ decider(ex) match {
|
|
|
|
|
case Supervision.Stop ⇒ failStage(ex)
|
|
|
|
|
case supervision ⇒ {
|
|
|
|
|
supervision match {
|
|
|
|
|
case Supervision.Restart ⇒ onRestart(ex)
|
|
|
|
|
case _ ⇒ () // just ignore on Resume
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
tryPull(in)
|
|
|
|
|
}
|
|
|
|
|
}
|
2016-08-24 21:02:32 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2016-10-17 08:02:54 +01:00
|
|
|
override def onUpstreamFinish(): Unit = {
|
|
|
|
|
handleAggregatingValue()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def onPull(): Unit = if (!hasBeenPulled(in)) tryPull(in)
|
2016-08-24 21:02:32 +02:00
|
|
|
|
2016-10-17 08:02:54 +01:00
|
|
|
private def handleAggregatingValue(): Unit = {
|
2016-08-24 21:02:32 +02:00
|
|
|
aggregating.value match {
|
|
|
|
|
case Some(result) ⇒ futureCB(result) // already completed
|
|
|
|
|
case _ ⇒ aggregating.onComplete(futureCB)(ec)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2016-10-17 08:02:54 +01:00
|
|
|
setHandlers(in, out, this)
|
2016-08-24 21:02:32 +02:00
|
|
|
|
2016-10-17 08:02:54 +01:00
|
|
|
override def toString =
|
|
|
|
|
s"FoldAsync.Logic(completed=${aggregating.isCompleted})"
|
|
|
|
|
}
|
2016-08-24 21:02:32 +02:00
|
|
|
}
|
|
|
|
|
|
2015-10-16 01:55:20 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) extends SimpleLinearGraphStage[T] {
|
2016-01-18 17:49:32 +01:00
|
|
|
ReactiveStreamsCompliance.requireNonNullElement(inject)
|
2016-01-20 00:16:53 +01:00
|
|
|
if (start.isDefined) ReactiveStreamsCompliance.requireNonNullElement(start.get)
|
|
|
|
|
if (end.isDefined) ReactiveStreamsCompliance.requireNonNullElement(end.get)
|
2015-12-03 00:04:00 +08:00
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {
|
2015-12-07 22:48:57 +08:00
|
|
|
val startInHandler = new InHandler {
|
2015-12-06 15:02:35 +08:00
|
|
|
override def onPush(): Unit = {
|
2015-12-07 22:48:57 +08:00
|
|
|
// if else (to avoid using Iterator[T].flatten in hot code)
|
2015-12-09 01:26:42 +08:00
|
|
|
if (start.isDefined) emitMultiple(out, Iterator(start.get, grab(in)))
|
2015-12-07 22:48:57 +08:00
|
|
|
else emit(out, grab(in))
|
|
|
|
|
setHandler(in, restInHandler) // switch handler
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onUpstreamFinish(): Unit = {
|
2015-12-09 01:26:42 +08:00
|
|
|
emitMultiple(out, Iterator(start, end).flatten)
|
2015-12-07 22:48:57 +08:00
|
|
|
completeStage()
|
2015-12-06 15:02:35 +08:00
|
|
|
}
|
2015-10-16 01:55:20 +02:00
|
|
|
}
|
|
|
|
|
|
2015-12-07 22:48:57 +08:00
|
|
|
val restInHandler = new InHandler {
|
2015-12-09 01:26:42 +08:00
|
|
|
override def onPush(): Unit = emitMultiple(out, Iterator(inject, grab(in)))
|
2015-10-16 01:55:20 +02:00
|
|
|
|
2015-12-06 15:02:35 +08:00
|
|
|
override def onUpstreamFinish(): Unit = {
|
2015-12-07 22:48:57 +08:00
|
|
|
if (end.isDefined) emit(out, end.get)
|
2015-12-06 15:02:35 +08:00
|
|
|
completeStage()
|
2015-12-03 00:04:00 +08:00
|
|
|
}
|
2015-10-16 01:55:20 +02:00
|
|
|
}
|
|
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
def onPull(): Unit = pull(in)
|
2015-12-07 22:48:57 +08:00
|
|
|
|
|
|
|
|
setHandler(in, startInHandler)
|
2016-08-29 14:00:48 +02:00
|
|
|
setHandler(out, this)
|
2015-10-16 01:55:20 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2014-10-08 18:16:57 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final case class Grouped[T](n: Int) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
|
2016-07-08 14:22:18 +02:00
|
|
|
require(n > 0, "n must be greater than 0")
|
2014-10-08 18:16:57 +02:00
|
|
|
|
2016-07-08 14:22:18 +02:00
|
|
|
val in = Inlet[T]("Grouped.in")
|
|
|
|
|
val out = Outlet[immutable.Seq[T]]("Grouped.out")
|
|
|
|
|
override val shape: FlowShape[T, immutable.Seq[T]] = FlowShape(in, out)
|
|
|
|
|
|
|
|
|
|
override protected val initialAttributes: Attributes = DefaultAttributes.grouped
|
|
|
|
|
|
|
|
|
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
|
|
|
|
|
private val buf = {
|
|
|
|
|
val b = Vector.newBuilder[T]
|
|
|
|
|
b.sizeHint(n)
|
|
|
|
|
b
|
|
|
|
|
}
|
|
|
|
|
var left = n
|
|
|
|
|
|
|
|
|
|
override def onPush(): Unit = {
|
|
|
|
|
buf += grab(in)
|
|
|
|
|
left -= 1
|
|
|
|
|
if (left == 0) {
|
|
|
|
|
val elements = buf.result()
|
|
|
|
|
buf.clear()
|
|
|
|
|
left = n
|
|
|
|
|
push(out, elements)
|
|
|
|
|
} else {
|
|
|
|
|
pull(in)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onPull(): Unit = {
|
|
|
|
|
pull(in)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onUpstreamFinish(): Unit = {
|
|
|
|
|
// This means the buf is filled with some elements but not enough (left < n) to group together.
|
|
|
|
|
// Since the upstream has finished we have to push them to downstream though.
|
|
|
|
|
if (left < n) {
|
|
|
|
|
val elements = buf.result()
|
|
|
|
|
buf.clear()
|
|
|
|
|
left = n
|
|
|
|
|
push(out, elements)
|
|
|
|
|
}
|
|
|
|
|
completeStage()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
setHandlers(in, out, this)
|
|
|
|
|
}
|
2014-10-08 18:16:57 +02:00
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
2015-11-19 00:11:07 +08:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final case class LimitWeighted[T](val n: Long, val costFn: T ⇒ Long) extends SimpleLinearGraphStage[T] {
|
2016-03-09 20:46:42 -05:00
|
|
|
override def initialAttributes: Attributes = DefaultAttributes.limitWeighted
|
2015-11-19 00:11:07 +08:00
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
def createLogic(inheritedAttributes: Attributes) = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler {
|
|
|
|
|
private var left = n
|
2015-11-19 00:11:07 +08:00
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
override def onPush(): Unit = {
|
|
|
|
|
val elem = grab(in)
|
|
|
|
|
withSupervision(() ⇒ costFn(elem)) match {
|
2017-02-14 19:24:28 +08:00
|
|
|
case Some(weight) ⇒
|
|
|
|
|
left -= weight
|
2016-03-09 20:46:42 -05:00
|
|
|
if (left >= 0) push(out, elem) else failStage(new StreamLimitReachedException(n))
|
|
|
|
|
case None ⇒ //do nothing
|
|
|
|
|
}
|
|
|
|
|
}
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
override def onResume(t: Throwable): Unit = if (!hasBeenPulled(in)) pull(in)
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
override def onRestart(t: Throwable): Unit = {
|
|
|
|
|
left = n
|
|
|
|
|
if (!hasBeenPulled(in)) pull(in)
|
|
|
|
|
}
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
override def onPull(): Unit = pull(in)
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
setHandlers(in, out, this)
|
2015-11-19 00:11:07 +08:00
|
|
|
}
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-03-09 20:46:42 -05:00
|
|
|
override def toString = "LimitWeighted"
|
2015-11-19 00:11:07 +08:00
|
|
|
}
|
|
|
|
|
|
2015-07-27 11:39:54 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final case class Sliding[T](val n: Int, val step: Int) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
|
2016-07-08 14:22:18 +02:00
|
|
|
require(n > 0, "n must be greater than 0")
|
|
|
|
|
require(step > 0, "step must be greater than 0")
|
|
|
|
|
|
|
|
|
|
val in = Inlet[T]("Sliding.in")
|
|
|
|
|
val out = Outlet[immutable.Seq[T]]("Sliding.out")
|
|
|
|
|
override val shape: FlowShape[T, immutable.Seq[T]] = FlowShape(in, out)
|
|
|
|
|
|
|
|
|
|
override protected val initialAttributes: Attributes = DefaultAttributes.sliding
|
|
|
|
|
|
|
|
|
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
|
|
|
|
|
private var buf = Vector.empty[T]
|
|
|
|
|
|
|
|
|
|
override def onPush(): Unit = {
|
|
|
|
|
buf :+= grab(in)
|
|
|
|
|
if (buf.size < n) {
|
|
|
|
|
pull(in)
|
|
|
|
|
} else if (buf.size == n) {
|
|
|
|
|
push(out, buf)
|
|
|
|
|
} else if (step <= n) {
|
|
|
|
|
buf = buf.drop(step)
|
|
|
|
|
if (buf.size == n) {
|
|
|
|
|
push(out, buf)
|
|
|
|
|
} else pull(in)
|
|
|
|
|
} else if (step > n) {
|
|
|
|
|
if (buf.size == step) {
|
|
|
|
|
buf = buf.drop(step)
|
|
|
|
|
}
|
|
|
|
|
pull(in)
|
|
|
|
|
}
|
2015-07-27 11:39:54 +02:00
|
|
|
}
|
|
|
|
|
|
2016-07-08 14:22:18 +02:00
|
|
|
override def onPull(): Unit = {
|
|
|
|
|
pull(in)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onUpstreamFinish(): Unit = {
|
|
|
|
|
|
|
|
|
|
// We can finish current stage directly if:
|
|
|
|
|
// 1. the buf is empty or
|
|
|
|
|
// 2. when the step size is greater than the sliding size (step > n) and current stage is in between
|
|
|
|
|
// two sliding (ie. buf.size >= n && buf.size < step).
|
|
|
|
|
//
|
|
|
|
|
// Otherwise it means there is still a not finished sliding so we have to push them before finish current stage.
|
|
|
|
|
if (buf.size < n && buf.size > 0) {
|
|
|
|
|
push(out, buf)
|
|
|
|
|
}
|
|
|
|
|
completeStage()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
this.setHandlers(in, out, this)
|
|
|
|
|
}
|
2015-07-27 11:39:54 +02:00
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
2014-10-08 18:16:57 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends SimpleLinearGraphStage[T] {
|
2015-04-09 12:21:12 +02:00
|
|
|
|
2016-11-16 01:48:33 +08:00
|
|
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
|
2016-02-07 14:54:48 +01:00
|
|
|
|
2016-11-16 01:48:33 +08:00
|
|
|
private var buffer: BufferImpl[T] = _
|
2014-10-08 18:16:57 +02:00
|
|
|
|
2016-11-16 01:48:33 +08:00
|
|
|
val enqueueAction: T ⇒ Unit =
|
|
|
|
|
overflowStrategy match {
|
|
|
|
|
case DropHead ⇒ elem ⇒
|
|
|
|
|
if (buffer.isFull) buffer.dropHead()
|
2014-10-08 18:16:57 +02:00
|
|
|
buffer.enqueue(elem)
|
2016-11-16 01:48:33 +08:00
|
|
|
pull(in)
|
|
|
|
|
case DropTail ⇒ elem ⇒
|
|
|
|
|
if (buffer.isFull) buffer.dropTail()
|
|
|
|
|
buffer.enqueue(elem)
|
|
|
|
|
pull(in)
|
|
|
|
|
case DropBuffer ⇒ elem ⇒
|
|
|
|
|
if (buffer.isFull) buffer.clear()
|
|
|
|
|
buffer.enqueue(elem)
|
|
|
|
|
pull(in)
|
|
|
|
|
case DropNew ⇒ elem ⇒
|
|
|
|
|
if (!buffer.isFull) buffer.enqueue(elem)
|
|
|
|
|
pull(in)
|
|
|
|
|
case Backpressure ⇒ elem ⇒
|
|
|
|
|
buffer.enqueue(elem)
|
|
|
|
|
if (!buffer.isFull) pull(in)
|
|
|
|
|
case Fail ⇒ elem ⇒
|
|
|
|
|
if (buffer.isFull) failStage(new BufferOverflowException(s"Buffer overflow (max capacity was: $size)!"))
|
|
|
|
|
else {
|
|
|
|
|
buffer.enqueue(elem)
|
|
|
|
|
pull(in)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def preStart(): Unit = {
|
|
|
|
|
buffer = BufferImpl(size, materializer)
|
|
|
|
|
pull(in)
|
2014-10-08 18:16:57 +02:00
|
|
|
}
|
2016-01-16 12:17:19 -05:00
|
|
|
|
2016-11-16 01:48:33 +08:00
|
|
|
override def onPush(): Unit = {
|
|
|
|
|
val elem = grab(in)
|
|
|
|
|
// If out is available, then it has been pulled but no dequeued element has been delivered.
|
|
|
|
|
// It means the buffer at this moment is definitely empty,
|
|
|
|
|
// so we just push the current element to out, then pull.
|
|
|
|
|
if (isAvailable(out)) {
|
|
|
|
|
push(out, elem)
|
|
|
|
|
pull(in)
|
|
|
|
|
} else {
|
|
|
|
|
enqueueAction(elem)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onPull(): Unit = {
|
|
|
|
|
if (buffer.nonEmpty) push(out, buffer.dequeue())
|
|
|
|
|
if (isClosed(in)) {
|
|
|
|
|
if (buffer.isEmpty) completeStage()
|
|
|
|
|
} else if (!hasBeenPulled(in)) {
|
|
|
|
|
pull(in)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onUpstreamFinish(): Unit = {
|
|
|
|
|
if (buffer.isEmpty) completeStage()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
setHandlers(in, out, this)
|
|
|
|
|
}
|
|
|
|
|
|
2014-10-08 18:16:57 +02:00
|
|
|
}
|
|
|
|
|
|
2016-01-20 19:29:50 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final case class Batch[In, Out](val max: Long, val costFn: In ⇒ Long, val seed: In ⇒ Out, val aggregate: (Out, In) ⇒ Out)
|
2016-01-20 18:20:12 +02:00
|
|
|
extends GraphStage[FlowShape[In, Out]] {
|
|
|
|
|
|
|
|
|
|
val in = Inlet[In]("Batch.in")
|
|
|
|
|
val out = Outlet[Out]("Batch.out")
|
2016-01-19 23:03:36 +02:00
|
|
|
|
|
|
|
|
override val shape: FlowShape[In, Out] = FlowShape.of(in, out)
|
|
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
|
2016-01-21 16:52:44 +01:00
|
|
|
|
2016-02-22 23:22:47 -05:00
|
|
|
lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
|
2016-01-21 16:52:44 +01:00
|
|
|
|
2016-01-20 18:20:12 +02:00
|
|
|
private var agg: Out = null.asInstanceOf[Out]
|
2016-01-19 23:03:36 +02:00
|
|
|
private var left: Long = max
|
2016-01-20 18:20:12 +02:00
|
|
|
private var pending: In = null.asInstanceOf[In]
|
2016-01-19 23:03:36 +02:00
|
|
|
|
|
|
|
|
private def flush(): Unit = {
|
2016-02-09 09:23:06 +01:00
|
|
|
if (agg != null) {
|
|
|
|
|
push(out, agg)
|
|
|
|
|
left = max
|
|
|
|
|
}
|
2016-01-19 23:03:36 +02:00
|
|
|
if (pending != null) {
|
2016-01-21 16:52:44 +01:00
|
|
|
try {
|
|
|
|
|
agg = seed(pending)
|
|
|
|
|
left -= costFn(pending)
|
|
|
|
|
pending = null.asInstanceOf[In]
|
|
|
|
|
} catch {
|
|
|
|
|
case NonFatal(ex) ⇒ decider(ex) match {
|
|
|
|
|
case Supervision.Stop ⇒ failStage(ex)
|
|
|
|
|
case Supervision.Restart ⇒ restartState()
|
|
|
|
|
case Supervision.Resume ⇒
|
|
|
|
|
pending = null.asInstanceOf[In]
|
|
|
|
|
}
|
|
|
|
|
}
|
2016-01-19 23:03:36 +02:00
|
|
|
} else {
|
2016-01-20 18:20:12 +02:00
|
|
|
agg = null.asInstanceOf[Out]
|
2016-01-19 23:03:36 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def preStart() = pull(in)
|
|
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
def onPush(): Unit = {
|
|
|
|
|
val elem = grab(in)
|
|
|
|
|
val cost = costFn(elem)
|
2016-01-19 23:03:36 +02:00
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
if (agg == null) {
|
|
|
|
|
try {
|
|
|
|
|
agg = seed(elem)
|
|
|
|
|
left -= cost
|
|
|
|
|
} catch {
|
|
|
|
|
case NonFatal(ex) ⇒ decider(ex) match {
|
|
|
|
|
case Supervision.Stop ⇒ failStage(ex)
|
|
|
|
|
case Supervision.Restart ⇒
|
|
|
|
|
restartState()
|
|
|
|
|
case Supervision.Resume ⇒
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else if (left < cost) {
|
|
|
|
|
pending = elem
|
|
|
|
|
} else {
|
|
|
|
|
try {
|
|
|
|
|
agg = aggregate(agg, elem)
|
|
|
|
|
left -= cost
|
|
|
|
|
} catch {
|
|
|
|
|
case NonFatal(ex) ⇒ decider(ex) match {
|
|
|
|
|
case Supervision.Stop ⇒ failStage(ex)
|
|
|
|
|
case Supervision.Restart ⇒
|
|
|
|
|
restartState()
|
|
|
|
|
case Supervision.Resume ⇒
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2016-01-21 16:52:44 +01:00
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
if (isAvailable(out)) flush()
|
|
|
|
|
if (pending == null) pull(in)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onUpstreamFinish(): Unit = {
|
|
|
|
|
if (agg == null) completeStage()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def onPull(): Unit = {
|
|
|
|
|
if (agg == null) {
|
|
|
|
|
if (isClosed(in)) completeStage()
|
|
|
|
|
else if (!hasBeenPulled(in)) pull(in)
|
|
|
|
|
} else if (isClosed(in)) {
|
|
|
|
|
push(out, agg)
|
|
|
|
|
if (pending == null) completeStage()
|
|
|
|
|
else {
|
2016-01-21 16:52:44 +01:00
|
|
|
try {
|
2016-08-29 14:00:48 +02:00
|
|
|
agg = seed(pending)
|
2016-01-21 16:52:44 +01:00
|
|
|
} catch {
|
|
|
|
|
case NonFatal(ex) ⇒ decider(ex) match {
|
2016-08-29 14:00:48 +02:00
|
|
|
case Supervision.Stop ⇒ failStage(ex)
|
2016-01-21 16:52:44 +01:00
|
|
|
case Supervision.Resume ⇒
|
|
|
|
|
case Supervision.Restart ⇒
|
|
|
|
|
restartState()
|
2016-08-29 14:00:48 +02:00
|
|
|
if (!hasBeenPulled(in)) pull(in)
|
2016-01-21 16:52:44 +01:00
|
|
|
}
|
|
|
|
|
}
|
2016-08-29 14:00:48 +02:00
|
|
|
pending = null.asInstanceOf[In]
|
2016-01-19 23:03:36 +02:00
|
|
|
}
|
2016-08-29 14:00:48 +02:00
|
|
|
} else {
|
|
|
|
|
flush()
|
|
|
|
|
if (!hasBeenPulled(in)) pull(in)
|
2016-01-19 23:03:36 +02:00
|
|
|
}
|
|
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
}
|
2016-01-21 16:52:44 +01:00
|
|
|
|
|
|
|
|
private def restartState(): Unit = {
|
|
|
|
|
agg = null.asInstanceOf[Out]
|
|
|
|
|
left = max
|
|
|
|
|
pending = null.asInstanceOf[In]
|
|
|
|
|
}
|
2016-08-29 14:00:48 +02:00
|
|
|
|
|
|
|
|
setHandlers(in, out, this)
|
2016-01-19 23:03:36 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2014-10-08 18:16:57 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final class Expand[In, Out](val extrapolate: In ⇒ Iterator[Out]) extends GraphStage[FlowShape[In, Out]] {
|
2016-01-18 11:29:14 +01:00
|
|
|
private val in = Inlet[In]("expand.in")
|
|
|
|
|
private val out = Outlet[Out]("expand.out")
|
2014-10-08 18:16:57 +02:00
|
|
|
|
2016-01-18 11:29:14 +01:00
|
|
|
override def initialAttributes = DefaultAttributes.expand
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-01-18 11:29:14 +01:00
|
|
|
override val shape = FlowShape(in, out)
|
2014-10-08 18:16:57 +02:00
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {
|
2016-01-18 11:29:14 +01:00
|
|
|
private var iterator: Iterator[Out] = Iterator.empty
|
|
|
|
|
private var expanded = false
|
2014-10-08 18:16:57 +02:00
|
|
|
|
2016-01-18 11:29:14 +01:00
|
|
|
override def preStart(): Unit = pull(in)
|
2014-11-25 12:26:24 +01:00
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
def onPush(): Unit = {
|
|
|
|
|
iterator = extrapolate(grab(in))
|
|
|
|
|
if (iterator.hasNext) {
|
|
|
|
|
if (isAvailable(out)) {
|
|
|
|
|
expanded = true
|
|
|
|
|
pull(in)
|
|
|
|
|
push(out, iterator.next())
|
|
|
|
|
} else expanded = false
|
|
|
|
|
} else pull(in)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onUpstreamFinish(): Unit = {
|
|
|
|
|
if (iterator.hasNext && !expanded) () // need to wait
|
|
|
|
|
else completeStage()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def onPull(): Unit = {
|
|
|
|
|
if (iterator.hasNext) {
|
|
|
|
|
if (!expanded) {
|
|
|
|
|
expanded = true
|
|
|
|
|
if (isClosed(in)) {
|
|
|
|
|
push(out, iterator.next())
|
|
|
|
|
completeStage()
|
|
|
|
|
} else {
|
|
|
|
|
// expand needs to pull first to be “fair” when upstream is not actually slow
|
2016-01-18 11:29:14 +01:00
|
|
|
pull(in)
|
|
|
|
|
push(out, iterator.next())
|
2016-08-29 14:00:48 +02:00
|
|
|
}
|
|
|
|
|
} else push(out, iterator.next())
|
2016-01-18 11:29:14 +01:00
|
|
|
}
|
2016-08-29 14:00:48 +02:00
|
|
|
}
|
2016-01-18 11:29:14 +01:00
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
setHandler(in, this)
|
|
|
|
|
setHandler(out, this)
|
2014-11-25 12:26:24 +01:00
|
|
|
}
|
2014-11-19 19:50:23 +01:00
|
|
|
}
|
2015-04-09 22:28:16 +02:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] object MapAsync {
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2017-12-05 18:51:58 +01:00
|
|
|
final class Holder[T](
|
|
|
|
|
var elem: Try[T],
|
|
|
|
|
val cb: AsyncCallback[Holder[T]]
|
|
|
|
|
) extends (Try[T] ⇒ Unit) {
|
|
|
|
|
|
|
|
|
|
// To support both fail-fast when the supervision directive is Stop
|
|
|
|
|
// and not calling the decider multiple times (#23888) we need to cache the decider result and re-use that
|
|
|
|
|
private var cachedSupervisionDirective: OptionVal[Supervision.Directive] = OptionVal.None
|
|
|
|
|
|
|
|
|
|
def supervisionDirectiveFor(decider: Supervision.Decider, ex: Throwable): Supervision.Directive = {
|
|
|
|
|
cachedSupervisionDirective match {
|
|
|
|
|
case OptionVal.Some(d) ⇒ d
|
|
|
|
|
case OptionVal.None ⇒
|
|
|
|
|
val d = decider(ex)
|
|
|
|
|
cachedSupervisionDirective = OptionVal.Some(d)
|
|
|
|
|
d
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2017-09-11 14:17:13 +02:00
|
|
|
def setElem(t: Try[T]): Unit = {
|
2016-04-04 13:11:22 +02:00
|
|
|
elem = t match {
|
|
|
|
|
case Success(null) ⇒ Failure[T](ReactiveStreamsCompliance.elementMustNotBeNullException)
|
|
|
|
|
case other ⇒ other
|
|
|
|
|
}
|
2017-09-11 14:17:13 +02:00
|
|
|
}
|
2016-08-30 19:30:08 +02:00
|
|
|
|
|
|
|
|
override def apply(t: Try[T]): Unit = {
|
|
|
|
|
setElem(t)
|
2016-04-04 13:11:22 +02:00
|
|
|
cb.invoke(this)
|
|
|
|
|
}
|
|
|
|
|
}
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2017-12-05 18:51:58 +01:00
|
|
|
val NotYetThere = Failure(new Exception with NoStackTrace)
|
2015-04-09 22:28:16 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Future[Out])
|
2015-10-31 14:46:10 +01:00
|
|
|
extends GraphStage[FlowShape[In, Out]] {
|
2015-04-09 12:21:12 +02:00
|
|
|
|
2015-04-09 22:28:16 +02:00
|
|
|
import MapAsync._
|
|
|
|
|
|
2016-04-04 13:11:22 +02:00
|
|
|
private val in = Inlet[In]("MapAsync.in")
|
|
|
|
|
private val out = Outlet[Out]("MapAsync.out")
|
2015-04-09 22:28:16 +02:00
|
|
|
|
2016-01-18 17:49:32 +01:00
|
|
|
override def initialAttributes = DefaultAttributes.mapAsync
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2015-10-31 14:46:10 +01:00
|
|
|
override val shape = FlowShape(in, out)
|
2015-04-09 22:28:16 +02:00
|
|
|
|
2016-04-04 13:11:22 +02:00
|
|
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
|
|
|
|
new GraphStageLogic(shape) with InHandler with OutHandler {
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2017-12-05 18:51:58 +01:00
|
|
|
lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider)
|
|
|
|
|
.getOrElse(Supervision.stoppingDecider)
|
|
|
|
|
|
|
|
|
|
private val futureCB = getAsyncCallback[Holder[Out]](holder ⇒
|
|
|
|
|
holder.elem match {
|
|
|
|
|
case Success(_) ⇒ pushNextIfPossible()
|
|
|
|
|
case Failure(NonFatal(ex)) ⇒
|
|
|
|
|
holder.supervisionDirectiveFor(decider, ex) match {
|
|
|
|
|
// fail fast as if supervision says so
|
|
|
|
|
case Supervision.Stop ⇒ failStage(ex)
|
|
|
|
|
case _ ⇒ pushNextIfPossible()
|
|
|
|
|
}
|
|
|
|
|
})
|
2015-04-09 22:28:16 +02:00
|
|
|
|
2017-12-05 18:51:58 +01:00
|
|
|
private var buffer: BufferImpl[Holder[Out]] = _
|
2015-04-09 22:28:16 +02:00
|
|
|
|
2016-04-04 13:11:22 +02:00
|
|
|
override def preStart(): Unit = buffer = BufferImpl(parallelism, materializer)
|
2016-02-07 14:54:48 +01:00
|
|
|
|
2017-12-05 18:51:58 +01:00
|
|
|
override def onPull(): Unit = pushNextIfPossible()
|
2015-10-31 14:46:10 +01:00
|
|
|
|
|
|
|
|
override def onPush(): Unit = {
|
|
|
|
|
try {
|
|
|
|
|
val future = f(grab(in))
|
2016-04-04 13:11:22 +02:00
|
|
|
val holder = new Holder[Out](NotYetThere, futureCB)
|
2015-12-20 12:54:05 +01:00
|
|
|
buffer.enqueue(holder)
|
2016-04-28 13:23:03 -07:00
|
|
|
|
|
|
|
|
future.value match {
|
2016-08-30 19:30:08 +02:00
|
|
|
case None ⇒ future.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
|
|
|
|
|
case Some(v) ⇒
|
2017-12-05 18:51:58 +01:00
|
|
|
// #20217 the future is already here, avoid scheduling it on the dispatcher
|
2016-08-30 19:30:08 +02:00
|
|
|
holder.setElem(v)
|
2017-12-05 18:51:58 +01:00
|
|
|
pushNextIfPossible()
|
2016-04-28 13:23:03 -07:00
|
|
|
}
|
|
|
|
|
|
2015-04-09 22:28:16 +02:00
|
|
|
} catch {
|
2017-12-05 18:51:58 +01:00
|
|
|
// this logic must only be executed if f throws, not if the future is failed
|
2016-04-04 13:11:22 +02:00
|
|
|
case NonFatal(ex) ⇒ if (decider(ex) == Supervision.Stop) failStage(ex)
|
2015-04-09 22:28:16 +02:00
|
|
|
}
|
2017-12-05 18:51:58 +01:00
|
|
|
|
|
|
|
|
pullIfNeeded()
|
2015-10-31 14:46:10 +01:00
|
|
|
}
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2017-12-05 18:51:58 +01:00
|
|
|
override def onUpstreamFinish(): Unit = if (buffer.isEmpty) completeStage()
|
|
|
|
|
|
|
|
|
|
@tailrec
|
|
|
|
|
private def pushNextIfPossible(): Unit =
|
|
|
|
|
if (buffer.isEmpty) {
|
|
|
|
|
if (isClosed(in)) completeStage()
|
|
|
|
|
else pullIfNeeded()
|
|
|
|
|
} else if (buffer.peek().elem eq NotYetThere) pullIfNeeded() // ahead of line blocking to keep order
|
|
|
|
|
else if (isAvailable(out)) {
|
|
|
|
|
val holder = buffer.dequeue()
|
|
|
|
|
holder.elem match {
|
|
|
|
|
case Success(elem) ⇒
|
|
|
|
|
push(out, elem)
|
|
|
|
|
pullIfNeeded()
|
|
|
|
|
|
|
|
|
|
case Failure(NonFatal(ex)) ⇒
|
|
|
|
|
holder.supervisionDirectiveFor(decider, ex) match {
|
|
|
|
|
case Supervision.Stop ⇒ failStage(ex)
|
|
|
|
|
case _ ⇒
|
|
|
|
|
// try next element
|
|
|
|
|
pushNextIfPossible()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2015-04-09 22:28:16 +02:00
|
|
|
|
2017-12-05 18:51:58 +01:00
|
|
|
private def pullIfNeeded(): Unit = {
|
|
|
|
|
if (buffer.used < parallelism && !hasBeenPulled(in)) tryPull(in)
|
|
|
|
|
}
|
2016-04-04 13:11:22 +02:00
|
|
|
|
|
|
|
|
setHandlers(in, out, this)
|
|
|
|
|
}
|
2015-04-09 22:28:16 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: In ⇒ Future[Out])
|
2015-10-31 14:46:10 +01:00
|
|
|
extends GraphStage[FlowShape[In, Out]] {
|
|
|
|
|
|
2016-04-04 13:11:22 +02:00
|
|
|
private val in = Inlet[In]("MapAsyncUnordered.in")
|
|
|
|
|
private val out = Outlet[Out]("MapAsyncUnordered.out")
|
2015-10-31 14:46:10 +01:00
|
|
|
|
2016-01-18 17:49:32 +01:00
|
|
|
override def initialAttributes = DefaultAttributes.mapAsyncUnordered
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2015-10-31 14:46:10 +01:00
|
|
|
override val shape = FlowShape(in, out)
|
|
|
|
|
|
2016-04-04 13:11:22 +02:00
|
|
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
|
|
|
|
new GraphStageLogic(shape) with InHandler with OutHandler {
|
|
|
|
|
override def toString = s"MapAsyncUnordered.Logic(inFlight=$inFlight, buffer=$buffer)"
|
|
|
|
|
|
|
|
|
|
val decider =
|
|
|
|
|
inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
|
|
|
|
|
|
2016-08-24 21:02:32 +02:00
|
|
|
private var inFlight = 0
|
|
|
|
|
private var buffer: BufferImpl[Out] = _
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-04-04 13:11:22 +02:00
|
|
|
private[this] def todo = inFlight + buffer.used
|
|
|
|
|
|
|
|
|
|
override def preStart(): Unit = buffer = BufferImpl(parallelism, materializer)
|
|
|
|
|
|
2016-08-30 19:30:08 +02:00
|
|
|
def futureCompleted(result: Try[Out]): Unit = {
|
|
|
|
|
inFlight -= 1
|
|
|
|
|
result match {
|
|
|
|
|
case Success(elem) if elem != null ⇒
|
|
|
|
|
if (isAvailable(out)) {
|
|
|
|
|
if (!hasBeenPulled(in)) tryPull(in)
|
|
|
|
|
push(out, elem)
|
|
|
|
|
} else buffer.enqueue(elem)
|
|
|
|
|
case other ⇒
|
|
|
|
|
val ex = other match {
|
|
|
|
|
case Failure(t) ⇒ t
|
|
|
|
|
case Success(s) if s == null ⇒ ReactiveStreamsCompliance.elementMustNotBeNullException
|
|
|
|
|
}
|
|
|
|
|
if (decider(ex) == Supervision.Stop) failStage(ex)
|
|
|
|
|
else if (isClosed(in) && todo == 0) completeStage()
|
|
|
|
|
else if (!hasBeenPulled(in)) tryPull(in)
|
|
|
|
|
}
|
|
|
|
|
}
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-08-30 19:30:08 +02:00
|
|
|
private val futureCB = getAsyncCallback(futureCompleted)
|
|
|
|
|
private val invokeFutureCB: Try[Out] ⇒ Unit = futureCB.invoke
|
2015-04-09 22:28:16 +02:00
|
|
|
|
2015-10-31 14:46:10 +01:00
|
|
|
override def onPush(): Unit = {
|
|
|
|
|
try {
|
|
|
|
|
val future = f(grab(in))
|
|
|
|
|
inFlight += 1
|
2016-04-28 13:23:03 -07:00
|
|
|
future.value match {
|
2016-08-30 19:30:08 +02:00
|
|
|
case None ⇒ future.onComplete(invokeFutureCB)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
|
|
|
|
|
case Some(v) ⇒ futureCompleted(v)
|
2016-04-28 13:23:03 -07:00
|
|
|
}
|
2015-10-31 14:46:10 +01:00
|
|
|
} catch {
|
2016-04-04 13:11:22 +02:00
|
|
|
case NonFatal(ex) ⇒ if (decider(ex) == Supervision.Stop) failStage(ex)
|
2015-04-09 22:28:16 +02:00
|
|
|
}
|
2016-08-30 19:30:08 +02:00
|
|
|
if (todo < parallelism && !hasBeenPulled(in)) tryPull(in)
|
2015-10-31 14:46:10 +01:00
|
|
|
}
|
2016-08-24 21:02:32 +02:00
|
|
|
|
2015-10-31 14:46:10 +01:00
|
|
|
override def onUpstreamFinish(): Unit = {
|
|
|
|
|
if (todo == 0) completeStage()
|
|
|
|
|
}
|
2015-04-09 22:28:16 +02:00
|
|
|
|
2015-10-31 14:46:10 +01:00
|
|
|
override def onPull(): Unit = {
|
|
|
|
|
if (!buffer.isEmpty) push(out, buffer.dequeue())
|
|
|
|
|
else if (isClosed(in) && todo == 0) completeStage()
|
2016-08-24 21:02:32 +02:00
|
|
|
|
2015-10-31 14:46:10 +01:00
|
|
|
if (todo < parallelism && !hasBeenPulled(in)) tryPull(in)
|
|
|
|
|
}
|
2016-04-04 13:11:22 +02:00
|
|
|
|
|
|
|
|
setHandlers(in, out, this)
|
|
|
|
|
}
|
2015-04-09 22:28:16 +02:00
|
|
|
}
|
2015-04-09 12:21:12 +02:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final case class Log[T](
|
2016-06-22 16:36:18 +02:00
|
|
|
name: String,
|
|
|
|
|
extract: T ⇒ Any,
|
|
|
|
|
logAdapter: Option[LoggingAdapter]) extends SimpleLinearGraphStage[T] {
|
2015-04-09 12:21:12 +02:00
|
|
|
|
2016-06-22 16:36:18 +02:00
|
|
|
override def toString = "Log"
|
2015-04-09 12:21:12 +02:00
|
|
|
|
2015-05-11 00:09:59 +02:00
|
|
|
// TODO more optimisations can be done here - prepare logOnPush function etc
|
2016-06-22 16:36:18 +02:00
|
|
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
|
|
|
|
new GraphStageLogic(shape) with OutHandler with InHandler {
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-06-22 16:36:18 +02:00
|
|
|
import Log._
|
2015-05-11 00:09:59 +02:00
|
|
|
|
2016-06-22 16:36:18 +02:00
|
|
|
private var logLevels: LogLevels = _
|
|
|
|
|
private var log: LoggingAdapter = _
|
2015-05-27 00:27:05 +02:00
|
|
|
|
2016-06-22 16:36:18 +02:00
|
|
|
def decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
|
2015-04-09 12:21:12 +02:00
|
|
|
|
2016-06-22 16:36:18 +02:00
|
|
|
override def preStart(): Unit = {
|
|
|
|
|
logLevels = inheritedAttributes.get[LogLevels](DefaultLogLevels)
|
|
|
|
|
log = logAdapter match {
|
|
|
|
|
case Some(l) ⇒ l
|
|
|
|
|
case _ ⇒
|
|
|
|
|
val mat = try ActorMaterializerHelper.downcast(materializer)
|
|
|
|
|
catch {
|
|
|
|
|
case ex: Exception ⇒
|
|
|
|
|
throw new RuntimeException("Log stage can only provide LoggingAdapter when used with ActorMaterializer! " +
|
|
|
|
|
"Provide a LoggingAdapter explicitly or use the actor based flow materializer.", ex)
|
|
|
|
|
}
|
2015-04-09 12:21:12 +02:00
|
|
|
|
2016-06-22 16:36:18 +02:00
|
|
|
Logging(mat.system, mat)(fromMaterializer)
|
|
|
|
|
}
|
|
|
|
|
}
|
2015-04-09 12:21:12 +02:00
|
|
|
|
2016-06-22 16:36:18 +02:00
|
|
|
override def onPush(): Unit = {
|
|
|
|
|
try {
|
|
|
|
|
val elem = grab(in)
|
|
|
|
|
if (isEnabled(logLevels.onElement))
|
|
|
|
|
log.log(logLevels.onElement, "[{}] Element: {}", name, extract(elem))
|
|
|
|
|
|
|
|
|
|
push(out, elem)
|
|
|
|
|
} catch {
|
|
|
|
|
case NonFatal(ex) ⇒ decider(ex) match {
|
|
|
|
|
case Supervision.Stop ⇒ failStage(ex)
|
|
|
|
|
case _ ⇒ pull(in)
|
|
|
|
|
}
|
|
|
|
|
}
|
2015-04-09 12:21:12 +02:00
|
|
|
}
|
|
|
|
|
|
2016-06-22 16:36:18 +02:00
|
|
|
override def onPull(): Unit = pull(in)
|
2015-04-09 12:21:12 +02:00
|
|
|
|
2016-06-22 16:36:18 +02:00
|
|
|
override def onUpstreamFailure(cause: Throwable): Unit = {
|
|
|
|
|
if (isEnabled(logLevels.onFailure))
|
|
|
|
|
logLevels.onFailure match {
|
|
|
|
|
case Logging.ErrorLevel ⇒ log.error(cause, "[{}] Upstream failed.", name)
|
|
|
|
|
case level ⇒ log.log(level, "[{}] Upstream failed, cause: {}: {}", name, Logging.simpleName(cause.getClass), cause.getMessage)
|
|
|
|
|
}
|
2015-04-09 12:21:12 +02:00
|
|
|
|
2016-06-22 16:36:18 +02:00
|
|
|
super.onUpstreamFailure(cause)
|
|
|
|
|
}
|
2015-04-09 12:21:12 +02:00
|
|
|
|
2016-06-22 16:36:18 +02:00
|
|
|
override def onUpstreamFinish(): Unit = {
|
|
|
|
|
if (isEnabled(logLevels.onFinish))
|
|
|
|
|
log.log(logLevels.onFinish, "[{}] Upstream finished.", name)
|
2015-04-09 12:21:12 +02:00
|
|
|
|
2016-06-22 16:36:18 +02:00
|
|
|
super.onUpstreamFinish()
|
|
|
|
|
}
|
2015-04-09 12:21:12 +02:00
|
|
|
|
2016-06-22 16:36:18 +02:00
|
|
|
override def onDownstreamFinish(): Unit = {
|
|
|
|
|
if (isEnabled(logLevels.onFinish))
|
|
|
|
|
log.log(logLevels.onFinish, "[{}] Downstream finished.", name)
|
2015-04-09 12:21:12 +02:00
|
|
|
|
2016-06-22 16:36:18 +02:00
|
|
|
super.onDownstreamFinish()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private def isEnabled(l: LogLevel): Boolean = l.asInt != OffInt
|
|
|
|
|
|
|
|
|
|
setHandlers(in, out, this)
|
|
|
|
|
}
|
2015-04-09 12:21:12 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] object Log {
|
2015-04-09 12:21:12 +02:00
|
|
|
|
2015-05-27 00:27:05 +02:00
|
|
|
/**
|
2016-06-22 16:36:18 +02:00
|
|
|
* Must be located here to be visible for implicit resolution, when [[Materializer]] is passed to [[Logging]]
|
2015-05-27 00:27:05 +02:00
|
|
|
* More specific LogSource than `fromString`, which would add the ActorSystem name in addition to the supervision to the log source.
|
|
|
|
|
*/
|
2016-06-22 16:36:18 +02:00
|
|
|
final val fromMaterializer = new LogSource[Materializer] {
|
2015-05-27 00:27:05 +02:00
|
|
|
|
|
|
|
|
// do not expose private context classes (of OneBoundedInterpreter)
|
2016-06-22 16:36:18 +02:00
|
|
|
override def getClazz(t: Materializer): Class[_] = classOf[Materializer]
|
2015-05-27 00:27:05 +02:00
|
|
|
|
2016-06-22 16:36:18 +02:00
|
|
|
override def genString(t: Materializer): String = {
|
|
|
|
|
try s"$DefaultLoggerName(${ActorMaterializerHelper.downcast(t).supervisor.path})"
|
2015-05-27 00:27:05 +02:00
|
|
|
catch {
|
|
|
|
|
case ex: Exception ⇒ LogSource.fromString.genString(DefaultLoggerName)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private final val DefaultLoggerName = "akka.stream.Log"
|
2015-04-09 12:21:12 +02:00
|
|
|
private final val OffInt = LogLevels.Off.asInt
|
|
|
|
|
private final val DefaultLogLevels = LogLevels(onElement = Logging.DebugLevel, onFinish = Logging.DebugLevel, onFailure = Logging.ErrorLevel)
|
2015-06-01 18:08:13 +03:00
|
|
|
}
|
2015-09-11 15:50:17 +02:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[stream] object TimerKeys {
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2015-09-11 15:50:17 +02:00
|
|
|
case object TakeWithinTimerKey
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2015-09-11 15:50:17 +02:00
|
|
|
case object DropWithinTimerKey
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2015-09-11 15:50:17 +02:00
|
|
|
case object GroupedWithinTimerKey
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2015-09-11 15:50:17 +02:00
|
|
|
}
|
|
|
|
|
|
2017-04-28 16:07:06 +03:00
|
|
|
@InternalApi private[akka] object GroupedWeightedWithin {
|
|
|
|
|
val groupedWeightedWithinTimer = "GroupedWeightedWithinTimer"
|
|
|
|
|
}
|
2017-03-16 21:04:07 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-04-28 16:07:06 +03:00
|
|
|
@InternalApi private[akka] final class GroupedWeightedWithin[T](val maxWeight: Long, costFn: T ⇒ Long, val interval: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
|
|
|
|
|
require(maxWeight > 0, "maxWeight must be greater than 0")
|
|
|
|
|
require(interval > Duration.Zero)
|
2016-01-18 17:49:32 +01:00
|
|
|
|
2015-09-11 15:50:17 +02:00
|
|
|
val in = Inlet[T]("in")
|
|
|
|
|
val out = Outlet[immutable.Seq[T]]("out")
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2017-04-28 16:07:06 +03:00
|
|
|
override def initialAttributes = DefaultAttributes.groupedWeightedWithin
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2015-09-11 15:50:17 +02:00
|
|
|
val shape = FlowShape(in, out)
|
|
|
|
|
|
2016-04-05 12:46:31 +02:00
|
|
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler {
|
|
|
|
|
|
2015-09-11 15:50:17 +02:00
|
|
|
private val buf: VectorBuilder[T] = new VectorBuilder
|
2017-04-28 16:07:06 +03:00
|
|
|
private var pending: T = null.asInstanceOf[T]
|
|
|
|
|
private var pendingWeight: Long = 0L
|
2015-09-11 15:50:17 +02:00
|
|
|
// True if:
|
|
|
|
|
// - buf is nonEmpty
|
|
|
|
|
// AND
|
2017-04-28 16:07:06 +03:00
|
|
|
// - (timer fired
|
|
|
|
|
// OR
|
|
|
|
|
// totalWeight >= maxWeight
|
|
|
|
|
// OR
|
|
|
|
|
// pending != null
|
|
|
|
|
// OR
|
|
|
|
|
// upstream completed)
|
|
|
|
|
private var pushEagerly = false
|
2017-03-14 15:45:45 +03:00
|
|
|
private var groupEmitted = true
|
2015-09-11 15:50:17 +02:00
|
|
|
private var finished = false
|
2017-04-28 16:07:06 +03:00
|
|
|
private var totalWeight = 0L
|
2017-08-08 14:30:32 +03:00
|
|
|
private var hasElements = false
|
2015-09-11 15:50:17 +02:00
|
|
|
|
|
|
|
|
override def preStart() = {
|
2017-04-28 16:07:06 +03:00
|
|
|
schedulePeriodically(GroupedWeightedWithin.groupedWeightedWithinTimer, interval)
|
2015-09-11 15:50:17 +02:00
|
|
|
pull(in)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private def nextElement(elem: T): Unit = {
|
2016-04-05 12:46:31 +02:00
|
|
|
groupEmitted = false
|
2017-04-28 16:07:06 +03:00
|
|
|
val cost = costFn(elem)
|
2017-08-08 14:30:32 +03:00
|
|
|
if (cost < 0L) failStage(new IllegalArgumentException(s"Negative weight [$cost] for element [$elem] is not allowed"))
|
2017-04-28 16:07:06 +03:00
|
|
|
else {
|
2017-08-08 14:30:32 +03:00
|
|
|
hasElements = true
|
2017-04-28 16:07:06 +03:00
|
|
|
if (totalWeight + cost <= maxWeight) {
|
|
|
|
|
buf += elem
|
|
|
|
|
totalWeight += cost
|
|
|
|
|
|
|
|
|
|
if (totalWeight < maxWeight) pull(in)
|
|
|
|
|
else {
|
|
|
|
|
// `totalWeight >= maxWeight` which means that downstream can get the next group.
|
|
|
|
|
if (!isAvailable(out)) {
|
|
|
|
|
// We should emit group when downstream becomes available
|
|
|
|
|
pushEagerly = true
|
|
|
|
|
// we want to pull anyway, since we allow for zero weight elements
|
|
|
|
|
// but since `emitGroup()` will pull internally (by calling `startNewGroup()`)
|
|
|
|
|
// we also have to pull if downstream hasn't yet requested an element.
|
|
|
|
|
pull(in)
|
|
|
|
|
} else {
|
|
|
|
|
schedulePeriodically(GroupedWeightedWithin.groupedWeightedWithinTimer, interval)
|
|
|
|
|
emitGroup()
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
2017-08-08 14:30:32 +03:00
|
|
|
//we have a single heavy element that weighs more than the limit
|
2017-04-28 16:07:06 +03:00
|
|
|
if (totalWeight == 0L) {
|
|
|
|
|
buf += elem
|
|
|
|
|
totalWeight += cost
|
|
|
|
|
pushEagerly = true
|
|
|
|
|
} else {
|
|
|
|
|
pending = elem
|
|
|
|
|
pendingWeight = cost
|
|
|
|
|
}
|
|
|
|
|
schedulePeriodically(GroupedWeightedWithin.groupedWeightedWithinTimer, interval)
|
|
|
|
|
tryCloseGroup()
|
|
|
|
|
}
|
|
|
|
|
}
|
2015-09-11 15:50:17 +02:00
|
|
|
}
|
|
|
|
|
|
2017-04-28 16:07:06 +03:00
|
|
|
private def tryCloseGroup(): Unit = {
|
2015-09-11 15:50:17 +02:00
|
|
|
if (isAvailable(out)) emitGroup()
|
2017-04-28 16:07:06 +03:00
|
|
|
else if (pending != null || finished) pushEagerly = true
|
2015-09-11 15:50:17 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private def emitGroup(): Unit = {
|
2016-04-05 12:46:31 +02:00
|
|
|
groupEmitted = true
|
2015-09-11 15:50:17 +02:00
|
|
|
push(out, buf.result())
|
|
|
|
|
buf.clear()
|
|
|
|
|
if (!finished) startNewGroup()
|
2017-04-28 16:07:06 +03:00
|
|
|
else if (pending != null) emit(out, Vector(pending), () ⇒ completeStage())
|
2015-09-11 15:50:17 +02:00
|
|
|
else completeStage()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private def startNewGroup(): Unit = {
|
2017-04-28 16:07:06 +03:00
|
|
|
if (pending != null) {
|
|
|
|
|
totalWeight = pendingWeight
|
|
|
|
|
pendingWeight = 0L
|
|
|
|
|
buf += pending
|
|
|
|
|
pending = null.asInstanceOf[T]
|
|
|
|
|
groupEmitted = false
|
|
|
|
|
} else {
|
2017-08-08 14:30:32 +03:00
|
|
|
totalWeight = 0L
|
|
|
|
|
hasElements = false
|
2017-04-28 16:07:06 +03:00
|
|
|
}
|
|
|
|
|
pushEagerly = false
|
2015-09-11 15:50:17 +02:00
|
|
|
if (isAvailable(in)) nextElement(grab(in))
|
|
|
|
|
else if (!hasBeenPulled(in)) pull(in)
|
|
|
|
|
}
|
|
|
|
|
|
2016-04-05 12:46:31 +02:00
|
|
|
override def onPush(): Unit = {
|
2017-04-28 16:07:06 +03:00
|
|
|
if (pending == null) nextElement(grab(in)) // otherwise keep the element for next round
|
2016-04-05 12:46:31 +02:00
|
|
|
}
|
2015-09-11 15:50:17 +02:00
|
|
|
|
2017-04-28 16:07:06 +03:00
|
|
|
override def onPull(): Unit = if (pushEagerly) emitGroup()
|
2015-09-11 15:50:17 +02:00
|
|
|
|
2016-04-05 12:46:31 +02:00
|
|
|
override def onUpstreamFinish(): Unit = {
|
|
|
|
|
finished = true
|
|
|
|
|
if (groupEmitted) completeStage()
|
2017-04-28 16:07:06 +03:00
|
|
|
else tryCloseGroup()
|
2016-04-05 12:46:31 +02:00
|
|
|
}
|
|
|
|
|
|
2017-08-08 14:30:32 +03:00
|
|
|
override protected def onTimer(timerKey: Any) = if (hasElements) {
|
2017-04-28 16:07:06 +03:00
|
|
|
if (isAvailable(out)) emitGroup()
|
|
|
|
|
else pushEagerly = true
|
|
|
|
|
}
|
2016-04-05 12:46:31 +02:00
|
|
|
setHandlers(in, out, this)
|
2015-09-11 15:50:17 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2017-03-16 21:04:07 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
|
|
|
|
@InternalApi private[akka] final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] {
|
2016-01-18 17:49:32 +01:00
|
|
|
private[this] def timerName = "DelayedTimer"
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2017-03-30 09:27:01 -05:00
|
|
|
final val DelayPrecisionMS = 10
|
|
|
|
|
|
2016-01-18 17:49:32 +01:00
|
|
|
override def initialAttributes: Attributes = DefaultAttributes.delay
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler {
|
2016-01-18 17:49:32 +01:00
|
|
|
val size =
|
2016-02-08 12:06:11 +01:00
|
|
|
inheritedAttributes.get[InputBuffer] match {
|
2016-01-18 17:49:32 +01:00
|
|
|
case None ⇒ throw new IllegalStateException(s"Couldn't find InputBuffer Attribute for $this")
|
|
|
|
|
case Some(InputBuffer(min, max)) ⇒ max
|
|
|
|
|
}
|
2016-09-13 16:10:49 +02:00
|
|
|
val delayMillis = d.toMillis
|
2016-01-18 17:49:32 +01:00
|
|
|
|
2016-02-07 14:54:48 +01:00
|
|
|
var buffer: BufferImpl[(Long, T)] = _ // buffer has pairs timestamp with upstream element
|
2015-11-21 13:48:10 -05:00
|
|
|
|
2016-02-07 14:54:48 +01:00
|
|
|
override def preStart(): Unit = buffer = BufferImpl(size, materializer)
|
|
|
|
|
|
2016-09-13 16:10:49 +02:00
|
|
|
val onPushWhenBufferFull: () ⇒ Unit = strategy match {
|
|
|
|
|
case EmitEarly ⇒
|
|
|
|
|
() ⇒ {
|
2016-08-29 14:00:48 +02:00
|
|
|
if (!isTimerActive(timerName))
|
|
|
|
|
push(out, buffer.dequeue()._2)
|
|
|
|
|
else {
|
|
|
|
|
cancelTimer(timerName)
|
|
|
|
|
onTimer(timerName)
|
|
|
|
|
}
|
2016-09-13 16:10:49 +02:00
|
|
|
}
|
|
|
|
|
case DropHead ⇒
|
|
|
|
|
() ⇒ {
|
2016-08-29 14:00:48 +02:00
|
|
|
buffer.dropHead()
|
2016-09-13 16:10:49 +02:00
|
|
|
grabAndPull()
|
|
|
|
|
}
|
|
|
|
|
case DropTail ⇒
|
|
|
|
|
() ⇒ {
|
2016-08-29 14:00:48 +02:00
|
|
|
buffer.dropTail()
|
2016-09-13 16:10:49 +02:00
|
|
|
grabAndPull()
|
|
|
|
|
}
|
|
|
|
|
case DropNew ⇒
|
|
|
|
|
() ⇒ {
|
2016-08-29 14:00:48 +02:00
|
|
|
grab(in)
|
2015-11-27 15:46:35 -05:00
|
|
|
if (!isTimerActive(timerName)) scheduleOnce(timerName, d)
|
2016-09-13 16:10:49 +02:00
|
|
|
}
|
|
|
|
|
case DropBuffer ⇒
|
|
|
|
|
() ⇒ {
|
2016-08-29 14:00:48 +02:00
|
|
|
buffer.clear()
|
2016-09-13 16:10:49 +02:00
|
|
|
grabAndPull()
|
|
|
|
|
}
|
|
|
|
|
case Fail ⇒
|
|
|
|
|
() ⇒ {
|
2016-08-29 14:00:48 +02:00
|
|
|
failStage(new BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $size)!"))
|
2016-09-13 16:10:49 +02:00
|
|
|
}
|
|
|
|
|
case Backpressure ⇒
|
|
|
|
|
() ⇒ {
|
|
|
|
|
throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def onPush(): Unit = {
|
|
|
|
|
if (buffer.isFull)
|
|
|
|
|
onPushWhenBufferFull()
|
2016-08-29 14:00:48 +02:00
|
|
|
else {
|
2016-09-13 16:10:49 +02:00
|
|
|
grabAndPull()
|
|
|
|
|
if (!isTimerActive(timerName)) {
|
|
|
|
|
scheduleOnce(timerName, d)
|
|
|
|
|
}
|
2015-11-25 21:29:35 -05:00
|
|
|
}
|
2016-08-29 14:00:48 +02:00
|
|
|
}
|
2015-11-25 21:29:35 -05:00
|
|
|
|
2016-09-13 16:10:49 +02:00
|
|
|
def pullCondition: Boolean =
|
|
|
|
|
strategy != Backpressure || buffer.used < size
|
|
|
|
|
|
|
|
|
|
def grabAndPull(): Unit = {
|
2016-08-29 14:00:48 +02:00
|
|
|
buffer.enqueue((System.nanoTime(), grab(in)))
|
|
|
|
|
if (pullCondition) pull(in)
|
|
|
|
|
}
|
2015-11-21 13:48:10 -05:00
|
|
|
|
2016-09-13 16:10:49 +02:00
|
|
|
override def onUpstreamFinish(): Unit =
|
|
|
|
|
completeIfReady()
|
2015-11-25 21:29:35 -05:00
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
def onPull(): Unit = {
|
2017-03-30 09:27:01 -05:00
|
|
|
if (!isTimerActive(timerName) && !buffer.isEmpty) {
|
|
|
|
|
val waitTime = nextElementWaitTime()
|
|
|
|
|
if (waitTime < 0) {
|
|
|
|
|
push(out, buffer.dequeue()._2)
|
|
|
|
|
} else {
|
|
|
|
|
scheduleOnce(timerName, Math.max(DelayPrecisionMS, waitTime).millis)
|
|
|
|
|
}
|
|
|
|
|
}
|
2016-08-29 14:00:48 +02:00
|
|
|
|
2016-09-13 16:10:49 +02:00
|
|
|
if (!isClosed(in) && !hasBeenPulled(in) && pullCondition)
|
|
|
|
|
pull(in)
|
|
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
completeIfReady()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
setHandler(in, this)
|
|
|
|
|
setHandler(out, this)
|
2015-11-21 13:48:10 -05:00
|
|
|
|
2016-09-13 16:10:49 +02:00
|
|
|
def completeIfReady(): Unit = if (isClosed(in) && buffer.isEmpty) completeStage()
|
2015-11-25 21:29:35 -05:00
|
|
|
|
2016-09-13 16:10:49 +02:00
|
|
|
def nextElementWaitTime(): Long = {
|
|
|
|
|
delayMillis - NANOSECONDS.toMillis(System.nanoTime() - buffer.peek()._1)
|
|
|
|
|
}
|
2015-11-25 21:29:35 -05:00
|
|
|
|
2015-11-21 13:48:10 -05:00
|
|
|
final override protected def onTimer(key: Any): Unit = {
|
2016-09-13 16:10:49 +02:00
|
|
|
if (isAvailable(out))
|
|
|
|
|
push(out, buffer.dequeue()._2)
|
|
|
|
|
|
2015-11-25 21:29:35 -05:00
|
|
|
if (!buffer.isEmpty) {
|
|
|
|
|
val waitTime = nextElementWaitTime()
|
2017-03-30 09:27:01 -05:00
|
|
|
if (waitTime > DelayPrecisionMS)
|
2016-09-13 16:10:49 +02:00
|
|
|
scheduleOnce(timerName, waitTime.millis)
|
2015-11-25 21:29:35 -05:00
|
|
|
}
|
|
|
|
|
completeIfReady()
|
2015-11-21 13:48:10 -05:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def toString = "Delay"
|
|
|
|
|
}
|
|
|
|
|
|
2017-03-16 21:04:07 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
|
|
|
|
@InternalApi private[akka] final class TakeWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
|
2015-09-11 15:50:17 +02:00
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler {
|
|
|
|
|
def onPush(): Unit = push(out, grab(in))
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
def onPull(): Unit = pull(in)
|
2015-09-11 15:50:17 +02:00
|
|
|
|
2017-02-14 19:24:28 +08:00
|
|
|
setHandlers(in, out, this)
|
2015-10-31 14:46:10 +01:00
|
|
|
|
2017-02-14 19:24:28 +08:00
|
|
|
final override protected def onTimer(key: Any): Unit = completeStage()
|
2015-09-11 15:50:17 +02:00
|
|
|
|
2015-10-21 17:52:11 +02:00
|
|
|
override def preStart(): Unit = scheduleOnce("TakeWithinTimer", timeout)
|
2015-09-11 15:50:17 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def toString = "TakeWithin"
|
|
|
|
|
}
|
|
|
|
|
|
2017-03-16 21:04:07 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
|
|
|
|
@InternalApi private[akka] final class DropWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
|
2017-02-14 19:24:28 +08:00
|
|
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
|
2015-12-07 12:41:38 +01:00
|
|
|
|
2017-02-14 19:24:28 +08:00
|
|
|
private val startNanoTime = System.nanoTime()
|
|
|
|
|
private val timeoutInNano = timeout.toNanos
|
2015-12-07 12:41:38 +01:00
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
def onPush(): Unit = {
|
2017-02-14 19:24:28 +08:00
|
|
|
if (System.nanoTime() - startNanoTime <= timeoutInNano) {
|
|
|
|
|
pull(in)
|
|
|
|
|
} else {
|
|
|
|
|
push(out, grab(in))
|
|
|
|
|
// change the in handler to avoid System.nanoTime call after timeout
|
|
|
|
|
setHandler(in, new InHandler {
|
|
|
|
|
def onPush() = push(out, grab(in))
|
|
|
|
|
})
|
|
|
|
|
}
|
2016-08-29 14:00:48 +02:00
|
|
|
}
|
2015-09-11 15:50:17 +02:00
|
|
|
|
2016-08-29 14:00:48 +02:00
|
|
|
def onPull(): Unit = pull(in)
|
|
|
|
|
|
2017-02-14 19:24:28 +08:00
|
|
|
setHandlers(in, out, this)
|
2015-09-11 15:50:17 +02:00
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def toString = "DropWithin"
|
2016-01-18 17:21:14 +02:00
|
|
|
}
|
2016-01-15 22:51:26 -05:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final class Reduce[T](val f: (T, T) ⇒ T) extends SimpleLinearGraphStage[T] {
|
2016-01-15 22:51:26 -05:00
|
|
|
override def initialAttributes: Attributes = DefaultAttributes.reduce
|
|
|
|
|
|
2016-10-17 08:02:54 +01:00
|
|
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
|
|
|
|
|
self ⇒
|
2016-01-15 22:51:26 -05:00
|
|
|
override def toString = s"Reduce.Logic(aggregator=$aggregator)"
|
2016-04-11 15:36:10 +02:00
|
|
|
|
2016-01-15 22:51:26 -05:00
|
|
|
var aggregator: T = _
|
|
|
|
|
|
2016-11-04 10:38:35 +00:00
|
|
|
private def decider =
|
|
|
|
|
inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
|
2016-04-11 15:36:10 +02:00
|
|
|
|
2016-11-04 10:38:35 +00:00
|
|
|
def setInitialInHandler(): Unit = {
|
|
|
|
|
// Initial input handler
|
|
|
|
|
setHandler(in, new InHandler {
|
|
|
|
|
override def onPush(): Unit = {
|
|
|
|
|
aggregator = grab(in)
|
|
|
|
|
pull(in)
|
|
|
|
|
setHandler(in, self)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onUpstreamFinish(): Unit =
|
|
|
|
|
failStage(new NoSuchElementException("reduce over empty stream"))
|
|
|
|
|
})
|
|
|
|
|
}
|
2016-04-11 15:36:10 +02:00
|
|
|
|
|
|
|
|
override def onPush(): Unit = {
|
2016-11-04 16:13:11 +00:00
|
|
|
val elem = grab(in)
|
2016-11-04 10:38:35 +00:00
|
|
|
try {
|
2016-11-04 16:13:11 +00:00
|
|
|
aggregator = f(aggregator, elem)
|
2016-11-04 10:38:35 +00:00
|
|
|
} catch {
|
|
|
|
|
case NonFatal(ex) ⇒ decider(ex) match {
|
2016-11-04 16:13:11 +00:00
|
|
|
case Supervision.Stop ⇒ failStage(ex)
|
2016-11-04 10:38:35 +00:00
|
|
|
case Supervision.Restart ⇒
|
|
|
|
|
aggregator = _: T
|
|
|
|
|
setInitialInHandler()
|
2016-11-04 16:13:11 +00:00
|
|
|
case _ ⇒ ()
|
|
|
|
|
|
2016-11-04 10:38:35 +00:00
|
|
|
}
|
2016-11-04 16:13:11 +00:00
|
|
|
} finally {
|
|
|
|
|
if (!isClosed(in)) pull(in)
|
2016-11-04 10:38:35 +00:00
|
|
|
}
|
2016-01-15 22:51:26 -05:00
|
|
|
}
|
|
|
|
|
|
2016-04-11 15:36:10 +02:00
|
|
|
override def onPull(): Unit = pull(in)
|
|
|
|
|
|
|
|
|
|
override def onUpstreamFinish(): Unit = {
|
|
|
|
|
push(out, aggregator)
|
|
|
|
|
completeStage()
|
|
|
|
|
}
|
|
|
|
|
|
2016-11-04 10:38:35 +00:00
|
|
|
setInitialInHandler()
|
2016-04-11 15:36:10 +02:00
|
|
|
setHandler(out, self)
|
2016-01-15 22:51:26 -05:00
|
|
|
}
|
2016-08-29 14:00:48 +02:00
|
|
|
|
2016-01-15 22:51:26 -05:00
|
|
|
override def toString = "Reduce"
|
|
|
|
|
}
|
2016-01-29 22:06:36 -05:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-06-13 18:18:46 +09:00
|
|
|
@InternalApi private[stream] object RecoverWith
|
2016-04-20 06:24:12 -07:00
|
|
|
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final class RecoverWith[T, M](val maximumRetries: Int, val pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] {
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-01-29 22:06:36 -05:00
|
|
|
override def initialAttributes = DefaultAttributes.recoverWith
|
|
|
|
|
|
|
|
|
|
override def createLogic(attr: Attributes) = new GraphStageLogic(shape) {
|
2016-04-20 06:24:12 -07:00
|
|
|
var attempt = 0
|
|
|
|
|
|
2016-01-29 22:06:36 -05:00
|
|
|
setHandler(in, new InHandler {
|
|
|
|
|
override def onPush(): Unit = push(out, grab(in))
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-01-29 22:06:36 -05:00
|
|
|
override def onUpstreamFailure(ex: Throwable) = onFailure(ex)
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
setHandler(out, new OutHandler {
|
|
|
|
|
override def onPull(): Unit = pull(in)
|
|
|
|
|
})
|
|
|
|
|
|
2016-04-20 06:24:12 -07:00
|
|
|
def onFailure(ex: Throwable) =
|
2017-06-13 18:18:46 +09:00
|
|
|
if ((maximumRetries < 0 || attempt < maximumRetries) && pf.isDefinedAt(ex)) {
|
2016-04-20 06:24:12 -07:00
|
|
|
switchTo(pf(ex))
|
|
|
|
|
attempt += 1
|
2016-04-25 12:01:03 +02:00
|
|
|
} else
|
2016-04-20 06:24:12 -07:00
|
|
|
failStage(ex)
|
2016-01-29 22:06:36 -05:00
|
|
|
|
|
|
|
|
def switchTo(source: Graph[SourceShape[T], M]): Unit = {
|
|
|
|
|
val sinkIn = new SubSinkInlet[T]("RecoverWithSink")
|
2016-08-23 13:13:11 +02:00
|
|
|
|
2016-01-29 22:06:36 -05:00
|
|
|
sinkIn.setHandler(new InHandler {
|
2016-08-23 13:13:11 +02:00
|
|
|
override def onPush(): Unit = push(out, sinkIn.grab())
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-08-23 13:13:11 +02:00
|
|
|
override def onUpstreamFinish(): Unit = completeStage()
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-01-29 22:06:36 -05:00
|
|
|
override def onUpstreamFailure(ex: Throwable) = onFailure(ex)
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
val outHandler = new OutHandler {
|
2016-08-23 13:13:11 +02:00
|
|
|
override def onPull(): Unit = sinkIn.pull()
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-01-29 22:06:36 -05:00
|
|
|
override def onDownstreamFinish(): Unit = sinkIn.cancel()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Source.fromGraph(source).runWith(sinkIn.sink)(interpreter.subFusingMaterializer)
|
|
|
|
|
setHandler(out, outHandler)
|
2016-08-23 13:13:11 +02:00
|
|
|
if (isAvailable(out)) sinkIn.pull()
|
2016-01-29 22:06:36 -05:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def toString: String = "RecoverWith"
|
2016-01-27 00:00:39 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2017-03-16 21:04:07 +02:00
|
|
|
@InternalApi private[akka] final class StatefulMapConcat[In, Out](val f: () ⇒ In ⇒ immutable.Iterable[Out]) extends GraphStage[FlowShape[In, Out]] {
|
2016-01-27 00:00:39 -05:00
|
|
|
val in = Inlet[In]("StatefulMapConcat.in")
|
|
|
|
|
val out = Outlet[Out]("StatefulMapConcat.out")
|
|
|
|
|
override val shape = FlowShape(in, out)
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-01-27 00:00:39 -05:00
|
|
|
override def initialAttributes: Attributes = DefaultAttributes.statefulMapConcat
|
|
|
|
|
|
|
|
|
|
def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {
|
2016-02-22 23:22:47 -05:00
|
|
|
lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
|
2016-01-27 00:00:39 -05:00
|
|
|
var currentIterator: Iterator[Out] = _
|
|
|
|
|
var plainFun = f()
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-01-27 00:00:39 -05:00
|
|
|
def hasNext = if (currentIterator != null) currentIterator.hasNext else false
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-01-27 00:00:39 -05:00
|
|
|
setHandlers(in, out, this)
|
|
|
|
|
|
|
|
|
|
def pushPull(): Unit =
|
|
|
|
|
if (hasNext) {
|
|
|
|
|
push(out, currentIterator.next())
|
|
|
|
|
if (!hasNext && isClosed(in)) completeStage()
|
|
|
|
|
} else if (!isClosed(in))
|
|
|
|
|
pull(in)
|
|
|
|
|
else completeStage()
|
|
|
|
|
|
|
|
|
|
def onFinish(): Unit = if (!hasNext) completeStage()
|
|
|
|
|
|
|
|
|
|
override def onPush(): Unit =
|
|
|
|
|
try {
|
|
|
|
|
currentIterator = plainFun(grab(in)).iterator
|
|
|
|
|
pushPull()
|
|
|
|
|
} catch {
|
|
|
|
|
case NonFatal(ex) ⇒ decider(ex) match {
|
|
|
|
|
case Supervision.Stop ⇒ failStage(ex)
|
|
|
|
|
case Supervision.Resume ⇒ if (!hasBeenPulled(in)) pull(in)
|
|
|
|
|
case Supervision.Restart ⇒
|
|
|
|
|
restartState()
|
|
|
|
|
if (!hasBeenPulled(in)) pull(in)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def onUpstreamFinish(): Unit = onFinish()
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-01-27 00:00:39 -05:00
|
|
|
override def onPull(): Unit = pushPull()
|
|
|
|
|
|
|
|
|
|
private def restartState(): Unit = {
|
|
|
|
|
plainFun = f()
|
|
|
|
|
currentIterator = null
|
|
|
|
|
}
|
|
|
|
|
}
|
2016-10-17 08:02:54 +01:00
|
|
|
|
2016-01-27 00:00:39 -05:00
|
|
|
override def toString = "StatefulMapConcat"
|
|
|
|
|
|
2016-02-22 20:18:15 +01:00
|
|
|
}
|