Fixed clustered management of actor serializer.
Various renames and refactorings. Changed all internal usages of 'actorOf' to 'localActorOf'. Signed-off-by: Jonas Bonér <jonasremove@jonasboner.com>
This commit is contained in:
parent
1ad99bd610
commit
4d31751793
13 changed files with 200 additions and 303 deletions
|
|
@ -251,7 +251,7 @@ class ActorRefSpec extends WordSpec with MustMatchers {
|
|||
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
|
||||
(intercept[java.lang.IllegalStateException] {
|
||||
in.readObject
|
||||
}).getMessage must be === "Trying to deserialize ActorRef (" + expectedSerializedRepresentation + ") but it's not found in the local registry and remoting is not enabled!"
|
||||
}).getMessage must be === "Trying to deserialize ActorRef [" + expectedSerializedRepresentation + "] but it's not found in the local registry and remoting is not enabled."
|
||||
}
|
||||
|
||||
"support nested actorOfs" in {
|
||||
|
|
|
|||
|
|
@ -426,7 +426,7 @@ object Actor extends ListenerManagement {
|
|||
"] since " + reason)
|
||||
|
||||
val serializer: Serializer =
|
||||
akka.serialization.Serialization.getSerializer(this.getClass).fold(x ⇒ serializerErrorDueTo(x.toString), s ⇒ s)
|
||||
akka.serialization.Serialization.serializerFor(this.getClass).fold(x ⇒ serializerErrorDueTo(x.toString), s ⇒ s)
|
||||
|
||||
def storeActorAndGetClusterRef(replicationScheme: ReplicationScheme, serializer: Serializer): ActorRef = {
|
||||
// add actor to cluster registry (if not already added)
|
||||
|
|
|
|||
|
|
@ -461,7 +461,7 @@ class LocalActorRef private[akka] (
|
|||
"]")
|
||||
|
||||
private val serializer: Serializer =
|
||||
akka.serialization.Serialization.getSerializer(this.getClass).fold(x ⇒ serializerErrorDueTo(x.toString), s ⇒ s)
|
||||
akka.serialization.Serialization.serializerFor(this.getClass).fold(x ⇒ serializerErrorDueTo(x.toString), s ⇒ s)
|
||||
|
||||
private lazy val replicationStorage: Either[TransactionLog, AnyRef] = {
|
||||
replicationScheme match {
|
||||
|
|
@ -787,17 +787,19 @@ class LocalActorRef private[akka] (
|
|||
true
|
||||
|
||||
case _ ⇒ // either permanent or none where default is permanent
|
||||
val success = try {
|
||||
performRestart()
|
||||
true
|
||||
} catch {
|
||||
case e ⇒
|
||||
EventHandler.error(e, this, "Exception in restart of Actor [%s]".format(toString))
|
||||
false // an error or exception here should trigger a retry
|
||||
}
|
||||
finally {
|
||||
currentMessage = null
|
||||
}
|
||||
val success =
|
||||
try {
|
||||
performRestart()
|
||||
true
|
||||
} catch {
|
||||
case e ⇒
|
||||
EventHandler.error(e, this, "Exception in restart of Actor [%s]".format(toString))
|
||||
false // an error or exception here should trigger a retry
|
||||
}
|
||||
finally {
|
||||
currentMessage = null
|
||||
}
|
||||
|
||||
if (success) {
|
||||
_status = ActorRefInternals.RUNNING
|
||||
dispatcher.resume(this)
|
||||
|
|
@ -1220,11 +1222,11 @@ trait ScalaActorRef extends ActorRefShared with ForwardableChannel { ref: ActorR
|
|||
def reply_?(message: Any): Boolean = channel.safe_!(message)(this)
|
||||
}
|
||||
|
||||
case class SerializedActorRef(val uuid: Uuid,
|
||||
val address: String,
|
||||
val hostname: String,
|
||||
val port: Int,
|
||||
val timeout: Long) {
|
||||
case class SerializedActorRef(uuid: Uuid,
|
||||
address: String,
|
||||
hostname: String,
|
||||
port: Int,
|
||||
timeout: Long) {
|
||||
@throws(classOf[java.io.ObjectStreamException])
|
||||
def readResolve(): AnyRef = Actor.registry.local.actorFor(uuid) match {
|
||||
case Some(actor) ⇒ actor
|
||||
|
|
@ -1233,7 +1235,7 @@ case class SerializedActorRef(val uuid: Uuid,
|
|||
RemoteActorRef(new InetSocketAddress(hostname, port), address, timeout, None)
|
||||
else
|
||||
throw new IllegalStateException(
|
||||
"Trying to deserialize ActorRef (" + this +
|
||||
") but it's not found in the local registry and remoting is not enabled!")
|
||||
"Trying to deserialize ActorRef [" + this +
|
||||
"] but it's not found in the local registry and remoting is not enabled.")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -104,7 +104,7 @@ sealed class Supervisor(handler: FaultHandlingStrategy, maxRestartsHandler: (Act
|
|||
private val _childActors = new ConcurrentHashMap[String, List[ActorRef]]
|
||||
private val _childSupervisors = new CopyOnWriteArrayList[Supervisor]
|
||||
|
||||
private[akka] val supervisor = actorOf(new SupervisorActor(handler, maxRestartsHandler)).start()
|
||||
private[akka] val supervisor = localActorOf(new SupervisorActor(handler, maxRestartsHandler)).start()
|
||||
|
||||
def uuid = supervisor.uuid
|
||||
|
||||
|
|
|
|||
|
|
@ -139,7 +139,6 @@ trait ClusterNode {
|
|||
def remoteServerAddress: InetSocketAddress
|
||||
|
||||
val isConnected = new Switch(false)
|
||||
val isLeader = new AtomicBoolean(false)
|
||||
val electionNumber = new AtomicInteger(Int.MaxValue)
|
||||
|
||||
private[cluster] val locallyCachedMembershipNodes = new ConcurrentSkipListSet[String]()
|
||||
|
|
@ -168,6 +167,11 @@ trait ClusterNode {
|
|||
*/
|
||||
def leader: String
|
||||
|
||||
/**
|
||||
* Returns true if 'this' node is the current leader.
|
||||
*/
|
||||
def isLeader: Boolean
|
||||
|
||||
/**
|
||||
* Explicitly resign from being a leader. If this node is not a leader then this operation is a no-op.
|
||||
*/
|
||||
|
|
@ -178,122 +182,122 @@ trait ClusterNode {
|
|||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], format: Serializer): ClusterNode
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], serializer: Serializer): ClusterNode
|
||||
|
||||
/**
|
||||
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationScheme: ReplicationScheme, format: Serializer): ClusterNode
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode
|
||||
|
||||
/**
|
||||
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, format: Serializer): ClusterNode
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, serializer: Serializer): ClusterNode
|
||||
|
||||
/**
|
||||
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationScheme: ReplicationScheme, format: Serializer): ClusterNode
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode
|
||||
|
||||
/**
|
||||
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], serializeMailbox: Boolean, format: Serializer): ClusterNode
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], serializeMailbox: Boolean, serializer: Serializer): ClusterNode
|
||||
|
||||
/**
|
||||
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: Serializer): ClusterNode
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: Serializer): ClusterNode
|
||||
|
||||
/**
|
||||
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, serializeMailbox: Boolean, format: Serializer): ClusterNode
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, serializeMailbox: Boolean, serializer: Serializer): ClusterNode
|
||||
|
||||
/**
|
||||
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: Serializer): ClusterNode
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: Serializer): ClusterNode
|
||||
|
||||
/**
|
||||
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store(actorRef: ActorRef, format: Serializer): ClusterNode
|
||||
def store(actorRef: ActorRef, serializer: Serializer): ClusterNode
|
||||
|
||||
/**
|
||||
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store(actorRef: ActorRef, replicationScheme: ReplicationScheme, format: Serializer): ClusterNode
|
||||
def store(actorRef: ActorRef, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode
|
||||
|
||||
/**
|
||||
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store(actorRef: ActorRef, replicationFactor: Int, format: Serializer): ClusterNode
|
||||
def store(actorRef: ActorRef, replicationFactor: Int, serializer: Serializer): ClusterNode
|
||||
|
||||
/**
|
||||
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, format: Serializer): ClusterNode
|
||||
def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode
|
||||
|
||||
/**
|
||||
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store(actorRef: ActorRef, serializeMailbox: Boolean, format: Serializer): ClusterNode
|
||||
def store(actorRef: ActorRef, serializeMailbox: Boolean, serializer: Serializer): ClusterNode
|
||||
|
||||
/**
|
||||
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store(actorRef: ActorRef, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: Serializer): ClusterNode
|
||||
def store(actorRef: ActorRef, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: Serializer): ClusterNode
|
||||
|
||||
/**
|
||||
* Needed to have reflection through structural typing work.
|
||||
*/
|
||||
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, format: AnyRef): ClusterNode
|
||||
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, serializer: AnyRef): ClusterNode
|
||||
|
||||
/**
|
||||
* Needed to have reflection through structural typing work.
|
||||
*/
|
||||
def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: AnyRef): ClusterNode
|
||||
def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: AnyRef): ClusterNode
|
||||
|
||||
/**
|
||||
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, format: Serializer): ClusterNode
|
||||
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, serializer: Serializer): ClusterNode
|
||||
|
||||
/**
|
||||
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: Serializer): ClusterNode
|
||||
def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: Serializer): ClusterNode
|
||||
|
||||
/**
|
||||
* Removes actor with uuid from the cluster.
|
||||
|
|
@ -330,7 +334,7 @@ trait ClusterNode {
|
|||
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
|
||||
* for remote access through lookup by its UUID.
|
||||
*/
|
||||
def use[T <: Actor](actorAddress: String, format: Serializer): Option[ActorRef]
|
||||
def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[ActorRef]
|
||||
|
||||
/**
|
||||
* Using (checking out) all actors with a specific UUID on all nodes in the cluster.
|
||||
|
|
@ -418,9 +422,9 @@ trait ClusterNode {
|
|||
def addressesForActorsInUseOnNode(nodeName: String): Array[String]
|
||||
|
||||
/**
|
||||
* Returns Format for actor with UUID.
|
||||
* Returns Serializer for actor with UUID.
|
||||
*/
|
||||
def formatForActor(actorAddress: String): Serializer
|
||||
def serializerForActor(actorAddress: String): Serializer
|
||||
|
||||
/**
|
||||
* Returns home address for actor with UUID.
|
||||
|
|
@ -478,9 +482,9 @@ trait ClusterNode {
|
|||
|
||||
private[cluster] def findNewlyDisconnectedAvailableNodes(nodes: List[String]): List[String]
|
||||
|
||||
private[cluster] def joinMembershipNode()
|
||||
private[cluster] def joinMembershipPath()
|
||||
|
||||
private[cluster] def joinActorsAtAddressNode()
|
||||
private[cluster] def joinActorsAtAddressPath()
|
||||
|
||||
private[cluster] def joinLeaderElection: Boolean
|
||||
|
||||
|
|
@ -504,7 +508,7 @@ trait ClusterNode {
|
|||
|
||||
private[cluster] def actorRegistryPathFor(uuid: UUID): String
|
||||
|
||||
private[cluster] def actorRegistryFormatPathFor(uuid: UUID): String
|
||||
private[cluster] def actorRegistrySerializerPathFor(uuid: UUID): String
|
||||
|
||||
private[cluster] def actorRegistryActorAddressPathFor(uuid: UUID): String
|
||||
|
||||
|
|
|
|||
|
|
@ -169,7 +169,7 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule
|
|||
val eventHandler: ActorRef = {
|
||||
implicit object format extends StatelessActorFormat[RemoteEventHandler]
|
||||
val clazz = classOf[RemoteEventHandler]
|
||||
val handler = Actor.actorOf(clazz, clazz.getName).start()
|
||||
val handler = Actor.localActorOf(clazz, clazz.getName).start()
|
||||
// add the remote client and server listener that pipes the events to the event handler system
|
||||
addListener(handler)
|
||||
handler
|
||||
|
|
|
|||
|
|
@ -134,7 +134,7 @@ object Routing {
|
|||
* Creates a LoadBalancer from the thunk-supplied InfiniteIterator.
|
||||
*/
|
||||
def loadBalancerActor(actors: ⇒ InfiniteIterator[ActorRef]): ActorRef =
|
||||
actorOf(new Actor with LoadBalancer {
|
||||
localActorOf(new Actor with LoadBalancer {
|
||||
val seq = actors
|
||||
}).start()
|
||||
|
||||
|
|
@ -142,7 +142,7 @@ object Routing {
|
|||
* Creates a Router given a routing and a message-transforming function.
|
||||
*/
|
||||
def routerActor(routing: PF[Any, ActorRef], msgTransformer: (Any) ⇒ Any): ActorRef =
|
||||
actorOf(new Actor with Router {
|
||||
localActorOf(new Actor with Router {
|
||||
override def transform(msg: Any) = msgTransformer(msg)
|
||||
def routes = routing
|
||||
}).start()
|
||||
|
|
@ -150,7 +150,7 @@ object Routing {
|
|||
/**
|
||||
* Creates a Router given a routing.
|
||||
*/
|
||||
def routerActor(routing: PF[Any, ActorRef]): ActorRef = actorOf(new Actor with Router {
|
||||
def routerActor(routing: PF[Any, ActorRef]): ActorRef = localActorOf(new Actor with Router {
|
||||
def routes = routing
|
||||
}).start()
|
||||
|
||||
|
|
|
|||
|
|
@ -1,29 +1,34 @@
|
|||
package akka.serialization
|
||||
|
||||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.serialization
|
||||
|
||||
import akka.util.ReflectiveAccess._
|
||||
import akka.config.Config
|
||||
import akka.config.Config._
|
||||
import akka.actor.{ ActorRef, Actor }
|
||||
import akka.AkkaException
|
||||
|
||||
/**
|
||||
* Serialization module. Contains methods for serialization and deserialization as well as
|
||||
* locating a Serializer for a particular class as defined in the mapping in the 'akka.conf' file.
|
||||
*/
|
||||
object Serialization {
|
||||
case class NoSerializerFoundException(m: String) extends Exception(m)
|
||||
case class NoSerializerFoundException(m: String) extends AkkaException(m)
|
||||
|
||||
def serialize(o: AnyRef): Either[Exception, Array[Byte]] =
|
||||
getSerializer(o.getClass).fold((ex) ⇒ Left(ex), (ser) ⇒ Right(ser.toBinary(o)))
|
||||
serializerFor(o.getClass).fold((ex) ⇒ Left(ex), (ser) ⇒ Right(ser.toBinary(o)))
|
||||
|
||||
def deserialize(
|
||||
bytes: Array[Byte],
|
||||
clazz: Class[_],
|
||||
classLoader: Option[ClassLoader]): Either[Exception, AnyRef] =
|
||||
getSerializer(clazz)
|
||||
serializerFor(clazz)
|
||||
.fold((ex) ⇒ Left(ex),
|
||||
(ser) ⇒ Right(ser.fromBinary(bytes, Some(clazz), classLoader)))
|
||||
|
||||
def getSerializer(clazz: Class[_]): Either[Exception, Serializer] = {
|
||||
def serializerFor(clazz: Class[_]): Either[Exception, Serializer] = {
|
||||
Config.serializerMap.get(clazz.getName) match {
|
||||
case Some(serializerName: String) ⇒
|
||||
getClassFor(serializerName) match {
|
||||
|
|
@ -31,14 +36,14 @@ object Serialization {
|
|||
case Left(exception) ⇒ Left(exception)
|
||||
}
|
||||
case _ ⇒
|
||||
getDefaultSerializer match {
|
||||
defaultSerializer match {
|
||||
case Some(s: Serializer) ⇒ Right(s)
|
||||
case None ⇒ Left(NoSerializerFoundException("No default serializer found for " + clazz))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def getDefaultSerializer = {
|
||||
private def defaultSerializer = {
|
||||
Config.serializers.get("default") match {
|
||||
case Some(ser: String) ⇒
|
||||
getClassFor(ser) match {
|
||||
|
|
|
|||
|
|
@ -32,8 +32,8 @@ private[camel] object TypedCamel {
|
|||
* and re-uses the <code>activationTracker</code> of <code>service</code>.
|
||||
*/
|
||||
def onCamelServiceStart(service: CamelService) {
|
||||
consumerPublisher = actorOf(new TypedConsumerPublisher(service.activationTracker))
|
||||
publishRequestor = actorOf(new TypedConsumerPublishRequestor)
|
||||
consumerPublisher = localActorOf(new TypedConsumerPublisher(service.activationTracker))
|
||||
publishRequestor = localActorOf(new TypedConsumerPublishRequestor)
|
||||
|
||||
registerPublishRequestor
|
||||
|
||||
|
|
|
|||
|
|
@ -26,9 +26,9 @@ import TypedCamelAccess._
|
|||
* @author Martin Krasser
|
||||
*/
|
||||
trait CamelService extends Bootable {
|
||||
private[camel] val activationTracker = actorOf(new ActivationTracker)
|
||||
private[camel] val consumerPublisher = actorOf(new ConsumerPublisher(activationTracker))
|
||||
private[camel] val publishRequestor = actorOf(new ConsumerPublishRequestor)
|
||||
private[camel] val activationTracker = localActorOf(new ActivationTracker)
|
||||
private[camel] val consumerPublisher = localActorOf(new ConsumerPublisher(activationTracker))
|
||||
private[camel] val publishRequestor = localActorOf(new ConsumerPublishRequestor)
|
||||
|
||||
private val serviceEnabled = config.getList("akka.enabled-modules").exists(_ == "camel")
|
||||
|
||||
|
|
|
|||
|
|
@ -312,25 +312,25 @@ class DefaultClusterNode private[akka] (
|
|||
lazy val remoteServerAddress: InetSocketAddress = remoteService.address
|
||||
|
||||
// static nodes
|
||||
val CLUSTER_NODE = "/" + nodeAddress.clusterName
|
||||
val MEMBERSHIP_NODE = CLUSTER_NODE + "/members"
|
||||
val CONFIGURATION_NODE = CLUSTER_NODE + "/config"
|
||||
val PROVISIONING_NODE = CLUSTER_NODE + "/provisioning"
|
||||
val ACTOR_REGISTRY_NODE = CLUSTER_NODE + "/actor-registry"
|
||||
val ACTOR_LOCATIONS_NODE = CLUSTER_NODE + "/actor-locations"
|
||||
val ACTOR_ADDRESS_TO_UUIDS_NODE = CLUSTER_NODE + "/actor-address-to-uuids"
|
||||
val ACTORS_AT_NODE_NODE = CLUSTER_NODE + "/actors-at-address"
|
||||
val baseNodes = List(
|
||||
CLUSTER_NODE,
|
||||
MEMBERSHIP_NODE,
|
||||
ACTOR_REGISTRY_NODE,
|
||||
ACTOR_LOCATIONS_NODE,
|
||||
ACTORS_AT_NODE_NODE,
|
||||
ACTOR_ADDRESS_TO_UUIDS_NODE,
|
||||
CONFIGURATION_NODE,
|
||||
PROVISIONING_NODE)
|
||||
val CLUSTER_PATH = "/" + nodeAddress.clusterName
|
||||
val MEMBERSHIP_PATH = CLUSTER_PATH + "/members"
|
||||
val CONFIGURATION_PATH = CLUSTER_PATH + "/config"
|
||||
val PROVISIONING_PATH = CLUSTER_PATH + "/provisioning"
|
||||
val ACTOR_REGISTRY_PATH = CLUSTER_PATH + "/actor-registry"
|
||||
val ACTOR_LOCATIONS_PATH = CLUSTER_PATH + "/actor-locations"
|
||||
val ACTOR_ADDRESS_TO_UUIDS_PATH = CLUSTER_PATH + "/actor-address-to-uuids"
|
||||
val ACTORS_AT_PATH_PATH = CLUSTER_PATH + "/actors-at-address"
|
||||
val basePaths = List(
|
||||
CLUSTER_PATH,
|
||||
MEMBERSHIP_PATH,
|
||||
ACTOR_REGISTRY_PATH,
|
||||
ACTOR_LOCATIONS_PATH,
|
||||
ACTORS_AT_PATH_PATH,
|
||||
ACTOR_ADDRESS_TO_UUIDS_PATH,
|
||||
CONFIGURATION_PATH,
|
||||
PROVISIONING_PATH)
|
||||
|
||||
val LEADER_ELECTION_NODE = CLUSTER_NODE + "/leader" // should NOT be part of 'baseNodes' only used by 'leaderLock'
|
||||
val LEADER_ELECTION_PATH = CLUSTER_PATH + "/leader" // should NOT be part of 'basePaths' only used by 'leaderLock'
|
||||
|
||||
private val membershipNodePath = membershipPathFor(nodeAddress.nodeName)
|
||||
|
||||
|
|
@ -355,27 +355,18 @@ class DefaultClusterNode private[akka] (
|
|||
lazy private[cluster] val leaderElectionCallback = new LockListener {
|
||||
override def lockAcquired() {
|
||||
EventHandler.info(this, "Node [%s] is the new leader".format(self.nodeAddress.nodeName))
|
||||
self.isLeader.set(true)
|
||||
self.publish(NewLeader(self.nodeAddress.nodeName))
|
||||
}
|
||||
|
||||
override def lockReleased() {
|
||||
EventHandler.info(this,
|
||||
"Node [%s] is *NOT* the leader anymore".format(self.nodeAddress.nodeName))
|
||||
self.isLeader.set(false)
|
||||
// self.publish(Cluster.LeaderChange)
|
||||
EventHandler.info(this, "Node [%s] is *NOT* the leader anymore".format(self.nodeAddress.nodeName))
|
||||
}
|
||||
}
|
||||
|
||||
lazy private[cluster] val leaderLock = new WriteLock(
|
||||
zkClient.connection.getZookeeper, LEADER_ELECTION_NODE, null, leaderElectionCallback) {
|
||||
|
||||
// ugly hack, but what do you do? <--- haha epic
|
||||
private val ownerIdField = classOf[WriteLock].getDeclaredField("ownerId")
|
||||
ownerIdField.setAccessible(true)
|
||||
|
||||
def leader: String = ownerIdField.get(this).asInstanceOf[String]
|
||||
}
|
||||
zkClient.connection.getZookeeper,
|
||||
LEADER_ELECTION_PATH, null,
|
||||
leaderElectionCallback)
|
||||
|
||||
if (enableJMX) createMBean
|
||||
|
||||
|
|
@ -449,15 +440,20 @@ class DefaultClusterNode private[akka] (
|
|||
// =======================================
|
||||
|
||||
/**
|
||||
* Returns the name of the current leader.
|
||||
* Returns the name of the current leader lock.
|
||||
*/
|
||||
def leader: String = leaderLock.leader
|
||||
def leader: String = leaderLock.getId
|
||||
|
||||
/**
|
||||
* Returns true if 'this' node is the current leader.
|
||||
*/
|
||||
def isLeader: Boolean = leaderLock.isOwner
|
||||
|
||||
/**
|
||||
* Explicitly resign from being a leader. If this node is not a leader then this operation is a no-op.
|
||||
*/
|
||||
def resign() {
|
||||
if (isLeader.get) leaderLock.unlock()
|
||||
if (isLeader) leaderLock.unlock()
|
||||
}
|
||||
|
||||
// =======================================
|
||||
|
|
@ -469,132 +465,132 @@ class DefaultClusterNode private[akka] (
|
|||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], format: Serializer): ClusterNode =
|
||||
store(Actor.actorOf(actorClass, address).start, 0, Transient, false, format)
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], serializer: Serializer): ClusterNode =
|
||||
store(Actor.actorOf(actorClass, address).start, 0, Transient, false, serializer)
|
||||
|
||||
/**
|
||||
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationScheme: ReplicationScheme, format: Serializer): ClusterNode =
|
||||
store(Actor.actorOf(actorClass, address).start, 0, replicationScheme, false, format)
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode =
|
||||
store(Actor.actorOf(actorClass, address).start, 0, replicationScheme, false, serializer)
|
||||
|
||||
/**
|
||||
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, format: Serializer): ClusterNode =
|
||||
store(Actor.actorOf(actorClass, address).start, replicationFactor, Transient, false, format)
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, serializer: Serializer): ClusterNode =
|
||||
store(Actor.actorOf(actorClass, address).start, replicationFactor, Transient, false, serializer)
|
||||
|
||||
/**
|
||||
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationScheme: ReplicationScheme, format: Serializer): ClusterNode =
|
||||
store(Actor.actorOf(actorClass, address).start, replicationFactor, replicationScheme, false, format)
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode =
|
||||
store(Actor.actorOf(actorClass, address).start, replicationFactor, replicationScheme, false, serializer)
|
||||
|
||||
/**
|
||||
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], serializeMailbox: Boolean, format: Serializer): ClusterNode =
|
||||
store(Actor.actorOf(actorClass, address).start, 0, Transient, serializeMailbox, format)
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], serializeMailbox: Boolean, serializer: Serializer): ClusterNode =
|
||||
store(Actor.actorOf(actorClass, address).start, 0, Transient, serializeMailbox, serializer)
|
||||
|
||||
/**
|
||||
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: Serializer): ClusterNode =
|
||||
store(Actor.actorOf(actorClass, address).start, 0, replicationScheme, serializeMailbox, format)
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: Serializer): ClusterNode =
|
||||
store(Actor.actorOf(actorClass, address).start, 0, replicationScheme, serializeMailbox, serializer)
|
||||
|
||||
/**
|
||||
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, serializeMailbox: Boolean, format: Serializer): ClusterNode =
|
||||
store(Actor.actorOf(actorClass, address).start, replicationFactor, Transient, serializeMailbox, format)
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, serializeMailbox: Boolean, serializer: Serializer): ClusterNode =
|
||||
store(Actor.actorOf(actorClass, address).start, replicationFactor, Transient, serializeMailbox, serializer)
|
||||
|
||||
/**
|
||||
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: Serializer): ClusterNode =
|
||||
store(Actor.actorOf(actorClass, address).start, replicationFactor, replicationScheme, serializeMailbox, format)
|
||||
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: Serializer): ClusterNode =
|
||||
store(Actor.actorOf(actorClass, address).start, replicationFactor, replicationScheme, serializeMailbox, serializer)
|
||||
|
||||
/**
|
||||
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store(actorRef: ActorRef, format: Serializer): ClusterNode =
|
||||
store(actorRef, 0, Transient, false, format)
|
||||
def store(actorRef: ActorRef, serializer: Serializer): ClusterNode =
|
||||
store(actorRef, 0, Transient, false, serializer)
|
||||
|
||||
/**
|
||||
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store(actorRef: ActorRef, replicationScheme: ReplicationScheme, format: Serializer): ClusterNode =
|
||||
store(actorRef, 0, replicationScheme, false, format)
|
||||
def store(actorRef: ActorRef, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode =
|
||||
store(actorRef, 0, replicationScheme, false, serializer)
|
||||
|
||||
/**
|
||||
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store(actorRef: ActorRef, replicationFactor: Int, format: Serializer): ClusterNode =
|
||||
store(actorRef, replicationFactor, Transient, false, format)
|
||||
def store(actorRef: ActorRef, replicationFactor: Int, serializer: Serializer): ClusterNode =
|
||||
store(actorRef, replicationFactor, Transient, false, serializer)
|
||||
|
||||
/**
|
||||
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, format: Serializer): ClusterNode =
|
||||
store(actorRef, replicationFactor, replicationScheme, false, format)
|
||||
def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode =
|
||||
store(actorRef, replicationFactor, replicationScheme, false, serializer)
|
||||
|
||||
/**
|
||||
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store(actorRef: ActorRef, serializeMailbox: Boolean, format: Serializer): ClusterNode =
|
||||
store(actorRef, 0, Transient, serializeMailbox, format)
|
||||
def store(actorRef: ActorRef, serializeMailbox: Boolean, serializer: Serializer): ClusterNode =
|
||||
store(actorRef, 0, Transient, serializeMailbox, serializer)
|
||||
|
||||
/**
|
||||
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, format: Serializer): ClusterNode =
|
||||
store(actorRef, replicationFactor, Transient, serializeMailbox, format)
|
||||
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, serializer: Serializer): ClusterNode =
|
||||
store(actorRef, replicationFactor, Transient, serializeMailbox, serializer)
|
||||
|
||||
/**
|
||||
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
|
||||
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
|
||||
* available durable store.
|
||||
*/
|
||||
def store(actorRef: ActorRef, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: Serializer): ClusterNode =
|
||||
store(actorRef, 0, replicationScheme, serializeMailbox, format)
|
||||
def store(actorRef: ActorRef, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: Serializer): ClusterNode =
|
||||
store(actorRef, 0, replicationScheme, serializeMailbox, serializer)
|
||||
|
||||
/**
|
||||
* Needed to have reflection through structural typing work.
|
||||
*/
|
||||
def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: AnyRef): ClusterNode =
|
||||
store(actorRef, replicationFactor, replicationScheme, serializeMailbox, format.asInstanceOf[Serializer])
|
||||
def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: AnyRef): ClusterNode =
|
||||
store(actorRef, replicationFactor, replicationScheme, serializeMailbox, serializer.asInstanceOf[Serializer])
|
||||
|
||||
/**
|
||||
* Needed to have reflection through structural typing work.
|
||||
*/
|
||||
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, format: AnyRef): ClusterNode =
|
||||
store(actorRef, replicationFactor, Transient, serializeMailbox, format)
|
||||
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, serializer: AnyRef): ClusterNode =
|
||||
store(actorRef, replicationFactor, Transient, serializeMailbox, serializer)
|
||||
|
||||
/**
|
||||
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
|
||||
|
|
@ -606,13 +602,15 @@ class DefaultClusterNode private[akka] (
|
|||
replicationFactor: Int,
|
||||
replicationScheme: ReplicationScheme,
|
||||
serializeMailbox: Boolean,
|
||||
format: Serializer): ClusterNode = if (isConnected.isOn) {
|
||||
serializer: Serializer): ClusterNode = if (isConnected.isOn) {
|
||||
|
||||
import akka.serialization.ActorSerialization._
|
||||
|
||||
if (!actorRef.isInstanceOf[LocalActorRef]) throw new IllegalArgumentException(
|
||||
"'actorRef' must be an instance of 'LocalActorRef' [" + actorRef.getClass.getName + "]")
|
||||
|
||||
val serializerClassName = serializer.getClass.getName
|
||||
|
||||
val uuid = actorRef.uuid
|
||||
EventHandler.debug(this,
|
||||
"Storing actor [%s] with UUID [%s] in cluster".format(actorRef.address, uuid))
|
||||
|
|
@ -641,11 +639,11 @@ class DefaultClusterNode private[akka] (
|
|||
case Right(exception) ⇒ actorRegistryPath
|
||||
}
|
||||
|
||||
// create UUID -> Format registry
|
||||
// create UUID -> serializer class name registry
|
||||
try {
|
||||
zkClient.createPersistent(actorRegistryFormatPathFor(uuid), format)
|
||||
zkClient.createPersistent(actorRegistrySerializerPathFor(uuid), serializerClassName)
|
||||
} catch {
|
||||
case e: ZkNodeExistsException ⇒ zkClient.writeData(actorRegistryFormatPathFor(uuid), format)
|
||||
case e: ZkNodeExistsException ⇒ zkClient.writeData(actorRegistrySerializerPathFor(uuid), serializerClassName)
|
||||
}
|
||||
|
||||
// create UUID -> ADDRESS registry
|
||||
|
|
@ -748,13 +746,13 @@ class DefaultClusterNode private[akka] (
|
|||
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
|
||||
* for remote access through lookup by its UUID.
|
||||
*/
|
||||
def use[T <: Actor](actorAddress: String): Option[ActorRef] = use(actorAddress, formatForActor(actorAddress))
|
||||
def use[T <: Actor](actorAddress: String): Option[ActorRef] = use(actorAddress, serializerForActor(actorAddress))
|
||||
|
||||
/**
|
||||
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
|
||||
* for remote access through lookup by its UUID.
|
||||
*/
|
||||
def use[T <: Actor](actorAddress: String, format: Serializer): Option[ActorRef] = if (isConnected.isOn) {
|
||||
def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[ActorRef] = if (isConnected.isOn) {
|
||||
|
||||
import akka.serialization.ActorSerialization._
|
||||
|
||||
|
|
@ -916,7 +914,7 @@ class DefaultClusterNode private[akka] (
|
|||
* Returns the UUIDs of all actors registered in this cluster.
|
||||
*/
|
||||
def uuidsForClusteredActors: Array[UUID] = if (isConnected.isOn) {
|
||||
zkClient.getChildren(ACTOR_REGISTRY_NODE).toList.map(new UUID(_)).toArray.asInstanceOf[Array[UUID]]
|
||||
zkClient.getChildren(ACTOR_REGISTRY_PATH).toList.map(new UUID(_)).toArray.asInstanceOf[Array[UUID]]
|
||||
} else Array.empty[UUID]
|
||||
|
||||
/**
|
||||
|
|
@ -1009,22 +1007,30 @@ class DefaultClusterNode private[akka] (
|
|||
} else Array.empty[String]
|
||||
|
||||
/**
|
||||
* Returns Format for actor with UUID.
|
||||
* Returns Serializer for actor with specific address.
|
||||
*/
|
||||
def formatForActor(actorAddress: String): Serializer = {
|
||||
def serializerForActor(actorAddress: String): Serializer = {
|
||||
// FIXME should only be 1 single class name per actor address - FIX IT
|
||||
|
||||
val formats = actorUuidsForActorAddress(actorAddress) map { uuid ⇒
|
||||
val serializerClassNames = actorUuidsForActorAddress(actorAddress) map { uuid ⇒
|
||||
try {
|
||||
Some(zkClient.readData(actorRegistryFormatPathFor(uuid), new Stat).asInstanceOf[Serializer])
|
||||
Some(zkClient.readData(actorRegistrySerializerPathFor(uuid), new Stat).asInstanceOf[String])
|
||||
} catch {
|
||||
case e: ZkNoNodeException ⇒ None
|
||||
}
|
||||
} filter (_.isDefined) map (_.get)
|
||||
|
||||
if (formats.isEmpty) throw new IllegalStateException("No Serializer found for [%s]".format(actorAddress))
|
||||
if (formats.forall(_ == formats.head) == false) throw new IllegalStateException("Multiple Serializer classes found for [%s]".format(actorAddress))
|
||||
if (serializerClassNames.isEmpty) throw new IllegalStateException("No serializer found for actor with address [%s]".format(actorAddress))
|
||||
if (serializerClassNames.forall(_ == serializerClassNames.head) == false)
|
||||
throw new IllegalStateException("Multiple serializers found for actor with address [%s]".format(actorAddress))
|
||||
|
||||
formats.head
|
||||
val serializerClassName = serializerClassNames.head
|
||||
ReflectiveAccess.getClassFor(serializerClassName) match { // FIXME need to pass in a user provide class loader? Now using default in ReflectiveAccess.
|
||||
case Right(clazz) ⇒ clazz.newInstance.asInstanceOf[Serializer]
|
||||
case Left(error) ⇒
|
||||
EventHandler.error(error, this, "Could not load serializer class [%s] due to: %s".format(serializerClassName, error.toString))
|
||||
throw error
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -1160,30 +1166,30 @@ class DefaultClusterNode private[akka] (
|
|||
}
|
||||
}
|
||||
|
||||
def getConfigElementKeys: Array[String] = zkClient.getChildren(CONFIGURATION_NODE).toList.toArray.asInstanceOf[Array[String]]
|
||||
def getConfigElementKeys: Array[String] = zkClient.getChildren(CONFIGURATION_PATH).toList.toArray.asInstanceOf[Array[String]]
|
||||
|
||||
// =======================================
|
||||
// Private
|
||||
// =======================================
|
||||
|
||||
private[cluster] def membershipPathFor(node: String) = "%s/%s".format(MEMBERSHIP_NODE, node)
|
||||
private[cluster] def membershipPathFor(node: String) = "%s/%s".format(MEMBERSHIP_PATH, node)
|
||||
|
||||
private[cluster] def configurationPathFor(key: String) = "%s/%s".format(CONFIGURATION_NODE, key)
|
||||
private[cluster] def configurationPathFor(key: String) = "%s/%s".format(CONFIGURATION_PATH, key)
|
||||
|
||||
private[cluster] def actorAddressToUuidsPathFor(actorAddress: String) = "%s/%s".format(ACTOR_ADDRESS_TO_UUIDS_NODE, actorAddress.replace('.', '_'))
|
||||
private[cluster] def actorAddressToUuidsPathFor(actorAddress: String) = "%s/%s".format(ACTOR_ADDRESS_TO_UUIDS_PATH, actorAddress.replace('.', '_'))
|
||||
|
||||
private[cluster] def actorLocationsPathFor(uuid: UUID) = "%s/%s".format(ACTOR_LOCATIONS_NODE, uuid)
|
||||
private[cluster] def actorLocationsPathFor(uuid: UUID) = "%s/%s".format(ACTOR_LOCATIONS_PATH, uuid)
|
||||
|
||||
private[cluster] def actorLocationsPathFor(uuid: UUID, node: NodeAddress) =
|
||||
"%s/%s/%s".format(ACTOR_LOCATIONS_NODE, uuid, node.nodeName)
|
||||
"%s/%s/%s".format(ACTOR_LOCATIONS_PATH, uuid, node.nodeName)
|
||||
|
||||
private[cluster] def actorsAtNodePathFor(node: String) = "%s/%s".format(ACTORS_AT_NODE_NODE, node)
|
||||
private[cluster] def actorsAtNodePathFor(node: String) = "%s/%s".format(ACTORS_AT_PATH_PATH, node)
|
||||
|
||||
private[cluster] def actorAtNodePathFor(node: String, uuid: UUID) = "%s/%s/%s".format(ACTORS_AT_NODE_NODE, node, uuid)
|
||||
private[cluster] def actorAtNodePathFor(node: String, uuid: UUID) = "%s/%s/%s".format(ACTORS_AT_PATH_PATH, node, uuid)
|
||||
|
||||
private[cluster] def actorRegistryPathFor(uuid: UUID) = "%s/%s".format(ACTOR_REGISTRY_NODE, uuid)
|
||||
private[cluster] def actorRegistryPathFor(uuid: UUID) = "%s/%s".format(ACTOR_REGISTRY_PATH, uuid)
|
||||
|
||||
private[cluster] def actorRegistryFormatPathFor(uuid: UUID) = "%s/%s".format(actorRegistryPathFor(uuid), "format")
|
||||
private[cluster] def actorRegistrySerializerPathFor(uuid: UUID) = "%s/%s".format(actorRegistryPathFor(uuid), "serializer")
|
||||
|
||||
private[cluster] def actorRegistryActorAddressPathFor(uuid: UUID) = "%s/%s".format(actorRegistryPathFor(uuid), "address")
|
||||
|
||||
|
|
@ -1206,9 +1212,9 @@ class DefaultClusterNode private[akka] (
|
|||
val isLeader = joinLeaderElection()
|
||||
if (isLeader) createNodeStructureIfNeeded()
|
||||
registerListeners()
|
||||
joinMembershipNode()
|
||||
joinActorsAtAddressNode()
|
||||
fetchMembershipChildrenNodes()
|
||||
joinMembershipPath()
|
||||
joinActorsAtAddressPath()
|
||||
fetchMembershipNodes()
|
||||
EventHandler.info(this, "Cluster node [%s] started successfully".format(nodeAddress))
|
||||
}
|
||||
|
||||
|
|
@ -1280,7 +1286,7 @@ class DefaultClusterNode private[akka] (
|
|||
}
|
||||
}
|
||||
|
||||
private[cluster] def joinMembershipNode() {
|
||||
private[cluster] def joinMembershipPath() {
|
||||
nodeNameToAddress += (nodeAddress.nodeName -> remoteServerAddress)
|
||||
try {
|
||||
EventHandler.info(this,
|
||||
|
|
@ -1294,13 +1300,17 @@ class DefaultClusterNode private[akka] (
|
|||
}
|
||||
}
|
||||
|
||||
private[cluster] def joinActorsAtAddressNode() {
|
||||
private[cluster] def joinActorsAtAddressPath() {
|
||||
ignore[ZkNodeExistsException](zkClient.createPersistent(actorsAtNodePathFor(nodeAddress.nodeName)))
|
||||
}
|
||||
|
||||
private[cluster] def joinLeaderElection(): Boolean = {
|
||||
EventHandler.info(this, "Node [%s] is joining leader election".format(nodeAddress.nodeName))
|
||||
leaderLock.lock
|
||||
try {
|
||||
leaderLock.lock
|
||||
} catch {
|
||||
case e: KeeperException.NodeExistsException ⇒ false
|
||||
}
|
||||
}
|
||||
|
||||
private[cluster] def failOverConnections(from: InetSocketAddress, to: InetSocketAddress) {
|
||||
|
|
@ -1336,8 +1346,8 @@ class DefaultClusterNode private[akka] (
|
|||
migrateWithoutCheckingThatActorResidesOnItsHomeNode( // since the ephemeral node is already gone, so can't check
|
||||
NodeAddress(nodeAddress.clusterName, failedNodeName), nodeAddress, actorAddress)
|
||||
|
||||
implicit val format: Serializer = formatForActor(actorAddress)
|
||||
use(actorAddress) foreach { actor ⇒
|
||||
val serializer: Serializer = serializerForActor(actorAddress)
|
||||
use(actorAddress, serializer) foreach { actor ⇒
|
||||
// FIXME remove ugly reflection when we have 1.0 final which has 'fromBinary(byte, homeAddress)(format)'
|
||||
//actor.homeAddress = remoteServerAddress
|
||||
val homeAddress = classOf[LocalActorRef].getDeclaredField("homeAddress")
|
||||
|
|
@ -1417,13 +1427,13 @@ class DefaultClusterNode private[akka] (
|
|||
|
||||
private def createRootClusterNode() {
|
||||
ignore[ZkNodeExistsException] {
|
||||
zkClient.create(CLUSTER_NODE, null, CreateMode.PERSISTENT)
|
||||
EventHandler.info(this, "Created node [%s]".format(CLUSTER_NODE))
|
||||
zkClient.create(CLUSTER_PATH, null, CreateMode.PERSISTENT)
|
||||
EventHandler.info(this, "Created node [%s]".format(CLUSTER_PATH))
|
||||
}
|
||||
}
|
||||
|
||||
private def createNodeStructureIfNeeded() {
|
||||
baseNodes.foreach { path ⇒
|
||||
basePaths.foreach { path ⇒
|
||||
try {
|
||||
ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
|
||||
EventHandler.debug(this, "Created node [%s]".format(path))
|
||||
|
|
@ -1438,16 +1448,16 @@ class DefaultClusterNode private[akka] (
|
|||
|
||||
private def registerListeners() = {
|
||||
zkClient.subscribeStateChanges(stateListener)
|
||||
zkClient.subscribeChildChanges(MEMBERSHIP_NODE, membershipListener)
|
||||
zkClient.subscribeChildChanges(MEMBERSHIP_PATH, membershipListener)
|
||||
}
|
||||
|
||||
private def unregisterListeners() = {
|
||||
zkClient.unsubscribeStateChanges(stateListener)
|
||||
zkClient.unsubscribeChildChanges(MEMBERSHIP_NODE, membershipListener)
|
||||
zkClient.unsubscribeChildChanges(MEMBERSHIP_PATH, membershipListener)
|
||||
}
|
||||
|
||||
private def fetchMembershipChildrenNodes() {
|
||||
val membershipChildren = zkClient.getChildren(MEMBERSHIP_NODE)
|
||||
private def fetchMembershipNodes() {
|
||||
val membershipChildren = zkClient.getChildren(MEMBERSHIP_PATH)
|
||||
locallyCachedMembershipNodes.clear()
|
||||
membershipChildren.iterator.foreach(locallyCachedMembershipNodes.add)
|
||||
}
|
||||
|
|
@ -1621,12 +1631,12 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
|
|||
if (message.hasActorUuid) {
|
||||
for {
|
||||
address ← cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid))
|
||||
format ← cluster.formatForActor(address)
|
||||
} cluster.use(address, format)
|
||||
serializer ← cluster.serializerForActor(address)
|
||||
} cluster.use(address, serializer)
|
||||
|
||||
} else if (message.hasActorAddress) {
|
||||
val address = message.getActorAddress
|
||||
cluster.formatForActor(address) foreach (format ⇒ cluster.use(address, format))
|
||||
cluster.serializerForActor(address) foreach (serializer ⇒ cluster.use(address, serializer))
|
||||
|
||||
} else {
|
||||
EventHandler.warning(this,
|
||||
|
|
@ -1669,7 +1679,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
|
|||
cluster.failOverConnections(from, to)
|
||||
|
||||
case FUNCTION_FUN0_UNIT ⇒
|
||||
actorOf(new Actor() {
|
||||
localActorOf(new Actor() {
|
||||
self.dispatcher = computeGridDispatcher
|
||||
|
||||
def receive = {
|
||||
|
|
@ -1682,7 +1692,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
|
|||
}).start ! payloadFor(message, classOf[Function0[Unit]])
|
||||
|
||||
case FUNCTION_FUN0_ANY ⇒
|
||||
actorOf(new Actor() {
|
||||
localActorOf(new Actor() {
|
||||
self.dispatcher = computeGridDispatcher
|
||||
|
||||
def receive = {
|
||||
|
|
@ -1695,7 +1705,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
|
|||
}).start forward payloadFor(message, classOf[Function0[Any]])
|
||||
|
||||
case FUNCTION_FUN1_ARG_UNIT ⇒
|
||||
actorOf(new Actor() {
|
||||
localActorOf(new Actor() {
|
||||
self.dispatcher = computeGridDispatcher
|
||||
|
||||
def receive = {
|
||||
|
|
@ -1708,7 +1718,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
|
|||
}).start ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
|
||||
|
||||
case FUNCTION_FUN1_ARG_ANY ⇒
|
||||
actorOf(new Actor() {
|
||||
localActorOf(new Actor() {
|
||||
self.dispatcher = computeGridDispatcher
|
||||
|
||||
def receive = {
|
||||
|
|
|
|||
|
|
@ -41,130 +41,6 @@ class ClusterSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with
|
|||
var zkServer: ZkServer = _
|
||||
|
||||
"A ClusterNode" should {
|
||||
"and another cluster node should be able to agree on a leader election when starting up" in {
|
||||
val node1 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "leader-1-1", port = 9001))
|
||||
node1.start()
|
||||
|
||||
val node2 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "leader-1-2", port = 9002))
|
||||
node2.start()
|
||||
|
||||
node1.leader must be(node1.leaderLock.getId)
|
||||
node1.leader must be(node2.leader)
|
||||
|
||||
node1.stop()
|
||||
node2.stop()
|
||||
node1.isRunning must be(false)
|
||||
node2.isRunning must be(false)
|
||||
}
|
||||
|
||||
"and two another cluster nodes should be able to agree on a leader election when starting up" in {
|
||||
val node1 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "leader-2-1", port = 9001))
|
||||
val node2 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "leader-2-2", port = 9002))
|
||||
val node3 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "leader-2-3", port = 9003))
|
||||
|
||||
node1.start()
|
||||
node2.start()
|
||||
node3.start()
|
||||
|
||||
node1.leader must be(node1.leaderLock.getId)
|
||||
node1.leader must be(node2.leader)
|
||||
node2.leader must be(node3.leader)
|
||||
node3.leader must be(node1.leader)
|
||||
|
||||
node1.stop()
|
||||
node2.stop()
|
||||
node3.stop()
|
||||
node1.isRunning must be(false)
|
||||
node2.isRunning must be(false)
|
||||
node3.isRunning must be(false)
|
||||
}
|
||||
|
||||
"and two another cluster nodes should be able to agree on a leader election the first is shut down" in {
|
||||
val node1 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "leader-3-1", port = 9001))
|
||||
val node2 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "leader-3-2", port = 9002))
|
||||
val node3 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "leader-3-3", port = 9003))
|
||||
|
||||
node1.start()
|
||||
node2.start()
|
||||
node3.start()
|
||||
|
||||
node1.leader must be(node1.leaderLock.getId)
|
||||
node1.leader must be(node2.leader)
|
||||
node2.leader must be(node3.leader)
|
||||
node3.leader must be(node1.leader)
|
||||
|
||||
node1.stop()
|
||||
node1.isRunning must be(false)
|
||||
Thread.sleep(500)
|
||||
node2.leader must be(node2.leaderLock.getId)
|
||||
node2.leader must be(node2.leader)
|
||||
|
||||
node2.stop()
|
||||
node2.isRunning must be(false)
|
||||
Thread.sleep(500)
|
||||
node3.leader must be(node3.leaderLock.getId)
|
||||
|
||||
node3.stop()
|
||||
node3.isRunning must be(false)
|
||||
}
|
||||
|
||||
"and two another cluster nodes should be able to agree on a leader election the second is shut down" in {
|
||||
val node1 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "leader-4-1", port = 9001))
|
||||
val node2 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "leader-4-2", port = 9002))
|
||||
val node3 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "leader-4-3", port = 9003))
|
||||
|
||||
node1.start()
|
||||
node2.start()
|
||||
node3.start()
|
||||
|
||||
node1.leader must be(node1.leaderLock.getId)
|
||||
node1.leader must be(node2.leader)
|
||||
node2.leader must be(node3.leader)
|
||||
node3.leader must be(node1.leader)
|
||||
|
||||
node2.stop()
|
||||
node2.isRunning must be(false)
|
||||
Thread.sleep(500)
|
||||
node1.leader must be(node1.leaderLock.getId)
|
||||
node3.leader must be(node1.leader)
|
||||
|
||||
node3.stop()
|
||||
node3.isRunning must be(false)
|
||||
Thread.sleep(500)
|
||||
node1.leader must be(node1.leaderLock.getId)
|
||||
|
||||
node1.stop()
|
||||
node1.isRunning must be(false)
|
||||
}
|
||||
|
||||
"and two another cluster nodes should be able to agree on a leader election the third is shut down" in {
|
||||
val node1 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "leader-5-1", port = 9001))
|
||||
val node2 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "leader-5-2", port = 9002))
|
||||
val node3 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "leader-5-3", port = 9003))
|
||||
|
||||
node1.start()
|
||||
node2.start()
|
||||
node3.start()
|
||||
|
||||
node1.leader must be(node1.leaderLock.getId)
|
||||
node1.leader must be(node2.leader)
|
||||
node2.leader must be(node3.leader)
|
||||
node3.leader must be(node1.leader)
|
||||
|
||||
node3.stop()
|
||||
node3.isRunning must be(false)
|
||||
Thread.sleep(500)
|
||||
node1.leader must be(node1.leaderLock.getId)
|
||||
node2.leader must be(node1.leader)
|
||||
|
||||
node2.stop()
|
||||
Thread.sleep(500)
|
||||
node2.isRunning must be(false)
|
||||
node1.leader must be(node1.leaderLock.getId)
|
||||
|
||||
node1.stop()
|
||||
node1.isRunning must be(false)
|
||||
}
|
||||
|
||||
"be able to cluster an actor by ActorRef" in {
|
||||
// create actor
|
||||
|
|
|
|||
|
|
@ -95,7 +95,7 @@ trait TestKit {
|
|||
* ActorRef of the test actor. Access is provided to enable e.g.
|
||||
* registration as message target.
|
||||
*/
|
||||
implicit val testActor = actorOf(new TestActor(queue)).start()
|
||||
implicit val testActor = localActorOf(new TestActor(queue)).start()
|
||||
|
||||
/**
|
||||
* Implicit sender reference so that replies are possible for messages sent
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue