Merge pull request #17675 from akka/wip-remove-synchronousiterablepublisher-√
=str - Wip remove synchronousiterablepublisher √
This commit is contained in:
commit
ca5d27abbd
10 changed files with 69 additions and 418 deletions
|
|
@ -0,0 +1,20 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.tck
|
||||
|
||||
import akka.stream.impl.SingleElementPublisher
|
||||
|
||||
import scala.collection.immutable
|
||||
import akka.stream.scaladsl.Sink
|
||||
import akka.stream.scaladsl.Source
|
||||
import org.reactivestreams._
|
||||
|
||||
class SingleElementPublisherTest extends AkkaPublisherVerification[Int] {
|
||||
|
||||
def createPublisher(elements: Long): Publisher[Int] = {
|
||||
Source(SingleElementPublisher(0, "single-element-publisher")).runWith(Sink.publisher)
|
||||
}
|
||||
|
||||
override def maxElementsFromPublisher(): Long = 1
|
||||
}
|
||||
|
|
@ -1,25 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.tck
|
||||
|
||||
import akka.stream.impl.SynchronousIterablePublisher
|
||||
|
||||
import scala.collection.immutable
|
||||
import akka.stream.scaladsl.Sink
|
||||
import akka.stream.scaladsl.Source
|
||||
import org.reactivestreams._
|
||||
|
||||
class SyncIterablePublisherTest extends AkkaPublisherVerification[Int] {
|
||||
|
||||
def createPublisher(elements: Long): Publisher[Int] = {
|
||||
val iterable: immutable.Iterable[Int] =
|
||||
if (elements >= 10000)
|
||||
0 until 10000 // this publisher is not intended to be used for large collections
|
||||
else
|
||||
0 until elements.toInt
|
||||
|
||||
Source(SynchronousIterablePublisher(iterable, "synchronous-iterable-publisher")).runWith(Sink.publisher)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -20,9 +20,8 @@ class FlowFromFutureSpec extends AkkaSpec {
|
|||
|
||||
"A Flow based on a Future" must {
|
||||
"produce one element from already successful Future" in assertAllStagesStopped {
|
||||
val p = Source(Future.successful(1)).runWith(Sink.publisher)
|
||||
val c = TestSubscriber.manualProbe[Int]()
|
||||
p.subscribe(c)
|
||||
val p = Source(Future.successful(1)).runWith(Sink.fanoutPublisher(1, 1)).subscribe(c)
|
||||
val sub = c.expectSubscription()
|
||||
c.expectNoMsg(100.millis)
|
||||
sub.request(1)
|
||||
|
|
@ -32,17 +31,15 @@ class FlowFromFutureSpec extends AkkaSpec {
|
|||
|
||||
"produce error from already failed Future" in assertAllStagesStopped {
|
||||
val ex = new RuntimeException("test") with NoStackTrace
|
||||
val p = Source(Future.failed[Int](ex)).runWith(Sink.publisher)
|
||||
val c = TestSubscriber.manualProbe[Int]()
|
||||
p.subscribe(c)
|
||||
Source(Future.failed[Int](ex)).runWith(Sink.publisher).subscribe(c)
|
||||
c.expectSubscriptionAndError(ex)
|
||||
}
|
||||
|
||||
"produce one element when Future is completed" in assertAllStagesStopped {
|
||||
val promise = Promise[Int]()
|
||||
val p = Source(promise.future).runWith(Sink.publisher)
|
||||
val c = TestSubscriber.manualProbe[Int]()
|
||||
p.subscribe(c)
|
||||
Source(promise.future).runWith(Sink.fanoutPublisher(1, 1)).subscribe(c)
|
||||
val sub = c.expectSubscription()
|
||||
sub.request(1)
|
||||
c.expectNoMsg(100.millis)
|
||||
|
|
@ -54,9 +51,8 @@ class FlowFromFutureSpec extends AkkaSpec {
|
|||
|
||||
"produce one element when Future is completed but not before request" in {
|
||||
val promise = Promise[Int]()
|
||||
val p = Source(promise.future).runWith(Sink.publisher)
|
||||
val c = TestSubscriber.manualProbe[Int]()
|
||||
p.subscribe(c)
|
||||
Source(promise.future).runWith(Sink.fanoutPublisher(1, 1)).subscribe(c)
|
||||
val sub = c.expectSubscription()
|
||||
promise.success(1)
|
||||
c.expectNoMsg(200.millis)
|
||||
|
|
|
|||
|
|
@ -9,7 +9,6 @@ import akka.stream.ActorFlowMaterializer
|
|||
import akka.stream.ActorFlowMaterializerSettings
|
||||
import akka.stream.testkit._
|
||||
import akka.stream.testkit.Utils._
|
||||
import akka.stream.impl.SynchronousIterablePublisher
|
||||
import org.reactivestreams.Subscription
|
||||
import akka.testkit.TestProbe
|
||||
import org.reactivestreams.Subscriber
|
||||
|
|
@ -71,79 +70,6 @@ class FlowIterableSpec extends AbstractFlowIteratorSpec {
|
|||
}
|
||||
}
|
||||
|
||||
class SynchronousIterableSpec extends AbstractFlowIteratorSpec {
|
||||
override def testName = "A Flow based on small collection"
|
||||
override def createSource(elements: Int): Source[Int, Unit] =
|
||||
Source(SynchronousIterablePublisher(1 to elements, "range"))
|
||||
|
||||
"not produce after cancel from onNext" in {
|
||||
val p = SynchronousIterablePublisher(1 to 5, "range")
|
||||
val probe = TestProbe()
|
||||
p.subscribe(new Subscriber[Int] {
|
||||
var sub: Subscription = _
|
||||
override def onError(cause: Throwable): Unit = probe.ref ! cause
|
||||
override def onComplete(): Unit = probe.ref ! "complete"
|
||||
override def onNext(element: Int): Unit = {
|
||||
probe.ref ! element
|
||||
if (element == 3) sub.cancel()
|
||||
}
|
||||
override def onSubscribe(subscription: Subscription): Unit = {
|
||||
sub = subscription
|
||||
sub.request(10)
|
||||
}
|
||||
})
|
||||
|
||||
probe.expectMsg(1)
|
||||
probe.expectMsg(2)
|
||||
probe.expectMsg(3)
|
||||
probe.expectNoMsg(500.millis)
|
||||
}
|
||||
|
||||
"produce onError when iterator throws" in {
|
||||
val iterable = new immutable.Iterable[Int] {
|
||||
override def iterator: Iterator[Int] =
|
||||
(1 to 3).iterator.map(x ⇒ if (x == 2) throw new IllegalStateException("not two") else x)
|
||||
}
|
||||
val p = SynchronousIterablePublisher(iterable, "iterable")
|
||||
val c = TestSubscriber.manualProbe[Int]()
|
||||
p.subscribe(c)
|
||||
val sub = c.expectSubscription()
|
||||
sub.request(1)
|
||||
c.expectNext(1)
|
||||
c.expectNoMsg(100.millis)
|
||||
sub.request(2)
|
||||
c.expectError.getMessage should be("not two")
|
||||
sub.request(2)
|
||||
c.expectNoMsg(100.millis)
|
||||
}
|
||||
|
||||
"handle reentrant requests" in {
|
||||
val N = 50000
|
||||
val p = SynchronousIterablePublisher(1 to N, "range")
|
||||
val probe = TestProbe()
|
||||
p.subscribe(new Subscriber[Int] {
|
||||
var sub: Subscription = _
|
||||
override def onError(cause: Throwable): Unit = probe.ref ! cause
|
||||
override def onComplete(): Unit = probe.ref ! "complete"
|
||||
override def onNext(element: Int): Unit = {
|
||||
probe.ref ! element
|
||||
sub.request(1)
|
||||
|
||||
}
|
||||
override def onSubscribe(subscription: Subscription): Unit = {
|
||||
sub = subscription
|
||||
sub.request(1)
|
||||
}
|
||||
})
|
||||
probe.receiveN(N) should be((1 to N).toVector)
|
||||
probe.expectMsg("complete")
|
||||
}
|
||||
|
||||
"have a toString that doesn't OOME" in {
|
||||
SynchronousIterablePublisher(1 to 3, "range").toString should be("range")
|
||||
}
|
||||
}
|
||||
|
||||
abstract class AbstractFlowIteratorSpec extends AkkaSpec {
|
||||
|
||||
val settings = ActorFlowMaterializerSettings(system)
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ final case class OperationAttributes private (attributes: immutable.Seq[Operatio
|
|||
val result = new java.util.ArrayList[T]
|
||||
attributes.foreach { a ⇒
|
||||
if (c.isInstance(a))
|
||||
result.add(a.asInstanceOf[T])
|
||||
result.add(c.cast(a))
|
||||
}
|
||||
result
|
||||
}
|
||||
|
|
@ -47,8 +47,8 @@ final case class OperationAttributes private (attributes: immutable.Seq[Operatio
|
|||
* If no such attribute exists the `default` value is returned.
|
||||
*/
|
||||
def getAttribute[T <: Attribute](c: Class[T], default: T): T =
|
||||
attributes.find(a ⇒ c.isInstance(a)) match {
|
||||
case Some(a) ⇒ a.asInstanceOf[T]
|
||||
attributes.find(c.isInstance) match {
|
||||
case Some(a) ⇒ c.cast(a)
|
||||
case None ⇒ default
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -3,8 +3,7 @@
|
|||
*/
|
||||
package akka.stream.impl
|
||||
|
||||
import org.reactivestreams.{ Subscriber, Publisher }
|
||||
import org.reactivestreams.Subscription
|
||||
import org.reactivestreams.{ Subscriber, Publisher, Subscription }
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -40,6 +39,36 @@ private[akka] final case class ErrorPublisher(t: Throwable, name: String) extend
|
|||
override def toString: String = name
|
||||
}
|
||||
|
||||
private[akka] final case class SingleElementPublisher[T](value: T, name: String) extends Publisher[T] {
|
||||
import ReactiveStreamsCompliance._
|
||||
|
||||
private[this] class SingleElementSubscription(subscriber: Subscriber[_ >: T]) extends Subscription {
|
||||
private[this] var done: Boolean = false
|
||||
override def cancel(): Unit = done = true
|
||||
|
||||
override def request(elements: Long): Unit = if (!done) {
|
||||
if (elements < 1) rejectDueToNonPositiveDemand(subscriber)
|
||||
done = true
|
||||
try {
|
||||
tryOnNext(subscriber, value)
|
||||
tryOnComplete(subscriber)
|
||||
} catch {
|
||||
case _: SpecViolation ⇒ // TODO log?
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def subscribe(subscriber: Subscriber[_ >: T]): Unit =
|
||||
try {
|
||||
requireNonNullSubscriber(subscriber)
|
||||
tryOnSubscribe(subscriber, new SingleElementSubscription(subscriber))
|
||||
} catch {
|
||||
case _: SpecViolation ⇒ // nothing we can do
|
||||
}
|
||||
def apply[T]: Publisher[T] = this.asInstanceOf[Publisher[T]]
|
||||
override def toString: String = name
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* This is only a legal subscription when it is immediately followed by
|
||||
|
|
|
|||
|
|
@ -1,147 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.impl
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.util.Failure
|
||||
import scala.util.Success
|
||||
import scala.util.Try
|
||||
import akka.actor._
|
||||
import akka.stream.ActorFlowMaterializerSettings
|
||||
import akka.pattern.pipe
|
||||
import org.reactivestreams.Subscriber
|
||||
import org.reactivestreams.Subscription
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object FuturePublisher {
|
||||
def props(future: Future[Any], settings: ActorFlowMaterializerSettings): Props =
|
||||
Props(new FuturePublisher(future, settings)).withDispatcher(settings.dispatcher).withDeploy(Deploy.local)
|
||||
|
||||
object FutureSubscription {
|
||||
final case class Cancel(subscription: FutureSubscription) extends DeadLetterSuppression with NoSerializationVerificationNeeded
|
||||
final case class RequestMore(subscription: FutureSubscription, elements: Long) extends DeadLetterSuppression with NoSerializationVerificationNeeded
|
||||
}
|
||||
|
||||
case class FutureValue(value: Any) extends NoSerializationVerificationNeeded
|
||||
|
||||
class FutureSubscription(ref: ActorRef) extends Subscription {
|
||||
import akka.stream.impl.FuturePublisher.FutureSubscription._
|
||||
def cancel(): Unit = ref ! Cancel(this)
|
||||
def request(elements: Long): Unit = ref ! RequestMore(this, elements)
|
||||
override def toString = "FutureSubscription"
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
// FIXME why do we need to have an actor to drive a Future?
|
||||
private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMaterializerSettings) extends Actor {
|
||||
import akka.stream.impl.FuturePublisher._
|
||||
import akka.stream.impl.FuturePublisher.FutureSubscription.Cancel
|
||||
import akka.stream.impl.FuturePublisher.FutureSubscription.RequestMore
|
||||
import ReactiveStreamsCompliance._
|
||||
|
||||
var exposedPublisher: ActorPublisher[Any] = _
|
||||
var subscribers = Map.empty[Subscriber[Any], FutureSubscription]
|
||||
var subscriptions = Map.empty[FutureSubscription, Subscriber[Any]]
|
||||
var subscriptionsReadyForPush = Set.empty[FutureSubscription]
|
||||
var futureValue: Option[Try[Any]] = future.value
|
||||
var shutdownReason: Option[Throwable] = ActorPublisher.SomeNormalShutdownReason
|
||||
|
||||
override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
|
||||
|
||||
def receive = {
|
||||
case ExposedPublisher(publisher) ⇒
|
||||
exposedPublisher = publisher
|
||||
context.become(waitingForFirstSubscriber)
|
||||
case _ ⇒ throw new IllegalStateException("The first message must be ExposedPublisher")
|
||||
}
|
||||
|
||||
def waitingForFirstSubscriber: Receive = {
|
||||
case SubscribePending ⇒
|
||||
exposedPublisher.takePendingSubscribers() foreach registerSubscriber
|
||||
import context.dispatcher
|
||||
future.map(FutureValue) pipeTo (self)
|
||||
context.become(active)
|
||||
}
|
||||
|
||||
def active: Receive = {
|
||||
case SubscribePending ⇒
|
||||
exposedPublisher.takePendingSubscribers() foreach registerSubscriber
|
||||
case RequestMore(subscription, elements) ⇒ // FIXME we aren't tracking demand per subscription so we don't check for overflow. We should.
|
||||
if (subscriptions.contains(subscription)) {
|
||||
if (elements < 1) {
|
||||
val subscriber = subscriptions(subscription)
|
||||
rejectDueToNonPositiveDemand(subscriber)
|
||||
removeSubscriber(subscriber)
|
||||
} else {
|
||||
subscriptionsReadyForPush += subscription
|
||||
push(subscriptions(subscription))
|
||||
}
|
||||
}
|
||||
case Cancel(subscription) if subscriptions.contains(subscription) ⇒
|
||||
removeSubscriber(subscriptions(subscription))
|
||||
case Status.Failure(ex) ⇒
|
||||
if (futureValue.isEmpty) {
|
||||
futureValue = Some(Failure(ex))
|
||||
pushToAll()
|
||||
}
|
||||
case FutureValue(value) ⇒
|
||||
if (futureValue.isEmpty) {
|
||||
futureValue = Some(Success(value))
|
||||
pushToAll()
|
||||
}
|
||||
}
|
||||
|
||||
def pushToAll(): Unit = subscriptionsReadyForPush foreach { subscription ⇒ push(subscriptions(subscription)) }
|
||||
|
||||
def push(subscriber: Subscriber[Any]): Unit =
|
||||
futureValue match {
|
||||
case Some(someValue) ⇒ try someValue match {
|
||||
case Success(value) ⇒
|
||||
tryOnNext(subscriber, value)
|
||||
tryOnComplete(subscriber)
|
||||
case Failure(t) ⇒
|
||||
shutdownReason = Some(t)
|
||||
tryOnError(subscriber, t)
|
||||
} catch {
|
||||
case _: SpecViolation ⇒ // continue
|
||||
} finally {
|
||||
removeSubscriber(subscriber)
|
||||
}
|
||||
case None ⇒ // not completed yet
|
||||
}
|
||||
|
||||
def registerSubscriber(subscriber: Subscriber[Any]): Unit = {
|
||||
if (subscribers.contains(subscriber))
|
||||
rejectDuplicateSubscriber(subscriber)
|
||||
else {
|
||||
val subscription = new FutureSubscription(self)
|
||||
subscribers = subscribers.updated(subscriber, subscription)
|
||||
subscriptions = subscriptions.updated(subscription, subscriber)
|
||||
tryOnSubscribe(subscriber, subscription)
|
||||
}
|
||||
}
|
||||
|
||||
def removeSubscriber(subscriber: Subscriber[Any]): Unit = {
|
||||
val subscription = subscribers(subscriber)
|
||||
subscriptions -= subscription
|
||||
subscriptionsReadyForPush -= subscription
|
||||
subscribers -= subscriber
|
||||
if (subscribers.isEmpty) {
|
||||
exposedPublisher.shutdown(shutdownReason)
|
||||
context.stop(self)
|
||||
}
|
||||
}
|
||||
|
||||
override def postStop(): Unit =
|
||||
if (exposedPublisher ne null)
|
||||
exposedPublisher.shutdown(shutdownReason)
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -87,31 +87,6 @@ private[akka] final class PublisherSource[Out](p: Publisher[Out], val attributes
|
|||
override def withAttributes(attr: OperationAttributes): Module = new PublisherSource[Out](p, attr, amendShape(attr))
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* Start a new `Source` from the given `Future`. The stream will consist of
|
||||
* one element when the `Future` is completed with a successful value, which
|
||||
* may happen before or after materializing the `Flow`.
|
||||
* The stream terminates with an error if the `Future` is completed with a failure.
|
||||
*/
|
||||
private[akka] final class FutureSource[Out](future: Future[Out], val attributes: OperationAttributes, shape: SourceShape[Out]) extends SourceModule[Out, Unit](shape) {
|
||||
override def create(context: MaterializationContext) =
|
||||
future.value match {
|
||||
case Some(Success(element)) ⇒
|
||||
(SynchronousIterablePublisher(List(element), context.stageName), ()) // Option is not Iterable. sigh
|
||||
case Some(Failure(t)) ⇒
|
||||
(ErrorPublisher(t, context.stageName).asInstanceOf[Publisher[Out]], ())
|
||||
case None ⇒
|
||||
val actorMaterializer = ActorFlowMaterializer.downcast(context.materializer)
|
||||
val effectiveSettings = actorMaterializer.effectiveSettings(context.effectiveAttributes)
|
||||
(ActorPublisher[Out](actorMaterializer.actorOf(context,
|
||||
FuturePublisher.props(future, effectiveSettings))), ()) // FIXME this does not need to be an actor
|
||||
}
|
||||
|
||||
override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Unit] = new FutureSource(future, attributes, shape)
|
||||
override def withAttributes(attr: OperationAttributes): Module = new FutureSource(future, attr, amendShape(attr))
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -1,122 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.impl
|
||||
|
||||
import akka.dispatch.ExecutionContexts
|
||||
|
||||
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.immutable
|
||||
import scala.util.control.NonFatal
|
||||
import akka.stream.impl.ReactiveStreamsCompliance._
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object SynchronousIterablePublisher {
|
||||
def apply[T](iterable: immutable.Iterable[T], name: String): Publisher[T] =
|
||||
new SynchronousIterablePublisher(iterable, name)
|
||||
|
||||
object IteratorSubscription {
|
||||
def apply[T](subscriber: Subscriber[T], iterator: Iterator[T]): Unit =
|
||||
new IteratorSubscription[T](subscriber, iterator).init()
|
||||
}
|
||||
|
||||
private[this] final class IteratorSubscription[T](subscriber: Subscriber[T], iterator: Iterator[T]) extends Subscription {
|
||||
var done = false
|
||||
var pendingDemand = 0L
|
||||
var pushing = false
|
||||
|
||||
import ReactiveStreamsCompliance._
|
||||
|
||||
def init(): Unit = try {
|
||||
if (!iterator.hasNext) { // Let's be prudent and issue onComplete immediately
|
||||
cancel()
|
||||
tryOnSubscribe(subscriber, this)
|
||||
tryOnComplete(subscriber)
|
||||
} else {
|
||||
tryOnSubscribe(subscriber, this)
|
||||
}
|
||||
} catch {
|
||||
case sv: SpecViolation ⇒
|
||||
cancel()
|
||||
throw sv // I think it is prudent to "escalate" the spec violation
|
||||
case NonFatal(e) ⇒
|
||||
cancel()
|
||||
tryOnError(subscriber, e)
|
||||
}
|
||||
|
||||
override def cancel(): Unit = done = true
|
||||
|
||||
override def request(elements: Long): Unit = {
|
||||
if (done) () // According to Reactive Streams Spec 3.6, `request` on a cancelled `Subscription` must be a NoOp
|
||||
else if (elements < 1) { // According to Reactive Streams Spec 3.9, with non-positive demand must yield onError
|
||||
cancel()
|
||||
rejectDueToNonPositiveDemand(subscriber)
|
||||
} else {
|
||||
pendingDemand += elements
|
||||
if (pendingDemand < 1)
|
||||
pendingDemand = Long.MaxValue // Long overflow, Reactive Streams Spec 3:17: effectively unbounded
|
||||
if (!pushing) {
|
||||
// According to Reactive Streams Spec 3:3, we must prevent unbounded recursion
|
||||
try {
|
||||
pushing = true
|
||||
pendingDemand = elements
|
||||
|
||||
@tailrec def pushNext(): Unit =
|
||||
if (done) ()
|
||||
else if (iterator.isEmpty) {
|
||||
cancel()
|
||||
tryOnComplete(subscriber)
|
||||
} else if (pendingDemand > 0) {
|
||||
pendingDemand -= 1
|
||||
tryOnNext(subscriber, iterator.next())
|
||||
pushNext()
|
||||
}
|
||||
|
||||
pushNext()
|
||||
} catch {
|
||||
case sv: SpecViolation ⇒
|
||||
cancel()
|
||||
throw sv // I think it is prudent to "escalate" the spec violation
|
||||
case NonFatal(e) ⇒
|
||||
cancel()
|
||||
tryOnError(subscriber, e)
|
||||
} finally {
|
||||
pushing = false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
* Publisher that will push all requested elements from the iterator of the iterable
|
||||
* to the subscriber in the calling thread of `requestMore`.
|
||||
*
|
||||
* It is only intended to be used with iterators over static collections.
|
||||
* Do *NOT* use it for iterators on lazy collections or other implementations that do more
|
||||
* than merely retrieve an element in their `next()` method!
|
||||
*
|
||||
* It is the responsibility of the subscriber to provide necessary memory visibility
|
||||
* if calls to `requestMore` and `cancel` are performed from different threads.
|
||||
* For example, usage from an actor is fine. Concurrent calls to the subscription is not allowed.
|
||||
* Reentrant calls to `requestMore` directly from `onNext` are supported by this publisher.
|
||||
*/
|
||||
private[akka] final class SynchronousIterablePublisher[T](
|
||||
private val iterable: immutable.Iterable[T],
|
||||
private val name: String) extends Publisher[T] {
|
||||
|
||||
import SynchronousIterablePublisher.IteratorSubscription
|
||||
|
||||
override def subscribe(subscriber: Subscriber[_ >: T]): Unit = {
|
||||
requireNonNullSubscriber(subscriber)
|
||||
IteratorSubscription(subscriber, try iterable.iterator catch { case NonFatal(t) ⇒ Iterator.continually(throw t) })
|
||||
}
|
||||
|
||||
override def toString: String = name
|
||||
}
|
||||
|
|
@ -7,7 +7,7 @@ import akka.actor.{ ActorRef, Cancellable, Props }
|
|||
import akka.stream._
|
||||
import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule }
|
||||
import akka.stream.impl.Stages.DefaultAttributes
|
||||
import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, SynchronousIterablePublisher, _ }
|
||||
import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, _ }
|
||||
import akka.stream.stage.{ Context, PushPullStage, SyncDirective, TerminationDirective }
|
||||
import org.reactivestreams.{ Publisher, Subscriber }
|
||||
|
||||
|
|
@ -17,7 +17,7 @@ import akka.stream.stage.{ TerminationDirective, Directive, Context, PushPullSta
|
|||
import scala.annotation.unchecked.uncheckedVariance
|
||||
import scala.language.higherKinds
|
||||
import akka.actor.Props
|
||||
import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, SynchronousIterablePublisher }
|
||||
import akka.stream.impl.{ EmptyPublisher, ErrorPublisher }
|
||||
import org.reactivestreams.Publisher
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
|
|
@ -165,6 +165,9 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
|
|||
|
||||
object Source extends SourceApply {
|
||||
|
||||
private[this] final val _id: Any ⇒ Any = x ⇒ x
|
||||
private[this] final def id[A]: A ⇒ A = _id.asInstanceOf[A ⇒ A]
|
||||
|
||||
private[stream] def apply[Out, Mat](module: SourceModule[Out, Mat]): Source[Out, Mat] =
|
||||
new Source(module)
|
||||
|
||||
|
|
@ -192,11 +195,8 @@ object Source extends SourceApply {
|
|||
* Elements are pulled out of the iterator in accordance with the demand coming
|
||||
* from the downstream transformation steps.
|
||||
*/
|
||||
def apply[T](f: () ⇒ Iterator[T]): Source[T, Unit] = {
|
||||
apply(new immutable.Iterable[T] {
|
||||
override def iterator: Iterator[T] = f()
|
||||
})
|
||||
}
|
||||
def apply[T](f: () ⇒ Iterator[T]): Source[T, Unit] =
|
||||
apply(new immutable.Iterable[T] { override def iterator: Iterator[T] = f() })
|
||||
|
||||
/**
|
||||
* A graph with the shape of a source logically is a source, this method makes
|
||||
|
|
@ -216,9 +216,8 @@ object Source extends SourceApply {
|
|||
* stream will see an individual flow of elements (always starting from the
|
||||
* beginning) regardless of when they subscribed.
|
||||
*/
|
||||
def apply[T](iterable: immutable.Iterable[T]): Source[T, Unit] = {
|
||||
Source.single(()).mapConcat((_: Unit) ⇒ iterable).withAttributes(DefaultAttributes.iterableSource)
|
||||
}
|
||||
def apply[T](iterable: immutable.Iterable[T]): Source[T, Unit] =
|
||||
Source.single(iterable).mapConcat(id).withAttributes(DefaultAttributes.iterableSource)
|
||||
|
||||
/**
|
||||
* Start a new `Source` from the given `Future`. The stream will consist of
|
||||
|
|
@ -227,7 +226,7 @@ object Source extends SourceApply {
|
|||
* The stream terminates with a failure if the `Future` is completed with a failure.
|
||||
*/
|
||||
def apply[T](future: Future[T]): Source[T, Unit] =
|
||||
new Source(new FutureSource(future, DefaultAttributes.futureSource, shape("FutureSource")))
|
||||
Source.single(future).mapAsyncUnordered(1)(id).withAttributes(DefaultAttributes.futureSource)
|
||||
|
||||
/**
|
||||
* Elements are emitted periodically with the specified interval.
|
||||
|
|
@ -244,7 +243,7 @@ object Source extends SourceApply {
|
|||
* Every connected `Sink` of this stream will see an individual stream consisting of one element.
|
||||
*/
|
||||
def single[T](element: T): Source[T, Unit] =
|
||||
apply(SynchronousIterablePublisher(List(element), "SingleSource")).withAttributes(DefaultAttributes.singleSource) // FIXME optimize
|
||||
apply(SingleElementPublisher(element, "SingleSource")).withAttributes(DefaultAttributes.singleSource) // FIXME optimize
|
||||
|
||||
/**
|
||||
* Create a `Source` that will continually emit the given element.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue