Fixed regression bug in AMQP supervisor code
This commit is contained in:
parent
3687b6f6e4
commit
cf76aa137b
1 changed files with 21 additions and 16 deletions
|
|
@ -77,16 +77,25 @@ object AMQP {
|
|||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class AMQPSupervisor extends Actor with Logging {
|
||||
class AMQPSupervisor extends Logging {
|
||||
|
||||
class AMQPSupervisorActor extends Actor {
|
||||
import scala.collection.JavaConversions._
|
||||
import self._
|
||||
|
||||
faultHandler = Some(OneForOneStrategy(5, 5000))
|
||||
trapExit = List(classOf[Throwable])
|
||||
|
||||
def receive = {
|
||||
case _ => {} // ignore all messages
|
||||
}
|
||||
}
|
||||
|
||||
import scala.collection.JavaConversions._
|
||||
import self._
|
||||
|
||||
private val supervisor = actorOf[AMQPSupervisorActor].start
|
||||
private val connections = new ConcurrentHashMap[ActorRef, ActorRef]
|
||||
|
||||
self.faultHandler = Some(OneForOneStrategy(5, 5000))
|
||||
self.trapExit = List(classOf[Throwable])
|
||||
start
|
||||
|
||||
def newProducer(
|
||||
config: ConnectionParameters,
|
||||
hostname: String,
|
||||
|
|
@ -95,14 +104,14 @@ object AMQP {
|
|||
returnListener: Option[ReturnListener],
|
||||
shutdownListener: Option[ShutdownListener],
|
||||
initReconnectDelay: Long): ActorRef = {
|
||||
val producer = actorOf( new Producer(
|
||||
val producer = actorOf(new Producer(
|
||||
new ConnectionFactory(config),
|
||||
hostname, port,
|
||||
exchangeName,
|
||||
returnListener,
|
||||
shutdownListener,
|
||||
initReconnectDelay))
|
||||
startLink(producer)
|
||||
supervisor.startLink(producer)
|
||||
producer
|
||||
}
|
||||
|
||||
|
|
@ -118,7 +127,7 @@ object AMQP {
|
|||
durable: Boolean,
|
||||
autoDelete: Boolean,
|
||||
configurationArguments: Map[String, AnyRef]): ActorRef = {
|
||||
val consumer = actorOf( new Consumer(
|
||||
val consumer = actorOf(new Consumer(
|
||||
new ConnectionFactory(config),
|
||||
hostname, port,
|
||||
exchangeName,
|
||||
|
|
@ -129,24 +138,20 @@ object AMQP {
|
|||
durable,
|
||||
autoDelete,
|
||||
configurationArguments))
|
||||
startLink(consumer)
|
||||
supervisor.startLink(consumer)
|
||||
consumer
|
||||
}
|
||||
|
||||
def stopConnection(connection: ActorRef) = {
|
||||
connection ! Stop
|
||||
unlink(connection)
|
||||
supervisor.unlink(connection)
|
||||
connections.remove(connection)
|
||||
}
|
||||
|
||||
override def shutdown = {
|
||||
def shutdown = {
|
||||
asMap(connections).valuesIterator.foreach(_ ! Stop)
|
||||
exit
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case _ => {} // ignore all messages
|
||||
}
|
||||
}
|
||||
|
||||
sealed trait AMQPMessage
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue