- more style related cleanup

This commit is contained in:
Peter Veentjer 2011-05-18 08:37:58 +02:00
parent 61fb04affb
commit e7d1eaf15c
6 changed files with 501 additions and 384 deletions

View file

@ -87,7 +87,7 @@ class InvalidMessageException private[akka](message: String, cause: Throwab
* This message is thrown by default when an Actors behavior doesn't match a message
*/
case class UnhandledMessageException(msg: Any, ref: ActorRef) extends Exception {
override def getMessage() = "Actor %s does not handle [%s]".format(ref, msg)
override def getMessage = "Actor %s does not handle [%s]".format(ref, msg)
override def fillInStackTrace() = this //Don't waste cycles generating stack trace
}
@ -103,12 +103,12 @@ object Actor extends ListenerManagement {
*/
private[akka] lazy val shutdownHook = {
val hook = new Runnable {
override def run {
override def run() {
// Clear Thread.subclassAudits
val tf = classOf[java.lang.Thread].getDeclaredField("subclassAudits")
tf.setAccessible(true)
val subclassAudits = tf.get(null).asInstanceOf[java.util.Map[_,_]]
subclassAudits synchronized {subclassAudits.clear}
subclassAudits synchronized {subclassAudits.clear()}
}
}
Runtime.getRuntime.addShutdownHook(new Thread(hook))
@ -385,6 +385,7 @@ object Actor extends ListenerManagement {
"] for serialization of actor [" + address +
"] since " + reason)
//todo: serializer is not used.
val serializer: Serializer = {
if (serializerClassName == "N/A") serializerErrorDueTo("no class name defined in configuration")
val clazz: Class[_] = ReflectiveAccess.getClassFor(serializerClassName) match {
@ -643,7 +644,7 @@ trait Actor {
/**
* Reverts the Actor behavior to the previous one in the hotswap stack.
*/
def unbecome(): Unit = {
def unbecome() {
val h = self.hotswap
if (h.nonEmpty) self.hotswap = h.pop
}
@ -666,19 +667,21 @@ trait Actor {
}
}
private final def autoReceiveMessage(msg: AutoReceivedMessage): Unit = msg match {
case HotSwap(code, discardOld) => become(code(self), discardOld)
case RevertHotSwap => unbecome()
case Exit(dead, reason) => self.handleTrapExit(dead, reason)
case Link(child) => self.link(child)
case Unlink(child) => self.unlink(child)
case UnlinkAndStop(child) => self.unlink(child); child.stop()
case Restart(reason) => throw reason
case Kill => throw new ActorKilledException("Kill")
case PoisonPill =>
val f = self.senderFuture
self.stop()
if (f.isDefined) f.get.completeWithException(new ActorKilledException("PoisonPill"))
private final def autoReceiveMessage(msg: AutoReceivedMessage) {
msg match {
case HotSwap(code, discardOld) => become(code(self), discardOld)
case RevertHotSwap => unbecome()
case Exit(dead, reason) => self.handleTrapExit(dead, reason)
case Link(child) => self.link(child)
case Unlink(child) => self.unlink(child)
case UnlinkAndStop(child) => self.unlink(child); child.stop()
case Restart(reason) => throw reason
case Kill => throw new ActorKilledException("Kill")
case PoisonPill =>
val f = self.senderFuture()
self.stop()
if (f.isDefined) f.get.completeWithException(new ActorKilledException("PoisonPill"))
}
}
private lazy val processingBehavior = receive //ProcessingBehavior is the original behavior

View file

@ -41,13 +41,15 @@ abstract class Channel[T] {
* Scala API. <p/>
* Sends the specified message to the channel.
*/
def !(msg: T): Unit
def !(msg: T)
/**
* Java API. <p/>
* Sends the specified message to the channel.
*/
def sendOneWay(msg: T): Unit = this.!(msg)
def sendOneWay(msg: T) {
this.!(msg)
}
}
/**
@ -116,8 +118,10 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
* Defines the default timeout for an initial receive invocation.
* When specified, the receive function should be able to handle a 'ReceiveTimeout' message.
*/
def setReceiveTimeout(timeout: Long) = this.receiveTimeout = Some(timeout)
def getReceiveTimeout(): Option[Long] = receiveTimeout
def setReceiveTimeout(timeout: Long) {
this.receiveTimeout = Some(timeout)
}
def getReceiveTimeout: Option[Long] = receiveTimeout
/**
* Akka Java API. <p/>
@ -133,7 +137,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
* </pre>
*/
def setFaultHandler(handler: FaultHandlingStrategy)
def getFaultHandler(): FaultHandlingStrategy
def getFaultHandler: FaultHandlingStrategy
/**
@ -151,8 +155,8 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
* getContext().setLifeCycle(temporary());
* </pre>
*/
def setLifeCycle(lifeCycle: LifeCycle): Unit
def getLifeCycle(): LifeCycle
def setLifeCycle(lifeCycle: LifeCycle)
def getLifeCycle: LifeCycle
/**
* Akka Java API. <p/>
@ -166,8 +170,8 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
* The default is also that all actors that are created and spawned from within this actor
* is sharing the same dispatcher as its creator.
*/
def setDispatcher(dispatcher: MessageDispatcher) = this.dispatcher = dispatcher
def getDispatcher(): MessageDispatcher = dispatcher
def setDispatcher(dispatcher: MessageDispatcher) {this.dispatcher = dispatcher}
def getDispatcher: MessageDispatcher = dispatcher
/**
* Holds the hot swapped partial function.
@ -189,7 +193,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
/**
* Returns the uuid for the actor.
*/
def getUuid() = _uuid
def getUuid = _uuid
def uuid = _uuid
/**
@ -197,14 +201,14 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
* The reference sender Actor of the last received message.
* Is defined if the message was sent from another Actor, else None.
*/
def getSender(): Option[ActorRef] = sender
def getSender: Option[ActorRef] = sender
/**
* Akka Java API. <p/>
* The reference sender future of the last received message.
* Is defined if the message was sent with sent with '!!' or '!!!', else None.
*/
def getSenderFuture(): Option[CompletableFuture[Any]] = senderFuture
def getSenderFuture: Option[CompletableFuture[Any]] = senderFuture
/**
* Is the actor being restarted?
@ -238,7 +242,9 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
/**
* Only for internal use. UUID is effectively final.
*/
protected[akka] def uuid_=(uid: Uuid) = _uuid = uid
protected[akka] def uuid_=(uid: Uuid) {
_uuid = uid
}
/**
* Akka Java API. <p/>
@ -249,7 +255,9 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
* </pre>
* <p/>
*/
def sendOneWay(message: AnyRef): Unit = sendOneWay(message, null)
def sendOneWay(message: AnyRef):Unit = {
sendOneWay(message, null)
}
/**
* Akka Java API. <p/>
@ -262,7 +270,9 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
* </pre>
* <p/>
*/
def sendOneWay(message: AnyRef, sender: ActorRef): Unit = this.!(message)(Option(sender))
def sendOneWay(message: AnyRef, sender: ActorRef) {
this.!(message)(Option(sender))
}
/**
* Akka Java API. <p/>
@ -306,14 +316,16 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
* @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_]
* Uses the Actors default timeout (setTimeout()) and omits the sender
*/
def sendRequestReplyFuture[T <: AnyRef](message: AnyRef): Future[T] = sendRequestReplyFuture(message, timeout, null).asInstanceOf[Future[T]]
def sendRequestReplyFuture[T <: AnyRef](message: AnyRef): Future[T]
= sendRequestReplyFuture(message, timeout, null).asInstanceOf[Future[T]]
/**
* Akka Java API. <p/>
* @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_]
* Uses the Actors default timeout (setTimeout())
*/
def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, sender: ActorRef): Future[T] = sendRequestReplyFuture(message, timeout, sender).asInstanceOf[Future[T]]
def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, sender: ActorRef): Future[T]
= sendRequestReplyFuture(message, timeout, sender).asInstanceOf[Future[T]]
/**
* Akka Java API. <p/>
@ -326,15 +338,17 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
* If you are sending messages using <code>sendRequestReplyFuture</code> then you <b>have to</b> use <code>getContext().reply(..)</code>
* to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, timeout: Long, sender: ActorRef): Future[T] = !!!(message, timeout)(Option(sender)).asInstanceOf[Future[T]]
def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, timeout: Long, sender: ActorRef): Future[T]
= !!!(message, timeout)(Option(sender)).asInstanceOf[Future[T]]
/**
* Akka Java API. <p/>
* Forwards the message specified to this actor and preserves the original sender of the message
*/
def forward(message: AnyRef, sender: ActorRef): Unit =
def forward(message: AnyRef, sender: ActorRef) {
if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null")
else forward(message)(Some(sender))
}
/**
* Akka Java API. <p/>
@ -343,7 +357,9 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
* <p/>
* Throws an IllegalStateException if unable to determine what to reply to.
*/
def replyUnsafe(message: AnyRef) = reply(message)
def replyUnsafe(message: AnyRef) {
reply(message)
}
/**
* Akka Java API. <p/>
@ -357,7 +373,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
/**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
*/
def dispatcher_=(md: MessageDispatcher): Unit
def dispatcher_=(md: MessageDispatcher)
/**
* Get the dispatcher for this actor.
@ -373,12 +389,14 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
* Shuts down the actor its dispatcher and message queue.
* Alias for 'stop'.
*/
def exit() = stop()
def exit() {
stop()
}
/**
* Shuts down the actor its dispatcher and message queue.
*/
def stop(): Unit
def stop()
/**
* Links an other actor to this actor. Links are unidirectional and means that a the linking actor will
@ -388,12 +406,12 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
* 'trap' these exceptions and automatically restart the linked actors according to the restart strategy
* defined by the 'faultHandler'.
*/
def link(actorRef: ActorRef): Unit
def link(actorRef: ActorRef)
/**
* Unlink the actor.
*/
def unlink(actorRef: ActorRef): Unit
def unlink(actorRef: ActorRef)
/**
* Atomically start and link an actor.
@ -409,7 +427,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
* Akka Java API. <p/>
* Returns the mailbox size.
*/
def getMailboxSize(): Int = mailboxSize
def getMailboxSize: Int = mailboxSize
/**
* Returns the supervisor, if there is one.
@ -420,7 +438,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
* Akka Java API. <p/>
* Returns the supervisor, if there is one.
*/
def getSupervisor(): ActorRef = supervisor getOrElse null
def getSupervisor: ActorRef = supervisor getOrElse null
/**
* Returns an unmodifiable Java Map containing the linked actors,
@ -433,7 +451,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
* Returns an unmodifiable Java Map containing the linked actors,
* please note that the backing map is thread-safe but not immutable
*/
def getLinkedActors(): JMap[Uuid, ActorRef] = linkedActors
def getLinkedActors: JMap[Uuid, ActorRef] = linkedActors
/**
* Abstraction for unification of sender and senderFuture for later reply
@ -459,9 +477,9 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
*/
def getChannel: Channel[Any] = channel
protected[akka] def invoke(messageHandle: MessageInvocation): Unit
protected[akka] def invoke(messageHandle: MessageInvocation)
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef])
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
@ -473,16 +491,16 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
protected[akka] def actor: Actor = actorInstance.get
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit
protected[akka] def supervisor_=(sup: Option[ActorRef])
protected[akka] def mailbox: AnyRef
protected[akka] def mailbox_=(value: AnyRef): AnyRef
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable)
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int])
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int])
override def hashCode: Int = HashCode.hash(HashCode.SEED, uuid)
@ -541,7 +559,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor,
_supervisor = __supervisor
hotswap = __hotswap
setActorSelfFields(actor,this)
start
start()
}
// ========= PUBLIC FUNCTIONS =========
@ -549,11 +567,13 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor,
/**
* Sets the dispatcher for this actor. Needs to be invoked before the actor is started.
*/
def dispatcher_=(md: MessageDispatcher): Unit = guard.withGuard {
if (!isBeingRestarted) {
if (!isRunning) _dispatcher = md
else throw new ActorInitializationException(
"Can not swap dispatcher for " + toString + " after it has been started")
def dispatcher_=(md: MessageDispatcher) {
guard.withGuard {
if (!isBeingRestarted) {
if (!isRunning) _dispatcher = md
else throw new ActorInitializationException(
"Can not swap dispatcher for " + toString + " after it has been started")
}
}
}
@ -585,23 +605,25 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor,
/**
* Shuts down the actor its dispatcher and message queue.
*/
def stop() = guard.withGuard {
if (isRunning) {
receiveTimeout = None
cancelReceiveTimeout
dispatcher.detach(this)
_status = ActorRefInternals.SHUTDOWN
try {
actor.postStop
} finally {
currentMessage = null
Actor.registry.unregister(this)
if (isRemotingEnabled)
Actor.remote.unregister(this)
def stop() {
guard.withGuard {
if (isRunning) {
receiveTimeout = None
cancelReceiveTimeout
dispatcher.detach(this)
_status = ActorRefInternals.SHUTDOWN
try {
actor.postStop()
} finally {
currentMessage = null
Actor.registry.unregister(this)
if (isRemotingEnabled)
Actor.remote.unregister(this)
setActorSelfFields(actorInstance.get,null)
}
} //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.")
setActorSelfFields(actorInstance.get, null)
}
} //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.")
}
}
/**
@ -614,15 +636,17 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor,
* <p/>
* To be invoked from within the actor itself.
*/
def link(actorRef: ActorRef): Unit = guard.withGuard {
val actorRefSupervisor = actorRef.supervisor
val hasSupervisorAlready = actorRefSupervisor.isDefined
if (hasSupervisorAlready && actorRefSupervisor.get.uuid == uuid) return // we already supervise this guy
else if (hasSupervisorAlready) throw new IllegalActorStateException(
"Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails")
else {
_linkedActors.put(actorRef.uuid, actorRef)
actorRef.supervisor = Some(this)
def link(actorRef: ActorRef) {
guard.withGuard {
val actorRefSupervisor = actorRef.supervisor
val hasSupervisorAlready = actorRefSupervisor.isDefined
if (hasSupervisorAlready && actorRefSupervisor.get.uuid == uuid) return // we already supervise this guy
else if (hasSupervisorAlready) throw new IllegalActorStateException(
"Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails")
else {
_linkedActors.put(actorRef.uuid, actorRef)
actorRef.supervisor = Some(this)
}
}
}
@ -631,11 +655,13 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor,
* <p/>
* To be invoked from within the actor itself.
*/
def unlink(actorRef: ActorRef) = guard.withGuard {
if(_linkedActors.remove(actorRef.uuid) eq null)
throw new IllegalActorStateException("Actor [" + actorRef + "] is not a linked actor, can't unlink")
def unlink(actorRef: ActorRef) {
guard.withGuard {
if (_linkedActors.remove(actorRef.uuid) eq null)
throw new IllegalActorStateException("Actor [" + actorRef + "] is not a linked actor, can't unlink")
actorRef.supervisor = None
actorRef.supervisor = None
}
}
/**
@ -663,10 +689,13 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor,
// ========= AKKA PROTECTED FUNCTIONS =========
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup
protected[akka] def supervisor_=(sup: Option[ActorRef]) {
_supervisor = sup
}
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
dispatcher dispatchMessage new MessageInvocation(this, message, senderOption, None)
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) {
dispatcher dispatchMessage new MessageInvocation(this, message, senderOption, None)
}
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
@ -682,8 +711,8 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor,
/**
* Callback for the dispatcher. This is the single entry point to the user Actor implementation.
*/
protected[akka] def invoke(messageHandle: MessageInvocation): Unit = {
guard.lock.lock
protected[akka] def invoke(messageHandle: MessageInvocation) {
guard.lock.lock()
try {
if (!isShutdown) {
currentMessage = messageHandle
@ -706,7 +735,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor,
throw e
}
}
} finally { guard.lock.unlock }
} finally { guard.lock.unlock() }
}
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) {
@ -768,7 +797,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor,
val freshActor = newActor
setActorSelfFields(failedActor, null) // Only null out the references if we could instantiate the new actor
actorInstance.set(freshActor) // Assign it here so if preStart fails, we can null out the sef-refs next call
freshActor.preStart
freshActor.preStart()
freshActor.postRestart(reason)
}
}
@ -779,7 +808,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor,
val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)
if (sup.isDefinedAt(notification)) notifySupervisorWithMessage(notification)
}
stop
stop()
}
@tailrec def attemptRestart() {
@ -858,7 +887,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor,
true
}
private def handleExceptionInDispatch(reason: Throwable, message: Any) = {
private def handleExceptionInDispatch(reason: Throwable, message: Any) {
EventHandler.error(reason, this, message.toString)
//Prevent any further messages to be processed until the actor has been restarted
@ -875,7 +904,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor,
}
}
private def notifySupervisorWithMessage(notification: LifeCycleMessage) = {
private def notifySupervisorWithMessage(notification: LifeCycleMessage) {
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
_supervisor.foreach { sup =>
if (sup.isShutdown) { // if supervisor is shut down, game over for all linked actors
@ -920,19 +949,19 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor,
lookupAndSetSelfFields(actor.getClass, actor, value)
}
private def initializeActorInstance = {
actor.preStart // run actor preStart
private def initializeActorInstance() {
actor.preStart() // run actor preStart
Actor.registry.register(this)
}
protected[akka] def checkReceiveTimeout = {
cancelReceiveTimeout
protected[akka] def checkReceiveTimeout() {
cancelReceiveTimeout()
if (receiveTimeout.isDefined && dispatcher.mailboxSize(this) <= 0) { //Only reschedule if desired and there are currently no more messages to be processed
_futureTimeout = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, receiveTimeout.get, TimeUnit.MILLISECONDS))
}
}
protected[akka] def cancelReceiveTimeout = {
protected[akka] def cancelReceiveTimeout() {
if (_futureTimeout.isDefined) {
_futureTimeout.get.cancel(true)
_futureTimeout = None
@ -962,7 +991,7 @@ private[akka] case class RemoteActorRef private[akka] (
val actorType: ActorType = ActorType.ScalaActor)
extends ActorRef with ScalaActorRef {
ensureRemotingEnabled
ensureRemotingEnabled()
timeout = _timeout
// FIXME BAD, we should not have different ActorRefs
@ -980,10 +1009,11 @@ private[akka] case class RemoteActorRef private[akka] (
"Actor with Address [" + address + "] is not bound to a Clustered Deployment")
}
start
start()
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) {
Actor.remote.send[Any](message, senderOption, None, remoteAddress, timeout, true, this, None, actorType, loader)
}
def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
@ -998,35 +1028,53 @@ private[akka] case class RemoteActorRef private[akka] (
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
}
def start: ActorRef = synchronized {
def start(): ActorRef = synchronized {
_status = ActorRefInternals.RUNNING
this
}
def stop: Unit = synchronized {
if (_status == ActorRefInternals.RUNNING) {
_status = ActorRefInternals.SHUTDOWN
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
def stop() {
synchronized {
if (_status == ActorRefInternals.RUNNING) {
_status = ActorRefInternals.SHUTDOWN
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
}
}
}
// ==== NOT SUPPORTED ====
@deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1")
def actorClass: Class[_ <: Actor] = unsupported
def dispatcher_=(md: MessageDispatcher): Unit = unsupported
def dispatcher_=(md: MessageDispatcher) {
unsupported
}
def dispatcher: MessageDispatcher = unsupported
def link(actorRef: ActorRef): Unit = unsupported
def unlink(actorRef: ActorRef): Unit = unsupported
def link(actorRef: ActorRef) {
unsupported
}
def unlink(actorRef: ActorRef) {
unsupported
}
def startLink(actorRef: ActorRef): ActorRef = unsupported
def supervisor: Option[ActorRef] = unsupported
def linkedActors: JMap[Uuid, ActorRef] = unsupported
protected[akka] def mailbox: AnyRef = unsupported
protected[akka] def mailbox_=(value: AnyRef): AnyRef = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported
protected[akka] def invoke(messageHandle: MessageInvocation): Unit = unsupported
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = unsupported
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) {
unsupported
}
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
unsupported
}
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
unsupported
}
protected[akka] def invoke(messageHandle: MessageInvocation) {
unsupported
}
protected[akka] def supervisor_=(sup: Option[ActorRef]) {
unsupported
}
protected[akka] def actorInstance: AtomicReference[Actor] = unsupported
private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
}
@ -1127,7 +1175,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
* </pre>
* <p/>
*/
def !(message: Any)(implicit sender: Option[ActorRef] = None): Unit = {
def !(message: Any)(implicit sender: Option[ActorRef] = None) {
if (isRunning) postMessageToMailbox(message, sender)
else throw new ActorInitializationException(
"Actor has not been started, you need to invoke 'actor.start()' before using it")
@ -1199,12 +1247,14 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef =>
* <p/>
* Throws an IllegalStateException if unable to determine what to reply to.
*/
def reply(message: Any) = if (!reply_?(message)) throw new IllegalActorStateException(
"\n\tNo sender in scope, can't reply. " +
"\n\tYou have probably: " +
"\n\t\t1. Sent a message to an Actor from an instance that is NOT an Actor." +
"\n\t\t2. Invoked a method on an TypedActor from an instance NOT an TypedActor." +
"\n\tElse you might want to use 'reply_?' which returns Boolean(true) if succes and Boolean(false) if no sender in scope")
def reply(message: Any) {
if (!reply_?(message)) throw new IllegalActorStateException(
"\n\tNo sender in scope, can't reply. " +
"\n\tYou have probably: " +
"\n\t\t1. Sent a message to an Actor from an instance that is NOT an Actor." +
"\n\t\t2. Invoked a method on an TypedActor from an instance NOT an TypedActor." +
"\n\tElse you might want to use 'reply_?' which returns Boolean(true) if succes and Boolean(false) if no sender in scope")
}
/**
* Use <code>reply_?(..)</code> to reply with a message to the original sender of the message currently

View file

@ -687,13 +687,13 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com
}
def await(atMost: Duration) = {
_lock.lock
_lock.lock()
if (try { awaitUnsafe(atMost.toNanos min timeLeft()) } finally { _lock.unlock }) this
else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds")
}
def await = {
_lock.lock
_lock.lock()
if (try { awaitUnsafe(timeLeft()) } finally { _lock.unlock }) this
else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds")
}

View file

@ -16,22 +16,24 @@ import akka.actor._
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
final case class MessageInvocation(val receiver: ActorRef,
val message: Any,
val sender: Option[ActorRef],
val senderFuture: Option[CompletableFuture[Any]]) {
final case class MessageInvocation(receiver: ActorRef,
message: Any,
sender: Option[ActorRef],
senderFuture: Option[CompletableFuture[Any]]) {
if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null")
def invoke = try {
receiver.invoke(this)
} catch {
case e: NullPointerException => throw new ActorInitializationException(
"Don't call 'self ! message' in the Actor's constructor (in Scala this means in the body of the class).")
def invoke() {
try {
receiver.invoke(this)
} catch {
case e: NullPointerException => throw new ActorInitializationException(
"Don't call 'self ! message' in the Actor's constructor (in Scala this means in the body of the class).")
}
}
}
final case class FutureInvocation[T](future: CompletableFuture[T], function: () => T, cleanup: () => Unit) extends Runnable {
def run = {
def run() {
future complete (try {
Right(function())
} catch {
@ -46,22 +48,23 @@ final case class FutureInvocation[T](future: CompletableFuture[T], function: ()
object MessageDispatcher {
val UNSCHEDULED = 0
val SCHEDULED = 1
val SCHEDULED = 1
val RESCHEDULED = 2
implicit def defaultGlobalDispatcher = Dispatchers.defaultGlobalDispatcher
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait MessageDispatcher {
import MessageDispatcher._
protected val uuids = new ConcurrentSkipListSet[Uuid]
protected val uuids = new ConcurrentSkipListSet[Uuid]
protected val futures = new AtomicLong(0L)
protected val guard = new ReentrantGuard
protected val active = new Switch(false)
protected val guard = new ReentrantGuard
protected val active = new Switch(false)
private var shutdownSchedule = UNSCHEDULED //This can be non-volatile since it is protected by guard withGuard
@ -73,18 +76,24 @@ trait MessageDispatcher {
/**
* Attaches the specified actorRef to this dispatcher
*/
final def attach(actorRef: ActorRef): Unit = guard withGuard {
register(actorRef)
final def attach(actorRef: ActorRef) {
guard withGuard {
register(actorRef)
}
}
/**
* Detaches the specified actorRef from this dispatcher
*/
final def detach(actorRef: ActorRef): Unit = guard withGuard {
unregister(actorRef)
final def detach(actorRef: ActorRef) {
guard withGuard {
unregister(actorRef)
}
}
private[akka] final def dispatchMessage(invocation: MessageInvocation): Unit = dispatch(invocation)
private[akka] final def dispatchMessage(invocation: MessageInvocation) {
dispatch(invocation)
}
private[akka] final def dispatchFuture[T](block: () => T, timeout: Long): Future[T] = {
futures.getAndIncrement()
@ -92,7 +101,11 @@ trait MessageDispatcher {
val future = new DefaultCompletableFuture[T](timeout)
if (active.isOff)
guard withGuard { active.switchOn { start } }
guard withGuard {
active.switchOn {
start()
}
}
executeFuture(FutureInvocation[T](future, block, futureCleanup))
future
@ -104,7 +117,7 @@ trait MessageDispatcher {
}
private val futureCleanup: () => Unit =
() => if (futures.decrementAndGet() == 0) {
() => if (futures.decrementAndGet() == 0) {
guard withGuard {
if (futures.get == 0 && uuids.isEmpty) {
shutdownSchedule match {
@ -126,7 +139,7 @@ trait MessageDispatcher {
uuids add actorRef.uuid
if (active.isOff) {
active.switchOn {
start
start()
}
}
}
@ -134,7 +147,7 @@ trait MessageDispatcher {
private[akka] def unregister(actorRef: ActorRef) = {
if (uuids remove actorRef.uuid) {
actorRef.mailbox = null
if (uuids.isEmpty && futures.get == 0){
if (uuids.isEmpty && futures.get == 0) {
shutdownSchedule match {
case UNSCHEDULED =>
shutdownSchedule = SCHEDULED
@ -150,31 +163,33 @@ trait MessageDispatcher {
/**
* Traverses the list of actors (uuids) currently being attached to this dispatcher and stops those actors
*/
def stopAllAttachedActors {
def stopAllAttachedActors() {
val i = uuids.iterator
while (i.hasNext()) {
val uuid = i.next()
Actor.registry.local.actorFor(uuid) match {
case Some(actor) => actor.stop()
case None => {}
case None => {}
}
}
}
private val shutdownAction = new Runnable {
def run = guard withGuard {
shutdownSchedule match {
case RESCHEDULED =>
shutdownSchedule = SCHEDULED
Scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS)
case SCHEDULED =>
if (uuids.isEmpty && futures.get == 0) {
active switchOff {
shutdown // shut down in the dispatcher's references is zero
def run() {
guard withGuard {
shutdownSchedule match {
case RESCHEDULED =>
shutdownSchedule = SCHEDULED
Scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS)
case SCHEDULED =>
if (uuids.isEmpty && futures.get == 0) {
active switchOff {
shutdown() // shut down in the dispatcher's references is zero
}
}
}
shutdownSchedule = UNSCHEDULED
case UNSCHEDULED => //Do nothing
shutdownSchedule = UNSCHEDULED
case UNSCHEDULED => //Do nothing
}
}
}
}
@ -188,29 +203,29 @@ trait MessageDispatcher {
/**
* After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference
*/
def suspend(actorRef: ActorRef): Unit
def suspend(actorRef: ActorRef)
/*
* After the call to this method, the dispatcher must begin any new message processing for the specified reference
*/
def resume(actorRef: ActorRef): Unit
def resume(actorRef: ActorRef)
/**
* Will be called when the dispatcher is to queue an invocation for execution
*/
private[akka] def dispatch(invocation: MessageInvocation): Unit
private[akka] def dispatch(invocation: MessageInvocation)
private[akka] def executeFuture(invocation: FutureInvocation[_]): Unit
private[akka] def executeFuture(invocation: FutureInvocation[_])
/**
* Called one time every time an actor is attached to this dispatcher and this dispatcher was previously shutdown
*/
private[akka] def start(): Unit
private[akka] def start()
/**
* Called one time every time an actor is detached from this dispatcher and this dispatcher has no actors left attached
*/
private[akka] def shutdown(): Unit
private[akka] def shutdown()
/**
* Returns the size of the mailbox for the specified actor
@ -235,25 +250,30 @@ abstract class MessageDispatcherConfigurator {
def mailboxType(config: Configuration): MailboxType = {
val capacity = config.getInt("mailbox-capacity", Dispatchers.MAILBOX_CAPACITY)
if (capacity < 1) UnboundedMailbox()
else BoundedMailbox(capacity, Duration(config.getInt("mailbox-push-timeout-time", Dispatchers.MAILBOX_PUSH_TIME_OUT.toMillis.toInt), TIME_UNIT))
else {
val duration = Duration(
config.getInt("mailbox-push-timeout-time", Dispatchers.MAILBOX_PUSH_TIME_OUT.toMillis.toInt),
TIME_UNIT)
BoundedMailbox(capacity, duration)
}
}
def configureThreadPool(config: Configuration, createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {
import ThreadPoolConfigDispatcherBuilder.conf_?
//Apply the following options to the config if they are present in the config
ThreadPoolConfigDispatcherBuilder(createDispatcher,ThreadPoolConfig()).configure(
conf_?(config getInt "keep-alive-time" )(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))),
ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig()).configure(
conf_?(config getInt "keep-alive-time")(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))),
conf_?(config getDouble "core-pool-size-factor")(factor => _.setCorePoolSizeFromFactor(factor)),
conf_?(config getDouble "max-pool-size-factor" )(factor => _.setMaxPoolSizeFromFactor(factor)),
conf_?(config getInt "executor-bounds" )(bounds => _.setExecutorBounds(bounds)),
conf_?(config getBool "allow-core-timeout" )(allow => _.setAllowCoreThreadTimeout(allow)),
conf_?(config getDouble "max-pool-size-factor")(factor => _.setMaxPoolSizeFromFactor(factor)),
conf_?(config getInt "executor-bounds")(bounds => _.setExecutorBounds(bounds)),
conf_?(config getBool "allow-core-timeout")(allow => _.setAllowCoreThreadTimeout(allow)),
conf_?(config getString "rejection-policy" map {
case "abort" => new AbortPolicy()
case "caller-runs" => new CallerRunsPolicy()
case "abort" => new AbortPolicy()
case "caller-runs" => new CallerRunsPolicy()
case "discard-oldest" => new DiscardOldestPolicy()
case "discard" => new DiscardPolicy()
case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
case "discard" => new DiscardPolicy()
case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x)
})(policy => _.setRejectionPolicy(policy)))
}
}

View file

@ -46,7 +46,7 @@ object Format {
val bos = new ByteArrayOutputStream
val out = new ObjectOutputStream(bos)
out.writeObject(obj)
out.close
out.close()
bos.toByteArray
}
@ -55,7 +55,7 @@ object Format {
//if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) else
new ObjectInputStream(new ByteArrayInputStream(bytes))
val obj = in.readObject
in.close
in.close()
obj
}
}

View file

@ -37,7 +37,6 @@ import akka.serialization.{Format, Serializers}
import akka.serialization.Compression.LZF
import akka.AkkaException
//import akka.monitoring.Monitoring
import akka.cluster.zookeeper._
import com.eaio.uuid.UUID
@ -55,12 +54,12 @@ class ClusterException(message: String) extends AkkaException(message)
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait ClusterNodeMBean {
def start: Unit
def stop: Unit
def start()
def stop()
def disconnect: Unit
def reconnect: Unit
def resign: Unit
def disconnect()
def reconnect()
def resign()
def isConnected: Boolean
@ -86,9 +85,9 @@ trait ClusterNodeMBean {
def getUuidsForActorsInUseOnNode(nodeName: String): Array[String]
def getAddressesForActorsInUseOnNode(nodeName: String): Array[String]
def setConfigElement(key: String, value: String): Unit
def setConfigElement(key: String, value: String)
def getConfigElement(key: String): AnyRef
def removeConfigElement(key: String): Unit
def removeConfigElement(key: String)
def getConfigElementKeys: Array[String]
}
@ -116,22 +115,24 @@ object Cluster {
* For Scala API.
*/
trait ChangeListener {
def notify(event: ChangeNotification, client: ClusterNode) = event match {
case NodeConnected(name) => nodeConnected(name, client)
case NodeDisconnected(name) => nodeDisconnected(name, client)
case NewLeader(name: String) => newLeader(name, client)
case NewSession => thisNodeNewSession(client)
case ThisNode.Connected => thisNodeConnected(client)
case ThisNode.Disconnected => thisNodeDisconnected(client)
case ThisNode.Expired => thisNodeExpired(client)
def notify(event: ChangeNotification, client: ClusterNode) {
event match {
case NodeConnected(name) => nodeConnected(name, client)
case NodeDisconnected(name) => nodeDisconnected(name, client)
case NewLeader(name: String) => newLeader(name, client)
case NewSession => thisNodeNewSession(client)
case ThisNode.Connected => thisNodeConnected(client)
case ThisNode.Disconnected => thisNodeDisconnected(client)
case ThisNode.Expired => thisNodeExpired(client)
}
}
def nodeConnected(node: String, client: ClusterNode) = {}
def nodeDisconnected(node: String, client: ClusterNode) = {}
def newLeader(name: String, client: ClusterNode) = {}
def thisNodeNewSession(client: ClusterNode) = {}
def thisNodeConnected(client: ClusterNode) = {}
def thisNodeDisconnected(client: ClusterNode) = {}
def thisNodeExpired(client: ClusterNode) = {}
def nodeConnected(node: String, client: ClusterNode) {}
def nodeDisconnected(node: String, client: ClusterNode) {}
def newLeader(name: String, client: ClusterNode) {}
def thisNodeNewSession(client: ClusterNode) {}
def thisNodeConnected(client: ClusterNode) {}
def thisNodeDisconnected(client: ClusterNode) {}
def thisNodeExpired(client: ClusterNode) {}
}
/**
@ -156,11 +157,12 @@ object Cluster {
val defaultSerializer = new SerializableSerializer
private val _zkServer = new AtomicReference[Option[ZkServer]](None)
private val _nodes = new AtomicReference(new Nodes)
private val _nodes = new AtomicReference[Nodes](new Nodes)
private val _clusterNames = new ConcurrentSkipListSet[String]
private[cluster] def updateNodes(f: Nodes => Nodes) =
private[cluster] def updateNodes(f: Nodes => Nodes) {
while (Some(_nodes.get).map(node => _nodes.compareAndSet(node, f(node)) == false).get) {}
}
/**
* Looks up the local hostname.
@ -288,18 +290,22 @@ object Cluster {
* <p/>
* <b>WARNING: Use with care</b>
*/
def reset(): Unit = withPrintStackTraceOnError {
EventHandler.info(this, "Resetting all clusters connected to in this JVM")
if (!clusters.isEmpty) {
nodes foreach { tp =>
val (_, node) = tp
node.disconnect
node.remoteService.shutdown
def reset() {
withPrintStackTraceOnError {
EventHandler.info(this, "Resetting all clusters connected to in this JVM")
if (!clusters.isEmpty) {
nodes foreach {
tp =>
val (_, node) = tp
node.disconnect()
node.remoteService.shutdown()
}
implicit val zkClient = newZkClient
clusters foreach (resetNodesInCluster(_))
ignore[ZkNoNodeException](zkClient.deleteRecursive(ZooKeeperBarrier.BarriersNode))
zkClient.close()
}
implicit val zkClient = newZkClient
clusters foreach (resetNodesInCluster(_))
ignore[ZkNoNodeException](zkClient.deleteRecursive(ZooKeeperBarrier.BarriersNode))
zkClient.close
}
}
@ -314,11 +320,13 @@ object Cluster {
/**
* Shut down the local ZooKeeper server.
*/
def shutdownLocalCluster() = withPrintStackTraceOnError {
EventHandler.info(this, "Shuts down local cluster")
reset
_zkServer.get.foreach(_.shutdown)
_zkServer.set(None)
def shutdownLocalCluster() {
withPrintStackTraceOnError {
EventHandler.info(this, "Shuts down local cluster")
reset()
_zkServer.get.foreach(_.shutdown())
_zkServer.set(None)
}
}
/**
@ -350,6 +358,8 @@ object Cluster {
}
/**
* A Cluster is made up by a bunch of jvm's, the ClusterNode.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ClusterNode private[akka] (
@ -371,13 +381,13 @@ class ClusterNode private[akka] (
val remoteClientLifeCycleListener = actorOf(new Actor {
def receive = {
case RemoteClientError(cause, client, address) => client.shutdownClientModule
case RemoteClientDisconnected(client, address) => client.shutdownClientModule
case _ => //ignore other
case RemoteClientError(cause, client, address) => client.shutdownClientModule()
case RemoteClientDisconnected(client, address) => client.shutdownClientModule()
case _ => //ignore other
}
}, "akka.cluster.remoteClientLifeCycleListener").start
}, "akka.cluster.remoteClientLifeCycleListener").start()
val remoteDaemon = actorOf(new RemoteClusterDaemon(this), RemoteClusterDaemon.ADDRESS).start
val remoteDaemon = actorOf(new RemoteClusterDaemon(this), RemoteClusterDaemon.ADDRESS).start()
val remoteService: RemoteSupport = {
val remote = new akka.remote.netty.NettyRemoteSupport
@ -441,13 +451,13 @@ class ClusterNode private[akka] (
private[cluster] val zkClient = new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout, serializer)
private[cluster] val leaderElectionCallback = new LockListener {
def lockAcquired {
override def lockAcquired() {
EventHandler.info(this, "Node [%s] is the new leader".format(self.nodeAddress.nodeName))
self.isLeader.set(true)
self.publish(Cluster.NewLeader(self.nodeAddress.nodeName))
}
def lockReleased {
override def lockReleased() {
EventHandler.info(this,
"Node [%s] is *NOT* the leader anymore".format(self.nodeAddress.nodeName))
self.isLeader.set(false)
@ -473,44 +483,47 @@ class ClusterNode private[akka] (
def start(): ClusterNode = {
isConnected switchOn {
initializeNode
initializeNode()
}
this
}
def stop(): Unit = isConnected switchOff {
ignore[ZkNoNodeException](zkClient.deleteRecursive(membershipNodePath))
def stop() {
isConnected switchOff {
ignore[ZkNoNodeException](zkClient.deleteRecursive(membershipNodePath))
locallyCachedMembershipNodes.clear
locallyCheckedOutActors.clear
locallyCachedMembershipNodes.clear()
locallyCheckedOutActors.clear()
replicaConnections.toList.foreach({ case (_, (address, _)) =>
remote.shutdownClientConnection(address) // shut down client connections
})
replicaConnections.toList.foreach({
case (_, (address, _)) =>
Actor.remote.shutdownClientConnection(address) // shut down client connections
})
remoteService.shutdown // shutdown server
remoteService.shutdown() // shutdown server
remoteClientLifeCycleListener.stop
remoteDaemon.stop
remoteClientLifeCycleListener.stop()
remoteDaemon.stop()
// for monitoring remote listener
registry.local.actors.filter(remoteService.hasListener).foreach(_.stop)
// for monitoring remote listener
registry.local.actors.filter(remoteService.hasListener).foreach(_.stop())
replicaConnections.clear
updateNodes(_ - nodeAddress)
replicaConnections.clear()
updateNodes(_ - nodeAddress)
disconnect()
EventHandler.info(this, "Cluster node shut down [%s]".format(nodeAddress))
disconnect()
EventHandler.info(this, "Cluster node shut down [%s]".format(nodeAddress))
}
}
def disconnect(): ClusterNode = {
zkClient.unsubscribeAll
zkClient.close
zkClient.unsubscribeAll()
zkClient.close()
this
}
def reconnect(): ClusterNode = {
zkClient.reconnect
zkClient.reconnect()
this
}
@ -526,7 +539,9 @@ class ClusterNode private[akka] (
this
} else throw new IllegalStateException("Can not register 'ChangeListener' after the cluster node has been started")
private[cluster] def publish(change: ChangeNotification) = changeListeners.iterator.foreach(_.notify(change, this))
private[cluster] def publish(change: ChangeNotification) {
changeListeners.iterator.foreach(_.notify(change, this))
}
// =======================================
// Leader
@ -540,7 +555,7 @@ class ClusterNode private[akka] (
/**
* Explicitly resign from being a leader. If this node is not a leader then this operation is a no-op.
*/
def resign() { if (isLeader.get) leaderLock.unlock }
def resign() { if (isLeader.get) leaderLock.unlock() }
// =======================================
// Actor
@ -685,7 +700,7 @@ class ClusterNode private[akka] (
/**
* Removes actor with uuid from the cluster.
*/
def remove(uuid: UUID) = {
def remove(uuid: UUID) {
releaseActorOnAllNodes(uuid)
locallyCheckedOutActors.remove(uuid)
@ -766,7 +781,7 @@ class ClusterNode private[akka] (
// val actor = new ReplicatedActorRef(fromBinary[T](bytes, remoteServerAddress)(format))
val actor = fromBinary[T](bytes, remoteServerAddress)(format)
remoteService.register(UUID_PREFIX + uuid, actor) // clustered refs are always registered and looked up by UUID
actor.start
actor.start()
actor.asInstanceOf[LocalActorRef]
case Right(exception) => throw exception
}
@ -776,16 +791,20 @@ class ClusterNode private[akka] (
/**
* Using (checking out) all actors with a specific UUID on all nodes in the cluster.
*/
def useActorOnAllNodes(uuid: UUID): Unit = isConnected ifOn {
EventHandler.debug(this,
"Using (checking out) all actors with UUID [%s] on all nodes in cluster".format(uuid))
val command = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(USE)
.setActorUuid(uuidToUuidProtocol(uuid))
.build
membershipNodes foreach { node =>
replicaConnections.get(node) foreach { case (_, connection) =>
connection ! command
def useActorOnAllNodes(uuid: UUID) {
isConnected ifOn {
EventHandler.debug(this,
"Using (checking out) all actors with UUID [%s] on all nodes in cluster".format(uuid))
val command = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(USE)
.setActorUuid(uuidToUuidProtocol(uuid))
.build
membershipNodes foreach {
node =>
replicaConnections.get(node) foreach {
case (_, connection) =>
connection ! command
}
}
}
}
@ -793,43 +812,53 @@ class ClusterNode private[akka] (
/**
* Using (checking out) specific UUID on a specefic node.
*/
def useActorOnNode(node: String, uuid: UUID): Unit = isConnected ifOn {
replicaConnections.get(node) foreach { case (_, connection) =>
connection ! RemoteDaemonMessageProtocol.newBuilder
.setMessageType(USE)
.setActorUuid(uuidToUuidProtocol(uuid))
.build
def useActorOnNode(node: String, uuid: UUID) {
isConnected ifOn {
replicaConnections.get(node) foreach {
case (_, connection) =>
connection ! RemoteDaemonMessageProtocol.newBuilder
.setMessageType(USE)
.setActorUuid(uuidToUuidProtocol(uuid))
.build
}
}
}
/**
* Checks in an actor after done using it on this node.
*/
def release(actorAddress: String): Unit = isConnected ifOn {
actorUuidsForActorAddress(actorAddress) foreach { uuid =>
EventHandler.debug(this,
"Releasing actor with UUID [%s] after usage".format(uuid))
locallyCheckedOutActors.remove(uuid)
ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAtNodePathFor(nodeAddress.nodeName, uuid)))
ignore[ZkNoNodeException](zkClient.delete(actorAtNodePathFor(nodeAddress.nodeName, uuid)))
ignore[ZkNoNodeException](zkClient.delete(actorLocationsPathFor(uuid, nodeAddress)))
ignore[ZkNoNodeException](zkClient.delete(actorRegistryNodePathFor(uuid, remoteServerAddress)))
def release(actorAddress: String) {
isConnected ifOn {
actorUuidsForActorAddress(actorAddress) foreach {
uuid =>
EventHandler.debug(this,
"Releasing actor with UUID [%s] after usage".format(uuid))
locallyCheckedOutActors.remove(uuid)
ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAtNodePathFor(nodeAddress.nodeName, uuid)))
ignore[ZkNoNodeException](zkClient.delete(actorAtNodePathFor(nodeAddress.nodeName, uuid)))
ignore[ZkNoNodeException](zkClient.delete(actorLocationsPathFor(uuid, nodeAddress)))
ignore[ZkNoNodeException](zkClient.delete(actorRegistryNodePathFor(uuid, remoteServerAddress)))
}
}
}
/**
* Releases (checking in) all actors with a specific UUID on all nodes in the cluster where the actor is in 'use'.
*/
def releaseActorOnAllNodes(uuid: UUID): Unit = isConnected ifOn {
EventHandler.debug(this,
"Releasing (checking in) all actors with UUID [%s] on all nodes in cluster".format(uuid))
val command = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(RELEASE)
.setActorUuid(uuidToUuidProtocol(uuid))
.build
nodesForActorsInUseWithUuid(uuid) foreach { node =>
replicaConnections.get(node) foreach { case (_, connection) =>
connection ! command
def releaseActorOnAllNodes(uuid: UUID) {
isConnected ifOn {
EventHandler.debug(this,
"Releasing (checking in) all actors with UUID [%s] on all nodes in cluster".format(uuid))
val command = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(RELEASE)
.setActorUuid(uuidToUuidProtocol(uuid))
.build
nodesForActorsInUseWithUuid(uuid) foreach {
node =>
replicaConnections.get(node) foreach {
case (_, connection) =>
connection ! command
}
}
}
}
@ -845,8 +874,11 @@ class ClusterNode private[akka] (
EventHandler.debug(this,
"Creating cluster actor ref with router [%s] for actors [%s]".format(router, addresses.mkString(", ")))
def registerClusterActorRefForAddress(actorRef: ClusterActorRef, addresses: Array[(UUID, InetSocketAddress)]) =
addresses foreach { case (_, address) => clusterActorRefs.put(address, actorRef) }
def registerClusterActorRefForAddress(actorRef: ClusterActorRef, addresses: Array[(UUID, InetSocketAddress)]) {
addresses foreach {
case (_, address) => clusterActorRefs.put(address, actorRef)
}
}
// FIXME remove?
def refByUuid(uuid: UUID): ActorRef = {
@ -874,19 +906,23 @@ class ClusterNode private[akka] (
/**
* Migrate the actor from 'this' node to node 'to'.
*/
def migrate(to: NodeAddress, actorAddress: String): Unit = migrate(nodeAddress, to, actorAddress)
def migrate(to: NodeAddress, actorAddress: String) {
migrate(nodeAddress, to, actorAddress)
}
/**
* Migrate the actor from node 'from' to node 'to'.
*/
def migrate(
from: NodeAddress, to: NodeAddress, actorAddress: String): Unit = isConnected ifOn {
if (from eq null) throw new IllegalArgumentException("NodeAddress 'from' can not be 'null'")
if (to eq null) throw new IllegalArgumentException("NodeAddress 'to' can not be 'null'")
if (isInUseOnNode(actorAddress, from)) {
migrateWithoutCheckingThatActorResidesOnItsHomeNode(from, to, actorAddress)
} else {
throw new ClusterException("Can't move actor from node [" + from + "] since it does not exist on this node")
from: NodeAddress, to: NodeAddress, actorAddress: String) {
isConnected ifOn {
if (from eq null) throw new IllegalArgumentException("NodeAddress 'from' can not be 'null'")
if (to eq null) throw new IllegalArgumentException("NodeAddress 'to' can not be 'null'")
if (isInUseOnNode(actorAddress, from)) {
migrateWithoutCheckingThatActorResidesOnItsHomeNode(from, to, actorAddress)
} else {
throw new ClusterException("Can't move actor from node [" + from + "] since it does not exist on this node")
}
}
}
@ -1013,7 +1049,7 @@ class ClusterNode private[akka] (
/**
* Send a function 'Function0[Unit]' to be invoked on a random number of nodes (defined by 'replicationFactor' argument).
*/
def send(f: Function0[Unit], replicationFactor: Int): Unit = {
def send(f: Function0[Unit], replicationFactor: Int) {
val message = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(FUNCTION_FUN0_UNIT)
.setPayload(ByteString.copyFrom(Serializers.Java.toBinary(f)))
@ -1038,7 +1074,7 @@ class ClusterNode private[akka] (
* Send a function 'Function1[Any, Unit]' to be invoked on a random number of nodes (defined by 'replicationFactor' argument)
* with the argument speficied.
*/
def send(f: Function1[Any, Unit], arg: Any, replicationFactor: Int): Unit = {
def send(f: Function1[Any, Unit], arg: Any, replicationFactor: Int) {
val message = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(FUNCTION_FUN1_ARG_UNIT)
.setPayload(ByteString.copyFrom(Serializers.Java.toBinary((f, arg))))
@ -1093,10 +1129,12 @@ class ClusterNode private[akka] (
case e: KeeperException.NoNodeException => null
}
def removeConfigElement(key: String) = ignore[ZkNoNodeException]{
EventHandler.debug(this,
"Removing config element with key [%s] from cluster registry".format(key))
zkClient.deleteRecursive(configurationPathFor(key))
def removeConfigElement(key: String) {
ignore[ZkNoNodeException] {
EventHandler.debug(this,
"Removing config element with key [%s] from cluster registry".format(key))
zkClient.deleteRecursive(configurationPathFor(key))
}
}
def getConfigElementKeys: Array[String] = zkClient.getChildren(CONFIGURATION_NODE).toList.toArray.asInstanceOf[Array[String]]
@ -1141,15 +1179,15 @@ class ClusterNode private[akka] (
private[cluster] def actorRegistryNodePathFor(uuid: UUID, address: InetSocketAddress): String =
"%s/%s:%s".format(actorRegistryNodePathFor(uuid), address.getHostName, address.getPort)
private[cluster] def initializeNode = {
private[cluster] def initializeNode() {
EventHandler.info(this, "Initializing cluster node [%s]".format(nodeAddress))
createRootClusterNode
createRootClusterNode()
val isLeader = joinLeaderElection
if (isLeader) createNodeStructureIfNeeded
if (isLeader) createNodeStructureIfNeeded()
registerListeners
joinMembershipNode
joinActorsAtAddressNode
fetchMembershipChildrenNodes
joinMembershipNode()
joinActorsAtAddressNode()
fetchMembershipChildrenNodes()
EventHandler.info(this, "Cluster node [%s] started successfully".format(nodeAddress))
}
@ -1173,7 +1211,7 @@ class ClusterNode private[akka] (
var replicas = HashSet.empty[ActorRef]
if (replicationFactor < 1) return replicas
connectToAllReplicas
connectToAllReplicas()
val numberOfReplicas = replicaConnections.size
val replicaConnectionsAsArray = replicaConnections.toList map { case (node, (address, actorRef)) => actorRef } // the ActorRefs
@ -1196,7 +1234,7 @@ class ClusterNode private[akka] (
/**
* Connect to all available replicas unless already connected).
*/
private def connectToAllReplicas = {
private def connectToAllReplicas() {
membershipNodes foreach { node =>
if (!replicaConnections.contains(node)) {
val address = addressForNode(node)
@ -1206,7 +1244,7 @@ class ClusterNode private[akka] (
}
}
private[cluster] def joinMembershipNode = {
private[cluster] def joinMembershipNode() {
nodeNameToAddress.put(nodeAddress.nodeName, remoteServerAddress)
try {
EventHandler.info(this,
@ -1220,8 +1258,9 @@ class ClusterNode private[akka] (
}
}
private[cluster] def joinActorsAtAddressNode =
private[cluster] def joinActorsAtAddressNode() {
ignore[ZkNodeExistsException](zkClient.createPersistent(actorsAtNodePathFor(nodeAddress.nodeName)))
}
private[cluster] def joinLeaderElection: Boolean = {
EventHandler.info(this, "Node [%s] is joining leader election".format(nodeAddress.nodeName))
@ -1328,12 +1367,14 @@ class ClusterNode private[akka] (
private[cluster] def findNewlyDisconnectedAvailableNodes(nodes: List[String]): List[String] =
(locallyCachedMembershipNodes diff Set(nodes: _*)).toList
private def createRootClusterNode: Unit = ignore[ZkNodeExistsException] {
private def createRootClusterNode() {
ignore[ZkNodeExistsException] {
zkClient.create(CLUSTER_NODE, null, CreateMode.PERSISTENT)
EventHandler.info(this, "Created node [%s]".format(CLUSTER_NODE))
}
}
private def createNodeStructureIfNeeded = {
private def createNodeStructureIfNeeded() {
baseNodes.foreach { path =>
try {
zkClient.create(path, null, CreateMode.PERSISTENT)
@ -1353,9 +1394,9 @@ class ClusterNode private[akka] (
zkClient.subscribeChildChanges(MEMBERSHIP_NODE, membershipListener)
}
private def fetchMembershipChildrenNodes = {
private def fetchMembershipChildrenNodes() {
val membershipChildren = zkClient.getChildren(MEMBERSHIP_NODE)
locallyCachedMembershipNodes.clear
locallyCachedMembershipNodes.clear()
membershipChildren.iterator.foreach(locallyCachedMembershipNodes.add)
}
@ -1363,41 +1404,41 @@ class ClusterNode private[akka] (
val clusterMBean = new StandardMBean(classOf[ClusterNodeMBean]) with ClusterNodeMBean {
import Cluster._
def start = self.start
def stop = self.stop
override def start() {self.start()}
override def stop() {self.stop()}
def disconnect = self.disconnect
def reconnect = self.reconnect
def resign = self.resign
override def disconnect() = self.disconnect()
override def reconnect() {self.reconnect()}
override def resign() {self.resign()}
def isConnected = self.isConnected.isOn
override def isConnected = self.isConnected.isOn
def getRemoteServerHostname = self.nodeAddress.hostname
def getRemoteServerPort = self.nodeAddress.port
override def getRemoteServerHostname = self.nodeAddress.hostname
override def getRemoteServerPort = self.nodeAddress.port
def getNodeName = self.nodeAddress.nodeName
def getClusterName = self.nodeAddress.clusterName
def getZooKeeperServerAddresses = self.zkServerAddresses
override def getNodeName = self.nodeAddress.nodeName
override def getClusterName = self.nodeAddress.clusterName
override def getZooKeeperServerAddresses = self.zkServerAddresses
def getMemberNodes = self.locallyCachedMembershipNodes.iterator.map(_.toString).toArray
def getLeader = self.leader.toString
override def getMemberNodes = self.locallyCachedMembershipNodes.iterator.map(_.toString).toArray
override def getLeader = self.leader.toString
def getUuidsForActorsInUse = self.uuidsForActorsInUse.map(_.toString).toArray
def getAddressesForActorsInUse = self.addressesForActorsInUse.map(_.toString).toArray
override def getUuidsForActorsInUse = self.uuidsForActorsInUse.map(_.toString).toArray
override def getAddressesForActorsInUse = self.addressesForActorsInUse.map(_.toString).toArray
def getUuidsForClusteredActors = self.uuidsForClusteredActors.map(_.toString).toArray
def getAddressesForClusteredActors = self.addressesForClusteredActors.map(_.toString).toArray
override def getUuidsForClusteredActors = self.uuidsForClusteredActors.map(_.toString).toArray
override def getAddressesForClusteredActors = self.addressesForClusteredActors.map(_.toString).toArray
def getNodesForActorInUseWithUuid(uuid: String) = self.nodesForActorsInUseWithUuid(stringToUuid(uuid))
def getNodesForActorInUseWithAddress(id: String) = self.nodesForActorsInUseWithAddress(id)
override def getNodesForActorInUseWithUuid(uuid: String) = self.nodesForActorsInUseWithUuid(stringToUuid(uuid))
override def getNodesForActorInUseWithAddress(id: String) = self.nodesForActorsInUseWithAddress(id)
def getUuidsForActorsInUseOnNode(nodeName: String) = self.uuidsForActorsInUseOnNode(nodeName).map(_.toString).toArray
def getAddressesForActorsInUseOnNode(nodeName: String) = self.addressesForActorsInUseOnNode(nodeName).map(_.toString).toArray
override def getUuidsForActorsInUseOnNode(nodeName: String) = self.uuidsForActorsInUseOnNode(nodeName).map(_.toString).toArray
override def getAddressesForActorsInUseOnNode(nodeName: String) = self.addressesForActorsInUseOnNode(nodeName).map(_.toString).toArray
def setConfigElement(key: String, value: String) = self.setConfigElement(key, value.getBytes("UTF-8"))
def getConfigElement(key: String) = new String(self.getConfigElement(key), "UTF-8")
def removeConfigElement(key: String) = self.removeConfigElement(key)
def getConfigElementKeys = self.getConfigElementKeys.toArray
override def setConfigElement(key: String, value: String) {self.setConfigElement(key, value.getBytes("UTF-8"))}
override def getConfigElement(key: String) = new String(self.getConfigElement(key), "UTF-8")
override def removeConfigElement(key: String) { self.removeConfigElement(key)}
override def getConfigElementKeys =self.getConfigElementKeys.toArray
}
JMX.register(clusterJmxObjectName, clusterMBean)
@ -1411,24 +1452,28 @@ class ClusterNode private[akka] (
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class MembershipChildListener(self: ClusterNode) extends IZkChildListener with ErrorHandler {
def handleChildChange(parentPath: String, currentChilds: JList[String]) = withErrorHandler {
if (currentChilds ne null) {
val childList = currentChilds.toList
if (!childList.isEmpty) EventHandler.debug(this,
"MembershipChildListener at [%s] has children [%s]"
.format(self.nodeAddress.nodeName, childList.mkString(" ")))
self.findNewlyConnectedMembershipNodes(childList) foreach { name =>
self.nodeNameToAddress.put(name, self.addressForNode(name)) // update 'nodename-address' map
self.publish(Cluster.NodeConnected(name))
}
def handleChildChange(parentPath: String, currentChilds: JList[String]) {
withErrorHandler {
if (currentChilds ne null) {
val childList = currentChilds.toList
if (!childList.isEmpty) EventHandler.debug(this,
"MembershipChildListener at [%s] has children [%s]"
.format(self.nodeAddress.nodeName, childList.mkString(" ")))
self.findNewlyConnectedMembershipNodes(childList) foreach {
name =>
self.nodeNameToAddress.put(name, self.addressForNode(name)) // update 'nodename-address' map
self.publish(Cluster.NodeConnected(name))
}
self.findNewlyDisconnectedMembershipNodes(childList) foreach { name =>
self.nodeNameToAddress.remove(name) // update 'nodename-address' map
self.publish(Cluster.NodeDisconnected(name))
}
self.findNewlyDisconnectedMembershipNodes(childList) foreach {
name =>
self.nodeNameToAddress.remove(name) // update 'nodename-address' map
self.publish(Cluster.NodeDisconnected(name))
}
self.locallyCachedMembershipNodes.clear
childList.foreach(self.locallyCachedMembershipNodes.add)
self.locallyCachedMembershipNodes.clear()
childList.foreach(self.locallyCachedMembershipNodes.add)
}
}
}
}
@ -1437,27 +1482,26 @@ class MembershipChildListener(self: ClusterNode) extends IZkChildListener with E
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class StateListener(self: ClusterNode) extends IZkStateListener {
def handleStateChanged(state: KeeperState) = state match {
case KeeperState.SyncConnected =>
EventHandler.debug(this, "Cluster node [%s] - Connected".format(self.nodeAddress))
self.publish(Cluster.ThisNode.Connected)
case KeeperState.Disconnected =>
EventHandler.debug(this, "Cluster node [%s] - Disconnected".format(self.nodeAddress))
self.publish(Cluster.ThisNode.Disconnected)
case KeeperState.Expired =>
EventHandler.debug(this, "Cluster node [%s] - Expired".format(self.nodeAddress))
self.publish(Cluster.ThisNode.Expired)
def handleStateChanged(state: KeeperState) {
state match {
case KeeperState.SyncConnected =>
EventHandler.debug(this, "Cluster node [%s] - Connected".format(self.nodeAddress))
self.publish(Cluster.ThisNode.Connected)
case KeeperState.Disconnected =>
EventHandler.debug(this, "Cluster node [%s] - Disconnected".format(self.nodeAddress))
self.publish(Cluster.ThisNode.Disconnected)
case KeeperState.Expired =>
EventHandler.debug(this, "Cluster node [%s] - Expired".format(self.nodeAddress))
self.publish(Cluster.ThisNode.Expired)
}
}
/**
* Re-initialize after the zookeeper session has expired and a new session has been created.
*/
def handleNewSession = {
def handleNewSession() {
EventHandler.debug(this, "Session expired re-initializing node [%s]".format(self.nodeAddress))
self.initializeNode
self.initializeNode()
self.publish(Cluster.NewSession)
}
}
@ -1521,15 +1565,15 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
else EventHandler.warning(this,
"None of 'uuid' or 'actorAddress'' is specified, ignoring remote cluster daemon command [%s]".format(message))
case START => cluster.start
case START => cluster.start()
case STOP => cluster.stop
case STOP => cluster.stop()
case DISCONNECT => cluster.disconnect
case DISCONNECT => cluster.disconnect()
case RECONNECT => cluster.reconnect
case RECONNECT => cluster.reconnect()
case RESIGN => cluster.resign
case RESIGN => cluster.resign()
case FAIL_OVER_CONNECTIONS =>
val (from, to) = payloadFor(message, classOf[(InetSocketAddress, InetSocketAddress)])
@ -1539,7 +1583,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
actorOf(new Actor() {
self.dispatcher = functionServerDispatcher
def receive = {
case f: Function0[Unit] => try { f() } finally { self.stop }
case f: Function0[Unit] => try { f() } finally { self.stop() }
}
}).start ! payloadFor(message, classOf[Function0[Unit]])
@ -1547,7 +1591,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
actorOf(new Actor() {
self.dispatcher = functionServerDispatcher
def receive = {
case f: Function0[Any] => try { self.reply(f()) } finally { self.stop }
case f: Function0[Any] => try { self.reply(f()) } finally { self.stop() }
}
}).start forward payloadFor(message, classOf[Function0[Any]])
@ -1555,7 +1599,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
actorOf(new Actor() {
self.dispatcher = functionServerDispatcher
def receive = {
case t: Tuple2[Function1[Any, Unit], Any] => try { t._1(t._2) } finally { self.stop }
case t: Tuple2[Function1[Any, Unit], Any] => try { t._1(t._2) } finally { self.stop() }
}
}).start ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
@ -1563,7 +1607,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
actorOf(new Actor() {
self.dispatcher = functionServerDispatcher
def receive = {
case t: Tuple2[Function1[Any, Any], Any] => try { self.reply(t._1(t._2)) } finally { self.stop }
case t: Tuple2[Function1[Any, Any], Any] => try { self.reply(t._1(t._2)) } finally { self.stop() }
}
}).start forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
}