Merge pull request #24346 from akka/wip-22776-gracefulStop-patriknw
=typ Examples how to stop actor with cleanup (#22776)
This commit is contained in:
commit
1d5b913f7f
6 changed files with 337 additions and 9 deletions
|
|
@ -0,0 +1,104 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
package jdocs.akka.typed;
|
||||||
|
|
||||||
|
//#imports
|
||||||
|
|
||||||
|
import akka.actor.typed.ActorSystem;
|
||||||
|
import akka.actor.typed.Behavior;
|
||||||
|
import akka.actor.typed.PostStop;
|
||||||
|
import akka.actor.typed.javadsl.Behaviors;
|
||||||
|
import scala.concurrent.Await;
|
||||||
|
import scala.concurrent.duration.Duration;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
//#imports
|
||||||
|
|
||||||
|
public class GracefulStopDocTest {
|
||||||
|
|
||||||
|
//#master-actor
|
||||||
|
|
||||||
|
public abstract static class JobControl {
|
||||||
|
// no instances of this class, it's only a name space for messages
|
||||||
|
// and static methods
|
||||||
|
private JobControl() {
|
||||||
|
}
|
||||||
|
|
||||||
|
static interface JobControlLanguage {
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class SpawnJob implements JobControlLanguage {
|
||||||
|
public final String name;
|
||||||
|
|
||||||
|
public SpawnJob(String name) {
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final class GracefulShutdown implements JobControlLanguage {
|
||||||
|
|
||||||
|
public GracefulShutdown() {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static final Behavior<JobControlLanguage> mcpa = Behaviors.immutable(JobControlLanguage.class)
|
||||||
|
.onMessage(SpawnJob.class, (ctx, msg) -> {
|
||||||
|
ctx.getSystem().log().info("Spawning job {}!", msg.name);
|
||||||
|
ctx.spawn(Job.job(msg.name), msg.name);
|
||||||
|
return Behaviors.same();
|
||||||
|
})
|
||||||
|
.onSignal(PostStop.class, (ctx, signal) -> {
|
||||||
|
ctx.getSystem().log().info("Master Control Programme stopped");
|
||||||
|
return Behaviors.same();
|
||||||
|
})
|
||||||
|
.onMessage(GracefulShutdown.class, (ctx, msg) -> {
|
||||||
|
ctx.getSystem().log().info("Initiating graceful shutdown...");
|
||||||
|
|
||||||
|
// perform graceful stop, executing cleanup before final system termination
|
||||||
|
// behavior executing cleanup is passed as a parameter to Actor.stopped
|
||||||
|
return Behaviors.stopped(Behaviors.onSignal((context, PostStop) -> {
|
||||||
|
context.getSystem().log().info("Cleanup!");
|
||||||
|
return Behaviors.same();
|
||||||
|
}));
|
||||||
|
})
|
||||||
|
.onSignal(PostStop.class, (ctx, signal) -> {
|
||||||
|
ctx.getSystem().log().info("Master Control Programme stopped");
|
||||||
|
return Behaviors.same();
|
||||||
|
})
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
//#master-actor
|
||||||
|
|
||||||
|
public static void main(String[] args) throws Exception {
|
||||||
|
//#graceful-shutdown
|
||||||
|
|
||||||
|
final ActorSystem<JobControl.JobControlLanguage> system =
|
||||||
|
ActorSystem.create(JobControl.mcpa, "B6700");
|
||||||
|
|
||||||
|
system.tell(new JobControl.SpawnJob("a"));
|
||||||
|
system.tell(new JobControl.SpawnJob("b"));
|
||||||
|
|
||||||
|
// sleep here to allow time for the new actors to be started
|
||||||
|
Thread.sleep(100);
|
||||||
|
|
||||||
|
system.tell(new JobControl.GracefulShutdown());
|
||||||
|
|
||||||
|
Await.result(system.whenTerminated(), Duration.create(3, TimeUnit.SECONDS));
|
||||||
|
//#graceful-shutdown
|
||||||
|
}
|
||||||
|
|
||||||
|
//#worker-actor
|
||||||
|
|
||||||
|
public static class Job {
|
||||||
|
public static Behavior<JobControl.JobControlLanguage> job(String name) {
|
||||||
|
return Behaviors.onSignal((ctx, PostStop) -> {
|
||||||
|
ctx.getSystem().log().info("Worker {} stopped", name);
|
||||||
|
return Behaviors.same();
|
||||||
|
});
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#worker-actor
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,70 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
package akka.actor.typed
|
||||||
|
package scaladsl
|
||||||
|
|
||||||
|
import akka.Done
|
||||||
|
import akka.NotUsed
|
||||||
|
import akka.testkit.typed.TestKit
|
||||||
|
import akka.testkit.typed.scaladsl.TestProbe
|
||||||
|
|
||||||
|
final class GracefulStopSpec extends TestKit with TypedAkkaSpecWithShutdown {
|
||||||
|
|
||||||
|
"Graceful stop" must {
|
||||||
|
|
||||||
|
"properly stop the children and perform the cleanup" in {
|
||||||
|
val probe = TestProbe[String]("probe")
|
||||||
|
|
||||||
|
val behavior =
|
||||||
|
Behaviors.deferred[akka.NotUsed] { context ⇒
|
||||||
|
val c1 = context.spawn[NotUsed](Behaviors.onSignal {
|
||||||
|
case (_, PostStop) ⇒
|
||||||
|
probe.ref ! "child-done"
|
||||||
|
Behaviors.stopped
|
||||||
|
}, "child1")
|
||||||
|
|
||||||
|
val c2 = context.spawn[NotUsed](Behaviors.onSignal {
|
||||||
|
case (_, PostStop) ⇒
|
||||||
|
probe.ref ! "child-done"
|
||||||
|
Behaviors.stopped
|
||||||
|
}, "child2")
|
||||||
|
|
||||||
|
Behaviors.stopped {
|
||||||
|
Behaviors.onSignal {
|
||||||
|
case (ctx, PostStop) ⇒
|
||||||
|
// cleanup function body
|
||||||
|
probe.ref ! "parent-done"
|
||||||
|
Behaviors.same
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
spawn(behavior)
|
||||||
|
probe.expectMsg("child-done")
|
||||||
|
probe.expectMsg("child-done")
|
||||||
|
probe.expectMsg("parent-done")
|
||||||
|
}
|
||||||
|
|
||||||
|
"properly perform the cleanup and stop itself for no children case" in {
|
||||||
|
val probe = TestProbe[Done]("probe")
|
||||||
|
|
||||||
|
val behavior =
|
||||||
|
Behaviors.deferred[akka.NotUsed] { context ⇒
|
||||||
|
// do not spawn any children
|
||||||
|
Behaviors.stopped {
|
||||||
|
Behaviors.onSignal {
|
||||||
|
case (ctx, PostStop) ⇒
|
||||||
|
// cleanup function body
|
||||||
|
probe.ref ! Done
|
||||||
|
Behaviors.same
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
spawn(behavior)
|
||||||
|
probe.expectMsg(Done)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -4,11 +4,9 @@
|
||||||
package akka.actor.typed
|
package akka.actor.typed
|
||||||
package scaladsl
|
package scaladsl
|
||||||
|
|
||||||
import akka.testkit.typed.{ BehaviorTestkit, TestKit, TestKitSettings }
|
import akka.testkit.typed.TestKit
|
||||||
import akka.testkit.typed.scaladsl.TestProbe
|
import akka.testkit.typed.scaladsl.TestProbe
|
||||||
|
|
||||||
import scala.concurrent.duration.DurationInt
|
|
||||||
|
|
||||||
class ImmutablePartialSpec extends TestKit with TypedAkkaSpecWithShutdown {
|
class ImmutablePartialSpec extends TestKit with TypedAkkaSpecWithShutdown {
|
||||||
|
|
||||||
"An immutable partial" must {
|
"An immutable partial" must {
|
||||||
|
|
@ -21,14 +19,12 @@ class ImmutablePartialSpec extends TestKit with TypedAkkaSpecWithShutdown {
|
||||||
probe.ref ! Command2
|
probe.ref ! Command2
|
||||||
Behaviors.same
|
Behaviors.same
|
||||||
}
|
}
|
||||||
val testkit = BehaviorTestkit(behavior)
|
val actor = spawn(behavior)
|
||||||
|
|
||||||
testkit.run(Command1)
|
actor ! Command1
|
||||||
testkit.currentBehavior shouldBe behavior
|
|
||||||
probe.expectNoMessage()
|
probe.expectNoMessage()
|
||||||
|
|
||||||
testkit.run(Command2)
|
actor ! Command2
|
||||||
testkit.currentBehavior shouldBe behavior
|
|
||||||
probe.expectMsg(Command2)
|
probe.expectMsg(Command2)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,119 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
package docs.akka.typed
|
||||||
|
|
||||||
|
//#imports
|
||||||
|
import scala.concurrent.Await
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
import akka.actor.typed.ActorSystem
|
||||||
|
import akka.actor.typed.PostStop
|
||||||
|
import akka.actor.typed.scaladsl.Behaviors
|
||||||
|
import akka.event.LoggingAdapter
|
||||||
|
|
||||||
|
//#imports
|
||||||
|
|
||||||
|
import akka.actor.typed.TypedAkkaSpecWithShutdown
|
||||||
|
import akka.testkit.typed.TestKit
|
||||||
|
|
||||||
|
object GracefulStopDocSpec {
|
||||||
|
|
||||||
|
//#master-actor
|
||||||
|
|
||||||
|
object MasterControlProgramActor {
|
||||||
|
sealed trait JobControlLanguage
|
||||||
|
final case class SpawnJob(name: String) extends JobControlLanguage
|
||||||
|
final case object GracefulShutdown extends JobControlLanguage
|
||||||
|
|
||||||
|
// Predefined cleanup operation
|
||||||
|
def cleanup(log: LoggingAdapter): Unit = log.info("Cleaning up!")
|
||||||
|
|
||||||
|
val mcpa = Behaviors.immutable[JobControlLanguage] { (ctx, msg) ⇒
|
||||||
|
msg match {
|
||||||
|
case SpawnJob(jobName) ⇒
|
||||||
|
ctx.system.log.info("Spawning job {}!", jobName)
|
||||||
|
ctx.spawn(Job.job(jobName), name = jobName)
|
||||||
|
Behaviors.same
|
||||||
|
case GracefulShutdown ⇒
|
||||||
|
ctx.system.log.info("Initiating graceful shutdown...")
|
||||||
|
// perform graceful stop, executing cleanup before final system termination
|
||||||
|
// behavior executing cleanup is passed as a parameter to Actor.stopped
|
||||||
|
Behaviors.stopped {
|
||||||
|
Behaviors.onSignal {
|
||||||
|
case (context, PostStop) ⇒
|
||||||
|
cleanup(context.system.log)
|
||||||
|
Behaviors.same
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}.onSignal {
|
||||||
|
case (ctx, PostStop) ⇒
|
||||||
|
ctx.system.log.info("MCPA stopped")
|
||||||
|
Behaviors.same
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#master-actor
|
||||||
|
|
||||||
|
//#worker-actor
|
||||||
|
|
||||||
|
object Job {
|
||||||
|
import GracefulStopDocSpec.MasterControlProgramActor.JobControlLanguage
|
||||||
|
|
||||||
|
def job(name: String) = Behaviors.onSignal[JobControlLanguage] {
|
||||||
|
case (ctx, PostStop) ⇒
|
||||||
|
ctx.system.log.info("Worker {} stopped", name)
|
||||||
|
Behaviors.same
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#worker-actor
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
class GracefulStopDocSpec extends TestKit with TypedAkkaSpecWithShutdown {
|
||||||
|
|
||||||
|
import GracefulStopDocSpec._
|
||||||
|
|
||||||
|
"Graceful stop example" must {
|
||||||
|
|
||||||
|
"start some workers" in {
|
||||||
|
//#start-workers
|
||||||
|
import MasterControlProgramActor._
|
||||||
|
|
||||||
|
val system: ActorSystem[JobControlLanguage] = ActorSystem(mcpa, "B6700")
|
||||||
|
|
||||||
|
system ! SpawnJob("a")
|
||||||
|
system ! SpawnJob("b")
|
||||||
|
|
||||||
|
// sleep here to allow time for the new actors to be started
|
||||||
|
Thread.sleep(100)
|
||||||
|
|
||||||
|
// brutally stop the system
|
||||||
|
system.terminate()
|
||||||
|
|
||||||
|
Await.result(system.whenTerminated, 3.seconds)
|
||||||
|
//#start-workers
|
||||||
|
}
|
||||||
|
|
||||||
|
"gracefully stop workers and master" in {
|
||||||
|
//#graceful-shutdown
|
||||||
|
|
||||||
|
import MasterControlProgramActor._
|
||||||
|
|
||||||
|
val system: ActorSystem[JobControlLanguage] = ActorSystem(mcpa, "B7700")
|
||||||
|
|
||||||
|
system ! SpawnJob("a")
|
||||||
|
system ! SpawnJob("b")
|
||||||
|
|
||||||
|
Thread.sleep(100)
|
||||||
|
|
||||||
|
// gracefully stop the system
|
||||||
|
system ! GracefulShutdown
|
||||||
|
|
||||||
|
Thread.sleep(100)
|
||||||
|
|
||||||
|
Await.result(system.whenTerminated, 3.seconds)
|
||||||
|
//#graceful-shutdown
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
38
akka-docs/src/main/paradox/actor-lifecycle-typed.md
Normal file
38
akka-docs/src/main/paradox/actor-lifecycle-typed.md
Normal file
|
|
@ -0,0 +1,38 @@
|
||||||
|
# Actor lifecycle
|
||||||
|
|
||||||
|
TODO intro
|
||||||
|
|
||||||
|
## Creating Actors
|
||||||
|
|
||||||
|
TODO
|
||||||
|
|
||||||
|
## Stopping Actors
|
||||||
|
|
||||||
|
An actor can stop itself by returning `Behaviors.stopped` as the next behavior.
|
||||||
|
|
||||||
|
Child actors can be forced to be stopped after it finishes processing its current message by using the
|
||||||
|
`stop` method of the `ActorContext` from the parent actor. Only child actors can be stopped in that way.
|
||||||
|
|
||||||
|
The child actors will be stopped as part of the shutdown procedure of the parent.
|
||||||
|
|
||||||
|
The `PostStop` signal that results from stopping an actor can be used for cleaning up resources. Note that
|
||||||
|
a behavior that handles such `PostStop` signal can optionally be defined as a parameter to `Behaviors.stopped`
|
||||||
|
if different actions is needed when the actor gracefully stops itself from when it is stopped abruptly.
|
||||||
|
|
||||||
|
Here is an illustrating example:
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [IntroSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/docs/akka/typed/GracefulStopDocSpec.scala) {
|
||||||
|
#imports
|
||||||
|
#master-actor
|
||||||
|
#worker-actor
|
||||||
|
#graceful-shutdown
|
||||||
|
}
|
||||||
|
|
||||||
|
Java
|
||||||
|
: @@snip [IntroSpec.scala]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/GracefulStopDocTest.java) {
|
||||||
|
#imports
|
||||||
|
#master-actor
|
||||||
|
#worker-actor
|
||||||
|
#graceful-shutdown
|
||||||
|
}
|
||||||
|
|
@ -6,6 +6,7 @@
|
||||||
|
|
||||||
* [actors](actors-typed.md)
|
* [actors](actors-typed.md)
|
||||||
* [coexisting](coexisting.md)
|
* [coexisting](coexisting.md)
|
||||||
|
* [actor-lifecycle](actor-lifecycle-typed.md)
|
||||||
* [fault-tolerance](fault-tolerance-typed.md)
|
* [fault-tolerance](fault-tolerance-typed.md)
|
||||||
* [actor-discovery](actor-discovery-typed.md)
|
* [actor-discovery](actor-discovery-typed.md)
|
||||||
* [cluster](cluster-typed.md)
|
* [cluster](cluster-typed.md)
|
||||||
|
|
@ -13,4 +14,4 @@
|
||||||
* [persistence](persistence-typed.md)
|
* [persistence](persistence-typed.md)
|
||||||
* [testing](testing-typed.md)
|
* [testing](testing-typed.md)
|
||||||
|
|
||||||
@@@
|
@@@
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue