diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala index b4b721eebd..92de27502a 100644 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -55,7 +55,9 @@ object Actor { } /** - * Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model' + * Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model': + * http://en.wikipedia.org/wiki/Actor_model + * * @author Jonas Bonér */ trait Actor extends Logging with TransactionManagement { @@ -93,7 +95,7 @@ trait Actor extends Logging with TransactionManagement { * * Needs to be set if the actor is supervised programmatically. */ - @volatile var lifeCycleConfig: Option[LifeCycle] = None + @volatile var lifeCycle: Option[LifeCycle] = None /** * User overridable callback/setting. @@ -246,7 +248,7 @@ trait Actor extends Logging with TransactionManagement { // FIXME: Need to do reference count to know if EventBasedThreadPoolDispatcher and EventBasedSingleThreadDispatcher can be shut down _isRunning = false shutdown - } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") + } } /** @@ -585,9 +587,9 @@ trait Actor extends Logging with TransactionManagement { if (future.exception.isDefined) throw future.exception.get._2 else future.result.asInstanceOf[Option[T]] - private def base: PartialFunction[Any, Unit] = lifeCycle orElse (_hotswap getOrElse receive) + private def base: PartialFunction[Any, Unit] = lifeCycles orElse (_hotswap getOrElse receive) - private val lifeCycle: PartialFunction[Any, Unit] = { + private val lifeCycles: PartialFunction[Any, Unit] = { case Init(config) => init(config) case HotSwap(code) => _hotswap = code case Restart(reason) => restart(reason) @@ -613,7 +615,7 @@ trait Actor extends Logging with TransactionManagement { _linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach(_.restart(reason)) private[Actor] def restart(reason: AnyRef) = synchronized { - lifeCycleConfig match { + lifeCycle match { case None => throw new IllegalStateException("Actor [" + id + "] does not have a life-cycle defined.") // FIXME implement support for shutdown time diff --git a/akka-actors/src/main/scala/actor/Scheduler.scala b/akka-actors/src/main/scala/actor/Scheduler.scala index 03404eda9d..e2a4c9dfdf 100644 --- a/akka-actors/src/main/scala/actor/Scheduler.scala +++ b/akka-actors/src/main/scala/actor/Scheduler.scala @@ -28,7 +28,7 @@ case class SchedulerException(msg: String, e: Throwable) extends RuntimeExceptio * which is licensed under the Apache 2 License. */ class ScheduleActor(val receiver: Actor, val future: ScheduledFuture[AnyRef]) extends Actor with Logging { - lifeCycleConfig = Some(LifeCycle(Permanent, 100)) + lifeCycle = Some(LifeCycle(Permanent, 100)) def receive: PartialFunction[Any, Unit] = { case UnSchedule => diff --git a/akka-actors/src/main/scala/actor/Supervisor.scala b/akka-actors/src/main/scala/actor/Supervisor.scala index 3b5236625d..4c8afda820 100644 --- a/akka-actors/src/main/scala/actor/Supervisor.scala +++ b/akka-actors/src/main/scala/actor/Supervisor.scala @@ -141,7 +141,7 @@ class Supervisor private[akka] (handler: FaultHandlingStrategy) extends Actor wi server match { case Supervise(actor, lifecycle) => actors.put(actor.getClass.getName, actor) - actor.lifeCycleConfig = Some(lifecycle) + actor.lifeCycle = Some(lifecycle) startLink(actor) case SupervisorConfig(_, _) => // recursive configuration diff --git a/akka-amqp/src/main/scala/AMQP.scala b/akka-amqp/src/main/scala/AMQP.scala index 83620a6265..f5bf5df96f 100644 --- a/akka-amqp/src/main/scala/AMQP.scala +++ b/akka-amqp/src/main/scala/AMQP.scala @@ -125,7 +125,7 @@ object AMQP extends Actor { case object Stop extends AMQPMessage - private[akka] case class CancelMessageConsumerListener(consumer: MessageConsumerListener) extends InternalAMQPMessage + private[akka] case class UnregisterMessageConsumerListener(consumer: MessageConsumerListener) extends InternalAMQPMessage private[akka] case class Reconnect(delay: Long) extends InternalAMQPMessage @@ -307,35 +307,29 @@ object AMQP extends Actor { def receive = { case listener: MessageConsumerListener => startLink(listener.actor) - registerConsumer(listener) + registerListener(listener) log.info("Message consumer listener is registered [%s]", listener) - case CancelMessageConsumerListener(listener) => - listeners.get(listener) match { - case None => log.warning("Can't unregister message consumer listener [%s]; no such listener", listener.toString(exchangeName)) - case Some(listener) => - listeners - listener - listener.tag match { - case None => log.warning("Can't unregister message consumer listener [%s]; no listener tag", listener.toString(exchangeName)) - case Some(tag) => - channel.basicCancel(tag) - unlink(listener.actor) - listener.actor.stop - log.debug("Message consumer is cancelled and shut down [%s]", listener) - } - } + case UnregisterMessageConsumerListener(listener) => + unregisterListener(listener) + + case Reconnect(delay) => + reconnect(delay) - case message @ Message(payload, routingKey, mandatory, immediate, properties) => - log.debug("Sending message [%s]", message) - channel.basicPublish(exchangeName, routingKey, mandatory, immediate, properties, payload.asInstanceOf[Array[Byte]]) + case Failure(cause) => + log.error(cause, "") + throw cause - case Reconnect(delay) => reconnect(delay) + case Stop => + listeners.elements.toList.map(_._2).foreach(unregisterListener(_)) + disconnect + stop - case Failure(cause) => log.error(cause, ""); throw cause + case message: Message => + handleIllegalMessage("AMQP.Consumer [" + this + "] can't be used to send messages, ignoring message [" + message + "]") - case Stop => disconnect; stop - - case unknown => throw new IllegalArgumentException("Unknown message [" + unknown + "] to AMQP Consumer [" + this + "]") + case unknown => + handleIllegalMessage("Unknown message [" + unknown + "] to AMQP Consumer [" + this + "]") } protected def setupChannel = { @@ -344,11 +338,11 @@ object AMQP extends Actor { channel.exchangeDeclare(exchangeName.toString, exchangeType.toString, passive, durable, configurationArguments.asJava) - listeners.elements.toList.map(_._2).foreach(registerConsumer) + listeners.elements.toList.map(_._2).foreach(registerListener) if (shutdownListener.isDefined) connection.addShutdownListener(shutdownListener.get) } - private def registerConsumer(listener: MessageConsumerListener) = { + private def registerListener(listener: MessageConsumerListener) = { log.debug("Register MessageConsumerListener %s", listener.toString(exchangeName)) listeners.put(listener, listener) @@ -381,18 +375,43 @@ object AMQP extends Actor { } override def handleShutdownSignal(listenerTag: String, signal: ShutdownSignalException) = { - listeners.elements.toList.map(_._2).find(_.tag == listenerTag) match { - case None => log.warning("Could not find message listener for tag [%s]; can't shut listener down", listenerTag) + def hasTag(listener: MessageConsumerListener, listenerTag: String): Boolean = { + if (listener.tag.isEmpty) throw new IllegalStateException("MessageConsumerListener [" + listener + "] does not have a tag") + listener.tag.get == listenerTag + } + listeners.elements.toList.map(_._2).find(hasTag(_, listenerTag)) match { + case None => log.error("Could not find message listener for tag [%s]; can't shut listener down", listenerTag) case Some(listener) => - log.warning("Message listener listener [%s] is being shutdown by [%s] due to [%s]", + log.warning("MessageConsumerListener [%s] is being shutdown by [%s] due to [%s]", listener.toString(exchangeName), signal.getReference, signal.getReason) - self ! CancelMessageConsumerListener(listener) + self ! UnregisterMessageConsumerListener(listener) } } }) listener.tag = Some(listenerTag) } + private def unregisterListener(listener: MessageConsumerListener) = { + listeners.get(listener) match { + case None => log.warning("Can't unregister message consumer listener [%s]; no such listener", listener.toString(exchangeName)) + case Some(listener) => + listeners - listener + listener.tag match { + case None => log.warning("Can't unregister message consumer listener [%s]; no listener tag", listener.toString(exchangeName)) + case Some(tag) => + channel.basicCancel(tag) + unlink(listener.actor) + listener.actor.stop + log.debug("Message consumer is cancelled and shut down [%s]", listener) + } + } + } + + private def handleIllegalMessage(errorMessage: String) = { + log.error(errorMessage) + throw new IllegalArgumentException(errorMessage) + } + override def toString(): String = "AMQP.Consumer[hostname=" + hostname + ", port=" + port + @@ -403,7 +422,7 @@ object AMQP extends Actor { } trait FaultTolerantConnectionActor extends Actor { - lifeCycleConfig = Some(LifeCycle(Permanent, 100)) + lifeCycle = Some(LifeCycle(Permanent, 100)) val reconnectionTimer = new Timer