Merge branch 'master' into wip-1650-Future-or-√

This commit is contained in:
Viktor Klang 2012-01-17 13:49:24 +01:00
commit 861d51025b
12 changed files with 153 additions and 234 deletions

View file

@ -54,8 +54,7 @@ import com.eaio.util.lang.Hex;
* @author <a href="mailto:jb@eaio.de">Johann Burkard</a>
* @version $Id: UUID.java 1888 2009-03-15 12:43:24Z johann $
*/
public class UUID implements Comparable<UUID>, Serializable, Cloneable,
IDLEntity {
public class UUID implements Comparable<UUID>, Serializable, Cloneable {
/**
* Hasn't ever changed between versions.

View file

@ -1,55 +0,0 @@
/*
* uuid.idl
*
* Created 19:49 16.12.2003
*
* eaio: UUID - an implementation of the UUID specification
* Copyright (c) 2003-2009 Johann Burkard (jb@eaio.com) http://eaio.com.
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included
* in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
* OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
* NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
* USE OR OTHER DEALINGS IN THE SOFTWARE.
*
*/
module com {
module eaio {
module uuid {
/**
* The UUID struct.
*/
struct UUID {
/**
* The time field of the UUID.
*/
long long time;
/**
* The clock sequence and node field of the UUID.
*/
long long clockSeqAndNode;
};
};
};
};

View file

@ -5,14 +5,12 @@
package akka.actor
import java.util.concurrent.atomic.AtomicLong
import org.jboss.netty.akka.util.{ Timer, TimerTask, HashedWheelTimer, Timeout HWTimeout }
import akka.config.ConfigurationException
import akka.dispatch._
import akka.routing._
import akka.AkkaException
import akka.util.{ Duration, Switch, Helpers, Timeout }
import akka.event._
import java.io.Closeable
/**
* Interface for all ActorRef providers to implement.
@ -43,8 +41,6 @@ trait ActorRefProvider {
def nodename: String
def clustername: String
/**
* The root path for all actors within this actor system, including remote
* address if enabled.
@ -523,150 +519,3 @@ class LocalDeathWatch(val mapSize: Int) extends DeathWatch with ActorClassificat
}
}
/**
* Scheduled tasks (Runnable and functions) are executed with the supplied dispatcher.
* Note that dispatcher is by-name parameter, because dispatcher might not be initialized
* when the scheduler is created.
*
* The HashedWheelTimer used by this class MUST throw an IllegalStateException
* if it does not enqueue a task. Once a task is queued, it MUST be executed or
* returned from stop().
*/
class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, dispatcher: MessageDispatcher) extends Scheduler with Closeable {
def schedule(initialDelay: Duration, delay: Duration, receiver: ActorRef, message: Any): Cancellable = {
val continuousCancellable = new ContinuousCancellable
val task = new TimerTask with ContinuousScheduling {
def run(timeout: HWTimeout) {
receiver ! message
// Check if the receiver is still alive and kicking before reschedule the task
if (receiver.isTerminated) {
log.warning("Could not reschedule message to be sent because receiving actor has been terminated.")
} else {
scheduleNext(timeout, delay, continuousCancellable)
}
}
}
continuousCancellable.init(hashedWheelTimer.newTimeout(task, initialDelay))
continuousCancellable
}
def schedule(initialDelay: Duration, delay: Duration)(f: Unit): Cancellable = {
val continuousCancellable = new ContinuousCancellable
val task = new TimerTask with ContinuousScheduling with Runnable {
def run = f
def run(timeout: HWTimeout) {
dispatcher execute this
scheduleNext(timeout, delay, continuousCancellable)
}
}
continuousCancellable.init(hashedWheelTimer.newTimeout(task, initialDelay))
continuousCancellable
}
def schedule(initialDelay: Duration, delay: Duration, runnable: Runnable): Cancellable = {
val continuousCancellable = new ContinuousCancellable
val task = new TimerTask with ContinuousScheduling {
def run(timeout: HWTimeout) {
dispatcher.execute(runnable)
scheduleNext(timeout, delay, continuousCancellable)
}
}
continuousCancellable.init(hashedWheelTimer.newTimeout(task, initialDelay))
continuousCancellable
}
def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable = {
val task = new TimerTask() {
def run(timeout: HWTimeout) { dispatcher.execute(runnable) }
}
new DefaultCancellable(hashedWheelTimer.newTimeout(task, delay))
}
def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable = {
val task = new TimerTask {
def run(timeout: HWTimeout) {
receiver ! message
}
}
new DefaultCancellable(hashedWheelTimer.newTimeout(task, delay))
}
def scheduleOnce(delay: Duration)(f: Unit): Cancellable = {
val task = new TimerTask {
def run(timeout: HWTimeout) {
dispatcher.execute(new Runnable { def run = f })
}
}
new DefaultCancellable(hashedWheelTimer.newTimeout(task, delay))
}
private trait ContinuousScheduling { this: TimerTask
def scheduleNext(timeout: HWTimeout, delay: Duration, delegator: ContinuousCancellable) {
try {
delegator.swap(timeout.getTimer.newTimeout(this, delay))
} catch {
case _: IllegalStateException // stop recurring if timer is stopped
}
}
}
private def execDirectly(t: HWTimeout): Unit = {
try t.getTask.run(t) catch {
case e: InterruptedException throw e
case e: Exception log.error(e, "exception while executing timer task")
}
}
def close() = {
import scala.collection.JavaConverters._
hashedWheelTimer.stop().asScala foreach execDirectly
}
}
/**
* Wrapper of a [[org.jboss.netty.akka.util.Timeout]] that delegates all
* methods. Needed to be able to cancel continuous tasks,
* since they create new Timeout for each tick.
*/
private[akka] class ContinuousCancellable extends Cancellable {
@volatile
private var delegate: HWTimeout = _
@volatile
private var cancelled = false
private[akka] def init(initialTimeout: HWTimeout): Unit = {
delegate = initialTimeout
}
private[akka] def swap(newTimeout: HWTimeout): Unit = {
val wasCancelled = isCancelled
delegate = newTimeout
if (wasCancelled || isCancelled) cancel()
}
def isCancelled(): Boolean = {
// delegate is initially null, but this object will not be exposed to the world until after init
cancelled || delegate.isCancelled()
}
def cancel(): Unit = {
// the underlying Timeout will not become cancelled once the task has been started to run,
// therefore we keep a flag here to make sure that rescheduling doesn't occur when cancelled
cancelled = true
// delegate is initially null, but this object will not be exposed to the world until after init
delegate.cancel()
}
}
class DefaultCancellable(val timeout: HWTimeout) extends Cancellable {
def cancel() {
timeout.cancel()
}
def isCancelled: Boolean = {
timeout.isCancelled
}
}

View file

@ -158,11 +158,6 @@ abstract class ActorSystem extends ActorRefFactory {
*/
def nodename: String
/**
* The logical name of the cluster this actor system belongs to.
*/
def clustername: String
/**
* Construct a path below the application guardian to be used with [[ActorSystem.actorFor]].
*/
@ -379,7 +374,6 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
def systemGuardian: InternalActorRef = provider.systemGuardian
def deathWatch: DeathWatch = provider.deathWatch
def nodename: String = provider.nodename
def clustername: String = provider.clustername
def /(actorName: String): ActorPath = guardian.path / actorName
def /(path: Iterable[String]): ActorPath = guardian.path / path

View file

@ -13,6 +13,11 @@
package akka.actor
import akka.util.Duration
import org.jboss.netty.akka.util.{ Timer, TimerTask, HashedWheelTimer, Timeout HWTimeout }
import akka.event.LoggingAdapter
import akka.dispatch.MessageDispatcher
import java.io.Closeable
//#scheduler
/**
* An Akka scheduler service. This one needs one special behavior: if
@ -108,3 +113,149 @@ trait Cancellable {
def isCancelled: Boolean
}
//#cancellable
/**
* Scheduled tasks (Runnable and functions) are executed with the supplied dispatcher.
* Note that dispatcher is by-name parameter, because dispatcher might not be initialized
* when the scheduler is created.
*
* The HashedWheelTimer used by this class MUST throw an IllegalStateException
* if it does not enqueue a task. Once a task is queued, it MUST be executed or
* returned from stop().
*/
class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, dispatcher: MessageDispatcher) extends Scheduler with Closeable {
def schedule(initialDelay: Duration, delay: Duration, receiver: ActorRef, message: Any): Cancellable = {
val continuousCancellable = new ContinuousCancellable
val task = new TimerTask with ContinuousScheduling {
def run(timeout: HWTimeout) {
receiver ! message
// Check if the receiver is still alive and kicking before reschedule the task
if (receiver.isTerminated) {
log.warning("Could not reschedule message to be sent because receiving actor has been terminated.")
} else {
scheduleNext(timeout, delay, continuousCancellable)
}
}
}
continuousCancellable.init(hashedWheelTimer.newTimeout(task, initialDelay))
continuousCancellable
}
def schedule(initialDelay: Duration, delay: Duration)(f: Unit): Cancellable = {
val continuousCancellable = new ContinuousCancellable
val task = new TimerTask with ContinuousScheduling with Runnable {
def run = f
def run(timeout: HWTimeout) {
dispatcher execute this
scheduleNext(timeout, delay, continuousCancellable)
}
}
continuousCancellable.init(hashedWheelTimer.newTimeout(task, initialDelay))
continuousCancellable
}
def schedule(initialDelay: Duration, delay: Duration, runnable: Runnable): Cancellable = {
val continuousCancellable = new ContinuousCancellable
val task = new TimerTask with ContinuousScheduling {
def run(timeout: HWTimeout) {
dispatcher.execute(runnable)
scheduleNext(timeout, delay, continuousCancellable)
}
}
continuousCancellable.init(hashedWheelTimer.newTimeout(task, initialDelay))
continuousCancellable
}
def scheduleOnce(delay: Duration, runnable: Runnable): Cancellable = {
val task = new TimerTask() {
def run(timeout: HWTimeout) { dispatcher.execute(runnable) }
}
new DefaultCancellable(hashedWheelTimer.newTimeout(task, delay))
}
def scheduleOnce(delay: Duration, receiver: ActorRef, message: Any): Cancellable = {
val task = new TimerTask {
def run(timeout: HWTimeout) {
receiver ! message
}
}
new DefaultCancellable(hashedWheelTimer.newTimeout(task, delay))
}
def scheduleOnce(delay: Duration)(f: Unit): Cancellable = {
val task = new TimerTask {
def run(timeout: HWTimeout) {
dispatcher.execute(new Runnable { def run = f })
}
}
new DefaultCancellable(hashedWheelTimer.newTimeout(task, delay))
}
private trait ContinuousScheduling { this: TimerTask
def scheduleNext(timeout: HWTimeout, delay: Duration, delegator: ContinuousCancellable) {
try {
delegator.swap(timeout.getTimer.newTimeout(this, delay))
} catch {
case _: IllegalStateException // stop recurring if timer is stopped
}
}
}
private def execDirectly(t: HWTimeout): Unit = {
try t.getTask.run(t) catch {
case e: InterruptedException throw e
case e: Exception log.error(e, "exception while executing timer task")
}
}
def close() = {
import scala.collection.JavaConverters._
hashedWheelTimer.stop().asScala foreach execDirectly
}
}
/**
* Wrapper of a [[org.jboss.netty.akka.util.Timeout]] that delegates all
* methods. Needed to be able to cancel continuous tasks,
* since they create new Timeout for each tick.
*/
private[akka] class ContinuousCancellable extends Cancellable {
@volatile
private var delegate: HWTimeout = _
@volatile
private var cancelled = false
private[akka] def init(initialTimeout: HWTimeout): Unit = {
delegate = initialTimeout
}
private[akka] def swap(newTimeout: HWTimeout): Unit = {
val wasCancelled = isCancelled
delegate = newTimeout
if (wasCancelled || isCancelled) cancel()
}
def isCancelled(): Boolean = {
// delegate is initially null, but this object will not be exposed to the world until after init
cancelled || delegate.isCancelled()
}
def cancel(): Unit = {
// the underlying Timeout will not become cancelled once the task has been started to run,
// therefore we keep a flag here to make sure that rescheduling doesn't occur when cancelled
cancelled = true
// delegate is initially null, but this object will not be exposed to the world until after init
delegate.cancel()
}
}
class DefaultCancellable(val timeout: HWTimeout) extends Cancellable {
def cancel() {
timeout.cancel()
}
def isCancelled: Boolean = {
timeout.isCancelled
}
}

View file

@ -3,5 +3,4 @@ akka.event-handlers = ["akka.testkit.TestEventListener"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.service-test.router = "round-robin"
akka.actor.deployment.service-test.cluster.preferred-nodes = ["node:node2","node:node3"]
akka.actor.deployment.service-test.nr-of-instances = 2
akka.remote.client.buffering.retry-message-send-on-failure = false
akka.actor.deployment.service-test.nr-of-instances = 2

View file

@ -3,4 +3,3 @@ akka.event-handler-level = "WARNING"
akka.actor.deployment.service-test.router = "round-robin"
akka.actor.deployment.service-test.cluster.preferred-nodes = ["node:node2","node:node3"]
akka.actor.deployment.service-test.nr-of-instances = 2
akka.remote.client.buffering.retry-message-send-on-failure = false

View file

@ -3,4 +3,3 @@ akka.event-handler-level = "WARNING"
akka.actor.deployment.service-test.router = "round-robin"
akka.actor.deployment.service-test.cluster.preferred-nodes = ["node:node2","node:node3"]
akka.actor.deployment.service-test.nr-of-instances = 2
akka.remote.client.buffering.retry-message-send-on-failure = false

View file

@ -128,16 +128,6 @@ akka {
}
client {
buffering {
# Should message buffering on remote client error be used (buffer flushed
# on successful reconnect)
retry-message-send-on-failure = off
# If negative (or zero) then an unbounded mailbox is used (default)
# If positive then a bounded mailbox is used and the capacity is set using
# the property
capacity = -1
}
reconnect-delay = 5s
read-timeout = 3600s
message-frame-size = 1 MiB
@ -147,7 +137,6 @@ akka {
}
cluster {
name = "default-cluster"
nodename = "default"
seed-nodes = []
}

View file

@ -33,7 +33,6 @@ class RemoteActorRefProvider(
def guardian = local.guardian
def systemGuardian = local.systemGuardian
def nodename = remoteSettings.NodeName
def clustername = remoteSettings.ClusterName
def terminationFuture = local.terminationFuture
def dispatcher = local.dispatcher

View file

@ -25,7 +25,6 @@ class RemoteSettings(val config: Config, val systemName: String) {
val BackoffTimeout = Duration(config.getMilliseconds("akka.remote.backoff-timeout"), MILLISECONDS)
// TODO cluster config will go into akka-cluster/reference.conf when we enable that module
val ClusterName = getString("akka.cluster.name")
val SeedNodes = Set.empty[RemoteNettyAddress] ++ getStringList("akka.cluster.seed-nodes").asScala.collect {
case RemoteAddressExtractor(addr) addr.transport
}

View file

@ -34,15 +34,12 @@ class RemoteConfigSpec extends AkkaSpec("akka.cluster.nodename = node1") {
getBytes("akka.remote.server.max-total-memory-size") must equal(0)
//akka.remote.client
getBoolean("akka.remote.client.buffering.retry-message-send-on-failure") must equal(false)
getInt("akka.remote.client.buffering.capacity") must equal(-1)
getMilliseconds("akka.remote.client.reconnect-delay") must equal(5 * 1000)
getMilliseconds("akka.remote.client.read-timeout") must equal(3600 * 1000)
getMilliseconds("akka.remote.client.reconnection-time-window") must equal(600 * 1000)
// TODO cluster config will go into akka-cluster/reference.conf when we enable that module
//akka.cluster
getString("akka.cluster.name") must equal("default-cluster")
getString("akka.cluster.nodename") must equal("node1")
getStringList("akka.cluster.seed-nodes") must equal(new java.util.ArrayList[String])