+str - Deletes FuturePublisher and replaces it with Source.single(f).mapAsync(1)(identity)

This commit is contained in:
Viktor Klang 2015-06-06 14:13:26 +02:00
parent 8527e0347e
commit 79b6de1558
5 changed files with 8 additions and 184 deletions

View file

@ -87,31 +87,6 @@ private[akka] final class PublisherSource[Out](p: Publisher[Out], val attributes
override def withAttributes(attr: OperationAttributes): Module = new PublisherSource[Out](p, attr, amendShape(attr))
}
/**
* INTERNAL API
* Start a new `Source` from the given `Future`. The stream will consist of
* one element when the `Future` is completed with a successful value, which
* may happen before or after materializing the `Flow`.
* The stream terminates with an error if the `Future` is completed with a failure.
*/
private[akka] final class FutureSource[Out](future: Future[Out], val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Unit](shape) {
override def create(context: MaterializationContext) =
future.value match {
case Some(Success(element))
(SynchronousIterablePublisher(List(element), context.stageName), ()) // Option is not Iterable. sigh
case Some(Failure(t))
(ErrorPublisher(t, context.stageName).asInstanceOf[Publisher[Out]], ())
case None
val actorMaterializer = ActorFlowMaterializer.downcast(context.materializer)
val effectiveSettings = actorMaterializer.effectiveSettings(context.effectiveAttributes)
(ActorPublisher[Out](actorMaterializer.actorOf(context,
FuturePublisher.props(future, effectiveSettings))), ()) // FIXME this does not need to be an actor
}
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Unit] = new FutureSource(future, attributes, shape)
override def withAttributes(attr: OperationAttributes): Module = new FutureSource(future, attr, amendShape(attr))
}
/**
* INTERNAL API
*/