=str: Materializer should report being shut down on sys termination

This commit is contained in:
Endre Sándor Varga 2015-06-25 12:54:29 +02:00
parent 911943fc92
commit 292eba2d73
3 changed files with 19 additions and 6 deletions

View file

@ -1,6 +1,6 @@
package akka.stream
import akka.actor.Props
import akka.actor.{ ActorSystem, Props }
import akka.stream.impl.{ StreamSupervisor, ActorMaterializerImpl }
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.testkit.AkkaSpec
@ -58,6 +58,14 @@ class ActorMaterializerSpec extends AkkaSpec with ImplicitSender {
3.seconds)
}
"report correctly if it has been shut down from the side" in {
val sys = ActorSystem()
val m = ActorMaterializer.create(sys)
sys.shutdown()
sys.awaitTermination()
m.isShutdown should ===(true)
}
}
}

View file

@ -5,6 +5,7 @@ package akka.stream
import java.util.Locale
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ExtendedActorSystem, Props }
import akka.stream.impl._
@ -49,13 +50,15 @@ object ActorMaterializer {
* `namePrefix-flowNumber-flowStepNumber-stepName`.
*/
def apply(materializerSettings: ActorMaterializerSettings, namePrefix: String, optimizations: Optimizations)(implicit context: ActorRefFactory): ActorMaterializer = {
val haveShutDown = new AtomicBoolean(false)
val system = actorSystemOf(context)
new ActorMaterializerImpl(
system,
materializerSettings,
system.dispatchers,
context.actorOf(StreamSupervisor.props(materializerSettings).withDispatcher(materializerSettings.dispatcher)),
context.actorOf(StreamSupervisor.props(materializerSettings, haveShutDown).withDispatcher(materializerSettings.dispatcher)),
haveShutDown,
FlowNameCounter(system).counter,
namePrefix,
optimizations)

View file

@ -31,6 +31,7 @@ private[akka] case class ActorMaterializerImpl(
override val settings: ActorMaterializerSettings,
dispatchers: Dispatchers,
val supervisor: ActorRef,
val haveShutDown: AtomicBoolean,
flowNameCounter: AtomicLong,
namePrefix: String,
optimizations: Optimizations)
@ -38,8 +39,6 @@ private[akka] case class ActorMaterializerImpl(
import ActorMaterializerImpl._
import akka.stream.impl.Stages._
private val haveShutDown = new AtomicBoolean(false)
override def shutdown(): Unit =
if (haveShutDown.compareAndSet(false, true)) supervisor ! PoisonPill
@ -255,7 +254,8 @@ private[akka] class FlowNameCounter extends Extension {
* INTERNAL API
*/
private[akka] object StreamSupervisor {
def props(settings: ActorMaterializerSettings): Props = Props(new StreamSupervisor(settings)).withDeploy(Deploy.local)
def props(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean): Props =
Props(new StreamSupervisor(settings, haveShutDown)).withDeploy(Deploy.local)
final case class Materialize(props: Props, name: String) extends DeadLetterSuppression with NoSerializationVerificationNeeded
@ -269,7 +269,7 @@ private[akka] object StreamSupervisor {
final case object StoppedChildren
}
private[akka] class StreamSupervisor(settings: ActorMaterializerSettings) extends Actor {
private[akka] class StreamSupervisor(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean) extends Actor {
import akka.stream.impl.StreamSupervisor._
override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
@ -283,6 +283,8 @@ private[akka] class StreamSupervisor(settings: ActorMaterializerSettings) extend
context.children.foreach(context.stop)
sender() ! StoppedChildren
}
override def postStop(): Unit = haveShutDown.set(true)
}
/**