diff --git a/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala b/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala index 577ea899e0..44cd31ee6c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Throttle.scala @@ -15,11 +15,11 @@ import scala.concurrent.duration.{ FiniteDuration, _ } * INTERNAL API */ class Throttle[T]( - cost: Int, - per: FiniteDuration, - maximumBurst: Int, - costCalculation: (T) ⇒ Int, - mode: ThrottleMode) + val cost: Int, + val per: FiniteDuration, + val maximumBurst: Int, + val costCalculation: (T) ⇒ Int, + val mode: ThrottleMode) extends SimpleLinearGraphStage[T] { require(cost > 0, "cost must be > 0") require(per.toNanos > 0, "per time must be > 0") diff --git a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala index daa0fea687..5706fc0e25 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala @@ -31,7 +31,7 @@ object Timers { TimeUnit.NANOSECONDS) } - final class Initial[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { + final class Initial[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { override def initialAttributes = DefaultAttributes.initial override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = @@ -58,7 +58,7 @@ object Timers { } - final class Completion[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { + final class Completion[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { override def initialAttributes = DefaultAttributes.completion override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = @@ -79,7 +79,7 @@ object Timers { } - final class Idle[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { + final class Idle[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { override def initialAttributes = DefaultAttributes.idle override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = @@ -106,7 +106,7 @@ object Timers { } - final class BackpressureTimeout[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { + final class BackpressureTimeout[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { override def initialAttributes = DefaultAttributes.backpressureTimeout override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = @@ -208,7 +208,7 @@ object Timers { } - final class IdleInject[I, O >: I](val timeout: FiniteDuration, inject: () ⇒ O) extends GraphStage[FlowShape[I, O]] { + final class IdleInject[I, O >: I](val timeout: FiniteDuration, val inject: () ⇒ O) extends GraphStage[FlowShape[I, O]] { val in: Inlet[I] = Inlet("IdleInject.in") val out: Outlet[O] = Outlet("IdleInject.out") diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 4e2fe7e682..0a08583d84 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -221,7 +221,7 @@ object GraphStages { } } - final class TickSource[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: T) + final class TickSource[T](val initialDelay: FiniteDuration, val interval: FiniteDuration, val tick: T) extends GraphStageWithMaterializedValue[SourceShape[T], Cancellable] { override val shape = SourceShape(Outlet[T]("TickSource.out")) val out = shape.out diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 3103712952..da46440efd 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -522,7 +522,7 @@ final case class Grouped[T](n: Int) extends GraphStage[FlowShape[T, immutable.Se /** * INTERNAL API */ -final case class LimitWeighted[T](n: Long, costFn: T ⇒ Long) extends GraphStage[FlowShape[T, T]] { +final case class LimitWeighted[T](val n: Long, val costFn: T ⇒ Long) extends GraphStage[FlowShape[T, T]] { val in = Inlet[T]("LimitWeighted.in") val out = Outlet[T]("LimitWeighted.out") override val shape = FlowShape(in, out) @@ -554,7 +554,7 @@ final case class LimitWeighted[T](n: Long, costFn: T ⇒ Long) extends GraphStag /** * INTERNAL API */ -final case class Sliding[T](n: Int, step: Int) extends GraphStage[FlowShape[T, immutable.Seq[T]]] { +final case class Sliding[T](val n: Int, val step: Int) extends GraphStage[FlowShape[T, immutable.Seq[T]]] { require(n > 0, "n must be greater than 0") require(step > 0, "step must be greater than 0") @@ -672,7 +672,7 @@ final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extend /** * INTERNAL API */ -final case class Batch[In, Out](max: Long, costFn: In ⇒ Long, seed: In ⇒ Out, aggregate: (Out, In) ⇒ Out) +final case class Batch[In, Out](val max: Long, val costFn: In ⇒ Long, val seed: In ⇒ Out, val aggregate: (Out, In) ⇒ Out) extends GraphStage[FlowShape[In, Out]] { val in = Inlet[In]("Batch.in") @@ -798,7 +798,7 @@ final case class Batch[In, Out](max: Long, costFn: In ⇒ Long, seed: In ⇒ Out /** * INTERNAL API */ -final class Expand[In, Out](extrapolate: In ⇒ Iterator[Out]) extends GraphStage[FlowShape[In, Out]] { +final class Expand[In, Out](val extrapolate: In ⇒ Iterator[Out]) extends GraphStage[FlowShape[In, Out]] { private val in = Inlet[In]("expand.in") private val out = Outlet[Out]("expand.out") @@ -1127,7 +1127,7 @@ private[stream] object TimerKeys { case object GroupedWithinTimerKey } -final class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] { +final class GroupedWithin[T](val n: Int, val d: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] { require(n > 0, "n must be greater than 0") require(d > Duration.Zero) @@ -1203,7 +1203,7 @@ final class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphStage[FlowS } } -final class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] { +final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] { private[this] def timerName = "DelayedTimer" override def initialAttributes: Attributes = DefaultAttributes.delay override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { @@ -1289,7 +1289,7 @@ final class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrategy) extends override def toString = "Delay" } -final class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { +final class TakeWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { setHandler(in, new InHandler { @@ -1309,7 +1309,7 @@ final class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStag override def toString = "TakeWithin" } -final class DropWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { +final class DropWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { private var allow = false @@ -1335,7 +1335,7 @@ final class DropWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStag /** * INTERNAL API */ -final class Reduce[T](f: (T, T) ⇒ T) extends SimpleLinearGraphStage[T] { +final class Reduce[T](val f: (T, T) ⇒ T) extends SimpleLinearGraphStage[T] { override def initialAttributes: Attributes = DefaultAttributes.reduce override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { self ⇒ @@ -1379,7 +1379,7 @@ private[stream] object RecoverWith { val InfiniteRetries = -1 } -final class RecoverWith[T, M](maximumRetries: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] { +final class RecoverWith[T, M](val maximumRetries: Int, val pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] { require(maximumRetries >= -1, "number of retries must be non-negative or equal to -1") override def initialAttributes = DefaultAttributes.recoverWith diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index f238a5eedd..43cc221160 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -24,7 +24,7 @@ import scala.collection.JavaConversions._ /** * INTERNAL API */ -final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Graph[SourceShape[T], M], T]] { +final class FlattenMerge[T, M](val breadth: Int) extends GraphStage[FlowShape[Graph[SourceShape[T], M], T]] { private val in = Inlet[Graph[SourceShape[T], M]]("flatten.in") private val out = Outlet[T]("flatten.out") @@ -103,7 +103,7 @@ final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Graph[ /** * INTERNAL API */ -final class PrefixAndTail[T](n: Int) extends GraphStage[FlowShape[T, (immutable.Seq[T], Source[T, NotUsed])]] { +final class PrefixAndTail[T](val n: Int) extends GraphStage[FlowShape[T, (immutable.Seq[T], Source[T, NotUsed])]] { val in: Inlet[T] = Inlet("PrefixAndTail.in") val out: Outlet[(immutable.Seq[T], Source[T, NotUsed])] = Outlet("PrefixAndTail.out") override val shape: FlowShape[T, (immutable.Seq[T], Source[T, NotUsed])] = FlowShape(in, out) @@ -211,7 +211,7 @@ final class PrefixAndTail[T](n: Int) extends GraphStage[FlowShape[T, (immutable. /** * INTERNAL API */ -final class GroupBy[T, K](maxSubstreams: Int, keyFor: T ⇒ K) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] { +final class GroupBy[T, K](val maxSubstreams: Int, val keyFor: T ⇒ K) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] { val in: Inlet[T] = Inlet("GroupBy.in") val out: Outlet[Source[T, NotUsed]] = Outlet("GroupBy.out") override val shape: FlowShape[T, Source[T, NotUsed]] = FlowShape(in, out) @@ -403,7 +403,7 @@ object Split { /** * INTERNAL API */ -final class Split[T](decision: Split.SplitDecision, p: T ⇒ Boolean, substreamCancelStrategy: SubstreamCancelStrategy) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] { +final class Split[T](val decision: Split.SplitDecision, val p: T ⇒ Boolean, val substreamCancelStrategy: SubstreamCancelStrategy) extends GraphStage[FlowShape[T, Source[T, NotUsed]]] { val in: Inlet[T] = Inlet("Split.in") val out: Outlet[Source[T, NotUsed]] = Outlet("Split.out") override val shape: FlowShape[T, Source[T, NotUsed]] = FlowShape(in, out) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index a57a6ac437..28d6ab58dd 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -40,7 +40,7 @@ object Merge { * * '''Cancels when''' downstream cancels */ -final class Merge[T] private (val inputPorts: Int, val eagerComplete: Boolean) extends GraphStage[UniformFanInShape[T, T]] { +final class Merge[T](val inputPorts: Int, val eagerComplete: Boolean) extends GraphStage[UniformFanInShape[T, T]] { // one input might seem counter intuitive but saves us from special handling in other places require(inputPorts >= 1, "A Merge must have one or more input ports") @@ -146,7 +146,7 @@ object MergePreferred { * * A `Broadcast` has one `in` port and 2 or more `out` ports. */ -final class MergePreferred[T] private (val secondaryPorts: Int, val eagerComplete: Boolean) extends GraphStage[MergePreferred.MergePreferredShape[T]] { +final class MergePreferred[T](val secondaryPorts: Int, val eagerComplete: Boolean) extends GraphStage[MergePreferred.MergePreferredShape[T]] { require(secondaryPorts >= 1, "A MergePreferred must have more than 0 secondary input ports") override def initialAttributes = DefaultAttributes.mergePreferred @@ -258,7 +258,7 @@ object Interleave { * '''Cancels when''' downstream cancels * */ -final class Interleave[T] private (val inputPorts: Int, val segmentSize: Int, val eagerClose: Boolean) extends GraphStage[UniformFanInShape[T, T]] { +final class Interleave[T](val inputPorts: Int, val segmentSize: Int, val eagerClose: Boolean) extends GraphStage[UniformFanInShape[T, T]] { require(inputPorts > 1, "input ports must be > 1") require(segmentSize > 0, "segmentSize must be > 0") @@ -397,7 +397,7 @@ object Broadcast { * If eagerCancel is enabled: when any downstream cancels; otherwise: when all downstreams cancel * */ -final class Broadcast[T](private val outputPorts: Int, eagerCancel: Boolean) extends GraphStage[UniformFanOutShape[T, T]] { +final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) extends GraphStage[UniformFanOutShape[T, T]] { // one output might seem counter intuitive but saves us from special handling in other places require(outputPorts >= 1, "A Broadcast must have one or more output ports") val in: Inlet[T] = Inlet[T]("Broadcast.in") @@ -496,7 +496,7 @@ object Partition { * when all downstreams cancel */ -final class Partition[T](outputPorts: Int, partitioner: T ⇒ Int) extends GraphStage[UniformFanOutShape[T, T]] { +final class Partition[T](val outputPorts: Int, val partitioner: T ⇒ Int) extends GraphStage[UniformFanOutShape[T, T]] { val in: Inlet[T] = Inlet[T]("Partition.in") val out: Seq[Outlet[T]] = Seq.tabulate(outputPorts)(i ⇒ Outlet[T]("Partition.out" + i)) @@ -602,7 +602,7 @@ object Balance { * * '''Cancels when''' all downstreams cancel */ -final class Balance[T](val outputPorts: Int, waitForAllDownstreams: Boolean) extends GraphStage[UniformFanOutShape[T, T]] { +final class Balance[T](val outputPorts: Int, val waitForAllDownstreams: Boolean) extends GraphStage[UniformFanOutShape[T, T]] { // one output might seem counter intuitive but saves us from special handling in other places require(outputPorts >= 1, "A Balance must have one or more output ports") val in: Inlet[T] = Inlet[T]("Balance.in") @@ -870,7 +870,7 @@ object Concat { * * '''Cancels when''' downstream cancels */ -final class Concat[T](inputPorts: Int) extends GraphStage[UniformFanInShape[T, T]] { +final class Concat[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[T, T]] { require(inputPorts > 1, "A Concat must have more than 1 input ports") val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i ⇒ Inlet[T]("Concat.in" + i)) val out: Outlet[T] = Outlet[T]("Concat.out")