Merge pull request #15758 from akka/wip-15757-refactor-actorsystem-termination-√
Wip 15757 refactor actorsystem termination √
This commit is contained in:
commit
4f463a058b
52 changed files with 192 additions and 173 deletions
|
|
@ -195,7 +195,7 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
system2.shutdown()
|
system2.terminate()
|
||||||
Await.ready(latch, 5 seconds)
|
Await.ready(latch, 5 seconds)
|
||||||
|
|
||||||
val expected = (for (i ← 1 to count) yield i).reverse
|
val expected = (for (i ← 1 to count) yield i).reverse
|
||||||
|
|
@ -213,28 +213,36 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend
|
||||||
callbackWasRun = true
|
callbackWasRun = true
|
||||||
}
|
}
|
||||||
import system.dispatcher
|
import system.dispatcher
|
||||||
system2.scheduler.scheduleOnce(200.millis.dilated) { system2.shutdown() }
|
system2.scheduler.scheduleOnce(200.millis.dilated) { system2.terminate() }
|
||||||
|
|
||||||
system2.awaitTermination(5 seconds)
|
system2.awaitTermination(5 seconds)
|
||||||
|
Await.ready(system2.whenTerminated, 5 seconds)
|
||||||
callbackWasRun should be(true)
|
callbackWasRun should be(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
"return isTerminated status correctly" in {
|
"return isTerminated status correctly" in {
|
||||||
val system = ActorSystem()
|
val system = ActorSystem().asInstanceOf[ActorSystemImpl]
|
||||||
system.isTerminated should be(false)
|
system.isTerminated should be(false)
|
||||||
system.shutdown()
|
val wt = system.whenTerminated
|
||||||
|
wt.isCompleted should be(false)
|
||||||
|
val f = system.terminate()
|
||||||
|
val terminated = Await.result(wt, 10 seconds)
|
||||||
|
terminated.actor should be(system.provider.rootGuardian)
|
||||||
|
terminated.addressTerminated should be(true)
|
||||||
|
terminated.existenceConfirmed should be(true)
|
||||||
|
terminated should be theSameInstanceAs Await.result(f, 10 seconds)
|
||||||
system.awaitTermination(10 seconds)
|
system.awaitTermination(10 seconds)
|
||||||
system.isTerminated should be(true)
|
system.isTerminated should be(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
"throw RejectedExecutionException when shutdown" in {
|
"throw RejectedExecutionException when shutdown" in {
|
||||||
val system2 = ActorSystem("AwaitTermination", AkkaSpec.testConf)
|
val system2 = ActorSystem("AwaitTermination", AkkaSpec.testConf)
|
||||||
system2.shutdown()
|
Await.ready(system2.terminate(), 10 seconds)
|
||||||
system2.awaitTermination(10 seconds)
|
system2.awaitTermination(10 seconds)
|
||||||
|
|
||||||
intercept[RejectedExecutionException] {
|
intercept[RejectedExecutionException] {
|
||||||
system2.registerOnTermination { println("IF YOU SEE THIS THEN THERE'S A BUG HERE") }
|
system2.registerOnTermination { println("IF YOU SEE THIS THEN THERE'S A BUG HERE") }
|
||||||
}.getMessage should be("Must be called prior to system shutdown.")
|
}.getMessage should be("ActorSystem already terminated.")
|
||||||
}
|
}
|
||||||
|
|
||||||
"reliably create waves of actors" in {
|
"reliably create waves of actors" in {
|
||||||
|
|
@ -252,10 +260,10 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend
|
||||||
"reliable deny creation of actors while shutting down" in {
|
"reliable deny creation of actors while shutting down" in {
|
||||||
val system = ActorSystem()
|
val system = ActorSystem()
|
||||||
import system.dispatcher
|
import system.dispatcher
|
||||||
system.scheduler.scheduleOnce(200 millis) { system.shutdown() }
|
system.scheduler.scheduleOnce(200 millis) { system.terminate() }
|
||||||
var failing = false
|
var failing = false
|
||||||
var created = Vector.empty[ActorRef]
|
var created = Vector.empty[ActorRef]
|
||||||
while (!system.isTerminated) {
|
while (!system.whenTerminated.isCompleted) {
|
||||||
try {
|
try {
|
||||||
val t = system.actorOf(Props[ActorSystemSpec.Terminater])
|
val t = system.actorOf(Props[ActorSystemSpec.Terminater])
|
||||||
failing should not be true // because once failing => always failing (it’s due to shutdown)
|
failing should not be true // because once failing => always failing (it’s due to shutdown)
|
||||||
|
|
@ -278,7 +286,7 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend
|
||||||
implicit val system = ActorSystem("Stop", AkkaSpec.testConf)
|
implicit val system = ActorSystem("Stop", AkkaSpec.testConf)
|
||||||
EventFilter[ActorKilledException]() intercept {
|
EventFilter[ActorKilledException]() intercept {
|
||||||
system.actorSelection("/user") ! Kill
|
system.actorSelection("/user") ! Kill
|
||||||
awaitCond(system.isTerminated)
|
Await.ready(system.whenTerminated, Duration.Inf)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -313,7 +321,7 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend
|
||||||
}))
|
}))
|
||||||
EventFilter[Exception]("hello") intercept {
|
EventFilter[Exception]("hello") intercept {
|
||||||
a ! "die"
|
a ! "die"
|
||||||
awaitCond(system.isTerminated)
|
Await.ready(system.whenTerminated, Duration.Inf)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -111,7 +111,7 @@ object Chameneos {
|
||||||
val actor = system.actorOf(Props(new Mall(1000000, 4)))
|
val actor = system.actorOf(Props(new Mall(1000000, 4)))
|
||||||
Thread.sleep(10000)
|
Thread.sleep(10000)
|
||||||
println("Elapsed: " + (end - start))
|
println("Elapsed: " + (end - start))
|
||||||
system.shutdown()
|
system.terminate()
|
||||||
}
|
}
|
||||||
|
|
||||||
def main(args: Array[String]): Unit = run()
|
def main(args: Array[String]): Unit = run()
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,7 @@ object Main {
|
||||||
val app = system.actorOf(Props(appClass), "app")
|
val app = system.actorOf(Props(appClass), "app")
|
||||||
val terminator = system.actorOf(Props(classOf[Terminator], app), "app-terminator")
|
val terminator = system.actorOf(Props(classOf[Terminator], app), "app-terminator")
|
||||||
} catch {
|
} catch {
|
||||||
case NonFatal(e) ⇒ system.shutdown(); throw e
|
case NonFatal(e) ⇒ system.terminate(); throw e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -43,7 +43,7 @@ object Main {
|
||||||
def receive = {
|
def receive = {
|
||||||
case Terminated(_) ⇒
|
case Terminated(_) ⇒
|
||||||
log.info("application supervisor has terminated, shutting down")
|
log.info("application supervisor has terminated, shutting down")
|
||||||
context.system.shutdown()
|
context.system.terminate()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -156,7 +156,7 @@ trait ActorRefProvider {
|
||||||
* This Future is completed upon termination of this ActorRefProvider, which
|
* This Future is completed upon termination of this ActorRefProvider, which
|
||||||
* is usually initiated by stopping the guardian via ActorSystem.stop().
|
* is usually initiated by stopping the guardian via ActorSystem.stop().
|
||||||
*/
|
*/
|
||||||
def terminationFuture: Future[Unit]
|
def terminationFuture: Future[Terminated]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Obtain the address which is to be used within sender references when
|
* Obtain the address which is to be used within sender references when
|
||||||
|
|
@ -463,45 +463,53 @@ private[akka] class LocalActorRefProvider private[akka] (
|
||||||
override val deadLetters: InternalActorRef =
|
override val deadLetters: InternalActorRef =
|
||||||
_deadLetters.getOrElse((p: ActorPath) ⇒ new DeadLetterActorRef(this, p, eventStream)).apply(rootPath / "deadLetters")
|
_deadLetters.getOrElse((p: ActorPath) ⇒ new DeadLetterActorRef(this, p, eventStream)).apply(rootPath / "deadLetters")
|
||||||
|
|
||||||
|
private[this] final val terminationPromise: Promise[Terminated] = Promise[Terminated]()
|
||||||
|
|
||||||
|
def terminationFuture: Future[Terminated] = terminationPromise.future
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* generate name for temporary actor refs
|
* generate name for temporary actor refs
|
||||||
*/
|
*/
|
||||||
private val tempNumber = new AtomicLong
|
private val tempNumber = new AtomicLong
|
||||||
|
|
||||||
private def tempName() = Helpers.base64(tempNumber.getAndIncrement())
|
|
||||||
|
|
||||||
private val tempNode = rootPath / "temp"
|
private val tempNode = rootPath / "temp"
|
||||||
|
|
||||||
override def tempPath(): ActorPath = tempNode / tempName()
|
override def tempPath(): ActorPath = tempNode / Helpers.base64(tempNumber.getAndIncrement())
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Top-level anchor for the supervision hierarchy of this actor system. Will
|
* Top-level anchor for the supervision hierarchy of this actor system. Will
|
||||||
* receive only Supervise/ChildTerminated system messages or Failure message.
|
* receive only Supervise/ChildTerminated system messages or Failure message.
|
||||||
*/
|
*/
|
||||||
private[akka] val theOneWhoWalksTheBubblesOfSpaceTime: InternalActorRef = new MinimalActorRef {
|
private[akka] val theOneWhoWalksTheBubblesOfSpaceTime: InternalActorRef = new MinimalActorRef {
|
||||||
val stopped = new Switch(false)
|
val causeOfTermination: Promise[Terminated] = Promise[Terminated]()
|
||||||
|
|
||||||
@volatile
|
|
||||||
var causeOfTermination: Option[Throwable] = None
|
|
||||||
|
|
||||||
val path = rootPath / "bubble-walker"
|
val path = rootPath / "bubble-walker"
|
||||||
|
|
||||||
def provider: ActorRefProvider = LocalActorRefProvider.this
|
def provider: ActorRefProvider = LocalActorRefProvider.this
|
||||||
|
|
||||||
override def stop(): Unit = stopped switchOn { terminationPromise.complete(causeOfTermination.map(Failure(_)).getOrElse(Success(()))) }
|
def isWalking = causeOfTermination.future.isCompleted == false
|
||||||
@deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2") override def isTerminated: Boolean = stopped.isOn
|
|
||||||
|
|
||||||
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = stopped.ifOff(message match {
|
override def stop(): Unit = {
|
||||||
|
causeOfTermination.trySuccess(Terminated(provider.rootGuardian)(existenceConfirmed = true, addressTerminated = true)) //Idempotent
|
||||||
|
terminationPromise.tryCompleteWith(causeOfTermination.future) // Signal termination downstream, idempotent
|
||||||
|
}
|
||||||
|
|
||||||
|
@deprecated("Use context.watch(actor) and receive Terminated(actor)", "2.2")
|
||||||
|
override def isTerminated: Boolean = !isWalking
|
||||||
|
|
||||||
|
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit =
|
||||||
|
if (isWalking)
|
||||||
|
message match {
|
||||||
case null ⇒ throw new InvalidMessageException("Message is null")
|
case null ⇒ throw new InvalidMessageException("Message is null")
|
||||||
case _ ⇒ log.error(s"$this received unexpected message [$message]")
|
case _ ⇒ log.error(s"$this received unexpected message [$message]")
|
||||||
})
|
}
|
||||||
|
|
||||||
override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff {
|
override def sendSystemMessage(message: SystemMessage): Unit = if (isWalking) {
|
||||||
message match {
|
message match {
|
||||||
case Failed(child, ex, _) ⇒
|
case Failed(child: InternalActorRef, ex, _) ⇒
|
||||||
log.error(ex, s"guardian $child failed, shutting down!")
|
log.error(ex, s"guardian $child failed, shutting down!")
|
||||||
causeOfTermination = Some(ex)
|
causeOfTermination.tryFailure(ex)
|
||||||
child.asInstanceOf[InternalActorRef].stop()
|
child.stop()
|
||||||
case Supervise(_, _) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead
|
case Supervise(_, _) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead
|
||||||
case _: DeathWatchNotification ⇒ stop()
|
case _: DeathWatchNotification ⇒ stop()
|
||||||
case _ ⇒ log.error(s"$this received unexpected system message [$message]")
|
case _ ⇒ log.error(s"$this received unexpected system message [$message]")
|
||||||
|
|
@ -519,10 +527,6 @@ private[akka] class LocalActorRefProvider private[akka] (
|
||||||
@volatile
|
@volatile
|
||||||
private var system: ActorSystemImpl = _
|
private var system: ActorSystemImpl = _
|
||||||
|
|
||||||
lazy val terminationPromise: Promise[Unit] = Promise[Unit]()
|
|
||||||
|
|
||||||
def terminationFuture: Future[Unit] = terminationPromise.future
|
|
||||||
|
|
||||||
@volatile
|
@volatile
|
||||||
private var extraNames: Map[String, InternalActorRef] = Map()
|
private var extraNames: Map[String, InternalActorRef] = Map()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ package akka.actor
|
||||||
|
|
||||||
import java.io.Closeable
|
import java.io.Closeable
|
||||||
import java.util.concurrent.{ ConcurrentHashMap, ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException }
|
import java.util.concurrent.{ ConcurrentHashMap, ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException }
|
||||||
|
import java.util.concurrent.atomic.{ AtomicReference }
|
||||||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||||
import com.typesafe.config.{ Config, ConfigFactory }
|
import com.typesafe.config.{ Config, ConfigFactory }
|
||||||
import akka.event._
|
import akka.event._
|
||||||
|
|
@ -17,7 +18,7 @@ import akka.util._
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
import scala.concurrent.duration.{ FiniteDuration, Duration }
|
import scala.concurrent.duration.{ FiniteDuration, Duration }
|
||||||
import scala.concurrent.{ Await, Awaitable, CanAwait, Future, ExecutionContext, ExecutionContextExecutor }
|
import scala.concurrent.{ Await, Future, Promise, ExecutionContext, ExecutionContextExecutor }
|
||||||
import scala.util.{ Failure, Success, Try }
|
import scala.util.{ Failure, Success, Try }
|
||||||
import scala.util.control.{ NonFatal, ControlThrowable }
|
import scala.util.control.{ NonFatal, ControlThrowable }
|
||||||
|
|
||||||
|
|
@ -391,12 +392,14 @@ abstract class ActorSystem extends ActorRefFactory {
|
||||||
*
|
*
|
||||||
* @throws TimeoutException in case of timeout
|
* @throws TimeoutException in case of timeout
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Use Await.result(whenTerminated, timeout) instead", "2.4")
|
||||||
def awaitTermination(timeout: Duration): Unit
|
def awaitTermination(timeout: Duration): Unit
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Block current thread until the system has been shutdown. This will
|
* Block current thread until the system has been shutdown. This will
|
||||||
* block until after all on termination callbacks have been run.
|
* block until after all on termination callbacks have been run.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Use Await.result(whenTerminated, Duration.Inf) instead", "2.4")
|
||||||
def awaitTermination(): Unit
|
def awaitTermination(): Unit
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -405,6 +408,7 @@ abstract class ActorSystem extends ActorRefFactory {
|
||||||
* (below which the logging actors reside) and the execute all registered
|
* (below which the logging actors reside) and the execute all registered
|
||||||
* termination handlers (see [[ActorSystem.registerOnTermination]]).
|
* termination handlers (see [[ActorSystem.registerOnTermination]]).
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Use the terminate() method instead", "2.4")
|
||||||
def shutdown(): Unit
|
def shutdown(): Unit
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -414,8 +418,23 @@ abstract class ActorSystem extends ActorRefFactory {
|
||||||
* returns `false`, the status is actually unknown, since it might have
|
* returns `false`, the status is actually unknown, since it might have
|
||||||
* changed since you queried it.
|
* changed since you queried it.
|
||||||
*/
|
*/
|
||||||
|
@deprecated("Use the whenTerminated method instead.", "2.4")
|
||||||
def isTerminated: Boolean
|
def isTerminated: Boolean
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Terminates this actor system. This will stop the guardian actor, which in turn
|
||||||
|
* will recursively stop all its child actors, then the system guardian
|
||||||
|
* (below which the logging actors reside) and the execute all registered
|
||||||
|
* termination handlers (see [[ActorSystem.registerOnTermination]]).
|
||||||
|
*/
|
||||||
|
def terminate(): Future[Terminated]
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a Future which will be completed after the ActorSystem has been terminated
|
||||||
|
* and termination hooks have been executed.
|
||||||
|
*/
|
||||||
|
def whenTerminated: Future[Terminated]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Registers the provided extension and creates its payload, if this extension isn't already registered
|
* Registers the provided extension and creates its payload, if this extension isn't already registered
|
||||||
* This method has putIfAbsent-semantics, this method can potentially block, waiting for the initialization
|
* This method has putIfAbsent-semantics, this method can potentially block, waiting for the initialization
|
||||||
|
|
@ -533,7 +552,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.error(cause, "Uncaught fatal error from thread [{}] shutting down ActorSystem [{}]", thread.getName, name)
|
log.error(cause, "Uncaught fatal error from thread [{}] shutting down ActorSystem [{}]", thread.getName, name)
|
||||||
shutdown()
|
terminate()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -618,7 +637,9 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
|
||||||
override def reportFailure(t: Throwable): Unit = dispatcher reportFailure t
|
override def reportFailure(t: Throwable): Unit = dispatcher reportFailure t
|
||||||
})
|
})
|
||||||
|
|
||||||
def terminationFuture: Future[Unit] = provider.terminationFuture
|
private[this] final val terminationCallbacks = new TerminationCallbacks(provider.terminationFuture)(dispatcher)
|
||||||
|
|
||||||
|
override def whenTerminated: Future[Terminated] = terminationCallbacks.terminationFuture
|
||||||
def lookupRoot: InternalActorRef = provider.rootGuardian
|
def lookupRoot: InternalActorRef = provider.rootGuardian
|
||||||
def guardian: LocalActorRef = provider.guardian
|
def guardian: LocalActorRef = provider.guardian
|
||||||
def systemGuardian: LocalActorRef = provider.systemGuardian
|
def systemGuardian: LocalActorRef = provider.systemGuardian
|
||||||
|
|
@ -638,29 +659,23 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
|
||||||
this
|
this
|
||||||
} catch {
|
} catch {
|
||||||
case NonFatal(e) ⇒
|
case NonFatal(e) ⇒
|
||||||
try {
|
try terminate() catch { case NonFatal(_) ⇒ Try(stopScheduler()) }
|
||||||
shutdown()
|
|
||||||
} catch { case NonFatal(_) ⇒ Try(stopScheduler()) }
|
|
||||||
throw e
|
throw e
|
||||||
}
|
}
|
||||||
|
|
||||||
def start(): this.type = _start
|
def start(): this.type = _start
|
||||||
|
|
||||||
private lazy val terminationCallbacks = {
|
|
||||||
implicit val d = dispatcher
|
|
||||||
val callbacks = new TerminationCallbacks
|
|
||||||
terminationFuture onComplete (_ ⇒ callbacks.run)
|
|
||||||
callbacks
|
|
||||||
}
|
|
||||||
def registerOnTermination[T](code: ⇒ T) { registerOnTermination(new Runnable { def run = code }) }
|
def registerOnTermination[T](code: ⇒ T) { registerOnTermination(new Runnable { def run = code }) }
|
||||||
def registerOnTermination(code: Runnable) { terminationCallbacks.add(code) }
|
def registerOnTermination(code: Runnable) { terminationCallbacks.add(code) }
|
||||||
def awaitTermination(timeout: Duration) { Await.ready(terminationCallbacks, timeout) }
|
override def awaitTermination(timeout: Duration) { Await.ready(whenTerminated, timeout) }
|
||||||
def awaitTermination() = awaitTermination(Duration.Inf)
|
override def awaitTermination() = awaitTermination(Duration.Inf)
|
||||||
def isTerminated = terminationCallbacks.isTerminated
|
override def isTerminated = whenTerminated.isCompleted
|
||||||
|
|
||||||
def shutdown(): Unit = {
|
override def shutdown(): Unit = terminate()
|
||||||
|
|
||||||
|
override def terminate(): Future[Terminated] = {
|
||||||
if (!settings.LogDeadLettersDuringShutdown) logDeadLetterListener foreach stop
|
if (!settings.LogDeadLettersDuringShutdown) logDeadLetterListener foreach stop
|
||||||
guardian.stop()
|
guardian.stop()
|
||||||
|
whenTerminated
|
||||||
}
|
}
|
||||||
|
|
||||||
@volatile var aborting = false
|
@volatile var aborting = false
|
||||||
|
|
@ -673,7 +688,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
|
||||||
*/
|
*/
|
||||||
def abort(): Unit = {
|
def abort(): Unit = {
|
||||||
aborting = true
|
aborting = true
|
||||||
shutdown()
|
terminate()
|
||||||
}
|
}
|
||||||
|
|
||||||
//#create-scheduler
|
//#create-scheduler
|
||||||
|
|
@ -798,44 +813,31 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
|
||||||
printNode(lookupRoot, "")
|
printNode(lookupRoot, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
final class TerminationCallbacks extends Runnable with Awaitable[Unit] {
|
final class TerminationCallbacks[T](upStreamTerminated: Future[T])(implicit ec: ExecutionContext) {
|
||||||
private val lock = new ReentrantGuard
|
private[this] final val done = Promise[T]()
|
||||||
private var callbacks: List[Runnable] = _ //non-volatile since guarded by the lock
|
private[this] final val ref = new AtomicReference(done)
|
||||||
lock withGuard { callbacks = Nil }
|
|
||||||
|
|
||||||
private val latch = new CountDownLatch(1)
|
// onComplete never fires twice so safe to avoid nullcheck
|
||||||
|
upStreamTerminated onComplete { t ⇒ ref.getAndSet(null).complete(t) }
|
||||||
|
|
||||||
final def add(callback: Runnable): Unit = {
|
/**
|
||||||
latch.getCount match {
|
* Adds a Runnable that will be executed on ActorSystem termination.
|
||||||
case 0 ⇒ throw new RejectedExecutionException("Must be called prior to system shutdown.")
|
* Note that callbacks are executed in reverse order of insertion.
|
||||||
case _ ⇒ lock withGuard {
|
* @param r The callback to be executed on ActorSystem termination
|
||||||
if (latch.getCount == 0) throw new RejectedExecutionException("Must be called prior to system shutdown.")
|
* @throws RejectedExecutionException if called after ActorSystem has been terminated
|
||||||
else callbacks ::= callback
|
*/
|
||||||
}
|
final def add(r: Runnable): Unit = {
|
||||||
|
@tailrec def addRec(r: Runnable, p: Promise[T]): Unit = ref.get match {
|
||||||
|
case null ⇒ throw new RejectedExecutionException("ActorSystem already terminated.")
|
||||||
|
case some if ref.compareAndSet(some, p) ⇒ some.completeWith(p.future.andThen { case _ ⇒ r.run() })
|
||||||
|
case _ ⇒ addRec(r, p)
|
||||||
}
|
}
|
||||||
|
addRec(r, Promise[T]())
|
||||||
}
|
}
|
||||||
|
|
||||||
final def run(): Unit = lock withGuard {
|
/**
|
||||||
@tailrec def runNext(c: List[Runnable]): List[Runnable] = c match {
|
* Returns a Future which will be completed once all registered callbacks have been executed.
|
||||||
case Nil ⇒ Nil
|
*/
|
||||||
case callback :: rest ⇒
|
def terminationFuture: Future[T] = done.future
|
||||||
try callback.run() catch { case NonFatal(e) ⇒ log.error(e, "Failed to run termination callback, due to [{}]", e.getMessage) }
|
|
||||||
runNext(rest)
|
|
||||||
}
|
|
||||||
try { callbacks = runNext(callbacks) } finally latch.countDown()
|
|
||||||
}
|
|
||||||
|
|
||||||
final def ready(atMost: Duration)(implicit permit: CanAwait): this.type = {
|
|
||||||
if (atMost.isFinite()) {
|
|
||||||
if (!latch.await(atMost.length, atMost.unit))
|
|
||||||
throw new TimeoutException("Await termination timed out after [%s]" format (atMost.toString))
|
|
||||||
} else latch.await()
|
|
||||||
|
|
||||||
this
|
|
||||||
}
|
|
||||||
|
|
||||||
final def result(atMost: Duration)(implicit permit: CanAwait): Unit = ready(atMost)
|
|
||||||
|
|
||||||
final def isTerminated: Boolean = latch.getCount == 0
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@
|
||||||
package akka.cluster
|
package akka.cluster
|
||||||
|
|
||||||
import language.postfixOps
|
import language.postfixOps
|
||||||
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import org.scalatest.BeforeAndAfter
|
import org.scalatest.BeforeAndAfter
|
||||||
|
|
@ -237,7 +238,7 @@ abstract class ClusterDeathWatchSpec
|
||||||
enterBarrier("first-unavailable")
|
enterBarrier("first-unavailable")
|
||||||
|
|
||||||
val timeout = remainingOrDefault
|
val timeout = remainingOrDefault
|
||||||
try system.awaitTermination(timeout) catch {
|
try Await.ready(system.whenTerminated, timeout) catch {
|
||||||
case _: TimeoutException ⇒
|
case _: TimeoutException ⇒
|
||||||
fail("Failed to stop [%s] within [%s] \n%s".format(system.name, timeout,
|
fail("Failed to stop [%s] within [%s] \n%s".format(system.name, timeout,
|
||||||
system.asInstanceOf[ActorSystemImpl].printTree))
|
system.asInstanceOf[ActorSystemImpl].printTree))
|
||||||
|
|
|
||||||
|
|
@ -53,10 +53,8 @@ abstract class RestartFirstSeedNodeSpec
|
||||||
|
|
||||||
override def afterAll(): Unit = {
|
override def afterAll(): Unit = {
|
||||||
runOn(seed1) {
|
runOn(seed1) {
|
||||||
if (seed1System.isTerminated)
|
shutdown(
|
||||||
shutdown(restartedSeed1System)
|
if (seed1System.whenTerminated.isCompleted) restartedSeed1System else seed1System)
|
||||||
else
|
|
||||||
shutdown(seed1System)
|
|
||||||
}
|
}
|
||||||
super.afterAll()
|
super.afterAll()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ package akka.cluster
|
||||||
|
|
||||||
import language.postfixOps
|
import language.postfixOps
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
|
|
@ -159,7 +160,7 @@ abstract class UnreachableNodeJoinsAgainSpec
|
||||||
|
|
||||||
runOn(victim) {
|
runOn(victim) {
|
||||||
val victimAddress = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
val victimAddress = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
|
||||||
system.awaitTermination(10 seconds)
|
Await.ready(system.whenTerminated, 10 seconds)
|
||||||
// create new ActorSystem with same host:port
|
// create new ActorSystem with same host:port
|
||||||
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
||||||
akka.remote.netty.tcp {
|
akka.remote.netty.tcp {
|
||||||
|
|
|
||||||
|
|
@ -108,7 +108,7 @@ class MyActor extends Actor {
|
||||||
}
|
}
|
||||||
|
|
||||||
override def postStop() {
|
override def postStop() {
|
||||||
context.system.shutdown()
|
context.system.terminate()
|
||||||
}
|
}
|
||||||
//#business-logic-elided
|
//#business-logic-elided
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -26,7 +26,7 @@ class ClusterSingletonProxySpec extends WordSpecLike with Matchers with BeforeAn
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def afterAll() = testSystems.foreach(_.system.shutdown())
|
override def afterAll() = testSystems.foreach(_.system.terminate())
|
||||||
}
|
}
|
||||||
|
|
||||||
object ClusterSingletonProxySpec {
|
object ClusterSingletonProxySpec {
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,6 @@ import com.typesafe.config.ConfigFactory;
|
||||||
|
|
||||||
import static akka.japi.Util.classTag;
|
import static akka.japi.Util.classTag;
|
||||||
|
|
||||||
import static akka.actor.SupervisorStrategy.resume;
|
|
||||||
import static akka.actor.SupervisorStrategy.restart;
|
import static akka.actor.SupervisorStrategy.restart;
|
||||||
import static akka.actor.SupervisorStrategy.stop;
|
import static akka.actor.SupervisorStrategy.stop;
|
||||||
import static akka.actor.SupervisorStrategy.escalate;
|
import static akka.actor.SupervisorStrategy.escalate;
|
||||||
|
|
@ -76,12 +75,12 @@ public class FaultHandlingDocSample {
|
||||||
log.info("Current progress: {} %", progress.percent);
|
log.info("Current progress: {} %", progress.percent);
|
||||||
if (progress.percent >= 100.0) {
|
if (progress.percent >= 100.0) {
|
||||||
log.info("That's all, shutting down");
|
log.info("That's all, shutting down");
|
||||||
getContext().system().shutdown();
|
getContext().system().terminate();
|
||||||
}
|
}
|
||||||
} else if (msg == ReceiveTimeout.getInstance()) {
|
} else if (msg == ReceiveTimeout.getInstance()) {
|
||||||
// No progress within 15 seconds, ServiceUnavailable
|
// No progress within 15 seconds, ServiceUnavailable
|
||||||
log.error("Shutting down due to unavailable service");
|
log.error("Shutting down due to unavailable service");
|
||||||
getContext().system().shutdown();
|
getContext().system().terminate();
|
||||||
} else {
|
} else {
|
||||||
unhandled(msg);
|
unhandled(msg);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,7 @@ public class EchoServer {
|
||||||
watcher.tell(ackServer, ActorRef.noSender());
|
watcher.tell(ackServer, ActorRef.noSender());
|
||||||
latch.await(10, TimeUnit.MINUTES);
|
latch.await(10, TimeUnit.MINUTES);
|
||||||
} finally {
|
} finally {
|
||||||
system.shutdown();
|
system.terminate();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -678,7 +678,7 @@ actors does not respond (i.e. processing a message for extended periods of time
|
||||||
and therefore not receiving the stop command), this whole process will be
|
and therefore not receiving the stop command), this whole process will be
|
||||||
stuck.
|
stuck.
|
||||||
|
|
||||||
Upon :meth:`ActorSystem.shutdown()`, the system guardian actors will be
|
Upon :meth:`ActorSystem.terminate()`, the system guardian actors will be
|
||||||
stopped, and the aforementioned process will ensure proper termination of the
|
stopped, and the aforementioned process will ensure proper termination of the
|
||||||
whole system.
|
whole system.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -621,7 +621,7 @@ actors does not respond (i.e. processing a message for extended periods of time
|
||||||
and therefore not receiving the stop command), this whole process will be
|
and therefore not receiving the stop command), this whole process will be
|
||||||
stuck.
|
stuck.
|
||||||
|
|
||||||
Upon :meth:`ActorSystem.shutdown()`, the system guardian actors will be
|
Upon :meth:`ActorSystem.terminate()`, the system guardian actors will be
|
||||||
stopped, and the aforementioned process will ensure proper termination of the
|
stopped, and the aforementioned process will ensure proper termination of the
|
||||||
whole system.
|
whole system.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -672,7 +672,7 @@ actors does not respond (i.e. processing a message for extended periods of time
|
||||||
and therefore not receiving the stop command), this whole process will be
|
and therefore not receiving the stop command), this whole process will be
|
||||||
stuck.
|
stuck.
|
||||||
|
|
||||||
Upon :meth:`ActorSystem.shutdown()`, the system guardian actors will be
|
Upon :meth:`ActorSystem.terminate()`, the system guardian actors will be
|
||||||
stopped, and the aforementioned process will ensure proper termination of the
|
stopped, and the aforementioned process will ensure proper termination of the
|
||||||
whole system.
|
whole system.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -53,13 +53,13 @@ class Listener extends Actor with ActorLogging {
|
||||||
log.info("Current progress: {} %", percent)
|
log.info("Current progress: {} %", percent)
|
||||||
if (percent >= 100.0) {
|
if (percent >= 100.0) {
|
||||||
log.info("That's all, shutting down")
|
log.info("That's all, shutting down")
|
||||||
context.system.shutdown()
|
context.system.terminate()
|
||||||
}
|
}
|
||||||
|
|
||||||
case ReceiveTimeout =>
|
case ReceiveTimeout =>
|
||||||
// No progress within 15 seconds, ServiceUnavailable
|
// No progress within 15 seconds, ServiceUnavailable
|
||||||
log.error("Shutting down due to unavailable service")
|
log.error("Shutting down due to unavailable service")
|
||||||
context.system.shutdown()
|
context.system.terminate()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,7 @@ object EchoServer extends App {
|
||||||
|
|
||||||
// make sure to stop the system so that the application stops
|
// make sure to stop the system so that the application stops
|
||||||
try run()
|
try run()
|
||||||
finally system.shutdown()
|
finally system.terminate()
|
||||||
|
|
||||||
def run(): Unit = {
|
def run(): Unit = {
|
||||||
import ActorDSL._
|
import ActorDSL._
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,9 @@ import java.net.InetSocketAddress
|
||||||
import akka.testkit.{ ImplicitSender, TestProbe, AkkaSpec }
|
import akka.testkit.{ ImplicitSender, TestProbe, AkkaSpec }
|
||||||
import akka.util.ByteString
|
import akka.util.ByteString
|
||||||
|
|
||||||
|
import scala.concurrent.Await
|
||||||
|
import scala.concurrent.duration.Duration
|
||||||
|
|
||||||
object PullReadingExample {
|
object PullReadingExample {
|
||||||
|
|
||||||
class Listener(monitor: ActorRef) extends Actor {
|
class Listener(monitor: ActorRef) extends Actor {
|
||||||
|
|
@ -77,7 +80,6 @@ class PullReadingSpec extends AkkaSpec with ImplicitSender {
|
||||||
client.send(connection, ResumeReading)
|
client.send(connection, ResumeReading)
|
||||||
client.expectMsg(Received(ByteString("hello")))
|
client.expectMsg(Received(ByteString("hello")))
|
||||||
|
|
||||||
system.shutdown()
|
Await.ready(system.terminate(), Duration.Inf)
|
||||||
system.awaitTermination
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -322,7 +322,7 @@ trait PersistenceDocSpec {
|
||||||
|
|
||||||
processor ! PersistentBatch(List(Persistent("a"), Persistent("b")))
|
processor ! PersistentBatch(List(Persistent("a"), Persistent("b")))
|
||||||
//#batch-write
|
//#batch-write
|
||||||
system.shutdown()
|
system.terminate()
|
||||||
}
|
}
|
||||||
|
|
||||||
new AnyRef {
|
new AnyRef {
|
||||||
|
|
|
||||||
|
|
@ -284,7 +284,7 @@ class TestkitDocSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
|
||||||
//#put-your-test-code-here
|
//#put-your-test-code-here
|
||||||
val probe = TestProbe()
|
val probe = TestProbe()
|
||||||
probe.send(testActor, "hello")
|
probe.send(testActor, "hello")
|
||||||
try expectMsg("hello") catch { case NonFatal(e) => system.shutdown(); throw e }
|
try expectMsg("hello") catch { case NonFatal(e) => system.terminate(); throw e }
|
||||||
//#put-your-test-code-here
|
//#put-your-test-code-here
|
||||||
|
|
||||||
shutdown(system)
|
shutdown(system)
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,7 @@ import scala.collection.JavaConverters._
|
||||||
* }
|
* }
|
||||||
*
|
*
|
||||||
* def shutdown = {
|
* def shutdown = {
|
||||||
* system.shutdown()
|
* system.terminate()
|
||||||
* }
|
* }
|
||||||
* }
|
* }
|
||||||
* }}}
|
* }}}
|
||||||
|
|
|
||||||
|
|
@ -31,8 +31,10 @@ trait Player { this: TestConductorExt ⇒
|
||||||
|
|
||||||
private var _client: ActorRef = _
|
private var _client: ActorRef = _
|
||||||
private def client = _client match {
|
private def client = _client match {
|
||||||
case null ⇒ throw new IllegalStateException("TestConductor client not yet started")
|
case null ⇒
|
||||||
case _ if system.isTerminated ⇒ throw new IllegalStateException("TestConductor unavailable because system is shutdown; you need to startNewSystem() before this point")
|
throw new IllegalStateException("TestConductor client not yet started")
|
||||||
|
case _ if system.whenTerminated.isCompleted ⇒
|
||||||
|
throw new IllegalStateException("TestConductor unavailable because system is terminated; you need to startNewSystem() before this point")
|
||||||
case x ⇒ x
|
case x ⇒ x
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -241,7 +243,7 @@ private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress)
|
||||||
// FIXME: Currently ignoring, needs support from Remoting
|
// FIXME: Currently ignoring, needs support from Remoting
|
||||||
stay
|
stay
|
||||||
case TerminateMsg(Left(false)) ⇒
|
case TerminateMsg(Left(false)) ⇒
|
||||||
context.system.shutdown()
|
context.system.terminate()
|
||||||
stay
|
stay
|
||||||
case TerminateMsg(Left(true)) ⇒
|
case TerminateMsg(Left(true)) ⇒
|
||||||
context.system.asInstanceOf[ActorSystemImpl].abort()
|
context.system.asInstanceOf[ActorSystemImpl].abort()
|
||||||
|
|
|
||||||
|
|
@ -82,7 +82,7 @@ abstract class ActorSystemActivator extends BundleActivator {
|
||||||
*/
|
*/
|
||||||
def stop(context: BundleContext): Unit = {
|
def stop(context: BundleContext): Unit = {
|
||||||
registration foreach (_.unregister())
|
registration foreach (_.unregister())
|
||||||
system foreach (_.shutdown())
|
system foreach (_.terminate())
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -51,11 +51,11 @@ class PingPongActorSystemActivatorTest extends WordSpec with Matchers with PojoS
|
||||||
"stop the ActorSystem when bundle stops" in {
|
"stop the ActorSystem when bundle stops" in {
|
||||||
filterErrors() {
|
filterErrors() {
|
||||||
val system = serviceForType[ActorSystem]
|
val system = serviceForType[ActorSystem]
|
||||||
system.isTerminated should be(false)
|
system.whenTerminated.isCompleted should be(false)
|
||||||
|
|
||||||
bundleForName(TEST_BUNDLE_NAME).stop()
|
bundleForName(TEST_BUNDLE_NAME).stop()
|
||||||
system.awaitTermination()
|
Await.ready(system.whenTerminated, Duration.Inf)
|
||||||
system.isTerminated should be(true)
|
system.whenTerminated.isCompleted should be(true)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,8 @@ import akka.testkit._
|
||||||
|
|
||||||
import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot
|
import akka.persistence.AtLeastOnceDelivery.AtLeastOnceDeliverySnapshot
|
||||||
import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery
|
import akka.persistence.AtLeastOnceDelivery.UnconfirmedDelivery
|
||||||
|
import scala.concurrent.Await
|
||||||
|
import scala.concurrent.duration.Duration
|
||||||
|
|
||||||
object SerializerSpecConfigs {
|
object SerializerSpecConfigs {
|
||||||
val customSerializers = ConfigFactory.parseString(
|
val customSerializers = ConfigFactory.parseString(
|
||||||
|
|
@ -202,8 +204,7 @@ class MessageSerializerRemotingSpec extends AkkaSpec(remote.withFallback(customS
|
||||||
}
|
}
|
||||||
|
|
||||||
override def afterTermination() {
|
override def afterTermination() {
|
||||||
remoteSystem.shutdown()
|
Await.ready(remoteSystem.terminate(), Duration.Inf)
|
||||||
remoteSystem.awaitTermination()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"A message serializer" must {
|
"A message serializer" must {
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ package akka.remote
|
||||||
|
|
||||||
import language.postfixOps
|
import language.postfixOps
|
||||||
import java.util.concurrent.TimeoutException
|
import java.util.concurrent.TimeoutException
|
||||||
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import akka.actor.Actor
|
import akka.actor.Actor
|
||||||
|
|
@ -76,8 +77,9 @@ abstract class RemoteDeploymentDeathWatchSpec
|
||||||
|
|
||||||
sleep()
|
sleep()
|
||||||
// if the remote deployed actor is not removed the system will not shutdown
|
// if the remote deployed actor is not removed the system will not shutdown
|
||||||
|
|
||||||
val timeout = remainingOrDefault
|
val timeout = remainingOrDefault
|
||||||
try system.awaitTermination(timeout) catch {
|
try Await.ready(system.whenTerminated, timeout) catch {
|
||||||
case _: TimeoutException ⇒
|
case _: TimeoutException ⇒
|
||||||
fail("Failed to stop [%s] within [%s] \n%s".format(system.name, timeout,
|
fail("Failed to stop [%s] within [%s] \n%s".format(system.name, timeout,
|
||||||
system.asInstanceOf[ActorSystemImpl].printTree))
|
system.asInstanceOf[ActorSystemImpl].printTree))
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@
|
||||||
package akka.remote
|
package akka.remote
|
||||||
|
|
||||||
import language.postfixOps
|
import language.postfixOps
|
||||||
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import akka.actor.Actor
|
import akka.actor.Actor
|
||||||
|
|
@ -40,7 +41,7 @@ object RemoteNodeRestartDeathWatchMultiJvmSpec extends MultiNodeConfig {
|
||||||
def receive = {
|
def receive = {
|
||||||
case "shutdown" ⇒
|
case "shutdown" ⇒
|
||||||
sender() ! "shutdown-ack"
|
sender() ! "shutdown-ack"
|
||||||
context.system.shutdown()
|
context.system.terminate()
|
||||||
case msg ⇒ sender() ! msg
|
case msg ⇒ sender() ! msg
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -100,7 +101,7 @@ abstract class RemoteNodeRestartDeathWatchSpec
|
||||||
|
|
||||||
enterBarrier("watch-established")
|
enterBarrier("watch-established")
|
||||||
|
|
||||||
system.awaitTermination(30.seconds)
|
Await.ready(system.whenTerminated, 30.seconds)
|
||||||
|
|
||||||
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
||||||
akka.remote.netty.tcp {
|
akka.remote.netty.tcp {
|
||||||
|
|
@ -110,7 +111,7 @@ abstract class RemoteNodeRestartDeathWatchSpec
|
||||||
""").withFallback(system.settings.config))
|
""").withFallback(system.settings.config))
|
||||||
freshSystem.actorOf(Props[Subject], "subject")
|
freshSystem.actorOf(Props[Subject], "subject")
|
||||||
|
|
||||||
freshSystem.awaitTermination(30.seconds)
|
Await.ready(freshSystem.whenTerminated, 30.seconds)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -37,7 +37,7 @@ object RemoteNodeShutdownAndComesBackSpec extends MultiNodeConfig {
|
||||||
|
|
||||||
class Subject extends Actor {
|
class Subject extends Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case "shutdown" ⇒ context.system.shutdown()
|
case "shutdown" ⇒ context.system.terminate()
|
||||||
case msg ⇒ sender() ! msg
|
case msg ⇒ sender() ! msg
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -133,7 +133,7 @@ abstract class RemoteNodeShutdownAndComesBackSpec
|
||||||
|
|
||||||
enterBarrier("watch-established")
|
enterBarrier("watch-established")
|
||||||
|
|
||||||
system.awaitTermination(30.seconds)
|
Await.ready(system.whenTerminated, 30.seconds)
|
||||||
|
|
||||||
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
||||||
akka.remote.netty.tcp {
|
akka.remote.netty.tcp {
|
||||||
|
|
@ -143,7 +143,7 @@ abstract class RemoteNodeShutdownAndComesBackSpec
|
||||||
""").withFallback(system.settings.config))
|
""").withFallback(system.settings.config))
|
||||||
freshSystem.actorOf(Props[Subject], "subject")
|
freshSystem.actorOf(Props[Subject], "subject")
|
||||||
|
|
||||||
freshSystem.awaitTermination(30.seconds)
|
Await.ready(freshSystem.whenTerminated, 30.seconds)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -32,8 +32,8 @@ object RemoteQuarantinePiercingSpec extends MultiNodeConfig {
|
||||||
|
|
||||||
class Subject extends Actor {
|
class Subject extends Actor {
|
||||||
def receive = {
|
def receive = {
|
||||||
case "shutdown" ⇒ context.system.shutdown()
|
case "shutdown" ⇒ context.system.terminate()
|
||||||
case "identify" ⇒ sender() ! (AddressUidExtension(context.system).addressUid, self)
|
case "identify" ⇒ sender() ! (AddressUidExtension(context.system).addressUid -> self)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -100,7 +100,7 @@ abstract class RemoteQuarantinePiercingSpec extends MultiNodeSpec(RemoteQuaranti
|
||||||
|
|
||||||
enterBarrier("actor-identified")
|
enterBarrier("actor-identified")
|
||||||
|
|
||||||
system.awaitTermination(30.seconds)
|
Await.ready(system.whenTerminated, 30.seconds)
|
||||||
|
|
||||||
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
|
||||||
akka.remote.netty.tcp {
|
akka.remote.netty.tcp {
|
||||||
|
|
@ -110,7 +110,7 @@ abstract class RemoteQuarantinePiercingSpec extends MultiNodeSpec(RemoteQuaranti
|
||||||
""").withFallback(system.settings.config))
|
""").withFallback(system.settings.config))
|
||||||
freshSystem.actorOf(Props[Subject], "subject")
|
freshSystem.actorOf(Props[Subject], "subject")
|
||||||
|
|
||||||
freshSystem.awaitTermination(30.seconds)
|
Await.ready(freshSystem.whenTerminated, 30.seconds)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,7 @@ import akka.actor.Props
|
||||||
import akka.remote.transport.ThrottlerTransportAdapter.Direction._
|
import akka.remote.transport.ThrottlerTransportAdapter.Direction._
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import akka.actor.ActorLogging
|
import akka.actor.ActorLogging
|
||||||
import akka.remote.testconductor.TestConductor
|
import akka.remote.testconductor.TestConductor
|
||||||
|
|
@ -124,7 +125,7 @@ abstract class RemoteReDeploymentMultiJvmSpec extends MultiNodeSpec(RemoteReDepl
|
||||||
var sys: ActorSystem = null
|
var sys: ActorSystem = null
|
||||||
|
|
||||||
runOn(second) {
|
runOn(second) {
|
||||||
system.awaitTermination(30.seconds)
|
Await.ready(system.whenTerminated, 30.seconds)
|
||||||
expectNoMsg(sleepAfterKill)
|
expectNoMsg(sleepAfterKill)
|
||||||
sys = startNewSystem()
|
sys = startNewSystem()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -135,7 +135,7 @@ private[akka] class RemoteActorRefProvider(
|
||||||
override def rootGuardian: InternalActorRef = local.rootGuardian
|
override def rootGuardian: InternalActorRef = local.rootGuardian
|
||||||
override def guardian: LocalActorRef = local.guardian
|
override def guardian: LocalActorRef = local.guardian
|
||||||
override def systemGuardian: LocalActorRef = local.systemGuardian
|
override def systemGuardian: LocalActorRef = local.systemGuardian
|
||||||
override def terminationFuture: Future[Unit] = local.terminationFuture
|
override def terminationFuture: Future[Terminated] = local.terminationFuture
|
||||||
override def registerTempActor(actorRef: InternalActorRef, path: ActorPath): Unit = local.registerTempActor(actorRef, path)
|
override def registerTempActor(actorRef: InternalActorRef, path: ActorPath): Unit = local.registerTempActor(actorRef, path)
|
||||||
override def unregisterTempActor(path: ActorPath): Unit = local.unregisterTempActor(path)
|
override def unregisterTempActor(path: ActorPath): Unit = local.unregisterTempActor(path)
|
||||||
override def tempPath(): ActorPath = local.tempPath()
|
override def tempPath(): ActorPath = local.tempPath()
|
||||||
|
|
|
||||||
|
|
@ -5,8 +5,6 @@
|
||||||
package docs.actor;
|
package docs.actor;
|
||||||
|
|
||||||
import akka.actor.*;
|
import akka.actor.*;
|
||||||
import akka.event.LoggingAdapter;
|
|
||||||
import akka.event.Logging;
|
|
||||||
import akka.japi.pf.ReceiveBuilder;
|
import akka.japi.pf.ReceiveBuilder;
|
||||||
import akka.testkit.ErrorFilter;
|
import akka.testkit.ErrorFilter;
|
||||||
import akka.testkit.EventFilter;
|
import akka.testkit.EventFilter;
|
||||||
|
|
@ -65,9 +63,8 @@ public class ActorDocTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void afterClass() {
|
public static void afterClass() throws Exception {
|
||||||
system.shutdown();
|
Await.ready(system.terminate(), Duration.create("5 seconds"));
|
||||||
system.awaitTermination(Duration.create("5 seconds"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static
|
static
|
||||||
|
|
@ -316,7 +313,7 @@ public class ActorDocTest {
|
||||||
swapper.tell(Swap, ActorRef.noSender()); // logs Ho
|
swapper.tell(Swap, ActorRef.noSender()); // logs Ho
|
||||||
swapper.tell(Swap, ActorRef.noSender()); // logs Hi
|
swapper.tell(Swap, ActorRef.noSender()); // logs Hi
|
||||||
swapper.tell(Swap, ActorRef.noSender()); // logs Ho
|
swapper.tell(Swap, ActorRef.noSender()); // logs Ho
|
||||||
system.shutdown();
|
system.terminate();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//#swapper
|
//#swapper
|
||||||
|
|
|
||||||
|
|
@ -10,11 +10,11 @@ import akka.testkit.JavaTestKit;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import scala.PartialFunction;
|
|
||||||
import scala.concurrent.duration.Duration;
|
import scala.concurrent.duration.Duration;
|
||||||
import scala.runtime.BoxedUnit;
|
import scala.concurrent.Await;
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
public class InitializationDocTest {
|
public class InitializationDocTest {
|
||||||
|
|
||||||
|
|
@ -26,9 +26,8 @@ public class InitializationDocTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void afterClass() {
|
public static void afterClass() throws Exception {
|
||||||
system.shutdown();
|
Await.ready(system.terminate(), Duration.create("5 seconds"));
|
||||||
system.awaitTermination(Duration.create("5 seconds"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class MessageInitExample extends AbstractActor {
|
public static class MessageInitExample extends AbstractActor {
|
||||||
|
|
|
||||||
|
|
@ -19,11 +19,8 @@ import akka.util.Timeout;
|
||||||
import com.typesafe.config.Config;
|
import com.typesafe.config.Config;
|
||||||
import com.typesafe.config.ConfigFactory;
|
import com.typesafe.config.ConfigFactory;
|
||||||
import scala.concurrent.duration.Duration;
|
import scala.concurrent.duration.Duration;
|
||||||
import scala.PartialFunction;
|
|
||||||
import scala.runtime.BoxedUnit;
|
|
||||||
|
|
||||||
import static akka.japi.Util.classTag;
|
import static akka.japi.Util.classTag;
|
||||||
import static akka.actor.SupervisorStrategy.resume;
|
|
||||||
import static akka.actor.SupervisorStrategy.restart;
|
import static akka.actor.SupervisorStrategy.restart;
|
||||||
import static akka.actor.SupervisorStrategy.stop;
|
import static akka.actor.SupervisorStrategy.stop;
|
||||||
import static akka.actor.SupervisorStrategy.escalate;
|
import static akka.actor.SupervisorStrategy.escalate;
|
||||||
|
|
@ -79,13 +76,13 @@ public class FaultHandlingDocSample {
|
||||||
log().info("Current progress: {} %", progress.percent);
|
log().info("Current progress: {} %", progress.percent);
|
||||||
if (progress.percent >= 100.0) {
|
if (progress.percent >= 100.0) {
|
||||||
log().info("That's all, shutting down");
|
log().info("That's all, shutting down");
|
||||||
context().system().shutdown();
|
context().system().terminate();
|
||||||
}
|
}
|
||||||
}).
|
}).
|
||||||
matchEquals(ReceiveTimeout.getInstance(), x -> {
|
matchEquals(ReceiveTimeout.getInstance(), x -> {
|
||||||
// No progress within 15 seconds, ServiceUnavailable
|
// No progress within 15 seconds, ServiceUnavailable
|
||||||
log().error("Shutting down due to unavailable service");
|
log().error("Shutting down due to unavailable service");
|
||||||
context().system().shutdown();
|
context().system().terminate();
|
||||||
}).build(), context()
|
}).build(), context()
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -38,6 +38,6 @@ public class HelloKernel implements Bootable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void shutdown() {
|
public void shutdown() {
|
||||||
system.shutdown();
|
system.terminate();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -29,6 +29,6 @@ class HelloKernel extends Bootable {
|
||||||
}
|
}
|
||||||
|
|
||||||
def shutdown = {
|
def shutdown = {
|
||||||
system.shutdown()
|
system.terminate()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -21,7 +21,7 @@ public class Main2 {
|
||||||
receive(ReceiveBuilder.
|
receive(ReceiveBuilder.
|
||||||
match(Terminated.class, t -> {
|
match(Terminated.class, t -> {
|
||||||
log().info("{} has terminated, shutting down system", ref.path());
|
log().info("{} has terminated, shutting down system", ref.path());
|
||||||
context().system().shutdown();
|
context().system().terminate();
|
||||||
}).build());
|
}).build());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ public class Main2 {
|
||||||
public void onReceive(Object msg) {
|
public void onReceive(Object msg) {
|
||||||
if (msg instanceof Terminated) {
|
if (msg instanceof Terminated) {
|
||||||
log.info("{} has terminated, shutting down system", ref.path());
|
log.info("{} has terminated, shutting down system", ref.path());
|
||||||
getContext().system().shutdown();
|
getContext().system().terminate();
|
||||||
} else {
|
} else {
|
||||||
unhandled(msg);
|
unhandled(msg);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,7 @@ object Main2 {
|
||||||
def receive = {
|
def receive = {
|
||||||
case Terminated(_) =>
|
case Terminated(_) =>
|
||||||
log.info("{} has terminated, shutting down system", ref.path)
|
log.info("{} has terminated, shutting down system", ref.path)
|
||||||
context.system.shutdown()
|
context.system.terminate()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -125,6 +125,6 @@ public class PersistentActorExample {
|
||||||
processor.tell("print", null);
|
processor.tell("print", null);
|
||||||
|
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
system.shutdown();
|
system.terminate();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -88,6 +88,6 @@ public class SnapshotExample {
|
||||||
persistentActor.tell("print", null);
|
persistentActor.tell("print", null);
|
||||||
|
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
system.shutdown();
|
system.terminate();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -127,6 +127,6 @@ public class PersistentActorExample {
|
||||||
persistentActor.tell("print", null);
|
persistentActor.tell("print", null);
|
||||||
|
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
system.shutdown();
|
system.terminate();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -95,6 +95,6 @@ public class SnapshotExample {
|
||||||
persistentActor.tell("print", null);
|
persistentActor.tell("print", null);
|
||||||
|
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
system.shutdown();
|
system.terminate();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -56,5 +56,5 @@ object PersistentActorExample extends App {
|
||||||
persistentActor ! "print"
|
persistentActor ! "print"
|
||||||
|
|
||||||
Thread.sleep(1000)
|
Thread.sleep(1000)
|
||||||
system.shutdown()
|
system.terminate()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -44,5 +44,5 @@ object SnapshotExample extends App {
|
||||||
persistentActor ! "print"
|
persistentActor ! "print"
|
||||||
|
|
||||||
Thread.sleep(1000)
|
Thread.sleep(1000)
|
||||||
system.shutdown()
|
system.terminate()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,7 @@ class Receiver extends Actor {
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
case m: Echo => sender() ! m
|
case m: Echo => sender() ! m
|
||||||
case Shutdown => context.system.shutdown()
|
case Shutdown => context.system.terminate()
|
||||||
case _ =>
|
case _ =>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -104,7 +104,7 @@ class Sender(path: String, totalMessages: Int, burstSize: Int, payloadSize: Int)
|
||||||
|
|
||||||
case Terminated(`actor`) =>
|
case Terminated(`actor`) =>
|
||||||
println("Receiver terminated")
|
println("Receiver terminated")
|
||||||
context.system.shutdown()
|
context.system.terminate()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -33,7 +33,6 @@ public class Main {
|
||||||
Integer result = Await.result(ask(calculatorService, task, new Timeout(duration)).mapTo(classTag(Integer.class)), duration);
|
Integer result = Await.result(ask(calculatorService, task, new Timeout(duration)).mapTo(classTag(Integer.class)), duration);
|
||||||
System.out.println("Got result: " + result);
|
System.out.println("Got result: " + result);
|
||||||
|
|
||||||
system.shutdown();
|
Await.ready(system.terminate(), Duration.Inf());
|
||||||
system.awaitTermination();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ package akka.testkit
|
||||||
import language.postfixOps
|
import language.postfixOps
|
||||||
import scala.annotation.{ varargs, tailrec }
|
import scala.annotation.{ varargs, tailrec }
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import scala.reflect.ClassTag
|
import scala.reflect.ClassTag
|
||||||
import java.util.concurrent._
|
import java.util.concurrent._
|
||||||
|
|
@ -705,6 +706,10 @@ trait TestKitBase {
|
||||||
* }
|
* }
|
||||||
*
|
*
|
||||||
* } finally {
|
* } finally {
|
||||||
|
* system.terminate()
|
||||||
|
* }
|
||||||
|
*
|
||||||
|
* } finally {
|
||||||
* system.shutdown()
|
* system.shutdown()
|
||||||
* }
|
* }
|
||||||
* }
|
* }
|
||||||
|
|
@ -768,8 +773,8 @@ object TestKit {
|
||||||
def shutdownActorSystem(actorSystem: ActorSystem,
|
def shutdownActorSystem(actorSystem: ActorSystem,
|
||||||
duration: Duration = 10.seconds,
|
duration: Duration = 10.seconds,
|
||||||
verifySystemShutdown: Boolean = false): Unit = {
|
verifySystemShutdown: Boolean = false): Unit = {
|
||||||
actorSystem.shutdown()
|
actorSystem.terminate()
|
||||||
try actorSystem.awaitTermination(duration) catch {
|
try Await.ready(actorSystem.whenTerminated, duration) catch {
|
||||||
case _: TimeoutException ⇒
|
case _: TimeoutException ⇒
|
||||||
val msg = "Failed to stop [%s] within [%s] \n%s".format(actorSystem.name, duration,
|
val msg = "Failed to stop [%s] within [%s] \n%s".format(actorSystem.name, duration,
|
||||||
actorSystem.asInstanceOf[ActorSystemImpl].printTree)
|
actorSystem.asInstanceOf[ActorSystemImpl].printTree)
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,7 @@ class DefaultTimeoutSpec
|
||||||
|
|
||||||
implicit lazy val system = ActorSystem("AkkaCustomSpec")
|
implicit lazy val system = ActorSystem("AkkaCustomSpec")
|
||||||
|
|
||||||
override def afterAll = system.shutdown
|
override def afterAll = system.terminate
|
||||||
|
|
||||||
"A spec with DefaultTimeout" should {
|
"A spec with DefaultTimeout" should {
|
||||||
"use timeout from settings" in {
|
"use timeout from settings" in {
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,7 @@ class ImplicitSenderSpec
|
||||||
|
|
||||||
implicit lazy val system = ActorSystem("AkkaCustomSpec")
|
implicit lazy val system = ActorSystem("AkkaCustomSpec")
|
||||||
|
|
||||||
override def afterAll = system.shutdown
|
override def afterAll = system.terminate
|
||||||
|
|
||||||
"An ImplicitSender" should {
|
"An ImplicitSender" should {
|
||||||
"have testActor as its self" in {
|
"have testActor as its self" in {
|
||||||
|
|
|
||||||
|
|
@ -457,7 +457,7 @@ object AkkaBuild extends Build {
|
||||||
|var remoteConfig = ConfigFactory.parseString("akka.remote.netty{port=0,use-dispatcher-for-io=akka.actor.default-dispatcher,execution-pool-size=0},akka.actor.provider=akka.remote.RemoteActorRefProvider").withFallback(config)
|
|var remoteConfig = ConfigFactory.parseString("akka.remote.netty{port=0,use-dispatcher-for-io=akka.actor.default-dispatcher,execution-pool-size=0},akka.actor.provider=akka.remote.RemoteActorRefProvider").withFallback(config)
|
||||||
|var system: ActorSystem = null
|
|var system: ActorSystem = null
|
||||||
|implicit def _system = system
|
|implicit def _system = system
|
||||||
|def startSystem(remoting: Boolean = false) { system = ActorSystem("repl", if(remoting) remoteConfig else config); println("don’t forget to system.shutdown()!") }
|
|def startSystem(remoting: Boolean = false) { system = ActorSystem("repl", if(remoting) remoteConfig else config); println("don’t forget to system.terminate()!") }
|
||||||
|implicit def ec = system.dispatcher
|
|implicit def ec = system.dispatcher
|
||||||
|implicit val timeout = Timeout(5 seconds)
|
|implicit val timeout = Timeout(5 seconds)
|
||||||
|""".stripMargin,
|
|""".stripMargin,
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue