=htc #19834 replace StreamUtils.identityFinishReporter with Flow.watchTermination

This commit is contained in:
Bernard Leach 2016-04-07 08:26:31 +10:00
parent f24b227586
commit 7d00d4194f
3 changed files with 7 additions and 45 deletions

View file

@ -5,13 +5,12 @@
package akka.http.impl.util
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference }
import akka.{Done, NotUsed}
import akka.NotUsed
import akka.stream._
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.impl.{ PublisherSink, SinkModule, SourceModule }
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.stream.stage.GraphStageWithMaterializedValue
import akka.util.ByteString
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
@ -220,43 +219,6 @@ private[http] object StreamUtils {
throw new IllegalStateException("Value can be only set once.")
}
/**
* Returns a no-op flow that materializes to a future that will be completed when the flow gets a
* completion or error signal. It doesn't necessarily mean, though, that all of a streaming pipeline
* is finished, only that the part that contains this flow has finished work.
*/
def identityFinishReporter[T]: Flow[T, T, Future[Done]] = {
object IdentityFinishReporter extends GraphStageWithMaterializedValue[FlowShape[T, T], Future[Done]] {
val shape = FlowShape(Inlet[T]("identityFinishReporter.in"), Outlet[T]("identityFinishReporter.out"))
override def toString: String = "IdentityFinishReporter"
def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
val promise = Promise[Done]()
val stage = new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = push(shape.out, grab(shape.in))
override def onPull(): Unit = pull(shape.in)
override def onUpstreamFailure(ex: Throwable): Unit = {
promise.failure(ex)
failStage(ex)
}
override def postStop(): Unit = {
promise.trySuccess(Done)
}
setHandlers(shape.in, shape.out, this)
}
(stage, promise.future)
}
}
Flow[T].viaMat(IdentityFinishReporter)(Keep.right)
}
/**
* Similar to Source.maybe but doesn't rely on materialization. Can only be used once.
*/

View file

@ -110,7 +110,7 @@ class HttpExt(private val config: Config)(implicit val system: ActorSystem) exte
def handleOneConnection(incomingConnection: IncomingConnection): Future[Done] =
try
incomingConnection.flow
.viaMat(StreamUtils.identityFinishReporter)(Keep.right)
.watchTermination()(Keep.right)
.joinMat(handler)(Keep.left)
.run()
catch {

View file

@ -596,7 +596,7 @@ class ResponseRendererSpec extends FreeSpec with Matchers with BeforeAndAfterAll
case _: ResponseRenderingOutput.SwitchToWebSocket throw new IllegalStateException("Didn't expect websocket response")
}
.groupedWithin(1000, 100.millis)
.viaMat(StreamUtils.identityFinishReporter[Seq[ByteString]])(Keep.right)
.watchTermination()(Keep.right)
.toMat(Sink.head)(Keep.both).run()
// we try to find out if the renderer has already flagged completion even without the upstream being completed