Upgraded to RabbitMQ 1.7.0

This commit is contained in:
Jonas Bonér 2009-12-26 23:11:50 +01:00
parent 8b72777d61
commit 3b3b87ecad
4 changed files with 19 additions and 13 deletions

View file

@ -25,15 +25,10 @@
<groupId>${project.groupId}</groupId>
<version>${project.version}</version>
</dependency>
<!--dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>1.7.0</version>
</dependency-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>rabbitmq-client</artifactId>
<version>0.9.1</version>
</dependency>
</dependencies>

View file

@ -66,10 +66,12 @@ object AMQP {
initReconnectDelay: Long,
passive: Boolean,
durable: Boolean,
configurationArguments: Map[String, AnyRef]) =
autoDelete: Boolean,
configurationArguments: Map[String, AnyRef]) =
supervisor.newConsumer(
config, hostname, port, exchangeName, exchangeType,
shutdownListener, initReconnectDelay, passive, durable, configurationArguments)
shutdownListener, initReconnectDelay,
passive, durable, autoDelete, configurationArguments)
def stopConnection(connection: FaultTolerantConnectionActor) = supervisor.stopConnection(connection)
@ -112,6 +114,7 @@ object AMQP {
initReconnectDelay: Long,
passive: Boolean,
durable: Boolean,
autoDelete: Boolean,
configurationArguments: Map[String, AnyRef]): Consumer = {
val consumer = new Consumer(
new ConnectionFactory(config),
@ -122,6 +125,7 @@ object AMQP {
initReconnectDelay,
passive,
durable,
autoDelete,
configurationArguments)
startLink(consumer)
consumer
@ -352,6 +356,7 @@ object AMQP {
val initReconnectDelay: Long,
val passive: Boolean,
val durable: Boolean,
val autoDelete: Boolean,
val configurationArguments: Map[java.lang.String, Object])
extends FaultTolerantConnectionActor {
consumer: Consumer =>
@ -406,9 +411,7 @@ object AMQP {
protected def setupChannel = {
connection = connectionFactory.newConnection(hostname, port)
channel = connection.createChannel
channel.exchangeDeclare(exchangeName.toString, exchangeType.toString,
passive, durable,
configurationArguments.asJava)
channel.exchangeDeclare(exchangeName.toString, exchangeType.toString, passive, durable, autoDelete, configurationArguments.asJava)
listeners.elements.toList.map(_._2).foreach(registerListener)
if (shutdownListener.isDefined) connection.addShutdownListener(shutdownListener.get)
}

View file

@ -29,7 +29,7 @@ object ExampleSession {
}
def direct = {
val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, IM, ExchangeType.Direct, None, 100, false, false, Map[String, AnyRef]())
val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, IM, ExchangeType.Direct, None, 100, false, false, false, Map[String, AnyRef]())
consumer ! MessageConsumerListener("@george_bush", "direct", actor {
case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
})
@ -38,7 +38,7 @@ object ExampleSession {
}
def fanout = {
val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, CHAT, ExchangeType.Fanout, None, 100, false, false, Map[String, AnyRef]())
val consumer = AMQP.newConsumer(CONFIG, HOSTNAME, PORT, CHAT, ExchangeType.Fanout, None, 100, false, false, false, Map[String, AnyRef]())
consumer ! MessageConsumerListener("@george_bush", "", actor {
case Message(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload.asInstanceOf[Array[Byte]]))
})

View file

@ -228,6 +228,14 @@
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>nexus.griddynamics.net</id>
<name>Grid Dynamics Maven Repository</name>
<url>https://nexus.griddynamics.net/nexus/content/groups/public</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>databinder.net/repo/</id>
<name>dbDispatch Repository for Maven</name>