diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowMapFutureSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowMapAsyncSpec.scala similarity index 87% rename from akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowMapFutureSpec.scala rename to akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowMapAsyncSpec.scala index 316839aa58..e95fb96006 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowMapFutureSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowMapAsyncSpec.scala @@ -14,16 +14,16 @@ import scala.concurrent.Await import scala.concurrent.Future @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class FlowMapFutureSpec extends AkkaSpec { +class FlowMapAsyncSpec extends AkkaSpec { implicit val materializer = FlowMaterializer() - "A Flow with mapFuture" must { + "A Flow with mapAsync" must { "produce future elements" in { val c = StreamTestKit.SubscriberProbe[Int]() implicit val ec = system.dispatcher - val p = FlowFrom(1 to 3).mapFuture(n ⇒ Future(n)).publishTo(c) + val p = FlowFrom(1 to 3).mapAsync(n ⇒ Future(n)).publishTo(c) val sub = c.expectSubscription() sub.request(2) c.expectNext(1) @@ -37,7 +37,7 @@ class FlowMapFutureSpec extends AkkaSpec { "produce future elements in order" in { val c = StreamTestKit.SubscriberProbe[Int]() implicit val ec = system.dispatcher - val p = FlowFrom(1 to 50).mapFuture(n ⇒ Future { + val p = FlowFrom(1 to 50).mapAsync(n ⇒ Future { Thread.sleep(ThreadLocalRandom.current().nextInt(1, 10)) n }).publishTo(c) @@ -51,7 +51,7 @@ class FlowMapFutureSpec extends AkkaSpec { val probe = TestProbe() val c = StreamTestKit.SubscriberProbe[Int]() implicit val ec = system.dispatcher - val p = FlowFrom(1 to 20).mapFuture(n ⇒ Future { + val p = FlowFrom(1 to 20).mapAsync(n ⇒ Future { probe.ref ! n n }).publishTo(c) @@ -76,7 +76,7 @@ class FlowMapFutureSpec extends AkkaSpec { val latch = TestLatch(1) val c = StreamTestKit.SubscriberProbe[Int]() implicit val ec = system.dispatcher - val p = FlowFrom(1 to 5).mapFuture(n ⇒ Future { + val p = FlowFrom(1 to 5).mapAsync(n ⇒ Future { if (n == 3) throw new RuntimeException("err1") with NoStackTrace else { Await.ready(latch, 10.seconds) @@ -89,11 +89,11 @@ class FlowMapFutureSpec extends AkkaSpec { latch.countDown() } - "signal error from mapFuture" in { + "signal error from mapAsync" in { val latch = TestLatch(1) val c = StreamTestKit.SubscriberProbe[Int]() implicit val ec = system.dispatcher - val p = FlowFrom(1 to 5).mapFuture(n ⇒ + val p = FlowFrom(1 to 5).mapAsync(n ⇒ if (n == 3) throw new RuntimeException("err2") with NoStackTrace else { Future { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowMapAsyncUnorderedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowMapAsyncUnorderedSpec.scala new file mode 100644 index 0000000000..d408511901 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowMapAsyncUnorderedSpec.scala @@ -0,0 +1,106 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import scala.concurrent.duration._ +import scala.concurrent.forkjoin.ThreadLocalRandom +import scala.util.control.NoStackTrace +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit +import akka.testkit.TestProbe +import akka.testkit.TestLatch +import scala.concurrent.Await +import scala.concurrent.Future + +class FlowMapAsyncUnorderedSpec extends AkkaSpec { + + implicit val materializer = FlowMaterializer() + + "A Flow with mapAsyncUnordered" must { + + "produce future elements in the order they are ready" in { + val c = StreamTestKit.SubscriberProbe[Int]() + implicit val ec = system.dispatcher + val latch = (1 to 4).map(_ -> TestLatch(1)).toMap + val p = FlowFrom(1 to 4).mapAsyncUnordered(n ⇒ Future { + Await.ready(latch(n), 5.seconds) + n + }).publishTo(c) + val sub = c.expectSubscription() + sub.request(5) + latch(2).countDown() + c.expectNext(2) + latch(4).countDown() + c.expectNext(4) + latch(3).countDown() + c.expectNext(3) + latch(1).countDown() + c.expectNext(1) + c.expectComplete() + } + + "not run more futures than requested elements" in { + val probe = TestProbe() + val c = StreamTestKit.SubscriberProbe[Int]() + implicit val ec = system.dispatcher + val p = FlowFrom(1 to 20).mapAsyncUnordered(n ⇒ Future { + probe.ref ! n + n + }).publishTo(c) + val sub = c.expectSubscription() + // nothing before requested + probe.expectNoMsg(500.millis) + sub.request(1) + val elem1 = probe.expectMsgType[Int] + probe.expectNoMsg(500.millis) + sub.request(2) + val elem2 = probe.expectMsgType[Int] + val elem3 = probe.expectMsgType[Int] + probe.expectNoMsg(500.millis) + sub.request(100) + (probe.receiveN(17).toSet + elem1 + elem2 + elem3) should be((1 to 20).toSet) + probe.expectNoMsg(200.millis) + + c.probe.receiveN(20).toSet should be((1 to 20).map(StreamTestKit.OnNext.apply).toSet) + c.expectComplete() + } + + "signal future failure" in { + val latch = TestLatch(1) + val c = StreamTestKit.SubscriberProbe[Int]() + implicit val ec = system.dispatcher + val p = FlowFrom(1 to 5).mapAsyncUnordered(n ⇒ Future { + if (n == 3) throw new RuntimeException("err1") with NoStackTrace + else { + Await.ready(latch, 10.seconds) + n + } + }).publishTo(c) + val sub = c.expectSubscription() + sub.request(10) + c.expectError.getMessage should be("err1") + latch.countDown() + } + + "signal error from mapAsyncUnordered" in { + val latch = TestLatch(1) + val c = StreamTestKit.SubscriberProbe[Int]() + implicit val ec = system.dispatcher + val p = FlowFrom(1 to 5).mapAsync(n ⇒ + if (n == 3) throw new RuntimeException("err2") with NoStackTrace + else { + Future { + Await.ready(latch, 10.seconds) + n + } + }). + publishTo(c) + val sub = c.expectSubscription() + sub.request(10) + c.expectError.getMessage should be("err2") + latch.countDown() + } + + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index a81fe87e0b..b5cf030242 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -32,7 +32,7 @@ private[akka] object ActorProcessor { case bf: Buffer ⇒ Props(new BufferImpl(settings, bf.size, bf.overflowStrategy)) case tt: PrefixAndTail ⇒ Props(new PrefixAndTailImpl(settings, tt.n)) case ConcatAll ⇒ Props(new ConcatAllImpl(settings)) - case m: MapFuture ⇒ Props(new MapFutureProcessorImpl(settings, m.f)) + case m: MapFuture ⇒ Props(new MapAsyncProcessorImpl(settings, m.f)) }).withDispatcher(settings.dispatcher) def apply[I, O](impl: ActorRef): ActorProcessor[I, O] = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/Emit.scala b/akka-stream/src/main/scala/akka/stream/impl/Emit.scala new file mode 100644 index 0000000000..f757d6910e --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/Emit.scala @@ -0,0 +1,33 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl + +import scala.collection.immutable + +/** + * INTERNAL API + */ +private[akka] trait Emit { this: ActorProcessorImpl with Pump ⇒ + + // TODO performance improvement: mutable buffer? + var emits = immutable.Seq.empty[Any] + + // Save previous phase we should return to in a var to avoid allocation + private var phaseAfterFlush: TransferPhase = _ + + // Enters flushing phase if there are emits pending + def emitAndThen(andThen: TransferPhase): Unit = + if (emits.nonEmpty) { + phaseAfterFlush = andThen + nextPhase(emitting) + } else nextPhase(andThen) + + // Emits all pending elements, then returns to savedPhase + private val emitting = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ + primaryOutputs.enqueueOutputElement(emits.head) + emits = emits.tail + if (emits.isEmpty) nextPhase(phaseAfterFlush) + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/MapFutureProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/MapAsyncProcessorImpl.scala similarity index 80% rename from akka-stream/src/main/scala/akka/stream/impl/MapFutureProcessorImpl.scala rename to akka-stream/src/main/scala/akka/stream/impl/MapAsyncProcessorImpl.scala index 37e7aaa3e1..3e62562db2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/MapFutureProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/MapAsyncProcessorImpl.scala @@ -15,7 +15,7 @@ import scala.annotation.tailrec /** * INTERNAL API */ -private[akka] object MapFutureProcessorImpl { +private[akka] object MapAsyncProcessorImpl { object FutureElement { implicit val ordering: Ordering[FutureElement] = new Ordering[FutureElement] { @@ -32,15 +32,13 @@ private[akka] object MapFutureProcessorImpl { /** * INTERNAL API */ -private[akka] class MapFutureProcessorImpl(_settings: MaterializerSettings, f: Any ⇒ Future[Any]) extends ActorProcessorImpl(_settings) { - import MapFutureProcessorImpl._ +private[akka] class MapAsyncProcessorImpl(_settings: MaterializerSettings, f: Any ⇒ Future[Any]) + extends ActorProcessorImpl(_settings) with Emit { + import MapAsyncProcessorImpl._ // Execution context for pipeTo and friends import context.dispatcher - // TODO performance improvement: mutable buffer? - var emits = immutable.Seq.empty[Any] - var submittedSeqNo = 0L var doneSeqNo = 0L def gap: Long = submittedSeqNo - doneSeqNo @@ -134,23 +132,6 @@ private[akka] class MapFutureProcessorImpl(_settings: MaterializerSettings, f: A } } - // Save previous phase we should return to in a var to avoid allocation - var phaseAfterFlush: TransferPhase = _ - - // Enters flushing phase if there are emits pending - def emitAndThen(andThen: TransferPhase): Unit = - if (emits.nonEmpty) { - phaseAfterFlush = andThen - nextPhase(emitting) - } else nextPhase(andThen) - - // Emits all pending elements, then returns to savedPhase - val emitting = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ - primaryOutputs.enqueueOutputElement(emits.head) - emits = emits.tail - if (emits.isEmpty) nextPhase(phaseAfterFlush) - } - nextPhase(running) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala index a9bb2555ed..c1eb21f90d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala @@ -13,10 +13,8 @@ import scala.util.control.NonFatal * INTERNAL API */ private[akka] class TransformProcessorImpl(_settings: MaterializerSettings, transformer: TransformerLike[Any, Any]) - extends ActorProcessorImpl(_settings) { + extends ActorProcessorImpl(_settings) with Emit { - // TODO performance improvement: mutable buffer? - var emits = immutable.Seq.empty[Any] var errorEvent: Option[Throwable] = None override def preStart(): Unit = { @@ -53,23 +51,6 @@ private[akka] class TransformProcessorImpl(_settings: MaterializerSettings, tran emitAndThen(completedPhase) } - // Save previous phase we should return to in a var to avoid allocation - var phaseAfterFlush: TransferPhase = _ - - // Enters flushing phase if there are emits pending - def emitAndThen(andThen: TransferPhase): Unit = - if (emits.nonEmpty) { - phaseAfterFlush = andThen - nextPhase(emitting) - } else nextPhase(andThen) - - // Emits all pending elements, then returns to savedPhase - val emitting = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ - primaryOutputs.enqueueOutputElement(emits.head) - emits = emits.tail - if (emits.isEmpty) nextPhase(phaseAfterFlush) - } - override def toString: String = s"Transformer(emits=$emits, transformer=$transformer)" override def postStop(): Unit = try super.postStop() finally transformer.cleanup() diff --git a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala index 6bf7f6aabf..6a7347866b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala @@ -23,7 +23,7 @@ import akka.stream.impl.ConflateImpl import akka.stream.impl.ExpandImpl import akka.stream.impl.BufferImpl import akka.stream.impl.BlackholeSubscriber -import akka.stream.impl.MapFutureProcessorImpl +import akka.stream.impl.MapAsyncProcessorImpl /** * INTERNAL API @@ -37,8 +37,12 @@ private[akka] object Ast { case class TimerTransform(name: String, mkTransformer: () ⇒ TimerTransformer[Any, Any]) extends AstNode - case class MapFuture(f: Any ⇒ Future[Any]) extends AstNode { - override def name = "mapFuture" + case class MapAsync(f: Any ⇒ Future[Any]) extends AstNode { + override def name = "mapAsync" + } + + case class MapAsyncUnordered(f: Any ⇒ Future[Any]) extends AstNode { + override def name = "mapAsyncUnordered" } case class GroupBy(f: Any ⇒ Any) extends AstNode { @@ -290,16 +294,17 @@ private[akka] object ActorProcessorFactory { def props(materializer: FlowMaterializer, op: AstNode): Props = { val settings = materializer.settings (op match { - case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.mkTransformer())) - case t: TimerTransform ⇒ Props(new TimerTransformerProcessorsImpl(settings, t.mkTransformer())) - case m: MapFuture ⇒ Props(new MapFutureProcessorImpl(settings, m.f)) - case g: GroupBy ⇒ Props(new GroupByProcessorImpl(settings, g.f)) - case tt: PrefixAndTail ⇒ Props(new PrefixAndTailImpl(settings, tt.n)) - case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p)) - case ConcatAll ⇒ Props(new ConcatAllImpl(materializer)) - case cf: Conflate ⇒ Props(new ConflateImpl(settings, cf.seed, cf.aggregate)) - case ex: Expand ⇒ Props(new ExpandImpl(settings, ex.seed, ex.extrapolate)) - case bf: Buffer ⇒ Props(new BufferImpl(settings, bf.size, bf.overflowStrategy)) + case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.mkTransformer())) + case t: TimerTransform ⇒ Props(new TimerTransformerProcessorsImpl(settings, t.mkTransformer())) + case m: MapAsync ⇒ Props(new MapAsyncProcessorImpl(settings, m.f)) + case m: MapAsyncUnordered ⇒ Props(new MapAsyncUnorderedProcessorImpl(settings, m.f)) + case g: GroupBy ⇒ Props(new GroupByProcessorImpl(settings, g.f)) + case tt: PrefixAndTail ⇒ Props(new PrefixAndTailImpl(settings, tt.n)) + case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p)) + case ConcatAll ⇒ Props(new ConcatAllImpl(materializer)) + case cf: Conflate ⇒ Props(new ConflateImpl(settings, cf.seed, cf.aggregate)) + case ex: Expand ⇒ Props(new ExpandImpl(settings, ex.seed, ex.extrapolate)) + case bf: Buffer ⇒ Props(new BufferImpl(settings, bf.size, bf.overflowStrategy)) }).withDispatcher(settings.dispatcher) } diff --git a/akka-stream/src/main/scala/akka/stream/impl2/MapAsyncUnorderedProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl2/MapAsyncUnorderedProcessorImpl.scala new file mode 100644 index 0000000000..3a52a5296d --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl2/MapAsyncUnorderedProcessorImpl.scala @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl2 + +import scala.collection.immutable +import scala.concurrent.Future +import scala.util.control.NonFatal +import akka.stream.MaterializerSettings +import akka.stream.impl.ActorProcessorImpl +import akka.stream.impl.Emit +import akka.stream.impl.TransferPhase +import akka.stream.impl.TransferState +import akka.pattern.pipe + +/** + * INTERNAL API + */ +private[akka] object MapAsyncUnorderedProcessorImpl { + case class FutureElement(element: Any) + case class FutureFailure(cause: Throwable) +} + +/** + * INTERNAL API + */ +private[akka] class MapAsyncUnorderedProcessorImpl(_settings: MaterializerSettings, f: Any ⇒ Future[Any]) + extends ActorProcessorImpl(_settings) with Emit { + import MapAsyncUnorderedProcessorImpl._ + + // Execution context for pipeTo and friends + import context.dispatcher + + var inProgressCount = 0 + + override def activeReceive = futureReceive orElse super.activeReceive + + def futureReceive: Receive = { + case FutureElement(element) ⇒ + inProgressCount -= 1 + emits = List(element) + emitAndThen(running) + pump() + + case FutureFailure(cause) ⇒ + fail(cause) + } + + override def onError(e: Throwable): Unit = { + // propagate upstream error immediately + fail(e) + } + + object RunningPhaseCondition extends TransferState { + def isReady = (primaryInputs.inputsAvailable && primaryOutputs.demandCount - inProgressCount > 0) || + (primaryInputs.inputsDepleted && inProgressCount == 0) + def isCompleted = false + } + + val running: TransferPhase = TransferPhase(RunningPhaseCondition) { () ⇒ + if (primaryInputs.inputsDepleted) { + emitAndThen(completedPhase) + } else if (primaryInputs.inputsAvailable && primaryOutputs.demandCount - inProgressCount > 0) { + val elem = primaryInputs.dequeueInputElement() + inProgressCount += 1 + try { + f(elem).map(FutureElement.apply).recover { + case err ⇒ FutureFailure(err) + }.pipeTo(self) + } catch { + case NonFatal(err) ⇒ + // f threw, propagate error immediately + fail(err) + } + emitAndThen(running) + } + } + + nextPhase(running) + +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala index a7d182f25e..8cf2a6237f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala @@ -73,9 +73,24 @@ trait FlowOps[-In, +Out] { * element that will be emitted downstream. As many futures as requested elements by * downstream may run in parallel and may complete in any order, but the elements that * are emitted downstream are in the same order as from upstream. + * + * @see [[#mapAsyncUnordered]] */ - def mapFuture[T](f: Out ⇒ Future[T]): Repr[In, T] = - andThen(MapFuture(f.asInstanceOf[Any ⇒ Future[Any]])) + def mapAsync[T](f: Out ⇒ Future[T]): Repr[In, T] = + andThen(MapAsync(f.asInstanceOf[Any ⇒ Future[Any]])) + + /** + * Transform this stream by applying the given function to each of the elements + * as they pass through this processing step. The function returns a `Future` of the + * element that will be emitted downstream. As many futures as requested elements by + * downstream may run in parallel and each processed element will be emitted dowstream + * as soon as it is ready, i.e. it is possible that the elements are not emitted downstream + * in the same order as from upstream. + * + * @see [[#mapAsync]] + */ + def mapAsyncUnordered[T](f: Out ⇒ Future[T]): Repr[In, T] = + andThen(MapAsyncUnordered(f.asInstanceOf[Any ⇒ Future[Any]])) /** * Only pass on those elements that satisfy the given predicate.