Fixes #20543 GraphStage subtypes should not be private to akka

This commit is contained in:
Kam Kasravi 2016-05-03 18:58:26 -07:00
parent 91eb27947b
commit 1d692daaad
33 changed files with 152 additions and 136 deletions

View file

@ -25,7 +25,7 @@ import akka.stream.impl.Stages.DefaultAttributes
/**
* INTERNAL API
*/
private[akka] final case class Map[In, Out](f: In Out, decider: Supervision.Decider) extends PushStage[In, Out] {
final case class Map[In, Out](f: In Out, decider: Supervision.Decider) extends PushStage[In, Out] {
override def onPush(elem: In, ctx: Context[Out]): SyncDirective = ctx.push(f(elem))
override def decide(t: Throwable): Supervision.Directive = decider(t)
@ -34,7 +34,7 @@ private[akka] final case class Map[In, Out](f: In ⇒ Out, decider: Supervision.
/**
* INTERNAL API
*/
private[akka] final case class Filter[T](p: T Boolean) extends SimpleLinearGraphStage[T] {
final case class Filter[T](p: T Boolean) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.filter
override def toString: String = "Filter"
@ -68,7 +68,7 @@ private[akka] final case class Filter[T](p: T ⇒ Boolean) extends SimpleLinearG
/**
* INTERNAL API
*/
private[akka] final case class TakeWhile[T](p: T Boolean) extends SimpleLinearGraphStage[T] {
final case class TakeWhile[T](p: T Boolean) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.takeWhile
override def toString: String = "TakeWhile"
@ -104,7 +104,7 @@ private[akka] final case class TakeWhile[T](p: T ⇒ Boolean) extends SimpleLine
/**
* INTERNAL API
*/
private[stream] final case class DropWhile[T](p: T Boolean) extends GraphStage[FlowShape[T, T]] {
final case class DropWhile[T](p: T Boolean) extends GraphStage[FlowShape[T, T]] {
val in = Inlet[T]("DropWhile.in")
val out = Outlet[T]("DropWhile.out")
override val shape = FlowShape(in, out)
@ -136,7 +136,7 @@ private[stream] final case class DropWhile[T](p: T ⇒ Boolean) extends GraphSta
/**
* INTERNAL API
*/
abstract private[stream] class SupervisedGraphStageLogic(inheritedAttributes: Attributes, shape: Shape) extends GraphStageLogic(shape) {
abstract class SupervisedGraphStageLogic(inheritedAttributes: Attributes, shape: Shape) extends GraphStageLogic(shape) {
private lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
def withSupervision[T](f: () T): Option[T] =
try { Some(f()) } catch {
@ -164,7 +164,7 @@ private[stream] object Collect {
/**
* INTERNAL API
*/
private[stream] final case class Collect[In, Out](pf: PartialFunction[In, Out]) extends GraphStage[FlowShape[In, Out]] {
final case class Collect[In, Out](pf: PartialFunction[In, Out]) extends GraphStage[FlowShape[In, Out]] {
val in = Inlet[In]("Collect.in")
val out = Outlet[Out]("Collect.out")
override val shape = FlowShape(in, out)
@ -192,7 +192,7 @@ private[stream] final case class Collect[In, Out](pf: PartialFunction[In, Out])
/**
* INTERNAL API
*/
private[akka] final case class Recover[T](pf: PartialFunction[Throwable, T]) extends GraphStage[FlowShape[T, T]] {
final case class Recover[T](pf: PartialFunction[Throwable, T]) extends GraphStage[FlowShape[T, T]] {
val in = Inlet[T]("Recover.in")
val out = Outlet[T]("Recover.out")
override val shape: FlowShape[T, T] = FlowShape(in, out)
@ -239,7 +239,7 @@ private[akka] final case class Recover[T](pf: PartialFunction[Throwable, T]) ext
/**
* INTERNAL API
*/
private[akka] final case class Take[T](count: Long) extends SimpleLinearGraphStage[T] {
final case class Take[T](count: Long) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.take
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
@ -268,7 +268,7 @@ private[akka] final case class Take[T](count: Long) extends SimpleLinearGraphSta
/**
* INTERNAL API
*/
private[akka] final case class Drop[T](count: Long) extends SimpleLinearGraphStage[T] {
final case class Drop[T](count: Long) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.drop
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
@ -292,7 +292,7 @@ private[akka] final case class Drop[T](count: Long) extends SimpleLinearGraphSta
/**
* INTERNAL API
*/
private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) Out) extends GraphStage[FlowShape[In, Out]] {
final case class Scan[In, Out](zero: Out, f: (Out, In) Out) extends GraphStage[FlowShape[In, Out]] {
override val shape = FlowShape[In, Out](Inlet("Scan.in"), Outlet("Scan.out"))
override def initialAttributes: Attributes = DefaultAttributes.scan
@ -338,7 +338,7 @@ private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) ex
/**
* INTERNAL API
*/
private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) Out, decider: Supervision.Decider) extends PushPullStage[In, Out] {
final case class Fold[In, Out](zero: Out, f: (Out, In) Out, decider: Supervision.Decider) extends PushPullStage[In, Out] {
private[this] var aggregator: Out = zero
override def onPush(elem: In, ctx: Context[Out]): SyncDirective = {
@ -406,7 +406,7 @@ final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) ext
/**
* INTERNAL API
*/
private[akka] final case class Grouped[T](n: Int) extends PushPullStage[T, immutable.Seq[T]] {
final case class Grouped[T](n: Int) extends PushPullStage[T, immutable.Seq[T]] {
private val buf = {
val b = Vector.newBuilder[T]
b.sizeHint(n)
@ -441,7 +441,7 @@ private[akka] final case class Grouped[T](n: Int) extends PushPullStage[T, immut
/**
* INTERNAL API
*/
private[stream] final case class LimitWeighted[T](n: Long, costFn: T Long) extends GraphStage[FlowShape[T, T]] {
final case class LimitWeighted[T](n: Long, costFn: T Long) extends GraphStage[FlowShape[T, T]] {
val in = Inlet[T]("LimitWeighted.in")
val out = Outlet[T]("LimitWeighted.out")
override val shape = FlowShape(in, out)
@ -473,7 +473,7 @@ private[stream] final case class LimitWeighted[T](n: Long, costFn: T ⇒ Long) e
/**
* INTERNAL API
*/
private[akka] final case class Sliding[T](n: Int, step: Int) extends PushPullStage[T, immutable.Seq[T]] {
final case class Sliding[T](n: Int, step: Int) extends PushPullStage[T, immutable.Seq[T]] {
private var buf = Vector.empty[T]
override def onPush(elem: T, ctx: Context[immutable.Seq[T]]): SyncDirective = {
@ -506,7 +506,7 @@ private[akka] final case class Sliding[T](n: Int, step: Int) extends PushPullSta
/**
* INTERNAL API
*/
private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends DetachedStage[T, T] {
final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends DetachedStage[T, T] {
private var buffer: BufferImpl[T] = _
@ -566,7 +566,7 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt
/**
* INTERNAL API
*/
private[akka] final case class Batch[In, Out](max: Long, costFn: In Long, seed: In Out, aggregate: (Out, In) Out)
final case class Batch[In, Out](max: Long, costFn: In Long, seed: In Out, aggregate: (Out, In) Out)
extends GraphStage[FlowShape[In, Out]] {
val in = Inlet[In]("Batch.in")
@ -692,7 +692,7 @@ private[akka] final case class Batch[In, Out](max: Long, costFn: In ⇒ Long, se
/**
* INTERNAL API
*/
private[akka] final class Expand[In, Out](extrapolate: In Iterator[Out]) extends GraphStage[FlowShape[In, Out]] {
final class Expand[In, Out](extrapolate: In Iterator[Out]) extends GraphStage[FlowShape[In, Out]] {
private val in = Inlet[In]("expand.in")
private val out = Outlet[Out]("expand.out")
@ -761,7 +761,7 @@ private[akka] object MapAsync {
/**
* INTERNAL API
*/
private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In Future[Out])
final case class MapAsync[In, Out](parallelism: Int, f: In Future[Out])
extends GraphStage[FlowShape[In, Out]] {
import MapAsync._
@ -832,7 +832,7 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut
/**
* INTERNAL API
*/
private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: In Future[Out])
final case class MapAsyncUnordered[In, Out](parallelism: Int, f: In Future[Out])
extends GraphStage[FlowShape[In, Out]] {
private val in = Inlet[In]("MapAsyncUnordered.in")
@ -904,7 +904,7 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I
/**
* INTERNAL API
*/
private[akka] final case class Log[T](
final case class Log[T](
name: String,
extract: T Any,
logAdapter: Option[LoggingAdapter]) extends SimpleLinearGraphStage[T] {
@ -1021,7 +1021,7 @@ private[stream] object TimerKeys {
case object GroupedWithinTimerKey
}
private[stream] final class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
final class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphStage[FlowShape[T, immutable.Seq[T]]] {
require(n > 0, "n must be greater than 0")
require(d > Duration.Zero)
@ -1097,7 +1097,7 @@ private[stream] final class GroupedWithin[T](n: Int, d: FiniteDuration) extends
}
}
private[stream] final class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] {
final class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] {
private[this] def timerName = "DelayedTimer"
override def initialAttributes: Attributes = DefaultAttributes.delay
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
@ -1183,7 +1183,7 @@ private[stream] final class Delay[T](d: FiniteDuration, strategy: DelayOverflowS
override def toString = "Delay"
}
private[stream] final class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
final class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
setHandler(in, new InHandler {
@ -1203,7 +1203,7 @@ private[stream] final class TakeWithin[T](timeout: FiniteDuration) extends Simpl
override def toString = "TakeWithin"
}
private[stream] final class DropWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
final class DropWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
private var allow = false
@ -1229,7 +1229,7 @@ private[stream] final class DropWithin[T](timeout: FiniteDuration) extends Simpl
/**
* INTERNAL API
*/
private[stream] final class Reduce[T](f: (T, T) T) extends SimpleLinearGraphStage[T] {
final class Reduce[T](f: (T, T) T) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.reduce
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { self
@ -1273,7 +1273,7 @@ private[stream] object RecoverWith {
val InfiniteRetries = -1
}
private[stream] final class RecoverWith[T, M](maximumRetries: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] {
final class RecoverWith[T, M](maximumRetries: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], M]]) extends SimpleLinearGraphStage[T] {
require(maximumRetries >= -1, "number of retries must be non-negative or equal to -1")
override def initialAttributes = DefaultAttributes.recoverWith
@ -1331,7 +1331,7 @@ private[stream] final class RecoverWith[T, M](maximumRetries: Int, pf: PartialFu
/**
* INTERNAL API
*/
private[stream] final class StatefulMapConcat[In, Out](f: () In immutable.Iterable[Out]) extends GraphStage[FlowShape[In, Out]] {
final class StatefulMapConcat[In, Out](val f: () In immutable.Iterable[Out]) extends GraphStage[FlowShape[In, Out]] {
val in = Inlet[In]("StatefulMapConcat.in")
val out = Outlet[Out]("StatefulMapConcat.out")
override val shape = FlowShape(in, out)