diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index c800fac4af..c6ee369d65 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -568,10 +568,23 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt /** * INTERNAL API */ -private[akka] final case class Completed[T]() extends PushPullStage[T, T] { - override def onPush(elem: T, ctx: Context[T]): SyncDirective = ctx.finish() +private[akka] final case class Completed[T]() extends GraphStage[FlowShape[T, T]] { + + val out: Outlet[T] = Outlet("Completed.out") + val in: Inlet[T] = Inlet("Completed.in") + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { + + override def onPush(): Unit = { completeStage() } + + override def onPull(): Unit = { completeStage() } + + setHandlers(in, out, this) + + } + + val shape: FlowShape[T, T] = FlowShape.of(in, out) - override def onPull(ctx: Context[T]): SyncDirective = ctx.finish() } /**