diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala index 50f90da798..dd8746f949 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefBackpressureSinkStage.scala @@ -24,7 +24,7 @@ private[akka] class ActorRefBackpressureSinkStage[In](ref: ActorRef, onInitMessa override val shape: SinkShape[In] = SinkShape(in) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new GraphStageLogic(shape) { + new GraphStageLogic(shape) with InHandler { implicit def self: ActorRef = stageActor.ref val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max @@ -67,24 +67,26 @@ private[akka] class ActorRefBackpressureSinkStage[In](ref: ActorRef, onInitMessa completeStage() } - setHandler(in, new InHandler { - override def onPush(): Unit = { - buffer offer grab(in) - if (acknowledgementReceived) { - dequeueAndSend() - acknowledgementReceived = false - } - if (buffer.size() < maxBuffer) pull(in) + def onPush(): Unit = { + buffer offer grab(in) + if (acknowledgementReceived) { + dequeueAndSend() + acknowledgementReceived = false } - override def onUpstreamFinish(): Unit = { - if (buffer.isEmpty) finish() - else completeReceived = true - } - override def onUpstreamFailure(ex: Throwable): Unit = { - ref ! onFailureMessage(ex) - failStage(ex) - } - }) + if (buffer.size() < maxBuffer) pull(in) + } + + override def onUpstreamFinish(): Unit = { + if (buffer.isEmpty) finish() + else completeReceived = true + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + ref ! onFailureMessage(ex) + failStage(ex) + } + + setHandler(in, this) } override def toString = "ActorRefBackpressureSink" diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index d2f62f25bc..ae33bd13ae 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -205,29 +205,30 @@ final class LastOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { val p: Promise[Option[T]] = Promise() - (new GraphStageLogic(shape) { + (new GraphStageLogic(shape) with InHandler { + private[this] var prev: T = null.asInstanceOf[T] + override def preStart(): Unit = pull(in) - setHandler(in, new InHandler { - private[this] var prev: T = null.asInstanceOf[T] - override def onPush(): Unit = { - prev = grab(in) - pull(in) - } + def onPush(): Unit = { + prev = grab(in) + pull(in) + } - override def onUpstreamFinish(): Unit = { - val head = prev - prev = null.asInstanceOf[T] - p.trySuccess(Option(head)) - completeStage() - } + override def onUpstreamFinish(): Unit = { + val head = prev + prev = null.asInstanceOf[T] + p.trySuccess(Option(head)) + completeStage() + } - override def onUpstreamFailure(ex: Throwable): Unit = { - prev = null.asInstanceOf[T] - p.tryFailure(ex) - failStage(ex) - } - }) + override def onUpstreamFailure(ex: Throwable): Unit = { + prev = null.asInstanceOf[T] + p.tryFailure(ex) + failStage(ex) + } + + setHandler(in, this) }, p.future) } @@ -242,24 +243,25 @@ final class HeadOptionStage[T] extends GraphStageWithMaterializedValue[SinkShape override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { val p: Promise[Option[T]] = Promise() - (new GraphStageLogic(shape) { + (new GraphStageLogic(shape) with InHandler { override def preStart(): Unit = pull(in) - setHandler(in, new InHandler { - override def onPush(): Unit = { - p.trySuccess(Option(grab(in))) - completeStage() - } - override def onUpstreamFinish(): Unit = { - p.trySuccess(None) - completeStage() - } + def onPush(): Unit = { + p.trySuccess(Option(grab(in))) + completeStage() + } - override def onUpstreamFailure(ex: Throwable): Unit = { - p.tryFailure(ex) - failStage(ex) - } - }) + override def onUpstreamFinish(): Unit = { + p.trySuccess(None) + completeStage() + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + p.tryFailure(ex) + failStage(ex) + } + + setHandler(in, this) }, p.future) } @@ -277,29 +279,28 @@ final class SeqStage[T] extends GraphStageWithMaterializedValue[SinkShape[T], Fu override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { val p: Promise[immutable.Seq[T]] = Promise() - val logic = new GraphStageLogic(shape) { + val logic = new GraphStageLogic(shape) with InHandler { val buf = Vector.newBuilder[T] override def preStart(): Unit = pull(in) - setHandler(in, new InHandler { + def onPush(): Unit = { + buf += grab(in) + pull(in) + } - override def onPush(): Unit = { - buf += grab(in) - pull(in) - } + override def onUpstreamFinish(): Unit = { + val result = buf.result() + p.trySuccess(result) + completeStage() + } - override def onUpstreamFinish(): Unit = { - val result = buf.result() - p.trySuccess(result) - completeStage() - } + override def onUpstreamFailure(ex: Throwable): Unit = { + p.tryFailure(ex) + failStage(ex) + } - override def onUpstreamFailure(ex: Throwable): Unit = { - p.tryFailure(ex) - failStage(ex) - } - }) + setHandler(in, this) } (logic, p.future) @@ -325,7 +326,7 @@ final class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T], override def toString: String = "QueueSink" override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = { - val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Output[T]] { + val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Output[T]] with InHandler { type Received[E] = Try[Option[E]] val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max @@ -383,14 +384,15 @@ final class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkShape[T], } } - setHandler(in, new InHandler { - override def onPush(): Unit = { - enqueueAndNotify(Success(Some(grab(in)))) - if (buffer.used < maxBuffer) pull(in) - } - override def onUpstreamFinish(): Unit = enqueueAndNotify(Success(None)) - override def onUpstreamFailure(ex: Throwable): Unit = enqueueAndNotify(Failure(ex)) - }) + def onPush(): Unit = { + enqueueAndNotify(Success(Some(grab(in)))) + if (buffer.used < maxBuffer) pull(in) + } + + override def onUpstreamFinish(): Unit = enqueueAndNotify(Success(None)) + override def onUpstreamFailure(ex: Throwable): Unit = enqueueAndNotify(Failure(ex)) + + setHandler(in, this) } (stageLogic, new SinkQueueWithCancel[T] { diff --git a/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala b/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala index e60c810079..a69a7e07ed 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala @@ -18,18 +18,18 @@ final class Unfold[S, E](s: S, f: S ⇒ Option[(S, E)]) extends GraphStage[Sourc override val shape: SourceShape[E] = SourceShape(out) override def initialAttributes: Attributes = DefaultAttributes.unfold override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new GraphStageLogic(shape) { + new GraphStageLogic(shape) with OutHandler { private[this] var state = s - setHandler(out, new OutHandler { - override def onPull(): Unit = f(state) match { - case None ⇒ complete(out) - case Some((newState, v)) ⇒ { - push(out, v) - state = newState - } + def onPull(): Unit = f(state) match { + case None ⇒ complete(out) + case Some((newState, v)) ⇒ { + push(out, v) + state = newState } - }) + } + + setHandler(out, this) } } @@ -41,7 +41,7 @@ final class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]]) extends Gra override val shape: SourceShape[E] = SourceShape(out) override def initialAttributes: Attributes = DefaultAttributes.unfoldAsync override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new GraphStageLogic(shape) { + new GraphStageLogic(shape) with OutHandler { private[this] var state = s private[this] var asyncHandler: Function1[Try[Option[(S, E)]], Unit] = _ @@ -56,9 +56,9 @@ final class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]]) extends Gra asyncHandler = ac.invoke } - setHandler(out, new OutHandler { - override def onPull(): Unit = - f(state).onComplete(asyncHandler)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) - }) + def onPull(): Unit = f(state).onComplete(asyncHandler)( + akka.dispatch.ExecutionContexts.sameThreadExecutionContext) + + setHandler(out, this) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index 403513cda9..4a29288ffa 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -126,6 +126,7 @@ object ActorGraphInterpreter { nextInputElementCursor = (nextInputElementCursor + 1) & IndexMask elem } + private def clear(): Unit = { java.util.Arrays.fill(inputBuffer, 0, inputBuffer.length, null) inputBufferElements = 0 diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 0a08583d84..7f664d1935 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -59,14 +59,12 @@ object GraphStages { object Identity extends SimpleLinearGraphStage[Any] { override def initialAttributes = DefaultAttributes.identityOp - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { - setHandler(in, new InHandler { - override def onPush(): Unit = push(out, grab(in)) - }) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { + def onPush(): Unit = push(out, grab(in)) + def onPull(): Unit = pull(in) - setHandler(out, new OutHandler { - override def onPull(): Unit = pull(in) - }) + setHandler(in, this) + setHandler(out, this) } override def toString = "Identity" @@ -83,29 +81,28 @@ object GraphStages { override def initialAttributes = DefaultAttributes.detacher override val shape = FlowShape(in, out) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { - setHandler(in, new InHandler { - override def onPush(): Unit = { - if (isAvailable(out)) { - push(out, grab(in)) - tryPull(in) - } + def onPush(): Unit = { + if (isAvailable(out)) { + push(out, grab(in)) + tryPull(in) } - override def onUpstreamFinish(): Unit = { - if (!isAvailable(in)) completeStage() - } - }) + } - setHandler(out, new OutHandler { - override def onPull(): Unit = { - if (isAvailable(in)) { - push(out, grab(in)) - if (isClosed(in)) completeStage() - else pull(in) - } + override def onUpstreamFinish(): Unit = { + if (!isAvailable(in)) completeStage() + } + + def onPull(): Unit = { + if (isAvailable(in)) { + push(out, grab(in)) + if (isClosed(in)) completeStage() + else pull(in) } - }) + } + + setHandlers(in, out, this) override def preStart(): Unit = tryPull(in) } @@ -125,27 +122,27 @@ object GraphStages { override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = { val finishPromise = Promise[Done]() - (new GraphStageLogic(shape) { - setHandler(in, new InHandler { - override def onPush(): Unit = push(out, grab(in)) + (new GraphStageLogic(shape) with InHandler with OutHandler { + def onPush(): Unit = push(out, grab(in)) - override def onUpstreamFinish(): Unit = { - finishPromise.success(Done) - completeStage() - } + override def onUpstreamFinish(): Unit = { + finishPromise.success(Done) + completeStage() + } - override def onUpstreamFailure(ex: Throwable): Unit = { - finishPromise.failure(ex) - failStage(ex) - } - }) - setHandler(out, new OutHandler { - override def onPull(): Unit = pull(in) - override def onDownstreamFinish(): Unit = { - finishPromise.success(Done) - completeStage() - } - }) + override def onUpstreamFailure(ex: Throwable): Unit = { + finishPromise.failure(ex) + failStage(ex) + } + + def onPull(): Unit = pull(in) + + override def onDownstreamFinish(): Unit = { + finishPromise.success(Done) + completeStage() + } + + setHandlers(in, out, this) }, finishPromise.future) } @@ -170,29 +167,33 @@ object GraphStages { override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, FlowMonitor[T]) = { val monitor: FlowMonitorImpl[T] = new FlowMonitorImpl[T] - val logic: GraphStageLogic = new GraphStageLogic(shape) { - setHandler(in, new InHandler { - override def onPush(): Unit = { - val msg = grab(in) - push(out, msg) - monitor.set(if (msg.isInstanceOf[StreamState[_]]) Received(msg) else msg) - } - override def onUpstreamFinish(): Unit = { - super.onUpstreamFinish() - monitor.set(Finished) - } - override def onUpstreamFailure(ex: Throwable): Unit = { - super.onUpstreamFailure(ex) - monitor.set(Failed(ex)) - } - }) - setHandler(out, new OutHandler { - override def onPull(): Unit = pull(in) - override def onDownstreamFinish(): Unit = { - super.onDownstreamFinish() - monitor.set(Finished) - } - }) + val logic: GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { + + def onPush(): Unit = { + val msg = grab(in) + push(out, msg) + monitor.set(if (msg.isInstanceOf[StreamState[_]]) Received(msg) else msg) + } + + override def onUpstreamFinish(): Unit = { + super.onUpstreamFinish() + monitor.set(Finished) + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + super.onUpstreamFailure(ex) + monitor.set(Failed(ex)) + } + + def onPull(): Unit = pull(in) + + override def onDownstreamFinish(): Unit = { + super.onDownstreamFinish() + monitor.set(Finished) + } + + setHandler(in, this) + setHandler(out, this) override def toString = "MonitorFlowLogic" } @@ -293,14 +294,15 @@ object GraphStages { ReactiveStreamsCompliance.requireNonNullElement(elem) val out = Outlet[T]("single.out") val shape = SourceShape(out) - override def createLogic(attr: Attributes) = new GraphStageLogic(shape) { - setHandler(out, new OutHandler { - override def onPull(): Unit = { + def createLogic(attr: Attributes) = + new GraphStageLogic(shape) with OutHandler { + def onPull(): Unit = { push(out, elem) completeStage() } - }) - } + setHandler(out, this) + } + override def toString: String = s"SingleSource($elem)" } @@ -309,9 +311,9 @@ object GraphStages { val shape = SourceShape(Outlet[T]("future.out")) val out = shape.out override def initialAttributes: Attributes = DefaultAttributes.futureSource - override def createLogic(attr: Attributes) = new GraphStageLogic(shape) { - setHandler(out, new OutHandler { - override def onPull(): Unit = { + override def createLogic(attr: Attributes) = + new GraphStageLogic(shape) with OutHandler { + def onPull(): Unit = { val cb = getAsyncCallback[Try[T]] { case scala.util.Success(v) ⇒ emit(out, v, () ⇒ completeStage()) case scala.util.Failure(t) ⇒ failStage(t) @@ -319,8 +321,10 @@ object GraphStages { future.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext) setHandler(out, eagerTerminateOutput) // After first pull we won't produce anything more } - }) - } + + setHandler(out, this) + } + override def toString: String = "FutureSource" } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala index cb011795b3..37704497b9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala @@ -15,32 +15,32 @@ import java.{ util ⇒ ju } */ private[akka] object IteratorInterpreter { - final case class IteratorUpstream[T](input: Iterator[T]) extends UpstreamBoundaryStageLogic[T] { + final case class IteratorUpstream[T](input: Iterator[T]) extends UpstreamBoundaryStageLogic[T] with OutHandler { val out: Outlet[T] = Outlet[T]("IteratorUpstream.out") out.id = 0 private var hasNext = input.hasNext - setHandler(out, new OutHandler { - override def onPull(): Unit = { - if (!hasNext) complete(out) - else { - val elem = input.next() - hasNext = input.hasNext - if (!hasNext) { - push(out, elem) - complete(out) - } else push(out, elem) - } + def onPull(): Unit = { + if (!hasNext) complete(out) + else { + val elem = input.next() + hasNext = input.hasNext + if (!hasNext) { + push(out, elem) + complete(out) + } else push(out, elem) } + } - override def onDownstreamFinish(): Unit = () - }) + override def onDownstreamFinish(): Unit = () + + setHandler(out, this) override def toString = "IteratorUpstream" } - final case class IteratorDownstream[T]() extends DownstreamBoundaryStageLogic[T] with Iterator[T] { + final case class IteratorDownstream[T]() extends DownstreamBoundaryStageLogic[T] with Iterator[T] with InHandler { val in: Inlet[T] = Inlet[T]("IteratorDownstream.in") in.id = 0 @@ -49,21 +49,21 @@ private[akka] object IteratorInterpreter { private var needsPull = true private var lastFailure: Throwable = null - setHandler(in, new InHandler { - override def onPush(): Unit = { - nextElem = grab(in) - needsPull = false - } + def onPush(): Unit = { + nextElem = grab(in) + needsPull = false + } - override def onUpstreamFinish(): Unit = { - done = true - } + override def onUpstreamFinish(): Unit = { + done = true + } - override def onUpstreamFailure(cause: Throwable): Unit = { - done = true - lastFailure = cause - } - }) + override def onUpstreamFailure(cause: Throwable): Unit = { + done = true + lastFailure = cause + } + + setHandler(in, this) private def pullIfNeeded(): Unit = { if (needsPull) { @@ -93,7 +93,6 @@ private[akka] object IteratorInterpreter { // don't let toString consume the iterator override def toString: String = "IteratorDownstream" } - } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index e2bcbe76d9..c8c59fbe62 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -527,7 +527,7 @@ final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) ext override val shape = FlowShape(in, out) - override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + override def createLogic(attr: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler { val startInHandler = new InHandler { override def onPush(): Unit = { // if else (to avoid using Iterator[T].flatten in hot code) @@ -551,12 +551,10 @@ final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) ext } } - val outHandler = new OutHandler { - override def onPull(): Unit = pull(in) - } + def onPull(): Unit = pull(in) setHandler(in, startInHandler) - setHandler(out, outHandler) + setHandler(out, this) } } @@ -775,7 +773,7 @@ final case class Batch[In, Out](val max: Long, val costFn: In ⇒ Long, val seed override val shape: FlowShape[In, Out] = FlowShape.of(in, out) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler { lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) @@ -808,85 +806,81 @@ final case class Batch[In, Out](val max: Long, val costFn: In ⇒ Long, val seed override def preStart() = pull(in) - setHandler(in, new InHandler { + def onPush(): Unit = { + val elem = grab(in) + val cost = costFn(elem) - override def onPush(): Unit = { - val elem = grab(in) - val cost = costFn(elem) - - if (agg == null) { - try { - agg = seed(elem) - left -= cost - } catch { - case NonFatal(ex) ⇒ decider(ex) match { - case Supervision.Stop ⇒ failStage(ex) - case Supervision.Restart ⇒ - restartState() - case Supervision.Resume ⇒ - } - } - } else if (left < cost) { - pending = elem - } else { - try { - agg = aggregate(agg, elem) - left -= cost - } catch { - case NonFatal(ex) ⇒ decider(ex) match { - case Supervision.Stop ⇒ failStage(ex) - case Supervision.Restart ⇒ - restartState() - case Supervision.Resume ⇒ - } + if (agg == null) { + try { + agg = seed(elem) + left -= cost + } catch { + case NonFatal(ex) ⇒ decider(ex) match { + case Supervision.Stop ⇒ failStage(ex) + case Supervision.Restart ⇒ + restartState() + case Supervision.Resume ⇒ } } - - if (isAvailable(out)) flush() - if (pending == null) pull(in) - } - - override def onUpstreamFinish(): Unit = { - if (agg == null) completeStage() - } - }) - - setHandler(out, new OutHandler { - - override def onPull(): Unit = { - if (agg == null) { - if (isClosed(in)) completeStage() - else if (!hasBeenPulled(in)) pull(in) - } else if (isClosed(in)) { - push(out, agg) - if (pending == null) completeStage() - else { - try { - agg = seed(pending) - } catch { - case NonFatal(ex) ⇒ decider(ex) match { - case Supervision.Stop ⇒ failStage(ex) - case Supervision.Resume ⇒ - case Supervision.Restart ⇒ - restartState() - if (!hasBeenPulled(in)) pull(in) - } - } - pending = null.asInstanceOf[In] + } else if (left < cost) { + pending = elem + } else { + try { + agg = aggregate(agg, elem) + left -= cost + } catch { + case NonFatal(ex) ⇒ decider(ex) match { + case Supervision.Stop ⇒ failStage(ex) + case Supervision.Restart ⇒ + restartState() + case Supervision.Resume ⇒ } - } else { - flush() - if (!hasBeenPulled(in)) pull(in) } - } - }) + + if (isAvailable(out)) flush() + if (pending == null) pull(in) + } + + override def onUpstreamFinish(): Unit = { + if (agg == null) completeStage() + } + + def onPull(): Unit = { + if (agg == null) { + if (isClosed(in)) completeStage() + else if (!hasBeenPulled(in)) pull(in) + } else if (isClosed(in)) { + push(out, agg) + if (pending == null) completeStage() + else { + try { + agg = seed(pending) + } catch { + case NonFatal(ex) ⇒ decider(ex) match { + case Supervision.Stop ⇒ failStage(ex) + case Supervision.Resume ⇒ + case Supervision.Restart ⇒ + restartState() + if (!hasBeenPulled(in)) pull(in) + } + } + pending = null.asInstanceOf[In] + } + } else { + flush() + if (!hasBeenPulled(in)) pull(in) + } + + } private def restartState(): Unit = { agg = null.asInstanceOf[Out] left = max pending = null.asInstanceOf[In] } + + setHandlers(in, out, this) } } @@ -900,46 +894,46 @@ final class Expand[In, Out](val extrapolate: In ⇒ Iterator[Out]) extends Graph override def initialAttributes = DefaultAttributes.expand override val shape = FlowShape(in, out) - override def createLogic(attr: Attributes) = new GraphStageLogic(shape) { + override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler { private var iterator: Iterator[Out] = Iterator.empty private var expanded = false override def preStart(): Unit = pull(in) - setHandler(in, new InHandler { - override def onPush(): Unit = { - iterator = extrapolate(grab(in)) - if (iterator.hasNext) { - if (isAvailable(out)) { - expanded = true + def onPush(): Unit = { + iterator = extrapolate(grab(in)) + if (iterator.hasNext) { + if (isAvailable(out)) { + expanded = true + pull(in) + push(out, iterator.next()) + } else expanded = false + } else pull(in) + } + + override def onUpstreamFinish(): Unit = { + if (iterator.hasNext && !expanded) () // need to wait + else completeStage() + } + + def onPull(): Unit = { + if (iterator.hasNext) { + if (!expanded) { + expanded = true + if (isClosed(in)) { + push(out, iterator.next()) + completeStage() + } else { + // expand needs to pull first to be “fair” when upstream is not actually slow pull(in) push(out, iterator.next()) - } else expanded = false - } else pull(in) + } + } else push(out, iterator.next()) } - override def onUpstreamFinish(): Unit = { - if (iterator.hasNext && !expanded) () // need to wait - else completeStage() - } - }) + } - setHandler(out, new OutHandler { - override def onPull(): Unit = { - if (iterator.hasNext) { - if (!expanded) { - expanded = true - if (isClosed(in)) { - push(out, iterator.next()) - completeStage() - } else { - // expand needs to pull first to be “fair” when upstream is not actually slow - pull(in) - push(out, iterator.next()) - } - } else push(out, iterator.next()) - } - } - }) + setHandler(in, this) + setHandler(out, this) } } @@ -1303,7 +1297,7 @@ final class GroupedWithin[T](val n: Int, val d: FiniteDuration) extends GraphSta final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] { private[this] def timerName = "DelayedTimer" override def initialAttributes: Attributes = DefaultAttributes.delay - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler { val size = inheritedAttributes.get[InputBuffer] match { case None ⇒ throw new IllegalStateException(s"Couldn't find InputBuffer Attribute for $this") @@ -1315,59 +1309,58 @@ final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy) override def preStart(): Unit = buffer = BufferImpl(size, materializer) - setHandler(in, handler = new InHandler { - //FIXME rewrite into distinct strategy functions to avoid matching on strategy for every input when full - override def onPush(): Unit = { - if (buffer.isFull) strategy match { - case EmitEarly ⇒ - if (!isTimerActive(timerName)) - push(out, buffer.dequeue()._2) - else { - cancelTimer(timerName) - onTimer(timerName) - } - case DropHead ⇒ - buffer.dropHead() - grabAndPull(true) - case DropTail ⇒ - buffer.dropTail() - grabAndPull(true) - case DropNew ⇒ - grab(in) - if (!isTimerActive(timerName)) scheduleOnce(timerName, d) - case DropBuffer ⇒ - buffer.clear() - grabAndPull(true) - case Fail ⇒ - failStage(new BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $size)!")) - case Backpressure ⇒ throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode") - } - else { - grabAndPull(strategy != Backpressure || buffer.used < size - 1) + //FIXME rewrite into distinct strategy functions to avoid matching on strategy for every input when full + def onPush(): Unit = { + if (buffer.isFull) strategy match { + case EmitEarly ⇒ + if (!isTimerActive(timerName)) + push(out, buffer.dequeue()._2) + else { + cancelTimer(timerName) + onTimer(timerName) + } + case DropHead ⇒ + buffer.dropHead() + grabAndPull(true) + case DropTail ⇒ + buffer.dropTail() + grabAndPull(true) + case DropNew ⇒ + grab(in) if (!isTimerActive(timerName)) scheduleOnce(timerName, d) - } + case DropBuffer ⇒ + buffer.clear() + grabAndPull(true) + case Fail ⇒ + failStage(new BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $size)!")) + case Backpressure ⇒ throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode") } - - def grabAndPull(pullCondition: Boolean): Unit = { - buffer.enqueue((System.nanoTime(), grab(in))) - if (pullCondition) pull(in) + else { + grabAndPull(strategy != Backpressure || buffer.used < size - 1) + if (!isTimerActive(timerName)) scheduleOnce(timerName, d) } + } - override def onUpstreamFinish(): Unit = { - if (isAvailable(out) && isTimerActive(timerName)) willStop = true - else completeStage() - } - }) + def grabAndPull(pullCondition: Boolean): Unit = { + buffer.enqueue((System.nanoTime(), grab(in))) + if (pullCondition) pull(in) + } - setHandler(out, new OutHandler { - override def onPull(): Unit = { - if (!isTimerActive(timerName) && !buffer.isEmpty && nextElementWaitTime() < 0) - push(out, buffer.dequeue()._2) + override def onUpstreamFinish(): Unit = { + if (isAvailable(out) && isTimerActive(timerName)) willStop = true + else completeStage() + } - if (!willStop && !hasBeenPulled(in)) pull(in) - completeIfReady() - } - }) + def onPull(): Unit = { + if (!isTimerActive(timerName) && !buffer.isEmpty && nextElementWaitTime() < 0) + push(out, buffer.dequeue()._2) + + if (!willStop && !hasBeenPulled(in)) pull(in) + completeIfReady() + } + + setHandler(in, this) + setHandler(out, this) def completeIfReady(): Unit = if (willStop && buffer.isEmpty) completeStage() @@ -1388,14 +1381,12 @@ final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy) final class TakeWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { - setHandler(in, new InHandler { - override def onPush(): Unit = push(out, grab(in)) - }) + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler { + def onPush(): Unit = push(out, grab(in)) + def onPull(): Unit = pull(in) - setHandler(out, new OutHandler { - override def onPull(): Unit = pull(in) - }) + setHandler(in, this) + setHandler(out, this) final override protected def onTimer(key: Any): Unit = completeStage() @@ -1407,19 +1398,19 @@ final class TakeWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraph } final class DropWithin[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) with InHandler with OutHandler { private var allow = false - setHandler(in, new InHandler { - override def onPush(): Unit = - if (allow) push(out, grab(in)) - else pull(in) - }) + def onPush(): Unit = { + if (allow) push(out, grab(in)) + else pull(in) + } - setHandler(out, new OutHandler { - override def onPull(): Unit = pull(in) - }) + def onPull(): Unit = pull(in) + + setHandler(in, this) + setHandler(out, this) final override protected def onTimer(key: Any): Unit = allow = true @@ -1466,6 +1457,7 @@ final class Reduce[T](val f: (T, T) ⇒ T) extends SimpleLinearGraphStage[T] { setHandler(out, self) } + override def toString = "Reduce" } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index 43cc221160..1ce1a03a2a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -32,7 +32,6 @@ final class FlattenMerge[T, M](val breadth: Int) extends GraphStage[FlowShape[Gr override val shape = FlowShape(in, out) override def createLogic(attr: Attributes) = new GraphStageLogic(shape) { - var sources = Set.empty[SubSinkInlet[T]] def activeSources = sources.size diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/ByteStringParser.scala b/akka-stream/src/main/scala/akka/stream/impl/io/ByteStringParser.scala index 1a0f75563f..988f239815 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/ByteStringParser.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/ByteStringParser.scala @@ -22,7 +22,7 @@ private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[By override def initialAttributes = Attributes.name("ByteStringParser") final override val shape = FlowShape(bytesIn, objOut) - class ParsingLogic extends GraphStageLogic(shape) { + class ParsingLogic extends GraphStageLogic(shape) with InHandler { var pullOnParserRequest = false override def preStart(): Unit = pull(bytesIn) setHandler(objOut, eagerTerminateOutput) @@ -58,16 +58,18 @@ private[akka] abstract class ByteStringParser[T] extends GraphStage[FlowShape[By if (cont) doParse() } else pull(bytesIn) - setHandler(bytesIn, new InHandler { - override def onPush(): Unit = { - pullOnParserRequest = false - buffer ++= grab(bytesIn) - doParse() - } - override def onUpstreamFinish(): Unit = - if (buffer.isEmpty && acceptUpstreamFinish) completeStage() - else current.onTruncation() - }) + def onPush(): Unit = { + pullOnParserRequest = false + buffer ++= grab(bytesIn) + doParse() + } + + override def onUpstreamFinish(): Unit = { + if (buffer.isEmpty && acceptUpstreamFinish) completeStage() + else current.onTruncation() + } + + setHandler(bytesIn, this) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala index b953a08cea..ea49e954dd 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala @@ -46,7 +46,7 @@ final private[stream] class InputStreamSinkStage(readTimeout: FiniteDuration) ex val dataQueue = new LinkedBlockingDeque[StreamToAdapterMessage](maxBuffer + 2) - val logic = new GraphStageLogic(shape) with StageWithCallback { + val logic = new GraphStageLogic(shape) with StageWithCallback with InHandler { private val callback: AsyncCallback[AdapterToStageMessage] = getAsyncCallback { @@ -65,23 +65,26 @@ final private[stream] class InputStreamSinkStage(readTimeout: FiniteDuration) ex pull(in) } - setHandler(in, new InHandler { - override def onPush(): Unit = { - //1 is buffer for Finished or Failed callback - require(dataQueue.remainingCapacity() > 1) - dataQueue.add(Data(grab(in))) - if (dataQueue.remainingCapacity() > 1) sendPullIfAllowed() - } - override def onUpstreamFinish(): Unit = { - dataQueue.add(Finished) - completeStage() - } - override def onUpstreamFailure(ex: Throwable): Unit = { - dataQueue.add(Failed(ex)) - failStage(ex) - } - }) + def onPush(): Unit = { + //1 is buffer for Finished or Failed callback + require(dataQueue.remainingCapacity() > 1) + dataQueue.add(Data(grab(in))) + if (dataQueue.remainingCapacity() > 1) sendPullIfAllowed() + } + + override def onUpstreamFinish(): Unit = { + dataQueue.add(Finished) + completeStage() + } + + override def onUpstreamFailure(ex: Throwable): Unit = { + dataQueue.add(Failed(ex)) + failStage(ex) + } + + setHandler(in, this) } + (logic, new InputStreamAdapter(dataQueue, logic.wakeUp, readTimeout)) } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala index 28d6ab58dd..8c924f90fe 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala @@ -266,7 +266,7 @@ final class Interleave[T](val inputPorts: Int, val segmentSize: Int, val eagerCl val out: Outlet[T] = Outlet[T]("Interleave.out") override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler { private var counter = 0 private var currentUpstreamIndex = 0 private var runningUpstreams = inputPorts @@ -315,9 +315,10 @@ final class Interleave[T](val inputPorts: Int, val segmentSize: Int, val eagerCl }) } - setHandler(out, new OutHandler { - override def onPull(): Unit = if (!hasBeenPulled(currentUpstream)) tryPull(currentUpstream) - }) + def onPull(): Unit = + if (!hasBeenPulled(currentUpstream)) tryPull(currentUpstream) + + setHandler(out, this) } override def toString = "Interleave" @@ -405,30 +406,30 @@ final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) extends override def initialAttributes = DefaultAttributes.broadcast override val shape: UniformFanOutShape[T, T] = UniformFanOutShape(in, out: _*) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler { private var pendingCount = outputPorts private val pending = Array.fill[Boolean](outputPorts)(true) private var downstreamsRunning = outputPorts - setHandler(in, new InHandler { - override def onPush(): Unit = { - pendingCount = downstreamsRunning - val elem = grab(in) + def onPush(): Unit = { + pendingCount = downstreamsRunning + val elem = grab(in) - var idx = 0 - val itr = out.iterator + var idx = 0 + val itr = out.iterator - while (itr.hasNext) { - val o = itr.next() - val i = idx - if (!isClosed(o)) { - push(o, elem) - pending(i) = true - } - idx += 1 + while (itr.hasNext) { + val o = itr.next() + val i = idx + if (!isClosed(o)) { + push(o, elem) + pending(i) = true } + idx += 1 } - }) + } + + setHandler(in, this) private def tryPull(): Unit = if (pendingCount == 0 && !hasBeenPulled(in)) pull(in) @@ -502,36 +503,35 @@ final class Partition[T](val outputPorts: Int, val partitioner: T ⇒ Int) exten val out: Seq[Outlet[T]] = Seq.tabulate(outputPorts)(i ⇒ Outlet[T]("Partition.out" + i)) override val shape: UniformFanOutShape[T, T] = UniformFanOutShape[T, T](in, out: _*) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler { private var outPendingElem: Any = null private var outPendingIdx: Int = _ private var downstreamRunning = outputPorts - setHandler(in, new InHandler { - override def onPush() = { - val elem = grab(in) - val idx = partitioner(elem) - if (idx < 0 || idx >= outputPorts) - failStage(PartitionOutOfBoundsException(s"partitioner must return an index in the range [0,${outputPorts - 1}]. returned: [$idx] for input [${elem.getClass.getName}].")) - else if (!isClosed(out(idx))) { - if (isAvailable(out(idx))) { - push(out(idx), elem) - if (out.exists(isAvailable(_))) - pull(in) - } else { - outPendingElem = elem - outPendingIdx = idx - } + def onPush() = { + val elem = grab(in) + val idx = partitioner(elem) + if (idx < 0 || idx >= outputPorts) { + failStage(PartitionOutOfBoundsException(s"partitioner must return an index in the range [0,${outputPorts - 1}]. returned: [$idx] for input [${elem.getClass.getName}].")) + } else if (!isClosed(out(idx))) { + if (isAvailable(out(idx))) { + push(out(idx), elem) + if (out.exists(isAvailable(_))) + pull(in) + } else { + outPendingElem = elem + outPendingIdx = idx + } - } else if (out.exists(isAvailable(_))) - pull(in) - } + } else if (out.exists(isAvailable(_))) + pull(in) + } - override def onUpstreamFinish(): Unit = { - if (outPendingElem == null) - completeStage() - } - }) + override def onUpstreamFinish(): Unit = { + if (outPendingElem == null) completeStage() + } + + setHandler(in, this) out.zipWithIndex.foreach { case (o, idx) ⇒ @@ -610,7 +610,7 @@ final class Balance[T](val outputPorts: Int, val waitForAllDownstreams: Boolean) override def initialAttributes = DefaultAttributes.balance override val shape: UniformFanOutShape[T, T] = UniformFanOutShape[T, T](in, out: _*) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler { private val pendingQueue = FixedSizeBuffer[Outlet[T]](outputPorts) private def noPending: Boolean = pendingQueue.isEmpty @@ -633,9 +633,8 @@ final class Balance[T](val outputPorts: Int, val waitForAllDownstreams: Boolean) } } - setHandler(in, new InHandler { - override def onPush(): Unit = dequeueAndDispatch() - }) + def onPush(): Unit = dequeueAndDispatch() + setHandler(in, this) out.foreach { o ⇒ setHandler(o, new OutHandler { @@ -802,7 +801,7 @@ class ZipWithN[A, O](zipper: immutable.Seq[A] ⇒ O)(n: Int) extends GraphStage[ def out = shape.out val inSeq = shape.inSeq - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with OutHandler { var pending = 0 // Without this field the completion signalling would take one extra pull var willShutDown = false @@ -835,16 +834,15 @@ class ZipWithN[A, O](zipper: immutable.Seq[A] ⇒ O)(n: Int) extends GraphStage[ }) }) - setHandler(out, new OutHandler { - override def onPull(): Unit = { - pending += n - if (pending == 0) pushAll() - } - }) + def onPull(): Unit = { + pending += n + if (pending == 0) pushAll() + } + + setHandler(out, this) } override def toString = "ZipWithN" - } object Concat { @@ -877,7 +875,7 @@ final class Concat[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[ override def initialAttributes = DefaultAttributes.concat override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*) - override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { + override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with OutHandler { var activeStream: Int = 0 { @@ -905,9 +903,9 @@ final class Concat[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[ } } - setHandler(out, new OutHandler { - override def onPull() = pull(in(activeStream)) - }) + def onPull() = pull(in(activeStream)) + + setHandler(out, this) } override def toString: String = s"Concat($inputPorts)" diff --git a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala index aa44f6859d..865eb31462 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/Stage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/Stage.scala @@ -49,7 +49,7 @@ object AbstractStage { private var currentStage: AbstractStage[In, Out, Directive, Directive, Context[Out], LifecycleContext] = stage { - // No need to refer to the handle in a private val + // No need to refer to the handler in a private val val handler = new InHandler with OutHandler { override def onPush(): Unit = try { currentStage.onPush(grab(shape.in), ctx) } catch { case NonFatal(ex) ⇒ onSupervision(ex) }