From 99ad1e0eeb545ab87f3e2bc876b169bd5e4da9f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Fri, 5 Oct 2012 15:15:17 +0200 Subject: [PATCH 1/2] Document how to schedule periodic messages from an actor to itself. #2513 --- .../docs/pattern/SchedulerPatternTest.scala | 9 + .../pattern/SchedulerPatternTestBase.java | 189 ++++++++++++++++++ akka-docs/rst/java/howto.rst | 29 ++- .../docs/pattern/SchedulerPatternSpec.scala | 98 +++++++++ akka-docs/rst/scala/howto.rst | 29 ++- 5 files changed, 352 insertions(+), 2 deletions(-) create mode 100644 akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.scala create mode 100644 akka-docs/rst/java/code/docs/pattern/SchedulerPatternTestBase.java create mode 100644 akka-docs/rst/scala/code/docs/pattern/SchedulerPatternSpec.scala diff --git a/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.scala b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.scala new file mode 100644 index 0000000000..d450bbc090 --- /dev/null +++ b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.scala @@ -0,0 +1,9 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package docs.pattern + +import org.scalatest.junit.JUnitSuite + +class SchedulerPatternTest extends SchedulerPatternTestBase with JUnitSuite diff --git a/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTestBase.java b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTestBase.java new file mode 100644 index 0000000000..b2543bfb19 --- /dev/null +++ b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTestBase.java @@ -0,0 +1,189 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package docs.pattern; + +import akka.actor.*; +import akka.testkit.*; +import akka.testkit.TestEvent.Mute; +import akka.testkit.TestEvent.UnMute; +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import scala.concurrent.util.Duration; +import scala.concurrent.util.FiniteDuration; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +public class SchedulerPatternTestBase { + + ActorSystem system; + + @Before + public void setUp() { + system = ActorSystem.create("SchedulerPatternTest", AkkaSpec.testConf()); + } + + @After + public void tearDown() { + system.shutdown(); + } + + static + //#schedule-constructor + public class ScheduleInConstructor extends UntypedActor { + + private final Cancellable tick = getContext().system().scheduler().schedule( + Duration.create(500, TimeUnit.MILLISECONDS), + Duration.create(1000, TimeUnit.MILLISECONDS), + getSelf(), "tick", getContext().system().dispatcher()); + //#schedule-constructor + ActorRef target; + public ScheduleInConstructor(ActorRef target) { + this.target = target; + } + //#schedule-constructor + + @Override + public void postStop() { + tick.cancel(); + } + + @Override + public void onReceive(Object message) throws Exception { + if (message.equals("tick")) { + // do something useful here + //#schedule-constructor + target.tell(message, getSelf()); + //#schedule-constructor + } + //#schedule-constructor + else if (message.equals("restart")) { + throw new ArithmeticException(); + } + //#schedule-constructor + else { + unhandled(message); + } + } + } + //#schedule-constructor + + static + //#schedule-receive + public class ScheduleInReceive extends UntypedActor { + //#schedule-receive + ActorRef target; + public ScheduleInReceive(ActorRef target) { + this.target = target; + } + //#schedule-receive + + @Override + public void preStart() { + getContext().system().scheduler().scheduleOnce( + Duration.create(500, TimeUnit.MILLISECONDS), + getSelf(), "tick", getContext().system().dispatcher()); + } + + // override postRestart so we don't call preStart and schedule a new message + @Override + public void postRestart(Throwable reason) { + } + + @Override + public void onReceive(Object message) throws Exception { + if (message.equals("tick")) { + // send another periodic tick after the specified delay + getContext().system().scheduler().scheduleOnce( + Duration.create(1000, TimeUnit.MILLISECONDS), + getSelf(), "tick", getContext().system().dispatcher()); + // do something useful here + //#schedule-receive + target.tell(message, getSelf()); + //#schedule-receive + } + //#schedule-receive + else if (message.equals("restart")) { + throw new ArithmeticException(); + } + //#schedule-receive + else { + unhandled(message); + } + } + } + //#schedule-receive + + @Test + @Ignore // no way to tag this as timing sensitive + public void scheduleInConstructor() { + new TestSchedule(system) {{ + final JavaTestKit probe = new JavaTestKit(system); + + final Props props = new Props(new UntypedActorFactory() { + public UntypedActor create() { + return new ScheduleInConstructor(probe.getRef()); + } + }); + + testSchedule(probe, props, duration("3000 millis"), duration("2000 millis")); + }}; + } + + @Test + @Ignore // no way to tag this as timing sensitive + public void scheduleInReceive() { + + new TestSchedule(system) {{ + final JavaTestKit probe = new JavaTestKit(system); + + final Props props = new Props(new UntypedActorFactory() { + public UntypedActor create() { + return new ScheduleInReceive(probe.getRef()); + } + }); + + testSchedule(probe, props, duration("3000 millis"), duration("2500 millis")); + }}; + } + + public static class TestSchedule extends JavaTestKit { + private ActorSystem system; + + public TestSchedule(ActorSystem system) { + super(system); + this.system = system; + } + + public void testSchedule(final JavaTestKit probe, Props props, + FiniteDuration startDuration, + FiniteDuration afterRestartDuration) { + Iterable filter = + Arrays.asList(new akka.testkit.EventFilter[]{ + (akka.testkit.EventFilter) new ErrorFilter(ArithmeticException.class)}); + system.eventStream().publish(new Mute(filter)); + + final ActorRef actor = system.actorOf(props); + new Within(startDuration) { + protected void run() { + probe.expectMsgEquals("tick"); + probe.expectMsgEquals("tick"); + probe.expectMsgEquals("tick"); + } + }; + actor.tell("restart", getRef()); + new Within(afterRestartDuration) { + protected void run() { + probe.expectMsgEquals("tick"); + probe.expectMsgEquals("tick"); + } + }; + system.stop(actor); + + system.eventStream().publish(new UnMute(filter)); + } + } +} diff --git a/akka-docs/rst/java/howto.rst b/akka-docs/rst/java/howto.rst index 204d50e7dc..e1f9f1610b 100644 --- a/akka-docs/rst/java/howto.rst +++ b/akka-docs/rst/java/howto.rst @@ -17,6 +17,34 @@ sense to add to the ``akka.pattern`` package for creating an `OTP-like library You might find some of the patterns described in the Scala chapter of :ref:`howto-scala` useful even though the example code is written in Scala. +Scheduling Periodic Messages +============================ + +This pattern describes how to schedule periodic messages to yourself in two different +ways. + +The first way is to set up periodic message scheduling in the constructor of the actor, +and cancel that scheduled sending in ``postStop`` or else we might have multiple registered +message sends to the same actor. + +.. note:: + + With this approach the scheduler will be restarted with the actor on restarts. + +.. includecode:: code/docs/pattern/SchedulerPatternTestBase.java#schedule-constructor + +The second variant sets up an initial one shot message send in the ``preStart`` method +of the actor, and the then the actor when it receives this message sets up a new one shot +message send. You also have to override ``postRestart`` so we don't call ``preStart`` +and schedule the initial message send again. + +.. note:: + + With this approach we won't fill up the mailbox with tick messages if the actor is + under pressure, but only schedule a new tick message when we have seen the previous one. + +.. includecode:: code/docs/pattern/SchedulerPatternTestBase.java#schedule-receive + Template Pattern ================ @@ -33,4 +61,3 @@ This is an especially nice pattern, since it does even come with some empty exam Spread the word: this is the easiest way to get famous! Please keep this pattern at the end of this file. - diff --git a/akka-docs/rst/scala/code/docs/pattern/SchedulerPatternSpec.scala b/akka-docs/rst/scala/code/docs/pattern/SchedulerPatternSpec.scala new file mode 100644 index 0000000000..e79d94b2c5 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/pattern/SchedulerPatternSpec.scala @@ -0,0 +1,98 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package docs.pattern + +import language.postfixOps + +import akka.actor.{ Props, ActorRef, Actor } +import scala.concurrent.util.duration._ +import scala.concurrent.util.{ FiniteDuration, Duration } +import akka.testkit.{ TimingTest, AkkaSpec, filterException } +import docs.pattern.SchedulerPatternSpec.ScheduleInConstructor + +object SchedulerPatternSpec { + //#schedule-constructor + class ScheduleInConstructor extends Actor { + import context._ + + val tick = system.scheduler.schedule(500 millis, 1000 millis, self, "tick") + //#schedule-constructor + var target: ActorRef = null + def this(target: ActorRef) = { this(); this.target = target } + //#schedule-constructor + + override def postStop() = tick.cancel() + + def receive = { + case "tick" ⇒ + // do something useful here + //#schedule-constructor + target ! "tick" + case "restart" ⇒ + throw new ArithmeticException + //#schedule-constructor + } + } + //#schedule-constructor + + //#schedule-receive + class ScheduleInReceive extends Actor { + import context._ + //#schedule-receive + var target: ActorRef = null + def this(target: ActorRef) = { this(); this.target = target } + //#schedule-receive + + override def preStart() = + system.scheduler.scheduleOnce(500 millis, self, "tick") + + // override postRestart so we don't call preStart and schedule a new message + override def postRestart(reason: Throwable) = {} + + def receive = { + case "tick" ⇒ + // send another periodic tick after the specified delay + system.scheduler.scheduleOnce(1000 millis, self, "tick") + // do something useful here + //#schedule-receive + target ! "tick" + case "restart" ⇒ + throw new ArithmeticException + //#schedule-receive + } + } + //#schedule-receive +} + +class SchedulerPatternSpec extends AkkaSpec { + + def testSchedule(actor: ActorRef, startDuration: FiniteDuration, + afterRestartDuration: FiniteDuration) = { + + filterException[ArithmeticException] { + within(startDuration) { + expectMsg("tick") + expectMsg("tick") + expectMsg("tick") + } + actor ! "restart" + within(afterRestartDuration) { + expectMsg("tick") + expectMsg("tick") + } + system.stop(actor) + } + } + + "send periodic ticks from the constructor" taggedAs TimingTest in { + testSchedule(system.actorOf(Props(new ScheduleInConstructor(testActor))), + 3000 millis, 2000 millis) + } + + "send ticks from the preStart and receive" taggedAs TimingTest in { + testSchedule(system.actorOf(Props(new ScheduleInConstructor(testActor))), + 3000 millis, 2500 millis) + } +} diff --git a/akka-docs/rst/scala/howto.rst b/akka-docs/rst/scala/howto.rst index 7d064e2491..c5203adb1c 100644 --- a/akka-docs/rst/scala/howto.rst +++ b/akka-docs/rst/scala/howto.rst @@ -111,6 +111,34 @@ This is where the Spider pattern comes in." The pattern is described `Discovering Message Flows in Actor System with the Spider Pattern `_. +Scheduling Periodic Messages +============================ + +This pattern describes how to schedule periodic messages to yourself in two different +ways. + +The first way is to set up periodic message scheduling in the constructor of the actor, +and cancel that scheduled sending in ``postStop`` or else we might have multiple registered +message sends to the same actor. + +.. note:: + + With this approach the scheduler will be restarted with the actor on restarts. + +.. includecode:: code/docs/pattern/SchedulerPatternSpec.scala#schedule-constructor + +The second variant sets up an initial one shot message send in the ``preStart`` method +of the actor, and the then the actor when it receives this message sets up a new one shot +message send. You also have to override ``postRestart`` so we don't call ``preStart`` +and schedule the initial message send again. + +.. note:: + + With this approach we won't fill up the mailbox with tick messages if the actor is + under pressure, but only schedule a new tick message when we have seen the previous one. + +.. includecode:: code/docs/pattern/SchedulerPatternSpec.scala#schedule-receive + Template Pattern ================ @@ -127,4 +155,3 @@ This is an especially nice pattern, since it does even come with some empty exam Spread the word: this is the easiest way to get famous! Please keep this pattern at the end of this file. - From c63234ca4cbcf63e580ca041d2becd99e834792b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Mon, 8 Oct 2012 10:52:55 +0200 Subject: [PATCH 2/2] Changes according to review. #2513 --- ...estBase.java => SchedulerPatternTest.java} | 70 ++++++++++--------- .../docs/pattern/SchedulerPatternTest.scala | 9 --- akka-docs/rst/java/howto.rst | 9 ++- .../docs/pattern/SchedulerPatternSpec.scala | 7 +- akka-docs/rst/scala/howto.rst | 5 +- 5 files changed, 50 insertions(+), 50 deletions(-) rename akka-docs/rst/java/code/docs/pattern/{SchedulerPatternTestBase.java => SchedulerPatternTest.java} (76%) delete mode 100644 akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.scala diff --git a/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTestBase.java b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.java similarity index 76% rename from akka-docs/rst/java/code/docs/pattern/SchedulerPatternTestBase.java rename to akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.java index b2543bfb19..05546232aa 100644 --- a/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTestBase.java +++ b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.java @@ -8,26 +8,23 @@ import akka.actor.*; import akka.testkit.*; import akka.testkit.TestEvent.Mute; import akka.testkit.TestEvent.UnMute; -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; +import org.junit.*; import scala.concurrent.util.Duration; import scala.concurrent.util.FiniteDuration; import java.util.Arrays; import java.util.concurrent.TimeUnit; -public class SchedulerPatternTestBase { +public class SchedulerPatternTest { - ActorSystem system; + static ActorSystem system; - @Before - public void setUp() { + @BeforeClass + public static void setUp() { system = ActorSystem.create("SchedulerPatternTest", AkkaSpec.testConf()); } - @After - public void tearDown() { + @AfterClass + public static void tearDown() { system.shutdown(); } @@ -38,9 +35,10 @@ public class SchedulerPatternTestBase { private final Cancellable tick = getContext().system().scheduler().schedule( Duration.create(500, TimeUnit.MILLISECONDS), Duration.create(1000, TimeUnit.MILLISECONDS), - getSelf(), "tick", getContext().system().dispatcher()); + getSelf(), "tick", getContext().dispatcher()); //#schedule-constructor - ActorRef target; + // this variable and constructor is declared here to not show up in the docs + final ActorRef target; public ScheduleInConstructor(ActorRef target) { this.target = target; } @@ -75,7 +73,8 @@ public class SchedulerPatternTestBase { //#schedule-receive public class ScheduleInReceive extends UntypedActor { //#schedule-receive - ActorRef target; + // this variable and constructor is declared here to not show up in the docs + final ActorRef target; public ScheduleInReceive(ActorRef target) { this.target = target; } @@ -85,7 +84,7 @@ public class SchedulerPatternTestBase { public void preStart() { getContext().system().scheduler().scheduleOnce( Duration.create(500, TimeUnit.MILLISECONDS), - getSelf(), "tick", getContext().system().dispatcher()); + getSelf(), "tick", getContext().dispatcher()); } // override postRestart so we don't call preStart and schedule a new message @@ -99,7 +98,7 @@ public class SchedulerPatternTestBase { // send another periodic tick after the specified delay getContext().system().scheduler().scheduleOnce( Duration.create(1000, TimeUnit.MILLISECONDS), - getSelf(), "tick", getContext().system().dispatcher()); + getSelf(), "tick", getContext().dispatcher()); // do something useful here //#schedule-receive target.tell(message, getSelf()); @@ -164,26 +163,29 @@ public class SchedulerPatternTestBase { Iterable filter = Arrays.asList(new akka.testkit.EventFilter[]{ (akka.testkit.EventFilter) new ErrorFilter(ArithmeticException.class)}); - system.eventStream().publish(new Mute(filter)); + try { + system.eventStream().publish(new Mute(filter)); - final ActorRef actor = system.actorOf(props); - new Within(startDuration) { - protected void run() { - probe.expectMsgEquals("tick"); - probe.expectMsgEquals("tick"); - probe.expectMsgEquals("tick"); - } - }; - actor.tell("restart", getRef()); - new Within(afterRestartDuration) { - protected void run() { - probe.expectMsgEquals("tick"); - probe.expectMsgEquals("tick"); - } - }; - system.stop(actor); - - system.eventStream().publish(new UnMute(filter)); + final ActorRef actor = system.actorOf(props); + new Within(startDuration) { + protected void run() { + probe.expectMsgEquals("tick"); + probe.expectMsgEquals("tick"); + probe.expectMsgEquals("tick"); + } + }; + actor.tell("restart", getRef()); + new Within(afterRestartDuration) { + protected void run() { + probe.expectMsgEquals("tick"); + probe.expectMsgEquals("tick"); + } + }; + system.stop(actor); + } + finally { + system.eventStream().publish(new UnMute(filter)); + } } } } diff --git a/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.scala b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.scala deleted file mode 100644 index d450bbc090..0000000000 --- a/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.scala +++ /dev/null @@ -1,9 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package docs.pattern - -import org.scalatest.junit.JUnitSuite - -class SchedulerPatternTest extends SchedulerPatternTestBase with JUnitSuite diff --git a/akka-docs/rst/java/howto.rst b/akka-docs/rst/java/howto.rst index e1f9f1610b..922d318c75 100644 --- a/akka-docs/rst/java/howto.rst +++ b/akka-docs/rst/java/howto.rst @@ -29,9 +29,12 @@ message sends to the same actor. .. note:: - With this approach the scheduler will be restarted with the actor on restarts. + With this approach the scheduled periodic message send will be restarted with the actor on restarts. + This also means that the time period that elapses between two tick messages during a restart may drift + off based on when you restart the scheduled message sends relative to the time that the last message was + sent, and how long the initial delay is. Worst case scenario is ``interval`` plus ``initialDelay``. -.. includecode:: code/docs/pattern/SchedulerPatternTestBase.java#schedule-constructor +.. includecode:: code/docs/pattern/SchedulerPatternTest.java#schedule-constructor The second variant sets up an initial one shot message send in the ``preStart`` method of the actor, and the then the actor when it receives this message sets up a new one shot @@ -43,7 +46,7 @@ and schedule the initial message send again. With this approach we won't fill up the mailbox with tick messages if the actor is under pressure, but only schedule a new tick message when we have seen the previous one. -.. includecode:: code/docs/pattern/SchedulerPatternTestBase.java#schedule-receive +.. includecode:: code/docs/pattern/SchedulerPatternTest.java#schedule-receive Template Pattern ================ diff --git a/akka-docs/rst/scala/code/docs/pattern/SchedulerPatternSpec.scala b/akka-docs/rst/scala/code/docs/pattern/SchedulerPatternSpec.scala index e79d94b2c5..a669cb0bc5 100644 --- a/akka-docs/rst/scala/code/docs/pattern/SchedulerPatternSpec.scala +++ b/akka-docs/rst/scala/code/docs/pattern/SchedulerPatternSpec.scala @@ -15,10 +15,10 @@ import docs.pattern.SchedulerPatternSpec.ScheduleInConstructor object SchedulerPatternSpec { //#schedule-constructor class ScheduleInConstructor extends Actor { - import context._ - - val tick = system.scheduler.schedule(500 millis, 1000 millis, self, "tick") + val tick = + context.system.scheduler.schedule(500 millis, 1000 millis, self, "tick") //#schedule-constructor + // this var and constructor is declared here to not show up in the docs var target: ActorRef = null def this(target: ActorRef) = { this(); this.target = target } //#schedule-constructor @@ -41,6 +41,7 @@ object SchedulerPatternSpec { class ScheduleInReceive extends Actor { import context._ //#schedule-receive + // this var and constructor is declared here to not show up in the docs var target: ActorRef = null def this(target: ActorRef) = { this(); this.target = target } //#schedule-receive diff --git a/akka-docs/rst/scala/howto.rst b/akka-docs/rst/scala/howto.rst index c5203adb1c..dcdebe06db 100644 --- a/akka-docs/rst/scala/howto.rst +++ b/akka-docs/rst/scala/howto.rst @@ -123,7 +123,10 @@ message sends to the same actor. .. note:: - With this approach the scheduler will be restarted with the actor on restarts. + With this approach the scheduled periodic message send will be restarted with the actor on restarts. + This also means that the time period that elapses between two tick messages during a restart may drift + off based on when you restart the scheduled message sends relative to the time that the last message was + sent, and how long the initial delay is. Worst case scenario is ``interval`` plus ``initialDelay``. .. includecode:: code/docs/pattern/SchedulerPatternSpec.scala#schedule-constructor