diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 1f669c9e32..07a6fa2fb7 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -255,7 +255,7 @@ object Actor extends Logging { case object Spawn actorOf(new Actor() { def receive = { - case Spawn => body; self.stop + case Spawn => try { body } finally { self.stop } } }).start ! Spawn } diff --git a/akka-core/src/main/scala/actor/Scheduler.scala b/akka-core/src/main/scala/actor/Scheduler.scala index 07486521ec..c3048bd74a 100644 --- a/akka-core/src/main/scala/actor/Scheduler.scala +++ b/akka-core/src/main/scala/actor/Scheduler.scala @@ -29,6 +29,9 @@ object Scheduler extends Logging { log.info("Starting up Scheduler") + /** + * Schedules to send the specified message to the receiver after initialDelay and then repeated after delay + */ def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = { log.trace( "Schedule scheduled event\n\tevent = [%s]\n\treceiver = [%s]\n\tinitialDelay = [%s]\n\tdelay = [%s]\n\ttimeUnit = [%s]", @@ -38,19 +41,68 @@ object Scheduler extends Logging { new Runnable { def run = receiver ! message }, initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] } catch { - case e => throw SchedulerException(message + " could not be scheduled on " + receiver, e) + case e: Exception => throw SchedulerException(message + " could not be scheduled on " + receiver, e) } } + /** + * Schedules to run specified function to the receiver after initialDelay and then repeated after delay, + * avoid blocking operations since this is executed in the schedulers thread + */ + def schedule(f: () => Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = + schedule(new Runnable { def run = f() }, initialDelay, delay, timeUnit) + + /** + * Schedules to run specified runnable to the receiver after initialDelay and then repeated after delay, + * avoid blocking operations since this is executed in the schedulers thread + */ + def schedule(runnable: Runnable, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = { + log.trace( + "Schedule scheduled event\n\trunnable = [%s]\n\tinitialDelay = [%s]\n\tdelay = [%s]\n\ttimeUnit = [%s]", + runnable, initialDelay, delay, timeUnit) + + try { + service.scheduleAtFixedRate(runnable,initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] + } catch { + case e: Exception => throw SchedulerException("Failed to schedule a Runnable", e) + } + } + + /** + * Schedules to send the specified message to the receiver after delay + */ def scheduleOnce(receiver: ActorRef, message: AnyRef, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = { log.trace( "Schedule one-time event\n\tevent = [%s]\n\treceiver = [%s]\n\tdelay = [%s]\n\ttimeUnit = [%s]", message, receiver, delay, timeUnit) try { service.schedule( - new Runnable { def run = receiver ! message }, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] + new Runnable { def run = receiver ! message }, + delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] } catch { - case e => throw SchedulerException(message + " could not be scheduled on " + receiver, e) + case e: Exception => throw SchedulerException( message + " could not be scheduleOnce'd on " + receiver, e) + } + } + + /** + * Schedules a function to be run after delay, + * avoid blocking operations since the runnable is executed in the schedulers thread + */ + def scheduleOnce(f: () => Unit, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = + scheduleOnce(new Runnable { def run = f() }, delay, timeUnit) + + /** + * Schedules a runnable to be run after delay, + * avoid blocking operations since the runnable is executed in the schedulers thread + */ + def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = { + log.trace( + "Schedule one-time event\n\trunnable = [%s]\n\tdelay = [%s]\n\ttimeUnit = [%s]", + runnable, delay, timeUnit) + try { + service.schedule(runnable,delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] + } catch { + case e: Exception => throw SchedulerException("Failed to scheduleOnce a Runnable", e) } } diff --git a/akka-core/src/test/scala/misc/SchedulerSpec.scala b/akka-core/src/test/scala/misc/SchedulerSpec.scala index 510b24dedc..16dd21f327 100644 --- a/akka-core/src/test/scala/misc/SchedulerSpec.scala +++ b/akka-core/src/test/scala/misc/SchedulerSpec.scala @@ -28,16 +28,24 @@ class SchedulerSpec extends JUnitSuite { // after max 1 second it should be executed at least the 3 times already assert(countDownLatch.await(1, TimeUnit.SECONDS)) + + val countDownLatch2 = new CountDownLatch(3) + + Scheduler.schedule( () => countDownLatch2.countDown, 0, 50, TimeUnit.MILLISECONDS) + + // after max 1 second it should be executed at least the 3 times already + assert(countDownLatch2.await(1, TimeUnit.SECONDS)) } @Test def schedulerShouldScheduleOnce = withCleanEndState { case object Tick - val countDownLatch = new CountDownLatch(2) + val countDownLatch = new CountDownLatch(3) val tickActor = actor { case Tick => countDownLatch.countDown } // run every 50 millisec Scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS) + Scheduler.scheduleOnce( () => countDownLatch.countDown, 50, TimeUnit.MILLISECONDS) // after 1 second the wait should fail assert(countDownLatch.await(1, TimeUnit.SECONDS) == false)