diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index b8a672aaf1..a9085d2a81 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -127,10 +127,10 @@ object ActorModelSpec { super.unregister(actor) } - protected[akka] abstract override def dispatch(invocation: Envelope) { - val stats = getStats(invocation.receiver.self) + protected[akka] abstract override def dispatch(receiver: ActorCell, invocation: Envelope) { + val stats = getStats(receiver.self) stats.msgsReceived.incrementAndGet() - super.dispatch(invocation) + super.dispatch(receiver, invocation) } protected[akka] abstract override def start() { diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index b51896f659..b4cc86cca2 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -80,12 +80,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn result } - def createMessageInvocation(msg: Any): Envelope = { - new Envelope( - actorOf(new Actor { //Dummy actor - def receive = { case _ ⇒ } - }).asInstanceOf[LocalActorRef].underlying, msg, NullChannel) - } + def createMessageInvocation(msg: Any): Envelope = Envelope(msg, NullChannel) def ensureInitialMailboxState(config: MailboxType, q: Mailbox) { q must not be null diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 910777685c..392e935b4a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -278,7 +278,7 @@ private[akka] class ActorCell( //TODO FIXME remove this method def supervisor: Option[ActorRef] = props.supervisor - def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = dispatcher dispatch Envelope(this, message, channel) + def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = dispatcher.dispatch(this, Envelope(message, channel)) def postMessageToMailboxAndCreateFutureResultWithTimeout( message: Any, @@ -288,7 +288,7 @@ private[akka] class ActorCell( case f: ActorPromise ⇒ f case _ ⇒ new ActorPromise(timeout)(dispatcher) } - dispatcher dispatch Envelope(this, message, future) + dispatcher.dispatch(this, Envelope(message, future)) future } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 58502fa334..d7bdb68229 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -17,13 +17,7 @@ import scala.annotation.tailrec /** * @author Jonas Bonér */ -final case class Envelope(val receiver: ActorCell, val message: Any, val channel: UntypedChannel) { - if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null") - - final def invoke() { - receiver invoke this - } -} +final case class Envelope(val message: Any, val channel: UntypedChannel) object SystemMessage { @tailrec @@ -295,7 +289,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable /** * Will be called when the dispatcher is to queue an invocation for execution */ - protected[akka] def dispatch(invocation: Envelope) + protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) /** * Suggest to register the provided mailbox for execution diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index f38e8d8f54..f38e3a657e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -57,12 +57,7 @@ class BalancingDispatcher( class SharingMailbox(_actor: ActorCell) extends Mailbox(_actor) with DefaultSystemMessageQueue { final def enqueue(handle: Envelope) = messageQueue.enqueue(handle) - final def dequeue(): Envelope = { - val envelope = messageQueue.dequeue() - if (envelope eq null) null - else if (envelope.receiver eq actor) envelope - else envelope.copy(receiver = actor) - } + final def dequeue(): Envelope = messageQueue.dequeue() final def numberOfMessages: Int = messageQueue.numberOfMessages @@ -106,8 +101,7 @@ class BalancingDispatcher( } else true } - override protected[akka] def dispatch(invocation: Envelope) = { - val receiver = invocation.receiver + override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { messageQueue enqueue invocation val buddy = buddies.pollFirst() diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 53d28a79a3..f9819c3cd3 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -78,8 +78,8 @@ class Dispatcher( protected[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name) protected[akka] val executorService = new AtomicReference[ExecutorService](new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService)) - protected[akka] def dispatch(invocation: Envelope) = { - val mbox = invocation.receiver.mailbox + protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { + val mbox = receiver.mailbox mbox enqueue invocation registerForExecution(mbox, true, false) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index df730efd8c..6ea3afb3d7 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -170,7 +170,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag var processedMessages = 0 val deadlineNs = if (dispatcher.isThroughputDeadlineTimeDefined) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) else 0 do { - nextMessage.invoke + actor invoke nextMessage processAllSystemMessages() //After we're done, process all system messages @@ -182,7 +182,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag } else null //Abort } while (nextMessage ne null) } else { //If we only run one message per process - nextMessage.invoke //Just run it + actor invoke nextMessage //Just run it processAllSystemMessages() //After we're done, process all system messages } } diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index 2e4d177aba..6a8b431e39 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -101,7 +101,7 @@ class ActorSerialization(val app: AkkaApplication, remote: RemoteSupport) { l map { m ⇒ remoteActorSerialization.createRemoteMessageProtocolBuilder( - Option(m.receiver.self), + localRef, Left(actorRef.uuid), actorRef.address, app.AkkaConfig.ActorTimeoutMillis, diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 829cce72c3..b6cb07cb02 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -150,20 +150,20 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling } } - protected[akka] override def dispatch(handle: Envelope) { - val mbox = getMailbox(handle.receiver) + protected[akka] override def dispatch(receiver: ActorCell, handle: Envelope) { + val mbox = getMailbox(receiver) val queue = mbox.queue val execute = mbox.suspendSwitch.fold { queue.push(handle) if (warnings && handle.channel.isInstanceOf[Promise[_]]) { - app.eventHandler.warning(this, "suspendSwitch, creating Future could deadlock; target: %s" format handle.receiver) + app.eventHandler.warning(this, "suspendSwitch, creating Future could deadlock; target: %s" format receiver) } false } { queue.push(handle) if (queue.isActive) { if (warnings && handle.channel.isInstanceOf[Promise[_]]) { - app.eventHandler.warning(this, "blocked on this thread, creating Future could deadlock; target: %s" format handle.receiver) + app.eventHandler.warning(this, "blocked on this thread, creating Future could deadlock; target: %s" format receiver) } false } else { @@ -200,10 +200,10 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling } if (handle ne null) { try { - handle.invoke + mbox.actor.invoke(handle) if (warnings) handle.channel match { case f: ActorPromise if !f.isCompleted ⇒ - app.eventHandler.warning(this, "calling %s with message %s did not reply as expected, might deadlock" format (handle.receiver, handle.message)) + app.eventHandler.warning(this, "calling %s with message %s did not reply as expected, might deadlock" format (mbox.actor, handle.message)) case _ ⇒ } true