use SimpleLinearGraphStage to reduce the boilerplate code #21830

This commit is contained in:
Hawstein 2016-11-18 17:13:15 +08:00 committed by Johan Andrén
parent e80d7d7a71
commit c373cef20f
4 changed files with 9 additions and 39 deletions

View file

@ -577,9 +577,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
} }
private[akka] final case class Doubler[T]() extends GraphStage[FlowShape[T, T]] { private[akka] final case class Doubler[T]() extends SimpleLinearGraphStage[T] {
val out: Outlet[T] = Outlet("Doubler.out")
val in: Inlet[T] = Inlet("Doubler.in")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler { new GraphStageLogic(shape) with InHandler with OutHandler {
@ -608,12 +606,9 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
setHandlers(in, out, this) setHandlers(in, out, this)
} }
override val shape: FlowShape[T, T] = FlowShape(in, out)
} }
private[akka] final case class KeepGoing[T]() extends GraphStage[FlowShape[T, T]] { private[akka] final case class KeepGoing[T]() extends SimpleLinearGraphStage[T] {
val in = Inlet[T]("KeepGoing.in")
val out = Outlet[T]("KeepGoing.out")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler { new GraphStageLogic(shape) with InHandler with OutHandler {
@ -638,7 +633,6 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit {
setHandlers(in, out, this) setHandlers(in, out, this)
} }
override val shape: FlowShape[T, T] = FlowShape(in, out)
} }
private[akka] class PushFinishStage(onPostStop: () Unit = () ()) extends SimpleLinearGraphStage[Any] { private[akka] class PushFinishStage(onPostStop: () Unit = () ()) extends SimpleLinearGraphStage[Any] {

View file

@ -176,11 +176,7 @@ object Timers {
} }
final class DelayInitial[T](val delay: FiniteDuration) extends GraphStage[FlowShape[T, T]] { final class DelayInitial[T](val delay: FiniteDuration) extends SimpleLinearGraphStage[T] {
val in: Inlet[T] = Inlet("IdleInject.in")
val out: Outlet[T] = Outlet("IdleInject.out")
override val shape: FlowShape[T, T] = FlowShape(in, out)
override def initialAttributes = DefaultAttributes.delayInitial override def initialAttributes = DefaultAttributes.delayInitial
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =

View file

@ -75,11 +75,8 @@ object GraphStages {
/** /**
* INTERNAL API * INTERNAL API
*/ */
final class Detacher[T] extends GraphStage[FlowShape[T, T]] { final class Detacher[T] extends SimpleLinearGraphStage[T] {
val in = Inlet[T]("Detacher.in")
val out = Outlet[T]("Detacher.out")
override def initialAttributes = DefaultAttributes.detacher override def initialAttributes = DefaultAttributes.detacher
override val shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {

View file

@ -129,11 +129,7 @@ final case class TakeWhile[T](p: T ⇒ Boolean, inclusive: Boolean = false) exte
/** /**
* INTERNAL API * INTERNAL API
*/ */
final case class DropWhile[T](p: T Boolean) extends GraphStage[FlowShape[T, T]] { final case class DropWhile[T](p: T Boolean) extends SimpleLinearGraphStage[T] {
val in = Inlet[T]("DropWhile.in")
val out = Outlet[T]("DropWhile.out")
override val shape = FlowShape(in, out)
override def initialAttributes: Attributes = DefaultAttributes.dropWhile override def initialAttributes: Attributes = DefaultAttributes.dropWhile
def createLogic(inheritedAttributes: Attributes) = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler { def createLogic(inheritedAttributes: Attributes) = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler {
@ -232,12 +228,8 @@ final case class Collect[In, Out](pf: PartialFunction[In, Out]) extends GraphSta
/** /**
* INTERNAL API * INTERNAL API
*/ */
final case class Recover[T](pf: PartialFunction[Throwable, T]) extends GraphStage[FlowShape[T, T]] { final case class Recover[T](pf: PartialFunction[Throwable, T]) extends SimpleLinearGraphStage[T] {
val in = Inlet[T]("Recover.in") override protected def initialAttributes: Attributes = DefaultAttributes.recover
val out = Outlet[T]("Recover.out")
override val shape: FlowShape[T, T] = FlowShape(in, out)
override protected val initialAttributes: Attributes = DefaultAttributes.recover
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
@ -649,16 +641,11 @@ final class FoldAsync[In, Out](zero: Out, f: (Out, In) ⇒ Future[Out]) extends
/** /**
* INTERNAL API * INTERNAL API
*/ */
final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) extends GraphStage[FlowShape[T, T]] { final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) extends SimpleLinearGraphStage[T] {
ReactiveStreamsCompliance.requireNonNullElement(inject) ReactiveStreamsCompliance.requireNonNullElement(inject)
if (start.isDefined) ReactiveStreamsCompliance.requireNonNullElement(start.get) if (start.isDefined) ReactiveStreamsCompliance.requireNonNullElement(start.get)
if (end.isDefined) ReactiveStreamsCompliance.requireNonNullElement(end.get) if (end.isDefined) ReactiveStreamsCompliance.requireNonNullElement(end.get)
private val in = Inlet[T]("in")
private val out = Outlet[T]("out")
override val shape = FlowShape(in, out)
override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler { override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {
val startInHandler = new InHandler { val startInHandler = new InHandler {
override def onPush(): Unit = { override def onPush(): Unit = {
@ -747,11 +734,7 @@ final case class Grouped[T](n: Int) extends GraphStage[FlowShape[T, immutable.Se
/** /**
* INTERNAL API * INTERNAL API
*/ */
final case class LimitWeighted[T](val n: Long, val costFn: T Long) extends GraphStage[FlowShape[T, T]] { final case class LimitWeighted[T](val n: Long, val costFn: T Long) extends SimpleLinearGraphStage[T] {
val in = Inlet[T]("LimitWeighted.in")
val out = Outlet[T]("LimitWeighted.out")
override val shape = FlowShape(in, out)
override def initialAttributes: Attributes = DefaultAttributes.limitWeighted override def initialAttributes: Attributes = DefaultAttributes.limitWeighted
def createLogic(inheritedAttributes: Attributes) = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler { def createLogic(inheritedAttributes: Attributes) = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler {