From f1a5eeef5edea60f72d376be7375fc34483ce6a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Wed, 17 Jun 2015 16:34:05 +0200 Subject: [PATCH] =str #17759: Handle when request() throws in ActorInterpreter --- .../akka/stream/impl/ActorInterpreterSpec.scala | 17 +++++++++++++++++ .../stream/impl/ReactiveStreamsCompliance.scala | 16 +++++++++++++++- .../stream/impl/fusing/ActorInterpreter.scala | 15 +++++++++------ .../main/scala/akka/stream/javadsl/Sink.scala | 2 +- 4 files changed, 42 insertions(+), 8 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterSpec.scala index eb228395ae..8166f2c29d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterSpec.scala @@ -4,6 +4,7 @@ package akka.stream.impl import akka.stream.Supervision._ +import akka.stream.impl.ReactiveStreamsCompliance.SpecViolation import akka.stream.testkit.AkkaSpec import akka.stream._ import akka.stream.scaladsl._ @@ -13,6 +14,7 @@ import akka.stream.stage.Stage import akka.stream.stage.PushPullStage import akka.stream.stage.Context import akka.testkit.TestLatch +import org.reactivestreams.{ Subscription, Subscriber, Publisher } import scala.concurrent.Await import scala.concurrent.duration._ @@ -133,6 +135,21 @@ class ActorInterpreterSpec extends AkkaSpec { "satisfy large demand" in largeDemand(0) "satisfy larger demand" in largeDemand(1) + "handle spec violations" in { + a[SpecViolation] should be thrownBy { + Await.result( + Source(new Publisher[String] { + def subscribe(s: Subscriber[_ >: String]) = { + s.onSubscribe(new Subscription { + def cancel() = () + def request(n: Long) = sys.error("test error") + }) + } + }).runFold("")(_ + _), + 3.seconds) + } + } + def largeDemand(extra: Int): Unit = { val N = 3 * system.settings.config.getInt("akka.stream.materializer.output-burst-limit") val large = new PushPullStage[Int, Int] { diff --git a/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala b/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala index 5e5bb6640a..e0d6a2bc8d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala @@ -100,8 +100,22 @@ private[stream] object ReactiveStreamsCompliance { } } - final def tryOnComplete[T](subscriber: Subscriber[T]): Unit = + final def tryOnComplete[T](subscriber: Subscriber[T]): Unit = { try subscriber.onComplete() catch { case NonFatal(t) ⇒ throw new SignalThrewException(subscriber + ".onComplete", t) } + } + + final def tryRequest(subscription: Subscription, demand: Long, onError: (Throwable) ⇒ Unit): Unit = { + try subscription.request(demand) catch { + case NonFatal(t) ⇒ onError(new SignalThrewException("It is illegal to throw exceptions from request(), rule 3.16", t)) + } + } + + final def tryCancel(subscription: Subscription, onError: (Throwable) ⇒ Unit): Unit = { + try subscription.cancel() catch { + case NonFatal(t) ⇒ onError(new SignalThrewException("It is illegal to throw exceptions from cancel(), rule 3.15", t)) + } + } + } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala index 8e906db61e..def327ea91 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala @@ -5,6 +5,7 @@ package akka.stream.impl.fusing import java.util.Arrays import akka.actor._ +import akka.stream.impl.ReactiveStreamsCompliance._ import akka.stream.{ AbruptTerminationException, ActorFlowMaterializerSettings, OperationAttributes, ActorFlowMaterializer } import akka.stream.actor.ActorSubscriber.OnSubscribe import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnError, OnComplete } @@ -14,6 +15,8 @@ import akka.stream.stage._ import org.reactivestreams.{ Subscriber, Subscription } import akka.event.{ Logging, LoggingAdapter } +import scala.util.control.NonFatal + /** * INTERNAL API */ @@ -49,7 +52,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int, val name: String) batchRemaining -= 1 if (batchRemaining == 0 && !upstreamCompleted) { - upstream.request(requestBatchSize) + tryRequest(upstream, requestBatchSize, onError) batchRemaining = requestBatchSize } @@ -91,7 +94,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int, val name: String) def cancel(): Unit = { if (!upstreamCompleted) { upstreamCompleted = true - if (upstream ne null) upstream.cancel() + if (upstream ne null) tryCancel(upstream, onError) downstreamWaiting = false clear() } @@ -112,14 +115,14 @@ private[akka] class BatchingActorInputBoundary(val size: Int, val name: String) private def onSubscribe(subscription: Subscription): Unit = { assert(subscription != null) if (upstreamCompleted) - subscription.cancel() + tryCancel(subscription, onError) else if (downstreamCanceled) { upstreamCompleted = true - subscription.cancel() + tryCancel(subscription, onError) } else { upstream = subscription // Prefetch - upstream.request(inputBuffer.length) + tryRequest(upstream, inputBuffer.length, onError) subreceive.become(upstreamRunning) } } @@ -145,7 +148,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int, val name: String) case OnComplete ⇒ onComplete() case OnError(cause) ⇒ onError(cause) - case OnSubscribe(subscription) ⇒ subscription.cancel() // spec rule 2.5 + case OnSubscribe(subscription) ⇒ tryCancel(subscription, onError) // spec rule 2.5 } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index e6a556463b..199e8eca8b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -65,7 +65,7 @@ object Sink { */ def foreach[T](f: function.Procedure[T]): Sink[T, Future[Unit]] = new Sink(scaladsl.Sink.foreach(f.apply)) - + /** * A `Sink` that will invoke the given procedure for each received element in parallel. The sink is materialized * into a [[scala.concurrent.Future]].