fixes #21193 Expose a number of GraphStage attributes

This commit is contained in:
Kam Kasravi 2016-08-17 06:08:15 -07:00
parent a81a61ba1f
commit 438f489060
6 changed files with 32 additions and 32 deletions

View file

@ -15,11 +15,11 @@ import scala.concurrent.duration.{ FiniteDuration, _ }
* INTERNAL API
*/
class Throttle[T](
cost: Int,
per: FiniteDuration,
maximumBurst: Int,
costCalculation: (T) Int,
mode: ThrottleMode)
val cost: Int,
val per: FiniteDuration,
val maximumBurst: Int,
val costCalculation: (T) Int,
val mode: ThrottleMode)
extends SimpleLinearGraphStage[T] {
require(cost > 0, "cost must be > 0")
require(per.toNanos > 0, "per time must be > 0")

View file

@ -31,7 +31,7 @@ object Timers {
TimeUnit.NANOSECONDS)
}
final class Initial[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
final class Initial[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def initialAttributes = DefaultAttributes.initial
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
@ -58,7 +58,7 @@ object Timers {
}
final class Completion[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
final class Completion[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def initialAttributes = DefaultAttributes.completion
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
@ -79,7 +79,7 @@ object Timers {
}
final class Idle[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
final class Idle[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def initialAttributes = DefaultAttributes.idle
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
@ -106,7 +106,7 @@ object Timers {
}
final class BackpressureTimeout[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
final class BackpressureTimeout[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def initialAttributes = DefaultAttributes.backpressureTimeout
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
@ -208,7 +208,7 @@ object Timers {
}
final class IdleInject[I, O >: I](val timeout: FiniteDuration, inject: () O) extends GraphStage[FlowShape[I, O]] {
final class IdleInject[I, O >: I](val timeout: FiniteDuration, val inject: () O) extends GraphStage[FlowShape[I, O]] {
val in: Inlet[I] = Inlet("IdleInject.in")
val out: Outlet[O] = Outlet("IdleInject.out")

View file

@ -221,7 +221,7 @@ object GraphStages {
}
}
final class TickSource[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T)
final class TickSource[T](val initialDelay: FiniteDuration, val interval: FiniteDuration, val tick: T)
extends GraphStageWithMaterializedValue[SourceShape[T], Cancellable] {
override val shape = SourceShape(Outlet[T]("TickSource.out"))
val out = shape.out

View file

@ -522,7 +522,7 @@ final case class Grouped[T](n: Int) extends GraphStage[FlowShape[T, immutable.Se
/**
* INTERNAL API
*/
final case class LimitWeighted[T](n: Long, costFn: T Long) extends GraphStage[FlowShape[T, T]] {
final case class LimitWeighted[T](val n: Long, val costFn: T Long) extends GraphStage[FlowShape[T, T]] {
val in = Inlet[T]("LimitWeighted.in")
val out = Outlet[T]("LimitWeighted.out")
override val shape = FlowShape(in, out)
@ -554,7 +554,7 @@ final case class LimitWeighted[T](n: Long, costFn: T ⇒ Long) extends GraphStag
/**
* INTERNAL API
*/
final case class Sliding[T](n: Int, step: Int) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
final case class Sliding[T](val n: Int, val step: Int) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
require(n > 0, "n must be greater than 0")
require(step > 0, "step must be greater than 0")
@ -672,7 +672,7 @@ final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extend
/**
* INTERNAL API
*/
final case class Batch[In, Out](max: Long, costFn: In Long, seed: In Out, aggregate: (Out, In) Out)
final case class Batch[In, Out](val max: Long, val costFn: In Long, val seed: In Out, val aggregate: (Out, In) Out)
extends GraphStage[FlowShape[In, Out]] {
val in = Inlet[In]("Batch.in")
@ -798,7 +798,7 @@ final case class Batch[In, Out](max: Long, costFn: In ⇒ Long, seed: In ⇒ Out
/**
* INTERNAL API
*/
final class Expand[In, Out](extrapolate: In Iterator[Out]) extends GraphStage[FlowShape[In, Out]] {
final class Expand[In, Out](val extrapolate: In Iterator[Out]) extends GraphStage[FlowShape[In, Out]] {
private val in = Inlet[In]("expand.in")
private val out = Outlet[Out]("expand.out")
@ -1127,7 +1127,7 @@ private[stream] object TimerKeys {
case object GroupedWithinTimerKey
}
final class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
final class GroupedWithin[T](val n: Int, val d: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
require(n > 0, "n must be greater than 0")
require(d > Duration.Zero)
@ -1203,7 +1203,7 @@ final class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphStage[FlowS
}
}
final class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] {
final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] {
private[this] def timerName = "DelayedTimer"
override def initialAttributes: Attributes = DefaultAttributes.delay
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
@ -1289,7 +1289,7 @@ final class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrategy) extends
override def toString = "Delay"
}
final class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
final class TakeWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
setHandler(in, new InHandler {
@ -1309,7 +1309,7 @@ final class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStag
override def toString = "TakeWithin"
}
final class DropWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
final class DropWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
private var allow = false
@ -1335,7 +1335,7 @@ final class DropWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStag
/**
* INTERNAL API
*/
final class Reduce[T](f: (T, T) T) extends SimpleLinearGraphStage[T] {
final class Reduce[T](val f: (T, T) T) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.reduce
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { self
@ -1379,7 +1379,7 @@ private[stream] object RecoverWith {
val InfiniteRetries = -1
}
final class RecoverWith[T, M](maximumRetries: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] {
final class RecoverWith[T, M](val maximumRetries: Int, val pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] {
require(maximumRetries >= -1, "number of retries must be non-negative or equal to -1")
override def initialAttributes = DefaultAttributes.recoverWith

View file

@ -24,7 +24,7 @@ import scala.collection.JavaConversions._
/**
* INTERNAL API
*/
final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Graph[SourceShape[T], M], T]] {
final class FlattenMerge[T, M](val breadth: Int) extends GraphStage[FlowShape[Graph[SourceShape[T], M], T]] {
private val in = Inlet[Graph[SourceShape[T], M]]("flatten.in")
private val out = Outlet[T]("flatten.out")
@ -103,7 +103,7 @@ final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Graph[
/**
* INTERNAL API
*/
final class PrefixAndTail[T](n: Int) extends GraphStage[FlowShape[T, (immutable.Seq[T], Source[T, NotUsed])]] {
final class PrefixAndTail[T](val n: Int) extends GraphStage[FlowShape[T, (immutable.Seq[T], Source[T, NotUsed])]] {
val in: Inlet[T] = Inlet("PrefixAndTail.in")
val out: Outlet[(immutable.Seq[T], Source[T, NotUsed])] = Outlet("PrefixAndTail.out")
override val shape: FlowShape[T, (immutable.Seq[T], Source[T, NotUsed])] = FlowShape(in, out)
@ -211,7 +211,7 @@ final class PrefixAndTail[T](n: Int) extends GraphStage[FlowShape[T, (immutable.
/**
* INTERNAL API
*/
final class GroupBy[T, K](maxSubstreams: Int, keyFor: T K) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] {
final class GroupBy[T, K](val maxSubstreams: Int, val keyFor: T K) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] {
val in: Inlet[T] = Inlet("GroupBy.in")
val out: Outlet[Source[T, NotUsed]] = Outlet("GroupBy.out")
override val shape: FlowShape[T, Source[T, NotUsed]] = FlowShape(in, out)
@ -403,7 +403,7 @@ object Split {
/**
* INTERNAL API
*/
final class Split[T](decision: Split.SplitDecision, p: T Boolean, substreamCancelStrategy: SubstreamCancelStrategy) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] {
final class Split[T](val decision: Split.SplitDecision, val p: T Boolean, val substreamCancelStrategy: SubstreamCancelStrategy) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] {
val in: Inlet[T] = Inlet("Split.in")
val out: Outlet[Source[T, NotUsed]] = Outlet("Split.out")
override val shape: FlowShape[T, Source[T, NotUsed]] = FlowShape(in, out)

View file

@ -40,7 +40,7 @@ object Merge {
*
* '''Cancels when''' downstream cancels
*/
final class Merge[T] private (val inputPorts: Int, val eagerComplete: Boolean) extends GraphStage[UniformFanInShape[T, T]] {
final class Merge[T](val inputPorts: Int, val eagerComplete: Boolean) extends GraphStage[UniformFanInShape[T, T]] {
// one input might seem counter intuitive but saves us from special handling in other places
require(inputPorts >= 1, "A Merge must have one or more input ports")
@ -146,7 +146,7 @@ object MergePreferred {
*
* A `Broadcast` has one `in` port and 2 or more `out` ports.
*/
final class MergePreferred[T] private (val secondaryPorts: Int, val eagerComplete: Boolean) extends GraphStage[MergePreferred.MergePreferredShape[T]] {
final class MergePreferred[T](val secondaryPorts: Int, val eagerComplete: Boolean) extends GraphStage[MergePreferred.MergePreferredShape[T]] {
require(secondaryPorts >= 1, "A MergePreferred must have more than 0 secondary input ports")
override def initialAttributes = DefaultAttributes.mergePreferred
@ -258,7 +258,7 @@ object Interleave {
* '''Cancels when''' downstream cancels
*
*/
final class Interleave[T] private (val inputPorts: Int, val segmentSize: Int, val eagerClose: Boolean) extends GraphStage[UniformFanInShape[T, T]] {
final class Interleave[T](val inputPorts: Int, val segmentSize: Int, val eagerClose: Boolean) extends GraphStage[UniformFanInShape[T, T]] {
require(inputPorts > 1, "input ports must be > 1")
require(segmentSize > 0, "segmentSize must be > 0")
@ -397,7 +397,7 @@ object Broadcast {
* If eagerCancel is enabled: when any downstream cancels; otherwise: when all downstreams cancel
*
*/
final class Broadcast[T](private val outputPorts: Int, eagerCancel: Boolean) extends GraphStage[UniformFanOutShape[T, T]] {
final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) extends GraphStage[UniformFanOutShape[T, T]] {
// one output might seem counter intuitive but saves us from special handling in other places
require(outputPorts >= 1, "A Broadcast must have one or more output ports")
val in: Inlet[T] = Inlet[T]("Broadcast.in")
@ -496,7 +496,7 @@ object Partition {
* when all downstreams cancel
*/
final class Partition[T](outputPorts: Int, partitioner: T Int) extends GraphStage[UniformFanOutShape[T, T]] {
final class Partition[T](val outputPorts: Int, val partitioner: T Int) extends GraphStage[UniformFanOutShape[T, T]] {
val in: Inlet[T] = Inlet[T]("Partition.in")
val out: Seq[Outlet[T]] = Seq.tabulate(outputPorts)(i Outlet[T]("Partition.out" + i))
@ -602,7 +602,7 @@ object Balance {
*
* '''Cancels when''' all downstreams cancel
*/
final class Balance[T](val outputPorts: Int, waitForAllDownstreams: Boolean) extends GraphStage[UniformFanOutShape[T, T]] {
final class Balance[T](val outputPorts: Int, val waitForAllDownstreams: Boolean) extends GraphStage[UniformFanOutShape[T, T]] {
// one output might seem counter intuitive but saves us from special handling in other places
require(outputPorts >= 1, "A Balance must have one or more output ports")
val in: Inlet[T] = Inlet[T]("Balance.in")
@ -870,7 +870,7 @@ object Concat {
*
* '''Cancels when''' downstream cancels
*/
final class Concat[T](inputPorts: Int) extends GraphStage[UniformFanInShape[T, T]] {
final class Concat[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[T, T]] {
require(inputPorts > 1, "A Concat must have more than 1 input ports")
val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i Inlet[T]("Concat.in" + i))
val out: Outlet[T] = Outlet[T]("Concat.out")