=htc #19834 Replaced PushStage based captureTermination with GraphStage

* Move promise completion success into postStop to simplify impl
This commit is contained in:
Bernard Leach 2016-04-04 21:37:09 +10:00 committed by Konrad Malawski
parent 598799d5ae
commit 4a8018f63f

View file

@ -63,17 +63,25 @@ private[http] object StreamUtils {
def captureTermination[T, Mat](source: Source[T, Mat]): (Source[T, Mat], Future[Unit]) = {
val promise = Promise[Unit]()
val transformer = new PushStage[T, T] {
def onPush(element: T, ctx: Context[T]) = ctx.push(element)
override def onUpstreamFailure(cause: Throwable, ctx: Context[T]) = {
promise.failure(cause)
ctx.fail(cause)
}
override def postStop(): Unit = {
promise.trySuccess(())
val transformer = new SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = push(out, grab(in))
override def onPull(): Unit = pull(in)
override def onUpstreamFailure(ex: Throwable): Unit = {
promise.failure(ex)
failStage(ex)
}
override def postStop(): Unit = {
promise.trySuccess(())
}
setHandlers(in, out, this)
}
}
source.transform(() transformer) -> promise.future
source.via(transformer) -> promise.future
}
def sliceBytesTransformer(start: Long, length: Long): Flow[ByteString, ByteString, NotUsed] = {