diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala index 6c956b7070..de20d38669 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala @@ -72,7 +72,7 @@ class FlowExpandSpec extends AkkaSpec { .expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)) .fold(Set.empty[Int])(_ + _) - Await.result(future, 10.seconds) should be(Set.empty[Int] ++ (1 to 100)) + Await.result(future, 10.seconds) should contain theSameElementsAs ((1 to 100).toSet) } "backpressure publisher when subscriber is slower" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala index c821db9f3d..56888c0c7b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala @@ -37,9 +37,10 @@ class FlowIteratorSpec extends AkkaSpec { } "complete empty" in { - val p = Source(List.empty[Int].iterator).runWith(Sink.publisher) + val p = Source[Int](Iterator.empty).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) + c.expectSubscription() c.expectComplete() c.expectNoMsg(100.millis) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala index ba856d1a96..049063b36e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala @@ -136,17 +136,14 @@ class GraphBalanceSpec extends AkkaSpec { "fairly balance between three outputs" in { val numElementsForSink = 10000 - val f1, f2, f3 = Sink.fold[Int, Int](0)(_ + _) + val outputs = Seq.fill(3)(Sink.fold[Int, Int](0)(_ + _)) val g = FlowGraph { implicit b ⇒ val balance = Balance[Int]("balance", waitForAllDownstreams = true) - Source(Stream.fill(10000 * 3)(1)) ~> balance ~> f1 - balance ~> f2 - balance ~> f3 + Source(Stream.fill(numElementsForSink * outputs.size)(1)) ~> balance + for { o ← outputs } balance ~> o }.run() - Seq(f1, f2, f3) map { sink ⇒ - Await.result(g.get(sink), 3.seconds) should be(numElementsForSink +- 1000) - } + for { o ← outputs } Await.result(g.get(o), 3.seconds) should be(numElementsForSink +- 1000) } "produce to second even though first cancels" in { diff --git a/akka-stream/src/main/scala/akka/persistence/stream/PersistentSource.scala b/akka-stream/src/main/scala/akka/persistence/stream/PersistentSource.scala index 4ce7cadfdf..d78167863f 100644 --- a/akka-stream/src/main/scala/akka/persistence/stream/PersistentSource.scala +++ b/akka-stream/src/main/scala/akka/persistence/stream/PersistentSource.scala @@ -12,7 +12,6 @@ import akka.stream.impl.Cancel import akka.stream.impl.ExposedPublisher import akka.stream.impl.RequestMore import akka.stream.impl.SoftShutdown -import akka.stream.impl.Stop import akka.stream.impl.SubscribePending import akka.stream.impl.SubscriberManagement import akka.stream.impl.ActorBasedFlowMaterializer @@ -112,11 +111,7 @@ private class PersistentSourceImpl(persistenceId: String, sourceSettings: Persis case Cancel(sub) ⇒ unregisterSubscription(sub.asInstanceOf[S]) case Response(ps) ⇒ - try { - ps.foreach(pushToDownstream) - } catch { - case Stop ⇒ - completeDownstream(); shutdownReason = None + try ps.foreach(pushToDownstream) catch { case NonFatal(e) ⇒ abortDownstream(e); shutdownReason = Some(e) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala index 03294555c3..97ff8ccfad 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala @@ -17,9 +17,7 @@ import org.reactivestreams.{ Publisher, Subscriber } * INTERNAL API */ private[akka] object SimpleCallbackPublisher { - def props[T](settings: MaterializerSettings, f: () ⇒ T): Props = - Props(new SimpleCallbackPublisherImpl(f, settings)).withDispatcher(settings.dispatcher) - + def props[T](f: () ⇒ T, settings: MaterializerSettings): Props = IteratorPublisher.props(Iterator.continually(f()), settings) } /** @@ -126,22 +124,23 @@ private[akka] trait SoftShutdown { this: Actor ⇒ /** * INTERNAL API */ -private[akka] object SimpleCallbackPublisherImpl { - case object Generate +private[akka] object IteratorPublisherImpl { + case object Flush } /** * INTERNAL API */ -private[akka] class SimpleCallbackPublisherImpl[T](f: () ⇒ T, settings: MaterializerSettings) +private[akka] class IteratorPublisherImpl[T](iterator: Iterator[T], settings: MaterializerSettings) extends Actor with ActorLogging with SubscriberManagement[T] with SoftShutdown { - import akka.stream.impl.SimpleCallbackPublisherImpl._ + import IteratorPublisherImpl.Flush type S = ActorSubscription[T] + private var demand = 0L var pub: ActorPublisher[T] = _ var shutdownReason: Option[Throwable] = ActorPublisher.NormalShutdownReason @@ -155,36 +154,46 @@ private[akka] class SimpleCallbackPublisherImpl[T](f: () ⇒ T, settings: Materi case SubscribePending ⇒ pub.takePendingSubscribers() foreach registerSubscriber context.become(active) + flush() } final def active: Receive = { case SubscribePending ⇒ pub.takePendingSubscribers() foreach registerSubscriber + flush() case RequestMore(sub, elements) ⇒ moreRequested(sub.asInstanceOf[S], elements) - generate() + flush() case Cancel(sub) ⇒ unregisterSubscription(sub.asInstanceOf[S]) - generate() - case Generate ⇒ - generate() + flush() + case Flush ⇒ + flush() } override def postStop(): Unit = if (pub ne null) pub.shutdown(shutdownReason) - private var demand = 0L - private def generate(): Unit = { - if (demand > 0) { - try { - demand -= 1 - pushToDownstream(f()) - if (demand > 0) self ! Generate - } catch { - case Stop ⇒ { completeDownstream(); shutdownReason = None } - case NonFatal(e) ⇒ { abortDownstream(e); shutdownReason = Some(e) } - } + private[this] def flush(): Unit = try { + val endOfStream = + if (iterator.hasNext) { + if (demand > 0) { + pushToDownstream(iterator.next()) + demand -= 1 + iterator.hasNext == false + } else false + } else true + + if (endOfStream) { + completeDownstream() + shutdownReason = None + } else if (demand > 0) { + self ! Flush } + } catch { + case NonFatal(e) ⇒ + abortDownstream(e) + shutdownReason = Some(e) } override def initialBufferSize = settings.initialFanOutBufferSize @@ -203,5 +212,4 @@ private[akka] class SimpleCallbackPublisherImpl[T](f: () ⇒ T, settings: Materi pub.shutdown(shutdownReason) softShutdown() } - } diff --git a/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala index 4ef9b572d6..d3ec1f32d9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala @@ -10,12 +10,7 @@ import akka.stream.MaterializerSettings * INTERNAL API */ private[akka] object IteratorPublisher { - def props(iterator: Iterator[Any], settings: MaterializerSettings): Props = { - def f(): Any = { - if (!iterator.hasNext) throw Stop - iterator.next() - } - SimpleCallbackPublisher.props(settings, f) - } + def props[T](iterator: Iterator[T], settings: MaterializerSettings): Props = + Props(new IteratorPublisherImpl(iterator, settings)).withDispatcher(settings.dispatcher) } \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stop.scala b/akka-stream/src/main/scala/akka/stream/impl/Stop.scala deleted file mode 100644 index 55831f62fb..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/Stop.scala +++ /dev/null @@ -1,15 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.impl - -import scala.util.control.ControlThrowable - -/** - * INTERNAL API - * - * This exception must be thrown from a callback-based stream publisher to - * signal the end of stream (if the produced stream is not infinite). This is used for example in - * [[akka.stream.scaladsl.Flow#apply]] (the variant which takes a closure). - */ -private[akka] case object Stop extends ControlThrowable diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala index b7711d7644..02cc7658e6 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala @@ -3,20 +3,18 @@ */ package akka.stream.scaladsl -import akka.actor.ActorRef -import akka.actor.Props +import akka.actor.{ Props, ActorRef } import akka.stream.impl._ -import akka.stream.impl.ActorBasedFlowMaterializer import akka.stream.impl.Ast.AstNode import org.reactivestreams.Publisher import org.reactivestreams.Subscriber import scala.annotation.unchecked.uncheckedVariance +import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration -import scala.util.Failure -import scala.util.Success +import scala.util.{ Success, Failure } sealed trait ActorFlowSource[+Out] extends Source[Out] { @@ -114,8 +112,7 @@ final case class IteratorSource[Out](iterator: Iterator[Out]) extends SimpleActo create(materializer, flowName)._1.subscribe(flowSubscriber) override def isActive: Boolean = true override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = - if (iterator.isEmpty) (EmptyPublisher[Out], ()) - else (ActorPublisher[Out](materializer.actorOf(IteratorPublisher.props(iterator, materializer.settings), + (ActorPublisher[Out](materializer.actorOf(IteratorPublisher.props(iterator, materializer.settings), name = s"$flowName-0-iterator")), ()) } @@ -135,21 +132,31 @@ final case class IterableSource[Out](iterable: immutable.Iterable[Out]) extends name = s"$flowName-0-iterable")), ()) } -/** - * Define the sequence of elements to be produced by the given closure. - * The stream ends normally when evaluation of the closure returns a `None`. - * The stream ends exceptionally when an exception is thrown from the closure. - */ -final case class ThunkSource[Out](f: () ⇒ Option[Out]) extends SimpleActorFlowSource[Out] { - override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = - create(materializer, flowName)._1.subscribe(flowSubscriber) - override def isActive: Boolean = true - override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = - (ActorPublisher[Out](materializer.actorOf(SimpleCallbackPublisher.props(materializer.settings, - () ⇒ f() match { - case Some(out) ⇒ out - case _ ⇒ throw Stop - }), name = s"$flowName-0-thunk")), ()) +final class ThunkIterator[Out](thunk: () ⇒ Option[Out]) extends Iterator[Out] { + require(thunk ne null, "thunk is not allowed to be null") + private[this] var value: Option[Out] = null + + private[this] def advance(): Unit = + value = thunk() match { + case null ⇒ throw new NullPointerException("Thunk is not allowed to return null") + case option ⇒ option + } + + @tailrec override final def hasNext: Boolean = value match { + case null ⇒ + advance(); hasNext + case option ⇒ option.isDefined + } + + @tailrec override final def next(): Out = value match { + case null ⇒ + advance(); next() + case Some(next) ⇒ + advance(); next + case None ⇒ Iterator.empty.next() + } + + override def toString: String = "ThunkIterator" } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 7d9b009c0d..afb225a3d1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -116,7 +116,7 @@ object Source { * The stream ends normally when evaluation of the closure returns a `None`. * The stream ends exceptionally when an exception is thrown from the closure. */ - def apply[T](f: () ⇒ Option[T]): Source[T] = ThunkSource(f) + def apply[T](f: () ⇒ Option[T]): Source[T] = IteratorSource(new ThunkIterator(f)) /** * Start a new `Source` from the given `Future`. The stream will consist of