+str #15071 Add Flow.apply from a Future

This commit is contained in:
Patrik Nordwall 2014-04-29 15:16:05 +02:00
parent 042ecd00b6
commit 76647b34bc
5 changed files with 297 additions and 1 deletions

View file

@ -12,6 +12,9 @@ import akka.stream.{ MaterializerSettings, FlowMaterializer }
import akka.stream.scaladsl.Transformer
import akka.stream.scaladsl.RecoveryTransformer
import scala.util.Try
import scala.concurrent.Future
import scala.util.Success
import scala.util.Failure
/**
* INTERNAL API
@ -49,6 +52,17 @@ private[akka] object Ast {
def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[I] =
new ActorProducer(context.actorOf(ActorProducer.props(settings, f)))
}
case class FutureProducerNode[I](future: Future[I]) extends ProducerNode[I] {
def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[I] =
future.value match {
case Some(Success(element))
new ActorProducer[I](context.actorOf(IterableProducer.props(List(element), settings)))
case Some(Failure(t))
new ErrorProducer(t).asInstanceOf[Producer[I]]
case None
new ActorProducer[I](context.actorOf(FutureProducer.props(future, settings)))
}
}
}
/**

View file

@ -20,4 +20,18 @@ private[akka] object EmptyProducer extends Producer[Nothing] with Publisher[Noth
def produceTo(consumer: Consumer[Nothing]): Unit =
getPublisher.subscribe(consumer.getSubscriber)
}
}
/**
* INTERNAL API
*/
private[akka] class ErrorProducer(t: Throwable) extends Producer[Nothing] with Publisher[Nothing] {
def getPublisher: Publisher[Nothing] = this
def subscribe(subscriber: Subscriber[Nothing]): Unit =
subscriber.onError(t)
def produceTo(consumer: Consumer[Nothing]): Unit =
getPublisher.subscribe(consumer.getSubscriber)
}

View file

@ -0,0 +1,135 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import org.reactivestreams.spi.Subscriber
import org.reactivestreams.spi.Subscription
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.Status
import akka.actor.SupervisorStrategy
import akka.pattern.pipe
import akka.stream.MaterializerSettings
/**
* INTERNAL API
*/
private[akka] object FutureProducer {
def props(future: Future[Any], settings: MaterializerSettings): Props =
Props(new FutureProducer(future, settings))
object FutureSubscription {
case class Cancel(subscription: FutureSubscription)
case class RequestMore(subscription: FutureSubscription)
}
class FutureSubscription(ref: ActorRef) extends Subscription {
import FutureSubscription._
def cancel(): Unit = ref ! Cancel(this)
def requestMore(elements: Int): Unit =
if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0")
else ref ! RequestMore(this)
override def toString = "FutureSubscription"
}
}
/**
* INTERNAL API
*/
private[akka] class FutureProducer(future: Future[Any], settings: MaterializerSettings) extends Actor with SoftShutdown {
import FutureProducer.FutureSubscription
import FutureProducer.FutureSubscription.Cancel
import FutureProducer.FutureSubscription.RequestMore
var exposedPublisher: ActorPublisher[Any] = _
var subscribers = Map.empty[Subscriber[Any], FutureSubscription]
var subscriptions = Map.empty[FutureSubscription, Subscriber[Any]]
var subscriptionsReadyForPush = Set.empty[FutureSubscription]
var futureValue: Option[Try[Any]] = future.value
var shutdownReason = ActorPublisher.NormalShutdownReason
override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
def receive = {
case ExposedPublisher(publisher)
exposedPublisher = publisher
context.setReceiveTimeout(settings.downstreamSubscriptionTimeout)
context.become(waitingForFirstSubscriber)
case _ throw new IllegalStateException("The first message must be ExposedPublisher")
}
def waitingForFirstSubscriber: Receive = {
case SubscribePending
exposedPublisher.takePendingSubscribers() foreach registerSubscriber
context.setReceiveTimeout(Duration.Undefined)
import context.dispatcher
future.pipeTo(self)
context.become(active)
}
def active: Receive = {
case SubscribePending
exposedPublisher.takePendingSubscribers() foreach registerSubscriber
case RequestMore(subscription)
if (subscriptions.contains(subscription)) {
subscriptionsReadyForPush += subscription
push(subscriptions(subscription))
}
case Cancel(subscription) if subscriptions.contains(subscription)
removeSubscriber(subscriptions(subscription))
case Status.Failure(ex)
futureValue = Some(Failure(ex))
pushToAll()
case value
futureValue = Some(Success(value))
pushToAll()
}
def pushToAll(): Unit = subscriptionsReadyForPush foreach { subscription push(subscriptions(subscription)) }
def push(subscriber: Subscriber[Any]): Unit = futureValue match {
case Some(Success(value))
subscriber.onNext(value)
subscriber.onComplete()
removeSubscriber(subscriber)
case Some(Failure(t))
subscriber.onError(t)
removeSubscriber(subscriber)
case None // not completed yet
}
def registerSubscriber(subscriber: Subscriber[Any]): Unit = {
if (subscribers.contains(subscriber))
subscriber.onError(new IllegalStateException(s"Cannot subscribe $subscriber twice"))
else {
val subscription = new FutureSubscription(self)
subscribers = subscribers.updated(subscriber, subscription)
subscriptions = subscriptions.updated(subscription, subscriber)
subscriber.onSubscribe(subscription)
}
}
def removeSubscriber(subscriber: Subscriber[Any]): Unit = {
val subscription = subscribers(subscriber)
subscriptions -= subscription
subscriptionsReadyForPush -= subscription
subscribers -= subscriber
if (subscribers.isEmpty) {
exposedPublisher.shutdown(shutdownReason)
softShutdown()
}
}
override def postStop(): Unit =
if (exposedPublisher ne null)
exposedPublisher.shutdown(shutdownReason)
}

View file

@ -13,6 +13,7 @@ import org.reactivestreams.api.Producer
import akka.stream.FlowMaterializer
import akka.stream.impl.Ast.{ ExistingProducer, IterableProducerNode, IteratorProducerNode, ThunkProducerNode }
import akka.stream.impl.FlowImpl
import akka.stream.impl.Ast.FutureProducerNode
object Flow {
/**
@ -48,6 +49,14 @@ object Flow {
*/
def apply[T](f: () T): Flow[T] = FlowImpl(ThunkProducerNode(f), Nil)
/**
* Start a new flow from the given `Future`. The stream will consist of
* one element when the `Future` is completed with a successful value, which
* may happen before or after materializing the `Flow`.
* The stream terminates with an error if the `Future` is completed with a failure.
*/
def apply[T](future: Future[T]): Flow[T] = FlowImpl(FutureProducerNode(future), Nil)
}
/**

View file

@ -0,0 +1,124 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import scala.concurrent.duration._
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.OnNext
import akka.dispatch.OnComplete
import akka.stream.testkit.OnComplete
import akka.stream.testkit.OnError
import akka.stream.testkit.OnSubscribe
import akka.stream.scaladsl.Flow
import scala.concurrent.Future
import scala.concurrent.Promise
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FlowFromFutureSpec extends AkkaSpec {
val materializer = FlowMaterializer(MaterializerSettings())
"A Flow based on a Future" must {
"produce one element from already successful Future" in {
val p = Flow(Future.successful(1)).toProducer(materializer)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
val sub = c.expectSubscription()
c.expectNoMsg(100.millis)
sub.requestMore(1)
c.expectNext(1)
c.expectComplete()
}
"produce error from already failed Future" in {
val ex = new RuntimeException("test")
val p = Flow(Future.failed[Int](ex)).toProducer(materializer)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
c.expectError(ex)
}
"produce one element when Future is completed" in {
val promise = Promise[Int]()
val p = Flow(promise.future).toProducer(materializer)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
val sub = c.expectSubscription()
sub.requestMore(1)
c.expectNoMsg(100.millis)
promise.success(1)
c.expectNext(1)
c.expectComplete()
c.expectNoMsg(100.millis)
}
"produce one element when Future is completed but not before request" in {
val promise = Promise[Int]()
val p = Flow(promise.future).toProducer(materializer)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
val sub = c.expectSubscription()
promise.success(1)
c.expectNoMsg(200.millis)
sub.requestMore(1)
c.expectNext(1)
c.expectComplete()
}
"produce elements with multiple subscribers" in {
val promise = Promise[Int]()
val p = Flow(promise.future).toProducer(materializer)
val c1 = StreamTestKit.consumerProbe[Int]
val c2 = StreamTestKit.consumerProbe[Int]
p.produceTo(c1)
p.produceTo(c2)
val sub1 = c1.expectSubscription()
val sub2 = c2.expectSubscription()
sub1.requestMore(1)
promise.success(1)
sub2.requestMore(2)
c1.expectNext(1)
c2.expectNext(1)
c1.expectComplete()
c2.expectComplete()
}
"produce elements to later subscriber" in {
val promise = Promise[Int]()
val p = Flow(promise.future).toProducer(materializer)
val keepAlive = StreamTestKit.consumerProbe[Int]
val c1 = StreamTestKit.consumerProbe[Int]
val c2 = StreamTestKit.consumerProbe[Int]
p.produceTo(keepAlive)
p.produceTo(c1)
val sub1 = c1.expectSubscription()
sub1.requestMore(1)
promise.success(1)
c1.expectNext(1)
c1.expectComplete()
p.produceTo(c2)
val sub2 = c2.expectSubscription()
sub2.requestMore(1)
c2.expectNext(1)
c2.expectComplete()
}
"allow cancel before receiving element" in {
val promise = Promise[Int]()
val p = Flow(promise.future).toProducer(materializer)
val keepAlive = StreamTestKit.consumerProbe[Int]
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(keepAlive)
p.produceTo(c)
val sub = c.expectSubscription()
sub.requestMore(1)
sub.cancel()
c.expectNoMsg(500.millis)
promise.success(1)
c.expectNoMsg(200.millis)
}
}
}