From 292eba2d73556b90e1fb8bc94e520f5fa6a0624b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Thu, 25 Jun 2015 12:54:29 +0200 Subject: [PATCH] =str: Materializer should report being shut down on sys termination --- .../test/scala/akka/stream/ActorMaterializerSpec.scala | 10 +++++++++- .../src/main/scala/akka/stream/ActorMaterializer.scala | 5 ++++- .../scala/akka/stream/impl/ActorMaterializerImpl.scala | 10 ++++++---- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala index 61e49e3956..7d61333187 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala @@ -1,6 +1,6 @@ package akka.stream -import akka.actor.Props +import akka.actor.{ ActorSystem, Props } import akka.stream.impl.{ StreamSupervisor, ActorMaterializerImpl } import akka.stream.scaladsl.{ Sink, Source } import akka.stream.testkit.AkkaSpec @@ -58,6 +58,14 @@ class ActorMaterializerSpec extends AkkaSpec with ImplicitSender { 3.seconds) } + "report correctly if it has been shut down from the side" in { + val sys = ActorSystem() + val m = ActorMaterializer.create(sys) + sys.shutdown() + sys.awaitTermination() + m.isShutdown should ===(true) + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala index 6b01304bb2..26c3a7ce58 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala @@ -5,6 +5,7 @@ package akka.stream import java.util.Locale import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ExtendedActorSystem, Props } import akka.stream.impl._ @@ -49,13 +50,15 @@ object ActorMaterializer { * `namePrefix-flowNumber-flowStepNumber-stepName`. */ def apply(materializerSettings: ActorMaterializerSettings, namePrefix: String, optimizations: Optimizations)(implicit context: ActorRefFactory): ActorMaterializer = { + val haveShutDown = new AtomicBoolean(false) val system = actorSystemOf(context) new ActorMaterializerImpl( system, materializerSettings, system.dispatchers, - context.actorOf(StreamSupervisor.props(materializerSettings).withDispatcher(materializerSettings.dispatcher)), + context.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher)), + haveShutDown, FlowNameCounter(system).counter, namePrefix, optimizations) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 29a1360f2d..7fb4825dd2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -31,6 +31,7 @@ private[akka] case class ActorMaterializerImpl( override val settings: ActorMaterializerSettings, dispatchers: Dispatchers, val supervisor: ActorRef, + val haveShutDown: AtomicBoolean, flowNameCounter: AtomicLong, namePrefix: String, optimizations: Optimizations) @@ -38,8 +39,6 @@ private[akka] case class ActorMaterializerImpl( import ActorMaterializerImpl._ import akka.stream.impl.Stages._ - private val haveShutDown = new AtomicBoolean(false) - override def shutdown(): Unit = if (haveShutDown.compareAndSet(false, true)) supervisor ! PoisonPill @@ -255,7 +254,8 @@ private[akka] class FlowNameCounter extends Extension { * INTERNAL API */ private[akka] object StreamSupervisor { - def props(settings: ActorMaterializerSettings): Props = Props(new StreamSupervisor(settings)).withDeploy(Deploy.local) + def props(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean): Props = + Props(new StreamSupervisor(settings, haveShutDown)).withDeploy(Deploy.local) final case class Materialize(props: Props, name: String) extends DeadLetterSuppression with NoSerializationVerificationNeeded @@ -269,7 +269,7 @@ private[akka] object StreamSupervisor { final case object StoppedChildren } -private[akka] class StreamSupervisor(settings: ActorMaterializerSettings) extends Actor { +private[akka] class StreamSupervisor(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean) extends Actor { import akka.stream.impl.StreamSupervisor._ override def supervisorStrategy = SupervisorStrategy.stoppingStrategy @@ -283,6 +283,8 @@ private[akka] class StreamSupervisor(settings: ActorMaterializerSettings) extend context.children.foreach(context.stop) sender() ! StoppedChildren } + + override def postStop(): Unit = haveShutDown.set(true) } /**