From 659eff725afedd015ec401ba4c2ae9b1fbfba8ad Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Sun, 30 Mar 2014 21:49:11 +0200 Subject: [PATCH] !str add ActorProducer and fix FanOutBox MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The ActorProducer is an actor-based Publisher which runs a thunk of code until Stop is thrown. This means that completion is only signaled if demand is present for one more element, which makes sense and it legal according to the wording of the spec. The TCK was too strict in this regard and has been relaxed. Things would have “worked” without the relaxation if I had not also fixed the output buffer management. SubscriptionManagement in collaboration with the ResizableMultiReaderRingBuffer previously generated demand on its own, acting like a true buffer. This is undesired since we want to auto-tune the input buffers, which would get a lot more complicated and instable with autonomously buffering output stages in the mix. Removing this extra-buffering uncovered several places in the test suite which implicitly relied on this, which were fixed as well. --- .../src/main/scala/akka/stream/Stream.scala | 9 + .../akka/stream/impl/AbstractProducer.scala | 178 ++++++++---------- .../akka/stream/impl/ActorProcessor.scala | 26 ++- .../akka/stream/impl/ActorProducer.scala | 99 +++++++++- .../scala/akka/stream/impl/Messages.scala | 4 +- .../akka/stream/impl/ProcessorGenerator.scala | 2 + .../impl/ResizableMultiReaderRingBuffer.scala | 66 ++----- .../akka/stream/impl/StrictProducer.scala | 2 +- .../scala/akka/stream/ActorProducerTest.scala | 32 ++++ .../test/scala/akka/stream/StreamSpec.scala | 13 +- .../stream/StreamTransformRecoverSpec.scala | 1 + .../akka/stream/StreamTransformSpec.scala | 4 +- .../scala/akka/stream/WithActorSystem.scala | 2 +- .../ResizableMultiReaderRingBufferSpec.scala | 15 +- .../akka/stream/testkit/ScriptedTest.scala | 3 +- 15 files changed, 273 insertions(+), 183 deletions(-) create mode 100644 akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala diff --git a/akka-stream/src/main/scala/akka/stream/Stream.scala b/akka-stream/src/main/scala/akka/stream/Stream.scala index 85275a6326..c33c1bce52 100644 --- a/akka-stream/src/main/scala/akka/stream/Stream.scala +++ b/akka-stream/src/main/scala/akka/stream/Stream.scala @@ -12,11 +12,16 @@ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.util.Try import scala.concurrent.duration._ +import scala.util.control.NoStackTrace object Stream { def apply[T](producer: Producer[T]): Stream[T] = StreamImpl(producer, Nil) def apply[T](iterator: Iterator[T])(implicit ec: ExecutionContext): Stream[T] = StreamImpl(new IteratorProducer(iterator), Nil) def apply[T](seq: immutable.Seq[T]) = ??? + + def apply[T](gen: ProcessorGenerator, f: () ⇒ T): Stream[T] = apply(gen.produce(f)) + + object Stop extends RuntimeException with NoStackTrace } trait Stream[T] { @@ -57,6 +62,10 @@ trait ProcessorGenerator { * INTERNAL API */ private[akka] def consume[I](producer: Producer[I], ops: List[Ast.AstNode]): Unit + /** + * INTERNAL API + */ + private[akka] def produce[T](f: () ⇒ T): Producer[T] } // FIXME default values? Should we have an extension that reads from config? diff --git a/akka-stream/src/main/scala/akka/stream/impl/AbstractProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/AbstractProducer.scala index 270e61ddff..0191f9d284 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/AbstractProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/AbstractProducer.scala @@ -36,14 +36,12 @@ private[akka] object SubscriberManagement { /** * INTERNAL API */ -private[akka] trait SubscriptionWithCursor extends spi.Subscription with ResizableMultiReaderRingBuffer.Cursor { - def subscriber[T]: spi.Subscriber[T] +private[akka] trait SubscriptionWithCursor[T] extends spi.Subscription with ResizableMultiReaderRingBuffer.Cursor { + def subscriber: spi.Subscriber[T] def isActive: Boolean = cursor != Int.MinValue def deactivate(): Unit = cursor = Int.MinValue - def dispatch[T](element: T): Unit = subscriber.onNext(element) - - override def toString: String = "Subscription" + System.identityHashCode(this) // helpful for testing + def dispatch(element: T): Unit = subscriber.onNext(element) /////////////// internal interface, no unsynced access from subscriber's thread ////////////// @@ -56,24 +54,38 @@ private[akka] trait SubscriptionWithCursor extends spi.Subscription with Resizab */ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuffer.Cursors { import SubscriberManagement._ - type S <: SubscriptionWithCursor + type S <: SubscriptionWithCursor[T] type Subscriptions = List[S] def initialBufferSize: Int def maxBufferSize: Int - // we keep an element (ring) buffer which serves two purposes: - // 1. Pre-fetch elements from our own upstream before they are requested by our downstream. - // 2. Allow for limited request rate differences between several downstream subscribers. - // - // The buffering logic is as follows: - // 1. We always request as many elements from our upstream as are still free in our current buffer instance. - // If a subscriber requests more than this number (either once or as a result of several `requestMore` calls) - // we resize the buffer if possible. - // 2. If two subscribers drift apart in their request rates we use the buffer for keeping the elements that the - // slow subscriber is behind with, thereby resizing the buffer if necessary and still allowed. + /** + * called when we are ready to consume more elements from our upstream + * MUST NOT call pushToDownstream + */ + protected def requestFromUpstream(elements: Int): Unit + + /** + * called before `shutdown()` if the stream is *not* being regularly completed + * but shut-down due to the last subscriber having cancelled its subscription + */ + protected def cancelUpstream(): Unit + + /** + * called when the spi.Publisher/Processor is ready to be shut down + */ + protected def shutdown(completed: Boolean): Unit + + /** + * Use to register a subscriber + */ + protected def createSubscription(subscriber: spi.Subscriber[T]): S + private[this] val buffer = new ResizableMultiReaderRingBuffer[T](initialBufferSize, maxBufferSize, this) + protected def bufferDebug: String = buffer.toString + // optimize for small numbers of subscribers by keeping subscribers in a plain list private[this] var subscriptions: Subscriptions = Nil @@ -83,25 +95,11 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff // if non-null, holds the end-of-stream state private[this] var endOfStream: EndOfStream = NotReached - // called when we are ready to consume more elements from our upstream - // the implementation of this method is allowed to synchronously call `pushToDownstream` - // if (part of) the requested elements are already available - protected def requestFromUpstream(elements: Int): Unit - - // called before `shutdown()` if the stream is *not* being regularly completed - // but shut-down due to the last subscriber having cancelled its subscription - protected def cancelUpstream(): Unit - - // called when the spi.Publisher/Processor is ready to be shut down. - protected def shutdown(): Unit - - // Use to register a subscriber - protected def createSubscription(subscriber: spi.Subscriber[T]): S - def cursors = subscriptions - // called from `Subscription::requestMore`, i.e. from another thread, - // override to add synchronization with itself, `subscribe` and `unregisterSubscription` + /** + * more demand was signaled from a given subscriber + */ protected def moreRequested(subscription: S, elements: Int): Unit = if (subscription.isActive) { @@ -110,31 +108,22 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff if (requested == 0) { // if we are at end-of-stream and have nothing more to read we complete now rather than after the next `requestMore` if ((eos ne NotReached) && buffer.count(subscription) == 0) Long.MinValue else 0 - } else { - val x = - try { - subscription.dispatch(buffer.read(subscription)) // as long as we can produce elements from our buffer we do so synchronously - -1 // we can't directly tailrec from here since we are in a try/catch, so signal with special value - } catch { - case NothingToReadException ⇒ if (eos ne NotReached) Long.MinValue else requested // terminate or request from upstream - } - if (x == -1) dispatchFromBufferAndReturnRemainingRequested(requested - 1, eos) else x - } + } else if (buffer.count(subscription) > 0) { + subscription.dispatch(buffer.read(subscription)) + dispatchFromBufferAndReturnRemainingRequested(requested - 1, eos) + } else if (eos ne NotReached) Long.MinValue + else requested endOfStream match { case eos @ (NotReached | Completed) ⇒ val demand = subscription.requested + elements - assert(demand >= 0) dispatchFromBufferAndReturnRemainingRequested(demand, eos) match { case Long.MinValue ⇒ eos(subscription.subscriber) unregisterSubscriptionInternal(subscription) - case 0 ⇒ - subscription.requested = 0 + case x ⇒ + subscription.requested = x requestFromUpstreamIfRequired() - case requested ⇒ - subscription.requested = requested - requestFromUpstreamIfPossible(requested) } case ErrorCompleted(_) ⇒ // ignore, the spi.Subscriber might not have seen our error event yet } @@ -146,81 +135,73 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff case head :: tail ⇒ maxRequested(tail, math.max(head.requested, result)) case _ ⇒ result } - if (pendingFromUpstream == 0) - requestFromUpstreamIfPossible(maxRequested(subscriptions)) - } - - private[this] final def requestFromUpstreamIfPossible(elements: Long): Unit = { - val toBeRequested = buffer.potentiallyAvailable(math.min(elements, Int.MaxValue).toInt) // Cap at Int.MaxValue - if (toBeRequested > 0) { - pendingFromUpstream += toBeRequested - requestFromUpstream(toBeRequested) + val desired = Math.min(Int.MaxValue, Math.min(maxRequested(subscriptions), buffer.maxAvailable) - pendingFromUpstream).toInt + if (desired > 0) { + pendingFromUpstream += desired + requestFromUpstream(desired) } } - // this method must be called by the implementing class whenever a new value is available to be pushed downstream + /** + * this method must be called by the implementing class whenever a new value is available to be pushed downstream + */ protected def pushToDownstream(value: T): Unit = { - @tailrec def dispatchAndReturnMaxRequested(remaining: Subscriptions, result: Long = 0): Long = + @tailrec def dispatch(remaining: Subscriptions, sent: Boolean = false): Boolean = remaining match { case head :: tail ⇒ - var requested = head.requested - if (requested > 0) - try { - val element = buffer.read(head) - head.dispatch(element) - requested -= 1 - head.requested = requested - } catch { - case NothingToReadException ⇒ throw new IllegalStateException("Output buffer read failure") // we just wrote a value, why can't we read it? - } - dispatchAndReturnMaxRequested(tail, math.max(result, requested)) - case _ ⇒ result + if (head.requested > 0) { + val element = buffer.read(head) + head.dispatch(element) + head.requested -= 1 + dispatch(tail, true) + } else dispatch(tail, sent) + case _ ⇒ sent } endOfStream match { case NotReached ⇒ pendingFromUpstream -= 1 if (!buffer.write(value)) throw new IllegalStateException("Output buffer overflow") - val maxRequested = dispatchAndReturnMaxRequested(subscriptions) - if (pendingFromUpstream == 0) requestFromUpstreamIfPossible(maxRequested) - case ShutDown ⇒ // don't throw if we have transitioned into shutdown in the mean-time, since there is an expected - case _ ⇒ // race condition between `cancelUpstream` and `pushToDownstream` + if (dispatch(subscriptions)) requestFromUpstreamIfRequired() + case _ ⇒ throw new IllegalStateException("pushToDownStream(...) after completeDownstream() or abortDownstream(...)") } } - // this method must be called by the implementing class whenever - // it has been determined that no more elements will be produced + /** + * this method must be called by the implementing class whenever + * it has been determined that no more elements will be produced + */ protected def completeDownstream(): Unit = { if (endOfStream eq NotReached) { - // we complete all subscriptions that have no more buffered elements - // is non-tail recursion acceptable here? (it's the fastest impl but might stack overflow for large numbers of subscribers) @tailrec def completeDoneSubscriptions(remaining: Subscriptions, result: Subscriptions = Nil): Subscriptions = - remaining match { case head :: tail ⇒ - val newResult = - if (buffer.count(head) == 0) { - head.deactivate() - Completed(head.subscriber) - result - } else head :: result - completeDoneSubscriptions(tail, newResult) + if (buffer.count(head) == 0) { + head.deactivate() + Completed(head.subscriber) + completeDoneSubscriptions(tail, result) + } else completeDoneSubscriptions(tail, head :: result) case _ ⇒ result } endOfStream = Completed subscriptions = completeDoneSubscriptions(subscriptions) - if (subscriptions.isEmpty) shutdown() + if (subscriptions.isEmpty) shutdown(completed = true) } // else ignore, we need to be idempotent } - // this method must be called by the implementing class to push an error downstream + /** + * this method must be called by the implementing class to push an error downstream + */ protected def abortDownstream(cause: Throwable): Unit = { endOfStream = ErrorCompleted(cause) subscriptions.foreach(s ⇒ endOfStream(s.subscriber)) subscriptions = Nil } + /** + * Register a new subscriber. + */ protected def registerSubscriber(subscriber: spi.Subscriber[T]): Unit = endOfStream match { case NotReached if subscriptions.exists(_.subscriber eq subscriber) ⇒ subscriber.onError(new IllegalStateException(s"Cannot subscribe $subscriber twice")) @@ -238,17 +219,18 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff eos(subscriber) } - // called from `Subscription::cancel`, i.e. from another thread, - // override to add synchronization with itself, `subscribe` and `moreRequested` + /** + * called from `Subscription::cancel`, i.e. from another thread, + * override to add synchronization with itself, `subscribe` and `moreRequested` + */ protected def unregisterSubscription(subscription: S): Unit = unregisterSubscriptionInternal(subscription) // must be idempotent private def unregisterSubscriptionInternal(subscription: S): Unit = { - // is non-tail recursion acceptable here? (it's the fastest impl but might stack overflow for large numbers of subscribers) - def removeFrom(remaining: Subscriptions): Subscriptions = + @tailrec def removeFrom(remaining: Subscriptions, result: Subscriptions = Nil): Subscriptions = remaining match { - case head :: tail ⇒ if (head eq subscription) tail else head :: removeFrom(tail) + case head :: tail ⇒ if (head eq subscription) tail reverse_::: result else removeFrom(tail, head :: result) case _ ⇒ throw new IllegalStateException("Subscription to unregister not found") } if (subscription.isActive) { @@ -260,7 +242,7 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff endOfStream = ShutDown cancelUpstream() } - shutdown() + shutdown(completed = false) } else requestFromUpstreamIfRequired() // we might have removed a "blocking" subscriber and can continue now } // else ignore, we need to be idempotent } @@ -288,9 +270,7 @@ private[akka] abstract class AbstractProducer[T](val initialBufferSize: Int, val override def createSubscription(subscriber: spi.Subscriber[T]): S = new Subscription(subscriber) - protected class Subscription(val _subscriber: spi.Subscriber[T]) extends SubscriptionWithCursor { - def subscriber[B]: spi.Subscriber[B] = _subscriber.asInstanceOf[spi.Subscriber[B]] - + protected class Subscription(val subscriber: spi.Subscriber[T]) extends SubscriptionWithCursor[T] { override def requestMore(elements: Int): Unit = if (elements <= 0) throw new IllegalArgumentException("Argument must be > 0") else moreRequested(this, elements) // needs to be able to ignore calls after termination / cancellation diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index f3d29a7e18..1a3c605f36 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -11,6 +11,7 @@ import org.reactivestreams.api.Processor import org.reactivestreams.spi.{ Subscriber, Subscription } import akka.actor.{ Actor, ActorLogging, ActorRef, Props } import akka.stream.GeneratorSettings +import akka.event.LoggingReceive /** * INTERNAL API @@ -30,7 +31,7 @@ class ActorProcessor[I, O]( final val impl: ActorRef) extends Processor[I, O] wi */ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) extends Actor with SubscriberManagement[Any] with ActorLogging { import ActorProcessor._ - type S = ActorSubscription + type S = ActorSubscription[Any] override def maxBufferSize: Int = settings.maxFanOutBufferSize override def initialBufferSize: Int = settings.initialFanOutBufferSize @@ -51,7 +52,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) } def waitingForUpstream: Receive = downstreamManagement orElse { - case OnComplete ⇒ shutdown() // There is nothing to flush here + case OnComplete ⇒ shutdown(completed = true) // There is nothing to flush here case OnSubscribe(subscription) ⇒ assert(subscription != null) upstream = subscription @@ -68,10 +69,10 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) case SubscribePending ⇒ subscribePending() case RequestMore(subscription, elements) ⇒ - moreRequested(subscription, elements) + moreRequested(subscription.asInstanceOf[S], elements) pump() case Cancel(subscription) ⇒ - unregisterSubscription(subscription) + unregisterSubscription(subscription.asInstanceOf[S]) pump() } @@ -80,7 +81,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) ////////////////////// Active state ////////////////////// - def running: Receive = downstreamManagement orElse { + def running: Receive = LoggingReceive(downstreamManagement orElse { case OnNext(element) ⇒ enqueueInputElement(element) pump() @@ -88,7 +89,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) flushAndComplete() pump() case OnError(cause) ⇒ failureReceived(cause) - } + }) // Called by SubscriberManagement when all subscribers are gone. // The method shutdown() is called automatically by SubscriberManagement after it called this method. @@ -96,6 +97,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) // Called by SubscriberManagement whenever the output buffer is ready to accept additional elements override protected def requestFromUpstream(elements: Int): Unit = { + log.debug(s"received downstream demand from buffer: $elements") downstreamBufferSpace += elements } @@ -176,13 +178,17 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) // Generate upstream requestMore for every Nth consumed input element protected def pump(): Unit = { try while (transferState.isExecutable) { + log.debug(s"iterating the pump with state $transferState and buffer $bufferDebug") transferState = transfer(transferState) } catch { case NonFatal(e) ⇒ fail(e) } + log.debug(s"finished iterating the pump with state $transferState and buffer $bufferDebug") + if (transferState.isCompleted) { if (!isShuttingDown) { + log.debug("shutting down the pump") if (!upstreamCompleted) upstream.cancel() - Arrays.fill(inputBuffer, nextInputElementCursor, nextInputElementCursor + inputBufferElements, null) + Arrays.fill(inputBuffer, 0, inputBuffer.length, null) inputBufferElements = 0 context.become(flushing) isShuttingDown = true @@ -212,16 +218,18 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) ////////////////////// Shutdown and cleanup (graceful and abort) ////////////////////// var isShuttingDown = false + var completed = false // Called by SubscriberManagement to signal that output buffer finished (flushed or aborted) - override def shutdown(): Unit = { + override def shutdown(completed: Boolean): Unit = { isShuttingDown = true + this.completed = completed context.stop(self) } override def postStop(): Unit = { if (exposedPublisher ne null) - exposedPublisher.shutdown() + exposedPublisher.shutdown(completed) // Non-gracefully stopped, do our best here if (!isShuttingDown) abortDownstream(new IllegalStateException("Processor actor terminated abruptly")) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala index 021a91b6da..bf15a2bf21 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala @@ -4,13 +4,18 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicReference - import scala.annotation.tailrec - import org.reactivestreams.api.{ Consumer, Producer } import org.reactivestreams.spi.{ Publisher, Subscriber } - import akka.actor.ActorRef +import akka.stream.GeneratorSettings +import akka.actor.ActorLogging +import akka.actor.Actor +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal +import akka.actor.Props +import scala.util.control.NoStackTrace +import akka.stream.Stream trait ActorProducerLike[T] extends Producer[T] { def impl: ActorRef @@ -27,6 +32,11 @@ trait ActorProducerLike[T] extends Producer[T] { class ActorProducer[T]( final val impl: ActorRef) extends ActorProducerLike[T] +object ActorProducer { + def props[T](settings: GeneratorSettings, f: () ⇒ T): Props = + Props(new ActorProducerImpl(f, settings)) +} + final class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { // The subscriber of an subscription attempt is first placed in this list of pending subscribers. @@ -55,20 +65,93 @@ final class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { def takePendingSubscribers(): List[Subscriber[T]] = pendingSubscribers.getAndSet(Nil) - def shutdown(): Unit = + def shutdown(completed: Boolean): Unit = { + this.completed = completed pendingSubscribers.getAndSet(null) foreach reportShutdownError + } + + @volatile private var completed: Boolean = false private def reportShutdownError(subscriber: Subscriber[T]): Unit = - subscriber.onError(new IllegalStateException("Cannot subscribe to shut-down spi.Publisher")) + if (completed) subscriber.onComplete() + else subscriber.onError(new IllegalStateException("Cannot subscribe to shut-down spi.Publisher")) } -class ActorSubscription( final val impl: ActorRef, final val _subscriber: Subscriber[Any]) extends SubscriptionWithCursor { - override def subscriber[T]: Subscriber[T] = _subscriber.asInstanceOf[Subscriber[T]] +class ActorSubscription[T]( final val impl: ActorRef, final val subscriber: Subscriber[T]) extends SubscriptionWithCursor[T] { override def requestMore(elements: Int): Unit = if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0") else impl ! RequestMore(this, elements) override def cancel(): Unit = impl ! Cancel(this) - override def toString = "ActorSubscription" } +object ActorProducerImpl { + case object Generate +} + +class ActorProducerImpl[T](f: () ⇒ T, settings: GeneratorSettings) extends Actor with ActorLogging with SubscriberManagement[T] { + import Stream._ + import ActorProducerImpl._ + + type S = ActorSubscription[T] + var pub: ActorPublisher[T] = _ + var completed = false + + context.setReceiveTimeout(settings.downstreamSubscriptionTimeout) + + def receive = { + case ExposedPublisher(pub) ⇒ + this.pub = pub.asInstanceOf[ActorPublisher[T]] + context.become(waitingForSubscribers) + } + + def waitingForSubscribers: Receive = { + case SubscribePending ⇒ + pub.takePendingSubscribers() foreach registerSubscriber + context.setReceiveTimeout(Duration.Undefined) + context.become(active) + } + + def active: Receive = { + case SubscribePending ⇒ + pub.takePendingSubscribers() foreach registerSubscriber + case RequestMore(sub, elements) ⇒ + moreRequested(sub.asInstanceOf[S], elements) + generate() + case Cancel(sub) ⇒ + unregisterSubscription(sub.asInstanceOf[S]) + generate() + case Generate ⇒ + generate() + } + + override def postStop(): Unit = { + pub.shutdown(completed) + } + + private var demand = 0 + private def generate(): Unit = { + val continue = + try { + pushToDownstream(f()) + true + } catch { + case Stop ⇒ { completeDownstream(); completed = true; false } + case NonFatal(e) ⇒ { abortDownstream(e); false } + } + demand -= 1 + if (continue && demand > 0) self ! Generate + } + + override def initialBufferSize = settings.initialFanOutBufferSize + override def maxBufferSize = settings.maxFanOutBufferSize + + override def createSubscription(subscriber: Subscriber[T]): ActorSubscription[T] = + new ActorSubscription(self, subscriber) + + override def requestFromUpstream(elements: Int): Unit = demand += elements + + override def cancelUpstream(): Unit = context.stop(self) + override def shutdown(completed: Boolean): Unit = context.stop(self) + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Messages.scala b/akka-stream/src/main/scala/akka/stream/impl/Messages.scala index 284c50b785..ffe9600bc4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Messages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Messages.scala @@ -13,8 +13,8 @@ case class OnError(cause: Throwable) case object SubscribePending -case class RequestMore(subscription: ActorSubscription, demand: Int) -case class Cancel(subscriptions: ActorSubscription) +case class RequestMore(subscription: ActorSubscription[_], demand: Int) +case class Cancel(subscriptions: ActorSubscription[_]) case class ExposedPublisher(publisher: ActorPublisher[Any]) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ProcessorGenerator.scala b/akka-stream/src/main/scala/akka/stream/impl/ProcessorGenerator.scala index 71ba12c49b..1a5e59c6ea 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ProcessorGenerator.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ProcessorGenerator.scala @@ -62,6 +62,8 @@ private[akka] class ActorBasedProcessorGenerator(settings: GeneratorSettings, co producer.produceTo(consumer.asInstanceOf[Consumer[I]]) } + override def produce[T](f: () ⇒ T): Producer[T] = new ActorProducer(context.actorOf(ActorProducer.props(settings, f))) + def processorForNode(op: AstNode): Processor[Any, Any] = new ActorProcessor(context.actorOf(ActorProcessor.props(settings, op))) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ResizableMultiReaderRingBuffer.scala b/akka-stream/src/main/scala/akka/stream/impl/ResizableMultiReaderRingBuffer.scala index 2af9c3ae24..4f4c86c481 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ResizableMultiReaderRingBuffer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ResizableMultiReaderRingBuffer.scala @@ -24,14 +24,10 @@ private[akka] class ResizableMultiReaderRingBuffer[T](initialSize: Int, // const private[this] val maxSizeBit = Integer.numberOfTrailingZeros(maxSize) private[this] var array = new Array[Any](initialSize) - // Usual ring buffer implementations keep two pointers into the array which are wrapped around - // (mod array.length) upon increase. However, there is an ambiguity when writeIx == readIx since this state - // will be reached when the buffer is completely empty as well as when the buffer is completely full. - // An easy fix is to add another field (like 'count') that serves as additional data point resolving the - // ambiguity. However, adding another field adds more overhead than required and is especially inconvenient - // when supporting multiple readers. - // Therefore the approach we take here does not add another field, rather we don't wrap around the pointers - // at all but simply rebase them from time to time when we have to loop through the cursors anyway. + /* + * two counters counting the number of elements ever written and read; wrap-around is + * handled by always looking at differences or masked values + */ private[this] var writeIx = 0 private[this] var readIx = 0 // the "oldest" of all read cursor indices, i.e. the one that is most behind @@ -61,14 +57,6 @@ private[akka] class ResizableMultiReaderRingBuffer[T](initialSize: Int, // const */ def maxAvailable: Int = (1 << maxSizeBit) - size - /** - * Applies availability bounds to the given element number. - * Equivalent to `min(maxAvailable, max(immediatelyAvailable, elements))`. - */ - // FIXME this is nonsense (always returns maxAvailable) - def potentiallyAvailable(elements: Int): Int = - math.min(maxAvailable, math.max(immediatelyAvailable, elements)) - /** * Returns the number of elements that the buffer currently contains for the given cursor. */ @@ -86,7 +74,7 @@ private[akka] class ResizableMultiReaderRingBuffer[T](initialSize: Int, // const def write(value: T): Boolean = if (size < array.length) { // if we have space left we can simply write and be done array(writeIx & mask) = value - writeIx = writeIx + 1 + writeIx += 1 true } else if (lenBit < maxSizeBit) { // if we are full but can grow we do so // the growing logic is quite simple: we assemble all current buffer entries in the new array @@ -117,45 +105,31 @@ private[akka] class ResizableMultiReaderRingBuffer[T](initialSize: Int, // const */ def read(cursor: Cursor): T = { val c = cursor.cursor - if (c < writeIx) { + if (c - writeIx < 0) { cursor.cursor += 1 - if (c == readIx) updateReadIxAndPotentiallyRebaseCursors() - array(c & mask).asInstanceOf[T] + val ret = array(c & mask).asInstanceOf[T] + if (c == readIx) updateReadIx() + ret } else throw NothingToReadException } def onCursorRemoved(cursor: Cursor): Unit = if (cursor.cursor == readIx) // if this cursor is the last one it must be at readIx - updateReadIxAndPotentiallyRebaseCursors() + updateReadIx() - private def updateReadIxAndPotentiallyRebaseCursors(): Unit = { - val threshold = rebaseThreshold - if (readIx > threshold) { - @tailrec def rebaseCursorsAndReturnMin(remaining: List[Cursor], result: Int): Int = - remaining match { - case head :: tail ⇒ - head.cursor -= threshold - rebaseCursorsAndReturnMin(tail, math.min(head.cursor, result)) - case _ ⇒ result - } - writeIx -= threshold - readIx = rebaseCursorsAndReturnMin(cursors.cursors, writeIx) - } else { - @tailrec def minCursor(remaining: List[Cursor], result: Int): Int = - remaining match { - case head :: tail ⇒ minCursor(tail, math.min(head.cursor, result)) - case _ ⇒ result - } - readIx = minCursor(cursors.cursors, writeIx) - // TODO: is not nulling-out the now unused buffer cells acceptable here? + private def updateReadIx(): Unit = { + @tailrec def minCursor(remaining: List[Cursor], result: Int): Int = + remaining match { + case head :: tail ⇒ minCursor(tail, math.min(head.cursor - writeIx, result)) + case _ ⇒ result + } + val newReadIx = writeIx + minCursor(cursors.cursors, 0) + while (readIx != newReadIx) { + array(readIx & mask) = null + readIx += 1 } } - // from time to time we need to rebase all pointers and cursors so they don't overflow, - // rebasing is performed when `readIx` is greater than this threshold which *must* be a multiple of array.length! - // default: the largest safe threshold which is the largest multiple of array.length that is < Int.MaxValue/2 - protected def rebaseThreshold: Int = (Int.MaxValue / 2) & ~mask - protected def underlyingArray: Array[Any] = array override def toString: String = diff --git a/akka-stream/src/main/scala/akka/stream/impl/StrictProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/StrictProducer.scala index e18dae49cb..dbcc5818d7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StrictProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StrictProducer.scala @@ -104,7 +104,7 @@ private[akka] abstract class AbstractStrictProducer[T]( }) } - protected def shutdown(): Unit = cancelUpstream() + protected def shutdown(completed: Boolean): Unit = cancelUpstream() protected def cancelUpstream(): Unit = pending = 0 // outside Publisher interface, can potentially called from another thread, diff --git a/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala b/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala new file mode 100644 index 0000000000..000bfb511c --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala @@ -0,0 +1,32 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import org.scalatest.testng.TestNGSuiteLike +import org.reactivestreams.spi.Publisher +import org.reactivestreams.tck.PublisherVerification +import akka.stream.testkit.TestProducer +import akka.stream.impl.ActorBasedProcessorGenerator +import org.reactivestreams.api.Producer + +class ActorProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike { + import system.dispatcher + + private val factory = ProcessorGenerator(GeneratorSettings()) + + private def createProducer(elements: Int): Producer[Int] = { + val iter = Iterator from 1000 + val iter2 = if (elements > 0) iter take elements else iter + Stream(factory, () ⇒ if (iter2.hasNext) iter2.next() else throw Stream.Stop).toProducer(factory) + } + + def createPublisher(elements: Int): Publisher[Int] = createProducer(elements).getPublisher + + override def createCompletedStatePublisher(): Publisher[Int] = { + val pub = createProducer(1) + Stream(pub).consume(factory) + Thread.sleep(100) + pub.getPublisher + } +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/StreamSpec.scala b/akka-stream/src/test/scala/akka/stream/StreamSpec.scala index daa9fc4ce3..39c1a1ae80 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/StreamSpec.scala @@ -8,9 +8,10 @@ import akka.stream.testkit.{ ChainSetup, StreamTestKit } import akka.testkit._ import org.reactivestreams.api.Producer import org.scalatest.FreeSpecLike +import com.typesafe.config.ConfigFactory @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class StreamSpec extends AkkaSpec { +class StreamSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { import system.dispatcher @@ -24,6 +25,7 @@ class StreamSpec extends AkkaSpec { val identity2: Stream[Any] ⇒ Stream[Any] = in ⇒ identity(in) "A Stream" must { + for ((name, op) ← List("identity" -> identity, "identity2" -> identity2); n ← List(1, 2, 4)) { s"requests initial elements from upstream ($name, $n)" in { new ChainSetup(op, genSettings.copy(initialInputBufferSize = n)) { @@ -48,9 +50,6 @@ class StreamSpec extends AkkaSpec { upstreamSubscription.sendNext("d") downstream.expectNext("b") downstream.expectNext("c") - upstream.expectRequestMore(upstreamSubscription, 1) - upstream.expectRequestMore(upstreamSubscription, 1) - upstream.expectRequestMore(upstreamSubscription, 1) } } @@ -214,11 +213,13 @@ class StreamSpec extends AkkaSpec { upstreamSubscription.sendNext("firstElement") downstream.expectNext("firstElement") - downstream2Subscription.requestMore(1) - downstream2.expectNext("firstElement") upstream.expectRequestMore(upstreamSubscription, 1) upstreamSubscription.sendNext("element2") + downstream.expectNoMsg(1.second) + downstream2Subscription.requestMore(1) + downstream2.expectNext("firstElement") + downstream.expectNext("element2") downstream2Subscription.requestMore(1) diff --git a/akka-stream/src/test/scala/akka/stream/StreamTransformRecoverSpec.scala b/akka-stream/src/test/scala/akka/stream/StreamTransformRecoverSpec.scala index 7fc5470993..e79136ebdf 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamTransformRecoverSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/StreamTransformRecoverSpec.scala @@ -73,6 +73,7 @@ class StreamTransformRecoverSpec extends AkkaSpec { consumer.expectNoMsg(200.millis) subscription.requestMore(1) consumer.expectNext(6) + subscription.requestMore(1) consumer.expectComplete() } diff --git a/akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala index 4bc391f04b..340a4aed18 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala @@ -8,9 +8,10 @@ import akka.stream.testkit.StreamTestKit import akka.testkit.AkkaSpec import akka.stream.impl.IteratorProducer import akka.testkit.EventFilter +import com.typesafe.config.ConfigFactory @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class StreamTransformSpec extends AkkaSpec { +class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { import system.dispatcher @@ -71,6 +72,7 @@ class StreamTransformSpec extends AkkaSpec { consumer.expectNoMsg(200.millis) subscription.requestMore(1) consumer.expectNext(6) + subscription.requestMore(1) consumer.expectComplete() } diff --git a/akka-stream/src/test/scala/akka/stream/WithActorSystem.scala b/akka-stream/src/test/scala/akka/stream/WithActorSystem.scala index 008f207b7b..5350a1e97d 100644 --- a/akka-stream/src/test/scala/akka/stream/WithActorSystem.scala +++ b/akka-stream/src/test/scala/akka/stream/WithActorSystem.scala @@ -8,7 +8,7 @@ import org.testng.annotations.AfterClass import akka.testkit.AkkaSpec trait WithActorSystem { - val system: ActorSystem = ActorSystem(getClass.getSimpleName, AkkaSpec.testConf) + implicit val system: ActorSystem = ActorSystem(getClass.getSimpleName, AkkaSpec.testConf) @AfterClass def shutdownActorSystem(): Unit = system.shutdown() diff --git a/akka-stream/src/test/scala/akka/stream/impl/ResizableMultiReaderRingBufferSpec.scala b/akka-stream/src/test/scala/akka/stream/impl/ResizableMultiReaderRingBufferSpec.scala index a05caa01d3..dd0f91ec26 100644 --- a/akka-stream/src/test/scala/akka/stream/impl/ResizableMultiReaderRingBufferSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/impl/ResizableMultiReaderRingBufferSpec.scala @@ -33,12 +33,12 @@ class ResizableMultiReaderRingBufferSpec extends WordSpec with ShouldMatchers { read(1) shouldEqual 2 inspect shouldEqual "1 2 3 0 (size=3, writeIx=3, readIx=0, cursors=3)" read(2) shouldEqual 1 - inspect shouldEqual "1 2 3 0 (size=2, writeIx=3, readIx=1, cursors=3)" + inspect shouldEqual "0 2 3 0 (size=2, writeIx=3, readIx=1, cursors=3)" read(1) shouldEqual 3 read(1) shouldEqual null read(2) shouldEqual 2 read(2) shouldEqual 3 - inspect shouldEqual "1 2 3 0 (size=0, writeIx=3, readIx=3, cursors=3)" + inspect shouldEqual "0 0 0 0 (size=0, writeIx=3, readIx=3, cursors=3)" } "fail writes if there is no more space" in new Test(iSize = 4, mSize = 4, cursorCount = 2) { @@ -65,11 +65,11 @@ class ResizableMultiReaderRingBufferSpec extends WordSpec with ShouldMatchers { read(1) shouldEqual 5 read(1) shouldEqual 6 read(1) shouldEqual null - inspect shouldEqual "5 6 3 4 (size=0, writeIx=2, readIx=2, cursors=2)" + inspect shouldEqual "0 0 0 0 (size=0, writeIx=6, readIx=6, cursors=2)" write(7) shouldEqual true write(8) shouldEqual true write(9) shouldEqual true - inspect shouldEqual "9 6 7 8 (size=3, writeIx=5, readIx=2, cursors=2)" + inspect shouldEqual "9 0 7 8 (size=3, writeIx=9, readIx=6, cursors=2)" read(0) shouldEqual 7 read(0) shouldEqual 8 read(0) shouldEqual 9 @@ -78,7 +78,7 @@ class ResizableMultiReaderRingBufferSpec extends WordSpec with ShouldMatchers { read(1) shouldEqual 8 read(1) shouldEqual 9 read(1) shouldEqual null - inspect shouldEqual "9 6 7 8 (size=0, writeIx=5, readIx=5, cursors=2)" + inspect shouldEqual "0 0 0 0 (size=0, writeIx=9, readIx=9, cursors=2)" } "automatically grow if possible" in new Test(iSize = 2, mSize = 8, cursorCount = 2) { @@ -96,7 +96,7 @@ class ResizableMultiReaderRingBufferSpec extends WordSpec with ShouldMatchers { read(1) shouldEqual 1 read(1) shouldEqual 2 write(5) shouldEqual true - inspect shouldEqual "5 2 3 4 (size=3, writeIx=5, readIx=2, cursors=2)" + inspect shouldEqual "5 0 3 4 (size=3, writeIx=5, readIx=2, cursors=2)" write(6) shouldEqual true inspect shouldEqual "5 6 3 4 (size=4, writeIx=6, readIx=2, cursors=2)" write(7) shouldEqual true @@ -112,7 +112,7 @@ class ResizableMultiReaderRingBufferSpec extends WordSpec with ShouldMatchers { read(1) shouldEqual 6 read(1) shouldEqual 7 read(1) shouldEqual null - inspect shouldEqual "3 4 5 6 7 0 0 0 (size=0, writeIx=5, readIx=5, cursors=2)" + inspect shouldEqual "0 0 0 0 0 0 0 0 (size=0, writeIx=5, readIx=5, cursors=2)" } "pass the stress test" in { @@ -189,7 +189,6 @@ class ResizableMultiReaderRingBufferSpec extends WordSpec with ShouldMatchers { class Test(iSize: Int, mSize: Int, cursorCount: Int) extends TestBuffer(iSize, mSize, new SimpleCursors(cursorCount)) { def read(cursorIx: Int): Integer = try read(cursors.cursors(cursorIx)) catch { case NothingToReadException ⇒ null } - override def rebaseThreshold: Int = underlyingArray.length // use a low threshold in order to test the rebasing logic } class SimpleCursors(cursorCount: Int) extends Cursors { diff --git a/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala b/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala index e8540c35a1..18f1b0fd6e 100644 --- a/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala +++ b/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala @@ -86,8 +86,7 @@ trait ScriptedTest extends ShouldMatchers { var _debugLog = Vector.empty[String] var currentScript = script - var remainingDemand = script.expectedOutputs.size + ThreadLocalRandom.current().nextInt(maximumOverrun) - remainingDemand = Math.max(1, remainingDemand) + var remainingDemand = script.expectedOutputs.size + ThreadLocalRandom.current().nextInt(1, maximumOverrun) debugLog(s"starting with remainingDemand=$remainingDemand") var pendingRequests = 0 var outstandingDemand = 0