diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala index 935d76bc44..c80a648861 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala @@ -20,9 +20,8 @@ class FlowFromFutureSpec extends AkkaSpec { "A Flow based on a Future" must { "produce one element from already successful Future" in assertAllStagesStopped { - val p = Source(Future.successful(1)).runWith(Sink.publisher) val c = TestSubscriber.manualProbe[Int]() - p.subscribe(c) + val p = Source(Future.successful(1)).runWith(Sink.fanoutPublisher(1, 1)).subscribe(c) val sub = c.expectSubscription() c.expectNoMsg(100.millis) sub.request(1) @@ -32,17 +31,15 @@ class FlowFromFutureSpec extends AkkaSpec { "produce error from already failed Future" in assertAllStagesStopped { val ex = new RuntimeException("test") with NoStackTrace - val p = Source(Future.failed[Int](ex)).runWith(Sink.publisher) val c = TestSubscriber.manualProbe[Int]() - p.subscribe(c) + Source(Future.failed[Int](ex)).runWith(Sink.publisher).subscribe(c) c.expectSubscriptionAndError(ex) } "produce one element when Future is completed" in assertAllStagesStopped { val promise = Promise[Int]() - val p = Source(promise.future).runWith(Sink.publisher) val c = TestSubscriber.manualProbe[Int]() - p.subscribe(c) + Source(promise.future).runWith(Sink.fanoutPublisher(1, 1)).subscribe(c) val sub = c.expectSubscription() sub.request(1) c.expectNoMsg(100.millis) @@ -54,9 +51,8 @@ class FlowFromFutureSpec extends AkkaSpec { "produce one element when Future is completed but not before request" in { val promise = Promise[Int]() - val p = Source(promise.future).runWith(Sink.publisher) val c = TestSubscriber.manualProbe[Int]() - p.subscribe(c) + Source(promise.future).runWith(Sink.fanoutPublisher(1, 1)).subscribe(c) val sub = c.expectSubscription() promise.success(1) c.expectNoMsg(200.millis) diff --git a/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala b/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala index e86aa034ba..dafbf1226a 100644 --- a/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala +++ b/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala @@ -37,7 +37,7 @@ final case class OperationAttributes private (attributes: immutable.Seq[Operatio val result = new java.util.ArrayList[T] attributes.foreach { a ⇒ if (c.isInstance(a)) - result.add(a.asInstanceOf[T]) + result.add(c.cast(a)) } result } @@ -47,8 +47,8 @@ final case class OperationAttributes private (attributes: immutable.Seq[Operatio * If no such attribute exists the `default` value is returned. */ def getAttribute[T <: Attribute](c: Class[T], default: T): T = - attributes.find(a ⇒ c.isInstance(a)) match { - case Some(a) ⇒ a.asInstanceOf[T] + attributes.find(c.isInstance) match { + case Some(a) ⇒ c.cast(a) case None ⇒ default } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala deleted file mode 100644 index ce5ed913d9..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala +++ /dev/null @@ -1,147 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.impl - -import scala.concurrent.Future -import scala.util.Failure -import scala.util.Success -import scala.util.Try -import akka.actor._ -import akka.stream.ActorFlowMaterializerSettings -import akka.pattern.pipe -import org.reactivestreams.Subscriber -import org.reactivestreams.Subscription -import scala.util.control.NonFatal - -/** - * INTERNAL API - */ -private[akka] object FuturePublisher { - def props(future: Future[Any], settings: ActorFlowMaterializerSettings): Props = - Props(new FuturePublisher(future, settings)).withDispatcher(settings.dispatcher).withDeploy(Deploy.local) - - object FutureSubscription { - final case class Cancel(subscription: FutureSubscription) extends DeadLetterSuppression with NoSerializationVerificationNeeded - final case class RequestMore(subscription: FutureSubscription, elements: Long) extends DeadLetterSuppression with NoSerializationVerificationNeeded - } - - case class FutureValue(value: Any) extends NoSerializationVerificationNeeded - - class FutureSubscription(ref: ActorRef) extends Subscription { - import akka.stream.impl.FuturePublisher.FutureSubscription._ - def cancel(): Unit = ref ! Cancel(this) - def request(elements: Long): Unit = ref ! RequestMore(this, elements) - override def toString = "FutureSubscription" - } -} - -/** - * INTERNAL API - */ -// FIXME why do we need to have an actor to drive a Future? -private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMaterializerSettings) extends Actor { - import akka.stream.impl.FuturePublisher._ - import akka.stream.impl.FuturePublisher.FutureSubscription.Cancel - import akka.stream.impl.FuturePublisher.FutureSubscription.RequestMore - import ReactiveStreamsCompliance._ - - var exposedPublisher: ActorPublisher[Any] = _ - var subscribers = Map.empty[Subscriber[Any], FutureSubscription] - var subscriptions = Map.empty[FutureSubscription, Subscriber[Any]] - var subscriptionsReadyForPush = Set.empty[FutureSubscription] - var futureValue: Option[Try[Any]] = future.value - var shutdownReason: Option[Throwable] = ActorPublisher.SomeNormalShutdownReason - - override val supervisorStrategy = SupervisorStrategy.stoppingStrategy - - def receive = { - case ExposedPublisher(publisher) ⇒ - exposedPublisher = publisher - context.become(waitingForFirstSubscriber) - case _ ⇒ throw new IllegalStateException("The first message must be ExposedPublisher") - } - - def waitingForFirstSubscriber: Receive = { - case SubscribePending ⇒ - exposedPublisher.takePendingSubscribers() foreach registerSubscriber - import context.dispatcher - future.map(FutureValue) pipeTo (self) - context.become(active) - } - - def active: Receive = { - case SubscribePending ⇒ - exposedPublisher.takePendingSubscribers() foreach registerSubscriber - case RequestMore(subscription, elements) ⇒ // FIXME we aren't tracking demand per subscription so we don't check for overflow. We should. - if (subscriptions.contains(subscription)) { - if (elements < 1) { - val subscriber = subscriptions(subscription) - rejectDueToNonPositiveDemand(subscriber) - removeSubscriber(subscriber) - } else { - subscriptionsReadyForPush += subscription - push(subscriptions(subscription)) - } - } - case Cancel(subscription) if subscriptions.contains(subscription) ⇒ - removeSubscriber(subscriptions(subscription)) - case Status.Failure(ex) ⇒ - if (futureValue.isEmpty) { - futureValue = Some(Failure(ex)) - pushToAll() - } - case FutureValue(value) ⇒ - if (futureValue.isEmpty) { - futureValue = Some(Success(value)) - pushToAll() - } - } - - def pushToAll(): Unit = subscriptionsReadyForPush foreach { subscription ⇒ push(subscriptions(subscription)) } - - def push(subscriber: Subscriber[Any]): Unit = - futureValue match { - case Some(someValue) ⇒ try someValue match { - case Success(value) ⇒ - tryOnNext(subscriber, value) - tryOnComplete(subscriber) - case Failure(t) ⇒ - shutdownReason = Some(t) - tryOnError(subscriber, t) - } catch { - case _: SpecViolation ⇒ // continue - } finally { - removeSubscriber(subscriber) - } - case None ⇒ // not completed yet - } - - def registerSubscriber(subscriber: Subscriber[Any]): Unit = { - if (subscribers.contains(subscriber)) - rejectDuplicateSubscriber(subscriber) - else { - val subscription = new FutureSubscription(self) - subscribers = subscribers.updated(subscriber, subscription) - subscriptions = subscriptions.updated(subscription, subscriber) - tryOnSubscribe(subscriber, subscription) - } - } - - def removeSubscriber(subscriber: Subscriber[Any]): Unit = { - val subscription = subscribers(subscriber) - subscriptions -= subscription - subscriptionsReadyForPush -= subscription - subscribers -= subscriber - if (subscribers.isEmpty) { - exposedPublisher.shutdown(shutdownReason) - context.stop(self) - } - } - - override def postStop(): Unit = - if (exposedPublisher ne null) - exposedPublisher.shutdown(shutdownReason) - -} - diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index b7abbfb3eb..cd842ef811 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -87,31 +87,6 @@ private[akka] final class PublisherSource[Out](p: Publisher[Out], val attributes override def withAttributes(attr: OperationAttributes): Module = new PublisherSource[Out](p, attr, amendShape(attr)) } -/** - * INTERNAL API - * Start a new `Source` from the given `Future`. The stream will consist of - * one element when the `Future` is completed with a successful value, which - * may happen before or after materializing the `Flow`. - * The stream terminates with an error if the `Future` is completed with a failure. - */ -private[akka] final class FutureSource[Out](future: Future[Out], val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Unit](shape) { - override def create(context: MaterializationContext) = - future.value match { - case Some(Success(element)) ⇒ - (SynchronousIterablePublisher(List(element), context.stageName), ()) // Option is not Iterable. sigh - case Some(Failure(t)) ⇒ - (ErrorPublisher(t, context.stageName).asInstanceOf[Publisher[Out]], ()) - case None ⇒ - val actorMaterializer = ActorFlowMaterializer.downcast(context.materializer) - val effectiveSettings = actorMaterializer.effectiveSettings(context.effectiveAttributes) - (ActorPublisher[Out](actorMaterializer.actorOf(context, - FuturePublisher.props(future, effectiveSettings))), ()) // FIXME this does not need to be an actor - } - - override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Unit] = new FutureSource(future, attributes, shape) - override def withAttributes(attr: OperationAttributes): Module = new FutureSource(future, attr, amendShape(attr)) -} - /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 424557b665..0e12afe3ca 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -221,7 +221,7 @@ object Source extends SourceApply { * The stream terminates with a failure if the `Future` is completed with a failure. */ def apply[T](future: Future[T]): Source[T, Unit] = - new Source(new FutureSource(future, DefaultAttributes.futureSource, shape("FutureSource"))) + Source.single(future).mapAsync(1)(identity).withAttributes(DefaultAttributes.futureSource) /** * Elements are emitted periodically with the specified interval.