From 94cdfa06439eb696b1b78534747ea4ee1ef774e7 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 18 Nov 2014 20:45:24 +0100 Subject: [PATCH] +str - Removes AsynchronousIterablePublisher --- .../AsynchronousIterablePublisherSpec.scala | 180 ------------------ .../akka/stream/actor/ActorPublisher.scala | 7 +- .../impl/AsynchronousIterablePublisher.scala | 146 -------------- 3 files changed, 4 insertions(+), 329 deletions(-) delete mode 100644 akka-stream-tests/src/test/scala/akka/stream/impl/AsynchronousIterablePublisherSpec.scala delete mode 100644 akka-stream/src/main/scala/akka/stream/impl/AsynchronousIterablePublisher.scala diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/AsynchronousIterablePublisherSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/AsynchronousIterablePublisherSpec.scala deleted file mode 100644 index d437dfd4c5..0000000000 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/AsynchronousIterablePublisherSpec.scala +++ /dev/null @@ -1,180 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.impl - -import scala.collection.immutable -import scala.concurrent.ExecutionContext -import scala.concurrent.duration._ -import akka.stream.testkit.AkkaSpec -import akka.stream.testkit.StreamTestKit -import akka.testkit.TestProbe -import org.reactivestreams.{ Subscriber, Subscription } - -class AsynchronousIterablePublisherSpec extends AkkaSpec { - def executor = ExecutionContext.global - "A SynchronousPublisherFromIterable" must { - "produce elements" in { - val p = AsynchronousIterablePublisher(1 to 3, "range", executor) - val c = StreamTestKit.SubscriberProbe[Int]() - p.subscribe(c) - val sub = c.expectSubscription() - sub.request(1) - c.expectNext(1) - c.expectNoMsg(100.millis) - sub.request(2) - c.expectNext(2) - c.expectNext(3) - c.expectComplete() - } - - "complete empty" in { - val p = AsynchronousIterablePublisher(List.empty[Int], "empty", executor) - def verifyNewSubscriber(i: Int): Unit = { - val c = StreamTestKit.SubscriberProbe[Int]() - p.subscribe(c) - c.expectSubscription() - c.expectComplete() - c.expectNoMsg(100.millis) - } - - 1 to 10 foreach verifyNewSubscriber - } - - "produce elements with multiple subscribers" in { - val p = AsynchronousIterablePublisher(1 to 3, "range", executor) - val c1 = StreamTestKit.SubscriberProbe[Int]() - val c2 = StreamTestKit.SubscriberProbe[Int]() - p.subscribe(c1) - p.subscribe(c2) - val sub1 = c1.expectSubscription() - val sub2 = c2.expectSubscription() - sub1.request(1) - sub2.request(2) - c1.expectNext(1) - c2.expectNext(1) - c2.expectNext(2) - c1.expectNoMsg(100.millis) - c2.expectNoMsg(100.millis) - sub1.request(2) - sub2.request(2) - c1.expectNext(2) - c1.expectNext(3) - c2.expectNext(3) - c1.expectComplete() - c2.expectComplete() - } - - "produce elements to later subscriber" in { - val p = AsynchronousIterablePublisher(1 to 3, "range", executor) - val c1 = StreamTestKit.SubscriberProbe[Int]() - val c2 = StreamTestKit.SubscriberProbe[Int]() - p.subscribe(c1) - - val sub1 = c1.expectSubscription() - sub1.request(1) - c1.expectNext(1) - c1.expectNoMsg(100.millis) - p.subscribe(c2) - val sub2 = c2.expectSubscription() - sub2.request(2) - // starting from first element, new iterator per subscriber - c2.expectNext(1) - c2.expectNext(2) - c2.expectNoMsg(100.millis) - sub2.request(1) - c2.expectNext(3) - c2.expectComplete() - sub1.request(2) - c1.expectNext(2) - c1.expectNext(3) - c1.expectComplete() - } - - "not produce after cancel" in { - val p = AsynchronousIterablePublisher(1 to 3, "range", executor) - val c = StreamTestKit.SubscriberProbe[Int]() - p.subscribe(c) - val sub = c.expectSubscription() - sub.request(1) - c.expectNext(1) - sub.cancel() - sub.request(2) - c.expectNoMsg(100.millis) - } - - "not produce after cancel from onNext" in { - val p = AsynchronousIterablePublisher(1 to 5, "range", executor) - val probe = TestProbe() - p.subscribe(new Subscriber[Int] { - var sub: Subscription = _ - override def onError(cause: Throwable): Unit = probe.ref ! cause - override def onComplete(): Unit = probe.ref ! "complete" - override def onNext(element: Int): Unit = { - probe.ref ! element - if (element == 3) sub.cancel() - } - override def onSubscribe(subscription: Subscription): Unit = { - sub = subscription - sub.request(10) - } - }) - - probe.expectMsg(1) - probe.expectMsg(2) - probe.expectMsg(3) - probe.expectNoMsg(500.millis) - } - - "produce onError when iterator throws" in { - val iterable = new immutable.Iterable[Int] { - override def iterator: Iterator[Int] = new Iterator[Int] { - private var n = 0 - override def hasNext: Boolean = n < 3 - override def next(): Int = { - n += 1 - if (n == 2) throw new IllegalStateException("not two") - n - } - } - } - val p = AsynchronousIterablePublisher(iterable, "iterable", executor) - val c = StreamTestKit.SubscriberProbe[Int]() - p.subscribe(c) - val sub = c.expectSubscription() - sub.request(1) - c.expectNext(1) - c.expectNoMsg(100.millis) - sub.request(2) - c.expectError.getMessage should be("not two") - sub.request(2) - c.expectNoMsg(100.millis) - } - - "handle reentrant requests" in { - val N = 50000 - val p = AsynchronousIterablePublisher(1 to N, "range", executor) - val probe = TestProbe() - p.subscribe(new Subscriber[Int] { - var sub: Subscription = _ - override def onError(cause: Throwable): Unit = probe.ref ! cause - override def onComplete(): Unit = probe.ref ! "complete" - override def onNext(element: Int): Unit = { - probe.ref ! element - sub.request(1) - - } - override def onSubscribe(subscription: Subscription): Unit = { - sub = subscription - sub.request(1) - } - }) - probe.receiveN(N) should be((1 to N).toVector) - probe.expectMsg("complete") - } - - "have a toString that doesn't OOME" in { - AsynchronousIterablePublisher(1 to 3, "range", executor).toString should be("range") - } - } -} diff --git a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala index 37ade3dd93..b88c02d857 100644 --- a/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala @@ -342,9 +342,10 @@ private[akka] class ActorPublisherSubscription[T](ref: ActorRef) extends Subscri import ActorPublisher._ import ActorPublisherMessage._ - override def request(n: Long): Unit = - if (n <= 0) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg) - else ref ! Request(n) + override def request(n: Long): Unit = { + ReactiveStreamsConstants.validateRequest(n) + ref ! Request(n) + } override def cancel(): Unit = ref ! Cancel } diff --git a/akka-stream/src/main/scala/akka/stream/impl/AsynchronousIterablePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/AsynchronousIterablePublisher.scala deleted file mode 100644 index d51244df36..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/AsynchronousIterablePublisher.scala +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.impl - -import akka.stream.ReactiveStreamsConstants -import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean } -import org.reactivestreams.{ Publisher, Subscriber, Subscription } - -import scala.annotation.tailrec -import scala.collection.immutable -import scala.concurrent.ExecutionContext -import scala.util.control.NonFatal - -/** - * INTERNAL API - */ -private[akka] object AsynchronousIterablePublisher { - def apply[T](iterable: immutable.Iterable[T], name: String, executor: ExecutionContext): Publisher[T] = - new AsynchronousIterablePublisher(iterable, name, executor) - - object IteratorSubscription { - def apply[T](subscriber: Subscriber[T], iterator: Iterator[T], executor: ExecutionContext): Unit = - new IteratorSubscription[T](subscriber, iterator, executor).init() - } - - private[this] sealed trait State - private[this] final case object Unitialized extends State - private[this] final case object Initializing extends State - private[this] final case object Initialized extends State - private[this] final case object Cancelled extends State - private[this] final case object Completed extends State - private[this] final case object Errored extends State - - private[this] final class IteratorSubscription[T](subscriber: Subscriber[T], - iterator: Iterator[T], // TODO null out iterator when completed? - executor: ExecutionContext) - extends AtomicLong with Subscription with Runnable { - import ReactiveStreamsConstants._ - // FIXME if we want to get crazy, cache-line pad this class - private[this] val scheduled = new AtomicBoolean(false) - // FIXME if we want to get even more crazy, we could encode these states into an AtomicInteger and merge it with scheduled - @volatile private[this] var state: State = Unitialized - - // TODO/FIXME technically we could use the fact that we're an AtomicLong to ensure visibility of this - //Should only be called once, please - def init(): Unit = if (state == Unitialized && scheduled.compareAndSet(false, true)) executor.execute(this) - - override def cancel(): Unit = state = Cancelled - - override def request(elements: Long): Unit = { - ReactiveStreamsConstants.validateRequest(elements) - if (getAndAdd(elements) == 0 && scheduled.compareAndSet(false, true)) executor.execute(this) // FIXME overflow protection - } - - override def run(): Unit = try { - def scheduleForExecutionIfHasDemand(): Unit = - if (get() > 0 && scheduled.compareAndSet(false, true)) executor.execute(this) // loop via executor - - @tailrec def loop(): Unit = { - state match { - case current @ (Initialized | Initializing) ⇒ - // The only transition that can occur from the outside is to Cancelled - getAndSet(0) match { - case 0 if current eq Initialized ⇒ - scheduled.set(false) - scheduleForExecutionIfHasDemand() - case n ⇒ - - @tailrec def push(n: Long): State = - state match { // Important to do the volatile read here since we are checking for external cancellation - case c @ Cancelled ⇒ c - case s if iterator.hasNext ⇒ - if (n > 0) { - tryOnNext(subscriber, iterator.next()) - push(n - 1) - } else s - case _ ⇒ Completed - } - - (try push(n): AnyRef catch { - case NonFatal(t: AnyRef) ⇒ t - }) match { - case Initialized ⇒ - loop() - case Unitialized ⇒ - state = Errored - tryOnError(subscriber, new IllegalStateException("BUG: AsynchronousIterablePublisher was Uninitialized!")) - case Initializing ⇒ - state = Initialized - loop() - case Cancelled | Errored ⇒ () - case Completed ⇒ - state = Completed - tryOnComplete(subscriber) - case s: SpecViolation ⇒ - state = Errored - executor.reportFailure(s.violation) - case t: Throwable ⇒ - state = Errored - tryOnError(subscriber, t) - } - } - case Unitialized ⇒ - state = Initializing - tryOnSubscribe(subscriber, this) // If this fails, this is a spec violation - loop() - case Cancelled | Completed | Errored ⇒ () // Do nothing - } - } - - loop() - } catch { - case NonFatal(e) ⇒ executor.reportFailure(e) // This should never happen. Last words. - } - } -} - -/** - * INTERNAL API - * Publisher that will push all requested elements from the iterator of the iterable - * to the subscriber in the calling thread of `requestMore`. - * - * It is only intended to be used with iterators over static collections. - * Do *NOT* use it for iterators on lazy collections or other implementations that do more - * than merely retrieve an element in their `next()` method! - * - * It is the responsibility of the subscriber to provide necessary memory visibility - * if calls to `requestMore` and `cancel` are performed from different threads. - * For example, usage from an actor is fine. Concurrent calls to the subscription is not allowed. - * Reentrant calls to `requestMore` directly from `onNext` are supported by this publisher. - */ -private[akka] final class AsynchronousIterablePublisher[T]( - private[this] val iterable: immutable.Iterable[T], - private[this] val name: String, - private[this] val executor: ExecutionContext) extends Publisher[T] { - - import AsynchronousIterablePublisher.IteratorSubscription - - override def subscribe(subscriber: Subscriber[_ >: T]): Unit = - try IteratorSubscription(subscriber, iterable.iterator, executor) catch { - case NonFatal(t) ⇒ ErrorPublisher(t, name).subscribe(subscriber) // FIXME this is dodgy - } - - override def toString: String = name -}