diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala index 80db8fd130..6fe3ca1ba5 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala @@ -4,18 +4,17 @@ package akka.http.impl.util -import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} -import akka.{Done, NotUsed} +import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference } +import akka.NotUsed import akka.stream._ import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage -import akka.stream.impl.{PublisherSink, SinkModule, SourceModule} +import akka.stream.impl.{ PublisherSink, SinkModule, SourceModule } import akka.stream.scaladsl._ import akka.stream.stage._ -import akka.stream.stage.GraphStageWithMaterializedValue import akka.util.ByteString -import org.reactivestreams.{Processor, Publisher, Subscriber, Subscription} +import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } -import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.concurrent.{ ExecutionContext, Future, Promise } /** * INTERNAL API @@ -220,43 +219,6 @@ private[http] object StreamUtils { throw new IllegalStateException("Value can be only set once.") } - /** - * Returns a no-op flow that materializes to a future that will be completed when the flow gets a - * completion or error signal. It doesn't necessarily mean, though, that all of a streaming pipeline - * is finished, only that the part that contains this flow has finished work. - */ - def identityFinishReporter[T]: Flow[T, T, Future[Done]] = { - object IdentityFinishReporter extends GraphStageWithMaterializedValue[FlowShape[T, T], Future[Done]] { - val shape = FlowShape(Inlet[T]("identityFinishReporter.in"), Outlet[T]("identityFinishReporter.out")) - override def toString: String = "IdentityFinishReporter" - - def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = { - val promise = Promise[Done]() - - val stage = new GraphStageLogic(shape) with InHandler with OutHandler { - override def onPush(): Unit = push(shape.out, grab(shape.in)) - - override def onPull(): Unit = pull(shape.in) - - override def onUpstreamFailure(ex: Throwable): Unit = { - promise.failure(ex) - failStage(ex) - } - - override def postStop(): Unit = { - promise.trySuccess(Done) - } - - setHandlers(shape.in, shape.out, this) - } - - (stage, promise.future) - } - } - - Flow[T].viaMat(IdentityFinishReporter)(Keep.right) - } - /** * Similar to Source.maybe but doesn't rely on materialization. Can only be used once. */ diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala index 9830b550b8..102cd07612 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/Http.scala @@ -110,7 +110,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte def handleOneConnection(incomingConnection: IncomingConnection): Future[Done] = try incomingConnection.flow - .viaMat(StreamUtils.identityFinishReporter)(Keep.right) + .watchTermination()(Keep.right) .joinMat(handler)(Keep.left) .run() catch { diff --git a/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/ResponseRendererSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/ResponseRendererSpec.scala index 5a53d09a01..b416f2d6b3 100644 --- a/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/ResponseRendererSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/impl/engine/rendering/ResponseRendererSpec.scala @@ -596,7 +596,7 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll case _: ResponseRenderingOutput.SwitchToWebSocket ⇒ throw new IllegalStateException("Didn't expect websocket response") } .groupedWithin(1000, 100.millis) - .viaMat(StreamUtils.identityFinishReporter[Seq[ByteString]])(Keep.right) + .watchTermination()(Keep.right) .toMat(Sink.head)(Keep.both).run() // we try to find out if the renderer has already flagged completion even without the upstream being completed