Minor streams touchups

This commit is contained in:
Viktor Klang 2016-01-18 17:49:32 +01:00
parent 93952f356a
commit 58510a2b3f
9 changed files with 74 additions and 70 deletions

View file

@ -7,6 +7,7 @@ import akka.event.Logging.LogLevel
import akka.event.{ LogSource, Logging, LoggingAdapter }
import akka.stream.Attributes.{ InputBuffer, LogLevels }
import akka.stream.DelayOverflowStrategy.EmitEarly
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.impl.{ FixedSizeBuffer, BoundedBuffer, ReactiveStreamsCompliance }
import akka.stream.stage._
@ -241,6 +242,9 @@ private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out, de
* INTERNAL API
*/
final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) extends GraphStage[FlowShape[T, T]] {
ReactiveStreamsCompliance.requireNonNullElement(inject)
if(start.isDefined) ReactiveStreamsCompliance.requireNonNullElement(start.get)
if(end.isDefined) ReactiveStreamsCompliance.requireNonNullElement(end.get)
private val in = Inlet[T]("in")
private val out = Outlet[T]("out")
@ -538,15 +542,14 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut
private val in = Inlet[In]("in")
private val out = Outlet[Out]("out")
override def initialAttributes = Attributes.name("MapAsync")
override def initialAttributes = DefaultAttributes.mapAsync
override val shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
override def toString = s"MapAsync.Logic(buffer=$buffer)"
val decider =
inheritedAttributes.getAttribute(classOf[SupervisionStrategy])
.map(_.decider).getOrElse(Supervision.stoppingDecider)
//FIXME Put Supervision.stoppingDecider as a SupervisionStrategy on DefaultAttributes.mapAsync?
val decider = inheritedAttributes.getAttribute(classOf[SupervisionStrategy]).map(_.decider).getOrElse(Supervision.stoppingDecider)
val buffer = new BoundedBuffer[Holder[Try[Out]]](parallelism)
def todo = buffer.used
@ -617,7 +620,7 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I
private val in = Inlet[In]("in")
private val out = Outlet[Out]("out")
override def initialAttributes = Attributes.name("MapAsyncUnordered")
override def initialAttributes = DefaultAttributes.mapAsyncUnordered
override val shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
@ -782,10 +785,13 @@ private[stream] object TimerKeys {
case object GroupedWithinTimerKey
}
private[stream] class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
private[stream] final class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
require(n > 0, "n must be greater than 0")
require(d > Duration.Zero)
val in = Inlet[T]("in")
val out = Outlet[immutable.Seq[T]]("out")
override def initialAttributes = Attributes.name("GroupedWithin")
override def initialAttributes = DefaultAttributes.groupedWithin
val shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
@ -854,15 +860,21 @@ private[stream] class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphS
}
}
private[stream] class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] {
private[stream] final class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] {
private[this] def timerName = "DelayedTimer"
override def initialAttributes: Attributes = DefaultAttributes.delay
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
val size = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
val size =
inheritedAttributes.getAttribute(classOf[InputBuffer]) match {
case None throw new IllegalStateException(s"Couldn't find InputBuffer Attribute for $this")
case Some(InputBuffer(min, max)) max
}
val buffer = FixedSizeBuffer[(Long, T)](size) // buffer has pairs timestamp with upstream element
val timerName = "DelayedTimer"
var willStop = false
setHandler(in, handler = new InHandler {
//FIXME rewrite into distinct strategy functions to avoid matching on strategy for every input when full
override def onPush(): Unit = {
if (buffer.isFull) (strategy: @unchecked) match {
case EmitEarly
@ -932,7 +944,7 @@ private[stream] class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrateg
override def toString = "Delay"
}
private[stream] class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
private[stream] final class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
setHandler(in, new InHandler {
@ -952,7 +964,7 @@ private[stream] class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinea
override def toString = "TakeWithin"
}
private[stream] class DropWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
private[stream] final class DropWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
private var allow = false
@ -973,4 +985,4 @@ private[stream] class DropWithin[T](timeout: FiniteDuration) extends SimpleLinea
}
override def toString = "DropWithin"
}
}