/** * Copyright (C) 2009-2010 Scalable Solutions AB */ package se.scalablesolutions.akka.dispatch import se.scalablesolutions.akka.actor.{ActorRegistry, ActorRef, Uuid, ActorInitializationException} import org.multiverse.commitbarriers.CountDownCommitBarrier import java.util.concurrent._ import se.scalablesolutions.akka.util. {Logging, HashCode} /** * @author Jonas Bonér */ final class MessageInvocation(val receiver: ActorRef, val message: Any, val sender: Option[ActorRef], val senderFuture: Option[CompletableFuture[Any]], val transactionSet: Option[CountDownCommitBarrier]) { if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null") def invoke = try { receiver.invoke(this) } catch { case e: NullPointerException => throw new ActorInitializationException( "Don't call 'self ! message' in the Actor's constructor (in Scala this means in the body of the class).") } override def hashCode(): Int = { var result = HashCode.SEED result = HashCode.hash(result, receiver.actor) result = HashCode.hash(result, message.asInstanceOf[AnyRef]) result } override def equals(that: Any): Boolean = { that.isInstanceOf[MessageInvocation] && that.asInstanceOf[MessageInvocation].receiver.actor == receiver.actor && that.asInstanceOf[MessageInvocation].message == message } override def toString = { "MessageInvocation[" + "\n\tmessage = " + message + "\n\treceiver = " + receiver + "\n\tsender = " + sender + "\n\tsenderFuture = " + senderFuture + "\n\ttransactionSet = " + transactionSet + "]" } } /** * @author Jonas Bonér */ trait MessageDispatcher extends MailboxFactory with Logging { protected val uuids = new ConcurrentSkipListSet[Uuid] def register(actorRef: ActorRef) { start if (actorRef.mailbox eq null) actorRef.mailbox = createMailbox(actorRef) uuids add actorRef.uuid } def unregister(actorRef: ActorRef) = { uuids remove actorRef.uuid actorRef.mailbox = null if (uuids.isEmpty) shutdown // shut down in the dispatcher's references is zero } def stopAllLinkedActors { val i = uuids.iterator while(i.hasNext()) { val uuid = i.next() i.remove() ActorRegistry.actorFor(uuid) match { case Some(actor) => actor.stop case None => log.warn("stopAllLinkedActors couldn't find linked actor: " + uuid) } } if(uuids.isEmpty) shutdown } def suspend(actorRef: ActorRef): Unit def resume(actorRef: ActorRef): Unit def dispatch(invocation: MessageInvocation): Unit protected def start: Unit protected def shutdown: Unit /** * Returns the size of the mailbox for the specified actor */ def mailboxSize(actorRef: ActorRef): Int }