From e9694edaab8aa2d8ccf294dae55242498087c268 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Sun, 19 Mar 2017 16:36:09 +0800 Subject: [PATCH] =str #22602 short path for Source.fromFuture and fromFutureSource and their alias --- .../scaladsl/FutureFlattenSourceSpec.scala | 34 ++++++++++++++-- .../akka/stream/impl/fusing/GraphStages.scala | 40 ++++++++++++------- 2 files changed, 57 insertions(+), 17 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSourceSpec.scala index 810150a2dd..40d25cfd68 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FutureFlattenSourceSpec.scala @@ -5,12 +5,12 @@ package akka.stream.scaladsl import java.util.concurrent.{ CompletableFuture, TimeUnit } +import akka.Done import akka.stream._ -import akka.stream.stage.{ GraphStage, GraphStageLogic, GraphStageWithMaterializedValue } +import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue } import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped } import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } import akka.testkit.TestLatch -import akka.{ Done, NotUsed } import scala.concurrent.{ Await, Future, Promise } @@ -24,6 +24,35 @@ class FutureFlattenSourceSpec extends StreamSpec { val underlying: Source[Int, String] = Source(List(1, 2, 3)).mapMaterializedValue(_ ⇒ "foo") + "emit the elements of the already successful future source" in assertAllStagesStopped { + val (sourceMatVal, sinkMatVal) = + Source.fromFutureSource(Future.successful(underlying)) + .toMat(Sink.seq)(Keep.both) + .run() + + // should complete as soon as inner source has been materialized + sourceMatVal.futureValue should ===("foo") + sinkMatVal.futureValue should ===(List(1, 2, 3)) + } + + "emit no elements before the future of source successful" in assertAllStagesStopped { + val c = TestSubscriber.manualProbe[Int]() + val sourcePromise = Promise[Source[Int, String]]() + val p = Source.fromFutureSource(sourcePromise.future) + .runWith(Sink.asPublisher(true)) + .subscribe(c) + val sub = c.expectSubscription() + import scala.concurrent.duration._ + c.expectNoMsg(100.millis) + sub.request(3) + c.expectNoMsg(100.millis) + sourcePromise.success(underlying) + c.expectNext(1) + c.expectNext(2) + c.expectNext(3) + c.expectComplete() + } + "emit the elements of the future source" in assertAllStagesStopped { val sourcePromise = Promise[Source[Int, String]]() @@ -31,7 +60,6 @@ class FutureFlattenSourceSpec extends StreamSpec { Source.fromFutureSource(sourcePromise.future) .toMat(Sink.seq)(Keep.both) .run() - sourcePromise.success(underlying) // should complete as soon as inner source has been materialized sourceMatVal.futureValue should ===("foo") diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 5d9f43b466..035f950828 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -3,25 +3,24 @@ */ package akka.stream.impl.fusing -import akka.Done import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference } +import akka.Done import akka.actor.Cancellable import akka.annotation.InternalApi import akka.dispatch.ExecutionContexts import akka.event.Logging import akka.stream.FlowMonitorState._ -import akka.stream.{ Shape, _ } -import akka.stream.scaladsl._ import akka.stream.impl.Stages.DefaultAttributes -import akka.stream.stage._ - -import scala.concurrent.{ Future, Promise } -import scala.concurrent.duration.FiniteDuration import akka.stream.impl.StreamLayout._ import akka.stream.impl.{ LinearTraversalBuilder, ReactiveStreamsCompliance } +import akka.stream.scaladsl._ +import akka.stream.stage._ +import akka.stream.{ Shape, _ } import scala.annotation.unchecked.uncheckedVariance +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ Future, Promise } import scala.util.Try /** @@ -295,8 +294,12 @@ import scala.util.Try private val sinkIn = new SubSinkInlet[T]("FutureFlattenSource.in") override def preStart(): Unit = { - val cb = getAsyncCallback[Try[Graph[SourceShape[T], M]]](onFutureSourceCompleted).invoke _ - future.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext) + if (future.isCompleted) { + onFutureSourceCompleted(future.value.get) + } else { + val cb = getAsyncCallback[Try[Graph[SourceShape[T], M]]](onFutureSourceCompleted).invoke _ + future.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext) + } } // initial handler (until future completes) @@ -368,11 +371,20 @@ import scala.util.Try override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with OutHandler { def onPull(): Unit = { - val cb = getAsyncCallback[Try[T]] { - case scala.util.Success(v) ⇒ emit(out, v, () ⇒ completeStage()) - case scala.util.Failure(t) ⇒ failStage(t) - }.invoke _ - future.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext) + if (future.isCompleted) { + onFutureCompleted(future.value.get) + } else { + val cb = getAsyncCallback[Try[T]](onFutureCompleted).invoke _ + future.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext) + } + + def onFutureCompleted(result: Try[T]): Unit = { + result match { + case scala.util.Success(v) ⇒ emit(out, v, () ⇒ completeStage()) + case scala.util.Failure(t) ⇒ failStage(t) + } + } + setHandler(out, eagerTerminateOutput) // After first pull we won't produce anything more }