Moving in Deployer udner the provider
This commit is contained in:
parent
a75310a181
commit
d8d322c6e8
7 changed files with 25 additions and 25 deletions
|
|
@ -13,7 +13,7 @@ class DeployerSpec extends AkkaSpec {
|
|||
|
||||
"A Deployer" must {
|
||||
"be able to parse 'akka.actor.deployment._' config elements" in {
|
||||
val deployment = app.deployer.lookupInConfig("service-ping")
|
||||
val deployment = app.provider.deployer.lookupInConfig("service-ping")
|
||||
deployment must be('defined)
|
||||
|
||||
deployment must equal(Some(
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ class LocalActorRefProviderSpec extends AkkaSpec {
|
|||
|
||||
(0 until 100) foreach { i ⇒ // 100 concurrent runs
|
||||
val address = "new-actor" + i
|
||||
implicit val timeout = Timeout(30 seconds)
|
||||
implicit val timeout = Timeout(5 seconds)
|
||||
((1 to 4) map { _ ⇒ Future { provider.actorOf(Props(c ⇒ { case _ ⇒ }), app.guardian, address) } }).map(_.get).distinct.size must be(1)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec {
|
|||
"be able to shut down its instance" in {
|
||||
val address = "round-robin-0"
|
||||
|
||||
app.deployer.deploy(
|
||||
app.provider.deployer.deploy(
|
||||
Deploy(
|
||||
address,
|
||||
None,
|
||||
|
|
@ -52,7 +52,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec {
|
|||
"deliver messages in a round robin fashion" in {
|
||||
val address = "round-robin-1"
|
||||
|
||||
app.deployer.deploy(
|
||||
app.provider.deployer.deploy(
|
||||
Deploy(
|
||||
address,
|
||||
None,
|
||||
|
|
@ -97,7 +97,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec {
|
|||
"deliver a broadcast message using the !" in {
|
||||
val address = "round-robin-2"
|
||||
|
||||
app.deployer.deploy(
|
||||
app.provider.deployer.deploy(
|
||||
Deploy(
|
||||
address,
|
||||
None,
|
||||
|
|
@ -132,7 +132,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec {
|
|||
"be able to shut down its instance" in {
|
||||
val address = "random-0"
|
||||
|
||||
app.deployer.deploy(
|
||||
app.provider.deployer.deploy(
|
||||
Deploy(
|
||||
address,
|
||||
None,
|
||||
|
|
@ -166,7 +166,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec {
|
|||
"deliver messages in a random fashion" in {
|
||||
val address = "random-1"
|
||||
|
||||
app.deployer.deploy(
|
||||
app.provider.deployer.deploy(
|
||||
Deploy(
|
||||
address,
|
||||
None,
|
||||
|
|
@ -211,7 +211,7 @@ class ConfiguredLocalRoutingSpec extends AkkaSpec {
|
|||
"deliver a broadcast message using the !" in {
|
||||
val address = "random-2"
|
||||
|
||||
app.deployer.deploy(
|
||||
app.provider.deployer.deploy(
|
||||
Deploy(
|
||||
address,
|
||||
None,
|
||||
|
|
|
|||
|
|
@ -194,9 +194,6 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor
|
|||
// TODO think about memory consistency effects when doing funky stuff inside constructor
|
||||
val deadLetters = new DeadLetterActorRef(this)
|
||||
|
||||
// TODO think about memory consistency effects when doing funky stuff inside an ActorRefProvider's constructor
|
||||
val deployer = new Deployer(this)
|
||||
|
||||
val deathWatch = provider.createDeathWatch()
|
||||
|
||||
// TODO think about memory consistency effects when doing funky stuff inside constructor
|
||||
|
|
|
|||
|
|
@ -25,6 +25,11 @@ trait ActorRefProvider {
|
|||
|
||||
def actorFor(address: String): Option[ActorRef]
|
||||
|
||||
/**
|
||||
* What deployer will be used to resolve deployment configuration?
|
||||
*/
|
||||
private[akka] def deployer: Deployer
|
||||
|
||||
private[akka] def actorOf(props: Props, supervisor: ActorRef, address: String, systemService: Boolean): ActorRef
|
||||
|
||||
private[akka] def evict(address: String): Boolean
|
||||
|
|
@ -92,6 +97,8 @@ class ActorRefProviderException(message: String) extends AkkaException(message)
|
|||
*/
|
||||
class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider {
|
||||
|
||||
private[akka] val deployer: Deployer = new Deployer(app)
|
||||
|
||||
val terminationFuture = new DefaultPromise[AkkaApplication.ExitStatus](Timeout.never)(app.dispatcher)
|
||||
|
||||
/**
|
||||
|
|
@ -152,7 +159,7 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider {
|
|||
actors.putIfAbsent(address, newFuture) match {
|
||||
case null ⇒
|
||||
val actor: ActorRef = try {
|
||||
(if (systemService) None else app.deployer.lookupDeploymentFor(address)) match { // see if the deployment already exists, if so use it, if not create actor
|
||||
(if (systemService) None else deployer.lookupDeployment(address)) match { // see if the deployment already exists, if so use it, if not create actor
|
||||
|
||||
// create a local actor
|
||||
case None | Some(DeploymentConfig.Deploy(_, _, DeploymentConfig.Direct, _, _, DeploymentConfig.LocalScope)) ⇒
|
||||
|
|
|
|||
|
|
@ -36,8 +36,8 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer {
|
|||
|
||||
val deploymentConfig = new DeploymentConfig(app)
|
||||
|
||||
lazy val instance: ActorDeployer = {
|
||||
val deployer = LocalDeployer
|
||||
val instance: ActorDeployer = {
|
||||
val deployer = new LocalDeployer()
|
||||
deployer.init(deploymentsInConfig)
|
||||
deployer
|
||||
}
|
||||
|
|
@ -323,20 +323,14 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer {
|
|||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
object LocalDeployer extends ActorDeployer {
|
||||
class LocalDeployer extends ActorDeployer {
|
||||
private val deployments = new ConcurrentHashMap[String, Deploy]
|
||||
|
||||
private[akka] def init(deployments: Seq[Deploy]) {
|
||||
deployments foreach (deploy(_)) // deploy
|
||||
}
|
||||
private[akka] def init(deployments: Seq[Deploy]): Unit = deployments foreach deploy // deploy
|
||||
|
||||
private[akka] def shutdown() {
|
||||
deployments.clear() //TODO do something else/more?
|
||||
}
|
||||
private[akka] def shutdown(): Unit = deployments.clear() //TODO do something else/more?
|
||||
|
||||
private[akka] def deploy(deployment: Deploy) {
|
||||
deployments.putIfAbsent(deployment.address, deployment)
|
||||
}
|
||||
private[akka] def deploy(deployment: Deploy): Unit = deployments.putIfAbsent(deployment.address, deployment)
|
||||
|
||||
private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = Option(deployments.get(address))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,6 +44,8 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
|
|||
private[akka] def theOneWhoWalksTheBubblesOfSpaceTime: ActorRef = local.theOneWhoWalksTheBubblesOfSpaceTime
|
||||
private[akka] def terminationFuture = local.terminationFuture
|
||||
|
||||
private[akka] def deployer: Deployer = local.deployer
|
||||
|
||||
def defaultDispatcher = app.dispatcher
|
||||
def defaultTimeout = app.AkkaConfig.ActorTimeout
|
||||
|
||||
|
|
@ -55,7 +57,7 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
|
|||
actors.putIfAbsent(address, newFuture) match { // we won the race -- create the actor and resolve the future
|
||||
case null ⇒
|
||||
val actor: ActorRef = try {
|
||||
app.deployer.lookupDeploymentFor(address) match {
|
||||
deployer.lookupDeploymentFor(address) match {
|
||||
case Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, failureDetectorType, DeploymentConfig.RemoteScope(remoteAddresses))) ⇒
|
||||
|
||||
// FIXME move to AccrualFailureDetector as soon as we have the Gossiper up and running and remove the option to select impl in the akka.conf file since we only have one
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue