rename AkkaConfig values to CamelCase

This commit is contained in:
Roland 2011-10-12 11:34:35 +02:00
parent 42e1bba164
commit 36ec202d94
30 changed files with 146 additions and 139 deletions

View file

@ -20,7 +20,7 @@ import scala.Right;
public class JavaFutureTests { public class JavaFutureTests {
private final AkkaApplication app = new AkkaApplication(); private final AkkaApplication app = new AkkaApplication();
private final Timeout t = app.akkaConfig().TIMEOUT(); private final Timeout t = app.akkaConfig().ActorTimeout();
private final FutureFactory ff = new FutureFactory(app.dispatcher(), t); private final FutureFactory ff = new FutureFactory(app.dispatcher(), t);
@Test public void mustBeAbleToMapAFuture() { @Test public void mustBeAbleToMapAFuture() {

View file

@ -16,8 +16,8 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll {
} }
}, timeout = t)) }, timeout = t))
val defaultTimeout = app.AkkaConfig.TIMEOUT.duration val defaultTimeout = app.AkkaConfig.ActorTimeout.duration
val testTimeout = if (app.AkkaConfig.TIMEOUT.duration < 400.millis) 500 millis else 100 millis val testTimeout = if (app.AkkaConfig.ActorTimeout.duration < 400.millis) 500 millis else 100 millis
"An Actor-based Future" must { "An Actor-based Future" must {

View file

@ -489,8 +489,8 @@ abstract class ActorModelSpec extends AkkaSpec {
class DispatcherModelTest extends ActorModelSpec { class DispatcherModelTest extends ActorModelSpec {
def newInterceptedDispatcher = ThreadPoolConfigDispatcherBuilder(config def newInterceptedDispatcher = ThreadPoolConfigDispatcherBuilder(config
new Dispatcher("foo", app.AkkaConfig.DispatcherThroughput, new Dispatcher("foo", app.AkkaConfig.DispatcherThroughput,
app.dispatcherFactory.THROUGHPUT_DEADLINE_TIME_MILLIS, app.dispatcherFactory.MAILBOX_TYPE, app.dispatcherFactory.ThroughputDeadlineTimeMillis, app.dispatcherFactory.MailboxType,
config, app.dispatcherFactory.DISPATCHER_SHUTDOWN_TIMEOUT) with MessageDispatcherInterceptor, config, app.dispatcherFactory.DispatcherShutdownMillis) with MessageDispatcherInterceptor,
ThreadPoolConfig()).build.asInstanceOf[MessageDispatcherInterceptor] ThreadPoolConfig()).build.asInstanceOf[MessageDispatcherInterceptor]
def dispatcherType = "Dispatcher" def dispatcherType = "Dispatcher"
} }
@ -498,8 +498,8 @@ class DispatcherModelTest extends ActorModelSpec {
class BalancingDispatcherModelTest extends ActorModelSpec { class BalancingDispatcherModelTest extends ActorModelSpec {
def newInterceptedDispatcher = ThreadPoolConfigDispatcherBuilder(config def newInterceptedDispatcher = ThreadPoolConfigDispatcherBuilder(config
new BalancingDispatcher("foo", 1, // TODO check why 1 here? (came from old test) new BalancingDispatcher("foo", 1, // TODO check why 1 here? (came from old test)
app.dispatcherFactory.THROUGHPUT_DEADLINE_TIME_MILLIS, app.dispatcherFactory.MAILBOX_TYPE, app.dispatcherFactory.ThroughputDeadlineTimeMillis, app.dispatcherFactory.MailboxType,
config, app.dispatcherFactory.DISPATCHER_SHUTDOWN_TIMEOUT) with MessageDispatcherInterceptor, config, app.dispatcherFactory.DispatcherShutdownMillis) with MessageDispatcherInterceptor,
ThreadPoolConfig()).build.asInstanceOf[MessageDispatcherInterceptor] ThreadPoolConfig()).build.asInstanceOf[MessageDispatcherInterceptor]
def dispatcherType = "Balancing Dispatcher" def dispatcherType = "Balancing Dispatcher"

View file

@ -69,7 +69,7 @@ class DispatcherActorSpec extends AkkaSpec {
"respect the throughput setting" in { "respect the throughput setting" in {
val throughputDispatcher = app.dispatcherFactory. val throughputDispatcher = app.dispatcherFactory.
newDispatcher("THROUGHPUT", 101, 0, app.dispatcherFactory.MAILBOX_TYPE). newDispatcher("THROUGHPUT", 101, 0, app.dispatcherFactory.MailboxType).
setCorePoolSize(1). setCorePoolSize(1).
build build
@ -98,7 +98,7 @@ class DispatcherActorSpec extends AkkaSpec {
"respect throughput deadline" in { "respect throughput deadline" in {
val deadlineMs = 100 val deadlineMs = 100
val throughputDispatcher = app.dispatcherFactory. val throughputDispatcher = app.dispatcherFactory.
newDispatcher("THROUGHPUT", 2, deadlineMs, app.dispatcherFactory.MAILBOX_TYPE). newDispatcher("THROUGHPUT", 2, deadlineMs, app.dispatcherFactory.MailboxType).
setCorePoolSize(1). setCorePoolSize(1).
build build
val works = new AtomicBoolean(true) val works = new AtomicBoolean(true)

View file

@ -186,7 +186,7 @@ class Report(app: AkkaApplication,
sb.append("Args:\n ").append(args) sb.append("Args:\n ").append(args)
sb.append("\n") sb.append("\n")
sb.append("Akka version: ").append(app.AkkaConfig.CONFIG_VERSION) sb.append("Akka version: ").append(app.AkkaConfig.ConfigVersion)
sb.append("\n") sb.append("\n")
sb.append("Akka config:") sb.append("Akka config:")
for (key app.config.keys) { for (key app.config.keys) {

View file

@ -19,7 +19,7 @@ import akka.serialization.Serialization
object AkkaApplication { object AkkaApplication {
val VERSION = "2.0-SNAPSHOT" val Version = "2.0-SNAPSHOT"
val envHome = System.getenv("AKKA_HOME") match { val envHome = System.getenv("AKKA_HOME") match {
case null | "" | "." None case null | "" | "." None
@ -31,7 +31,7 @@ object AkkaApplication {
case value Some(value) case value Some(value)
} }
val GLOBAL_HOME = systemHome orElse envHome val GlobalHome = systemHome orElse envHome
val envConf = System.getenv("AKKA_MODE") match { val envConf = System.getenv("AKKA_MODE") match {
case null | "" None case null | "" None
@ -54,13 +54,15 @@ object AkkaApplication {
} catch { case _ None } } catch { case _ None }
val fromHome = try { val fromHome = try {
Some(Configuration.fromFile(GLOBAL_HOME.get + "/config/" + defaultLocation)) Some(Configuration.fromFile(GlobalHome.get + "/config/" + defaultLocation))
} catch { case _ None } } catch { case _ None }
val emptyConfig = Configuration.fromString("akka { version = \"" + VERSION + "\" }") val emptyConfig = Configuration.fromString("akka { version = \"" + Version + "\" }")
val defaultConfig = fromProperties orElse fromClasspath orElse fromHome getOrElse emptyConfig val defaultConfig = fromProperties orElse fromClasspath orElse fromHome getOrElse emptyConfig
def apply(name: String, config: Configuration) = new AkkaApplication(name, config)
def apply(name: String): AkkaApplication = new AkkaApplication(name) def apply(name: String): AkkaApplication = new AkkaApplication(name)
def apply(): AkkaApplication = new AkkaApplication() def apply(): AkkaApplication = new AkkaApplication()
@ -76,36 +78,36 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor
object AkkaConfig { object AkkaConfig {
import config._ import config._
val CONFIG_VERSION = getString("akka.version", VERSION) val ConfigVersion = getString("akka.version", Version)
val TIME_UNIT = getString("akka.time-unit", "seconds") val DefaultTimeUnit = getString("akka.time-unit", "seconds")
val TIMEOUT = Timeout(Duration(getInt("akka.actor.timeout", 5), TIME_UNIT)) val ActorTimeout = Timeout(Duration(getInt("akka.actor.timeout", 5), DefaultTimeUnit))
val TimeoutMillis = TIMEOUT.duration.toMillis val ActorTimeoutMillis = ActorTimeout.duration.toMillis
val SERIALIZE_MESSAGES = getBool("akka.actor.serialize-messages", false) val SerializeAllMessages = getBool("akka.actor.serialize-messages", false)
val LogLevel = getString("akka.event-handler-level", "INFO") val LogLevel = getString("akka.event-handler-level", "INFO")
val EventHandlers = getList("akka.event-handlers") val EventHandlers = getList("akka.event-handlers")
val ADD_LOGGING_RECEIVE = getBool("akka.actor.debug.receive", false) val AddLoggingReceive = getBool("akka.actor.debug.receive", false)
val DEBUG_AUTO_RECEIVE = getBool("akka.actor.debug.autoreceive", false) val DebugAutoReceive = getBool("akka.actor.debug.autoreceive", false)
val DEBUG_LIFECYCLE = getBool("akka.actor.debug.lifecycle", false) val DebugLifecycle = getBool("akka.actor.debug.lifecycle", false)
val FsmDebugEvent = getBool("akka.actor.debug.fsm", false) val FsmDebugEvent = getBool("akka.actor.debug.fsm", false)
val DispatcherThroughput = getInt("akka.actor.throughput", 5) val DispatcherThroughput = getInt("akka.actor.throughput", 5)
val DispatcherDefaultShutdown = getLong("akka.actor.dispatcher-shutdown-timeout"). val DispatcherDefaultShutdown = getLong("akka.actor.dispatcher-shutdown-timeout").
map(time Duration(time, TIME_UNIT)). map(time Duration(time, DefaultTimeUnit)).
getOrElse(Duration(1000, TimeUnit.MILLISECONDS)) getOrElse(Duration(1000, TimeUnit.MILLISECONDS))
val MailboxCapacity = getInt("akka.actor.default-dispatcher.mailbox-capacity", -1) val MailboxCapacity = getInt("akka.actor.default-dispatcher.mailbox-capacity", -1)
val MailboxPushTimeout = Duration(getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), TIME_UNIT) val MailboxPushTimeout = Duration(getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time", 10), DefaultTimeUnit)
val ThroughputDeadlineTime = Duration(getInt("akka.actor.throughput-deadline-time", -1), TIME_UNIT) val DispatcherThroughputDeadlineTime = Duration(getInt("akka.actor.throughput-deadline-time", -1), DefaultTimeUnit)
val HOME = getString("akka.home") val Home = getString("akka.home")
val BOOT_CLASSES = getList("akka.boot") val BootClasses = getList("akka.boot")
val ENABLED_MODULES = getList("akka.enabled-modules") val EnabledModules = getList("akka.enabled-modules")
val CLUSTER_ENABLED = ENABLED_MODULES exists (_ == "cluster") val ClusterEnabled = EnabledModules exists (_ == "cluster")
val ClusterName = getString("akka.cluster.name", "default") val ClusterName = getString("akka.cluster.name", "default")
val REMOTE_TRANSPORT = getString("akka.remote.layer", "akka.remote.netty.NettyRemoteSupport") val RemoteTransport = getString("akka.remote.layer", "akka.remote.netty.NettyRemoteSupport")
val REMOTE_SERVER_PORT = getInt("akka.remote.server.port", 2552) val RemoteServerPort = getInt("akka.remote.server.port", 2552)
} }
// Java API // Java API
@ -138,9 +140,9 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor
case value value case value value
} }
if (CONFIG_VERSION != VERSION) if (ConfigVersion != Version)
throw new ConfigurationException("Akka JAR version [" + VERSION + throw new ConfigurationException("Akka JAR version [" + Version +
"] does not match the provided config version [" + CONFIG_VERSION + "]") "] does not match the provided config version [" + ConfigVersion + "]")
// TODO correctly pull its config from the config // TODO correctly pull its config from the config
val dispatcherFactory = new Dispatchers(this) val dispatcherFactory = new Dispatchers(this)

View file

@ -139,7 +139,7 @@ object Timeout {
implicit def durationToTimeout(duration: Duration) = new Timeout(duration) implicit def durationToTimeout(duration: Duration) = new Timeout(duration)
implicit def intToTimeout(timeout: Int) = new Timeout(timeout) implicit def intToTimeout(timeout: Int) = new Timeout(timeout)
implicit def longToTimeout(timeout: Long) = new Timeout(timeout) implicit def longToTimeout(timeout: Long) = new Timeout(timeout)
implicit def defaultTimeout(implicit application: AkkaApplication) = application.AkkaConfig.TIMEOUT implicit def defaultTimeout(implicit application: AkkaApplication) = application.AkkaConfig.ActorTimeout
} }
object Actor { object Actor {
@ -218,7 +218,7 @@ trait Actor {
/** /**
* The default timeout, based on the config setting 'akka.actor.timeout' * The default timeout, based on the config setting 'akka.actor.timeout'
*/ */
implicit val defaultTimeout = config.TIMEOUT implicit val defaultTimeout = config.ActorTimeout
/** /**
* Wrap a Receive partial function in a logging enclosure, which sends a * Wrap a Receive partial function in a logging enclosure, which sends a
@ -234,7 +234,7 @@ trait Actor {
* This method does NOT modify the given Receive unless * This method does NOT modify the given Receive unless
* akka.actor.debug.receive is set within akka.conf. * akka.actor.debug.receive is set within akka.conf.
*/ */
def loggable(self: AnyRef)(r: Receive): Receive = if (config.ADD_LOGGING_RECEIVE) LoggingReceive(self, r) else r def loggable(self: AnyRef)(r: Receive): Receive = if (config.AddLoggingReceive) LoggingReceive(self, r) else r
/** /**
* Some[ActorRef] representation of the 'self' ActorRef reference. * Some[ActorRef] representation of the 'self' ActorRef reference.
@ -423,7 +423,7 @@ trait Actor {
throw new InvalidMessageException("Message from [" + channel + "] to [" + self.toString + "] is null") throw new InvalidMessageException("Message from [" + channel + "] to [" + self.toString + "] is null")
def autoReceiveMessage(msg: AutoReceivedMessage) { def autoReceiveMessage(msg: AutoReceivedMessage) {
if (config.DEBUG_AUTO_RECEIVE) EventHandler.debug(this, "received AutoReceiveMessage " + msg) if (config.DebugAutoReceive) EventHandler.debug(this, "received AutoReceiveMessage " + msg)
msg match { msg match {
case HotSwap(code, discardOld) become(code(self), discardOld) case HotSwap(code, discardOld) become(code(self), discardOld)

View file

@ -333,7 +333,7 @@ private[akka] class ActorCell(
actor = created actor = created
created.preStart() created.preStart()
checkReceiveTimeout checkReceiveTimeout
if (application.AkkaConfig.DEBUG_LIFECYCLE) EventHandler.debug(created, "started") if (application.AkkaConfig.DebugLifecycle) EventHandler.debug(created, "started")
} catch { } catch {
case e try { case e try {
EventHandler.error(e, this, "error while creating actor") EventHandler.error(e, this, "error while creating actor")
@ -347,7 +347,7 @@ private[akka] class ActorCell(
def recreate(cause: Throwable): Unit = try { def recreate(cause: Throwable): Unit = try {
val failedActor = actor val failedActor = actor
if (application.AkkaConfig.DEBUG_LIFECYCLE) EventHandler.debug(failedActor, "restarting") if (application.AkkaConfig.DebugLifecycle) EventHandler.debug(failedActor, "restarting")
val freshActor = newActor() val freshActor = newActor()
if (failedActor ne null) { if (failedActor ne null) {
val c = currentMessage //One read only plz val c = currentMessage //One read only plz
@ -361,7 +361,7 @@ private[akka] class ActorCell(
} }
actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call
freshActor.postRestart(cause) freshActor.postRestart(cause)
if (application.AkkaConfig.DEBUG_LIFECYCLE) EventHandler.debug(freshActor, "restarted") if (application.AkkaConfig.DebugLifecycle) EventHandler.debug(freshActor, "restarted")
dispatcher.resume(this) //FIXME should this be moved down? dispatcher.resume(this) //FIXME should this be moved down?
@ -390,7 +390,7 @@ private[akka] class ActorCell(
try { try {
val a = actor val a = actor
if (application.AkkaConfig.DEBUG_LIFECYCLE) EventHandler.debug(a, "stopping") if (application.AkkaConfig.DebugLifecycle) EventHandler.debug(a, "stopping")
if (a ne null) a.postStop() if (a ne null) a.postStop()
//Stop supervised actors //Stop supervised actors
@ -416,7 +416,7 @@ private[akka] class ActorCell(
val links = _children val links = _children
if (!links.exists(_.child == child)) { if (!links.exists(_.child == child)) {
_children = links :+ ChildRestartStats(child) _children = links :+ ChildRestartStats(child)
if (application.AkkaConfig.DEBUG_LIFECYCLE) EventHandler.debug(actor, "now supervising " + child) if (application.AkkaConfig.DebugLifecycle) EventHandler.debug(actor, "now supervising " + child)
} else EventHandler.warning(actor, "Already supervising " + child) } else EventHandler.warning(actor, "Already supervising " + child)
} }
@ -428,10 +428,10 @@ private[akka] class ActorCell(
case Recreate(cause) recreate(cause) case Recreate(cause) recreate(cause)
case Link(subject) case Link(subject)
akka.event.InVMMonitoring.link(self, subject) akka.event.InVMMonitoring.link(self, subject)
if (application.AkkaConfig.DEBUG_LIFECYCLE) EventHandler.debug(actor, "now monitoring " + subject) if (application.AkkaConfig.DebugLifecycle) EventHandler.debug(actor, "now monitoring " + subject)
case Unlink(subject) case Unlink(subject)
akka.event.InVMMonitoring.unlink(self, subject) akka.event.InVMMonitoring.unlink(self, subject)
if (application.AkkaConfig.DEBUG_LIFECYCLE) EventHandler.debug(actor, "stopped monitoring " + subject) if (application.AkkaConfig.DebugLifecycle) EventHandler.debug(actor, "stopped monitoring " + subject)
case Suspend suspend() case Suspend suspend()
case Resume resume() case Resume resume()
case Terminate terminate() case Terminate terminate()

View file

@ -17,12 +17,12 @@ trait BootableActorLoaderService extends Bootable {
def app: AkkaApplication def app: AkkaApplication
val BOOT_CLASSES = app.AkkaConfig.BOOT_CLASSES val BOOT_CLASSES = app.AkkaConfig.BootClasses
lazy val applicationLoader = createApplicationClassLoader() lazy val applicationLoader = createApplicationClassLoader()
protected def createApplicationClassLoader(): Option[ClassLoader] = Some({ protected def createApplicationClassLoader(): Option[ClassLoader] = Some({
if (app.AkkaConfig.HOME.isDefined) { if (app.AkkaConfig.Home.isDefined) {
val DEPLOY = app.AkkaConfig.HOME.get + "/deploy" val DEPLOY = app.AkkaConfig.Home.get + "/deploy"
val DEPLOY_DIR = new File(DEPLOY) val DEPLOY_DIR = new File(DEPLOY)
if (!DEPLOY_DIR.exists) { if (!DEPLOY_DIR.exists) {
System.exit(-1) System.exit(-1)

View file

@ -222,7 +222,7 @@ class TypedActor(val application: AkkaApplication) {
//Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling //Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling
val actorVar = new AtomVar[ActorRef](null) val actorVar = new AtomVar[ActorRef](null)
val timeout = props.timeout match { val timeout = props.timeout match {
case Timeout(Duration.MinusInf) application.AkkaConfig.TIMEOUT case Timeout(Duration.MinusInf) application.AkkaConfig.ActorTimeout
case x x case x x
} }
val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actorVar)(timeout)).asInstanceOf[T] val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actorVar)(timeout)).asInstanceOf[T]

View file

@ -322,7 +322,7 @@ abstract class MessageDispatcherConfigurator(val application: AkkaApplication) {
else { else {
val duration = Duration( val duration = Duration(
config.getInt("mailbox-push-timeout-time", application.AkkaConfig.MailboxPushTimeout.toMillis.toInt), config.getInt("mailbox-push-timeout-time", application.AkkaConfig.MailboxPushTimeout.toMillis.toInt),
application.AkkaConfig.TIME_UNIT) application.AkkaConfig.DefaultTimeUnit)
BoundedMailbox(capacity, duration) BoundedMailbox(capacity, duration)
} }
} }
@ -332,7 +332,7 @@ abstract class MessageDispatcherConfigurator(val application: AkkaApplication) {
//Apply the following options to the config if they are present in the config //Apply the following options to the config if they are present in the config
ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig()).configure( ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig()).configure(
conf_?(config getInt "keep-alive-time")(time _.setKeepAliveTime(Duration(time, application.AkkaConfig.TIME_UNIT))), conf_?(config getInt "keep-alive-time")(time _.setKeepAliveTime(Duration(time, application.AkkaConfig.DefaultTimeUnit))),
conf_?(config getDouble "core-pool-size-factor")(factor _.setCorePoolSizeFromFactor(factor)), conf_?(config getDouble "core-pool-size-factor")(factor _.setCorePoolSizeFromFactor(factor)),
conf_?(config getDouble "max-pool-size-factor")(factor _.setMaxPoolSizeFromFactor(factor)), conf_?(config getDouble "max-pool-size-factor")(factor _.setMaxPoolSizeFromFactor(factor)),
conf_?(config getInt "executor-bounds")(bounds _.setExecutorBounds(bounds)), conf_?(config getInt "executor-bounds")(bounds _.setExecutorBounds(bounds)),

View file

@ -44,14 +44,14 @@ import akka.AkkaApplication
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class Dispatchers(val application: AkkaApplication) { class Dispatchers(val application: AkkaApplication) {
val THROUGHPUT_DEADLINE_TIME_MILLIS = application.AkkaConfig.ThroughputDeadlineTime.toMillis.toInt val ThroughputDeadlineTimeMillis = application.AkkaConfig.DispatcherThroughputDeadlineTime.toMillis.toInt
val MAILBOX_TYPE: MailboxType = val MailboxType: MailboxType =
if (application.AkkaConfig.MailboxCapacity < 1) UnboundedMailbox() if (application.AkkaConfig.MailboxCapacity < 1) UnboundedMailbox()
else BoundedMailbox(application.AkkaConfig.MailboxCapacity, application.AkkaConfig.MailboxPushTimeout) else BoundedMailbox(application.AkkaConfig.MailboxCapacity, application.AkkaConfig.MailboxPushTimeout)
val DISPATCHER_SHUTDOWN_TIMEOUT = application.AkkaConfig.DispatcherDefaultShutdown.toMillis val DispatcherShutdownMillis = application.AkkaConfig.DispatcherDefaultShutdown.toMillis
lazy val defaultGlobalDispatcher = lazy val defaultGlobalDispatcher =
application.config.getSection("akka.actor.default-dispatcher").flatMap(from) getOrElse newDispatcher("AkkaDefaultGlobalDispatcher", 1, MAILBOX_TYPE).build application.config.getSection("akka.actor.default-dispatcher").flatMap(from) getOrElse newDispatcher("AkkaDefaultGlobalDispatcher", 1, MailboxType).build
/** /**
* Creates an thread based dispatcher serving a single actor through the same single thread. * Creates an thread based dispatcher serving a single actor through the same single thread.
@ -60,8 +60,8 @@ class Dispatchers(val application: AkkaApplication) {
* E.g. each actor consumes its own thread. * E.g. each actor consumes its own thread.
*/ */
def newPinnedDispatcher(actor: LocalActorRef) = actor match { def newPinnedDispatcher(actor: LocalActorRef) = actor match {
case null new PinnedDispatcher(null, "anon", MAILBOX_TYPE, DISPATCHER_SHUTDOWN_TIMEOUT) case null new PinnedDispatcher(null, "anon", MailboxType, DispatcherShutdownMillis)
case some new PinnedDispatcher(some.underlying, some.underlying.uuid.toString, MAILBOX_TYPE, DISPATCHER_SHUTDOWN_TIMEOUT) case some new PinnedDispatcher(some.underlying, some.underlying.uuid.toString, MailboxType, DispatcherShutdownMillis)
} }
/** /**
@ -71,8 +71,8 @@ class Dispatchers(val application: AkkaApplication) {
* E.g. each actor consumes its own thread. * E.g. each actor consumes its own thread.
*/ */
def newPinnedDispatcher(actor: LocalActorRef, mailboxType: MailboxType) = actor match { def newPinnedDispatcher(actor: LocalActorRef, mailboxType: MailboxType) = actor match {
case null new PinnedDispatcher(null, "anon", mailboxType, DISPATCHER_SHUTDOWN_TIMEOUT) case null new PinnedDispatcher(null, "anon", mailboxType, DispatcherShutdownMillis)
case some new PinnedDispatcher(some.underlying, some.underlying.uuid.toString, mailboxType, DISPATCHER_SHUTDOWN_TIMEOUT) case some new PinnedDispatcher(some.underlying, some.underlying.uuid.toString, mailboxType, DispatcherShutdownMillis)
} }
/** /**
@ -81,7 +81,7 @@ class Dispatchers(val application: AkkaApplication) {
* E.g. each actor consumes its own thread. * E.g. each actor consumes its own thread.
*/ */
def newPinnedDispatcher(name: String, mailboxType: MailboxType) = def newPinnedDispatcher(name: String, mailboxType: MailboxType) =
new PinnedDispatcher(null, name, mailboxType, DISPATCHER_SHUTDOWN_TIMEOUT) new PinnedDispatcher(null, name, mailboxType, DispatcherShutdownMillis)
/** /**
* Creates an thread based dispatcher serving a single actor through the same single thread. * Creates an thread based dispatcher serving a single actor through the same single thread.
@ -89,7 +89,7 @@ class Dispatchers(val application: AkkaApplication) {
* E.g. each actor consumes its own thread. * E.g. each actor consumes its own thread.
*/ */
def newPinnedDispatcher(name: String) = def newPinnedDispatcher(name: String) =
new PinnedDispatcher(null, name, MAILBOX_TYPE, DISPATCHER_SHUTDOWN_TIMEOUT) new PinnedDispatcher(null, name, MailboxType, DispatcherShutdownMillis)
/** /**
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
@ -98,7 +98,7 @@ class Dispatchers(val application: AkkaApplication) {
*/ */
def newDispatcher(name: String) = def newDispatcher(name: String) =
ThreadPoolConfigDispatcherBuilder(config new Dispatcher(name, application.AkkaConfig.DispatcherThroughput, ThreadPoolConfigDispatcherBuilder(config new Dispatcher(name, application.AkkaConfig.DispatcherThroughput,
THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE, config, DISPATCHER_SHUTDOWN_TIMEOUT), ThreadPoolConfig()) ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig())
/** /**
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
@ -107,7 +107,7 @@ class Dispatchers(val application: AkkaApplication) {
*/ */
def newDispatcher(name: String, throughput: Int, mailboxType: MailboxType) = def newDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
ThreadPoolConfigDispatcherBuilder(config ThreadPoolConfigDispatcherBuilder(config
new Dispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config, DISPATCHER_SHUTDOWN_TIMEOUT), ThreadPoolConfig()) new Dispatcher(name, throughput, ThroughputDeadlineTimeMillis, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig())
/** /**
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
@ -116,7 +116,7 @@ class Dispatchers(val application: AkkaApplication) {
*/ */
def newDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) = def newDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) =
ThreadPoolConfigDispatcherBuilder(config ThreadPoolConfigDispatcherBuilder(config
new Dispatcher(name, throughput, throughputDeadlineMs, mailboxType, config, DISPATCHER_SHUTDOWN_TIMEOUT), ThreadPoolConfig()) new Dispatcher(name, throughput, throughputDeadlineMs, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig())
/** /**
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
@ -125,7 +125,7 @@ class Dispatchers(val application: AkkaApplication) {
*/ */
def newBalancingDispatcher(name: String) = def newBalancingDispatcher(name: String) =
ThreadPoolConfigDispatcherBuilder(config new BalancingDispatcher(name, application.AkkaConfig.DispatcherThroughput, ThreadPoolConfigDispatcherBuilder(config new BalancingDispatcher(name, application.AkkaConfig.DispatcherThroughput,
THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE, config, DISPATCHER_SHUTDOWN_TIMEOUT), ThreadPoolConfig()) ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig())
/** /**
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
@ -134,7 +134,7 @@ class Dispatchers(val application: AkkaApplication) {
*/ */
def newBalancingDispatcher(name: String, throughput: Int) = def newBalancingDispatcher(name: String, throughput: Int) =
ThreadPoolConfigDispatcherBuilder(config ThreadPoolConfigDispatcherBuilder(config
new BalancingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE, config, DISPATCHER_SHUTDOWN_TIMEOUT), ThreadPoolConfig()) new BalancingDispatcher(name, throughput, ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig())
/** /**
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
@ -143,7 +143,7 @@ class Dispatchers(val application: AkkaApplication) {
*/ */
def newBalancingDispatcher(name: String, throughput: Int, mailboxType: MailboxType) = def newBalancingDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
ThreadPoolConfigDispatcherBuilder(config ThreadPoolConfigDispatcherBuilder(config
new BalancingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config, DISPATCHER_SHUTDOWN_TIMEOUT), ThreadPoolConfig()) new BalancingDispatcher(name, throughput, ThroughputDeadlineTimeMillis, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig())
/** /**
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
@ -152,7 +152,7 @@ class Dispatchers(val application: AkkaApplication) {
*/ */
def newBalancingDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) = def newBalancingDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) =
ThreadPoolConfigDispatcherBuilder(config ThreadPoolConfigDispatcherBuilder(config
new BalancingDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config, DISPATCHER_SHUTDOWN_TIMEOUT), ThreadPoolConfig()) new BalancingDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig())
/** /**
* Utility function that tries to load the specified dispatcher config from the akka.conf * Utility function that tries to load the specified dispatcher config from the akka.conf
* or else use the supplied default dispatcher * or else use the supplied default dispatcher
@ -211,7 +211,7 @@ class DispatcherConfigurator(application: AkkaApplication) extends MessageDispat
configureThreadPool(config, threadPoolConfig new Dispatcher( configureThreadPool(config, threadPoolConfig new Dispatcher(
config.getString("name", newUuid.toString), config.getString("name", newUuid.toString),
config.getInt("throughput", application.AkkaConfig.DispatcherThroughput), config.getInt("throughput", application.AkkaConfig.DispatcherThroughput),
config.getInt("throughput-deadline-time", application.AkkaConfig.ThroughputDeadlineTime.toMillis.toInt), config.getInt("throughput-deadline-time", application.AkkaConfig.DispatcherThroughputDeadlineTime.toMillis.toInt),
mailboxType(config), mailboxType(config),
threadPoolConfig, threadPoolConfig,
application.AkkaConfig.DispatcherDefaultShutdown.toMillis)).build application.AkkaConfig.DispatcherDefaultShutdown.toMillis)).build
@ -223,7 +223,7 @@ class BalancingDispatcherConfigurator(application: AkkaApplication) extends Mess
configureThreadPool(config, threadPoolConfig new BalancingDispatcher( configureThreadPool(config, threadPoolConfig new BalancingDispatcher(
config.getString("name", newUuid.toString), config.getString("name", newUuid.toString),
config.getInt("throughput", application.AkkaConfig.DispatcherThroughput), config.getInt("throughput", application.AkkaConfig.DispatcherThroughput),
config.getInt("throughput-deadline-time", application.AkkaConfig.ThroughputDeadlineTime.toMillis.toInt), config.getInt("throughput-deadline-time", application.AkkaConfig.DispatcherThroughputDeadlineTime.toMillis.toInt),
mailboxType(config), mailboxType(config),
threadPoolConfig, threadPoolConfig,
application.AkkaConfig.DispatcherDefaultShutdown.toMillis)).build application.AkkaConfig.DispatcherDefaultShutdown.toMillis)).build

View file

@ -333,10 +333,10 @@ trait RemoteServerModule extends RemoteModule { this: RemoteSupport ⇒
trait RemoteClientModule extends RemoteModule { self: RemoteSupport trait RemoteClientModule extends RemoteModule { self: RemoteSupport
def actorFor(address: String, hostname: String, port: Int): ActorRef = def actorFor(address: String, hostname: String, port: Int): ActorRef =
actorFor(address, app.AkkaConfig.TimeoutMillis, hostname, port, None) actorFor(address, app.AkkaConfig.ActorTimeoutMillis, hostname, port, None)
def actorFor(address: String, hostname: String, port: Int, loader: ClassLoader): ActorRef = def actorFor(address: String, hostname: String, port: Int, loader: ClassLoader): ActorRef =
actorFor(address, app.AkkaConfig.TimeoutMillis, hostname, port, Some(loader)) actorFor(address, app.AkkaConfig.ActorTimeoutMillis, hostname, port, Some(loader))
def actorFor(address: String, timeout: Long, hostname: String, port: Int): ActorRef = def actorFor(address: String, timeout: Long, hostname: String, port: Int): ActorRef =
actorFor(address, timeout, hostname, port, None) actorFor(address, timeout, hostname, port, None)

View file

@ -86,6 +86,6 @@ class AkkaLoader(application: AkkaApplication) {
============================================================================== ==============================================================================
Running version %s Running version %s
============================================================================== ==============================================================================
""".format(AkkaApplication.VERSION)) """.format(AkkaApplication.Version))
} }
} }

View file

@ -133,7 +133,7 @@ class ReflectiveAccess(val application: AkkaApplication) {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object ClusterModule { object ClusterModule {
lazy val isEnabled = application.AkkaConfig.CLUSTER_ENABLED //&& clusterInstance.isDefined lazy val isEnabled = application.AkkaConfig.ClusterEnabled //&& clusterInstance.isDefined
lazy val clusterRefClass: Class[_] = getClassFor("akka.cluster.ClusterActorRef") match { lazy val clusterRefClass: Class[_] = getClassFor("akka.cluster.ClusterActorRef") match {
case Left(e) throw e case Left(e) throw e
@ -239,9 +239,9 @@ class ReflectiveAccess(val application: AkkaApplication) {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object RemoteModule { object RemoteModule {
val TRANSPORT = application.AkkaConfig.REMOTE_TRANSPORT val TRANSPORT = application.AkkaConfig.RemoteTransport
val configDefaultAddress = new InetSocketAddress(application.hostname, application.AkkaConfig.REMOTE_SERVER_PORT) val configDefaultAddress = new InetSocketAddress(application.hostname, application.AkkaConfig.RemoteServerPort)
lazy val isEnabled = remoteSupportClass.isDefined lazy val isEnabled = remoteSupportClass.isDefined

View file

@ -15,7 +15,7 @@ import akka.event.EventHandler
*/ */
trait BootableRemoteActorService extends Bootable { trait BootableRemoteActorService extends Bootable {
self: BootableActorLoaderService self: BootableActorLoaderService
def settings: RemoteServerSettings def settings: RemoteServerSettings
protected lazy val remoteServerThread = new Thread(new Runnable() { protected lazy val remoteServerThread = new Thread(new Runnable() {

View file

@ -36,9 +36,9 @@ object NetworkEventStream {
} }
class NetworkEventStream(val app: AkkaApplication) { class NetworkEventStream(val app: AkkaApplication) {
import NetworkEventStream._ import NetworkEventStream._
/** /**
* Channel actor with a registry of listeners. * Channel actor with a registry of listeners.
*/ */

View file

@ -30,7 +30,7 @@ class RemoteActorRefProvider(val app: AkkaApplication, val remote: Remote) exten
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.Promise import akka.dispatch.Promise
implicit def _app = app implicit def _app = app
private val actors = new ConcurrentHashMap[String, Promise[Option[ActorRef]]] private val actors = new ConcurrentHashMap[String, Promise[Option[ActorRef]]]

View file

@ -9,27 +9,27 @@ import akka.config.ConfigurationException
import akka.AkkaApplication import akka.AkkaApplication
class RemoteClientSettings(val app: AkkaApplication) { class RemoteClientSettings(val app: AkkaApplication) {
import app.config import app.config
import app.AkkaConfig.TIME_UNIT import app.AkkaConfig.DefaultTimeUnit
val SECURE_COOKIE: Option[String] = config.getString("akka.remote.secure-cookie", "") match { val SECURE_COOKIE: Option[String] = config.getString("akka.remote.secure-cookie", "") match {
case "" None case "" None
case cookie Some(cookie) case cookie Some(cookie)
} }
val RECONNECTION_TIME_WINDOW = Duration(config.getInt("akka.remote.client.reconnection-time-window", 600), TIME_UNIT).toMillis val RECONNECTION_TIME_WINDOW = Duration(config.getInt("akka.remote.client.reconnection-time-window", 600), DefaultTimeUnit).toMillis
val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 3600), TIME_UNIT) val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 3600), DefaultTimeUnit)
val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), TIME_UNIT) val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), DefaultTimeUnit)
val REAP_FUTURES_DELAY = Duration(config.getInt("akka.remote.client.reap-futures-delay", 5), TIME_UNIT) val REAP_FUTURES_DELAY = Duration(config.getInt("akka.remote.client.reap-futures-delay", 5), DefaultTimeUnit)
val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.client.message-frame-size", 1048576) val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.client.message-frame-size", 1048576)
} }
class RemoteServerSettings(val app: AkkaApplication) { class RemoteServerSettings(val app: AkkaApplication) {
import app.config import app.config
import app.AkkaConfig.TIME_UNIT import app.AkkaConfig.DefaultTimeUnit
val isRemotingEnabled = config.getList("akka.enabled-modules").exists(_ == "cluster") val isRemotingEnabled = config.getList("akka.enabled-modules").exists(_ == "cluster")
val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.server.message-frame-size", 1048576) val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.server.message-frame-size", 1048576)
val SECURE_COOKIE = config.getString("akka.remote.secure-cookie") val SECURE_COOKIE = config.getString("akka.remote.secure-cookie")
@ -42,11 +42,11 @@ class RemoteServerSettings(val app: AkkaApplication) {
val UNTRUSTED_MODE = config.getBool("akka.remote.server.untrusted-mode", false) val UNTRUSTED_MODE = config.getBool("akka.remote.server.untrusted-mode", false)
val PORT = config.getInt("akka.remote.server.port", 2552) val PORT = config.getInt("akka.remote.server.port", 2552)
val CONNECTION_TIMEOUT = Duration(config.getInt("akka.remote.server.connection-timeout", 100), TIME_UNIT) val CONNECTION_TIMEOUT = Duration(config.getInt("akka.remote.server.connection-timeout", 100), DefaultTimeUnit)
val BACKLOG = config.getInt("akka.remote.server.backlog", 4096) val BACKLOG = config.getInt("akka.remote.server.backlog", 4096)
val EXECUTION_POOL_KEEPALIVE = Duration(config.getInt("akka.remote.server.execution-pool-keepalive", 60), TIME_UNIT) val EXECUTION_POOL_KEEPALIVE = Duration(config.getInt("akka.remote.server.execution-pool-keepalive", 60), DefaultTimeUnit)
val EXECUTION_POOL_SIZE = { val EXECUTION_POOL_SIZE = {
val sz = config.getInt("akka.remote.server.execution-pool-size", 16) val sz = config.getInt("akka.remote.server.execution-pool-size", 16)

View file

@ -28,13 +28,13 @@ import com.eaio.uuid.UUID
class Remote(val app: AkkaApplication) extends RemoteService { class Remote(val app: AkkaApplication) extends RemoteService {
import app.config import app.config
import app.AkkaConfig.TIME_UNIT import app.AkkaConfig.DefaultTimeUnit
val shouldCompressData = config.getBool("akka.remote.use-compression", false) val shouldCompressData = config.getBool("akka.remote.use-compression", false)
val remoteDaemonAckTimeout = Duration(config.getInt("akka.remote.remote-daemon-ack-timeout", 30), TIME_UNIT).toMillis.toInt val remoteDaemonAckTimeout = Duration(config.getInt("akka.remote.remote-daemon-ack-timeout", 30), DefaultTimeUnit).toMillis.toInt
val hostname = app.hostname val hostname = app.hostname
val port = app.AkkaConfig.REMOTE_SERVER_PORT val port = app.AkkaConfig.RemoteServerPort
val remoteDaemonServiceName = "akka-remote-daemon".intern val remoteDaemonServiceName = "akka-remote-daemon".intern
@ -58,7 +58,7 @@ class Remote(val app: AkkaApplication) extends RemoteService {
case _ //ignore other case _ //ignore other
} }
}), "akka.cluster.RemoteClientLifeCycleListener") }), "akka.cluster.RemoteClientLifeCycleListener")
lazy val eventStream = new NetworkEventStream(app) lazy val eventStream = new NetworkEventStream(app)
lazy val server: RemoteSupport = { lazy val server: RemoteSupport = {
@ -96,9 +96,9 @@ class Remote(val app: AkkaApplication) extends RemoteService {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class RemoteDaemon(val remote: Remote) extends Actor { class RemoteDaemon(val remote: Remote) extends Actor {
import remote._ import remote._
override def preRestart(reason: Throwable, msg: Option[Any]) { override def preRestart(reason: Throwable, msg: Option[Any]) {
EventHandler.debug(this, "RemoteDaemon failed due to [%s] restarting...".format(reason)) EventHandler.debug(this, "RemoteDaemon failed due to [%s] restarting...".format(reason))
} }

View file

@ -176,7 +176,7 @@ abstract class RemoteFailureDetectorBase(remote: Remote, initialConnections: Map
* Simple failure detector that removes the failing connection permanently on first error. * Simple failure detector that removes the failing connection permanently on first error.
*/ */
class RemoveConnectionOnFirstFailureRemoteFailureDetector(_remote: Remote, class RemoveConnectionOnFirstFailureRemoteFailureDetector(_remote: Remote,
initialConnections: Map[InetSocketAddress, ActorRef] = Map.empty[InetSocketAddress, ActorRef]) initialConnections: Map[InetSocketAddress, ActorRef] = Map.empty[InetSocketAddress, ActorRef])
extends RemoteFailureDetectorBase(_remote, initialConnections) { extends RemoteFailureDetectorBase(_remote, initialConnections) {
protected def newState() = State(Long.MinValue, initialConnections) protected def newState() = State(Long.MinValue, initialConnections)
@ -214,8 +214,8 @@ class RemoveConnectionOnFirstFailureRemoteFailureDetector(_remote: Remote,
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class BannagePeriodFailureDetector(_remote: Remote, class BannagePeriodFailureDetector(_remote: Remote,
initialConnections: Map[InetSocketAddress, ActorRef] = Map.empty[InetSocketAddress, ActorRef], initialConnections: Map[InetSocketAddress, ActorRef] = Map.empty[InetSocketAddress, ActorRef],
timeToBan: Duration) timeToBan: Duration)
extends RemoteFailureDetectorBase(_remote, initialConnections) { extends RemoteFailureDetectorBase(_remote, initialConnections) {
// FIXME considering adding a Scheduler event to notify the BannagePeriodFailureDetector unban the banned connection after the timeToBan have exprired // FIXME considering adding a Scheduler event to notify the BannagePeriodFailureDetector unban the banned connection after the timeToBan have exprired

View file

@ -50,7 +50,7 @@ object RemoteEncoder {
trait NettyRemoteClientModule extends RemoteClientModule { trait NettyRemoteClientModule extends RemoteClientModule {
self: RemoteSupport self: RemoteSupport
private val remoteClients = new HashMap[RemoteAddress, RemoteClient] private val remoteClients = new HashMap[RemoteAddress, RemoteClient]
private val remoteActors = new Index[RemoteAddress, Uuid] private val remoteActors = new Index[RemoteAddress, Uuid]
private val lock = new ReadWriteGuard private val lock = new ReadWriteGuard
@ -58,12 +58,12 @@ trait NettyRemoteClientModule extends RemoteClientModule {
def app: AkkaApplication def app: AkkaApplication
protected[akka] def send[T](message: Any, protected[akka] def send[T](message: Any,
senderOption: Option[ActorRef], senderOption: Option[ActorRef],
senderFuture: Option[Promise[T]], senderFuture: Option[Promise[T]],
remoteAddress: InetSocketAddress, remoteAddress: InetSocketAddress,
isOneWay: Boolean, isOneWay: Boolean,
actorRef: ActorRef, actorRef: ActorRef,
loader: Option[ClassLoader]): Option[Promise[T]] = loader: Option[ClassLoader]): Option[Promise[T]] =
withClientFor(remoteAddress, loader) { client withClientFor(remoteAddress, loader) { client
client.send[T](message, senderOption, senderFuture, remoteAddress, isOneWay, actorRef) client.send[T](message, senderOption, senderFuture, remoteAddress, isOneWay, actorRef)
} }
@ -197,7 +197,7 @@ abstract class RemoteClient private[akka] (
isOneWay: Boolean, isOneWay: Boolean,
actorRef: ActorRef): Option[Promise[T]] = { actorRef: ActorRef): Option[Promise[T]] = {
val messageProtocol = serialization.createRemoteMessageProtocolBuilder( val messageProtocol = serialization.createRemoteMessageProtocolBuilder(
Some(actorRef), Left(actorRef.uuid), actorRef.address, app.AkkaConfig.TimeoutMillis, Right(message), isOneWay, senderOption).build Some(actorRef), Left(actorRef.uuid), actorRef.address, app.AkkaConfig.ActorTimeoutMillis, Right(message), isOneWay, senderOption).build
send(messageProtocol, senderFuture) send(messageProtocol, senderFuture)
} }
@ -475,7 +475,7 @@ class ActiveRemoteClientPipelineFactory(
remoteAddress: InetSocketAddress, remoteAddress: InetSocketAddress,
timer: HashedWheelTimer, timer: HashedWheelTimer,
client: ActiveRemoteClient) extends ChannelPipelineFactory { client: ActiveRemoteClient) extends ChannelPipelineFactory {
import settings._ import settings._
def getPipeline: ChannelPipeline = { def getPipeline: ChannelPipeline = {
@ -654,10 +654,10 @@ class NettyRemoteSupport(_app: AkkaApplication) extends RemoteSupport(_app) with
} }
class NettyRemoteServer(app: AkkaApplication, serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) { class NettyRemoteServer(app: AkkaApplication, serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) {
val settings = new RemoteServerSettings(app) val settings = new RemoteServerSettings(app)
import settings._ import settings._
val serialization = new RemoteActorSerialization(app) val serialization = new RemoteActorSerialization(app)
val name = "NettyRemoteServer@" + host + ":" + port val name = "NettyRemoteServer@" + host + ":" + port
@ -712,7 +712,7 @@ class NettyRemoteServer(app: AkkaApplication, serverModule: NettyRemoteServerMod
trait NettyRemoteServerModule extends RemoteServerModule { trait NettyRemoteServerModule extends RemoteServerModule {
self: RemoteSupport self: RemoteSupport
def app: AkkaApplication def app: AkkaApplication
private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None) private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None)
@ -903,7 +903,7 @@ class RemoteServerHandler(
val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler { val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler {
import settings._ import settings._
implicit def app = server.app implicit def app = server.app
// applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY // applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY

View file

@ -27,7 +27,7 @@ import com.eaio.uuid.UUID
*/ */
class ActorSerialization(val app: AkkaApplication) { class ActorSerialization(val app: AkkaApplication) {
implicit val defaultSerializer = akka.serialization.JavaSerializer // Format.Default implicit val defaultSerializer = akka.serialization.JavaSerializer // Format.Default
val remoteActorSerialization = new RemoteActorSerialization(app) val remoteActorSerialization = new RemoteActorSerialization(app)
def fromBinary[T <: Actor](bytes: Array[Byte], homeAddress: InetSocketAddress): ActorRef = def fromBinary[T <: Actor](bytes: Array[Byte], homeAddress: InetSocketAddress): ActorRef =
@ -70,7 +70,7 @@ class ActorSerialization(val app: AkkaApplication) {
val builder = SerializedActorRefProtocol.newBuilder val builder = SerializedActorRefProtocol.newBuilder
.setUuid(UuidProtocol.newBuilder.setHigh(actorRef.uuid.getTime).setLow(actorRef.uuid.getClockSeqAndNode).build) .setUuid(UuidProtocol.newBuilder.setHigh(actorRef.uuid.getTime).setLow(actorRef.uuid.getClockSeqAndNode).build)
.setAddress(actorRef.address) .setAddress(actorRef.address)
.setTimeout(app.AkkaConfig.TimeoutMillis) .setTimeout(app.AkkaConfig.ActorTimeoutMillis)
replicationScheme match { replicationScheme match {
case _: Transient | Transient case _: Transient | Transient
@ -104,7 +104,7 @@ class ActorSerialization(val app: AkkaApplication) {
Option(m.receiver.ref), Option(m.receiver.ref),
Left(actorRef.uuid), Left(actorRef.uuid),
actorRef.address, actorRef.address,
app.AkkaConfig.TimeoutMillis, app.AkkaConfig.ActorTimeoutMillis,
Right(m.message), Right(m.message),
false, false,
m.channel match { m.channel match {
@ -201,7 +201,7 @@ class ActorSerialization(val app: AkkaApplication) {
} }
val props = Props(creator = factory, val props = Props(creator = factory,
timeout = if (protocol.hasTimeout) protocol.getTimeout else app.AkkaConfig.TIMEOUT, timeout = if (protocol.hasTimeout) protocol.getTimeout else app.AkkaConfig.ActorTimeout,
supervisor = storedSupervisor //TODO what dispatcher should it use? supervisor = storedSupervisor //TODO what dispatcher should it use?
//TODO what faultHandler should it use? //TODO what faultHandler should it use?
// //
@ -243,7 +243,7 @@ class RemoteActorSerialization(val app: AkkaApplication) {
EventHandler.debug(this, "Deserializing RemoteActorRefProtocol to RemoteActorRef:\n %s".format(protocol)) EventHandler.debug(this, "Deserializing RemoteActorRefProtocol to RemoteActorRef:\n %s".format(protocol))
val ref = RemoteActorRef( val ref = RemoteActorRef(
app, app.remote, app, app.remote,
JavaSerializer.fromBinary(protocol.getInetSocketAddress.toByteArray, Some(classOf[InetSocketAddress]), loader).asInstanceOf[InetSocketAddress], JavaSerializer.fromBinary(protocol.getInetSocketAddress.toByteArray, Some(classOf[InetSocketAddress]), loader).asInstanceOf[InetSocketAddress],
protocol.getAddress, protocol.getAddress,
loader) loader)
@ -272,7 +272,7 @@ class RemoteActorSerialization(val app: AkkaApplication) {
RemoteActorRefProtocol.newBuilder RemoteActorRefProtocol.newBuilder
.setInetSocketAddress(ByteString.copyFrom(JavaSerializer.toBinary(remoteAddress))) .setInetSocketAddress(ByteString.copyFrom(JavaSerializer.toBinary(remoteAddress)))
.setAddress(actor.address) .setAddress(actor.address)
.setTimeout(app.AkkaConfig.TimeoutMillis) .setTimeout(app.AkkaConfig.ActorTimeoutMillis)
.build .build
} }

View file

@ -9,7 +9,7 @@ import akka.serialization.SerializeSpec.Person
case class MyMessage(id: Long, name: String, status: Boolean) case class MyMessage(id: Long, name: String, status: Boolean)
class ActorSerializeSpec extends AkkaSpec with BeforeAndAfterAll { class ActorSerializeSpec extends AkkaSpec with BeforeAndAfterAll {
val serialization = new ActorSerialization(app) val serialization = new ActorSerialization(app)
"Serializable actor" must { "Serializable actor" must {
@ -76,9 +76,9 @@ class ActorSerializeSpec extends AkkaSpec with BeforeAndAfterAll {
"serialize protobuf" must { "serialize protobuf" must {
"must serialize" ignore { "must serialize" ignore {
val msg = MyMessage(123, "debasish ghosh", true) val msg = MyMessage(123, "debasish ghosh", true)
val ser = new Serialization(app) val ser = new Serialization(app)
val b = ser.serialize(ProtobufProtocol.MyMessage.newBuilder.setId(msg.id).setName(msg.name).setStatus(msg.status).build) match { val b = ser.serialize(ProtobufProtocol.MyMessage.newBuilder.setId(msg.id).setName(msg.name).setStatus(msg.status).build) match {
case Left(exception) fail(exception) case Left(exception) fail(exception)
case Right(bytes) bytes case Right(bytes) bytes

View file

@ -163,7 +163,7 @@ class FSMHakker(name: String, left: ActorRef, right: ActorRef) extends Actor wit
* Alright, here's our test-harness * Alright, here's our test-harness
*/ */
object DiningHakkersOnFsm { object DiningHakkersOnFsm {
val app = AkkaApplication() val app = AkkaApplication()
def run = { def run = {

View file

@ -153,7 +153,7 @@ class Agent[T](initialValue: T, application: AkkaApplication) {
def sendOff(f: T T): Unit = { def sendOff(f: T T): Unit = {
send((value: T) { send((value: T) {
suspend() suspend()
val pinnedDispatcher = new PinnedDispatcher(null, "agent-send-off", UnboundedMailbox(), application.AkkaConfig.TimeoutMillis) val pinnedDispatcher = new PinnedDispatcher(null, "agent-send-off", UnboundedMailbox(), application.AkkaConfig.ActorTimeoutMillis)
val threadBased = application.createActor(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher)) val threadBased = application.createActor(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher))
threadBased ! Update(f) threadBased ! Update(f)
value value
@ -171,7 +171,7 @@ class Agent[T](initialValue: T, application: AkkaApplication) {
val result = new DefaultPromise[T](timeout)(application.dispatcher) val result = new DefaultPromise[T](timeout)(application.dispatcher)
send((value: T) { send((value: T) {
suspend() suspend()
val pinnedDispatcher = new PinnedDispatcher(null, "agent-alter-off", UnboundedMailbox(), application.AkkaConfig.TimeoutMillis) val pinnedDispatcher = new PinnedDispatcher(null, "agent-alter-off", UnboundedMailbox(), application.AkkaConfig.ActorTimeoutMillis)
val threadBased = application.createActor(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher)) val threadBased = application.createActor(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher))
result completeWith threadBased.?(Update(f), timeout).asInstanceOf[Future[T]] result completeWith threadBased.?(Update(f), timeout).asInstanceOf[Future[T]]
value value

View file

@ -12,7 +12,7 @@ import akka.routing.{ RoutedProps, Routing }
import akka.AkkaApplication import akka.AkkaApplication
object Pi extends App { object Pi extends App {
val app = AkkaApplication() val app = AkkaApplication()
calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000) calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)

View file

@ -4,11 +4,11 @@
package akka.tutorial.java.second; package akka.tutorial.java.second;
import static akka.actor.Actors.actorOf;
import static akka.actor.Actors.poisonPill; import static akka.actor.Actors.poisonPill;
import static java.lang.System.currentTimeMillis; import static java.lang.System.currentTimeMillis;
import static java.util.Arrays.asList; import static java.util.Arrays.asList;
import akka.AkkaApplication;
import akka.routing.RoutedProps; import akka.routing.RoutedProps;
import akka.routing.Routing; import akka.routing.Routing;
import scala.Option; import scala.Option;
@ -24,6 +24,8 @@ import scala.collection.JavaConversions;
import java.util.LinkedList; import java.util.LinkedList;
public class Pi { public class Pi {
private static final AkkaApplication app = new AkkaApplication();
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
Pi pi = new Pi(); Pi pi = new Pi();
@ -99,11 +101,11 @@ public class Pi {
LinkedList<ActorRef> workers = new LinkedList<ActorRef>(); LinkedList<ActorRef> workers = new LinkedList<ActorRef>();
for (int i = 0; i < nrOfWorkers; i++) { for (int i = 0; i < nrOfWorkers; i++) {
ActorRef worker = actorOf(Worker.class, "worker"); ActorRef worker = app.createActor(Worker.class);
workers.add(worker); workers.add(worker);
} }
router = Routing.actorOf(RoutedProps.apply().withConnections(workers).withRoundRobinRouter(), "pi"); router = app.routing().actorOf(RoutedProps.apply().withConnections(workers).withRoundRobinRouter(), "pi");
} }
@Override @Override
@ -159,11 +161,11 @@ public class Pi {
public void calculate(final int nrOfWorkers, final int nrOfElements, final int nrOfMessages) throws Exception { public void calculate(final int nrOfWorkers, final int nrOfElements, final int nrOfMessages) throws Exception {
// create the master // create the master
ActorRef master = actorOf(new UntypedActorFactory() { ActorRef master = app.createActor(new UntypedActorFactory() {
public UntypedActor create() { public UntypedActor create() {
return new Master(nrOfWorkers, nrOfMessages, nrOfElements); return new Master(nrOfWorkers, nrOfMessages, nrOfElements);
} }
}, "worker"); });
// start the calculation // start the calculation
long start = currentTimeMillis(); long start = currentTimeMillis();

View file

@ -10,9 +10,12 @@ import System.{ currentTimeMillis ⇒ now }
import akka.routing.Routing.Broadcast import akka.routing.Routing.Broadcast
import akka.actor.{ Timeout, Channel, Actor, PoisonPill } import akka.actor.{ Timeout, Channel, Actor, PoisonPill }
import akka.routing.{ RoutedProps, Routing } import akka.routing.{ RoutedProps, Routing }
import akka.AkkaApplication
object Pi extends App { object Pi extends App {
val app = AkkaApplication()
calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000) calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
// ==================== // ====================
@ -50,10 +53,10 @@ object Pi extends App {
var nrOfResults: Int = _ var nrOfResults: Int = _
// create the workers // create the workers
val workers = Vector.fill(nrOfWorkers)(actorOf[Worker]) val workers = Vector.fill(nrOfWorkers)(app.createActor[Worker])
// wrap them with a load-balancing router // wrap them with a load-balancing router
val router = Routing.actorOf(RoutedProps().withConnections(workers).withRoundRobinRouter, "pi") val router = app.routing.actorOf(RoutedProps().withConnections(workers).withRoundRobinRouter, "pi")
// phase 1, can accept a Calculate message // phase 1, can accept a Calculate message
def scatter: Receive = { def scatter: Receive = {
@ -96,7 +99,7 @@ object Pi extends App {
// ================== // ==================
def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) { def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) {
// create the master // create the master
val master = actorOf(new Master(nrOfWorkers, nrOfElements, nrOfMessages)) val master = app.createActor(new Master(nrOfWorkers, nrOfElements, nrOfMessages))
//start the calculation //start the calculation
val start = now val start = now

View file

@ -65,7 +65,7 @@ object AkkaBuild extends Build {
lazy val stm = Project( lazy val stm = Project(
id = "akka-stm", id = "akka-stm",
base = file("akka-stm"), base = file("akka-stm"),
dependencies = Seq(actor, testkit % "test"), dependencies = Seq(actor, testkit % "test->test"),
settings = defaultSettings ++ Seq( settings = defaultSettings ++ Seq(
libraryDependencies ++= Dependencies.stm libraryDependencies ++= Dependencies.stm
) )
@ -74,7 +74,7 @@ object AkkaBuild extends Build {
lazy val remote = Project( lazy val remote = Project(
id = "akka-remote", id = "akka-remote",
base = file("akka-remote"), base = file("akka-remote"),
dependencies = Seq(stm, actorTests % "test->test", testkit % "test"), dependencies = Seq(stm, actorTests % "test->test", testkit % "test->test"),
settings = defaultSettings ++ multiJvmSettings ++ Seq( settings = defaultSettings ++ multiJvmSettings ++ Seq(
libraryDependencies ++= Dependencies.cluster, libraryDependencies ++= Dependencies.cluster,
extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src => extraOptions in MultiJvm <<= (sourceDirectory in MultiJvm) { src =>
@ -108,7 +108,7 @@ object AkkaBuild extends Build {
lazy val http = Project( lazy val http = Project(
id = "akka-http", id = "akka-http",
base = file("akka-http"), base = file("akka-http"),
dependencies = Seq(actor, testkit % "test"), dependencies = Seq(actor, testkit % "test->test"),
settings = defaultSettings ++ Seq( settings = defaultSettings ++ Seq(
libraryDependencies ++= Dependencies.http libraryDependencies ++= Dependencies.http
) )
@ -117,7 +117,7 @@ object AkkaBuild extends Build {
lazy val slf4j = Project( lazy val slf4j = Project(
id = "akka-slf4j", id = "akka-slf4j",
base = file("akka-slf4j"), base = file("akka-slf4j"),
dependencies = Seq(actor, testkit % "test"), dependencies = Seq(actor, testkit % "test->test"),
settings = defaultSettings ++ Seq( settings = defaultSettings ++ Seq(
libraryDependencies ++= Dependencies.slf4j libraryDependencies ++= Dependencies.slf4j
) )