=str - 16503 - Implements a dedicated FutureSource

Avoids the overhead of
`Source.single(future).mapAsyncUnordered(1)(identity).withAttr…`

Fixes #16503
This commit is contained in:
Viktor Klang 2016-01-06 13:01:37 +01:00
parent 1ac9b4eafb
commit 59bff70c87
2 changed files with 24 additions and 1 deletions

View file

@ -8,12 +8,15 @@ import akka.actor.Cancellable
import akka.dispatch.ExecutionContexts
import akka.event.Logging
import akka.stream._
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.stage._
import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration.FiniteDuration
import akka.stream.impl.StreamLayout._
import akka.stream.impl.ReactiveStreamsCompliance
import scala.util.Try
/**
* INTERNAL API
*/
@ -197,4 +200,24 @@ object GraphStages {
}
override def toString: String = s"SingleSource($elem)"
}
private[stream] final class FutureSource[T](val future: Future[T]) extends GraphStage[SourceShape[T]] {
ReactiveStreamsCompliance.requireNonNullElement(future)
val shape = SourceShape(Outlet[T]("future.out"))
val out = shape.out
override def initialAttributes: Attributes = DefaultAttributes.futureSource
override def createLogic(attr: Attributes) = new GraphStageLogic(shape) {
setHandler(out, new OutHandler {
override def onPull(): Unit = {
val cb = getAsyncCallback[Try[T]] {
case scala.util.Success(v) emit(out, v, () completeStage())
case scala.util.Failure(t) failStage(t)
}.invoke _
future.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext)
setHandler(out, eagerTerminateOutput) // After first pull we won't produce anything more
}
})
}
override def toString: String = "FutureSource"
}
}

View file

@ -215,7 +215,7 @@ object Source {
* The stream terminates with a failure if the `Future` is completed with a failure.
*/
def fromFuture[T](future: Future[T]): Source[T, Unit] =
single(future).mapAsyncUnordered(1)(ConstantFun.scalaIdentityFunction).withAttributes(DefaultAttributes.futureSource)
fromGraph(new FutureSource(future))
/**
* Elements are emitted periodically with the specified interval.