diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala
index e172552a01..46899803e5 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala
@@ -35,6 +35,9 @@ private[akka] object Ast {
   case class Transform(transformer: Transformer[Any, Any]) extends AstNode {
     override def name = transformer.name
   }
+  case class MapFuture(f: Any ⇒ Future[Any]) extends AstNode {
+    override def name = "mapFuture"
+  }
   case class GroupBy(f: Any ⇒ Any) extends AstNode {
     override def name = "groupBy"
   }
diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala
index b97493dde4..528de30d25 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala
@@ -38,6 +38,7 @@ private[akka] object ActorProcessor {
       case bf: Buffer ⇒ Props(new BufferImpl(settings, bf.size, bf.overflowStrategy))
       case tt: PrefixAndTail ⇒ Props(new PrefixAndTailImpl(settings, tt.n))
       case ConcatAll ⇒ Props(new ConcatAllImpl(settings))
+      case m: MapFuture ⇒ Props(new MapFutureProcessorImpl(settings, m.f))
     }).withDispatcher(settings.dispatcher)
   }
@@ -165,7 +166,8 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu
   private var downstreamBufferSpace = 0
   private var downstreamCompleted = false
 
-  def demandAvailable = downstreamBufferSpace > 0
+  override def demandAvailable = downstreamBufferSpace > 0
+  def demandCount: Int = downstreamBufferSpace
 
   override val subreceive = new SubReceive(waitingExposedPublisher)
@@ -244,7 +246,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettin
     override def inputOnError(e: Throwable): Unit = ActorProcessorImpl.this.onError(e)
   }
 
-  protected val primaryOutputs: Outputs =
+  protected val primaryOutputs: FanoutOutputs =
     new FanoutOutputs(settings.maxFanOutBufferSize, settings.initialFanOutBufferSize, self, this) {
       override def afterShutdown(): Unit = {
         primaryOutputsShutdown = true
diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala
index 8b4a46e7d7..03c0e73a1b 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala
@@ -152,6 +152,9 @@ private[akka] trait Builder[Out] {
       override def name = "map"
     })
 
+  def mapFuture[U](f: Out ⇒ Future[U]): Thing[U] =
+    andThen(MapFuture(f.asInstanceOf[Any ⇒ Future[Any]]))
+
   def filter(p: Out ⇒ Boolean): Thing[Out] = transform(new Transformer[Out, Out] {
     override def onNext(in: Out) = if (p(in)) List(in) else Nil
diff --git a/akka-stream/src/main/scala/akka/stream/impl/MapFutureProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/MapFutureProcessorImpl.scala
new file mode 100644
index 0000000000..600d54f49b
--- /dev/null
+++ b/akka-stream/src/main/scala/akka/stream/impl/MapFutureProcessorImpl.scala
@@ -0,0 +1,156 @@
+/**
+ * Copyright (C) 2014 Typesafe Inc.
+ */
+package akka.stream.impl
+
+import scala.collection.mutable
+import scala.collection.immutable
+import scala.collection.immutable.TreeSet
+import scala.concurrent.Future
+import scala.util.control.NonFatal
+import akka.stream.MaterializerSettings
+import akka.pattern.pipe
+import scala.annotation.tailrec
+
+/**
+ * INTERNAL API
+ */
+private[akka] object MapFutureProcessorImpl {
+
+  object FutureElement {
+    implicit val ordering: Ordering[FutureElement] = new Ordering[FutureElement] {
+      def compare(a: FutureElement, b: FutureElement): Int = {
+        a.seqNo compare b.seqNo
+      }
+    }
+  }
+
+  case class FutureElement(seqNo: Long, element: Any)
+  case class FutureFailure(cause: Throwable)
+}
+
+/**
+ * INTERNAL API
+ */
+private[akka] class MapFutureProcessorImpl(_settings: MaterializerSettings, f: Any ⇒ Future[Any]) extends ActorProcessorImpl(_settings) {
+  import MapFutureProcessorImpl._
+
+  // Execution context for pipeTo and friends
+  import context.dispatcher
+
+  // TODO performance improvement: mutable buffer?
+  var emits = immutable.Seq.empty[Any]
+
+  var submittedSeqNo = 0L
+  var doneSeqNo = 0L
+  def gap: Long = submittedSeqNo - doneSeqNo
+
+  // TODO performance improvement: explore Endre's proposal of using an array based ring buffer addressed by
+  // seqNo & Mask and explicitly storing a Gap object to denote missing pieces instead of the sorted set
+
+  // keep future results arriving too early in a buffer sorted by seqNo
+  var orderedBuffer = TreeSet.empty[FutureElement]
+
+  override def receive = futureReceive orElse super.receive
+
+  def drainBuffer(): List[Any] = {
+
+    // this is mutable for speed
+    var n = 0
+    var elements = mutable.ListBuffer.empty[Any]
+    var error: Option[Throwable] = None
+    val iter = orderedBuffer.iterator
+    @tailrec def split(): Unit =
+      if (iter.hasNext) {
+        val next = iter.next()
+        val inOrder = next.seqNo == (doneSeqNo + 1)
+        // stop at first missing seqNo
+        if (inOrder) {
+          n += 1
+          doneSeqNo = next.seqNo
+          elements += next.element
+          split()
+        }
+      }
+
+    split()
+    orderedBuffer = orderedBuffer.drop(n)
+    elements.toList
+  }
+
+  def futureReceive: Receive = {
+    case fe @ FutureElement(seqNo, element) ⇒
+      if (seqNo == (doneSeqNo + 1)) {
+        // successful element for the next sequence number
+        // emit that element and all elements from the buffer that are in order
+        // until next missing sequence number
+        doneSeqNo = seqNo
+        if (orderedBuffer.isEmpty) {
+          emits = List(element)
+        } else {
+          val fromBuffer = drainBuffer()
+          emits = element :: fromBuffer
+        }
+        emitAndThen(running)
+        pump()
+      } else {
+        assert(seqNo > doneSeqNo, s"Unexpected sequence number [$seqNo], expected seqNo > $doneSeqNo")
+        // out of order, buffer until missing elements arrive
+        orderedBuffer += fe
+      }
+
+    case FutureFailure(cause) ⇒
+      fail(cause)
+  }
+
+  override def onError(e: Throwable): Unit = {
+    // propagate upstream error immediately
+    fail(e)
+  }
+
+  object RunningPhaseCondition extends TransferState {
+    def isReady = (primaryInputs.inputsAvailable && primaryOutputs.demandCount - gap > 0) ||
+      (primaryInputs.inputsDepleted && gap == 0)
+    def isCompleted = false
+  }
+
+  val running: TransferPhase = TransferPhase(RunningPhaseCondition) { () ⇒
+    if (primaryInputs.inputsDepleted) {
+      emitAndThen(completedPhase)
+    } else if (primaryInputs.inputsAvailable && primaryOutputs.demandCount - gap > 0) {
+      val elem = primaryInputs.dequeueInputElement()
+      submittedSeqNo += 1
+      val seqNo = submittedSeqNo
+      try {
+        f(elem).map(FutureElement(seqNo, _)).recover {
+          case err ⇒ FutureFailure(err)
+        }.pipeTo(self)
+      } catch {
+        case NonFatal(err) ⇒
+          // f threw, propagate error immediately
+          fail(err)
+      }
+      emitAndThen(running)
+    }
+  }
+
+  // Save previous phase we should return to in a var to avoid allocation
+  var phaseAfterFlush: TransferPhase = _
+
+  // Enters flushing phase if there are emits pending
+  def emitAndThen(andThen: TransferPhase): Unit =
+    if (emits.nonEmpty) {
+      phaseAfterFlush = andThen
+      nextPhase(emitting)
+    } else nextPhase(andThen)
+
+  // Emits all pending elements, then returns to savedPhase
+  val emitting = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒
+    primaryOutputs.enqueueOutputElement(emits.head)
+    emits = emits.tail
+    if (emits.isEmpty) nextPhase(phaseAfterFlush)
+  }
+
+  nextPhase(running)
+
+}
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala
index 1d12145561..c4f7fb1175 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala
@@ -19,6 +19,7 @@ import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transf
 import akka.stream.scaladsl.{ Duct ⇒ SDuct }
 import akka.stream.impl.Ast
 import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.Future
 
 /**
  * Java API
@@ -51,6 +52,15 @@ abstract class Duct[In, Out] {
    */
  def map[U](f: Function[Out, U]): Duct[In, U]
 
+  /**
+   * Transform this stream by applying the given function to each of the elements
+   * as they pass through this processing step. The function returns a `Future` of the
+   * element that will be emitted downstream. As many futures as there are elements
+   * requested by downstream may run in parallel and may complete in any order, but the
+   * elements that are emitted downstream are in the same order as received from upstream.
+   */
+  def mapFuture[U](f: Function[Out, Future[U]]): Duct[In, U]
+
   /**
    * Only pass on those elements that satisfy the given predicate.
    */
@@ -330,6 +340,8 @@ abstract class Duct[In, Out] {
 private[akka] class DuctAdapter[In, T](delegate: SDuct[In, T]) extends Duct[In, T] {
   override def map[U](f: Function[T, U]): Duct[In, U] = new DuctAdapter(delegate.map(f.apply))
 
+  override def mapFuture[U](f: Function[T, Future[U]]): Duct[In, U] = new DuctAdapter(delegate.mapFuture(f.apply))
+
   override def filter(p: Predicate[T]): Duct[In, T] = new DuctAdapter(delegate.filter(p.test))
 
   override def collect[U](pf: PartialFunction[T, U]): Duct[In, U] = new DuctAdapter(delegate.collect(pf))
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
index c9cd543af4..ef5a33495d 100644
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
@@ -114,6 +114,15 @@ abstract class Flow[T] {
    */
  def map[U](f: Function[T, U]): Flow[U]
 
+  /**
+   * Transform this stream by applying the given function to each of the elements
+   * as they pass through this processing step. The function returns a `Future` of the
+   * element that will be emitted downstream. As many futures as there are elements
+   * requested by downstream may run in parallel and may complete in any order, but the
+   * elements that are emitted downstream are in the same order as received from upstream.
+   */
+  def mapFuture[U](f: Function[T, Future[U]]): Flow[U]
+
   /**
    * Only pass on those elements that satisfy the given predicate.
    */
@@ -402,6 +411,8 @@ trait OnCompleteCallback {
 private[akka] class FlowAdapter[T](delegate: SFlow[T]) extends Flow[T] {
   override def map[U](f: Function[T, U]): Flow[U] = new FlowAdapter(delegate.map(f.apply))
 
+  override def mapFuture[U](f: Function[T, Future[U]]): Flow[U] = new FlowAdapter(delegate.mapFuture(f.apply))
+
   override def filter(p: Predicate[T]): Flow[T] = new FlowAdapter(delegate.filter(p.test))
 
   override def collect[U](pf: PartialFunction[T, U]): Flow[U] = new FlowAdapter(delegate.collect(pf))
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala
index 74cb076180..9d4ab36633 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala
@@ -12,6 +12,7 @@ import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transf
 import akka.stream.impl.DuctImpl
 import akka.stream.impl.Ast
 import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.Future
 
 object Duct {
 
@@ -42,6 +43,15 @@ trait Duct[In, +Out] {
    */
  def map[U](f: Out ⇒ U): Duct[In, U]
 
+  /**
+   * Transform this stream by applying the given function to each of the elements
+   * as they pass through this processing step. The function returns a `Future` of the
+   * element that will be emitted downstream. As many futures as there are elements
+   * requested by downstream may run in parallel and may complete in any order, but the
+   * elements that are emitted downstream are in the same order as received from upstream.
+   */
+  def mapFuture[U](f: Out ⇒ Future[U]): Duct[In, U]
+
   /**
    * Only pass on those elements that satisfy the given predicate.
    */
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
index b1aff5b0f9..87f4d14d2b 100644
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
@@ -110,6 +110,15 @@ trait Flow[+T] {
    */
  def map[U](f: T ⇒ U): Flow[U]
 
+  /**
+   * Transform this stream by applying the given function to each of the elements
+   * as they pass through this processing step. The function returns a `Future` of the
+   * element that will be emitted downstream. As many futures as there are elements
+   * requested by downstream may run in parallel and may complete in any order, but the
+   * elements that are emitted downstream are in the same order as received from upstream.
+   */
+  def mapFuture[U](f: T ⇒ Future[U]): Flow[U]
+
   /**
    * Only pass on those elements that satisfy the given predicate.
    */
diff --git a/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java b/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java
index cf9aefa706..3eb6b82a80 100644
--- a/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java
+++ b/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java
@@ -7,9 +7,10 @@ import org.junit.Test;
 import org.reactivestreams.api.Consumer;
 import org.reactivestreams.api.Producer;
 import scala.concurrent.duration.FiniteDuration;
-
+import scala.concurrent.Future;
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.dispatch.Futures;
 import akka.japi.Function;
 import akka.japi.Function2;
 import akka.japi.Pair;
@@ -189,4 +190,24 @@ public class DuctTest {
     probe.expectMsgEquals("done");
   }
 
+  @Test
+  public void mustBeAbleToUseMapFuture() throws Exception {
+    final JavaTestKit probe = new JavaTestKit(system);
+    Consumer<String> c = Duct.create(String.class).mapFuture(new Function<String, Future<String>>() {
+      public Future<String> apply(String elem) {
+        return Futures.successful(elem.toUpperCase());
+      }
+    }).foreach(new Procedure<String>() {
+      public void apply(String elem) {
+        probe.getRef().tell(elem, ActorRef.noSender());
+      }
+    }).consume(materializer);
+
+    final java.lang.Iterable<String> input = Arrays.asList("a", "b", "c");
+    Flow.create(input).produceTo(materializer, c);
+    probe.expectMsgEquals("A");
+    probe.expectMsgEquals("B");
+    probe.expectMsgEquals("C");
+  }
+
 }
diff --git a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java
index 8dc1cacd3d..82da8fd005 100644
--- a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java
+++ b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java
@@ -9,22 +9,20 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
-
 import akka.stream.FlattenStrategy;
 import akka.stream.OverflowStrategy;
 import org.junit.ClassRule;
 import org.junit.Test;
-
 import static org.junit.Assert.assertEquals;
-
 import org.reactivestreams.api.Producer;
-
 import scala.Option;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.dispatch.Futures;
 import akka.japi.Function;
 import akka.japi.Function2;
 import akka.japi.Pair;
@@ -523,4 +521,23 @@ public class FlowTest {
     probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
   }
 
+
+  @Test
+  public void mustBeAbleToUseMapFuture() throws Exception {
+    final JavaTestKit probe = new JavaTestKit(system);
+    final java.lang.Iterable<String> input = Arrays.asList("a", "b", "c");
+    Flow.create(input).mapFuture(new Function<String, Future<String>>() {
+      public Future<String> apply(String elem) {
+        return Futures.successful(elem.toUpperCase());
+      }
+    }).foreach(new Procedure<String>() {
+      public void apply(String elem) {
+        probe.getRef().tell(elem, ActorRef.noSender());
+      }
+    }).consume(materializer);
+    probe.expectMsgEquals("A");
+    probe.expectMsgEquals("B");
+    probe.expectMsgEquals("C");
+  }
+
 }
diff --git a/akka-stream/src/test/scala/akka/stream/FlowMapFutureSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowMapFutureSpec.scala
new file mode 100644
index 0000000000..c50d6a7e2d
--- /dev/null
+++ b/akka-stream/src/test/scala/akka/stream/FlowMapFutureSpec.scala
@@ -0,0 +1,114 @@
+/**
+ * Copyright (C) 2014 Typesafe Inc.
+ */
+package akka.stream
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.concurrent.forkjoin.ThreadLocalRandom
+import scala.util.control.NoStackTrace
+import akka.stream.scaladsl.Flow
+import akka.stream.testkit.AkkaSpec
+import akka.stream.testkit.StreamTestKit
+import akka.testkit.TestProbe
+import akka.testkit.TestLatch
+import scala.concurrent.Await
+
+@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
+class FlowMapFutureSpec extends AkkaSpec {
+
+  val materializer = FlowMaterializer(MaterializerSettings(
+    dispatcher = "akka.test.stream-dispatcher"))
+
+  "A Flow with mapFuture" must {
+
+    "produce future elements" in {
+      val c = StreamTestKit.consumerProbe[Int]
+      implicit val ec = system.dispatcher
+      val p = Flow(1 to 3).mapFuture(n ⇒ Future(n)).produceTo(materializer, c)
+      val sub = c.expectSubscription()
+      sub.requestMore(2)
+      c.expectNext(1)
+      c.expectNext(2)
+      c.expectNoMsg(200.millis)
+      sub.requestMore(2)
+      c.expectNext(3)
+      c.expectComplete()
+    }
+
+    "produce future elements in order" in {
+      val c = StreamTestKit.consumerProbe[Int]
+      implicit val ec = system.dispatcher
+      val p = Flow(1 to 50).mapFuture(n ⇒ Future {
+        Thread.sleep(ThreadLocalRandom.current().nextInt(1, 10))
+        n
+      }).produceTo(materializer, c)
+      val sub = c.expectSubscription()
+      sub.requestMore(1000)
+      for (n ← 1 to 50) c.expectNext(n)
+      c.expectComplete()
+    }
+
+    "not run more futures than requested elements" in {
+      val probe = TestProbe()
+      val c = StreamTestKit.consumerProbe[Int]
+      implicit val ec = system.dispatcher
+      val p = Flow(1 to 20).mapFuture(n ⇒ Future {
+        probe.ref ! n
+        n
+      }).produceTo(materializer, c)
+      val sub = c.expectSubscription()
+      // nothing before requested
+      probe.expectNoMsg(500.millis)
+      sub.requestMore(1)
+      probe.expectMsg(1)
+      probe.expectNoMsg(500.millis)
+      sub.requestMore(2)
+      probe.receiveN(2).toSet should be(Set(2, 3))
+      probe.expectNoMsg(500.millis)
+      sub.requestMore(10)
+      probe.receiveN(10).toSet should be((4 to 13).toSet)
+      probe.expectNoMsg(200.millis)
+
+      for (n ← 1 to 13) c.expectNext(n)
+      c.expectNoMsg(200.millis)
+    }
+
+    "signal future failure" in {
+      val latch = TestLatch(1)
+      val c = StreamTestKit.consumerProbe[Int]
+      implicit val ec = system.dispatcher
+      val p = Flow(1 to 5).mapFuture(n ⇒ Future {
+        if (n == 3) throw new RuntimeException("err1") with NoStackTrace
+        else {
+          Await.ready(latch, 10.seconds)
+          n
+        }
+      }).produceTo(materializer, c)
+      val sub = c.expectSubscription()
+      sub.requestMore(10)
+      c.expectError.getMessage should be("err1")
+      latch.countDown()
+    }
+
+    "signal error from mapFuture" in {
+      val latch = TestLatch(1)
+      val c = StreamTestKit.consumerProbe[Int]
+      implicit val ec = system.dispatcher
+      val p = Flow(1 to 5).mapFuture(n ⇒
+        if (n == 3) throw new RuntimeException("err2") with NoStackTrace
+        else {
+          Future {
+            Await.ready(latch, 10.seconds)
+            n
+          }
+        }).
+        produceTo(materializer, c)
+      val sub = c.expectSubscription()
+      sub.requestMore(10)
+      c.expectError.getMessage should be("err2")
+      latch.countDown()
+    }
+
+  }
+}
\ No newline at end of file
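
For reference, a small usage sketch of the new mapFuture operator from the Scala DSL (illustrative only, not part of the patch). It mirrors the patterns in FlowMapFutureSpec above; the foreach/consume combination on the scaladsl Flow is assumed from the javadsl adapter tests, and the no-argument MaterializerSettings() constructor is also an assumption.

// Illustrative sketch only -- not part of the patch.
// Assumptions: scaladsl Flow offers the foreach/consume combinators used via the
// javadsl adapters above, and MaterializerSettings() can be built with defaults.
import scala.concurrent.Future
import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow

object MapFutureUsage extends App {
  implicit val system = ActorSystem("mapFuture-example")
  import system.dispatcher

  val materializer = FlowMaterializer(MaterializerSettings())

  // Each element is turned into a Future; the futures may run in parallel and
  // complete in any order, but mapFuture emits the results downstream in the
  // original upstream order.
  Flow(1 to 5)
    .mapFuture(n ⇒ Future(n * 2))
    .foreach(println)
    .consume(materializer)
}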