+str reintroduced the TCK, 0.4.0.M2-SNAPSHOT, passing all tests

Cheers from JavaZone!
This commit is contained in:
Konrad 'ktoso' Malawski 2014-08-21 16:07:09 +02:00
parent 597ad076e4
commit 54e55a659c
50 changed files with 713 additions and 305 deletions

View file

@ -18,7 +18,7 @@ object HttpClientProcessor {
def apply[T](requestSubscriber: Subscriber[(HttpRequest, T)], def apply[T](requestSubscriber: Subscriber[(HttpRequest, T)],
responsePublisher: Publisher[(HttpResponse, T)]): HttpClientProcessor[T] = responsePublisher: Publisher[(HttpResponse, T)]): HttpClientProcessor[T] =
new HttpClientProcessor[T] { new HttpClientProcessor[T] {
override def subscribe(s: Subscriber[(HttpResponse, T)]): Unit = responsePublisher.subscribe(s) override def subscribe(s: Subscriber[_ >: (HttpResponse, T)]): Unit = responsePublisher.subscribe(s)
override def onError(t: Throwable): Unit = requestSubscriber.onError(t) override def onError(t: Throwable): Unit = requestSubscriber.onError(t)
override def onSubscribe(s: Subscription): Unit = requestSubscriber.onSubscribe(s) override def onSubscribe(s: Subscription): Unit = requestSubscriber.onSubscribe(s)

View file

@ -122,7 +122,7 @@ private class PersistentPublisherImpl(persistenceId: String, publisherSettings:
} }
} }
override def requestFromUpstream(elements: Int): Unit = override def requestFromUpstream(elements: Long): Unit =
buffer ! Request(elements) buffer ! Request(elements)
override def initialBufferSize = override def initialBufferSize =
@ -131,7 +131,7 @@ private class PersistentPublisherImpl(persistenceId: String, publisherSettings:
override def maxBufferSize = override def maxBufferSize =
materializerSettings.maxFanOutBufferSize materializerSettings.maxFanOutBufferSize
override def createSubscription(subscriber: Subscriber[Any]): ActorSubscription[Any] = override def createSubscription(subscriber: Subscriber[_ >: Any]): ActorSubscription[Any] =
new ActorSubscription(self, subscriber) new ActorSubscription(self, subscriber)
override def cancelUpstream(): Unit = { override def cancelUpstream(): Unit = {
@ -151,7 +151,7 @@ private class PersistentPublisherImpl(persistenceId: String, publisherSettings:
} }
private object PersistentPublisherBuffer { private object PersistentPublisherBuffer {
case class Request(num: Int) case class Request(n: Long)
case class Response(events: Vector[Any]) case class Response(events: Vector[Any])
case object Fill case object Fill
@ -168,31 +168,31 @@ private class PersistentPublisherBuffer(override val persistenceId: String, publ
import PersistentPublisherBuffer._ import PersistentPublisherBuffer._
import context.dispatcher import context.dispatcher
private var replayed = 0 private var replayed = 0L
private var requested = 0 private var pendingDemand = 0L
private var buffer: Vector[Any] = Vector.empty private var buffer: Vector[Any] = Vector.empty
override def viewId: String = persistenceId + "-stream-view" override def viewId: String = persistenceId + "-stream-view"
private val filling: Receive = { private val filling: Receive = {
case Filled case Filled
if (buffer.nonEmpty && requested > 0) respond(requested) if (buffer.nonEmpty && pendingDemand > 0) respond(pendingDemand)
if (buffer.nonEmpty) pause() if (buffer.nonEmpty) pause()
else if (replayed > 0) fill() else if (replayed > 0) fill()
else schedule() else schedule()
case Request(num) case Request(num)
requested += num pendingDemand += num
if (buffer.nonEmpty) respond(requested) if (buffer.nonEmpty) respond(pendingDemand)
case persistentEvent case persistentEvent
buffer :+= persistentEvent buffer :+= persistentEvent
replayed += 1 replayed += 1
if (requested > 0) respond(requested) if (pendingDemand > 0) respond(pendingDemand)
} }
private val pausing: Receive = { private val pausing: Receive = {
case Request(num) case Request(num)
requested += num pendingDemand += num
respond(requested) respond(pendingDemand)
if (buffer.isEmpty) fill() if (buffer.isEmpty) fill()
} }
@ -200,7 +200,7 @@ private class PersistentPublisherBuffer(override val persistenceId: String, publ
case Fill case Fill
fill() fill()
case Request(num) case Request(num)
requested += num pendingDemand += num
} }
def receive = filling def receive = filling
@ -242,10 +242,16 @@ private class PersistentPublisherBuffer(override val persistenceId: String, publ
context.system.scheduler.scheduleOnce(autoUpdateInterval, self, Fill) context.system.scheduler.scheduleOnce(autoUpdateInterval, self, Fill)
} }
private def respond(num: Int): Unit = { // TODO Breaks now?
val (res, buf) = buffer.splitAt(num) private def respond(num: Long): Unit = {
publisher ! Response(res) if (num <= Int.MaxValue) {
buffer = buf val n = num.toInt
requested -= res.size val (res, buf) = buffer.splitAt(n)
publisher ! Response(res)
buffer = buf
pendingDemand -= res.size
} else {
respond(Int.MaxValue)
}
} }
} }

View file

@ -0,0 +1,20 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
object ReactiveStreamsConstants {
final val CanNotSubscribeTheSameSubscriberMultipleTimes =
"can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)"
final val SupportsOnlyASingleSubscriber =
"only supports one subscriber (which is allowed, see reactive-streams specification, rule 1.12)"
final val NumberOfElementsInRequestMustBePositiveMsg =
"The number of requested elements must be > 0 (see reactive-streams specification, rule 3.9)"
final val TotalPendingDemandMustNotExceedLongMaxValue =
"Total pending demand MUST NOT be > `java.lang.Long.MAX_VALUE` (see reactive-streams specification, rule 3.17)"
}

View file

@ -4,6 +4,7 @@
package akka.stream.actor package akka.stream.actor
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import akka.stream.ReactiveStreamsConstants
import org.reactivestreams.{ Publisher, Subscriber, Subscription } import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import akka.actor.AbstractActor import akka.actor.AbstractActor
import akka.actor.Actor import akka.actor.Actor
@ -47,7 +48,7 @@ object ActorPublisherMessage {
* more elements. * more elements.
* @param n number of requested elements * @param n number of requested elements
*/ */
@SerialVersionUID(1L) case class Request(n: Int) extends ActorPublisherMessage @SerialVersionUID(1L) case class Request(n: Long) extends ActorPublisherMessage
/** /**
* This message is delivered to the [[ActorPublisher]] actor when the stream subscriber cancels the * This message is delivered to the [[ActorPublisher]] actor when the stream subscriber cancels the
@ -122,11 +123,7 @@ trait ActorPublisher[T] extends Actor {
* This actor automatically keeps tracks of this amount based on * This actor automatically keeps tracks of this amount based on
* incoming request messages and outgoing `onNext`. * incoming request messages and outgoing `onNext`.
*/ */
final def totalDemand: Int = longToIntMax(demand) final def totalDemand: Long = demand
private def longToIntMax(n: Long): Int =
if (n > Int.MaxValue) Int.MaxValue
else n.toInt
/** /**
* The terminal state after calling [[#onComplete]]. It is not allowed to * The terminal state after calling [[#onComplete]]. It is not allowed to
@ -210,7 +207,7 @@ trait ActorPublisher[T] extends Actor {
demand += n demand += n
super.aroundReceive(receive, msg) super.aroundReceive(receive, msg)
case Subscribe(sub) case Subscribe(sub: Subscriber[_])
lifecycleState match { lifecycleState match {
case PreSubscriber case PreSubscriber
subscriber = sub subscriber = sub
@ -219,12 +216,16 @@ trait ActorPublisher[T] extends Actor {
case ErrorEmitted(cause) sub.onError(cause) case ErrorEmitted(cause) sub.onError(cause)
case Completed sub.onComplete() case Completed sub.onComplete()
case Active | Canceled case Active | Canceled
sub.onError(new IllegalStateException(s"ActorPublisher [$self] can only have one subscriber")) if (subscriber == sub)
sub.onError(new IllegalStateException(s"ActorPublisher [$self, sub: $sub] ${ReactiveStreamsConstants.CanNotSubscribeTheSameSubscriberMultipleTimes}"))
else
sub.onError(new IllegalStateException(s"ActorPublisher [$self] ${ReactiveStreamsConstants.SupportsOnlyASingleSubscriber}"))
} }
case Cancel case Cancel
lifecycleState = Canceled lifecycleState = Canceled
demand = 0 demand = 0
subscriber = null
super.aroundReceive(receive, msg) super.aroundReceive(receive, msg)
case _ case _
@ -272,7 +273,7 @@ private[akka] case class ActorPublisherImpl[T](ref: ActorRef) extends Publisher[
import ActorPublisher._ import ActorPublisher._
import ActorPublisher.Internal._ import ActorPublisher.Internal._
override def subscribe(sub: Subscriber[T]): Unit = override def subscribe(sub: Subscriber[_ >: T]): Unit =
ref ! Subscribe(sub.asInstanceOf[Subscriber[Any]]) ref ! Subscribe(sub.asInstanceOf[Subscriber[Any]])
} }
@ -283,8 +284,8 @@ private[akka] class ActorPublisherSubscription[T](ref: ActorRef) extends Subscri
import ActorPublisher._ import ActorPublisher._
import ActorPublisherMessage._ import ActorPublisherMessage._
override def request(n: Int): Unit = override def request(n: Long): Unit =
if (n <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0") if (n <= 0) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg)
else ref ! Request(n) else ref ! Request(n)
override def cancel(): Unit = ref ! Cancel override def cancel(): Unit = ref ! Cancel
} }

View file

@ -169,7 +169,7 @@ trait ActorSubscriber extends Actor {
private val state = ActorSubscriberState(context.system) private val state = ActorSubscriberState(context.system)
private var subscription: Option[Subscription] = None private var subscription: Option[Subscription] = None
private var requested = 0L private var requested: Long = 0
private var canceled = false private var canceled = false
protected def requestStrategy: RequestStrategy protected def requestStrategy: RequestStrategy
@ -244,7 +244,7 @@ trait ActorSubscriber extends Actor {
/** /**
* Request a number of elements from upstream. * Request a number of elements from upstream.
*/ */
protected def request(elements: Int): Unit = protected def request(elements: Long): Unit =
if (elements > 0 && !canceled) { if (elements > 0 && !canceled) {
// if we don't have a subscription yet, it will be requested when it arrives // if we don't have a subscription yet, it will be requested when it arrives
subscription.foreach(_.request(elements)) subscription.foreach(_.request(elements))

View file

@ -3,6 +3,7 @@
*/ */
package akka.stream.impl package akka.stream.impl
import akka.stream.ReactiveStreamsConstants
import org.reactivestreams.{ Publisher, Subscriber, Subscription, Processor } import org.reactivestreams.{ Publisher, Subscriber, Subscription, Processor }
import akka.actor._ import akka.actor._
import akka.stream.MaterializerSettings import akka.stream.MaterializerSettings
@ -142,9 +143,10 @@ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump)
} }
protected def upstreamRunning: Actor.Receive = { protected def upstreamRunning: Actor.Receive = {
case OnNext(element) enqueueInputElement(element) case OnNext(element) enqueueInputElement(element)
case OnComplete onComplete() case OnComplete onComplete()
case OnError(cause) onError(cause) case OnError(cause) onError(cause)
case OnSubscribe(subscription) subscription.cancel() // spec rule 2.5
} }
protected def completed: Actor.Receive = { protected def completed: Actor.Receive = {
@ -161,6 +163,7 @@ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump)
* INTERNAL API * INTERNAL API
*/ */
private[akka] class SimpleOutputs(self: ActorRef, val pump: Pump) extends DefaultOutputTransferStates { private[akka] class SimpleOutputs(self: ActorRef, val pump: Pump) extends DefaultOutputTransferStates {
protected var exposedPublisher: ActorPublisher[Any] = _ protected var exposedPublisher: ActorPublisher[Any] = _
protected var subscriber: Subscriber[Any] = _ protected var subscriber: Subscriber[Any] = _
@ -200,7 +203,7 @@ private[akka] class SimpleOutputs(self: ActorRef, val pump: Pump) extends Defaul
if (subscriber eq null) { if (subscriber eq null) {
subscriber = sub subscriber = sub
subscriber.onSubscribe(new ActorSubscription(self, subscriber)) subscriber.onSubscribe(new ActorSubscription(self, subscriber))
} else sub.onError(new IllegalStateException("Cannot subscribe two or more Subscribers to this Publisher")) } else sub.onError(new IllegalStateException(s"${getClass.getSimpleName} ${ReactiveStreamsConstants.SupportsOnlyASingleSubscriber}"))
} }
protected def waitingExposedPublisher: Actor.Receive = { protected def waitingExposedPublisher: Actor.Receive = {
@ -215,7 +218,16 @@ private[akka] class SimpleOutputs(self: ActorRef, val pump: Pump) extends Defaul
case SubscribePending case SubscribePending
subscribePending(exposedPublisher.takePendingSubscribers()) subscribePending(exposedPublisher.takePendingSubscribers())
case RequestMore(subscription, elements) case RequestMore(subscription, elements)
// TODO centralize overflow protection
downstreamDemand += elements downstreamDemand += elements
if (downstreamDemand < 0) {
// Long has overflown
val demandOverflowException = new IllegalStateException(ReactiveStreamsConstants.TotalPendingDemandMustNotExceedLongMaxValue)
subscriber.onError(demandOverflowException)
cancel(demandOverflowException)
}
pump.pump() pump.pump()
case Cancel(subscription) case Cancel(subscription)
downstreamCompleted = true downstreamCompleted = true

View file

@ -4,12 +4,13 @@
package akka.stream.impl package akka.stream.impl
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
import akka.stream.{ MaterializerSettings } import akka.stream.{ ReactiveStreamsConstants, MaterializerSettings }
import org.reactivestreams.{ Publisher, Subscriber } import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.duration.Duration
import scala.util.control.{ NoStackTrace, NonFatal } import scala.util.control.{ NoStackTrace, NonFatal }
/** /**
@ -54,10 +55,10 @@ private[akka] class ActorPublisher[T](val impl: ActorRef, val equalityValue: Opt
// SubscribePending message. The AtomicReference is set to null by the shutdown method, which is // SubscribePending message. The AtomicReference is set to null by the shutdown method, which is
// called by the actor from postStop. Pending (unregistered) subscription attempts are denied by // called by the actor from postStop. Pending (unregistered) subscription attempts are denied by
// the shutdown method. Subscription attempts after shutdown can be denied immediately. // the shutdown method. Subscription attempts after shutdown can be denied immediately.
private val pendingSubscribers = new AtomicReference[immutable.Seq[Subscriber[T]]](Nil) private val pendingSubscribers = new AtomicReference[immutable.Seq[Subscriber[_ >: T]]](Nil)
override def subscribe(subscriber: Subscriber[T]): Unit = { override def subscribe(subscriber: Subscriber[_ >: T]): Unit = {
@tailrec def doSubscribe(subscriber: Subscriber[T]): Unit = { @tailrec def doSubscribe(subscriber: Subscriber[_ >: T]): Unit = {
val current = pendingSubscribers.get val current = pendingSubscribers.get
if (current eq null) if (current eq null)
reportSubscribeError(subscriber) reportSubscribeError(subscriber)
@ -72,7 +73,7 @@ private[akka] class ActorPublisher[T](val impl: ActorRef, val equalityValue: Opt
doSubscribe(subscriber) doSubscribe(subscriber)
} }
def takePendingSubscribers(): immutable.Seq[Subscriber[T]] = { def takePendingSubscribers(): immutable.Seq[Subscriber[_ >: T]] = {
val pending = pendingSubscribers.getAndSet(Nil) val pending = pendingSubscribers.getAndSet(Nil)
assert(pending ne null, "takePendingSubscribers must not be called after shutdown") assert(pending ne null, "takePendingSubscribers must not be called after shutdown")
pending.reverse pending.reverse
@ -88,7 +89,7 @@ private[akka] class ActorPublisher[T](val impl: ActorRef, val equalityValue: Opt
@volatile private var shutdownReason: Option[Throwable] = None @volatile private var shutdownReason: Option[Throwable] = None
private def reportSubscribeError(subscriber: Subscriber[T]): Unit = private def reportSubscribeError(subscriber: Subscriber[_ >: T]): Unit =
shutdownReason match { shutdownReason match {
case Some(e) subscriber.onError(e) case Some(e) subscriber.onError(e)
case None subscriber.onComplete() case None subscriber.onComplete()
@ -108,9 +109,9 @@ private[akka] class ActorPublisher[T](val impl: ActorRef, val equalityValue: Opt
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] class ActorSubscription[T]( final val impl: ActorRef, final val subscriber: Subscriber[T]) extends SubscriptionWithCursor[T] { private[akka] class ActorSubscription[T]( final val impl: ActorRef, final val subscriber: Subscriber[_ >: T]) extends SubscriptionWithCursor[T] {
override def request(elements: Int): Unit = override def request(elements: Long): Unit =
if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0") if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg)
else impl ! RequestMore(this, elements) else impl ! RequestMore(this, elements)
override def cancel(): Unit = impl ! Cancel(this) override def cancel(): Unit = impl ! Cancel(this)
} }
@ -149,7 +150,6 @@ private[akka] class SimpleCallbackPublisherImpl[T](f: () ⇒ T, settings: Materi
with SubscriberManagement[T] with SubscriberManagement[T]
with SoftShutdown { with SoftShutdown {
import akka.stream.impl.ActorBasedFlowMaterializer._
import akka.stream.impl.SimpleCallbackPublisherImpl._ import akka.stream.impl.SimpleCallbackPublisherImpl._
type S = ActorSubscription[T] type S = ActorSubscription[T]
@ -184,7 +184,7 @@ private[akka] class SimpleCallbackPublisherImpl[T](f: () ⇒ T, settings: Materi
override def postStop(): Unit = override def postStop(): Unit =
if (pub ne null) pub.shutdown(shutdownReason) if (pub ne null) pub.shutdown(shutdownReason)
private var demand = 0 private var demand = 0L
private def generate(): Unit = { private def generate(): Unit = {
if (demand > 0) { if (demand > 0) {
try { try {
@ -201,10 +201,10 @@ private[akka] class SimpleCallbackPublisherImpl[T](f: () ⇒ T, settings: Materi
override def initialBufferSize = settings.initialFanOutBufferSize override def initialBufferSize = settings.initialFanOutBufferSize
override def maxBufferSize = settings.maxFanOutBufferSize override def maxBufferSize = settings.maxFanOutBufferSize
override def createSubscription(subscriber: Subscriber[T]): ActorSubscription[T] = override def createSubscription(subscriber: Subscriber[_ >: T]): ActorSubscription[T] =
new ActorSubscription(self, subscriber) new ActorSubscription(self, subscriber)
override def requestFromUpstream(elements: Int): Unit = demand += elements override def requestFromUpstream(elements: Long): Unit = demand += elements
override def cancelUpstream(): Unit = { override def cancelUpstream(): Unit = {
pub.shutdown(shutdownReason) pub.shutdown(shutdownReason)

View file

@ -3,6 +3,8 @@
*/ */
package akka.stream.impl package akka.stream.impl
import java.util.concurrent.atomic.AtomicReference
import org.reactivestreams.{ Subscriber, Subscription } import org.reactivestreams.{ Subscriber, Subscription }
/** /**
@ -12,13 +14,13 @@ import org.reactivestreams.{ Subscriber, Subscription }
private[akka] class BlackholeSubscriber[T](highWatermark: Int) extends Subscriber[T] { private[akka] class BlackholeSubscriber[T](highWatermark: Int) extends Subscriber[T] {
private val lowWatermark = Math.max(1, highWatermark / 2) private val lowWatermark = Math.max(1, highWatermark / 2)
private var requested = 0 private var requested = 0L
private var subscription: Subscription = _ private val subscription: AtomicReference[Subscription] = new AtomicReference(null)
override def onSubscribe(sub: Subscription): Unit = { override def onSubscribe(sub: Subscription): Unit = {
subscription = sub if (subscription.compareAndSet(null, sub)) requestMore()
requestMore() else sub.cancel()
} }
override def onError(cause: Throwable): Unit = () override def onError(cause: Throwable): Unit = ()
@ -30,10 +32,10 @@ private[akka] class BlackholeSubscriber[T](highWatermark: Int) extends Subscribe
requestMore() requestMore()
} }
private def requestMore(): Unit = protected def requestMore(): Unit =
if (requested < lowWatermark) { if (requested < lowWatermark) {
val amount = highWatermark - requested val amount = highWatermark - requested
subscription.request(amount) subscription.get().request(amount)
requested += amount requested += amount
} }

View file

@ -9,7 +9,7 @@ import org.reactivestreams.{ Subscriber, Publisher }
* INTERNAL API * INTERNAL API
*/ */
private[akka] case object EmptyPublisher extends Publisher[Nothing] { private[akka] case object EmptyPublisher extends Publisher[Nothing] {
def subscribe(subscriber: Subscriber[Nothing]): Unit = subscriber.onComplete() def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = subscriber.onComplete()
def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]] def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]]
} }
@ -17,6 +17,6 @@ private[akka] case object EmptyPublisher extends Publisher[Nothing] {
* INTERNAL API * INTERNAL API
*/ */
private[akka] case class ErrorPublisher(t: Throwable) extends Publisher[Nothing] { private[akka] case class ErrorPublisher(t: Throwable) extends Publisher[Nothing] {
def subscribe(subscriber: Subscriber[Nothing]): Unit = subscriber.onError(t) def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = subscriber.onError(t)
def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]] def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]]
} }

View file

@ -11,12 +11,13 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu
extends DefaultOutputTransferStates extends DefaultOutputTransferStates
with SubscriberManagement[Any] { with SubscriberManagement[Any] {
override type S = ActorSubscription[Any] override type S = ActorSubscription[_ >: Any]
override def createSubscription(subscriber: Subscriber[Any]): S = override def createSubscription(subscriber: Subscriber[_ >: Any]): S =
new ActorSubscription(self, subscriber) new ActorSubscription(self, subscriber)
protected var exposedPublisher: ActorPublisher[Any] = _ protected var exposedPublisher: ActorPublisher[Any] = _
private var downstreamBufferSpace = 0 private var downstreamBufferSpace: Long = 0L
private var downstreamCompleted = false private var downstreamCompleted = false
override def demandAvailable = downstreamBufferSpace > 0 override def demandAvailable = downstreamBufferSpace > 0
override def demandCount: Long = downstreamBufferSpace override def demandCount: Long = downstreamBufferSpace
@ -46,7 +47,7 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu
def afterShutdown(): Unit def afterShutdown(): Unit
override protected def requestFromUpstream(elements: Int): Unit = downstreamBufferSpace += elements override protected def requestFromUpstream(elements: Long): Unit = downstreamBufferSpace += elements
private def subscribePending(): Unit = private def subscribePending(): Unit =
exposedPublisher.takePendingSubscribers() foreach registerSubscriber exposedPublisher.takePendingSubscribers() foreach registerSubscriber

View file

@ -3,14 +3,21 @@
*/ */
package akka.stream.impl package akka.stream.impl
import akka.actor.{ Actor, ActorRef, Props, Status, SupervisorStrategy } import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.Status
import akka.actor.SupervisorStrategy
import akka.pattern.pipe import akka.pattern.pipe
import akka.stream.MaterializerSettings import akka.stream.MaterializerSettings
import org.reactivestreams.{ Subscriber, Subscription } import akka.stream.ReactiveStreamsConstants
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.duration.Duration import scala.util.Failure
import scala.util.{ Failure, Success, Try } import scala.util.Success
import scala.util.Try
/** /**
* INTERNAL API * INTERNAL API
@ -27,8 +34,8 @@ private[akka] object FuturePublisher {
class FutureSubscription(ref: ActorRef) extends Subscription { class FutureSubscription(ref: ActorRef) extends Subscription {
import akka.stream.impl.FuturePublisher.FutureSubscription._ import akka.stream.impl.FuturePublisher.FutureSubscription._
def cancel(): Unit = ref ! Cancel(this) def cancel(): Unit = ref ! Cancel(this)
def request(elements: Int): Unit = def request(elements: Long): Unit =
if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0") if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg)
else ref ! RequestMore(this) else ref ! RequestMore(this)
override def toString = "FutureSubscription" override def toString = "FutureSubscription"
} }
@ -39,7 +46,8 @@ private[akka] object FuturePublisher {
*/ */
private[akka] class FuturePublisher(future: Future[Any], settings: MaterializerSettings) extends Actor with SoftShutdown { private[akka] class FuturePublisher(future: Future[Any], settings: MaterializerSettings) extends Actor with SoftShutdown {
import akka.stream.impl.FuturePublisher.FutureSubscription import akka.stream.impl.FuturePublisher.FutureSubscription
import akka.stream.impl.FuturePublisher.FutureSubscription.{ Cancel, RequestMore } import akka.stream.impl.FuturePublisher.FutureSubscription.Cancel
import akka.stream.impl.FuturePublisher.FutureSubscription.RequestMore
var exposedPublisher: ActorPublisher[Any] = _ var exposedPublisher: ActorPublisher[Any] = _
var subscribers = Map.empty[Subscriber[Any], FutureSubscription] var subscribers = Map.empty[Subscriber[Any], FutureSubscription]
@ -98,7 +106,7 @@ private[akka] class FuturePublisher(future: Future[Any], settings: MaterializerS
def registerSubscriber(subscriber: Subscriber[Any]): Unit = { def registerSubscriber(subscriber: Subscriber[Any]): Unit = {
if (subscribers.contains(subscriber)) if (subscribers.contains(subscriber))
subscriber.onError(new IllegalStateException(s"Cannot subscribe $subscriber twice")) subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsConstants.CanNotSubscribeTheSameSubscriberMultipleTimes}"))
else { else {
val subscription = new FutureSubscription(self) val subscription = new FutureSubscription(self)
subscribers = subscribers.updated(subscriber, subscription) subscribers = subscribers.updated(subscriber, subscription)

View file

@ -4,12 +4,11 @@
package akka.stream.impl package akka.stream.impl
import akka.actor.{ Actor, ActorRef, Props, SupervisorStrategy, Terminated } import akka.actor.{ Actor, ActorRef, Props, SupervisorStrategy, Terminated }
import akka.stream.MaterializerSettings import akka.stream.{ MaterializerSettings, ReactiveStreamsConstants }
import org.reactivestreams.{ Subscriber, Subscription } import org.reactivestreams.{ Subscriber, Subscription }
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal import scala.util.control.NonFatal
/** /**
@ -21,15 +20,15 @@ private[akka] object IterablePublisher {
object BasicActorSubscription { object BasicActorSubscription {
case object Cancel case object Cancel
case class RequestMore(elements: Int) case class RequestMore(elements: Long)
} }
class BasicActorSubscription(ref: ActorRef) class BasicActorSubscription(ref: ActorRef)
extends Subscription { extends Subscription {
import akka.stream.impl.IterablePublisher.BasicActorSubscription._ import akka.stream.impl.IterablePublisher.BasicActorSubscription._
def cancel(): Unit = ref ! Cancel def cancel(): Unit = ref ! Cancel
def request(elements: Int): Unit = def request(elements: Long): Unit =
if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0") if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg)
else ref ! RequestMore(elements) else ref ! RequestMore(elements)
override def toString = "BasicActorSubscription" override def toString = "BasicActorSubscription"
} }
@ -43,7 +42,6 @@ private[akka] object IterablePublisher {
* beginning of the iterable and it can consume the elements in its own pace. * beginning of the iterable and it can consume the elements in its own pace.
*/ */
private[akka] class IterablePublisher(iterable: immutable.Iterable[Any], settings: MaterializerSettings) extends Actor with SoftShutdown { private[akka] class IterablePublisher(iterable: immutable.Iterable[Any], settings: MaterializerSettings) extends Actor with SoftShutdown {
import akka.stream.impl.ActorBasedFlowMaterializer._
import akka.stream.impl.IterablePublisher.BasicActorSubscription import akka.stream.impl.IterablePublisher.BasicActorSubscription
require(iterable.nonEmpty, "Use EmptyPublisher for empty iterable") require(iterable.nonEmpty, "Use EmptyPublisher for empty iterable")
@ -91,7 +89,7 @@ private[akka] class IterablePublisher(iterable: immutable.Iterable[Any], setting
def registerSubscriber(subscriber: Subscriber[Any]): Unit = { def registerSubscriber(subscriber: Subscriber[Any]): Unit = {
if (subscribers(subscriber)) if (subscribers(subscriber))
subscriber.onError(new IllegalStateException(s"Cannot subscribe $subscriber twice")) subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsConstants.CanNotSubscribeTheSameSubscriberMultipleTimes}"))
else { else {
val iterator = iterable.iterator val iterator = iterable.iterator
val worker = context.watch(context.actorOf(IterablePublisherWorker.props(iterator, subscriber, val worker = context.watch(context.actorOf(IterablePublisherWorker.props(iterator, subscriber,
@ -130,17 +128,16 @@ private[akka] object IterablePublisherWorker {
*/ */
private[akka] class IterablePublisherWorker(iterator: Iterator[Any], subscriber: Subscriber[Any], maxPush: Int) private[akka] class IterablePublisherWorker(iterator: Iterator[Any], subscriber: Subscriber[Any], maxPush: Int)
extends Actor with SoftShutdown { extends Actor with SoftShutdown {
import akka.stream.impl.ActorBasedFlowMaterializer._
import akka.stream.impl.IterablePublisher.BasicActorSubscription._ import akka.stream.impl.IterablePublisher.BasicActorSubscription._
import akka.stream.impl.IterablePublisherWorker._ import akka.stream.impl.IterablePublisherWorker._
require(iterator.hasNext, "Iterator must not be empty") require(iterator.hasNext, "Iterator must not be empty")
var demand = 0L var pendingDemand: Long = 0L
def receive = { def receive = {
case RequestMore(elements) case RequestMore(elements)
demand += elements pendingDemand += elements
push() push()
case PushMore case PushMore
push() push()
@ -151,8 +148,8 @@ private[akka] class IterablePublisherWorker(iterator: Iterator[Any], subscriber:
private def push(): Unit = { private def push(): Unit = {
@tailrec def doPush(n: Int): Unit = @tailrec def doPush(n: Int): Unit =
if (demand > 0) { if (pendingDemand > 0) {
demand -= 1 pendingDemand -= 1
val hasNext = { val hasNext = {
subscriber.onNext(iterator.next()) subscriber.onNext(iterator.next())
iterator.hasNext iterator.hasNext
@ -161,7 +158,7 @@ private[akka] class IterablePublisherWorker(iterator: Iterator[Any], subscriber:
subscriber.onComplete() subscriber.onComplete()
context.parent ! Finished context.parent ! Finished
softShutdown() softShutdown()
} else if (n == 0 && demand > 0) } else if (n == 0 && pendingDemand > 0)
self ! PushMore self ! PushMore
else else
doPush(n - 1) doPush(n - 1)

View file

@ -12,7 +12,7 @@ private[akka] case object SubscribePending
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] case class RequestMore(subscription: ActorSubscription[_], demand: Int) private[akka] case class RequestMore(subscription: ActorSubscription[_], demand: Long)
/** /**
* INTERNAL API * INTERNAL API
*/ */

View file

@ -4,7 +4,7 @@
package akka.stream.impl package akka.stream.impl
import akka.stream.MaterializerSettings import akka.stream.MaterializerSettings
import org.reactivestreams.{ Subscriber, Subscription, Publisher } import org.reactivestreams.Subscriber
/** /**
* INTERNAL API * INTERNAL API
@ -15,7 +15,7 @@ private[akka] class BroadcastImpl(_settings: MaterializerSettings, other: Subscr
override val primaryOutputs = new FanoutOutputs(settings.maxFanOutBufferSize, settings.initialFanOutBufferSize, self, pump = this) { override val primaryOutputs = new FanoutOutputs(settings.maxFanOutBufferSize, settings.initialFanOutBufferSize, self, pump = this) {
var secondarySubscribed = false var secondarySubscribed = false
override def registerSubscriber(subscriber: Subscriber[Any]): Unit = { override def registerSubscriber(subscriber: Subscriber[_ >: Any]): Unit = {
if (!secondarySubscribed) { if (!secondarySubscribed) {
super.registerSubscriber(other) super.registerSubscriber(other)
secondarySubscribed = true secondarySubscribed = true

View file

@ -4,7 +4,7 @@
package akka.stream.impl package akka.stream.impl
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import akka.stream.{ ReactiveStreamsConstants, MaterializerSettings }
import akka.actor.{ Actor, ActorRef } import akka.actor.{ Actor, ActorRef }
import akka.stream.MaterializerSettings import akka.stream.MaterializerSettings
import org.reactivestreams.{ Publisher, Subscriber, Subscription } import org.reactivestreams.{ Publisher, Subscriber, Subscription }
@ -15,13 +15,13 @@ import scala.collection.mutable
*/ */
private[akka] object MultiStreamOutputProcessor { private[akka] object MultiStreamOutputProcessor {
case class SubstreamKey(id: Long) case class SubstreamKey(id: Long)
case class SubstreamRequestMore(substream: SubstreamKey, demand: Int) case class SubstreamRequestMore(substream: SubstreamKey, demand: Long)
case class SubstreamCancel(substream: SubstreamKey) case class SubstreamCancel(substream: SubstreamKey)
case class SubstreamSubscribe(substream: SubstreamKey, subscriber: Subscriber[Any]) case class SubstreamSubscribe(substream: SubstreamKey, subscriber: Subscriber[Any])
class SubstreamSubscription(val parent: ActorRef, val substreamKey: SubstreamKey) extends Subscription { class SubstreamSubscription(val parent: ActorRef, val substreamKey: SubstreamKey) extends Subscription {
override def request(elements: Int): Unit = override def request(elements: Long): Unit =
if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0") if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg)
else parent ! SubstreamRequestMore(substreamKey, elements) else parent ! SubstreamRequestMore(substreamKey, elements)
override def cancel(): Unit = parent ! SubstreamCancel(substreamKey) override def cancel(): Unit = parent ! SubstreamCancel(substreamKey)
override def toString = "SubstreamSubscription" + System.identityHashCode(this) override def toString = "SubstreamSubscription" + System.identityHashCode(this)
@ -46,7 +46,7 @@ private[akka] object MultiStreamOutputProcessor {
override def subreceive: SubReceive = override def subreceive: SubReceive =
throw new UnsupportedOperationException("Substream outputs are managed in a dedicated receive block") throw new UnsupportedOperationException("Substream outputs are managed in a dedicated receive block")
def enqueueOutputDemand(demand: Int): Unit = { def enqueueOutputDemand(demand: Long): Unit = {
downstreamDemand += demand downstreamDemand += demand
pump.pump() pump.pump()
} }
@ -78,7 +78,7 @@ private[akka] object MultiStreamOutputProcessor {
case Failed(e) s.onError(e) case Failed(e) s.onError(e)
} }
override def subscribe(s: Subscriber[Any]): Unit = { override def subscribe(s: Subscriber[_ >: Any]): Unit = {
if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s) if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s)
else { else {
state.get() match { state.get() match {

View file

@ -3,6 +3,9 @@
*/ */
package akka.stream.impl package akka.stream.impl
import akka.stream.ReactiveStreamsConstants
import scala.annotation.switch
import scala.annotation.tailrec import scala.annotation.tailrec
import org.reactivestreams.{ Subscriber, Subscription } import org.reactivestreams.{ Subscriber, Subscription }
import SubscriberManagement.ShutDown import SubscriberManagement.ShutDown
@ -29,19 +32,32 @@ private[akka] object SubscriberManagement {
def apply[T](subscriber: Subscriber[T]): Unit = subscriber.onError(cause) def apply[T](subscriber: Subscriber[T]): Unit = subscriber.onError(cause)
} }
val ShutDown = new ErrorCompleted(new IllegalStateException("Cannot subscribe to shut-down spi.Publisher")) val ShutDown = new ErrorCompleted(new IllegalStateException("Cannot subscribe to shut-down Publisher"))
} }
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] trait SubscriptionWithCursor[T] extends Subscription with ResizableMultiReaderRingBuffer.Cursor { private[akka] trait SubscriptionWithCursor[T] extends Subscription with ResizableMultiReaderRingBuffer.Cursor {
def subscriber: Subscriber[T] def subscriber: Subscriber[_ >: T]
def dispatch(element: T): Unit = subscriber.onNext(element) def dispatch(element: T): Unit = subscriber.onNext(element)
/** Increases the `requested` counter, additionally providing overflow protection */
def moreRequested(demand: Long): Long = {
val sum = totalDemand + demand
val noOverflow = sum > 0
if (noOverflow) totalDemand = sum
else subscriber.onError(new IllegalStateException(s"Total pending demand ($totalDemand + $demand) would overflow `Long`, for Subscriber $subscriber! ${ReactiveStreamsConstants.TotalPendingDemandMustNotExceedLongMaxValue}"))
sum
}
var active = true var active = true
var requested: Long = 0 // number of requested but not yet dispatched elements
/** Do not increment directly, use `moreRequested(Long)` instead (it provides overflow protection)! */
var totalDemand: Long = 0 // number of requested but not yet dispatched elements
var cursor: Int = 0 // buffer cursor, managed by buffer var cursor: Int = 0 // buffer cursor, managed by buffer
} }
@ -60,7 +76,7 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff
* called when we are ready to consume more elements from our upstream * called when we are ready to consume more elements from our upstream
* MUST NOT call pushToDownstream * MUST NOT call pushToDownstream
*/ */
protected def requestFromUpstream(elements: Int): Unit protected def requestFromUpstream(elements: Long): Unit
/** /**
* called before `shutdown()` if the stream is *not* being regularly completed * called before `shutdown()` if the stream is *not* being regularly completed
@ -76,7 +92,7 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff
/** /**
* Use to register a subscriber * Use to register a subscriber
*/ */
protected def createSubscription(subscriber: Subscriber[T]): S protected def createSubscription(subscriber: Subscriber[_ >: T]): S
private[this] val buffer = new ResizableMultiReaderRingBuffer[T](initialBufferSize, maxBufferSize, this) private[this] val buffer = new ResizableMultiReaderRingBuffer[T](initialBufferSize, maxBufferSize, this)
@ -96,9 +112,8 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff
/** /**
* more demand was signaled from a given subscriber * more demand was signaled from a given subscriber
*/ */
protected def moreRequested(subscription: S, elements: Int): Unit = protected def moreRequested(subscription: S, elements: Long): Unit =
if (subscription.active) { if (subscription.active) {
// returns Long.MinValue if the subscription is to be terminated // returns Long.MinValue if the subscription is to be terminated
@tailrec def dispatchFromBufferAndReturnRemainingRequested(requested: Long, eos: EndOfStream): Long = @tailrec def dispatchFromBufferAndReturnRemainingRequested(requested: Long, eos: EndOfStream): Long =
if (requested == 0) { if (requested == 0) {
@ -112,23 +127,23 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff
endOfStream match { endOfStream match {
case eos @ (NotReached | Completed) case eos @ (NotReached | Completed)
val demand = subscription.requested + elements val demand = subscription.moreRequested(elements)
dispatchFromBufferAndReturnRemainingRequested(demand, eos) match { dispatchFromBufferAndReturnRemainingRequested(demand, eos) match {
case Long.MinValue case Long.MinValue
eos(subscription.subscriber) eos(subscription.subscriber)
unregisterSubscriptionInternal(subscription) unregisterSubscriptionInternal(subscription)
case x case x
subscription.requested = x subscription.totalDemand = x
requestFromUpstreamIfRequired() requestFromUpstreamIfRequired()
} }
case ErrorCompleted(_) // ignore, the spi.Subscriber might not have seen our error event yet case ErrorCompleted(_) // ignore, the Subscriber might not have seen our error event yet
} }
} }
private[this] final def requestFromUpstreamIfRequired(): Unit = { private[this] final def requestFromUpstreamIfRequired(): Unit = {
@tailrec def maxRequested(remaining: Subscriptions, result: Long = 0): Long = @tailrec def maxRequested(remaining: Subscriptions, result: Long = 0): Long =
remaining match { remaining match {
case head :: tail maxRequested(tail, math.max(head.requested, result)) case head :: tail maxRequested(tail, math.max(head.totalDemand, result))
case _ result case _ result
} }
val desired = Math.min(Int.MaxValue, Math.min(maxRequested(subscriptions), buffer.maxAvailable) - pendingFromUpstream).toInt val desired = Math.min(Int.MaxValue, Math.min(maxRequested(subscriptions), buffer.maxAvailable) - pendingFromUpstream).toInt
@ -145,10 +160,10 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff
@tailrec def dispatch(remaining: Subscriptions, sent: Boolean = false): Boolean = @tailrec def dispatch(remaining: Subscriptions, sent: Boolean = false): Boolean =
remaining match { remaining match {
case head :: tail case head :: tail
if (head.requested > 0) { if (head.totalDemand > 0) {
val element = buffer.read(head) val element = buffer.read(head)
head.dispatch(element) head.dispatch(element)
head.requested -= 1 head.totalDemand -= 1
dispatch(tail, true) dispatch(tail, true)
} else dispatch(tail, sent) } else dispatch(tail, sent)
case _ sent case _ sent
@ -198,9 +213,9 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff
/** /**
* Register a new subscriber. * Register a new subscriber.
*/ */
protected def registerSubscriber(subscriber: Subscriber[T]): Unit = endOfStream match { protected def registerSubscriber(subscriber: Subscriber[_ >: T]): Unit = endOfStream match {
case NotReached if subscriptions.exists(_.subscriber eq subscriber) case NotReached if subscriptions.exists(_.subscriber eq subscriber)
subscriber.onError(new IllegalStateException(s"Cannot subscribe $subscriber twice")) subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [${this}, sub: $subscriber] ${ReactiveStreamsConstants.CanNotSubscribeTheSameSubscriberMultipleTimes}"))
case NotReached case NotReached
val newSubscription = createSubscription(subscriber) val newSubscription = createSubscription(subscriber)
subscriptions ::= newSubscription subscriptions ::= newSubscription

View file

@ -19,31 +19,31 @@ private[akka] object SynchronousPublisherFromIterable {
private class IteratorSubscription[T](subscriber: Subscriber[T], iterator: Iterator[T]) extends Subscription { private class IteratorSubscription[T](subscriber: Subscriber[T], iterator: Iterator[T]) extends Subscription {
var done = false var done = false
var demand = 0 var pendingDemand = 0L
var pushing = false var pushing = false
override def cancel(): Unit = override def cancel(): Unit =
done = true done = true
override def request(elements: Int): Unit = { override def request(elements: Long): Unit = {
@tailrec def pushNext(): Unit = { @tailrec def pushNext(): Unit = {
if (!done) if (!done)
if (iterator.isEmpty) { if (iterator.isEmpty) {
done = true done = true
subscriber.onComplete() subscriber.onComplete()
} else if (demand != 0) { } else if (pendingDemand != 0) {
demand -= 1 pendingDemand -= 1
subscriber.onNext(iterator.next()) subscriber.onNext(iterator.next())
pushNext() pushNext()
} }
} }
if (pushing) if (pushing)
demand += elements // reentrant call to requestMore from onNext pendingDemand += elements // reentrant call to requestMore from onNext
else { else {
try { try {
pushing = true pushing = true
demand = elements pendingDemand = elements
pushNext() pushNext()
} catch { } catch {
case NonFatal(e) case NonFatal(e)
@ -73,7 +73,7 @@ private[akka] class SynchronousPublisherFromIterable[T](private val iterable: im
import akka.stream.impl.SynchronousPublisherFromIterable.IteratorSubscription import akka.stream.impl.SynchronousPublisherFromIterable.IteratorSubscription
override def subscribe(subscriber: Subscriber[T]): Unit = override def subscribe(subscriber: Subscriber[_ >: T]): Unit =
subscriber.onSubscribe(new IteratorSubscription(subscriber, iterable.iterator)) subscriber.onSubscribe(new IteratorSubscription(subscriber, iterable.iterator))
override def equals(o: Any): Boolean = o match { override def equals(o: Any): Boolean = o match {

View file

@ -4,11 +4,11 @@
package akka.stream.impl package akka.stream.impl
import akka.actor.{ Actor, ActorRef, Cancellable, Props, SupervisorStrategy } import akka.actor.{ Actor, ActorRef, Cancellable, Props, SupervisorStrategy }
import akka.stream.MaterializerSettings import akka.stream.{ MaterializerSettings, ReactiveStreamsConstants }
import org.reactivestreams.{ Subscriber, Subscription } import org.reactivestreams.{ Subscriber, Subscription }
import scala.collection.mutable import scala.collection.mutable
import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal import scala.util.control.NonFatal
/** /**
@ -19,15 +19,15 @@ private[akka] object TickPublisher {
Props(new TickPublisher(initialDelay, interval, tick, settings)).withDispatcher(settings.dispatcher) Props(new TickPublisher(initialDelay, interval, tick, settings)).withDispatcher(settings.dispatcher)
object TickPublisherSubscription { object TickPublisherSubscription {
case class Cancel(subscriber: Subscriber[Any]) case class Cancel(subscriber: Subscriber[_ >: Any])
case class RequestMore(elements: Int, subscriber: Subscriber[Any]) case class RequestMore(elements: Long, subscriber: Subscriber[_ >: Any])
} }
class TickPublisherSubscription(ref: ActorRef, subscriber: Subscriber[Any]) extends Subscription { class TickPublisherSubscription(ref: ActorRef, subscriber: Subscriber[_ >: Any]) extends Subscription {
import akka.stream.impl.TickPublisher.TickPublisherSubscription._ import akka.stream.impl.TickPublisher.TickPublisherSubscription._
def cancel(): Unit = ref ! Cancel(subscriber) def cancel(): Unit = ref ! Cancel(subscriber)
def request(elements: Int): Unit = def request(elements: Long): Unit =
if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0") if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg)
else ref ! RequestMore(elements, subscriber) else ref ! RequestMore(elements, subscriber)
override def toString = "TickPublisherSubscription" override def toString = "TickPublisherSubscription"
} }
@ -47,7 +47,7 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite
import akka.stream.impl.TickPublisher._ import akka.stream.impl.TickPublisher._
var exposedPublisher: ActorPublisher[Any] = _ var exposedPublisher: ActorPublisher[Any] = _
val demand = mutable.Map.empty[Subscriber[Any], Long] val demand = mutable.Map.empty[Subscriber[_ >: Any], Long]
override val supervisorStrategy = SupervisorStrategy.stoppingStrategy override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
@ -97,9 +97,9 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite
} }
def registerSubscriber(subscriber: Subscriber[Any]): Unit = { def registerSubscriber(subscriber: Subscriber[_ >: Any]): Unit = {
if (demand.contains(subscriber)) if (demand.contains(subscriber))
subscriber.onError(new IllegalStateException(s"Cannot subscribe $subscriber twice")) subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsConstants.CanNotSubscribeTheSameSubscriberMultipleTimes}"))
else { else {
val subscription = new TickPublisherSubscription(self, subscriber) val subscription = new TickPublisherSubscription(self, subscriber)
demand(subscriber) = 0 demand(subscriber) = 0
@ -107,7 +107,7 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite
} }
} }
private def unregisterSubscriber(subscriber: Subscriber[Any]): Unit = { private def unregisterSubscriber(subscriber: Subscriber[_ >: Any]): Unit = {
demand -= subscriber demand -= subscriber
if (demand.isEmpty) { if (demand.isEmpty) {
exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason) exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason)

View file

@ -378,7 +378,7 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph
// FIXME remove when real materialization is done // FIXME remove when real materialization is done
def dummyProcessor(name: String): Processor[Any, Any] = new BlackholeSubscriber[Any](1) with Publisher[Any] with Processor[Any, Any] { def dummyProcessor(name: String): Processor[Any, Any] = new BlackholeSubscriber[Any](1) with Publisher[Any] with Processor[Any, Any] {
def subscribe(subscriber: Subscriber[Any]): Unit = subscriber.onComplete() def subscribe(subscriber: Subscriber[_ >: Any]): Unit = subscriber.onComplete()
override def toString = name override def toString = name
} }

View file

@ -1,14 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import org.scalatest.testng.TestNGSuiteLike
import akka.stream.scaladsl.Flow
import akka.actor.ActorSystem
import akka.stream.testkit.AkkaSpec
class ActorPublisherTest(_system: ActorSystem, /*env: TestEnvironment,*/ publisherShutdownTimeout: Long) {
// FIXME: Needs new TCK version
// Original code available in 82734877d080577cf538c2a47d60c117e078ac1c
}

View file

@ -20,19 +20,19 @@ class FlowDropWithinSpec extends AkkaSpec {
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]()
Flow(p).dropWithin(1.second).produceTo(c) Flow(p).dropWithin(1.second).produceTo(c)
val pSub = p.expectSubscription val pSub = p.expectSubscription()
val cSub = c.expectSubscription val cSub = c.expectSubscription()
cSub.request(100) cSub.request(100)
val demand1 = pSub.expectRequest val demand1 = pSub.expectRequest()
(1 to demand1) foreach { _ pSub.sendNext(input.next()) } (1 to demand1.toInt) foreach { _ pSub.sendNext(input.next()) }
val demand2 = pSub.expectRequest val demand2 = pSub.expectRequest()
(1 to demand2) foreach { _ pSub.sendNext(input.next()) } (1 to demand2.toInt) foreach { _ pSub.sendNext(input.next()) }
val demand3 = pSub.expectRequest val demand3 = pSub.expectRequest()
c.expectNoMsg(1500.millis) c.expectNoMsg(1500.millis)
(1 to demand3) foreach { _ pSub.sendNext(input.next()) } (1 to demand3.toInt) foreach { _ pSub.sendNext(input.next()) }
((demand1 + demand2 + 1) to (demand1 + demand2 + demand3)) foreach { n c.expectNext(n) } ((demand1 + demand2 + 1).toInt to (demand1 + demand2 + demand3).toInt) foreach { n c.expectNext(n) }
pSub.sendComplete() pSub.sendComplete()
c.expectComplete c.expectComplete()
c.expectNoMsg(200.millis) c.expectNoMsg(200.millis)
} }

View file

@ -23,7 +23,7 @@ class FlowGroupBySpec extends AkkaSpec {
p.subscribe(probe) p.subscribe(probe)
val subscription = probe.expectSubscription() val subscription = probe.expectSubscription()
def request(demand: Int): Unit = subscription.request(demand) def request(demand: Long): Unit = subscription.request(demand)
def expectNext(elem: Int): Unit = probe.expectNext(elem) def expectNext(elem: Int): Unit = probe.expectNext(elem)
def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max) def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max)
def expectComplete(): Unit = probe.expectComplete() def expectComplete(): Unit = probe.expectComplete()

View file

@ -25,35 +25,35 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
Flow(p).groupedWithin(1000, 1.second).produceTo(c) Flow(p).groupedWithin(1000, 1.second).produceTo(c)
val pSub = p.expectSubscription val pSub = p.expectSubscription()
val cSub = c.expectSubscription val cSub = c.expectSubscription()
cSub.request(100) cSub.request(100)
val demand1 = pSub.expectRequest val demand1 = pSub.expectRequest()
(1 to demand1) foreach { _ pSub.sendNext(input.next()) } (1 to demand1.toInt) foreach { _ pSub.sendNext(input.next()) }
val demand2 = pSub.expectRequest val demand2 = pSub.expectRequest()
(1 to demand2) foreach { _ pSub.sendNext(input.next()) } (1 to demand2.toInt) foreach { _ pSub.sendNext(input.next()) }
val demand3 = pSub.expectRequest val demand3 = pSub.expectRequest()
c.expectNext((1 to (demand1 + demand2)).toVector) c.expectNext((1 to (demand1 + demand2).toInt).toVector)
(1 to demand3) foreach { _ pSub.sendNext(input.next()) } (1 to demand3.toInt) foreach { _ pSub.sendNext(input.next()) }
c.expectNoMsg(300.millis) c.expectNoMsg(300.millis)
c.expectNext(((demand1 + demand2 + 1) to (demand1 + demand2 + demand3)).toVector) c.expectNext(((demand1 + demand2 + 1).toInt to (demand1 + demand2 + demand3).toInt).toVector)
c.expectNoMsg(300.millis) c.expectNoMsg(300.millis)
pSub.expectRequest pSub.expectRequest()
val last = input.next() val last = input.next()
pSub.sendNext(last) pSub.sendNext(last)
pSub.sendComplete() pSub.sendComplete()
c.expectNext(List(last)) c.expectNext(List(last))
c.expectComplete c.expectComplete()
c.expectNoMsg(200.millis) c.expectNoMsg(200.millis)
} }
"deliver bufferd elements onComplete before the timeout" in { "deliver bufferd elements onComplete before the timeout" in {
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
Flow(1 to 3).groupedWithin(1000, 10.second).produceTo(c) Flow(1 to 3).groupedWithin(1000, 10.second).produceTo(c)
val cSub = c.expectSubscription val cSub = c.expectSubscription()
cSub.request(100) cSub.request(100)
c.expectNext((1 to 3).toList) c.expectNext((1 to 3).toList)
c.expectComplete c.expectComplete()
c.expectNoMsg(200.millis) c.expectNoMsg(200.millis)
} }
@ -62,19 +62,19 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
Flow(p).groupedWithin(1000, 1.second).produceTo(c) Flow(p).groupedWithin(1000, 1.second).produceTo(c)
val pSub = p.expectSubscription val pSub = p.expectSubscription()
val cSub = c.expectSubscription val cSub = c.expectSubscription()
cSub.request(1) cSub.request(1)
val demand1 = pSub.expectRequest val demand1 = pSub.expectRequest()
(1 to demand1) foreach { _ pSub.sendNext(input.next()) } (1 to demand1.toInt) foreach { _ pSub.sendNext(input.next()) }
c.expectNext((1 to demand1).toVector) c.expectNext((1 to demand1.toInt).toVector)
val demand2 = pSub.expectRequest val demand2 = pSub.expectRequest()
(1 to demand2) foreach { _ pSub.sendNext(input.next()) } (1 to demand2.toInt) foreach { _ pSub.sendNext(input.next()) }
c.expectNoMsg(300.millis) c.expectNoMsg(300.millis)
cSub.request(1) cSub.request(1)
c.expectNext(((demand1 + 1) to (demand1 + demand2)).toVector) c.expectNext(((demand1 + 1).toInt to (demand1 + demand2).toInt).toVector)
pSub.sendComplete() pSub.sendComplete()
c.expectComplete c.expectComplete()
c.expectNoMsg(100.millis) c.expectNoMsg(100.millis)
} }
@ -82,10 +82,10 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
Flow(p).groupedWithin(1000, 500.millis).produceTo(c) Flow(p).groupedWithin(1000, 500.millis).produceTo(c)
val pSub = p.expectSubscription val pSub = p.expectSubscription()
val cSub = c.expectSubscription val cSub = c.expectSubscription()
cSub.request(2) cSub.request(2)
pSub.expectRequest pSub.expectRequest()
c.expectNoMsg(600.millis) c.expectNoMsg(600.millis)
pSub.sendNext(1) pSub.sendNext(1)
pSub.sendNext(2) pSub.sendNext(2)
@ -95,7 +95,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
cSub.request(3) cSub.request(3)
c.expectNoMsg(600.millis) c.expectNoMsg(600.millis)
pSub.sendComplete() pSub.sendComplete()
c.expectComplete c.expectComplete()
c.expectNoMsg(100.millis) c.expectNoMsg(100.millis)
} }
@ -104,13 +104,13 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]()
Flow(p).groupedWithin(3, 2.second).produceTo(c) Flow(p).groupedWithin(3, 2.second).produceTo(c)
val pSub = p.expectSubscription val pSub = p.expectSubscription()
val cSub = c.expectSubscription val cSub = c.expectSubscription()
cSub.request(4) cSub.request(4)
val demand1 = pSub.expectRequest val demand1 = pSub.expectRequest()
demand1 should be(4) demand1 should be(4)
c.expectNoMsg(1000.millis) c.expectNoMsg(1000.millis)
(1 to demand1) foreach { _ pSub.sendNext(input.next()) } (1 to demand1.toInt) foreach { _ pSub.sendNext(input.next()) }
c.probe.within(1000.millis) { c.probe.within(1000.millis) {
c.expectNext((1 to 3).toVector) c.expectNext((1 to 3).toVector)
} }
@ -119,7 +119,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
c.expectNext(List(4)) c.expectNext(List(4))
} }
pSub.sendComplete() pSub.sendComplete()
c.expectComplete c.expectComplete()
c.expectNoMsg(100.millis) c.expectNoMsg(100.millis)
} }

View file

@ -23,7 +23,7 @@ class FlowSplitWhenSpec extends AkkaSpec {
p.subscribe(probe) p.subscribe(probe)
val subscription = probe.expectSubscription() val subscription = probe.expectSubscription()
def request(demand: Int): Unit = subscription.request(demand) def request(demand: Long): Unit = subscription.request(demand)
def expectNext(elem: Int): Unit = probe.expectNext(elem) def expectNext(elem: Int): Unit = probe.expectNext(elem)
def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max) def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max)
def expectComplete(): Unit = probe.expectComplete() def expectComplete(): Unit = probe.expectComplete()

View file

@ -42,4 +42,4 @@ class FlowTakeSpec extends AkkaSpec with ScriptedTest {
} }
} }

View file

@ -24,16 +24,16 @@ class FlowTakeWithinSpec extends AkkaSpec {
val cSub = c.expectSubscription() val cSub = c.expectSubscription()
cSub.request(100) cSub.request(100)
val demand1 = pSub.expectRequest() val demand1 = pSub.expectRequest()
(1 to demand1) foreach { _ pSub.sendNext(input.next()) } (1 to demand1.toInt) foreach { _ pSub.sendNext(input.next()) }
val demand2 = pSub.expectRequest() val demand2 = pSub.expectRequest()
(1 to demand2) foreach { _ pSub.sendNext(input.next()) } (1 to demand2.toInt) foreach { _ pSub.sendNext(input.next()) }
val demand3 = pSub.expectRequest() val demand3 = pSub.expectRequest()
val sentN = demand1 + demand2 val sentN = demand1.toInt + demand2.toInt
(1 to sentN) foreach { n c.expectNext(n) } (1 to sentN) foreach { n c.expectNext(n) }
within(2.seconds) { within(2.seconds) {
c.expectComplete() c.expectComplete()
} }
(1 to demand3) foreach { _ pSub.sendNext(input.next()) } (1 to demand3.toInt) foreach { _ pSub.sendNext(input.next()) }
c.expectNoMsg(200.millis) c.expectNoMsg(200.millis)
} }

View file

@ -1,23 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import org.scalatest.testng.TestNGSuiteLike
// FIXME: new TCK needed
//import org.reactivestreams.tck.{ PublisherVerification, TestEnvironment, IdentityProcessorVerification }
import akka.actor.{ ActorSystem, Props }
import akka.stream.impl.ActorProcessor
import akka.stream.impl.TransformProcessorImpl
import akka.stream.impl.Ast
import akka.testkit.{ TestEvent, EventFilter }
import akka.stream.impl.ActorBasedFlowMaterializer
import akka.stream.scaladsl.Flow
import akka.stream.testkit.AkkaSpec
import java.util.concurrent.atomic.AtomicInteger
class IdentityProcessorTest(_system: ActorSystem, /*env: TestEnvironment,*/ publisherShutdownTimeout: Long) {
// FIXME: new TCK needed
// Original code available in 82734877d080577cf538c2a47d60c117e078ac1c
}

View file

@ -1,17 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import org.scalatest.testng.TestNGSuiteLike
// FIXME: Needs new TCK version
//import org.reactivestreams.tck.{ TestEnvironment, PublisherVerification }
import scala.collection.immutable
import akka.stream.scaladsl.Flow
import akka.actor.ActorSystem
import akka.stream.testkit.AkkaSpec
class IterableProducerTest(_system: ActorSystem, /*env: TestEnvironment, */ publisherShutdownTimeout: Long) {
// FIXME: Needs new TCK version
// Original code available in 82734877d080577cf538c2a47d60c117e078ac1c
}

View file

@ -1,16 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import org.scalatest.testng.TestNGSuiteLike
// FIXME: Needs new TCK version
// import org.reactivestreams.tck.{ TestEnvironment, PublisherVerification }
import akka.stream.scaladsl.Flow
import akka.actor.ActorSystem
import akka.stream.testkit.AkkaSpec
class IteratorProducerTest(_system: ActorSystem, /*env: TestEnvironment, */ publisherShutdownTimeout: Long) {
// FIXME: Needs new TCK version
// Original code available in 82734877d080577cf538c2a47d60c117e078ac1c
}

View file

@ -1,16 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import akka.actor.ActorSystem
// FIXME: TestNG dependency comes from TCK. Needs new TCK version
//import org.testng.annotations.AfterClass
trait WithActorSystem {
def system: ActorSystem
// FIXME: TestNG dependency comes from TCK. Needs new TCK version
// @AfterClass
// def shutdownActorSystem(): Unit = system.shutdown()
}

View file

@ -3,13 +3,19 @@
*/ */
package akka.stream.actor package akka.stream.actor
import akka.actor.{ ActorRef, PoisonPill, Props } import akka.actor.ActorRef
import akka.stream.{ MaterializerSettings, FlowMaterializer } import akka.actor.PoisonPill
import akka.actor.Props
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Flow import akka.stream.scaladsl.Flow
import akka.stream.testkit.{ AkkaSpec, StreamTestKit } import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import akka.testkit.EventFilter
import akka.testkit.ImplicitSender
import akka.testkit.TestEvent.Mute import akka.testkit.TestEvent.Mute
import akka.testkit.{ EventFilter, ImplicitSender, TestProbe } import akka.testkit.TestProbe
import scala.annotation.tailrec
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
@ -25,7 +31,7 @@ object ActorPublisherSpec {
case object Complete case object Complete
class TestPublisher(probe: ActorRef) extends ActorPublisher[String] { class TestPublisher(probe: ActorRef) extends ActorPublisher[String] {
import ActorPublisherMessage._ import akka.stream.actor.ActorPublisherMessage._
def receive = { def receive = {
case Request(element) probe ! TotalDemand(totalDemand) case Request(element) probe ! TotalDemand(totalDemand)
@ -39,7 +45,7 @@ object ActorPublisherSpec {
def senderProps: Props = Props[Sender].withDispatcher("akka.test.stream-dispatcher") def senderProps: Props = Props[Sender].withDispatcher("akka.test.stream-dispatcher")
class Sender extends ActorPublisher[Int] { class Sender extends ActorPublisher[Int] {
import ActorPublisherMessage._ import akka.stream.actor.ActorPublisherMessage._
var buf = Vector.empty[Int] var buf = Vector.empty[Int]
@ -57,11 +63,19 @@ object ActorPublisherSpec {
context.stop(self) context.stop(self)
} }
def deliverBuf(): Unit = @tailrec
final def deliverBuf(): Unit =
if (totalDemand > 0) { if (totalDemand > 0) {
val (use, keep) = buf.splitAt(totalDemand) if (totalDemand <= Int.MaxValue) {
buf = keep val (use, keep) = buf.splitAt(totalDemand.toInt)
use foreach onNext buf = keep
use foreach onNext
} else {
val (use, keep) = buf.splitAt(Int.MaxValue)
buf = keep
use foreach onNext
deliverBuf()
}
} }
} }
@ -69,7 +83,7 @@ object ActorPublisherSpec {
Props(new Receiver(probe)).withDispatcher("akka.test.stream-dispatcher") Props(new Receiver(probe)).withDispatcher("akka.test.stream-dispatcher")
class Receiver(probe: ActorRef) extends ActorSubscriber { class Receiver(probe: ActorRef) extends ActorSubscriber {
import ActorSubscriberMessage._ import akka.stream.actor.ActorSubscriberMessage._
override val requestStrategy = WatermarkRequestStrategy(10) override val requestStrategy = WatermarkRequestStrategy(10)
@ -83,7 +97,7 @@ object ActorPublisherSpec {
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorPublisherSpec extends AkkaSpec with ImplicitSender { class ActorPublisherSpec extends AkkaSpec with ImplicitSender {
import ActorPublisherSpec._ import akka.stream.actor.ActorPublisherSpec._
system.eventStream.publish(Mute(EventFilter[IllegalStateException]())) system.eventStream.publish(Mute(EventFilter[IllegalStateException]()))

View file

@ -11,6 +11,7 @@ import scala.concurrent.duration._
class TcpFlowSpec extends AkkaSpec with TcpHelper { class TcpFlowSpec extends AkkaSpec with TcpHelper {
import akka.stream.io.TcpHelper._ import akka.stream.io.TcpHelper._
var demand = 0L
"Outgoing TCP stream" must { "Outgoing TCP stream" must {

View file

@ -161,7 +161,7 @@ trait TcpHelper { this: TestKitBase ⇒
val publisherProbe = StreamTestKit.PublisherProbe[ByteString]() val publisherProbe = StreamTestKit.PublisherProbe[ByteString]()
publisherProbe.subscribe(tcpProcessor) publisherProbe.subscribe(tcpProcessor)
val tcpWriteSubscription = publisherProbe.expectSubscription() val tcpWriteSubscription = publisherProbe.expectSubscription()
var demand = 0 var demand = 0L
def write(bytes: ByteString): Unit = { def write(bytes: ByteString): Unit = {
if (demand == 0) demand += tcpWriteSubscription.expectRequest() if (demand == 0) demand += tcpWriteSubscription.expectRequest()

View file

@ -23,13 +23,13 @@ class FlowDropWithinSpec extends AkkaSpec {
val cSub = c.expectSubscription val cSub = c.expectSubscription
cSub.request(100) cSub.request(100)
val demand1 = pSub.expectRequest val demand1 = pSub.expectRequest
(1 to demand1) foreach { _ pSub.sendNext(input.next()) } (1 to demand1.toInt) foreach { _ pSub.sendNext(input.next()) }
val demand2 = pSub.expectRequest val demand2 = pSub.expectRequest
(1 to demand2) foreach { _ pSub.sendNext(input.next()) } (1 to demand2.toInt) foreach { _ pSub.sendNext(input.next()) }
val demand3 = pSub.expectRequest val demand3 = pSub.expectRequest
c.expectNoMsg(1500.millis) c.expectNoMsg(1500.millis)
(1 to demand3) foreach { _ pSub.sendNext(input.next()) } (1 to demand3.toInt) foreach { _ pSub.sendNext(input.next()) }
((demand1 + demand2 + 1) to (demand1 + demand2 + demand3)) foreach { n c.expectNext(n) } ((demand1 + demand2 + 1).toInt to (demand1 + demand2 + demand3).toInt) foreach { n c.expectNext(n) }
pSub.sendComplete() pSub.sendComplete()
c.expectComplete c.expectComplete
c.expectNoMsg(200.millis) c.expectNoMsg(200.millis)

View file

@ -28,15 +28,15 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
val pSub = p.expectSubscription val pSub = p.expectSubscription
val cSub = c.expectSubscription val cSub = c.expectSubscription
cSub.request(100) cSub.request(100)
val demand1 = pSub.expectRequest val demand1 = pSub.expectRequest.toInt
(1 to demand1) foreach { _ pSub.sendNext(input.next()) } (1 to demand1) foreach { _ pSub.sendNext(input.next()) }
val demand2 = pSub.expectRequest val demand2 = pSub.expectRequest.toInt
(1 to demand2) foreach { _ pSub.sendNext(input.next()) } (1 to demand2) foreach { _ pSub.sendNext(input.next()) }
val demand3 = pSub.expectRequest val demand3 = pSub.expectRequest.toInt
c.expectNext((1 to (demand1 + demand2)).toVector) c.expectNext((1 to (demand1 + demand2).toInt).toVector)
(1 to demand3) foreach { _ pSub.sendNext(input.next()) } (1 to demand3) foreach { _ pSub.sendNext(input.next()) }
c.expectNoMsg(300.millis) c.expectNoMsg(300.millis)
c.expectNext(((demand1 + demand2 + 1) to (demand1 + demand2 + demand3)).toVector) c.expectNext(((demand1 + demand2 + 1).toInt to (demand1 + demand2 + demand3).toInt).toVector)
c.expectNoMsg(300.millis) c.expectNoMsg(300.millis)
pSub.expectRequest pSub.expectRequest
val last = input.next() val last = input.next()
@ -65,10 +65,10 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
val pSub = p.expectSubscription val pSub = p.expectSubscription
val cSub = c.expectSubscription val cSub = c.expectSubscription
cSub.request(1) cSub.request(1)
val demand1 = pSub.expectRequest val demand1 = pSub.expectRequest.toInt
(1 to demand1) foreach { _ pSub.sendNext(input.next()) } (1 to demand1) foreach { _ pSub.sendNext(input.next()) }
c.expectNext((1 to demand1).toVector) c.expectNext((1 to demand1).toVector)
val demand2 = pSub.expectRequest val demand2 = pSub.expectRequest.toInt
(1 to demand2) foreach { _ pSub.sendNext(input.next()) } (1 to demand2) foreach { _ pSub.sendNext(input.next()) }
c.expectNoMsg(300.millis) c.expectNoMsg(300.millis)
cSub.request(1) cSub.request(1)
@ -107,7 +107,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest {
val pSub = p.expectSubscription val pSub = p.expectSubscription
val cSub = c.expectSubscription val cSub = c.expectSubscription
cSub.request(4) cSub.request(4)
val demand1 = pSub.expectRequest val demand1 = pSub.expectRequest.toInt
demand1 should be(4) demand1 should be(4)
c.expectNoMsg(1000.millis) c.expectNoMsg(1000.millis)
(1 to demand1) foreach { _ pSub.sendNext(input.next()) } (1 to demand1) foreach { _ pSub.sendNext(input.next()) }

View file

@ -22,11 +22,11 @@ class FlowTakeWithinSpec extends AkkaSpec {
val pSub = p.expectSubscription() val pSub = p.expectSubscription()
val cSub = c.expectSubscription() val cSub = c.expectSubscription()
cSub.request(100) cSub.request(100)
val demand1 = pSub.expectRequest() val demand1 = pSub.expectRequest().toInt
(1 to demand1) foreach { _ pSub.sendNext(input.next()) } (1 to demand1) foreach { _ pSub.sendNext(input.next()) }
val demand2 = pSub.expectRequest() val demand2 = pSub.expectRequest().toInt
(1 to demand2) foreach { _ pSub.sendNext(input.next()) } (1 to demand2) foreach { _ pSub.sendNext(input.next()) }
val demand3 = pSub.expectRequest() val demand3 = pSub.expectRequest().toInt
val sentN = demand1 + demand2 val sentN = demand1 + demand2
(1 to sentN) foreach { n c.expectNext(n) } (1 to sentN) foreach { n c.expectNext(n) }
within(2.seconds) { within(2.seconds) {

View file

@ -0,0 +1,48 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.actor.Props
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.Request
import akka.stream.tck.ActorPublisherTest.TestPublisher
import org.reactivestreams.Publisher
object ActorPublisherTest {
case object Produce
case object Loop
case object Complete
class TestPublisher(allElements: Long) extends ActorPublisher[Int] {
val source: Iterator[Int] = (if (allElements == Long.MaxValue) 1 to Int.MaxValue else 0 until allElements.toInt).toIterator
override def receive: Receive = {
case Request(elements)
loopDemand()
case Produce if totalDemand > 0 && !isCompleted && source.hasNext onNext(source.next())
case Produce if !isCompleted && !source.hasNext onComplete()
case Produce if isCompleted // no-op
case _ // no-op
}
def loopDemand() {
val loopUntil = math.min(100, totalDemand)
1 to loopUntil.toInt foreach { _ self ! Produce }
if (loopUntil > 100) self ! Loop
}
}
}
class ActorPublisherTest extends AkkaPublisherVerification[Int](true) {
override def createPublisher(elements: Long): Publisher[Int] = {
val ref = system.actorOf(Props(classOf[TestPublisher], elements).withDispatcher("akka.test.stream-dispatcher"))
ActorPublisher(ref)
}
}

View file

@ -0,0 +1,29 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.actor.Props
import akka.stream.actor.ActorSubscriber
import akka.stream.actor.OneByOneRequestStrategy
import akka.stream.actor.RequestStrategy
import org.reactivestreams.Subscriber
object ActorSubscriberOneByOneRequestTest {
class StrategySubscriber(val requestStrategy: RequestStrategy) extends ActorSubscriber {
override def receive: Receive = { case _ }
}
}
class ActorSubscriberOneByOneRequestTest extends AkkaSubscriberBlackboxVerification[Int] {
import ActorSubscriberOneByOneRequestTest._
override def createSubscriber(): Subscriber[Int] = {
val props = Props(classOf[StrategySubscriber], OneByOneRequestStrategy)
ActorSubscriber(system.actorOf(props.withDispatcher("akka.test.stream-dispatcher")))
}
override def createHelperPublisher(elements: Long) =
createSimpleIntPublisher(elements)
}

View file

@ -0,0 +1,56 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.Flow
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import akka.testkit.EventFilter
import akka.testkit.TestEvent
import org.reactivestreams.Publisher
import org.reactivestreams.tck.IdentityProcessorVerification
import org.reactivestreams.tck.TestEnvironment
import org.scalatest.testng.TestNGSuiteLike
import scala.collection.immutable
abstract class AkkaIdentityProcessorVerification[T](val system: ActorSystem, env: TestEnvironment, publisherShutdownTimeout: Long)
extends IdentityProcessorVerification[T](env, publisherShutdownTimeout)
with TestNGSuiteLike {
system.eventStream.publish(TestEvent.Mute(EventFilter[RuntimeException]("Test exception")))
/** Readable way to ignore TCK specs; Return this for `createErrorStatePublisher` to skip tests including it */
final def ignored: Publisher[T] = null
def this(system: ActorSystem, printlnDebug: Boolean) {
this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system), printlnDebug), Timeouts.publisherShutdownTimeoutMillis)
}
def this(printlnDebug: Boolean) {
this(ActorSystem(classOf[IterablePublisherTest].getSimpleName, AkkaSpec.testConf), printlnDebug)
}
def this() {
this(false)
}
override def skipStochasticTests() = true // TODO maybe enable?
override def createErrorStatePublisher(): Publisher[T] =
StreamTestKit.errorPublisher(new Exception("Unable to serve subscribers right now!"))
def createSimpleIntPublisher(elements: Long)(implicit mat: FlowMaterializer): Publisher[Int] = {
val iterable: immutable.Iterable[Int] =
if (elements == Long.MaxValue) 1 to Int.MaxValue
else 0 until elements.toInt
Flow(iterable).toPublisher()
}
/** By default Akka Publishers do not support Fanout! */
override def maxSupportedSubscribers: Long = 1L
}

View file

@ -0,0 +1,47 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import org.reactivestreams.Publisher
import org.reactivestreams.tck.{ PublisherVerification, TestEnvironment }
import org.scalatest.testng.TestNGSuiteLike
import org.testng.annotations.AfterClass
import scala.concurrent.duration._
abstract class AkkaPublisherVerification[T](val system: ActorSystem, env: TestEnvironment, publisherShutdownTimeout: Long)
extends PublisherVerification[T](env, publisherShutdownTimeout)
with TestNGSuiteLike {
/** Readable way to ignore TCK specs; Return this for `createErrorStatePublisher` to skip tests including it */
final def ignored: Publisher[T] = null
def this(system: ActorSystem, printlnDebug: Boolean) {
this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system), printlnDebug), Timeouts.publisherShutdownTimeoutMillis)
}
def this(printlnDebug: Boolean) {
this(ActorSystem(classOf[IterablePublisherTest].getSimpleName, AkkaSpec.testConf), printlnDebug)
}
def this() {
this(false)
}
implicit val materializer = FlowMaterializer(MaterializerSettings(system).copy(maxInputBufferSize = 512))(system)
override def skipStochasticTests() = true // TODO maybe enable?
@AfterClass
def shutdownActorSystem(): Unit = {
system.shutdown()
system.awaitTermination(10.seconds)
}
override def createErrorStatePublisher(): Publisher[T] =
StreamTestKit.errorPublisher(new Exception("Unable to serve subscribers right now!"))
}

View file

@ -0,0 +1,75 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.MaterializerSettings
import akka.stream.Timeouts
import akka.stream.scaladsl.Flow
import akka.stream.testkit.AkkaSpec
import org.reactivestreams.Publisher
import org.reactivestreams.tck.SubscriberBlackboxVerification
import org.reactivestreams.tck.SubscriberWhiteboxVerification
import org.reactivestreams.tck.TestEnvironment
import org.scalatest.testng.TestNGSuiteLike
import org.testng.annotations.AfterClass
import scala.collection.immutable
import scala.concurrent.duration._
abstract class AkkaSubscriberBlackboxVerification[T](val system: ActorSystem, env: TestEnvironment)
extends SubscriberBlackboxVerification[T](env) with TestNGSuiteLike
with AkkaSubscriberVerificationLike {
def this(system: ActorSystem, printlnDebug: Boolean) {
this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system), printlnDebug))
}
def this(printlnDebug: Boolean) {
this(ActorSystem(classOf[IterablePublisherTest].getSimpleName, AkkaSpec.testConf), printlnDebug)
}
def this() {
this(false)
}
}
abstract class AkkaSubscriberWhiteboxVerification[T](val system: ActorSystem, env: TestEnvironment)
extends SubscriberWhiteboxVerification[T](env) with TestNGSuiteLike
with AkkaSubscriberVerificationLike {
def this(system: ActorSystem, printlnDebug: Boolean) {
this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system), printlnDebug))
}
def this(printlnDebug: Boolean) {
this(ActorSystem(classOf[IterablePublisherTest].getSimpleName, AkkaSpec.testConf), printlnDebug)
}
def this() {
this(false)
}
}
trait AkkaSubscriberVerificationLike {
implicit def system: ActorSystem
implicit val materializer = FlowMaterializer(MaterializerSettings(system))
def createSimpleIntPublisher(elements: Long): Publisher[Int] = {
val iterable: immutable.Iterable[Int] =
if (elements == Long.MaxValue) 1 to Int.MaxValue
else 0 until elements.toInt
Flow(iterable).toPublisher()
}
@AfterClass
def shutdownActorSystem(): Unit = {
system.shutdown()
system.awaitTermination(10.seconds)
}
}

View file

@ -0,0 +1,17 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.stream.impl.BlackholeSubscriber
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
class BlackholeSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] {
override def createSubscriber(): Subscriber[Int] =
new BlackholeSubscriber[Int](2)
override def createHelperPublisher(elements: Long): Publisher[Int] = createSimpleIntPublisher(elements)
}

View file

@ -0,0 +1,41 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import java.util.concurrent.atomic.AtomicInteger
import akka.stream._
import akka.stream.impl.ActorBasedFlowMaterializer
import akka.stream.impl.Ast
import org.reactivestreams.Processor
import org.reactivestreams.Publisher
class FanoutProcessorTest extends AkkaIdentityProcessorVerification[Int] {
val processorCounter = new AtomicInteger
override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = {
val settings = MaterializerSettings(system)
.withInputBuffer(initialSize = maxBufferSize / 2, maxSize = maxBufferSize)
implicit val materializer = FlowMaterializer(settings)(system)
val flowName = getClass.getSimpleName + "-" + processorCounter.incrementAndGet()
val processor = materializer.asInstanceOf[ActorBasedFlowMaterializer].processorForNode(
Ast.FanoutBox(initialBufferSize = maxBufferSize / 2, maxBufferSize), flowName, 1)
processor.asInstanceOf[Processor[Int, Int]]
}
override def createHelperPublisher(elements: Long): Publisher[Int] = {
implicit val mat = FlowMaterializer()(system)
createSimpleIntPublisher(elements)(mat)
}
/** The Fanout Processor actually supports fanout */
override def maxElementsFromPublisher = Long.MaxValue
}

View file

@ -0,0 +1,23 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.stream.scaladsl.Flow
import org.reactivestreams._
import scala.collection.immutable
class IterablePublisherTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
val iterable: immutable.Iterable[Int] =
if (elements == Long.MaxValue)
new immutable.Iterable[Int] { override def iterator = Iterator from 0 }
else
0 until elements.toInt
Flow(iterable).toPublisher()
}
}

View file

@ -0,0 +1,21 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.stream.scaladsl.Flow
import org.reactivestreams.Publisher
import scala.collection.immutable
class IteratorPublisherTest extends AkkaPublisherVerification[Int](true) {
def createPublisher(elements: Long): Publisher[Int] = {
val iterable: immutable.Iterable[Int] =
if (elements == 0) new immutable.Iterable[Int] { override def iterator = Iterator from 0 }
else 0 until elements.toInt
Flow(iterable).toPublisher()
}
}

View file

@ -0,0 +1,17 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.stream.scaladsl.Flow
import org.reactivestreams._
class SimpleCallbackPublisherTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
val iter = Iterator from 0
val iter2 = if (elements > 0) iter take elements.toInt else iter
Flow(() if (iter2.hasNext) Some(iter2.next()) else None).toPublisher()
}
}

View file

@ -0,0 +1,43 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import java.util.concurrent.atomic.AtomicInteger
import akka.stream._
import akka.stream.impl.ActorBasedFlowMaterializer
import akka.stream.impl.Ast
import org.reactivestreams.Processor
import org.reactivestreams.Publisher
class TransformProcessorTest extends AkkaIdentityProcessorVerification[Int] {
val processorCounter = new AtomicInteger
override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = {
val settings = MaterializerSettings(system)
.withInputBuffer(initialSize = maxBufferSize / 2, maxSize = maxBufferSize)
implicit val materializer = FlowMaterializer(settings)(system)
val flowName = getClass.getSimpleName + "-" + processorCounter.incrementAndGet()
val mkTransformer = ()
new Transformer[Any, Any] {
override def onNext(in: Any) = List(in)
}
val processor = materializer.asInstanceOf[ActorBasedFlowMaterializer].processorForNode(
Ast.Transform("transform", mkTransformer), flowName, 1)
processor.asInstanceOf[Processor[Int, Int]]
}
override def createHelperPublisher(elements: Long): Publisher[Int] = {
implicit val mat = FlowMaterializer()(system)
createSimpleIntPublisher(elements)(mat)
}
}

View file

@ -89,11 +89,11 @@ trait ScriptedTest extends Matchers {
var currentScript = script var currentScript = script
var remainingDemand = script.expectedOutputs.size + ThreadLocalRandom.current().nextInt(1, maximumOverrun) var remainingDemand = script.expectedOutputs.size + ThreadLocalRandom.current().nextInt(1, maximumOverrun)
debugLog(s"starting with remainingDemand=$remainingDemand") debugLog(s"starting with remainingDemand=$remainingDemand")
var pendingRequests = 0 var pendingRequests: Long = 0
var outstandingDemand = 0 var outstandingDemand: Long = 0
var completed = false var completed = false
def getNextDemand(): Int = { def getNextDemand(): Long = {
val max = Math.min(remainingDemand, maximumRequest) val max = Math.min(remainingDemand, maximumRequest)
if (max == 1) { if (max == 1) {
remainingDemand = 0 remainingDemand = 0
@ -107,7 +107,7 @@ trait ScriptedTest extends Matchers {
def debugLog(msg: String): Unit = _debugLog :+= msg def debugLog(msg: String): Unit = _debugLog :+= msg
def request(demand: Int): Unit = { def request(demand: Long): Unit = {
debugLog(s"test environment requests $demand") debugLog(s"test environment requests $demand")
downstreamSubscription.request(demand) downstreamSubscription.request(demand)
outstandingDemand += demand outstandingDemand += demand

View file

@ -16,7 +16,7 @@ object StreamTestKit {
* Subscribes the subscriber and completes after the first request. * Subscribes the subscriber and completes after the first request.
*/ */
def lazyEmptyPublisher[T]: Publisher[T] = new Publisher[T] { def lazyEmptyPublisher[T]: Publisher[T] = new Publisher[T] {
override def subscribe(subscriber: Subscriber[T]): Unit = override def subscribe(subscriber: Subscriber[_ >: T]): Unit =
subscriber.onSubscribe(CompletedSubscription(subscriber)) subscriber.onSubscribe(CompletedSubscription(subscriber))
} }
@ -31,21 +31,21 @@ object StreamTestKit {
* Subscribes the subscriber and signals error after the first request. * Subscribes the subscriber and signals error after the first request.
*/ */
def lazyErrorPublisher[T](cause: Throwable): Publisher[T] = new Publisher[T] { def lazyErrorPublisher[T](cause: Throwable): Publisher[T] = new Publisher[T] {
override def subscribe(subscriber: Subscriber[T]): Unit = override def subscribe(subscriber: Subscriber[_ >: T]): Unit =
subscriber.onSubscribe(FailedSubscription(subscriber, cause)) subscriber.onSubscribe(FailedSubscription(subscriber, cause))
} }
private case class FailedSubscription[T](subscriber: Subscriber[T], cause: Throwable) extends Subscription { private case class FailedSubscription[T](subscriber: Subscriber[T], cause: Throwable) extends Subscription {
override def request(elements: Int): Unit = subscriber.onError(cause) override def request(elements: Long): Unit = subscriber.onError(cause)
override def cancel(): Unit = () override def cancel(): Unit = ()
} }
private case class CompletedSubscription[T](subscriber: Subscriber[T]) extends Subscription { private case class CompletedSubscription[T](subscriber: Subscriber[T]) extends Subscription {
override def request(elements: Int): Unit = subscriber.onComplete() override def request(elements: Long): Unit = subscriber.onComplete()
override def cancel(): Unit = () override def cancel(): Unit = ()
} }
class AutoPublisher[T](probe: PublisherProbe[T], initialPendingRequests: Int = 0) { class AutoPublisher[T](probe: PublisherProbe[T], initialPendingRequests: Long = 0) {
val subscription = probe.expectSubscription() val subscription = probe.expectSubscription()
var pendingRequests = initialPendingRequests var pendingRequests = initialPendingRequests
@ -65,14 +65,14 @@ object StreamTestKit {
sealed trait PublisherEvent sealed trait PublisherEvent
case class Subscribe(subscription: Subscription) extends PublisherEvent case class Subscribe(subscription: Subscription) extends PublisherEvent
case class CancelSubscription(subscription: Subscription) extends PublisherEvent case class CancelSubscription(subscription: Subscription) extends PublisherEvent
case class RequestMore(subscription: Subscription, elements: Int) extends PublisherEvent case class RequestMore(subscription: Subscription, elements: Long) extends PublisherEvent
case class PublisherProbeSubscription[I](subscriber: Subscriber[I], publisherProbe: TestProbe) extends Subscription { case class PublisherProbeSubscription[I](subscriber: Subscriber[_ >: I], publisherProbe: TestProbe) extends Subscription {
def request(elements: Int): Unit = publisherProbe.ref ! RequestMore(this, elements) def request(elements: Long): Unit = publisherProbe.ref ! RequestMore(this, elements)
def cancel(): Unit = publisherProbe.ref ! CancelSubscription(this) def cancel(): Unit = publisherProbe.ref ! CancelSubscription(this)
def expectRequest(n: Int): Unit = publisherProbe.expectMsg(RequestMore(this, n)) def expectRequest(n: Long): Unit = publisherProbe.expectMsg(RequestMore(this, n))
def expectRequest(): Int = publisherProbe.expectMsgPF() { def expectRequest(): Long = publisherProbe.expectMsgPF() {
case RequestMore(sub, n) if sub eq this n case RequestMore(sub, n) if sub eq this n
} }
@ -142,7 +142,7 @@ object StreamTestKit {
case class PublisherProbe[I]()(implicit system: ActorSystem) extends Publisher[I] { case class PublisherProbe[I]()(implicit system: ActorSystem) extends Publisher[I] {
val probe: TestProbe = TestProbe() val probe: TestProbe = TestProbe()
def subscribe(subscriber: Subscriber[I]): Unit = { def subscribe(subscriber: Subscriber[_ >: I]): Unit = {
val subscription: PublisherProbeSubscription[I] = new PublisherProbeSubscription[I](subscriber, probe) val subscription: PublisherProbeSubscription[I] = new PublisherProbeSubscription[I](subscriber, probe)
probe.ref ! Subscribe(subscription) probe.ref ! Subscribe(subscription)
subscriber.onSubscribe(subscription) subscriber.onSubscribe(subscription)

View file

@ -90,8 +90,8 @@ trait ScriptedTest extends Matchers {
var currentScript = script var currentScript = script
var remainingDemand = script.expectedOutputs.size + ThreadLocalRandom.current().nextInt(1, maximumOverrun) var remainingDemand = script.expectedOutputs.size + ThreadLocalRandom.current().nextInt(1, maximumOverrun)
debugLog(s"starting with remainingDemand=$remainingDemand") debugLog(s"starting with remainingDemand=$remainingDemand")
var pendingRequests = 0 var pendingRequests = 0L
var outstandingDemand = 0 var outstandingDemand = 0L
var completed = false var completed = false
def getNextDemand(): Int = { def getNextDemand(): Int = {