diff --git a/.history b/.history
new file mode 100644
index 0000000000..209db6b195
--- /dev/null
+++ b/.history
@@ -0,0 +1,2 @@
+update
+reload
diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala
index 423098a49a..b2e41a02c7 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala
@@ -18,6 +18,7 @@ class DeployerSpec extends WordSpec with MustMatchers {
       deployment must equal(Some(
         Deploy(
           "service-ping",
+          None,
           LeastCPU,
           Clustered(
             Vector(Node("node1")),
diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala
index 6015272a74..6c4b71ead1 100644
--- a/akka-actor/src/main/scala/akka/actor/Actor.scala
+++ b/akka-actor/src/main/scala/akka/actor/Actor.scala
@@ -356,7 +356,7 @@ object Actor extends ListenerManagement {
    * JAVA API
    */
   def actorOf[T <: Actor](creator: Creator[T]): ActorRef =
-    actorOf(creator, new UUID().toString)
+    actorOf(creator, newUuid().toString)
 
   /**
    * Creates an ActorRef out of the Actor. Allows you to pass in a factory (Creator)
@@ -379,7 +379,7 @@ object Actor extends ListenerManagement {
   }
 
   def localActorOf[T <: Actor](clazz: Class[T]): ActorRef = {
-    newLocalActorRef(clazz, new UUID().toString)
+    newLocalActorRef(clazz, newUuid().toString)
   }
 
   def localActorOf[T <: Actor](clazz: Class[T], address: String): ActorRef = {
@@ -387,7 +387,7 @@ object Actor extends ListenerManagement {
   }
 
   def localActorOf[T <: Actor](factory: ⇒ T): ActorRef = {
-    new LocalActorRef(() ⇒ factory, new UUID().toString)
+    new LocalActorRef(() ⇒ factory, newUuid().toString)
   }
 
   def localActorOf[T <: Actor](factory: ⇒ T, address: String): ActorRef = {
@@ -432,8 +432,8 @@ object Actor extends ListenerManagement {
       case None ⇒ // it is not -> create it
         try {
           Deployer.deploymentFor(address) match {
-            case Deploy(_, router, Local) ⇒ actorFactory() // create a local actor
-            case deploy                   ⇒ newClusterActorRef(actorFactory, address, deploy)
+            case Deploy(_, _, router, Local) ⇒ actorFactory() // create a local actor
+            case deploy                      ⇒ newClusterActorRef(actorFactory, address, deploy)
           }
         } catch {
           case e: DeploymentException ⇒
@@ -463,32 +463,26 @@ object Actor extends ListenerManagement {
     }, address)
   }
 
-  private def newClusterActorRef(factory: () ⇒ ActorRef, address: String, deploy: Deploy): ActorRef = {
+  private[akka] def newClusterActorRef(factory: () ⇒ ActorRef, address: String, deploy: Deploy): ActorRef =
     deploy match {
-      case Deploy(
-        configAdress, router,
-        Clustered(
-          preferredHomeNodes,
-          replicas,
-          replication)) ⇒
+      case Deploy(configAddress, recipe, router, Clustered(preferredHomeNodes, replicas, replication)) ⇒
 
         ClusterModule.ensureEnabled()
 
-        if (configAdress != address) throw new IllegalStateException(
-          "Deployment config for [" + address + "] is wrong [" + deploy + "]")
-        if (!Actor.remote.isRunning) throw new IllegalStateException(
-          "Remote server is not running")
+        if (configAddress != address) throw new IllegalStateException("Deployment config for [" + address + "] is wrong [" + deploy + "]")
+        if (!remote.isRunning) throw new IllegalStateException("Remote server is not running")
 
         val isHomeNode = DeploymentConfig.isHomeNode(preferredHomeNodes)
-        val nrOfReplicas = replicas.factor
-        val serializer: Serializer =
-          Serialization.serializerFor(this.getClass)
+        val serializer = recipe match {
+          case Some(r) ⇒ Serialization.serializerFor(r.implementationClass)
+          case None    ⇒ Serialization.serializerFor(classOf[Actor]) //TODO revisit this decision of default
+        }
 
         def storeActorAndGetClusterRef(replicationScheme: ReplicationScheme, serializer: Serializer): ActorRef = {
           // add actor to cluster registry (if not already added)
-          if (!cluster.isClustered(address))
-            cluster.store(address, factory, nrOfReplicas, replicationScheme, false, serializer)
+          if (!cluster.isClustered(address)) //WARNING!!!! Racy
+            cluster.store(address, factory, replicas.factor, replicationScheme, false, serializer)
 
           // remote node (not home node), check out as ClusterActorRef
           cluster.ref(address, DeploymentConfig.routerTypeFor(router))
@@ -503,21 +497,17 @@ object Actor extends ListenerManagement {
             "Can't replicate an actor [" + address + "] configured with another router than \"direct\" - found [" + router + "]")
 
           if (isHomeNode) { // stateful actor's home node
-            cluster
-              .use(address, serializer)
+            cluster.use(address, serializer)
               .getOrElse(throw new ConfigurationException(
                 "Could not check out actor [" + address + "] from cluster registry as a \"local\" actor"))
-
           } else {
             storeActorAndGetClusterRef(replication, serializer)
           }
         }
 
      case invalid ⇒ throw new IllegalActorStateException(
-       "Could not create actor with address [" + address +
-         "], not bound to a valid deployment scheme [" + invalid + "]")
+       "Could not create actor with address [" + address + "], not bound to a valid deployment scheme [" + invalid + "]")
-    }
   }
 
 /**
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
index e4fe387fcf..29da2fac04 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala
@@ -32,8 +32,6 @@ case class ActorUnregistered(address: String, actor: ActorRef, typedActor: Optio
  * @author Jonas Bonér
  */
 private[actor] final class ActorRegistry private[actor] () extends ListenerManagement {
-
-  //private val isClusterEnabled = ReflectiveAccess.isClusterEnabled
   private val actorsByAddress = new ConcurrentHashMap[String, ActorRef]
   private val actorsByUuid = new ConcurrentHashMap[Uuid, ActorRef]
   private val typedActorsByUuid = new ConcurrentHashMap[Uuid, AnyRef]
diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala
index c0e9d09430..55f1ab23a1 100644
--- a/akka-actor/src/main/scala/akka/actor/Deployer.scala
+++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala
@@ -10,67 +10,53 @@ import java.util.concurrent.ConcurrentHashMap
 
 import akka.event.EventHandler
 import akka.actor.DeploymentConfig._
-import akka.config.{ ConfigurationException, Config }
 import akka.util.ReflectiveAccess._
 import akka.AkkaException
+import akka.serialization.{ Serializer, Serialization }
+import akka.util.ReflectiveAccess
+import akka.config.{ Configuration, ConfigurationException, Config }
+
+trait ActorDeployer {
+  private[akka] def init(deployments: Seq[Deploy]): Unit
+  private[akka] def shutdown(): Unit //TODO Why should we have "shutdown", should be crash only?
+  private[akka] def deploy(deployment: Deploy): Unit
+  private[akka] def lookupDeploymentFor(address: String): Option[Deploy]
+  private[akka] def deploy(deployment: Seq[Deploy]): Unit = deployment foreach (deploy(_))
+}
 
 /**
  * Deployer maps actor deployments to actor addresses.
  *
  * @author Jonas Bonér
 */
-object Deployer {
+object Deployer extends ActorDeployer {
 
   val defaultAddress = Host(Config.hostname)
 
-  lazy val instance: ClusterModule.ClusterDeployer = {
-    val deployer =
-      if (ClusterModule.isEnabled) ClusterModule.clusterDeployer
-      else LocalDeployer
+  lazy val instance: ActorDeployer = {
+    val deployer = if (ClusterModule.isEnabled) ClusterModule.clusterDeployer else LocalDeployer
     deployer.init(deploymentsInConfig)
     deployer
   }
 
-  def start() {
-    instance.toString
-  }
+  def start(): Unit = instance.toString //Force evaluation
 
-  def shutdown() {
-    instance.shutdown()
-  }
+  private[akka] def init(deployments: Seq[Deploy]) = instance.init(deployments)
 
-  def deploy(deployment: Deploy) {
-    if (deployment eq null) throw new IllegalArgumentException("Deploy can not be null")
-    val address = deployment.address
-    Address.validate(address)
-    instance.deploy(deployment)
-  }
+  def shutdown(): Unit = instance.shutdown() //TODO Why should we have "shutdown", should be crash only?
 
-  def deploy(deployment: Seq[Deploy]) {
-    deployment foreach (deploy(_))
-  }
-
-  /**
-   * Undeploy is idemponent. E.g. safe to invoke multiple times.
-   */
-  def undeploy(deployment: Deploy) {
-    instance.undeploy(deployment)
-  }
-
-  def undeployAll() {
-    instance.undeployAll()
-  }
+  def deploy(deployment: Deploy): Unit = instance.deploy(deployment)
 
   def isLocal(deployment: Deploy): Boolean = deployment match {
-    case Deploy(_, _, Local) ⇒ true
-    case _                   ⇒ false
+    case Deploy(_, _, _, Local) | Deploy(_, _, _, _: Local) ⇒ true
+    case _ ⇒ false
   }
 
-  def isClustered(deployment: Deploy): Boolean = isLocal(deployment)
+  def isClustered(deployment: Deploy): Boolean = !isLocal(deployment)
 
-  def isLocal(address: String): Boolean = isLocal(deploymentFor(address))
+  def isLocal(address: String): Boolean = isLocal(deploymentFor(address)) //TODO Should this throw exception if address not found?
 
-  def isClustered(address: String): Boolean = !isLocal(address)
+  def isClustered(address: String): Boolean = !isLocal(address) //TODO Should this throw exception if address not found?
 
   /**
    * Same as 'lookupDeploymentFor' but throws an exception if no deployment is bound.
@@ -87,15 +73,13 @@ object Deployer {
     if (deployment_?.isDefined && (deployment_?.get ne null)) deployment_?
     else {
-
-      val newDeployment =
-        try {
-          lookupInConfig(address)
-        } catch {
-          case e: ConfigurationException ⇒
-            EventHandler.error(e, this, e.getMessage)
-            throw e
-        }
+      val newDeployment = try {
+        lookupInConfig(address)
+      } catch {
+        case e: ConfigurationException ⇒
+          EventHandler.error(e, this, e.getMessage)
+          throw e
+      }
 
       newDeployment foreach { d ⇒
         if (d eq null) {
@@ -131,20 +115,21 @@ object Deployer {
   /**
    * Lookup deployment in 'akka.conf' configuration file.
    */
-  private[akka] def lookupInConfig(address: String): Option[Deploy] = {
+  private[akka] def lookupInConfig(address: String, configuration: Configuration = Config.config): Option[Deploy] = {
+    import akka.util.ReflectiveAccess.{ createInstance, emptyArguments, emptyParams, getClassFor }
 
     // --------------------------------
    // akka.actor.deployment.<address>
     // --------------------------------
     val addressPath = "akka.actor.deployment." + address
-    Config.config.getSection(addressPath) match {
-      case None ⇒ Some(Deploy(address, Direct, Local))
+    configuration.getSection(addressPath) match {
+      case None ⇒ Some(Deploy(address, None, Direct, Local))
 
       case Some(addressConfig) ⇒
 
         // --------------------------------
        // akka.actor.deployment.<address>.router
         // --------------------------------
-        val router = addressConfig.getString("router", "direct") match {
+        val router: Routing = addressConfig.getString("router", "direct") match {
           case "direct"      ⇒ Direct
           case "round-robin" ⇒ RoundRobin
           case "random"      ⇒ Random
@@ -152,14 +137,21 @@ object Deployer {
           case "least-ram"      ⇒ LeastRAM
           case "least-messages" ⇒ LeastMessages
           case customRouterClassName ⇒
-            val customRouter = try {
-              Class.forName(customRouterClassName).newInstance.asInstanceOf[AnyRef]
-            } catch {
-              case e ⇒ throw new ConfigurationException(
+            createInstance[AnyRef](customRouterClassName, emptyParams, emptyArguments).fold(
+              e ⇒ throw new ConfigurationException(
                 "Config option [" + addressPath + ".router] needs to be one of " +
-                  "[\"direct\", \"round-robin\", \"random\", \"least-cpu\", \"least-ram\", \"least-messages\" or FQN of router class]")
-            }
-            CustomRouter(customRouter)
+                  "[\"direct\", \"round-robin\", \"random\", \"least-cpu\", \"least-ram\", \"least-messages\" or FQN of router class]", e),
+              CustomRouter(_))
+        }
+
+        val recipe: Option[ActorRecipe] = addressConfig.getSection("create-as") map { section ⇒
+          val implementationClass = section.getString("implementation-class") match {
+            case Some(impl) ⇒
+              getClassFor[Actor](impl).fold(e ⇒ throw new ConfigurationException("Config option [" + addressPath + ".create-as.implementation-class] load failed", e), identity)
+            case None ⇒ throw new ConfigurationException("Config option [" + addressPath + ".create-as.implementation-class] is missing")
+          }
+
+          ActorRecipe(implementationClass)
         }
 
         // --------------------------------
@@ -167,7 +159,7 @@
         // --------------------------------
         addressConfig.getSection("clustered") match {
           case None ⇒
-            Some(Deploy(address, router, Local)) // deploy locally
+            Some(Deploy(address, recipe, router, Local)) // deploy locally
 
           case Some(clusteredConfig) ⇒
@@ -227,7 +219,7 @@
             // --------------------------------
             clusteredConfig.getSection("replication") match {
               case None ⇒
-                Some(Deploy(address, router, Clustered(preferredNodes, replicationFactor, Transient)))
+                Some(Deploy(address, recipe, router, Clustered(preferredNodes, replicationFactor, Transient)))
 
               case Some(replicationConfig) ⇒
                 val storage = replicationConfig.getString("storage", "transaction-log") match {
@@ -246,17 +238,14 @@
                     ".clustered.replication.strategy] needs to be either [\"write-through\"] or [\"write-behind\"] - was [" + unknown + "]")
                 }
-                Some(Deploy(address, router, Clustered(preferredNodes, replicationFactor, Replication(storage, strategy))))
+                Some(Deploy(address, recipe, router, Clustered(preferredNodes, replicationFactor, Replication(storage, strategy))))
             }
         }
     }
   }
 
   private[akka] def throwDeploymentBoundException(deployment: Deploy): Nothing = {
-    val e = new DeploymentAlreadyBoundException(
-      "Address [" + deployment.address +
-        "] already bound to [" + deployment +
-        "]. You have to invoke 'undeploy(deployment) first.")
+    val e = new DeploymentAlreadyBoundException("Address [" + deployment.address + "] already bound to [" + deployment + "]")
     EventHandler.error(e, this, e.getMessage)
     throw e
   }
@@ -273,29 +262,30 @@
  *
  * @author Jonas Bonér
 */
-object LocalDeployer {
+object LocalDeployer extends ActorDeployer {
   private val deployments = new ConcurrentHashMap[String, Deploy]
 
-  private[akka] def init(deployments: List[Deploy]) {
+  private[akka] def init(deployments: Seq[Deploy]) {
     EventHandler.info(this, "Deploying actors locally [\n\t%s\n]" format deployments.mkString("\n\t"))
     deployments foreach (deploy(_)) // deploy
   }
 
   private[akka] def shutdown() {
-    undeployAll()
-    deployments.clear()
+    deployments.clear() //TODO do something else/more?
   }
 
   private[akka] def deploy(deployment: Deploy) {
-    if (deployments.putIfAbsent(deployment.address, deployment) != deployment) {
-      //Deployer.throwDeploymentBoundException(deployment) // FIXME uncomment this and fix the issue with multiple deployments
-    }
+    deployments.putIfAbsent(deployment.address, deployment) /* match {
+      case null ⇒
+        deployment match {
+          case Deploy(address, Some(recipe), routing, _) ⇒ Actor.actorOf(recipe.implementationClass, address).start() //FIXME use routing?
+          case _ ⇒
+        }
+      case `deployment` ⇒ //Already deployed TODO should it be like this?
+      case preexists    ⇒ Deployer.throwDeploymentBoundException(deployment)
+    }*/
   }
 
-  private[akka] def undeploy(deployment: Deploy): Unit = deployments.remove(deployment.address)
-
-  private[akka] def undeployAll(): Unit = deployments.clear()
-
   private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = Option(deployments.get(address))
 }
@@ -307,12 +297,13 @@ object LocalDeployer {
 object Address {
   private val validAddressPattern = java.util.regex.Pattern.compile("[0-9a-zA-Z\\-\\_\\$\\.]+")
 
-  def validate(address: String): Unit =
+  def validate(address: String) {
     if (!validAddressPattern.matcher(address).matches) {
       val e = new IllegalArgumentException("Address [" + address + "] is not valid, need to follow pattern: " + validAddressPattern.pattern)
       EventHandler.error(e, this, e.getMessage)
       throw e
     }
+  }
 }
 
 class DeploymentException private[akka] (message: String) extends AkkaException(message)
diff --git a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala
index 43d895d9a5..c7dd94bbdb 100644
--- a/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala
+++ b/akka-actor/src/main/scala/akka/actor/DeploymentConfig.scala
@@ -21,8 +21,16 @@ object DeploymentConfig {
   // --------------------------------
   case class Deploy(
     address: String,
+    recipe: Option[ActorRecipe],
     routing: Routing = Direct,
-    scope: Scope = Local)
+    scope: Scope = Local) {
+    Address.validate(address)
+  }
+
+  // --------------------------------
+  // --- Actor Recipe
+  // --------------------------------
+  case class ActorRecipe(implementationClass: Class[_ <: Actor]) //TODO Add ActorConfiguration here
 
   // --------------------------------
   // --- Routing
@@ -158,7 +166,7 @@ object DeploymentConfig {
   }
 
   def replicationSchemeFor(deployment: Deploy): Option[ReplicationScheme] = deployment match {
-    case Deploy(_, _, Clustered(_, _, replicationScheme)) ⇒ Some(replicationScheme)
+    case Deploy(_, _, _, Clustered(_, _, replicationScheme)) ⇒ Some(replicationScheme)
     case _ ⇒ None
   }
diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
index 3f0f33f01c..9f1311a349 100644
--- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
+++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
@@ -50,14 +50,7 @@ object ReflectiveAccess {
         None
     }
 
-    lazy val clusterDeployerInstance: Option[ClusterDeployer] = getObjectFor("akka.cluster.ClusterDeployer$") match {
-      case Right(value) ⇒ Some(value)
-      case Left(exception) ⇒
-        EventHandler.debug(this, exception.toString)
-        None
-    }
-
-    lazy val serializerClass: Option[Class[_]] = getClassFor("akka.serialization.Serializer") match {
+    lazy val clusterDeployerInstance: Option[ActorDeployer] = getObjectFor("akka.cluster.ClusterDeployer$") match {
       case Right(value) ⇒ Some(value)
       case Left(exception) ⇒
        EventHandler.debug(this, exception.toString)
        None
@@ -76,7 +69,7 @@ object ReflectiveAccess {
       clusterInstance.get.node
     }
 
-    lazy val clusterDeployer: ClusterDeployer = {
+    lazy val clusterDeployer: ActorDeployer = {
       ensureEnabled()
       clusterDeployerInstance.get
     }
@@ -86,15 +79,6 @@ object ReflectiveAccess {
       transactionLogInstance.get
     }
 
-    type ClusterDeployer = {
-      def init(deployments: List[Deploy])
-      def shutdown()
-      def deploy(deployment: Deploy)
-      def undeploy(deployment: Deploy)
-      def undeployAll()
-      def lookupDeploymentFor(address: String): Option[Deploy]
-    }
-
     type Cluster = {
       def node: ClusterNode
     }
@@ -104,12 +88,6 @@ object ReflectiveAccess {
       def dequeue: MessageInvocation
     }
 
-    // FIXME: remove?
-    type Serializer = {
-      def toBinary(obj: AnyRef): Array[Byte]
-      def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef
-    }
-
     type TransactionLogObject = {
       def newLogFor(
         id: String,
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index 1d3a984ff1..0e7157fa5e 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -1306,7 +1306,7 @@ class DefaultClusterNode private[akka] (
     if (actorAddress.isDefined) {
       // use 'preferred-nodes' in deployment config for the actor
       Deployer.deploymentFor(actorAddress.get) match {
-        case Deploy(_, _, Clustered(nodes, _, _)) ⇒
+        case Deploy(_, _, _, Clustered(nodes, _, _)) ⇒
           nodes map (node ⇒ DeploymentConfig.nodeNameFor(node)) take replicationFactor
         case _ ⇒
           throw new ClusterException("Actor [" + actorAddress.get + "] is not configured as clustered")
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
index 9e136e50c4..cdfad82f7c 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
@@ -114,18 +114,21 @@ class ClusterActorRef private[akka] (inetSocketAddresses: Array[Tuple2[UUID, Ine
   }
 
   def start(): this.type = synchronized[this.type] {
-    _status = ActorRefInternals.RUNNING
+    if (_status == ActorRefInternals.UNSTARTED) {
+      _status = ActorRefInternals.RUNNING
+      //TODO add this? Actor.registry.register(this)
+    }
     this
   }
 
   def stop() {
     synchronized {
       if (_status == ActorRefInternals.RUNNING) {
+        //TODO add this? Actor.registry.unregister(this)
         _status = ActorRefInternals.SHUTDOWN
         postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
 
         // FIXME here we need to fire off Actor.cluster.remove(address) (which needs to be properly implemented first, see ticket)
-
         inetSocketAddressToActorRefMap.get.values foreach (_.stop()) // shut down all remote connections
       }
     }
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala
index a0a5fa40f2..63229b4770 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala
@@ -4,8 +4,8 @@
 
 package akka.cluster
 
-import akka.actor.{ DeploymentConfig, Deployer, LocalDeployer, DeploymentException }
-import DeploymentConfig._
+import akka.actor.DeploymentConfig._
+import akka.actor._
 import akka.event.EventHandler
 import akka.config.Config
 import akka.util.Switch
@@ -17,12 +17,10 @@ import org.apache.zookeeper.recipes.lock.{ WriteLock, LockListener }
 
 import org.I0Itec.zkclient.exception.{ ZkNoNodeException, ZkNodeExistsException }
 
+import scala.collection.immutable.Seq
 import scala.collection.JavaConversions.collectionAsScalaIterable
 
-import com.eaio.uuid.UUID
-
 import java.util.concurrent.{ CountDownLatch, TimeUnit }
-import java.util.concurrent.atomic.AtomicReference
 
 /**
  * A ClusterDeployer is responsible for deploying a Deploy.
@@ -31,7 +29,7 @@ import java.util.concurrent.atomic.AtomicReference
  *
  * @author Jonas Bonér
 */
-object ClusterDeployer {
+object ClusterDeployer extends ActorDeployer {
   val clusterName = Cluster.name
   val nodeName = Config.nodename
   val clusterPath = "/%s" format clusterName
@@ -127,7 +125,7 @@ object ClusterDeployer {
     deployments
   }
 
-  private[akka] def init(deployments: List[Deploy]) {
+  private[akka] def init(deployments: Seq[Deploy]) {
     isConnected switchOn {
       EventHandler.info(this, "Initializing cluster deployer")
 
@@ -143,7 +141,7 @@ object ClusterDeployer {
         }
       }
 
-      val allDeployments = deployments ::: systemDeployments
+      val allDeployments = deployments ++ systemDeployments
 
      if (!isDeploymentCompletedInCluster) {
        if (deploymentInProgressLock.lock()) {
@@ -167,21 +165,20 @@ object ClusterDeployer {
     ensureRunning {
       LocalDeployer.deploy(deployment)
       deployment match {
-        case Deploy(_, _, Local) ⇒ {} // local deployment, do nothing here
-        case _ ⇒ // cluster deployment
-          val path = deploymentAddressPath.format(deployment.address)
+        case Deploy(_, _, _, Local) | Deploy(_, _, _, _: Local) ⇒ //TODO LocalDeployer.deploy(deployment)??
+        case Deploy(address, recipe, routing, _) ⇒ // cluster deployment
+          /*TODO recipe foreach { r ⇒
+            Deployer.newClusterActorRef(() ⇒ Actor.actorOf(r.implementationClass), address, deployment).start()
+          }*/
+          val path = deploymentAddressPath.format(address)
           try {
             ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
             zkClient.writeData(path, deployment)
           } catch {
             case e: NullPointerException ⇒
-              handleError(new DeploymentException(
-                "Could not store deployment data [" + deployment +
-                  "] in ZooKeeper since client session is closed"))
+              handleError(new DeploymentException("Could not store deployment data [" + deployment + "] in ZooKeeper since client session is closed"))
             case e: Exception ⇒
-              handleError(new DeploymentException(
-                "Could not store deployment data [" +
-                  deployment + "] in ZooKeeper due to: " + e))
+              handleError(new DeploymentException("Could not store deployment data [" + deployment + "] in ZooKeeper due to: " + e))
           }
       }
     }
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterTestNode.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterTestNode.scala
index 6393502377..bdc430ee6d 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterTestNode.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterTestNode.scala
@@ -13,6 +13,7 @@ import akka.util.Duration
 import System.{ currentTimeMillis ⇒ now }
 
 import java.io.File
+import akka.actor.Deployer
 
 trait MasterClusterTestNode extends WordSpec with MustMatchers with BeforeAndAfterAll {
   def testNodes: Int
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala
index 23aaaec5a4..c40a06b404 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala
@@ -44,26 +44,7 @@ class ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1 extends Cluste
         val actorRef = Actor.actorOf[HelloWorld]("hello-world").start()
         node.isInUseOnNode("hello-world") must be(true)
         actorRef.address must be("hello-world")
-        var counter = 0
-        (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
-        counter += 1
-        (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
-        counter += 1
-        (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
-        counter += 1
-        (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
-        counter += 1
-        (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
-        counter += 1
-        (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
-        counter += 1
-        (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
-        counter += 1
-        (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
-        counter += 1
-        (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
-        counter += 1
-        (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+        for (i ← 0 until 10) (actorRef ? Count(i)).as[String] must be(Some("World from node [node1]"))
       }
 
       barrier("start-node2", NrOfNodes) {
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.conf
index 42e57847b5..cc2fb1ef3b 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.conf
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.conf
@@ -2,7 +2,6 @@ akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "WARNING"
 akka.actor.deployment.hello-world.router = "direct"
 akka.actor.deployment.hello-world.clustered.replication-factor = 1
-
 akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
 akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"
 akka.cluster.replication.snapshot-frequency = 1000
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec.scala
index 2d95ae6047..97fbb1c79b 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec.scala
@@ -3,7 +3,6 @@
 */
 package akka.cluster.replication.transactionlog.writethrough.nosnapshot
 
-
 import akka.actor._
 import akka.cluster._
 import Cluster._
@@ -44,26 +43,8 @@ class ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1 extends Clust
         val actorRef = Actor.actorOf[HelloWorld]("hello-world").start()
         node.isInUseOnNode("hello-world") must be(true)
         actorRef.address must be("hello-world")
-        var counter = 0
-        (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
-        counter += 1
-        (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
-        counter += 1
-        (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
-        counter += 1
-        (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
-        counter += 1
-        (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
-        counter += 1
-        (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
-        counter += 1
-        (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
-        counter += 1
-        (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
-        counter += 1
-        (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
-        counter += 1
-        (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
+        for (i ← 0 until 10)
+          (actorRef ? Count(i)).as[String] must be(Some("World from node [node1]"))
       }
 
      barrier("start-node2", NrOfNodes) {
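Editorial note, not part of the patch: given the new "create-as" parsing added to Deployer.lookupInConfig above, a deployment section that binds an address to a recipe could plausibly be written in the flat akka.conf key style already used by the multi-jvm test configs in this diff. The "hello-world" address is taken from those configs; the "sample.HelloWorld" implementation class is a hypothetical placeholder:

    akka.actor.deployment.hello-world.router = "direct"
    akka.actor.deployment.hello-world.create-as.implementation-class = "sample.HelloWorld"
    akka.actor.deployment.hello-world.clustered.replication-factor = 1

With such a section present, Deployer.lookupInConfig("hello-world") should yield a Deploy whose recipe field is Some(ActorRecipe(...)) for the configured class, rather than None as in the no-recipe case.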
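Also editorial, not from the patch: because Deploy gains a recipe: Option[ActorRecipe] field in second position, every pattern match on it needs an extra binding or wildcard, as the Deploy(_, _, _, ...) changes throughout this diff show. A minimal sketch of matching on the widened case class, with a hypothetical describe helper (the only assumptions beyond the diff are the helper itself and the string labels):

    import akka.actor.DeploymentConfig._

    // Hypothetical helper, illustration only: inspect the new 4-field Deploy shape
    // (address, optional recipe, routing, scope).
    def describe(deploy: Deploy): String = deploy match {
      case Deploy(address, Some(recipe), _, Clustered(_, replicas, _)) ⇒
        "clustered actor [" + address + "] created from [" + recipe.implementationClass.getName +
          "] with replication factor [" + replicas.factor + "]"
      case Deploy(address, Some(recipe), _, _) ⇒
        "local actor [" + address + "] created from [" + recipe.implementationClass.getName + "]"
      case Deploy(address, None, _, _) ⇒
        "actor [" + address + "] deployed without a create-as recipe"
    }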