From 89a6187efcda124fa5ecb1dad9d1a33553a2029c Mon Sep 17 00:00:00 2001 From: Bernard Leach Date: Thu, 17 Mar 2016 15:00:53 +1100 Subject: [PATCH] =htc #19953 Complete Promise in captureTermination onDownstreamFinish PoolSlot uses StreamUtils.captureTermination to determine when the associated request is completed, when a request from the pool was subsequently used as a response by a server Flow the rendering process would cancel the subscription to the Source but that would not complete the Promise in StreamUtils.captureTerminate and so the SlotEvent.RequestCompleted would not be generated. --- .../akka/http/impl/util/StreamUtils.scala | 7 ++- .../akka/http/impl/util/StreamUtilsSpec.scala | 48 +++++++++++++++++++ 2 files changed, 51 insertions(+), 4 deletions(-) create mode 100644 akka-http-core/src/test/scala/akka/http/impl/util/StreamUtilsSpec.scala diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala index 0379ea38d9..f27570c71a 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala @@ -65,14 +65,13 @@ private[http] object StreamUtils { val promise = Promise[Unit]() val transformer = new PushStage[T, T] { def onPush(element: T, ctx: Context[T]) = ctx.push(element) - override def onUpstreamFinish(ctx: Context[T]) = { - promise.success(()) - super.onUpstreamFinish(ctx) - } override def onUpstreamFailure(cause: Throwable, ctx: Context[T]) = { promise.failure(cause) ctx.fail(cause) } + override def postStop(): Unit = { + promise.trySuccess(()) + } } source.transform(() ⇒ transformer) -> promise.future } diff --git a/akka-http-core/src/test/scala/akka/http/impl/util/StreamUtilsSpec.scala b/akka-http-core/src/test/scala/akka/http/impl/util/StreamUtilsSpec.scala new file mode 100644 index 0000000000..1293372687 --- /dev/null +++ b/akka-http-core/src/test/scala/akka/http/impl/util/StreamUtilsSpec.scala @@ -0,0 +1,48 @@ +/** + * Copyright (C) 2009-2016 Lightbend Inc. + */ +package akka.http.impl.util + +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{Sink, Source} +import akka.testkit.AkkaSpec + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.util.Failure + +class StreamUtilsSpec extends AkkaSpec { + implicit val materializer = ActorMaterializer() + + "captureTermination" should { + "signal completion" when { + "upstream terminates" in { + val (newSource, whenCompleted) = StreamUtils.captureTermination(Source(List(1, 2, 3))) + + newSource.runWith(Sink.ignore) + + Await.result(whenCompleted, 3.seconds) shouldBe () + } + + "upstream fails" in { + val ex = new RuntimeException("ex") + val (newSource, whenCompleted) = StreamUtils.captureTermination(Source.failed[Int](ex)) + intercept[RuntimeException] { + Await.result(newSource.runWith(Sink.head), 3.second) + } should be theSameInstanceAs ex + + Await.ready(whenCompleted, 3.seconds).value shouldBe Some(Failure(ex)) + } + + + "downstream cancels" in { + val (newSource, whenCompleted) = StreamUtils.captureTermination(Source(List(1, 2, 3))) + + newSource.runWith(Sink.head) + + Await.result(whenCompleted, 3.seconds) shouldBe () + } + } + } + +}