+act #3513 Clarify gracefulStop stopMessage, and add Java API
This commit is contained in:
parent
1c0e799370
commit
d506f6df4f
6 changed files with 105 additions and 7 deletions
|
|
@ -37,7 +37,7 @@ trait GracefulStopSupport {
|
||||||
* If the target actor isn't terminated within the timeout the [[scala.concurrent.Future]]
|
* If the target actor isn't terminated within the timeout the [[scala.concurrent.Future]]
|
||||||
* is completed with failure [[akka.pattern.AskTimeoutException]].
|
* is completed with failure [[akka.pattern.AskTimeoutException]].
|
||||||
*
|
*
|
||||||
* If you want to invoke specalized stopping logic on your target actor instead of PoisonPill, you can pass your
|
* If you want to invoke specialized stopping logic on your target actor instead of PoisonPill, you can pass your
|
||||||
* stop command as a parameter:
|
* stop command as a parameter:
|
||||||
* {{{
|
* {{{
|
||||||
* gracefulStop(someChild, timeout, MyStopGracefullyMessage).onComplete {
|
* gracefulStop(someChild, timeout, MyStopGracefullyMessage).onComplete {
|
||||||
|
|
|
||||||
|
|
@ -174,6 +174,22 @@ object Patterns {
|
||||||
def gracefulStop(target: ActorRef, timeout: FiniteDuration): Future[java.lang.Boolean] =
|
def gracefulStop(target: ActorRef, timeout: FiniteDuration): Future[java.lang.Boolean] =
|
||||||
scalaGracefulStop(target, timeout).asInstanceOf[Future[java.lang.Boolean]]
|
scalaGracefulStop(target, timeout).asInstanceOf[Future[java.lang.Boolean]]
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a [[scala.concurrent.Future]] that will be completed with success (value `true`) when
|
||||||
|
* existing messages of the target actor has been processed and the actor has been
|
||||||
|
* terminated.
|
||||||
|
*
|
||||||
|
* Useful when you need to wait for termination or compose ordered termination of several actors.
|
||||||
|
*
|
||||||
|
* If you want to invoke specialized stopping logic on your target actor instead of PoisonPill, you can pass your
|
||||||
|
* stop command as `stopMessage` parameter
|
||||||
|
*
|
||||||
|
* If the target actor isn't terminated within the timeout the [[scala.concurrent.Future]]
|
||||||
|
* is completed with failure [[akka.pattern.AskTimeoutException]].
|
||||||
|
*/
|
||||||
|
def gracefulStop(target: ActorRef, timeout: FiniteDuration, stopMessage: Any): Future[java.lang.Boolean] =
|
||||||
|
scalaGracefulStop(target, timeout, stopMessage).asInstanceOf[Future[java.lang.Boolean]]
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided Callable
|
* Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided Callable
|
||||||
* after the specified duration.
|
* after the specified duration.
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ import static akka.pattern.Patterns.pipe;
|
||||||
import static akka.pattern.Patterns.gracefulStop;
|
import static akka.pattern.Patterns.gracefulStop;
|
||||||
//#import-gracefulStop
|
//#import-gracefulStop
|
||||||
|
|
||||||
|
import akka.actor.PoisonPill;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
@ -403,11 +403,11 @@ public class UntypedActorDocTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void usePatternsGracefulStop() throws Exception {
|
public void usePatternsGracefulStop() throws Exception {
|
||||||
ActorRef actorRef = system.actorOf(Props.create(MyUntypedActor.class));
|
ActorRef actorRef = system.actorOf(Props.create(Manager.class));
|
||||||
//#gracefulStop
|
//#gracefulStop
|
||||||
try {
|
try {
|
||||||
Future<Boolean> stopped =
|
Future<Boolean> stopped =
|
||||||
gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS));
|
gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), Manager.SHUTDOWN);
|
||||||
Await.result(stopped, Duration.create(6, TimeUnit.SECONDS));
|
Await.result(stopped, Duration.create(6, TimeUnit.SECONDS));
|
||||||
// the actor has been stopped
|
// the actor has been stopped
|
||||||
} catch (AskTimeoutException e) {
|
} catch (AskTimeoutException e) {
|
||||||
|
|
@ -592,6 +592,43 @@ public class UntypedActorDocTest {
|
||||||
return "Hi";
|
return "Hi";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static
|
||||||
|
//#gracefulStop-actor
|
||||||
|
public class Manager extends UntypedActor {
|
||||||
|
|
||||||
|
public static final String SHUTDOWN = "shutdown";
|
||||||
|
|
||||||
|
ActorRef worker = getContext().watch(getContext().actorOf(
|
||||||
|
Props.create(Cruncher.class), "worker"));
|
||||||
|
|
||||||
|
public void onReceive(Object message) {
|
||||||
|
if (message.equals("job")) {
|
||||||
|
worker.tell("crunch", getSelf());
|
||||||
|
} else if (message.equals(SHUTDOWN)) {
|
||||||
|
worker.tell(PoisonPill.getInstance(), getSelf());
|
||||||
|
getContext().become(shuttingDown);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Procedure<Object> shuttingDown = new Procedure<Object>() {
|
||||||
|
@Override
|
||||||
|
public void apply(Object message) {
|
||||||
|
if (message.equals("job")) {
|
||||||
|
getSender().tell("service unavailable, shutting down", getSelf());
|
||||||
|
} else if (message instanceof Terminated) {
|
||||||
|
getContext().stop(getSelf());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
//#gracefulStop-actor
|
||||||
|
|
||||||
|
static class Cruncher extends UntypedActor {
|
||||||
|
public void onReceive(Object message) {
|
||||||
|
// crunch...
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static
|
static
|
||||||
//#hot-swap-actor
|
//#hot-swap-actor
|
||||||
|
|
|
||||||
|
|
@ -664,10 +664,18 @@ termination of several actors:
|
||||||
.. includecode:: code/docs/actor/UntypedActorDocTest.java
|
.. includecode:: code/docs/actor/UntypedActorDocTest.java
|
||||||
:include: gracefulStop
|
:include: gracefulStop
|
||||||
|
|
||||||
|
.. includecode:: code/docs/actor/UntypedActorDocTest.java
|
||||||
|
:include: gracefulStop-actor
|
||||||
|
|
||||||
When ``gracefulStop()`` returns successfully, the actor’s ``postStop()`` hook
|
When ``gracefulStop()`` returns successfully, the actor’s ``postStop()`` hook
|
||||||
will have been executed: there exists a happens-before edge between the end of
|
will have been executed: there exists a happens-before edge between the end of
|
||||||
``postStop()`` and the return of ``gracefulStop()``.
|
``postStop()`` and the return of ``gracefulStop()``.
|
||||||
|
|
||||||
|
In the above example a custom ``Manager.SHUTDOWN`` message is sent to the target
|
||||||
|
actor to initiate the process of stopping the actor. You can use ``PoisonPill`` for
|
||||||
|
this, but then you have limited possibilities to perform interactions with other actors
|
||||||
|
before stopping the target actor. Simple cleanup tasks can be handled in ``postStop``.
|
||||||
|
|
||||||
.. warning::
|
.. warning::
|
||||||
|
|
||||||
Keep in mind that an actor stopping and its name being deregistered are
|
Keep in mind that an actor stopping and its name being deregistered are
|
||||||
|
|
|
||||||
|
|
@ -706,10 +706,17 @@ termination of several actors:
|
||||||
|
|
||||||
.. includecode:: code/docs/actor/ActorDocSpec.scala#gracefulStop
|
.. includecode:: code/docs/actor/ActorDocSpec.scala#gracefulStop
|
||||||
|
|
||||||
|
.. includecode:: code/docs/actor/ActorDocSpec.scala#gracefulStop-actor
|
||||||
|
|
||||||
When ``gracefulStop()`` returns successfully, the actor’s ``postStop()`` hook
|
When ``gracefulStop()`` returns successfully, the actor’s ``postStop()`` hook
|
||||||
will have been executed: there exists a happens-before edge between the end of
|
will have been executed: there exists a happens-before edge between the end of
|
||||||
``postStop()`` and the return of ``gracefulStop()``.
|
``postStop()`` and the return of ``gracefulStop()``.
|
||||||
|
|
||||||
|
In the above example a custom ``Manager.Shutdown`` message is sent to the target
|
||||||
|
actor to initiate the process of stopping the actor. You can use ``PoisonPill`` for
|
||||||
|
this, but then you have limited possibilities to perform interactions with other actors
|
||||||
|
before stopping the target actor. Simple cleanup tasks can be handled in ``postStop``.
|
||||||
|
|
||||||
.. warning::
|
.. warning::
|
||||||
|
|
||||||
Keep in mind that an actor stopping and its name being deregistered are
|
Keep in mind that an actor stopping and its name being deregistered are
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,7 @@ import akka.event.Logging
|
||||||
//#imports1
|
//#imports1
|
||||||
|
|
||||||
import scala.concurrent.Future
|
import scala.concurrent.Future
|
||||||
import akka.actor.{ ActorRef, ActorSystem }
|
import akka.actor.{ ActorRef, ActorSystem, PoisonPill, Terminated }
|
||||||
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
|
import org.scalatest.{ BeforeAndAfterAll, WordSpec }
|
||||||
import org.scalatest.Matchers
|
import org.scalatest.Matchers
|
||||||
import akka.testkit._
|
import akka.testkit._
|
||||||
|
|
@ -136,6 +136,36 @@ class ReplyException extends Actor {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//#gracefulStop-actor
|
||||||
|
object Manager {
|
||||||
|
case object Shutdown
|
||||||
|
}
|
||||||
|
|
||||||
|
class Manager extends Actor {
|
||||||
|
import Manager._
|
||||||
|
val worker = context.watch(context.actorOf(Props[Cruncher], "worker"))
|
||||||
|
|
||||||
|
def receive = {
|
||||||
|
case "job" => worker ! "crunch"
|
||||||
|
case Shutdown =>
|
||||||
|
worker ! PoisonPill
|
||||||
|
context become shuttingDown
|
||||||
|
}
|
||||||
|
|
||||||
|
def shuttingDown: Receive = {
|
||||||
|
case "job" => sender() ! "service unavailable, shutting down"
|
||||||
|
case Terminated(`worker`) =>
|
||||||
|
context stop self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#gracefulStop-actor
|
||||||
|
|
||||||
|
class Cruncher extends Actor {
|
||||||
|
def receive = {
|
||||||
|
case "crunch" => // crunch...
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//#swapper
|
//#swapper
|
||||||
case object Swap
|
case object Swap
|
||||||
class Swapper extends Actor {
|
class Swapper extends Actor {
|
||||||
|
|
@ -480,13 +510,13 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
|
||||||
}
|
}
|
||||||
|
|
||||||
"using pattern gracefulStop" in {
|
"using pattern gracefulStop" in {
|
||||||
val actorRef = system.actorOf(Props[MyActor])
|
val actorRef = system.actorOf(Props[Manager])
|
||||||
//#gracefulStop
|
//#gracefulStop
|
||||||
import akka.pattern.gracefulStop
|
import akka.pattern.gracefulStop
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
|
|
||||||
try {
|
try {
|
||||||
val stopped: Future[Boolean] = gracefulStop(actorRef, 5 seconds)
|
val stopped: Future[Boolean] = gracefulStop(actorRef, 5 seconds, Manager.Shutdown)
|
||||||
Await.result(stopped, 6 seconds)
|
Await.result(stopped, 6 seconds)
|
||||||
// the actor has been stopped
|
// the actor has been stopped
|
||||||
} catch {
|
} catch {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue