diff --git a/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.scala b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.scala new file mode 100644 index 0000000000..d450bbc090 --- /dev/null +++ b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.scala @@ -0,0 +1,9 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package docs.pattern + +import org.scalatest.junit.JUnitSuite + +class SchedulerPatternTest extends SchedulerPatternTestBase with JUnitSuite diff --git a/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTestBase.java b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTestBase.java new file mode 100644 index 0000000000..b2543bfb19 --- /dev/null +++ b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTestBase.java @@ -0,0 +1,189 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package docs.pattern; + +import akka.actor.*; +import akka.testkit.*; +import akka.testkit.TestEvent.Mute; +import akka.testkit.TestEvent.UnMute; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import scala.concurrent.util.Duration; +import scala.concurrent.util.FiniteDuration; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +public class SchedulerPatternTestBase { + + ActorSystem system; + + @Before + public void setUp() { + system = ActorSystem.create("SchedulerPatternTest", AkkaSpec.testConf()); + } + + @After + public void tearDown() { + system.shutdown(); + } + + static + //#schedule-constructor + public class ScheduleInConstructor extends UntypedActor { + + private final Cancellable tick = getContext().system().scheduler().schedule( + Duration.create(500, TimeUnit.MILLISECONDS), + Duration.create(1000, TimeUnit.MILLISECONDS), + getSelf(), "tick", getContext().system().dispatcher()); + //#schedule-constructor + ActorRef target; + public ScheduleInConstructor(ActorRef target) { + this.target = target; + } + //#schedule-constructor + + @Override + public void postStop() { + tick.cancel(); + } + + @Override + public void onReceive(Object message) throws Exception { + if (message.equals("tick")) { + // do something useful here + //#schedule-constructor + target.tell(message, getSelf()); + //#schedule-constructor + } + //#schedule-constructor + else if (message.equals("restart")) { + throw new ArithmeticException(); + } + //#schedule-constructor + else { + unhandled(message); + } + } + } + //#schedule-constructor + + static + //#schedule-receive + public class ScheduleInReceive extends UntypedActor { + //#schedule-receive + ActorRef target; + public ScheduleInReceive(ActorRef target) { + this.target = target; + } + //#schedule-receive + + @Override + public void preStart() { + getContext().system().scheduler().scheduleOnce( + Duration.create(500, TimeUnit.MILLISECONDS), + getSelf(), "tick", getContext().system().dispatcher()); + } + + // override postRestart so we don't call preStart and schedule a new message + @Override + public void postRestart(Throwable reason) { + } + + @Override + public void onReceive(Object message) throws Exception { + if (message.equals("tick")) { + // send another periodic tick after the specified delay + getContext().system().scheduler().scheduleOnce( + Duration.create(1000, TimeUnit.MILLISECONDS), + getSelf(), "tick", getContext().system().dispatcher()); + // do something useful here + //#schedule-receive + target.tell(message, getSelf()); + //#schedule-receive + } + //#schedule-receive + else if (message.equals("restart")) { + throw new ArithmeticException(); + } + //#schedule-receive + else { + unhandled(message); + } + } + } + //#schedule-receive + + @Test + @Ignore // no way to tag this as timing sensitive + public void scheduleInConstructor() { + new TestSchedule(system) {{ + final JavaTestKit probe = new JavaTestKit(system); + + final Props props = new Props(new UntypedActorFactory() { + public UntypedActor create() { + return new ScheduleInConstructor(probe.getRef()); + } + }); + + testSchedule(probe, props, duration("3000 millis"), duration("2000 millis")); + }}; + } + + @Test + @Ignore // no way to tag this as timing sensitive + public void scheduleInReceive() { + + new TestSchedule(system) {{ + final JavaTestKit probe = new JavaTestKit(system); + + final Props props = new Props(new UntypedActorFactory() { + public UntypedActor create() { + return new ScheduleInReceive(probe.getRef()); + } + }); + + testSchedule(probe, props, duration("3000 millis"), duration("2500 millis")); + }}; + } + + public static class TestSchedule extends JavaTestKit { + private ActorSystem system; + + public TestSchedule(ActorSystem system) { + super(system); + this.system = system; + } + + public void testSchedule(final JavaTestKit probe, Props props, + FiniteDuration startDuration, + FiniteDuration afterRestartDuration) { + Iterable filter = + Arrays.asList(new akka.testkit.EventFilter[]{ + (akka.testkit.EventFilter) new ErrorFilter(ArithmeticException.class)}); + system.eventStream().publish(new Mute(filter)); + + final ActorRef actor = system.actorOf(props); + new Within(startDuration) { + protected void run() { + probe.expectMsgEquals("tick"); + probe.expectMsgEquals("tick"); + probe.expectMsgEquals("tick"); + } + }; + actor.tell("restart", getRef()); + new Within(afterRestartDuration) { + protected void run() { + probe.expectMsgEquals("tick"); + probe.expectMsgEquals("tick"); + } + }; + system.stop(actor); + + system.eventStream().publish(new UnMute(filter)); + } + } +} diff --git a/akka-docs/rst/java/howto.rst b/akka-docs/rst/java/howto.rst index 204d50e7dc..e1f9f1610b 100644 --- a/akka-docs/rst/java/howto.rst +++ b/akka-docs/rst/java/howto.rst @@ -17,6 +17,34 @@ sense to add to the ``akka.pattern`` package for creating an `OTP-like library You might find some of the patterns described in the Scala chapter of :ref:`howto-scala` useful even though the example code is written in Scala. +Scheduling Periodic Messages +============================ + +This pattern describes how to schedule periodic messages to yourself in two different +ways. + +The first way is to set up periodic message scheduling in the constructor of the actor, +and cancel that scheduled sending in ``postStop`` or else we might have multiple registered +message sends to the same actor. + +.. note:: + + With this approach the scheduler will be restarted with the actor on restarts. + +.. includecode:: code/docs/pattern/SchedulerPatternTestBase.java#schedule-constructor + +The second variant sets up an initial one shot message send in the ``preStart`` method +of the actor, and the then the actor when it receives this message sets up a new one shot +message send. You also have to override ``postRestart`` so we don't call ``preStart`` +and schedule the initial message send again. + +.. note:: + + With this approach we won't fill up the mailbox with tick messages if the actor is + under pressure, but only schedule a new tick message when we have seen the previous one. + +.. includecode:: code/docs/pattern/SchedulerPatternTestBase.java#schedule-receive + Template Pattern ================ @@ -33,4 +61,3 @@ This is an especially nice pattern, since it does even come with some empty exam Spread the word: this is the easiest way to get famous! Please keep this pattern at the end of this file. - diff --git a/akka-docs/rst/scala/code/docs/pattern/SchedulerPatternSpec.scala b/akka-docs/rst/scala/code/docs/pattern/SchedulerPatternSpec.scala new file mode 100644 index 0000000000..e79d94b2c5 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/pattern/SchedulerPatternSpec.scala @@ -0,0 +1,98 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package docs.pattern + +import language.postfixOps + +import akka.actor.{ Props, ActorRef, Actor } +import scala.concurrent.util.duration._ +import scala.concurrent.util.{ FiniteDuration, Duration } +import akka.testkit.{ TimingTest, AkkaSpec, filterException } +import docs.pattern.SchedulerPatternSpec.ScheduleInConstructor + +object SchedulerPatternSpec { + //#schedule-constructor + class ScheduleInConstructor extends Actor { + import context._ + + val tick = system.scheduler.schedule(500 millis, 1000 millis, self, "tick") + //#schedule-constructor + var target: ActorRef = null + def this(target: ActorRef) = { this(); this.target = target } + //#schedule-constructor + + override def postStop() = tick.cancel() + + def receive = { + case "tick" ⇒ + // do something useful here + //#schedule-constructor + target ! "tick" + case "restart" ⇒ + throw new ArithmeticException + //#schedule-constructor + } + } + //#schedule-constructor + + //#schedule-receive + class ScheduleInReceive extends Actor { + import context._ + //#schedule-receive + var target: ActorRef = null + def this(target: ActorRef) = { this(); this.target = target } + //#schedule-receive + + override def preStart() = + system.scheduler.scheduleOnce(500 millis, self, "tick") + + // override postRestart so we don't call preStart and schedule a new message + override def postRestart(reason: Throwable) = {} + + def receive = { + case "tick" ⇒ + // send another periodic tick after the specified delay + system.scheduler.scheduleOnce(1000 millis, self, "tick") + // do something useful here + //#schedule-receive + target ! "tick" + case "restart" ⇒ + throw new ArithmeticException + //#schedule-receive + } + } + //#schedule-receive +} + +class SchedulerPatternSpec extends AkkaSpec { + + def testSchedule(actor: ActorRef, startDuration: FiniteDuration, + afterRestartDuration: FiniteDuration) = { + + filterException[ArithmeticException] { + within(startDuration) { + expectMsg("tick") + expectMsg("tick") + expectMsg("tick") + } + actor ! "restart" + within(afterRestartDuration) { + expectMsg("tick") + expectMsg("tick") + } + system.stop(actor) + } + } + + "send periodic ticks from the constructor" taggedAs TimingTest in { + testSchedule(system.actorOf(Props(new ScheduleInConstructor(testActor))), + 3000 millis, 2000 millis) + } + + "send ticks from the preStart and receive" taggedAs TimingTest in { + testSchedule(system.actorOf(Props(new ScheduleInConstructor(testActor))), + 3000 millis, 2500 millis) + } +} diff --git a/akka-docs/rst/scala/howto.rst b/akka-docs/rst/scala/howto.rst index 7d064e2491..c5203adb1c 100644 --- a/akka-docs/rst/scala/howto.rst +++ b/akka-docs/rst/scala/howto.rst @@ -111,6 +111,34 @@ This is where the Spider pattern comes in." The pattern is described `Discovering Message Flows in Actor System with the Spider Pattern `_. +Scheduling Periodic Messages +============================ + +This pattern describes how to schedule periodic messages to yourself in two different +ways. + +The first way is to set up periodic message scheduling in the constructor of the actor, +and cancel that scheduled sending in ``postStop`` or else we might have multiple registered +message sends to the same actor. + +.. note:: + + With this approach the scheduler will be restarted with the actor on restarts. + +.. includecode:: code/docs/pattern/SchedulerPatternSpec.scala#schedule-constructor + +The second variant sets up an initial one shot message send in the ``preStart`` method +of the actor, and the then the actor when it receives this message sets up a new one shot +message send. You also have to override ``postRestart`` so we don't call ``preStart`` +and schedule the initial message send again. + +.. note:: + + With this approach we won't fill up the mailbox with tick messages if the actor is + under pressure, but only schedule a new tick message when we have seen the previous one. + +.. includecode:: code/docs/pattern/SchedulerPatternSpec.scala#schedule-receive + Template Pattern ================ @@ -127,4 +155,3 @@ This is an especially nice pattern, since it does even come with some empty exam Spread the word: this is the easiest way to get famous! Please keep this pattern at the end of this file. -