Merge pull request #780 from akka/wip-2513-pattern-schedule-tick-to-self-ban
Document how to schedule periodic messages from an actor to itself. #2513
This commit is contained in:
commit
4a4465e439
4 changed files with 352 additions and 2 deletions
191
akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.java
Normal file
191
akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.java
Normal file
|
|
@ -0,0 +1,191 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package docs.pattern;
|
||||||
|
|
||||||
|
import akka.actor.*;
|
||||||
|
import akka.testkit.*;
|
||||||
|
import akka.testkit.TestEvent.Mute;
|
||||||
|
import akka.testkit.TestEvent.UnMute;
|
||||||
|
import org.junit.*;
|
||||||
|
import scala.concurrent.util.Duration;
|
||||||
|
import scala.concurrent.util.FiniteDuration;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
public class SchedulerPatternTest {
|
||||||
|
|
||||||
|
static ActorSystem system;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() {
|
||||||
|
system = ActorSystem.create("SchedulerPatternTest", AkkaSpec.testConf());
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() {
|
||||||
|
system.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
static
|
||||||
|
//#schedule-constructor
|
||||||
|
public class ScheduleInConstructor extends UntypedActor {
|
||||||
|
|
||||||
|
private final Cancellable tick = getContext().system().scheduler().schedule(
|
||||||
|
Duration.create(500, TimeUnit.MILLISECONDS),
|
||||||
|
Duration.create(1000, TimeUnit.MILLISECONDS),
|
||||||
|
getSelf(), "tick", getContext().dispatcher());
|
||||||
|
//#schedule-constructor
|
||||||
|
// this variable and constructor is declared here to not show up in the docs
|
||||||
|
final ActorRef target;
|
||||||
|
public ScheduleInConstructor(ActorRef target) {
|
||||||
|
this.target = target;
|
||||||
|
}
|
||||||
|
//#schedule-constructor
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void postStop() {
|
||||||
|
tick.cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onReceive(Object message) throws Exception {
|
||||||
|
if (message.equals("tick")) {
|
||||||
|
// do something useful here
|
||||||
|
//#schedule-constructor
|
||||||
|
target.tell(message, getSelf());
|
||||||
|
//#schedule-constructor
|
||||||
|
}
|
||||||
|
//#schedule-constructor
|
||||||
|
else if (message.equals("restart")) {
|
||||||
|
throw new ArithmeticException();
|
||||||
|
}
|
||||||
|
//#schedule-constructor
|
||||||
|
else {
|
||||||
|
unhandled(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#schedule-constructor
|
||||||
|
|
||||||
|
static
|
||||||
|
//#schedule-receive
|
||||||
|
public class ScheduleInReceive extends UntypedActor {
|
||||||
|
//#schedule-receive
|
||||||
|
// this variable and constructor is declared here to not show up in the docs
|
||||||
|
final ActorRef target;
|
||||||
|
public ScheduleInReceive(ActorRef target) {
|
||||||
|
this.target = target;
|
||||||
|
}
|
||||||
|
//#schedule-receive
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preStart() {
|
||||||
|
getContext().system().scheduler().scheduleOnce(
|
||||||
|
Duration.create(500, TimeUnit.MILLISECONDS),
|
||||||
|
getSelf(), "tick", getContext().dispatcher());
|
||||||
|
}
|
||||||
|
|
||||||
|
// override postRestart so we don't call preStart and schedule a new message
|
||||||
|
@Override
|
||||||
|
public void postRestart(Throwable reason) {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onReceive(Object message) throws Exception {
|
||||||
|
if (message.equals("tick")) {
|
||||||
|
// send another periodic tick after the specified delay
|
||||||
|
getContext().system().scheduler().scheduleOnce(
|
||||||
|
Duration.create(1000, TimeUnit.MILLISECONDS),
|
||||||
|
getSelf(), "tick", getContext().dispatcher());
|
||||||
|
// do something useful here
|
||||||
|
//#schedule-receive
|
||||||
|
target.tell(message, getSelf());
|
||||||
|
//#schedule-receive
|
||||||
|
}
|
||||||
|
//#schedule-receive
|
||||||
|
else if (message.equals("restart")) {
|
||||||
|
throw new ArithmeticException();
|
||||||
|
}
|
||||||
|
//#schedule-receive
|
||||||
|
else {
|
||||||
|
unhandled(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#schedule-receive
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@Ignore // no way to tag this as timing sensitive
|
||||||
|
public void scheduleInConstructor() {
|
||||||
|
new TestSchedule(system) {{
|
||||||
|
final JavaTestKit probe = new JavaTestKit(system);
|
||||||
|
|
||||||
|
final Props props = new Props(new UntypedActorFactory() {
|
||||||
|
public UntypedActor create() {
|
||||||
|
return new ScheduleInConstructor(probe.getRef());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
testSchedule(probe, props, duration("3000 millis"), duration("2000 millis"));
|
||||||
|
}};
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@Ignore // no way to tag this as timing sensitive
|
||||||
|
public void scheduleInReceive() {
|
||||||
|
|
||||||
|
new TestSchedule(system) {{
|
||||||
|
final JavaTestKit probe = new JavaTestKit(system);
|
||||||
|
|
||||||
|
final Props props = new Props(new UntypedActorFactory() {
|
||||||
|
public UntypedActor create() {
|
||||||
|
return new ScheduleInReceive(probe.getRef());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
testSchedule(probe, props, duration("3000 millis"), duration("2500 millis"));
|
||||||
|
}};
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class TestSchedule extends JavaTestKit {
|
||||||
|
private ActorSystem system;
|
||||||
|
|
||||||
|
public TestSchedule(ActorSystem system) {
|
||||||
|
super(system);
|
||||||
|
this.system = system;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSchedule(final JavaTestKit probe, Props props,
|
||||||
|
FiniteDuration startDuration,
|
||||||
|
FiniteDuration afterRestartDuration) {
|
||||||
|
Iterable<akka.testkit.EventFilter> filter =
|
||||||
|
Arrays.asList(new akka.testkit.EventFilter[]{
|
||||||
|
(akka.testkit.EventFilter) new ErrorFilter(ArithmeticException.class)});
|
||||||
|
try {
|
||||||
|
system.eventStream().publish(new Mute(filter));
|
||||||
|
|
||||||
|
final ActorRef actor = system.actorOf(props);
|
||||||
|
new Within(startDuration) {
|
||||||
|
protected void run() {
|
||||||
|
probe.expectMsgEquals("tick");
|
||||||
|
probe.expectMsgEquals("tick");
|
||||||
|
probe.expectMsgEquals("tick");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
actor.tell("restart", getRef());
|
||||||
|
new Within(afterRestartDuration) {
|
||||||
|
protected void run() {
|
||||||
|
probe.expectMsgEquals("tick");
|
||||||
|
probe.expectMsgEquals("tick");
|
||||||
|
}
|
||||||
|
};
|
||||||
|
system.stop(actor);
|
||||||
|
}
|
||||||
|
finally {
|
||||||
|
system.eventStream().publish(new UnMute(filter));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -17,6 +17,37 @@ sense to add to the ``akka.pattern`` package for creating an `OTP-like library
|
||||||
You might find some of the patterns described in the Scala chapter of
|
You might find some of the patterns described in the Scala chapter of
|
||||||
:ref:`howto-scala` useful even though the example code is written in Scala.
|
:ref:`howto-scala` useful even though the example code is written in Scala.
|
||||||
|
|
||||||
|
Scheduling Periodic Messages
|
||||||
|
============================
|
||||||
|
|
||||||
|
This pattern describes how to schedule periodic messages to yourself in two different
|
||||||
|
ways.
|
||||||
|
|
||||||
|
The first way is to set up periodic message scheduling in the constructor of the actor,
|
||||||
|
and cancel that scheduled sending in ``postStop`` or else we might have multiple registered
|
||||||
|
message sends to the same actor.
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
|
||||||
|
With this approach the scheduled periodic message send will be restarted with the actor on restarts.
|
||||||
|
This also means that the time period that elapses between two tick messages during a restart may drift
|
||||||
|
off based on when you restart the scheduled message sends relative to the time that the last message was
|
||||||
|
sent, and how long the initial delay is. Worst case scenario is ``interval`` plus ``initialDelay``.
|
||||||
|
|
||||||
|
.. includecode:: code/docs/pattern/SchedulerPatternTest.java#schedule-constructor
|
||||||
|
|
||||||
|
The second variant sets up an initial one shot message send in the ``preStart`` method
|
||||||
|
of the actor, and the then the actor when it receives this message sets up a new one shot
|
||||||
|
message send. You also have to override ``postRestart`` so we don't call ``preStart``
|
||||||
|
and schedule the initial message send again.
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
|
||||||
|
With this approach we won't fill up the mailbox with tick messages if the actor is
|
||||||
|
under pressure, but only schedule a new tick message when we have seen the previous one.
|
||||||
|
|
||||||
|
.. includecode:: code/docs/pattern/SchedulerPatternTest.java#schedule-receive
|
||||||
|
|
||||||
Template Pattern
|
Template Pattern
|
||||||
================
|
================
|
||||||
|
|
||||||
|
|
@ -33,4 +64,3 @@ This is an especially nice pattern, since it does even come with some empty exam
|
||||||
Spread the word: this is the easiest way to get famous!
|
Spread the word: this is the easiest way to get famous!
|
||||||
|
|
||||||
Please keep this pattern at the end of this file.
|
Please keep this pattern at the end of this file.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,99 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package docs.pattern
|
||||||
|
|
||||||
|
import language.postfixOps
|
||||||
|
|
||||||
|
import akka.actor.{ Props, ActorRef, Actor }
|
||||||
|
import scala.concurrent.util.duration._
|
||||||
|
import scala.concurrent.util.{ FiniteDuration, Duration }
|
||||||
|
import akka.testkit.{ TimingTest, AkkaSpec, filterException }
|
||||||
|
import docs.pattern.SchedulerPatternSpec.ScheduleInConstructor
|
||||||
|
|
||||||
|
object SchedulerPatternSpec {
|
||||||
|
//#schedule-constructor
|
||||||
|
class ScheduleInConstructor extends Actor {
|
||||||
|
val tick =
|
||||||
|
context.system.scheduler.schedule(500 millis, 1000 millis, self, "tick")
|
||||||
|
//#schedule-constructor
|
||||||
|
// this var and constructor is declared here to not show up in the docs
|
||||||
|
var target: ActorRef = null
|
||||||
|
def this(target: ActorRef) = { this(); this.target = target }
|
||||||
|
//#schedule-constructor
|
||||||
|
|
||||||
|
override def postStop() = tick.cancel()
|
||||||
|
|
||||||
|
def receive = {
|
||||||
|
case "tick" ⇒
|
||||||
|
// do something useful here
|
||||||
|
//#schedule-constructor
|
||||||
|
target ! "tick"
|
||||||
|
case "restart" ⇒
|
||||||
|
throw new ArithmeticException
|
||||||
|
//#schedule-constructor
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#schedule-constructor
|
||||||
|
|
||||||
|
//#schedule-receive
|
||||||
|
class ScheduleInReceive extends Actor {
|
||||||
|
import context._
|
||||||
|
//#schedule-receive
|
||||||
|
// this var and constructor is declared here to not show up in the docs
|
||||||
|
var target: ActorRef = null
|
||||||
|
def this(target: ActorRef) = { this(); this.target = target }
|
||||||
|
//#schedule-receive
|
||||||
|
|
||||||
|
override def preStart() =
|
||||||
|
system.scheduler.scheduleOnce(500 millis, self, "tick")
|
||||||
|
|
||||||
|
// override postRestart so we don't call preStart and schedule a new message
|
||||||
|
override def postRestart(reason: Throwable) = {}
|
||||||
|
|
||||||
|
def receive = {
|
||||||
|
case "tick" ⇒
|
||||||
|
// send another periodic tick after the specified delay
|
||||||
|
system.scheduler.scheduleOnce(1000 millis, self, "tick")
|
||||||
|
// do something useful here
|
||||||
|
//#schedule-receive
|
||||||
|
target ! "tick"
|
||||||
|
case "restart" ⇒
|
||||||
|
throw new ArithmeticException
|
||||||
|
//#schedule-receive
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#schedule-receive
|
||||||
|
}
|
||||||
|
|
||||||
|
class SchedulerPatternSpec extends AkkaSpec {
|
||||||
|
|
||||||
|
def testSchedule(actor: ActorRef, startDuration: FiniteDuration,
|
||||||
|
afterRestartDuration: FiniteDuration) = {
|
||||||
|
|
||||||
|
filterException[ArithmeticException] {
|
||||||
|
within(startDuration) {
|
||||||
|
expectMsg("tick")
|
||||||
|
expectMsg("tick")
|
||||||
|
expectMsg("tick")
|
||||||
|
}
|
||||||
|
actor ! "restart"
|
||||||
|
within(afterRestartDuration) {
|
||||||
|
expectMsg("tick")
|
||||||
|
expectMsg("tick")
|
||||||
|
}
|
||||||
|
system.stop(actor)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"send periodic ticks from the constructor" taggedAs TimingTest in {
|
||||||
|
testSchedule(system.actorOf(Props(new ScheduleInConstructor(testActor))),
|
||||||
|
3000 millis, 2000 millis)
|
||||||
|
}
|
||||||
|
|
||||||
|
"send ticks from the preStart and receive" taggedAs TimingTest in {
|
||||||
|
testSchedule(system.actorOf(Props(new ScheduleInConstructor(testActor))),
|
||||||
|
3000 millis, 2500 millis)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -111,6 +111,37 @@ This is where the Spider pattern comes in."
|
||||||
|
|
||||||
The pattern is described `Discovering Message Flows in Actor System with the Spider Pattern <http://letitcrash.com/post/30585282971/discovering-message-flows-in-actor-systems-with-the>`_.
|
The pattern is described `Discovering Message Flows in Actor System with the Spider Pattern <http://letitcrash.com/post/30585282971/discovering-message-flows-in-actor-systems-with-the>`_.
|
||||||
|
|
||||||
|
Scheduling Periodic Messages
|
||||||
|
============================
|
||||||
|
|
||||||
|
This pattern describes how to schedule periodic messages to yourself in two different
|
||||||
|
ways.
|
||||||
|
|
||||||
|
The first way is to set up periodic message scheduling in the constructor of the actor,
|
||||||
|
and cancel that scheduled sending in ``postStop`` or else we might have multiple registered
|
||||||
|
message sends to the same actor.
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
|
||||||
|
With this approach the scheduled periodic message send will be restarted with the actor on restarts.
|
||||||
|
This also means that the time period that elapses between two tick messages during a restart may drift
|
||||||
|
off based on when you restart the scheduled message sends relative to the time that the last message was
|
||||||
|
sent, and how long the initial delay is. Worst case scenario is ``interval`` plus ``initialDelay``.
|
||||||
|
|
||||||
|
.. includecode:: code/docs/pattern/SchedulerPatternSpec.scala#schedule-constructor
|
||||||
|
|
||||||
|
The second variant sets up an initial one shot message send in the ``preStart`` method
|
||||||
|
of the actor, and the then the actor when it receives this message sets up a new one shot
|
||||||
|
message send. You also have to override ``postRestart`` so we don't call ``preStart``
|
||||||
|
and schedule the initial message send again.
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
|
||||||
|
With this approach we won't fill up the mailbox with tick messages if the actor is
|
||||||
|
under pressure, but only schedule a new tick message when we have seen the previous one.
|
||||||
|
|
||||||
|
.. includecode:: code/docs/pattern/SchedulerPatternSpec.scala#schedule-receive
|
||||||
|
|
||||||
Template Pattern
|
Template Pattern
|
||||||
================
|
================
|
||||||
|
|
||||||
|
|
@ -127,4 +158,3 @@ This is an especially nice pattern, since it does even come with some empty exam
|
||||||
Spread the word: this is the easiest way to get famous!
|
Spread the word: this is the easiest way to get famous!
|
||||||
|
|
||||||
Please keep this pattern at the end of this file.
|
Please keep this pattern at the end of this file.
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue