=htc #19953 Complete Promise in captureTermination onDownstreamFinish
PoolSlot uses StreamUtils.captureTermination to determine when the associated request is completed, when a request from the pool was subsequently used as a response by a server Flow the rendering process would cancel the subscription to the Source but that would not complete the Promise in StreamUtils.captureTerminate and so the SlotEvent.RequestCompleted would not be generated.
This commit is contained in:
parent
ca61a4eec4
commit
89a6187efc
2 changed files with 51 additions and 4 deletions
|
|
@ -65,14 +65,13 @@ private[http] object StreamUtils {
|
||||||
val promise = Promise[Unit]()
|
val promise = Promise[Unit]()
|
||||||
val transformer = new PushStage[T, T] {
|
val transformer = new PushStage[T, T] {
|
||||||
def onPush(element: T, ctx: Context[T]) = ctx.push(element)
|
def onPush(element: T, ctx: Context[T]) = ctx.push(element)
|
||||||
override def onUpstreamFinish(ctx: Context[T]) = {
|
|
||||||
promise.success(())
|
|
||||||
super.onUpstreamFinish(ctx)
|
|
||||||
}
|
|
||||||
override def onUpstreamFailure(cause: Throwable, ctx: Context[T]) = {
|
override def onUpstreamFailure(cause: Throwable, ctx: Context[T]) = {
|
||||||
promise.failure(cause)
|
promise.failure(cause)
|
||||||
ctx.fail(cause)
|
ctx.fail(cause)
|
||||||
}
|
}
|
||||||
|
override def postStop(): Unit = {
|
||||||
|
promise.trySuccess(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
source.transform(() ⇒ transformer) -> promise.future
|
source.transform(() ⇒ transformer) -> promise.future
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,48 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
package akka.http.impl.util
|
||||||
|
|
||||||
|
import akka.stream.ActorMaterializer
|
||||||
|
import akka.stream.scaladsl.{Sink, Source}
|
||||||
|
import akka.testkit.AkkaSpec
|
||||||
|
|
||||||
|
import scala.concurrent.Await
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
import scala.util.Failure
|
||||||
|
|
||||||
|
class StreamUtilsSpec extends AkkaSpec {
|
||||||
|
implicit val materializer = ActorMaterializer()
|
||||||
|
|
||||||
|
"captureTermination" should {
|
||||||
|
"signal completion" when {
|
||||||
|
"upstream terminates" in {
|
||||||
|
val (newSource, whenCompleted) = StreamUtils.captureTermination(Source(List(1, 2, 3)))
|
||||||
|
|
||||||
|
newSource.runWith(Sink.ignore)
|
||||||
|
|
||||||
|
Await.result(whenCompleted, 3.seconds) shouldBe ()
|
||||||
|
}
|
||||||
|
|
||||||
|
"upstream fails" in {
|
||||||
|
val ex = new RuntimeException("ex")
|
||||||
|
val (newSource, whenCompleted) = StreamUtils.captureTermination(Source.failed[Int](ex))
|
||||||
|
intercept[RuntimeException] {
|
||||||
|
Await.result(newSource.runWith(Sink.head), 3.second)
|
||||||
|
} should be theSameInstanceAs ex
|
||||||
|
|
||||||
|
Await.ready(whenCompleted, 3.seconds).value shouldBe Some(Failure(ex))
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
"downstream cancels" in {
|
||||||
|
val (newSource, whenCompleted) = StreamUtils.captureTermination(Source(List(1, 2, 3)))
|
||||||
|
|
||||||
|
newSource.runWith(Sink.head)
|
||||||
|
|
||||||
|
Await.result(whenCompleted, 3.seconds) shouldBe ()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue