diff --git a/akka-stream/src/main/scala/akka/stream/Stream.scala b/akka-stream/src/main/scala/akka/stream/Stream.scala index a766d37a68..3c9b787864 100644 --- a/akka-stream/src/main/scala/akka/stream/Stream.scala +++ b/akka-stream/src/main/scala/akka/stream/Stream.scala @@ -2,7 +2,6 @@ * Copyright (C) 2014 Typesafe Inc. */ package akka.stream -// FIXME why plural in the package name? import scala.collection.immutable import asyncrx.api.{ Consumer, Producer } diff --git a/akka-stream/src/main/scala/akka/stream/impl/AbstractProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/AbstractProducer.scala index c22c9598e2..02299404c5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/AbstractProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/AbstractProducer.scala @@ -9,7 +9,10 @@ import asyncrx.spi import SubscriberManagement.ShutDown import ResizableMultiReaderRingBuffer.NothingToReadException -object SubscriberManagement { +/** + * INTERNAL API + */ +private[akka] object SubscriberManagement { sealed trait EndOfStream { def apply[T](subscriber: spi.Subscriber[T]): Unit @@ -30,7 +33,10 @@ object SubscriberManagement { val ShutDown = new ErrorCompleted(new IllegalStateException("Cannot subscribe to shut-down spi.Publisher")) } -trait SubscriptionWithCursor extends spi.Subscription with ResizableMultiReaderRingBuffer.Cursor { +/** + * INTERNAL API + */ +private[akka] trait SubscriptionWithCursor extends spi.Subscription with ResizableMultiReaderRingBuffer.Cursor { def subscriber[T]: spi.Subscriber[T] def isActive: Boolean = cursor != Int.MinValue def deactivate(): Unit = cursor = Int.MinValue @@ -45,7 +51,10 @@ trait SubscriptionWithCursor extends spi.Subscription with ResizableMultiReaderR var cursor: Int = 0 // buffer cursor, set to Int.MinValue if this subscription has been cancelled / terminated } -trait SubscriberManagement[T] extends ResizableMultiReaderRingBuffer.Cursors { +/** + * INTERNAL API + */ +private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuffer.Cursors { import SubscriberManagement._ type S <: SubscriptionWithCursor type Subscriptions = List[S] @@ -258,10 +267,12 @@ trait SubscriberManagement[T] extends ResizableMultiReaderRingBuffer.Cursors { } /** + * INTERNAL API + * * Implements basic subscriber management as well as efficient "slowest-subscriber-rate" downstream fan-out support * with configurable and adaptive output buffer size. */ -abstract class AbstractProducer[T](val initialBufferSize: Int, val maxBufferSize: Int) +private[akka] abstract class AbstractProducer[T](val initialBufferSize: Int, val maxBufferSize: Int) extends api.Producer[T] with spi.Publisher[T] with SubscriberManagement[T] { type S = Subscription diff --git a/akka-stream/src/main/scala/akka/stream/impl/ResizableMultiReaderRingBuffer.scala b/akka-stream/src/main/scala/akka/stream/impl/ResizableMultiReaderRingBuffer.scala index 74bbccdb97..5a54344b67 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ResizableMultiReaderRingBuffer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ResizableMultiReaderRingBuffer.scala @@ -1,19 +1,21 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ package akka.stream.impl import scala.annotation.tailrec import scala.util.control.NoStackTrace import ResizableMultiReaderRingBuffer._ -// FIXME move to impl - /** + * INTERNAL API * A mutable RingBuffer that can grow in size and supports multiple readers. * Contrary to many other ring buffer implementations this one does not automatically overwrite the oldest * elements, rather, if full, the buffer tries to grow and rejects further writes if max capacity is reached. */ -class ResizableMultiReaderRingBuffer[T](initialSize: Int, // constructor param, not field - maxSize: Int, // constructor param, not field - val cursors: Cursors) { +private[akka] class ResizableMultiReaderRingBuffer[T](initialSize: Int, // constructor param, not field + maxSize: Int, // constructor param, not field + val cursors: Cursors) { require(Integer.lowestOneBit(maxSize) == maxSize && 0 < maxSize && maxSize <= Int.MaxValue / 2, "maxSize must be a power of 2 that is > 0 and < Int.MaxValue/2") require(Integer.lowestOneBit(initialSize) == initialSize && 0 < initialSize && initialSize <= maxSize, @@ -159,7 +161,10 @@ class ResizableMultiReaderRingBuffer[T](initialSize: Int, // constructor param, s"ResizableMultiReaderRingBuffer(size=$size, writeIx=$writeIx, readIx=$readIx, cursors=${cursors.cursors.size})" } -object ResizableMultiReaderRingBuffer { +/** + * INTERNAL API + */ +private[akka] object ResizableMultiReaderRingBuffer { object NothingToReadException extends RuntimeException with NoStackTrace trait Cursors { diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala index d5648f8d9d..fb79ab1846 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala @@ -1,3 +1,6 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ package akka.stream.impl import asyncrx.api.{ Producer, Consumer, Processor } @@ -116,6 +119,9 @@ private[akka] object ActorProcessor { impl ! ExposedPublisher(a.asInstanceOf[ActorPublisher[Any]]) a } + + def produceTo(consumer: Consumer[T]): Unit = + getPublisher.subscribe(consumer.getSubscriber) } class ActorProcessor[I, O]( final val impl: ActorRef) extends Processor[I, O] with ActorConsumer[I] with ActorProducer[O] diff --git a/akka-stream/src/main/scala/akka/stream/impl/StrictProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/StrictProducer.scala index bc4ee77ed0..7280fef0dc 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StrictProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StrictProducer.scala @@ -1,21 +1,28 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ package akka.stream.impl import java.util.concurrent.atomic.AtomicBoolean import scala.annotation.tailrec import scala.concurrent.ExecutionContext import asyncrx.spi +import asyncrx.api.Consumer /** + * INTERNAL API + * * An efficient producer for iterators. * * CAUTION: This is a convenience wrapper designed for iterators over static collections. * Do *NOT* use it for iterators on lazy collections or other implementations that do more * than merely retrieve an element in their `next()` method! */ -class IteratorProducer[T](iterator: Iterator[T], - maxBufferSize: Int = 16, - maxRecursionLevel: Int = 32, - maxSyncBatchSize: Int = 128)(implicit executor: ExecutionContext) //FIXME Remove defaults in code +private[akka] class IteratorProducer[T]( + iterator: Iterator[T], + maxBufferSize: Int = 16, + maxRecursionLevel: Int = 32, + maxSyncBatchSize: Int = 128)(implicit executor: ExecutionContext) //FIXME Remove defaults in code extends AbstractStrictProducer[T](initialBufferSize = 1, maxBufferSize, maxRecursionLevel, maxSyncBatchSize) { if (!iterator.hasNext) completeDownstream() @@ -30,6 +37,8 @@ class IteratorProducer[T](iterator: Iterator[T], } /** + * INTERNAL API + * * Base class for producers that can provide their elements synchronously. * * For efficiency it tries to produce elements synchronously before returning from `requestMore`. @@ -42,16 +51,20 @@ class IteratorProducer[T](iterator: Iterator[T], * by `AbstractStrictProducer`. If the `maxRecursionLevel` is surpassed the synchronous production * loop is stopped and production of the remaining elements scheduled to the given executor. */ -abstract class AbstractStrictProducer[T](initialBufferSize: Int, - maxBufferSize: Int, - maxRecursionLevel: Int = 32, - maxSyncBatchSize: Int = 128)(implicit executor: ExecutionContext) //FIXME Remove defaults in code +private[akka] abstract class AbstractStrictProducer[T]( + initialBufferSize: Int, + maxBufferSize: Int, + maxRecursionLevel: Int = 32, + maxSyncBatchSize: Int = 128)(implicit executor: ExecutionContext) //FIXME Remove defaults in code extends AbstractProducer[T](initialBufferSize, maxBufferSize) { private[this] val locked = new AtomicBoolean // TODO: replace with AtomicFieldUpdater / sun.misc.Unsafe private[this] var pending = 0L private[this] var recursionLevel = 0 + def produceTo(consumer: Consumer[T]): Unit = + getPublisher.subscribe(consumer.getSubscriber) + /** * Implement with the actual production logic. * It should synchronously call `pushToDownstream(...)` the given number of times. diff --git a/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala b/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala new file mode 100644 index 0000000000..7903f14301 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala @@ -0,0 +1,39 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import org.scalatest.testng.TestNGSuiteLike +import asyncrx.spi.Publisher +import asyncrx.api.Processor +import asyncrx.tck.IdentityProcessorVerification +import akka.actor.Props +import akka.stream.impl.ActorProcessor +import akka.stream.impl.TransformProcessorImpl +import akka.stream.impl.Ast +import akka.stream.testkit.TestProducer + +class IdentityProcessorTest extends IdentityProcessorVerification[Int] with WithActorSystem with TestNGSuiteLike { + + def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = { + val fanoutSize = maxBufferSize / 2 + val inputSize = maxBufferSize - fanoutSize + + // FIXME can we use API to create the IdentityProcessor instead? + def identityProps(settings: GeneratorSettings): Props = + Props(new TransformProcessorImpl(settings, Ast.Transform(Unit, (_, in: Any) ⇒ (Unit, List(in)), (_: Any) ⇒ Nil))) + + ActorProcessor[Int, Int](system.actorOf(identityProps( + GeneratorSettings( + initialInputBufferSize = inputSize, + maximumInputBufferSize = inputSize, + initialFanOutBufferSize = fanoutSize, + maxFanOutBufferSize = fanoutSize)))) + } + + def createHelperPublisher(elements: Int): Publisher[Int] = { + import system.dispatcher + val iter = Iterator from 1000 + TestProducer(if (elements > 0) iter take elements else iter).getPublisher + } +} diff --git a/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala b/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala new file mode 100644 index 0000000000..aadb86c099 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala @@ -0,0 +1,21 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import org.scalatest.testng.TestNGSuiteLike +import asyncrx.spi.Publisher +import asyncrx.tck.PublisherVerification +import akka.stream.testkit.TestProducer + +class IteratorProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike { + import system.dispatcher + + def createPublisher(elements: Int): Publisher[Int] = { + val iter = Iterator from 1000 + TestProducer(if (elements > 0) iter take elements else iter).getPublisher + } + + override def createCompletedStatePublisher(): Publisher[Int] = + TestProducer(Nil).getPublisher +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/StreamSpec.scala b/akka-stream/src/test/scala/akka/stream/StreamSpec.scala new file mode 100644 index 0000000000..35bd69854e --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/StreamSpec.scala @@ -0,0 +1,387 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import scala.concurrent.duration._ +import akka.stream.testkit.StreamTestKit +import akka.testkit._ +import asyncrx.api.Producer +import org.scalatest.FreeSpecLike + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class StreamSpec extends AkkaSpec { + + import system.dispatcher + + val genSettings = GeneratorSettings( + initialInputBufferSize = 2, + maximumInputBufferSize = 16, + initialFanOutBufferSize = 1, + maxFanOutBufferSize = 16) + + val identity: Stream[Any] ⇒ Stream[Any] = in ⇒ in.map(e ⇒ e) + val identity2: Stream[Any] ⇒ Stream[Any] = in ⇒ identity(in) + + "A Stream" must { + "requests initial elements from upstream" in { + for (op ← List(identity, identity2); n ← List(1, 2, 4)) { + new ChainSetup(op, genSettings.copy(initialInputBufferSize = n)) { + upstream.expectRequestMore(upstreamSubscription, settings.initialInputBufferSize) + } + } + } + + "requests more elements from upstream when downstream requests more elements" in { + new ChainSetup(identity, genSettings) { + upstream.expectRequestMore(upstreamSubscription, settings.initialInputBufferSize) + downstreamSubscription.requestMore(1) + upstream.expectNoMsg(100.millis) + downstreamSubscription.requestMore(2) + upstream.expectNoMsg(100.millis) + upstreamSubscription.sendNext("a") + downstream.expectNext("a") + upstream.expectRequestMore(upstreamSubscription, 1) + upstream.expectNoMsg(100.millis) + upstreamSubscription.sendNext("b") + upstreamSubscription.sendNext("c") + upstreamSubscription.sendNext("d") + downstream.expectNext("b") + downstream.expectNext("c") + upstream.expectRequestMore(upstreamSubscription, 1) + upstream.expectRequestMore(upstreamSubscription, 1) + upstream.expectRequestMore(upstreamSubscription, 1) + } + } + + "deliver events when publisher sends elements and then completes" in { + new ChainSetup(identity, genSettings) { + downstreamSubscription.requestMore(1) + upstreamSubscription.sendNext("test") + upstreamSubscription.sendComplete() + downstream.expectNext("test") + downstream.expectComplete() + } + } + + "deliver complete signal when publisher immediately completes" in { + new ChainSetup(identity, genSettings) { + upstreamSubscription.sendComplete() + downstream.expectComplete() + } + } + + "deliver error signal when publisher immediately fails" in { + new ChainSetup(identity, genSettings) { + object WeirdError extends RuntimeException("weird test exception") + upstreamSubscription.sendError(WeirdError) + downstream.expectError(WeirdError) + } + } + + "single subscriber cancels subscription while receiving data" in { + new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1)) { + downstreamSubscription.requestMore(5) + upstreamSubscription.expectRequestMore(1) + upstreamSubscription.sendNext("test") + upstreamSubscription.expectRequestMore(1) + upstreamSubscription.sendNext("test2") + upstreamSubscription.expectRequestMore(1) + downstream.expectNext("test") + downstream.expectNext("test2") + downstreamSubscription.cancel() + + // because of the "must cancel its upstream Subscription if its last downstream Subscription has been cancelled" rule + upstreamSubscription.expectCancellation() + } + } + + // FIXME the below commented out tests were implemented in ImplementationFactoryOperationSpec and might + // be intersting to port. + // + // "operation publishes Producer" in new InitializedChainSetup[String, api.Producer[String]](Span[String](_ == "end").expose) { + // downstreamSubscription.requestMore(5) + // upstream.expectRequestMore(upstreamSubscription, 1) + // + // upstreamSubscription.sendNext("a") + // val subStream = downstream.expectNext() + // val subStreamConsumer = TestKit.consumerProbe[String]() + // subStream.getPublisher.subscribe(subStreamConsumer.getSubscriber) + // val subStreamSubscription = subStreamConsumer.expectSubscription() + // + // subStreamSubscription.requestMore(1) + // subStreamConsumer.expectNext("a") + // + // subStreamSubscription.requestMore(1) + // upstream.expectRequestMore(upstreamSubscription, 1) + // upstreamSubscription.sendNext("end") + // subStreamConsumer.expectNext("end") + // subStreamConsumer.expectComplete() + // + // upstreamSubscription.sendNext("test") + // val subStream2 = downstream.expectNext() + // val subStreamConsumer2 = TestKit.consumerProbe[String]() + // subStream2.getPublisher.subscribe(subStreamConsumer2.getSubscriber) + // val subStreamSubscription2 = subStreamConsumer2.expectSubscription() + // + // subStreamSubscription2.requestMore(1) + // subStreamConsumer2.expectNext("test") + // + // subStreamSubscription2.requestMore(1) + // upstream.expectRequestMore(upstreamSubscription, 1) + // upstreamSubscription.sendNext("abc") + // subStreamConsumer2.expectNext("abc") + // + // subStreamSubscription2.requestMore(1) + // upstream.expectRequestMore(upstreamSubscription, 1) + // upstreamSubscription.sendNext("end") + // upstreamSubscription.sendComplete() + // downstream.expectComplete() + // subStreamConsumer2.expectNext("end") + // subStreamConsumer2.expectComplete() + // } + // "operation consumes Producer" in new InitializedChainSetup[Source[String], String](Flatten())(factoryWithFanOutBuffer(16)) { + // downstreamSubscription.requestMore(4) + // upstream.expectRequestMore(upstreamSubscription, 1) + // + // val subStream = TestKit.producerProbe[String]() + // upstreamSubscription.sendNext(subStream) + // val subStreamSubscription = subStream.expectSubscription() + // subStream.expectRequestMore(subStreamSubscription, 4) + // subStreamSubscription.sendNext("test") + // downstream.expectNext("test") + // subStreamSubscription.sendNext("abc") + // downstream.expectNext("abc") + // subStreamSubscription.sendComplete() + // + // upstream.expectRequestMore(upstreamSubscription, 1) + // + // val subStream2 = TestKit.producerProbe[String]() + // upstreamSubscription.sendNext(subStream2) + // upstreamSubscription.sendComplete() + // val subStreamSubscription2 = subStream2.expectSubscription() + // subStream2.expectRequestMore(subStreamSubscription2, 2) + // subStreamSubscription2.sendNext("123") + // downstream.expectNext("123") + // + // subStreamSubscription2.sendComplete() + // downstream.expectComplete() + // } + // "combined operation spanning internal subscription" in new InitializedChainSetup[Int, Int](Span[Int](_ % 3 == 0).flatten) { + // downstreamSubscription.requestMore(1) + // upstream.expectRequestMore(upstreamSubscription, 1) + // + // upstreamSubscription.sendNext(1) + // downstream.expectNext(1) + // + // downstreamSubscription.requestMore(1) + // upstream.expectRequestMore(upstreamSubscription, 1) + // upstreamSubscription.sendNext(2) + // downstream.expectNext(2) + // + // downstreamSubscription.requestMore(1) + // upstream.expectRequestMore(upstreamSubscription, 1) + // upstreamSubscription.sendNext(3) + // downstream.expectNext(3) + // + // downstreamSubscription.requestMore(1) + // upstream.expectRequestMore(upstreamSubscription, 1) + // upstreamSubscription.sendNext(4) + // downstream.expectNext(4) + // + // downstreamSubscription.requestMore(1) + // upstream.expectRequestMore(upstreamSubscription, 1) + // upstreamSubscription.sendNext(5) + // upstreamSubscription.sendComplete() + // downstream.expectNext(5) + // downstream.expectComplete() + // } + + } + + "A Stream with multiple subscribers (FanOutBox)" must { + "adapt speed to the currently slowest consumer" in { + new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) { + val downstream2 = StreamTestKit.consumerProbe[Any]() + producer.produceTo(downstream2) + val downstream2Subscription = downstream2.expectSubscription() + + downstreamSubscription.requestMore(5) + upstream.expectRequestMore(upstreamSubscription, 1) // because initialInputBufferSize=1 + + upstreamSubscription.sendNext("firstElement") + downstream.expectNext("firstElement") + + downstream2Subscription.requestMore(1) + downstream2.expectNext("firstElement") + upstream.expectRequestMore(upstreamSubscription, 1) + upstreamSubscription.sendNext("element2") + + downstream.expectNext("element2") + + downstream2Subscription.requestMore(1) + downstream2.expectNext("element2") + } + } + + "incoming subscriber while elements were requested before" in { + new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) { + downstreamSubscription.requestMore(5) + upstream.expectRequestMore(upstreamSubscription, 1) + upstreamSubscription.sendNext("a1") + downstream.expectNext("a1") + + upstream.expectRequestMore(upstreamSubscription, 1) + upstreamSubscription.sendNext("a2") + downstream.expectNext("a2") + + upstream.expectRequestMore(upstreamSubscription, 1) + + // link now while an upstream element is already requested + val downstream2 = StreamTestKit.consumerProbe[Any]() + producer.produceTo(downstream2) + val downstream2Subscription = downstream2.expectSubscription() + + // situation here: + // downstream 1 now has 3 outstanding + // downstream 2 has 0 outstanding + + upstreamSubscription.sendNext("a3") + downstream.expectNext("a3") + downstream2.expectNoMsg(100.millis.dilated) // as nothing was requested yet, fanOutBox needs to cache element in this case + + downstream2Subscription.requestMore(1) + downstream2.expectNext("a3") + + // d1 now has 2 outstanding + // d2 now has 0 outstanding + // buffer should be empty so we should be requesting one new element + + upstream.expectRequestMore(upstreamSubscription, 1) // because of buffer size 1 + } + } + + // FIXME failing test + "blocking subscriber cancels subscription" ignore { + new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) { + val downstream2 = StreamTestKit.consumerProbe[Any]() + producer.produceTo(downstream2) + val downstream2Subscription = downstream2.expectSubscription() + + downstreamSubscription.requestMore(5) + upstreamSubscription.expectRequestMore(1) + + upstreamSubscription.sendNext("firstElement") + downstream.expectNext("firstElement") + + downstream2Subscription.requestMore(1) + downstream2.expectNext("firstElement") + upstreamSubscription.expectRequestMore(1) + upstreamSubscription.sendNext("element2") + upstreamSubscription.sendNext("element3") + upstreamSubscription.sendNext("element4") + + upstreamSubscription.expectRequestMore(1) + downstream2.expectNoMsg(100.millis.dilated) + + downstream.expectNext("element2") + downstream.expectNoMsg(200.millis.dilated) + upstream.expectNoMsg(100.millis.dilated) + // should unblock fanoutbox + downstream2Subscription.cancel() + upstreamSubscription.expectRequestMore(1) + downstream2.expectNoMsg(200.millis.dilated) + // FIXME fails, "element3" disappears, "element4" is delivered here + downstream.expectNext("element3") + downstream.expectNext("element4") + } + } + + "after initial upstream was completed future subscribers' onComplete should be called instead of onSubscribed" in { + new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) { + val downstream2 = StreamTestKit.consumerProbe[Any]() + // don't link it just yet + + downstreamSubscription.requestMore(5) + upstream.expectRequestMore(upstreamSubscription, 1) + upstreamSubscription.sendNext("a1") + downstream.expectNext("a1") + + upstream.expectRequestMore(upstreamSubscription, 1) + upstreamSubscription.sendNext("a2") + downstream.expectNext("a2") + + upstream.expectRequestMore(upstreamSubscription, 1) + + // link now while an upstream element is already requested + producer.produceTo(downstream2) + val downstream2Subscription = downstream2.expectSubscription() + + upstreamSubscription.sendNext("a3") + upstreamSubscription.sendComplete() + downstream.expectNext("a3") + downstream.expectComplete() + + downstream2.expectNoMsg(100.millis.dilated) // as nothing was requested yet, fanOutBox needs to cache element in this case + + downstream2Subscription.requestMore(1) + downstream2.expectNext("a3") + downstream2.expectComplete() + + // FIXME when adding a sleep before the following link this will fail with IllegalStateExc shut-down + // what is the expected shutdown behavior? Is the title of this test wrong? + // val downstream3 = StreamTestKit.consumerProbe[Any]() + // producer.produceTo(downstream3) + // downstream3.expectComplete() + } + } + + "after initial upstream reported an error future subscribers' onError should be called instead of onSubscribed" in { + new ChainSetup[Int, String](_.map(_ ⇒ throw TestException), genSettings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) { + downstreamSubscription.requestMore(1) + upstreamSubscription.expectRequestMore(1) + + upstreamSubscription.sendNext(5) + upstreamSubscription.expectRequestMore(1) + upstreamSubscription.expectCancellation() + downstream.expectError(TestException) + + val downstream2 = StreamTestKit.consumerProbe[String]() + producer.produceTo(downstream2) + // IllegalStateException shut down + downstream2.expectError().getClass should be(classOf[IllegalStateException]) + } + } + + "when all subscriptions were cancelled future subscribers' onError should be called" in { + new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1)) { + upstreamSubscription.expectRequestMore(1) + downstreamSubscription.cancel() + upstreamSubscription.expectCancellation() + + val downstream2 = StreamTestKit.consumerProbe[Any]() + producer.produceTo(downstream2) + // IllegalStateException shut down + downstream2.expectError().getClass should be(classOf[IllegalStateException]) + } + } + + "if an internal error occurs upstream should be cancelled" in pending + "if an internal error occurs subscribers' onError method should be called" in pending + "if an internal error occurs future subscribers' onError should be called instead of onSubscribed" in pending + + } + + object TestException extends RuntimeException + + class ChainSetup[I, O](stream: Stream[I] ⇒ Stream[O], val settings: GeneratorSettings) { + val upstream = StreamTestKit.producerProbe[I]() + val downstream = StreamTestKit.consumerProbe[O]() + + private val s = stream(Stream(upstream)) + val producer = s.toProducer(ProcessorGenerator(settings)) + val upstreamSubscription = upstream.expectSubscription() + producer.produceTo(downstream) + val downstreamSubscription = downstream.expectSubscription() + } +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala new file mode 100644 index 0000000000..f223356802 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala @@ -0,0 +1,141 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import scala.concurrent.duration._ +import akka.stream.testkit.StreamTestKit +import akka.testkit.AkkaSpec +import akka.stream.impl.IteratorProducer + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class StreamTransformSpec extends AkkaSpec { + + import system.dispatcher + + val gen = ProcessorGenerator(GeneratorSettings( + initialInputBufferSize = 2, + maximumInputBufferSize = 2, + initialFanOutBufferSize = 2, + maxFanOutBufferSize = 2)) + + "A Stream with transform operations" must { + "produce one-to-one transformation as expected" in { + val p = new IteratorProducer(List(1, 2, 3).iterator) + val p2 = Stream(p). + transform(0)((tot, elem) ⇒ (tot + elem, List(tot + elem))). + toProducer(gen) + val consumer = StreamTestKit.consumerProbe[Int] + p2.produceTo(consumer) + val subscription = consumer.expectSubscription() + subscription.requestMore(1) + consumer.expectNext(1) + consumer.expectNoMsg(200.millis) + subscription.requestMore(2) + consumer.expectNext(3) + consumer.expectNext(6) + consumer.expectComplete() + } + + "produce one-to-several transformation as expected" in { + val p = new IteratorProducer(List(1, 2, 3).iterator) + val p2 = Stream(p). + transform(0)((tot, elem) ⇒ (tot + elem, Vector.fill(elem)(tot + elem))). + toProducer(gen) + val consumer = StreamTestKit.consumerProbe[Int] + p2.produceTo(consumer) + val subscription = consumer.expectSubscription() + subscription.requestMore(4) + consumer.expectNext(1) + consumer.expectNext(3) + consumer.expectNext(3) + consumer.expectNext(6) + consumer.expectNoMsg(200.millis) + subscription.requestMore(100) + consumer.expectNext(6) + consumer.expectNext(6) + consumer.expectComplete() + } + + "produce dropping transformation as expected" in { + val p = new IteratorProducer(List(1, 2, 3, 4).iterator) + val p2 = Stream(p). + transform(0)((tot, elem) ⇒ (tot + elem, if (elem % 2 == 0) Nil else List(tot + elem))). + toProducer(gen) + val consumer = StreamTestKit.consumerProbe[Int] + p2.produceTo(consumer) + val subscription = consumer.expectSubscription() + subscription.requestMore(1) + consumer.expectNext(1) + consumer.expectNoMsg(200.millis) + subscription.requestMore(1) + consumer.expectNext(6) + consumer.expectComplete() + } + + "produce multi-step transformation as expected" in { + val p = new IteratorProducer(List("a", "bc", "def").iterator) + val p2 = Stream(p). + transform("") { (str, elem) ⇒ + val concat = str + elem + (concat, List(concat.length)) + }. + transform(0)((tot, length) ⇒ (tot + length, List(tot + length))). + toProducer(gen) + val c1 = StreamTestKit.consumerProbe[Int] + p2.produceTo(c1) + val sub1 = c1.expectSubscription() + val c2 = StreamTestKit.consumerProbe[Int] + p2.produceTo(c2) + val sub2 = c2.expectSubscription() + sub1.requestMore(1) + sub2.requestMore(2) + c1.expectNext(1) + c2.expectNext(1) + c2.expectNext(4) + c1.expectNoMsg(200.millis) + sub1.requestMore(2) + sub2.requestMore(2) + c1.expectNext(4) + c1.expectNext(10) + c2.expectNext(10) + c1.expectComplete() + c2.expectComplete() + } + + "report error when exception is thrown" in { + val p = new IteratorProducer(List(1, 2, 3).iterator) + val p2 = Stream(p). + transform(0) { (_, elem) ⇒ + if (elem == 2) throw new IllegalArgumentException("two not allowed") else (0, List(elem, elem)) + }. + toProducer(gen) + val consumer = StreamTestKit.consumerProbe[Int] + p2.produceTo(consumer) + val subscription = consumer.expectSubscription() + subscription.requestMore(100) + consumer.expectNext(1) + consumer.expectNext(1) + consumer.expectError().getMessage should be("two not allowed") + consumer.expectNoMsg(200.millis) + } + + "support cancel as expected" in { + val p = new IteratorProducer(List(1, 2, 3).iterator) + val p2 = Stream(p). + transform(0) { (_, elem) ⇒ (0, List(elem, elem)) }. + toProducer(gen) + val consumer = StreamTestKit.consumerProbe[Int] + p2.produceTo(consumer) + val subscription = consumer.expectSubscription() + subscription.requestMore(2) + consumer.expectNext(1) + subscription.cancel() + consumer.expectNext(1) + consumer.expectNoMsg(500.millis) + subscription.requestMore(2) + consumer.expectNoMsg(200.millis) + } + } + +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/WithActorSystem.scala b/akka-stream/src/test/scala/akka/stream/WithActorSystem.scala new file mode 100644 index 0000000000..6edaffe4ec --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/WithActorSystem.scala @@ -0,0 +1,14 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import akka.actor.ActorSystem +import org.testng.annotations.AfterClass + +trait WithActorSystem { + val system: ActorSystem = ActorSystem(getClass.getSimpleName) + + @AfterClass + def shutdownActorSystem(): Unit = system.shutdown() +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/impl/ResizableMultiReaderRingBufferSpec.scala b/akka-stream/src/test/scala/akka/stream/impl/ResizableMultiReaderRingBufferSpec.scala new file mode 100644 index 0000000000..a05caa01d3 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/impl/ResizableMultiReaderRingBufferSpec.scala @@ -0,0 +1,198 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl + +import scala.util.Random +import org.scalatest.{ ShouldMatchers, WordSpec } +import akka.stream.impl.ResizableMultiReaderRingBuffer._ + +class ResizableMultiReaderRingBufferSpec extends WordSpec with ShouldMatchers { + + "A ResizableMultiReaderRingBuffer" should { + + "initially be empty (1)" in new Test(iSize = 2, mSize = 4, cursorCount = 1) { + inspect shouldEqual "0 0 (size=0, writeIx=0, readIx=0, cursors=1)" + } + + "initially be empty (2)" in new Test(iSize = 4, mSize = 4, cursorCount = 3) { + inspect shouldEqual "0 0 0 0 (size=0, writeIx=0, readIx=0, cursors=3)" + } + + "fail reads if nothing can be read" in new Test(iSize = 4, mSize = 4, cursorCount = 3) { + write(1) shouldEqual true + write(2) shouldEqual true + write(3) shouldEqual true + inspect shouldEqual "1 2 3 0 (size=3, writeIx=3, readIx=0, cursors=3)" + read(0) shouldEqual 1 + read(0) shouldEqual 2 + read(1) shouldEqual 1 + inspect shouldEqual "1 2 3 0 (size=3, writeIx=3, readIx=0, cursors=3)" + read(0) shouldEqual 3 + read(0) shouldEqual null + read(1) shouldEqual 2 + inspect shouldEqual "1 2 3 0 (size=3, writeIx=3, readIx=0, cursors=3)" + read(2) shouldEqual 1 + inspect shouldEqual "1 2 3 0 (size=2, writeIx=3, readIx=1, cursors=3)" + read(1) shouldEqual 3 + read(1) shouldEqual null + read(2) shouldEqual 2 + read(2) shouldEqual 3 + inspect shouldEqual "1 2 3 0 (size=0, writeIx=3, readIx=3, cursors=3)" + } + + "fail writes if there is no more space" in new Test(iSize = 4, mSize = 4, cursorCount = 2) { + write(1) shouldEqual true + write(2) shouldEqual true + write(3) shouldEqual true + write(4) shouldEqual true + write(5) shouldEqual false + read(0) shouldEqual 1 + write(5) shouldEqual false + read(1) shouldEqual 1 + write(5) shouldEqual true + read(0) shouldEqual 2 + read(0) shouldEqual 3 + read(0) shouldEqual 4 + read(1) shouldEqual 2 + write(6) shouldEqual true + inspect shouldEqual "5 6 3 4 (size=4, writeIx=6, readIx=2, cursors=2)" + read(0) shouldEqual 5 + read(0) shouldEqual 6 + read(0) shouldEqual null + read(1) shouldEqual 3 + read(1) shouldEqual 4 + read(1) shouldEqual 5 + read(1) shouldEqual 6 + read(1) shouldEqual null + inspect shouldEqual "5 6 3 4 (size=0, writeIx=2, readIx=2, cursors=2)" + write(7) shouldEqual true + write(8) shouldEqual true + write(9) shouldEqual true + inspect shouldEqual "9 6 7 8 (size=3, writeIx=5, readIx=2, cursors=2)" + read(0) shouldEqual 7 + read(0) shouldEqual 8 + read(0) shouldEqual 9 + read(0) shouldEqual null + read(1) shouldEqual 7 + read(1) shouldEqual 8 + read(1) shouldEqual 9 + read(1) shouldEqual null + inspect shouldEqual "9 6 7 8 (size=0, writeIx=5, readIx=5, cursors=2)" + } + + "automatically grow if possible" in new Test(iSize = 2, mSize = 8, cursorCount = 2) { + write(1) shouldEqual true + inspect shouldEqual "1 0 (size=1, writeIx=1, readIx=0, cursors=2)" + write(2) shouldEqual true + inspect shouldEqual "1 2 (size=2, writeIx=2, readIx=0, cursors=2)" + write(3) shouldEqual true + inspect shouldEqual "1 2 3 0 (size=3, writeIx=3, readIx=0, cursors=2)" + write(4) shouldEqual true + inspect shouldEqual "1 2 3 4 (size=4, writeIx=4, readIx=0, cursors=2)" + read(0) shouldEqual 1 + read(0) shouldEqual 2 + read(0) shouldEqual 3 + read(1) shouldEqual 1 + read(1) shouldEqual 2 + write(5) shouldEqual true + inspect shouldEqual "5 2 3 4 (size=3, writeIx=5, readIx=2, cursors=2)" + write(6) shouldEqual true + inspect shouldEqual "5 6 3 4 (size=4, writeIx=6, readIx=2, cursors=2)" + write(7) shouldEqual true + inspect shouldEqual "3 4 5 6 7 0 0 0 (size=5, writeIx=5, readIx=0, cursors=2)" + read(0) shouldEqual 4 + read(0) shouldEqual 5 + read(0) shouldEqual 6 + read(0) shouldEqual 7 + read(0) shouldEqual null + read(1) shouldEqual 3 + read(1) shouldEqual 4 + read(1) shouldEqual 5 + read(1) shouldEqual 6 + read(1) shouldEqual 7 + read(1) shouldEqual null + inspect shouldEqual "3 4 5 6 7 0 0 0 (size=0, writeIx=5, readIx=5, cursors=2)" + } + + "pass the stress test" in { + // create 100 buffers with an initialSize of 1 and a maxSize of 1 to 64, + // for each one attach 1 to 8 cursors and randomly try reading and writing to the buffer; + // in total 200 elements need to be written to the buffer and read in the correct order by each cursor + val MAXSIZEBIT_LIMIT = 6 // 2 ^ (this number) + val COUNTER_LIMIT = 200 + val LOG = false + val sb = new java.lang.StringBuilder + def log(s: ⇒ String): Unit = if (LOG) sb.append(s) + + class StressTestCursor(cursorNr: Int, run: Int) extends Cursor { + var cursor: Int = _ + var counter = 1 + def tryReadAndReturnTrueIfDone(buf: TestBuffer): Boolean = { + log(s" Try reading of $toString: ") + try { + val x = buf.read(this) + log("OK\n") + if (x != counter) + fail(s"""|Run $run, cursorNr $cursorNr, counter $counter: got unexpected $x + | Buf: ${buf.inspect} + | Cursors: ${buf.cursors.cursors.mkString("\n ")} + |Log:\n$sb + """.stripMargin) + counter += 1 + counter == COUNTER_LIMIT + } catch { + case NothingToReadException ⇒ log("FAILED\n"); false // ok, we currently can't read, try again later + } + } + override def toString: String = s"cursorNr $cursorNr, ix $cursor, counter $counter" + } + + val random = new Random + for { + bit ← 1 to MAXSIZEBIT_LIMIT + n ← 1 to 2 + } { + var counter = 1 + var activeCursors = List.tabulate(random.nextInt(8) + 1)(new StressTestCursor(_, 1 << bit)) + var stillWriting = 2 // give writing a slight bias, so as to somewhat "stretch" the buffer + val buf = new TestBuffer(1, 1 << bit, new Cursors { def cursors = activeCursors }) + sb.setLength(0) + while (activeCursors.nonEmpty) { + log(s"Buf: ${buf.inspect}\n") + val activeCursorCount = activeCursors.size + val index = random.nextInt(activeCursorCount + stillWriting) + if (index >= activeCursorCount) { + log(s" Writing $counter: ") + if (buf.write(counter)) { + log("OK\n") + counter += 1 + } else { + log("FAILED\n") + if (counter == COUNTER_LIMIT) stillWriting = 0 + } + } else { + val cursor = activeCursors(index) + if (cursor.tryReadAndReturnTrueIfDone(buf)) + activeCursors = activeCursors.filter(_ != cursor) + } + } + } + } + } + + class TestBuffer(iSize: Int, mSize: Int, cursors: Cursors) extends ResizableMultiReaderRingBuffer[Int](iSize, mSize, cursors) { + def inspect: String = + underlyingArray.map(x ⇒ if (x == null) 0 else x).mkString("", " ", " " + toString.dropWhile(_ != '(')) + } + + class Test(iSize: Int, mSize: Int, cursorCount: Int) extends TestBuffer(iSize, mSize, new SimpleCursors(cursorCount)) { + def read(cursorIx: Int): Integer = + try read(cursors.cursors(cursorIx)) catch { case NothingToReadException ⇒ null } + override def rebaseThreshold: Int = underlyingArray.length // use a low threshold in order to test the rebasing logic + } + + class SimpleCursors(cursorCount: Int) extends Cursors { + val cursors: List[Cursor] = List.fill(cursorCount)(new Cursor { var cursor: Int = _ }) + } +} diff --git a/akka-stream/src/test/scala/akka/stream/testkit/AkkaConsumerProbe.scala b/akka-stream/src/test/scala/akka/stream/testkit/AkkaConsumerProbe.scala new file mode 100644 index 0000000000..f08e773737 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/testkit/AkkaConsumerProbe.scala @@ -0,0 +1,10 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.testkit + +import akka.testkit.TestProbe + +trait AkkaConsumerProbe[I] extends ConsumerProbe[I] { + def probe: TestProbe +} diff --git a/akka-stream/src/test/scala/akka/stream/testkit/AkkaProducerProbe.scala b/akka-stream/src/test/scala/akka/stream/testkit/AkkaProducerProbe.scala new file mode 100644 index 0000000000..7a5785e0ba --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/testkit/AkkaProducerProbe.scala @@ -0,0 +1,10 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.testkit + +import akka.testkit.TestProbe + +trait AkkaProducerProbe[I] extends ProducerProbe[I] { + def probe: TestProbe +} diff --git a/akka-stream/src/test/scala/akka/stream/testkit/ConsumerProbe.scala b/akka-stream/src/test/scala/akka/stream/testkit/ConsumerProbe.scala new file mode 100644 index 0000000000..e3a6f1f4ae --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/testkit/ConsumerProbe.scala @@ -0,0 +1,28 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.testkit + +import scala.concurrent.duration.FiniteDuration +import asyncrx.api.Consumer +import asyncrx.spi.Subscription + +sealed trait ConsumerEvent +case class OnSubscribe(subscription: Subscription) extends ConsumerEvent +case class OnNext[I](element: I) extends ConsumerEvent +case object OnComplete extends ConsumerEvent +case class OnError(cause: Throwable) extends ConsumerEvent + +trait ConsumerProbe[I] extends Consumer[I] { + def expectSubscription(): Subscription + def expectEvent(event: ConsumerEvent): Unit + def expectNext(element: I): Unit + def expectNext(e1: I, e2: I, es: I*): Unit + def expectNext(): I + def expectError(cause: Throwable): Unit + def expectError(): Throwable + def expectComplete(): Unit + + def expectNoMsg(): Unit + def expectNoMsg(max: FiniteDuration): Unit +} diff --git a/akka-stream/src/test/scala/akka/stream/testkit/ProducerProbe.scala b/akka-stream/src/test/scala/akka/stream/testkit/ProducerProbe.scala new file mode 100644 index 0000000000..f0a7636971 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/testkit/ProducerProbe.scala @@ -0,0 +1,34 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.testkit + +import asyncrx.spi.{ Subscriber, Subscription } +import asyncrx.api.Producer +import scala.concurrent.duration.FiniteDuration +import asyncrx.api.Consumer + +sealed trait ProducerEvent +case class Subscribe(subscription: Subscription) extends ProducerEvent +case class CancelSubscription(subscription: Subscription) extends ProducerEvent +case class RequestMore(subscription: Subscription, elements: Int) extends ProducerEvent + +abstract case class ActiveSubscription[I](subscriber: Subscriber[I]) extends Subscription { + def sendNext(element: I): Unit + def sendComplete(): Unit + def sendError(cause: Exception): Unit + + def expectCancellation(): Unit + def expectRequestMore(n: Int): Unit +} + +trait ProducerProbe[I] extends Producer[I] { + def expectSubscription(): ActiveSubscription[I] + def expectRequestMore(subscription: Subscription, n: Int): Unit + + def expectNoMsg(): Unit + def expectNoMsg(max: FiniteDuration): Unit + + def produceTo(consumer: Consumer[I]): Unit = + getPublisher.subscribe(consumer.getSubscriber) +} diff --git a/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala new file mode 100644 index 0000000000..de42d74d73 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.testkit + +import akka.testkit.TestProbe +import asyncrx.spi.{ Publisher, Subscriber, Subscription } +import asyncrx.tck._ +import akka.actor.ActorSystem +import scala.concurrent.duration.FiniteDuration +import scala.annotation.tailrec + +object StreamTestKit { + def consumerProbe[I]()(implicit system: ActorSystem): AkkaConsumerProbe[I] = + new AkkaConsumerProbe[I] with Subscriber[I] { outer ⇒ + lazy val probe = TestProbe() + + def expectSubscription(): Subscription = probe.expectMsgType[OnSubscribe].subscription + def expectEvent(event: ConsumerEvent): Unit = probe.expectMsg(event) + def expectNext(element: I): Unit = probe.expectMsg(OnNext(element)) + def expectNext(e1: I, e2: I, es: I*): Unit = { + val all = e1 +: e2 +: es + all.foreach(e ⇒ probe.expectMsg(OnNext(e))) + } + + def expectNext(): I = probe.expectMsgType[OnNext[I]].element + def expectComplete(): Unit = probe.expectMsg(OnComplete) + def expectError(cause: Throwable): Unit = probe.expectMsg(OnError(cause)) + def expectError(): Throwable = probe.expectMsgType[OnError].cause + + def expectNoMsg(): Unit = probe.expectNoMsg() + def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max) + + def onSubscribe(subscription: Subscription): Unit = probe.ref ! OnSubscribe(subscription) + def onNext(element: I): Unit = probe.ref ! OnNext(element) + def onComplete(): Unit = probe.ref ! OnComplete + def onError(cause: Throwable): Unit = probe.ref ! OnError(cause) + + def getSubscriber: Subscriber[I] = this + } + + def producerProbe[I]()(implicit system: ActorSystem): AkkaProducerProbe[I] = + new AkkaProducerProbe[I] with Publisher[I] { + lazy val probe: TestProbe = TestProbe() + + def subscribe(subscriber: Subscriber[I]): Unit = { + lazy val subscription: ActiveSubscription[I] = new ActiveSubscription[I](subscriber) { + def requestMore(elements: Int): Unit = probe.ref ! RequestMore(subscription, elements) + def cancel(): Unit = probe.ref ! CancelSubscription(subscription) + + def expectRequestMore(n: Int): Unit = probe.expectMsg(RequestMore(subscription, n)) + def expectCancellation(): Unit = probe.expectMsg(CancelSubscription(this)) + + def sendNext(element: I): Unit = subscriber.onNext(element) + def sendComplete(): Unit = subscriber.onComplete() + def sendError(cause: Exception): Unit = subscriber.onError(cause) + } + probe.ref ! Subscribe(subscription) + subscriber.onSubscribe(subscription) + } + + def expectSubscription(): ActiveSubscription[I] = + probe.expectMsgType[Subscribe].subscription.asInstanceOf[ActiveSubscription[I]] + + def expectRequestMore(subscription: Subscription, n: Int): Unit = probe.expectMsg(RequestMore(subscription, n)) + + def expectNoMsg(): Unit = probe.expectNoMsg() + def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max) + + def getPublisher: Publisher[I] = this + } +} diff --git a/akka-stream/src/test/scala/akka/stream/testkit/TestProducer.scala b/akka-stream/src/test/scala/akka/stream/testkit/TestProducer.scala new file mode 100644 index 0000000000..b256b961b8 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/testkit/TestProducer.scala @@ -0,0 +1,28 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.testkit + +import asyncrx.api.Producer +import akka.stream.impl.IteratorProducer +import scala.concurrent.ExecutionContext +import asyncrx.spi.Subscriber +import asyncrx.spi.Publisher +import asyncrx.api.Consumer + +object TestProducer { + def apply[T](iterable: Iterable[T])(implicit executor: ExecutionContext): Producer[T] = apply(iterable.iterator) + def apply[T](iterator: Iterator[T])(implicit executor: ExecutionContext): Producer[T] = new IteratorProducer[T](iterator) + def empty[T]: Producer[T] = EmptyProducer.asInstanceOf[Producer[T]] +} + +object EmptyProducer extends Producer[Nothing] with Publisher[Nothing] { + def getPublisher: Publisher[Nothing] = this + + def subscribe(subscriber: Subscriber[Nothing]): Unit = + subscriber.onComplete() + + def produceTo(consumer: Consumer[Nothing]): Unit = + getPublisher.subscribe(consumer.getSubscriber) + +} \ No newline at end of file