Added 'localActorOf' method to get an actor that by-passes the deployment. Made use of it in EventHandler

This commit is contained in:
Jonas Bonér 2011-06-17 11:56:08 +02:00
parent a6bdf9d9aa
commit 1997d971a0
4 changed files with 20 additions and 8 deletions

View file

@ -313,6 +313,18 @@ object Actor extends ListenerManagement {
createActor(address, () new LocalActorRef(() creator.create, address, Transient)) createActor(address, () new LocalActorRef(() creator.create, address, Transient))
} }
def localActorOf[T <: Actor: Manifest]: ActorRef = {
newLocalActorRef(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], new UUID().toString)
}
def localActorOf[T <: Actor](clazz: Class[T]): ActorRef = {
newLocalActorRef(clazz, new UUID().toString)
}
def localActorOf[T <: Actor](factory: T): ActorRef = {
new LocalActorRef(() factory, new UUID().toString, Transient)
}
/** /**
* Use to spawn out a block of code in an event-driven actor. Will shut actor down when * Use to spawn out a block of code in an event-driven actor. Will shut actor down when
* the block has been executed. * the block has been executed.

View file

@ -17,7 +17,7 @@ import akka.AkkaException
* <p/> * <p/>
* Create, add and remove a listener: * Create, add and remove a listener:
* <pre> * <pre>
* val eventHandlerListener = Actor.actorOf(new Actor { * val eventHandlerListener = Actor.localActorOf(new Actor {
* self.dispatcher = EventHandler.EventHandlerDispatcher * self.dispatcher = EventHandler.EventHandlerDispatcher
* *
* def receive = { * def receive = {
@ -111,7 +111,7 @@ object EventHandler extends ListenerManagement {
defaultListeners foreach { listenerName defaultListeners foreach { listenerName
try { try {
ReflectiveAccess.getClassFor[Actor](listenerName) match { ReflectiveAccess.getClassFor[Actor](listenerName) match {
case r: Right[_, Class[Actor]] addListener(Actor.actorOf(r.b, listenerName).start()) case r: Right[_, Class[Actor]] addListener(Actor.localActorOf(r.b).start())
case l: Left[Exception, _] throw l.a case l: Left[Exception, _] throw l.a
} }
} catch { } catch {

View file

@ -1281,7 +1281,7 @@ class DefaultClusterNode private[akka] (
} }
private[cluster] def joinMembershipNode() { private[cluster] def joinMembershipNode() {
nodeNameToAddress.put(nodeAddress.nodeName, remoteServerAddress) nodeNameToAddress += (nodeAddress.nodeName -> remoteServerAddress)
try { try {
EventHandler.info(this, EventHandler.info(this,
"Joining cluster as membership node [%s] on [%s]".format(nodeAddress, membershipNodePath)) "Joining cluster as membership node [%s] on [%s]".format(nodeAddress, membershipNodePath))
@ -1350,7 +1350,7 @@ class DefaultClusterNode private[akka] (
} }
// notify all available nodes that they should fail-over all connections from 'from' to 'to' // notify all available nodes that they should fail-over all connections from 'from' to 'to'
val from = nodeNameToAddress.get(failedNodeName) val from = nodeNameToAddress(failedNodeName)
val to = remoteServerAddress val to = remoteServerAddress
Serialization.serialize((from, to)) match { Serialization.serialize((from, to)) match {
case Left(error) throw error case Left(error) throw error
@ -1527,12 +1527,12 @@ class MembershipChildListener(self: ClusterNode) extends IZkChildListener with E
"MembershipChildListener at [%s] has children [%s]" "MembershipChildListener at [%s] has children [%s]"
.format(self.nodeAddress.nodeName, childList.mkString(" "))) .format(self.nodeAddress.nodeName, childList.mkString(" ")))
self.findNewlyConnectedMembershipNodes(childList) foreach { name self.findNewlyConnectedMembershipNodes(childList) foreach { name
self.nodeNameToAddress.put(name, self.addressForNode(name)) // update 'nodename-address' map self.addressForNode(name) foreach (address self.nodeNameToAddress += (name -> address)) // update 'nodename-address' map
self.publish(NodeConnected(name)) self.publish(NodeConnected(name))
} }
self.findNewlyDisconnectedMembershipNodes(childList) foreach { name self.findNewlyDisconnectedMembershipNodes(childList) foreach { name
self.nodeNameToAddress.remove(name) // update 'nodename-address' map self.nodeNameToAddress - name // update 'nodename-address' map
self.publish(NodeDisconnected(name)) self.publish(NodeDisconnected(name))
} }

View file

@ -21,7 +21,7 @@ import scala.collection.JavaConversions.collectionAsScalaIterable
import com.eaio.uuid.UUID import com.eaio.uuid.UUID
import java.util.concurrent.CountDownLatch import java.util.concurrent.{ CountDownLatch, TimeUnit }
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
/** /**
@ -154,7 +154,7 @@ object ClusterDeployer {
deploymentInProgressLock.unlock() // signal deployment complete deploymentInProgressLock.unlock() // signal deployment complete
} else { } else {
deploymentCompleted.await() // wait until deployment is completed by other "master" node deploymentCompleted.await(30, TimeUnit.SECONDS) // wait until deployment is completed by other "master" node
} }
} }