!str Add Stream from Iterator and Iterable

* actor based producer for iterator
* actor based producer for iterable
* remove old IteratorProducer and StrictProducer
This commit is contained in:
Patrik Nordwall 2014-03-28 15:44:18 +01:00 committed by Roland Kuhn
parent d2b40c6c21
commit 0608db4b0d
16 changed files with 576 additions and 199 deletions

View file

@ -13,11 +13,14 @@ import scala.concurrent.Future
import scala.util.Try
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.stream.impl.Ast.IteratorProducerNode
import akka.stream.impl.Ast.IterableProducerNode
import akka.stream.impl.Ast.ExistingProducer
object Stream {
def apply[T](producer: Producer[T]): Stream[T] = StreamImpl(producer, Nil)
def apply[T](iterator: Iterator[T])(implicit ec: ExecutionContext): Stream[T] = StreamImpl(new IteratorProducer(iterator), Nil)
def apply[T](seq: immutable.Seq[T]) = ???
def apply[T](producer: Producer[T]): Stream[T] = StreamImpl(ExistingProducer(producer), Nil)
def apply[T](iterator: Iterator[T]): Stream[T] = StreamImpl(IteratorProducerNode(iterator), Nil)
def apply[T](iterable: immutable.Iterable[T]): Stream[T] = StreamImpl(IterableProducerNode(iterable), Nil)
def apply[T](gen: ProcessorGenerator, f: () T): Stream[T] = apply(gen.produce(f))
@ -57,11 +60,11 @@ trait ProcessorGenerator {
* INTERNAL API
* ops are stored in reverse order
*/
private[akka] def toProducer[I, O](producerToExtend: Producer[I], ops: List[Ast.AstNode]): Producer[O]
private[akka] def toProducer[I, O](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]): Producer[O]
/**
* INTERNAL API
*/
private[akka] def consume[I](producer: Producer[I], ops: List[Ast.AstNode]): Unit
private[akka] def consume[I](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]): Unit
/**
* INTERNAL API
*/

View file

@ -0,0 +1,23 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import org.reactivestreams.api.Consumer
import org.reactivestreams.api.Producer
import org.reactivestreams.spi.Publisher
import org.reactivestreams.spi.Subscriber
/**
* INTERNAL API
*/
private[akka] object EmptyProducer extends Producer[Nothing] with Publisher[Nothing] {
def getPublisher: Publisher[Nothing] = this
def subscribe(subscriber: Subscriber[Nothing]): Unit =
subscriber.onComplete()
def produceTo(consumer: Consumer[Nothing]): Unit =
getPublisher.subscribe(consumer.getSubscriber)
}

View file

@ -0,0 +1,167 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import scala.annotation.tailrec
import scala.collection.immutable
import scala.util.control.NonFatal
import org.reactivestreams.spi.Subscriber
import org.reactivestreams.spi.Subscription
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.actor.Terminated
import akka.stream.GeneratorSettings
import scala.concurrent.duration.Duration
/**
* INTERNAL API
*/
private[akka] object IterableProducer {
def props(iterable: immutable.Iterable[Any], settings: GeneratorSettings): Props =
Props(new IterableProducer(iterable, settings))
object BasicActorSubscription {
case object Cancel
case class RequestMore(elements: Int)
}
class BasicActorSubscription(ref: ActorRef)
extends Subscription {
import BasicActorSubscription._
def cancel(): Unit = ref ! Cancel
def requestMore(elements: Int): Unit =
if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0")
else ref ! RequestMore(elements)
override def toString = "BasicActorSubscription"
}
}
/**
* INTERNAL API
*
* Elements are produced from the iterator of the iterable. Each subscriber
* makes use of its own iterable, i.e. each consumer will receive the elements from the
* beginning of the iterable and it can consume the elements in its own pace.
*/
private[akka] class IterableProducer(iterable: immutable.Iterable[Any], settings: GeneratorSettings) extends Actor {
import IterableProducer.BasicActorSubscription
import IterableProducer.BasicActorSubscription.Cancel
require(iterable.nonEmpty, "Use EmptyProducer for empty iterable")
var exposedPublisher: ActorPublisher[Any] = _
var subscribers = Set.empty[Subscriber[Any]]
var workers = Map.empty[ActorRef, Subscriber[Any]]
var completed = false
override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
def receive = {
case ExposedPublisher(publisher)
exposedPublisher = publisher
context.setReceiveTimeout(settings.downstreamSubscriptionTimeout)
context.become(waitingForFirstSubscriber)
case _ throw new IllegalStateException("The first message must be ExposedPublisher")
}
def waitingForFirstSubscriber: Receive = {
case SubscribePending
exposedPublisher.takePendingSubscribers() foreach registerSubscriber
context.setReceiveTimeout(Duration.Undefined)
context.become(active)
}
def active: Receive = {
case SubscribePending
exposedPublisher.takePendingSubscribers() foreach registerSubscriber
case Terminated(worker)
val subscriber = workers(worker)
workers -= worker
subscribers -= subscriber
if (subscribers.isEmpty)
context.stop(self)
}
def registerSubscriber(subscriber: Subscriber[Any]): Unit = {
if (subscribers(subscriber))
subscriber.onError(new IllegalStateException(s"Cannot subscribe $subscriber twice"))
else {
val iterator = iterable.iterator
val worker = context.watch(context.actorOf(IterableProducerWorker.props(iterator, subscriber,
settings.maximumInputBufferSize)))
val subscription = new BasicActorSubscription(worker)
subscribers += subscriber
workers = workers.updated(worker, subscriber)
subscriber.onSubscribe(subscription)
}
}
override def postStop(): Unit = {
if (exposedPublisher ne null)
exposedPublisher.shutdown(completed)
}
}
/**
* INTERNAL API
*/
private[akka] object IterableProducerWorker {
def props(iterator: Iterator[Any], subscriber: Subscriber[Any], maxPush: Int): Props =
Props(new IterableProducerWorker(iterator, subscriber, maxPush))
private object PushMore
}
/**
* INTERNAL API
*
* Each subscriber is served by this worker actor. It pushes elements to the
* subscriber immediately when it receives demand, but to allow cancel before
* pushing everything it sends a PushMore to itself after a batch of elements.
*/
private[akka] class IterableProducerWorker(iterator: Iterator[Any], subscriber: Subscriber[Any], maxPush: Int)
extends Actor {
import IterableProducerWorker._
import IterableProducer.BasicActorSubscription._
require(iterator.hasNext, "Iterator must not be empty")
var demand = 0L
def receive = {
case RequestMore(elements)
demand += elements
push()
case PushMore
push()
case Cancel
context.stop(self)
}
private def push(): Unit = {
@tailrec def doPush(n: Int): Unit =
if (demand > 0) {
demand -= 1
subscriber.onNext(iterator.next())
if (!iterator.hasNext) {
subscriber.onComplete()
context.stop(self)
} else if (n == 0 && demand > 0)
self ! PushMore
else
doPush(n - 1)
}
try doPush(maxPush) catch {
case NonFatal(e)
subscriber.onError(e)
context.stop(self)
}
}
}

View file

@ -0,0 +1,22 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.actor.Props
import akka.stream.GeneratorSettings
import akka.stream.Stream
/**
* INTERNAL API
*/
private[akka] object IteratorProducer {
def props(iterator: Iterator[Any], settings: GeneratorSettings): Props = {
def f(): Any = {
if (!iterator.hasNext) throw Stream.Stop
iterator.next()
}
ActorProducer.props(settings, f)
}
}

View file

@ -20,6 +20,25 @@ private[akka] object Ast {
case class Transform(zero: Any, f: (Any, Any) (Any, immutable.Seq[Any]), onComplete: Any immutable.Seq[Any], isComplete: Any Boolean) extends AstNode
case class Recover(t: Transform) extends AstNode
trait ProducerNode[I] {
def createProducer(settings: GeneratorSettings, context: ActorRefFactory): Producer[I]
}
case class ExistingProducer[I](producer: Producer[I]) extends ProducerNode[I] {
def createProducer(settings: GeneratorSettings, context: ActorRefFactory) = producer
}
case class IteratorProducerNode[I](iterator: Iterator[I]) extends ProducerNode[I] {
def createProducer(settings: GeneratorSettings, context: ActorRefFactory): Producer[I] =
if (iterator.isEmpty) EmptyProducer.asInstanceOf[Producer[I]]
else new ActorProducer[I](context.actorOf(IteratorProducer.props(iterator, settings)))
}
case class IterableProducerNode[I](iterable: immutable.Iterable[I]) extends ProducerNode[I] {
def createProducer(settings: GeneratorSettings, context: ActorRefFactory): Producer[I] =
if (iterable.isEmpty) EmptyProducer.asInstanceOf[Producer[I]]
else new ActorProducer[I](context.actorOf(IterableProducer.props(iterable, settings)))
}
}
/**
@ -39,19 +58,19 @@ private[akka] class ActorBasedProcessorGenerator(settings: GeneratorSettings, co
}
// Ops come in reverse order
override def toProducer[I, O](producerToExtend: Producer[I], ops: List[AstNode]): Producer[O] = {
if (ops.isEmpty) producerToExtend.asInstanceOf[Producer[O]]
override def toProducer[I, O](producerNode: ProducerNode[I], ops: List[AstNode]): Producer[O] = {
if (ops.isEmpty) producerNode.createProducer(settings, context).asInstanceOf[Producer[O]]
else {
val opProcessor = processorForNode(ops.head)
val topConsumer = processorChain(opProcessor, ops.tail)
producerToExtend.getPublisher.subscribe(topConsumer.getSubscriber.asInstanceOf[Subscriber[I]])
producerNode.createProducer(settings, context).produceTo(topConsumer.asInstanceOf[Consumer[I]])
opProcessor.asInstanceOf[Producer[O]]
}
}
private val identityConsumer = Transform((), (_, _) () -> Nil, _ Nil, _ false)
override def consume[I](producer: Producer[I], ops: List[AstNode]): Unit = {
override def consume[I](producerNode: ProducerNode[I], ops: List[AstNode]): Unit = {
val consumer = ops match {
case Nil
new ActorConsumer[Any](context.actorOf(ActorConsumer.props(settings, identityConsumer)))
@ -59,7 +78,7 @@ private[akka] class ActorBasedProcessorGenerator(settings: GeneratorSettings, co
val c = new ActorConsumer[Any](context.actorOf(ActorConsumer.props(settings, head)))
processorChain(c, tail)
}
producer.produceTo(consumer.asInstanceOf[Consumer[I]])
producerNode.createProducer(settings, context).produceTo(consumer.asInstanceOf[Consumer[I]])
}
override def produce[T](f: () T): Producer[T] = new ActorProducer(context.actorOf(ActorProducer.props(settings, f)))

View file

@ -15,7 +15,7 @@ import akka.stream.{ ProcessorGenerator, Stream }
/**
* INTERNAL API
*/
private[akka] case class StreamImpl[I, O](producer: Producer[I], ops: List[Ast.AstNode]) extends Stream[O] {
private[akka] case class StreamImpl[I, O](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]) extends Stream[O] {
import Ast._
// Storing ops in reverse order
private def andThen[U](op: AstNode): Stream[U] = this.copy(ops = op :: ops)
@ -67,8 +67,8 @@ private[akka] case class StreamImpl[I, O](producer: Producer[I], ops: List[Ast.A
p.future
}
def consume(generator: ProcessorGenerator): Unit = generator.consume(producer, ops)
def consume(generator: ProcessorGenerator): Unit = generator.consume(producerNode, ops)
def toProducer(generator: ProcessorGenerator): Producer[O] = generator.toProducer(producer, ops)
def toProducer(generator: ProcessorGenerator): Producer[O] = generator.toProducer(producerNode, ops)
}

View file

@ -1,133 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import java.util.concurrent.atomic.AtomicBoolean
import scala.annotation.tailrec
import scala.concurrent.ExecutionContext
import org.reactivestreams.spi
import org.reactivestreams.api.Consumer
/**
* INTERNAL API
*
* An efficient producer for iterators.
*
* CAUTION: This is a convenience wrapper designed for iterators over static collections.
* Do *NOT* use it for iterators on lazy collections or other implementations that do more
* than merely retrieve an element in their `next()` method!
*/
private[akka] class IteratorProducer[T](
iterator: Iterator[T],
maxBufferSize: Int = 16,
maxRecursionLevel: Int = 32,
maxSyncBatchSize: Int = 128)(implicit executor: ExecutionContext) //FIXME Remove defaults in code
extends AbstractStrictProducer[T](initialBufferSize = 1, maxBufferSize, maxRecursionLevel, maxSyncBatchSize) {
if (!iterator.hasNext) completeDownstream()
@tailrec final protected def pushNext(count: Int): Unit =
if (iterator.hasNext) {
if (count > 0) {
pushToDownstream(iterator.next())
pushNext(count - 1)
}
} else completeDownstream()
}
/**
* INTERNAL API
*
* Base class for producers that can provide their elements synchronously.
*
* For efficiency it tries to produce elements synchronously before returning from `requestMore`.
* If the requested element count is > the given `maxSyncBatchSize` or there are still scheduled
* "productions" pending then (part of) the requested elements are produced asynchronously via the
* given executionContext.
*
* Also, in order to protect against stack overflow, the given `maxRecursionLevel` limits the number
* of nested call iterations between the fanout logic and the synchronous production logic provided
* by `AbstractStrictProducer`. If the `maxRecursionLevel` is surpassed the synchronous production
* loop is stopped and production of the remaining elements scheduled to the given executor.
*/
private[akka] abstract class AbstractStrictProducer[T](
initialBufferSize: Int,
maxBufferSize: Int,
maxRecursionLevel: Int = 32,
maxSyncBatchSize: Int = 128)(implicit executor: ExecutionContext) //FIXME Remove defaults in code
extends AbstractProducer[T](initialBufferSize, maxBufferSize) {
private[this] val locked = new AtomicBoolean // TODO: replace with AtomicFieldUpdater / sun.misc.Unsafe
private[this] var pending = 0L
private[this] var recursionLevel = 0
def produceTo(consumer: Consumer[T]): Unit =
getPublisher.subscribe(consumer.getSubscriber)
/**
* Implement with the actual production logic.
* It should synchronously call `pushToDownstream(...)` the given number of times.
* If less than (or equal to!) `count` elements are still available `completeDownstream()` must be called after
* all remaining elements have been pushed.
*/
protected def pushNext(count: Int): Unit
protected def requestFromUpstream(elements: Int): Unit = {
recursionLevel += 1
try {
if (pending == 0) {
if (recursionLevel <= maxRecursionLevel) produce(elements)
else schedule(elements)
} else pending += elements // if we still have something scheduled we must not produce synchronously
} finally recursionLevel -= 1
}
private def produce(elements: Long): Unit =
if (elements > maxSyncBatchSize) {
pushNext(maxSyncBatchSize)
schedule(elements - maxSyncBatchSize)
} else {
pushNext(elements.toInt)
pending = 0
}
private def schedule(newPending: Long): Unit = {
pending = newPending
executor.execute(
new Runnable {
@tailrec def run(): Unit =
if (locked.compareAndSet(false, true)) {
try produce(pending)
finally locked.set(false)
} else run()
})
}
protected def shutdown(completed: Boolean): Unit = cancelUpstream()
protected def cancelUpstream(): Unit = pending = 0
// outside Publisher interface, can potentially called from another thread,
// so we need to wrap with synchronization
@tailrec final override def subscribe(subscriber: spi.Subscriber[T]): Unit =
if (locked.compareAndSet(false, true)) {
try super.subscribe(subscriber)
finally locked.set(false)
} else subscribe(subscriber)
// called from `Subscription::requestMore`, i.e. from another thread
// so we need to add synchronisation here
@tailrec final override protected def moreRequested(subscription: Subscription, elements: Int): Unit =
if (locked.compareAndSet(false, true)) {
try super.moreRequested(subscription, elements)
finally locked.set(false)
} else moreRequested(subscription, elements)
// called from a Subscription, i.e. probably from another thread,
// so we need to wrap with synchronization
@tailrec final override def unregisterSubscription(subscription: Subscription) =
if (locked.compareAndSet(false, true)) {
try super.unregisterSubscription(subscription)
finally locked.set(false)
} else unregisterSubscription(subscription)
}

View file

@ -6,7 +6,6 @@ package akka.stream
import org.scalatest.testng.TestNGSuiteLike
import org.reactivestreams.spi.Publisher
import org.reactivestreams.tck.PublisherVerification
import akka.stream.testkit.TestProducer
import akka.stream.impl.ActorBasedProcessorGenerator
import org.reactivestreams.api.Producer

View file

@ -11,7 +11,6 @@ import akka.actor.Props
import akka.stream.impl.ActorProcessor
import akka.stream.impl.TransformProcessorImpl
import akka.stream.impl.Ast
import akka.stream.testkit.TestProducer
import akka.testkit.TestEvent
import akka.testkit.EventFilter
import akka.stream.impl.ActorBasedProcessorGenerator
@ -38,8 +37,9 @@ class IdentityProcessorTest extends IdentityProcessorVerification[Int] with With
}
def createHelperPublisher(elements: Int): Publisher[Int] = {
import system.dispatcher
val gen = ProcessorGenerator(GeneratorSettings(
maximumInputBufferSize = 512))(system)
val iter = Iterator from 1000
TestProducer(if (elements > 0) iter take elements else iter).getPublisher
Stream(if (elements > 0) iter take elements else iter).toProducer(gen).getPublisher
}
}

View file

@ -0,0 +1,27 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import org.scalatest.testng.TestNGSuiteLike
import org.reactivestreams.spi.Publisher
import org.reactivestreams.tck.PublisherVerification
import scala.collection.immutable
class IterableProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike {
val gen = ProcessorGenerator(GeneratorSettings(
maximumInputBufferSize = 512))(system)
def createPublisher(elements: Int): Publisher[Int] = {
val iterable: immutable.Iterable[Int] =
if (elements == 0)
new immutable.Iterable[Int] { override def iterator = Iterator from 0 }
else
0 until elements
Stream(iterable).toProducer(gen).getPublisher
}
override def createCompletedStatePublisher(): Publisher[Int] =
Stream(Nil).toProducer(gen).getPublisher
}

View file

@ -6,16 +6,21 @@ package akka.stream
import org.scalatest.testng.TestNGSuiteLike
import org.reactivestreams.spi.Publisher
import org.reactivestreams.tck.PublisherVerification
import akka.stream.testkit.TestProducer
class IteratorProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike {
import system.dispatcher
val gen = ProcessorGenerator(GeneratorSettings(
maximumInputBufferSize = 512))(system)
def createPublisher(elements: Int): Publisher[Int] = {
val iter = Iterator from 1000
TestProducer(if (elements > 0) iter take elements else iter).getPublisher
val iter: Iterator[Int] =
if (elements == 0)
Iterator from 0
else
(Iterator from 0).take(elements)
Stream(iter).toProducer(gen).getPublisher
}
override def createCompletedStatePublisher(): Publisher[Int] =
TestProducer(Nil).getPublisher
Stream(List.empty[Int].iterator).toProducer(gen).getPublisher
}

View file

@ -0,0 +1,138 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import scala.concurrent.duration._
import akka.stream.testkit.StreamTestKit
import akka.testkit.AkkaSpec
import akka.stream.testkit.OnNext
import akka.dispatch.OnComplete
import akka.stream.testkit.OnComplete
import akka.stream.testkit.OnError
import akka.stream.testkit.OnSubscribe
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class StreamIterableSpec extends AkkaSpec {
val gen = ProcessorGenerator(GeneratorSettings(
maximumInputBufferSize = 512))
"A Stream based on an iterable" must {
"produce elements" in {
val p = Stream(List(1, 2, 3)).toProducer(gen)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
val sub = c.expectSubscription()
sub.requestMore(1)
c.expectNext(1)
c.expectNoMsg(100.millis)
sub.requestMore(2)
c.expectNext(2)
c.expectNext(3)
c.expectComplete()
}
"complete empty" in {
val p = Stream(List.empty[Int]).toProducer(gen)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
c.expectComplete()
c.expectNoMsg(100.millis)
val c2 = StreamTestKit.consumerProbe[Int]
p.produceTo(c2)
c2.expectComplete()
}
"produce elements with multiple subscribers" in {
val p = Stream(List(1, 2, 3)).toProducer(gen)
val c1 = StreamTestKit.consumerProbe[Int]
val c2 = StreamTestKit.consumerProbe[Int]
p.produceTo(c1)
p.produceTo(c2)
val sub1 = c1.expectSubscription()
val sub2 = c2.expectSubscription()
sub1.requestMore(1)
sub2.requestMore(2)
c1.expectNext(1)
c2.expectNext(1)
c2.expectNext(2)
c1.expectNoMsg(100.millis)
c2.expectNoMsg(100.millis)
sub1.requestMore(2)
sub2.requestMore(2)
c1.expectNext(2)
c1.expectNext(3)
c2.expectNext(3)
c1.expectComplete()
c2.expectComplete()
}
"produce elements to later subscriber" in {
val p = Stream(List(1, 2, 3)).toProducer(gen)
val c1 = StreamTestKit.consumerProbe[Int]
val c2 = StreamTestKit.consumerProbe[Int]
p.produceTo(c1)
val sub1 = c1.expectSubscription()
sub1.requestMore(1)
c1.expectNext(1)
c1.expectNoMsg(100.millis)
p.produceTo(c2)
val sub2 = c2.expectSubscription()
sub2.requestMore(2)
// starting from first element, new iterator per subscriber
c2.expectNext(1)
c2.expectNext(2)
c2.expectNoMsg(100.millis)
sub2.requestMore(1)
c2.expectNext(3)
c2.expectComplete()
sub1.requestMore(2)
c1.expectNext(2)
c1.expectNext(3)
c1.expectComplete()
}
"produce elements with one transformation step" in {
val p = Stream(List(1, 2, 3)).map(_ * 2).toProducer(gen)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
val sub = c.expectSubscription()
sub.requestMore(10)
c.expectNext(2)
c.expectNext(4)
c.expectNext(6)
c.expectComplete()
}
"produce elements with two transformation steps" in {
val p = Stream(List(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 2).toProducer(gen)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
val sub = c.expectSubscription()
sub.requestMore(10)
c.expectNext(4)
c.expectNext(8)
c.expectComplete()
}
"allow cancel before receiving all elements" in {
val count = 100000
val p = Stream(1 to count).toProducer(gen)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
val sub = c.expectSubscription()
sub.requestMore(count)
c.expectNext(1)
sub.cancel()
val got = c.probe.receiveWhile(3.seconds) {
case _: OnNext[_]
case OnComplete fail("Cancel expected before OnComplete")
case OnError(e) fail(e)
}
got.size should be < (count - 1)
}
}
}

View file

@ -0,0 +1,137 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import scala.concurrent.duration._
import akka.stream.testkit.StreamTestKit
import akka.testkit.AkkaSpec
import akka.stream.testkit.OnNext
import akka.stream.testkit.OnComplete
import akka.stream.testkit.OnError
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class StreamIteratorSpec extends AkkaSpec {
val gen = ProcessorGenerator(GeneratorSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 4,
maxFanOutBufferSize = 4))
"A Stream based on an iterator" must {
"produce elements" in {
val p = Stream(List(1, 2, 3).iterator).toProducer(gen)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
val sub = c.expectSubscription()
sub.requestMore(1)
c.expectNext(1)
c.expectNoMsg(100.millis)
sub.requestMore(3)
c.expectNext(2)
c.expectNext(3)
c.expectComplete()
}
"complete empty" in {
val p = Stream(List.empty[Int].iterator).toProducer(gen)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
c.expectComplete()
c.expectNoMsg(100.millis)
val c2 = StreamTestKit.consumerProbe[Int]
p.produceTo(c2)
c2.expectComplete()
}
"produce elements with multiple subscribers" in {
val p = Stream(List(1, 2, 3).iterator).toProducer(gen)
val c1 = StreamTestKit.consumerProbe[Int]
val c2 = StreamTestKit.consumerProbe[Int]
p.produceTo(c1)
p.produceTo(c2)
val sub1 = c1.expectSubscription()
val sub2 = c2.expectSubscription()
sub1.requestMore(1)
sub2.requestMore(2)
c1.expectNext(1)
c2.expectNext(1)
c2.expectNext(2)
c1.expectNoMsg(100.millis)
c2.expectNoMsg(100.millis)
sub1.requestMore(2)
sub2.requestMore(2)
c1.expectNext(2)
c1.expectNext(3)
c2.expectNext(3)
c1.expectComplete()
c2.expectComplete()
}
"produce elements to later subscriber" in {
val p = Stream(List(1, 2, 3).iterator).toProducer(gen)
val c1 = StreamTestKit.consumerProbe[Int]
val c2 = StreamTestKit.consumerProbe[Int]
p.produceTo(c1)
val sub1 = c1.expectSubscription()
sub1.requestMore(1)
c1.expectNext(1)
c1.expectNoMsg(100.millis)
p.produceTo(c2)
val sub2 = c2.expectSubscription()
sub2.requestMore(3)
// element 1 is already gone
c2.expectNext(2)
c2.expectNext(3)
c2.expectComplete()
sub1.requestMore(3)
c1.expectNext(2)
c1.expectNext(3)
c1.expectComplete()
}
"produce elements with one transformation step" in {
val p = Stream(List(1, 2, 3).iterator).map(_ * 2).toProducer(gen)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
val sub = c.expectSubscription()
sub.requestMore(10)
c.expectNext(2)
c.expectNext(4)
c.expectNext(6)
c.expectComplete()
}
"produce elements with two transformation steps" in {
val p = Stream(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).toProducer(gen)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
val sub = c.expectSubscription()
sub.requestMore(10)
c.expectNext(4)
c.expectNext(8)
c.expectComplete()
}
"allow cancel before receiving all elements" in {
val count = 100000
val p = Stream((1 to count).iterator).toProducer(gen)
val c = StreamTestKit.consumerProbe[Int]
p.produceTo(c)
val sub = c.expectSubscription()
sub.requestMore(count)
c.expectNext(1)
sub.cancel()
val got = c.probe.receiveWhile(3.seconds) {
case _: OnNext[_]
case OnComplete fail("Cancel expected before OnComplete")
case OnError(e) fail(e)
}
got.size should be < (count - 1)
}
}
}

View file

@ -6,7 +6,6 @@ package akka.stream
import scala.concurrent.duration._
import akka.stream.testkit.StreamTestKit
import akka.testkit.AkkaSpec
import akka.stream.impl.IteratorProducer
import akka.testkit.EventFilter
import scala.util.Failure
import scala.util.control.NoStackTrace
@ -24,7 +23,7 @@ class StreamTransformRecoverSpec extends AkkaSpec {
"A Stream with transformRecover operations" must {
"produce one-to-one transformation as expected" in {
val p = new IteratorProducer(List(1, 2, 3).iterator)
val p = Stream(List(1, 2, 3).iterator).toProducer(gen)
val p2 = Stream(p).
transformRecover(0)((tot, elem) (tot + elem.get, List(tot + elem.get))).
toProducer(gen)
@ -41,7 +40,7 @@ class StreamTransformRecoverSpec extends AkkaSpec {
}
"produce one-to-several transformation as expected" in {
val p = new IteratorProducer(List(1, 2, 3).iterator)
val p = Stream(List(1, 2, 3).iterator).toProducer(gen)
val p2 = Stream(p).
transformRecover(0)((tot, elem) (tot + elem.get, Vector.fill(elem.get)(tot + elem.get))).
toProducer(gen)
@ -61,7 +60,7 @@ class StreamTransformRecoverSpec extends AkkaSpec {
}
"produce dropping transformation as expected" in {
val p = new IteratorProducer(List(1, 2, 3, 4).iterator)
val p = Stream(List(1, 2, 3, 4).iterator).toProducer(gen)
val p2 = Stream(p).
transformRecover(0)((tot, elem) (tot + elem.get, if (elem.get % 2 == 0) Nil else List(tot + elem.get))).
toProducer(gen)
@ -78,7 +77,7 @@ class StreamTransformRecoverSpec extends AkkaSpec {
}
"produce multi-step transformation as expected" in {
val p = new IteratorProducer(List("a", "bc", "def").iterator)
val p = Stream(List("a", "bc", "def").iterator).toProducer(gen)
val p2 = Stream(p).
transformRecover("") { (str, elem)
val concat = str + elem
@ -108,7 +107,7 @@ class StreamTransformRecoverSpec extends AkkaSpec {
}
"invoke onComplete when done" in {
val p = new IteratorProducer(List("a").iterator)
val p = Stream(List("a").iterator).toProducer(gen)
val p2 = Stream(p).transformRecover("")((s, in) (s + in, Nil), x List(x + "B")).toProducer(gen)
val c = StreamTestKit.consumerProbe[String]
p2.produceTo(c)
@ -154,7 +153,7 @@ class StreamTransformRecoverSpec extends AkkaSpec {
}
"report error when exception is thrown" in {
val p = new IteratorProducer(List(1, 2, 3).iterator)
val p = Stream(List(1, 2, 3).iterator).toProducer(gen)
val p2 = Stream(p).
transformRecover(0) { (_, elem)
if (elem.get == 2) throw new IllegalArgumentException("two not allowed") else (0, List(elem.get, elem.get))
@ -206,7 +205,7 @@ class StreamTransformRecoverSpec extends AkkaSpec {
}
"support cancel as expected" in {
val p = new IteratorProducer(List(1, 2, 3).iterator)
val p = Stream(List(1, 2, 3).iterator).toProducer(gen)
val p2 = Stream(p).
transformRecover(0) { (_, elem) (0, List(elem.get, elem.get)) }.
toProducer(gen)

View file

@ -6,7 +6,6 @@ package akka.stream
import scala.concurrent.duration._
import akka.stream.testkit.StreamTestKit
import akka.testkit.AkkaSpec
import akka.stream.impl.IteratorProducer
import akka.testkit.EventFilter
import com.typesafe.config.ConfigFactory
@ -23,7 +22,7 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor
"A Stream with transform operations" must {
"produce one-to-one transformation as expected" in {
val p = new IteratorProducer(List(1, 2, 3).iterator)
val p = Stream(List(1, 2, 3).iterator).toProducer(gen)
val p2 = Stream(p).
transform(0)((tot, elem) (tot + elem, List(tot + elem))).
toProducer(gen)
@ -40,7 +39,7 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor
}
"produce one-to-several transformation as expected" in {
val p = new IteratorProducer(List(1, 2, 3).iterator)
val p = Stream(List(1, 2, 3).iterator).toProducer(gen)
val p2 = Stream(p).
transform(0)((tot, elem) (tot + elem, Vector.fill(elem)(tot + elem))).
toProducer(gen)
@ -60,7 +59,7 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor
}
"produce dropping transformation as expected" in {
val p = new IteratorProducer(List(1, 2, 3, 4).iterator)
val p = Stream(List(1, 2, 3, 4).iterator).toProducer(gen)
val p2 = Stream(p).
transform(0)((tot, elem) (tot + elem, if (elem % 2 == 0) Nil else List(tot + elem))).
toProducer(gen)
@ -77,7 +76,7 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor
}
"produce multi-step transformation as expected" in {
val p = new IteratorProducer(List("a", "bc", "def").iterator)
val p = Stream(List("a", "bc", "def").iterator).toProducer(gen)
val p2 = Stream(p).
transform("") { (str, elem)
val concat = str + elem
@ -107,7 +106,7 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor
}
"invoke onComplete when done" in {
val p = new IteratorProducer(List("a").iterator)
val p = Stream(List("a").iterator).toProducer(gen)
val p2 = Stream(p).transform("")((s, in) (s + in, Nil), x List(x + "B")).toProducer(gen)
val c = StreamTestKit.consumerProbe[String]
p2.produceTo(c)
@ -149,7 +148,7 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor
}
"report error when exception is thrown" in {
val p = new IteratorProducer(List(1, 2, 3).iterator)
val p = Stream(List(1, 2, 3).iterator).toProducer(gen)
val p2 = Stream(p).
transform(0) { (_, elem)
if (elem == 2) throw new IllegalArgumentException("two not allowed") else (0, List(elem, elem))
@ -168,7 +167,7 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor
}
"support cancel as expected" in {
val p = new IteratorProducer(List(1, 2, 3).iterator)
val p = Stream(List(1, 2, 3).iterator).toProducer(gen)
val p2 = Stream(p).
transform(0) { (_, elem) (0, List(elem, elem)) }.
toProducer(gen)

View file

@ -1,28 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.testkit
import org.reactivestreams.api.Producer
import akka.stream.impl.IteratorProducer
import scala.concurrent.ExecutionContext
import org.reactivestreams.spi.Subscriber
import org.reactivestreams.spi.Publisher
import org.reactivestreams.api.Consumer
object TestProducer {
def apply[T](iterable: Iterable[T])(implicit executor: ExecutionContext): Producer[T] = apply(iterable.iterator)
def apply[T](iterator: Iterator[T])(implicit executor: ExecutionContext): Producer[T] = new IteratorProducer[T](iterator)
def empty[T]: Producer[T] = EmptyProducer.asInstanceOf[Producer[T]]
}
object EmptyProducer extends Producer[Nothing] with Publisher[Nothing] {
def getPublisher: Publisher[Nothing] = this
def subscribe(subscriber: Subscriber[Nothing]): Unit =
subscriber.onComplete()
def produceTo(consumer: Consumer[Nothing]): Unit =
getPublisher.subscribe(consumer.getSubscriber)
}