Merge pull request #22360 from johanandren/wip-21363-future-source-last-bit-johanandren

Source that flattens a future source and keeps mat val
This commit is contained in:
Patrik Nordwall 2017-03-16 20:36:05 +01:00 committed by GitHub
commit c3c058b6bb
11 changed files with 385 additions and 15 deletions

View file

@ -76,7 +76,6 @@ If the ``CompletionStage`` fails the stream is failed with that exception.
**completes** after the ``CompletionStage`` has completed or when it fails **completes** after the ``CompletionStage`` has completed or when it fails
fromFuture fromFuture
^^^^^^^^^^ ^^^^^^^^^^
Send the single value of the Scala ``Future`` when it completes and there is demand. Send the single value of the Scala ``Future`` when it completes and there is demand.
@ -86,6 +85,24 @@ If the future fails the stream is failed with that exception.
**completes** after the future has completed **completes** after the future has completed
fromFutureSource
^^^^^^^^^^^^^^^^
Streams the elements of the given future source once it successfully completes.
If the future fails the stream is failed.
**emits** the next value from the `future` source, once it has completed
**completes** after the `future` source completes
fromSourceCompletionStage
^^^^^^^^^^^^^^^^^^^^^^^^^
Streams the elements of an asynchronous source once its given `completion` stage completes.
If the `completion` fails the stream is failed with that exception.
**emits** the next value from the asynchronous source, once its `completion stage` has completed
**completes** after the asynchronous source completes
unfold unfold
^^^^^^ ^^^^^^
Stream the result of a function as long as it returns a ``Optional``, the value inside the optional Stream the result of a function as long as it returns a ``Optional``, the value inside the optional

View file

@ -84,6 +84,23 @@ If the future fails the stream is failed with that exception.
**completes** after the future has completed **completes** after the future has completed
fromFutureSource
^^^^^^^^^^^^^^^^
Streams the elements of the given future source once it successfully completes.
If the future fails the stream is failed.
**emits** the next value from the `future` source, once it has completed
**completes** after the `future` source completes
fromSourceCompletionStage
^^^^^^^^^^^^^^^^^^^^^^^^^
Streams the elements of an asynchronous source once its given `completion` stage completes.
If the `completion` fails the stream is failed with that exception.
**emits** the next value from the asynchronous source, once its `completion stage` has completed
**completes** after the asynchronous source completes
unfold unfold
^^^^^^ ^^^^^^

View file

@ -29,6 +29,7 @@ import scala.concurrent.duration.FiniteDuration;
import akka.testkit.AkkaJUnitActorSystemResource; import akka.testkit.AkkaJUnitActorSystemResource;
import java.util.*; import java.util.*;
import java.util.function.Supplier;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage; import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -381,6 +382,55 @@ public class FlowTest extends StreamTest {
assertEquals(new HashSet<Object>(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet<String>(result)); assertEquals(new HashSet<Object>(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet<String>(result));
} }
@Test
public void mustBeAbleToUsefromSourceCompletionStage() throws Exception {
final Flow<String, String, NotUsed> f1 =
Flow.of(String.class).via(FlowTest.this.<String> op()).named("f1");
final Flow<String, String, NotUsed> f2 =
Flow.of(String.class).via(FlowTest.this.<String> op()).named("f2");
@SuppressWarnings("unused")
final Flow<String, String, NotUsed> f3 =
Flow.of(String.class).via(FlowTest.this.<String> op()).named("f3");
final Source<String, NotUsed> in1 = Source.from(Arrays.asList("a", "b", "c"));
final Source<String, NotUsed> in2 = Source.from(Arrays.asList("d", "e", "f"));
final Sink<String, Publisher<String>> publisher = Sink.asPublisher(AsPublisher.WITHOUT_FANOUT);
final Graph<SourceShape<String>, NotUsed> graph = Source.fromGraph(
GraphDSL.create(new Function<GraphDSL.Builder<NotUsed>, SourceShape<String>>() {
@Override
public SourceShape<String> apply(Builder<NotUsed> b)
throws Exception {
final UniformFanInShape<String, String> merge =
b.add(Merge.<String>create(2));
b.from(b.add(in1)).via(b.add(f1)).toInlet(merge.in(0));
b.from(b.add(in2)).via(b.add(f2)).toInlet(merge.in(1));
return new SourceShape<String>(merge.out());
}
}));
final Supplier<Graph<SourceShape<String>, NotUsed>> fn =
new Supplier<Graph<SourceShape<String>, NotUsed>>() {
public Graph<SourceShape<String>, NotUsed> get() { return graph; }
};
final CompletionStage<Graph<SourceShape<String>, NotUsed>> stage =
CompletableFuture.supplyAsync(fn);
final Source<String, CompletionStage<NotUsed>> source =
Source.fromSourceCompletionStage(stage);
// collecting
final Publisher<String> pub = source.runWith(publisher, materializer);
final CompletionStage<List<String>> all = Source.fromPublisher(pub).limit(100).runWith(Sink.<String>seq(), materializer);
final List<String> result = all.toCompletableFuture().get(200, TimeUnit.MILLISECONDS);
assertEquals(new HashSet<Object>(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet<String>(result));
}
@Test @Test
public void mustBeAbleToUseZip() { public void mustBeAbleToUseZip() {
final TestKit probe = new TestKit(system); final TestKit probe = new TestKit(system);

View file

@ -257,7 +257,6 @@ class ActorGraphInterpreterSpec extends StreamSpec {
EventFilter[IllegalArgumentException](pattern = "Error in stage.*", occurrences = 1).intercept { EventFilter[IllegalArgumentException](pattern = "Error in stage.*", occurrences = 1).intercept {
Await.result(Source.fromGraph(failyStage).runWith(Sink.ignore), 3.seconds) Await.result(Source.fromGraph(failyStage).runWith(Sink.ignore), 3.seconds)
} }
} }
"be able to properly handle case where a stage fails before subscription happens" in assertAllStagesStopped { "be able to properly handle case where a stage fails before subscription happens" in assertAllStagesStopped {

View file

@ -0,0 +1,168 @@
/**
* Copyright (C) 2015-2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.scaladsl
import java.util.concurrent.{ CompletableFuture, TimeUnit }
import akka.stream._
import akka.stream.stage.{ GraphStage, GraphStageLogic, GraphStageWithMaterializedValue }
import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped }
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
import akka.testkit.TestLatch
import akka.{ Done, NotUsed }
import scala.concurrent.{ Await, Future, Promise }
class FutureFlattenSourceSpec extends StreamSpec {
implicit val materializer = ActorMaterializer()
implicit def ec = system.dispatcher
"Future source" must {
val underlying: Source[Int, String] =
Source(List(1, 2, 3)).mapMaterializedValue(_ "foo")
"emit the elements of the future source" in assertAllStagesStopped {
val sourcePromise = Promise[Source[Int, String]]()
val (sourceMatVal, sinkMatVal) =
Source.fromFutureSource(sourcePromise.future)
.toMat(Sink.seq)(Keep.both)
.run()
sourcePromise.success(underlying)
// should complete as soon as inner source has been materialized
sourceMatVal.futureValue should ===("foo")
sinkMatVal.futureValue should ===(List(1, 2, 3))
}
"emit the elements from a source in a completion stage" in assertAllStagesStopped {
val (sourceMatVal, sinkMatVal) =
Source.fromSourceCompletionStage(
// can't be inferred
CompletableFuture.completedFuture[Graph[SourceShape[Int], String]](underlying)
).toMat(Sink.seq)(Keep.both)
.run()
sourceMatVal.toCompletableFuture.get(remainingOrDefault.toMillis, TimeUnit.MILLISECONDS) should ===("foo")
sinkMatVal.futureValue should ===(List(1, 2, 3))
}
"handle downstream cancelling before the underlying Future completes" in assertAllStagesStopped {
val sourcePromise = Promise[Source[Int, String]]()
val (sourceMatVal, termination) =
Source.fromFutureSource(sourcePromise.future)
.watchTermination()(Keep.both)
.to(Sink.cancelled)
.run()
// wait for cancellation to occur
termination.futureValue should ===(Done)
// even though canceled the underlying matval should arrive
sourcePromise.success(underlying)
sourceMatVal.futureValue should ===("foo")
}
"fail if the underlying Future is failed" in assertAllStagesStopped {
val failure = TE("foo")
val underlying = Future.failed[Source[Int, String]](failure)
val (sourceMatVal, sinkMatVal) = Source.fromFutureSource(underlying).toMat(Sink.seq)(Keep.both).run()
sourceMatVal.failed.futureValue should ===(failure)
sinkMatVal.failed.futureValue should ===(failure)
}
"fail as the underlying Future fails after outer source materialization" in assertAllStagesStopped {
val failure = TE("foo")
val sourcePromise = Promise[Source[Int, String]]()
val materializationLatch = TestLatch(1)
val (sourceMatVal, sinkMatVal) =
Source.fromFutureSource(sourcePromise.future)
.mapMaterializedValue { value
materializationLatch.countDown()
value
}
.toMat(Sink.seq)(Keep.both)
.run()
// we don't know that materialization completed yet (this is still a bit racy)
Await.ready(materializationLatch, remainingOrDefault)
Thread.sleep(100)
sourcePromise.failure(failure)
sourceMatVal.failed.futureValue should ===(failure)
sinkMatVal.failed.futureValue should ===(failure)
}
"fail as the underlying Future fails after outer source materialization with no demand" in assertAllStagesStopped {
val failure = TE("foo")
val sourcePromise = Promise[Source[Int, String]]()
val testProbe = TestSubscriber.probe[Int]()
val sourceMatVal =
Source.fromFutureSource(sourcePromise.future)
.to(Sink.fromSubscriber(testProbe))
.run()
testProbe.expectSubscription()
sourcePromise.failure(failure)
sourceMatVal.failed.futureValue should ===(failure)
}
"handle back-pressure when the future completes" in assertAllStagesStopped {
val subscriber = TestSubscriber.probe[Int]()
val publisher = TestPublisher.probe[Int]()
val sourcePromise = Promise[Source[Int, String]]()
val matVal = Source.fromFutureSource(sourcePromise.future)
.to(Sink.fromSubscriber(subscriber))
.run()
subscriber.ensureSubscription()
sourcePromise.success(Source.fromPublisher(publisher).mapMaterializedValue(_ "woho"))
// materialized value completes but still no demand
matVal.futureValue should ===("woho")
// then demand and let an element through to see it works
subscriber.ensureSubscription()
subscriber.request(1)
publisher.expectRequest()
publisher.sendNext(1)
subscriber.expectNext(1)
publisher.sendComplete()
subscriber.expectComplete()
}
class FailingMatGraphStage extends GraphStageWithMaterializedValue[SourceShape[Int], String] {
val out = Outlet[Int]("whatever")
override val shape: SourceShape[Int] = SourceShape(out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, String) = {
throw TE("argh, materialization failed")
}
}
// Behaviour when inner source throws during materialization is undefined (leaks ActorGraphInterpreters)
// until ticket #22358 has been fixed, this test fails because of it
"fail when the future source materialization fails" in pendingUntilFixed(assertAllStagesStopped {
val failure = TE("MatEx")
val (sourceMatVal, sinkMatVal) =
Source.fromFutureSource(
Future.successful(Source.fromGraph(new FailingMatGraphStage))
).toMat(Sink.seq)(Keep.both)
.run()
sinkMatVal.failed.futureValue should ===(failure)
println(sourceMatVal.futureValue)
sourceMatVal.failed.futureValue should ===(failure)
})
}
}

View file

@ -6,7 +6,7 @@ package akka.stream.scaladsl
import akka.testkit.DefaultTimeout import akka.testkit.DefaultTimeout
import org.scalatest.concurrent.ScalaFutures import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{ Span, Millis } import org.scalatest.time.{ Span, Millis }
import scala.concurrent.{ Future, Await } import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.Failure import scala.util.Failure
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
@ -48,7 +48,6 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
p.subscribe(c2) p.subscribe(c2)
c2.expectSubscriptionAndError() c2.expectSubscriptionAndError()
} }
} }
"Empty Source" must { "Empty Source" must {
@ -417,5 +416,4 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
closed should ===(true) closed should ===(true)
} }
} }
} }

View file

@ -747,13 +747,27 @@ import scala.util.control.NonFatal
case StreamSupervisor.PrintDebugDump case StreamSupervisor.PrintDebugDump
val builder = new java.lang.StringBuilder(s"activeShells (actor: $self):\n") val builder = new java.lang.StringBuilder(s"activeShells (actor: $self):\n")
activeInterpreters.foreach { shell activeInterpreters.foreach { shell
builder.append(" " + shell.toString.replace("\n", "\n ")) builder.append(" ")
builder.append(shell.interpreter.toString) .append(shell.toString.replace("\n", "\n "))
.append("\n")
if (shell.isInitialized) {
builder.append(shell.interpreter.toString)
} else {
builder.append(" Not initialized")
}
builder.append("\n")
} }
builder.append(s"newShells:") builder.append(s"newShells:\n")
newShells.foreach { shell newShells.foreach { shell
builder.append(" " + shell.toString.replace("\n", "\n ")) builder.append(" ")
builder.append(shell.interpreter.toString) .append(shell.toString.replace("\n", "\n "))
.append("\n")
if (shell.isInitialized) {
builder.append(shell.interpreter.toString)
} else {
builder.append(" Not initialized")
}
builder.append("\n")
} }
println(builder) println(builder)
} }

View file

@ -277,6 +277,89 @@ import scala.util.Try
override def toString: String = "SingleSource" override def toString: String = "SingleSource"
} }
final class FutureFlattenSource[T, M](
val future: Future[Graph[SourceShape[T], M]])
extends GraphStageWithMaterializedValue[SourceShape[T], Future[M]] {
ReactiveStreamsCompliance.requireNonNullElement(future)
val out = Outlet[T]("FutureFlattenSource.out")
override val shape = SourceShape(out)
override def initialAttributes = DefaultAttributes.futureSource
override def createLogicAndMaterializedValue(attr: Attributes): (GraphStageLogic, Future[M]) = {
val materialized = Promise[M]()
val logic = new GraphStageLogic(shape) with InHandler with OutHandler {
private val sinkIn = new SubSinkInlet[T]("FutureFlattenSource.in")
override def preStart(): Unit = {
val cb = getAsyncCallback[Try[Graph[SourceShape[T], M]]](onFutureSourceCompleted).invoke _
future.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext)
}
// initial handler (until future completes)
setHandler(out, new OutHandler {
def onPull(): Unit = {}
override def onDownstreamFinish(): Unit = {
if (!materialized.isCompleted) {
// make sure we always yield the matval if possible, even if downstream cancelled
// before the source was materialized
val matValFuture = future.map { gr
val runnable = Source.fromGraph(gr).to(Sink.ignore)
interpreter.subFusingMaterializer.materialize(runnable, initialAttributes = attr)
}(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
materialized.completeWith(matValFuture)
}
super.onDownstreamFinish()
}
})
def onPush(): Unit = {
push(out, sinkIn.grab())
}
def onPull(): Unit = {
sinkIn.pull()
}
override def onUpstreamFinish(): Unit =
completeStage()
override def postStop(): Unit = {
if (!sinkIn.isClosed) sinkIn.cancel()
}
def onFutureSourceCompleted(result: Try[Graph[SourceShape[T], M]]): Unit = {
result.map { graph
val runnable = Source.fromGraph(graph).toMat(sinkIn.sink)(Keep.left)
val matVal = interpreter.subFusingMaterializer.materialize(runnable, initialAttributes = attr)
materialized.success(matVal)
setHandler(out, this)
sinkIn.setHandler(this)
if (isAvailable(out)) {
sinkIn.pull()
}
}.recover {
case t
sinkIn.cancel()
materialized.failure(t)
failStage(t)
}
}
}
(logic, materialized.future)
}
override def toString: String = "FutureFlattenSource"
}
final class FutureSource[T](val future: Future[T]) extends GraphStage[SourceShape[T]] { final class FutureSource[T](val future: Future[T]) extends GraphStage[SourceShape[T]] {
ReactiveStreamsCompliance.requireNonNullElement(future) ReactiveStreamsCompliance.requireNonNullElement(future)
val shape = SourceShape(Outlet[T]("future.out")) val shape = SourceShape(Outlet[T]("future.out"))

View file

@ -171,7 +171,7 @@ object Source {
new Source(scaladsl.Source.fromFuture(future)) new Source(scaladsl.Source.fromFuture(future))
/** /**
* Start a new `Source` from the given `CompletionStage`. The stream will consist of * Starts a new `Source` from the given `CompletionStage`. The stream will consist of
* one element when the `CompletionStage` is completed with a successful value, which * one element when the `CompletionStage` is completed with a successful value, which
* may happen before or after materializing the `Flow`. * may happen before or after materializing the `Flow`.
* The stream terminates with a failure if the `CompletionStage` is completed with a failure. * The stream terminates with a failure if the `CompletionStage` is completed with a failure.
@ -179,6 +179,18 @@ object Source {
def fromCompletionStage[O](future: CompletionStage[O]): javadsl.Source[O, NotUsed] = def fromCompletionStage[O](future: CompletionStage[O]): javadsl.Source[O, NotUsed] =
new Source(scaladsl.Source.fromCompletionStage(future)) new Source(scaladsl.Source.fromCompletionStage(future))
/**
* Streams the elements of the given future source once it successfully completes.
* If the future fails the stream is failed.
*/
def fromFutureSource[T, M](future: Future[Graph[SourceShape[T], M]]): javadsl.Source[T, Future[M]] = new Source(scaladsl.Source.fromFutureSource(future))
/**
* Streams the elements of an asynchronous source once its given `completion` stage completes.
* If the `completion` fails the stream is failed with that exception.
*/
def fromSourceCompletionStage[T, M](completion: CompletionStage[Graph[SourceShape[T], M]]): javadsl.Source[T, CompletionStage[M]] = new Source(scaladsl.Source.fromSourceCompletionStage(completion))
/** /**
* Elements are emitted periodically with the specified interval. * Elements are emitted periodically with the specified interval.
* The tick element will be delivered to downstream consumers that has requested any elements. * The tick element will be delivered to downstream consumers that has requested any elements.

View file

@ -212,9 +212,9 @@ object Source {
}) })
/** /**
* Create [[Source]] that will continually produce given elements in specified order. * Creates [[Source]] that will continually produce given elements in specified order.
* *
* Start a new 'cycled' `Source` from the given elements. The producer stream of elements * Starts a new 'cycled' `Source` from the given elements. The producer stream of elements
* will continue infinitely by repeating the sequence of elements provided by function parameter. * will continue infinitely by repeating the sequence of elements provided by function parameter.
*/ */
def cycle[T](f: () Iterator[T]): Source[T, NotUsed] = { def cycle[T](f: () Iterator[T]): Source[T, NotUsed] = {
@ -247,7 +247,7 @@ object Source {
single(iterable).mapConcat(ConstantFun.scalaIdentityFunction).withAttributes(DefaultAttributes.iterableSource) single(iterable).mapConcat(ConstantFun.scalaIdentityFunction).withAttributes(DefaultAttributes.iterableSource)
/** /**
* Start a new `Source` from the given `Future`. The stream will consist of * Starts a new `Source` from the given `Future`. The stream will consist of
* one element when the `Future` is completed with a successful value, which * one element when the `Future` is completed with a successful value, which
* may happen before or after materializing the `Flow`. * may happen before or after materializing the `Flow`.
* The stream terminates with a failure if the `Future` is completed with a failure. * The stream terminates with a failure if the `Future` is completed with a failure.
@ -256,7 +256,7 @@ object Source {
fromGraph(new FutureSource(future)) fromGraph(new FutureSource(future))
/** /**
* Start a new `Source` from the given `Future`. The stream will consist of * Starts a new `Source` from the given `Future`. The stream will consist of
* one element when the `Future` is completed with a successful value, which * one element when the `Future` is completed with a successful value, which
* may happen before or after materializing the `Flow`. * may happen before or after materializing the `Flow`.
* The stream terminates with a failure if the `Future` is completed with a failure. * The stream terminates with a failure if the `Future` is completed with a failure.
@ -264,6 +264,18 @@ object Source {
def fromCompletionStage[T](future: CompletionStage[T]): Source[T, NotUsed] = def fromCompletionStage[T](future: CompletionStage[T]): Source[T, NotUsed] =
fromGraph(new FutureSource(future.toScala)) fromGraph(new FutureSource(future.toScala))
/**
* Streams the elements of the given future source once it successfully completes.
* If the future fails the stream is failed.
*/
def fromFutureSource[T, M](future: Future[Graph[SourceShape[T], M]]): Source[T, Future[M]] = fromGraph(new FutureFlattenSource(future))
/**
* Streams the elements of an asynchronous source once its given `completion` stage completes.
* If the `completion` fails the stream is failed with that exception.
*/
def fromSourceCompletionStage[T, M](completion: CompletionStage[Graph[SourceShape[T], M]]): Source[T, CompletionStage[M]] = fromFutureSource(completion.toScala).mapMaterializedValue(_.toJava)
/** /**
* Elements are emitted periodically with the specified interval. * Elements are emitted periodically with the specified interval.
* The tick element will be delivered to downstream consumers that has requested any elements. * The tick element will be delivered to downstream consumers that has requested any elements.