From f5cc1be3f9baf02e19aaf0e7e1f1f516cb99688a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 15 May 2014 08:49:37 +0200 Subject: [PATCH] +str #15195 Add internal SynchronousProducerFromIterable --- .../SynchronousProducerFromIterable.scala | 96 +++++++++ .../SynchronousProducerFromIterableSpec.scala | 198 ++++++++++++++++++ 2 files changed, 294 insertions(+) create mode 100644 akka-stream/src/main/scala/akka/stream/impl/SynchronousProducerFromIterable.scala create mode 100644 akka-stream/src/test/scala/akka/stream/impl/SynchronousProducerFromIterableSpec.scala diff --git a/akka-stream/src/main/scala/akka/stream/impl/SynchronousProducerFromIterable.scala b/akka-stream/src/main/scala/akka/stream/impl/SynchronousProducerFromIterable.scala new file mode 100644 index 0000000000..6816d75a7c --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/SynchronousProducerFromIterable.scala @@ -0,0 +1,96 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl + +import scala.collection.immutable +import org.reactivestreams.api.Consumer +import org.reactivestreams.spi.Subscription +import scala.annotation.tailrec +import org.reactivestreams.spi.Subscriber +import org.reactivestreams.spi.Publisher +import org.reactivestreams.api.Producer +import scala.util.control.NonFatal + +/** + * INTERNAL API + */ +private[akka] object SynchronousProducerFromIterable { + def apply[T](iterable: immutable.Iterable[T]): Producer[T] = + if (iterable.isEmpty) EmptyProducer.asInstanceOf[Producer[T]] + else new SynchronousProducerFromIterable(iterable) + + private class IteratorSubscription[T](subscriber: Subscriber[T], iterator: Iterator[T]) extends Subscription { + var done = false + var demand = 0 + var pushing = false + + override def cancel(): Unit = + done = true + + override def requestMore(elements: Int): Unit = { + @tailrec def pushNext(): Unit = { + if (!done) + if (iterator.isEmpty) { + done = true + subscriber.onComplete() + } else if (demand != 0) { + demand -= 1 + subscriber.onNext(iterator.next()) + pushNext() + } + } + + if (pushing) + demand += elements // reentrant call to requestMore from onNext + else { + try { + pushing = true + demand = elements + pushNext() + } catch { + case NonFatal(e) ⇒ + done = true + subscriber.onError(e) + } finally { pushing = false } + } + } + } +} + +/** + * INTERNAL API + * Producer that will push all requested elements from the iterator of the iterable + * to the subscriber in the calling thread of `requestMore`. + * + * It is only intended to be used with iterators over static collections. + * Do *NOT* use it for iterators on lazy collections or other implementations that do more + * than merely retrieve an element in their `next()` method! + * + * It is the responsibility of the subscriber to provide necessary memory visibility + * if calls to `requestMore` and `cancel` are performed from different threads. + * For example, usage from an actor is fine. Concurrent calls to the subscription is not allowed. + * Reentrant calls to `requestMore` directly from `onNext` are supported by this producer. + */ +private[akka] class SynchronousProducerFromIterable[T](private val iterable: immutable.Iterable[T]) + extends Producer[T] with Publisher[T] { + + import SynchronousProducerFromIterable.IteratorSubscription + + override def getPublisher: Publisher[T] = this + + override def subscribe(subscriber: Subscriber[T]): Unit = + subscriber.onSubscribe(new IteratorSubscription(subscriber, iterable.iterator)) + + override def produceTo(consumer: Consumer[T]): Unit = + getPublisher.subscribe(consumer.getSubscriber) + + override def equals(o: Any): Boolean = o match { + case other: SynchronousProducerFromIterable[T] ⇒ iterable == other.iterable + case _ ⇒ false + } + + override def hashCode: Int = iterable.hashCode + + override def toString: String = s"SynchronousProducerFromIterable(${iterable.mkString(", ")})" +} diff --git a/akka-stream/src/test/scala/akka/stream/impl/SynchronousProducerFromIterableSpec.scala b/akka-stream/src/test/scala/akka/stream/impl/SynchronousProducerFromIterableSpec.scala new file mode 100644 index 0000000000..57a8a13b46 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/impl/SynchronousProducerFromIterableSpec.scala @@ -0,0 +1,198 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl + +import scala.collection.immutable +import scala.concurrent.duration._ +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit +import org.reactivestreams.api.Consumer +import org.reactivestreams.spi.Subscriber +import akka.testkit.TestProbe +import org.reactivestreams.spi.Subscription + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class SynchronousProducerFromIterableSpec extends AkkaSpec { + + "A SynchronousProducerFromIterable" must { + "produce elements" in { + val p = SynchronousProducerFromIterable(List(1, 2, 3)) + val c = StreamTestKit.consumerProbe[Int] + p.produceTo(c) + val sub = c.expectSubscription() + sub.requestMore(1) + c.expectNext(1) + c.expectNoMsg(100.millis) + sub.requestMore(2) + c.expectNext(2) + c.expectNext(3) + c.expectComplete() + } + + "complete empty" in { + val p = SynchronousProducerFromIterable(List.empty[Int]) + val c = StreamTestKit.consumerProbe[Int] + p.produceTo(c) + c.expectComplete() + c.expectNoMsg(100.millis) + + val c2 = StreamTestKit.consumerProbe[Int] + p.produceTo(c2) + c2.expectComplete() + } + + "produce elements with multiple subscribers" in { + val p = SynchronousProducerFromIterable(List(1, 2, 3)) + val c1 = StreamTestKit.consumerProbe[Int] + val c2 = StreamTestKit.consumerProbe[Int] + p.produceTo(c1) + p.produceTo(c2) + val sub1 = c1.expectSubscription() + val sub2 = c2.expectSubscription() + sub1.requestMore(1) + sub2.requestMore(2) + c1.expectNext(1) + c2.expectNext(1) + c2.expectNext(2) + c1.expectNoMsg(100.millis) + c2.expectNoMsg(100.millis) + sub1.requestMore(2) + sub2.requestMore(2) + c1.expectNext(2) + c1.expectNext(3) + c2.expectNext(3) + c1.expectComplete() + c2.expectComplete() + } + + "produce elements to later subscriber" in { + val p = SynchronousProducerFromIterable(List(1, 2, 3)) + val c1 = StreamTestKit.consumerProbe[Int] + val c2 = StreamTestKit.consumerProbe[Int] + p.produceTo(c1) + + val sub1 = c1.expectSubscription() + sub1.requestMore(1) + c1.expectNext(1) + c1.expectNoMsg(100.millis) + p.produceTo(c2) + val sub2 = c2.expectSubscription() + sub2.requestMore(2) + // starting from first element, new iterator per subscriber + c2.expectNext(1) + c2.expectNext(2) + c2.expectNoMsg(100.millis) + sub2.requestMore(1) + c2.expectNext(3) + c2.expectComplete() + sub1.requestMore(2) + c1.expectNext(2) + c1.expectNext(3) + c1.expectComplete() + } + + "not produce after cancel" in { + val p = SynchronousProducerFromIterable(List(1, 2, 3)) + val c = StreamTestKit.consumerProbe[Int] + p.produceTo(c) + val sub = c.expectSubscription() + sub.requestMore(1) + c.expectNext(1) + sub.cancel() + sub.requestMore(2) + c.expectNoMsg(100.millis) + } + + "not produce after cancel from onNext" in { + val p = SynchronousProducerFromIterable(List(1, 2, 3, 4, 5)) + val probe = TestProbe() + p.produceTo(new Consumer[Int] with Subscriber[Int] { + var sub: Subscription = _ + override val getSubscriber: Subscriber[Int] = this + override def onError(cause: Throwable): Unit = probe.ref ! cause + override def onComplete(): Unit = probe.ref ! "complete" + override def onNext(element: Int): Unit = { + probe.ref ! element + if (element == 3) sub.cancel() + } + override def onSubscribe(subscription: Subscription): Unit = { + sub = subscription + sub.requestMore(10) + } + }) + + probe.expectMsg(1) + probe.expectMsg(2) + probe.expectMsg(3) + probe.expectNoMsg(500.millis) + } + + "produce onError when iterator throws" in { + val iterable = new immutable.Iterable[Int] { + override def iterator: Iterator[Int] = new Iterator[Int] { + private var n = 0 + override def hasNext: Boolean = n < 3 + override def next(): Int = { + n += 1 + if (n == 2) throw new IllegalStateException("not two") + n + } + } + } + val p = SynchronousProducerFromIterable(iterable) + val c = StreamTestKit.consumerProbe[Int] + p.produceTo(c) + val sub = c.expectSubscription() + sub.requestMore(1) + c.expectNext(1) + c.expectNoMsg(100.millis) + sub.requestMore(2) + c.expectError.getMessage should be("not two") + sub.requestMore(2) + c.expectNoMsg(100.millis) + } + + "handle reentrant requests" in { + val N = 50000 + val p = SynchronousProducerFromIterable(1 to N) + val probe = TestProbe() + p.produceTo(new Consumer[Int] with Subscriber[Int] { + var sub: Subscription = _ + override val getSubscriber: Subscriber[Int] = this + override def onError(cause: Throwable): Unit = probe.ref ! cause + override def onComplete(): Unit = probe.ref ! "complete" + override def onNext(element: Int): Unit = { + probe.ref ! element + sub.requestMore(1) + + } + override def onSubscribe(subscription: Subscription): Unit = { + sub = subscription + sub.requestMore(1) + } + }) + probe.receiveN(N) should be((1 to N).toVector) + probe.expectMsg("complete") + } + + "have value equality of producer" in { + val p1 = SynchronousProducerFromIterable(List(1, 2, 3)) + val p2 = SynchronousProducerFromIterable(List(1, 2, 3)) + p1 should be(p2) + p2 should be(p1) + val p3 = SynchronousProducerFromIterable(List(1, 2, 3, 4)) + p1 should not be (p3) + p3 should not be (p1) + val p4 = SynchronousProducerFromIterable(Vector.empty[String]) + val p5 = SynchronousProducerFromIterable(Set.empty[String]) + p1 should not be (p4) + p4 should be(p5) + p5 should be(p4) + } + + "have nice toString" in { + SynchronousProducerFromIterable(List(1, 2, 3)).toString should be("SynchronousProducerFromIterable(1, 2, 3)") + } + } +}