!str Add tests

This commit is contained in:
Patrik Nordwall 2014-03-28 12:13:57 +01:00 committed by Roland Kuhn
parent 4a9f16f437
commit 156b661bfc
17 changed files with 1035 additions and 19 deletions

View file

@ -2,7 +2,6 @@
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.stream package akka.stream
// FIXME why plural in the package name?
import scala.collection.immutable import scala.collection.immutable
import asyncrx.api.{ Consumer, Producer } import asyncrx.api.{ Consumer, Producer }

View file

@ -9,7 +9,10 @@ import asyncrx.spi
import SubscriberManagement.ShutDown import SubscriberManagement.ShutDown
import ResizableMultiReaderRingBuffer.NothingToReadException import ResizableMultiReaderRingBuffer.NothingToReadException
object SubscriberManagement { /**
* INTERNAL API
*/
private[akka] object SubscriberManagement {
sealed trait EndOfStream { sealed trait EndOfStream {
def apply[T](subscriber: spi.Subscriber[T]): Unit def apply[T](subscriber: spi.Subscriber[T]): Unit
@ -30,7 +33,10 @@ object SubscriberManagement {
val ShutDown = new ErrorCompleted(new IllegalStateException("Cannot subscribe to shut-down spi.Publisher")) val ShutDown = new ErrorCompleted(new IllegalStateException("Cannot subscribe to shut-down spi.Publisher"))
} }
trait SubscriptionWithCursor extends spi.Subscription with ResizableMultiReaderRingBuffer.Cursor { /**
* INTERNAL API
*/
private[akka] trait SubscriptionWithCursor extends spi.Subscription with ResizableMultiReaderRingBuffer.Cursor {
def subscriber[T]: spi.Subscriber[T] def subscriber[T]: spi.Subscriber[T]
def isActive: Boolean = cursor != Int.MinValue def isActive: Boolean = cursor != Int.MinValue
def deactivate(): Unit = cursor = Int.MinValue def deactivate(): Unit = cursor = Int.MinValue
@ -45,7 +51,10 @@ trait SubscriptionWithCursor extends spi.Subscription with ResizableMultiReaderR
var cursor: Int = 0 // buffer cursor, set to Int.MinValue if this subscription has been cancelled / terminated var cursor: Int = 0 // buffer cursor, set to Int.MinValue if this subscription has been cancelled / terminated
} }
trait SubscriberManagement[T] extends ResizableMultiReaderRingBuffer.Cursors { /**
* INTERNAL API
*/
private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuffer.Cursors {
import SubscriberManagement._ import SubscriberManagement._
type S <: SubscriptionWithCursor type S <: SubscriptionWithCursor
type Subscriptions = List[S] type Subscriptions = List[S]
@ -258,10 +267,12 @@ trait SubscriberManagement[T] extends ResizableMultiReaderRingBuffer.Cursors {
} }
/** /**
* INTERNAL API
*
* Implements basic subscriber management as well as efficient "slowest-subscriber-rate" downstream fan-out support * Implements basic subscriber management as well as efficient "slowest-subscriber-rate" downstream fan-out support
* with configurable and adaptive output buffer size. * with configurable and adaptive output buffer size.
*/ */
abstract class AbstractProducer[T](val initialBufferSize: Int, val maxBufferSize: Int) private[akka] abstract class AbstractProducer[T](val initialBufferSize: Int, val maxBufferSize: Int)
extends api.Producer[T] with spi.Publisher[T] with SubscriberManagement[T] { extends api.Producer[T] with spi.Publisher[T] with SubscriberManagement[T] {
type S = Subscription type S = Subscription

View file

@ -1,19 +1,21 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl package akka.stream.impl
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import ResizableMultiReaderRingBuffer._ import ResizableMultiReaderRingBuffer._
// FIXME move to impl
/** /**
* INTERNAL API
* A mutable RingBuffer that can grow in size and supports multiple readers. * A mutable RingBuffer that can grow in size and supports multiple readers.
* Contrary to many other ring buffer implementations this one does not automatically overwrite the oldest * Contrary to many other ring buffer implementations this one does not automatically overwrite the oldest
* elements, rather, if full, the buffer tries to grow and rejects further writes if max capacity is reached. * elements, rather, if full, the buffer tries to grow and rejects further writes if max capacity is reached.
*/ */
class ResizableMultiReaderRingBuffer[T](initialSize: Int, // constructor param, not field private[akka] class ResizableMultiReaderRingBuffer[T](initialSize: Int, // constructor param, not field
maxSize: Int, // constructor param, not field maxSize: Int, // constructor param, not field
val cursors: Cursors) { val cursors: Cursors) {
require(Integer.lowestOneBit(maxSize) == maxSize && 0 < maxSize && maxSize <= Int.MaxValue / 2, require(Integer.lowestOneBit(maxSize) == maxSize && 0 < maxSize && maxSize <= Int.MaxValue / 2,
"maxSize must be a power of 2 that is > 0 and < Int.MaxValue/2") "maxSize must be a power of 2 that is > 0 and < Int.MaxValue/2")
require(Integer.lowestOneBit(initialSize) == initialSize && 0 < initialSize && initialSize <= maxSize, require(Integer.lowestOneBit(initialSize) == initialSize && 0 < initialSize && initialSize <= maxSize,
@ -159,7 +161,10 @@ class ResizableMultiReaderRingBuffer[T](initialSize: Int, // constructor param,
s"ResizableMultiReaderRingBuffer(size=$size, writeIx=$writeIx, readIx=$readIx, cursors=${cursors.cursors.size})" s"ResizableMultiReaderRingBuffer(size=$size, writeIx=$writeIx, readIx=$readIx, cursors=${cursors.cursors.size})"
} }
object ResizableMultiReaderRingBuffer { /**
* INTERNAL API
*/
private[akka] object ResizableMultiReaderRingBuffer {
object NothingToReadException extends RuntimeException with NoStackTrace object NothingToReadException extends RuntimeException with NoStackTrace
trait Cursors { trait Cursors {

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl package akka.stream.impl
import asyncrx.api.{ Producer, Consumer, Processor } import asyncrx.api.{ Producer, Consumer, Processor }
@ -116,6 +119,9 @@ private[akka] object ActorProcessor {
impl ! ExposedPublisher(a.asInstanceOf[ActorPublisher[Any]]) impl ! ExposedPublisher(a.asInstanceOf[ActorPublisher[Any]])
a a
} }
def produceTo(consumer: Consumer[T]): Unit =
getPublisher.subscribe(consumer.getSubscriber)
} }
class ActorProcessor[I, O]( final val impl: ActorRef) extends Processor[I, O] with ActorConsumer[I] with ActorProducer[O] class ActorProcessor[I, O]( final val impl: ActorRef) extends Processor[I, O] with ActorConsumer[I] with ActorProducer[O]

View file

@ -1,21 +1,28 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl package akka.stream.impl
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.concurrent.ExecutionContext import scala.concurrent.ExecutionContext
import asyncrx.spi import asyncrx.spi
import asyncrx.api.Consumer
/** /**
* INTERNAL API
*
* An efficient producer for iterators. * An efficient producer for iterators.
* *
* CAUTION: This is a convenience wrapper designed for iterators over static collections. * CAUTION: This is a convenience wrapper designed for iterators over static collections.
* Do *NOT* use it for iterators on lazy collections or other implementations that do more * Do *NOT* use it for iterators on lazy collections or other implementations that do more
* than merely retrieve an element in their `next()` method! * than merely retrieve an element in their `next()` method!
*/ */
class IteratorProducer[T](iterator: Iterator[T], private[akka] class IteratorProducer[T](
maxBufferSize: Int = 16, iterator: Iterator[T],
maxRecursionLevel: Int = 32, maxBufferSize: Int = 16,
maxSyncBatchSize: Int = 128)(implicit executor: ExecutionContext) //FIXME Remove defaults in code maxRecursionLevel: Int = 32,
maxSyncBatchSize: Int = 128)(implicit executor: ExecutionContext) //FIXME Remove defaults in code
extends AbstractStrictProducer[T](initialBufferSize = 1, maxBufferSize, maxRecursionLevel, maxSyncBatchSize) { extends AbstractStrictProducer[T](initialBufferSize = 1, maxBufferSize, maxRecursionLevel, maxSyncBatchSize) {
if (!iterator.hasNext) completeDownstream() if (!iterator.hasNext) completeDownstream()
@ -30,6 +37,8 @@ class IteratorProducer[T](iterator: Iterator[T],
} }
/** /**
* INTERNAL API
*
* Base class for producers that can provide their elements synchronously. * Base class for producers that can provide their elements synchronously.
* *
* For efficiency it tries to produce elements synchronously before returning from `requestMore`. * For efficiency it tries to produce elements synchronously before returning from `requestMore`.
@ -42,16 +51,20 @@ class IteratorProducer[T](iterator: Iterator[T],
* by `AbstractStrictProducer`. If the `maxRecursionLevel` is surpassed the synchronous production * by `AbstractStrictProducer`. If the `maxRecursionLevel` is surpassed the synchronous production
* loop is stopped and production of the remaining elements scheduled to the given executor. * loop is stopped and production of the remaining elements scheduled to the given executor.
*/ */
abstract class AbstractStrictProducer[T](initialBufferSize: Int, private[akka] abstract class AbstractStrictProducer[T](
maxBufferSize: Int, initialBufferSize: Int,
maxRecursionLevel: Int = 32, maxBufferSize: Int,
maxSyncBatchSize: Int = 128)(implicit executor: ExecutionContext) //FIXME Remove defaults in code maxRecursionLevel: Int = 32,
maxSyncBatchSize: Int = 128)(implicit executor: ExecutionContext) //FIXME Remove defaults in code
extends AbstractProducer[T](initialBufferSize, maxBufferSize) { extends AbstractProducer[T](initialBufferSize, maxBufferSize) {
private[this] val locked = new AtomicBoolean // TODO: replace with AtomicFieldUpdater / sun.misc.Unsafe private[this] val locked = new AtomicBoolean // TODO: replace with AtomicFieldUpdater / sun.misc.Unsafe
private[this] var pending = 0L private[this] var pending = 0L
private[this] var recursionLevel = 0 private[this] var recursionLevel = 0
def produceTo(consumer: Consumer[T]): Unit =
getPublisher.subscribe(consumer.getSubscriber)
/** /**
* Implement with the actual production logic. * Implement with the actual production logic.
* It should synchronously call `pushToDownstream(...)` the given number of times. * It should synchronously call `pushToDownstream(...)` the given number of times.

View file

@ -0,0 +1,39 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import org.scalatest.testng.TestNGSuiteLike
import asyncrx.spi.Publisher
import asyncrx.api.Processor
import asyncrx.tck.IdentityProcessorVerification
import akka.actor.Props
import akka.stream.impl.ActorProcessor
import akka.stream.impl.TransformProcessorImpl
import akka.stream.impl.Ast
import akka.stream.testkit.TestProducer
class IdentityProcessorTest extends IdentityProcessorVerification[Int] with WithActorSystem with TestNGSuiteLike {
def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = {
val fanoutSize = maxBufferSize / 2
val inputSize = maxBufferSize - fanoutSize
// FIXME can we use API to create the IdentityProcessor instead?
def identityProps(settings: GeneratorSettings): Props =
Props(new TransformProcessorImpl(settings, Ast.Transform(Unit, (_, in: Any) (Unit, List(in)), (_: Any) Nil)))
ActorProcessor[Int, Int](system.actorOf(identityProps(
GeneratorSettings(
initialInputBufferSize = inputSize,
maximumInputBufferSize = inputSize,
initialFanOutBufferSize = fanoutSize,
maxFanOutBufferSize = fanoutSize))))
}
def createHelperPublisher(elements: Int): Publisher[Int] = {
import system.dispatcher
val iter = Iterator from 1000
TestProducer(if (elements > 0) iter take elements else iter).getPublisher
}
}

View file

@ -0,0 +1,21 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import org.scalatest.testng.TestNGSuiteLike
import asyncrx.spi.Publisher
import asyncrx.tck.PublisherVerification
import akka.stream.testkit.TestProducer
class IteratorProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike {
import system.dispatcher
def createPublisher(elements: Int): Publisher[Int] = {
val iter = Iterator from 1000
TestProducer(if (elements > 0) iter take elements else iter).getPublisher
}
override def createCompletedStatePublisher(): Publisher[Int] =
TestProducer(Nil).getPublisher
}

View file

@ -0,0 +1,387 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import scala.concurrent.duration._
import akka.stream.testkit.StreamTestKit
import akka.testkit._
import asyncrx.api.Producer
import org.scalatest.FreeSpecLike
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class StreamSpec extends AkkaSpec {
import system.dispatcher
val genSettings = GeneratorSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,
maxFanOutBufferSize = 16)
val identity: Stream[Any] Stream[Any] = in in.map(e e)
val identity2: Stream[Any] Stream[Any] = in identity(in)
"A Stream" must {
"requests initial elements from upstream" in {
for (op List(identity, identity2); n List(1, 2, 4)) {
new ChainSetup(op, genSettings.copy(initialInputBufferSize = n)) {
upstream.expectRequestMore(upstreamSubscription, settings.initialInputBufferSize)
}
}
}
"requests more elements from upstream when downstream requests more elements" in {
new ChainSetup(identity, genSettings) {
upstream.expectRequestMore(upstreamSubscription, settings.initialInputBufferSize)
downstreamSubscription.requestMore(1)
upstream.expectNoMsg(100.millis)
downstreamSubscription.requestMore(2)
upstream.expectNoMsg(100.millis)
upstreamSubscription.sendNext("a")
downstream.expectNext("a")
upstream.expectRequestMore(upstreamSubscription, 1)
upstream.expectNoMsg(100.millis)
upstreamSubscription.sendNext("b")
upstreamSubscription.sendNext("c")
upstreamSubscription.sendNext("d")
downstream.expectNext("b")
downstream.expectNext("c")
upstream.expectRequestMore(upstreamSubscription, 1)
upstream.expectRequestMore(upstreamSubscription, 1)
upstream.expectRequestMore(upstreamSubscription, 1)
}
}
"deliver events when publisher sends elements and then completes" in {
new ChainSetup(identity, genSettings) {
downstreamSubscription.requestMore(1)
upstreamSubscription.sendNext("test")
upstreamSubscription.sendComplete()
downstream.expectNext("test")
downstream.expectComplete()
}
}
"deliver complete signal when publisher immediately completes" in {
new ChainSetup(identity, genSettings) {
upstreamSubscription.sendComplete()
downstream.expectComplete()
}
}
"deliver error signal when publisher immediately fails" in {
new ChainSetup(identity, genSettings) {
object WeirdError extends RuntimeException("weird test exception")
upstreamSubscription.sendError(WeirdError)
downstream.expectError(WeirdError)
}
}
"single subscriber cancels subscription while receiving data" in {
new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1)) {
downstreamSubscription.requestMore(5)
upstreamSubscription.expectRequestMore(1)
upstreamSubscription.sendNext("test")
upstreamSubscription.expectRequestMore(1)
upstreamSubscription.sendNext("test2")
upstreamSubscription.expectRequestMore(1)
downstream.expectNext("test")
downstream.expectNext("test2")
downstreamSubscription.cancel()
// because of the "must cancel its upstream Subscription if its last downstream Subscription has been cancelled" rule
upstreamSubscription.expectCancellation()
}
}
// FIXME the below commented out tests were implemented in ImplementationFactoryOperationSpec and might
// be intersting to port.
//
// "operation publishes Producer" in new InitializedChainSetup[String, api.Producer[String]](Span[String](_ == "end").expose) {
// downstreamSubscription.requestMore(5)
// upstream.expectRequestMore(upstreamSubscription, 1)
//
// upstreamSubscription.sendNext("a")
// val subStream = downstream.expectNext()
// val subStreamConsumer = TestKit.consumerProbe[String]()
// subStream.getPublisher.subscribe(subStreamConsumer.getSubscriber)
// val subStreamSubscription = subStreamConsumer.expectSubscription()
//
// subStreamSubscription.requestMore(1)
// subStreamConsumer.expectNext("a")
//
// subStreamSubscription.requestMore(1)
// upstream.expectRequestMore(upstreamSubscription, 1)
// upstreamSubscription.sendNext("end")
// subStreamConsumer.expectNext("end")
// subStreamConsumer.expectComplete()
//
// upstreamSubscription.sendNext("test")
// val subStream2 = downstream.expectNext()
// val subStreamConsumer2 = TestKit.consumerProbe[String]()
// subStream2.getPublisher.subscribe(subStreamConsumer2.getSubscriber)
// val subStreamSubscription2 = subStreamConsumer2.expectSubscription()
//
// subStreamSubscription2.requestMore(1)
// subStreamConsumer2.expectNext("test")
//
// subStreamSubscription2.requestMore(1)
// upstream.expectRequestMore(upstreamSubscription, 1)
// upstreamSubscription.sendNext("abc")
// subStreamConsumer2.expectNext("abc")
//
// subStreamSubscription2.requestMore(1)
// upstream.expectRequestMore(upstreamSubscription, 1)
// upstreamSubscription.sendNext("end")
// upstreamSubscription.sendComplete()
// downstream.expectComplete()
// subStreamConsumer2.expectNext("end")
// subStreamConsumer2.expectComplete()
// }
// "operation consumes Producer" in new InitializedChainSetup[Source[String], String](Flatten())(factoryWithFanOutBuffer(16)) {
// downstreamSubscription.requestMore(4)
// upstream.expectRequestMore(upstreamSubscription, 1)
//
// val subStream = TestKit.producerProbe[String]()
// upstreamSubscription.sendNext(subStream)
// val subStreamSubscription = subStream.expectSubscription()
// subStream.expectRequestMore(subStreamSubscription, 4)
// subStreamSubscription.sendNext("test")
// downstream.expectNext("test")
// subStreamSubscription.sendNext("abc")
// downstream.expectNext("abc")
// subStreamSubscription.sendComplete()
//
// upstream.expectRequestMore(upstreamSubscription, 1)
//
// val subStream2 = TestKit.producerProbe[String]()
// upstreamSubscription.sendNext(subStream2)
// upstreamSubscription.sendComplete()
// val subStreamSubscription2 = subStream2.expectSubscription()
// subStream2.expectRequestMore(subStreamSubscription2, 2)
// subStreamSubscription2.sendNext("123")
// downstream.expectNext("123")
//
// subStreamSubscription2.sendComplete()
// downstream.expectComplete()
// }
// "combined operation spanning internal subscription" in new InitializedChainSetup[Int, Int](Span[Int](_ % 3 == 0).flatten) {
// downstreamSubscription.requestMore(1)
// upstream.expectRequestMore(upstreamSubscription, 1)
//
// upstreamSubscription.sendNext(1)
// downstream.expectNext(1)
//
// downstreamSubscription.requestMore(1)
// upstream.expectRequestMore(upstreamSubscription, 1)
// upstreamSubscription.sendNext(2)
// downstream.expectNext(2)
//
// downstreamSubscription.requestMore(1)
// upstream.expectRequestMore(upstreamSubscription, 1)
// upstreamSubscription.sendNext(3)
// downstream.expectNext(3)
//
// downstreamSubscription.requestMore(1)
// upstream.expectRequestMore(upstreamSubscription, 1)
// upstreamSubscription.sendNext(4)
// downstream.expectNext(4)
//
// downstreamSubscription.requestMore(1)
// upstream.expectRequestMore(upstreamSubscription, 1)
// upstreamSubscription.sendNext(5)
// upstreamSubscription.sendComplete()
// downstream.expectNext(5)
// downstream.expectComplete()
// }
}
"A Stream with multiple subscribers (FanOutBox)" must {
"adapt speed to the currently slowest consumer" in {
new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) {
val downstream2 = StreamTestKit.consumerProbe[Any]()
producer.produceTo(downstream2)
val downstream2Subscription = downstream2.expectSubscription()
downstreamSubscription.requestMore(5)
upstream.expectRequestMore(upstreamSubscription, 1) // because initialInputBufferSize=1
upstreamSubscription.sendNext("firstElement")
downstream.expectNext("firstElement")
downstream2Subscription.requestMore(1)
downstream2.expectNext("firstElement")
upstream.expectRequestMore(upstreamSubscription, 1)
upstreamSubscription.sendNext("element2")
downstream.expectNext("element2")
downstream2Subscription.requestMore(1)
downstream2.expectNext("element2")
}
}
"incoming subscriber while elements were requested before" in {
new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) {
downstreamSubscription.requestMore(5)
upstream.expectRequestMore(upstreamSubscription, 1)
upstreamSubscription.sendNext("a1")
downstream.expectNext("a1")
upstream.expectRequestMore(upstreamSubscription, 1)
upstreamSubscription.sendNext("a2")
downstream.expectNext("a2")
upstream.expectRequestMore(upstreamSubscription, 1)
// link now while an upstream element is already requested
val downstream2 = StreamTestKit.consumerProbe[Any]()
producer.produceTo(downstream2)
val downstream2Subscription = downstream2.expectSubscription()
// situation here:
// downstream 1 now has 3 outstanding
// downstream 2 has 0 outstanding
upstreamSubscription.sendNext("a3")
downstream.expectNext("a3")
downstream2.expectNoMsg(100.millis.dilated) // as nothing was requested yet, fanOutBox needs to cache element in this case
downstream2Subscription.requestMore(1)
downstream2.expectNext("a3")
// d1 now has 2 outstanding
// d2 now has 0 outstanding
// buffer should be empty so we should be requesting one new element
upstream.expectRequestMore(upstreamSubscription, 1) // because of buffer size 1
}
}
// FIXME failing test
"blocking subscriber cancels subscription" ignore {
new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) {
val downstream2 = StreamTestKit.consumerProbe[Any]()
producer.produceTo(downstream2)
val downstream2Subscription = downstream2.expectSubscription()
downstreamSubscription.requestMore(5)
upstreamSubscription.expectRequestMore(1)
upstreamSubscription.sendNext("firstElement")
downstream.expectNext("firstElement")
downstream2Subscription.requestMore(1)
downstream2.expectNext("firstElement")
upstreamSubscription.expectRequestMore(1)
upstreamSubscription.sendNext("element2")
upstreamSubscription.sendNext("element3")
upstreamSubscription.sendNext("element4")
upstreamSubscription.expectRequestMore(1)
downstream2.expectNoMsg(100.millis.dilated)
downstream.expectNext("element2")
downstream.expectNoMsg(200.millis.dilated)
upstream.expectNoMsg(100.millis.dilated)
// should unblock fanoutbox
downstream2Subscription.cancel()
upstreamSubscription.expectRequestMore(1)
downstream2.expectNoMsg(200.millis.dilated)
// FIXME fails, "element3" disappears, "element4" is delivered here
downstream.expectNext("element3")
downstream.expectNext("element4")
}
}
"after initial upstream was completed future subscribers' onComplete should be called instead of onSubscribed" in {
new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) {
val downstream2 = StreamTestKit.consumerProbe[Any]()
// don't link it just yet
downstreamSubscription.requestMore(5)
upstream.expectRequestMore(upstreamSubscription, 1)
upstreamSubscription.sendNext("a1")
downstream.expectNext("a1")
upstream.expectRequestMore(upstreamSubscription, 1)
upstreamSubscription.sendNext("a2")
downstream.expectNext("a2")
upstream.expectRequestMore(upstreamSubscription, 1)
// link now while an upstream element is already requested
producer.produceTo(downstream2)
val downstream2Subscription = downstream2.expectSubscription()
upstreamSubscription.sendNext("a3")
upstreamSubscription.sendComplete()
downstream.expectNext("a3")
downstream.expectComplete()
downstream2.expectNoMsg(100.millis.dilated) // as nothing was requested yet, fanOutBox needs to cache element in this case
downstream2Subscription.requestMore(1)
downstream2.expectNext("a3")
downstream2.expectComplete()
// FIXME when adding a sleep before the following link this will fail with IllegalStateExc shut-down
// what is the expected shutdown behavior? Is the title of this test wrong?
// val downstream3 = StreamTestKit.consumerProbe[Any]()
// producer.produceTo(downstream3)
// downstream3.expectComplete()
}
}
"after initial upstream reported an error future subscribers' onError should be called instead of onSubscribed" in {
new ChainSetup[Int, String](_.map(_ throw TestException), genSettings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) {
downstreamSubscription.requestMore(1)
upstreamSubscription.expectRequestMore(1)
upstreamSubscription.sendNext(5)
upstreamSubscription.expectRequestMore(1)
upstreamSubscription.expectCancellation()
downstream.expectError(TestException)
val downstream2 = StreamTestKit.consumerProbe[String]()
producer.produceTo(downstream2)
// IllegalStateException shut down
downstream2.expectError().getClass should be(classOf[IllegalStateException])
}
}
"when all subscriptions were cancelled future subscribers' onError should be called" in {
new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1)) {
upstreamSubscription.expectRequestMore(1)
downstreamSubscription.cancel()
upstreamSubscription.expectCancellation()
val downstream2 = StreamTestKit.consumerProbe[Any]()
producer.produceTo(downstream2)
// IllegalStateException shut down
downstream2.expectError().getClass should be(classOf[IllegalStateException])
}
}
"if an internal error occurs upstream should be cancelled" in pending
"if an internal error occurs subscribers' onError method should be called" in pending
"if an internal error occurs future subscribers' onError should be called instead of onSubscribed" in pending
}
object TestException extends RuntimeException
class ChainSetup[I, O](stream: Stream[I] Stream[O], val settings: GeneratorSettings) {
val upstream = StreamTestKit.producerProbe[I]()
val downstream = StreamTestKit.consumerProbe[O]()
private val s = stream(Stream(upstream))
val producer = s.toProducer(ProcessorGenerator(settings))
val upstreamSubscription = upstream.expectSubscription()
producer.produceTo(downstream)
val downstreamSubscription = downstream.expectSubscription()
}
}

View file

@ -0,0 +1,141 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import scala.concurrent.duration._
import akka.stream.testkit.StreamTestKit
import akka.testkit.AkkaSpec
import akka.stream.impl.IteratorProducer
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class StreamTransformSpec extends AkkaSpec {
import system.dispatcher
val gen = ProcessorGenerator(GeneratorSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,
maxFanOutBufferSize = 2))
"A Stream with transform operations" must {
"produce one-to-one transformation as expected" in {
val p = new IteratorProducer(List(1, 2, 3).iterator)
val p2 = Stream(p).
transform(0)((tot, elem) (tot + elem, List(tot + elem))).
toProducer(gen)
val consumer = StreamTestKit.consumerProbe[Int]
p2.produceTo(consumer)
val subscription = consumer.expectSubscription()
subscription.requestMore(1)
consumer.expectNext(1)
consumer.expectNoMsg(200.millis)
subscription.requestMore(2)
consumer.expectNext(3)
consumer.expectNext(6)
consumer.expectComplete()
}
"produce one-to-several transformation as expected" in {
val p = new IteratorProducer(List(1, 2, 3).iterator)
val p2 = Stream(p).
transform(0)((tot, elem) (tot + elem, Vector.fill(elem)(tot + elem))).
toProducer(gen)
val consumer = StreamTestKit.consumerProbe[Int]
p2.produceTo(consumer)
val subscription = consumer.expectSubscription()
subscription.requestMore(4)
consumer.expectNext(1)
consumer.expectNext(3)
consumer.expectNext(3)
consumer.expectNext(6)
consumer.expectNoMsg(200.millis)
subscription.requestMore(100)
consumer.expectNext(6)
consumer.expectNext(6)
consumer.expectComplete()
}
"produce dropping transformation as expected" in {
val p = new IteratorProducer(List(1, 2, 3, 4).iterator)
val p2 = Stream(p).
transform(0)((tot, elem) (tot + elem, if (elem % 2 == 0) Nil else List(tot + elem))).
toProducer(gen)
val consumer = StreamTestKit.consumerProbe[Int]
p2.produceTo(consumer)
val subscription = consumer.expectSubscription()
subscription.requestMore(1)
consumer.expectNext(1)
consumer.expectNoMsg(200.millis)
subscription.requestMore(1)
consumer.expectNext(6)
consumer.expectComplete()
}
"produce multi-step transformation as expected" in {
val p = new IteratorProducer(List("a", "bc", "def").iterator)
val p2 = Stream(p).
transform("") { (str, elem)
val concat = str + elem
(concat, List(concat.length))
}.
transform(0)((tot, length) (tot + length, List(tot + length))).
toProducer(gen)
val c1 = StreamTestKit.consumerProbe[Int]
p2.produceTo(c1)
val sub1 = c1.expectSubscription()
val c2 = StreamTestKit.consumerProbe[Int]
p2.produceTo(c2)
val sub2 = c2.expectSubscription()
sub1.requestMore(1)
sub2.requestMore(2)
c1.expectNext(1)
c2.expectNext(1)
c2.expectNext(4)
c1.expectNoMsg(200.millis)
sub1.requestMore(2)
sub2.requestMore(2)
c1.expectNext(4)
c1.expectNext(10)
c2.expectNext(10)
c1.expectComplete()
c2.expectComplete()
}
"report error when exception is thrown" in {
val p = new IteratorProducer(List(1, 2, 3).iterator)
val p2 = Stream(p).
transform(0) { (_, elem)
if (elem == 2) throw new IllegalArgumentException("two not allowed") else (0, List(elem, elem))
}.
toProducer(gen)
val consumer = StreamTestKit.consumerProbe[Int]
p2.produceTo(consumer)
val subscription = consumer.expectSubscription()
subscription.requestMore(100)
consumer.expectNext(1)
consumer.expectNext(1)
consumer.expectError().getMessage should be("two not allowed")
consumer.expectNoMsg(200.millis)
}
"support cancel as expected" in {
val p = new IteratorProducer(List(1, 2, 3).iterator)
val p2 = Stream(p).
transform(0) { (_, elem) (0, List(elem, elem)) }.
toProducer(gen)
val consumer = StreamTestKit.consumerProbe[Int]
p2.produceTo(consumer)
val subscription = consumer.expectSubscription()
subscription.requestMore(2)
consumer.expectNext(1)
subscription.cancel()
consumer.expectNext(1)
consumer.expectNoMsg(500.millis)
subscription.requestMore(2)
consumer.expectNoMsg(200.millis)
}
}
}

View file

@ -0,0 +1,14 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import akka.actor.ActorSystem
import org.testng.annotations.AfterClass
trait WithActorSystem {
val system: ActorSystem = ActorSystem(getClass.getSimpleName)
@AfterClass
def shutdownActorSystem(): Unit = system.shutdown()
}

View file

@ -0,0 +1,198 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import scala.util.Random
import org.scalatest.{ ShouldMatchers, WordSpec }
import akka.stream.impl.ResizableMultiReaderRingBuffer._
class ResizableMultiReaderRingBufferSpec extends WordSpec with ShouldMatchers {
"A ResizableMultiReaderRingBuffer" should {
"initially be empty (1)" in new Test(iSize = 2, mSize = 4, cursorCount = 1) {
inspect shouldEqual "0 0 (size=0, writeIx=0, readIx=0, cursors=1)"
}
"initially be empty (2)" in new Test(iSize = 4, mSize = 4, cursorCount = 3) {
inspect shouldEqual "0 0 0 0 (size=0, writeIx=0, readIx=0, cursors=3)"
}
"fail reads if nothing can be read" in new Test(iSize = 4, mSize = 4, cursorCount = 3) {
write(1) shouldEqual true
write(2) shouldEqual true
write(3) shouldEqual true
inspect shouldEqual "1 2 3 0 (size=3, writeIx=3, readIx=0, cursors=3)"
read(0) shouldEqual 1
read(0) shouldEqual 2
read(1) shouldEqual 1
inspect shouldEqual "1 2 3 0 (size=3, writeIx=3, readIx=0, cursors=3)"
read(0) shouldEqual 3
read(0) shouldEqual null
read(1) shouldEqual 2
inspect shouldEqual "1 2 3 0 (size=3, writeIx=3, readIx=0, cursors=3)"
read(2) shouldEqual 1
inspect shouldEqual "1 2 3 0 (size=2, writeIx=3, readIx=1, cursors=3)"
read(1) shouldEqual 3
read(1) shouldEqual null
read(2) shouldEqual 2
read(2) shouldEqual 3
inspect shouldEqual "1 2 3 0 (size=0, writeIx=3, readIx=3, cursors=3)"
}
"fail writes if there is no more space" in new Test(iSize = 4, mSize = 4, cursorCount = 2) {
write(1) shouldEqual true
write(2) shouldEqual true
write(3) shouldEqual true
write(4) shouldEqual true
write(5) shouldEqual false
read(0) shouldEqual 1
write(5) shouldEqual false
read(1) shouldEqual 1
write(5) shouldEqual true
read(0) shouldEqual 2
read(0) shouldEqual 3
read(0) shouldEqual 4
read(1) shouldEqual 2
write(6) shouldEqual true
inspect shouldEqual "5 6 3 4 (size=4, writeIx=6, readIx=2, cursors=2)"
read(0) shouldEqual 5
read(0) shouldEqual 6
read(0) shouldEqual null
read(1) shouldEqual 3
read(1) shouldEqual 4
read(1) shouldEqual 5
read(1) shouldEqual 6
read(1) shouldEqual null
inspect shouldEqual "5 6 3 4 (size=0, writeIx=2, readIx=2, cursors=2)"
write(7) shouldEqual true
write(8) shouldEqual true
write(9) shouldEqual true
inspect shouldEqual "9 6 7 8 (size=3, writeIx=5, readIx=2, cursors=2)"
read(0) shouldEqual 7
read(0) shouldEqual 8
read(0) shouldEqual 9
read(0) shouldEqual null
read(1) shouldEqual 7
read(1) shouldEqual 8
read(1) shouldEqual 9
read(1) shouldEqual null
inspect shouldEqual "9 6 7 8 (size=0, writeIx=5, readIx=5, cursors=2)"
}
"automatically grow if possible" in new Test(iSize = 2, mSize = 8, cursorCount = 2) {
write(1) shouldEqual true
inspect shouldEqual "1 0 (size=1, writeIx=1, readIx=0, cursors=2)"
write(2) shouldEqual true
inspect shouldEqual "1 2 (size=2, writeIx=2, readIx=0, cursors=2)"
write(3) shouldEqual true
inspect shouldEqual "1 2 3 0 (size=3, writeIx=3, readIx=0, cursors=2)"
write(4) shouldEqual true
inspect shouldEqual "1 2 3 4 (size=4, writeIx=4, readIx=0, cursors=2)"
read(0) shouldEqual 1
read(0) shouldEqual 2
read(0) shouldEqual 3
read(1) shouldEqual 1
read(1) shouldEqual 2
write(5) shouldEqual true
inspect shouldEqual "5 2 3 4 (size=3, writeIx=5, readIx=2, cursors=2)"
write(6) shouldEqual true
inspect shouldEqual "5 6 3 4 (size=4, writeIx=6, readIx=2, cursors=2)"
write(7) shouldEqual true
inspect shouldEqual "3 4 5 6 7 0 0 0 (size=5, writeIx=5, readIx=0, cursors=2)"
read(0) shouldEqual 4
read(0) shouldEqual 5
read(0) shouldEqual 6
read(0) shouldEqual 7
read(0) shouldEqual null
read(1) shouldEqual 3
read(1) shouldEqual 4
read(1) shouldEqual 5
read(1) shouldEqual 6
read(1) shouldEqual 7
read(1) shouldEqual null
inspect shouldEqual "3 4 5 6 7 0 0 0 (size=0, writeIx=5, readIx=5, cursors=2)"
}
"pass the stress test" in {
// create 100 buffers with an initialSize of 1 and a maxSize of 1 to 64,
// for each one attach 1 to 8 cursors and randomly try reading and writing to the buffer;
// in total 200 elements need to be written to the buffer and read in the correct order by each cursor
val MAXSIZEBIT_LIMIT = 6 // 2 ^ (this number)
val COUNTER_LIMIT = 200
val LOG = false
val sb = new java.lang.StringBuilder
def log(s: String): Unit = if (LOG) sb.append(s)
class StressTestCursor(cursorNr: Int, run: Int) extends Cursor {
var cursor: Int = _
var counter = 1
def tryReadAndReturnTrueIfDone(buf: TestBuffer): Boolean = {
log(s" Try reading of $toString: ")
try {
val x = buf.read(this)
log("OK\n")
if (x != counter)
fail(s"""|Run $run, cursorNr $cursorNr, counter $counter: got unexpected $x
| Buf: ${buf.inspect}
| Cursors: ${buf.cursors.cursors.mkString("\n ")}
|Log:\n$sb
""".stripMargin)
counter += 1
counter == COUNTER_LIMIT
} catch {
case NothingToReadException log("FAILED\n"); false // ok, we currently can't read, try again later
}
}
override def toString: String = s"cursorNr $cursorNr, ix $cursor, counter $counter"
}
val random = new Random
for {
bit 1 to MAXSIZEBIT_LIMIT
n 1 to 2
} {
var counter = 1
var activeCursors = List.tabulate(random.nextInt(8) + 1)(new StressTestCursor(_, 1 << bit))
var stillWriting = 2 // give writing a slight bias, so as to somewhat "stretch" the buffer
val buf = new TestBuffer(1, 1 << bit, new Cursors { def cursors = activeCursors })
sb.setLength(0)
while (activeCursors.nonEmpty) {
log(s"Buf: ${buf.inspect}\n")
val activeCursorCount = activeCursors.size
val index = random.nextInt(activeCursorCount + stillWriting)
if (index >= activeCursorCount) {
log(s" Writing $counter: ")
if (buf.write(counter)) {
log("OK\n")
counter += 1
} else {
log("FAILED\n")
if (counter == COUNTER_LIMIT) stillWriting = 0
}
} else {
val cursor = activeCursors(index)
if (cursor.tryReadAndReturnTrueIfDone(buf))
activeCursors = activeCursors.filter(_ != cursor)
}
}
}
}
}
class TestBuffer(iSize: Int, mSize: Int, cursors: Cursors) extends ResizableMultiReaderRingBuffer[Int](iSize, mSize, cursors) {
def inspect: String =
underlyingArray.map(x if (x == null) 0 else x).mkString("", " ", " " + toString.dropWhile(_ != '('))
}
class Test(iSize: Int, mSize: Int, cursorCount: Int) extends TestBuffer(iSize, mSize, new SimpleCursors(cursorCount)) {
def read(cursorIx: Int): Integer =
try read(cursors.cursors(cursorIx)) catch { case NothingToReadException null }
override def rebaseThreshold: Int = underlyingArray.length // use a low threshold in order to test the rebasing logic
}
class SimpleCursors(cursorCount: Int) extends Cursors {
val cursors: List[Cursor] = List.fill(cursorCount)(new Cursor { var cursor: Int = _ })
}
}

View file

@ -0,0 +1,10 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.testkit
import akka.testkit.TestProbe
trait AkkaConsumerProbe[I] extends ConsumerProbe[I] {
def probe: TestProbe
}

View file

@ -0,0 +1,10 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.testkit
import akka.testkit.TestProbe
trait AkkaProducerProbe[I] extends ProducerProbe[I] {
def probe: TestProbe
}

View file

@ -0,0 +1,28 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.testkit
import scala.concurrent.duration.FiniteDuration
import asyncrx.api.Consumer
import asyncrx.spi.Subscription
sealed trait ConsumerEvent
case class OnSubscribe(subscription: Subscription) extends ConsumerEvent
case class OnNext[I](element: I) extends ConsumerEvent
case object OnComplete extends ConsumerEvent
case class OnError(cause: Throwable) extends ConsumerEvent
trait ConsumerProbe[I] extends Consumer[I] {
def expectSubscription(): Subscription
def expectEvent(event: ConsumerEvent): Unit
def expectNext(element: I): Unit
def expectNext(e1: I, e2: I, es: I*): Unit
def expectNext(): I
def expectError(cause: Throwable): Unit
def expectError(): Throwable
def expectComplete(): Unit
def expectNoMsg(): Unit
def expectNoMsg(max: FiniteDuration): Unit
}

View file

@ -0,0 +1,34 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.testkit
import asyncrx.spi.{ Subscriber, Subscription }
import asyncrx.api.Producer
import scala.concurrent.duration.FiniteDuration
import asyncrx.api.Consumer
sealed trait ProducerEvent
case class Subscribe(subscription: Subscription) extends ProducerEvent
case class CancelSubscription(subscription: Subscription) extends ProducerEvent
case class RequestMore(subscription: Subscription, elements: Int) extends ProducerEvent
abstract case class ActiveSubscription[I](subscriber: Subscriber[I]) extends Subscription {
def sendNext(element: I): Unit
def sendComplete(): Unit
def sendError(cause: Exception): Unit
def expectCancellation(): Unit
def expectRequestMore(n: Int): Unit
}
trait ProducerProbe[I] extends Producer[I] {
def expectSubscription(): ActiveSubscription[I]
def expectRequestMore(subscription: Subscription, n: Int): Unit
def expectNoMsg(): Unit
def expectNoMsg(max: FiniteDuration): Unit
def produceTo(consumer: Consumer[I]): Unit =
getPublisher.subscribe(consumer.getSubscriber)
}

View file

@ -0,0 +1,72 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.testkit
import akka.testkit.TestProbe
import asyncrx.spi.{ Publisher, Subscriber, Subscription }
import asyncrx.tck._
import akka.actor.ActorSystem
import scala.concurrent.duration.FiniteDuration
import scala.annotation.tailrec
object StreamTestKit {
def consumerProbe[I]()(implicit system: ActorSystem): AkkaConsumerProbe[I] =
new AkkaConsumerProbe[I] with Subscriber[I] { outer
lazy val probe = TestProbe()
def expectSubscription(): Subscription = probe.expectMsgType[OnSubscribe].subscription
def expectEvent(event: ConsumerEvent): Unit = probe.expectMsg(event)
def expectNext(element: I): Unit = probe.expectMsg(OnNext(element))
def expectNext(e1: I, e2: I, es: I*): Unit = {
val all = e1 +: e2 +: es
all.foreach(e probe.expectMsg(OnNext(e)))
}
def expectNext(): I = probe.expectMsgType[OnNext[I]].element
def expectComplete(): Unit = probe.expectMsg(OnComplete)
def expectError(cause: Throwable): Unit = probe.expectMsg(OnError(cause))
def expectError(): Throwable = probe.expectMsgType[OnError].cause
def expectNoMsg(): Unit = probe.expectNoMsg()
def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max)
def onSubscribe(subscription: Subscription): Unit = probe.ref ! OnSubscribe(subscription)
def onNext(element: I): Unit = probe.ref ! OnNext(element)
def onComplete(): Unit = probe.ref ! OnComplete
def onError(cause: Throwable): Unit = probe.ref ! OnError(cause)
def getSubscriber: Subscriber[I] = this
}
def producerProbe[I]()(implicit system: ActorSystem): AkkaProducerProbe[I] =
new AkkaProducerProbe[I] with Publisher[I] {
lazy val probe: TestProbe = TestProbe()
def subscribe(subscriber: Subscriber[I]): Unit = {
lazy val subscription: ActiveSubscription[I] = new ActiveSubscription[I](subscriber) {
def requestMore(elements: Int): Unit = probe.ref ! RequestMore(subscription, elements)
def cancel(): Unit = probe.ref ! CancelSubscription(subscription)
def expectRequestMore(n: Int): Unit = probe.expectMsg(RequestMore(subscription, n))
def expectCancellation(): Unit = probe.expectMsg(CancelSubscription(this))
def sendNext(element: I): Unit = subscriber.onNext(element)
def sendComplete(): Unit = subscriber.onComplete()
def sendError(cause: Exception): Unit = subscriber.onError(cause)
}
probe.ref ! Subscribe(subscription)
subscriber.onSubscribe(subscription)
}
def expectSubscription(): ActiveSubscription[I] =
probe.expectMsgType[Subscribe].subscription.asInstanceOf[ActiveSubscription[I]]
def expectRequestMore(subscription: Subscription, n: Int): Unit = probe.expectMsg(RequestMore(subscription, n))
def expectNoMsg(): Unit = probe.expectNoMsg()
def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max)
def getPublisher: Publisher[I] = this
}
}

View file

@ -0,0 +1,28 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.testkit
import asyncrx.api.Producer
import akka.stream.impl.IteratorProducer
import scala.concurrent.ExecutionContext
import asyncrx.spi.Subscriber
import asyncrx.spi.Publisher
import asyncrx.api.Consumer
object TestProducer {
def apply[T](iterable: Iterable[T])(implicit executor: ExecutionContext): Producer[T] = apply(iterable.iterator)
def apply[T](iterator: Iterator[T])(implicit executor: ExecutionContext): Producer[T] = new IteratorProducer[T](iterator)
def empty[T]: Producer[T] = EmptyProducer.asInstanceOf[Producer[T]]
}
object EmptyProducer extends Producer[Nothing] with Publisher[Nothing] {
def getPublisher: Publisher[Nothing] = this
def subscribe(subscriber: Subscriber[Nothing]): Unit =
subscriber.onComplete()
def produceTo(consumer: Consumer[Nothing]): Unit =
getPublisher.subscribe(consumer.getSubscriber)
}