+str - Fixes #16065 by adding an IteratorPublisher and removing SimpleCallbackPublisher

Adds an IteratorPublisher and IteratorPublisherImpl
Removes the SimpleCallbackPublisher
Switches the callback-source () => Option[T] to use a new ThunkIterator and the IteratorPublisher
Removes the use of a ControlThrowable (Stop) for signalling end since we now use Iterator
Improves a couple of streams-related tests so when they fail we get better output
This commit is contained in:
Viktor Klang 2014-11-07 18:02:29 +01:00
parent d60d5bc849
commit 18067cbf8f
9 changed files with 71 additions and 83 deletions

View file

@ -72,7 +72,7 @@ class FlowExpandSpec extends AkkaSpec {
.expand[Int, Int](seed = i i, extrapolate = i (i, i))
.fold(Set.empty[Int])(_ + _)
Await.result(future, 10.seconds) should be(Set.empty[Int] ++ (1 to 100))
Await.result(future, 10.seconds) should contain theSameElementsAs ((1 to 100).toSet)
}
"backpressure publisher when subscriber is slower" in {

View file

@ -37,9 +37,10 @@ class FlowIteratorSpec extends AkkaSpec {
}
"complete empty" in {
val p = Source(List.empty[Int].iterator).runWith(Sink.publisher)
val p = Source[Int](Iterator.empty).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectSubscription()
c.expectComplete()
c.expectNoMsg(100.millis)

View file

@ -136,17 +136,14 @@ class GraphBalanceSpec extends AkkaSpec {
"fairly balance between three outputs" in {
val numElementsForSink = 10000
val f1, f2, f3 = Sink.fold[Int, Int](0)(_ + _)
val outputs = Seq.fill(3)(Sink.fold[Int, Int](0)(_ + _))
val g = FlowGraph { implicit b
val balance = Balance[Int]("balance", waitForAllDownstreams = true)
Source(Stream.fill(10000 * 3)(1)) ~> balance ~> f1
balance ~> f2
balance ~> f3
Source(Stream.fill(numElementsForSink * outputs.size)(1)) ~> balance
for { o outputs } balance ~> o
}.run()
Seq(f1, f2, f3) map { sink
Await.result(g.get(sink), 3.seconds) should be(numElementsForSink +- 1000)
}
for { o outputs } Await.result(g.get(o), 3.seconds) should be(numElementsForSink +- 1000)
}
"produce to second even though first cancels" in {

View file

@ -12,7 +12,6 @@ import akka.stream.impl.Cancel
import akka.stream.impl.ExposedPublisher
import akka.stream.impl.RequestMore
import akka.stream.impl.SoftShutdown
import akka.stream.impl.Stop
import akka.stream.impl.SubscribePending
import akka.stream.impl.SubscriberManagement
import akka.stream.impl.ActorBasedFlowMaterializer
@ -112,11 +111,7 @@ private class PersistentSourceImpl(persistenceId: String, sourceSettings: Persis
case Cancel(sub)
unregisterSubscription(sub.asInstanceOf[S])
case Response(ps)
try {
ps.foreach(pushToDownstream)
} catch {
case Stop
completeDownstream(); shutdownReason = None
try ps.foreach(pushToDownstream) catch {
case NonFatal(e) abortDownstream(e); shutdownReason = Some(e)
}
}

View file

@ -17,9 +17,7 @@ import org.reactivestreams.{ Publisher, Subscriber }
* INTERNAL API
*/
private[akka] object SimpleCallbackPublisher {
def props[T](settings: MaterializerSettings, f: () T): Props =
Props(new SimpleCallbackPublisherImpl(f, settings)).withDispatcher(settings.dispatcher)
def props[T](f: () T, settings: MaterializerSettings): Props = IteratorPublisher.props(Iterator.continually(f()), settings)
}
/**
@ -126,22 +124,23 @@ private[akka] trait SoftShutdown { this: Actor ⇒
/**
* INTERNAL API
*/
private[akka] object SimpleCallbackPublisherImpl {
case object Generate
private[akka] object IteratorPublisherImpl {
case object Flush
}
/**
* INTERNAL API
*/
private[akka] class SimpleCallbackPublisherImpl[T](f: () T, settings: MaterializerSettings)
private[akka] class IteratorPublisherImpl[T](iterator: Iterator[T], settings: MaterializerSettings)
extends Actor
with ActorLogging
with SubscriberManagement[T]
with SoftShutdown {
import akka.stream.impl.SimpleCallbackPublisherImpl._
import IteratorPublisherImpl.Flush
type S = ActorSubscription[T]
private var demand = 0L
var pub: ActorPublisher[T] = _
var shutdownReason: Option[Throwable] = ActorPublisher.NormalShutdownReason
@ -155,36 +154,46 @@ private[akka] class SimpleCallbackPublisherImpl[T](f: () ⇒ T, settings: Materi
case SubscribePending
pub.takePendingSubscribers() foreach registerSubscriber
context.become(active)
flush()
}
final def active: Receive = {
case SubscribePending
pub.takePendingSubscribers() foreach registerSubscriber
flush()
case RequestMore(sub, elements)
moreRequested(sub.asInstanceOf[S], elements)
generate()
flush()
case Cancel(sub)
unregisterSubscription(sub.asInstanceOf[S])
generate()
case Generate
generate()
flush()
case Flush
flush()
}
override def postStop(): Unit =
if (pub ne null) pub.shutdown(shutdownReason)
private var demand = 0L
private def generate(): Unit = {
if (demand > 0) {
try {
demand -= 1
pushToDownstream(f())
if (demand > 0) self ! Generate
} catch {
case Stop { completeDownstream(); shutdownReason = None }
case NonFatal(e) { abortDownstream(e); shutdownReason = Some(e) }
}
private[this] def flush(): Unit = try {
val endOfStream =
if (iterator.hasNext) {
if (demand > 0) {
pushToDownstream(iterator.next())
demand -= 1
iterator.hasNext == false
} else false
} else true
if (endOfStream) {
completeDownstream()
shutdownReason = None
} else if (demand > 0) {
self ! Flush
}
} catch {
case NonFatal(e)
abortDownstream(e)
shutdownReason = Some(e)
}
override def initialBufferSize = settings.initialFanOutBufferSize
@ -203,5 +212,4 @@ private[akka] class SimpleCallbackPublisherImpl[T](f: () ⇒ T, settings: Materi
pub.shutdown(shutdownReason)
softShutdown()
}
}

View file

@ -10,12 +10,7 @@ import akka.stream.MaterializerSettings
* INTERNAL API
*/
private[akka] object IteratorPublisher {
def props(iterator: Iterator[Any], settings: MaterializerSettings): Props = {
def f(): Any = {
if (!iterator.hasNext) throw Stop
iterator.next()
}
SimpleCallbackPublisher.props(settings, f)
}
def props[T](iterator: Iterator[T], settings: MaterializerSettings): Props =
Props(new IteratorPublisherImpl(iterator, settings)).withDispatcher(settings.dispatcher)
}

View file

@ -1,15 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import scala.util.control.ControlThrowable
/**
* INTERNAL API
*
* This exception must be thrown from a callback-based stream publisher to
* signal the end of stream (if the produced stream is not infinite). This is used for example in
* [[akka.stream.scaladsl.Flow#apply]] (the variant which takes a closure).
*/
private[akka] case object Stop extends ControlThrowable

View file

@ -3,20 +3,18 @@
*/
package akka.stream.scaladsl
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.{ Props, ActorRef }
import akka.stream.impl._
import akka.stream.impl.ActorBasedFlowMaterializer
import akka.stream.impl.Ast.AstNode
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import scala.annotation.unchecked.uncheckedVariance
import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.util.Failure
import scala.util.Success
import scala.util.{ Success, Failure }
sealed trait ActorFlowSource[+Out] extends Source[Out] {
@ -114,8 +112,7 @@ final case class IteratorSource[Out](iterator: Iterator[Out]) extends SimpleActo
create(materializer, flowName)._1.subscribe(flowSubscriber)
override def isActive: Boolean = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String) =
if (iterator.isEmpty) (EmptyPublisher[Out], ())
else (ActorPublisher[Out](materializer.actorOf(IteratorPublisher.props(iterator, materializer.settings),
(ActorPublisher[Out](materializer.actorOf(IteratorPublisher.props(iterator, materializer.settings),
name = s"$flowName-0-iterator")), ())
}
@ -135,21 +132,31 @@ final case class IterableSource[Out](iterable: immutable.Iterable[Out]) extends
name = s"$flowName-0-iterable")), ())
}
/**
* Define the sequence of elements to be produced by the given closure.
* The stream ends normally when evaluation of the closure returns a `None`.
* The stream ends exceptionally when an exception is thrown from the closure.
*/
final case class ThunkSource[Out](f: () Option[Out]) extends SimpleActorFlowSource[Out] {
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) =
create(materializer, flowName)._1.subscribe(flowSubscriber)
override def isActive: Boolean = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String) =
(ActorPublisher[Out](materializer.actorOf(SimpleCallbackPublisher.props(materializer.settings,
() f() match {
case Some(out) out
case _ throw Stop
}), name = s"$flowName-0-thunk")), ())
final class ThunkIterator[Out](thunk: () Option[Out]) extends Iterator[Out] {
require(thunk ne null, "thunk is not allowed to be null")
private[this] var value: Option[Out] = null
private[this] def advance(): Unit =
value = thunk() match {
case null throw new NullPointerException("Thunk is not allowed to return null")
case option option
}
@tailrec override final def hasNext: Boolean = value match {
case null
advance(); hasNext
case option option.isDefined
}
@tailrec override final def next(): Out = value match {
case null
advance(); next()
case Some(next)
advance(); next
case None Iterator.empty.next()
}
override def toString: String = "ThunkIterator"
}
/**

View file

@ -116,7 +116,7 @@ object Source {
* The stream ends normally when evaluation of the closure returns a `None`.
* The stream ends exceptionally when an exception is thrown from the closure.
*/
def apply[T](f: () Option[T]): Source[T] = ThunkSource(f)
def apply[T](f: () Option[T]): Source[T] = IteratorSource(new ThunkIterator(f))
/**
* Start a new `Source` from the given `Future`. The stream will consist of