diff --git a/akka-stream/src/main/scala/akka/stream/Stream.scala b/akka-stream/src/main/scala/akka/stream/Stream.scala index 85275a6326..c33c1bce52 100644 --- a/akka-stream/src/main/scala/akka/stream/Stream.scala +++ b/akka-stream/src/main/scala/akka/stream/Stream.scala @@ -12,11 +12,16 @@ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.util.Try import scala.concurrent.duration._ +import scala.util.control.NoStackTrace object Stream { def apply[T](producer: Producer[T]): Stream[T] = StreamImpl(producer, Nil) def apply[T](iterator: Iterator[T])(implicit ec: ExecutionContext): Stream[T] = StreamImpl(new IteratorProducer(iterator), Nil) def apply[T](seq: immutable.Seq[T]) = ??? + + def apply[T](gen: ProcessorGenerator, f: () ⇒ T): Stream[T] = apply(gen.produce(f)) + + object Stop extends RuntimeException with NoStackTrace } trait Stream[T] { @@ -57,6 +62,10 @@ trait ProcessorGenerator { * INTERNAL API */ private[akka] def consume[I](producer: Producer[I], ops: List[Ast.AstNode]): Unit + /** + * INTERNAL API + */ + private[akka] def produce[T](f: () ⇒ T): Producer[T] } // FIXME default values? Should we have an extension that reads from config? diff --git a/akka-stream/src/main/scala/akka/stream/impl/AbstractProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/AbstractProducer.scala index 270e61ddff..0191f9d284 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/AbstractProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/AbstractProducer.scala @@ -36,14 +36,12 @@ private[akka] object SubscriberManagement { /** * INTERNAL API */ -private[akka] trait SubscriptionWithCursor extends spi.Subscription with ResizableMultiReaderRingBuffer.Cursor { - def subscriber[T]: spi.Subscriber[T] +private[akka] trait SubscriptionWithCursor[T] extends spi.Subscription with ResizableMultiReaderRingBuffer.Cursor { + def subscriber: spi.Subscriber[T] def isActive: Boolean = cursor != Int.MinValue def deactivate(): Unit = cursor = Int.MinValue - def dispatch[T](element: T): Unit = subscriber.onNext(element) - - override def toString: String = "Subscription" + System.identityHashCode(this) // helpful for testing + def dispatch(element: T): Unit = subscriber.onNext(element) /////////////// internal interface, no unsynced access from subscriber's thread ////////////// @@ -56,24 +54,38 @@ private[akka] trait SubscriptionWithCursor extends spi.Subscription with Resizab */ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuffer.Cursors { import SubscriberManagement._ - type S <: SubscriptionWithCursor + type S <: SubscriptionWithCursor[T] type Subscriptions = List[S] def initialBufferSize: Int def maxBufferSize: Int - // we keep an element (ring) buffer which serves two purposes: - // 1. Pre-fetch elements from our own upstream before they are requested by our downstream. - // 2. Allow for limited request rate differences between several downstream subscribers. - // - // The buffering logic is as follows: - // 1. We always request as many elements from our upstream as are still free in our current buffer instance. - // If a subscriber requests more than this number (either once or as a result of several `requestMore` calls) - // we resize the buffer if possible. - // 2. If two subscribers drift apart in their request rates we use the buffer for keeping the elements that the - // slow subscriber is behind with, thereby resizing the buffer if necessary and still allowed. + /** + * called when we are ready to consume more elements from our upstream + * MUST NOT call pushToDownstream + */ + protected def requestFromUpstream(elements: Int): Unit + + /** + * called before `shutdown()` if the stream is *not* being regularly completed + * but shut-down due to the last subscriber having cancelled its subscription + */ + protected def cancelUpstream(): Unit + + /** + * called when the spi.Publisher/Processor is ready to be shut down + */ + protected def shutdown(completed: Boolean): Unit + + /** + * Use to register a subscriber + */ + protected def createSubscription(subscriber: spi.Subscriber[T]): S + private[this] val buffer = new ResizableMultiReaderRingBuffer[T](initialBufferSize, maxBufferSize, this) + protected def bufferDebug: String = buffer.toString + // optimize for small numbers of subscribers by keeping subscribers in a plain list private[this] var subscriptions: Subscriptions = Nil @@ -83,25 +95,11 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff // if non-null, holds the end-of-stream state private[this] var endOfStream: EndOfStream = NotReached - // called when we are ready to consume more elements from our upstream - // the implementation of this method is allowed to synchronously call `pushToDownstream` - // if (part of) the requested elements are already available - protected def requestFromUpstream(elements: Int): Unit - - // called before `shutdown()` if the stream is *not* being regularly completed - // but shut-down due to the last subscriber having cancelled its subscription - protected def cancelUpstream(): Unit - - // called when the spi.Publisher/Processor is ready to be shut down. - protected def shutdown(): Unit - - // Use to register a subscriber - protected def createSubscription(subscriber: spi.Subscriber[T]): S - def cursors = subscriptions - // called from `Subscription::requestMore`, i.e. from another thread, - // override to add synchronization with itself, `subscribe` and `unregisterSubscription` + /** + * more demand was signaled from a given subscriber + */ protected def moreRequested(subscription: S, elements: Int): Unit = if (subscription.isActive) { @@ -110,31 +108,22 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff if (requested == 0) { // if we are at end-of-stream and have nothing more to read we complete now rather than after the next `requestMore` if ((eos ne NotReached) && buffer.count(subscription) == 0) Long.MinValue else 0 - } else { - val x = - try { - subscription.dispatch(buffer.read(subscription)) // as long as we can produce elements from our buffer we do so synchronously - -1 // we can't directly tailrec from here since we are in a try/catch, so signal with special value - } catch { - case NothingToReadException ⇒ if (eos ne NotReached) Long.MinValue else requested // terminate or request from upstream - } - if (x == -1) dispatchFromBufferAndReturnRemainingRequested(requested - 1, eos) else x - } + } else if (buffer.count(subscription) > 0) { + subscription.dispatch(buffer.read(subscription)) + dispatchFromBufferAndReturnRemainingRequested(requested - 1, eos) + } else if (eos ne NotReached) Long.MinValue + else requested endOfStream match { case eos @ (NotReached | Completed) ⇒ val demand = subscription.requested + elements - assert(demand >= 0) dispatchFromBufferAndReturnRemainingRequested(demand, eos) match { case Long.MinValue ⇒ eos(subscription.subscriber) unregisterSubscriptionInternal(subscription) - case 0 ⇒ - subscription.requested = 0 + case x ⇒ + subscription.requested = x requestFromUpstreamIfRequired() - case requested ⇒ - subscription.requested = requested - requestFromUpstreamIfPossible(requested) } case ErrorCompleted(_) ⇒ // ignore, the spi.Subscriber might not have seen our error event yet } @@ -146,81 +135,73 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff case head :: tail ⇒ maxRequested(tail, math.max(head.requested, result)) case _ ⇒ result } - if (pendingFromUpstream == 0) - requestFromUpstreamIfPossible(maxRequested(subscriptions)) - } - - private[this] final def requestFromUpstreamIfPossible(elements: Long): Unit = { - val toBeRequested = buffer.potentiallyAvailable(math.min(elements, Int.MaxValue).toInt) // Cap at Int.MaxValue - if (toBeRequested > 0) { - pendingFromUpstream += toBeRequested - requestFromUpstream(toBeRequested) + val desired = Math.min(Int.MaxValue, Math.min(maxRequested(subscriptions), buffer.maxAvailable) - pendingFromUpstream).toInt + if (desired > 0) { + pendingFromUpstream += desired + requestFromUpstream(desired) } } - // this method must be called by the implementing class whenever a new value is available to be pushed downstream + /** + * this method must be called by the implementing class whenever a new value is available to be pushed downstream + */ protected def pushToDownstream(value: T): Unit = { - @tailrec def dispatchAndReturnMaxRequested(remaining: Subscriptions, result: Long = 0): Long = + @tailrec def dispatch(remaining: Subscriptions, sent: Boolean = false): Boolean = remaining match { case head :: tail ⇒ - var requested = head.requested - if (requested > 0) - try { - val element = buffer.read(head) - head.dispatch(element) - requested -= 1 - head.requested = requested - } catch { - case NothingToReadException ⇒ throw new IllegalStateException("Output buffer read failure") // we just wrote a value, why can't we read it? - } - dispatchAndReturnMaxRequested(tail, math.max(result, requested)) - case _ ⇒ result + if (head.requested > 0) { + val element = buffer.read(head) + head.dispatch(element) + head.requested -= 1 + dispatch(tail, true) + } else dispatch(tail, sent) + case _ ⇒ sent } endOfStream match { case NotReached ⇒ pendingFromUpstream -= 1 if (!buffer.write(value)) throw new IllegalStateException("Output buffer overflow") - val maxRequested = dispatchAndReturnMaxRequested(subscriptions) - if (pendingFromUpstream == 0) requestFromUpstreamIfPossible(maxRequested) - case ShutDown ⇒ // don't throw if we have transitioned into shutdown in the mean-time, since there is an expected - case _ ⇒ // race condition between `cancelUpstream` and `pushToDownstream` + if (dispatch(subscriptions)) requestFromUpstreamIfRequired() + case _ ⇒ throw new IllegalStateException("pushToDownStream(...) after completeDownstream() or abortDownstream(...)") } } - // this method must be called by the implementing class whenever - // it has been determined that no more elements will be produced + /** + * this method must be called by the implementing class whenever + * it has been determined that no more elements will be produced + */ protected def completeDownstream(): Unit = { if (endOfStream eq NotReached) { - // we complete all subscriptions that have no more buffered elements - // is non-tail recursion acceptable here? (it's the fastest impl but might stack overflow for large numbers of subscribers) @tailrec def completeDoneSubscriptions(remaining: Subscriptions, result: Subscriptions = Nil): Subscriptions = - remaining match { case head :: tail ⇒ - val newResult = - if (buffer.count(head) == 0) { - head.deactivate() - Completed(head.subscriber) - result - } else head :: result - completeDoneSubscriptions(tail, newResult) + if (buffer.count(head) == 0) { + head.deactivate() + Completed(head.subscriber) + completeDoneSubscriptions(tail, result) + } else completeDoneSubscriptions(tail, head :: result) case _ ⇒ result } endOfStream = Completed subscriptions = completeDoneSubscriptions(subscriptions) - if (subscriptions.isEmpty) shutdown() + if (subscriptions.isEmpty) shutdown(completed = true) } // else ignore, we need to be idempotent } - // this method must be called by the implementing class to push an error downstream + /** + * this method must be called by the implementing class to push an error downstream + */ protected def abortDownstream(cause: Throwable): Unit = { endOfStream = ErrorCompleted(cause) subscriptions.foreach(s ⇒ endOfStream(s.subscriber)) subscriptions = Nil } + /** + * Register a new subscriber. + */ protected def registerSubscriber(subscriber: spi.Subscriber[T]): Unit = endOfStream match { case NotReached if subscriptions.exists(_.subscriber eq subscriber) ⇒ subscriber.onError(new IllegalStateException(s"Cannot subscribe $subscriber twice")) @@ -238,17 +219,18 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff eos(subscriber) } - // called from `Subscription::cancel`, i.e. from another thread, - // override to add synchronization with itself, `subscribe` and `moreRequested` + /** + * called from `Subscription::cancel`, i.e. from another thread, + * override to add synchronization with itself, `subscribe` and `moreRequested` + */ protected def unregisterSubscription(subscription: S): Unit = unregisterSubscriptionInternal(subscription) // must be idempotent private def unregisterSubscriptionInternal(subscription: S): Unit = { - // is non-tail recursion acceptable here? (it's the fastest impl but might stack overflow for large numbers of subscribers) - def removeFrom(remaining: Subscriptions): Subscriptions = + @tailrec def removeFrom(remaining: Subscriptions, result: Subscriptions = Nil): Subscriptions = remaining match { - case head :: tail ⇒ if (head eq subscription) tail else head :: removeFrom(tail) + case head :: tail ⇒ if (head eq subscription) tail reverse_::: result else removeFrom(tail, head :: result) case _ ⇒ throw new IllegalStateException("Subscription to unregister not found") } if (subscription.isActive) { @@ -260,7 +242,7 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff endOfStream = ShutDown cancelUpstream() } - shutdown() + shutdown(completed = false) } else requestFromUpstreamIfRequired() // we might have removed a "blocking" subscriber and can continue now } // else ignore, we need to be idempotent } @@ -288,9 +270,7 @@ private[akka] abstract class AbstractProducer[T](val initialBufferSize: Int, val override def createSubscription(subscriber: spi.Subscriber[T]): S = new Subscription(subscriber) - protected class Subscription(val _subscriber: spi.Subscriber[T]) extends SubscriptionWithCursor { - def subscriber[B]: spi.Subscriber[B] = _subscriber.asInstanceOf[spi.Subscriber[B]] - + protected class Subscription(val subscriber: spi.Subscriber[T]) extends SubscriptionWithCursor[T] { override def requestMore(elements: Int): Unit = if (elements <= 0) throw new IllegalArgumentException("Argument must be > 0") else moreRequested(this, elements) // needs to be able to ignore calls after termination / cancellation diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index f3d29a7e18..1a3c605f36 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -11,6 +11,7 @@ import org.reactivestreams.api.Processor import org.reactivestreams.spi.{ Subscriber, Subscription } import akka.actor.{ Actor, ActorLogging, ActorRef, Props } import akka.stream.GeneratorSettings +import akka.event.LoggingReceive /** * INTERNAL API @@ -30,7 +31,7 @@ class ActorProcessor[I, O]( final val impl: ActorRef) extends Processor[I, O] wi */ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) extends Actor with SubscriberManagement[Any] with ActorLogging { import ActorProcessor._ - type S = ActorSubscription + type S = ActorSubscription[Any] override def maxBufferSize: Int = settings.maxFanOutBufferSize override def initialBufferSize: Int = settings.initialFanOutBufferSize @@ -51,7 +52,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) } def waitingForUpstream: Receive = downstreamManagement orElse { - case OnComplete ⇒ shutdown() // There is nothing to flush here + case OnComplete ⇒ shutdown(completed = true) // There is nothing to flush here case OnSubscribe(subscription) ⇒ assert(subscription != null) upstream = subscription @@ -68,10 +69,10 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) case SubscribePending ⇒ subscribePending() case RequestMore(subscription, elements) ⇒ - moreRequested(subscription, elements) + moreRequested(subscription.asInstanceOf[S], elements) pump() case Cancel(subscription) ⇒ - unregisterSubscription(subscription) + unregisterSubscription(subscription.asInstanceOf[S]) pump() } @@ -80,7 +81,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) ////////////////////// Active state ////////////////////// - def running: Receive = downstreamManagement orElse { + def running: Receive = LoggingReceive(downstreamManagement orElse { case OnNext(element) ⇒ enqueueInputElement(element) pump() @@ -88,7 +89,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) flushAndComplete() pump() case OnError(cause) ⇒ failureReceived(cause) - } + }) // Called by SubscriberManagement when all subscribers are gone. // The method shutdown() is called automatically by SubscriberManagement after it called this method. @@ -96,6 +97,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) // Called by SubscriberManagement whenever the output buffer is ready to accept additional elements override protected def requestFromUpstream(elements: Int): Unit = { + log.debug(s"received downstream demand from buffer: $elements") downstreamBufferSpace += elements } @@ -176,13 +178,17 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) // Generate upstream requestMore for every Nth consumed input element protected def pump(): Unit = { try while (transferState.isExecutable) { + log.debug(s"iterating the pump with state $transferState and buffer $bufferDebug") transferState = transfer(transferState) } catch { case NonFatal(e) ⇒ fail(e) } + log.debug(s"finished iterating the pump with state $transferState and buffer $bufferDebug") + if (transferState.isCompleted) { if (!isShuttingDown) { + log.debug("shutting down the pump") if (!upstreamCompleted) upstream.cancel() - Arrays.fill(inputBuffer, nextInputElementCursor, nextInputElementCursor + inputBufferElements, null) + Arrays.fill(inputBuffer, 0, inputBuffer.length, null) inputBufferElements = 0 context.become(flushing) isShuttingDown = true @@ -212,16 +218,18 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) ////////////////////// Shutdown and cleanup (graceful and abort) ////////////////////// var isShuttingDown = false + var completed = false // Called by SubscriberManagement to signal that output buffer finished (flushed or aborted) - override def shutdown(): Unit = { + override def shutdown(completed: Boolean): Unit = { isShuttingDown = true + this.completed = completed context.stop(self) } override def postStop(): Unit = { if (exposedPublisher ne null) - exposedPublisher.shutdown() + exposedPublisher.shutdown(completed) // Non-gracefully stopped, do our best here if (!isShuttingDown) abortDownstream(new IllegalStateException("Processor actor terminated abruptly")) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala index 021a91b6da..bf15a2bf21 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala @@ -4,13 +4,18 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicReference - import scala.annotation.tailrec - import org.reactivestreams.api.{ Consumer, Producer } import org.reactivestreams.spi.{ Publisher, Subscriber } - import akka.actor.ActorRef +import akka.stream.GeneratorSettings +import akka.actor.ActorLogging +import akka.actor.Actor +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal +import akka.actor.Props +import scala.util.control.NoStackTrace +import akka.stream.Stream trait ActorProducerLike[T] extends Producer[T] { def impl: ActorRef @@ -27,6 +32,11 @@ trait ActorProducerLike[T] extends Producer[T] { class ActorProducer[T]( final val impl: ActorRef) extends ActorProducerLike[T] +object ActorProducer { + def props[T](settings: GeneratorSettings, f: () ⇒ T): Props = + Props(new ActorProducerImpl(f, settings)) +} + final class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { // The subscriber of an subscription attempt is first placed in this list of pending subscribers. @@ -55,20 +65,93 @@ final class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { def takePendingSubscribers(): List[Subscriber[T]] = pendingSubscribers.getAndSet(Nil) - def shutdown(): Unit = + def shutdown(completed: Boolean): Unit = { + this.completed = completed pendingSubscribers.getAndSet(null) foreach reportShutdownError + } + + @volatile private var completed: Boolean = false private def reportShutdownError(subscriber: Subscriber[T]): Unit = - subscriber.onError(new IllegalStateException("Cannot subscribe to shut-down spi.Publisher")) + if (completed) subscriber.onComplete() + else subscriber.onError(new IllegalStateException("Cannot subscribe to shut-down spi.Publisher")) } -class ActorSubscription( final val impl: ActorRef, final val _subscriber: Subscriber[Any]) extends SubscriptionWithCursor { - override def subscriber[T]: Subscriber[T] = _subscriber.asInstanceOf[Subscriber[T]] +class ActorSubscription[T]( final val impl: ActorRef, final val subscriber: Subscriber[T]) extends SubscriptionWithCursor[T] { override def requestMore(elements: Int): Unit = if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0") else impl ! RequestMore(this, elements) override def cancel(): Unit = impl ! Cancel(this) - override def toString = "ActorSubscription" } +object ActorProducerImpl { + case object Generate +} + +class ActorProducerImpl[T](f: () ⇒ T, settings: GeneratorSettings) extends Actor with ActorLogging with SubscriberManagement[T] { + import Stream._ + import ActorProducerImpl._ + + type S = ActorSubscription[T] + var pub: ActorPublisher[T] = _ + var completed = false + + context.setReceiveTimeout(settings.downstreamSubscriptionTimeout) + + def receive = { + case ExposedPublisher(pub) ⇒ + this.pub = pub.asInstanceOf[ActorPublisher[T]] + context.become(waitingForSubscribers) + } + + def waitingForSubscribers: Receive = { + case SubscribePending ⇒ + pub.takePendingSubscribers() foreach registerSubscriber + context.setReceiveTimeout(Duration.Undefined) + context.become(active) + } + + def active: Receive = { + case SubscribePending ⇒ + pub.takePendingSubscribers() foreach registerSubscriber + case RequestMore(sub, elements) ⇒ + moreRequested(sub.asInstanceOf[S], elements) + generate() + case Cancel(sub) ⇒ + unregisterSubscription(sub.asInstanceOf[S]) + generate() + case Generate ⇒ + generate() + } + + override def postStop(): Unit = { + pub.shutdown(completed) + } + + private var demand = 0 + private def generate(): Unit = { + val continue = + try { + pushToDownstream(f()) + true + } catch { + case Stop ⇒ { completeDownstream(); completed = true; false } + case NonFatal(e) ⇒ { abortDownstream(e); false } + } + demand -= 1 + if (continue && demand > 0) self ! Generate + } + + override def initialBufferSize = settings.initialFanOutBufferSize + override def maxBufferSize = settings.maxFanOutBufferSize + + override def createSubscription(subscriber: Subscriber[T]): ActorSubscription[T] = + new ActorSubscription(self, subscriber) + + override def requestFromUpstream(elements: Int): Unit = demand += elements + + override def cancelUpstream(): Unit = context.stop(self) + override def shutdown(completed: Boolean): Unit = context.stop(self) + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Messages.scala b/akka-stream/src/main/scala/akka/stream/impl/Messages.scala index 284c50b785..ffe9600bc4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Messages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Messages.scala @@ -13,8 +13,8 @@ case class OnError(cause: Throwable) case object SubscribePending -case class RequestMore(subscription: ActorSubscription, demand: Int) -case class Cancel(subscriptions: ActorSubscription) +case class RequestMore(subscription: ActorSubscription[_], demand: Int) +case class Cancel(subscriptions: ActorSubscription[_]) case class ExposedPublisher(publisher: ActorPublisher[Any]) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ProcessorGenerator.scala b/akka-stream/src/main/scala/akka/stream/impl/ProcessorGenerator.scala index 71ba12c49b..1a5e59c6ea 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ProcessorGenerator.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ProcessorGenerator.scala @@ -62,6 +62,8 @@ private[akka] class ActorBasedProcessorGenerator(settings: GeneratorSettings, co producer.produceTo(consumer.asInstanceOf[Consumer[I]]) } + override def produce[T](f: () ⇒ T): Producer[T] = new ActorProducer(context.actorOf(ActorProducer.props(settings, f))) + def processorForNode(op: AstNode): Processor[Any, Any] = new ActorProcessor(context.actorOf(ActorProcessor.props(settings, op))) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ResizableMultiReaderRingBuffer.scala b/akka-stream/src/main/scala/akka/stream/impl/ResizableMultiReaderRingBuffer.scala index 2af9c3ae24..4f4c86c481 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ResizableMultiReaderRingBuffer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ResizableMultiReaderRingBuffer.scala @@ -24,14 +24,10 @@ private[akka] class ResizableMultiReaderRingBuffer[T](initialSize: Int, // const private[this] val maxSizeBit = Integer.numberOfTrailingZeros(maxSize) private[this] var array = new Array[Any](initialSize) - // Usual ring buffer implementations keep two pointers into the array which are wrapped around - // (mod array.length) upon increase. However, there is an ambiguity when writeIx == readIx since this state - // will be reached when the buffer is completely empty as well as when the buffer is completely full. - // An easy fix is to add another field (like 'count') that serves as additional data point resolving the - // ambiguity. However, adding another field adds more overhead than required and is especially inconvenient - // when supporting multiple readers. - // Therefore the approach we take here does not add another field, rather we don't wrap around the pointers - // at all but simply rebase them from time to time when we have to loop through the cursors anyway. + /* + * two counters counting the number of elements ever written and read; wrap-around is + * handled by always looking at differences or masked values + */ private[this] var writeIx = 0 private[this] var readIx = 0 // the "oldest" of all read cursor indices, i.e. the one that is most behind @@ -61,14 +57,6 @@ private[akka] class ResizableMultiReaderRingBuffer[T](initialSize: Int, // const */ def maxAvailable: Int = (1 << maxSizeBit) - size - /** - * Applies availability bounds to the given element number. - * Equivalent to `min(maxAvailable, max(immediatelyAvailable, elements))`. - */ - // FIXME this is nonsense (always returns maxAvailable) - def potentiallyAvailable(elements: Int): Int = - math.min(maxAvailable, math.max(immediatelyAvailable, elements)) - /** * Returns the number of elements that the buffer currently contains for the given cursor. */ @@ -86,7 +74,7 @@ private[akka] class ResizableMultiReaderRingBuffer[T](initialSize: Int, // const def write(value: T): Boolean = if (size < array.length) { // if we have space left we can simply write and be done array(writeIx & mask) = value - writeIx = writeIx + 1 + writeIx += 1 true } else if (lenBit < maxSizeBit) { // if we are full but can grow we do so // the growing logic is quite simple: we assemble all current buffer entries in the new array @@ -117,45 +105,31 @@ private[akka] class ResizableMultiReaderRingBuffer[T](initialSize: Int, // const */ def read(cursor: Cursor): T = { val c = cursor.cursor - if (c < writeIx) { + if (c - writeIx < 0) { cursor.cursor += 1 - if (c == readIx) updateReadIxAndPotentiallyRebaseCursors() - array(c & mask).asInstanceOf[T] + val ret = array(c & mask).asInstanceOf[T] + if (c == readIx) updateReadIx() + ret } else throw NothingToReadException } def onCursorRemoved(cursor: Cursor): Unit = if (cursor.cursor == readIx) // if this cursor is the last one it must be at readIx - updateReadIxAndPotentiallyRebaseCursors() + updateReadIx() - private def updateReadIxAndPotentiallyRebaseCursors(): Unit = { - val threshold = rebaseThreshold - if (readIx > threshold) { - @tailrec def rebaseCursorsAndReturnMin(remaining: List[Cursor], result: Int): Int = - remaining match { - case head :: tail ⇒ - head.cursor -= threshold - rebaseCursorsAndReturnMin(tail, math.min(head.cursor, result)) - case _ ⇒ result - } - writeIx -= threshold - readIx = rebaseCursorsAndReturnMin(cursors.cursors, writeIx) - } else { - @tailrec def minCursor(remaining: List[Cursor], result: Int): Int = - remaining match { - case head :: tail ⇒ minCursor(tail, math.min(head.cursor, result)) - case _ ⇒ result - } - readIx = minCursor(cursors.cursors, writeIx) - // TODO: is not nulling-out the now unused buffer cells acceptable here? + private def updateReadIx(): Unit = { + @tailrec def minCursor(remaining: List[Cursor], result: Int): Int = + remaining match { + case head :: tail ⇒ minCursor(tail, math.min(head.cursor - writeIx, result)) + case _ ⇒ result + } + val newReadIx = writeIx + minCursor(cursors.cursors, 0) + while (readIx != newReadIx) { + array(readIx & mask) = null + readIx += 1 } } - // from time to time we need to rebase all pointers and cursors so they don't overflow, - // rebasing is performed when `readIx` is greater than this threshold which *must* be a multiple of array.length! - // default: the largest safe threshold which is the largest multiple of array.length that is < Int.MaxValue/2 - protected def rebaseThreshold: Int = (Int.MaxValue / 2) & ~mask - protected def underlyingArray: Array[Any] = array override def toString: String = diff --git a/akka-stream/src/main/scala/akka/stream/impl/StrictProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/StrictProducer.scala index e18dae49cb..dbcc5818d7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StrictProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StrictProducer.scala @@ -104,7 +104,7 @@ private[akka] abstract class AbstractStrictProducer[T]( }) } - protected def shutdown(): Unit = cancelUpstream() + protected def shutdown(completed: Boolean): Unit = cancelUpstream() protected def cancelUpstream(): Unit = pending = 0 // outside Publisher interface, can potentially called from another thread, diff --git a/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala b/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala new file mode 100644 index 0000000000..000bfb511c --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala @@ -0,0 +1,32 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import org.scalatest.testng.TestNGSuiteLike +import org.reactivestreams.spi.Publisher +import org.reactivestreams.tck.PublisherVerification +import akka.stream.testkit.TestProducer +import akka.stream.impl.ActorBasedProcessorGenerator +import org.reactivestreams.api.Producer + +class ActorProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike { + import system.dispatcher + + private val factory = ProcessorGenerator(GeneratorSettings()) + + private def createProducer(elements: Int): Producer[Int] = { + val iter = Iterator from 1000 + val iter2 = if (elements > 0) iter take elements else iter + Stream(factory, () ⇒ if (iter2.hasNext) iter2.next() else throw Stream.Stop).toProducer(factory) + } + + def createPublisher(elements: Int): Publisher[Int] = createProducer(elements).getPublisher + + override def createCompletedStatePublisher(): Publisher[Int] = { + val pub = createProducer(1) + Stream(pub).consume(factory) + Thread.sleep(100) + pub.getPublisher + } +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/StreamSpec.scala b/akka-stream/src/test/scala/akka/stream/StreamSpec.scala index daa9fc4ce3..39c1a1ae80 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/StreamSpec.scala @@ -8,9 +8,10 @@ import akka.stream.testkit.{ ChainSetup, StreamTestKit } import akka.testkit._ import org.reactivestreams.api.Producer import org.scalatest.FreeSpecLike +import com.typesafe.config.ConfigFactory @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class StreamSpec extends AkkaSpec { +class StreamSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { import system.dispatcher @@ -24,6 +25,7 @@ class StreamSpec extends AkkaSpec { val identity2: Stream[Any] ⇒ Stream[Any] = in ⇒ identity(in) "A Stream" must { + for ((name, op) ← List("identity" -> identity, "identity2" -> identity2); n ← List(1, 2, 4)) { s"requests initial elements from upstream ($name, $n)" in { new ChainSetup(op, genSettings.copy(initialInputBufferSize = n)) { @@ -48,9 +50,6 @@ class StreamSpec extends AkkaSpec { upstreamSubscription.sendNext("d") downstream.expectNext("b") downstream.expectNext("c") - upstream.expectRequestMore(upstreamSubscription, 1) - upstream.expectRequestMore(upstreamSubscription, 1) - upstream.expectRequestMore(upstreamSubscription, 1) } } @@ -214,11 +213,13 @@ class StreamSpec extends AkkaSpec { upstreamSubscription.sendNext("firstElement") downstream.expectNext("firstElement") - downstream2Subscription.requestMore(1) - downstream2.expectNext("firstElement") upstream.expectRequestMore(upstreamSubscription, 1) upstreamSubscription.sendNext("element2") + downstream.expectNoMsg(1.second) + downstream2Subscription.requestMore(1) + downstream2.expectNext("firstElement") + downstream.expectNext("element2") downstream2Subscription.requestMore(1) diff --git a/akka-stream/src/test/scala/akka/stream/StreamTransformRecoverSpec.scala b/akka-stream/src/test/scala/akka/stream/StreamTransformRecoverSpec.scala index 7fc5470993..e79136ebdf 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamTransformRecoverSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/StreamTransformRecoverSpec.scala @@ -73,6 +73,7 @@ class StreamTransformRecoverSpec extends AkkaSpec { consumer.expectNoMsg(200.millis) subscription.requestMore(1) consumer.expectNext(6) + subscription.requestMore(1) consumer.expectComplete() } diff --git a/akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala index 4bc391f04b..340a4aed18 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala @@ -8,9 +8,10 @@ import akka.stream.testkit.StreamTestKit import akka.testkit.AkkaSpec import akka.stream.impl.IteratorProducer import akka.testkit.EventFilter +import com.typesafe.config.ConfigFactory @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class StreamTransformSpec extends AkkaSpec { +class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { import system.dispatcher @@ -71,6 +72,7 @@ class StreamTransformSpec extends AkkaSpec { consumer.expectNoMsg(200.millis) subscription.requestMore(1) consumer.expectNext(6) + subscription.requestMore(1) consumer.expectComplete() } diff --git a/akka-stream/src/test/scala/akka/stream/WithActorSystem.scala b/akka-stream/src/test/scala/akka/stream/WithActorSystem.scala index 008f207b7b..5350a1e97d 100644 --- a/akka-stream/src/test/scala/akka/stream/WithActorSystem.scala +++ b/akka-stream/src/test/scala/akka/stream/WithActorSystem.scala @@ -8,7 +8,7 @@ import org.testng.annotations.AfterClass import akka.testkit.AkkaSpec trait WithActorSystem { - val system: ActorSystem = ActorSystem(getClass.getSimpleName, AkkaSpec.testConf) + implicit val system: ActorSystem = ActorSystem(getClass.getSimpleName, AkkaSpec.testConf) @AfterClass def shutdownActorSystem(): Unit = system.shutdown() diff --git a/akka-stream/src/test/scala/akka/stream/impl/ResizableMultiReaderRingBufferSpec.scala b/akka-stream/src/test/scala/akka/stream/impl/ResizableMultiReaderRingBufferSpec.scala index a05caa01d3..dd0f91ec26 100644 --- a/akka-stream/src/test/scala/akka/stream/impl/ResizableMultiReaderRingBufferSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/impl/ResizableMultiReaderRingBufferSpec.scala @@ -33,12 +33,12 @@ class ResizableMultiReaderRingBufferSpec extends WordSpec with ShouldMatchers { read(1) shouldEqual 2 inspect shouldEqual "1 2 3 0 (size=3, writeIx=3, readIx=0, cursors=3)" read(2) shouldEqual 1 - inspect shouldEqual "1 2 3 0 (size=2, writeIx=3, readIx=1, cursors=3)" + inspect shouldEqual "0 2 3 0 (size=2, writeIx=3, readIx=1, cursors=3)" read(1) shouldEqual 3 read(1) shouldEqual null read(2) shouldEqual 2 read(2) shouldEqual 3 - inspect shouldEqual "1 2 3 0 (size=0, writeIx=3, readIx=3, cursors=3)" + inspect shouldEqual "0 0 0 0 (size=0, writeIx=3, readIx=3, cursors=3)" } "fail writes if there is no more space" in new Test(iSize = 4, mSize = 4, cursorCount = 2) { @@ -65,11 +65,11 @@ class ResizableMultiReaderRingBufferSpec extends WordSpec with ShouldMatchers { read(1) shouldEqual 5 read(1) shouldEqual 6 read(1) shouldEqual null - inspect shouldEqual "5 6 3 4 (size=0, writeIx=2, readIx=2, cursors=2)" + inspect shouldEqual "0 0 0 0 (size=0, writeIx=6, readIx=6, cursors=2)" write(7) shouldEqual true write(8) shouldEqual true write(9) shouldEqual true - inspect shouldEqual "9 6 7 8 (size=3, writeIx=5, readIx=2, cursors=2)" + inspect shouldEqual "9 0 7 8 (size=3, writeIx=9, readIx=6, cursors=2)" read(0) shouldEqual 7 read(0) shouldEqual 8 read(0) shouldEqual 9 @@ -78,7 +78,7 @@ class ResizableMultiReaderRingBufferSpec extends WordSpec with ShouldMatchers { read(1) shouldEqual 8 read(1) shouldEqual 9 read(1) shouldEqual null - inspect shouldEqual "9 6 7 8 (size=0, writeIx=5, readIx=5, cursors=2)" + inspect shouldEqual "0 0 0 0 (size=0, writeIx=9, readIx=9, cursors=2)" } "automatically grow if possible" in new Test(iSize = 2, mSize = 8, cursorCount = 2) { @@ -96,7 +96,7 @@ class ResizableMultiReaderRingBufferSpec extends WordSpec with ShouldMatchers { read(1) shouldEqual 1 read(1) shouldEqual 2 write(5) shouldEqual true - inspect shouldEqual "5 2 3 4 (size=3, writeIx=5, readIx=2, cursors=2)" + inspect shouldEqual "5 0 3 4 (size=3, writeIx=5, readIx=2, cursors=2)" write(6) shouldEqual true inspect shouldEqual "5 6 3 4 (size=4, writeIx=6, readIx=2, cursors=2)" write(7) shouldEqual true @@ -112,7 +112,7 @@ class ResizableMultiReaderRingBufferSpec extends WordSpec with ShouldMatchers { read(1) shouldEqual 6 read(1) shouldEqual 7 read(1) shouldEqual null - inspect shouldEqual "3 4 5 6 7 0 0 0 (size=0, writeIx=5, readIx=5, cursors=2)" + inspect shouldEqual "0 0 0 0 0 0 0 0 (size=0, writeIx=5, readIx=5, cursors=2)" } "pass the stress test" in { @@ -189,7 +189,6 @@ class ResizableMultiReaderRingBufferSpec extends WordSpec with ShouldMatchers { class Test(iSize: Int, mSize: Int, cursorCount: Int) extends TestBuffer(iSize, mSize, new SimpleCursors(cursorCount)) { def read(cursorIx: Int): Integer = try read(cursors.cursors(cursorIx)) catch { case NothingToReadException ⇒ null } - override def rebaseThreshold: Int = underlyingArray.length // use a low threshold in order to test the rebasing logic } class SimpleCursors(cursorCount: Int) extends Cursors { diff --git a/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala b/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala index e8540c35a1..18f1b0fd6e 100644 --- a/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala +++ b/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala @@ -86,8 +86,7 @@ trait ScriptedTest extends ShouldMatchers { var _debugLog = Vector.empty[String] var currentScript = script - var remainingDemand = script.expectedOutputs.size + ThreadLocalRandom.current().nextInt(maximumOverrun) - remainingDemand = Math.max(1, remainingDemand) + var remainingDemand = script.expectedOutputs.size + ThreadLocalRandom.current().nextInt(1, maximumOverrun) debugLog(s"starting with remainingDemand=$remainingDemand") var pendingRequests = 0 var outstandingDemand = 0