diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 878cb822de..ea70aaf273 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -8,12 +8,15 @@ import akka.actor.Cancellable import akka.dispatch.ExecutionContexts import akka.event.Logging import akka.stream._ +import akka.stream.impl.Stages.DefaultAttributes import akka.stream.stage._ import scala.concurrent.{ Future, Promise } import scala.concurrent.duration.FiniteDuration import akka.stream.impl.StreamLayout._ import akka.stream.impl.ReactiveStreamsCompliance +import scala.util.Try + /** * INTERNAL API */ @@ -197,4 +200,24 @@ object GraphStages { } override def toString: String = s"SingleSource($elem)" } + + private[stream] final class FutureSource[T](val future: Future[T]) extends GraphStage[SourceShape[T]] { + ReactiveStreamsCompliance.requireNonNullElement(future) + val shape = SourceShape(Outlet[T]("future.out")) + val out = shape.out + override def initialAttributes: Attributes = DefaultAttributes.futureSource + override def createLogic(attr: Attributes) = new GraphStageLogic(shape) { + setHandler(out, new OutHandler { + override def onPull(): Unit = { + val cb = getAsyncCallback[Try[T]] { + case scala.util.Success(v) ⇒ emit(out, v, () ⇒ completeStage()) + case scala.util.Failure(t) ⇒ failStage(t) + }.invoke _ + future.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext) + setHandler(out, eagerTerminateOutput) // After first pull we won't produce anything more + } + }) + } + override def toString: String = "FutureSource" + } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 82586f5b99..27ee565474 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -215,7 +215,7 @@ object Source { * The stream terminates with a failure if the `Future` is completed with a failure. */ def fromFuture[T](future: Future[T]): Source[T, Unit] = - single(future).mapAsyncUnordered(1)(ConstantFun.scalaIdentityFunction).withAttributes(DefaultAttributes.futureSource) + fromGraph(new FutureSource(future)) /** * Elements are emitted periodically with the specified interval.