Merge pull request #22603 from hepin1989/master
= str #22602 short path for Source.fromFuture and fromFutureSource an…
This commit is contained in:
commit
2bdb6eccd9
2 changed files with 57 additions and 17 deletions
|
|
@ -5,12 +5,12 @@ package akka.stream.scaladsl
|
|||
|
||||
import java.util.concurrent.{ CompletableFuture, TimeUnit }
|
||||
|
||||
import akka.Done
|
||||
import akka.stream._
|
||||
import akka.stream.stage.{ GraphStage, GraphStageLogic, GraphStageWithMaterializedValue }
|
||||
import akka.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue }
|
||||
import akka.stream.testkit.Utils.{ TE, assertAllStagesStopped }
|
||||
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
|
||||
import akka.testkit.TestLatch
|
||||
import akka.{ Done, NotUsed }
|
||||
|
||||
import scala.concurrent.{ Await, Future, Promise }
|
||||
|
||||
|
|
@ -24,6 +24,35 @@ class FutureFlattenSourceSpec extends StreamSpec {
|
|||
val underlying: Source[Int, String] =
|
||||
Source(List(1, 2, 3)).mapMaterializedValue(_ ⇒ "foo")
|
||||
|
||||
"emit the elements of the already successful future source" in assertAllStagesStopped {
|
||||
val (sourceMatVal, sinkMatVal) =
|
||||
Source.fromFutureSource(Future.successful(underlying))
|
||||
.toMat(Sink.seq)(Keep.both)
|
||||
.run()
|
||||
|
||||
// should complete as soon as inner source has been materialized
|
||||
sourceMatVal.futureValue should ===("foo")
|
||||
sinkMatVal.futureValue should ===(List(1, 2, 3))
|
||||
}
|
||||
|
||||
"emit no elements before the future of source successful" in assertAllStagesStopped {
|
||||
val c = TestSubscriber.manualProbe[Int]()
|
||||
val sourcePromise = Promise[Source[Int, String]]()
|
||||
val p = Source.fromFutureSource(sourcePromise.future)
|
||||
.runWith(Sink.asPublisher(true))
|
||||
.subscribe(c)
|
||||
val sub = c.expectSubscription()
|
||||
import scala.concurrent.duration._
|
||||
c.expectNoMsg(100.millis)
|
||||
sub.request(3)
|
||||
c.expectNoMsg(100.millis)
|
||||
sourcePromise.success(underlying)
|
||||
c.expectNext(1)
|
||||
c.expectNext(2)
|
||||
c.expectNext(3)
|
||||
c.expectComplete()
|
||||
}
|
||||
|
||||
"emit the elements of the future source" in assertAllStagesStopped {
|
||||
|
||||
val sourcePromise = Promise[Source[Int, String]]()
|
||||
|
|
@ -31,7 +60,6 @@ class FutureFlattenSourceSpec extends StreamSpec {
|
|||
Source.fromFutureSource(sourcePromise.future)
|
||||
.toMat(Sink.seq)(Keep.both)
|
||||
.run()
|
||||
|
||||
sourcePromise.success(underlying)
|
||||
// should complete as soon as inner source has been materialized
|
||||
sourceMatVal.futureValue should ===("foo")
|
||||
|
|
|
|||
|
|
@ -3,25 +3,24 @@
|
|||
*/
|
||||
package akka.stream.impl.fusing
|
||||
|
||||
import akka.Done
|
||||
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference }
|
||||
|
||||
import akka.Done
|
||||
import akka.actor.Cancellable
|
||||
import akka.annotation.InternalApi
|
||||
import akka.dispatch.ExecutionContexts
|
||||
import akka.event.Logging
|
||||
import akka.stream.FlowMonitorState._
|
||||
import akka.stream.{ Shape, _ }
|
||||
import akka.stream.scaladsl._
|
||||
import akka.stream.impl.Stages.DefaultAttributes
|
||||
import akka.stream.stage._
|
||||
|
||||
import scala.concurrent.{ Future, Promise }
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import akka.stream.impl.StreamLayout._
|
||||
import akka.stream.impl.{ LinearTraversalBuilder, ReactiveStreamsCompliance }
|
||||
import akka.stream.scaladsl._
|
||||
import akka.stream.stage._
|
||||
import akka.stream.{ Shape, _ }
|
||||
|
||||
import scala.annotation.unchecked.uncheckedVariance
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.concurrent.{ Future, Promise }
|
||||
import scala.util.Try
|
||||
|
||||
/**
|
||||
|
|
@ -295,8 +294,12 @@ import scala.util.Try
|
|||
private val sinkIn = new SubSinkInlet[T]("FutureFlattenSource.in")
|
||||
|
||||
override def preStart(): Unit = {
|
||||
val cb = getAsyncCallback[Try[Graph[SourceShape[T], M]]](onFutureSourceCompleted).invoke _
|
||||
future.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext)
|
||||
if (future.isCompleted) {
|
||||
onFutureSourceCompleted(future.value.get)
|
||||
} else {
|
||||
val cb = getAsyncCallback[Try[Graph[SourceShape[T], M]]](onFutureSourceCompleted).invoke _
|
||||
future.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext)
|
||||
}
|
||||
}
|
||||
|
||||
// initial handler (until future completes)
|
||||
|
|
@ -368,11 +371,20 @@ import scala.util.Try
|
|||
override def createLogic(attr: Attributes) =
|
||||
new GraphStageLogic(shape) with OutHandler {
|
||||
def onPull(): Unit = {
|
||||
val cb = getAsyncCallback[Try[T]] {
|
||||
case scala.util.Success(v) ⇒ emit(out, v, () ⇒ completeStage())
|
||||
case scala.util.Failure(t) ⇒ failStage(t)
|
||||
}.invoke _
|
||||
future.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext)
|
||||
if (future.isCompleted) {
|
||||
onFutureCompleted(future.value.get)
|
||||
} else {
|
||||
val cb = getAsyncCallback[Try[T]](onFutureCompleted).invoke _
|
||||
future.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext)
|
||||
}
|
||||
|
||||
def onFutureCompleted(result: Try[T]): Unit = {
|
||||
result match {
|
||||
case scala.util.Success(v) ⇒ emit(out, v, () ⇒ completeStage())
|
||||
case scala.util.Failure(t) ⇒ failStage(t)
|
||||
}
|
||||
}
|
||||
|
||||
setHandler(out, eagerTerminateOutput) // After first pull we won't produce anything more
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue