!str #17465: Don't create expensive exceptions for abrupt termination

This commit is contained in:
Endre Sándor Varga 2015-05-12 15:44:18 +02:00
parent 050a930f4f
commit 3124fc3018
6 changed files with 21 additions and 15 deletions

View file

@ -11,7 +11,7 @@ import akka.stream.stage.Stage
import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor._
import akka.stream.{ OperationAttributes, ActorFlowMaterializerSettings, ActorFlowMaterializer }
import akka.stream.{ AbruptTerminationException, OperationAttributes, ActorFlowMaterializerSettings, ActorFlowMaterializer }
import akka.stream.impl._
import akka.stream.testkit._
import akka.stream.testkit.Utils._
@ -533,8 +533,8 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
def checkError(sprobe: TestSubscriber.ManualProbe[Any]): Unit = {
val error = sprobe.expectError()
error.isInstanceOf[IllegalStateException] should be(true)
error.getMessage should be("Processor actor terminated abruptly")
error.isInstanceOf[AbruptTerminationException] should be(true)
error.getMessage should startWith("Processor actor")
}
val downstream2 = TestSubscriber.manualProbe[Any]()

View file

@ -13,6 +13,8 @@ import com.typesafe.config.Config
import scala.concurrent.duration._
import akka.japi.function
import scala.util.control.NoStackTrace
object ActorFlowMaterializer {
/**
@ -163,6 +165,14 @@ abstract class ActorFlowMaterializer extends FlowMaterializer {
*/
class MaterializationException(msg: String, cause: Throwable = null) extends RuntimeException(msg, cause)
/**
* This exception signals that an actor implementing a Reactive Streams Subscriber, Publisher or Processor
* has been terminated without being notified by an onError, onComplete or cancel signal. This usually happens
* when an ActorSystem is shut down while stream processing actors are still running.
*/
final case class AbruptTerminationException(actor: ActorRef)
extends RuntimeException(s"Processor actor [$actor] terminated abruptly") with NoStackTrace
object ActorFlowMaterializerSettings {
def apply(

View file

@ -5,7 +5,7 @@ package akka.stream.impl
import java.util.Arrays
import akka.actor._
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.{ AbruptTerminationException, ActorFlowMaterializerSettings }
import akka.stream.actor.ActorSubscriber.OnSubscribe
import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnComplete, OnError }
import org.reactivestreams.{ Subscriber, Subscription, Processor }
@ -288,7 +288,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: ActorFlowMateriali
override def postStop(): Unit = {
primaryInputs.cancel()
primaryOutputs.error(new IllegalStateException("Processor actor terminated abruptly"))
primaryOutputs.error(AbruptTerminationException(self))
}
override def postRestart(reason: Throwable): Unit = {

View file

@ -5,10 +5,9 @@ package akka.stream.impl
import akka.actor.{ ActorRef, ActorLogging, Actor }
import akka.actor.Props
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.{ AbruptTerminationException, ActorFlowMaterializerSettings, InPort, Shape }
import akka.stream.actor.{ ActorSubscriberMessage, ActorSubscriber }
import akka.stream.scaladsl.FlexiMerge.MergeLogic
import akka.stream.{ InPort, Shape }
import org.reactivestreams.{ Subscription, Subscriber }
import akka.actor.DeadLetterSuppression
@ -248,7 +247,7 @@ private[akka] abstract class FanIn(val settings: ActorFlowMaterializerSettings,
override def postStop(): Unit = {
inputBunch.cancel()
primaryOutputs.error(new IllegalStateException("Processor actor terminated abruptly"))
primaryOutputs.error(AbruptTerminationException(self))
}
override def postRestart(reason: Throwable): Unit = {

View file

@ -4,14 +4,13 @@
package akka.stream.impl
import akka.stream.scaladsl.FlexiRoute.RouteLogic
import akka.stream.Shape
import akka.stream.{ AbruptTerminationException, Shape, ActorFlowMaterializerSettings }
import scala.collection.immutable
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Props
import akka.stream.ActorFlowMaterializerSettings
import org.reactivestreams.Subscription
import akka.actor.DeadLetterSuppression
@ -279,7 +278,7 @@ private[akka] abstract class FanOut(val settings: ActorFlowMaterializerSettings,
override def postStop(): Unit = {
primaryInputs.cancel()
outputBunch.cancel(new IllegalStateException("Processor actor terminated abruptly"))
outputBunch.cancel(AbruptTerminationException(self))
}
override def postRestart(reason: Throwable): Unit = {

View file

@ -5,16 +5,14 @@ package akka.stream.impl.fusing
import java.util.Arrays
import akka.actor._
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.{ AbruptTerminationException, ActorFlowMaterializerSettings, OperationAttributes, ActorFlowMaterializer }
import akka.stream.actor.ActorSubscriber.OnSubscribe
import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnError, OnComplete }
import akka.stream.impl._
import akka.stream.OperationAttributes
import akka.stream.impl.fusing.OneBoundedInterpreter.{ InitializationFailed, InitializationFailure, InitializationSuccessful }
import akka.stream.stage._
import org.reactivestreams.{ Subscriber, Subscription }
import akka.event.{ Logging, LoggingAdapter }
import akka.stream.ActorFlowMaterializer
/**
* INTERNAL API
@ -361,7 +359,7 @@ private[akka] class ActorInterpreter(val settings: ActorFlowMaterializerSettings
override def postStop(): Unit = {
upstream.cancel()
downstream.fail(new IllegalStateException("Processor actor terminated abruptly"))
downstream.fail(AbruptTerminationException(self))
}
override def postRestart(reason: Throwable): Unit = {