diff --git a/akka-http-core/src/main/scala/akka/http/client/HttpClientProcessor.scala b/akka-http-core/src/main/scala/akka/http/client/HttpClientProcessor.scala index 750a480781..057c452078 100644 --- a/akka-http-core/src/main/scala/akka/http/client/HttpClientProcessor.scala +++ b/akka-http-core/src/main/scala/akka/http/client/HttpClientProcessor.scala @@ -18,7 +18,7 @@ object HttpClientProcessor { def apply[T](requestSubscriber: Subscriber[(HttpRequest, T)], responsePublisher: Publisher[(HttpResponse, T)]): HttpClientProcessor[T] = new HttpClientProcessor[T] { - override def subscribe(s: Subscriber[(HttpResponse, T)]): Unit = responsePublisher.subscribe(s) + override def subscribe(s: Subscriber[_ >: (HttpResponse, T)]): Unit = responsePublisher.subscribe(s) override def onError(t: Throwable): Unit = requestSubscriber.onError(t) override def onSubscribe(s: Subscription): Unit = requestSubscriber.onSubscribe(s) diff --git a/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala b/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala index 261a1d3db3..ef6dff25f7 100644 --- a/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala +++ b/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala @@ -122,7 +122,7 @@ private class PersistentPublisherImpl(persistenceId: String, publisherSettings: } } - override def requestFromUpstream(elements: Int): Unit = + override def requestFromUpstream(elements: Long): Unit = buffer ! Request(elements) override def initialBufferSize = @@ -131,7 +131,7 @@ private class PersistentPublisherImpl(persistenceId: String, publisherSettings: override def maxBufferSize = materializerSettings.maxFanOutBufferSize - override def createSubscription(subscriber: Subscriber[Any]): ActorSubscription[Any] = + override def createSubscription(subscriber: Subscriber[_ >: Any]): ActorSubscription[Any] = new ActorSubscription(self, subscriber) override def cancelUpstream(): Unit = { @@ -151,7 +151,7 @@ private class PersistentPublisherImpl(persistenceId: String, publisherSettings: } private object PersistentPublisherBuffer { - case class Request(num: Int) + case class Request(n: Long) case class Response(events: Vector[Any]) case object Fill @@ -168,31 +168,31 @@ private class PersistentPublisherBuffer(override val persistenceId: String, publ import PersistentPublisherBuffer._ import context.dispatcher - private var replayed = 0 - private var requested = 0 + private var replayed = 0L + private var pendingDemand = 0L private var buffer: Vector[Any] = Vector.empty override def viewId: String = persistenceId + "-stream-view" private val filling: Receive = { case Filled ⇒ - if (buffer.nonEmpty && requested > 0) respond(requested) + if (buffer.nonEmpty && pendingDemand > 0) respond(pendingDemand) if (buffer.nonEmpty) pause() else if (replayed > 0) fill() else schedule() case Request(num) ⇒ - requested += num - if (buffer.nonEmpty) respond(requested) + pendingDemand += num + if (buffer.nonEmpty) respond(pendingDemand) case persistentEvent ⇒ buffer :+= persistentEvent replayed += 1 - if (requested > 0) respond(requested) + if (pendingDemand > 0) respond(pendingDemand) } private val pausing: Receive = { case Request(num) ⇒ - requested += num - respond(requested) + pendingDemand += num + respond(pendingDemand) if (buffer.isEmpty) fill() } @@ -200,7 +200,7 @@ private class PersistentPublisherBuffer(override val persistenceId: String, publ case Fill ⇒ fill() case Request(num) ⇒ - requested += num + pendingDemand += num } def receive = filling @@ -242,10 +242,16 @@ private class PersistentPublisherBuffer(override val persistenceId: String, publ context.system.scheduler.scheduleOnce(autoUpdateInterval, self, Fill) } - private def respond(num: Int): Unit = { - val (res, buf) = buffer.splitAt(num) - publisher ! Response(res) - buffer = buf - requested -= res.size + // TODO Breaks now? + private def respond(num: Long): Unit = { + if (num <= Int.MaxValue) { + val n = num.toInt + val (res, buf) = buffer.splitAt(n) + publisher ! Response(res) + buffer = buf + pendingDemand -= res.size + } else { + respond(Int.MaxValue) + } } } diff --git a/akka-stream/src/main/scala/akka/stream/ReactiveStreamsConstants.scala b/akka-stream/src/main/scala/akka/stream/ReactiveStreamsConstants.scala new file mode 100644 index 0000000000..62fbbec305 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/ReactiveStreamsConstants.scala @@ -0,0 +1,20 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +object ReactiveStreamsConstants { + + final val CanNotSubscribeTheSameSubscriberMultipleTimes = + "can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)" + + final val SupportsOnlyASingleSubscriber = + "only supports one subscriber (which is allowed, see reactive-streams specification, rule 1.12)" + + final val NumberOfElementsInRequestMustBePositiveMsg = + "The number of requested elements must be > 0 (see reactive-streams specification, rule 3.9)" + + final val TotalPendingDemandMustNotExceedLongMaxValue = + "Total pending demand MUST NOT be > `java.lang.Long.MAX_VALUE` (see reactive-streams specification, rule 3.17)" + +} diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala index fc25fdf2e5..9f7b99ac2a 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala @@ -4,6 +4,7 @@ package akka.stream.actor import java.util.concurrent.ConcurrentHashMap +import akka.stream.ReactiveStreamsConstants import org.reactivestreams.{ Publisher, Subscriber, Subscription } import akka.actor.AbstractActor import akka.actor.Actor @@ -47,7 +48,7 @@ object ActorPublisherMessage { * more elements. * @param n number of requested elements */ - @SerialVersionUID(1L) case class Request(n: Int) extends ActorPublisherMessage + @SerialVersionUID(1L) case class Request(n: Long) extends ActorPublisherMessage /** * This message is delivered to the [[ActorPublisher]] actor when the stream subscriber cancels the @@ -122,11 +123,7 @@ trait ActorPublisher[T] extends Actor { * This actor automatically keeps tracks of this amount based on * incoming request messages and outgoing `onNext`. */ - final def totalDemand: Int = longToIntMax(demand) - - private def longToIntMax(n: Long): Int = - if (n > Int.MaxValue) Int.MaxValue - else n.toInt + final def totalDemand: Long = demand /** * The terminal state after calling [[#onComplete]]. It is not allowed to @@ -210,7 +207,7 @@ trait ActorPublisher[T] extends Actor { demand += n super.aroundReceive(receive, msg) - case Subscribe(sub) ⇒ + case Subscribe(sub: Subscriber[_]) ⇒ lifecycleState match { case PreSubscriber ⇒ subscriber = sub @@ -219,12 +216,16 @@ trait ActorPublisher[T] extends Actor { case ErrorEmitted(cause) ⇒ sub.onError(cause) case Completed ⇒ sub.onComplete() case Active | Canceled ⇒ - sub.onError(new IllegalStateException(s"ActorPublisher [$self] can only have one subscriber")) + if (subscriber == sub) + sub.onError(new IllegalStateException(s"ActorPublisher [$self, sub: $sub] ${ReactiveStreamsConstants.CanNotSubscribeTheSameSubscriberMultipleTimes}")) + else + sub.onError(new IllegalStateException(s"ActorPublisher [$self] ${ReactiveStreamsConstants.SupportsOnlyASingleSubscriber}")) } case Cancel ⇒ lifecycleState = Canceled demand = 0 + subscriber = null super.aroundReceive(receive, msg) case _ ⇒ @@ -272,7 +273,7 @@ private[akka] case class ActorPublisherImpl[T](ref: ActorRef) extends Publisher[ import ActorPublisher._ import ActorPublisher.Internal._ - override def subscribe(sub: Subscriber[T]): Unit = + override def subscribe(sub: Subscriber[_ >: T]): Unit = ref ! Subscribe(sub.asInstanceOf[Subscriber[Any]]) } @@ -283,8 +284,8 @@ private[akka] class ActorPublisherSubscription[T](ref: ActorRef) extends Subscri import ActorPublisher._ import ActorPublisherMessage._ - override def request(n: Int): Unit = - if (n <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0") + override def request(n: Long): Unit = + if (n <= 0) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg) else ref ! Request(n) override def cancel(): Unit = ref ! Cancel } diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala index 572a1fff79..a9d25d6007 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala @@ -169,7 +169,7 @@ trait ActorSubscriber extends Actor { private val state = ActorSubscriberState(context.system) private var subscription: Option[Subscription] = None - private var requested = 0L + private var requested: Long = 0 private var canceled = false protected def requestStrategy: RequestStrategy @@ -244,7 +244,7 @@ trait ActorSubscriber extends Actor { /** * Request a number of elements from upstream. */ - protected def request(elements: Int): Unit = + protected def request(elements: Long): Unit = if (elements > 0 && !canceled) { // if we don't have a subscription yet, it will be requested when it arrives subscription.foreach(_.request(elements)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 60cd276a59..3d260e17f3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -3,6 +3,7 @@ */ package akka.stream.impl +import akka.stream.ReactiveStreamsConstants import org.reactivestreams.{ Publisher, Subscriber, Subscription, Processor } import akka.actor._ import akka.stream.MaterializerSettings @@ -142,9 +143,10 @@ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump) } protected def upstreamRunning: Actor.Receive = { - case OnNext(element) ⇒ enqueueInputElement(element) - case OnComplete ⇒ onComplete() - case OnError(cause) ⇒ onError(cause) + case OnNext(element) ⇒ enqueueInputElement(element) + case OnComplete ⇒ onComplete() + case OnError(cause) ⇒ onError(cause) + case OnSubscribe(subscription) ⇒ subscription.cancel() // spec rule 2.5 } protected def completed: Actor.Receive = { @@ -161,6 +163,7 @@ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump) * INTERNAL API */ private[akka] class SimpleOutputs(self: ActorRef, val pump: Pump) extends DefaultOutputTransferStates { + protected var exposedPublisher: ActorPublisher[Any] = _ protected var subscriber: Subscriber[Any] = _ @@ -200,7 +203,7 @@ private[akka] class SimpleOutputs(self: ActorRef, val pump: Pump) extends Defaul if (subscriber eq null) { subscriber = sub subscriber.onSubscribe(new ActorSubscription(self, subscriber)) - } else sub.onError(new IllegalStateException("Cannot subscribe two or more Subscribers to this Publisher")) + } else sub.onError(new IllegalStateException(s"${getClass.getSimpleName} ${ReactiveStreamsConstants.SupportsOnlyASingleSubscriber}")) } protected def waitingExposedPublisher: Actor.Receive = { @@ -215,7 +218,16 @@ private[akka] class SimpleOutputs(self: ActorRef, val pump: Pump) extends Defaul case SubscribePending ⇒ subscribePending(exposedPublisher.takePendingSubscribers()) case RequestMore(subscription, elements) ⇒ + + // TODO centralize overflow protection downstreamDemand += elements + if (downstreamDemand < 0) { + // Long has overflown + val demandOverflowException = new IllegalStateException(ReactiveStreamsConstants.TotalPendingDemandMustNotExceedLongMaxValue) + subscriber.onError(demandOverflowException) + cancel(demandOverflowException) + } + pump.pump() case Cancel(subscription) ⇒ downstreamCompleted = true diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala index 4d8ab7837b..df3efb8e57 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala @@ -4,12 +4,13 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicReference + import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } -import akka.stream.{ MaterializerSettings } +import akka.stream.{ ReactiveStreamsConstants, MaterializerSettings } import org.reactivestreams.{ Publisher, Subscriber } + import scala.annotation.tailrec import scala.collection.immutable -import scala.concurrent.duration.Duration import scala.util.control.{ NoStackTrace, NonFatal } /** @@ -54,10 +55,10 @@ private[akka] class ActorPublisher[T](val impl: ActorRef, val equalityValue: Opt // SubscribePending message. The AtomicReference is set to null by the shutdown method, which is // called by the actor from postStop. Pending (unregistered) subscription attempts are denied by // the shutdown method. Subscription attempts after shutdown can be denied immediately. - private val pendingSubscribers = new AtomicReference[immutable.Seq[Subscriber[T]]](Nil) + private val pendingSubscribers = new AtomicReference[immutable.Seq[Subscriber[_ >: T]]](Nil) - override def subscribe(subscriber: Subscriber[T]): Unit = { - @tailrec def doSubscribe(subscriber: Subscriber[T]): Unit = { + override def subscribe(subscriber: Subscriber[_ >: T]): Unit = { + @tailrec def doSubscribe(subscriber: Subscriber[_ >: T]): Unit = { val current = pendingSubscribers.get if (current eq null) reportSubscribeError(subscriber) @@ -72,7 +73,7 @@ private[akka] class ActorPublisher[T](val impl: ActorRef, val equalityValue: Opt doSubscribe(subscriber) } - def takePendingSubscribers(): immutable.Seq[Subscriber[T]] = { + def takePendingSubscribers(): immutable.Seq[Subscriber[_ >: T]] = { val pending = pendingSubscribers.getAndSet(Nil) assert(pending ne null, "takePendingSubscribers must not be called after shutdown") pending.reverse @@ -88,7 +89,7 @@ private[akka] class ActorPublisher[T](val impl: ActorRef, val equalityValue: Opt @volatile private var shutdownReason: Option[Throwable] = None - private def reportSubscribeError(subscriber: Subscriber[T]): Unit = + private def reportSubscribeError(subscriber: Subscriber[_ >: T]): Unit = shutdownReason match { case Some(e) ⇒ subscriber.onError(e) case None ⇒ subscriber.onComplete() @@ -108,9 +109,9 @@ private[akka] class ActorPublisher[T](val impl: ActorRef, val equalityValue: Opt /** * INTERNAL API */ -private[akka] class ActorSubscription[T]( final val impl: ActorRef, final val subscriber: Subscriber[T]) extends SubscriptionWithCursor[T] { - override def request(elements: Int): Unit = - if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0") +private[akka] class ActorSubscription[T]( final val impl: ActorRef, final val subscriber: Subscriber[_ >: T]) extends SubscriptionWithCursor[T] { + override def request(elements: Long): Unit = + if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg) else impl ! RequestMore(this, elements) override def cancel(): Unit = impl ! Cancel(this) } @@ -149,7 +150,6 @@ private[akka] class SimpleCallbackPublisherImpl[T](f: () ⇒ T, settings: Materi with SubscriberManagement[T] with SoftShutdown { - import akka.stream.impl.ActorBasedFlowMaterializer._ import akka.stream.impl.SimpleCallbackPublisherImpl._ type S = ActorSubscription[T] @@ -184,7 +184,7 @@ private[akka] class SimpleCallbackPublisherImpl[T](f: () ⇒ T, settings: Materi override def postStop(): Unit = if (pub ne null) pub.shutdown(shutdownReason) - private var demand = 0 + private var demand = 0L private def generate(): Unit = { if (demand > 0) { try { @@ -201,10 +201,10 @@ private[akka] class SimpleCallbackPublisherImpl[T](f: () ⇒ T, settings: Materi override def initialBufferSize = settings.initialFanOutBufferSize override def maxBufferSize = settings.maxFanOutBufferSize - override def createSubscription(subscriber: Subscriber[T]): ActorSubscription[T] = + override def createSubscription(subscriber: Subscriber[_ >: T]): ActorSubscription[T] = new ActorSubscription(self, subscriber) - override def requestFromUpstream(elements: Int): Unit = demand += elements + override def requestFromUpstream(elements: Long): Unit = demand += elements override def cancelUpstream(): Unit = { pub.shutdown(shutdownReason) diff --git a/akka-stream/src/main/scala/akka/stream/impl/BlackholeSubscriber.scala b/akka-stream/src/main/scala/akka/stream/impl/BlackholeSubscriber.scala index a9dde4be20..a9de046ea6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/BlackholeSubscriber.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/BlackholeSubscriber.scala @@ -3,6 +3,8 @@ */ package akka.stream.impl +import java.util.concurrent.atomic.AtomicReference + import org.reactivestreams.{ Subscriber, Subscription } /** @@ -12,13 +14,13 @@ import org.reactivestreams.{ Subscriber, Subscription } private[akka] class BlackholeSubscriber[T](highWatermark: Int) extends Subscriber[T] { private val lowWatermark = Math.max(1, highWatermark / 2) - private var requested = 0 + private var requested = 0L - private var subscription: Subscription = _ + private val subscription: AtomicReference[Subscription] = new AtomicReference(null) override def onSubscribe(sub: Subscription): Unit = { - subscription = sub - requestMore() + if (subscription.compareAndSet(null, sub)) requestMore() + else sub.cancel() } override def onError(cause: Throwable): Unit = () @@ -30,10 +32,10 @@ private[akka] class BlackholeSubscriber[T](highWatermark: Int) extends Subscribe requestMore() } - private def requestMore(): Unit = + protected def requestMore(): Unit = if (requested < lowWatermark) { val amount = highWatermark - requested - subscription.request(amount) + subscription.get().request(amount) requested += amount } diff --git a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala index 14a9f7c66f..99403769f3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala @@ -9,7 +9,7 @@ import org.reactivestreams.{ Subscriber, Publisher } * INTERNAL API */ private[akka] case object EmptyPublisher extends Publisher[Nothing] { - def subscribe(subscriber: Subscriber[Nothing]): Unit = subscriber.onComplete() + def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = subscriber.onComplete() def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]] } @@ -17,6 +17,6 @@ private[akka] case object EmptyPublisher extends Publisher[Nothing] { * INTERNAL API */ private[akka] case class ErrorPublisher(t: Throwable) extends Publisher[Nothing] { - def subscribe(subscriber: Subscriber[Nothing]): Unit = subscriber.onError(t) + def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = subscriber.onError(t) def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]] } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala index fa2285fca2..61a58778b6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala @@ -11,12 +11,13 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu extends DefaultOutputTransferStates with SubscriberManagement[Any] { - override type S = ActorSubscription[Any] - override def createSubscription(subscriber: Subscriber[Any]): S = + override type S = ActorSubscription[_ >: Any] + override def createSubscription(subscriber: Subscriber[_ >: Any]): S = new ActorSubscription(self, subscriber) + protected var exposedPublisher: ActorPublisher[Any] = _ - private var downstreamBufferSpace = 0 + private var downstreamBufferSpace: Long = 0L private var downstreamCompleted = false override def demandAvailable = downstreamBufferSpace > 0 override def demandCount: Long = downstreamBufferSpace @@ -46,7 +47,7 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu def afterShutdown(): Unit - override protected def requestFromUpstream(elements: Int): Unit = downstreamBufferSpace += elements + override protected def requestFromUpstream(elements: Long): Unit = downstreamBufferSpace += elements private def subscribePending(): Unit = exposedPublisher.takePendingSubscribers() foreach registerSubscriber diff --git a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala index d11fc1564e..30d7b79b0d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala @@ -3,14 +3,21 @@ */ package akka.stream.impl -import akka.actor.{ Actor, ActorRef, Props, Status, SupervisorStrategy } +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.actor.Status +import akka.actor.SupervisorStrategy import akka.pattern.pipe import akka.stream.MaterializerSettings -import org.reactivestreams.{ Subscriber, Subscription } +import akka.stream.ReactiveStreamsConstants +import org.reactivestreams.Subscriber +import org.reactivestreams.Subscription import scala.concurrent.Future -import scala.concurrent.duration.Duration -import scala.util.{ Failure, Success, Try } +import scala.util.Failure +import scala.util.Success +import scala.util.Try /** * INTERNAL API @@ -27,8 +34,8 @@ private[akka] object FuturePublisher { class FutureSubscription(ref: ActorRef) extends Subscription { import akka.stream.impl.FuturePublisher.FutureSubscription._ def cancel(): Unit = ref ! Cancel(this) - def request(elements: Int): Unit = - if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0") + def request(elements: Long): Unit = + if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg) else ref ! RequestMore(this) override def toString = "FutureSubscription" } @@ -39,7 +46,8 @@ private[akka] object FuturePublisher { */ private[akka] class FuturePublisher(future: Future[Any], settings: MaterializerSettings) extends Actor with SoftShutdown { import akka.stream.impl.FuturePublisher.FutureSubscription - import akka.stream.impl.FuturePublisher.FutureSubscription.{ Cancel, RequestMore } + import akka.stream.impl.FuturePublisher.FutureSubscription.Cancel + import akka.stream.impl.FuturePublisher.FutureSubscription.RequestMore var exposedPublisher: ActorPublisher[Any] = _ var subscribers = Map.empty[Subscriber[Any], FutureSubscription] @@ -98,7 +106,7 @@ private[akka] class FuturePublisher(future: Future[Any], settings: MaterializerS def registerSubscriber(subscriber: Subscriber[Any]): Unit = { if (subscribers.contains(subscriber)) - subscriber.onError(new IllegalStateException(s"Cannot subscribe $subscriber twice")) + subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsConstants.CanNotSubscribeTheSameSubscriberMultipleTimes}")) else { val subscription = new FutureSubscription(self) subscribers = subscribers.updated(subscriber, subscription) diff --git a/akka-stream/src/main/scala/akka/stream/impl/IterablePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/IterablePublisher.scala index 50b90a7322..3ebfc484a9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/IterablePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/IterablePublisher.scala @@ -4,12 +4,11 @@ package akka.stream.impl import akka.actor.{ Actor, ActorRef, Props, SupervisorStrategy, Terminated } -import akka.stream.MaterializerSettings +import akka.stream.{ MaterializerSettings, ReactiveStreamsConstants } import org.reactivestreams.{ Subscriber, Subscription } import scala.annotation.tailrec import scala.collection.immutable -import scala.concurrent.duration.Duration import scala.util.control.NonFatal /** @@ -21,15 +20,15 @@ private[akka] object IterablePublisher { object BasicActorSubscription { case object Cancel - case class RequestMore(elements: Int) + case class RequestMore(elements: Long) } class BasicActorSubscription(ref: ActorRef) extends Subscription { import akka.stream.impl.IterablePublisher.BasicActorSubscription._ def cancel(): Unit = ref ! Cancel - def request(elements: Int): Unit = - if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0") + def request(elements: Long): Unit = + if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg) else ref ! RequestMore(elements) override def toString = "BasicActorSubscription" } @@ -43,7 +42,6 @@ private[akka] object IterablePublisher { * beginning of the iterable and it can consume the elements in its own pace. */ private[akka] class IterablePublisher(iterable: immutable.Iterable[Any], settings: MaterializerSettings) extends Actor with SoftShutdown { - import akka.stream.impl.ActorBasedFlowMaterializer._ import akka.stream.impl.IterablePublisher.BasicActorSubscription require(iterable.nonEmpty, "Use EmptyPublisher for empty iterable") @@ -91,7 +89,7 @@ private[akka] class IterablePublisher(iterable: immutable.Iterable[Any], setting def registerSubscriber(subscriber: Subscriber[Any]): Unit = { if (subscribers(subscriber)) - subscriber.onError(new IllegalStateException(s"Cannot subscribe $subscriber twice")) + subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsConstants.CanNotSubscribeTheSameSubscriberMultipleTimes}")) else { val iterator = iterable.iterator val worker = context.watch(context.actorOf(IterablePublisherWorker.props(iterator, subscriber, @@ -130,17 +128,16 @@ private[akka] object IterablePublisherWorker { */ private[akka] class IterablePublisherWorker(iterator: Iterator[Any], subscriber: Subscriber[Any], maxPush: Int) extends Actor with SoftShutdown { - import akka.stream.impl.ActorBasedFlowMaterializer._ import akka.stream.impl.IterablePublisher.BasicActorSubscription._ import akka.stream.impl.IterablePublisherWorker._ require(iterator.hasNext, "Iterator must not be empty") - var demand = 0L + var pendingDemand: Long = 0L def receive = { case RequestMore(elements) ⇒ - demand += elements + pendingDemand += elements push() case PushMore ⇒ push() @@ -151,8 +148,8 @@ private[akka] class IterablePublisherWorker(iterator: Iterator[Any], subscriber: private def push(): Unit = { @tailrec def doPush(n: Int): Unit = - if (demand > 0) { - demand -= 1 + if (pendingDemand > 0) { + pendingDemand -= 1 val hasNext = { subscriber.onNext(iterator.next()) iterator.hasNext @@ -161,7 +158,7 @@ private[akka] class IterablePublisherWorker(iterator: Iterator[Any], subscriber: subscriber.onComplete() context.parent ! Finished softShutdown() - } else if (n == 0 && demand > 0) + } else if (n == 0 && pendingDemand > 0) self ! PushMore else doPush(n - 1) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Messages.scala b/akka-stream/src/main/scala/akka/stream/impl/Messages.scala index fc1b55b7bc..e74fb5ddfa 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Messages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Messages.scala @@ -12,7 +12,7 @@ private[akka] case object SubscribePending /** * INTERNAL API */ -private[akka] case class RequestMore(subscription: ActorSubscription[_], demand: Int) +private[akka] case class RequestMore(subscription: ActorSubscription[_], demand: Long) /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/impl/StaticFanouts.scala b/akka-stream/src/main/scala/akka/stream/impl/StaticFanouts.scala index ab95a235c1..b213b59dd7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StaticFanouts.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StaticFanouts.scala @@ -4,7 +4,7 @@ package akka.stream.impl import akka.stream.MaterializerSettings -import org.reactivestreams.{ Subscriber, Subscription, Publisher } +import org.reactivestreams.Subscriber /** * INTERNAL API @@ -15,7 +15,7 @@ private[akka] class BroadcastImpl(_settings: MaterializerSettings, other: Subscr override val primaryOutputs = new FanoutOutputs(settings.maxFanOutBufferSize, settings.initialFanOutBufferSize, self, pump = this) { var secondarySubscribed = false - override def registerSubscriber(subscriber: Subscriber[Any]): Unit = { + override def registerSubscriber(subscriber: Subscriber[_ >: Any]): Unit = { if (!secondarySubscribed) { super.registerSubscriber(other) secondarySubscribed = true diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index 04fea12684..5439e70fb4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -4,7 +4,7 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicReference - +import akka.stream.{ ReactiveStreamsConstants, MaterializerSettings } import akka.actor.{ Actor, ActorRef } import akka.stream.MaterializerSettings import org.reactivestreams.{ Publisher, Subscriber, Subscription } @@ -15,13 +15,13 @@ import scala.collection.mutable */ private[akka] object MultiStreamOutputProcessor { case class SubstreamKey(id: Long) - case class SubstreamRequestMore(substream: SubstreamKey, demand: Int) + case class SubstreamRequestMore(substream: SubstreamKey, demand: Long) case class SubstreamCancel(substream: SubstreamKey) case class SubstreamSubscribe(substream: SubstreamKey, subscriber: Subscriber[Any]) class SubstreamSubscription(val parent: ActorRef, val substreamKey: SubstreamKey) extends Subscription { - override def request(elements: Int): Unit = - if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0") + override def request(elements: Long): Unit = + if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg) else parent ! SubstreamRequestMore(substreamKey, elements) override def cancel(): Unit = parent ! SubstreamCancel(substreamKey) override def toString = "SubstreamSubscription" + System.identityHashCode(this) @@ -46,7 +46,7 @@ private[akka] object MultiStreamOutputProcessor { override def subreceive: SubReceive = throw new UnsupportedOperationException("Substream outputs are managed in a dedicated receive block") - def enqueueOutputDemand(demand: Int): Unit = { + def enqueueOutputDemand(demand: Long): Unit = { downstreamDemand += demand pump.pump() } @@ -78,7 +78,7 @@ private[akka] object MultiStreamOutputProcessor { case Failed(e) ⇒ s.onError(e) } - override def subscribe(s: Subscriber[Any]): Unit = { + override def subscribe(s: Subscriber[_ >: Any]): Unit = { if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s) else { state.get() match { diff --git a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala index a8027d2346..215180e1f8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala @@ -3,6 +3,9 @@ */ package akka.stream.impl +import akka.stream.ReactiveStreamsConstants + +import scala.annotation.switch import scala.annotation.tailrec import org.reactivestreams.{ Subscriber, Subscription } import SubscriberManagement.ShutDown @@ -29,19 +32,32 @@ private[akka] object SubscriberManagement { def apply[T](subscriber: Subscriber[T]): Unit = subscriber.onError(cause) } - val ShutDown = new ErrorCompleted(new IllegalStateException("Cannot subscribe to shut-down spi.Publisher")) + val ShutDown = new ErrorCompleted(new IllegalStateException("Cannot subscribe to shut-down Publisher")) } /** * INTERNAL API */ private[akka] trait SubscriptionWithCursor[T] extends Subscription with ResizableMultiReaderRingBuffer.Cursor { - def subscriber: Subscriber[T] + def subscriber: Subscriber[_ >: T] def dispatch(element: T): Unit = subscriber.onNext(element) + /** Increases the `requested` counter, additionally providing overflow protection */ + def moreRequested(demand: Long): Long = { + val sum = totalDemand + demand + val noOverflow = sum > 0 + + if (noOverflow) totalDemand = sum + else subscriber.onError(new IllegalStateException(s"Total pending demand ($totalDemand + $demand) would overflow `Long`, for Subscriber $subscriber! ${ReactiveStreamsConstants.TotalPendingDemandMustNotExceedLongMaxValue}")) + + sum + } + var active = true - var requested: Long = 0 // number of requested but not yet dispatched elements + + /** Do not increment directly, use `moreRequested(Long)` instead (it provides overflow protection)! */ + var totalDemand: Long = 0 // number of requested but not yet dispatched elements var cursor: Int = 0 // buffer cursor, managed by buffer } @@ -60,7 +76,7 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff * called when we are ready to consume more elements from our upstream * MUST NOT call pushToDownstream */ - protected def requestFromUpstream(elements: Int): Unit + protected def requestFromUpstream(elements: Long): Unit /** * called before `shutdown()` if the stream is *not* being regularly completed @@ -76,7 +92,7 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff /** * Use to register a subscriber */ - protected def createSubscription(subscriber: Subscriber[T]): S + protected def createSubscription(subscriber: Subscriber[_ >: T]): S private[this] val buffer = new ResizableMultiReaderRingBuffer[T](initialBufferSize, maxBufferSize, this) @@ -96,9 +112,8 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff /** * more demand was signaled from a given subscriber */ - protected def moreRequested(subscription: S, elements: Int): Unit = + protected def moreRequested(subscription: S, elements: Long): Unit = if (subscription.active) { - // returns Long.MinValue if the subscription is to be terminated @tailrec def dispatchFromBufferAndReturnRemainingRequested(requested: Long, eos: EndOfStream): Long = if (requested == 0) { @@ -112,23 +127,23 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff endOfStream match { case eos @ (NotReached | Completed) ⇒ - val demand = subscription.requested + elements + val demand = subscription.moreRequested(elements) dispatchFromBufferAndReturnRemainingRequested(demand, eos) match { case Long.MinValue ⇒ eos(subscription.subscriber) unregisterSubscriptionInternal(subscription) case x ⇒ - subscription.requested = x + subscription.totalDemand = x requestFromUpstreamIfRequired() } - case ErrorCompleted(_) ⇒ // ignore, the spi.Subscriber might not have seen our error event yet + case ErrorCompleted(_) ⇒ // ignore, the Subscriber might not have seen our error event yet } } private[this] final def requestFromUpstreamIfRequired(): Unit = { @tailrec def maxRequested(remaining: Subscriptions, result: Long = 0): Long = remaining match { - case head :: tail ⇒ maxRequested(tail, math.max(head.requested, result)) + case head :: tail ⇒ maxRequested(tail, math.max(head.totalDemand, result)) case _ ⇒ result } val desired = Math.min(Int.MaxValue, Math.min(maxRequested(subscriptions), buffer.maxAvailable) - pendingFromUpstream).toInt @@ -145,10 +160,10 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff @tailrec def dispatch(remaining: Subscriptions, sent: Boolean = false): Boolean = remaining match { case head :: tail ⇒ - if (head.requested > 0) { + if (head.totalDemand > 0) { val element = buffer.read(head) head.dispatch(element) - head.requested -= 1 + head.totalDemand -= 1 dispatch(tail, true) } else dispatch(tail, sent) case _ ⇒ sent @@ -198,9 +213,9 @@ private[akka] trait SubscriberManagement[T] extends ResizableMultiReaderRingBuff /** * Register a new subscriber. */ - protected def registerSubscriber(subscriber: Subscriber[T]): Unit = endOfStream match { + protected def registerSubscriber(subscriber: Subscriber[_ >: T]): Unit = endOfStream match { case NotReached if subscriptions.exists(_.subscriber eq subscriber) ⇒ - subscriber.onError(new IllegalStateException(s"Cannot subscribe $subscriber twice")) + subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [${this}, sub: $subscriber] ${ReactiveStreamsConstants.CanNotSubscribeTheSameSubscriberMultipleTimes}")) case NotReached ⇒ val newSubscription = createSubscription(subscriber) subscriptions ::= newSubscription diff --git a/akka-stream/src/main/scala/akka/stream/impl/SynchronousPublisherFromIterable.scala b/akka-stream/src/main/scala/akka/stream/impl/SynchronousPublisherFromIterable.scala index 1de4a968ba..b58976d3b4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SynchronousPublisherFromIterable.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SynchronousPublisherFromIterable.scala @@ -19,31 +19,31 @@ private[akka] object SynchronousPublisherFromIterable { private class IteratorSubscription[T](subscriber: Subscriber[T], iterator: Iterator[T]) extends Subscription { var done = false - var demand = 0 + var pendingDemand = 0L var pushing = false override def cancel(): Unit = done = true - override def request(elements: Int): Unit = { + override def request(elements: Long): Unit = { @tailrec def pushNext(): Unit = { if (!done) if (iterator.isEmpty) { done = true subscriber.onComplete() - } else if (demand != 0) { - demand -= 1 + } else if (pendingDemand != 0) { + pendingDemand -= 1 subscriber.onNext(iterator.next()) pushNext() } } if (pushing) - demand += elements // reentrant call to requestMore from onNext + pendingDemand += elements // reentrant call to requestMore from onNext else { try { pushing = true - demand = elements + pendingDemand = elements pushNext() } catch { case NonFatal(e) ⇒ @@ -73,7 +73,7 @@ private[akka] class SynchronousPublisherFromIterable[T](private val iterable: im import akka.stream.impl.SynchronousPublisherFromIterable.IteratorSubscription - override def subscribe(subscriber: Subscriber[T]): Unit = + override def subscribe(subscriber: Subscriber[_ >: T]): Unit = subscriber.onSubscribe(new IteratorSubscription(subscriber, iterable.iterator)) override def equals(o: Any): Boolean = o match { diff --git a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala index 1ef7bc033b..82085f8711 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala @@ -4,11 +4,11 @@ package akka.stream.impl import akka.actor.{ Actor, ActorRef, Cancellable, Props, SupervisorStrategy } -import akka.stream.MaterializerSettings +import akka.stream.{ MaterializerSettings, ReactiveStreamsConstants } import org.reactivestreams.{ Subscriber, Subscription } import scala.collection.mutable -import scala.concurrent.duration.{ Duration, FiniteDuration } +import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal /** @@ -19,15 +19,15 @@ private[akka] object TickPublisher { Props(new TickPublisher(initialDelay, interval, tick, settings)).withDispatcher(settings.dispatcher) object TickPublisherSubscription { - case class Cancel(subscriber: Subscriber[Any]) - case class RequestMore(elements: Int, subscriber: Subscriber[Any]) + case class Cancel(subscriber: Subscriber[_ >: Any]) + case class RequestMore(elements: Long, subscriber: Subscriber[_ >: Any]) } - class TickPublisherSubscription(ref: ActorRef, subscriber: Subscriber[Any]) extends Subscription { + class TickPublisherSubscription(ref: ActorRef, subscriber: Subscriber[_ >: Any]) extends Subscription { import akka.stream.impl.TickPublisher.TickPublisherSubscription._ def cancel(): Unit = ref ! Cancel(subscriber) - def request(elements: Int): Unit = - if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0") + def request(elements: Long): Unit = + if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg) else ref ! RequestMore(elements, subscriber) override def toString = "TickPublisherSubscription" } @@ -47,7 +47,7 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite import akka.stream.impl.TickPublisher._ var exposedPublisher: ActorPublisher[Any] = _ - val demand = mutable.Map.empty[Subscriber[Any], Long] + val demand = mutable.Map.empty[Subscriber[_ >: Any], Long] override val supervisorStrategy = SupervisorStrategy.stoppingStrategy @@ -97,9 +97,9 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite } - def registerSubscriber(subscriber: Subscriber[Any]): Unit = { + def registerSubscriber(subscriber: Subscriber[_ >: Any]): Unit = { if (demand.contains(subscriber)) - subscriber.onError(new IllegalStateException(s"Cannot subscribe $subscriber twice")) + subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsConstants.CanNotSubscribeTheSameSubscriberMultipleTimes}")) else { val subscription = new TickPublisherSubscription(self, subscriber) demand(subscriber) = 0 @@ -107,7 +107,7 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite } } - private def unregisterSubscriber(subscriber: Subscriber[Any]): Unit = { + private def unregisterSubscriber(subscriber: Subscriber[_ >: Any]): Unit = { demand -= subscriber if (demand.isEmpty) { exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala index d710474049..97ae52210e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala @@ -378,7 +378,7 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph // FIXME remove when real materialization is done def dummyProcessor(name: String): Processor[Any, Any] = new BlackholeSubscriber[Any](1) with Publisher[Any] with Processor[Any, Any] { - def subscribe(subscriber: Subscriber[Any]): Unit = subscriber.onComplete() + def subscribe(subscriber: Subscriber[_ >: Any]): Unit = subscriber.onComplete() override def toString = name } diff --git a/akka-stream/src/test/scala/akka/stream/ActorPublisherTest.scala b/akka-stream/src/test/scala/akka/stream/ActorPublisherTest.scala deleted file mode 100644 index 56c677100d..0000000000 --- a/akka-stream/src/test/scala/akka/stream/ActorPublisherTest.scala +++ /dev/null @@ -1,14 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream - -import org.scalatest.testng.TestNGSuiteLike -import akka.stream.scaladsl.Flow -import akka.actor.ActorSystem -import akka.stream.testkit.AkkaSpec - -class ActorPublisherTest(_system: ActorSystem, /*env: TestEnvironment,*/ publisherShutdownTimeout: Long) { - // FIXME: Needs new TCK version - // Original code available in 82734877d080577cf538c2a47d60c117e078ac1c -} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/FlowDropWithinSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowDropWithinSpec.scala index 611d8fb28b..19311ae41c 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowDropWithinSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowDropWithinSpec.scala @@ -20,19 +20,19 @@ class FlowDropWithinSpec extends AkkaSpec { val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[Int]() Flow(p).dropWithin(1.second).produceTo(c) - val pSub = p.expectSubscription - val cSub = c.expectSubscription + val pSub = p.expectSubscription() + val cSub = c.expectSubscription() cSub.request(100) - val demand1 = pSub.expectRequest - (1 to demand1) foreach { _ ⇒ pSub.sendNext(input.next()) } - val demand2 = pSub.expectRequest - (1 to demand2) foreach { _ ⇒ pSub.sendNext(input.next()) } - val demand3 = pSub.expectRequest + val demand1 = pSub.expectRequest() + (1 to demand1.toInt) foreach { _ ⇒ pSub.sendNext(input.next()) } + val demand2 = pSub.expectRequest() + (1 to demand2.toInt) foreach { _ ⇒ pSub.sendNext(input.next()) } + val demand3 = pSub.expectRequest() c.expectNoMsg(1500.millis) - (1 to demand3) foreach { _ ⇒ pSub.sendNext(input.next()) } - ((demand1 + demand2 + 1) to (demand1 + demand2 + demand3)) foreach { n ⇒ c.expectNext(n) } + (1 to demand3.toInt) foreach { _ ⇒ pSub.sendNext(input.next()) } + ((demand1 + demand2 + 1).toInt to (demand1 + demand2 + demand3).toInt) foreach { n ⇒ c.expectNext(n) } pSub.sendComplete() - c.expectComplete + c.expectComplete() c.expectNoMsg(200.millis) } diff --git a/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala index 8e2f2a46bb..a2b9acd0f0 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala @@ -23,7 +23,7 @@ class FlowGroupBySpec extends AkkaSpec { p.subscribe(probe) val subscription = probe.expectSubscription() - def request(demand: Int): Unit = subscription.request(demand) + def request(demand: Long): Unit = subscription.request(demand) def expectNext(elem: Int): Unit = probe.expectNext(elem) def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max) def expectComplete(): Unit = probe.expectComplete() diff --git a/akka-stream/src/test/scala/akka/stream/FlowGroupedWithinSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowGroupedWithinSpec.scala index 0065e19733..451334aa16 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowGroupedWithinSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowGroupedWithinSpec.scala @@ -25,35 +25,35 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() Flow(p).groupedWithin(1000, 1.second).produceTo(c) - val pSub = p.expectSubscription - val cSub = c.expectSubscription + val pSub = p.expectSubscription() + val cSub = c.expectSubscription() cSub.request(100) - val demand1 = pSub.expectRequest - (1 to demand1) foreach { _ ⇒ pSub.sendNext(input.next()) } - val demand2 = pSub.expectRequest - (1 to demand2) foreach { _ ⇒ pSub.sendNext(input.next()) } - val demand3 = pSub.expectRequest - c.expectNext((1 to (demand1 + demand2)).toVector) - (1 to demand3) foreach { _ ⇒ pSub.sendNext(input.next()) } + val demand1 = pSub.expectRequest() + (1 to demand1.toInt) foreach { _ ⇒ pSub.sendNext(input.next()) } + val demand2 = pSub.expectRequest() + (1 to demand2.toInt) foreach { _ ⇒ pSub.sendNext(input.next()) } + val demand3 = pSub.expectRequest() + c.expectNext((1 to (demand1 + demand2).toInt).toVector) + (1 to demand3.toInt) foreach { _ ⇒ pSub.sendNext(input.next()) } c.expectNoMsg(300.millis) - c.expectNext(((demand1 + demand2 + 1) to (demand1 + demand2 + demand3)).toVector) + c.expectNext(((demand1 + demand2 + 1).toInt to (demand1 + demand2 + demand3).toInt).toVector) c.expectNoMsg(300.millis) - pSub.expectRequest + pSub.expectRequest() val last = input.next() pSub.sendNext(last) pSub.sendComplete() c.expectNext(List(last)) - c.expectComplete + c.expectComplete() c.expectNoMsg(200.millis) } "deliver bufferd elements onComplete before the timeout" in { val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() Flow(1 to 3).groupedWithin(1000, 10.second).produceTo(c) - val cSub = c.expectSubscription + val cSub = c.expectSubscription() cSub.request(100) c.expectNext((1 to 3).toList) - c.expectComplete + c.expectComplete() c.expectNoMsg(200.millis) } @@ -62,19 +62,19 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() Flow(p).groupedWithin(1000, 1.second).produceTo(c) - val pSub = p.expectSubscription - val cSub = c.expectSubscription + val pSub = p.expectSubscription() + val cSub = c.expectSubscription() cSub.request(1) - val demand1 = pSub.expectRequest - (1 to demand1) foreach { _ ⇒ pSub.sendNext(input.next()) } - c.expectNext((1 to demand1).toVector) - val demand2 = pSub.expectRequest - (1 to demand2) foreach { _ ⇒ pSub.sendNext(input.next()) } + val demand1 = pSub.expectRequest() + (1 to demand1.toInt) foreach { _ ⇒ pSub.sendNext(input.next()) } + c.expectNext((1 to demand1.toInt).toVector) + val demand2 = pSub.expectRequest() + (1 to demand2.toInt) foreach { _ ⇒ pSub.sendNext(input.next()) } c.expectNoMsg(300.millis) cSub.request(1) - c.expectNext(((demand1 + 1) to (demand1 + demand2)).toVector) + c.expectNext(((demand1 + 1).toInt to (demand1 + demand2).toInt).toVector) pSub.sendComplete() - c.expectComplete + c.expectComplete() c.expectNoMsg(100.millis) } @@ -82,10 +82,10 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() Flow(p).groupedWithin(1000, 500.millis).produceTo(c) - val pSub = p.expectSubscription - val cSub = c.expectSubscription + val pSub = p.expectSubscription() + val cSub = c.expectSubscription() cSub.request(2) - pSub.expectRequest + pSub.expectRequest() c.expectNoMsg(600.millis) pSub.sendNext(1) pSub.sendNext(2) @@ -95,7 +95,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { cSub.request(3) c.expectNoMsg(600.millis) pSub.sendComplete() - c.expectComplete + c.expectComplete() c.expectNoMsg(100.millis) } @@ -104,13 +104,13 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { val p = StreamTestKit.PublisherProbe[Int]() val c = StreamTestKit.SubscriberProbe[immutable.Seq[Int]]() Flow(p).groupedWithin(3, 2.second).produceTo(c) - val pSub = p.expectSubscription - val cSub = c.expectSubscription + val pSub = p.expectSubscription() + val cSub = c.expectSubscription() cSub.request(4) - val demand1 = pSub.expectRequest + val demand1 = pSub.expectRequest() demand1 should be(4) c.expectNoMsg(1000.millis) - (1 to demand1) foreach { _ ⇒ pSub.sendNext(input.next()) } + (1 to demand1.toInt) foreach { _ ⇒ pSub.sendNext(input.next()) } c.probe.within(1000.millis) { c.expectNext((1 to 3).toVector) } @@ -119,7 +119,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { c.expectNext(List(4)) } pSub.sendComplete() - c.expectComplete + c.expectComplete() c.expectNoMsg(100.millis) } diff --git a/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala index 489d33ceec..894739e53b 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala @@ -23,7 +23,7 @@ class FlowSplitWhenSpec extends AkkaSpec { p.subscribe(probe) val subscription = probe.expectSubscription() - def request(demand: Int): Unit = subscription.request(demand) + def request(demand: Long): Unit = subscription.request(demand) def expectNext(elem: Int): Unit = probe.expectNext(elem) def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max) def expectComplete(): Unit = probe.expectComplete() diff --git a/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala index 2f6dacc8b1..391e16ef0d 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala @@ -42,4 +42,4 @@ class FlowTakeSpec extends AkkaSpec with ScriptedTest { } -} \ No newline at end of file +} diff --git a/akka-stream/src/test/scala/akka/stream/FlowTakeWithinSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTakeWithinSpec.scala index 688c000e60..0201520e78 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTakeWithinSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTakeWithinSpec.scala @@ -24,16 +24,16 @@ class FlowTakeWithinSpec extends AkkaSpec { val cSub = c.expectSubscription() cSub.request(100) val demand1 = pSub.expectRequest() - (1 to demand1) foreach { _ ⇒ pSub.sendNext(input.next()) } + (1 to demand1.toInt) foreach { _ ⇒ pSub.sendNext(input.next()) } val demand2 = pSub.expectRequest() - (1 to demand2) foreach { _ ⇒ pSub.sendNext(input.next()) } + (1 to demand2.toInt) foreach { _ ⇒ pSub.sendNext(input.next()) } val demand3 = pSub.expectRequest() - val sentN = demand1 + demand2 + val sentN = demand1.toInt + demand2.toInt (1 to sentN) foreach { n ⇒ c.expectNext(n) } within(2.seconds) { c.expectComplete() } - (1 to demand3) foreach { _ ⇒ pSub.sendNext(input.next()) } + (1 to demand3.toInt) foreach { _ ⇒ pSub.sendNext(input.next()) } c.expectNoMsg(200.millis) } diff --git a/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala b/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala deleted file mode 100644 index a4daef3514..0000000000 --- a/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream - -import org.scalatest.testng.TestNGSuiteLike -// FIXME: new TCK needed -//import org.reactivestreams.tck.{ PublisherVerification, TestEnvironment, IdentityProcessorVerification } -import akka.actor.{ ActorSystem, Props } -import akka.stream.impl.ActorProcessor -import akka.stream.impl.TransformProcessorImpl -import akka.stream.impl.Ast -import akka.testkit.{ TestEvent, EventFilter } -import akka.stream.impl.ActorBasedFlowMaterializer -import akka.stream.scaladsl.Flow -import akka.stream.testkit.AkkaSpec -import java.util.concurrent.atomic.AtomicInteger - -class IdentityProcessorTest(_system: ActorSystem, /*env: TestEnvironment,*/ publisherShutdownTimeout: Long) { - - // FIXME: new TCK needed - // Original code available in 82734877d080577cf538c2a47d60c117e078ac1c -} diff --git a/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala b/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala deleted file mode 100644 index b0f777eb49..0000000000 --- a/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala +++ /dev/null @@ -1,17 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream - -import org.scalatest.testng.TestNGSuiteLike -// FIXME: Needs new TCK version -//import org.reactivestreams.tck.{ TestEnvironment, PublisherVerification } -import scala.collection.immutable -import akka.stream.scaladsl.Flow -import akka.actor.ActorSystem -import akka.stream.testkit.AkkaSpec - -class IterableProducerTest(_system: ActorSystem, /*env: TestEnvironment, */ publisherShutdownTimeout: Long) { - // FIXME: Needs new TCK version - // Original code available in 82734877d080577cf538c2a47d60c117e078ac1c -} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala b/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala deleted file mode 100644 index a4a50d4717..0000000000 --- a/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala +++ /dev/null @@ -1,16 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream - -import org.scalatest.testng.TestNGSuiteLike -// FIXME: Needs new TCK version -// import org.reactivestreams.tck.{ TestEnvironment, PublisherVerification } -import akka.stream.scaladsl.Flow -import akka.actor.ActorSystem -import akka.stream.testkit.AkkaSpec - -class IteratorProducerTest(_system: ActorSystem, /*env: TestEnvironment, */ publisherShutdownTimeout: Long) { - // FIXME: Needs new TCK version - // Original code available in 82734877d080577cf538c2a47d60c117e078ac1c -} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/WithActorSystem.scala b/akka-stream/src/test/scala/akka/stream/WithActorSystem.scala deleted file mode 100644 index eb94bbfb86..0000000000 --- a/akka-stream/src/test/scala/akka/stream/WithActorSystem.scala +++ /dev/null @@ -1,16 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream - -import akka.actor.ActorSystem -// FIXME: TestNG dependency comes from TCK. Needs new TCK version -//import org.testng.annotations.AfterClass - -trait WithActorSystem { - def system: ActorSystem - - // FIXME: TestNG dependency comes from TCK. Needs new TCK version - // @AfterClass - // def shutdownActorSystem(): Unit = system.shutdown() -} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala b/akka-stream/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala index d9cf395f2a..63ce043c33 100644 --- a/akka-stream/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/actor/ActorPublisherSpec.scala @@ -3,13 +3,19 @@ */ package akka.stream.actor -import akka.actor.{ ActorRef, PoisonPill, Props } -import akka.stream.{ MaterializerSettings, FlowMaterializer } +import akka.actor.ActorRef +import akka.actor.PoisonPill +import akka.actor.Props +import akka.stream.FlowMaterializer import akka.stream.scaladsl.Flow -import akka.stream.testkit.{ AkkaSpec, StreamTestKit } +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit +import akka.testkit.EventFilter +import akka.testkit.ImplicitSender import akka.testkit.TestEvent.Mute -import akka.testkit.{ EventFilter, ImplicitSender, TestProbe } +import akka.testkit.TestProbe +import scala.annotation.tailrec import scala.concurrent.duration._ import scala.util.control.NoStackTrace @@ -25,7 +31,7 @@ object ActorPublisherSpec { case object Complete class TestPublisher(probe: ActorRef) extends ActorPublisher[String] { - import ActorPublisherMessage._ + import akka.stream.actor.ActorPublisherMessage._ def receive = { case Request(element) ⇒ probe ! TotalDemand(totalDemand) @@ -39,7 +45,7 @@ object ActorPublisherSpec { def senderProps: Props = Props[Sender].withDispatcher("akka.test.stream-dispatcher") class Sender extends ActorPublisher[Int] { - import ActorPublisherMessage._ + import akka.stream.actor.ActorPublisherMessage._ var buf = Vector.empty[Int] @@ -57,11 +63,19 @@ object ActorPublisherSpec { context.stop(self) } - def deliverBuf(): Unit = + @tailrec + final def deliverBuf(): Unit = if (totalDemand > 0) { - val (use, keep) = buf.splitAt(totalDemand) - buf = keep - use foreach onNext + if (totalDemand <= Int.MaxValue) { + val (use, keep) = buf.splitAt(totalDemand.toInt) + buf = keep + use foreach onNext + } else { + val (use, keep) = buf.splitAt(Int.MaxValue) + buf = keep + use foreach onNext + deliverBuf() + } } } @@ -69,7 +83,7 @@ object ActorPublisherSpec { Props(new Receiver(probe)).withDispatcher("akka.test.stream-dispatcher") class Receiver(probe: ActorRef) extends ActorSubscriber { - import ActorSubscriberMessage._ + import akka.stream.actor.ActorSubscriberMessage._ override val requestStrategy = WatermarkRequestStrategy(10) @@ -83,7 +97,7 @@ object ActorPublisherSpec { @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ActorPublisherSpec extends AkkaSpec with ImplicitSender { - import ActorPublisherSpec._ + import akka.stream.actor.ActorPublisherSpec._ system.eventStream.publish(Mute(EventFilter[IllegalStateException]())) diff --git a/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala b/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala index ba9b1c1b86..4d3f177d91 100644 --- a/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/io/TcpFlowSpec.scala @@ -11,6 +11,7 @@ import scala.concurrent.duration._ class TcpFlowSpec extends AkkaSpec with TcpHelper { import akka.stream.io.TcpHelper._ + var demand = 0L "Outgoing TCP stream" must { diff --git a/akka-stream/src/test/scala/akka/stream/io/TcpHelper.scala b/akka-stream/src/test/scala/akka/stream/io/TcpHelper.scala index 425d4b098b..18a896cea4 100644 --- a/akka-stream/src/test/scala/akka/stream/io/TcpHelper.scala +++ b/akka-stream/src/test/scala/akka/stream/io/TcpHelper.scala @@ -161,7 +161,7 @@ trait TcpHelper { this: TestKitBase ⇒ val publisherProbe = StreamTestKit.PublisherProbe[ByteString]() publisherProbe.subscribe(tcpProcessor) val tcpWriteSubscription = publisherProbe.expectSubscription() - var demand = 0 + var demand = 0L def write(bytes: ByteString): Unit = { if (demand == 0) demand += tcpWriteSubscription.expectRequest() diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowDropWithinSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowDropWithinSpec.scala index 5ea6bef67d..5a9fb633e3 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowDropWithinSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowDropWithinSpec.scala @@ -23,13 +23,13 @@ class FlowDropWithinSpec extends AkkaSpec { val cSub = c.expectSubscription cSub.request(100) val demand1 = pSub.expectRequest - (1 to demand1) foreach { _ ⇒ pSub.sendNext(input.next()) } + (1 to demand1.toInt) foreach { _ ⇒ pSub.sendNext(input.next()) } val demand2 = pSub.expectRequest - (1 to demand2) foreach { _ ⇒ pSub.sendNext(input.next()) } + (1 to demand2.toInt) foreach { _ ⇒ pSub.sendNext(input.next()) } val demand3 = pSub.expectRequest c.expectNoMsg(1500.millis) - (1 to demand3) foreach { _ ⇒ pSub.sendNext(input.next()) } - ((demand1 + demand2 + 1) to (demand1 + demand2 + demand3)) foreach { n ⇒ c.expectNext(n) } + (1 to demand3.toInt) foreach { _ ⇒ pSub.sendNext(input.next()) } + ((demand1 + demand2 + 1).toInt to (demand1 + demand2 + demand3).toInt) foreach { n ⇒ c.expectNext(n) } pSub.sendComplete() c.expectComplete c.expectNoMsg(200.millis) diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGroupedWithinSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGroupedWithinSpec.scala index f9f76e72f2..d5f0797ad0 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGroupedWithinSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGroupedWithinSpec.scala @@ -28,15 +28,15 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { val pSub = p.expectSubscription val cSub = c.expectSubscription cSub.request(100) - val demand1 = pSub.expectRequest + val demand1 = pSub.expectRequest.toInt (1 to demand1) foreach { _ ⇒ pSub.sendNext(input.next()) } - val demand2 = pSub.expectRequest + val demand2 = pSub.expectRequest.toInt (1 to demand2) foreach { _ ⇒ pSub.sendNext(input.next()) } - val demand3 = pSub.expectRequest - c.expectNext((1 to (demand1 + demand2)).toVector) + val demand3 = pSub.expectRequest.toInt + c.expectNext((1 to (demand1 + demand2).toInt).toVector) (1 to demand3) foreach { _ ⇒ pSub.sendNext(input.next()) } c.expectNoMsg(300.millis) - c.expectNext(((demand1 + demand2 + 1) to (demand1 + demand2 + demand3)).toVector) + c.expectNext(((demand1 + demand2 + 1).toInt to (demand1 + demand2 + demand3).toInt).toVector) c.expectNoMsg(300.millis) pSub.expectRequest val last = input.next() @@ -65,10 +65,10 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { val pSub = p.expectSubscription val cSub = c.expectSubscription cSub.request(1) - val demand1 = pSub.expectRequest + val demand1 = pSub.expectRequest.toInt (1 to demand1) foreach { _ ⇒ pSub.sendNext(input.next()) } c.expectNext((1 to demand1).toVector) - val demand2 = pSub.expectRequest + val demand2 = pSub.expectRequest.toInt (1 to demand2) foreach { _ ⇒ pSub.sendNext(input.next()) } c.expectNoMsg(300.millis) cSub.request(1) @@ -107,7 +107,7 @@ class FlowGroupedWithinSpec extends AkkaSpec with ScriptedTest { val pSub = p.expectSubscription val cSub = c.expectSubscription cSub.request(4) - val demand1 = pSub.expectRequest + val demand1 = pSub.expectRequest.toInt demand1 should be(4) c.expectNoMsg(1000.millis) (1 to demand1) foreach { _ ⇒ pSub.sendNext(input.next()) } diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTakeWithinSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTakeWithinSpec.scala index e0f820c634..8c09df30b2 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTakeWithinSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTakeWithinSpec.scala @@ -22,11 +22,11 @@ class FlowTakeWithinSpec extends AkkaSpec { val pSub = p.expectSubscription() val cSub = c.expectSubscription() cSub.request(100) - val demand1 = pSub.expectRequest() + val demand1 = pSub.expectRequest().toInt (1 to demand1) foreach { _ ⇒ pSub.sendNext(input.next()) } - val demand2 = pSub.expectRequest() + val demand2 = pSub.expectRequest().toInt (1 to demand2) foreach { _ ⇒ pSub.sendNext(input.next()) } - val demand3 = pSub.expectRequest() + val demand3 = pSub.expectRequest().toInt val sentN = demand1 + demand2 (1 to sentN) foreach { n ⇒ c.expectNext(n) } within(2.seconds) { diff --git a/akka-stream/src/test/scala/akka/stream/tck/ActorPublisherTest.scala b/akka-stream/src/test/scala/akka/stream/tck/ActorPublisherTest.scala new file mode 100644 index 0000000000..b9b3737f21 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/tck/ActorPublisherTest.scala @@ -0,0 +1,48 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import akka.actor.Props +import akka.stream.actor.ActorPublisher +import akka.stream.actor.ActorPublisherMessage.Request +import akka.stream.tck.ActorPublisherTest.TestPublisher +import org.reactivestreams.Publisher + +object ActorPublisherTest { + + case object Produce + case object Loop + case object Complete + + class TestPublisher(allElements: Long) extends ActorPublisher[Int] { + + val source: Iterator[Int] = (if (allElements == Long.MaxValue) 1 to Int.MaxValue else 0 until allElements.toInt).toIterator + + override def receive: Receive = { + case Request(elements) ⇒ + loopDemand() + + case Produce if totalDemand > 0 && !isCompleted && source.hasNext ⇒ onNext(source.next()) + case Produce if !isCompleted && !source.hasNext ⇒ onComplete() + case Produce if isCompleted ⇒ // no-op + case _ ⇒ // no-op + } + + def loopDemand() { + val loopUntil = math.min(100, totalDemand) + 1 to loopUntil.toInt foreach { _ ⇒ self ! Produce } + if (loopUntil > 100) self ! Loop + } + } + +} + +class ActorPublisherTest extends AkkaPublisherVerification[Int](true) { + + override def createPublisher(elements: Long): Publisher[Int] = { + val ref = system.actorOf(Props(classOf[TestPublisher], elements).withDispatcher("akka.test.stream-dispatcher")) + + ActorPublisher(ref) + } +} diff --git a/akka-stream/src/test/scala/akka/stream/tck/ActorSubscriberTest.scala b/akka-stream/src/test/scala/akka/stream/tck/ActorSubscriberTest.scala new file mode 100644 index 0000000000..899e0bbe2d --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/tck/ActorSubscriberTest.scala @@ -0,0 +1,29 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import akka.actor.Props +import akka.stream.actor.ActorSubscriber +import akka.stream.actor.OneByOneRequestStrategy +import akka.stream.actor.RequestStrategy +import org.reactivestreams.Subscriber + +object ActorSubscriberOneByOneRequestTest { + class StrategySubscriber(val requestStrategy: RequestStrategy) extends ActorSubscriber { + + override def receive: Receive = { case _ ⇒ } + } +} + +class ActorSubscriberOneByOneRequestTest extends AkkaSubscriberBlackboxVerification[Int] { + import ActorSubscriberOneByOneRequestTest._ + + override def createSubscriber(): Subscriber[Int] = { + val props = Props(classOf[StrategySubscriber], OneByOneRequestStrategy) + ActorSubscriber(system.actorOf(props.withDispatcher("akka.test.stream-dispatcher"))) + } + + override def createHelperPublisher(elements: Long) = + createSimpleIntPublisher(elements) +} diff --git a/akka-stream/src/test/scala/akka/stream/tck/AkkaIdentityProcessorVerification.scala b/akka-stream/src/test/scala/akka/stream/tck/AkkaIdentityProcessorVerification.scala new file mode 100644 index 0000000000..212fb98055 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/tck/AkkaIdentityProcessorVerification.scala @@ -0,0 +1,56 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import akka.actor.ActorSystem +import akka.stream._ +import akka.stream.scaladsl.Flow +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit +import akka.testkit.EventFilter +import akka.testkit.TestEvent +import org.reactivestreams.Publisher +import org.reactivestreams.tck.IdentityProcessorVerification +import org.reactivestreams.tck.TestEnvironment +import org.scalatest.testng.TestNGSuiteLike + +import scala.collection.immutable + +abstract class AkkaIdentityProcessorVerification[T](val system: ActorSystem, env: TestEnvironment, publisherShutdownTimeout: Long) + extends IdentityProcessorVerification[T](env, publisherShutdownTimeout) + with TestNGSuiteLike { + + system.eventStream.publish(TestEvent.Mute(EventFilter[RuntimeException]("Test exception"))) + + /** Readable way to ignore TCK specs; Return this for `createErrorStatePublisher` to skip tests including it */ + final def ignored: Publisher[T] = null + + def this(system: ActorSystem, printlnDebug: Boolean) { + this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system), printlnDebug), Timeouts.publisherShutdownTimeoutMillis) + } + + def this(printlnDebug: Boolean) { + this(ActorSystem(classOf[IterablePublisherTest].getSimpleName, AkkaSpec.testConf), printlnDebug) + } + + def this() { + this(false) + } + + override def skipStochasticTests() = true // TODO maybe enable? + + override def createErrorStatePublisher(): Publisher[T] = + StreamTestKit.errorPublisher(new Exception("Unable to serve subscribers right now!")) + + def createSimpleIntPublisher(elements: Long)(implicit mat: FlowMaterializer): Publisher[Int] = { + val iterable: immutable.Iterable[Int] = + if (elements == Long.MaxValue) 1 to Int.MaxValue + else 0 until elements.toInt + + Flow(iterable).toPublisher() + } + + /** By default Akka Publishers do not support Fanout! */ + override def maxSupportedSubscribers: Long = 1L +} diff --git a/akka-stream/src/test/scala/akka/stream/tck/AkkaPublisherVerification.scala b/akka-stream/src/test/scala/akka/stream/tck/AkkaPublisherVerification.scala new file mode 100644 index 0000000000..5b452f0aa1 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/tck/AkkaPublisherVerification.scala @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import akka.actor.ActorSystem +import akka.stream._ +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit +import org.reactivestreams.Publisher +import org.reactivestreams.tck.{ PublisherVerification, TestEnvironment } +import org.scalatest.testng.TestNGSuiteLike +import org.testng.annotations.AfterClass +import scala.concurrent.duration._ + +abstract class AkkaPublisherVerification[T](val system: ActorSystem, env: TestEnvironment, publisherShutdownTimeout: Long) + extends PublisherVerification[T](env, publisherShutdownTimeout) + with TestNGSuiteLike { + + /** Readable way to ignore TCK specs; Return this for `createErrorStatePublisher` to skip tests including it */ + final def ignored: Publisher[T] = null + + def this(system: ActorSystem, printlnDebug: Boolean) { + this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system), printlnDebug), Timeouts.publisherShutdownTimeoutMillis) + } + + def this(printlnDebug: Boolean) { + this(ActorSystem(classOf[IterablePublisherTest].getSimpleName, AkkaSpec.testConf), printlnDebug) + } + + def this() { + this(false) + } + + implicit val materializer = FlowMaterializer(MaterializerSettings(system).copy(maxInputBufferSize = 512))(system) + + override def skipStochasticTests() = true // TODO maybe enable? + + @AfterClass + def shutdownActorSystem(): Unit = { + system.shutdown() + system.awaitTermination(10.seconds) + } + + override def createErrorStatePublisher(): Publisher[T] = + StreamTestKit.errorPublisher(new Exception("Unable to serve subscribers right now!")) +} diff --git a/akka-stream/src/test/scala/akka/stream/tck/AkkaSubscriberVerification.scala b/akka-stream/src/test/scala/akka/stream/tck/AkkaSubscriberVerification.scala new file mode 100644 index 0000000000..b4049eed5c --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/tck/AkkaSubscriberVerification.scala @@ -0,0 +1,75 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import akka.actor.ActorSystem +import akka.stream.FlowMaterializer +import akka.stream.MaterializerSettings +import akka.stream.Timeouts +import akka.stream.scaladsl.Flow +import akka.stream.testkit.AkkaSpec +import org.reactivestreams.Publisher +import org.reactivestreams.tck.SubscriberBlackboxVerification +import org.reactivestreams.tck.SubscriberWhiteboxVerification +import org.reactivestreams.tck.TestEnvironment +import org.scalatest.testng.TestNGSuiteLike +import org.testng.annotations.AfterClass + +import scala.collection.immutable +import scala.concurrent.duration._ + +abstract class AkkaSubscriberBlackboxVerification[T](val system: ActorSystem, env: TestEnvironment) + extends SubscriberBlackboxVerification[T](env) with TestNGSuiteLike + with AkkaSubscriberVerificationLike { + + def this(system: ActorSystem, printlnDebug: Boolean) { + this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system), printlnDebug)) + } + + def this(printlnDebug: Boolean) { + this(ActorSystem(classOf[IterablePublisherTest].getSimpleName, AkkaSpec.testConf), printlnDebug) + } + + def this() { + this(false) + } +} + +abstract class AkkaSubscriberWhiteboxVerification[T](val system: ActorSystem, env: TestEnvironment) + extends SubscriberWhiteboxVerification[T](env) with TestNGSuiteLike + with AkkaSubscriberVerificationLike { + + def this(system: ActorSystem, printlnDebug: Boolean) { + this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system), printlnDebug)) + } + + def this(printlnDebug: Boolean) { + this(ActorSystem(classOf[IterablePublisherTest].getSimpleName, AkkaSpec.testConf), printlnDebug) + } + + def this() { + this(false) + } +} + +trait AkkaSubscriberVerificationLike { + implicit def system: ActorSystem + + implicit val materializer = FlowMaterializer(MaterializerSettings(system)) + + def createSimpleIntPublisher(elements: Long): Publisher[Int] = { + val iterable: immutable.Iterable[Int] = + if (elements == Long.MaxValue) 1 to Int.MaxValue + else 0 until elements.toInt + + Flow(iterable).toPublisher() + } + + @AfterClass + def shutdownActorSystem(): Unit = { + system.shutdown() + system.awaitTermination(10.seconds) + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/tck/BlackholeSubscriberTest.scala b/akka-stream/src/test/scala/akka/stream/tck/BlackholeSubscriberTest.scala new file mode 100644 index 0000000000..91eee0810f --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/tck/BlackholeSubscriberTest.scala @@ -0,0 +1,17 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import akka.stream.impl.BlackholeSubscriber +import org.reactivestreams.Publisher +import org.reactivestreams.Subscriber + +class BlackholeSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] { + + override def createSubscriber(): Subscriber[Int] = + new BlackholeSubscriber[Int](2) + + override def createHelperPublisher(elements: Long): Publisher[Int] = createSimpleIntPublisher(elements) +} + diff --git a/akka-stream/src/test/scala/akka/stream/tck/FanoutProcessorTest.scala b/akka-stream/src/test/scala/akka/stream/tck/FanoutProcessorTest.scala new file mode 100644 index 0000000000..2d2c702c86 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/tck/FanoutProcessorTest.scala @@ -0,0 +1,41 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import java.util.concurrent.atomic.AtomicInteger + +import akka.stream._ +import akka.stream.impl.ActorBasedFlowMaterializer +import akka.stream.impl.Ast +import org.reactivestreams.Processor +import org.reactivestreams.Publisher + +class FanoutProcessorTest extends AkkaIdentityProcessorVerification[Int] { + + val processorCounter = new AtomicInteger + + override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = { + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = maxBufferSize / 2, maxSize = maxBufferSize) + + implicit val materializer = FlowMaterializer(settings)(system) + + val flowName = getClass.getSimpleName + "-" + processorCounter.incrementAndGet() + + val processor = materializer.asInstanceOf[ActorBasedFlowMaterializer].processorForNode( + Ast.FanoutBox(initialBufferSize = maxBufferSize / 2, maxBufferSize), flowName, 1) + + processor.asInstanceOf[Processor[Int, Int]] + } + + override def createHelperPublisher(elements: Long): Publisher[Int] = { + implicit val mat = FlowMaterializer()(system) + + createSimpleIntPublisher(elements)(mat) + } + + /** The Fanout Processor actually supports fanout */ + override def maxElementsFromPublisher = Long.MaxValue + +} diff --git a/akka-stream/src/test/scala/akka/stream/tck/IterablePublisherTest.scala b/akka-stream/src/test/scala/akka/stream/tck/IterablePublisherTest.scala new file mode 100644 index 0000000000..d554532206 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/tck/IterablePublisherTest.scala @@ -0,0 +1,23 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import akka.stream.scaladsl.Flow +import org.reactivestreams._ + +import scala.collection.immutable + +class IterablePublisherTest extends AkkaPublisherVerification[Int] { + + def createPublisher(elements: Long): Publisher[Int] = { + val iterable: immutable.Iterable[Int] = + if (elements == Long.MaxValue) + new immutable.Iterable[Int] { override def iterator = Iterator from 0 } + else + 0 until elements.toInt + + Flow(iterable).toPublisher() + } + +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/tck/IteratorPublisherTest.scala b/akka-stream/src/test/scala/akka/stream/tck/IteratorPublisherTest.scala new file mode 100644 index 0000000000..23a6ab9ef5 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/tck/IteratorPublisherTest.scala @@ -0,0 +1,21 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import akka.stream.scaladsl.Flow +import org.reactivestreams.Publisher + +import scala.collection.immutable + +class IteratorPublisherTest extends AkkaPublisherVerification[Int](true) { + + def createPublisher(elements: Long): Publisher[Int] = { + val iterable: immutable.Iterable[Int] = + if (elements == 0) new immutable.Iterable[Int] { override def iterator = Iterator from 0 } + else 0 until elements.toInt + + Flow(iterable).toPublisher() + } + +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/tck/SimpleCallbackPublisherTest.scala b/akka-stream/src/test/scala/akka/stream/tck/SimpleCallbackPublisherTest.scala new file mode 100644 index 0000000000..5f8891d368 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/tck/SimpleCallbackPublisherTest.scala @@ -0,0 +1,17 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import akka.stream.scaladsl.Flow +import org.reactivestreams._ + +class SimpleCallbackPublisherTest extends AkkaPublisherVerification[Int] { + + def createPublisher(elements: Long): Publisher[Int] = { + val iter = Iterator from 0 + val iter2 = if (elements > 0) iter take elements.toInt else iter + Flow(() ⇒ if (iter2.hasNext) Some(iter2.next()) else None).toPublisher() + } + +} \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/tck/TransformProcessorTest.scala b/akka-stream/src/test/scala/akka/stream/tck/TransformProcessorTest.scala new file mode 100644 index 0000000000..45854094cc --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/tck/TransformProcessorTest.scala @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.tck + +import java.util.concurrent.atomic.AtomicInteger + +import akka.stream._ +import akka.stream.impl.ActorBasedFlowMaterializer +import akka.stream.impl.Ast +import org.reactivestreams.Processor +import org.reactivestreams.Publisher + +class TransformProcessorTest extends AkkaIdentityProcessorVerification[Int] { + + val processorCounter = new AtomicInteger + + override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = { + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = maxBufferSize / 2, maxSize = maxBufferSize) + + implicit val materializer = FlowMaterializer(settings)(system) + + val flowName = getClass.getSimpleName + "-" + processorCounter.incrementAndGet() + + val mkTransformer = () ⇒ + new Transformer[Any, Any] { + override def onNext(in: Any) = List(in) + } + + val processor = materializer.asInstanceOf[ActorBasedFlowMaterializer].processorForNode( + Ast.Transform("transform", mkTransformer), flowName, 1) + + processor.asInstanceOf[Processor[Int, Int]] + } + + override def createHelperPublisher(elements: Long): Publisher[Int] = { + implicit val mat = FlowMaterializer()(system) + + createSimpleIntPublisher(elements)(mat) + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala b/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala index 291d54b8ef..8c1c0bbe1d 100644 --- a/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala +++ b/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala @@ -89,11 +89,11 @@ trait ScriptedTest extends Matchers { var currentScript = script var remainingDemand = script.expectedOutputs.size + ThreadLocalRandom.current().nextInt(1, maximumOverrun) debugLog(s"starting with remainingDemand=$remainingDemand") - var pendingRequests = 0 - var outstandingDemand = 0 + var pendingRequests: Long = 0 + var outstandingDemand: Long = 0 var completed = false - def getNextDemand(): Int = { + def getNextDemand(): Long = { val max = Math.min(remainingDemand, maximumRequest) if (max == 1) { remainingDemand = 0 @@ -107,7 +107,7 @@ trait ScriptedTest extends Matchers { def debugLog(msg: String): Unit = _debugLog :+= msg - def request(demand: Int): Unit = { + def request(demand: Long): Unit = { debugLog(s"test environment requests $demand") downstreamSubscription.request(demand) outstandingDemand += demand diff --git a/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala index 156cc2a3e4..41a1ad6b9e 100644 --- a/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala @@ -16,7 +16,7 @@ object StreamTestKit { * Subscribes the subscriber and completes after the first request. */ def lazyEmptyPublisher[T]: Publisher[T] = new Publisher[T] { - override def subscribe(subscriber: Subscriber[T]): Unit = + override def subscribe(subscriber: Subscriber[_ >: T]): Unit = subscriber.onSubscribe(CompletedSubscription(subscriber)) } @@ -31,21 +31,21 @@ object StreamTestKit { * Subscribes the subscriber and signals error after the first request. */ def lazyErrorPublisher[T](cause: Throwable): Publisher[T] = new Publisher[T] { - override def subscribe(subscriber: Subscriber[T]): Unit = + override def subscribe(subscriber: Subscriber[_ >: T]): Unit = subscriber.onSubscribe(FailedSubscription(subscriber, cause)) } private case class FailedSubscription[T](subscriber: Subscriber[T], cause: Throwable) extends Subscription { - override def request(elements: Int): Unit = subscriber.onError(cause) + override def request(elements: Long): Unit = subscriber.onError(cause) override def cancel(): Unit = () } private case class CompletedSubscription[T](subscriber: Subscriber[T]) extends Subscription { - override def request(elements: Int): Unit = subscriber.onComplete() + override def request(elements: Long): Unit = subscriber.onComplete() override def cancel(): Unit = () } - class AutoPublisher[T](probe: PublisherProbe[T], initialPendingRequests: Int = 0) { + class AutoPublisher[T](probe: PublisherProbe[T], initialPendingRequests: Long = 0) { val subscription = probe.expectSubscription() var pendingRequests = initialPendingRequests @@ -65,14 +65,14 @@ object StreamTestKit { sealed trait PublisherEvent case class Subscribe(subscription: Subscription) extends PublisherEvent case class CancelSubscription(subscription: Subscription) extends PublisherEvent - case class RequestMore(subscription: Subscription, elements: Int) extends PublisherEvent + case class RequestMore(subscription: Subscription, elements: Long) extends PublisherEvent - case class PublisherProbeSubscription[I](subscriber: Subscriber[I], publisherProbe: TestProbe) extends Subscription { - def request(elements: Int): Unit = publisherProbe.ref ! RequestMore(this, elements) + case class PublisherProbeSubscription[I](subscriber: Subscriber[_ >: I], publisherProbe: TestProbe) extends Subscription { + def request(elements: Long): Unit = publisherProbe.ref ! RequestMore(this, elements) def cancel(): Unit = publisherProbe.ref ! CancelSubscription(this) - def expectRequest(n: Int): Unit = publisherProbe.expectMsg(RequestMore(this, n)) - def expectRequest(): Int = publisherProbe.expectMsgPF() { + def expectRequest(n: Long): Unit = publisherProbe.expectMsg(RequestMore(this, n)) + def expectRequest(): Long = publisherProbe.expectMsgPF() { case RequestMore(sub, n) if sub eq this ⇒ n } @@ -142,7 +142,7 @@ object StreamTestKit { case class PublisherProbe[I]()(implicit system: ActorSystem) extends Publisher[I] { val probe: TestProbe = TestProbe() - def subscribe(subscriber: Subscriber[I]): Unit = { + def subscribe(subscriber: Subscriber[_ >: I]): Unit = { val subscription: PublisherProbeSubscription[I] = new PublisherProbeSubscription[I](subscriber, probe) probe.ref ! Subscribe(subscription) subscriber.onSubscribe(subscription) diff --git a/akka-stream/src/test/scala/akka/stream/testkit2/ScriptedTest.scala b/akka-stream/src/test/scala/akka/stream/testkit2/ScriptedTest.scala index b44c4188e2..65670d0498 100644 --- a/akka-stream/src/test/scala/akka/stream/testkit2/ScriptedTest.scala +++ b/akka-stream/src/test/scala/akka/stream/testkit2/ScriptedTest.scala @@ -90,8 +90,8 @@ trait ScriptedTest extends Matchers { var currentScript = script var remainingDemand = script.expectedOutputs.size + ThreadLocalRandom.current().nextInt(1, maximumOverrun) debugLog(s"starting with remainingDemand=$remainingDemand") - var pendingRequests = 0 - var outstandingDemand = 0 + var pendingRequests = 0L + var outstandingDemand = 0L var completed = false def getNextDemand(): Int = {