Merge branch 'master' into wip-1361-modularize-config-reference-patriknw

Conflicts:
	akka-actor/src/main/resources/akka-actor-reference.conf
	akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala
	akka-actor/src/main/scala/akka/actor/ActorSystem.scala
	akka-remote/src/main/scala/akka/remote/Gossiper.scala
	akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
This commit is contained in:
Patrik Nordwall 2011-11-24 14:56:19 +01:00
commit c53d5e16e5
18 changed files with 189 additions and 139 deletions

View file

@ -2,9 +2,10 @@ package akka.actor
import org.scalatest.BeforeAndAfterEach import org.scalatest.BeforeAndAfterEach
import org.multiverse.api.latches.StandardLatch import org.multiverse.api.latches.StandardLatch
import java.util.concurrent.{ ConcurrentLinkedQueue, CountDownLatch, TimeUnit }
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.testkit.EventFilter import akka.testkit.EventFilter
import akka.util.duration._
import java.util.concurrent.{ CountDownLatch, ConcurrentLinkedQueue, TimeUnit }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
@ -28,14 +29,14 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
def receive = { case Tick countDownLatch.countDown() } def receive = { case Tick countDownLatch.countDown() }
}) })
// run every 50 millisec // run every 50 millisec
collectCancellable(system.scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS)) collectCancellable(system.scheduler.schedule(tickActor, Tick, 0 milliseconds, 50 milliseconds))
// after max 1 second it should be executed at least the 3 times already // after max 1 second it should be executed at least the 3 times already
assert(countDownLatch.await(1, TimeUnit.SECONDS)) assert(countDownLatch.await(1, TimeUnit.SECONDS))
val countDownLatch2 = new CountDownLatch(3) val countDownLatch2 = new CountDownLatch(3)
collectCancellable(system.scheduler.schedule(() countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS)) collectCancellable(system.scheduler.schedule(() countDownLatch2.countDown(), 0 milliseconds, 50 milliseconds))
// after max 1 second it should be executed at least the 3 times already // after max 1 second it should be executed at least the 3 times already
assert(countDownLatch2.await(2, TimeUnit.SECONDS)) assert(countDownLatch2.await(2, TimeUnit.SECONDS))
@ -49,8 +50,8 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
}) })
// run every 50 millisec // run every 50 millisec
collectCancellable(system.scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS)) collectCancellable(system.scheduler.scheduleOnce(tickActor, Tick, 50 milliseconds))
collectCancellable(system.scheduler.scheduleOnce(() countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS)) collectCancellable(system.scheduler.scheduleOnce(() countDownLatch.countDown(), 50 milliseconds))
// after 1 second the wait should fail // after 1 second the wait should fail
assert(countDownLatch.await(2, TimeUnit.SECONDS) == false) assert(countDownLatch.await(2, TimeUnit.SECONDS) == false)
@ -58,22 +59,6 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
assert(countDownLatch.getCount == 1) assert(countDownLatch.getCount == 1)
} }
/**
* ticket #372
* FIXME rewrite the test so that registry is not used
*/
// "not create actors" in {
// object Ping
// val ticks = new CountDownLatch(1000)
// val actor = actorOf(new Actor {
// def receive = { case Ping ticks.countDown }
// })
// val numActors = system.registry.local.actors.length
// (1 to 1000).foreach(_ collectFuture(Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.MILLISECONDS)))
// assert(ticks.await(10, TimeUnit.SECONDS))
// assert(system.registry.local.actors.length === numActors)
// }
/** /**
* ticket #372 * ticket #372
*/ */
@ -86,7 +71,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
}) })
(1 to 10).foreach { i (1 to 10).foreach { i
val timeout = collectCancellable(system.scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS)) val timeout = collectCancellable(system.scheduler.scheduleOnce(actor, Ping, 1 second))
timeout.cancel() timeout.cancel()
} }
@ -114,15 +99,59 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
}) })
val actor = (supervisor ? props).as[ActorRef].get val actor = (supervisor ? props).as[ActorRef].get
collectCancellable(system.scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS)) collectCancellable(system.scheduler.schedule(actor, Ping, 500 milliseconds, 500 milliseconds))
// appx 2 pings before crash // appx 2 pings before crash
EventFilter[Exception]("CRASH", occurrences = 1) intercept { EventFilter[Exception]("CRASH", occurrences = 1) intercept {
collectCancellable(system.scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS)) collectCancellable(system.scheduler.scheduleOnce(actor, Crash, 1000 milliseconds))
} }
assert(restartLatch.tryAwait(2, TimeUnit.SECONDS)) assert(restartLatch.tryAwait(2, TimeUnit.SECONDS))
// should be enough time for the ping countdown to recover and reach 6 pings // should be enough time for the ping countdown to recover and reach 6 pings
assert(pingLatch.await(4, TimeUnit.SECONDS)) assert(pingLatch.await(4, TimeUnit.SECONDS))
} }
"never fire prematurely" in {
val ticks = new CountDownLatch(300)
case class Msg(ts: Long)
val actor = actorOf(new Actor {
def receive = {
case Msg(ts)
val now = System.currentTimeMillis
// Make sure that no message has been dispatched before the scheduled time (10ms) has occurred
if (now - ts < 10) throw new RuntimeException("Interval is too small: " + (now - ts))
ticks.countDown()
}
})
(1 to 300).foreach { i
collectCancellable(system.scheduler.scheduleOnce(actor, Msg(System.currentTimeMillis), 10 milliseconds))
Thread.sleep(5)
}
assert(ticks.await(3, TimeUnit.SECONDS) == true)
}
"schedule with different initial delay and frequency" in {
val ticks = new CountDownLatch(3)
case object Msg
val actor = actorOf(new Actor {
def receive = {
case Msg ticks.countDown()
}
})
val startTime = System.nanoTime()
val cancellable = system.scheduler.schedule(actor, Msg, 1 second, 100 milliseconds)
ticks.await(3, TimeUnit.SECONDS)
val elapsedTimeMs = (System.nanoTime() - startTime) / 1000000
assert(elapsedTimeMs > 1200)
assert(elapsedTimeMs < 1500) // the precision is not ms exact
cancellable.cancel()
}
} }
} }

View file

@ -5,12 +5,10 @@
package akka.config package akka.config
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.actor.ActorSystem
import java.io.File
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigParseOptions
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import java.util.concurrent.TimeUnit import akka.util.duration._
import akka.util.Duration import akka.util.Duration
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -37,7 +35,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.parseResource(classOf[ConfigSpec
getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(-1) getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(-1)
getMilliseconds("akka.actor.default-dispatcher.mailbox-push-timeout-time") must equal(10 * 1000) getMilliseconds("akka.actor.default-dispatcher.mailbox-push-timeout-time") must equal(10 * 1000)
getMilliseconds("akka.actor.dispatcher-shutdown-timeout") must equal(1 * 1000) getMilliseconds("akka.actor.dispatcher-shutdown-timeout") must equal(1 * 1000)
settings.DispatcherDefaultShutdown must equal(Duration(1, TimeUnit.SECONDS)) settings.DispatcherDefaultShutdown must equal(1 second)
getInt("akka.actor.default-dispatcher.throughput") must equal(5) getInt("akka.actor.default-dispatcher.throughput") must equal(5)
settings.DispatcherThroughput must equal(5) settings.DispatcherThroughput must equal(5)
getMilliseconds("akka.actor.default-dispatcher.throughput-deadline-time") must equal(0) getMilliseconds("akka.actor.default-dispatcher.throughput-deadline-time") must equal(0)

View file

@ -1,14 +1,8 @@
package akka.dispatch package akka.dispatch
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import org.scalatest.junit.JUnitRunner import java.util.concurrent.{ TimeUnit, BlockingQueue }
import org.junit.runner.RunWith
import java.util.concurrent.{ TimeUnit, CountDownLatch, BlockingQueue }
import java.util.{ Queue }
import akka.util._ import akka.util._
import akka.util.Duration._ import akka.util.duration._
import akka.actor.{ LocalActorRef, Actor }
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -23,7 +17,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
val q = factory(config) val q = factory(config)
ensureInitialMailboxState(config, q) ensureInitialMailboxState(config, q)
implicit val within = Duration(1, TimeUnit.SECONDS) implicit val within = 1 second
val f = spawn { val f = spawn {
q.dequeue q.dequeue
@ -33,7 +27,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
} }
"create a bounded mailbox with 10 capacity and with push timeout" in { "create a bounded mailbox with 10 capacity and with push timeout" in {
val config = BoundedMailbox(10, Duration(10, TimeUnit.MILLISECONDS)) val config = BoundedMailbox(10, 10 milliseconds)
val q = factory(config) val q = factory(config)
ensureInitialMailboxState(config, q) ensureInitialMailboxState(config, q)
@ -58,11 +52,11 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
} }
"dequeue what was enqueued properly for bounded mailboxes" in { "dequeue what was enqueued properly for bounded mailboxes" in {
testEnqueueDequeue(BoundedMailbox(10000, Duration(-1, TimeUnit.MILLISECONDS))) testEnqueueDequeue(BoundedMailbox(10000, -1 millisecond))
} }
"dequeue what was enqueued properly for bounded mailboxes with pushTimeout" in { "dequeue what was enqueued properly for bounded mailboxes with pushTimeout" in {
testEnqueueDequeue(BoundedMailbox(10000, Duration(100, TimeUnit.MILLISECONDS))) testEnqueueDequeue(BoundedMailbox(10000, 100 milliseconds))
} }
} }
@ -97,7 +91,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
} }
def testEnqueueDequeue(config: MailboxType) { def testEnqueueDequeue(config: MailboxType) {
implicit val within = Duration(10, TimeUnit.SECONDS) implicit val within = 10 seconds
val q = factory(config) val q = factory(config)
ensureInitialMailboxState(config, q) ensureInitialMailboxState(config, q)

View file

@ -1,12 +1,11 @@
package akka.performance.microbench package akka.performance.microbench
import akka.performance.workbench.PerformanceSpec import akka.performance.workbench.PerformanceSpec
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics
import akka.actor._ import akka.actor._
import java.util.concurrent.{ ThreadPoolExecutor, CountDownLatch, TimeUnit } import java.util.concurrent.{ ThreadPoolExecutor, CountDownLatch, TimeUnit }
import akka.dispatch._ import akka.dispatch._
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy
import akka.util.Duration import akka.util.Duration
import akka.util.duration._
// -server -Xms512M -Xmx1024M -XX:+UseParallelGC -Dbenchmark=true -Dbenchmark.repeatFactor=500 // -server -Xms512M -Xmx1024M -XX:+UseParallelGC -Dbenchmark=true -Dbenchmark.repeatFactor=500
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -15,7 +14,7 @@ class TellThroughputPerformanceSpec extends PerformanceSpec {
def createDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config def createDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config
new Dispatcher(system.dispatcherFactory.prerequisites, name, 5, new Dispatcher(system.dispatcherFactory.prerequisites, name, 5,
Duration.Zero, UnboundedMailbox(), config, Duration(60, TimeUnit.SECONDS)), ThreadPoolConfig()) Duration.Zero, UnboundedMailbox(), config, 60 seconds), ThreadPoolConfig())
.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
.setCorePoolSize(maxClients) .setCorePoolSize(maxClients)
.build .build

View file

@ -16,6 +16,7 @@
package org.jboss.netty.akka.util; package org.jboss.netty.akka.util;
import akka.event.LoggingAdapter; import akka.event.LoggingAdapter;
import akka.util.Duration;
import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap; import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap;
import org.jboss.netty.akka.util.internal.ReusableIterator; import org.jboss.netty.akka.util.internal.ReusableIterator;
@ -96,24 +97,24 @@ public class HashedWheelTimer implements Timer {
* @param threadFactory a {@link java.util.concurrent.ThreadFactory} that creates a * @param threadFactory a {@link java.util.concurrent.ThreadFactory} that creates a
* background {@link Thread} which is dedicated to * background {@link Thread} which is dedicated to
* {@link TimerTask} execution. * {@link TimerTask} execution.
* @param tickDuration the duration between tick * @param duration the duration between ticks
* @param unit the time unit of the {@code tickDuration}
* @param ticksPerWheel the size of the wheel * @param ticksPerWheel the size of the wheel
*/ */
public HashedWheelTimer( public HashedWheelTimer(
LoggingAdapter logger, LoggingAdapter logger,
ThreadFactory threadFactory, ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel) { Duration duration,
int ticksPerWheel) {
if (threadFactory == null) { if (threadFactory == null) {
throw new NullPointerException("threadFactory"); throw new NullPointerException("threadFactory");
} }
if (unit == null) { if (duration == null) {
throw new NullPointerException("unit"); throw new NullPointerException("duration");
} }
if (tickDuration <= 0) { if (duration.toNanos() <= 0) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"tickDuration must be greater than 0: " + tickDuration); "duration must be greater than 0 ns: " + duration.toNanos());
} }
if (ticksPerWheel <= 0) { if (ticksPerWheel <= 0) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
@ -128,14 +129,14 @@ public class HashedWheelTimer implements Timer {
mask = wheel.length - 1; mask = wheel.length - 1;
// Convert tickDuration to milliseconds. // Convert tickDuration to milliseconds.
this.tickDuration = tickDuration = unit.toMillis(tickDuration); this.tickDuration = duration.toMillis();
// Prevent overflow. // Prevent overflow.
if (tickDuration == Long.MAX_VALUE || if (tickDuration == Long.MAX_VALUE ||
tickDuration >= Long.MAX_VALUE / wheel.length) { tickDuration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"tickDuration is too long: " + "tickDuration is too long: " +
tickDuration + ' ' + unit); tickDuration + ' ' + duration.unit());
} }
roundDuration = tickDuration * wheel.length; roundDuration = tickDuration * wheel.length;
@ -231,23 +232,22 @@ public class HashedWheelTimer implements Timer {
return Collections.unmodifiableSet(unprocessedTimeouts); return Collections.unmodifiableSet(unprocessedTimeouts);
} }
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { public Timeout newTimeout(TimerTask task, Duration delay) {
final long currentTime = System.currentTimeMillis(); final long currentTime = System.currentTimeMillis();
if (task == null) { if (task == null) {
throw new NullPointerException("task"); throw new NullPointerException("task");
} }
if (unit == null) { if (delay == null) {
throw new NullPointerException("unit"); throw new NullPointerException("delay");
} }
if (!workerThread.isAlive()) { if (!workerThread.isAlive()) {
start(); start();
} }
delay = unit.toMillis(delay); HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay.toMillis());
HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay); scheduleTimeout(timeout, delay.toMillis());
scheduleTimeout(timeout, delay);
return timeout; return timeout;
} }

View file

@ -15,6 +15,7 @@
*/ */
package org.jboss.netty.akka.util; package org.jboss.netty.akka.util;
import akka.util.Duration;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -41,7 +42,7 @@ public interface Timer {
* @throws IllegalStateException if this timer has been * @throws IllegalStateException if this timer has been
* {@linkplain #stop() stopped} already * {@linkplain #stop() stopped} already
*/ */
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit); Timeout newTimeout(TimerTask task, Duration delay);
/** /**
* Releases all resources acquired by this {@link org.jboss.netty.akka.util.Timer} and cancels all * Releases all resources acquired by this {@link org.jboss.netty.akka.util.Timer} and cancels all

View file

@ -123,5 +123,20 @@ akka {
} }
# Used to set the behavior of the scheduler.
# Changing the default values may change the system behavior drastically so make sure you know what you're doing!
#
scheduler {
# The HashedWheelTimer (HWT) implementation from Netty is used as the default scheduler in the system.
#
# HWT does not execute the scheduled tasks on exact time.
# It will, on every tick, check if there are any tasks behind the schedule and execute them.
# You can increase or decrease the accuracy of the execution timing by specifying smaller or larger tick duration.
# If you are scheduling a lot of tasks you should consider increasing the ticks per wheel.
# For more information see: http://www.jboss.org/netty/
tickDuration = 100ms
ticksPerWheel = 512
}
} }

View file

@ -9,7 +9,7 @@ import scala.annotation.tailrec
import scala.collection.immutable.{ Stack, TreeMap } import scala.collection.immutable.{ Stack, TreeMap }
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import akka.event.Logging.{ Debug, Warning, Error } import akka.event.Logging.{ Debug, Warning, Error }
import akka.util.Helpers import akka.util.{ Duration, Helpers }
/** /**
* The actor context - the view of the actor cell from the actor. * The actor context - the view of the actor cell from the actor.
@ -400,7 +400,7 @@ private[akka] class ActorCell(
if (recvtimeout._1 > 0 && dispatcher.mailboxIsEmpty(this)) { if (recvtimeout._1 > 0 && dispatcher.mailboxIsEmpty(this)) {
recvtimeout._2.cancel() //Cancel any ongoing future recvtimeout._2.cancel() //Cancel any ongoing future
//Only reschedule if desired and there are currently no more messages to be processed //Only reschedule if desired and there are currently no more messages to be processed
receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(self, ReceiveTimeout, recvtimeout._1, TimeUnit.MILLISECONDS)) receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(self, ReceiveTimeout, Duration(recvtimeout._1, TimeUnit.MILLISECONDS)))
} else cancelReceiveTimeout() } else cancelReceiveTimeout()
} }

View file

@ -15,7 +15,7 @@ import akka.event.{ Logging, DeathWatch, ActorClassification, EventStream }
import akka.routing.{ ScatterGatherFirstCompletedRouter, Routing, RouterType, Router, RoutedProps, RoutedActorRef, RoundRobinRouter, RandomRouter, LocalConnectionManager, DirectRouter } import akka.routing.{ ScatterGatherFirstCompletedRouter, Routing, RouterType, Router, RoutedProps, RoutedActorRef, RoundRobinRouter, RandomRouter, LocalConnectionManager, DirectRouter }
import akka.AkkaException import akka.AkkaException
import com.eaio.uuid.UUID import com.eaio.uuid.UUID
import akka.util.{ Switch, Helpers } import akka.util.{ Duration, Switch, Helpers }
import akka.remote.RemoteAddress import akka.remote.RemoteAddress
import akka.remote.LocalOnly import akka.remote.LocalOnly
@ -394,20 +394,20 @@ class LocalDeathWatch extends DeathWatch with ActorClassification {
class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler { class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler {
def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable = def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay, timeUnit), initialDelay, timeUnit)) new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay), initialDelay))
def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): Cancellable = def schedule(f: () Unit, initialDelay: Duration, delay: Duration): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay, timeUnit)) new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(f, delay), initialDelay))
def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): Cancellable = def scheduleOnce(runnable: Runnable, delay: Duration): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay, timeUnit)) new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay))
def schedule(f: () Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable = def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(f, delay, timeUnit), initialDelay, timeUnit)) new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay))
def scheduleOnce(f: () Unit, delay: Long, timeUnit: TimeUnit): Cancellable = def scheduleOnce(f: () Unit, delay: Duration): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(f), delay, timeUnit)) new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(f), delay))
private def createSingleTask(runnable: Runnable): TimerTask = private def createSingleTask(runnable: Runnable): TimerTask =
new TimerTask() { def run(timeout: org.jboss.netty.akka.util.Timeout) { runnable.run() } } new TimerTask() { def run(timeout: org.jboss.netty.akka.util.Timeout) { runnable.run() } }
@ -415,23 +415,23 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler {
private def createSingleTask(receiver: ActorRef, message: Any): TimerTask = private def createSingleTask(receiver: ActorRef, message: Any): TimerTask =
new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { receiver ! message } } new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { receiver ! message } }
private def createContinuousTask(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): TimerTask = { private def createSingleTask(f: () Unit): TimerTask =
new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { f() } }
private def createContinuousTask(receiver: ActorRef, message: Any, delay: Duration): TimerTask = {
new TimerTask { new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) { def run(timeout: org.jboss.netty.akka.util.Timeout) {
receiver ! message receiver ! message
timeout.getTimer.newTimeout(this, delay, timeUnit) timeout.getTimer.newTimeout(this, delay)
} }
} }
} }
private def createSingleTask(f: () Unit): TimerTask = private def createContinuousTask(f: () Unit, delay: Duration): TimerTask = {
new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { f() } }
private def createContinuousTask(f: () Unit, delay: Long, timeUnit: TimeUnit): TimerTask = {
new TimerTask { new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) { def run(timeout: org.jboss.netty.akka.util.Timeout) {
f() f()
timeout.getTimer.newTimeout(this, delay, timeUnit) timeout.getTimer.newTimeout(this, delay)
} }
} }
} }

View file

@ -13,7 +13,6 @@ import com.eaio.uuid.UUID
import akka.serialization.Serialization import akka.serialization.Serialization
import akka.remote.RemoteAddress import akka.remote.RemoteAddress
import org.jboss.netty.akka.util.HashedWheelTimer import org.jboss.netty.akka.util.HashedWheelTimer
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit.SECONDS import java.util.concurrent.TimeUnit.SECONDS
import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.TimeUnit.NANOSECONDS import java.util.concurrent.TimeUnit.NANOSECONDS
@ -23,10 +22,11 @@ import com.typesafe.config.ConfigParseOptions
import com.typesafe.config.ConfigRoot import com.typesafe.config.ConfigRoot
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import java.lang.reflect.InvocationTargetException import java.lang.reflect.InvocationTargetException
import java.util.concurrent.ConcurrentHashMap
import akka.util.{ Helpers, Duration, ReflectiveAccess } import akka.util.{ Helpers, Duration, ReflectiveAccess }
import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import scala.annotation.tailrec import scala.annotation.tailrec
import akka.serialization.SerializationExtension import akka.serialization.SerializationExtension
@ -94,6 +94,9 @@ object ActorSystem {
val EnabledModules: Seq[String] = getStringList("akka.enabled-modules").asScala val EnabledModules: Seq[String] = getStringList("akka.enabled-modules").asScala
val SchedulerTickDuration = Duration(getMilliseconds("akka.scheduler.tickDuration"), MILLISECONDS)
val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel")
if (ConfigVersion != Version) if (ConfigVersion != Version)
throw new ConfigurationException("Akka JAR version [" + Version + throw new ConfigurationException("Akka JAR version [" + Version +
"] does not match the provided config version [" + ConfigVersion + "]") "] does not match the provided config version [" + ConfigVersion + "]")
@ -297,8 +300,7 @@ class ActorSystemImpl(val name: String, val applicationConfig: Config) extends A
eventStream.startStdoutLogger(settings) eventStream.startStdoutLogger(settings)
val log = new BusLogging(eventStream, "ActorSystem") // this used only for .getClass in tagging messages val log = new BusLogging(eventStream, "ActorSystem") // this used only for .getClass in tagging messages
// FIXME make this configurable val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, settings.SchedulerTickDuration, settings.SchedulerTicksPerWheel))
val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, 100, MILLISECONDS, 512))
val provider: ActorRefProvider = { val provider: ActorRefProvider = {
val providerClass = ReflectiveAccess.getClassFor(ProviderClass) match { val providerClass = ReflectiveAccess.getClassFor(ProviderClass) match {

View file

@ -7,6 +7,7 @@ import akka.util._
import scala.collection.mutable import scala.collection.mutable
import akka.event.Logging import akka.event.Logging
import akka.util.Duration._
object FSM { object FSM {
@ -33,9 +34,9 @@ object FSM {
def schedule(actor: ActorRef, timeout: Duration) { def schedule(actor: ActorRef, timeout: Duration) {
if (repeat) { if (repeat) {
ref = Some(system.scheduler.schedule(actor, this, timeout.length, timeout.length, timeout.unit)) ref = Some(system.scheduler.schedule(actor, this, timeout, timeout))
} else { } else {
ref = Some(system.scheduler.scheduleOnce(actor, this, timeout.length, timeout.unit)) ref = Some(system.scheduler.scheduleOnce(actor, this, timeout))
} }
} }
@ -522,7 +523,7 @@ trait FSM[S, D] extends ListenerManagement {
if (timeout.isDefined) { if (timeout.isDefined) {
val t = timeout.get val t = timeout.get
if (t.finite_? && t.length >= 0) { if (t.finite_? && t.length >= 0) {
timeoutFuture = Some(system.scheduler.scheduleOnce(self, TimeoutMarker(generation), t.length, t.unit)) timeoutFuture = Some(system.scheduler.scheduleOnce(self, TimeoutMarker(generation), t))
} }
} }
} }

View file

@ -9,46 +9,50 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*
* Rework of David Pollak's ActorPing class in the Lift Project
* which is licensed under the Apache 2 License.
*/ */
package akka.actor package akka.actor
import java.util.concurrent._
import akka.util.Duration import akka.util.Duration
import akka.AkkaException
case class SchedulerException(msg: String, e: Throwable) extends AkkaException(msg, e) { trait Scheduler {
def this(msg: String) = this(msg, null) /**
} * Schedules a message to be sent repeatedly with an initial delay and frequency.
* E.g. if you would like a message to be sent immediately and thereafter every 500ms you would set
* delay = Duration.Zero and frequency = Duration(500, TimeUnit.MILLISECONDS)
*/
def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, frequency: Duration): Cancellable
trait JScheduler { /**
def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable * Schedules a function to be run repeatedly with an initial delay and a frequency.
def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): Cancellable * E.g. if you would like the function to be run after 2 seconds and thereafter every 100ms you would set
def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): Cancellable * delay = Duration(2, TimeUnit.SECONDS) and frequency = Duration(100, TimeUnit.MILLISECONDS)
} */
def schedule(f: () Unit, initialDelay: Duration, frequency: Duration): Cancellable
abstract class Scheduler extends JScheduler { /**
def schedule(f: () Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable * Schedules a Runnable to be run once with a delay, i.e. a time period that has to pass before the runnable is executed.
*/
def scheduleOnce(runnable: Runnable, delay: Duration): Cancellable
def scheduleOnce(f: () Unit, delay: Long, timeUnit: TimeUnit): Cancellable /**
* Schedules a message to be sent once with a delay, i.e. a time period that has to pass before the message is sent.
*/
def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable
def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable = /**
schedule(receiver, message, initialDelay.toNanos, delay.toNanos, TimeUnit.NANOSECONDS) * Schedules a function to be run once with a delay, i.e. a time period that has to pass before the function is run.
*/
def schedule(f: () Unit, initialDelay: Duration, delay: Duration): Cancellable = def scheduleOnce(f: () Unit, delay: Duration): Cancellable
schedule(f, initialDelay.toNanos, delay.toNanos, TimeUnit.NANOSECONDS)
def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable =
scheduleOnce(receiver, message, delay.length, delay.unit)
def scheduleOnce(f: () Unit, delay: Duration): Cancellable =
scheduleOnce(f, delay.length, delay.unit)
} }
trait Cancellable { trait Cancellable {
/**
* Cancels the underlying scheduled task.
*/
def cancel(): Unit def cancel(): Unit
/**
* Checks if the underlying scheduled task has been cancelled.
*/
def isCancelled: Boolean def isCancelled: Boolean
} }

View file

@ -135,7 +135,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
shutdownScheduleUpdater.get(this) match { shutdownScheduleUpdater.get(this) match {
case UNSCHEDULED case UNSCHEDULED
if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) { if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) {
scheduler.scheduleOnce(shutdownAction, shutdownTimeout.toMillis, TimeUnit.MILLISECONDS) scheduler.scheduleOnce(shutdownAction, Duration(shutdownTimeout.toMillis, TimeUnit.MILLISECONDS))
() ()
} else ifSensibleToDoSoThenScheduleShutdown() } else ifSensibleToDoSoThenScheduleShutdown()
case SCHEDULED case SCHEDULED
@ -210,7 +210,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
} }
case RESCHEDULED case RESCHEDULED
if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED)) if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED))
scheduler.scheduleOnce(this, shutdownTimeout.toMillis, TimeUnit.MILLISECONDS) scheduler.scheduleOnce(this, Duration(shutdownTimeout.toMillis, TimeUnit.MILLISECONDS))
else run() else run()
} }
} }

View file

@ -956,12 +956,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
val runnable = new Runnable { val runnable = new Runnable {
def run() { def run() {
if (!isCompleted) { if (!isCompleted) {
if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS) if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS))
else func(DefaultPromise.this) else func(DefaultPromise.this)
} }
} }
} }
val timeoutFuture = dispatcher.prerequisites.scheduler.scheduleOnce(runnable, timeLeft(), NANOS) val timeoutFuture = dispatcher.prerequisites.scheduler.scheduleOnce(runnable, Duration(timeLeft(), TimeUnit.NANOSECONDS))
onComplete(_ timeoutFuture.cancel()) onComplete(_ timeoutFuture.cancel())
false false
} else true } else true
@ -983,12 +983,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
val runnable = new Runnable { val runnable = new Runnable {
def run() { def run() {
if (!isCompleted) { if (!isCompleted) {
if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS) if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS))
else promise complete (try { Right(fallback) } catch { case e Left(e) }) else promise complete (try { Right(fallback) } catch { case e Left(e) })
} }
} }
} }
dispatcher.prerequisites.scheduler.scheduleOnce(runnable, timeLeft(), NANOS) dispatcher.prerequisites.scheduler.scheduleOnce(runnable, Duration(timeLeft(), TimeUnit.NANOSECONDS))
promise promise
} }
} else this } else this

View file

@ -8,6 +8,7 @@ import akka.actor._
import akka.actor.Status._ import akka.actor.Status._
import akka.event.Logging import akka.event.Logging
import akka.util.duration._ import akka.util.duration._
import akka.util.Duration
import akka.remote.RemoteProtocol._ import akka.remote.RemoteProtocol._
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._ import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
@ -122,8 +123,8 @@ class Gossiper(remote: Remote) {
{ {
// start periodic gossip and cluster scrutinization - default is run them every second with 1/2 second in between // start periodic gossip and cluster scrutinization - default is run them every second with 1/2 second in between
system.scheduler schedule (() initateGossip(), initalDelayForGossip.toSeconds, gossipFrequency.toSeconds, timeUnit) system.scheduler schedule (() initateGossip(), Duration(initalDelayForGossip.toSeconds, timeUnit), Duration(gossipFrequency.toSeconds, timeUnit))
system.scheduler schedule (() scrutinize(), initalDelayForGossip.toSeconds, gossipFrequency.toSeconds, timeUnit) system.scheduler schedule (() scrutinize(), Duration(initalDelayForGossip.toSeconds, timeUnit), Duration(gossipFrequency.toSeconds, timeUnit))
} }
/** /**

View file

@ -281,6 +281,8 @@ trait RemoteMarshallingOps {
def system: ActorSystem def system: ActorSystem
protected def useUntrustedMode: Boolean
def createMessageSendEnvelope(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = { def createMessageSendEnvelope(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = {
val arp = AkkaRemoteProtocol.newBuilder val arp = AkkaRemoteProtocol.newBuilder
arp.setMessage(rmp) arp.setMessage(rmp)
@ -323,15 +325,15 @@ trait RemoteMarshallingOps {
messageBuilder messageBuilder
} }
def receiveMessage(remoteMessage: RemoteMessage, untrustedMode: Boolean) { def receiveMessage(remoteMessage: RemoteMessage) {
val recipient = remoteMessage.recipient val recipient = remoteMessage.recipient
remoteMessage.payload match { remoteMessage.payload match {
case Left(t) throw t case Left(t) throw t
case Right(r) r match { case Right(r) r match {
case _: Terminate if (untrustedMode) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor") else recipient.stop() case _: Terminate if (useUntrustedMode) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor") else recipient.stop()
case _: AutoReceivedMessage if (untrustedMode) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor") case _: AutoReceivedMessage if (useUntrustedMode) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor")
case m recipient.!(m)(remoteMessage.sender) case m recipient.!(m)(remoteMessage.sender)
} }
} }
} }

View file

@ -95,8 +95,8 @@ abstract class RemoteClient private[akka] (
} }
class PassiveRemoteClient(val currentChannel: Channel, class PassiveRemoteClient(val currentChannel: Channel,
remoteSupport: NettyRemoteSupport, remoteSupport: NettyRemoteSupport,
remoteAddress: RemoteAddress) remoteAddress: RemoteAddress)
extends RemoteClient(remoteSupport, remoteAddress) { extends RemoteClient(remoteSupport, remoteAddress) {
def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = runSwitch switchOn { def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = runSwitch switchOn {
@ -297,7 +297,7 @@ class ActiveRemoteClientHandler(
} }
case arp: AkkaRemoteProtocol if arp.hasMessage case arp: AkkaRemoteProtocol if arp.hasMessage
client.remoteSupport.receiveMessage(new RemoteMessage(arp.getMessage, client.remoteSupport, client.loader), untrustedMode = false) //TODO FIXME Sensible or not? client.remoteSupport.receiveMessage(new RemoteMessage(arp.getMessage, client.remoteSupport, client.loader))
case other case other
throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.remoteSupport, client.remoteAddress) throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.remoteSupport, client.remoteAddress)
@ -364,11 +364,15 @@ class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) wi
private val remoteClients = new HashMap[RemoteAddress, RemoteClient] private val remoteClients = new HashMap[RemoteAddress, RemoteClient]
private val clientsLock = new ReentrantReadWriteLock private val clientsLock = new ReentrantReadWriteLock
protected[akka] def send(message: Any, override protected def useUntrustedMode = serverSettings.UntrustedMode
senderOption: Option[ActorRef],
recipientAddress: RemoteAddress, protected[akka] def send(
recipient: ActorRef, message: Any,
loader: Option[ClassLoader]): Unit = { senderOption: Option[ActorRef],
recipientAddress: RemoteAddress,
recipient: ActorRef,
loader: Option[ClassLoader]): Unit = {
clientsLock.readLock.lock clientsLock.readLock.lock
try { try {
val client = remoteClients.get(recipientAddress) match { val client = remoteClients.get(recipientAddress) match {
@ -634,7 +638,7 @@ class RemoteServerHandler(
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = try { override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = try {
event.getMessage match { event.getMessage match {
case remote: AkkaRemoteProtocol if remote.hasMessage case remote: AkkaRemoteProtocol if remote.hasMessage
remoteSupport.receiveMessage(new RemoteMessage(remote.getMessage, remoteSupport, applicationLoader), UntrustedMode) remoteSupport.receiveMessage(new RemoteMessage(remote.getMessage, remoteSupport, applicationLoader))
case remote: AkkaRemoteProtocol if remote.hasInstruction case remote: AkkaRemoteProtocol if remote.hasInstruction
val instruction = remote.getInstruction val instruction = remote.getInstruction

View file

@ -4,7 +4,7 @@ package sample.fsm.dining.become
//http://www.dalnefre.com/wp/2010/08/dining-philosophers-in-humus/ //http://www.dalnefre.com/wp/2010/08/dining-philosophers-in-humus/
import akka.actor.{ ActorRef, Actor, ActorSystem } import akka.actor.{ ActorRef, Actor, ActorSystem }
import java.util.concurrent.TimeUnit import akka.util.duration._
/* /*
* First we define our messages, they basically speak for themselves * First we define our messages, they basically speak for themselves
@ -77,7 +77,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor {
case Taken(`chopstickToWaitFor`) case Taken(`chopstickToWaitFor`)
println("%s has picked up %s and %s, and starts to eat", name, left.address, right.address) println("%s has picked up %s and %s, and starts to eat", name, left.address, right.address)
become(eating) become(eating)
system.scheduler.scheduleOnce(self, Think, 5, TimeUnit.SECONDS) system.scheduler.scheduleOnce(self, Think, 5 seconds)
case Busy(chopstick) case Busy(chopstick)
become(thinking) become(thinking)
@ -106,7 +106,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor {
left ! Put(self) left ! Put(self)
right ! Put(self) right ! Put(self)
println("%s puts down his chopsticks and starts to think", name) println("%s puts down his chopsticks and starts to think", name)
system.scheduler.scheduleOnce(self, Eat, 5, TimeUnit.SECONDS) system.scheduler.scheduleOnce(self, Eat, 5 seconds)
} }
//All hakkers start in a non-eating state //All hakkers start in a non-eating state
@ -114,7 +114,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor {
case Think case Think
println("%s starts to think", name) println("%s starts to think", name)
become(thinking) become(thinking)
system.scheduler.scheduleOnce(self, Eat, 5, TimeUnit.SECONDS) system.scheduler.scheduleOnce(self, Eat, 5 seconds)
} }
} }