=htc #19834 convert StreamUtils.identityFinishReporter to GraphStage

This commit is contained in:
Bernard Leach 2016-04-06 10:23:07 +10:00
parent 0a1431e5d3
commit 7357571c82

View file

@ -4,16 +4,18 @@
package akka.http.impl.util package akka.http.impl.util
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference } import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import akka.NotUsed import akka.NotUsed
import akka.stream._ import akka.stream._
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.impl.{ PublisherSink, SinkModule, SourceModule } import akka.stream.impl.{PublisherSink, SinkModule, SourceModule}
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.stage._ import akka.stream.stage._
import akka.stream.stage.GraphStageWithMaterializedValue
import akka.util.ByteString import akka.util.ByteString
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } import org.reactivestreams.{Processor, Publisher, Subscriber, Subscription}
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.{ExecutionContext, Future, Promise}
/** /**
* INTERNAL API * INTERNAL API
@ -224,38 +226,34 @@ private[http] object StreamUtils {
* is finished, only that the part that contains this flow has finished work. * is finished, only that the part that contains this flow has finished work.
*/ */
def identityFinishReporter[T]: Flow[T, T, Future[Unit]] = { def identityFinishReporter[T]: Flow[T, T, Future[Unit]] = {
// copy from Sink.foreach Flow[T].viaMat(new GraphStageWithMaterializedValue[FlowShape[T, T], Future[Unit]] {
def newForeachStage(): (PushStage[T, T], Future[Unit]) = { val shape = FlowShape(Inlet[T]("identityFinishReporter.in"), Outlet[T]("identityFinishReporter.out"))
override def toString: String = "UniqueKillSwitchFlow"
def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Unit]) = {
val promise = Promise[Unit]() val promise = Promise[Unit]()
val stage = new PushStage[T, T] { val stage = new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(elem: T, ctx: Context[T]): SyncDirective = ctx.push(elem) override def onPush(): Unit = push(shape.out, grab(shape.in))
override def onUpstreamFailure(cause: Throwable, ctx: Context[T]): TerminationDirective = { override def onPull(): Unit = pull(shape.in)
promise.failure(cause)
ctx.fail(cause) override def onUpstreamFailure(ex: Throwable): Unit = {
promise.failure(ex)
failStage(ex)
} }
override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = { override def postStop(): Unit = {
promise.success(()) promise.trySuccess(())
ctx.finish()
} }
override def onDownstreamFinish(ctx: Context[T]): TerminationDirective = { setHandler(shape.in, this)
promise.success(()) setHandler(shape.out, this)
ctx.finish()
}
override def decide(cause: Throwable): Supervision.Directive = {
// supervision will be implemented by #16916
promise.tryFailure(cause)
super.decide(cause)
}
} }
(stage, promise.future) (stage, promise.future)
} }
Flow[T].transformMaterializing(newForeachStage) })(Keep.right)
} }
/** /**