diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 43ab2cbe62..f746a95be3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -11,7 +11,7 @@ import akka.stream.stage.Stage import scala.collection.immutable import scala.concurrent.duration._ import akka.actor._ -import akka.stream.{ OperationAttributes, ActorFlowMaterializerSettings, ActorFlowMaterializer } +import akka.stream.{ AbruptTerminationException, OperationAttributes, ActorFlowMaterializerSettings, ActorFlowMaterializer } import akka.stream.impl._ import akka.stream.testkit._ import akka.stream.testkit.Utils._ @@ -533,8 +533,8 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece def checkError(sprobe: TestSubscriber.ManualProbe[Any]): Unit = { val error = sprobe.expectError() - error.isInstanceOf[IllegalStateException] should be(true) - error.getMessage should be("Processor actor terminated abruptly") + error.isInstanceOf[AbruptTerminationException] should be(true) + error.getMessage should startWith("Processor actor") } val downstream2 = TestSubscriber.manualProbe[Any]() diff --git a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala index 472c8ebb43..2c3aef362b 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala @@ -13,6 +13,8 @@ import com.typesafe.config.Config import scala.concurrent.duration._ import akka.japi.function +import scala.util.control.NoStackTrace + object ActorFlowMaterializer { /** @@ -163,6 +165,14 @@ abstract class ActorFlowMaterializer extends FlowMaterializer { */ class MaterializationException(msg: String, cause: Throwable = null) extends RuntimeException(msg, cause) +/** + * This exception signals that an actor implementing a Reactive Streams Subscriber, Publisher or Processor + * has been terminated without being notified by an onError, onComplete or cancel signal. This usually happens + * when an ActorSystem is shut down while stream processing actors are still running. + */ +final case class AbruptTerminationException(actor: ActorRef) + extends RuntimeException(s"Processor actor [$actor] terminated abruptly") with NoStackTrace + object ActorFlowMaterializerSettings { def apply( diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index acc4158d88..242bec4612 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -5,7 +5,7 @@ package akka.stream.impl import java.util.Arrays import akka.actor._ -import akka.stream.ActorFlowMaterializerSettings +import akka.stream.{ AbruptTerminationException, ActorFlowMaterializerSettings } import akka.stream.actor.ActorSubscriber.OnSubscribe import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnComplete, OnError } import org.reactivestreams.{ Subscriber, Subscription, Processor } @@ -288,7 +288,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: ActorFlowMateriali override def postStop(): Unit = { primaryInputs.cancel() - primaryOutputs.error(new IllegalStateException("Processor actor terminated abruptly")) + primaryOutputs.error(AbruptTerminationException(self)) } override def postRestart(reason: Throwable): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala index 19a13fac44..af087c84a4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala @@ -5,10 +5,9 @@ package akka.stream.impl import akka.actor.{ ActorRef, ActorLogging, Actor } import akka.actor.Props -import akka.stream.ActorFlowMaterializerSettings +import akka.stream.{ AbruptTerminationException, ActorFlowMaterializerSettings, InPort, Shape } import akka.stream.actor.{ ActorSubscriberMessage, ActorSubscriber } import akka.stream.scaladsl.FlexiMerge.MergeLogic -import akka.stream.{ InPort, Shape } import org.reactivestreams.{ Subscription, Subscriber } import akka.actor.DeadLetterSuppression @@ -248,7 +247,7 @@ private[akka] abstract class FanIn(val settings: ActorFlowMaterializerSettings, override def postStop(): Unit = { inputBunch.cancel() - primaryOutputs.error(new IllegalStateException("Processor actor terminated abruptly")) + primaryOutputs.error(AbruptTerminationException(self)) } override def postRestart(reason: Throwable): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala index baf2dc32ef..b727791b9a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanOut.scala @@ -4,14 +4,13 @@ package akka.stream.impl import akka.stream.scaladsl.FlexiRoute.RouteLogic -import akka.stream.Shape +import akka.stream.{ AbruptTerminationException, Shape, ActorFlowMaterializerSettings } import scala.collection.immutable import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.ActorRef import akka.actor.Props -import akka.stream.ActorFlowMaterializerSettings import org.reactivestreams.Subscription import akka.actor.DeadLetterSuppression @@ -279,7 +278,7 @@ private[akka] abstract class FanOut(val settings: ActorFlowMaterializerSettings, override def postStop(): Unit = { primaryInputs.cancel() - outputBunch.cancel(new IllegalStateException("Processor actor terminated abruptly")) + outputBunch.cancel(AbruptTerminationException(self)) } override def postRestart(reason: Throwable): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala index 39f33882ab..83563e8702 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala @@ -5,16 +5,14 @@ package akka.stream.impl.fusing import java.util.Arrays import akka.actor._ -import akka.stream.ActorFlowMaterializerSettings +import akka.stream.{ AbruptTerminationException, ActorFlowMaterializerSettings, OperationAttributes, ActorFlowMaterializer } import akka.stream.actor.ActorSubscriber.OnSubscribe import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnError, OnComplete } import akka.stream.impl._ -import akka.stream.OperationAttributes import akka.stream.impl.fusing.OneBoundedInterpreter.{ InitializationFailed, InitializationFailure, InitializationSuccessful } import akka.stream.stage._ import org.reactivestreams.{ Subscriber, Subscription } import akka.event.{ Logging, LoggingAdapter } -import akka.stream.ActorFlowMaterializer /** * INTERNAL API @@ -361,7 +359,7 @@ private[akka] class ActorInterpreter(val settings: ActorFlowMaterializerSettings override def postStop(): Unit = { upstream.cancel() - downstream.fail(new IllegalStateException("Processor actor terminated abruptly")) + downstream.fail(AbruptTerminationException(self)) } override def postRestart(reason: Throwable): Unit = {