diff --git a/akka-stream/src/main/scala/akka/stream/Stream.scala b/akka-stream/src/main/scala/akka/stream/Stream.scala index c33c1bce52..76ea209db0 100644 --- a/akka-stream/src/main/scala/akka/stream/Stream.scala +++ b/akka-stream/src/main/scala/akka/stream/Stream.scala @@ -13,11 +13,14 @@ import scala.concurrent.Future import scala.util.Try import scala.concurrent.duration._ import scala.util.control.NoStackTrace +import akka.stream.impl.Ast.IteratorProducerNode +import akka.stream.impl.Ast.IterableProducerNode +import akka.stream.impl.Ast.ExistingProducer object Stream { - def apply[T](producer: Producer[T]): Stream[T] = StreamImpl(producer, Nil) - def apply[T](iterator: Iterator[T])(implicit ec: ExecutionContext): Stream[T] = StreamImpl(new IteratorProducer(iterator), Nil) - def apply[T](seq: immutable.Seq[T]) = ??? + def apply[T](producer: Producer[T]): Stream[T] = StreamImpl(ExistingProducer(producer), Nil) + def apply[T](iterator: Iterator[T]): Stream[T] = StreamImpl(IteratorProducerNode(iterator), Nil) + def apply[T](iterable: immutable.Iterable[T]): Stream[T] = StreamImpl(IterableProducerNode(iterable), Nil) def apply[T](gen: ProcessorGenerator, f: () ⇒ T): Stream[T] = apply(gen.produce(f)) @@ -57,11 +60,11 @@ trait ProcessorGenerator { * INTERNAL API * ops are stored in reverse order */ - private[akka] def toProducer[I, O](producerToExtend: Producer[I], ops: List[Ast.AstNode]): Producer[O] + private[akka] def toProducer[I, O](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]): Producer[O] /** * INTERNAL API */ - private[akka] def consume[I](producer: Producer[I], ops: List[Ast.AstNode]): Unit + private[akka] def consume[I](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]): Unit /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/impl/EmptyProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/EmptyProducer.scala new file mode 100644 index 0000000000..7fa7998c40 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/EmptyProducer.scala @@ -0,0 +1,23 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl + +import org.reactivestreams.api.Consumer +import org.reactivestreams.api.Producer +import org.reactivestreams.spi.Publisher +import org.reactivestreams.spi.Subscriber + +/** + * INTERNAL API + */ +private[akka] object EmptyProducer extends Producer[Nothing] with Publisher[Nothing] { + def getPublisher: Publisher[Nothing] = this + + def subscribe(subscriber: Subscriber[Nothing]): Unit = + subscriber.onComplete() + + def produceTo(consumer: Consumer[Nothing]): Unit = + getPublisher.subscribe(consumer.getSubscriber) + +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/IterableProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/IterableProducer.scala new file mode 100644 index 0000000000..b5027304f0 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/IterableProducer.scala @@ -0,0 +1,167 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl + +import scala.annotation.tailrec +import scala.collection.immutable +import scala.util.control.NonFatal +import org.reactivestreams.spi.Subscriber +import org.reactivestreams.spi.Subscription +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.actor.SupervisorStrategy +import akka.actor.Terminated +import akka.stream.GeneratorSettings +import scala.concurrent.duration.Duration + +/** + * INTERNAL API + */ +private[akka] object IterableProducer { + def props(iterable: immutable.Iterable[Any], settings: GeneratorSettings): Props = + Props(new IterableProducer(iterable, settings)) + + object BasicActorSubscription { + case object Cancel + case class RequestMore(elements: Int) + } + + class BasicActorSubscription(ref: ActorRef) + extends Subscription { + import BasicActorSubscription._ + def cancel(): Unit = ref ! Cancel + def requestMore(elements: Int): Unit = + if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0") + else ref ! RequestMore(elements) + override def toString = "BasicActorSubscription" + } +} + +/** + * INTERNAL API + * + * Elements are produced from the iterator of the iterable. Each subscriber + * makes use of its own iterable, i.e. each consumer will receive the elements from the + * beginning of the iterable and it can consume the elements in its own pace. + */ +private[akka] class IterableProducer(iterable: immutable.Iterable[Any], settings: GeneratorSettings) extends Actor { + import IterableProducer.BasicActorSubscription + import IterableProducer.BasicActorSubscription.Cancel + + require(iterable.nonEmpty, "Use EmptyProducer for empty iterable") + + var exposedPublisher: ActorPublisher[Any] = _ + var subscribers = Set.empty[Subscriber[Any]] + var workers = Map.empty[ActorRef, Subscriber[Any]] + var completed = false + + override val supervisorStrategy = SupervisorStrategy.stoppingStrategy + + def receive = { + case ExposedPublisher(publisher) ⇒ + exposedPublisher = publisher + context.setReceiveTimeout(settings.downstreamSubscriptionTimeout) + context.become(waitingForFirstSubscriber) + case _ ⇒ throw new IllegalStateException("The first message must be ExposedPublisher") + } + + def waitingForFirstSubscriber: Receive = { + case SubscribePending ⇒ + exposedPublisher.takePendingSubscribers() foreach registerSubscriber + context.setReceiveTimeout(Duration.Undefined) + context.become(active) + } + + def active: Receive = { + case SubscribePending ⇒ + exposedPublisher.takePendingSubscribers() foreach registerSubscriber + + case Terminated(worker) ⇒ + val subscriber = workers(worker) + workers -= worker + subscribers -= subscriber + if (subscribers.isEmpty) + context.stop(self) + } + + def registerSubscriber(subscriber: Subscriber[Any]): Unit = { + if (subscribers(subscriber)) + subscriber.onError(new IllegalStateException(s"Cannot subscribe $subscriber twice")) + else { + val iterator = iterable.iterator + val worker = context.watch(context.actorOf(IterableProducerWorker.props(iterator, subscriber, + settings.maximumInputBufferSize))) + val subscription = new BasicActorSubscription(worker) + subscribers += subscriber + workers = workers.updated(worker, subscriber) + subscriber.onSubscribe(subscription) + } + } + + override def postStop(): Unit = { + if (exposedPublisher ne null) + exposedPublisher.shutdown(completed) + } + +} + +/** + * INTERNAL API + */ +private[akka] object IterableProducerWorker { + def props(iterator: Iterator[Any], subscriber: Subscriber[Any], maxPush: Int): Props = + Props(new IterableProducerWorker(iterator, subscriber, maxPush)) + + private object PushMore +} + +/** + * INTERNAL API + * + * Each subscriber is served by this worker actor. It pushes elements to the + * subscriber immediately when it receives demand, but to allow cancel before + * pushing everything it sends a PushMore to itself after a batch of elements. + */ +private[akka] class IterableProducerWorker(iterator: Iterator[Any], subscriber: Subscriber[Any], maxPush: Int) + extends Actor { + import IterableProducerWorker._ + import IterableProducer.BasicActorSubscription._ + + require(iterator.hasNext, "Iterator must not be empty") + + var demand = 0L + + def receive = { + case RequestMore(elements) ⇒ + demand += elements + push() + case PushMore ⇒ + push() + case Cancel ⇒ + context.stop(self) + } + + private def push(): Unit = { + @tailrec def doPush(n: Int): Unit = + if (demand > 0) { + demand -= 1 + subscriber.onNext(iterator.next()) + if (!iterator.hasNext) { + subscriber.onComplete() + context.stop(self) + } else if (n == 0 && demand > 0) + self ! PushMore + else + doPush(n - 1) + } + + try doPush(maxPush) catch { + case NonFatal(e) ⇒ + subscriber.onError(e) + context.stop(self) + } + } +} + diff --git a/akka-stream/src/main/scala/akka/stream/impl/IteratorProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/IteratorProducer.scala new file mode 100644 index 0000000000..967037e671 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/IteratorProducer.scala @@ -0,0 +1,22 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl + +import akka.actor.Props +import akka.stream.GeneratorSettings +import akka.stream.Stream + +/** + * INTERNAL API + */ +private[akka] object IteratorProducer { + def props(iterator: Iterator[Any], settings: GeneratorSettings): Props = { + def f(): Any = { + if (!iterator.hasNext) throw Stream.Stop + iterator.next() + } + ActorProducer.props(settings, f) + } + +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/ProcessorGenerator.scala b/akka-stream/src/main/scala/akka/stream/impl/ProcessorGenerator.scala index 1a5e59c6ea..f1b50d3c58 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ProcessorGenerator.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ProcessorGenerator.scala @@ -20,6 +20,25 @@ private[akka] object Ast { case class Transform(zero: Any, f: (Any, Any) ⇒ (Any, immutable.Seq[Any]), onComplete: Any ⇒ immutable.Seq[Any], isComplete: Any ⇒ Boolean) extends AstNode case class Recover(t: Transform) extends AstNode + + trait ProducerNode[I] { + def createProducer(settings: GeneratorSettings, context: ActorRefFactory): Producer[I] + } + + case class ExistingProducer[I](producer: Producer[I]) extends ProducerNode[I] { + def createProducer(settings: GeneratorSettings, context: ActorRefFactory) = producer + } + + case class IteratorProducerNode[I](iterator: Iterator[I]) extends ProducerNode[I] { + def createProducer(settings: GeneratorSettings, context: ActorRefFactory): Producer[I] = + if (iterator.isEmpty) EmptyProducer.asInstanceOf[Producer[I]] + else new ActorProducer[I](context.actorOf(IteratorProducer.props(iterator, settings))) + } + case class IterableProducerNode[I](iterable: immutable.Iterable[I]) extends ProducerNode[I] { + def createProducer(settings: GeneratorSettings, context: ActorRefFactory): Producer[I] = + if (iterable.isEmpty) EmptyProducer.asInstanceOf[Producer[I]] + else new ActorProducer[I](context.actorOf(IterableProducer.props(iterable, settings))) + } } /** @@ -39,19 +58,19 @@ private[akka] class ActorBasedProcessorGenerator(settings: GeneratorSettings, co } // Ops come in reverse order - override def toProducer[I, O](producerToExtend: Producer[I], ops: List[AstNode]): Producer[O] = { - if (ops.isEmpty) producerToExtend.asInstanceOf[Producer[O]] + override def toProducer[I, O](producerNode: ProducerNode[I], ops: List[AstNode]): Producer[O] = { + if (ops.isEmpty) producerNode.createProducer(settings, context).asInstanceOf[Producer[O]] else { val opProcessor = processorForNode(ops.head) val topConsumer = processorChain(opProcessor, ops.tail) - producerToExtend.getPublisher.subscribe(topConsumer.getSubscriber.asInstanceOf[Subscriber[I]]) + producerNode.createProducer(settings, context).produceTo(topConsumer.asInstanceOf[Consumer[I]]) opProcessor.asInstanceOf[Producer[O]] } } private val identityConsumer = Transform((), (_, _) ⇒ () -> Nil, _ ⇒ Nil, _ ⇒ false) - override def consume[I](producer: Producer[I], ops: List[AstNode]): Unit = { + override def consume[I](producerNode: ProducerNode[I], ops: List[AstNode]): Unit = { val consumer = ops match { case Nil ⇒ new ActorConsumer[Any](context.actorOf(ActorConsumer.props(settings, identityConsumer))) @@ -59,7 +78,7 @@ private[akka] class ActorBasedProcessorGenerator(settings: GeneratorSettings, co val c = new ActorConsumer[Any](context.actorOf(ActorConsumer.props(settings, head))) processorChain(c, tail) } - producer.produceTo(consumer.asInstanceOf[Consumer[I]]) + producerNode.createProducer(settings, context).produceTo(consumer.asInstanceOf[Consumer[I]]) } override def produce[T](f: () ⇒ T): Producer[T] = new ActorProducer(context.actorOf(ActorProducer.props(settings, f))) diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala index 44194b0390..780f8a5411 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala @@ -15,7 +15,7 @@ import akka.stream.{ ProcessorGenerator, Stream } /** * INTERNAL API */ -private[akka] case class StreamImpl[I, O](producer: Producer[I], ops: List[Ast.AstNode]) extends Stream[O] { +private[akka] case class StreamImpl[I, O](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]) extends Stream[O] { import Ast._ // Storing ops in reverse order private def andThen[U](op: AstNode): Stream[U] = this.copy(ops = op :: ops) @@ -67,8 +67,8 @@ private[akka] case class StreamImpl[I, O](producer: Producer[I], ops: List[Ast.A p.future } - def consume(generator: ProcessorGenerator): Unit = generator.consume(producer, ops) + def consume(generator: ProcessorGenerator): Unit = generator.consume(producerNode, ops) - def toProducer(generator: ProcessorGenerator): Producer[O] = generator.toProducer(producer, ops) + def toProducer(generator: ProcessorGenerator): Producer[O] = generator.toProducer(producerNode, ops) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StrictProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/StrictProducer.scala deleted file mode 100644 index dbcc5818d7..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/StrictProducer.scala +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.impl - -import java.util.concurrent.atomic.AtomicBoolean -import scala.annotation.tailrec -import scala.concurrent.ExecutionContext -import org.reactivestreams.spi -import org.reactivestreams.api.Consumer - -/** - * INTERNAL API - * - * An efficient producer for iterators. - * - * CAUTION: This is a convenience wrapper designed for iterators over static collections. - * Do *NOT* use it for iterators on lazy collections or other implementations that do more - * than merely retrieve an element in their `next()` method! - */ -private[akka] class IteratorProducer[T]( - iterator: Iterator[T], - maxBufferSize: Int = 16, - maxRecursionLevel: Int = 32, - maxSyncBatchSize: Int = 128)(implicit executor: ExecutionContext) //FIXME Remove defaults in code - extends AbstractStrictProducer[T](initialBufferSize = 1, maxBufferSize, maxRecursionLevel, maxSyncBatchSize) { - - if (!iterator.hasNext) completeDownstream() - - @tailrec final protected def pushNext(count: Int): Unit = - if (iterator.hasNext) { - if (count > 0) { - pushToDownstream(iterator.next()) - pushNext(count - 1) - } - } else completeDownstream() -} - -/** - * INTERNAL API - * - * Base class for producers that can provide their elements synchronously. - * - * For efficiency it tries to produce elements synchronously before returning from `requestMore`. - * If the requested element count is > the given `maxSyncBatchSize` or there are still scheduled - * "productions" pending then (part of) the requested elements are produced asynchronously via the - * given executionContext. - * - * Also, in order to protect against stack overflow, the given `maxRecursionLevel` limits the number - * of nested call iterations between the fanout logic and the synchronous production logic provided - * by `AbstractStrictProducer`. If the `maxRecursionLevel` is surpassed the synchronous production - * loop is stopped and production of the remaining elements scheduled to the given executor. - */ -private[akka] abstract class AbstractStrictProducer[T]( - initialBufferSize: Int, - maxBufferSize: Int, - maxRecursionLevel: Int = 32, - maxSyncBatchSize: Int = 128)(implicit executor: ExecutionContext) //FIXME Remove defaults in code - extends AbstractProducer[T](initialBufferSize, maxBufferSize) { - - private[this] val locked = new AtomicBoolean // TODO: replace with AtomicFieldUpdater / sun.misc.Unsafe - private[this] var pending = 0L - private[this] var recursionLevel = 0 - - def produceTo(consumer: Consumer[T]): Unit = - getPublisher.subscribe(consumer.getSubscriber) - - /** - * Implement with the actual production logic. - * It should synchronously call `pushToDownstream(...)` the given number of times. - * If less than (or equal to!) `count` elements are still available `completeDownstream()` must be called after - * all remaining elements have been pushed. - */ - protected def pushNext(count: Int): Unit - - protected def requestFromUpstream(elements: Int): Unit = { - recursionLevel += 1 - try { - if (pending == 0) { - if (recursionLevel <= maxRecursionLevel) produce(elements) - else schedule(elements) - } else pending += elements // if we still have something scheduled we must not produce synchronously - } finally recursionLevel -= 1 - } - - private def produce(elements: Long): Unit = - if (elements > maxSyncBatchSize) { - pushNext(maxSyncBatchSize) - schedule(elements - maxSyncBatchSize) - } else { - pushNext(elements.toInt) - pending = 0 - } - - private def schedule(newPending: Long): Unit = { - pending = newPending - executor.execute( - new Runnable { - @tailrec def run(): Unit = - if (locked.compareAndSet(false, true)) { - try produce(pending) - finally locked.set(false) - } else run() - }) - } - - protected def shutdown(completed: Boolean): Unit = cancelUpstream() - protected def cancelUpstream(): Unit = pending = 0 - - // outside Publisher interface, can potentially called from another thread, - // so we need to wrap with synchronization - @tailrec final override def subscribe(subscriber: spi.Subscriber[T]): Unit = - if (locked.compareAndSet(false, true)) { - try super.subscribe(subscriber) - finally locked.set(false) - } else subscribe(subscriber) - - // called from `Subscription::requestMore`, i.e. from another thread - // so we need to add synchronisation here - @tailrec final override protected def moreRequested(subscription: Subscription, elements: Int): Unit = - if (locked.compareAndSet(false, true)) { - try super.moreRequested(subscription, elements) - finally locked.set(false) - } else moreRequested(subscription, elements) - - // called from a Subscription, i.e. probably from another thread, - // so we need to wrap with synchronization - @tailrec final override def unregisterSubscription(subscription: Subscription) = - if (locked.compareAndSet(false, true)) { - try super.unregisterSubscription(subscription) - finally locked.set(false) - } else unregisterSubscription(subscription) -} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala b/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala index 000bfb511c..f230890ed4 100644 --- a/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala +++ b/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala @@ -6,7 +6,6 @@ package akka.stream import org.scalatest.testng.TestNGSuiteLike import org.reactivestreams.spi.Publisher import org.reactivestreams.tck.PublisherVerification -import akka.stream.testkit.TestProducer import akka.stream.impl.ActorBasedProcessorGenerator import org.reactivestreams.api.Producer diff --git a/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala b/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala index 596e6698e9..f4b9918b2b 100644 --- a/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala +++ b/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala @@ -11,7 +11,6 @@ import akka.actor.Props import akka.stream.impl.ActorProcessor import akka.stream.impl.TransformProcessorImpl import akka.stream.impl.Ast -import akka.stream.testkit.TestProducer import akka.testkit.TestEvent import akka.testkit.EventFilter import akka.stream.impl.ActorBasedProcessorGenerator @@ -38,8 +37,9 @@ class IdentityProcessorTest extends IdentityProcessorVerification[Int] with With } def createHelperPublisher(elements: Int): Publisher[Int] = { - import system.dispatcher + val gen = ProcessorGenerator(GeneratorSettings( + maximumInputBufferSize = 512))(system) val iter = Iterator from 1000 - TestProducer(if (elements > 0) iter take elements else iter).getPublisher + Stream(if (elements > 0) iter take elements else iter).toProducer(gen).getPublisher } } diff --git a/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala b/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala new file mode 100644 index 0000000000..bf99dc9f16 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import org.scalatest.testng.TestNGSuiteLike +import org.reactivestreams.spi.Publisher +import org.reactivestreams.tck.PublisherVerification +import scala.collection.immutable + +class IterableProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike { + + val gen = ProcessorGenerator(GeneratorSettings( + maximumInputBufferSize = 512))(system) + + def createPublisher(elements: Int): Publisher[Int] = { + val iterable: immutable.Iterable[Int] = + if (elements == 0) + new immutable.Iterable[Int] { override def iterator = Iterator from 0 } + else + 0 until elements + Stream(iterable).toProducer(gen).getPublisher + } + + override def createCompletedStatePublisher(): Publisher[Int] = + Stream(Nil).toProducer(gen).getPublisher +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala b/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala index 58f0773fda..56ce117794 100644 --- a/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala +++ b/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala @@ -6,16 +6,21 @@ package akka.stream import org.scalatest.testng.TestNGSuiteLike import org.reactivestreams.spi.Publisher import org.reactivestreams.tck.PublisherVerification -import akka.stream.testkit.TestProducer class IteratorProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike { - import system.dispatcher + + val gen = ProcessorGenerator(GeneratorSettings( + maximumInputBufferSize = 512))(system) def createPublisher(elements: Int): Publisher[Int] = { - val iter = Iterator from 1000 - TestProducer(if (elements > 0) iter take elements else iter).getPublisher + val iter: Iterator[Int] = + if (elements == 0) + Iterator from 0 + else + (Iterator from 0).take(elements) + Stream(iter).toProducer(gen).getPublisher } override def createCompletedStatePublisher(): Publisher[Int] = - TestProducer(Nil).getPublisher + Stream(List.empty[Int].iterator).toProducer(gen).getPublisher } \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/StreamIterableSpec.scala b/akka-stream/src/test/scala/akka/stream/StreamIterableSpec.scala new file mode 100644 index 0000000000..0a1b13cf65 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/StreamIterableSpec.scala @@ -0,0 +1,138 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import scala.concurrent.duration._ +import akka.stream.testkit.StreamTestKit +import akka.testkit.AkkaSpec +import akka.stream.testkit.OnNext +import akka.dispatch.OnComplete +import akka.stream.testkit.OnComplete +import akka.stream.testkit.OnError +import akka.stream.testkit.OnSubscribe + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class StreamIterableSpec extends AkkaSpec { + + val gen = ProcessorGenerator(GeneratorSettings( + maximumInputBufferSize = 512)) + + "A Stream based on an iterable" must { + "produce elements" in { + val p = Stream(List(1, 2, 3)).toProducer(gen) + val c = StreamTestKit.consumerProbe[Int] + p.produceTo(c) + val sub = c.expectSubscription() + sub.requestMore(1) + c.expectNext(1) + c.expectNoMsg(100.millis) + sub.requestMore(2) + c.expectNext(2) + c.expectNext(3) + c.expectComplete() + } + + "complete empty" in { + val p = Stream(List.empty[Int]).toProducer(gen) + val c = StreamTestKit.consumerProbe[Int] + p.produceTo(c) + c.expectComplete() + c.expectNoMsg(100.millis) + + val c2 = StreamTestKit.consumerProbe[Int] + p.produceTo(c2) + c2.expectComplete() + } + + "produce elements with multiple subscribers" in { + val p = Stream(List(1, 2, 3)).toProducer(gen) + val c1 = StreamTestKit.consumerProbe[Int] + val c2 = StreamTestKit.consumerProbe[Int] + p.produceTo(c1) + p.produceTo(c2) + val sub1 = c1.expectSubscription() + val sub2 = c2.expectSubscription() + sub1.requestMore(1) + sub2.requestMore(2) + c1.expectNext(1) + c2.expectNext(1) + c2.expectNext(2) + c1.expectNoMsg(100.millis) + c2.expectNoMsg(100.millis) + sub1.requestMore(2) + sub2.requestMore(2) + c1.expectNext(2) + c1.expectNext(3) + c2.expectNext(3) + c1.expectComplete() + c2.expectComplete() + } + + "produce elements to later subscriber" in { + val p = Stream(List(1, 2, 3)).toProducer(gen) + val c1 = StreamTestKit.consumerProbe[Int] + val c2 = StreamTestKit.consumerProbe[Int] + p.produceTo(c1) + + val sub1 = c1.expectSubscription() + sub1.requestMore(1) + c1.expectNext(1) + c1.expectNoMsg(100.millis) + p.produceTo(c2) + val sub2 = c2.expectSubscription() + sub2.requestMore(2) + // starting from first element, new iterator per subscriber + c2.expectNext(1) + c2.expectNext(2) + c2.expectNoMsg(100.millis) + sub2.requestMore(1) + c2.expectNext(3) + c2.expectComplete() + sub1.requestMore(2) + c1.expectNext(2) + c1.expectNext(3) + c1.expectComplete() + } + + "produce elements with one transformation step" in { + val p = Stream(List(1, 2, 3)).map(_ * 2).toProducer(gen) + val c = StreamTestKit.consumerProbe[Int] + p.produceTo(c) + val sub = c.expectSubscription() + sub.requestMore(10) + c.expectNext(2) + c.expectNext(4) + c.expectNext(6) + c.expectComplete() + } + + "produce elements with two transformation steps" in { + val p = Stream(List(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 2).toProducer(gen) + val c = StreamTestKit.consumerProbe[Int] + p.produceTo(c) + val sub = c.expectSubscription() + sub.requestMore(10) + c.expectNext(4) + c.expectNext(8) + c.expectComplete() + } + + "allow cancel before receiving all elements" in { + val count = 100000 + val p = Stream(1 to count).toProducer(gen) + val c = StreamTestKit.consumerProbe[Int] + p.produceTo(c) + val sub = c.expectSubscription() + sub.requestMore(count) + c.expectNext(1) + sub.cancel() + val got = c.probe.receiveWhile(3.seconds) { + case _: OnNext[_] ⇒ + case OnComplete ⇒ fail("Cancel expected before OnComplete") + case OnError(e) ⇒ fail(e) + } + got.size should be < (count - 1) + } + } +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/StreamIteratorSpec.scala b/akka-stream/src/test/scala/akka/stream/StreamIteratorSpec.scala new file mode 100644 index 0000000000..814a4c2c20 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/StreamIteratorSpec.scala @@ -0,0 +1,137 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import scala.concurrent.duration._ +import akka.stream.testkit.StreamTestKit +import akka.testkit.AkkaSpec +import akka.stream.testkit.OnNext +import akka.stream.testkit.OnComplete +import akka.stream.testkit.OnError + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class StreamIteratorSpec extends AkkaSpec { + + val gen = ProcessorGenerator(GeneratorSettings( + initialInputBufferSize = 2, + maximumInputBufferSize = 2, + initialFanOutBufferSize = 4, + maxFanOutBufferSize = 4)) + + "A Stream based on an iterator" must { + "produce elements" in { + val p = Stream(List(1, 2, 3).iterator).toProducer(gen) + val c = StreamTestKit.consumerProbe[Int] + p.produceTo(c) + val sub = c.expectSubscription() + sub.requestMore(1) + c.expectNext(1) + c.expectNoMsg(100.millis) + sub.requestMore(3) + c.expectNext(2) + c.expectNext(3) + c.expectComplete() + } + + "complete empty" in { + val p = Stream(List.empty[Int].iterator).toProducer(gen) + val c = StreamTestKit.consumerProbe[Int] + p.produceTo(c) + c.expectComplete() + c.expectNoMsg(100.millis) + + val c2 = StreamTestKit.consumerProbe[Int] + p.produceTo(c2) + c2.expectComplete() + } + + "produce elements with multiple subscribers" in { + val p = Stream(List(1, 2, 3).iterator).toProducer(gen) + val c1 = StreamTestKit.consumerProbe[Int] + val c2 = StreamTestKit.consumerProbe[Int] + p.produceTo(c1) + p.produceTo(c2) + val sub1 = c1.expectSubscription() + val sub2 = c2.expectSubscription() + sub1.requestMore(1) + sub2.requestMore(2) + c1.expectNext(1) + c2.expectNext(1) + c2.expectNext(2) + c1.expectNoMsg(100.millis) + c2.expectNoMsg(100.millis) + sub1.requestMore(2) + sub2.requestMore(2) + c1.expectNext(2) + c1.expectNext(3) + c2.expectNext(3) + c1.expectComplete() + c2.expectComplete() + } + + "produce elements to later subscriber" in { + val p = Stream(List(1, 2, 3).iterator).toProducer(gen) + val c1 = StreamTestKit.consumerProbe[Int] + val c2 = StreamTestKit.consumerProbe[Int] + p.produceTo(c1) + + val sub1 = c1.expectSubscription() + sub1.requestMore(1) + c1.expectNext(1) + c1.expectNoMsg(100.millis) + p.produceTo(c2) + val sub2 = c2.expectSubscription() + sub2.requestMore(3) + // element 1 is already gone + c2.expectNext(2) + c2.expectNext(3) + c2.expectComplete() + sub1.requestMore(3) + c1.expectNext(2) + c1.expectNext(3) + c1.expectComplete() + } + + "produce elements with one transformation step" in { + val p = Stream(List(1, 2, 3).iterator).map(_ * 2).toProducer(gen) + val c = StreamTestKit.consumerProbe[Int] + p.produceTo(c) + val sub = c.expectSubscription() + sub.requestMore(10) + c.expectNext(2) + c.expectNext(4) + c.expectNext(6) + c.expectComplete() + } + + "produce elements with two transformation steps" in { + val p = Stream(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).toProducer(gen) + val c = StreamTestKit.consumerProbe[Int] + p.produceTo(c) + val sub = c.expectSubscription() + sub.requestMore(10) + c.expectNext(4) + c.expectNext(8) + c.expectComplete() + } + + "allow cancel before receiving all elements" in { + val count = 100000 + val p = Stream((1 to count).iterator).toProducer(gen) + val c = StreamTestKit.consumerProbe[Int] + p.produceTo(c) + val sub = c.expectSubscription() + sub.requestMore(count) + c.expectNext(1) + sub.cancel() + val got = c.probe.receiveWhile(3.seconds) { + case _: OnNext[_] ⇒ + case OnComplete ⇒ fail("Cancel expected before OnComplete") + case OnError(e) ⇒ fail(e) + } + got.size should be < (count - 1) + } + + } +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/StreamTransformRecoverSpec.scala b/akka-stream/src/test/scala/akka/stream/StreamTransformRecoverSpec.scala index e79136ebdf..5dc699a2e9 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamTransformRecoverSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/StreamTransformRecoverSpec.scala @@ -6,7 +6,6 @@ package akka.stream import scala.concurrent.duration._ import akka.stream.testkit.StreamTestKit import akka.testkit.AkkaSpec -import akka.stream.impl.IteratorProducer import akka.testkit.EventFilter import scala.util.Failure import scala.util.control.NoStackTrace @@ -24,7 +23,7 @@ class StreamTransformRecoverSpec extends AkkaSpec { "A Stream with transformRecover operations" must { "produce one-to-one transformation as expected" in { - val p = new IteratorProducer(List(1, 2, 3).iterator) + val p = Stream(List(1, 2, 3).iterator).toProducer(gen) val p2 = Stream(p). transformRecover(0)((tot, elem) ⇒ (tot + elem.get, List(tot + elem.get))). toProducer(gen) @@ -41,7 +40,7 @@ class StreamTransformRecoverSpec extends AkkaSpec { } "produce one-to-several transformation as expected" in { - val p = new IteratorProducer(List(1, 2, 3).iterator) + val p = Stream(List(1, 2, 3).iterator).toProducer(gen) val p2 = Stream(p). transformRecover(0)((tot, elem) ⇒ (tot + elem.get, Vector.fill(elem.get)(tot + elem.get))). toProducer(gen) @@ -61,7 +60,7 @@ class StreamTransformRecoverSpec extends AkkaSpec { } "produce dropping transformation as expected" in { - val p = new IteratorProducer(List(1, 2, 3, 4).iterator) + val p = Stream(List(1, 2, 3, 4).iterator).toProducer(gen) val p2 = Stream(p). transformRecover(0)((tot, elem) ⇒ (tot + elem.get, if (elem.get % 2 == 0) Nil else List(tot + elem.get))). toProducer(gen) @@ -78,7 +77,7 @@ class StreamTransformRecoverSpec extends AkkaSpec { } "produce multi-step transformation as expected" in { - val p = new IteratorProducer(List("a", "bc", "def").iterator) + val p = Stream(List("a", "bc", "def").iterator).toProducer(gen) val p2 = Stream(p). transformRecover("") { (str, elem) ⇒ val concat = str + elem @@ -108,7 +107,7 @@ class StreamTransformRecoverSpec extends AkkaSpec { } "invoke onComplete when done" in { - val p = new IteratorProducer(List("a").iterator) + val p = Stream(List("a").iterator).toProducer(gen) val p2 = Stream(p).transformRecover("")((s, in) ⇒ (s + in, Nil), x ⇒ List(x + "B")).toProducer(gen) val c = StreamTestKit.consumerProbe[String] p2.produceTo(c) @@ -154,7 +153,7 @@ class StreamTransformRecoverSpec extends AkkaSpec { } "report error when exception is thrown" in { - val p = new IteratorProducer(List(1, 2, 3).iterator) + val p = Stream(List(1, 2, 3).iterator).toProducer(gen) val p2 = Stream(p). transformRecover(0) { (_, elem) ⇒ if (elem.get == 2) throw new IllegalArgumentException("two not allowed") else (0, List(elem.get, elem.get)) @@ -206,7 +205,7 @@ class StreamTransformRecoverSpec extends AkkaSpec { } "support cancel as expected" in { - val p = new IteratorProducer(List(1, 2, 3).iterator) + val p = Stream(List(1, 2, 3).iterator).toProducer(gen) val p2 = Stream(p). transformRecover(0) { (_, elem) ⇒ (0, List(elem.get, elem.get)) }. toProducer(gen) diff --git a/akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala index 340a4aed18..4cbe897f23 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala @@ -6,7 +6,6 @@ package akka.stream import scala.concurrent.duration._ import akka.stream.testkit.StreamTestKit import akka.testkit.AkkaSpec -import akka.stream.impl.IteratorProducer import akka.testkit.EventFilter import com.typesafe.config.ConfigFactory @@ -23,7 +22,7 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor "A Stream with transform operations" must { "produce one-to-one transformation as expected" in { - val p = new IteratorProducer(List(1, 2, 3).iterator) + val p = Stream(List(1, 2, 3).iterator).toProducer(gen) val p2 = Stream(p). transform(0)((tot, elem) ⇒ (tot + elem, List(tot + elem))). toProducer(gen) @@ -40,7 +39,7 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor } "produce one-to-several transformation as expected" in { - val p = new IteratorProducer(List(1, 2, 3).iterator) + val p = Stream(List(1, 2, 3).iterator).toProducer(gen) val p2 = Stream(p). transform(0)((tot, elem) ⇒ (tot + elem, Vector.fill(elem)(tot + elem))). toProducer(gen) @@ -60,7 +59,7 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor } "produce dropping transformation as expected" in { - val p = new IteratorProducer(List(1, 2, 3, 4).iterator) + val p = Stream(List(1, 2, 3, 4).iterator).toProducer(gen) val p2 = Stream(p). transform(0)((tot, elem) ⇒ (tot + elem, if (elem % 2 == 0) Nil else List(tot + elem))). toProducer(gen) @@ -77,7 +76,7 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor } "produce multi-step transformation as expected" in { - val p = new IteratorProducer(List("a", "bc", "def").iterator) + val p = Stream(List("a", "bc", "def").iterator).toProducer(gen) val p2 = Stream(p). transform("") { (str, elem) ⇒ val concat = str + elem @@ -107,7 +106,7 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor } "invoke onComplete when done" in { - val p = new IteratorProducer(List("a").iterator) + val p = Stream(List("a").iterator).toProducer(gen) val p2 = Stream(p).transform("")((s, in) ⇒ (s + in, Nil), x ⇒ List(x + "B")).toProducer(gen) val c = StreamTestKit.consumerProbe[String] p2.produceTo(c) @@ -149,7 +148,7 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor } "report error when exception is thrown" in { - val p = new IteratorProducer(List(1, 2, 3).iterator) + val p = Stream(List(1, 2, 3).iterator).toProducer(gen) val p2 = Stream(p). transform(0) { (_, elem) ⇒ if (elem == 2) throw new IllegalArgumentException("two not allowed") else (0, List(elem, elem)) @@ -168,7 +167,7 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor } "support cancel as expected" in { - val p = new IteratorProducer(List(1, 2, 3).iterator) + val p = Stream(List(1, 2, 3).iterator).toProducer(gen) val p2 = Stream(p). transform(0) { (_, elem) ⇒ (0, List(elem, elem)) }. toProducer(gen) diff --git a/akka-stream/src/test/scala/akka/stream/testkit/TestProducer.scala b/akka-stream/src/test/scala/akka/stream/testkit/TestProducer.scala deleted file mode 100644 index 192857545c..0000000000 --- a/akka-stream/src/test/scala/akka/stream/testkit/TestProducer.scala +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.testkit - -import org.reactivestreams.api.Producer -import akka.stream.impl.IteratorProducer -import scala.concurrent.ExecutionContext -import org.reactivestreams.spi.Subscriber -import org.reactivestreams.spi.Publisher -import org.reactivestreams.api.Consumer - -object TestProducer { - def apply[T](iterable: Iterable[T])(implicit executor: ExecutionContext): Producer[T] = apply(iterable.iterator) - def apply[T](iterator: Iterator[T])(implicit executor: ExecutionContext): Producer[T] = new IteratorProducer[T](iterator) - def empty[T]: Producer[T] = EmptyProducer.asInstanceOf[Producer[T]] -} - -object EmptyProducer extends Producer[Nothing] with Publisher[Nothing] { - def getPublisher: Publisher[Nothing] = this - - def subscribe(subscriber: Subscriber[Nothing]): Unit = - subscriber.onComplete() - - def produceTo(consumer: Consumer[Nothing]): Unit = - getPublisher.subscribe(consumer.getSubscriber) - -} \ No newline at end of file