Merge pull request #16329 from akka/wip-harden-iterator-publisher-√

=str - hardens the IteratorPublisher by making it use the ReactiveStream...
This commit is contained in:
Konrad Malawski 2014-11-19 11:27:00 +01:00
commit 9ee2a1f882
3 changed files with 42 additions and 32 deletions

View file

@ -3,7 +3,7 @@
*/
package akka.stream
import org.reactivestreams.{ Subscription, Subscriber }
import org.reactivestreams.{ Subscription, Subscriber, Publisher }
import scala.util.control.NonFatal
@ -24,6 +24,9 @@ object ReactiveStreamsConstants {
final def validateRequest(n: Long): Unit =
if (n < 1) throw new IllegalArgumentException(NumberOfElementsInRequestMustBePositiveMsg) with SpecViolation
final def rejectAdditionalSubscriber[T](subsriber: Subscriber[T], rejector: Publisher[T]): Unit =
tryOnError(subsriber, new IllegalStateException(s"$rejector $SupportsOnlyASingleSubscriber"))
sealed trait SpecViolation {
self: Throwable
def violation: Throwable = self // this method is needed because Scalac is not smart enough to handle it otherwise
@ -32,8 +35,12 @@ object ReactiveStreamsConstants {
final class SignalThrewException(message: String, cause: Throwable) extends IllegalStateException(message, cause) with SpecViolation
final def tryOnError[T](subscriber: Subscriber[T], error: Throwable): Unit =
try subscriber.onError(error) catch {
case NonFatal(t) throw new SignalThrewException(subscriber + ".onError", t)
error match {
case sv: SpecViolation throw new IllegalStateException("It is not legal to try to signal onError with a SpecViolation", sv.violation)
case other
try subscriber.onError(other) catch {
case NonFatal(t) throw new SignalThrewException(subscriber + ".onError", t)
}
}
final def tryOnNext[T](subscriber: Subscriber[T], element: T): Unit =

View file

@ -22,11 +22,12 @@ private[akka] object IteratorPublisher {
private case object PushMore
private sealed trait State
private sealed trait StopState extends State
private case object Unitialized extends State
private case object Initialized extends State
private case object Cancelled extends State
private case object Completed extends State
private case class Errored(cause: Throwable) extends State
private case object Cancelled extends StopState
private case object Completed extends StopState
private case class Errored(cause: Throwable) extends StopState
}
/**
@ -35,18 +36,20 @@ private[akka] object IteratorPublisher {
*/
private[akka] class IteratorPublisher(iterator: Iterator[Any], settings: MaterializerSettings) extends Actor {
import IteratorPublisher._
import ReactiveStreamsConstants._
private var exposedPublisher: ActorPublisher[Any] = _
private var subscriber: Subscriber[Any] = _
private var downstreamDemand: Long = 0L
private var state: State = Unitialized
private val maxPush = settings.maxInputBufferSize
private val maxPush = settings.maxInputBufferSize // FIXME why is this a good number?
def receive = {
case ExposedPublisher(publisher)
exposedPublisher = publisher
context.become(waitingForFirstSubscriber)
case _ throw new IllegalStateException("The first message must be ExposedPublisher")
case _
throw new IllegalStateException("The first message must be ExposedPublisher")
}
def waitingForFirstSubscriber: Receive = {
@ -64,11 +67,9 @@ private[akka] class IteratorPublisher(iterator: Iterator[Any], settings: Materia
def active: Receive = {
case RequestMore(_, elements)
downstreamDemand += elements
if (downstreamDemand < 0) {
// Long has overflown, reactive-streams specification rule 3.17
val demandOverflowException = new IllegalStateException(ReactiveStreamsConstants.TotalPendingDemandMustNotExceedLongMaxValue)
stop(Errored(demandOverflowException))
} else
if (downstreamDemand < 0) // Long has overflown, reactive-streams specification rule 3.17
stop(Errored(new IllegalStateException(TotalPendingDemandMustNotExceedLongMaxValue)))
else
push()
case PushMore
push()
@ -84,7 +85,7 @@ private[akka] class IteratorPublisher(iterator: Iterator[Any], settings: Materia
if (downstreamDemand > 0) {
downstreamDemand -= 1
val hasNext = {
subscriber.onNext(iterator.next())
tryOnNext(subscriber, iterator.next())
iterator.hasNext
}
if (!hasNext)
@ -101,20 +102,22 @@ private[akka] class IteratorPublisher(iterator: Iterator[Any], settings: Materia
}
private def registerSubscriber(sub: Subscriber[Any]): Unit = {
if (subscriber eq null) {
subscriber = sub
subscriber.onSubscribe(new ActorSubscription(self, sub))
} else
sub.onError(new IllegalStateException(s"${Logging.simpleName(this)} ${ReactiveStreamsConstants.SupportsOnlyASingleSubscriber}"))
subscriber match {
case null
subscriber = sub
tryOnSubscribe(sub, new ActorSubscription(self, sub))
case _
rejectAdditionalSubscriber(sub, exposedPublisher)
}
}
private def stop(reason: State): Unit = {
private def stop(reason: StopState): Unit = {
state match {
case _: Errored | Cancelled | Completed throw new IllegalStateException
case _ // ok
case _: StopState throw new IllegalStateException(s"Already stopped. Transition attempted from $state to $reason")
case _
state = reason
context.stop(self)
}
state = reason
context.stop(self)
}
override def postStop(): Unit = {
@ -122,10 +125,10 @@ private[akka] class IteratorPublisher(iterator: Iterator[Any], settings: Materia
case Unitialized | Initialized | Cancelled
if (exposedPublisher ne null) exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason)
case Completed
subscriber.onComplete()
tryOnComplete(subscriber)
exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason)
case Errored(e)
subscriber.onError(e)
tryOnError(subscriber, e)
exposedPublisher.shutdown(Some(e))
}
// if onComplete or onError throws we let normal supervision take care of it,

View file

@ -107,12 +107,12 @@ final case class IterableSource[Out](iterable: immutable.Iterable[Out]) extends
create(materializer, flowName)._1.subscribe(flowSubscriber)
override def isActive: Boolean = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = {
val publisher = try {
val it = iterable.iterator
ActorPublisher[Out](materializer.actorOf(IteratorPublisher.props(it, materializer.settings), name = s"$flowName-0-iterable"))
} catch {
case NonFatal(e) ErrorPublisher(e, s"$flowName-0-error").asInstanceOf[Publisher[Out]]
}
val publisher =
try ActorPublisher[Out](
materializer.actorOf(IteratorPublisher.props(iterable.iterator, materializer.settings),
name = s"$flowName-0-iterable")) catch {
case NonFatal(e) ErrorPublisher(e, s"$flowName-0-error").asInstanceOf[Publisher[Out]]
}
(publisher, ())
}
}