diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 6e046da6ea..54f5d3f7a6 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -87,7 +87,7 @@ class InvalidMessageException private[akka](message: String, cause: Throwab * This message is thrown by default when an Actors behavior doesn't match a message */ case class UnhandledMessageException(msg: Any, ref: ActorRef) extends Exception { - override def getMessage() = "Actor %s does not handle [%s]".format(ref, msg) + override def getMessage = "Actor %s does not handle [%s]".format(ref, msg) override def fillInStackTrace() = this //Don't waste cycles generating stack trace } @@ -103,12 +103,12 @@ object Actor extends ListenerManagement { */ private[akka] lazy val shutdownHook = { val hook = new Runnable { - override def run { + override def run() { // Clear Thread.subclassAudits val tf = classOf[java.lang.Thread].getDeclaredField("subclassAudits") tf.setAccessible(true) val subclassAudits = tf.get(null).asInstanceOf[java.util.Map[_,_]] - subclassAudits synchronized {subclassAudits.clear} + subclassAudits synchronized {subclassAudits.clear()} } } Runtime.getRuntime.addShutdownHook(new Thread(hook)) @@ -385,6 +385,7 @@ object Actor extends ListenerManagement { "] for serialization of actor [" + address + "] since " + reason) + //todo: serializer is not used. val serializer: Serializer = { if (serializerClassName == "N/A") serializerErrorDueTo("no class name defined in configuration") val clazz: Class[_] = ReflectiveAccess.getClassFor(serializerClassName) match { @@ -643,7 +644,7 @@ trait Actor { /** * Reverts the Actor behavior to the previous one in the hotswap stack. */ - def unbecome(): Unit = { + def unbecome() { val h = self.hotswap if (h.nonEmpty) self.hotswap = h.pop } @@ -666,19 +667,21 @@ trait Actor { } } - private final def autoReceiveMessage(msg: AutoReceivedMessage): Unit = msg match { - case HotSwap(code, discardOld) => become(code(self), discardOld) - case RevertHotSwap => unbecome() - case Exit(dead, reason) => self.handleTrapExit(dead, reason) - case Link(child) => self.link(child) - case Unlink(child) => self.unlink(child) - case UnlinkAndStop(child) => self.unlink(child); child.stop() - case Restart(reason) => throw reason - case Kill => throw new ActorKilledException("Kill") - case PoisonPill => - val f = self.senderFuture - self.stop() - if (f.isDefined) f.get.completeWithException(new ActorKilledException("PoisonPill")) + private final def autoReceiveMessage(msg: AutoReceivedMessage) { + msg match { + case HotSwap(code, discardOld) => become(code(self), discardOld) + case RevertHotSwap => unbecome() + case Exit(dead, reason) => self.handleTrapExit(dead, reason) + case Link(child) => self.link(child) + case Unlink(child) => self.unlink(child) + case UnlinkAndStop(child) => self.unlink(child); child.stop() + case Restart(reason) => throw reason + case Kill => throw new ActorKilledException("Kill") + case PoisonPill => + val f = self.senderFuture() + self.stop() + if (f.isDefined) f.get.completeWithException(new ActorKilledException("PoisonPill")) + } } private lazy val processingBehavior = receive //ProcessingBehavior is the original behavior diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 61543631fb..68f22f93cb 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -41,13 +41,15 @@ abstract class Channel[T] { * Scala API.

* Sends the specified message to the channel. */ - def !(msg: T): Unit + def !(msg: T) /** * Java API.

* Sends the specified message to the channel. */ - def sendOneWay(msg: T): Unit = this.!(msg) + def sendOneWay(msg: T) { + this.!(msg) + } } /** @@ -116,8 +118,10 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * Defines the default timeout for an initial receive invocation. * When specified, the receive function should be able to handle a 'ReceiveTimeout' message. */ - def setReceiveTimeout(timeout: Long) = this.receiveTimeout = Some(timeout) - def getReceiveTimeout(): Option[Long] = receiveTimeout + def setReceiveTimeout(timeout: Long) { + this.receiveTimeout = Some(timeout) + } + def getReceiveTimeout: Option[Long] = receiveTimeout /** * Akka Java API.

@@ -133,7 +137,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * */ def setFaultHandler(handler: FaultHandlingStrategy) - def getFaultHandler(): FaultHandlingStrategy + def getFaultHandler: FaultHandlingStrategy /** @@ -151,8 +155,8 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * getContext().setLifeCycle(temporary()); * */ - def setLifeCycle(lifeCycle: LifeCycle): Unit - def getLifeCycle(): LifeCycle + def setLifeCycle(lifeCycle: LifeCycle) + def getLifeCycle: LifeCycle /** * Akka Java API.

@@ -166,8 +170,8 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * The default is also that all actors that are created and spawned from within this actor * is sharing the same dispatcher as its creator. */ - def setDispatcher(dispatcher: MessageDispatcher) = this.dispatcher = dispatcher - def getDispatcher(): MessageDispatcher = dispatcher + def setDispatcher(dispatcher: MessageDispatcher) {this.dispatcher = dispatcher} + def getDispatcher: MessageDispatcher = dispatcher /** * Holds the hot swapped partial function. @@ -189,7 +193,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal /** * Returns the uuid for the actor. */ - def getUuid() = _uuid + def getUuid = _uuid def uuid = _uuid /** @@ -197,14 +201,14 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * The reference sender Actor of the last received message. * Is defined if the message was sent from another Actor, else None. */ - def getSender(): Option[ActorRef] = sender + def getSender: Option[ActorRef] = sender /** * Akka Java API.

* The reference sender future of the last received message. * Is defined if the message was sent with sent with '!!' or '!!!', else None. */ - def getSenderFuture(): Option[CompletableFuture[Any]] = senderFuture + def getSenderFuture: Option[CompletableFuture[Any]] = senderFuture /** * Is the actor being restarted? @@ -238,7 +242,9 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal /** * Only for internal use. UUID is effectively final. */ - protected[akka] def uuid_=(uid: Uuid) = _uuid = uid + protected[akka] def uuid_=(uid: Uuid) { + _uuid = uid + } /** * Akka Java API.

@@ -249,7 +255,9 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * *

*/ - def sendOneWay(message: AnyRef): Unit = sendOneWay(message, null) + def sendOneWay(message: AnyRef):Unit = { + sendOneWay(message, null) + } /** * Akka Java API.

@@ -262,7 +270,9 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * *

*/ - def sendOneWay(message: AnyRef, sender: ActorRef): Unit = this.!(message)(Option(sender)) + def sendOneWay(message: AnyRef, sender: ActorRef) { + this.!(message)(Option(sender)) + } /** * Akka Java API.

@@ -306,14 +316,16 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_] * Uses the Actors default timeout (setTimeout()) and omits the sender */ - def sendRequestReplyFuture[T <: AnyRef](message: AnyRef): Future[T] = sendRequestReplyFuture(message, timeout, null).asInstanceOf[Future[T]] + def sendRequestReplyFuture[T <: AnyRef](message: AnyRef): Future[T] + = sendRequestReplyFuture(message, timeout, null).asInstanceOf[Future[T]] /** * Akka Java API.

* @see sendRequestReplyFuture(message: AnyRef, sender: ActorRef): Future[_] * Uses the Actors default timeout (setTimeout()) */ - def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, sender: ActorRef): Future[T] = sendRequestReplyFuture(message, timeout, sender).asInstanceOf[Future[T]] + def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, sender: ActorRef): Future[T] + = sendRequestReplyFuture(message, timeout, sender).asInstanceOf[Future[T]] /** * Akka Java API.

@@ -326,15 +338,17 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * If you are sending messages using sendRequestReplyFuture then you have to use getContext().reply(..) * to send a reply message to the original sender. If not then the sender will block until the timeout expires. */ - def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, timeout: Long, sender: ActorRef): Future[T] = !!!(message, timeout)(Option(sender)).asInstanceOf[Future[T]] + def sendRequestReplyFuture[T <: AnyRef](message: AnyRef, timeout: Long, sender: ActorRef): Future[T] + = !!!(message, timeout)(Option(sender)).asInstanceOf[Future[T]] /** * Akka Java API.

* Forwards the message specified to this actor and preserves the original sender of the message */ - def forward(message: AnyRef, sender: ActorRef): Unit = + def forward(message: AnyRef, sender: ActorRef) { if (sender eq null) throw new IllegalArgumentException("The 'sender' argument to 'forward' can't be null") else forward(message)(Some(sender)) + } /** * Akka Java API.

@@ -343,7 +357,9 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal *

* Throws an IllegalStateException if unable to determine what to reply to. */ - def replyUnsafe(message: AnyRef) = reply(message) + def replyUnsafe(message: AnyRef) { + reply(message) + } /** * Akka Java API.

@@ -357,7 +373,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal /** * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. */ - def dispatcher_=(md: MessageDispatcher): Unit + def dispatcher_=(md: MessageDispatcher) /** * Get the dispatcher for this actor. @@ -373,12 +389,14 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * Shuts down the actor its dispatcher and message queue. * Alias for 'stop'. */ - def exit() = stop() + def exit() { + stop() + } /** * Shuts down the actor its dispatcher and message queue. */ - def stop(): Unit + def stop() /** * Links an other actor to this actor. Links are unidirectional and means that a the linking actor will @@ -388,12 +406,12 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * 'trap' these exceptions and automatically restart the linked actors according to the restart strategy * defined by the 'faultHandler'. */ - def link(actorRef: ActorRef): Unit + def link(actorRef: ActorRef) /** * Unlink the actor. */ - def unlink(actorRef: ActorRef): Unit + def unlink(actorRef: ActorRef) /** * Atomically start and link an actor. @@ -409,7 +427,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * Akka Java API.

* Returns the mailbox size. */ - def getMailboxSize(): Int = mailboxSize + def getMailboxSize: Int = mailboxSize /** * Returns the supervisor, if there is one. @@ -420,7 +438,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * Akka Java API.

* Returns the supervisor, if there is one. */ - def getSupervisor(): ActorRef = supervisor getOrElse null + def getSupervisor: ActorRef = supervisor getOrElse null /** * Returns an unmodifiable Java Map containing the linked actors, @@ -433,7 +451,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * Returns an unmodifiable Java Map containing the linked actors, * please note that the backing map is thread-safe but not immutable */ - def getLinkedActors(): JMap[Uuid, ActorRef] = linkedActors + def getLinkedActors: JMap[Uuid, ActorRef] = linkedActors /** * Abstraction for unification of sender and senderFuture for later reply @@ -459,9 +477,9 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal */ def getChannel: Channel[Any] = channel - protected[akka] def invoke(messageHandle: MessageInvocation): Unit + protected[akka] def invoke(messageHandle: MessageInvocation) - protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit + protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( message: Any, @@ -473,16 +491,16 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal protected[akka] def actor: Actor = actorInstance.get - protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit + protected[akka] def supervisor_=(sup: Option[ActorRef]) protected[akka] def mailbox: AnyRef protected[akka] def mailbox_=(value: AnyRef): AnyRef - protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit + protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit + protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) - protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit + protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) override def hashCode: Int = HashCode.hash(HashCode.SEED, uuid) @@ -541,7 +559,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor, _supervisor = __supervisor hotswap = __hotswap setActorSelfFields(actor,this) - start + start() } // ========= PUBLIC FUNCTIONS ========= @@ -549,11 +567,13 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor, /** * Sets the dispatcher for this actor. Needs to be invoked before the actor is started. */ - def dispatcher_=(md: MessageDispatcher): Unit = guard.withGuard { - if (!isBeingRestarted) { - if (!isRunning) _dispatcher = md - else throw new ActorInitializationException( - "Can not swap dispatcher for " + toString + " after it has been started") + def dispatcher_=(md: MessageDispatcher) { + guard.withGuard { + if (!isBeingRestarted) { + if (!isRunning) _dispatcher = md + else throw new ActorInitializationException( + "Can not swap dispatcher for " + toString + " after it has been started") + } } } @@ -585,23 +605,25 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor, /** * Shuts down the actor its dispatcher and message queue. */ - def stop() = guard.withGuard { - if (isRunning) { - receiveTimeout = None - cancelReceiveTimeout - dispatcher.detach(this) - _status = ActorRefInternals.SHUTDOWN - try { - actor.postStop - } finally { - currentMessage = null - Actor.registry.unregister(this) - if (isRemotingEnabled) - Actor.remote.unregister(this) + def stop() { + guard.withGuard { + if (isRunning) { + receiveTimeout = None + cancelReceiveTimeout + dispatcher.detach(this) + _status = ActorRefInternals.SHUTDOWN + try { + actor.postStop() + } finally { + currentMessage = null + Actor.registry.unregister(this) + if (isRemotingEnabled) + Actor.remote.unregister(this) - setActorSelfFields(actorInstance.get,null) - } - } //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.") + setActorSelfFields(actorInstance.get, null) + } + } //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.") + } } /** @@ -614,15 +636,17 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor, *

* To be invoked from within the actor itself. */ - def link(actorRef: ActorRef): Unit = guard.withGuard { - val actorRefSupervisor = actorRef.supervisor - val hasSupervisorAlready = actorRefSupervisor.isDefined - if (hasSupervisorAlready && actorRefSupervisor.get.uuid == uuid) return // we already supervise this guy - else if (hasSupervisorAlready) throw new IllegalActorStateException( - "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails") - else { - _linkedActors.put(actorRef.uuid, actorRef) - actorRef.supervisor = Some(this) + def link(actorRef: ActorRef) { + guard.withGuard { + val actorRefSupervisor = actorRef.supervisor + val hasSupervisorAlready = actorRefSupervisor.isDefined + if (hasSupervisorAlready && actorRefSupervisor.get.uuid == uuid) return // we already supervise this guy + else if (hasSupervisorAlready) throw new IllegalActorStateException( + "Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails") + else { + _linkedActors.put(actorRef.uuid, actorRef) + actorRef.supervisor = Some(this) + } } } @@ -631,11 +655,13 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor, *

* To be invoked from within the actor itself. */ - def unlink(actorRef: ActorRef) = guard.withGuard { - if(_linkedActors.remove(actorRef.uuid) eq null) - throw new IllegalActorStateException("Actor [" + actorRef + "] is not a linked actor, can't unlink") + def unlink(actorRef: ActorRef) { + guard.withGuard { + if (_linkedActors.remove(actorRef.uuid) eq null) + throw new IllegalActorStateException("Actor [" + actorRef + "] is not a linked actor, can't unlink") - actorRef.supervisor = None + actorRef.supervisor = None + } } /** @@ -663,10 +689,13 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor, // ========= AKKA PROTECTED FUNCTIONS ========= - protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup + protected[akka] def supervisor_=(sup: Option[ActorRef]) { + _supervisor = sup + } - protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = - dispatcher dispatchMessage new MessageInvocation(this, message, senderOption, None) + protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) { + dispatcher dispatchMessage new MessageInvocation(this, message, senderOption, None) + } protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( message: Any, @@ -682,8 +711,8 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor, /** * Callback for the dispatcher. This is the single entry point to the user Actor implementation. */ - protected[akka] def invoke(messageHandle: MessageInvocation): Unit = { - guard.lock.lock + protected[akka] def invoke(messageHandle: MessageInvocation) { + guard.lock.lock() try { if (!isShutdown) { currentMessage = messageHandle @@ -706,7 +735,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor, throw e } } - } finally { guard.lock.unlock } + } finally { guard.lock.unlock() } } protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) { @@ -768,7 +797,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor, val freshActor = newActor setActorSelfFields(failedActor, null) // Only null out the references if we could instantiate the new actor actorInstance.set(freshActor) // Assign it here so if preStart fails, we can null out the sef-refs next call - freshActor.preStart + freshActor.preStart() freshActor.postRestart(reason) } } @@ -779,7 +808,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor, val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) if (sup.isDefinedAt(notification)) notifySupervisorWithMessage(notification) } - stop + stop() } @tailrec def attemptRestart() { @@ -858,7 +887,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor, true } - private def handleExceptionInDispatch(reason: Throwable, message: Any) = { + private def handleExceptionInDispatch(reason: Throwable, message: Any) { EventHandler.error(reason, this, message.toString) //Prevent any further messages to be processed until the actor has been restarted @@ -875,7 +904,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor, } } - private def notifySupervisorWithMessage(notification: LifeCycleMessage) = { + private def notifySupervisorWithMessage(notification: LifeCycleMessage) { // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client _supervisor.foreach { sup => if (sup.isShutdown) { // if supervisor is shut down, game over for all linked actors @@ -920,19 +949,19 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () => Actor, lookupAndSetSelfFields(actor.getClass, actor, value) } - private def initializeActorInstance = { - actor.preStart // run actor preStart + private def initializeActorInstance() { + actor.preStart() // run actor preStart Actor.registry.register(this) } - protected[akka] def checkReceiveTimeout = { - cancelReceiveTimeout + protected[akka] def checkReceiveTimeout() { + cancelReceiveTimeout() if (receiveTimeout.isDefined && dispatcher.mailboxSize(this) <= 0) { //Only reschedule if desired and there are currently no more messages to be processed _futureTimeout = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, receiveTimeout.get, TimeUnit.MILLISECONDS)) } } - protected[akka] def cancelReceiveTimeout = { + protected[akka] def cancelReceiveTimeout() { if (_futureTimeout.isDefined) { _futureTimeout.get.cancel(true) _futureTimeout = None @@ -962,7 +991,7 @@ private[akka] case class RemoteActorRef private[akka] ( val actorType: ActorType = ActorType.ScalaActor) extends ActorRef with ScalaActorRef { - ensureRemotingEnabled + ensureRemotingEnabled() timeout = _timeout // FIXME BAD, we should not have different ActorRefs @@ -980,10 +1009,11 @@ private[akka] case class RemoteActorRef private[akka] ( "Actor with Address [" + address + "] is not bound to a Clustered Deployment") } - start + start() - def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = + def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) { Actor.remote.send[Any](message, senderOption, None, remoteAddress, timeout, true, this, None, actorType, loader) + } def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( message: Any, @@ -998,35 +1028,53 @@ private[akka] case class RemoteActorRef private[akka] ( else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString) } - def start: ActorRef = synchronized { + def start(): ActorRef = synchronized { _status = ActorRefInternals.RUNNING this } - def stop: Unit = synchronized { - if (_status == ActorRefInternals.RUNNING) { - _status = ActorRefInternals.SHUTDOWN - postMessageToMailbox(RemoteActorSystemMessage.Stop, None) + def stop() { + synchronized { + if (_status == ActorRefInternals.RUNNING) { + _status = ActorRefInternals.SHUTDOWN + postMessageToMailbox(RemoteActorSystemMessage.Stop, None) + } } } // ==== NOT SUPPORTED ==== @deprecated("Will be removed without replacement, doesn't make any sense to have in the face of `become` and `unbecome`", "1.1") def actorClass: Class[_ <: Actor] = unsupported - def dispatcher_=(md: MessageDispatcher): Unit = unsupported + def dispatcher_=(md: MessageDispatcher) { + unsupported + } def dispatcher: MessageDispatcher = unsupported - def link(actorRef: ActorRef): Unit = unsupported - def unlink(actorRef: ActorRef): Unit = unsupported + def link(actorRef: ActorRef) { + unsupported + } + def unlink(actorRef: ActorRef) { + unsupported + } def startLink(actorRef: ActorRef): ActorRef = unsupported def supervisor: Option[ActorRef] = unsupported def linkedActors: JMap[Uuid, ActorRef] = unsupported protected[akka] def mailbox: AnyRef = unsupported protected[akka] def mailbox_=(value: AnyRef): AnyRef = unsupported - protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported - protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported - protected[akka] def invoke(messageHandle: MessageInvocation): Unit = unsupported - protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = unsupported + protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) { + unsupported + } + protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { + unsupported + } + protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { + unsupported + } + protected[akka] def invoke(messageHandle: MessageInvocation) { + unsupported + } + protected[akka] def supervisor_=(sup: Option[ActorRef]) { + unsupported + } protected[akka] def actorInstance: AtomicReference[Actor] = unsupported private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef") } @@ -1127,7 +1175,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => * *

*/ - def !(message: Any)(implicit sender: Option[ActorRef] = None): Unit = { + def !(message: Any)(implicit sender: Option[ActorRef] = None) { if (isRunning) postMessageToMailbox(message, sender) else throw new ActorInitializationException( "Actor has not been started, you need to invoke 'actor.start()' before using it") @@ -1199,12 +1247,14 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef => *

* Throws an IllegalStateException if unable to determine what to reply to. */ - def reply(message: Any) = if (!reply_?(message)) throw new IllegalActorStateException( - "\n\tNo sender in scope, can't reply. " + - "\n\tYou have probably: " + - "\n\t\t1. Sent a message to an Actor from an instance that is NOT an Actor." + - "\n\t\t2. Invoked a method on an TypedActor from an instance NOT an TypedActor." + - "\n\tElse you might want to use 'reply_?' which returns Boolean(true) if succes and Boolean(false) if no sender in scope") + def reply(message: Any) { + if (!reply_?(message)) throw new IllegalActorStateException( + "\n\tNo sender in scope, can't reply. " + + "\n\tYou have probably: " + + "\n\t\t1. Sent a message to an Actor from an instance that is NOT an Actor." + + "\n\t\t2. Invoked a method on an TypedActor from an instance NOT an TypedActor." + + "\n\tElse you might want to use 'reply_?' which returns Boolean(true) if succes and Boolean(false) if no sender in scope") + } /** * Use reply_?(..) to reply with a message to the original sender of the message currently diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index ba5233f48b..2614767e94 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -687,13 +687,13 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com } def await(atMost: Duration) = { - _lock.lock + _lock.lock() if (try { awaitUnsafe(atMost.toNanos min timeLeft()) } finally { _lock.unlock }) this else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds") } def await = { - _lock.lock + _lock.lock() if (try { awaitUnsafe(timeLeft()) } finally { _lock.unlock }) this else throw new FutureTimeoutException("Futures timed out after [" + NANOS.toMillis(timeoutInNanos) + "] milliseconds") } diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index 85d1bda374..5fbc97ed36 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -16,22 +16,24 @@ import akka.actor._ /** * @author Jonas Bonér */ -final case class MessageInvocation(val receiver: ActorRef, - val message: Any, - val sender: Option[ActorRef], - val senderFuture: Option[CompletableFuture[Any]]) { +final case class MessageInvocation(receiver: ActorRef, + message: Any, + sender: Option[ActorRef], + senderFuture: Option[CompletableFuture[Any]]) { if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null") - def invoke = try { - receiver.invoke(this) - } catch { - case e: NullPointerException => throw new ActorInitializationException( - "Don't call 'self ! message' in the Actor's constructor (in Scala this means in the body of the class).") + def invoke() { + try { + receiver.invoke(this) + } catch { + case e: NullPointerException => throw new ActorInitializationException( + "Don't call 'self ! message' in the Actor's constructor (in Scala this means in the body of the class).") + } } } final case class FutureInvocation[T](future: CompletableFuture[T], function: () => T, cleanup: () => Unit) extends Runnable { - def run = { + def run() { future complete (try { Right(function()) } catch { @@ -46,22 +48,23 @@ final case class FutureInvocation[T](future: CompletableFuture[T], function: () object MessageDispatcher { val UNSCHEDULED = 0 - val SCHEDULED = 1 + val SCHEDULED = 1 val RESCHEDULED = 2 implicit def defaultGlobalDispatcher = Dispatchers.defaultGlobalDispatcher } /** - * @author Jonas Bonér + * @author Jonas Bonér */ trait MessageDispatcher { + import MessageDispatcher._ - protected val uuids = new ConcurrentSkipListSet[Uuid] + protected val uuids = new ConcurrentSkipListSet[Uuid] protected val futures = new AtomicLong(0L) - protected val guard = new ReentrantGuard - protected val active = new Switch(false) + protected val guard = new ReentrantGuard + protected val active = new Switch(false) private var shutdownSchedule = UNSCHEDULED //This can be non-volatile since it is protected by guard withGuard @@ -73,18 +76,24 @@ trait MessageDispatcher { /** * Attaches the specified actorRef to this dispatcher */ - final def attach(actorRef: ActorRef): Unit = guard withGuard { - register(actorRef) + final def attach(actorRef: ActorRef) { + guard withGuard { + register(actorRef) + } } /** * Detaches the specified actorRef from this dispatcher */ - final def detach(actorRef: ActorRef): Unit = guard withGuard { - unregister(actorRef) + final def detach(actorRef: ActorRef) { + guard withGuard { + unregister(actorRef) + } } - private[akka] final def dispatchMessage(invocation: MessageInvocation): Unit = dispatch(invocation) + private[akka] final def dispatchMessage(invocation: MessageInvocation) { + dispatch(invocation) + } private[akka] final def dispatchFuture[T](block: () => T, timeout: Long): Future[T] = { futures.getAndIncrement() @@ -92,7 +101,11 @@ trait MessageDispatcher { val future = new DefaultCompletableFuture[T](timeout) if (active.isOff) - guard withGuard { active.switchOn { start } } + guard withGuard { + active.switchOn { + start() + } + } executeFuture(FutureInvocation[T](future, block, futureCleanup)) future @@ -104,7 +117,7 @@ trait MessageDispatcher { } private val futureCleanup: () => Unit = - () => if (futures.decrementAndGet() == 0) { + () => if (futures.decrementAndGet() == 0) { guard withGuard { if (futures.get == 0 && uuids.isEmpty) { shutdownSchedule match { @@ -126,7 +139,7 @@ trait MessageDispatcher { uuids add actorRef.uuid if (active.isOff) { active.switchOn { - start + start() } } } @@ -134,7 +147,7 @@ trait MessageDispatcher { private[akka] def unregister(actorRef: ActorRef) = { if (uuids remove actorRef.uuid) { actorRef.mailbox = null - if (uuids.isEmpty && futures.get == 0){ + if (uuids.isEmpty && futures.get == 0) { shutdownSchedule match { case UNSCHEDULED => shutdownSchedule = SCHEDULED @@ -150,31 +163,33 @@ trait MessageDispatcher { /** * Traverses the list of actors (uuids) currently being attached to this dispatcher and stops those actors */ - def stopAllAttachedActors { + def stopAllAttachedActors() { val i = uuids.iterator while (i.hasNext()) { val uuid = i.next() Actor.registry.local.actorFor(uuid) match { case Some(actor) => actor.stop() - case None => {} + case None => {} } } } private val shutdownAction = new Runnable { - def run = guard withGuard { - shutdownSchedule match { - case RESCHEDULED => - shutdownSchedule = SCHEDULED - Scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS) - case SCHEDULED => - if (uuids.isEmpty && futures.get == 0) { - active switchOff { - shutdown // shut down in the dispatcher's references is zero + def run() { + guard withGuard { + shutdownSchedule match { + case RESCHEDULED => + shutdownSchedule = SCHEDULED + Scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS) + case SCHEDULED => + if (uuids.isEmpty && futures.get == 0) { + active switchOff { + shutdown() // shut down in the dispatcher's references is zero + } } - } - shutdownSchedule = UNSCHEDULED - case UNSCHEDULED => //Do nothing + shutdownSchedule = UNSCHEDULED + case UNSCHEDULED => //Do nothing + } } } } @@ -188,29 +203,29 @@ trait MessageDispatcher { /** * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference */ - def suspend(actorRef: ActorRef): Unit + def suspend(actorRef: ActorRef) /* * After the call to this method, the dispatcher must begin any new message processing for the specified reference */ - def resume(actorRef: ActorRef): Unit + def resume(actorRef: ActorRef) /** * Will be called when the dispatcher is to queue an invocation for execution */ - private[akka] def dispatch(invocation: MessageInvocation): Unit + private[akka] def dispatch(invocation: MessageInvocation) - private[akka] def executeFuture(invocation: FutureInvocation[_]): Unit + private[akka] def executeFuture(invocation: FutureInvocation[_]) /** * Called one time every time an actor is attached to this dispatcher and this dispatcher was previously shutdown */ - private[akka] def start(): Unit + private[akka] def start() /** * Called one time every time an actor is detached from this dispatcher and this dispatcher has no actors left attached */ - private[akka] def shutdown(): Unit + private[akka] def shutdown() /** * Returns the size of the mailbox for the specified actor @@ -235,25 +250,30 @@ abstract class MessageDispatcherConfigurator { def mailboxType(config: Configuration): MailboxType = { val capacity = config.getInt("mailbox-capacity", Dispatchers.MAILBOX_CAPACITY) if (capacity < 1) UnboundedMailbox() - else BoundedMailbox(capacity, Duration(config.getInt("mailbox-push-timeout-time", Dispatchers.MAILBOX_PUSH_TIME_OUT.toMillis.toInt), TIME_UNIT)) + else { + val duration = Duration( + config.getInt("mailbox-push-timeout-time", Dispatchers.MAILBOX_PUSH_TIME_OUT.toMillis.toInt), + TIME_UNIT) + BoundedMailbox(capacity, duration) + } } def configureThreadPool(config: Configuration, createDispatcher: => (ThreadPoolConfig) => MessageDispatcher): ThreadPoolConfigDispatcherBuilder = { import ThreadPoolConfigDispatcherBuilder.conf_? //Apply the following options to the config if they are present in the config - ThreadPoolConfigDispatcherBuilder(createDispatcher,ThreadPoolConfig()).configure( - conf_?(config getInt "keep-alive-time" )(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))), + ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig()).configure( + conf_?(config getInt "keep-alive-time")(time => _.setKeepAliveTime(Duration(time, TIME_UNIT))), conf_?(config getDouble "core-pool-size-factor")(factor => _.setCorePoolSizeFromFactor(factor)), - conf_?(config getDouble "max-pool-size-factor" )(factor => _.setMaxPoolSizeFromFactor(factor)), - conf_?(config getInt "executor-bounds" )(bounds => _.setExecutorBounds(bounds)), - conf_?(config getBool "allow-core-timeout" )(allow => _.setAllowCoreThreadTimeout(allow)), + conf_?(config getDouble "max-pool-size-factor")(factor => _.setMaxPoolSizeFromFactor(factor)), + conf_?(config getInt "executor-bounds")(bounds => _.setExecutorBounds(bounds)), + conf_?(config getBool "allow-core-timeout")(allow => _.setAllowCoreThreadTimeout(allow)), conf_?(config getString "rejection-policy" map { - case "abort" => new AbortPolicy() - case "caller-runs" => new CallerRunsPolicy() + case "abort" => new AbortPolicy() + case "caller-runs" => new CallerRunsPolicy() case "discard-oldest" => new DiscardOldestPolicy() - case "discard" => new DiscardPolicy() - case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x) + case "discard" => new DiscardPolicy() + case x => throw new IllegalArgumentException("[%s] is not a valid rejectionPolicy!" format x) })(policy => _.setRejectionPolicy(policy))) } } diff --git a/akka-actor/src/main/scala/akka/serialization/Format.scala b/akka-actor/src/main/scala/akka/serialization/Format.scala index 3f63916a8e..bd7cb09392 100644 --- a/akka-actor/src/main/scala/akka/serialization/Format.scala +++ b/akka-actor/src/main/scala/akka/serialization/Format.scala @@ -46,7 +46,7 @@ object Format { val bos = new ByteArrayOutputStream val out = new ObjectOutputStream(bos) out.writeObject(obj) - out.close + out.close() bos.toByteArray } @@ -55,7 +55,7 @@ object Format { //if (classLoader.isDefined) new ClassLoaderObjectInputStream(classLoader.get, new ByteArrayInputStream(bytes)) else new ObjectInputStream(new ByteArrayInputStream(bytes)) val obj = in.readObject - in.close + in.close() obj } } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index ea21ef84a9..f17b4c9997 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -37,7 +37,6 @@ import akka.serialization.{Format, Serializers} import akka.serialization.Compression.LZF import akka.AkkaException -//import akka.monitoring.Monitoring import akka.cluster.zookeeper._ import com.eaio.uuid.UUID @@ -55,12 +54,12 @@ class ClusterException(message: String) extends AkkaException(message) * @author Jonas Bonér */ trait ClusterNodeMBean { - def start: Unit - def stop: Unit + def start() + def stop() - def disconnect: Unit - def reconnect: Unit - def resign: Unit + def disconnect() + def reconnect() + def resign() def isConnected: Boolean @@ -86,9 +85,9 @@ trait ClusterNodeMBean { def getUuidsForActorsInUseOnNode(nodeName: String): Array[String] def getAddressesForActorsInUseOnNode(nodeName: String): Array[String] - def setConfigElement(key: String, value: String): Unit + def setConfigElement(key: String, value: String) def getConfigElement(key: String): AnyRef - def removeConfigElement(key: String): Unit + def removeConfigElement(key: String) def getConfigElementKeys: Array[String] } @@ -116,22 +115,24 @@ object Cluster { * For Scala API. */ trait ChangeListener { - def notify(event: ChangeNotification, client: ClusterNode) = event match { - case NodeConnected(name) => nodeConnected(name, client) - case NodeDisconnected(name) => nodeDisconnected(name, client) - case NewLeader(name: String) => newLeader(name, client) - case NewSession => thisNodeNewSession(client) - case ThisNode.Connected => thisNodeConnected(client) - case ThisNode.Disconnected => thisNodeDisconnected(client) - case ThisNode.Expired => thisNodeExpired(client) + def notify(event: ChangeNotification, client: ClusterNode) { + event match { + case NodeConnected(name) => nodeConnected(name, client) + case NodeDisconnected(name) => nodeDisconnected(name, client) + case NewLeader(name: String) => newLeader(name, client) + case NewSession => thisNodeNewSession(client) + case ThisNode.Connected => thisNodeConnected(client) + case ThisNode.Disconnected => thisNodeDisconnected(client) + case ThisNode.Expired => thisNodeExpired(client) + } } - def nodeConnected(node: String, client: ClusterNode) = {} - def nodeDisconnected(node: String, client: ClusterNode) = {} - def newLeader(name: String, client: ClusterNode) = {} - def thisNodeNewSession(client: ClusterNode) = {} - def thisNodeConnected(client: ClusterNode) = {} - def thisNodeDisconnected(client: ClusterNode) = {} - def thisNodeExpired(client: ClusterNode) = {} + def nodeConnected(node: String, client: ClusterNode) {} + def nodeDisconnected(node: String, client: ClusterNode) {} + def newLeader(name: String, client: ClusterNode) {} + def thisNodeNewSession(client: ClusterNode) {} + def thisNodeConnected(client: ClusterNode) {} + def thisNodeDisconnected(client: ClusterNode) {} + def thisNodeExpired(client: ClusterNode) {} } /** @@ -156,11 +157,12 @@ object Cluster { val defaultSerializer = new SerializableSerializer private val _zkServer = new AtomicReference[Option[ZkServer]](None) - private val _nodes = new AtomicReference(new Nodes) + private val _nodes = new AtomicReference[Nodes](new Nodes) private val _clusterNames = new ConcurrentSkipListSet[String] - private[cluster] def updateNodes(f: Nodes => Nodes) = + private[cluster] def updateNodes(f: Nodes => Nodes) { while (Some(_nodes.get).map(node => _nodes.compareAndSet(node, f(node)) == false).get) {} + } /** * Looks up the local hostname. @@ -288,18 +290,22 @@ object Cluster { *

* WARNING: Use with care */ - def reset(): Unit = withPrintStackTraceOnError { - EventHandler.info(this, "Resetting all clusters connected to in this JVM") - if (!clusters.isEmpty) { - nodes foreach { tp => - val (_, node) = tp - node.disconnect - node.remoteService.shutdown + def reset() { + withPrintStackTraceOnError { + EventHandler.info(this, "Resetting all clusters connected to in this JVM") + + if (!clusters.isEmpty) { + nodes foreach { + tp => + val (_, node) = tp + node.disconnect() + node.remoteService.shutdown() + } + implicit val zkClient = newZkClient + clusters foreach (resetNodesInCluster(_)) + ignore[ZkNoNodeException](zkClient.deleteRecursive(ZooKeeperBarrier.BarriersNode)) + zkClient.close() } - implicit val zkClient = newZkClient - clusters foreach (resetNodesInCluster(_)) - ignore[ZkNoNodeException](zkClient.deleteRecursive(ZooKeeperBarrier.BarriersNode)) - zkClient.close } } @@ -314,11 +320,13 @@ object Cluster { /** * Shut down the local ZooKeeper server. */ - def shutdownLocalCluster() = withPrintStackTraceOnError { - EventHandler.info(this, "Shuts down local cluster") - reset - _zkServer.get.foreach(_.shutdown) - _zkServer.set(None) + def shutdownLocalCluster() { + withPrintStackTraceOnError { + EventHandler.info(this, "Shuts down local cluster") + reset() + _zkServer.get.foreach(_.shutdown()) + _zkServer.set(None) + } } /** @@ -350,6 +358,8 @@ object Cluster { } /** + * A Cluster is made up by a bunch of jvm's, the ClusterNode. + * * @author Jonas Bonér */ class ClusterNode private[akka] ( @@ -371,13 +381,13 @@ class ClusterNode private[akka] ( val remoteClientLifeCycleListener = actorOf(new Actor { def receive = { - case RemoteClientError(cause, client, address) => client.shutdownClientModule - case RemoteClientDisconnected(client, address) => client.shutdownClientModule - case _ => //ignore other + case RemoteClientError(cause, client, address) => client.shutdownClientModule() + case RemoteClientDisconnected(client, address) => client.shutdownClientModule() + case _ => //ignore other } - }, "akka.cluster.remoteClientLifeCycleListener").start + }, "akka.cluster.remoteClientLifeCycleListener").start() - val remoteDaemon = actorOf(new RemoteClusterDaemon(this), RemoteClusterDaemon.ADDRESS).start + val remoteDaemon = actorOf(new RemoteClusterDaemon(this), RemoteClusterDaemon.ADDRESS).start() val remoteService: RemoteSupport = { val remote = new akka.remote.netty.NettyRemoteSupport @@ -441,13 +451,13 @@ class ClusterNode private[akka] ( private[cluster] val zkClient = new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout, serializer) private[cluster] val leaderElectionCallback = new LockListener { - def lockAcquired { + override def lockAcquired() { EventHandler.info(this, "Node [%s] is the new leader".format(self.nodeAddress.nodeName)) self.isLeader.set(true) self.publish(Cluster.NewLeader(self.nodeAddress.nodeName)) } - def lockReleased { + override def lockReleased() { EventHandler.info(this, "Node [%s] is *NOT* the leader anymore".format(self.nodeAddress.nodeName)) self.isLeader.set(false) @@ -473,44 +483,47 @@ class ClusterNode private[akka] ( def start(): ClusterNode = { isConnected switchOn { - initializeNode + initializeNode() } this } - def stop(): Unit = isConnected switchOff { - ignore[ZkNoNodeException](zkClient.deleteRecursive(membershipNodePath)) + def stop() { + isConnected switchOff { + ignore[ZkNoNodeException](zkClient.deleteRecursive(membershipNodePath)) - locallyCachedMembershipNodes.clear - locallyCheckedOutActors.clear + locallyCachedMembershipNodes.clear() + locallyCheckedOutActors.clear() - replicaConnections.toList.foreach({ case (_, (address, _)) => - remote.shutdownClientConnection(address) // shut down client connections - }) + replicaConnections.toList.foreach({ + case (_, (address, _)) => + Actor.remote.shutdownClientConnection(address) // shut down client connections + }) - remoteService.shutdown // shutdown server + remoteService.shutdown() // shutdown server - remoteClientLifeCycleListener.stop - remoteDaemon.stop + remoteClientLifeCycleListener.stop() + remoteDaemon.stop() - // for monitoring remote listener - registry.local.actors.filter(remoteService.hasListener).foreach(_.stop) + // for monitoring remote listener + registry.local.actors.filter(remoteService.hasListener).foreach(_.stop()) - replicaConnections.clear - updateNodes(_ - nodeAddress) + replicaConnections.clear() + updateNodes(_ - nodeAddress) - disconnect() - EventHandler.info(this, "Cluster node shut down [%s]".format(nodeAddress)) + disconnect() + EventHandler.info(this, "Cluster node shut down [%s]".format(nodeAddress)) + } } def disconnect(): ClusterNode = { - zkClient.unsubscribeAll - zkClient.close + zkClient.unsubscribeAll() + zkClient.close() this } def reconnect(): ClusterNode = { - zkClient.reconnect + zkClient.reconnect() this } @@ -526,7 +539,9 @@ class ClusterNode private[akka] ( this } else throw new IllegalStateException("Can not register 'ChangeListener' after the cluster node has been started") - private[cluster] def publish(change: ChangeNotification) = changeListeners.iterator.foreach(_.notify(change, this)) + private[cluster] def publish(change: ChangeNotification) { + changeListeners.iterator.foreach(_.notify(change, this)) + } // ======================================= // Leader @@ -540,7 +555,7 @@ class ClusterNode private[akka] ( /** * Explicitly resign from being a leader. If this node is not a leader then this operation is a no-op. */ - def resign() { if (isLeader.get) leaderLock.unlock } + def resign() { if (isLeader.get) leaderLock.unlock() } // ======================================= // Actor @@ -685,7 +700,7 @@ class ClusterNode private[akka] ( /** * Removes actor with uuid from the cluster. */ - def remove(uuid: UUID) = { + def remove(uuid: UUID) { releaseActorOnAllNodes(uuid) locallyCheckedOutActors.remove(uuid) @@ -766,7 +781,7 @@ class ClusterNode private[akka] ( // val actor = new ReplicatedActorRef(fromBinary[T](bytes, remoteServerAddress)(format)) val actor = fromBinary[T](bytes, remoteServerAddress)(format) remoteService.register(UUID_PREFIX + uuid, actor) // clustered refs are always registered and looked up by UUID - actor.start + actor.start() actor.asInstanceOf[LocalActorRef] case Right(exception) => throw exception } @@ -776,16 +791,20 @@ class ClusterNode private[akka] ( /** * Using (checking out) all actors with a specific UUID on all nodes in the cluster. */ - def useActorOnAllNodes(uuid: UUID): Unit = isConnected ifOn { - EventHandler.debug(this, - "Using (checking out) all actors with UUID [%s] on all nodes in cluster".format(uuid)) - val command = RemoteDaemonMessageProtocol.newBuilder - .setMessageType(USE) - .setActorUuid(uuidToUuidProtocol(uuid)) - .build - membershipNodes foreach { node => - replicaConnections.get(node) foreach { case (_, connection) => - connection ! command + def useActorOnAllNodes(uuid: UUID) { + isConnected ifOn { + EventHandler.debug(this, + "Using (checking out) all actors with UUID [%s] on all nodes in cluster".format(uuid)) + val command = RemoteDaemonMessageProtocol.newBuilder + .setMessageType(USE) + .setActorUuid(uuidToUuidProtocol(uuid)) + .build + membershipNodes foreach { + node => + replicaConnections.get(node) foreach { + case (_, connection) => + connection ! command + } } } } @@ -793,43 +812,53 @@ class ClusterNode private[akka] ( /** * Using (checking out) specific UUID on a specefic node. */ - def useActorOnNode(node: String, uuid: UUID): Unit = isConnected ifOn { - replicaConnections.get(node) foreach { case (_, connection) => - connection ! RemoteDaemonMessageProtocol.newBuilder - .setMessageType(USE) - .setActorUuid(uuidToUuidProtocol(uuid)) - .build + def useActorOnNode(node: String, uuid: UUID) { + isConnected ifOn { + replicaConnections.get(node) foreach { + case (_, connection) => + connection ! RemoteDaemonMessageProtocol.newBuilder + .setMessageType(USE) + .setActorUuid(uuidToUuidProtocol(uuid)) + .build + } } } /** * Checks in an actor after done using it on this node. */ - def release(actorAddress: String): Unit = isConnected ifOn { - actorUuidsForActorAddress(actorAddress) foreach { uuid => - EventHandler.debug(this, - "Releasing actor with UUID [%s] after usage".format(uuid)) - locallyCheckedOutActors.remove(uuid) - ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAtNodePathFor(nodeAddress.nodeName, uuid))) - ignore[ZkNoNodeException](zkClient.delete(actorAtNodePathFor(nodeAddress.nodeName, uuid))) - ignore[ZkNoNodeException](zkClient.delete(actorLocationsPathFor(uuid, nodeAddress))) - ignore[ZkNoNodeException](zkClient.delete(actorRegistryNodePathFor(uuid, remoteServerAddress))) + def release(actorAddress: String) { + isConnected ifOn { + actorUuidsForActorAddress(actorAddress) foreach { + uuid => + EventHandler.debug(this, + "Releasing actor with UUID [%s] after usage".format(uuid)) + locallyCheckedOutActors.remove(uuid) + ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAtNodePathFor(nodeAddress.nodeName, uuid))) + ignore[ZkNoNodeException](zkClient.delete(actorAtNodePathFor(nodeAddress.nodeName, uuid))) + ignore[ZkNoNodeException](zkClient.delete(actorLocationsPathFor(uuid, nodeAddress))) + ignore[ZkNoNodeException](zkClient.delete(actorRegistryNodePathFor(uuid, remoteServerAddress))) + } } } /** * Releases (checking in) all actors with a specific UUID on all nodes in the cluster where the actor is in 'use'. */ - def releaseActorOnAllNodes(uuid: UUID): Unit = isConnected ifOn { - EventHandler.debug(this, - "Releasing (checking in) all actors with UUID [%s] on all nodes in cluster".format(uuid)) - val command = RemoteDaemonMessageProtocol.newBuilder - .setMessageType(RELEASE) - .setActorUuid(uuidToUuidProtocol(uuid)) - .build - nodesForActorsInUseWithUuid(uuid) foreach { node => - replicaConnections.get(node) foreach { case (_, connection) => - connection ! command + def releaseActorOnAllNodes(uuid: UUID) { + isConnected ifOn { + EventHandler.debug(this, + "Releasing (checking in) all actors with UUID [%s] on all nodes in cluster".format(uuid)) + val command = RemoteDaemonMessageProtocol.newBuilder + .setMessageType(RELEASE) + .setActorUuid(uuidToUuidProtocol(uuid)) + .build + nodesForActorsInUseWithUuid(uuid) foreach { + node => + replicaConnections.get(node) foreach { + case (_, connection) => + connection ! command + } } } } @@ -845,8 +874,11 @@ class ClusterNode private[akka] ( EventHandler.debug(this, "Creating cluster actor ref with router [%s] for actors [%s]".format(router, addresses.mkString(", "))) - def registerClusterActorRefForAddress(actorRef: ClusterActorRef, addresses: Array[(UUID, InetSocketAddress)]) = - addresses foreach { case (_, address) => clusterActorRefs.put(address, actorRef) } + def registerClusterActorRefForAddress(actorRef: ClusterActorRef, addresses: Array[(UUID, InetSocketAddress)]) { + addresses foreach { + case (_, address) => clusterActorRefs.put(address, actorRef) + } + } // FIXME remove? def refByUuid(uuid: UUID): ActorRef = { @@ -874,19 +906,23 @@ class ClusterNode private[akka] ( /** * Migrate the actor from 'this' node to node 'to'. */ - def migrate(to: NodeAddress, actorAddress: String): Unit = migrate(nodeAddress, to, actorAddress) + def migrate(to: NodeAddress, actorAddress: String) { + migrate(nodeAddress, to, actorAddress) + } /** * Migrate the actor from node 'from' to node 'to'. */ def migrate( - from: NodeAddress, to: NodeAddress, actorAddress: String): Unit = isConnected ifOn { - if (from eq null) throw new IllegalArgumentException("NodeAddress 'from' can not be 'null'") - if (to eq null) throw new IllegalArgumentException("NodeAddress 'to' can not be 'null'") - if (isInUseOnNode(actorAddress, from)) { - migrateWithoutCheckingThatActorResidesOnItsHomeNode(from, to, actorAddress) - } else { - throw new ClusterException("Can't move actor from node [" + from + "] since it does not exist on this node") + from: NodeAddress, to: NodeAddress, actorAddress: String) { + isConnected ifOn { + if (from eq null) throw new IllegalArgumentException("NodeAddress 'from' can not be 'null'") + if (to eq null) throw new IllegalArgumentException("NodeAddress 'to' can not be 'null'") + if (isInUseOnNode(actorAddress, from)) { + migrateWithoutCheckingThatActorResidesOnItsHomeNode(from, to, actorAddress) + } else { + throw new ClusterException("Can't move actor from node [" + from + "] since it does not exist on this node") + } } } @@ -1013,7 +1049,7 @@ class ClusterNode private[akka] ( /** * Send a function 'Function0[Unit]' to be invoked on a random number of nodes (defined by 'replicationFactor' argument). */ - def send(f: Function0[Unit], replicationFactor: Int): Unit = { + def send(f: Function0[Unit], replicationFactor: Int) { val message = RemoteDaemonMessageProtocol.newBuilder .setMessageType(FUNCTION_FUN0_UNIT) .setPayload(ByteString.copyFrom(Serializers.Java.toBinary(f))) @@ -1038,7 +1074,7 @@ class ClusterNode private[akka] ( * Send a function 'Function1[Any, Unit]' to be invoked on a random number of nodes (defined by 'replicationFactor' argument) * with the argument speficied. */ - def send(f: Function1[Any, Unit], arg: Any, replicationFactor: Int): Unit = { + def send(f: Function1[Any, Unit], arg: Any, replicationFactor: Int) { val message = RemoteDaemonMessageProtocol.newBuilder .setMessageType(FUNCTION_FUN1_ARG_UNIT) .setPayload(ByteString.copyFrom(Serializers.Java.toBinary((f, arg)))) @@ -1093,10 +1129,12 @@ class ClusterNode private[akka] ( case e: KeeperException.NoNodeException => null } - def removeConfigElement(key: String) = ignore[ZkNoNodeException]{ - EventHandler.debug(this, - "Removing config element with key [%s] from cluster registry".format(key)) - zkClient.deleteRecursive(configurationPathFor(key)) + def removeConfigElement(key: String) { + ignore[ZkNoNodeException] { + EventHandler.debug(this, + "Removing config element with key [%s] from cluster registry".format(key)) + zkClient.deleteRecursive(configurationPathFor(key)) + } } def getConfigElementKeys: Array[String] = zkClient.getChildren(CONFIGURATION_NODE).toList.toArray.asInstanceOf[Array[String]] @@ -1141,15 +1179,15 @@ class ClusterNode private[akka] ( private[cluster] def actorRegistryNodePathFor(uuid: UUID, address: InetSocketAddress): String = "%s/%s:%s".format(actorRegistryNodePathFor(uuid), address.getHostName, address.getPort) - private[cluster] def initializeNode = { + private[cluster] def initializeNode() { EventHandler.info(this, "Initializing cluster node [%s]".format(nodeAddress)) - createRootClusterNode + createRootClusterNode() val isLeader = joinLeaderElection - if (isLeader) createNodeStructureIfNeeded + if (isLeader) createNodeStructureIfNeeded() registerListeners - joinMembershipNode - joinActorsAtAddressNode - fetchMembershipChildrenNodes + joinMembershipNode() + joinActorsAtAddressNode() + fetchMembershipChildrenNodes() EventHandler.info(this, "Cluster node [%s] started successfully".format(nodeAddress)) } @@ -1173,7 +1211,7 @@ class ClusterNode private[akka] ( var replicas = HashSet.empty[ActorRef] if (replicationFactor < 1) return replicas - connectToAllReplicas + connectToAllReplicas() val numberOfReplicas = replicaConnections.size val replicaConnectionsAsArray = replicaConnections.toList map { case (node, (address, actorRef)) => actorRef } // the ActorRefs @@ -1196,7 +1234,7 @@ class ClusterNode private[akka] ( /** * Connect to all available replicas unless already connected). */ - private def connectToAllReplicas = { + private def connectToAllReplicas() { membershipNodes foreach { node => if (!replicaConnections.contains(node)) { val address = addressForNode(node) @@ -1206,7 +1244,7 @@ class ClusterNode private[akka] ( } } - private[cluster] def joinMembershipNode = { + private[cluster] def joinMembershipNode() { nodeNameToAddress.put(nodeAddress.nodeName, remoteServerAddress) try { EventHandler.info(this, @@ -1220,8 +1258,9 @@ class ClusterNode private[akka] ( } } - private[cluster] def joinActorsAtAddressNode = + private[cluster] def joinActorsAtAddressNode() { ignore[ZkNodeExistsException](zkClient.createPersistent(actorsAtNodePathFor(nodeAddress.nodeName))) + } private[cluster] def joinLeaderElection: Boolean = { EventHandler.info(this, "Node [%s] is joining leader election".format(nodeAddress.nodeName)) @@ -1328,12 +1367,14 @@ class ClusterNode private[akka] ( private[cluster] def findNewlyDisconnectedAvailableNodes(nodes: List[String]): List[String] = (locallyCachedMembershipNodes diff Set(nodes: _*)).toList - private def createRootClusterNode: Unit = ignore[ZkNodeExistsException] { + private def createRootClusterNode() { + ignore[ZkNodeExistsException] { zkClient.create(CLUSTER_NODE, null, CreateMode.PERSISTENT) EventHandler.info(this, "Created node [%s]".format(CLUSTER_NODE)) + } } - private def createNodeStructureIfNeeded = { + private def createNodeStructureIfNeeded() { baseNodes.foreach { path => try { zkClient.create(path, null, CreateMode.PERSISTENT) @@ -1353,9 +1394,9 @@ class ClusterNode private[akka] ( zkClient.subscribeChildChanges(MEMBERSHIP_NODE, membershipListener) } - private def fetchMembershipChildrenNodes = { + private def fetchMembershipChildrenNodes() { val membershipChildren = zkClient.getChildren(MEMBERSHIP_NODE) - locallyCachedMembershipNodes.clear + locallyCachedMembershipNodes.clear() membershipChildren.iterator.foreach(locallyCachedMembershipNodes.add) } @@ -1363,41 +1404,41 @@ class ClusterNode private[akka] ( val clusterMBean = new StandardMBean(classOf[ClusterNodeMBean]) with ClusterNodeMBean { import Cluster._ - def start = self.start - def stop = self.stop + override def start() {self.start()} + override def stop() {self.stop()} - def disconnect = self.disconnect - def reconnect = self.reconnect - def resign = self.resign + override def disconnect() = self.disconnect() + override def reconnect() {self.reconnect()} + override def resign() {self.resign()} - def isConnected = self.isConnected.isOn + override def isConnected = self.isConnected.isOn - def getRemoteServerHostname = self.nodeAddress.hostname - def getRemoteServerPort = self.nodeAddress.port + override def getRemoteServerHostname = self.nodeAddress.hostname + override def getRemoteServerPort = self.nodeAddress.port - def getNodeName = self.nodeAddress.nodeName - def getClusterName = self.nodeAddress.clusterName - def getZooKeeperServerAddresses = self.zkServerAddresses + override def getNodeName = self.nodeAddress.nodeName + override def getClusterName = self.nodeAddress.clusterName + override def getZooKeeperServerAddresses = self.zkServerAddresses - def getMemberNodes = self.locallyCachedMembershipNodes.iterator.map(_.toString).toArray - def getLeader = self.leader.toString + override def getMemberNodes = self.locallyCachedMembershipNodes.iterator.map(_.toString).toArray + override def getLeader = self.leader.toString - def getUuidsForActorsInUse = self.uuidsForActorsInUse.map(_.toString).toArray - def getAddressesForActorsInUse = self.addressesForActorsInUse.map(_.toString).toArray + override def getUuidsForActorsInUse = self.uuidsForActorsInUse.map(_.toString).toArray + override def getAddressesForActorsInUse = self.addressesForActorsInUse.map(_.toString).toArray - def getUuidsForClusteredActors = self.uuidsForClusteredActors.map(_.toString).toArray - def getAddressesForClusteredActors = self.addressesForClusteredActors.map(_.toString).toArray + override def getUuidsForClusteredActors = self.uuidsForClusteredActors.map(_.toString).toArray + override def getAddressesForClusteredActors = self.addressesForClusteredActors.map(_.toString).toArray - def getNodesForActorInUseWithUuid(uuid: String) = self.nodesForActorsInUseWithUuid(stringToUuid(uuid)) - def getNodesForActorInUseWithAddress(id: String) = self.nodesForActorsInUseWithAddress(id) + override def getNodesForActorInUseWithUuid(uuid: String) = self.nodesForActorsInUseWithUuid(stringToUuid(uuid)) + override def getNodesForActorInUseWithAddress(id: String) = self.nodesForActorsInUseWithAddress(id) - def getUuidsForActorsInUseOnNode(nodeName: String) = self.uuidsForActorsInUseOnNode(nodeName).map(_.toString).toArray - def getAddressesForActorsInUseOnNode(nodeName: String) = self.addressesForActorsInUseOnNode(nodeName).map(_.toString).toArray + override def getUuidsForActorsInUseOnNode(nodeName: String) = self.uuidsForActorsInUseOnNode(nodeName).map(_.toString).toArray + override def getAddressesForActorsInUseOnNode(nodeName: String) = self.addressesForActorsInUseOnNode(nodeName).map(_.toString).toArray - def setConfigElement(key: String, value: String) = self.setConfigElement(key, value.getBytes("UTF-8")) - def getConfigElement(key: String) = new String(self.getConfigElement(key), "UTF-8") - def removeConfigElement(key: String) = self.removeConfigElement(key) - def getConfigElementKeys = self.getConfigElementKeys.toArray + override def setConfigElement(key: String, value: String) {self.setConfigElement(key, value.getBytes("UTF-8"))} + override def getConfigElement(key: String) = new String(self.getConfigElement(key), "UTF-8") + override def removeConfigElement(key: String) { self.removeConfigElement(key)} + override def getConfigElementKeys =self.getConfigElementKeys.toArray } JMX.register(clusterJmxObjectName, clusterMBean) @@ -1411,24 +1452,28 @@ class ClusterNode private[akka] ( * @author Jonas Bonér */ class MembershipChildListener(self: ClusterNode) extends IZkChildListener with ErrorHandler { - def handleChildChange(parentPath: String, currentChilds: JList[String]) = withErrorHandler { - if (currentChilds ne null) { - val childList = currentChilds.toList - if (!childList.isEmpty) EventHandler.debug(this, - "MembershipChildListener at [%s] has children [%s]" - .format(self.nodeAddress.nodeName, childList.mkString(" "))) - self.findNewlyConnectedMembershipNodes(childList) foreach { name => - self.nodeNameToAddress.put(name, self.addressForNode(name)) // update 'nodename-address' map - self.publish(Cluster.NodeConnected(name)) - } + def handleChildChange(parentPath: String, currentChilds: JList[String]) { + withErrorHandler { + if (currentChilds ne null) { + val childList = currentChilds.toList + if (!childList.isEmpty) EventHandler.debug(this, + "MembershipChildListener at [%s] has children [%s]" + .format(self.nodeAddress.nodeName, childList.mkString(" "))) + self.findNewlyConnectedMembershipNodes(childList) foreach { + name => + self.nodeNameToAddress.put(name, self.addressForNode(name)) // update 'nodename-address' map + self.publish(Cluster.NodeConnected(name)) + } - self.findNewlyDisconnectedMembershipNodes(childList) foreach { name => - self.nodeNameToAddress.remove(name) // update 'nodename-address' map - self.publish(Cluster.NodeDisconnected(name)) - } + self.findNewlyDisconnectedMembershipNodes(childList) foreach { + name => + self.nodeNameToAddress.remove(name) // update 'nodename-address' map + self.publish(Cluster.NodeDisconnected(name)) + } - self.locallyCachedMembershipNodes.clear - childList.foreach(self.locallyCachedMembershipNodes.add) + self.locallyCachedMembershipNodes.clear() + childList.foreach(self.locallyCachedMembershipNodes.add) + } } } } @@ -1437,27 +1482,26 @@ class MembershipChildListener(self: ClusterNode) extends IZkChildListener with E * @author Jonas Bonér */ class StateListener(self: ClusterNode) extends IZkStateListener { - def handleStateChanged(state: KeeperState) = state match { - - case KeeperState.SyncConnected => - EventHandler.debug(this, "Cluster node [%s] - Connected".format(self.nodeAddress)) - self.publish(Cluster.ThisNode.Connected) - - case KeeperState.Disconnected => - EventHandler.debug(this, "Cluster node [%s] - Disconnected".format(self.nodeAddress)) - self.publish(Cluster.ThisNode.Disconnected) - - case KeeperState.Expired => - EventHandler.debug(this, "Cluster node [%s] - Expired".format(self.nodeAddress)) - self.publish(Cluster.ThisNode.Expired) + def handleStateChanged(state: KeeperState) { + state match { + case KeeperState.SyncConnected => + EventHandler.debug(this, "Cluster node [%s] - Connected".format(self.nodeAddress)) + self.publish(Cluster.ThisNode.Connected) + case KeeperState.Disconnected => + EventHandler.debug(this, "Cluster node [%s] - Disconnected".format(self.nodeAddress)) + self.publish(Cluster.ThisNode.Disconnected) + case KeeperState.Expired => + EventHandler.debug(this, "Cluster node [%s] - Expired".format(self.nodeAddress)) + self.publish(Cluster.ThisNode.Expired) + } } /** * Re-initialize after the zookeeper session has expired and a new session has been created. */ - def handleNewSession = { + def handleNewSession() { EventHandler.debug(this, "Session expired re-initializing node [%s]".format(self.nodeAddress)) - self.initializeNode + self.initializeNode() self.publish(Cluster.NewSession) } } @@ -1521,15 +1565,15 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { else EventHandler.warning(this, "None of 'uuid' or 'actorAddress'' is specified, ignoring remote cluster daemon command [%s]".format(message)) - case START => cluster.start + case START => cluster.start() - case STOP => cluster.stop + case STOP => cluster.stop() - case DISCONNECT => cluster.disconnect + case DISCONNECT => cluster.disconnect() - case RECONNECT => cluster.reconnect + case RECONNECT => cluster.reconnect() - case RESIGN => cluster.resign + case RESIGN => cluster.resign() case FAIL_OVER_CONNECTIONS => val (from, to) = payloadFor(message, classOf[(InetSocketAddress, InetSocketAddress)]) @@ -1539,7 +1583,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { actorOf(new Actor() { self.dispatcher = functionServerDispatcher def receive = { - case f: Function0[Unit] => try { f() } finally { self.stop } + case f: Function0[Unit] => try { f() } finally { self.stop() } } }).start ! payloadFor(message, classOf[Function0[Unit]]) @@ -1547,7 +1591,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { actorOf(new Actor() { self.dispatcher = functionServerDispatcher def receive = { - case f: Function0[Any] => try { self.reply(f()) } finally { self.stop } + case f: Function0[Any] => try { self.reply(f()) } finally { self.stop() } } }).start forward payloadFor(message, classOf[Function0[Any]]) @@ -1555,7 +1599,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { actorOf(new Actor() { self.dispatcher = functionServerDispatcher def receive = { - case t: Tuple2[Function1[Any, Unit], Any] => try { t._1(t._2) } finally { self.stop } + case t: Tuple2[Function1[Any, Unit], Any] => try { t._1(t._2) } finally { self.stop() } } }).start ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]]) @@ -1563,7 +1607,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { actorOf(new Actor() { self.dispatcher = functionServerDispatcher def receive = { - case t: Tuple2[Function1[Any, Any], Any] => try { self.reply(t._1(t._2)) } finally { self.stop } + case t: Tuple2[Function1[Any, Any], Any] => try { self.reply(t._1(t._2)) } finally { self.stop() } } }).start forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]]) }