From 76647b34bcb8ac3126acad247d8d1b5f8788f262 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 29 Apr 2014 15:16:05 +0200 Subject: [PATCH] +str #15071 Add Flow.apply from a Future --- .../impl/ActorBasedFlowMaterializer.scala | 14 ++ .../akka/stream/impl/EmptyProducer.scala | 16 ++- .../akka/stream/impl/FutureProducer.scala | 135 ++++++++++++++++++ .../scala/akka/stream/scaladsl/Flow.scala | 9 ++ .../akka/stream/FlowFromFutureSpec.scala | 124 ++++++++++++++++ 5 files changed, 297 insertions(+), 1 deletion(-) create mode 100644 akka-stream/src/main/scala/akka/stream/impl/FutureProducer.scala create mode 100644 akka-stream/src/test/scala/akka/stream/FlowFromFutureSpec.scala diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index 6b9ef7faa8..68eb2b85d0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -12,6 +12,9 @@ import akka.stream.{ MaterializerSettings, FlowMaterializer } import akka.stream.scaladsl.Transformer import akka.stream.scaladsl.RecoveryTransformer import scala.util.Try +import scala.concurrent.Future +import scala.util.Success +import scala.util.Failure /** * INTERNAL API @@ -49,6 +52,17 @@ private[akka] object Ast { def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[I] = new ActorProducer(context.actorOf(ActorProducer.props(settings, f))) } + case class FutureProducerNode[I](future: Future[I]) extends ProducerNode[I] { + def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[I] = + future.value match { + case Some(Success(element)) ⇒ + new ActorProducer[I](context.actorOf(IterableProducer.props(List(element), settings))) + case Some(Failure(t)) ⇒ + new ErrorProducer(t).asInstanceOf[Producer[I]] + case None ⇒ + new ActorProducer[I](context.actorOf(FutureProducer.props(future, settings))) + } + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/EmptyProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/EmptyProducer.scala index 7fa7998c40..26b254393b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/EmptyProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/EmptyProducer.scala @@ -20,4 +20,18 @@ private[akka] object EmptyProducer extends Producer[Nothing] with Publisher[Noth def produceTo(consumer: Consumer[Nothing]): Unit = getPublisher.subscribe(consumer.getSubscriber) -} \ No newline at end of file +} + +/** + * INTERNAL API + */ +private[akka] class ErrorProducer(t: Throwable) extends Producer[Nothing] with Publisher[Nothing] { + def getPublisher: Publisher[Nothing] = this + + def subscribe(subscriber: Subscriber[Nothing]): Unit = + subscriber.onError(t) + + def produceTo(consumer: Consumer[Nothing]): Unit = + getPublisher.subscribe(consumer.getSubscriber) + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/FutureProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/FutureProducer.scala new file mode 100644 index 0000000000..6bbd7e904f --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/FutureProducer.scala @@ -0,0 +1,135 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl + +import scala.concurrent.Future +import scala.concurrent.duration.Duration +import scala.util.Failure +import scala.util.Success +import scala.util.Try +import org.reactivestreams.spi.Subscriber +import org.reactivestreams.spi.Subscription +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.actor.Status +import akka.actor.SupervisorStrategy +import akka.pattern.pipe +import akka.stream.MaterializerSettings + +/** + * INTERNAL API + */ +private[akka] object FutureProducer { + def props(future: Future[Any], settings: MaterializerSettings): Props = + Props(new FutureProducer(future, settings)) + + object FutureSubscription { + case class Cancel(subscription: FutureSubscription) + case class RequestMore(subscription: FutureSubscription) + } + + class FutureSubscription(ref: ActorRef) extends Subscription { + import FutureSubscription._ + def cancel(): Unit = ref ! Cancel(this) + def requestMore(elements: Int): Unit = + if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0") + else ref ! RequestMore(this) + override def toString = "FutureSubscription" + } +} + +/** + * INTERNAL API + */ +private[akka] class FutureProducer(future: Future[Any], settings: MaterializerSettings) extends Actor with SoftShutdown { + import FutureProducer.FutureSubscription + import FutureProducer.FutureSubscription.Cancel + import FutureProducer.FutureSubscription.RequestMore + + var exposedPublisher: ActorPublisher[Any] = _ + var subscribers = Map.empty[Subscriber[Any], FutureSubscription] + var subscriptions = Map.empty[FutureSubscription, Subscriber[Any]] + var subscriptionsReadyForPush = Set.empty[FutureSubscription] + var futureValue: Option[Try[Any]] = future.value + var shutdownReason = ActorPublisher.NormalShutdownReason + + override val supervisorStrategy = SupervisorStrategy.stoppingStrategy + + def receive = { + case ExposedPublisher(publisher) ⇒ + exposedPublisher = publisher + context.setReceiveTimeout(settings.downstreamSubscriptionTimeout) + context.become(waitingForFirstSubscriber) + case _ ⇒ throw new IllegalStateException("The first message must be ExposedPublisher") + } + + def waitingForFirstSubscriber: Receive = { + case SubscribePending ⇒ + exposedPublisher.takePendingSubscribers() foreach registerSubscriber + context.setReceiveTimeout(Duration.Undefined) + import context.dispatcher + future.pipeTo(self) + context.become(active) + } + + def active: Receive = { + case SubscribePending ⇒ + exposedPublisher.takePendingSubscribers() foreach registerSubscriber + case RequestMore(subscription) ⇒ + if (subscriptions.contains(subscription)) { + subscriptionsReadyForPush += subscription + push(subscriptions(subscription)) + } + case Cancel(subscription) if subscriptions.contains(subscription) ⇒ + removeSubscriber(subscriptions(subscription)) + case Status.Failure(ex) ⇒ + futureValue = Some(Failure(ex)) + pushToAll() + case value ⇒ + futureValue = Some(Success(value)) + pushToAll() + } + + def pushToAll(): Unit = subscriptionsReadyForPush foreach { subscription ⇒ push(subscriptions(subscription)) } + + def push(subscriber: Subscriber[Any]): Unit = futureValue match { + case Some(Success(value)) ⇒ + subscriber.onNext(value) + subscriber.onComplete() + removeSubscriber(subscriber) + case Some(Failure(t)) ⇒ + subscriber.onError(t) + removeSubscriber(subscriber) + case None ⇒ // not completed yet + } + + def registerSubscriber(subscriber: Subscriber[Any]): Unit = { + if (subscribers.contains(subscriber)) + subscriber.onError(new IllegalStateException(s"Cannot subscribe $subscriber twice")) + else { + val subscription = new FutureSubscription(self) + subscribers = subscribers.updated(subscriber, subscription) + subscriptions = subscriptions.updated(subscription, subscriber) + subscriber.onSubscribe(subscription) + } + } + + def removeSubscriber(subscriber: Subscriber[Any]): Unit = { + val subscription = subscribers(subscriber) + subscriptions -= subscription + subscriptionsReadyForPush -= subscription + subscribers -= subscriber + if (subscribers.isEmpty) { + exposedPublisher.shutdown(shutdownReason) + softShutdown() + } + } + + override def postStop(): Unit = + if (exposedPublisher ne null) + exposedPublisher.shutdown(shutdownReason) + +} + diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index c2e4b8688c..aed66977fe 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -13,6 +13,7 @@ import org.reactivestreams.api.Producer import akka.stream.FlowMaterializer import akka.stream.impl.Ast.{ ExistingProducer, IterableProducerNode, IteratorProducerNode, ThunkProducerNode } import akka.stream.impl.FlowImpl +import akka.stream.impl.Ast.FutureProducerNode object Flow { /** @@ -48,6 +49,14 @@ object Flow { */ def apply[T](f: () ⇒ T): Flow[T] = FlowImpl(ThunkProducerNode(f), Nil) + /** + * Start a new flow from the given `Future`. The stream will consist of + * one element when the `Future` is completed with a successful value, which + * may happen before or after materializing the `Flow`. + * The stream terminates with an error if the `Future` is completed with a failure. + */ + def apply[T](future: Future[T]): Flow[T] = FlowImpl(FutureProducerNode(future), Nil) + } /** diff --git a/akka-stream/src/test/scala/akka/stream/FlowFromFutureSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowFromFutureSpec.scala new file mode 100644 index 0000000000..aef054d824 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/FlowFromFutureSpec.scala @@ -0,0 +1,124 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import scala.concurrent.duration._ +import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.OnNext +import akka.dispatch.OnComplete +import akka.stream.testkit.OnComplete +import akka.stream.testkit.OnError +import akka.stream.testkit.OnSubscribe +import akka.stream.scaladsl.Flow +import scala.concurrent.Future +import scala.concurrent.Promise + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class FlowFromFutureSpec extends AkkaSpec { + + val materializer = FlowMaterializer(MaterializerSettings()) + + "A Flow based on a Future" must { + "produce one element from already successful Future" in { + val p = Flow(Future.successful(1)).toProducer(materializer) + val c = StreamTestKit.consumerProbe[Int] + p.produceTo(c) + val sub = c.expectSubscription() + c.expectNoMsg(100.millis) + sub.requestMore(1) + c.expectNext(1) + c.expectComplete() + } + + "produce error from already failed Future" in { + val ex = new RuntimeException("test") + val p = Flow(Future.failed[Int](ex)).toProducer(materializer) + val c = StreamTestKit.consumerProbe[Int] + p.produceTo(c) + c.expectError(ex) + } + + "produce one element when Future is completed" in { + val promise = Promise[Int]() + val p = Flow(promise.future).toProducer(materializer) + val c = StreamTestKit.consumerProbe[Int] + p.produceTo(c) + val sub = c.expectSubscription() + sub.requestMore(1) + c.expectNoMsg(100.millis) + promise.success(1) + c.expectNext(1) + c.expectComplete() + c.expectNoMsg(100.millis) + } + + "produce one element when Future is completed but not before request" in { + val promise = Promise[Int]() + val p = Flow(promise.future).toProducer(materializer) + val c = StreamTestKit.consumerProbe[Int] + p.produceTo(c) + val sub = c.expectSubscription() + promise.success(1) + c.expectNoMsg(200.millis) + sub.requestMore(1) + c.expectNext(1) + c.expectComplete() + } + + "produce elements with multiple subscribers" in { + val promise = Promise[Int]() + val p = Flow(promise.future).toProducer(materializer) + val c1 = StreamTestKit.consumerProbe[Int] + val c2 = StreamTestKit.consumerProbe[Int] + p.produceTo(c1) + p.produceTo(c2) + val sub1 = c1.expectSubscription() + val sub2 = c2.expectSubscription() + sub1.requestMore(1) + promise.success(1) + sub2.requestMore(2) + c1.expectNext(1) + c2.expectNext(1) + c1.expectComplete() + c2.expectComplete() + } + + "produce elements to later subscriber" in { + val promise = Promise[Int]() + val p = Flow(promise.future).toProducer(materializer) + val keepAlive = StreamTestKit.consumerProbe[Int] + val c1 = StreamTestKit.consumerProbe[Int] + val c2 = StreamTestKit.consumerProbe[Int] + p.produceTo(keepAlive) + p.produceTo(c1) + + val sub1 = c1.expectSubscription() + sub1.requestMore(1) + promise.success(1) + c1.expectNext(1) + c1.expectComplete() + p.produceTo(c2) + val sub2 = c2.expectSubscription() + sub2.requestMore(1) + c2.expectNext(1) + c2.expectComplete() + } + + "allow cancel before receiving element" in { + val promise = Promise[Int]() + val p = Flow(promise.future).toProducer(materializer) + val keepAlive = StreamTestKit.consumerProbe[Int] + val c = StreamTestKit.consumerProbe[Int] + p.produceTo(keepAlive) + p.produceTo(c) + val sub = c.expectSubscription() + sub.requestMore(1) + sub.cancel() + c.expectNoMsg(500.millis) + promise.success(1) + c.expectNoMsg(200.millis) + } + } +} \ No newline at end of file