From 4717694440a5b4d3efb91e0e3b50c8d2596d0139 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 12 Jul 2010 22:05:18 +0200 Subject: [PATCH 1/6] Switching ActorRegistry storage solution --- akka-core/src/main/scala/actor/ActorRegistry.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index 20855c18d5..c568c8de03 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.actor import scala.collection.mutable.ListBuffer import scala.reflect.Manifest -import java.util.concurrent.{CopyOnWriteArraySet, ConcurrentHashMap} +import java.util.concurrent.{ConcurrentSkipListSet, ConcurrentHashMap} import java.util.{Set=>JSet} import se.scalablesolutions.akka.util.ListenerManagement @@ -29,6 +29,11 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent * @author Jonas Bonér */ object ActorRegistry extends ListenerManagement { + + private val refComparator = new java.util.Comparator[ActorRef]{ + def compare(a: ActorRef,b: ActorRef) = a.uuid.compareTo(b.uuid) + } + private val actorsByUUID = new ConcurrentHashMap[String, ActorRef] private val actorsById = new ConcurrentHashMap[String, JSet[ActorRef]] private val actorsByClassName = new ConcurrentHashMap[String, JSet[ActorRef]] @@ -117,7 +122,7 @@ object ActorRegistry extends ListenerManagement { if (id eq null) throw new IllegalActorStateException("Actor.id is null " + actor) if (actorsById.containsKey(id)) actorsById.get(id).add(actor) else { - val set = new CopyOnWriteArraySet[ActorRef] + val set = new ConcurrentSkipListSet[ActorRef](refComparator) set.add(actor) actorsById.put(id, set) } @@ -126,7 +131,7 @@ object ActorRegistry extends ListenerManagement { val className = actor.actor.getClass.getName if (actorsByClassName.containsKey(className)) actorsByClassName.get(className).add(actor) else { - val set = new CopyOnWriteArraySet[ActorRef] + val set = new ConcurrentSkipListSet[ActorRef](refComparator) set.add(actor) actorsByClassName.put(className, set) } From abb186630c4c19f540876f38dff1aa9632de385a Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 12 Jul 2010 23:17:08 +0200 Subject: [PATCH 2/6] Closing ticket 294 --- .../scala/FaultTolerantConnectionActor.scala | 2 +- akka-amqp/src/main/scala/RpcClientActor.scala | 2 +- akka-core/src/main/scala/actor/Actor.scala | 16 ---------------- 3 files changed, 2 insertions(+), 18 deletions(-) diff --git a/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala b/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala index 1bf8e2e088..32687ee8dd 100644 --- a/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala +++ b/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala @@ -38,7 +38,7 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio } case None => { log.warning("Unable to create new channel - no connection") - reply(None) + self.reply(None) } } } diff --git a/akka-amqp/src/main/scala/RpcClientActor.scala b/akka-amqp/src/main/scala/RpcClientActor.scala index 972eac0586..8ff7d8a0ac 100644 --- a/akka-amqp/src/main/scala/RpcClientActor.scala +++ b/akka-amqp/src/main/scala/RpcClientActor.scala @@ -26,7 +26,7 @@ class RpcClientActor(exchangeParameters: ExchangeParameters, rpcClient match { case Some(client) => val response: Array[Byte] = client.primitiveCall(inSerializer.toBinary(payload)) - reply(outSerializer.fromBinary(response, None)) + self.reply(outSerializer.fromBinary(response, None)) case None => error("%s has no client to send messages with".format(this)) } } diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index b38401a4a6..e1227168b2 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -408,22 +408,6 @@ trait Actor extends Logging { */ def initTransactionalState {} - /** - * Use reply(..) to reply with a message to the original sender of the message currently - * being processed. - *

- * Throws an IllegalStateException if unable to determine what to reply to. - */ - def reply(message: Any) = self.reply(message) - - /** - * Use reply_?(..) to reply with a message to the original sender of the message currently - * being processed. - *

- * Returns true if reply was sent, and false if unable to determine what to reply to. - */ - def reply_?(message: Any): Boolean = self.reply_?(message) - /** * Is the actor able to handle the message passed in as arguments? */ From 273de9e8c5d5db220ce29bcbdc5c0e9871a1efff Mon Sep 17 00:00:00 2001 From: momania Date: Wed, 14 Jul 2010 09:39:17 +0200 Subject: [PATCH 3/6] small refactor - use patternmatching better --- akka-core/src/main/scala/actor/ActorRef.scala | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 7543817343..9035f6ad24 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -1057,18 +1057,19 @@ sealed class LocalActorRef private[akka]( protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = { if (trapExit.exists(_.isAssignableFrom(reason.getClass))) { - if (faultHandler.isDefined) { - faultHandler.get match { - // FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy - case AllForOneStrategy(maxNrOfRetries, withinTimeRange) => - restartLinkedActors(reason) + faultHandler match { + // FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy + case Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange)) => + restartLinkedActors(reason) - case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => - dead.restart(reason) - } - } else throw new IllegalActorStateException( - "No 'faultHandler' defined for an actor with the 'trapExit' member field defined " + - "\n\tto non-empty list of exception classes - can't proceed " + toString) + case Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange)) => + dead.restart(reason) + + case None => + throw new IllegalActorStateException( + "No 'faultHandler' defined for an actor with the 'trapExit' member field defined " + + "\n\tto non-empty list of exception classes - can't proceed " + toString) + } } else { _supervisor.foreach(_ ! Exit(dead, reason)) // if 'trapExit' is not defined then pass the Exit on } From 0f10d447830423b5149a4cd15f25b3a7be5ccabe Mon Sep 17 00:00:00 2001 From: momania Date: Wed, 14 Jul 2010 10:02:07 +0200 Subject: [PATCH 4/6] Test #328 --- akka-core/src/main/scala/actor/ActorRef.scala | 3 +- .../test/scala/SupervisorHierarchySpec.scala | 53 +++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) create mode 100644 akka-core/src/test/scala/SupervisorHierarchySpec.scala diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 9035f6ad24..8a95548b28 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -1071,7 +1071,8 @@ sealed class LocalActorRef private[akka]( "\n\tto non-empty list of exception classes - can't proceed " + toString) } } else { - _supervisor.foreach(_ ! Exit(dead, reason)) // if 'trapExit' is not defined then pass the Exit on + if (lifeCycle.isEmpty) lifeCycle = Some(LifeCycle(Permanent)) // when passing on make sure we have a lifecycle + _supervisor.foreach(_ ! Exit(this, reason)) // if 'trapExit' is not defined then pass the Exit on } } diff --git a/akka-core/src/test/scala/SupervisorHierarchySpec.scala b/akka-core/src/test/scala/SupervisorHierarchySpec.scala new file mode 100644 index 0000000000..75751e3d58 --- /dev/null +++ b/akka-core/src/test/scala/SupervisorHierarchySpec.scala @@ -0,0 +1,53 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.actor + +import org.scalatest.junit.JUnitSuite +import org.junit.Test +import java.lang.Throwable +import Actor._ +import se.scalablesolutions.akka.config.OneForOneStrategy +import java.util.concurrent.{TimeUnit, CountDownLatch} + +class SupervisorHierarchySpec extends JUnitSuite { + + @Test + def killWorkerShouldRestartMangerAndOtherWorkers = { + val countDown = new CountDownLatch(4) + + val workerOne = actorOf(new CountDownActor(countDown)) + val workerTwo = actorOf(new CountDownActor(countDown)) + val workerThree = actorOf(new CountDownActor( countDown)) + + val boss = actorOf(new Actor{ + self.trapExit = List(classOf[Throwable]) + self.faultHandler = Some(OneForOneStrategy(5, 1000)) + + protected def receive = { case _ => () } + }).start + + val manager = actorOf(new CountDownActor(countDown)) + boss.startLink(manager) + + manager.startLink(workerOne) + manager.startLink(workerTwo) + manager.startLink(workerThree) + + workerOne ! Exit(workerOne, new RuntimeException("Fire the worker!")) + + // manager + all workers should be restarted by only killing a worker + // manager doesn't trap exits, so boss will restart manager + + assert(countDown.await(4, TimeUnit.SECONDS)) + } + + class CountDownActor(countDown: CountDownLatch) extends Actor { + + protected def receive = { case _ => () } + + override def postRestart(reason: Throwable) = countDown.countDown + } +} + From 7c347637a001b70843207850ddf85fefcfd3587a Mon Sep 17 00:00:00 2001 From: momania Date: Wed, 14 Jul 2010 10:18:01 +0200 Subject: [PATCH 5/6] - make consumer restart when delegated handling actor fails - made single object to flag test enable/disable --- akka-amqp/src/main/scala/AMQP.scala | 3 +-- .../scala/FaultTolerantConnectionActor.scala | 4 ++++ .../test/scala/AMQPConnectionRecoveryTest.scala | 5 +++-- .../scala/AMQPConsumerChannelRecoveryTest.scala | 4 ++-- .../AMQPConsumerConnectionRecoveryTest.scala | 4 ++-- .../AMQPConsumerManualAcknowledgeTest.scala | 16 +++++++++++----- .../src/test/scala/AMQPConsumerMessageTest.scala | 4 ++-- .../scala/AMQPProducerChannelRecoveryTest.scala | 4 ++-- .../AMQPProducerConnectionRecoveryTest.scala | 4 ++-- .../src/test/scala/AMQPProducerMessageTest.scala | 4 ++-- .../src/test/scala/AMQPRpcClientServerTest.scala | 6 +++--- akka-amqp/src/test/scala/AMQPTest.scala | 9 +++++++++ 12 files changed, 43 insertions(+), 24 deletions(-) create mode 100644 akka-amqp/src/test/scala/AMQPTest.scala diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 974a7196a6..be3556e812 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -9,8 +9,7 @@ import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.config.OneForOneStrategy import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory} import java.lang.IllegalArgumentException -import se.scalablesolutions.akka.util.{Logging} -import java.util.UUID +import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.serialization.Serializer /** diff --git a/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala b/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala index 32687ee8dd..5f0a49910e 100644 --- a/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala +++ b/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala @@ -10,6 +10,7 @@ import com.rabbitmq.client._ import se.scalablesolutions.akka.amqp.AMQP.ConnectionParameters import se.scalablesolutions.akka.actor.{Exit, Actor} import se.scalablesolutions.akka.config.ScalaConfig.{Permanent, LifeCycle} +import se.scalablesolutions.akka.config.OneForOneStrategy private[amqp] class FaultTolerantConnectionActor(connectionParameters: ConnectionParameters) extends Actor { import connectionParameters._ @@ -17,6 +18,9 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio self.id = "amqp-connection-%s".format(host) self.lifeCycle = Some(LifeCycle(Permanent)) + self.trapExit = List(classOf[Throwable]) + self.faultHandler = Some(OneForOneStrategy(5, 5000)) + val reconnectionTimer = new Timer("%s-timer".format(self.id)) val connectionFactory: ConnectionFactory = new ConnectionFactory() diff --git a/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala index 72ccab3cc1..3bc2cb20dd 100644 --- a/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala @@ -17,8 +17,9 @@ import org.scalatest.matchers.MustMatchers class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def connectionAndRecovery = { + @Test + def connectionAndRecovery = if (AMQPTest.enabled) { + val connectedLatch = new StandardLatch val reconnectingLatch = new StandardLatch val reconnectedLatch = new StandardLatch diff --git a/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala index b9bf0e3dbe..0f6fadfcc4 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala @@ -18,8 +18,8 @@ import se.scalablesolutions.akka.amqp.AMQP._ class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def consumerChannelRecovery = { + @Test + def consumerChannelRecovery = if (AMQPTest.enabled) { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) try { diff --git a/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala index 34a135f091..9dccd43be8 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala @@ -18,8 +18,8 @@ import se.scalablesolutions.akka.amqp.AMQP._ class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def consumerConnectionRecovery = { + @Test + def consumerConnectionRecovery = if (AMQPTest.enabled) { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) try { diff --git a/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala b/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala index c616630317..d48f38afc5 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala +++ b/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala @@ -17,8 +17,8 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParamete class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def consumerMessageManualAcknowledge = { + @Test + def consumerMessageManualAcknowledge = if (AMQPTest.enabled) { val connection = AMQP.newConnection() try { val countDown = new CountDownLatch(2) @@ -30,15 +30,21 @@ class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers wit val exchangeParameters = ExchangeParameters("text_exchange",ExchangeType.Direct) val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) + val failLatch = new StandardLatch val acknowledgeLatch = new StandardLatch var deliveryTagCheck: Long = -1 val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "manual.ack.this", actor { case Delivery(payload, _, deliveryTag, _, sender) => { - deliveryTagCheck = deliveryTag - sender.foreach(_ ! Acknowledge(deliveryTag)) + if (!failLatch.isOpen) { + failLatch.open + error("Make it fail!") + } else { + deliveryTagCheck = deliveryTag + sender.foreach(_ ! Acknowledge(deliveryTag)) + } } case Acknowledged(deliveryTag) => if (deliveryTagCheck == deliveryTag) acknowledgeLatch.open - }, selfAcknowledging = false, channelParameters = Some(channelParameters))) + }, queueName = Some("self.ack.queue"), selfAcknowledging = false, queueAutoDelete = false, channelParameters = Some(channelParameters))) val producer = AMQP.newProducer(connection, ProducerParameters(exchangeParameters, channelParameters = Some(channelParameters))) diff --git a/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala b/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala index dd01e4729a..af94b0a515 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala +++ b/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala @@ -16,8 +16,8 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParamete class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def consumerMessage = { + @Test + def consumerMessage = if (AMQPTest.enabled) { val connection = AMQP.newConnection() try { diff --git a/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala index b2ad2e2e58..095a21fc86 100644 --- a/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala @@ -17,8 +17,8 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameter class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def producerChannelRecovery = { + @Test + def producerChannelRecovery = if (AMQPTest.enabled) { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) diff --git a/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala index c0463469c9..71bc08bdaa 100644 --- a/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala @@ -17,8 +17,8 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameter class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def producerConnectionRecovery = { + @Test + def producerConnectionRecovery = if (AMQPTest.enabled) { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) try { diff --git a/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala b/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala index d426031230..ab9bb00e7c 100644 --- a/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala +++ b/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala @@ -19,8 +19,8 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ProducerParamete class AMQPProducerMessageTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def producerMessage = { + @Test + def producerMessage = if (AMQPTest.enabled) { val connection: ActorRef = AMQP.newConnection() try { diff --git a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala index 08f9f47a32..eebcfccce3 100644 --- a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala +++ b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala @@ -16,8 +16,8 @@ import se.scalablesolutions.akka.serialization.Serializer class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def consumerMessage = { + @Test + def consumerMessage = if (AMQPTest.enabled) { val connection = AMQP.newConnection() try { @@ -28,7 +28,7 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging case Stopped => () } - val exchangeParameters = ExchangeParameters("text_exchange",ExchangeType.Topic) + val exchangeParameters = ExchangeParameters("text_topic_exchange", ExchangeType.Topic) val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) val stringSerializer = new Serializer { def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]) = new String(bytes) diff --git a/akka-amqp/src/test/scala/AMQPTest.scala b/akka-amqp/src/test/scala/AMQPTest.scala new file mode 100644 index 0000000000..e50ab673f6 --- /dev/null +++ b/akka-amqp/src/test/scala/AMQPTest.scala @@ -0,0 +1,9 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.amqp.test + +object AMQPTest { + def enabled = false +} \ No newline at end of file From 5a6783f8468129ea6e35975c93ac639455a931fe Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 14 Jul 2010 20:08:08 +0200 Subject: [PATCH 6/6] Laying the foundation for current-message-resend --- akka-core/src/main/scala/actor/ActorRef.scala | 34 ++++++++++++++----- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 8a95548b28..2472ea924d 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -197,10 +197,13 @@ trait ActorRef extends TransactionManagement { */ protected[akka] val dispatcherLock = new ReentrantLock - protected[akka] var _sender: Option[ActorRef] = None - protected[akka] var _senderFuture: Option[CompletableFuture[Any]] = None - protected[akka] def sender_=(s: Option[ActorRef]) = guard.withGuard { _sender = s } - protected[akka] def senderFuture_=(sf: Option[CompletableFuture[Any]]) = guard.withGuard { _senderFuture = sf } + /** + * This is a reference to the message currently being processed by the actor + */ + protected[akka] var _currentMessage: Option[MessageInvocation] = None + + protected[akka] def currentMessage_=(msg: Option[MessageInvocation]) = guard.withGuard { _currentMessage = msg } + protected[akka] def currentMessage = guard.withGuard { _currentMessage } /** * Returns the uuid for the actor. @@ -211,13 +214,27 @@ trait ActorRef extends TransactionManagement { * The reference sender Actor of the last received message. * Is defined if the message was sent from another Actor, else None. */ - def sender: Option[ActorRef] = guard.withGuard { _sender } + def sender: Option[ActorRef] = { + //Five lines of map-performance-avoidance, could be just: currentMessage map { _.sender } + val msg = currentMessage + if(msg.isEmpty) + None + else + msg.get.sender + } /** * The reference sender future of the last received message. * Is defined if the message was sent with sent with '!!' or '!!!', else None. */ - def senderFuture: Option[CompletableFuture[Any]] = guard.withGuard { _senderFuture } + def senderFuture: Option[CompletableFuture[Any]] = { + //Five lines of map-performance-avoidance, could be just: currentMessage map { _.senderFuture } + val msg = currentMessage + if(msg.isEmpty) + None + else + msg.get.senderFuture + } /** * Is the actor being restarted? @@ -992,14 +1009,15 @@ sealed class LocalActorRef private[akka]( Actor.log.warning("Actor [%s] is shut down, ignoring message [%s]", toString, messageHandle) return } - sender = messageHandle.sender - senderFuture = messageHandle.senderFuture + currentMessage = Option(messageHandle) try { dispatch(messageHandle) } catch { case e => Actor.log.error(e, "Could not invoke actor [%s]", this) throw e + } finally { + currentMessage = None //TODO: Don't reset this, we might want to resend the message } }