Document how to schedule periodic messages from an actor to itself. #2513
This commit is contained in:
parent
89c1f66b1f
commit
99ad1e0eeb
5 changed files with 352 additions and 2 deletions
|
|
@ -0,0 +1,9 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package docs.pattern
|
||||
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
|
||||
class SchedulerPatternTest extends SchedulerPatternTestBase with JUnitSuite
|
||||
|
|
@ -0,0 +1,189 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package docs.pattern;
|
||||
|
||||
import akka.actor.*;
|
||||
import akka.testkit.*;
|
||||
import akka.testkit.TestEvent.Mute;
|
||||
import akka.testkit.TestEvent.UnMute;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import scala.concurrent.util.Duration;
|
||||
import scala.concurrent.util.FiniteDuration;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class SchedulerPatternTestBase {
|
||||
|
||||
ActorSystem system;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
system = ActorSystem.create("SchedulerPatternTest", AkkaSpec.testConf());
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
system.shutdown();
|
||||
}
|
||||
|
||||
static
|
||||
//#schedule-constructor
|
||||
public class ScheduleInConstructor extends UntypedActor {
|
||||
|
||||
private final Cancellable tick = getContext().system().scheduler().schedule(
|
||||
Duration.create(500, TimeUnit.MILLISECONDS),
|
||||
Duration.create(1000, TimeUnit.MILLISECONDS),
|
||||
getSelf(), "tick", getContext().system().dispatcher());
|
||||
//#schedule-constructor
|
||||
ActorRef target;
|
||||
public ScheduleInConstructor(ActorRef target) {
|
||||
this.target = target;
|
||||
}
|
||||
//#schedule-constructor
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
tick.cancel();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) throws Exception {
|
||||
if (message.equals("tick")) {
|
||||
// do something useful here
|
||||
//#schedule-constructor
|
||||
target.tell(message, getSelf());
|
||||
//#schedule-constructor
|
||||
}
|
||||
//#schedule-constructor
|
||||
else if (message.equals("restart")) {
|
||||
throw new ArithmeticException();
|
||||
}
|
||||
//#schedule-constructor
|
||||
else {
|
||||
unhandled(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
//#schedule-constructor
|
||||
|
||||
static
|
||||
//#schedule-receive
|
||||
public class ScheduleInReceive extends UntypedActor {
|
||||
//#schedule-receive
|
||||
ActorRef target;
|
||||
public ScheduleInReceive(ActorRef target) {
|
||||
this.target = target;
|
||||
}
|
||||
//#schedule-receive
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
getContext().system().scheduler().scheduleOnce(
|
||||
Duration.create(500, TimeUnit.MILLISECONDS),
|
||||
getSelf(), "tick", getContext().system().dispatcher());
|
||||
}
|
||||
|
||||
// override postRestart so we don't call preStart and schedule a new message
|
||||
@Override
|
||||
public void postRestart(Throwable reason) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onReceive(Object message) throws Exception {
|
||||
if (message.equals("tick")) {
|
||||
// send another periodic tick after the specified delay
|
||||
getContext().system().scheduler().scheduleOnce(
|
||||
Duration.create(1000, TimeUnit.MILLISECONDS),
|
||||
getSelf(), "tick", getContext().system().dispatcher());
|
||||
// do something useful here
|
||||
//#schedule-receive
|
||||
target.tell(message, getSelf());
|
||||
//#schedule-receive
|
||||
}
|
||||
//#schedule-receive
|
||||
else if (message.equals("restart")) {
|
||||
throw new ArithmeticException();
|
||||
}
|
||||
//#schedule-receive
|
||||
else {
|
||||
unhandled(message);
|
||||
}
|
||||
}
|
||||
}
|
||||
//#schedule-receive
|
||||
|
||||
@Test
|
||||
@Ignore // no way to tag this as timing sensitive
|
||||
public void scheduleInConstructor() {
|
||||
new TestSchedule(system) {{
|
||||
final JavaTestKit probe = new JavaTestKit(system);
|
||||
|
||||
final Props props = new Props(new UntypedActorFactory() {
|
||||
public UntypedActor create() {
|
||||
return new ScheduleInConstructor(probe.getRef());
|
||||
}
|
||||
});
|
||||
|
||||
testSchedule(probe, props, duration("3000 millis"), duration("2000 millis"));
|
||||
}};
|
||||
}
|
||||
|
||||
@Test
|
||||
@Ignore // no way to tag this as timing sensitive
|
||||
public void scheduleInReceive() {
|
||||
|
||||
new TestSchedule(system) {{
|
||||
final JavaTestKit probe = new JavaTestKit(system);
|
||||
|
||||
final Props props = new Props(new UntypedActorFactory() {
|
||||
public UntypedActor create() {
|
||||
return new ScheduleInReceive(probe.getRef());
|
||||
}
|
||||
});
|
||||
|
||||
testSchedule(probe, props, duration("3000 millis"), duration("2500 millis"));
|
||||
}};
|
||||
}
|
||||
|
||||
public static class TestSchedule extends JavaTestKit {
|
||||
private ActorSystem system;
|
||||
|
||||
public TestSchedule(ActorSystem system) {
|
||||
super(system);
|
||||
this.system = system;
|
||||
}
|
||||
|
||||
public void testSchedule(final JavaTestKit probe, Props props,
|
||||
FiniteDuration startDuration,
|
||||
FiniteDuration afterRestartDuration) {
|
||||
Iterable<akka.testkit.EventFilter> filter =
|
||||
Arrays.asList(new akka.testkit.EventFilter[]{
|
||||
(akka.testkit.EventFilter) new ErrorFilter(ArithmeticException.class)});
|
||||
system.eventStream().publish(new Mute(filter));
|
||||
|
||||
final ActorRef actor = system.actorOf(props);
|
||||
new Within(startDuration) {
|
||||
protected void run() {
|
||||
probe.expectMsgEquals("tick");
|
||||
probe.expectMsgEquals("tick");
|
||||
probe.expectMsgEquals("tick");
|
||||
}
|
||||
};
|
||||
actor.tell("restart", getRef());
|
||||
new Within(afterRestartDuration) {
|
||||
protected void run() {
|
||||
probe.expectMsgEquals("tick");
|
||||
probe.expectMsgEquals("tick");
|
||||
}
|
||||
};
|
||||
system.stop(actor);
|
||||
|
||||
system.eventStream().publish(new UnMute(filter));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -17,6 +17,34 @@ sense to add to the ``akka.pattern`` package for creating an `OTP-like library
|
|||
You might find some of the patterns described in the Scala chapter of
|
||||
:ref:`howto-scala` useful even though the example code is written in Scala.
|
||||
|
||||
Scheduling Periodic Messages
|
||||
============================
|
||||
|
||||
This pattern describes how to schedule periodic messages to yourself in two different
|
||||
ways.
|
||||
|
||||
The first way is to set up periodic message scheduling in the constructor of the actor,
|
||||
and cancel that scheduled sending in ``postStop`` or else we might have multiple registered
|
||||
message sends to the same actor.
|
||||
|
||||
.. note::
|
||||
|
||||
With this approach the scheduler will be restarted with the actor on restarts.
|
||||
|
||||
.. includecode:: code/docs/pattern/SchedulerPatternTestBase.java#schedule-constructor
|
||||
|
||||
The second variant sets up an initial one shot message send in the ``preStart`` method
|
||||
of the actor, and the then the actor when it receives this message sets up a new one shot
|
||||
message send. You also have to override ``postRestart`` so we don't call ``preStart``
|
||||
and schedule the initial message send again.
|
||||
|
||||
.. note::
|
||||
|
||||
With this approach we won't fill up the mailbox with tick messages if the actor is
|
||||
under pressure, but only schedule a new tick message when we have seen the previous one.
|
||||
|
||||
.. includecode:: code/docs/pattern/SchedulerPatternTestBase.java#schedule-receive
|
||||
|
||||
Template Pattern
|
||||
================
|
||||
|
||||
|
|
@ -33,4 +61,3 @@ This is an especially nice pattern, since it does even come with some empty exam
|
|||
Spread the word: this is the easiest way to get famous!
|
||||
|
||||
Please keep this pattern at the end of this file.
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue