From 84785b33eb38455d7d09c8a9ea8eb9c1666ae6cb Mon Sep 17 00:00:00 2001 From: momania Date: Thu, 23 Sep 2010 17:07:51 +0200 Subject: [PATCH] - Adding java api to AMQP module - Reorg of params, especially declaration attributes and exhange name/params --- .../se/scalablesolutions/akka/amqp/AMQP.scala | 306 ++++++++++++++---- .../akka/amqp/ConsumerActor.scala | 65 ++-- .../akka/amqp/ExampleSession.scala | 48 +-- .../akka/amqp/FaultTolerantChannelActor.scala | 23 +- .../amqp/FaultTolerantConnectionActor.scala | 2 +- .../akka/amqp/ProducerActor.scala | 11 +- .../scalablesolutions/akka/amqp/rpc/RPC.scala | 43 ++- .../akka/amqp/rpc/RpcClientActor.scala | 2 +- project/build/AkkaProject.scala | 68 ++-- 9 files changed, 372 insertions(+), 196 deletions(-) diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala index cd73d27e03..be53297cd3 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/AMQP.scala @@ -8,6 +8,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef} import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.config.OneForOneStrategy import com.rabbitmq.client.{ReturnListener, ShutdownListener, ConnectionFactory} +import ConnectionFactory._ import com.rabbitmq.client.AMQP.BasicProperties import java.lang.{String, IllegalArgumentException} @@ -19,56 +20,208 @@ import java.lang.{String, IllegalArgumentException} * @author Irmo Manie */ object AMQP { - case class ConnectionParameters( - host: String = ConnectionFactory.DEFAULT_HOST, - port: Int = ConnectionFactory.DEFAULT_AMQP_PORT, - username: String = ConnectionFactory.DEFAULT_USER, - password: String = ConnectionFactory.DEFAULT_PASS, - virtualHost: String = ConnectionFactory.DEFAULT_VHOST, - initReconnectDelay: Long = 5000, - connectionCallback: Option[ActorRef] = None) + /** + * Parameters used to make the connection to the amqp broker. Uses the rabbitmq defaults. + */ + case class ConnectionParameters( + host: String = DEFAULT_HOST, + port: Int = DEFAULT_AMQP_PORT, + username: String = DEFAULT_USER, + password: String = DEFAULT_PASS, + virtualHost: String = DEFAULT_VHOST, + initReconnectDelay: Long = 5000, + connectionCallback: Option[ActorRef] = None) { + + // Needed for Java API usage + def this() = this (DEFAULT_HOST, DEFAULT_AMQP_PORT, DEFAULT_USER, DEFAULT_PASS, DEFAULT_VHOST, 5000, None) + + // Needed for Java API usage + def this(host: String, port: Int, username: String, password: String, virtualHost: String) = + this (host, port, username, password, virtualHost, 5000, None) + + // Needed for Java API usage + def this(host: String, port: Int, username: String, password: String, virtualHost: String, initReconnectDelay: Long, connectionCallback: ActorRef) = + this (host, port, username, password, virtualHost, initReconnectDelay, Some(connectionCallback)) + + // Needed for Java API usage + def this(connectionCallback: ActorRef) = + this (DEFAULT_HOST, DEFAULT_AMQP_PORT, DEFAULT_USER, DEFAULT_PASS, DEFAULT_VHOST, 5000, Some(connectionCallback)) + + } + + /** + * Additional parameters for the channel + */ case class ChannelParameters( shutdownListener: Option[ShutdownListener] = None, - channelCallback: Option[ActorRef] = None) + channelCallback: Option[ActorRef] = None) { + // Needed for Java API usage + def this() = this (None, None) + + // Needed for Java API usage + def this(channelCallback: ActorRef) = this (None, Some(channelCallback)) + + // Needed for Java API usage + def this(shutdownListener: ShutdownListener, channelCallback: ActorRef) = + this (Some(shutdownListener), Some(channelCallback)) + } + + /** + * Declaration type used for either exchange or queue declaration + */ + sealed trait Declaration + case object NoActionDeclaration extends Declaration + case object PassiveDeclaration extends Declaration + case class ActiveDeclaration(durable: Boolean = false, autoDelete: Boolean = true, exclusive: Boolean = false) extends Declaration { + + // Needed for Java API usage + def this() = this (false, true, false) + + // Needed for Java API usage + def this(durable: Boolean, autoDelete: Boolean) = this (durable, autoDelete, false) + } + + /** + * Exchange specific parameters + */ case class ExchangeParameters( exchangeName: String, - exchangeType: ExchangeType, - exchangeDurable: Boolean = false, - exchangeAutoDelete: Boolean = true, - exchangePassive: Boolean = false, - configurationArguments: Map[String, AnyRef] = Map()) + exchangeType: ExchangeType = ExchangeType.Topic, + exchangeDeclaration: Declaration = new ActiveDeclaration(), + configurationArguments: Map[String, AnyRef] = Map.empty()) { + // Needed for Java API usage + def this(exchangeName: String) = + this (exchangeName, ExchangeType.Topic, new ActiveDeclaration(), Map.empty()) + + // Needed for Java API usage + def this(exchangeName: String, exchangeType: ExchangeType) = + this (exchangeName, exchangeType, new ActiveDeclaration(), Map.empty()) + + // Needed for Java API usage + def this(exchangeName: String, exchangeType: ExchangeType, exchangeDeclaration: Declaration) = + this (exchangeName, exchangeType, exchangeDeclaration, Map.empty()) + } + + /** + * Producer specific parameters + */ case class ProducerParameters( - exchangeParameters: ExchangeParameters, + exchangeParameters: Option[ExchangeParameters] = None, producerId: Option[String] = None, returnListener: Option[ReturnListener] = None, - channelParameters: Option[ChannelParameters] = None) + channelParameters: Option[ChannelParameters] = None) { + def this() = this(None, None, None, None) + + // Needed for Java API usage + def this(exchangeParameters: ExchangeParameters) = this (Some(exchangeParameters), None, None, None) + + // Needed for Java API usage + def this(exchangeParameters: ExchangeParameters, producerId: String) = + this (Some(exchangeParameters), Some(producerId), None, None) + + // Needed for Java API usage + def this(exchangeParameters: ExchangeParameters, returnListener: ReturnListener) = + this (Some(exchangeParameters), None, Some(returnListener), None) + + // Needed for Java API usage + def this(exchangeParameters: ExchangeParameters, channelParameters: ChannelParameters) = + this (Some(exchangeParameters), None, None, Some(channelParameters)) + + // Needed for Java API usage + def this(exchangeParameters: ExchangeParameters, producerId: String, returnListener: ReturnListener, channelParameters: ChannelParameters) = + this (Some(exchangeParameters), Some(producerId), Some(returnListener), Some(channelParameters)) + } + + /** + * Consumer specific parameters + */ case class ConsumerParameters( - exchangeParameters: ExchangeParameters, routingKey: String, deliveryHandler: ActorRef, queueName: Option[String] = None, - queueDurable: Boolean = false, - queueAutoDelete: Boolean = true, - queuePassive: Boolean = false, - queueExclusive: Boolean = false, + exchangeParameters: Option[ExchangeParameters], + queueDeclaration: Declaration = new ActiveDeclaration(), selfAcknowledging: Boolean = true, channelParameters: Option[ChannelParameters] = None) { - if (queueDurable && queueName.isEmpty) { - throw new IllegalArgumentException("A queue name is required when requesting a durable queue.") + + if (queueName.isEmpty) { + queueDeclaration match { + case ActiveDeclaration(true, _, _) => + throw new IllegalArgumentException("A queue name is required when requesting a durable queue.") + case PassiveDeclaration => + throw new IllegalArgumentException("A queue name is required when requesting passive declaration.") + case NoActionDeclaration => () // ignore + } } + + // Needed for Java API usage + def this(routingKey: String, deliveryHandler: ActorRef) = + this (routingKey, deliveryHandler, None, None, new ActiveDeclaration(), true, None) + + // Needed for Java API usage + def this(routingKey: String, deliveryHandler: ActorRef, channelParameters: ChannelParameters) = + this (routingKey, deliveryHandler, None, None, new ActiveDeclaration(), true, Some(channelParameters)) + + // Needed for Java API usage + def this(routingKey: String, deliveryHandler: ActorRef, selfAcknowledging: Boolean) = + this (routingKey, deliveryHandler, None, None, new ActiveDeclaration(), selfAcknowledging, None) + + // Needed for Java API usage + def this(routingKey: String, deliveryHandler: ActorRef, selfAcknowledging: Boolean, channelParameters: ChannelParameters) = + this (routingKey, deliveryHandler, None, None, new ActiveDeclaration(), selfAcknowledging, Some(channelParameters)) + + // Needed for Java API usage + def this(routingKey: String, deliveryHandler: ActorRef, queueName: String) = + this (routingKey, deliveryHandler, Some(queueName), None, new ActiveDeclaration(), true, None) + + // Needed for Java API usage + def this(routingKey: String, deliveryHandler: ActorRef, queueName: String, queueDeclaration: Declaration, selfAcknowledging: Boolean, channelParameters: ChannelParameters) = + this (routingKey, deliveryHandler, Some(queueName), None, queueDeclaration, selfAcknowledging, Some(channelParameters)) + + // Needed for Java API usage + def this(routingKey: String, deliveryHandler: ActorRef, exchangeParameters: ExchangeParameters) = + this (routingKey, deliveryHandler, None, Some(exchangeParameters), new ActiveDeclaration(), true, None) + + // Needed for Java API usage + def this(routingKey: String, deliveryHandler: ActorRef, exchangeParameters: ExchangeParameters, selfAcknowledging: Boolean) = + this (routingKey, deliveryHandler, None, Some(exchangeParameters), new ActiveDeclaration(), selfAcknowledging, None) + + // Needed for Java API usage + def this(routingKey: String, deliveryHandler: ActorRef, queueName: String, exchangeParameters: ExchangeParameters) = + this (routingKey, deliveryHandler, Some(queueName), Some(exchangeParameters), new ActiveDeclaration(), true, None) + + // Needed for Java API usage + def this(routingKey: String, deliveryHandler: ActorRef, queueName: String, exchangeParameters: ExchangeParameters, queueDeclaration: Declaration) = + this (routingKey, deliveryHandler, Some(queueName), Some(exchangeParameters), queueDeclaration, true, None) + + // Needed for Java API usage + def this(routingKey: String, deliveryHandler: ActorRef, queueName: String, exchangeParameters: ExchangeParameters, queueDeclaration: Declaration, selfAcknowledging: Boolean) = + this (routingKey, deliveryHandler, Some(queueName), Some(exchangeParameters), queueDeclaration, selfAcknowledging, None) + + // Needed for Java API usage + def this(routingKey: String, deliveryHandler: ActorRef, queueName: String, exchangeParameters: ExchangeParameters, queueDeclaration: Declaration, selfAcknowledging: Boolean, channelParameters: ChannelParameters) = + this (routingKey, deliveryHandler, Some(queueName), Some(exchangeParameters), queueDeclaration, selfAcknowledging, Some(channelParameters)) + + // How about that for some overloading... huh? :P (yes, I know, there are still posibilities left...sue me!) + // Who said java is easy :( } - def newConnection(connectionParameters: ConnectionParameters = new ConnectionParameters): ActorRef = { + def newConnection(connectionParameters: ConnectionParameters = new ConnectionParameters()): ActorRef = { val connection = actorOf(new FaultTolerantConnectionActor(connectionParameters)) supervisor.startLink(connection) connection ! Connect connection } + // Needed for Java API usage + def newConnection(): ActorRef = { + newConnection(new ConnectionParameters()) + } + def newProducer(connection: ActorRef, producerParameters: ProducerParameters): ActorRef = { val producer: ActorRef = Actor.actorOf(new ProducerActor(producerParameters)) connection.startLink(producer) @@ -86,7 +239,7 @@ object AMQP { } /** - * Convenience + * Convenience */ class ProducerClient[O](client: ActorRef, routingKey: String, toBinary: ToBinary[O]) { def send(request: O, replyTo: Option[String] = None) = { @@ -95,20 +248,19 @@ object AMQP { client ! Message(toBinary.toBinary(request), routingKey, false, false, Some(basicProperties)) } - def stop = client.stop + def stop() = client.stop } def newStringProducer(connection: ActorRef, - exchange: String, - routingKey: Option[String] = None, - producerId: Option[String] = None, - durable: Boolean = false, - autoDelete: Boolean = true, - passive: Boolean = true): ProducerClient[String] = { + exchangeName: Option[String], + routingKey: Option[String] = None, + producerId: Option[String] = None): ProducerClient[String] = { - val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic, - exchangeDurable = durable, exchangeAutoDelete = autoDelete) - val rKey = routingKey.getOrElse("%s.request".format(exchange)) + if (exchangeName.isEmpty && routingKey.isEmpty) { + throw new IllegalArgumentException("Either exchange name or routing key is mandatory") + } + val exchangeParameters = exchangeName.flatMap(name => Some(ExchangeParameters(name))) + val rKey = routingKey.getOrElse("%s.request".format(exchangeName.get)) val producerRef = newProducer(connection, ProducerParameters(exchangeParameters, producerId)) val toBinary = new ToBinary[String] { @@ -117,37 +269,57 @@ object AMQP { new ProducerClient(producerRef, rKey, toBinary) } + // Needed for Java API usage + def newStringProducer(connection: ActorRef): ProducerClient[String] = { + newStringProducer(connection, None, None, None) + } + // Needed for Java API usage + def newStringProducer(connection: ActorRef, exchangeName: String): ProducerClient[String] = { + newStringProducer(connection, Some(exchangeName), None, None) + } + + // Needed for Java API usage + def newStringProducer(connection: ActorRef, exchangeName: String, routingKey: String): ProducerClient[String] = { + newStringProducer(connection, Some(exchangeName), Some(routingKey), None) + } + + // Needed for Java API usage + def newStringProducer(connection: ActorRef, exchangeName: String, routingKey: String, producerId: String): ProducerClient[String] = { + newStringProducer(connection, Some(exchangeName), Some(routingKey), Some(producerId)) + } + + def newStringConsumer(connection: ActorRef, - exchange: String, - handler: String => Unit, - routingKey: Option[String] = None, - queueName: Option[String] = None, - durable: Boolean = false, - autoDelete: Boolean = true): ActorRef = { + handler: String => Unit, + exchangeName: Option[String], + routingKey: Option[String] = None, + queueName: Option[String] = None): ActorRef = { + + if (exchangeName.isEmpty && routingKey.isEmpty) { + throw new IllegalArgumentException("Either exchange name or routing key is mandatory") + } val deliveryHandler = actor { case Delivery(payload, _, _, _, _) => handler.apply(new String(payload)) } - val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic, - exchangeDurable = durable, exchangeAutoDelete = autoDelete) - val rKey = routingKey.getOrElse("%s.request".format(exchange)) + val exchangeParameters = exchangeName.flatMap(name => Some(ExchangeParameters(name))) + val rKey = routingKey.getOrElse("%s.request".format(exchangeName.get)) val qName = queueName.getOrElse("%s.in".format(rKey)) - newConsumer(connection, ConsumerParameters(exchangeParameters, rKey, deliveryHandler, Some(qName), durable, autoDelete)) + newConsumer(connection, ConsumerParameters(rKey, deliveryHandler, Some(qName), exchangeParameters)) } def newProtobufProducer[O <: com.google.protobuf.Message](connection: ActorRef, - exchange: String, - routingKey: Option[String] = None, - producerId: Option[String] = None, - durable: Boolean = false, - autoDelete: Boolean = true, - passive: Boolean = true): ProducerClient[O] = { + exchangeName: Option[String], + routingKey: Option[String] = None, + producerId: Option[String] = None): ProducerClient[O] = { - val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic, - exchangeDurable = durable, exchangeAutoDelete = autoDelete) - val rKey = routingKey.getOrElse("%s.request".format(exchange)) + if (exchangeName.isEmpty && routingKey.isEmpty) { + throw new IllegalArgumentException("Either exchange name or routing key is mandatory") + } + val exchangeParameters = exchangeName.flatMap(name => Some(ExchangeParameters(name))) + val rKey = routingKey.getOrElse("%s.request".format(exchangeName.get)) val producerRef = newProducer(connection, ProducerParameters(exchangeParameters, producerId)) new ProducerClient(producerRef, rKey, new ToBinary[O] { @@ -156,12 +328,14 @@ object AMQP { } def newProtobufConsumer[I <: com.google.protobuf.Message](connection: ActorRef, - exchange: String, - handler: I => Unit, - routingKey: Option[String] = None, - queueName: Option[String] = None, - durable: Boolean = false, - autoDelete: Boolean = true)(implicit manifest: Manifest[I]): ActorRef = { + handler: I => Unit, + exchangeName: Option[String], + routingKey: Option[String] = None, + queueName: Option[String] = None)(implicit manifest: Manifest[I]): ActorRef = { + + if (exchangeName.isEmpty && routingKey.isEmpty) { + throw new IllegalArgumentException("Either exchange name or routing key is mandatory") + } val deliveryHandler = actor { case Delivery(payload, _, _, _, _) => { @@ -169,22 +343,20 @@ object AMQP { } } - val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic, - exchangeDurable = durable, exchangeAutoDelete = autoDelete) - val rKey = routingKey.getOrElse("%s.request".format(exchange)) + val exchangeParameters = exchangeName.flatMap(name => Some(ExchangeParameters(name))) + val rKey = routingKey.getOrElse("%s.request".format(exchangeName.get)) val qName = queueName.getOrElse("%s.in".format(rKey)) - newConsumer(connection, ConsumerParameters(exchangeParameters, rKey, deliveryHandler, Some(qName), durable, autoDelete)) + newConsumer(connection, ConsumerParameters(rKey, deliveryHandler, Some(qName), exchangeParameters)) } /** * Main supervisor */ - class AMQPSupervisorActor extends Actor { import self._ - faultHandler = Some(OneForOneStrategy(5, 5000)) + faultHandler = Some(OneForOneStrategy(None, None)) // never die trapExit = List(classOf[Throwable]) def receive = { @@ -194,7 +366,7 @@ object AMQP { private val supervisor = actorOf(new AMQPSupervisorActor).start - def shutdownAll = { + def shutdownAll() = { supervisor.shutdownLinkedActors } diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ConsumerActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ConsumerActor.scala index 0ca9046093..277500e136 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ConsumerActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ConsumerActor.scala @@ -6,20 +6,16 @@ package se.scalablesolutions.akka.amqp import collection.JavaConversions -import se.scalablesolutions.akka.amqp.AMQP.ConsumerParameters import se.scalablesolutions.akka.util.Logging -import se.scalablesolutions.akka.AkkaException -import com.rabbitmq.client.AMQP.Queue.DeclareOk import com.rabbitmq.client.AMQP.BasicProperties import com.rabbitmq.client.{Channel, Envelope, DefaultConsumer} +import se.scalablesolutions.akka.amqp.AMQP.{NoActionDeclaration, ActiveDeclaration, PassiveDeclaration, ConsumerParameters} private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) - extends FaultTolerantChannelActor( - consumerParameters.exchangeParameters, consumerParameters.channelParameters) { - + extends FaultTolerantChannelActor( + consumerParameters.exchangeParameters, consumerParameters.channelParameters) { import consumerParameters._ - import exchangeParameters._ var listenerTag: Option[String] = None @@ -34,15 +30,21 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) protected def setupChannel(ch: Channel) = { - val queueDeclare: DeclareOk = { + val queueDeclare: com.rabbitmq.client.AMQP.Queue.DeclareOk = { queueName match { case Some(name) => - log.debug("Declaring new queue [%s] for %s", name, toString) - if (queuePassive) ch.queueDeclarePassive(name) - else { - ch.queueDeclare( - name, queueDurable, queueExclusive, queueAutoDelete, - JavaConversions.asMap(configurationArguments)) + queueDeclaration match { + case PassiveDeclaration => + log.debug("Passively declaring new queue [%s] for %s", name, toString) + ch.queueDeclarePassive(name) + case ActiveDeclaration(durable, autoDelete, exclusive) => + log.debug("Actively declaring new queue [%s] for %s", name, toString) + val configurationArguments = exchangeParameters match { + case Some(params) => params.configurationArguments + case _ => Map.empty() + } + ch.queueDeclare(name, durable, exclusive, autoDelete, JavaConversions.asMap(configurationArguments)) + case NoActionDeclaration => new com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk(name, 0, 0) // do nothing here } case None => log.debug("Declaring new generated queue for %s", toString) @@ -50,8 +52,11 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) } } - log.debug("Binding new queue [%s] for %s", queueDeclare.getQueue, toString) - ch.queueBind(queueDeclare.getQueue, exchangeName, routingKey) + exchangeParameters.foreach { + params => + log.debug("Binding new queue [%s] for %s", queueDeclare.getQueue, toString) + ch.queueBind(queueDeclare.getQueue, params.exchangeName, routingKey) + } val tag = ch.basicConsume(queueDeclare.getQueue, false, new DefaultConsumer(ch) with Logging { override def handleDelivery(tag: String, envelope: Envelope, properties: BasicProperties, payload: Array[Byte]) { @@ -77,11 +82,12 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) private def acknowledgeDeliveryTag(deliveryTag: Long, remoteAcknowledgement: Boolean) = { log.debug("Acking message with delivery tag [%s]", deliveryTag) - channel.foreach{ch => - ch.basicAck(deliveryTag, false) - if (remoteAcknowledgement) { - deliveryHandler ! Acknowledged(deliveryTag) - } + channel.foreach { + ch => + ch.basicAck(deliveryTag, false) + if (remoteAcknowledgement) { + deliveryHandler ! Acknowledged(deliveryTag) + } } } @@ -90,10 +96,11 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) // FIXME: when rabbitmq 1.9 arrives, basicReject should be available on the API and implemented instead of this log.warning("Consumer is rejecting delivery with tag [%s] - " + "for now this means we have to self terminate and kill the channel - see you in a second.") - channel.foreach{ch => - if (remoteAcknowledgement) { - deliveryHandler ! Rejected(deliveryTag) - } + channel.foreach { + ch => + if (remoteAcknowledgement) { + deliveryHandler ! Rejected(deliveryTag) + } } throw new RejectionException(deliveryTag) } @@ -115,10 +122,8 @@ private[amqp] class ConsumerActor(consumerParameters: ConsumerParameters) } override def toString = - "AMQP.Consumer[id= "+ self.id + - ", exchange=" + exchangeName + - ", exchangeType=" + exchangeType + - ", durable=" + exchangeDurable + - ", autoDelete=" + exchangeAutoDelete + "]" + "AMQP.Consumer[id= " + self.id + + ", exchangeParameters=" + exchangeParameters + + ", queueDeclaration=" + queueDeclaration + "]" } diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala index ecb3029444..8468d75919 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ExampleSession.scala @@ -69,11 +69,11 @@ object ExampleSession { val exchangeParameters = ExchangeParameters("my_direct_exchange", ExchangeType.Direct) - val consumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "some.routing", actor { + val consumer = AMQP.newConsumer(connection, ConsumerParameters("some.routing", actor { case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) - })) + }, None, Some(exchangeParameters))) - val producer = AMQP.newProducer(connection, ProducerParameters(exchangeParameters)) + val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters))) producer ! Message("@jonas_boner: You sucked!!".getBytes, "some.routing") } @@ -84,15 +84,15 @@ object ExampleSession { val exchangeParameters = ExchangeParameters("my_fanout_exchange", ExchangeType.Fanout) - val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "@george_bush", actor { + val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actor { case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) - })) + }, None, Some(exchangeParameters))) - val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "@barack_obama", actor { + val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actor { case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload)) - })) + }, None, Some(exchangeParameters))) - val producer = AMQP.newProducer(connection, ProducerParameters(exchangeParameters)) + val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters))) producer ! Message("@jonas_boner: I'm going surfing".getBytes, "") } @@ -103,15 +103,15 @@ object ExampleSession { val exchangeParameters = ExchangeParameters("my_topic_exchange", ExchangeType.Topic) - val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "@george_bush", actor { + val bushConsumer = AMQP.newConsumer(connection, ConsumerParameters("@george_bush", actor { case Delivery(payload, _, _, _, _) => log.info("@george_bush received message from: %s", new String(payload)) - })) + }, None, Some(exchangeParameters))) - val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "@barack_obama", actor { + val obamaConsumer = AMQP.newConsumer(connection, ConsumerParameters("@barack_obama", actor { case Delivery(payload, _, _, _, _) => log.info("@barack_obama received message from: %s", new String(payload)) - })) + }, None, Some(exchangeParameters))) - val producer = AMQP.newProducer(connection, ProducerParameters(exchangeParameters)) + val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters))) producer ! Message("@jonas_boner: You still suck!!".getBytes, "@george_bush") producer ! Message("@jonas_boner: Yes I can!".getBytes, "@barack_obama") } @@ -138,11 +138,11 @@ object ExampleSession { val exchangeParameters = ExchangeParameters("my_callback_exchange", ExchangeType.Direct) val channelParameters = ChannelParameters(channelCallback = Some(channelCallback)) - val consumer = AMQP.newConsumer(connection, ConsumerParameters(exchangeParameters, "callback.routing", actor { + val consumer = AMQP.newConsumer(connection, ConsumerParameters("callback.routing", actor { case _ => () // not used - }, channelParameters = Some(channelParameters))) + }, None, Some(exchangeParameters), channelParameters = Some(channelParameters))) - val producer = AMQP.newProducer(connection, ProducerParameters(exchangeParameters)) + val producer = AMQP.newProducer(connection, ProducerParameters(Some(exchangeParameters))) // Wait until both channels (producer & consumer) are started before stopping the connection channelCountdown.await(2, TimeUnit.SECONDS) @@ -155,10 +155,10 @@ object ExampleSession { val exchangeName = "easy.string" // listen by default to: - // exchange = exchangeName - // routingKey = .request + // exchange = optional exchangeName + // routingKey = provided routingKey or .request // queueName = .in - AMQP.newStringConsumer(connection, exchangeName, message => println("Received message: "+message)) + AMQP.newStringConsumer(connection, message => println("Received message: "+message), Some(exchangeName)) // send by default to: // exchange = exchangeName @@ -177,9 +177,9 @@ object ExampleSession { log.info("Received "+message) } - AMQP.newProtobufConsumer(connection, exchangeName, protobufMessageHandler) + AMQP.newProtobufConsumer(connection, protobufMessageHandler, Some(exchangeName)) - val producerClient = AMQP.newProtobufProducer[AddressProtocol](connection, exchangeName) + val producerClient = AMQP.newProtobufProducer[AddressProtocol](connection, Some(exchangeName)) producerClient.send(AddressProtocol.newBuilder.setHostname("akkarocks.com").setPort(1234).build) } @@ -187,7 +187,7 @@ object ExampleSession { val connection = AMQP.newConnection() - val exchangeParameters = ExchangeParameters("my_rpc_exchange", ExchangeType.Topic) + val exchangeName = "my_rpc_exchange" /** Server */ val serverFromBinary = new FromBinary[String] { @@ -200,7 +200,7 @@ object ExampleSession { def requestHandler(request: String) = 3 - val rpcServer = RPC.newRpcServer[String,Int](connection, exchangeParameters, "rpc.in.key", rpcServerSerializer, + val rpcServer = RPC.newRpcServer[String,Int](connection, exchangeName, "rpc.in.key", rpcServerSerializer, requestHandler, queueName = Some("rpc.in.key.queue")) @@ -213,7 +213,7 @@ object ExampleSession { } val rpcClientSerializer = new RpcClientSerializer[String, Int](clientToBinary, clientFromBinary) - val rpcClient = RPC.newRpcClient[String,Int](connection, exchangeParameters, "rpc.in.key", rpcClientSerializer) + val rpcClient = RPC.newRpcClient[String,Int](connection, exchangeName, "rpc.in.key", rpcClientSerializer) val response = (rpcClient !! "rpc_request") log.info("Response: " + response) diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantChannelActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantChannelActor.scala index 4d642df554..6617c62a44 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantChannelActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantChannelActor.scala @@ -10,13 +10,10 @@ import se.scalablesolutions.akka.actor.Actor import Actor._ import com.rabbitmq.client.{ShutdownSignalException, Channel, ShutdownListener} import scala.PartialFunction -import se.scalablesolutions.akka.amqp.AMQP.{ExchangeParameters, ChannelParameters} +import se.scalablesolutions.akka.amqp.AMQP._ abstract private[amqp] class FaultTolerantChannelActor( - exchangeParameters: ExchangeParameters, channelParameters: Option[ChannelParameters]) extends Actor { - - import exchangeParameters._ - + exchangeParameters: Option[ExchangeParameters], channelParameters: Option[ChannelParameters]) extends Actor { protected[amqp] var channel: Option[Channel] = None log.info("%s is started", toString) @@ -64,12 +61,16 @@ abstract private[amqp] class FaultTolerantChannelActor( protected def setupChannel(ch: Channel) private def setupChannelInternal(ch: Channel) = if (channel.isEmpty) { - if (exchangeName != "") { - if (exchangePassive) { - ch.exchangeDeclarePassive(exchangeName) - } else { - ch.exchangeDeclare(exchangeName, exchangeType.toString, exchangeDurable, exchangeAutoDelete, JavaConversions.asMap(configurationArguments)) - } + + exchangeParameters.foreach { + params => + import params._ + exchangeDeclaration match { + case PassiveDeclaration => ch.exchangeDeclarePassive(exchangeName) + case ActiveDeclaration(durable, autoDelete, _) => + ch.exchangeDeclare(exchangeName, exchangeType.toString, durable, autoDelete, JavaConversions.asMap(configurationArguments)) + case NoActionDeclaration => // ignore + } } ch.addShutdownListener(new ShutdownListener { def shutdownCompleted(cause: ShutdownSignalException) = { diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala index 0fd3f715b5..72a897dccf 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/FaultTolerantConnectionActor.scala @@ -19,7 +19,7 @@ private[amqp] class FaultTolerantConnectionActor(connectionParameters: Connectio self.lifeCycle = Some(LifeCycle(Permanent)) self.trapExit = List(classOf[Throwable]) - self.faultHandler = Some(OneForOneStrategy(5, 5000)) + self.faultHandler = Some(OneForOneStrategy(None, None)) // never die val reconnectionTimer = new Timer("%s-timer".format(self.id)) diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ProducerActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ProducerActor.scala index 3551ffa276..84c80d683b 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ProducerActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/ProducerActor.scala @@ -7,14 +7,14 @@ package se.scalablesolutions.akka.amqp import com.rabbitmq.client._ import se.scalablesolutions.akka.amqp.AMQP.ProducerParameters -import se.scalablesolutions.akka.AkkaException private[amqp] class ProducerActor(producerParameters: ProducerParameters) extends FaultTolerantChannelActor( producerParameters.exchangeParameters, producerParameters.channelParameters) { import producerParameters._ - import exchangeParameters._ + + val exchangeName = exchangeParameters.flatMap(params => Some(params.exchangeName)) producerId.foreach(id => self.id = id) @@ -22,7 +22,7 @@ private[amqp] class ProducerActor(producerParameters: ProducerParameters) case message@Message(payload, routingKey, mandatory, immediate, properties) if channel.isDefined => { log.debug("Sending message [%s]", message) - channel.foreach(_.basicPublish(exchangeName, routingKey, mandatory, immediate, properties.getOrElse(null), payload)) + channel.foreach(_.basicPublish(exchangeName.getOrElse(null), routingKey, mandatory, immediate, properties.getOrElse(null), payload)) } case message@Message(payload, routingKey, mandatory, immediate, properties) => { log.warning("Unable to send message [%s]", message) @@ -55,9 +55,6 @@ private[amqp] class ProducerActor(producerParameters: ProducerParameters) override def toString = "AMQP.Poducer[id= "+ self.id + - ", exchange=" + exchangeName + - ", exchangeType=" + exchangeType + - ", durable=" + exchangeDurable + - ", autoDelete=" + exchangeAutoDelete + "]" + ", exchangeParameters=" + exchangeParameters + "]" } diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala index 5c50fd4670..394315a68b 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RPC.scala @@ -9,29 +9,30 @@ import se.scalablesolutions.akka.amqp._ object RPC { def newRpcClient[O, I](connection: ActorRef, - exchangeParameters: ExchangeParameters, + exchangeName: String, routingKey: String, serializer: RpcClientSerializer[O, I], channelParameters: Option[ChannelParameters] = None): ActorRef = { val rpcActor: ActorRef = actorOf(new RpcClientActor[O, I]( - exchangeParameters, routingKey, serializer, channelParameters)) + ExchangeParameters(exchangeName), routingKey, serializer, channelParameters)) connection.startLink(rpcActor) rpcActor ! Start rpcActor } def newRpcServer[I, O](connection: ActorRef, - exchangeParameters: ExchangeParameters, + exchangeName: String, routingKey: String, serializer: RpcServerSerializer[I, O], requestHandler: I => O, queueName: Option[String] = None, channelParameters: Option[ChannelParameters] = None): RpcServerHandle = { - val producer = newProducer(connection, ProducerParameters( - ExchangeParameters("", ExchangeType.Direct), channelParameters = channelParameters)) + + val producer = newProducer(connection, ProducerParameters(channelParameters = channelParameters)) val rpcServer = actorOf(new RpcServerActor[I, O](producer, serializer, requestHandler)) - val consumer = newConsumer(connection, ConsumerParameters(exchangeParameters, routingKey, rpcServer, - channelParameters = channelParameters, selfAcknowledging = false, queueName = queueName)) + val consumer = newConsumer(connection, ConsumerParameters(routingKey, rpcServer, + exchangeParameters = Some(ExchangeParameters(exchangeName)), channelParameters = channelParameters, + selfAcknowledging = false, queueName = queueName)) RpcServerHandle(producer, consumer) } @@ -66,7 +67,7 @@ object RPC { def newProtobufRpcServer[I <: Message, O <: Message]( connection: ActorRef, - exchange: String, + exchangeName: String, requestHandler: I => O, routingKey: Option[String] = None, queueName: Option[String] = None, @@ -82,12 +83,12 @@ object RPC { def toBinary(t: O) = t.toByteArray }) - startServer(connection, exchange, requestHandler, routingKey, queueName, durable, autoDelete, serializer) + startServer(connection, exchangeName, requestHandler, routingKey, queueName, durable, autoDelete, serializer) } def newProtobufRpcClient[O <: Message, I <: Message]( connection: ActorRef, - exchange: String, + exchangeName: String, routingKey: Option[String] = None, durable: Boolean = false, autoDelete: Boolean = true, @@ -103,11 +104,11 @@ object RPC { } }) - startClient(connection, exchange, routingKey, durable, autoDelete, passive, serializer) + startClient(connection, exchangeName, routingKey, durable, autoDelete, passive, serializer) } def newStringRpcServer(connection: ActorRef, - exchange: String, + exchangeName: String, requestHandler: String => String, routingKey: Option[String] = None, queueName: Option[String] = None, @@ -123,7 +124,7 @@ object RPC { def toBinary(t: String) = t.getBytes }) - startServer(connection, exchange, requestHandler, routingKey, queueName, durable, autoDelete, serializer) + startServer(connection, exchangeName, requestHandler, routingKey, queueName, durable, autoDelete, serializer) } def newStringRpcClient(connection: ActorRef, @@ -147,23 +148,21 @@ object RPC { } private def startClient[O, I](connection: ActorRef, - exchange: String, + exchangeName: String, routingKey: Option[String] = None, durable: Boolean = false, autoDelete: Boolean = true, passive: Boolean = true, serializer: RpcClientSerializer[O, I]): RpcClient[O, I] = { - val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic, - exchangeDurable = durable, exchangeAutoDelete = autoDelete, exchangePassive = passive) - val rKey = routingKey.getOrElse("%s.request".format(exchange)) + val rKey = routingKey.getOrElse("%s.request".format(exchangeName)) - val client = newRpcClient(connection, exchangeParameters, rKey, serializer) + val client = newRpcClient(connection, exchangeName, rKey, serializer) new RpcClient(client) } private def startServer[I, O](connection: ActorRef, - exchange: String, + exchangeName: String, requestHandler: I => O, routingKey: Option[String] = None, queueName: Option[String] = None, @@ -171,12 +170,10 @@ object RPC { autoDelete: Boolean = true, serializer: RpcServerSerializer[I, O]): RpcServerHandle = { - val exchangeParameters = ExchangeParameters(exchange, ExchangeType.Topic, - exchangeDurable = durable, exchangeAutoDelete = autoDelete) - val rKey = routingKey.getOrElse("%s.request".format(exchange)) + val rKey = routingKey.getOrElse("%s.request".format(exchangeName)) val qName = queueName.getOrElse("%s.in".format(rKey)) - newRpcServer(connection, exchangeParameters, rKey, serializer, requestHandler, queueName = Some(qName)) + newRpcServer(connection, exchangeName, rKey, serializer, requestHandler, Some(qName)) } } diff --git a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcClientActor.scala b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcClientActor.scala index 10596e393f..90fe3ac66a 100644 --- a/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcClientActor.scala +++ b/akka-amqp/src/main/scala/se/scalablesolutions/akka/amqp/rpc/RpcClientActor.scala @@ -13,7 +13,7 @@ class RpcClientActor[I,O]( routingKey: String, serializer: RpcClientSerializer[I,O], channelParameters: Option[ChannelParameters] = None) - extends FaultTolerantChannelActor(exchangeParameters, channelParameters) { + extends FaultTolerantChannelActor(Some(exchangeParameters), channelParameters) { import exchangeParameters._ diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 28360f155b..16aae50d09 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -40,19 +40,19 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // ------------------------------------------------------------------------------------------------------------------- object Repositories { - lazy val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository") - lazy val CasbahRepo = MavenRepository("Casbah Repo", "http://repo.bumnetworks.com/releases") - lazy val CasbahSnapshotRepo = MavenRepository("Casbah Snapshots", "http://repo.bumnetworks.com/snapshots") - lazy val CodehausRepo = MavenRepository("Codehaus Repo", "http://repository.codehaus.org") +// lazy val AkkaRepo = MavenRepository("Akka Repository", "http://scalablesolutions.se/akka/repository") +// lazy val CasbahRepo = MavenRepository("Casbah Repo", "http://repo.bumnetworks.com/releases") +// lazy val CasbahSnapshotRepo = MavenRepository("Casbah Snapshots", "http://repo.bumnetworks.com/snapshots") +// lazy val CodehausRepo = MavenRepository("Codehaus Repo", "http://repository.codehaus.org") lazy val EmbeddedRepo = MavenRepository("Embedded Repo", (info.projectPath / "embedded-repo").asURL.toString) - lazy val FusesourceSnapshotRepo = MavenRepository("Fusesource Snapshots", "http://repo.fusesource.com/nexus/content/repositories/snapshots") - lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/") - lazy val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/") - lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2") - lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases") - lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo") - lazy val CasbahRepoReleases = MavenRepository("Casbah Release Repo", "http://repo.bumnetworks.com/releases") - lazy val ZookeeperRepo = MavenRepository("Zookeeper Repo", "http://lilycms.org/maven/maven2/deploy/") +// lazy val FusesourceSnapshotRepo = MavenRepository("Fusesource Snapshots", "http://repo.fusesource.com/nexus/content/repositories/snapshots") +// lazy val GuiceyFruitRepo = MavenRepository("GuiceyFruit Repo", "http://guiceyfruit.googlecode.com/svn/repo/releases/") +// lazy val JBossRepo = MavenRepository("JBoss Repo", "https://repository.jboss.org/nexus/content/groups/public/") +// lazy val JavaNetRepo = MavenRepository("java.net Repo", "http://download.java.net/maven/2") +// lazy val SonatypeSnapshotRepo = MavenRepository("Sonatype OSS Repo", "http://oss.sonatype.org/content/repositories/releases") +// lazy val SunJDMKRepo = MavenRepository("Sun JDMK Repo", "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo") +// lazy val CasbahRepoReleases = MavenRepository("Casbah Release Repo", "http://repo.bumnetworks.com/releases") +// lazy val ZookeeperRepo = MavenRepository("Zookeeper Repo", "http://lilycms.org/maven/maven2/deploy/") } // ------------------------------------------------------------------------------------------------------------------- @@ -63,27 +63,31 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // ------------------------------------------------------------------------------------------------------------------- import Repositories._ - lazy val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", SonatypeSnapshotRepo) - lazy val jettyModuleConfig = ModuleConfiguration("org.eclipse.jetty", sbt.DefaultMavenRepository) - lazy val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo) - // lazy val hawtdispatchModuleConfig = ModuleConfiguration("org.fusesource.hawtdispatch", FusesourceSnapshotRepo) - lazy val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo) - lazy val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", SunJDMKRepo) - lazy val jmsModuleConfig = ModuleConfiguration("javax.jms", SunJDMKRepo) - lazy val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", SunJDMKRepo) - lazy val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo) - lazy val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo) - lazy val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo) - lazy val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausRepo) - lazy val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo) - lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots) - lazy val logbackModuleConfig = ModuleConfiguration("ch.qos.logback",sbt.DefaultMavenRepository) - lazy val atomikosModuleConfig = ModuleConfiguration("com.atomikos",sbt.DefaultMavenRepository) - lazy val casbahRelease = ModuleConfiguration("com.novus",CasbahRepoReleases) - lazy val zookeeperRelease = ModuleConfiguration("org.apache.hadoop.zookeeper",ZookeeperRepo) - lazy val casbahModuleConfig = ModuleConfiguration("com.novus", CasbahRepo) - lazy val timeModuleConfig = ModuleConfiguration("org.scala-tools", "time", CasbahSnapshotRepo) +// lazy val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", SonatypeSnapshotRepo) +// lazy val jettyModuleConfig = ModuleConfiguration("org.eclipse.jetty", sbt.DefaultMavenRepository) +// lazy val guiceyFruitModuleConfig = ModuleConfiguration("org.guiceyfruit", GuiceyFruitRepo) +// // lazy val hawtdispatchModuleConfig = ModuleConfiguration("org.fusesource.hawtdispatch", FusesourceSnapshotRepo) +// lazy val jbossModuleConfig = ModuleConfiguration("org.jboss", JBossRepo) +// lazy val jdmkModuleConfig = ModuleConfiguration("com.sun.jdmk", SunJDMKRepo) +// lazy val jmsModuleConfig = ModuleConfiguration("javax.jms", SunJDMKRepo) +// lazy val jmxModuleConfig = ModuleConfiguration("com.sun.jmx", SunJDMKRepo) +// lazy val jerseyContrModuleConfig = ModuleConfiguration("com.sun.jersey.contribs", JavaNetRepo) +// lazy val jerseyModuleConfig = ModuleConfiguration("com.sun.jersey", JavaNetRepo) +// lazy val jgroupsModuleConfig = ModuleConfiguration("jgroups", JBossRepo) +// lazy val multiverseModuleConfig = ModuleConfiguration("org.multiverse", CodehausRepo) +// lazy val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo) +// lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots) +// lazy val logbackModuleConfig = ModuleConfiguration("ch.qos.logback",sbt.DefaultMavenRepository) +// lazy val atomikosModuleConfig = ModuleConfiguration("com.atomikos",sbt.DefaultMavenRepository) +// lazy val casbahRelease = ModuleConfiguration("com.novus",CasbahRepoReleases) +// lazy val zookeeperRelease = ModuleConfiguration("org.apache.hadoop.zookeeper",ZookeeperRepo) +// lazy val casbahModuleConfig = ModuleConfiguration("com.novus", CasbahRepo) +// lazy val timeModuleConfig = ModuleConfiguration("org.scala-tools", "time", CasbahSnapshotRepo) lazy val embeddedRepo = EmbeddedRepo // This is the only exception, because the embedded repo is fast! + val mavenLocal = "Local Maven Repository" at "file:/e:/maven-repository" + + val efgfpNexusReleasesRepository = "Nexus Releases" at "http://nexus/nexus/content/groups/public" + val efgfpNexusSnaphotsRepository = "Nexus Snapshots" at "http://nexus/nexus/content/groups/public-snapshots" // ------------------------------------------------------------------------------------------------------------------- // Versions