+str #17765: Add shutdown() to materializer, also fix interpreter interruption errors

This commit is contained in:
Endre Sándor Varga 2015-06-19 17:15:50 +02:00
parent e5305af485
commit 05aed95c91
6 changed files with 107 additions and 16 deletions

View file

@ -0,0 +1,54 @@
package akka.stream
import akka.stream.impl.{ StreamSupervisor, ActorFlowMaterializerImpl }
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.testkit.AkkaSpec
import akka.testkit.{ ImplicitSender, TestProbe }
import scala.concurrent.Await
import scala.concurrent.duration._
class ActorMaterializerSpec extends AkkaSpec with ImplicitSender {
"ActorMaterializer" must {
"report shutdown status properly" in {
val m = ActorFlowMaterializer.create(system)
m.isShutdown should ===(false)
m.shutdown()
m.isShutdown should ===(true)
}
"properly shut down actors associated with it" in {
val m = ActorFlowMaterializer.create(system)
val f = Source.lazyEmpty[Int].runFold(0)(_ + _)(m)
m.shutdown()
an[AbruptTerminationException] should be thrownBy
Await.result(f, 3.seconds)
}
"refuse materialization after shutdown" in {
val m = ActorFlowMaterializer.create(system)
m.shutdown()
an[IllegalStateException] should be thrownBy
Source(1 to 5).runForeach(println)(m)
}
"shut down the supervisor actor it encapsulates" in {
val m = ActorFlowMaterializer.create(system).asInstanceOf[ActorFlowMaterializerImpl]
Source.lazyEmpty[Any].to(Sink.ignore).run()(m)
m.supervisor ! StreamSupervisor.GetChildren
expectMsgType[StreamSupervisor.Children]
m.shutdown()
m.supervisor ! StreamSupervisor.GetChildren
expectNoMsg(1.second)
}
}
}

View file

@ -136,7 +136,7 @@ class ActorInterpreterSpec extends AkkaSpec {
"satisfy larger demand" in largeDemand(1)
"handle spec violations" in {
a[SpecViolation] should be thrownBy {
a[AbruptTerminationException] should be thrownBy {
Await.result(
Source(new Publisher[String] {
def subscribe(s: Subscriber[_ >: String]) = {

View file

@ -147,6 +147,18 @@ abstract class ActorFlowMaterializer extends FlowMaterializer {
def effectiveSettings(opAttr: OperationAttributes): ActorFlowMaterializerSettings
/**
* Shuts down this materializer and all the stages that have been materialized through this materializer. After
* having shut down, this materializer cannot be used again. Any attempt to materialize stages after having
* shut down will result in an IllegalStateException being thrown at materialization time.
*/
def shutdown(): Unit
/**
* Indicates if the materializer has been shut down.
*/
def isShutdown: Boolean
/**
* INTERNAL API: this might become public later
*/

View file

@ -3,7 +3,7 @@
*/
package akka.stream.impl
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong }
import akka.actor._
import akka.dispatch.Dispatchers
@ -39,6 +39,13 @@ private[akka] case class ActorFlowMaterializerImpl(
import ActorFlowMaterializerImpl._
import akka.stream.impl.Stages._
private val haveShutDown = new AtomicBoolean(false)
override def shutdown(): Unit =
if (haveShutDown.compareAndSet(false, true)) supervisor ! PoisonPill
override def isShutdown: Boolean = haveShutDown.get()
override def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name)
private[this] def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet()
@ -60,6 +67,8 @@ private[akka] case class ActorFlowMaterializerImpl(
}
override def materialize[Mat](runnableFlow: Graph[ClosedShape, Mat]): Mat = {
if (haveShutDown.get())
throw new IllegalStateException("Attempted to call materialize() after the ActorMaterializer has been shut down.")
if (StreamLayout.Debug) runnableFlow.module.validate()
val session = new MaterializerSession(runnableFlow.module) {

View file

@ -106,15 +106,15 @@ private[stream] object ReactiveStreamsCompliance {
}
}
final def tryRequest(subscription: Subscription, demand: Long, onError: (Throwable) Unit): Unit = {
final def tryRequest(subscription: Subscription, demand: Long): Unit = {
try subscription.request(demand) catch {
case NonFatal(t) onError(new SignalThrewException("It is illegal to throw exceptions from request(), rule 3.16", t))
case NonFatal(t) throw new SignalThrewException("It is illegal to throw exceptions from request(), rule 3.16", t)
}
}
final def tryCancel(subscription: Subscription, onError: (Throwable) Unit): Unit = {
final def tryCancel(subscription: Subscription): Unit = {
try subscription.cancel() catch {
case NonFatal(t) onError(new SignalThrewException("It is illegal to throw exceptions from cancel(), rule 3.15", t))
case NonFatal(t) throw new SignalThrewException("It is illegal to throw exceptions from cancel(), rule 3.15", t)
}
}

View file

@ -52,7 +52,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int, val name: String)
batchRemaining -= 1
if (batchRemaining == 0 && !upstreamCompleted) {
tryRequest(upstream, requestBatchSize, onError)
tryRequest(upstream, requestBatchSize)
batchRemaining = requestBatchSize
}
@ -94,7 +94,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int, val name: String)
def cancel(): Unit = {
if (!upstreamCompleted) {
upstreamCompleted = true
if (upstream ne null) tryCancel(upstream, onError)
if (upstream ne null) tryCancel(upstream)
downstreamWaiting = false
clear()
}
@ -115,22 +115,33 @@ private[akka] class BatchingActorInputBoundary(val size: Int, val name: String)
private def onSubscribe(subscription: Subscription): Unit = {
assert(subscription != null)
if (upstreamCompleted)
tryCancel(subscription, onError)
tryCancel(subscription)
else if (downstreamCanceled) {
upstreamCompleted = true
tryCancel(subscription, onError)
tryCancel(subscription)
} else {
upstream = subscription
// Prefetch
tryRequest(upstream, inputBuffer.length, onError)
tryRequest(upstream, inputBuffer.length)
subreceive.become(upstreamRunning)
}
}
private def onError(e: Throwable): Unit = {
// Call this when an error happens that does not come from the usual onError channel
// (exceptions while calling RS interfaces, abrupt termination etc)
def onInternalError(e: Throwable): Unit = {
if (!(upstreamCompleted || downstreamCanceled) && (upstream ne null)) {
upstream.cancel()
}
onError(e)
}
def onError(e: Throwable): Unit = {
if (!upstreamCompleted) {
upstreamCompleted = true
enterAndFail(e)
}
}
private def waitingForUpstream: Actor.Receive = {
case OnComplete onComplete()
@ -148,7 +159,7 @@ private[akka] class BatchingActorInputBoundary(val size: Int, val name: String)
case OnComplete onComplete()
case OnError(cause) onError(cause)
case OnSubscribe(subscription) tryCancel(subscription, onError) // spec rule 2.5
case OnSubscribe(subscription) tryCancel(subscription) // spec rule 2.5
}
}
@ -361,8 +372,13 @@ private[akka] class ActorInterpreter(val settings: ActorFlowMaterializerSettings
}
override def postStop(): Unit = {
// This should handle termination while interpreter is running. If the upstream have been closed already this
// call has no effect and therefore do the right thing: nothing.
try upstream.onInternalError(AbruptTerminationException(self))
// Will only have an effect if the above call to the interpreter failed to emit a proper failure to the downstream
// otherwise this will have no effect
finally downstream.fail(AbruptTerminationException(self))
upstream.cancel()
downstream.fail(AbruptTerminationException(self))
}
override def postRestart(reason: Throwable): Unit = {