1. Changed the internal structure of cluster meta-data and how it is stored in ZooKeeper. Affected most of the cluster internals which have been rewritten to a large extent. Lots of code removed.

2. Fixed many issues and both known and hidden bugs in the migration code as well as other parts of the cluster functionality.
3. Made the node holding the ClusterActorRef being potentially part of the replica set for the actor it is representing.
4. Changed and cleaned up ClusterNode API, especially the ClusterNode.store methods.
5. Commented out ClusterNode.remove methods until we have a full story how to do removal
6. Renamed Peter's PeterExample test to a more descriptive name
7. Added round robin router test with 3 replicas
8. Rewrote migration tests to actually test correctly
9. Rewrote existing round robin router tests, now more solid
10. Misc improved logging and documentation and ScalaDoc

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-07-02 18:18:48 +02:00
parent 9297480ae1
commit 828f0355e1
31 changed files with 688 additions and 728 deletions

View file

@ -13,7 +13,7 @@ import ReflectiveAccess._
import akka.remoteinterface.RemoteSupport import akka.remoteinterface.RemoteSupport
import akka.japi.{ Creator, Procedure } import akka.japi.{ Creator, Procedure }
import akka.AkkaException import akka.AkkaException
import akka.serialization.{ Format, Serializer } import akka.serialization.{ Format, Serializer, Serialization }
import akka.cluster.ClusterNode import akka.cluster.ClusterNode
import akka.event.EventHandler import akka.event.EventHandler
import scala.collection.immutable.Stack import scala.collection.immutable.Stack
@ -464,12 +464,12 @@ object Actor extends ListenerManagement {
"] since " + reason) "] since " + reason)
val serializer: Serializer = val serializer: Serializer =
akka.serialization.Serialization.serializerFor(this.getClass).fold(x serializerErrorDueTo(x.toString), s s) Serialization.serializerFor(this.getClass).fold(x serializerErrorDueTo(x.toString), s s)
def storeActorAndGetClusterRef(replicationScheme: ReplicationScheme, serializer: Serializer): ActorRef = { def storeActorAndGetClusterRef(replicationScheme: ReplicationScheme, serializer: Serializer): ActorRef = {
// add actor to cluster registry (if not already added) // add actor to cluster registry (if not already added)
if (!cluster.isClustered(address)) if (!cluster.isClustered(address))
cluster.store(factory().start(), nrOfReplicas, replicationScheme, false, serializer) cluster.store(address, factory, nrOfReplicas, replicationScheme, false, serializer)
// remote node (not home node), check out as ClusterActorRef // remote node (not home node), check out as ClusterActorRef
cluster.ref(address, DeploymentConfig.routerTypeFor(router)) cluster.ref(address, DeploymentConfig.routerTypeFor(router))

View file

@ -95,17 +95,17 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag
/** /**
* Registers an actor in the Cluster ActorRegistry. * Registers an actor in the Cluster ActorRegistry.
*/ */
private[akka] def registerInCluster[T <: Actor]( // private[akka] def registerInCluster[T <: Actor](
address: String, actorRef: ActorRef, replicas: Int, serializeMailbox: Boolean = false)(implicit format: Serializer) { // address: String, actorRef: ActorRef, replicas: Int, serializeMailbox: Boolean = false)(implicit format: Serializer) {
ClusterModule.node.store(actorRef, replicas, serializeMailbox, format) // // FIXME: implement ActorRegistry.registerInCluster(..)
} // }
/** /**
* Unregisters an actor in the Cluster ActorRegistry. * Unregisters an actor in the Cluster ActorRegistry.
*/ */
private[akka] def unregisterInCluster(address: String) { // private[akka] def unregisterInCluster(address: String) {
ClusterModule.node.remove(address) // ClusterModule.node.remove(address)
} // }
/** /**
* Get the typed actor proxy for a given typed actor ref. * Get the typed actor proxy for a given typed actor ref.

View file

@ -125,8 +125,6 @@ trait ClusterNode {
val isConnected = new Switch(false) val isConnected = new Switch(false)
private[cluster] val locallyCachedMembershipNodes = new ConcurrentSkipListSet[String]() private[cluster] val locallyCachedMembershipNodes = new ConcurrentSkipListSet[String]()
private[cluster] val nodeNameToAddress: ConcurrentMap[String, InetSocketAddress] = new ConcurrentHashMap[String, InetSocketAddress]
private[cluster] val locallyCheckedOutActors: ConcurrentMap[UUID, Array[Byte]] = new ConcurrentHashMap[UUID, Array[Byte]]
def membershipNodes: Array[String] def membershipNodes: Array[String]
@ -173,49 +171,49 @@ trait ClusterNode {
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store. * available durable store.
*/ */
def store[T <: Actor](address: String, actorClass: Class[T], serializer: Serializer): ClusterNode def store[T <: Actor](actorAddress: String, actorClass: Class[T], serializer: Serializer): ClusterNode
/** /**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store. * available durable store.
*/ */
def store[T <: Actor](address: String, actorClass: Class[T], replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode def store[T <: Actor](actorAddress: String, actorClass: Class[T], replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode
/** /**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store. * available durable store.
*/ */
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, serializer: Serializer): ClusterNode def store[T <: Actor](actorAddress: String, actorClass: Class[T], replicationFactor: Int, serializer: Serializer): ClusterNode
/** /**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store. * available durable store.
*/ */
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode def store[T <: Actor](actorAddress: String, actorClass: Class[T], replicationFactor: Int, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode
/** /**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store. * available durable store.
*/ */
def store[T <: Actor](address: String, actorClass: Class[T], serializeMailbox: Boolean, serializer: Serializer): ClusterNode def store[T <: Actor](actorAddress: String, actorClass: Class[T], serializeMailbox: Boolean, serializer: Serializer): ClusterNode
/** /**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store. * available durable store.
*/ */
def store[T <: Actor](address: String, actorClass: Class[T], replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: Serializer): ClusterNode def store[T <: Actor](actorAddress: String, actorClass: Class[T], replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: Serializer): ClusterNode
/** /**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store. * available durable store.
*/ */
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, serializeMailbox: Boolean, serializer: Serializer): ClusterNode def store[T <: Actor](actorAddress: String, actorClass: Class[T], replicationFactor: Int, serializeMailbox: Boolean, serializer: Serializer): ClusterNode
/** /**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
@ -229,76 +227,75 @@ trait ClusterNode {
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store. * available durable store.
*/ */
def store(actorRef: ActorRef, serializer: Serializer): ClusterNode def store(actorAddress: String, actorFactory: () ActorRef, serializer: Serializer): ClusterNode
/** /**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store. * available durable store.
*/ */
def store(actorRef: ActorRef, serializeMailbox: Boolean, serializer: Serializer): ClusterNode def store(actorAddress: String, actorFactory: () ActorRef, serializeMailbox: Boolean, serializer: Serializer): ClusterNode
/** /**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store. * available durable store.
*/ */
def store(actorRef: ActorRef, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode def store(actorAddress: String, actorFactory: () ActorRef, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode
/** /**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store. * available durable store.
*/ */
def store(actorRef: ActorRef, replicationFactor: Int, serializer: Serializer): ClusterNode def store(actorAddress: String, actorFactory: () ActorRef, replicationFactor: Int, serializer: Serializer): ClusterNode
/** /**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store. * available durable store.
*/ */
def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode def store(actorAddress: String, actorFactory: () ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode
/** /**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store. * available durable store.
*/ */
def store(actorRef: ActorRef, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: Serializer): ClusterNode
/** /**
* Needed to have reflection through structural typing work. * Needed to have reflection through structural typing work.
*/ */
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, serializer: AnyRef): ClusterNode def store(actorAddress: String, actorFactory: () ActorRef, replicationFactor: Int, serializeMailbox: Boolean, serializer: AnyRef): ClusterNode
/** /**
* Needed to have reflection through structural typing work. * Needed to have reflection through structural typing work.
*/ */
def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: AnyRef): ClusterNode def store(actorAddress: String, actorFactory: () ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: AnyRef): ClusterNode
/** /**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store. * available durable store.
*/ */
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, serializer: Serializer): ClusterNode def store(actorAddress: String, actorFactory: () ActorRef, replicationFactor: Int, serializeMailbox: Boolean, serializer: Serializer): ClusterNode
/** /**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store. * available durable store.
*/ */
def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: Serializer): ClusterNode def store(actorAddress: String, actorFactory: () ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: Serializer): ClusterNode
/** /**
* Removes actor from the cluster. * Removes actor from the cluster.
*/ */
def remove(actorRef: ActorRef) // def remove(actorRef: ActorRef)
/** /**
* Removes actor with address from the cluster. * Removes actor with address from the cluster.
*/ */
def remove(address: String): ClusterNode // def remove(address: String): ClusterNode
/** /**
* Is the actor with uuid clustered or not? * Is the actor with uuid clustered or not?
@ -328,14 +325,14 @@ trait ClusterNode {
def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[ActorRef] def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[ActorRef]
/** /**
* Using (checking out) all actors with a specific UUID on all nodes in the cluster. * Using (checking out) actor on all nodes in the cluster.
*/ */
def useActorOnAllNodes(uuid: UUID) def useActorOnAllNodes(actorAddress: String)
/** /**
* Using (checking out) specific UUID on a specefic node. * Using (checking out) actor on a specific node.
*/ */
def useActorOnNode(node: String, uuid: UUID) def useActorOnNode(node: String, actorAddress: String)
/** /**
* Checks in an actor after done using it on this node. * Checks in an actor after done using it on this node.
@ -436,18 +433,20 @@ trait ClusterNode {
// =============== PRIVATE METHODS =============== // =============== PRIVATE METHODS ===============
// FIXME considering moving all these private[cluster] methods to a separate trait to get them out of the user's view
private[cluster] def remoteClientLifeCycleListener: ActorRef private[cluster] def remoteClientLifeCycleListener: ActorRef
private[cluster] def remoteDaemon: ActorRef private[cluster] def remoteDaemon: ActorRef
/** /**
* Removes actor with uuid from the cluster. * Removes actor with uuid from the cluster.
*/ */
private[cluster] def remove(uuid: UUID) // private[cluster] def remove(uuid: UUID)
/** /**
* Releases (checking in) all actors with a specific UUID on all nodes in the cluster where the actor is in 'use'. * Releases (checking in) all actors with a specific UUID on all nodes in the cluster where the actor is in 'use'.
*/ */
private[cluster] def releaseActorOnAllNodes(uuid: UUID) private[cluster] def releaseActorOnAllNodes(actorAddress: String)
/** /**
* Returns the UUIDs of all actors checked out on this node. * Returns the UUIDs of all actors checked out on this node.
@ -474,11 +473,6 @@ trait ClusterNode {
*/ */
private[cluster] def uuidsForActorAddress(actorAddress: String): Array[UUID] private[cluster] def uuidsForActorAddress(actorAddress: String): Array[UUID]
/**
* Returns the node names of all actors in use with UUID.
*/
private[cluster] def nodesForActorsInUseWithUuid(uuid: UUID): Array[String]
/** /**
* Returns the UUIDs of all actors in use registered on a specific node. * Returns the UUIDs of all actors in use registered on a specific node.
*/ */
@ -488,50 +482,43 @@ trait ClusterNode {
private[cluster] def publish(change: ChangeNotification) private[cluster] def publish(change: ChangeNotification)
private[cluster] def findFailedNodes(nodes: List[String]): List[String]
private[cluster] def findNewlyConnectedMembershipNodes(nodes: List[String]): List[String]
private[cluster] def findNewlyDisconnectedMembershipNodes(nodes: List[String]): List[String]
private[cluster] def findNewlyConnectedAvailableNodes(nodes: List[String]): List[String]
private[cluster] def findNewlyDisconnectedAvailableNodes(nodes: List[String]): List[String]
private[cluster] def joinCluster() private[cluster] def joinCluster()
private[cluster] def joinLeaderElection: Boolean private[cluster] def joinLeaderElection: Boolean
private[cluster] def failOverConnections(from: InetSocketAddress, to: InetSocketAddress) private[cluster] def failOverClusterActorRefConnections(from: InetSocketAddress, to: InetSocketAddress)
private[cluster] def migrateActorsOnFailedNodes(currentNodes: List[String]) private[cluster] def migrateActorsOnFailedNodes(
failedNodes: List[String],
currentClusterNodes: List[String],
oldClusterNodes: List[String],
disconnectedConnections: Map[String, InetSocketAddress])
private[cluster] def membershipPathFor(node: String): String private[cluster] def connectToAllNewlyArrivedMembershipNodesInCluster(
newlyConnectedMembershipNodes: Traversable[String],
private[cluster] def configurationPathFor(key: String): String newlyDisconnectedMembershipNodes: Traversable[String]): Map[String, InetSocketAddress]
private[cluster] def actorAddressToUuidsPathFor(actorAddress: String): String
private[cluster] def actorLocationsPathFor(uuid: UUID): String
private[cluster] def actorLocationsPathFor(uuid: UUID, node: NodeAddress): String
private[cluster] def actorsAtNodePathFor(node: String): String
private[cluster] def actorAtNodePathFor(node: String, uuid: UUID): String
private[cluster] def actorRegistryPathFor(uuid: UUID): String
private[cluster] def actorRegistrySerializerPathFor(uuid: UUID): String
private[cluster] def actorRegistryActorAddressPathFor(uuid: UUID): String
private[cluster] def actorRegistryNodePathFor(uuid: UUID): String
private[cluster] def actorRegistryNodePathFor(uuid: UUID, address: InetSocketAddress): String
private[cluster] def remoteSocketAddressForNode(node: String): Option[InetSocketAddress] private[cluster] def remoteSocketAddressForNode(node: String): Option[InetSocketAddress]
private[cluster] def createActorsAtAddressPath() private[cluster] def createActorsAtAddressPath()
private[cluster] def membershipPathFor(node: String): String
private[cluster] def configurationPathFor(key: String): String
private[cluster] def actorAddressToNodesPathFor(actorAddress: String): String
private[cluster] def actorAddressToNodesPathFor(actorAddress: String, nodeName: String): String
private[cluster] def nodeToUuidsPathFor(node: String): String
private[cluster] def nodeToUuidsPathFor(node: String, uuid: UUID): String
private[cluster] def actorAddressRegistryPathFor(actorAddress: String): String
private[cluster] def actorAddressRegistrySerializerPathFor(actorAddress: String): String
private[cluster] def actorAddressRegistryUuidPathFor(actorAddress: String): String
private[cluster] def actorUuidRegistryPathFor(uuid: UUID): String
private[cluster] def actorUuidRegistryNodePathFor(uuid: UUID): String
private[cluster] def actorUuidRegistryAddressPathFor(uuid: UUID): String
private[cluster] def actorAddressToUuidsPathFor(actorAddress: String): String
} }

View file

@ -102,6 +102,7 @@ object ReflectiveAccess {
def dequeue: MessageInvocation def dequeue: MessageInvocation
} }
// FIXME: remove?
type Serializer = { type Serializer = {
def toBinary(obj: AnyRef): Array[Byte] def toBinary(obj: AnyRef): Array[Byte]
def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef

File diff suppressed because it is too large Load diff

View file

@ -28,7 +28,6 @@ class ClusterActorRef private[akka] (
val address: String, val address: String,
_timeout: Long) _timeout: Long)
extends ActorRef with ScalaActorRef { this: Router.Router extends ActorRef with ScalaActorRef { this: Router.Router
timeout = _timeout timeout = _timeout
private[akka] val inetSocketAddressToActorRefMap = new AtomicReference[Map[InetSocketAddress, ActorRef]]( private[akka] val inetSocketAddressToActorRefMap = new AtomicReference[Map[InetSocketAddress, ActorRef]](

View file

@ -41,9 +41,9 @@ object ClusterDeployer {
val deploymentCoordinationPath = clusterPath + "/deployment-coordination" val deploymentCoordinationPath = clusterPath + "/deployment-coordination"
val deploymentInProgressLockPath = deploymentCoordinationPath + "/in-progress" val deploymentInProgressLockPath = deploymentCoordinationPath + "/in-progress"
val isDeploymentCompletedInClusterLockPath = deploymentCoordinationPath + "/completed" // should not be part of baseNodes val isDeploymentCompletedInClusterLockPath = deploymentCoordinationPath + "/completed" // should not be part of basePaths
val baseNodes = List(clusterPath, deploymentPath, deploymentCoordinationPath, deploymentInProgressLockPath) val basePaths = List(clusterPath, deploymentPath, deploymentCoordinationPath, deploymentInProgressLockPath)
private val isConnected = new Switch(false) private val isConnected = new Switch(false)
private val deploymentCompleted = new CountDownLatch(1) private val deploymentCompleted = new CountDownLatch(1)
@ -123,7 +123,7 @@ object ClusterDeployer {
val deployments = addresses map { address val deployments = addresses map { address
zkClient.readData(deploymentAddressPath.format(address)).asInstanceOf[Deploy] zkClient.readData(deploymentAddressPath.format(address)).asInstanceOf[Deploy]
} }
EventHandler.info(this, "Fetched clustered deployments [\n\t%s\n]" format deployments.mkString("\n\t")) EventHandler.info(this, "Fetched deployment plan from cluster [\n\t%s\n]" format deployments.mkString("\n\t"))
deployments deployments
} }
@ -131,10 +131,10 @@ object ClusterDeployer {
isConnected switchOn { isConnected switchOn {
EventHandler.info(this, "Initializing cluster deployer") EventHandler.info(this, "Initializing cluster deployer")
baseNodes foreach { path basePaths foreach { path
try { try {
ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT)) ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
EventHandler.debug(this, "Created node [%s]".format(path)) EventHandler.debug(this, "Created ZooKeeper path for deployment [%s]".format(path))
} catch { } catch {
case e case e
val error = new DeploymentException(e.toString) val error = new DeploymentException(e.toString)
@ -148,7 +148,7 @@ object ClusterDeployer {
if (!isDeploymentCompletedInCluster) { if (!isDeploymentCompletedInCluster) {
if (deploymentInProgressLock.lock()) { if (deploymentInProgressLock.lock()) {
// try to be the one doing the clustered deployment // try to be the one doing the clustered deployment
EventHandler.info(this, "Deploying to cluster [\n" + allDeployments.mkString("\n\t") + "\n]") EventHandler.info(this, "Pushing deployment plan cluster [\n\t" + allDeployments.mkString("\n\t") + "\n]")
allDeployments foreach (deploy(_)) // deploy allDeployments foreach (deploy(_)) // deploy
markDeploymentCompletedInCluster() markDeploymentCompletedInCluster()
deploymentInProgressLock.unlock() // signal deployment complete deploymentInProgressLock.unlock() // signal deployment complete

View file

@ -209,7 +209,7 @@ abstract class RemoteClient private[akka] (
senderFuture: Option[Promise[T]]): Option[Promise[T]] = { senderFuture: Option[Promise[T]]): Option[Promise[T]] = {
if (isRunning) { if (isRunning) {
EventHandler.debug(this, "Sending to connection [%s] message [%s]".format(remoteAddress, request)) EventHandler.debug(this, "Sending to connection [%s] message [\n%s]".format(remoteAddress, request))
if (request.getOneWay) { if (request.getOneWay) {
try { try {
@ -950,7 +950,7 @@ class RemoteServerHandler(
val address = actorInfo.getAddress val address = actorInfo.getAddress
EventHandler.debug(this, EventHandler.debug(this,
"Creating an remotely available actor for address [%s] on node [%s]" "Looking up a remotely available actor for address [%s] on node [%s]"
.format(address, Config.nodename)) .format(address, Config.nodename))
val actorRef = Actor.createActor(address, () createSessionActor(actorInfo, channel)) val actorRef = Actor.createActor(address, () createSessionActor(actorInfo, channel))

View file

@ -39,9 +39,9 @@ class NodeDisconnectedChangeListenerMultiJvmNode1 extends WordSpec with MustMatc
} }
latch.await(10, TimeUnit.SECONDS) must be === true latch.await(10, TimeUnit.SECONDS) must be === true
}
node.shutdown() node.shutdown()
}
} }
override def beforeAll() = { override def beforeAll() = {

View file

@ -9,16 +9,14 @@ import org.scalatest.matchers.MustMatchers
import org.scalatest.BeforeAndAfterAll import org.scalatest.BeforeAndAfterAll
import akka.actor._ import akka.actor._
import Actor._
import akka.cluster._ import akka.cluster._
import ChangeListener._
import Cluster._ import Cluster._
import DeploymentConfig._
import akka.config.Config import akka.config.Config
import akka.serialization.Serialization import akka.serialization.Serialization
import java.util.concurrent._ /**
* Tests automatic transparent migration of an actor from node1 to node2 and then from node2 to node3.
*/
object MigrationAutomaticMultiJvmSpec { object MigrationAutomaticMultiJvmSpec {
var NrOfNodes = 3 var NrOfNodes = 3
@ -37,19 +35,20 @@ class MigrationAutomaticMultiJvmNode1 extends WordSpec with MustMatchers {
"be able to migrate an actor from one node to another" in { "be able to migrate an actor from one node to another" in {
barrier("start-node3", NrOfNodes) {
}
barrier("start-node2", NrOfNodes) {
}
barrier("start-node1", NrOfNodes) { barrier("start-node1", NrOfNodes) {
node.start() node.start()
} }
barrier("store-actor-in-node1", NrOfNodes) { barrier("store-actor-in-node1", NrOfNodes) {
val serializer = Serialization.serializerFor(classOf[HelloWorld]).fold(x fail("No serializer found"), s s) val serializer = Serialization.serializerFor(classOf[HelloWorld]).fold(x fail("No serializer found"), s s)
node.store(actorOf[HelloWorld]("hello-world"), 1, serializer) node.store("hello-world", classOf[HelloWorld], 1, serializer)
node.isInUseOnNode("hello-world") must be(true)
val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry"))
actorRef.address must be("hello-world")
(actorRef ? "Hello").as[String].get must be("World from node [node1]")
}
barrier("start-node2", NrOfNodes) {
} }
node.shutdown() node.shutdown()
@ -66,22 +65,19 @@ class MigrationAutomaticMultiJvmNode2 extends WordSpec with MustMatchers with Be
"be able to migrate an actor from one node to another" in { "be able to migrate an actor from one node to another" in {
barrier("start-node3", NrOfNodes) {
}
barrier("start-node2", NrOfNodes) {
node.start()
}
barrier("start-node1", NrOfNodes) { barrier("start-node1", NrOfNodes) {
} }
barrier("store-actor-in-node1", NrOfNodes) { barrier("store-actor-in-node1", NrOfNodes) {
} }
Thread.sleep(2000) // wait for fail-over barrier("start-node2", NrOfNodes) {
node.start()
}
barrier("check-fail-over", NrOfNodes - 1) { Thread.sleep(2000) // wait for fail-over from node1 to node2
barrier("check-fail-over-to-node2", NrOfNodes - 1) {
// both remaining nodes should now have the replica // both remaining nodes should now have the replica
node.isInUseOnNode("hello-world") must be(true) node.isInUseOnNode("hello-world") must be(true)
val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry")) val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry"))
@ -89,6 +85,9 @@ class MigrationAutomaticMultiJvmNode2 extends WordSpec with MustMatchers with Be
(actorRef ? "Hello").as[String].get must be("World from node [node2]") (actorRef ? "Hello").as[String].get must be("World from node [node2]")
} }
barrier("start-node3", NrOfNodes - 1) {
}
node.shutdown() node.shutdown()
} }
} }
@ -101,22 +100,25 @@ class MigrationAutomaticMultiJvmNode3 extends WordSpec with MustMatchers with Be
"be able to migrate an actor from one node to another" in { "be able to migrate an actor from one node to another" in {
barrier("start-node3", NrOfNodes) {
node.start()
}
barrier("start-node2", NrOfNodes) {
}
barrier("start-node1", NrOfNodes) { barrier("start-node1", NrOfNodes) {
} }
barrier("store-actor-in-node1", NrOfNodes) { barrier("store-actor-in-node1", NrOfNodes) {
} }
Thread.sleep(2000) // wait for fail-over barrier("start-node2", NrOfNodes) {
}
barrier("check-fail-over", NrOfNodes - 1) { barrier("check-fail-over-to-node2", NrOfNodes - 1) {
}
barrier("start-node3", NrOfNodes - 1) {
node.start()
}
Thread.sleep(2000) // wait for fail-over from node2 to node3
barrier("check-fail-over-to-node3", NrOfNodes - 2) {
// both remaining nodes should now have the replica // both remaining nodes should now have the replica
node.isInUseOnNode("hello-world") must be(true) node.isInUseOnNode("hello-world") must be(true)
val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry")) val actorRef = Actor.registry.local.actorFor("hello-world").getOrElse(fail("Actor should have been in the local actor registry"))
@ -128,11 +130,12 @@ class MigrationAutomaticMultiJvmNode3 extends WordSpec with MustMatchers with Be
} }
} }
override def beforeAll() = { override def beforeAll() {
startLocalCluster() startLocalCluster()
} }
override def afterAll() = { override def afterAll() {
shutdownLocalCluster() shutdownLocalCluster()
} }
} }

View file

@ -45,7 +45,7 @@ class MigrationExplicitMultiJvmNode1 extends WordSpec with MustMatchers with Bef
barrier("store-1-in-node-1", NrOfNodes) { barrier("store-1-in-node-1", NrOfNodes) {
val serializer = Serialization.serializerFor(classOf[HelloWorld]).fold(x fail("No serializer found"), s s) val serializer = Serialization.serializerFor(classOf[HelloWorld]).fold(x fail("No serializer found"), s s)
node.store(actorOf[HelloWorld]("hello-world"), serializer) node.store("hello-world", classOf[HelloWorld], serializer)
} }
barrier("use-1-in-node-2", NrOfNodes) { barrier("use-1-in-node-2", NrOfNodes) {

View file

@ -56,7 +56,7 @@ class RegistryStoreMultiJvmNode1 extends WordSpec with MustMatchers with BeforeA
barrier("store-1-in-node-1", NrOfNodes) { barrier("store-1-in-node-1", NrOfNodes) {
val serializer = Serialization.serializerFor(classOf[HelloWorld1]).fold(x fail("No serializer found"), s s) val serializer = Serialization.serializerFor(classOf[HelloWorld1]).fold(x fail("No serializer found"), s s)
node.store(actorOf[HelloWorld1]("hello-world-1"), serializer) node.store("hello-world-1", classOf[HelloWorld1], serializer)
} }
barrier("use-1-in-node-2", NrOfNodes) { barrier("use-1-in-node-2", NrOfNodes) {
@ -70,20 +70,6 @@ class RegistryStoreMultiJvmNode1 extends WordSpec with MustMatchers with BeforeA
barrier("use-2-in-node-2", NrOfNodes) { barrier("use-2-in-node-2", NrOfNodes) {
} }
barrier("store-3-in-node-1", NrOfNodes) {
val serializer = Serialization.serializerFor(classOf[HelloWorld2]).fold(x fail("No serializer found"), s s)
val actor = actorOf[HelloWorld2]("hello-world-3").start
actor ! "Hello"
actor ! "Hello"
actor ! "Hello"
actor ! "Hello"
actor ! "Hello"
node.store(actor, true, serializer)
}
barrier("use-3-in-node-2", NrOfNodes) {
}
node.shutdown() node.shutdown()
} }
} }
@ -137,19 +123,6 @@ class RegistryStoreMultiJvmNode2 extends WordSpec with MustMatchers {
(actorRef ? "Hello").as[String].get must be("World from node [node2]") (actorRef ? "Hello").as[String].get must be("World from node [node2]")
} }
barrier("store-3-in-node-1", NrOfNodes) {
}
barrier("use-3-in-node-2", NrOfNodes) {
val actorOrOption = node.use("hello-world-3")
if (actorOrOption.isEmpty) fail("Actor could not be retrieved")
val actorRef = actorOrOption.get
actorRef.address must be("hello-world-3")
(actorRef ? ("Count", 30000)).as[Int].get must be >= (2) // be conservative - can by 5 but also 2 if slow system
}
node.shutdown() node.shutdown()
} }
} }

View file

@ -1,75 +0,0 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.cluster.multijvmtestsample
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.BeforeAndAfterAll
import akka.cluster._
object SampleMultiJvmSpec {
val NrOfNodes = 2
}
class SampleMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
import SampleMultiJvmSpec._
override def beforeAll() = {
Cluster.startLocalCluster()
}
override def afterAll() = {
Cluster.shutdownLocalCluster()
}
def resetCluster(): Unit = {
import akka.cluster.zookeeper._
import akka.util.Helpers.ignore
import org.I0Itec.zkclient.exception.ZkNoNodeException
val zkClient = Cluster.newZkClient
ignore[ZkNoNodeException](zkClient.deleteRecursive("/" + Cluster.name))
ignore[ZkNoNodeException](zkClient.deleteRecursive(ZooKeeperBarrier.BarriersNode))
zkClient.close
}
"A cluster" must {
"have jvm options" in {
System.getProperty("akka.cluster.nodename", "") must be("node1")
System.getProperty("akka.cluster.port", "") must be("9991")
akka.config.Config.config.getString("test.name", "") must be("node1")
}
"be able to start all nodes" in {
Cluster.barrier("start", NrOfNodes) {
Cluster.node.start()
}
Cluster.node.isRunning must be(true)
Cluster.node.shutdown()
}
}
}
class SampleMultiJvmNode2 extends WordSpec with MustMatchers {
import SampleMultiJvmSpec._
"A cluster" must {
"have jvm options" in {
System.getProperty("akka.cluster.nodename", "") must be("node2")
System.getProperty("akka.cluster.port", "") must be("9992")
akka.config.Config.config.getString("test.name", "") must be("node2")
}
"be able to start all nodes" in {
Cluster.barrier("start", NrOfNodes) {
Cluster.node.start()
}
Cluster.node.isRunning must be(true)
Cluster.node.shutdown()
}
}
}

View file

@ -1 +0,0 @@
-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992

View file

@ -8,19 +8,16 @@ import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers import org.scalatest.matchers.MustMatchers
import org.scalatest.BeforeAndAfterAll import org.scalatest.BeforeAndAfterAll
import org.apache.bookkeeper.client.{ BookKeeper, BKException }
import BKException._
import akka.cluster._ import akka.cluster._
import Cluster._
import akka.actor._ import akka.actor._
import akka.actor.Actor._ import akka.actor.Actor._
import akka.config.Config import akka.config.Config
/** /**
* todo: What is the main purpose of this test? * Test that if a single node is used with a round robin router with replication factor then the actor is instantiated on the single node.
*/ */
object RoundRobin1ReplicaMultiJvmSpec { object RoundRobin1ReplicaMultiJvmSpec {
val NrOfNodes = 2
class HelloWorld extends Actor with Serializable { class HelloWorld extends Actor with Serializable {
def receive = { def receive = {
@ -30,80 +27,33 @@ object RoundRobin1ReplicaMultiJvmSpec {
} }
} }
/**
* This node makes use of the remote actor and
*/
class RoundRobin1ReplicaMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll { class RoundRobin1ReplicaMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
import RoundRobin1ReplicaMultiJvmSpec._ import RoundRobin1ReplicaMultiJvmSpec._
private var bookKeeper: BookKeeper = _
// private var localBookKeeper: LocalBookKeeper = _
"A cluster" must { "A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in { "create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in {
System.getProperty("akka.cluster.nodename", "") must be("node1") node.start()
System.getProperty("akka.cluster.port", "") must be("9991")
Cluster.barrier("start-node1", NrOfNodes) {
Cluster.node.start()
}
Cluster.barrier("start-node2", NrOfNodes) {}
Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) {}
Cluster.barrier("send-message-from-node2-to-node1", NrOfNodes) {}
Cluster.node.shutdown()
}
}
override def beforeAll() = {
Cluster.startLocalCluster()
// LocalBookKeeperEnsemble.start()
}
override def afterAll() = {
Cluster.shutdownLocalCluster()
// TransactionLog.shutdown()
// LocalBookKeeperEnsemble.shutdown()
}
}
/**
* This node checks if the basic behavior of the actor is working correctly.
*/
class RoundRobin1ReplicaMultiJvmNode2 extends WordSpec with MustMatchers {
import RoundRobin1ReplicaMultiJvmSpec._
"A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in {
System.getProperty("akka.cluster.nodename", "") must be("node2")
System.getProperty("akka.cluster.port", "") must be("9992")
Cluster.barrier("start-node1", NrOfNodes) {}
Cluster.barrier("start-node2", NrOfNodes) {
Cluster.node.start()
}
var hello: ActorRef = null var hello: ActorRef = null
Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) { hello = Actor.actorOf[HelloWorld]("service-hello")
hello = Actor.actorOf[HelloWorld]("service-hello") hello must not equal (null)
hello must not equal (null) hello.address must equal("service-hello")
hello.address must equal("service-hello") hello.isInstanceOf[ClusterActorRef] must be(true)
hello.isInstanceOf[ClusterActorRef] must be(true)
}
Cluster.barrier("send-message-from-node2-to-node1", NrOfNodes) { hello must not equal (null)
hello must not equal (null) val reply = (hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1"))
val reply = (hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1")) reply must equal("World from node [node1]")
reply must equal("World from node [node1]")
}
Cluster.node.shutdown() node.shutdown()
} }
} }
override def beforeAll() {
startLocalCluster()
}
override def afterAll() {
shutdownLocalCluster()
}
} }

View file

@ -8,10 +8,8 @@ import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers import org.scalatest.matchers.MustMatchers
import org.scalatest.BeforeAndAfterAll import org.scalatest.BeforeAndAfterAll
import org.apache.bookkeeper.client.{ BookKeeper, BKException }
import BKException._
import akka.cluster._ import akka.cluster._
import Cluster._
import akka.actor._ import akka.actor._
import akka.actor.Actor._ import akka.actor.Actor._
import akka.config.Config import akka.config.Config
@ -21,12 +19,11 @@ import akka.config.Config
* for running actors, or will it be just a 'client' talking to the cluster. * for running actors, or will it be just a 'client' talking to the cluster.
*/ */
object RoundRobin2ReplicasMultiJvmSpec { object RoundRobin2ReplicasMultiJvmSpec {
val NrOfNodes = 3 val NrOfNodes = 2
class HelloWorld extends Actor with Serializable { class HelloWorld extends Actor with Serializable {
def receive = { def receive = {
case "Hello" case "Hello"
println("Received message on [" + Config.nodename + "]")
self.reply("World from node [" + Config.nodename + "]") self.reply("World from node [" + Config.nodename + "]")
} }
} }
@ -38,9 +35,6 @@ object RoundRobin2ReplicasMultiJvmSpec {
class RoundRobin2ReplicasMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll { class RoundRobin2ReplicasMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
import RoundRobin2ReplicasMultiJvmSpec._ import RoundRobin2ReplicasMultiJvmSpec._
private var bookKeeper: BookKeeper = _
private var localBookKeeper: LocalBookKeeper = _
"A cluster" must { "A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in { "create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in {
@ -48,35 +42,32 @@ class RoundRobin2ReplicasMultiJvmNode1 extends WordSpec with MustMatchers with B
System.getProperty("akka.cluster.port", "") must be("9991") System.getProperty("akka.cluster.port", "") must be("9991")
//wait till node 1 has started. //wait till node 1 has started.
Cluster.barrier("start-node1", NrOfNodes) { barrier("start-node1", NrOfNodes) {
Cluster.node.start() node.start()
} }
//wait till ndoe 2 has started. //wait till ndoe 2 has started.
Cluster.barrier("start-node2", NrOfNodes) {} barrier("start-node2", NrOfNodes) {}
//wait till node 3 has started. //wait till node 3 has started.
Cluster.barrier("start-node3", NrOfNodes) {} barrier("start-node3", NrOfNodes) {}
//wait till an actor reference on node 2 has become available. //wait till an actor reference on node 2 has become available.
Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) {} barrier("get-ref-to-actor-on-node2", NrOfNodes) {}
//wait till the node 2 has send a message to the replica's. //wait till the node 2 has send a message to the replica's.
Cluster.barrier("send-message-from-node2-to-replicas", NrOfNodes) {} barrier("send-message-from-node2-to-replicas", NrOfNodes) {}
Cluster.node.shutdown() node.shutdown()
} }
} }
override def beforeAll() = { override def beforeAll() {
Cluster.startLocalCluster() startLocalCluster()
LocalBookKeeperEnsemble.start()
} }
override def afterAll() = { override def afterAll() {
Cluster.shutdownLocalCluster() shutdownLocalCluster()
TransactionLog.shutdown()
LocalBookKeeperEnsemble.shutdown()
} }
} }
@ -90,26 +81,26 @@ class RoundRobin2ReplicasMultiJvmNode2 extends WordSpec with MustMatchers {
System.getProperty("akka.cluster.port", "") must be("9992") System.getProperty("akka.cluster.port", "") must be("9992")
//wait till node 1 has started. //wait till node 1 has started.
Cluster.barrier("start-node1", NrOfNodes) {} barrier("start-node1", NrOfNodes) {}
//wait till node 2 has started. //wait till node 2 has started.
Cluster.barrier("start-node2", NrOfNodes) { barrier("start-node2", NrOfNodes) {
Cluster.node.start() node.start()
} }
//wait till node 3 has started. //wait till node 3 has started.
Cluster.barrier("start-node3", NrOfNodes) {} barrier("start-node3", NrOfNodes) {}
//check if the actorRef is the expected remoteActorRef. //check if the actorRef is the expected remoteActorRef.
var hello: ActorRef = null var hello: ActorRef = null
Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) { barrier("get-ref-to-actor-on-node2", NrOfNodes) {
hello = Actor.actorOf[HelloWorld]("service-hello") hello = Actor.actorOf[HelloWorld]("service-hello")
hello must not equal (null) hello must not equal (null)
hello.address must equal("service-hello") hello.address must equal("service-hello")
hello.isInstanceOf[ClusterActorRef] must be(true) hello.isInstanceOf[ClusterActorRef] must be(true)
} }
Cluster.barrier("send-message-from-node2-to-replicas", NrOfNodes) { barrier("send-message-from-node2-to-replicas", NrOfNodes) {
//todo: is there a reason to check for null again since it already has been done in the previous block. //todo: is there a reason to check for null again since it already has been done in the previous block.
hello must not equal (null) hello must not equal (null)
@ -120,45 +111,19 @@ class RoundRobin2ReplicasMultiJvmNode2 extends WordSpec with MustMatchers {
} }
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1"))) count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node3"))) count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node2")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1"))) count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node3"))) count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node2")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1"))) count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node3"))) count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node2")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1"))) count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node3"))) count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node2")))
replies("World from node [node1]") must equal(4) replies("World from node [node1]") must equal(4)
replies("World from node [node3]") must equal(4) replies("World from node [node2]") must equal(4)
} }
Cluster.node.shutdown() node.shutdown()
}
}
}
class RoundRobin2ReplicasMultiJvmNode3 extends WordSpec with MustMatchers {
import RoundRobin2ReplicasMultiJvmSpec._
"A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in {
System.getProperty("akka.cluster.nodename", "") must be("node3")
System.getProperty("akka.cluster.port", "") must be("9993")
Cluster.barrier("start-node1", NrOfNodes) {}
Cluster.barrier("start-node2", NrOfNodes) {}
Cluster.barrier("start-node3", NrOfNodes) {
Cluster.node.start()
}
Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) {}
Cluster.barrier("send-message-from-node2-to-replicas", NrOfNodes) {}
Cluster.node.shutdown()
} }
} }
} }

View file

@ -1,5 +1,5 @@
akka.event-handler-level = "DEBUG" akka.event-handler-level = "DEBUG"
akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.home = "node:node1" akka.actor.deployment.service-hello.clustered.home = "node:node1"
akka.actor.deployment.service-hello.clustered.replicas = 2 akka.actor.deployment.service-hello.clustered.replicas = 3
akka.actor.deployment.service-hello.clustered.stateless = on akka.actor.deployment.service-hello.clustered.stateless = on

View file

@ -1,4 +1,5 @@
akka.event-handler-level = "DEBUG" akka.event-handler-level = "DEBUG"
akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.home = "node:node1" akka.actor.deployment.service-hello.clustered.home = "node:node1"
akka.actor.deployment.service-hello.clustered.replicas = 1 akka.actor.deployment.service-hello.clustered.replicas = 3
akka.actor.deployment.service-hello.clustered.stateless = on

View file

@ -0,0 +1,5 @@
akka.event-handler-level = "DEBUG"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.home = "node:node1"
akka.actor.deployment.service-hello.clustered.replicas = 3
akka.actor.deployment.service-hello.clustered.stateless = on

View file

@ -0,0 +1,155 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.cluster.routing.roundrobin_3_replicas
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.BeforeAndAfterAll
import akka.cluster._
import akka.actor._
import akka.actor.Actor._
import akka.config.Config
import Cluster._
/**
* When a MultiJvmNode is started, will it automatically be part of the cluster (so will it automatically be eligible
* for running actors, or will it be just a 'client' talking to the cluster.
*/
object RoundRobin3ReplicasMultiJvmSpec {
val NrOfNodes = 3
class HelloWorld extends Actor with Serializable {
def receive = {
case "Hello"
self.reply("World from node [" + Config.nodename + "]")
}
}
}
/**
* What is the purpose of this node? Is this just a node for the cluster to make use of?
*/
class RoundRobin3ReplicasMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
import RoundRobin3ReplicasMultiJvmSpec._
"A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in {
//wait till node 1 has started.
barrier("start-node1", NrOfNodes) {
node.start()
}
//wait till ndoe 2 has started.
barrier("start-node2", NrOfNodes) {}
//wait till node 3 has started.
barrier("start-node3", NrOfNodes) {}
//wait till an actor reference on node 2 has become available.
barrier("get-ref-to-actor-on-node2", NrOfNodes) {}
//wait till the node 2 has send a message to the replica's.
barrier("send-message-from-node2-to-replicas", NrOfNodes) {}
node.shutdown()
}
}
override def beforeAll() {
startLocalCluster()
}
override def afterAll() {
shutdownLocalCluster()
}
}
class RoundRobin3ReplicasMultiJvmNode2 extends WordSpec with MustMatchers {
import RoundRobin3ReplicasMultiJvmSpec._
import Cluster._
"A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in {
//wait till node 1 has started.
barrier("start-node1", NrOfNodes) {}
//wait till node 2 has started.
barrier("start-node2", NrOfNodes) {
node.start()
}
//wait till node 3 has started.
barrier("start-node3", NrOfNodes) {}
//check if the actorRef is the expected remoteActorRef.
var hello: ActorRef = null
barrier("get-ref-to-actor-on-node2", NrOfNodes) {
hello = Actor.actorOf[HelloWorld]("service-hello")
hello must not equal (null)
hello.address must equal("service-hello")
hello.isInstanceOf[ClusterActorRef] must be(true)
}
barrier("send-message-from-node2-to-replicas", NrOfNodes) {
//todo: is there a reason to check for null again since it already has been done in the previous block.
hello must not equal (null)
val replies = collection.mutable.Map.empty[String, Int]
def count(reply: String) = {
if (replies.get(reply).isEmpty) replies.put(reply, 1)
else replies.put(reply, replies(reply) + 1)
}
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node2")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node3")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node2")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node3")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node2")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node3")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node2")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node3")))
replies("World from node [node1]") must equal(4)
replies("World from node [node2]") must equal(4)
replies("World from node [node3]") must equal(4)
}
node.shutdown()
}
}
}
class RoundRobin3ReplicasMultiJvmNode3 extends WordSpec with MustMatchers {
import RoundRobin3ReplicasMultiJvmSpec._
import Cluster._
"A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in {
barrier("start-node1", NrOfNodes) {}
barrier("start-node2", NrOfNodes) {}
barrier("start-node3", NrOfNodes) {
node.start()
}
barrier("get-ref-to-actor-on-node2", NrOfNodes) {}
barrier("send-message-from-node2-to-replicas", NrOfNodes) {}
node.shutdown()
}
}
}

View file

@ -1,4 +1,4 @@
package akka.cluster.routing.peterexample package akka.cluster.routing.use_homenode_as_replica
import org.scalatest.matchers.MustMatchers import org.scalatest.matchers.MustMatchers
import akka.config.Config import akka.config.Config
@ -6,15 +6,11 @@ import org.scalatest.{ BeforeAndAfterAll, WordSpec }
import akka.cluster.Cluster import akka.cluster.Cluster
import akka.actor.{ ActorRef, Actor } import akka.actor.{ ActorRef, Actor }
object PeterExampleMultiJvmSpec { object UseHomeNodeAsReplicaMultiJvmSpec {
val NrOfNodes = 2 val NrOfNodes = 2
class HelloWorld extends Actor with Serializable { class HelloWorld extends Actor with Serializable {
println("---------------------------------------------------------------------------")
println("HelloWorldActor has been created on node [" + Config.nodename + "]")
println("---------------------------------------------------------------------------")
def receive = { def receive = {
case x: String { case x: String {
println("Hello message was received") println("Hello message was received")
@ -34,13 +30,12 @@ class TestNode extends WordSpec with MustMatchers with BeforeAndAfterAll {
} }
} }
class PeterExampleMultiJvmNode1 extends TestNode { class UseHomeNodeAsReplicaMultiJvmNode1 extends TestNode {
import PeterExampleMultiJvmSpec._ import UseHomeNodeAsReplicaMultiJvmSpec._
"foo" must { "foo" must {
"bla" in { "bla" in {
/*
println("Node 1 has started") println("Node 1 has started")
Cluster.barrier("start-node1", NrOfNodes) { Cluster.barrier("start-node1", NrOfNodes) {
@ -55,20 +50,16 @@ class PeterExampleMultiJvmNode1 extends TestNode {
hello = Actor.actorOf[HelloWorld]("service-hello") hello = Actor.actorOf[HelloWorld]("service-hello")
} }
println("Successfully acquired reference")
println("Saying hello to actor") println("Saying hello to actor")
hello ! "say hello" hello ! "say hello"
Cluster.node.shutdown() */ Cluster.node.shutdown()
} }
} }
} }
class PeterExampleMultiJvmNode2 extends WordSpec with MustMatchers with BeforeAndAfterAll { class UseHomeNodeAsReplicaMultiJvmNode2 extends WordSpec with MustMatchers with BeforeAndAfterAll {
import PeterExampleMultiJvmSpec._
/*
import UseHomeNodeAsReplicaMultiJvmSpec._
"foo" must { "foo" must {
"bla" in { "bla" in {
println("Waiting for Node 1 to start") println("Waiting for Node 1 to start")
@ -84,5 +75,5 @@ class PeterExampleMultiJvmNode2 extends WordSpec with MustMatchers with BeforeAn
println("Shutting down JVM Node 2") println("Shutting down JVM Node 2")
Cluster.node.shutdown() Cluster.node.shutdown()
} }
} */ }
} }