Merge commit

This commit is contained in:
Viktor Klang 2011-09-29 13:11:35 +02:00
commit a12ee36151
57 changed files with 596 additions and 275 deletions

View file

@ -517,16 +517,16 @@ class DefaultClusterNode private[akka] (
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](actorAddress: String, actorClass: Class[T], replicationFactor: Int, serializer: Serializer): ClusterNode =
store(actorAddress, () Actor.actorOf(actorClass, actorAddress), replicationFactor, Transient, false, serializer)
def store[T <: Actor](actorAddress: String, actorClass: Class[T], nrOfInstances: Int, serializer: Serializer): ClusterNode =
store(actorAddress, () Actor.actorOf(actorClass, actorAddress), nrOfInstances, Transient, false, serializer)
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](actorAddress: String, actorClass: Class[T], replicationFactor: Int, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode =
store(actorAddress, () Actor.actorOf(actorClass, actorAddress), replicationFactor, replicationScheme, false, serializer)
def store[T <: Actor](actorAddress: String, actorClass: Class[T], nrOfInstances: Int, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode =
store(actorAddress, () Actor.actorOf(actorClass, actorAddress), nrOfInstances, replicationScheme, false, serializer)
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
@ -549,16 +549,16 @@ class DefaultClusterNode private[akka] (
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](actorAddress: String, actorClass: Class[T], replicationFactor: Int, serializeMailbox: Boolean, serializer: Serializer): ClusterNode =
store(actorAddress, () Actor.actorOf(actorClass, actorAddress), replicationFactor, Transient, serializeMailbox, serializer)
def store[T <: Actor](actorAddress: String, actorClass: Class[T], nrOfInstances: Int, serializeMailbox: Boolean, serializer: Serializer): ClusterNode =
store(actorAddress, () Actor.actorOf(actorClass, actorAddress), nrOfInstances, Transient, serializeMailbox, serializer)
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](actorAddress: String, actorClass: Class[T], replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: Serializer): ClusterNode =
store(actorAddress, () Actor.actorOf(actorClass, actorAddress), replicationFactor, replicationScheme, serializeMailbox, serializer)
def store[T <: Actor](actorAddress: String, actorClass: Class[T], nrOfInstances: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: Serializer): ClusterNode =
store(actorAddress, () Actor.actorOf(actorClass, actorAddress), nrOfInstances, replicationScheme, serializeMailbox, serializer)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
@ -589,24 +589,24 @@ class DefaultClusterNode private[akka] (
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store(actorAddress: String, actorFactory: () ActorRef, replicationFactor: Int, serializer: Serializer): ClusterNode =
store(actorAddress, actorFactory, replicationFactor, Transient, false, serializer)
def store(actorAddress: String, actorFactory: () ActorRef, nrOfInstances: Int, serializer: Serializer): ClusterNode =
store(actorAddress, actorFactory, nrOfInstances, Transient, false, serializer)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store(actorAddress: String, actorFactory: () ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode =
store(actorAddress, actorFactory, replicationFactor, replicationScheme, false, serializer)
def store(actorAddress: String, actorFactory: () ActorRef, nrOfInstances: Int, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode =
store(actorAddress, actorFactory, nrOfInstances, replicationScheme, false, serializer)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store(actorAddress: String, actorFactory: () ActorRef, replicationFactor: Int, serializeMailbox: Boolean, serializer: Serializer): ClusterNode =
store(actorAddress, actorFactory, replicationFactor, Transient, serializeMailbox, serializer)
def store(actorAddress: String, actorFactory: () ActorRef, nrOfInstances: Int, serializeMailbox: Boolean, serializer: Serializer): ClusterNode =
store(actorAddress, actorFactory, nrOfInstances, Transient, serializeMailbox, serializer)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
@ -619,14 +619,14 @@ class DefaultClusterNode private[akka] (
/**
* Needed to have reflection through structural typing work.
*/
def store(actorAddress: String, actorFactory: () ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: AnyRef): ClusterNode =
store(actorAddress, actorFactory, replicationFactor, replicationScheme, serializeMailbox, serializer.asInstanceOf[Serializer])
def store(actorAddress: String, actorFactory: () ActorRef, nrOfInstances: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: AnyRef): ClusterNode =
store(actorAddress, actorFactory, nrOfInstances, replicationScheme, serializeMailbox, serializer.asInstanceOf[Serializer])
/**
* Needed to have reflection through structural typing work.
*/
def store(actorAddress: String, actorFactory: () ActorRef, replicationFactor: Int, serializeMailbox: Boolean, serializer: AnyRef): ClusterNode =
store(actorAddress, actorFactory, replicationFactor, Transient, serializeMailbox, serializer)
def store(actorAddress: String, actorFactory: () ActorRef, nrOfInstances: Int, serializeMailbox: Boolean, serializer: AnyRef): ClusterNode =
store(actorAddress, actorFactory, nrOfInstances, Transient, serializeMailbox, serializer)
/**
* Clusters an actor. If the actor is already clustered then the clustered version will be updated
@ -636,7 +636,7 @@ class DefaultClusterNode private[akka] (
def store(
actorAddress: String,
actorFactory: () ActorRef,
replicationFactor: Int,
nrOfInstances: Int,
replicationScheme: ReplicationScheme,
serializeMailbox: Boolean,
serializer: Serializer): ClusterNode = {
@ -686,7 +686,7 @@ class DefaultClusterNode private[akka] (
// create ADDRESS -> UUIDs mapping
ignore[ZkNodeExistsException](zkClient.createPersistent(actorAddressToUuidsPathFor(actorAddress)))
useActorOnNodes(nodesForReplicationFactor(replicationFactor, Some(actorAddress)).toArray, actorAddress)
useActorOnNodes(nodesForNrOfInstances(nrOfInstances, Some(actorAddress)).toArray, actorAddress)
this
}
@ -1025,9 +1025,9 @@ class DefaultClusterNode private[akka] (
// =======================================
/**
* Send a function 'Function0[Unit]' to be invoked on a random number of nodes (defined by 'replicationFactor' argument).
* Send a function 'Function0[Unit]' to be invoked on a random number of nodes (defined by 'nrOfInstances' argument).
*/
def send(f: Function0[Unit], replicationFactor: Int) {
def send(f: Function0[Unit], nrOfInstances: Int) {
Serialization.serialize(f) match {
case Left(error) throw error
case Right(bytes)
@ -1035,15 +1035,15 @@ class DefaultClusterNode private[akka] (
.setMessageType(FUNCTION_FUN0_UNIT)
.setPayload(ByteString.copyFrom(bytes))
.build
nodeConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message)
nodeConnectionsForNrOfInstances(nrOfInstances) foreach (_ ! message)
}
}
/**
* Send a function 'Function0[Any]' to be invoked on a random number of nodes (defined by 'replicationFactor' argument).
* Send a function 'Function0[Any]' to be invoked on a random number of nodes (defined by 'nrOfInstances' argument).
* Returns an 'Array' with all the 'Future's from the computation.
*/
def send(f: Function0[Any], replicationFactor: Int): List[Future[Any]] = {
def send(f: Function0[Any], nrOfInstances: Int): List[Future[Any]] = {
Serialization.serialize(f) match {
case Left(error) throw error
case Right(bytes)
@ -1051,16 +1051,16 @@ class DefaultClusterNode private[akka] (
.setMessageType(FUNCTION_FUN0_ANY)
.setPayload(ByteString.copyFrom(bytes))
.build
val results = nodeConnectionsForReplicationFactor(replicationFactor) map (_ ? message)
val results = nodeConnectionsForNrOfInstances(nrOfInstances) map (_ ? message)
results.toList.asInstanceOf[List[Future[Any]]]
}
}
/**
* Send a function 'Function1[Any, Unit]' to be invoked on a random number of nodes (defined by 'replicationFactor' argument)
* Send a function 'Function1[Any, Unit]' to be invoked on a random number of nodes (defined by 'nrOfInstances' argument)
* with the argument speficied.
*/
def send(f: Function1[Any, Unit], arg: Any, replicationFactor: Int) {
def send(f: Function1[Any, Unit], arg: Any, nrOfInstances: Int) {
Serialization.serialize((f, arg)) match {
case Left(error) throw error
case Right(bytes)
@ -1068,16 +1068,16 @@ class DefaultClusterNode private[akka] (
.setMessageType(FUNCTION_FUN1_ARG_UNIT)
.setPayload(ByteString.copyFrom(bytes))
.build
nodeConnectionsForReplicationFactor(replicationFactor) foreach (_ ! message)
nodeConnectionsForNrOfInstances(nrOfInstances) foreach (_ ! message)
}
}
/**
* Send a function 'Function1[Any, Any]' to be invoked on a random number of nodes (defined by 'replicationFactor' argument)
* Send a function 'Function1[Any, Any]' to be invoked on a random number of nodes (defined by 'nrOfInstances' argument)
* with the argument speficied.
* Returns an 'Array' with all the 'Future's from the computation.
*/
def send(f: Function1[Any, Any], arg: Any, replicationFactor: Int): List[Future[Any]] = {
def send(f: Function1[Any, Any], arg: Any, nrOfInstances: Int): List[Future[Any]] = {
Serialization.serialize((f, arg)) match {
case Left(error) throw error
case Right(bytes)
@ -1085,7 +1085,7 @@ class DefaultClusterNode private[akka] (
.setMessageType(FUNCTION_FUN1_ARG_ANY)
.setPayload(ByteString.copyFrom(bytes))
.build
val results = nodeConnectionsForReplicationFactor(replicationFactor) map (_ ? message)
val results = nodeConnectionsForNrOfInstances(nrOfInstances) map (_ ? message)
results.toList.asInstanceOf[List[Future[Any]]]
}
}
@ -1211,16 +1211,16 @@ class DefaultClusterNode private[akka] (
private[cluster] def actorAddressToUuidsPathFor(actorAddress: String, uuid: UUID): String = "%s/%s".format(actorAddressToUuidsPathFor(actorAddress), uuid)
/**
* Returns a random set with node names of size 'replicationFactor'.
* Default replicationFactor is 0, which returns the empty Set.
* Returns a random set with node names of size 'nrOfInstances'.
* Default nrOfInstances is 0, which returns the empty Set.
*/
private def nodesForReplicationFactor(replicationFactor: Int = 0, actorAddress: Option[String] = None): Set[String] = {
private def nodesForNrOfInstances(nrOfInstances: Int = 0, actorAddress: Option[String] = None): Set[String] = {
var replicaNames = Set.empty[String]
val nrOfClusterNodes = nodeConnections.get.connections.size
if (replicationFactor < 1) return replicaNames
if (nrOfClusterNodes < replicationFactor) throw new IllegalArgumentException(
"Replication factor [" + replicationFactor +
if (nrOfInstances < 1) return replicaNames
if (nrOfClusterNodes < nrOfInstances) throw new IllegalArgumentException(
"Replication factor [" + nrOfInstances +
"] is greater than the number of available nodeNames [" + nrOfClusterNodes + "]")
val preferredNodes =
@ -1228,7 +1228,7 @@ class DefaultClusterNode private[akka] (
// use 'preferred-nodes' in deployment config for the actor
Deployer.deploymentFor(actorAddress.get) match {
case Deploy(_, _, _, _, Cluster(nodes, _, _))
nodes map (node DeploymentConfig.nodeNameFor(node)) take replicationFactor
nodes map (node DeploymentConfig.nodeNameFor(node)) take nrOfInstances
case _
throw new ClusterException("Actor [" + actorAddress.get + "] is not configured as clustered")
}
@ -1243,11 +1243,11 @@ class DefaultClusterNode private[akka] (
val nrOfCurrentReplicaNames = replicaNames.size
val replicaSet =
if (nrOfCurrentReplicaNames > replicationFactor) throw new IllegalStateException("Replica set is larger than replication factor")
else if (nrOfCurrentReplicaNames == replicationFactor) replicaNames
if (nrOfCurrentReplicaNames > nrOfInstances) throw new IllegalStateException("Replica set is larger than replication factor")
else if (nrOfCurrentReplicaNames == nrOfInstances) replicaNames
else {
val random = new java.util.Random(System.currentTimeMillis)
while (replicaNames.size < replicationFactor) {
while (replicaNames.size < nrOfInstances) {
replicaNames = replicaNames + membershipNodes(random.nextInt(nrOfClusterNodes))
}
replicaNames
@ -1260,12 +1260,12 @@ class DefaultClusterNode private[akka] (
}
/**
* Returns a random set with replica connections of size 'replicationFactor'.
* Default replicationFactor is 0, which returns the empty Set.
* Returns a random set with replica connections of size 'nrOfInstances'.
* Default nrOfInstances is 0, which returns the empty Set.
*/
private def nodeConnectionsForReplicationFactor(replicationFactor: Int = 0, actorAddress: Option[String] = None): Set[ActorRef] = {
private def nodeConnectionsForNrOfInstances(nrOfInstances: Int = 0, actorAddress: Option[String] = None): Set[ActorRef] = {
for {
node nodesForReplicationFactor(replicationFactor, actorAddress)
node nodesForNrOfInstances(nrOfInstances, actorAddress)
connectionOption nodeConnections.get.connections(node)
connection connectionOption
actorRef connection._2