=str Refactoring to avoid distinct handler instantiation for stage logics (#21309)

* Refactoring to avoid distinct handler instantiation for stage logics

* setHandlers
This commit is contained in:
Cédric Chantepie 2016-08-29 14:00:48 +02:00 committed by Konrad Malawski
parent fb45dd03f3
commit 324a40ba97
12 changed files with 458 additions and 456 deletions

View file

@ -24,7 +24,7 @@ private[akka] class ActorRefBackpressureSinkStage[In](ref: ActorRef, onInitMessa
override val shape: SinkShape[In] = SinkShape(in) override val shape: SinkShape[In] = SinkShape(in)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) { new GraphStageLogic(shape) with InHandler {
implicit def self: ActorRef = stageActor.ref implicit def self: ActorRef = stageActor.ref
val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
@ -67,24 +67,26 @@ private[akka] class ActorRefBackpressureSinkStage[In](ref: ActorRef, onInitMessa
completeStage() completeStage()
} }
setHandler(in, new InHandler { def onPush(): Unit = {
override def onPush(): Unit = { buffer offer grab(in)
buffer offer grab(in) if (acknowledgementReceived) {
if (acknowledgementReceived) { dequeueAndSend()
dequeueAndSend() acknowledgementReceived = false
acknowledgementReceived = false
}
if (buffer.size() < maxBuffer) pull(in)
} }
override def onUpstreamFinish(): Unit = { if (buffer.size() < maxBuffer) pull(in)
if (buffer.isEmpty) finish() }
else completeReceived = true
} override def onUpstreamFinish(): Unit = {
override def onUpstreamFailure(ex: Throwable): Unit = { if (buffer.isEmpty) finish()
ref ! onFailureMessage(ex) else completeReceived = true
failStage(ex) }
}
}) override def onUpstreamFailure(ex: Throwable): Unit = {
ref ! onFailureMessage(ex)
failStage(ex)
}
setHandler(in, this)
} }
override def toString = "ActorRefBackpressureSink" override def toString = "ActorRefBackpressureSink"

View file

@ -205,29 +205,30 @@ final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val p: Promise[Option[T]] = Promise() val p: Promise[Option[T]] = Promise()
(new GraphStageLogic(shape) { (new GraphStageLogic(shape) with InHandler {
private[this] var prev: T = null.asInstanceOf[T]
override def preStart(): Unit = pull(in) override def preStart(): Unit = pull(in)
setHandler(in, new InHandler {
private[this] var prev: T = null.asInstanceOf[T]
override def onPush(): Unit = { def onPush(): Unit = {
prev = grab(in) prev = grab(in)
pull(in) pull(in)
} }
override def onUpstreamFinish(): Unit = { override def onUpstreamFinish(): Unit = {
val head = prev val head = prev
prev = null.asInstanceOf[T] prev = null.asInstanceOf[T]
p.trySuccess(Option(head)) p.trySuccess(Option(head))
completeStage() completeStage()
} }
override def onUpstreamFailure(ex: Throwable): Unit = { override def onUpstreamFailure(ex: Throwable): Unit = {
prev = null.asInstanceOf[T] prev = null.asInstanceOf[T]
p.tryFailure(ex) p.tryFailure(ex)
failStage(ex) failStage(ex)
} }
})
setHandler(in, this)
}, p.future) }, p.future)
} }
@ -242,24 +243,25 @@ final class HeadOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val p: Promise[Option[T]] = Promise() val p: Promise[Option[T]] = Promise()
(new GraphStageLogic(shape) { (new GraphStageLogic(shape) with InHandler {
override def preStart(): Unit = pull(in) override def preStart(): Unit = pull(in)
setHandler(in, new InHandler {
override def onPush(): Unit = {
p.trySuccess(Option(grab(in)))
completeStage()
}
override def onUpstreamFinish(): Unit = { def onPush(): Unit = {
p.trySuccess(None) p.trySuccess(Option(grab(in)))
completeStage() completeStage()
} }
override def onUpstreamFailure(ex: Throwable): Unit = { override def onUpstreamFinish(): Unit = {
p.tryFailure(ex) p.trySuccess(None)
failStage(ex) completeStage()
} }
})
override def onUpstreamFailure(ex: Throwable): Unit = {
p.tryFailure(ex)
failStage(ex)
}
setHandler(in, this)
}, p.future) }, p.future)
} }
@ -277,29 +279,28 @@ final class SeqStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Fu
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val p: Promise[immutable.Seq[T]] = Promise() val p: Promise[immutable.Seq[T]] = Promise()
val logic = new GraphStageLogic(shape) { val logic = new GraphStageLogic(shape) with InHandler {
val buf = Vector.newBuilder[T] val buf = Vector.newBuilder[T]
override def preStart(): Unit = pull(in) override def preStart(): Unit = pull(in)
setHandler(in, new InHandler { def onPush(): Unit = {
buf += grab(in)
pull(in)
}
override def onPush(): Unit = { override def onUpstreamFinish(): Unit = {
buf += grab(in) val result = buf.result()
pull(in) p.trySuccess(result)
} completeStage()
}
override def onUpstreamFinish(): Unit = { override def onUpstreamFailure(ex: Throwable): Unit = {
val result = buf.result() p.tryFailure(ex)
p.trySuccess(result) failStage(ex)
completeStage() }
}
override def onUpstreamFailure(ex: Throwable): Unit = { setHandler(in, this)
p.tryFailure(ex)
failStage(ex)
}
})
} }
(logic, p.future) (logic, p.future)
@ -325,7 +326,7 @@ final class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T],
override def toString: String = "QueueSink" override def toString: String = "QueueSink"
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Output[T]] { val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Output[T]] with InHandler {
type Received[E] = Try[Option[E]] type Received[E] = Try[Option[E]]
val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
@ -383,14 +384,15 @@ final class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T],
} }
} }
setHandler(in, new InHandler { def onPush(): Unit = {
override def onPush(): Unit = { enqueueAndNotify(Success(Some(grab(in))))
enqueueAndNotify(Success(Some(grab(in)))) if (buffer.used < maxBuffer) pull(in)
if (buffer.used < maxBuffer) pull(in) }
}
override def onUpstreamFinish(): Unit = enqueueAndNotify(Success(None)) override def onUpstreamFinish(): Unit = enqueueAndNotify(Success(None))
override def onUpstreamFailure(ex: Throwable): Unit = enqueueAndNotify(Failure(ex)) override def onUpstreamFailure(ex: Throwable): Unit = enqueueAndNotify(Failure(ex))
})
setHandler(in, this)
} }
(stageLogic, new SinkQueueWithCancel[T] { (stageLogic, new SinkQueueWithCancel[T] {

View file

@ -18,18 +18,18 @@ final class Unfold[S, E](s: S, f: S ⇒ Option[(S, E)]) extends GraphStage[Sourc
override val shape: SourceShape[E] = SourceShape(out) override val shape: SourceShape[E] = SourceShape(out)
override def initialAttributes: Attributes = DefaultAttributes.unfold override def initialAttributes: Attributes = DefaultAttributes.unfold
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) { new GraphStageLogic(shape) with OutHandler {
private[this] var state = s private[this] var state = s
setHandler(out, new OutHandler { def onPull(): Unit = f(state) match {
override def onPull(): Unit = f(state) match { case None complete(out)
case None complete(out) case Some((newState, v)) {
case Some((newState, v)) { push(out, v)
push(out, v) state = newState
state = newState
}
} }
}) }
setHandler(out, this)
} }
} }
@ -41,7 +41,7 @@ final class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]]) extends Gra
override val shape: SourceShape[E] = SourceShape(out) override val shape: SourceShape[E] = SourceShape(out)
override def initialAttributes: Attributes = DefaultAttributes.unfoldAsync override def initialAttributes: Attributes = DefaultAttributes.unfoldAsync
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) { new GraphStageLogic(shape) with OutHandler {
private[this] var state = s private[this] var state = s
private[this] var asyncHandler: Function1[Try[Option[(S, E)]], Unit] = _ private[this] var asyncHandler: Function1[Try[Option[(S, E)]], Unit] = _
@ -56,9 +56,9 @@ final class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]]) extends Gra
asyncHandler = ac.invoke asyncHandler = ac.invoke
} }
setHandler(out, new OutHandler { def onPull(): Unit = f(state).onComplete(asyncHandler)(
override def onPull(): Unit = akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
f(state).onComplete(asyncHandler)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
}) setHandler(out, this)
} }
} }

View file

@ -126,6 +126,7 @@ object ActorGraphInterpreter {
nextInputElementCursor = (nextInputElementCursor + 1) & IndexMask nextInputElementCursor = (nextInputElementCursor + 1) & IndexMask
elem elem
} }
private def clear(): Unit = { private def clear(): Unit = {
java.util.Arrays.fill(inputBuffer, 0, inputBuffer.length, null) java.util.Arrays.fill(inputBuffer, 0, inputBuffer.length, null)
inputBufferElements = 0 inputBufferElements = 0

View file

@ -59,14 +59,12 @@ object GraphStages {
object Identity extends SimpleLinearGraphStage[Any] { object Identity extends SimpleLinearGraphStage[Any] {
override def initialAttributes = DefaultAttributes.identityOp override def initialAttributes = DefaultAttributes.identityOp
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
setHandler(in, new InHandler { def onPush(): Unit = push(out, grab(in))
override def onPush(): Unit = push(out, grab(in)) def onPull(): Unit = pull(in)
})
setHandler(out, new OutHandler { setHandler(in, this)
override def onPull(): Unit = pull(in) setHandler(out, this)
})
} }
override def toString = "Identity" override def toString = "Identity"
@ -83,29 +81,28 @@ object GraphStages {
override def initialAttributes = DefaultAttributes.detacher override def initialAttributes = DefaultAttributes.detacher
override val shape = FlowShape(in, out) override val shape = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
setHandler(in, new InHandler { def onPush(): Unit = {
override def onPush(): Unit = { if (isAvailable(out)) {
if (isAvailable(out)) { push(out, grab(in))
push(out, grab(in)) tryPull(in)
tryPull(in)
}
} }
override def onUpstreamFinish(): Unit = { }
if (!isAvailable(in)) completeStage()
}
})
setHandler(out, new OutHandler { override def onUpstreamFinish(): Unit = {
override def onPull(): Unit = { if (!isAvailable(in)) completeStage()
if (isAvailable(in)) { }
push(out, grab(in))
if (isClosed(in)) completeStage() def onPull(): Unit = {
else pull(in) if (isAvailable(in)) {
} push(out, grab(in))
if (isClosed(in)) completeStage()
else pull(in)
} }
}) }
setHandlers(in, out, this)
override def preStart(): Unit = tryPull(in) override def preStart(): Unit = tryPull(in)
} }
@ -125,27 +122,27 @@ object GraphStages {
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = { override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
val finishPromise = Promise[Done]() val finishPromise = Promise[Done]()
(new GraphStageLogic(shape) { (new GraphStageLogic(shape) with InHandler with OutHandler {
setHandler(in, new InHandler { def onPush(): Unit = push(out, grab(in))
override def onPush(): Unit = push(out, grab(in))
override def onUpstreamFinish(): Unit = { override def onUpstreamFinish(): Unit = {
finishPromise.success(Done) finishPromise.success(Done)
completeStage() completeStage()
} }
override def onUpstreamFailure(ex: Throwable): Unit = { override def onUpstreamFailure(ex: Throwable): Unit = {
finishPromise.failure(ex) finishPromise.failure(ex)
failStage(ex) failStage(ex)
} }
})
setHandler(out, new OutHandler { def onPull(): Unit = pull(in)
override def onPull(): Unit = pull(in)
override def onDownstreamFinish(): Unit = { override def onDownstreamFinish(): Unit = {
finishPromise.success(Done) finishPromise.success(Done)
completeStage() completeStage()
} }
})
setHandlers(in, out, this)
}, finishPromise.future) }, finishPromise.future)
} }
@ -170,29 +167,33 @@ object GraphStages {
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, FlowMonitor[T]) = { override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, FlowMonitor[T]) = {
val monitor: FlowMonitorImpl[T] = new FlowMonitorImpl[T] val monitor: FlowMonitorImpl[T] = new FlowMonitorImpl[T]
val logic: GraphStageLogic = new GraphStageLogic(shape) { val logic: GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
setHandler(in, new InHandler {
override def onPush(): Unit = { def onPush(): Unit = {
val msg = grab(in) val msg = grab(in)
push(out, msg) push(out, msg)
monitor.set(if (msg.isInstanceOf[StreamState[_]]) Received(msg) else msg) monitor.set(if (msg.isInstanceOf[StreamState[_]]) Received(msg) else msg)
} }
override def onUpstreamFinish(): Unit = {
super.onUpstreamFinish() override def onUpstreamFinish(): Unit = {
monitor.set(Finished) super.onUpstreamFinish()
} monitor.set(Finished)
override def onUpstreamFailure(ex: Throwable): Unit = { }
super.onUpstreamFailure(ex)
monitor.set(Failed(ex)) override def onUpstreamFailure(ex: Throwable): Unit = {
} super.onUpstreamFailure(ex)
}) monitor.set(Failed(ex))
setHandler(out, new OutHandler { }
override def onPull(): Unit = pull(in)
override def onDownstreamFinish(): Unit = { def onPull(): Unit = pull(in)
super.onDownstreamFinish()
monitor.set(Finished) override def onDownstreamFinish(): Unit = {
} super.onDownstreamFinish()
}) monitor.set(Finished)
}
setHandler(in, this)
setHandler(out, this)
override def toString = "MonitorFlowLogic" override def toString = "MonitorFlowLogic"
} }
@ -293,14 +294,15 @@ object GraphStages {
ReactiveStreamsCompliance.requireNonNullElement(elem) ReactiveStreamsCompliance.requireNonNullElement(elem)
val out = Outlet[T]("single.out") val out = Outlet[T]("single.out")
val shape = SourceShape(out) val shape = SourceShape(out)
override def createLogic(attr: Attributes) = new GraphStageLogic(shape) { def createLogic(attr: Attributes) =
setHandler(out, new OutHandler { new GraphStageLogic(shape) with OutHandler {
override def onPull(): Unit = { def onPull(): Unit = {
push(out, elem) push(out, elem)
completeStage() completeStage()
} }
}) setHandler(out, this)
} }
override def toString: String = s"SingleSource($elem)" override def toString: String = s"SingleSource($elem)"
} }
@ -309,9 +311,9 @@ object GraphStages {
val shape = SourceShape(Outlet[T]("future.out")) val shape = SourceShape(Outlet[T]("future.out"))
val out = shape.out val out = shape.out
override def initialAttributes: Attributes = DefaultAttributes.futureSource override def initialAttributes: Attributes = DefaultAttributes.futureSource
override def createLogic(attr: Attributes) = new GraphStageLogic(shape) { override def createLogic(attr: Attributes) =
setHandler(out, new OutHandler { new GraphStageLogic(shape) with OutHandler {
override def onPull(): Unit = { def onPull(): Unit = {
val cb = getAsyncCallback[Try[T]] { val cb = getAsyncCallback[Try[T]] {
case scala.util.Success(v) emit(out, v, () completeStage()) case scala.util.Success(v) emit(out, v, () completeStage())
case scala.util.Failure(t) failStage(t) case scala.util.Failure(t) failStage(t)
@ -319,8 +321,10 @@ object GraphStages {
future.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext) future.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext)
setHandler(out, eagerTerminateOutput) // After first pull we won't produce anything more setHandler(out, eagerTerminateOutput) // After first pull we won't produce anything more
} }
})
} setHandler(out, this)
}
override def toString: String = "FutureSource" override def toString: String = "FutureSource"
} }

View file

@ -15,32 +15,32 @@ import java.{ util ⇒ ju }
*/ */
private[akka] object IteratorInterpreter { private[akka] object IteratorInterpreter {
final case class IteratorUpstream[T](input: Iterator[T]) extends UpstreamBoundaryStageLogic[T] { final case class IteratorUpstream[T](input: Iterator[T]) extends UpstreamBoundaryStageLogic[T] with OutHandler {
val out: Outlet[T] = Outlet[T]("IteratorUpstream.out") val out: Outlet[T] = Outlet[T]("IteratorUpstream.out")
out.id = 0 out.id = 0
private var hasNext = input.hasNext private var hasNext = input.hasNext
setHandler(out, new OutHandler { def onPull(): Unit = {
override def onPull(): Unit = { if (!hasNext) complete(out)
if (!hasNext) complete(out) else {
else { val elem = input.next()
val elem = input.next() hasNext = input.hasNext
hasNext = input.hasNext if (!hasNext) {
if (!hasNext) { push(out, elem)
push(out, elem) complete(out)
complete(out) } else push(out, elem)
} else push(out, elem)
}
} }
}
override def onDownstreamFinish(): Unit = () override def onDownstreamFinish(): Unit = ()
})
setHandler(out, this)
override def toString = "IteratorUpstream" override def toString = "IteratorUpstream"
} }
final case class IteratorDownstream[T]() extends DownstreamBoundaryStageLogic[T] with Iterator[T] { final case class IteratorDownstream[T]() extends DownstreamBoundaryStageLogic[T] with Iterator[T] with InHandler {
val in: Inlet[T] = Inlet[T]("IteratorDownstream.in") val in: Inlet[T] = Inlet[T]("IteratorDownstream.in")
in.id = 0 in.id = 0
@ -49,21 +49,21 @@ private[akka] object IteratorInterpreter {
private var needsPull = true private var needsPull = true
private var lastFailure: Throwable = null private var lastFailure: Throwable = null
setHandler(in, new InHandler { def onPush(): Unit = {
override def onPush(): Unit = { nextElem = grab(in)
nextElem = grab(in) needsPull = false
needsPull = false }
}
override def onUpstreamFinish(): Unit = { override def onUpstreamFinish(): Unit = {
done = true done = true
} }
override def onUpstreamFailure(cause: Throwable): Unit = { override def onUpstreamFailure(cause: Throwable): Unit = {
done = true done = true
lastFailure = cause lastFailure = cause
} }
})
setHandler(in, this)
private def pullIfNeeded(): Unit = { private def pullIfNeeded(): Unit = {
if (needsPull) { if (needsPull) {
@ -93,7 +93,6 @@ private[akka] object IteratorInterpreter {
// don't let toString consume the iterator // don't let toString consume the iterator
override def toString: String = "IteratorDownstream" override def toString: String = "IteratorDownstream"
} }
} }
/** /**

View file

@ -527,7 +527,7 @@ final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) ext
override val shape = FlowShape(in, out) override val shape = FlowShape(in, out)
override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) { override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {
val startInHandler = new InHandler { val startInHandler = new InHandler {
override def onPush(): Unit = { override def onPush(): Unit = {
// if else (to avoid using Iterator[T].flatten in hot code) // if else (to avoid using Iterator[T].flatten in hot code)
@ -551,12 +551,10 @@ final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) ext
} }
} }
val outHandler = new OutHandler { def onPull(): Unit = pull(in)
override def onPull(): Unit = pull(in)
}
setHandler(in, startInHandler) setHandler(in, startInHandler)
setHandler(out, outHandler) setHandler(out, this)
} }
} }
@ -775,7 +773,7 @@ final case class Batch[In, Out](val max: Long, val costFn: In ⇒ Long, val seed
override val shape: FlowShape[In, Out] = FlowShape.of(in, out) override val shape: FlowShape[In, Out] = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
@ -808,85 +806,81 @@ final case class Batch[In, Out](val max: Long, val costFn: In ⇒ Long, val seed
override def preStart() = pull(in) override def preStart() = pull(in)
setHandler(in, new InHandler { def onPush(): Unit = {
val elem = grab(in)
val cost = costFn(elem)
override def onPush(): Unit = { if (agg == null) {
val elem = grab(in) try {
val cost = costFn(elem) agg = seed(elem)
left -= cost
if (agg == null) { } catch {
try { case NonFatal(ex) decider(ex) match {
agg = seed(elem) case Supervision.Stop failStage(ex)
left -= cost case Supervision.Restart
} catch { restartState()
case NonFatal(ex) decider(ex) match { case Supervision.Resume
case Supervision.Stop failStage(ex)
case Supervision.Restart
restartState()
case Supervision.Resume
}
}
} else if (left < cost) {
pending = elem
} else {
try {
agg = aggregate(agg, elem)
left -= cost
} catch {
case NonFatal(ex) decider(ex) match {
case Supervision.Stop failStage(ex)
case Supervision.Restart
restartState()
case Supervision.Resume
}
} }
} }
} else if (left < cost) {
if (isAvailable(out)) flush() pending = elem
if (pending == null) pull(in) } else {
} try {
agg = aggregate(agg, elem)
override def onUpstreamFinish(): Unit = { left -= cost
if (agg == null) completeStage() } catch {
} case NonFatal(ex) decider(ex) match {
}) case Supervision.Stop failStage(ex)
case Supervision.Restart
setHandler(out, new OutHandler { restartState()
case Supervision.Resume
override def onPull(): Unit = {
if (agg == null) {
if (isClosed(in)) completeStage()
else if (!hasBeenPulled(in)) pull(in)
} else if (isClosed(in)) {
push(out, agg)
if (pending == null) completeStage()
else {
try {
agg = seed(pending)
} catch {
case NonFatal(ex) decider(ex) match {
case Supervision.Stop failStage(ex)
case Supervision.Resume
case Supervision.Restart
restartState()
if (!hasBeenPulled(in)) pull(in)
}
}
pending = null.asInstanceOf[In]
} }
} else {
flush()
if (!hasBeenPulled(in)) pull(in)
} }
} }
})
if (isAvailable(out)) flush()
if (pending == null) pull(in)
}
override def onUpstreamFinish(): Unit = {
if (agg == null) completeStage()
}
def onPull(): Unit = {
if (agg == null) {
if (isClosed(in)) completeStage()
else if (!hasBeenPulled(in)) pull(in)
} else if (isClosed(in)) {
push(out, agg)
if (pending == null) completeStage()
else {
try {
agg = seed(pending)
} catch {
case NonFatal(ex) decider(ex) match {
case Supervision.Stop failStage(ex)
case Supervision.Resume
case Supervision.Restart
restartState()
if (!hasBeenPulled(in)) pull(in)
}
}
pending = null.asInstanceOf[In]
}
} else {
flush()
if (!hasBeenPulled(in)) pull(in)
}
}
private def restartState(): Unit = { private def restartState(): Unit = {
agg = null.asInstanceOf[Out] agg = null.asInstanceOf[Out]
left = max left = max
pending = null.asInstanceOf[In] pending = null.asInstanceOf[In]
} }
setHandlers(in, out, this)
} }
} }
@ -900,46 +894,46 @@ final class Expand[In, Out](val extrapolate: In ⇒ Iterator[Out]) extends Graph
override def initialAttributes = DefaultAttributes.expand override def initialAttributes = DefaultAttributes.expand
override val shape = FlowShape(in, out) override val shape = FlowShape(in, out)
override def createLogic(attr: Attributes) = new GraphStageLogic(shape) { override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {
private var iterator: Iterator[Out] = Iterator.empty private var iterator: Iterator[Out] = Iterator.empty
private var expanded = false private var expanded = false
override def preStart(): Unit = pull(in) override def preStart(): Unit = pull(in)
setHandler(in, new InHandler { def onPush(): Unit = {
override def onPush(): Unit = { iterator = extrapolate(grab(in))
iterator = extrapolate(grab(in)) if (iterator.hasNext) {
if (iterator.hasNext) { if (isAvailable(out)) {
if (isAvailable(out)) { expanded = true
expanded = true pull(in)
push(out, iterator.next())
} else expanded = false
} else pull(in)
}
override def onUpstreamFinish(): Unit = {
if (iterator.hasNext && !expanded) () // need to wait
else completeStage()
}
def onPull(): Unit = {
if (iterator.hasNext) {
if (!expanded) {
expanded = true
if (isClosed(in)) {
push(out, iterator.next())
completeStage()
} else {
// expand needs to pull first to be fair when upstream is not actually slow
pull(in) pull(in)
push(out, iterator.next()) push(out, iterator.next())
} else expanded = false }
} else pull(in) } else push(out, iterator.next())
} }
override def onUpstreamFinish(): Unit = { }
if (iterator.hasNext && !expanded) () // need to wait
else completeStage()
}
})
setHandler(out, new OutHandler { setHandler(in, this)
override def onPull(): Unit = { setHandler(out, this)
if (iterator.hasNext) {
if (!expanded) {
expanded = true
if (isClosed(in)) {
push(out, iterator.next())
completeStage()
} else {
// expand needs to pull first to be fair when upstream is not actually slow
pull(in)
push(out, iterator.next())
}
} else push(out, iterator.next())
}
}
})
} }
} }
@ -1303,7 +1297,7 @@ final class GroupedWithin[T](val n: Int, val d: FiniteDuration) extends GraphSta
final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] { final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] {
private[this] def timerName = "DelayedTimer" private[this] def timerName = "DelayedTimer"
override def initialAttributes: Attributes = DefaultAttributes.delay override def initialAttributes: Attributes = DefaultAttributes.delay
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler {
val size = val size =
inheritedAttributes.get[InputBuffer] match { inheritedAttributes.get[InputBuffer] match {
case None throw new IllegalStateException(s"Couldn't find InputBuffer Attribute for $this") case None throw new IllegalStateException(s"Couldn't find InputBuffer Attribute for $this")
@ -1315,59 +1309,58 @@ final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy)
override def preStart(): Unit = buffer = BufferImpl(size, materializer) override def preStart(): Unit = buffer = BufferImpl(size, materializer)
setHandler(in, handler = new InHandler { //FIXME rewrite into distinct strategy functions to avoid matching on strategy for every input when full
//FIXME rewrite into distinct strategy functions to avoid matching on strategy for every input when full def onPush(): Unit = {
override def onPush(): Unit = { if (buffer.isFull) strategy match {
if (buffer.isFull) strategy match { case EmitEarly
case EmitEarly if (!isTimerActive(timerName))
if (!isTimerActive(timerName)) push(out, buffer.dequeue()._2)
push(out, buffer.dequeue()._2) else {
else { cancelTimer(timerName)
cancelTimer(timerName) onTimer(timerName)
onTimer(timerName) }
} case DropHead
case DropHead buffer.dropHead()
buffer.dropHead() grabAndPull(true)
grabAndPull(true) case DropTail
case DropTail buffer.dropTail()
buffer.dropTail() grabAndPull(true)
grabAndPull(true) case DropNew
case DropNew grab(in)
grab(in)
if (!isTimerActive(timerName)) scheduleOnce(timerName, d)
case DropBuffer
buffer.clear()
grabAndPull(true)
case Fail
failStage(new BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $size)!"))
case Backpressure throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode")
}
else {
grabAndPull(strategy != Backpressure || buffer.used < size - 1)
if (!isTimerActive(timerName)) scheduleOnce(timerName, d) if (!isTimerActive(timerName)) scheduleOnce(timerName, d)
} case DropBuffer
buffer.clear()
grabAndPull(true)
case Fail
failStage(new BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $size)!"))
case Backpressure throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode")
} }
else {
def grabAndPull(pullCondition: Boolean): Unit = { grabAndPull(strategy != Backpressure || buffer.used < size - 1)
buffer.enqueue((System.nanoTime(), grab(in))) if (!isTimerActive(timerName)) scheduleOnce(timerName, d)
if (pullCondition) pull(in)
} }
}
override def onUpstreamFinish(): Unit = { def grabAndPull(pullCondition: Boolean): Unit = {
if (isAvailable(out) && isTimerActive(timerName)) willStop = true buffer.enqueue((System.nanoTime(), grab(in)))
else completeStage() if (pullCondition) pull(in)
} }
})
setHandler(out, new OutHandler { override def onUpstreamFinish(): Unit = {
override def onPull(): Unit = { if (isAvailable(out) && isTimerActive(timerName)) willStop = true
if (!isTimerActive(timerName) && !buffer.isEmpty && nextElementWaitTime() < 0) else completeStage()
push(out, buffer.dequeue()._2) }
if (!willStop && !hasBeenPulled(in)) pull(in) def onPull(): Unit = {
completeIfReady() if (!isTimerActive(timerName) && !buffer.isEmpty && nextElementWaitTime() < 0)
} push(out, buffer.dequeue()._2)
})
if (!willStop && !hasBeenPulled(in)) pull(in)
completeIfReady()
}
setHandler(in, this)
setHandler(out, this)
def completeIfReady(): Unit = if (willStop && buffer.isEmpty) completeStage() def completeIfReady(): Unit = if (willStop && buffer.isEmpty) completeStage()
@ -1388,14 +1381,12 @@ final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy)
final class TakeWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { final class TakeWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler {
setHandler(in, new InHandler { def onPush(): Unit = push(out, grab(in))
override def onPush(): Unit = push(out, grab(in)) def onPull(): Unit = pull(in)
})
setHandler(out, new OutHandler { setHandler(in, this)
override def onPull(): Unit = pull(in) setHandler(out, this)
})
final override protected def onTimer(key: Any): Unit = final override protected def onTimer(key: Any): Unit =
completeStage() completeStage()
@ -1407,19 +1398,19 @@ final class TakeWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraph
} }
final class DropWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { final class DropWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler {
private var allow = false private var allow = false
setHandler(in, new InHandler { def onPush(): Unit = {
override def onPush(): Unit = if (allow) push(out, grab(in))
if (allow) push(out, grab(in)) else pull(in)
else pull(in) }
})
setHandler(out, new OutHandler { def onPull(): Unit = pull(in)
override def onPull(): Unit = pull(in)
}) setHandler(in, this)
setHandler(out, this)
final override protected def onTimer(key: Any): Unit = allow = true final override protected def onTimer(key: Any): Unit = allow = true
@ -1466,6 +1457,7 @@ final class Reduce[T](val f: (T, T) ⇒ T) extends SimpleLinearGraphStage[T] {
setHandler(out, self) setHandler(out, self)
} }
override def toString = "Reduce" override def toString = "Reduce"
} }

View file

@ -32,7 +32,6 @@ final class FlattenMerge[T, M](val breadth: Int) extends GraphStage[FlowShape[Gr
override val shape = FlowShape(in, out) override val shape = FlowShape(in, out)
override def createLogic(attr: Attributes) = new GraphStageLogic(shape) { override def createLogic(attr: Attributes) = new GraphStageLogic(shape) {
var sources = Set.empty[SubSinkInlet[T]] var sources = Set.empty[SubSinkInlet[T]]
def activeSources = sources.size def activeSources = sources.size

View file

@ -22,7 +22,7 @@ private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[By
override def initialAttributes = Attributes.name("ByteStringParser") override def initialAttributes = Attributes.name("ByteStringParser")
final override val shape = FlowShape(bytesIn, objOut) final override val shape = FlowShape(bytesIn, objOut)
class ParsingLogic extends GraphStageLogic(shape) { class ParsingLogic extends GraphStageLogic(shape) with InHandler {
var pullOnParserRequest = false var pullOnParserRequest = false
override def preStart(): Unit = pull(bytesIn) override def preStart(): Unit = pull(bytesIn)
setHandler(objOut, eagerTerminateOutput) setHandler(objOut, eagerTerminateOutput)
@ -58,16 +58,18 @@ private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[By
if (cont) doParse() if (cont) doParse()
} else pull(bytesIn) } else pull(bytesIn)
setHandler(bytesIn, new InHandler { def onPush(): Unit = {
override def onPush(): Unit = { pullOnParserRequest = false
pullOnParserRequest = false buffer ++= grab(bytesIn)
buffer ++= grab(bytesIn) doParse()
doParse() }
}
override def onUpstreamFinish(): Unit = override def onUpstreamFinish(): Unit = {
if (buffer.isEmpty && acceptUpstreamFinish) completeStage() if (buffer.isEmpty && acceptUpstreamFinish) completeStage()
else current.onTruncation() else current.onTruncation()
}) }
setHandler(bytesIn, this)
} }
} }

View file

@ -46,7 +46,7 @@ final private[stream] class InputStreamSinkStage(readTimeout: FiniteDuration) ex
val dataQueue = new LinkedBlockingDeque[StreamToAdapterMessage](maxBuffer + 2) val dataQueue = new LinkedBlockingDeque[StreamToAdapterMessage](maxBuffer + 2)
val logic = new GraphStageLogic(shape) with StageWithCallback { val logic = new GraphStageLogic(shape) with StageWithCallback with InHandler {
private val callback: AsyncCallback[AdapterToStageMessage] = private val callback: AsyncCallback[AdapterToStageMessage] =
getAsyncCallback { getAsyncCallback {
@ -65,23 +65,26 @@ final private[stream] class InputStreamSinkStage(readTimeout: FiniteDuration) ex
pull(in) pull(in)
} }
setHandler(in, new InHandler { def onPush(): Unit = {
override def onPush(): Unit = { //1 is buffer for Finished or Failed callback
//1 is buffer for Finished or Failed callback require(dataQueue.remainingCapacity() > 1)
require(dataQueue.remainingCapacity() > 1) dataQueue.add(Data(grab(in)))
dataQueue.add(Data(grab(in))) if (dataQueue.remainingCapacity() > 1) sendPullIfAllowed()
if (dataQueue.remainingCapacity() > 1) sendPullIfAllowed() }
}
override def onUpstreamFinish(): Unit = { override def onUpstreamFinish(): Unit = {
dataQueue.add(Finished) dataQueue.add(Finished)
completeStage() completeStage()
} }
override def onUpstreamFailure(ex: Throwable): Unit = {
dataQueue.add(Failed(ex)) override def onUpstreamFailure(ex: Throwable): Unit = {
failStage(ex) dataQueue.add(Failed(ex))
} failStage(ex)
}) }
setHandler(in, this)
} }
(logic, new InputStreamAdapter(dataQueue, logic.wakeUp, readTimeout)) (logic, new InputStreamAdapter(dataQueue, logic.wakeUp, readTimeout))
} }
} }

View file

@ -266,7 +266,7 @@ final class Interleave[T](val inputPorts: Int, val segmentSize: Int, val eagerCl
val out: Outlet[T] = Outlet[T]("Interleave.out") val out: Outlet[T] = Outlet[T]("Interleave.out")
override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*) override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {
private var counter = 0 private var counter = 0
private var currentUpstreamIndex = 0 private var currentUpstreamIndex = 0
private var runningUpstreams = inputPorts private var runningUpstreams = inputPorts
@ -315,9 +315,10 @@ final class Interleave[T](val inputPorts: Int, val segmentSize: Int, val eagerCl
}) })
} }
setHandler(out, new OutHandler { def onPull(): Unit =
override def onPull(): Unit = if (!hasBeenPulled(currentUpstream)) tryPull(currentUpstream) if (!hasBeenPulled(currentUpstream)) tryPull(currentUpstream)
})
setHandler(out, this)
} }
override def toString = "Interleave" override def toString = "Interleave"
@ -405,30 +406,30 @@ final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) extends
override def initialAttributes = DefaultAttributes.broadcast override def initialAttributes = DefaultAttributes.broadcast
override val shape: UniformFanOutShape[T, T] = UniformFanOutShape(in, out: _*) override val shape: UniformFanOutShape[T, T] = UniformFanOutShape(in, out: _*)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler {
private var pendingCount = outputPorts private var pendingCount = outputPorts
private val pending = Array.fill[Boolean](outputPorts)(true) private val pending = Array.fill[Boolean](outputPorts)(true)
private var downstreamsRunning = outputPorts private var downstreamsRunning = outputPorts
setHandler(in, new InHandler { def onPush(): Unit = {
override def onPush(): Unit = { pendingCount = downstreamsRunning
pendingCount = downstreamsRunning val elem = grab(in)
val elem = grab(in)
var idx = 0 var idx = 0
val itr = out.iterator val itr = out.iterator
while (itr.hasNext) { while (itr.hasNext) {
val o = itr.next() val o = itr.next()
val i = idx val i = idx
if (!isClosed(o)) { if (!isClosed(o)) {
push(o, elem) push(o, elem)
pending(i) = true pending(i) = true
}
idx += 1
} }
idx += 1
} }
}) }
setHandler(in, this)
private def tryPull(): Unit = private def tryPull(): Unit =
if (pendingCount == 0 && !hasBeenPulled(in)) pull(in) if (pendingCount == 0 && !hasBeenPulled(in)) pull(in)
@ -502,36 +503,35 @@ final class Partition[T](val outputPorts: Int, val partitioner: T ⇒ Int) exten
val out: Seq[Outlet[T]] = Seq.tabulate(outputPorts)(i Outlet[T]("Partition.out" + i)) val out: Seq[Outlet[T]] = Seq.tabulate(outputPorts)(i Outlet[T]("Partition.out" + i))
override val shape: UniformFanOutShape[T, T] = UniformFanOutShape[T, T](in, out: _*) override val shape: UniformFanOutShape[T, T] = UniformFanOutShape[T, T](in, out: _*)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler {
private var outPendingElem: Any = null private var outPendingElem: Any = null
private var outPendingIdx: Int = _ private var outPendingIdx: Int = _
private var downstreamRunning = outputPorts private var downstreamRunning = outputPorts
setHandler(in, new InHandler { def onPush() = {
override def onPush() = { val elem = grab(in)
val elem = grab(in) val idx = partitioner(elem)
val idx = partitioner(elem) if (idx < 0 || idx >= outputPorts) {
if (idx < 0 || idx >= outputPorts) failStage(PartitionOutOfBoundsException(s"partitioner must return an index in the range [0,${outputPorts - 1}]. returned: [$idx] for input [${elem.getClass.getName}]."))
failStage(PartitionOutOfBoundsException(s"partitioner must return an index in the range [0,${outputPorts - 1}]. returned: [$idx] for input [${elem.getClass.getName}].")) } else if (!isClosed(out(idx))) {
else if (!isClosed(out(idx))) { if (isAvailable(out(idx))) {
if (isAvailable(out(idx))) { push(out(idx), elem)
push(out(idx), elem) if (out.exists(isAvailable(_)))
if (out.exists(isAvailable(_))) pull(in)
pull(in) } else {
} else { outPendingElem = elem
outPendingElem = elem outPendingIdx = idx
outPendingIdx = idx }
}
} else if (out.exists(isAvailable(_))) } else if (out.exists(isAvailable(_)))
pull(in) pull(in)
} }
override def onUpstreamFinish(): Unit = { override def onUpstreamFinish(): Unit = {
if (outPendingElem == null) if (outPendingElem == null) completeStage()
completeStage() }
}
}) setHandler(in, this)
out.zipWithIndex.foreach { out.zipWithIndex.foreach {
case (o, idx) case (o, idx)
@ -610,7 +610,7 @@ final class Balance[T](val outputPorts: Int, val waitForAllDownstreams: Boolean)
override def initialAttributes = DefaultAttributes.balance override def initialAttributes = DefaultAttributes.balance
override val shape: UniformFanOutShape[T, T] = UniformFanOutShape[T, T](in, out: _*) override val shape: UniformFanOutShape[T, T] = UniformFanOutShape[T, T](in, out: _*)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler {
private val pendingQueue = FixedSizeBuffer[Outlet[T]](outputPorts) private val pendingQueue = FixedSizeBuffer[Outlet[T]](outputPorts)
private def noPending: Boolean = pendingQueue.isEmpty private def noPending: Boolean = pendingQueue.isEmpty
@ -633,9 +633,8 @@ final class Balance[T](val outputPorts: Int, val waitForAllDownstreams: Boolean)
} }
} }
setHandler(in, new InHandler { def onPush(): Unit = dequeueAndDispatch()
override def onPush(): Unit = dequeueAndDispatch() setHandler(in, this)
})
out.foreach { o out.foreach { o
setHandler(o, new OutHandler { setHandler(o, new OutHandler {
@ -802,7 +801,7 @@ class ZipWithN[A, O](zipper: immutable.Seq[A] ⇒ O)(n: Int) extends GraphStage[
def out = shape.out def out = shape.out
val inSeq = shape.inSeq val inSeq = shape.inSeq
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler {
var pending = 0 var pending = 0
// Without this field the completion signalling would take one extra pull // Without this field the completion signalling would take one extra pull
var willShutDown = false var willShutDown = false
@ -835,16 +834,15 @@ class ZipWithN[A, O](zipper: immutable.Seq[A] ⇒ O)(n: Int) extends GraphStage[
}) })
}) })
setHandler(out, new OutHandler { def onPull(): Unit = {
override def onPull(): Unit = { pending += n
pending += n if (pending == 0) pushAll()
if (pending == 0) pushAll() }
}
}) setHandler(out, this)
} }
override def toString = "ZipWithN" override def toString = "ZipWithN"
} }
object Concat { object Concat {
@ -877,7 +875,7 @@ final class Concat[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[
override def initialAttributes = DefaultAttributes.concat override def initialAttributes = DefaultAttributes.concat
override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*) override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*)
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler {
var activeStream: Int = 0 var activeStream: Int = 0
{ {
@ -905,9 +903,9 @@ final class Concat[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[
} }
} }
setHandler(out, new OutHandler { def onPull() = pull(in(activeStream))
override def onPull() = pull(in(activeStream))
}) setHandler(out, this)
} }
override def toString: String = s"Concat($inputPorts)" override def toString: String = s"Concat($inputPorts)"

View file

@ -49,7 +49,7 @@ object AbstractStage {
private var currentStage: AbstractStage[In, Out, Directive, Directive, Context[Out], LifecycleContext] = stage private var currentStage: AbstractStage[In, Out, Directive, Directive, Context[Out], LifecycleContext] = stage
{ {
// No need to refer to the handle in a private val // No need to refer to the handler in a private val
val handler = new InHandler with OutHandler { val handler = new InHandler with OutHandler {
override def onPush(): Unit = override def onPush(): Unit =
try { currentStage.onPush(grab(shape.in), ctx) } catch { case NonFatal(ex) onSupervision(ex) } try { currentStage.onPush(grab(shape.in), ctx) } catch { case NonFatal(ex) onSupervision(ex) }