diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala index 07c80604ef..c0ea046eda 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala @@ -4,16 +4,18 @@ package akka.http.impl.util -import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference } +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} import akka.NotUsed import akka.stream._ import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage -import akka.stream.impl.{ PublisherSink, SinkModule, SourceModule } +import akka.stream.impl.{PublisherSink, SinkModule, SourceModule} import akka.stream.scaladsl._ import akka.stream.stage._ +import akka.stream.stage.GraphStageWithMaterializedValue import akka.util.ByteString -import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } -import scala.concurrent.{ ExecutionContext, Future, Promise } +import org.reactivestreams.{Processor, Publisher, Subscriber, Subscription} + +import scala.concurrent.{ExecutionContext, Future, Promise} /** * INTERNAL API @@ -224,38 +226,34 @@ private[http] object StreamUtils { * is finished, only that the part that contains this flow has finished work. */ def identityFinishReporter[T]: Flow[T, T, Future[Unit]] = { - // copy from Sink.foreach - def newForeachStage(): (PushStage[T, T], Future[Unit]) = { - val promise = Promise[Unit]() + Flow[T].viaMat(new GraphStageWithMaterializedValue[FlowShape[T, T], Future[Unit]] { + val shape = FlowShape(Inlet[T]("identityFinishReporter.in"), Outlet[T]("identityFinishReporter.out")) + override def toString: String = "UniqueKillSwitchFlow" - val stage = new PushStage[T, T] { - override def onPush(elem: T, ctx: Context[T]): SyncDirective = ctx.push(elem) + def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Unit]) = { + val promise = Promise[Unit]() - override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = { - promise.failure(cause) - ctx.fail(cause) + val stage = new GraphStageLogic(shape) with InHandler with OutHandler { + override def onPush(): Unit = push(shape.out, grab(shape.in)) + + override def onPull(): Unit = pull(shape.in) + + override def onUpstreamFailure(ex: Throwable): Unit = { + promise.failure(ex) + failStage(ex) + } + + override def postStop(): Unit = { + promise.trySuccess(()) + } + + setHandler(shape.in, this) + setHandler(shape.out, this) } - override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = { - promise.success(()) - ctx.finish() - } - - override def onDownstreamFinish(ctx: Context[T]): TerminationDirective = { - promise.success(()) - ctx.finish() - } - - override def decide(cause: Throwable): Supervision.Directive = { - // supervision will be implemented by #16916 - promise.tryFailure(cause) - super.decide(cause) - } + (stage, promise.future) } - - (stage, promise.future) - } - Flow[T].transformMaterializing(newForeachStage) + })(Keep.right) } /**