Merge pull request #15281 from akka/wip-15218-mapFuture-patriknw

+str #15218 Add mapFuture operator
This commit is contained in:
Patrik Nordwall 2014-06-04 08:25:28 +02:00
commit de22fa4b02
11 changed files with 365 additions and 7 deletions

View file

@ -35,6 +35,9 @@ private[akka] object Ast {
case class Transform(transformer: Transformer[Any, Any]) extends AstNode {
override def name = transformer.name
}
case class MapFuture(f: Any Future[Any]) extends AstNode {
override def name = "mapFuture"
}
case class GroupBy(f: Any Any) extends AstNode {
override def name = "groupBy"
}

View file

@ -38,6 +38,7 @@ private[akka] object ActorProcessor {
case bf: Buffer Props(new BufferImpl(settings, bf.size, bf.overflowStrategy))
case tt: PrefixAndTail Props(new PrefixAndTailImpl(settings, tt.n))
case ConcatAll Props(new ConcatAllImpl(settings))
case m: MapFuture Props(new MapFutureProcessorImpl(settings, m.f))
}).withDispatcher(settings.dispatcher)
}
@ -165,7 +166,8 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu
private var downstreamBufferSpace = 0
private var downstreamCompleted = false
def demandAvailable = downstreamBufferSpace > 0
override def demandAvailable = downstreamBufferSpace > 0
def demandCount: Int = downstreamBufferSpace
override val subreceive = new SubReceive(waitingExposedPublisher)
@ -244,7 +246,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettin
override def inputOnError(e: Throwable): Unit = ActorProcessorImpl.this.onError(e)
}
protected val primaryOutputs: Outputs =
protected val primaryOutputs: FanoutOutputs =
new FanoutOutputs(settings.maxFanOutBufferSize, settings.initialFanOutBufferSize, self, this) {
override def afterShutdown(): Unit = {
primaryOutputsShutdown = true

View file

@ -152,6 +152,9 @@ private[akka] trait Builder[Out] {
override def name = "map"
})
def mapFuture[U](f: Out Future[U]): Thing[U] =
andThen(MapFuture(f.asInstanceOf[Any Future[Any]]))
def filter(p: Out Boolean): Thing[Out] =
transform(new Transformer[Out, Out] {
override def onNext(in: Out) = if (p(in)) List(in) else Nil

View file

@ -0,0 +1,156 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import scala.collection.mutable
import scala.collection.immutable
import scala.collection.immutable.TreeSet
import scala.concurrent.Future
import scala.util.control.NonFatal
import akka.stream.MaterializerSettings
import akka.pattern.pipe
import scala.annotation.tailrec
/**
* INTERNAL API
*/
private[akka] object MapFutureProcessorImpl {
object FutureElement {
implicit val ordering: Ordering[FutureElement] = new Ordering[FutureElement] {
def compare(a: FutureElement, b: FutureElement): Int = {
a.seqNo compare b.seqNo
}
}
}
case class FutureElement(seqNo: Long, element: Any)
case class FutureFailure(cause: Throwable)
}
/**
* INTERNAL API
*/
private[akka] class MapFutureProcessorImpl(_settings: MaterializerSettings, f: Any Future[Any]) extends ActorProcessorImpl(_settings) {
import MapFutureProcessorImpl._
// Execution context for pipeTo and friends
import context.dispatcher
// TODO performance improvement: mutable buffer?
var emits = immutable.Seq.empty[Any]
var submittedSeqNo = 0L
var doneSeqNo = 0L
def gap: Long = submittedSeqNo - doneSeqNo
// TODO performance improvement: explore Endre's proposal of using an array based ring buffer addressed by
// seqNo & Mask and explicitly storing a Gap object to denote missing pieces instead of the sorted set
// keep future results arriving too early in a buffer sorted by seqNo
var orderedBuffer = TreeSet.empty[FutureElement]
override def receive = futureReceive orElse super.receive
def drainBuffer(): List[Any] = {
// this is mutable for speed
var n = 0
var elements = mutable.ListBuffer.empty[Any]
var error: Option[Throwable] = None
val iter = orderedBuffer.iterator
@tailrec def split(): Unit =
if (iter.hasNext) {
val next = iter.next()
val inOrder = next.seqNo == (doneSeqNo + 1)
// stop at first missing seqNo
if (inOrder) {
n += 1
doneSeqNo = next.seqNo
elements += next.element
split()
}
}
split()
orderedBuffer = orderedBuffer.drop(n)
elements.toList
}
def futureReceive: Receive = {
case fe @ FutureElement(seqNo, element)
if (seqNo == (doneSeqNo + 1)) {
// successful element for the next sequence number
// emit that element and all elements from the buffer that are in order
// until next missing sequence number
doneSeqNo = seqNo
if (orderedBuffer.isEmpty) {
emits = List(element)
} else {
val fromBuffer = drainBuffer()
emits = element :: fromBuffer
}
emitAndThen(running)
pump()
} else {
assert(seqNo > doneSeqNo, s"Unexpected sequence number [$seqNo], expected seqNo > $doneSeqNo")
// out of order, buffer until missing elements arrive
orderedBuffer += fe
}
case FutureFailure(cause)
fail(cause)
}
override def onError(e: Throwable): Unit = {
// propagate upstream error immediately
fail(e)
}
object RunningPhaseCondition extends TransferState {
def isReady = (primaryInputs.inputsAvailable && primaryOutputs.demandCount - gap > 0) ||
(primaryInputs.inputsDepleted && gap == 0)
def isCompleted = false
}
val running: TransferPhase = TransferPhase(RunningPhaseCondition) { ()
if (primaryInputs.inputsDepleted) {
emitAndThen(completedPhase)
} else if (primaryInputs.inputsAvailable && primaryOutputs.demandCount - gap > 0) {
val elem = primaryInputs.dequeueInputElement()
submittedSeqNo += 1
val seqNo = submittedSeqNo
try {
f(elem).map(FutureElement(seqNo, _)).recover {
case err FutureFailure(err)
}.pipeTo(self)
} catch {
case NonFatal(err)
// f threw, propagate error immediately
fail(err)
}
emitAndThen(running)
}
}
// Save previous phase we should return to in a var to avoid allocation
var phaseAfterFlush: TransferPhase = _
// Enters flushing phase if there are emits pending
def emitAndThen(andThen: TransferPhase): Unit =
if (emits.nonEmpty) {
phaseAfterFlush = andThen
nextPhase(emitting)
} else nextPhase(andThen)
// Emits all pending elements, then returns to savedPhase
val emitting = TransferPhase(primaryOutputs.NeedsDemand) { ()
primaryOutputs.enqueueOutputElement(emits.head)
emits = emits.tail
if (emits.isEmpty) nextPhase(phaseAfterFlush)
}
nextPhase(running)
}

View file

@ -19,6 +19,7 @@ import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transf
import akka.stream.scaladsl.{ Duct SDuct }
import akka.stream.impl.Ast
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.Future
/**
* Java API
@ -51,6 +52,15 @@ abstract class Duct[In, Out] {
*/
def map[U](f: Function[Out, U]): Duct[In, U]
/**
* Transform this stream by applying the given function to each of the elements
* as they pass through this processing step. The function returns a `Future` of the
* element that will be emitted downstream. As many futures as requested elements by
* downstream may run in parallel and may complete in any order, but the elements that
* are emitted downstream are in the same order as from upstream.
*/
def mapFuture[U](f: Function[Out, Future[U]]): Duct[In, U]
/**
* Only pass on those elements that satisfy the given predicate.
*/
@ -330,6 +340,8 @@ abstract class Duct[In, Out] {
private[akka] class DuctAdapter[In, T](delegate: SDuct[In, T]) extends Duct[In, T] {
override def map[U](f: Function[T, U]): Duct[In, U] = new DuctAdapter(delegate.map(f.apply))
override def mapFuture[U](f: Function[T, Future[U]]): Duct[In, U] = new DuctAdapter(delegate.mapFuture(f.apply))
override def filter(p: Predicate[T]): Duct[In, T] = new DuctAdapter(delegate.filter(p.test))
override def collect[U](pf: PartialFunction[T, U]): Duct[In, U] = new DuctAdapter(delegate.collect(pf))

View file

@ -114,6 +114,15 @@ abstract class Flow[T] {
*/
def map[U](f: Function[T, U]): Flow[U]
/**
* Transform this stream by applying the given function to each of the elements
* as they pass through this processing step. The function returns a `Future` of the
* element that will be emitted downstream. As many futures as requested elements by
* downstream may run in parallel and may complete in any order, but the elements that
* are emitted downstream are in the same order as from upstream.
*/
def mapFuture[U](f: Function[T, Future[U]]): Flow[U]
/**
* Only pass on those elements that satisfy the given predicate.
*/
@ -402,6 +411,8 @@ trait OnCompleteCallback {
private[akka] class FlowAdapter[T](delegate: SFlow[T]) extends Flow[T] {
override def map[U](f: Function[T, U]): Flow[U] = new FlowAdapter(delegate.map(f.apply))
override def mapFuture[U](f: Function[T, Future[U]]): Flow[U] = new FlowAdapter(delegate.mapFuture(f.apply))
override def filter(p: Predicate[T]): Flow[T] = new FlowAdapter(delegate.filter(p.test))
override def collect[U](pf: PartialFunction[T, U]): Flow[U] = new FlowAdapter(delegate.collect(pf))

View file

@ -12,6 +12,7 @@ import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transf
import akka.stream.impl.DuctImpl
import akka.stream.impl.Ast
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.Future
object Duct {
@ -42,6 +43,15 @@ trait Duct[In, +Out] {
*/
def map[U](f: Out U): Duct[In, U]
/**
* Transform this stream by applying the given function to each of the elements
* as they pass through this processing step. The function returns a `Future` of the
* element that will be emitted downstream. As many futures as requested elements by
* downstream may run in parallel and may complete in any order, but the elements that
* are emitted downstream are in the same order as from upstream.
*/
def mapFuture[U](f: Out Future[U]): Duct[In, U]
/**
* Only pass on those elements that satisfy the given predicate.
*/

View file

@ -110,6 +110,15 @@ trait Flow[+T] {
*/
def map[U](f: T U): Flow[U]
/**
* Transform this stream by applying the given function to each of the elements
* as they pass through this processing step. The function returns a `Future` of the
* element that will be emitted downstream. As many futures as requested elements by
* downstream may run in parallel and may complete in any order, but the elements that
* are emitted downstream are in the same order as from upstream.
*/
def mapFuture[U](f: T Future[U]): Flow[U]
/**
* Only pass on those elements that satisfy the given predicate.
*/

View file

@ -7,9 +7,10 @@ import org.junit.Test;
import org.reactivestreams.api.Consumer;
import org.reactivestreams.api.Producer;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.Future;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import akka.japi.Function;
import akka.japi.Function2;
import akka.japi.Pair;
@ -189,4 +190,24 @@ public class DuctTest {
probe.expectMsgEquals("done");
}
@Test
public void mustBeAbleToUseMapFuture() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
Consumer<String> c = Duct.create(String.class).mapFuture(new Function<String, Future<String>>() {
public Future<String> apply(String elem) {
return Futures.successful(elem.toUpperCase());
}
}).foreach(new Procedure<String>() {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}).consume(materializer);
final java.lang.Iterable<String> input = Arrays.asList("a", "b", "c");
Flow.create(input).produceTo(materializer, c);
probe.expectMsgEquals("A");
probe.expectMsgEquals("B");
probe.expectMsgEquals("C");
}
}

View file

@ -9,22 +9,20 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import akka.stream.FlattenStrategy;
import akka.stream.OverflowStrategy;
import org.junit.ClassRule;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import org.reactivestreams.api.Producer;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import akka.japi.Function;
import akka.japi.Function2;
import akka.japi.Pair;
@ -523,4 +521,23 @@ public class FlowTest {
probe.expectNoMsg(FiniteDuration.create(200, TimeUnit.MILLISECONDS));
}
@Test
public void mustBeAbleToUseMapFuture() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final java.lang.Iterable<String> input = Arrays.asList("a", "b", "c");
Flow.create(input).mapFuture(new Function<String, Future<String>>() {
public Future<String> apply(String elem) {
return Futures.successful(elem.toUpperCase());
}
}).foreach(new Procedure<String>() {
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}).consume(materializer);
probe.expectMsgEquals("A");
probe.expectMsgEquals("B");
probe.expectMsgEquals("C");
}
}

View file

@ -0,0 +1,114 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.control.NoStackTrace
import akka.stream.scaladsl.Flow
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import akka.testkit.TestProbe
import akka.testkit.TestLatch
import scala.concurrent.Await
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FlowMapFutureSpec extends AkkaSpec {
val materializer = FlowMaterializer(MaterializerSettings(
dispatcher = "akka.test.stream-dispatcher"))
"A Flow with mapFuture" must {
"produce future elements" in {
val c = StreamTestKit.consumerProbe[Int]
implicit val ec = system.dispatcher
val p = Flow(1 to 3).mapFuture(n Future(n)).produceTo(materializer, c)
val sub = c.expectSubscription()
sub.requestMore(2)
c.expectNext(1)
c.expectNext(2)
c.expectNoMsg(200.millis)
sub.requestMore(2)
c.expectNext(3)
c.expectComplete()
}
"produce future elements in order" in {
val c = StreamTestKit.consumerProbe[Int]
implicit val ec = system.dispatcher
val p = Flow(1 to 50).mapFuture(n Future {
Thread.sleep(ThreadLocalRandom.current().nextInt(1, 10))
n
}).produceTo(materializer, c)
val sub = c.expectSubscription()
sub.requestMore(1000)
for (n 1 to 50) c.expectNext(n)
c.expectComplete()
}
"not run more futures than requested elements" in {
val probe = TestProbe()
val c = StreamTestKit.consumerProbe[Int]
implicit val ec = system.dispatcher
val p = Flow(1 to 20).mapFuture(n Future {
probe.ref ! n
n
}).produceTo(materializer, c)
val sub = c.expectSubscription()
// nothing before requested
probe.expectNoMsg(500.millis)
sub.requestMore(1)
probe.expectMsg(1)
probe.expectNoMsg(500.millis)
sub.requestMore(2)
probe.receiveN(2).toSet should be(Set(2, 3))
probe.expectNoMsg(500.millis)
sub.requestMore(10)
probe.receiveN(10).toSet should be((4 to 13).toSet)
probe.expectNoMsg(200.millis)
for (n 1 to 13) c.expectNext(n)
c.expectNoMsg(200.millis)
}
"signal future failure" in {
val latch = TestLatch(1)
val c = StreamTestKit.consumerProbe[Int]
implicit val ec = system.dispatcher
val p = Flow(1 to 5).mapFuture(n Future {
if (n == 3) throw new RuntimeException("err1") with NoStackTrace
else {
Await.ready(latch, 10.seconds)
n
}
}).produceTo(materializer, c)
val sub = c.expectSubscription()
sub.requestMore(10)
c.expectError.getMessage should be("err1")
latch.countDown()
}
"signal error from mapFuture" in {
val latch = TestLatch(1)
val c = StreamTestKit.consumerProbe[Int]
implicit val ec = system.dispatcher
val p = Flow(1 to 5).mapFuture(n
if (n == 3) throw new RuntimeException("err2") with NoStackTrace
else {
Future {
Await.ready(latch, 10.seconds)
n
}
}).
produceTo(materializer, c)
val sub = c.expectSubscription()
sub.requestMore(10)
c.expectError.getMessage should be("err2")
latch.countDown()
}
}
}