diff --git a/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala new file mode 100644 index 0000000000..9eedf56da5 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/ActorMaterializerSpec.scala @@ -0,0 +1,54 @@ +package akka.stream + +import akka.stream.impl.{ StreamSupervisor, ActorFlowMaterializerImpl } +import akka.stream.scaladsl.{ Sink, Source } +import akka.stream.testkit.AkkaSpec +import akka.testkit.{ ImplicitSender, TestProbe } + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class ActorMaterializerSpec extends AkkaSpec with ImplicitSender { + + "ActorMaterializer" must { + + "report shutdown status properly" in { + val m = ActorFlowMaterializer.create(system) + + m.isShutdown should ===(false) + m.shutdown() + m.isShutdown should ===(true) + } + + "properly shut down actors associated with it" in { + val m = ActorFlowMaterializer.create(system) + + val f = Source.lazyEmpty[Int].runFold(0)(_ + _)(m) + m.shutdown() + + an[AbruptTerminationException] should be thrownBy + Await.result(f, 3.seconds) + } + + "refuse materialization after shutdown" in { + val m = ActorFlowMaterializer.create(system) + m.shutdown() + an[IllegalStateException] should be thrownBy + Source(1 to 5).runForeach(println)(m) + } + + "shut down the supervisor actor it encapsulates" in { + val m = ActorFlowMaterializer.create(system).asInstanceOf[ActorFlowMaterializerImpl] + + Source.lazyEmpty[Any].to(Sink.ignore).run()(m) + m.supervisor ! StreamSupervisor.GetChildren + expectMsgType[StreamSupervisor.Children] + m.shutdown() + + m.supervisor ! StreamSupervisor.GetChildren + expectNoMsg(1.second) + } + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterSpec.scala index 7352be1a2c..4eca857805 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/ActorInterpreterSpec.scala @@ -136,7 +136,7 @@ class ActorInterpreterSpec extends AkkaSpec { "satisfy larger demand" in largeDemand(1) "handle spec violations" in { - a[SpecViolation] should be thrownBy { + a[AbruptTerminationException] should be thrownBy { Await.result( Source(new Publisher[String] { def subscribe(s: Subscriber[_ >: String]) = { diff --git a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala index 12807046e2..4dd5c19658 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorFlowMaterializer.scala @@ -147,6 +147,18 @@ abstract class ActorFlowMaterializer extends FlowMaterializer { def effectiveSettings(opAttr: OperationAttributes): ActorFlowMaterializerSettings + /** + * Shuts down this materializer and all the stages that have been materialized through this materializer. After + * having shut down, this materializer cannot be used again. Any attempt to materialize stages after having + * shut down will result in an IllegalStateException being thrown at materialization time. + */ + def shutdown(): Unit + + /** + * Indicates if the materializer has been shut down. + */ + def isShutdown: Boolean + /** * INTERNAL API: this might become public later */ diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala index 91056e6908..4ad84b1337 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala @@ -3,7 +3,7 @@ */ package akka.stream.impl -import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong } import akka.actor._ import akka.dispatch.Dispatchers @@ -38,6 +38,13 @@ private[akka] case class ActorFlowMaterializerImpl( import ActorFlowMaterializerImpl._ import akka.stream.impl.Stages._ + private val haveShutDown = new AtomicBoolean(false) + + override def shutdown(): Unit = + if (haveShutDown.compareAndSet(false, true)) supervisor ! PoisonPill + + override def isShutdown: Boolean = haveShutDown.get() + override def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name) private[this] def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet() @@ -59,6 +66,8 @@ private[akka] case class ActorFlowMaterializerImpl( } override def materialize[Mat](runnableFlow: Graph[ClosedShape, Mat]): Mat = { + if (haveShutDown.get()) + throw new IllegalStateException("Attempted to call materialize() after the ActorMaterializer has been shut down.") if (StreamLayout.Debug) runnableFlow.module.validate() val session = new MaterializerSession(runnableFlow.module) { diff --git a/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala b/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala index e0d6a2bc8d..f7464f50b0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ReactiveStreamsCompliance.scala @@ -106,15 +106,15 @@ private[stream] object ReactiveStreamsCompliance { } } - final def tryRequest(subscription: Subscription, demand: Long, onError: (Throwable) ⇒ Unit): Unit = { + final def tryRequest(subscription: Subscription, demand: Long): Unit = { try subscription.request(demand) catch { - case NonFatal(t) ⇒ onError(new SignalThrewException("It is illegal to throw exceptions from request(), rule 3.16", t)) + case NonFatal(t) ⇒ throw new SignalThrewException("It is illegal to throw exceptions from request(), rule 3.16", t) } } - final def tryCancel(subscription: Subscription, onError: (Throwable) ⇒ Unit): Unit = { + final def tryCancel(subscription: Subscription): Unit = { try subscription.cancel() catch { - case NonFatal(t) ⇒ onError(new SignalThrewException("It is illegal to throw exceptions from cancel(), rule 3.15", t)) + case NonFatal(t) ⇒ throw new SignalThrewException("It is illegal to throw exceptions from cancel(), rule 3.15", t) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala index def327ea91..80ef8b52fd 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala @@ -52,7 +52,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int, val name: String) batchRemaining -= 1 if (batchRemaining == 0 && !upstreamCompleted) { - tryRequest(upstream, requestBatchSize, onError) + tryRequest(upstream, requestBatchSize) batchRemaining = requestBatchSize } @@ -94,7 +94,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int, val name: String) def cancel(): Unit = { if (!upstreamCompleted) { upstreamCompleted = true - if (upstream ne null) tryCancel(upstream, onError) + if (upstream ne null) tryCancel(upstream) downstreamWaiting = false clear() } @@ -115,21 +115,32 @@ private[akka] class BatchingActorInputBoundary(val size: Int, val name: String) private def onSubscribe(subscription: Subscription): Unit = { assert(subscription != null) if (upstreamCompleted) - tryCancel(subscription, onError) + tryCancel(subscription) else if (downstreamCanceled) { upstreamCompleted = true - tryCancel(subscription, onError) + tryCancel(subscription) } else { upstream = subscription // Prefetch - tryRequest(upstream, inputBuffer.length, onError) + tryRequest(upstream, inputBuffer.length) subreceive.become(upstreamRunning) } } - private def onError(e: Throwable): Unit = { - upstreamCompleted = true - enterAndFail(e) + // Call this when an error happens that does not come from the usual onError channel + // (exceptions while calling RS interfaces, abrupt termination etc) + def onInternalError(e: Throwable): Unit = { + if (!(upstreamCompleted || downstreamCanceled) && (upstream ne null)) { + upstream.cancel() + } + onError(e) + } + + def onError(e: Throwable): Unit = { + if (!upstreamCompleted) { + upstreamCompleted = true + enterAndFail(e) + } } private def waitingForUpstream: Actor.Receive = { @@ -148,7 +159,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int, val name: String) case OnComplete ⇒ onComplete() case OnError(cause) ⇒ onError(cause) - case OnSubscribe(subscription) ⇒ tryCancel(subscription, onError) // spec rule 2.5 + case OnSubscribe(subscription) ⇒ tryCancel(subscription) // spec rule 2.5 } } @@ -361,8 +372,13 @@ private[akka] class ActorInterpreter(val settings: ActorFlowMaterializerSettings } override def postStop(): Unit = { + // This should handle termination while interpreter is running. If the upstream have been closed already this + // call has no effect and therefore do the right thing: nothing. + try upstream.onInternalError(AbruptTerminationException(self)) + // Will only have an effect if the above call to the interpreter failed to emit a proper failure to the downstream + // otherwise this will have no effect + finally downstream.fail(AbruptTerminationException(self)) upstream.cancel() - downstream.fail(AbruptTerminationException(self)) } override def postRestart(reason: Throwable): Unit = {