renamed lifeCycleConfig to lifeCycle + fixed AMQP bug/isses
This commit is contained in:
parent
62ff0dbe8b
commit
aca5536fc7
4 changed files with 60 additions and 39 deletions
|
|
@ -55,7 +55,9 @@ object Actor {
|
|||
}
|
||||
|
||||
/**
|
||||
* Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model'
|
||||
* Actor base trait that should be extended by or mixed to create an Actor with the semantics of the 'Actor Model':
|
||||
* <a href="http://en.wikipedia.org/wiki/Actor_model">http://en.wikipedia.org/wiki/Actor_model</a>
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait Actor extends Logging with TransactionManagement {
|
||||
|
|
@ -93,7 +95,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
*
|
||||
* Needs to be set if the actor is supervised programmatically.
|
||||
*/
|
||||
@volatile var lifeCycleConfig: Option[LifeCycle] = None
|
||||
@volatile var lifeCycle: Option[LifeCycle] = None
|
||||
|
||||
/**
|
||||
* User overridable callback/setting.
|
||||
|
|
@ -246,7 +248,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
// FIXME: Need to do reference count to know if EventBasedThreadPoolDispatcher and EventBasedSingleThreadDispatcher can be shut down
|
||||
_isRunning = false
|
||||
shutdown
|
||||
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -585,9 +587,9 @@ trait Actor extends Logging with TransactionManagement {
|
|||
if (future.exception.isDefined) throw future.exception.get._2
|
||||
else future.result.asInstanceOf[Option[T]]
|
||||
|
||||
private def base: PartialFunction[Any, Unit] = lifeCycle orElse (_hotswap getOrElse receive)
|
||||
private def base: PartialFunction[Any, Unit] = lifeCycles orElse (_hotswap getOrElse receive)
|
||||
|
||||
private val lifeCycle: PartialFunction[Any, Unit] = {
|
||||
private val lifeCycles: PartialFunction[Any, Unit] = {
|
||||
case Init(config) => init(config)
|
||||
case HotSwap(code) => _hotswap = code
|
||||
case Restart(reason) => restart(reason)
|
||||
|
|
@ -613,7 +615,7 @@ trait Actor extends Logging with TransactionManagement {
|
|||
_linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach(_.restart(reason))
|
||||
|
||||
private[Actor] def restart(reason: AnyRef) = synchronized {
|
||||
lifeCycleConfig match {
|
||||
lifeCycle match {
|
||||
case None => throw new IllegalStateException("Actor [" + id + "] does not have a life-cycle defined.")
|
||||
|
||||
// FIXME implement support for shutdown time
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ case class SchedulerException(msg: String, e: Throwable) extends RuntimeExceptio
|
|||
* which is licensed under the Apache 2 License.
|
||||
*/
|
||||
class ScheduleActor(val receiver: Actor, val future: ScheduledFuture[AnyRef]) extends Actor with Logging {
|
||||
lifeCycleConfig = Some(LifeCycle(Permanent, 100))
|
||||
lifeCycle = Some(LifeCycle(Permanent, 100))
|
||||
|
||||
def receive: PartialFunction[Any, Unit] = {
|
||||
case UnSchedule =>
|
||||
|
|
|
|||
|
|
@ -141,7 +141,7 @@ class Supervisor private[akka] (handler: FaultHandlingStrategy) extends Actor wi
|
|||
server match {
|
||||
case Supervise(actor, lifecycle) =>
|
||||
actors.put(actor.getClass.getName, actor)
|
||||
actor.lifeCycleConfig = Some(lifecycle)
|
||||
actor.lifeCycle = Some(lifecycle)
|
||||
startLink(actor)
|
||||
|
||||
case SupervisorConfig(_, _) => // recursive configuration
|
||||
|
|
|
|||
|
|
@ -125,7 +125,7 @@ object AMQP extends Actor {
|
|||
|
||||
case object Stop extends AMQPMessage
|
||||
|
||||
private[akka] case class CancelMessageConsumerListener(consumer: MessageConsumerListener) extends InternalAMQPMessage
|
||||
private[akka] case class UnregisterMessageConsumerListener(consumer: MessageConsumerListener) extends InternalAMQPMessage
|
||||
|
||||
private[akka] case class Reconnect(delay: Long) extends InternalAMQPMessage
|
||||
|
||||
|
|
@ -307,35 +307,29 @@ object AMQP extends Actor {
|
|||
def receive = {
|
||||
case listener: MessageConsumerListener =>
|
||||
startLink(listener.actor)
|
||||
registerConsumer(listener)
|
||||
registerListener(listener)
|
||||
log.info("Message consumer listener is registered [%s]", listener)
|
||||
|
||||
case CancelMessageConsumerListener(listener) =>
|
||||
listeners.get(listener) match {
|
||||
case None => log.warning("Can't unregister message consumer listener [%s]; no such listener", listener.toString(exchangeName))
|
||||
case Some(listener) =>
|
||||
listeners - listener
|
||||
listener.tag match {
|
||||
case None => log.warning("Can't unregister message consumer listener [%s]; no listener tag", listener.toString(exchangeName))
|
||||
case Some(tag) =>
|
||||
channel.basicCancel(tag)
|
||||
unlink(listener.actor)
|
||||
listener.actor.stop
|
||||
log.debug("Message consumer is cancelled and shut down [%s]", listener)
|
||||
}
|
||||
}
|
||||
case UnregisterMessageConsumerListener(listener) =>
|
||||
unregisterListener(listener)
|
||||
|
||||
case Reconnect(delay) =>
|
||||
reconnect(delay)
|
||||
|
||||
case message @ Message(payload, routingKey, mandatory, immediate, properties) =>
|
||||
log.debug("Sending message [%s]", message)
|
||||
channel.basicPublish(exchangeName, routingKey, mandatory, immediate, properties, payload.asInstanceOf[Array[Byte]])
|
||||
case Failure(cause) =>
|
||||
log.error(cause, "")
|
||||
throw cause
|
||||
|
||||
case Reconnect(delay) => reconnect(delay)
|
||||
case Stop =>
|
||||
listeners.elements.toList.map(_._2).foreach(unregisterListener(_))
|
||||
disconnect
|
||||
stop
|
||||
|
||||
case Failure(cause) => log.error(cause, ""); throw cause
|
||||
case message: Message =>
|
||||
handleIllegalMessage("AMQP.Consumer [" + this + "] can't be used to send messages, ignoring message [" + message + "]")
|
||||
|
||||
case Stop => disconnect; stop
|
||||
|
||||
case unknown => throw new IllegalArgumentException("Unknown message [" + unknown + "] to AMQP Consumer [" + this + "]")
|
||||
case unknown =>
|
||||
handleIllegalMessage("Unknown message [" + unknown + "] to AMQP Consumer [" + this + "]")
|
||||
}
|
||||
|
||||
protected def setupChannel = {
|
||||
|
|
@ -344,11 +338,11 @@ object AMQP extends Actor {
|
|||
channel.exchangeDeclare(exchangeName.toString, exchangeType.toString,
|
||||
passive, durable,
|
||||
configurationArguments.asJava)
|
||||
listeners.elements.toList.map(_._2).foreach(registerConsumer)
|
||||
listeners.elements.toList.map(_._2).foreach(registerListener)
|
||||
if (shutdownListener.isDefined) connection.addShutdownListener(shutdownListener.get)
|
||||
}
|
||||
|
||||
private def registerConsumer(listener: MessageConsumerListener) = {
|
||||
private def registerListener(listener: MessageConsumerListener) = {
|
||||
log.debug("Register MessageConsumerListener %s", listener.toString(exchangeName))
|
||||
listeners.put(listener, listener)
|
||||
|
||||
|
|
@ -381,18 +375,43 @@ object AMQP extends Actor {
|
|||
}
|
||||
|
||||
override def handleShutdownSignal(listenerTag: String, signal: ShutdownSignalException) = {
|
||||
listeners.elements.toList.map(_._2).find(_.tag == listenerTag) match {
|
||||
case None => log.warning("Could not find message listener for tag [%s]; can't shut listener down", listenerTag)
|
||||
def hasTag(listener: MessageConsumerListener, listenerTag: String): Boolean = {
|
||||
if (listener.tag.isEmpty) throw new IllegalStateException("MessageConsumerListener [" + listener + "] does not have a tag")
|
||||
listener.tag.get == listenerTag
|
||||
}
|
||||
listeners.elements.toList.map(_._2).find(hasTag(_, listenerTag)) match {
|
||||
case None => log.error("Could not find message listener for tag [%s]; can't shut listener down", listenerTag)
|
||||
case Some(listener) =>
|
||||
log.warning("Message listener listener [%s] is being shutdown by [%s] due to [%s]",
|
||||
log.warning("MessageConsumerListener [%s] is being shutdown by [%s] due to [%s]",
|
||||
listener.toString(exchangeName), signal.getReference, signal.getReason)
|
||||
self ! CancelMessageConsumerListener(listener)
|
||||
self ! UnregisterMessageConsumerListener(listener)
|
||||
}
|
||||
}
|
||||
})
|
||||
listener.tag = Some(listenerTag)
|
||||
}
|
||||
|
||||
private def unregisterListener(listener: MessageConsumerListener) = {
|
||||
listeners.get(listener) match {
|
||||
case None => log.warning("Can't unregister message consumer listener [%s]; no such listener", listener.toString(exchangeName))
|
||||
case Some(listener) =>
|
||||
listeners - listener
|
||||
listener.tag match {
|
||||
case None => log.warning("Can't unregister message consumer listener [%s]; no listener tag", listener.toString(exchangeName))
|
||||
case Some(tag) =>
|
||||
channel.basicCancel(tag)
|
||||
unlink(listener.actor)
|
||||
listener.actor.stop
|
||||
log.debug("Message consumer is cancelled and shut down [%s]", listener)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def handleIllegalMessage(errorMessage: String) = {
|
||||
log.error(errorMessage)
|
||||
throw new IllegalArgumentException(errorMessage)
|
||||
}
|
||||
|
||||
override def toString(): String =
|
||||
"AMQP.Consumer[hostname=" + hostname +
|
||||
", port=" + port +
|
||||
|
|
@ -403,7 +422,7 @@ object AMQP extends Actor {
|
|||
}
|
||||
|
||||
trait FaultTolerantConnectionActor extends Actor {
|
||||
lifeCycleConfig = Some(LifeCycle(Permanent, 100))
|
||||
lifeCycle = Some(LifeCycle(Permanent, 100))
|
||||
|
||||
val reconnectionTimer = new Timer
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue