diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala index 61d982801d..19d99cae1b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala @@ -577,9 +577,7 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { } - private[akka] final case class Doubler[T]() extends GraphStage[FlowShape[T, T]] { - val out: Outlet[T] = Outlet("Doubler.out") - val in: Inlet[T] = Inlet("Doubler.in") + private[akka] final case class Doubler[T]() extends SimpleLinearGraphStage[T] { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { @@ -608,12 +606,9 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { setHandlers(in, out, this) } - override val shape: FlowShape[T, T] = FlowShape(in, out) } - private[akka] final case class KeepGoing[T]() extends GraphStage[FlowShape[T, T]] { - val in = Inlet[T]("KeepGoing.in") - val out = Outlet[T]("KeepGoing.out") + private[akka] final case class KeepGoing[T]() extends SimpleLinearGraphStage[T] { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { @@ -638,7 +633,6 @@ class InterpreterSpec extends StreamSpec with GraphInterpreterSpecKit { setHandlers(in, out, this) } - override val shape: FlowShape[T, T] = FlowShape(in, out) } private[akka] class PushFinishStage(onPostStop: () ⇒ Unit = () ⇒ ()) extends SimpleLinearGraphStage[Any] { diff --git a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala index 5706fc0e25..ff1064b766 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala @@ -176,11 +176,7 @@ object Timers { } - final class DelayInitial[T](val delay: FiniteDuration) extends GraphStage[FlowShape[T, T]] { - val in: Inlet[T] = Inlet("IdleInject.in") - val out: Outlet[T] = Outlet("IdleInject.out") - override val shape: FlowShape[T, T] = FlowShape(in, out) - + final class DelayInitial[T](val delay: FiniteDuration) extends SimpleLinearGraphStage[T] { override def initialAttributes = DefaultAttributes.delayInitial override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 0b79c4c25b..9bfd2bfe77 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -75,11 +75,8 @@ object GraphStages { /** * INTERNAL API */ - final class Detacher[T] extends GraphStage[FlowShape[T, T]] { - val in = Inlet[T]("Detacher.in") - val out = Outlet[T]("Detacher.out") + final class Detacher[T] extends SimpleLinearGraphStage[T] { override def initialAttributes = DefaultAttributes.detacher - override val shape = FlowShape(in, out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index fdb668e365..d2b1f70fb5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -129,11 +129,7 @@ final case class TakeWhile[T](p: T ⇒ Boolean, inclusive: Boolean = false) exte /** * INTERNAL API */ -final case class DropWhile[T](p: T ⇒ Boolean) extends GraphStage[FlowShape[T, T]] { - val in = Inlet[T]("DropWhile.in") - val out = Outlet[T]("DropWhile.out") - override val shape = FlowShape(in, out) - +final case class DropWhile[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] { override def initialAttributes: Attributes = DefaultAttributes.dropWhile def createLogic(inheritedAttributes: Attributes) = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler { @@ -232,12 +228,8 @@ final case class Collect[In, Out](pf: PartialFunction[In, Out]) extends GraphSta /** * INTERNAL API */ -final case class Recover[T](pf: PartialFunction[Throwable, T]) extends GraphStage[FlowShape[T, T]] { - val in = Inlet[T]("Recover.in") - val out = Outlet[T]("Recover.out") - override val shape: FlowShape[T, T] = FlowShape(in, out) - - override protected val initialAttributes: Attributes = DefaultAttributes.recover +final case class Recover[T](pf: PartialFunction[Throwable, T]) extends SimpleLinearGraphStage[T] { + override protected def initialAttributes: Attributes = DefaultAttributes.recover override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { @@ -649,16 +641,11 @@ final class FoldAsync[In, Out](zero: Out, f: (Out, In) ⇒ Future[Out]) extends /** * INTERNAL API */ -final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) extends GraphStage[FlowShape[T, T]] { +final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) extends SimpleLinearGraphStage[T] { ReactiveStreamsCompliance.requireNonNullElement(inject) if (start.isDefined) ReactiveStreamsCompliance.requireNonNullElement(start.get) if (end.isDefined) ReactiveStreamsCompliance.requireNonNullElement(end.get) - private val in = Inlet[T]("in") - private val out = Outlet[T]("out") - - override val shape = FlowShape(in, out) - override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler { val startInHandler = new InHandler { override def onPush(): Unit = { @@ -747,11 +734,7 @@ final case class Grouped[T](n: Int) extends GraphStage[FlowShape[T, immutable.Se /** * INTERNAL API */ -final case class LimitWeighted[T](val n: Long, val costFn: T ⇒ Long) extends GraphStage[FlowShape[T, T]] { - val in = Inlet[T]("LimitWeighted.in") - val out = Outlet[T]("LimitWeighted.out") - override val shape = FlowShape(in, out) - +final case class LimitWeighted[T](val n: Long, val costFn: T ⇒ Long) extends SimpleLinearGraphStage[T] { override def initialAttributes: Attributes = DefaultAttributes.limitWeighted def createLogic(inheritedAttributes: Attributes) = new SupervisedGraphStageLogic(inheritedAttributes, shape) with InHandler with OutHandler {