From 1f8225087f2ebdbce30595e901ca6a4b27ca90c9 Mon Sep 17 00:00:00 2001 From: Priyanka Chordia Date: Thu, 5 May 2016 02:49:02 -0700 Subject: [PATCH] Convert Completed from PushPullStage to GraphStage (#20454) --- .../scala/akka/stream/impl/fusing/Ops.scala | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index c800fac4af..c6ee369d65 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -568,10 +568,23 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt /** * INTERNAL API */ -private[akka] final case class Completed[T]() extends PushPullStage[T, T] { - override def onPush(elem: T, ctx: Context[T]): SyncDirective = ctx.finish() +private[akka] final case class Completed[T]() extends GraphStage[FlowShape[T, T]] { + + val out: Outlet[T] = Outlet("Completed.out") + val in: Inlet[T] = Inlet("Completed.in") + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { + + override def onPush(): Unit = { completeStage() } + + override def onPull(): Unit = { completeStage() } + + setHandlers(in, out, this) + + } + + val shape: FlowShape[T, T] = FlowShape.of(in, out) - override def onPull(ctx: Context[T]): SyncDirective = ctx.finish() } /**