remove app argument from Deployer
This commit is contained in:
parent
1cdc8752c2
commit
3c61e593f2
12 changed files with 32 additions and 32 deletions
|
|
@ -15,6 +15,7 @@ import akka.event.{ Logging, DeathWatch, ActorClassification, EventStream }
|
||||||
import akka.routing.{ ScatterGatherFirstCompletedRouter, Routing, RouterType, Router, RoutedProps, RoutedActorRef, RoundRobinRouter, RandomRouter, LocalConnectionManager, DirectRouter }
|
import akka.routing.{ ScatterGatherFirstCompletedRouter, Routing, RouterType, Router, RoutedProps, RoutedActorRef, RoundRobinRouter, RandomRouter, LocalConnectionManager, DirectRouter }
|
||||||
import akka.util.Helpers
|
import akka.util.Helpers
|
||||||
import akka.AkkaException
|
import akka.AkkaException
|
||||||
|
import com.eaio.uuid.UUID
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Interface for all ActorRef providers to implement.
|
* Interface for all ActorRef providers to implement.
|
||||||
|
|
@ -31,6 +32,8 @@ trait ActorRefProvider {
|
||||||
|
|
||||||
def deathWatch: DeathWatch
|
def deathWatch: DeathWatch
|
||||||
|
|
||||||
|
def nodename: String
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* What deployer will be used to resolve deployment configuration?
|
* What deployer will be used to resolve deployment configuration?
|
||||||
*/
|
*/
|
||||||
|
|
@ -116,6 +119,7 @@ class ActorRefProviderException(message: String) extends AkkaException(message)
|
||||||
*/
|
*/
|
||||||
class LocalActorRefProvider(
|
class LocalActorRefProvider(
|
||||||
private val app: ActorSystem,
|
private val app: ActorSystem,
|
||||||
|
val AkkaConfig: ActorSystem.AkkaConfig,
|
||||||
val root: ActorPath,
|
val root: ActorPath,
|
||||||
val eventStream: EventStream,
|
val eventStream: EventStream,
|
||||||
val dispatcher: MessageDispatcher,
|
val dispatcher: MessageDispatcher,
|
||||||
|
|
@ -123,7 +127,12 @@ class LocalActorRefProvider(
|
||||||
|
|
||||||
val log = Logging(eventStream, this)
|
val log = Logging(eventStream, this)
|
||||||
|
|
||||||
private[akka] val deployer: Deployer = new Deployer(app)
|
val nodename: String = System.getProperty("akka.cluster.nodename") match {
|
||||||
|
case null | "" ⇒ new UUID().toString
|
||||||
|
case value ⇒ value
|
||||||
|
}
|
||||||
|
|
||||||
|
private[akka] val deployer: Deployer = new Deployer(AkkaConfig, eventStream, nodename)
|
||||||
|
|
||||||
val terminationFuture = new DefaultPromise[ActorSystem.ExitStatus](Timeout.never)(app.dispatcher)
|
val terminationFuture = new DefaultPromise[ActorSystem.ExitStatus](Timeout.never)(app.dispatcher)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -142,11 +142,6 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF
|
||||||
val startTime = System.currentTimeMillis
|
val startTime = System.currentTimeMillis
|
||||||
def uptime = (System.currentTimeMillis - startTime) / 1000
|
def uptime = (System.currentTimeMillis - startTime) / 1000
|
||||||
|
|
||||||
val nodename: String = System.getProperty("akka.cluster.nodename") match {
|
|
||||||
case null | "" ⇒ new UUID().toString
|
|
||||||
case value ⇒ value
|
|
||||||
}
|
|
||||||
|
|
||||||
val address = RemoteAddress(System.getProperty("akka.remote.hostname") match {
|
val address = RemoteAddress(System.getProperty("akka.remote.hostname") match {
|
||||||
case null | "" ⇒ InetAddress.getLocalHost.getHostAddress
|
case null | "" ⇒ InetAddress.getLocalHost.getHostAddress
|
||||||
case value ⇒ value
|
case value ⇒ value
|
||||||
|
|
@ -192,8 +187,8 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF
|
||||||
case Left(e) ⇒ throw e
|
case Left(e) ⇒ throw e
|
||||||
case Right(b) ⇒ b
|
case Right(b) ⇒ b
|
||||||
}
|
}
|
||||||
val params: Array[Class[_]] = Array(classOf[ActorSystem], classOf[ActorPath], classOf[EventStream], classOf[MessageDispatcher], classOf[Scheduler])
|
val params: Array[Class[_]] = Array(classOf[ActorSystem], classOf[AkkaConfig], classOf[ActorPath], classOf[EventStream], classOf[MessageDispatcher], classOf[Scheduler])
|
||||||
val args: Array[AnyRef] = Array(this, root, eventStream, dispatcher, scheduler)
|
val args: Array[AnyRef] = Array(this, AkkaConfig, root, eventStream, dispatcher, scheduler)
|
||||||
|
|
||||||
ReflectiveAccess.createInstance[ActorRefProvider](providerClass, params, args) match {
|
ReflectiveAccess.createInstance[ActorRefProvider](providerClass, params, args) match {
|
||||||
case Left(e) ⇒ throw e
|
case Left(e) ⇒ throw e
|
||||||
|
|
@ -213,9 +208,6 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF
|
||||||
eventStream.start(provider)
|
eventStream.start(provider)
|
||||||
eventStream.startDefaultLoggers(provider, AkkaConfig)
|
eventStream.startDefaultLoggers(provider, AkkaConfig)
|
||||||
|
|
||||||
// TODO think about memory consistency effects when doing funky stuff inside an ActorRefProvider's constructor
|
|
||||||
val deployer = new Deployer(this)
|
|
||||||
|
|
||||||
// TODO think about memory consistency effects when doing funky stuff inside constructor
|
// TODO think about memory consistency effects when doing funky stuff inside constructor
|
||||||
val typedActor = new TypedActor(this)
|
val typedActor = new TypedActor(this)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,9 +5,7 @@
|
||||||
package akka.actor
|
package akka.actor
|
||||||
|
|
||||||
import collection.immutable.Seq
|
import collection.immutable.Seq
|
||||||
|
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
|
||||||
import akka.event.Logging
|
import akka.event.Logging
|
||||||
import akka.actor.DeploymentConfig._
|
import akka.actor.DeploymentConfig._
|
||||||
import akka.AkkaException
|
import akka.AkkaException
|
||||||
|
|
@ -15,6 +13,7 @@ import akka.config.{ Configuration, ConfigurationException }
|
||||||
import akka.util.Duration
|
import akka.util.Duration
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
import akka.remote.RemoteAddress
|
import akka.remote.RemoteAddress
|
||||||
|
import akka.event.EventStream
|
||||||
|
|
||||||
trait ActorDeployer {
|
trait ActorDeployer {
|
||||||
private[akka] def init(deployments: Seq[Deploy]): Unit
|
private[akka] def init(deployments: Seq[Deploy]): Unit
|
||||||
|
|
@ -34,10 +33,10 @@ trait ActorDeployer {
|
||||||
*
|
*
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
class Deployer(val app: ActorSystem) extends ActorDeployer {
|
class Deployer(val AkkaConfig: ActorSystem.AkkaConfig, val eventStream: EventStream, val nodename: String) extends ActorDeployer {
|
||||||
|
|
||||||
val deploymentConfig = new DeploymentConfig(app)
|
val deploymentConfig = new DeploymentConfig(nodename)
|
||||||
val log = Logging(app.eventStream, this)
|
val log = Logging(eventStream, this)
|
||||||
|
|
||||||
val instance: ActorDeployer = {
|
val instance: ActorDeployer = {
|
||||||
val deployer = new LocalDeployer()
|
val deployer = new LocalDeployer()
|
||||||
|
|
@ -86,7 +85,7 @@ class Deployer(val app: ActorSystem) extends ActorDeployer {
|
||||||
|
|
||||||
private[akka] def pathsInConfig: List[String] = {
|
private[akka] def pathsInConfig: List[String] = {
|
||||||
val deploymentPath = "akka.actor.deployment"
|
val deploymentPath = "akka.actor.deployment"
|
||||||
app.config.getSection(deploymentPath) match {
|
AkkaConfig.config.getSection(deploymentPath) match {
|
||||||
case None ⇒ Nil
|
case None ⇒ Nil
|
||||||
case Some(pathConfig) ⇒
|
case Some(pathConfig) ⇒
|
||||||
pathConfig.map.keySet
|
pathConfig.map.keySet
|
||||||
|
|
@ -98,7 +97,7 @@ class Deployer(val app: ActorSystem) extends ActorDeployer {
|
||||||
/**
|
/**
|
||||||
* Lookup deployment in 'akka.conf' configuration file.
|
* Lookup deployment in 'akka.conf' configuration file.
|
||||||
*/
|
*/
|
||||||
private[akka] def lookupInConfig(path: String, configuration: Configuration = app.config): Option[Deploy] = {
|
private[akka] def lookupInConfig(path: String, configuration: Configuration = AkkaConfig.config): Option[Deploy] = {
|
||||||
import akka.util.ReflectiveAccess.{ createInstance, emptyArguments, emptyParams, getClassFor }
|
import akka.util.ReflectiveAccess.{ createInstance, emptyArguments, emptyParams, getClassFor }
|
||||||
|
|
||||||
// --------------------------------
|
// --------------------------------
|
||||||
|
|
|
||||||
|
|
@ -217,13 +217,13 @@ object DeploymentConfig {
|
||||||
*
|
*
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
class DeploymentConfig(val app: ActorSystem) {
|
class DeploymentConfig(val nodename: String) {
|
||||||
|
|
||||||
import DeploymentConfig._
|
import DeploymentConfig._
|
||||||
|
|
||||||
case class ClusterScope(preferredNodes: Iterable[Home] = Vector(Node(app.nodename)), replication: ReplicationScheme = Transient) extends Scope
|
case class ClusterScope(preferredNodes: Iterable[Home] = Vector(Node(nodename)), replication: ReplicationScheme = Transient) extends Scope
|
||||||
|
|
||||||
def isHomeNode(homes: Iterable[Home]): Boolean = homes exists (home ⇒ nodeNameFor(home) == app.nodename)
|
def isHomeNode(homes: Iterable[Home]): Boolean = homes exists (home ⇒ nodeNameFor(home) == nodename)
|
||||||
|
|
||||||
def replicationSchemeFor(deployment: Deploy): Option[ReplicationScheme] = deployment match {
|
def replicationSchemeFor(deployment: Deploy): Option[ReplicationScheme] = deployment match {
|
||||||
case Deploy(_, _, _, _, ClusterScope(_, replicationScheme)) ⇒ Some(replicationScheme)
|
case Deploy(_, _, _, _, ClusterScope(_, replicationScheme)) ⇒ Some(replicationScheme)
|
||||||
|
|
|
||||||
|
|
@ -103,7 +103,7 @@ class NodeAddress(val clusterName: String, val nodeName: String) {
|
||||||
*/
|
*/
|
||||||
object NodeAddress {
|
object NodeAddress {
|
||||||
def apply(clusterName: String, nodeName: String): NodeAddress = new NodeAddress(clusterName, nodeName)
|
def apply(clusterName: String, nodeName: String): NodeAddress = new NodeAddress(clusterName, nodeName)
|
||||||
def apply(app: ActorSystem): NodeAddress = new NodeAddress(app.AkkaConfig.ClusterName, app.nodename)
|
def apply(app: ActorSystem): NodeAddress = new NodeAddress(app.AkkaConfig.ClusterName, app.provider.nodename)
|
||||||
|
|
||||||
def unapply(other: Any) = other match {
|
def unapply(other: Any) = other match {
|
||||||
case address: NodeAddress ⇒ Some((address.clusterName, address.nodeName))
|
case address: NodeAddress ⇒ Some((address.clusterName, address.nodeName))
|
||||||
|
|
|
||||||
|
|
@ -27,7 +27,7 @@ import akka.dispatch.{ Terminate, Dispatchers, Future, PinnedDispatcher }
|
||||||
*
|
*
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
class Remote(val app: ActorSystem) {
|
class Remote(val app: ActorSystem, val nodename: String) {
|
||||||
|
|
||||||
val log = Logging(app, this)
|
val log = Logging(app, this)
|
||||||
|
|
||||||
|
|
@ -35,8 +35,6 @@ class Remote(val app: ActorSystem) {
|
||||||
import app.config
|
import app.config
|
||||||
import app.AkkaConfig._
|
import app.AkkaConfig._
|
||||||
|
|
||||||
val nodename = app.nodename
|
|
||||||
|
|
||||||
// TODO move to AkkaConfig?
|
// TODO move to AkkaConfig?
|
||||||
val shouldCompressData = config.getBool("akka.remote.use-compression", false)
|
val shouldCompressData = config.getBool("akka.remote.use-compression", false)
|
||||||
val remoteSystemDaemonAckTimeout = Duration(config.getInt("akka.remote.remote-daemon-ack-timeout", 30), DefaultTimeUnit).toMillis.toInt
|
val remoteSystemDaemonAckTimeout = Duration(config.getInt("akka.remote.remote-daemon-ack-timeout", 30), DefaultTimeUnit).toMillis.toInt
|
||||||
|
|
|
||||||
|
|
@ -30,6 +30,7 @@ import akka.event.EventStream
|
||||||
*/
|
*/
|
||||||
class RemoteActorRefProvider(
|
class RemoteActorRefProvider(
|
||||||
val app: ActorSystem,
|
val app: ActorSystem,
|
||||||
|
val AkkaConfig: ActorSystem.AkkaConfig,
|
||||||
val root: ActorPath,
|
val root: ActorPath,
|
||||||
val eventStream: EventStream,
|
val eventStream: EventStream,
|
||||||
val dispatcher: MessageDispatcher,
|
val dispatcher: MessageDispatcher,
|
||||||
|
|
@ -40,12 +41,13 @@ class RemoteActorRefProvider(
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import akka.dispatch.Promise
|
import akka.dispatch.Promise
|
||||||
|
|
||||||
val local = new LocalActorRefProvider(app, root, eventStream, dispatcher, scheduler)
|
val local = new LocalActorRefProvider(app, AkkaConfig, root, eventStream, dispatcher, scheduler)
|
||||||
def deathWatch = local.deathWatch
|
def deathWatch = local.deathWatch
|
||||||
def guardian = local.guardian
|
def guardian = local.guardian
|
||||||
def systemGuardian = local.systemGuardian
|
def systemGuardian = local.systemGuardian
|
||||||
|
def nodename = local.nodename
|
||||||
|
|
||||||
val remote = new Remote(app)
|
val remote = new Remote(app, nodename)
|
||||||
|
|
||||||
private val actors = new ConcurrentHashMap[String, AnyRef]
|
private val actors = new ConcurrentHashMap[String, AnyRef]
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ object DirectRoutedRemoteActorMultiJvmSpec {
|
||||||
|
|
||||||
class SomeActor extends Actor with Serializable {
|
class SomeActor extends Actor with Serializable {
|
||||||
def receive = {
|
def receive = {
|
||||||
case "identify" ⇒ sender ! app.nodename
|
case "identify" ⇒ sender ! app.provider.nodename
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@ object NewRemoteActorMultiJvmSpec {
|
||||||
|
|
||||||
class SomeActor extends Actor with Serializable {
|
class SomeActor extends Actor with Serializable {
|
||||||
def receive = {
|
def receive = {
|
||||||
case "identify" ⇒ sender ! app.nodename
|
case "identify" ⇒ sender ! app.provider.nodename
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ object RandomRoutedRemoteActorMultiJvmSpec {
|
||||||
val NrOfNodes = 4
|
val NrOfNodes = 4
|
||||||
class SomeActor extends Actor with Serializable {
|
class SomeActor extends Actor with Serializable {
|
||||||
def receive = {
|
def receive = {
|
||||||
case "hit" ⇒ sender ! app.nodename
|
case "hit" ⇒ sender ! app.provider.nodename
|
||||||
case "end" ⇒ self.stop()
|
case "end" ⇒ self.stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ object RoundRobinRoutedRemoteActorMultiJvmSpec {
|
||||||
val NrOfNodes = 4
|
val NrOfNodes = 4
|
||||||
class SomeActor extends Actor with Serializable {
|
class SomeActor extends Actor with Serializable {
|
||||||
def receive = {
|
def receive = {
|
||||||
case "hit" ⇒ sender ! app.nodename
|
case "hit" ⇒ sender ! app.provider.nodename
|
||||||
case "end" ⇒ self.stop()
|
case "end" ⇒ self.stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ object ScatterGatherRoutedRemoteActorMultiJvmSpec {
|
||||||
val NrOfNodes = 4
|
val NrOfNodes = 4
|
||||||
class SomeActor extends Actor with Serializable {
|
class SomeActor extends Actor with Serializable {
|
||||||
def receive = {
|
def receive = {
|
||||||
case "hit" ⇒ sender ! app.nodename
|
case "hit" ⇒ sender ! app.provider.nodename
|
||||||
case "end" ⇒ self.stop()
|
case "end" ⇒ self.stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue