diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 974a7196a6..be3556e812 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -9,8 +9,7 @@ import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.config.OneForOneStrategy import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory} import java.lang.IllegalArgumentException -import se.scalablesolutions.akka.util.{Logging} -import java.util.UUID +import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.serialization.Serializer /** diff --git a/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala b/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala index 32687ee8dd..5f0a49910e 100644 --- a/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala +++ b/akka-amqp/src/main/scala/FaultTolerantConnectionActor.scala @@ -10,6 +10,7 @@ import com.rabbitmq.client._ import se.scalablesolutions.akka.amqp.AMQP.ConnectionParameters import se.scalablesolutions.akka.actor.{Exit, Actor} import se.scalablesolutions.akka.config.ScalaConfig.{Permanent, LifeCycle} +import se.scalablesolutions.akka.config.OneForOneStrategy private[amqp] class FaultTolerantConnectionActor(connectionParameters: ConnectionParameters) extends Actor { import connectionParameters._ @@ -17,6 +18,9 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio self.id = "amqp-connection-%s".format(host) self.lifeCycle = Some(LifeCycle(Permanent)) + self.trapExit = List(classOf[Throwable]) + self.faultHandler = Some(OneForOneStrategy(5, 5000)) + val reconnectionTimer = new Timer("%s-timer".format(self.id)) val connectionFactory: ConnectionFactory = new ConnectionFactory() diff --git a/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala index 72ccab3cc1..3bc2cb20dd 100644 --- a/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPConnectionRecoveryTest.scala @@ -17,8 +17,9 @@ import org.scalatest.matchers.MustMatchers class AMQPConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def connectionAndRecovery = { + @Test + def connectionAndRecovery = if (AMQPTest.enabled) { + val connectedLatch = new StandardLatch val reconnectingLatch = new StandardLatch val reconnectedLatch = new StandardLatch diff --git a/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala index b9bf0e3dbe..0f6fadfcc4 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPConsumerChannelRecoveryTest.scala @@ -18,8 +18,8 @@ import se.scalablesolutions.akka.amqp.AMQP._ class AMQPConsumerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def consumerChannelRecovery = { + @Test + def consumerChannelRecovery = if (AMQPTest.enabled) { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) try { diff --git a/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala index 34a135f091..9dccd43be8 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPConsumerConnectionRecoveryTest.scala @@ -18,8 +18,8 @@ import se.scalablesolutions.akka.amqp.AMQP._ class AMQPConsumerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def consumerConnectionRecovery = { + @Test + def consumerConnectionRecovery = if (AMQPTest.enabled) { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) try { diff --git a/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala b/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala index c616630317..d48f38afc5 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala +++ b/akka-amqp/src/test/scala/AMQPConsumerManualAcknowledgeTest.scala @@ -17,8 +17,8 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParamete class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def consumerMessageManualAcknowledge = { + @Test + def consumerMessageManualAcknowledge = if (AMQPTest.enabled) { val connection = AMQP.newConnection() try { val countDown = new CountDownLatch(2) @@ -30,15 +30,21 @@ class AMQPConsumerManualAcknowledgeTest extends JUnitSuite with MustMatchers wit val exchangeParameters = ExchangeParameters("text_exchange",ExchangeType.Direct) val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) + val failLatch = new StandardLatch val acknowledgeLatch = new StandardLatch var deliveryTagCheck: Long = -1 val consumer:ActorRef = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "manual.ack.this", actor { case Delivery(payload, _, deliveryTag, _, sender) => { - deliveryTagCheck = deliveryTag - sender.foreach(_ ! Acknowledge(deliveryTag)) + if (!failLatch.isOpen) { + failLatch.open + error("Make it fail!") + } else { + deliveryTagCheck = deliveryTag + sender.foreach(_ ! Acknowledge(deliveryTag)) + } } case Acknowledged(deliveryTag) => if (deliveryTagCheck == deliveryTag) acknowledgeLatch.open - }, selfAcknowledging = false, channelParameters = Some(channelParameters))) + }, queueName = Some("self.ack.queue"), selfAcknowledging = false, queueAutoDelete = false, channelParameters = Some(channelParameters))) val producer = AMQP.newProducer(connection, ProducerParameters(exchangeParameters, channelParameters = Some(channelParameters))) diff --git a/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala b/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala index dd01e4729a..af94b0a515 100644 --- a/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala +++ b/akka-amqp/src/test/scala/AMQPConsumerMessageTest.scala @@ -16,8 +16,8 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ConsumerParamete class AMQPConsumerMessageTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def consumerMessage = { + @Test + def consumerMessage = if (AMQPTest.enabled) { val connection = AMQP.newConnection() try { diff --git a/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala index b2ad2e2e58..095a21fc86 100644 --- a/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPProducerChannelRecoveryTest.scala @@ -17,8 +17,8 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameter class AMQPProducerChannelRecoveryTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def producerChannelRecovery = { + @Test + def producerChannelRecovery = if (AMQPTest.enabled) { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) diff --git a/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala b/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala index c0463469c9..71bc08bdaa 100644 --- a/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala +++ b/akka-amqp/src/test/scala/AMQPProducerConnectionRecoveryTest.scala @@ -17,8 +17,8 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameter class AMQPProducerConnectionRecoveryTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def producerConnectionRecovery = { + @Test + def producerConnectionRecovery = if (AMQPTest.enabled) { val connection = AMQP.newConnection(ConnectionParameters(initReconnectDelay = 50)) try { diff --git a/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala b/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala index d426031230..ab9bb00e7c 100644 --- a/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala +++ b/akka-amqp/src/test/scala/AMQPProducerMessageTest.scala @@ -19,8 +19,8 @@ import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ProducerParamete class AMQPProducerMessageTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def producerMessage = { + @Test + def producerMessage = if (AMQPTest.enabled) { val connection: ActorRef = AMQP.newConnection() try { diff --git a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala index 08f9f47a32..eebcfccce3 100644 --- a/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala +++ b/akka-amqp/src/test/scala/AMQPRpcClientServerTest.scala @@ -16,8 +16,8 @@ import se.scalablesolutions.akka.serialization.Serializer class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging { -// @Test - def consumerMessage = { + @Test + def consumerMessage = if (AMQPTest.enabled) { val connection = AMQP.newConnection() try { @@ -28,7 +28,7 @@ class AMQPRpcClientServerTest extends JUnitSuite with MustMatchers with Logging case Stopped => () } - val exchangeParameters = ExchangeParameters("text_exchange",ExchangeType.Topic) + val exchangeParameters = ExchangeParameters("text_topic_exchange", ExchangeType.Topic) val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) val stringSerializer = new Serializer { def fromBinary(bytes: Array[Byte], clazz: Option[Class[_]]) = new String(bytes) diff --git a/akka-amqp/src/test/scala/AMQPTest.scala b/akka-amqp/src/test/scala/AMQPTest.scala new file mode 100644 index 0000000000..e50ab673f6 --- /dev/null +++ b/akka-amqp/src/test/scala/AMQPTest.scala @@ -0,0 +1,9 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.amqp.test + +object AMQPTest { + def enabled = false +} \ No newline at end of file diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 3f2cb3d149..76adf9c729 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -46,8 +46,9 @@ case class Exit(dead: ActorRef, killer: Throwable) extends LifeCycleMessage case class Link(child: ActorRef) extends LifeCycleMessage case class Unlink(child: ActorRef) extends LifeCycleMessage case class UnlinkAndStop(child: ActorRef) extends LifeCycleMessage -case object Kill extends LifeCycleMessage case object ReceiveTimeout extends LifeCycleMessage +case class MaximumNumberOfRestartsWithinTimeRangeReached( + victim: ActorRef, maxNrOfRetries: Int, withinTimeRange: Int, lastExceptionCausingRestart: Throwable) extends LifeCycleMessage // Exceptions for Actors class ActorStartException private[akka](message: String) extends RuntimeException(message) @@ -427,12 +428,11 @@ trait Actor extends Logging { private val lifeCycles: Receive = { case HotSwap(code) => self.hotswap = code; self.checkReceiveTimeout // FIXME : how to reschedule receivetimeout on hotswap? - case Restart(reason) => self.restart(reason) case Exit(dead, reason) => self.handleTrapExit(dead, reason) case Link(child) => self.link(child) case Unlink(child) => self.unlink(child) case UnlinkAndStop(child) => self.unlink(child); child.stop - case Kill => throw new ActorKilledException("Actor [" + toString + "] was killed by a Kill message") + case Restart(reason) => throw reason } } diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 003ea772c4..0d70643dc8 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -196,10 +196,13 @@ trait ActorRef extends TransactionManagement { */ protected[akka] val dispatcherLock = new ReentrantLock - @volatile protected[akka] var _sender: Option[ActorRef] = None - @volatile protected[akka] var _senderFuture: Option[CompletableFuture[Any]] = None - protected[akka] def sender_=(s: Option[ActorRef]) = _sender = s - protected[akka] def senderFuture_=(sf: Option[CompletableFuture[Any]]) = _senderFuture = sf + /** + * This is a reference to the message currently being processed by the actor + */ + protected[akka] var _currentMessage: Option[MessageInvocation] = None + + protected[akka] def currentMessage_=(msg: Option[MessageInvocation]) = guard.withGuard { _currentMessage = msg } + protected[akka] def currentMessage = guard.withGuard { _currentMessage } /** * Returns the uuid for the actor. @@ -210,13 +213,23 @@ trait ActorRef extends TransactionManagement { * The reference sender Actor of the last received message. * Is defined if the message was sent from another Actor, else None. */ - def sender: Option[ActorRef] = _sender + def sender: Option[ActorRef] = { + // Five lines of map-performance-avoidance, could be just: currentMessage map { _.sender } + val msg = currentMessage + if(msg.isEmpty) None + else msg.get.sender + } /** * The reference sender future of the last received message. * Is defined if the message was sent with sent with '!!' or '!!!', else None. */ - def senderFuture: Option[CompletableFuture[Any]] = _senderFuture + def senderFuture: Option[CompletableFuture[Any]] = { + // Five lines of map-performance-avoidance, could be just: currentMessage map { _.senderFuture } + val msg = currentMessage + if(msg.isEmpty) None + else msg.get.senderFuture + } /** * Is the actor being restarted? @@ -424,7 +437,7 @@ trait ActorRef extends TransactionManagement { /** * Starts up the actor and its message queue. */ - def start: ActorRef + def start(): ActorRef /** * Shuts down the actor its dispatcher and message queue. @@ -531,11 +544,11 @@ trait ActorRef extends TransactionManagement { protected[akka] def mailbox: Deque[MessageInvocation] - protected[akka] def restart(reason: Throwable): Unit - protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit - protected[akka] def restartLinkedActors(reason: Throwable): Unit + protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit + + protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit protected[akka] def registerSupervisorAsRemoteActor: Option[String] @@ -584,7 +597,9 @@ sealed class LocalActorRef private[akka]( @volatile private var runActorInitialization = false @volatile private var isDeserialized = false @volatile private var loader: Option[ClassLoader] = None - + @volatile private var maxNrOfRetriesCount: Int = 0 + @volatile private var restartsWithinTimeRangeTimestamp: Long = 0L + protected[akka] val _mailbox: Deque[MessageInvocation] = new ConcurrentLinkedDeque[MessageInvocation] protected[this] val actorInstance = guard.withGuard { new AtomicReference[Actor](newActor) } @@ -643,11 +658,6 @@ sealed class LocalActorRef private[akka]( // ========= PUBLIC FUNCTIONS ========= - /** - * Returns the mailbox. - */ - def mailbox: Deque[MessageInvocation] = _mailbox - /** * Returns the class for the Actor instance that is managed by the ActorRef. */ @@ -877,6 +887,11 @@ sealed class LocalActorRef private[akka]( } } + /** + * Returns the mailbox. + */ + def mailbox: Deque[MessageInvocation] = _mailbox + /** * Returns the mailbox size. */ @@ -945,76 +960,96 @@ sealed class LocalActorRef private[akka]( /** * Callback for the dispatcher. This is the ingle entry point to the user Actor implementation. */ - protected[akka] def invoke(messageHandle: MessageInvocation): Unit = guard.withGuard {//actor.synchronized { + protected[akka] def invoke(messageHandle: MessageInvocation): Unit = guard.withGuard { if (isShutdown) Actor.log.warning("Actor [%s] is shut down, ignoring message [%s]", toString, messageHandle) else { - sender = messageHandle.sender - senderFuture = messageHandle.senderFuture + currentMessage = Option(messageHandle) try { dispatch(messageHandle) } catch { case e => Actor.log.error(e, "Could not invoke actor [%s]", this) throw e - } + } finally { + currentMessage = None //TODO: Don't reset this, we might want to resend the message + } } } protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = { if (trapExit.exists(_.isAssignableFrom(reason.getClass))) { - if (faultHandler.isDefined) { - faultHandler.get match { - // FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy - case AllForOneStrategy(maxNrOfRetries, withinTimeRange) => - restartLinkedActors(reason) + faultHandler match { + // FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy + case Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange)) => + restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) - case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => - dead.restart(reason) - } - } else throw new IllegalActorStateException( - "No 'faultHandler' defined for an actor with the 'trapExit' member field defined " + - "\n\tto non-empty list of exception classes - can't proceed " + toString) + case Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange)) => + dead.restart(reason, maxNrOfRetries, withinTimeRange) + + case None => + throw new IllegalActorStateException( + "No 'faultHandler' defined for an actor with the 'trapExit' member field defined " + + "\n\tto non-empty list of exception classes - can't proceed " + toString) + } } else { - _supervisor.foreach(_ ! Exit(dead, reason)) // if 'trapExit' is not defined then pass the Exit on + if (lifeCycle.isEmpty) lifeCycle = Some(LifeCycle(Permanent)) // when passing on make sure we have a lifecycle + _supervisor.foreach(_ ! Exit(this, reason)) // if 'trapExit' is not defined then pass the Exit on } } - protected[akka] def restart(reason: Throwable): Unit = { - _isBeingRestarted = true - val failedActor = actorInstance.get - val lock = guard.lock - guard.withGuard { - lifeCycle.get match { - case LifeCycle(scope, _, _) => { - scope match { - case Permanent => - Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) - restartLinkedActors(reason) - Actor.log.debug("Restarting linked actors for actor [%s].", id) - Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id) - failedActor.preRestart(reason) - nullOutActorRefReferencesFor(failedActor) - val freshActor = newActor - freshActor.init - freshActor.initTransactionalState - actorInstance.set(freshActor) - Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id) - freshActor.postRestart(reason) - _isBeingRestarted = false - case Temporary => shutDownTemporaryActor(this) + protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = { + if (maxNrOfRetriesCount == 0) restartsWithinTimeRangeTimestamp = System.currentTimeMillis + maxNrOfRetriesCount += 1 + if (maxNrOfRetriesCount > maxNrOfRetries || (System.currentTimeMillis - restartsWithinTimeRangeTimestamp) > withinTimeRange) { + val message = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) + Actor.log.warning( + "Maximum number of restarts [%s] within time range [%s] reached." + + "\n\tWill *not* restart actor [%s] anymore." + + "\n\tLast exception causing restart was [%s].", + maxNrOfRetries, withinTimeRange, this, reason) + _supervisor.foreach { sup => + if (sup.isDefinedAt(message)) sup ! message + else Actor.log.warning( + "No message handler defined for system message [MaximumNumberOfRestartsWithinTimeRangeReached]" + + "\n\tCan't send the message to the supervisor [%s].", sup) + } + } else { + _isBeingRestarted = true + val failedActor = actorInstance.get + val lock = guard.lock + guard.withGuard { + lifeCycle.get match { + case LifeCycle(scope, _, _) => { + scope match { + case Permanent => + Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) + restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) + Actor.log.debug("Restarting linked actors for actor [%s].", id) + Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id) + failedActor.preRestart(reason) + nullOutActorRefReferencesFor(failedActor) + val freshActor = newActor + freshActor.init + freshActor.initTransactionalState + actorInstance.set(freshActor) + Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id) + freshActor.postRestart(reason) + _isBeingRestarted = false + case Temporary => shutDownTemporaryActor(this) + } } } } } } - protected[akka] def restartLinkedActors(reason: Throwable) = { + protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int) = { linkedActorsAsList.foreach { actorRef => if (actorRef.lifeCycle.isEmpty) actorRef.lifeCycle = Some(LifeCycle(Permanent)) actorRef.lifeCycle.get match { case LifeCycle(scope, _, _) => { scope match { - case Permanent => actorRef.restart(reason) + case Permanent => actorRef.restart(reason, maxNrOfRetries, withinTimeRange) case Temporary => shutDownTemporaryActor(actorRef) } } @@ -1289,9 +1324,9 @@ private[akka] case class RemoteActorRef private[akka] ( def supervisor: Option[ActorRef] = unsupported def shutdownLinkedActors: Unit = unsupported protected[akka] def mailbox: Deque[MessageInvocation] = unsupported - protected[akka] def restart(reason: Throwable): Unit = unsupported protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = unsupported - protected[akka] def restartLinkedActors(reason: Throwable): Unit = unsupported + protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported + protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = unsupported protected[akka] def linkedActors: JMap[String, ActorRef] = unsupported protected[akka] def linkedActorsAsList: List[ActorRef] = unsupported protected[akka] def invoke(messageHandle: MessageInvocation): Unit = unsupported diff --git a/akka-core/src/main/scala/actor/ActorRegistry.scala b/akka-core/src/main/scala/actor/ActorRegistry.scala index 20855c18d5..c568c8de03 100644 --- a/akka-core/src/main/scala/actor/ActorRegistry.scala +++ b/akka-core/src/main/scala/actor/ActorRegistry.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.actor import scala.collection.mutable.ListBuffer import scala.reflect.Manifest -import java.util.concurrent.{CopyOnWriteArraySet, ConcurrentHashMap} +import java.util.concurrent.{ConcurrentSkipListSet, ConcurrentHashMap} import java.util.{Set=>JSet} import se.scalablesolutions.akka.util.ListenerManagement @@ -29,6 +29,11 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent * @author Jonas Bonér */ object ActorRegistry extends ListenerManagement { + + private val refComparator = new java.util.Comparator[ActorRef]{ + def compare(a: ActorRef,b: ActorRef) = a.uuid.compareTo(b.uuid) + } + private val actorsByUUID = new ConcurrentHashMap[String, ActorRef] private val actorsById = new ConcurrentHashMap[String, JSet[ActorRef]] private val actorsByClassName = new ConcurrentHashMap[String, JSet[ActorRef]] @@ -117,7 +122,7 @@ object ActorRegistry extends ListenerManagement { if (id eq null) throw new IllegalActorStateException("Actor.id is null " + actor) if (actorsById.containsKey(id)) actorsById.get(id).add(actor) else { - val set = new CopyOnWriteArraySet[ActorRef] + val set = new ConcurrentSkipListSet[ActorRef](refComparator) set.add(actor) actorsById.put(id, set) } @@ -126,7 +131,7 @@ object ActorRegistry extends ListenerManagement { val className = actor.actor.getClass.getName if (actorsByClassName.containsKey(className)) actorsByClassName.get(className).add(actor) else { - val set = new CopyOnWriteArraySet[ActorRef] + val set = new ConcurrentSkipListSet[ActorRef](refComparator) set.add(actor) actorsByClassName.put(className, set) } diff --git a/akka-core/src/main/scala/stm/TransactionManagement.scala b/akka-core/src/main/scala/stm/TransactionManagement.scala index 903f983173..b8ef4af55e 100644 --- a/akka-core/src/main/scala/stm/TransactionManagement.scala +++ b/akka-core/src/main/scala/stm/TransactionManagement.scala @@ -149,14 +149,7 @@ class GlobalStm extends TransactionManagement with Logging { def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultGlobalTransactionFactory): T = atomic(factory)(body) def atomic[T](factory: TransactionFactory)(body: => T): T = { -/* MultiverseStmUtils.scheduleDeferredTask(new Runnable { - def run = try { - getTransactionSetInScope.tryJoinCommit(getRequiredThreadLocalTransaction, TransactionConfig.TIMEOUT, TimeUnit.MILLISECONDS) - clearTransaction - } catch { - case e: IllegalStateException => {} - }}) -*/ factory.boilerplate.execute(new TransactionalCallable[T]() { + factory.boilerplate.execute(new TransactionalCallable[T]() { def call(mtx: MultiverseTransaction): T = { if (!isTransactionSetInScope) createNewTransactionSet factory.addHooks @@ -166,7 +159,6 @@ class GlobalStm extends TransactionManagement with Logging { mtx.commit // Need to catch IllegalStateException until we have fix in Multiverse, since it throws it by mistake try { txSet.tryJoinCommit(mtx, TransactionConfig.TIMEOUT, TimeUnit.MILLISECONDS) } catch { case e: IllegalStateException => {} } -// try { txSet.joinCommit(mtx) } catch { case e: IllegalStateException => {} } result } }) diff --git a/akka-core/src/test/scala/SupervisorHierarchySpec.scala b/akka-core/src/test/scala/SupervisorHierarchySpec.scala new file mode 100644 index 0000000000..138313bafc --- /dev/null +++ b/akka-core/src/test/scala/SupervisorHierarchySpec.scala @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.actor + +import org.scalatest.junit.JUnitSuite +import org.junit.Test + +import Actor._ +import se.scalablesolutions.akka.config.OneForOneStrategy + +import java.util.concurrent.{TimeUnit, CountDownLatch} + +object SupervisorHierarchySpec { + class FireWorkerException(msg: String) extends Exception(msg) + + class CountDownActor(countDown: CountDownLatch) extends Actor { + protected def receive = { case _ => () } + override def postRestart(reason: Throwable) = countDown.countDown + } + + class CrasherActor extends Actor { + protected def receive = { case _ => () } + } +} + +class SupervisorHierarchySpec extends JUnitSuite { + import SupervisorHierarchySpec._ + + @Test + def killWorkerShouldRestartMangerAndOtherWorkers = { + val countDown = new CountDownLatch(4) + + val workerOne = actorOf(new CountDownActor(countDown)) + val workerTwo = actorOf(new CountDownActor(countDown)) + val workerThree = actorOf(new CountDownActor(countDown)) + + val boss = actorOf(new Actor{ + self.trapExit = List(classOf[Throwable]) + self.faultHandler = Some(OneForOneStrategy(5, 1000)) + + protected def receive = { case _ => () } + }).start + + val manager = actorOf(new CountDownActor(countDown)) + boss.startLink(manager) + + manager.startLink(workerOne) + manager.startLink(workerTwo) + manager.startLink(workerThree) + + workerOne ! Exit(workerOne, new FireWorkerException("Fire the worker!")) + + // manager + all workers should be restarted by only killing a worker + // manager doesn't trap exits, so boss will restart manager + + assert(countDown.await(2, TimeUnit.SECONDS)) + } + + @Test + def supervisorShouldReceiveNotificationMessageWhenMaximumNumberOfRestartsWithinTimeRangeIsReached = { + val countDown = new CountDownLatch(2) + val crasher = actorOf(new CountDownActor(countDown)) + val boss = actorOf(new Actor{ + self.trapExit = List(classOf[Throwable]) + self.faultHandler = Some(OneForOneStrategy(1, 5000)) + protected def receive = { + case MaximumNumberOfRestartsWithinTimeRangeReached(_, _, _, _) => + countDown.countDown + } + }).start + boss.startLink(crasher) + + crasher ! Exit(crasher, new FireWorkerException("Fire the worker!")) + crasher ! Exit(crasher, new FireWorkerException("Fire the worker!")) + + assert(countDown.await(2, TimeUnit.SECONDS)) + } +} +