Merge pull request #17792 from drewhk/wip-17765-materializer-shutdown

+str #17765: Add shutdown() to materializer, also fix interpreter interruption errors
This commit is contained in:
drewhk 2015-06-22 13:52:03 +02:00
commit 9ab2de5e24
6 changed files with 107 additions and 16 deletions

View file

@ -3,7 +3,7 @@
*/
package akka.stream.impl
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong }
import akka.actor._
import akka.dispatch.Dispatchers
@ -38,6 +38,13 @@ private[akka] case class ActorFlowMaterializerImpl(
import ActorFlowMaterializerImpl._
import akka.stream.impl.Stages._
private val haveShutDown = new AtomicBoolean(false)
override def shutdown(): Unit =
if (haveShutDown.compareAndSet(false, true)) supervisor ! PoisonPill
override def isShutdown: Boolean = haveShutDown.get()
override def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name)
private[this] def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet()
@ -59,6 +66,8 @@ private[akka] case class ActorFlowMaterializerImpl(
}
override def materialize[Mat](runnableFlow: Graph[ClosedShape, Mat]): Mat = {
if (haveShutDown.get())
throw new IllegalStateException("Attempted to call materialize() after the ActorMaterializer has been shut down.")
if (StreamLayout.Debug) runnableFlow.module.validate()
val session = new MaterializerSession(runnableFlow.module) {