=str #17759: Handle when request() throws in ActorInterpreter

This commit is contained in:
Endre Sándor Varga 2015-06-17 16:34:05 +02:00
parent 632868b868
commit f1a5eeef5e
4 changed files with 42 additions and 8 deletions

View file

@ -4,6 +4,7 @@
package akka.stream.impl
import akka.stream.Supervision._
import akka.stream.impl.ReactiveStreamsCompliance.SpecViolation
import akka.stream.testkit.AkkaSpec
import akka.stream._
import akka.stream.scaladsl._
@ -13,6 +14,7 @@ import akka.stream.stage.Stage
import akka.stream.stage.PushPullStage
import akka.stream.stage.Context
import akka.testkit.TestLatch
import org.reactivestreams.{ Subscription, Subscriber, Publisher }
import scala.concurrent.Await
import scala.concurrent.duration._
@ -133,6 +135,21 @@ class ActorInterpreterSpec extends AkkaSpec {
"satisfy large demand" in largeDemand(0)
"satisfy larger demand" in largeDemand(1)
"handle spec violations" in {
a[SpecViolation] should be thrownBy {
Await.result(
Source(new Publisher[String] {
def subscribe(s: Subscriber[_ >: String]) = {
s.onSubscribe(new Subscription {
def cancel() = ()
def request(n: Long) = sys.error("test error")
})
}
}).runFold("")(_ + _),
3.seconds)
}
}
def largeDemand(extra: Int): Unit = {
val N = 3 * system.settings.config.getInt("akka.stream.materializer.output-burst-limit")
val large = new PushPullStage[Int, Int] {

View file

@ -100,8 +100,22 @@ private[stream] object ReactiveStreamsCompliance {
}
}
final def tryOnComplete[T](subscriber: Subscriber[T]): Unit =
final def tryOnComplete[T](subscriber: Subscriber[T]): Unit = {
try subscriber.onComplete() catch {
case NonFatal(t) throw new SignalThrewException(subscriber + ".onComplete", t)
}
}
final def tryRequest(subscription: Subscription, demand: Long, onError: (Throwable) Unit): Unit = {
try subscription.request(demand) catch {
case NonFatal(t) onError(new SignalThrewException("It is illegal to throw exceptions from request(), rule 3.16", t))
}
}
final def tryCancel(subscription: Subscription, onError: (Throwable) Unit): Unit = {
try subscription.cancel() catch {
case NonFatal(t) onError(new SignalThrewException("It is illegal to throw exceptions from cancel(), rule 3.15", t))
}
}
}

View file

@ -5,6 +5,7 @@ package akka.stream.impl.fusing
import java.util.Arrays
import akka.actor._
import akka.stream.impl.ReactiveStreamsCompliance._
import akka.stream.{ AbruptTerminationException, ActorFlowMaterializerSettings, OperationAttributes, ActorFlowMaterializer }
import akka.stream.actor.ActorSubscriber.OnSubscribe
import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnError, OnComplete }
@ -14,6 +15,8 @@ import akka.stream.stage._
import org.reactivestreams.{ Subscriber, Subscription }
import akka.event.{ Logging, LoggingAdapter }
import scala.util.control.NonFatal
/**
* INTERNAL API
*/
@ -49,7 +52,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int, val name: String)
batchRemaining -= 1
if (batchRemaining == 0 && !upstreamCompleted) {
upstream.request(requestBatchSize)
tryRequest(upstream, requestBatchSize, onError)
batchRemaining = requestBatchSize
}
@ -91,7 +94,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int, val name: String)
def cancel(): Unit = {
if (!upstreamCompleted) {
upstreamCompleted = true
if (upstream ne null) upstream.cancel()
if (upstream ne null) tryCancel(upstream, onError)
downstreamWaiting = false
clear()
}
@ -112,14 +115,14 @@ private[akka] class BatchingActorInputBoundary(val size: Int, val name: String)
private def onSubscribe(subscription: Subscription): Unit = {
assert(subscription != null)
if (upstreamCompleted)
subscription.cancel()
tryCancel(subscription, onError)
else if (downstreamCanceled) {
upstreamCompleted = true
subscription.cancel()
tryCancel(subscription, onError)
} else {
upstream = subscription
// Prefetch
upstream.request(inputBuffer.length)
tryRequest(upstream, inputBuffer.length, onError)
subreceive.become(upstreamRunning)
}
}
@ -145,7 +148,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int, val name: String)
case OnComplete onComplete()
case OnError(cause) onError(cause)
case OnSubscribe(subscription) subscription.cancel() // spec rule 2.5
case OnSubscribe(subscription) tryCancel(subscription, onError) // spec rule 2.5
}
}

View file

@ -65,7 +65,7 @@ object Sink {
*/
def foreach[T](f: function.Procedure[T]): Sink[T, Future[Unit]] =
new Sink(scaladsl.Sink.foreach(f.apply))
/**
* A `Sink` that will invoke the given procedure for each received element in parallel. The sink is materialized
* into a [[scala.concurrent.Future]].