diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMonitorSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMonitorSpec.scala index fd43f798da..313fd6d208 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMonitorSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMonitorSpec.scala @@ -1,6 +1,6 @@ /** - * Copyright (C) 2015-2016 Lightbend Inc. - */ + * Copyright (C) 2015-2016 Lightbend Inc. + */ package akka.stream.scaladsl import akka.stream.testkit.scaladsl.{ TestSource, TestSink } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 71a8a5cd00..a041ec56b6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -43,8 +43,8 @@ private[akka] final case class Filter[T](p: T ⇒ Boolean, decider: Supervision. } /** - * INTERNAL API - */ + * INTERNAL API + */ private[akka] final case class TakeWhile[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] { override def initialAttributes: Attributes = DefaultAttributes.takeWhile @@ -67,7 +67,7 @@ private[akka] final case class TakeWhile[T](p: T ⇒ Boolean) extends SimpleLine } catch { case NonFatal(ex) ⇒ decider(ex) match { case Supervision.Stop ⇒ failStage(ex) - case _ ⇒ pull(in) + case _ ⇒ pull(in) } } } @@ -699,7 +699,15 @@ private[akka] final class Expand[In, Out](extrapolate: In ⇒ Iterator[Out]) ext * INTERNAL API */ private[akka] object MapAsync { - final class Holder[T](var elem: T) + final class Holder[T](var elem: Try[T], val cb: AsyncCallback[Holder[T]]) extends (Try[T] ⇒ Unit) { + override def apply(t: Try[T]): Unit = { + elem = t match { + case Success(null) ⇒ Failure[T](ReactiveStreamsCompliance.elementMustNotBeNullException) + case other ⇒ other + } + cb.invoke(this) + } + } val NotYetThere = Failure(new Exception) } @@ -711,78 +719,60 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut import MapAsync._ - private val in = Inlet[In]("in") - private val out = Outlet[Out]("out") + private val in = Inlet[In]("MapAsync.in") + private val out = Outlet[Out]("MapAsync.out") override def initialAttributes = DefaultAttributes.mapAsync override val shape = FlowShape(in, out) - override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { - override def toString = s"MapAsync.Logic(buffer=$buffer)" + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + override def toString = s"MapAsync.Logic(buffer=$buffer)" - //FIXME Put Supervision.stoppingDecider as a SupervisionStrategy on DefaultAttributes.mapAsync? - lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + //FIXME Put Supervision.stoppingDecider as a SupervisionStrategy on DefaultAttributes.mapAsync? + lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + var buffer: BufferImpl[Holder[Out]] = _ + val futureCB = + getAsyncCallback[Holder[Out]]( + _.elem match { + case Failure(e) if decider(e) == Supervision.Stop ⇒ failStage(e) + case _ ⇒ if (isAvailable(out)) pushOne() + }) - var buffer: BufferImpl[Holder[Try[Out]]] = _ - def todo = buffer.used + private[this] def todo = buffer.used - override def preStart(): Unit = buffer = BufferImpl(parallelism, materializer) + override def preStart(): Unit = buffer = BufferImpl(parallelism, materializer) - @tailrec private def pushOne(): Unit = - if (buffer.isEmpty) { - if (isClosed(in)) completeStage() - else if (!hasBeenPulled(in)) pull(in) - } else if (buffer.peek.elem == NotYetThere) { - if (todo < parallelism && !hasBeenPulled(in)) tryPull(in) - } else buffer.dequeue().elem match { - case Failure(ex) ⇒ pushOne() - case Success(elem) ⇒ - push(out, elem) + @tailrec private def pushOne(): Unit = + if (buffer.isEmpty) { + if (isClosed(in)) completeStage() + else if (!hasBeenPulled(in)) pull(in) + } else if (buffer.peek.elem == NotYetThere) { if (todo < parallelism && !hasBeenPulled(in)) tryPull(in) - } + } else buffer.dequeue().elem match { + case Success(elem) ⇒ + push(out, elem) + if (todo < parallelism && !hasBeenPulled(in)) tryPull(in) + case Failure(ex) ⇒ pushOne() + } - def failOrPull(holder: Holder[Try[Out]], f: Failure[Out]) = - if (decider(f.exception) == Supervision.Stop) failStage(f.exception) - else { - holder.elem = f - if (isAvailable(out)) pushOne() - } - - val futureCB = - getAsyncCallback[(Holder[Try[Out]], Try[Out])]({ - case (holder, f: Failure[_]) ⇒ failOrPull(holder, f) - case (holder, s @ Success(elem)) ⇒ - if (elem == null) { - val ex = ReactiveStreamsCompliance.elementMustNotBeNullException - failOrPull(holder, Failure(ex)) - } else { - holder.elem = s - if (isAvailable(out)) pushOne() - } - }) - - setHandler(in, new InHandler { override def onPush(): Unit = { try { val future = f(grab(in)) - val holder = new Holder[Try[Out]](NotYetThere) + val holder = new Holder[Out](NotYetThere, futureCB) buffer.enqueue(holder) - future.onComplete(result ⇒ futureCB.invoke(holder -> result))(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) + future.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) } catch { - case NonFatal(ex) ⇒ - if (decider(ex) == Supervision.Stop) failStage(ex) + case NonFatal(ex) ⇒ if (decider(ex) == Supervision.Stop) failStage(ex) } if (todo < parallelism) tryPull(in) } - override def onUpstreamFinish(): Unit = { - if (todo == 0) completeStage() - } - }) + override def onUpstreamFinish(): Unit = if (todo == 0) completeStage() - setHandler(out, new OutHandler { override def onPull(): Unit = pushOne() - }) - } + + setHandlers(in, out, this) + } } /** @@ -791,71 +781,67 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: In ⇒ Future[Out]) extends GraphStage[FlowShape[In, Out]] { - private val in = Inlet[In]("in") - private val out = Outlet[Out]("out") + private val in = Inlet[In]("MapAsyncUnordered.in") + private val out = Outlet[Out]("MapAsyncUnordered.out") override def initialAttributes = DefaultAttributes.mapAsyncUnordered override val shape = FlowShape(in, out) - override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { - override def toString = s"MapAsyncUnordered.Logic(inFlight=$inFlight, buffer=$buffer)" + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + override def toString = s"MapAsyncUnordered.Logic(inFlight=$inFlight, buffer=$buffer)" - val decider = - inheritedAttributes.get[SupervisionStrategy] - .map(_.decider).getOrElse(Supervision.stoppingDecider) + val decider = + inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) - var inFlight = 0 - var buffer: BufferImpl[Out] = _ - def todo = inFlight + buffer.used + var inFlight = 0 + var buffer: BufferImpl[Out] = _ + private[this] def todo = inFlight + buffer.used - override def preStart(): Unit = buffer = BufferImpl(parallelism, materializer) + override def preStart(): Unit = buffer = BufferImpl(parallelism, materializer) - def failOrPull(ex: Throwable) = - if (decider(ex) == Supervision.Stop) failStage(ex) - else if (isClosed(in) && todo == 0) completeStage() - else if (!hasBeenPulled(in)) tryPull(in) + private val futureCB = + getAsyncCallback((result: Try[Out]) ⇒ { + inFlight -= 1 + result match { + case Success(elem) if elem != null ⇒ + if (isAvailable(out)) { + if (!hasBeenPulled(in)) tryPull(in) + push(out, elem) + } else buffer.enqueue(elem) + case other ⇒ + val ex = other match { + case Failure(t) ⇒ t + case Success(s) if s == null ⇒ ReactiveStreamsCompliance.elementMustNotBeNullException + } + if (decider(ex) == Supervision.Stop) failStage(ex) + else if (isClosed(in) && todo == 0) completeStage() + else if (!hasBeenPulled(in)) tryPull(in) + } + }).invoke _ - val futureCB = - getAsyncCallback((result: Try[Out]) ⇒ { - inFlight -= 1 - result match { - case Failure(ex) ⇒ failOrPull(ex) - case Success(elem) ⇒ - if (elem == null) { - val ex = ReactiveStreamsCompliance.elementMustNotBeNullException - failOrPull(ex) - } else if (isAvailable(out)) { - if (!hasBeenPulled(in)) tryPull(in) - push(out, elem) - } else buffer.enqueue(elem) - } - }).invoke _ - - setHandler(in, new InHandler { override def onPush(): Unit = { try { val future = f(grab(in)) inFlight += 1 future.onComplete(futureCB)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) } catch { - case NonFatal(ex) ⇒ - if (decider(ex) == Supervision.Stop) failStage(ex) + case NonFatal(ex) ⇒ if (decider(ex) == Supervision.Stop) failStage(ex) } if (todo < parallelism) tryPull(in) } override def onUpstreamFinish(): Unit = { if (todo == 0) completeStage() } - }) - setHandler(out, new OutHandler { override def onPull(): Unit = { if (!buffer.isEmpty) push(out, buffer.dequeue()) else if (isClosed(in) && todo == 0) completeStage() if (todo < parallelism && !hasBeenPulled(in)) tryPull(in) } - }) - } + + setHandlers(in, out, this) + } } /**