From 79b6de1558be0064c3f81169a15df5b5a797bc52 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sat, 6 Jun 2015 14:13:26 +0200 Subject: [PATCH 1/3] +str - Deletes FuturePublisher and replaces it with Source.single(f).mapAsync(1)(identity) --- .../stream/scaladsl/FlowFromFutureSpec.scala | 12 +- .../akka/stream/OperationAttributes.scala | 6 +- .../akka/stream/impl/FuturePublisher.scala | 147 ------------------ .../main/scala/akka/stream/impl/Modules.scala | 25 --- .../scala/akka/stream/scaladsl/Source.scala | 2 +- 5 files changed, 8 insertions(+), 184 deletions(-) delete mode 100644 akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala index 935d76bc44..c80a648861 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFromFutureSpec.scala @@ -20,9 +20,8 @@ class FlowFromFutureSpec extends AkkaSpec { "A Flow based on a Future" must { "produce one element from already successful Future" in assertAllStagesStopped { - val p = Source(Future.successful(1)).runWith(Sink.publisher) val c = TestSubscriber.manualProbe[Int]() - p.subscribe(c) + val p = Source(Future.successful(1)).runWith(Sink.fanoutPublisher(1, 1)).subscribe(c) val sub = c.expectSubscription() c.expectNoMsg(100.millis) sub.request(1) @@ -32,17 +31,15 @@ class FlowFromFutureSpec extends AkkaSpec { "produce error from already failed Future" in assertAllStagesStopped { val ex = new RuntimeException("test") with NoStackTrace - val p = Source(Future.failed[Int](ex)).runWith(Sink.publisher) val c = TestSubscriber.manualProbe[Int]() - p.subscribe(c) + Source(Future.failed[Int](ex)).runWith(Sink.publisher).subscribe(c) c.expectSubscriptionAndError(ex) } "produce one element when Future is completed" in assertAllStagesStopped { val promise = Promise[Int]() - val p = Source(promise.future).runWith(Sink.publisher) val c = TestSubscriber.manualProbe[Int]() - p.subscribe(c) + Source(promise.future).runWith(Sink.fanoutPublisher(1, 1)).subscribe(c) val sub = c.expectSubscription() sub.request(1) c.expectNoMsg(100.millis) @@ -54,9 +51,8 @@ class FlowFromFutureSpec extends AkkaSpec { "produce one element when Future is completed but not before request" in { val promise = Promise[Int]() - val p = Source(promise.future).runWith(Sink.publisher) val c = TestSubscriber.manualProbe[Int]() - p.subscribe(c) + Source(promise.future).runWith(Sink.fanoutPublisher(1, 1)).subscribe(c) val sub = c.expectSubscription() promise.success(1) c.expectNoMsg(200.millis) diff --git a/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala b/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala index e86aa034ba..dafbf1226a 100644 --- a/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala +++ b/akka-stream/src/main/scala/akka/stream/OperationAttributes.scala @@ -37,7 +37,7 @@ final case class OperationAttributes private (attributes: immutable.Seq[Operatio val result = new java.util.ArrayList[T] attributes.foreach { a ⇒ if (c.isInstance(a)) - result.add(a.asInstanceOf[T]) + result.add(c.cast(a)) } result } @@ -47,8 +47,8 @@ final case class OperationAttributes private (attributes: immutable.Seq[Operatio * If no such attribute exists the `default` value is returned. */ def getAttribute[T <: Attribute](c: Class[T], default: T): T = - attributes.find(a ⇒ c.isInstance(a)) match { - case Some(a) ⇒ a.asInstanceOf[T] + attributes.find(c.isInstance) match { + case Some(a) ⇒ c.cast(a) case None ⇒ default } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala deleted file mode 100644 index ce5ed913d9..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala +++ /dev/null @@ -1,147 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.impl - -import scala.concurrent.Future -import scala.util.Failure -import scala.util.Success -import scala.util.Try -import akka.actor._ -import akka.stream.ActorFlowMaterializerSettings -import akka.pattern.pipe -import org.reactivestreams.Subscriber -import org.reactivestreams.Subscription -import scala.util.control.NonFatal - -/** - * INTERNAL API - */ -private[akka] object FuturePublisher { - def props(future: Future[Any], settings: ActorFlowMaterializerSettings): Props = - Props(new FuturePublisher(future, settings)).withDispatcher(settings.dispatcher).withDeploy(Deploy.local) - - object FutureSubscription { - final case class Cancel(subscription: FutureSubscription) extends DeadLetterSuppression with NoSerializationVerificationNeeded - final case class RequestMore(subscription: FutureSubscription, elements: Long) extends DeadLetterSuppression with NoSerializationVerificationNeeded - } - - case class FutureValue(value: Any) extends NoSerializationVerificationNeeded - - class FutureSubscription(ref: ActorRef) extends Subscription { - import akka.stream.impl.FuturePublisher.FutureSubscription._ - def cancel(): Unit = ref ! Cancel(this) - def request(elements: Long): Unit = ref ! RequestMore(this, elements) - override def toString = "FutureSubscription" - } -} - -/** - * INTERNAL API - */ -// FIXME why do we need to have an actor to drive a Future? -private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMaterializerSettings) extends Actor { - import akka.stream.impl.FuturePublisher._ - import akka.stream.impl.FuturePublisher.FutureSubscription.Cancel - import akka.stream.impl.FuturePublisher.FutureSubscription.RequestMore - import ReactiveStreamsCompliance._ - - var exposedPublisher: ActorPublisher[Any] = _ - var subscribers = Map.empty[Subscriber[Any], FutureSubscription] - var subscriptions = Map.empty[FutureSubscription, Subscriber[Any]] - var subscriptionsReadyForPush = Set.empty[FutureSubscription] - var futureValue: Option[Try[Any]] = future.value - var shutdownReason: Option[Throwable] = ActorPublisher.SomeNormalShutdownReason - - override val supervisorStrategy = SupervisorStrategy.stoppingStrategy - - def receive = { - case ExposedPublisher(publisher) ⇒ - exposedPublisher = publisher - context.become(waitingForFirstSubscriber) - case _ ⇒ throw new IllegalStateException("The first message must be ExposedPublisher") - } - - def waitingForFirstSubscriber: Receive = { - case SubscribePending ⇒ - exposedPublisher.takePendingSubscribers() foreach registerSubscriber - import context.dispatcher - future.map(FutureValue) pipeTo (self) - context.become(active) - } - - def active: Receive = { - case SubscribePending ⇒ - exposedPublisher.takePendingSubscribers() foreach registerSubscriber - case RequestMore(subscription, elements) ⇒ // FIXME we aren't tracking demand per subscription so we don't check for overflow. We should. - if (subscriptions.contains(subscription)) { - if (elements < 1) { - val subscriber = subscriptions(subscription) - rejectDueToNonPositiveDemand(subscriber) - removeSubscriber(subscriber) - } else { - subscriptionsReadyForPush += subscription - push(subscriptions(subscription)) - } - } - case Cancel(subscription) if subscriptions.contains(subscription) ⇒ - removeSubscriber(subscriptions(subscription)) - case Status.Failure(ex) ⇒ - if (futureValue.isEmpty) { - futureValue = Some(Failure(ex)) - pushToAll() - } - case FutureValue(value) ⇒ - if (futureValue.isEmpty) { - futureValue = Some(Success(value)) - pushToAll() - } - } - - def pushToAll(): Unit = subscriptionsReadyForPush foreach { subscription ⇒ push(subscriptions(subscription)) } - - def push(subscriber: Subscriber[Any]): Unit = - futureValue match { - case Some(someValue) ⇒ try someValue match { - case Success(value) ⇒ - tryOnNext(subscriber, value) - tryOnComplete(subscriber) - case Failure(t) ⇒ - shutdownReason = Some(t) - tryOnError(subscriber, t) - } catch { - case _: SpecViolation ⇒ // continue - } finally { - removeSubscriber(subscriber) - } - case None ⇒ // not completed yet - } - - def registerSubscriber(subscriber: Subscriber[Any]): Unit = { - if (subscribers.contains(subscriber)) - rejectDuplicateSubscriber(subscriber) - else { - val subscription = new FutureSubscription(self) - subscribers = subscribers.updated(subscriber, subscription) - subscriptions = subscriptions.updated(subscription, subscriber) - tryOnSubscribe(subscriber, subscription) - } - } - - def removeSubscriber(subscriber: Subscriber[Any]): Unit = { - val subscription = subscribers(subscriber) - subscriptions -= subscription - subscriptionsReadyForPush -= subscription - subscribers -= subscriber - if (subscribers.isEmpty) { - exposedPublisher.shutdown(shutdownReason) - context.stop(self) - } - } - - override def postStop(): Unit = - if (exposedPublisher ne null) - exposedPublisher.shutdown(shutdownReason) - -} - diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala index b7abbfb3eb..cd842ef811 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala @@ -87,31 +87,6 @@ private[akka] final class PublisherSource[Out](p: Publisher[Out], val attributes override def withAttributes(attr: OperationAttributes): Module = new PublisherSource[Out](p, attr, amendShape(attr)) } -/** - * INTERNAL API - * Start a new `Source` from the given `Future`. The stream will consist of - * one element when the `Future` is completed with a successful value, which - * may happen before or after materializing the `Flow`. - * The stream terminates with an error if the `Future` is completed with a failure. - */ -private[akka] final class FutureSource[Out](future: Future[Out], val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Unit](shape) { - override def create(context: MaterializationContext) = - future.value match { - case Some(Success(element)) ⇒ - (SynchronousIterablePublisher(List(element), context.stageName), ()) // Option is not Iterable. sigh - case Some(Failure(t)) ⇒ - (ErrorPublisher(t, context.stageName).asInstanceOf[Publisher[Out]], ()) - case None ⇒ - val actorMaterializer = ActorFlowMaterializer.downcast(context.materializer) - val effectiveSettings = actorMaterializer.effectiveSettings(context.effectiveAttributes) - (ActorPublisher[Out](actorMaterializer.actorOf(context, - FuturePublisher.props(future, effectiveSettings))), ()) // FIXME this does not need to be an actor - } - - override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Unit] = new FutureSource(future, attributes, shape) - override def withAttributes(attr: OperationAttributes): Module = new FutureSource(future, attr, amendShape(attr)) -} - /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 99551c371a..689b8ed1fe 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -225,7 +225,7 @@ object Source extends SourceApply { * The stream terminates with a failure if the `Future` is completed with a failure. */ def apply[T](future: Future[T]): Source[T, Unit] = - new Source(new FutureSource(future, DefaultAttributes.futureSource, shape("FutureSource"))) + Source.single(future).mapAsync(1)(identity).withAttributes(DefaultAttributes.futureSource) /** * Elements are emitted periodically with the specified interval. From b49746b0da49a2095fad9225229ccee8120b9f41 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sat, 6 Jun 2015 16:07:35 +0200 Subject: [PATCH 2/3] =str - Introduces a TCK-verified SingleElementPublisher to optimize Source.single and things depending on it --- .../tck/SingleElementPublisherTest.scala | 20 +++++++++++ .../stream/impl/CompletedPublishers.scala | 33 +++++++++++++++++-- .../scala/akka/stream/scaladsl/Source.scala | 14 +++----- 3 files changed, 56 insertions(+), 11 deletions(-) create mode 100644 akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementPublisherTest.scala diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementPublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementPublisherTest.scala new file mode 100644 index 0000000000..c1dc1de756 --- /dev/null +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/SingleElementPublisherTest.scala @@ -0,0 +1,20 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import akka.stream.impl.SingleElementPublisher + +import scala.collection.immutable +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import org.reactivestreams._ + +class SingleElementPublisherTest extends AkkaPublisherVerification[Int] { + + def createPublisher(elements: Long): Publisher[Int] = { + Source(SingleElementPublisher(0, "single-element-publisher")).runWith(Sink.publisher) + } + + override def maxElementsFromPublisher(): Long = 1 +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala index fbd22a79d1..87ab5d6f59 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala @@ -3,8 +3,7 @@ */ package akka.stream.impl -import org.reactivestreams.{ Subscriber, Publisher } -import org.reactivestreams.Subscription +import org.reactivestreams.{ Subscriber, Publisher, Subscription } /** * INTERNAL API @@ -40,6 +39,36 @@ private[akka] final case class ErrorPublisher(t: Throwable, name: String) extend override def toString: String = name } +private[akka] final case class SingleElementPublisher[T](value: T, name: String) extends Publisher[T] { + import ReactiveStreamsCompliance._ + + private[this] class SingleElementSubscription(subscriber: Subscriber[_ >: T]) extends Subscription { + private[this] var done: Boolean = false + override def cancel(): Unit = done = true + + override def request(elements: Long): Unit = if (!done) { + if (elements < 1) rejectDueToNonPositiveDemand(subscriber) + done = true + try { + tryOnNext(subscriber, value) + tryOnComplete(subscriber) + } catch { + case _: SpecViolation ⇒ // TODO log? + } + } + } + + override def subscribe(subscriber: Subscriber[_ >: T]): Unit = + try { + requireNonNullSubscriber(subscriber) + tryOnSubscribe(subscriber, new SingleElementSubscription(subscriber)) + } catch { + case _: SpecViolation ⇒ // nothing we can do + } + def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]] + override def toString: String = name +} + /** * INTERNAL API * This is only a legal subscription when it is immediately followed by diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 99551c371a..424557b665 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -192,11 +192,8 @@ object Source extends SourceApply { * Elements are pulled out of the iterator in accordance with the demand coming * from the downstream transformation steps. */ - def apply[T](f: () ⇒ Iterator[T]): Source[T, Unit] = { - apply(new immutable.Iterable[T] { - override def iterator: Iterator[T] = f() - }) - } + def apply[T](f: () ⇒ Iterator[T]): Source[T, Unit] = + apply(new immutable.Iterable[T] { override def iterator: Iterator[T] = f() }) /** * A graph with the shape of a source logically is a source, this method makes @@ -214,9 +211,8 @@ object Source extends SourceApply { * stream will see an individual flow of elements (always starting from the * beginning) regardless of when they subscribed. */ - def apply[T](iterable: immutable.Iterable[T]): Source[T, Unit] = { - Source.single(()).mapConcat((_: Unit) ⇒ iterable).withAttributes(DefaultAttributes.iterableSource) - } + def apply[T](iterable: immutable.Iterable[T]): Source[T, Unit] = + Source.single(iterable).mapConcat(identity).withAttributes(DefaultAttributes.iterableSource) /** * Start a new `Source` from the given `Future`. The stream will consist of @@ -242,7 +238,7 @@ object Source extends SourceApply { * Every connected `Sink` of this stream will see an individual stream consisting of one element. */ def single[T](element: T): Source[T, Unit] = - apply(SynchronousIterablePublisher(List(element), "SingleSource")).withAttributes(DefaultAttributes.singleSource) // FIXME optimize + apply(SingleElementPublisher(element, "SingleSource")).withAttributes(DefaultAttributes.singleSource) // FIXME optimize /** * Create a `Source` that will continually emit the given element. From e8936964b54a89e3b08a4f4cb5b3cac94f4cafb4 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sat, 6 Jun 2015 16:45:01 +0200 Subject: [PATCH 3/3] =str - Deleted SynchronousIterablePublisher as it is replaced by a fast SingleElementPublisher + mapConcat --- .../tck/SyncIterablePublisherTest.scala | 25 ---- .../stream/scaladsl/FlowIteratorSpec.scala | 74 ----------- .../impl/SynchronousIterablePublisher.scala | 122 ------------------ .../scala/akka/stream/scaladsl/Source.scala | 11 +- 4 files changed, 7 insertions(+), 225 deletions(-) delete mode 100644 akka-stream-tck/src/test/scala/akka/stream/tck/SyncIterablePublisherTest.scala delete mode 100644 akka-stream/src/main/scala/akka/stream/impl/SynchronousIterablePublisher.scala diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SyncIterablePublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/SyncIterablePublisherTest.scala deleted file mode 100644 index 8d4de79b07..0000000000 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/SyncIterablePublisherTest.scala +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.tck - -import akka.stream.impl.SynchronousIterablePublisher - -import scala.collection.immutable -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source -import org.reactivestreams._ - -class SyncIterablePublisherTest extends AkkaPublisherVerification[Int] { - - def createPublisher(elements: Long): Publisher[Int] = { - val iterable: immutable.Iterable[Int] = - if (elements >= 10000) - 0 until 10000 // this publisher is not intended to be used for large collections - else - 0 until elements.toInt - - Source(SynchronousIterablePublisher(iterable, "synchronous-iterable-publisher")).runWith(Sink.publisher) - } - -} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala index c5cad49b46..839975c96e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala @@ -9,7 +9,6 @@ import akka.stream.ActorFlowMaterializer import akka.stream.ActorFlowMaterializerSettings import akka.stream.testkit._ import akka.stream.testkit.Utils._ -import akka.stream.impl.SynchronousIterablePublisher import org.reactivestreams.Subscription import akka.testkit.TestProbe import org.reactivestreams.Subscriber @@ -71,79 +70,6 @@ class FlowIterableSpec extends AbstractFlowIteratorSpec { } } -class SynchronousIterableSpec extends AbstractFlowIteratorSpec { - override def testName = "A Flow based on small collection" - override def createSource(elements: Int): Source[Int, Unit] = - Source(SynchronousIterablePublisher(1 to elements, "range")) - - "not produce after cancel from onNext" in { - val p = SynchronousIterablePublisher(1 to 5, "range") - val probe = TestProbe() - p.subscribe(new Subscriber[Int] { - var sub: Subscription = _ - override def onError(cause: Throwable): Unit = probe.ref ! cause - override def onComplete(): Unit = probe.ref ! "complete" - override def onNext(element: Int): Unit = { - probe.ref ! element - if (element == 3) sub.cancel() - } - override def onSubscribe(subscription: Subscription): Unit = { - sub = subscription - sub.request(10) - } - }) - - probe.expectMsg(1) - probe.expectMsg(2) - probe.expectMsg(3) - probe.expectNoMsg(500.millis) - } - - "produce onError when iterator throws" in { - val iterable = new immutable.Iterable[Int] { - override def iterator: Iterator[Int] = - (1 to 3).iterator.map(x ⇒ if (x == 2) throw new IllegalStateException("not two") else x) - } - val p = SynchronousIterablePublisher(iterable, "iterable") - val c = TestSubscriber.manualProbe[Int]() - p.subscribe(c) - val sub = c.expectSubscription() - sub.request(1) - c.expectNext(1) - c.expectNoMsg(100.millis) - sub.request(2) - c.expectError.getMessage should be("not two") - sub.request(2) - c.expectNoMsg(100.millis) - } - - "handle reentrant requests" in { - val N = 50000 - val p = SynchronousIterablePublisher(1 to N, "range") - val probe = TestProbe() - p.subscribe(new Subscriber[Int] { - var sub: Subscription = _ - override def onError(cause: Throwable): Unit = probe.ref ! cause - override def onComplete(): Unit = probe.ref ! "complete" - override def onNext(element: Int): Unit = { - probe.ref ! element - sub.request(1) - - } - override def onSubscribe(subscription: Subscription): Unit = { - sub = subscription - sub.request(1) - } - }) - probe.receiveN(N) should be((1 to N).toVector) - probe.expectMsg("complete") - } - - "have a toString that doesn't OOME" in { - SynchronousIterablePublisher(1 to 3, "range").toString should be("range") - } -} - abstract class AbstractFlowIteratorSpec extends AkkaSpec { val settings = ActorFlowMaterializerSettings(system) diff --git a/akka-stream/src/main/scala/akka/stream/impl/SynchronousIterablePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/SynchronousIterablePublisher.scala deleted file mode 100644 index 65ff9feb5b..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/SynchronousIterablePublisher.scala +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.impl - -import akka.dispatch.ExecutionContexts - -import org.reactivestreams.{ Publisher, Subscriber, Subscription } - -import scala.annotation.tailrec -import scala.collection.immutable -import scala.util.control.NonFatal -import akka.stream.impl.ReactiveStreamsCompliance._ - -/** - * INTERNAL API - */ -private[akka] object SynchronousIterablePublisher { - def apply[T](iterable: immutable.Iterable[T], name: String): Publisher[T] = - new SynchronousIterablePublisher(iterable, name) - - object IteratorSubscription { - def apply[T](subscriber: Subscriber[T], iterator: Iterator[T]): Unit = - new IteratorSubscription[T](subscriber, iterator).init() - } - - private[this] final class IteratorSubscription[T](subscriber: Subscriber[T], iterator: Iterator[T]) extends Subscription { - var done = false - var pendingDemand = 0L - var pushing = false - - import ReactiveStreamsCompliance._ - - def init(): Unit = try { - if (!iterator.hasNext) { // Let's be prudent and issue onComplete immediately - cancel() - tryOnSubscribe(subscriber, this) - tryOnComplete(subscriber) - } else { - tryOnSubscribe(subscriber, this) - } - } catch { - case sv: SpecViolation ⇒ - cancel() - throw sv // I think it is prudent to "escalate" the spec violation - case NonFatal(e) ⇒ - cancel() - tryOnError(subscriber, e) - } - - override def cancel(): Unit = done = true - - override def request(elements: Long): Unit = { - if (done) () // According to Reactive Streams Spec 3.6, `request` on a cancelled `Subscription` must be a NoOp - else if (elements < 1) { // According to Reactive Streams Spec 3.9, with non-positive demand must yield onError - cancel() - rejectDueToNonPositiveDemand(subscriber) - } else { - pendingDemand += elements - if (pendingDemand < 1) - pendingDemand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded - if (!pushing) { - // According to Reactive Streams Spec 3:3, we must prevent unbounded recursion - try { - pushing = true - pendingDemand = elements - - @tailrec def pushNext(): Unit = - if (done) () - else if (iterator.isEmpty) { - cancel() - tryOnComplete(subscriber) - } else if (pendingDemand > 0) { - pendingDemand -= 1 - tryOnNext(subscriber, iterator.next()) - pushNext() - } - - pushNext() - } catch { - case sv: SpecViolation ⇒ - cancel() - throw sv // I think it is prudent to "escalate" the spec violation - case NonFatal(e) ⇒ - cancel() - tryOnError(subscriber, e) - } finally { - pushing = false - } - } - } - } - } -} - -/** - * INTERNAL API - * Publisher that will push all requested elements from the iterator of the iterable - * to the subscriber in the calling thread of `requestMore`. - * - * It is only intended to be used with iterators over static collections. - * Do *NOT* use it for iterators on lazy collections or other implementations that do more - * than merely retrieve an element in their `next()` method! - * - * It is the responsibility of the subscriber to provide necessary memory visibility - * if calls to `requestMore` and `cancel` are performed from different threads. - * For example, usage from an actor is fine. Concurrent calls to the subscription is not allowed. - * Reentrant calls to `requestMore` directly from `onNext` are supported by this publisher. - */ -private[akka] final class SynchronousIterablePublisher[T]( - private val iterable: immutable.Iterable[T], - private val name: String) extends Publisher[T] { - - import SynchronousIterablePublisher.IteratorSubscription - - override def subscribe(subscriber: Subscriber[_ >: T]): Unit = { - requireNonNullSubscriber(subscriber) - IteratorSubscription(subscriber, try iterable.iterator catch { case NonFatal(t) ⇒ Iterator.continually(throw t) }) - } - - override def toString: String = name -} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 0e12afe3ca..3eb1dc68c2 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -7,7 +7,7 @@ import akka.actor.{ ActorRef, Cancellable, Props } import akka.stream._ import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule } import akka.stream.impl.Stages.DefaultAttributes -import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, SynchronousIterablePublisher, _ } +import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, _ } import akka.stream.stage.{ Context, PushPullStage, SyncDirective, TerminationDirective } import org.reactivestreams.{ Publisher, Subscriber } @@ -17,7 +17,7 @@ import akka.stream.stage.{ TerminationDirective, Directive, Context, PushPullSta import scala.annotation.unchecked.uncheckedVariance import scala.language.higherKinds import akka.actor.Props -import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, SynchronousIterablePublisher } +import akka.stream.impl.{ EmptyPublisher, ErrorPublisher } import org.reactivestreams.Publisher import scala.collection.immutable import scala.concurrent.duration.FiniteDuration @@ -165,6 +165,9 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) object Source extends SourceApply { + private[this] final val _id: Any ⇒ Any = x ⇒ x + private[this] final def id[A]: A ⇒ A = _id.asInstanceOf[A ⇒ A] + private[stream] def apply[Out, Mat](module: SourceModule[Out, Mat]): Source[Out, Mat] = new Source(module) @@ -212,7 +215,7 @@ object Source extends SourceApply { * beginning) regardless of when they subscribed. */ def apply[T](iterable: immutable.Iterable[T]): Source[T, Unit] = - Source.single(iterable).mapConcat(identity).withAttributes(DefaultAttributes.iterableSource) + Source.single(iterable).mapConcat(id).withAttributes(DefaultAttributes.iterableSource) /** * Start a new `Source` from the given `Future`. The stream will consist of @@ -221,7 +224,7 @@ object Source extends SourceApply { * The stream terminates with a failure if the `Future` is completed with a failure. */ def apply[T](future: Future[T]): Source[T, Unit] = - Source.single(future).mapAsync(1)(identity).withAttributes(DefaultAttributes.futureSource) + Source.single(future).mapAsyncUnordered(1)(id).withAttributes(DefaultAttributes.futureSource) /** * Elements are emitted periodically with the specified interval.