2014-10-08 18:16:57 +02:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
|
|
|
|
*/
|
|
|
|
|
package akka.stream.impl.fusing
|
|
|
|
|
|
2015-04-09 12:21:12 +02:00
|
|
|
import akka.event.Logging.LogLevel
|
2015-05-27 00:27:05 +02:00
|
|
|
import akka.event.{ LogSource, Logging, LoggingAdapter }
|
2015-11-25 21:29:35 -05:00
|
|
|
import akka.stream.Attributes.{ InputBuffer, LogLevels }
|
|
|
|
|
import akka.stream.DelayOverflowStrategy.EmitEarly
|
2015-09-11 15:50:17 +02:00
|
|
|
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
|
2015-12-20 12:54:05 +01:00
|
|
|
import akka.stream.impl.{ FixedSizeBuffer, BoundedBuffer, ReactiveStreamsCompliance }
|
2014-11-12 10:43:39 +01:00
|
|
|
import akka.stream.stage._
|
2015-05-27 00:27:05 +02:00
|
|
|
import akka.stream.{ Supervision, _ }
|
2015-04-09 22:28:16 +02:00
|
|
|
import scala.annotation.tailrec
|
2015-04-09 12:21:12 +02:00
|
|
|
import scala.collection.immutable
|
2015-09-11 15:50:17 +02:00
|
|
|
import scala.collection.immutable.VectorBuilder
|
2015-04-09 12:21:12 +02:00
|
|
|
import scala.concurrent.Future
|
2015-04-09 22:28:16 +02:00
|
|
|
import scala.util.control.NonFatal
|
2015-05-27 00:27:05 +02:00
|
|
|
import scala.util.{ Failure, Success, Try }
|
2015-10-31 14:46:10 +01:00
|
|
|
import akka.stream.ActorAttributes.SupervisionStrategy
|
2015-11-25 21:29:35 -05:00
|
|
|
import scala.concurrent.duration.{ FiniteDuration, _ }
|
2014-10-08 18:16:57 +02:00
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Stage that applies `f` to each incoming element and pushes the result downstream.
 *
 * @param f       transformation applied to every element
 * @param decider function that `decide` forwards every throwable to
 */
private[akka] final case class Map[In, Out](f: In ⇒ Out, decider: Supervision.Decider) extends PushStage[In, Out] {

  override def onPush(elem: In, ctx: Context[Out]): SyncDirective = {
    val mapped = f(elem)
    ctx.push(mapped)
  }

  override def decide(t: Throwable): Supervision.Directive = decider(t)
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Stage that pushes an element downstream only when `p` holds for it; otherwise it
 * pulls the next element from upstream.
 *
 * @param p       predicate evaluated once per element
 * @param decider function that `decide` forwards every throwable to
 */
private[akka] final case class Filter[T](p: T ⇒ Boolean, decider: Supervision.Decider) extends PushStage[T, T] {

  override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
    val accepted = p(elem)
    if (!accepted) ctx.pull()
    else ctx.push(elem)
  }

  override def decide(t: Throwable): Supervision.Directive = decider(t)
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Stage that pushes elements downstream while `p` holds and finishes as soon as
 * `p` returns `false` for an element (that element is not pushed).
 *
 * @param p       predicate evaluated once per element
 * @param decider function that `decide` forwards every throwable to
 */
private[akka] final case class TakeWhile[T](p: T ⇒ Boolean, decider: Supervision.Decider) extends PushStage[T, T] {

  override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
    val keepGoing = p(elem)
    if (!keepGoing) ctx.finish()
    else ctx.push(elem)
  }

  override def decide(t: Throwable): Supervision.Directive = decider(t)
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Stage that discards elements while `p` holds. The first element for which `p`
 * returns `false` is pushed, and so is every element after it.
 *
 * @param p       predicate evaluated for each element until it first returns `false`
 * @param decider function that `decide` forwards every throwable to
 */
private[akka] final case class DropWhile[T](p: T ⇒ Boolean, decider: Supervision.Decider) extends PushStage[T, T] {
  // Flips to true when the predicate first fails; from then on `p` is no longer evaluated.
  var taking = false

  override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
    // `&&` short-circuits, so `p` is skipped once `taking` is set.
    val stillDropping = !taking && p(elem)
    if (stillDropping) ctx.pull()
    else {
      taking = true
      ctx.push(elem)
    }
  }

  override def decide(t: Throwable): Supervision.Directive = decider(t)
}
|
|
|
|
|
|
2015-08-01 00:13:14 +02:00
|
|
|
private[akka] object Collect {
  // Shared fallback function for PartialFunction.applyOrElse. Passing it as the default means
  // the partial function's guard is evaluated only once, and the caller can compare the returned
  // value against Collect.NotApplied to find out whether the partial function was applied.
  // Prior art: https://github.com/scala/scala/blob/v2.11.4/src/library/scala/collection/immutable/List.scala#L458
  final val NotApplied: Any ⇒ Any = (_: Any) ⇒ Collect.NotApplied
}
|
|
|
|
|
|
2015-06-14 03:12:30 -04:00
|
|
|
private[akka] final case class Collect[In, Out](pf: PartialFunction[In, Out], decider: Supervision.Decider) extends PushStage[In, Out] {

  import Collect.NotApplied

  // Pushes `pf(elem)` when `pf` applies to the element; otherwise pulls the next element.
  override def onPush(elem: In, ctx: Context[Out]): SyncDirective = {
    // `NotApplied` is handed back by applyOrElse when `pf` does not apply to `elem`.
    val outcome = pf.applyOrElse(elem, NotApplied)
    outcome match {
      case NotApplied             ⇒ ctx.pull()
      case result: Out @unchecked ⇒ ctx.push(result)
    }
  }

  override def decide(t: Throwable): Supervision.Directive = decider(t)
}
|
|
|
|
|
|
2015-06-13 14:02:37 -04:00
|
|
|
/**
 * INTERNAL API
 *
 * Stage that passes elements through unchanged. When upstream fails with a throwable
 * that `pf` applies to, the value produced by `pf` is stored and emitted as the final
 * element on the next pull; otherwise the failure is propagated.
 *
 * @param pf maps an upstream failure to a replacement last element
 */
private[akka] final case class Recover[T](pf: PartialFunction[Throwable, T]) extends PushPullStage[T, T] {

  import Collect.NotApplied

  // Holds the replacement element between the upstream failure and the next pull.
  var recovered: Option[T] = None

  override def onPush(elem: T, ctx: Context[T]): SyncDirective = ctx.push(elem)

  override def onPull(ctx: Context[T]): SyncDirective =
    recovered.fold[SyncDirective](ctx.pull())(value ⇒ ctx.pushAndFinish(value))

  override def onUpstreamFailure(t: Throwable, ctx: Context[T]): TerminationDirective = {
    val outcome = pf.applyOrElse(t, NotApplied)
    outcome match {
      case NotApplied ⇒
        ctx.fail(t)
      case result: T @unchecked ⇒
        recovered = Some(result)
        ctx.absorbTermination()
    }
  }

}
|
|
|
|
|
|
2014-10-08 18:16:57 +02:00
|
|
|
/**
 * INTERNAL API
 *
 * Stage that applies `f` to each incoming element and emits the elements of the
 * returned collection one at a time.
 *
 * @param f       produces the collection of outputs for an input element
 * @param decider function that `decide` forwards every throwable to
 */
private[akka] final case class MapConcat[In, Out](f: In ⇒ immutable.Iterable[Out], decider: Supervision.Decider) extends PushPullStage[In, Out] {
  // Iterator over the outputs of the most recently received element.
  private var remaining: Iterator[Out] = Iterator.empty

  override def onPush(elem: In, ctx: Context[Out]): SyncDirective = {
    remaining = f(elem).iterator
    if (remaining.hasNext) ctx.push(remaining.next())
    else ctx.pull()
  }

  override def onPull(ctx: Context[Out]): SyncDirective =
    if (!remaining.hasNext) {
      if (ctx.isFinishing) ctx.finish()
      else ctx.pull()
    } else {
      val elem = remaining.next()
      // While finishing, the last remaining element must also complete the stage.
      if (ctx.isFinishing && !remaining.hasNext) ctx.pushAndFinish(elem)
      else ctx.push(elem)
    }

  override def onUpstreamFinish(ctx: Context[Out]): TerminationDirective =
    if (!remaining.hasNext) ctx.finish()
    else ctx.absorbTermination()

  override def decide(t: Throwable): Supervision.Directive = decider(t)

  override def restart(): MapConcat[In, Out] = copy()
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Stage that pushes at most `count` elements downstream and then finishes.
 * With a `count` of zero or less it finishes on the first pull.
 *
 * @param count number of elements to let through
 */
private[akka] final case class Take[T](count: Long) extends PushPullStage[T, T] {
  // Number of elements that may still be pushed.
  private var left: Long = count

  override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
    left -= 1
    if (left < 0) ctx.finish() // negative take counts
    else if (left == 0) ctx.pushAndFinish(elem)
    else ctx.push(elem)
  }

  override def onPull(ctx: Context[T]): SyncDirective =
    if (left > 0) ctx.pull()
    else ctx.finish()
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Stage that discards the first `count` elements and pushes every element after them.
 *
 * @param count number of leading elements to discard
 */
private[akka] final case class Drop[T](count: Long) extends PushStage[T, T] {
  // Number of elements that still have to be discarded.
  private var left: Long = count

  override def onPush(elem: T, ctx: Context[T]): SyncDirective =
    if (left <= 0) ctx.push(elem)
    else {
      left -= 1
      ctx.pull()
    }
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Stage that emits `zero` on the first pull and afterwards, for every incoming element,
 * the result of folding that element into the running aggregate with `f`.
 *
 * @param zero    initial aggregate, emitted first
 * @param f       combines the current aggregate with an incoming element
 * @param decider function that `decide` forwards every throwable to
 */
private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out, decider: Supervision.Decider) extends PushPullStage[In, Out] {
  private var aggregator: Out = zero
  // Set on the first pull, at which point the initial aggregate is emitted.
  private var pushedZero = false

  override def onPush(elem: In, ctx: Context[Out]): SyncDirective =
    if (!pushedZero) {
      aggregator = f(zero, elem)
      ctx.push(zero)
    } else {
      aggregator = f(aggregator, elem)
      ctx.push(aggregator)
    }

  override def onPull(ctx: Context[Out]): SyncDirective =
    if (pushedZero) ctx.pull()
    else {
      pushedZero = true
      if (ctx.isFinishing) ctx.pushAndFinish(aggregator)
      else ctx.push(aggregator)
    }

  // If nothing has been emitted yet, stay alive so the next pull can still emit the aggregate.
  override def onUpstreamFinish(ctx: Context[Out]): TerminationDirective =
    if (!pushedZero) ctx.absorbTermination()
    else ctx.finish()

  override def decide(t: Throwable): Supervision.Directive = decider(t)

  override def restart(): Scan[In, Out] = copy()
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Stage that folds all incoming elements into a single aggregate, starting from `zero`,
 * and emits that aggregate as its only element once upstream has finished.
 *
 * @param zero    initial aggregate
 * @param f       combines the current aggregate with an incoming element
 * @param decider function that `decide` forwards every throwable to
 */
private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out, decider: Supervision.Decider) extends PushPullStage[In, Out] {
  private[this] var aggregator: Out = zero

  override def onPush(elem: In, ctx: Context[Out]): SyncDirective = {
    aggregator = f(aggregator, elem)
    ctx.pull()
  }

  override def onPull(ctx: Context[Out]): SyncDirective =
    if (!ctx.isFinishing) ctx.pull()
    else ctx.pushAndFinish(aggregator)

  // Termination is absorbed so the aggregate can be emitted on the next pull.
  override def onUpstreamFinish(ctx: Context[Out]): TerminationDirective =
    ctx.absorbTermination()

  override def decide(t: Throwable): Supervision.Directive = decider(t)

  override def restart(): Fold[In, Out] = copy()
}
|
|
|
|
|
|
2015-10-16 01:55:20 +02:00
|
|
|
/**
 * INTERNAL API
 *
 * Stage that emits `inject` between consecutive elements, optionally preceded by `start`
 * and followed by `end`.
 *
 * @param start  optional element emitted before the first element
 * @param inject element emitted before every element except the first
 * @param end    optional element emitted once upstream has finished
 */
final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) extends GraphStage[FlowShape[T, T]] {

  private val in = Inlet[T]("in")
  private val out = Outlet[T]("out")

  override val shape = FlowShape(in, out)

  override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) {

    // Handler for the first element only; it installs `restInHandler` after use.
    val startInHandler = new InHandler {
      override def onPush(): Unit = {
        // Explicit branching instead of Iterator[T].flatten, since this is hot code.
        start match {
          case Some(first) ⇒ emitMultiple(out, Iterator(first, grab(in)))
          case None        ⇒ emit(out, grab(in))
        }
        setHandler(in, restInHandler) // switch handler
      }

      // Upstream finished before any element arrived: only `start` and `end` (if defined) are emitted.
      override def onUpstreamFinish(): Unit = {
        emitMultiple(out, start.iterator ++ end.iterator)
        completeStage()
      }
    }

    // Handler for every element after the first: emits `inject` ahead of the element.
    val restInHandler = new InHandler {
      override def onPush(): Unit = emitMultiple(out, Iterator(inject, grab(in)))

      override def onUpstreamFinish(): Unit = {
        end.foreach(last ⇒ emit(out, last))
        completeStage()
      }
    }

    val outHandler = new OutHandler {
      override def onPull(): Unit = pull(in)
    }

    setHandler(in, startInHandler)
    setHandler(out, outHandler)
  }
}
|
|
|
|
|
|
2014-10-08 18:16:57 +02:00
|
|
|
/**
 * INTERNAL API
 *
 * Stage that collects incoming elements into groups of `n` and emits each full group.
 * When upstream finishes with a partially filled group, that group is emitted last.
 *
 * @param n number of elements per group
 */
private[akka] final case class Grouped[T](n: Int) extends PushPullStage[T, immutable.Seq[T]] {
  private val buf = Vector.newBuilder[T]
  buf.sizeHint(n)

  // Number of elements still missing from the current group.
  private var left = n

  override def onPush(elem: T, ctx: Context[immutable.Seq[T]]): SyncDirective = {
    buf += elem
    left -= 1
    if (left != 0) ctx.pull()
    else {
      val group = buf.result()
      buf.clear()
      left = n
      ctx.push(group)
    }
  }

  override def onPull(ctx: Context[immutable.Seq[T]]): SyncDirective =
    if (!ctx.isFinishing) ctx.pull()
    else {
      val lastGroup = buf.result()
      buf.clear()
      left = n
      ctx.pushAndFinish(lastGroup)
    }

  // `left == n` means the current group is empty, so nothing remains to be emitted.
  override def onUpstreamFinish(ctx: Context[immutable.Seq[T]]): TerminationDirective =
    if (left != n) ctx.absorbTermination()
    else ctx.finish()
}
|
|
|
|
|
|
2015-11-19 00:11:07 +08:00
|
|
|
/**
 * INTERNAL API
 *
 * Stage that subtracts `costFn(elem)` from a budget of `n` for every element and fails
 * with a `StreamLimitReachedException` as soon as the budget drops below zero.
 *
 * @param n      total cost budget
 * @param costFn cost of a single element
 */
private[akka] final case class LimitWeighted[T](n: Long, costFn: T ⇒ Long) extends PushStage[T, T] {
  // Budget that remains after the elements seen so far.
  private var left = n

  override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
    left -= costFn(elem)
    if (left < 0) ctx.fail(new StreamLimitReachedException(n))
    else ctx.push(elem)
  }
}
|
|
|
|
|
|
2015-07-27 11:39:54 +02:00
|
|
|
/**
 * INTERNAL API
 *
 * Stage that emits windows of `n` elements, advancing the window start by `step`
 * elements between emissions. When upstream finishes with fewer than `n` buffered
 * elements, those elements are emitted as a final, shorter window.
 *
 * @param n    window size
 * @param step number of elements between the starts of consecutive windows
 */
private[akka] final case class Sliding[T](n: Int, step: Int) extends PushPullStage[T, immutable.Seq[T]] {
  private var buf = Vector.empty[T]

  override def onPush(elem: T, ctx: Context[immutable.Seq[T]]): SyncDirective = {
    buf :+= elem
    val size = buf.size
    if (size < n) ctx.pull()
    else if (size == n) ctx.push(buf)
    else if (step > n) {
      // The gap between windows is being skipped; start over once `step` elements were seen.
      if (size == step) buf = Vector.empty
      ctx.pull()
    } else {
      buf = buf.drop(step)
      if (buf.size == n) ctx.push(buf)
      else ctx.pull()
    }
  }

  override def onPull(ctx: Context[immutable.Seq[T]]): SyncDirective =
    if (ctx.isFinishing) {
      if (buf.size < n) ctx.pushAndFinish(buf)
      else ctx.finish()
    } else ctx.pull()

  override def onUpstreamFinish(ctx: Context[immutable.Seq[T]]): TerminationDirective =
    if (buf.nonEmpty) ctx.absorbTermination()
    else ctx.finish()
}
|
|
|
|
|
|
2014-10-08 18:16:57 +02:00
|
|
|
/**
 * INTERNAL API
 *
 * Detached stage that stores incoming elements in a `FixedSizeBuffer` of capacity `size`
 * and handles an element arriving at a full buffer according to `overflowStrategy`.
 *
 * @param size             capacity of the buffer
 * @param overflowStrategy what to do when an element arrives while the buffer is full
 */
private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends DetachedStage[T, T] {

  import OverflowStrategy._

  private val buffer = FixedSizeBuffer[T](size)

  // Stores the element and asks upstream for the next one.
  private def enqueueAndPull(ctx: DetachedContext[T], elem: T): UpstreamDirective = {
    buffer.enqueue(elem)
    ctx.pull()
  }

  override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective =
    if (!ctx.isHoldingDownstream) enqueueAction(ctx, elem)
    else ctx.pushAndPull(elem)

  override def onPull(ctx: DetachedContext[T]): DownstreamDirective =
    if (ctx.isFinishing) {
      val head = buffer.dequeue()
      if (buffer.isEmpty) ctx.pushAndFinish(head)
      else ctx.push(head)
    } else if (ctx.isHoldingUpstream) {
      ctx.pushAndPull(buffer.dequeue())
    } else if (buffer.isEmpty) {
      ctx.holdDownstream()
    } else {
      ctx.push(buffer.dequeue())
    }

  override def onUpstreamFinish(ctx: DetachedContext[T]): TerminationDirective =
    if (buffer.isEmpty) ctx.finish()
    else ctx.absorbTermination()

  // Selected once at construction so that onPush does not match on the strategy per element.
  val enqueueAction: (DetachedContext[T], T) ⇒ UpstreamDirective =
    (overflowStrategy: @unchecked) match {
      case DropHead ⇒ (ctx, elem) ⇒ {
        if (buffer.isFull) buffer.dropHead()
        enqueueAndPull(ctx, elem)
      }
      case DropTail ⇒ (ctx, elem) ⇒ {
        if (buffer.isFull) buffer.dropTail()
        enqueueAndPull(ctx, elem)
      }
      case DropBuffer ⇒ (ctx, elem) ⇒ {
        if (buffer.isFull) buffer.clear()
        enqueueAndPull(ctx, elem)
      }
      case DropNew ⇒ (ctx, elem) ⇒ {
        if (!buffer.isFull) buffer.enqueue(elem)
        ctx.pull()
      }
      case Backpressure ⇒ (ctx, elem) ⇒ {
        buffer.enqueue(elem)
        if (buffer.isFull) ctx.holdUpstream()
        else ctx.pull()
      }
      case Fail ⇒ (ctx, elem) ⇒ {
        if (buffer.isFull) ctx.fail(new Fail.BufferOverflowException(s"Buffer overflow (max capacity was: $size)!"))
        else enqueueAndPull(ctx, elem)
      }
    }
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Stage that finishes on the first push or pull it receives, without emitting anything.
 */
private[akka] final case class Completed[T]() extends PushPullStage[T, T] {

  override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
    ctx.finish()
  }

  override def onPull(ctx: Context[T]): SyncDirective = {
    ctx.finish()
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Detached stage that combines incoming elements into one aggregate until downstream
 * asks for an element. The first element after an emission is turned into an aggregate
 * with `seed`; later elements are merged in with `aggregate`.
 *
 * @param seed      creates a fresh aggregate from an element
 * @param aggregate merges an element into the current aggregate
 * @param decider   function that `decide` forwards every throwable to
 */
private[akka] final case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (Out, In) ⇒ Out,
                                                 decider: Supervision.Decider) extends DetachedStage[In, Out] {
  // Current aggregate; `null` means nothing has been aggregated since the last emission.
  private var agg: Any = null

  // Returns the current aggregate and resets the slot to "empty".
  private def drainAggregate(): Out = {
    val result = agg.asInstanceOf[Out]
    agg = null
    result
  }

  override def onPush(elem: In, ctx: DetachedContext[Out]): UpstreamDirective = {
    agg =
      if (agg != null) aggregate(agg.asInstanceOf[Out], elem)
      else seed(elem)

    if (ctx.isHoldingDownstream) ctx.pushAndPull(drainAggregate())
    else ctx.pull()
  }

  override def onPull(ctx: DetachedContext[Out]): DownstreamDirective =
    if (ctx.isFinishing) {
      if (agg == null) ctx.finish()
      else ctx.pushAndFinish(drainAggregate())
    } else if (agg == null) {
      ctx.holdDownstream()
    } else {
      val result = agg.asInstanceOf[Out]
      if (result == null) throw new NullPointerException
      agg = null
      ctx.push(result)
    }

  // Termination is absorbed so a pending aggregate can still be emitted on the next pull.
  override def onUpstreamFinish(ctx: DetachedContext[Out]): TerminationDirective =
    ctx.absorbTermination()

  override def decide(t: Throwable): Supervision.Directive = decider(t)

  override def restart(): Conflate[In, Out] = copy()
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Detached stage that turns each incoming element into a seed with `seed` and, whenever
 * downstream asks for an element, derives one from the current seed with `extrapolate`.
 * It always decides `Supervision.Stop` and does not support restart.
 *
 * @param seed        creates the extrapolation state from an incoming element
 * @param extrapolate produces the next output together with the next state
 */
private[akka] final case class Expand[In, Out, Seed](seed: In ⇒ Seed, extrapolate: Seed ⇒ (Out, Seed)) extends DetachedStage[In, Out] {
  private var s: Seed = _
  // True once the first element has arrived and `s` holds a real seed.
  private var started: Boolean = false
  // True once the current seed has produced at least one output.
  private var expanded: Boolean = false

  // Advances the state by one extrapolation step and returns the element to emit.
  private def extrapolateOnce(): Out = {
    val (emit, newS) = extrapolate(s)
    s = newS
    expanded = true
    emit
  }

  override def onPush(elem: In, ctx: DetachedContext[Out]): UpstreamDirective = {
    s = seed(elem)
    started = true
    expanded = false
    if (!ctx.isHoldingDownstream) ctx.holdUpstream()
    else ctx.pushAndPull(extrapolateOnce())
  }

  override def onPull(ctx: DetachedContext[Out]): DownstreamDirective =
    if (ctx.isFinishing) {
      if (started) ctx.pushAndFinish(extrapolate(s)._1)
      else ctx.finish()
    } else if (!started) {
      ctx.holdDownstream()
    } else {
      val emit = extrapolateOnce()
      if (ctx.isHoldingUpstream) ctx.pushAndPull(emit)
      else ctx.push(emit)
    }

  // If the latest seed has not produced an output yet, stay alive for one more pull.
  override def onUpstreamFinish(ctx: DetachedContext[Out]): TerminationDirective =
    if (!expanded) ctx.absorbTermination()
    else ctx.finish()

  override def decide(t: Throwable): Supervision.Directive = Supervision.Stop

  override def restart(): Expand[In, Out, Seed] =
    throw new UnsupportedOperationException("Expand doesn't support restart")
}
|
2015-04-09 22:28:16 +02:00
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Helpers shared by the `MapAsync` stage.
 */
private[akka] object MapAsync {

  /** Mutable single-element cell. */
  final class Holder[T](var elem: T)

  /** Placeholder stored in a `Holder` before the real result is written into it. */
  val NotYetThere: Failure[Nothing] = Failure(new Exception)
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Stage that applies the future-returning function `f` to incoming elements and emits the
 * results in the order the elements arrived. At most `parallelism` results are buffered
 * (pending or completed) at any time.
 *
 * @param parallelism capacity of the result buffer
 * @param f           asynchronous transformation applied to every element
 */
private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Future[Out])
  extends GraphStage[FlowShape[In, Out]] {

  import MapAsync._

  private val in = Inlet[In]("in")
  private val out = Outlet[Out]("out")

  override def initialAttributes = Attributes.name("MapAsync")
  override val shape = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
    override def toString = s"MapAsync.Logic(buffer=$buffer)"

    // Falls back to the stopping decider when no SupervisionStrategy attribute is set.
    val decider =
      inheritedAttributes
        .getAttribute(classOf[SupervisionStrategy])
        .map(_.decider)
        .getOrElse(Supervision.stoppingDecider)

    // One holder per element, in arrival order; a holder contains NotYetThere until its future completes.
    val buffer = new BoundedBuffer[Holder[Try[Out]]](parallelism)
    def todo = buffer.used

    // Requests another element when the buffer has room and no pull is outstanding.
    private def pullIfRoom(): Unit =
      if (todo < parallelism && !hasBeenPulled(in)) tryPull(in)

    // Emits the oldest completed result, skipping over failures that were kept in the buffer.
    @tailrec private def pushOne(): Unit =
      if (buffer.isEmpty) {
        if (isClosed(in)) completeStage()
        else if (!hasBeenPulled(in)) pull(in)
      } else if (buffer.peek.elem == NotYetThere) {
        pullIfRoom()
      } else {
        buffer.dequeue().elem match {
          case Failure(_) ⇒
            pushOne()
          case Success(elem) ⇒
            push(out, elem)
            pullIfRoom()
        }
      }

    // Fails the stage when the decider says Stop; otherwise records the failure in the holder.
    def failOrPull(holder: Holder[Try[Out]], f: Failure[Out]) =
      decider(f.exception) match {
        case Supervision.Stop ⇒
          failStage(f.exception)
        case _ ⇒
          holder.elem = f
          if (isAvailable(out)) pushOne()
      }

    val futureCB =
      getAsyncCallback[(Holder[Try[Out]], Try[Out])]({
        case (holder, f: Failure[_]) ⇒
          failOrPull(holder, f)
        case (holder, s @ Success(elem)) ⇒
          if (elem != null) {
            holder.elem = s
            if (isAvailable(out)) pushOne()
          } else {
            // A null result is handled like a failed future.
            val ex = ReactiveStreamsCompliance.elementMustNotBeNullException
            failOrPull(holder, Failure(ex))
          }
      })

    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        try {
          val future = f(grab(in))
          val holder = new Holder[Try[Out]](NotYetThere)
          buffer.enqueue(holder)
          future.onComplete(result ⇒ futureCB.invoke((holder, result)))(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
        } catch {
          case NonFatal(ex) ⇒
            if (decider(ex) == Supervision.Stop) failStage(ex)
        }
        if (todo < parallelism) tryPull(in)
      }

      override def onUpstreamFinish(): Unit =
        if (todo == 0) completeStage()
    })

    setHandler(out, new OutHandler {
      override def onPull(): Unit = pushOne()
    })
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Stage that applies the future-returning function `f` to incoming elements and emits each
 * result as its future completes, without keeping the arrival order. The sum of in-flight
 * futures and buffered results is kept below `parallelism` before more input is requested.
 *
 * @param parallelism bound on in-flight plus buffered results
 * @param f           asynchronous transformation applied to every element
 */
private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: In ⇒ Future[Out])
  extends GraphStage[FlowShape[In, Out]] {

  private val in = Inlet[In]("in")
  private val out = Outlet[Out]("out")

  override def initialAttributes = Attributes.name("MapAsyncUnordered")
  override val shape = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
    override def toString = s"MapAsyncUnordered.Logic(inFlight=$inFlight, buffer=$buffer)"

    // Falls back to the stopping decider when no SupervisionStrategy attribute is set.
    val decider =
      inheritedAttributes
        .getAttribute(classOf[SupervisionStrategy])
        .map(_.decider)
        .getOrElse(Supervision.stoppingDecider)

    // Futures started but not yet reported back through futureCB.
    var inFlight = 0
    // Completed results waiting for downstream demand.
    val buffer = new BoundedBuffer[Out](parallelism)
    def todo = inFlight + buffer.used

    // Fails the stage when the decider says Stop; otherwise completes or asks for more input.
    def failOrPull(ex: Throwable) =
      decider(ex) match {
        case Supervision.Stop               ⇒ failStage(ex)
        case _ if isClosed(in) && todo == 0 ⇒ completeStage()
        case _                              ⇒ if (!hasBeenPulled(in)) tryPull(in)
      }

    val futureCB =
      getAsyncCallback((result: Try[Out]) ⇒ {
        inFlight -= 1
        result match {
          case Failure(ex) ⇒
            failOrPull(ex)
          case Success(elem) ⇒
            if (elem == null) {
              // A null result is handled like a failed future.
              val ex = ReactiveStreamsCompliance.elementMustNotBeNullException
              failOrPull(ex)
            } else if (!isAvailable(out)) {
              buffer.enqueue(elem)
            } else {
              if (!hasBeenPulled(in)) tryPull(in)
              push(out, elem)
            }
        }
      }).invoke _

    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        try {
          val future = f(grab(in))
          inFlight += 1
          future.onComplete(futureCB)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
        } catch {
          case NonFatal(ex) ⇒
            if (decider(ex) == Supervision.Stop) failStage(ex)
        }
        if (todo < parallelism) tryPull(in)
      }

      override def onUpstreamFinish(): Unit =
        if (todo == 0) completeStage()
    })

    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        if (buffer.isEmpty) {
          if (isClosed(in) && todo == 0) completeStage()
        } else {
          push(out, buffer.dequeue())
        }
        if (todo < parallelism && !hasBeenPulled(in)) tryPull(in)
      }
    })
  }
}
|
2015-04-09 12:21:12 +02:00
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Pass-through stage that logs every element (after applying `extract`), as well as
 * upstream failure, upstream completion and downstream cancellation, under the given `name`.
 * The levels used come from the `LogLevels` attribute; a level equal to `LogLevels.Off`
 * disables the corresponding message.
 */
private[akka] final case class Log[T](name: String, extract: T ⇒ Any,
                                      logAdapter: Option[LoggingAdapter],
                                      decider: Supervision.Decider) extends PushStage[T, T] {

  import Log._

  private var logLevels: LogLevels = _
  private var log: LoggingAdapter = _

  // TODO more optimisations can be done here - prepare logOnPush function etc
  override def preStart(ctx: LifecycleContext): Unit = {
    logLevels = ctx.attributes.get[LogLevels](DefaultLogLevels)
    // An explicitly supplied adapter wins; otherwise one is derived from the materializer.
    log = logAdapter.getOrElse(adapterFromMaterializer(ctx))
  }

  /**
   * Builds a LoggingAdapter from the ActorMaterializer behind `ctx`.
   * Fails with a RuntimeException when the materializer cannot be downcast.
   */
  private def adapterFromMaterializer(ctx: LifecycleContext): LoggingAdapter = {
    val mat =
      try ActorMaterializer.downcast(ctx.materializer)
      catch {
        case ex: Exception ⇒
          throw new RuntimeException("Log stage can only provide LoggingAdapter when used with ActorMaterializer! " +
            "Provide a LoggingAdapter explicitly or use the actor based flow materializer.", ex)
      }

    Logging(mat.system, ctx)(fromLifecycleContext)
  }

  override def onPush(elem: T, ctx: Context[T]): SyncDirective = {
    val level = logLevels.onElement
    if (isEnabled(level)) log.log(level, "[{}] Element: {}", name, extract(elem))

    ctx.push(elem)
  }

  override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = {
    val level = logLevels.onFailure
    if (isEnabled(level)) {
      // Only the error level carries the throwable itself; other levels log its class and message.
      if (level == Logging.ErrorLevel) log.error(cause, "[{}] Upstream failed.", name)
      else log.log(level, "[{}] Upstream failed, cause: {}: {}", name, Logging.simpleName(cause.getClass), cause.getMessage)
    }

    super.onUpstreamFailure(cause, ctx)
  }

  override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = {
    val level = logLevels.onFinish
    if (isEnabled(level)) log.log(level, "[{}] Upstream finished.", name)

    super.onUpstreamFinish(ctx)
  }

  override def onDownstreamFinish(ctx: Context[T]): TerminationDirective = {
    val level = logLevels.onFinish
    if (isEnabled(level)) log.log(level, "[{}] Downstream finished.", name)

    super.onDownstreamFinish(ctx)
  }

  /** A level is enabled unless it equals the `Off` marker level. */
  private def isEnabled(l: LogLevel): Boolean = OffInt != l.asInt

  override def decide(t: Throwable): Supervision.Directive = decider(t)
}
|
|
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Constants and the `LogSource` used by the [[Log]] stage.
 */
private[akka] object Log {

  private final val DefaultLoggerName = "akka.stream.Log"
  private final val OffInt = LogLevels.Off.asInt
  private final val DefaultLogLevels = LogLevels(onElement = Logging.DebugLevel, onFinish = Logging.DebugLevel, onFailure = Logging.ErrorLevel)

  /**
   * Must be located here to be visible for implicit resolution, when LifecycleContext is passed to [[Logging]]
   * More specific LogSource than `fromString`, which would add the ActorSystem name in addition to the supervision to the log source.
   */
  final val fromLifecycleContext = new LogSource[LifecycleContext] {

    // do not expose private context classes (of OneBoundedInterpreter)
    override def getClazz(t: LifecycleContext): Class[_] = classOf[Materializer]

    // Uses the materializer's supervisor path when available, otherwise falls back to the plain logger name.
    override def genString(t: LifecycleContext): String =
      try {
        val supervisorPath = ActorMaterializer.downcast(t.materializer).supervisor.path
        s"$DefaultLoggerName($supervisorPath)"
      } catch {
        case _: Exception ⇒ LogSource.fromString.genString(DefaultLoggerName)
      }

  }
}
|
2015-09-11 15:50:17 +02:00
|
|
|
|
|
|
|
|
/**
 * INTERNAL API
 *
 * Singleton marker objects, one per time-based operation named in the identifier.
 *
 * NOTE(review): the timed stages visible in this part of the file schedule their timers
 * under string names rather than these objects - confirm where these keys are still used.
 */
private[stream] object TimerKeys {

  /** Marker object for the take-within timer. */
  case object TakeWithinTimerKey

  /** Marker object for the drop-within timer. */
  case object DropWithinTimerKey

  /** Marker object for the grouped-within timer. */
  case object GroupedWithinTimerKey

}
|
|
|
|
|
|
|
|
|
|
/**
 * Collects incoming elements into groups and emits each group downstream as an immutable sequence.
 * A group is closed as soon as it holds `n` elements or when the periodic timer (interval `d`)
 * fires while the group is non-empty, whichever happens first.
 *
 * @param n maximum number of elements in one group
 * @param d interval of the periodic timer that closes a non-empty group
 */
private[stream] class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
  val in = Inlet[T]("in")
  val out = Outlet[immutable.Seq[T]]("out")
  override def initialAttributes = Attributes.name("GroupedWithin")
  val shape = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
    private val buf: VectorBuilder[T] = new VectorBuilder
    // True if:
    // - buf is nonEmpty
    //       AND
    // - timer fired OR group is full
    private var groupClosed = false
    // Set once upstream has completed; the stage then completes after the last group is emitted.
    private var finished = false
    // Number of elements currently collected in buf.
    private var elements = 0

    private val GroupedWithinTimer = "GroupedWithinTimer"

    override def preStart() = {
      schedulePeriodically(GroupedWithinTimer, d)
      pull(in)
    }

    /** Adds `elem` to the current group, closing the group when it reaches `n` elements. */
    private def nextElement(elem: T): Unit = {
      buf += elem
      elements += 1
      if (elements == n) {
        // Restart the period so that the next group gets a full interval.
        schedulePeriodically(GroupedWithinTimer, d)
        closeGroup()
      } else pull(in)
    }

    /** Marks the current group as closed and emits it right away if downstream has demand. */
    private def closeGroup(): Unit = {
      groupClosed = true
      if (isAvailable(out)) emitGroup()
    }

    /** Pushes the collected group, then either starts the next one or completes the stage. */
    private def emitGroup(): Unit = {
      push(out, buf.result())
      buf.clear()
      if (!finished) startNewGroup()
      else completeStage()
    }

    /** Resets the counters and continues with an already delivered element or a fresh pull. */
    private def startNewGroup(): Unit = {
      elements = 0
      groupClosed = false
      if (isAvailable(in)) nextElement(grab(in))
      else if (!hasBeenPulled(in)) pull(in)
    }

    setHandler(in, new InHandler {
      override def onPush(): Unit =
        if (!groupClosed) nextElement(grab(in)) // otherwise keep the element for next round

      override def onUpstreamFinish(): Unit = {
        finished = true
        // A group that is already closed is still buffered and waiting for downstream demand.
        // Completing here would silently drop it, so the stage stays open in that case and
        // emitGroup() completes it after the group has been pushed.
        // NOTE(review): an element that was delivered while the group was closed and has not
        // been grabbed yet is still not emitted after completion - confirm whether that can occur.
        if (!groupClosed) {
          if (elements > 0) closeGroup()
          else completeStage()
        }
      }

      override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
    })

    setHandler(out, new OutHandler {
      override def onPull(): Unit = if (groupClosed) emitGroup()
      override def onDownstreamFinish(): Unit = completeStage()
    })

    // The timer only closes non-empty groups; no empty group is ever emitted.
    override protected def onTimer(timerKey: Any) =
      if (elements > 0) closeGroup()
  }
}
|
|
|
|
|
|
2015-11-25 21:29:35 -05:00
|
|
|
/**
 * Buffers each incoming element together with the `System.nanoTime()` at which it arrived and
 * emits it downstream once `d` has elapsed. The buffer capacity is the `max` of the `InputBuffer`
 * attribute (16 when the attribute is absent); `strategy` decides what happens when an element
 * arrives while the buffer is full.
 *
 * @param d        delay applied to every element
 * @param strategy overflow strategy used when the buffer is full
 */
private[stream] class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] {

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
    val size = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
    val buffer = FixedSizeBuffer[(Long, T)](size) // buffer has pairs timestamp with upstream element
    val timerName = "DelayedTimer"
    // Set when upstream finished while an emission was still pending; completion is then deferred.
    var willStop = false

    setHandler(in, handler = new InHandler {
      override def onPush(): Unit = {
        if (buffer.isFull) (strategy: @unchecked) match {
          case EmitEarly ⇒
            if (!isTimerActive(timerName))
              push(out, buffer.dequeue()._2)
            else {
              cancelTimer(timerName)
              onTimer(timerName)
            }
          case DelayOverflowStrategy.DropHead ⇒
            buffer.dropHead()
            grabAndPull(true)
          case DelayOverflowStrategy.DropTail ⇒
            buffer.dropTail()
            grabAndPull(true)
          case DelayOverflowStrategy.DropNew ⇒
            grab(in)
            if (!isTimerActive(timerName)) scheduleOnce(timerName, d)
          case DelayOverflowStrategy.DropBuffer ⇒
            buffer.clear()
            grabAndPull(true)
          case DelayOverflowStrategy.Fail ⇒
            failStage(new DelayOverflowStrategy.Fail.BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $size)!"))
          case DelayOverflowStrategy.Backpressure ⇒ throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode")
        }
        else {
          // In Backpressure mode stop pulling when this element fills the last free slot.
          grabAndPull(strategy != DelayOverflowStrategy.Backpressure || buffer.size < size - 1)
          if (!isTimerActive(timerName)) scheduleOnce(timerName, d)
        }
      }

      /** Stores the arrived element with its arrival timestamp and pulls again when allowed. */
      def grabAndPull(pullCondition: Boolean): Unit = {
        buffer.enqueue((System.nanoTime(), grab(in)))
        if (pullCondition) pull(in)
      }

      // NOTE(review): when no emission is pending this completes immediately, even if elements
      // are still buffered - confirm that dropping them is intended.
      override def onUpstreamFinish(): Unit = {
        if (isAvailable(out) && isTimerActive(timerName)) willStop = true
        else completeStage()
      }
    })

    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        if (!isTimerActive(timerName) && !buffer.isEmpty) {
          val waitTime = nextElementWaitTime()
          if (waitTime < 0) push(out, buffer.dequeue()._2)
          // The head is not due yet and no timer is running: schedule one for the remaining
          // time, otherwise nothing would ever emit the element if upstream stays idle.
          else scheduleOnce(timerName, math.max(waitTime, 1L).millis)
        }

        // In Backpressure mode a full buffer must not receive another element.
        val mayPull = strategy != DelayOverflowStrategy.Backpressure || !buffer.isFull
        if (!willStop && !hasBeenPulled(in) && mayPull) pull(in)
        completeIfReady()
      }
    })

    /** Completes the stage once upstream has finished and every buffered element was emitted. */
    def completeIfReady(): Unit = if (willStop && buffer.isEmpty) completeStage()

    /**
     * Remaining delay of the oldest buffered element in milliseconds; negative once it is overdue.
     * Timestamps are taken with `System.nanoTime()`, so the elapsed time has to be converted
     * from nanoseconds to milliseconds by dividing (not multiplying) by 1000 * 1000.
     */
    def nextElementWaitTime(): Long = d.toMillis - (System.nanoTime() - buffer.peek()._1) / (1000L * 1000L)

    final override protected def onTimer(key: Any): Unit = {
      // Without downstream demand the element stays buffered and is emitted by the next onPull.
      if (isAvailable(out)) push(out, buffer.dequeue()._2)
      if (!buffer.isEmpty) {
        val waitTime = nextElementWaitTime()
        // Shorter waits are not scheduled; such an element is handled on the next onPull.
        if (waitTime > 10) scheduleOnce(timerName, waitTime.millis)
      }
      completeIfReady()
    }
  }

  override def toString = "Delay"
}
|
|
|
|
|
|
2015-09-11 15:50:17 +02:00
|
|
|
/**
 * Forwards every element unchanged and completes the stage when the one-shot timer,
 * scheduled with `timeout` at start, fires.
 */
private[stream] class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {

  override def toString = "TakeWithin"

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {

    // Start the countdown as soon as the stage starts.
    override def preStart(): Unit = scheduleOnce("TakeWithinTimer", timeout)

    // The time window is over: stop the stage.
    final override protected def onTimer(key: Any): Unit = completeStage()

    setHandler(out, new OutHandler {
      override def onPull(): Unit = pull(in)
    })

    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        val elem = grab(in)
        push(out, elem)
      }
    })
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Does not forward elements until the one-shot timer, scheduled with `timeout` at start,
 * has fired; afterwards every element is forwarded unchanged.
 */
private[stream] class DropWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {

  override def toString = "DropWithin"

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {

    // Becomes true when the timer fires; from then on elements are let through.
    private var allow = false

    override def preStart(): Unit = scheduleOnce("DropWithinTimer", timeout)

    final override protected def onTimer(key: Any): Unit = allow = true

    setHandler(out, new OutHandler {
      override def onPull(): Unit = pull(in)
    })

    setHandler(in, new InHandler {
      override def onPush(): Unit = {
        // While still inside the drop window the element is not forwarded; upstream is pulled again instead.
        if (!allow) pull(in)
        else push(out, grab(in))
      }
    })
  }
}
|