Merge pull request #20060 from leachbj/fix-19953-bl

=htc #19953 Complete Promise in captureTermination onDownstreamFinish
This commit is contained in:
Patrik Nordwall 2016-03-31 17:58:53 +02:00
commit 6675f68bf7
2 changed files with 51 additions and 4 deletions

View file

@ -65,14 +65,13 @@ private[http] object StreamUtils {
val promise = Promise[Unit]()
val transformer = new PushStage[T, T] {
def onPush(element: T, ctx: Context[T]) = ctx.push(element)
override def onUpstreamFinish(ctx: Context[T]) = {
promise.success(())
super.onUpstreamFinish(ctx)
}
override def onUpstreamFailure(cause: Throwable, ctx: Context[T]) = {
promise.failure(cause)
ctx.fail(cause)
}
override def postStop(): Unit = {
promise.trySuccess(())
}
}
source.transform(() transformer) -> promise.future
}

View file

@ -0,0 +1,48 @@
/**
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.http.impl.util
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import akka.testkit.AkkaSpec
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Failure
class StreamUtilsSpec extends AkkaSpec {
implicit val materializer = ActorMaterializer()
"captureTermination" should {
"signal completion" when {
"upstream terminates" in {
val (newSource, whenCompleted) = StreamUtils.captureTermination(Source(List(1, 2, 3)))
newSource.runWith(Sink.ignore)
Await.result(whenCompleted, 3.seconds) shouldBe ()
}
"upstream fails" in {
val ex = new RuntimeException("ex")
val (newSource, whenCompleted) = StreamUtils.captureTermination(Source.failed[Int](ex))
intercept[RuntimeException] {
Await.result(newSource.runWith(Sink.head), 3.second)
} should be theSameInstanceAs ex
Await.ready(whenCompleted, 3.seconds).value shouldBe Some(Failure(ex))
}
"downstream cancels" in {
val (newSource, whenCompleted) = StreamUtils.captureTermination(Source(List(1, 2, 3)))
newSource.runWith(Sink.head)
Await.result(whenCompleted, 3.seconds) shouldBe ()
}
}
}
}